feat: Human Input node (Frontend Part) (#31631)

Co-authored-by: JzoNg <jzongcode@gmail.com>
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: yessenia <yessenia.contact@gmail.com>
Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
This commit is contained in:
Wu Tianwei
2026-01-30 10:16:46 +08:00
committed by GitHub
parent 5bf0251554
commit fedd097f63
198 changed files with 10955 additions and 1683 deletions

View File

@ -18,7 +18,7 @@ import {
useStore,
useWorkflowStore,
} from '../../store'
import { BlockEnum } from '../../types'
import { BlockEnum, WorkflowRunningStatus } from '../../types'
import ConversationVariableModal from './conversation-variable-modal'
import Empty from './empty'
import { useChat } from './hooks'
@ -84,7 +84,9 @@ const ChatWrapper = (
suggestedQuestions,
handleSend,
handleRestart,
setTargetMessageId,
handleSwitchSibling,
handleSubmitHumanInputForm,
getHumanInputNodeData,
} = useChat(
config,
{
@ -121,6 +123,22 @@ const ChatWrapper = (
doSend(editedQuestion ? editedQuestion.message : question.content, editedQuestion ? editedQuestion.files : question.message_files, true, isValidGeneratedAnswer(parentAnswer) ? parentAnswer : null)
}, [chatList, doSend])
const doSwitchSibling = useCallback((siblingMessageId: string) => {
handleSwitchSibling(siblingMessageId, {
onGetSuggestedQuestions: (messageId, getAbortController) => fetchSuggestedQuestions(appDetail!.id, messageId, getAbortController),
})
}, [handleSwitchSibling, appDetail])
const doHumanInputFormSubmit = useCallback(async (formToken: string, formData: any) => {
// Handle human input form submission
await handleSubmitHumanInputForm(formToken, formData)
}, [handleSubmitHumanInputForm])
const inputDisabled = useMemo(() => {
const latestMessage = chatList[chatList.length - 1]
return latestMessage?.isAnswer && (latestMessage.workflowProcess?.status === WorkflowRunningStatus.Paused)
}, [chatList])
const { eventEmitter } = useEventEmitterContextContext()
eventEmitter?.useSubscription((v: any) => {
if (v.type === EVENT_WORKFLOW_STOP)
@ -168,6 +186,8 @@ const ChatWrapper = (
inputsForm={(startVariables || []) as any}
onRegenerate={doRegenerate}
onStopResponding={handleStop}
onHumanInputFormSubmit={doHumanInputFormSubmit}
getHumanInputNodeData={getHumanInputNodeData}
chatNode={(
<>
{showInputsFieldsPanel && <UserInput />}
@ -182,7 +202,9 @@ const ChatWrapper = (
suggestedQuestions={suggestedQuestions}
showPromptLog
chatAnswerContainerInner="!pr-2"
switchSibling={setTargetMessageId}
switchSibling={doSwitchSibling}
inputDisabled={inputDisabled}
hideAvatar
/>
{showConversationVariableModal && (
<ConversationVariableModal

View File

@ -5,6 +5,7 @@ import type {
Inputs,
} from '@/app/components/base/chat/types'
import type { FileEntity } from '@/app/components/base/file-uploader/types'
import type { IOtherOptions } from '@/service/base'
import { uniqBy } from 'es-toolkit/compat'
import { produce, setAutoFreeze } from 'immer'
import {
@ -15,6 +16,7 @@ import {
useState,
} from 'react'
import { useTranslation } from 'react-i18next'
import { useStoreApi } from 'reactflow'
import {
getProcessedInputs,
processOpeningStatement,
@ -25,7 +27,12 @@ import {
getProcessedFilesFromResponse,
} from '@/app/components/base/file-uploader/utils'
import { useToastContext } from '@/app/components/base/toast'
import {
CUSTOM_NODE,
} from '@/app/components/workflow/constants'
import { sseGet } from '@/service/base'
import { useInvalidAllLastRun } from '@/service/use-workflow'
import { submitHumanInputForm } from '@/service/workflow'
import { TransferMethod } from '@/types/app'
import { DEFAULT_ITER_TIMES, DEFAULT_LOOP_TIMES } from '../../constants'
import {
@ -58,6 +65,7 @@ export const useChat = (
const taskIdRef = useRef('')
const [isResponding, setIsResponding] = useState(false)
const isRespondingRef = useRef(false)
const workflowEventsAbortControllerRef = useRef<AbortController | null>(null)
const configsMap = useHooksStore(s => s.configsMap)
const invalidAllLastRun = useInvalidAllLastRun(configsMap?.flowType, configsMap?.flowId)
const { fetchInspectVars } = useSetWorkflowVarsWithValue()
@ -67,6 +75,7 @@ export const useChat = (
setIterTimes,
setLoopTimes,
} = workflowStore.getState()
const store = useStoreApi()
const handleResponding = useCallback((isResponding: boolean) => {
setIsResponding(isResponding)
@ -131,6 +140,29 @@ export const useChat = (
})
}, [])
type UpdateChatTreeNode = {
(id: string, fields: Partial<ChatItemInTree>): void
(id: string, update: (node: ChatItemInTree) => void): void
}
const updateChatTreeNode: UpdateChatTreeNode = useCallback((
id: string,
fieldsOrUpdate: Partial<ChatItemInTree> | ((node: ChatItemInTree) => void),
) => {
const nextState = produceChatTreeNode(id, (node) => {
if (typeof fieldsOrUpdate === 'function') {
fieldsOrUpdate(node)
}
else {
Object.keys(fieldsOrUpdate).forEach((key) => {
(node as any)[key] = (fieldsOrUpdate as any)[key]
})
}
})
setChatTree(nextState)
chatTreeRef.current = nextState
}, [produceChatTreeNode])
const handleStop = useCallback(() => {
hasStopResponded.current = true
handleResponding(false)
@ -140,6 +172,8 @@ export const useChat = (
setLoopTimes(DEFAULT_LOOP_TIMES)
if (suggestedQuestionsAbortControllerRef.current)
suggestedQuestionsAbortControllerRef.current.abort()
if (workflowEventsAbortControllerRef.current)
workflowEventsAbortControllerRef.current.abort()
}, [handleResponding, setIterTimes, setLoopTimes, stopChat])
const handleRestart = useCallback(() => {
@ -206,6 +240,10 @@ export const useChat = (
return false
}
// Abort previous handleResume SSE connection if any
if (workflowEventsAbortControllerRef.current)
workflowEventsAbortControllerRef.current.abort()
const parentMessage = threadMessages.find(item => item.id === params.parent_message_id)
const placeholderQuestionId = `question-${Date.now()}`
@ -243,6 +281,8 @@ export const useChat = (
isAnswer: true,
parentMessageId: questionItem.id,
siblingIndex: parentMessage?.children?.length ?? chatTree.length,
humanInputFormDataList: [],
humanInputFilledFormDataList: [],
}
handleResponding(true)
@ -270,6 +310,9 @@ export const useChat = (
handleRun(
bodyParams,
{
getAbortController: (abortController) => {
workflowEventsAbortControllerRef.current = abortController
},
onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId, taskId }: any) => {
responseItem.content = responseItem.content + message
@ -295,35 +338,38 @@ export const useChat = (
})
},
async onCompleted(hasError?: boolean, errorMessage?: string) {
const { workflowRunningData } = workflowStore.getState()
handleResponding(false)
fetchInspectVars({})
invalidAllLastRun()
if (workflowRunningData?.result.status !== WorkflowRunningStatus.Paused) {
fetchInspectVars({})
invalidAllLastRun()
if (hasError) {
if (errorMessage) {
responseItem.content = errorMessage
responseItem.isError = true
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
if (hasError) {
if (errorMessage) {
responseItem.content = errorMessage
responseItem.isError = true
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
}
return
}
return
}
if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
responseItem.id,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestQuestions(data)
}
// eslint-disable-next-line unused-imports/no-unused-vars
catch (error) {
setSuggestQuestions([])
if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
responseItem.id,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestQuestions(data)
}
// eslint-disable-next-line unused-imports/no-unused-vars
catch (error) {
setSuggestQuestions([])
}
}
}
},
@ -345,12 +391,29 @@ export const useChat = (
onError() {
handleResponding(false)
},
onWorkflowStarted: ({ workflow_run_id, task_id }) => {
taskIdRef.current = task_id
responseItem.workflow_run_id = workflow_run_id
responseItem.workflowProcess = {
status: WorkflowRunningStatus.Running,
tracing: [],
onWorkflowStarted: ({ workflow_run_id, task_id, conversation_id, message_id }) => {
// If there are no streaming messages, we still need to set the conversation_id to avoid create a new conversation when regeneration in chat-flow.
if (conversation_id) {
conversationId.current = conversation_id
}
if (message_id && !hasSetResponseId) {
questionItem.id = `question-${message_id}`
responseItem.id = message_id
responseItem.parentMessageId = questionItem.id
hasSetResponseId = true
}
if (responseItem.workflowProcess && responseItem.workflowProcess.tracing.length > 0) {
handleResponding(true)
responseItem.workflowProcess.status = WorkflowRunningStatus.Running
}
else {
taskIdRef.current = task_id
responseItem.workflow_run_id = workflow_run_id
responseItem.workflowProcess = {
status: WorkflowRunningStatus.Running,
tracing: [],
}
}
updateCurrentQAOnTree({
placeholderQuestionId,
@ -423,10 +486,19 @@ export const useChat = (
}
},
onNodeStarted: ({ data }) => {
responseItem.workflowProcess!.tracing!.push({
...data,
status: NodeRunningStatus.Running,
} as any)
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
if (currentIndex > -1) {
responseItem.workflowProcess!.tracing![currentIndex] = {
...data,
status: NodeRunningStatus.Running,
}
}
else {
responseItem.workflowProcess!.tracing!.push({
...data,
status: NodeRunningStatus.Running,
})
}
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
@ -499,17 +571,383 @@ export const useChat = (
})
}
},
onHumanInputRequired: ({ data }) => {
if (!responseItem.humanInputFormDataList) {
responseItem.humanInputFormDataList = [data]
}
else {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === data.node_id)
if (currentFormIndex > -1) {
responseItem.humanInputFormDataList[currentFormIndex] = data
}
else {
responseItem.humanInputFormDataList.push(data)
}
}
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
if (currentTracingIndex > -1) {
responseItem.workflowProcess!.tracing[currentTracingIndex].status = NodeRunningStatus.Paused
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
}
},
onHumanInputFormFilled: ({ data }) => {
if (responseItem.humanInputFormDataList?.length) {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === data.node_id)
responseItem.humanInputFormDataList.splice(currentFormIndex, 1)
}
if (!responseItem.humanInputFilledFormDataList) {
responseItem.humanInputFilledFormDataList = [data]
}
else {
responseItem.humanInputFilledFormDataList.push(data)
}
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
},
onHumanInputFormTimeout: ({ data }) => {
if (responseItem.humanInputFormDataList?.length) {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === data.node_id)
responseItem.humanInputFormDataList[currentFormIndex].expiration_time = data.expiration_time
}
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
},
onWorkflowPaused: ({ data: _data }) => {
responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
},
},
)
}, [threadMessages, chatTree.length, updateCurrentQAOnTree, handleResponding, formSettings?.inputsForm, handleRun, notify, t, config?.suggested_questions_after_answer?.enabled, fetchInspectVars, invalidAllLastRun])
}, [threadMessages, chatTree.length, updateCurrentQAOnTree, handleResponding, formSettings?.inputsForm, handleRun, notify, t, workflowStore, fetchInspectVars, invalidAllLastRun, config?.suggested_questions_after_answer?.enabled])
const handleSubmitHumanInputForm = async (formToken: string, formData: any) => {
await submitHumanInputForm(formToken, formData)
}
const getHumanInputNodeData = (nodeID: string) => {
const {
getNodes,
} = store.getState()
const nodes = getNodes().filter(node => node.type === CUSTOM_NODE)
const node = nodes.find(n => n.id === nodeID)
return node
}
const handleResume = useCallback((
messageId: string,
workflowRunId: string,
{
onGetSuggestedQuestions,
}: SendCallback,
) => {
// Re-subscribe to workflow events for the specific message
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
const otherOptions: IOtherOptions = {
getAbortController: (abortController) => {
workflowEventsAbortControllerRef.current = abortController
},
onData: (message: string, _isFirstMessage: boolean, { conversationId: newConversationId, messageId: msgId, taskId }: any) => {
updateChatTreeNode(messageId, (responseItem) => {
responseItem.content = responseItem.content + message
if (msgId)
responseItem.id = msgId
})
if (newConversationId)
conversationId.current = newConversationId
if (taskId)
taskIdRef.current = taskId
},
async onCompleted(hasError?: boolean) {
const { workflowRunningData } = workflowStore.getState()
handleResponding(false)
if (workflowRunningData?.result.status !== WorkflowRunningStatus.Paused) {
fetchInspectVars({})
invalidAllLastRun()
if (hasError)
return
if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
messageId,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestQuestions(data)
}
catch {
setSuggestQuestions([])
}
}
}
},
onMessageEnd: (messageEnd) => {
updateChatTreeNode(messageId, (responseItem) => {
responseItem.citation = messageEnd.metadata?.retriever_resources || []
const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || [])
responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id')
})
},
onMessageReplace: (messageReplace) => {
updateChatTreeNode(messageId, (responseItem) => {
responseItem.content = messageReplace.answer
})
},
onError() {
handleResponding(false)
},
onWorkflowStarted: ({ workflow_run_id, task_id }) => {
handleResponding(true)
hasStopResponded.current = false
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.workflowProcess && responseItem.workflowProcess.tracing.length > 0) {
responseItem.workflowProcess.status = WorkflowRunningStatus.Running
}
else {
taskIdRef.current = task_id
responseItem.workflow_run_id = workflow_run_id
responseItem.workflowProcess = {
status: WorkflowRunningStatus.Running,
tracing: [],
}
}
})
},
onWorkflowFinished: ({ data: workflowFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.workflowProcess)
responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus
})
},
onIterationStart: ({ data: iterationStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
responseItem.workflowProcess.tracing.push({
...iterationStartedData,
status: WorkflowRunningStatus.Running,
})
})
},
onIterationFinish: ({ data: iterationFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
const tracing = responseItem.workflowProcess.tracing
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
if (iterationIndex > -1) {
tracing[iterationIndex] = {
...tracing[iterationIndex],
...iterationFinishedData,
status: WorkflowRunningStatus.Succeeded,
}
}
})
},
onNodeStarted: ({ data: nodeStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
if (currentIndex > -1) {
responseItem.workflowProcess.tracing[currentIndex] = {
...nodeStartedData,
status: NodeRunningStatus.Running,
}
}
else {
if (nodeStartedData.iteration_id)
return
responseItem.workflowProcess.tracing.push({
...nodeStartedData,
status: WorkflowRunningStatus.Running,
})
}
})
},
onNodeFinished: ({ data: nodeFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
if (nodeFinishedData.iteration_id)
return
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
if (!item.execution_metadata?.parallel_id)
return item.id === nodeFinishedData.id
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
})
if (currentIndex > -1)
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
})
},
onLoopStart: ({ data: loopStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
responseItem.workflowProcess.tracing.push({
...loopStartedData,
status: WorkflowRunningStatus.Running,
})
})
},
onLoopFinish: ({ data: loopFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
const tracing = responseItem.workflowProcess.tracing
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
if (loopIndex > -1) {
tracing[loopIndex] = {
...tracing[loopIndex],
...loopFinishedData,
status: WorkflowRunningStatus.Succeeded,
}
}
})
},
onHumanInputRequired: ({ data: humanInputRequiredData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.humanInputFormDataList) {
responseItem.humanInputFormDataList = [humanInputRequiredData]
}
else {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id)
if (currentFormIndex > -1) {
responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData
}
else {
responseItem.humanInputFormDataList.push(humanInputRequiredData)
}
}
if (responseItem.workflowProcess?.tracing) {
const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id)
if (currentTracingIndex > -1)
responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused
}
})
},
onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.humanInputFormDataList?.length) {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id)
if (currentFormIndex > -1)
responseItem.humanInputFormDataList.splice(currentFormIndex, 1)
}
if (!responseItem.humanInputFilledFormDataList) {
responseItem.humanInputFilledFormDataList = [humanInputFilledFormData]
}
else {
responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData)
}
})
},
onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.humanInputFormDataList?.length) {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id)
responseItem.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time
}
})
},
onWorkflowPaused: ({ data: workflowPausedData }) => {
const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events`
sseGet(
resumeUrl,
{},
otherOptions,
)
updateChatTreeNode(messageId, (responseItem) => {
responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused
})
},
}
if (workflowEventsAbortControllerRef.current)
workflowEventsAbortControllerRef.current.abort()
sseGet(
url,
{},
otherOptions,
)
}, [updateChatTreeNode, handleResponding, workflowStore, fetchInspectVars, invalidAllLastRun, config?.suggested_questions_after_answer])
const handleSwitchSibling = useCallback((
siblingMessageId: string,
callbacks: SendCallback,
) => {
setTargetMessageId(siblingMessageId)
// Helper to find message in tree
const findMessageInTree = (nodes: ChatItemInTree[], targetId: string): ChatItemInTree | undefined => {
for (const node of nodes) {
if (node.id === targetId)
return node
if (node.children) {
const found = findMessageInTree(node.children, targetId)
if (found)
return found
}
}
return undefined
}
const targetMessage = findMessageInTree(chatTreeRef.current, siblingMessageId)
if (targetMessage?.workflow_run_id && targetMessage.humanInputFormDataList && targetMessage.humanInputFormDataList.length > 0) {
handleResume(
targetMessage.id,
targetMessage.workflow_run_id,
callbacks,
)
}
}, [handleResume])
return {
conversationId: conversationId.current,
chatList,
setTargetMessageId,
handleSwitchSibling,
handleSend,
handleStop,
handleRestart,
handleResume,
handleSubmitHumanInputForm,
getHumanInputNodeData,
isResponding,
suggestedQuestions,
}