mirror of
https://github.com/langgenius/dify.git
synced 2026-05-22 18:08:40 +08:00
Compare commits
2 Commits
1.14.2
...
feat(agent
| Author | SHA1 | Date | |
|---|---|---|---|
| b519e705be | |||
| b187c5dd6e |
@ -98,35 +98,16 @@ uv run --extra server uvicorn dify_agent.server.app:app \
|
||||
`ServerSettings` reads `.env` from the current `dify-agent` directory, or from
|
||||
`dify-agent/.env` when the command is run from the repository root.
|
||||
|
||||
## Create a one-file uv script client
|
||||
## Create a Python client example
|
||||
|
||||
In another shell, keep working from the `dify-agent` directory and create this
|
||||
script. The script depends on the local `dify-agent` package only; it does not
|
||||
install the server extra because it talks to the already running server through
|
||||
the public Python client.
|
||||
|
||||
```bash
|
||||
DIFY_AGENT_PACKAGE_URL="$(python3 - <<'PY'
|
||||
from pathlib import Path
|
||||
|
||||
print(Path.cwd().resolve().as_uri())
|
||||
PY
|
||||
)"
|
||||
|
||||
cat > ./run_dify_agent_client.py <<PY
|
||||
#!/usr/bin/env -S uv run --script
|
||||
# /// script
|
||||
# requires-python = ">=3.12"
|
||||
# dependencies = [
|
||||
# "dify-agent @ ${DIFY_AGENT_PACKAGE_URL}",
|
||||
# ]
|
||||
# ///
|
||||
In another shell, keep working from the `dify-agent` directory. Create
|
||||
`run_dify_agent_client.py` with the example below, then replace the placeholder
|
||||
tenant id, model provider, model name, and provider credential values.
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
|
||||
from dify_agent.client import Client
|
||||
@ -139,55 +120,39 @@ from dify_agent.layers.dify_plugin import (
|
||||
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec
|
||||
|
||||
|
||||
def env(name: str, default: str | None = None) -> str:
|
||||
value = os.environ.get(name, default)
|
||||
if value is None or value == "":
|
||||
raise SystemExit(f"Missing required environment variable: {name}")
|
||||
return value
|
||||
API_BASE_URL = "http://127.0.0.1:8000"
|
||||
|
||||
TENANT_ID = "replace-with-tenant-id"
|
||||
PLUGIN_ID = "langgenius/openai"
|
||||
USER_ID: str | None = None
|
||||
|
||||
# Keep these aligned with DIFY_AGENT_PROVIDER and DIFY_AGENT_MODEL_NAME in dify-agent/.env.
|
||||
MODEL_PROVIDER = "replace-with-provider-from-dify-agent-env"
|
||||
MODEL_NAME = "replace-with-model-from-dify-agent-env"
|
||||
MODEL_CREDENTIALS: dict[str, str | int | float | bool | None] = {
|
||||
"api_key": "replace-with-provider-key",
|
||||
}
|
||||
|
||||
SYSTEM_PROMPT = "You are a concise assistant."
|
||||
USER_PROMPT = "用一句话介绍 Dify Agent。"
|
||||
|
||||
|
||||
def load_credentials() -> dict[str, Any]:
|
||||
raw = env("DIFY_AGENT_MODEL_CREDENTIALS_JSON")
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise SystemExit(f"DIFY_AGENT_MODEL_CREDENTIALS_JSON must be valid JSON: {exc}") from exc
|
||||
|
||||
if not isinstance(data, dict):
|
||||
raise SystemExit("DIFY_AGENT_MODEL_CREDENTIALS_JSON must be a JSON object")
|
||||
|
||||
return data
|
||||
|
||||
|
||||
async def main() -> int:
|
||||
api_base_url = env("DIFY_AGENT_SERVER_URL", "http://127.0.0.1:8000")
|
||||
|
||||
tenant_id = env("DIFY_AGENT_TENANT_ID")
|
||||
plugin_id = env("DIFY_AGENT_PLUGIN_ID", "langgenius/openai")
|
||||
user_id = os.environ.get("DIFY_AGENT_USER_ID") or None
|
||||
|
||||
model_provider = env("DIFY_AGENT_PROVIDER", "openai")
|
||||
model_name = env("DIFY_AGENT_MODEL_NAME", "gpt-4o-mini")
|
||||
model_credentials = load_credentials()
|
||||
|
||||
system_prompt = env("DIFY_AGENT_SYSTEM_PROMPT", "You are a concise assistant.")
|
||||
user_prompt = env("DIFY_AGENT_PROMPT", "Say hello from the Dify Agent client.")
|
||||
|
||||
request = CreateRunRequest(
|
||||
def build_request() -> CreateRunRequest:
|
||||
return CreateRunRequest(
|
||||
composition=RunComposition(
|
||||
layers=[
|
||||
RunLayerSpec(
|
||||
name="prompt",
|
||||
type=PLAIN_PROMPT_LAYER_TYPE_ID,
|
||||
config=PromptLayerConfig(prefix=system_prompt, user=user_prompt),
|
||||
config=PromptLayerConfig(prefix=SYSTEM_PROMPT, user=USER_PROMPT),
|
||||
),
|
||||
RunLayerSpec(
|
||||
name="plugin",
|
||||
type=DIFY_PLUGIN_LAYER_TYPE_ID,
|
||||
config=DifyPluginLayerConfig(
|
||||
tenant_id=tenant_id,
|
||||
plugin_id=plugin_id,
|
||||
user_id=user_id,
|
||||
tenant_id=TENANT_ID,
|
||||
plugin_id=PLUGIN_ID,
|
||||
user_id=USER_ID,
|
||||
),
|
||||
),
|
||||
RunLayerSpec(
|
||||
@ -195,17 +160,19 @@ async def main() -> int:
|
||||
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
deps={"plugin": "plugin"},
|
||||
config=DifyPluginLLMLayerConfig(
|
||||
model_provider=model_provider,
|
||||
model=model_name,
|
||||
credentials=model_credentials,
|
||||
model_provider=MODEL_PROVIDER,
|
||||
model=MODEL_NAME,
|
||||
credentials=MODEL_CREDENTIALS,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
)
|
||||
|
||||
async with Client(base_url=api_base_url, stream_timeout=None) as client:
|
||||
run = await client.create_run(request)
|
||||
|
||||
async def main() -> int:
|
||||
async with Client(base_url=API_BASE_URL, stream_timeout=None) as client:
|
||||
run = await client.create_run(build_request())
|
||||
print(f"created run: {run.run_id}, status={run.status}")
|
||||
|
||||
async for event in client.stream_events(run.run_id):
|
||||
@ -225,35 +192,26 @@ async def main() -> int:
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(asyncio.run(main()))
|
||||
PY
|
||||
|
||||
chmod +x ./run_dify_agent_client.py
|
||||
```
|
||||
|
||||
## Configure the client request and run it
|
||||
## Run the client example
|
||||
|
||||
The server-side `.env` controls how Dify Agent reaches the plugin daemon. The
|
||||
client request controls which tenant/plugin/provider/model and provider
|
||||
credentials the run uses. Configure those values before executing the script:
|
||||
client example controls which tenant/plugin/provider/model and provider
|
||||
credentials the run uses.
|
||||
|
||||
Run the example from the `dify-agent` directory:
|
||||
|
||||
```bash
|
||||
export DIFY_AGENT_SERVER_URL=http://127.0.0.1:8000
|
||||
|
||||
export DIFY_AGENT_TENANT_ID=replace-with-tenant-id
|
||||
export DIFY_AGENT_PLUGIN_ID=langgenius/openai
|
||||
export DIFY_AGENT_PROVIDER=openai
|
||||
export DIFY_AGENT_MODEL_NAME=gpt-4o-mini
|
||||
|
||||
export DIFY_AGENT_MODEL_CREDENTIALS_JSON='{"api_key":"replace-with-provider-key"}'
|
||||
|
||||
export DIFY_AGENT_PROMPT='用一句话介绍 Dify Agent。'
|
||||
|
||||
./run_dify_agent_client.py
|
||||
uv run python ./run_dify_agent_client.py
|
||||
```
|
||||
|
||||
The shape of `DIFY_AGENT_MODEL_CREDENTIALS_JSON` depends on the selected plugin
|
||||
provider's credential schema. The `{"api_key":"..."}` value above is only an
|
||||
OpenAI-style example.
|
||||
The shape of `MODEL_CREDENTIALS` depends on the selected plugin provider's
|
||||
credential schema. The `{"api_key":"..."}` value above is only an OpenAI-style
|
||||
example.
|
||||
|
||||
Set `MODEL_PROVIDER` and `MODEL_NAME` to the same values as
|
||||
`DIFY_AGENT_PROVIDER` and `DIFY_AGENT_MODEL_NAME` in `dify-agent/.env`.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
@ -263,6 +221,7 @@ If the run fails, check these items first:
|
||||
2. The Dify Agent server is listening on `127.0.0.1:8000`.
|
||||
3. `DIFY_AGENT_PLUGIN_DAEMON_URL` points to the correct plugin daemon.
|
||||
4. `DIFY_AGENT_PLUGIN_DAEMON_API_KEY` matches the plugin daemon server key.
|
||||
5. `DIFY_AGENT_PLUGIN_ID`, `DIFY_AGENT_PROVIDER`, and
|
||||
`DIFY_AGENT_MODEL_NAME` match a provider available through the plugin daemon.
|
||||
6. `DIFY_AGENT_MODEL_CREDENTIALS_JSON` matches that provider's credential schema.
|
||||
5. `PLUGIN_ID`, `MODEL_PROVIDER`, and `MODEL_NAME` in the client example match
|
||||
the corresponding values configured in `dify-agent/.env` and a provider
|
||||
available through the plugin daemon.
|
||||
6. `MODEL_CREDENTIALS` matches that provider's credential schema.
|
||||
|
||||
103
dify-agent/docs/dify-agent/user-manual/history-layer/index.md
Normal file
103
dify-agent/docs/dify-agent/user-manual/history-layer/index.md
Normal file
@ -0,0 +1,103 @@
|
||||
# History layer
|
||||
|
||||
The history layer stores pydantic-ai conversation history in the Agenton session
|
||||
snapshot. Add it when a later run should resume the previous conversation.
|
||||
|
||||
The history layer is state-only: it contributes no prompt text, user prompt, or
|
||||
tools, and it owns no live resources.
|
||||
|
||||
## Layer contract
|
||||
|
||||
| Property | Value |
|
||||
| --- | --- |
|
||||
| Reserved layer name | `history` |
|
||||
| Type id | `pydantic_ai.history` |
|
||||
| Config | none |
|
||||
| Dependencies | none |
|
||||
|
||||
Use at most one history layer. It must be named `history` and must not declare
|
||||
dependencies.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, RunLayerSpec
|
||||
|
||||
|
||||
history_layer = RunLayerSpec(
|
||||
name=DIFY_AGENT_HISTORY_LAYER_ID,
|
||||
type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
|
||||
)
|
||||
```
|
||||
|
||||
Include this layer in the same composition as your prompt, plugin, and LLM
|
||||
layers.
|
||||
|
||||
## Resume a conversation
|
||||
|
||||
Successful runs return a terminal event with both final output and a resumable
|
||||
session snapshot:
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
accepted = await client.create_run(request)
|
||||
|
||||
async for event in client.stream_events(accepted.run_id):
|
||||
if event.type == "run_succeeded":
|
||||
output = event.data.output
|
||||
snapshot = event.data.session_snapshot
|
||||
break
|
||||
```
|
||||
|
||||
Pass `snapshot` to the next request and keep the same layer names and order:
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
next_request = CreateRunRequest(
|
||||
composition=composition_with_the_same_layer_names_and_order,
|
||||
session_snapshot=snapshot,
|
||||
)
|
||||
```
|
||||
|
||||
`CreateRunRequest.on_exit` defaults to suspending layers, which makes the
|
||||
terminal snapshot resumable. Keep that default for normal memory flows.
|
||||
|
||||
## What gets stored
|
||||
|
||||
Dify Agent handles memory conservatively:
|
||||
|
||||
1. Current system prompts are rendered into a temporary `message_history` prefix placed before
|
||||
stored history.
|
||||
2. Stored history is then sent to the model.
|
||||
3. Current user prompts are sent after the stored history.
|
||||
4. Only newly produced pydantic-ai messages are appended after a successful run.
|
||||
5. Current system prompts are not persisted into the history layer.
|
||||
6. Failed runs emit `run_failed` and do not return a success snapshot to resume.
|
||||
|
||||
## Persist snapshots outside the client process
|
||||
|
||||
Session snapshots are Pydantic models and can be saved as JSON:
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from pathlib import Path
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot
|
||||
|
||||
|
||||
snapshot_path = Path("session_snapshot.json")
|
||||
snapshot_path.write_text(snapshot.model_dump_json(), encoding="utf-8")
|
||||
|
||||
restored_snapshot = CompositorSessionSnapshot.model_validate_json(
|
||||
snapshot_path.read_text(encoding="utf-8")
|
||||
)
|
||||
```
|
||||
|
||||
Always restore snapshots with the same layer names and order that produced them.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Symptom | What to check |
|
||||
| --- | --- |
|
||||
| `must use reserved layer name 'history'` | Rename the layer to `history`. |
|
||||
| `does not support dependencies` | Remove `deps` from the history layer. |
|
||||
| Resume fails with snapshot lifecycle errors | Use the success snapshot from `run_succeeded` and keep layer names/order unchanged. |
|
||||
| System prompts appear missing from saved memory | This is expected; current system prompts are temporary and are not persisted. |
|
||||
59
dify-agent/docs/dify-agent/user-manual/plugin-layer/index.md
Normal file
59
dify-agent/docs/dify-agent/user-manual/plugin-layer/index.md
Normal file
@ -0,0 +1,59 @@
|
||||
# Plugin layer
|
||||
|
||||
The plugin layer carries Dify plugin daemon identity for a run. It identifies the
|
||||
tenant, plugin, and optional user context; server settings provide the plugin
|
||||
daemon URL and API key.
|
||||
|
||||
Use it together with a [plugin LLM layer](../plugin-llm-layer/index.md). The LLM
|
||||
layer depends on this layer to reach the plugin daemon.
|
||||
|
||||
## Config fields
|
||||
|
||||
| Field | Type | Meaning |
|
||||
| --- | --- | --- |
|
||||
| `tenant_id` | `str` | Dify tenant/workspace id used when calling the plugin daemon. |
|
||||
| `plugin_id` | `str` | Plugin id, for example `langgenius/openai`. |
|
||||
| `user_id` | `str \| None` | Optional end-user id passed through to the plugin daemon. |
|
||||
|
||||
The plugin layer type id is `dify.plugin`.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LAYER_TYPE_ID, DifyPluginLayerConfig
|
||||
from dify_agent.protocol import RunLayerSpec
|
||||
|
||||
|
||||
plugin_layer = RunLayerSpec(
|
||||
name="plugin",
|
||||
type=DIFY_PLUGIN_LAYER_TYPE_ID,
|
||||
config=DifyPluginLayerConfig(
|
||||
tenant_id="replace-with-tenant-id",
|
||||
plugin_id="langgenius/openai",
|
||||
user_id="replace-with-user-id",
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
If you do not need a user id, omit `user_id` or pass `None`.
|
||||
|
||||
## Server-side settings
|
||||
|
||||
The plugin layer config does not include daemon transport settings. Configure
|
||||
these on the Dify Agent server instead:
|
||||
|
||||
```env
|
||||
DIFY_AGENT_PLUGIN_DAEMON_URL=http://localhost:5002
|
||||
DIFY_AGENT_PLUGIN_DAEMON_API_KEY=replace-with-plugin-daemon-server-key
|
||||
```
|
||||
|
||||
This keeps server credentials out of client-submitted layer config and out of
|
||||
session snapshots.
|
||||
|
||||
## Notes
|
||||
|
||||
- The plugin layer does not open, cache, close, or snapshot HTTP clients.
|
||||
- `plugin_id` selects the plugin package. The business model provider and model
|
||||
name belong to the plugin LLM layer, not this layer.
|
||||
- The conventional layer name is `plugin`. If you use another name, point the LLM
|
||||
layer dependency at that name.
|
||||
102
dify-agent/docs/dify-agent/user-manual/plugin-llm-layer/index.md
Normal file
102
dify-agent/docs/dify-agent/user-manual/plugin-llm-layer/index.md
Normal file
@ -0,0 +1,102 @@
|
||||
# Plugin LLM layer
|
||||
|
||||
The plugin LLM layer selects the model provider, model name, provider credentials,
|
||||
and optional model settings for the current run. Dify Agent reads the model from
|
||||
the reserved layer name `llm`.
|
||||
|
||||
It must depend on a [plugin layer](../plugin-layer/index.md), because the plugin
|
||||
layer supplies the daemon identity and transport context.
|
||||
|
||||
## Config fields
|
||||
|
||||
| Field | Type | Meaning |
|
||||
| --- | --- | --- |
|
||||
| `model_provider` | `str` | Provider name inside the selected plugin. Use the value of `DIFY_AGENT_PROVIDER` from `dify-agent/.env`. |
|
||||
| `model` | `str` | Model name. Use the value of `DIFY_AGENT_MODEL_NAME` from `dify-agent/.env`. |
|
||||
| `credentials` | `dict[str, str \| int \| float \| bool \| None]` | Provider-specific credential object. |
|
||||
| `model_settings` | `ModelSettings \| None` | Optional pydantic-ai model settings. |
|
||||
|
||||
The plugin LLM layer type id is `dify.plugin.llm`.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DifyPluginLLMLayerConfig
|
||||
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, RunLayerSpec
|
||||
|
||||
|
||||
MODEL_PROVIDER = "replace-with-provider-from-dify-agent-env"
|
||||
MODEL_NAME = "replace-with-model-from-dify-agent-env"
|
||||
|
||||
llm_layer = RunLayerSpec(
|
||||
name=DIFY_AGENT_MODEL_LAYER_ID,
|
||||
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
deps={"plugin": "plugin"},
|
||||
config=DifyPluginLLMLayerConfig(
|
||||
model_provider=MODEL_PROVIDER,
|
||||
model=MODEL_NAME,
|
||||
credentials={"api_key": "replace-with-provider-key"},
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
`deps={"plugin": "plugin"}` means: bind the LLM layer's dependency field named
|
||||
`plugin` to the composition layer named `plugin`.
|
||||
|
||||
Set `MODEL_PROVIDER` and `MODEL_NAME` to the same values as
|
||||
`DIFY_AGENT_PROVIDER` and `DIFY_AGENT_MODEL_NAME` in `dify-agent/.env`.
|
||||
|
||||
## Complete minimal model composition
|
||||
|
||||
Most runs include a prompt, plugin context, and LLM layer:
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
|
||||
from dify_agent.layers.dify_plugin import (
|
||||
DIFY_PLUGIN_LAYER_TYPE_ID,
|
||||
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
DifyPluginLLMLayerConfig,
|
||||
DifyPluginLayerConfig,
|
||||
)
|
||||
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, RunComposition, RunLayerSpec
|
||||
|
||||
|
||||
MODEL_PROVIDER = "replace-with-provider-from-dify-agent-env"
|
||||
MODEL_NAME = "replace-with-model-from-dify-agent-env"
|
||||
|
||||
composition = RunComposition(
|
||||
layers=[
|
||||
RunLayerSpec(
|
||||
name="prompt",
|
||||
type=PLAIN_PROMPT_LAYER_TYPE_ID,
|
||||
config=PromptLayerConfig(prefix="You are concise.", user="Say hello."),
|
||||
),
|
||||
RunLayerSpec(
|
||||
name="plugin",
|
||||
type=DIFY_PLUGIN_LAYER_TYPE_ID,
|
||||
config=DifyPluginLayerConfig(
|
||||
tenant_id="replace-with-tenant-id",
|
||||
plugin_id="langgenius/openai",
|
||||
),
|
||||
),
|
||||
RunLayerSpec(
|
||||
name=DIFY_AGENT_MODEL_LAYER_ID,
|
||||
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
deps={"plugin": "plugin"},
|
||||
config=DifyPluginLLMLayerConfig(
|
||||
model_provider=MODEL_PROVIDER,
|
||||
model=MODEL_NAME,
|
||||
credentials={"api_key": "replace-with-provider-key"},
|
||||
),
|
||||
),
|
||||
]
|
||||
)
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
- The model layer must use the reserved name `llm` (`DIFY_AGENT_MODEL_LAYER_ID`).
|
||||
- Credential shape depends on the selected plugin provider; the OpenAI-style
|
||||
`api_key` field above is only an example.
|
||||
- Client-submitted model credentials stay in the in-memory scheduled request and
|
||||
are not part of run records or session snapshots.
|
||||
72
dify-agent/docs/dify-agent/user-manual/prompt-layer/index.md
Normal file
72
dify-agent/docs/dify-agent/user-manual/prompt-layer/index.md
Normal file
@ -0,0 +1,72 @@
|
||||
# Prompt layer
|
||||
|
||||
The prompt layer provides the current run's system and user prompt fragments. In
|
||||
Dify Agent request bodies it is a regular `RunLayerSpec` with type id
|
||||
`plain.prompt`.
|
||||
|
||||
Use it for:
|
||||
|
||||
- system instructions that should be sent on this run
|
||||
- the current user input
|
||||
- optional suffix system instructions
|
||||
|
||||
## Config fields
|
||||
|
||||
| Field | Type | Meaning |
|
||||
| --- | --- | --- |
|
||||
| `prefix` | `str` or `list[str]` | System prompt fragments collected before other prompt content. |
|
||||
| `user` | `str` or `list[str]` | Current user-message fragments for the run. |
|
||||
| `suffix` | `str` or `list[str]` | System prompt fragments collected after prefix content. |
|
||||
|
||||
All fields default to an empty list. Dify Agent rejects a create-run request when
|
||||
the effective user prompt is empty or whitespace-only.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
|
||||
from dify_agent.protocol import RunLayerSpec
|
||||
|
||||
|
||||
prompt_layer = RunLayerSpec(
|
||||
name="prompt",
|
||||
type=PLAIN_PROMPT_LAYER_TYPE_ID,
|
||||
config=PromptLayerConfig(
|
||||
prefix="You are a concise assistant.",
|
||||
user="Summarize the incident in one paragraph.",
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
## Multiple prompt fragments
|
||||
|
||||
Use lists when the caller wants to keep fragments separate while still sending one
|
||||
run:
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
prompt_layer = RunLayerSpec(
|
||||
name="prompt",
|
||||
type=PLAIN_PROMPT_LAYER_TYPE_ID,
|
||||
config=PromptLayerConfig(
|
||||
prefix=[
|
||||
"You are an incident response assistant.",
|
||||
"Prefer concrete mitigation steps.",
|
||||
],
|
||||
user=[
|
||||
"Database latency is elevated.",
|
||||
"Return the likely severity and next actions.",
|
||||
],
|
||||
suffix="Do not invent metrics that are not provided.",
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
## Notes
|
||||
|
||||
- The run API does not accept a top-level `user_prompt`; submit user input through
|
||||
a prompt layer.
|
||||
- Prompt layer names are not reserved by the runtime, but `prompt` is the
|
||||
recommended conventional name.
|
||||
- When a [history layer](../history-layer/index.md) is present, current system
|
||||
prompts are sent as a temporary prefix before stored history and are not saved
|
||||
into memory.
|
||||
@ -0,0 +1,101 @@
|
||||
# Structured output layer
|
||||
|
||||
The structured output layer makes the final answer follow a caller-provided JSON
|
||||
Schema. Add it when the client needs a JSON object instead of plain text.
|
||||
|
||||
When present, Dify Agent exposes the schema to the model as a structured-output
|
||||
tool and validates the model response against the same schema.
|
||||
|
||||
## Layer contract
|
||||
|
||||
| Property | Value |
|
||||
| --- | --- |
|
||||
| Reserved layer name | `output` |
|
||||
| Type id | `dify.output` |
|
||||
| Config | `DifyOutputLayerConfig` |
|
||||
| Dependencies | none |
|
||||
|
||||
Use at most one structured output layer. It must be named `output`.
|
||||
|
||||
## Config fields
|
||||
|
||||
| Field | Type | Meaning |
|
||||
| --- | --- | --- |
|
||||
| `json_schema` | `dict[str, JsonValue]` | Top-level object JSON Schema for the final answer. |
|
||||
| `description` | `str \| None` | Optional model-facing tool description. |
|
||||
| `strict` | `bool \| None` | Optional strictness flag passed to the output tool. |
|
||||
|
||||
The structured-output tool name is fixed to `final_output`.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
|
||||
from dify_agent.protocol import DIFY_AGENT_OUTPUT_LAYER_ID, RunLayerSpec
|
||||
|
||||
|
||||
output_layer = RunLayerSpec(
|
||||
name=DIFY_AGENT_OUTPUT_LAYER_ID,
|
||||
type=DIFY_OUTPUT_LAYER_TYPE_ID,
|
||||
config=DifyOutputLayerConfig(
|
||||
description="Structured incident summary returned by the agent.",
|
||||
strict=True,
|
||||
json_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {"type": "string"},
|
||||
"severity": {"type": "string", "enum": ["low", "medium", "high"]},
|
||||
"actions": {"type": "array", "items": {"type": "string"}},
|
||||
},
|
||||
"required": ["title", "severity", "actions"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
On success, the terminal event contains the validated JSON-safe object:
|
||||
|
||||
```python {test="skip" lint="skip"}
|
||||
async for event in client.stream_events(run_id):
|
||||
if event.type == "run_succeeded":
|
||||
structured_output = event.data.output
|
||||
```
|
||||
|
||||
If the `output` layer is omitted, Dify Agent keeps the default plain text output
|
||||
contract.
|
||||
|
||||
## Schema limits
|
||||
|
||||
The first structured-output version supports a practical subset of JSON Schema:
|
||||
|
||||
- the top-level schema must be an object (`"type": "object"`)
|
||||
- the model-facing structured-output tool name is always `final_output`
|
||||
- remote `$ref` values are not supported
|
||||
- local refs are supported only under `#/$defs/...`
|
||||
- recursive `$defs` refs are not supported
|
||||
- `$ref` values inside ordinary literal keywords such as `const`, `default`, `enum`,
|
||||
`example`, and `examples` are treated as data, not schema refs
|
||||
|
||||
## Validation and retry behavior
|
||||
|
||||
The runtime builds a pydantic-ai output contract from the layer config. The same
|
||||
contract exposes the model-facing schema and validates the returned object.
|
||||
|
||||
If the model returns an invalid object, pydantic-ai's normal output-validation
|
||||
retry behavior applies. If retries are exhausted, the run ends with `run_failed`.
|
||||
|
||||
## Resuming runs with structured output
|
||||
|
||||
Session snapshots store layer runtime state, not output-layer config. If you
|
||||
resume a run that uses structured output, include the same `output` layer again so
|
||||
the runtime can rebuild the output contract.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Symptom | What to check |
|
||||
| --- | --- |
|
||||
| `must use reserved layer name 'output'` | Rename the layer to `output`. |
|
||||
| Structured output falls back to text | Confirm the `output` layer is present and has type `dify.output`. |
|
||||
| Run fails before model resolution | Validate the JSON Schema and `$ref` usage. |
|
||||
| Resume loses structured output | Resubmit the same output layer; snapshots do not store the schema. |
|
||||
@ -15,7 +15,13 @@ nav:
|
||||
- Examples: agenton/examples/index.md
|
||||
- Dify Agent:
|
||||
- Overview: dify-agent/index.md
|
||||
- Get Started: dify-agent/get-started/index.md
|
||||
- User Manual:
|
||||
- Get Started: dify-agent/get-started/index.md
|
||||
- Prompt Layer: dify-agent/user-manual/prompt-layer/index.md
|
||||
- Plugin Layer: dify-agent/user-manual/plugin-layer/index.md
|
||||
- Plugin LLM Layer: dify-agent/user-manual/plugin-llm-layer/index.md
|
||||
- History Layer: dify-agent/user-manual/history-layer/index.md
|
||||
- Structured Output Layer: dify-agent/user-manual/structured-output-layer/index.md
|
||||
- Operations Guide: dify-agent/guide/index.md
|
||||
- Run API: dify-agent/api/index.md
|
||||
- Examples: dify-agent/examples/index.md
|
||||
|
||||
@ -6,7 +6,7 @@ readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"httpx>=0.28.1",
|
||||
"pydantic>=2.12.5,<2.13",
|
||||
"pydantic>=2.12.5,<3",
|
||||
"pydantic-ai-slim>=1.85.1",
|
||||
"typing-extensions>=4.12.2",
|
||||
]
|
||||
|
||||
@ -4,8 +4,16 @@ from agenton_collections.layers.pydantic_ai.bridge import (
|
||||
PydanticAIBridgeLayer,
|
||||
PydanticAIBridgeLayerDeps,
|
||||
)
|
||||
from agenton_collections.layers.pydantic_ai.history import (
|
||||
PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
|
||||
PydanticAIHistoryLayer,
|
||||
PydanticAIHistoryRuntimeState,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"PydanticAIBridgeLayer",
|
||||
"PydanticAIBridgeLayerDeps",
|
||||
"PYDANTIC_AI_HISTORY_LAYER_TYPE_ID",
|
||||
"PydanticAIHistoryLayer",
|
||||
"PydanticAIHistoryRuntimeState",
|
||||
]
|
||||
|
||||
@ -0,0 +1,68 @@
|
||||
"""Serializable pydantic-ai conversation history layer.
|
||||
|
||||
This layer keeps pydantic-ai ``ModelMessage`` history inside Agenton's
|
||||
serializable ``runtime_state`` so compositor session snapshots can persist and
|
||||
restore typed messages without any separate storage protocol. The layer is
|
||||
intentionally state-only: it contributes no system prompts, user prompts, or
|
||||
tools, and it owns no live resources. Integrations should read
|
||||
``message_history`` before ``Agent.run(message_history=...)`` and then write
|
||||
back only the history shape they intend to persist after success, for example
|
||||
replacing with ``result.all_messages()`` or appending only
|
||||
``result.new_messages()`` when temporary prompt prefixes must stay ephemeral.
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
from typing import ClassVar, Final
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from pydantic_ai.messages import ModelMessage
|
||||
|
||||
from agenton.layers import EmptyLayerConfig, NoLayerDeps, PydanticAILayer
|
||||
|
||||
|
||||
PYDANTIC_AI_HISTORY_LAYER_TYPE_ID: Final[str] = "pydantic_ai.history"
|
||||
|
||||
|
||||
class PydanticAIHistoryRuntimeState(BaseModel):
    """Snapshot-friendly container for pydantic-ai conversation history.

    Instances hold the list of ``ModelMessage`` objects that Agenton
    serializes as part of a session snapshot.
    """

    # ``extra="forbid"`` rejects unexpected fields; ``validate_assignment=True``
    # re-runs validation whenever an attribute is assigned.
    model_config: ClassVar[ConfigDict] = ConfigDict(validate_assignment=True, extra="forbid")

    # Stored history; each instance starts with its own empty list.
    messages: list[ModelMessage] = Field(default_factory=list)
|
||||
|
||||
|
||||
class PydanticAIHistoryLayer(
    PydanticAILayer[NoLayerDeps, object, EmptyLayerConfig, PydanticAIHistoryRuntimeState]
):
    """Layer whose only job is to hold pydantic-ai message history.

    All history is kept in ``runtime_state.messages``. Every writer below
    builds a new list and assigns it, rather than mutating the stored list,
    so that assignment validation on the runtime state runs on each write.
    """

    type_id: ClassVar[str | None] = PYDANTIC_AI_HISTORY_LAYER_TYPE_ID

    @property
    def message_history(self) -> list[ModelMessage]:
        """Hand back a new list containing the currently stored messages."""
        return [*self.runtime_state.messages]

    def replace_messages(self, messages: Sequence[ModelMessage]) -> None:
        """Overwrite the stored history with a new list built from ``messages``."""
        replacement = [*messages]
        self.runtime_state.messages = replacement

    def append_messages(self, messages: Sequence[ModelMessage]) -> None:
        """Add ``messages`` to the end of the history via a fresh combined list."""
        combined = list(self.runtime_state.messages)
        combined.extend(messages)
        # Assign (not mutate) so the write goes through assignment validation.
        self.runtime_state.messages = combined

    def clear(self) -> None:
        """Reset the stored history to an empty list."""
        self.runtime_state.messages = []
|
||||
|
||||
|
||||
__all__ = [
|
||||
"PYDANTIC_AI_HISTORY_LAYER_TYPE_ID",
|
||||
"PydanticAIHistoryLayer",
|
||||
"PydanticAIHistoryRuntimeState",
|
||||
]
|
||||
@ -8,7 +8,6 @@ imports do not pull in server execution code.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import ClassVar, Final
|
||||
|
||||
from pydantic import ConfigDict, JsonValue, field_validator
|
||||
@ -17,7 +16,6 @@ from agenton.layers import LayerConfig
|
||||
|
||||
|
||||
DIFY_OUTPUT_LAYER_TYPE_ID: Final[str] = "dify.output"
|
||||
_OUTPUT_TOOL_NAME_PATTERN: Final[re.Pattern[str]] = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
|
||||
|
||||
|
||||
class DifyOutputLayerConfig(LayerConfig):
|
||||
@ -30,15 +28,13 @@ class DifyOutputLayerConfig(LayerConfig):
|
||||
schemas plus local ``#/$defs/...`` references so the same caller-provided
|
||||
schema can drive both runtime validation and model-facing tool exposure; the
|
||||
exposure copy may inline supported ``$defs`` refs as needed for the
|
||||
Pydantic/Pydantic AI integration. ``name`` becomes the structured-output
|
||||
tool name exposed to pydantic-ai, defaults to ``final_result``, and must be
|
||||
1-64 ASCII letters, numbers, underscores, or hyphens so downstream model
|
||||
providers accept it consistently. ``description`` and ``strict`` are passed
|
||||
through to the generated structured-output tool definition.
|
||||
Pydantic/Pydantic AI integration. The structured-output tool name and schema
|
||||
title exposed to pydantic-ai are fixed to ``final_output`` so callers only
|
||||
control the JSON Schema itself plus any optional description/strictness
|
||||
metadata.
|
||||
"""
|
||||
|
||||
json_schema: dict[str, JsonValue]
|
||||
name: str = "final_result"
|
||||
description: str | None = None
|
||||
strict: bool | None = None
|
||||
|
||||
@ -51,12 +47,4 @@ class DifyOutputLayerConfig(LayerConfig):
|
||||
raise ValueError("Schema must declare an object output.")
|
||||
return value
|
||||
|
||||
@field_validator("name")
|
||||
@classmethod
|
||||
def _ensure_safe_tool_name(cls, value: str) -> str:
|
||||
if not _OUTPUT_TOOL_NAME_PATTERN.fullmatch(value):
|
||||
raise ValueError("name must be 1-64 characters of letters, numbers, underscores, or hyphens.")
|
||||
return value
|
||||
|
||||
|
||||
__all__ = ["DIFY_OUTPUT_LAYER_TYPE_ID", "DifyOutputLayerConfig"]
|
||||
|
||||
@ -34,6 +34,8 @@ from agenton.layers import EmptyRuntimeState, NoLayerDeps, PlainLayer
|
||||
from dify_agent.layers.output.configs import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
|
||||
|
||||
|
||||
_FINAL_OUTPUT_TOOL_NAME: Final[str] = "final_output"
|
||||
_VALIDATED_OUTPUT_TYPE_NAME: Final[str] = f"DifyValidatedOutput_{_FINAL_OUTPUT_TOOL_NAME}"
|
||||
_LOCAL_DEFS_REF_PREFIX: Final[str] = "#/$defs/"
|
||||
_NON_SCHEMA_VALUE_KEYWORDS: Final[frozenset[str]] = frozenset({"const", "default", "enum", "example", "examples"})
|
||||
|
||||
@ -71,7 +73,9 @@ class DifyOutputLayer(PlainLayer[NoLayerDeps, DifyOutputLayerConfig, EmptyRuntim
|
||||
runtime validation inside the same dynamically generated dict-like type.
|
||||
First-version support is intentionally limited to top-level object JSON
|
||||
Schemas so the same schema can be validated with ``jsonschema`` and then
|
||||
exposed to Pydantic AI without any wrapper/unwrapper translation.
|
||||
exposed to Pydantic AI without any wrapper/unwrapper translation. The
|
||||
public tool name and exposed schema title are always ``final_output`` so
|
||||
providers see one stable structured-output contract shape.
|
||||
|
||||
Raises:
|
||||
ValueError: If the JSON Schema is invalid, contains non-local
|
||||
@ -82,7 +86,6 @@ class DifyOutputLayer(PlainLayer[NoLayerDeps, DifyOutputLayerConfig, EmptyRuntim
|
||||
_reject_non_local_refs(user_schema)
|
||||
validated_output_type = _build_validated_output_type(
|
||||
user_schema,
|
||||
name=self.config.name,
|
||||
description=self.config.description,
|
||||
)
|
||||
|
||||
@ -91,7 +94,7 @@ class DifyOutputLayer(PlainLayer[NoLayerDeps, DifyOutputLayerConfig, EmptyRuntim
|
||||
OutputSpec[object],
|
||||
ToolOutput(
|
||||
validated_output_type,
|
||||
name=self.config.name,
|
||||
name=_FINAL_OUTPUT_TOOL_NAME,
|
||||
strict=self.config.strict,
|
||||
),
|
||||
),
|
||||
@ -111,18 +114,16 @@ def _build_json_schema_validator(schema: dict[str, JsonValue]) -> JsonSchemaVali
|
||||
def _build_validated_output_type(
|
||||
schema: dict[str, JsonValue],
|
||||
*,
|
||||
name: str,
|
||||
description: str | None,
|
||||
) -> type[dict[str, object]]:
|
||||
"""Create a dict-like output type with custom JSON schema and validation hooks.
|
||||
|
||||
The generated type is unique per output layer config. Its Pydantic core
|
||||
The generated type object is fresh per output layer config. Its Pydantic core
|
||||
schema performs real ``jsonschema`` validation, while its JSON schema hook
|
||||
exposes a model-facing schema that Pydantic AI can turn into an output tool.
|
||||
"""
|
||||
validator = _build_json_schema_validator(schema)
|
||||
exposed_schema = _build_exposed_json_schema(schema, name=name, description=description)
|
||||
type_name = _build_output_type_name(name)
|
||||
exposed_schema = _build_exposed_json_schema(schema, description=description)
|
||||
|
||||
def _validate_output(value: dict[str, object]) -> object:
|
||||
errors = sorted(validator.iter_errors(cast(JsonValue, value)), key=lambda error: _sort_error_path(error.path))
|
||||
@ -165,14 +166,13 @@ def _build_validated_output_type(
|
||||
"__get_pydantic_core_schema__": __get_pydantic_core_schema__,
|
||||
"__get_pydantic_json_schema__": __get_pydantic_json_schema__,
|
||||
}
|
||||
validated_output_type = cast(type[dict[str, object]], type(type_name, (dict,), namespace))
|
||||
validated_output_type = cast(type[dict[str, object]], type(_VALIDATED_OUTPUT_TYPE_NAME, (dict,), namespace))
|
||||
return validated_output_type
|
||||
|
||||
|
||||
def _build_exposed_json_schema(
|
||||
schema: dict[str, JsonValue],
|
||||
*,
|
||||
name: str,
|
||||
description: str | None,
|
||||
) -> dict[str, JsonValue]:
|
||||
"""Return the schema exposed to the model through Pydantic AI.
|
||||
@ -183,18 +183,10 @@ def _build_exposed_json_schema(
|
||||
attached.
|
||||
"""
|
||||
exposed_schema = _inline_local_defs_refs(schema)
|
||||
exposed_schema["title"] = name
|
||||
exposed_schema["title"] = _FINAL_OUTPUT_TOOL_NAME
|
||||
if description is not None:
|
||||
exposed_schema["description"] = description
|
||||
return exposed_schema
|
||||
|
||||
|
||||
def _build_output_type_name(name: str) -> str:
    """Return a deterministic debug-friendly class name for one output schema.

    Every character of ``name`` that is not alphanumeric according to
    ``str.isalnum`` is replaced with ``_``, then leading and trailing
    underscores are stripped. When nothing is left, ``final_result`` is used
    as the suffix.

    Args:
        name: Caller-supplied output tool name.

    Returns:
        ``DifyValidatedOutput_<sanitized name>``.
    """
    sanitized = "".join(character if character.isalnum() else "_" for character in name).strip("_") or "final_result"
    return f"DifyValidatedOutput_{sanitized}"
|
||||
|
||||
|
||||
def _reject_non_local_refs(schema: JsonValue) -> None:
|
||||
"""Reject references that would require external fetching or non-local state.
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""Public protocol exports shared by the Dify Agent server and clients."""
|
||||
|
||||
from .schemas import (
|
||||
DIFY_AGENT_HISTORY_LAYER_ID,
|
||||
DIFY_AGENT_MODEL_LAYER_ID,
|
||||
DIFY_AGENT_OUTPUT_LAYER_ID,
|
||||
RUN_EVENT_ADAPTER,
|
||||
@ -30,6 +31,7 @@ __all__ = [
|
||||
"BaseRunEvent",
|
||||
"CreateRunRequest",
|
||||
"CreateRunResponse",
|
||||
"DIFY_AGENT_HISTORY_LAYER_ID",
|
||||
"DIFY_AGENT_MODEL_LAYER_ID",
|
||||
"DIFY_AGENT_OUTPUT_LAYER_ID",
|
||||
"EmptyRunEventData",
|
||||
|
||||
@ -17,7 +17,8 @@ public ``id``/``run_id``/``type``/``data``/``created_at`` shape, while each
|
||||
``type`` has a typed ``data`` model so OpenAPI, Redis replay, and clients parse
|
||||
the same payload contract. Model/provider selection is part of the submitted
|
||||
composition, not a top-level run field; the runtime reads the model layer named
|
||||
by ``DIFY_AGENT_MODEL_LAYER_ID`` and the optional structured output layer named
|
||||
by ``DIFY_AGENT_MODEL_LAYER_ID``, the optional history layer named by
|
||||
``DIFY_AGENT_HISTORY_LAYER_ID``, and the optional structured output layer named
|
||||
by ``DIFY_AGENT_OUTPUT_LAYER_ID``. Request-level ``on_exit`` signals decide
|
||||
whether each active layer is suspended or deleted when the run exits, with
|
||||
suspend as the default so successful terminal events can include resumable
|
||||
@ -42,6 +43,7 @@ from agenton.layers import ExitIntent
|
||||
|
||||
|
||||
DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm"
|
||||
DIFY_AGENT_HISTORY_LAYER_ID: Final[str] = "history"
|
||||
DIFY_AGENT_OUTPUT_LAYER_ID: Final[str] = "output"
|
||||
RunStatus = Literal["running", "succeeded", "failed"]
|
||||
RunEventType = Literal[
|
||||
@ -104,8 +106,10 @@ class CreateRunRequest(BaseModel):
|
||||
"""Request body for creating one async agent run.
|
||||
|
||||
Model/provider configuration must be supplied through the composition layer
|
||||
named by ``DIFY_AGENT_MODEL_LAYER_ID``. Structured output may be supplied
|
||||
through the optional composition layer named by
|
||||
named by ``DIFY_AGENT_MODEL_LAYER_ID``. Optional persisted conversation
|
||||
history may be supplied through the composition layer named by
|
||||
``DIFY_AGENT_HISTORY_LAYER_ID``. Structured output may be supplied through
|
||||
the optional composition layer named by
|
||||
``DIFY_AGENT_OUTPUT_LAYER_ID``. ``on_exit`` defaults every active layer to
|
||||
suspend so callers receive a resumable success snapshot unless they
|
||||
explicitly request delete for one or more layers. Session snapshots do not
|
||||
@ -254,6 +258,7 @@ __all__ = [
|
||||
"BaseRunEvent",
|
||||
"CreateRunRequest",
|
||||
"CreateRunResponse",
|
||||
"DIFY_AGENT_HISTORY_LAYER_ID",
|
||||
"DIFY_AGENT_MODEL_LAYER_ID",
|
||||
"DIFY_AGENT_OUTPUT_LAYER_ID",
|
||||
"EmptyRunEventData",
|
||||
|
||||
@ -2,10 +2,10 @@
|
||||
|
||||
The run request carries model/provider selection in the layer graph. This helper
|
||||
keeps Agent construction details out of ``AgentRunRunner`` while accepting an
|
||||
already resolved Pydantic AI model from the configured model layer. Prompt and
|
||||
tool values arriving here are already transformed by Agenton's
|
||||
``PYDANTIC_AI_TRANSFORMERS`` preset; this module registers those pydantic-ai
|
||||
objects without reimplementing plain/pydantic-ai conversion logic. The caller
|
||||
already resolved Pydantic AI model from the configured model layer. Tool values
|
||||
arriving here are already transformed by Agenton's
|
||||
``PYDANTIC_AI_TRANSFORMERS`` preset, while Dify system prompts are rendered into
|
||||
temporary ``message_history`` before the call reaches this helper. The caller
|
||||
also passes the already resolved ``output_type`` so legacy text output and the
|
||||
optional JSON Schema output layer share the same ``Agent`` construction path.
|
||||
"""
|
||||
@ -18,13 +18,12 @@ from pydantic_ai.messages import UserContent
|
||||
from pydantic_ai.models import Model
|
||||
from pydantic_ai.output import OutputSpec
|
||||
|
||||
from agenton.layers.types import PydanticAIPrompt, PydanticAITool
|
||||
from agenton.layers.types import PydanticAITool
|
||||
|
||||
|
||||
def create_agent(
|
||||
model: Model[Any],
|
||||
*,
|
||||
system_prompts: Sequence[PydanticAIPrompt[object]],
|
||||
tools: Sequence[PydanticAITool[object]],
|
||||
output_type: OutputSpec[object] = str,
|
||||
) -> Agent[None, object]:
|
||||
@ -36,10 +35,7 @@ def create_agent(
|
||||
carries the Pydantic hooks needed for schema exposure and runtime validation,
|
||||
so agent construction does not need to register a separate validator.
|
||||
"""
|
||||
agent = cast(Agent[None, object], Agent(model, output_type=output_type, tools=tools))
|
||||
for prompt in system_prompts:
|
||||
_ = agent.system_prompt(cast(Any, prompt))
|
||||
return agent
|
||||
return cast(Agent[None, object], Agent(model, output_type=output_type, tools=tools))
|
||||
|
||||
|
||||
def normalize_user_input(user_prompts: Sequence[UserContent]) -> str | Sequence[UserContent]:
|
||||
|
||||
@ -1,12 +1,13 @@
|
||||
"""Safe Agenton compositor construction for API-submitted configs.
|
||||
|
||||
Only explicitly allowed provider type ids are constructible here. The default
|
||||
provider set contains prompt layers, the state-free Dify structured output
|
||||
layer, plus Dify plugin LLM layers. Public DTOs provide tenant/plugin/model
|
||||
data, while server-only plugin daemon settings are injected through the provider
|
||||
factory for ``DifyPluginLayer``. The resulting ``Compositor`` remains Agenton
|
||||
state-only: live resources such as the plugin daemon HTTP client are supplied
|
||||
later by the runtime and never enter providers, layers, or session snapshots.
|
||||
provider set contains prompt layers, the optional pydantic-ai history layer, the
|
||||
state-free Dify structured output layer, plus Dify plugin LLM layers. Public
|
||||
DTOs provide tenant/plugin/model data, while server-only plugin daemon settings
|
||||
are injected through the provider factory for ``DifyPluginLayer``. The resulting
|
||||
``Compositor`` remains Agenton state-only: live resources such as the plugin
|
||||
daemon HTTP client are supplied later by the runtime and never enter providers,
|
||||
layers, or session snapshots.
|
||||
"""
|
||||
|
||||
from collections.abc import Mapping, Sequence
|
||||
@ -16,6 +17,7 @@ from pydantic_ai.messages import UserContent
|
||||
|
||||
from agenton.compositor import Compositor, CompositorConfig, LayerProvider, LayerProviderInput
|
||||
from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes, PydanticAIPrompt, PydanticAITool
|
||||
from agenton_collections.layers.pydantic_ai import PydanticAIHistoryLayer
|
||||
from agenton_collections.layers.plain.basic import PromptLayer
|
||||
from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS
|
||||
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
|
||||
@ -35,6 +37,7 @@ def create_default_layer_providers(
|
||||
"""Return the server provider set of safe config-constructible layers."""
|
||||
return (
|
||||
LayerProvider.from_layer_type(PromptLayer),
|
||||
LayerProvider.from_layer_type(PydanticAIHistoryLayer),
|
||||
LayerProvider.from_layer_type(DifyOutputLayer),
|
||||
LayerProvider.from_factory(
|
||||
layer_type=DifyPluginLayer,
|
||||
|
||||
133
dify-agent/src/dify_agent/runtime/history.py
Normal file
133
dify-agent/src/dify_agent/runtime/history.py
Normal file
@ -0,0 +1,133 @@
|
||||
"""Helpers for optional Dify Agent history-layer integration.
|
||||
|
||||
Dify Agent keeps pydantic-ai conversation history as an optional Agenton layer
|
||||
named ``history``. The runner always injects the current Dify system prompt via
|
||||
temporary ``message_history`` instead of ``Agent.system_prompt(...)`` so the
|
||||
model sees ``current system prompt -> stored history -> current user prompt``
|
||||
even when persisted history is present. Only zero-argument system prompt
|
||||
callables are supported here because the prompts are rendered outside
|
||||
pydantic-ai's normal run context; this matches Dify's current plain-prompt
|
||||
compositions and fails fast for unsupported context-dependent prompt shapes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
from collections.abc import Awaitable, Callable, Sequence
|
||||
from typing import Protocol, cast
|
||||
|
||||
from pydantic_ai.messages import ModelMessage, ModelRequest, SystemPromptPart
|
||||
|
||||
from agenton.layers.types import PydanticAIPrompt
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryLayer
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID
|
||||
from dify_agent.protocol.schemas import RunComposition
|
||||
|
||||
|
||||
class SupportsHistoryLayerLookup(Protocol):
    """Minimal entered-run surface needed by the history helper.

    Any object with a compatible ``get_layer`` method satisfies this
    structural protocol; no inheritance is required.
    """

    def get_layer(self, name: str, layer_type: type[PydanticAIHistoryLayer]) -> PydanticAIHistoryLayer:
        """Return a typed layer instance or raise lookup/type errors."""
        ...
|
||||
|
||||
|
||||
def validate_history_layer_composition(composition: RunComposition) -> None:
    """Reject unsupported public history-layer graph shapes.

    A composition passes when it contains no layer whose ``type`` equals
    ``PYDANTIC_AI_HISTORY_LAYER_TYPE_ID``, or exactly one such layer that is
    named ``DIFY_AGENT_HISTORY_LAYER_ID`` and declares no dependencies.

    Args:
        composition: The public run composition whose ``layers`` are checked.

    Raises:
        ValueError: If more than one history layer is present, if the single
            history layer does not use the reserved name, or if it declares
            any dependency keys.
    """
    history_layers = [layer for layer in composition.layers if layer.type == PYDANTIC_AI_HISTORY_LAYER_TYPE_ID]
    if not history_layers:
        # History is optional; nothing to validate.
        return

    if len(history_layers) > 1:
        names = ", ".join(layer.name for layer in history_layers)
        raise ValueError(
            f"Only one '{PYDANTIC_AI_HISTORY_LAYER_TYPE_ID}' layer is supported, named "
            f"'{DIFY_AGENT_HISTORY_LAYER_ID}'. Found layers: {names}."
        )

    history_layer = history_layers[0]
    if history_layer.name != DIFY_AGENT_HISTORY_LAYER_ID:
        raise ValueError(
            f"Layer type '{PYDANTIC_AI_HISTORY_LAYER_TYPE_ID}' must use reserved layer name "
            f"'{DIFY_AGENT_HISTORY_LAYER_ID}', got '{history_layer.name}'."
        )

    if history_layer.deps:
        # Sort the keys so the error message is deterministic.
        dependency_names = ", ".join(sorted(history_layer.deps))
        raise ValueError(
            f"Layer type '{PYDANTIC_AI_HISTORY_LAYER_TYPE_ID}' does not support dependencies; "
            f"got dependency keys: {dependency_names}."
        )
|
||||
|
||||
|
||||
def get_history_layer(run: SupportsHistoryLayerLookup) -> PydanticAIHistoryLayer | None:
    """Return the active history layer when the reserved slot is present.

    Args:
        run: Entered run exposing ``get_layer``.

    Returns:
        The layer registered under ``DIFY_AGENT_HISTORY_LAYER_ID``, or ``None``
        when the lookup raises ``KeyError``. Any other exception raised by
        ``run.get_layer`` propagates to the caller.
    """
    try:
        return run.get_layer(DIFY_AGENT_HISTORY_LAYER_ID, PydanticAIHistoryLayer)
    except KeyError:
        return None
|
||||
|
||||
|
||||
async def build_run_message_history(
    *,
    system_prompts: Sequence[PydanticAIPrompt[object]],
    stored_history: Sequence[ModelMessage],
) -> list[ModelMessage] | None:
    """Build temporary pydantic-ai history for one Dify Agent loop.

    Current system prompts are rendered first into one transient
    ``ModelRequest`` prefix, followed by any already stored history messages.
    When both inputs are empty, the helper returns ``None`` so callers can omit
    the ``message_history`` argument entirely and preserve pydantic-ai's empty
    history behavior.

    Args:
        system_prompts: Prompt callables rendered in order; a prompt that
            renders to ``None`` contributes no part.
        stored_history: Previously stored messages appended after the system
            prompt prefix, in their given order.

    Returns:
        A new list of messages, or ``None`` when the list would be empty.
    """
    rendered_system_parts: list[SystemPromptPart] = []
    for prompt in system_prompts:
        prompt_text = await _render_system_prompt(prompt)
        if prompt_text is None:
            continue
        rendered_system_parts.append(SystemPromptPart(content=prompt_text))

    message_history: list[ModelMessage] = []
    if rendered_system_parts:
        # All rendered system prompts share a single leading request.
        message_history.append(ModelRequest(parts=rendered_system_parts))
    message_history.extend(stored_history)
    return message_history or None
|
||||
|
||||
|
||||
def append_successful_run_history(
    history_layer: PydanticAIHistoryLayer | None,
    new_messages: Sequence[ModelMessage],
) -> None:
    """Append only newly produced pydantic-ai messages after successful runs.

    This is a no-op when no history layer is active or when ``new_messages``
    is empty; otherwise the messages are forwarded to
    ``history_layer.append_messages``.

    Args:
        history_layer: The active history layer, or ``None`` when absent.
        new_messages: Messages to append.
    """
    if history_layer is None or not new_messages:
        return
    history_layer.append_messages(new_messages)
|
||||
|
||||
|
||||
async def _render_system_prompt(prompt: PydanticAIPrompt[object]) -> str | None:
|
||||
signature = inspect.signature(prompt)
|
||||
if signature.parameters:
|
||||
raise ValueError(
|
||||
"Dify Agent runtime currently supports only zero-argument system prompts when rendering temporary "
|
||||
"message history."
|
||||
)
|
||||
|
||||
prompt_without_context = cast(Callable[[], str | None | Awaitable[str | None]], prompt)
|
||||
prompt_value = prompt_without_context()
|
||||
if inspect.isawaitable(prompt_value):
|
||||
prompt_value = await prompt_value
|
||||
if prompt_value is None:
|
||||
return None
|
||||
if not isinstance(prompt_value, str):
|
||||
raise TypeError(f"System prompt callables must return str | None, got '{type(prompt_value).__name__}'.")
|
||||
return prompt_value
|
||||
|
||||
|
||||
__all__ = [
|
||||
"SupportsHistoryLayerLookup",
|
||||
"append_successful_run_history",
|
||||
"build_run_message_history",
|
||||
"get_history_layer",
|
||||
"validate_history_layer_composition",
|
||||
]
|
||||
@ -6,10 +6,11 @@ task registry. Redis remains the durable source for status and event streams, bu
|
||||
there is no Redis job queue or cross-process handoff. If the process crashes,
|
||||
currently active runs are lost until an external operator marks or retries them.
|
||||
Create-run validation enters a lightweight Agenton run before persistence so the
|
||||
same transformed user prompts, optional structured output contract, and
|
||||
top-level ``on_exit`` policy used by execution are checked without relying on
|
||||
removed session/control APIs; Dify's default layers keep lifecycle hooks
|
||||
side-effect free so this validation does not open plugin daemon clients.
|
||||
same transformed user prompts, temporary system-prompt history assembly,
|
||||
optional structured output contract, and top-level ``on_exit`` policy used by
|
||||
execution are checked without relying on removed session/control APIs; Dify's
|
||||
default layers keep lifecycle hooks side-effect free so this validation does not
|
||||
open plugin daemon clients.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@ -24,6 +25,7 @@ from dify_agent.protocol.schemas import CreateRunRequest, normalize_composition
|
||||
from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error
|
||||
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers
|
||||
from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed
|
||||
from dify_agent.runtime.history import build_run_message_history, get_history_layer, validate_history_layer_composition
|
||||
from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals
|
||||
from dify_agent.runtime.output_type import resolve_run_output_contract, validate_output_layer_composition
|
||||
from dify_agent.runtime.runner import AgentRunRunner
|
||||
@ -169,18 +171,20 @@ async def validate_run_request(
|
||||
) -> None:
|
||||
"""Validate create-run semantics that require an entered Agenton run.
|
||||
|
||||
This boundary rejects unsupported output-layer graph shapes, unknown
|
||||
This boundary rejects unsupported output/history-layer graph shapes, unknown
|
||||
``on_exit`` layer ids, effectively empty transformed user prompts, and known
|
||||
enter-time snapshot lifecycle errors before the scheduler persists a run
|
||||
record. It also exercises provider config validation, structured output
|
||||
contract construction, and snapshot hydration without touching external
|
||||
services because Dify plugin daemon clients are owned by the FastAPI
|
||||
lifespan, not Agenton lifecycle hooks.
|
||||
record. It also exercises provider config validation, temporary
|
||||
system-prompt history assembly, structured output contract construction, and
|
||||
snapshot hydration without touching external services because Dify plugin
|
||||
daemon clients are owned by the FastAPI lifespan, not Agenton lifecycle
|
||||
hooks.
|
||||
"""
|
||||
resolved_layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers()
|
||||
entered_run = False
|
||||
try:
|
||||
validate_output_layer_composition(request.composition)
|
||||
validate_history_layer_composition(request.composition)
|
||||
graph_config, layer_configs = normalize_composition(request.composition)
|
||||
compositor = build_pydantic_ai_compositor(
|
||||
graph_config,
|
||||
@ -190,6 +194,11 @@ async def validate_run_request(
|
||||
async with compositor.enter(configs=layer_configs, session_snapshot=request.session_snapshot) as run:
|
||||
entered_run = True
|
||||
apply_layer_exit_signals(run, request.on_exit)
|
||||
history_layer = get_history_layer(run)
|
||||
_ = await build_run_message_history(
|
||||
system_prompts=run.prompts,
|
||||
stored_history=history_layer.message_history if history_layer is not None else (),
|
||||
)
|
||||
if not has_non_blank_user_prompt(run.user_prompts):
|
||||
raise RunRequestValidationError(EMPTY_USER_PROMPTS_ERROR)
|
||||
_ = resolve_run_output_contract(run)
|
||||
|
||||
@ -2,19 +2,22 @@
|
||||
|
||||
The runner is storage-agnostic: it normalizes the public Dify composition into
|
||||
Agenton's graph/config split, enters a fresh ``CompositorRun`` (or resumes one
|
||||
from a snapshot), runs pydantic-ai with ``run.user_prompts`` as the user input,
|
||||
emits stream events, applies request-level ``on_exit`` signals, and then
|
||||
publishes a terminal success or failure event. The Pydantic AI model is resolved
|
||||
from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. An
|
||||
optional structured output layer named by ``DIFY_AGENT_OUTPUT_LAYER_ID`` is read
|
||||
after entry and resolved into an output contract whose type both exposes the
|
||||
output schema to the model and performs runtime JSON Schema validation through
|
||||
custom Pydantic hooks. Invalid structured outputs therefore trigger Pydantic
|
||||
AI's normal output-validation retry behavior before Dify Agent emits
|
||||
``run_succeeded``. Layers still never own the FastAPI lifespan-owned plugin
|
||||
daemon HTTP client. Successful terminal events contain both the JSON-safe final
|
||||
output and session snapshot; there are no separate output or snapshot events to
|
||||
correlate.
|
||||
from a snapshot), renders the current Dify system prompts into temporary
|
||||
``message_history``, runs pydantic-ai with ``run.user_prompts`` as the current
|
||||
user input, emits stream events, applies request-level ``on_exit`` signals, and
|
||||
then publishes a terminal success or failure event. The Pydantic AI model is
|
||||
resolved from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``.
|
||||
An optional history layer contributes stored message history only through
|
||||
session state; successful runs append only ``result.new_messages()`` back into
|
||||
that layer so current system prompts are not persisted. An optional structured
|
||||
output layer named by ``DIFY_AGENT_OUTPUT_LAYER_ID`` is read after entry and
|
||||
resolved into an output contract whose type both exposes the output schema to
|
||||
the model and performs runtime JSON Schema validation through custom Pydantic
|
||||
hooks. Invalid structured outputs therefore trigger Pydantic AI's normal
|
||||
output-validation retry behavior before Dify Agent emits ``run_succeeded``.
|
||||
Layers still never own the FastAPI lifespan-owned plugin daemon HTTP client.
|
||||
Successful terminal events contain both the JSON-safe final output and session
|
||||
snapshot; there are no separate output or snapshot events to correlate.
|
||||
"""
|
||||
|
||||
from collections.abc import AsyncIterable
|
||||
@ -37,6 +40,12 @@ from dify_agent.runtime.event_sink import (
|
||||
emit_run_started,
|
||||
emit_run_succeeded,
|
||||
)
|
||||
from dify_agent.runtime.history import (
|
||||
append_successful_run_history,
|
||||
build_run_message_history,
|
||||
get_history_layer,
|
||||
validate_history_layer_composition,
|
||||
)
|
||||
from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals
|
||||
from dify_agent.runtime.output_type import resolve_run_output_contract, validate_output_layer_composition
|
||||
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
|
||||
@ -100,17 +109,18 @@ class AgentRunRunner:
|
||||
|
||||
Known input-shaped Agenton enter-time runtime errors, such as trying to
|
||||
resume a ``CLOSED`` snapshot layer, are normalized to
|
||||
``AgentRunValidationError``. Output-layer graph invariants are validated
|
||||
from the public composition before entering Agenton so misnamed or extra
|
||||
``dify.output`` layers never silently degrade to text output. Later
|
||||
runtime failures still propagate as execution errors so they become
|
||||
terminal failed runs rather than client validation responses. Structured
|
||||
output uses a resolved contract whose type itself encodes both the
|
||||
model-facing schema and the runtime validation hooks, so invalid model
|
||||
outputs can be corrected before Dify Agent emits success.
|
||||
``AgentRunValidationError``. Output/history-layer graph invariants are
|
||||
validated from the public composition before entering Agenton so
|
||||
misnamed or extra reserved layers never silently degrade. Later runtime
|
||||
failures still propagate as execution errors so they become terminal
|
||||
failed runs rather than client validation responses. Structured output
|
||||
uses a resolved contract whose type itself encodes both the model-facing
|
||||
schema and the runtime validation hooks, so invalid model outputs can be
|
||||
corrected before Dify Agent emits success.
|
||||
"""
|
||||
try:
|
||||
validate_output_layer_composition(self.request.composition)
|
||||
validate_history_layer_composition(self.request.composition)
|
||||
graph_config, layer_configs = normalize_composition(self.request.composition)
|
||||
compositor = build_pydantic_ai_compositor(graph_config, providers=self.layer_providers)
|
||||
validate_layer_exit_signals(compositor, self.request.on_exit)
|
||||
@ -132,6 +142,11 @@ class AgentRunRunner:
|
||||
|
||||
try:
|
||||
output_contract = resolve_run_output_contract(run)
|
||||
history_layer = get_history_layer(run)
|
||||
message_history = await build_run_message_history(
|
||||
system_prompts=run.prompts,
|
||||
stored_history=history_layer.message_history if history_layer is not None else (),
|
||||
)
|
||||
llm_layer = run.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer)
|
||||
model = llm_layer.get_model(http_client=self.plugin_daemon_http_client)
|
||||
except (KeyError, TypeError, RuntimeError, ValueError) as exc:
|
||||
@ -139,12 +154,16 @@ class AgentRunRunner:
|
||||
|
||||
agent = create_agent(
|
||||
model,
|
||||
system_prompts=run.prompts,
|
||||
tools=run.tools,
|
||||
output_type=output_contract.output_type,
|
||||
)
|
||||
result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events)
|
||||
result = await agent.run(
|
||||
normalize_user_input(user_prompts),
|
||||
message_history=message_history,
|
||||
event_stream_handler=handle_events,
|
||||
)
|
||||
output = _serialize_agent_output(result.output)
|
||||
append_successful_run_history(history_layer, result.new_messages())
|
||||
except RuntimeError as exc:
|
||||
if not entered_run and is_agenton_enter_validation_runtime_error(exc):
|
||||
raise AgentRunValidationError(str(exc)) from exc
|
||||
|
||||
@ -29,8 +29,11 @@ class RedisRunStore(RunEventSink):
|
||||
"""Async Redis implementation for run records and event logs.
|
||||
|
||||
``run_retention_seconds`` is applied to both the run record key and the
|
||||
per-run Redis stream. Event writes also refresh the record TTL so long-running
|
||||
runs that keep producing events do not lose their status record mid-run.
|
||||
per-run Redis stream. Event writes run ``XADD`` and both TTL refreshes in one
|
||||
Redis transaction so a newly created stream is not left without expiration if
|
||||
the client is interrupted between commands. Event writes also refresh the
|
||||
record TTL so long-running runs that keep producing events do not lose their
|
||||
status record mid-run.
|
||||
"""
|
||||
|
||||
redis: Redis
|
||||
@ -81,15 +84,18 @@ class RedisRunStore(RunEventSink):
|
||||
)
|
||||
|
||||
async def append_event(self, event: RunEvent) -> str:
|
||||
"""Append an event JSON payload to the run's Redis stream."""
|
||||
"""Append an event JSON payload to the run's Redis stream with TTLs."""
|
||||
events_key = run_events_key(self.prefix, event.run_id)
|
||||
payload = RUN_EVENT_ADAPTER.dump_json(event, exclude={"id"}).decode()
|
||||
event_id = await self.redis.xadd(
|
||||
events_key,
|
||||
{"payload": payload},
|
||||
)
|
||||
await self.redis.expire(events_key, self.run_retention_seconds)
|
||||
await self.redis.expire(run_record_key(self.prefix, event.run_id), self.run_retention_seconds)
|
||||
async with self.redis.pipeline(transaction=True) as pipeline:
|
||||
_ = pipeline.xadd(
|
||||
events_key,
|
||||
{"payload": payload},
|
||||
)
|
||||
_ = pipeline.expire(events_key, self.run_retention_seconds)
|
||||
_ = pipeline.expire(run_record_key(self.prefix, event.run_id), self.run_retention_seconds)
|
||||
results = cast(list[object], await pipeline.execute())
|
||||
event_id = results[0]
|
||||
return event_id.decode() if isinstance(event_id, bytes) else str(event_id)
|
||||
|
||||
async def get_events(self, run_id: str, *, after: str = "0-0", limit: int = 100) -> RunEventsResponse:
|
||||
|
||||
@ -0,0 +1,96 @@
|
||||
import asyncio
|
||||
|
||||
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, TextPart, UserPromptPart
|
||||
|
||||
from agenton.compositor import Compositor, LayerNode
|
||||
from agenton.layers import LifecycleState
|
||||
from agenton_collections.layers.pydantic_ai import PydanticAIHistoryLayer, PydanticAIHistoryRuntimeState
|
||||
|
||||
|
||||
def test_pydantic_ai_history_layer_starts_empty_and_contributes_no_prompts_or_tools() -> None:
    """A freshly constructed layer has no history, prompts, user prompts, or tools."""
    layer = PydanticAIHistoryLayer()

    assert layer.message_history == []
    assert list(layer.prefix_prompts) == []
    assert list(layer.suffix_prompts) == []
    assert list(layer.user_prompts) == []
    assert list(layer.tools) == []
|
||||
|
||||
|
||||
def test_pydantic_ai_history_layer_replace_messages_saves_validated_copy() -> None:
    """``replace_messages`` keeps an equal but distinct list of messages."""
    source_messages = _sample_messages()
    history = PydanticAIHistoryLayer()
    history.replace_messages(source_messages)

    stored = history.message_history
    assert stored == source_messages
    assert stored is not source_messages

    # Growing the caller's list afterwards must leave the layer's view different.
    late_reply = ModelResponse(parts=[TextPart(content="later")])
    source_messages.append(late_reply)
    assert history.message_history != source_messages
|
||||
|
||||
|
||||
def test_pydantic_ai_history_layer_append_messages_preserves_order_and_internal_state() -> None:
    """Appended messages follow existing ones, and clearing a borrowed list does not affect the layer."""
    first_message, second_message = _sample_messages()
    history = PydanticAIHistoryLayer()

    history.replace_messages([first_message])
    history.append_messages((second_message,))

    # Clear whatever list the property hands out, then read the property again.
    handed_out = history.message_history
    handed_out.clear()

    expected = [first_message, second_message]
    assert history.message_history == expected
|
||||
|
||||
|
||||
def test_pydantic_ai_history_layer_clear_removes_stored_messages() -> None:
    """``clear`` empties both the public history and the runtime state messages."""
    history = PydanticAIHistoryLayer()
    seeded = _sample_messages()

    history.replace_messages(seeded)
    history.clear()

    assert history.message_history == []
    assert history.runtime_state.messages == []
|
||||
|
||||
|
||||
def test_pydantic_ai_history_runtime_state_round_trips_through_json_dump() -> None:
    """Runtime state dumped in JSON mode validates back into typed messages."""
    original_messages = _sample_messages()

    as_json = PydanticAIHistoryRuntimeState(messages=original_messages).model_dump(mode="json")
    reloaded = PydanticAIHistoryRuntimeState.model_validate(as_json)

    assert reloaded.messages == original_messages
    first_restored = reloaded.messages[0]
    assert isinstance(first_restored, ModelRequest)
    second_restored = reloaded.messages[1]
    assert isinstance(second_restored, ModelResponse)
|
||||
|
||||
|
||||
def test_pydantic_ai_history_layer_messages_round_trip_through_session_snapshot() -> None:
    """Messages stored before a suspended exit are available after resuming from the snapshot."""
    expected_messages = _sample_messages()
    compositor = Compositor([LayerNode("history", PydanticAIHistoryLayer)])

    async def scenario() -> None:
        # First run: store the messages and request suspension on exit.
        async with compositor.enter() as first_run:
            first_layer = first_run.get_layer("history", PydanticAIHistoryLayer)
            first_layer.replace_messages(expected_messages)
            first_run.suspend_on_exit()

        snapshot = first_run.session_snapshot
        assert snapshot is not None
        assert snapshot.layers[0].lifecycle_state is LifecycleState.SUSPENDED

        # Second run: resume from the snapshot and inspect the restored layer.
        async with compositor.enter(session_snapshot=first_run.session_snapshot) as resumed_run:
            resumed_layer = resumed_run.get_layer("history", PydanticAIHistoryLayer)

            assert resumed_layer.message_history == expected_messages
            restored = resumed_layer.runtime_state.messages
            assert isinstance(restored[0], ModelRequest)
            assert isinstance(resumed_layer.runtime_state.messages[1], ModelResponse)

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def _sample_messages() -> list[ModelMessage]:
    """Return a new two-message exchange: a user request followed by a text response."""
    greeting = ModelRequest(parts=[UserPromptPart(content="Hello")])
    reply = ModelResponse(parts=[TextPart(content="Hi there")])
    return [greeting, reply]
|
||||
@ -121,11 +121,16 @@ def test_output_package_exports_client_safe_config_symbols_only() -> None:
|
||||
assert not hasattr(output_exports, "DifyOutputLayer")
|
||||
|
||||
|
||||
def test_output_layer_config_accepts_valid_object_schema_without_public_tool_name() -> None:
    """A valid object schema builds a config that exposes no ``name`` field.

    The serialized config is expected to contain only ``json_schema``,
    ``description`` and ``strict``, with the two optional fields unset.
    """
    config = DifyOutputLayerConfig(json_schema=_json_schema())

    assert DIFY_OUTPUT_LAYER_TYPE_ID == "dify.output"
    assert hasattr(config, "name") is False
    assert config.model_dump(mode="json") == {
        "json_schema": _json_schema(),
        "description": None,
        "strict": None,
    }
    assert config.description is None
    assert config.strict is None
|
||||
|
||||
@ -138,7 +143,7 @@ def test_output_layer_config_rejects_non_object_top_level_json_schema() -> None:
|
||||
@pytest.mark.parametrize(
|
||||
("payload", "message"),
|
||||
[
|
||||
({"json_schema": _json_schema(), "name": "bad name"}, "letters, numbers, underscores, or hyphens"),
|
||||
({"json_schema": _json_schema(), "name": "bad name"}, "Extra inputs are not permitted"),
|
||||
({"json_schema": _json_schema(), "unknown": True}, "Extra inputs are not permitted"),
|
||||
],
|
||||
)
|
||||
@ -150,7 +155,6 @@ def test_output_layer_config_rejects_invalid_input(payload: dict[str, object], m
|
||||
def test_output_layer_builds_validated_output_contract_for_object_schema() -> None:
|
||||
config = DifyOutputLayerConfig(
|
||||
json_schema=_json_schema(),
|
||||
name="incident_summary",
|
||||
description="Structured incident summary.",
|
||||
strict=True,
|
||||
)
|
||||
@ -163,11 +167,11 @@ def test_output_layer_builds_validated_output_contract_for_object_schema() -> No
|
||||
output_adapter = TypeAdapter(_validated_output_type(output_contract.output_type))
|
||||
|
||||
assert isinstance(output_type, ToolOutput)
|
||||
assert output_type.name == "incident_summary"
|
||||
assert output_type.name == "final_output"
|
||||
assert output_type.description is None
|
||||
assert output_type.strict is True
|
||||
assert output_schema["type"] == "object"
|
||||
assert output_schema["title"] == "incident_summary"
|
||||
assert output_schema["title"] == "final_output"
|
||||
assert output_schema["description"] == "Structured incident summary."
|
||||
assert output_adapter.validate_python(valid_output) == valid_output
|
||||
|
||||
@ -206,7 +210,7 @@ def test_output_layer_rejects_non_defs_local_ref_in_direct_object_schema() -> No
|
||||
|
||||
def test_output_layer_keeps_local_defs_ref_working_in_direct_object_schema() -> None:
|
||||
output_contract = DifyOutputLayer.from_config(
|
||||
DifyOutputLayerConfig(json_schema=_object_local_defs_ref_schema(), name="direct_defs_result")
|
||||
DifyOutputLayerConfig(json_schema=_object_local_defs_ref_schema())
|
||||
).build_output_contract()
|
||||
output_adapter = TypeAdapter(_validated_output_type(output_contract.output_type))
|
||||
output_schema = output_adapter.json_schema()
|
||||
@ -221,7 +225,7 @@ def test_output_layer_keeps_local_defs_ref_working_in_direct_object_schema() ->
|
||||
},
|
||||
},
|
||||
"required": ["items"],
|
||||
"title": "direct_defs_result",
|
||||
"title": "final_output",
|
||||
}
|
||||
assert output_adapter.validate_python({"items": ["a", "b"]}) == {"items": ["a", "b"]}
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptL
|
||||
import dify_agent.protocol as protocol_exports
|
||||
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LAYER_TYPE_ID, DIFY_PLUGIN_LLM_LAYER_TYPE_ID
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
|
||||
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
|
||||
from dify_agent.protocol.schemas import (
|
||||
RUN_EVENT_ADAPTER,
|
||||
CreateRunRequest,
|
||||
@ -63,6 +63,7 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None:
|
||||
|
||||
def test_create_run_request_rejects_old_compositor_payload_and_model_layer_id_is_public() -> None:
|
||||
assert DIFY_AGENT_MODEL_LAYER_ID == "llm"
|
||||
assert DIFY_AGENT_HISTORY_LAYER_ID == "history"
|
||||
assert DIFY_AGENT_OUTPUT_LAYER_ID == "output"
|
||||
with pytest.raises(ValidationError):
|
||||
_ = CreateRunRequest.model_validate(
|
||||
|
||||
@ -7,9 +7,10 @@ import pytest
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
|
||||
from agenton.layers import ExitIntent, LifecycleState
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
|
||||
from agenton_collections.layers.plain import PromptLayerConfig
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
|
||||
from dify_agent.protocol import DIFY_AGENT_OUTPUT_LAYER_ID
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
|
||||
from dify_agent.protocol.schemas import (
|
||||
CreateRunRequest,
|
||||
LayerExitSignals,
|
||||
@ -191,7 +192,6 @@ def test_create_run_rejects_invalid_output_schema_before_persisting() -> None:
|
||||
await scheduler.create_run(
|
||||
_request(
|
||||
output_config={
|
||||
"name": "incident_summary",
|
||||
"json_schema": _recursive_output_schema(),
|
||||
}
|
||||
)
|
||||
@ -212,7 +212,6 @@ def test_create_run_rejects_remote_ref_output_schema_before_persisting() -> None
|
||||
await scheduler.create_run(
|
||||
_request(
|
||||
output_config={
|
||||
"name": "incident_summary",
|
||||
"json_schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -238,7 +237,6 @@ def test_create_run_rejects_non_object_output_schema_before_persisting() -> None
|
||||
await scheduler.create_run(
|
||||
_request(
|
||||
output_config={
|
||||
"name": "incident_actions",
|
||||
"json_schema": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
@ -252,6 +250,32 @@ def test_create_run_rejects_non_object_output_schema_before_persisting() -> None
|
||||
asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_create_run_rejects_public_output_tool_name_override_before_persisting() -> None:
    """An output config carrying ``name`` is rejected and no run record is stored."""

    async def scenario() -> None:
        store = FakeStore()
        async with httpx.AsyncClient() as client:
            scheduler = RunScheduler(store=store, plugin_daemon_http_client=client)

            with pytest.raises(ValueError, match="Extra inputs are not permitted"):
                await scheduler.create_run(
                    _request(
                        output_config={
                            "name": "incident_summary",
                            "json_schema": {
                                "type": "object",
                                "properties": {"title": {"type": "string"}},
                                "required": ["title"],
                                "additionalProperties": False,
                            },
                        }
                    )
                )

            # The rejection must leave the store without any record.
            assert store.records == {}

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_create_run_rejects_non_defs_local_ref_in_direct_object_schema_before_persisting() -> None:
|
||||
async def scenario() -> None:
|
||||
store = FakeStore()
|
||||
@ -262,7 +286,6 @@ def test_create_run_rejects_non_defs_local_ref_in_direct_object_schema_before_pe
|
||||
await scheduler.create_run(
|
||||
_request(
|
||||
output_config={
|
||||
"name": "incident_summary",
|
||||
"json_schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -426,6 +449,78 @@ def test_validate_run_request_rejects_misnamed_output_layer_before_provider_chec
|
||||
asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_validate_run_request_accepts_reserved_history_layer() -> None:
    """A history layer using the reserved layer id passes request validation."""

    async def scenario() -> None:
        prompt_spec = RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))
        history_spec = RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
        composition = RunComposition(layers=[prompt_spec, history_spec])

        await validate_run_request(CreateRunRequest(composition=composition))

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_validate_run_request_rejects_misnamed_history_layer_before_provider_checks() -> None:
    """A history-typed layer under another name is rejected even with no layer providers."""

    async def scenario() -> None:
        prompt_spec = RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))
        misnamed_spec = RunLayerSpec(name="chat-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
        request = CreateRunRequest(composition=RunComposition(layers=[prompt_spec, misnamed_spec]))

        with pytest.raises(RunRequestValidationError, match="must use reserved layer name 'history'"):
            await validate_run_request(request, layer_providers=())

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_validate_run_request_rejects_multiple_history_layers_before_provider_checks() -> None:
    """Two history-typed layers in one composition are rejected even with no layer providers."""

    async def scenario() -> None:
        layer_specs = [
            RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")),
            RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
            RunLayerSpec(name="secondary-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
        ]
        request = CreateRunRequest(composition=RunComposition(layers=layer_specs))

        with pytest.raises(RunRequestValidationError, match="Only one 'pydantic_ai.history' layer is supported"):
            await validate_run_request(request, layer_providers=())

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_validate_run_request_rejects_history_layer_dependencies_before_provider_checks() -> None:
    """A history layer that declares ``deps`` is rejected even with no layer providers."""

    async def scenario() -> None:
        prompt_spec = RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))
        history_with_deps = RunLayerSpec(
            name=DIFY_AGENT_HISTORY_LAYER_ID,
            type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
            deps={"prompt": "prompt"},
        )
        request = CreateRunRequest(composition=RunComposition(layers=[prompt_spec, history_with_deps]))

        with pytest.raises(RunRequestValidationError, match="does not support dependencies"):
            await validate_run_request(request, layer_providers=())

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> None:
|
||||
async def scenario() -> None:
|
||||
store = FakeStore()
|
||||
|
||||
@ -5,18 +5,19 @@ from typing import Any
|
||||
import httpx
|
||||
import pytest
|
||||
from pydantic_ai.exceptions import UnexpectedModelBehavior
|
||||
from pydantic_ai.messages import ModelMessage, ModelResponse, ToolCallPart
|
||||
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, SystemPromptPart, TextPart, ToolCallPart, UserPromptPart
|
||||
from pydantic_ai.models import ModelRequestParameters
|
||||
from pydantic_ai.models.test import TestModel
|
||||
from pydantic_ai.settings import ModelSettings
|
||||
|
||||
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
|
||||
from agenton.layers import ExitIntent, LifecycleState
|
||||
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryRuntimeState
|
||||
from agenton_collections.layers.plain import PromptLayerConfig
|
||||
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
|
||||
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
|
||||
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID
|
||||
from dify_agent.protocol.schemas import (
|
||||
CreateRunRequest,
|
||||
LayerExitSignals,
|
||||
@ -31,6 +32,7 @@ from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
|
||||
def _request(
|
||||
user: str | list[str] = "hello",
|
||||
*,
|
||||
include_history: bool = False,
|
||||
llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
|
||||
plugin_layer_name: str = "plugin",
|
||||
on_exit: LayerExitSignals | None = None,
|
||||
@ -42,6 +44,11 @@ def _request(
|
||||
type="plain.prompt",
|
||||
config=PromptLayerConfig(prefix="system", user=user),
|
||||
),
|
||||
*(
|
||||
[RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)]
|
||||
if include_history
|
||||
else []
|
||||
),
|
||||
RunLayerSpec(
|
||||
name=plugin_layer_name,
|
||||
type="dify.plugin",
|
||||
@ -122,6 +129,58 @@ class SequenceOutputTestModel(TestModel):
|
||||
)
|
||||
|
||||
|
||||
class RecordingTestModel(TestModel):
    """``TestModel`` variant that records each request's messages and can raise a preset failure."""

    # One entry per ``_request`` call: a copy of the message list it received.
    seen_requests: list[list[ModelMessage]]
    # When not ``None``, raised by ``_request`` after the messages are recorded.
    failure: Exception | None

    def __init__(self, *, custom_output_text: str = "done", failure: Exception | None = None) -> None:
        """Configure the base model with no tool calls and the given output text."""
        super().__init__(call_tools=[], custom_output_text=custom_output_text)
        self.failure = failure
        self.seen_requests = []

    def _request(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> ModelResponse:
        """Record a copy of ``messages``, then raise the preset failure or delegate to ``TestModel``."""
        self.seen_requests.append([*messages])
        preset_failure = self.failure
        if preset_failure is None:
            return super()._request(messages, model_settings, model_request_parameters)
        raise preset_failure
|
||||
|
||||
|
||||
def _history_session_snapshot(
    messages: list[ModelMessage],
    *,
    include_output: bool = False,
) -> CompositorSessionSnapshot:
    """Build a snapshot of suspended layers whose history layer carries ``messages``.

    Layer order is prompt, history, plugin, model, and optionally output last.
    Only the history layer has a non-empty runtime state.
    """
    history_state = PydanticAIHistoryRuntimeState(messages=messages).model_dump(mode="json")
    named_states = [
        ("prompt", {}),
        (DIFY_AGENT_HISTORY_LAYER_ID, history_state),
        ("plugin", {}),
        (DIFY_AGENT_MODEL_LAYER_ID, {}),
    ]
    if include_output:
        named_states.append((DIFY_AGENT_OUTPUT_LAYER_ID, {}))

    suspended_layers = [
        LayerSessionSnapshot(name=layer_name, lifecycle_state=LifecycleState.SUSPENDED, runtime_state=state)
        for layer_name, state in named_states
    ]
    return CompositorSessionSnapshot(layers=suspended_layers)
|
||||
|
||||
|
||||
def _history_messages_from_snapshot(snapshot: CompositorSessionSnapshot) -> list[ModelMessage]:
    """Return the messages validated from the first history-named layer in ``snapshot``.

    Raises ``StopIteration`` when no layer has the reserved history name.
    """
    candidates = (entry for entry in snapshot.layers if entry.name == DIFY_AGENT_HISTORY_LAYER_ID)
    history_entry = next(candidates)
    restored_state = PydanticAIHistoryRuntimeState.model_validate(history_entry.runtime_state)
    return restored_state.messages
|
||||
|
||||
|
||||
def _flatten_message_parts(messages: list[ModelMessage]) -> list[object]:
    """Return every part of every message as one flat list, in message order."""
    flattened: list[object] = []
    for message in messages:
        flattened.extend(message.parts)
    return flattened
|
||||
|
||||
|
||||
def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
seen_clients: list[httpx.AsyncClient] = []
|
||||
|
||||
@ -170,6 +229,172 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
|
||||
assert sink.statuses["run-1"] == "succeeded"
|
||||
|
||||
|
||||
def test_runner_passes_temporary_system_prompt_prefix_without_history_layer(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without a history layer the model receives the system prefix, then the user prompt."""
    recording_model = RecordingTestModel(custom_output_text="done")

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return recording_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    event_sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=event_sink,
                request=_request("current user"),
                run_id="run-no-history",
                plugin_daemon_http_client=client,
            )
            await runner.run()

    asyncio.run(scenario())

    # Only the first model request is inspected.
    sent_parts = _flatten_message_parts(recording_model.seen_requests[0])
    system_part = sent_parts[0]
    assert isinstance(system_part, SystemPromptPart)
    assert system_part.content == "system"
    user_part = sent_parts[1]
    assert isinstance(user_part, UserPromptPart)
    assert user_part.content == "current user"

    final_event = event_sink.events["run-no-history"][-1]
    assert isinstance(final_event, RunSucceededEvent)
    snapshot_layer_names = [layer.name for layer in final_event.data.session_snapshot.layers]
    assert snapshot_layer_names == ["prompt", "plugin", DIFY_AGENT_MODEL_LAYER_ID]
|
||||
|
||||
|
||||
def test_runner_prepends_current_system_prompt_to_stored_history_and_appends_only_new_messages(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The model sees system prompt, stored history, then the new prompt; only the new exchange is saved."""
    recording_model = RecordingTestModel(custom_output_text="done")
    prior_exchange = [
        ModelRequest(parts=[UserPromptPart(content="old user")]),
        ModelResponse(parts=[TextPart(content="old assistant")]),
    ]

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return recording_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    run_request = _request("current user", include_history=True)
    run_request.session_snapshot = _history_session_snapshot(prior_exchange)
    event_sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=event_sink,
                request=run_request,
                run_id="run-history",
                plugin_daemon_http_client=client,
            )
            await runner.run()

    asyncio.run(scenario())

    # Expected order of parts in the first model request.
    sent_parts = _flatten_message_parts(recording_model.seen_requests[0])
    assert isinstance(sent_parts[0], SystemPromptPart)
    assert sent_parts[0].content == "system"
    assert isinstance(sent_parts[1], UserPromptPart)
    assert sent_parts[1].content == "old user"
    assert isinstance(sent_parts[2], TextPart)
    assert sent_parts[2].content == "old assistant"
    assert isinstance(sent_parts[3], UserPromptPart)
    assert sent_parts[3].content == "current user"

    final_event = event_sink.events["run-history"][-1]
    assert isinstance(final_event, RunSucceededEvent)
    persisted = _history_messages_from_snapshot(final_event.data.session_snapshot)
    assert persisted[:2] == prior_exchange

    new_request = persisted[2]
    assert isinstance(new_request, ModelRequest)
    assert len(new_request.parts) == 1
    assert isinstance(new_request.parts[0], UserPromptPart)
    assert new_request.parts[0].content == "current user"

    new_response = persisted[3]
    assert isinstance(new_response, ModelResponse)
    assert len(new_response.parts) == 1
    assert isinstance(new_response.parts[0], TextPart)
    assert new_response.parts[0].content == "done"

    # No saved message may contain a system prompt part.
    assert not any(isinstance(part, SystemPromptPart) for message in persisted for part in message.parts)
|
||||
|
||||
|
||||
def test_runner_with_empty_history_layer_still_sends_system_prompt_and_saves_only_new_messages(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With an empty stored history the system prompt is still sent and only the new exchange is saved."""
    recording_model = RecordingTestModel(custom_output_text="done")

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return recording_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    run_request = _request("current user", include_history=True)
    run_request.session_snapshot = _history_session_snapshot([])
    event_sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=event_sink,
                request=run_request,
                run_id="run-empty-history",
                plugin_daemon_http_client=client,
            )
            await runner.run()

    asyncio.run(scenario())

    sent_parts = _flatten_message_parts(recording_model.seen_requests[0])
    assert isinstance(sent_parts[0], SystemPromptPart)
    assert sent_parts[0].content == "system"
    assert isinstance(sent_parts[1], UserPromptPart)
    assert sent_parts[1].content == "current user"

    final_event = event_sink.events["run-empty-history"][-1]
    assert isinstance(final_event, RunSucceededEvent)
    persisted = _history_messages_from_snapshot(final_event.data.session_snapshot)

    new_request = persisted[0]
    assert isinstance(new_request, ModelRequest)
    assert len(new_request.parts) == 1
    assert isinstance(new_request.parts[0], UserPromptPart)
    assert new_request.parts[0].content == "current user"

    new_response = persisted[1]
    assert isinstance(new_response, ModelResponse)
    assert len(new_response.parts) == 1
    assert isinstance(new_response.parts[0], TextPart)
    assert new_response.parts[0].content == "done"

    # No saved message may contain a system prompt part.
    assert not any(isinstance(part, SystemPromptPart) for message in persisted for part in message.parts)
|
||||
|
||||
|
||||
def test_runner_failure_with_history_layer_emits_failed_terminal_event_without_success_snapshot(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A model failure yields started/failed events and leaves the request's stored history unchanged."""
    failing_model = RecordingTestModel(failure=RuntimeError("boom"))
    prior_exchange = [
        ModelRequest(parts=[UserPromptPart(content="old user")]),
        ModelResponse(parts=[TextPart(content="old assistant")]),
    ]

    def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
        assert http_client.is_closed is False
        return failing_model  # pyright: ignore[reportReturnType]

    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    run_request = _request("current user", include_history=True)
    run_request.session_snapshot = _history_session_snapshot(prior_exchange)
    event_sink = InMemoryRunEventSink()

    async def scenario() -> None:
        async with httpx.AsyncClient() as client:
            runner = AgentRunRunner(
                sink=event_sink,
                request=run_request,
                run_id="run-history-failure",
                plugin_daemon_http_client=client,
            )
            # The model's error is expected to propagate out of ``run()``.
            with pytest.raises(RuntimeError, match="boom"):
                await runner.run()

    asyncio.run(scenario())

    emitted_types = [event.type for event in event_sink.events["run-history-failure"]]
    assert emitted_types == ["run_started", "run_failed"]
    assert event_sink.statuses["run-history-failure"] == "failed"
    assert run_request.session_snapshot is not None
    assert _history_messages_from_snapshot(run_request.session_snapshot) == prior_exchange
|
||||
|
||||
|
||||
def test_runner_applies_on_exit_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
|
||||
assert http_client.is_closed is False
|
||||
@ -232,7 +457,6 @@ def test_runner_passes_output_layer_spec_to_agent_and_serializes_structured_resu
|
||||
"required": ["title", "severity", "actions"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
name="incident_summary",
|
||||
description="Structured incident summary returned by the agent.",
|
||||
strict=True,
|
||||
)
|
||||
@ -267,10 +491,10 @@ def test_runner_passes_output_layer_spec_to_agent_and_serializes_structured_resu
|
||||
assert model.last_model_request_parameters is not None
|
||||
assert len(model.last_model_request_parameters.output_tools) == 1
|
||||
output_tool = model.last_model_request_parameters.output_tools[0]
|
||||
assert output_tool.name == "incident_summary"
|
||||
assert output_tool.name == "final_output"
|
||||
assert output_tool.description == "Structured incident summary returned by the agent."
|
||||
assert output_tool.parameters_json_schema["type"] == "object"
|
||||
assert output_tool.parameters_json_schema["title"] == "incident_summary"
|
||||
assert output_tool.parameters_json_schema["title"] == "final_output"
|
||||
assert output_tool.parameters_json_schema["properties"] == {
|
||||
"title": {"type": "string"},
|
||||
"severity": {"type": "string", "enum": ["low", "medium", "high"]},
|
||||
@ -321,7 +545,6 @@ def test_runner_retries_invalid_structured_output_and_eventually_succeeds(monkey
|
||||
"required": ["title", "severity", "actions"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
name="incident_summary",
|
||||
description="Structured incident summary returned by the agent.",
|
||||
)
|
||||
)
|
||||
@ -374,7 +597,6 @@ def test_runner_fails_when_invalid_structured_output_exhausts_retries(monkeypatc
|
||||
"required": ["title", "severity", "actions"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
name="incident_summary",
|
||||
description="Structured incident summary returned by the agent.",
|
||||
)
|
||||
)
|
||||
@ -411,7 +633,6 @@ def test_runner_rejects_invalid_output_layer_before_model_resolution(monkeypatch
|
||||
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
|
||||
request = _request(
|
||||
output_config={
|
||||
"name": "incident_summary",
|
||||
"json_schema": _recursive_output_schema(),
|
||||
}
|
||||
)
|
||||
|
||||
@ -0,0 +1,151 @@
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
from pydantic_ai.messages import ModelRequest, ModelResponse, SystemPromptPart, TextPart, UserPromptPart
|
||||
|
||||
from agenton.compositor import Compositor, LayerNode
|
||||
from agenton_collections.layers.pydantic_ai import (
|
||||
PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
|
||||
PydanticAIHistoryLayer,
|
||||
)
|
||||
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID
|
||||
from dify_agent.protocol.schemas import RunComposition, RunLayerSpec
|
||||
from dify_agent.runtime.compositor_factory import create_default_layer_providers
|
||||
from dify_agent.runtime.history import (
|
||||
append_successful_run_history,
|
||||
build_run_message_history,
|
||||
get_history_layer,
|
||||
validate_history_layer_composition,
|
||||
)
|
||||
|
||||
|
||||
def test_default_layer_providers_include_pydantic_ai_history_layer() -> None:
    """The default provider set contains a provider whose type id is the history layer's."""
    registered_type_ids = {provider.type_id for provider in create_default_layer_providers()}

    assert PYDANTIC_AI_HISTORY_LAYER_TYPE_ID in registered_type_ids
|
||||
|
||||
|
||||
def test_validate_history_layer_composition_accepts_absent_or_reserved_history_layer() -> None:
    """Validation raises nothing for an empty composition or a correctly named history layer."""
    # Case 1: no layers at all.
    without_history = RunComposition(layers=[])
    validate_history_layer_composition(without_history)

    # Case 2: a single history layer under the reserved name.
    reserved_spec = RunLayerSpec(
        name=DIFY_AGENT_HISTORY_LAYER_ID,
        type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
    )
    with_reserved_history = RunComposition(layers=[reserved_spec])
    validate_history_layer_composition(with_reserved_history)
|
||||
|
||||
|
||||
def test_validate_history_layer_composition_rejects_multiple_history_layers() -> None:
    """Two history-typed layers in one composition raise ``ValueError``."""
    reserved_spec = RunLayerSpec(name=DIFY_AGENT_HISTORY_LAYER_ID, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    extra_spec = RunLayerSpec(name="secondary-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    duplicated = RunComposition(layers=[reserved_spec, extra_spec])

    with pytest.raises(ValueError, match="Only one 'pydantic_ai.history' layer is supported"):
        validate_history_layer_composition(duplicated)
|
||||
|
||||
|
||||
def test_validate_history_layer_composition_rejects_misnamed_history_layer() -> None:
    """A history-typed layer under a non-reserved name raises ``ValueError``."""
    misnamed_spec = RunLayerSpec(name="chat-history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID)
    misnamed = RunComposition(layers=[misnamed_spec])

    with pytest.raises(ValueError, match="must use reserved layer name 'history'"):
        validate_history_layer_composition(misnamed)
|
||||
|
||||
|
||||
def test_validate_history_layer_composition_rejects_history_layer_dependencies() -> None:
    """A history layer that declares ``deps`` raises ``ValueError``."""
    spec_with_deps = RunLayerSpec(
        name=DIFY_AGENT_HISTORY_LAYER_ID,
        type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
        deps={"prompt": "prompt"},
    )
    dependent = RunComposition(layers=[spec_with_deps])

    with pytest.raises(ValueError, match="does not support dependencies"):
        validate_history_layer_composition(dependent)
|
||||
|
||||
|
||||
def test_get_history_layer_returns_optional_active_history_layer() -> None:
    """``get_history_layer`` returns the history layer instance from an entered run."""
    history_node = LayerNode(DIFY_AGENT_HISTORY_LAYER_ID, PydanticAIHistoryLayer)
    compositor = Compositor([history_node])

    async def scenario() -> None:
        async with compositor.enter() as run:
            resolved_layer = get_history_layer(run)

            assert isinstance(resolved_layer, PydanticAIHistoryLayer)

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_build_run_message_history_renders_current_system_prompts_before_stored_history() -> None:
    """Current system prompts form the first request; stored history follows unchanged."""
    previous_request = ModelRequest(parts=[UserPromptPart(content="old user")])
    previous_response = ModelResponse(parts=[TextPart(content="old assistant")])
    stored_history = [previous_request, previous_response]

    async def scenario() -> None:
        prompt_functions = [lambda: "current system", lambda: "current suffix"]
        message_history = await build_run_message_history(
            system_prompts=prompt_functions,
            stored_history=stored_history,
        )

        assert message_history is not None
        leading_message = message_history[0]
        assert isinstance(leading_message, ModelRequest)
        # Both prompt functions are expected to be rendered, in declaration order.
        rendered_contents = [part.content for part in leading_message.parts]
        assert rendered_contents == ["current system", "current suffix"]
        assert message_history[1:] == stored_history

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_build_run_message_history_returns_none_without_system_prompt_or_history() -> None:
    """With no system prompts and no stored messages the builder returns ``None``."""

    async def scenario() -> None:
        built_history = await build_run_message_history(system_prompts=[], stored_history=[])
        assert built_history is None

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_build_run_message_history_renders_system_prompt_without_history_layer() -> None:
    """A lone system prompt with empty stored history yields one single-part request."""

    async def scenario() -> None:
        message_history = await build_run_message_history(
            system_prompts=[lambda: "current system"],
            stored_history=[],
        )

        assert message_history is not None
        assert len(message_history) == 1
        only_message = message_history[0]
        assert isinstance(only_message, ModelRequest)
        first_part = only_message.parts[0]
        assert isinstance(first_part, SystemPromptPart)
        assert first_part.content == "current system"

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_build_run_message_history_rejects_context_dependent_prompt_functions() -> None:
    """A system prompt function that takes an argument is rejected with ``ValueError``."""

    def unsupported_prompt(_ctx: object) -> str:
        return "current system"

    async def scenario() -> None:
        expected_message = "zero-argument system prompts"
        with pytest.raises(ValueError, match=expected_message):
            await build_run_message_history(
                system_prompts=[unsupported_prompt],
                stored_history=[],
            )

    asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_append_successful_run_history_preserves_existing_message_order() -> None:
    """Appending run messages keeps earlier stored messages ahead of the new ones."""
    stored_history = [ModelRequest(parts=[UserPromptPart(content="old user")])]
    new_messages = [ModelResponse(parts=[TextPart(content="new assistant")])]

    history_layer = PydanticAIHistoryLayer()
    history_layer.replace_messages(stored_history)
    append_successful_run_history(history_layer, new_messages)

    expected_history = stored_history + new_messages
    assert history_layer.message_history == expected_history
|
||||
@ -30,6 +30,13 @@ class FakeRedis:
|
||||
|
||||
async def xadd(self, key: str, fields: Mapping[str, object]) -> str:
    """Log an ``xadd`` call on the fake and append the entry to the in-memory stream.

    A dict copy of ``fields`` is stored in the command log; the generated
    entry id from ``_append_stream_entry`` is returned.
    """
    logged_call = ("xadd", key, dict(fields))
    self.commands.append(logged_call)
    entry_id = self._append_stream_entry(key, fields)
    return entry_id
|
||||
|
||||
def pipeline(self, transaction: bool = True, shard_hint: str | None = None) -> "FakeRedisPipeline":
    """Log the pipeline request and return a fake pipeline bound to this instance."""
    logged_call = ("pipeline", transaction, shard_hint)
    self.commands.append(logged_call)
    return FakeRedisPipeline(self)
|
||||
|
||||
def _append_stream_entry(self, key: str, fields: Mapping[str, object]) -> str:
|
||||
entries = self.streams.setdefault(key, [])
|
||||
event_id = f"{len(entries) + 1}-0"
|
||||
entries.append((event_id, dict(fields)))
|
||||
@ -64,6 +71,35 @@ class FakeRedis:
|
||||
return int(timestamp), int(sequence)
|
||||
|
||||
|
||||
class FakeRedisPipeline:
    """In-memory pipeline double that forwards queued commands to a ``FakeRedis``.

    ``xadd`` and ``expire`` are logged on the parent's command list as soon as
    they are called; their results are collected so ``execute`` can return them.
    """

    # Parent fake whose command log and streams receive the queued calls.
    redis: FakeRedis
    # Per-command return values, in call order.
    results: list[object]

    def __init__(self, redis: FakeRedis) -> None:
        self.results = []
        self.redis = redis

    async def __aenter__(self) -> "FakeRedisPipeline":
        """Support ``async with`` by yielding the pipeline itself."""
        return self

    async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None:
        """Exit the context without cleanup; the exception details are unused."""
        del exc_type, exc, traceback

    def xadd(self, key: str, fields: Mapping[str, object]) -> "FakeRedisPipeline":
        """Log the ``xadd``, append the stream entry on the parent, and keep its id."""
        parent = self.redis
        parent.commands.append(("xadd", key, dict(fields)))
        entry_id = parent._append_stream_entry(key, fields)
        self.results.append(entry_id)
        return self

    def expire(self, key: str, seconds: int) -> "FakeRedisPipeline":
        """Log the ``expire`` on the parent and record ``True`` as its result."""
        logged_call = ("expire", key, seconds)
        self.redis.commands.append(logged_call)
        self.results.append(True)
        return self

    async def execute(self) -> list[object]:
        """Log the ``execute`` marker and return a copy of the collected results."""
        self.redis.commands.append(("execute",))
        return [*self.results]
|
||||
|
||||
|
||||
def test_create_run_writes_running_record_without_job_queue_and_with_retention() -> None:
|
||||
redis = FakeRedis()
|
||||
store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType]
|
||||
@ -97,15 +133,21 @@ def test_append_event_serializes_typed_event_without_id_and_expires_run_keys() -
|
||||
event_id = asyncio.run(store.append_event(RunStartedEvent(id="local", run_id="run-1")))
|
||||
|
||||
assert event_id == "1-0"
|
||||
assert redis.commands[0][0] == "xadd"
|
||||
fields = redis.commands[0][2]
|
||||
pipeline_commands = [command for command in redis.commands if command[0] == "pipeline"]
|
||||
assert len(pipeline_commands) == 1
|
||||
assert pipeline_commands[0][1] is True
|
||||
xadd_commands = [command for command in redis.commands if command[0] == "xadd"]
|
||||
assert len(xadd_commands) == 1
|
||||
fields = xadd_commands[0][2]
|
||||
assert isinstance(fields, dict)
|
||||
assert '"id"' not in str(fields["payload"])
|
||||
assert '"type":"run_started"' in str(fields["payload"])
|
||||
assert redis.commands[1:] == [
|
||||
expire_commands = {command for command in redis.commands if command[0] == "expire"}
|
||||
assert expire_commands == {
|
||||
("expire", "test:runs:run-1:events", 60),
|
||||
("expire", "test:runs:run-1:record", 60),
|
||||
]
|
||||
}
|
||||
assert ("execute",) in redis.commands
|
||||
|
||||
|
||||
def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> None:
|
||||
|
||||
@ -8,7 +8,7 @@ PROJECT_ROOT = Path(__file__).resolve().parents[2]
|
||||
|
||||
CLIENT_SHARED_DTO_DEPENDENCIES = {
|
||||
"httpx>=0.28.1",
|
||||
"pydantic>=2.13.3",
|
||||
"pydantic>=2.12.5,<3",
|
||||
"pydantic-ai-slim>=1.85.1",
|
||||
"typing-extensions>=4.12.2",
|
||||
}
|
||||
|
||||
2
dify-agent/uv.lock
generated
2
dify-agent/uv.lock
generated
@ -614,7 +614,7 @@ requires-dist = [
|
||||
{ name = "graphon", marker = "extra == 'server'", specifier = "~=0.2.2" },
|
||||
{ name = "httpx", specifier = ">=0.28.1" },
|
||||
{ name = "jsonschema", marker = "extra == 'server'", specifier = ">=4.23.0" },
|
||||
{ name = "pydantic", specifier = ">=2.12.5,<2.13" },
|
||||
{ name = "pydantic", specifier = ">=2.12.5,<3" },
|
||||
{ name = "pydantic-ai-slim", specifier = ">=1.85.1" },
|
||||
{ name = "pydantic-ai-slim", extras = ["anthropic", "google", "openai"], marker = "extra == 'server'", specifier = ">=1.85.1" },
|
||||
{ name = "pydantic-settings", marker = "extra == 'server'", specifier = ">=2.12.0" },
|
||||
|
||||
Reference in New Issue
Block a user