This commit is contained in:
QuantumGhost
2025-11-12 05:28:56 +08:00
parent 8b914d9116
commit 4f48b8a57d
23 changed files with 1108 additions and 1828 deletions

View File

@ -1,197 +0,0 @@
"""
Domain entities for human input forms.
Models are independent of the storage mechanism and don't contain
implementation details like tenant_id, app_id, etc.
"""
from datetime import datetime
from enum import StrEnum
from typing import Any, Optional
from pydantic import BaseModel, Field
from libs.datetime_utils import naive_utc_now
def naive_utc_from_now() -> datetime:
"""Get current UTC datetime."""
return naive_utc_now()
class HumanInputFormStatus(StrEnum):
"""Status of a human input form."""
WAITING = "waiting"
EXPIRED = "expired"
SUBMITTED = "submitted"
TIMEOUT = "timeout"
class HumanInputSubmissionType(StrEnum):
"""Type of submission for human input forms."""
web_form = "web_form"
web_app = "web_app"
email = "email"
class FormSubmission(BaseModel):
"""Represents a form submission."""
data: dict[str, Any] = Field(default_factory=dict)
action: str = ""
submitted_at: datetime = Field(default_factory=naive_utc_now)
submission_type: HumanInputSubmissionType = HumanInputSubmissionType.web_form
submission_user_id: Optional[str] = None
submission_end_user_id: Optional[str] = None
submitter_email: Optional[str] = None
class HumanInputForm(BaseModel):
"""
Domain model for human input forms.
This model represents the business concept of a human input form without
infrastructure concerns like tenant_id, app_id, etc.
"""
id_: str = Field(...)
workflow_run_id: str = Field(...)
form_definition: dict[str, Any] = Field(default_factory=dict)
rendered_content: str = ""
status: HumanInputFormStatus = HumanInputFormStatus.WAITING
web_app_token: Optional[str] = None
submission: Optional[FormSubmission] = None
created_at: datetime = Field(default_factory=naive_utc_from_now)
@property
def is_submitted(self) -> bool:
"""Check if the form has been submitted."""
return self.status == HumanInputFormStatus.SUBMITTED
@property
def is_expired(self) -> bool:
"""Check if the form has expired."""
return self.status == HumanInputFormStatus.EXPIRED
@property
def is_waiting(self) -> bool:
"""Check if the form is waiting for submission."""
return self.status == HumanInputFormStatus.WAITING
@property
def can_be_submitted(self) -> bool:
"""Check if the form can still be submitted."""
return self.status == HumanInputFormStatus.WAITING
def submit(
self,
data: dict[str, Any],
action: str,
submission_type: HumanInputSubmissionType = HumanInputSubmissionType.web_form,
submission_user_id: Optional[str] = None,
submission_end_user_id: Optional[str] = None,
submitter_email: Optional[str] = None,
) -> None:
"""
Submit the form with the given data and action.
Args:
data: The form data submitted by the user
action: The action taken by the user
submission_type: Type of submission
submission_user_id: ID of the user who submitted (console submissions)
submission_end_user_id: ID of the end user who submitted (webapp submissions)
submitter_email: Email of the submitter (if applicable)
Raises:
ValueError: If the form cannot be submitted
"""
if not self.can_be_submitted:
raise ValueError(f"Form cannot be submitted in status: {self.status}")
# Validate that the action is valid based on form definition
valid_actions = {act.get("id") for act in self.form_definition.get("user_actions", [])}
if action not in valid_actions:
raise ValueError(f"Invalid action: {action}")
self.submission = FormSubmission(
data=data,
action=action,
submission_type=submission_type,
submission_user_id=submission_user_id,
submission_end_user_id=submission_end_user_id,
submitter_email=submitter_email,
)
self.status = HumanInputFormStatus.SUBMITTED
def expire(self) -> None:
"""Mark the form as expired."""
if self.status != HumanInputFormStatus.WAITING:
raise ValueError(f"Form cannot be expired in status: {self.status}")
self.status = HumanInputFormStatus.EXPIRED
def get_form_definition_for_display(self, include_site_info: bool = False) -> dict[str, Any]:
"""
Get form definition for display purposes.
Args:
include_site_info: Whether to include site information in the response
Returns:
Form definition dictionary for display
"""
if self.status == HumanInputFormStatus.EXPIRED:
raise ValueError("Form has expired")
if self.status == HumanInputFormStatus.SUBMITTED:
raise ValueError("Form has already been submitted")
response = {
"form_content": self.rendered_content,
"inputs": self.form_definition.get("inputs", []),
"user_actions": self.form_definition.get("user_actions", []),
}
if include_site_info:
# Note: In domain model, we don't have app_id
# This would be added at the application layer
response["site"] = {
"title": "Workflow Form",
}
return response
@classmethod
def create(
cls,
*,
id_: str,
workflow_run_id: str,
form_definition: dict[str, Any],
rendered_content: str,
web_app_token: Optional[str] = None,
) -> "HumanInputForm":
"""
Create a new human input form.
Args:
id_: Unique identifier for the form
workflow_run_id: ID of the associated workflow run
form_definition: Form definition as a dictionary
rendered_content: Rendered HTML content of the form
web_app_token: Optional token for web app access
Returns:
New HumanInputForm instance
"""
return cls(
id_=id_,
workflow_run_id=workflow_run_id,
form_definition=form_definition,
rendered_content=rendered_content,
status=HumanInputFormStatus.WAITING,
web_app_token=web_app_token,
)

