Compare commits

..

16 Commits

Author SHA1 Message Date
79bad45f55 refactor(migration): drop comment from LB collision key 2026-05-25 22:31:21 -07:00
90d6b3db77 test(migration): cover LB dedup collision case in credential merge 2026-05-25 22:14:28 -07:00
c39cfc14fb refactor(migration): clarify LB collision key includes model_name intentionally 2026-05-25 22:06:58 -07:00
2300497a8c fix(migration): dedup load_balancing_model_configs on credential merge
When provider_model_credentials are merged (loser deleted, winner kept),
rows in load_balancing_model_configs pointing to the loser credential are
rewritten to point to the winner. If the winner already has an LB row for
the same (provider_name, model_name, model_type), the rewrite would create
a duplicate — instead, detect the collision and delete the loser LB row.

Adds _LoadBalancingCredentialDeletePlan, extends
_ProviderModelCredentialGroupPlan with load_balancing_deletions, adds
_emit_load_balancing_reference_deletions, and wires it into the credential
group plan emit path.
2026-05-25 22:00:55 -07:00
b0b96d5e01 Merge branch 'main' into feat/model-type-migration-script 2026-05-25 21:21:05 -07:00
435ca8b9f1 test(api): fix broken tests 2026-05-26 11:21:34 +08:00
e21679d980 chore(api): fix type error 2026-05-26 11:00:51 +08:00
91a0a6d27a chore(api): fix lint issues 2026-05-26 10:49:29 +08:00
5ac44589d6 Merge remote-tracking branch 'upstream/main' into feat/model-type-migration-script-2 2026-05-26 10:12:48 +08:00
2fda1318be feat(api): support concurrency in mode type migration script 2026-05-26 10:00:55 +08:00
8cd3cf7c75 feat(api): allow specify JSON output 2026-05-26 09:45:04 +08:00
7aabb67441 chore(api): rollback changes to ext_logging 2026-05-26 09:35:03 +08:00
c9327ee666 chore(api): remove dev group from default groups 2026-05-26 09:28:24 +08:00
1a25acc140 chore(api): log after updating / deletion 2026-05-26 09:24:08 +08:00
79beadb8cf chore(api): minor adjustment 2026-05-26 04:39:30 +08:00
1b70cdd51d feat(api): introduce model type migration script
Assisted-By: Codex:GPT-5.4
2026-05-26 03:27:19 +08:00
49 changed files with 4778 additions and 5421 deletions

111
.github/dependabot.yml vendored
View File

@ -110,114 +110,3 @@ updates:
github-actions-dependencies:
patterns:
- "*"
- package-ecosystem: "uv"
directory: "/api"
target-branch: "lts/1.13.x"
open-pull-requests-limit: 10
schedule:
interval: "weekly"
groups:
flask:
patterns:
- "flask"
- "flask-*"
- "werkzeug"
- "gunicorn"
google:
patterns:
- "google-*"
- "googleapis-*"
opentelemetry:
patterns:
- "opentelemetry-*"
pydantic:
patterns:
- "pydantic"
- "pydantic-*"
llm:
patterns:
- "langfuse"
- "langsmith"
- "litellm"
- "mlflow*"
- "opik"
- "weave*"
- "arize*"
- "tiktoken"
- "transformers"
database:
patterns:
- "sqlalchemy"
- "psycopg2*"
- "psycogreen"
- "redis*"
- "alembic*"
storage:
patterns:
- "boto3*"
- "botocore*"
- "azure-*"
- "bce-*"
- "cos-python-*"
- "esdk-obs-*"
- "google-cloud-storage"
- "opendal"
- "oss2"
- "supabase*"
- "tos*"
vdb:
patterns:
- "alibabacloud*"
- "chromadb"
- "clickhouse-*"
- "clickzetta-*"
- "couchbase"
- "elasticsearch"
- "opensearch-py"
- "oracledb"
- "pgvect*"
- "pymilvus"
- "pymochow"
- "pyobvector"
- "qdrant-client"
- "intersystems-*"
- "tablestore"
- "tcvectordb"
- "tidb-vector"
- "upstash-*"
- "volcengine-*"
- "weaviate-*"
- "xinference-*"
- "mo-vector"
- "mysql-connector-*"
dev:
patterns:
- "coverage"
- "dotenv-linter"
- "faker"
- "lxml-stubs"
- "basedpyright"
- "ruff"
- "pytest*"
- "types-*"
- "boto3-stubs"
- "hypothesis"
- "pandas-stubs"
- "scipy-stubs"
- "import-linter"
- "celery-types"
- "mypy*"
- "pyrefly"
python-packages:
patterns:
- "*"
- package-ecosystem: "github-actions"
directory: "/"
target-branch: "lts/1.13.x"
open-pull-requests-limit: 5
schedule:
interval: "weekly"
groups:
github-actions-dependencies:
patterns:
- "*"

View File

@ -223,10 +223,11 @@ def initialize_extensions(app: DifyApp):
def create_migrations_app() -> DifyApp:
app = create_flask_app_with_configs()
from extensions import ext_database, ext_migrate
from extensions import ext_commands, ext_database, ext_migrate
# Initialize only required extensions
ext_database.init_app(app)
ext_migrate.init_app(app)
ext_commands.init_app(app)
return app

View File

@ -31,7 +31,6 @@ from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAge
from clients.agent_backend.request_builder import (
AGENT_SOUL_PROMPT_LAYER_ID,
DIFY_EXECUTION_CONTEXT_LAYER_ID,
DIFY_PLUGIN_TOOLS_LAYER_ID,
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
AgentBackendModelConfig,
@ -44,7 +43,6 @@ from clients.agent_backend.request_builder import (
__all__ = [
"AGENT_SOUL_PROMPT_LAYER_ID",
"DIFY_EXECUTION_CONTEXT_LAYER_ID",
"DIFY_PLUGIN_TOOLS_LAYER_ID",
"WORKFLOW_NODE_JOB_PROMPT_LAYER_ID",
"WORKFLOW_USER_PROMPT_LAYER_ID",
"AgentBackendError",

View File

@ -18,10 +18,8 @@ from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
DifyPluginCredentialValue,
DifyPluginLLMLayerConfig,
DifyPluginToolsLayerConfig,
)
from dify_agent.layers.execution_context import (
DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID,
@ -43,7 +41,6 @@ AGENT_SOUL_PROMPT_LAYER_ID = "agent_soul_prompt"
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID = "workflow_node_job_prompt"
WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt"
DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context"
DIFY_PLUGIN_TOOLS_LAYER_ID = "tools"
class AgentBackendModelConfig(BaseModel):
@ -84,7 +81,6 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
purpose: RunPurpose = "workflow_node"
idempotency_key: str | None = None
output: AgentBackendOutputConfig | None = None
tools: DifyPluginToolsLayerConfig | None = None
session_snapshot: CompositorSessionSnapshot | None = None
suspend_on_exit: bool = False
metadata: dict[str, JsonValue] = Field(default_factory=dict)
@ -151,17 +147,6 @@ class AgentBackendRunRequestBuilder:
]
)
if run_input.tools is not None and run_input.tools.tools:
layers.append(
RunLayerSpec(
name=DIFY_PLUGIN_TOOLS_LAYER_ID,
type=DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID},
metadata=run_input.metadata,
config=run_input.tools,
)
)
if run_input.output is not None:
layers.append(
RunLayerSpec(

View File

@ -3,6 +3,7 @@ CLI command modules extracted from `commands.py`.
"""
from .account import create_tenant, reset_email, reset_password
from .data_migrate import data_migrate, legacy_model_types
from .plugin import (
extract_plugins,
extract_unique_plugins,
@ -44,6 +45,7 @@ __all__ = [
"clear_orphaned_file_records",
"convert_to_agent_apps",
"create_tenant",
"data_migrate",
"delete_archived_workflow_runs",
"export_app_messages",
"extract_plugins",
@ -52,6 +54,7 @@ __all__ = [
"fix_app_site_missing",
"install_plugins",
"install_rag_pipeline_plugins",
"legacy_model_types",
"migrate_annotation_vector_database",
"migrate_data_for_plugin",
"migrate_knowledge_vector_database",

View File

@ -0,0 +1,169 @@
import io
import os
import sys
from contextlib import AbstractContextManager, nullcontext
from pathlib import Path
from typing import cast
import click
from extensions.ext_database import db
from graphon.model_runtime.entities.model_entities import ModelType
from services.legacy_model_type_migration import (
VALID_TABLE_NAMES,
LegacyModelTypeMigrationService,
load_tenant_ids_from_file,
)
_SUPPORTED_MODEL_TYPE_CHOICES = (
ModelType.LLM.value,
ModelType.TEXT_EMBEDDING.value,
ModelType.RERANK.value,
)
_DEFAULT_CONCURRENCY = os.cpu_count() or 1
def _normalize_multi_value_option(
values: tuple[str, ...],
*,
valid_values: tuple[str, ...],
option_name: str,
) -> tuple[str, ...]:
normalized_values: list[str] = []
seen_values: set[str] = set()
for value in values:
for item in value.split(","):
normalized_item = item.strip()
if not normalized_item:
continue
if normalized_item not in valid_values:
raise click.BadParameter(
f"invalid value '{normalized_item}'. valid values: {', '.join(valid_values)}",
param_hint=option_name,
)
if normalized_item in seen_values:
continue
seen_values.add(normalized_item)
normalized_values.append(normalized_item)
return tuple(normalized_values)
@click.group(
"data-migrate",
help="Online data migration commands.",
)
def data_migrate() -> None:
"""Namespace for production data migration commands."""
@click.command(
"legacy-model-types",
help=(
"Migrate legacy provider model_type values to canonical values. "
"Default is dry-run and emits JSON lines only. "
"If --tables includes provider_model_credentials, the command may also update "
"provider_models and load_balancing_model_configs references so merged credentials stay reachable."
),
)
@click.option(
"--apply",
is_flag=True,
default=False,
help="Apply the migration. Default is dry-run.",
)
@click.option(
"--tables",
"tables",
multiple=True,
type=str,
help=(
"Limit model_type migration to specific tables. Accepts comma-separated values or repeated flags. "
"When provider_model_credentials is selected, provider_models and "
"load_balancing_model_configs may also be updated for credential reference rewrites."
"Default to: "
),
)
@click.option(
"--model-types",
"model_types",
multiple=True,
type=str,
help=(
"Canonical model types to migrate. Accepts comma-separated values or repeated flags. "
"Defaults to: `llm,text-embedding,rerank`"
),
)
@click.option(
"--tenant-id-file",
type=click.Path(exists=True, dir_okay=False, readable=True, resolve_path=True),
help="Optional file containing tenant ids, one per line.",
)
@click.option(
"--output",
type=click.Path(dir_okay=False, resolve_path=True, path_type=Path),
help="Optional file path for JSON lines event logs. Defaults to stdout.",
)
@click.option(
"--concurrency",
type=click.IntRange(min=1),
default=_DEFAULT_CONCURRENCY,
show_default=True,
help="Number of tenant-level worker threads to run in parallel.",
)
def legacy_model_types(
apply: bool,
tables: tuple[str, ...],
model_types: tuple[str, ...],
tenant_id_file: str | None,
output: Path | None,
concurrency: int = _DEFAULT_CONCURRENCY,
) -> None:
"""
Migrate legacy provider-related model_type values and emit JSON lines events.
"""
normalized_tables = _normalize_multi_value_option(
tables,
valid_values=VALID_TABLE_NAMES,
option_name="--tables",
)
normalized_model_types = _normalize_multi_value_option(
model_types,
valid_values=_SUPPORTED_MODEL_TYPE_CHOICES,
option_name="--model-types",
)
selected_model_types = (
tuple(ModelType.value_of(model_type) for model_type in normalized_model_types)
if normalized_model_types
else (
ModelType.LLM,
ModelType.TEXT_EMBEDDING,
ModelType.RERANK,
)
)
tenant_ids = load_tenant_ids_from_file(tenant_id_file) if tenant_id_file else None
output_context: AbstractContextManager[io.TextIOBase]
if output is None:
output_context = nullcontext(cast(io.TextIOBase, sys.stdout))
else:
try:
output_context = output.open("w", encoding="utf-8")
except OSError as exc:
raise click.ClickException(f"failed to open output file '{output}': {exc.strerror or exc}") from exc
with output_context as output_stream:
LegacyModelTypeMigrationService(
engine=db.engine,
apply=apply,
concurrency=concurrency,
output=cast(io.TextIOBase, output_stream),
tables=normalized_tables or None,
model_types=selected_model_types,
tenant_ids=tenant_ids,
).migrate()
data_migrate.add_command(legacy_model_types)

View File

@ -68,7 +68,6 @@ from .app import (
workflow_app_log,
workflow_comment,
workflow_draft_variable,
workflow_node_output_inspector,
workflow_run,
workflow_statistic,
workflow_trigger,
@ -219,7 +218,6 @@ __all__ = [
"workflow_app_log",
"workflow_comment",
"workflow_draft_variable",
"workflow_node_output_inspector",
"workflow_run",
"workflow_statistic",
"workflow_trigger",

View File

@ -1,415 +0,0 @@
"""Console REST endpoints for the Node Output Inspector (Stage 4 §8 / §10.3).
PRD §Node Output Inspector replaces the consumer-organized Variable Inspector
with a producer-organized view of each node's declared outputs and their
per-run status. This module exposes two parallel sets of three read-only
endpoints — one for ``/workflows/draft/runs/...`` (Composer test runs) and one
for ``/workflows/published/runs/...`` (real App API / webapp / webhook /
schedule / plugin triggers). Both sets share the same service code, the same
response shapes, and the same error codes; the URL is the *only* difference,
so the frontend can pick the right prefix based on which run-detail page the
user is on.
Decision D-1 (published Inspector deferred) was lifted 2026-05-26 — the
``published_run_inspector_not_implemented`` 404 code is therefore no longer
produced.
URLs follow the design doc and reuse the existing
``/apps/<uuid:app_id>/workflows/draft/...`` prefix from
:mod:`controllers.console.app.workflow_draft_variable`. The
``published`` prefix mirrors it shape-for-shape.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Iterator
from uuid import UUID
from flask import Response
from flask_restx import Resource
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from libs.exception import BaseHTTPException
from libs.login import login_required
from models import App, AppMode
from services.workflow import inspector_events
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputInspectorService,
)
logger = logging.getLogger(__name__)
# Heartbeat cadence — every N empty subscribe ticks emit a SSE comment so
# intervening proxies (nginx, ingress) don't reap the idle connection.
# ``inspector_events.subscribe`` ticks at 1s, so 15 → 15s heartbeat.
_HEARTBEAT_EVERY_TICKS = 15
# Hard ceiling on a single stream — if we never see a terminal workflow
# event (engine crashed, redis dropped the message), force-close after this
# many ticks (= seconds).
_STREAM_HARD_TIMEOUT_TICKS = 1800 # 30 min
def _service() -> NodeOutputInspectorService:
"""One-line factory so tests can monkeypatch a stub if needed."""
return NodeOutputInspectorService()
def _serve_snapshot(app_model: App, run_id: UUID) -> dict:
"""Resource-body shared by draft + published snapshot endpoints.
Pulled out so the 6 REST routes don't duplicate the same 6-line try/except
+ ``model_dump`` ritual — the routes shrink to one-liners and the actual
behaviour lives here, where unit tests can hit it without spinning up
Flask request context.
"""
try:
snapshot = _service().snapshot_workflow_run(app_model=app_model, workflow_run_id=str(run_id))
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
return snapshot.model_dump(mode="json")
def _serve_node_detail(app_model: App, run_id: UUID, node_id: str) -> dict:
"""Resource-body shared by draft + published node-detail endpoints."""
try:
view = _service().node_detail(
app_model=app_model,
workflow_run_id=str(run_id),
node_id=node_id,
)
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
return view.model_dump(mode="json")
def _serve_output_preview(app_model: App, run_id: UUID, node_id: str, output_name: str) -> dict:
"""Resource-body shared by draft + published output-preview endpoints."""
try:
preview = _service().output_preview(
app_model=app_model,
workflow_run_id=str(run_id),
node_id=node_id,
output_name=output_name,
)
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
return preview.model_dump(mode="json")
class _InspectorNotFound(BaseHTTPException):
"""404 that preserves the inspector's specific error code.
Without this the response body collapses to a generic ``not_found`` code
and clients lose the ability to distinguish, e.g.,
``workflow_run_not_found`` from ``published_run_inspector_not_implemented``.
"""
code = 404
def __init__(self, error: NodeOutputInspectorError) -> None:
self.error_code = error.code
super().__init__(description=str(error))
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs")
class WorkflowDraftRunNodeOutputsApi(Resource):
"""Whole-run snapshot organized by producer node."""
@console_ns.doc("get_workflow_draft_run_node_outputs")
@console_ns.doc(description="Snapshot of every node's declared outputs for a draft workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return _serve_snapshot(app_model, run_id)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/<string:node_id>")
class WorkflowDraftRunNodeOutputDetailApi(Resource):
"""One node's declared outputs + per-output status."""
@console_ns.doc("get_workflow_draft_run_node_output_detail")
@console_ns.doc(description="One node's declared outputs for a draft workflow run.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
}
)
@console_ns.response(404, "Workflow run / node not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str):
return _serve_node_detail(app_model, run_id, node_id)
@console_ns.route(
"/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/<string:node_id>/<string:output_name>/preview"
)
class WorkflowDraftRunNodeOutputPreviewApi(Resource):
"""Full value for one declared output (with signed URL for file refs)."""
@console_ns.doc("get_workflow_draft_run_node_output_preview")
@console_ns.doc(description="Full value for one declared output, including signed download URL for files.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
"output_name": "Declared output name as exposed by Composer",
}
)
@console_ns.response(404, "Workflow run / node / output not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str, output_name: str):
return _serve_output_preview(app_model, run_id, node_id, output_name)
# ──────────────────────────────────────────────────────────────────────────────
# SSE event stream — shared generator used by draft + published variants
# ──────────────────────────────────────────────────────────────────────────────
def _sse_envelope(event: str, data: dict | str, event_id: int) -> str:
"""Format one SSE record per D-5 ``{event, data, id}`` envelope.
``data`` is JSON-serialized when given as a dict; raw strings are
forwarded unchanged so we can also emit ``:keepalive`` comment lines.
"""
payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False)
return f"event: {event}\nid: {event_id}\ndata: {payload}\n\n"
def _stream_inspector_events(app_model: App, run_id: UUID) -> Iterator[str]:
"""Yield SSE-framed strings for one workflow run.
The stream begins with a full ``snapshot`` event so the client has a
starting state without needing a separate REST GET. Then for every
``node_changed`` message from the pub/sub channel we re-read that node
from DB and push a fresh ``node_changed`` event. When the workflow run
reaches a terminal state we push one final ``workflow_run_completed``
event and close the stream.
Failures inside the loop are caught and surfaced as ``error`` events so
the frontend can show a banner rather than seeing the connection drop
silently. The Inspector never raises across the SSE boundary.
"""
service = _service()
run_id_str = str(run_id)
# Initial snapshot — also flushes a 404 back at the client right away
# if the run is gone (raised before yielding any bytes, so Flask turns it
# into the normal HTTP 404 path).
try:
snapshot = service.snapshot_workflow_run(app_model=app_model, workflow_run_id=run_id_str)
except NodeOutputInspectorError as error:
raise _InspectorNotFound(error) from error
event_id = 0
yield _sse_envelope("snapshot", snapshot.model_dump(mode="json"), event_id)
# If the run already finished by the time the client connected, emit
# the terminal envelope synchronously and close — no point subscribing.
# The enum value for partial success is the hyphenated ``partial-succeeded``
# (graphon.enums.WorkflowExecutionStatus), not ``partial_succeeded``.
if snapshot.workflow_run_status.value in {"succeeded", "failed", "stopped", "partial-succeeded"}:
event_id += 1
yield _sse_envelope(
"workflow_run_completed",
{"workflow_run_id": run_id_str, "workflow_run_status": snapshot.workflow_run_status.value},
event_id,
)
return
# Live subscription
ticks_since_heartbeat = 0
total_ticks = 0
for message in inspector_events.subscribe(run_id_str, timeout_seconds=1.0):
total_ticks += 1
if total_ticks > _STREAM_HARD_TIMEOUT_TICKS:
logger.warning(
"Inspector SSE: forcing close after %ds without terminal event for run %s",
_STREAM_HARD_TIMEOUT_TICKS,
run_id_str,
)
return
# Heartbeat sentinel — ``inspector_events.subscribe`` synthesizes a
# ``node_changed`` message with both fields ``None`` on every redis
# timeout. Real ``workflow_completed`` messages keep their kind even
# when status couldn't be resolved (publisher race), so checking kind
# first makes the heartbeat branch safe.
if message.kind == "node_changed" and message.node_id is None and message.status is None:
ticks_since_heartbeat += 1
if ticks_since_heartbeat >= _HEARTBEAT_EVERY_TICKS:
yield ":keepalive\n\n"
ticks_since_heartbeat = 0
continue
ticks_since_heartbeat = 0
if message.kind == "workflow_completed":
event_id += 1
yield _sse_envelope(
"workflow_run_completed",
{"workflow_run_id": run_id_str, "workflow_run_status": message.status or "unknown"},
event_id,
)
return
# node_changed: recompute the node slice from DB
if not message.node_id:
continue
try:
node_view = service.node_detail(
app_model=app_model,
workflow_run_id=run_id_str,
node_id=message.node_id,
)
except NodeOutputInspectorError:
# Node may not appear in the graph yet (race with persistence); skip.
continue
except Exception:
logger.warning(
"Inspector SSE: node_detail failed for run %s node %s",
run_id_str,
message.node_id,
exc_info=True,
)
event_id += 1
yield _sse_envelope(
"error",
{"node_id": message.node_id, "message": "failed to refresh node detail"},
event_id,
)
continue
event_id += 1
yield _sse_envelope("node_changed", node_view.model_dump(mode="json"), event_id)
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/events")
class WorkflowDraftRunNodeOutputEventsApi(Resource):
"""SSE stream of inspector deltas for a draft run."""
@console_ns.doc("stream_workflow_draft_run_node_output_events")
@console_ns.doc(description="Server-Sent Events stream of inspector deltas for a draft workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return Response(
_stream_inspector_events(app_model, run_id),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
# ──────────────────────────────────────────────────────────────────────────────
# Published-run endpoints — symmetric to the draft trio above
# ──────────────────────────────────────────────────────────────────────────────
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs")
class WorkflowPublishedRunNodeOutputsApi(Resource):
"""Whole-run snapshot for a *published* workflow run.
Same response shape as the ``/draft/`` variant — frontend can multiplex
based on which page (Composer test-run vs. Run History) is mounted.
"""
@console_ns.doc("get_workflow_published_run_node_outputs")
@console_ns.doc(description="Snapshot of every node's declared outputs for a published workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return _serve_snapshot(app_model, run_id)
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs/<string:node_id>")
class WorkflowPublishedRunNodeOutputDetailApi(Resource):
"""One node's declared outputs + per-output status (published run)."""
@console_ns.doc("get_workflow_published_run_node_output_detail")
@console_ns.doc(description="One node's declared outputs for a published workflow run.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
}
)
@console_ns.response(404, "Workflow run / node not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str):
return _serve_node_detail(app_model, run_id, node_id)
@console_ns.route(
"/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>"
"/node-outputs/<string:node_id>/<string:output_name>/preview"
)
class WorkflowPublishedRunNodeOutputPreviewApi(Resource):
"""Full value for one declared output of a published run."""
@console_ns.doc("get_workflow_published_run_node_output_preview")
@console_ns.doc(description="Full value for one declared output of a published run.")
@console_ns.doc(
params={
"app_id": "Application ID",
"run_id": "Workflow run ID",
"node_id": "Node ID inside the workflow graph",
"output_name": "Declared output name as exposed by Composer",
}
)
@console_ns.response(404, "Workflow run / node / output not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID, node_id: str, output_name: str):
return _serve_output_preview(app_model, run_id, node_id, output_name)
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs/events")
class WorkflowPublishedRunNodeOutputEventsApi(Resource):
"""SSE stream of inspector deltas for a published run."""
@console_ns.doc("stream_workflow_published_run_node_output_events")
@console_ns.doc(description="Server-Sent Events stream of inspector deltas for a published workflow run.")
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
@console_ns.response(404, "Workflow run not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, run_id: UUID):
return Response(
_stream_inspector_events(app_model, run_id),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)

View File

@ -21,14 +21,13 @@ from controllers.console.explore.error import (
NotCompletionAppError,
)
from controllers.console.explore.wraps import InstalledAppResource
from controllers.console.wraps import with_current_user
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from fields.conversation_fields import ResultResponse
from fields.message_fields import MessageInfiniteScrollPagination, MessageListItem, SuggestedQuestionsResponse
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from models import Account
from libs.login import current_account_with_tenant
from models.enums import FeedbackRating
from models.model import AppMode
from services.app_generate_service import AppGenerateService
@ -60,8 +59,8 @@ register_response_schema_models(console_ns, ResultResponse, SuggestedQuestionsRe
)
class MessageListApi(InstalledAppResource):
@console_ns.expect(console_ns.models[MessageListQuery.__name__])
@with_current_user
def get(self, current_user: Account, installed_app):
def get(self, installed_app):
current_user, _ = current_account_with_tenant()
app_model = installed_app.app
app_mode = AppMode.value_of(app_model.mode)
@ -97,8 +96,8 @@ class MessageListApi(InstalledAppResource):
class MessageFeedbackApi(InstalledAppResource):
@console_ns.expect(console_ns.models[MessageFeedbackPayload.__name__])
@console_ns.response(200, "Feedback submitted successfully", console_ns.models[ResultResponse.__name__])
@with_current_user
def post(self, current_user: Account, installed_app, message_id: UUID):
def post(self, installed_app, message_id: UUID):
current_user, _ = current_account_with_tenant()
app_model = installed_app.app
message_id_str = str(message_id)
@ -125,8 +124,8 @@ class MessageFeedbackApi(InstalledAppResource):
)
class MessageMoreLikeThisApi(InstalledAppResource):
@console_ns.expect(console_ns.models[MoreLikeThisQuery.__name__])
@with_current_user
def get(self, current_user: Account, installed_app, message_id: UUID):
def get(self, installed_app, message_id: UUID):
current_user, _ = current_account_with_tenant()
app_model = installed_app.app
if app_model.mode != "completion":
raise NotCompletionAppError()
@ -171,8 +170,8 @@ class MessageMoreLikeThisApi(InstalledAppResource):
)
class MessageSuggestedQuestionApi(InstalledAppResource):
@console_ns.response(200, "Success", console_ns.models[SuggestedQuestionsResponse.__name__])
@with_current_user
def get(self, current_user: Account, installed_app, message_id: UUID):
def get(self, installed_app, message_id: UUID):
current_user, _ = current_account_with_tenant()
app_model = installed_app.app
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:

View File

@ -9,14 +9,14 @@ from pydantic import BaseModel, Field, TypeAdapter, field_validator
from constants import HIDDEN_VALUE
from fields.base import ResponseModel
from libs.helper import to_timestamp
from libs.login import login_required
from libs.login import current_account_with_tenant, login_required
from models.api_based_extension import APIBasedExtension
from services.api_based_extension_service import APIBasedExtensionService
from services.code_based_extension_service import CodeBasedExtensionService
from ..common.schema import DEFAULT_REF_TEMPLATE_SWAGGER_2_0, register_schema_models
from . import console_ns
from .wraps import account_initialization_required, setup_required, with_current_tenant_id
from .wraps import account_initialization_required, setup_required
class CodeBasedExtensionQuery(BaseModel):
@ -116,11 +116,11 @@ class APIBasedExtensionAPI(Resource):
@setup_required
@login_required
@account_initialization_required
@with_current_tenant_id
def get(self, current_tenant_id: str):
def get(self):
_, tenant_id = current_account_with_tenant()
return [
_serialize_api_based_extension(extension)
for extension in APIBasedExtensionService.get_all_by_tenant_id(current_tenant_id)
for extension in APIBasedExtensionService.get_all_by_tenant_id(tenant_id)
]
@console_ns.doc("create_api_based_extension")
@ -130,9 +130,9 @@ class APIBasedExtensionAPI(Resource):
@setup_required
@login_required
@account_initialization_required
@with_current_tenant_id
def post(self, current_tenant_id: str):
def post(self):
payload = APIBasedExtensionPayload.model_validate(console_ns.payload or {})
_, current_tenant_id = current_account_with_tenant()
extension_data = APIBasedExtension(
tenant_id=current_tenant_id,
@ -153,12 +153,12 @@ class APIBasedExtensionDetailAPI(Resource):
@setup_required
@login_required
@account_initialization_required
@with_current_tenant_id
def get(self, current_tenant_id: str, id: UUID):
def get(self, id: UUID):
api_based_extension_id = str(id)
_, tenant_id = current_account_with_tenant()
return _serialize_api_based_extension(
APIBasedExtensionService.get_with_tenant_id(current_tenant_id, api_based_extension_id)
APIBasedExtensionService.get_with_tenant_id(tenant_id, api_based_extension_id)
)
@console_ns.doc("update_api_based_extension")
@ -169,9 +169,9 @@ class APIBasedExtensionDetailAPI(Resource):
@setup_required
@login_required
@account_initialization_required
@with_current_tenant_id
def post(self, current_tenant_id: str, id: UUID):
def post(self, id: UUID):
api_based_extension_id = str(id)
_, current_tenant_id = current_account_with_tenant()
extension_data_from_db = APIBasedExtensionService.get_with_tenant_id(current_tenant_id, api_based_extension_id)
@ -197,9 +197,9 @@ class APIBasedExtensionDetailAPI(Resource):
@setup_required
@login_required
@account_initialization_required
@with_current_tenant_id
def delete(self, current_tenant_id: str, id: UUID):
def delete(self, id: UUID):
api_based_extension_id = str(id)
_, current_tenant_id = current_account_with_tenant()
extension_data_from_db = APIBasedExtensionService.get_with_tenant_id(current_tenant_id, api_based_extension_id)

View File

@ -2,11 +2,11 @@ from flask_restx import Resource
from werkzeug.exceptions import Unauthorized
from controllers.common.schema import register_response_schema_models
from libs.login import current_user, login_required
from libs.login import current_account_with_tenant, current_user, login_required
from services.feature_service import FeatureModel, FeatureService, LimitationModel, SystemFeatureModel
from . import console_ns
from .wraps import account_initialization_required, cloud_utm_record, setup_required, with_current_tenant_id
from .wraps import account_initialization_required, cloud_utm_record, setup_required
register_response_schema_models(console_ns, FeatureModel, LimitationModel, SystemFeatureModel)
@ -24,9 +24,10 @@ class FeatureApi(Resource):
@login_required
@account_initialization_required
@cloud_utm_record
@with_current_tenant_id
def get(self, current_tenant_id: str):
def get(self):
"""Get feature configuration for current tenant"""
_, current_tenant_id = current_account_with_tenant()
payload = FeatureService.get_features(
current_tenant_id,
exclude_vector_space=True,
@ -48,9 +49,10 @@ class FeatureVectorSpaceApi(Resource):
@login_required
@account_initialization_required
@cloud_utm_record
@with_current_tenant_id
def get(self, current_tenant_id: str):
def get(self):
"""Get vector-space usage and limit for current tenant"""
_, current_tenant_id = current_account_with_tenant()
return FeatureService.get_vector_space(current_tenant_id).model_dump()

View File

@ -22,13 +22,10 @@ from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
setup_required,
with_current_tenant_id,
with_current_user,
)
from extensions.ext_database import db
from fields.file_fields import FileResponse, UploadConfig
from libs.login import login_required
from models.account import Account
from libs.login import current_account_with_tenant, login_required
from services.file_service import FileService
from . import console_ns
@ -65,8 +62,8 @@ class FileApi(Resource):
@account_initialization_required
@cloud_edition_billing_resource_check("documents")
@console_ns.response(201, "File uploaded successfully", console_ns.models[FileResponse.__name__])
@with_current_user
def post(self, current_user: Account):
def post(self):
current_user, _ = current_account_with_tenant()
source_str = request.form.get("source")
source: Literal["datasets"] | None = "datasets" if source_str == "datasets" else None
@ -110,10 +107,10 @@ class FilePreviewApi(Resource):
@login_required
@account_initialization_required
@console_ns.response(200, "Success", console_ns.models[TextContentResponse.__name__])
@with_current_tenant_id
def get(self, current_tenant_id: str, file_id: UUID):
def get(self, file_id: UUID):
file_id_str = str(file_id)
text = FileService(db.engine).get_file_preview(file_id_str, current_tenant_id)
_, tenant_id = current_account_with_tenant()
text = FileService(db.engine).get_file_preview(file_id_str, tenant_id)
return {"content": text}

View File

@ -12,13 +12,11 @@ from controllers.common.errors import (
)
from controllers.common.schema import register_response_schema_models, register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import with_current_user
from core.helper import ssrf_proxy
from extensions.ext_database import db
from fields.file_fields import FileWithSignedUrl, RemoteFileInfo
from graphon.file import helpers as file_helpers
from libs.login import login_required
from models.account import Account
from libs.login import current_account_with_tenant, login_required
from services.file_service import FileService
@ -51,8 +49,7 @@ class RemoteFileUpload(Resource):
@console_ns.expect(console_ns.models[RemoteFileUploadPayload.__name__])
@console_ns.response(201, "File uploaded successfully", console_ns.models[FileWithSignedUrl.__name__])
@login_required
@with_current_user
def post(self, current_user: Account):
def post(self):
payload = RemoteFileUploadPayload.model_validate(console_ns.payload)
url = payload.url
@ -77,11 +74,12 @@ class RemoteFileUpload(Resource):
content = resp.content if resp.request.method == "GET" else ssrf_proxy.get(url).content
try:
user, _ = current_account_with_tenant()
upload_file = FileService(db.engine).upload_file(
filename=file_info.filename,
content=content,
mimetype=file_info.mimetype,
user=current_user,
user=user,
source_url=url,
)
except services.errors.file.FileTooLargeError as file_too_large_error:

View File

@ -47,12 +47,6 @@ from graphon.graph_events import (
)
from graphon.node_events import NodeRunResult
from libs.datetime_utils import naive_utc_now
from services.workflow.inspector_events import (
publish_node_changed as _inspector_publish_node_changed,
)
from services.workflow.inspector_events import (
publish_workflow_completed as _inspector_publish_workflow_completed,
)
@dataclass(slots=True)
@ -169,7 +163,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_partial_succeeded(self, event: GraphRunPartialSucceededEvent) -> None:
execution = self._get_workflow_execution()
@ -180,7 +173,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_failed(self, event: GraphRunFailedEvent) -> None:
execution = self._get_workflow_execution()
@ -192,7 +184,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._fail_running_node_executions(error_message=event.error)
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_aborted(self, event: GraphRunAbortedEvent) -> None:
execution = self._get_workflow_execution()
@ -203,7 +194,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._fail_running_node_executions(error_message=execution.error_message or "")
self._workflow_execution_repository.save(execution)
self._enqueue_trace_task(execution)
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
def _handle_graph_run_paused(self, event: GraphRunPausedEvent) -> None:
execution = self._get_workflow_execution()
@ -251,7 +241,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
created_at=event.start_at,
)
self._node_snapshots[event.id] = snapshot
_inspector_publish_node_changed(workflow_run_id=execution.id_, node_id=event.node_id, status="running")
def _handle_node_retry(self, event: NodeRunRetryEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -259,11 +248,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
domain_execution.error = event.error
self._workflow_node_execution_repository.save(domain_execution)
self._workflow_node_execution_repository.save_execution_data(domain_execution)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="retry",
)
def _handle_node_succeeded(self, event: NodeRunSucceededEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -273,11 +257,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
WorkflowNodeExecutionStatus.SUCCEEDED,
finished_at=event.finished_at,
)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="succeeded",
)
def _handle_node_failed(self, event: NodeRunFailedEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -288,11 +267,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
error=event.error,
finished_at=event.finished_at,
)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="failed",
)
def _handle_node_exception(self, event: NodeRunExceptionEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -303,11 +277,6 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
error=event.error,
finished_at=event.finished_at,
)
_inspector_publish_node_changed(
workflow_run_id=self._get_workflow_execution().id_,
node_id=domain_execution.node_id,
status="exception",
)
def _handle_node_pause_requested(self, event: NodeRunPauseRequestedEvent) -> None:
domain_execution = self._get_node_execution(event.id)

