From f988619d2c244dcca7fd900ea7892600a143aa91 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Thu, 8 Jan 2026 10:27:52 +0800 Subject: [PATCH] feat(api): adjust model fields and cleanup form creation logic --- .../common/workflow_response_converter.py | 1 + .../app/apps/message_based_app_generator.py | 2 ++ api/core/app/apps/message_generator.py | 2 ++ api/core/app/entities/task_entities.py | 1 + .../repositories/human_input_reposotiry.py | 2 +- api/core/workflow/entities/pause_reason.py | 6 ++-- .../workflow/nodes/human_input/entities.py | 6 ++++ .../nodes/human_input/human_input_node.py | 28 +++++++++++++++---- .../human_input_form_repository.py | 8 ++++-- api/models/enums.py | 1 + .../test_human_input_form_repository_impl.py | 4 +++ .../helpers/execution_extra_content.py | 5 ++++ .../test_mail_human_input_delivery_task.py | 2 ++ .../app/apps/test_workflow_pause_events.py | 3 ++ 14 files changed, 60 insertions(+), 11 deletions(-) diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index fa9c1391c3..bb2d323532 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -299,6 +299,7 @@ class WorkflowResponseConverter: form_content=reason.form_content, inputs=reason.inputs, actions=reason.actions, + display_in_ui=reason.display_in_ui, form_token=reason.form_token, resolved_placeholder_values=reason.resolved_placeholder_values, ), diff --git a/api/core/app/apps/message_based_app_generator.py b/api/core/app/apps/message_based_app_generator.py index 8c11739d10..e7455c6f13 100644 --- a/api/core/app/apps/message_based_app_generator.py +++ b/api/core/app/apps/message_based_app_generator.py @@ -326,6 +326,8 @@ def _topic_msg_generator( ) -> Generator[Mapping[str, Any], None, None]: last_msg_time = time.time() with topic.subscribe() as sub: + # on_subscribe fires only after the Redis subscription is active. + # This is used to gate task start and reduce pub/sub race for the first event. if on_subscribe is not None: on_subscribe() while True: diff --git a/api/core/app/apps/message_generator.py b/api/core/app/apps/message_generator.py index aeec3289a8..79cb6e7ed4 100644 --- a/api/core/app/apps/message_generator.py +++ b/api/core/app/apps/message_generator.py @@ -44,6 +44,8 @@ def _topic_msg_generator( ) -> Generator[Mapping[str, Any], None, None]: last_msg_time = time.time() with topic.subscribe() as sub: + # on_subscribe fires only after the Redis subscription is active. + # This is used to gate task start and reduce pub/sub race for the first event. if on_subscribe is not None: on_subscribe() while True: diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index ca02a8dbb7..f53624beab 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -277,6 +277,7 @@ class HumanInputRequiredResponse(StreamResponse): form_content: str inputs: Sequence[FormInput] = Field(default_factory=list) actions: Sequence[UserAction] = Field(default_factory=list) + display_in_ui: bool = False form_token: str | None = None resolved_placeholder_values: Mapping[str, Any] = Field(default_factory=dict) diff --git a/api/core/repositories/human_input_reposotiry.py b/api/core/repositories/human_input_reposotiry.py index 8debaf0e4d..925fe9fe03 100644 --- a/api/core/repositories/human_input_reposotiry.py +++ b/api/core/repositories/human_input_reposotiry.py @@ -342,7 +342,7 @@ class HumanInputFormRepositoryImpl: ) session.add(form_model) recipient_models: list[HumanInputFormRecipient] = [] - for delivery in form_config.delivery_methods: + for delivery in params.delivery_methods: delivery_and_recipients = self._delivery_method_to_model( session=session, form_id=form_id, diff --git a/api/core/workflow/entities/pause_reason.py b/api/core/workflow/entities/pause_reason.py index 7baa1b4b9b..1817f13908 100644 --- a/api/core/workflow/entities/pause_reason.py +++ b/api/core/workflow/entities/pause_reason.py @@ -18,6 +18,7 @@ class HumanInputRequired(BaseModel): form_content: str inputs: list[FormInput] = Field(default_factory=list) actions: list[UserAction] = Field(default_factory=list) + display_in_ui: bool = False node_id: str node_title: str @@ -32,10 +33,11 @@ class HumanInputRequired(BaseModel): # Only form inputs with placeholder type `VARIABLE` will be resolved and stored in `resolved_placeholder_values`. resolved_placeholder_values: Mapping[str, Any] = Field(default_factory=dict) - # The `form_token` is the token used to submit the form via webapp. It corresponds to + # The `form_token` is the token used to submit the form via UI surfaces. It corresponds to # `HumanInputFormRecipient.access_token`. # - # This field is `None` if webapp delivery is not set. + # This field is `None` if webapp delivery is not set and not + # in orchestrating mode. form_token: str | None = None diff --git a/api/core/workflow/nodes/human_input/entities.py b/api/core/workflow/nodes/human_input/entities.py index 2950c41637..81b03d1958 100644 --- a/api/core/workflow/nodes/human_input/entities.py +++ b/api/core/workflow/nodes/human_input/entities.py @@ -253,3 +253,9 @@ class FormDefinition(BaseModel): # this is used to store the values of the placeholders placeholder_values: dict[str, Any] = Field(default_factory=dict) + + # node_title records the title of the HumanInput node. + node_title: str | None = None + + # display_in_ui controls whether the form should be displayed in UI surfaces. + display_in_ui: bool | None = None diff --git a/api/core/workflow/nodes/human_input/human_input_node.py b/api/core/workflow/nodes/human_input/human_input_node.py index 9a3310a003..4599927079 100644 --- a/api/core/workflow/nodes/human_input/human_input_node.py +++ b/api/core/workflow/nodes/human_input/human_input_node.py @@ -20,8 +20,8 @@ from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter from extensions.ext_database import db from libs.datetime_utils import naive_utc_now -from .entities import HumanInputNodeData -from .enums import HumanInputFormStatus, PlaceholderType +from .entities import DeliveryChannelConfig, HumanInputNodeData +from .enums import DeliveryMethodType, HumanInputFormStatus, PlaceholderType if TYPE_CHECKING: from core.workflow.entities.graph_init_params import GraphInitParams @@ -154,24 +154,37 @@ class HumanInputNode(Node[HumanInputNodeData]): return resolved_inputs - def _should_require_form_token(self) -> bool: + def _should_require_console_recipient(self) -> bool: if self.invoke_from == InvokeFrom.DEBUGGER: return True if self.invoke_from == InvokeFrom.EXPLORE: return self._node_data.is_webapp_enabled() return False + def _display_in_ui(self) -> bool: + if self.invoke_from == InvokeFrom.DEBUGGER: + return True + return self._node_data.is_webapp_enabled() + + def _effective_delivery_methods(self) -> Sequence[DeliveryChannelConfig]: + enabled_methods = [method for method in self._node_data.delivery_methods if method.enabled] + if self.invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE}: + return [method for method in enabled_methods if method.type != DeliveryMethodType.WEBAPP] + return enabled_methods + def _human_input_required_event(self, form_entity: HumanInputFormEntity) -> HumanInputRequired: node_data = self._node_data resolved_placeholder_values = self._resolve_inputs() + display_in_ui = self._display_in_ui() form_token = form_entity.web_app_token - if self._should_require_form_token() and form_token is None: - raise AssertionError("Form token should be available for console execution.") + if display_in_ui and form_token is None: + raise AssertionError("Form token should be available for UI execution.") return HumanInputRequired( form_id=form_entity.id, form_content=form_entity.rendered_content, inputs=node_data.inputs, actions=node_data.user_actions, + display_in_ui=display_in_ui, node_id=self.id, node_title=node_data.title, form_token=form_token, @@ -193,13 +206,16 @@ class HumanInputNode(Node[HumanInputNodeData]): repo = self._form_repository form = repo.get_form(self._workflow_execution_id, self.id) if form is None: + display_in_ui = self._display_in_ui() params = FormCreateParams( workflow_execution_id=self._workflow_execution_id, node_id=self.id, form_config=self._node_data, rendered_content=self._render_form_content_before_submission(), + delivery_methods=self._effective_delivery_methods(), + display_in_ui=display_in_ui, resolved_placeholder_values=self._resolve_inputs(), - console_recipient_required=self._should_require_form_token(), + console_recipient_required=self._should_require_console_recipient(), console_creator_account_id=( self.user_id if self.invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.EXPLORE} else None ), diff --git a/api/core/workflow/repositories/human_input_form_repository.py b/api/core/workflow/repositories/human_input_form_repository.py index e694b70f12..8ceee1aad9 100644 --- a/api/core/workflow/repositories/human_input_form_repository.py +++ b/api/core/workflow/repositories/human_input_form_repository.py @@ -1,10 +1,10 @@ import abc import dataclasses -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from datetime import datetime from typing import Any, Protocol -from core.workflow.nodes.human_input.entities import HumanInputNodeData +from core.workflow.nodes.human_input.entities import DeliveryChannelConfig, HumanInputNodeData from core.workflow.nodes.human_input.enums import HumanInputFormStatus @@ -29,6 +29,10 @@ class FormCreateParams: form_config: HumanInputNodeData rendered_content: str + # Delivery methods already filtered by runtime context (invoke_from). + delivery_methods: Sequence[DeliveryChannelConfig] + # UI display flag computed by runtime context. + display_in_ui: bool # resolved_placeholder_values saves the values for placeholders with # type = VARIABLE. diff --git a/api/models/enums.py b/api/models/enums.py index 8cd3d4cf2a..2bc61120ce 100644 --- a/api/models/enums.py +++ b/api/models/enums.py @@ -36,6 +36,7 @@ class MessageStatus(StrEnum): """ NORMAL = "normal" + PAUSED = "paused" ERROR = "error" diff --git a/api/tests/test_containers_integration_tests/core/repositories/test_human_input_form_repository_impl.py b/api/tests/test_containers_integration_tests/core/repositories/test_human_input_form_repository_impl.py index 269e5c21b1..1b0e0fe165 100644 --- a/api/tests/test_containers_integration_tests/core/repositories/test_human_input_form_repository_impl.py +++ b/api/tests/test_containers_integration_tests/core/repositories/test_human_input_form_repository_impl.py @@ -70,6 +70,8 @@ def _build_form_params(delivery_methods: list[EmailDeliveryMethod]) -> FormCreat node_id="human-input-node", form_config=form_config, rendered_content="