View File

@ -3,6 +3,8 @@ from typing import Annotated, Literal, TypeAlias
from pydantic import BaseModel, Field
from core.workflow.nodes.human_input.entities import FormInput
class PauseReasonType(StrEnum):
HUMAN_INPUT_REQUIRED = auto()
@ -11,10 +13,10 @@ class PauseReasonType(StrEnum):
class HumanInputRequired(BaseModel):
TYPE: Literal[PauseReasonType.HUMAN_INPUT_REQUIRED] = PauseReasonType.HUMAN_INPUT_REQUIRED
form_id: str
# The identifier of the human input node causing the pause.
node_id: str
form_content: str
inputs: list[FormInput] = Field(default_factory=list)
web_app_form_token: str | None = None
class SchedulingPause(BaseModel):

View File

@ -4,6 +4,5 @@ Human Input node implementation.
from .entities import HumanInputNodeData
from .human_input_node import HumanInputNode
from .node import HumanInputNode
__all__ = ["HumanInputNode", "HumanInputNodeData"]

View File

@ -2,59 +2,77 @@
Human Input node entities.
"""
import enum
import re
import uuid
from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta
from enum import StrEnum
from typing import Any, Literal, Optional, Union
from typing import Annotated, Literal, Optional
from pydantic import BaseModel, Field, model_validator
from pydantic import BaseModel, Field, field_validator
from core.variables.consts import SELECTORS_LENGTH
from core.workflow.nodes.base import BaseNodeData
from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
_OUTPUT_VARIABLE_PATTERN = re.compile(r"\{\{#\$outputs\.(?P<field_name>[a-zA-Z_][a-zA-Z0-9_]{0,29})#\}\}")
class HumanInputFormStatus(StrEnum):
"""Status of a human input form."""
WAITING = enum.auto()
EXPIRED = enum.auto()
SUBMITTED = enum.auto()
TIMEOUT = enum.auto()
class DeliveryMethodType(StrEnum):
"""Delivery method types for human input forms."""
WEBAPP = "webapp"
EMAIL = "email"
WEBAPP = enum.auto()
EMAIL = enum.auto()
class ButtonStyle(StrEnum):
"""Button styles for user actions."""
PRIMARY = "primary"
DEFAULT = "default"
ACCENT = "accent"
GHOST = "ghost"
PRIMARY = enum.auto()
DEFAULT = enum.auto()
ACCENT = enum.auto()
GHOST = enum.auto()
class TimeoutUnit(StrEnum):
"""Timeout unit for form expiration."""
HOUR = "hour"
DAY = "day"
HOUR = enum.auto()
DAY = enum.auto()
class FormInputType(StrEnum):
"""Form input types."""
TEXT_INPUT = "text-input"
PARAGRAPH = "paragraph"
TEXT_INPUT = enum.auto()
PARAGRAPH = enum.auto()
class PlaceholderType(StrEnum):
"""Placeholder types for form inputs."""
VARIABLE = "variable"
CONSTANT = "constant"
VARIABLE = enum.auto()
CONSTANT = enum.auto()
class RecipientType(StrEnum):
class EmailRecipientType(StrEnum):
"""Email recipient types."""
MEMBER = "member"
EXTERNAL = "external"
MEMBER = enum.auto()
EXTERNAL = enum.auto()
class WebAppDeliveryConfig(BaseModel):
class _WebAppDeliveryConfig(BaseModel):
"""Configuration for webapp delivery method."""
pass # Empty for webapp delivery
@ -63,25 +81,25 @@ class WebAppDeliveryConfig(BaseModel):
class MemberRecipient(BaseModel):
"""Member recipient for email delivery."""
type: Literal[RecipientType.MEMBER]
type: Literal[EmailRecipientType.MEMBER] = EmailRecipientType.MEMBER
user_id: str
class ExternalRecipient(BaseModel):
"""External recipient for email delivery."""
type: Literal[RecipientType.EXTERNAL]
type: Literal[EmailRecipientType.EXTERNAL] = EmailRecipientType.EXTERNAL
email: str
Recipient = Union[MemberRecipient, ExternalRecipient]
EmailRecipient = Annotated[MemberRecipient | ExternalRecipient, Field(discriminator="type")]
class EmailRecipients(BaseModel):
"""Email recipients configuration."""
whole_workspace: bool = False
items: list[Recipient] = Field(default_factory=list)
items: list[EmailRecipient] = Field(default_factory=list)
class EmailDeliveryConfig(BaseModel):
@ -92,51 +110,54 @@ class EmailDeliveryConfig(BaseModel):
body: str
DeliveryConfig = Union[WebAppDeliveryConfig, EmailDeliveryConfig]
class _DeliveryMethodBase(BaseModel):
"""Base delivery method configuration."""
class DeliveryMethod(BaseModel):
"""Delivery method configuration."""
type: DeliveryMethodType
enabled: bool = True
config: Optional[DeliveryConfig] = None
id: uuid.UUID = Field(default_factory=uuid.uuid4)
@model_validator(mode="after")
def validate_config_type(self):
"""Validate that config matches the delivery method type."""
if self.config is None:
return self
if self.type == DeliveryMethodType.EMAIL:
if isinstance(self.config, dict):
# Try to parse as EmailDeliveryConfig - this will raise validation errors
try:
self.config = EmailDeliveryConfig.model_validate(self.config)
except Exception as e:
# Re-raise with more specific context
raise ValueError(f"Invalid email delivery configuration: {str(e)}")
elif not isinstance(self.config, EmailDeliveryConfig):
raise ValueError("Config must be EmailDeliveryConfig for email delivery method")
elif self.type == DeliveryMethodType.WEBAPP:
if isinstance(self.config, dict):
# Try to parse as WebAppDeliveryConfig
try:
self.config = WebAppDeliveryConfig.model_validate(self.config)
except Exception as e:
raise ValueError(f"Invalid webapp delivery configuration: {str(e)}")
elif not isinstance(self.config, WebAppDeliveryConfig):
raise ValueError("Config must be WebAppDeliveryConfig for webapp delivery method")
class WebAppDeliveryMethod(_DeliveryMethodBase):
"""Webapp delivery method configuration."""
return self
type: Literal[DeliveryMethodType.WEBAPP] = DeliveryMethodType.WEBAPP
# The config field is not used currently.
config: _WebAppDeliveryConfig = Field(default_factory=_WebAppDeliveryConfig)
class EmailDeliveryMethod(_DeliveryMethodBase):
"""Email delivery method configuration."""
type: Literal[DeliveryMethodType.EMAIL] = DeliveryMethodType.EMAIL
config: EmailDeliveryConfig
DeliveryChannelConfig = Annotated[WebAppDeliveryMethod | EmailDeliveryMethod, Field(discriminator="type")]
class FormInputPlaceholder(BaseModel):
"""Placeholder configuration for form inputs."""
# NOTE: Ideally, a discriminated union would be used to model
# FormInputPlaceholder. However, the UI requires preserving the previous
# value when switching between `VARIABLE` and `CONSTANT` types. This
# necessitates retaining all fields, making a discriminated union unsuitable.
type: PlaceholderType
selector: list[str] = Field(default_factory=list) # Used when type is VARIABLE
value: str = "" # Used when type is CONSTANT
# The selector of placeholder variable, used when `type` is `VARIABLE`
selector: Sequence[str] = Field(default_factory=tuple) #
# The value of the placeholder, used when `type` is `CONSTANT`.
# TODO: How should we express JSON values?
value: str = ""
@field_validator("selector")
@classmethod
def _validate_selector(cls, selector: Sequence[str]) -> Sequence[str]:
if len(selector) < SELECTORS_LENGTH:
raise ValueError(f"the length of selector should be at least {SELECTORS_LENGTH}, selector={selector}")
return selector
class FormInput(BaseModel):
@ -147,24 +168,106 @@ class FormInput(BaseModel):
placeholder: Optional[FormInputPlaceholder] = None
_IDENTIFIER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
class UserAction(BaseModel):
"""User action configuration."""
# id is the identifier for this action.
# It also serves as the identifiers of output handle.
#
# The id must be a valid identifier (satisfy the _IDENTIFIER_PATTERN above.)
id: str
title: str
button_style: ButtonStyle = ButtonStyle.DEFAULT
@field_validator("id")
@classmethod
def _validate_id(cls, value: str) -> str:
if not _IDENTIFIER_PATTERN.match(value):
raise ValueError(
f"'{value}' is not a valid identifier. It must start with a letter or underscore, "
f"and contain only letters, numbers, or underscores."
)
return value
class HumanInputNodeData(BaseNodeData):
"""Human Input node data."""
delivery_methods: list[DeliveryMethod] = Field(default_factory=list)
delivery_methods: list[DeliveryChannelConfig] = Field(default_factory=list)
form_content: str = ""
inputs: list[FormInput] = Field(default_factory=list)
user_actions: list[UserAction] = Field(default_factory=list)
timeout: int = 36
timeout_unit: TimeoutUnit = TimeoutUnit.HOUR
@field_validator("inputs")
@classmethod
def _validate_inputs(cls, inputs: list[FormInput]) -> list[FormInput]:
seen_names: set[str] = set()
for form_input in inputs:
name = form_input.output_variable_name
if name in seen_names:
raise ValueError(f"duplicated output_variable_name '{name}' in inputs")
seen_names.add(name)
return inputs
@field_validator("user_actions")
@classmethod
def _validate_user_actions(cls, user_actions: list[UserAction]) -> list[UserAction]:
seen_ids: set[str] = set()
for action in user_actions:
action_id = action.id
if action_id in seen_ids:
raise ValueError(f"duplicated user action id '{action_id}'")
seen_ids.add(action_id)
return user_actions
def is_webapp_enabled(self) -> bool:
for dm in self.delivery_methods:
if not dm.enabled:
continue
if dm.type == DeliveryMethodType.WEBAPP:
return True
return False
def expiration_time(self, start_time: datetime) -> datetime:
if self.timeout_unit == TimeoutUnit.HOUR:
return start_time + timedelta(hours=self.timeout)
elif self.timeout_unit == TimeoutUnit.DAY:
return start_time + timedelta(days=self.timeout)
else:
raise AssertionError("unknown timeout unit.")
def outputs_field_names(self) -> Sequence[str]:
field_names = []
for match in _OUTPUT_VARIABLE_PATTERN.finditer(self.form_content):
field_names.append(match.group("field_name"))
return field_names
def extract_variable_selector_to_variable_mapping(self, node_id: str) -> Mapping[str, Sequence[str]]:
variable_selectors = []
variable_template_parser = VariableTemplateParser(template=self.form_content)
variable_selectors.extend(variable_template_parser.extract_variable_selectors())
variable_mappings = {}
for variable_selector in variable_selectors:
qualified_variable_mapping_key = f"{node_id}.{variable_selector.variable}"
variable_mappings[qualified_variable_mapping_key] = variable_selector.value_selector
for input in self.inputs:
placeholder = input.placeholder
if placeholder is None:
continue
if placeholder.type == PlaceholderType.CONSTANT:
continue
placeholder_key = ".".join(placeholder.selector)
qualified_variable_mapping_key = f"{node_id}.#{placeholder_key}#"
variable_mappings[qualified_variable_mapping_key] = placeholder.selector
return variable_mappings
class HumanInputRequired(BaseModel):
"""Event data for human input required."""
@ -176,72 +279,11 @@ class HumanInputRequired(BaseModel):
web_app_form_token: Optional[str] = None
class WorkflowSuspended(BaseModel):
"""Event data for workflow suspended."""
suspended_at_node_ids: list[str]
class PauseTypeHumanInput(BaseModel):
"""Pause type for human input."""
type: Literal["human_input"]
form_id: str
class PauseTypeBreakpoint(BaseModel):
"""Pause type for breakpoint (debugging)."""
type: Literal["breakpoint"]
PauseType = Union[PauseTypeHumanInput, PauseTypeBreakpoint]
class PausedNode(BaseModel):
"""Information about a paused node."""
node_id: str
node_title: str
pause_type: PauseType
class WorkflowPauseDetails(BaseModel):
"""Details about workflow pause."""
paused_at: str # ISO datetime
paused_nodes: list[PausedNode]
class FormSubmissionRequest(BaseModel):
"""Form submission request data."""
inputs: dict[str, str] # mapping of output_variable_name to user input
action: str # UserAction id
class FormGetResponse(BaseModel):
"""Response for form get API."""
site: Optional[dict[str, Any]] = None # Site information for webapp
class FormDefinition(BaseModel):
form_content: str
inputs: list[FormInput]
inputs: list[FormInput] = Field(default_factory=list)
user_actions: list[UserAction] = Field(default_factory=list)
rendered_content: str
class FormSubmissionResponse(BaseModel):
"""Response for successful form submission."""
pass # Empty response for success
class FormErrorResponse(BaseModel):
"""Response for form submission errors."""
error_code: str
description: str
class ResumeWaitResponse(BaseModel):
"""Response for resume wait API."""
status: Literal["paused", "running", "ended"]
timeout: int
timeout_unit: TimeoutUnit

View File

@ -1,13 +1,27 @@
from collections.abc import Mapping
import dataclasses
import logging
from collections.abc import Generator, Mapping, Sequence
from typing import Any
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult, PauseRequestedEvent
from core.workflow.node_events.base import NodeEventBase
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.base.node import Node
from core.workflow.repositories.human_input_form_repository import FormCreateParams, HumanInputFormRepository
from .entities import HumanInputNodeData
_SELECTED_BRANCH_KEY = "selected_branch"
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class _FormSubmissionResult:
action_id: str
class HumanInputNode(Node[HumanInputNodeData]):
node_type = NodeType.HUMAN_INPUT
@ -17,7 +31,7 @@ class HumanInputNode(Node[HumanInputNodeData]):
"edge_source_handle",
"edgeSourceHandle",
"source_handle",
"selected_branch",
_SELECTED_BRANCH_KEY,
"selectedBranch",
"branch",
"branch_id",
@ -25,43 +39,12 @@ class HumanInputNode(Node[HumanInputNodeData]):
"handle",
)
_node_data: HumanInputNodeData
@classmethod
def version(cls) -> str:
return "1"
def _run(self): # type: ignore[override]
if self._is_completion_ready():
branch_handle = self._resolve_branch_selection()
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={},
edge_source_handle=branch_handle or "source",
)
return self._pause_generator()
def _pause_generator(self):
# TODO(QuantumGhost): yield a real form id.
yield PauseRequestedEvent(reason=HumanInputRequired(form_id="test_form_id", node_id=self.id))
def _is_completion_ready(self) -> bool:
"""Determine whether all required inputs are satisfied."""
if not self.node_data.required_variables:
return False
variable_pool = self.graph_runtime_state.variable_pool
for selector_str in self.node_data.required_variables:
parts = selector_str.split(".")
if len(parts) != 2:
return False
segment = variable_pool.get(parts)
if segment is None:
return False
return True
def _resolve_branch_selection(self) -> str | None:
"""Determine the branch handle selected by human input if available."""
@ -108,3 +91,106 @@ class HumanInputNode(Node[HumanInputNodeData]):
return candidate
return None
def _create_form_repository(self) -> HumanInputFormRepository:
pass
@staticmethod
def _pause_generator(event: PauseRequestedEvent) -> Generator[NodeEventBase, None, None]:
yield event
@property
def _workflow_execution_id(self) -> str:
workflow_exec_id = self.graph_runtime_state.variable_pool.system_variables.workflow_execution_id
assert workflow_exec_id is not None
return workflow_exec_id
def _run(self) -> NodeRunResult | Generator[NodeEventBase, None, None]:
"""
Execute the human input node.
This method will:
1. Generate a unique form ID
2. Create form content with variable substitution
3. Create form in database
4. Send form via configured delivery methods
5. Suspend workflow execution
6. Wait for form submission to resume
"""
repo = self._create_form_repository()
submission_result = repo.get_form_submission(self._workflow_execution_id, self.app_id)
if submission_result:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={
"action_id": submission_result.selected_action_id,
},
edge_source_handle=submission_result.selected_action_id,
)
try:
repo = self._create_form_repository()
params = FormCreateParams(
workflow_execution_id=self._workflow_execution_id,
node_id=self.id,
form_config=self._node_data,
rendered_content=self._render_form_content(),
)
result = repo.create_form(params)
# Create human input required event
required_event = HumanInputRequired(
form_id=result.id,
form_content=self._node_data.form_content,
inputs=self._node_data.inputs,
web_app_form_token=result.web_app_token,
)
pause_requested_event = PauseRequestedEvent(reason=required_event)
# Create workflow suspended event
logger.info(
"Human Input node suspended workflow for form. workflow_run_id=%s, node_id=%s, form_id=%s",
self.graph_runtime_state.variable_pool.system_variables.workflow_execution_id,
self.id,
result.id,
)
except Exception as e:
logger.exception("Human Input node failed to execute, node_id=%s", self.id)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=str(e),
error_type="HumanInputNodeError",
)
return self._pause_generator(pause_requested_event)
def _render_form_content(self) -> str:
"""
Process form content by substituting variables.
This method should:
1. Parse the form_content markdown
2. Substitute {{#node_name.var_name#}} with actual values
3. Keep {{#$output.field_name#}} placeholders for form inputs
"""
rendered_form_content = self.graph_runtime_state.variable_pool.convert_template(
self._node_data.form_content,
)
return rendered_form_content.markdown
@classmethod
def _extract_variable_selector_to_variable_mapping(
cls,
*,
graph_config: Mapping[str, Any],
node_id: str,
node_data: Mapping[str, Any],
) -> Mapping[str, Sequence[str]]:
"""
Extract variable selectors referenced in form content and input placeholders.
This method should parse:
1. Variables referenced in form_content ({{#node_name.var_name#}})
2. Variables referenced in input placeholders
"""
validated_node_data = HumanInputNodeData.model_validate(node_data)
return validated_node_data.extract_variable_selector_to_variable_mapping(node_id)

View File

@ -1,259 +0,0 @@
"""
Human Input node implementation.
"""
import json
import logging
import uuid
from collections.abc import Generator, Mapping, Sequence
from typing import Any, Optional, Union
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.graph_engine.entities.event import InNodeEvent
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.enums import ErrorStrategy, NodeType
from core.workflow.nodes.event import NodeEvent
from core.workflow.nodes.human_input.entities import (
HumanInputNodeData,
HumanInputRequired,
WorkflowSuspended,
)
from extensions.ext_database import db
from services.human_input_form_service import HumanInputFormService
logger = logging.getLogger(__name__)
class HumanInputNode(BaseNode):
"""
Human Input Node implementation.
This node pauses workflow execution and waits for human input through
configured delivery methods (webapp or email). The workflow resumes
once the form is submitted.
"""
_node_type: NodeType = NodeType.HUMAN_INPUT
_node_data_cls = HumanInputNodeData
node_data: HumanInputNodeData
def init_node_data(self, data: Mapping[str, Any]) -> None:
"""Initialize node data from configuration."""
self.node_data = self._node_data_cls.model_validate(data)
def _run(self) -> NodeRunResult | Generator[Union[NodeEvent, InNodeEvent], None, None]:
"""
Execute the human input node.
This method will:
1. Generate a unique form ID
2. Create form content with variable substitution
3. Create form in database
4. Send form via configured delivery methods
5. Suspend workflow execution
6. Wait for form submission to resume
"""
try:
# Generate unique form ID
form_id = str(uuid.uuid4())
# Create form content with variable substitution
form_content = self._process_form_content()
# Generate webapp token if webapp delivery is enabled
web_app_form_token = None
webapp_enabled = any(dm.enabled and dm.type.value == "webapp" for dm in self.node_data.delivery_methods)
if webapp_enabled:
web_app_form_token = str(uuid.uuid4()).replace("-", "")
# Create form definition for database storage
form_definition = {
"node_id": self.node_id,
"title": self.node_data.title,
"inputs": [inp.model_dump() for inp in self.node_data.inputs],
"user_actions": [action.model_dump() for action in self.node_data.user_actions],
"timeout": self.node_data.timeout,
"timeout_unit": self.node_data.timeout_unit.value,
"delivery_methods": [dm.model_dump() for dm in self.node_data.delivery_methods],
}
# Create form in database
service = HumanInputFormService(db.session())
service.create_form(
form_id=form_id,
workflow_run_id=self.graph_runtime_state.workflow_run_id,
tenant_id=self.graph_init_params.tenant_id,
app_id=self.graph_init_params.app_id,
form_definition=json.dumps(form_definition),
rendered_content=form_content,
web_app_token=web_app_form_token,
)
# Create human input required event
human_input_event = HumanInputRequired(
form_id=form_id,
node_id=self.node_id,
form_content=form_content,
inputs=self.node_data.inputs,
web_app_form_token=web_app_form_token,
)
# Create workflow suspended event
suspended_event = WorkflowSuspended(suspended_at_node_ids=[self.node_id])
logger.info(f"Human Input node {self.node_id} suspended workflow for form {form_id}")
# Return suspension result
# The workflow engine should handle the suspension and resume logic
return NodeRunResult(
status=WorkflowNodeExecutionStatus.RUNNING, # Node is still running, waiting for input
inputs={},
outputs={},
metadata={
"form_id": form_id,
"web_app_form_token": web_app_form_token,
"human_input_event": human_input_event.model_dump(),
"suspended_event": suspended_event.model_dump(),
"suspended": True, # Flag to indicate this node caused suspension
},
)
except Exception as e:
logger.exception(f"Human Input node {self.node_id} failed to execute")
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=str(e),
error_type="HumanInputNodeError",
)
def _process_form_content(self) -> str:
"""
Process form content by substituting variables.
This method should:
1. Parse the form_content markdown
2. Substitute {{#node_name.var_name#}} with actual values
3. Keep {{#$output.field_name#}} placeholders for form inputs
"""
# TODO: Implement variable substitution logic
# For now, return the raw form content
# This should integrate with the existing variable template parser
return self.node_data.form_content
@classmethod
def _extract_variable_selector_to_variable_mapping(
cls,
*,
graph_config: Mapping[str, Any],
node_id: str,
node_data: Mapping[str, Any],
) -> Mapping[str, Sequence[str]]:
"""
Extract variable selectors referenced in form content and input placeholders.
This method should parse:
1. Variables referenced in form_content ({{#node_name.var_name#}})
2. Variables referenced in input placeholders
"""
# TODO: Implement variable extraction logic
# This should parse the form_content and placeholder configurations
# to extract all referenced variables
return {}
@classmethod
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
"""Get default configuration for human input node."""
return {
"type": "human_input",
"config": {
"delivery_methods": [{"type": "webapp", "enabled": True, "config": {}}],
"form_content": "# Human Input\n\nPlease provide your input:\n\n{{#$output.input#}}",
"inputs": [
{
"type": "text-input",
"output_variable_name": "input",
"placeholder": {"type": "constant", "value": "Enter your response here..."},
}
],
"user_actions": [{"id": "submit", "title": "Submit", "button_style": "primary"}],
"timeout": 24,
"timeout_unit": "hour",
},
}
@classmethod
def version(cls) -> str:
"""Return the version of the human input node."""
return "1"
def _get_error_strategy(self) -> Optional[ErrorStrategy]:
"""Get the error strategy for this node."""
return self.node_data.error_strategy
def _get_retry_config(self) -> RetryConfig:
"""Get the retry configuration for this node."""
return self.node_data.retry_config
def _get_title(self) -> str:
"""Get the node title."""
return self.node_data.title
def _get_description(self) -> Optional[str]:
"""Get the node description."""
return self.node_data.desc
def _get_default_value_dict(self) -> dict[str, Any]:
"""Get the default values dictionary for this node."""
return self.node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData:
"""Get the BaseNodeData object for this node."""
return self.node_data
def resume_from_human_input(self, form_submission_data: dict[str, Any]) -> NodeRunResult:
"""
Resume node execution after human input form is submitted.
Args:
form_submission_data: Dict containing:
- inputs: Dict of input field values
- action: The user action taken
Returns:
NodeRunResult with the form inputs as outputs
"""
try:
inputs = form_submission_data.get("inputs", {})
action = form_submission_data.get("action", "")
# Create output dictionary with form inputs
outputs = {}
for input_field in self.node_data.inputs:
field_name = input_field.output_variable_name
if field_name in inputs:
outputs[field_name] = inputs[field_name]
# Add the action to outputs
outputs["_action"] = action
logger.info(f"Human Input node {self.node_id} resumed with action {action}")
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={},
outputs=outputs,
metadata={
"form_submitted": True,
"submitted_action": action,
},
)
except Exception as e:
logger.exception(f"Human Input node {self.node_id} failed to resume")
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=str(e),
error_type="HumanInputResumeError",
)

View File

@ -1,6 +1,65 @@
from typing import Protocol
import abc
import dataclasses
from collections.abc import Mapping
from typing import Any, Protocol
from core.workflow.entities.human_input_form import HumanInputForm
from core.workflow.nodes.human_input.entities import HumanInputNodeData
class HumanInputError(Exception):
pass
class FormNotFoundError(HumanInputError):
pass
@dataclasses.dataclass
class FormCreateParams:
workflow_execution_id: str
# node_id is the identifier for a specific
# node in the graph.
#
# TODO: for node inside loop / iteration, this would
# cause problems, as a single node may be executed multiple times.
node_id: str
form_config: HumanInputNodeData
rendered_content: str
class HumanInputFormEntity(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> str:
"""id returns the identifer of the form."""
pass
@property
@abc.abstractmethod
def web_app_token(self) -> str | None:
"""web_app_token returns the token for submission inside webapp.
If web app delivery is not enabled, this method would return `None`.
"""
# TODO: what if the users are allowed to add multiple
# webapp delivery?
pass
class FormSubmissionEntity(abc.ABC):
@property
@abc.abstractmethod
def selected_action_id(self) -> str:
"""The identifier of action user has selected, correspond to `UserAction.id`."""
pass
@abc.abstractmethod
def form_data(self) -> Mapping[str, Any]:
"""The data submitted for this form"""
pass
class HumanInputFormRepository(Protocol):
@ -16,93 +75,17 @@ class HumanInputFormRepository(Protocol):
application domains or deployment scenarios.
"""
def save(self, form: HumanInputForm) -> None:
def create_form(self, params: FormCreateParams) -> HumanInputFormEntity:
"""
Save or update a HumanInputForm instance.
This method handles both creating new records and updating existing ones.
The implementation should determine whether to create or update based on
the form's ID or other identifying fields.
Args:
form: The HumanInputForm instance to save or update
Create a human input form from form definition.
"""
...
def get_by_id(self, form_id: str) -> HumanInputForm:
"""
Get a form by its ID.
def get_form_submission(self, workflow_execution_id: str, node_id: str) -> FormSubmissionEntity | None:
"""Retrieve the submission for a specific human input node.
Args:
form_id: The ID of the form to retrieve
Returns `FormSubmission` if the form has been submitted, or `None` if not.
Returns:
The HumanInputForm instance
Raises:
NotFoundError: If the form is not found
"""
...
def get_by_web_app_token(self, web_app_token: str) -> HumanInputForm:
"""
Get a form by its web app token.
Args:
web_app_token: The web app token to search for
Returns:
The HumanInputForm instance
Raises:
NotFoundError: If the form is not found
"""
...
def get_pending_forms_for_workflow_run(self, workflow_run_id: str) -> list[HumanInputForm]:
"""
Get all pending human input forms for a workflow run.
Args:
workflow_run_id: The workflow run ID to filter by
Returns:
List of pending HumanInputForm instances
"""
...
def mark_expired_forms(self, expiry_hours: int = 48) -> int:
"""
Mark expired forms as expired.
Args:
expiry_hours: Number of hours after which forms should be expired
Returns:
Number of forms marked as expired
"""
...
def exists_by_id(self, form_id: str) -> bool:
"""
Check if a form exists by ID.
Args:
form_id: The ID of the form to check
Returns:
True if the form exists, False otherwise
"""
...
def exists_by_web_app_token(self, web_app_token: str) -> bool:
"""
Check if a form exists by web app token.
Args:
web_app_token: The web app token to check
Returns:
True if the form exists, False otherwise
Raises `FormNotFoundError` if correspond form record is not found.
"""
...