mirror of
https://github.com/langgenius/dify.git
synced 2026-04-16 18:46:14 +08:00
Compare commits
2 Commits
main
...
feat/repla
| Author | SHA1 | Date | |
|---|---|---|---|
| 53c0dde3d5 | |||
| a1759fccb8 |
@ -81,6 +81,7 @@ class WorkflowEventsApi(WebApiResource):
|
||||
raise InvalidArgumentError(f"cannot subscribe to workflow run, workflow_run_id={workflow_run.id}")
|
||||
|
||||
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
|
||||
replay = request.args.get("replay", "false").lower() == "true"
|
||||
|
||||
def _generate_stream_events():
|
||||
if include_state_snapshot:
|
||||
@ -91,10 +92,11 @@ class WorkflowEventsApi(WebApiResource):
|
||||
tenant_id=app_model.tenant_id,
|
||||
app_id=app_model.id,
|
||||
session_maker=session_maker,
|
||||
replay=replay,
|
||||
)
|
||||
)
|
||||
return generator.convert_to_event_stream(
|
||||
msg_generator.retrieve_events(app_mode, workflow_run.id),
|
||||
msg_generator.retrieve_events(app_mode, workflow_run.id, replay=replay),
|
||||
)
|
||||
|
||||
event_generator = _generate_stream_events
|
||||
|
||||
@ -311,12 +311,14 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
||||
cls,
|
||||
app_mode: AppMode,
|
||||
workflow_run_id: str,
|
||||
idle_timeout=300,
|
||||
idle_timeout: float = 300,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
replay: bool = False,
|
||||
) -> Generator[Mapping | str, None, None]:
|
||||
topic = cls.get_response_topic(app_mode, workflow_run_id)
|
||||
return stream_topic_events(
|
||||
topic=topic,
|
||||
idle_timeout=idle_timeout,
|
||||
on_subscribe=on_subscribe,
|
||||
replay=replay,
|
||||
)
|
||||
|
||||
@ -23,9 +23,10 @@ class MessageGenerator:
|
||||
cls,
|
||||
app_mode: AppMode,
|
||||
workflow_run_id: str,
|
||||
idle_timeout=300,
|
||||
idle_timeout: float = 300,
|
||||
ping_interval: float = 10.0,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
replay: bool = False,
|
||||
) -> Generator[Mapping | str, None, None]:
|
||||
topic = cls.get_response_topic(app_mode, workflow_run_id)
|
||||
return stream_topic_events(
|
||||
@ -33,4 +34,5 @@ class MessageGenerator:
|
||||
idle_timeout=idle_timeout,
|
||||
ping_interval=ping_interval,
|
||||
on_subscribe=on_subscribe,
|
||||
replay=replay,
|
||||
)
|
||||
|
||||
@ -17,6 +17,7 @@ def stream_topic_events(
|
||||
ping_interval: float | None = None,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
terminal_events: Iterable[str | StreamEvent] | None = None,
|
||||
replay: bool = False,
|
||||
) -> Generator[Mapping[str, Any] | str, None, None]:
|
||||
# send a PING event immediately to prevent the connection staying in pending state for a long time.
|
||||
#
|
||||
@ -27,7 +28,7 @@ def stream_topic_events(
|
||||
terminal_values = _normalize_terminal_events(terminal_events)
|
||||
last_msg_time = time.time()
|
||||
last_ping_time = last_msg_time
|
||||
with topic.subscribe() as sub:
|
||||
with topic.subscribe(replay=replay) as sub:
|
||||
# on_subscribe fires only after the Redis subscription is active.
|
||||
# This is used to gate task start and reduce pub/sub race for the first event.
|
||||
if on_subscribe is not None:
|
||||
|
||||
@ -58,6 +58,7 @@ class MessageListItem(ResponseModel):
|
||||
message_files: list[MessageFile]
|
||||
status: str
|
||||
error: str | None = None
|
||||
workflow_run_id: str | None = None
|
||||
extra_contents: list[ExecutionExtraContentDomainModel]
|
||||
|
||||
@field_validator("inputs", mode="before")
|
||||
|
||||
@ -92,7 +92,14 @@ class Subscriber(Protocol):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def subscribe(self) -> Subscription:
|
||||
def subscribe(self, *, replay: bool = False) -> Subscription:
|
||||
"""Create a new subscription.
|
||||
|
||||
:param replay: When True and the underlying transport supports message retention
|
||||
(e.g. Redis Streams), the subscription replays all buffered messages from
|
||||
the beginning of the stream before switching to live tail. Transports
|
||||
without retention (plain Pub/Sub) silently ignore this flag.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ class Topic:
|
||||
def as_subscriber(self) -> Subscriber:
|
||||
return self
|
||||
|
||||
def subscribe(self) -> Subscription:
|
||||
def subscribe(self, *, replay: bool = False) -> Subscription:
|
||||
return _RedisSubscription(
|
||||
client=self._client,
|
||||
pubsub=self._client.pubsub(),
|
||||
|
||||
@ -42,7 +42,7 @@ class ShardedTopic:
|
||||
def as_subscriber(self) -> Subscriber:
|
||||
return self
|
||||
|
||||
def subscribe(self) -> Subscription:
|
||||
def subscribe(self, *, replay: bool = False) -> Subscription:
|
||||
return _RedisShardedSubscription(
|
||||
client=self._client,
|
||||
pubsub=self._client.pubsub(),
|
||||
|
||||
@ -54,16 +54,17 @@ class StreamsTopic:
|
||||
def as_subscriber(self) -> Subscriber:
|
||||
return self
|
||||
|
||||
def subscribe(self) -> Subscription:
|
||||
return _StreamsSubscription(self._client, self._key)
|
||||
def subscribe(self, *, replay: bool = False) -> Subscription:
|
||||
return _StreamsSubscription(self._client, self._key, replay=replay)
|
||||
|
||||
|
||||
class _StreamsSubscription(Subscription):
|
||||
_SENTINEL = object()
|
||||
|
||||
def __init__(self, client: Redis | RedisCluster, key: str):
|
||||
def __init__(self, client: Redis | RedisCluster, key: str, *, replay: bool = False):
|
||||
self._client = client
|
||||
self._key = key
|
||||
self._replay = replay
|
||||
|
||||
self._queue: queue.Queue[object] = queue.Queue()
|
||||
|
||||
@ -76,7 +77,6 @@ class _StreamsSubscription(Subscription):
|
||||
# reading and writing the _listener / `_closed` attribute.
|
||||
self._lock = threading.Lock()
|
||||
self._closed: bool = False
|
||||
# self._closed = threading.Event()
|
||||
self._listener: threading.Thread | None = None
|
||||
|
||||
def _listen(self) -> None:
|
||||
@ -89,10 +89,9 @@ class _StreamsSubscription(Subscription):
|
||||
# since this method runs in a dedicated thread, acquiring `_lock` inside this method won't cause
|
||||
# deadlock.
|
||||
|
||||
# Setting initial last id to `$` to signal redis that we only want new messages.
|
||||
#
|
||||
# `"0"` replays all retained entries; `"$"` tails only new messages.
|
||||
# ref: https://redis.io/docs/latest/commands/xread/#the-special--id
|
||||
last_id = "$"
|
||||
last_id = "0" if self._replay else "$"
|
||||
try:
|
||||
while True:
|
||||
with self._lock:
|
||||
|
||||
@ -416,6 +416,7 @@ class AppGenerateService:
|
||||
cls,
|
||||
app_model: App,
|
||||
workflow_run: WorkflowRun,
|
||||
replay: bool = False,
|
||||
):
|
||||
if workflow_run.status.is_ended():
|
||||
# TODO(QuantumGhost): handled the ended scenario.
|
||||
@ -424,5 +425,5 @@ class AppGenerateService:
|
||||
generator = AdvancedChatAppGenerator()
|
||||
|
||||
return generator.convert_to_event_stream(
|
||||
generator.retrieve_events(AppMode(app_model.mode), workflow_run.id),
|
||||
generator.retrieve_events(AppMode(app_model.mode), workflow_run.id, replay=replay),
|
||||
)
|
||||
|
||||
@ -61,6 +61,7 @@ def build_workflow_event_stream(
|
||||
session_maker: sessionmaker[Session],
|
||||
idle_timeout: float = 300,
|
||||
ping_interval: float = 10.0,
|
||||
replay: bool = False,
|
||||
) -> Generator[Mapping[str, Any] | str, None, None]:
|
||||
topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id)
|
||||
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
|
||||
@ -103,7 +104,7 @@ def build_workflow_event_stream(
|
||||
last_msg_time = time.time()
|
||||
last_ping_time = last_msg_time
|
||||
|
||||
with topic.subscribe() as sub:
|
||||
with topic.subscribe(replay=replay) as sub:
|
||||
buffer_state = _start_buffering(sub)
|
||||
try:
|
||||
task_id = _resolve_task_id(resumption_context, buffer_state, workflow_run.id)
|
||||
|
||||
@ -5,6 +5,7 @@ import InSiteMessageNotification from '@/app/components/app/in-site-message/noti
|
||||
import AmplitudeProvider from '@/app/components/base/amplitude'
|
||||
import GA, { GaType } from '@/app/components/base/ga'
|
||||
import Zendesk from '@/app/components/base/zendesk'
|
||||
import GotoAnything from '@/app/components/goto-anything'
|
||||
import Header from '@/app/components/header'
|
||||
import HeaderWrapper from '@/app/components/header/header-wrapper'
|
||||
import ReadmePanel from '@/app/components/plugins/readme-panel'
|
||||
@ -12,15 +13,10 @@ import { AppContextProvider } from '@/context/app-context-provider'
|
||||
import { EventEmitterContextProvider } from '@/context/event-emitter-provider'
|
||||
import { ModalContextProvider } from '@/context/modal-context-provider'
|
||||
import { ProviderContextProvider } from '@/context/provider-context-provider'
|
||||
import dynamic from '@/next/dynamic'
|
||||
import PartnerStack from '../components/billing/partner-stack'
|
||||
import Splash from '../components/splash'
|
||||
import RoleRouteGuard from './role-route-guard'
|
||||
|
||||
const GotoAnything = dynamic(() => import('@/app/components/goto-anything'), {
|
||||
ssr: false,
|
||||
})
|
||||
|
||||
const Layout = ({ children }: { children: ReactNode }) => {
|
||||
return (
|
||||
<>
|
||||
|
||||
@ -125,6 +125,7 @@ const defaultChatHookReturn: Partial<ChatHookReturn> = {
|
||||
handleSend: vi.fn(),
|
||||
handleStop: vi.fn(),
|
||||
handleSwitchSibling: vi.fn(),
|
||||
handleReconnect: vi.fn(),
|
||||
isResponding: false,
|
||||
suggestedQuestions: [],
|
||||
}
|
||||
@ -605,6 +606,95 @@ describe('ChatWrapper', () => {
|
||||
expect(handleSwitchSibling).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should reconnect to a recent running workflow on mount', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
...defaultChatHookReturn,
|
||||
chatList: [],
|
||||
handleReconnect,
|
||||
} as unknown as ChatHookReturn)
|
||||
|
||||
vi.mocked(useChatWithHistoryContext).mockReturnValue({
|
||||
...defaultContextValue,
|
||||
appPrevChatTree: [{
|
||||
id: 'running-answer',
|
||||
isAnswer: true,
|
||||
content: 'partial content',
|
||||
workflow_run_id: 'run-active',
|
||||
created_at: Math.floor(Date.now() / 1000) - 30,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree],
|
||||
})
|
||||
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).toHaveBeenCalledWith(
|
||||
'running-answer',
|
||||
'run-active',
|
||||
expect.objectContaining({ isPublicAPI: true }),
|
||||
)
|
||||
})
|
||||
|
||||
it('should not reconnect to an old workflow beyond the retention window', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
...defaultChatHookReturn,
|
||||
chatList: [],
|
||||
handleReconnect,
|
||||
} as unknown as ChatHookReturn)
|
||||
|
||||
vi.mocked(useChatWithHistoryContext).mockReturnValue({
|
||||
...defaultContextValue,
|
||||
appPrevChatTree: [{
|
||||
id: 'old-answer',
|
||||
isAnswer: true,
|
||||
content: 'old content',
|
||||
workflow_run_id: 'run-old',
|
||||
created_at: Math.floor(Date.now() / 1000) - 700,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree],
|
||||
})
|
||||
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should prefer paused workflow over running workflow for reconnection', () => {
|
||||
const handleSwitchSibling = vi.fn()
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
...defaultChatHookReturn,
|
||||
chatList: [],
|
||||
handleSwitchSibling,
|
||||
handleReconnect,
|
||||
} as unknown as ChatHookReturn)
|
||||
|
||||
vi.mocked(useChatWithHistoryContext).mockReturnValue({
|
||||
...defaultContextValue,
|
||||
appPrevChatTree: [
|
||||
{
|
||||
id: 'running-answer',
|
||||
isAnswer: true,
|
||||
content: '',
|
||||
workflow_run_id: 'run-active',
|
||||
created_at: Math.floor(Date.now() / 1000) - 10,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
{
|
||||
id: 'paused-answer',
|
||||
isAnswer: true,
|
||||
content: '',
|
||||
workflow_run_id: 'run-paused',
|
||||
humanInputFormDataList: [{ node_id: 'n-1' }],
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
],
|
||||
})
|
||||
|
||||
render(<ChatWrapper />)
|
||||
expect(handleSwitchSibling).toHaveBeenCalledWith('paused-answer', expect.any(Object))
|
||||
expect(handleReconnect).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should call stopChatMessageResponding when handleStop is triggered', () => {
|
||||
const handleStop = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue({
|
||||
|
||||
@ -79,6 +79,7 @@ const ChatWrapper = () => {
|
||||
handleSend,
|
||||
handleStop,
|
||||
handleSwitchSibling,
|
||||
handleReconnect,
|
||||
isResponding: respondingState,
|
||||
suggestedQuestions,
|
||||
} = useChat(
|
||||
@ -140,37 +141,47 @@ const ChatWrapper = () => {
|
||||
setIsResponding(respondingState)
|
||||
}, [respondingState, setIsResponding])
|
||||
|
||||
// Resume paused workflows when chat history is loaded
|
||||
// Resume paused workflows or reconnect to running workflows when chat history is loaded
|
||||
useEffect(() => {
|
||||
if (!appPrevChatTree || appPrevChatTree.length === 0)
|
||||
return
|
||||
|
||||
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
// DFS: recurse to children first
|
||||
if (node.children && node.children.length > 0)
|
||||
findLastPausedWorkflow(node.children)
|
||||
const STREAM_RETENTION_SECONDS = 600
|
||||
|
||||
// Track the last node with humanInputFormDataList
|
||||
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
|
||||
lastPausedNode = node
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
let lastRunningNode: ChatItemInTree | undefined
|
||||
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
if (node.isAnswer && node.workflow_run_id) {
|
||||
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
|
||||
lastPausedNode = node
|
||||
}
|
||||
else if (
|
||||
node.created_at
|
||||
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
|
||||
) {
|
||||
lastRunningNode = node
|
||||
}
|
||||
}
|
||||
|
||||
if (node.children && node.children.length > 0)
|
||||
findReconnectableWorkflow(node.children)
|
||||
})
|
||||
}
|
||||
|
||||
findLastPausedWorkflow(appPrevChatTree)
|
||||
findReconnectableWorkflow(appPrevChatTree)
|
||||
|
||||
const callbacks = {
|
||||
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
}
|
||||
|
||||
// Only resume the last paused workflow
|
||||
if (lastPausedNode) {
|
||||
handleSwitchSibling(
|
||||
lastPausedNode.id,
|
||||
{
|
||||
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
},
|
||||
)
|
||||
handleSwitchSibling(lastPausedNode.id, callbacks)
|
||||
}
|
||||
else if (lastRunningNode) {
|
||||
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
|
||||
}
|
||||
}, [])
|
||||
|
||||
|
||||
@ -35,12 +35,12 @@ function getFormattedChatList(messages: any[]) {
|
||||
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
|
||||
const humanInputFormDataList: HumanInputFormData[] = []
|
||||
const humanInputFilledFormDataList: HumanInputFilledFormData[] = []
|
||||
let workflowRunId = ''
|
||||
let workflowRunIdFromExtra = ''
|
||||
if (item.status === 'paused') {
|
||||
item.extra_contents?.forEach((content: ExtraContent) => {
|
||||
if (content.type === 'human_input' && !content.submitted) {
|
||||
humanInputFormDataList.push(content.form_definition)
|
||||
workflowRunId = content.workflow_run_id
|
||||
workflowRunIdFromExtra = content.workflow_run_id
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -62,7 +62,8 @@ function getFormattedChatList(messages: any[]) {
|
||||
parentMessageId: `question-${item.id}`,
|
||||
humanInputFormDataList,
|
||||
humanInputFilledFormDataList,
|
||||
workflow_run_id: workflowRunId,
|
||||
workflow_run_id: item.workflow_run_id || workflowRunIdFromExtra,
|
||||
created_at: item.created_at,
|
||||
})
|
||||
})
|
||||
return newChatList
|
||||
|
||||
@ -932,7 +932,7 @@ describe('useChat', () => {
|
||||
})
|
||||
|
||||
expect(sseGet).toHaveBeenCalledWith(
|
||||
'/workflow/wr-1/events?include_state_snapshot=true',
|
||||
'/workflow/wr-1/events?include_state_snapshot=true&replay=true',
|
||||
expect.any(Object),
|
||||
expect.any(Object),
|
||||
)
|
||||
@ -1264,6 +1264,102 @@ describe('useChat', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('handleReconnect', () => {
|
||||
it('should call sseGet with include_state_snapshot and rebuild from snapshot events', () => {
|
||||
let callbacks: HookCallbacks
|
||||
|
||||
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
|
||||
callbacks = options as HookCallbacks
|
||||
})
|
||||
|
||||
const prevChatTree = [{
|
||||
id: 'q-1',
|
||||
content: 'query',
|
||||
isAnswer: false,
|
||||
children: [{
|
||||
id: 'm-reconnect',
|
||||
content: 'stale partial content',
|
||||
isAnswer: true,
|
||||
siblingIndex: 0,
|
||||
workflowProcess: { status: 'running', tracing: [{ node_id: 'old-node' }] },
|
||||
}],
|
||||
}]
|
||||
|
||||
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
|
||||
|
||||
act(() => {
|
||||
result.current.handleReconnect('m-reconnect', 'wr-reconnect', { isPublicAPI: true })
|
||||
})
|
||||
|
||||
expect(sseGet).toHaveBeenCalledWith(
|
||||
'/workflow/wr-reconnect/events?include_state_snapshot=true',
|
||||
expect.any(Object),
|
||||
expect.any(Object),
|
||||
)
|
||||
|
||||
// Content is not reset until onWorkflowStarted fires
|
||||
const beforeStart = result.current.chatList[1]
|
||||
expect(beforeStart.content).toBe('stale partial content')
|
||||
|
||||
act(() => {
|
||||
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-reconnect', task_id: 't-1' })
|
||||
})
|
||||
|
||||
// After onWorkflowStarted, content is reset and workflowProcess is fresh
|
||||
const afterStart = result.current.chatList[1]
|
||||
expect(afterStart.content).toBe('')
|
||||
expect(afterStart.workflowProcess).toEqual({ status: WorkflowRunningStatus.Running, tracing: [] })
|
||||
|
||||
act(() => {
|
||||
callbacks.onMessageReplace({ answer: 'full snapshot text' })
|
||||
callbacks.onNodeStarted({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1' } })
|
||||
callbacks.onNodeFinished({ data: { node_id: 'n-1', id: 'n-1', title: 'Node 1', status: 'succeeded' } })
|
||||
callbacks.onWorkflowFinished({ data: { status: 'succeeded' } })
|
||||
callbacks.onCompleted()
|
||||
})
|
||||
|
||||
const lastResponse = result.current.chatList[1]
|
||||
expect(lastResponse.content).toBe('full snapshot text')
|
||||
expect(lastResponse.workflowProcess?.status).toBe('succeeded')
|
||||
expect(lastResponse.workflowProcess?.tracing).toHaveLength(1)
|
||||
expect(result.current.isResponding).toBe(false)
|
||||
})
|
||||
|
||||
it('should abort previous stream when reconnecting again', () => {
|
||||
const callbacksList: HookCallbacks[] = []
|
||||
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
|
||||
callbacksList.push(options as HookCallbacks)
|
||||
})
|
||||
|
||||
const prevChatTree = [{
|
||||
id: 'q-1',
|
||||
content: 'query',
|
||||
isAnswer: false,
|
||||
children: [{
|
||||
id: 'm-rc',
|
||||
content: 'partial',
|
||||
isAnswer: true,
|
||||
siblingIndex: 0,
|
||||
}],
|
||||
}]
|
||||
|
||||
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
|
||||
const previousAbort = createAbortControllerMock()
|
||||
|
||||
act(() => {
|
||||
result.current.handleReconnect('m-rc', 'wr-1', { isPublicAPI: true })
|
||||
})
|
||||
act(() => {
|
||||
callbacksList[0].getAbortController(previousAbort)
|
||||
})
|
||||
act(() => {
|
||||
result.current.handleReconnect('m-rc', 'wr-2', { isPublicAPI: true })
|
||||
})
|
||||
|
||||
expect(previousAbort.abort).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe('createAudioPlayerManager branch cases', () => {
|
||||
it('should handle ttsUrl generation for appId with installed apps', async () => {
|
||||
vi.mocked(usePathname).mockReturnValue('/explore/installed/app')
|
||||
@ -1331,7 +1427,7 @@ describe('useChat', () => {
|
||||
})
|
||||
|
||||
expect(sseGet).toHaveBeenCalledWith(
|
||||
'/workflow/wr-tts-app/events?include_state_snapshot=true',
|
||||
'/workflow/wr-tts-app/events?include_state_snapshot=true&replay=true',
|
||||
expect.any(Object),
|
||||
expect.any(Object),
|
||||
)
|
||||
@ -1493,7 +1589,7 @@ describe('useChat', () => {
|
||||
|
||||
// Should automatically call handleResume -> sseGet for human input
|
||||
expect(sseGet).toHaveBeenCalledWith(
|
||||
'/workflow/wr-1/events?include_state_snapshot=true',
|
||||
'/workflow/wr-1/events?include_state_snapshot=true&replay=true',
|
||||
expect.any(Object),
|
||||
expect.any(Object),
|
||||
)
|
||||
|
||||
@ -244,8 +244,10 @@ export const useChat = (
|
||||
}: SendCallback,
|
||||
) => {
|
||||
const getOrCreatePlayer = createAudioPlayerManager()
|
||||
// Re-subscribe to workflow events for the specific message
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
|
||||
// Re-subscribe to workflow events for the specific message.
|
||||
// replay=true tells the backend to read from the beginning of the Redis Stream
|
||||
// so all retained events are replayed on reconnection (e.g. page refresh).
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true`
|
||||
|
||||
const otherOptions: IOtherOptions = {
|
||||
isPublicAPI,
|
||||
@ -583,6 +585,320 @@ export const useChat = (
|
||||
)
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
||||
|
||||
const handleReconnect = useCallback((
|
||||
messageId: string,
|
||||
workflowRunId: string,
|
||||
{
|
||||
onGetSuggestedQuestions,
|
||||
onConversationComplete,
|
||||
isPublicAPI,
|
||||
}: SendCallback,
|
||||
) => {
|
||||
const getOrCreatePlayer = createAudioPlayerManager()
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true`
|
||||
|
||||
const otherOptions: IOtherOptions = {
|
||||
isPublicAPI,
|
||||
getAbortController: (abortController) => {
|
||||
workflowEventsAbortControllerRef.current = abortController
|
||||
},
|
||||
onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId: msgId, taskId }: IOnDataMoreInfo) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0
|
||||
if (!isAgentMode) {
|
||||
responseItem.content = responseItem.content + message
|
||||
}
|
||||
else {
|
||||
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
|
||||
if (lastThought)
|
||||
lastThought.thought = lastThought.thought + message
|
||||
}
|
||||
if (msgId)
|
||||
responseItem.id = msgId
|
||||
})
|
||||
|
||||
if (isFirstMessage && newConversationId)
|
||||
conversationIdRef.current = newConversationId
|
||||
|
||||
if (taskId)
|
||||
taskIdRef.current = taskId
|
||||
},
|
||||
async onCompleted(hasError?: boolean) {
|
||||
handleResponding(false)
|
||||
|
||||
if (hasError)
|
||||
return
|
||||
|
||||
if (onConversationComplete)
|
||||
onConversationComplete(conversationIdRef.current)
|
||||
|
||||
if (config?.suggested_questions_after_answer?.enabled && !hasStopRespondedRef.current && onGetSuggestedQuestions) {
|
||||
try {
|
||||
const { data }: any = await onGetSuggestedQuestions(
|
||||
messageId,
|
||||
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
|
||||
)
|
||||
setSuggestedQuestions(data)
|
||||
}
|
||||
catch {
|
||||
setSuggestedQuestions([])
|
||||
}
|
||||
}
|
||||
},
|
||||
onFile(file) {
|
||||
const fileType = (file as { type?: string }).type || 'image'
|
||||
const baseFile = ('transferMethod' in file) ? (file as Partial<FileEntity>) : null
|
||||
const convertedFile: FileEntity = {
|
||||
id: baseFile?.id || (file as { id: string }).id,
|
||||
type: baseFile?.type || (fileType === 'image' ? 'image/png' : fileType === 'video' ? 'video/mp4' : fileType === 'audio' ? 'audio/mpeg' : 'application/octet-stream'),
|
||||
transferMethod: (baseFile?.transferMethod as FileEntity['transferMethod']) || (fileType === 'image' ? 'remote_url' : 'local_file'),
|
||||
uploadedId: baseFile?.uploadedId || (file as { id: string }).id,
|
||||
supportFileType: baseFile?.supportFileType || (fileType === 'image' ? 'image' : fileType === 'video' ? 'video' : fileType === 'audio' ? 'audio' : 'document'),
|
||||
progress: baseFile?.progress ?? 100,
|
||||
name: baseFile?.name || `generated_${fileType}.${fileType === 'image' ? 'png' : fileType === 'video' ? 'mp4' : fileType === 'audio' ? 'mp3' : 'bin'}`,
|
||||
url: baseFile?.url || (file as { url?: string }).url,
|
||||
size: baseFile?.size ?? 0,
|
||||
}
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
|
||||
if (lastThought) {
|
||||
responseItem.agent_thoughts!.at(-1)!.message_files = [...(lastThought as any).message_files, convertedFile]
|
||||
}
|
||||
else {
|
||||
const currentFiles = (responseItem.message_files as FileEntity[] | undefined) ?? []
|
||||
responseItem.message_files = [...currentFiles, convertedFile]
|
||||
}
|
||||
})
|
||||
},
|
||||
onThought(thought) {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (thought.message_id)
|
||||
responseItem.id = thought.message_id
|
||||
if (thought.conversation_id)
|
||||
responseItem.conversationId = thought.conversation_id
|
||||
if (!responseItem.agent_thoughts)
|
||||
responseItem.agent_thoughts = []
|
||||
if (responseItem.agent_thoughts.length === 0) {
|
||||
responseItem.agent_thoughts.push(thought)
|
||||
}
|
||||
else {
|
||||
const lastThought = responseItem.agent_thoughts.at(-1)
|
||||
if (lastThought?.id === thought.id) {
|
||||
thought.thought = lastThought.thought
|
||||
thought.message_files = lastThought.message_files
|
||||
responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought
|
||||
}
|
||||
else {
|
||||
responseItem.agent_thoughts.push(thought)
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
onMessageEnd: (messageEnd) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (messageEnd.metadata?.annotation_reply) {
|
||||
responseItem.annotation = ({
|
||||
id: messageEnd.metadata.annotation_reply.id,
|
||||
authorName: messageEnd.metadata.annotation_reply.account.name,
|
||||
})
|
||||
return
|
||||
}
|
||||
responseItem.citation = messageEnd.metadata?.retriever_resources || []
|
||||
const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || [])
|
||||
responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id')
|
||||
})
|
||||
},
|
||||
onMessageReplace: (messageReplace) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
responseItem.content = messageReplace.answer
|
||||
})
|
||||
},
|
||||
onError() {
|
||||
handleResponding(false)
|
||||
},
|
||||
onWorkflowStarted: ({ workflow_run_id, task_id }) => {
|
||||
handleResponding(true)
|
||||
hasStopRespondedRef.current = false
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
taskIdRef.current = task_id
|
||||
responseItem.content = ''
|
||||
responseItem.workflow_run_id = workflow_run_id
|
||||
responseItem.workflowProcess = {
|
||||
status: WorkflowRunningStatus.Running,
|
||||
tracing: [],
|
||||
}
|
||||
})
|
||||
},
|
||||
onWorkflowFinished: ({ data: workflowFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (responseItem.workflowProcess)
|
||||
responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus
|
||||
})
|
||||
},
|
||||
onIterationStart: ({ data: iterationStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...iterationStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
})
|
||||
},
|
||||
onIterationFinish: ({ data: iterationFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...nodeStartedData,
|
||||
status: NodeRunningStatus.Running,
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (nodeStartedData.iteration_id)
|
||||
return
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...nodeStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
}
|
||||
})
|
||||
},
|
||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
if (nodeFinishedData.iteration_id)
|
||||
return
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
|
||||
})
|
||||
if (currentIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
|
||||
})
|
||||
},
|
||||
onTTSChunk: (msgId: string, audio: string) => {
|
||||
if (!audio || audio === '')
|
||||
return
|
||||
const audioPlayer = getOrCreatePlayer()
|
||||
if (audioPlayer) {
|
||||
audioPlayer.playAudioWithAudio(audio, true)
|
||||
AudioPlayerManager.getInstance().resetMsgId(msgId)
|
||||
}
|
||||
},
|
||||
onTTSEnd: (_msgId: string, audio: string) => {
|
||||
const audioPlayer = getOrCreatePlayer()
|
||||
if (audioPlayer)
|
||||
audioPlayer.playAudioWithAudio(audio, false)
|
||||
},
|
||||
onLoopStart: ({ data: loopStartedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess)
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
...loopStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
})
|
||||
},
|
||||
onLoopFinish: ({ data: loopFinishedData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
onHumanInputRequired: ({ data: humanInputRequiredData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (!responseItem.humanInputFormDataList) {
|
||||
responseItem.humanInputFormDataList = [humanInputRequiredData]
|
||||
}
|
||||
else {
|
||||
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id)
|
||||
if (currentFormIndex > -1)
|
||||
responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData
|
||||
else
|
||||
responseItem.humanInputFormDataList.push(humanInputRequiredData)
|
||||
}
|
||||
if (responseItem.workflowProcess?.tracing) {
|
||||
const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id)
|
||||
if (currentTracingIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused
|
||||
}
|
||||
})
|
||||
},
|
||||
onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (responseItem.humanInputFormDataList?.length) {
|
||||
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id)
|
||||
if (currentFormIndex > -1)
|
||||
responseItem.humanInputFormDataList.splice(currentFormIndex, 1)
|
||||
}
|
||||
if (!responseItem.humanInputFilledFormDataList)
|
||||
responseItem.humanInputFilledFormDataList = [humanInputFilledFormData]
|
||||
else
|
||||
responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData)
|
||||
})
|
||||
},
|
||||
onHumanInputFormTimeout: ({ data: humanInputFormTimeoutData }) => {
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
if (responseItem.humanInputFormDataList?.length) {
|
||||
const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFormTimeoutData.node_id)
|
||||
responseItem.humanInputFormDataList[currentFormIndex].expiration_time = humanInputFormTimeoutData.expiration_time
|
||||
}
|
||||
})
|
||||
},
|
||||
onWorkflowPaused: ({ data: workflowPausedData }) => {
|
||||
const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events`
|
||||
pausedStateRef.current = true
|
||||
sseGet(resumeUrl, {}, otherOptions)
|
||||
updateChatTreeNode(messageId, (responseItem) => {
|
||||
responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
if (workflowEventsAbortControllerRef.current)
|
||||
workflowEventsAbortControllerRef.current.abort()
|
||||
|
||||
sseGet(url, {}, otherOptions)
|
||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
||||
|
||||
const updateCurrentQAOnTree = useCallback(({
|
||||
parentId,
|
||||
responseItem,
|
||||
@ -1275,6 +1591,7 @@ export const useChat = (
|
||||
setIsResponding,
|
||||
handleSend,
|
||||
handleResume,
|
||||
handleReconnect,
|
||||
handleSwitchSibling,
|
||||
suggestedQuestions,
|
||||
handleRestart,
|
||||
|
||||
@ -111,6 +111,7 @@ export type IChatItem = {
|
||||
agent_thoughts?: ThoughtItem[]
|
||||
message_files?: FileEntity[]
|
||||
workflow_run_id?: string
|
||||
created_at?: number
|
||||
// for agent log
|
||||
conversationId?: string
|
||||
input?: any
|
||||
|
||||
@ -177,6 +177,7 @@ const createUseChatReturn = (overrides: Partial<UseChatReturn> = {}): UseChatRet
|
||||
setTargetMessageId: vi.fn() as UseChatReturn['setTargetMessageId'],
|
||||
handleSend: vi.fn(),
|
||||
handleResume: vi.fn(),
|
||||
handleReconnect: vi.fn(),
|
||||
setIsResponding: vi.fn() as UseChatReturn['setIsResponding'],
|
||||
handleStop: vi.fn(),
|
||||
handleSwitchSibling: vi.fn(),
|
||||
@ -541,6 +542,52 @@ describe('EmbeddedChatbot chat-wrapper', () => {
|
||||
expect(handleSwitchSibling).toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should reconnect to a recent running workflow on mount', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue(createUseChatReturn({
|
||||
handleReconnect,
|
||||
}))
|
||||
vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
|
||||
appPrevChatList: [
|
||||
{
|
||||
id: 'running-node',
|
||||
isAnswer: true,
|
||||
content: 'partial',
|
||||
workflow_run_id: 'run-active',
|
||||
created_at: Math.floor(Date.now() / 1000) - 30,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
],
|
||||
}))
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).toHaveBeenCalledWith(
|
||||
'running-node',
|
||||
'run-active',
|
||||
expect.objectContaining({ isPublicAPI: true }),
|
||||
)
|
||||
})
|
||||
|
||||
it('should not reconnect to an old workflow beyond the retention window', () => {
|
||||
const handleReconnect = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue(createUseChatReturn({
|
||||
handleReconnect,
|
||||
}))
|
||||
vi.mocked(useEmbeddedChatbotContext).mockReturnValue(createContextValue({
|
||||
appPrevChatList: [
|
||||
{
|
||||
id: 'old-node',
|
||||
isAnswer: true,
|
||||
content: 'done',
|
||||
workflow_run_id: 'run-old',
|
||||
created_at: Math.floor(Date.now() / 1000) - 700,
|
||||
children: [],
|
||||
} as unknown as ChatItemInTree,
|
||||
],
|
||||
}))
|
||||
render(<ChatWrapper />)
|
||||
expect(handleReconnect).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should handle conversation completion and suggested questions in chat actions', async () => {
|
||||
const handleSend = vi.fn()
|
||||
vi.mocked(useChat).mockReturnValue(createUseChatReturn({
|
||||
|
||||
@ -85,6 +85,7 @@ const ChatWrapper = () => {
|
||||
handleSend,
|
||||
handleStop,
|
||||
handleSwitchSibling,
|
||||
handleReconnect,
|
||||
isResponding: respondingState,
|
||||
suggestedQuestions,
|
||||
} = useChat(
|
||||
@ -142,37 +143,47 @@ const ChatWrapper = () => {
|
||||
setIsResponding(respondingState)
|
||||
}, [respondingState, setIsResponding])
|
||||
|
||||
// Resume paused workflows when chat history is loaded
|
||||
// Resume paused workflows or reconnect to running workflows when chat history is loaded
|
||||
useEffect(() => {
|
||||
if (!appPrevChatList || appPrevChatList.length === 0)
|
||||
return
|
||||
|
||||
// Find the last answer item with workflow_run_id that needs resumption (DFS - find deepest first)
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
const findLastPausedWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
// DFS: recurse to children first
|
||||
if (node.children && node.children.length > 0)
|
||||
findLastPausedWorkflow(node.children)
|
||||
const STREAM_RETENTION_SECONDS = 600
|
||||
|
||||
// Track the last node with humanInputFormDataList
|
||||
if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0)
|
||||
lastPausedNode = node
|
||||
let lastPausedNode: ChatItemInTree | undefined
|
||||
let lastRunningNode: ChatItemInTree | undefined
|
||||
const findReconnectableWorkflow = (nodes: ChatItemInTree[]) => {
|
||||
nodes.forEach((node) => {
|
||||
if (node.isAnswer && node.workflow_run_id) {
|
||||
if (node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
|
||||
lastPausedNode = node
|
||||
}
|
||||
else if (
|
||||
node.created_at
|
||||
&& (Date.now() / 1000 - node.created_at) < STREAM_RETENTION_SECONDS
|
||||
) {
|
||||
lastRunningNode = node
|
||||
}
|
||||
}
|
||||
|
||||
if (node.children && node.children.length > 0)
|
||||
findReconnectableWorkflow(node.children)
|
||||
})
|
||||
}
|
||||
|
||||
findLastPausedWorkflow(appPrevChatList)
|
||||
findReconnectableWorkflow(appPrevChatList)
|
||||
|
||||
const callbacks = {
|
||||
onGetSuggestedQuestions: (responseItemId: string) => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
}
|
||||
|
||||
// Only resume the last paused workflow
|
||||
if (lastPausedNode) {
|
||||
handleSwitchSibling(
|
||||
lastPausedNode.id,
|
||||
{
|
||||
onGetSuggestedQuestions: responseItemId => fetchSuggestedQuestions(responseItemId, appSourceType, appId),
|
||||
onConversationComplete: currentConversationId ? undefined : handleNewConversationCompleted,
|
||||
isPublicAPI: appSourceType === AppSourceType.webApp,
|
||||
},
|
||||
)
|
||||
handleSwitchSibling(lastPausedNode.id, callbacks)
|
||||
}
|
||||
else if (lastRunningNode) {
|
||||
handleReconnect(lastRunningNode.id, lastRunningNode.workflow_run_id!, callbacks)
|
||||
}
|
||||
}, [])
|
||||
|
||||
|
||||
@ -42,6 +42,8 @@ function getFormattedChatList(messages: any[]) {
|
||||
citation: item.retriever_resources,
|
||||
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
|
||||
parentMessageId: `question-${item.id}`,
|
||||
workflow_run_id: item.workflow_run_id,
|
||||
created_at: item.created_at,
|
||||
})
|
||||
})
|
||||
return newChatList
|
||||
|
||||
@ -9,7 +9,7 @@ export function ReactScanLoader() {
|
||||
<Script
|
||||
src="//unpkg.com/react-scan/dist/auto.global.js"
|
||||
crossOrigin="anonymous"
|
||||
strategy="afterInteractive"
|
||||
strategy="beforeInteractive"
|
||||
/>
|
||||
)
|
||||
}
|
||||
|
||||
@ -1,13 +1,15 @@
|
||||
'use client'
|
||||
import type { FC, PropsWithChildren } from 'react'
|
||||
import * as React from 'react'
|
||||
import { useUserProfile } from '@/service/use-common'
|
||||
import { useIsLogin } from '@/service/use-common'
|
||||
import Loading from './base/loading'
|
||||
|
||||
const Splash: FC<PropsWithChildren> = () => {
|
||||
const { isPending, data } = useUserProfile()
|
||||
// would auto redirect to signin page if not logged in
|
||||
const { isLoading, data: loginData } = useIsLogin()
|
||||
const isLoggedIn = loginData?.logged_in
|
||||
|
||||
if (isPending || !data?.profile) {
|
||||
if (isLoading || !isLoggedIn) {
|
||||
return (
|
||||
<div className="fixed inset-0 z-9999999 flex h-full items-center justify-center bg-background-body">
|
||||
<Loading />
|
||||
|
||||
@ -122,7 +122,7 @@ describe('useChat – handleResume', () => {
|
||||
})
|
||||
|
||||
expect(mockSseGet).toHaveBeenCalledWith(
|
||||
'/workflow/wfr-1/events?include_state_snapshot=true',
|
||||
'/workflow/wfr-1/events?include_state_snapshot=true&replay=true',
|
||||
{},
|
||||
expect.any(Object),
|
||||
)
|
||||
|
||||
@ -680,8 +680,10 @@ export const useChat = (
|
||||
onGetSuggestedQuestions,
|
||||
}: SendCallback,
|
||||
) => {
|
||||
// Re-subscribe to workflow events for the specific message
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true`
|
||||
// Re-subscribe to workflow events for the specific message.
|
||||
// replay=true tells the backend to read from the beginning of the Redis Stream
|
||||
// so all retained events are replayed on reconnection (e.g. page refresh).
|
||||
const url = `/workflow/${workflowRunId}/events?include_state_snapshot=true&replay=true`
|
||||
|
||||
const otherOptions: IOtherOptions = {
|
||||
getAbortController: (abortController) => {
|
||||
|
||||
Reference in New Issue
Block a user