Approve?

", + delivery_methods=delivery_methods, + display_in_ui=False, resolved_placeholder_values={}, ) @@ -180,6 +182,8 @@ class TestHumanInputFormRepositoryImplWithContainers: user_actions=[UserAction(id="approve", title="Approve")], ), rendered_content="

Approve?

", + delivery_methods=[], + display_in_ui=False, resolved_placeholder_values=resolved_values, ) diff --git a/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py b/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py index 1b23673294..f937c2e55b 100644 --- a/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py +++ b/api/tests/test_containers_integration_tests/helpers/execution_extra_content.py @@ -21,6 +21,7 @@ class HumanInputMessageFixture: form: HumanInputForm action_id: str action_text: str + node_title: str def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture: @@ -109,6 +110,7 @@ def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture: action_id = "approve" action_text = "Approve request" + node_title = "Approval" form_definition = FormDefinition( form_content="content", inputs=[], @@ -116,6 +118,8 @@ def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture: rendered_content="Rendered block", timeout=1, timeout_unit=TimeoutUnit.HOUR, + node_title=node_title, + display_in_ui=True, ) form = HumanInputForm( tenant_id=tenant.id, @@ -146,4 +150,5 @@ def create_human_input_message_fixture(db_session) -> HumanInputMessageFixture: form=form, action_id=action_id, action_text=action_text, + node_title=node_title, ) diff --git a/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py b/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py index c628dc371f..51c045a924 100644 --- a/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py +++ b/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py @@ -92,6 +92,8 @@ def _build_form(db_session_with_containers, tenant, account): node_id="node-1", form_config=node_data, rendered_content="Rendered", + delivery_methods=node_data.delivery_methods, + display_in_ui=False, resolved_placeholder_values={}, ) return repo.create_form(params) diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py b/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py index 3928968ff9..34e90f457e 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_pause_events.py @@ -126,6 +126,7 @@ def test_queue_workflow_paused_event_to_stream_responses(): FormInput(type=FormInputType.TEXT_INPUT, output_variable_name="field", placeholder=None), ], actions=[UserAction(id="approve", title="Approve")], + display_in_ui=True, node_id="node-id", node_title="Human Step", form_token="token", @@ -144,6 +145,7 @@ def test_queue_workflow_paused_event_to_stream_responses(): assert pause_resp.data.paused_nodes == ["node-id"] assert pause_resp.data.outputs == {"answer": "value"} assert pause_resp.data.reasons[0]["form_id"] == "form-1" + assert pause_resp.data.reasons[0]["display_in_ui"] is True assert isinstance(responses[0], HumanInputRequiredResponse) hi_resp = responses[0] @@ -152,3 +154,4 @@ def test_queue_workflow_paused_event_to_stream_responses(): assert hi_resp.data.node_title == "Human Step" assert hi_resp.data.inputs[0].output_variable_name == "field" assert hi_resp.data.actions[0].id == "approve" + assert hi_resp.data.display_in_ui is True