View File

@ -1,268 +0,0 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, Protocol, cast
from dify_agent.layers.dify_plugin import (
DifyPluginCredentialValue,
DifyPluginToolConfig,
DifyPluginToolCredentialType,
DifyPluginToolParameter,
DifyPluginToolParameterForm,
DifyPluginToolsLayerConfig,
)
from core.agent.entities import AgentToolEntity
from core.app.entities.app_invoke_entities import InvokeFrom
from core.tools.__base.tool import Tool
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.errors import (
ToolProviderCredentialValidationError,
ToolProviderNotFoundError,
)
from core.tools.tool_manager import ToolManager
from models.agent_config_entities import AgentSoulDifyToolConfig, AgentSoulToolsConfig
from models.provider_ids import ToolProviderID
class WorkflowAgentPluginToolsBuildError(ValueError):
"""Raised when Agent Soul tools cannot be prepared for Agent backend."""
def __init__(self, error_code: str, message: str) -> None:
self.error_code = error_code
super().__init__(message)
class AgentToolRuntimeProvider(Protocol):
def get_agent_tool_runtime(
self,
tenant_id: str,
app_id: str,
agent_tool: AgentToolEntity,
user_id: str | None = None,
invoke_from: InvokeFrom = InvokeFrom.DEBUGGER,
variable_pool: Any | None = None,
) -> Tool: ...
class WorkflowAgentPluginToolsBuilder:
"""Prepare Agent Soul Dify Plugin Tools for the public Agent backend DTO."""
def __init__(self, *, tool_runtime_provider: AgentToolRuntimeProvider | None = None) -> None:
self._tool_runtime_provider = tool_runtime_provider or ToolManager
def build(
self,
*,
tenant_id: str,
app_id: str,
user_id: str | None,
tools: AgentSoulToolsConfig,
invoke_from: InvokeFrom,
) -> DifyPluginToolsLayerConfig | None:
"""Resolve user-selected Dify Plugin Tools into the Agent backend DTO.
``invoke_from`` is the *real* runtime caller category (DEBUGGER for a
Composer test run, SERVICE_API / WEB_APP for a published run). It must
be threaded through to :class:`ToolManager` so credential quotas, rate
limits, and audit tags match the actual call site.
"""
enabled_tools = [tool for tool in tools.dify_tools if tool.enabled]
if not enabled_tools:
return None
prepared: list[DifyPluginToolConfig] = []
seen_names: set[str] = set()
for tool_config in enabled_tools:
agent_tool = self._to_agent_tool_entity(tool_config)
tool_runtime = self._fetch_tool_runtime(
tenant_id=tenant_id,
app_id=app_id,
user_id=user_id,
agent_tool=agent_tool,
invoke_from=invoke_from,
tool_config=tool_config,
)
exposed_name = self._exposed_tool_name(tool_config)
if exposed_name in seen_names:
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_name_duplicated",
f"Duplicate Dify Plugin Tool name {exposed_name!r}.",
)
seen_names.add(exposed_name)
prepared.append(self._to_backend_tool_config(tool_config, tool_runtime, exposed_name))
return DifyPluginToolsLayerConfig(tools=prepared)
def _fetch_tool_runtime(
self,
*,
tenant_id: str,
app_id: str,
user_id: str | None,
agent_tool: AgentToolEntity,
invoke_from: InvokeFrom,
tool_config: AgentSoulDifyToolConfig,
) -> Tool:
"""Resolve the API-side ``Tool`` runtime, mapping fetch errors to
Inspector-friendly error codes so callers can render distinct UX for
"tool definition gone" vs "credential failed".
"""
try:
return self._tool_runtime_provider.get_agent_tool_runtime(
tenant_id=tenant_id,
app_id=app_id,
agent_tool=agent_tool,
user_id=user_id,
invoke_from=invoke_from,
variable_pool=None,
)
except ToolProviderNotFoundError as exc:
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_declaration_not_found",
f"Dify Plugin Tool {tool_config.tool_name!r} declaration not found: {exc}",
) from exc
except ToolProviderCredentialValidationError as exc:
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_credential_invalid",
f"Dify Plugin Tool {tool_config.tool_name!r} credential validation failed: {exc}",
) from exc
except ValueError as exc:
# ToolManager raises bare ValueError when the agent tool's
# ``runtime`` / runtime parameters are missing. Surface it under a
# narrower error code than a generic "declaration not found" so
# frontend can render an actionable hint.
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_config_invalid",
f"Dify Plugin Tool {tool_config.tool_name!r} runtime construction failed: {exc}",
) from exc
@staticmethod
def _to_agent_tool_entity(tool_config: AgentSoulDifyToolConfig) -> AgentToolEntity:
return AgentToolEntity(
provider_type=ToolProviderType.value_of(tool_config.provider_type),
provider_id=WorkflowAgentPluginToolsBuilder._provider_id(tool_config),
tool_name=tool_config.tool_name,
tool_parameters=dict(tool_config.runtime_parameters),
credential_id=tool_config.credential_ref.id if tool_config.credential_ref else None,
)
@staticmethod
def _provider_id(tool_config: AgentSoulDifyToolConfig) -> str:
if tool_config.provider_id:
return tool_config.provider_id
assert tool_config.plugin_id is not None
assert tool_config.provider is not None
return f"{tool_config.plugin_id}/{tool_config.provider}"
@staticmethod
def _exposed_tool_name(tool_config: AgentSoulDifyToolConfig) -> str:
# Stage 3.1 decision: no user rename yet. Keep the model-visible tool
# name aligned with the plugin declaration identity.
return tool_config.tool_name
def _to_backend_tool_config(
self,
tool_config: AgentSoulDifyToolConfig,
tool_runtime: Tool,
exposed_name: str,
) -> DifyPluginToolConfig:
runtime = tool_runtime.runtime
if runtime is None:
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_config_invalid",
f"Dify Plugin Tool {tool_config.tool_name!r} has no runtime.",
)
provider_id = self._provider_id(tool_config)
plugin_id, provider = self._plugin_provider(tool_config, provider_id)
parameters = [
DifyPluginToolParameter.model_validate(parameter.model_dump(mode="json"))
for parameter in tool_runtime.get_merged_runtime_parameters()
]
runtime_parameters = self._runtime_parameters(tool_runtime, parameters)
description = tool_config.description
if description is None and tool_runtime.entity.description is not None:
description = tool_runtime.entity.description.llm
return DifyPluginToolConfig(
plugin_id=plugin_id,
provider=provider,
tool_name=tool_config.tool_name,
credential_type=self._credential_type(tool_config, runtime.credentials),
name=exposed_name,
description=description,
credentials=self._normalize_credentials(runtime.credentials, tool_name=tool_config.tool_name),
runtime_parameters=runtime_parameters,
parameters=parameters,
parameters_json_schema=cast(dict[str, Any], tool_runtime.get_llm_parameters_json_schema()),
)
@staticmethod
def _plugin_provider(tool_config: AgentSoulDifyToolConfig, provider_id: str) -> tuple[str, str]:
if tool_config.plugin_id and tool_config.provider:
return tool_config.plugin_id, tool_config.provider
provider_id_entity = ToolProviderID(provider_id)
return provider_id_entity.plugin_id, provider_id_entity.provider_name
@staticmethod
def _credential_type(
tool_config: AgentSoulDifyToolConfig,
credentials: Mapping[str, Any],
) -> DifyPluginToolCredentialType:
if not credentials and tool_config.credential_type == "unauthorized":
return "unauthorized"
return tool_config.credential_type
@staticmethod
def _runtime_parameters(
tool_runtime: Tool,
parameters: list[DifyPluginToolParameter],
) -> dict[str, Any]:
runtime = tool_runtime.runtime
runtime_parameters = dict(runtime.runtime_parameters if runtime is not None else {})
missing = [
parameter.name
for parameter in parameters
if parameter.form is not DifyPluginToolParameterForm.LLM
and parameter.required
and parameter.default is None
and parameter.name not in runtime_parameters
]
if missing:
names = ", ".join(sorted(missing))
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_runtime_parameter_missing",
f"Dify Plugin Tool {tool_runtime.entity.identity.name!r} is missing runtime parameters: {names}.",
)
return runtime_parameters
@staticmethod
def _normalize_credentials(
credentials: Mapping[str, Any],
*,
tool_name: str,
) -> dict[str, DifyPluginCredentialValue]:
"""Forward only scalar credential values to the Agent backend.
``DifyPluginCredentialValue`` is ``str | int | float | bool | None``.
Refusing non-scalar values (lists, dicts, custom objects) is safer than
``str(value)`` — stringifying a nested OAuth token blob produces a
Python ``repr`` that the plugin daemon cannot use, and we'd rather
surface a clear ``agent_tool_credential_shape_invalid`` than send junk.
"""
normalized: dict[str, DifyPluginCredentialValue] = {}
for key, value in credentials.items():
if isinstance(value, str | int | float | bool) or value is None:
normalized[key] = value
continue
raise WorkflowAgentPluginToolsBuildError(
"agent_tool_credential_shape_invalid",
(
f"Dify Plugin Tool {tool_name!r} credential {key!r} has a non-scalar value "
f"({type(value).__name__}); only str/int/float/bool/None are forwarded to the daemon."
),
)
return normalized

View File

