feat: implement SSE for data source node processing and completion events, replacing previous run methods

This commit is contained in:
twwu
2025-06-18 15:06:50 +08:00
parent 4b3a54633f
commit f85e6a0dea
6 changed files with 115 additions and 168 deletions

View File

@@ -1,12 +1,14 @@
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { useCallback, useEffect, useMemo, useState } from 'react'
import WorkspaceSelector from '@/app/components/base/notion-page-selector/workspace-selector'
import SearchInput from '@/app/components/base/notion-page-selector/search-input'
import PageSelector from '@/app/components/base/notion-page-selector/page-selector'
import type { DataSourceNotionPageMap, DataSourceNotionWorkspace, NotionPage } from '@/models/common'
import Header from '@/app/components/datasets/create/website/base/header'
import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail'
import { useDraftDatasourceNodeRun, usePublishedDatasourceNodeRun } from '@/service/use-pipeline'
import { DatasourceType } from '@/models/pipeline'
import { ssePost } from '@/service/base'
import Toast from '@/app/components/base/toast'
import type { DataSourceNodeCompletedResponse } from '@/types/pipeline'
type OnlineDocumentSelectorProps = {
value?: string[]
@@ -33,28 +35,37 @@ const OnlineDocumentSelector = ({
nodeId,
headerInfo,
}: OnlineDocumentSelectorProps) => {
const pipeline_id = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id)
const pipelineId = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id)
const [documentsData, setDocumentsData] = useState<DataSourceNotionWorkspace[]>([])
const [searchValue, setSearchValue] = useState('')
const [currentWorkspaceId, setCurrentWorkspaceId] = useState('')
const useDatasourceNodeRun = useRef(!isInPipeline ? usePublishedDatasourceNodeRun : useDraftDatasourceNodeRun)
const { mutateAsync: crawlOnlineDocuments } = useDatasourceNodeRun.current()
const datasourceNodeRunURL = !isInPipeline
? `/rag/pipelines/${pipelineId}/workflows/published/datasource/nodes/${nodeId}/run`
: `/rag/pipelines/${pipelineId}/workflows/draft/datasource/nodes/${nodeId}/run`
const getOnlineDocuments = useCallback(async () => {
if (pipeline_id) {
await crawlOnlineDocuments({
pipeline_id,
node_id: nodeId,
inputs: {},
datasource_type: DatasourceType.onlineDocument,
}, {
onSuccess(documentsData) {
setDocumentsData(documentsData.result as DataSourceNotionWorkspace[])
ssePost(
datasourceNodeRunURL,
{
body: {
inputs: {},
datasource_type: DatasourceType.onlineDocument,
},
})
}
}, [crawlOnlineDocuments, nodeId, pipeline_id])
},
{
onDataSourceNodeCompleted: (documentsData: DataSourceNodeCompletedResponse) => {
setDocumentsData(documentsData.data as DataSourceNotionWorkspace[])
},
onError: (message: string) => {
Toast.notify({
type: 'error',
message,
})
},
},
)
}, [datasourceNodeRunURL])
useEffect(() => {
getOnlineDocuments()

View File

@@ -8,16 +8,16 @@ import Crawling from './crawling'
import ErrorMessage from './error-message'
import CrawledResult from './crawled-result'
import {
useDraftDatasourceNodeRun,
useDraftDatasourceNodeRunStatus,
useDraftPipelinePreProcessingParams,
usePublishedDatasourceNodeRun,
usePublishedDatasourceNodeRunStatus,
usePublishedPipelinePreProcessingParams,
} from '@/service/use-pipeline'
import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail'
import { DatasourceType } from '@/models/pipeline'
import { sleep } from '@/utils'
import { ssePost } from '@/service/base'
import type {
DataSourceNodeCompletedResponse,
DataSourceNodeProcessingResponse,
} from '@/types/pipeline'
const I18N_PREFIX = 'datasetCreation.stepOne.website'
@@ -51,6 +51,8 @@ const Crawler = ({
const { t } = useTranslation()
const [step, setStep] = useState<Step>(Step.init)
const [controlFoldOptions, setControlFoldOptions] = useState<number>(0)
const [totalNum, setTotalNum] = useState(0)
const [crawledNum, setCrawledNum] = useState(0)
const pipelineId = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id)
const usePreProcessingParams = useRef(!isInPipeline ? usePublishedPipelinePreProcessingParams : useDraftPipelinePreProcessingParams)
@@ -68,66 +70,49 @@ const Crawler = ({
const isCrawlFinished = step === Step.finished
const isRunning = step === Step.running
const [crawlResult, setCrawlResult] = useState<{
result: CrawlResultItem[]
data: CrawlResultItem[]
time_consuming: number | string
} | undefined>(undefined)
const [crawlErrorMessage, setCrawlErrorMessage] = useState('')
const showError = isCrawlFinished && crawlErrorMessage
const useDatasourceNodeRun = useRef(!isInPipeline ? usePublishedDatasourceNodeRun : useDraftDatasourceNodeRun)
const useDatasourceNodeRunStatus = useRef(!isInPipeline ? usePublishedDatasourceNodeRunStatus : useDraftDatasourceNodeRunStatus)
const { mutateAsync: runDatasourceNode } = useDatasourceNodeRun.current()
const { mutateAsync: getDatasourceNodeRunStatus } = useDatasourceNodeRunStatus.current()
const checkCrawlStatus = useCallback(async (jobId: string) => {
const res = await getDatasourceNodeRunStatus({
node_id: nodeId,
pipeline_id: pipelineId!,
job_id: jobId,
datasource_type: DatasourceType.websiteCrawl,
}, {
onError: async (error: any) => {
const message = await error.json()
setCrawlErrorMessage(message || t(`${I18N_PREFIX}.unknownError`))
},
}) as any
if (res.status === 'completed') {
setCrawlResult(res)
onCheckedCrawlResultChange(res.result || []) // default select the crawl result
setCrawlErrorMessage('')
setStep(Step.finished)
}
else if (res.status === 'processing') {
await sleep(2500)
await checkCrawlStatus(jobId)
}
}, [getDatasourceNodeRunStatus, nodeId, pipelineId, t, onCheckedCrawlResultChange])
const datasourceNodeRunURL = !isInPipeline
? `/rag/pipelines/${pipelineId}/workflows/published/datasource/nodes/${nodeId}/run`
: `/rag/pipelines/${pipelineId}/workflows/draft/datasource/nodes/${nodeId}/run`
const handleRun = useCallback(async (value: Record<string, any>) => {
setStep(Step.running)
const res = await runDatasourceNode({
node_id: nodeId,
pipeline_id: pipelineId!,
inputs: value,
datasource_type: DatasourceType.websiteCrawl,
}, {
onError: async (error: any) => {
const message = await error.json()
setCrawlErrorMessage(message || t(`${I18N_PREFIX}.unknownError`))
setStep(Step.finished)
ssePost(
datasourceNodeRunURL,
{
body: {
inputs: value,
datasource_type: DatasourceType.websiteCrawl,
response_mode: 'streaming',
},
},
}) as any
const jobId = res.job_id
if (!jobId && res.status === 'completed') {
setCrawlResult(res)
onCheckedCrawlResultChange(res.result || []) // default select the crawl result
setStep(Step.finished)
}
else if (jobId) {
await checkCrawlStatus(jobId)
}
setCrawlErrorMessage('')
}, [runDatasourceNode, nodeId, pipelineId, onCheckedCrawlResultChange, checkCrawlStatus, t])
{
onDataSourceNodeProcessing: (data: DataSourceNodeProcessingResponse) => {
setTotalNum(data.total ?? 0)
setCrawledNum(data.completed ?? 0)
},
onDataSourceNodeCompleted: (data: DataSourceNodeCompletedResponse) => {
const { data: crawlData, time_consuming } = data
setCrawlResult({
data: crawlData as CrawlResultItem[],
time_consuming: time_consuming ?? 0,
})
onCheckedCrawlResultChange(crawlData || []) // default select the crawl result
setCrawlErrorMessage('')
setStep(Step.finished)
},
onError: (message: string) => {
setCrawlErrorMessage(message || t(`${I18N_PREFIX}.unknownError`))
setStep(Step.finished)
},
},
)
}, [datasourceNodeRunURL, onCheckedCrawlResultChange, t])
const handleSubmit = useCallback((value: Record<string, any>) => {
handleRun(value)
@@ -152,8 +137,8 @@ const Crawler = ({
<div className='relative flex flex-col'>
{isRunning && (
<Crawling
crawledNum={0}
totalNum={0}
crawledNum={crawledNum}
totalNum={totalNum}
/>
)}
{showError && (
@@ -166,7 +151,7 @@ const Crawler = ({
{isCrawlFinished && !showError && (
<CrawledResult
className='mt-2'
list={crawlResult?.result || []}
list={crawlResult?.data || []}
checkedList={checkedCrawlResult}
onSelectedChange={onCheckedCrawlResultChange}
usedTime={Number.parseFloat(crawlResult?.time_consuming as string) || 0}