Compare commits

...

2 Commits

29 changed files with 1455 additions and 214 deletions

View File

@ -98,35 +98,16 @@ uv run --extra server uvicorn dify_agent.server.app:app \
`ServerSettings` reads `.env` from the current `dify-agent` directory, or from
`dify-agent/.env` when the command is run from the repository root.
## Create a one-file uv script client
## Create a Python client example
In another shell, keep working from the `dify-agent` directory and create this
script. The script depends on the local `dify-agent` package only; it does not
install the server extra because it talks to the already running server through
the public Python client.
```bash
DIFY_AGENT_PACKAGE_URL="$(python3 - <<'PY'
from pathlib import Path
print(Path.cwd().resolve().as_uri())
PY
)"
cat > ./run_dify_agent_client.py <<PY
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "dify-agent @ ${DIFY_AGENT_PACKAGE_URL}",
# ]
# ///
In another shell, keep working from the `dify-agent` directory. Create
`run_dify_agent_client.py` with the example below, then replace the placeholder
tenant id and provider credential values.
```python {test="skip" lint="skip"}
import asyncio
import json
import os
import sys
from typing import Any
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from dify_agent.client import Client
@ -139,55 +120,39 @@ from dify_agent.layers.dify_plugin import (
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec
def env(name: str, default: str | None = None) -> str:
value = os.environ.get(name, default)
if value is None or value == "":
raise SystemExit(f"Missing required environment variable: {name}")
return value
API_BASE_URL = "http://127.0.0.1:8000"
TENANT_ID = "replace-with-tenant-id"
PLUGIN_ID = "langgenius/openai"
USER_ID: str | None = None
# Keep these aligned with DIFY_AGENT_PROVIDER and DIFY_AGENT_MODEL_NAME in dify-agent/.env.
MODEL_PROVIDER = "replace-with-provider-from-dify-agent-env"
MODEL_NAME = "replace-with-model-from-dify-agent-env"
MODEL_CREDENTIALS: dict[str, str | int | float | bool | None] = {
"api_key": "replace-with-provider-key",
}
SYSTEM_PROMPT = "You are a concise assistant."
USER_PROMPT = "用一句话介绍 Dify Agent。"
def load_credentials() -> dict[str, Any]:
    """Decode the provider credentials held in ``DIFY_AGENT_MODEL_CREDENTIALS_JSON``.

    The raw text is obtained through ``env`` and parsed as JSON.

    Returns:
        The decoded JSON object.

    Raises:
        SystemExit: If the text is not valid JSON, or if the decoded value is
            not a JSON object.
    """
    payload = env("DIFY_AGENT_MODEL_CREDENTIALS_JSON")
    try:
        credentials = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"DIFY_AGENT_MODEL_CREDENTIALS_JSON must be valid JSON: {exc}") from exc
    # Only a JSON object is accepted; lists, strings and numbers are rejected.
    if isinstance(credentials, dict):
        return credentials
    raise SystemExit("DIFY_AGENT_MODEL_CREDENTIALS_JSON must be a JSON object")