@ -11,14 +11,13 @@ SUPPORTED_AGENT_BACKEND_FEATURES = frozenset(
"workflow_context",
"model",
"structured_output",
"tools.dify_tools",
}
)
RESERVED_AGENT_BACKEND_FEATURES = frozenset(
{
"skills_files",
"tools.cli_tools",
"tools",
"knowledge",
"human",
"env",
@ -33,7 +32,7 @@ def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any
warnings: list[dict[str, str]] = []
soul_dump = agent_soul.model_dump(mode="json")
for section in sorted(RESERVED_AGENT_BACKEND_FEATURES):
value = _get_nested(soul_dump, section)
value = soul_dump.get(section)
has_value = bool(value)
if isinstance(value, dict):
has_value = any(bool(item) for item in value.values())
@ -42,12 +41,11 @@ def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any
{
"section": f"agent_soul.{section}",
"code": "agent_backend_layer_not_available",
"message": f"{section} is saved in Agent Soul but is not executed by Agent backend.",
"message": f"{section} is saved in Agent Soul but is not executed by Agent backend in phase 3.",
}
)
reserved_status = dict.fromkeys(sorted(RESERVED_AGENT_BACKEND_FEATURES), "reserved_not_executed")
reserved_status["tools.dify_tools"] = "supported_when_config_valid"
return {
"supported": sorted(SUPPORTED_AGENT_BACKEND_FEATURES),
@ -55,12 +53,3 @@ def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any
"reserved_status": reserved_status,
"unsupported_runtime_warnings": warnings,
}
def _get_nested(value: dict[str, Any], path: str) -> Any:
current: Any = value
for part in path.split("."):
if not isinstance(current, dict):
return None
current = current.get(part)
return current

View File

@ -30,7 +30,6 @@ from models.agent_config_entities import (
)
from .output_failure_orchestrator import retry_idempotency_key
from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError
from .runtime_feature_manifest import build_runtime_feature_manifest
@ -85,11 +84,9 @@ class WorkflowAgentRuntimeRequestBuilder:
*,
credentials_provider: CredentialsProvider,
request_builder: AgentBackendRunRequestBuilder | None = None,
plugin_tools_builder: WorkflowAgentPluginToolsBuilder | None = None,
) -> None:
self._credentials_provider = credentials_provider
self._request_builder = request_builder or AgentBackendRunRequestBuilder()
self._plugin_tools_builder = plugin_tools_builder or WorkflowAgentPluginToolsBuilder()
def build(self, context: WorkflowAgentRuntimeBuildContext) -> WorkflowAgentRuntimeRequest:
agent_soul = AgentSoulConfig.model_validate(context.snapshot.config_snapshot_dict)
@ -105,26 +102,6 @@ class WorkflowAgentRuntimeRequestBuilder:
workflow_job_prompt = node_job.workflow_prompt.strip() or "Run this workflow Agent Node for the current run."
user_prompt = workflow_context_prompt.strip() or "Use the current workflow context."
credentials = self._credentials_provider.fetch(agent_soul.model.model_provider, agent_soul.model.model)
try:
tools_layer = self._plugin_tools_builder.build(
tenant_id=context.dify_context.tenant_id,
app_id=context.dify_context.app_id,
user_id=context.dify_context.user_id,
tools=agent_soul.tools,
# Thread the *real* runtime invocation source through to
# ToolManager so credential quotas, rate limits, and audit
# trails match the actual call site (DEBUGGER for draft test
# run, SERVICE_API / WEB_APP for published run).
invoke_from=context.dify_context.invoke_from,
)
except WorkflowAgentPluginToolsBuildError as error:
raise WorkflowAgentRuntimeRequestBuildError(error.error_code, str(error)) from error
if tools_layer is not None:
metadata["agent_tools"] = {
"dify_tool_count": len(tools_layer.tools),
"dify_tool_names": [tool.name or tool.tool_name for tool in tools_layer.tools],
"cli_tool_count": len(agent_soul.tools.cli_tools),
}
request = self._request_builder.build_for_workflow_node(
AgentBackendWorkflowNodeRunInput(
@ -157,7 +134,6 @@ class WorkflowAgentRuntimeRequestBuilder:
workflow_node_job_prompt=workflow_job_prompt,
user_prompt=user_prompt,
output=self._build_output_config(node_job.declared_outputs),
tools=tools_layer,
idempotency_key=self._idempotency_key(context),
metadata=metadata,
)

View File

@ -126,7 +126,6 @@ class WorkflowAgentNodeValidator:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} requires Agent Soul model config."
)
cls._validate_agent_soul_tools(binding=binding, agent_soul=agent_soul)
node_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict)
cls.validate_node_job(session=session, binding=binding, node_job=node_job, topology=topology)
@ -281,26 +280,6 @@ class WorkflowAgentNodeValidator:
f"Workflow Agent node {binding.node_id} references unsupported human contact channel {channel}."
)
@classmethod
def _validate_agent_soul_tools(
cls,
*,
binding: WorkflowAgentNodeBinding,
agent_soul: AgentSoulConfig,
) -> None:
exposed_names: set[str] = set()
for tool in agent_soul.tools.dify_tools:
if not tool.enabled:
continue
exposed_name = tool.tool_name
if exposed_name in exposed_names:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} has duplicate Dify Plugin Tool name {exposed_name}."
)
exposed_names.add(exposed_name)
# CLI tools remain saved-but-not-executed. They are allowed at publish
# time so existing Agent Soul drafts are not blocked by a reserved field.
@staticmethod
def _validate_file_ref(
*,

View File

@ -12,6 +12,7 @@ def init_app(app: DifyApp):
clear_orphaned_file_records,
convert_to_agent_apps,
create_tenant,
data_migrate,
delete_archived_workflow_runs,
export_app_messages,
extract_plugins,
@ -44,6 +45,7 @@ def init_app(app: DifyApp):
convert_to_agent_apps,
add_qdrant_index,
create_tenant,
data_migrate,
upgrade_db,
fix_app_site_missing,
migrate_data_for_plugin,

View File

@ -1,6 +1,6 @@
import re
from enum import StrEnum
from typing import Any, Final, Literal
from typing import Any, Final
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
@ -50,90 +50,8 @@ class AgentSoulSkillsFilesConfig(BaseModel):
skills: list[dict[str, Any]] = Field(default_factory=list)
class AgentSoulDifyToolCredentialRef(BaseModel):
"""Reference to a stored Dify Plugin Tool credential.
Secret values are resolved only at runtime. The legacy ``credential_id``
field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
old Agent tool payloads can be read while new payloads stay explicit.
"""
model_config = ConfigDict(extra="ignore")
type: Literal["provider", "tool"] = "tool"
id: str | None = Field(default=None, max_length=255)
provider: str | None = Field(default=None, max_length=255)
class AgentSoulDifyToolConfig(BaseModel):
"""One Dify Plugin Tool configured on Agent Soul.
The API backend prepares this persisted product shape into
``DifyPluginToolConfig`` before sending a run request to Agent backend.
``provider_id`` keeps compatibility with existing Agent tool config payloads;
new callers should send ``plugin_id`` + ``provider`` when available.
"""
# ``extra="ignore"`` (not ``"allow"``) so historical Agent Soul payloads
# with unknown fields still load — but the extra keys are dropped instead
# of silently riding along into ``model_dump``. New callers should send the
# explicit schema fields below.
model_config = ConfigDict(extra="ignore")
enabled: bool = True
# Dify Plugin Tools live behind the ``PLUGIN`` provider type. ``BUILT_IN`` /
# ``WORKFLOW`` / ``API`` providers are not exposed to the Agent backend in
# this layer — keep the default narrow so a missing field surfaces as
# ``agent_tool_declaration_not_found`` against the correct provider table.
provider_type: str = "plugin"
provider_id: str | None = Field(default=None, max_length=255)
plugin_id: str | None = Field(default=None, max_length=255)
provider: str | None = Field(default=None, max_length=255)
tool_name: str = Field(min_length=1, max_length=255)
credential_type: Literal["api-key", "oauth2", "unauthorized"] = "api-key"
credential_ref: AgentSoulDifyToolCredentialRef | None = None
# Reserved for a future user-rename UX. Accepted but currently rejected at
# validation time so frontend cannot silently believe a rename took effect
# (see :meth:`_validate_provider_and_credentials`).
name: str | None = Field(default=None, max_length=255)
description: str | None = None
runtime_parameters: dict[str, Any] = Field(default_factory=dict)
@model_validator(mode="before")
@classmethod
def _normalize_legacy_payload(cls, value: Any) -> Any:
if not isinstance(value, dict):
return value
normalized = dict(value)
if normalized.get("provider_id") is None and isinstance(normalized.get("provider_name"), str):
normalized["provider_id"] = normalized["provider_name"]
if normalized.get("runtime_parameters") is None and isinstance(normalized.get("tool_parameters"), dict):
normalized["runtime_parameters"] = normalized["tool_parameters"]
if normalized.get("credential_ref") is None and normalized.get("credential_id"):
normalized["credential_ref"] = {
"type": "tool",
"id": normalized.get("credential_id"),
"provider": normalized.get("provider_id") or normalized.get("provider"),
}
return normalized
@model_validator(mode="after")
def _validate_provider_and_credentials(self) -> "AgentSoulDifyToolConfig":
if not self.provider_id and not (self.plugin_id and self.provider):
raise ValueError("Dify tool requires provider_id or plugin_id + provider")
if self.credential_type != "unauthorized" and (self.credential_ref is None or not self.credential_ref.id):
raise ValueError("credential_ref.id is required for credentialed Dify tools")
# ``name`` is reserved for a future user-rename UX. Until that lands
# the model-visible name is forced to match ``tool_name``; reject
# explicit values so a frontend bug surfaces immediately instead of
# producing a silently-ignored override.
if self.name is not None and self.name != self.tool_name:
raise ValueError("name override is not yet supported; omit ``name`` or set it equal to ``tool_name``.")
return self
class AgentSoulToolsConfig(BaseModel):
dify_tools: list[AgentSoulDifyToolConfig] = Field(default_factory=list)
dify_tools: list[dict[str, Any]] = Field(default_factory=list)
cli_tools: list[dict[str, Any]] = Field(default_factory=list)

View File

@ -3447,89 +3447,6 @@ Run draft workflow
| 200 | Draft workflow run started successfully |
| 403 | Permission denied |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs
#### GET
##### Description
Snapshot of every node's declared outputs for a draft workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events
#### GET
##### Description
Server-Sent Events stream of inspector deltas for a draft workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}
#### GET
##### Description
One node's declared outputs for a draft workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node not found |
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview
#### GET
##### Description
Full value for one declared output, including signed download URL for files.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| output_name | path | Declared output name as exposed by Composer | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node / output not found |
### /apps/{app_id}/workflows/draft/system-variables
#### GET
@ -3767,89 +3684,6 @@ Publish workflow
| ---- | ----------- |
| 200 | Success |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs
#### GET
##### Description
Snapshot of every node's declared outputs for a published workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events
#### GET
##### Description
Server-Sent Events stream of inspector deltas for a published workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run not found |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}
#### GET
##### Description
One node's declared outputs for a published workflow run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node not found |
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview
#### GET
##### Description
Full value for one declared output of a published run.
##### Parameters
| Name | Located in | Description | Required | Schema |
| ---- | ---------- | ----------- | -------- | ------ |
| app_id | path | Application ID | Yes | string |
| node_id | path | Node ID inside the workflow graph | Yes | string |
| output_name | path | Declared output name as exposed by Composer | Yes | string |
| run_id | path | Workflow run ID | Yes | string |
##### Responses
| Code | Description |
| ---- | ----------- |
| 404 | Workflow run / node / output not found |
### /apps/{app_id}/workflows/triggers/webhook
#### GET
@ -10705,43 +10539,6 @@ Supported icon storage formats for Agent roster entries.
| skills_files | [AgentSoulSkillsFilesConfig](#agentsoulskillsfilesconfig) | | No |
| tools | [AgentSoulToolsConfig](#agentsoultoolsconfig) | | No |
#### AgentSoulDifyToolConfig
One Dify Plugin Tool configured on Agent Soul.
The API backend prepares this persisted product shape into
``DifyPluginToolConfig`` before sending a run request to Agent backend.
``provider_id`` keeps compatibility with existing Agent tool config payloads;
new callers should send ``plugin_id`` + ``provider`` when available.
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| credential_ref | [AgentSoulDifyToolCredentialRef](#agentsouldifytoolcredentialref) | | No |
| credential_type | string | *Enum:* `"api-key"`, `"oauth2"`, `"unauthorized"` | No |
| description | string | | No |
| enabled | boolean | | No |
| name | string | | No |
| plugin_id | string | | No |
| provider | string | | No |
| provider_id | string | | No |
| provider_type | string | | No |
| runtime_parameters | object | | No |
| tool_name | string | | Yes |
#### AgentSoulDifyToolCredentialRef
Reference to a stored Dify Plugin Tool credential.
Secret values are resolved only at runtime. The legacy ``credential_id``
field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
old Agent tool payloads can be read while new payloads stay explicit.
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| id | string | | No |
| provider | string | | No |
| type | string | *Enum:* `"provider"`, `"tool"` | No |
#### AgentSoulEnvConfig
| Name | Type | Description | Required |
@ -10819,7 +10616,7 @@ Reference to model credentials resolved only at runtime.
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| cli_tools | [ object ] | | No |
| dify_tools | [ [AgentSoulDifyToolConfig](#agentsouldifytoolconfig) ] | | No |
| dify_tools | [ object ] | | No |
#### AgentThought

View File

@ -6,7 +6,7 @@ requires-python = "~=3.12.0"
dependencies = [
# Legacy: mature and widely deployed
"bleach>=6.3.0,<7.0.0",
"boto3>=1.43.14,<2.0.0",
"boto3>=1.43.10,<2.0.0",
"celery>=5.6.3,<6.0.0",
"croniter>=6.2.2,<7.0.0",
"dify-agent",
@ -102,10 +102,7 @@ dify-trace-weave = { workspace = true }
[tool.uv]
default-groups = ["storage", "tools", "vdb-all", "trace-all"]
package = false
override-dependencies = [
"litellm>=1.83.10,<2.0.0",
"pyarrow>=23.0.1,<24.0.0",
]
override-dependencies = ["litellm>=1.83.10,<2.0.0", "pyarrow>=23.0.1,<24.0.0"]
[dependency-groups]

File diff suppressed because it is too large Load Diff

View File

@ -1,194 +0,0 @@
"""Inspector pub/sub fanout for live workflow run updates (Stage 4 §8.5).
The Node Output Inspector exposes a Server-Sent Events stream alongside its
three REST endpoints so the frontend can render per-output progress without
DB polling. This module owns the redis pub/sub channel that connects the two
sides:
* :func:`publish_node_changed` / :func:`publish_workflow_completed` —
invoked by :class:`core.app.workflow.layers.persistence.WorkflowPersistenceLayer`
at the very end of each handler, after the DB write has already
succeeded. Publish failures are swallowed so the engine never trips on a
flaky redis connection.
* :func:`subscribe` — async iterator the SSE endpoint consumes.
Channel layout
--------------
``dify:inspector:workflow_run:{workflow_run_id}``
One channel per workflow run; the SSE endpoint subscribes for the lifetime of
the run and unsubscribes on the terminal event. Multiple clients can attach
to the same run safely — redis pub/sub fans every message out to every
listener.
The message envelope intentionally carries only the *delta* needed to invalidate
a slice of the inspector view; the SSE handler re-reads the canonical
``WorkflowNodeExecutionModel`` row from the DB so we never serialize stale
state across the wire. This means messages stay tiny (~150 bytes) and the
inspector view stays consistent even if a publisher races persistence.
Decision D-5: the on-wire SSE envelope ``{event, data, id}`` is shared with
the babysit chat stream; this module only emits the *internal* pub/sub
message — the SSE controller turns it into the public envelope.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Iterator
from dataclasses import asdict, dataclass
from typing import Final, Literal
from extensions.ext_redis import redis_client
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# Channel naming
# ──────────────────────────────────────────────────────────────────────────────
_CHANNEL_PREFIX: Final = "dify:inspector:workflow_run"
def channel_for(workflow_run_id: str) -> str:
"""Return the pub/sub channel name for ``workflow_run_id``.
Kept as a module-level helper so tests can pin the channel without
reaching into the publish/subscribe code paths.
"""
return f"{_CHANNEL_PREFIX}:{workflow_run_id}"
# ──────────────────────────────────────────────────────────────────────────────
# Message envelope
# ──────────────────────────────────────────────────────────────────────────────
#: Tags discriminating the wire-level message kinds. Kept narrow so the SSE
#: controller can pattern-match exhaustively.
InspectorMessageKind = Literal["node_changed", "workflow_completed"]
@dataclass(frozen=True, slots=True)
class InspectorMessage:
"""Minimal delta carried across the pub/sub channel.
``node_id`` is set only for ``node_changed`` messages; ``status`` is the
coarse string status straight from the persistence layer (``"running"`` /
``"succeeded"`` / ``"failed"`` for nodes, plus ``"succeeded"`` /
``"failed"`` / ``"partial_succeeded"`` / ``"stopped"`` for workflow runs).
"""
kind: InspectorMessageKind
workflow_run_id: str
node_id: str | None = None
status: str | None = None
def to_json(self) -> str:
return json.dumps(asdict(self), ensure_ascii=False)
@classmethod
def from_json(cls, blob: str) -> InspectorMessage | None:
"""Decode a payload, returning ``None`` for any shape we can't trust."""
try:
decoded = json.loads(blob)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(decoded, dict):
return None
kind = decoded.get("kind")
if kind not in ("node_changed", "workflow_completed"):
return None
workflow_run_id = decoded.get("workflow_run_id")
if not isinstance(workflow_run_id, str) or not workflow_run_id:
return None
node_id = decoded.get("node_id")
if node_id is not None and not isinstance(node_id, str):
return None
status = decoded.get("status")
if status is not None and not isinstance(status, str):
return None
return cls(kind=kind, workflow_run_id=workflow_run_id, node_id=node_id, status=status)
# ──────────────────────────────────────────────────────────────────────────────
# Publisher (called from the persistence layer)
# ──────────────────────────────────────────────────────────────────────────────
def _publish(message: InspectorMessage) -> None:
"""Best-effort fire-and-forget publish.
Persistence runs inside the workflow engine thread; we never want a redis
glitch to break the workflow. Any exception is logged at debug level so
operators still see them when they grep, but the engine keeps running.
"""
try:
redis_client.publish(channel_for(message.workflow_run_id), message.to_json())
except Exception:
logger.debug("InspectorEventPublisher: publish failed for %s", message.workflow_run_id, exc_info=True)
def publish_node_changed(*, workflow_run_id: str, node_id: str, status: str) -> None:
"""Announce that one node's execution row just changed.
The SSE handler will recompute the node slice from the DB on receipt.
"""
_publish(InspectorMessage(kind="node_changed", workflow_run_id=workflow_run_id, node_id=node_id, status=status))
def publish_workflow_completed(*, workflow_run_id: str, status: str) -> None:
"""Announce that the workflow run reached a terminal state.
The SSE handler emits one last envelope and disconnects.
"""
_publish(InspectorMessage(kind="workflow_completed", workflow_run_id=workflow_run_id, status=status))
# ──────────────────────────────────────────────────────────────────────────────
# Subscriber (consumed by the SSE controller)
# ──────────────────────────────────────────────────────────────────────────────
def subscribe(workflow_run_id: str, *, timeout_seconds: float = 1.0) -> Iterator[InspectorMessage]:
"""Yield ``InspectorMessage`` instances until the consumer abandons us.
The loop polls redis with ``timeout_seconds`` so the SSE handler can
interleave keepalive heartbeats. Yields ``None`` on timeout so the caller
can decide whether to keep blocking; malformed payloads are silently
skipped.
The pub/sub connection is closed when the iterator is garbage-collected
(the wrapping ``finally`` releases it as soon as the SSE handler exits).
"""
pubsub = redis_client.pubsub()
pubsub.subscribe(channel_for(workflow_run_id))
try:
while True:
raw = pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout_seconds)
if raw is None:
# Surface a heartbeat tick — caller can keep-alive or check
# disconnection without blocking redis any longer.
yield InspectorMessage(kind="node_changed", workflow_run_id=workflow_run_id, node_id=None, status=None)
continue
data = raw.get("data") if isinstance(raw, dict) else None
if isinstance(data, bytes):
data = data.decode("utf-8", errors="replace")
if not isinstance(data, str):
continue
message = InspectorMessage.from_json(data)
if message is None:
continue
yield message
finally:
try:
pubsub.unsubscribe(channel_for(workflow_run_id))
pubsub.close()
except Exception:
logger.debug(
"InspectorEventPublisher: pubsub teardown failed for %s",
workflow_run_id,
exc_info=True,
)

View File

@ -1,712 +0,0 @@
"""Node Output Inspector service (Stage 4 §8).
PRD §Node Output Inspector renames every workflow "Variable" to a "Node Output"
and re-organizes the panel by **producer node** rather than consumer node. This
service backs the new REST surface
``/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs[/...]`` with three
read-only views:
* :meth:`snapshot_workflow_run` — every node + its declared outputs + per-output
status, for one debug workflow run.
* :meth:`node_detail` — the same shape filtered down to one node.
* :meth:`output_preview` — full payload for one output, with signed download
URL when the output references an upload file.
Design constraints baked into this version:
1. **No new tables** (§8.1). Topology comes from ``WorkflowRun.graph`` (the
graph snapshot taken at execution time so the view stays consistent even
if the draft was edited mid-run). Execution facts come from
``WorkflowNodeExecutionModel`` rows already produced by the workflow
runtime.
2. **Draft + published runs** (decision D-1 lifted 2026-05-26). The Inspector
accepts ``WorkflowRunTriggeredFrom.DEBUGGING`` (draft test runs) as well as
``APP_RUN`` / ``WEBHOOK`` / ``SCHEDULE`` / ``PLUGIN`` / ``RAG_PIPELINE_*``
triggers; the underlying graph + executions are uniform across all of them.
Cross-tenant / cross-app rows still 404 via the standard tenant/app scope.
3. **Declared outputs by node kind**:
* Agent v2 nodes resolve their declared list via
:class:`WorkflowAgentBindingResolver` (the binding owns the canonical
``DeclaredOutputConfig`` list and falls back to PRD defaults when empty).
* Other node kinds don't have a declared-output schema yet; we surface the
keys present in the execution payload as a best-effort list typed
``unknown`` so the panel can still render them.
4. **Per-output status** is derived from the metadata the agent_v2 stack
already records (``output_type_check`` and ``output_check`` blobs); falling
back to the node's overall status when those signals aren't present.
5. **SSE stream** (design §8.5) lives in
:mod:`controllers.console.app.workflow_node_output_inspector` alongside the
REST endpoints. The Inspector and the babysit chat SSE share the
``{event, data, id}`` envelope per decision D-5.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import select
from core.db.session_factory import session_factory
from core.workflow.nodes.agent_v2.binding_resolver import (
WorkflowAgentBindingError,
WorkflowAgentBindingResolver,
)
from core.workflow.nodes.agent_v2.runtime_request_builder import (
WorkflowAgentRuntimeRequestBuilder,
)
from graphon.enums import (
BuiltinNodeTypes,
WorkflowExecutionStatus,
WorkflowNodeExecutionStatus,
)
from graphon.file import helpers as file_helpers
from models import App
from models.agent_config_entities import DeclaredOutputConfig, DeclaredOutputType
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# Public dataclasses / enums (Pydantic — these go straight on the wire)
# ──────────────────────────────────────────────────────────────────────────────
class NodeOutputStatus(StrEnum):
"""Lifecycle status of a single declared output within a run."""
PENDING = "pending" # node not started
RUNNING = "running" # node running, output not ready yet
READY = "ready"
TYPE_CHECK_FAILED = "type_check_failed"
OUTPUT_CHECK_FAILED = "output_check_failed"
NOT_PRODUCED = "not_produced" # node succeeded but did not produce this declared output
FAILED = "failed" # node itself failed/exception/stopped
class NodeStatus(StrEnum):
"""Coarse node-level status used by Inspector to pick a banner."""
IDLE = "idle"
RUNNING = "running"
READY = "ready"
FAILED = "failed"
class CheckResultView(BaseModel):
"""``type_check`` / ``output_check`` per-output summary block."""
model_config = ConfigDict(extra="forbid")
passed: bool
reason: str | None = None
class NodeOutputView(BaseModel):
model_config = ConfigDict(extra="forbid")
name: str
type: DeclaredOutputType | None = None
status: NodeOutputStatus
value_preview: Any = None
type_check: CheckResultView | None = None
output_check: CheckResultView | None = None
retried: int = 0
class NodeOutputsView(BaseModel):
model_config = ConfigDict(extra="forbid")
node_id: str
node_kind: str
node_display_name: str
node_status: NodeStatus
node_started_at: datetime | None = None
node_completed_at: datetime | None = None
outputs: list[NodeOutputView] = Field(default_factory=list)
class WorkflowRunSnapshotView(BaseModel):
model_config = ConfigDict(extra="forbid")
workflow_run_id: str
workflow_run_status: WorkflowExecutionStatus
node_outputs: list[NodeOutputsView] = Field(default_factory=list)
class OutputPreviewView(BaseModel):
model_config = ConfigDict(extra="forbid")
node_id: str
output_name: str
type: DeclaredOutputType | None = None
status: NodeOutputStatus
value: Any = None # full value (with signed URL for file refs)
class NodeOutputInspectorError(Exception):
"""Raised when a request cannot be served (404-level conditions)."""
def __init__(self, code: str, message: str) -> None:
super().__init__(message)
self.code = code
# ──────────────────────────────────────────────────────────────────────────────
# Internal helpers — declared outputs per node
# ──────────────────────────────────────────────────────────────────────────────
@dataclass(frozen=True, slots=True)
class _ResolvedDeclaration:
"""Declared output the Inspector should expose, with a normalized type.
``inferred`` is ``True`` when the node kind has no declared-output schema
(we derived the list from the execution payload). The frontend can use
this to dim the type column.
"""
name: str
declared_type: DeclaredOutputType | None
inferred: bool
def _is_agent_v2_node(node: Mapping[str, Any]) -> bool:
"""A graph node is Agent v2 iff its ``data.type`` is the AGENT builtin
AND its ``data.version`` is ``"2"``.
``BuiltinNodeTypes.AGENT`` is a ``ClassVar[NodeType]`` (plain string), not
a StrEnum, so we compare against it directly without ``.value``.
"""
data = node.get("data") or {}
if not isinstance(data, Mapping):
return False
if data.get("type") != BuiltinNodeTypes.AGENT:
return False
return str(data.get("version", "")) == "2"
def _graph_nodes(workflow_run: WorkflowRun) -> list[Mapping[str, Any]]:
"""Parse ``WorkflowRun.graph`` (LongText JSON) into a node list.
The graph blob may be missing / unparseable for very old runs; we treat
that as "no nodes" rather than failing the Inspector, so the panel still
renders an empty state.
"""
if not workflow_run.graph:
return []
try:
parsed = json.loads(workflow_run.graph)
except (json.JSONDecodeError, TypeError):
logger.warning("NodeOutputInspector: workflow_run %s has unparseable graph blob", workflow_run.id)
return []
if not isinstance(parsed, Mapping):
return []
nodes = parsed.get("nodes")
if not isinstance(nodes, list):
return []
return [n for n in nodes if isinstance(n, Mapping) and "id" in n]
# ──────────────────────────────────────────────────────────────────────────────
# Internal helpers — per-output status derivation
# ──────────────────────────────────────────────────────────────────────────────
def _decode_json_blob(blob: str | None) -> Mapping[str, Any] | None:
if not blob:
return None
try:
decoded = json.loads(blob)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(decoded, Mapping):
return None
return decoded
def _node_status_for(execution: WorkflowNodeExecutionModel | None) -> NodeStatus:
if execution is None:
return NodeStatus.IDLE
if execution.status == WorkflowNodeExecutionStatus.RUNNING:
return NodeStatus.RUNNING
if execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
return NodeStatus.READY
return NodeStatus.FAILED
def _type_check_by_name(metadata: Mapping[str, Any] | None) -> dict[str, Mapping[str, Any]]:
if not metadata:
return {}
block = metadata.get("output_type_check")
if not isinstance(block, Mapping):
return {}
results = block.get("results") or []
if not isinstance(results, list):
return {}
indexed: dict[str, Mapping[str, Any]] = {}
for r in results:
if isinstance(r, Mapping) and isinstance(r.get("name"), str):
indexed[r["name"]] = r
return indexed
def _output_check_by_name(metadata: Mapping[str, Any] | None) -> dict[str, Mapping[str, Any]]:
if not metadata:
return {}
block = metadata.get("output_check")
if not isinstance(block, Mapping):
return {}
results = block.get("results") or []
if not isinstance(results, list):
return {}
indexed: dict[str, Mapping[str, Any]] = {}
for r in results:
if isinstance(r, Mapping) and isinstance(r.get("name"), str):
indexed[r["name"]] = r
return indexed
def _retried_attempt_count(metadata: Mapping[str, Any] | None) -> int:
"""The agent_v2 runtime records the final attempt index in metadata.
``attempt`` is 0-indexed so a single execution with no retry has
``attempt == 0`` and a Inspector-friendly ``retried == 0``.
"""
if not metadata:
return 0
attempt = metadata.get("attempt")
if isinstance(attempt, int) and attempt > 0:
return attempt
return 0
# ──────────────────────────────────────────────────────────────────────────────
# Value preview (file refs get signed URLs)
# ──────────────────────────────────────────────────────────────────────────────
_PREVIEW_TEXT_LIMIT = 500
_FILE_ID_KEYS: tuple[str, ...] = ("file_id", "upload_file_id", "tool_file_id")
def _looks_like_file_ref(value: Any) -> str | None:
"""Return the resolved ``file_id`` when ``value`` is a file-shaped dict."""
if not isinstance(value, Mapping):
return None
for key in _FILE_ID_KEYS:
candidate = value.get(key)
if isinstance(candidate, str) and candidate:
return candidate
return None
def _value_preview(value: Any) -> Any:
"""Compact preview suitable for the snapshot endpoint.
File refs are augmented with a signed download URL so the panel can render
a thumbnail / link without a second round-trip; long strings are truncated;
other scalar / dict / list shapes are returned as-is (the Pydantic layer
enforces JSON-safety on serialization).
"""
file_id = _looks_like_file_ref(value)
if file_id:
assert isinstance(value, Mapping)
try:
preview_url = file_helpers.get_signed_file_url(upload_file_id=file_id)
except Exception:
logger.warning("NodeOutputInspector: signed URL failed for file_id=%s", file_id, exc_info=True)
preview_url = None
return {**dict(value), "preview_url": preview_url}
if isinstance(value, str) and len(value) > _PREVIEW_TEXT_LIMIT:
return value[:_PREVIEW_TEXT_LIMIT] + ""
return value
def _full_value(value: Any) -> Any:
"""Same shape as :func:`_value_preview` minus the truncation."""
file_id = _looks_like_file_ref(value)
if file_id:
assert isinstance(value, Mapping)
try:
preview_url = file_helpers.get_signed_file_url(upload_file_id=file_id)
except Exception:
logger.warning("NodeOutputInspector: signed URL failed for file_id=%s", file_id, exc_info=True)
preview_url = None
return {**dict(value), "preview_url": preview_url}
return value
# ──────────────────────────────────────────────────────────────────────────────
# Service
# ──────────────────────────────────────────────────────────────────────────────
class NodeOutputInspectorService:
"""Read-only Inspector for draft + published workflow runs.
The service is dependency-light: it holds a single
:class:`WorkflowAgentBindingResolver` so agent v2 nodes can map to their
declared outputs without re-implementing binding lookup. All other I/O
uses the global session factory so workflow runs / executions stay on the
repo-default code path.
Tenancy is enforced via ``app_model.tenant_id`` + ``app_model.id`` on
every load — the same scope guard regardless of trigger source.
"""
def __init__(self, binding_resolver: WorkflowAgentBindingResolver | None = None) -> None:
self._binding_resolver = binding_resolver or WorkflowAgentBindingResolver()
# ── public API ────────────────────────────────────────────────────────
def snapshot_workflow_run(self, *, app_model: App, workflow_run_id: str) -> WorkflowRunSnapshotView:
"""Build the per-node snapshot for one debug workflow run."""
workflow_run, executions = self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)
executions_by_node = self._index_executions_by_node(executions)
graph_nodes = _graph_nodes(workflow_run)
node_views: list[NodeOutputsView] = []
for raw_node in graph_nodes:
node_id = str(raw_node["id"])
execution = executions_by_node.get(node_id)
view = self._build_node_view(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=workflow_run.workflow_id,
raw_node=raw_node,
execution=execution,
)
node_views.append(view)
return WorkflowRunSnapshotView(
workflow_run_id=workflow_run.id,
workflow_run_status=workflow_run.status,
node_outputs=node_views,
)
def node_detail(self, *, app_model: App, workflow_run_id: str, node_id: str) -> NodeOutputsView:
"""Per-node Inspector entry — returns one ``NodeOutputsView``."""
workflow_run, executions = self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)
graph_nodes = _graph_nodes(workflow_run)
raw_node = next((n for n in graph_nodes if str(n.get("id")) == node_id), None)
if raw_node is None:
raise NodeOutputInspectorError(
"node_not_in_workflow_run",
f"Node {node_id!r} does not appear in workflow run {workflow_run_id!r}.",
)
execution = self._index_executions_by_node(executions).get(node_id)
return self._build_node_view(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
workflow_id=workflow_run.workflow_id,
raw_node=raw_node,
execution=execution,
)
def output_preview(
self,
*,
app_model: App,
workflow_run_id: str,
node_id: str,
output_name: str,
) -> OutputPreviewView:
"""Full payload for one declared output (with signed file URL)."""
detail = self.node_detail(
app_model=app_model,
workflow_run_id=workflow_run_id,
node_id=node_id,
)
view = next((o for o in detail.outputs if o.name == output_name), None)
if view is None:
raise NodeOutputInspectorError(
"node_output_not_declared",
f"Output {output_name!r} is not declared on node {node_id!r}.",
)
# ``node_detail`` already produced a truncated value_preview; reload
# the raw value from the execution payload so the preview endpoint can
# return the full thing (still wrapped through ``_full_value`` for
# signed file URLs).
execution = self._index_executions_by_node(
self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)[1]
).get(node_id)
full_value: Any = None
if execution is not None:
outputs = _decode_json_blob(execution.outputs) or {}
if output_name in outputs:
full_value = _full_value(outputs[output_name])
return OutputPreviewView(
node_id=node_id,
output_name=output_name,
type=view.type,
status=view.status,
value=full_value,
)
# ── DB loading ────────────────────────────────────────────────────────
def _load_run_and_executions(
self, *, app_model: App, workflow_run_id: str
) -> tuple[WorkflowRun, Sequence[WorkflowNodeExecutionModel]]:
"""Fetch the ``WorkflowRun`` row + every execution that belongs to it.
Enforces:
* row exists,
* row belongs to the app's tenant + app.
The trigger source (DEBUGGING vs. APP_RUN / WEBHOOK / SCHEDULE / ...) is
deliberately not checked here — D-1 was lifted 2026-05-26 and the
Inspector now serves both draft and published runs.
"""
with session_factory.create_session() as session:
workflow_run = session.scalar(
select(WorkflowRun).where(
WorkflowRun.id == workflow_run_id,
WorkflowRun.app_id == app_model.id,
WorkflowRun.tenant_id == app_model.tenant_id,
)
)
if workflow_run is None:
raise NodeOutputInspectorError("workflow_run_not_found", "Workflow run not found.")
executions = session.scalars(
select(WorkflowNodeExecutionModel).where(
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
WorkflowNodeExecutionModel.tenant_id == app_model.tenant_id,
WorkflowNodeExecutionModel.app_id == app_model.id,
)
).all()
return workflow_run, executions
@staticmethod
def _index_executions_by_node(
executions: Sequence[WorkflowNodeExecutionModel],
) -> dict[str, WorkflowNodeExecutionModel]:
"""Keep the latest execution per ``node_id``.
A given node may have multiple rows when retries or iterations occur;
``index`` is the per-run sequence counter, so we keep the one with
the highest index as the canonical "current" view.
"""
latest: dict[str, WorkflowNodeExecutionModel] = {}
for execution in executions:
existing = latest.get(execution.node_id)
if existing is None or execution.index > existing.index:
latest[execution.node_id] = execution
return latest
# ── Per-node view construction ────────────────────────────────────────
def _build_node_view(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
raw_node: Mapping[str, Any],
execution: WorkflowNodeExecutionModel | None,
) -> NodeOutputsView:
node_id = str(raw_node["id"])
data = raw_node.get("data") or {}
if not isinstance(data, Mapping):
data = {}
node_kind = str(data.get("type") or (execution.node_type if execution else "") or "unknown")
display_name = str(data.get("title") or (execution.title if execution else node_id))
node_status = _node_status_for(execution)
declarations = self._resolve_declared_outputs(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
node_id=node_id,
raw_node=raw_node,
execution=execution,
)
outputs_dict = _decode_json_blob(execution.outputs) if execution else None
metadata_dict = _decode_json_blob(execution.execution_metadata) if execution else None
type_check_by_name = _type_check_by_name(metadata_dict)
output_check_by_name = _output_check_by_name(metadata_dict)
retried = _retried_attempt_count(metadata_dict)
output_views: list[NodeOutputView] = []
for declaration in declarations:
output_views.append(
self._build_output_view(
declaration=declaration,
node_status=node_status,
outputs_dict=outputs_dict,
type_check_by_name=type_check_by_name,
output_check_by_name=output_check_by_name,
retried=retried,
)
)
return NodeOutputsView(
node_id=node_id,
node_kind=node_kind,
node_display_name=display_name,
node_status=node_status,
node_started_at=execution.created_at if execution else None,
node_completed_at=execution.finished_at if execution else None,
outputs=output_views,
)
def _build_output_view(
self,
*,
declaration: _ResolvedDeclaration,
node_status: NodeStatus,
outputs_dict: Mapping[str, Any] | None,
type_check_by_name: Mapping[str, Mapping[str, Any]],
output_check_by_name: Mapping[str, Mapping[str, Any]],
retried: int,
) -> NodeOutputView:
name = declaration.name
declared_type = declaration.declared_type
if node_status == NodeStatus.IDLE:
return NodeOutputView(
name=name,
type=declared_type,
status=NodeOutputStatus.PENDING,
retried=retried,
)
if node_status == NodeStatus.RUNNING:
return NodeOutputView(
name=name,
type=declared_type,
status=NodeOutputStatus.RUNNING,
retried=retried,
)
if node_status == NodeStatus.FAILED:
return NodeOutputView(
name=name,
type=declared_type,
status=NodeOutputStatus.FAILED,
retried=retried,
)
# ── node succeeded ────────────────────────────────────────────
type_check_result = type_check_by_name.get(name)
output_check_result = output_check_by_name.get(name)
type_check_view = self._coerce_check_view(type_check_result)
output_check_view = self._coerce_check_view(output_check_result)
# type check loses first; output check next; otherwise ready.
status: NodeOutputStatus
if type_check_result and not _is_passing(type_check_result):
status = NodeOutputStatus.TYPE_CHECK_FAILED
elif output_check_result and not _is_passing(output_check_result):
status = NodeOutputStatus.OUTPUT_CHECK_FAILED
elif outputs_dict is not None and name not in outputs_dict:
status = NodeOutputStatus.NOT_PRODUCED
else:
status = NodeOutputStatus.READY
value_preview = _value_preview(outputs_dict.get(name)) if outputs_dict and name in outputs_dict else None
return NodeOutputView(
name=name,
type=declared_type,
status=status,
value_preview=value_preview,
type_check=type_check_view,
output_check=output_check_view,
retried=retried,
)
@staticmethod
def _coerce_check_view(result: Mapping[str, Any] | None) -> CheckResultView | None:
if not result:
return None
# type_check rows use ``status``; output_check rows use ``status`` too —
# both record per-output state. We treat ``status == "ready"``/"passed"
# as passing and everything else as failing, so the view stays
# stable regardless of which producer wrote the metadata.
return CheckResultView(passed=_is_passing(result), reason=result.get("reason"))
# ── Declared-output resolution ────────────────────────────────────────
def _resolve_declared_outputs(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
node_id: str,
raw_node: Mapping[str, Any],
execution: WorkflowNodeExecutionModel | None,
) -> list[_ResolvedDeclaration]:
if _is_agent_v2_node(raw_node):
agent_decl = self._declared_outputs_for_agent_v2(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
node_id=node_id,
)
if agent_decl is not None:
return [_ResolvedDeclaration(name=o.name, declared_type=o.type, inferred=False) for o in agent_decl]
# Non-agent (or agent-binding-missing) fall back to inferring from the
# produced payload so the Inspector still has something to show.
return self._infer_outputs_from_payload(execution=execution)
def _declared_outputs_for_agent_v2(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
node_id: str,
) -> list[DeclaredOutputConfig] | None:
try:
bundle = self._binding_resolver.resolve(
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
node_id=node_id,
)
except WorkflowAgentBindingError:
return None
try:
from models.agent_config_entities import WorkflowNodeJobConfig
node_job = WorkflowNodeJobConfig.model_validate(bundle.binding.node_job_config_dict)
except Exception:
logger.warning(
"NodeOutputInspector: malformed node_job_config for binding %s", bundle.binding.id, exc_info=True
)
return None
return list(WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(list(node_job.declared_outputs)))
@staticmethod
def _infer_outputs_from_payload(*, execution: WorkflowNodeExecutionModel | None) -> list[_ResolvedDeclaration]:
if execution is None:
return []
outputs = _decode_json_blob(execution.outputs)
if not outputs:
return []
return [_ResolvedDeclaration(name=name, declared_type=None, inferred=True) for name in outputs]
def _is_passing(result: Mapping[str, Any]) -> bool:
"""A check-result row is "passing" when its ``status`` is the ready/passed
sentinel emitted by the type-checker / output-check executor."""
status = result.get("status")
if status in {"ready", "passed", "not_produced"}:
return True
return False

View File

@ -0,0 +1 @@
"""Shared test helpers for backend migration tests."""

View File

@ -0,0 +1,379 @@
from __future__ import annotations
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from uuid import uuid4
import sqlalchemy as sa
from sqlalchemy.engine import Engine
from models.account import Tenant
from models.enums import CredentialSourceType
from models.provider import (
LoadBalancingModelConfig,
ProviderModel,
ProviderModelCredential,
ProviderModelSetting,
TenantDefaultModel,
)
LEGACY_TO_CANONICAL: dict[str, str] = {
"text-generation": "llm",
"embeddings": "text-embedding",
"reranking": "rerank",
}
UNCHANGED_MODEL_TYPES: tuple[str, ...] = ("speech2text", "moderation", "tts")
ALL_TABLE_NAMES: tuple[str, ...] = (
ProviderModel.__tablename__,
TenantDefaultModel.__tablename__,
ProviderModelSetting.__tablename__,
LoadBalancingModelConfig.__tablename__,
ProviderModelCredential.__tablename__,
)
DEFAULT_PRIMARY_TENANT_ID = "00000000-0000-0000-0000-000000000101"
DEFAULT_SECONDARY_TENANT_ID = "00000000-0000-0000-0000-000000000202"
@dataclass(frozen=True, slots=True)
class DirtyTenantFixture:
tenant_id: str
winner_credential_id: str
loser_credential_id: str
distinct_credential_id: str
provider_model_id: str
load_balancing_config_id: str
winner_load_balancing_config_id: str
provider_model_setting_id: str
tenant_default_model_id: str
embedding_provider_model_id: str
embedding_setting_id: str
loser_credential_name: str
distinct_credential_name: str
loser_encrypted_config: str
winner_encrypted_config: str
@dataclass(frozen=True, slots=True)
class DirtyDataFixture:
primary: DirtyTenantFixture
secondary: DirtyTenantFixture
def create_minimal_legacy_model_type_schema(engine: Engine) -> None:
metadata = Tenant.__table__.metadata
metadata.create_all(
engine,
tables=[
Tenant.__table__,
ProviderModel.__table__,
TenantDefaultModel.__table__,
ProviderModelSetting.__table__,
LoadBalancingModelConfig.__table__,
ProviderModelCredential.__table__,
],
checkfirst=True,
)
def drop_minimal_legacy_model_type_schema(engine: Engine) -> None:
metadata = Tenant.__table__.metadata
metadata.drop_all(
engine,
tables=[
LoadBalancingModelConfig.__table__,
ProviderModelSetting.__table__,
TenantDefaultModel.__table__,
ProviderModel.__table__,
ProviderModelCredential.__table__,
Tenant.__table__,
],
checkfirst=True,
)
def seed_legacy_model_type_dirty_data(
engine: Engine,
*,
primary_tenant_id: str = DEFAULT_PRIMARY_TENANT_ID,
secondary_tenant_id: str = DEFAULT_SECONDARY_TENANT_ID,
) -> DirtyDataFixture:
create_minimal_legacy_model_type_schema(engine)
primary = _seed_tenant(engine, tenant_id=primary_tenant_id, provider_name="openai")
secondary = _seed_tenant(engine, tenant_id=secondary_tenant_id, provider_name="openai")
return DirtyDataFixture(primary=primary, secondary=secondary)
def snapshot_legacy_model_type_state(engine: Engine) -> dict[str, list[dict[str, object]]]:
snapshots: dict[str, list[dict[str, object]]] = {}
for table_name in ALL_TABLE_NAMES:
snapshots[table_name] = fetch_table_rows(engine, table_name)
return snapshots
def fetch_table_rows(
engine: Engine,
table_name: str,
*,
tenant_id: str | None = None,
) -> list[dict[str, object]]:
sql = f"SELECT * FROM {table_name}"
params: dict[str, object] = {}
if tenant_id is not None:
sql += " WHERE tenant_id = :tenant_id"
params["tenant_id"] = tenant_id
sql += " ORDER BY id ASC"
with engine.begin() as conn:
rows = conn.execute(sa.text(sql), params).mappings().all()
result: list[dict[str, object]] = []
for row in rows:
normalized = dict(row)
for key, value in normalized.items():
if isinstance(value, datetime):
normalized[key] = value.isoformat()
elif isinstance(value, uuid.UUID):
normalized[key] = str(value)
result.append(normalized)
return result
def fetch_model_types_for_tenant(engine: Engine, table_name: str, tenant_id: str) -> list[str]:
rows = fetch_table_rows(engine, table_name, tenant_id=tenant_id)
return [str(row["model_type"]) for row in rows]
def assert_tenant_rows_use_only_canonical_model_types(engine: Engine, tenant_id: str) -> None:
for table_name in ALL_TABLE_NAMES:
model_types = fetch_model_types_for_tenant(engine, table_name, tenant_id)
assert set(model_types) <= set(LEGACY_TO_CANONICAL.values()) | set(UNCHANGED_MODEL_TYPES), (
table_name,
model_types,
)
def count_rows(engine: Engine, table_name: str, *, tenant_id: str) -> int:
with engine.begin() as conn:
stmt = sa.text(f"SELECT COUNT(*) FROM {table_name} WHERE tenant_id = :tenant_id")
return int(conn.execute(stmt, {"tenant_id": tenant_id}).scalar_one())
def _seed_tenant(engine: Engine, *, tenant_id: str, provider_name: str) -> DirtyTenantFixture:
now = datetime(2025, 1, 1, 12, 0, 0)
winner_credential_id = str(uuid4())
loser_credential_id = str(uuid4())
distinct_credential_id = str(uuid4())
provider_model_id = str(uuid4())
load_balancing_config_id = str(uuid4())
provider_model_setting_id = str(uuid4())
tenant_default_model_id = str(uuid4())
embedding_provider_model_id = str(uuid4())
embedding_setting_id = str(uuid4())
loser_credential_name = f"{tenant_id}-shared"
distinct_credential_name = f"{tenant_id}-distinct"
winner_encrypted_config = json.dumps({"api_key": f"{tenant_id}-winner"})
loser_encrypted_config = json.dumps({"api_key": f"{tenant_id}-loser"})
distinct_encrypted_config = json.dumps({"api_key": f"{tenant_id}-distinct"})
with engine.begin() as conn:
conn.execute(
Tenant.__table__.insert().values(
id=tenant_id,
name=f"Tenant {tenant_id}",
plan="basic",
status="normal",
)
)
conn.execute(
sa.text(
"""
INSERT INTO provider_model_credentials
(
id, tenant_id, provider_name, model_name,
model_type, credential_name, encrypted_config,
created_at, updated_at
)
VALUES
(
:winner_id, :tenant_id, :provider_name, 'gpt-4o-mini',
'llm', :shared_name, :winner_config,
:created_at, :winner_updated_at
),
(
:loser_id, :tenant_id, :provider_name, 'gpt-4o-mini',
'text-generation', :shared_name, :loser_config,
:created_at, :loser_updated_at
),
(
:distinct_id, :tenant_id, :provider_name, 'gpt-4o-mini',
'text-generation', :distinct_name, :distinct_config,
:created_at, :distinct_updated_at
)
"""
),
{
"winner_id": winner_credential_id,
"loser_id": loser_credential_id,
"distinct_id": distinct_credential_id,
"tenant_id": tenant_id,
"provider_name": provider_name,
"shared_name": loser_credential_name,
"distinct_name": distinct_credential_name,
"winner_config": winner_encrypted_config,
"loser_config": loser_encrypted_config,
"distinct_config": distinct_encrypted_config,
"created_at": now - timedelta(days=2),
"winner_updated_at": now,
"loser_updated_at": now - timedelta(days=1),
"distinct_updated_at": now - timedelta(hours=12),
},
)
conn.execute(
sa.text(
"""
INSERT INTO provider_models
(
id, tenant_id, provider_name, model_name,
model_type, credential_id, is_valid,
created_at, updated_at
)
VALUES
(
:provider_model_id, :tenant_id, :provider_name, 'gpt-4o-mini',
'text-generation', :loser_id, :is_valid,
:created_at, :updated_at
),
(
:embedding_provider_model_id, :tenant_id, :provider_name, 'text-embedding-3-large',
'embeddings', NULL, :is_valid,
:created_at, :updated_at
)
"""
),
{
"provider_model_id": provider_model_id,
"embedding_provider_model_id": embedding_provider_model_id,
"tenant_id": tenant_id,
"provider_name": provider_name,
"loser_id": loser_credential_id,
"is_valid": True,
"created_at": now - timedelta(days=2),
"updated_at": now - timedelta(hours=6),
},
)
conn.execute(
sa.text(
"""
INSERT INTO tenant_default_models
(id, tenant_id, provider_name, model_name, model_type, created_at, updated_at)
VALUES
(
:tenant_default_model_id, :tenant_id, :provider_name, 'gpt-4o-mini',
'text-generation', :created_at, :updated_at
)
"""
),
{
"tenant_default_model_id": tenant_default_model_id,
"tenant_id": tenant_id,
"provider_name": provider_name,
"created_at": now - timedelta(days=2),
"updated_at": now - timedelta(hours=4),
},
)
conn.execute(
sa.text(
"""
INSERT INTO provider_model_settings
(
id, tenant_id, provider_name, model_name,
model_type, enabled, load_balancing_enabled,
created_at, updated_at
)
VALUES
(
:provider_model_setting_id, :tenant_id, :provider_name, 'gpt-4o-mini',
'text-generation', :enabled, :load_balancing_enabled,
:created_at, :updated_at
),
(
:embedding_setting_id, :tenant_id, :provider_name, 'text-embedding-3-large',
'embeddings', :enabled, :embedding_load_balancing_enabled,
:created_at, :updated_at
)
"""
),
{
"provider_model_setting_id": provider_model_setting_id,
"embedding_setting_id": embedding_setting_id,
"tenant_id": tenant_id,
"provider_name": provider_name,
"enabled": True,
"load_balancing_enabled": True,
"embedding_load_balancing_enabled": False,
"created_at": now - timedelta(days=2),
"updated_at": now - timedelta(hours=3),
},
)
winner_load_balancing_config_id = str(uuid4())
conn.execute(
sa.text(
"""
INSERT INTO load_balancing_model_configs
(
id, tenant_id, provider_name, model_name, model_type,
name, encrypted_config, credential_id, credential_source_type,
enabled, created_at, updated_at
)
VALUES
(
:load_balancing_config_id, :tenant_id, :provider_name, 'gpt-4o-mini', 'text-generation',
:lb_name, :loser_config, :loser_id, :credential_source_type,
:enabled, :created_at, :updated_at
),
(
:lb_winner_id, :tenant_id, :provider_name, 'gpt-4o-mini', 'text-generation',
:winner_name, :winner_config, :winner_cred_id, :credential_source_type,
:enabled, :created_at, :winner_updated_at
)
"""
),
{
"load_balancing_config_id": load_balancing_config_id,
"lb_winner_id": winner_load_balancing_config_id,
"tenant_id": tenant_id,
"provider_name": provider_name,
"lb_name": loser_credential_name,
"loser_config": loser_encrypted_config,
"loser_id": loser_credential_id,
"winner_name": f"{tenant_id}-winner-lb",
"winner_config": winner_encrypted_config,
"winner_cred_id": winner_credential_id,
"credential_source_type": CredentialSourceType.CUSTOM_MODEL.value,
"enabled": True,
"created_at": now - timedelta(days=2),
"updated_at": now - timedelta(hours=2),
"winner_updated_at": now - timedelta(hours=1),
},
)
return DirtyTenantFixture(
tenant_id=tenant_id,
winner_credential_id=winner_credential_id,
loser_credential_id=loser_credential_id,
distinct_credential_id=distinct_credential_id,
provider_model_id=provider_model_id,
load_balancing_config_id=load_balancing_config_id,
winner_load_balancing_config_id=winner_load_balancing_config_id,
provider_model_setting_id=provider_model_setting_id,
tenant_default_model_id=tenant_default_model_id,
embedding_provider_model_id=embedding_provider_model_id,
embedding_setting_id=embedding_setting_id,
loser_credential_name=loser_credential_name,
distinct_credential_name=distinct_credential_name,
loser_encrypted_config=loser_encrypted_config,
winner_encrypted_config=winner_encrypted_config,
)

View File

@ -1,475 +0,0 @@
"""End-to-end tests for ``NodeOutputInspectorService`` (Stage 4 §8 / ENG-373).
These integration tests exercise the service against a real Postgres
(``dify-db-1``) — same pattern as :mod:`test_remove_app_and_related_data_task`:
seed rows via ``session_factory.create_session()`` with explicit commits,
exercise the service, clean up by ID at teardown.
Coverage:
1. Snapshot for a draft run with one agent v2 node + one tool node
2. Type-check failure visible in snapshot
3. Output-check failure visible in snapshot
4. Published run returns ``published_run_inspector_not_implemented``
5. Cross-tenant access returns ``workflow_run_not_found``
6. File output preview endpoint returns full value with signed URL
7. ``node_detail`` path serves a single node view
"""
from __future__ import annotations
import json
import uuid
from collections.abc import Generator
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import Any
from unittest.mock import patch
import pytest
from sqlalchemy import delete
from core.db.session_factory import session_factory
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
from models.workflow import (
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowType,
)
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputInspectorService,
NodeOutputStatus,
NodeStatus,
)
@pytest.fixture
def fake_app_model() -> SimpleNamespace:
"""Lightweight stand-in for the ``App`` model that the service consumes.
``App`` is only read for ``id`` and ``tenant_id``; the service does not
poke at any ORM relationship so a SimpleNamespace is enough — and it
keeps us free of needing the ``apps`` row to actually exist (which would
drag in Account / Tenant setup).
"""
return SimpleNamespace(
id=str(uuid.uuid4()),
tenant_id=str(uuid.uuid4()),
)
def _make_workflow_run(
*,
app_id: str,
tenant_id: str,
triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING,
graph: dict[str, Any] | None = None,
) -> WorkflowRun:
"""Build a ``WorkflowRun`` row with all required fields populated."""
return WorkflowRun(
id=str(uuid.uuid4()),
tenant_id=tenant_id,
app_id=app_id,
workflow_id=str(uuid.uuid4()),
type=WorkflowType.WORKFLOW,
triggered_from=triggered_from,
version="draft",
graph=json.dumps(graph or {"nodes": []}),
status=status,
created_by_role=CreatorUserRole.ACCOUNT,
created_by=str(uuid.uuid4()),
)
def _make_execution(
*,
app_id: str,
tenant_id: str,
workflow_id: str,
workflow_run_id: str,
node_id: str,
node_type: str = "agent",
title: str = "",
status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.SUCCEEDED,
outputs: dict[str, Any] | None = None,
execution_metadata: dict[str, Any] | None = None,
index: int = 1,
) -> WorkflowNodeExecutionModel:
"""Build a ``WorkflowNodeExecutionModel`` row with all required fields."""
return WorkflowNodeExecutionModel(
id=str(uuid.uuid4()),
tenant_id=tenant_id,
app_id=app_id,
workflow_id=workflow_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
workflow_run_id=workflow_run_id,
index=index,
node_id=node_id,
node_type=node_type,
title=title or node_id,
status=status,
outputs=json.dumps(outputs) if outputs is not None else None,
execution_metadata=json.dumps(execution_metadata) if execution_metadata is not None else None,
created_by_role=CreatorUserRole.ACCOUNT,
created_by=str(uuid.uuid4()),
created_at=datetime.now(UTC),
finished_at=datetime.now(UTC),
)
@pytest.fixture
def seeded_run(
flask_req_ctx, fake_app_model: SimpleNamespace
) -> Generator[tuple[SimpleNamespace, WorkflowRun, list[WorkflowNodeExecutionModel]], None, None]:
"""Seed one debug ``WorkflowRun`` + 2 node executions in real Postgres.
Yields ``(app_model, workflow_run, executions)``. Cleans both rows up at
teardown via direct ``DELETE`` so a failed test never leaves debris.
"""
graph = {
"nodes": [
{
"id": "agent-node-1",
"data": {"type": "agent", "version": "2", "title": "My Agent"},
},
{
"id": "tool-node-1",
"data": {"type": "tool", "title": "Slack"},
},
]
}
workflow_run = _make_workflow_run(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
graph=graph,
)
agent_execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-node-1",
node_type="agent",
outputs={"text": "hello world"},
execution_metadata={
"output_type_check": {
"passed": True,
"results": [{"name": "text", "type": "string", "status": "ready"}],
},
"attempt": 0,
},
index=1,
)
tool_execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="tool-node-1",
node_type="tool",
outputs={"message": "sent", "ok": True},
index=2,
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(agent_execution)
session.add(tool_execution)
session.commit()
run_id = workflow_run.id
execution_ids = [agent_execution.id, tool_execution.id]
try:
yield fake_app_model, workflow_run, [agent_execution, tool_execution]
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(execution_ids)))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
# ──────────────────────────────────────────────────────────────────────────────
# Stub binding resolver — declared outputs for the agent v2 node
# ──────────────────────────────────────────────────────────────────────────────
def _stub_resolver(declared_outputs_payload: list[dict[str, Any]]):
"""Return a stand-in binding resolver whose ``.resolve`` always returns
one bundle with the supplied declared_outputs.
The real resolver hits ``workflow_agent_node_bindings``; we skip that
table here so the Inspector can be tested without binding-row setup.
"""
binding = SimpleNamespace(
id="binding-1",
node_job_config_dict={
"workflow_prompt": "stub",
"declared_outputs": declared_outputs_payload,
},
)
bundle = SimpleNamespace(binding=binding, agent=None, snapshot=None)
class _Resolver:
def resolve(self, **_: Any):
return bundle
return _Resolver()
# ──────────────────────────────────────────────────────────────────────────────
# Tests
# ──────────────────────────────────────────────────────────────────────────────
def test_snapshot_returns_agent_v2_declared_outputs_with_status_ready(seeded_run):
"""Happy path: agent v2 node + tool node both render, statuses come from
real ``WorkflowRun`` + ``WorkflowNodeExecutionModel`` rows."""
app_model, workflow_run, _ = seeded_run
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
snapshot = service.snapshot_workflow_run(
app_model=app_model,
workflow_run_id=workflow_run.id,
)
assert snapshot.workflow_run_id == workflow_run.id
assert snapshot.workflow_run_status == WorkflowExecutionStatus.RUNNING
by_node = {n.node_id: n for n in snapshot.node_outputs}
agent_view = by_node["agent-node-1"]
assert agent_view.node_status == NodeStatus.READY
assert agent_view.outputs[0].name == "text"
assert agent_view.outputs[0].status == NodeOutputStatus.READY
assert agent_view.outputs[0].value_preview == "hello world"
tool_view = by_node["tool-node-1"]
# Tool node's declared outputs are *inferred* from the produced payload.
output_names = sorted(o.name for o in tool_view.outputs)
assert output_names == ["message", "ok"]
assert all(o.type is None for o in tool_view.outputs)
def test_snapshot_404s_for_missing_run(fake_app_model):
"""Service raises ``workflow_run_not_found`` when the row doesn't exist."""
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=str(uuid.uuid4()))
assert exc.value.code == "workflow_run_not_found"
def test_snapshot_404s_for_cross_tenant_access(seeded_run):
"""A wrong-tenant app_model must not see another tenant's run."""
_, workflow_run, _ = seeded_run
intruder = SimpleNamespace(id=str(uuid.uuid4()), tenant_id=str(uuid.uuid4()))
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=intruder, workflow_run_id=workflow_run.id)
assert exc.value.code == "workflow_run_not_found"
def test_snapshot_404s_for_published_run_per_decision_d1(flask_req_ctx, fake_app_model):
"""Decision D-1: published / app-run Inspector deferred to stage 4.1."""
workflow_run = _make_workflow_run(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
graph={"nodes": []},
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.commit()
run_id = workflow_run.id
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
assert exc.value.code == "published_run_inspector_not_implemented"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
def test_snapshot_surfaces_type_check_failure_from_metadata(flask_req_ctx, fake_app_model):
"""Per-output ``TYPE_CHECK_FAILED`` derived from the metadata blob the
Stage 4 §5 stack records on the execution row."""
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"summary": 123}, # int despite declared string
execution_metadata={
"output_type_check": {
"passed": False,
"results": [
{
"name": "summary",
"type": "string",
"status": "type_check_failed",
"reason": "expected string, got int",
}
],
}
},
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(execution)
session.commit()
run_id, execution_id = workflow_run.id, execution.id
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "summary", "type": "string"}]))
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.TYPE_CHECK_FAILED
assert output.type_check is not None
assert output.type_check.passed is False
assert output.type_check.reason == "expected string, got int"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
def test_snapshot_surfaces_output_check_failure_from_metadata(flask_req_ctx, fake_app_model):
"""When ``output_type_check.passed`` but ``output_check.passed=False``, the
output is flagged ``OUTPUT_CHECK_FAILED``."""
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
execution = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"report": {"file_id": "550e8400-e29b-41d4-a716-446655440000"}},
execution_metadata={
"output_type_check": {"passed": True, "results": [{"name": "report", "status": "ready"}]},
"output_check": {
"passed": False,
"results": [{"name": "report", "status": "failed", "reason": "benchmark mismatch"}],
},
},
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(execution)
session.commit()
run_id, execution_id = workflow_run.id, execution.id
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "report", "type": "file"}]))
# Stub signed-URL so we don't depend on the workflow file runtime being
# bound (it isn't, in this minimal flask_req_ctx).
with patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/report",
):
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.OUTPUT_CHECK_FAILED
assert output.output_check is not None
assert output.output_check.passed is False
assert output.output_check.reason == "benchmark mismatch"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()
def test_node_detail_serves_one_node(seeded_run):
app_model, workflow_run, _ = seeded_run
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
view = service.node_detail(
app_model=app_model,
workflow_run_id=workflow_run.id,
node_id="agent-node-1",
)
assert view.node_id == "agent-node-1"
assert view.outputs[0].name == "text"
def test_output_preview_for_file_renders_signed_url(seeded_run, fake_app_model):
"""``preview`` returns the full value with signed_url for file refs."""
# Replace the seeded agent execution's output with a file ref.
_, workflow_run, executions = seeded_run
agent_execution = executions[0]
with session_factory.create_session() as session:
# Re-bind the persisted row so we can mutate + commit.
from sqlalchemy import select
row = session.scalar(
select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == agent_execution.id)
)
assert row is not None
row.outputs = json.dumps({"text": {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}})
session.commit()
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "file"}]))
with patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x.pdf",
):
preview = service.output_preview(
app_model=fake_app_model,
workflow_run_id=workflow_run.id,
node_id="agent-node-1",
output_name="text",
)
assert preview.output_name == "text"
assert isinstance(preview.value, dict)
assert preview.value["preview_url"] == "https://signed.example/x.pdf"
assert preview.value["filename"] == "x.pdf"
def test_keeps_latest_execution_per_node_by_index(flask_req_ctx, fake_app_model):
"""Multiple executions for the same node_id → service keeps the highest
``index`` (matches the agent_v2 retry pattern that re-emits node
executions)."""
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
older = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"text": "first attempt"},
index=1,
)
newer = _make_execution(
app_id=fake_app_model.id,
tenant_id=fake_app_model.tenant_id,
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
node_id="agent-1",
outputs={"text": "second attempt"},
index=5,
)
with session_factory.create_session() as session:
session.add(workflow_run)
session.add(older)
session.add(newer)
session.commit()
run_id, ex_ids = workflow_run.id, [older.id, newer.id]
try:
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
assert snapshot.node_outputs[0].outputs[0].value_preview == "second attempt"
finally:
with session_factory.create_session() as session:
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(ex_ids)))
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
session.commit()

