From 116ec9dd04e56b2d056a76ed68520ffaa0ff85f0 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Tue, 27 Jan 2026 08:46:41 +0800 Subject: [PATCH] Read the current implementation and consider the following problem (vibe-kanban 25d8b973) Currently, the HumanInput node yields a `HumanInputFormFilledEvent` event while form is submmited. However, for form level timeout, current no event about timeout is emitted. This makes the frontend UI not updated while the events of time out are sent to the frontend. Analysis this problem, propose a way to resolve this issue. --- .../advanced_chat/generate_task_pipeline.py | 10 +++ .../common/workflow_response_converter.py | 16 ++++ .../apps/workflow/generate_task_pipeline.py | 10 +++ api/core/app/apps/workflow_app_runner.py | 11 +++ api/core/app/entities/queue_entities.py | 14 ++++ api/core/app/entities/task_entities.py | 16 ++++ api/core/workflow/graph_events/__init__.py | 2 + api/core/workflow/graph_events/node.py | 7 ++ api/core/workflow/node_events/__init__.py | 2 + api/core/workflow/node_events/node.py | 7 ++ api/core/workflow/nodes/base/node.py | 12 +++ .../nodes/human_input/human_input_node.py | 11 ++- ...workflow_response_converter_human_input.py | 27 ++++++- .../nodes/human_input/test_entities.py | 8 +- .../test_human_input_form_filled_event.py | 74 +++++++++++++++++++ .../libs/_human_input/test_form_service.py | 4 +- 16 files changed, 220 insertions(+), 11 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 2cd47d97a8..2ac44f9b4a 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -25,6 +25,7 @@ from core.app.entities.queue_entities import ( QueueAnnotationReplyEvent, QueueErrorEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -692,6 +693,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): event=event, task_id=self._application_generate_entity.task_id ) + def _handle_human_input_form_timeout_event( + self, event: QueueHumanInputFormTimeoutEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle human input form timeout events.""" + yield self._workflow_response_converter.human_input_form_timeout_to_stream_response( + event=event, task_id=self._application_generate_entity.task_id + ) + def _persist_human_input_extra_content(self, *, node_id: str | None = None, form_id: str | None = None) -> None: if not self._workflow_run_id or not self._message_id: return @@ -774,6 +783,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event, QueueAgentLogEvent: self._handle_agent_log_event, QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event, + QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event, } def _dispatch_event( diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index d4712fde07..bde3b8e5ee 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -9,6 +9,7 @@ from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, from core.app.entities.queue_entities import ( QueueAgentLogEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -25,6 +26,7 @@ from core.app.entities.queue_entities import ( from core.app.entities.task_entities import ( AgentLogStreamResponse, HumanInputFormFilledResponse, + HumanInputFormTimeoutResponse, HumanInputRequiredResponse, IterationNodeCompletedStreamResponse, IterationNodeNextStreamResponse, @@ -338,6 +340,20 @@ class WorkflowResponseConverter: ), ) + def human_input_form_timeout_to_stream_response( + self, *, event: QueueHumanInputFormTimeoutEvent, task_id: str + ) -> HumanInputFormTimeoutResponse: + run_id = self._ensure_workflow_run_id() + return HumanInputFormTimeoutResponse( + task_id=task_id, + workflow_run_id=run_id, + data=HumanInputFormTimeoutResponse.Data( + node_id=event.node_id, + node_title=event.node_title, + expiration_time=int(event.expiration_time.timestamp()), + ), + ) + @classmethod def workflow_run_result_to_finish_response( cls, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 6d8c061cbc..c3296001cc 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -18,6 +18,7 @@ from core.app.entities.queue_entities import ( QueueAgentLogEvent, QueueErrorEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -523,6 +524,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): event=event, task_id=self._application_generate_entity.task_id ) + def _handle_human_input_form_timeout_event( + self, event: QueueHumanInputFormTimeoutEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle human input form timeout events.""" + yield self._workflow_response_converter.human_input_form_timeout_to_stream_response( + event=event, task_id=self._application_generate_entity.task_id + ) + def _get_event_handlers(self) -> dict[type, Callable]: """Get mapping of event types to their handlers using fluent pattern.""" return { @@ -550,6 +559,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): # Agent events QueueAgentLogEvent: self._handle_agent_log_event, QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event, + QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event, } def _dispatch_event( diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index f7a4664c61..b09385aad3 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -8,6 +8,7 @@ from core.app.entities.queue_entities import ( AppQueueEvent, QueueAgentLogEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -42,6 +43,7 @@ from core.workflow.graph_events import ( NodeRunExceptionEvent, NodeRunFailedEvent, NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunIterationFailedEvent, NodeRunIterationNextEvent, NodeRunIterationStartedEvent, @@ -393,6 +395,15 @@ class WorkflowBasedAppRunner: action_text=event.action_text, ) ) + elif isinstance(event, NodeRunHumanInputFormTimeoutEvent): + self._publish_event( + QueueHumanInputFormTimeoutEvent( + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + expiration_time=event.expiration_time, + ) + ) elif isinstance(event, NodeRunRetryEvent): node_run_result = event.node_run_result inputs = node_run_result.inputs diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index b9a6f655ba..5b2fa29b56 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -50,6 +50,7 @@ class QueueEvent(StrEnum): RETRY = "retry" PAUSE = "pause" HUMAN_INPUT_FORM_FILLED = "human_input_form_filled" + HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout" class AppQueueEvent(BaseModel): @@ -506,6 +507,19 @@ class QueueHumanInputFormFilledEvent(AppQueueEvent): action_text: str +class QueueHumanInputFormTimeoutEvent(AppQueueEvent): + """ + QueueHumanInputFormTimeoutEvent entity + """ + + event: QueueEvent = QueueEvent.HUMAN_INPUT_FORM_TIMEOUT + + node_id: str + node_type: NodeType + node_title: str + expiration_time: datetime + + class QueueMessage(BaseModel): """ QueueMessage abstract entity diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 00ab6b4bbc..774a7151fd 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -87,6 +87,7 @@ class StreamEvent(StrEnum): AGENT_LOG = "agent_log" HUMAN_INPUT_REQUIRED = "human_input_required" HUMAN_INPUT_FORM_FILLED = "human_input_form_filled" + HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout" class StreamResponse(BaseModel): @@ -305,6 +306,21 @@ class HumanInputFormFilledResponse(StreamResponse): data: Data +class HumanInputFormTimeoutResponse(StreamResponse): + class Data(BaseModel): + """ + Data entity + """ + + node_id: str + node_title: str + expiration_time: int + + event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_TIMEOUT + workflow_run_id: str + data: Data + + class NodeStartStreamResponse(StreamResponse): """ NodeStartStreamResponse entity diff --git a/api/core/workflow/graph_events/__init__.py b/api/core/workflow/graph_events/__init__.py index 0ef494ebe6..7bb6346cb7 100644 --- a/api/core/workflow/graph_events/__init__.py +++ b/api/core/workflow/graph_events/__init__.py @@ -39,6 +39,7 @@ from .node import ( NodeRunExceptionEvent, NodeRunFailedEvent, NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunPauseRequestedEvent, NodeRunRetrieverResourceEvent, NodeRunRetryEvent, @@ -61,6 +62,7 @@ __all__ = [ "NodeRunExceptionEvent", "NodeRunFailedEvent", "NodeRunHumanInputFormFilledEvent", + "NodeRunHumanInputFormTimeoutEvent", "NodeRunIterationFailedEvent", "NodeRunIterationNextEvent", "NodeRunIterationStartedEvent", diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py index db67731bec..140b4de1da 100644 --- a/api/core/workflow/graph_events/node.py +++ b/api/core/workflow/graph_events/node.py @@ -63,5 +63,12 @@ class NodeRunHumanInputFormFilledEvent(GraphNodeEventBase): action_text: str = Field(..., description="Display text of the chosen action button.") +class NodeRunHumanInputFormTimeoutEvent(GraphNodeEventBase): + """Emitted when a HumanInput form times out.""" + + node_title: str = Field(..., description="HumanInput node title") + expiration_time: datetime = Field(..., description="Form expiration time") + + class NodeRunPauseRequestedEvent(GraphNodeEventBase): reason: PauseReason = Field(..., description="pause reason") diff --git a/api/core/workflow/node_events/__init__.py b/api/core/workflow/node_events/__init__.py index 3cd277fab0..a9bef8f9a2 100644 --- a/api/core/workflow/node_events/__init__.py +++ b/api/core/workflow/node_events/__init__.py @@ -14,6 +14,7 @@ from .loop import ( ) from .node import ( HumanInputFormFilledEvent, + HumanInputFormTimeoutEvent, ModelInvokeCompletedEvent, PauseRequestedEvent, RunRetrieverResourceEvent, @@ -25,6 +26,7 @@ from .node import ( __all__ = [ "AgentLogEvent", "HumanInputFormFilledEvent", + "HumanInputFormTimeoutEvent", "IterationFailedEvent", "IterationNextEvent", "IterationStartedEvent", diff --git a/api/core/workflow/node_events/node.py b/api/core/workflow/node_events/node.py index 15c11e8033..9c76b7d7c2 100644 --- a/api/core/workflow/node_events/node.py +++ b/api/core/workflow/node_events/node.py @@ -56,3 +56,10 @@ class HumanInputFormFilledEvent(NodeEventBase): rendered_content: str action_id: str action_text: str + + +class HumanInputFormTimeoutEvent(NodeEventBase): + """Event emitted when a human input form times out.""" + + node_title: str + expiration_time: datetime diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index ea014a2f21..a08c6d3ed8 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -17,6 +17,7 @@ from core.workflow.graph_events import ( NodeRunAgentLogEvent, NodeRunFailedEvent, NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunIterationFailedEvent, NodeRunIterationNextEvent, NodeRunIterationStartedEvent, @@ -34,6 +35,7 @@ from core.workflow.graph_events import ( from core.workflow.node_events import ( AgentLogEvent, HumanInputFormFilledEvent, + HumanInputFormTimeoutEvent, IterationFailedEvent, IterationNextEvent, IterationStartedEvent, @@ -649,6 +651,16 @@ class Node(Generic[NodeDataT]): action_text=event.action_text, ) + @_dispatch.register + def _(self, event: HumanInputFormTimeoutEvent): + return NodeRunHumanInputFormTimeoutEvent( + id=self.execution_id, + node_id=self._node_id, + node_type=self.node_type, + node_title=event.node_title, + expiration_time=event.expiration_time, + ) + @_dispatch.register def _(self, event: LoopStartedEvent) -> NodeRunLoopStartedEvent: return NodeRunLoopStartedEvent( diff --git a/api/core/workflow/nodes/human_input/human_input_node.py b/api/core/workflow/nodes/human_input/human_input_node.py index 1087b63b5a..f5039e9059 100644 --- a/api/core/workflow/nodes/human_input/human_input_node.py +++ b/api/core/workflow/nodes/human_input/human_input_node.py @@ -7,7 +7,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.repositories.human_input_reposotiry import HumanInputFormRepositoryImpl from core.workflow.entities.pause_reason import HumanInputRequired from core.workflow.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus -from core.workflow.node_events import HumanInputFormFilledEvent, NodeRunResult, PauseRequestedEvent +from core.workflow.node_events import ( + HumanInputFormFilledEvent, + HumanInputFormTimeoutEvent, + NodeRunResult, + PauseRequestedEvent, +) from core.workflow.node_events.base import NodeEventBase from core.workflow.node_events.node import StreamCompletedEvent from core.workflow.nodes.base.node import Node @@ -245,6 +250,10 @@ class HumanInputNode(Node[HumanInputNodeData]): return if form.status == HumanInputFormStatus.TIMEOUT or form.expiration_time <= naive_utc_now(): + yield HumanInputFormTimeoutEvent( + node_title=self._node_data.title, + expiration_time=form.expiration_time, + ) yield StreamCompletedEvent( node_run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, diff --git a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py index 1a51caf0f2..1c36b4d12b 100644 --- a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py +++ b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py @@ -1,8 +1,9 @@ +from datetime import UTC, datetime from types import SimpleNamespace from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter from core.app.entities.app_invoke_entities import InvokeFrom -from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent +from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent, QueueHumanInputFormTimeoutEvent from core.workflow.entities.workflow_start_reason import WorkflowStartReason from core.workflow.runtime import GraphRuntimeState, VariablePool from core.workflow.system_variable import SystemVariable @@ -60,3 +61,27 @@ def test_human_input_form_filled_stream_response_contains_rendered_content(): assert resp.data.node_title == "Human Input" assert resp.data.rendered_content.startswith("# Title") assert resp.data.action_id == "Approve" + + +def test_human_input_form_timeout_stream_response_contains_timeout_metadata(): + converter = _build_converter() + converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + workflow_id="wf-1", + reason=WorkflowStartReason.INITIAL, + ) + + queue_event = QueueHumanInputFormTimeoutEvent( + node_id="node-1", + node_type="human-input", + node_title="Human Input", + expiration_time=datetime(2025, 1, 1, tzinfo=UTC), + ) + + resp = converter.human_input_form_timeout_to_stream_response(event=queue_event, task_id="task-1") + + assert resp.workflow_run_id == "run-1" + assert resp.data.node_id == "node-1" + assert resp.data.node_title == "Human Input" + assert resp.data.expiration_time == 1735689600 diff --git a/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py b/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py index 56e53312a4..bfe7b03c13 100644 --- a/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py +++ b/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py @@ -80,9 +80,7 @@ class TestFormInput: """Test text input with constant default value.""" default = FormInputDefault(type=PlaceholderType.CONSTANT, value="Enter your response here...") - form_input = FormInput( - type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default - ) + form_input = FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default) assert form_input.type == FormInputType.TEXT_INPUT assert form_input.output_variable_name == "user_input" @@ -93,9 +91,7 @@ class TestFormInput: """Test text input with variable default value.""" default = FormInputDefault(type=PlaceholderType.VARIABLE, selector=["node_123", "output_var"]) - form_input = FormInput( - type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default - ) + form_input = FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default) assert form_input.default.type == PlaceholderType.VARIABLE assert form_input.default.selector == ["node_123", "output_var"] diff --git a/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py b/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py index 41a02810d7..c7070a39a4 100644 --- a/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py +++ b/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py @@ -7,6 +7,7 @@ from core.workflow.entities.graph_init_params import GraphInitParams from core.workflow.enums import NodeType from core.workflow.graph_events import ( NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunStartedEvent, ) from core.workflow.nodes.human_input.enums import HumanInputFormStatus @@ -86,6 +87,67 @@ def _build_node(form_content: str = "Please enter your name:\n\n{{#$output.name# ) +def _build_timeout_node() -> HumanInputNode: + system_variables = SystemVariable.empty() + system_variables.workflow_execution_id = str(uuid.uuid4()) + graph_runtime_state = GraphRuntimeState( + variable_pool=VariablePool(system_variables=system_variables, user_inputs={}, environment_variables=[]), + start_at=0.0, + ) + graph_init_params = GraphInitParams( + tenant_id="tenant", + app_id="app", + workflow_id="workflow", + graph_config={"nodes": [], "edges": []}, + user_id="user", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.SERVICE_API, + call_depth=0, + ) + + config = { + "id": "node-1", + "type": NodeType.HUMAN_INPUT.value, + "data": { + "title": "Human Input", + "form_content": "Please enter your name:\n\n{{#$output.name#}}", + "inputs": [ + { + "type": "text_input", + "output_variable_name": "name", + "default": {"type": "constant", "value": ""}, + } + ], + "user_actions": [ + { + "id": "Accept", + "title": "Approve", + "button_style": "default", + } + ], + }, + } + + fake_form = SimpleNamespace( + id="form-1", + rendered_content="content", + submitted=False, + selected_action_id=None, + submitted_data=None, + status=HumanInputFormStatus.TIMEOUT, + expiration_time=naive_utc_now() - datetime.timedelta(minutes=1), + ) + + repo = _FakeFormRepository(fake_form) + return HumanInputNode( + id="node-1", + config=config, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + form_repository=repo, + ) + + def test_human_input_node_emits_form_filled_event_before_succeeded(): node = _build_node() @@ -99,3 +161,15 @@ def test_human_input_node_emits_form_filled_event_before_succeeded(): assert filled_event.rendered_content.endswith("Alice") assert filled_event.action_id == "Accept" assert filled_event.action_text == "Approve" + + +def test_human_input_node_emits_timeout_event_before_succeeded(): + node = _build_timeout_node() + + events = list(node.run()) + + assert isinstance(events[0], NodeRunStartedEvent) + assert isinstance(events[1], NodeRunHumanInputFormTimeoutEvent) + + timeout_event = events[1] + assert timeout_event.node_title == "Human Input" diff --git a/api/tests/unit_tests/libs/_human_input/test_form_service.py b/api/tests/unit_tests/libs/_human_input/test_form_service.py index d5ff8db2d9..15e7d41e85 100644 --- a/api/tests/unit_tests/libs/_human_input/test_form_service.py +++ b/api/tests/unit_tests/libs/_human_input/test_form_service.py @@ -304,9 +304,7 @@ class TestFormValidation: "tenant_id": "tenant-abc", "app_id": "app-def", "form_content": "Test form", - "inputs": [ - FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="required_input", default=None) - ], + "inputs": [FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="required_input", default=None)], "user_actions": [UserAction(id="submit", title="Submit")], "timeout": 1, "timeout_unit": TimeoutUnit.HOUR,