async def main() -> int:
api_base_url = env("DIFY_AGENT_SERVER_URL", "http://127.0.0.1:8000")
tenant_id = env("DIFY_AGENT_TENANT_ID")
plugin_id = env("DIFY_AGENT_PLUGIN_ID", "langgenius/openai")
user_id = os.environ.get("DIFY_AGENT_USER_ID") or None
model_provider = env("DIFY_AGENT_PROVIDER", "openai")
model_name = env("DIFY_AGENT_MODEL_NAME", "gpt-4o-mini")
model_credentials = load_credentials()
system_prompt = env("DIFY_AGENT_SYSTEM_PROMPT", "You are a concise assistant.")
user_prompt = env("DIFY_AGENT_PROMPT", "Say hello from the Dify Agent client.")
request = CreateRunRequest(
def build_request() -> CreateRunRequest:
return CreateRunRequest(
composition=RunComposition(
layers=[
RunLayerSpec(
name="prompt",
type=PLAIN_PROMPT_LAYER_TYPE_ID,
config=PromptLayerConfig(prefix=system_prompt, user=user_prompt),
config=PromptLayerConfig(prefix=SYSTEM_PROMPT, user=USER_PROMPT),
),
RunLayerSpec(
name="plugin",
type=DIFY_PLUGIN_LAYER_TYPE_ID,
config=DifyPluginLayerConfig(
tenant_id=tenant_id,
plugin_id=plugin_id,
user_id=user_id,
tenant_id=TENANT_ID,
plugin_id=PLUGIN_ID,
user_id=USER_ID,
),
),
RunLayerSpec(
@ -195,17 +160,19 @@ async def main() -> int:
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
model_provider=model_provider,
model=model_name,
credentials=model_credentials,
model_provider=MODEL_PROVIDER,
model=MODEL_NAME,
credentials=MODEL_CREDENTIALS,
),
),
],
),
)
async with Client(base_url=api_base_url, stream_timeout=None) as client:
run = await client.create_run(request)
async def main() -> int:
async with Client(base_url=API_BASE_URL, stream_timeout=None) as client:
run = await client.create_run(build_request())
print(f"created run: {run.run_id}, status={run.status}")
async for event in client.stream_events(run.run_id):
@ -225,35 +192,26 @@ async def main() -> int:
if __name__ == "__main__":
raise SystemExit(asyncio.run(main()))
PY
chmod +x ./run_dify_agent_client.py
```
## Configure the client request and run it
## Run the client example
The server-side `.env` controls how Dify Agent reaches the plugin daemon. The
client request controls which tenant/plugin/provider/model and provider
credentials the run uses. Configure those values before executing the script:
client example controls which tenant/plugin/provider/model and provider
credentials the run uses.
Run the example from the `dify-agent` directory:
```bash
export DIFY_AGENT_SERVER_URL=http://127.0.0.1:8000
export DIFY_AGENT_TENANT_ID=replace-with-tenant-id
export DIFY_AGENT_PLUGIN_ID=langgenius/openai
export DIFY_AGENT_PROVIDER=openai
export DIFY_AGENT_MODEL_NAME=gpt-4o-mini
export DIFY_AGENT_MODEL_CREDENTIALS_JSON='{"api_key":"replace-with-provider-key"}'
export DIFY_AGENT_PROMPT='用一句话介绍 Dify Agent。'
./run_dify_agent_client.py
uv run python ./run_dify_agent_client.py
```
The shape of `DIFY_AGENT_MODEL_CREDENTIALS_JSON` depends on the selected plugin
provider's credential schema. The `{"api_key":"..."}` value above is only an
OpenAI-style example.
The shape of `MODEL_CREDENTIALS` depends on the selected plugin provider's
credential schema. The `{"api_key":"..."}` value above is only an OpenAI-style
example.
Set `MODEL_PROVIDER` and `MODEL_NAME` to the same values as
`DIFY_AGENT_PROVIDER` and `DIFY_AGENT_MODEL_NAME` in `dify-agent/.env`.
## Troubleshooting
@ -263,6 +221,7 @@ If the run fails, check these items first:
2. The Dify Agent server is listening on `127.0.0.1:8000`.
3. `DIFY_AGENT_PLUGIN_DAEMON_URL` points to the correct plugin daemon.
4. `DIFY_AGENT_PLUGIN_DAEMON_API_KEY` matches the plugin daemon server key.
5. `DIFY_AGENT_PLUGIN_ID`, `DIFY_AGENT_PROVIDER`, and
`DIFY_AGENT_MODEL_NAME` match a provider available through the plugin daemon.
6. `DIFY_AGENT_MODEL_CREDENTIALS_JSON` matches that provider's credential schema.
5. `PLUGIN_ID`, `MODEL_PROVIDER`, and `MODEL_NAME` in the client example match
the corresponding values configured in `dify-agent/.env` and a provider
available through the plugin daemon.
6. `MODEL_CREDENTIALS` matches that provider's credential schema.

View File

@ -0,0 +1,103 @@
# History layer
The history layer stores pydantic-ai conversation history in the Agenton session
snapshot. Add it when a later run should resume the previous conversation.
The history layer is state-only: it contributes no prompt text, user prompt, or
tools, and it owns no live resources.
## Layer contract
| Property | Value |
| --- | --- |
| Reserved layer name | `history` |
| Type id | `pydantic_ai.history` |
| Config | none |
| Dependencies | none |
Use at most one history layer. It must be named `history` and must not declare
dependencies.
## Basic usage
```python {test="skip" lint="skip"}
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, RunLayerSpec
history_layer = RunLayerSpec(
name=DIFY_AGENT_HISTORY_LAYER_ID,
type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
)
```
Include this layer in the same composition as your prompt, plugin, and LLM
layers.
## Resume a conversation
Successful runs return a terminal event with both final output and a resumable
session snapshot:
```python {test="skip" lint="skip"}
accepted = await client.create_run(request)
async for event in client.stream_events(accepted.run_id):
if event.type == "run_succeeded":
output = event.data.output
snapshot = event.data.session_snapshot
break
```
Pass `snapshot` to the next request and keep the same layer names and order:
```python {test="skip" lint="skip"}
next_request = CreateRunRequest(
composition=composition_with_the_same_layer_names_and_order,
session_snapshot=snapshot,
)
```
`CreateRunRequest.on_exit` defaults to suspending layers, which makes the
terminal snapshot resumable. Keep that default for normal memory flows.
## What gets stored
Dify Agent handles memory conservatively:
1. Current system prompts are rendered into a temporary `message_history` prefix
that is placed before the stored history.
2. Stored history is then sent to the model.
3. Current user prompts are sent after the stored history.
4. Only newly produced pydantic-ai messages are appended after a successful run.
5. Current system prompts are not persisted into the history layer.
6. Failed runs emit `run_failed` and do not return a success snapshot to resume.
## Persist snapshots outside the client process
Session snapshots are Pydantic models and can be saved as JSON:
```python {test="skip" lint="skip"}
from pathlib import Path
from agenton.compositor import CompositorSessionSnapshot
snapshot_path = Path("session_snapshot.json")
snapshot_path.write_text(snapshot.model_dump_json(), encoding="utf-8")
restored_snapshot = CompositorSessionSnapshot.model_validate_json(
snapshot_path.read_text(encoding="utf-8")
)
```
Always restore snapshots with the same layer names and order that produced them.
## Troubleshooting
| Symptom | What to check |
| --- | --- |
| `must use reserved layer name 'history'` | Rename the layer to `history`. |
| `does not support dependencies` | Remove `deps` from the history layer. |
| Resume fails with snapshot lifecycle errors | Use the success snapshot from `run_succeeded` and keep layer names/order unchanged. |
| System prompts appear missing from saved memory | This is expected; current system prompts are temporary and are not persisted. |

View File

@ -0,0 +1,59 @@
# Plugin layer
The plugin layer carries Dify plugin daemon identity for a run. It identifies the
tenant, plugin, and optional user context; server settings provide the plugin
daemon URL and API key.
Use it together with a [plugin LLM layer](../plugin-llm-layer/index.md). The LLM
layer depends on this layer to reach the plugin daemon.
## Config fields
| Field | Type | Meaning |
| --- | --- | --- |
| `tenant_id` | `str` | Dify tenant/workspace id used when calling the plugin daemon. |
| `plugin_id` | `str` | Plugin id, for example `langgenius/openai`. |
| `user_id` | `str \| None` | Optional end-user id passed through to the plugin daemon. |
The plugin layer type id is `dify.plugin`.
## Basic usage
```python {test="skip" lint="skip"}
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LAYER_TYPE_ID, DifyPluginLayerConfig
from dify_agent.protocol import RunLayerSpec
plugin_layer = RunLayerSpec(
name="plugin",
type=DIFY_PLUGIN_LAYER_TYPE_ID,
config=DifyPluginLayerConfig(
tenant_id="replace-with-tenant-id",
plugin_id="langgenius/openai",
user_id="replace-with-user-id",
),
)
```
If you do not need a user id, omit `user_id` or pass `None`.
## Server-side settings
The plugin layer config does not include daemon transport settings. Configure
these on the Dify Agent server instead:
```env
DIFY_AGENT_PLUGIN_DAEMON_URL=http://localhost:5002
DIFY_AGENT_PLUGIN_DAEMON_API_KEY=replace-with-plugin-daemon-server-key
```
This keeps server credentials out of client-submitted layer config and out of
session snapshots.
## Notes
- The plugin layer does not open, cache, close, or snapshot HTTP clients.
- `plugin_id` selects the plugin package. The model provider and model name
belong to the plugin LLM layer, not this layer.
- The conventional layer name is `plugin`. If you use another name, point the LLM
layer dependency at that name.

View File

@ -0,0 +1,102 @@
# Plugin LLM layer
The plugin LLM layer selects the model provider, model name, provider credentials,
and optional model settings for the current run. Dify Agent reads the model from
the reserved layer name `llm`.
It must depend on a [plugin layer](../plugin-layer/index.md), because the plugin
layer supplies the daemon identity and transport context.
## Config fields
| Field | Type | Meaning |
| --- | --- | --- |
| `model_provider` | `str` | Provider name inside the selected plugin. Use the value of `DIFY_AGENT_PROVIDER` from `dify-agent/.env`. |
| `model` | `str` | Model name. Use the value of `DIFY_AGENT_MODEL_NAME` from `dify-agent/.env`. |
| `credentials` | `dict[str, str \| int \| float \| bool \| None]` | Provider-specific credential object. |
| `model_settings` | `ModelSettings \| None` | Optional pydantic-ai model settings. |
The plugin LLM layer type id is `dify.plugin.llm`.
## Basic usage
```python {test="skip" lint="skip"}
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DifyPluginLLMLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, RunLayerSpec
MODEL_PROVIDER = "replace-with-provider-from-dify-agent-env"
MODEL_NAME = "replace-with-model-from-dify-agent-env"
llm_layer = RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
model_provider=MODEL_PROVIDER,
model=MODEL_NAME,
credentials={"api_key": "replace-with-provider-key"},
),
)
```
`deps={"plugin": "plugin"}` means: bind the LLM layer's dependency field named
`plugin` to the composition layer named `plugin`.
Set `MODEL_PROVIDER` and `MODEL_NAME` to the same values as
`DIFY_AGENT_PROVIDER` and `DIFY_AGENT_MODEL_NAME` in `dify-agent/.env`.
## Complete minimal model composition
Most runs include a prompt, plugin context, and LLM layer:
```python {test="skip" lint="skip"}
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LAYER_TYPE_ID,
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, RunComposition, RunLayerSpec
MODEL_PROVIDER = "replace-with-provider-from-dify-agent-env"
MODEL_NAME = "replace-with-model-from-dify-agent-env"
composition = RunComposition(
layers=[
RunLayerSpec(
name="prompt",
type=PLAIN_PROMPT_LAYER_TYPE_ID,
config=PromptLayerConfig(prefix="You are concise.", user="Say hello."),
),
RunLayerSpec(
name="plugin",
type=DIFY_PLUGIN_LAYER_TYPE_ID,
config=DifyPluginLayerConfig(
tenant_id="replace-with-tenant-id",
plugin_id="langgenius/openai",
),
),
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
model_provider=MODEL_PROVIDER,
model=MODEL_NAME,
credentials={"api_key": "replace-with-provider-key"},
),
),
]
)
```
## Notes
- The model layer must use the reserved name `llm` (`DIFY_AGENT_MODEL_LAYER_ID`).
- Credential shape depends on the selected plugin provider; the OpenAI-style
`api_key` field above is only an example.
- Client-submitted model credentials are kept in memory with the scheduled
request and are not stored in run records or session snapshots.

View File

@ -0,0 +1,72 @@
# Prompt layer
The prompt layer provides the current run's system and user prompt fragments. In
Dify Agent request bodies it is a regular `RunLayerSpec` with type id
`plain.prompt`.
Use it for:
- system instructions that should be sent on this run
- the current user input
- optional suffix system instructions
## Config fields
| Field | Type | Meaning |
| --- | --- | --- |
| `prefix` | `str` or `list[str]` | System prompt fragments collected before other prompt content. |
| `user` | `str` or `list[str]` | Current user-message fragments for the run. |
| `suffix` | `str` or `list[str]` | System prompt fragments collected after prefix content. |
All fields default to an empty list. Dify Agent rejects a create-run request when
the effective user prompt is empty or whitespace-only.
## Basic usage
```python {test="skip" lint="skip"}
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from dify_agent.protocol import RunLayerSpec
prompt_layer = RunLayerSpec(
name="prompt",
type=PLAIN_PROMPT_LAYER_TYPE_ID,
config=PromptLayerConfig(
prefix="You are a concise assistant.",
user="Summarize the incident in one paragraph.",
),
)
```
## Multiple prompt fragments
Use lists when the caller wants to keep fragments separate while still sending one
run:
```python {test="skip" lint="skip"}
prompt_layer = RunLayerSpec(
name="prompt",
type=PLAIN_PROMPT_LAYER_TYPE_ID,
config=PromptLayerConfig(
prefix=[
"You are an incident response assistant.",
"Prefer concrete mitigation steps.",
],
user=[
"Database latency is elevated.",
"Return the likely severity and next actions.",
],
suffix="Do not invent metrics that are not provided.",
),
)
```
## Notes
- The run API does not accept a top-level `user_prompt`; submit user input through
a prompt layer.
- Prompt layer names are not reserved by the runtime, but `prompt` is the
recommended conventional name.
- When a [history layer](../history-layer/index.md) is present, current system
prompts are sent as a temporary prefix before stored history and are not saved
into memory.

View File

@ -0,0 +1,101 @@
# Structured output layer
The structured output layer makes the final answer follow a caller-provided JSON
Schema. Add it when the client needs a JSON object instead of plain text.
When present, Dify Agent exposes the schema to the model as a structured-output
tool and validates the model response against the same schema.
## Layer contract
| Property | Value |
| --- | --- |
| Reserved layer name | `output` |
| Type id | `dify.output` |
| Config | `DifyOutputLayerConfig` |
| Dependencies | none |
Use at most one structured output layer. It must be named `output`.
## Config fields
| Field | Type | Meaning |
| --- | --- | --- |
| `json_schema` | `dict[str, JsonValue]` | Top-level object JSON Schema for the final answer. |
| `description` | `str \| None` | Optional model-facing tool description. |
| `strict` | `bool \| None` | Optional strictness flag passed to the output tool. |
The structured-output tool name is fixed to `final_output`.
## Basic usage
```python {test="skip" lint="skip"}
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import DIFY_AGENT_OUTPUT_LAYER_ID, RunLayerSpec
output_layer = RunLayerSpec(
name=DIFY_AGENT_OUTPUT_LAYER_ID,
type=DIFY_OUTPUT_LAYER_TYPE_ID,
config=DifyOutputLayerConfig(
description="Structured incident summary returned by the agent.",
strict=True,
json_schema={
"type": "object",
"properties": {
"title": {"type": "string"},
"severity": {"type": "string", "enum": ["low", "medium", "high"]},
"actions": {"type": "array", "items": {"type": "string"}},
},
"required": ["title", "severity", "actions"],
"additionalProperties": False,
},
),
)
```
On success, the terminal event contains the validated JSON-safe object:
```python {test="skip" lint="skip"}
async for event in client.stream_events(run_id):
if event.type == "run_succeeded":
structured_output = event.data.output
```
If the `output` layer is omitted, Dify Agent keeps the default plain text output
contract.
## Schema limits
The first structured-output version supports a practical subset of JSON Schema:
- the top-level schema must be an object (`"type": "object"`)
- the model-facing structured-output tool name is always `final_output`
- remote `$ref` values are not supported
- local refs are supported only under `#/$defs/...`
- recursive `$defs` refs are not supported
- `$ref` values inside ordinary literal keywords such as `const`, `default`,
`enum`, `example`, and `examples` are treated as data, not schema refs
## Validation and retry behavior
The runtime builds a pydantic-ai output contract from the layer config. The same
contract exposes the model-facing schema and validates the returned object.
If the model returns an invalid object, pydantic-ai's normal output-validation
retry behavior applies. If retries are exhausted, the run ends with `run_failed`.
## Resuming runs with structured output
Session snapshots store layer runtime state, not output-layer config. If you
resume a run that uses structured output, include the same `output` layer again so
the runtime can rebuild the output contract.
## Troubleshooting
| Symptom | What to check |
| --- | --- |
| `must use reserved layer name 'output'` | Rename the layer to `output`. |
| Structured output falls back to text | Confirm the `output` layer is present and has type `dify.output`. |
| Run fails before model resolution | Validate the JSON Schema and `$ref` usage. |
| Resume loses structured output | Resubmit the same output layer; snapshots do not store the schema. |

View File

@ -15,7 +15,13 @@ nav:
- Examples: agenton/examples/index.md
- Dify Agent:
- Overview: dify-agent/index.md
- Get Started: dify-agent/get-started/index.md
- User Manual:
- Get Started: dify-agent/get-started/index.md
- Prompt Layer: dify-agent/user-manual/prompt-layer/index.md
- Plugin Layer: dify-agent/user-manual/plugin-layer/index.md
- Plugin LLM Layer: dify-agent/user-manual/plugin-llm-layer/index.md
- History Layer: dify-agent/user-manual/history-layer/index.md
- Structured Output Layer: dify-agent/user-manual/structured-output-layer/index.md
- Operations Guide: dify-agent/guide/index.md
- Run API: dify-agent/api/index.md
- Examples: dify-agent/examples/index.md

View File

@ -6,7 +6,7 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"httpx>=0.28.1",
"pydantic>=2.12.5,<2.13",
"pydantic>=2.12.5,<3",
"pydantic-ai-slim>=1.85.1",
"typing-extensions>=4.12.2",
]

View File

@ -4,8 +4,16 @@ from agenton_collections.layers.pydantic_ai.bridge import (
PydanticAIBridgeLayer,
PydanticAIBridgeLayerDeps,
)
from agenton_collections.layers.pydantic_ai.history import (
PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
PydanticAIHistoryLayer,
PydanticAIHistoryRuntimeState,
)
__all__ = [
"PydanticAIBridgeLayer",
"PydanticAIBridgeLayerDeps",
"PYDANTIC_AI_HISTORY_LAYER_TYPE_ID",
"PydanticAIHistoryLayer",
"PydanticAIHistoryRuntimeState",
]

View File

