Compare commits

...

18 Commits

Author SHA1 Message Date
73a1dc293f [autofix.ci] apply automated fixes 2026-04-21 10:31:40 +00:00
26c00ffd85 Merge branch 'main' into feat/replayable-stream 2026-04-21 18:27:03 +08:00
33db927327 remove stale comments 2026-04-21 18:23:51 +08:00
d226a5bdd3 rollback unnecessary changes 2026-04-21 18:23:51 +08:00
2c5a9ec1a0 [autofix.ci] apply automated fixes 2026-04-21 09:24:45 +00:00
b59d8d3446 resolve conflicts 2026-04-21 17:19:48 +08:00
87bbcc4e3b improve workflow stop event handling 2026-04-21 17:18:11 +08:00
91008c0329 Merge remote-tracking branch 'upstream/main' into feat/replayable-stream 2026-04-21 16:05:26 +08:00
49e12984c3 remove replay 2026-04-21 15:52:02 +08:00
34c46561e9 fix(eslint): update suppression count for ts/no-explicit-any rule 2026-04-20 18:17:16 +08:00
a25e3b66c1 test(chat): enhance unit tests for chat hooks and answer component visibility 2026-04-20 17:52:45 +08:00
45b0751826 fix(chat): add abort mechanism for in-flight requests on unmount 2026-04-20 17:17:42 +08:00
ea58415f88 fix(chat): update conversation handling and improve response logic 2026-04-17 16:22:22 +08:00
73ff36cb0e refactor: remove replay parameter from workflow event URL 2026-04-17 14:18:14 +08:00
6afdde1bc4 remove replay capability 2026-04-17 11:58:21 +08:00
c294006ecf fix unit tests 2026-04-17 11:10:11 +08:00
53c0dde3d5 [autofix.ci] apply automated fixes 2026-04-16 09:45:48 +00:00
a1759fccb8 feat: replayable message 2026-04-16 17:41:21 +08:00
24 changed files with 923 additions and 57 deletions

View File

