WIP: feat(api): Implement HumanInputFormFilled event

This commit is contained in:
QuantumGhost
2026-01-04 10:25:00 +08:00
parent a2e250ce0c
commit 3ab1ad6530
15 changed files with 291 additions and 2 deletions

View File

@ -24,6 +24,7 @@ from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueAnnotationReplyEvent,
QueueErrorEvent,
QueueHumanInputFormFilledEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -656,6 +657,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""Handle message replace events."""
yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text, reason=event.reason)
def _handle_human_input_form_filled_event(
self, event: QueueHumanInputFormFilledEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form filled events."""
yield self._workflow_response_converter.human_input_form_filled_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
"""Handle agent log events."""
yield self._workflow_response_converter.handle_agent_log(
@ -695,6 +704,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueMessageReplaceEvent: self._handle_message_replace_event,
QueueAdvancedChatMessageEndEvent: self._handle_advanced_chat_message_end_event,
QueueAgentLogEvent: self._handle_agent_log_event,
QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event,
}
def _dispatch_event(

View File

@ -8,6 +8,7 @@ from typing import Any, NewType, Union
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -23,6 +24,7 @@ from core.app.entities.queue_entities import (
)
from core.app.entities.task_entities import (
AgentLogStreamResponse,
HumanInputFormFilledResponse,
HumanInputRequiredResponse,
IterationNodeCompletedStreamResponse,
IterationNodeNextStreamResponse,
@ -318,6 +320,21 @@ class WorkflowResponseConverter:
return responses
def human_input_form_filled_to_stream_response(
self, *, event: QueueHumanInputFormFilledEvent, task_id: str
) -> HumanInputFormFilledResponse:
run_id = self._ensure_workflow_run_id()
return HumanInputFormFilledResponse(
task_id=task_id,
workflow_run_id=run_id,
data=HumanInputFormFilledResponse.Data(
node_id=event.node_id,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
),
)
@classmethod
def workflow_run_result_to_finish_response(
cls,

View File

@ -17,6 +17,7 @@ from core.app.entities.queue_entities import (
MessageQueueMessage,
QueueAgentLogEvent,
QueueErrorEvent,
QueueHumanInputFormFilledEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -514,6 +515,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
task_id=self._application_generate_entity.task_id, event=event
)
def _handle_human_input_form_filled_event(
self, event: QueueHumanInputFormFilledEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle human input form filled events."""
yield self._workflow_response_converter.human_input_form_filled_to_stream_response(
event=event, task_id=self._application_generate_entity.task_id
)
def _get_event_handlers(self) -> dict[type, Callable]:
"""Get mapping of event types to their handlers using fluent pattern."""
return {
@ -540,6 +549,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueueLoopCompletedEvent: self._handle_loop_completed_event,
# Agent events
QueueAgentLogEvent: self._handle_agent_log_event,
QueueHumanInputFormFilledEvent: self._handle_human_input_form_filled_event,
}
def _dispatch_event(

View File

@ -8,6 +8,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import (
AppQueueEvent,
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
@ -41,6 +42,7 @@ from core.workflow.graph_events import (
NodeRunAgentLogEvent,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunIterationFailedEvent,
NodeRunIterationNextEvent,
NodeRunIterationStartedEvent,
@ -380,6 +382,17 @@ class WorkflowBasedAppRunner:
paused_nodes=paused_nodes,
)
)
elif isinstance(event, NodeRunHumanInputFormFilledEvent):
self._publish_event(
QueueHumanInputFormFilledEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
)
)
elif isinstance(event, NodeRunRetryEvent):
node_run_result = event.node_run_result
inputs = node_run_result.inputs

View File

@ -48,6 +48,7 @@ class QueueEvent(StrEnum):
STOP = "stop"
RETRY = "retry"
PAUSE = "pause"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
class AppQueueEvent(BaseModel):
@ -491,6 +492,21 @@ class QueueStopEvent(AppQueueEvent):
return reason_mapping.get(self.stopped_by, "Stopped by unknown reason.")
class QueueHumanInputFormFilledEvent(AppQueueEvent):
"""
QueueHumanInputFormFilledEvent entity
"""
event: QueueEvent = QueueEvent.HUMAN_INPUT_FORM_FILLED
node_execution_id: str
node_id: str
node_type: NodeType
rendered_content: str
action_id: str
action_text: str
class QueueMessage(BaseModel):
"""
QueueMessage abstract entity

View File

@ -85,6 +85,7 @@ class StreamEvent(StrEnum):
TEXT_REPLACE = "text_replace"
AGENT_LOG = "agent_log"
HUMAN_INPUT_REQUIRED = "human_input_required"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
class StreamResponse(BaseModel):
@ -284,6 +285,22 @@ class HumanInputRequiredResponse(StreamResponse):
data: Data
class HumanInputFormFilledResponse(StreamResponse):
class Data(BaseModel):
"""
Data entity
"""
node_id: str
rendered_content: str
action_id: str
action_text: str
event: StreamEvent = StreamEvent.HUMAN_INPUT_FORM_FILLED
workflow_run_id: str
data: Data
class NodeStartStreamResponse(StreamResponse):
"""
NodeStartStreamResponse entity

View File

@ -38,6 +38,7 @@ from .loop import (
from .node import (
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunPauseRequestedEvent,
NodeRunRetrieverResourceEvent,
NodeRunRetryEvent,
@ -59,6 +60,7 @@ __all__ = [
"NodeRunAgentLogEvent",
"NodeRunExceptionEvent",
"NodeRunFailedEvent",
"NodeRunHumanInputFormFilledEvent",
"NodeRunIterationFailedEvent",
"NodeRunIterationNextEvent",
"NodeRunIterationStartedEvent",

View File

@ -55,5 +55,13 @@ class NodeRunRetryEvent(NodeRunStartedEvent):
retry_index: int = Field(..., description="which retry attempt is about to be performed")
class NodeRunHumanInputFormFilledEvent(GraphNodeEventBase):
"""Emitted when a HumanInput form is submitted and before the node finishes."""
rendered_content: str = Field(..., description="Markdown content rendered with user inputs.")
action_id: str = Field(..., description="User action identifier chosen in the form.")
action_text: str = Field(..., description="Display text of the chosen action button.")
class NodeRunPauseRequestedEvent(GraphNodeEventBase):
reason: PauseReason = Field(..., description="pause reason")

View File

@ -13,6 +13,7 @@ from .loop import (
LoopSucceededEvent,
)
from .node import (
HumanInputFormFilledEvent,
ModelInvokeCompletedEvent,
PauseRequestedEvent,
RunRetrieverResourceEvent,
@ -23,6 +24,7 @@ from .node import (
__all__ = [
"AgentLogEvent",
"HumanInputFormFilledEvent",
"IterationFailedEvent",
"IterationNextEvent",
"IterationStartedEvent",

View File

@ -47,3 +47,11 @@ class StreamCompletedEvent(NodeEventBase):
class PauseRequestedEvent(NodeEventBase):
reason: PauseReason = Field(..., description="pause reason")
class HumanInputFormFilledEvent(NodeEventBase):
"""Event emitted when a human input form is submitted."""
rendered_content: str
action_id: str
action_text: str

View File

@ -16,6 +16,7 @@ from core.workflow.graph_events import (
GraphNodeEventBase,
NodeRunAgentLogEvent,
NodeRunFailedEvent,
NodeRunHumanInputFormFilledEvent,
NodeRunIterationFailedEvent,
NodeRunIterationNextEvent,
NodeRunIterationStartedEvent,
@ -32,6 +33,7 @@ from core.workflow.graph_events import (
)
from core.workflow.node_events import (
AgentLogEvent,
HumanInputFormFilledEvent,
IterationFailedEvent,
IterationNextEvent,
IterationStartedEvent,
@ -612,6 +614,17 @@ class Node(Generic[NodeDataT]):
metadata=event.metadata,
)
@_dispatch.register
def _(self, event: HumanInputFormFilledEvent):
return NodeRunHumanInputFormFilledEvent(
id=self.execution_id,
node_id=self._node_id,
node_type=self.node_type,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
)
@_dispatch.register
def _(self, event: LoopStartedEvent) -> NodeRunLoopStartedEvent:
return NodeRunLoopStartedEvent(

View File

@ -232,6 +232,15 @@ class HumanInputNodeData(BaseNodeData):
return variable_mappings
def find_action_text(self, action_id: str) -> str:
"""
Resolve action display text by id.
"""
for action in self.user_actions:
if action.id == action_id:
return action.title
return action_id
class FormDefinition(BaseModel):
form_content: str

View File

@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any
from core.repositories.human_input_reposotiry import HumanInputFormRepositoryImpl
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult, PauseRequestedEvent
from core.workflow.node_events import HumanInputFormFilledEvent, NodeRunResult, PauseRequestedEvent
from core.workflow.node_events.base import NodeEventBase
from core.workflow.nodes.base.node import Node
from core.workflow.repositories.human_input_form_repository import (
@ -217,11 +217,21 @@ class HumanInputNode(Node[HumanInputNodeData]):
submitted_data = form.submitted_data or {}
outputs: dict[str, Any] = dict(submitted_data)
outputs["__action_id"] = selected_action_id
outputs["__rendered_content"] = self._render_form_content_with_outputs(
rendered_content = self._render_form_content_with_outputs(
form.rendered_content,
outputs,
self._node_data.outputs_field_names(),
)
outputs["__rendered_content"] = rendered_content
action_text = self._node_data.find_action_text(selected_action_id)
yield HumanInputFormFilledEvent(
rendered_content=rendered_content,
action_id=selected_action_id,
action_text=action_text,
)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs=outputs,

View File

@ -0,0 +1,59 @@
from types import SimpleNamespace
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
def _build_converter():
system_variables = SystemVariable(
files=[],
user_id="user-1",
app_id="app-1",
workflow_id="wf-1",
workflow_execution_id="run-1",
)
runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0)
app_entity = SimpleNamespace(
task_id="task-1",
app_config=SimpleNamespace(app_id="app-1", tenant_id="tenant-1"),
invoke_from=InvokeFrom.EXPLORE,
files=[],
inputs={},
workflow_execution_id="run-1",
call_depth=0,
)
account = SimpleNamespace(id="acc-1", name="tester", email="tester@example.com")
return WorkflowResponseConverter(
application_generate_entity=app_entity,
user=account,
system_variables=system_variables,
)
def test_human_input_form_filled_stream_response_contains_rendered_content():
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
)
queue_event = QueueHumanInputFormFilledEvent(
node_execution_id="exec-1",
node_id="node-1",
node_type="human-input",
rendered_content="# Title\nvalue",
action_id="Approve",
action_text="Approve",
)
resp = converter.human_input_form_filled_to_stream_response(event=queue_event, task_id="task-1")
assert resp.workflow_run_id == "run-1"
assert resp.data.node_id == "node-1"
assert resp.data.rendered_content.startswith("# Title")
assert resp.data.action_id == "Approve"

View File

@ -0,0 +1,95 @@
import uuid
from types import SimpleNamespace
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.enums import NodeType
from core.workflow.graph_events import (
NodeRunHumanInputFormFilledEvent,
NodeRunStartedEvent,
)
from core.workflow.nodes.human_input.human_input_node import HumanInputNode
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom
class _FakeFormRepository:
def __init__(self, form):
self._form = form
def get_form(self, *_args, **_kwargs):
return self._form
def _build_node(form_content: str = "Please enter your name:\n\n{{#$outputs.name#}}") -> HumanInputNode:
system_variables = SystemVariable.empty()
system_variables.workflow_execution_id = str(uuid.uuid4())
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(system_variables=system_variables, user_inputs={}, environment_variables=[]),
start_at=0.0,
)
graph_init_params = GraphInitParams(
tenant_id="tenant",
app_id="app",
workflow_id="workflow",
graph_config={"nodes": [], "edges": []},
user_id="user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
)
config = {
"id": "node-1",
"type": NodeType.HUMAN_INPUT.value,
"data": {
"title": "Human Input",
"form_content": form_content,
"inputs": [
{
"type": "text_input",
"output_variable_name": "name",
"placeholder": {"type": "constant", "value": ""},
}
],
"user_actions": [
{
"id": "Accept",
"title": "Approve",
"button_style": "default",
}
],
},
}
fake_form = SimpleNamespace(
id="form-1",
rendered_content=form_content,
submitted=True,
selected_action_id="Accept",
submitted_data={"name": "Alice"},
)
repo = _FakeFormRepository(fake_form)
return HumanInputNode(
id="node-1",
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
form_repository=repo,
)
def test_human_input_node_emits_form_filled_event_before_succeeded():
node = _build_node()
events = list(node.run())
assert isinstance(events[0], NodeRunStartedEvent)
assert isinstance(events[1], NodeRunHumanInputFormFilledEvent)
filled_event = events[1]
assert filled_event.rendered_content.endswith("Alice")
assert filled_event.action_id == "Accept"
assert filled_event.action_text == "Approve"