@ -0,0 +1,68 @@
"""Serializable pydantic-ai conversation history layer.
This layer keeps pydantic-ai ``ModelMessage`` history inside Agenton's
serializable ``runtime_state`` so compositor session snapshots can persist and
restore typed messages without any separate storage protocol. The layer is
intentionally state-only: it contributes no system prompts, user prompts, or
tools, and it owns no live resources. Integrations should read
``message_history`` before ``Agent.run(message_history=...)`` and then write
back only the history shape they intend to persist after success, for example
replacing with ``result.all_messages()`` or appending only
``result.new_messages()`` when temporary prompt prefixes must stay ephemeral.
"""
from collections.abc import Sequence
from typing import ClassVar, Final
from pydantic import BaseModel, ConfigDict, Field
from pydantic_ai.messages import ModelMessage
from agenton.layers import EmptyLayerConfig, NoLayerDeps, PydanticAILayer
PYDANTIC_AI_HISTORY_LAYER_TYPE_ID: Final[str] = "pydantic_ai.history"
class PydanticAIHistoryRuntimeState(BaseModel):
    """Serializable history state stored in Agenton session snapshots."""

    # Unknown fields are rejected, and every attribute assignment is
    # re-validated rather than only the initial construction.
    model_config: ClassVar[ConfigDict] = ConfigDict(validate_assignment=True, extra="forbid")

    # Stored pydantic-ai messages; a new empty list is created per instance.
    messages: list[ModelMessage] = Field(default_factory=list)
class PydanticAIHistoryLayer(
    PydanticAILayer[NoLayerDeps, object, EmptyLayerConfig, PydanticAIHistoryRuntimeState]
):
    """Layer whose only job is to hold pydantic-ai message history.

    All history is kept in ``runtime_state.messages``. Every write below
    rebinds that attribute to a newly built list and never mutates the
    existing list in place, so each update goes through attribute assignment
    on the runtime state.
    """

    type_id: ClassVar[str | None] = PYDANTIC_AI_HISTORY_LAYER_TYPE_ID

    @property
    def message_history(self) -> list[ModelMessage]:
        """Return a new list containing the currently stored messages."""
        return [*self.runtime_state.messages]

    def replace_messages(self, messages: Sequence[ModelMessage]) -> None:
        """Discard the stored history and store a copy of ``messages`` instead."""
        self.runtime_state.messages = [*messages]

    def append_messages(self, messages: Sequence[ModelMessage]) -> None:
        """Store the existing history followed by ``messages`` as a new list."""
        existing = self.runtime_state.messages
        self.runtime_state.messages = list(existing) + list(messages)

    def clear(self) -> None:
        """Reset the stored history to an empty list."""
        self.runtime_state.messages = list()
__all__ = [
"PYDANTIC_AI_HISTORY_LAYER_TYPE_ID",
"PydanticAIHistoryLayer",
"PydanticAIHistoryRuntimeState",
]

View File

