From 378577767beabe22da4dd10fd9132b1efbbf71ef Mon Sep 17 00:00:00 2001 From: Coding On Star <447357187@qq.com> Date: Mon, 16 Mar 2026 14:42:32 +0800 Subject: [PATCH] refactor(web): split text-generation result flow and raise coverage (#33499) Co-authored-by: CodingOnStar Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../use-text-generation-batch.spec.ts | 4 +- .../hooks/use-text-generation-batch.ts | 5 +- .../result/__tests__/index.spec.tsx | 334 +++++++ .../result/__tests__/result-request.spec.ts | 293 ++++++ .../workflow-stream-handlers.spec.ts | 901 ++++++++++++++++++ .../__tests__/use-result-run-state.spec.ts | 200 ++++ .../hooks/__tests__/use-result-sender.spec.ts | 510 ++++++++++ .../result/hooks/use-result-run-state.ts | 237 +++++ .../result/hooks/use-result-sender.ts | 230 +++++ .../share/text-generation/result/index.tsx | 622 +----------- .../text-generation/result/result-request.ts | 156 +++ .../result/workflow-stream-handlers.ts | 404 ++++++++ web/eslint-suppressions.json | 5 +- .../components-coverage-thresholds.mjs | 8 +- 14 files changed, 3319 insertions(+), 590 deletions(-) create mode 100644 web/app/components/share/text-generation/result/__tests__/index.spec.tsx create mode 100644 web/app/components/share/text-generation/result/__tests__/result-request.spec.ts create mode 100644 web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts create mode 100644 web/app/components/share/text-generation/result/hooks/__tests__/use-result-run-state.spec.ts create mode 100644 web/app/components/share/text-generation/result/hooks/__tests__/use-result-sender.spec.ts create mode 100644 web/app/components/share/text-generation/result/hooks/use-result-run-state.ts create mode 100644 web/app/components/share/text-generation/result/hooks/use-result-sender.ts create mode 100644 web/app/components/share/text-generation/result/result-request.ts create mode 100644 web/app/components/share/text-generation/result/workflow-stream-handlers.ts diff --git a/web/app/components/share/text-generation/hooks/__tests__/use-text-generation-batch.spec.ts b/web/app/components/share/text-generation/hooks/__tests__/use-text-generation-batch.spec.ts index 3dab88c578..973440e511 100644 --- a/web/app/components/share/text-generation/hooks/__tests__/use-text-generation-batch.spec.ts +++ b/web/app/components/share/text-generation/hooks/__tests__/use-text-generation-batch.spec.ts @@ -275,7 +275,7 @@ describe('useTextGenerationBatch', () => { }) act(() => { - result.current.handleCompleted({ answer: 'failed' } as unknown as string, 1, false) + result.current.handleCompleted('{"answer":"failed"}', 1, false) }) expect(result.current.allFailedTaskList).toEqual([ @@ -291,7 +291,7 @@ describe('useTextGenerationBatch', () => { { 'Name': 'Alice', 'Score': '', - 'generation.completionResult': JSON.stringify({ answer: 'failed' }), + 'generation.completionResult': '{"answer":"failed"}', }, ]) diff --git a/web/app/components/share/text-generation/hooks/use-text-generation-batch.ts b/web/app/components/share/text-generation/hooks/use-text-generation-batch.ts index 522d2e4681..65ef4d775a 100644 --- a/web/app/components/share/text-generation/hooks/use-text-generation-batch.ts +++ b/web/app/components/share/text-generation/hooks/use-text-generation-batch.ts @@ -241,10 +241,7 @@ export const useTextGenerationBatch = ({ result[variable.name] = String(task.params.inputs[variable.key] ?? '') }) - let completionValue = batchCompletionMap[String(task.id)] - if (typeof completionValue === 'object') - completionValue = JSON.stringify(completionValue) - + const completionValue = batchCompletionMap[String(task.id)] ?? '' result[t('generation.completionResult', { ns: 'share' })] = completionValue return result }) diff --git a/web/app/components/share/text-generation/result/__tests__/index.spec.tsx b/web/app/components/share/text-generation/result/__tests__/index.spec.tsx new file mode 100644 index 0000000000..3a349cf1c0 --- /dev/null +++ b/web/app/components/share/text-generation/result/__tests__/index.spec.tsx @@ -0,0 +1,334 @@ +import type { PromptConfig } from '@/models/debug' +import type { SiteInfo } from '@/models/share' +import type { IOtherOptions } from '@/service/base' +import type { VisionSettings } from '@/types/app' +import { act, fireEvent, render, screen, waitFor } from '@testing-library/react' +import { AppSourceType } from '@/service/share' +import { Resolution, TransferMethod } from '@/types/app' +import Result from '../index' + +const { + notifyMock, + sendCompletionMessageMock, + sendWorkflowMessageMock, + stopChatMessageRespondingMock, + textGenerationResPropsSpy, +} = vi.hoisted(() => ({ + notifyMock: vi.fn(), + sendCompletionMessageMock: vi.fn(), + sendWorkflowMessageMock: vi.fn(), + stopChatMessageRespondingMock: vi.fn(), + textGenerationResPropsSpy: vi.fn(), +})) + +vi.mock('i18next', () => ({ + t: (key: string) => key, +})) + +vi.mock('@/app/components/base/toast', () => ({ + default: { + notify: notifyMock, + }, +})) + +vi.mock('@/utils', async () => { + const actual = await vi.importActual('@/utils') + return { + ...actual, + sleep: () => new Promise(() => {}), + } +}) + +vi.mock('@/service/share', async () => { + const actual = await vi.importActual('@/service/share') + return { + ...actual, + sendCompletionMessage: (...args: Parameters) => sendCompletionMessageMock(...args), + sendWorkflowMessage: (...args: Parameters) => sendWorkflowMessageMock(...args), + stopChatMessageResponding: (...args: Parameters) => stopChatMessageRespondingMock(...args), + } +}) + +vi.mock('@/app/components/app/text-generate/item', () => ({ + default: (props: Record) => { + textGenerationResPropsSpy(props) + return ( +
+ {typeof props.content === 'string' ? props.content : JSON.stringify(props.content ?? null)} +
+ ) + }, +})) + +vi.mock('@/app/components/share/text-generation/no-data', () => ({ + default: () =>
No data
, +})) + +const promptConfig: PromptConfig = { + prompt_template: 'template', + prompt_variables: [ + { key: 'name', name: 'Name', type: 'string', required: true }, + ], +} + +const siteInfo: SiteInfo = { + title: 'Share title', + description: 'Share description', + icon_type: 'emoji', + icon: 'robot', +} + +const visionConfig: VisionSettings = { + enabled: false, + number_limits: 2, + detail: Resolution.low, + transfer_methods: [TransferMethod.local_file], +} + +const baseProps = { + appId: 'app-1', + appSourceType: AppSourceType.webApp, + completionFiles: [], + controlRetry: 0, + controlSend: 0, + controlStopResponding: 0, + handleSaveMessage: vi.fn(), + inputs: { name: 'Alice' }, + isCallBatchAPI: false, + isError: false, + isMobile: false, + isPC: true, + isShowTextToSpeech: true, + isWorkflow: false, + moreLikeThisEnabled: true, + onCompleted: vi.fn(), + onRunControlChange: vi.fn(), + onRunStart: vi.fn(), + onShowRes: vi.fn(), + promptConfig, + siteInfo, + visionConfig, +} + +describe('Result', () => { + beforeEach(() => { + vi.clearAllMocks() + stopChatMessageRespondingMock.mockResolvedValue(undefined) + }) + + it('should render no data before the first execution', () => { + render() + + expect(screen.getByTestId('no-data')).toBeTruthy() + expect(screen.queryByTestId('text-generation-res')).toBeNull() + }) + + it('should stream completion results and stop the current task', async () => { + let completionHandlers: { + onCompleted: () => void + onData: (chunk: string, isFirstMessage: boolean, info: { messageId: string, taskId?: string }) => void + onError: () => void + onMessageReplace: (messageReplace: { answer: string }) => void + } | null = null + + sendCompletionMessageMock.mockImplementation(async (_data, handlers) => { + completionHandlers = handlers + }) + + const onCompleted = vi.fn() + const onRunControlChange = vi.fn() + const { rerender } = render( + , + ) + + rerender( + , + ) + + expect(sendCompletionMessageMock).toHaveBeenCalledTimes(1) + expect(screen.getByRole('status', { name: 'appApi.loading' })).toBeTruthy() + + await act(async () => { + completionHandlers?.onData('Hello', false, { + messageId: 'message-1', + taskId: 'task-1', + }) + }) + + expect(screen.getByTestId('text-generation-res').textContent).toContain('Hello') + + await waitFor(() => { + expect(onRunControlChange).toHaveBeenLastCalledWith(expect.objectContaining({ + isStopping: false, + })) + }) + + fireEvent.click(screen.getByRole('button', { name: 'operation.stopResponding' })) + await waitFor(() => { + expect(stopChatMessageRespondingMock).toHaveBeenCalledWith('app-1', 'task-1', AppSourceType.webApp, 'app-1') + }) + + await act(async () => { + completionHandlers?.onCompleted() + }) + + expect(onCompleted).toHaveBeenCalledWith('Hello', undefined, true) + expect(textGenerationResPropsSpy).toHaveBeenLastCalledWith(expect.objectContaining({ + messageId: 'message-1', + })) + }) + + it('should render workflow results after workflow completion', async () => { + let workflowHandlers: IOtherOptions | null = null + sendWorkflowMessageMock.mockImplementation(async (_data, handlers) => { + workflowHandlers = handlers + }) + + const onCompleted = vi.fn() + const { rerender } = render( + , + ) + + rerender( + , + ) + + await act(async () => { + workflowHandlers?.onWorkflowStarted?.({ + workflow_run_id: 'run-1', + task_id: 'task-1', + event: 'workflow_started', + data: { + id: 'run-1', + workflow_id: 'wf-1', + created_at: 0, + }, + }) + workflowHandlers?.onTextChunk?.({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'text_chunk', + data: { + text: 'Hello', + }, + }) + workflowHandlers?.onWorkflowFinished?.({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'workflow_finished', + data: { + id: 'run-1', + workflow_id: 'wf-1', + status: 'succeeded', + outputs: { + answer: 'Hello', + }, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(screen.getByTestId('text-generation-res').textContent).toContain('{"answer":"Hello"}') + expect(textGenerationResPropsSpy).toHaveBeenLastCalledWith(expect.objectContaining({ + workflowProcessData: expect.objectContaining({ + resultText: 'Hello', + status: 'succeeded', + }), + })) + expect(onCompleted).toHaveBeenCalledWith('{"answer":"Hello"}', undefined, true) + }) + + it('should render batch task ids for both short and long indexes', () => { + const { rerender } = render( + , + ) + + expect(textGenerationResPropsSpy).toHaveBeenLastCalledWith(expect.objectContaining({ + taskId: '03', + })) + + rerender( + , + ) + + expect(textGenerationResPropsSpy).toHaveBeenLastCalledWith(expect.objectContaining({ + taskId: '12', + })) + }) + + it('should render the mobile stop button layout while a batch run is responding', async () => { + let completionHandlers: { + onData: (chunk: string, isFirstMessage: boolean, info: { messageId: string, taskId?: string }) => void + } | null = null + + sendCompletionMessageMock.mockImplementation(async (_data, handlers) => { + completionHandlers = handlers + }) + + const { rerender } = render( + , + ) + + rerender( + , + ) + + await act(async () => { + completionHandlers?.onData('Hello', false, { + messageId: 'message-batch', + taskId: 'task-batch', + }) + }) + + expect(screen.getByRole('button', { name: 'operation.stopResponding' }).parentElement?.className).toContain('justify-center') + }) +}) diff --git a/web/app/components/share/text-generation/result/__tests__/result-request.spec.ts b/web/app/components/share/text-generation/result/__tests__/result-request.spec.ts new file mode 100644 index 0000000000..f2ff68abe8 --- /dev/null +++ b/web/app/components/share/text-generation/result/__tests__/result-request.spec.ts @@ -0,0 +1,293 @@ +import type { FileEntity } from '@/app/components/base/file-uploader/types' +import type { PromptConfig } from '@/models/debug' +import type { VisionFile, VisionSettings } from '@/types/app' +import { Resolution, TransferMethod } from '@/types/app' +import { buildResultRequestData, validateResultRequest } from '../result-request' + +const createTranslator = () => vi.fn((key: string) => key) + +const createFileEntity = (overrides: Partial = {}): FileEntity => ({ + id: 'file-1', + name: 'example.txt', + size: 128, + type: 'text/plain', + progress: 100, + transferMethod: TransferMethod.local_file, + supportFileType: 'document', + uploadedId: 'uploaded-1', + url: 'https://example.com/file.txt', + ...overrides, +}) + +const createVisionFile = (overrides: Partial = {}): VisionFile => ({ + type: 'image', + transfer_method: TransferMethod.local_file, + upload_file_id: 'upload-1', + url: 'https://example.com/image.png', + ...overrides, +}) + +const promptConfig: PromptConfig = { + prompt_template: 'template', + prompt_variables: [ + { key: 'name', name: 'Name', type: 'string', required: true }, + { key: 'enabled', name: 'Enabled', type: 'boolean', required: true }, + { key: 'file', name: 'File', type: 'file', required: false }, + { key: 'files', name: 'Files', type: 'file-list', required: false }, + ], +} + +const visionConfig: VisionSettings = { + enabled: true, + number_limits: 2, + detail: Resolution.low, + transfer_methods: [TransferMethod.local_file], +} + +describe('result-request', () => { + it('should reject missing required non-boolean inputs', () => { + const t = createTranslator() + + const result = validateResultRequest({ + completionFiles: [], + inputs: { + enabled: false, + }, + isCallBatchAPI: false, + promptConfig, + t, + }) + + expect(result).toEqual({ + canSend: false, + notification: { + type: 'error', + message: 'errorMessage.valueOfVarRequired', + }, + }) + }) + + it('should allow required number inputs with a value of zero', () => { + const result = validateResultRequest({ + completionFiles: [], + inputs: { + count: 0, + }, + isCallBatchAPI: false, + promptConfig: { + prompt_template: 'template', + prompt_variables: [ + { key: 'count', name: 'Count', type: 'number', required: true }, + ], + }, + t: createTranslator(), + }) + + expect(result).toEqual({ canSend: true }) + }) + + it('should reject required text inputs that only contain whitespace', () => { + const result = validateResultRequest({ + completionFiles: [], + inputs: { + name: ' ', + }, + isCallBatchAPI: false, + promptConfig: { + prompt_template: 'template', + prompt_variables: [ + { key: 'name', name: 'Name', type: 'string', required: true }, + ], + }, + t: createTranslator(), + }) + + expect(result).toEqual({ + canSend: false, + notification: { + type: 'error', + message: 'errorMessage.valueOfVarRequired', + }, + }) + }) + + it('should reject required file lists when no files are selected', () => { + const result = validateResultRequest({ + completionFiles: [], + inputs: { + files: [], + }, + isCallBatchAPI: false, + promptConfig: { + prompt_template: 'template', + prompt_variables: [ + { key: 'files', name: 'Files', type: 'file-list', required: true }, + ], + }, + t: createTranslator(), + }) + + expect(result).toEqual({ + canSend: false, + notification: { + type: 'error', + message: 'errorMessage.valueOfVarRequired', + }, + }) + }) + + it('should allow required file inputs when a file is selected', () => { + const result = validateResultRequest({ + completionFiles: [], + inputs: { + file: createFileEntity(), + }, + isCallBatchAPI: false, + promptConfig: { + prompt_template: 'template', + prompt_variables: [ + { key: 'file', name: 'File', type: 'file', required: true }, + ], + }, + t: createTranslator(), + }) + + expect(result).toEqual({ canSend: true }) + }) + + it('should reject pending local uploads outside batch mode', () => { + const t = createTranslator() + + const result = validateResultRequest({ + completionFiles: [ + createVisionFile({ upload_file_id: '' }), + ], + inputs: { + name: 'Alice', + }, + isCallBatchAPI: false, + promptConfig, + t, + }) + + expect(result).toEqual({ + canSend: false, + notification: { + type: 'info', + message: 'errorMessage.waitForFileUpload', + }, + }) + }) + + it('should handle missing prompt metadata with and without pending uploads', () => { + const t = createTranslator() + + const blocked = validateResultRequest({ + completionFiles: [ + createVisionFile({ upload_file_id: '' }), + ], + inputs: {}, + isCallBatchAPI: false, + promptConfig: null, + t, + }) + + const allowed = validateResultRequest({ + completionFiles: [], + inputs: {}, + isCallBatchAPI: false, + promptConfig: null, + t, + }) + + expect(blocked).toEqual({ + canSend: false, + notification: { + type: 'info', + message: 'errorMessage.waitForFileUpload', + }, + }) + expect(allowed).toEqual({ canSend: true }) + }) + + it('should skip validation in batch mode', () => { + const result = validateResultRequest({ + completionFiles: [ + createVisionFile({ upload_file_id: '' }), + ], + inputs: {}, + isCallBatchAPI: true, + promptConfig, + t: createTranslator(), + }) + + expect(result).toEqual({ canSend: true }) + }) + + it('should build request data for single and list file inputs', () => { + const file = createFileEntity() + const secondFile = createFileEntity({ + id: 'file-2', + name: 'second.txt', + uploadedId: 'uploaded-2', + url: 'https://example.com/second.txt', + }) + + const result = buildResultRequestData({ + completionFiles: [ + createVisionFile(), + createVisionFile({ + transfer_method: TransferMethod.remote_url, + upload_file_id: '', + url: 'https://example.com/remote.png', + }), + ], + inputs: { + enabled: true, + file, + files: [file, secondFile], + name: 'Alice', + }, + promptConfig, + visionConfig, + }) + + expect(result).toEqual({ + files: [ + expect.objectContaining({ + transfer_method: TransferMethod.local_file, + upload_file_id: 'upload-1', + url: '', + }), + expect.objectContaining({ + transfer_method: TransferMethod.remote_url, + url: 'https://example.com/remote.png', + }), + ], + inputs: { + enabled: true, + file: { + type: 'document', + transfer_method: TransferMethod.local_file, + upload_file_id: 'uploaded-1', + url: 'https://example.com/file.txt', + }, + files: [ + { + type: 'document', + transfer_method: TransferMethod.local_file, + upload_file_id: 'uploaded-1', + url: 'https://example.com/file.txt', + }, + { + type: 'document', + transfer_method: TransferMethod.local_file, + upload_file_id: 'uploaded-2', + url: 'https://example.com/second.txt', + }, + ], + name: 'Alice', + }, + }) + }) +}) diff --git a/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts b/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts new file mode 100644 index 0000000000..369f78eb76 --- /dev/null +++ b/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts @@ -0,0 +1,901 @@ +import type { WorkflowProcess } from '@/app/components/base/chat/types' +import type { IOtherOptions } from '@/service/base' +import type { HumanInputFormData, HumanInputFormTimeoutData, NodeTracing } from '@/types/workflow' +import { act } from '@testing-library/react' +import { BlockEnum, NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types' +import { + appendParallelNext, + appendParallelStart, + appendResultText, + applyWorkflowFinishedState, + applyWorkflowOutputs, + applyWorkflowPaused, + createWorkflowStreamHandlers, + finishParallelTrace, + finishWorkflowNode, + markNodesStopped, + replaceResultText, + updateHumanInputFilled, + updateHumanInputRequired, + updateHumanInputTimeout, + upsertWorkflowNode, +} from '../workflow-stream-handlers' + +const sseGetMock = vi.fn() + +type TraceOverrides = Omit, 'execution_metadata'> & { + execution_metadata?: Partial> +} + +vi.mock('@/service/base', async () => { + const actual = await vi.importActual('@/service/base') + return { + ...actual, + sseGet: (...args: Parameters) => sseGetMock(...args), + } +}) + +const createTrace = (overrides: TraceOverrides = {}): NodeTracing => { + const { execution_metadata, ...restOverrides } = overrides + + return { + id: 'trace-1', + index: 0, + predecessor_node_id: '', + node_id: 'node-1', + node_type: BlockEnum.LLM, + title: 'Node', + inputs: {}, + inputs_truncated: false, + process_data: {}, + process_data_truncated: false, + outputs: {}, + outputs_truncated: false, + status: NodeRunningStatus.Running, + elapsed_time: 0, + metadata: { + iterator_length: 0, + iterator_index: 0, + loop_length: 0, + loop_index: 0, + }, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + details: [[]], + execution_metadata: { + total_tokens: 0, + total_price: 0, + currency: 'USD', + ...execution_metadata, + }, + ...restOverrides, + } +} + +const createWorkflowProcess = (): WorkflowProcess => ({ + status: WorkflowRunningStatus.Running, + tracing: [], + expand: false, + resultText: '', +}) + +const createHumanInput = (overrides: Partial = {}): HumanInputFormData => ({ + form_id: 'form-1', + node_id: 'node-1', + node_title: 'Node', + form_content: 'content', + inputs: [], + actions: [], + form_token: 'token-1', + resolved_default_values: {}, + display_in_ui: true, + expiration_time: 100, + ...overrides, +}) + +describe('workflow-stream-handlers helpers', () => { + it('should update tracing, result text, and human input state', () => { + const parallelTrace = createTrace({ + node_id: 'parallel-node', + execution_metadata: { parallel_id: 'parallel-1' }, + details: [[]], + }) + + let workflowProcessData = appendParallelStart(undefined, parallelTrace) + workflowProcessData = appendParallelNext(workflowProcessData, parallelTrace) + workflowProcessData = finishParallelTrace(workflowProcessData, createTrace({ + node_id: 'parallel-node', + execution_metadata: { parallel_id: 'parallel-1' }, + error: 'failed', + })) + workflowProcessData = upsertWorkflowNode(workflowProcessData, createTrace({ + node_id: 'node-1', + execution_metadata: { parallel_id: 'parallel-2' }, + }))! + workflowProcessData = appendResultText(workflowProcessData, 'Hello ') + workflowProcessData = replaceResultText(workflowProcessData, 'Hello world') + workflowProcessData = updateHumanInputRequired(workflowProcessData, createHumanInput()) + workflowProcessData = updateHumanInputFilled(workflowProcessData, { + action_id: 'action-1', + action_text: 'Submit', + node_id: 'node-1', + node_title: 'Node', + rendered_content: 'Done', + }) + workflowProcessData = updateHumanInputTimeout(workflowProcessData, { + node_id: 'node-1', + node_title: 'Node', + expiration_time: 200, + } satisfies HumanInputFormTimeoutData) + workflowProcessData = applyWorkflowPaused(workflowProcessData) + + expect(workflowProcessData.expand).toBe(false) + expect(workflowProcessData.resultText).toBe('Hello world') + expect(workflowProcessData.humanInputFilledFormDataList).toEqual([ + expect.objectContaining({ + action_text: 'Submit', + }), + ]) + expect(workflowProcessData.tracing[0]).toEqual(expect.objectContaining({ + node_id: 'parallel-node', + expand: true, + })) + }) + + it('should initialize missing parallel details on start and next events', () => { + const parallelTrace = createTrace({ + node_id: 'parallel-node', + execution_metadata: { parallel_id: 'parallel-1' }, + }) + + const startedProcess = appendParallelStart(undefined, parallelTrace) + const nextProcess = appendParallelNext(startedProcess, parallelTrace) + + expect(startedProcess.tracing[0]?.details).toEqual([[]]) + expect(nextProcess.tracing[0]?.details).toEqual([[], []]) + }) + + it('should leave tracing unchanged when a parallel next event has no matching trace', () => { + const process = createWorkflowProcess() + process.tracing = [ + createTrace({ + node_id: 'parallel-node', + execution_metadata: { parallel_id: 'parallel-1' }, + details: [[]], + }), + ] + + const nextProcess = appendParallelNext(process, createTrace({ + node_id: 'missing-node', + execution_metadata: { parallel_id: 'parallel-2' }, + })) + + expect(nextProcess.tracing).toEqual(process.tracing) + expect(nextProcess.expand).toBe(true) + }) + + it('should mark running nodes as stopped recursively', () => { + const workflowProcessData = createWorkflowProcess() + workflowProcessData.tracing = [ + createTrace({ + status: NodeRunningStatus.Running, + details: [[createTrace({ status: NodeRunningStatus.Waiting })]], + }), + ] + + const stoppedWorkflow = applyWorkflowFinishedState(workflowProcessData, WorkflowRunningStatus.Stopped) + markNodesStopped(stoppedWorkflow.tracing) + + expect(stoppedWorkflow.status).toBe(WorkflowRunningStatus.Stopped) + expect(stoppedWorkflow.tracing[0].status).toBe(NodeRunningStatus.Stopped) + expect(stoppedWorkflow.tracing[0].details?.[0][0].status).toBe(NodeRunningStatus.Stopped) + }) + + it('should cover unmatched and replacement helper branches', () => { + const process = createWorkflowProcess() + process.tracing = [ + createTrace({ + node_id: 'node-1', + parallel_id: 'parallel-1', + extras: { + source: 'extra', + }, + status: NodeRunningStatus.Succeeded, + }), + ] + process.humanInputFormDataList = [ + createHumanInput({ node_id: 'node-1' }), + ] + process.humanInputFilledFormDataList = [ + { + action_id: 'action-0', + action_text: 'Existing', + node_id: 'node-0', + node_title: 'Node 0', + rendered_content: 'Existing', + }, + ] + + const parallelMatched = appendParallelNext(process, createTrace({ + node_id: 'node-1', + execution_metadata: { + parallel_id: 'parallel-1', + }, + })) + const notFinished = finishParallelTrace(process, createTrace({ + node_id: 'missing', + execution_metadata: { + parallel_id: 'parallel-missing', + }, + })) + const ignoredIteration = upsertWorkflowNode(process, createTrace({ + iteration_id: 'iteration-1', + })) + const replacedNode = upsertWorkflowNode(process, createTrace({ + node_id: 'node-1', + })) + const ignoredFinish = finishWorkflowNode(process, createTrace({ + loop_id: 'loop-1', + })) + const unmatchedFinish = finishWorkflowNode(process, createTrace({ + node_id: 'missing', + execution_metadata: { + parallel_id: 'missing', + }, + })) + const finishedWithExtras = finishWorkflowNode(process, createTrace({ + node_id: 'node-1', + execution_metadata: { + parallel_id: 'parallel-1', + }, + error: 'failed', + })) + const succeededWorkflow = applyWorkflowFinishedState(process, WorkflowRunningStatus.Succeeded) + const outputlessWorkflow = applyWorkflowOutputs(undefined, null) + const updatedHumanInput = updateHumanInputRequired(process, createHumanInput({ + node_id: 'node-1', + expiration_time: 300, + })) + const appendedHumanInput = updateHumanInputRequired(process, createHumanInput({ + node_id: 'node-2', + })) + const noListFilled = updateHumanInputFilled(undefined, { + action_id: 'action-1', + action_text: 'Submit', + node_id: 'node-1', + node_title: 'Node', + rendered_content: 'Done', + }) + const appendedFilled = updateHumanInputFilled(process, { + action_id: 'action-2', + action_text: 'Append', + node_id: 'node-2', + node_title: 'Node 2', + rendered_content: 'More', + }) + const timeoutWithoutList = updateHumanInputTimeout(undefined, { + node_id: 'node-1', + node_title: 'Node', + expiration_time: 200, + }) + const timeoutWithMatch = updateHumanInputTimeout(process, { + node_id: 'node-1', + node_title: 'Node', + expiration_time: 400, + }) + + markNodesStopped(undefined) + + expect(parallelMatched.tracing[0].details).toHaveLength(2) + expect(notFinished).toEqual(expect.objectContaining({ + expand: true, + tracing: process.tracing, + })) + expect(ignoredIteration).toEqual(process) + expect(replacedNode?.tracing[0]).toEqual(expect.objectContaining({ + node_id: 'node-1', + status: NodeRunningStatus.Running, + })) + expect(ignoredFinish).toEqual(process) + expect(unmatchedFinish).toEqual(process) + expect(finishedWithExtras?.tracing[0]).toEqual(expect.objectContaining({ + extras: { + source: 'extra', + }, + error: 'failed', + })) + expect(succeededWorkflow.status).toBe(WorkflowRunningStatus.Succeeded) + expect(outputlessWorkflow.files).toEqual([]) + expect(updatedHumanInput.humanInputFormDataList?.[0].expiration_time).toBe(300) + expect(appendedHumanInput.humanInputFormDataList).toHaveLength(2) + expect(noListFilled.humanInputFilledFormDataList).toHaveLength(1) + expect(appendedFilled.humanInputFilledFormDataList).toHaveLength(2) + expect(timeoutWithoutList).toEqual(expect.objectContaining({ + status: WorkflowRunningStatus.Running, + tracing: [], + })) + expect(timeoutWithMatch.humanInputFormDataList?.[0].expiration_time).toBe(400) + }) +}) + +describe('createWorkflowStreamHandlers', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + const setupHandlers = (overrides: { isTimedOut?: () => boolean } = {}) => { + let completionRes = '' + let currentTaskId: string | null = null + let isStopping = false + let messageId: string | null = null + let workflowProcessData: WorkflowProcess | undefined + + const setCurrentTaskId = vi.fn((value: string | null | ((prev: string | null) => string | null)) => { + currentTaskId = typeof value === 'function' ? value(currentTaskId) : value + }) + const setIsStopping = vi.fn((value: boolean | ((prev: boolean) => boolean)) => { + isStopping = typeof value === 'function' ? value(isStopping) : value + }) + const setMessageId = vi.fn((value: string | null | ((prev: string | null) => string | null)) => { + messageId = typeof value === 'function' ? value(messageId) : value + }) + const setWorkflowProcessData = vi.fn((value: WorkflowProcess | undefined) => { + workflowProcessData = value + }) + const setCompletionRes = vi.fn((value: string) => { + completionRes = value + }) + const notify = vi.fn() + const onCompleted = vi.fn() + const resetRunState = vi.fn() + const setRespondingFalse = vi.fn() + const markEnded = vi.fn() + + const handlers = createWorkflowStreamHandlers({ + getCompletionRes: () => completionRes, + getWorkflowProcessData: () => workflowProcessData, + isTimedOut: overrides.isTimedOut ?? (() => false), + markEnded, + notify, + onCompleted, + resetRunState, + setCompletionRes, + setCurrentTaskId, + setIsStopping, + setMessageId, + setRespondingFalse, + setWorkflowProcessData, + t: (key: string) => key, + taskId: 3, + }) + + return { + currentTaskId: () => currentTaskId, + handlers, + isStopping: () => isStopping, + messageId: () => messageId, + notify, + onCompleted, + resetRunState, + setCompletionRes, + setCurrentTaskId, + setMessageId, + setRespondingFalse, + workflowProcessData: () => workflowProcessData, + } + } + + it('should process workflow success and paused events', () => { + const setup = setupHandlers() + const handlers = setup.handlers as Required> + + act(() => { + handlers.onWorkflowStarted({ + workflow_run_id: 'run-1', + task_id: 'task-1', + event: 'workflow_started', + data: { id: 'run-1', workflow_id: 'wf-1', created_at: 0 }, + }) + handlers.onNodeStarted({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'node_started', + data: createTrace({ node_id: 'node-1' }), + }) + handlers.onNodeFinished({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'node_finished', + data: createTrace({ node_id: 'node-1', error: '' }), + }) + handlers.onIterationStart({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'iteration_start', + data: createTrace({ + node_id: 'iter-1', + execution_metadata: { parallel_id: 'parallel-1' }, + details: [[]], + }), + }) + handlers.onIterationNext({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'iteration_next', + data: createTrace({ + node_id: 'iter-1', + execution_metadata: { parallel_id: 'parallel-1' }, + details: [[]], + }), + }) + handlers.onIterationFinish({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'iteration_finish', + data: createTrace({ + node_id: 'iter-1', + execution_metadata: { parallel_id: 'parallel-1' }, + }), + }) + handlers.onLoopStart({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'loop_start', + data: createTrace({ + node_id: 'loop-1', + execution_metadata: { parallel_id: 'parallel-2' }, + details: [[]], + }), + }) + handlers.onLoopNext({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'loop_next', + data: createTrace({ + node_id: 'loop-1', + execution_metadata: { parallel_id: 'parallel-2' }, + details: [[]], + }), + }) + handlers.onLoopFinish({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'loop_finish', + data: createTrace({ + node_id: 'loop-1', + execution_metadata: { parallel_id: 'parallel-2' }, + }), + }) + handlers.onTextChunk({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'text_chunk', + data: { text: 'Hello' }, + }) + handlers.onHumanInputRequired({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'human_input_required', + data: createHumanInput({ node_id: 'node-1' }), + }) + handlers.onHumanInputFormFilled({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'human_input_form_filled', + data: { + node_id: 'node-1', + node_title: 'Node', + rendered_content: 'Done', + action_id: 'action-1', + action_text: 'Submit', + }, + }) + handlers.onHumanInputFormTimeout({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'human_input_form_timeout', + data: { + node_id: 'node-1', + node_title: 'Node', + expiration_time: 200, + }, + }) + handlers.onWorkflowPaused({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'workflow_paused', + data: { + outputs: {}, + paused_nodes: [], + reasons: [], + workflow_run_id: 'run-1', + }, + }) + handlers.onWorkflowFinished({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'workflow_finished', + data: { + id: 'run-1', + workflow_id: 'wf-1', + status: WorkflowRunningStatus.Succeeded, + outputs: { answer: 'Hello' }, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(setup.currentTaskId()).toBe('task-1') + expect(setup.isStopping()).toBe(false) + expect(setup.workflowProcessData()).toEqual(expect.objectContaining({ + resultText: 'Hello', + status: WorkflowRunningStatus.Succeeded, + })) + expect(sseGetMock).toHaveBeenCalledWith('/workflow/run-1/events', {}, expect.any(Object)) + expect(setup.messageId()).toBe('run-1') + expect(setup.onCompleted).toHaveBeenCalledWith('{"answer":"Hello"}', 3, true) + expect(setup.setRespondingFalse).toHaveBeenCalled() + expect(setup.resetRunState).toHaveBeenCalled() + }) + + it('should handle timeout and workflow failures', () => { + const timeoutSetup = setupHandlers({ + isTimedOut: () => true, + }) + const timeoutHandlers = timeoutSetup.handlers as Required> + + act(() => { + timeoutHandlers.onWorkflowFinished({ + task_id: 'task-1', + workflow_run_id: 'run-1', + event: 'workflow_finished', + data: { + id: 'run-1', + workflow_id: 'wf-1', + status: WorkflowRunningStatus.Succeeded, + outputs: null, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(timeoutSetup.notify).toHaveBeenCalledWith({ + type: 'warning', + message: 'warningMessage.timeoutExceeded', + }) + + const failureSetup = setupHandlers() + const failureHandlers = failureSetup.handlers as Required> + + act(() => { + failureHandlers.onWorkflowStarted({ + workflow_run_id: 'run-2', + task_id: 'task-2', + event: 'workflow_started', + data: { id: 'run-2', workflow_id: 'wf-2', created_at: 0 }, + }) + failureHandlers.onWorkflowFinished({ + task_id: 'task-2', + workflow_run_id: 'run-2', + event: 'workflow_finished', + data: { + id: 'run-2', + workflow_id: 'wf-2', + status: WorkflowRunningStatus.Failed, + outputs: null, + error: 'failed', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(failureSetup.notify).toHaveBeenCalledWith({ + type: 'error', + message: 'failed', + }) + expect(failureSetup.onCompleted).toHaveBeenCalledWith('', 3, false) + }) + + it('should cover existing workflow starts, stopped runs, and non-string outputs', () => { + const setup = setupHandlers() + let existingProcess: WorkflowProcess = { + status: WorkflowRunningStatus.Paused, + tracing: [ + createTrace({ + node_id: 'existing-node', + status: NodeRunningStatus.Waiting, + }), + ], + expand: false, + resultText: '', + } + + const handlers = createWorkflowStreamHandlers({ + getCompletionRes: () => '', + getWorkflowProcessData: () => existingProcess, + isTimedOut: () => false, + markEnded: vi.fn(), + notify: setup.notify, + onCompleted: setup.onCompleted, + resetRunState: setup.resetRunState, + setCompletionRes: setup.setCompletionRes, + setCurrentTaskId: setup.setCurrentTaskId, + setIsStopping: vi.fn(), + setMessageId: setup.setMessageId, + setRespondingFalse: setup.setRespondingFalse, + setWorkflowProcessData: (value) => { + existingProcess = value! + }, + t: (key: string) => key, + taskId: 5, + }) as Required> + + act(() => { + handlers.onWorkflowStarted({ + workflow_run_id: 'run-existing', + task_id: '', + event: 'workflow_started', + data: { id: 'run-existing', workflow_id: 'wf-1', created_at: 0 }, + }) + handlers.onTextReplace({ + task_id: 'task-existing', + workflow_run_id: 'run-existing', + event: 'text_replace', + data: { text: 'Replaced text' }, + }) + }) + + expect(existingProcess).toEqual(expect.objectContaining({ + expand: true, + status: WorkflowRunningStatus.Running, + resultText: 'Replaced text', + })) + + act(() => { + handlers.onWorkflowFinished({ + task_id: 'task-existing', + workflow_run_id: 'run-existing', + event: 'workflow_finished', + data: { + id: 'run-existing', + workflow_id: 'wf-1', + status: WorkflowRunningStatus.Stopped, + outputs: null, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(existingProcess.status).toBe(WorkflowRunningStatus.Stopped) + expect(existingProcess.tracing[0].status).toBe(NodeRunningStatus.Stopped) + expect(setup.onCompleted).toHaveBeenCalledWith('', 5, false) + + const noOutputSetup = setupHandlers() + const noOutputHandlers = noOutputSetup.handlers as Required> + + act(() => { + noOutputHandlers.onWorkflowStarted({ + workflow_run_id: 'run-no-output', + task_id: '', + event: 'workflow_started', + data: { id: 'run-no-output', workflow_id: 'wf-2', created_at: 0 }, + }) + noOutputHandlers.onTextReplace({ + task_id: 'task-no-output', + workflow_run_id: 'run-no-output', + event: 'text_replace', + data: { text: 'Draft' }, + }) + noOutputHandlers.onWorkflowFinished({ + task_id: 'task-no-output', + workflow_run_id: 'run-no-output', + event: 'workflow_finished', + data: { + id: 'run-no-output', + workflow_id: 'wf-2', + status: WorkflowRunningStatus.Succeeded, + outputs: null, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(noOutputSetup.setCompletionRes).toHaveBeenCalledWith('') + + const objectOutputSetup = setupHandlers() + const objectOutputHandlers = objectOutputSetup.handlers as Required> + + act(() => { + objectOutputHandlers.onWorkflowStarted({ + workflow_run_id: 'run-object', + task_id: undefined as unknown as string, + event: 'workflow_started', + data: { id: 'run-object', workflow_id: 'wf-3', created_at: 0 }, + }) + objectOutputHandlers.onWorkflowFinished({ + task_id: 'task-object', + workflow_run_id: 'run-object', + event: 'workflow_finished', + data: { + id: 'run-object', + workflow_id: 'wf-3', + status: WorkflowRunningStatus.Succeeded, + outputs: { + answer: 'Hello', + meta: { + mode: 'object', + }, + }, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(objectOutputSetup.currentTaskId()).toBeNull() + expect(objectOutputSetup.setCompletionRes).toHaveBeenCalledWith('{"answer":"Hello","meta":{"mode":"object"}}') + expect(objectOutputSetup.workflowProcessData()).toEqual(expect.objectContaining({ + status: WorkflowRunningStatus.Succeeded, + resultText: '', + })) + }) + + it('should serialize empty, string, and circular workflow outputs', () => { + const noOutputSetup = setupHandlers() + const noOutputHandlers = noOutputSetup.handlers as Required> + + act(() => { + noOutputHandlers.onWorkflowFinished({ + task_id: 'task-empty', + workflow_run_id: 'run-empty', + event: 'workflow_finished', + data: { + id: 'run-empty', + workflow_id: 'wf-empty', + status: WorkflowRunningStatus.Succeeded, + outputs: null, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(noOutputSetup.setCompletionRes).toHaveBeenCalledWith('') + + const stringOutputSetup = setupHandlers() + const stringOutputHandlers = stringOutputSetup.handlers as Required> + + act(() => { + stringOutputHandlers.onWorkflowFinished({ + task_id: 'task-string', + workflow_run_id: 'run-string', + event: 'workflow_finished', + data: { + id: 'run-string', + workflow_id: 'wf-string', + status: WorkflowRunningStatus.Succeeded, + outputs: 'plain text output', + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(stringOutputSetup.setCompletionRes).toHaveBeenCalledWith('plain text output') + + const circularOutputSetup = setupHandlers() + const circularOutputHandlers = circularOutputSetup.handlers as Required> + const circularOutputs: Record = { + answer: 'Hello', + } + circularOutputs.self = circularOutputs + + act(() => { + circularOutputHandlers.onWorkflowFinished({ + task_id: 'task-circular', + workflow_run_id: 'run-circular', + event: 'workflow_finished', + data: { + id: 'run-circular', + workflow_id: 'wf-circular', + status: WorkflowRunningStatus.Succeeded, + outputs: circularOutputs, + error: '', + elapsed_time: 0, + total_tokens: 0, + total_steps: 0, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + }, + }) + }) + + expect(circularOutputSetup.setCompletionRes).toHaveBeenCalledWith('[object Object]') + }) +}) diff --git a/web/app/components/share/text-generation/result/hooks/__tests__/use-result-run-state.spec.ts b/web/app/components/share/text-generation/result/hooks/__tests__/use-result-run-state.spec.ts new file mode 100644 index 0000000000..66c99c0317 --- /dev/null +++ b/web/app/components/share/text-generation/result/hooks/__tests__/use-result-run-state.spec.ts @@ -0,0 +1,200 @@ +import type { FeedbackType } from '@/app/components/base/chat/chat/type' +import { act, renderHook, waitFor } from '@testing-library/react' +import { AppSourceType } from '@/service/share' +import { useResultRunState } from '../use-result-run-state' + +const { + stopChatMessageRespondingMock, + stopWorkflowMessageMock, + updateFeedbackMock, +} = vi.hoisted(() => ({ + stopChatMessageRespondingMock: vi.fn(), + stopWorkflowMessageMock: vi.fn(), + updateFeedbackMock: vi.fn(), +})) + +vi.mock('@/service/share', async () => { + const actual = await vi.importActual('@/service/share') + return { + ...actual, + stopChatMessageResponding: (...args: Parameters) => stopChatMessageRespondingMock(...args), + stopWorkflowMessage: (...args: Parameters) => stopWorkflowMessageMock(...args), + updateFeedback: (...args: Parameters) => updateFeedbackMock(...args), + } +}) + +describe('useResultRunState', () => { + beforeEach(() => { + vi.clearAllMocks() + stopChatMessageRespondingMock.mockResolvedValue(undefined) + stopWorkflowMessageMock.mockResolvedValue(undefined) + updateFeedbackMock.mockResolvedValue(undefined) + }) + + it('should expose run control and stop completion requests', async () => { + const notify = vi.fn() + const onRunControlChange = vi.fn() + const { result } = renderHook(() => useResultRunState({ + appId: 'app-1', + appSourceType: AppSourceType.webApp, + controlStopResponding: 0, + isWorkflow: false, + notify, + onRunControlChange, + })) + + const abort = vi.fn() + + act(() => { + result.current.abortControllerRef.current = { abort } as unknown as AbortController + result.current.setCurrentTaskId('task-1') + result.current.setRespondingTrue() + }) + + await waitFor(() => { + expect(onRunControlChange).toHaveBeenLastCalledWith(expect.objectContaining({ + isStopping: false, + })) + }) + + await act(async () => { + await result.current.handleStop() + }) + + expect(stopChatMessageRespondingMock).toHaveBeenCalledWith('app-1', 'task-1', AppSourceType.webApp, 'app-1') + expect(abort).toHaveBeenCalledTimes(1) + }) + + it('should update feedback and react to external stop control', async () => { + const notify = vi.fn() + const onRunControlChange = vi.fn() + const { result, rerender } = renderHook(({ controlStopResponding }) => useResultRunState({ + appId: 'app-2', + appSourceType: AppSourceType.installedApp, + controlStopResponding, + isWorkflow: true, + notify, + onRunControlChange, + }), { + initialProps: { controlStopResponding: 0 }, + }) + + const abort = vi.fn() + act(() => { + result.current.abortControllerRef.current = { abort } as unknown as AbortController + result.current.setMessageId('message-1') + }) + + await act(async () => { + await result.current.handleFeedback({ + rating: 'like', + } satisfies FeedbackType) + }) + + expect(updateFeedbackMock).toHaveBeenCalledWith({ + url: '/messages/message-1/feedbacks', + body: { + rating: 'like', + content: undefined, + }, + }, AppSourceType.installedApp, 'app-2') + expect(result.current.feedback).toEqual({ + rating: 'like', + }) + + act(() => { + result.current.setCurrentTaskId('task-2') + result.current.setRespondingTrue() + }) + + rerender({ controlStopResponding: 1 }) + + await waitFor(() => { + expect(abort).toHaveBeenCalled() + expect(result.current.currentTaskId).toBeNull() + expect(onRunControlChange).toHaveBeenLastCalledWith(null) + }) + }) + + it('should stop workflow requests through the workflow stop API', async () => { + const notify = vi.fn() + const { result } = renderHook(() => useResultRunState({ + appId: 'app-3', + appSourceType: AppSourceType.installedApp, + controlStopResponding: 0, + isWorkflow: true, + notify, + })) + + act(() => { + result.current.setCurrentTaskId('task-3') + }) + + await act(async () => { + await result.current.handleStop() + }) + + expect(stopWorkflowMessageMock).toHaveBeenCalledWith('app-3', 'task-3', AppSourceType.installedApp, 'app-3') + }) + + it('should ignore invalid stops and report non-Error failures', async () => { + const notify = vi.fn() + stopChatMessageRespondingMock.mockRejectedValueOnce('stop failed') + + const { result } = renderHook(() => useResultRunState({ + appSourceType: AppSourceType.webApp, + controlStopResponding: 0, + isWorkflow: false, + notify, + })) + + await act(async () => { + await result.current.handleStop() + }) + + expect(stopChatMessageRespondingMock).not.toHaveBeenCalled() + + act(() => { + result.current.setCurrentTaskId('task-4') + result.current.setIsStopping(prev => !prev) + result.current.setIsStopping(prev => !prev) + }) + + await act(async () => { + await result.current.handleStop() + }) + + expect(stopChatMessageRespondingMock).toHaveBeenCalledWith(undefined, 'task-4', AppSourceType.webApp, '') + expect(notify).toHaveBeenCalledWith({ + type: 'error', + message: 'stop failed', + }) + expect(result.current.isStopping).toBe(false) + }) + + it('should report Error instances from workflow stop failures without an app id fallback', async () => { + const notify = vi.fn() + stopWorkflowMessageMock.mockRejectedValueOnce(new Error('workflow stop failed')) + + const { result } = renderHook(() => useResultRunState({ + appSourceType: AppSourceType.installedApp, + controlStopResponding: 0, + isWorkflow: true, + notify, + })) + + act(() => { + result.current.setCurrentTaskId('task-5') + }) + + await act(async () => { + await result.current.handleStop() + }) + + expect(stopWorkflowMessageMock).toHaveBeenCalledWith(undefined, 'task-5', AppSourceType.installedApp, '') + expect(notify).toHaveBeenCalledWith({ + type: 'error', + message: 'workflow stop failed', + }) + }) +}) diff --git a/web/app/components/share/text-generation/result/hooks/__tests__/use-result-sender.spec.ts b/web/app/components/share/text-generation/result/hooks/__tests__/use-result-sender.spec.ts new file mode 100644 index 0000000000..58b47789c1 --- /dev/null +++ b/web/app/components/share/text-generation/result/hooks/__tests__/use-result-sender.spec.ts @@ -0,0 +1,510 @@ +import type { ResultInputValue } from '../../result-request' +import type { ResultRunStateController } from '../use-result-run-state' +import type { PromptConfig } from '@/models/debug' +import type { AppSourceType } from '@/service/share' +import type { VisionSettings } from '@/types/app' +import { act, renderHook, waitFor } from '@testing-library/react' +import { AppSourceType as AppSourceTypeEnum } from '@/service/share' +import { Resolution, TransferMethod } from '@/types/app' +import { useResultSender } from '../use-result-sender' + +const { + buildResultRequestDataMock, + createWorkflowStreamHandlersMock, + sendCompletionMessageMock, + sendWorkflowMessageMock, + sleepMock, + validateResultRequestMock, +} = vi.hoisted(() => ({ + buildResultRequestDataMock: vi.fn(), + createWorkflowStreamHandlersMock: vi.fn(), + sendCompletionMessageMock: vi.fn(), + sendWorkflowMessageMock: vi.fn(), + sleepMock: vi.fn(), + validateResultRequestMock: vi.fn(), +})) + +vi.mock('@/service/share', async () => { + const actual = await vi.importActual('@/service/share') + return { + ...actual, + sendCompletionMessage: (...args: Parameters) => sendCompletionMessageMock(...args), + sendWorkflowMessage: (...args: Parameters) => sendWorkflowMessageMock(...args), + } +}) + +vi.mock('@/utils', async () => { + const actual = await vi.importActual('@/utils') + return { + ...actual, + sleep: (...args: Parameters) => sleepMock(...args), + } +}) + +vi.mock('../../result-request', () => ({ + buildResultRequestData: (...args: unknown[]) => buildResultRequestDataMock(...args), + validateResultRequest: (...args: unknown[]) => validateResultRequestMock(...args), +})) + +vi.mock('../../workflow-stream-handlers', () => ({ + createWorkflowStreamHandlers: (...args: unknown[]) => createWorkflowStreamHandlersMock(...args), +})) + +type RunStateHarness = { + state: { + completionRes: string + currentTaskId: string | null + messageId: string | null + workflowProcessData: ResultRunStateController['workflowProcessData'] + } + runState: ResultRunStateController +} + +type CompletionHandlers = { + getAbortController: (abortController: AbortController) => void + onCompleted: () => void + onData: (chunk: string, isFirstMessage: boolean, info: { messageId: string, taskId?: string }) => void + onError: () => void + onMessageReplace: (messageReplace: { answer: string }) => void +} + +const createRunStateHarness = (): RunStateHarness => { + const state: RunStateHarness['state'] = { + completionRes: '', + currentTaskId: null, + messageId: null, + workflowProcessData: undefined, + } + + const runState: ResultRunStateController = { + abortControllerRef: { current: null }, + clearMoreLikeThis: vi.fn(), + completionRes: '', + controlClearMoreLikeThis: 0, + currentTaskId: null, + feedback: { rating: null }, + getCompletionRes: vi.fn(() => state.completionRes), + getWorkflowProcessData: vi.fn(() => state.workflowProcessData), + handleFeedback: vi.fn(), + handleStop: vi.fn(), + isResponding: false, + isStopping: false, + messageId: null, + prepareForNewRun: vi.fn(() => { + state.completionRes = '' + state.currentTaskId = null + state.messageId = null + state.workflowProcessData = undefined + runState.completionRes = '' + runState.currentTaskId = null + runState.messageId = null + runState.workflowProcessData = undefined + }), + resetRunState: vi.fn(() => { + state.currentTaskId = null + runState.currentTaskId = null + runState.isStopping = false + }), + setCompletionRes: vi.fn((value: string) => { + state.completionRes = value + runState.completionRes = value + }), + setCurrentTaskId: vi.fn((value) => { + state.currentTaskId = typeof value === 'function' ? value(state.currentTaskId) : value + runState.currentTaskId = state.currentTaskId + }), + setIsStopping: vi.fn((value) => { + runState.isStopping = typeof value === 'function' ? value(runState.isStopping) : value + }), + setMessageId: vi.fn((value) => { + state.messageId = typeof value === 'function' ? value(state.messageId) : value + runState.messageId = state.messageId + }), + setRespondingFalse: vi.fn(() => { + runState.isResponding = false + }), + setRespondingTrue: vi.fn(() => { + runState.isResponding = true + }), + setWorkflowProcessData: vi.fn((value) => { + state.workflowProcessData = value + runState.workflowProcessData = value + }), + workflowProcessData: undefined, + } + + return { + state, + runState, + } +} + +const promptConfig: PromptConfig = { + prompt_template: 'template', + prompt_variables: [ + { key: 'name', name: 'Name', type: 'string', required: true }, + ], +} + +const visionConfig: VisionSettings = { + enabled: false, + number_limits: 2, + detail: Resolution.low, + transfer_methods: [TransferMethod.local_file], +} + +type RenderSenderOptions = { + appSourceType?: AppSourceType + controlRetry?: number + controlSend?: number + inputs?: Record + isPC?: boolean + isWorkflow?: boolean + runState?: ResultRunStateController + taskId?: number +} + +const renderSender = ({ + appSourceType = AppSourceTypeEnum.webApp, + controlRetry = 0, + controlSend = 0, + inputs = { name: 'Alice' }, + isPC = true, + isWorkflow = false, + runState, + taskId, +}: RenderSenderOptions = {}) => { + const notify = vi.fn() + const onCompleted = vi.fn() + const onRunStart = vi.fn() + const onShowRes = vi.fn() + + const hook = renderHook((props: { controlRetry: number, controlSend: number }) => useResultSender({ + appId: 'app-1', + appSourceType, + completionFiles: [], + controlRetry: props.controlRetry, + controlSend: props.controlSend, + inputs, + isCallBatchAPI: false, + isPC, + isWorkflow, + notify, + onCompleted, + onRunStart, + onShowRes, + promptConfig, + runState: runState || createRunStateHarness().runState, + t: (key: string) => key, + taskId, + visionConfig, + }), { + initialProps: { + controlRetry, + controlSend, + }, + }) + + return { + ...hook, + notify, + onCompleted, + onRunStart, + onShowRes, + } +} + +describe('useResultSender', () => { + beforeEach(() => { + vi.clearAllMocks() + validateResultRequestMock.mockReturnValue({ canSend: true }) + buildResultRequestDataMock.mockReturnValue({ inputs: { name: 'Alice' } }) + createWorkflowStreamHandlersMock.mockReturnValue({ onWorkflowFinished: vi.fn() }) + sendCompletionMessageMock.mockResolvedValue(undefined) + sendWorkflowMessageMock.mockResolvedValue(undefined) + sleepMock.mockImplementation(() => new Promise(() => {})) + }) + + it('should reject sends while a response is already in progress', async () => { + const { runState } = createRunStateHarness() + runState.isResponding = true + const { result, notify } = renderSender({ runState }) + + await act(async () => { + expect(await result.current.handleSend()).toBe(false) + }) + + expect(notify).toHaveBeenCalledWith({ + type: 'info', + message: 'errorMessage.waitForResponse', + }) + expect(validateResultRequestMock).not.toHaveBeenCalled() + expect(sendCompletionMessageMock).not.toHaveBeenCalled() + }) + + it('should surface validation failures without building request payloads', async () => { + const { runState } = createRunStateHarness() + validateResultRequestMock.mockReturnValue({ + canSend: false, + notification: { + type: 'error', + message: 'invalid', + }, + }) + + const { result, notify } = renderSender({ runState }) + + await act(async () => { + expect(await result.current.handleSend()).toBe(false) + }) + + expect(notify).toHaveBeenCalledWith({ + type: 'error', + message: 'invalid', + }) + expect(buildResultRequestDataMock).not.toHaveBeenCalled() + expect(sendCompletionMessageMock).not.toHaveBeenCalled() + }) + + it('should send completion requests when controlSend changes and process callbacks', async () => { + const harness = createRunStateHarness() + let completionHandlers: CompletionHandlers | undefined + + sendCompletionMessageMock.mockImplementation(async (_data, handlers) => { + completionHandlers = handlers as CompletionHandlers + }) + + const { rerender, onCompleted, onRunStart, onShowRes } = renderSender({ + controlSend: 0, + isPC: false, + runState: harness.runState, + taskId: 7, + }) + + rerender({ + controlRetry: 0, + controlSend: 1, + }) + + expect(validateResultRequestMock).toHaveBeenCalledWith(expect.objectContaining({ + inputs: { name: 'Alice' }, + isCallBatchAPI: false, + })) + expect(buildResultRequestDataMock).toHaveBeenCalled() + expect(harness.runState.prepareForNewRun).toHaveBeenCalledTimes(1) + expect(harness.runState.setRespondingTrue).toHaveBeenCalledTimes(1) + expect(harness.runState.clearMoreLikeThis).toHaveBeenCalledTimes(1) + expect(onShowRes).toHaveBeenCalledTimes(1) + expect(onRunStart).toHaveBeenCalledTimes(1) + expect(sendCompletionMessageMock).toHaveBeenCalledWith( + { inputs: { name: 'Alice' } }, + expect.objectContaining({ + onCompleted: expect.any(Function), + onData: expect.any(Function), + }), + AppSourceTypeEnum.webApp, + 'app-1', + ) + + const abortController = {} as AbortController + expect(completionHandlers).toBeDefined() + completionHandlers!.getAbortController(abortController) + expect(harness.runState.abortControllerRef.current).toBe(abortController) + + await act(async () => { + completionHandlers!.onData('Hello', false, { + messageId: 'message-1', + taskId: 'task-1', + }) + }) + + expect(harness.runState.setCurrentTaskId).toHaveBeenCalled() + expect(harness.runState.currentTaskId).toBe('task-1') + + await act(async () => { + completionHandlers!.onMessageReplace({ answer: 'Replaced' }) + completionHandlers!.onCompleted() + }) + + expect(harness.runState.setCompletionRes).toHaveBeenLastCalledWith('Replaced') + expect(harness.runState.setRespondingFalse).toHaveBeenCalled() + expect(harness.runState.resetRunState).toHaveBeenCalled() + expect(harness.runState.setMessageId).toHaveBeenCalledWith('message-1') + expect(onCompleted).toHaveBeenCalledWith('Replaced', 7, true) + }) + + it('should trigger workflow sends on retry and report workflow request failures', async () => { + const harness = createRunStateHarness() + sendWorkflowMessageMock.mockRejectedValue(new Error('workflow failed')) + + const { rerender, notify } = renderSender({ + controlRetry: 0, + isWorkflow: true, + runState: harness.runState, + }) + + rerender({ + controlRetry: 2, + controlSend: 0, + }) + + await waitFor(() => { + expect(createWorkflowStreamHandlersMock).toHaveBeenCalledWith(expect.objectContaining({ + getCompletionRes: harness.runState.getCompletionRes, + resetRunState: harness.runState.resetRunState, + setWorkflowProcessData: harness.runState.setWorkflowProcessData, + })) + expect(sendWorkflowMessageMock).toHaveBeenCalledWith( + { inputs: { name: 'Alice' } }, + expect.any(Object), + AppSourceTypeEnum.webApp, + 'app-1', + ) + }) + + await waitFor(() => { + expect(harness.runState.setRespondingFalse).toHaveBeenCalled() + expect(harness.runState.resetRunState).toHaveBeenCalled() + expect(notify).toHaveBeenCalledWith({ + type: 'error', + message: 'workflow failed', + }) + }) + expect(harness.runState.clearMoreLikeThis).not.toHaveBeenCalled() + }) + + it('should stringify non-Error workflow failures', async () => { + const harness = createRunStateHarness() + sendWorkflowMessageMock.mockRejectedValue('workflow failed') + + const { result, notify } = renderSender({ + isWorkflow: true, + runState: harness.runState, + }) + + await act(async () => { + await result.current.handleSend() + }) + + await waitFor(() => { + expect(notify).toHaveBeenCalledWith({ + type: 'error', + message: 'workflow failed', + }) + }) + }) + + it('should timeout unfinished completion requests', async () => { + const harness = createRunStateHarness() + sleepMock.mockResolvedValue(undefined) + + const { result, onCompleted } = renderSender({ + runState: harness.runState, + taskId: 9, + }) + + await act(async () => { + expect(await result.current.handleSend()).toBe(true) + }) + + await waitFor(() => { + expect(harness.runState.setRespondingFalse).toHaveBeenCalled() + expect(harness.runState.resetRunState).toHaveBeenCalled() + expect(onCompleted).toHaveBeenCalledWith('', 9, false) + }) + }) + + it('should ignore empty task ids and surface timeout warnings from stream callbacks', async () => { + const harness = createRunStateHarness() + let completionHandlers: CompletionHandlers | undefined + + sleepMock.mockResolvedValue(undefined) + sendCompletionMessageMock.mockImplementation(async (_data, handlers) => { + completionHandlers = handlers as CompletionHandlers + }) + + const { result, notify, onCompleted } = renderSender({ + runState: harness.runState, + taskId: 11, + }) + + await act(async () => { + await result.current.handleSend() + }) + + await act(async () => { + completionHandlers!.onData('Hello', false, { + messageId: 'message-2', + taskId: ' ', + }) + completionHandlers!.onCompleted() + completionHandlers!.onError() + }) + + expect(harness.runState.currentTaskId).toBeNull() + expect(notify).toHaveBeenNthCalledWith(1, { + type: 'warning', + message: 'warningMessage.timeoutExceeded', + }) + expect(notify).toHaveBeenNthCalledWith(2, { + type: 'warning', + message: 'warningMessage.timeoutExceeded', + }) + expect(onCompleted).toHaveBeenCalledWith('', 11, false) + }) + + it('should avoid timeout fallback after a completion response has already ended', async () => { + const harness = createRunStateHarness() + let resolveSleep!: () => void + let completionHandlers: CompletionHandlers | undefined + + sleepMock.mockImplementation(() => new Promise((resolve) => { + resolveSleep = resolve + })) + sendCompletionMessageMock.mockImplementation(async (_data, handlers) => { + completionHandlers = handlers as CompletionHandlers + }) + + const { result, onCompleted } = renderSender({ + runState: harness.runState, + taskId: 12, + }) + + await act(async () => { + await result.current.handleSend() + }) + + await act(async () => { + harness.runState.setCompletionRes('Done') + completionHandlers!.onCompleted() + resolveSleep() + await Promise.resolve() + }) + + expect(onCompleted).toHaveBeenCalledWith('Done', 12, true) + expect(onCompleted).toHaveBeenCalledTimes(1) + }) + + it('should handle non-timeout stream errors as failed completions', async () => { + const harness = createRunStateHarness() + let completionHandlers: CompletionHandlers | undefined + + sendCompletionMessageMock.mockImplementation(async (_data, handlers) => { + completionHandlers = handlers as CompletionHandlers + }) + + const { result, onCompleted } = renderSender({ + runState: harness.runState, + taskId: 13, + }) + + await act(async () => { + await result.current.handleSend() + completionHandlers!.onError() + }) + + expect(harness.runState.setRespondingFalse).toHaveBeenCalled() + expect(harness.runState.resetRunState).toHaveBeenCalled() + expect(onCompleted).toHaveBeenCalledWith('', 13, false) + }) +}) diff --git a/web/app/components/share/text-generation/result/hooks/use-result-run-state.ts b/web/app/components/share/text-generation/result/hooks/use-result-run-state.ts new file mode 100644 index 0000000000..d2f276e848 --- /dev/null +++ b/web/app/components/share/text-generation/result/hooks/use-result-run-state.ts @@ -0,0 +1,237 @@ +import type { Dispatch, MutableRefObject, SetStateAction } from 'react' +import type { FeedbackType } from '@/app/components/base/chat/chat/type' +import type { WorkflowProcess } from '@/app/components/base/chat/types' +import type { AppSourceType } from '@/service/share' +import { useBoolean } from 'ahooks' +import { useCallback, useEffect, useReducer, useRef, useState } from 'react' +import { + stopChatMessageResponding, + stopWorkflowMessage, + updateFeedback, +} from '@/service/share' + +type Notify = (payload: { type: 'error', message: string }) => void + +type RunControlState = { + currentTaskId: string | null + isStopping: boolean +} + +type RunControlAction + = | { type: 'reset' } + | { type: 'setCurrentTaskId', value: SetStateAction } + | { type: 'setIsStopping', value: SetStateAction } + +type UseResultRunStateOptions = { + appId?: string + appSourceType: AppSourceType + controlStopResponding?: number + isWorkflow: boolean + notify: Notify + onRunControlChange?: (control: { onStop: () => Promise | void, isStopping: boolean } | null) => void +} + +export type ResultRunStateController = { + abortControllerRef: MutableRefObject + clearMoreLikeThis: () => void + completionRes: string + controlClearMoreLikeThis: number + currentTaskId: string | null + feedback: FeedbackType + getCompletionRes: () => string + getWorkflowProcessData: () => WorkflowProcess | undefined + handleFeedback: (feedback: FeedbackType) => Promise + handleStop: () => Promise + isResponding: boolean + isStopping: boolean + messageId: string | null + prepareForNewRun: () => void + resetRunState: () => void + setCompletionRes: (res: string) => void + setCurrentTaskId: Dispatch> + setIsStopping: Dispatch> + setMessageId: Dispatch> + setRespondingFalse: () => void + setRespondingTrue: () => void + setWorkflowProcessData: (data: WorkflowProcess | undefined) => void + workflowProcessData: WorkflowProcess | undefined +} + +const runControlReducer = (state: RunControlState, action: RunControlAction): RunControlState => { + switch (action.type) { + case 'reset': + return { + currentTaskId: null, + isStopping: false, + } + case 'setCurrentTaskId': + return { + ...state, + currentTaskId: typeof action.value === 'function' ? action.value(state.currentTaskId) : action.value, + } + case 'setIsStopping': + return { + ...state, + isStopping: typeof action.value === 'function' ? action.value(state.isStopping) : action.value, + } + } +} + +export const useResultRunState = ({ + appId, + appSourceType, + controlStopResponding, + isWorkflow, + notify, + onRunControlChange, +}: UseResultRunStateOptions): ResultRunStateController => { + const [isResponding, { setTrue: setRespondingTrue, setFalse: setRespondingFalse }] = useBoolean(false) + const [completionResState, setCompletionResState] = useState('') + const completionResRef = useRef('') + const [workflowProcessDataState, setWorkflowProcessDataState] = useState() + const workflowProcessDataRef = useRef(undefined) + const [messageId, setMessageId] = useState(null) + const [feedback, setFeedback] = useState({ + rating: null, + }) + const [controlClearMoreLikeThis, setControlClearMoreLikeThis] = useState(0) + const abortControllerRef = useRef(null) + const [{ currentTaskId, isStopping }, dispatchRunControl] = useReducer(runControlReducer, { + currentTaskId: null, + isStopping: false, + }) + + const setCurrentTaskId = useCallback>>((value) => { + dispatchRunControl({ + type: 'setCurrentTaskId', + value, + }) + }, []) + + const setIsStopping = useCallback>>((value) => { + dispatchRunControl({ + type: 'setIsStopping', + value, + }) + }, []) + + const setCompletionRes = useCallback((res: string) => { + completionResRef.current = res + setCompletionResState(res) + }, []) + + const getCompletionRes = useCallback(() => completionResRef.current, []) + + const setWorkflowProcessData = useCallback((data: WorkflowProcess | undefined) => { + workflowProcessDataRef.current = data + setWorkflowProcessDataState(data) + }, []) + + const getWorkflowProcessData = useCallback(() => workflowProcessDataRef.current, []) + + const resetRunState = useCallback(() => { + dispatchRunControl({ type: 'reset' }) + abortControllerRef.current = null + onRunControlChange?.(null) + }, [onRunControlChange]) + + const prepareForNewRun = useCallback(() => { + setMessageId(null) + setFeedback({ rating: null }) + setCompletionRes('') + setWorkflowProcessData(undefined) + resetRunState() + }, [resetRunState, setCompletionRes, setWorkflowProcessData]) + + const handleFeedback = useCallback(async (nextFeedback: FeedbackType) => { + await updateFeedback({ + url: `/messages/${messageId}/feedbacks`, + body: { + rating: nextFeedback.rating, + content: nextFeedback.content, + }, + }, appSourceType, appId) + setFeedback(nextFeedback) + }, [appId, appSourceType, messageId]) + + const handleStop = useCallback(async () => { + if (!currentTaskId || isStopping) + return + + setIsStopping(true) + try { + if (isWorkflow) + await stopWorkflowMessage(appId!, currentTaskId, appSourceType, appId || '') + else + await stopChatMessageResponding(appId!, currentTaskId, appSourceType, appId || '') + + abortControllerRef.current?.abort() + } + catch (error) { + const message = error instanceof Error ? error.message : String(error) + notify({ type: 'error', message }) + } + finally { + setIsStopping(false) + } + }, [appId, appSourceType, currentTaskId, isStopping, isWorkflow, notify, setIsStopping]) + + const clearMoreLikeThis = useCallback(() => { + setControlClearMoreLikeThis(Date.now()) + }, []) + + useEffect(() => { + const abortCurrentRequest = () => { + abortControllerRef.current?.abort() + } + + if (controlStopResponding) { + abortCurrentRequest() + setRespondingFalse() + resetRunState() + } + + return abortCurrentRequest + }, [controlStopResponding, resetRunState, setRespondingFalse]) + + useEffect(() => { + if (!onRunControlChange) + return + + if (isResponding && currentTaskId) { + onRunControlChange({ + onStop: handleStop, + isStopping, + }) + return + } + + onRunControlChange(null) + }, [currentTaskId, handleStop, isResponding, isStopping, onRunControlChange]) + + return { + abortControllerRef, + clearMoreLikeThis, + completionRes: completionResState, + controlClearMoreLikeThis, + currentTaskId, + feedback, + getCompletionRes, + getWorkflowProcessData, + handleFeedback, + handleStop, + isResponding, + isStopping, + messageId, + prepareForNewRun, + resetRunState, + setCompletionRes, + setCurrentTaskId, + setIsStopping, + setMessageId, + setRespondingFalse, + setRespondingTrue, + setWorkflowProcessData, + workflowProcessData: workflowProcessDataState, + } +} diff --git a/web/app/components/share/text-generation/result/hooks/use-result-sender.ts b/web/app/components/share/text-generation/result/hooks/use-result-sender.ts new file mode 100644 index 0000000000..3bae2b02f8 --- /dev/null +++ b/web/app/components/share/text-generation/result/hooks/use-result-sender.ts @@ -0,0 +1,230 @@ +import type { ResultInputValue } from '../result-request' +import type { ResultRunStateController } from './use-result-run-state' +import type { PromptConfig } from '@/models/debug' +import type { AppSourceType } from '@/service/share' +import type { VisionFile, VisionSettings } from '@/types/app' +import { useCallback, useEffect, useRef } from 'react' +import { TEXT_GENERATION_TIMEOUT_MS } from '@/config' +import { + sendCompletionMessage, + sendWorkflowMessage, +} from '@/service/share' +import { sleep } from '@/utils' +import { buildResultRequestData, validateResultRequest } from '../result-request' +import { createWorkflowStreamHandlers } from '../workflow-stream-handlers' + +type Notify = (payload: { type: 'error' | 'info' | 'warning', message: string }) => void +type Translate = (key: string, options?: Record) => string + +type UseResultSenderOptions = { + appId?: string + appSourceType: AppSourceType + completionFiles: VisionFile[] + controlRetry?: number + controlSend?: number + inputs: Record + isCallBatchAPI: boolean + isPC: boolean + isWorkflow: boolean + notify: Notify + onCompleted: (completionRes: string, taskId?: number, success?: boolean) => void + onRunStart: () => void + onShowRes: () => void + promptConfig: PromptConfig | null + runState: ResultRunStateController + t: Translate + taskId?: number + visionConfig: VisionSettings +} + +const logRequestError = (notify: Notify, error: unknown) => { + const message = error instanceof Error ? error.message : String(error) + notify({ type: 'error', message }) +} + +export const useResultSender = ({ + appId, + appSourceType, + completionFiles, + controlRetry, + controlSend, + inputs, + isCallBatchAPI, + isPC, + isWorkflow, + notify, + onCompleted, + onRunStart, + onShowRes, + promptConfig, + runState, + t, + taskId, + visionConfig, +}: UseResultSenderOptions) => { + const { clearMoreLikeThis } = runState + + const handleSend = useCallback(async () => { + if (runState.isResponding) { + notify({ type: 'info', message: t('errorMessage.waitForResponse', { ns: 'appDebug' }) }) + return false + } + + const validation = validateResultRequest({ + completionFiles, + inputs, + isCallBatchAPI, + promptConfig, + t, + }) + if (!validation.canSend) { + notify(validation.notification!) + return false + } + + const data = buildResultRequestData({ + completionFiles, + inputs, + promptConfig, + visionConfig, + }) + + runState.prepareForNewRun() + + if (!isPC) { + onShowRes() + onRunStart() + } + + runState.setRespondingTrue() + + let isEnd = false + let isTimeout = false + let completionChunks: string[] = [] + let tempMessageId = '' + + void (async () => { + await sleep(TEXT_GENERATION_TIMEOUT_MS) + if (!isEnd) { + runState.setRespondingFalse() + onCompleted(runState.getCompletionRes(), taskId, false) + runState.resetRunState() + isTimeout = true + } + })() + + if (isWorkflow) { + const otherOptions = createWorkflowStreamHandlers({ + getCompletionRes: runState.getCompletionRes, + getWorkflowProcessData: runState.getWorkflowProcessData, + isTimedOut: () => isTimeout, + markEnded: () => { + isEnd = true + }, + notify, + onCompleted, + resetRunState: runState.resetRunState, + setCompletionRes: runState.setCompletionRes, + setCurrentTaskId: runState.setCurrentTaskId, + setIsStopping: runState.setIsStopping, + setMessageId: runState.setMessageId, + setRespondingFalse: runState.setRespondingFalse, + setWorkflowProcessData: runState.setWorkflowProcessData, + t, + taskId, + }) + + void sendWorkflowMessage(data, otherOptions, appSourceType, appId).catch((error) => { + runState.setRespondingFalse() + runState.resetRunState() + logRequestError(notify, error) + }) + return true + } + + void sendCompletionMessage(data, { + onData: (chunk, _isFirstMessage, { messageId, taskId: nextTaskId }) => { + tempMessageId = messageId + if (nextTaskId && nextTaskId.trim() !== '') + runState.setCurrentTaskId(prev => prev ?? nextTaskId) + + completionChunks.push(chunk) + runState.setCompletionRes(completionChunks.join('')) + }, + onCompleted: () => { + if (isTimeout) { + notify({ type: 'warning', message: t('warningMessage.timeoutExceeded', { ns: 'appDebug' }) }) + return + } + + runState.setRespondingFalse() + runState.resetRunState() + runState.setMessageId(tempMessageId) + onCompleted(runState.getCompletionRes(), taskId, true) + isEnd = true + }, + onMessageReplace: (messageReplace) => { + completionChunks = [messageReplace.answer] + runState.setCompletionRes(completionChunks.join('')) + }, + onError: () => { + if (isTimeout) { + notify({ type: 'warning', message: t('warningMessage.timeoutExceeded', { ns: 'appDebug' }) }) + return + } + + runState.setRespondingFalse() + runState.resetRunState() + onCompleted(runState.getCompletionRes(), taskId, false) + isEnd = true + }, + getAbortController: (abortController) => { + runState.abortControllerRef.current = abortController + }, + }, appSourceType, appId) + + return true + }, [ + appId, + appSourceType, + completionFiles, + inputs, + isCallBatchAPI, + isPC, + isWorkflow, + notify, + onCompleted, + onRunStart, + onShowRes, + promptConfig, + runState, + t, + taskId, + visionConfig, + ]) + + const handleSendRef = useRef(handleSend) + + useEffect(() => { + handleSendRef.current = handleSend + }, [handleSend]) + + useEffect(() => { + if (!controlSend) + return + + void handleSendRef.current() + clearMoreLikeThis() + }, [clearMoreLikeThis, controlSend]) + + useEffect(() => { + if (!controlRetry) + return + + void handleSendRef.current() + }, [controlRetry]) + + return { + handleSend, + } +} diff --git a/web/app/components/share/text-generation/result/index.tsx b/web/app/components/share/text-generation/result/index.tsx index 2bcd1c9d94..e0e366c52b 100644 --- a/web/app/components/share/text-generation/result/index.tsx +++ b/web/app/components/share/text-generation/result/index.tsx @@ -1,46 +1,18 @@ 'use client' import type { FC } from 'react' -import type { FeedbackType } from '@/app/components/base/chat/chat/type' -import type { WorkflowProcess } from '@/app/components/base/chat/types' -import type { FileEntity } from '@/app/components/base/file-uploader/types' import type { PromptConfig } from '@/models/debug' import type { SiteInfo } from '@/models/share' -import type { - IOtherOptions, -} from '@/service/base' +import type { AppSourceType } from '@/service/share' import type { VisionFile, VisionSettings } from '@/types/app' -import { RiLoader2Line } from '@remixicon/react' -import { useBoolean } from 'ahooks' import { t } from 'i18next' -import { produce } from 'immer' import * as React from 'react' -import { useCallback, useEffect, useRef, useState } from 'react' import TextGenerationRes from '@/app/components/app/text-generate/item' import Button from '@/app/components/base/button' -import { - getFilesInLogs, - getProcessedFiles, -} from '@/app/components/base/file-uploader/utils' -import { StopCircle } from '@/app/components/base/icons/src/vender/solid/mediaAndDevices' import Loading from '@/app/components/base/loading' import Toast from '@/app/components/base/toast' import NoData from '@/app/components/share/text-generation/no-data' -import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types' -import { TEXT_GENERATION_TIMEOUT_MS } from '@/config' -import { - sseGet, -} from '@/service/base' -import { - AppSourceType, - sendCompletionMessage, - sendWorkflowMessage, - stopChatMessageResponding, - stopWorkflowMessage, - updateFeedback, -} from '@/service/share' -import { TransferMethod } from '@/types/app' -import { sleep } from '@/utils' -import { formatBooleanInputs } from '@/utils/model-config' +import { useResultRunState } from './hooks/use-result-run-state' +import { useResultSender } from './hooks/use-result-sender' export type IResultProps = { isWorkflow: boolean @@ -95,554 +67,52 @@ const Result: FC = ({ onRunControlChange, hideInlineStopButton = false, }) => { - const [isResponding, { setTrue: setRespondingTrue, setFalse: setRespondingFalse }] = useBoolean(false) - const [completionRes, doSetCompletionRes] = useState('') - const completionResRef = useRef('') - const setCompletionRes = (res: string) => { - completionResRef.current = res - doSetCompletionRes(res) - } - const getCompletionRes = () => completionResRef.current - const [workflowProcessData, doSetWorkflowProcessData] = useState() - const workflowProcessDataRef = useRef(undefined) - const setWorkflowProcessData = useCallback((data: WorkflowProcess | undefined) => { - workflowProcessDataRef.current = data - doSetWorkflowProcessData(data) - }, []) - const getWorkflowProcessData = () => workflowProcessDataRef.current - const [currentTaskId, setCurrentTaskId] = useState(null) - const [isStopping, setIsStopping] = useState(false) - const abortControllerRef = useRef(null) - const resetRunState = useCallback(() => { - setCurrentTaskId(null) - setIsStopping(false) - abortControllerRef.current = null - onRunControlChange?.(null) - }, [onRunControlChange]) - - useEffect(() => { - const abortCurrentRequest = () => { - abortControllerRef.current?.abort() - } - - if (controlStopResponding) { - abortCurrentRequest() - setRespondingFalse() - resetRunState() - } - - return abortCurrentRequest - }, [controlStopResponding, resetRunState, setRespondingFalse]) - const { notify } = Toast - const isNoData = !completionRes - - const [messageId, setMessageId] = useState(null) - const [feedback, setFeedback] = useState({ - rating: null, + const runState = useResultRunState({ + appId, + appSourceType, + controlStopResponding, + isWorkflow, + notify, + onRunControlChange, }) - const handleFeedback = async (feedback: FeedbackType) => { - await updateFeedback({ url: `/messages/${messageId}/feedbacks`, body: { rating: feedback.rating, content: feedback.content } }, appSourceType, appId) - setFeedback(feedback) - } + const { handleSend } = useResultSender({ + appId, + appSourceType, + completionFiles, + controlRetry, + controlSend, + inputs, + isCallBatchAPI, + isPC, + isWorkflow, + notify, + onCompleted, + onRunStart, + onShowRes, + promptConfig, + runState, + t, + taskId, + visionConfig, + }) - const logError = (message: string) => { - notify({ type: 'error', message }) - } - - const handleStop = useCallback(async () => { - if (!currentTaskId || isStopping) - return - setIsStopping(true) - try { - if (isWorkflow) - await stopWorkflowMessage(appId!, currentTaskId, appSourceType, appId || '') - else - await stopChatMessageResponding(appId!, currentTaskId, appSourceType, appId || '') - abortControllerRef.current?.abort() - } - catch (error) { - const message = error instanceof Error ? error.message : String(error) - notify({ type: 'error', message }) - } - finally { - setIsStopping(false) - } - }, [appId, currentTaskId, appSourceType, isStopping, isWorkflow, notify]) - - useEffect(() => { - if (!onRunControlChange) - return - if (isResponding && currentTaskId) { - onRunControlChange({ - onStop: handleStop, - isStopping, - }) - } - else { - onRunControlChange(null) - } - }, [currentTaskId, handleStop, isResponding, isStopping, onRunControlChange]) - - const checkCanSend = () => { - // batch will check outer - if (isCallBatchAPI) - return true - - const prompt_variables = promptConfig?.prompt_variables - if (!prompt_variables || prompt_variables?.length === 0) { - if (completionFiles.some(item => item.transfer_method === TransferMethod.local_file && !item.upload_file_id)) { - notify({ type: 'info', message: t('errorMessage.waitForFileUpload', { ns: 'appDebug' }) }) - return false - } - return true - } - - let hasEmptyInput = '' - const requiredVars = prompt_variables?.filter(({ key, name, required, type }) => { - if (type === 'boolean' || type === 'checkbox') - return false // boolean/checkbox input is not required - const res = (!key || !key.trim()) || (!name || !name.trim()) || (required || required === undefined || required === null) - return res - }) || [] // compatible with old version - requiredVars.forEach(({ key, name }) => { - if (hasEmptyInput) - return - - if (!inputs[key]) - hasEmptyInput = name - }) - - if (hasEmptyInput) { - logError(t('errorMessage.valueOfVarRequired', { ns: 'appDebug', key: hasEmptyInput })) - return false - } - - if (completionFiles.some(item => item.transfer_method === TransferMethod.local_file && !item.upload_file_id)) { - notify({ type: 'info', message: t('errorMessage.waitForFileUpload', { ns: 'appDebug' }) }) - return false - } - return !hasEmptyInput - } - - const handleSend = async () => { - if (isResponding) { - notify({ type: 'info', message: t('errorMessage.waitForResponse', { ns: 'appDebug' }) }) - return false - } - - if (!checkCanSend()) - return - - // Process inputs: convert file entities to API format - const processedInputs = { ...formatBooleanInputs(promptConfig?.prompt_variables, inputs) } - promptConfig?.prompt_variables.forEach((variable) => { - const value = processedInputs[variable.key] - if (variable.type === 'file' && value && typeof value === 'object' && !Array.isArray(value)) { - // Convert single file entity to API format - processedInputs[variable.key] = getProcessedFiles([value as FileEntity])[0] - } - else if (variable.type === 'file-list' && Array.isArray(value) && value.length > 0) { - // Convert file entity array to API format - processedInputs[variable.key] = getProcessedFiles(value as FileEntity[]) - } - }) - - const data: Record = { - inputs: processedInputs, - } - if (visionConfig.enabled && completionFiles && completionFiles?.length > 0) { - data.files = completionFiles.map((item) => { - if (item.transfer_method === TransferMethod.local_file) { - return { - ...item, - url: '', - } - } - return item - }) - } - - setMessageId(null) - setFeedback({ - rating: null, - }) - setCompletionRes('') - setWorkflowProcessData(undefined) - resetRunState() - - let res: string[] = [] - let tempMessageId = '' - - if (!isPC) { - onShowRes() - onRunStart() - } - - setRespondingTrue() - let isEnd = false - let isTimeout = false; - (async () => { - await sleep(TEXT_GENERATION_TIMEOUT_MS) - if (!isEnd) { - setRespondingFalse() - onCompleted(getCompletionRes(), taskId, false) - resetRunState() - isTimeout = true - } - })() - - if (isWorkflow) { - const otherOptions: IOtherOptions = { - isPublicAPI: appSourceType === AppSourceType.webApp, - onWorkflowStarted: ({ workflow_run_id, task_id }) => { - const workflowProcessData = getWorkflowProcessData() - if (workflowProcessData && workflowProcessData.tracing.length > 0) { - setWorkflowProcessData(produce(workflowProcessData, (draft) => { - draft.expand = true - draft.status = WorkflowRunningStatus.Running - })) - } - else { - tempMessageId = workflow_run_id - setCurrentTaskId(task_id || null) - setIsStopping(false) - setWorkflowProcessData({ - status: WorkflowRunningStatus.Running, - tracing: [], - expand: false, - resultText: '', - }) - } - }, - onIterationStart: ({ data }) => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = true - draft.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - expand: true, - }) - })) - }, - onIterationNext: () => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = true - const iterations = draft.tracing.find(item => item.node_id === data.node_id - && (item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || item.parallel_id === data.execution_metadata?.parallel_id))! - iterations?.details!.push([]) - })) - }, - onIterationFinish: ({ data }) => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = true - const iterationsIndex = draft.tracing.findIndex(item => item.node_id === data.node_id - && (item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || item.parallel_id === data.execution_metadata?.parallel_id))! - draft.tracing[iterationsIndex] = { - ...data, - expand: !!data.error, - } - })) - }, - onLoopStart: ({ data }) => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = true - draft.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - expand: true, - }) - })) - }, - onLoopNext: () => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = true - const loops = draft.tracing.find(item => item.node_id === data.node_id - && (item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || item.parallel_id === data.execution_metadata?.parallel_id))! - loops?.details!.push([]) - })) - }, - onLoopFinish: ({ data }) => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = true - const loopsIndex = draft.tracing.findIndex(item => item.node_id === data.node_id - && (item.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || item.parallel_id === data.execution_metadata?.parallel_id))! - draft.tracing[loopsIndex] = { - ...data, - expand: !!data.error, - } - })) - }, - onNodeStarted: ({ data }) => { - if (data.iteration_id) - return - - if (data.loop_id) - return - const workflowProcessData = getWorkflowProcessData() - setWorkflowProcessData(produce(workflowProcessData!, (draft) => { - if (draft.tracing.length > 0) { - const currentIndex = draft.tracing.findIndex(item => item.node_id === data.node_id) - if (currentIndex > -1) { - draft.expand = true - draft.tracing![currentIndex] = { - ...data, - status: NodeRunningStatus.Running, - expand: true, - } - } - else { - draft.expand = true - draft.tracing.push({ - ...data, - status: NodeRunningStatus.Running, - expand: true, - }) - } - } - else { - draft.expand = true - draft.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - expand: true, - }) - } - })) - }, - onNodeFinished: ({ data }) => { - if (data.iteration_id) - return - - if (data.loop_id) - return - - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - const currentIndex = draft.tracing!.findIndex(trace => trace.node_id === data.node_id - && (trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || trace.parallel_id === data.execution_metadata?.parallel_id)) - if (currentIndex > -1 && draft.tracing) { - draft.tracing[currentIndex] = { - ...(draft.tracing[currentIndex].extras - ? { extras: draft.tracing[currentIndex].extras } - : {}), - ...data, - expand: !!data.error, - } - } - })) - }, - onWorkflowFinished: ({ data }) => { - if (isTimeout) { - notify({ type: 'warning', message: t('warningMessage.timeoutExceeded', { ns: 'appDebug' }) }) - return - } - const workflowStatus = data.status as WorkflowRunningStatus | undefined - const markNodesStopped = (traces?: WorkflowProcess['tracing']) => { - if (!traces) - return - const markTrace = (trace: WorkflowProcess['tracing'][number]) => { - if ([NodeRunningStatus.Running, NodeRunningStatus.Waiting].includes(trace.status as NodeRunningStatus)) - trace.status = NodeRunningStatus.Stopped - trace.details?.forEach(detailGroup => detailGroup.forEach(markTrace)) - trace.retryDetail?.forEach(markTrace) - trace.parallelDetail?.children?.forEach(markTrace) - } - traces.forEach(markTrace) - } - if (workflowStatus === WorkflowRunningStatus.Stopped) { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.status = WorkflowRunningStatus.Stopped - markNodesStopped(draft.tracing) - })) - setRespondingFalse() - resetRunState() - onCompleted(getCompletionRes(), taskId, false) - isEnd = true - return - } - if (data.error) { - notify({ type: 'error', message: data.error }) - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.status = WorkflowRunningStatus.Failed - markNodesStopped(draft.tracing) - })) - setRespondingFalse() - resetRunState() - onCompleted(getCompletionRes(), taskId, false) - isEnd = true - return - } - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.status = WorkflowRunningStatus.Succeeded - draft.files = getFilesInLogs(data.outputs || []) as any[] - })) - if (!data.outputs) { - setCompletionRes('') - } - else { - setCompletionRes(data.outputs) - const isStringOutput = Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string' - if (isStringOutput) { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.resultText = data.outputs[Object.keys(data.outputs)[0]] - })) - } - } - setRespondingFalse() - resetRunState() - setMessageId(tempMessageId) - onCompleted(getCompletionRes(), taskId, true) - isEnd = true - }, - onTextChunk: (params) => { - const { data: { text } } = params - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.resultText += text - })) - }, - onTextReplace: (params) => { - const { data: { text } } = params - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.resultText = text - })) - }, - onHumanInputRequired: ({ data: humanInputRequiredData }) => { - const workflowProcessData = getWorkflowProcessData() - setWorkflowProcessData(produce(workflowProcessData!, (draft) => { - if (!draft.humanInputFormDataList) { - draft.humanInputFormDataList = [humanInputRequiredData] - } - else { - const currentFormIndex = draft.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id) - if (currentFormIndex > -1) { - draft.humanInputFormDataList[currentFormIndex] = humanInputRequiredData - } - else { - draft.humanInputFormDataList.push(humanInputRequiredData) - } - } - const currentIndex = draft.tracing!.findIndex(item => item.node_id === humanInputRequiredData.node_id) - if (currentIndex > -1) { - draft.tracing![currentIndex].status = NodeRunningStatus.Paused - } - })) - }, - onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - if (draft.humanInputFormDataList?.length) { - const currentFormIndex = draft.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id) - draft.humanInputFormDataList.splice(currentFormIndex, 1) - } - if (!draft.humanInputFilledFormDataList) { - draft.humanInputFilledFormDataList = [humanInputFilledFormData] - } - else { - draft.humanInputFilledFormDataList.push(humanInputFilledFormData) - } - })) - }, - onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => { - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - if (draft.humanInputFormDataList?.length) { - const currentFormIndex = draft.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id) - draft.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time - } - })) - }, - onWorkflowPaused: ({ data: workflowPausedData }) => { - tempMessageId = workflowPausedData.workflow_run_id - const url = `/workflow/${workflowPausedData.workflow_run_id}/events` - sseGet( - url, - {}, - otherOptions, - ) - setWorkflowProcessData(produce(getWorkflowProcessData()!, (draft) => { - draft.expand = false - draft.status = WorkflowRunningStatus.Paused - })) - }, - } - sendWorkflowMessage( - data, - otherOptions, - appSourceType, - appId, - ).catch((error) => { - setRespondingFalse() - resetRunState() - const message = error instanceof Error ? error.message : String(error) - notify({ type: 'error', message }) - }) - } - else { - sendCompletionMessage(data, { - onData: (data: string, _isFirstMessage: boolean, { messageId, taskId }) => { - tempMessageId = messageId - if (taskId && typeof taskId === 'string' && taskId.trim() !== '') - setCurrentTaskId(prev => prev ?? taskId) - res.push(data) - setCompletionRes(res.join('')) - }, - onCompleted: () => { - if (isTimeout) { - notify({ type: 'warning', message: t('warningMessage.timeoutExceeded', { ns: 'appDebug' }) }) - return - } - setRespondingFalse() - resetRunState() - setMessageId(tempMessageId) - onCompleted(getCompletionRes(), taskId, true) - isEnd = true - }, - onMessageReplace: (messageReplace) => { - res = [messageReplace.answer] - setCompletionRes(res.join('')) - }, - onError() { - if (isTimeout) { - notify({ type: 'warning', message: t('warningMessage.timeoutExceeded', { ns: 'appDebug' }) }) - return - } - setRespondingFalse() - resetRunState() - onCompleted(getCompletionRes(), taskId, false) - isEnd = true - }, - getAbortController: (abortController) => { - abortControllerRef.current = abortController - }, - }, appSourceType, appId) - } - } - - const [controlClearMoreLikeThis, setControlClearMoreLikeThis] = useState(0) - useEffect(() => { - if (controlSend) { - handleSend() - setControlClearMoreLikeThis(Date.now()) - } - }, [controlSend]) - - useEffect(() => { - if (controlRetry) - handleSend() - }, [controlRetry]) + const isNoData = !runState.completionRes const renderTextGenerationRes = () => ( <> - {!hideInlineStopButton && isResponding && currentTaskId && ( + {!hideInlineStopButton && runState.isResponding && runState.currentTaskId && (
@@ -650,15 +120,15 @@ const Result: FC = ({ )} = ({ // isLoading={isCallBatchAPI ? (!completionRes && isResponding) : false} isLoading={false} taskId={isCallBatchAPI ? ((taskId as number) < 10 ? `0${taskId}` : `${taskId}`) : undefined} - controlClearMoreLikeThis={controlClearMoreLikeThis} + controlClearMoreLikeThis={runState.controlClearMoreLikeThis} isShowTextToSpeech={isShowTextToSpeech} hideProcessDetail siteInfo={siteInfo} @@ -677,7 +147,7 @@ const Result: FC = ({ return ( <> {!isCallBatchAPI && !isWorkflow && ( - (isResponding && !completionRes) + (runState.isResponding && !runState.completionRes) ? (
@@ -692,13 +162,13 @@ const Result: FC = ({ ) )} {!isCallBatchAPI && isWorkflow && ( - (isResponding && !workflowProcessData) + (runState.isResponding && !runState.workflowProcessData) ? (
) - : !workflowProcessData + : !runState.workflowProcessData ? : renderTextGenerationRes() )} diff --git a/web/app/components/share/text-generation/result/result-request.ts b/web/app/components/share/text-generation/result/result-request.ts new file mode 100644 index 0000000000..95b2353dff --- /dev/null +++ b/web/app/components/share/text-generation/result/result-request.ts @@ -0,0 +1,156 @@ +import type { FileEntity } from '@/app/components/base/file-uploader/types' +import type { PromptConfig } from '@/models/debug' +import type { VisionFile, VisionSettings } from '@/types/app' +import { getProcessedFiles } from '@/app/components/base/file-uploader/utils' +import { TransferMethod } from '@/types/app' +import { formatBooleanInputs } from '@/utils/model-config' + +export type ResultInputValue + = | string + | boolean + | number + | string[] + | Record + | FileEntity + | FileEntity[] + | undefined + +type Translate = (key: string, options?: Record) => string + +type ValidationResult = { + canSend: boolean + notification?: { + type: 'error' | 'info' + message: string + } +} + +type ValidateResultRequestParams = { + completionFiles: VisionFile[] + inputs: Record + isCallBatchAPI: boolean + promptConfig: PromptConfig | null + t: Translate +} + +type BuildResultRequestDataParams = { + completionFiles: VisionFile[] + inputs: Record + promptConfig: PromptConfig | null + visionConfig: VisionSettings +} + +const isMissingRequiredInput = ( + variable: PromptConfig['prompt_variables'][number], + value: ResultInputValue, +) => { + if (value === undefined || value === null) + return true + + if (variable.type === 'file-list') + return !Array.isArray(value) || value.length === 0 + + if (['string', 'paragraph', 'number', 'json_object', 'select'].includes(variable.type)) + return typeof value !== 'string' ? false : value.trim() === '' + + return false +} + +const hasPendingLocalFiles = (completionFiles: VisionFile[]) => { + return completionFiles.some(item => item.transfer_method === TransferMethod.local_file && !item.upload_file_id) +} + +export const validateResultRequest = ({ + completionFiles, + inputs, + isCallBatchAPI, + promptConfig, + t, +}: ValidateResultRequestParams): ValidationResult => { + if (isCallBatchAPI) + return { canSend: true } + + const promptVariables = promptConfig?.prompt_variables + if (!promptVariables?.length) { + if (hasPendingLocalFiles(completionFiles)) { + return { + canSend: false, + notification: { + type: 'info', + message: t('errorMessage.waitForFileUpload', { ns: 'appDebug' }), + }, + } + } + + return { canSend: true } + } + + const requiredVariables = promptVariables.filter(({ key, name, required, type }) => { + if (type === 'boolean' || type === 'checkbox') + return false + + return (!key || !key.trim()) || (!name || !name.trim()) || required === undefined || required === null || required + }) + + const missingRequiredVariable = requiredVariables.find(variable => isMissingRequiredInput(variable, inputs[variable.key]))?.name + if (missingRequiredVariable) { + return { + canSend: false, + notification: { + type: 'error', + message: t('errorMessage.valueOfVarRequired', { + ns: 'appDebug', + key: missingRequiredVariable, + }), + }, + } + } + + if (hasPendingLocalFiles(completionFiles)) { + return { + canSend: false, + notification: { + type: 'info', + message: t('errorMessage.waitForFileUpload', { ns: 'appDebug' }), + }, + } + } + + return { canSend: true } +} + +export const buildResultRequestData = ({ + completionFiles, + inputs, + promptConfig, + visionConfig, +}: BuildResultRequestDataParams) => { + const processedInputs = { + ...formatBooleanInputs(promptConfig?.prompt_variables, inputs as Record), + } + + promptConfig?.prompt_variables.forEach((variable) => { + const value = processedInputs[variable.key] + if (variable.type === 'file' && value && typeof value === 'object' && !Array.isArray(value)) { + processedInputs[variable.key] = getProcessedFiles([value as FileEntity])[0] + return + } + + if (variable.type === 'file-list' && Array.isArray(value) && value.length > 0) + processedInputs[variable.key] = getProcessedFiles(value as FileEntity[]) + }) + + return { + inputs: processedInputs, + ...(visionConfig.enabled && completionFiles.length > 0 + ? { + files: completionFiles.map((item) => { + if (item.transfer_method === TransferMethod.local_file) + return { ...item, url: '' } + + return item + }), + } + : {}), + } +} diff --git a/web/app/components/share/text-generation/result/workflow-stream-handlers.ts b/web/app/components/share/text-generation/result/workflow-stream-handlers.ts new file mode 100644 index 0000000000..843bac9e2c --- /dev/null +++ b/web/app/components/share/text-generation/result/workflow-stream-handlers.ts @@ -0,0 +1,404 @@ +import type { Dispatch, SetStateAction } from 'react' +import type { WorkflowProcess } from '@/app/components/base/chat/types' +import type { IOtherOptions } from '@/service/base' +import type { HumanInputFormTimeoutData, NodeTracing, WorkflowFinishedResponse } from '@/types/workflow' +import { produce } from 'immer' +import { getFilesInLogs } from '@/app/components/base/file-uploader/utils' +import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types' +import { sseGet } from '@/service/base' + +type Notify = (payload: { type: 'error' | 'warning', message: string }) => void +type Translate = (key: string, options?: Record) => string + +type CreateWorkflowStreamHandlersParams = { + getCompletionRes: () => string + getWorkflowProcessData: () => WorkflowProcess | undefined + isTimedOut: () => boolean + markEnded: () => void + notify: Notify + onCompleted: (completionRes: string, taskId?: number, success?: boolean) => void + resetRunState: () => void + setCompletionRes: (res: string) => void + setCurrentTaskId: Dispatch> + setIsStopping: Dispatch> + setMessageId: Dispatch> + setRespondingFalse: () => void + setWorkflowProcessData: (data: WorkflowProcess | undefined) => void + t: Translate + taskId?: number +} + +const createInitialWorkflowProcess = (): WorkflowProcess => ({ + status: WorkflowRunningStatus.Running, + tracing: [], + expand: false, + resultText: '', +}) + +const updateWorkflowProcess = ( + current: WorkflowProcess | undefined, + updater: (draft: WorkflowProcess) => void, +) => { + return produce(current ?? createInitialWorkflowProcess(), updater) +} + +const matchParallelTrace = (trace: WorkflowProcess['tracing'][number], data: NodeTracing) => { + return trace.node_id === data.node_id + && (trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id + || trace.parallel_id === data.execution_metadata?.parallel_id) +} + +const ensureParallelTraceDetails = (details?: NodeTracing['details']) => { + return details?.length ? details : [[]] +} + +const appendParallelStart = (current: WorkflowProcess | undefined, data: NodeTracing) => { + return updateWorkflowProcess(current, (draft) => { + draft.expand = true + draft.tracing.push({ + ...data, + details: ensureParallelTraceDetails(data.details), + status: NodeRunningStatus.Running, + expand: true, + }) + }) +} + +const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTracing) => { + return updateWorkflowProcess(current, (draft) => { + draft.expand = true + const trace = draft.tracing.find(item => matchParallelTrace(item, data)) + if (!trace) + return + + trace.details = ensureParallelTraceDetails(trace.details) + trace.details.push([]) + }) +} + +const finishParallelTrace = (current: WorkflowProcess | undefined, data: NodeTracing) => { + return updateWorkflowProcess(current, (draft) => { + draft.expand = true + const traceIndex = draft.tracing.findIndex(item => matchParallelTrace(item, data)) + if (traceIndex > -1) { + draft.tracing[traceIndex] = { + ...data, + expand: !!data.error, + } + } + }) +} + +const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTracing) => { + if (data.iteration_id || data.loop_id) + return current + + return updateWorkflowProcess(current, (draft) => { + draft.expand = true + const currentIndex = draft.tracing.findIndex(item => item.node_id === data.node_id) + const nextTrace = { + ...data, + status: NodeRunningStatus.Running, + expand: true, + } + + if (currentIndex > -1) + draft.tracing[currentIndex] = nextTrace + else + draft.tracing.push(nextTrace) + }) +} + +const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTracing) => { + if (data.iteration_id || data.loop_id) + return current + + return updateWorkflowProcess(current, (draft) => { + const currentIndex = draft.tracing.findIndex(trace => matchParallelTrace(trace, data)) + if (currentIndex > -1) { + draft.tracing[currentIndex] = { + ...(draft.tracing[currentIndex].extras + ? { extras: draft.tracing[currentIndex].extras } + : {}), + ...data, + expand: !!data.error, + } + } + }) +} + +const markNodesStopped = (traces?: WorkflowProcess['tracing']) => { + if (!traces) + return + + const markTrace = (trace: WorkflowProcess['tracing'][number]) => { + if ([NodeRunningStatus.Running, NodeRunningStatus.Waiting].includes(trace.status as NodeRunningStatus)) + trace.status = NodeRunningStatus.Stopped + + trace.details?.forEach(detailGroup => detailGroup.forEach(markTrace)) + trace.retryDetail?.forEach(markTrace) + trace.parallelDetail?.children?.forEach(markTrace) + } + + traces.forEach(markTrace) +} + +const applyWorkflowFinishedState = ( + current: WorkflowProcess | undefined, + status: WorkflowRunningStatus, +) => { + return updateWorkflowProcess(current, (draft) => { + draft.status = status + if ([WorkflowRunningStatus.Stopped, WorkflowRunningStatus.Failed].includes(status)) + markNodesStopped(draft.tracing) + }) +} + +const applyWorkflowOutputs = ( + current: WorkflowProcess | undefined, + outputs: WorkflowFinishedResponse['data']['outputs'], +) => { + return updateWorkflowProcess(current, (draft) => { + draft.status = WorkflowRunningStatus.Succeeded + draft.files = getFilesInLogs(outputs || []) as unknown as WorkflowProcess['files'] + }) +} + +const appendResultText = (current: WorkflowProcess | undefined, text: string) => { + return updateWorkflowProcess(current, (draft) => { + draft.resultText = `${draft.resultText || ''}${text}` + }) +} + +const replaceResultText = (current: WorkflowProcess | undefined, text: string) => { + return updateWorkflowProcess(current, (draft) => { + draft.resultText = text + }) +} + +const updateHumanInputRequired = ( + current: WorkflowProcess | undefined, + data: NonNullable[number], +) => { + return updateWorkflowProcess(current, (draft) => { + if (!draft.humanInputFormDataList) { + draft.humanInputFormDataList = [data] + } + else { + const currentFormIndex = draft.humanInputFormDataList.findIndex(item => item.node_id === data.node_id) + if (currentFormIndex > -1) + draft.humanInputFormDataList[currentFormIndex] = data + else + draft.humanInputFormDataList.push(data) + } + + const currentIndex = draft.tracing.findIndex(item => item.node_id === data.node_id) + if (currentIndex > -1) + draft.tracing[currentIndex].status = NodeRunningStatus.Paused + }) +} + +const updateHumanInputFilled = ( + current: WorkflowProcess | undefined, + data: NonNullable[number], +) => { + return updateWorkflowProcess(current, (draft) => { + if (draft.humanInputFormDataList?.length) { + const currentFormIndex = draft.humanInputFormDataList.findIndex(item => item.node_id === data.node_id) + if (currentFormIndex > -1) + draft.humanInputFormDataList.splice(currentFormIndex, 1) + } + + if (!draft.humanInputFilledFormDataList) + draft.humanInputFilledFormDataList = [data] + else + draft.humanInputFilledFormDataList.push(data) + }) +} + +const updateHumanInputTimeout = ( + current: WorkflowProcess | undefined, + data: HumanInputFormTimeoutData, +) => { + return updateWorkflowProcess(current, (draft) => { + if (!draft.humanInputFormDataList?.length) + return + + const currentFormIndex = draft.humanInputFormDataList.findIndex(item => item.node_id === data.node_id) + if (currentFormIndex > -1) + draft.humanInputFormDataList[currentFormIndex].expiration_time = data.expiration_time + }) +} + +const applyWorkflowPaused = (current: WorkflowProcess | undefined) => { + return updateWorkflowProcess(current, (draft) => { + draft.expand = false + draft.status = WorkflowRunningStatus.Paused + }) +} + +const serializeWorkflowOutputs = (outputs: WorkflowFinishedResponse['data']['outputs']) => { + if (outputs === undefined || outputs === null) + return '' + + if (typeof outputs === 'string') + return outputs + + try { + return JSON.stringify(outputs) ?? '' + } + catch { + return String(outputs) + } +} + +export const createWorkflowStreamHandlers = ({ + getCompletionRes, + getWorkflowProcessData, + isTimedOut, + markEnded, + notify, + onCompleted, + resetRunState, + setCompletionRes, + setCurrentTaskId, + setIsStopping, + setMessageId, + setRespondingFalse, + setWorkflowProcessData, + t, + taskId, +}: CreateWorkflowStreamHandlersParams): IOtherOptions => { + let tempMessageId = '' + + const finishWithFailure = () => { + setRespondingFalse() + resetRunState() + onCompleted(getCompletionRes(), taskId, false) + markEnded() + } + + const finishWithSuccess = () => { + setRespondingFalse() + resetRunState() + setMessageId(tempMessageId) + onCompleted(getCompletionRes(), taskId, true) + markEnded() + } + + const otherOptions: IOtherOptions = { + onWorkflowStarted: ({ workflow_run_id, task_id }) => { + const workflowProcessData = getWorkflowProcessData() + if (workflowProcessData?.tracing.length) { + setWorkflowProcessData(updateWorkflowProcess(workflowProcessData, (draft) => { + draft.expand = true + draft.status = WorkflowRunningStatus.Running + })) + return + } + + tempMessageId = workflow_run_id + setCurrentTaskId(task_id || null) + setIsStopping(false) + setWorkflowProcessData(createInitialWorkflowProcess()) + }, + onIterationStart: ({ data }) => { + setWorkflowProcessData(appendParallelStart(getWorkflowProcessData(), data)) + }, + onIterationNext: ({ data }) => { + setWorkflowProcessData(appendParallelNext(getWorkflowProcessData(), data)) + }, + onIterationFinish: ({ data }) => { + setWorkflowProcessData(finishParallelTrace(getWorkflowProcessData(), data)) + }, + onLoopStart: ({ data }) => { + setWorkflowProcessData(appendParallelStart(getWorkflowProcessData(), data)) + }, + onLoopNext: ({ data }) => { + setWorkflowProcessData(appendParallelNext(getWorkflowProcessData(), data)) + }, + onLoopFinish: ({ data }) => { + setWorkflowProcessData(finishParallelTrace(getWorkflowProcessData(), data)) + }, + onNodeStarted: ({ data }) => { + setWorkflowProcessData(upsertWorkflowNode(getWorkflowProcessData(), data)) + }, + onNodeFinished: ({ data }) => { + setWorkflowProcessData(finishWorkflowNode(getWorkflowProcessData(), data)) + }, + onWorkflowFinished: ({ data }) => { + if (isTimedOut()) { + notify({ type: 'warning', message: t('warningMessage.timeoutExceeded', { ns: 'appDebug' }) }) + return + } + + const workflowStatus = data.status as WorkflowRunningStatus | undefined + if (workflowStatus === WorkflowRunningStatus.Stopped) { + setWorkflowProcessData(applyWorkflowFinishedState(getWorkflowProcessData(), WorkflowRunningStatus.Stopped)) + finishWithFailure() + return + } + + if (data.error) { + notify({ type: 'error', message: data.error }) + setWorkflowProcessData(applyWorkflowFinishedState(getWorkflowProcessData(), WorkflowRunningStatus.Failed)) + finishWithFailure() + return + } + + setWorkflowProcessData(applyWorkflowOutputs(getWorkflowProcessData(), data.outputs)) + const serializedOutputs = serializeWorkflowOutputs(data.outputs) + setCompletionRes(serializedOutputs) + if (data.outputs) { + const outputKeys = Object.keys(data.outputs) + const isStringOutput = outputKeys.length === 1 && typeof data.outputs[outputKeys[0]] === 'string' + if (isStringOutput) { + setWorkflowProcessData(updateWorkflowProcess(getWorkflowProcessData(), (draft) => { + draft.resultText = data.outputs[outputKeys[0]] + })) + } + } + + finishWithSuccess() + }, + onTextChunk: ({ data: { text } }) => { + setWorkflowProcessData(appendResultText(getWorkflowProcessData(), text)) + }, + onTextReplace: ({ data: { text } }) => { + setWorkflowProcessData(replaceResultText(getWorkflowProcessData(), text)) + }, + onHumanInputRequired: ({ data }) => { + setWorkflowProcessData(updateHumanInputRequired(getWorkflowProcessData(), data)) + }, + onHumanInputFormFilled: ({ data }) => { + setWorkflowProcessData(updateHumanInputFilled(getWorkflowProcessData(), data)) + }, + onHumanInputFormTimeout: ({ data }) => { + setWorkflowProcessData(updateHumanInputTimeout(getWorkflowProcessData(), data)) + }, + onWorkflowPaused: ({ data }) => { + tempMessageId = data.workflow_run_id + void sseGet(`/workflow/${data.workflow_run_id}/events`, {}, otherOptions) + setWorkflowProcessData(applyWorkflowPaused(getWorkflowProcessData())) + }, + } + + return otherOptions +} + +export { + appendParallelNext, + appendParallelStart, + appendResultText, + applyWorkflowFinishedState, + applyWorkflowOutputs, + applyWorkflowPaused, + finishParallelTrace, + finishWorkflowNode, + markNodesStopped, + replaceResultText, + updateHumanInputFilled, + updateHumanInputRequired, + updateHumanInputTimeout, + upsertWorkflowNode, +} diff --git a/web/eslint-suppressions.json b/web/eslint-suppressions.json index 141e3d6983..5b7f0e3bc1 100644 --- a/web/eslint-suppressions.json +++ b/web/eslint-suppressions.json @@ -5964,11 +5964,8 @@ } }, "app/components/share/text-generation/result/index.tsx": { - "react-hooks-extra/no-direct-set-state-in-use-effect": { - "count": 3 - }, "ts/no-explicit-any": { - "count": 3 + "count": 1 } }, "app/components/share/text-generation/run-batch/csv-download/index.tsx": { diff --git a/web/scripts/components-coverage-thresholds.mjs b/web/scripts/components-coverage-thresholds.mjs index d61a6ad814..b73de41f12 100644 --- a/web/scripts/components-coverage-thresholds.mjs +++ b/web/scripts/components-coverage-thresholds.mjs @@ -92,10 +92,10 @@ export const COMPONENT_MODULE_THRESHOLDS = { branches: 90, }, 'share': { - lines: 15, - statements: 15, - functions: 20, - branches: 20, + lines: 95, + statements: 95, + functions: 95, + branches: 95, }, 'signin': { lines: 95,