mirror of
https://github.com/langgenius/dify.git
synced 2026-05-17 15:36:25 +08:00
Compare commits
18 Commits
1.14.0
...
feat/repla
| Author | SHA1 | Date | |
|---|---|---|---|
| 73a1dc293f | |||
| 26c00ffd85 | |||
| 33db927327 | |||
| d226a5bdd3 | |||
| 2c5a9ec1a0 | |||
| b59d8d3446 | |||
| 87bbcc4e3b | |||
| 91008c0329 | |||
| 49e12984c3 | |||
| 34c46561e9 | |||
| a25e3b66c1 | |||
| 45b0751826 | |||
| ea58415f88 | |||
| 73ff36cb0e | |||
| 6afdde1bc4 | |||
| c294006ecf | |||
| 53c0dde3d5 | |||
| a1759fccb8 |
@ -84,6 +84,9 @@ class WorkflowEventsApi(WebApiResource):
|
||||
|
||||
def _generate_stream_events():
|
||||
if include_state_snapshot:
|
||||
# TODO(wylswz): events between shapshot and live tail may be lost.
|
||||
# TODO(wylswz): previous message chunks are not replayed. In order to support replay, we need
|
||||
# to figure out a way to deduplicate events between snapshot and stream.
|
||||
return generator.convert_to_event_stream(
|
||||
build_workflow_event_stream(
|
||||
app_mode=app_mode,
|
||||
|
||||
@ -311,7 +311,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
cls,
|
||||
app_mode: AppMode,
|
||||
workflow_run_id: str,
|
||||
idle_timeout=300,
|
||||
idle_timeout: float = 300,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
) -> Generator[Mapping | str, None, None]:
|
||||
topic = cls.get_response_topic(app_mode, workflow_run_id)
|
||||
|
||||
@ -23,7 +23,7 @@ class MessageGenerator:
|
||||
cls,
|
||||
app_mode: AppMode,
|
||||
workflow_run_id: str,
|
||||
idle_timeout=300,
|
||||
idle_timeout: float = 300,
|
||||
ping_interval: float = 10.0,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
) -> Generator[Mapping | str, None, None]:
|
||||
|
||||
@ -27,6 +27,9 @@ def stream_topic_events(
|
||||
terminal_values = _normalize_terminal_events(terminal_events)
|
||||
last_msg_time = time.time()
|
||||
last_ping_time = last_msg_time
|
||||
# The application layer intentionally does not use broadcast-channel replay;
|
||||
# callers that need historical events should compose them from persisted state
|
||||
# (see ``build_workflow_event_stream``) and then tail the live stream.
|
||||
with topic.subscribe() as sub:
|
||||
# on_subscribe fires only after the Redis subscription is active.
|
||||
# This is used to gate task start and reduce pub/sub race for the first event.
|
||||
|
||||
@ -410,7 +410,7 @@ class WorkflowBasedAppRunner:
|
||||
elif isinstance(event, GraphRunFailedEvent):
|
||||
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
|
||||
elif isinstance(event, GraphRunAbortedEvent):
|
||||
self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0))
|
||||
self._publish_event(QueueWorkflowPartialSuccessEvent(outputs={}, exceptions_count=0))
|
||||
elif isinstance(event, GraphRunPausedEvent):
|
||||
runtime_state = workflow_entry.graph_engine.graph_runtime_state
|
||||
paused_nodes = runtime_state.get_paused_nodes()
|
||||
|
||||
@ -58,6 +58,7 @@ class MessageListItem(ResponseModel):
|
||||
message_files: list[MessageFile]
|
||||
status: str
|
||||
error: str | None = None
|
||||
workflow_run_id: str | None = None
|
||||
extra_contents: list[ExecutionExtraContentDomainModel]
|
||||
|
||||
@field_validator("inputs", mode="before")
|
||||
|
||||
@ -76,7 +76,6 @@ class _StreamsSubscription(Subscription):
|
||||
# reading and writing the _listener / `_closed` attribute.
|
||||
self._lock = threading.Lock()
|
||||
self._closed: bool = False
|
||||
# self._closed = threading.Event()
|
||||
self._listener: threading.Thread | None = None
|
||||
|
||||
def _listen(self) -> None:
|
||||
|
||||
@ -37,10 +37,13 @@ class AppTaskService:
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
# Legacy mechanism: Set stop flag in Redis
|
||||
AppQueueManager.set_stop_flag(task_id, invoke_from, user_id)
|
||||
|
||||
# New mechanism: Send stop command via GraphEngine for workflow-based apps
|
||||
# This ensures proper workflow status recording in the persistence layer
|
||||
if app_mode in (AppMode.ADVANCED_CHAT, AppMode.WORKFLOW):
|
||||
# Let the event handler process the Graphon abort event instead of
|
||||
# stopping the queue listener immediately. Otherwise, events may be
|
||||
# lost and the workflow run can remain stuck in the running state.
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
else:
|
||||
# Legacy mechanism: Set stop flag in Redis
|
||||
AppQueueManager.set_stop_flag(task_id, invoke_from, user_id)
|
||||
|
||||
@ -62,6 +62,19 @@ def build_workflow_event_stream(
|
||||
idle_timeout: float = 300,
|
||||
ping_interval: float = 10.0,
|
||||
) -> Generator[Mapping[str, Any] | str, None, None]:
|
||||
"""Yield a stream of workflow events composed of a DB-derived snapshot followed by live tail.
|
||||
|
||||
The stream is assembled in two phases that are kept **structurally disjoint** so no
|
||||
per-event deduplication is required:
|
||||
|
||||
1. Snapshot phase: events rebuilt from persistent state via ``_build_snapshot_events``
|
||||
(``workflow_started``, optional ``message_replace``, ``node_started``/``node_finished``
|
||||
for each persisted execution, and an optional terminal ``workflow_paused``). This
|
||||
represents the history that already happened from the client's point of view.
|
||||
2. Tail phase: events delivered by the broadcast subscription **from the moment of
|
||||
subscription onward only**. Anything that was published before the subscription was
|
||||
established is ignored here, since message id is not tracked in current implementation.
|
||||
"""
|
||||
topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id)
|
||||
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
|
||||
node_execution_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(session_maker)
|
||||
|
||||
@ -50,6 +50,7 @@ def make_message():
|
||||
msg.user_feedback = MagicMock(rating=None)
|
||||
msg.status = "normal"
|
||||
msg.error = None
|
||||
msg.workflow_run_id = "22222222-2222-2222-2222-222222222222"
|
||||
return msg
|
||||
|
||||
|
||||
@ -84,6 +85,8 @@ class TestMessageListApi:
|
||||
assert result["limit"] == 20
|
||||
assert result["has_more"] is False
|
||||
assert len(result["data"]) == 2
|
||||
assert result["data"][0]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
|
||||
assert result["data"][1]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
|
||||
|
||||
def test_get_not_chat_app(self):
|
||||
api = module.MessageListApi()
|
||||
|
||||
@ -991,7 +991,7 @@
|
||||
"count": 2
|
||||
},
|
||||
"ts/no-explicit-any": {
|
||||
"count": 17
|
||||
"count": 20
|
||||
}
|
||||
},
|
||||
"web/app/components/base/chat/chat/index.tsx": {
|
||||
|
||||
@ -125,6 +125,7 @@ const defaultChatHookReturn: Partial<ChatHookReturn> = {
|
||||
handleSend: vi.fn(),
|
||||
handleStop: vi.fn(),
|
||||
handleSwitchSibling: vi.fn(),
|
||||
handleReconnect: vi.fn(),
|
||||
isResponding: false,
|
||||
suggestedQuestions: [],
|
||||
}
|
||||
@ -605,6 +606,95 @@ describe('ChatWrapper', () => {
|
||||
expect(handleSwitchSibling).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should reconnect to a recent running workflow on mount', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
...defaultChatHookReturn,
|
||||
chatList: [],
|
||||
handleReconnect,
|
||||
} as unknown as ChatHookReturn)
|
||||
|
||||
vi.mocked(useChatWithHistoryContext).mockReturnValue({
|
||||
...defaultContextValue,
|
||||
appPrevChatTree: [{
|
||||
id: 'running-answer',
|
||||
isAnswer: true,
|
||||
content: 'partial content',
|
||||
workflow_run_id: 'run-active',
|
||||
created_at: Math.floor(Date.now() / 1000) - 30,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree],
|
||||
})
|
||||
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).toHaveBeenCalledWith(
|
||||
'running-answer',
|
||||
'run-active',
|
||||
expect.objectContaining({ isPublicAPI: true }),
|
||||
)
|
||||
})
|
||||
|
||||
it('should not reconnect to an old workflow beyond the retention window', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
...defaultChatHookReturn,
|
||||
chatList: [],
|
||||
handleReconnect,
|
||||
} as unknown as ChatHookReturn)
|
||||
|
||||
vi.mocked(useChatWithHistoryContext).mockReturnValue({
|
||||
...defaultContextValue,
|
||||
appPrevChatTree: [{
|
||||
id: 'old-answer',
|
||||
isAnswer: true,
|
||||
content: 'old content',
|
||||
workflow_run_id: 'run-old',
|
||||
created_at: Math.floor(Date.now() / 1000) - 700,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree],
|
||||
})
|
||||
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should prefer paused workflow over running workflow for reconnection', () => {
|
||||
const handleSwitchSibling = vi.fn()
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
...defaultChatHookReturn,
|
||||
chatList: [],
|
||||
handleSwitchSibling,
|
||||
handleReconnect,
|
||||
} as unknown as ChatHookReturn)
|
||||
|
||||
vi.mocked(useChatWithHistoryContext).mockReturnValue({
|
||||
...defaultContextValue,
|
||||
appPrevChatTree: [
|
||||
{
|
||||
id: 'running-answer',
|
||||
isAnswer: true,
|
||||
content: '',
|
||||
workflow_run_id: 'run-active',
|
||||
created_at: Math.floor(Date.now() / 1000) - 10,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
{
|
||||
id: 'paused-answer',
|
||||
isAnswer: true,
|
||||
content: '',
|
||||
workflow_run_id: 'run-paused',
|
||||
humanInputFormDataList: [{ node_id: 'n-1' }],
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
],
|
||||
})
|
||||
|
||||
render(<ChatWrapper />)
|
||||
expect(handleSwitchSibling).toHaveBeenCalledWith('paused-answer', expect.any(Object))
|
||||
expect(handleReconnect).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should call stopChatMessageResponding when handleStop is triggered', () => {
|
||||
const handleStop = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
|
||||
@ -1836,6 +1836,82 @@ describe('useChatWithHistory', () => {
|
||||
expect(messageWithFiles?.children?.[0]?.message_files).toHaveLength(1)
|
||||
expect(messageWithFiles?.children?.[0]?.agent_thoughts?.[0]?.message_files).toHaveLength(1)
|
||||
})
|
||||
|
||||
it('should pass through workflow_run_id from item and created_at', async () => {
|
||||
const listData = createConversationData({
|
||||
data: [createConversationItem({ id: 'conversation-1' })],
|
||||
})
|
||||
mockFetchConversations.mockResolvedValue(listData)
|
||||
mockFetchChatList.mockResolvedValue({
|
||||
data: [
|
||||
{
|
||||
id: 'msg-running',
|
||||
query: 'Running query',
|
||||
answer: 'Running answer',
|
||||
message_files: [],
|
||||
feedback: null,
|
||||
retriever_resources: [],
|
||||
agent_thoughts: null,
|
||||
parent_message_id: null,
|
||||
inputs: {},
|
||||
status: 'normal',
|
||||
workflow_run_id: 'wf-direct-id',
|
||||
created_at: 1700000000,
|
||||
},
|
||||
],
|
||||
})
|
||||
|
||||
const { result } = await renderWithClient(() => useChatWithHistory())
|
||||
|
||||
await waitFor(() => {
|
||||
expect(result!.current.appPrevChatTree.length).toBeGreaterThan(0)
|
||||
})
|
||||
|
||||
const answerNode = result!.current.appPrevChatTree[0]?.children?.[0]
|
||||
expect(answerNode?.workflow_run_id).toBe('wf-direct-id')
|
||||
expect(answerNode?.created_at).toBe(1700000000)
|
||||
})
|
||||
|
||||
it('should prefer item.workflow_run_id over extra_contents workflow_run_id', async () => {
|
||||
const listData = createConversationData({
|
||||
data: [createConversationItem({ id: 'conversation-1' })],
|
||||
})
|
||||
mockFetchConversations.mockResolvedValue(listData)
|
||||
mockFetchChatList.mockResolvedValue({
|
||||
data: [
|
||||
{
|
||||
id: 'msg-both',
|
||||
query: 'Both query',
|
||||
answer: 'Both answer',
|
||||
message_files: [],
|
||||
feedback: null,
|
||||
retriever_resources: [],
|
||||
agent_thoughts: null,
|
||||
parent_message_id: null,
|
||||
inputs: {},
|
||||
status: 'paused',
|
||||
workflow_run_id: 'wf-item-level',
|
||||
extra_contents: [
|
||||
{
|
||||
type: 'human_input',
|
||||
submitted: false,
|
||||
form_definition: { fields: [] },
|
||||
workflow_run_id: 'wf-extra-level',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
})
|
||||
|
||||
const { result } = await renderWithClient(() => useChatWithHistory())
|
||||
|
||||
await waitFor(() => {
|
||||
expect(result!.current.appPrevChatTree.length).toBeGreaterThan(0)
|
||||
})
|
||||
|
||||
const answerNode = result!.current.appPrevChatTree[0]?.children?.[0]
|
||||
expect(answerNode?.workflow_run_id).toBe('wf-item-level')
|
||||
})
|
||||
})
|
||||
|
||||
// Scenario: newConversation merge replaces existing conversation item when id already exists.
|
||||
|
||||
@ -79,6 +79,7 @@ const ChatWrapper = () => {
|
||||
handleSend,
|
||||
handleStop,
|
||||
handleSwitchSibling,
|
||||
handleReconnect,
|
||||
isResponding: respondingState,
|
||||
suggestedQuestions,
|
||||
} = useChat(
|
||||
@ -140,37 +141,47 @@ const ChatWrapper = () => {
|
||||
setIsResponding(respondingState)
|
||||
}, [respondingState, setIsResponding])
|
||||
|
||||
// Resume paused workflows when chat history is loaded
|
||||
// Resume paused workflows or reconnect to running workflows when chat history is loaded
|
||||
useEffect(() => {
|
||||
if (!appPrevChatTree || appPrevChatTree.length === 0)
|
||||
return
|
||||
|
||||
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
// DFS: recurse to children first
|
||||
if (node.children && node.children.length > 0)
|
||||
findLastPausedWorkflow(node.children)
|
||||
const STREAM_RETENTION_SECONDS = 600
|
||||
|
||||
// Track the last node with humanInputFormDataList
|
||||
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
|
||||
lastPausedNode = node
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
let lastRunningNode: ChatItemInTree | undefined
|
||||
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
if (node.isAnswer && node.workflow_run_id) {
|
||||
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
|
||||
lastPausedNode = node
|
||||
}
|
||||
else if (
|
||||
node.created_at
|
||||
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
|
||||
) {
|
||||
lastRunningNode = node
|
||||
}
|
||||
}
|
||||
|
||||
if (node.children && node.children.length > 0)
|
||||
findReconnectableWorkflow(node.children)
|
||||
})
|
||||
}
|
||||
|
||||
findLastPausedWorkflow(appPrevChatTree)
|
||||
findReconnectableWorkflow(appPrevChatTree)
|
||||
|
||||
const callbacks = {
|
||||
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
}
|
||||
|
||||
// Only resume the last paused workflow
|
||||
if (lastPausedNode) {
|
||||
handleSwitchSibling(
|
||||
lastPausedNode.id,
|
||||
{
|
||||
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
},
|
||||
)
|
||||
handleSwitchSibling(lastPausedNode.id, callbacks)
|
||||
}
|
||||
else if (lastRunningNode) {
|
||||
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
|
||||
}
|
||||
}, [])
|
||||
|
||||
|
||||
@ -35,12 +35,12 @@ function getFormattedChatList(messages: any[]) {
|
||||
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
|
||||
const humanInputFormDataList: HumanInputFormData[] = []
|
||||
const humanInputFilledFormDataList: HumanInputFilledFormData[] = []
|
||||
let workflowRunId = ''
|
||||
let workflowRunIdFromExtra = ''
|
||||
if (item.status === 'paused') {
|
||||
item.extra_contents?.forEach((content: ExtraContent) => {
|
||||
if (content.type === 'human_input' && !content.submitted) {
|
||||
humanInputFormDataList.push(content.form_definition)
|
||||
workflowRunId = content.workflow_run_id
|
||||
workflowRunIdFromExtra = content.workflow_run_id
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -62,7 +62,8 @@ function getFormattedChatList(messages: any[]) {
|
||||
parentMessageId: `question-${item.id}`,
|
||||
humanInputFormDataList,
|
||||
humanInputFilledFormDataList,
|
||||
workflow_run_id: workflowRunId,
|
||||
workflow_run_id: item.workflow_run_id || workflowRunIdFromExtra,
|
||||
created_at: item.created_at,
|
||||
})
|
||||
})
|
||||
return newChatList
|
||||
@ -437,7 +438,7 @@ export const useChatWithHistory = (installedAppInfo?: InstalledApp) => {
|
||||
if (conversationId === currentConversationId)
|
||||
handleNewConversation()
|
||||
handleUpdateConversationList()
|
||||
}, [isInstalledApp, appId, t, handleUpdateConversationList, handleNewConversation, currentConversationId, conversationDeleting])
|
||||
}, [conversationDeleting, currentConversationId, handleNewConversation, handleUpdateConversationList, appSourceType, appId, t])
|
||||
const [conversationRenaming, setConversationRenaming] = useState(false)
|
||||
const handleRenameConversation = useCallback(async (conversationId: string, newName: string, { onSuccess }: Callback) => {
|
||||
if (conversationRenaming)
|
||||
@ -463,7 +464,7 @@ export const useChatWithHistory = (installedAppInfo?: InstalledApp) => {
|
||||
finally {
|
||||
setConversationRenaming(false)
|
||||
}
|
||||
}, [isInstalledApp, appId, t, conversationRenaming, originConversationList])
|
||||
}, [conversationRenaming, t, appSourceType, appId, originConversationList])
|
||||
const handleNewConversationCompleted = useCallback((newConversationId: string) => {
|
||||
setNewConversationId(newConversationId)
|
||||
handleConversationIdInfoChange(newConversationId)
|
||||
|
||||
@ -1264,6 +1264,102 @@ describe('useChat', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('handleReconnect', () => {
|
||||
it('should call sseGet with include_state_snapshot and rebuild from snapshot events', () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const prevChatTree = [{
|
||||
id: 'q-1',
|
||||
content: 'query',
|
||||
isAnswer: false,
|
||||
children: [{
|
||||
id: 'm-reconnect',
|
||||
content: 'stale partial content',
|
||||
isAnswer: true,
|
||||
siblingIndex: 0,
|
||||
workflowProcess: { status: 'running', tracing: [{ node_id: 'old-node' }] },
|
||||
}],
|
||||
}]
|
||||
|
||||
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
|
||||
|
||||
act(() => {
|
||||
result.current.handleReconnect('m-reconnect', 'wr-reconnect', { isPublicAPI: true })
|
||||
})
|
||||
|
||||
expect(sseGet).toHaveBeenCalledWith(
|
||||
'/workflow/wr-reconnect/events?include_state_snapshot=true',
|
||||
expect.any(Object),
|
||||
expect.any(Object),
|
||||
)
|
||||
|
||||
// Content is not reset until onWorkflowStarted fires
|
||||
const beforeStart = result.current.chatList[1]
|
||||
expect(beforeStart.content).toBe('stale partial content')
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-reconnect', task_id: 't-1' })
|
||||
})
|
||||
|
||||
// After onWorkflowStarted, content is reset and workflowProcess is fresh
|
||||
const afterStart = result.current.chatList[1]
|
||||
expect(afterStart.content).toBe('')
|
||||
expect(afterStart.workflowProcess).toEqual({ status: WorkflowRunningStatus.Running, tracing: [] })
|
||||
|
||||
act(() => {
|
||||
callbacks.onMessageReplace({ answer: 'full snapshot text' })
|
||||
callbacks.onNodeStarted({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1' } })
|
||||
callbacks.onNodeFinished({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1', status: 'succeeded' } })
|
||||
callbacks.onWorkflowFinished({ data: { status: 'succeeded' } })
|
||||
callbacks.onCompleted()
|
||||
})
|
||||
|
||||
const lastResponse = result.current.chatList[1]
|
||||
expect(lastResponse.content).toBe('full snapshot text')
|
||||
expect(lastResponse.workflowProcess?.status).toBe('succeeded')
|
||||
expect(lastResponse.workflowProcess?.tracing).toHaveLength(1)
|
||||
expect(result.current.isResponding).toBe(false)
|
||||
})
|
||||
|
||||
it('should abort previous stream when reconnecting again', () => {
|
||||
const callbacksList: HookCallbacks[] = []
|
||||
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
|
||||
callbacksList.push(options as HookCallbacks)
|
||||
})
|
||||
|
||||
const prevChatTree = [{
|
||||
id: 'q-1',
|
||||
content: 'query',
|
||||
isAnswer: false,
|
||||
children: [{
|
||||
id: 'm-rc',
|
||||
content: 'partial',
|
||||
isAnswer: true,
|
||||
siblingIndex: 0,
|
||||
}],
|
||||
}]
|
||||
|
||||
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
|
||||
const previousAbort = createAbortControllerMock()
|
||||
|
||||
act(() => {
|
||||
result.current.handleReconnect('m-rc', 'wr-1', { isPublicAPI: true })
|
||||
})
|
||||
act(() => {
|
||||
callbacksList[0].getAbortController(previousAbort)
|
||||
})
|
||||
act(() => {
|
||||
result.current.handleReconnect('m-rc', 'wr-2', { isPublicAPI: true })
|
||||
})
|
||||
|
||||
expect(previousAbort.abort).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe('createAudioPlayerManager branch cases', () => {
|
||||
it('should handle ttsUrl generation for appId with installed apps', async () => {
|
||||
vi.mocked(usePathname).mockReturnValue('/explore/installed/app')
|
||||
@ -1444,6 +1540,89 @@ describe('useChat', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('onWorkflowStarted re-enables responding in handleSend', () => {
|
||||
it('should set isResponding back to true when onWorkflowStarted fires after stop', () => {
|
||||
let callbacks: HookCallbacks
|
||||
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const stopChat = vi.fn()
|
||||
const { result } = renderHook(() => useChat(undefined, undefined, undefined, stopChat))
|
||||
|
||||
act(() => {
|
||||
result.current.handleSend('test-url', { query: 'workflow restart' }, {})
|
||||
})
|
||||
expect(result.current.isResponding).toBe(true)
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||
callbacks.onData('part', true, { messageId: 'm-1', conversationId: 'c-1', taskId: 't-1' })
|
||||
})
|
||||
|
||||
act(() => {
|
||||
result.current.handleStop()
|
||||
})
|
||||
expect(result.current.isResponding).toBe(false)
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-2', task_id: 't-2' })
|
||||
})
|
||||
expect(result.current.isResponding).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('abortInflightRequests on unmount', () => {
|
||||
it('should abort all in-flight requests when the hook unmounts', () => {
|
||||
let callbacks: HookCallbacks
|
||||
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const workflowAbort = createAbortControllerMock()
|
||||
const conversationAbort = createAbortControllerMock()
|
||||
const suggestedAbort = createAbortControllerMock()
|
||||
|
||||
const config = { suggested_questions_after_answer: { enabled: true } }
|
||||
const onGetConversationMessages = vi.fn().mockImplementation(async (_id: string, setAbort: (ac: AbortController) => void) => {
|
||||
setAbort(conversationAbort)
|
||||
return {
|
||||
data: [{
|
||||
id: 'm-1',
|
||||
answer: 'a',
|
||||
message: [{ role: 'assistant', text: 'a' }],
|
||||
created_at: Date.now(),
|
||||
answer_tokens: 1,
|
||||
message_tokens: 1,
|
||||
provider_response_latency: 0.1,
|
||||
inputs: {},
|
||||
query: 'q',
|
||||
}],
|
||||
}
|
||||
})
|
||||
const onGetSuggestedQuestions = vi.fn().mockImplementation(async (_id: string, setAbort: (ac: AbortController) => void) => {
|
||||
setAbort(suggestedAbort)
|
||||
return { data: [] }
|
||||
})
|
||||
|
||||
const { result, unmount } = renderHook(() => useChat(config as ChatConfig))
|
||||
|
||||
act(() => {
|
||||
result.current.handleSend('test-url', { query: 'unmount' }, {
|
||||
onGetConversationMessages,
|
||||
onGetSuggestedQuestions,
|
||||
})
|
||||
})
|
||||
act(() => {
|
||||
callbacks.getAbortController(workflowAbort)
|
||||
})
|
||||
|
||||
unmount()
|
||||
|
||||
expect(workflowAbort.abort).toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('annotations and siblings', () => {
|
||||
const prevChatTree = [{
|
||||
id: 'q-1',
|
||||
|
||||
@ -143,6 +143,67 @@ describe('Answer Component', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('ContentSwitch visibility while responding', () => {
|
||||
it('should hide ContentSwitch when responding in non-human-inputs layout', () => {
|
||||
render(
|
||||
<Answer
|
||||
{...defaultProps}
|
||||
responding={true}
|
||||
item={{
|
||||
...defaultProps.item,
|
||||
siblingCount: 3,
|
||||
siblingIndex: 1,
|
||||
prevSibling: 'msg-0',
|
||||
nextSibling: 'msg-2',
|
||||
} as unknown as ChatItem}
|
||||
switchSibling={vi.fn()}
|
||||
/>,
|
||||
)
|
||||
expect(screen.queryByRole('button', { name: 'Previous' })).not.toBeInTheDocument()
|
||||
expect(screen.queryByRole('button', { name: 'Next' })).not.toBeInTheDocument()
|
||||
})
|
||||
|
||||
it('should hide ContentSwitch when responding in human-inputs layout with content', () => {
|
||||
render(
|
||||
<Answer
|
||||
{...defaultProps}
|
||||
responding={true}
|
||||
item={{
|
||||
...defaultProps.item,
|
||||
content: 'partial response',
|
||||
siblingCount: 3,
|
||||
siblingIndex: 1,
|
||||
prevSibling: 'msg-0',
|
||||
nextSibling: 'msg-2',
|
||||
humanInputFormDataList: [{ id: 'form1' }],
|
||||
} as unknown as ChatItem}
|
||||
switchSibling={vi.fn()}
|
||||
/>,
|
||||
)
|
||||
expect(screen.queryByRole('button', { name: 'Previous' })).not.toBeInTheDocument()
|
||||
expect(screen.queryByRole('button', { name: 'Next' })).not.toBeInTheDocument()
|
||||
})
|
||||
|
||||
it('should show ContentSwitch when not responding with siblings', () => {
|
||||
render(
|
||||
<Answer
|
||||
{...defaultProps}
|
||||
responding={false}
|
||||
item={{
|
||||
...defaultProps.item,
|
||||
siblingCount: 3,
|
||||
siblingIndex: 1,
|
||||
prevSibling: 'msg-0',
|
||||
nextSibling: 'msg-2',
|
||||
} as unknown as ChatItem}
|
||||
switchSibling={vi.fn()}
|
||||
/>,
|
||||
)
|
||||
expect(screen.getByRole('button', { name: 'Previous' })).toBeInTheDocument()
|
||||
expect(screen.getByRole('button', { name: 'Next' })).toBeInTheDocument()
|
||||
})
|
||||
})
|
||||
|
||||
describe('Interactions', () => {
|
||||
it('should handle switch sibling', () => {
|
||||
const mockSwitchSibling = vi.fn()
|
||||
|
||||
@ -303,6 +303,7 @@ const Answer: FC<AnswerProps> = ({
|
||||
{
|
||||
typeof item.siblingCount === 'number'
|
||||
&& item.siblingCount > 1
|
||||
&& !responding
|
||||
&& (
|
||||
<ContentSwitch
|
||||
count={item.siblingCount}
|
||||
@ -408,7 +409,9 @@ const Answer: FC<AnswerProps> = ({
|
||||
}
|
||||
{
|
||||
typeof item.siblingCount === 'number'
|
||||
&& item.siblingCount > 1 && (
|
||||
&& item.siblingCount > 1
|
||||
&& !responding
|
||||
&& (
|
||||
<ContentSwitch
|
||||
count={item.siblingCount}
|
||||
currentIndex={item.siblingIndex}
|
||||
|
||||
@ -209,6 +209,19 @@ export const useChat = (
|
||||
cb?.()
|
||||
}, [handleStop])
|
||||
|
||||
const abortInflightRequests = useCallback(() => {
|
||||
conversationMessagesAbortControllerRef.current?.abort()
|
||||
suggestedQuestionsAbortControllerRef.current?.abort()
|
||||
workflowEventsAbortControllerRef.current?.abort()
|
||||
}, [])
|
||||
|
||||
// Abort all in-flight fetch/SSE requests when the consumer unmounts
|
||||
useEffect(() => {
|
||||
return () => {
|
||||
abortInflightRequests()
|
||||
}
|
||||
}, [abortInflightRequests])
|
||||
|
||||
const createAudioPlayerManager = useCallback(() => {
|
||||
let ttsUrl = ''
|
||||
let ttsIsPublic = false
|
||||
@ -244,7 +257,8 @@ export const useChat = (
|
||||
}: SendCallback,
|
||||
) => {
|
||||
const getOrCreatePlayer = createAudioPlayerManager()
|
||||
// Re-subscribe to workflow events for the specific message
|
||||
// Re-subscribe to workflow events for the specific message.
|
||||
// so all retained events are replayed on reconnection (e.g. page refresh).
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
|
||||
|
||||
const otherOptions: IOtherOptions = {
|
||||
@ -583,6 +597,320 @@ export const useChat = (
|
||||
)
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
||||
|
||||
const handleReconnect = useCallback((
|
||||
messageId: string,
|
||||
workflowRunId: string,
|
||||
{
|
||||
onGetSuggestedQuestions,
|
||||
onConversationComplete,
|
||||
isPublicAPI,
|
||||
}: SendCallback,
|
||||
) => {
|
||||
const getOrCreatePlayer = createAudioPlayerManager()
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
|
||||
|
||||
const otherOptions: IOtherOptions = {
|
||||
isPublicAPI,
|
||||
getAbortController: (abortController) => {
|
||||
workflowEventsAbortControllerRef.current = abortController
|
||||
},
|
||||
onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId: msgId, taskId }: IOnDataMoreInfo) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0
|
||||
if (!isAgentMode) {
|
||||
responseItem.content = responseItem.content + message
|
||||
}
|
||||
else {
|
||||
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
|
||||
if (lastThought)
|
||||
lastThought.thought = lastThought.thought + message
|
||||
}
|
||||
if (msgId)
|
||||
responseItem.id = msgId
|
||||
})
|
||||
|
||||
if (isFirstMessage && newConversationId)
|
||||
conversationIdRef.current = newConversationId
|
||||
|
||||
if (taskId)
|
||||
taskIdRef.current = taskId
|
||||
},
|
||||
async onCompleted(hasError?: boolean) {
|
||||
handleResponding(false)
|
||||
|
||||
if (hasError)
|
||||
return
|
||||
|
||||
if (onConversationComplete)
|
||||
onConversationComplete(conversationIdRef.current)
|
||||
|
||||
if (config?.suggested_questions_after_answer?.enabled && !hasStopRespondedRef.current && onGetSuggestedQuestions) {
|
||||
try {
|
||||
const { data }: any = await onGetSuggestedQuestions(
|
||||
messageId,
|
||||
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
|
||||
)
|
||||
setSuggestedQuestions(data)
|
||||
}
|
||||
catch {
|
||||
setSuggestedQuestions([])
|
||||
}
|
||||
}
|
||||
},
|
||||
onFile(file) {
|
||||
const fileType = (file as { type?: string }).type || 'image'
|
||||
const baseFile = ('transferMethod' in file) ? (file as Partial<FileEntity>) : null
|
||||
const convertedFile: FileEntity = {
|
||||
id: baseFile?.id || (file as { id: string }).id,
|
||||
type: baseFile?.type || (fileType === 'image' ? 'image/png' : fileType === 'video' ? 'video/mp4' : fileType === 'audio' ? 'audio/mpeg' : 'application/octet-stream'),
|
||||
transferMethod: (baseFile?.transferMethod as FileEntity['transferMethod']) || (fileType === 'image' ? 'remote_url' : 'local_file'),
|
||||
uploadedId: baseFile?.uploadedId || (file as { id: string }).id,
|
||||
supportFileType: baseFile?.supportFileType || (fileType === 'image' ? 'image' : fileType === 'video' ? 'video' : fileType === 'audio' ? 'audio' : 'document'),
|
||||
progress: baseFile?.progress ?? 100,
|
||||
name: baseFile?.name || `generated_${fileType}.${fileType === 'image' ? 'png' : fileType === 'video' ? 'mp4' : fileType === 'audio' ? 'mp3' : 'bin'}`,
|
||||
url: baseFile?.url || (file as { url?: string }).url,
|
||||
size: baseFile?.size ?? 0,
|
||||
}
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
|
||||
if (lastThought) {
|
||||
responseItem.agent_thoughts!.at(-1)!.message_files = [...(lastThought as any).message_files, convertedFile]
|
||||
}
|
||||
else {
|
||||
const currentFiles = (responseItem.message_files as FileEntity[] | undefined) ?? []
|
||||
responseItem.message_files = [...currentFiles, convertedFile]
|
||||
}
|
||||
})
|
||||
},
|
||||
onThought(thought) {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (thought.message_id)
|
||||
responseItem.id = thought.message_id
|
||||
if (thought.conversation_id)
|
||||
responseItem.conversationId = thought.conversation_id
|
||||
if (!responseItem.agent_thoughts)
|
||||
responseItem.agent_thoughts = []
|
||||
if (responseItem.agent_thoughts.length === 0) {
|
||||
responseItem.agent_thoughts.push(thought)
|
||||
}
|
||||
else {
|
||||
const lastThought = responseItem.agent_thoughts.at(-1)
|
||||
if (lastThought?.id === thought.id) {
|
||||
thought.thought = lastThought.thought
|
||||
thought.message_files = lastThought.message_files
|
||||
responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought
|
||||
}
|
||||
else {
|
||||
responseItem.agent_thoughts.push(thought)
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
onMessageEnd: (messageEnd) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (messageEnd.metadata?.annotation_reply) {
|
||||
responseItem.annotation = ({
|
||||
id: messageEnd.metadata.annotation_reply.id,
|
||||
authorName: messageEnd.metadata.annotation_reply.account.name,
|
||||
})
|
||||
return
|
||||
}
|
||||
responseItem.citation = messageEnd.metadata?.retriever_resources || []
|
||||
const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || [])
|
||||
responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id')
|
||||
})
|
||||
},
|
||||
onMessageReplace: (messageReplace) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
responseItem.content = messageReplace.answer
|
||||
})
|
||||
},
|
||||
onError() {
|
||||
handleResponding(false)
|
||||
},
|
||||
onWorkflowStarted: ({ workflow_run_id, task_id }) => {
|
||||
handleResponding(true)
|
||||
hasStopRespondedRef.current = false
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
taskIdRef.current = task_id
|
||||
responseItem.content = ''
|
||||
responseItem.workflow_run_id = workflow_run_id
|
||||
responseItem.workflowProcess = {
|
||||
status: WorkflowRunningStatus.Running,
|
||||
tracing: [],
|
||||
}
|
||||
})
|
||||
},
|
||||
onWorkflowFinished: ({ data: workflowFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (responseItem.workflowProcess)
|
||||
responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus
|
||||
})
|
||||
},
|
||||
onIterationStart: ({ data: iterationStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...iterationStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
})
|
||||
},
|
||||
onIterationFinish: ({ data: iterationFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
|
||||
})
|
||||
if (currentIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
|
||||
})
|
||||
},
|
||||
onTTSChunk: (msgId: string, audio: string) => {
|
||||
if (!audio || audio === '')
|
||||
return
|
||||
const audioPlayer = getOrCreatePlayer()
|
||||
if (audioPlayer) {
|
||||
audioPlayer.playAudioWithAudio(audio, true)
|
||||
AudioPlayerManager.getInstance().resetMsgId(msgId)
|
||||
}
|
||||
},
|
||||
onTTSEnd: (_msgId: string, audio: string) => {
|
||||
const audioPlayer = getOrCreatePlayer()
|
||||
if (audioPlayer)
|
||||
audioPlayer.playAudioWithAudio(audio, false)
|
||||
},
|
||||
onLoopStart: ({ data: loopStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...loopStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
})
|
||||
},
|
||||
onLoopFinish: ({ data: loopFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
onHumanInputRequired: ({ data: humanInputRequiredData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.humanInputFormDataList) {
|
||||
responseItem.humanInputFormDataList = [humanInputRequiredData]
|
||||
}
|
||||
else {
|
||||
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id)
|
||||
if (currentFormIndex > -1)
|
||||
responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData
|
||||
else
|
||||
responseItem.humanInputFormDataList.push(humanInputRequiredData)
|
||||
}
|
||||
if (responseItem.workflowProcess?.tracing) {
|
||||
const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id)
|
||||
if (currentTracingIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused
|
||||
}
|
||||
})
|
||||
},
|
||||
onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (responseItem.humanInputFormDataList?.length) {
|
||||
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id)
|
||||
if (currentFormIndex > -1)
|
||||
responseItem.humanInputFormDataList.splice(currentFormIndex, 1)
|
||||
}
|
||||
if (!responseItem.humanInputFilledFormDataList)
|
||||
responseItem.humanInputFilledFormDataList = [humanInputFilledFormData]
|
||||
else
|
||||
responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData)
|
||||
})
|
||||
},
|
||||
onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (responseItem.humanInputFormDataList?.length) {
|
||||
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id)
|
||||
responseItem.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time
|
||||
}
|
||||
})
|
||||
},
|
||||
onWorkflowPaused: ({ data: workflowPausedData }) => {
|
||||
const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events`
|
||||
pausedStateRef.current = true
|
||||
sseGet(resumeUrl, {}, otherOptions)
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
if (workflowEventsAbortControllerRef.current)
|
||||
workflowEventsAbortControllerRef.current.abort()
|
||||
|
||||
sseGet(url, {}, otherOptions)
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
||||
|
||||
const updateCurrentQAOnTree = useCallback(({
|
||||
parentId,
|
||||
responseItem,
|
||||
@ -917,6 +1245,8 @@ export const useChat = (
|
||||
})
|
||||
},
|
||||
onWorkflowStarted: ({ workflow_run_id, task_id, conversation_id, message_id }) => {
|
||||
handleResponding(true)
|
||||
hasStopRespondedRef.current = false
|
||||
// If there are no streaming messages, we still need to set the conversation_id to avoid create a new conversation when regeneration in chat-flow.
|
||||
if (conversation_id) {
|
||||
conversationIdRef.current = conversation_id
|
||||
@ -1275,6 +1605,7 @@ export const useChat = (
|
||||
setIsResponding,
|
||||
handleSend,
|
||||
handleResume,
|
||||
handleReconnect,
|
||||
handleSwitchSibling,
|
||||
suggestedQuestions,
|
||||
handleRestart,
|
||||
|
||||
@ -111,6 +111,7 @@ export type IChatItem = {
|
||||
agent_thoughts?: ThoughtItem[]
|
||||
message_files?: FileEntity[]
|
||||
workflow_run_id?: string
|
||||
created_at?: number
|
||||
// for agent log
|
||||
conversationId?: string
|
||||
input?: any
|
||||
|
||||
@ -177,6 +177,7 @@ const createUseChatReturn = (overrides: Partial<UseChatReturn> = {}): UseChatRet
|
||||
setTargetMessageId: vi.fn() as UseChatReturn['setTargetMessageId'],
|
||||
handleSend: vi.fn(),
|
||||
handleResume: vi.fn(),
|
||||
handleReconnect: vi.fn(),
|
||||
setIsResponding: vi.fn() as UseChatReturn['setIsResponding'],
|
||||
handleStop: vi.fn(),
|
||||
handleSwitchSibling: vi.fn(),
|
||||
@ -541,6 +542,52 @@ describe('EmbeddedChatbot chat-wrapper', () => {
|
||||
expect(handleSwitchSibling).toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should reconnect to a recent running workflow on mount', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue(createUseChatReturn({
|
||||
handleReconnect,
|
||||
}))
|
||||
vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
|
||||
appPrevChatList: [
|
||||
{
|
||||
id: 'running-node',
|
||||
isAnswer: true,
|
||||
content: 'partial',
|
||||
workflow_run_id: 'run-active',
|
||||
created_at: Math.floor(Date.now() / 1000) - 30,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
],
|
||||
}))
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).toHaveBeenCalledWith(
|
||||
'running-node',
|
||||
'run-active',
|
||||
expect.objectContaining({ isPublicAPI: true }),
|
||||
)
|
||||
})
|
||||
|
||||
it('should not reconnect to an old workflow beyond the retention window', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue(createUseChatReturn({
|
||||
handleReconnect,
|
||||
}))
|
||||
vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
|
||||
appPrevChatList: [
|
||||
{
|
||||
id: 'old-node',
|
||||
isAnswer: true,
|
||||
content: 'done',
|
||||
workflow_run_id: 'run-old',
|
||||
created_at: Math.floor(Date.now() / 1000) - 700,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
],
|
||||
}))
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should handle conversation completion and suggested questions in chat actions', async () => {
|
||||
const handleSend = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue(createUseChatReturn({
|
||||
|
||||
@ -834,6 +834,34 @@ describe('useEmbeddedChatbot', () => {
|
||||
const question = chatList.find((m: unknown) => (m as Record<string, unknown>).id === 'question-msg-no-files')
|
||||
expect(question).toBeDefined()
|
||||
})
|
||||
|
||||
it('should pass through workflow_run_id and created_at from message items', async () => {
|
||||
localStorage.setItem(CONVERSATION_ID_INFO, JSON.stringify({ 'app-1': { DEFAULT: 'conversation-1' } }))
|
||||
mockFetchConversations.mockResolvedValue(
|
||||
createConversationData({ data: [createConversationItem({ id: 'conversation-1' })] }),
|
||||
)
|
||||
mockFetchChatList.mockResolvedValue({
|
||||
data: [{
|
||||
id: 'msg-wf',
|
||||
query: 'Running workflow',
|
||||
answer: 'Partial',
|
||||
workflow_run_id: 'wf-embedded-1',
|
||||
created_at: 1700000000,
|
||||
}],
|
||||
})
|
||||
|
||||
const { result } = await renderWithClient(() => useEmbeddedChatbot(AppSourceType.webApp))
|
||||
await waitFor(() => expect(result.current.appPrevChatList.length).toBeGreaterThan(0), { timeout: 3000 })
|
||||
|
||||
const questionNode = result.current.appPrevChatList.find(
|
||||
(m: unknown) => (m as Record<string, unknown>).id === 'question-msg-wf',
|
||||
) as Record<string, unknown> | undefined
|
||||
expect(questionNode).toBeDefined()
|
||||
const answerNode = (questionNode!.children as Record<string, unknown>[])?.[0]
|
||||
expect(answerNode).toBeDefined()
|
||||
expect(answerNode!.workflow_run_id).toBe('wf-embedded-1')
|
||||
expect(answerNode!.created_at).toBe(1700000000)
|
||||
})
|
||||
})
|
||||
|
||||
describe('currentConversationItem from pinned list', () => {
|
||||
|
||||
@ -85,6 +85,7 @@ const ChatWrapper = () => {
|
||||
handleSend,
|
||||
handleStop,
|
||||
handleSwitchSibling,
|
||||
handleReconnect,
|
||||
isResponding: respondingState,
|
||||
suggestedQuestions,
|
||||
} = useChat(
|
||||
@ -142,37 +143,47 @@ const ChatWrapper = () => {
|
||||
setIsResponding(respondingState)
|
||||
}, [respondingState, setIsResponding])
|
||||
|
||||
// Resume paused workflows when chat history is loaded
|
||||
// Resume paused workflows or reconnect to running workflows when chat history is loaded
|
||||
useEffect(() => {
|
||||
if (!appPrevChatList || appPrevChatList.length === 0)
|
||||
return
|
||||
|
||||
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
// DFS: recurse to children first
|
||||
if (node.children && node.children.length > 0)
|
||||
findLastPausedWorkflow(node.children)
|
||||
const STREAM_RETENTION_SECONDS = 600
|
||||
|
||||
// Track the last node with humanInputFormDataList
|
||||
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
|
||||
lastPausedNode = node
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
let lastRunningNode: ChatItemInTree | undefined
|
||||
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
if (node.isAnswer && node.workflow_run_id) {
|
||||
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
|
||||
lastPausedNode = node
|
||||
}
|
||||
else if (
|
||||
node.created_at
|
||||
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
|
||||
) {
|
||||
lastRunningNode = node
|
||||
}
|
||||
}
|
||||
|
||||
if (node.children && node.children.length > 0)
|
||||
findReconnectableWorkflow(node.children)
|
||||
})
|
||||
}
|
||||
|
||||
findLastPausedWorkflow(appPrevChatList)
|
||||
findReconnectableWorkflow(appPrevChatList)
|
||||
|
||||
const callbacks = {
|
||||
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
}
|
||||
|
||||
// Only resume the last paused workflow
|
||||
if (lastPausedNode) {
|
||||
handleSwitchSibling(
|
||||
lastPausedNode.id,
|
||||
{
|
||||
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
},
|
||||
)
|
||||
handleSwitchSibling(lastPausedNode.id, callbacks)
|
||||
}
|
||||
else if (lastRunningNode) {
|
||||
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
|
||||
}
|
||||
}, [])
|
||||
|
||||
|
||||
@ -42,6 +42,8 @@ function getFormattedChatList(messages: any[]) {
|
||||
citation: item.retriever_resources,
|
||||
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
|
||||
parentMessageId: `question-${item.id}`,
|
||||
workflow_run_id: item.workflow_run_id,
|
||||
created_at: item.created_at,
|
||||
})
|
||||
})
|
||||
return newChatList
|
||||
|
||||
Reference in New Issue
Block a user