@ -84,6 +84,9 @@ class WorkflowEventsApi(WebApiResource):
def _generate_stream_events():
if include_state_snapshot:
# TODO(wylswz): events between snapshot and live tail may be lost.
# TODO(wylswz): previous message chunks are not replayed. In order to support replay, we need
# to figure out a way to deduplicate events between snapshot and stream.
return generator.convert_to_event_stream(
build_workflow_event_stream(
app_mode=app_mode,

View File

@ -311,7 +311,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout=300,
idle_timeout: float = 300,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)

View File

@ -23,7 +23,7 @@ class MessageGenerator:
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout=300,
idle_timeout: float = 300,
ping_interval: float = 10.0,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:

View File

@ -27,6 +27,9 @@ def stream_topic_events(
terminal_values = _normalize_terminal_events(terminal_events)
last_msg_time = time.time()
last_ping_time = last_msg_time
# The application layer intentionally does not use broadcast-channel replay;
# callers that need historical events should compose them from persisted state
# (see ``build_workflow_event_stream``) and then tail the live stream.
with topic.subscribe() as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.

View File

@ -410,7 +410,7 @@ class WorkflowBasedAppRunner:
elif isinstance(event, GraphRunFailedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, GraphRunAbortedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0))
self._publish_event(QueueWorkflowPartialSuccessEvent(outputs={}, exceptions_count=0))
elif isinstance(event, GraphRunPausedEvent):
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()

View File

@ -58,6 +58,7 @@ class MessageListItem(ResponseModel):
message_files: list[MessageFile]
status: str
error: str | None = None
workflow_run_id: str | None = None
extra_contents: list[ExecutionExtraContentDomainModel]
@field_validator("inputs", mode="before")

View File

@ -76,7 +76,6 @@ class _StreamsSubscription(Subscription):
# reading and writing the _listener / `_closed` attribute.
self._lock = threading.Lock()
self._closed: bool = False
# self._closed = threading.Event()
self._listener: threading.Thread | None = None
def _listen(self) -> None:

View File

@ -37,10 +37,13 @@ class AppTaskService:
Returns:
None
"""
# Legacy mechanism: Set stop flag in Redis
AppQueueManager.set_stop_flag(task_id, invoke_from, user_id)
# New mechanism: Send stop command via GraphEngine for workflow-based apps
# This ensures proper workflow status recording in the persistence layer
if app_mode in (AppMode.ADVANCED_CHAT, AppMode.WORKFLOW):
# Let the event handler process the GraphRun abort event instead of
# stopping the queue listener immediately. Otherwise, events may be
# lost and the workflow run can remain stuck in the running state.
GraphEngineManager(redis_client).send_stop_command(task_id)
else:
# Legacy mechanism: Set stop flag in Redis
AppQueueManager.set_stop_flag(task_id, invoke_from, user_id)

View File

@ -62,6 +62,19 @@ def build_workflow_event_stream(
idle_timeout: float = 300,
ping_interval: float = 10.0,
) -> Generator[Mapping[str, Any] | str, None, None]:
"""Yield a stream of workflow events composed of a DB-derived snapshot followed by live tail.
The stream is assembled in two phases that are kept **structurally disjoint** so no
per-event deduplication is required:
1. Snapshot phase: events rebuilt from persistent state via ``_build_snapshot_events``
(``workflow_started``, optional ``message_replace``, ``node_started``/``node_finished``
for each persisted execution, and an optional terminal ``workflow_paused``). This
represents the history that already happened from the client's point of view.
2. Tail phase: events delivered by the broadcast subscription **from the moment of
subscription onward only**. Anything that was published before the subscription was
established is ignored here, since message ids are not tracked in the current implementation.
"""
topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
node_execution_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(session_maker)

View File

@ -50,6 +50,7 @@ def make_message():
msg.user_feedback = MagicMock(rating=None)
msg.status = "normal"
msg.error = None
msg.workflow_run_id = "22222222-2222-2222-2222-222222222222"
return msg
@ -84,6 +85,8 @@ class TestMessageListApi:
assert result["limit"] == 20
assert result["has_more"] is False
assert len(result["data"]) == 2
assert result["data"][0]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
assert result["data"][1]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
def test_get_not_chat_app(self):
api = module.MessageListApi()

View File

@ -991,7 +991,7 @@
"count": 2
},
"ts/no-explicit-any": {
"count": 17
"count": 20
}
},
"web/app/components/base/chat/chat/index.tsx": {

View File

@ -125,6 +125,7 @@ const defaultChatHookReturn: Partial<ChatHookReturn> = {
handleSend: vi.fn(),
handleStop: vi.fn(),
handleSwitchSibling: vi.fn(),
handleReconnect: vi.fn(),
isResponding: false,
suggestedQuestions: [],
}
@ -605,6 +606,95 @@ describe('ChatWrapper', () => {
expect(handleSwitchSibling).not.toHaveBeenCalled()
})
it('should reconnect to a recent running workflow on mount', () => {
const handleReconnect = vi.fn()
vi.mocked(useChat).mockReturnValue({
...defaultChatHookReturn,
chatList: [],
handleReconnect,
} as unknown as ChatHookReturn)
vi.mocked(useChatWithHistoryContext).mockReturnValue({
...defaultContextValue,
appPrevChatTree: [{
id: 'running-answer',
isAnswer: true,
content: 'partial content',
workflow_run_id: 'run-active',
created_at: Math.floor(Date.now() / 1000) - 30,
children: [],
} as unknown as ChatItemInTree],
})
render(<ChatWrapper />)
expect(handleReconnect).toHaveBeenCalledWith(
'running-answer',
'run-active',
expect.objectContaining({ isPublicAPI: true }),
)
})
it('should not reconnect to an old workflow beyond the retention window', () => {
const handleReconnect = vi.fn()
vi.mocked(useChat).mockReturnValue({
...defaultChatHookReturn,
chatList: [],
handleReconnect,
} as unknown as ChatHookReturn)
vi.mocked(useChatWithHistoryContext).mockReturnValue({
...defaultContextValue,
appPrevChatTree: [{
id: 'old-answer',
isAnswer: true,
content: 'old content',
workflow_run_id: 'run-old',
created_at: Math.floor(Date.now() / 1000) - 700,
children: [],
} as unknown as ChatItemInTree],
})
render(<ChatWrapper />)
expect(handleReconnect).not.toHaveBeenCalled()
})
it('should prefer paused workflow over running workflow for reconnection', () => {
const handleSwitchSibling = vi.fn()
const handleReconnect = vi.fn()
vi.mocked(useChat).mockReturnValue({
...defaultChatHookReturn,
chatList: [],
handleSwitchSibling,
handleReconnect,
} as unknown as ChatHookReturn)
vi.mocked(useChatWithHistoryContext).mockReturnValue({
...defaultContextValue,
appPrevChatTree: [
{
id: 'running-answer',
isAnswer: true,
content: '',
workflow_run_id: 'run-active',
created_at: Math.floor(Date.now() / 1000) - 10,
children: [],
} as unknown as ChatItemInTree,
{
id: 'paused-answer',
isAnswer: true,
content: '',
workflow_run_id: 'run-paused',
humanInputFormDataList: [{ node_id: 'n-1' }],
children: [],
} as unknown as ChatItemInTree,
],
})
render(<ChatWrapper />)
expect(handleSwitchSibling).toHaveBeenCalledWith('paused-answer', expect.any(Object))
expect(handleReconnect).not.toHaveBeenCalled()
})
it('should call stopChatMessageResponding when handleStop is triggered', () => {
const handleStop = vi.fn()
vi.mocked(useChat).mockReturnValue({

View File

@ -1836,6 +1836,82 @@ describe('useChatWithHistory', () => {
expect(messageWithFiles?.children?.[0]?.message_files).toHaveLength(1)
expect(messageWithFiles?.children?.[0]?.agent_thoughts?.[0]?.message_files).toHaveLength(1)
})
it('should pass through workflow_run_id from item and created_at', async () => {
const listData = createConversationData({
data: [createConversationItem({ id: 'conversation-1' })],
})
mockFetchConversations.mockResolvedValue(listData)
mockFetchChatList.mockResolvedValue({
data: [
{
id: 'msg-running',
query: 'Running query',
answer: 'Running answer',
message_files: [],
feedback: null,
retriever_resources: [],
agent_thoughts: null,
parent_message_id: null,
inputs: {},
status: 'normal',
workflow_run_id: 'wf-direct-id',
created_at: 1700000000,
},
],
})
const { result } = await renderWithClient(() => useChatWithHistory())
await waitFor(() => {
expect(result!.current.appPrevChatTree.length).toBeGreaterThan(0)
})
const answerNode = result!.current.appPrevChatTree[0]?.children?.[0]
expect(answerNode?.workflow_run_id).toBe('wf-direct-id')
expect(answerNode?.created_at).toBe(1700000000)
})
it('should prefer item.workflow_run_id over extra_contents workflow_run_id', async () => {
const listData = createConversationData({
data: [createConversationItem({ id: 'conversation-1' })],
})
mockFetchConversations.mockResolvedValue(listData)
mockFetchChatList.mockResolvedValue({
data: [
{
id: 'msg-both',
query: 'Both query',
answer: 'Both answer',
message_files: [],
feedback: null,
retriever_resources: [],
agent_thoughts: null,
parent_message_id: null,
inputs: {},
status: 'paused',
workflow_run_id: 'wf-item-level',
extra_contents: [
{
type: 'human_input',
submitted: false,
form_definition: { fields: [] },
workflow_run_id: 'wf-extra-level',
},
],
},
],
})
const { result } = await renderWithClient(() => useChatWithHistory())
await waitFor(() => {
expect(result!.current.appPrevChatTree.length).toBeGreaterThan(0)
})
const answerNode = result!.current.appPrevChatTree[0]?.children?.[0]
expect(answerNode?.workflow_run_id).toBe('wf-item-level')
})
})
// Scenario: newConversation merge replaces existing conversation item when id already exists.

View File

@ -79,6 +79,7 @@ const ChatWrapper = () => {
handleSend,
handleStop,
handleSwitchSibling,
handleReconnect,
isResponding: respondingState,
suggestedQuestions,
} = useChat(
@ -140,37 +141,47 @@ const ChatWrapper = () => {
setIsResponding(respondingState)
}, [respondingState, setIsResponding])
// Resume paused workflows when chat history is loaded
// Resume paused workflows or reconnect to running workflows when chat history is loaded
useEffect(() => {
if (!appPrevChatTree || appPrevChatTree.length === 0)
return
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
let lastPausedNode: ChatItemInTree | undefined
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
// DFS: recurse to children first
if (node.children && node.children.length > 0)
findLastPausedWorkflow(node.children)
const STREAM_RETENTION_SECONDS = 600
// Track the last node with humanInputFormDataList
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
lastPausedNode = node
let lastPausedNode: ChatItemInTree | undefined
let lastRunningNode: ChatItemInTree | undefined
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
if (node.isAnswer && node.workflow_run_id) {
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
lastPausedNode = node
}
else if (
node.created_at
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
) {
lastRunningNode = node
}
}
if (node.children && node.children.length > 0)
findReconnectableWorkflow(node.children)
})
}
findLastPausedWorkflow(appPrevChatTree)
findReconnectableWorkflow(appPrevChatTree)
const callbacks = {
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
}
// Only resume the last paused workflow
if (lastPausedNode) {
handleSwitchSibling(
lastPausedNode.id,
{
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
},
)
handleSwitchSibling(lastPausedNode.id, callbacks)
}
else if (lastRunningNode) {
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
}
}, [])

View File

@ -35,12 +35,12 @@ function getFormattedChatList(messages: any[]) {
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
const humanInputFormDataList: HumanInputFormData[] = []
const humanInputFilledFormDataList: HumanInputFilledFormData[] = []
let workflowRunId = ''
let workflowRunIdFromExtra = ''
if (item.status === 'paused') {
item.extra_contents?.forEach((content: ExtraContent) => {
if (content.type === 'human_input' && !content.submitted) {
humanInputFormDataList.push(content.form_definition)
workflowRunId = content.workflow_run_id
workflowRunIdFromExtra = content.workflow_run_id
}
})
}
@ -62,7 +62,8 @@ function getFormattedChatList(messages: any[]) {
parentMessageId: `question-${item.id}`,
humanInputFormDataList,
humanInputFilledFormDataList,
workflow_run_id: workflowRunId,
workflow_run_id: item.workflow_run_id || workflowRunIdFromExtra,
created_at: item.created_at,
})
})
return newChatList
@ -437,7 +438,7 @@ export const useChatWithHistory = (installedAppInfo?: InstalledApp) => {
if (conversationId === currentConversationId)
handleNewConversation()
handleUpdateConversationList()
}, [isInstalledApp, appId, t, handleUpdateConversationList, handleNewConversation, currentConversationId, conversationDeleting])
}, [conversationDeleting, currentConversationId, handleNewConversation, handleUpdateConversationList, appSourceType, appId, t])
const [conversationRenaming, setConversationRenaming] = useState(false)
const handleRenameConversation = useCallback(async (conversationId: string, newName: string, { onSuccess }: Callback) => {
if (conversationRenaming)
@ -463,7 +464,7 @@ export const useChatWithHistory = (installedAppInfo?: InstalledApp) => {
finally {
setConversationRenaming(false)
}
}, [isInstalledApp, appId, t, conversationRenaming, originConversationList])
}, [conversationRenaming, t, appSourceType, appId, originConversationList])
const handleNewConversationCompleted = useCallback((newConversationId: string) => {
setNewConversationId(newConversationId)
handleConversationIdInfoChange(newConversationId)

View File

@ -1264,6 +1264,102 @@ describe('useChat', () => {
})
})
describe('handleReconnect', () => {
it('should call sseGet with include_state_snapshot and rebuild from snapshot events', () => {
let callbacks: HookCallbacks
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const prevChatTree = [{
id: 'q-1',
content: 'query',
isAnswer: false,
children: [{
id: 'm-reconnect',
content: 'stale partial content',
isAnswer: true,
siblingIndex: 0,
workflowProcess: { status: 'running', tracing: [{ node_id: 'old-node' }] },
}],
}]
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
act(() => {
result.current.handleReconnect('m-reconnect', 'wr-reconnect', { isPublicAPI: true })
})
expect(sseGet).toHaveBeenCalledWith(
'/workflow/wr-reconnect/events?include_state_snapshot=true',
expect.any(Object),
expect.any(Object),
)
// Content is not reset until onWorkflowStarted fires
const beforeStart = result.current.chatList[1]
expect(beforeStart.content).toBe('stale partial content')
act(() => {
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-reconnect', task_id: 't-1' })
})
// After onWorkflowStarted, content is reset and workflowProcess is fresh
const afterStart = result.current.chatList[1]
expect(afterStart.content).toBe('')
expect(afterStart.workflowProcess).toEqual({ status: WorkflowRunningStatus.Running, tracing: [] })
act(() => {
callbacks.onMessageReplace({ answer: 'full snapshot text' })
callbacks.onNodeStarted({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1' } })
callbacks.onNodeFinished({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1', status: 'succeeded' } })
callbacks.onWorkflowFinished({ data: { status: 'succeeded' } })
callbacks.onCompleted()
})
const lastResponse = result.current.chatList[1]
expect(lastResponse.content).toBe('full snapshot text')
expect(lastResponse.workflowProcess?.status).toBe('succeeded')
expect(lastResponse.workflowProcess?.tracing).toHaveLength(1)
expect(result.current.isResponding).toBe(false)
})
it('should abort previous stream when reconnecting again', () => {
const callbacksList: HookCallbacks[] = []
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
callbacksList.push(options as HookCallbacks)
})
const prevChatTree = [{
id: 'q-1',
content: 'query',
isAnswer: false,
children: [{
id: 'm-rc',
content: 'partial',
isAnswer: true,
siblingIndex: 0,
}],
}]
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
const previousAbort = createAbortControllerMock()
act(() => {
result.current.handleReconnect('m-rc', 'wr-1', { isPublicAPI: true })
})
act(() => {
callbacksList[0].getAbortController(previousAbort)
})
act(() => {
result.current.handleReconnect('m-rc', 'wr-2', { isPublicAPI: true })
})
expect(previousAbort.abort).toHaveBeenCalledTimes(1)
})
})
describe('createAudioPlayerManager branch cases', () => {
it('should handle ttsUrl generation for appId with installed apps', async () => {
vi.mocked(usePathname).mockReturnValue('/explore/installed/app')
@ -1444,6 +1540,89 @@ describe('useChat', () => {
})
})
describe('onWorkflowStarted re-enables responding in handleSend', () => {
it('should set isResponding back to true when onWorkflowStarted fires after stop', () => {
let callbacks: HookCallbacks
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const stopChat = vi.fn()
const { result } = renderHook(() => useChat(undefined, undefined, undefined, stopChat))
act(() => {
result.current.handleSend('test-url', { query: 'workflow restart' }, {})
})
expect(result.current.isResponding).toBe(true)
act(() => {
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
callbacks.onData('part', true, { messageId: 'm-1', conversationId: 'c-1', taskId: 't-1' })
})
act(() => {
result.current.handleStop()
})
expect(result.current.isResponding).toBe(false)
act(() => {
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-2', task_id: 't-2' })
})
expect(result.current.isResponding).toBe(true)
})
})
describe('abortInflightRequests on unmount', () => {
it('should abort all in-flight requests when the hook unmounts', () => {
let callbacks: HookCallbacks
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const workflowAbort = createAbortControllerMock()
const conversationAbort = createAbortControllerMock()
const suggestedAbort = createAbortControllerMock()
const config = { suggested_questions_after_answer: { enabled: true } }
const onGetConversationMessages = vi.fn().mockImplementation(async (_id: string, setAbort: (ac: AbortController) => void) => {
setAbort(conversationAbort)
return {
data: [{
id: 'm-1',
answer: 'a',
message: [{ role: 'assistant', text: 'a' }],
created_at: Date.now(),
answer_tokens: 1,
message_tokens: 1,
provider_response_latency: 0.1,
inputs: {},
query: 'q',
}],
}
})
const onGetSuggestedQuestions = vi.fn().mockImplementation(async (_id: string, setAbort: (ac: AbortController) => void) => {
setAbort(suggestedAbort)
return { data: [] }
})
const { result, unmount } = renderHook(() => useChat(config as ChatConfig))
act(() => {
result.current.handleSend('test-url', { query: 'unmount' }, {
onGetConversationMessages,
onGetSuggestedQuestions,
})
})
act(() => {
callbacks.getAbortController(workflowAbort)
})
unmount()
expect(workflowAbort.abort).toHaveBeenCalled()
})
})
describe('annotations and siblings', () => {
const prevChatTree = [{
id: 'q-1',

View File

@ -143,6 +143,67 @@ describe('Answer Component', () => {
})
})
describe('ContentSwitch visibility while responding', () => {
it('should hide ContentSwitch when responding in non-human-inputs layout', () => {
render(
<Answer
{...defaultProps}
responding={true}
item={{
...defaultProps.item,
siblingCount: 3,
siblingIndex: 1,
prevSibling: 'msg-0',
nextSibling: 'msg-2',
} as unknown as ChatItem}
switchSibling={vi.fn()}
/>,
)
expect(screen.queryByRole('button', { name: 'Previous' })).not.toBeInTheDocument()
expect(screen.queryByRole('button', { name: 'Next' })).not.toBeInTheDocument()
})
it('should hide ContentSwitch when responding in human-inputs layout with content', () => {
render(
<Answer
{...defaultProps}
responding={true}
item={{
...defaultProps.item,
content: 'partial response',
siblingCount: 3,
siblingIndex: 1,
prevSibling: 'msg-0',
nextSibling: 'msg-2',
humanInputFormDataList: [{ id: 'form1' }],
} as unknown as ChatItem}
switchSibling={vi.fn()}
/>,
)
expect(screen.queryByRole('button', { name: 'Previous' })).not.toBeInTheDocument()
expect(screen.queryByRole('button', { name: 'Next' })).not.toBeInTheDocument()
})
it('should show ContentSwitch when not responding with siblings', () => {
render(
<Answer
{...defaultProps}
responding={false}
item={{
...defaultProps.item,
siblingCount: 3,
siblingIndex: 1,
prevSibling: 'msg-0',
nextSibling: 'msg-2',
} as unknown as ChatItem}
switchSibling={vi.fn()}
/>,
)
expect(screen.getByRole('button', { name: 'Previous' })).toBeInTheDocument()
expect(screen.getByRole('button', { name: 'Next' })).toBeInTheDocument()
})
})
describe('Interactions', () => {
it('should handle switch sibling', () => {
const mockSwitchSibling = vi.fn()

View File

@ -303,6 +303,7 @@ const Answer: FC<AnswerProps> = ({
{
typeof item.siblingCount === 'number'
&& item.siblingCount > 1
&& !responding
&& (
<ContentSwitch
count={item.siblingCount}
@ -408,7 +409,9 @@ const Answer: FC<AnswerProps> = ({
}
{
typeof item.siblingCount === 'number'
&& item.siblingCount > 1 && (
&& item.siblingCount > 1
&& !responding
&& (
<ContentSwitch
count={item.siblingCount}
currentIndex={item.siblingIndex}

View File

@ -209,6 +209,19 @@ export const useChat = (
cb?.()
}, [handleStop])
const abortInflightRequests = useCallback(() => {
conversationMessagesAbortControllerRef.current?.abort()
suggestedQuestionsAbortControllerRef.current?.abort()
workflowEventsAbortControllerRef.current?.abort()
}, [])
// Abort all in-flight fetch/SSE requests when the consumer unmounts
useEffect(() => {
return () => {
abortInflightRequests()
}
}, [abortInflightRequests])
const createAudioPlayerManager = useCallback(() => {
let ttsUrl = ''
let ttsIsPublic = false
@ -244,7 +257,8 @@ export const useChat = (
}: SendCallback,
) => {
const getOrCreatePlayer = createAudioPlayerManager()
// Re-subscribe to workflow events for the specific message
// Re-subscribe to workflow events for the specific message,
// so all retained events are replayed on reconnection (e.g. page refresh).
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
const otherOptions: IOtherOptions = {
@ -583,6 +597,320 @@ export const useChat = (
)
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
const handleReconnect = useCallback((
messageId: string,
workflowRunId: string,
{
onGetSuggestedQuestions,
onConversationComplete,
isPublicAPI,
}: SendCallback,
) => {
const getOrCreatePlayer = createAudioPlayerManager()
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
const otherOptions: IOtherOptions = {
isPublicAPI,
getAbortController: (abortController) => {
workflowEventsAbortControllerRef.current = abortController
},
onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId: msgId, taskId }: IOnDataMoreInfo) => {
updateChatTreeNode(messageId, (responseItem) => {
const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0
if (!isAgentMode) {
responseItem.content = responseItem.content + message
}
else {
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
if (lastThought)
lastThought.thought = lastThought.thought + message
}
if (msgId)
responseItem.id = msgId
})
if (isFirstMessage && newConversationId)
conversationIdRef.current = newConversationId
if (taskId)
taskIdRef.current = taskId
},
async onCompleted(hasError?: boolean) {
handleResponding(false)
if (hasError)
return
if (onConversationComplete)
onConversationComplete(conversationIdRef.current)
if (config?.suggested_questions_after_answer?.enabled && !hasStopRespondedRef.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
messageId,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestedQuestions(data)
}
catch {
setSuggestedQuestions([])
}
}
},
onFile(file) {
const fileType = (file as { type?: string }).type || 'image'
const baseFile = ('transferMethod' in file) ? (file as Partial<FileEntity>) : null
const convertedFile: FileEntity = {
id: baseFile?.id || (file as { id: string }).id,
type: baseFile?.type || (fileType === 'image' ? 'image/png' : fileType === 'video' ? 'video/mp4' : fileType === 'audio' ? 'audio/mpeg' : 'application/octet-stream'),
transferMethod: (baseFile?.transferMethod as FileEntity['transferMethod']) || (fileType === 'image' ? 'remote_url' : 'local_file'),
uploadedId: baseFile?.uploadedId || (file as { id: string }).id,
supportFileType: baseFile?.supportFileType || (fileType === 'image' ? 'image' : fileType === 'video' ? 'video' : fileType === 'audio' ? 'audio' : 'document'),
progress: baseFile?.progress ?? 100,
name: baseFile?.name || `generated_${fileType}.${fileType === 'image' ? 'png' : fileType === 'video' ? 'mp4' : fileType === 'audio' ? 'mp3' : 'bin'}`,
url: baseFile?.url || (file as { url?: string }).url,
size: baseFile?.size ?? 0,
}
updateChatTreeNode(messageId, (responseItem) => {
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
if (lastThought) {
responseItem.agent_thoughts!.at(-1)!.message_files = [...(lastThought as any).message_files, convertedFile]
}
else {
const currentFiles = (responseItem.message_files as FileEntity[] | undefined) ?? []
responseItem.message_files = [...currentFiles, convertedFile]
}
})
},
onThought(thought) {
updateChatTreeNode(messageId, (responseItem) => {
if (thought.message_id)
responseItem.id = thought.message_id
if (thought.conversation_id)
responseItem.conversationId = thought.conversation_id
if (!responseItem.agent_thoughts)
responseItem.agent_thoughts = []
if (responseItem.agent_thoughts.length === 0) {
responseItem.agent_thoughts.push(thought)
}
else {
const lastThought = responseItem.agent_thoughts.at(-1)
if (lastThought?.id === thought.id) {
thought.thought = lastThought.thought
thought.message_files = lastThought.message_files
responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought
}
else {
responseItem.agent_thoughts.push(thought)
}
}
})
},
onMessageEnd: (messageEnd) => {
updateChatTreeNode(messageId, (responseItem) => {
if (messageEnd.metadata?.annotation_reply) {
responseItem.annotation = ({
id: messageEnd.metadata.annotation_reply.id,
authorName: messageEnd.metadata.annotation_reply.account.name,
})
return
}
responseItem.citation = messageEnd.metadata?.retriever_resources || []
const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || [])
responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id')
})
},
onMessageReplace: (messageReplace) => {
updateChatTreeNode(messageId, (responseItem) => {
responseItem.content = messageReplace.answer
})
},
onError() {
handleResponding(false)
},
onWorkflowStarted: ({ workflow_run_id, task_id }) => {
handleResponding(true)
hasStopRespondedRef.current = false
updateChatTreeNode(messageId, (responseItem) => {
taskIdRef.current = task_id
responseItem.content = ''
responseItem.workflow_run_id = workflow_run_id
responseItem.workflowProcess = {
status: WorkflowRunningStatus.Running,
tracing: [],
}
})
},
onWorkflowFinished: ({ data: workflowFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.workflowProcess)
responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus
})
},
onIterationStart: ({ data: iterationStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
responseItem.workflowProcess.tracing.push({
...iterationStartedData,
status: WorkflowRunningStatus.Running,
})
})
},
onIterationFinish: ({ data: iterationFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
const tracing = responseItem.workflowProcess.tracing
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
if (iterationIndex > -1) {
tracing[iterationIndex] = {
...tracing[iterationIndex],
...iterationFinishedData,
status: WorkflowRunningStatus.Succeeded,
}
}
})
},
onNodeStarted: ({ data: nodeStartedData }) => {
  updateChatTreeNode(messageId, (responseItem) => {
    if (!responseItem.workflowProcess)
      return
    if (!responseItem.workflowProcess.tracing)
      responseItem.workflowProcess.tracing = []
    // A node that already has a trace entry (e.g. re-run) is replaced in place.
    const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
    if (currentIndex > -1) {
      responseItem.workflowProcess.tracing[currentIndex] = {
        ...nodeStartedData,
        status: NodeRunningStatus.Running,
      }
    }
    else {
      // Nodes belonging to an iteration are not appended to the top-level
      // trace (the iteration entry represents them).
      if (nodeStartedData.iteration_id)
        return
      // NOTE(review): the replace branch above uses NodeRunningStatus.Running
      // while this push uses WorkflowRunningStatus.Running — confirm the two
      // enums share the same 'running' value, otherwise unify them.
      responseItem.workflowProcess.tracing.push({
        ...nodeStartedData,
        status: WorkflowRunningStatus.Running,
      })
    }
  })
},
// Overwrite the matching trace entry with the node's final payload.
onNodeFinished: ({ data: nodeFinishedData }) => {
  updateChatTreeNode(messageId, (node) => {
    const tracing = node.workflowProcess?.tracing
    if (!tracing)
      return
    // Iteration-internal nodes are not tracked in the top-level trace.
    if (nodeFinishedData.iteration_id)
      return
    const matchIndex = tracing.findIndex((entry) => {
      const sameId = entry.id === nodeFinishedData.id
      if (!entry.execution_metadata?.parallel_id)
        return sameId
      return sameId && entry.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id
    })
    if (matchIndex > -1)
      tracing[matchIndex] = nodeFinishedData as any
  })
},
// Feed a TTS audio chunk to the shared player; empty chunks are ignored.
onTTSChunk: (msgId: string, audio: string) => {
  // `audio === ''` was redundant — an empty string is already falsy.
  if (!audio)
    return
  const audioPlayer = getOrCreatePlayer()
  if (audioPlayer) {
    audioPlayer.playAudioWithAudio(audio, true)
    AudioPlayerManager.getInstance().resetMsgId(msgId)
  }
},
// Play the final TTS chunk without queueing further playback.
onTTSEnd: (_msgId: string, audio: string) => {
  const player = getOrCreatePlayer()
  if (!player)
    return
  player.playAudioWithAudio(audio, false)
},
// Append a running trace entry when a loop node starts (mirrors iteration start).
onLoopStart: ({ data: loopStartedData }) => {
  updateChatTreeNode(messageId, (node) => {
    const process = node.workflowProcess
    if (!process)
      return
    if (!process.tracing)
      process.tracing = []
    process.tracing.push({
      ...loopStartedData,
      status: WorkflowRunningStatus.Running,
    })
  })
},
// Merge the finished-loop payload into its matching trace entry.
onLoopFinish: ({ data: loopFinishedData }) => {
  updateChatTreeNode(messageId, (responseItem) => {
    if (!responseItem.workflowProcess?.tracing)
      return
    const tracing = responseItem.workflowProcess.tracing
    // Match by node id plus parallel branch id; older entries may carry
    // parallel_id at the top level rather than inside execution_metadata.
    // (Dropped the meaningless non-null assertion on findIndex — it always
    // returns a number, never null.)
    const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
      && (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))
    if (loopIndex > -1) {
      tracing[loopIndex] = {
        ...tracing[loopIndex],
        ...loopFinishedData,
        status: WorkflowRunningStatus.Succeeded,
      }
    }
  })
},
// Record (or refresh) a pending human-input form and mark its trace entry paused.
onHumanInputRequired: ({ data: humanInputRequiredData }) => {
  updateChatTreeNode(messageId, (node) => {
    const forms = node.humanInputFormDataList
    if (!forms) {
      node.humanInputFormDataList = [humanInputRequiredData]
    }
    else {
      // One form per node: replace an existing entry, otherwise append.
      const existingIndex = forms.findIndex(item => item.node_id === humanInputRequiredData.node_id)
      if (existingIndex > -1)
        forms[existingIndex] = humanInputRequiredData
      else
        forms.push(humanInputRequiredData)
    }
    const tracing = node.workflowProcess?.tracing
    if (tracing) {
      const tracingIndex = tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id)
      if (tracingIndex > -1)
        tracing[tracingIndex].status = NodeRunningStatus.Paused
    }
  })
},
// Move a form from the pending list to the filled list once it is submitted.
onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => {
  updateChatTreeNode(messageId, (node) => {
    const pending = node.humanInputFormDataList
    if (pending?.length) {
      const pendingIndex = pending.findIndex(item => item.node_id === humanInputFilledFormData.node_id)
      if (pendingIndex > -1)
        pending.splice(pendingIndex, 1)
    }
    if (node.humanInputFilledFormDataList)
      node.humanInputFilledFormDataList.push(humanInputFilledFormData)
    else
      node.humanInputFilledFormDataList = [humanInputFilledFormData]
  })
},
// Update a pending form's expiration time when the server reports a timeout.
onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => {
  updateChatTreeNode(messageId, (responseItem) => {
    if (responseItem.humanInputFormDataList?.length) {
      const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id)
      // Fix: findIndex returns -1 when no form matches; indexing with -1
      // yields undefined and the property write below would throw.
      if (currentFormIndex > -1)
        responseItem.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time
    }
  })
},
// Mark the run paused and reattach an SSE stream so resume events keep arriving.
onWorkflowPaused: ({ data: workflowPausedData }) => {
  const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events`
  pausedStateRef.current = true
  sseGet(resumeUrl, {}, otherOptions)
  updateChatTreeNode(messageId, (responseItem) => {
    // Robustness: guard instead of the previous `workflowProcess!` non-null
    // assertion, which would throw if the process had not been attached yet.
    if (responseItem.workflowProcess)
      responseItem.workflowProcess.status = WorkflowRunningStatus.Paused
  })
},
}
if (workflowEventsAbortControllerRef.current)
workflowEventsAbortControllerRef.current.abort()
sseGet(url, {}, otherOptions)
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
const updateCurrentQAOnTree = useCallback(({
parentId,
responseItem,
@ -917,6 +1245,8 @@ export const useChat = (
})
},
onWorkflowStarted: ({ workflow_run_id, task_id, conversation_id, message_id }) => {
handleResponding(true)
hasStopRespondedRef.current = false
// If there are no streaming messages, we still need to set the conversation_id to avoid create a new conversation when regeneration in chat-flow.
if (conversation_id) {
conversationIdRef.current = conversation_id
@ -1275,6 +1605,7 @@ export const useChat = (
setIsResponding,
handleSend,
handleResume,
handleReconnect,
handleSwitchSibling,
suggestedQuestions,
handleRestart,

View File

@ -111,6 +111,7 @@ export type IChatItem = {
agent_thoughts?: ThoughtItem[]
message_files?: FileEntity[]
workflow_run_id?: string
created_at?: number
// for agent log
conversationId?: string
input?: any

View File

@ -177,6 +177,7 @@ const createUseChatReturn = (overrides: Partial<UseChatReturn> = {}): UseChatRet
setTargetMessageId: vi.fn() as UseChatReturn['setTargetMessageId'],
handleSend: vi.fn(),
handleResume: vi.fn(),
handleReconnect: vi.fn(),
setIsResponding: vi.fn() as UseChatReturn['setIsResponding'],
handleStop: vi.fn(),
handleSwitchSibling: vi.fn(),
@ -541,6 +542,52 @@ describe('EmbeddedChatbot chat-wrapper', () => {
expect(handleSwitchSibling).toHaveBeenCalled()
})
// On mount, an answer node with a workflow_run_id created inside the
// stream-retention window (here 30s ago) should trigger a reconnect.
it('should reconnect to a recent running workflow on mount', () => {
  const handleReconnect = vi.fn()
  vi.mocked(useChat).mockReturnValue(createUseChatReturn({
    handleReconnect,
  }))
  vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
    appPrevChatList: [
      {
        id: 'running-node',
        isAnswer: true,
        content: 'partial',
        workflow_run_id: 'run-active',
        // 30 seconds old — well inside the retention window.
        created_at: Math.floor(Date.now() / 1000) - 30,
        children: [],
      } as unknown as ChatItemInTree,
    ],
  }))
  render(<ChatWrapper />)
  // Reconnection targets the running node and passes the public-API flag.
  expect(handleReconnect).toHaveBeenCalledWith(
    'running-node',
    'run-active',
    expect.objectContaining({ isPublicAPI: true }),
  )
})
// An answer node older than the retention window (700s > 600s) must not
// trigger a reconnect on mount.
it('should not reconnect to an old workflow beyond the retention window', () => {
  const handleReconnect = vi.fn()
  vi.mocked(useChat).mockReturnValue(createUseChatReturn({
    handleReconnect,
  }))
  vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
    appPrevChatList: [
      {
        id: 'old-node',
        isAnswer: true,
        content: 'done',
        workflow_run_id: 'run-old',
        // 700 seconds old — beyond the 600s retention window.
        created_at: Math.floor(Date.now() / 1000) - 700,
        children: [],
      } as unknown as ChatItemInTree,
    ],
  }))
  render(<ChatWrapper />)
  expect(handleReconnect).not.toHaveBeenCalled()
})
it('should handle conversation completion and suggested questions in chat actions', async () => {
const handleSend = vi.fn()
vi.mocked(useChat).mockReturnValue(createUseChatReturn({

View File

@ -834,6 +834,34 @@ describe('useEmbeddedChatbot', () => {
const question = chatList.find((m: unknown) => (m as Record<string, unknown>).id === 'question-msg-no-files')
expect(question).toBeDefined()
})
// The formatted chat list should carry workflow_run_id and created_at from
// the raw message through to the answer node, so reconnect logic can use them.
it('should pass through workflow_run_id and created_at from message items', async () => {
  localStorage.setItem(CONVERSATION_ID_INFO, JSON.stringify({ 'app-1': { DEFAULT: 'conversation-1' } }))
  mockFetchConversations.mockResolvedValue(
    createConversationData({ data: [createConversationItem({ id: 'conversation-1' })] }),
  )
  mockFetchChatList.mockResolvedValue({
    data: [{
      id: 'msg-wf',
      query: 'Running workflow',
      answer: 'Partial',
      workflow_run_id: 'wf-embedded-1',
      created_at: 1700000000,
    }],
  })
  const { result } = await renderWithClient(() => useEmbeddedChatbot(AppSourceType.webApp))
  await waitFor(() => expect(result.current.appPrevChatList.length).toBeGreaterThan(0), { timeout: 3000 })
  // Question nodes get the `question-` id prefix; the answer is its child.
  const questionNode = result.current.appPrevChatList.find(
    (m: unknown) => (m as Record<string, unknown>).id === 'question-msg-wf',
  ) as Record<string, unknown> | undefined
  expect(questionNode).toBeDefined()
  const answerNode = (questionNode!.children as Record<string, unknown>[])?.[0]
  expect(answerNode).toBeDefined()
  expect(answerNode!.workflow_run_id).toBe('wf-embedded-1')
  expect(answerNode!.created_at).toBe(1700000000)
})
})
describe('currentConversationItem from pinned list', () => {

View File

@ -85,6 +85,7 @@ const ChatWrapper = () => {
handleSend,
handleStop,
handleSwitchSibling,
handleReconnect,
isResponding: respondingState,
suggestedQuestions,
} = useChat(
@ -142,37 +143,47 @@ const ChatWrapper = () => {
setIsResponding(respondingState)
}, [respondingState, setIsResponding])
// Resume paused workflows when chat history is loaded
// Resume paused workflows or reconnect to running workflows when chat history is loaded
useEffect(() => {
if (!appPrevChatList || appPrevChatList.length === 0)
return
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
let lastPausedNode: ChatItemInTree | undefined
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
// DFS: recurse to children first
if (node.children && node.children.length > 0)
findLastPausedWorkflow(node.children)
const STREAM_RETENTION_SECONDS = 600
// Track the last node with humanInputFormDataList
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
lastPausedNode = node
let lastPausedNode: ChatItemInTree | undefined
let lastRunningNode: ChatItemInTree | undefined
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
if (node.isAnswer && node.workflow_run_id) {
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
lastPausedNode = node
}
else if (
node.created_at
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
) {
lastRunningNode = node
}
}
if (node.children && node.children.length > 0)
findReconnectableWorkflow(node.children)
})
}
findLastPausedWorkflow(appPrevChatList)
findReconnectableWorkflow(appPrevChatList)
const callbacks = {
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
}
// Only resume the last paused workflow
if (lastPausedNode) {
handleSwitchSibling(
lastPausedNode.id,
{
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
},
)
handleSwitchSibling(lastPausedNode.id, callbacks)
}
else if (lastRunningNode) {
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
}
}, [])

View File

@ -42,6 +42,8 @@ function getFormattedChatList(messages: any[]) {
citation: item.retriever_resources,
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
parentMessageId: `question-${item.id}`,
workflow_run_id: item.workflow_run_id,
created_at: item.created_at,
})
})
return newChatList