Read the current implementation and consider the following problem (vibe-kanban 25d8b973)

Currently, the HumanInput node yields a `HumanInputFormFilledEvent` event while form is submmited. However, for form level timeout, current no event about timeout is emitted. This makes the frontend UI not updated while the events of time out are sent to the frontend.

Analysis this problem, propose a way to resolve this issue.
This commit is contained in:
QuantumGhost
2026-01-27 08:46:41 +08:00
parent 9bbe63c1d8
commit 116ec9dd04
16 changed files with 220 additions and 11 deletions

View File

@ -25,6 +25,7 @@ from core.app.entities.queue_entities import (
QueueAnnotationReplyEvent,
QueueErrorEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -692,6 +693,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
event=event, task_id=self._application_generate_entity.task_id
)
def _handle_human_input_form_timeout_event(
self, event: QueueHumanInputFormTimeoutEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form timeout events."""
yield self._workflow_response_converter.human_input_form_timeout_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _persist_human_input_extra_content(self, *, node_id: str | None = None, form_id: str | None = None) -> None:
if not self._workflow_run_id or not self._message_id:
return
@ -774,6 +783,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event,
QueueAgentLogEvent: self._handle_agent_log_event,
QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event,
QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event,
}
def _dispatch_event(

View File

@ -9,6 +9,7 @@ from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity,
from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -25,6 +26,7 @@ from core.app.entities.queue_entities import (
from core.app.entities.task_entities import (
AgentLogStreamResponse,
HumanInputFormFilledResponse,
HumanInputFormTimeoutResponse,
HumanInputRequiredResponse,
IterationNodeCompletedStreamResponse,
IterationNodeNextStreamResponse,
@ -338,6 +340,20 @@ class WorkflowResponseConverter:
),
)
def human_input_form_timeout_to_stream_response(
self, *, event: QueueHumanInputFormTimeoutEvent, task_id: str
) -> HumanInputFormTimeoutResponse:
run_id = self._ensure_workflow_run_id()
return HumanInputFormTimeoutResponse(
task_id=task_id,
workflow_run_id=run_id,
data=HumanInputFormTimeoutResponse.Data(
node_id=event.node_id,
node_title=event.node_title,
expiration_time=int(event.expiration_time.timestamp()),
),
)
@classmethod
def workflow_run_result_to_finish_response(
cls,

View File

@ -18,6 +18,7 @@ from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueErrorEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -523,6 +524,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
event=event, task_id=self._application_generate_entity.task_id
)
def _handle_human_input_form_timeout_event(
self, event: QueueHumanInputFormTimeoutEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form timeout events."""
yield self._workflow_response_converter.human_input_form_timeout_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _get_event_handlers(self) -> dict[type, Callable]:
"""Get mapping of event types to their handlers using fluent pattern."""
return {
@ -550,6 +559,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
# Agent events
QueueAgentLogEvent: self._handle_agent_log_event,
QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event,
QueueHumanInputFormTimeoutEvent: self._handle_human_input_form_timeout_event,
}
def _dispatch_event(

View File

@ -8,6 +8,7 @@ from core.app.entities.queue_entities import (
AppQueueEvent,
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueHumanInputFormTimeoutEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -42,6 +43,7 @@ from core.workflow.graph_events import (
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunIterationFailedEvent,
NodeRunIterationNextEvent,
NodeRunIterationStartedEvent,
@ -393,6 +395,15 @@ class WorkflowBasedAppRunner:
action_text=event.action_text,
)
)
elif isinstance(event, NodeRunHumanInputFormTimeoutEvent):
self._publish_event(
QueueHumanInputFormTimeoutEvent(
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
expiration_time=event.expiration_time,
)
)
elif isinstance(event, NodeRunRetryEvent):
node_run_result = event.node_run_result
inputs = node_run_result.inputs

View File

@ -50,6 +50,7 @@ class QueueEvent(StrEnum):
RETRY = "retry"
PAUSE = "pause"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout"
class AppQueueEvent(BaseModel):
@ -506,6 +507,19 @@ class QueueHumanInputFormFilledEvent(AppQueueEvent):
action_text: str
class QueueHumanInputFormTimeoutEvent(AppQueueEvent):
"""
QueueHumanInputFormTimeoutEvent entity
"""
event: QueueEvent = QueueEvent.HUMAN_INPUT_FORM_TIMEOUT
node_id: str
node_type: NodeType
node_title: str
expiration_time: datetime
class QueueMessage(BaseModel):
"""
QueueMessage abstract entity

View File

@ -87,6 +87,7 @@ class StreamEvent(StrEnum):
AGENT_LOG = "agent_log"
HUMAN_INPUT_REQUIRED = "human_input_required"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
HUMAN_INPUT_FORM_TIMEOUT = "human_input_form_timeout"
class StreamResponse(BaseModel):
@ -305,6 +306,21 @@ class HumanInputFormFilledResponse(StreamResponse):
data: Data
class HumanInputFormTimeoutResponse(StreamResponse):
class Data(BaseModel):
"""
Data entity
"""
node_id: str
node_title: str
expiration_time: int
event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_TIMEOUT
workflow_run_id: str
data: Data
class NodeStartStreamResponse(StreamResponse):
"""
NodeStartStreamResponse entity

View File

@ -39,6 +39,7 @@ from .node import (
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunPauseRequestedEvent,
NodeRunRetrieverResourceEvent,
NodeRunRetryEvent,
@ -61,6 +62,7 @@ __all__ = [
"NodeRunExceptionEvent",
"NodeRunFailedEvent",
"NodeRunHumanInputFormFilledEvent",
"NodeRunHumanInputFormTimeoutEvent",
"NodeRunIterationFailedEvent",
"NodeRunIterationNextEvent",
"NodeRunIterationStartedEvent",

View File

@ -63,5 +63,12 @@ class NodeRunHumanInputFormFilledEvent(GraphNodeEventBase):
action_text: str = Field(..., description="Display text of the chosen action button.")
class NodeRunHumanInputFormTimeoutEvent(GraphNodeEventBase):
"""Emitted when a HumanInput form times out."""
node_title: str = Field(..., description="HumanInput node title")
expiration_time: datetime = Field(..., description="Form expiration time")
class NodeRunPauseRequestedEvent(GraphNodeEventBase):
reason: PauseReason = Field(..., description="pause reason")

View File

@ -14,6 +14,7 @@ from .loop import (
)
from .node import (
HumanInputFormFilledEvent,
HumanInputFormTimeoutEvent,
ModelInvokeCompletedEvent,
PauseRequestedEvent,
RunRetrieverResourceEvent,
@ -25,6 +26,7 @@ from .node import (
__all__ = [
"AgentLogEvent",
"HumanInputFormFilledEvent",
"HumanInputFormTimeoutEvent",
"IterationFailedEvent",
"IterationNextEvent",
"IterationStartedEvent",

View File

@ -56,3 +56,10 @@ class HumanInputFormFilledEvent(NodeEventBase):
rendered_content: str
action_id: str
action_text: str
class HumanInputFormTimeoutEvent(NodeEventBase):
"""Event emitted when a human input form times out."""
node_title: str
expiration_time: datetime

View File

@ -17,6 +17,7 @@ from core.workflow.graph_events import (
NodeRunAgentLogEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunIterationFailedEvent,
NodeRunIterationNextEvent,
NodeRunIterationStartedEvent,
@ -34,6 +35,7 @@ from core.workflow.graph_events import (
from core.workflow.node_events import (
AgentLogEvent,
HumanInputFormFilledEvent,
HumanInputFormTimeoutEvent,
IterationFailedEvent,
IterationNextEvent,
IterationStartedEvent,
@ -649,6 +651,16 @@ class Node(Generic[NodeDataT]):
action_text=event.action_text,
)
@_dispatch.register
def _(self, event: HumanInputFormTimeoutEvent):
return NodeRunHumanInputFormTimeoutEvent(
id=self.execution_id,
node_id=self._node_id,
node_type=self.node_type,
node_title=event.node_title,
expiration_time=event.expiration_time,
)
@_dispatch.register
def _(self, event: LoopStartedEvent) -> NodeRunLoopStartedEvent:
return NodeRunLoopStartedEvent(

View File

@ -7,7 +7,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.repositories.human_input_reposotiry import HumanInputFormRepositoryImpl
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import HumanInputFormFilledEvent, NodeRunResult, PauseRequestedEvent
from core.workflow.node_events import (
HumanInputFormFilledEvent,
HumanInputFormTimeoutEvent,
NodeRunResult,
PauseRequestedEvent,
)
from core.workflow.node_events.base import NodeEventBase
from core.workflow.node_events.node import StreamCompletedEvent
from core.workflow.nodes.base.node import Node
@ -245,6 +250,10 @@ class HumanInputNode(Node[HumanInputNodeData]):
return
if form.status == HumanInputFormStatus.TIMEOUT or form.expiration_time <= naive_utc_now():
yield HumanInputFormTimeoutEvent(
node_title=self._node_data.title,
expiration_time=form.expiration_time,
)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,

View File

@ -1,8 +1,9 @@
from datetime import UTC, datetime
from types import SimpleNamespace
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent
from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent, QueueHumanInputFormTimeoutEvent
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
@ -60,3 +61,27 @@ def test_human_input_form_filled_stream_response_contains_rendered_content():
assert resp.data.node_title == "Human Input"
assert resp.data.rendered_content.startswith("# Title")
assert resp.data.action_id == "Approve"
def test_human_input_form_timeout_stream_response_contains_timeout_metadata():
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
reason=WorkflowStartReason.INITIAL,
)
queue_event = QueueHumanInputFormTimeoutEvent(
node_id="node-1",
node_type="human-input",
node_title="Human Input",
expiration_time=datetime(2025, 1, 1, tzinfo=UTC),
)
resp = converter.human_input_form_timeout_to_stream_response(event=queue_event, task_id="task-1")
assert resp.workflow_run_id == "run-1"
assert resp.data.node_id == "node-1"
assert resp.data.node_title == "Human Input"
assert resp.data.expiration_time == 1735689600

View File

@ -80,9 +80,7 @@ class TestFormInput:
"""Test text input with constant default value."""
default = FormInputDefault(type=PlaceholderType.CONSTANT, value="Enter your response here...")
form_input = FormInput(
type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default
)
form_input = FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default)
assert form_input.type == FormInputType.TEXT_INPUT
assert form_input.output_variable_name == "user_input"
@ -93,9 +91,7 @@ class TestFormInput:
"""Test text input with variable default value."""
default = FormInputDefault(type=PlaceholderType.VARIABLE, selector=["node_123", "output_var"])
form_input = FormInput(
type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default
)
form_input = FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="user_input", default=default)
assert form_input.default.type == PlaceholderType.VARIABLE
assert form_input.default.selector == ["node_123", "output_var"]

View File

@ -7,6 +7,7 @@ from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.enums import NodeType
from core.workflow.graph_events import (
NodeRunHumanInputFormFilledEvent,
NodeRunHumanInputFormTimeoutEvent,
NodeRunStartedEvent,
)
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
@ -86,6 +87,67 @@ def _build_node(form_content: str = "Please enter your name:\n\n{{#$output.name#
)
def _build_timeout_node() -> HumanInputNode:
system_variables = SystemVariable.empty()
system_variables.workflow_execution_id = str(uuid.uuid4())
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(system_variables=system_variables, user_inputs={}, environment_variables=[]),
start_at=0.0,
)
graph_init_params = GraphInitParams(
tenant_id="tenant",
app_id="app",
workflow_id="workflow",
graph_config={"nodes": [], "edges": []},
user_id="user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
)
config = {
"id": "node-1",
"type": NodeType.HUMAN_INPUT.value,
"data": {
"title": "Human Input",
"form_content": "Please enter your name:\n\n{{#$output.name#}}",
"inputs": [
{
"type": "text_input",
"output_variable_name": "name",
"default": {"type": "constant", "value": ""},
}
],
"user_actions": [
{
"id": "Accept",
"title": "Approve",
"button_style": "default",
}
],
},
}
fake_form = SimpleNamespace(
id="form-1",
rendered_content="content",
submitted=False,
selected_action_id=None,
submitted_data=None,
status=HumanInputFormStatus.TIMEOUT,
expiration_time=naive_utc_now() - datetime.timedelta(minutes=1),
)
repo = _FakeFormRepository(fake_form)
return HumanInputNode(
id="node-1",
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
form_repository=repo,
)
def test_human_input_node_emits_form_filled_event_before_succeeded():
node = _build_node()
@ -99,3 +161,15 @@ def test_human_input_node_emits_form_filled_event_before_succeeded():
assert filled_event.rendered_content.endswith("Alice")
assert filled_event.action_id == "Accept"
assert filled_event.action_text == "Approve"
def test_human_input_node_emits_timeout_event_before_succeeded():
node = _build_timeout_node()
events = list(node.run())
assert isinstance(events[0], NodeRunStartedEvent)
assert isinstance(events[1], NodeRunHumanInputFormTimeoutEvent)
timeout_event = events[1]
assert timeout_event.node_title == "Human Input"

View File

@ -304,9 +304,7 @@ class TestFormValidation:
"tenant_id": "tenant-abc",
"app_id": "app-def",
"form_content": "Test form",
"inputs": [
FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="required_input", default=None)
],
"inputs": [FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="required_input", default=None)],
"user_actions": [UserAction(id="submit", title="Submit")],
"timeout": 1,
"timeout_unit": TimeoutUnit.HOUR,