View File

@ -0,0 +1,82 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
API_PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(API_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(API_PROJECT_ROOT))
import sqlalchemy as sa
from tests.helpers.legacy_model_type_migration import (
DEFAULT_PRIMARY_TENANT_ID,
DEFAULT_SECONDARY_TENANT_ID,
create_minimal_legacy_model_type_schema,
seed_legacy_model_type_dirty_data,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Seed dirty legacy model_type rows for manual migration experiments. "
"Example: uv run --project api python api/tests/seed_legacy_model_type_dirty_data.py "
"--db-url postgresql://postgres:postgres@127.0.0.1:5432/dify"
)
)
parser.add_argument("--db-url", required=True, help="SQLAlchemy database URL for the target database.")
parser.add_argument(
"--primary-tenant-id",
default=DEFAULT_PRIMARY_TENANT_ID,
help="Tenant that will contain the main conflict scenario.",
)
parser.add_argument(
"--secondary-tenant-id",
default=DEFAULT_SECONDARY_TENANT_ID,
help="Tenant used to verify tenant filtering behavior.",
)
parser.add_argument(
"--create-minimal-schema",
action="store_true",
help="Create the minimal tables needed for the seed when running against an empty scratch database.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
engine = sa.create_engine(args.db_url)
try:
if args.create_minimal_schema:
create_minimal_legacy_model_type_schema(engine)
fixture = seed_legacy_model_type_dirty_data(
engine,
primary_tenant_id=args.primary_tenant_id,
secondary_tenant_id=args.secondary_tenant_id,
)
finally:
engine.dispose()
print(
json.dumps(
{
"primary_tenant_id": fixture.primary.tenant_id,
"secondary_tenant_id": fixture.secondary.tenant_id,
"winner_credential_id": fixture.primary.winner_credential_id,
"loser_credential_id": fixture.primary.loser_credential_id,
"provider_model_id": fixture.primary.provider_model_id,
"load_balancing_config_id": fixture.primary.load_balancing_config_id,
},
indent=2,
sort_keys=True,
)
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,127 @@
from __future__ import annotations
import importlib
import io
from collections.abc import Generator
import pytest
import sqlalchemy as sa
from tests.helpers.legacy_model_type_migration import (
assert_tenant_rows_use_only_canonical_model_types,
count_rows,
fetch_table_rows,
seed_legacy_model_type_dirty_data,
)
@pytest.fixture(scope="session")
def migration_module():
try:
return importlib.import_module("services.legacy_model_type_migration")
except ModuleNotFoundError as exc: # pragma: no cover - explicit TDD failure path
pytest.fail(
"services.legacy_model_type_migration is missing. "
"Implement LegacyModelTypeMigrationService before running these tests."
)
@pytest.fixture(params=("postgresql", "mysql"), scope="session")
def container_engine(request: pytest.FixtureRequest) -> Generator[tuple[str, sa.Engine], None, None]:
backend_name = request.param
if backend_name == "postgresql":
testcontainers_postgres = pytest.importorskip("testcontainers.postgres")
container = testcontainers_postgres.PostgresContainer("postgres:15-alpine")
else:
testcontainers_mysql = pytest.importorskip("testcontainers.mysql")
container = testcontainers_mysql.MySqlContainer("mysql:8.0")
container.start()
raw_url = container.get_connection_url()
engine_url = raw_url.replace("mysql://", "mysql+pymysql://", 1)
engine = sa.create_engine(engine_url)
try:
yield backend_name, engine
finally:
engine.dispose()
container.stop()
def test_legacy_model_type_migration_end_to_end_across_supported_backends(
migration_module,
container_engine: tuple[str, sa.Engine],
monkeypatch: pytest.MonkeyPatch,
) -> None:
backend_name, engine = container_engine
helper_module = importlib.import_module("tests.helpers.legacy_model_type_migration")
helper_module.drop_minimal_legacy_model_type_schema(engine)
fixture = seed_legacy_model_type_dirty_data(engine)
deleted_cache_keys: list[str] = []
def _record_delete(self) -> None:
deleted_cache_keys.append(self.cache_key)
monkeypatch.setattr(migration_module.ProviderCredentialsCache, "delete", _record_delete)
dry_run_output = io.StringIO()
migration_module.LegacyModelTypeMigrationService(
engine=engine,
apply=False,
output=dry_run_output,
tenant_ids=(fixture.primary.tenant_id,),
).migrate()
assert count_rows(engine, "provider_model_credentials", tenant_id=fixture.primary.tenant_id) == 3
assert deleted_cache_keys == []
apply_output = io.StringIO()
migration_module.LegacyModelTypeMigrationService(
engine=engine,
apply=True,
output=apply_output,
tenant_ids=(fixture.primary.tenant_id,),
).migrate()
first_apply_state = {
table_name: fetch_table_rows(engine, table_name, tenant_id=fixture.primary.tenant_id)
for table_name in (
"provider_models",
"tenant_default_models",
"provider_model_settings",
"load_balancing_model_configs",
"provider_model_credentials",
)
}
assert_tenant_rows_use_only_canonical_model_types(engine, fixture.primary.tenant_id)
assert count_rows(engine, "provider_model_credentials", tenant_id=fixture.primary.tenant_id) == 2
provider_model_row = next(
row for row in first_apply_state["provider_models"] if row["id"] == fixture.primary.provider_model_id
)
assert provider_model_row["credential_id"] == fixture.primary.winner_credential_id
credential_ids = {str(row["id"]) for row in first_apply_state["provider_model_credentials"]}
assert credential_ids == {
fixture.primary.winner_credential_id,
fixture.primary.distinct_credential_id,
}
lb_row = next(
row
for row in first_apply_state["load_balancing_model_configs"]
if row["id"] == fixture.primary.load_balancing_config_id
)
assert lb_row["credential_id"] == fixture.primary.winner_credential_id
assert lb_row["encrypted_config"] == fixture.primary.winner_encrypted_config
assert deleted_cache_keys, f"{backend_name} apply run should clear cache keys"
migration_module.LegacyModelTypeMigrationService(
engine=engine,
apply=True,
output=io.StringIO(),
tenant_ids=(fixture.primary.tenant_id,),
).migrate()
second_apply_state = {
table_name: fetch_table_rows(engine, table_name, tenant_id=fixture.primary.tenant_id)
for table_name in first_apply_state
}
assert second_apply_state == first_apply_state

View File

@ -1,12 +1,7 @@
import pytest
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
DifyPluginToolConfig,
DifyPluginToolsLayerConfig,
)
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LLM_LAYER_TYPE_ID
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID
from dify_agent.protocol import (
@ -19,7 +14,6 @@ from pydantic import ValidationError
from clients.agent_backend import (
AGENT_SOUL_PROMPT_LAYER_ID,
DIFY_EXECUTION_CONTEXT_LAYER_ID,
DIFY_PLUGIN_TOOLS_LAYER_ID,
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
AgentBackendModelConfig,
@ -106,33 +100,6 @@ def test_request_builder_sets_model_and_output_layer_contract_ids():
assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID
def test_request_builder_adds_dify_plugin_tools_layer_when_configured():
run_input = _run_input()
run_input.tools = DifyPluginToolsLayerConfig(
tools=[
DifyPluginToolConfig(
plugin_id="langgenius/time",
provider="time",
tool_name="current_time",
credential_type="unauthorized",
name="current_time",
description="Get current time.",
credentials={},
runtime_parameters={},
parameters=[],
parameters_json_schema={"type": "object", "properties": {}, "required": []},
)
]
)
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
layers = {layer.name: layer for layer in request.composition.layers}
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].type == DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config.tools[0].tool_name == "current_time"
def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths():
run_input = _run_input()
run_input.suspend_on_exit = True

File diff suppressed because it is too large Load Diff

View File

@ -1,454 +0,0 @@
"""Unit tests for the Node Output Inspector controller (Stage 4 §8).
The controller has two non-trivial moving parts:
1. :func:`_sse_envelope` — wire-format builder for the SSE ``{event, data, id}``
records (decision D-5).
2. :func:`_stream_inspector_events` — the SSE generator that fans the redis
pub/sub stream out as snapshot / node_changed / workflow_run_completed /
error events.
We exercise both as plain functions with mocked dependencies (service +
``inspector_events.subscribe``) — going through Flask routes would multiply
the test scaffolding without buying additional confidence in the core
behaviour.
The Resource classes themselves are trivial wrappers (``_service().method()``
+ ``_InspectorNotFound`` translation), and are touched here only by import so
codecov sees them as exercised; their detailed behaviour is locked down by
the service-level tests in
``tests/unit_tests/services/workflow/test_node_output_inspector_service.py``.
"""
from __future__ import annotations
import json
from collections.abc import Iterator
from typing import Any
from unittest.mock import MagicMock
from uuid import UUID
import pytest
from controllers.console.app import workflow_node_output_inspector as ctrl
from services.workflow.inspector_events import InspectorMessage
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputStatus,
NodeOutputsView,
NodeStatus,
WorkflowRunSnapshotView,
)
# ──────────────────────────────────────────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────────────────────────────────────────
@pytest.fixture
def app_model() -> Any:
"""A minimal ``App`` stub the controller passes through to the service.
The SSE generator never reads its attributes — just forwards it — so a
sentinel object is enough.
"""
return MagicMock(name="App", tenant_id="tenant-1", id="app-1")
@pytest.fixture
def run_id() -> UUID:
return UUID("00000000-0000-0000-0000-0000000000aa")
def _snapshot_view(*, status: str, node_id: str = "agent-1") -> WorkflowRunSnapshotView:
from graphon.enums import WorkflowExecutionStatus
return WorkflowRunSnapshotView(
workflow_run_id="00000000-0000-0000-0000-0000000000aa",
workflow_run_status=WorkflowExecutionStatus(status),
node_outputs=[
NodeOutputsView(
node_id=node_id,
node_kind="agent",
node_display_name="Greeter",
node_status=NodeStatus.RUNNING if status == "running" else NodeStatus.READY,
outputs=[],
)
],
)
def _node_view(*, node_id: str = "agent-1", node_status: NodeStatus = NodeStatus.READY) -> NodeOutputsView:
return NodeOutputsView(
node_id=node_id,
node_kind="agent",
node_display_name="Greeter",
node_status=node_status,
outputs=[],
)
# ──────────────────────────────────────────────────────────────────────────────
# _sse_envelope
# ──────────────────────────────────────────────────────────────────────────────
def test_sse_envelope_serializes_dict_payload():
out = ctrl._sse_envelope("snapshot", {"foo": "bar"}, 7)
lines = out.rstrip("\n").split("\n")
assert lines[0] == "event: snapshot"
assert lines[1] == "id: 7"
assert lines[2] == 'data: {"foo": "bar"}'
assert out.endswith("\n\n") # SSE record separator
def test_sse_envelope_passes_strings_through_unmodified():
"""A raw string payload (e.g. ``:keepalive``) is emitted as-is."""
out = ctrl._sse_envelope("snapshot", ":keepalive", 1)
assert "data: :keepalive\n" in out
def test_sse_envelope_handles_unicode_payload():
out = ctrl._sse_envelope("node_changed", {"name": "你好"}, 3)
assert "你好" in out # ensure_ascii=False
# ──────────────────────────────────────────────────────────────────────────────
# _stream_inspector_events — fast path (already-terminal run)
# ──────────────────────────────────────────────────────────────────────────────
def _drain(stream: Iterator[str]) -> list[str]:
return list(stream)
def _parse(record: str) -> tuple[str, dict | None]:
"""Pull ``event`` + ``data`` (json-decoded) out of one SSE record."""
event = None
data: dict | None = None
for line in record.rstrip("\n").split("\n"):
if line.startswith("event: "):
event = line[len("event: ") :]
elif line.startswith("data: "):
try:
data = json.loads(line[len("data: ") :])
except json.JSONDecodeError:
data = None
assert event is not None
return event, data
@pytest.fixture
def patch_service(monkeypatch: pytest.MonkeyPatch):
"""Replace ``_service()`` with a MagicMock per-test."""
fake = MagicMock()
monkeypatch.setattr(ctrl, "_service", lambda: fake)
return fake
@pytest.fixture
def patch_subscribe(monkeypatch: pytest.MonkeyPatch):
"""Patch the pub/sub subscribe iterator."""
def _make(messages: list[InspectorMessage | None]):
def _subscribe(workflow_run_id: str, *, timeout_seconds: float = 1.0):
for m in messages:
if m is None:
# heartbeat sentinel
yield InspectorMessage(
kind="node_changed",
workflow_run_id=workflow_run_id,
node_id=None,
status=None,
)
else:
yield m
monkeypatch.setattr(ctrl.inspector_events, "subscribe", _subscribe)
return _make
def test_stream_fast_path_when_run_already_terminal(patch_service, app_model, run_id):
"""A run that's already ``succeeded`` should produce ``snapshot`` →
``workflow_run_completed`` and close without subscribing to pub/sub."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="succeeded")
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
assert len(records) == 2
e0, d0 = _parse(records[0])
e1, d1 = _parse(records[1])
assert e0 == "snapshot"
assert d0 is not None
assert d0["workflow_run_status"] == "succeeded"
assert e1 == "workflow_run_completed"
assert d1 is not None
assert d1["workflow_run_status"] == "succeeded"
def test_stream_fast_path_each_terminal_status(patch_service, app_model, run_id):
"""All four terminal statuses take the fast-path. Note the enum value for
partial success is the hyphenated ``partial-succeeded``."""
for terminal in ("succeeded", "failed", "stopped", "partial-succeeded"):
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status=terminal)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
events = [_parse(r)[0] for r in records]
assert events == ["snapshot", "workflow_run_completed"], terminal
def test_stream_initial_404_propagates_before_any_bytes(patch_service, app_model, run_id):
"""``NodeOutputInspectorError`` on the initial snapshot must surface as the
controller's ``_InspectorNotFound`` exception so Flask returns HTTP 404
— not a half-streamed SSE body."""
patch_service.snapshot_workflow_run.side_effect = NodeOutputInspectorError(
"workflow_run_not_found", "Workflow run not found."
)
gen = ctrl._stream_inspector_events(app_model, run_id)
with pytest.raises(ctrl._InspectorNotFound) as exc:
next(gen)
assert exc.value.error_code == "workflow_run_not_found"
# ──────────────────────────────────────────────────────────────────────────────
# _stream_inspector_events — live path (run is running)
# ──────────────────────────────────────────────────────────────────────────────
def test_stream_live_emits_snapshot_then_node_changed_then_completion(
patch_service, patch_subscribe, app_model, run_id
):
"""Happy path: snapshot → 2× node_changed → workflow_run_completed."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.return_value = _node_view(node_id="agent-1")
msgs = [
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="succeeded"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
patch_subscribe(msgs)
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
assert events == ["snapshot", "node_changed", "node_changed", "workflow_run_completed"]
# node_detail should be called once per delta (not once per heartbeat)
assert patch_service.node_detail.call_count == 2
def test_stream_emits_heartbeat_after_n_idle_ticks(
patch_service, patch_subscribe, monkeypatch: pytest.MonkeyPatch, app_model, run_id
):
"""When pub/sub returns the heartbeat sentinel ``_HEARTBEAT_EVERY_TICKS``
times in a row, the generator emits a ``:keepalive`` SSE comment."""
monkeypatch.setattr(ctrl, "_HEARTBEAT_EVERY_TICKS", 2)
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.return_value = _node_view()
# 2 heartbeats → keepalive, then real message + completion.
patch_subscribe(
[
None,
None,
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="failed"),
]
)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
raw = "".join(records)
assert ":keepalive\n\n" in raw
assert "workflow_run_completed" in raw
def test_stream_hard_timeout_force_closes_without_terminal(
patch_service, patch_subscribe, monkeypatch: pytest.MonkeyPatch, app_model, run_id
):
"""If the engine crashes / drops the terminal event, the generator force-
closes after ``_STREAM_HARD_TIMEOUT_TICKS`` ticks rather than hanging."""
monkeypatch.setattr(ctrl, "_STREAM_HARD_TIMEOUT_TICKS", 3)
monkeypatch.setattr(ctrl, "_HEARTBEAT_EVERY_TICKS", 100) # avoid keepalive noise
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
# 5 heartbeats, no terminal → generator should bail after 3 ticks.
patch_subscribe([None] * 10)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
events = [_parse(r)[0] for r in records]
assert events == ["snapshot"] # only snapshot, then forced close
def test_stream_skips_messages_with_missing_node_id(patch_service, patch_subscribe, app_model, run_id):
"""Defensive: malformed node_changed without node_id is silently dropped."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_subscribe(
[
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="", status="running"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
)
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
assert events == ["snapshot", "workflow_run_completed"]
assert patch_service.node_detail.call_count == 0
def test_stream_skips_node_detail_404_without_breaking_stream(patch_service, patch_subscribe, app_model, run_id):
"""When node_detail 404s mid-stream (node still being persisted), the
generator just drops that delta and keeps streaming."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.side_effect = NodeOutputInspectorError("node_not_in_workflow_run", "transient")
patch_subscribe(
[
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
)
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
assert events == ["snapshot", "workflow_run_completed"]
def test_stream_emits_error_event_on_node_detail_unexpected_exception(
patch_service, patch_subscribe, app_model, run_id
):
"""Any non-Inspector exception (DB outage, JSON decode error) becomes a
user-visible ``error`` SSE record; the stream keeps running."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_service.node_detail.side_effect = RuntimeError("db gone")
patch_subscribe(
[
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
]
)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
events = [_parse(r) for r in records]
kinds = [e for e, _ in events]
assert kinds == ["snapshot", "error", "workflow_run_completed"]
err_event, err_data = events[1]
assert err_data is not None
assert err_data["node_id"] == "agent-1"
assert "failed" in err_data["message"]
def test_stream_workflow_completed_status_falls_back_to_unknown(patch_service, patch_subscribe, app_model, run_id):
"""If the pub/sub message arrives with status=None (publish race), the SSE
payload still carries ``workflow_run_status`` with the ``unknown``
sentinel so the frontend never sees a missing field."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
patch_subscribe(
[InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status=None)]
)
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
e, d = _parse(records[-1])
assert e == "workflow_run_completed"
assert d is not None
assert d["workflow_run_status"] == "unknown"
# ──────────────────────────────────────────────────────────────────────────────
# Resource classes — import-level smoke + service-method delegation
# ──────────────────────────────────────────────────────────────────────────────
def test_resource_classes_are_registered():
"""All 8 Inspector Resource classes must be importable from the module so
flask-restx can discover them via the namespace decorators."""
for name in (
"WorkflowDraftRunNodeOutputsApi",
"WorkflowDraftRunNodeOutputDetailApi",
"WorkflowDraftRunNodeOutputPreviewApi",
"WorkflowDraftRunNodeOutputEventsApi",
"WorkflowPublishedRunNodeOutputsApi",
"WorkflowPublishedRunNodeOutputDetailApi",
"WorkflowPublishedRunNodeOutputPreviewApi",
"WorkflowPublishedRunNodeOutputEventsApi",
):
assert hasattr(ctrl, name), name
def test_inspector_not_found_preserves_error_code():
"""Sanity: the controller's bespoke 404 wrapper hangs onto the
Inspector's specific error code rather than collapsing to a generic
``not_found``."""
err = NodeOutputInspectorError("node_not_in_workflow_run", "boom")
wrapped = ctrl._InspectorNotFound(err)
assert wrapped.error_code == "node_not_in_workflow_run"
assert wrapped.code == 404
# ──────────────────────────────────────────────────────────────────────────────
# _serve_* — shared REST handler bodies (covered by both draft + published)
# ──────────────────────────────────────────────────────────────────────────────
def test_serve_snapshot_happy_path(patch_service, app_model, run_id):
"""Returns the snapshot view as JSON-serialisable dict."""
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
result = ctrl._serve_snapshot(app_model, run_id)
assert isinstance(result, dict)
assert result["workflow_run_id"] == "00000000-0000-0000-0000-0000000000aa"
patch_service.snapshot_workflow_run.assert_called_once_with(app_model=app_model, workflow_run_id=str(run_id))
def test_serve_snapshot_translates_inspector_error_to_404(patch_service, app_model, run_id):
"""``NodeOutputInspectorError`` becomes the controller's 404 wrapper with
the specific ``error_code`` preserved."""
patch_service.snapshot_workflow_run.side_effect = NodeOutputInspectorError("workflow_run_not_found", "no such run")
with pytest.raises(ctrl._InspectorNotFound) as exc:
ctrl._serve_snapshot(app_model, run_id)
assert exc.value.error_code == "workflow_run_not_found"
def test_serve_node_detail_happy_path(patch_service, app_model, run_id):
patch_service.node_detail.return_value = _node_view(node_id="agent-1")
result = ctrl._serve_node_detail(app_model, run_id, "agent-1")
assert result["node_id"] == "agent-1"
patch_service.node_detail.assert_called_once_with(
app_model=app_model, workflow_run_id=str(run_id), node_id="agent-1"
)
def test_serve_node_detail_translates_inspector_error(patch_service, app_model, run_id):
patch_service.node_detail.side_effect = NodeOutputInspectorError("node_not_in_workflow_run", "missing")
with pytest.raises(ctrl._InspectorNotFound) as exc:
ctrl._serve_node_detail(app_model, run_id, "ghost")
assert exc.value.error_code == "node_not_in_workflow_run"
def test_serve_output_preview_happy_path(patch_service, app_model, run_id):
from services.workflow.node_output_inspector_service import (
DeclaredOutputType,
OutputPreviewView,
)
patch_service.output_preview.return_value = OutputPreviewView(
node_id="agent-1",
output_name="text",
type=DeclaredOutputType.STRING,
status=NodeOutputStatus.READY,
value="Hello",
)
result = ctrl._serve_output_preview(app_model, run_id, "agent-1", "text")
assert result["value"] == "Hello"
assert result["status"] == "ready"
patch_service.output_preview.assert_called_once_with(
app_model=app_model,
workflow_run_id=str(run_id),
node_id="agent-1",
output_name="text",
)
def test_serve_output_preview_translates_inspector_error(patch_service, app_model, run_id):
patch_service.output_preview.side_effect = NodeOutputInspectorError("node_output_not_declared", "no such output")
with pytest.raises(ctrl._InspectorNotFound) as exc:
ctrl._serve_output_preview(app_model, run_id, "agent-1", "phantom")
assert exc.value.error_code == "node_output_not_declared"
# ──────────────────────────────────────────────────────────────────────────────
# Note: the Resource ``.get`` methods themselves (6 REST + 2 SSE) are
# 1-line delegators to the helpers above. They can't be called directly in a
# unit test because their decorator stack (``@setup_required`` /
# ``@login_required`` / ``@account_initialization_required`` /
# ``@get_app_model``) needs a real Flask request context + DB-backed account.
# The integration test in
# ``tests/integration_tests/services/test_node_output_inspector_service.py``
# (and the E2E driver in /tmp/e2e_inspector_sse_published.py) exercise them
# through the HTTP stack.
# ──────────────────────────────────────────────────────────────────────────────

View File

@ -73,13 +73,14 @@ class TestMessageListApi:
"/",
query_string={"conversation_id": "11111111-1111-1111-1111-111111111111"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"pagination_by_first_id",
return_value=pagination,
),
):
result = method(MagicMock(), installed_app)
result = method(installed_app)
assert result["limit"] == 20
assert result["has_more"] is False
@ -92,8 +93,9 @@ class TestMessageListApi:
installed_app = MagicMock()
installed_app.app = MagicMock(mode="completion")
with pytest.raises(NotChatAppError):
method(MagicMock(), installed_app)
with patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)):
with pytest.raises(NotChatAppError):
method(installed_app)
def test_conversation_not_exists(self, app: Flask):
api = module.MessageListApi()
@ -107,6 +109,7 @@ class TestMessageListApi:
"/",
query_string={"conversation_id": "11111111-1111-1111-1111-111111111111"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"pagination_by_first_id",
@ -114,7 +117,7 @@ class TestMessageListApi:
),
):
with pytest.raises(NotFound):
method(MagicMock(), installed_app)
method(installed_app)
def test_first_message_not_exists(self, app: Flask):
api = module.MessageListApi()
@ -128,6 +131,7 @@ class TestMessageListApi:
"/",
query_string={"conversation_id": "11111111-1111-1111-1111-111111111111"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"pagination_by_first_id",
@ -135,7 +139,7 @@ class TestMessageListApi:
),
):
with pytest.raises(NotFound):
method(MagicMock(), installed_app)
method(installed_app)
class TestMessageFeedbackApi:
@ -148,12 +152,13 @@ class TestMessageFeedbackApi:
with (
app.test_request_context("/", json={"rating": "like"}),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"create_feedback",
),
):
result = method(MagicMock(), installed_app, "mid")
result = method(installed_app, "mid")
assert result["result"] == "success"
@ -166,6 +171,7 @@ class TestMessageFeedbackApi:
with (
app.test_request_context("/", json={}),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"create_feedback",
@ -173,7 +179,7 @@ class TestMessageFeedbackApi:
),
):
with pytest.raises(NotFound):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
class TestMessageMoreLikeThisApi:
@ -189,6 +195,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -200,7 +207,7 @@ class TestMessageMoreLikeThisApi:
return_value=("ok", 200),
),
):
resp = method(MagicMock(), installed_app, "mid")
resp = method(installed_app, "mid")
assert resp == ("ok", 200)
@ -211,8 +218,9 @@ class TestMessageMoreLikeThisApi:
installed_app = MagicMock()
installed_app.app = MagicMock(mode="chat")
with pytest.raises(NotCompletionAppError):
method(MagicMock(), installed_app, "mid")
with patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)):
with pytest.raises(NotCompletionAppError):
method(installed_app, "mid")
def test_more_like_this_disabled(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -226,6 +234,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -233,7 +242,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(AppMoreLikeThisDisabledError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_message_not_exists_more_like_this(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -247,6 +256,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -254,7 +264,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(NotFound):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_provider_not_init_more_like_this(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -268,6 +278,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -275,7 +286,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(ProviderNotInitializeError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_quota_exceeded_more_like_this(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -289,6 +300,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -296,7 +308,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(ProviderQuotaExceededError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_model_not_support_more_like_this(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -310,6 +322,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -317,7 +330,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(ProviderModelCurrentlyNotSupportError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_invoke_error_more_like_this(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -331,6 +344,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -338,7 +352,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(CompletionRequestError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_unexpected_error_more_like_this(self, app: Flask):
api = module.MessageMoreLikeThisApi()
@ -352,6 +366,7 @@ class TestMessageMoreLikeThisApi:
"/",
query_string={"response_mode": "blocking"},
),
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.AppGenerateService,
"generate_more_like_this",
@ -359,7 +374,7 @@ class TestMessageMoreLikeThisApi:
),
):
with pytest.raises(InternalServerError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
class TestMessageSuggestedQuestionApi:
@ -371,13 +386,14 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
return_value=["q1", "q2"],
),
):
result = method(MagicMock(), installed_app, "mid")
result = method(installed_app, "mid")
assert result["data"] == ["q1", "q2"]
@ -388,8 +404,9 @@ class TestMessageSuggestedQuestionApi:
installed_app = MagicMock()
installed_app.app = MagicMock(mode="completion")
with pytest.raises(NotChatAppError):
method(MagicMock(), installed_app, "mid")
with patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)):
with pytest.raises(NotChatAppError):
method(installed_app, "mid")
def test_disabled(self):
api = module.MessageSuggestedQuestionApi()
@ -399,6 +416,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -406,7 +424,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(AppSuggestedQuestionsAfterAnswerDisabledError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_message_not_exists_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -416,6 +434,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -423,7 +442,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(NotFound):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_conversation_not_exists_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -433,6 +452,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -440,7 +460,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(NotFound):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_provider_not_init_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -450,6 +470,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -457,7 +478,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(ProviderNotInitializeError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_quota_exceeded_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -467,6 +488,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -474,7 +496,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(ProviderQuotaExceededError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_model_not_support_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -484,6 +506,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -491,7 +514,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(ProviderModelCurrentlyNotSupportError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_invoke_error_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -501,6 +524,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -508,7 +532,7 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(CompletionRequestError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")
def test_unexpected_error_suggested_question(self):
api = module.MessageSuggestedQuestionApi()
@ -518,6 +542,7 @@ class TestMessageSuggestedQuestionApi:
installed_app.app = MagicMock(mode="chat")
with (
patch.object(module, "current_account_with_tenant", return_value=(MagicMock(), None)),
patch.object(
module.MessageService,
"get_suggested_questions_after_answer",
@ -525,4 +550,4 @@ class TestMessageSuggestedQuestionApi:
),
):
with pytest.raises(InternalServerError):
method(MagicMock(), installed_app, "mid")
method(installed_app, "mid")

View File

@ -54,6 +54,7 @@ def _masked_api_key(api_key: str) -> str:
def _mock_console_guards(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
"""Bypass console decorators so handlers can run in isolation."""
import controllers.console.extension as extension_module
from controllers.console import wraps as wraps_module
account = MagicMock()
@ -65,6 +66,7 @@ def _mock_console_guards(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
monkeypatch.setattr(wraps_module.dify_config, "EDITION", "CLOUD")
monkeypatch.setattr("libs.login.dify_config.LOGIN_DISABLED", True)
monkeypatch.delenv("INIT_PASSWORD", raising=False)
monkeypatch.setattr(extension_module, "current_account_with_tenant", lambda: (account, "tenant-123"))
monkeypatch.setattr(wraps_module, "current_account_with_tenant", lambda: (account, "tenant-123"))
# The login_required decorator consults the shared LocalProxy in libs.login.

View File

@ -15,6 +15,11 @@ class TestFeatureApi:
def test_get_tenant_features_success(self, mocker: MockerFixture):
from controllers.console.feature import FeatureApi
mocker.patch(
"controllers.console.feature.current_account_with_tenant",
return_value=("account_id", "tenant_123"),
)
get_features = mocker.patch("controllers.console.feature.FeatureService.get_features")
get_features.return_value.model_dump.return_value = {
"features": {"feature_a": True},
@ -24,7 +29,7 @@ class TestFeatureApi:
api = FeatureApi()
raw_get = unwrap(FeatureApi.get)
result = raw_get(api, "tenant_123")
result = raw_get(api)
assert result == {"features": {"feature_a": True}}
get_features.assert_called_once_with("tenant_123", exclude_vector_space=True)
@ -34,13 +39,18 @@ class TestFeatureVectorSpaceApi:
def test_get_vector_space_success(self, mocker: MockerFixture):
from controllers.console.feature import FeatureVectorSpaceApi
mocker.patch(
"controllers.console.feature.current_account_with_tenant",
return_value=("account_id", "tenant_123"),
)
get_vector_space = mocker.patch("controllers.console.feature.FeatureService.get_vector_space")
get_vector_space.return_value.model_dump.return_value = {"size": 5120, "limit": 20480}
api = FeatureVectorSpaceApi()
raw_get = unwrap(FeatureVectorSpaceApi.get)
result = raw_get(api, "tenant_123")
result = raw_get(api)
assert result == {"size": 5120, "limit": 20480}
get_vector_space.assert_called_once_with("tenant_123")

View File

@ -59,8 +59,12 @@ def mock_current_user():
@pytest.fixture
def mock_current_tenant_id():
return "tenant-123"
def mock_account_context(mock_current_user):
with patch(
"controllers.console.files.current_account_with_tenant",
return_value=(mock_current_user, None),
):
yield
@pytest.fixture
@ -91,15 +95,15 @@ class TestFileApiGet:
class TestFileApiPost:
def test_no_file_uploaded(self, app: Flask, mock_current_user):
def test_no_file_uploaded(self, app: Flask, mock_account_context):
api = FileApi()
post_method = unwrap(api.post)
with app.test_request_context(method="POST", data={}):
with pytest.raises(NoFileUploadedError):
post_method(api, mock_current_user)
post_method(api)
def test_too_many_files(self, app: Flask, mock_current_user):
def test_too_many_files(self, app: Flask, mock_account_context):
api = FileApi()
post_method = unwrap(api.post)
@ -114,9 +118,9 @@ class TestFileApiPost:
mock_request.form.get.return_value = None
with pytest.raises(TooManyFilesError):
post_method(api, mock_current_user)
post_method(api)
def test_filename_missing(self, app: Flask, mock_current_user):
def test_filename_missing(self, app: Flask, mock_account_context):
api = FileApi()
post_method = unwrap(api.post)
@ -126,24 +130,28 @@ class TestFileApiPost:
with app.test_request_context(method="POST", data=data):
with pytest.raises(FilenameNotExistsError):
post_method(api, mock_current_user)
post_method(api)
def test_dataset_upload_without_permission(self, app: Flask, mock_current_user):
mock_current_user.is_dataset_editor = False
api = FileApi()
post_method = unwrap(api.post)
with patch(
"controllers.console.files.current_account_with_tenant",
return_value=(mock_current_user, None),
):
api = FileApi()
post_method = unwrap(api.post)
data = {
"file": (io.BytesIO(b"abc"), "test.txt"),
"source": "datasets",
}
data = {
"file": (io.BytesIO(b"abc"), "test.txt"),
"source": "datasets",
}
with app.test_request_context(method="POST", data=data):
with pytest.raises(Forbidden):
post_method(api, mock_current_user)
with app.test_request_context(method="POST", data=data):
with pytest.raises(Forbidden):
post_method(api)
def test_successful_upload(self, app: Flask, mock_current_user, mock_file_service):
def test_successful_upload(self, app: Flask, mock_account_context, mock_file_service):
api = FileApi()
post_method = unwrap(api.post)
@ -171,13 +179,13 @@ class TestFileApiPost:
}
with app.test_request_context(method="POST", data=data):
response, status = post_method(api, mock_current_user)
response, status = post_method(api)
assert status == 201
assert response["id"] == "file-id-123"
assert response["name"] == "test.txt"
def test_upload_with_invalid_source(self, app: Flask, mock_current_user, mock_file_service):
def test_upload_with_invalid_source(self, app: Flask, mock_account_context, mock_file_service):
"""Test that invalid source parameter gets normalized to None"""
api = FileApi()
post_method = unwrap(api.post)
@ -208,7 +216,7 @@ class TestFileApiPost:
}
with app.test_request_context(method="POST", data=data):
response, status = post_method(api, mock_current_user)
response, status = post_method(api)
assert status == 201
assert response["id"] == "file-id-456"
@ -217,7 +225,7 @@ class TestFileApiPost:
call_kwargs = mock_file_service.upload_file.call_args[1]
assert call_kwargs["source"] is None
def test_file_too_large_error(self, app: Flask, mock_current_user, mock_file_service):
def test_file_too_large_error(self, app: Flask, mock_account_context, mock_file_service):
api = FileApi()
post_method = unwrap(api.post)
@ -232,9 +240,9 @@ class TestFileApiPost:
with app.test_request_context(method="POST", data=data):
with pytest.raises(FileTooLargeError):
post_method(api, mock_current_user)
post_method(api)
def test_unsupported_file_type(self, app: Flask, mock_current_user, mock_file_service):
def test_unsupported_file_type(self, app: Flask, mock_account_context, mock_file_service):
api = FileApi()
post_method = unwrap(api.post)
@ -249,9 +257,9 @@ class TestFileApiPost:
with app.test_request_context(method="POST", data=data):
with pytest.raises(UnsupportedFileTypeError):
post_method(api, mock_current_user)
post_method(api)
def test_blocked_extension(self, app: Flask, mock_current_user, mock_file_service):
def test_blocked_extension(self, app: Flask, mock_account_context, mock_file_service):
api = FileApi()
post_method = unwrap(api.post)
@ -266,17 +274,17 @@ class TestFileApiPost:
with app.test_request_context(method="POST", data=data):
with pytest.raises(BlockedFileExtensionError):
post_method(api, mock_current_user)
post_method(api)
class TestFilePreviewApi:
def test_get_preview(self, app: Flask, mock_current_tenant_id, mock_file_service):
def test_get_preview(self, app: Flask, mock_account_context, mock_file_service):
api = FilePreviewApi()
get_method = unwrap(api.get)
mock_file_service.get_file_preview.return_value = "preview text"
with app.test_request_context():
result = get_method(api, mock_current_tenant_id, "1234")
result = get_method(api, "1234")
assert result == {"content": "preview text"}

View File

@ -48,7 +48,6 @@ def _mock_upload_dependencies(
*,
file_size_within_limit: bool = True,
):
current_user = SimpleNamespace(id="u1")
file_info = SimpleNamespace(
filename="report.txt",
extension=".txt",
@ -64,6 +63,7 @@ def _mock_upload_dependencies(
file_service_cls = MagicMock()
file_service_cls.is_file_size_within_limit.return_value = file_size_within_limit
monkeypatch.setattr(remote_files_module, "FileService", file_service_cls)
monkeypatch.setattr(remote_files_module, "current_account_with_tenant", lambda: (SimpleNamespace(id="u1"), None))
monkeypatch.setattr(remote_files_module, "db", SimpleNamespace(engine=object()))
monkeypatch.setattr(
remote_files_module.file_helpers,
@ -71,7 +71,7 @@ def _mock_upload_dependencies(
lambda upload_file_id: f"https://signed.example/{upload_file_id}",
)
return file_service_cls, current_user
return file_service_cls
def test_get_remote_file_info_uses_head_when_successful(app, monkeypatch: pytest.MonkeyPatch) -> None:
@ -147,7 +147,7 @@ def test_remote_file_upload_success_when_fetch_falls_back_to_get(app, monkeypatc
get_mock = MagicMock(return_value=get_resp)
monkeypatch.setattr(remote_files_module.ssrf_proxy, "get", get_mock)
file_service_cls, current_user = _mock_upload_dependencies(monkeypatch)
file_service_cls = _mock_upload_dependencies(monkeypatch)
upload_file = SimpleNamespace(
id="file-1",
name="report.txt",
@ -160,7 +160,7 @@ def test_remote_file_upload_success_when_fetch_falls_back_to_get(app, monkeypatc
file_service_cls.return_value.upload_file.return_value = upload_file
with app.test_request_context(method="POST", json={"url": url}):
payload, status = handler(api, current_user)
payload, status = handler(api)
assert status == 201
assert payload["id"] == "file-1"
@ -170,7 +170,7 @@ def test_remote_file_upload_success_when_fetch_falls_back_to_get(app, monkeypatc
filename="report.txt",
content=b"fallback-content",
mimetype="text/plain",
user=current_user,
user=SimpleNamespace(id="u1"),
source_url=url,
)
@ -191,7 +191,7 @@ def test_remote_file_upload_fetches_content_with_second_get_when_head_succeeds(
get_mock = MagicMock(return_value=extra_get_resp)
monkeypatch.setattr(remote_files_module.ssrf_proxy, "get", get_mock)
file_service_cls, current_user = _mock_upload_dependencies(monkeypatch)
file_service_cls = _mock_upload_dependencies(monkeypatch)
upload_file = SimpleNamespace(
id="file-2",
name="photo.jpg",
@ -204,7 +204,7 @@ def test_remote_file_upload_fetches_content_with_second_get_when_head_succeeds(
file_service_cls.return_value.upload_file.return_value = upload_file
with app.test_request_context(method="POST", json={"url": url}):
payload, status = handler(api, current_user)
payload, status = handler(api)
assert status == 201
assert payload["id"] == "file-2"
@ -226,7 +226,7 @@ def test_remote_file_upload_raises_when_fallback_get_still_not_ok(app, monkeypat
with app.test_request_context(method="POST", json={"url": url}):
with pytest.raises(RemoteFileUploadError, match=f"Failed to fetch file from {url}: bad gateway"):
handler(api, SimpleNamespace(id="u1"))
handler(api)
def test_remote_file_upload_raises_on_httpx_request_error(app, monkeypatch: pytest.MonkeyPatch) -> None:
@ -243,7 +243,7 @@ def test_remote_file_upload_raises_on_httpx_request_error(app, monkeypatch: pyte
with app.test_request_context(method="POST", json={"url": url}):
with pytest.raises(RemoteFileUploadError, match=f"Failed to fetch file from {url}: network down"):
handler(api, SimpleNamespace(id="u1"))
handler(api)
def test_remote_file_upload_rejects_oversized_file(app, monkeypatch: pytest.MonkeyPatch) -> None:
@ -258,11 +258,11 @@ def test_remote_file_upload_rejects_oversized_file(app, monkeypatch: pytest.Monk
)
monkeypatch.setattr(remote_files_module.ssrf_proxy, "get", MagicMock())
_, current_user = _mock_upload_dependencies(monkeypatch, file_size_within_limit=False)
_mock_upload_dependencies(monkeypatch, file_size_within_limit=False)
with app.test_request_context(method="POST", json={"url": url}):
with pytest.raises(FileTooLargeError):
handler(api, current_user)
handler(api)
def test_remote_file_upload_translates_service_file_too_large_error(app, monkeypatch: pytest.MonkeyPatch) -> None:
@ -276,12 +276,12 @@ def test_remote_file_upload_translates_service_file_too_large_error(app, monkeyp
MagicMock(return_value=_FakeResponse(status_code=200, method="GET", content=b"payload")),
)
monkeypatch.setattr(remote_files_module.ssrf_proxy, "get", MagicMock())
file_service_cls, current_user = _mock_upload_dependencies(monkeypatch)
file_service_cls = _mock_upload_dependencies(monkeypatch)
file_service_cls.return_value.upload_file.side_effect = ServiceFileTooLargeError("size exceeded")
with app.test_request_context(method="POST", json={"url": url}):
with pytest.raises(FileTooLargeError, match="size exceeded"):
handler(api, current_user)
handler(api)
def test_remote_file_upload_translates_service_unsupported_type_error(app, monkeypatch: pytest.MonkeyPatch) -> None:
@ -295,9 +295,9 @@ def test_remote_file_upload_translates_service_unsupported_type_error(app, monke
MagicMock(return_value=_FakeResponse(status_code=200, method="GET", content=b"payload")),
)
monkeypatch.setattr(remote_files_module.ssrf_proxy, "get", MagicMock())
file_service_cls, current_user = _mock_upload_dependencies(monkeypatch)
file_service_cls = _mock_upload_dependencies(monkeypatch)
file_service_cls.return_value.upload_file.side_effect = ServiceUnsupportedFileTypeError()
with app.test_request_context(method="POST", json={"url": url}):
with pytest.raises(UnsupportedFileTypeError):
handler(api, current_user)
handler(api)

View File

@ -1,192 +0,0 @@
"""Verify the workflow persistence layer fans Inspector deltas to redis pub/sub.
The hook lives in ``core/app/workflow/layers/persistence.py``:
every ``_handle_node_*`` and the terminal ``_handle_graph_run_*`` handlers
call into ``services.workflow.inspector_events.publish_node_changed`` /
``publish_workflow_completed`` after the DB write succeeds. Those calls are
the only thing the Inspector SSE stream listens to, so any future refactor of
the persistence layer must keep them in place.
We don't reconstruct a full workflow engine here — the handlers are tested
in isolation by patching just the moving parts they touch
(``_workflow_execution`` + ``_node_execution_cache``) and asserting against
the publisher module's call sites. This keeps the test compact and tied to
the contract, not the implementation.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from unittest.mock import MagicMock
import pytest
from core.app.workflow.layers import persistence as persistence_mod
from core.app.workflow.layers.persistence import WorkflowPersistenceLayer
@pytest.fixture
def layer() -> WorkflowPersistenceLayer:
"""Build a layer instance with all repository / trace deps stubbed.
We bypass ``__init__`` because constructing it for real pulls in the
workflow engine's app-generate-entity, repos, and a runtime state — none
of which matter for asserting that the publish-hook fires.
"""
instance = WorkflowPersistenceLayer.__new__(WorkflowPersistenceLayer)
# Minimum surface the handlers touch:
instance._workflow_execution_repository = MagicMock()
instance._workflow_node_execution_repository = MagicMock()
instance._trace_manager = None
instance._workflow_info = MagicMock(workflow_id="wf-1")
instance._application_generate_entity = MagicMock()
# Use a SimpleNamespace-like spec so Pydantic-validated callsites (e.g.
# ``WorkflowNodeExecution.new`` requires real strings) get the right types.
workflow_execution = MagicMock()
workflow_execution.id_ = "run-1"
workflow_execution.workflow_id = "wf-1"
workflow_execution.status = MagicMock(value="succeeded")
workflow_execution.outputs = {}
workflow_execution.error_message = None
workflow_execution.exceptions_count = 0
workflow_execution.finished_at = None
instance._workflow_execution = workflow_execution
instance._node_execution_cache = {}
instance._node_snapshots = {}
instance._node_sequence = 0
# `graph_runtime_state` is a layer-base property; stub it.
instance._graph_runtime_state = MagicMock(total_tokens=0, node_run_steps=0, outputs={}, exceptions_count=0)
return instance
@pytest.fixture
def capture_publishes(monkeypatch: pytest.MonkeyPatch) -> dict[str, list]:
"""Replace the two publishers with capture lists so each test can assert
on the exact arguments."""
calls: dict[str, list] = {"node": [], "workflow": []}
def fake_node(*, workflow_run_id: str, node_id: str, status: str) -> None:
calls["node"].append({"workflow_run_id": workflow_run_id, "node_id": node_id, "status": status})
def fake_workflow(*, workflow_run_id: str, status: str) -> None:
calls["workflow"].append({"workflow_run_id": workflow_run_id, "status": status})
monkeypatch.setattr(persistence_mod, "_inspector_publish_node_changed", fake_node)
monkeypatch.setattr(persistence_mod, "_inspector_publish_workflow_completed", fake_workflow)
return calls
# ──────────────────────────────────────────────────────────────────────────────
# Graph-level publish hooks
# ──────────────────────────────────────────────────────────────────────────────
def _graph_event(**kwargs: Any) -> MagicMock:
return MagicMock(**kwargs)
def test_graph_run_succeeded_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="succeeded")
layer._handle_graph_run_succeeded(_graph_event(outputs={"text": "hi"}))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "succeeded"}]
assert capture_publishes["node"] == []
def test_graph_run_partial_succeeded_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="partial-succeeded")
layer._handle_graph_run_partial_succeeded(_graph_event(outputs={}, exceptions_count=1))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "partial-succeeded"}]
def test_graph_run_failed_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="failed")
layer._handle_graph_run_failed(_graph_event(error="boom", exceptions_count=0))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "failed"}]
def test_graph_run_aborted_publishes_workflow_completed(layer, capture_publishes):
layer._workflow_execution.status = MagicMock(value="stopped")
layer._handle_graph_run_aborted(_graph_event(reason="user stop"))
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "stopped"}]
def test_graph_run_paused_does_not_publish_completion(layer, capture_publishes):
"""Pause is not a terminal state — the Inspector keeps waiting for either
resume or a real terminal event."""
layer._handle_graph_run_paused(_graph_event(outputs={}))
assert capture_publishes["workflow"] == []
assert capture_publishes["node"] == []
# ──────────────────────────────────────────────────────────────────────────────
# Node-level publish hooks
# ──────────────────────────────────────────────────────────────────────────────
def _node_started_event(node_id: str = "agent-1", exec_id: str = "exec-1") -> MagicMock:
return MagicMock(
id=exec_id,
node_id=node_id,
node_type="agent",
node_title="Greeter",
predecessor_node_id=None,
in_iteration_id=None,
in_loop_id=None,
start_at=datetime(2026, 5, 26, 0, 0, 0),
)
def _seed_node_execution(layer: WorkflowPersistenceLayer, exec_id: str, node_id: str) -> None:
"""Inject a domain execution into the cache so the success / fail / etc
handlers (which look it up by id) can run without going through started."""
layer._node_execution_cache[exec_id] = MagicMock(
id=exec_id, node_id=node_id, status=MagicMock(value="running"), outputs={}, error=None
)
def test_node_started_publishes_running(layer, capture_publishes):
layer._handle_node_started(_node_started_event())
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "running"}]
def test_node_retry_publishes_retry(layer, capture_publishes):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
event = MagicMock(id="exec-1", error="rate limit")
layer._handle_node_retry(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "retry"}]
def test_node_succeeded_publishes_succeeded(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
# Stub the inner _update_node_execution so we don't have to construct a
# full NodeRunResult — we only want to confirm the publish happens after.
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock(), finished_at=datetime.now())
layer._handle_node_succeeded(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "succeeded"}]
def test_node_failed_publishes_failed(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock(), error="bad", finished_at=datetime.now())
layer._handle_node_failed(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "failed"}]
def test_node_exception_publishes_exception(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock(), error="oom", finished_at=datetime.now())
layer._handle_node_exception(event)
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "exception"}]
def test_node_pause_requested_does_not_publish(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
"""Node pause is not an Inspector-visible state — no publish."""
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
event = MagicMock(id="exec-1", node_run_result=MagicMock())
layer._handle_node_pause_requested(event)
assert capture_publishes["node"] == []

View File

@ -1,439 +0,0 @@
from __future__ import annotations
from collections.abc import Generator
from typing import Any
import pytest
from core.agent.entities import AgentToolEntity
from core.app.entities.app_invoke_entities import InvokeFrom
from core.tools.__base.tool import Tool
from core.tools.__base.tool_runtime import ToolRuntime
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_entities import (
ToolDescription,
ToolEntity,
ToolIdentity,
ToolInvokeMessage,
ToolParameter,
)
from core.workflow.nodes.agent_v2.plugin_tools_builder import (
WorkflowAgentPluginToolsBuilder,
WorkflowAgentPluginToolsBuildError,
)
from models.agent_config_entities import AgentSoulToolsConfig
class FakeRuntimeProvider:
def __init__(self, tool: Tool | Exception) -> None:
# Either a Tool to hand back, or an exception to raise on lookup. The
# latter lets tests exercise the error-mapping branches in
# ``WorkflowAgentPluginToolsBuilder._fetch_tool_runtime``.
self.tool = tool
self.last_agent_tool: AgentToolEntity | None = None
self.last_invoke_from: InvokeFrom | None = None
def get_agent_tool_runtime(
self,
tenant_id: str,
app_id: str,
agent_tool: AgentToolEntity,
user_id: str | None = None,
invoke_from: InvokeFrom = InvokeFrom.DEBUGGER,
variable_pool: Any | None = None,
) -> Tool:
self.last_agent_tool = agent_tool
self.last_invoke_from = invoke_from
if isinstance(self.tool, Exception):
raise self.tool
return self.tool
class FakeTool(Tool):
def tool_provider_type(self):
raise NotImplementedError
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: str | None = None,
app_id: str | None = None,
message_id: str | None = None,
) -> ToolInvokeMessage | list[ToolInvokeMessage] | Generator[ToolInvokeMessage, None, None]:
raise NotImplementedError
def _tool(*, runtime_parameters: dict[str, Any] | None = None) -> FakeTool:
if runtime_parameters is None:
runtime_parameters = {"region": "us"}
parameters = [
ToolParameter(
name="query",
label=I18nObject(en_US="Query"),
type=ToolParameter.ToolParameterType.STRING,
form=ToolParameter.ToolParameterForm.LLM,
required=True,
llm_description="Search query",
),
ToolParameter(
name="region",
label=I18nObject(en_US="Region"),
type=ToolParameter.ToolParameterType.STRING,
form=ToolParameter.ToolParameterForm.FORM,
required=True,
),
]
entity = ToolEntity(
identity=ToolIdentity(
author="langgenius",
name="search",
label=I18nObject(en_US="Search"),
provider="search",
),
description=ToolDescription(human=I18nObject(en_US="Search"), llm="Search the web."),
parameters=parameters,
)
runtime = ToolRuntime(
tenant_id="tenant-1",
user_id="user-1",
credentials={"api_key": "secret"},
runtime_parameters=runtime_parameters,
)
return FakeTool(entity=entity, runtime=runtime)
def _build(
builder: WorkflowAgentPluginToolsBuilder,
tools: AgentSoulToolsConfig,
*,
invoke_from: InvokeFrom = InvokeFrom.DEBUGGER,
):
"""Shorthand for ``builder.build(...)`` with the standard tenant/app/user
triple, so each test only highlights what's actually unique to it."""
return builder.build(
tenant_id="tenant-1",
app_id="app-1",
user_id="user-1",
tools=tools,
invoke_from=invoke_from,
)
def test_builds_dify_plugin_tools_layer_from_existing_tool_runtime():
runtime_provider = FakeRuntimeProvider(_tool())
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
}
]
}
)
result = _build(builder, tools)
assert result is not None
prepared = result.tools[0]
assert prepared.plugin_id == "langgenius/search"
assert prepared.provider == "search"
assert prepared.tool_name == "search"
assert prepared.name == "search"
assert prepared.credentials == {"api_key": "secret"}
assert prepared.runtime_parameters == {"region": "us"}
assert prepared.parameters_json_schema["properties"]["query"]["type"] == "string"
assert "region" not in prepared.parameters_json_schema["properties"]
assert runtime_provider.last_agent_tool is not None
assert runtime_provider.last_agent_tool.credential_id == "credential-1"
# Default ``provider_type`` is now ``"plugin"`` — the agent tool entity
# must surface that so ToolManager hits the plugin provider table, not the
# built-in legacy table.
assert runtime_provider.last_agent_tool.provider_type.value == "plugin"
def test_rejects_duplicate_exposed_tool_names():
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(_tool()))
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
},
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
},
]
}
)
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
_build(builder, tools)
assert exc_info.value.error_code == "agent_tool_name_duplicated"
def test_rejects_missing_required_runtime_parameter():
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(_tool(runtime_parameters={})))
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
}
]
}
)
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
_build(builder, tools)
assert exc_info.value.error_code == "agent_tool_runtime_parameter_missing"
# ──────────────────────────────────────────────────────────────────────────────
# invoke_from is threaded through to ToolManager
# ──────────────────────────────────────────────────────────────────────────────
def test_invoke_from_is_forwarded_to_tool_runtime_provider():
"""``WorkflowAgentRuntimeRequestBuilder`` passes the *real* runtime
invocation source (DEBUGGER for draft test run, SERVICE_API for published
run, etc.). ToolManager uses ``invoke_from`` for credential quotas / rate
limits / audit tags, so any default-falling-back here would silently
misattribute usage. Lock in the forwarding behaviour for both
representative invoke_from values."""
for invoke_from in (InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API, InvokeFrom.WEB_APP):
runtime_provider = FakeRuntimeProvider(_tool())
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
}
]
}
)
_build(builder, tools, invoke_from=invoke_from)
assert runtime_provider.last_invoke_from == invoke_from
# ──────────────────────────────────────────────────────────────────────────────
# disabled tools / plugin_id+provider fallback / unauthorized credentials
# ──────────────────────────────────────────────────────────────────────────────
def test_disabled_tools_are_skipped():
runtime_provider = FakeRuntimeProvider(_tool())
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
"enabled": False,
}
]
}
)
# All entries are disabled → builder short-circuits and returns None so the
# request_builder skips adding the tools layer entirely.
assert _build(builder, tools) is None
assert runtime_provider.last_agent_tool is None # ToolManager never queried
def test_plugin_id_plus_provider_fallback_when_provider_id_missing():
"""Frontend may send ``plugin_id`` + ``provider`` instead of the
concatenated ``provider_id``; the builder must accept both shapes."""
runtime_provider = FakeRuntimeProvider(_tool())
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"plugin_id": "langgenius/search",
"provider": "search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
}
]
}
)
result = _build(builder, tools)
assert result is not None
assert runtime_provider.last_agent_tool is not None
assert runtime_provider.last_agent_tool.provider_id == "langgenius/search/search"
assert result.tools[0].plugin_id == "langgenius/search"
assert result.tools[0].provider == "search"
def test_unauthorized_tool_without_credentials():
"""``credential_type=unauthorized`` removes the ``credential_ref.id``
requirement (e.g. public Wikipedia / current_time tools)."""
def _no_credentials_tool() -> FakeTool:
tool = _tool()
assert tool.runtime is not None
tool.runtime.credentials = {}
return tool
runtime_provider = FakeRuntimeProvider(_no_credentials_tool())
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
tools = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/time/time",
"tool_name": "current_time",
"credential_type": "unauthorized",
"runtime_parameters": {"region": "us"},
}
]
}
)
result = _build(builder, tools)
assert result is not None
assert result.tools[0].credential_type == "unauthorized"
assert result.tools[0].credentials == {}
# ──────────────────────────────────────────────────────────────────────────────
# Error-code mapping: declaration not found / credential invalid / config
# ──────────────────────────────────────────────────────────────────────────────
def _standard_tools_payload() -> AgentSoulToolsConfig:
return AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_id": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"runtime_parameters": {"region": "us"},
}
]
}
)
def test_tool_provider_not_found_maps_to_declaration_not_found():
from core.tools.errors import ToolProviderNotFoundError
builder = WorkflowAgentPluginToolsBuilder(
tool_runtime_provider=FakeRuntimeProvider(ToolProviderNotFoundError("provider gone"))
)
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
_build(builder, _standard_tools_payload())
assert exc_info.value.error_code == "agent_tool_declaration_not_found"
def test_credential_validation_error_maps_to_credential_invalid():
from core.tools.errors import ToolProviderCredentialValidationError
builder = WorkflowAgentPluginToolsBuilder(
tool_runtime_provider=FakeRuntimeProvider(ToolProviderCredentialValidationError("creds expired"))
)
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
_build(builder, _standard_tools_payload())
assert exc_info.value.error_code == "agent_tool_credential_invalid"
def test_generic_value_error_maps_to_config_invalid():
"""Bare ``ValueError`` from ToolManager (e.g. "runtime not found") becomes
``agent_tool_config_invalid`` — distinct from
``agent_tool_declaration_not_found`` so callers can render a different
hint."""
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(ValueError("runtime missing")))
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
_build(builder, _standard_tools_payload())
assert exc_info.value.error_code == "agent_tool_config_invalid"
# ──────────────────────────────────────────────────────────────────────────────
# Non-scalar credentials rejected instead of silently str()'d
# ──────────────────────────────────────────────────────────────────────────────
def test_rejects_non_scalar_credential_value():
"""If a credential ever shows up shaped like ``{"access_token": "..."}``,
``str(value)`` would forward a Python repr to the plugin daemon. The
builder should refuse and surface an explicit error code so an operator
fixes the credential schema instead of debugging a daemon JSON parse
failure."""
def _dict_credential_tool() -> FakeTool:
tool = _tool()
assert tool.runtime is not None
tool.runtime.credentials = {"oauth": {"access_token": "secret", "expires_in": 3600}}
return tool
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(_dict_credential_tool()))
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
_build(builder, _standard_tools_payload())
assert exc_info.value.error_code == "agent_tool_credential_shape_invalid"
# ──────────────────────────────────────────────────────────────────────────────
# Legacy payload normalization
# ──────────────────────────────────────────────────────────────────────────────
def test_legacy_provider_name_and_tool_parameters_normalized():
"""Old Composer save payloads used ``provider_name`` / ``tool_parameters``
keys. The ``@model_validator(mode="before")`` on AgentSoulDifyToolConfig
rewrites them in-place so reading historical Agent Soul snapshots from the
DB still works."""
config = AgentSoulToolsConfig.model_validate(
{
"dify_tools": [
{
"provider_name": "langgenius/search/search",
"tool_name": "search",
"credential_type": "api-key",
"credential_id": "credential-1",
"tool_parameters": {"region": "us"},
}
]
}
)
tool = config.dify_tools[0]
assert tool.provider_id == "langgenius/search/search"
assert tool.runtime_parameters == {"region": "us"}
assert tool.credential_ref is not None
assert tool.credential_ref.id == "credential-1"

