From 2a7c7e75036ebc30c4396d96affbf5e9c824e06f Mon Sep 17 00:00:00 2001 From: Yansong Zhang <916125788@qq.com> Date: Mon, 18 May 2026 11:50:26 +0800 Subject: [PATCH] chore: align agent backend protocol adapter --- api/clients/agent_backend/__init__.py | 172 ++++------- api/clients/agent_backend/client.py | 127 +++++++- api/clients/agent_backend/dto.py | 290 ------------------ api/clients/agent_backend/errors.py | 66 ++-- api/clients/agent_backend/event_adapter.py | 166 ++++++++++ api/clients/agent_backend/event_parser.py | 64 ---- api/clients/agent_backend/events.py | 158 ---------- api/clients/agent_backend/factory.py | 26 +- api/clients/agent_backend/fake_client.py | 117 +++++++ api/clients/agent_backend/lifecycle.py | 53 ---- api/clients/agent_backend/mock_client.py | 146 --------- api/clients/agent_backend/request_builder.py | 192 ++++++++++++ api/pyproject.toml | 2 +- .../clients/agent_backend/test_client.py | 126 ++++++++ .../clients/agent_backend/test_dto.py | 135 -------- .../agent_backend/test_event_adapter.py | 132 ++++++++ .../agent_backend/test_event_parser.py | 115 ------- .../clients/agent_backend/test_fake_client.py | 66 ++++ .../clients/agent_backend/test_lifecycle.py | 80 ----- .../clients/agent_backend/test_mock_client.py | 75 ----- .../agent_backend/test_request_builder.py | 132 ++++++++ api/uv.lock | 4 +- dify-agent/src/dify_agent/client/_client.py | 42 ++- .../src/dify_agent/protocol/__init__.py | 18 ++ dify-agent/src/dify_agent/protocol/schemas.py | 105 ++++++- .../src/dify_agent/server/routes/runs.py | 21 +- .../local/dify_agent/client/test_client.py | 35 +++ .../protocol/test_protocol_schemas.py | 53 ++++ .../dify_agent/server/test_runs_routes.py | 15 + 29 files changed, 1469 insertions(+), 1264 deletions(-) delete mode 100644 api/clients/agent_backend/dto.py create mode 100644 api/clients/agent_backend/event_adapter.py delete mode 100644 api/clients/agent_backend/event_parser.py delete mode 100644 api/clients/agent_backend/events.py create mode 100644 api/clients/agent_backend/fake_client.py delete mode 100644 api/clients/agent_backend/lifecycle.py delete mode 100644 api/clients/agent_backend/mock_client.py create mode 100644 api/clients/agent_backend/request_builder.py create mode 100644 api/tests/unit_tests/clients/agent_backend/test_client.py delete mode 100644 api/tests/unit_tests/clients/agent_backend/test_dto.py create mode 100644 api/tests/unit_tests/clients/agent_backend/test_event_adapter.py delete mode 100644 api/tests/unit_tests/clients/agent_backend/test_event_parser.py create mode 100644 api/tests/unit_tests/clients/agent_backend/test_fake_client.py delete mode 100644 api/tests/unit_tests/clients/agent_backend/test_lifecycle.py delete mode 100644 api/tests/unit_tests/clients/agent_backend/test_mock_client.py create mode 100644 api/tests/unit_tests/clients/agent_backend/test_request_builder.py diff --git a/api/clients/agent_backend/__init__.py b/api/clients/agent_backend/__init__.py index 729f20a40b..2e3777f61b 100644 --- a/api/clients/agent_backend/__init__.py +++ b/api/clients/agent_backend/__init__.py @@ -1,110 +1,74 @@ -from clients.agent_backend.client import AgentBackendClient -from clients.agent_backend.dto import ( - CONTRACT_VERSION, - AgentExecutionContext, - AgentIdentityKind, - AgentIdentityRef, - AgentInvokeFrom, - AgentLayerConfig, - AgentLayerType, - AgentRuntimeOptions, - CliToolsLayerConfig, - CompositorConfig, - DifyPluginToolsLayerConfig, - FilesystemLayerConfig, - HumanContactsLayerConfig, - MemoryLayerConfig, - OutputSchemaLayerConfig, - PromptLayerConfig, - PromptOrigin, - PromptRole, - ReferenceType, - ResourceRef, - SandboxLayerConfig, - SecretBinding, - SecretsLayerConfig, - SkillsLayerConfig, - WorkflowContextLayerConfig, +"""API-side integration boundary for the Dify Agent backend. + +Public wire DTOs come from ``dify_agent.protocol``. This package only contains +API adapters: request building from Dify product concepts, a thin client wrapper, +event adaptation for future workflow integration, and deterministic fakes. +""" + +from clients.agent_backend.client import AgentBackendRunClient, DifyAgentBackendRunClient +from clients.agent_backend.errors import ( + AgentBackendError, + AgentBackendHTTPError, + AgentBackendRequestBuildError, + AgentBackendRunFailedError, + AgentBackendStreamError, + AgentBackendTransportError, + AgentBackendValidationError, ) -from clients.agent_backend.event_parser import AgentBackendEventParser -from clients.agent_backend.events import ( - AgentBackendEvent, - AgentBackendEventEnvelope, - AgentBackendEventType, - AgentErrorEvent, - AgentFileCreatedEvent, - AgentLifecycleAckEvent, - AgentOutputCreatedEvent, - AgentOutputDeltaEvent, - AgentOutputValidationFailedEvent, - AgentPauseRequestedEvent, - AgentTextCompletedEvent, - AgentTextDeltaEvent, - AgentToolCallDeltaEvent, - AgentToolCallFailedEvent, - AgentToolCallStartedEvent, - AgentToolCallSucceededEvent, +from clients.agent_backend.event_adapter import ( + AgentBackendInternalEvent, + AgentBackendInternalEventType, + AgentBackendRunCancelledInternalEvent, + AgentBackendRunEventAdapter, + AgentBackendRunFailedInternalEvent, + AgentBackendRunPausedInternalEvent, + AgentBackendRunStartedInternalEvent, + AgentBackendRunSucceededInternalEvent, + AgentBackendStreamInternalEvent, ) -from clients.agent_backend.lifecycle import ( - AgentBackendInvokeRequest, - AgentBackendLifecycleAck, - AgentBackendLifecycleRequest, - AgentBackendLifecycleSignal, - AgentLifecycleEvent, - AgentLifecycleReason, +from clients.agent_backend.factory import create_agent_backend_run_client +from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAgentBackendScenario +from clients.agent_backend.request_builder import ( + AGENT_SOUL_PROMPT_LAYER_ID, + DIFY_PLUGIN_CONTEXT_LAYER_ID, + WORKFLOW_NODE_JOB_PROMPT_LAYER_ID, + WORKFLOW_USER_PROMPT_LAYER_ID, + AgentBackendModelConfig, + AgentBackendOutputConfig, + AgentBackendRunRequestBuilder, + AgentBackendWorkflowNodeRunInput, + redact_for_agent_backend_log, ) -from clients.agent_backend.mock_client import MockAgentBackendClient, MockAgentBackendScenario __all__ = [ - "CONTRACT_VERSION", - "AgentBackendClient", - "AgentBackendEvent", - "AgentBackendEventEnvelope", - "AgentBackendEventParser", - "AgentBackendEventType", - "AgentBackendInvokeRequest", - "AgentBackendLifecycleAck", - "AgentBackendLifecycleRequest", - "AgentBackendLifecycleSignal", - "AgentErrorEvent", - "AgentExecutionContext", - "AgentFileCreatedEvent", - "AgentIdentityKind", - "AgentIdentityRef", - "AgentInvokeFrom", - "AgentLayerConfig", - "AgentLayerType", - "AgentLifecycleAckEvent", - "AgentLifecycleEvent", - "AgentLifecycleReason", - "AgentOutputCreatedEvent", - "AgentOutputDeltaEvent", - "AgentOutputValidationFailedEvent", - "AgentPauseRequestedEvent", - "AgentRuntimeOptions", - "AgentTextCompletedEvent", - "AgentTextDeltaEvent", - "AgentToolCallDeltaEvent", - "AgentToolCallFailedEvent", - "AgentToolCallStartedEvent", - "AgentToolCallSucceededEvent", - "CliToolsLayerConfig", - "CompositorConfig", - "DifyPluginToolsLayerConfig", - "FilesystemLayerConfig", - "HumanContactsLayerConfig", - "MemoryLayerConfig", - "MockAgentBackendClient", - "MockAgentBackendScenario", - "OutputSchemaLayerConfig", - "PromptLayerConfig", - "PromptOrigin", - "PromptRole", - "ReferenceType", - "ResourceRef", - "SandboxLayerConfig", - "SecretBinding", - "SecretsLayerConfig", - "SkillsLayerConfig", - "WorkflowContextLayerConfig", + "AGENT_SOUL_PROMPT_LAYER_ID", + "DIFY_PLUGIN_CONTEXT_LAYER_ID", + "WORKFLOW_NODE_JOB_PROMPT_LAYER_ID", + "WORKFLOW_USER_PROMPT_LAYER_ID", + "AgentBackendError", + "AgentBackendHTTPError", + "AgentBackendInternalEvent", + "AgentBackendInternalEventType", + "AgentBackendModelConfig", + "AgentBackendOutputConfig", + "AgentBackendRequestBuildError", + "AgentBackendRunCancelledInternalEvent", + "AgentBackendRunClient", + "AgentBackendRunEventAdapter", + "AgentBackendRunFailedError", + "AgentBackendRunFailedInternalEvent", + "AgentBackendRunPausedInternalEvent", + "AgentBackendRunRequestBuilder", + "AgentBackendRunStartedInternalEvent", + "AgentBackendRunSucceededInternalEvent", + "AgentBackendStreamError", + "AgentBackendStreamInternalEvent", + "AgentBackendTransportError", + "AgentBackendValidationError", + "AgentBackendWorkflowNodeRunInput", + "DifyAgentBackendRunClient", + "FakeAgentBackendRunClient", + "FakeAgentBackendScenario", + "create_agent_backend_run_client", + "redact_for_agent_backend_log", ] diff --git a/api/clients/agent_backend/client.py b/api/clients/agent_backend/client.py index a2fed81abc..1043d87335 100644 --- a/api/clients/agent_backend/client.py +++ b/api/clients/agent_backend/client.py @@ -1,19 +1,126 @@ +"""Synchronous API-side wrapper around the public ``dify-agent`` client. + +``dify-agent`` owns the cross-service DTOs and HTTP/SSE implementation. The API +backend keeps this thin wrapper so workflow code depends on a local protocol, +gets API-native errors, and can use a deterministic fake in tests without +creating another wire contract. +""" + from __future__ import annotations from collections.abc import Iterator from typing import Protocol -from clients.agent_backend.events import AgentBackendEvent -from clients.agent_backend.lifecycle import ( - AgentBackendInvokeRequest, - AgentBackendLifecycleAck, - AgentBackendLifecycleRequest, +from dify_agent.client import ( + DifyAgentClientError, + DifyAgentHTTPError, + DifyAgentStreamError, + DifyAgentTimeoutError, + DifyAgentValidationError, +) +from dify_agent.protocol import ( + CancelRunRequest, + CancelRunResponse, + CreateRunRequest, + CreateRunResponse, + RunEvent, + RunStatusResponse, +) + +from clients.agent_backend.errors import ( + AgentBackendError, + AgentBackendHTTPError, + AgentBackendStreamError, + AgentBackendTransportError, + AgentBackendValidationError, ) -class AgentBackendClient(Protocol): - def invoke(self, request: AgentBackendInvokeRequest) -> Iterator[AgentBackendEvent]: - """Invoke agent backend and stream typed events.""" +class AgentBackendRunClient(Protocol): + """Local boundary used by API workflow integrations to run Agent backend jobs.""" - def send_lifecycle(self, request: AgentBackendLifecycleRequest) -> AgentBackendLifecycleAck: - """Send an out-of-band lifecycle signal.""" + def create_run(self, request: CreateRunRequest) -> CreateRunResponse: + """Create one Agent backend run and return its accepted status.""" + + def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + """Request explicit cancellation for one Agent backend run.""" + + def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]: + """Yield public ``dify-agent`` run events in stream order.""" + + def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + """Wait for a run to reach a terminal status and return that status.""" + + +class _DifyAgentSyncClient(Protocol): + """Subset of ``dify_agent.client.Client`` used by the API wrapper.""" + + def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse: + """Create one run synchronously.""" + + def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + """Cancel one run synchronously.""" + + def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]: + """Stream run events synchronously.""" + + def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + """Wait for terminal run status synchronously.""" + + +class DifyAgentBackendRunClient: + """Adapter from API sync call sites to ``dify_agent.client.Client`` sync methods.""" + + client: _DifyAgentSyncClient + + def __init__(self, client: _DifyAgentSyncClient) -> None: + self.client = client + + def create_run(self, request: CreateRunRequest) -> CreateRunResponse: + """Create one run through ``POST /runs`` and normalize client exceptions.""" + try: + return self.client.create_run_sync(request) + except Exception as exc: + raise _normalize_dify_agent_error(exc) from exc + + def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + """Cancel one run through ``POST /runs/{run_id}/cancel`` and normalize exceptions.""" + try: + return self.client.cancel_run_sync(run_id, request=request) + except Exception as exc: + raise _normalize_dify_agent_error(exc) from exc + + def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]: + """Stream run events from ``/events/sse`` with the wrapped client's reconnect policy.""" + try: + yield from self.client.stream_events_sync(run_id, after=after) + except Exception as exc: + raise _normalize_dify_agent_error(exc) from exc + + def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + """Poll run status until terminal state and normalize client exceptions.""" + try: + return self.client.wait_run_sync(run_id, timeout_seconds=timeout_seconds) + except Exception as exc: + raise _normalize_dify_agent_error(exc) from exc + + +def _normalize_dify_agent_error(exc: Exception) -> AgentBackendError: + """Map public ``dify-agent`` client errors to API-side integration errors.""" + if isinstance(exc, DifyAgentValidationError): + return AgentBackendValidationError("Agent backend request or response validation failed", detail=exc.detail) + if isinstance(exc, DifyAgentHTTPError): + return AgentBackendHTTPError( + f"Agent backend HTTP {exc.status_code}", + status_code=exc.status_code, + detail=exc.detail, + ) + if isinstance(exc, DifyAgentTimeoutError): + return AgentBackendTransportError(str(exc)) + if isinstance(exc, DifyAgentStreamError): + return AgentBackendStreamError(str(exc)) + if isinstance(exc, DifyAgentClientError): + return AgentBackendTransportError(str(exc)) + if isinstance(exc, AgentBackendError): + return exc + return AgentBackendTransportError(str(exc) or type(exc).__name__) diff --git a/api/clients/agent_backend/dto.py b/api/clients/agent_backend/dto.py deleted file mode 100644 index c456911bf4..0000000000 --- a/api/clients/agent_backend/dto.py +++ /dev/null @@ -1,290 +0,0 @@ -from __future__ import annotations - -from collections.abc import Mapping -from enum import StrEnum -from typing import Annotated, Any, Literal - -from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator, model_validator - -CONTRACT_VERSION = "agent-backend.v1" - - -class AgentInvokeFrom(StrEnum): - WORKFLOW_RUN = "workflow_run" - SINGLE_STEP = "single_step" - AGENT_APP = "agent_app" - BABYSIT = "babysit" - FASTEN = "fasten" - - -class AgentLayerType(StrEnum): - WORKFLOW_CONTEXT = "workflow_context" - PROMPT = "prompt" - FILESYSTEM = "filesystem" - SANDBOX = "sandbox" - DIFY_PLUGIN_TOOLS = "dify_plugin_tools" - CLI_TOOLS = "cli_tools" - SKILLS = "skills" - HUMAN_CONTACTS = "human_contacts" - OUTPUT_SCHEMA = "output_schema" - MEMORY = "memory" - SECRETS = "secrets" - - -class LayerLifecycleScope(StrEnum): - INVOCATION = "invocation" - WORKFLOW_RUN = "workflow_run" - AGENT_SESSION = "agent_session" - PERSISTENT_REF = "persistent_ref" - - -class PromptOrigin(StrEnum): - AGENT_SOUL = "agent_soul" - WORKFLOW_NODE_JOB = "workflow_node_job" - BABYSIT_TRANSIENT = "babysit_transient" - FASTEN_CANDIDATE = "fasten_candidate" - - -class PromptRole(StrEnum): - SYSTEM = "system" - USER = "user" - DEVELOPER = "developer" - - -class AgentIdentityKind(StrEnum): - ROSTER_AGENT = "roster_agent" - WORKFLOW_INLINE_AGENT = "workflow_inline_agent" - AGENT_APP = "agent_app" - - -class ReferenceType(StrEnum): - AGENT = "agent" - AGENT_CONFIG_VERSION = "agent_config_version" - CREDENTIAL = "credential" - FILE = "file" - HUMAN = "human" - MEMORY = "memory" - SECRET = "secret" - SKILL = "skill" - TOOL = "tool" - WORKFLOW = "workflow" - WORKFLOW_RUN = "workflow_run" - - -class AgentBackendBaseModel(BaseModel): - model_config = ConfigDict(extra="forbid", populate_by_name=True, use_enum_values=True) - - def model_dump(self, *args: Any, **kwargs: Any) -> dict[str, Any]: - kwargs.setdefault("by_alias", True) - return super().model_dump(*args, **kwargs) - - def model_dump_redacted(self, **kwargs: Any) -> dict[str, Any]: - dumped = self.model_dump(mode="json", **kwargs) - return _redact_mapping(dumped) - - -class AgentExecutionContext(AgentBackendBaseModel): - tenant_id: str - app_id: str | None = None - workflow_id: str | None = None - workflow_run_id: str | None = None - node_id: str | None = None - node_execution_id: str | None = None - conversation_id: str | None = None - agent_id: str | None = None - agent_config_version_id: str | None = None - invoke_from: AgentInvokeFrom - trace_id: str | None = None - - -class ResourceRef(AgentBackendBaseModel): - type: ReferenceType - id: str - name: str | None = None - metadata: Mapping[str, JsonValue] = Field(default_factory=dict) - - -class AgentIdentityRef(AgentBackendBaseModel): - kind: AgentIdentityKind - agent_ref: ResourceRef | None = None - config_version_ref: ResourceRef | None = None - - -class AgentRuntimeOptions(AgentBackendBaseModel): - stream: bool = True - timeout_seconds: float | None = None - debug: bool = False - mock_scenario: str | None = None - - -class BaseAgentLayerConfig(AgentBackendBaseModel): - id: str - lifecycle_scope: LayerLifecycleScope = LayerLifecycleScope.INVOCATION - depends_on: list[str] = Field(default_factory=list) - - -class WorkflowContextLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.WORKFLOW_CONTEXT] = AgentLayerType.WORKFLOW_CONTEXT - workflow_ref: ResourceRef | None = None - workflow_run_ref: ResourceRef | None = None - node_id: str | None = None - node_execution_id: str | None = None - variables: Mapping[str, JsonValue] = Field(default_factory=dict) - previous_node_outputs: Mapping[str, JsonValue] = Field(default_factory=dict) - - -class PromptLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.PROMPT] = AgentLayerType.PROMPT - origin: PromptOrigin - role: PromptRole - content: str - - -class FilesystemMountConfig(AgentBackendBaseModel): - ref: ResourceRef - mount_point: str - read_only: bool = False - - -class FilesystemLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.FILESYSTEM] = AgentLayerType.FILESYSTEM - mounts: list[FilesystemMountConfig] = Field(default_factory=list) - - -class SandboxLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.SANDBOX] = AgentLayerType.SANDBOX - provider: str - image: str | None = None - options: Mapping[str, JsonValue] = Field(default_factory=dict) - - -class DifyPluginToolRef(AgentBackendBaseModel): - tool_ref: ResourceRef - credential_ref: ResourceRef | None = None - runtime_parameters: Mapping[str, JsonValue] = Field(default_factory=dict) - - -class DifyPluginToolsLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.DIFY_PLUGIN_TOOLS] = AgentLayerType.DIFY_PLUGIN_TOOLS - tools: list[DifyPluginToolRef] = Field(default_factory=list) - - -class CliToolRef(AgentBackendBaseModel): - name: str - command: str - install_command: str | None = None - credential_ref: ResourceRef | None = None - metadata: Mapping[str, JsonValue] = Field(default_factory=dict) - - -class CliToolsLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.CLI_TOOLS] = AgentLayerType.CLI_TOOLS - tools: list[CliToolRef] = Field(default_factory=list) - - -class SkillsLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.SKILLS] = AgentLayerType.SKILLS - skill_refs: list[ResourceRef] = Field(default_factory=list) - - -class HumanContactRef(AgentBackendBaseModel): - human_ref: ResourceRef - allowed_delivery_methods: list[str] = Field(default_factory=list) - - -class HumanContactsLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.HUMAN_CONTACTS] = AgentLayerType.HUMAN_CONTACTS - contacts: list[HumanContactRef] = Field(default_factory=list) - - -class OutputDeclaration(AgentBackendBaseModel): - name: str - type: str - json_schema: Mapping[str, JsonValue] | None = Field(default=None, alias="schema") - required: bool = True - validation_prompt: str | None = None - benchmark_file_ref: ResourceRef | None = None - - -class OutputSchemaLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.OUTPUT_SCHEMA] = AgentLayerType.OUTPUT_SCHEMA - outputs: list[OutputDeclaration] = Field(default_factory=list) - - -class MemoryLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.MEMORY] = AgentLayerType.MEMORY - strategy_ref: ResourceRef | None = None - scope: str | None = None - options: Mapping[str, JsonValue] = Field(default_factory=dict) - - -class SecretBinding(AgentBackendBaseModel): - secret_ref: ResourceRef - env_name: str - required: bool = True - - @field_validator("secret_ref") - @classmethod - def validate_secret_ref(cls, value: ResourceRef) -> ResourceRef: - if value.type != ReferenceType.SECRET: - raise ValueError("secret_ref must reference a secret") - return value - - -class SecretsLayerConfig(BaseAgentLayerConfig): - type: Literal[AgentLayerType.SECRETS] = AgentLayerType.SECRETS - bindings: list[SecretBinding] = Field(default_factory=list) - - -type AgentLayerConfig = Annotated[ - WorkflowContextLayerConfig - | PromptLayerConfig - | FilesystemLayerConfig - | SandboxLayerConfig - | DifyPluginToolsLayerConfig - | CliToolsLayerConfig - | SkillsLayerConfig - | HumanContactsLayerConfig - | OutputSchemaLayerConfig - | MemoryLayerConfig - | SecretsLayerConfig, - Field(discriminator="type"), -] - - -class CompositorConfig(AgentBackendBaseModel): - contract_version: Literal["agent-backend.v1"] = CONTRACT_VERSION - execution_context: AgentExecutionContext - agent_identity: AgentIdentityRef | None = None - layers: list[AgentLayerConfig] - runtime_options: AgentRuntimeOptions = Field(default_factory=AgentRuntimeOptions) - - @model_validator(mode="after") - def validate_contract(self) -> CompositorConfig: - layer_ids = [layer.id for layer in self.layers] - if len(layer_ids) != len(set(layer_ids)): - raise ValueError("layer ids must be unique") - known_ids = set(layer_ids) - for layer in self.layers: - unknown_deps = set(layer.depends_on) - known_ids - if unknown_deps: - raise ValueError(f"layer '{layer.id}' depends on unknown layer ids: {sorted(unknown_deps)}") - return self - - -_SENSITIVE_KEY_PARTS = ("secret", "credential", "token", "password", "api_key") - - -def _redact_mapping(value: Any) -> Any: - if isinstance(value, Mapping): - redacted: dict[str, Any] = {} - for key, item in value.items(): - key_text = str(key).lower() - if any(part in key_text for part in _SENSITIVE_KEY_PARTS): - redacted[key] = "[REDACTED]" - else: - redacted[key] = _redact_mapping(item) - return redacted - if isinstance(value, list): - return [_redact_mapping(item) for item in value] - return value diff --git a/api/clients/agent_backend/errors.py b/api/clients/agent_backend/errors.py index d828a3869d..ee88c65fa8 100644 --- a/api/clients/agent_backend/errors.py +++ b/api/clients/agent_backend/errors.py @@ -1,31 +1,61 @@ +"""API-side errors for the Dify Agent backend integration. + +The wire protocol and low-level HTTP behaviour are owned by ``dify-agent``. +This module only normalizes those client errors into the API backend's boundary +so workflow/node code does not depend directly on transport-specific exception +classes. +""" + from __future__ import annotations from typing import Any class AgentBackendError(Exception): - """Base error for the agent backend client contract.""" + """Base error for API-side Agent backend integration failures.""" -class AgentBackendContractVersionError(AgentBackendError): - """Raised when an event or DTO uses an unsupported contract version.""" - - -class AgentBackendEventParseError(AgentBackendError): - """Raised when an agent backend event cannot be parsed into a typed event.""" - - def __init__(self, message: str, *, raw_event: Any | None = None): - super().__init__(message) - self.raw_event = raw_event - - -class AgentBackendUnknownEventError(AgentBackendEventParseError): - """Raised when the agent backend emits an event type unknown to this API version.""" +class AgentBackendRequestBuildError(AgentBackendError): + """Raised when Dify product/workflow state cannot be mapped to a run request.""" class AgentBackendTransportError(AgentBackendError): - """Raised for transport-level failures talking to the agent backend.""" + """Raised for timeout or request-level failures talking to Agent backend.""" -class AgentBackendInvocationError(AgentBackendError): - """Raised for invocation-level failures before a typed error event can be emitted.""" +class AgentBackendHTTPError(AgentBackendTransportError): + """Raised for Agent backend HTTP errors after status/detail normalization.""" + + status_code: int + detail: object + + def __init__(self, message: str, *, status_code: int, detail: object) -> None: + self.status_code = status_code + self.detail = detail + super().__init__(message) + + +class AgentBackendValidationError(AgentBackendError): + """Raised for local request validation or Agent backend 422 responses.""" + + detail: object + + def __init__(self, message: str, *, detail: object) -> None: + self.detail = detail + super().__init__(message) + + +class AgentBackendStreamError(AgentBackendError): + """Raised when an Agent backend event stream is malformed or exhausted.""" + + +class AgentBackendRunFailedError(AgentBackendError): + """Raised by callers that choose to translate a terminal failed run into an exception.""" + + run_id: str + detail: Any + + def __init__(self, run_id: str, detail: Any) -> None: + self.run_id = run_id + self.detail = detail + super().__init__(f"Agent backend run failed: {run_id}") diff --git a/api/clients/agent_backend/event_adapter.py b/api/clients/agent_backend/event_adapter.py new file mode 100644 index 0000000000..8150d87fe3 --- /dev/null +++ b/api/clients/agent_backend/event_adapter.py @@ -0,0 +1,166 @@ +"""Adapt public ``dify-agent`` run events into API-internal event semantics. + +The adapter does not define a new cross-service event contract. It consumes +``dify_agent.protocol.RunEvent`` and produces small API-internal models that the +future workflow Agent Node can map to Graphon/AppQueue events in phase 3. +""" + +from __future__ import annotations + +from enum import StrEnum +from typing import Annotated, Literal, cast + +from agenton.compositor import CompositorSessionSnapshot +from dify_agent.protocol import ( + PydanticAIStreamRunEvent, + RunCancelledEvent, + RunEvent, + RunFailedEvent, + RunPausedEvent, + RunStartedEvent, + RunSucceededEvent, +) +from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter + +_EVENT_DATA_ADAPTER = TypeAdapter(object) + + +class AgentBackendInternalEventType(StrEnum): + """API-only event labels used before Graphon/AppQueue integration.""" + + RUN_STARTED = "run_started" + STREAM_EVENT = "stream_event" + RUN_PAUSED = "run_paused" + RUN_SUCCEEDED = "run_succeeded" + RUN_FAILED = "run_failed" + RUN_CANCELLED = "run_cancelled" + + +class AgentBackendInternalEventBase(BaseModel): + """Common fields preserved from public Dify Agent run events.""" + + run_id: str + source_event_id: str | None = None + + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + +class AgentBackendRunStartedInternalEvent(AgentBackendInternalEventBase): + """API-internal marker for a started Agent backend run.""" + + type: Literal[AgentBackendInternalEventType.RUN_STARTED] = AgentBackendInternalEventType.RUN_STARTED + + +class AgentBackendStreamInternalEvent(AgentBackendInternalEventBase): + """API-internal wrapper for one pydantic-ai stream event payload.""" + + type: Literal[AgentBackendInternalEventType.STREAM_EVENT] = AgentBackendInternalEventType.STREAM_EVENT + event_kind: str | None = None + data: JsonValue + + +class AgentBackendRunSucceededInternalEvent(AgentBackendInternalEventBase): + """API-internal terminal success event carrying final output and session state.""" + + type: Literal[AgentBackendInternalEventType.RUN_SUCCEEDED] = AgentBackendInternalEventType.RUN_SUCCEEDED + output: JsonValue + session_snapshot: CompositorSessionSnapshot + + +class AgentBackendRunPausedInternalEvent(AgentBackendInternalEventBase): + """API-internal resumable pause event for human handoff and Babysit flows.""" + + type: Literal[AgentBackendInternalEventType.RUN_PAUSED] = AgentBackendInternalEventType.RUN_PAUSED + reason: str + message: str | None = None + session_snapshot: CompositorSessionSnapshot | None = None + + +class AgentBackendRunFailedInternalEvent(AgentBackendInternalEventBase): + """API-internal terminal failure event carrying the backend-safe error text.""" + + type: Literal[AgentBackendInternalEventType.RUN_FAILED] = AgentBackendInternalEventType.RUN_FAILED + error: str + reason: str | None = None + + +class AgentBackendRunCancelledInternalEvent(AgentBackendInternalEventBase): + """API-internal terminal cancellation event.""" + + type: Literal[AgentBackendInternalEventType.RUN_CANCELLED] = AgentBackendInternalEventType.RUN_CANCELLED + reason: str | None = None + message: str | None = None + + +type AgentBackendInternalEvent = Annotated[ + AgentBackendRunStartedInternalEvent + | AgentBackendStreamInternalEvent + | AgentBackendRunPausedInternalEvent + | AgentBackendRunSucceededInternalEvent + | AgentBackendRunFailedInternalEvent + | AgentBackendRunCancelledInternalEvent, + Field(discriminator="type"), +] + + +class AgentBackendRunEventAdapter: + """Maps public ``dify-agent`` event variants to API-internal event variants.""" + + def adapt(self, event: RunEvent) -> list[AgentBackendInternalEvent]: + """Return zero or more API-internal events derived from one public run event.""" + match event: + case RunStartedEvent(): + return [ + AgentBackendRunStartedInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + ) + ] + case PydanticAIStreamRunEvent(): + data = cast(JsonValue, _EVENT_DATA_ADAPTER.dump_python(event.data, mode="json")) + event_kind = data.get("event_kind") if isinstance(data, dict) else None + return [ + AgentBackendStreamInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + event_kind=event_kind if isinstance(event_kind, str) else None, + data=data, + ) + ] + case RunSucceededEvent(): + return [ + AgentBackendRunSucceededInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + output=event.data.output, + session_snapshot=event.data.session_snapshot, + ) + ] + case RunPausedEvent(): + return [ + AgentBackendRunPausedInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + reason=event.data.reason, + message=event.data.message, + session_snapshot=event.data.session_snapshot, + ) + ] + case RunFailedEvent(): + return [ + AgentBackendRunFailedInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + error=event.data.error, + reason=event.data.reason, + ) + ] + case RunCancelledEvent(): + return [ + AgentBackendRunCancelledInternalEvent( + run_id=event.run_id, + source_event_id=event.id, + reason=event.data.reason, + message=event.data.message, + ) + ] diff --git a/api/clients/agent_backend/event_parser.py b/api/clients/agent_backend/event_parser.py deleted file mode 100644 index 7f4d2b37fc..0000000000 --- a/api/clients/agent_backend/event_parser.py +++ /dev/null @@ -1,64 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterable, Iterator, Mapping -from typing import Any - -from pydantic import TypeAdapter, ValidationError - -from clients.agent_backend.dto import CONTRACT_VERSION -from clients.agent_backend.errors import ( - AgentBackendContractVersionError, - AgentBackendEventParseError, - AgentBackendUnknownEventError, -) -from clients.agent_backend.events import ( - AgentBackendEvent, - AgentBackendEventEnvelope, - AgentBackendEventType, -) - -_event_adapter = TypeAdapter(AgentBackendEvent) - - -class AgentBackendEventParser: - def parse(self, raw_event: Mapping[str, Any]) -> AgentBackendEvent: - raw_contract_version = raw_event.get("contract_version") - if raw_contract_version != CONTRACT_VERSION: - raise AgentBackendContractVersionError( - f"Unsupported agent backend contract version: {raw_contract_version}" - ) - - raw_event_type = raw_event.get("type") - try: - event_type = AgentBackendEventType(raw_event_type) - except ValueError as exc: - raise AgentBackendUnknownEventError( - f"Unknown agent backend event type: {raw_event_type}", - raw_event=raw_event, - ) from exc - - try: - envelope = AgentBackendEventEnvelope.model_validate({**raw_event, "type": event_type.value}) - except ValidationError as exc: - raise AgentBackendEventParseError("Invalid agent backend event envelope", raw_event=raw_event) from exc - - event_payload = { - "event_id": envelope.event_id, - "sequence": envelope.sequence, - "type": event_type.value, - "created_at": envelope.created_at, - "execution_context": envelope.execution_context, - **envelope.payload, - } - - try: - return _event_adapter.validate_python(event_payload) - except ValidationError as exc: - raise AgentBackendEventParseError( - f"Invalid payload for agent backend event type: {event_type}", - raw_event=raw_event, - ) from exc - - def parse_many(self, raw_events: Iterable[Mapping[str, Any]]) -> Iterator[AgentBackendEvent]: - for raw_event in raw_events: - yield self.parse(raw_event) diff --git a/api/clients/agent_backend/events.py b/api/clients/agent_backend/events.py deleted file mode 100644 index 17b73e1985..0000000000 --- a/api/clients/agent_backend/events.py +++ /dev/null @@ -1,158 +0,0 @@ -from __future__ import annotations - -from enum import StrEnum -from typing import Annotated, Any, Literal - -from pydantic import Field, JsonValue - -from clients.agent_backend.dto import ( - CONTRACT_VERSION, - AgentBackendBaseModel, - AgentExecutionContext, - ResourceRef, -) -from clients.agent_backend.lifecycle import AgentLifecycleEvent, AgentLifecycleReason - - -class AgentBackendEventType(StrEnum): - LIFECYCLE = "lifecycle" - TEXT_DELTA = "text.delta" - TEXT_COMPLETED = "text.completed" - TOOL_CALL_STARTED = "tool_call.started" - TOOL_CALL_DELTA = "tool_call.delta" - TOOL_CALL_SUCCEEDED = "tool_call.succeeded" - TOOL_CALL_FAILED = "tool_call.failed" - FILE_CREATED = "file.created" - OUTPUT_DELTA = "output.delta" - OUTPUT_CREATED = "output.created" - OUTPUT_VALIDATION_FAILED = "output.validation_failed" - ERROR = "error" - PAUSE_REQUESTED = "pause.requested" - - -class AgentBackendEventBase(AgentBackendBaseModel): - event_id: str - sequence: int - created_at: int - execution_context: AgentExecutionContext - - -class AgentLifecycleAckEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.LIFECYCLE] = AgentBackendEventType.LIFECYCLE - lifecycle_event: AgentLifecycleEvent - lifecycle_reason: AgentLifecycleReason - message: str | None = None - - -class AgentTextDeltaEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.TEXT_DELTA] = AgentBackendEventType.TEXT_DELTA - delta: str - output_name: str = "text" - - -class AgentTextCompletedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.TEXT_COMPLETED] = AgentBackendEventType.TEXT_COMPLETED - text: str - output_name: str = "text" - - -class AgentToolCallStartedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.TOOL_CALL_STARTED] = AgentBackendEventType.TOOL_CALL_STARTED - tool_call_id: str - tool_name: str - parent_id: str | None = None - input: JsonValue | None = None - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentToolCallDeltaEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.TOOL_CALL_DELTA] = AgentBackendEventType.TOOL_CALL_DELTA - tool_call_id: str - delta: str - - -class AgentToolCallSucceededEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.TOOL_CALL_SUCCEEDED] = AgentBackendEventType.TOOL_CALL_SUCCEEDED - tool_call_id: str - output: JsonValue | None = None - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentToolCallFailedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.TOOL_CALL_FAILED] = AgentBackendEventType.TOOL_CALL_FAILED - tool_call_id: str - error: str - retryable: bool = False - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentFileCreatedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.FILE_CREATED] = AgentBackendEventType.FILE_CREATED - file_ref: ResourceRef - output_name: str | None = None - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentOutputDeltaEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.OUTPUT_DELTA] = AgentBackendEventType.OUTPUT_DELTA - output_name: str - delta: JsonValue - - -class AgentOutputCreatedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.OUTPUT_CREATED] = AgentBackendEventType.OUTPUT_CREATED - output_name: str - value: JsonValue - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentOutputValidationFailedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.OUTPUT_VALIDATION_FAILED] = AgentBackendEventType.OUTPUT_VALIDATION_FAILED - output_name: str - error: str - retryable: bool = True - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentErrorEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.ERROR] = AgentBackendEventType.ERROR - category: str - code: str - message: str - retryable: bool = False - safe_details: dict[str, JsonValue] = Field(default_factory=dict) - - -class AgentPauseRequestedEvent(AgentBackendEventBase): - type: Literal[AgentBackendEventType.PAUSE_REQUESTED] = AgentBackendEventType.PAUSE_REQUESTED - reason: str - message: str | None = None - metadata: dict[str, JsonValue] = Field(default_factory=dict) - - -type AgentBackendEvent = Annotated[ - AgentLifecycleAckEvent - | AgentTextDeltaEvent - | AgentTextCompletedEvent - | AgentToolCallStartedEvent - | AgentToolCallDeltaEvent - | AgentToolCallSucceededEvent - | AgentToolCallFailedEvent - | AgentFileCreatedEvent - | AgentOutputDeltaEvent - | AgentOutputCreatedEvent - | AgentOutputValidationFailedEvent - | AgentErrorEvent - | AgentPauseRequestedEvent, - Field(discriminator="type"), -] - - -class AgentBackendEventEnvelope(AgentBackendBaseModel): - contract_version: Literal["agent-backend.v1"] = CONTRACT_VERSION - event_id: str - sequence: int - type: AgentBackendEventType - created_at: int - execution_context: AgentExecutionContext - payload: dict[str, Any] diff --git a/api/clients/agent_backend/factory.py b/api/clients/agent_backend/factory.py index ff8943afd9..133eb42b28 100644 --- a/api/clients/agent_backend/factory.py +++ b/api/clients/agent_backend/factory.py @@ -1,12 +1,22 @@ +"""Factories for API-side Agent backend clients.""" + from __future__ import annotations -from clients.agent_backend.client import AgentBackendClient -from clients.agent_backend.mock_client import MockAgentBackendClient, MockAgentBackendScenario +from dify_agent.client import Client + +from clients.agent_backend.client import AgentBackendRunClient, DifyAgentBackendRunClient +from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAgentBackendScenario -def create_agent_backend_client(*, use_mock: bool = True, mock_scenario: str | None = None) -> AgentBackendClient: - if use_mock: - scenario = MockAgentBackendScenario(mock_scenario) if mock_scenario else MockAgentBackendScenario.SUCCESS_TEXT - return MockAgentBackendClient(scenario=scenario) - - raise NotImplementedError("Real agent backend client is not implemented in phase 0") +def create_agent_backend_run_client( + *, + base_url: str | None = None, + use_fake: bool = False, + fake_scenario: str | FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS, +) -> AgentBackendRunClient: + """Create the API-side run client without hiding the ``dify-agent`` protocol.""" + if use_fake: + return FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario(fake_scenario)) + if base_url is None: + raise ValueError("base_url is required when creating a real Agent backend client") + return DifyAgentBackendRunClient(Client(base_url=base_url)) diff --git a/api/clients/agent_backend/fake_client.py b/api/clients/agent_backend/fake_client.py new file mode 100644 index 0000000000..6414ddc7b5 --- /dev/null +++ b/api/clients/agent_backend/fake_client.py @@ -0,0 +1,117 @@ +"""Deterministic fake Agent backend client using public ``dify-agent`` events. + +Tests should exercise the same ``RunEvent`` DTOs as the real HTTP client. This +fake therefore replaces the previous custom mock protocol instead of emulating a +separate ``agent-backend.v1`` event stream. +""" + +from __future__ import annotations + +from collections.abc import Iterator +from datetime import UTC, datetime +from enum import StrEnum + +from agenton.compositor import CompositorSessionSnapshot +from dify_agent.protocol import ( + CancelRunRequest, + CancelRunResponse, + CreateRunRequest, + CreateRunResponse, + RunEvent, + RunFailedEvent, + RunFailedEventData, + RunStartedEvent, + RunStatusResponse, + RunSucceededEvent, + RunSucceededEventData, +) + +_FIXED_TIME = datetime(2026, 1, 1, tzinfo=UTC) + + +class FakeAgentBackendScenario(StrEnum): + """Deterministic fake scenarios for API-side integration tests.""" + + SUCCESS = "success" + FAILED = "failed" + + +class FakeAgentBackendRunClient: + """In-memory implementation of ``AgentBackendRunClient`` for unit tests.""" + + scenario: FakeAgentBackendScenario + run_id: str + request: CreateRunRequest | None + + def __init__( + self, + *, + scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS, + run_id: str = "fake-run-1", + ) -> None: + self.scenario = scenario + self.run_id = run_id + self.request = None + + def create_run(self, request: CreateRunRequest) -> CreateRunResponse: + """Record the request and return a deterministic accepted response.""" + self.request = request + return CreateRunResponse(run_id=self.run_id, status="running") + + def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + """Return a deterministic cancellation response.""" + del request + return CancelRunResponse(run_id=run_id, status="cancelled") + + def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]: + """Yield the deterministic public ``RunEvent`` sequence for ``run_id``.""" + for event in self._events(run_id): + if after is not None and event.id is not None and event.id <= after: + continue + yield event + + def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + """Return a deterministic terminal status; timeout is accepted for protocol parity.""" + del timeout_seconds + match self.scenario: + case FakeAgentBackendScenario.SUCCESS: + return RunStatusResponse( + run_id=run_id, + status="succeeded", + created_at=_FIXED_TIME, + updated_at=_FIXED_TIME, + ) + case FakeAgentBackendScenario.FAILED: + return RunStatusResponse( + run_id=run_id, + status="failed", + created_at=_FIXED_TIME, + updated_at=_FIXED_TIME, + error="fake failure", + ) + + def _events(self, run_id: str) -> tuple[RunEvent, ...]: + match self.scenario: + case FakeAgentBackendScenario.SUCCESS: + return ( + RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME), + RunSucceededEvent( + id="2-0", + run_id=run_id, + created_at=_FIXED_TIME, + data=RunSucceededEventData( + output={"text": "hello agent"}, + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + ), + ) + case FakeAgentBackendScenario.FAILED: + return ( + RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME), + RunFailedEvent( + id="2-0", + run_id=run_id, + created_at=_FIXED_TIME, + data=RunFailedEventData(error="fake failure", reason="unit_test"), + ), + ) diff --git a/api/clients/agent_backend/lifecycle.py b/api/clients/agent_backend/lifecycle.py deleted file mode 100644 index a660e5af0b..0000000000 --- a/api/clients/agent_backend/lifecycle.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import annotations - -from enum import StrEnum - -from pydantic import Field - -from clients.agent_backend.dto import AgentBackendBaseModel, AgentExecutionContext, CompositorConfig - - -class AgentLifecycleEvent(StrEnum): - CREATE = "create" - TMP_LEAVE = "tmp_leave" - REENTER = "reenter" - DELETE = "delete" - - -class AgentLifecycleReason(StrEnum): - WORKFLOW_RUN_START = "workflow_run_start" - WORKFLOW_RUN_FINISH = "workflow_run_finish" - SINGLE_STEP_START = "single_step_start" - BABYSIT_START = "babysit_start" - HUMAN_HANDOFF = "human_handoff" - WORKFLOW_HANDOFF = "workflow_handoff" - RESUME = "resume" - FASTEN_PREVIEW = "fasten_preview" - CANCEL = "cancel" - - -class AgentBackendLifecycleSignal(AgentBackendBaseModel): - event: AgentLifecycleEvent - reason: AgentLifecycleReason - execution_context: AgentExecutionContext - target_layer_ids: list[str] | None = None - idempotency_key: str - - -class AgentBackendInvokeRequest(AgentBackendBaseModel): - compositor_config: CompositorConfig - lifecycle_signals: list[AgentBackendLifecycleSignal] = Field(default_factory=list) - idempotency_key: str - stream: bool = True - - -class AgentBackendLifecycleRequest(AgentBackendBaseModel): - signal: AgentBackendLifecycleSignal - - -class AgentBackendLifecycleAck(AgentBackendBaseModel): - accepted: bool - event: AgentLifecycleEvent - reason: AgentLifecycleReason - idempotency_key: str - message: str | None = None diff --git a/api/clients/agent_backend/mock_client.py b/api/clients/agent_backend/mock_client.py deleted file mode 100644 index 6a3390a30c..0000000000 --- a/api/clients/agent_backend/mock_client.py +++ /dev/null @@ -1,146 +0,0 @@ -from __future__ import annotations - -from collections.abc import Iterator, Mapping -from enum import StrEnum -from typing import Any - -from clients.agent_backend.dto import CONTRACT_VERSION, AgentBackendBaseModel, ReferenceType, ResourceRef -from clients.agent_backend.event_parser import AgentBackendEventParser -from clients.agent_backend.events import AgentBackendEvent, AgentBackendEventType -from clients.agent_backend.lifecycle import ( - AgentBackendInvokeRequest, - AgentBackendLifecycleAck, - AgentBackendLifecycleRequest, - AgentLifecycleEvent, - AgentLifecycleReason, -) - - -class MockAgentBackendScenario(StrEnum): - SUCCESS_TEXT = "success_text" - SUCCESS_FILE = "success_file" - TOOL_CALL = "tool_call" - PAUSE = "pause" - ERROR = "error" - UNKNOWN_EVENT = "unknown_event" - - -class MockAgentBackendClient(AgentBackendBaseModel): - scenario: MockAgentBackendScenario = MockAgentBackendScenario.SUCCESS_TEXT - - def invoke(self, request: AgentBackendInvokeRequest) -> Iterator[AgentBackendEvent]: - parser = AgentBackendEventParser() - scenario = request.compositor_config.runtime_options.mock_scenario or self.scenario - for raw_event in self._raw_events(request, MockAgentBackendScenario(scenario)): - yield parser.parse(raw_event) - - def send_lifecycle(self, request: AgentBackendLifecycleRequest) -> AgentBackendLifecycleAck: - return AgentBackendLifecycleAck( - accepted=True, - event=request.signal.event, - reason=request.signal.reason, - idempotency_key=request.signal.idempotency_key, - ) - - def _raw_events( - self, request: AgentBackendInvokeRequest, scenario: MockAgentBackendScenario - ) -> Iterator[Mapping[str, Any]]: - match scenario: - case MockAgentBackendScenario.SUCCESS_TEXT: - yield self._envelope(request, 1, AgentBackendEventType.LIFECYCLE, self._lifecycle_payload()) - yield self._envelope(request, 2, AgentBackendEventType.TEXT_DELTA, {"delta": "hello "}) - yield self._envelope(request, 3, AgentBackendEventType.TEXT_DELTA, {"delta": "agent"}) - yield self._envelope(request, 4, AgentBackendEventType.TEXT_COMPLETED, {"text": "hello agent"}) - yield self._envelope( - request, - 5, - AgentBackendEventType.OUTPUT_CREATED, - {"output_name": "text", "value": "hello agent"}, - ) - case MockAgentBackendScenario.SUCCESS_FILE: - yield self._envelope(request, 1, AgentBackendEventType.LIFECYCLE, self._lifecycle_payload()) - yield self._envelope( - request, - 2, - AgentBackendEventType.FILE_CREATED, - { - "file_ref": ResourceRef( - type=ReferenceType.FILE, - id="mock-file-1", - name="result.txt", - ).model_dump(mode="json"), - "output_name": "result_file", - }, - ) - yield self._envelope( - request, - 3, - AgentBackendEventType.OUTPUT_CREATED, - {"output_name": "result_file", "value": {"file_id": "mock-file-1"}}, - ) - case MockAgentBackendScenario.TOOL_CALL: - yield self._envelope( - request, - 1, - AgentBackendEventType.TOOL_CALL_STARTED, - {"tool_call_id": "tool-call-1", "tool_name": "web_search", "input": {"query": "Dify"}}, - ) - yield self._envelope( - request, - 2, - AgentBackendEventType.TOOL_CALL_SUCCEEDED, - {"tool_call_id": "tool-call-1", "output": {"result": "ok"}}, - ) - yield self._envelope(request, 3, AgentBackendEventType.TEXT_DELTA, {"delta": "done"}) - yield self._envelope( - request, - 4, - AgentBackendEventType.OUTPUT_CREATED, - {"output_name": "text", "value": "done"}, - ) - case MockAgentBackendScenario.PAUSE: - yield self._envelope(request, 1, AgentBackendEventType.TEXT_DELTA, {"delta": "waiting"}) - yield self._envelope( - request, - 2, - AgentBackendEventType.PAUSE_REQUESTED, - {"reason": "human_handoff", "message": "Need human input"}, - ) - case MockAgentBackendScenario.ERROR: - yield self._envelope( - request, - 1, - AgentBackendEventType.ERROR, - { - "category": "mock", - "code": "mock_error", - "message": "Mock agent backend error", - "retryable": False, - }, - ) - case MockAgentBackendScenario.UNKNOWN_EVENT: - yield self._envelope(request, 1, "unknown.event", {"value": "boom"}) - - def _envelope( - self, - request: AgentBackendInvokeRequest, - sequence: int, - event_type: AgentBackendEventType | str, - payload: Mapping[str, Any], - ) -> dict[str, Any]: - return { - "contract_version": CONTRACT_VERSION, - "event_id": f"{request.idempotency_key}:{sequence}", - "sequence": sequence, - "type": str(event_type), - "created_at": 1_800_000_000_000 + sequence, - "execution_context": request.compositor_config.execution_context.model_dump(mode="json"), - "payload": dict(payload), - } - - @staticmethod - def _lifecycle_payload() -> dict[str, str]: - return { - "lifecycle_event": AgentLifecycleEvent.CREATE.value, - "lifecycle_reason": AgentLifecycleReason.WORKFLOW_RUN_START.value, - } diff --git a/api/clients/agent_backend/request_builder.py b/api/clients/agent_backend/request_builder.py new file mode 100644 index 0000000000..41b9ce059d --- /dev/null +++ b/api/clients/agent_backend/request_builder.py @@ -0,0 +1,192 @@ +"""Build ``dify-agent`` run requests from API-side product concepts. + +This module is intentionally an adapter, not a wire DTO package. The emitted +object is always ``dify_agent.protocol.CreateRunRequest`` so the Agent backend +protocol has a single owner. API-only context such as Agent Soul vs workflow job +prompt is preserved in layer names and metadata until the dedicated product +schemas land in later phases. +""" + +from __future__ import annotations + +from typing import ClassVar + +from agenton.compositor import CompositorSessionSnapshot +from agenton.layers import ExitIntent +from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig +from dify_agent.layers.dify_plugin import ( + DIFY_PLUGIN_LAYER_TYPE_ID, + DIFY_PLUGIN_LLM_LAYER_TYPE_ID, + DifyPluginCredentialValue, + DifyPluginLayerConfig, + DifyPluginLLMLayerConfig, +) +from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig +from dify_agent.protocol import ( + DIFY_AGENT_MODEL_LAYER_ID, + DIFY_AGENT_OUTPUT_LAYER_ID, + CreateRunRequest, + ExecutionContext, + LayerExitSignals, + RunComposition, + RunLayerSpec, + RunPurpose, +) +from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator + +AGENT_SOUL_PROMPT_LAYER_ID = "agent_soul_prompt" +WORKFLOW_NODE_JOB_PROMPT_LAYER_ID = "workflow_node_job_prompt" +WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt" +DIFY_PLUGIN_CONTEXT_LAYER_ID = "plugin" + + +class AgentBackendModelConfig(BaseModel): + """API-side model/plugin selection before it is converted to Dify Agent layers.""" + + tenant_id: str + plugin_id: str + model_provider: str + model: str + user_id: str | None = None + credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict) + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + +class AgentBackendOutputConfig(BaseModel): + """API-side structured output declaration for the conventional output layer.""" + + json_schema: dict[str, JsonValue] + name: str = "final_result" + description: str | None = None + strict: bool | None = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + +class AgentBackendWorkflowNodeRunInput(BaseModel): + """Inputs needed to build the first workflow-node-oriented Agent backend run request.""" + + model: AgentBackendModelConfig + execution_context: ExecutionContext + workflow_node_job_prompt: str + user_prompt: str + agent_soul_prompt: str | None = None + purpose: RunPurpose = "workflow_node" + idempotency_key: str | None = None + output: AgentBackendOutputConfig | None = None + session_snapshot: CompositorSessionSnapshot | None = None + suspend_on_exit: bool = False + metadata: dict[str, JsonValue] = Field(default_factory=dict) + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + @field_validator("workflow_node_job_prompt", "user_prompt") + @classmethod + def _reject_blank_prompt(cls, value: str) -> str: + if not value.strip(): + raise ValueError("prompt must not be blank") + return value + + +class AgentBackendRunRequestBuilder: + """Converts API product state into the public ``dify-agent`` run protocol.""" + + def build_for_workflow_node(self, run_input: AgentBackendWorkflowNodeRunInput) -> CreateRunRequest: + """Build a workflow Agent Node run request without defining another wire schema.""" + layers: list[RunLayerSpec] = [] + if run_input.agent_soul_prompt: + layers.append( + RunLayerSpec( + name=AGENT_SOUL_PROMPT_LAYER_ID, + type=PLAIN_PROMPT_LAYER_TYPE_ID, + metadata={**run_input.metadata, "origin": "agent_soul"}, + config=PromptLayerConfig(prefix=run_input.agent_soul_prompt), + ) + ) + + layers.extend( + [ + RunLayerSpec( + name=WORKFLOW_NODE_JOB_PROMPT_LAYER_ID, + type=PLAIN_PROMPT_LAYER_TYPE_ID, + metadata={**run_input.metadata, "origin": "workflow_node_job"}, + config=PromptLayerConfig(prefix=run_input.workflow_node_job_prompt), + ), + RunLayerSpec( + name=WORKFLOW_USER_PROMPT_LAYER_ID, + type=PLAIN_PROMPT_LAYER_TYPE_ID, + metadata={**run_input.metadata, "origin": "workflow_user_prompt"}, + config=PromptLayerConfig(user=run_input.user_prompt), + ), + RunLayerSpec( + name=DIFY_PLUGIN_CONTEXT_LAYER_ID, + type=DIFY_PLUGIN_LAYER_TYPE_ID, + metadata=run_input.metadata, + config=DifyPluginLayerConfig( + tenant_id=run_input.model.tenant_id, + plugin_id=run_input.model.plugin_id, + user_id=run_input.model.user_id, + ), + ), + RunLayerSpec( + name=DIFY_AGENT_MODEL_LAYER_ID, + type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID, + deps={"plugin": DIFY_PLUGIN_CONTEXT_LAYER_ID}, + metadata=run_input.metadata, + config=DifyPluginLLMLayerConfig( + model_provider=run_input.model.model_provider, + model=run_input.model.model, + credentials=run_input.model.credentials, + ), + ), + ] + ) + + if run_input.output is not None: + layers.append( + RunLayerSpec( + name=DIFY_AGENT_OUTPUT_LAYER_ID, + type=DIFY_OUTPUT_LAYER_TYPE_ID, + metadata=run_input.metadata, + config=DifyOutputLayerConfig( + json_schema=run_input.output.json_schema, + name=run_input.output.name, + description=run_input.output.description, + strict=run_input.output.strict, + ), + ) + ) + + return CreateRunRequest( + composition=RunComposition(layers=layers), + execution_context=run_input.execution_context, + purpose=run_input.purpose, + idempotency_key=run_input.idempotency_key, + metadata=run_input.metadata, + session_snapshot=run_input.session_snapshot, + on_exit=LayerExitSignals( + default=ExitIntent.SUSPEND if run_input.suspend_on_exit else ExitIntent.DELETE, + ), + ) + + +_SENSITIVE_KEY_PARTS = ("secret", "credential", "token", "password", "api_key") + + +def redact_for_agent_backend_log(value: object) -> object: + """Return a JSON-like copy with credential-bearing keys redacted for logs/tests.""" + if isinstance(value, BaseModel): + return redact_for_agent_backend_log(value.model_dump(mode="json", warnings=False)) + if isinstance(value, dict): + redacted: dict[object, object] = {} + for key, item in value.items(): + key_text = str(key).lower() + if any(part in key_text for part in _SENSITIVE_KEY_PARTS): + redacted[key] = "[REDACTED]" + else: + redacted[key] = redact_for_agent_backend_log(item) + return redacted + if isinstance(value, list): + return [redact_for_agent_backend_log(item) for item in value] + return value diff --git a/api/pyproject.toml b/api/pyproject.toml index 3ddc1e30a9..e2ebb6c88b 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "boto3>=1.43.6", "celery>=5.6.3", "croniter>=6.2.2", + "dify-agent", "flask>=3.1.3,<4.0.0", "flask-cors>=6.0.2", "gevent>=26.4.0", @@ -114,7 +115,6 @@ override-dependencies = [ ############################################################ dev = [ "coverage>=7.13.4", - "dify-agent", "dotenv-linter>=0.7.0", "faker>=40.15.0", "lxml-stubs>=0.5.1", diff --git a/api/tests/unit_tests/clients/agent_backend/test_client.py b/api/tests/unit_tests/clients/agent_backend/test_client.py new file mode 100644 index 0000000000..7e3be42551 --- /dev/null +++ b/api/tests/unit_tests/clients/agent_backend/test_client.py @@ -0,0 +1,126 @@ +from collections.abc import Iterator + +import pytest +from dify_agent.client import DifyAgentHTTPError, DifyAgentStreamError, DifyAgentTimeoutError, DifyAgentValidationError +from dify_agent.protocol import ( + CancelRunRequest, + CancelRunResponse, + CreateRunRequest, + CreateRunResponse, + ExecutionContext, + RunEvent, + RunStartedEvent, + RunStatusResponse, +) + +from clients.agent_backend import ( + AgentBackendHTTPError, + AgentBackendModelConfig, + AgentBackendRunRequestBuilder, + AgentBackendStreamError, + AgentBackendTransportError, + AgentBackendValidationError, + AgentBackendWorkflowNodeRunInput, + DifyAgentBackendRunClient, +) + + +def _request(): + return AgentBackendRunRequestBuilder().build_for_workflow_node( + AgentBackendWorkflowNodeRunInput( + model=AgentBackendModelConfig( + tenant_id="tenant-1", + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + ), + execution_context=ExecutionContext(tenant_id="tenant-1", invoke_from="workflow_run"), + workflow_node_job_prompt="Do the task.", + user_prompt="hello", + ) + ) + + +class _SuccessfulClient: + def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse: + assert isinstance(request, CreateRunRequest) + return CreateRunResponse(run_id="run-1", status="running") + + def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + del request + return CancelRunResponse(run_id=run_id, status="cancelled") + + def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]: + del after + yield RunStartedEvent(id="1-0", run_id=run_id) + + def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + del timeout_seconds + return RunStatusResponse.model_validate( + { + "run_id": run_id, + "status": "succeeded", + "created_at": "2026-01-01T00:00:00+00:00", + "updated_at": "2026-01-01T00:00:00+00:00", + } + ) + + +def test_dify_agent_backend_run_client_delegates_sync_methods(): + client = DifyAgentBackendRunClient(_SuccessfulClient()) + + created = client.create_run(_request()) + cancelled = client.cancel_run(created.run_id) + events = list(client.stream_events(created.run_id)) + status = client.wait_run(created.run_id) + + assert created.run_id == "run-1" + assert cancelled.status == "cancelled" + assert events[0].type == "run_started" + assert status.status == "succeeded" + + +def test_dify_agent_backend_run_client_maps_validation_error(): + class InvalidClient(_SuccessfulClient): + def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse: + raise DifyAgentValidationError(detail={"field": "bad"}) + + with pytest.raises(AgentBackendValidationError) as exc_info: + DifyAgentBackendRunClient(InvalidClient()).create_run(_request()) + + assert exc_info.value.detail == {"field": "bad"} + + +def test_dify_agent_backend_run_client_maps_http_error(): + class HTTPErrorClient(_SuccessfulClient): + def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse: + raise DifyAgentHTTPError(status_code=503, detail="unavailable") + + with pytest.raises(AgentBackendHTTPError) as exc_info: + DifyAgentBackendRunClient(HTTPErrorClient()).create_run(_request()) + + assert exc_info.value.status_code == 503 + assert exc_info.value.detail == "unavailable" + + +def test_dify_agent_backend_run_client_maps_timeout_error(): + class TimeoutClient(_SuccessfulClient): + def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + raise DifyAgentTimeoutError("timeout") + + with pytest.raises(AgentBackendTransportError) as exc_info: + DifyAgentBackendRunClient(TimeoutClient()).wait_run("run-1") + + assert str(exc_info.value) == "timeout" + + +def test_dify_agent_backend_run_client_maps_stream_error(): + class StreamClient(_SuccessfulClient): + def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]: + raise DifyAgentStreamError("bad stream") + yield + + with pytest.raises(AgentBackendStreamError) as exc_info: + list(DifyAgentBackendRunClient(StreamClient()).stream_events("run-1")) + + assert str(exc_info.value) == "bad stream" diff --git a/api/tests/unit_tests/clients/agent_backend/test_dto.py b/api/tests/unit_tests/clients/agent_backend/test_dto.py deleted file mode 100644 index 77282299cb..0000000000 --- a/api/tests/unit_tests/clients/agent_backend/test_dto.py +++ /dev/null @@ -1,135 +0,0 @@ -import pytest -from pydantic import ValidationError - -from clients.agent_backend import ( - CONTRACT_VERSION, - AgentExecutionContext, - AgentInvokeFrom, - AgentLayerType, - CompositorConfig, - PromptLayerConfig, - PromptOrigin, - PromptRole, - ReferenceType, - ResourceRef, - SecretBinding, - SecretsLayerConfig, -) - - -def _execution_context() -> AgentExecutionContext: - return AgentExecutionContext( - tenant_id="tenant-1", - app_id="app-1", - workflow_id="workflow-1", - workflow_run_id="workflow-run-1", - node_id="node-1", - node_execution_id="node-execution-1", - invoke_from=AgentInvokeFrom.WORKFLOW_RUN, - ) - - -def test_compositor_config_serializes_contract_shape(): - config = CompositorConfig( - execution_context=_execution_context(), - layers=[ - PromptLayerConfig( - id="agent-soul-prompt", - origin=PromptOrigin.AGENT_SOUL, - role=PromptRole.SYSTEM, - content="You are a helpful agent.", - ), - PromptLayerConfig( - id="workflow-job-prompt", - origin=PromptOrigin.WORKFLOW_NODE_JOB, - role=PromptRole.USER, - content="Review the previous node output.", - depends_on=["agent-soul-prompt"], - ), - ], - ) - - dumped = config.model_dump(mode="json") - - assert dumped["contract_version"] == CONTRACT_VERSION - assert dumped["execution_context"]["tenant_id"] == "tenant-1" - assert dumped["layers"][0]["type"] == AgentLayerType.PROMPT - assert dumped["layers"][0]["origin"] == PromptOrigin.AGENT_SOUL - assert dumped["layers"][1]["depends_on"] == ["agent-soul-prompt"] - - -def test_compositor_config_rejects_duplicate_layer_ids(): - with pytest.raises(ValidationError, match="layer ids must be unique"): - CompositorConfig( - execution_context=_execution_context(), - layers=[ - PromptLayerConfig( - id="prompt", - origin=PromptOrigin.AGENT_SOUL, - role=PromptRole.SYSTEM, - content="one", - ), - PromptLayerConfig( - id="prompt", - origin=PromptOrigin.WORKFLOW_NODE_JOB, - role=PromptRole.USER, - content="two", - ), - ], - ) - - -def test_compositor_config_rejects_unknown_layer_dependency(): - with pytest.raises(ValidationError, match="depends on unknown layer ids"): - CompositorConfig( - execution_context=_execution_context(), - layers=[ - PromptLayerConfig( - id="prompt", - origin=PromptOrigin.WORKFLOW_NODE_JOB, - role=PromptRole.USER, - content="two", - depends_on=["missing"], - ), - ], - ) - - -def test_secret_binding_rejects_plaintext_secret_payload(): - with pytest.raises(ValidationError): - SecretBinding.model_validate( - { - "secret_ref": {"type": "secret", "id": "secret-1"}, - "env_name": "GITHUB_TOKEN", - "value": "plaintext-token", - } - ) - - -def test_secret_binding_requires_secret_ref_type(): - with pytest.raises(ValidationError, match="secret_ref must reference a secret"): - SecretBinding( - secret_ref=ResourceRef(type=ReferenceType.CREDENTIAL, id="credential-1"), - env_name="GITHUB_TOKEN", - ) - - -def test_redacted_dump_hides_secret_and_credential_refs(): - config = CompositorConfig( - execution_context=_execution_context(), - layers=[ - SecretsLayerConfig( - id="secrets", - bindings=[ - SecretBinding( - secret_ref=ResourceRef(type=ReferenceType.SECRET, id="secret-1"), - env_name="GITHUB_TOKEN", - ) - ], - ) - ], - ) - - redacted = config.model_dump_redacted() - - assert redacted["layers"][0]["bindings"][0]["secret_ref"] == "[REDACTED]" diff --git a/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py b/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py new file mode 100644 index 0000000000..79f7d14d31 --- /dev/null +++ b/api/tests/unit_tests/clients/agent_backend/test_event_adapter.py @@ -0,0 +1,132 @@ +from agenton.compositor import CompositorSessionSnapshot +from dify_agent.protocol import ( + PydanticAIStreamRunEvent, + RunCancelledEvent, + RunCancelledEventData, + RunFailedEvent, + RunFailedEventData, + RunPausedEvent, + RunPausedEventData, + RunStartedEvent, + RunSucceededEvent, + RunSucceededEventData, +) +from pydantic_ai.messages import FinalResultEvent + +from clients.agent_backend import ( + AgentBackendInternalEventType, + AgentBackendRunCancelledInternalEvent, + AgentBackendRunEventAdapter, + AgentBackendRunFailedInternalEvent, + AgentBackendRunPausedInternalEvent, + AgentBackendRunStartedInternalEvent, + AgentBackendRunSucceededInternalEvent, + AgentBackendStreamInternalEvent, +) + + +def test_event_adapter_maps_run_started(): + adapted = AgentBackendRunEventAdapter().adapt(RunStartedEvent(id="1-0", run_id="run-1")) + + assert adapted == [ + AgentBackendRunStartedInternalEvent( + run_id="run-1", + source_event_id="1-0", + ) + ] + + +def test_event_adapter_maps_pydantic_ai_stream_event(): + adapted = AgentBackendRunEventAdapter().adapt( + PydanticAIStreamRunEvent( + id="2-0", + run_id="run-1", + data=FinalResultEvent(tool_name=None, tool_call_id=None), + ) + ) + + assert len(adapted) == 1 + event = adapted[0] + assert isinstance(event, AgentBackendStreamInternalEvent) + assert event.type == AgentBackendInternalEventType.STREAM_EVENT + assert event.event_kind == "final_result" + assert event.data["event_kind"] == "final_result" + + +def test_event_adapter_maps_run_succeeded_to_final_output(): + snapshot = CompositorSessionSnapshot(layers=[]) + adapted = AgentBackendRunEventAdapter().adapt( + RunSucceededEvent( + id="3-0", + run_id="run-1", + data=RunSucceededEventData(output={"summary": "done"}, session_snapshot=snapshot), + ) + ) + + assert adapted == [ + AgentBackendRunSucceededInternalEvent( + run_id="run-1", + source_event_id="3-0", + output={"summary": "done"}, + session_snapshot=snapshot, + ) + ] + + +def test_event_adapter_maps_run_failed_to_failed_result(): + adapted = AgentBackendRunEventAdapter().adapt( + RunFailedEvent( + id="4-0", + run_id="run-1", + data=RunFailedEventData(error="boom", reason="runtime"), + ) + ) + + assert adapted == [ + AgentBackendRunFailedInternalEvent( + run_id="run-1", + source_event_id="4-0", + error="boom", + reason="runtime", + ) + ] + + +def test_event_adapter_maps_run_paused_to_resumable_pause(): + snapshot = CompositorSessionSnapshot(layers=[]) + adapted = AgentBackendRunEventAdapter().adapt( + RunPausedEvent( + id="5-0", + run_id="run-1", + data=RunPausedEventData(reason="human_handoff", message="Need review", session_snapshot=snapshot), + ) + ) + + assert adapted == [ + AgentBackendRunPausedInternalEvent( + run_id="run-1", + source_event_id="5-0", + reason="human_handoff", + message="Need review", + session_snapshot=snapshot, + ) + ] + + +def test_event_adapter_maps_run_cancelled_to_terminal_cancelled(): + adapted = AgentBackendRunEventAdapter().adapt( + RunCancelledEvent( + id="6-0", + run_id="run-1", + data=RunCancelledEventData(reason="user_cancelled", message="Stopped by user"), + ) + ) + + assert adapted == [ + AgentBackendRunCancelledInternalEvent( + run_id="run-1", + source_event_id="6-0", + reason="user_cancelled", + message="Stopped by user", + ) + ] diff --git a/api/tests/unit_tests/clients/agent_backend/test_event_parser.py b/api/tests/unit_tests/clients/agent_backend/test_event_parser.py deleted file mode 100644 index 4ed9ec27ce..0000000000 --- a/api/tests/unit_tests/clients/agent_backend/test_event_parser.py +++ /dev/null @@ -1,115 +0,0 @@ -import pytest - -from clients.agent_backend import ( - CONTRACT_VERSION, - AgentBackendEventParser, - AgentBackendEventType, - AgentErrorEvent, - AgentExecutionContext, - AgentFileCreatedEvent, - AgentInvokeFrom, - AgentLifecycleAckEvent, - AgentOutputCreatedEvent, - AgentOutputDeltaEvent, - AgentOutputValidationFailedEvent, - AgentPauseRequestedEvent, - AgentTextCompletedEvent, - AgentTextDeltaEvent, - AgentToolCallDeltaEvent, - AgentToolCallFailedEvent, - AgentToolCallStartedEvent, - AgentToolCallSucceededEvent, -) -from clients.agent_backend.errors import ( - AgentBackendContractVersionError, - AgentBackendEventParseError, - AgentBackendUnknownEventError, -) - - -def _context_dict(): - return AgentExecutionContext(tenant_id="tenant-1", invoke_from=AgentInvokeFrom.WORKFLOW_RUN).model_dump(mode="json") - - -def _raw_event(event_type: str, payload: dict, sequence: int = 1) -> dict: - return { - "contract_version": CONTRACT_VERSION, - "event_id": f"event-{sequence}", - "sequence": sequence, - "type": event_type, - "created_at": 1_800_000_000_000 + sequence, - "execution_context": _context_dict(), - "payload": payload, - } - - -@pytest.mark.parametrize( - ("event_type", "payload", "expected_cls"), - [ - ( - AgentBackendEventType.LIFECYCLE, - {"lifecycle_event": "create", "lifecycle_reason": "workflow_run_start"}, - AgentLifecycleAckEvent, - ), - (AgentBackendEventType.TEXT_DELTA, {"delta": "hello"}, AgentTextDeltaEvent), - (AgentBackendEventType.TEXT_COMPLETED, {"text": "hello"}, AgentTextCompletedEvent), - ( - AgentBackendEventType.TOOL_CALL_STARTED, - {"tool_call_id": "tool-1", "tool_name": "web_search"}, - AgentToolCallStartedEvent, - ), - (AgentBackendEventType.TOOL_CALL_DELTA, {"tool_call_id": "tool-1", "delta": "chunk"}, AgentToolCallDeltaEvent), - ( - AgentBackendEventType.TOOL_CALL_SUCCEEDED, - {"tool_call_id": "tool-1", "output": {"ok": True}}, - AgentToolCallSucceededEvent, - ), - ( - AgentBackendEventType.TOOL_CALL_FAILED, - {"tool_call_id": "tool-1", "error": "failed"}, - AgentToolCallFailedEvent, - ), - ( - AgentBackendEventType.FILE_CREATED, - {"file_ref": {"type": "file", "id": "file-1"}}, - AgentFileCreatedEvent, - ), - (AgentBackendEventType.OUTPUT_DELTA, {"output_name": "summary", "delta": "part"}, AgentOutputDeltaEvent), - (AgentBackendEventType.OUTPUT_CREATED, {"output_name": "summary", "value": "done"}, AgentOutputCreatedEvent), - ( - AgentBackendEventType.OUTPUT_VALIDATION_FAILED, - {"output_name": "report", "error": "not a pdf"}, - AgentOutputValidationFailedEvent, - ), - ( - AgentBackendEventType.ERROR, - {"category": "runtime", "code": "boom", "message": "failed"}, - AgentErrorEvent, - ), - (AgentBackendEventType.PAUSE_REQUESTED, {"reason": "human_handoff"}, AgentPauseRequestedEvent), - ], -) -def test_event_parser_parses_all_known_event_types(event_type, payload, expected_cls): - event = AgentBackendEventParser().parse(_raw_event(str(event_type), payload)) - - assert isinstance(event, expected_cls) - assert event.event_id == "event-1" - assert event.execution_context.tenant_id == "tenant-1" - - -def test_event_parser_rejects_unknown_event_type(): - with pytest.raises(AgentBackendUnknownEventError): - AgentBackendEventParser().parse(_raw_event("unknown.event", {"value": "boom"})) - - -def test_event_parser_rejects_unknown_contract_version(): - raw_event = _raw_event("text.delta", {"delta": "hello"}) - raw_event["contract_version"] = "agent-backend.v999" - - with pytest.raises(AgentBackendContractVersionError): - AgentBackendEventParser().parse(raw_event) - - -def test_event_parser_rejects_invalid_payload_shape(): - with pytest.raises(AgentBackendEventParseError): - AgentBackendEventParser().parse(_raw_event("text.delta", {"text": "wrong"})) diff --git a/api/tests/unit_tests/clients/agent_backend/test_fake_client.py b/api/tests/unit_tests/clients/agent_backend/test_fake_client.py new file mode 100644 index 0000000000..80b398988a --- /dev/null +++ b/api/tests/unit_tests/clients/agent_backend/test_fake_client.py @@ -0,0 +1,66 @@ +from dify_agent.protocol import ExecutionContext + +from clients.agent_backend import ( + AgentBackendModelConfig, + AgentBackendRunRequestBuilder, + AgentBackendWorkflowNodeRunInput, + FakeAgentBackendRunClient, + FakeAgentBackendScenario, +) + + +def _request(): + return AgentBackendRunRequestBuilder().build_for_workflow_node( + AgentBackendWorkflowNodeRunInput( + model=AgentBackendModelConfig( + tenant_id="tenant-1", + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + ), + execution_context=ExecutionContext(tenant_id="tenant-1", invoke_from="workflow_run"), + workflow_node_job_prompt="Do the task.", + user_prompt="hello", + ) + ) + + +def test_fake_client_stream_is_deterministic(): + client = FakeAgentBackendRunClient() + request = _request() + + created = client.create_run(request) + first = [event.model_dump(mode="json") for event in client.stream_events(created.run_id)] + second = [event.model_dump(mode="json") for event in client.stream_events(created.run_id)] + + assert created.run_id == "fake-run-1" + assert client.request is request + assert first == second + assert [event["type"] for event in first] == ["run_started", "run_succeeded"] + assert first[-1]["data"]["output"] == {"text": "hello agent"} + + +def test_fake_client_stream_honors_cursor(): + events = list(FakeAgentBackendRunClient().stream_events("fake-run-1", after="1-0")) + + assert len(events) == 1 + assert events[0].type == "run_succeeded" + + +def test_fake_client_failed_scenario_returns_failed_status_and_event(): + client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.FAILED) + + status = client.wait_run("fake-run-1") + events = list(client.stream_events("fake-run-1")) + + assert status.status == "failed" + assert status.error == "fake failure" + assert events[-1].type == "run_failed" + assert events[-1].data.error == "fake failure" + + +def test_fake_client_cancel_run_returns_cancelled_status(): + cancelled = FakeAgentBackendRunClient().cancel_run("fake-run-1") + + assert cancelled.run_id == "fake-run-1" + assert cancelled.status == "cancelled" diff --git a/api/tests/unit_tests/clients/agent_backend/test_lifecycle.py b/api/tests/unit_tests/clients/agent_backend/test_lifecycle.py deleted file mode 100644 index 0cebbdc0ad..0000000000 --- a/api/tests/unit_tests/clients/agent_backend/test_lifecycle.py +++ /dev/null @@ -1,80 +0,0 @@ -from clients.agent_backend import ( - AgentBackendLifecycleAck, - AgentBackendLifecycleRequest, - AgentBackendLifecycleSignal, - AgentExecutionContext, - AgentInvokeFrom, - AgentLifecycleEvent, - AgentLifecycleReason, - MockAgentBackendClient, -) - - -def _execution_context(invoke_from: AgentInvokeFrom = AgentInvokeFrom.WORKFLOW_RUN) -> AgentExecutionContext: - return AgentExecutionContext(tenant_id="tenant-1", invoke_from=invoke_from) - - -def test_lifecycle_signal_is_idempotent_shape(): - signal = AgentBackendLifecycleSignal( - event=AgentLifecycleEvent.CREATE, - reason=AgentLifecycleReason.WORKFLOW_RUN_START, - execution_context=_execution_context(), - idempotency_key="workflow-run-1:create", - ) - - dumped = signal.model_dump(mode="json") - - assert dumped == { - "event": "create", - "reason": "workflow_run_start", - "execution_context": { - "tenant_id": "tenant-1", - "app_id": None, - "workflow_id": None, - "workflow_run_id": None, - "node_id": None, - "node_execution_id": None, - "conversation_id": None, - "agent_id": None, - "agent_config_version_id": None, - "invoke_from": "workflow_run", - "trace_id": None, - }, - "target_layer_ids": None, - "idempotency_key": "workflow-run-1:create", - } - - -def test_lifecycle_reason_mapping_covers_phase0_scenarios(): - expected = { - AgentLifecycleReason.WORKFLOW_RUN_START, - AgentLifecycleReason.WORKFLOW_RUN_FINISH, - AgentLifecycleReason.SINGLE_STEP_START, - AgentLifecycleReason.BABYSIT_START, - AgentLifecycleReason.HUMAN_HANDOFF, - AgentLifecycleReason.WORKFLOW_HANDOFF, - AgentLifecycleReason.RESUME, - AgentLifecycleReason.FASTEN_PREVIEW, - AgentLifecycleReason.CANCEL, - } - - assert set(AgentLifecycleReason) == expected - - -def test_mock_client_accepts_lifecycle_signal(): - client = MockAgentBackendClient() - signal = AgentBackendLifecycleSignal( - event=AgentLifecycleEvent.DELETE, - reason=AgentLifecycleReason.CANCEL, - execution_context=_execution_context(AgentInvokeFrom.BABYSIT), - idempotency_key="babysit-1:cancel", - ) - - ack = client.send_lifecycle(AgentBackendLifecycleRequest(signal=signal)) - - assert ack == AgentBackendLifecycleAck( - accepted=True, - event=AgentLifecycleEvent.DELETE, - reason=AgentLifecycleReason.CANCEL, - idempotency_key="babysit-1:cancel", - ) diff --git a/api/tests/unit_tests/clients/agent_backend/test_mock_client.py b/api/tests/unit_tests/clients/agent_backend/test_mock_client.py deleted file mode 100644 index 2521dcf045..0000000000 --- a/api/tests/unit_tests/clients/agent_backend/test_mock_client.py +++ /dev/null @@ -1,75 +0,0 @@ -import pytest - -from clients.agent_backend import ( - AgentBackendInvokeRequest, - AgentErrorEvent, - AgentExecutionContext, - AgentInvokeFrom, - AgentOutputCreatedEvent, - AgentPauseRequestedEvent, - AgentTextDeltaEvent, - CompositorConfig, - MockAgentBackendClient, - MockAgentBackendScenario, - PromptLayerConfig, - PromptOrigin, - PromptRole, -) -from clients.agent_backend.errors import AgentBackendUnknownEventError - - -def _request(mock_scenario: str | None = None) -> AgentBackendInvokeRequest: - config = CompositorConfig( - execution_context=AgentExecutionContext(tenant_id="tenant-1", invoke_from=AgentInvokeFrom.WORKFLOW_RUN), - layers=[ - PromptLayerConfig( - id="prompt", - origin=PromptOrigin.AGENT_SOUL, - role=PromptRole.SYSTEM, - content="You are a helpful agent.", - ) - ], - ) - if mock_scenario: - config.runtime_options.mock_scenario = mock_scenario - return AgentBackendInvokeRequest(compositor_config=config, idempotency_key="invoke-1") - - -def test_mock_client_success_text_stream_is_deterministic(): - client = MockAgentBackendClient(scenario=MockAgentBackendScenario.SUCCESS_TEXT) - - first = [event.model_dump(mode="json") for event in client.invoke(_request())] - second = [event.model_dump(mode="json") for event in client.invoke(_request())] - - assert first == second - assert [event["sequence"] for event in first] == [1, 2, 3, 4, 5] - assert isinstance(next(client.invoke(_request())), object) - assert first[-1]["type"] == "output.created" - assert first[-1]["value"] == "hello agent" - - -def test_mock_client_can_select_scenario_from_request(): - events = list(MockAgentBackendClient().invoke(_request(MockAgentBackendScenario.PAUSE))) - - assert any(isinstance(event, AgentTextDeltaEvent) for event in events) - assert isinstance(events[-1], AgentPauseRequestedEvent) - - -def test_mock_client_error_event_does_not_raise_transport_error(): - events = list(MockAgentBackendClient(scenario=MockAgentBackendScenario.ERROR).invoke(_request())) - - assert len(events) == 1 - assert isinstance(events[0], AgentErrorEvent) - assert events[0].code == "mock_error" - - -def test_mock_client_success_file_returns_output_event(): - events = list(MockAgentBackendClient(scenario=MockAgentBackendScenario.SUCCESS_FILE).invoke(_request())) - - assert isinstance(events[-1], AgentOutputCreatedEvent) - assert events[-1].output_name == "result_file" - - -def test_mock_client_unknown_event_scenario_exercises_parser_failure(): - with pytest.raises(AgentBackendUnknownEventError): - list(MockAgentBackendClient(scenario=MockAgentBackendScenario.UNKNOWN_EVENT).invoke(_request())) diff --git a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py new file mode 100644 index 0000000000..44c795d70d --- /dev/null +++ b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py @@ -0,0 +1,132 @@ +import pytest +from agenton.layers import ExitIntent +from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID +from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LAYER_TYPE_ID, DIFY_PLUGIN_LLM_LAYER_TYPE_ID +from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID +from dify_agent.protocol import ( + DIFY_AGENT_MODEL_LAYER_ID, + DIFY_AGENT_OUTPUT_LAYER_ID, + CreateRunRequest, + ExecutionContext, +) +from pydantic import ValidationError + +from clients.agent_backend import ( + AGENT_SOUL_PROMPT_LAYER_ID, + WORKFLOW_NODE_JOB_PROMPT_LAYER_ID, + WORKFLOW_USER_PROMPT_LAYER_ID, + AgentBackendModelConfig, + AgentBackendOutputConfig, + AgentBackendRunRequestBuilder, + AgentBackendWorkflowNodeRunInput, + redact_for_agent_backend_log, +) + + +def _run_input() -> AgentBackendWorkflowNodeRunInput: + return AgentBackendWorkflowNodeRunInput( + model=AgentBackendModelConfig( + tenant_id="tenant-1", + plugin_id="langgenius/openai", + user_id="user-1", + model_provider="openai", + model="gpt-test", + credentials={"api_key": "secret-key"}, + ), + execution_context=ExecutionContext( + tenant_id="tenant-1", + workflow_id="workflow-1", + workflow_run_id="workflow-run-1", + node_id="node-1", + node_execution_id="node-execution-1", + invoke_from="workflow_run", + ), + idempotency_key="workflow-run-1:node-execution-1", + agent_soul_prompt="You are a careful reviewer.", + workflow_node_job_prompt="Review the previous node output.", + user_prompt="Summarize the report.", + output=AgentBackendOutputConfig( + json_schema={ + "type": "object", + "properties": {"summary": {"type": "string"}}, + "required": ["summary"], + } + ), + metadata={"workflow_id": "workflow-1", "node_id": "node-1"}, + ) + + +def test_request_builder_outputs_dify_agent_create_run_request(): + request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input()) + + assert isinstance(request, CreateRunRequest) + assert [layer.name for layer in request.composition.layers] == [ + AGENT_SOUL_PROMPT_LAYER_ID, + WORKFLOW_NODE_JOB_PROMPT_LAYER_ID, + WORKFLOW_USER_PROMPT_LAYER_ID, + "plugin", + DIFY_AGENT_MODEL_LAYER_ID, + DIFY_AGENT_OUTPUT_LAYER_ID, + ] + assert request.on_exit.default is ExitIntent.DELETE + assert request.execution_context is not None + assert request.execution_context.node_execution_id == "node-execution-1" + assert request.idempotency_key == "workflow-run-1:node-execution-1" + assert request.metadata == {"workflow_id": "workflow-1", "node_id": "node-1"} + + +def test_request_builder_separates_agent_soul_and_workflow_job_prompt(): + request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input()) + layers = {layer.name: layer for layer in request.composition.layers} + + assert layers[AGENT_SOUL_PROMPT_LAYER_ID].type == PLAIN_PROMPT_LAYER_TYPE_ID + assert layers[AGENT_SOUL_PROMPT_LAYER_ID].metadata["origin"] == "agent_soul" + assert layers[WORKFLOW_NODE_JOB_PROMPT_LAYER_ID].metadata["origin"] == "workflow_node_job" + assert layers[WORKFLOW_USER_PROMPT_LAYER_ID].metadata["origin"] == "workflow_user_prompt" + + dumped = request.model_dump(mode="json") + assert dumped["composition"]["layers"][0]["config"]["prefix"] == "You are a careful reviewer." + assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Review the previous node output." + assert dumped["composition"]["layers"][2]["config"]["user"] == "Summarize the report." + + +def test_request_builder_sets_model_and_output_layer_contract_ids(): + request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input()) + layers = {layer.name: layer for layer in request.composition.layers} + + assert layers["plugin"].type == DIFY_PLUGIN_LAYER_TYPE_ID + assert layers[DIFY_AGENT_MODEL_LAYER_ID].type == DIFY_PLUGIN_LLM_LAYER_TYPE_ID + assert layers[DIFY_AGENT_MODEL_LAYER_ID].deps == {"plugin": "plugin"} + assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID + + +def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths(): + run_input = _run_input() + run_input.suspend_on_exit = True + + request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input) + + assert request.on_exit.default is ExitIntent.SUSPEND + + +def test_request_builder_rejects_blank_prompts(): + with pytest.raises(ValidationError): + AgentBackendWorkflowNodeRunInput( + model=AgentBackendModelConfig( + tenant_id="tenant-1", + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + ), + execution_context=ExecutionContext(tenant_id="tenant-1", invoke_from="workflow_run"), + workflow_node_job_prompt=" ", + user_prompt="hello", + ) + + +def test_redact_for_agent_backend_log_hides_credentials(): + request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input()) + + redacted = redact_for_agent_backend_log(request) + + assert redacted["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]" diff --git a/api/uv.lock b/api/uv.lock index bb150e8cf3..0e4f3170b5 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1332,6 +1332,7 @@ dependencies = [ { name = "boto3" }, { name = "celery" }, { name = "croniter" }, + { name = "dify-agent" }, { name = "fastopenapi", extra = ["flask"] }, { name = "flask" }, { name = "flask-compress" }, @@ -1372,7 +1373,6 @@ dev = [ { name = "boto3-stubs" }, { name = "celery-types" }, { name = "coverage" }, - { name = "dify-agent" }, { name = "dotenv-linter" }, { name = "faker" }, { name = "hypothesis" }, @@ -1615,6 +1615,7 @@ requires-dist = [ { name = "boto3", specifier = ">=1.43.6" }, { name = "celery", specifier = ">=5.6.3" }, { name = "croniter", specifier = ">=6.2.2" }, + { name = "dify-agent", directory = "../dify-agent" }, { name = "fastopenapi", extras = ["flask"], specifier = "~=0.7.0" }, { name = "flask", specifier = ">=3.1.3,<4.0.0" }, { name = "flask-compress", specifier = ">=1.24,<2.0.0" }, @@ -1655,7 +1656,6 @@ dev = [ { name = "boto3-stubs", specifier = ">=1.43.2" }, { name = "celery-types", specifier = ">=0.23.0" }, { name = "coverage", specifier = ">=7.13.4" }, - { name = "dify-agent", directory = "../dify-agent" }, { name = "dotenv-linter", specifier = ">=0.7.0" }, { name = "faker", specifier = ">=40.15.0" }, { name = "hypothesis", specifier = ">=6.152.4" }, diff --git a/dify-agent/src/dify_agent/client/_client.py b/dify-agent/src/dify_agent/client/_client.py index 7760f356aa..1a27381bc0 100644 --- a/dify-agent/src/dify_agent/client/_client.py +++ b/dify-agent/src/dify_agent/client/_client.py @@ -23,6 +23,8 @@ import httpx from pydantic import BaseModel, ValidationError from dify_agent.protocol.schemas import ( + CancelRunRequest, + CancelRunResponse, CreateRunRequest, CreateRunResponse, RUN_EVENT_ADAPTER, @@ -32,8 +34,8 @@ from dify_agent.protocol.schemas import ( ) _ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel) -_TERMINAL_EVENT_TYPES = {"run_succeeded", "run_failed"} -_TERMINAL_RUN_STATUSES = {"succeeded", "failed"} +_TERMINAL_EVENT_TYPES = {"run_succeeded", "run_failed", "run_cancelled"} +_TERMINAL_RUN_STATUSES = {"succeeded", "failed", "cancelled"} class DifyAgentClientError(RuntimeError): @@ -279,6 +281,42 @@ class Client: raise DifyAgentClientError(f"create_run_sync request failed: {exc}") from exc return _parse_model_response(response, CreateRunResponse) + async def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + """Request explicit cancellation for ``run_id``. + + The server may accept cancellation only for active runs; unsupported + deployments return an HTTP error rather than overloading ``run_failed``. + """ + request_model = request or CancelRunRequest() + try: + response = await self._get_async_http_client().post( + self._url(f"/runs/{quote(run_id, safe='')}/cancel"), + content=request_model.model_dump_json(), + headers=self._merged_headers({"Content-Type": "application/json"}), + timeout=self._timeout, + ) + except httpx.TimeoutException as exc: + raise DifyAgentTimeoutError("cancel_run timed out") from exc + except httpx.RequestError as exc: + raise DifyAgentClientError(f"cancel_run request failed: {exc}") from exc + return _parse_model_response(response, CancelRunResponse) + + def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse: + """Synchronous variant of ``cancel_run``.""" + request_model = request or CancelRunRequest() + try: + response = self._get_sync_http_client().post( + self._url(f"/runs/{quote(run_id, safe='')}/cancel"), + content=request_model.model_dump_json(), + headers=self._merged_headers({"Content-Type": "application/json"}), + timeout=self._timeout, + ) + except httpx.TimeoutException as exc: + raise DifyAgentTimeoutError("cancel_run_sync timed out") from exc + except httpx.RequestError as exc: + raise DifyAgentClientError(f"cancel_run_sync request failed: {exc}") from exc + return _parse_model_response(response, CancelRunResponse) + async def get_run(self, run_id: str) -> RunStatusResponse: """Return the current status for ``run_id`` or raise a mapped client error.""" try: diff --git a/dify-agent/src/dify_agent/protocol/__init__.py b/dify-agent/src/dify_agent/protocol/__init__.py index 7ab78a5e4d..3ada378df8 100644 --- a/dify-agent/src/dify_agent/protocol/__init__.py +++ b/dify-agent/src/dify_agent/protocol/__init__.py @@ -5,17 +5,26 @@ from .schemas import ( DIFY_AGENT_OUTPUT_LAYER_ID, RUN_EVENT_ADAPTER, BaseRunEvent, + CancelRunRequest, + CancelRunResponse, CreateRunRequest, CreateRunResponse, EmptyRunEventData, + ExecutionContext, + InvokeFrom, LayerExitSignals, PydanticAIStreamRunEvent, + RunCancelledEvent, + RunCancelledEventData, RunEvent, RunComposition, RunEventType, RunEventsResponse, RunFailedEvent, RunFailedEventData, + RunPausedEvent, + RunPausedEventData, + RunPurpose, RunLayerSpec, RunStartedEvent, RunStatus, @@ -28,20 +37,29 @@ from .schemas import ( __all__ = [ "BaseRunEvent", + "CancelRunRequest", + "CancelRunResponse", "CreateRunRequest", "CreateRunResponse", "DIFY_AGENT_MODEL_LAYER_ID", "DIFY_AGENT_OUTPUT_LAYER_ID", "EmptyRunEventData", + "ExecutionContext", + "InvokeFrom", "LayerExitSignals", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", + "RunCancelledEvent", + "RunCancelledEventData", "RunComposition", "RunEvent", "RunEventType", "RunEventsResponse", "RunFailedEvent", "RunFailedEventData", + "RunPausedEvent", + "RunPausedEventData", + "RunPurpose", "RunLayerSpec", "RunStartedEvent", "RunStatus", diff --git a/dify-agent/src/dify_agent/protocol/schemas.py b/dify-agent/src/dify_agent/protocol/schemas.py index 430c4052a3..76890e3991 100644 --- a/dify-agent/src/dify_agent/protocol/schemas.py +++ b/dify-agent/src/dify_agent/protocol/schemas.py @@ -43,12 +43,16 @@ from agenton.layers import ExitIntent DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm" DIFY_AGENT_OUTPUT_LAYER_ID: Final[str] = "output" -RunStatus = Literal["running", "succeeded", "failed"] +RunStatus = Literal["running", "paused", "succeeded", "failed", "cancelled"] +RunPurpose = Literal["workflow_node", "single_step", "agent_app", "babysit", "fasten_preview"] +InvokeFrom = Literal["workflow_run", "single_step", "agent_app", "babysit", "fasten"] RunEventType = Literal[ "run_started", "pydantic_ai_event", + "run_paused", "run_succeeded", "run_failed", + "run_cancelled", ] @@ -100,6 +104,29 @@ class RunComposition(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") +class ExecutionContext(BaseModel): + """Dify-owned execution identifiers attached to one Agent backend run. + + The Agent backend stores and replays this context for observability and + product correlation only. It must not use these identifiers as authorization + proof; API backend remains responsible for tenant and user access checks. + """ + + tenant_id: str + app_id: str | None = None + workflow_id: str | None = None + workflow_run_id: str | None = None + node_id: str | None = None + node_execution_id: str | None = None + conversation_id: str | None = None + agent_id: str | None = None + agent_config_version_id: str | None = None + invoke_from: InvokeFrom + trace_id: str | None = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + class CreateRunRequest(BaseModel): """Request body for creating one async agent run. @@ -115,12 +142,30 @@ class CreateRunRequest(BaseModel): """ composition: RunComposition + execution_context: ExecutionContext | None = None + purpose: RunPurpose = "workflow_node" + idempotency_key: str | None = None + metadata: dict[str, JsonValue] = Field(default_factory=dict) session_snapshot: CompositorSessionSnapshot | None = None on_exit: LayerExitSignals = Field(default_factory=LayerExitSignals) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") +class CancelRunRequest(BaseModel): + """Request body for cancelling a run. + + Runtime cancellation is intentionally a separate protocol operation from + failed execution so API callers can distinguish user/operator cancellation + from model, tool, or infrastructure failures. + """ + + reason: str | None = None + message: str | None = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + def normalize_composition(composition: RunComposition) -> tuple[CompositorConfig, dict[str, LayerConfigInput]]: """Split public Dify composition into Agenton's graph config and layer configs. @@ -159,6 +204,15 @@ class CreateRunResponse(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") +class CancelRunResponse(BaseModel): + """Response returned after a cancel request is accepted.""" + + run_id: str + status: Literal["cancelled"] + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + class RunStatusResponse(BaseModel): """Current server-side status for one run.""" @@ -195,6 +249,25 @@ class RunFailedEventData(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") +class RunPausedEventData(BaseModel): + """Pause payload used for human handoff or other resumable waits.""" + + reason: str + message: str | None = None + session_snapshot: CompositorSessionSnapshot | None = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + +class RunCancelledEventData(BaseModel): + """Terminal cancellation payload for explicit user/operator cancellation.""" + + reason: str | None = None + message: str | None = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + class BaseRunEvent(BaseModel): """Shared append-only event envelope visible through polling and SSE.""" @@ -233,8 +306,27 @@ class RunFailedEvent(BaseRunEvent): data: RunFailedEventData +class RunPausedEvent(BaseRunEvent): + """Resumable pause event emitted when a run waits for outside input.""" + + type: Literal["run_paused"] = "run_paused" + data: RunPausedEventData + + +class RunCancelledEvent(BaseRunEvent): + """Terminal cancellation event emitted after an explicit cancel request.""" + + type: Literal["run_cancelled"] = "run_cancelled" + data: RunCancelledEventData = Field(default_factory=RunCancelledEventData) + + RunEvent: TypeAlias = Annotated[ - RunStartedEvent | PydanticAIStreamRunEvent | RunSucceededEvent | RunFailedEvent, + RunStartedEvent + | PydanticAIStreamRunEvent + | RunPausedEvent + | RunSucceededEvent + | RunFailedEvent + | RunCancelledEvent, Field(discriminator="type"), ] RUN_EVENT_ADAPTER: TypeAdapter[RunEvent] = TypeAdapter(RunEvent) @@ -252,20 +344,29 @@ class RunEventsResponse(BaseModel): __all__ = [ "BaseRunEvent", + "CancelRunRequest", + "CancelRunResponse", "CreateRunRequest", "CreateRunResponse", "DIFY_AGENT_MODEL_LAYER_ID", "DIFY_AGENT_OUTPUT_LAYER_ID", "EmptyRunEventData", + "ExecutionContext", + "InvokeFrom", "LayerExitSignals", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", + "RunCancelledEvent", + "RunCancelledEventData", "RunComposition", "RunEvent", "RunEventType", "RunEventsResponse", "RunFailedEvent", "RunFailedEventData", + "RunPausedEvent", + "RunPausedEventData", + "RunPurpose", "RunStartedEvent", "RunStatus", "RunStatusResponse", diff --git a/dify-agent/src/dify_agent/server/routes/runs.py b/dify-agent/src/dify_agent/server/routes/runs.py index 9375b1f5b7..a5dff09218 100644 --- a/dify-agent/src/dify_agent/server/routes/runs.py +++ b/dify-agent/src/dify_agent/server/routes/runs.py @@ -13,7 +13,14 @@ from typing import Annotated from fastapi import APIRouter, Depends, Header, HTTPException, Query from fastapi.responses import StreamingResponse -from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse +from dify_agent.protocol.schemas import ( + CancelRunRequest, + CancelRunResponse, + CreateRunRequest, + CreateRunResponse, + RunEventsResponse, + RunStatusResponse, +) from dify_agent.runtime.run_scheduler import RunRequestValidationError, RunScheduler, SchedulerStoppingError from dify_agent.server.sse import sse_event_stream from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError @@ -59,6 +66,18 @@ def create_runs_router( error=record.error, ) + @router.post("/{run_id}/cancel", response_model=CancelRunResponse) + async def cancel_run(run_id: str, request: CancelRunRequest) -> CancelRunResponse: + """Reserve the cancellation endpoint in the public protocol. + + Runtime cancellation requires scheduler task lookup and persistence + semantics that are outside the current server implementation. Exposing a + typed endpoint now lets clients bind to the final route while receiving + an explicit 501 until execution support lands. + """ + del run_id, request + raise HTTPException(status_code=501, detail="run cancellation is not implemented") + @router.get("/{run_id}/events", response_model=RunEventsResponse) async def get_run_events( run_id: str, diff --git a/dify-agent/tests/local/dify_agent/client/test_client.py b/dify-agent/tests/local/dify_agent/client/test_client.py index 990475909d..db2d2e2386 100644 --- a/dify-agent/tests/local/dify_agent/client/test_client.py +++ b/dify-agent/tests/local/dify_agent/client/test_client.py @@ -20,8 +20,11 @@ from dify_agent.client import ( DifyAgentValidationError, ) from dify_agent.protocol.schemas import ( + CancelRunRequest, + CancelRunResponse, CreateRunRequest, RUN_EVENT_ADAPTER, + RunCancelledEvent, RunEvent, RunEventsResponse, RunStartedEvent, @@ -97,6 +100,10 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None: "next_cursor": "1-0", }, ) + if request.method == "POST" and request.url.path == "/runs/run-1/cancel": + payload = cast(dict[str, object], json.loads(request.content)) + assert payload == {"reason": "user_cancelled", "message": None} + return httpx.Response(202, json={"run_id": "run-1", "status": "cancelled"}) raise AssertionError(f"unexpected request: {request.method} {request.url}") http_client = httpx.Client(transport=httpx.MockTransport(handler)) @@ -105,11 +112,14 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None: created = client.create_run_sync(CreateRunRequest.model_validate(_create_run_payload())) status = client.get_run_sync(created.run_id) events = client.get_events_sync(created.run_id, after="0-0", limit=10) + cancelled = client.cancel_run_sync(created.run_id, CancelRunRequest(reason="user_cancelled")) assert created.status == "running" assert status.status == "running" assert isinstance(events, RunEventsResponse) assert [event.type for event in events.events] == ["run_started"] + assert isinstance(cancelled, CancelRunResponse) + assert cancelled.status == "cancelled" def test_async_methods_and_wait_run_parse_protocol_dtos() -> None: @@ -251,6 +261,31 @@ def test_stream_events_stops_after_terminal_event() -> None: assert calls == 1 +def test_stream_events_stops_after_cancelled_terminal_event() -> None: + calls = 0 + body = "".join( + [ + _event_frame(RunStartedEvent(id="1-0", run_id="run-1")), + _event_frame(RunCancelledEvent(id="2-0", run_id="run-1")), + ] + ) + + def handler(_request: httpx.Request) -> httpx.Response: + nonlocal calls + calls += 1 + return httpx.Response(200, content=body) + + client = Client( + base_url="http://testserver", + sync_http_client=httpx.Client(transport=httpx.MockTransport(handler)), + ) + + events = list(client.stream_events_sync("run-1", reconnect_delay_seconds=0)) + + assert [event.type for event in events] == ["run_started", "run_cancelled"] + assert calls == 1 + + def test_stream_events_reconnects_from_latest_event_id() -> None: seen_after: list[str] = [] diff --git a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py index ffdad4207b..db2db7c438 100644 --- a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py +++ b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py @@ -12,12 +12,17 @@ from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAY from dify_agent.protocol.schemas import ( RUN_EVENT_ADAPTER, CreateRunRequest, + ExecutionContext, LayerExitSignals, PydanticAIStreamRunEvent, + RunCancelledEvent, + RunCancelledEventData, RunComposition, RunFailedEvent, RunFailedEventData, RunLayerSpec, + RunPausedEvent, + RunPausedEventData, RunStartedEvent, RunSucceededEvent, RunSucceededEventData, @@ -38,6 +43,15 @@ def test_run_event_adapter_round_trips_typed_variants() -> None: ), ), RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")), + RunPausedEvent( + run_id="run-1", + data=RunPausedEventData( + reason="human_handoff", + message="Need review", + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + ), + RunCancelledEvent(run_id="run-1", data=RunCancelledEventData(reason="user_cancelled")), ] for event in events: @@ -89,6 +103,18 @@ def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_ } ) request = CreateRunRequest( + execution_context=ExecutionContext( + tenant_id="tenant-1", + workflow_id="workflow-1", + workflow_run_id="workflow-run-1", + node_id="node-1", + node_execution_id="node-execution-1", + invoke_from="workflow_run", + trace_id="trace-1", + ), + purpose="workflow_node", + idempotency_key="workflow-run-1:node-execution-1", + metadata={"source": "unit_test"}, composition=RunComposition( layers=[ RunLayerSpec(name="prompt", type=PLAIN_PROMPT_LAYER_TYPE_ID, config=prompt_config), @@ -111,6 +137,22 @@ def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_ graph_config, layer_configs = normalize_composition(request.composition) payload = request.model_dump(mode="json") + assert payload["execution_context"] == { + "tenant_id": "tenant-1", + "app_id": None, + "workflow_id": "workflow-1", + "workflow_run_id": "workflow-run-1", + "node_id": "node-1", + "node_execution_id": "node-execution-1", + "conversation_id": None, + "agent_id": None, + "agent_config_version_id": None, + "invoke_from": "workflow_run", + "trace_id": "trace-1", + } + assert payload["purpose"] == "workflow_node" + assert payload["idempotency_key"] == "workflow-run-1:node-execution-1" + assert payload["metadata"] == {"source": "unit_test"} assert payload["composition"]["layers"][0]["config"] == {"prefix": "system", "user": "hello", "suffix": []} assert [layer.model_dump(mode="json") for layer in graph_config.layers] == [ {"name": "prompt", "type": PLAIN_PROMPT_LAYER_TYPE_ID, "deps": {}, "metadata": {}}, @@ -163,6 +205,17 @@ def test_on_exit_accept_layer_overrides() -> None: assert request.on_exit.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE} +def test_execution_context_rejects_unknown_fields() -> None: + with pytest.raises(ValidationError): + _ = ExecutionContext.model_validate( + { + "tenant_id": "tenant-1", + "invoke_from": "workflow_run", + "unknown": "value", + } + ) + + def test_layer_exit_signals_reject_extra_fields() -> None: with pytest.raises(ValidationError): _ = LayerExitSignals.model_validate({"default": "suspend", "unknown": "value"}) diff --git a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py index c173816a51..bed7883170 100644 --- a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py +++ b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py @@ -67,6 +67,21 @@ def test_create_run_returns_running_from_scheduler() -> None: assert response.json() == {"run_id": "run-1", "status": "running"} +def test_cancel_run_endpoint_is_reserved_but_not_implemented() -> None: + from fastapi import FastAPI + + app = FastAPI() + app.include_router( + create_runs_router(lambda: FakeStore(), lambda: FakeScheduler()) # pyright: ignore[reportArgumentType] + ) + client = TestClient(app) + + response = client.post("/runs/run-1/cancel", json={"reason": "user_cancelled"}) + + assert response.status_code == 501 + assert response.json()["detail"] == "run cancellation is not implemented" + + def test_create_run_accepts_valid_full_plugin_graph() -> None: from fastapi import FastAPI