feat(api): implement test form delivery & submission logic (vibe-kanban 89cd6a22)

This ensures that user can receive & submit form while using email
delivery test.
This commit is contained in:
QuantumGhost
2026-01-19 09:49:05 +08:00
parent 2db638b992
commit 6bf6bf6a2a
13 changed files with 224 additions and 28 deletions

View File

@ -546,6 +546,10 @@ class HumanInputFormSubmitPayload(BaseModel):
class HumanInputDeliveryTestPayload(BaseModel):
delivery_method_id: str = Field(..., description="Delivery method ID")
inputs: dict[str, Any] = Field(
default_factory=dict,
description="Values used to fill missing upstream variables referenced in form_content",
)
reg(HumanInputFormPreviewPayload)
@ -693,6 +697,7 @@ class WorkflowDraftHumanInputDeliveryTestApi(Resource):
account=current_user,
node_id=node_id,
delivery_method_id=args.delivery_method_id,
inputs=args.inputs,
)
except ValueError as exc:
raise InvalidArgumentError(str(exc))

View File

@ -17,7 +17,11 @@ from core.workflow.nodes.human_input.entities import (
MemberRecipient,
WebAppDeliveryMethod,
)
from core.workflow.nodes.human_input.enums import DeliveryMethodType, HumanInputFormStatus
from core.workflow.nodes.human_input.enums import (
DeliveryMethodType,
HumanInputFormKind,
HumanInputFormStatus,
)
from core.workflow.repositories.human_input_form_repository import (
FormCreateParams,
FormNotFoundError,
@ -132,10 +136,11 @@ class _HumanInputFormEntityImpl(HumanInputFormEntity):
@dataclasses.dataclass(frozen=True)
class HumanInputFormRecord:
form_id: str
workflow_run_id: str
workflow_run_id: str | None
node_id: str
tenant_id: str
app_id: str
form_kind: HumanInputFormKind
definition: FormDefinition
rendered_content: str
expiration_time: datetime
@ -164,6 +169,7 @@ class HumanInputFormRecord:
node_id=form_model.node_id,
tenant_id=form_model.tenant_id,
app_id=form_model.app_id,
form_kind=form_model.form_kind,
definition=FormDefinition.model_validate_json(form_model.form_definition),
rendered_content=form_model.rendered_content,
expiration_time=form_model.expiration_time,
@ -340,6 +346,7 @@ class HumanInputFormRepositoryImpl:
tenant_id=self._tenant_id,
app_id=params.app_id,
workflow_run_id=params.workflow_execution_id,
form_kind=params.form_kind,
node_id=params.node_id,
form_definition=form_definition.model_dump_json(),
rendered_content=params.rendered_content,

View File

@ -10,6 +10,13 @@ class HumanInputFormStatus(enum.StrEnum):
TIMEOUT = enum.auto()
class HumanInputFormKind(enum.StrEnum):
"""Kind of a human input form."""
RUNTIME = enum.auto() # Form created during workflow execution.
DELIVERY_TEST = enum.auto() # Form created for delivery tests.
class DeliveryMethodType(enum.StrEnum):
"""Delivery method types for human input forms."""

View File

@ -5,7 +5,7 @@ from datetime import datetime
from typing import Any, Protocol
from core.workflow.nodes.human_input.entities import DeliveryChannelConfig, HumanInputNodeData
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
from core.workflow.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus
class HumanInputError(Exception):
@ -19,7 +19,8 @@ class FormNotFoundError(HumanInputError):
@dataclasses.dataclass
class FormCreateParams:
app_id: str
workflow_execution_id: str
# None when creating a delivery test form; set for runtime forms.
workflow_execution_id: str | None
# node_id is the identifier for a specific
# node in the graph.
@ -40,6 +41,7 @@ class FormCreateParams:
#
# For type = CONSTANT, the value is not stored inside `resolved_placeholder_values`
resolved_placeholder_values: Mapping[str, Any]
form_kind: HumanInputFormKind = HumanInputFormKind.RUNTIME
# Force creating a console-only recipient for submission in Console.
console_recipient_required: bool = False

View File

@ -0,0 +1,40 @@
"""Add form_kind to human_input_forms
Revision ID: 3c1d9e2f1a4b
Revises: 7a1c4d2f9b8e
Create Date: 2026-01-20 00:01:00.000000
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "3c1d9e2f1a4b"
down_revision = "7a1c4d2f9b8e"
branch_labels = None
depends_on = None
def upgrade():
with op.batch_alter_table("human_input_forms", schema=None) as batch_op:
batch_op.add_column(
sa.Column("form_kind", sa.String(length=20), server_default=sa.text("'runtime'"), nullable=False)
)
batch_op.alter_column(
"workflow_run_id",
existing_type=models.types.StringUUID(),
nullable=True,
)
def downgrade():
with op.batch_alter_table("human_input_forms", schema=None) as batch_op:
batch_op.alter_column(
"workflow_run_id",
existing_type=models.types.StringUUID(),
nullable=False,
)
batch_op.drop_column("form_kind")

View File

@ -8,6 +8,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from core.workflow.nodes.human_input.enums import (
DeliveryMethodType,
HumanInputFormKind,
HumanInputFormStatus,
)
from libs.helper import generate_string
@ -32,7 +33,12 @@ class HumanInputForm(DefaultFieldsMixin, Base):
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
form_kind: Mapped[HumanInputFormKind] = mapped_column(
EnumText(HumanInputFormKind),
nullable=False,
default=HumanInputFormKind.RUNTIME,
)
# The human input node the current form corresponds to.
node_id: Mapped[str] = mapped_column(sa.String(60), nullable=False)

View File

@ -7,6 +7,7 @@ from typing import Protocol
from sqlalchemy import Engine, select
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from core.workflow.nodes.human_input.entities import (
DeliveryChannelConfig,
EmailDeliveryConfig,
@ -26,6 +27,12 @@ class DeliveryTestStatus(StrEnum):
FAILED = "failed"
@dataclass(frozen=True)
class DeliveryTestEmailRecipient:
email: str
form_token: str
@dataclass(frozen=True)
class DeliveryTestContext:
tenant_id: str
@ -34,6 +41,7 @@ class DeliveryTestContext:
node_title: str | None
rendered_content: str
template_vars: dict[str, str] = field(default_factory=dict)
recipients: list[DeliveryTestEmailRecipient] = field(default_factory=list)
@dataclass(frozen=True)
@ -51,6 +59,15 @@ class DeliveryTestUnsupportedError(DeliveryTestError):
pass
def _build_form_link(token: str | None) -> str | None:
if not token:
return None
base_url = dify_config.APP_WEB_URL
if not base_url:
return None
return f"{base_url.rstrip('/')}/form/{token}"
class DeliveryTestHandler(Protocol):
def supports(self, method: DeliveryChannelConfig) -> bool: ...
@ -219,4 +236,11 @@ class EmailDeliveryTestHandler:
substitutions = {key: value or "" for key, value in raw_values.items()}
if context.template_vars:
substitutions.update({key: value for key, value in context.template_vars.items() if value is not None})
token = next(
(recipient.form_token for recipient in context.recipients if recipient.email == recipient_email),
None,
)
if token:
substitutions["form_token"] = token
substitutions["form_link"] = _build_form_link(token) or ""
return substitutions

View File

@ -14,7 +14,7 @@ from core.workflow.nodes.human_input.entities import (
HumanInputSubmissionValidationError,
validate_human_input_submission,
)
from core.workflow.nodes.human_input.enums import HumanInputFormStatus
from core.workflow.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus
from libs.datetime_utils import naive_utc_now
from libs.exception import BaseHTTPException
from models.human_input import RecipientType
@ -39,7 +39,8 @@ class Form:
return self._record.form_id
@property
def workflow_run_id(self) -> str:
def workflow_run_id(self) -> str | None:
"""Workflow run id for runtime forms; None for delivery tests."""
return self._record.workflow_run_id
@property
@ -62,6 +63,10 @@ class Form:
def status(self) -> HumanInputFormStatus:
return self._record.status
@property
def form_kind(self) -> HumanInputFormKind:
return self._record.form_kind
@property
def expiration_time(self):
return self._record.expiration_time
@ -166,6 +171,10 @@ class HumanInputService:
submission_end_user_id=submission_end_user_id,
)
if result.form_kind != HumanInputFormKind.RUNTIME:
return
if result.workflow_run_id is None:
return
self._enqueue_resume(result.workflow_run_id)
def _ensure_form_active(self, form: Form) -> None:

View File

@ -14,6 +14,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File
from core.repositories import DifyCoreRepositoryFactory
from core.repositories.human_input_reposotiry import HumanInputFormRepositoryImpl
from core.variables import Variable
from core.variables.variables import VariableUnion
from core.workflow.entities import GraphInitParams, WorkflowNodeExecution
@ -30,9 +31,11 @@ from core.workflow.nodes.human_input.entities import (
apply_debug_email_recipient,
validate_human_input_submission,
)
from core.workflow.nodes.human_input.enums import HumanInputFormKind
from core.workflow.nodes.human_input.human_input_node import HumanInputNode
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.start.entities import StartNodeData
from core.workflow.repositories.human_input_form_repository import FormCreateParams
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
from core.workflow.variable_loader import load_into_variable_pool
@ -45,6 +48,7 @@ from factories.file_factory import build_from_mapping, build_from_mappings
from libs.datetime_utils import naive_utc_now
from models import Account
from models.enums import UserFrom
from models.human_input import HumanInputFormRecipient, RecipientType
from models.model import App, AppMode
from models.tools import WorkflowToolProvider
from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeExecutionTriggeredFrom, WorkflowType
@ -58,6 +62,7 @@ from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseEr
from .human_input_delivery_test_service import (
DeliveryTestContext,
DeliveryTestError,
DeliveryTestEmailRecipient,
DeliveryTestUnsupportedError,
HumanInputDeliveryTestService,
)
@ -900,6 +905,7 @@ class WorkflowService:
account: Account,
node_id: str,
delivery_method_id: str,
inputs: Mapping[str, Any] | None = None,
) -> None:
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
@ -925,11 +931,27 @@ class WorkflowService:
user_id=account.id or "",
)
rendered_content = self._render_human_input_content_for_test(
variable_pool = self._build_human_input_variable_pool(
app_model=app_model,
workflow=draft_workflow,
node_config=node_config,
manual_inputs=inputs or {},
)
node = self._build_human_input_node(
workflow=draft_workflow,
account=account,
node_config=node_config,
variable_pool=variable_pool,
)
rendered_content = node._render_form_content_before_submission()
resolved_placeholder_values = node._resolve_inputs()
form_id, recipients = self._create_human_input_delivery_test_form(
app_model=app_model,
node_id=node_id,
node_data=node_data,
delivery_method=delivery_method,
rendered_content=rendered_content,
resolved_placeholder_values=resolved_placeholder_values,
)
test_service = HumanInputDeliveryTestService()
context = DeliveryTestContext(
@ -938,6 +960,8 @@ class WorkflowService:
node_id=node_id,
node_title=node_data.title,
rendered_content=rendered_content,
template_vars={"form_id": form_id},
recipients=recipients,
)
try:
test_service.send_test(context=context, method=delivery_method)
@ -957,27 +981,54 @@ class WorkflowService:
return method
return None
def _render_human_input_content_for_test(
def _create_human_input_delivery_test_form(
self,
*,
app_model: App,
workflow: Workflow,
account: Account,
node_config: Mapping[str, Any],
) -> str:
variable_pool = self._build_human_input_variable_pool(
app_model=app_model,
workflow=workflow,
node_config=node_config,
manual_inputs={},
node_id: str,
node_data: HumanInputNodeData,
delivery_method: DeliveryChannelConfig,
rendered_content: str,
resolved_placeholder_values: Mapping[str, Any],
) -> tuple[str, list[DeliveryTestEmailRecipient]]:
repo = HumanInputFormRepositoryImpl(session_factory=db.engine, tenant_id=app_model.tenant_id)
params = FormCreateParams(
app_id=app_model.id,
workflow_execution_id=None,
node_id=node_id,
form_config=node_data,
rendered_content=rendered_content,
delivery_methods=[delivery_method],
display_in_ui=False,
resolved_placeholder_values=resolved_placeholder_values,
form_kind=HumanInputFormKind.DELIVERY_TEST,
)
node = self._build_human_input_node(
workflow=workflow,
account=account,
node_config=node_config,
variable_pool=variable_pool,
)
return node._render_form_content_before_submission()
form_entity = repo.create_form(params)
return form_entity.id, self._load_email_recipients(form_entity.id)
@staticmethod
def _load_email_recipients(form_id: str) -> list[DeliveryTestEmailRecipient]:
with Session(bind=db.engine) as session:
recipients = session.scalars(
select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id == form_id)
).all()
recipients_data: list[DeliveryTestEmailRecipient] = []
for recipient in recipients:
if recipient.recipient_type not in {RecipientType.EMAIL_MEMBER, RecipientType.EMAIL_EXTERNAL}:
continue
if not recipient.access_token:
continue
try:
payload = json.loads(recipient.recipient_payload)
except Exception:
logger.exception("Failed to parse human input recipient payload for delivery test.")
continue
email = payload.get("email")
if email:
recipients_data.append(
DeliveryTestEmailRecipient(email=email, form_token=recipient.access_token)
)
return recipients_data
def _build_human_input_node(
self,

View File

@ -9,7 +9,7 @@ from configs import dify_config
from core.repositories.human_input_reposotiry import HumanInputFormSubmissionRepository
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.nodes.human_input.entities import FormDefinition
from core.workflow.nodes.human_input.enums import HumanInputFormStatus, TimeoutUnit
from core.workflow.nodes.human_input.enums import HumanInputFormKind, HumanInputFormStatus, TimeoutUnit
from extensions.ext_database import db
from extensions.ext_storage import storage
from libs.datetime_utils import ensure_naive_utc, naive_utc_now
@ -89,6 +89,9 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None:
for form_model in expired_forms:
try:
if form_model.form_kind == HumanInputFormKind.DELIVERY_TEST:
form_repo.mark_timeout(form_id=form_model.id, reason="delivery_test_timeout")
continue
is_global = _is_global_timeout(form_model, global_timeout_seconds)
record = form_repo.mark_timeout(
form_id=form_model.id,

View File

@ -194,6 +194,7 @@ def test_human_input_delivery_test_calls_service(
account=account,
node_id="node-7",
delivery_method_id="delivery-123",
inputs={},
)

View File

@ -15,6 +15,7 @@ from core.workflow.nodes.human_input.entities import (
)
from core.workflow.nodes.human_input.enums import (
FormInputType,
HumanInputFormKind,
HumanInputFormStatus,
TimeoutUnit,
)
@ -42,6 +43,7 @@ def sample_form_record():
node_id="node-id",
tenant_id="tenant-id",
app_id="app-id",
form_kind=HumanInputFormKind.RUNTIME,
definition=FormDefinition(
form_content="hello",
inputs=[],
@ -187,6 +189,28 @@ def test_submit_form_by_token_calls_repository_and_enqueue(sample_form_record, m
enqueue_spy.assert_called_once_with(sample_form_record.workflow_run_id)
def test_submit_form_by_token_skips_enqueue_for_delivery_test(sample_form_record, mock_session_factory, mocker):
session_factory, _ = mock_session_factory
repo = MagicMock(spec=HumanInputFormSubmissionRepository)
test_record = dataclasses.replace(
sample_form_record,
form_kind=HumanInputFormKind.DELIVERY_TEST,
workflow_run_id=None,
)
repo.get_by_token.return_value = test_record
repo.mark_submitted.return_value = test_record
service = HumanInputService(session_factory, form_repository=repo)
enqueue_spy = mocker.patch.object(service, "_enqueue_resume")
service.submit_form_by_token(
recipient_type=RecipientType.STANDALONE_WEB_APP,
form_token="token",
selected_action_id="submit",
form_data={"field": "value"},
)
enqueue_spy.assert_not_called()
def test_submit_form_by_token_passes_submission_user_id(sample_form_record, mock_session_factory, mocker):
session_factory, _ = mock_session_factory
repo = MagicMock(spec=HumanInputFormSubmissionRepository)

View File

@ -92,7 +92,14 @@ def test_human_input_delivery_dispatches_to_test_service(monkeypatch: pytest.Mon
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = node_config
service.get_draft_workflow = MagicMock(return_value=workflow) # type: ignore[method-assign]
service._render_human_input_content_for_test = MagicMock(return_value="rendered") # type: ignore[attr-defined]
service._build_human_input_variable_pool = MagicMock(return_value=MagicMock()) # type: ignore[attr-defined]
node_stub = MagicMock()
node_stub._render_form_content_before_submission.return_value = "rendered"
node_stub._resolve_inputs.return_value = {}
service._build_human_input_node = MagicMock(return_value=node_stub) # type: ignore[attr-defined]
service._create_human_input_delivery_test_form = MagicMock( # type: ignore[attr-defined]
return_value=("form-1", {})
)
test_service_instance = MagicMock()
monkeypatch.setattr(
@ -109,8 +116,11 @@ def test_human_input_delivery_dispatches_to_test_service(monkeypatch: pytest.Mon
account=account,
node_id="node-1",
delivery_method_id=str(delivery_method.id),
inputs={"#node-1.output#": "value"},
)
pool_args = service._build_human_input_variable_pool.call_args.kwargs
assert pool_args["manual_inputs"] == {"#node-1.output#": "value"}
test_service_instance.send_test.assert_called_once()
@ -121,7 +131,14 @@ def test_human_input_delivery_debug_mode_overrides_recipients(monkeypatch: pytes
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = node_config
service.get_draft_workflow = MagicMock(return_value=workflow) # type: ignore[method-assign]
service._render_human_input_content_for_test = MagicMock(return_value="rendered") # type: ignore[attr-defined]
service._build_human_input_variable_pool = MagicMock(return_value=MagicMock()) # type: ignore[attr-defined]
node_stub = MagicMock()
node_stub._render_form_content_before_submission.return_value = "rendered"
node_stub._resolve_inputs.return_value = {}
service._build_human_input_node = MagicMock(return_value=node_stub) # type: ignore[attr-defined]
service._create_human_input_delivery_test_form = MagicMock( # type: ignore[attr-defined]
return_value=("form-1", {})
)
test_service_instance = MagicMock()
monkeypatch.setattr(