View File

@ -1,9 +1,8 @@
from dataclasses import replace
import pytest
from dify_agent.layers.dify_plugin import DifyPluginToolConfig, DifyPluginToolsLayerConfig
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID, DIFY_PLUGIN_TOOLS_LAYER_ID
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom
from core.workflow.nodes.agent_v2.runtime_request_builder import (
WorkflowAgentRuntimeBuildContext,
@ -27,38 +26,6 @@ class FakeCredentialsProvider:
return {"api_key": "secret-key"}
class FakePluginToolsBuilder:
def __init__(self) -> None:
# Capture the runtime invocation source so tests can assert it was
# threaded through from ``DifyRunContext.invoke_from`` rather than
# hard-coded to a placeholder like ``VALIDATION``.
self.last_invoke_from: InvokeFrom | None = None
def build(self, *, tenant_id, app_id, user_id, tools, invoke_from):
assert tenant_id == "tenant-1"
assert app_id == "app-1"
assert user_id == "user-1"
self.last_invoke_from = invoke_from
if not tools.dify_tools:
return None
return DifyPluginToolsLayerConfig(
tools=[
DifyPluginToolConfig(
plugin_id="langgenius/time",
provider="time",
tool_name="current_time",
credential_type="unauthorized",
name="current_time",
description="Get current time.",
credentials={},
runtime_parameters={},
parameters=[],
parameters_json_schema={"type": "object", "properties": {}, "required": []},
)
]
)
class FakeVariablePool:
def get(self, selector):
if list(selector) == ["sys", "query"]:
@ -188,60 +155,8 @@ def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metada
assert output_schema["properties"]["confidence"]["type"] == "number"
assert output_schema["required"] == ["report"]
assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2}
assert result.metadata["runtime_support"]["reserved_status"]["tools.dify_tools"] == "supported_when_config_valid"
assert result.metadata["runtime_support"]["reserved_status"]["tools.cli_tools"] == "reserved_not_executed"
warnings = result.metadata["runtime_support"]["unsupported_runtime_warnings"]
assert warnings[0]["section"] == "agent_soul.tools.cli_tools"
def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
context = _context()
snapshot = AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(
prompt={"system_prompt": "You are careful."},
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
),
tools={
"dify_tools": [
{
"provider_id": "langgenius/time/time",
"tool_name": "current_time",
"credential_type": "unauthorized",
}
]
},
),
)
context = replace(context, snapshot=snapshot)
plugin_tools_builder = FakePluginToolsBuilder()
result = WorkflowAgentRuntimeRequestBuilder(
credentials_provider=FakeCredentialsProvider(),
plugin_tools_builder=plugin_tools_builder,
).build(context)
dumped = result.request.model_dump(mode="json")
layers = {layer["name"]: layer for layer in dumped["composition"]["layers"]}
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID]["type"] == "dify.plugin.tools"
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID]["deps"] == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID]["config"]["tools"][0]["tool_name"] == "current_time"
assert result.metadata["agent_tools"] == {
"dify_tool_count": 1,
"dify_tool_names": ["current_time"],
"cli_tool_count": 0,
}
# The runtime invocation source must flow from ``DifyRunContext.invoke_from``
# into the plugin tools builder so ToolManager attributes credential
# quotas / rate limits / audit tags to the real call site instead of a
# hard-coded ``VALIDATION`` placeholder.
assert plugin_tools_builder.last_invoke_from == context.dify_context.invoke_from
assert result.metadata["runtime_support"]["reserved_status"]["tools"] == "reserved_not_executed"
assert result.metadata["runtime_support"]["unsupported_runtime_warnings"][0]["section"] == "agent_soul.tools"
def test_requires_agent_soul_model_config():

