Compare commits

..

2 Commits

Author SHA1 Message Date
53c0dde3d5 [autofix.ci] apply automated fixes 2026-04-16 09:45:48 +00:00
a1759fccb8 feat: replayable message 2026-04-16 17:41:21 +08:00
25 changed files with 670 additions and 78 deletions

View File

@ -81,6 +81,7 @@ class WorkflowEventsApi(WebApiResource):
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
replay = request.args.get("replay", "false").lower() == "true"
def _generate_stream_events():
if include_state_snapshot:
@ -91,10 +92,11 @@ class WorkflowEventsApi(WebApiResource):
tenant_id=app_model.tenant_id,
app_id=app_model.id,
session_maker=session_maker,
replay=replay,
)
)
return generator.convert_to_event_stream(
msg_generator.retrieve_events(app_mode, workflow_run.id),
msg_generator.retrieve_events(app_mode, workflow_run.id, replay=replay),
)
event_generator = _generate_stream_events

View File

@ -311,12 +311,14 @@ class MessageBasedAppGenerator(BaseAppGenerator):
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout=300,
idle_timeout: float = 300,
on_subscribe: Callable[[], None] | None = None,
replay: bool = False,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return stream_topic_events(
topic=topic,
idle_timeout=idle_timeout,
on_subscribe=on_subscribe,
replay=replay,
)

View File

@ -23,9 +23,10 @@ class MessageGenerator:
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout=300,
idle_timeout: float = 300,
ping_interval: float = 10.0,
on_subscribe: Callable[[], None] | None = None,
replay: bool = False,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return stream_topic_events(
@ -33,4 +34,5 @@ class MessageGenerator:
idle_timeout=idle_timeout,
ping_interval=ping_interval,
on_subscribe=on_subscribe,
replay=replay,
)

View File

