chore: align agent backend protocol adapter

This commit is contained in:
Yansong Zhang
2026-05-18 11:50:26 +08:00
parent 199e267415
commit d8e83da370
29 changed files with 1470 additions and 1264 deletions

View File

@ -1,110 +1,74 @@
from clients.agent_backend.client import AgentBackendClient
from clients.agent_backend.dto import (
CONTRACT_VERSION,
AgentExecutionContext,
AgentIdentityKind,
AgentIdentityRef,
AgentInvokeFrom,
AgentLayerConfig,
AgentLayerType,
AgentRuntimeOptions,
CliToolsLayerConfig,
CompositorConfig,
DifyPluginToolsLayerConfig,
FilesystemLayerConfig,
HumanContactsLayerConfig,
MemoryLayerConfig,
OutputSchemaLayerConfig,
PromptLayerConfig,
PromptOrigin,
PromptRole,
ReferenceType,
ResourceRef,
SandboxLayerConfig,
SecretBinding,
SecretsLayerConfig,
SkillsLayerConfig,
WorkflowContextLayerConfig,
"""API-side integration boundary for the Dify Agent backend.
Public wire DTOs come from ``dify_agent.protocol``. This package only contains
API adapters: request building from Dify product concepts, a thin client wrapper,
event adaptation for future workflow integration, and deterministic fakes.
"""
from clients.agent_backend.client import AgentBackendRunClient, DifyAgentBackendRunClient
from clients.agent_backend.errors import (
AgentBackendError,
AgentBackendHTTPError,
AgentBackendRequestBuildError,
AgentBackendRunFailedError,
AgentBackendStreamError,
AgentBackendTransportError,
AgentBackendValidationError,
)
from clients.agent_backend.event_parser import AgentBackendEventParser
from clients.agent_backend.events import (
AgentBackendEvent,
AgentBackendEventEnvelope,
AgentBackendEventType,
AgentErrorEvent,
AgentFileCreatedEvent,
AgentLifecycleAckEvent,
AgentOutputCreatedEvent,
AgentOutputDeltaEvent,
AgentOutputValidationFailedEvent,
AgentPauseRequestedEvent,
AgentTextCompletedEvent,
AgentTextDeltaEvent,
AgentToolCallDeltaEvent,
AgentToolCallFailedEvent,
AgentToolCallStartedEvent,
AgentToolCallSucceededEvent,
from clients.agent_backend.event_adapter import (
AgentBackendInternalEvent,
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunStartedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamInternalEvent,
)
from clients.agent_backend.lifecycle import (
AgentBackendInvokeRequest,
AgentBackendLifecycleAck,
AgentBackendLifecycleRequest,
AgentBackendLifecycleSignal,
AgentLifecycleEvent,
AgentLifecycleReason,
from clients.agent_backend.factory import create_agent_backend_run_client
from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAgentBackendScenario
from clients.agent_backend.request_builder import (
AGENT_SOUL_PROMPT_LAYER_ID,
DIFY_PLUGIN_CONTEXT_LAYER_ID,
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
AgentBackendModelConfig,
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
redact_for_agent_backend_log,
)
from clients.agent_backend.mock_client import MockAgentBackendClient, MockAgentBackendScenario
__all__ = [
"CONTRACT_VERSION",
"AgentBackendClient",
"AgentBackendEvent",
"AgentBackendEventEnvelope",
"AgentBackendEventParser",
"AgentBackendEventType",
"AgentBackendInvokeRequest",
"AgentBackendLifecycleAck",
"AgentBackendLifecycleRequest",
"AgentBackendLifecycleSignal",
"AgentErrorEvent",
"AgentExecutionContext",
"AgentFileCreatedEvent",
"AgentIdentityKind",
"AgentIdentityRef",
"AgentInvokeFrom",
"AgentLayerConfig",
"AgentLayerType",
"AgentLifecycleAckEvent",
"AgentLifecycleEvent",
"AgentLifecycleReason",
"AgentOutputCreatedEvent",
"AgentOutputDeltaEvent",
"AgentOutputValidationFailedEvent",
"AgentPauseRequestedEvent",
"AgentRuntimeOptions",
"AgentTextCompletedEvent",
"AgentTextDeltaEvent",
"AgentToolCallDeltaEvent",
"AgentToolCallFailedEvent",
"AgentToolCallStartedEvent",
"AgentToolCallSucceededEvent",
"CliToolsLayerConfig",
"CompositorConfig",
"DifyPluginToolsLayerConfig",
"FilesystemLayerConfig",
"HumanContactsLayerConfig",
"MemoryLayerConfig",
"MockAgentBackendClient",
"MockAgentBackendScenario",
"OutputSchemaLayerConfig",
"PromptLayerConfig",
"PromptOrigin",
"PromptRole",
"ReferenceType",
"ResourceRef",
"SandboxLayerConfig",
"SecretBinding",
"SecretsLayerConfig",
"SkillsLayerConfig",
"WorkflowContextLayerConfig",
"AGENT_SOUL_PROMPT_LAYER_ID",
"DIFY_PLUGIN_CONTEXT_LAYER_ID",
"WORKFLOW_NODE_JOB_PROMPT_LAYER_ID",
"WORKFLOW_USER_PROMPT_LAYER_ID",
"AgentBackendError",
"AgentBackendHTTPError",
"AgentBackendInternalEvent",
"AgentBackendInternalEventType",
"AgentBackendModelConfig",
"AgentBackendOutputConfig",
"AgentBackendRequestBuildError",
"AgentBackendRunCancelledInternalEvent",
"AgentBackendRunClient",
"AgentBackendRunEventAdapter",
"AgentBackendRunFailedError",
"AgentBackendRunFailedInternalEvent",
"AgentBackendRunPausedInternalEvent",
"AgentBackendRunRequestBuilder",
"AgentBackendRunStartedInternalEvent",
"AgentBackendRunSucceededInternalEvent",
"AgentBackendStreamError",
"AgentBackendStreamInternalEvent",
"AgentBackendTransportError",
"AgentBackendValidationError",
"AgentBackendWorkflowNodeRunInput",
"DifyAgentBackendRunClient",
"FakeAgentBackendRunClient",
"FakeAgentBackendScenario",
"create_agent_backend_run_client",
"redact_for_agent_backend_log",
]

View File

@ -1,19 +1,126 @@
"""Synchronous API-side wrapper around the public ``dify-agent`` client.
``dify-agent`` owns the cross-service DTOs and HTTP/SSE implementation. The API
backend keeps this thin wrapper so workflow code depends on a local protocol,
gets API-native errors, and can use a deterministic fake in tests without
creating another wire contract.
"""
from __future__ import annotations
from collections.abc import Iterator
from typing import Protocol
from clients.agent_backend.events import AgentBackendEvent
from clients.agent_backend.lifecycle import (
AgentBackendInvokeRequest,
AgentBackendLifecycleAck,
AgentBackendLifecycleRequest,
from dify_agent.client import (
DifyAgentClientError,
DifyAgentHTTPError,
DifyAgentStreamError,
DifyAgentTimeoutError,
DifyAgentValidationError,
)
from dify_agent.protocol import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
RunEvent,
RunStatusResponse,
)
from clients.agent_backend.errors import (
AgentBackendError,
AgentBackendHTTPError,
AgentBackendStreamError,
AgentBackendTransportError,
AgentBackendValidationError,
)
class AgentBackendClient(Protocol):
def invoke(self, request: AgentBackendInvokeRequest) -> Iterator[AgentBackendEvent]:
"""Invoke agent backend and stream typed events."""
class AgentBackendRunClient(Protocol):
"""Local boundary used by API workflow integrations to run Agent backend jobs."""
def send_lifecycle(self, request: AgentBackendLifecycleRequest) -> AgentBackendLifecycleAck:
"""Send an out-of-band lifecycle signal."""
def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one Agent backend run and return its accepted status."""
def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Request explicit cancellation for one Agent backend run."""
def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Yield public ``dify-agent`` run events in stream order."""
def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Wait for a run to reach a terminal status and return that status."""
class _DifyAgentSyncClient(Protocol):
"""Subset of ``dify_agent.client.Client`` used by the API wrapper."""
def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one run synchronously."""
def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Cancel one run synchronously."""
def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Stream run events synchronously."""
def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Wait for terminal run status synchronously."""
class DifyAgentBackendRunClient:
"""Adapter from API sync call sites to ``dify_agent.client.Client`` sync methods."""
client: _DifyAgentSyncClient
def __init__(self, client: _DifyAgentSyncClient) -> None:
self.client = client
def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one run through ``POST /runs`` and normalize client exceptions."""
try:
return self.client.create_run_sync(request)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc
def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Cancel one run through ``POST /runs/{run_id}/cancel`` and normalize exceptions."""
try:
return self.client.cancel_run_sync(run_id, request=request)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc
def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Stream run events from ``/events/sse`` with the wrapped client's reconnect policy."""
try:
yield from self.client.stream_events_sync(run_id, after=after)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc
def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Poll run status until terminal state and normalize client exceptions."""
try:
return self.client.wait_run_sync(run_id, timeout_seconds=timeout_seconds)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc
def _normalize_dify_agent_error(exc: Exception) -> AgentBackendError:
"""Map public ``dify-agent`` client errors to API-side integration errors."""
if isinstance(exc, DifyAgentValidationError):
return AgentBackendValidationError("Agent backend request or response validation failed", detail=exc.detail)
if isinstance(exc, DifyAgentHTTPError):
return AgentBackendHTTPError(
f"Agent backend HTTP {exc.status_code}",
status_code=exc.status_code,
detail=exc.detail,
)
if isinstance(exc, DifyAgentTimeoutError):
return AgentBackendTransportError(str(exc))
if isinstance(exc, DifyAgentStreamError):
return AgentBackendStreamError(str(exc))
if isinstance(exc, DifyAgentClientError):
return AgentBackendTransportError(str(exc))
if isinstance(exc, AgentBackendError):
return exc
return AgentBackendTransportError(str(exc) or type(exc).__name__)

View File

@ -1,290 +0,0 @@
from __future__ import annotations
from collections.abc import Mapping
from enum import StrEnum
from typing import Annotated, Any, Literal
from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator, model_validator
CONTRACT_VERSION = "agent-backend.v1"
class AgentInvokeFrom(StrEnum):
WORKFLOW_RUN = "workflow_run"
SINGLE_STEP = "single_step"
AGENT_APP = "agent_app"
BABYSIT = "babysit"
FASTEN = "fasten"
class AgentLayerType(StrEnum):
WORKFLOW_CONTEXT = "workflow_context"
PROMPT = "prompt"
FILESYSTEM = "filesystem"
SANDBOX = "sandbox"
DIFY_PLUGIN_TOOLS = "dify_plugin_tools"
CLI_TOOLS = "cli_tools"
SKILLS = "skills"
HUMAN_CONTACTS = "human_contacts"
OUTPUT_SCHEMA = "output_schema"
MEMORY = "memory"
SECRETS = "secrets"
class LayerLifecycleScope(StrEnum):
INVOCATION = "invocation"
WORKFLOW_RUN = "workflow_run"
AGENT_SESSION = "agent_session"
PERSISTENT_REF = "persistent_ref"
class PromptOrigin(StrEnum):
AGENT_SOUL = "agent_soul"
WORKFLOW_NODE_JOB = "workflow_node_job"
BABYSIT_TRANSIENT = "babysit_transient"
FASTEN_CANDIDATE = "fasten_candidate"
class PromptRole(StrEnum):
SYSTEM = "system"
USER = "user"
DEVELOPER = "developer"
class AgentIdentityKind(StrEnum):
ROSTER_AGENT = "roster_agent"
WORKFLOW_INLINE_AGENT = "workflow_inline_agent"
AGENT_APP = "agent_app"
class ReferenceType(StrEnum):
AGENT = "agent"
AGENT_CONFIG_VERSION = "agent_config_version"
CREDENTIAL = "credential"
FILE = "file"
HUMAN = "human"
MEMORY = "memory"
SECRET = "secret"
SKILL = "skill"
TOOL = "tool"
WORKFLOW = "workflow"
WORKFLOW_RUN = "workflow_run"
class AgentBackendBaseModel(BaseModel):
model_config = ConfigDict(extra="forbid", populate_by_name=True, use_enum_values=True)
def model_dump(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
kwargs.setdefault("by_alias", True)
return super().model_dump(*args, **kwargs)
def model_dump_redacted(self, **kwargs: Any) -> dict[str, Any]:
dumped = self.model_dump(mode="json", **kwargs)
return _redact_mapping(dumped)
class AgentExecutionContext(AgentBackendBaseModel):
tenant_id: str
app_id: str | None = None
workflow_id: str | None = None
workflow_run_id: str | None = None
node_id: str | None = None
node_execution_id: str | None = None
conversation_id: str | None = None
agent_id: str | None = None
agent_config_version_id: str | None = None
invoke_from: AgentInvokeFrom
trace_id: str | None = None
class ResourceRef(AgentBackendBaseModel):
type: ReferenceType
id: str
name: str | None = None
metadata: Mapping[str, JsonValue] = Field(default_factory=dict)
class AgentIdentityRef(AgentBackendBaseModel):
kind: AgentIdentityKind
agent_ref: ResourceRef | None = None
config_version_ref: ResourceRef | None = None
class AgentRuntimeOptions(AgentBackendBaseModel):
stream: bool = True
timeout_seconds: float | None = None
debug: bool = False
mock_scenario: str | None = None
class BaseAgentLayerConfig(AgentBackendBaseModel):
id: str
lifecycle_scope: LayerLifecycleScope = LayerLifecycleScope.INVOCATION
depends_on: list[str] = Field(default_factory=list)
class WorkflowContextLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.WORKFLOW_CONTEXT] = AgentLayerType.WORKFLOW_CONTEXT
workflow_ref: ResourceRef | None = None
workflow_run_ref: ResourceRef | None = None
node_id: str | None = None
node_execution_id: str | None = None
variables: Mapping[str, JsonValue] = Field(default_factory=dict)
previous_node_outputs: Mapping[str, JsonValue] = Field(default_factory=dict)
class PromptLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.PROMPT] = AgentLayerType.PROMPT
origin: PromptOrigin
role: PromptRole
content: str
class FilesystemMountConfig(AgentBackendBaseModel):
ref: ResourceRef
mount_point: str
read_only: bool = False
class FilesystemLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.FILESYSTEM] = AgentLayerType.FILESYSTEM
mounts: list[FilesystemMountConfig] = Field(default_factory=list)
class SandboxLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.SANDBOX] = AgentLayerType.SANDBOX
provider: str
image: str | None = None
options: Mapping[str, JsonValue] = Field(default_factory=dict)
class DifyPluginToolRef(AgentBackendBaseModel):
tool_ref: ResourceRef
credential_ref: ResourceRef | None = None
runtime_parameters: Mapping[str, JsonValue] = Field(default_factory=dict)
class DifyPluginToolsLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.DIFY_PLUGIN_TOOLS] = AgentLayerType.DIFY_PLUGIN_TOOLS
tools: list[DifyPluginToolRef] = Field(default_factory=list)
class CliToolRef(AgentBackendBaseModel):
name: str
command: str
install_command: str | None = None
credential_ref: ResourceRef | None = None
metadata: Mapping[str, JsonValue] = Field(default_factory=dict)
class CliToolsLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.CLI_TOOLS] = AgentLayerType.CLI_TOOLS
tools: list[CliToolRef] = Field(default_factory=list)
class SkillsLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.SKILLS] = AgentLayerType.SKILLS
skill_refs: list[ResourceRef] = Field(default_factory=list)
class HumanContactRef(AgentBackendBaseModel):
human_ref: ResourceRef
allowed_delivery_methods: list[str] = Field(default_factory=list)
class HumanContactsLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.HUMAN_CONTACTS] = AgentLayerType.HUMAN_CONTACTS
contacts: list[HumanContactRef] = Field(default_factory=list)
class OutputDeclaration(AgentBackendBaseModel):
name: str
type: str
json_schema: Mapping[str, JsonValue] | None = Field(default=None, alias="schema")
required: bool = True
validation_prompt: str | None = None
benchmark_file_ref: ResourceRef | None = None
class OutputSchemaLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.OUTPUT_SCHEMA] = AgentLayerType.OUTPUT_SCHEMA
outputs: list[OutputDeclaration] = Field(default_factory=list)
class MemoryLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.MEMORY] = AgentLayerType.MEMORY
strategy_ref: ResourceRef | None = None
scope: str | None = None
options: Mapping[str, JsonValue] = Field(default_factory=dict)
class SecretBinding(AgentBackendBaseModel):
secret_ref: ResourceRef
env_name: str
required: bool = True
@field_validator("secret_ref")
@classmethod
def validate_secret_ref(cls, value: ResourceRef) -> ResourceRef:
if value.type != ReferenceType.SECRET:
raise ValueError("secret_ref must reference a secret")
return value
class SecretsLayerConfig(BaseAgentLayerConfig):
type: Literal[AgentLayerType.SECRETS] = AgentLayerType.SECRETS
bindings: list[SecretBinding] = Field(default_factory=list)
type AgentLayerConfig = Annotated[
WorkflowContextLayerConfig
| PromptLayerConfig
| FilesystemLayerConfig
| SandboxLayerConfig
| DifyPluginToolsLayerConfig
| CliToolsLayerConfig
| SkillsLayerConfig
| HumanContactsLayerConfig
| OutputSchemaLayerConfig
| MemoryLayerConfig
| SecretsLayerConfig,
Field(discriminator="type"),
]
class CompositorConfig(AgentBackendBaseModel):
contract_version: Literal["agent-backend.v1"] = CONTRACT_VERSION
execution_context: AgentExecutionContext
agent_identity: AgentIdentityRef | None = None
layers: list[AgentLayerConfig]
runtime_options: AgentRuntimeOptions = Field(default_factory=AgentRuntimeOptions)
@model_validator(mode="after")
def validate_contract(self) -> CompositorConfig:
layer_ids = [layer.id for layer in self.layers]
if len(layer_ids) != len(set(layer_ids)):
raise ValueError("layer ids must be unique")
known_ids = set(layer_ids)
for layer in self.layers:
unknown_deps = set(layer.depends_on) - known_ids
if unknown_deps:
raise ValueError(f"layer '{layer.id}' depends on unknown layer ids: {sorted(unknown_deps)}")
return self
_SENSITIVE_KEY_PARTS = ("secret", "credential", "token", "password", "api_key")
def _redact_mapping(value: Any) -> Any:
if isinstance(value, Mapping):
redacted: dict[str, Any] = {}
for key, item in value.items():
key_text = str(key).lower()
if any(part in key_text for part in _SENSITIVE_KEY_PARTS):
redacted[key] = "[REDACTED]"
else:
redacted[key] = _redact_mapping(item)
return redacted
if isinstance(value, list):
return [_redact_mapping(item) for item in value]
return value

View File

@ -1,31 +1,61 @@
"""API-side errors for the Dify Agent backend integration.
The wire protocol and low-level HTTP behaviour are owned by ``dify-agent``.
This module only normalizes those client errors into the API backend's boundary
so workflow/node code does not depend directly on transport-specific exception
classes.
"""
from __future__ import annotations
from typing import Any
class AgentBackendError(Exception):
"""Base error for the agent backend client contract."""
"""Base error for API-side Agent backend integration failures."""
class AgentBackendContractVersionError(AgentBackendError):
"""Raised when an event or DTO uses an unsupported contract version."""
class AgentBackendEventParseError(AgentBackendError):
"""Raised when an agent backend event cannot be parsed into a typed event."""
def __init__(self, message: str, *, raw_event: Any | None = None):
super().__init__(message)
self.raw_event = raw_event
class AgentBackendUnknownEventError(AgentBackendEventParseError):
"""Raised when the agent backend emits an event type unknown to this API version."""
class AgentBackendRequestBuildError(AgentBackendError):
"""Raised when Dify product/workflow state cannot be mapped to a run request."""
class AgentBackendTransportError(AgentBackendError):
"""Raised for transport-level failures talking to the agent backend."""
"""Raised for timeout or request-level failures talking to Agent backend."""
class AgentBackendInvocationError(AgentBackendError):
"""Raised for invocation-level failures before a typed error event can be emitted."""
class AgentBackendHTTPError(AgentBackendTransportError):
"""Raised for Agent backend HTTP errors after status/detail normalization."""
status_code: int
detail: object
def __init__(self, message: str, *, status_code: int, detail: object) -> None:
self.status_code = status_code
self.detail = detail
super().__init__(message)
class AgentBackendValidationError(AgentBackendError):
"""Raised for local request validation or Agent backend 422 responses."""
detail: object
def __init__(self, message: str, *, detail: object) -> None:
self.detail = detail
super().__init__(message)
class AgentBackendStreamError(AgentBackendError):
"""Raised when an Agent backend event stream is malformed or exhausted."""
class AgentBackendRunFailedError(AgentBackendError):
"""Raised by callers that choose to translate a terminal failed run into an exception."""
run_id: str
detail: Any
def __init__(self, run_id: str, detail: Any) -> None:
self.run_id = run_id
self.detail = detail
super().__init__(f"Agent backend run failed: {run_id}")

View File

@ -0,0 +1,167 @@
"""Adapt public ``dify-agent`` run events into API-internal event semantics.
The adapter does not define a new cross-service event contract. It consumes
``dify_agent.protocol.RunEvent`` and produces small API-internal models that the
future workflow Agent Node can map to Graphon/AppQueue events in phase 3.
"""
from __future__ import annotations
from enum import StrEnum
from typing import Annotated, Literal, cast
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol import (
PydanticAIStreamRunEvent,
RunCancelledEvent,
RunEvent,
RunFailedEvent,
RunPausedEvent,
RunStartedEvent,
RunSucceededEvent,
)
from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
_EVENT_DATA_ADAPTER = TypeAdapter(object)
class AgentBackendInternalEventType(StrEnum):
"""API-only event labels used before Graphon/AppQueue integration."""
RUN_STARTED = "run_started"
STREAM_EVENT = "stream_event"
RUN_PAUSED = "run_paused"
RUN_SUCCEEDED = "run_succeeded"
RUN_FAILED = "run_failed"
RUN_CANCELLED = "run_cancelled"
class AgentBackendInternalEventBase(BaseModel):
"""Common fields preserved from public Dify Agent run events."""
run_id: str
source_event_id: str | None = None
model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
class AgentBackendRunStartedInternalEvent(AgentBackendInternalEventBase):
"""API-internal marker for a started Agent backend run."""
type: Literal[AgentBackendInternalEventType.RUN_STARTED] = AgentBackendInternalEventType.RUN_STARTED
class AgentBackendStreamInternalEvent(AgentBackendInternalEventBase):
"""API-internal wrapper for one pydantic-ai stream event payload."""
type: Literal[AgentBackendInternalEventType.STREAM_EVENT] = AgentBackendInternalEventType.STREAM_EVENT
event_kind: str | None = None
data: JsonValue
class AgentBackendRunSucceededInternalEvent(AgentBackendInternalEventBase):
"""API-internal terminal success event carrying final output and session state."""
type: Literal[AgentBackendInternalEventType.RUN_SUCCEEDED] = AgentBackendInternalEventType.RUN_SUCCEEDED
output: JsonValue
session_snapshot: CompositorSessionSnapshot
class AgentBackendRunPausedInternalEvent(AgentBackendInternalEventBase):
"""API-internal resumable pause event for human handoff and Babysit flows."""
type: Literal[AgentBackendInternalEventType.RUN_PAUSED] = AgentBackendInternalEventType.RUN_PAUSED
reason: str
message: str | None = None
session_snapshot: CompositorSessionSnapshot | None = None
class AgentBackendRunFailedInternalEvent(AgentBackendInternalEventBase):
"""API-internal terminal failure event carrying the backend-safe error text."""
type: Literal[AgentBackendInternalEventType.RUN_FAILED] = AgentBackendInternalEventType.RUN_FAILED
error: str
reason: str | None = None
class AgentBackendRunCancelledInternalEvent(AgentBackendInternalEventBase):
"""API-internal terminal cancellation event."""
type: Literal[AgentBackendInternalEventType.RUN_CANCELLED] = AgentBackendInternalEventType.RUN_CANCELLED
reason: str | None = None
message: str | None = None
type AgentBackendInternalEvent = Annotated[
AgentBackendRunStartedInternalEvent
| AgentBackendStreamInternalEvent
| AgentBackendRunPausedInternalEvent
| AgentBackendRunSucceededInternalEvent
| AgentBackendRunFailedInternalEvent
| AgentBackendRunCancelledInternalEvent,
Field(discriminator="type"),
]
class AgentBackendRunEventAdapter:
"""Maps public ``dify-agent`` event variants to API-internal event variants."""
def adapt(self, event: RunEvent) -> list[AgentBackendInternalEvent]:
"""Return zero or more API-internal events derived from one public run event."""
match event:
case RunStartedEvent():
return [
AgentBackendRunStartedInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
)
]
case PydanticAIStreamRunEvent():
data = cast(JsonValue, _EVENT_DATA_ADAPTER.dump_python(event.data, mode="json"))
event_kind = data.get("event_kind") if isinstance(data, dict) else None
return [
AgentBackendStreamInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
event_kind=event_kind if isinstance(event_kind, str) else None,
data=data,
)
]
case RunSucceededEvent():
return [
AgentBackendRunSucceededInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
output=event.data.output,
session_snapshot=event.data.session_snapshot,
)
]
case RunPausedEvent():
return [
AgentBackendRunPausedInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
reason=event.data.reason,
message=event.data.message,
session_snapshot=event.data.session_snapshot,
)
]
case RunFailedEvent():
return [
AgentBackendRunFailedInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
error=event.data.error,
reason=event.data.reason,
)
]
case RunCancelledEvent():
return [
AgentBackendRunCancelledInternalEvent(
run_id=event.run_id,
source_event_id=event.id,
reason=event.data.reason,
message=event.data.message,
)
]
raise TypeError(f"unsupported agent backend run event: {type(event).__name__}")

View File

@ -1,64 +0,0 @@
from __future__ import annotations
from collections.abc import Iterable, Iterator, Mapping
from typing import Any
from pydantic import TypeAdapter, ValidationError
from clients.agent_backend.dto import CONTRACT_VERSION
from clients.agent_backend.errors import (
AgentBackendContractVersionError,
AgentBackendEventParseError,
AgentBackendUnknownEventError,
)
from clients.agent_backend.events import (
AgentBackendEvent,
AgentBackendEventEnvelope,
AgentBackendEventType,
)
_event_adapter = TypeAdapter(AgentBackendEvent)
class AgentBackendEventParser:
def parse(self, raw_event: Mapping[str, Any]) -> AgentBackendEvent:
raw_contract_version = raw_event.get("contract_version")
if raw_contract_version != CONTRACT_VERSION:
raise AgentBackendContractVersionError(
f"Unsupported agent backend contract version: {raw_contract_version}"
)
raw_event_type = raw_event.get("type")
try:
event_type = AgentBackendEventType(raw_event_type)
except ValueError as exc:
raise AgentBackendUnknownEventError(
f"Unknown agent backend event type: {raw_event_type}",
raw_event=raw_event,
) from exc
try:
envelope = AgentBackendEventEnvelope.model_validate({**raw_event, "type": event_type.value})
except ValidationError as exc:
raise AgentBackendEventParseError("Invalid agent backend event envelope", raw_event=raw_event) from exc
event_payload = {
"event_id": envelope.event_id,
"sequence": envelope.sequence,
"type": event_type.value,
"created_at": envelope.created_at,
"execution_context": envelope.execution_context,
**envelope.payload,
}
try:
return _event_adapter.validate_python(event_payload)
except ValidationError as exc:
raise AgentBackendEventParseError(
f"Invalid payload for agent backend event type: {event_type}",
raw_event=raw_event,
) from exc
def parse_many(self, raw_events: Iterable[Mapping[str, Any]]) -> Iterator[AgentBackendEvent]:
for raw_event in raw_events:
yield self.parse(raw_event)

View File

@ -1,158 +0,0 @@
from __future__ import annotations
from enum import StrEnum
from typing import Annotated, Any, Literal
from pydantic import Field, JsonValue
from clients.agent_backend.dto import (
CONTRACT_VERSION,
AgentBackendBaseModel,
AgentExecutionContext,
ResourceRef,
)
from clients.agent_backend.lifecycle import AgentLifecycleEvent, AgentLifecycleReason
class AgentBackendEventType(StrEnum):
LIFECYCLE = "lifecycle"
TEXT_DELTA = "text.delta"
TEXT_COMPLETED = "text.completed"
TOOL_CALL_STARTED = "tool_call.started"
TOOL_CALL_DELTA = "tool_call.delta"
TOOL_CALL_SUCCEEDED = "tool_call.succeeded"
TOOL_CALL_FAILED = "tool_call.failed"
FILE_CREATED = "file.created"
OUTPUT_DELTA = "output.delta"
OUTPUT_CREATED = "output.created"
OUTPUT_VALIDATION_FAILED = "output.validation_failed"
ERROR = "error"
PAUSE_REQUESTED = "pause.requested"
class AgentBackendEventBase(AgentBackendBaseModel):
event_id: str
sequence: int
created_at: int
execution_context: AgentExecutionContext
class AgentLifecycleAckEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.LIFECYCLE] = AgentBackendEventType.LIFECYCLE
lifecycle_event: AgentLifecycleEvent
lifecycle_reason: AgentLifecycleReason
message: str | None = None
class AgentTextDeltaEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.TEXT_DELTA] = AgentBackendEventType.TEXT_DELTA
delta: str
output_name: str = "text"
class AgentTextCompletedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.TEXT_COMPLETED] = AgentBackendEventType.TEXT_COMPLETED
text: str
output_name: str = "text"
class AgentToolCallStartedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.TOOL_CALL_STARTED] = AgentBackendEventType.TOOL_CALL_STARTED
tool_call_id: str
tool_name: str
parent_id: str | None = None
input: JsonValue | None = None
metadata: dict[str, JsonValue] = Field(default_factory=dict)
class AgentToolCallDeltaEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.TOOL_CALL_DELTA] = AgentBackendEventType.TOOL_CALL_DELTA
tool_call_id: str
delta: str
class AgentToolCallSucceededEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.TOOL_CALL_SUCCEEDED] = AgentBackendEventType.TOOL_CALL_SUCCEEDED
tool_call_id: str
output: JsonValue | None = None
metadata: dict[str, JsonValue] = Field(default_factory=dict)
class AgentToolCallFailedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.TOOL_CALL_FAILED] = AgentBackendEventType.TOOL_CALL_FAILED
tool_call_id: str
error: str
retryable: bool = False
metadata: dict[str, JsonValue] = Field(default_factory=dict)
class AgentFileCreatedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.FILE_CREATED] = AgentBackendEventType.FILE_CREATED
file_ref: ResourceRef
output_name: str | None = None
metadata: dict[str, JsonValue] = Field(default_factory=dict)
class AgentOutputDeltaEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.OUTPUT_DELTA] = AgentBackendEventType.OUTPUT_DELTA
output_name: str
delta: JsonValue
class AgentOutputCreatedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.OUTPUT_CREATED] = AgentBackendEventType.OUTPUT_CREATED
output_name: str
value: JsonValue
metadata: dict[str, JsonValue] = Field(default_factory=dict)
class AgentOutputValidationFailedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.OUTPUT_VALIDATION_FAILED] = AgentBackendEventType.OUTPUT_VALIDATION_FAILED
output_name: str
error: str
retryable: bool = True
metadata: dict[str, JsonValue] = Field(default_factory=dict)
class AgentErrorEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.ERROR] = AgentBackendEventType.ERROR
category: str
code: str
message: str
retryable: bool = False
safe_details: dict[str, JsonValue] = Field(default_factory=dict)
class AgentPauseRequestedEvent(AgentBackendEventBase):
type: Literal[AgentBackendEventType.PAUSE_REQUESTED] = AgentBackendEventType.PAUSE_REQUESTED
reason: str
message: str | None = None
metadata: dict[str, JsonValue] = Field(default_factory=dict)
type AgentBackendEvent = Annotated[
AgentLifecycleAckEvent
| AgentTextDeltaEvent
| AgentTextCompletedEvent
| AgentToolCallStartedEvent
| AgentToolCallDeltaEvent
| AgentToolCallSucceededEvent
| AgentToolCallFailedEvent
| AgentFileCreatedEvent
| AgentOutputDeltaEvent
| AgentOutputCreatedEvent
| AgentOutputValidationFailedEvent
| AgentErrorEvent
| AgentPauseRequestedEvent,
Field(discriminator="type"),
]
class AgentBackendEventEnvelope(AgentBackendBaseModel):
contract_version: Literal["agent-backend.v1"] = CONTRACT_VERSION
event_id: str
sequence: int
type: AgentBackendEventType
created_at: int
execution_context: AgentExecutionContext
payload: dict[str, Any]

View File

@ -1,12 +1,22 @@
"""Factories for API-side Agent backend clients."""
from __future__ import annotations
from clients.agent_backend.client import AgentBackendClient
from clients.agent_backend.mock_client import MockAgentBackendClient, MockAgentBackendScenario
from dify_agent.client import Client
from clients.agent_backend.client import AgentBackendRunClient, DifyAgentBackendRunClient
from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAgentBackendScenario
def create_agent_backend_client(*, use_mock: bool = True, mock_scenario: str | None = None) -> AgentBackendClient:
if use_mock:
scenario = MockAgentBackendScenario(mock_scenario) if mock_scenario else MockAgentBackendScenario.SUCCESS_TEXT
return MockAgentBackendClient(scenario=scenario)
raise NotImplementedError("Real agent backend client is not implemented in phase 0")
def create_agent_backend_run_client(
*,
base_url: str | None = None,
use_fake: bool = False,
fake_scenario: str | FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS,
) -> AgentBackendRunClient:
"""Create the API-side run client without hiding the ``dify-agent`` protocol."""
if use_fake:
return FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario(fake_scenario))
if base_url is None:
raise ValueError("base_url is required when creating a real Agent backend client")
return DifyAgentBackendRunClient(Client(base_url=base_url))

View File

@ -0,0 +1,117 @@
"""Deterministic fake Agent backend client using public ``dify-agent`` events.
Tests should exercise the same ``RunEvent`` DTOs as the real HTTP client. This
fake therefore replaces the previous custom mock protocol instead of emulating a
separate ``agent-backend.v1`` event stream.
"""
from __future__ import annotations
from collections.abc import Iterator
from datetime import UTC, datetime
from enum import StrEnum
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
RunEvent,
RunFailedEvent,
RunFailedEventData,
RunStartedEvent,
RunStatusResponse,
RunSucceededEvent,
RunSucceededEventData,
)
_FIXED_TIME = datetime(2026, 1, 1, tzinfo=UTC)
class FakeAgentBackendScenario(StrEnum):
"""Deterministic fake scenarios for API-side integration tests."""
SUCCESS = "success"
FAILED = "failed"
class FakeAgentBackendRunClient:
"""In-memory implementation of ``AgentBackendRunClient`` for unit tests."""
scenario: FakeAgentBackendScenario
run_id: str
request: CreateRunRequest | None
def __init__(
self,
*,
scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS,
run_id: str = "fake-run-1",
) -> None:
self.scenario = scenario
self.run_id = run_id
self.request = None
def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Record the request and return a deterministic accepted response."""
self.request = request
return CreateRunResponse(run_id=self.run_id, status="running")
def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Return a deterministic cancellation response."""
del request
return CancelRunResponse(run_id=run_id, status="cancelled")
def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Yield the deterministic public ``RunEvent`` sequence for ``run_id``."""
for event in self._events(run_id):
if after is not None and event.id is not None and event.id <= after:
continue
yield event
def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Return a deterministic terminal status; timeout is accepted for protocol parity."""
del timeout_seconds
match self.scenario:
case FakeAgentBackendScenario.SUCCESS:
return RunStatusResponse(
run_id=run_id,
status="succeeded",
created_at=_FIXED_TIME,
updated_at=_FIXED_TIME,
)
case FakeAgentBackendScenario.FAILED:
return RunStatusResponse(
run_id=run_id,
status="failed",
created_at=_FIXED_TIME,
updated_at=_FIXED_TIME,
error="fake failure",
)
def _events(self, run_id: str) -> tuple[RunEvent, ...]:
match self.scenario:
case FakeAgentBackendScenario.SUCCESS:
return (
RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME),
RunSucceededEvent(
id="2-0",
run_id=run_id,
created_at=_FIXED_TIME,
data=RunSucceededEventData(
output={"text": "hello agent"},
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
)
case FakeAgentBackendScenario.FAILED:
return (
RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME),
RunFailedEvent(
id="2-0",
run_id=run_id,
created_at=_FIXED_TIME,
data=RunFailedEventData(error="fake failure", reason="unit_test"),
),
)

View File

@ -1,53 +0,0 @@
from __future__ import annotations
from enum import StrEnum
from pydantic import Field
from clients.agent_backend.dto import AgentBackendBaseModel, AgentExecutionContext, CompositorConfig
class AgentLifecycleEvent(StrEnum):
CREATE = "create"
TMP_LEAVE = "tmp_leave"
REENTER = "reenter"
DELETE = "delete"
class AgentLifecycleReason(StrEnum):
WORKFLOW_RUN_START = "workflow_run_start"
WORKFLOW_RUN_FINISH = "workflow_run_finish"
SINGLE_STEP_START = "single_step_start"
BABYSIT_START = "babysit_start"
HUMAN_HANDOFF = "human_handoff"
WORKFLOW_HANDOFF = "workflow_handoff"
RESUME = "resume"
FASTEN_PREVIEW = "fasten_preview"
CANCEL = "cancel"
class AgentBackendLifecycleSignal(AgentBackendBaseModel):
event: AgentLifecycleEvent
reason: AgentLifecycleReason
execution_context: AgentExecutionContext
target_layer_ids: list[str] | None = None
idempotency_key: str
class AgentBackendInvokeRequest(AgentBackendBaseModel):
compositor_config: CompositorConfig
lifecycle_signals: list[AgentBackendLifecycleSignal] = Field(default_factory=list)
idempotency_key: str
stream: bool = True
class AgentBackendLifecycleRequest(AgentBackendBaseModel):
signal: AgentBackendLifecycleSignal
class AgentBackendLifecycleAck(AgentBackendBaseModel):
accepted: bool
event: AgentLifecycleEvent
reason: AgentLifecycleReason
idempotency_key: str
message: str | None = None

View File

@ -1,146 +0,0 @@
from __future__ import annotations
from collections.abc import Iterator, Mapping
from enum import StrEnum
from typing import Any
from clients.agent_backend.dto import CONTRACT_VERSION, AgentBackendBaseModel, ReferenceType, ResourceRef
from clients.agent_backend.event_parser import AgentBackendEventParser
from clients.agent_backend.events import AgentBackendEvent, AgentBackendEventType
from clients.agent_backend.lifecycle import (
AgentBackendInvokeRequest,
AgentBackendLifecycleAck,
AgentBackendLifecycleRequest,
AgentLifecycleEvent,
AgentLifecycleReason,
)
class MockAgentBackendScenario(StrEnum):
SUCCESS_TEXT = "success_text"
SUCCESS_FILE = "success_file"
TOOL_CALL = "tool_call"
PAUSE = "pause"
ERROR = "error"
UNKNOWN_EVENT = "unknown_event"
class MockAgentBackendClient(AgentBackendBaseModel):
scenario: MockAgentBackendScenario = MockAgentBackendScenario.SUCCESS_TEXT
def invoke(self, request: AgentBackendInvokeRequest) -> Iterator[AgentBackendEvent]:
parser = AgentBackendEventParser()
scenario = request.compositor_config.runtime_options.mock_scenario or self.scenario
for raw_event in self._raw_events(request, MockAgentBackendScenario(scenario)):
yield parser.parse(raw_event)
def send_lifecycle(self, request: AgentBackendLifecycleRequest) -> AgentBackendLifecycleAck:
return AgentBackendLifecycleAck(
accepted=True,
event=request.signal.event,
reason=request.signal.reason,
idempotency_key=request.signal.idempotency_key,
)
def _raw_events(
self, request: AgentBackendInvokeRequest, scenario: MockAgentBackendScenario
) -> Iterator[Mapping[str, Any]]:
match scenario:
case MockAgentBackendScenario.SUCCESS_TEXT:
yield self._envelope(request, 1, AgentBackendEventType.LIFECYCLE, self._lifecycle_payload())
yield self._envelope(request, 2, AgentBackendEventType.TEXT_DELTA, {"delta": "hello "})
yield self._envelope(request, 3, AgentBackendEventType.TEXT_DELTA, {"delta": "agent"})
yield self._envelope(request, 4, AgentBackendEventType.TEXT_COMPLETED, {"text": "hello agent"})
yield self._envelope(
request,
5,
AgentBackendEventType.OUTPUT_CREATED,
{"output_name": "text", "value": "hello agent"},
)
case MockAgentBackendScenario.SUCCESS_FILE:
yield self._envelope(request, 1, AgentBackendEventType.LIFECYCLE, self._lifecycle_payload())
yield self._envelope(
request,
2,
AgentBackendEventType.FILE_CREATED,
{
"file_ref": ResourceRef(
type=ReferenceType.FILE,
id="mock-file-1",
name="result.txt",
).model_dump(mode="json"),
"output_name": "result_file",
},
)
yield self._envelope(
request,
3,
AgentBackendEventType.OUTPUT_CREATED,
{"output_name": "result_file", "value": {"file_id": "mock-file-1"}},
)
case MockAgentBackendScenario.TOOL_CALL:
yield self._envelope(
request,
1,
AgentBackendEventType.TOOL_CALL_STARTED,
{"tool_call_id": "tool-call-1", "tool_name": "web_search", "input": {"query": "Dify"}},
)
yield self._envelope(
request,
2,
AgentBackendEventType.TOOL_CALL_SUCCEEDED,
{"tool_call_id": "tool-call-1", "output": {"result": "ok"}},
)
yield self._envelope(request, 3, AgentBackendEventType.TEXT_DELTA, {"delta": "done"})
yield self._envelope(
request,
4,
AgentBackendEventType.OUTPUT_CREATED,
{"output_name": "text", "value": "done"},
)
case MockAgentBackendScenario.PAUSE:
yield self._envelope(request, 1, AgentBackendEventType.TEXT_DELTA, {"delta": "waiting"})
yield self._envelope(
request,
2,
AgentBackendEventType.PAUSE_REQUESTED,
{"reason": "human_handoff", "message": "Need human input"},
)
case MockAgentBackendScenario.ERROR:
yield self._envelope(
request,
1,
AgentBackendEventType.ERROR,
{
"category": "mock",
"code": "mock_error",
"message": "Mock agent backend error",
"retryable": False,
},
)
case MockAgentBackendScenario.UNKNOWN_EVENT:
yield self._envelope(request, 1, "unknown.event", {"value": "boom"})
def _envelope(
self,
request: AgentBackendInvokeRequest,
sequence: int,
event_type: AgentBackendEventType | str,
payload: Mapping[str, Any],
) -> dict[str, Any]:
return {
"contract_version": CONTRACT_VERSION,
"event_id": f"{request.idempotency_key}:{sequence}",
"sequence": sequence,
"type": str(event_type),
"created_at": 1_800_000_000_000 + sequence,
"execution_context": request.compositor_config.execution_context.model_dump(mode="json"),
"payload": dict(payload),
}
@staticmethod
def _lifecycle_payload() -> dict[str, str]:
return {
"lifecycle_event": AgentLifecycleEvent.CREATE.value,
"lifecycle_reason": AgentLifecycleReason.WORKFLOW_RUN_START.value,
}

View File

@ -0,0 +1,192 @@
"""Build ``dify-agent`` run requests from API-side product concepts.
This module is intentionally an adapter, not a wire DTO package. The emitted
object is always ``dify_agent.protocol.CreateRunRequest`` so the Agent backend
protocol has a single owner. API-only context such as Agent Soul vs workflow job
prompt is preserved in layer names and metadata until the dedicated product
schemas land in later phases.
"""
from __future__ import annotations
from typing import ClassVar
from agenton.compositor import CompositorSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LAYER_TYPE_ID,
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DifyPluginCredentialValue,
DifyPluginLayerConfig,
DifyPluginLLMLayerConfig,
)
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import (
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
ExecutionContext,
LayerExitSignals,
RunComposition,
RunLayerSpec,
RunPurpose,
)
from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator
AGENT_SOUL_PROMPT_LAYER_ID = "agent_soul_prompt"
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID = "workflow_node_job_prompt"
WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt"
DIFY_PLUGIN_CONTEXT_LAYER_ID = "plugin"
class AgentBackendModelConfig(BaseModel):
"""API-side model/plugin selection before it is converted to Dify Agent layers."""
tenant_id: str
plugin_id: str
model_provider: str
model: str
user_id: str | None = None
credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentBackendOutputConfig(BaseModel):
"""API-side structured output declaration for the conventional output layer."""
json_schema: dict[str, JsonValue]
name: str = "final_result"
description: str | None = None
strict: bool | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentBackendWorkflowNodeRunInput(BaseModel):
"""Inputs needed to build the first workflow-node-oriented Agent backend run request."""
model: AgentBackendModelConfig
execution_context: ExecutionContext
workflow_node_job_prompt: str
user_prompt: str
agent_soul_prompt: str | None = None
purpose: RunPurpose = "workflow_node"
idempotency_key: str | None = None
output: AgentBackendOutputConfig | None = None
session_snapshot: CompositorSessionSnapshot | None = None
suspend_on_exit: bool = False
metadata: dict[str, JsonValue] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
@field_validator("workflow_node_job_prompt", "user_prompt")
@classmethod
def _reject_blank_prompt(cls, value: str) -> str:
if not value.strip():
raise ValueError("prompt must not be blank")
return value
class AgentBackendRunRequestBuilder:
"""Converts API product state into the public ``dify-agent`` run protocol."""
def build_for_workflow_node(self, run_input: AgentBackendWorkflowNodeRunInput) -> CreateRunRequest:
"""Build a workflow Agent Node run request without defining another wire schema."""
layers: list[RunLayerSpec] = []
if run_input.agent_soul_prompt:
layers.append(
RunLayerSpec(
name=AGENT_SOUL_PROMPT_LAYER_ID,
type=PLAIN_PROMPT_LAYER_TYPE_ID,
metadata={**run_input.metadata, "origin": "agent_soul"},
config=PromptLayerConfig(prefix=run_input.agent_soul_prompt),
)
)
layers.extend(
[
RunLayerSpec(
name=WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
type=PLAIN_PROMPT_LAYER_TYPE_ID,
metadata={**run_input.metadata, "origin": "workflow_node_job"},
config=PromptLayerConfig(prefix=run_input.workflow_node_job_prompt),
),
RunLayerSpec(
name=WORKFLOW_USER_PROMPT_LAYER_ID,
type=PLAIN_PROMPT_LAYER_TYPE_ID,
metadata={**run_input.metadata, "origin": "workflow_user_prompt"},
config=PromptLayerConfig(user=run_input.user_prompt),
),
RunLayerSpec(
name=DIFY_PLUGIN_CONTEXT_LAYER_ID,
type=DIFY_PLUGIN_LAYER_TYPE_ID,
metadata=run_input.metadata,
config=DifyPluginLayerConfig(
tenant_id=run_input.model.tenant_id,
plugin_id=run_input.model.plugin_id,
user_id=run_input.model.user_id,
),
),
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
deps={"plugin": DIFY_PLUGIN_CONTEXT_LAYER_ID},
metadata=run_input.metadata,
config=DifyPluginLLMLayerConfig(
model_provider=run_input.model.model_provider,
model=run_input.model.model,
credentials=run_input.model.credentials,
),
),
]
)
if run_input.output is not None:
layers.append(
RunLayerSpec(
name=DIFY_AGENT_OUTPUT_LAYER_ID,
type=DIFY_OUTPUT_LAYER_TYPE_ID,
metadata=run_input.metadata,
config=DifyOutputLayerConfig(
json_schema=run_input.output.json_schema,
name=run_input.output.name,
description=run_input.output.description,
strict=run_input.output.strict,
),
)
)
return CreateRunRequest(
composition=RunComposition(layers=layers),
execution_context=run_input.execution_context,
purpose=run_input.purpose,
idempotency_key=run_input.idempotency_key,
metadata=run_input.metadata,
session_snapshot=run_input.session_snapshot,
on_exit=LayerExitSignals(
default=ExitIntent.SUSPEND if run_input.suspend_on_exit else ExitIntent.DELETE,
),
)
_SENSITIVE_KEY_PARTS = ("secret", "credential", "token", "password", "api_key")
def redact_for_agent_backend_log(value: object) -> object:
"""Return a JSON-like copy with credential-bearing keys redacted for logs/tests."""
if isinstance(value, BaseModel):
return redact_for_agent_backend_log(value.model_dump(mode="json", warnings=False))
if isinstance(value, dict):
redacted: dict[object, object] = {}
for key, item in value.items():
key_text = str(key).lower()
if any(part in key_text for part in _SENSITIVE_KEY_PARTS):
redacted[key] = "[REDACTED]"
else:
redacted[key] = redact_for_agent_backend_log(item)
return redacted
if isinstance(value, list):
return [redact_for_agent_backend_log(item) for item in value]
return value

View File

@ -9,6 +9,7 @@ dependencies = [
"boto3>=1.43.6",
"celery>=5.6.3",
"croniter>=6.2.2",
"dify-agent",
"flask>=3.1.3,<4.0.0",
"flask-cors>=6.0.2",
"gevent>=26.4.0",
@ -114,7 +115,6 @@ override-dependencies = [
############################################################
dev = [
"coverage>=7.13.4",
"dify-agent",
"dotenv-linter>=0.7.0",
"faker>=40.15.0",
"lxml-stubs>=0.5.1",

View File

@ -0,0 +1,126 @@
from collections.abc import Iterator
import pytest
from dify_agent.client import DifyAgentHTTPError, DifyAgentStreamError, DifyAgentTimeoutError, DifyAgentValidationError
from dify_agent.protocol import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
ExecutionContext,
RunEvent,
RunStartedEvent,
RunStatusResponse,
)
from clients.agent_backend import (
AgentBackendHTTPError,
AgentBackendModelConfig,
AgentBackendRunRequestBuilder,
AgentBackendStreamError,
AgentBackendTransportError,
AgentBackendValidationError,
AgentBackendWorkflowNodeRunInput,
DifyAgentBackendRunClient,
)
def _request():
return AgentBackendRunRequestBuilder().build_for_workflow_node(
AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
tenant_id="tenant-1",
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
),
execution_context=ExecutionContext(tenant_id="tenant-1", invoke_from="workflow_run"),
workflow_node_job_prompt="Do the task.",
user_prompt="hello",
)
)
class _SuccessfulClient:
def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse:
assert isinstance(request, CreateRunRequest)
return CreateRunResponse(run_id="run-1", status="running")
def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
del request
return CancelRunResponse(run_id=run_id, status="cancelled")
def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
del after
yield RunStartedEvent(id="1-0", run_id=run_id)
def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
del timeout_seconds
return RunStatusResponse.model_validate(
{
"run_id": run_id,
"status": "succeeded",
"created_at": "2026-01-01T00:00:00+00:00",
"updated_at": "2026-01-01T00:00:00+00:00",
}
)
def test_dify_agent_backend_run_client_delegates_sync_methods():
client = DifyAgentBackendRunClient(_SuccessfulClient())
created = client.create_run(_request())
cancelled = client.cancel_run(created.run_id)
events = list(client.stream_events(created.run_id))
status = client.wait_run(created.run_id)
assert created.run_id == "run-1"
assert cancelled.status == "cancelled"
assert events[0].type == "run_started"
assert status.status == "succeeded"
def test_dify_agent_backend_run_client_maps_validation_error():
class InvalidClient(_SuccessfulClient):
def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse:
raise DifyAgentValidationError(detail={"field": "bad"})
with pytest.raises(AgentBackendValidationError) as exc_info:
DifyAgentBackendRunClient(InvalidClient()).create_run(_request())
assert exc_info.value.detail == {"field": "bad"}
def test_dify_agent_backend_run_client_maps_http_error():
class HTTPErrorClient(_SuccessfulClient):
def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse:
raise DifyAgentHTTPError(status_code=503, detail="unavailable")
with pytest.raises(AgentBackendHTTPError) as exc_info:
DifyAgentBackendRunClient(HTTPErrorClient()).create_run(_request())
assert exc_info.value.status_code == 503
assert exc_info.value.detail == "unavailable"
def test_dify_agent_backend_run_client_maps_timeout_error():
class TimeoutClient(_SuccessfulClient):
def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
raise DifyAgentTimeoutError("timeout")
with pytest.raises(AgentBackendTransportError) as exc_info:
DifyAgentBackendRunClient(TimeoutClient()).wait_run("run-1")
assert str(exc_info.value) == "timeout"
def test_dify_agent_backend_run_client_maps_stream_error():
class StreamClient(_SuccessfulClient):
def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
raise DifyAgentStreamError("bad stream")
yield
with pytest.raises(AgentBackendStreamError) as exc_info:
list(DifyAgentBackendRunClient(StreamClient()).stream_events("run-1"))
assert str(exc_info.value) == "bad stream"

View File

@ -1,135 +0,0 @@
import pytest
from pydantic import ValidationError
from clients.agent_backend import (
CONTRACT_VERSION,
AgentExecutionContext,
AgentInvokeFrom,
AgentLayerType,
CompositorConfig,
PromptLayerConfig,
PromptOrigin,
PromptRole,
ReferenceType,
ResourceRef,
SecretBinding,
SecretsLayerConfig,
)
def _execution_context() -> AgentExecutionContext:
return AgentExecutionContext(
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
workflow_run_id="workflow-run-1",
node_id="node-1",
node_execution_id="node-execution-1",
invoke_from=AgentInvokeFrom.WORKFLOW_RUN,
)
def test_compositor_config_serializes_contract_shape():
config = CompositorConfig(
execution_context=_execution_context(),
layers=[
PromptLayerConfig(
id="agent-soul-prompt",
origin=PromptOrigin.AGENT_SOUL,
role=PromptRole.SYSTEM,
content="You are a helpful agent.",
),
PromptLayerConfig(
id="workflow-job-prompt",
origin=PromptOrigin.WORKFLOW_NODE_JOB,
role=PromptRole.USER,
content="Review the previous node output.",
depends_on=["agent-soul-prompt"],
),
],
)
dumped = config.model_dump(mode="json")
assert dumped["contract_version"] == CONTRACT_VERSION
assert dumped["execution_context"]["tenant_id"] == "tenant-1"
assert dumped["layers"][0]["type"] == AgentLayerType.PROMPT
assert dumped["layers"][0]["origin"] == PromptOrigin.AGENT_SOUL
assert dumped["layers"][1]["depends_on"] == ["agent-soul-prompt"]
def test_compositor_config_rejects_duplicate_layer_ids():
with pytest.raises(ValidationError, match="layer ids must be unique"):
CompositorConfig(
execution_context=_execution_context(),
layers=[
PromptLayerConfig(
id="prompt",
origin=PromptOrigin.AGENT_SOUL,
role=PromptRole.SYSTEM,
content="one",
),
PromptLayerConfig(
id="prompt",
origin=PromptOrigin.WORKFLOW_NODE_JOB,
role=PromptRole.USER,
content="two",
),
],
)
def test_compositor_config_rejects_unknown_layer_dependency():
with pytest.raises(ValidationError, match="depends on unknown layer ids"):
CompositorConfig(
execution_context=_execution_context(),
layers=[
PromptLayerConfig(
id="prompt",
origin=PromptOrigin.WORKFLOW_NODE_JOB,
role=PromptRole.USER,
content="two",
depends_on=["missing"],
),
],
)
def test_secret_binding_rejects_plaintext_secret_payload():
with pytest.raises(ValidationError):
SecretBinding.model_validate(
{
"secret_ref": {"type": "secret", "id": "secret-1"},
"env_name": "GITHUB_TOKEN",
"value": "plaintext-token",
}
)
def test_secret_binding_requires_secret_ref_type():
with pytest.raises(ValidationError, match="secret_ref must reference a secret"):
SecretBinding(
secret_ref=ResourceRef(type=ReferenceType.CREDENTIAL, id="credential-1"),
env_name="GITHUB_TOKEN",
)
def test_redacted_dump_hides_secret_and_credential_refs():
config = CompositorConfig(
execution_context=_execution_context(),
layers=[
SecretsLayerConfig(
id="secrets",
bindings=[
SecretBinding(
secret_ref=ResourceRef(type=ReferenceType.SECRET, id="secret-1"),
env_name="GITHUB_TOKEN",
)
],
)
],
)
redacted = config.model_dump_redacted()
assert redacted["layers"][0]["bindings"][0]["secret_ref"] == "[REDACTED]"

View File

@ -0,0 +1,132 @@
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol import (
PydanticAIStreamRunEvent,
RunCancelledEvent,
RunCancelledEventData,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
)
from pydantic_ai.messages import FinalResultEvent
from clients.agent_backend import (
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunStartedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamInternalEvent,
)
def test_event_adapter_maps_run_started():
adapted = AgentBackendRunEventAdapter().adapt(RunStartedEvent(id="1-0", run_id="run-1"))
assert adapted == [
AgentBackendRunStartedInternalEvent(
run_id="run-1",
source_event_id="1-0",
)
]
def test_event_adapter_maps_pydantic_ai_stream_event():
adapted = AgentBackendRunEventAdapter().adapt(
PydanticAIStreamRunEvent(
id="2-0",
run_id="run-1",
data=FinalResultEvent(tool_name=None, tool_call_id=None),
)
)
assert len(adapted) == 1
event = adapted[0]
assert isinstance(event, AgentBackendStreamInternalEvent)
assert event.type == AgentBackendInternalEventType.STREAM_EVENT
assert event.event_kind == "final_result"
assert event.data["event_kind"] == "final_result"
def test_event_adapter_maps_run_succeeded_to_final_output():
snapshot = CompositorSessionSnapshot(layers=[])
adapted = AgentBackendRunEventAdapter().adapt(
RunSucceededEvent(
id="3-0",
run_id="run-1",
data=RunSucceededEventData(output={"summary": "done"}, session_snapshot=snapshot),
)
)
assert adapted == [
AgentBackendRunSucceededInternalEvent(
run_id="run-1",
source_event_id="3-0",
output={"summary": "done"},
session_snapshot=snapshot,
)
]
def test_event_adapter_maps_run_failed_to_failed_result():
adapted = AgentBackendRunEventAdapter().adapt(
RunFailedEvent(
id="4-0",
run_id="run-1",
data=RunFailedEventData(error="boom", reason="runtime"),
)
)
assert adapted == [
AgentBackendRunFailedInternalEvent(
run_id="run-1",
source_event_id="4-0",
error="boom",
reason="runtime",
)
]
def test_event_adapter_maps_run_paused_to_resumable_pause():
snapshot = CompositorSessionSnapshot(layers=[])
adapted = AgentBackendRunEventAdapter().adapt(
RunPausedEvent(
id="5-0",
run_id="run-1",
data=RunPausedEventData(reason="human_handoff", message="Need review", session_snapshot=snapshot),
)
)
assert adapted == [
AgentBackendRunPausedInternalEvent(
run_id="run-1",
source_event_id="5-0",
reason="human_handoff",
message="Need review",
session_snapshot=snapshot,
)
]
def test_event_adapter_maps_run_cancelled_to_terminal_cancelled():
adapted = AgentBackendRunEventAdapter().adapt(
RunCancelledEvent(
id="6-0",
run_id="run-1",
data=RunCancelledEventData(reason="user_cancelled", message="Stopped by user"),
)
)
assert adapted == [
AgentBackendRunCancelledInternalEvent(
run_id="run-1",
source_event_id="6-0",
reason="user_cancelled",
message="Stopped by user",
)
]

View File

@ -1,115 +0,0 @@
import pytest
from clients.agent_backend import (
CONTRACT_VERSION,
AgentBackendEventParser,
AgentBackendEventType,
AgentErrorEvent,
AgentExecutionContext,
AgentFileCreatedEvent,
AgentInvokeFrom,
AgentLifecycleAckEvent,
AgentOutputCreatedEvent,
AgentOutputDeltaEvent,
AgentOutputValidationFailedEvent,
AgentPauseRequestedEvent,
AgentTextCompletedEvent,
AgentTextDeltaEvent,
AgentToolCallDeltaEvent,
AgentToolCallFailedEvent,
AgentToolCallStartedEvent,
AgentToolCallSucceededEvent,
)
from clients.agent_backend.errors import (
AgentBackendContractVersionError,
AgentBackendEventParseError,
AgentBackendUnknownEventError,
)
def _context_dict():
return AgentExecutionContext(tenant_id="tenant-1", invoke_from=AgentInvokeFrom.WORKFLOW_RUN).model_dump(mode="json")
def _raw_event(event_type: str, payload: dict, sequence: int = 1) -> dict:
return {
"contract_version": CONTRACT_VERSION,
"event_id": f"event-{sequence}",
"sequence": sequence,
"type": event_type,
"created_at": 1_800_000_000_000 + sequence,
"execution_context": _context_dict(),
"payload": payload,
}
@pytest.mark.parametrize(
("event_type", "payload", "expected_cls"),
[
(
AgentBackendEventType.LIFECYCLE,
{"lifecycle_event": "create", "lifecycle_reason": "workflow_run_start"},
AgentLifecycleAckEvent,
),
(AgentBackendEventType.TEXT_DELTA, {"delta": "hello"}, AgentTextDeltaEvent),
(AgentBackendEventType.TEXT_COMPLETED, {"text": "hello"}, AgentTextCompletedEvent),
(
AgentBackendEventType.TOOL_CALL_STARTED,
{"tool_call_id": "tool-1", "tool_name": "web_search"},
AgentToolCallStartedEvent,
),
(AgentBackendEventType.TOOL_CALL_DELTA, {"tool_call_id": "tool-1", "delta": "chunk"}, AgentToolCallDeltaEvent),
(
AgentBackendEventType.TOOL_CALL_SUCCEEDED,
{"tool_call_id": "tool-1", "output": {"ok": True}},
AgentToolCallSucceededEvent,
),
(
AgentBackendEventType.TOOL_CALL_FAILED,
{"tool_call_id": "tool-1", "error": "failed"},
AgentToolCallFailedEvent,
),
(
AgentBackendEventType.FILE_CREATED,
{"file_ref": {"type": "file", "id": "file-1"}},
AgentFileCreatedEvent,
),
(AgentBackendEventType.OUTPUT_DELTA, {"output_name": "summary", "delta": "part"}, AgentOutputDeltaEvent),
(AgentBackendEventType.OUTPUT_CREATED, {"output_name": "summary", "value": "done"}, AgentOutputCreatedEvent),
(
AgentBackendEventType.OUTPUT_VALIDATION_FAILED,
{"output_name": "report", "error": "not a pdf"},
AgentOutputValidationFailedEvent,
),
(
AgentBackendEventType.ERROR,
{"category": "runtime", "code": "boom", "message": "failed"},
AgentErrorEvent,
),
(AgentBackendEventType.PAUSE_REQUESTED, {"reason": "human_handoff"}, AgentPauseRequestedEvent),
],
)
def test_event_parser_parses_all_known_event_types(event_type, payload, expected_cls):
event = AgentBackendEventParser().parse(_raw_event(str(event_type), payload))
assert isinstance(event, expected_cls)
assert event.event_id == "event-1"
assert event.execution_context.tenant_id == "tenant-1"
def test_event_parser_rejects_unknown_event_type():
with pytest.raises(AgentBackendUnknownEventError):
AgentBackendEventParser().parse(_raw_event("unknown.event", {"value": "boom"}))
def test_event_parser_rejects_unknown_contract_version():
raw_event = _raw_event("text.delta", {"delta": "hello"})
raw_event["contract_version"] = "agent-backend.v999"
with pytest.raises(AgentBackendContractVersionError):
AgentBackendEventParser().parse(raw_event)
def test_event_parser_rejects_invalid_payload_shape():
with pytest.raises(AgentBackendEventParseError):
AgentBackendEventParser().parse(_raw_event("text.delta", {"text": "wrong"}))

View File

@ -0,0 +1,66 @@
from dify_agent.protocol import ExecutionContext
from clients.agent_backend import (
AgentBackendModelConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
FakeAgentBackendRunClient,
FakeAgentBackendScenario,
)
def _request():
return AgentBackendRunRequestBuilder().build_for_workflow_node(
AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
tenant_id="tenant-1",
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
),
execution_context=ExecutionContext(tenant_id="tenant-1", invoke_from="workflow_run"),
workflow_node_job_prompt="Do the task.",
user_prompt="hello",
)
)
def test_fake_client_stream_is_deterministic():
client = FakeAgentBackendRunClient()
request = _request()
created = client.create_run(request)
first = [event.model_dump(mode="json") for event in client.stream_events(created.run_id)]
second = [event.model_dump(mode="json") for event in client.stream_events(created.run_id)]
assert created.run_id == "fake-run-1"
assert client.request is request
assert first == second
assert [event["type"] for event in first] == ["run_started", "run_succeeded"]
assert first[-1]["data"]["output"] == {"text": "hello agent"}
def test_fake_client_stream_honors_cursor():
events = list(FakeAgentBackendRunClient().stream_events("fake-run-1", after="1-0"))
assert len(events) == 1
assert events[0].type == "run_succeeded"
def test_fake_client_failed_scenario_returns_failed_status_and_event():
client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.FAILED)
status = client.wait_run("fake-run-1")
events = list(client.stream_events("fake-run-1"))
assert status.status == "failed"
assert status.error == "fake failure"
assert events[-1].type == "run_failed"
assert events[-1].data.error == "fake failure"
def test_fake_client_cancel_run_returns_cancelled_status():
cancelled = FakeAgentBackendRunClient().cancel_run("fake-run-1")
assert cancelled.run_id == "fake-run-1"
assert cancelled.status == "cancelled"

View File

@ -1,80 +0,0 @@
from clients.agent_backend import (
AgentBackendLifecycleAck,
AgentBackendLifecycleRequest,
AgentBackendLifecycleSignal,
AgentExecutionContext,
AgentInvokeFrom,
AgentLifecycleEvent,
AgentLifecycleReason,
MockAgentBackendClient,
)
def _execution_context(invoke_from: AgentInvokeFrom = AgentInvokeFrom.WORKFLOW_RUN) -> AgentExecutionContext:
return AgentExecutionContext(tenant_id="tenant-1", invoke_from=invoke_from)
def test_lifecycle_signal_is_idempotent_shape():
signal = AgentBackendLifecycleSignal(
event=AgentLifecycleEvent.CREATE,
reason=AgentLifecycleReason.WORKFLOW_RUN_START,
execution_context=_execution_context(),
idempotency_key="workflow-run-1:create",
)
dumped = signal.model_dump(mode="json")
assert dumped == {
"event": "create",
"reason": "workflow_run_start",
"execution_context": {
"tenant_id": "tenant-1",
"app_id": None,
"workflow_id": None,
"workflow_run_id": None,
"node_id": None,
"node_execution_id": None,
"conversation_id": None,
"agent_id": None,
"agent_config_version_id": None,
"invoke_from": "workflow_run",
"trace_id": None,
},
"target_layer_ids": None,
"idempotency_key": "workflow-run-1:create",
}
def test_lifecycle_reason_mapping_covers_phase0_scenarios():
expected = {
AgentLifecycleReason.WORKFLOW_RUN_START,
AgentLifecycleReason.WORKFLOW_RUN_FINISH,
AgentLifecycleReason.SINGLE_STEP_START,
AgentLifecycleReason.BABYSIT_START,
AgentLifecycleReason.HUMAN_HANDOFF,
AgentLifecycleReason.WORKFLOW_HANDOFF,
AgentLifecycleReason.RESUME,
AgentLifecycleReason.FASTEN_PREVIEW,
AgentLifecycleReason.CANCEL,
}
assert set(AgentLifecycleReason) == expected
def test_mock_client_accepts_lifecycle_signal():
client = MockAgentBackendClient()
signal = AgentBackendLifecycleSignal(
event=AgentLifecycleEvent.DELETE,
reason=AgentLifecycleReason.CANCEL,
execution_context=_execution_context(AgentInvokeFrom.BABYSIT),
idempotency_key="babysit-1:cancel",
)
ack = client.send_lifecycle(AgentBackendLifecycleRequest(signal=signal))
assert ack == AgentBackendLifecycleAck(
accepted=True,
event=AgentLifecycleEvent.DELETE,
reason=AgentLifecycleReason.CANCEL,
idempotency_key="babysit-1:cancel",
)

View File

@ -1,75 +0,0 @@
import pytest
from clients.agent_backend import (
AgentBackendInvokeRequest,
AgentErrorEvent,
AgentExecutionContext,
AgentInvokeFrom,
AgentOutputCreatedEvent,
AgentPauseRequestedEvent,
AgentTextDeltaEvent,
CompositorConfig,
MockAgentBackendClient,
MockAgentBackendScenario,
PromptLayerConfig,
PromptOrigin,
PromptRole,
)
from clients.agent_backend.errors import AgentBackendUnknownEventError
def _request(mock_scenario: str | None = None) -> AgentBackendInvokeRequest:
config = CompositorConfig(
execution_context=AgentExecutionContext(tenant_id="tenant-1", invoke_from=AgentInvokeFrom.WORKFLOW_RUN),
layers=[
PromptLayerConfig(
id="prompt",
origin=PromptOrigin.AGENT_SOUL,
role=PromptRole.SYSTEM,
content="You are a helpful agent.",
)
],
)
if mock_scenario:
config.runtime_options.mock_scenario = mock_scenario
return AgentBackendInvokeRequest(compositor_config=config, idempotency_key="invoke-1")
def test_mock_client_success_text_stream_is_deterministic():
client = MockAgentBackendClient(scenario=MockAgentBackendScenario.SUCCESS_TEXT)
first = [event.model_dump(mode="json") for event in client.invoke(_request())]
second = [event.model_dump(mode="json") for event in client.invoke(_request())]
assert first == second
assert [event["sequence"] for event in first] == [1, 2, 3, 4, 5]
assert isinstance(next(client.invoke(_request())), object)
assert first[-1]["type"] == "output.created"
assert first[-1]["value"] == "hello agent"
def test_mock_client_can_select_scenario_from_request():
events = list(MockAgentBackendClient().invoke(_request(MockAgentBackendScenario.PAUSE)))
assert any(isinstance(event, AgentTextDeltaEvent) for event in events)
assert isinstance(events[-1], AgentPauseRequestedEvent)
def test_mock_client_error_event_does_not_raise_transport_error():
events = list(MockAgentBackendClient(scenario=MockAgentBackendScenario.ERROR).invoke(_request()))
assert len(events) == 1
assert isinstance(events[0], AgentErrorEvent)
assert events[0].code == "mock_error"
def test_mock_client_success_file_returns_output_event():
events = list(MockAgentBackendClient(scenario=MockAgentBackendScenario.SUCCESS_FILE).invoke(_request()))
assert isinstance(events[-1], AgentOutputCreatedEvent)
assert events[-1].output_name == "result_file"
def test_mock_client_unknown_event_scenario_exercises_parser_failure():
with pytest.raises(AgentBackendUnknownEventError):
list(MockAgentBackendClient(scenario=MockAgentBackendScenario.UNKNOWN_EVENT).invoke(_request()))

View File

@ -0,0 +1,132 @@
import pytest
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LAYER_TYPE_ID, DIFY_PLUGIN_LLM_LAYER_TYPE_ID
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID
from dify_agent.protocol import (
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
ExecutionContext,
)
from pydantic import ValidationError
from clients.agent_backend import (
AGENT_SOUL_PROMPT_LAYER_ID,
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
AgentBackendModelConfig,
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
redact_for_agent_backend_log,
)
def _run_input() -> AgentBackendWorkflowNodeRunInput:
return AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
tenant_id="tenant-1",
plugin_id="langgenius/openai",
user_id="user-1",
model_provider="openai",
model="gpt-test",
credentials={"api_key": "secret-key"},
),
execution_context=ExecutionContext(
tenant_id="tenant-1",
workflow_id="workflow-1",
workflow_run_id="workflow-run-1",
node_id="node-1",
node_execution_id="node-execution-1",
invoke_from="workflow_run",
),
idempotency_key="workflow-run-1:node-execution-1",
agent_soul_prompt="You are a careful reviewer.",
workflow_node_job_prompt="Review the previous node output.",
user_prompt="Summarize the report.",
output=AgentBackendOutputConfig(
json_schema={
"type": "object",
"properties": {"summary": {"type": "string"}},
"required": ["summary"],
}
),
metadata={"workflow_id": "workflow-1", "node_id": "node-1"},
)
def test_request_builder_outputs_dify_agent_create_run_request():
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
assert isinstance(request, CreateRunRequest)
assert [layer.name for layer in request.composition.layers] == [
AGENT_SOUL_PROMPT_LAYER_ID,
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
"plugin",
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
]
assert request.on_exit.default is ExitIntent.DELETE
assert request.execution_context is not None
assert request.execution_context.node_execution_id == "node-execution-1"
assert request.idempotency_key == "workflow-run-1:node-execution-1"
assert request.metadata == {"workflow_id": "workflow-1", "node_id": "node-1"}
def test_request_builder_separates_agent_soul_and_workflow_job_prompt():
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
layers = {layer.name: layer for layer in request.composition.layers}
assert layers[AGENT_SOUL_PROMPT_LAYER_ID].type == PLAIN_PROMPT_LAYER_TYPE_ID
assert layers[AGENT_SOUL_PROMPT_LAYER_ID].metadata["origin"] == "agent_soul"
assert layers[WORKFLOW_NODE_JOB_PROMPT_LAYER_ID].metadata["origin"] == "workflow_node_job"
assert layers[WORKFLOW_USER_PROMPT_LAYER_ID].metadata["origin"] == "workflow_user_prompt"
dumped = request.model_dump(mode="json")
assert dumped["composition"]["layers"][0]["config"]["prefix"] == "You are a careful reviewer."
assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Review the previous node output."
assert dumped["composition"]["layers"][2]["config"]["user"] == "Summarize the report."
def test_request_builder_sets_model_and_output_layer_contract_ids():
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
layers = {layer.name: layer for layer in request.composition.layers}
assert layers["plugin"].type == DIFY_PLUGIN_LAYER_TYPE_ID
assert layers[DIFY_AGENT_MODEL_LAYER_ID].type == DIFY_PLUGIN_LLM_LAYER_TYPE_ID
assert layers[DIFY_AGENT_MODEL_LAYER_ID].deps == {"plugin": "plugin"}
assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID
def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths():
run_input = _run_input()
run_input.suspend_on_exit = True
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
assert request.on_exit.default is ExitIntent.SUSPEND
def test_request_builder_rejects_blank_prompts():
with pytest.raises(ValidationError):
AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
tenant_id="tenant-1",
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
),
execution_context=ExecutionContext(tenant_id="tenant-1", invoke_from="workflow_run"),
workflow_node_job_prompt=" ",
user_prompt="hello",
)
def test_redact_for_agent_backend_log_hides_credentials():
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
redacted = redact_for_agent_backend_log(request)
assert redacted["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"

4
api/uv.lock generated
View File

@ -1332,6 +1332,7 @@ dependencies = [
{ name = "boto3" },
{ name = "celery" },
{ name = "croniter" },
{ name = "dify-agent" },
{ name = "fastopenapi", extra = ["flask"] },
{ name = "flask" },
{ name = "flask-compress" },
@ -1372,7 +1373,6 @@ dev = [
{ name = "boto3-stubs" },
{ name = "celery-types" },
{ name = "coverage" },
{ name = "dify-agent" },
{ name = "dotenv-linter" },
{ name = "faker" },
{ name = "hypothesis" },
@ -1615,6 +1615,7 @@ requires-dist = [
{ name = "boto3", specifier = ">=1.43.6" },
{ name = "celery", specifier = ">=5.6.3" },
{ name = "croniter", specifier = ">=6.2.2" },
{ name = "dify-agent", directory = "../dify-agent" },
{ name = "fastopenapi", extras = ["flask"], specifier = "~=0.7.0" },
{ name = "flask", specifier = ">=3.1.3,<4.0.0" },
{ name = "flask-compress", specifier = ">=1.24,<2.0.0" },
@ -1655,7 +1656,6 @@ dev = [
{ name = "boto3-stubs", specifier = ">=1.43.2" },
{ name = "celery-types", specifier = ">=0.23.0" },
{ name = "coverage", specifier = ">=7.13.4" },
{ name = "dify-agent", directory = "../dify-agent" },
{ name = "dotenv-linter", specifier = ">=0.7.0" },
{ name = "faker", specifier = ">=40.15.0" },
{ name = "hypothesis", specifier = ">=6.152.4" },

View File

@ -23,6 +23,8 @@ import httpx
from pydantic import BaseModel, ValidationError
from dify_agent.protocol.schemas import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
RUN_EVENT_ADAPTER,
@ -32,8 +34,8 @@ from dify_agent.protocol.schemas import (
)
_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel)
_TERMINAL_EVENT_TYPES = {"run_succeeded", "run_failed"}
_TERMINAL_RUN_STATUSES = {"succeeded", "failed"}
_TERMINAL_EVENT_TYPES = {"run_succeeded", "run_failed", "run_cancelled"}
_TERMINAL_RUN_STATUSES = {"succeeded", "failed", "cancelled"}
class DifyAgentClientError(RuntimeError):
@ -279,6 +281,42 @@ class Client:
raise DifyAgentClientError(f"create_run_sync request failed: {exc}") from exc
return _parse_model_response(response, CreateRunResponse)
async def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Request explicit cancellation for ``run_id``.
The server may accept cancellation only for active runs; unsupported
deployments return an HTTP error rather than overloading ``run_failed``.
"""
request_model = request or CancelRunRequest()
try:
response = await self._get_async_http_client().post(
self._url(f"/runs/{quote(run_id, safe='')}/cancel"),
content=request_model.model_dump_json(),
headers=self._merged_headers({"Content-Type": "application/json"}),
timeout=self._timeout,
)
except httpx.TimeoutException as exc:
raise DifyAgentTimeoutError("cancel_run timed out") from exc
except httpx.RequestError as exc:
raise DifyAgentClientError(f"cancel_run request failed: {exc}") from exc
return _parse_model_response(response, CancelRunResponse)
def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Synchronous variant of ``cancel_run``."""
request_model = request or CancelRunRequest()
try:
response = self._get_sync_http_client().post(
self._url(f"/runs/{quote(run_id, safe='')}/cancel"),
content=request_model.model_dump_json(),
headers=self._merged_headers({"Content-Type": "application/json"}),
timeout=self._timeout,
)
except httpx.TimeoutException as exc:
raise DifyAgentTimeoutError("cancel_run_sync timed out") from exc
except httpx.RequestError as exc:
raise DifyAgentClientError(f"cancel_run_sync request failed: {exc}") from exc
return _parse_model_response(response, CancelRunResponse)
async def get_run(self, run_id: str) -> RunStatusResponse:
"""Return the current status for ``run_id`` or raise a mapped client error."""
try:

View File

@ -5,17 +5,26 @@ from .schemas import (
DIFY_AGENT_OUTPUT_LAYER_ID,
RUN_EVENT_ADAPTER,
BaseRunEvent,
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
EmptyRunEventData,
ExecutionContext,
InvokeFrom,
LayerExitSignals,
PydanticAIStreamRunEvent,
RunCancelledEvent,
RunCancelledEventData,
RunEvent,
RunComposition,
RunEventType,
RunEventsResponse,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunPurpose,
RunLayerSpec,
RunStartedEvent,
RunStatus,
@ -28,20 +37,29 @@ from .schemas import (
__all__ = [
"BaseRunEvent",
"CancelRunRequest",
"CancelRunResponse",
"CreateRunRequest",
"CreateRunResponse",
"DIFY_AGENT_MODEL_LAYER_ID",
"DIFY_AGENT_OUTPUT_LAYER_ID",
"EmptyRunEventData",
"ExecutionContext",
"InvokeFrom",
"LayerExitSignals",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunCancelledEvent",
"RunCancelledEventData",
"RunComposition",
"RunEvent",
"RunEventType",
"RunEventsResponse",
"RunFailedEvent",
"RunFailedEventData",
"RunPausedEvent",
"RunPausedEventData",
"RunPurpose",
"RunLayerSpec",
"RunStartedEvent",
"RunStatus",

View File

@ -43,12 +43,16 @@ from agenton.layers import ExitIntent
DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm"
DIFY_AGENT_OUTPUT_LAYER_ID: Final[str] = "output"
RunStatus = Literal["running", "succeeded", "failed"]
RunStatus = Literal["running", "paused", "succeeded", "failed", "cancelled"]
RunPurpose = Literal["workflow_node", "single_step", "agent_app", "babysit", "fasten_preview"]
InvokeFrom = Literal["workflow_run", "single_step", "agent_app", "babysit", "fasten"]
RunEventType = Literal[
"run_started",
"pydantic_ai_event",
"run_paused",
"run_succeeded",
"run_failed",
"run_cancelled",
]
@ -100,6 +104,29 @@ class RunComposition(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class ExecutionContext(BaseModel):
"""Dify-owned execution identifiers attached to one Agent backend run.
The Agent backend stores and replays this context for observability and
product correlation only. It must not use these identifiers as authorization
proof; API backend remains responsible for tenant and user access checks.
"""
tenant_id: str
app_id: str | None = None
workflow_id: str | None = None
workflow_run_id: str | None = None
node_id: str | None = None
node_execution_id: str | None = None
conversation_id: str | None = None
agent_id: str | None = None
agent_config_version_id: str | None = None
invoke_from: InvokeFrom
trace_id: str | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class CreateRunRequest(BaseModel):
"""Request body for creating one async agent run.
@ -115,12 +142,30 @@ class CreateRunRequest(BaseModel):
"""
composition: RunComposition
execution_context: ExecutionContext | None = None
purpose: RunPurpose = "workflow_node"
idempotency_key: str | None = None
metadata: dict[str, JsonValue] = Field(default_factory=dict)
session_snapshot: CompositorSessionSnapshot | None = None
on_exit: LayerExitSignals = Field(default_factory=LayerExitSignals)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class CancelRunRequest(BaseModel):
"""Request body for cancelling a run.
Runtime cancellation is intentionally a separate protocol operation from
failed execution so API callers can distinguish user/operator cancellation
from model, tool, or infrastructure failures.
"""
reason: str | None = None
message: str | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
def normalize_composition(composition: RunComposition) -> tuple[CompositorConfig, dict[str, LayerConfigInput]]:
"""Split public Dify composition into Agenton's graph config and layer configs.
@ -159,6 +204,15 @@ class CreateRunResponse(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class CancelRunResponse(BaseModel):
"""Response returned after a cancel request is accepted."""
run_id: str
status: Literal["cancelled"]
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunStatusResponse(BaseModel):
"""Current server-side status for one run."""
@ -195,6 +249,25 @@ class RunFailedEventData(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunPausedEventData(BaseModel):
"""Pause payload used for human handoff or other resumable waits."""
reason: str
message: str | None = None
session_snapshot: CompositorSessionSnapshot | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunCancelledEventData(BaseModel):
"""Terminal cancellation payload for explicit user/operator cancellation."""
reason: str | None = None
message: str | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class BaseRunEvent(BaseModel):
"""Shared append-only event envelope visible through polling and SSE."""
@ -233,8 +306,27 @@ class RunFailedEvent(BaseRunEvent):
data: RunFailedEventData
class RunPausedEvent(BaseRunEvent):
"""Resumable pause event emitted when a run waits for outside input."""
type: Literal["run_paused"] = "run_paused"
data: RunPausedEventData
class RunCancelledEvent(BaseRunEvent):
"""Terminal cancellation event emitted after an explicit cancel request."""
type: Literal["run_cancelled"] = "run_cancelled"
data: RunCancelledEventData = Field(default_factory=RunCancelledEventData)
RunEvent: TypeAlias = Annotated[
RunStartedEvent | PydanticAIStreamRunEvent | RunSucceededEvent | RunFailedEvent,
RunStartedEvent
| PydanticAIStreamRunEvent
| RunPausedEvent
| RunSucceededEvent
| RunFailedEvent
| RunCancelledEvent,
Field(discriminator="type"),
]
RUN_EVENT_ADAPTER: TypeAdapter[RunEvent] = TypeAdapter(RunEvent)
@ -252,20 +344,29 @@ class RunEventsResponse(BaseModel):
__all__ = [
"BaseRunEvent",
"CancelRunRequest",
"CancelRunResponse",
"CreateRunRequest",
"CreateRunResponse",
"DIFY_AGENT_MODEL_LAYER_ID",
"DIFY_AGENT_OUTPUT_LAYER_ID",
"EmptyRunEventData",
"ExecutionContext",
"InvokeFrom",
"LayerExitSignals",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunCancelledEvent",
"RunCancelledEventData",
"RunComposition",
"RunEvent",
"RunEventType",
"RunEventsResponse",
"RunFailedEvent",
"RunFailedEventData",
"RunPausedEvent",
"RunPausedEventData",
"RunPurpose",
"RunStartedEvent",
"RunStatus",
"RunStatusResponse",

View File

@ -13,7 +13,14 @@ from typing import Annotated
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from fastapi.responses import StreamingResponse
from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse
from dify_agent.protocol.schemas import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
RunEventsResponse,
RunStatusResponse,
)
from dify_agent.runtime.run_scheduler import RunRequestValidationError, RunScheduler, SchedulerStoppingError
from dify_agent.server.sse import sse_event_stream
from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError
@ -59,6 +66,18 @@ def create_runs_router(
error=record.error,
)
@router.post("/{run_id}/cancel", response_model=CancelRunResponse)
async def cancel_run(run_id: str, request: CancelRunRequest) -> CancelRunResponse:
"""Reserve the cancellation endpoint in the public protocol.
Runtime cancellation requires scheduler task lookup and persistence
semantics that are outside the current server implementation. Exposing a
typed endpoint now lets clients bind to the final route while receiving
an explicit 501 until execution support lands.
"""
del run_id, request
raise HTTPException(status_code=501, detail="run cancellation is not implemented")
@router.get("/{run_id}/events", response_model=RunEventsResponse)
async def get_run_events(
run_id: str,

View File

@ -20,8 +20,11 @@ from dify_agent.client import (
DifyAgentValidationError,
)
from dify_agent.protocol.schemas import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
RUN_EVENT_ADAPTER,
RunCancelledEvent,
RunEvent,
RunEventsResponse,
RunStartedEvent,
@ -97,6 +100,10 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None:
"next_cursor": "1-0",
},
)
if request.method == "POST" and request.url.path == "/runs/run-1/cancel":
payload = cast(dict[str, object], json.loads(request.content))
assert payload == {"reason": "user_cancelled", "message": None}
return httpx.Response(202, json={"run_id": "run-1", "status": "cancelled"})
raise AssertionError(f"unexpected request: {request.method} {request.url}")
http_client = httpx.Client(transport=httpx.MockTransport(handler))
@ -105,11 +112,14 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None:
created = client.create_run_sync(CreateRunRequest.model_validate(_create_run_payload()))
status = client.get_run_sync(created.run_id)
events = client.get_events_sync(created.run_id, after="0-0", limit=10)
cancelled = client.cancel_run_sync(created.run_id, CancelRunRequest(reason="user_cancelled"))
assert created.status == "running"
assert status.status == "running"
assert isinstance(events, RunEventsResponse)
assert [event.type for event in events.events] == ["run_started"]
assert isinstance(cancelled, CancelRunResponse)
assert cancelled.status == "cancelled"
def test_async_methods_and_wait_run_parse_protocol_dtos() -> None:
@ -251,6 +261,31 @@ def test_stream_events_stops_after_terminal_event() -> None:
assert calls == 1
def test_stream_events_stops_after_cancelled_terminal_event() -> None:
calls = 0
body = "".join(
[
_event_frame(RunStartedEvent(id="1-0", run_id="run-1")),
_event_frame(RunCancelledEvent(id="2-0", run_id="run-1")),
]
)
def handler(_request: httpx.Request) -> httpx.Response:
nonlocal calls
calls += 1
return httpx.Response(200, content=body)
client = Client(
base_url="http://testserver",
sync_http_client=httpx.Client(transport=httpx.MockTransport(handler)),
)
events = list(client.stream_events_sync("run-1", reconnect_delay_seconds=0))
assert [event.type for event in events] == ["run_started", "run_cancelled"]
assert calls == 1
def test_stream_events_reconnects_from_latest_event_id() -> None:
seen_after: list[str] = []

View File

@ -12,12 +12,17 @@ from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAY
from dify_agent.protocol.schemas import (
RUN_EVENT_ADAPTER,
CreateRunRequest,
ExecutionContext,
LayerExitSignals,
PydanticAIStreamRunEvent,
RunCancelledEvent,
RunCancelledEventData,
RunComposition,
RunFailedEvent,
RunFailedEventData,
RunLayerSpec,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
@ -38,6 +43,15 @@ def test_run_event_adapter_round_trips_typed_variants() -> None:
),
),
RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")),
RunPausedEvent(
run_id="run-1",
data=RunPausedEventData(
reason="human_handoff",
message="Need review",
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
RunCancelledEvent(run_id="run-1", data=RunCancelledEventData(reason="user_cancelled")),
]
for event in events:
@ -89,6 +103,18 @@ def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_
}
)
request = CreateRunRequest(
execution_context=ExecutionContext(
tenant_id="tenant-1",
workflow_id="workflow-1",
workflow_run_id="workflow-run-1",
node_id="node-1",
node_execution_id="node-execution-1",
invoke_from="workflow_run",
trace_id="trace-1",
),
purpose="workflow_node",
idempotency_key="workflow-run-1:node-execution-1",
metadata={"source": "unit_test"},
composition=RunComposition(
layers=[
RunLayerSpec(name="prompt", type=PLAIN_PROMPT_LAYER_TYPE_ID, config=prompt_config),
@ -111,6 +137,22 @@ def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_
graph_config, layer_configs = normalize_composition(request.composition)
payload = request.model_dump(mode="json")
assert payload["execution_context"] == {
"tenant_id": "tenant-1",
"app_id": None,
"workflow_id": "workflow-1",
"workflow_run_id": "workflow-run-1",
"node_id": "node-1",
"node_execution_id": "node-execution-1",
"conversation_id": None,
"agent_id": None,
"agent_config_version_id": None,
"invoke_from": "workflow_run",
"trace_id": "trace-1",
}
assert payload["purpose"] == "workflow_node"
assert payload["idempotency_key"] == "workflow-run-1:node-execution-1"
assert payload["metadata"] == {"source": "unit_test"}
assert payload["composition"]["layers"][0]["config"] == {"prefix": "system", "user": "hello", "suffix": []}
assert [layer.model_dump(mode="json") for layer in graph_config.layers] == [
{"name": "prompt", "type": PLAIN_PROMPT_LAYER_TYPE_ID, "deps": {}, "metadata": {}},
@ -163,6 +205,17 @@ def test_on_exit_accept_layer_overrides() -> None:
assert request.on_exit.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE}
def test_execution_context_rejects_unknown_fields() -> None:
with pytest.raises(ValidationError):
_ = ExecutionContext.model_validate(
{
"tenant_id": "tenant-1",
"invoke_from": "workflow_run",
"unknown": "value",
}
)
def test_layer_exit_signals_reject_extra_fields() -> None:
with pytest.raises(ValidationError):
_ = LayerExitSignals.model_validate({"default": "suspend", "unknown": "value"})

View File

@ -67,6 +67,21 @@ def test_create_run_returns_running_from_scheduler() -> None:
assert response.json() == {"run_id": "run-1", "status": "running"}
def test_cancel_run_endpoint_is_reserved_but_not_implemented() -> None:
from fastapi import FastAPI
app = FastAPI()
app.include_router(
create_runs_router(lambda: FakeStore(), lambda: FakeScheduler()) # pyright: ignore[reportArgumentType]
)
client = TestClient(app)
response = client.post("/runs/run-1/cancel", json={"reason": "user_cancelled"})
assert response.status_code == 501
assert response.json()["detail"] == "run cancellation is not implemented"
def test_create_run_accepts_valid_full_plugin_graph() -> None:
from fastapi import FastAPI