View File

@ -1,224 +0,0 @@
"""Unit tests for :mod:`services.workflow.inspector_events`.
The publisher and subscriber both touch redis, so we mock it out at the
``redis_client`` boundary. The goal is to lock down:
1. the channel-naming convention (frontend SSE doesn't need to know it but
tests catch accidental renames),
2. the JSON envelope (``kind / workflow_run_id / node_id / status``),
3. publisher robustness when redis is unavailable,
4. subscriber's tolerance of malformed payloads and bytes-vs-str messages,
5. subscriber's heartbeat-on-idle behaviour.
"""
from __future__ import annotations
import json
from collections.abc import Iterator
from typing import Any
from unittest.mock import MagicMock, patch
from services.workflow import inspector_events
from services.workflow.inspector_events import InspectorMessage
# ──────────────────────────────────────────────────────────────────────────────
# Channel + envelope
# ──────────────────────────────────────────────────────────────────────────────
def test_channel_for_returns_namespaced_key():
assert inspector_events.channel_for("run-42") == "dify:inspector:workflow_run:run-42"
def test_inspector_message_to_json_round_trip():
msg = InspectorMessage(kind="node_changed", workflow_run_id="r1", node_id="agent-1", status="succeeded")
parsed = json.loads(msg.to_json())
assert parsed == {"kind": "node_changed", "workflow_run_id": "r1", "node_id": "agent-1", "status": "succeeded"}
def test_inspector_message_from_json_rejects_bad_kind():
blob = json.dumps({"kind": "something_else", "workflow_run_id": "r1"})
assert InspectorMessage.from_json(blob) is None
def test_inspector_message_from_json_rejects_bad_workflow_run_id():
blob = json.dumps({"kind": "node_changed", "workflow_run_id": ""})
assert InspectorMessage.from_json(blob) is None
def test_inspector_message_from_json_rejects_non_string_node_id():
blob = json.dumps({"kind": "node_changed", "workflow_run_id": "r1", "node_id": 42})
assert InspectorMessage.from_json(blob) is None
def test_inspector_message_from_json_returns_none_for_invalid_json():
assert InspectorMessage.from_json("{not json") is None
def test_inspector_message_from_json_rejects_non_dict_payload():
"""Defensive: a JSON array or scalar is not an InspectorMessage."""
assert InspectorMessage.from_json("[1, 2, 3]") is None
assert InspectorMessage.from_json('"plain string"') is None
def test_inspector_message_from_json_rejects_non_string_status():
"""Status field, if present, must be a string."""
blob = json.dumps({"kind": "workflow_completed", "workflow_run_id": "r1", "status": 42})
assert InspectorMessage.from_json(blob) is None
# ──────────────────────────────────────────────────────────────────────────────
# Publisher
# ──────────────────────────────────────────────────────────────────────────────
def test_publish_node_changed_writes_to_run_channel():
fake_redis = MagicMock()
with patch.object(inspector_events, "redis_client", fake_redis):
inspector_events.publish_node_changed(workflow_run_id="run-1", node_id="agent-1", status="running")
fake_redis.publish.assert_called_once()
channel, blob = fake_redis.publish.call_args.args
assert channel == "dify:inspector:workflow_run:run-1"
msg = InspectorMessage.from_json(blob)
assert msg is not None
assert msg.kind == "node_changed"
assert msg.node_id == "agent-1"
assert msg.status == "running"
def test_publish_workflow_completed_emits_terminal_message():
fake_redis = MagicMock()
with patch.object(inspector_events, "redis_client", fake_redis):
inspector_events.publish_workflow_completed(workflow_run_id="run-1", status="succeeded")
blob = fake_redis.publish.call_args.args[1]
msg = InspectorMessage.from_json(blob)
assert msg is not None
assert msg.kind == "workflow_completed"
assert msg.node_id is None
assert msg.status == "succeeded"
def test_publish_swallows_redis_errors():
"""Persistence must not crash if redis blows up — we publish best-effort."""
class _BrokenRedis:
def publish(self, *_args: Any, **_kwargs: Any) -> None:
raise RuntimeError("redis offline")
with patch.object(inspector_events, "redis_client", _BrokenRedis()):
# No exception should escape.
inspector_events.publish_node_changed(workflow_run_id="run-1", node_id="agent-1", status="running")
# ──────────────────────────────────────────────────────────────────────────────
# Subscriber
# ──────────────────────────────────────────────────────────────────────────────
def _make_fake_pubsub(messages: list[dict[str, Any] | None]) -> MagicMock:
"""Build a redis pubsub stub that replays ``messages`` then raises StopIteration."""
pubsub = MagicMock()
it: Iterator[dict[str, Any] | None] = iter(messages)
pubsub.get_message.side_effect = lambda **_kwargs: next(it, None)
return pubsub
def test_subscribe_yields_heartbeat_then_real_message():
"""Idle ticks (``get_message`` returns None) surface as a sentinel; real
payloads decode to ``InspectorMessage`` instances."""
payload = json.dumps(
{"kind": "node_changed", "workflow_run_id": "run-1", "node_id": "agent-1", "status": "succeeded"}
)
fake_redis = MagicMock()
fake_redis.pubsub.return_value = _make_fake_pubsub(
[
None, # heartbeat tick
{"data": payload.encode("utf-8")}, # bytes payload, real message
None, # heartbeat
]
)
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
first = next(gen)
second = next(gen)
third = next(gen)
# First message is the heartbeat sentinel (both node_id and status are None).
assert first.node_id is None
assert first.status is None
# Second is the real one.
assert second.kind == "node_changed"
assert second.node_id == "agent-1"
assert second.status == "succeeded"
# Third is another heartbeat.
assert third.node_id is None
def test_subscribe_skips_malformed_payloads():
fake_redis = MagicMock()
fake_redis.pubsub.return_value = _make_fake_pubsub(
[
{"data": b"not json at all"},
{"data": json.dumps({"kind": "node_changed", "workflow_run_id": "run-1"}).encode("utf-8")},
]
)
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
msg = next(gen)
assert msg.kind == "node_changed"
assert msg.node_id is None
def test_subscribe_unsubscribes_on_teardown():
fake_pubsub = _make_fake_pubsub([None])
fake_redis = MagicMock()
fake_redis.pubsub.return_value = fake_pubsub
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
next(gen)
gen.close()
fake_pubsub.unsubscribe.assert_called_once_with("dify:inspector:workflow_run:run-1")
fake_pubsub.close.assert_called_once()
def test_subscribe_swallows_teardown_errors():
"""``unsubscribe`` / ``close`` failures must not propagate out of the
generator — they're best-effort cleanup."""
fake_pubsub = MagicMock()
fake_pubsub.get_message.return_value = None
fake_pubsub.unsubscribe.side_effect = RuntimeError("redis offline")
fake_pubsub.close.side_effect = RuntimeError("close failed")
fake_redis = MagicMock()
fake_redis.pubsub.return_value = fake_pubsub
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
next(gen)
# The teardown path runs in ``finally``; closing the generator
# exercises it. No exception should escape.
gen.close()
def test_subscribe_skips_non_string_data_payloads():
"""``raw["data"]`` can be ``None`` / int / bytes — only str is decodable
and the rest are silently skipped."""
fake_pubsub = MagicMock()
msgs: list[dict[str, Any] | None] = [
{"data": None}, # missing payload
{"data": 12345}, # int payload (shouldn't happen, defensive)
{
"data": json.dumps(
{"kind": "node_changed", "workflow_run_id": "run-1", "node_id": "agent-1", "status": "running"}
)
},
]
it = iter(msgs)
fake_pubsub.get_message.side_effect = lambda **_kw: next(it, None)
fake_redis = MagicMock()
fake_redis.pubsub.return_value = fake_pubsub
with patch.object(inspector_events, "redis_client", fake_redis):
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
msg = next(gen)
assert msg.kind == "node_changed"
assert msg.node_id == "agent-1"

