WIP: feat(api): implement delivery testing api

This commit is contained in:
QuantumGhost
2026-01-06 08:54:06 +08:00
parent 184f7ab144
commit fb01b91b06
7 changed files with 676 additions and 6 deletions

View File

@ -14,6 +14,7 @@ from controllers.console import console_ns
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from controllers.web.error import InvalidArgumentError
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_queue_manager import AppQueueManager
@ -532,6 +533,10 @@ class HumanInputSubmitPayload(BaseModel):
action: str
class HumanInputDeliveryTestPayload(BaseModel):
delivery_method_id: str
@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflows/draft/human-input/nodes/<string:node_id>/form")
class AdvancedChatDraftHumanInputFormApi(Resource):
@console_ns.doc("get_advanced_chat_draft_human_input_form")
@ -662,6 +667,80 @@ class WorkflowDraftHumanInputFormApi(Resource):
return jsonable_encoder(result)
@console_ns.route("/apps/<uuid:app_id>/advanced-chat/workflows/draft/human-input/nodes/<string:node_id>/delivery-test")
class AdvancedChatDraftHumanInputDeliveryTestApi(Resource):
@console_ns.doc("test_advanced_chat_draft_human_input_delivery")
@console_ns.doc(description="Test human input delivery for advanced chat workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(
console_ns.model(
"AdvancedChatHumanInputDeliveryTestRequest",
{
"delivery_method_id": fields.String(required=True, description="Delivery method ID"),
},
)
)
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Test human input delivery
"""
current_user, _ = current_account_with_tenant()
args = HumanInputDeliveryTestPayload.model_validate(console_ns.payload or {})
workflow_service = WorkflowService()
try:
workflow_service.test_human_input_delivery(
app_model=app_model,
account=current_user,
node_id=node_id,
delivery_method_id=args.delivery_method_id,
)
except ValueError as exc:
raise InvalidArgumentError(str(exc))
return jsonable_encoder({})
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/human-input/nodes/<string:node_id>/delivery-test")
class WorkflowDraftHumanInputDeliveryTestApi(Resource):
@console_ns.doc("test_workflow_draft_human_input_delivery")
@console_ns.doc(description="Test human input delivery for workflow")
@console_ns.doc(params={"app_id": "Application ID", "node_id": "Node ID"})
@console_ns.expect(
console_ns.model(
"WorkflowHumanInputDeliveryTestRequest",
{
"delivery_method_id": fields.String(required=True, description="Delivery method ID"),
},
)
)
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
@edit_permission_required
def post(self, app_model: App, node_id: str):
"""
Test human input delivery
"""
current_user, _ = current_account_with_tenant()
workflow_service = WorkflowService()
args = HumanInputDeliveryTestPayload.model_validate(console_ns.payload or {})
try:
workflow_service.test_human_input_delivery(
app_model=app_model,
account=current_user,
node_id=node_id,
delivery_method_id=args.delivery_method_id,
)
except ValueError as exc:
raise InvalidArgumentError(str(exc))
return jsonable_encoder({})
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/run")
class DraftWorkflowRunApi(Resource):
@console_ns.doc("run_draft_workflow")

View File

@ -0,0 +1,218 @@
from __future__ import annotations
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Protocol
from sqlalchemy import Engine, select
from sqlalchemy.orm import sessionmaker
from core.workflow.nodes.human_input.entities import (
DeliveryChannelConfig,
EmailDeliveryConfig,
EmailDeliveryMethod,
ExternalRecipient,
MemberRecipient,
)
from extensions.ext_database import db
from extensions.ext_mail import mail
from libs.email_template_renderer import render_email_template
from models import Account, TenantAccountJoin
class DeliveryTestStatus(StrEnum):
OK = "ok"
FAILED = "failed"
@dataclass(frozen=True)
class DeliveryTestContext:
tenant_id: str
app_id: str
node_id: str
node_title: str | None
rendered_content: str
template_vars: dict[str, str] = field(default_factory=dict)
@dataclass(frozen=True)
class DeliveryTestResult:
status: DeliveryTestStatus
delivered_to: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
class DeliveryTestError(Exception):
pass
class DeliveryTestUnsupportedError(DeliveryTestError):
pass
class DeliveryTestHandler(Protocol):
def supports(self, method: DeliveryChannelConfig) -> bool: ...
def send_test(
self,
*,
context: DeliveryTestContext,
method: DeliveryChannelConfig,
) -> DeliveryTestResult: ...
class DeliveryTestRegistry:
def __init__(self, handlers: list[DeliveryTestHandler] | None = None) -> None:
self._handlers = list(handlers or [])
def register(self, handler: DeliveryTestHandler) -> None:
self._handlers.append(handler)
def dispatch(
self,
*,
context: DeliveryTestContext,
method: DeliveryChannelConfig,
) -> DeliveryTestResult:
for handler in self._handlers:
if handler.supports(method):
return handler.send_test(context=context, method=method)
raise DeliveryTestUnsupportedError("Delivery method does not support test send.")
@classmethod
def default(cls) -> DeliveryTestRegistry:
return cls([EmailDeliveryTestHandler()])
class HumanInputDeliveryTestService:
def __init__(self, registry: DeliveryTestRegistry | None = None) -> None:
self._registry = registry or DeliveryTestRegistry.default()
def send_test(
self,
*,
context: DeliveryTestContext,
method: DeliveryChannelConfig,
) -> DeliveryTestResult:
return self._registry.dispatch(context=context, method=method)
class EmailDeliveryTestHandler:
def __init__(self, session_factory: sessionmaker | Engine | None = None) -> None:
if session_factory is None:
session_factory = sessionmaker(bind=db.engine)
elif isinstance(session_factory, Engine):
session_factory = sessionmaker(bind=session_factory)
self._session_factory = session_factory
def supports(self, method: DeliveryChannelConfig) -> bool:
return isinstance(method, EmailDeliveryMethod)
def send_test(
self,
*,
context: DeliveryTestContext,
method: DeliveryChannelConfig,
) -> DeliveryTestResult:
if not isinstance(method, EmailDeliveryMethod):
raise DeliveryTestUnsupportedError("Delivery method does not support test send.")
if not mail.is_inited():
raise DeliveryTestError("Mail client is not initialized.")
recipients = self._resolve_recipients(
tenant_id=context.tenant_id,
method=method,
)
if not recipients:
raise DeliveryTestError("No recipients configured for delivery method.")
delivered: list[str] = []
for recipient_email in recipients:
substitutions = self._build_substitutions(
context=context,
recipient_email=recipient_email,
)
subject = render_email_template(method.config.subject, substitutions)
templated_body = EmailDeliveryConfig.replace_url_placeholder(
method.config.body,
substitutions.get("form_link"),
)
body = render_email_template(templated_body, substitutions)
mail.send(
to=recipient_email,
subject=subject,
html=body,
)
delivered.append(recipient_email)
return DeliveryTestResult(status=DeliveryTestStatus.OK, delivered_to=delivered)
def _resolve_recipients(self, *, tenant_id: str, method: EmailDeliveryMethod) -> list[str]:
recipients = method.config.recipients
emails: list[str] = []
member_user_ids: list[str] = []
for recipient in recipients.items:
if isinstance(recipient, MemberRecipient):
member_user_ids.append(recipient.user_id)
elif isinstance(recipient, ExternalRecipient):
if recipient.email:
emails.append(recipient.email)
if recipients.whole_workspace:
member_user_ids = []
member_emails = self._query_workspace_member_emails(tenant_id=tenant_id, user_ids=None)
emails.extend(member_emails.values())
elif member_user_ids:
member_emails = self._query_workspace_member_emails(tenant_id=tenant_id, user_ids=member_user_ids)
for user_id in member_user_ids:
email = member_emails.get(user_id)
if email:
emails.append(email)
return list(dict.fromkeys([email for email in emails if email]))
def _query_workspace_member_emails(
self,
*,
tenant_id: str,
user_ids: list[str] | None,
) -> dict[str, str]:
if user_ids is None:
unique_ids = None
else:
unique_ids = {user_id for user_id in user_ids if user_id}
if not unique_ids:
return {}
stmt = (
select(Account.id, Account.email)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(TenantAccountJoin.tenant_id == tenant_id)
)
if unique_ids is not None:
stmt = stmt.where(Account.id.in_(unique_ids))
with self._session_factory() as session:
rows = session.execute(stmt).all()
return dict(rows)
@staticmethod
def _build_substitutions(
*,
context: DeliveryTestContext,
recipient_email: str,
) -> dict[str, str]:
raw_values: dict[str, str | None] = {
"form_id": "",
"node_title": context.node_title,
"workflow_run_id": "",
"form_token": "",
"form_link": "",
"form_content": context.rendered_content,
"recipient_email": recipient_email,
}
substitutions = {key: value or "" for key, value in raw_values.items()}
if context.template_vars:
substitutions.update({key: value for key, value in context.template_vars.items() if value is not None})
return substitutions

View File

@ -23,7 +23,7 @@ from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, N
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes import NodeType
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.human_input.entities import HumanInputNodeData
from core.workflow.nodes.human_input.entities import DeliveryChannelConfig, HumanInputNodeData
from core.workflow.nodes.human_input.human_input_node import HumanInputNode
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.start.entities import StartNodeData
@ -48,6 +48,12 @@ from services.errors.app import IsDraftWorkflowError, TriggerNodeLimitExceededEr
from services.workflow.workflow_converter import WorkflowConverter
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
from .human_input_delivery_test_service import (
DeliveryTestContext,
DeliveryTestError,
DeliveryTestUnsupportedError,
HumanInputDeliveryTestService,
)
from .workflow_draft_variable_service import DraftVariableSaver, DraftVarLoader, WorkflowDraftVariableService
@ -859,6 +865,87 @@ class WorkflowService:
return outputs
def test_human_input_delivery(
self,
*,
app_model: App,
account: Account,
node_id: str,
delivery_method_id: str,
) -> None:
draft_workflow = self.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError("Workflow not initialized")
node_config = draft_workflow.get_node_config_by_id(node_id)
node_type = Workflow.get_node_type_from_node_config(node_config)
if node_type is not NodeType.HUMAN_INPUT:
raise ValueError("Node type must be human-input.")
node_data = HumanInputNodeData.model_validate(node_config.get("data", {}))
delivery_method = self._resolve_human_input_delivery_method(
node_data=node_data,
delivery_method_id=delivery_method_id,
)
if delivery_method is None:
raise ValueError("Delivery method not found.")
if not delivery_method.enabled:
raise ValueError("Delivery method is disabled.")
rendered_content = self._render_human_input_content_for_test(
app_model=app_model,
workflow=draft_workflow,
account=account,
node_config=node_config,
)
test_service = HumanInputDeliveryTestService()
context = DeliveryTestContext(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
node_id=node_id,
node_title=node_data.title,
rendered_content=rendered_content,
)
try:
test_service.send_test(context=context, method=delivery_method)
except DeliveryTestUnsupportedError as exc:
raise ValueError("Delivery method does not support test send.") from exc
except DeliveryTestError as exc:
raise ValueError(str(exc)) from exc
@staticmethod
def _resolve_human_input_delivery_method(
*,
node_data: HumanInputNodeData,
delivery_method_id: str,
) -> DeliveryChannelConfig | None:
for method in node_data.delivery_methods:
if str(method.id) == delivery_method_id:
return method
return None
def _render_human_input_content_for_test(
self,
*,
app_model: App,
workflow: Workflow,
account: Account,
node_config: Mapping[str, Any],
) -> str:
variable_pool = self._build_human_input_variable_pool(
app_model=app_model,
workflow=workflow,
node_config=node_config,
manual_inputs={},
)
node = self._build_human_input_node(
workflow=workflow,
account=account,
node_config=node_config,
variable_pool=variable_pool,
)
return node._render_form_content_before_submission()
def _build_human_input_node(
self,
*,

View File

@ -29,8 +29,8 @@ def _calculate_node_deadline(definition: FormDefinition, created_at, *, start_ti
raise AssertionError("unknown timeout unit.")
def _is_global_timeout(form_model: HumanInputForm, global_timeout_hours: int) -> bool:
if global_timeout_hours <= 0:
def _is_global_timeout(form_model: HumanInputForm, global_timeout_seconds: int) -> bool:
if global_timeout_seconds <= 0:
return False
form_definition = FormDefinition.model_validate_json(form_model.form_definition)
@ -38,7 +38,7 @@ def _is_global_timeout(form_model: HumanInputForm, global_timeout_hours: int) ->
created_at = ensure_naive_utc(form_model.created_at)
expiration_time = ensure_naive_utc(form_model.expiration_time)
node_deadline = _calculate_node_deadline(form_definition, created_at)
global_deadline = created_at + timedelta(hours=global_timeout_hours)
global_deadline = created_at + timedelta(seconds=global_timeout_seconds)
return global_deadline <= node_deadline and expiration_time <= global_deadline
@ -74,7 +74,7 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None:
form_repo = HumanInputFormSubmissionRepository(session_factory)
service = HumanInputService(session_factory, form_repository=form_repo)
now = naive_utc_now()
global_timeout_hours = int(getattr(dify_config, "HITL_GLOBAL_TIMEOUT_HOURS", 0) or 0)
global_timeout_seconds = dify_config.HITL_GLOBAL_TIMEOUT_SECONDS
with session_factory() as session:
stmt = (
@ -89,7 +89,7 @@ def check_and_handle_human_input_timeouts(limit: int = 100) -> None:
for form_model in expired_forms:
try:
is_global = _is_global_timeout(form_model, global_timeout_hours)
is_global = _is_global_timeout(form_model, global_timeout_seconds)
record = form_repo.mark_timeout(
form_id=form_model.id,
reason="global_timeout" if is_global else "node_timeout",

View File

@ -0,0 +1,112 @@
import json
import uuid
from unittest.mock import MagicMock
import pytest
from core.workflow.enums import NodeType
from core.workflow.nodes.human_input.entities import (
EmailDeliveryConfig,
EmailDeliveryMethod,
EmailRecipients,
ExternalRecipient,
HumanInputNodeData,
)
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.model import App, AppMode
from models.workflow import Workflow, WorkflowType
from services.workflow_service import WorkflowService
def _create_app_with_draft_workflow(session, *, delivery_method_id: uuid.UUID) -> tuple[App, Account]:
tenant = Tenant(name="Test Tenant")
account = Account(name="Tester", email="tester@example.com")
session.add_all([tenant, account])
session.flush()
session.add(
TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
current=True,
role=TenantAccountRole.OWNER.value,
)
)
app = App(
tenant_id=tenant.id,
name="Test App",
description="",
mode=AppMode.WORKFLOW.value,
icon_type="emoji",
icon="app",
icon_background="#ffffff",
enable_site=True,
enable_api=True,
created_by=account.id,
updated_by=account.id,
)
session.add(app)
session.flush()
email_method = EmailDeliveryMethod(
id=delivery_method_id,
enabled=True,
config=EmailDeliveryConfig(
recipients=EmailRecipients(
whole_workspace=False,
items=[ExternalRecipient(email="recipient@example.com")],
),
subject="Test {{recipient_email}}",
body="Body {{#url#}} {{form_content}}",
),
)
node_data = HumanInputNodeData(
title="Human Input",
delivery_methods=[email_method],
form_content="Hello Human Input",
inputs=[],
user_actions=[],
).model_dump(mode="json")
node_data["type"] = NodeType.HUMAN_INPUT.value
graph = json.dumps({"nodes": [{"id": "human-node", "data": node_data}], "edges": []})
workflow = Workflow.new(
tenant_id=tenant.id,
app_id=app.id,
type=WorkflowType.WORKFLOW.value,
version=Workflow.VERSION_DRAFT,
graph=graph,
features=json.dumps({}),
created_by=account.id,
environment_variables=[],
conversation_variables=[],
rag_pipeline_variables=[],
)
session.add(workflow)
session.commit()
return app, account
def test_human_input_delivery_test_sends_email(
db_session_with_containers,
monkeypatch: pytest.MonkeyPatch,
) -> None:
delivery_method_id = uuid.uuid4()
app, account = _create_app_with_draft_workflow(db_session_with_containers, delivery_method_id=delivery_method_id)
send_mock = MagicMock()
monkeypatch.setattr("services.human_input_delivery_test_service.mail.is_inited", lambda: True)
monkeypatch.setattr("services.human_input_delivery_test_service.mail.send", send_mock)
service = WorkflowService()
service.test_human_input_delivery(
app_model=app,
account=account,
node_id="human-node",
delivery_method_id=str(delivery_method_id),
)
assert send_mock.call_count == 1
assert send_mock.call_args.kwargs["to"] == "recipient@example.com"

View File

@ -10,6 +10,7 @@ from flask import Flask
from controllers.console import wraps as console_wraps
from controllers.console.app import workflow as workflow_module
from controllers.console.app import wraps as app_wraps
from controllers.web.error import InvalidArgumentError
from libs import login as login_lib
from models.account import Account, AccountStatus, TenantAccountRole
from models.model import AppMode
@ -146,6 +147,72 @@ def test_human_input_submit_forwards_payload(app: Flask, monkeypatch: pytest.Mon
)
@dataclass
class DeliveryTestCase:
resource_cls: type
path: str
mode: AppMode
@pytest.mark.parametrize(
"case",
[
DeliveryTestCase(
resource_cls=workflow_module.AdvancedChatDraftHumanInputDeliveryTestApi,
path="/console/api/apps/app-123/advanced-chat/workflows/draft/human-input/nodes/node-7/delivery-test",
mode=AppMode.ADVANCED_CHAT,
),
DeliveryTestCase(
resource_cls=workflow_module.WorkflowDraftHumanInputDeliveryTestApi,
path="/console/api/apps/app-123/workflows/draft/human-input/nodes/node-7/delivery-test",
mode=AppMode.WORKFLOW,
),
],
)
def test_human_input_delivery_test_calls_service(
app: Flask, monkeypatch: pytest.MonkeyPatch, case: DeliveryTestCase
) -> None:
account = _make_account()
app_model = _make_app(case.mode)
_patch_console_guards(monkeypatch, account, app_model)
service_instance = MagicMock()
monkeypatch.setattr(workflow_module, "WorkflowService", MagicMock(return_value=service_instance))
with app.test_request_context(
case.path,
method="POST",
json={"delivery_method_id": "delivery-123"},
):
response = case.resource_cls().post(app_id=app_model.id, node_id="node-7")
assert response == {}
service_instance.test_human_input_delivery.assert_called_once_with(
app_model=app_model,
account=account,
node_id="node-7",
delivery_method_id="delivery-123",
)
def test_human_input_delivery_test_maps_validation_error(app: Flask, monkeypatch: pytest.MonkeyPatch) -> None:
account = _make_account()
app_model = _make_app(AppMode.ADVANCED_CHAT)
_patch_console_guards(monkeypatch, account, app_model)
service_instance = MagicMock()
service_instance.test_human_input_delivery.side_effect = ValueError("bad delivery method")
monkeypatch.setattr(workflow_module, "WorkflowService", MagicMock(return_value=service_instance))
with app.test_request_context(
"/console/api/apps/app-123/advanced-chat/workflows/draft/human-input/nodes/node-1/delivery-test",
method="POST",
json={"delivery_method_id": "bad"},
):
with pytest.raises(InvalidArgumentError):
workflow_module.AdvancedChatDraftHumanInputDeliveryTestApi().post(app_id=app_model.id, node_id="node-1")
def test_human_input_preview_rejects_non_mapping(app: Flask, monkeypatch: pytest.MonkeyPatch) -> None:
account = _make_account()
app_model = _make_app(AppMode.ADVANCED_CHAT)

View File

@ -0,0 +1,107 @@
import uuid
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from core.workflow.enums import NodeType
from core.workflow.nodes.human_input.entities import (
EmailDeliveryConfig,
EmailDeliveryMethod,
EmailRecipients,
ExternalRecipient,
HumanInputNodeData,
)
from services import workflow_service as workflow_service_module
from services.workflow_service import WorkflowService
def _build_node_config(delivery_methods):
node_data = HumanInputNodeData(
title="Human Input",
delivery_methods=delivery_methods,
form_content="Test content",
inputs=[],
user_actions=[],
).model_dump(mode="json")
node_data["type"] = NodeType.HUMAN_INPUT.value
return {"id": "node-1", "data": node_data}
def _make_email_method(enabled: bool = True) -> EmailDeliveryMethod:
return EmailDeliveryMethod(
id=uuid.uuid4(),
enabled=enabled,
config=EmailDeliveryConfig(
recipients=EmailRecipients(
whole_workspace=False,
items=[ExternalRecipient(email="tester@example.com")],
),
subject="Test subject",
body="Test body",
),
)
def test_human_input_delivery_requires_draft_workflow():
service = WorkflowService()
service.get_draft_workflow = MagicMock(return_value=None) # type: ignore[method-assign]
app_model = SimpleNamespace(tenant_id="tenant-1", id="app-1")
account = SimpleNamespace(id="account-1")
with pytest.raises(ValueError, match="Workflow not initialized"):
service.test_human_input_delivery(
app_model=app_model,
account=account,
node_id="node-1",
delivery_method_id="delivery-1",
)
def test_human_input_delivery_rejects_disabled_method():
service = WorkflowService()
delivery_method = _make_email_method(enabled=False)
node_config = _build_node_config([delivery_method])
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = node_config
service.get_draft_workflow = MagicMock(return_value=workflow) # type: ignore[method-assign]
app_model = SimpleNamespace(tenant_id="tenant-1", id="app-1")
account = SimpleNamespace(id="account-1")
with pytest.raises(ValueError, match="Delivery method is disabled"):
service.test_human_input_delivery(
app_model=app_model,
account=account,
node_id="node-1",
delivery_method_id=str(delivery_method.id),
)
def test_human_input_delivery_dispatches_to_test_service(monkeypatch: pytest.MonkeyPatch):
service = WorkflowService()
delivery_method = _make_email_method(enabled=True)
node_config = _build_node_config([delivery_method])
workflow = MagicMock()
workflow.get_node_config_by_id.return_value = node_config
service.get_draft_workflow = MagicMock(return_value=workflow) # type: ignore[method-assign]
service._render_human_input_content_for_test = MagicMock(return_value="rendered") # type: ignore[attr-defined]
test_service_instance = MagicMock()
monkeypatch.setattr(
workflow_service_module,
"HumanInputDeliveryTestService",
MagicMock(return_value=test_service_instance),
)
app_model = SimpleNamespace(tenant_id="tenant-1", id="app-1")
account = SimpleNamespace(id="account-1")
service.test_human_input_delivery(
app_model=app_model,
account=account,
node_id="node-1",
delivery_method_id=str(delivery_method.id),
)
test_service_instance.send_test.assert_called_once()