@ -8,7 +8,6 @@ imports do not pull in server execution code.
from __future__ import annotations
import re
from typing import ClassVar, Final
from pydantic import ConfigDict, JsonValue, field_validator
@ -17,7 +16,6 @@ from agenton.layers import LayerConfig
DIFY_OUTPUT_LAYER_TYPE_ID: Final[str] = "dify.output"
_OUTPUT_TOOL_NAME_PATTERN: Final[re.Pattern[str]] = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
class DifyOutputLayerConfig(LayerConfig):
@ -30,15 +28,13 @@ class DifyOutputLayerConfig(LayerConfig):
schemas plus local ``#/$defs/...`` references so the same caller-provided
schema can drive both runtime validation and model-facing tool exposure; the
exposure copy may inline supported ``$defs`` refs as needed for the
Pydantic/Pydantic AI integration. ``name`` becomes the structured-output
tool name exposed to pydantic-ai, defaults to ``final_result``, and must be
1-64 ASCII letters, numbers, underscores, or hyphens so downstream model
providers accept it consistently. ``description`` and ``strict`` are passed
through to the generated structured-output tool definition.
Pydantic/Pydantic AI integration. The structured-output tool name and schema
title exposed to pydantic-ai are fixed to ``final_output`` so callers only
control the JSON Schema itself plus any optional description/strictness
metadata.
"""
json_schema: dict[str, JsonValue]
name: str = "final_result"
description: str | None = None
strict: bool | None = None
@ -51,12 +47,4 @@ class DifyOutputLayerConfig(LayerConfig):
raise ValueError("Schema must declare an object output.")
return value
@field_validator("name")
@classmethod
def _ensure_safe_tool_name(cls, value: str) -> str:
    """Check ``value`` against ``_OUTPUT_TOOL_NAME_PATTERN`` and return it unchanged.

    Raises:
        ValueError: If ``value`` does not fully match the pattern.
    """
    # ``fullmatch`` yields ``None`` unless the entire string matches.
    if _OUTPUT_TOOL_NAME_PATTERN.fullmatch(value) is None:
        raise ValueError("name must be 1-64 characters of letters, numbers, underscores, or hyphens.")
    return value
__all__ = ["DIFY_OUTPUT_LAYER_TYPE_ID", "DifyOutputLayerConfig"]

View File

@ -34,6 +34,8 @@ from agenton.layers import EmptyRuntimeState, NoLayerDeps, PlainLayer
from dify_agent.layers.output.configs import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
_FINAL_OUTPUT_TOOL_NAME: Final[str] = "final_output"
_VALIDATED_OUTPUT_TYPE_NAME: Final[str] = f"DifyValidatedOutput_{_FINAL_OUTPUT_TOOL_NAME}"
_LOCAL_DEFS_REF_PREFIX: Final[str] = "#/$defs/"
_NON_SCHEMA_VALUE_KEYWORDS: Final[frozenset[str]] = frozenset({"const", "default", "enum", "example", "examples"})
@ -71,7 +73,9 @@ class DifyOutputLayer(PlainLayer[NoLayerDeps, DifyOutputLayerConfig, EmptyRuntim
runtime validation inside the same dynamically generated dict-like type.
First-version support is intentionally limited to top-level object JSON
Schemas so the same schema can be validated with ``jsonschema`` and then
exposed to Pydantic AI without any wrapper/unwrapper translation.
exposed to Pydantic AI without any wrapper/unwrapper translation. The
public tool name and exposed schema title are always ``final_output`` so
providers see one stable structured-output contract shape.
Raises:
ValueError: If the JSON Schema is invalid, contains non-local
@ -82,7 +86,6 @@ class DifyOutputLayer(PlainLayer[NoLayerDeps, DifyOutputLayerConfig, EmptyRuntim
_reject_non_local_refs(user_schema)
validated_output_type = _build_validated_output_type(
user_schema,
name=self.config.name,
description=self.config.description,
)
@ -91,7 +94,7 @@ class DifyOutputLayer(PlainLayer[NoLayerDeps, DifyOutputLayerConfig, EmptyRuntim
OutputSpec[object],
ToolOutput(
validated_output_type,
name=self.config.name,
name=_FINAL_OUTPUT_TOOL_NAME,
strict=self.config.strict,
),
),
@ -111,18 +114,16 @@ def _build_json_schema_validator(schema: dict[str, JsonValue]) -> JsonSchemaVali
def _build_validated_output_type(
schema: dict[str, JsonValue],
*,
name: str,
description: str | None,
) -> type[dict[str, object]]:
"""Create a dict-like output type with custom JSON schema and validation hooks.
The generated type is unique per output layer config. Its Pydantic core
The generated type object is fresh per output layer config. Its Pydantic core
schema performs real ``jsonschema`` validation, while its JSON schema hook
exposes a model-facing schema that Pydantic AI can turn into an output tool.
"""
validator = _build_json_schema_validator(schema)
exposed_schema = _build_exposed_json_schema(schema, name=name, description=description)
type_name = _build_output_type_name(name)
exposed_schema = _build_exposed_json_schema(schema, description=description)
def _validate_output(value: dict[str, object]) -> object:
errors = sorted(validator.iter_errors(cast(JsonValue, value)), key=lambda error: _sort_error_path(error.path))
@ -165,14 +166,13 @@ def _build_validated_output_type(
"__get_pydantic_core_schema__": __get_pydantic_core_schema__,
"__get_pydantic_json_schema__": __get_pydantic_json_schema__,
}
validated_output_type = cast(type[dict[str, object]], type(type_name, (dict,), namespace))
validated_output_type = cast(type[dict[str, object]], type(_VALIDATED_OUTPUT_TYPE_NAME, (dict,), namespace))
return validated_output_type
def _build_exposed_json_schema(
schema: dict[str, JsonValue],
*,
name: str,
description: str | None,
) -> dict[str, JsonValue]:
"""Return the schema exposed to the model through Pydantic AI.
@ -183,18 +183,10 @@ def _build_exposed_json_schema(
attached.
"""
exposed_schema = _inline_local_defs_refs(schema)
exposed_schema["title"] = name
exposed_schema["title"] = _FINAL_OUTPUT_TOOL_NAME
if description is not None:
exposed_schema["description"] = description
return exposed_schema
def _build_output_type_name(name: str) -> str:
    """Derive the generated output class name from a tool ``name``.

    Each non-alphanumeric character is replaced with an underscore, leading
    and trailing underscores are trimmed, and ``final_result`` is used when
    nothing remains. The result is prefixed with ``DifyValidatedOutput_``.
    """
    pieces = [ch if ch.isalnum() else "_" for ch in name]
    suffix = "".join(pieces).strip("_")
    if not suffix:
        # Empty or punctuation-only names fall back to the fixed suffix.
        suffix = "final_result"
    return f"DifyValidatedOutput_{suffix}"
def _reject_non_local_refs(schema: JsonValue) -> None:
"""Reject references that would require external fetching or non-local state.

View File

@ -1,6 +1,7 @@
"""Public protocol exports shared by the Dify Agent server and clients."""
from .schemas import (
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
RUN_EVENT_ADAPTER,
@ -30,6 +31,7 @@ __all__ = [
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
"DIFY_AGENT_HISTORY_LAYER_ID",
"DIFY_AGENT_MODEL_LAYER_ID",
"DIFY_AGENT_OUTPUT_LAYER_ID",
"EmptyRunEventData",

View File

@ -17,7 +17,8 @@ public ``id``/``run_id``/``type``/``data``/``created_at`` shape, while each
``type`` has a typed ``data`` model so OpenAPI, Redis replay, and clients parse
the same payload contract. Model/provider selection is part of the submitted
composition, not a top-level run field; the runtime reads the model layer named
by ``DIFY_AGENT_MODEL_LAYER_ID`` and the optional structured output layer named
by ``DIFY_AGENT_MODEL_LAYER_ID``, the optional history layer named by
``DIFY_AGENT_HISTORY_LAYER_ID``, and the optional structured output layer named
by ``DIFY_AGENT_OUTPUT_LAYER_ID``. Request-level ``on_exit`` signals decide
whether each active layer is suspended or deleted when the run exits, with
suspend as the default so successful terminal events can include resumable
@ -42,6 +43,7 @@ from agenton.layers import ExitIntent
DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm"
DIFY_AGENT_HISTORY_LAYER_ID: Final[str] = "history"
DIFY_AGENT_OUTPUT_LAYER_ID: Final[str] = "output"
RunStatus = Literal["running", "succeeded", "failed"]
RunEventType = Literal[
@ -104,8 +106,10 @@ class CreateRunRequest(BaseModel):
"""Request body for creating one async agent run.
Model/provider configuration must be supplied through the composition layer
named by ``DIFY_AGENT_MODEL_LAYER_ID``. Structured output may be supplied
through the optional composition layer named by
named by ``DIFY_AGENT_MODEL_LAYER_ID``. Optional persisted conversation
history may be supplied through the composition layer named by
``DIFY_AGENT_HISTORY_LAYER_ID``. Structured output may be supplied through
the optional composition layer named by
``DIFY_AGENT_OUTPUT_LAYER_ID``. ``on_exit`` defaults every active layer to
suspend so callers receive a resumable success snapshot unless they
explicitly request delete for one or more layers. Session snapshots do not
@ -254,6 +258,7 @@ __all__ = [
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
"DIFY_AGENT_HISTORY_LAYER_ID",
"DIFY_AGENT_MODEL_LAYER_ID",
"DIFY_AGENT_OUTPUT_LAYER_ID",
"EmptyRunEventData",

View File

@ -2,10 +2,10 @@
The run request carries model/provider selection in the layer graph. This helper
keeps Agent construction details out of ``AgentRunRunner`` while accepting an
already resolved Pydantic AI model from the configured model layer. Prompt and
tool values arriving here are already transformed by Agenton's
``PYDANTIC_AI_TRANSFORMERS`` preset; this module registers those pydantic-ai
objects without reimplementing plain/pydantic-ai conversion logic. The caller
already resolved Pydantic AI model from the configured model layer. Tool values
arriving here are already transformed by Agenton's
``PYDANTIC_AI_TRANSFORMERS`` preset, while Dify system prompts are rendered into
temporary ``message_history`` before the call reaches this helper. The caller
also passes the already resolved ``output_type`` so legacy text output and the
optional JSON Schema output layer share the same ``Agent`` construction path.
"""
@ -18,13 +18,12 @@ from pydantic_ai.messages import UserContent
from pydantic_ai.models import Model
from pydantic_ai.output import OutputSpec
from agenton.layers.types import PydanticAIPrompt, PydanticAITool
from agenton.layers.types import PydanticAITool
def create_agent(
model: Model[Any],
*,
system_prompts: Sequence[PydanticAIPrompt[object]],
tools: Sequence[PydanticAITool[object]],
output_type: OutputSpec[object] = str,
) -> Agent[None, object]:
@ -36,10 +35,7 @@ def create_agent(
carries the Pydantic hooks needed for schema exposure and runtime validation,
so agent construction does not need to register a separate validator.
"""
agent = cast(Agent[None, object], Agent(model, output_type=output_type, tools=tools))
for prompt in system_prompts:
_ = agent.system_prompt(cast(Any, prompt))
return agent
return cast(Agent[None, object], Agent(model, output_type=output_type, tools=tools))
def normalize_user_input(user_prompts: Sequence[UserContent]) -> str | Sequence[UserContent]:

View File

@ -1,12 +1,13 @@
"""Safe Agenton compositor construction for API-submitted configs.
Only explicitly allowed provider type ids are constructible here. The default
provider set contains prompt layers, the state-free Dify structured output
layer, plus Dify plugin LLM layers. Public DTOs provide tenant/plugin/model
data, while server-only plugin daemon settings are injected through the provider
factory for ``DifyPluginLayer``. The resulting ``Compositor`` remains Agenton
state-only: live resources such as the plugin daemon HTTP client are supplied
later by the runtime and never enter providers, layers, or session snapshots.
provider set contains prompt layers, the optional pydantic-ai history layer, the
state-free Dify structured output layer, plus Dify plugin LLM layers. Public
DTOs provide tenant/plugin/model data, while server-only plugin daemon settings
are injected through the provider factory for ``DifyPluginLayer``. The resulting
``Compositor`` remains Agenton state-only: live resources such as the plugin
daemon HTTP client are supplied later by the runtime and never enter providers,
layers, or session snapshots.
"""
from collections.abc import Mapping, Sequence
@ -16,6 +17,7 @@ from pydantic_ai.messages import UserContent
from agenton.compositor import Compositor, CompositorConfig, LayerProvider, LayerProviderInput
from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes, PydanticAIPrompt, PydanticAITool
from agenton_collections.layers.pydantic_ai import PydanticAIHistoryLayer
from agenton_collections.layers.plain.basic import PromptLayer
from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
@ -35,6 +37,7 @@ def create_default_layer_providers(
"""Return the server provider set of safe config-constructible layers."""
return (
LayerProvider.from_layer_type(PromptLayer),
LayerProvider.from_layer_type(PydanticAIHistoryLayer),
LayerProvider.from_layer_type(DifyOutputLayer),
LayerProvider.from_factory(
layer_type=DifyPluginLayer,

View File

@ -0,0 +1,133 @@
"""Helpers for optional Dify Agent history-layer integration.
Dify Agent keeps pydantic-ai conversation history as an optional Agenton layer
named ``history``. The runner always injects the current Dify system prompt via
temporary ``message_history`` instead of ``Agent.system_prompt(...)`` so the
model sees ``current system prompt -> stored history -> current user prompt``
even when persisted history is present. Only zero-argument system prompt
callables are supported here because the prompts are rendered outside
pydantic-ai's normal run context; this matches Dify's current plain-prompt
compositions and fails fast for unsupported context-dependent prompt shapes.
"""
from __future__ import annotations
import inspect
from collections.abc import Awaitable, Callable, Sequence
from typing import Protocol, cast
from pydantic_ai.messages import ModelMessage, ModelRequest, SystemPromptPart
from agenton.layers.types import PydanticAIPrompt
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryLayer
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID
from dify_agent.protocol.schemas import RunComposition
class SupportsHistoryLayerLookup(Protocol):
    """Structural type for the part of an entered run that the history helpers use.

    Any object exposing a compatible ``get_layer`` method satisfies this
    protocol.
    """

    def get_layer(self, name: str, layer_type: type[PydanticAIHistoryLayer]) -> PydanticAIHistoryLayer:
        """Look up the layer called *name* as a ``PydanticAIHistoryLayer``.

        Implementations either return the typed layer instance or raise a
        lookup/type error.
        """
        ...
def validate_history_layer_composition(composition: RunComposition) -> None:
    """Ensure the history layer type appears only in its supported shape.

    A composition with no layer of type ``PYDANTIC_AI_HISTORY_LAYER_TYPE_ID``
    is accepted unchanged. Otherwise there must be exactly one such layer, it
    must be named ``DIFY_AGENT_HISTORY_LAYER_ID``, and it must not declare any
    dependencies.

    Raises:
        ValueError: If more than one history-typed layer is present, if the
            single layer uses a different name, or if it declares dependencies.
    """
    history_layers = [spec for spec in composition.layers if spec.type == PYDANTIC_AI_HISTORY_LAYER_TYPE_ID]
    layer_count = len(history_layers)
    if layer_count == 0:
        return

    if layer_count >= 2:
        names = ", ".join(layer.name for layer in history_layers)
        raise ValueError(
            f"Only one '{PYDANTIC_AI_HISTORY_LAYER_TYPE_ID}' layer is supported, named "
            f"'{DIFY_AGENT_HISTORY_LAYER_ID}'. Found layers: {names}."
        )

    history_layer = history_layers[0]
    if history_layer.name != DIFY_AGENT_HISTORY_LAYER_ID:
        raise ValueError(
            f"Layer type '{PYDANTIC_AI_HISTORY_LAYER_TYPE_ID}' must use reserved layer name "
            f"'{DIFY_AGENT_HISTORY_LAYER_ID}', got '{history_layer.name}'."
        )

    if not history_layer.deps:
        return
    # Sort the dependency keys so the error text is deterministic.
    dependency_names = ", ".join(sorted(history_layer.deps))
    raise ValueError(
        f"Layer type '{PYDANTIC_AI_HISTORY_LAYER_TYPE_ID}' does not support dependencies; "
        f"got dependency keys: {dependency_names}."
    )
def get_history_layer(run: SupportsHistoryLayerLookup) -> PydanticAIHistoryLayer | None:
    """Fetch the layer registered under the reserved history name, if there is one.

    A ``KeyError`` from the lookup is taken to mean the slot is absent and
    yields ``None``; any other exception raised by ``run.get_layer`` propagates.
    """
    try:
        found = run.get_layer(DIFY_AGENT_HISTORY_LAYER_ID, PydanticAIHistoryLayer)
    except KeyError:
        found = None
    return found
async def build_run_message_history(
    *,
    system_prompts: Sequence[PydanticAIPrompt[object]],
    stored_history: Sequence[ModelMessage],
) -> list[ModelMessage] | None:
    """Assemble the transient ``message_history`` passed to one agent run.

    The system prompts are rendered one at a time, in order; prompts that
    render to ``None`` are dropped. The rendered parts, if any, are wrapped in
    a single leading ``ModelRequest``, and the stored history messages follow
    it unchanged.

    Returns:
        The combined message list, or ``None`` when there are neither rendered
        system prompt parts nor stored messages.
    """
    system_parts: list[SystemPromptPart] = []
    for system_prompt in system_prompts:
        rendered_text = await _render_system_prompt(system_prompt)
        if rendered_text is not None:
            system_parts.append(SystemPromptPart(content=rendered_text))

    prefix: list[ModelMessage] = [ModelRequest(parts=system_parts)] if system_parts else []
    combined: list[ModelMessage] = [*prefix, *stored_history]
    if not combined:
        return None
    return combined
def append_successful_run_history(
    history_layer: PydanticAIHistoryLayer | None,
    new_messages: Sequence[ModelMessage],
) -> None:
    """Hand the messages produced by a run to the history layer.

    Nothing happens when there is no history layer or when *new_messages* is
    empty; otherwise the messages are forwarded to
    ``history_layer.append_messages`` in a single call.
    """
    if history_layer is not None and new_messages:
        history_layer.append_messages(new_messages)
async def _render_system_prompt(prompt: PydanticAIPrompt[object]) -> str | None:
signature = inspect.signature(prompt)
if signature.parameters:
raise ValueError(
"Dify Agent runtime currently supports only zero-argument system prompts when rendering temporary "
"message history."
)
prompt_without_context = cast(Callable[[], str | None | Awaitable[str | None]], prompt)
prompt_value = prompt_without_context()
if inspect.isawaitable(prompt_value):
prompt_value = await prompt_value
if prompt_value is None:
return None
if not isinstance(prompt_value, str):
raise TypeError(f"System prompt callables must return str | None, got '{type(prompt_value).__name__}'.")
return prompt_value
__all__ = [
"SupportsHistoryLayerLookup",
"append_successful_run_history",
"build_run_message_history",
"get_history_layer",
"validate_history_layer_composition",
]

View File

@ -6,10 +6,11 @@ task registry. Redis remains the durable source for status and event streams, bu
there is no Redis job queue or cross-process handoff. If the process crashes,
currently active runs are lost until an external operator marks or retries them.
Create-run validation enters a lightweight Agenton run before persistence so the
same transformed user prompts, optional structured output contract, and
top-level ``on_exit`` policy used by execution are checked without relying on
removed session/control APIs; Dify's default layers keep lifecycle hooks
side-effect free so this validation does not open plugin daemon clients.
same transformed user prompts, temporary system-prompt history assembly,
optional structured output contract, and top-level ``on_exit`` policy used by
execution are checked without relying on removed session/control APIs; Dify's
default layers keep lifecycle hooks side-effect free so this validation does not
open plugin daemon clients.
"""
import asyncio
@ -24,6 +25,7 @@ from dify_agent.protocol.schemas import CreateRunRequest, normalize_composition
from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers
from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed
from dify_agent.runtime.history import build_run_message_history, get_history_layer, validate_history_layer_composition
from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals
from dify_agent.runtime.output_type import resolve_run_output_contract, validate_output_layer_composition
from dify_agent.runtime.runner import AgentRunRunner
@ -169,18 +171,20 @@ async def validate_run_request(
) -> None:
"""Validate create-run semantics that require an entered Agenton run.
This boundary rejects unsupported output-layer graph shapes, unknown
This boundary rejects unsupported output/history-layer graph shapes, unknown
``on_exit`` layer ids, effectively empty transformed user prompts, and known
enter-time snapshot lifecycle errors before the scheduler persists a run
record. It also exercises provider config validation, structured output
contract construction, and snapshot hydration without touching external
services because Dify plugin daemon clients are owned by the FastAPI
lifespan, not Agenton lifecycle hooks.
record. It also exercises provider config validation, temporary
system-prompt history assembly, structured output contract construction, and
snapshot hydration without touching external services because Dify plugin
daemon clients are owned by the FastAPI lifespan, not Agenton lifecycle
hooks.
"""
resolved_layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers()
entered_run = False
try:
validate_output_layer_composition(request.composition)
validate_history_layer_composition(request.composition)
graph_config, layer_configs = normalize_composition(request.composition)
compositor = build_pydantic_ai_compositor(
graph_config,
@ -190,6 +194,11 @@ async def validate_run_request(
async with compositor.enter(configs=layer_configs, session_snapshot=request.session_snapshot) as run:
entered_run = True
apply_layer_exit_signals(run, request.on_exit)
history_layer = get_history_layer(run)
_ = await build_run_message_history(
system_prompts=run.prompts,
stored_history=history_layer.message_history if history_layer is not None else (),
)
if not has_non_blank_user_prompt(run.user_prompts):
raise RunRequestValidationError(EMPTY_USER_PROMPTS_ERROR)
_ = resolve_run_output_contract(run)

View File

@ -2,19 +2,22 @@
The runner is storage-agnostic: it normalizes the public Dify composition into
Agenton's graph/config split, enters a fresh ``CompositorRun`` (or resumes one
from a snapshot), runs pydantic-ai with ``run.user_prompts`` as the user input,
emits stream events, applies request-level ``on_exit`` signals, and then
publishes a terminal success or failure event. The Pydantic AI model is resolved
from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. An
optional structured output layer named by ``DIFY_AGENT_OUTPUT_LAYER_ID`` is read
after entry and resolved into an output contract whose type both exposes the
output schema to the model and performs runtime JSON Schema validation through
custom Pydantic hooks. Invalid structured outputs therefore trigger Pydantic
AI's normal output-validation retry behavior before Dify Agent emits
``run_succeeded``. Layers still never own the FastAPI lifespan-owned plugin
daemon HTTP client. Successful terminal events contain both the JSON-safe final
output and session snapshot; there are no separate output or snapshot events to
correlate.
from a snapshot), renders the current Dify system prompts into temporary
``message_history``, runs pydantic-ai with ``run.user_prompts`` as the current
user input, emits stream events, applies request-level ``on_exit`` signals, and
then publishes a terminal success or failure event. The Pydantic AI model is
resolved from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``.
An optional history layer contributes stored message history only through
session state; successful runs append only ``result.new_messages()`` back into
that layer so current system prompts are not persisted. An optional structured
output layer named by ``DIFY_AGENT_OUTPUT_LAYER_ID`` is read after entry and
resolved into an output contract whose type both exposes the output schema to
the model and performs runtime JSON Schema validation through custom Pydantic
hooks. Invalid structured outputs therefore trigger Pydantic AI's normal
output-validation retry behavior before Dify Agent emits ``run_succeeded``.
Layers still never own the FastAPI lifespan-owned plugin daemon HTTP client.
Successful terminal events contain both the JSON-safe final output and session
snapshot; there are no separate output or snapshot events to correlate.
"""
from collections.abc import AsyncIterable
@ -37,6 +40,12 @@ from dify_agent.runtime.event_sink import (
emit_run_started,
emit_run_succeeded,
)
from dify_agent.runtime.history import (
append_successful_run_history,
build_run_message_history,
get_history_layer,
validate_history_layer_composition,
)
from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals
from dify_agent.runtime.output_type import resolve_run_output_contract, validate_output_layer_composition
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
@ -100,17 +109,18 @@ class AgentRunRunner:
Known input-shaped Agenton enter-time runtime errors, such as trying to
resume a ``CLOSED`` snapshot layer, are normalized to
``AgentRunValidationError``. Output-layer graph invariants are validated
from the public composition before entering Agenton so misnamed or extra
``dify.output`` layers never silently degrade to text output. Later
runtime failures still propagate as execution errors so they become
terminal failed runs rather than client validation responses. Structured
output uses a resolved contract whose type itself encodes both the
model-facing schema and the runtime validation hooks, so invalid model
outputs can be corrected before Dify Agent emits success.
``AgentRunValidationError``. Output/history-layer graph invariants are
validated from the public composition before entering Agenton so
misnamed or extra reserved layers never silently degrade. Later runtime
failures still propagate as execution errors so they become terminal
failed runs rather than client validation responses. Structured output
uses a resolved contract whose type itself encodes both the model-facing
schema and the runtime validation hooks, so invalid model outputs can be
corrected before Dify Agent emits success.
"""
try:
validate_output_layer_composition(self.request.composition)
validate_history_layer_composition(self.request.composition)
graph_config, layer_configs = normalize_composition(self.request.composition)
compositor = build_pydantic_ai_compositor(graph_config, providers=self.layer_providers)
validate_layer_exit_signals(compositor, self.request.on_exit)
@ -132,6 +142,11 @@ class AgentRunRunner:
try:
output_contract = resolve_run_output_contract(run)
history_layer = get_history_layer(run)
message_history = await build_run_message_history(
system_prompts=run.prompts,
stored_history=history_layer.message_history if history_layer is not None else (),
)
llm_layer = run.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer)
model = llm_layer.get_model(http_client=self.plugin_daemon_http_client)
except (KeyError, TypeError, RuntimeError, ValueError) as exc:
@ -139,12 +154,16 @@ class AgentRunRunner:
agent = create_agent(
model,
system_prompts=run.prompts,
tools=run.tools,
output_type=output_contract.output_type,
)
result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events)
result = await agent.run(
normalize_user_input(user_prompts),
message_history=message_history,
event_stream_handler=handle_events,
)
output = _serialize_agent_output(result.output)
append_successful_run_history(history_layer, result.new_messages())
except RuntimeError as exc:
if not entered_run and is_agenton_enter_validation_runtime_error(exc):
raise AgentRunValidationError(str(exc)) from exc

View File

@ -29,8 +29,11 @@ class RedisRunStore(RunEventSink):
"""Async Redis implementation for run records and event logs.
``run_retention_seconds`` is applied to both the run record key and the
per-run Redis stream. Event writes also refresh the record TTL so long-running
runs that keep producing events do not lose their status record mid-run.
per-run Redis stream. Event writes run ``XADD`` and both TTL refreshes in one
Redis transaction so a newly created stream is not left without expiration if
the client is interrupted between commands. Event writes also refresh the
record TTL so long-running runs that keep producing events do not lose their
status record mid-run.
"""
redis: Redis
@ -81,15 +84,18 @@ class RedisRunStore(RunEventSink):
)
async def append_event(self, event: RunEvent) -> str:
"""Append an event JSON payload to the run's Redis stream."""
"""Append an event JSON payload to the run's Redis stream with TTLs."""
events_key = run_events_key(self.prefix, event.run_id)
payload = RUN_EVENT_ADAPTER.dump_json(event, exclude={"id"}).decode()
event_id = await self.redis.xadd(
events_key,
{"payload": payload},
)
await self.redis.expire(events_key, self.run_retention_seconds)
await self.redis.expire(run_record_key(self.prefix, event.run_id), self.run_retention_seconds)
async with self.redis.pipeline(transaction=True) as pipeline:
_ = pipeline.xadd(
events_key,
{"payload": payload},
)
_ = pipeline.expire(events_key, self.run_retention_seconds)
_ = pipeline.expire(run_record_key(self.prefix, event.run_id), self.run_retention_seconds)
results = cast(list[object], await pipeline.execute())
event_id = results[0]
return event_id.decode() if isinstance(event_id, bytes) else str(event_id)
async def get_events(self, run_id: str, *, after: str = "0-0", limit: int = 100) -> RunEventsResponse:

View File

@ -0,0 +1,96 @@
import asyncio
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, TextPart, UserPromptPart
from agenton.compositor import Compositor, LayerNode
from agenton.layers import LifecycleState
from agenton_collections.layers.pydantic_ai import PydanticAIHistoryLayer, PydanticAIHistoryRuntimeState
def test_pydantic_ai_history_layer_starts_empty_and_contributes_no_prompts_or_tools() -> None:
    """A freshly constructed history layer holds no messages, prompts, or tools."""
    fresh_layer = PydanticAIHistoryLayer()

    assert fresh_layer.message_history == []
    # Checked in the same order as the layer's contribution surfaces are listed.
    for contribution in ("prefix_prompts", "suffix_prompts", "user_prompts", "tools"):
        assert list(getattr(fresh_layer, contribution)) == []
def test_pydantic_ai_history_layer_replace_messages_saves_validated_copy() -> None:
    """``replace_messages`` stores an equal but distinct list, isolated from later caller edits."""
    history = PydanticAIHistoryLayer()
    caller_messages = _sample_messages()

    history.replace_messages(caller_messages)
    stored_view = history.message_history

    assert stored_view == caller_messages
    assert stored_view is not caller_messages

    # Mutating the caller's list afterwards must not leak into the layer.
    caller_messages.append(ModelResponse(parts=[TextPart(content="later")]))
    assert history.message_history != caller_messages
def test_pydantic_ai_history_layer_append_messages_preserves_order_and_internal_state() -> None:
    """Appended messages follow existing ones, and clearing a borrowed list leaves the layer intact."""
    history = PydanticAIHistoryLayer()
    first_message, second_message = _sample_messages()

    history.replace_messages([first_message])
    history.append_messages((second_message,))

    # Clearing the list handed out by the layer must not touch its stored messages.
    handed_out = history.message_history
    handed_out.clear()

    assert history.message_history == [first_message, second_message]
def test_pydantic_ai_history_layer_clear_removes_stored_messages() -> None:
    """``clear`` empties both the public message view and the runtime state."""
    history = PydanticAIHistoryLayer()
    history.replace_messages(_sample_messages())

    history.clear()

    assert history.message_history == []
    assert history.runtime_state.messages == []
def test_pydantic_ai_history_runtime_state_round_trips_through_json_dump() -> None:
    """Runtime state dumped in JSON mode validates back into equal, correctly typed messages."""
    expected_messages = _sample_messages()
    original_state = PydanticAIHistoryRuntimeState(messages=expected_messages)

    json_payload = original_state.model_dump(mode="json")
    round_tripped = PydanticAIHistoryRuntimeState.model_validate(json_payload)

    assert round_tripped.messages == expected_messages
    for position, message_type in enumerate((ModelRequest, ModelResponse)):
        assert isinstance(round_tripped.messages[position], message_type)
def test_pydantic_ai_history_layer_messages_round_trip_through_session_snapshot() -> None:
    """Messages stored before a suspended exit are available again after resuming from the snapshot."""
    compositor = Compositor([LayerNode("history", PydanticAIHistoryLayer)])
    messages = _sample_messages()

    async def suspend_then_resume() -> None:
        # First run: store the messages and request suspension on exit.
        async with compositor.enter() as first_run:
            first_run.get_layer("history", PydanticAIHistoryLayer).replace_messages(messages)
            first_run.suspend_on_exit()

        assert first_run.session_snapshot is not None
        assert first_run.session_snapshot.layers[0].lifecycle_state is LifecycleState.SUSPENDED

        # Second run: resume from the snapshot and check the restored messages.
        async with compositor.enter(session_snapshot=first_run.session_snapshot) as resumed_run:
            restored_layer = resumed_run.get_layer("history", PydanticAIHistoryLayer)
            assert restored_layer.message_history == messages
            assert isinstance(restored_layer.runtime_state.messages[0], ModelRequest)
            assert isinstance(restored_layer.runtime_state.messages[1], ModelResponse)

    asyncio.run(suspend_then_resume())
def _sample_messages() -> list[ModelMessage]:
    """Build a fresh two-message exchange: a user request followed by a model reply."""
    user_request = ModelRequest(parts=[UserPromptPart(content="Hello")])
    model_reply = ModelResponse(parts=[TextPart(content="Hi there")])
    return [user_request, model_reply]

View File

@ -121,11 +121,16 @@ def test_output_package_exports_client_safe_config_symbols_only() -> None:
assert not hasattr(output_exports, "DifyOutputLayer")
def test_output_layer_config_accepts_valid_object_schema_and_defaults_name() -> None:
def test_output_layer_config_accepts_valid_object_schema_without_public_tool_name() -> None:
config = DifyOutputLayerConfig(json_schema=_json_schema())
assert DIFY_OUTPUT_LAYER_TYPE_ID == "dify.output"
assert config.name == "final_result"
assert hasattr(config, "name") is False
assert config.model_dump(mode="json") == {
"json_schema": _json_schema(),
"description": None,
"strict": None,
}
assert config.description is None
assert config.strict is None
@ -138,7 +143,7 @@ def test_output_layer_config_rejects_non_object_top_level_json_schema() -> None:
@pytest.mark.parametrize(
("payload", "message"),
[
({"json_schema": _json_schema(), "name": "bad name"}, "letters, numbers, underscores, or hyphens"),
({"json_schema": _json_schema(), "name": "bad name"}, "Extra inputs are not permitted"),
({"json_schema": _json_schema(), "unknown": True}, "Extra inputs are not permitted"),
],
)
@ -150,7 +155,6 @@ def test_output_layer_config_rejects_invalid_input(payload: dict[str, object], m
def test_output_layer_builds_validated_output_contract_for_object_schema() -> None:
config = DifyOutputLayerConfig(
json_schema=_json_schema(),
name="incident_summary",
description="Structured incident summary.",
strict=True,
)
@ -163,11 +167,11 @@ def test_output_layer_builds_validated_output_contract_for_object_schema() -> No
output_adapter = TypeAdapter(_validated_output_type(output_contract.output_type))
assert isinstance(output_type, ToolOutput)
assert output_type.name == "incident_summary"
assert output_type.name == "final_output"
assert output_type.description is None
assert output_type.strict is True
assert output_schema["type"] == "object"
assert output_schema["title"] == "incident_summary"
assert output_schema["title"] == "final_output"
assert output_schema["description"] == "Structured incident summary."
assert output_adapter.validate_python(valid_output) == valid_output
@ -206,7 +210,7 @@ def test_output_layer_rejects_non_defs_local_ref_in_direct_object_schema() -> No
def test_output_layer_keeps_local_defs_ref_working_in_direct_object_schema() -> None:
output_contract = DifyOutputLayer.from_config(
DifyOutputLayerConfig(json_schema=_object_local_defs_ref_schema(), name="direct_defs_result")
DifyOutputLayerConfig(json_schema=_object_local_defs_ref_schema())
).build_output_contract()
output_adapter = TypeAdapter(_validated_output_type(output_contract.output_type))
output_schema = output_adapter.json_schema()
@ -221,7 +225,7 @@ def test_output_layer_keeps_local_defs_ref_working_in_direct_object_schema() ->
},
},
"required": ["items"],
"title": "direct_defs_result",
"title": "final_output",
}
assert output_adapter.validate_python({"items": ["a", "b"]}) == {"items": ["a", "b"]}