View File

@ -1,499 +0,0 @@
"""Unit tests for NodeOutputInspectorService (Stage 4 §8).
The service reads from postgres and resolves agent v2 bindings; this suite
mocks ``session_factory`` and the binding resolver so we exercise the
view-construction logic without DB / network access.
"""
from __future__ import annotations
import json
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from models.agent_config_entities import (
DeclaredArrayItem,
DeclaredOutputConfig,
DeclaredOutputType,
)
from models.enums import WorkflowRunTriggeredFrom
from services.workflow.node_output_inspector_service import (
NodeOutputInspectorError,
NodeOutputInspectorService,
NodeOutputStatus,
NodeStatus,
)
# ──────────────────────────────────────────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────────────────────────────────────────
def _app_model(*, tenant_id: str = "tenant-1", app_id: str = "app-1"):
return SimpleNamespace(tenant_id=tenant_id, id=app_id)
def _workflow_run(
*,
run_id: str = "run-1",
workflow_id: str = "workflow-1",
tenant_id: str = "tenant-1",
app_id: str = "app-1",
triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING,
nodes: list[dict[str, Any]] | None = None,
):
return SimpleNamespace(
id=run_id,
workflow_id=workflow_id,
tenant_id=tenant_id,
app_id=app_id,
triggered_from=triggered_from,
status=status,
graph=json.dumps({"nodes": nodes or []}),
)
def _execution(
*,
node_id: str,
node_type: str = "agent",
title: str = "",
status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.SUCCEEDED,
outputs: dict[str, Any] | None = None,
execution_metadata: dict[str, Any] | None = None,
index: int = 1,
created_at: datetime | None = None,
finished_at: datetime | None = None,
):
return SimpleNamespace(
node_id=node_id,
node_type=node_type,
title=title or node_id,
status=status,
outputs=json.dumps(outputs) if outputs is not None else None,
execution_metadata=json.dumps(execution_metadata) if execution_metadata is not None else None,
index=index,
created_at=created_at or datetime.now(UTC),
finished_at=finished_at,
)
def _agent_v2_node(*, node_id: str = "agent-node-1", title: str = "My Agent") -> dict[str, Any]:
return {
"id": node_id,
"data": {"type": "agent", "version": "2", "title": title},
}
def _non_agent_node(*, node_id: str = "tool-node-1", node_type: str = "tool", title: str = "Slack") -> dict[str, Any]:
return {
"id": node_id,
"data": {"type": node_type, "title": title},
}
def _patch_session(
*,
workflow_run: SimpleNamespace | None,
executions: list[SimpleNamespace] | None = None,
):
"""Patch ``session_factory.create_session`` to return the configured rows.
Returns a context manager that the test uses with ``with``.
"""
executions = executions or []
mock_session = MagicMock()
mock_session.scalar.return_value = workflow_run
mock_session.scalars.return_value.all.return_value = executions
cm = MagicMock()
cm.__enter__.return_value = mock_session
cm.__exit__.return_value = False
return patch(
"services.workflow.node_output_inspector_service.session_factory.create_session",
return_value=cm,
)
def _stub_binding_resolver(*, declared_outputs: list[DeclaredOutputConfig]):
"""Build a fake ``WorkflowAgentBindingResolver`` whose ``.resolve`` returns
a binding with ``node_job_config_dict.declared_outputs``."""
binding = SimpleNamespace(
id="binding-1",
node_job_config_dict={
"workflow_prompt": "stub",
"declared_outputs": [o.model_dump() for o in declared_outputs],
},
)
bundle = SimpleNamespace(binding=binding, agent=None, snapshot=None)
resolver = MagicMock()
resolver.resolve.return_value = bundle
return resolver
def _make_service(declared_outputs: list[DeclaredOutputConfig] | None = None) -> NodeOutputInspectorService:
return NodeOutputInspectorService(binding_resolver=_stub_binding_resolver(declared_outputs=declared_outputs or []))
# ──────────────────────────────────────────────────────────────────────────────
# 404 paths
# ──────────────────────────────────────────────────────────────────────────────
def test_snapshot_404_when_workflow_run_missing():
service = _make_service()
with _patch_session(workflow_run=None):
with pytest.raises(NodeOutputInspectorError) as exc:
service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="missing")
assert exc.value.code == "workflow_run_not_found"
def test_snapshot_accepts_published_run_d1_lifted():
"""D-1 was lifted 2026-05-26: any ``triggered_from`` is now accepted."""
service = _make_service()
run = _workflow_run(
nodes=[_agent_v2_node(node_id="agent-1")],
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
)
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.workflow_run_id == "run-1"
assert [n.node_id for n in snapshot.node_outputs] == ["agent-1"]
def test_snapshot_accepts_webhook_triggered_run():
"""Webhook / schedule / plugin triggers are also published-side."""
service = _make_service()
run = _workflow_run(
nodes=[_agent_v2_node(node_id="agent-1")],
triggered_from=WorkflowRunTriggeredFrom.WEBHOOK,
)
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.workflow_run_id == "run-1"
def test_node_detail_404_when_node_id_absent_from_graph():
service = _make_service()
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
with _patch_session(workflow_run=run, executions=[]):
with pytest.raises(NodeOutputInspectorError) as exc:
service.node_detail(app_model=_app_model(), workflow_run_id="run-1", node_id="ghost")
assert exc.value.code == "node_not_in_workflow_run"
def test_output_preview_404_when_output_name_unknown():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"text": "hello"})
with _patch_session(workflow_run=run, executions=[ex]):
with pytest.raises(NodeOutputInspectorError) as exc:
service.output_preview(
app_model=_app_model(),
workflow_run_id="run-1",
node_id="agent-1",
output_name="missing",
)
assert exc.value.code == "node_output_not_declared"
# ──────────────────────────────────────────────────────────────────────────────
# Snapshot happy path
# ──────────────────────────────────────────────────────────────────────────────
def test_snapshot_status_pending_when_node_has_no_execution():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert len(snapshot.node_outputs) == 1
node = snapshot.node_outputs[0]
assert node.node_status == NodeStatus.IDLE
assert node.outputs[0].status == NodeOutputStatus.PENDING
def test_snapshot_status_running():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", status=WorkflowNodeExecutionStatus.RUNNING)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs[0].node_status == NodeStatus.RUNNING
assert snapshot.node_outputs[0].outputs[0].status == NodeOutputStatus.RUNNING
def test_snapshot_status_failed_node_marks_all_outputs_failed():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="a", type=DeclaredOutputType.STRING),
DeclaredOutputConfig(name="b", type=DeclaredOutputType.NUMBER),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", status=WorkflowNodeExecutionStatus.FAILED)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
statuses = {o.name: o.status for o in snapshot.node_outputs[0].outputs}
assert statuses == {"a": NodeOutputStatus.FAILED, "b": NodeOutputStatus.FAILED}
def test_snapshot_status_ready_when_outputs_present_and_no_failure_metadata():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"text": "hello"})
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.READY
assert output.value_preview == "hello"
def test_snapshot_marks_type_check_failure():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(
node_id="agent-1",
outputs={"text": "ok"},
execution_metadata={
"output_type_check": {
"passed": False,
"results": [{"name": "text", "type": "string", "status": "type_check_failed", "reason": "wrong shape"}],
}
},
)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.TYPE_CHECK_FAILED
assert output.type_check is not None
assert output.type_check.passed is False
assert output.type_check.reason == "wrong shape"
def test_snapshot_marks_output_check_failure_when_type_check_passed():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(
name="report",
type=DeclaredOutputType.FILE,
)
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(
node_id="agent-1",
outputs={"report": {"file_id": "550e8400-e29b-41d4-a716-446655440000"}},
execution_metadata={
"output_type_check": {"passed": True, "results": [{"name": "report", "status": "ready"}]},
"output_check": {
"passed": False,
"results": [{"name": "report", "status": "failed", "reason": "benchmark mismatch"}],
},
},
)
with (
_patch_session(workflow_run=run, executions=[ex]),
patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x",
),
):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.status == NodeOutputStatus.OUTPUT_CHECK_FAILED
assert output.output_check is not None
assert output.output_check.passed is False
assert output.output_check.reason == "benchmark mismatch"
def test_snapshot_marks_not_produced_when_declared_output_missing_from_payload():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING),
DeclaredOutputConfig(name="optional_meta", type=DeclaredOutputType.OBJECT, required=False),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"text": "hi"}) # optional_meta missing
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
statuses = {o.name: o.status for o in snapshot.node_outputs[0].outputs}
assert statuses == {"text": NodeOutputStatus.READY, "optional_meta": NodeOutputStatus.NOT_PRODUCED}
# ──────────────────────────────────────────────────────────────────────────────
# Non-agent node — outputs inferred from execution payload
# ──────────────────────────────────────────────────────────────────────────────
def test_non_agent_node_outputs_inferred_from_payload_keys():
service = _make_service()
run = _workflow_run(nodes=[_non_agent_node(node_id="tool-1", node_type="tool")])
ex = _execution(
node_id="tool-1",
node_type="tool",
outputs={"message": "sent", "thread_ts": "1234"},
)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output_names = sorted(o.name for o in snapshot.node_outputs[0].outputs)
assert output_names == ["message", "thread_ts"]
# All inferred outputs should have ``type=None`` since we don't know the
# schema yet.
assert all(o.type is None for o in snapshot.node_outputs[0].outputs)
# ──────────────────────────────────────────────────────────────────────────────
# File preview / signed URL
# ──────────────────────────────────────────────────────────────────────────────
def test_file_output_preview_includes_signed_url():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
file_payload = {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}
ex = _execution(node_id="agent-1", outputs={"report": file_payload})
with (
_patch_session(workflow_run=run, executions=[ex]),
patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x.pdf",
),
):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
preview_value = snapshot.node_outputs[0].outputs[0].value_preview
assert isinstance(preview_value, dict)
assert preview_value["preview_url"] == "https://signed.example/x.pdf"
assert preview_value["filename"] == "x.pdf"
def test_file_output_preview_endpoint_returns_full_value_with_signed_url():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE),
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
file_payload = {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}
ex = _execution(node_id="agent-1", outputs={"report": file_payload})
with (
_patch_session(workflow_run=run, executions=[ex]),
patch(
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
return_value="https://signed.example/x.pdf",
),
):
preview = service.output_preview(
app_model=_app_model(),
workflow_run_id="run-1",
node_id="agent-1",
output_name="report",
)
assert preview.output_name == "report"
assert preview.status == NodeOutputStatus.READY
assert isinstance(preview.value, dict)
assert preview.value["preview_url"] == "https://signed.example/x.pdf"
# ──────────────────────────────────────────────────────────────────────────────
# Retry / metadata
# ──────────────────────────────────────────────────────────────────────────────
def test_retried_count_pulled_from_attempt_metadata():
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(
node_id="agent-1",
outputs={"text": "ok"},
execution_metadata={"attempt": 2},
)
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs[0].outputs[0].retried == 2
# ──────────────────────────────────────────────────────────────────────────────
# Latest-execution-per-node grouping
# ──────────────────────────────────────────────────────────────────────────────
def test_keeps_latest_execution_per_node_by_index():
"""When a node has multiple executions (retries / iterations) keep the
canonical one — the row with the highest ``index``."""
service = _make_service(
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
older = _execution(node_id="agent-1", outputs={"text": "old"}, index=1)
newer = _execution(node_id="agent-1", outputs={"text": "new"}, index=5)
with _patch_session(workflow_run=run, executions=[older, newer]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs[0].outputs[0].value_preview == "new"
# ──────────────────────────────────────────────────────────────────────────────
# Array item declarations round-trip correctly
# ──────────────────────────────────────────────────────────────────────────────
def test_array_typed_output_with_array_item_renders_correctly():
service = _make_service(
declared_outputs=[
DeclaredOutputConfig(
name="files",
type=DeclaredOutputType.ARRAY,
array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE),
)
],
)
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
ex = _execution(node_id="agent-1", outputs={"files": []})
with _patch_session(workflow_run=run, executions=[ex]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
output = snapshot.node_outputs[0].outputs[0]
assert output.type == DeclaredOutputType.ARRAY
# ──────────────────────────────────────────────────────────────────────────────
# Graph parsing edge cases
# ──────────────────────────────────────────────────────────────────────────────
def test_unparseable_graph_blob_yields_empty_snapshot_not_500():
service = _make_service()
run = SimpleNamespace(
id="run-1",
workflow_id="workflow-1",
tenant_id="tenant-1",
app_id="app-1",
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
status=WorkflowExecutionStatus.RUNNING,
graph="{not valid json",
)
with _patch_session(workflow_run=run, executions=[]):
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
assert snapshot.node_outputs == []

14
api/uv.lock generated
View File

@ -595,16 +595,16 @@ wheels = [
[[package]]
name = "boto3"
version = "1.43.14"
version = "1.43.10"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
{ name = "jmespath" },
{ name = "s3transfer" },
]
sdist = { url = "https://files.pythonhosted.org/packages/79/4b/616367e871ce3f1cb3e8545a97736b6331b9fb081497f2d44c5b2aa6959d/boto3-1.43.14.tar.gz", hash = "sha256:5c0a994b3182061ee101812e721100717a4d664f9f4ceaf4a86b6d032ce9fc2d", size = 113142, upload-time = "2026-05-22T19:28:47.861Z" }
sdist = { url = "https://files.pythonhosted.org/packages/ff/27/ae1a71e945ce7bde39b0677b252fe7d8a0ad7fa3d6b724d78b81469c08fe/boto3-1.43.10.tar.gz", hash = "sha256:27342e5d5f6170fcc8d1e21cdd939af2448d58ac56b08d494250eaad998e30c7", size = 113159, upload-time = "2026-05-18T20:42:34.454Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/00/59cb9329c18e2d3aa23062ceaa87d065f2e81e7d2931df24d64e9a7815aa/boto3-1.43.14-py3-none-any.whl", hash = "sha256:574335744656cfed0b362a0a0467aaf2eb2bf15526edcd02d31d3c661f4b09e4", size = 140536, upload-time = "2026-05-22T19:28:46.49Z" },
{ url = "https://files.pythonhosted.org/packages/0e/1b/439234598449f846b17333e67ec63c3dd8f8880c13de9089383b4bab58c3/boto3-1.43.10-py3-none-any.whl", hash = "sha256:83918184d95967e4c6e9ed1e9a2f58250b291e6ea2cb847ab0825d52596b39e5", size = 140534, upload-time = "2026-05-18T20:42:32.009Z" },
]
[[package]]
@ -627,16 +627,16 @@ bedrock-runtime = [
[[package]]
name = "botocore"
version = "1.43.14"
version = "1.43.10"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jmespath" },
{ name = "python-dateutil" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/78/3c/798d2f7deb118241930c7c6bcfb0b970d3f0245bf580700663199aeed2c3/botocore-1.43.14.tar.gz", hash = "sha256:b9e500737e43d2f147c9d4e23b54360335e77d4c0ba90a318f51b65e06cb8516", size = 15382604, upload-time = "2026-05-22T19:28:36.363Z" }
sdist = { url = "https://files.pythonhosted.org/packages/e2/4e/c127dd0628c551f10cb890e279a9c0e367523b880c4cd3e81a1e76886174/botocore-1.43.10.tar.gz", hash = "sha256:2f4af585b41dbccdfc9f49677d7bd72d713a12ef89a1dc9c8538a927649498bf", size = 15365344, upload-time = "2026-05-18T20:42:21.562Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/27/7e/6e64821077cd2efc4aa51b7d638fb6d48e1c7c450201c529fbaf1de8bfd3/botocore-1.43.14-py3-none-any.whl", hash = "sha256:1f4a2a95ea78c10398e78431e98c1fe47adb54a7b10a32975144c1f541186658", size = 15061424, upload-time = "2026-05-22T19:28:32.682Z" },
{ url = "https://files.pythonhosted.org/packages/a6/0e/41f64d6c267edf03f4fe8f461edc4c644243e77c8d5a1fef1e0166ac4ed0/botocore-1.43.10-py3-none-any.whl", hash = "sha256:8a0176d8c2f8bebe95d4f923a824a1ace04b02f360e220681c388e097f32c3b6", size = 15043571, upload-time = "2026-05-18T20:42:16.664Z" },
]
[[package]]
@ -1612,7 +1612,7 @@ requires-dist = [
{ name = "aliyun-log-python-sdk", specifier = "==0.9.44" },
{ name = "azure-identity", specifier = ">=1.25.3,<2.0.0" },
{ name = "bleach", specifier = ">=6.3.0,<7.0.0" },
{ name = "boto3", specifier = ">=1.43.14,<2.0.0" },
{ name = "boto3", specifier = ">=1.43.10,<2.0.0" },
{ name = "celery", specifier = ">=5.6.3,<6.0.0" },
{ name = "croniter", specifier = ">=6.2.2,<7.0.0" },
{ name = "dify-agent", directory = "../dify-agent" },

View File

@ -121,7 +121,9 @@ export type AgentSoulToolsConfig = {
cli_tools?: Array<{
[key: string]: unknown
}>
dify_tools?: Array<AgentSoulDifyToolConfig>
dify_tools?: Array<{
[key: string]: unknown
}>
}
export type AgentKnowledgeQueryMode = 'generated_query' | 'user_query'
@ -132,28 +134,6 @@ export type AgentSoulModelCredentialRef = {
type: string
}
export type AgentSoulDifyToolConfig = {
credential_ref?: AgentSoulDifyToolCredentialRef
credential_type?: 'api-key' | 'oauth2' | 'unauthorized'
description?: string | null
enabled?: boolean
name?: string | null
plugin_id?: string | null
provider?: string | null
provider_id?: string | null
provider_type?: string
runtime_parameters?: {
[key: string]: unknown
}
tool_name: string
}
export type AgentSoulDifyToolCredentialRef = {
id?: string | null
provider?: string | null
type?: 'provider' | 'tool'
}
export type GetAgentsData = {
body?: never
path?: never

View File

@ -78,6 +78,14 @@ export const zAgentSoulSkillsFilesConfig = z.object({
skills: z.array(z.record(z.string(), z.unknown())).optional(),
})
/**
* AgentSoulToolsConfig
*/
export const zAgentSoulToolsConfig = z.object({
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
dify_tools: z.array(z.record(z.string(), z.unknown())).optional(),
})
/**
* AgentKnowledgeQueryMode
*/
@ -116,53 +124,6 @@ export const zAgentSoulModelConfig = z.object({
plugin_id: z.string().min(1).max(255),
})
/**
* AgentSoulDifyToolCredentialRef
*
* Reference to a stored Dify Plugin Tool credential.
*
* Secret values are resolved only at runtime. The legacy ``credential_id``
* field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
* old Agent tool payloads can be read while new payloads stay explicit.
*/
export const zAgentSoulDifyToolCredentialRef = z.object({
id: z.string().max(255).nullish(),
provider: z.string().max(255).nullish(),
type: z.enum(['provider', 'tool']).optional().default('tool'),
})
/**
* AgentSoulDifyToolConfig
*
* One Dify Plugin Tool configured on Agent Soul.
*
* The API backend prepares this persisted product shape into
* ``DifyPluginToolConfig`` before sending a run request to Agent backend.
* ``provider_id`` keeps compatibility with existing Agent tool config payloads;
* new callers should send ``plugin_id`` + ``provider`` when available.
*/
export const zAgentSoulDifyToolConfig = z.object({
credential_ref: zAgentSoulDifyToolCredentialRef.optional(),
credential_type: z.enum(['api-key', 'oauth2', 'unauthorized']).optional().default('api-key'),
description: z.string().nullish(),
enabled: z.boolean().optional().default(true),
name: z.string().max(255).nullish(),
plugin_id: z.string().max(255).nullish(),
provider: z.string().max(255).nullish(),
provider_id: z.string().max(255).nullish(),
provider_type: z.string().optional().default('plugin'),
runtime_parameters: z.record(z.string(), z.unknown()).optional(),
tool_name: z.string().min(1).max(255),
})
/**
* AgentSoulToolsConfig
*/
export const zAgentSoulToolsConfig = z.object({
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
dify_tools: z.array(zAgentSoulDifyToolConfig).optional(),
})
/**
* AgentSoulConfig
*/

View File

@ -167,14 +167,6 @@ import {
zGetAppsByAppIdWorkflowsDraftNodesByNodeIdVariablesResponse,
zGetAppsByAppIdWorkflowsDraftPath,
zGetAppsByAppIdWorkflowsDraftResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath,
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse,
zGetAppsByAppIdWorkflowsDraftSystemVariablesPath,
zGetAppsByAppIdWorkflowsDraftSystemVariablesResponse,
zGetAppsByAppIdWorkflowsDraftVariablesByVariableIdPath,
@ -183,14 +175,6 @@ import {
zGetAppsByAppIdWorkflowsDraftVariablesQuery,
zGetAppsByAppIdWorkflowsDraftVariablesResponse,
zGetAppsByAppIdWorkflowsPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath,
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse,
zGetAppsByAppIdWorkflowsPublishPath,
zGetAppsByAppIdWorkflowsPublishResponse,
zGetAppsByAppIdWorkflowsQuery,
@ -3802,125 +3786,6 @@ export const run10 = {
post: post52,
}
/**
* Server-Sent Events stream of inspector deltas for a draft workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get59 = oc
.route({
deprecated: true,
description:
'Server-Sent Events stream of inspector deltas for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEvents',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath }))
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse)
export const events = {
get: get59,
}
/**
* Full value for one declared output, including signed download URL for files.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get60 = oc
.route({
deprecated: true,
description:
'Full value for one declared output, including signed download URL for files.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreview',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview',
tags: ['console'],
})
.input(
z.object({
params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
}),
)
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse)
export const preview3 = {
get: get60,
}
export const byOutputName = {
preview: preview3,
}
/**
* One node's declared outputs for a draft workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get61 = oc
.route({
deprecated: true,
description:
'One node\'s declared outputs for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeId',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath }))
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse)
export const byNodeId8 = {
get: get61,
byOutputName,
}
/**
* Snapshot of every node's declared outputs for a draft workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get62 = oc
.route({
deprecated: true,
description:
'Snapshot of every node\'s declared outputs for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputs',
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath }))
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse)
export const nodeOutputs = {
get: get62,
events,
byNodeId: byNodeId8,
}
export const byRunId2 = {
nodeOutputs,
}
export const runs = {
byRunId: byRunId2,
}
/**
* Get system variables for workflow
*
@ -3928,7 +3793,7 @@ export const runs = {
*
* @deprecated
*/
export const get63 = oc
export const get59 = oc
.route({
deprecated: true,
description:
@ -3943,7 +3808,7 @@ export const get63 = oc
.output(zGetAppsByAppIdWorkflowsDraftSystemVariablesResponse)
export const systemVariables = {
get: get63,
get: get59,
}
/**
@ -4065,7 +3930,7 @@ export const delete9 = oc
*
* @deprecated
*/
export const get64 = oc
export const get60 = oc
.route({
deprecated: true,
description:
@ -4107,7 +3972,7 @@ export const patch2 = oc
export const byVariableId = {
delete: delete9,
get: get64,
get: get60,
patch: patch2,
reset,
}
@ -4137,7 +4002,7 @@ export const delete10 = oc
*
* @deprecated
*/
export const get65 = oc
export const get61 = oc
.route({
deprecated: true,
description:
@ -4159,7 +4024,7 @@ export const get65 = oc
export const variables2 = {
delete: delete10,
get: get65,
get: get61,
byVariableId,
}
@ -4172,7 +4037,7 @@ export const variables2 = {
*
* @deprecated
*/
export const get66 = oc
export const get62 = oc
.route({
deprecated: true,
description:
@ -4217,7 +4082,7 @@ export const post55 = oc
.output(zPostAppsByAppIdWorkflowsDraftResponse)
export const draft2 = {
get: get66,
get: get62,
post: post55,
conversationVariables: conversationVariables2,
environmentVariables,
@ -4227,7 +4092,6 @@ export const draft2 = {
loop: loop2,
nodes: nodes7,
run: run10,
runs,
systemVariables,
trigger: trigger2,
variables: variables2,
@ -4242,7 +4106,7 @@ export const draft2 = {
*
* @deprecated
*/
export const get67 = oc
export const get63 = oc
.route({
deprecated: true,
description:
@ -4285,137 +4149,10 @@ export const post56 = oc
.output(zPostAppsByAppIdWorkflowsPublishResponse)
export const publish = {
get: get67,
get: get63,
post: post56,
}
/**
* Server-Sent Events stream of inspector deltas for a published workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get68 = oc
.route({
deprecated: true,
description:
'Server-Sent Events stream of inspector deltas for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEvents',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath }))
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse)
export const events2 = {
get: get68,
}
/**
* Full value for one declared output of a published run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get69 = oc
.route({
deprecated: true,
description:
'Full value for one declared output of a published run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId:
'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreview',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview',
tags: ['console'],
})
.input(
z.object({
params:
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
}),
)
.output(
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
)
export const preview4 = {
get: get69,
}
export const byOutputName2 = {
preview: preview4,
}
/**
* One node's declared outputs for a published workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get70 = oc
.route({
deprecated: true,
description:
'One node\'s declared outputs for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeId',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath }))
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse)
export const byNodeId9 = {
get: get70,
byOutputName: byOutputName2,
}
/**
* Snapshot of every node's declared outputs for a published workflow run.
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const get71 = oc
.route({
deprecated: true,
description:
'Snapshot of every node\'s declared outputs for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
inputStructure: 'detailed',
method: 'GET',
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputs',
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs',
tags: ['console'],
})
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath }))
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse)
export const nodeOutputs2 = {
get: get71,
events: events2,
byNodeId: byNodeId9,
}
export const byRunId3 = {
nodeOutputs: nodeOutputs2,
}
export const runs2 = {
byRunId: byRunId3,
}
export const published = {
runs: runs2,
}
/**
* Get webhook trigger for a node
*
@ -4423,7 +4160,7 @@ export const published = {
*
* @deprecated
*/
export const get72 = oc
export const get64 = oc
.route({
deprecated: true,
description:
@ -4444,7 +4181,7 @@ export const get72 = oc
.output(zGetAppsByAppIdWorkflowsTriggersWebhookResponse)
export const webhook = {
get: get72,
get: get64,
}
export const triggers2 = {
@ -4542,7 +4279,7 @@ export const byWorkflowId = {
*
* @deprecated
*/
export const get73 = oc
export const get65 = oc
.route({
deprecated: true,
description:
@ -4563,11 +4300,10 @@ export const get73 = oc
.output(zGetAppsByAppIdWorkflowsResponse)
export const workflows3 = {
get: get73,
get: get65,
defaultWorkflowBlockConfigs,
draft: draft2,
publish,
published,
triggers: triggers2,
byWorkflowId,
}
@ -4600,7 +4336,7 @@ export const delete12 = oc
*
* @deprecated
*/
export const get74 = oc
export const get66 = oc
.route({
deprecated: true,
description:
@ -4641,7 +4377,7 @@ export const put7 = oc
export const byAppId2 = {
delete: delete12,
get: get74,
get: get66,
put: put7,
advancedChat,
agentComposer,
@ -4710,7 +4446,7 @@ export const byApiKeyId = {
*
* Get all API keys for an app
*/
export const get75 = oc
export const get67 = oc
.route({
description: 'Get all API keys for an app',
inputStructure: 'detailed',
@ -4743,7 +4479,7 @@ export const post58 = oc
.output(zPostAppsByResourceIdApiKeysResponse)
export const apiKeys = {
get: get75,
get: get67,
post: post58,
byApiKeyId,
}
@ -4759,7 +4495,7 @@ export const byResourceId = {
*
* @deprecated
*/
export const get76 = oc
export const get68 = oc
.route({
deprecated: true,
description:
@ -4774,7 +4510,7 @@ export const get76 = oc
.output(zGetAppsByServerIdServerRefreshResponse)
export const refresh = {
get: get76,
get: get68,
}
export const server2 = {
@ -4790,7 +4526,7 @@ export const byServerId = {
*
* Get list of applications with pagination and filtering
*/
export const get77 = oc
export const get69 = oc
.route({
description: 'Get list of applications with pagination and filtering',
inputStructure: 'detailed',
@ -4829,7 +4565,7 @@ export const post59 = oc
.output(zPostAppsResponse)
export const apps = {
get: get77,
get: get69,
post: post59,
imports,
workflows,

View File

@ -1430,7 +1430,9 @@ export type AgentSoulToolsConfig = {
cli_tools?: Array<{
[key: string]: unknown
}>
dify_tools?: Array<AgentSoulDifyToolConfig>
dify_tools?: Array<{
[key: string]: unknown
}>
}
export type DeclaredOutputConfig = {
@ -1523,22 +1525,6 @@ export type AgentSoulModelCredentialRef = {
type: string
}
export type AgentSoulDifyToolConfig = {
credential_ref?: AgentSoulDifyToolCredentialRef
credential_type?: 'api-key' | 'oauth2' | 'unauthorized'
description?: string | null
enabled?: boolean
name?: string | null
plugin_id?: string | null
provider?: string | null
provider_id?: string | null
provider_type?: string
runtime_parameters?: {
[key: string]: unknown
}
tool_name: string
}
export type DeclaredArrayItem = {
description?: string | null
type: DeclaredOutputType
@ -1576,12 +1562,6 @@ export type UserActionConfig = {
export type FormInputConfig = unknown
export type AgentSoulDifyToolCredentialRef = {
id?: string | null
provider?: string | null
type?: 'provider' | 'tool'
}
export type OutputErrorStrategy = 'default_value' | 'fail_branch' | 'stop'
export type DeclaredOutputRetryConfig = {
@ -4770,122 +4750,6 @@ export type PostAppsByAppIdWorkflowsDraftRunResponses = {
export type PostAppsByAppIdWorkflowsDraftRunResponse
= PostAppsByAppIdWorkflowsDraftRunResponses[keyof PostAppsByAppIdWorkflowsDraftRunResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdData = {
body?: never
path: {
app_id: string
node_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewData = {
body?: never
path: {
app_id: string
node_id: string
output_name: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview'
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewError
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors]
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses
= {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses]
export type GetAppsByAppIdWorkflowsDraftSystemVariablesData = {
body?: never
path: {
@ -5142,124 +5006,6 @@ export type PostAppsByAppIdWorkflowsPublishResponses = {
export type PostAppsByAppIdWorkflowsPublishResponse
= PostAppsByAppIdWorkflowsPublishResponses[keyof PostAppsByAppIdWorkflowsPublishResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsData = {
body?: never
path: {
app_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdData = {
body?: never
path: {
app_id: string
node_id: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors = {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses = {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewData
= {
body?: never
path: {
app_id: string
node_id: string
output_name: string
run_id: string
}
query?: never
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview'
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors
= {
404: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewError
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors]
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses
= {
200: {
[key: string]: unknown
}
}
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses]
export type GetAppsByAppIdWorkflowsTriggersWebhookData = {
body?: never
path: {

View File

@ -1533,6 +1533,14 @@ export const zAgentSoulSkillsFilesConfig = z.object({
skills: z.array(z.record(z.string(), z.unknown())).optional(),
})
/**
* AgentSoulToolsConfig
*/
export const zAgentSoulToolsConfig = z.object({
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
dify_tools: z.array(z.record(z.string(), z.unknown())).optional(),
})
/**
* WorkflowNodeJobMode
*/
@ -1763,6 +1771,25 @@ export const zAgentSoulModelConfig = z.object({
plugin_id: z.string().min(1).max(255),
})
/**
* AgentSoulConfig
*/
export const zAgentSoulConfig = z.object({
app_features: z.record(z.string(), z.unknown()).optional(),
app_variables: z.array(zAppVariableConfig).optional(),
env: zAgentSoulEnvConfig.optional(),
human: zAgentSoulHumanConfig.optional(),
knowledge: zAgentSoulKnowledgeConfig.optional(),
memory: zAgentSoulMemoryConfig.optional(),
misc_legacy: z.record(z.string(), z.unknown()).optional(),
model: zAgentSoulModelConfig.optional(),
prompt: zAgentSoulPromptConfig.optional(),
sandbox: zAgentSoulSandboxConfig.optional(),
schema_version: z.int().optional().default(1),
skills_files: zAgentSoulSkillsFilesConfig.optional(),
tools: zAgentSoulToolsConfig.optional(),
})
/**
* DeclaredOutputCheckConfig
*
@ -1815,72 +1842,6 @@ export const zDeclaredArrayItem = z.object({
export const zFormInputConfig = z.unknown()
/**
* AgentSoulDifyToolCredentialRef
*
* Reference to a stored Dify Plugin Tool credential.
*
* Secret values are resolved only at runtime. The legacy ``credential_id``
* field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
* old Agent tool payloads can be read while new payloads stay explicit.
*/
export const zAgentSoulDifyToolCredentialRef = z.object({
id: z.string().max(255).nullish(),
provider: z.string().max(255).nullish(),
type: z.enum(['provider', 'tool']).optional().default('tool'),
})
/**
* AgentSoulDifyToolConfig
*
* One Dify Plugin Tool configured on Agent Soul.
*
* The API backend prepares this persisted product shape into
* ``DifyPluginToolConfig`` before sending a run request to Agent backend.
* ``provider_id`` keeps compatibility with existing Agent tool config payloads;
* new callers should send ``plugin_id`` + ``provider`` when available.
*/
export const zAgentSoulDifyToolConfig = z.object({
credential_ref: zAgentSoulDifyToolCredentialRef.optional(),
credential_type: z.enum(['api-key', 'oauth2', 'unauthorized']).optional().default('api-key'),
description: z.string().nullish(),
enabled: z.boolean().optional().default(true),
name: z.string().max(255).nullish(),
plugin_id: z.string().max(255).nullish(),
provider: z.string().max(255).nullish(),
provider_id: z.string().max(255).nullish(),
provider_type: z.string().optional().default('plugin'),
runtime_parameters: z.record(z.string(), z.unknown()).optional(),
tool_name: z.string().min(1).max(255),
})
/**
* AgentSoulToolsConfig
*/
export const zAgentSoulToolsConfig = z.object({
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
dify_tools: z.array(zAgentSoulDifyToolConfig).optional(),
})
/**
* AgentSoulConfig
*/
export const zAgentSoulConfig = z.object({
app_features: z.record(z.string(), z.unknown()).optional(),
app_variables: z.array(zAppVariableConfig).optional(),
env: zAgentSoulEnvConfig.optional(),
human: zAgentSoulHumanConfig.optional(),
knowledge: zAgentSoulKnowledgeConfig.optional(),
memory: zAgentSoulMemoryConfig.optional(),
misc_legacy: z.record(z.string(), z.unknown()).optional(),
model: zAgentSoulModelConfig.optional(),
prompt: zAgentSoulPromptConfig.optional(),
sandbox: zAgentSoulSandboxConfig.optional(),
schema_version: z.int().optional().default(1),
skills_files: zAgentSoulSkillsFilesConfig.optional(),
tools: zAgentSoulToolsConfig.optional(),
})
/**
* OutputErrorStrategy
*
@ -3873,60 +3834,6 @@ export const zPostAppsByAppIdWorkflowsDraftRunPath = z.object({
*/
export const zPostAppsByAppIdWorkflowsDraftRunResponse = z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath = z.object({
app_id: z.string(),
node_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath
= z.object({
app_id: z.string(),
node_id: z.string(),
output_name: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsDraftSystemVariablesPath = z.object({
app_id: z.string(),
})
@ -4047,60 +3954,6 @@ export const zPostAppsByAppIdWorkflowsPublishPath = z.object({
*/
export const zPostAppsByAppIdWorkflowsPublishResponse = z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath = z.object({
app_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath = z.object({
app_id: z.string(),
node_id: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse = z.record(
z.string(),
z.unknown(),
)
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath
= z.object({
app_id: z.string(),
node_id: z.string(),
output_name: z.string(),
run_id: z.string(),
})
/**
* Success
*/
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
= z.record(z.string(), z.unknown())
export const zGetAppsByAppIdWorkflowsTriggersWebhookPath = z.object({
app_id: z.string(),
})