@ -17,6 +17,7 @@ def stream_topic_events(
ping_interval: float | None = None,
on_subscribe: Callable[[], None] | None = None,
terminal_events: Iterable[str | StreamEvent] | None = None,
replay: bool = False,
) -> Generator[Mapping[str, Any] | str, None, None]:
# send a PING event immediately to prevent the connection staying in pending state for a long time.
#
@ -27,7 +28,7 @@ def stream_topic_events(
terminal_values = _normalize_terminal_events(terminal_events)
last_msg_time = time.time()
last_ping_time = last_msg_time
with topic.subscribe() as sub:
with topic.subscribe(replay=replay) as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.
if on_subscribe is not None:

View File

@ -58,6 +58,7 @@ class MessageListItem(ResponseModel):
message_files: list[MessageFile]
status: str
error: str | None = None
workflow_run_id: str | None = None
extra_contents: list[ExecutionExtraContentDomainModel]
@field_validator("inputs", mode="before")

View File

@ -92,7 +92,14 @@ class Subscriber(Protocol):
"""
@abstractmethod
def subscribe(self) -> Subscription:
def subscribe(self, *, replay: bool = False) -> Subscription:
"""Create a new subscription.
:param replay: When True and the underlying transport supports message retention
(e.g. Redis Streams), the subscription replays all buffered messages from
the beginning of the stream before switching to live tail. Transports
without retention (plain Pub/Sub) silently ignore this flag.
"""
pass

View File

@ -44,7 +44,7 @@ class Topic:
def as_subscriber(self) -> Subscriber:
return self
def subscribe(self) -> Subscription:
def subscribe(self, *, replay: bool = False) -> Subscription:
return _RedisSubscription(
client=self._client,
pubsub=self._client.pubsub(),

View File

@ -42,7 +42,7 @@ class ShardedTopic:
def as_subscriber(self) -> Subscriber:
return self
def subscribe(self) -> Subscription:
def subscribe(self, *, replay: bool = False) -> Subscription:
return _RedisShardedSubscription(
client=self._client,
pubsub=self._client.pubsub(),

View File

@ -54,16 +54,17 @@ class StreamsTopic:
def as_subscriber(self) -> Subscriber:
return self
def subscribe(self) -> Subscription:
return _StreamsSubscription(self._client, self._key)
def subscribe(self, *, replay: bool = False) -> Subscription:
return _StreamsSubscription(self._client, self._key, replay=replay)
class _StreamsSubscription(Subscription):
_SENTINEL = object()
def __init__(self, client: Redis | RedisCluster, key: str):
def __init__(self, client: Redis | RedisCluster, key: str, *, replay: bool = False):
self._client = client
self._key = key
self._replay = replay
self._queue: queue.Queue[object] = queue.Queue()
@ -76,7 +77,6 @@ class _StreamsSubscription(Subscription):
# reading and writing the _listener / `_closed` attribute.
self._lock = threading.Lock()
self._closed: bool = False
# self._closed = threading.Event()
self._listener: threading.Thread | None = None
def _listen(self) -> None:
@ -89,10 +89,9 @@ class _StreamsSubscription(Subscription):
# since this method runs in a dedicated thread, acquiring `_lock` inside this method won't cause
# deadlock.
# Setting initial last id to `$` to signal redis that we only want new messages.
#
# `"0"` replays all retained entries; `"$"` tails only new messages.
# ref: https://redis.io/docs/latest/commands/xread/#the-special--id
last_id = "$"
last_id = "0" if self._replay else "$"
try:
while True:
with self._lock:

View File

@ -416,6 +416,7 @@ class AppGenerateService:
cls,
app_model: App,
workflow_run: WorkflowRun,
replay: bool = False,
):
if workflow_run.status.is_ended():
# TODO(QuantumGhost): handled the ended scenario.
@ -424,5 +425,5 @@ class AppGenerateService:
generator = AdvancedChatAppGenerator()
return generator.convert_to_event_stream(
generator.retrieve_events(AppMode(app_model.mode), workflow_run.id),
generator.retrieve_events(AppMode(app_model.mode), workflow_run.id, replay=replay),
)

View File

@ -61,6 +61,7 @@ def build_workflow_event_stream(
session_maker: sessionmaker[Session],
idle_timeout: float = 300,
ping_interval: float = 10.0,
replay: bool = False,
) -> Generator[Mapping[str, Any] | str, None, None]:
topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@ -103,7 +104,7 @@ def build_workflow_event_stream(
last_msg_time = time.time()
last_ping_time = last_msg_time
with topic.subscribe() as sub:
with topic.subscribe(replay=replay) as sub:
buffer_state = _start_buffering(sub)
try:
task_id = _resolve_task_id(resumption_context, buffer_state, workflow_run.id)

View File

@ -5,6 +5,7 @@ import InSiteMessageNotification from '@/app/components/app/in-site-message/noti
import AmplitudeProvider from '@/app/components/base/amplitude'
import GA, { GaType } from '@/app/components/base/ga'
import Zendesk from '@/app/components/base/zendesk'
import GotoAnything from '@/app/components/goto-anything'
import Header from '@/app/components/header'
import HeaderWrapper from '@/app/components/header/header-wrapper'
import ReadmePanel from '@/app/components/plugins/readme-panel'
@ -12,15 +13,10 @@ import { AppContextProvider } from '@/context/app-context-provider'
import { EventEmitterContextProvider } from '@/context/event-emitter-provider'
import { ModalContextProvider } from '@/context/modal-context-provider'
import { ProviderContextProvider } from '@/context/provider-context-provider'
import dynamic from '@/next/dynamic'
import PartnerStack from '../components/billing/partner-stack'
import Splash from '../components/splash'
import RoleRouteGuard from './role-route-guard'
const GotoAnything = dynamic(() => import('@/app/components/goto-anything'), {
ssr: false,
})
const Layout = ({ children }: { children: ReactNode }) => {
return (
<>

View File

@ -125,6 +125,7 @@ const defaultChatHookReturn: Partial<ChatHookReturn> = {
handleSend: vi.fn(),
handleStop: vi.fn(),
handleSwitchSibling: vi.fn(),
handleReconnect: vi.fn(),
isResponding: false,
suggestedQuestions: [],
}
@ -605,6 +606,95 @@ describe('ChatWrapper', () => {
expect(handleSwitchSibling).not.toHaveBeenCalled()
})
  // Mount-time auto-reconnect: an answer node carrying a workflow_run_id whose
  // created_at falls inside the stream-retention window should trigger
  // handleReconnect with the node id, run id, and the public-API callbacks.
  it('should reconnect to a recent running workflow on mount', () => {
    const handleReconnect = vi.fn()
    vi.mocked(useChat).mockReturnValue({
      ...defaultChatHookReturn,
      chatList: [],
      handleReconnect,
    } as unknown as ChatHookReturn)
    vi.mocked(useChatWithHistoryContext).mockReturnValue({
      ...defaultContextValue,
      // created_at 30s ago — well inside the retention window.
      appPrevChatTree: [{
        id: 'running-answer',
        isAnswer: true,
        content: 'partial content',
        workflow_run_id: 'run-active',
        created_at: Math.floor(Date.now() / 1000) - 30,
        children: [],
      } as unknown as ChatItemInTree],
    })
    render(<ChatWrapper />)
    expect(handleReconnect).toHaveBeenCalledWith(
      'running-answer',
      'run-active',
      expect.objectContaining({ isPublicAPI: true }),
    )
  })

  // Runs older than the retention window must NOT reconnect: the backend's
  // stream no longer retains their events, so replay would be pointless.
  it('should not reconnect to an old workflow beyond the retention window', () => {
    const handleReconnect = vi.fn()
    vi.mocked(useChat).mockReturnValue({
      ...defaultChatHookReturn,
      chatList: [],
      handleReconnect,
    } as unknown as ChatHookReturn)
    vi.mocked(useChatWithHistoryContext).mockReturnValue({
      ...defaultContextValue,
      // created_at 700s ago — past the 600s retention window used by ChatWrapper.
      appPrevChatTree: [{
        id: 'old-answer',
        isAnswer: true,
        content: 'old content',
        workflow_run_id: 'run-old',
        created_at: Math.floor(Date.now() / 1000) - 700,
        children: [],
      } as unknown as ChatItemInTree],
    })
    render(<ChatWrapper />)
    expect(handleReconnect).not.toHaveBeenCalled()
  })

  // When both a running node and a paused (human-input pending) node exist,
  // the paused one wins: resume via handleSwitchSibling, never reconnect.
  it('should prefer paused workflow over running workflow for reconnection', () => {
    const handleSwitchSibling = vi.fn()
    const handleReconnect = vi.fn()
    vi.mocked(useChat).mockReturnValue({
      ...defaultChatHookReturn,
      chatList: [],
      handleSwitchSibling,
      handleReconnect,
    } as unknown as ChatHookReturn)
    vi.mocked(useChatWithHistoryContext).mockReturnValue({
      ...defaultContextValue,
      appPrevChatTree: [
        // Recent running node — would be reconnectable on its own.
        {
          id: 'running-answer',
          isAnswer: true,
          content: '',
          workflow_run_id: 'run-active',
          created_at: Math.floor(Date.now() / 1000) - 10,
          children: [],
        } as unknown as ChatItemInTree,
        // Paused node with a pending human-input form — takes priority.
        {
          id: 'paused-answer',
          isAnswer: true,
          content: '',
          workflow_run_id: 'run-paused',
          humanInputFormDataList: [{ node_id: 'n-1' }],
          children: [],
        } as unknown as ChatItemInTree,
      ],
    })
    render(<ChatWrapper />)
    expect(handleSwitchSibling).toHaveBeenCalledWith('paused-answer', expect.any(Object))
    expect(handleReconnect).not.toHaveBeenCalled()
  })
it('should call stopChatMessageResponding when handleStop is triggered', () => {
const handleStop = vi.fn()
vi.mocked(useChat).mockReturnValue({

View File

@ -79,6 +79,7 @@ const ChatWrapper = () => {
handleSend,
handleStop,
handleSwitchSibling,
handleReconnect,
isResponding: respondingState,
suggestedQuestions,
} = useChat(
@ -140,37 +141,47 @@ const ChatWrapper = () => {
setIsResponding(respondingState)
}, [respondingState, setIsResponding])
// Resume paused workflows when chat history is loaded
// Resume paused workflows or reconnect to running workflows when chat history is loaded
useEffect(() => {
if (!appPrevChatTree || appPrevChatTree.length === 0)
return
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
let lastPausedNode: ChatItemInTree | undefined
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
// DFS: recurse to children first
if (node.children && node.children.length > 0)
findLastPausedWorkflow(node.children)
const STREAM_RETENTION_SECONDS = 600
// Track the last node with humanInputFormDataList
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
lastPausedNode = node
let lastPausedNode: ChatItemInTree | undefined
let lastRunningNode: ChatItemInTree | undefined
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
if (node.isAnswer && node.workflow_run_id) {
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
lastPausedNode = node
}
else if (
node.created_at
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
) {
lastRunningNode = node
}
}
if (node.children && node.children.length > 0)
findReconnectableWorkflow(node.children)
})
}
findLastPausedWorkflow(appPrevChatTree)
findReconnectableWorkflow(appPrevChatTree)
const callbacks = {
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
}
// Only resume the last paused workflow
if (lastPausedNode) {
handleSwitchSibling(
lastPausedNode.id,
{
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
},
)
handleSwitchSibling(lastPausedNode.id, callbacks)
}
else if (lastRunningNode) {
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
}
}, [])

View File

@ -35,12 +35,12 @@ function getFormattedChatList(messages: any[]) {
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
const humanInputFormDataList: HumanInputFormData[] = []
const humanInputFilledFormDataList: HumanInputFilledFormData[] = []
let workflowRunId = ''
let workflowRunIdFromExtra = ''
if (item.status === 'paused') {
item.extra_contents?.forEach((content: ExtraContent) => {
if (content.type === 'human_input' && !content.submitted) {
humanInputFormDataList.push(content.form_definition)
workflowRunId = content.workflow_run_id
workflowRunIdFromExtra = content.workflow_run_id
}
})
}
@ -62,7 +62,8 @@ function getFormattedChatList(messages: any[]) {
parentMessageId: `question-${item.id}`,
humanInputFormDataList,
humanInputFilledFormDataList,
workflow_run_id: workflowRunId,
workflow_run_id: item.workflow_run_id || workflowRunIdFromExtra,
created_at: item.created_at,
})
})
return newChatList

View File

@ -932,7 +932,7 @@ describe('useChat', () => {
})
expect(sseGet).toHaveBeenCalledWith(
'/workflow/wr-1/events?include_state_snapshot=true',
'/workflow/wr-1/events?include_state_snapshot=true&replay=true',
expect.any(Object),
expect.any(Object),
)
@ -1264,6 +1264,102 @@ describe('useChat', () => {
})
})
describe('handleReconnect', () => {
it('should call sseGet with include_state_snapshot and rebuild from snapshot events', () => {
let callbacks: HookCallbacks
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const prevChatTree = [{
id: 'q-1',
content: 'query',
isAnswer: false,
children: [{
id: 'm-reconnect',
content: 'stale partial content',
isAnswer: true,
siblingIndex: 0,
workflowProcess: { status: 'running', tracing: [{ node_id: 'old-node' }] },
}],
}]
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
act(() => {
result.current.handleReconnect('m-reconnect', 'wr-reconnect', { isPublicAPI: true })
})
expect(sseGet).toHaveBeenCalledWith(
'/workflow/wr-reconnect/events?include_state_snapshot=true',
expect.any(Object),
expect.any(Object),
)
// Content is not reset until onWorkflowStarted fires
const beforeStart = result.current.chatList[1]
expect(beforeStart.content).toBe('stale partial content')
act(() => {
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-reconnect', task_id: 't-1' })
})
// After onWorkflowStarted, content is reset and workflowProcess is fresh
const afterStart = result.current.chatList[1]
expect(afterStart.content).toBe('')
expect(afterStart.workflowProcess).toEqual({ status: WorkflowRunningStatus.Running, tracing: [] })
act(() => {
callbacks.onMessageReplace({ answer: 'full snapshot text' })
callbacks.onNodeStarted({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1' } })
callbacks.onNodeFinished({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1', status: 'succeeded' } })
callbacks.onWorkflowFinished({ data: { status: 'succeeded' } })
callbacks.onCompleted()
})
const lastResponse = result.current.chatList[1]
expect(lastResponse.content).toBe('full snapshot text')
expect(lastResponse.workflowProcess?.status).toBe('succeeded')
expect(lastResponse.workflowProcess?.tracing).toHaveLength(1)
expect(result.current.isResponding).toBe(false)
})
it('should abort previous stream when reconnecting again', () => {
const callbacksList: HookCallbacks[] = []
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
callbacksList.push(options as HookCallbacks)
})
const prevChatTree = [{
id: 'q-1',
content: 'query',
isAnswer: false,
children: [{
id: 'm-rc',
content: 'partial',
isAnswer: true,
siblingIndex: 0,
}],
}]
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
const previousAbort = createAbortControllerMock()
act(() => {
result.current.handleReconnect('m-rc', 'wr-1', { isPublicAPI: true })
})
act(() => {
callbacksList[0].getAbortController(previousAbort)
})
act(() => {
result.current.handleReconnect('m-rc', 'wr-2', { isPublicAPI: true })
})
expect(previousAbort.abort).toHaveBeenCalledTimes(1)
})
})
describe('createAudioPlayerManager branch cases', () => {
it('should handle ttsUrl generation for appId with installed apps', async () => {
vi.mocked(usePathname).mockReturnValue('/explore/installed/app')
@ -1331,7 +1427,7 @@ describe('useChat', () => {
})
expect(sseGet).toHaveBeenCalledWith(
'/workflow/wr-tts-app/events?include_state_snapshot=true',
'/workflow/wr-tts-app/events?include_state_snapshot=true&replay=true',
expect.any(Object),
expect.any(Object),
)
@ -1493,7 +1589,7 @@ describe('useChat', () => {
// Should automatically call handleResume -> sseGet for human input
expect(sseGet).toHaveBeenCalledWith(
'/workflow/wr-1/events?include_state_snapshot=true',
'/workflow/wr-1/events?include_state_snapshot=true&replay=true',
expect.any(Object),
expect.any(Object),
)

View File

@ -244,8 +244,10 @@ export const useChat = (
}: SendCallback,
) => {
const getOrCreatePlayer = createAudioPlayerManager()
// Re-subscribe to workflow events for the specific message
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
// Re-subscribe to workflow events for the specific message.
// replay=true tells the backend to read from the beginning of the Redis Stream
// so all retained events are replayed on reconnection (e.g. page refresh).
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true`
const otherOptions: IOtherOptions = {
isPublicAPI,
@ -583,6 +585,320 @@ export const useChat = (
)
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
const handleReconnect = useCallback((
messageId: string,
workflowRunId: string,
{
onGetSuggestedQuestions,
onConversationComplete,
isPublicAPI,
}: SendCallback,
) => {
const getOrCreatePlayer = createAudioPlayerManager()
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true`
const otherOptions: IOtherOptions = {
isPublicAPI,
getAbortController: (abortController) => {
workflowEventsAbortControllerRef.current = abortController
},
onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId: msgId, taskId }: IOnDataMoreInfo) => {
updateChatTreeNode(messageId, (responseItem) => {
const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0
if (!isAgentMode) {
responseItem.content = responseItem.content + message
}
else {
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
if (lastThought)
lastThought.thought = lastThought.thought + message
}
if (msgId)
responseItem.id = msgId
})
if (isFirstMessage && newConversationId)
conversationIdRef.current = newConversationId
if (taskId)
taskIdRef.current = taskId
},
async onCompleted(hasError?: boolean) {
handleResponding(false)
if (hasError)
return
if (onConversationComplete)
onConversationComplete(conversationIdRef.current)
if (config?.suggested_questions_after_answer?.enabled && !hasStopRespondedRef.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
messageId,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestedQuestions(data)
}
catch {
setSuggestedQuestions([])
}
}
},
onFile(file) {
const fileType = (file as { type?: string }).type || 'image'
const baseFile = ('transferMethod' in file) ? (file as Partial<FileEntity>) : null
const convertedFile: FileEntity = {
id: baseFile?.id || (file as { id: string }).id,
type: baseFile?.type || (fileType === 'image' ? 'image/png' : fileType === 'video' ? 'video/mp4' : fileType === 'audio' ? 'audio/mpeg' : 'application/octet-stream'),
transferMethod: (baseFile?.transferMethod as FileEntity['transferMethod']) || (fileType === 'image' ? 'remote_url' : 'local_file'),
uploadedId: baseFile?.uploadedId || (file as { id: string }).id,
supportFileType: baseFile?.supportFileType || (fileType === 'image' ? 'image' : fileType === 'video' ? 'video' : fileType === 'audio' ? 'audio' : 'document'),
progress: baseFile?.progress ?? 100,
name: baseFile?.name || `generated_${fileType}.${fileType === 'image' ? 'png' : fileType === 'video' ? 'mp4' : fileType === 'audio' ? 'mp3' : 'bin'}`,
url: baseFile?.url || (file as { url?: string }).url,
size: baseFile?.size ?? 0,
}
updateChatTreeNode(messageId, (responseItem) => {
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
if (lastThought) {
responseItem.agent_thoughts!.at(-1)!.message_files = [...(lastThought as any).message_files, convertedFile]
}
else {
const currentFiles = (responseItem.message_files as FileEntity[] | undefined) ?? []
responseItem.message_files = [...currentFiles, convertedFile]
}
})
},
onThought(thought) {
updateChatTreeNode(messageId, (responseItem) => {
if (thought.message_id)
responseItem.id = thought.message_id
if (thought.conversation_id)
responseItem.conversationId = thought.conversation_id
if (!responseItem.agent_thoughts)
responseItem.agent_thoughts = []
if (responseItem.agent_thoughts.length === 0) {
responseItem.agent_thoughts.push(thought)
}
else {
const lastThought = responseItem.agent_thoughts.at(-1)
if (lastThought?.id === thought.id) {
thought.thought = lastThought.thought
thought.message_files = lastThought.message_files
responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought
}
else {
responseItem.agent_thoughts.push(thought)
}
}
})
},
onMessageEnd: (messageEnd) => {
updateChatTreeNode(messageId, (responseItem) => {
if (messageEnd.metadata?.annotation_reply) {
responseItem.annotation = ({
id: messageEnd.metadata.annotation_reply.id,
authorName: messageEnd.metadata.annotation_reply.account.name,
})
return
}
responseItem.citation = messageEnd.metadata?.retriever_resources || []
const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || [])
responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id')
})
},
onMessageReplace: (messageReplace) => {
updateChatTreeNode(messageId, (responseItem) => {
responseItem.content = messageReplace.answer
})
},
onError() {
handleResponding(false)
},
onWorkflowStarted: ({ workflow_run_id, task_id }) => {
handleResponding(true)
hasStopRespondedRef.current = false
updateChatTreeNode(messageId, (responseItem) => {
taskIdRef.current = task_id
responseItem.content = ''
responseItem.workflow_run_id = workflow_run_id
responseItem.workflowProcess = {
status: WorkflowRunningStatus.Running,
tracing: [],
}
})
},
onWorkflowFinished: ({ data: workflowFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.workflowProcess)
responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus
})
},
onIterationStart: ({ data: iterationStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
responseItem.workflowProcess.tracing.push({
...iterationStartedData,
status: WorkflowRunningStatus.Running,
})
})
},
onIterationFinish: ({ data: iterationFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
const tracing = responseItem.workflowProcess.tracing
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
if (iterationIndex > -1) {
tracing[iterationIndex] = {
...tracing[iterationIndex],
...iterationFinishedData,
status: WorkflowRunningStatus.Succeeded,
}
}
})
},
onNodeStarted: ({ data: nodeStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
if (currentIndex > -1) {
responseItem.workflowProcess.tracing[currentIndex] = {
...nodeStartedData,
status: NodeRunningStatus.Running,
}
}
else {
if (nodeStartedData.iteration_id)
return
responseItem.workflowProcess.tracing.push({
...nodeStartedData,
status: WorkflowRunningStatus.Running,
})
}
})
},
onNodeFinished: ({ data: nodeFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
if (nodeFinishedData.iteration_id)
return
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
if (!item.execution_metadata?.parallel_id)
return item.id === nodeFinishedData.id
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
})
if (currentIndex > -1)
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
})
},
onTTSChunk: (msgId: string, audio: string) => {
if (!audio || audio === '')
return
const audioPlayer = getOrCreatePlayer()
if (audioPlayer) {
audioPlayer.playAudioWithAudio(audio, true)
AudioPlayerManager.getInstance().resetMsgId(msgId)
}
},
onTTSEnd: (_msgId: string, audio: string) => {
const audioPlayer = getOrCreatePlayer()
if (audioPlayer)
audioPlayer.playAudioWithAudio(audio, false)
},
onLoopStart: ({ data: loopStartedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess)
return
if (!responseItem.workflowProcess.tracing)
responseItem.workflowProcess.tracing = []
responseItem.workflowProcess.tracing.push({
...loopStartedData,
status: WorkflowRunningStatus.Running,
})
})
},
onLoopFinish: ({ data: loopFinishedData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.workflowProcess?.tracing)
return
const tracing = responseItem.workflowProcess.tracing
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
if (loopIndex > -1) {
tracing[loopIndex] = {
...tracing[loopIndex],
...loopFinishedData,
status: WorkflowRunningStatus.Succeeded,
}
}
})
},
onHumanInputRequired: ({ data: humanInputRequiredData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (!responseItem.humanInputFormDataList) {
responseItem.humanInputFormDataList = [humanInputRequiredData]
}
else {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id)
if (currentFormIndex > -1)
responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData
else
responseItem.humanInputFormDataList.push(humanInputRequiredData)
}
if (responseItem.workflowProcess?.tracing) {
const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id)
if (currentTracingIndex > -1)
responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused
}
})
},
onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.humanInputFormDataList?.length) {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id)
if (currentFormIndex > -1)
responseItem.humanInputFormDataList.splice(currentFormIndex, 1)
}
if (!responseItem.humanInputFilledFormDataList)
responseItem.humanInputFilledFormDataList = [humanInputFilledFormData]
else
responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData)
})
},
onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => {
updateChatTreeNode(messageId, (responseItem) => {
if (responseItem.humanInputFormDataList?.length) {
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id)
responseItem.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time
}
})
},
onWorkflowPaused: ({ data: workflowPausedData }) => {
const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events`
pausedStateRef.current = true
sseGet(resumeUrl, {}, otherOptions)
updateChatTreeNode(messageId, (responseItem) => {
responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused
})
},
}
if (workflowEventsAbortControllerRef.current)
workflowEventsAbortControllerRef.current.abort()
sseGet(url, {}, otherOptions)
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
const updateCurrentQAOnTree = useCallback(({
parentId,
responseItem,
@ -1275,6 +1591,7 @@ export const useChat = (
setIsResponding,
handleSend,
handleResume,
handleReconnect,
handleSwitchSibling,
suggestedQuestions,
handleRestart,

View File

@ -111,6 +111,7 @@ export type IChatItem = {
agent_thoughts?: ThoughtItem[]
message_files?: FileEntity[]
workflow_run_id?: string
created_at?: number
// for agent log
conversationId?: string
input?: any

View File

@ -177,6 +177,7 @@ const createUseChatReturn = (overrides: Partial<UseChatReturn> = {}): UseChatRet
setTargetMessageId: vi.fn() as UseChatReturn['setTargetMessageId'],
handleSend: vi.fn(),
handleResume: vi.fn(),
handleReconnect: vi.fn(),
setIsResponding: vi.fn() as UseChatReturn['setIsResponding'],
handleStop: vi.fn(),
handleSwitchSibling: vi.fn(),
@ -541,6 +542,52 @@ describe('EmbeddedChatbot chat-wrapper', () => {
expect(handleSwitchSibling).toHaveBeenCalled()
})
  // Mount-time auto-reconnect (embedded chatbot): a recent running workflow
  // node in the previous chat list should trigger handleReconnect.
  it('should reconnect to a recent running workflow on mount', () => {
    const handleReconnect = vi.fn()
    vi.mocked(useChat).mockReturnValue(createUseChatReturn({
      handleReconnect,
    }))
    vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
      appPrevChatList: [
        // created_at 30s ago — inside the retention window.
        {
          id: 'running-node',
          isAnswer: true,
          content: 'partial',
          workflow_run_id: 'run-active',
          created_at: Math.floor(Date.now() / 1000) - 30,
          children: [],
        } as unknown as ChatItemInTree,
      ],
    }))
    render(<ChatWrapper />)
    expect(handleReconnect).toHaveBeenCalledWith(
      'running-node',
      'run-active',
      expect.objectContaining({ isPublicAPI: true }),
    )
  })

  // Runs older than the retention window must not reconnect — their events
  // are no longer retained server-side.
  it('should not reconnect to an old workflow beyond the retention window', () => {
    const handleReconnect = vi.fn()
    vi.mocked(useChat).mockReturnValue(createUseChatReturn({
      handleReconnect,
    }))
    vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
      appPrevChatList: [
        // created_at 700s ago — past the 600s retention window.
        {
          id: 'old-node',
          isAnswer: true,
          content: 'done',
          workflow_run_id: 'run-old',
          created_at: Math.floor(Date.now() / 1000) - 700,
          children: [],
        } as unknown as ChatItemInTree,
      ],
    }))
    render(<ChatWrapper />)
    expect(handleReconnect).not.toHaveBeenCalled()
  })
})
it('should handle conversation completion and suggested questions in chat actions', async () => {
const handleSend = vi.fn()
vi.mocked(useChat).mockReturnValue(createUseChatReturn({

View File

@ -85,6 +85,7 @@ const ChatWrapper = () => {
handleSend,
handleStop,
handleSwitchSibling,
handleReconnect,
isResponding: respondingState,
suggestedQuestions,
} = useChat(
@ -142,37 +143,47 @@ const ChatWrapper = () => {
setIsResponding(respondingState)
}, [respondingState, setIsResponding])
// Resume paused workflows when chat history is loaded
// Resume paused workflows or reconnect to running workflows when chat history is loaded
useEffect(() => {
if (!appPrevChatList || appPrevChatList.length === 0)
return
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
let lastPausedNode: ChatItemInTree | undefined
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
// DFS: recurse to children first
if (node.children && node.children.length > 0)
findLastPausedWorkflow(node.children)
const STREAM_RETENTION_SECONDS = 600
// Track the last node with humanInputFormDataList
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
lastPausedNode = node
let lastPausedNode: ChatItemInTree | undefined
let lastRunningNode: ChatItemInTree | undefined
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
nodes.forEach((node) => {
if (node.isAnswer && node.workflow_run_id) {
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
lastPausedNode = node
}
else if (
node.created_at
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
) {
lastRunningNode = node
}
}
if (node.children && node.children.length > 0)
findReconnectableWorkflow(node.children)
})
}
findLastPausedWorkflow(appPrevChatList)
findReconnectableWorkflow(appPrevChatList)
const callbacks = {
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
}
// Only resume the last paused workflow
if (lastPausedNode) {
handleSwitchSibling(
lastPausedNode.id,
{
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
isPublicAPI: appSourceType === AppSourceType.webApp,
},
)
handleSwitchSibling(lastPausedNode.id, callbacks)
}
else if (lastRunningNode) {
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
}
}, [])

View File

@ -42,6 +42,8 @@ function getFormattedChatList(messages: any[]) {
citation: item.retriever_resources,
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
parentMessageId: `question-${item.id}`,
workflow_run_id: item.workflow_run_id,
created_at: item.created_at,
})
})
return newChatList

View File

@ -9,7 +9,7 @@ export function ReactScanLoader() {
<Script
src="//unpkg.com/react-scan/dist/auto.global.js"
crossOrigin="anonymous"
strategy="afterInteractive"
strategy="beforeInteractive"
/>
)
}

View File

@ -1,13 +1,15 @@
'use client'
import type { FC, PropsWithChildren } from 'react'
import * as React from 'react'
import { useUserProfile } from '@/service/use-common'
import { useIsLogin } from '@/service/use-common'
import Loading from './base/loading'
const Splash: FC<PropsWithChildren> = () => {
const { isPending, data } = useUserProfile()
// would auto redirect to signin page if not logged in
const { isLoading, data: loginData } = useIsLogin()
const isLoggedIn = loginData?.logged_in
if (isPending || !data?.profile) {
if (isLoading || !isLoggedIn) {
return (
<div className="fixed inset-0 z-9999999 flex h-full items-center justify-center bg-background-body">
<Loading />

View File

@ -122,7 +122,7 @@ describe('useChat handleResume', () => {
})
expect(mockSseGet).toHaveBeenCalledWith(
'/workflow/wfr-1/events?include_state_snapshot=true',
'/workflow/wfr-1/events?include_state_snapshot=true&replay=true',
{},
expect.any(Object),
)

View File

@ -680,8 +680,10 @@ export const useChat = (
onGetSuggestedQuestions,
}: SendCallback,
) => {
// Re-subscribe to workflow events for the specific message
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
// Re-subscribe to workflow events for the specific message.
// replay=true tells the backend to read from the beginning of the Redis Stream
// so all retained events are replayed on reconnection (e.g. page refresh).
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true`
const otherOptions: IOtherOptions = {
getAbortController: (abortController) => {