diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 2cd47d97a8..2ac44f9b4a 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -25,6 +25,7 @@ from core.app.entities.queue_entities import ( QueueAnnotationReplyEvent, QueueErrorEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -692,6 +693,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): event=event, task_id=self._application_generate_entity.task_id ) + def _handle_human_input_form_timeout_event( + self, event: QueueHumanInputFormTimeoutEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle human input form timeout events.""" + yield self._workflow_response_converter.human_input_form_timeout_to_stream_response( + event=event, task_id=self._application_generate_entity.task_id + ) + def _persist_human_input_extra_content(self, *, node_id: str | None = None, form_id: str | None = None) -> None: if not self._workflow_run_id or not self._message_id: return @@ -774,6 +783,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event, QueueAgentLogEvent: self._handle_agent_log_event, QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event, + QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event, } def _dispatch_event( diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index d4712fde07..bde3b8e5ee 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -9,6 +9,7 @@ from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, from core.app.entities.queue_entities import ( QueueAgentLogEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -25,6 +26,7 @@ from core.app.entities.queue_entities import ( from core.app.entities.task_entities import ( AgentLogStreamResponse, HumanInputFormFilledResponse, + HumanInputFormTimeoutResponse, HumanInputRequiredResponse, IterationNodeCompletedStreamResponse, IterationNodeNextStreamResponse, @@ -338,6 +340,20 @@ class WorkflowResponseConverter: ), ) + def human_input_form_timeout_to_stream_response( + self, *, event: QueueHumanInputFormTimeoutEvent, task_id: str + ) -> HumanInputFormTimeoutResponse: + run_id = self._ensure_workflow_run_id() + return HumanInputFormTimeoutResponse( + task_id=task_id, + workflow_run_id=run_id, + data=HumanInputFormTimeoutResponse.Data( + node_id=event.node_id, + node_title=event.node_title, + expiration_time=int(event.expiration_time.timestamp()), + ), + ) + @classmethod def workflow_run_result_to_finish_response( cls, diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 6d8c061cbc..c3296001cc 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -18,6 +18,7 @@ from core.app.entities.queue_entities import ( QueueAgentLogEvent, QueueErrorEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -523,6 +524,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): event=event, task_id=self._application_generate_entity.task_id ) + def _handle_human_input_form_timeout_event( + self, event: QueueHumanInputFormTimeoutEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle human input form timeout events.""" + yield self._workflow_response_converter.human_input_form_timeout_to_stream_response( + event=event, task_id=self._application_generate_entity.task_id + ) + def _get_event_handlers(self) -> dict[type, Callable]: """Get mapping of event types to their handlers using fluent pattern.""" return { @@ -550,6 +559,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): # Agent events QueueAgentLogEvent: self._handle_agent_log_event, QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event, + QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event, } def _dispatch_event( diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index f7a4664c61..b09385aad3 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -8,6 +8,7 @@ from core.app.entities.queue_entities import ( AppQueueEvent, QueueAgentLogEvent, QueueHumanInputFormFilledEvent, + QueueHumanInputFormTimeoutEvent, QueueIterationCompletedEvent, QueueIterationNextEvent, QueueIterationStartEvent, @@ -42,6 +43,7 @@ from core.workflow.graph_events import ( NodeRunExceptionEvent, NodeRunFailedEvent, NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunIterationFailedEvent, NodeRunIterationNextEvent, NodeRunIterationStartedEvent, @@ -393,6 +395,15 @@ class WorkflowBasedAppRunner: action_text=event.action_text, ) ) + elif isinstance(event, NodeRunHumanInputFormTimeoutEvent): + self._publish_event( + QueueHumanInputFormTimeoutEvent( + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + expiration_time=event.expiration_time, + ) + ) elif isinstance(event, NodeRunRetryEvent): node_run_result = event.node_run_result inputs = node_run_result.inputs diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index b9a6f655ba..5b2fa29b56 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -50,6 +50,7 @@ class QueueEvent(StrEnum): RETRY = "retry" PAUSE = "pause" HUMAN_INPUT_FORM_FILLED = "human_input_form_filled" + HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout" class AppQueueEvent(BaseModel): @@ -506,6 +507,19 @@ class QueueHumanInputFormFilledEvent(AppQueueEvent): action_text: str +class QueueHumanInputFormTimeoutEvent(AppQueueEvent): + """ + QueueHumanInputFormTimeoutEvent entity + """ + + event: QueueEvent = QueueEvent.HUMAN_INPUT_FORM_TIMEOUT + + node_id: str + node_type: NodeType + node_title: str + expiration_time: datetime + + class QueueMessage(BaseModel): """ QueueMessage abstract entity diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 00ab6b4bbc..774a7151fd 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -87,6 +87,7 @@ class StreamEvent(StrEnum): AGENT_LOG = "agent_log" HUMAN_INPUT_REQUIRED = "human_input_required" HUMAN_INPUT_FORM_FILLED = "human_input_form_filled" + HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout" class StreamResponse(BaseModel): @@ -305,6 +306,21 @@ class HumanInputFormFilledResponse(StreamResponse): data: Data +class HumanInputFormTimeoutResponse(StreamResponse): + class Data(BaseModel): + """ + Data entity + """ + + node_id: str + node_title: str + expiration_time: int + + event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_TIMEOUT + workflow_run_id: str + data: Data + + class NodeStartStreamResponse(StreamResponse): """ NodeStartStreamResponse entity diff --git a/api/core/workflow/graph_events/__init__.py b/api/core/workflow/graph_events/__init__.py index 0ef494ebe6..7bb6346cb7 100644 --- a/api/core/workflow/graph_events/__init__.py +++ b/api/core/workflow/graph_events/__init__.py @@ -39,6 +39,7 @@ from .node import ( NodeRunExceptionEvent, NodeRunFailedEvent, NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunPauseRequestedEvent, NodeRunRetrieverResourceEvent, NodeRunRetryEvent, @@ -61,6 +62,7 @@ __all__ = [ "NodeRunExceptionEvent", "NodeRunFailedEvent", "NodeRunHumanInputFormFilledEvent", + "NodeRunHumanInputFormTimeoutEvent", "NodeRunIterationFailedEvent", "NodeRunIterationNextEvent", "NodeRunIterationStartedEvent", diff --git a/api/core/workflow/graph_events/node.py b/api/core/workflow/graph_events/node.py index db67731bec..140b4de1da 100644 --- a/api/core/workflow/graph_events/node.py +++ b/api/core/workflow/graph_events/node.py @@ -63,5 +63,12 @@ class NodeRunHumanInputFormFilledEvent(GraphNodeEventBase): action_text: str = Field(..., description="Display text of the chosen action button.") +class NodeRunHumanInputFormTimeoutEvent(GraphNodeEventBase): + """Emitted when a HumanInput form times out.""" + + node_title: str = Field(..., description="HumanInput node title") + expiration_time: datetime = Field(..., description="Form expiration time") + + class NodeRunPauseRequestedEvent(GraphNodeEventBase): reason: PauseReason = Field(..., description="pause reason") diff --git a/api/core/workflow/node_events/__init__.py b/api/core/workflow/node_events/__init__.py index 3cd277fab0..a9bef8f9a2 100644 --- a/api/core/workflow/node_events/__init__.py +++ b/api/core/workflow/node_events/__init__.py @@ -14,6 +14,7 @@ from .loop import ( ) from .node import ( HumanInputFormFilledEvent, + HumanInputFormTimeoutEvent, ModelInvokeCompletedEvent, PauseRequestedEvent, RunRetrieverResourceEvent, @@ -25,6 +26,7 @@ from .node import ( __all__ = [ "AgentLogEvent", "HumanInputFormFilledEvent", + "HumanInputFormTimeoutEvent", "IterationFailedEvent", "IterationNextEvent", "IterationStartedEvent", diff --git a/api/core/workflow/node_events/node.py b/api/core/workflow/node_events/node.py index 15c11e8033..9c76b7d7c2 100644 --- a/api/core/workflow/node_events/node.py +++ b/api/core/workflow/node_events/node.py @@ -56,3 +56,10 @@ class HumanInputFormFilledEvent(NodeEventBase): rendered_content: str action_id: str action_text: str + + +class HumanInputFormTimeoutEvent(NodeEventBase): + """Event emitted when a human input form times out.""" + + node_title: str + expiration_time: datetime diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index ea014a2f21..a08c6d3ed8 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -17,6 +17,7 @@ from core.workflow.graph_events import ( NodeRunAgentLogEvent, NodeRunFailedEvent, NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunIterationFailedEvent, NodeRunIterationNextEvent, NodeRunIterationStartedEvent, @@ -34,6 +35,7 @@ from core.workflow.graph_events import ( from core.workflow.node_events import ( AgentLogEvent, HumanInputFormFilledEvent, + HumanInputFormTimeoutEvent, IterationFailedEvent, IterationNextEvent, IterationStartedEvent, @@ -649,6 +651,16 @@ class Node(Generic[NodeDataT]): action_text=event.action_text, ) + @_dispatch.register + def _(self, event: HumanInputFormTimeoutEvent): + return NodeRunHumanInputFormTimeoutEvent( + id=self.execution_id, + node_id=self._node_id, + node_type=self.node_type, + node_title=event.node_title, + expiration_time=event.expiration_time, + ) + @_dispatch.register def _(self, event: LoopStartedEvent) -> NodeRunLoopStartedEvent: return NodeRunLoopStartedEvent( diff --git a/api/core/workflow/nodes/human_input/human_input_node.py b/api/core/workflow/nodes/human_input/human_input_node.py index 1087b63b5a..f5039e9059 100644 --- a/api/core/workflow/nodes/human_input/human_input_node.py +++ b/api/core/workflow/nodes/human_input/human_input_node.py @@ -7,7 +7,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.repositories.human_input_reposotiry import HumanInputFormRepositoryImpl from core.workflow.entities.pause_reason import HumanInputRequired from core.workflow.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus -from core.workflow.node_events import HumanInputFormFilledEvent, NodeRunResult, PauseRequestedEvent +from core.workflow.node_events import ( + HumanInputFormFilledEvent, + HumanInputFormTimeoutEvent, + NodeRunResult, + PauseRequestedEvent, +) from core.workflow.node_events.base import NodeEventBase from core.workflow.node_events.node import StreamCompletedEvent from core.workflow.nodes.base.node import Node @@ -245,6 +250,10 @@ class HumanInputNode(Node[HumanInputNodeData]): return if form.status == HumanInputFormStatus.TIMEOUT or form.expiration_time <= naive_utc_now(): + yield HumanInputFormTimeoutEvent( + node_title=self._node_data.title, + expiration_time=form.expiration_time, + ) yield StreamCompletedEvent( node_run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, diff --git a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py index 1a51caf0f2..1c36b4d12b 100644 --- a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py +++ b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_human_input.py @@ -1,8 +1,9 @@ +from datetime import UTC, datetime from types import SimpleNamespace from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter from core.app.entities.app_invoke_entities import InvokeFrom -from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent +from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent, QueueHumanInputFormTimeoutEvent from core.workflow.entities.workflow_start_reason import WorkflowStartReason from core.workflow.runtime import GraphRuntimeState, VariablePool from core.workflow.system_variable import SystemVariable @@ -60,3 +61,27 @@ def test_human_input_form_filled_stream_response_contains_rendered_content(): assert resp.data.node_title == "Human Input" assert resp.data.rendered_content.startswith("# Title") assert resp.data.action_id == "Approve" + + +def test_human_input_form_timeout_stream_response_contains_timeout_metadata(): + converter = _build_converter() + converter.workflow_start_to_stream_response( + task_id="task-1", + workflow_run_id="run-1", + workflow_id="wf-1", + reason=WorkflowStartReason.INITIAL, + ) + + queue_event = QueueHumanInputFormTimeoutEvent( + node_id="node-1", + node_type="human-input", + node_title="Human Input", + expiration_time=datetime(2025, 1, 1, tzinfo=UTC), + ) + + resp = converter.human_input_form_timeout_to_stream_response(event=queue_event, task_id="task-1") + + assert resp.workflow_run_id == "run-1" + assert resp.data.node_id == "node-1" + assert resp.data.node_title == "Human Input" + assert resp.data.expiration_time == 1735689600 diff --git a/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py b/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py index 56e53312a4..bfe7b03c13 100644 --- a/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py +++ b/api/tests/unit_tests/core/workflow/nodes/human_input/test_entities.py @@ -80,9 +80,7 @@ class TestFormInput: """Test text input with constant default value.""" default = FormInputDefault(type=PlaceholderType.CONSTANT, value="Enter your response here...") - form_input = FormInput( - type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default - ) + form_input = FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default) assert form_input.type == FormInputType.TEXT_INPUT assert form_input.output_variable_name == "user_input" @@ -93,9 +91,7 @@ class TestFormInput: """Test text input with variable default value.""" default = FormInputDefault(type=PlaceholderType.VARIABLE, selector=["node_123", "output_var"]) - form_input = FormInput( - type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default - ) + form_input = FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default) assert form_input.default.type == PlaceholderType.VARIABLE assert form_input.default.selector == ["node_123", "output_var"] diff --git a/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py b/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py index 41a02810d7..c7070a39a4 100644 --- a/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py +++ b/api/tests/unit_tests/core/workflow/nodes/human_input/test_human_input_form_filled_event.py @@ -7,6 +7,7 @@ from core.workflow.entities.graph_init_params import GraphInitParams from core.workflow.enums import NodeType from core.workflow.graph_events import ( NodeRunHumanInputFormFilledEvent, + NodeRunHumanInputFormTimeoutEvent, NodeRunStartedEvent, ) from core.workflow.nodes.human_input.enums import HumanInputFormStatus @@ -86,6 +87,67 @@ def _build_node(form_content: str = "Please enter your name:\n\n{{#$output.name# ) +def _build_timeout_node() -> HumanInputNode: + system_variables = SystemVariable.empty() + system_variables.workflow_execution_id = str(uuid.uuid4()) + graph_runtime_state = GraphRuntimeState( + variable_pool=VariablePool(system_variables=system_variables, user_inputs={}, environment_variables=[]), + start_at=0.0, + ) + graph_init_params = GraphInitParams( + tenant_id="tenant", + app_id="app", + workflow_id="workflow", + graph_config={"nodes": [], "edges": []}, + user_id="user", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.SERVICE_API, + call_depth=0, + ) + + config = { + "id": "node-1", + "type": NodeType.HUMAN_INPUT.value, + "data": { + "title": "Human Input", + "form_content": "Please enter your name:\n\n{{#$output.name#}}", + "inputs": [ + { + "type": "text_input", + "output_variable_name": "name", + "default": {"type": "constant", "value": ""}, + } + ], + "user_actions": [ + { + "id": "Accept", + "title": "Approve", + "button_style": "default", + } + ], + }, + } + + fake_form = SimpleNamespace( + id="form-1", + rendered_content="content", + submitted=False, + selected_action_id=None, + submitted_data=None, + status=HumanInputFormStatus.TIMEOUT, + expiration_time=naive_utc_now() - datetime.timedelta(minutes=1), + ) + + repo = _FakeFormRepository(fake_form) + return HumanInputNode( + id="node-1", + config=config, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + form_repository=repo, + ) + + def test_human_input_node_emits_form_filled_event_before_succeeded(): node = _build_node() @@ -99,3 +161,15 @@ def test_human_input_node_emits_form_filled_event_before_succeeded(): assert filled_event.rendered_content.endswith("Alice") assert filled_event.action_id == "Accept" assert filled_event.action_text == "Approve" + + +def test_human_input_node_emits_timeout_event_before_succeeded(): + node = _build_timeout_node() + + events = list(node.run()) + + assert isinstance(events[0], NodeRunStartedEvent) + assert isinstance(events[1], NodeRunHumanInputFormTimeoutEvent) + + timeout_event = events[1] + assert timeout_event.node_title == "Human Input" diff --git a/api/tests/unit_tests/libs/_human_input/test_form_service.py b/api/tests/unit_tests/libs/_human_input/test_form_service.py index d5ff8db2d9..15e7d41e85 100644 --- a/api/tests/unit_tests/libs/_human_input/test_form_service.py +++ b/api/tests/unit_tests/libs/_human_input/test_form_service.py @@ -304,9 +304,7 @@ class TestFormValidation: "tenant_id": "tenant-abc", "app_id": "app-def", "form_content": "Test form", - "inputs": [ - FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="required_input", default=None) - ], + "inputs": [FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="required_input", default=None)], "user_actions": [UserAction(id="submit", title="Submit")], "timeout": 1, "timeout_unit": TimeoutUnit.HOUR,