View File

@ -8,7 +8,7 @@ from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptL
import dify_agent.protocol as protocol_exports
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LAYER_TYPE_ID, DIFY_PLUGIN_LLM_LAYER_TYPE_ID
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol.schemas import (
RUN_EVENT_ADAPTER,
CreateRunRequest,
@ -63,6 +63,7 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None:
def test_create_run_request_rejects_old_compositor_payload_and_model_layer_id_is_public() -> None:
assert DIFY_AGENT_MODEL_LAYER_ID == "llm"
assert DIFY_AGENT_HISTORY_LAYER_ID == "history"
assert DIFY_AGENT_OUTPUT_LAYER_ID == "output"
with pytest.raises(ValidationError):
_ = CreateRunRequest.model_validate(

View File

@ -7,9 +7,10 @@ import pytest
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
from agenton.layers import ExitIntent, LifecycleState
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol.schemas import (
CreateRunRequest,
LayerExitSignals,
@ -191,7 +192,6 @@ def test_create_run_rejects_invalid_output_schema_before_persisting() -> None:
await scheduler.create_run(
_request(
output_config={
"name": "incident_summary",
"json_schema": _recursive_output_schema(),
}
)
@ -212,7 +212,6 @@ def test_create_run_rejects_remote_ref_output_schema_before_persisting() -> None
await scheduler.create_run(
_request(
output_config={
"name": "incident_summary",
"json_schema": {
"type": "object",
"properties": {
@ -238,7 +237,6 @@ def test_create_run_rejects_non_object_output_schema_before_persisting() -> None
await scheduler.create_run(
_request(
output_config={
"name": "incident_actions",
"json_schema": {
"type": "array",
"items": {"type": "string"},
@ -252,6 +250,32 @@ def test_create_run_rejects_non_object_output_schema_before_persisting() -> None
asyncio.run(scenario())
# Checks that creating a run whose output config carries a "name" key raises a
# ValueError matching "Extra inputs are not permitted" and that the fake store
# ends up with no records.
def test_create_run_rejects_public_output_tool_name_override_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=store, plugin_daemon_http_client=client)
with pytest.raises(ValueError, match="Extra inputs are not permitted"):
await scheduler.create_run(
_request(
output_config={
# "name" is the only key here besides "json_schema"; it is the input expected to be rejected.
"name": "incident_summary",
"json_schema": {
"type": "object",
"properties": {"title": {"type": "string"}},
"required": ["title"],
"additionalProperties": False,
},
}
)
)
# The rejected request must not have produced a run record.
assert store.records == {}
asyncio.run(scenario())
def test_create_run_rejects_non_defs_local_ref_in_direct_object_schema_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
@ -262,7 +286,6 @@ def test_create_run_rejects_non_defs_local_ref_in_direct_object_schema_before_pe
await scheduler.create_run(
_request(
output_config={
"name": "incident_summary",
"json_schema": {
"type": "object",
"properties": {
@ -426,6 +449,78 @@ def test_validate_run_request_rejects_misnamed_output_layer_before_provider_chec
asyncio.run(scenario())
def test_validate_run_request_accepts_reserved_history_layer() -> None:
    """A single history layer under the reserved name passes run-request validation."""
    prompt_spec = RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))
    history_spec = RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    request = CreateRunRequest(composition=RunComposition(layers=[prompt_spec, history_spec]))

    async def run_validation() -> None:
        await validate_run_request(request)

    asyncio.run(run_validation())
def test_validate_run_request_rejects_misnamed_history_layer_before_provider_checks() -> None:
    """A history-typed layer under a non-reserved name is rejected even with no providers supplied."""
    prompt_spec = RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))
    misnamed_spec = RunLayerSpec(name="chat-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    request = CreateRunRequest(composition=RunComposition(layers=[prompt_spec, misnamed_spec]))

    async def run_validation() -> None:
        # The empty provider tuple means the failure must come from the graph-shape check.
        with pytest.raises(RunRequestValidationError, match="must use reserved layer name 'history'"):
            await validate_run_request(request, layer_providers=())

    asyncio.run(run_validation())
def test_validate_run_request_rejects_multiple_history_layers_before_provider_checks() -> None:
    """Two history-typed layers in one composition are rejected.

    Validation is invoked with ``layer_providers=()``, so the error is expected
    even though no layer providers are supplied.
    """
    layer_specs = [
        RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")),
        RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
        RunLayerSpec(name="secondary-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
    ]
    request = CreateRunRequest(composition=RunComposition(layers=layer_specs))

    async def scenario() -> None:
        with pytest.raises(RunRequestValidationError, match="Only one 'pydantic_ai.history' layer is supported"):
            await validate_run_request(request, layer_providers=())

    asyncio.run(scenario())
def test_validate_run_request_rejects_history_layer_dependencies_before_provider_checks() -> None:
    """A history layer that declares ``deps`` is rejected.

    Validation is invoked with ``layer_providers=()``, so the error is expected
    even though no layer providers are supplied.
    """
    prompt_layer = RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))
    history_layer_with_deps = RunLayerSpec(
        name=DIFY_AGENT_HISTORY_LAYER_ID,
        type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
        deps={"prompt": "prompt"},
    )
    request = CreateRunRequest(composition=RunComposition(layers=[prompt_layer, history_layer_with_deps]))

    async def scenario() -> None:
        with pytest.raises(RunRequestValidationError, match="does not support dependencies"):
            await validate_run_request(request, layer_providers=())

    asyncio.run(scenario())
def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()

View File

@ -5,18 +5,19 @@ from typing import Any
import httpx
import pytest
from pydantic_ai.exceptions import UnexpectedModelBehavior
from pydantic_ai.messages import ModelMessage, ModelResponse, ToolCallPart
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, SystemPromptPart, TextPart, ToolCallPart, UserPromptPart
from pydantic_ai.models import ModelRequestParameters
from pydantic_ai.models.test import TestModel
from pydantic_ai.settings import ModelSettings
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
from agenton.layers import ExitIntent, LifecycleState
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryRuntimeState
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
from dify_agent.protocol.schemas import (
CreateRunRequest,
LayerExitSignals,
@ -31,6 +32,7 @@ from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
def _request(
user: str | list[str] = "hello",
*,
include_history: bool = False,
llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
plugin_layer_name: str = "plugin",
on_exit: LayerExitSignals | None = None,
@ -42,6 +44,11 @@ def _request(
type="plain.prompt",
config=PromptLayerConfig(prefix="system", user=user),
),
*(
[RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)]
if include_history
else []
),
RunLayerSpec(
name=plugin_layer_name,
type="dify.plugin",
@ -122,6 +129,58 @@ class SequenceOutputTestModel(TestModel):
)
class RecordingTestModel(TestModel):
    """``TestModel`` variant that records every request and can be told to fail.

    The model is constructed with ``call_tools=[]`` and a fixed
    ``custom_output_text``.
    """

    # One entry per ``_request`` call: a shallow copy of the messages it received.
    seen_requests: list[list[ModelMessage]]
    # When not ``None``, raised from ``_request`` after the call has been recorded.
    failure: Exception | None

    def __init__(self, *, custom_output_text: str = "done", failure: Exception | None = None) -> None:
        """Configure the canned output text and the optional exception to raise."""
        super().__init__(call_tools=[], custom_output_text=custom_output_text)
        self.failure = failure
        self.seen_requests = []

    def _request(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> ModelResponse:
        """Record ``messages``, then raise ``failure`` or delegate to ``TestModel``."""
        # Store a copy so later changes to the caller's list do not alter the record.
        self.seen_requests.append([*messages])
        configured_failure = self.failure
        if configured_failure is None:
            return super()._request(messages, model_settings, model_request_parameters)
        raise configured_failure
def _history_session_snapshot(
    messages: list[ModelMessage],
    *,
    include_output: bool = False,
) -> CompositorSessionSnapshot:
    """Build a session snapshot whose history layer carries ``messages``.

    Every layer is marked ``SUSPENDED``. Layers appear in the order prompt,
    history, plugin, model, followed by the output layer when
    ``include_output`` is true. Only the history layer has a non-empty runtime
    state, which is the JSON-mode dump of ``PydanticAIHistoryRuntimeState``.
    """
    history_state = PydanticAIHistoryRuntimeState(messages=messages).model_dump(mode="json")
    named_states = [
        ("prompt", {}),
        (DIFY_AGENT_HISTORY_LAYER_ID, history_state),
        ("plugin", {}),
        (DIFY_AGENT_MODEL_LAYER_ID, {}),
    ]
    if include_output:
        named_states.append((DIFY_AGENT_OUTPUT_LAYER_ID, {}))

    return CompositorSessionSnapshot(
        layers=[
            LayerSessionSnapshot(name=layer_name, lifecycle_state=LifecycleState.SUSPENDED, runtime_state=state)
            for layer_name, state in named_states
        ]
    )
def _history_messages_from_snapshot(snapshot: CompositorSessionSnapshot) -> list[ModelMessage]:
    """Return the messages stored in the snapshot's history layer.

    Raises ``StopIteration`` when no layer is named ``DIFY_AGENT_HISTORY_LAYER_ID``.
    """
    history_candidates = (layer for layer in snapshot.layers if layer.name == DIFY_AGENT_HISTORY_LAYER_ID)
    # Only the first matching layer is used.
    history_layer = next(history_candidates)
    restored_state = PydanticAIHistoryRuntimeState.model_validate(history_layer.runtime_state)
    return restored_state.messages
def _flatten_message_parts(messages: list[ModelMessage]) -> list[object]:
return [part for message in messages for part in message.parts]
def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
seen_clients: list[httpx.AsyncClient] = []
@ -170,6 +229,172 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
assert sink.statuses["run-1"] == "succeeded"
def test_runner_passes_temporary_system_prompt_prefix_without_history_layer(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without a history layer the model sees the system prompt, then the user prompt.

    The success snapshot is expected to list only the prompt, plugin, and model layers.
    """
    run_id = "run-no-history"
    recording_model = RecordingTestModel(custom_output_text="done")

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return recording_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=sink,
                request=_request("current user"),
                run_id=run_id,
                plugin_daemon_http_client=client,
            )
            await runner.run()

    asyncio.run(scenario())

    # Inspect the first model request part by part.
    first_request_parts = _flatten_message_parts(recording_model.seen_requests[0])
    system_part = first_request_parts[0]
    assert isinstance(system_part, SystemPromptPart)
    assert system_part.content == "system"
    user_part = first_request_parts[1]
    assert isinstance(user_part, UserPromptPart)
    assert user_part.content == "current user"

    terminal = sink.events[run_id][-1]
    assert isinstance(terminal, RunSucceededEvent)
    snapshot_layer_names = [layer.name for layer in terminal.data.session_snapshot.layers]
    assert snapshot_layer_names == ["prompt", "plugin", DIFY_AGENT_MODEL_LAYER_ID]
def test_runner_prepends_current_system_prompt_to_stored_history_and_appends_only_new_messages(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With stored history, the model sees system prompt, old turns, then the new user prompt.

    The saved history is expected to keep the old turns, gain only the new
    request/response pair, and contain no ``SystemPromptPart``.
    """
    run_id = "run-history"
    recording_model = RecordingTestModel(custom_output_text="done")
    stored_history = [
        ModelRequest(parts=[UserPromptPart(content="old user")]),
        ModelResponse(parts=[TextPart(content="old assistant")]),
    ]

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return recording_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    request = _request("current user", include_history=True)
    request.session_snapshot = _history_session_snapshot(stored_history)
    sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=sink,
                request=request,
                run_id=run_id,
                plugin_daemon_http_client=client,
            )
            await runner.run()

    asyncio.run(scenario())

    # What the model actually received on its first request.
    sent_parts = _flatten_message_parts(recording_model.seen_requests[0])
    system_part = sent_parts[0]
    assert isinstance(system_part, SystemPromptPart)
    assert system_part.content == "system"
    old_user_part = sent_parts[1]
    assert isinstance(old_user_part, UserPromptPart)
    assert old_user_part.content == "old user"
    old_assistant_part = sent_parts[2]
    assert isinstance(old_assistant_part, TextPart)
    assert old_assistant_part.content == "old assistant"
    current_user_part = sent_parts[3]
    assert isinstance(current_user_part, UserPromptPart)
    assert current_user_part.content == "current user"

    # What the success snapshot persisted for the history layer.
    terminal = sink.events[run_id][-1]
    assert isinstance(terminal, RunSucceededEvent)
    saved_history = _history_messages_from_snapshot(terminal.data.session_snapshot)
    assert saved_history[:2] == stored_history

    new_request = saved_history[2]
    assert isinstance(new_request, ModelRequest)
    assert len(new_request.parts) == 1
    saved_user_part = new_request.parts[0]
    assert isinstance(saved_user_part, UserPromptPart)
    assert saved_user_part.content == "current user"

    new_response = saved_history[3]
    assert isinstance(new_response, ModelResponse)
    assert len(new_response.parts) == 1
    saved_text_part = new_response.parts[0]
    assert isinstance(saved_text_part, TextPart)
    assert saved_text_part.content == "done"

    assert not any(isinstance(part, SystemPromptPart) for message in saved_history for part in message.parts)
def test_runner_with_empty_history_layer_still_sends_system_prompt_and_saves_only_new_messages(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With an empty history layer, the model still sees the system prompt first.

    The saved history is expected to start with the new request/response pair
    and contain no ``SystemPromptPart``.
    """
    run_id = "run-empty-history"
    recording_model = RecordingTestModel(custom_output_text="done")

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return recording_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    request = _request("current user", include_history=True)
    request.session_snapshot = _history_session_snapshot([])
    sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=sink,
                request=request,
                run_id=run_id,
                plugin_daemon_http_client=client,
            )
            await runner.run()

    asyncio.run(scenario())

    # What the model actually received on its first request.
    sent_parts = _flatten_message_parts(recording_model.seen_requests[0])
    system_part = sent_parts[0]
    assert isinstance(system_part, SystemPromptPart)
    assert system_part.content == "system"
    user_part = sent_parts[1]
    assert isinstance(user_part, UserPromptPart)
    assert user_part.content == "current user"

    # What the success snapshot persisted for the history layer.
    terminal = sink.events[run_id][-1]
    assert isinstance(terminal, RunSucceededEvent)
    saved_history = _history_messages_from_snapshot(terminal.data.session_snapshot)

    new_request = saved_history[0]
    assert isinstance(new_request, ModelRequest)
    assert len(new_request.parts) == 1
    saved_user_part = new_request.parts[0]
    assert isinstance(saved_user_part, UserPromptPart)
    assert saved_user_part.content == "current user"

    new_response = saved_history[1]
    assert isinstance(new_response, ModelResponse)
    assert len(new_response.parts) == 1
    saved_text_part = new_response.parts[0]
    assert isinstance(saved_text_part, TextPart)
    assert saved_text_part.content == "done"

    assert not any(isinstance(part, SystemPromptPart) for message in saved_history for part in message.parts)
def test_runner_failure_with_history_layer_emits_failed_terminal_event_without_success_snapshot(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A model failure propagates, emits started/failed events, and leaves the request history as supplied."""
    run_id = "run-history-failure"
    failing_model = RecordingTestModel(failure=RuntimeError("boom"))
    stored_history = [
        ModelRequest(parts=[UserPromptPart(content="old user")]),
        ModelResponse(parts=[TextPart(content="old assistant")]),
    ]

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return failing_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    request = _request("current user", include_history=True)
    request.session_snapshot = _history_session_snapshot(stored_history)
    sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=sink,
                request=request,
                run_id=run_id,
                plugin_daemon_http_client=client,
            )
            # The injected failure is expected to surface from ``run()``.
            with pytest.raises(RuntimeError, match="boom"):
                await runner.run()

    asyncio.run(scenario())

    emitted_types = [event.type for event in sink.events[run_id]]
    assert emitted_types == ["run_started", "run_failed"]
    assert sink.statuses[run_id] == "failed"

    # The snapshot attached to the request must still hold the original history.
    assert request.session_snapshot is not None
    assert _history_messages_from_snapshot(request.session_snapshot) == stored_history
def test_runner_applies_on_exit_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
@ -232,7 +457,6 @@ def test_runner_passes_output_layer_spec_to_agent_and_serializes_structured_resu
"required": ["title", "severity", "actions"],
"additionalProperties": False,
},
name="incident_summary",
description="Structured incident summary returned by the agent.",
strict=True,
)
@ -267,10 +491,10 @@ def test_runner_passes_output_layer_spec_to_agent_and_serializes_structured_resu
assert model.last_model_request_parameters is not None
assert len(model.last_model_request_parameters.output_tools) == 1
output_tool = model.last_model_request_parameters.output_tools[0]
assert output_tool.name == "incident_summary"
assert output_tool.name == "final_output"
assert output_tool.description == "Structured incident summary returned by the agent."
assert output_tool.parameters_json_schema["type"] == "object"
assert output_tool.parameters_json_schema["title"] == "incident_summary"
assert output_tool.parameters_json_schema["title"] == "final_output"
assert output_tool.parameters_json_schema["properties"] == {
"title": {"type": "string"},
"severity": {"type": "string", "enum": ["low", "medium", "high"]},
@ -321,7 +545,6 @@ def test_runner_retries_invalid_structured_output_and_eventually_succeeds(monkey
"required": ["title", "severity", "actions"],
"additionalProperties": False,
},
name="incident_summary",
description="Structured incident summary returned by the agent.",
)
)
@ -374,7 +597,6 @@ def test_runner_fails_when_invalid_structured_output_exhausts_retries(monkeypatc
"required": ["title", "severity", "actions"],
"additionalProperties": False,
},
name="incident_summary",
description="Structured incident summary returned by the agent.",
)
)
@ -411,7 +633,6 @@ def test_runner_rejects_invalid_output_layer_before_model_resolution(monkeypatch
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
request = _request(
output_config={
"name": "incident_summary",
"json_schema": _recursive_output_schema(),
}
)

View File

@ -0,0 +1,151 @@
import asyncio
import pytest
from pydantic_ai.messages import ModelRequest, ModelResponse, SystemPromptPart, TextPart, UserPromptPart
from agenton.compositor import Compositor, LayerNode
from agenton_collections.layers.pydantic_ai import (
PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
PydanticAIHistoryLayer,
)
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID
from dify_agent.protocol.schemas import RunComposition, RunLayerSpec
from dify_agent.runtime.compositor_factory import create_default_layer_providers
from dify_agent.runtime.history import (
append_successful_run_history,
build_run_message_history,
get_history_layer,
validate_history_layer_composition,
)
def test_default_layer_providers_include_pydantic_ai_history_layer() -> None:
    """The default provider set contains a provider for the history layer type id."""
    registered_type_ids = {provider.type_id for provider in create_default_layer_providers()}

    assert PYDANTIC_AI_HISTORY_LAYER_TYPE_ID in registered_type_ids
def test_validate_history_layer_composition_accepts_absent_or_reserved_history_layer() -> None:
    """Neither an empty composition nor a single reserved-name history layer raises."""
    # Case 1: no layers at all.
    empty_composition = RunComposition(layers=[])
    validate_history_layer_composition(empty_composition)

    # Case 2: exactly one history layer under the reserved id.
    reserved_history_layer = RunLayerSpec(
        name=DIFY_AGENT_HISTORY_LAYER_ID,
        type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
    )
    validate_history_layer_composition(RunComposition(layers=[reserved_history_layer]))
def test_validate_history_layer_composition_rejects_multiple_history_layers() -> None:
    """Two history-typed layers in one composition raise ``ValueError``."""
    primary_history = RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    secondary_history = RunLayerSpec(name="secondary-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    composition = RunComposition(layers=[primary_history, secondary_history])

    with pytest.raises(ValueError, match="Only one 'pydantic_ai.history' layer is supported"):
        validate_history_layer_composition(composition)
def test_validate_history_layer_composition_rejects_misnamed_history_layer() -> None:
    """A history-typed layer named ``chat-history`` raises ``ValueError``."""
    misnamed_history = RunLayerSpec(name="chat-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    composition = RunComposition(layers=[misnamed_history])

    with pytest.raises(ValueError, match="must use reserved layer name 'history'"):
        validate_history_layer_composition(composition)
def test_validate_history_layer_composition_rejects_history_layer_dependencies() -> None:
    """A history layer that declares ``deps`` raises ``ValueError``."""
    history_with_deps = RunLayerSpec(
        name=DIFY_AGENT_HISTORY_LAYER_ID,
        type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
        deps={"prompt": "prompt"},
    )
    composition = RunComposition(layers=[history_with_deps])

    with pytest.raises(ValueError, match="does not support dependencies"):
        validate_history_layer_composition(composition)
def test_get_history_layer_returns_optional_active_history_layer() -> None:
    """``get_history_layer`` returns a ``PydanticAIHistoryLayer`` for an entered run."""
    history_node = LayerNode(DIFY_AGENT_HISTORY_LAYER_ID, PydanticAIHistoryLayer)
    compositor = Compositor([history_node])

    async def scenario() -> None:
        async with compositor.enter() as run:
            assert isinstance(get_history_layer(run), PydanticAIHistoryLayer)

    asyncio.run(scenario())
def test_build_run_message_history_renders_current_system_prompts_before_stored_history() -> None:
    """Both system prompts land in one leading request, followed by the stored history."""
    old_request = ModelRequest(parts=[UserPromptPart(content="old user")])
    old_response = ModelResponse(parts=[TextPart(content="old assistant")])
    stored_history = [old_request, old_response]

    async def scenario() -> None:
        message_history = await build_run_message_history(
            system_prompts=[lambda: "current system", lambda: "current suffix"],
            stored_history=stored_history,
        )
        assert message_history is not None

        # The first message carries the rendered prompts in the order they were given.
        leading_message = message_history[0]
        assert isinstance(leading_message, ModelRequest)
        rendered_prompts = [part.content for part in leading_message.parts]
        assert rendered_prompts == ["current system", "current suffix"]

        # Everything after it is the stored history.
        assert message_history[1:] == stored_history

    asyncio.run(scenario())
def test_build_run_message_history_returns_none_without_system_prompt_or_history() -> None:
    """With no system prompts and no stored history the builder returns ``None``."""

    async def scenario() -> None:
        message_history = await build_run_message_history(system_prompts=[], stored_history=[])
        assert message_history is None

    asyncio.run(scenario())
def test_build_run_message_history_renders_system_prompt_without_history_layer() -> None:
    """One system prompt and empty stored history yield a single system-prompt request."""

    async def scenario() -> None:
        message_history = await build_run_message_history(
            system_prompts=[lambda: "current system"],
            stored_history=[],
        )
        assert message_history is not None
        assert len(message_history) == 1

        only_message = message_history[0]
        assert isinstance(only_message, ModelRequest)
        first_part = only_message.parts[0]
        assert isinstance(first_part, SystemPromptPart)
        assert first_part.content == "current system"

    asyncio.run(scenario())
def test_build_run_message_history_rejects_context_dependent_prompt_functions() -> None:
    """A system prompt callable that takes an argument raises ``ValueError``."""

    def unsupported_prompt(_ctx: object) -> str:
        return "current system"

    async def scenario() -> None:
        prompts_requiring_context = [unsupported_prompt]
        with pytest.raises(ValueError, match="zero-argument system prompts"):
            await build_run_message_history(system_prompts=prompts_requiring_context, stored_history=[])

    asyncio.run(scenario())
def test_append_successful_run_history_preserves_existing_message_order() -> None:
    """Appending new messages leaves the existing ones first and adds the new ones after."""
    stored_history = [ModelRequest(parts=[UserPromptPart(content="old user")])]
    new_messages = [ModelResponse(parts=[TextPart(content="new assistant")])]

    history_layer = PydanticAIHistoryLayer()
    history_layer.replace_messages(stored_history)
    append_successful_run_history(history_layer, new_messages)

    # Build the expectation only after the append, as the original comparison does.
    expected_history = [*stored_history, *new_messages]
    assert history_layer.message_history == expected_history

View File

@ -30,6 +30,13 @@ class FakeRedis:
async def xadd(self, key: str, fields: Mapping[str, object]) -> str:
    """Record an ``xadd`` call and append the entry to the in-memory stream.

    Returns the event id produced by ``_append_stream_entry``.
    """
    # Log a dict copy of ``fields`` so the command record is independent of the caller's mapping.
    recorded_call = ("xadd", key, dict(fields))
    self.commands.append(recorded_call)
    event_id = self._append_stream_entry(key, fields)
    return event_id
def pipeline(self, transaction: bool = True, shard_hint: str | None = None) -> "FakeRedisPipeline":
    """Record a ``pipeline`` call with its arguments and return a fake pipeline bound to this instance."""
    recorded_call = ("pipeline", transaction, shard_hint)
    self.commands.append(recorded_call)
    fake_pipeline = FakeRedisPipeline(self)
    return fake_pipeline
def _append_stream_entry(self, key: str, fields: Mapping[str, object]) -> str:
entries = self.streams.setdefault(key, [])
event_id = f"{len(entries) + 1}-0"
entries.append((event_id, dict(fields)))
@ -64,6 +71,35 @@ class FakeRedis:
return int(timestamp), int(sequence)
class FakeRedisPipeline:
    """Pipeline stand-in that applies each command to its ``FakeRedis`` as soon as it is queued.

    Commands are logged on ``redis.commands`` and their results collected in
    ``results``; ``execute`` only logs itself and returns those results.
    """

    # The fake redis whose command log and streams this pipeline writes to.
    redis: FakeRedis
    # Results of the queued commands, in call order.
    results: list[object]

    def __init__(self, redis: FakeRedis) -> None:
        """Bind the pipeline to ``redis`` and start with no results."""
        self.results = []
        self.redis = redis

    async def __aenter__(self) -> "FakeRedisPipeline":
        """Enter the async context, yielding the pipeline itself."""
        return self

    async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None:
        """Leave the async context; exception details are ignored and not suppressed."""
        del exc_type, exc, traceback
        return None

    def xadd(self, key: str, fields: Mapping[str, object]) -> "FakeRedisPipeline":
        """Log an ``xadd``, append the stream entry, and keep its event id as a result."""
        self.redis.commands.append(("xadd", key, dict(fields)))
        event_id = self.redis._append_stream_entry(key, fields)
        self.results.append(event_id)
        return self

    def expire(self, key: str, seconds: int) -> "FakeRedisPipeline":
        """Log an ``expire`` and record ``True`` as its result."""
        self.redis.commands.append(("expire", key, seconds))
        self.results.append(True)
        return self

    async def execute(self) -> list[object]:
        """Log ``execute`` and return a copy of the collected results."""
        self.redis.commands.append(("execute",))
        return [*self.results]
def test_create_run_writes_running_record_without_job_queue_and_with_retention() -> None:
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType]
@ -97,15 +133,21 @@ def test_append_event_serializes_typed_event_without_id_and_expires_run_keys() -
event_id = asyncio.run(store.append_event(RunStartedEvent(id="local", run_id="run-1")))
assert event_id == "1-0"
assert redis.commands[0][0] == "xadd"
fields = redis.commands[0][2]
pipeline_commands = [command for command in redis.commands if command[0] == "pipeline"]
assert len(pipeline_commands) == 1
assert pipeline_commands[0][1] is True
xadd_commands = [command for command in redis.commands if command[0] == "xadd"]
assert len(xadd_commands) == 1
fields = xadd_commands[0][2]
assert isinstance(fields, dict)
assert '"id"' not in str(fields["payload"])
assert '"type":"run_started"' in str(fields["payload"])
assert redis.commands[1:] == [
expire_commands = {command for command in redis.commands if command[0] == "expire"}
assert expire_commands == {
("expire", "test:runs:run-1:events", 60),
("expire", "test:runs:run-1:record", 60),
]
}
assert ("execute",) in redis.commands
def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> None:

View File

@ -8,7 +8,7 @@ PROJECT_ROOT = Path(__file__).resolve().parents[2]
CLIENT_SHARED_DTO_DEPENDENCIES = {
"httpx>=0.28.1",
"pydantic>=2.13.3",
"pydantic>=2.12.5,<3",
"pydantic-ai-slim>=1.85.1",
"typing-extensions>=4.12.2",
}

2
dify-agent/uv.lock generated
View File

@ -614,7 +614,7 @@ requires-dist = [
{ name = "graphon", marker = "extra == 'server'", specifier = "~=0.2.2" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "jsonschema", marker = "extra == 'server'", specifier = ">=4.23.0" },
{ name = "pydantic", specifier = ">=2.12.5,<2.13" },
{ name = "pydantic", specifier = ">=2.12.5,<3" },
{ name = "pydantic-ai-slim", specifier = ">=1.85.1" },
{ name = "pydantic-ai-slim", extras = ["anthropic", "google", "openai"], marker = "extra == 'server'", specifier = ">=1.85.1" },
{ name = "pydantic-settings", marker = "extra == 'server'", specifier = ">=2.12.0" },