mirror of
https://github.com/langgenius/dify.git
synced 2026-05-26 20:07:46 +08:00
Compare commits
5 Commits
dependabot
...
feat/call-
| Author | SHA1 | Date | |
|---|---|---|---|
| 114daf3729 | |||
| 69fb870946 | |||
| 21b6c2bec1 | |||
| a41fa5607b | |||
| fb07b43107 |
@ -31,6 +31,7 @@ from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAge
|
||||
from clients.agent_backend.request_builder import (
|
||||
AGENT_SOUL_PROMPT_LAYER_ID,
|
||||
DIFY_EXECUTION_CONTEXT_LAYER_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_ID,
|
||||
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
|
||||
WORKFLOW_USER_PROMPT_LAYER_ID,
|
||||
AgentBackendModelConfig,
|
||||
@ -43,6 +44,7 @@ from clients.agent_backend.request_builder import (
|
||||
__all__ = [
|
||||
"AGENT_SOUL_PROMPT_LAYER_ID",
|
||||
"DIFY_EXECUTION_CONTEXT_LAYER_ID",
|
||||
"DIFY_PLUGIN_TOOLS_LAYER_ID",
|
||||
"WORKFLOW_NODE_JOB_PROMPT_LAYER_ID",
|
||||
"WORKFLOW_USER_PROMPT_LAYER_ID",
|
||||
"AgentBackendError",
|
||||
|
||||
@ -18,8 +18,10 @@ from agenton.layers import ExitIntent
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
|
||||
from dify_agent.layers.dify_plugin import (
|
||||
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
|
||||
DifyPluginCredentialValue,
|
||||
DifyPluginLLMLayerConfig,
|
||||
DifyPluginToolsLayerConfig,
|
||||
)
|
||||
from dify_agent.layers.execution_context import (
|
||||
DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID,
|
||||
@ -41,6 +43,7 @@ AGENT_SOUL_PROMPT_LAYER_ID = "agent_soul_prompt"
|
||||
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID = "workflow_node_job_prompt"
|
||||
WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt"
|
||||
DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context"
|
||||
DIFY_PLUGIN_TOOLS_LAYER_ID = "tools"
|
||||
|
||||
|
||||
class AgentBackendModelConfig(BaseModel):
|
||||
@ -81,6 +84,7 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
|
||||
purpose: RunPurpose = "workflow_node"
|
||||
idempotency_key: str | None = None
|
||||
output: AgentBackendOutputConfig | None = None
|
||||
tools: DifyPluginToolsLayerConfig | None = None
|
||||
session_snapshot: CompositorSessionSnapshot | None = None
|
||||
suspend_on_exit: bool = False
|
||||
metadata: dict[str, JsonValue] = Field(default_factory=dict)
|
||||
@ -147,6 +151,17 @@ class AgentBackendRunRequestBuilder:
|
||||
]
|
||||
)
|
||||
|
||||
if run_input.tools is not None and run_input.tools.tools:
|
||||
layers.append(
|
||||
RunLayerSpec(
|
||||
name=DIFY_PLUGIN_TOOLS_LAYER_ID,
|
||||
type=DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
|
||||
deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID},
|
||||
metadata=run_input.metadata,
|
||||
config=run_input.tools,
|
||||
)
|
||||
)
|
||||
|
||||
if run_input.output is not None:
|
||||
layers.append(
|
||||
RunLayerSpec(
|
||||
|
||||
@ -68,6 +68,7 @@ from .app import (
|
||||
workflow_app_log,
|
||||
workflow_comment,
|
||||
workflow_draft_variable,
|
||||
workflow_node_output_inspector,
|
||||
workflow_run,
|
||||
workflow_statistic,
|
||||
workflow_trigger,
|
||||
@ -218,6 +219,7 @@ __all__ = [
|
||||
"workflow_app_log",
|
||||
"workflow_comment",
|
||||
"workflow_draft_variable",
|
||||
"workflow_node_output_inspector",
|
||||
"workflow_run",
|
||||
"workflow_statistic",
|
||||
"workflow_trigger",
|
||||
|
||||
415
api/controllers/console/app/workflow_node_output_inspector.py
Normal file
415
api/controllers/console/app/workflow_node_output_inspector.py
Normal file
@ -0,0 +1,415 @@
|
||||
"""Console REST endpoints for the Node Output Inspector (Stage 4 §8 / §10.3).
|
||||
|
||||
PRD §Node Output Inspector replaces the consumer-organized Variable Inspector
|
||||
with a producer-organized view of each node's declared outputs and their
|
||||
per-run status. This module exposes two parallel sets of three read-only
|
||||
endpoints — one for ``/workflows/draft/runs/...`` (Composer test runs) and one
|
||||
for ``/workflows/published/runs/...`` (real App API / webapp / webhook /
|
||||
schedule / plugin triggers). Both sets share the same service code, the same
|
||||
response shapes, and the same error codes; the URL is the *only* difference,
|
||||
so the frontend can pick the right prefix based on which run-detail page the
|
||||
user is on.
|
||||
|
||||
Decision D-1 (published Inspector deferred) was lifted 2026-05-26 — the
|
||||
``published_run_inspector_not_implemented`` 404 code is therefore no longer
|
||||
produced.
|
||||
|
||||
URLs follow the design doc and reuse the existing
|
||||
``/apps/<uuid:app_id>/workflows/draft/...`` prefix from
|
||||
:mod:`controllers.console.app.workflow_draft_variable`. The
|
||||
``published`` prefix mirrors it shape-for-shape.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from uuid import UUID
|
||||
|
||||
from flask import Response
|
||||
from flask_restx import Resource
|
||||
|
||||
from controllers.console import console_ns
|
||||
from controllers.console.app.wraps import get_app_model
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from libs.exception import BaseHTTPException
|
||||
from libs.login import login_required
|
||||
from models import App, AppMode
|
||||
from services.workflow import inspector_events
|
||||
from services.workflow.node_output_inspector_service import (
|
||||
NodeOutputInspectorError,
|
||||
NodeOutputInspectorService,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Heartbeat cadence — every N empty subscribe ticks emit a SSE comment so
|
||||
# intervening proxies (nginx, ingress) don't reap the idle connection.
|
||||
# ``inspector_events.subscribe`` ticks at 1s, so 15 → 15s heartbeat.
|
||||
_HEARTBEAT_EVERY_TICKS = 15
|
||||
# Hard ceiling on a single stream — if we never see a terminal workflow
|
||||
# event (engine crashed, redis dropped the message), force-close after this
|
||||
# many ticks (= seconds).
|
||||
_STREAM_HARD_TIMEOUT_TICKS = 1800 # 30 min
|
||||
|
||||
|
||||
def _service() -> NodeOutputInspectorService:
|
||||
"""One-line factory so tests can monkeypatch a stub if needed."""
|
||||
return NodeOutputInspectorService()
|
||||
|
||||
|
||||
def _serve_snapshot(app_model: App, run_id: UUID) -> dict:
|
||||
"""Resource-body shared by draft + published snapshot endpoints.
|
||||
|
||||
Pulled out so the 6 REST routes don't duplicate the same 6-line try/except
|
||||
+ ``model_dump`` ritual — the routes shrink to one-liners and the actual
|
||||
behaviour lives here, where unit tests can hit it without spinning up
|
||||
Flask request context.
|
||||
"""
|
||||
try:
|
||||
snapshot = _service().snapshot_workflow_run(app_model=app_model, workflow_run_id=str(run_id))
|
||||
except NodeOutputInspectorError as error:
|
||||
raise _InspectorNotFound(error) from error
|
||||
return snapshot.model_dump(mode="json")
|
||||
|
||||
|
||||
def _serve_node_detail(app_model: App, run_id: UUID, node_id: str) -> dict:
|
||||
"""Resource-body shared by draft + published node-detail endpoints."""
|
||||
try:
|
||||
view = _service().node_detail(
|
||||
app_model=app_model,
|
||||
workflow_run_id=str(run_id),
|
||||
node_id=node_id,
|
||||
)
|
||||
except NodeOutputInspectorError as error:
|
||||
raise _InspectorNotFound(error) from error
|
||||
return view.model_dump(mode="json")
|
||||
|
||||
|
||||
def _serve_output_preview(app_model: App, run_id: UUID, node_id: str, output_name: str) -> dict:
|
||||
"""Resource-body shared by draft + published output-preview endpoints."""
|
||||
try:
|
||||
preview = _service().output_preview(
|
||||
app_model=app_model,
|
||||
workflow_run_id=str(run_id),
|
||||
node_id=node_id,
|
||||
output_name=output_name,
|
||||
)
|
||||
except NodeOutputInspectorError as error:
|
||||
raise _InspectorNotFound(error) from error
|
||||
return preview.model_dump(mode="json")
|
||||
|
||||
|
||||
class _InspectorNotFound(BaseHTTPException):
|
||||
"""404 that preserves the inspector's specific error code.
|
||||
|
||||
Without this the response body collapses to a generic ``not_found`` code
|
||||
and clients lose the ability to distinguish, e.g.,
|
||||
``workflow_run_not_found`` from ``published_run_inspector_not_implemented``.
|
||||
"""
|
||||
|
||||
code = 404
|
||||
|
||||
def __init__(self, error: NodeOutputInspectorError) -> None:
|
||||
self.error_code = error.code
|
||||
super().__init__(description=str(error))
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs")
|
||||
class WorkflowDraftRunNodeOutputsApi(Resource):
|
||||
"""Whole-run snapshot organized by producer node."""
|
||||
|
||||
@console_ns.doc("get_workflow_draft_run_node_outputs")
|
||||
@console_ns.doc(description="Snapshot of every node's declared outputs for a draft workflow run.")
|
||||
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
|
||||
@console_ns.response(404, "Workflow run not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID):
|
||||
return _serve_snapshot(app_model, run_id)
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/<string:node_id>")
|
||||
class WorkflowDraftRunNodeOutputDetailApi(Resource):
|
||||
"""One node's declared outputs + per-output status."""
|
||||
|
||||
@console_ns.doc("get_workflow_draft_run_node_output_detail")
|
||||
@console_ns.doc(description="One node's declared outputs for a draft workflow run.")
|
||||
@console_ns.doc(
|
||||
params={
|
||||
"app_id": "Application ID",
|
||||
"run_id": "Workflow run ID",
|
||||
"node_id": "Node ID inside the workflow graph",
|
||||
}
|
||||
)
|
||||
@console_ns.response(404, "Workflow run / node not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID, node_id: str):
|
||||
return _serve_node_detail(app_model, run_id, node_id)
|
||||
|
||||
|
||||
@console_ns.route(
|
||||
"/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/<string:node_id>/<string:output_name>/preview"
|
||||
)
|
||||
class WorkflowDraftRunNodeOutputPreviewApi(Resource):
|
||||
"""Full value for one declared output (with signed URL for file refs)."""
|
||||
|
||||
@console_ns.doc("get_workflow_draft_run_node_output_preview")
|
||||
@console_ns.doc(description="Full value for one declared output, including signed download URL for files.")
|
||||
@console_ns.doc(
|
||||
params={
|
||||
"app_id": "Application ID",
|
||||
"run_id": "Workflow run ID",
|
||||
"node_id": "Node ID inside the workflow graph",
|
||||
"output_name": "Declared output name as exposed by Composer",
|
||||
}
|
||||
)
|
||||
@console_ns.response(404, "Workflow run / node / output not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID, node_id: str, output_name: str):
|
||||
return _serve_output_preview(app_model, run_id, node_id, output_name)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# SSE event stream — shared generator used by draft + published variants
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _sse_envelope(event: str, data: dict | str, event_id: int) -> str:
|
||||
"""Format one SSE record per D-5 ``{event, data, id}`` envelope.
|
||||
|
||||
``data`` is JSON-serialized when given as a dict; raw strings are
|
||||
forwarded unchanged so we can also emit ``:keepalive`` comment lines.
|
||||
"""
|
||||
payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False)
|
||||
return f"event: {event}\nid: {event_id}\ndata: {payload}\n\n"
|
||||
|
||||
|
||||
def _stream_inspector_events(app_model: App, run_id: UUID) -> Iterator[str]:
|
||||
"""Yield SSE-framed strings for one workflow run.
|
||||
|
||||
The stream begins with a full ``snapshot`` event so the client has a
|
||||
starting state without needing a separate REST GET. Then for every
|
||||
``node_changed`` message from the pub/sub channel we re-read that node
|
||||
from DB and push a fresh ``node_changed`` event. When the workflow run
|
||||
reaches a terminal state we push one final ``workflow_run_completed``
|
||||
event and close the stream.
|
||||
|
||||
Failures inside the loop are caught and surfaced as ``error`` events so
|
||||
the frontend can show a banner rather than seeing the connection drop
|
||||
silently. The Inspector never raises across the SSE boundary.
|
||||
"""
|
||||
service = _service()
|
||||
run_id_str = str(run_id)
|
||||
|
||||
# Initial snapshot — also flushes a 404 back at the client right away
|
||||
# if the run is gone (raised before yielding any bytes, so Flask turns it
|
||||
# into the normal HTTP 404 path).
|
||||
try:
|
||||
snapshot = service.snapshot_workflow_run(app_model=app_model, workflow_run_id=run_id_str)
|
||||
except NodeOutputInspectorError as error:
|
||||
raise _InspectorNotFound(error) from error
|
||||
|
||||
event_id = 0
|
||||
yield _sse_envelope("snapshot", snapshot.model_dump(mode="json"), event_id)
|
||||
|
||||
# If the run already finished by the time the client connected, emit
|
||||
# the terminal envelope synchronously and close — no point subscribing.
|
||||
# The enum value for partial success is the hyphenated ``partial-succeeded``
|
||||
# (graphon.enums.WorkflowExecutionStatus), not ``partial_succeeded``.
|
||||
if snapshot.workflow_run_status.value in {"succeeded", "failed", "stopped", "partial-succeeded"}:
|
||||
event_id += 1
|
||||
yield _sse_envelope(
|
||||
"workflow_run_completed",
|
||||
{"workflow_run_id": run_id_str, "workflow_run_status": snapshot.workflow_run_status.value},
|
||||
event_id,
|
||||
)
|
||||
return
|
||||
|
||||
# Live subscription
|
||||
ticks_since_heartbeat = 0
|
||||
total_ticks = 0
|
||||
for message in inspector_events.subscribe(run_id_str, timeout_seconds=1.0):
|
||||
total_ticks += 1
|
||||
if total_ticks > _STREAM_HARD_TIMEOUT_TICKS:
|
||||
logger.warning(
|
||||
"Inspector SSE: forcing close after %ds without terminal event for run %s",
|
||||
_STREAM_HARD_TIMEOUT_TICKS,
|
||||
run_id_str,
|
||||
)
|
||||
return
|
||||
|
||||
# Heartbeat sentinel — ``inspector_events.subscribe`` synthesizes a
|
||||
# ``node_changed`` message with both fields ``None`` on every redis
|
||||
# timeout. Real ``workflow_completed`` messages keep their kind even
|
||||
# when status couldn't be resolved (publisher race), so checking kind
|
||||
# first makes the heartbeat branch safe.
|
||||
if message.kind == "node_changed" and message.node_id is None and message.status is None:
|
||||
ticks_since_heartbeat += 1
|
||||
if ticks_since_heartbeat >= _HEARTBEAT_EVERY_TICKS:
|
||||
yield ":keepalive\n\n"
|
||||
ticks_since_heartbeat = 0
|
||||
continue
|
||||
ticks_since_heartbeat = 0
|
||||
|
||||
if message.kind == "workflow_completed":
|
||||
event_id += 1
|
||||
yield _sse_envelope(
|
||||
"workflow_run_completed",
|
||||
{"workflow_run_id": run_id_str, "workflow_run_status": message.status or "unknown"},
|
||||
event_id,
|
||||
)
|
||||
return
|
||||
|
||||
# node_changed: recompute the node slice from DB
|
||||
if not message.node_id:
|
||||
continue
|
||||
try:
|
||||
node_view = service.node_detail(
|
||||
app_model=app_model,
|
||||
workflow_run_id=run_id_str,
|
||||
node_id=message.node_id,
|
||||
)
|
||||
except NodeOutputInspectorError:
|
||||
# Node may not appear in the graph yet (race with persistence); skip.
|
||||
continue
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Inspector SSE: node_detail failed for run %s node %s",
|
||||
run_id_str,
|
||||
message.node_id,
|
||||
exc_info=True,
|
||||
)
|
||||
event_id += 1
|
||||
yield _sse_envelope(
|
||||
"error",
|
||||
{"node_id": message.node_id, "message": "failed to refresh node detail"},
|
||||
event_id,
|
||||
)
|
||||
continue
|
||||
|
||||
event_id += 1
|
||||
yield _sse_envelope("node_changed", node_view.model_dump(mode="json"), event_id)
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/runs/<uuid:run_id>/node-outputs/events")
|
||||
class WorkflowDraftRunNodeOutputEventsApi(Resource):
|
||||
"""SSE stream of inspector deltas for a draft run."""
|
||||
|
||||
@console_ns.doc("stream_workflow_draft_run_node_output_events")
|
||||
@console_ns.doc(description="Server-Sent Events stream of inspector deltas for a draft workflow run.")
|
||||
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
|
||||
@console_ns.response(404, "Workflow run not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID):
|
||||
return Response(
|
||||
_stream_inspector_events(app_model, run_id),
|
||||
mimetype="text/event-stream",
|
||||
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
|
||||
)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Published-run endpoints — symmetric to the draft trio above
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs")
|
||||
class WorkflowPublishedRunNodeOutputsApi(Resource):
|
||||
"""Whole-run snapshot for a *published* workflow run.
|
||||
|
||||
Same response shape as the ``/draft/`` variant — frontend can multiplex
|
||||
based on which page (Composer test-run vs. Run History) is mounted.
|
||||
"""
|
||||
|
||||
@console_ns.doc("get_workflow_published_run_node_outputs")
|
||||
@console_ns.doc(description="Snapshot of every node's declared outputs for a published workflow run.")
|
||||
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
|
||||
@console_ns.response(404, "Workflow run not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID):
|
||||
return _serve_snapshot(app_model, run_id)
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs/<string:node_id>")
|
||||
class WorkflowPublishedRunNodeOutputDetailApi(Resource):
|
||||
"""One node's declared outputs + per-output status (published run)."""
|
||||
|
||||
@console_ns.doc("get_workflow_published_run_node_output_detail")
|
||||
@console_ns.doc(description="One node's declared outputs for a published workflow run.")
|
||||
@console_ns.doc(
|
||||
params={
|
||||
"app_id": "Application ID",
|
||||
"run_id": "Workflow run ID",
|
||||
"node_id": "Node ID inside the workflow graph",
|
||||
}
|
||||
)
|
||||
@console_ns.response(404, "Workflow run / node not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID, node_id: str):
|
||||
return _serve_node_detail(app_model, run_id, node_id)
|
||||
|
||||
|
||||
@console_ns.route(
|
||||
"/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>"
|
||||
"/node-outputs/<string:node_id>/<string:output_name>/preview"
|
||||
)
|
||||
class WorkflowPublishedRunNodeOutputPreviewApi(Resource):
|
||||
"""Full value for one declared output of a published run."""
|
||||
|
||||
@console_ns.doc("get_workflow_published_run_node_output_preview")
|
||||
@console_ns.doc(description="Full value for one declared output of a published run.")
|
||||
@console_ns.doc(
|
||||
params={
|
||||
"app_id": "Application ID",
|
||||
"run_id": "Workflow run ID",
|
||||
"node_id": "Node ID inside the workflow graph",
|
||||
"output_name": "Declared output name as exposed by Composer",
|
||||
}
|
||||
)
|
||||
@console_ns.response(404, "Workflow run / node / output not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID, node_id: str, output_name: str):
|
||||
return _serve_output_preview(app_model, run_id, node_id, output_name)
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/published/runs/<uuid:run_id>/node-outputs/events")
|
||||
class WorkflowPublishedRunNodeOutputEventsApi(Resource):
|
||||
"""SSE stream of inspector deltas for a published run."""
|
||||
|
||||
@console_ns.doc("stream_workflow_published_run_node_output_events")
|
||||
@console_ns.doc(description="Server-Sent Events stream of inspector deltas for a published workflow run.")
|
||||
@console_ns.doc(params={"app_id": "Application ID", "run_id": "Workflow run ID"})
|
||||
@console_ns.response(404, "Workflow run not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
|
||||
def get(self, app_model: App, run_id: UUID):
|
||||
return Response(
|
||||
_stream_inspector_events(app_model, run_id),
|
||||
mimetype="text/event-stream",
|
||||
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
|
||||
)
|
||||
@ -47,6 +47,12 @@ from graphon.graph_events import (
|
||||
)
|
||||
from graphon.node_events import NodeRunResult
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from services.workflow.inspector_events import (
|
||||
publish_node_changed as _inspector_publish_node_changed,
|
||||
)
|
||||
from services.workflow.inspector_events import (
|
||||
publish_workflow_completed as _inspector_publish_workflow_completed,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@ -163,6 +169,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
|
||||
|
||||
def _handle_graph_run_partial_succeeded(self, event: GraphRunPartialSucceededEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
@ -173,6 +180,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
|
||||
|
||||
def _handle_graph_run_failed(self, event: GraphRunFailedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
@ -184,6 +192,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
self._fail_running_node_executions(error_message=event.error)
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
|
||||
|
||||
def _handle_graph_run_aborted(self, event: GraphRunAbortedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
@ -194,6 +203,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
self._fail_running_node_executions(error_message=execution.error_message or "")
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
_inspector_publish_workflow_completed(workflow_run_id=execution.id_, status=str(execution.status.value))
|
||||
|
||||
def _handle_graph_run_paused(self, event: GraphRunPausedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
@ -241,6 +251,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
created_at=event.start_at,
|
||||
)
|
||||
self._node_snapshots[event.id] = snapshot
|
||||
_inspector_publish_node_changed(workflow_run_id=execution.id_, node_id=event.node_id, status="running")
|
||||
|
||||
def _handle_node_retry(self, event: NodeRunRetryEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
@ -248,6 +259,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
domain_execution.error = event.error
|
||||
self._workflow_node_execution_repository.save(domain_execution)
|
||||
self._workflow_node_execution_repository.save_execution_data(domain_execution)
|
||||
_inspector_publish_node_changed(
|
||||
workflow_run_id=self._get_workflow_execution().id_,
|
||||
node_id=domain_execution.node_id,
|
||||
status="retry",
|
||||
)
|
||||
|
||||
def _handle_node_succeeded(self, event: NodeRunSucceededEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
@ -257,6 +273,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
finished_at=event.finished_at,
|
||||
)
|
||||
_inspector_publish_node_changed(
|
||||
workflow_run_id=self._get_workflow_execution().id_,
|
||||
node_id=domain_execution.node_id,
|
||||
status="succeeded",
|
||||
)
|
||||
|
||||
def _handle_node_failed(self, event: NodeRunFailedEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
@ -267,6 +288,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
error=event.error,
|
||||
finished_at=event.finished_at,
|
||||
)
|
||||
_inspector_publish_node_changed(
|
||||
workflow_run_id=self._get_workflow_execution().id_,
|
||||
node_id=domain_execution.node_id,
|
||||
status="failed",
|
||||
)
|
||||
|
||||
def _handle_node_exception(self, event: NodeRunExceptionEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
@ -277,6 +303,11 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
error=event.error,
|
||||
finished_at=event.finished_at,
|
||||
)
|
||||
_inspector_publish_node_changed(
|
||||
workflow_run_id=self._get_workflow_execution().id_,
|
||||
node_id=domain_execution.node_id,
|
||||
status="exception",
|
||||
)
|
||||
|
||||
def _handle_node_pause_requested(self, event: NodeRunPauseRequestedEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
|
||||
268
api/core/workflow/nodes/agent_v2/plugin_tools_builder.py
Normal file
268
api/core/workflow/nodes/agent_v2/plugin_tools_builder.py
Normal file
@ -0,0 +1,268 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Protocol, cast
|
||||
|
||||
from dify_agent.layers.dify_plugin import (
|
||||
DifyPluginCredentialValue,
|
||||
DifyPluginToolConfig,
|
||||
DifyPluginToolCredentialType,
|
||||
DifyPluginToolParameter,
|
||||
DifyPluginToolParameterForm,
|
||||
DifyPluginToolsLayerConfig,
|
||||
)
|
||||
|
||||
from core.agent.entities import AgentToolEntity
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.tools.__base.tool import Tool
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.tools.errors import (
|
||||
ToolProviderCredentialValidationError,
|
||||
ToolProviderNotFoundError,
|
||||
)
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from models.agent_config_entities import AgentSoulDifyToolConfig, AgentSoulToolsConfig
|
||||
from models.provider_ids import ToolProviderID
|
||||
|
||||
|
||||
class WorkflowAgentPluginToolsBuildError(ValueError):
|
||||
"""Raised when Agent Soul tools cannot be prepared for Agent backend."""
|
||||
|
||||
def __init__(self, error_code: str, message: str) -> None:
|
||||
self.error_code = error_code
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AgentToolRuntimeProvider(Protocol):
|
||||
def get_agent_tool_runtime(
|
||||
self,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
agent_tool: AgentToolEntity,
|
||||
user_id: str | None = None,
|
||||
invoke_from: InvokeFrom = InvokeFrom.DEBUGGER,
|
||||
variable_pool: Any | None = None,
|
||||
) -> Tool: ...
|
||||
|
||||
|
||||
class WorkflowAgentPluginToolsBuilder:
|
||||
"""Prepare Agent Soul Dify Plugin Tools for the public Agent backend DTO."""
|
||||
|
||||
def __init__(self, *, tool_runtime_provider: AgentToolRuntimeProvider | None = None) -> None:
|
||||
self._tool_runtime_provider = tool_runtime_provider or ToolManager
|
||||
|
||||
def build(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
user_id: str | None,
|
||||
tools: AgentSoulToolsConfig,
|
||||
invoke_from: InvokeFrom,
|
||||
) -> DifyPluginToolsLayerConfig | None:
|
||||
"""Resolve user-selected Dify Plugin Tools into the Agent backend DTO.
|
||||
|
||||
``invoke_from`` is the *real* runtime caller category (DEBUGGER for a
|
||||
Composer test run, SERVICE_API / WEB_APP for a published run). It must
|
||||
be threaded through to :class:`ToolManager` so credential quotas, rate
|
||||
limits, and audit tags match the actual call site.
|
||||
"""
|
||||
enabled_tools = [tool for tool in tools.dify_tools if tool.enabled]
|
||||
if not enabled_tools:
|
||||
return None
|
||||
|
||||
prepared: list[DifyPluginToolConfig] = []
|
||||
seen_names: set[str] = set()
|
||||
for tool_config in enabled_tools:
|
||||
agent_tool = self._to_agent_tool_entity(tool_config)
|
||||
tool_runtime = self._fetch_tool_runtime(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
user_id=user_id,
|
||||
agent_tool=agent_tool,
|
||||
invoke_from=invoke_from,
|
||||
tool_config=tool_config,
|
||||
)
|
||||
|
||||
exposed_name = self._exposed_tool_name(tool_config)
|
||||
if exposed_name in seen_names:
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_name_duplicated",
|
||||
f"Duplicate Dify Plugin Tool name {exposed_name!r}.",
|
||||
)
|
||||
seen_names.add(exposed_name)
|
||||
|
||||
prepared.append(self._to_backend_tool_config(tool_config, tool_runtime, exposed_name))
|
||||
|
||||
return DifyPluginToolsLayerConfig(tools=prepared)
|
||||
|
||||
def _fetch_tool_runtime(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
user_id: str | None,
|
||||
agent_tool: AgentToolEntity,
|
||||
invoke_from: InvokeFrom,
|
||||
tool_config: AgentSoulDifyToolConfig,
|
||||
) -> Tool:
|
||||
"""Resolve the API-side ``Tool`` runtime, mapping fetch errors to
|
||||
Inspector-friendly error codes so callers can render distinct UX for
|
||||
"tool definition gone" vs "credential failed".
|
||||
"""
|
||||
try:
|
||||
return self._tool_runtime_provider.get_agent_tool_runtime(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
agent_tool=agent_tool,
|
||||
user_id=user_id,
|
||||
invoke_from=invoke_from,
|
||||
variable_pool=None,
|
||||
)
|
||||
except ToolProviderNotFoundError as exc:
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_declaration_not_found",
|
||||
f"Dify Plugin Tool {tool_config.tool_name!r} declaration not found: {exc}",
|
||||
) from exc
|
||||
except ToolProviderCredentialValidationError as exc:
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_credential_invalid",
|
||||
f"Dify Plugin Tool {tool_config.tool_name!r} credential validation failed: {exc}",
|
||||
) from exc
|
||||
except ValueError as exc:
|
||||
# ToolManager raises bare ValueError when the agent tool's
|
||||
# ``runtime`` / runtime parameters are missing. Surface it under a
|
||||
# narrower error code than a generic "declaration not found" so
|
||||
# frontend can render an actionable hint.
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_config_invalid",
|
||||
f"Dify Plugin Tool {tool_config.tool_name!r} runtime construction failed: {exc}",
|
||||
) from exc
|
||||
|
||||
@staticmethod
|
||||
def _to_agent_tool_entity(tool_config: AgentSoulDifyToolConfig) -> AgentToolEntity:
|
||||
return AgentToolEntity(
|
||||
provider_type=ToolProviderType.value_of(tool_config.provider_type),
|
||||
provider_id=WorkflowAgentPluginToolsBuilder._provider_id(tool_config),
|
||||
tool_name=tool_config.tool_name,
|
||||
tool_parameters=dict(tool_config.runtime_parameters),
|
||||
credential_id=tool_config.credential_ref.id if tool_config.credential_ref else None,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _provider_id(tool_config: AgentSoulDifyToolConfig) -> str:
|
||||
if tool_config.provider_id:
|
||||
return tool_config.provider_id
|
||||
assert tool_config.plugin_id is not None
|
||||
assert tool_config.provider is not None
|
||||
return f"{tool_config.plugin_id}/{tool_config.provider}"
|
||||
|
||||
@staticmethod
|
||||
def _exposed_tool_name(tool_config: AgentSoulDifyToolConfig) -> str:
|
||||
# Stage 3.1 decision: no user rename yet. Keep the model-visible tool
|
||||
# name aligned with the plugin declaration identity.
|
||||
return tool_config.tool_name
|
||||
|
||||
def _to_backend_tool_config(
|
||||
self,
|
||||
tool_config: AgentSoulDifyToolConfig,
|
||||
tool_runtime: Tool,
|
||||
exposed_name: str,
|
||||
) -> DifyPluginToolConfig:
|
||||
runtime = tool_runtime.runtime
|
||||
if runtime is None:
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_config_invalid",
|
||||
f"Dify Plugin Tool {tool_config.tool_name!r} has no runtime.",
|
||||
)
|
||||
|
||||
provider_id = self._provider_id(tool_config)
|
||||
plugin_id, provider = self._plugin_provider(tool_config, provider_id)
|
||||
parameters = [
|
||||
DifyPluginToolParameter.model_validate(parameter.model_dump(mode="json"))
|
||||
for parameter in tool_runtime.get_merged_runtime_parameters()
|
||||
]
|
||||
runtime_parameters = self._runtime_parameters(tool_runtime, parameters)
|
||||
description = tool_config.description
|
||||
if description is None and tool_runtime.entity.description is not None:
|
||||
description = tool_runtime.entity.description.llm
|
||||
|
||||
return DifyPluginToolConfig(
|
||||
plugin_id=plugin_id,
|
||||
provider=provider,
|
||||
tool_name=tool_config.tool_name,
|
||||
credential_type=self._credential_type(tool_config, runtime.credentials),
|
||||
name=exposed_name,
|
||||
description=description,
|
||||
credentials=self._normalize_credentials(runtime.credentials, tool_name=tool_config.tool_name),
|
||||
runtime_parameters=runtime_parameters,
|
||||
parameters=parameters,
|
||||
parameters_json_schema=cast(dict[str, Any], tool_runtime.get_llm_parameters_json_schema()),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _plugin_provider(tool_config: AgentSoulDifyToolConfig, provider_id: str) -> tuple[str, str]:
|
||||
if tool_config.plugin_id and tool_config.provider:
|
||||
return tool_config.plugin_id, tool_config.provider
|
||||
provider_id_entity = ToolProviderID(provider_id)
|
||||
return provider_id_entity.plugin_id, provider_id_entity.provider_name
|
||||
|
||||
@staticmethod
|
||||
def _credential_type(
|
||||
tool_config: AgentSoulDifyToolConfig,
|
||||
credentials: Mapping[str, Any],
|
||||
) -> DifyPluginToolCredentialType:
|
||||
if not credentials and tool_config.credential_type == "unauthorized":
|
||||
return "unauthorized"
|
||||
return tool_config.credential_type
|
||||
|
||||
@staticmethod
|
||||
def _runtime_parameters(
|
||||
tool_runtime: Tool,
|
||||
parameters: list[DifyPluginToolParameter],
|
||||
) -> dict[str, Any]:
|
||||
runtime = tool_runtime.runtime
|
||||
runtime_parameters = dict(runtime.runtime_parameters if runtime is not None else {})
|
||||
missing = [
|
||||
parameter.name
|
||||
for parameter in parameters
|
||||
if parameter.form is not DifyPluginToolParameterForm.LLM
|
||||
and parameter.required
|
||||
and parameter.default is None
|
||||
and parameter.name not in runtime_parameters
|
||||
]
|
||||
if missing:
|
||||
names = ", ".join(sorted(missing))
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_runtime_parameter_missing",
|
||||
f"Dify Plugin Tool {tool_runtime.entity.identity.name!r} is missing runtime parameters: {names}.",
|
||||
)
|
||||
return runtime_parameters
|
||||
|
||||
@staticmethod
|
||||
def _normalize_credentials(
|
||||
credentials: Mapping[str, Any],
|
||||
*,
|
||||
tool_name: str,
|
||||
) -> dict[str, DifyPluginCredentialValue]:
|
||||
"""Forward only scalar credential values to the Agent backend.
|
||||
|
||||
``DifyPluginCredentialValue`` is ``str | int | float | bool | None``.
|
||||
Refusing non-scalar values (lists, dicts, custom objects) is safer than
|
||||
``str(value)`` — stringifying a nested OAuth token blob produces a
|
||||
Python ``repr`` that the plugin daemon cannot use, and we'd rather
|
||||
surface a clear ``agent_tool_credential_shape_invalid`` than send junk.
|
||||
"""
|
||||
normalized: dict[str, DifyPluginCredentialValue] = {}
|
||||
for key, value in credentials.items():
|
||||
if isinstance(value, str | int | float | bool) or value is None:
|
||||
normalized[key] = value
|
||||
continue
|
||||
raise WorkflowAgentPluginToolsBuildError(
|
||||
"agent_tool_credential_shape_invalid",
|
||||
(
|
||||
f"Dify Plugin Tool {tool_name!r} credential {key!r} has a non-scalar value "
|
||||
f"({type(value).__name__}); only str/int/float/bool/None are forwarded to the daemon."
|
||||
),
|
||||
)
|
||||
return normalized
|
||||
@ -11,13 +11,14 @@ SUPPORTED_AGENT_BACKEND_FEATURES = frozenset(
|
||||
"workflow_context",
|
||||
"model",
|
||||
"structured_output",
|
||||
"tools.dify_tools",
|
||||
}
|
||||
)
|
||||
|
||||
RESERVED_AGENT_BACKEND_FEATURES = frozenset(
|
||||
{
|
||||
"skills_files",
|
||||
"tools",
|
||||
"tools.cli_tools",
|
||||
"knowledge",
|
||||
"human",
|
||||
"env",
|
||||
@ -32,7 +33,7 @@ def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any
|
||||
warnings: list[dict[str, str]] = []
|
||||
soul_dump = agent_soul.model_dump(mode="json")
|
||||
for section in sorted(RESERVED_AGENT_BACKEND_FEATURES):
|
||||
value = soul_dump.get(section)
|
||||
value = _get_nested(soul_dump, section)
|
||||
has_value = bool(value)
|
||||
if isinstance(value, dict):
|
||||
has_value = any(bool(item) for item in value.values())
|
||||
@ -41,11 +42,12 @@ def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any
|
||||
{
|
||||
"section": f"agent_soul.{section}",
|
||||
"code": "agent_backend_layer_not_available",
|
||||
"message": f"{section} is saved in Agent Soul but is not executed by Agent backend in phase 3.",
|
||||
"message": f"{section} is saved in Agent Soul but is not executed by Agent backend.",
|
||||
}
|
||||
)
|
||||
|
||||
reserved_status = dict.fromkeys(sorted(RESERVED_AGENT_BACKEND_FEATURES), "reserved_not_executed")
|
||||
reserved_status["tools.dify_tools"] = "supported_when_config_valid"
|
||||
|
||||
return {
|
||||
"supported": sorted(SUPPORTED_AGENT_BACKEND_FEATURES),
|
||||
@ -53,3 +55,12 @@ def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any
|
||||
"reserved_status": reserved_status,
|
||||
"unsupported_runtime_warnings": warnings,
|
||||
}
|
||||
|
||||
|
||||
def _get_nested(value: dict[str, Any], path: str) -> Any:
|
||||
current: Any = value
|
||||
for part in path.split("."):
|
||||
if not isinstance(current, dict):
|
||||
return None
|
||||
current = current.get(part)
|
||||
return current
|
||||
|
||||
@ -30,6 +30,7 @@ from models.agent_config_entities import (
|
||||
)
|
||||
|
||||
from .output_failure_orchestrator import retry_idempotency_key
|
||||
from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError
|
||||
from .runtime_feature_manifest import build_runtime_feature_manifest
|
||||
|
||||
|
||||
@ -84,9 +85,11 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
*,
|
||||
credentials_provider: CredentialsProvider,
|
||||
request_builder: AgentBackendRunRequestBuilder | None = None,
|
||||
plugin_tools_builder: WorkflowAgentPluginToolsBuilder | None = None,
|
||||
) -> None:
|
||||
self._credentials_provider = credentials_provider
|
||||
self._request_builder = request_builder or AgentBackendRunRequestBuilder()
|
||||
self._plugin_tools_builder = plugin_tools_builder or WorkflowAgentPluginToolsBuilder()
|
||||
|
||||
def build(self, context: WorkflowAgentRuntimeBuildContext) -> WorkflowAgentRuntimeRequest:
|
||||
agent_soul = AgentSoulConfig.model_validate(context.snapshot.config_snapshot_dict)
|
||||
@ -102,6 +105,26 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
workflow_job_prompt = node_job.workflow_prompt.strip() or "Run this workflow Agent Node for the current run."
|
||||
user_prompt = workflow_context_prompt.strip() or "Use the current workflow context."
|
||||
credentials = self._credentials_provider.fetch(agent_soul.model.model_provider, agent_soul.model.model)
|
||||
try:
|
||||
tools_layer = self._plugin_tools_builder.build(
|
||||
tenant_id=context.dify_context.tenant_id,
|
||||
app_id=context.dify_context.app_id,
|
||||
user_id=context.dify_context.user_id,
|
||||
tools=agent_soul.tools,
|
||||
# Thread the *real* runtime invocation source through to
|
||||
# ToolManager so credential quotas, rate limits, and audit
|
||||
# trails match the actual call site (DEBUGGER for draft test
|
||||
# run, SERVICE_API / WEB_APP for published run).
|
||||
invoke_from=context.dify_context.invoke_from,
|
||||
)
|
||||
except WorkflowAgentPluginToolsBuildError as error:
|
||||
raise WorkflowAgentRuntimeRequestBuildError(error.error_code, str(error)) from error
|
||||
if tools_layer is not None:
|
||||
metadata["agent_tools"] = {
|
||||
"dify_tool_count": len(tools_layer.tools),
|
||||
"dify_tool_names": [tool.name or tool.tool_name for tool in tools_layer.tools],
|
||||
"cli_tool_count": len(agent_soul.tools.cli_tools),
|
||||
}
|
||||
|
||||
request = self._request_builder.build_for_workflow_node(
|
||||
AgentBackendWorkflowNodeRunInput(
|
||||
@ -134,6 +157,7 @@ class WorkflowAgentRuntimeRequestBuilder:
|
||||
workflow_node_job_prompt=workflow_job_prompt,
|
||||
user_prompt=user_prompt,
|
||||
output=self._build_output_config(node_job.declared_outputs),
|
||||
tools=tools_layer,
|
||||
idempotency_key=self._idempotency_key(context),
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
@ -126,6 +126,7 @@ class WorkflowAgentNodeValidator:
|
||||
raise WorkflowAgentNodeValidationError(
|
||||
f"Workflow Agent node {binding.node_id} requires Agent Soul model config."
|
||||
)
|
||||
cls._validate_agent_soul_tools(binding=binding, agent_soul=agent_soul)
|
||||
node_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict)
|
||||
cls.validate_node_job(session=session, binding=binding, node_job=node_job, topology=topology)
|
||||
|
||||
@ -280,6 +281,26 @@ class WorkflowAgentNodeValidator:
|
||||
f"Workflow Agent node {binding.node_id} references unsupported human contact channel {channel}."
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _validate_agent_soul_tools(
|
||||
cls,
|
||||
*,
|
||||
binding: WorkflowAgentNodeBinding,
|
||||
agent_soul: AgentSoulConfig,
|
||||
) -> None:
|
||||
exposed_names: set[str] = set()
|
||||
for tool in agent_soul.tools.dify_tools:
|
||||
if not tool.enabled:
|
||||
continue
|
||||
exposed_name = tool.tool_name
|
||||
if exposed_name in exposed_names:
|
||||
raise WorkflowAgentNodeValidationError(
|
||||
f"Workflow Agent node {binding.node_id} has duplicate Dify Plugin Tool name {exposed_name}."
|
||||
)
|
||||
exposed_names.add(exposed_name)
|
||||
# CLI tools remain saved-but-not-executed. They are allowed at publish
|
||||
# time so existing Agent Soul drafts are not blocked by a reserved field.
|
||||
|
||||
@staticmethod
|
||||
def _validate_file_ref(
|
||||
*,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import re
|
||||
from enum import StrEnum
|
||||
from typing import Any, Final
|
||||
from typing import Any, Final, Literal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
|
||||
|
||||
@ -50,8 +50,90 @@ class AgentSoulSkillsFilesConfig(BaseModel):
|
||||
skills: list[dict[str, Any]] = Field(default_factory=list)
|
||||
|
||||
|
||||
class AgentSoulDifyToolCredentialRef(BaseModel):
|
||||
"""Reference to a stored Dify Plugin Tool credential.
|
||||
|
||||
Secret values are resolved only at runtime. The legacy ``credential_id``
|
||||
field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
|
||||
old Agent tool payloads can be read while new payloads stay explicit.
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(extra="ignore")
|
||||
|
||||
type: Literal["provider", "tool"] = "tool"
|
||||
id: str | None = Field(default=None, max_length=255)
|
||||
provider: str | None = Field(default=None, max_length=255)
|
||||
|
||||
|
||||
class AgentSoulDifyToolConfig(BaseModel):
|
||||
"""One Dify Plugin Tool configured on Agent Soul.
|
||||
|
||||
The API backend prepares this persisted product shape into
|
||||
``DifyPluginToolConfig`` before sending a run request to Agent backend.
|
||||
``provider_id`` keeps compatibility with existing Agent tool config payloads;
|
||||
new callers should send ``plugin_id`` + ``provider`` when available.
|
||||
"""
|
||||
|
||||
# ``extra="ignore"`` (not ``"allow"``) so historical Agent Soul payloads
|
||||
# with unknown fields still load — but the extra keys are dropped instead
|
||||
# of silently riding along into ``model_dump``. New callers should send the
|
||||
# explicit schema fields below.
|
||||
model_config = ConfigDict(extra="ignore")
|
||||
|
||||
enabled: bool = True
|
||||
# Dify Plugin Tools live behind the ``PLUGIN`` provider type. ``BUILT_IN`` /
|
||||
# ``WORKFLOW`` / ``API`` providers are not exposed to the Agent backend in
|
||||
# this layer — keep the default narrow so a missing field surfaces as
|
||||
# ``agent_tool_declaration_not_found`` against the correct provider table.
|
||||
provider_type: str = "plugin"
|
||||
provider_id: str | None = Field(default=None, max_length=255)
|
||||
plugin_id: str | None = Field(default=None, max_length=255)
|
||||
provider: str | None = Field(default=None, max_length=255)
|
||||
tool_name: str = Field(min_length=1, max_length=255)
|
||||
credential_type: Literal["api-key", "oauth2", "unauthorized"] = "api-key"
|
||||
credential_ref: AgentSoulDifyToolCredentialRef | None = None
|
||||
# Reserved for a future user-rename UX. Accepted but currently rejected at
|
||||
# validation time so frontend cannot silently believe a rename took effect
|
||||
# (see :meth:`_validate_provider_and_credentials`).
|
||||
name: str | None = Field(default=None, max_length=255)
|
||||
description: str | None = None
|
||||
runtime_parameters: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def _normalize_legacy_payload(cls, value: Any) -> Any:
|
||||
if not isinstance(value, dict):
|
||||
return value
|
||||
normalized = dict(value)
|
||||
if normalized.get("provider_id") is None and isinstance(normalized.get("provider_name"), str):
|
||||
normalized["provider_id"] = normalized["provider_name"]
|
||||
if normalized.get("runtime_parameters") is None and isinstance(normalized.get("tool_parameters"), dict):
|
||||
normalized["runtime_parameters"] = normalized["tool_parameters"]
|
||||
if normalized.get("credential_ref") is None and normalized.get("credential_id"):
|
||||
normalized["credential_ref"] = {
|
||||
"type": "tool",
|
||||
"id": normalized.get("credential_id"),
|
||||
"provider": normalized.get("provider_id") or normalized.get("provider"),
|
||||
}
|
||||
return normalized
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _validate_provider_and_credentials(self) -> "AgentSoulDifyToolConfig":
|
||||
if not self.provider_id and not (self.plugin_id and self.provider):
|
||||
raise ValueError("Dify tool requires provider_id or plugin_id + provider")
|
||||
if self.credential_type != "unauthorized" and (self.credential_ref is None or not self.credential_ref.id):
|
||||
raise ValueError("credential_ref.id is required for credentialed Dify tools")
|
||||
# ``name`` is reserved for a future user-rename UX. Until that lands
|
||||
# the model-visible name is forced to match ``tool_name``; reject
|
||||
# explicit values so a frontend bug surfaces immediately instead of
|
||||
# producing a silently-ignored override.
|
||||
if self.name is not None and self.name != self.tool_name:
|
||||
raise ValueError("name override is not yet supported; omit ``name`` or set it equal to ``tool_name``.")
|
||||
return self
|
||||
|
||||
|
||||
class AgentSoulToolsConfig(BaseModel):
|
||||
dify_tools: list[dict[str, Any]] = Field(default_factory=list)
|
||||
dify_tools: list[AgentSoulDifyToolConfig] = Field(default_factory=list)
|
||||
cli_tools: list[dict[str, Any]] = Field(default_factory=list)
|
||||
|
||||
|
||||
|
||||
@ -3447,6 +3447,89 @@ Run draft workflow
|
||||
| 200 | Draft workflow run started successfully |
|
||||
| 403 | Permission denied |
|
||||
|
||||
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
Snapshot of every node's declared outputs for a draft workflow run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run not found |
|
||||
|
||||
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
Server-Sent Events stream of inspector deltas for a draft workflow run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run not found |
|
||||
|
||||
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
One node's declared outputs for a draft workflow run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| node_id | path | Node ID inside the workflow graph | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run / node not found |
|
||||
|
||||
### /apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
Full value for one declared output, including signed download URL for files.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| node_id | path | Node ID inside the workflow graph | Yes | string |
|
||||
| output_name | path | Declared output name as exposed by Composer | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run / node / output not found |
|
||||
|
||||
### /apps/{app_id}/workflows/draft/system-variables
|
||||
|
||||
#### GET
|
||||
@ -3684,6 +3767,89 @@ Publish workflow
|
||||
| ---- | ----------- |
|
||||
| 200 | Success |
|
||||
|
||||
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
Snapshot of every node's declared outputs for a published workflow run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run not found |
|
||||
|
||||
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
Server-Sent Events stream of inspector deltas for a published workflow run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run not found |
|
||||
|
||||
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
One node's declared outputs for a published workflow run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| node_id | path | Node ID inside the workflow graph | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run / node not found |
|
||||
|
||||
### /apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview
|
||||
|
||||
#### GET
|
||||
##### Description
|
||||
|
||||
Full value for one declared output of a published run.
|
||||
|
||||
##### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| app_id | path | Application ID | Yes | string |
|
||||
| node_id | path | Node ID inside the workflow graph | Yes | string |
|
||||
| output_name | path | Declared output name as exposed by Composer | Yes | string |
|
||||
| run_id | path | Workflow run ID | Yes | string |
|
||||
|
||||
##### Responses
|
||||
|
||||
| Code | Description |
|
||||
| ---- | ----------- |
|
||||
| 404 | Workflow run / node / output not found |
|
||||
|
||||
### /apps/{app_id}/workflows/triggers/webhook
|
||||
|
||||
#### GET
|
||||
@ -10539,6 +10705,43 @@ Supported icon storage formats for Agent roster entries.
|
||||
| skills_files | [AgentSoulSkillsFilesConfig](#agentsoulskillsfilesconfig) | | No |
|
||||
| tools | [AgentSoulToolsConfig](#agentsoultoolsconfig) | | No |
|
||||
|
||||
#### AgentSoulDifyToolConfig
|
||||
|
||||
One Dify Plugin Tool configured on Agent Soul.
|
||||
|
||||
The API backend prepares this persisted product shape into
|
||||
``DifyPluginToolConfig`` before sending a run request to Agent backend.
|
||||
``provider_id`` keeps compatibility with existing Agent tool config payloads;
|
||||
new callers should send ``plugin_id`` + ``provider`` when available.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| credential_ref | [AgentSoulDifyToolCredentialRef](#agentsouldifytoolcredentialref) | | No |
|
||||
| credential_type | string | *Enum:* `"api-key"`, `"oauth2"`, `"unauthorized"` | No |
|
||||
| description | string | | No |
|
||||
| enabled | boolean | | No |
|
||||
| name | string | | No |
|
||||
| plugin_id | string | | No |
|
||||
| provider | string | | No |
|
||||
| provider_id | string | | No |
|
||||
| provider_type | string | | No |
|
||||
| runtime_parameters | object | | No |
|
||||
| tool_name | string | | Yes |
|
||||
|
||||
#### AgentSoulDifyToolCredentialRef
|
||||
|
||||
Reference to a stored Dify Plugin Tool credential.
|
||||
|
||||
Secret values are resolved only at runtime. The legacy ``credential_id``
|
||||
field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
|
||||
old Agent tool payloads can be read while new payloads stay explicit.
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| id | string | | No |
|
||||
| provider | string | | No |
|
||||
| type | string | *Enum:* `"provider"`, `"tool"` | No |
|
||||
|
||||
#### AgentSoulEnvConfig
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
@ -10616,7 +10819,7 @@ Reference to model credentials resolved only at runtime.
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| cli_tools | [ object ] | | No |
|
||||
| dify_tools | [ object ] | | No |
|
||||
| dify_tools | [ [AgentSoulDifyToolConfig](#agentsouldifytoolconfig) ] | | No |
|
||||
|
||||
#### AgentThought
|
||||
|
||||
|
||||
@ -191,7 +191,7 @@ storage = [
|
||||
"opendal==0.46.0",
|
||||
"oss2>=2.19.1,<3.0.0",
|
||||
"supabase>=2.30.0,<3.0.0",
|
||||
"tos>=2.9.1,<3.0.0",
|
||||
"tos>=2.9.0,<3.0.0",
|
||||
]
|
||||
|
||||
############################################################
|
||||
|
||||
194
api/services/workflow/inspector_events.py
Normal file
194
api/services/workflow/inspector_events.py
Normal file
@ -0,0 +1,194 @@
|
||||
"""Inspector pub/sub fanout for live workflow run updates (Stage 4 §8.5).
|
||||
|
||||
The Node Output Inspector exposes a Server-Sent Events stream alongside its
|
||||
three REST endpoints so the frontend can render per-output progress without
|
||||
DB polling. This module owns the redis pub/sub channel that connects the two
|
||||
sides:
|
||||
|
||||
* :func:`publish_node_changed` / :func:`publish_workflow_completed` —
|
||||
invoked by :class:`core.app.workflow.layers.persistence.WorkflowPersistenceLayer`
|
||||
at the very end of each handler, after the DB write has already
|
||||
succeeded. Publish failures are swallowed so the engine never trips on a
|
||||
flaky redis connection.
|
||||
* :func:`subscribe` — async iterator the SSE endpoint consumes.
|
||||
|
||||
Channel layout
|
||||
--------------
|
||||
``dify:inspector:workflow_run:{workflow_run_id}``
|
||||
|
||||
One channel per workflow run; the SSE endpoint subscribes for the lifetime of
|
||||
the run and unsubscribes on the terminal event. Multiple clients can attach
|
||||
to the same run safely — redis pub/sub fans every message out to every
|
||||
listener.
|
||||
|
||||
The message envelope intentionally carries only the *delta* needed to invalidate
|
||||
a slice of the inspector view; the SSE handler re-reads the canonical
|
||||
``WorkflowNodeExecutionModel`` row from the DB so we never serialize stale
|
||||
state across the wire. This means messages stay tiny (~150 bytes) and the
|
||||
inspector view stays consistent even if a publisher races persistence.
|
||||
|
||||
Decision D-5: the on-wire SSE envelope ``{event, data, id}`` is shared with
|
||||
the babysit chat stream; this module only emits the *internal* pub/sub
|
||||
message — the SSE controller turns it into the public envelope.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from dataclasses import asdict, dataclass
|
||||
from typing import Final, Literal
|
||||
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Channel naming
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
_CHANNEL_PREFIX: Final = "dify:inspector:workflow_run"
|
||||
|
||||
|
||||
def channel_for(workflow_run_id: str) -> str:
|
||||
"""Return the pub/sub channel name for ``workflow_run_id``.
|
||||
|
||||
Kept as a module-level helper so tests can pin the channel without
|
||||
reaching into the publish/subscribe code paths.
|
||||
"""
|
||||
return f"{_CHANNEL_PREFIX}:{workflow_run_id}"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Message envelope
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
#: Tags discriminating the wire-level message kinds. Kept narrow so the SSE
|
||||
#: controller can pattern-match exhaustively.
|
||||
InspectorMessageKind = Literal["node_changed", "workflow_completed"]
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class InspectorMessage:
|
||||
"""Minimal delta carried across the pub/sub channel.
|
||||
|
||||
``node_id`` is set only for ``node_changed`` messages; ``status`` is the
|
||||
coarse string status straight from the persistence layer (``"running"`` /
|
||||
``"succeeded"`` / ``"failed"`` for nodes, plus ``"succeeded"`` /
|
||||
``"failed"`` / ``"partial_succeeded"`` / ``"stopped"`` for workflow runs).
|
||||
"""
|
||||
|
||||
kind: InspectorMessageKind
|
||||
workflow_run_id: str
|
||||
node_id: str | None = None
|
||||
status: str | None = None
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(asdict(self), ensure_ascii=False)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, blob: str) -> InspectorMessage | None:
|
||||
"""Decode a payload, returning ``None`` for any shape we can't trust."""
|
||||
try:
|
||||
decoded = json.loads(blob)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return None
|
||||
if not isinstance(decoded, dict):
|
||||
return None
|
||||
kind = decoded.get("kind")
|
||||
if kind not in ("node_changed", "workflow_completed"):
|
||||
return None
|
||||
workflow_run_id = decoded.get("workflow_run_id")
|
||||
if not isinstance(workflow_run_id, str) or not workflow_run_id:
|
||||
return None
|
||||
node_id = decoded.get("node_id")
|
||||
if node_id is not None and not isinstance(node_id, str):
|
||||
return None
|
||||
status = decoded.get("status")
|
||||
if status is not None and not isinstance(status, str):
|
||||
return None
|
||||
return cls(kind=kind, workflow_run_id=workflow_run_id, node_id=node_id, status=status)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Publisher (called from the persistence layer)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _publish(message: InspectorMessage) -> None:
|
||||
"""Best-effort fire-and-forget publish.
|
||||
|
||||
Persistence runs inside the workflow engine thread; we never want a redis
|
||||
glitch to break the workflow. Any exception is logged at debug level so
|
||||
operators still see them when they grep, but the engine keeps running.
|
||||
"""
|
||||
try:
|
||||
redis_client.publish(channel_for(message.workflow_run_id), message.to_json())
|
||||
except Exception:
|
||||
logger.debug("InspectorEventPublisher: publish failed for %s", message.workflow_run_id, exc_info=True)
|
||||
|
||||
|
||||
def publish_node_changed(*, workflow_run_id: str, node_id: str, status: str) -> None:
|
||||
"""Announce that one node's execution row just changed.
|
||||
|
||||
The SSE handler will recompute the node slice from the DB on receipt.
|
||||
"""
|
||||
_publish(InspectorMessage(kind="node_changed", workflow_run_id=workflow_run_id, node_id=node_id, status=status))
|
||||
|
||||
|
||||
def publish_workflow_completed(*, workflow_run_id: str, status: str) -> None:
|
||||
"""Announce that the workflow run reached a terminal state.
|
||||
|
||||
The SSE handler emits one last envelope and disconnects.
|
||||
"""
|
||||
_publish(InspectorMessage(kind="workflow_completed", workflow_run_id=workflow_run_id, status=status))
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Subscriber (consumed by the SSE controller)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def subscribe(workflow_run_id: str, *, timeout_seconds: float = 1.0) -> Iterator[InspectorMessage]:
|
||||
"""Yield ``InspectorMessage`` instances until the consumer abandons us.
|
||||
|
||||
The loop polls redis with ``timeout_seconds`` so the SSE handler can
|
||||
interleave keepalive heartbeats. Yields ``None`` on timeout so the caller
|
||||
can decide whether to keep blocking; malformed payloads are silently
|
||||
skipped.
|
||||
|
||||
The pub/sub connection is closed when the iterator is garbage-collected
|
||||
(the wrapping ``finally`` releases it as soon as the SSE handler exits).
|
||||
"""
|
||||
pubsub = redis_client.pubsub()
|
||||
pubsub.subscribe(channel_for(workflow_run_id))
|
||||
try:
|
||||
while True:
|
||||
raw = pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout_seconds)
|
||||
if raw is None:
|
||||
# Surface a heartbeat tick — caller can keep-alive or check
|
||||
# disconnection without blocking redis any longer.
|
||||
yield InspectorMessage(kind="node_changed", workflow_run_id=workflow_run_id, node_id=None, status=None)
|
||||
continue
|
||||
data = raw.get("data") if isinstance(raw, dict) else None
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode("utf-8", errors="replace")
|
||||
if not isinstance(data, str):
|
||||
continue
|
||||
message = InspectorMessage.from_json(data)
|
||||
if message is None:
|
||||
continue
|
||||
yield message
|
||||
finally:
|
||||
try:
|
||||
pubsub.unsubscribe(channel_for(workflow_run_id))
|
||||
pubsub.close()
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"InspectorEventPublisher: pubsub teardown failed for %s",
|
||||
workflow_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
712
api/services/workflow/node_output_inspector_service.py
Normal file
712
api/services/workflow/node_output_inspector_service.py
Normal file
@ -0,0 +1,712 @@
|
||||
"""Node Output Inspector service (Stage 4 §8).
|
||||
|
||||
PRD §Node Output Inspector renames every workflow "Variable" to a "Node Output"
|
||||
and re-organizes the panel by **producer node** rather than consumer node. This
|
||||
service backs the new REST surface
|
||||
``/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs[/...]`` with three
|
||||
read-only views:
|
||||
|
||||
* :meth:`snapshot_workflow_run` — every node + its declared outputs + per-output
|
||||
status, for one debug workflow run.
|
||||
* :meth:`node_detail` — the same shape filtered down to one node.
|
||||
* :meth:`output_preview` — full payload for one output, with signed download
|
||||
URL when the output references an upload file.
|
||||
|
||||
Design constraints baked into this version:
|
||||
|
||||
1. **No new tables** (§8.1). Topology comes from ``WorkflowRun.graph`` (the
|
||||
graph snapshot taken at execution time so the view stays consistent even
|
||||
if the draft was edited mid-run). Execution facts come from
|
||||
``WorkflowNodeExecutionModel`` rows already produced by the workflow
|
||||
runtime.
|
||||
2. **Draft + published runs** (decision D-1 lifted 2026-05-26). The Inspector
|
||||
accepts ``WorkflowRunTriggeredFrom.DEBUGGING`` (draft test runs) as well as
|
||||
``APP_RUN`` / ``WEBHOOK`` / ``SCHEDULE`` / ``PLUGIN`` / ``RAG_PIPELINE_*``
|
||||
triggers; the underlying graph + executions are uniform across all of them.
|
||||
Cross-tenant / cross-app rows still 404 via the standard tenant/app scope.
|
||||
3. **Declared outputs by node kind**:
|
||||
* Agent v2 nodes resolve their declared list via
|
||||
:class:`WorkflowAgentBindingResolver` (the binding owns the canonical
|
||||
``DeclaredOutputConfig`` list and falls back to PRD defaults when empty).
|
||||
* Other node kinds don't have a declared-output schema yet; we surface the
|
||||
keys present in the execution payload as a best-effort list typed
|
||||
``unknown`` so the panel can still render them.
|
||||
4. **Per-output status** is derived from the metadata the agent_v2 stack
|
||||
already records (``output_type_check`` and ``output_check`` blobs); falling
|
||||
back to the node's overall status when those signals aren't present.
|
||||
5. **SSE stream** (design §8.5) lives in
|
||||
:mod:`controllers.console.app.workflow_node_output_inspector` alongside the
|
||||
REST endpoints. The Inspector and the babysit chat SSE share the
|
||||
``{event, data, id}`` envelope per decision D-5.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Mapping, Sequence
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
from core.workflow.nodes.agent_v2.binding_resolver import (
|
||||
WorkflowAgentBindingError,
|
||||
WorkflowAgentBindingResolver,
|
||||
)
|
||||
from core.workflow.nodes.agent_v2.runtime_request_builder import (
|
||||
WorkflowAgentRuntimeRequestBuilder,
|
||||
)
|
||||
from graphon.enums import (
|
||||
BuiltinNodeTypes,
|
||||
WorkflowExecutionStatus,
|
||||
WorkflowNodeExecutionStatus,
|
||||
)
|
||||
from graphon.file import helpers as file_helpers
|
||||
from models import App
|
||||
from models.agent_config_entities import DeclaredOutputConfig, DeclaredOutputType
|
||||
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Public dataclasses / enums (Pydantic — these go straight on the wire)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class NodeOutputStatus(StrEnum):
|
||||
"""Lifecycle status of a single declared output within a run."""
|
||||
|
||||
PENDING = "pending" # node not started
|
||||
RUNNING = "running" # node running, output not ready yet
|
||||
READY = "ready"
|
||||
TYPE_CHECK_FAILED = "type_check_failed"
|
||||
OUTPUT_CHECK_FAILED = "output_check_failed"
|
||||
NOT_PRODUCED = "not_produced" # node succeeded but did not produce this declared output
|
||||
FAILED = "failed" # node itself failed/exception/stopped
|
||||
|
||||
|
||||
class NodeStatus(StrEnum):
|
||||
"""Coarse node-level status used by Inspector to pick a banner."""
|
||||
|
||||
IDLE = "idle"
|
||||
RUNNING = "running"
|
||||
READY = "ready"
|
||||
FAILED = "failed"
|
||||
|
||||
|
||||
class CheckResultView(BaseModel):
|
||||
"""``type_check`` / ``output_check`` per-output summary block."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
passed: bool
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
class NodeOutputView(BaseModel):
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
name: str
|
||||
type: DeclaredOutputType | None = None
|
||||
status: NodeOutputStatus
|
||||
value_preview: Any = None
|
||||
type_check: CheckResultView | None = None
|
||||
output_check: CheckResultView | None = None
|
||||
retried: int = 0
|
||||
|
||||
|
||||
class NodeOutputsView(BaseModel):
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
node_id: str
|
||||
node_kind: str
|
||||
node_display_name: str
|
||||
node_status: NodeStatus
|
||||
node_started_at: datetime | None = None
|
||||
node_completed_at: datetime | None = None
|
||||
outputs: list[NodeOutputView] = Field(default_factory=list)
|
||||
|
||||
|
||||
class WorkflowRunSnapshotView(BaseModel):
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
workflow_run_id: str
|
||||
workflow_run_status: WorkflowExecutionStatus
|
||||
node_outputs: list[NodeOutputsView] = Field(default_factory=list)
|
||||
|
||||
|
||||
class OutputPreviewView(BaseModel):
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
node_id: str
|
||||
output_name: str
|
||||
type: DeclaredOutputType | None = None
|
||||
status: NodeOutputStatus
|
||||
value: Any = None # full value (with signed URL for file refs)
|
||||
|
||||
|
||||
class NodeOutputInspectorError(Exception):
|
||||
"""Raised when a request cannot be served (404-level conditions)."""
|
||||
|
||||
def __init__(self, code: str, message: str) -> None:
|
||||
super().__init__(message)
|
||||
self.code = code
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Internal helpers — declared outputs per node
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class _ResolvedDeclaration:
|
||||
"""Declared output the Inspector should expose, with a normalized type.
|
||||
|
||||
``inferred`` is ``True`` when the node kind has no declared-output schema
|
||||
(we derived the list from the execution payload). The frontend can use
|
||||
this to dim the type column.
|
||||
"""
|
||||
|
||||
name: str
|
||||
declared_type: DeclaredOutputType | None
|
||||
inferred: bool
|
||||
|
||||
|
||||
def _is_agent_v2_node(node: Mapping[str, Any]) -> bool:
|
||||
"""A graph node is Agent v2 iff its ``data.type`` is the AGENT builtin
|
||||
AND its ``data.version`` is ``"2"``.
|
||||
|
||||
``BuiltinNodeTypes.AGENT`` is a ``ClassVar[NodeType]`` (plain string), not
|
||||
a StrEnum, so we compare against it directly without ``.value``.
|
||||
"""
|
||||
data = node.get("data") or {}
|
||||
if not isinstance(data, Mapping):
|
||||
return False
|
||||
if data.get("type") != BuiltinNodeTypes.AGENT:
|
||||
return False
|
||||
return str(data.get("version", "")) == "2"
|
||||
|
||||
|
||||
def _graph_nodes(workflow_run: WorkflowRun) -> list[Mapping[str, Any]]:
|
||||
"""Parse ``WorkflowRun.graph`` (LongText JSON) into a node list.
|
||||
|
||||
The graph blob may be missing / unparseable for very old runs; we treat
|
||||
that as "no nodes" rather than failing the Inspector, so the panel still
|
||||
renders an empty state.
|
||||
"""
|
||||
if not workflow_run.graph:
|
||||
return []
|
||||
try:
|
||||
parsed = json.loads(workflow_run.graph)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.warning("NodeOutputInspector: workflow_run %s has unparseable graph blob", workflow_run.id)
|
||||
return []
|
||||
if not isinstance(parsed, Mapping):
|
||||
return []
|
||||
nodes = parsed.get("nodes")
|
||||
if not isinstance(nodes, list):
|
||||
return []
|
||||
return [n for n in nodes if isinstance(n, Mapping) and "id" in n]
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Internal helpers — per-output status derivation
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _decode_json_blob(blob: str | None) -> Mapping[str, Any] | None:
|
||||
if not blob:
|
||||
return None
|
||||
try:
|
||||
decoded = json.loads(blob)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return None
|
||||
if not isinstance(decoded, Mapping):
|
||||
return None
|
||||
return decoded
|
||||
|
||||
|
||||
def _node_status_for(execution: WorkflowNodeExecutionModel | None) -> NodeStatus:
|
||||
if execution is None:
|
||||
return NodeStatus.IDLE
|
||||
if execution.status == WorkflowNodeExecutionStatus.RUNNING:
|
||||
return NodeStatus.RUNNING
|
||||
if execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
||||
return NodeStatus.READY
|
||||
return NodeStatus.FAILED
|
||||
|
||||
|
||||
def _type_check_by_name(metadata: Mapping[str, Any] | None) -> dict[str, Mapping[str, Any]]:
|
||||
if not metadata:
|
||||
return {}
|
||||
block = metadata.get("output_type_check")
|
||||
if not isinstance(block, Mapping):
|
||||
return {}
|
||||
results = block.get("results") or []
|
||||
if not isinstance(results, list):
|
||||
return {}
|
||||
indexed: dict[str, Mapping[str, Any]] = {}
|
||||
for r in results:
|
||||
if isinstance(r, Mapping) and isinstance(r.get("name"), str):
|
||||
indexed[r["name"]] = r
|
||||
return indexed
|
||||
|
||||
|
||||
def _output_check_by_name(metadata: Mapping[str, Any] | None) -> dict[str, Mapping[str, Any]]:
|
||||
if not metadata:
|
||||
return {}
|
||||
block = metadata.get("output_check")
|
||||
if not isinstance(block, Mapping):
|
||||
return {}
|
||||
results = block.get("results") or []
|
||||
if not isinstance(results, list):
|
||||
return {}
|
||||
indexed: dict[str, Mapping[str, Any]] = {}
|
||||
for r in results:
|
||||
if isinstance(r, Mapping) and isinstance(r.get("name"), str):
|
||||
indexed[r["name"]] = r
|
||||
return indexed
|
||||
|
||||
|
||||
def _retried_attempt_count(metadata: Mapping[str, Any] | None) -> int:
|
||||
"""The agent_v2 runtime records the final attempt index in metadata.
|
||||
|
||||
``attempt`` is 0-indexed so a single execution with no retry has
|
||||
``attempt == 0`` and a Inspector-friendly ``retried == 0``.
|
||||
"""
|
||||
if not metadata:
|
||||
return 0
|
||||
attempt = metadata.get("attempt")
|
||||
if isinstance(attempt, int) and attempt > 0:
|
||||
return attempt
|
||||
return 0
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Value preview (file refs get signed URLs)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
_PREVIEW_TEXT_LIMIT = 500
|
||||
_FILE_ID_KEYS: tuple[str, ...] = ("file_id", "upload_file_id", "tool_file_id")
|
||||
|
||||
|
||||
def _looks_like_file_ref(value: Any) -> str | None:
|
||||
"""Return the resolved ``file_id`` when ``value`` is a file-shaped dict."""
|
||||
if not isinstance(value, Mapping):
|
||||
return None
|
||||
for key in _FILE_ID_KEYS:
|
||||
candidate = value.get(key)
|
||||
if isinstance(candidate, str) and candidate:
|
||||
return candidate
|
||||
return None
|
||||
|
||||
|
||||
def _value_preview(value: Any) -> Any:
|
||||
"""Compact preview suitable for the snapshot endpoint.
|
||||
|
||||
File refs are augmented with a signed download URL so the panel can render
|
||||
a thumbnail / link without a second round-trip; long strings are truncated;
|
||||
other scalar / dict / list shapes are returned as-is (the Pydantic layer
|
||||
enforces JSON-safety on serialization).
|
||||
"""
|
||||
file_id = _looks_like_file_ref(value)
|
||||
if file_id:
|
||||
assert isinstance(value, Mapping)
|
||||
try:
|
||||
preview_url = file_helpers.get_signed_file_url(upload_file_id=file_id)
|
||||
except Exception:
|
||||
logger.warning("NodeOutputInspector: signed URL failed for file_id=%s", file_id, exc_info=True)
|
||||
preview_url = None
|
||||
return {**dict(value), "preview_url": preview_url}
|
||||
if isinstance(value, str) and len(value) > _PREVIEW_TEXT_LIMIT:
|
||||
return value[:_PREVIEW_TEXT_LIMIT] + "…"
|
||||
return value
|
||||
|
||||
|
||||
def _full_value(value: Any) -> Any:
|
||||
"""Same shape as :func:`_value_preview` minus the truncation."""
|
||||
file_id = _looks_like_file_ref(value)
|
||||
if file_id:
|
||||
assert isinstance(value, Mapping)
|
||||
try:
|
||||
preview_url = file_helpers.get_signed_file_url(upload_file_id=file_id)
|
||||
except Exception:
|
||||
logger.warning("NodeOutputInspector: signed URL failed for file_id=%s", file_id, exc_info=True)
|
||||
preview_url = None
|
||||
return {**dict(value), "preview_url": preview_url}
|
||||
return value
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Service
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class NodeOutputInspectorService:
|
||||
"""Read-only Inspector for draft + published workflow runs.
|
||||
|
||||
The service is dependency-light: it holds a single
|
||||
:class:`WorkflowAgentBindingResolver` so agent v2 nodes can map to their
|
||||
declared outputs without re-implementing binding lookup. All other I/O
|
||||
uses the global session factory so workflow runs / executions stay on the
|
||||
repo-default code path.
|
||||
|
||||
Tenancy is enforced via ``app_model.tenant_id`` + ``app_model.id`` on
|
||||
every load — the same scope guard regardless of trigger source.
|
||||
"""
|
||||
|
||||
def __init__(self, binding_resolver: WorkflowAgentBindingResolver | None = None) -> None:
|
||||
self._binding_resolver = binding_resolver or WorkflowAgentBindingResolver()
|
||||
|
||||
# ── public API ────────────────────────────────────────────────────────
|
||||
|
||||
def snapshot_workflow_run(self, *, app_model: App, workflow_run_id: str) -> WorkflowRunSnapshotView:
|
||||
"""Build the per-node snapshot for one debug workflow run."""
|
||||
workflow_run, executions = self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)
|
||||
executions_by_node = self._index_executions_by_node(executions)
|
||||
graph_nodes = _graph_nodes(workflow_run)
|
||||
|
||||
node_views: list[NodeOutputsView] = []
|
||||
for raw_node in graph_nodes:
|
||||
node_id = str(raw_node["id"])
|
||||
execution = executions_by_node.get(node_id)
|
||||
view = self._build_node_view(
|
||||
tenant_id=app_model.tenant_id,
|
||||
app_id=app_model.id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
raw_node=raw_node,
|
||||
execution=execution,
|
||||
)
|
||||
node_views.append(view)
|
||||
|
||||
return WorkflowRunSnapshotView(
|
||||
workflow_run_id=workflow_run.id,
|
||||
workflow_run_status=workflow_run.status,
|
||||
node_outputs=node_views,
|
||||
)
|
||||
|
||||
def node_detail(self, *, app_model: App, workflow_run_id: str, node_id: str) -> NodeOutputsView:
|
||||
"""Per-node Inspector entry — returns one ``NodeOutputsView``."""
|
||||
workflow_run, executions = self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)
|
||||
graph_nodes = _graph_nodes(workflow_run)
|
||||
raw_node = next((n for n in graph_nodes if str(n.get("id")) == node_id), None)
|
||||
if raw_node is None:
|
||||
raise NodeOutputInspectorError(
|
||||
"node_not_in_workflow_run",
|
||||
f"Node {node_id!r} does not appear in workflow run {workflow_run_id!r}.",
|
||||
)
|
||||
|
||||
execution = self._index_executions_by_node(executions).get(node_id)
|
||||
return self._build_node_view(
|
||||
tenant_id=app_model.tenant_id,
|
||||
app_id=app_model.id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
raw_node=raw_node,
|
||||
execution=execution,
|
||||
)
|
||||
|
||||
def output_preview(
|
||||
self,
|
||||
*,
|
||||
app_model: App,
|
||||
workflow_run_id: str,
|
||||
node_id: str,
|
||||
output_name: str,
|
||||
) -> OutputPreviewView:
|
||||
"""Full payload for one declared output (with signed file URL)."""
|
||||
detail = self.node_detail(
|
||||
app_model=app_model,
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=node_id,
|
||||
)
|
||||
view = next((o for o in detail.outputs if o.name == output_name), None)
|
||||
if view is None:
|
||||
raise NodeOutputInspectorError(
|
||||
"node_output_not_declared",
|
||||
f"Output {output_name!r} is not declared on node {node_id!r}.",
|
||||
)
|
||||
|
||||
# ``node_detail`` already produced a truncated value_preview; reload
|
||||
# the raw value from the execution payload so the preview endpoint can
|
||||
# return the full thing (still wrapped through ``_full_value`` for
|
||||
# signed file URLs).
|
||||
execution = self._index_executions_by_node(
|
||||
self._load_run_and_executions(app_model=app_model, workflow_run_id=workflow_run_id)[1]
|
||||
).get(node_id)
|
||||
full_value: Any = None
|
||||
if execution is not None:
|
||||
outputs = _decode_json_blob(execution.outputs) or {}
|
||||
if output_name in outputs:
|
||||
full_value = _full_value(outputs[output_name])
|
||||
|
||||
return OutputPreviewView(
|
||||
node_id=node_id,
|
||||
output_name=output_name,
|
||||
type=view.type,
|
||||
status=view.status,
|
||||
value=full_value,
|
||||
)
|
||||
|
||||
# ── DB loading ────────────────────────────────────────────────────────
|
||||
|
||||
def _load_run_and_executions(
|
||||
self, *, app_model: App, workflow_run_id: str
|
||||
) -> tuple[WorkflowRun, Sequence[WorkflowNodeExecutionModel]]:
|
||||
"""Fetch the ``WorkflowRun`` row + every execution that belongs to it.
|
||||
|
||||
Enforces:
|
||||
* row exists,
|
||||
* row belongs to the app's tenant + app.
|
||||
|
||||
The trigger source (DEBUGGING vs. APP_RUN / WEBHOOK / SCHEDULE / ...) is
|
||||
deliberately not checked here — D-1 was lifted 2026-05-26 and the
|
||||
Inspector now serves both draft and published runs.
|
||||
"""
|
||||
with session_factory.create_session() as session:
|
||||
workflow_run = session.scalar(
|
||||
select(WorkflowRun).where(
|
||||
WorkflowRun.id == workflow_run_id,
|
||||
WorkflowRun.app_id == app_model.id,
|
||||
WorkflowRun.tenant_id == app_model.tenant_id,
|
||||
)
|
||||
)
|
||||
if workflow_run is None:
|
||||
raise NodeOutputInspectorError("workflow_run_not_found", "Workflow run not found.")
|
||||
|
||||
executions = session.scalars(
|
||||
select(WorkflowNodeExecutionModel).where(
|
||||
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
|
||||
WorkflowNodeExecutionModel.tenant_id == app_model.tenant_id,
|
||||
WorkflowNodeExecutionModel.app_id == app_model.id,
|
||||
)
|
||||
).all()
|
||||
|
||||
return workflow_run, executions
|
||||
|
||||
@staticmethod
|
||||
def _index_executions_by_node(
|
||||
executions: Sequence[WorkflowNodeExecutionModel],
|
||||
) -> dict[str, WorkflowNodeExecutionModel]:
|
||||
"""Keep the latest execution per ``node_id``.
|
||||
|
||||
A given node may have multiple rows when retries or iterations occur;
|
||||
``index`` is the per-run sequence counter, so we keep the one with
|
||||
the highest index as the canonical "current" view.
|
||||
"""
|
||||
latest: dict[str, WorkflowNodeExecutionModel] = {}
|
||||
for execution in executions:
|
||||
existing = latest.get(execution.node_id)
|
||||
if existing is None or execution.index > existing.index:
|
||||
latest[execution.node_id] = execution
|
||||
return latest
|
||||
|
||||
# ── Per-node view construction ────────────────────────────────────────
|
||||
|
||||
def _build_node_view(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
workflow_id: str,
|
||||
raw_node: Mapping[str, Any],
|
||||
execution: WorkflowNodeExecutionModel | None,
|
||||
) -> NodeOutputsView:
|
||||
node_id = str(raw_node["id"])
|
||||
data = raw_node.get("data") or {}
|
||||
if not isinstance(data, Mapping):
|
||||
data = {}
|
||||
|
||||
node_kind = str(data.get("type") or (execution.node_type if execution else "") or "unknown")
|
||||
display_name = str(data.get("title") or (execution.title if execution else node_id))
|
||||
node_status = _node_status_for(execution)
|
||||
|
||||
declarations = self._resolve_declared_outputs(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
node_id=node_id,
|
||||
raw_node=raw_node,
|
||||
execution=execution,
|
||||
)
|
||||
|
||||
outputs_dict = _decode_json_blob(execution.outputs) if execution else None
|
||||
metadata_dict = _decode_json_blob(execution.execution_metadata) if execution else None
|
||||
type_check_by_name = _type_check_by_name(metadata_dict)
|
||||
output_check_by_name = _output_check_by_name(metadata_dict)
|
||||
retried = _retried_attempt_count(metadata_dict)
|
||||
|
||||
output_views: list[NodeOutputView] = []
|
||||
for declaration in declarations:
|
||||
output_views.append(
|
||||
self._build_output_view(
|
||||
declaration=declaration,
|
||||
node_status=node_status,
|
||||
outputs_dict=outputs_dict,
|
||||
type_check_by_name=type_check_by_name,
|
||||
output_check_by_name=output_check_by_name,
|
||||
retried=retried,
|
||||
)
|
||||
)
|
||||
|
||||
return NodeOutputsView(
|
||||
node_id=node_id,
|
||||
node_kind=node_kind,
|
||||
node_display_name=display_name,
|
||||
node_status=node_status,
|
||||
node_started_at=execution.created_at if execution else None,
|
||||
node_completed_at=execution.finished_at if execution else None,
|
||||
outputs=output_views,
|
||||
)
|
||||
|
||||
def _build_output_view(
|
||||
self,
|
||||
*,
|
||||
declaration: _ResolvedDeclaration,
|
||||
node_status: NodeStatus,
|
||||
outputs_dict: Mapping[str, Any] | None,
|
||||
type_check_by_name: Mapping[str, Mapping[str, Any]],
|
||||
output_check_by_name: Mapping[str, Mapping[str, Any]],
|
||||
retried: int,
|
||||
) -> NodeOutputView:
|
||||
name = declaration.name
|
||||
declared_type = declaration.declared_type
|
||||
|
||||
if node_status == NodeStatus.IDLE:
|
||||
return NodeOutputView(
|
||||
name=name,
|
||||
type=declared_type,
|
||||
status=NodeOutputStatus.PENDING,
|
||||
retried=retried,
|
||||
)
|
||||
if node_status == NodeStatus.RUNNING:
|
||||
return NodeOutputView(
|
||||
name=name,
|
||||
type=declared_type,
|
||||
status=NodeOutputStatus.RUNNING,
|
||||
retried=retried,
|
||||
)
|
||||
if node_status == NodeStatus.FAILED:
|
||||
return NodeOutputView(
|
||||
name=name,
|
||||
type=declared_type,
|
||||
status=NodeOutputStatus.FAILED,
|
||||
retried=retried,
|
||||
)
|
||||
|
||||
# ── node succeeded ────────────────────────────────────────────
|
||||
type_check_result = type_check_by_name.get(name)
|
||||
output_check_result = output_check_by_name.get(name)
|
||||
type_check_view = self._coerce_check_view(type_check_result)
|
||||
output_check_view = self._coerce_check_view(output_check_result)
|
||||
|
||||
# type check loses first; output check next; otherwise ready.
|
||||
status: NodeOutputStatus
|
||||
if type_check_result and not _is_passing(type_check_result):
|
||||
status = NodeOutputStatus.TYPE_CHECK_FAILED
|
||||
elif output_check_result and not _is_passing(output_check_result):
|
||||
status = NodeOutputStatus.OUTPUT_CHECK_FAILED
|
||||
elif outputs_dict is not None and name not in outputs_dict:
|
||||
status = NodeOutputStatus.NOT_PRODUCED
|
||||
else:
|
||||
status = NodeOutputStatus.READY
|
||||
|
||||
value_preview = _value_preview(outputs_dict.get(name)) if outputs_dict and name in outputs_dict else None
|
||||
|
||||
return NodeOutputView(
|
||||
name=name,
|
||||
type=declared_type,
|
||||
status=status,
|
||||
value_preview=value_preview,
|
||||
type_check=type_check_view,
|
||||
output_check=output_check_view,
|
||||
retried=retried,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _coerce_check_view(result: Mapping[str, Any] | None) -> CheckResultView | None:
|
||||
if not result:
|
||||
return None
|
||||
# type_check rows use ``status``; output_check rows use ``status`` too —
|
||||
# both record per-output state. We treat ``status == "ready"``/"passed"
|
||||
# as passing and everything else as failing, so the view stays
|
||||
# stable regardless of which producer wrote the metadata.
|
||||
return CheckResultView(passed=_is_passing(result), reason=result.get("reason"))
|
||||
|
||||
# ── Declared-output resolution ────────────────────────────────────────
|
||||
|
||||
def _resolve_declared_outputs(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
workflow_id: str,
|
||||
node_id: str,
|
||||
raw_node: Mapping[str, Any],
|
||||
execution: WorkflowNodeExecutionModel | None,
|
||||
) -> list[_ResolvedDeclaration]:
|
||||
if _is_agent_v2_node(raw_node):
|
||||
agent_decl = self._declared_outputs_for_agent_v2(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
node_id=node_id,
|
||||
)
|
||||
if agent_decl is not None:
|
||||
return [_ResolvedDeclaration(name=o.name, declared_type=o.type, inferred=False) for o in agent_decl]
|
||||
|
||||
# Non-agent (or agent-binding-missing) fall back to inferring from the
|
||||
# produced payload so the Inspector still has something to show.
|
||||
return self._infer_outputs_from_payload(execution=execution)
|
||||
|
||||
def _declared_outputs_for_agent_v2(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
workflow_id: str,
|
||||
node_id: str,
|
||||
) -> list[DeclaredOutputConfig] | None:
|
||||
try:
|
||||
bundle = self._binding_resolver.resolve(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
node_id=node_id,
|
||||
)
|
||||
except WorkflowAgentBindingError:
|
||||
return None
|
||||
try:
|
||||
from models.agent_config_entities import WorkflowNodeJobConfig
|
||||
|
||||
node_job = WorkflowNodeJobConfig.model_validate(bundle.binding.node_job_config_dict)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"NodeOutputInspector: malformed node_job_config for binding %s", bundle.binding.id, exc_info=True
|
||||
)
|
||||
return None
|
||||
return list(WorkflowAgentRuntimeRequestBuilder.effective_declared_outputs(list(node_job.declared_outputs)))
|
||||
|
||||
@staticmethod
|
||||
def _infer_outputs_from_payload(*, execution: WorkflowNodeExecutionModel | None) -> list[_ResolvedDeclaration]:
|
||||
if execution is None:
|
||||
return []
|
||||
outputs = _decode_json_blob(execution.outputs)
|
||||
if not outputs:
|
||||
return []
|
||||
return [_ResolvedDeclaration(name=name, declared_type=None, inferred=True) for name in outputs]
|
||||
|
||||
|
||||
def _is_passing(result: Mapping[str, Any]) -> bool:
|
||||
"""A check-result row is "passing" when its ``status`` is the ready/passed
|
||||
sentinel emitted by the type-checker / output-check executor."""
|
||||
status = result.get("status")
|
||||
if status in {"ready", "passed", "not_produced"}:
|
||||
return True
|
||||
return False
|
||||
@ -0,0 +1,475 @@
|
||||
"""End-to-end tests for ``NodeOutputInspectorService`` (Stage 4 §8 / ENG-373).
|
||||
|
||||
These integration tests exercise the service against a real Postgres
|
||||
(``dify-db-1``) — same pattern as :mod:`test_remove_app_and_related_data_task`:
|
||||
seed rows via ``session_factory.create_session()`` with explicit commits,
|
||||
exercise the service, clean up by ID at teardown.
|
||||
|
||||
Coverage:
|
||||
1. Snapshot for a draft run with one agent v2 node + one tool node
|
||||
2. Type-check failure visible in snapshot
|
||||
3. Output-check failure visible in snapshot
|
||||
4. Published run returns ``published_run_inspector_not_implemented``
|
||||
5. Cross-tenant access returns ``workflow_run_not_found``
|
||||
6. File output preview endpoint returns full value with signed URL
|
||||
7. ``node_detail`` path serves a single node view
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from datetime import UTC, datetime
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import delete
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
|
||||
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
|
||||
from models.workflow import (
|
||||
WorkflowNodeExecutionModel,
|
||||
WorkflowNodeExecutionTriggeredFrom,
|
||||
WorkflowRun,
|
||||
WorkflowType,
|
||||
)
|
||||
from services.workflow.node_output_inspector_service import (
|
||||
NodeOutputInspectorError,
|
||||
NodeOutputInspectorService,
|
||||
NodeOutputStatus,
|
||||
NodeStatus,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_app_model() -> SimpleNamespace:
|
||||
"""Lightweight stand-in for the ``App`` model that the service consumes.
|
||||
|
||||
``App`` is only read for ``id`` and ``tenant_id``; the service does not
|
||||
poke at any ORM relationship so a SimpleNamespace is enough — and it
|
||||
keeps us free of needing the ``apps`` row to actually exist (which would
|
||||
drag in Account / Tenant setup).
|
||||
"""
|
||||
return SimpleNamespace(
|
||||
id=str(uuid.uuid4()),
|
||||
tenant_id=str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
def _make_workflow_run(
|
||||
*,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
|
||||
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING,
|
||||
graph: dict[str, Any] | None = None,
|
||||
) -> WorkflowRun:
|
||||
"""Build a ``WorkflowRun`` row with all required fields populated."""
|
||||
return WorkflowRun(
|
||||
id=str(uuid.uuid4()),
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=str(uuid.uuid4()),
|
||||
type=WorkflowType.WORKFLOW,
|
||||
triggered_from=triggered_from,
|
||||
version="draft",
|
||||
graph=json.dumps(graph or {"nodes": []}),
|
||||
status=status,
|
||||
created_by_role=CreatorUserRole.ACCOUNT,
|
||||
created_by=str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
def _make_execution(
|
||||
*,
|
||||
app_id: str,
|
||||
tenant_id: str,
|
||||
workflow_id: str,
|
||||
workflow_run_id: str,
|
||||
node_id: str,
|
||||
node_type: str = "agent",
|
||||
title: str = "",
|
||||
status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
outputs: dict[str, Any] | None = None,
|
||||
execution_metadata: dict[str, Any] | None = None,
|
||||
index: int = 1,
|
||||
) -> WorkflowNodeExecutionModel:
|
||||
"""Build a ``WorkflowNodeExecutionModel`` row with all required fields."""
|
||||
return WorkflowNodeExecutionModel(
|
||||
id=str(uuid.uuid4()),
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
workflow_id=workflow_id,
|
||||
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
|
||||
workflow_run_id=workflow_run_id,
|
||||
index=index,
|
||||
node_id=node_id,
|
||||
node_type=node_type,
|
||||
title=title or node_id,
|
||||
status=status,
|
||||
outputs=json.dumps(outputs) if outputs is not None else None,
|
||||
execution_metadata=json.dumps(execution_metadata) if execution_metadata is not None else None,
|
||||
created_by_role=CreatorUserRole.ACCOUNT,
|
||||
created_by=str(uuid.uuid4()),
|
||||
created_at=datetime.now(UTC),
|
||||
finished_at=datetime.now(UTC),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def seeded_run(
|
||||
flask_req_ctx, fake_app_model: SimpleNamespace
|
||||
) -> Generator[tuple[SimpleNamespace, WorkflowRun, list[WorkflowNodeExecutionModel]], None, None]:
|
||||
"""Seed one debug ``WorkflowRun`` + 2 node executions in real Postgres.
|
||||
|
||||
Yields ``(app_model, workflow_run, executions)``. Cleans both rows up at
|
||||
teardown via direct ``DELETE`` so a failed test never leaves debris.
|
||||
"""
|
||||
graph = {
|
||||
"nodes": [
|
||||
{
|
||||
"id": "agent-node-1",
|
||||
"data": {"type": "agent", "version": "2", "title": "My Agent"},
|
||||
},
|
||||
{
|
||||
"id": "tool-node-1",
|
||||
"data": {"type": "tool", "title": "Slack"},
|
||||
},
|
||||
]
|
||||
}
|
||||
workflow_run = _make_workflow_run(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
graph=graph,
|
||||
)
|
||||
agent_execution = _make_execution(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-node-1",
|
||||
node_type="agent",
|
||||
outputs={"text": "hello world"},
|
||||
execution_metadata={
|
||||
"output_type_check": {
|
||||
"passed": True,
|
||||
"results": [{"name": "text", "type": "string", "status": "ready"}],
|
||||
},
|
||||
"attempt": 0,
|
||||
},
|
||||
index=1,
|
||||
)
|
||||
tool_execution = _make_execution(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="tool-node-1",
|
||||
node_type="tool",
|
||||
outputs={"message": "sent", "ok": True},
|
||||
index=2,
|
||||
)
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
session.add(workflow_run)
|
||||
session.add(agent_execution)
|
||||
session.add(tool_execution)
|
||||
session.commit()
|
||||
run_id = workflow_run.id
|
||||
execution_ids = [agent_execution.id, tool_execution.id]
|
||||
|
||||
try:
|
||||
yield fake_app_model, workflow_run, [agent_execution, tool_execution]
|
||||
finally:
|
||||
with session_factory.create_session() as session:
|
||||
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(execution_ids)))
|
||||
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
|
||||
session.commit()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Stub binding resolver — declared outputs for the agent v2 node
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _stub_resolver(declared_outputs_payload: list[dict[str, Any]]):
|
||||
"""Return a stand-in binding resolver whose ``.resolve`` always returns
|
||||
one bundle with the supplied declared_outputs.
|
||||
|
||||
The real resolver hits ``workflow_agent_node_bindings``; we skip that
|
||||
table here so the Inspector can be tested without binding-row setup.
|
||||
"""
|
||||
binding = SimpleNamespace(
|
||||
id="binding-1",
|
||||
node_job_config_dict={
|
||||
"workflow_prompt": "stub",
|
||||
"declared_outputs": declared_outputs_payload,
|
||||
},
|
||||
)
|
||||
bundle = SimpleNamespace(binding=binding, agent=None, snapshot=None)
|
||||
|
||||
class _Resolver:
|
||||
def resolve(self, **_: Any):
|
||||
return bundle
|
||||
|
||||
return _Resolver()
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Tests
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_snapshot_returns_agent_v2_declared_outputs_with_status_ready(seeded_run):
|
||||
"""Happy path: agent v2 node + tool node both render, statuses come from
|
||||
real ``WorkflowRun`` + ``WorkflowNodeExecutionModel`` rows."""
|
||||
app_model, workflow_run, _ = seeded_run
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
|
||||
snapshot = service.snapshot_workflow_run(
|
||||
app_model=app_model,
|
||||
workflow_run_id=workflow_run.id,
|
||||
)
|
||||
|
||||
assert snapshot.workflow_run_id == workflow_run.id
|
||||
assert snapshot.workflow_run_status == WorkflowExecutionStatus.RUNNING
|
||||
|
||||
by_node = {n.node_id: n for n in snapshot.node_outputs}
|
||||
|
||||
agent_view = by_node["agent-node-1"]
|
||||
assert agent_view.node_status == NodeStatus.READY
|
||||
assert agent_view.outputs[0].name == "text"
|
||||
assert agent_view.outputs[0].status == NodeOutputStatus.READY
|
||||
assert agent_view.outputs[0].value_preview == "hello world"
|
||||
|
||||
tool_view = by_node["tool-node-1"]
|
||||
# Tool node's declared outputs are *inferred* from the produced payload.
|
||||
output_names = sorted(o.name for o in tool_view.outputs)
|
||||
assert output_names == ["message", "ok"]
|
||||
assert all(o.type is None for o in tool_view.outputs)
|
||||
|
||||
|
||||
def test_snapshot_404s_for_missing_run(fake_app_model):
|
||||
"""Service raises ``workflow_run_not_found`` when the row doesn't exist."""
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
|
||||
with pytest.raises(NodeOutputInspectorError) as exc:
|
||||
service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=str(uuid.uuid4()))
|
||||
assert exc.value.code == "workflow_run_not_found"
|
||||
|
||||
|
||||
def test_snapshot_404s_for_cross_tenant_access(seeded_run):
|
||||
"""A wrong-tenant app_model must not see another tenant's run."""
|
||||
_, workflow_run, _ = seeded_run
|
||||
intruder = SimpleNamespace(id=str(uuid.uuid4()), tenant_id=str(uuid.uuid4()))
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
|
||||
with pytest.raises(NodeOutputInspectorError) as exc:
|
||||
service.snapshot_workflow_run(app_model=intruder, workflow_run_id=workflow_run.id)
|
||||
assert exc.value.code == "workflow_run_not_found"
|
||||
|
||||
|
||||
def test_snapshot_404s_for_published_run_per_decision_d1(flask_req_ctx, fake_app_model):
|
||||
"""Decision D-1: published / app-run Inspector deferred to stage 4.1."""
|
||||
workflow_run = _make_workflow_run(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
|
||||
graph={"nodes": []},
|
||||
)
|
||||
with session_factory.create_session() as session:
|
||||
session.add(workflow_run)
|
||||
session.commit()
|
||||
run_id = workflow_run.id
|
||||
|
||||
try:
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([]))
|
||||
with pytest.raises(NodeOutputInspectorError) as exc:
|
||||
service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
|
||||
assert exc.value.code == "published_run_inspector_not_implemented"
|
||||
finally:
|
||||
with session_factory.create_session() as session:
|
||||
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
|
||||
session.commit()
|
||||
|
||||
|
||||
def test_snapshot_surfaces_type_check_failure_from_metadata(flask_req_ctx, fake_app_model):
|
||||
"""Per-output ``TYPE_CHECK_FAILED`` derived from the metadata blob the
|
||||
Stage 4 §5 stack records on the execution row."""
|
||||
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
|
||||
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
|
||||
execution = _make_execution(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-1",
|
||||
outputs={"summary": 123}, # int despite declared string
|
||||
execution_metadata={
|
||||
"output_type_check": {
|
||||
"passed": False,
|
||||
"results": [
|
||||
{
|
||||
"name": "summary",
|
||||
"type": "string",
|
||||
"status": "type_check_failed",
|
||||
"reason": "expected string, got int",
|
||||
}
|
||||
],
|
||||
}
|
||||
},
|
||||
)
|
||||
with session_factory.create_session() as session:
|
||||
session.add(workflow_run)
|
||||
session.add(execution)
|
||||
session.commit()
|
||||
run_id, execution_id = workflow_run.id, execution.id
|
||||
|
||||
try:
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "summary", "type": "string"}]))
|
||||
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
|
||||
output = snapshot.node_outputs[0].outputs[0]
|
||||
assert output.status == NodeOutputStatus.TYPE_CHECK_FAILED
|
||||
assert output.type_check is not None
|
||||
assert output.type_check.passed is False
|
||||
assert output.type_check.reason == "expected string, got int"
|
||||
finally:
|
||||
with session_factory.create_session() as session:
|
||||
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id))
|
||||
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
|
||||
session.commit()
|
||||
|
||||
|
||||
def test_snapshot_surfaces_output_check_failure_from_metadata(flask_req_ctx, fake_app_model):
|
||||
"""When ``output_type_check.passed`` but ``output_check.passed=False``, the
|
||||
output is flagged ``OUTPUT_CHECK_FAILED``."""
|
||||
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
|
||||
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
|
||||
execution = _make_execution(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-1",
|
||||
outputs={"report": {"file_id": "550e8400-e29b-41d4-a716-446655440000"}},
|
||||
execution_metadata={
|
||||
"output_type_check": {"passed": True, "results": [{"name": "report", "status": "ready"}]},
|
||||
"output_check": {
|
||||
"passed": False,
|
||||
"results": [{"name": "report", "status": "failed", "reason": "benchmark mismatch"}],
|
||||
},
|
||||
},
|
||||
)
|
||||
with session_factory.create_session() as session:
|
||||
session.add(workflow_run)
|
||||
session.add(execution)
|
||||
session.commit()
|
||||
run_id, execution_id = workflow_run.id, execution.id
|
||||
|
||||
try:
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "report", "type": "file"}]))
|
||||
# Stub signed-URL so we don't depend on the workflow file runtime being
|
||||
# bound (it isn't, in this minimal flask_req_ctx).
|
||||
with patch(
|
||||
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
|
||||
return_value="https://signed.example/report",
|
||||
):
|
||||
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
|
||||
output = snapshot.node_outputs[0].outputs[0]
|
||||
assert output.status == NodeOutputStatus.OUTPUT_CHECK_FAILED
|
||||
assert output.output_check is not None
|
||||
assert output.output_check.passed is False
|
||||
assert output.output_check.reason == "benchmark mismatch"
|
||||
finally:
|
||||
with session_factory.create_session() as session:
|
||||
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution_id))
|
||||
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
|
||||
session.commit()
|
||||
|
||||
|
||||
def test_node_detail_serves_one_node(seeded_run):
|
||||
app_model, workflow_run, _ = seeded_run
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
|
||||
view = service.node_detail(
|
||||
app_model=app_model,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-node-1",
|
||||
)
|
||||
assert view.node_id == "agent-node-1"
|
||||
assert view.outputs[0].name == "text"
|
||||
|
||||
|
||||
def test_output_preview_for_file_renders_signed_url(seeded_run, fake_app_model):
|
||||
"""``preview`` returns the full value with signed_url for file refs."""
|
||||
# Replace the seeded agent execution's output with a file ref.
|
||||
_, workflow_run, executions = seeded_run
|
||||
agent_execution = executions[0]
|
||||
with session_factory.create_session() as session:
|
||||
# Re-bind the persisted row so we can mutate + commit.
|
||||
from sqlalchemy import select
|
||||
|
||||
row = session.scalar(
|
||||
select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == agent_execution.id)
|
||||
)
|
||||
assert row is not None
|
||||
row.outputs = json.dumps({"text": {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}})
|
||||
session.commit()
|
||||
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "file"}]))
|
||||
with patch(
|
||||
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
|
||||
return_value="https://signed.example/x.pdf",
|
||||
):
|
||||
preview = service.output_preview(
|
||||
app_model=fake_app_model,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-node-1",
|
||||
output_name="text",
|
||||
)
|
||||
assert preview.output_name == "text"
|
||||
assert isinstance(preview.value, dict)
|
||||
assert preview.value["preview_url"] == "https://signed.example/x.pdf"
|
||||
assert preview.value["filename"] == "x.pdf"
|
||||
|
||||
|
||||
def test_keeps_latest_execution_per_node_by_index(flask_req_ctx, fake_app_model):
|
||||
"""Multiple executions for the same node_id → service keeps the highest
|
||||
``index`` (matches the agent_v2 retry pattern that re-emits node
|
||||
executions)."""
|
||||
graph = {"nodes": [{"id": "agent-1", "data": {"type": "agent", "version": "2"}}]}
|
||||
workflow_run = _make_workflow_run(app_id=fake_app_model.id, tenant_id=fake_app_model.tenant_id, graph=graph)
|
||||
older = _make_execution(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-1",
|
||||
outputs={"text": "first attempt"},
|
||||
index=1,
|
||||
)
|
||||
newer = _make_execution(
|
||||
app_id=fake_app_model.id,
|
||||
tenant_id=fake_app_model.tenant_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
node_id="agent-1",
|
||||
outputs={"text": "second attempt"},
|
||||
index=5,
|
||||
)
|
||||
with session_factory.create_session() as session:
|
||||
session.add(workflow_run)
|
||||
session.add(older)
|
||||
session.add(newer)
|
||||
session.commit()
|
||||
run_id, ex_ids = workflow_run.id, [older.id, newer.id]
|
||||
|
||||
try:
|
||||
service = NodeOutputInspectorService(binding_resolver=_stub_resolver([{"name": "text", "type": "string"}]))
|
||||
snapshot = service.snapshot_workflow_run(app_model=fake_app_model, workflow_run_id=run_id)
|
||||
assert snapshot.node_outputs[0].outputs[0].value_preview == "second attempt"
|
||||
finally:
|
||||
with session_factory.create_session() as session:
|
||||
session.execute(delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(ex_ids)))
|
||||
session.execute(delete(WorkflowRun).where(WorkflowRun.id == run_id))
|
||||
session.commit()
|
||||
@ -1,7 +1,12 @@
|
||||
import pytest
|
||||
from agenton.layers import ExitIntent
|
||||
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
|
||||
from dify_agent.layers.dify_plugin import DIFY_PLUGIN_LLM_LAYER_TYPE_ID
|
||||
from dify_agent.layers.dify_plugin import (
|
||||
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
|
||||
DifyPluginToolConfig,
|
||||
DifyPluginToolsLayerConfig,
|
||||
)
|
||||
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
|
||||
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID
|
||||
from dify_agent.protocol import (
|
||||
@ -14,6 +19,7 @@ from pydantic import ValidationError
|
||||
from clients.agent_backend import (
|
||||
AGENT_SOUL_PROMPT_LAYER_ID,
|
||||
DIFY_EXECUTION_CONTEXT_LAYER_ID,
|
||||
DIFY_PLUGIN_TOOLS_LAYER_ID,
|
||||
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
|
||||
WORKFLOW_USER_PROMPT_LAYER_ID,
|
||||
AgentBackendModelConfig,
|
||||
@ -100,6 +106,33 @@ def test_request_builder_sets_model_and_output_layer_contract_ids():
|
||||
assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID
|
||||
|
||||
|
||||
def test_request_builder_adds_dify_plugin_tools_layer_when_configured():
|
||||
run_input = _run_input()
|
||||
run_input.tools = DifyPluginToolsLayerConfig(
|
||||
tools=[
|
||||
DifyPluginToolConfig(
|
||||
plugin_id="langgenius/time",
|
||||
provider="time",
|
||||
tool_name="current_time",
|
||||
credential_type="unauthorized",
|
||||
name="current_time",
|
||||
description="Get current time.",
|
||||
credentials={},
|
||||
runtime_parameters={},
|
||||
parameters=[],
|
||||
parameters_json_schema={"type": "object", "properties": {}, "required": []},
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
|
||||
layers = {layer.name: layer for layer in request.composition.layers}
|
||||
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].type == DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config.tools[0].tool_name == "current_time"
|
||||
|
||||
|
||||
def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths():
|
||||
run_input = _run_input()
|
||||
run_input.suspend_on_exit = True
|
||||
|
||||
@ -0,0 +1,454 @@
|
||||
"""Unit tests for the Node Output Inspector controller (Stage 4 §8).
|
||||
|
||||
The controller has two non-trivial moving parts:
|
||||
|
||||
1. :func:`_sse_envelope` — wire-format builder for the SSE ``{event, data, id}``
|
||||
records (decision D-5).
|
||||
2. :func:`_stream_inspector_events` — the SSE generator that fans the redis
|
||||
pub/sub stream out as snapshot / node_changed / workflow_run_completed /
|
||||
error events.
|
||||
|
||||
We exercise both as plain functions with mocked dependencies (service +
|
||||
``inspector_events.subscribe``) — going through Flask routes would multiply
|
||||
the test scaffolding without buying additional confidence in the core
|
||||
behaviour.
|
||||
|
||||
The Resource classes themselves are trivial wrappers (``_service().method()``
|
||||
+ ``_InspectorNotFound`` translation), and are touched here only by import so
|
||||
codecov sees them as exercised; their detailed behaviour is locked down by
|
||||
the service-level tests in
|
||||
``tests/unit_tests/services/workflow/test_node_output_inspector_service.py``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from controllers.console.app import workflow_node_output_inspector as ctrl
|
||||
from services.workflow.inspector_events import InspectorMessage
|
||||
from services.workflow.node_output_inspector_service import (
|
||||
NodeOutputInspectorError,
|
||||
NodeOutputStatus,
|
||||
NodeOutputsView,
|
||||
NodeStatus,
|
||||
WorkflowRunSnapshotView,
|
||||
)
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Fixtures
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app_model() -> Any:
|
||||
"""A minimal ``App`` stub the controller passes through to the service.
|
||||
|
||||
The SSE generator never reads its attributes — just forwards it — so a
|
||||
sentinel object is enough.
|
||||
"""
|
||||
return MagicMock(name="App", tenant_id="tenant-1", id="app-1")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def run_id() -> UUID:
|
||||
return UUID("00000000-0000-0000-0000-0000000000aa")
|
||||
|
||||
|
||||
def _snapshot_view(*, status: str, node_id: str = "agent-1") -> WorkflowRunSnapshotView:
|
||||
from graphon.enums import WorkflowExecutionStatus
|
||||
|
||||
return WorkflowRunSnapshotView(
|
||||
workflow_run_id="00000000-0000-0000-0000-0000000000aa",
|
||||
workflow_run_status=WorkflowExecutionStatus(status),
|
||||
node_outputs=[
|
||||
NodeOutputsView(
|
||||
node_id=node_id,
|
||||
node_kind="agent",
|
||||
node_display_name="Greeter",
|
||||
node_status=NodeStatus.RUNNING if status == "running" else NodeStatus.READY,
|
||||
outputs=[],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def _node_view(*, node_id: str = "agent-1", node_status: NodeStatus = NodeStatus.READY) -> NodeOutputsView:
|
||||
return NodeOutputsView(
|
||||
node_id=node_id,
|
||||
node_kind="agent",
|
||||
node_display_name="Greeter",
|
||||
node_status=node_status,
|
||||
outputs=[],
|
||||
)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# _sse_envelope
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_sse_envelope_serializes_dict_payload():
|
||||
out = ctrl._sse_envelope("snapshot", {"foo": "bar"}, 7)
|
||||
lines = out.rstrip("\n").split("\n")
|
||||
assert lines[0] == "event: snapshot"
|
||||
assert lines[1] == "id: 7"
|
||||
assert lines[2] == 'data: {"foo": "bar"}'
|
||||
assert out.endswith("\n\n") # SSE record separator
|
||||
|
||||
|
||||
def test_sse_envelope_passes_strings_through_unmodified():
|
||||
"""A raw string payload (e.g. ``:keepalive``) is emitted as-is."""
|
||||
out = ctrl._sse_envelope("snapshot", ":keepalive", 1)
|
||||
assert "data: :keepalive\n" in out
|
||||
|
||||
|
||||
def test_sse_envelope_handles_unicode_payload():
|
||||
out = ctrl._sse_envelope("node_changed", {"name": "你好"}, 3)
|
||||
assert "你好" in out # ensure_ascii=False
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# _stream_inspector_events — fast path (already-terminal run)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _drain(stream: Iterator[str]) -> list[str]:
|
||||
return list(stream)
|
||||
|
||||
|
||||
def _parse(record: str) -> tuple[str, dict | None]:
|
||||
"""Pull ``event`` + ``data`` (json-decoded) out of one SSE record."""
|
||||
event = None
|
||||
data: dict | None = None
|
||||
for line in record.rstrip("\n").split("\n"):
|
||||
if line.startswith("event: "):
|
||||
event = line[len("event: ") :]
|
||||
elif line.startswith("data: "):
|
||||
try:
|
||||
data = json.loads(line[len("data: ") :])
|
||||
except json.JSONDecodeError:
|
||||
data = None
|
||||
assert event is not None
|
||||
return event, data
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_service(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Replace ``_service()`` with a MagicMock per-test."""
|
||||
|
||||
fake = MagicMock()
|
||||
monkeypatch.setattr(ctrl, "_service", lambda: fake)
|
||||
return fake
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_subscribe(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Patch the pub/sub subscribe iterator."""
|
||||
|
||||
def _make(messages: list[InspectorMessage | None]):
|
||||
def _subscribe(workflow_run_id: str, *, timeout_seconds: float = 1.0):
|
||||
for m in messages:
|
||||
if m is None:
|
||||
# heartbeat sentinel
|
||||
yield InspectorMessage(
|
||||
kind="node_changed",
|
||||
workflow_run_id=workflow_run_id,
|
||||
node_id=None,
|
||||
status=None,
|
||||
)
|
||||
else:
|
||||
yield m
|
||||
|
||||
monkeypatch.setattr(ctrl.inspector_events, "subscribe", _subscribe)
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
def test_stream_fast_path_when_run_already_terminal(patch_service, app_model, run_id):
|
||||
"""A run that's already ``succeeded`` should produce ``snapshot`` →
|
||||
``workflow_run_completed`` and close without subscribing to pub/sub."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="succeeded")
|
||||
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
|
||||
assert len(records) == 2
|
||||
e0, d0 = _parse(records[0])
|
||||
e1, d1 = _parse(records[1])
|
||||
assert e0 == "snapshot"
|
||||
assert d0 is not None
|
||||
assert d0["workflow_run_status"] == "succeeded"
|
||||
assert e1 == "workflow_run_completed"
|
||||
assert d1 is not None
|
||||
assert d1["workflow_run_status"] == "succeeded"
|
||||
|
||||
|
||||
def test_stream_fast_path_each_terminal_status(patch_service, app_model, run_id):
|
||||
"""All four terminal statuses take the fast-path. Note the enum value for
|
||||
partial success is the hyphenated ``partial-succeeded``."""
|
||||
for terminal in ("succeeded", "failed", "stopped", "partial-succeeded"):
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status=terminal)
|
||||
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
|
||||
events = [_parse(r)[0] for r in records]
|
||||
assert events == ["snapshot", "workflow_run_completed"], terminal
|
||||
|
||||
|
||||
def test_stream_initial_404_propagates_before_any_bytes(patch_service, app_model, run_id):
|
||||
"""``NodeOutputInspectorError`` on the initial snapshot must surface as the
|
||||
controller's ``_InspectorNotFound`` exception so Flask returns HTTP 404
|
||||
— not a half-streamed SSE body."""
|
||||
patch_service.snapshot_workflow_run.side_effect = NodeOutputInspectorError(
|
||||
"workflow_run_not_found", "Workflow run not found."
|
||||
)
|
||||
gen = ctrl._stream_inspector_events(app_model, run_id)
|
||||
with pytest.raises(ctrl._InspectorNotFound) as exc:
|
||||
next(gen)
|
||||
assert exc.value.error_code == "workflow_run_not_found"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# _stream_inspector_events — live path (run is running)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_stream_live_emits_snapshot_then_node_changed_then_completion(
|
||||
patch_service, patch_subscribe, app_model, run_id
|
||||
):
|
||||
"""Happy path: snapshot → 2× node_changed → workflow_run_completed."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
patch_service.node_detail.return_value = _node_view(node_id="agent-1")
|
||||
|
||||
msgs = [
|
||||
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
|
||||
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="succeeded"),
|
||||
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
|
||||
]
|
||||
patch_subscribe(msgs)
|
||||
|
||||
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
|
||||
assert events == ["snapshot", "node_changed", "node_changed", "workflow_run_completed"]
|
||||
# node_detail should be called once per delta (not once per heartbeat)
|
||||
assert patch_service.node_detail.call_count == 2
|
||||
|
||||
|
||||
def test_stream_emits_heartbeat_after_n_idle_ticks(
|
||||
patch_service, patch_subscribe, monkeypatch: pytest.MonkeyPatch, app_model, run_id
|
||||
):
|
||||
"""When pub/sub returns the heartbeat sentinel ``_HEARTBEAT_EVERY_TICKS``
|
||||
times in a row, the generator emits a ``:keepalive`` SSE comment."""
|
||||
monkeypatch.setattr(ctrl, "_HEARTBEAT_EVERY_TICKS", 2)
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
patch_service.node_detail.return_value = _node_view()
|
||||
|
||||
# 2 heartbeats → keepalive, then real message + completion.
|
||||
patch_subscribe(
|
||||
[
|
||||
None,
|
||||
None,
|
||||
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="failed"),
|
||||
]
|
||||
)
|
||||
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
|
||||
raw = "".join(records)
|
||||
assert ":keepalive\n\n" in raw
|
||||
assert "workflow_run_completed" in raw
|
||||
|
||||
|
||||
def test_stream_hard_timeout_force_closes_without_terminal(
|
||||
patch_service, patch_subscribe, monkeypatch: pytest.MonkeyPatch, app_model, run_id
|
||||
):
|
||||
"""If the engine crashes / drops the terminal event, the generator force-
|
||||
closes after ``_STREAM_HARD_TIMEOUT_TICKS`` ticks rather than hanging."""
|
||||
monkeypatch.setattr(ctrl, "_STREAM_HARD_TIMEOUT_TICKS", 3)
|
||||
monkeypatch.setattr(ctrl, "_HEARTBEAT_EVERY_TICKS", 100) # avoid keepalive noise
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
|
||||
# 5 heartbeats, no terminal → generator should bail after 3 ticks.
|
||||
patch_subscribe([None] * 10)
|
||||
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
|
||||
events = [_parse(r)[0] for r in records]
|
||||
assert events == ["snapshot"] # only snapshot, then forced close
|
||||
|
||||
|
||||
def test_stream_skips_messages_with_missing_node_id(patch_service, patch_subscribe, app_model, run_id):
|
||||
"""Defensive: malformed node_changed without node_id is silently dropped."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
patch_subscribe(
|
||||
[
|
||||
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="", status="running"),
|
||||
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
|
||||
]
|
||||
)
|
||||
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
|
||||
assert events == ["snapshot", "workflow_run_completed"]
|
||||
assert patch_service.node_detail.call_count == 0
|
||||
|
||||
|
||||
def test_stream_skips_node_detail_404_without_breaking_stream(patch_service, patch_subscribe, app_model, run_id):
|
||||
"""When node_detail 404s mid-stream (node still being persisted), the
|
||||
generator just drops that delta and keeps streaming."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
patch_service.node_detail.side_effect = NodeOutputInspectorError("node_not_in_workflow_run", "transient")
|
||||
patch_subscribe(
|
||||
[
|
||||
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
|
||||
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
|
||||
]
|
||||
)
|
||||
events = [_parse(r)[0] for r in _drain(ctrl._stream_inspector_events(app_model, run_id))]
|
||||
assert events == ["snapshot", "workflow_run_completed"]
|
||||
|
||||
|
||||
def test_stream_emits_error_event_on_node_detail_unexpected_exception(
|
||||
patch_service, patch_subscribe, app_model, run_id
|
||||
):
|
||||
"""Any non-Inspector exception (DB outage, JSON decode error) becomes a
|
||||
user-visible ``error`` SSE record; the stream keeps running."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
patch_service.node_detail.side_effect = RuntimeError("db gone")
|
||||
patch_subscribe(
|
||||
[
|
||||
InspectorMessage(kind="node_changed", workflow_run_id=str(run_id), node_id="agent-1", status="running"),
|
||||
InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status="succeeded"),
|
||||
]
|
||||
)
|
||||
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
|
||||
events = [_parse(r) for r in records]
|
||||
kinds = [e for e, _ in events]
|
||||
assert kinds == ["snapshot", "error", "workflow_run_completed"]
|
||||
err_event, err_data = events[1]
|
||||
assert err_data is not None
|
||||
assert err_data["node_id"] == "agent-1"
|
||||
assert "failed" in err_data["message"]
|
||||
|
||||
|
||||
def test_stream_workflow_completed_status_falls_back_to_unknown(patch_service, patch_subscribe, app_model, run_id):
|
||||
"""If the pub/sub message arrives with status=None (publish race), the SSE
|
||||
payload still carries ``workflow_run_status`` with the ``unknown``
|
||||
sentinel so the frontend never sees a missing field."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
patch_subscribe(
|
||||
[InspectorMessage(kind="workflow_completed", workflow_run_id=str(run_id), node_id=None, status=None)]
|
||||
)
|
||||
records = _drain(ctrl._stream_inspector_events(app_model, run_id))
|
||||
e, d = _parse(records[-1])
|
||||
assert e == "workflow_run_completed"
|
||||
assert d is not None
|
||||
assert d["workflow_run_status"] == "unknown"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Resource classes — import-level smoke + service-method delegation
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_resource_classes_are_registered():
|
||||
"""All 8 Inspector Resource classes must be importable from the module so
|
||||
flask-restx can discover them via the namespace decorators."""
|
||||
for name in (
|
||||
"WorkflowDraftRunNodeOutputsApi",
|
||||
"WorkflowDraftRunNodeOutputDetailApi",
|
||||
"WorkflowDraftRunNodeOutputPreviewApi",
|
||||
"WorkflowDraftRunNodeOutputEventsApi",
|
||||
"WorkflowPublishedRunNodeOutputsApi",
|
||||
"WorkflowPublishedRunNodeOutputDetailApi",
|
||||
"WorkflowPublishedRunNodeOutputPreviewApi",
|
||||
"WorkflowPublishedRunNodeOutputEventsApi",
|
||||
):
|
||||
assert hasattr(ctrl, name), name
|
||||
|
||||
|
||||
def test_inspector_not_found_preserves_error_code():
|
||||
"""Sanity: the controller's bespoke 404 wrapper hangs onto the
|
||||
Inspector's specific error code rather than collapsing to a generic
|
||||
``not_found``."""
|
||||
err = NodeOutputInspectorError("node_not_in_workflow_run", "boom")
|
||||
wrapped = ctrl._InspectorNotFound(err)
|
||||
assert wrapped.error_code == "node_not_in_workflow_run"
|
||||
assert wrapped.code == 404
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# _serve_* — shared REST handler bodies (covered by both draft + published)
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_serve_snapshot_happy_path(patch_service, app_model, run_id):
|
||||
"""Returns the snapshot view as JSON-serialisable dict."""
|
||||
patch_service.snapshot_workflow_run.return_value = _snapshot_view(status="running")
|
||||
result = ctrl._serve_snapshot(app_model, run_id)
|
||||
assert isinstance(result, dict)
|
||||
assert result["workflow_run_id"] == "00000000-0000-0000-0000-0000000000aa"
|
||||
patch_service.snapshot_workflow_run.assert_called_once_with(app_model=app_model, workflow_run_id=str(run_id))
|
||||
|
||||
|
||||
def test_serve_snapshot_translates_inspector_error_to_404(patch_service, app_model, run_id):
|
||||
"""``NodeOutputInspectorError`` becomes the controller's 404 wrapper with
|
||||
the specific ``error_code`` preserved."""
|
||||
patch_service.snapshot_workflow_run.side_effect = NodeOutputInspectorError("workflow_run_not_found", "no such run")
|
||||
with pytest.raises(ctrl._InspectorNotFound) as exc:
|
||||
ctrl._serve_snapshot(app_model, run_id)
|
||||
assert exc.value.error_code == "workflow_run_not_found"
|
||||
|
||||
|
||||
def test_serve_node_detail_happy_path(patch_service, app_model, run_id):
|
||||
patch_service.node_detail.return_value = _node_view(node_id="agent-1")
|
||||
result = ctrl._serve_node_detail(app_model, run_id, "agent-1")
|
||||
assert result["node_id"] == "agent-1"
|
||||
patch_service.node_detail.assert_called_once_with(
|
||||
app_model=app_model, workflow_run_id=str(run_id), node_id="agent-1"
|
||||
)
|
||||
|
||||
|
||||
def test_serve_node_detail_translates_inspector_error(patch_service, app_model, run_id):
|
||||
patch_service.node_detail.side_effect = NodeOutputInspectorError("node_not_in_workflow_run", "missing")
|
||||
with pytest.raises(ctrl._InspectorNotFound) as exc:
|
||||
ctrl._serve_node_detail(app_model, run_id, "ghost")
|
||||
assert exc.value.error_code == "node_not_in_workflow_run"
|
||||
|
||||
|
||||
def test_serve_output_preview_happy_path(patch_service, app_model, run_id):
|
||||
from services.workflow.node_output_inspector_service import (
|
||||
DeclaredOutputType,
|
||||
OutputPreviewView,
|
||||
)
|
||||
|
||||
patch_service.output_preview.return_value = OutputPreviewView(
|
||||
node_id="agent-1",
|
||||
output_name="text",
|
||||
type=DeclaredOutputType.STRING,
|
||||
status=NodeOutputStatus.READY,
|
||||
value="Hello",
|
||||
)
|
||||
result = ctrl._serve_output_preview(app_model, run_id, "agent-1", "text")
|
||||
assert result["value"] == "Hello"
|
||||
assert result["status"] == "ready"
|
||||
patch_service.output_preview.assert_called_once_with(
|
||||
app_model=app_model,
|
||||
workflow_run_id=str(run_id),
|
||||
node_id="agent-1",
|
||||
output_name="text",
|
||||
)
|
||||
|
||||
|
||||
def test_serve_output_preview_translates_inspector_error(patch_service, app_model, run_id):
|
||||
patch_service.output_preview.side_effect = NodeOutputInspectorError("node_output_not_declared", "no such output")
|
||||
with pytest.raises(ctrl._InspectorNotFound) as exc:
|
||||
ctrl._serve_output_preview(app_model, run_id, "agent-1", "phantom")
|
||||
assert exc.value.error_code == "node_output_not_declared"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Note: the Resource ``.get`` methods themselves (6 REST + 2 SSE) are
|
||||
# 1-line delegators to the helpers above. They can't be called directly in a
|
||||
# unit test because their decorator stack (``@setup_required`` /
|
||||
# ``@login_required`` / ``@account_initialization_required`` /
|
||||
# ``@get_app_model``) needs a real Flask request context + DB-backed account.
|
||||
# The integration test in
|
||||
# ``tests/integration_tests/services/test_node_output_inspector_service.py``
|
||||
# (and the E2E driver in /tmp/e2e_inspector_sse_published.py) exercise them
|
||||
# through the HTTP stack.
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
@ -0,0 +1,192 @@
|
||||
"""Verify the workflow persistence layer fans Inspector deltas to redis pub/sub.
|
||||
|
||||
The hook lives in ``core/app/workflow/layers/persistence.py``:
|
||||
every ``_handle_node_*`` and the terminal ``_handle_graph_run_*`` handlers
|
||||
call into ``services.workflow.inspector_events.publish_node_changed`` /
|
||||
``publish_workflow_completed`` after the DB write succeeds. Those calls are
|
||||
the only thing the Inspector SSE stream listens to, so any future refactor of
|
||||
the persistence layer must keep them in place.
|
||||
|
||||
We don't reconstruct a full workflow engine here — the handlers are tested
|
||||
in isolation by patching just the moving parts they touch
|
||||
(``_workflow_execution`` + ``_node_execution_cache``) and asserting against
|
||||
the publisher module's call sites. This keeps the test compact and tied to
|
||||
the contract, not the implementation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from core.app.workflow.layers import persistence as persistence_mod
|
||||
from core.app.workflow.layers.persistence import WorkflowPersistenceLayer
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def layer() -> WorkflowPersistenceLayer:
|
||||
"""Build a layer instance with all repository / trace deps stubbed.
|
||||
|
||||
We bypass ``__init__`` because constructing it for real pulls in the
|
||||
workflow engine's app-generate-entity, repos, and a runtime state — none
|
||||
of which matter for asserting that the publish-hook fires.
|
||||
"""
|
||||
instance = WorkflowPersistenceLayer.__new__(WorkflowPersistenceLayer)
|
||||
# Minimum surface the handlers touch:
|
||||
instance._workflow_execution_repository = MagicMock()
|
||||
instance._workflow_node_execution_repository = MagicMock()
|
||||
instance._trace_manager = None
|
||||
instance._workflow_info = MagicMock(workflow_id="wf-1")
|
||||
instance._application_generate_entity = MagicMock()
|
||||
# Use a SimpleNamespace-like spec so Pydantic-validated callsites (e.g.
|
||||
# ``WorkflowNodeExecution.new`` requires real strings) get the right types.
|
||||
workflow_execution = MagicMock()
|
||||
workflow_execution.id_ = "run-1"
|
||||
workflow_execution.workflow_id = "wf-1"
|
||||
workflow_execution.status = MagicMock(value="succeeded")
|
||||
workflow_execution.outputs = {}
|
||||
workflow_execution.error_message = None
|
||||
workflow_execution.exceptions_count = 0
|
||||
workflow_execution.finished_at = None
|
||||
instance._workflow_execution = workflow_execution
|
||||
instance._node_execution_cache = {}
|
||||
instance._node_snapshots = {}
|
||||
instance._node_sequence = 0
|
||||
# `graph_runtime_state` is a layer-base property; stub it.
|
||||
instance._graph_runtime_state = MagicMock(total_tokens=0, node_run_steps=0, outputs={}, exceptions_count=0)
|
||||
return instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def capture_publishes(monkeypatch: pytest.MonkeyPatch) -> dict[str, list]:
|
||||
"""Replace the two publishers with capture lists so each test can assert
|
||||
on the exact arguments."""
|
||||
calls: dict[str, list] = {"node": [], "workflow": []}
|
||||
|
||||
def fake_node(*, workflow_run_id: str, node_id: str, status: str) -> None:
|
||||
calls["node"].append({"workflow_run_id": workflow_run_id, "node_id": node_id, "status": status})
|
||||
|
||||
def fake_workflow(*, workflow_run_id: str, status: str) -> None:
|
||||
calls["workflow"].append({"workflow_run_id": workflow_run_id, "status": status})
|
||||
|
||||
monkeypatch.setattr(persistence_mod, "_inspector_publish_node_changed", fake_node)
|
||||
monkeypatch.setattr(persistence_mod, "_inspector_publish_workflow_completed", fake_workflow)
|
||||
return calls
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Graph-level publish hooks
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _graph_event(**kwargs: Any) -> MagicMock:
|
||||
return MagicMock(**kwargs)
|
||||
|
||||
|
||||
def test_graph_run_succeeded_publishes_workflow_completed(layer, capture_publishes):
|
||||
layer._workflow_execution.status = MagicMock(value="succeeded")
|
||||
layer._handle_graph_run_succeeded(_graph_event(outputs={"text": "hi"}))
|
||||
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "succeeded"}]
|
||||
assert capture_publishes["node"] == []
|
||||
|
||||
|
||||
def test_graph_run_partial_succeeded_publishes_workflow_completed(layer, capture_publishes):
|
||||
layer._workflow_execution.status = MagicMock(value="partial-succeeded")
|
||||
layer._handle_graph_run_partial_succeeded(_graph_event(outputs={}, exceptions_count=1))
|
||||
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "partial-succeeded"}]
|
||||
|
||||
|
||||
def test_graph_run_failed_publishes_workflow_completed(layer, capture_publishes):
|
||||
layer._workflow_execution.status = MagicMock(value="failed")
|
||||
layer._handle_graph_run_failed(_graph_event(error="boom", exceptions_count=0))
|
||||
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "failed"}]
|
||||
|
||||
|
||||
def test_graph_run_aborted_publishes_workflow_completed(layer, capture_publishes):
|
||||
layer._workflow_execution.status = MagicMock(value="stopped")
|
||||
layer._handle_graph_run_aborted(_graph_event(reason="user stop"))
|
||||
assert capture_publishes["workflow"] == [{"workflow_run_id": "run-1", "status": "stopped"}]
|
||||
|
||||
|
||||
def test_graph_run_paused_does_not_publish_completion(layer, capture_publishes):
|
||||
"""Pause is not a terminal state — the Inspector keeps waiting for either
|
||||
resume or a real terminal event."""
|
||||
layer._handle_graph_run_paused(_graph_event(outputs={}))
|
||||
assert capture_publishes["workflow"] == []
|
||||
assert capture_publishes["node"] == []
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Node-level publish hooks
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _node_started_event(node_id: str = "agent-1", exec_id: str = "exec-1") -> MagicMock:
|
||||
return MagicMock(
|
||||
id=exec_id,
|
||||
node_id=node_id,
|
||||
node_type="agent",
|
||||
node_title="Greeter",
|
||||
predecessor_node_id=None,
|
||||
in_iteration_id=None,
|
||||
in_loop_id=None,
|
||||
start_at=datetime(2026, 5, 26, 0, 0, 0),
|
||||
)
|
||||
|
||||
|
||||
def _seed_node_execution(layer: WorkflowPersistenceLayer, exec_id: str, node_id: str) -> None:
|
||||
"""Inject a domain execution into the cache so the success / fail / etc
|
||||
handlers (which look it up by id) can run without going through started."""
|
||||
layer._node_execution_cache[exec_id] = MagicMock(
|
||||
id=exec_id, node_id=node_id, status=MagicMock(value="running"), outputs={}, error=None
|
||||
)
|
||||
|
||||
|
||||
def test_node_started_publishes_running(layer, capture_publishes):
|
||||
layer._handle_node_started(_node_started_event())
|
||||
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "running"}]
|
||||
|
||||
|
||||
def test_node_retry_publishes_retry(layer, capture_publishes):
|
||||
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
|
||||
event = MagicMock(id="exec-1", error="rate limit")
|
||||
layer._handle_node_retry(event)
|
||||
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "retry"}]
|
||||
|
||||
|
||||
def test_node_succeeded_publishes_succeeded(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
|
||||
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
|
||||
# Stub the inner _update_node_execution so we don't have to construct a
|
||||
# full NodeRunResult — we only want to confirm the publish happens after.
|
||||
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
|
||||
event = MagicMock(id="exec-1", node_run_result=MagicMock(), finished_at=datetime.now())
|
||||
layer._handle_node_succeeded(event)
|
||||
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "succeeded"}]
|
||||
|
||||
|
||||
def test_node_failed_publishes_failed(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
|
||||
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
|
||||
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
|
||||
event = MagicMock(id="exec-1", node_run_result=MagicMock(), error="bad", finished_at=datetime.now())
|
||||
layer._handle_node_failed(event)
|
||||
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "failed"}]
|
||||
|
||||
|
||||
def test_node_exception_publishes_exception(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
|
||||
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
|
||||
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
|
||||
event = MagicMock(id="exec-1", node_run_result=MagicMock(), error="oom", finished_at=datetime.now())
|
||||
layer._handle_node_exception(event)
|
||||
assert capture_publishes["node"] == [{"workflow_run_id": "run-1", "node_id": "agent-1", "status": "exception"}]
|
||||
|
||||
|
||||
def test_node_pause_requested_does_not_publish(layer, capture_publishes, monkeypatch: pytest.MonkeyPatch):
|
||||
"""Node pause is not an Inspector-visible state — no publish."""
|
||||
_seed_node_execution(layer, exec_id="exec-1", node_id="agent-1")
|
||||
monkeypatch.setattr(layer, "_update_node_execution", lambda *a, **kw: None)
|
||||
event = MagicMock(id="exec-1", node_run_result=MagicMock())
|
||||
layer._handle_node_pause_requested(event)
|
||||
assert capture_publishes["node"] == []
|
||||
@ -0,0 +1,439 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from core.agent.entities import AgentToolEntity
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.tools.__base.tool import Tool
|
||||
from core.tools.__base.tool_runtime import ToolRuntime
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.tools.entities.tool_entities import (
|
||||
ToolDescription,
|
||||
ToolEntity,
|
||||
ToolIdentity,
|
||||
ToolInvokeMessage,
|
||||
ToolParameter,
|
||||
)
|
||||
from core.workflow.nodes.agent_v2.plugin_tools_builder import (
|
||||
WorkflowAgentPluginToolsBuilder,
|
||||
WorkflowAgentPluginToolsBuildError,
|
||||
)
|
||||
from models.agent_config_entities import AgentSoulToolsConfig
|
||||
|
||||
|
||||
class FakeRuntimeProvider:
|
||||
def __init__(self, tool: Tool | Exception) -> None:
|
||||
# Either a Tool to hand back, or an exception to raise on lookup. The
|
||||
# latter lets tests exercise the error-mapping branches in
|
||||
# ``WorkflowAgentPluginToolsBuilder._fetch_tool_runtime``.
|
||||
self.tool = tool
|
||||
self.last_agent_tool: AgentToolEntity | None = None
|
||||
self.last_invoke_from: InvokeFrom | None = None
|
||||
|
||||
def get_agent_tool_runtime(
|
||||
self,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
agent_tool: AgentToolEntity,
|
||||
user_id: str | None = None,
|
||||
invoke_from: InvokeFrom = InvokeFrom.DEBUGGER,
|
||||
variable_pool: Any | None = None,
|
||||
) -> Tool:
|
||||
self.last_agent_tool = agent_tool
|
||||
self.last_invoke_from = invoke_from
|
||||
if isinstance(self.tool, Exception):
|
||||
raise self.tool
|
||||
return self.tool
|
||||
|
||||
|
||||
class FakeTool(Tool):
|
||||
def tool_provider_type(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def _invoke(
|
||||
self,
|
||||
user_id: str,
|
||||
tool_parameters: dict[str, Any],
|
||||
conversation_id: str | None = None,
|
||||
app_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
) -> ToolInvokeMessage | list[ToolInvokeMessage] | Generator[ToolInvokeMessage, None, None]:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def _tool(*, runtime_parameters: dict[str, Any] | None = None) -> FakeTool:
|
||||
if runtime_parameters is None:
|
||||
runtime_parameters = {"region": "us"}
|
||||
parameters = [
|
||||
ToolParameter(
|
||||
name="query",
|
||||
label=I18nObject(en_US="Query"),
|
||||
type=ToolParameter.ToolParameterType.STRING,
|
||||
form=ToolParameter.ToolParameterForm.LLM,
|
||||
required=True,
|
||||
llm_description="Search query",
|
||||
),
|
||||
ToolParameter(
|
||||
name="region",
|
||||
label=I18nObject(en_US="Region"),
|
||||
type=ToolParameter.ToolParameterType.STRING,
|
||||
form=ToolParameter.ToolParameterForm.FORM,
|
||||
required=True,
|
||||
),
|
||||
]
|
||||
entity = ToolEntity(
|
||||
identity=ToolIdentity(
|
||||
author="langgenius",
|
||||
name="search",
|
||||
label=I18nObject(en_US="Search"),
|
||||
provider="search",
|
||||
),
|
||||
description=ToolDescription(human=I18nObject(en_US="Search"), llm="Search the web."),
|
||||
parameters=parameters,
|
||||
)
|
||||
runtime = ToolRuntime(
|
||||
tenant_id="tenant-1",
|
||||
user_id="user-1",
|
||||
credentials={"api_key": "secret"},
|
||||
runtime_parameters=runtime_parameters,
|
||||
)
|
||||
return FakeTool(entity=entity, runtime=runtime)
|
||||
|
||||
|
||||
def _build(
|
||||
builder: WorkflowAgentPluginToolsBuilder,
|
||||
tools: AgentSoulToolsConfig,
|
||||
*,
|
||||
invoke_from: InvokeFrom = InvokeFrom.DEBUGGER,
|
||||
):
|
||||
"""Shorthand for ``builder.build(...)`` with the standard tenant/app/user
|
||||
triple, so each test only highlights what's actually unique to it."""
|
||||
return builder.build(
|
||||
tenant_id="tenant-1",
|
||||
app_id="app-1",
|
||||
user_id="user-1",
|
||||
tools=tools,
|
||||
invoke_from=invoke_from,
|
||||
)
|
||||
|
||||
|
||||
def test_builds_dify_plugin_tools_layer_from_existing_tool_runtime():
|
||||
runtime_provider = FakeRuntimeProvider(_tool())
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
result = _build(builder, tools)
|
||||
|
||||
assert result is not None
|
||||
prepared = result.tools[0]
|
||||
assert prepared.plugin_id == "langgenius/search"
|
||||
assert prepared.provider == "search"
|
||||
assert prepared.tool_name == "search"
|
||||
assert prepared.name == "search"
|
||||
assert prepared.credentials == {"api_key": "secret"}
|
||||
assert prepared.runtime_parameters == {"region": "us"}
|
||||
assert prepared.parameters_json_schema["properties"]["query"]["type"] == "string"
|
||||
assert "region" not in prepared.parameters_json_schema["properties"]
|
||||
assert runtime_provider.last_agent_tool is not None
|
||||
assert runtime_provider.last_agent_tool.credential_id == "credential-1"
|
||||
# Default ``provider_type`` is now ``"plugin"`` — the agent tool entity
|
||||
# must surface that so ToolManager hits the plugin provider table, not the
|
||||
# built-in legacy table.
|
||||
assert runtime_provider.last_agent_tool.provider_type.value == "plugin"
|
||||
|
||||
|
||||
def test_rejects_duplicate_exposed_tool_names():
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(_tool()))
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
},
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
},
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
|
||||
_build(builder, tools)
|
||||
|
||||
assert exc_info.value.error_code == "agent_tool_name_duplicated"
|
||||
|
||||
|
||||
def test_rejects_missing_required_runtime_parameter():
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(_tool(runtime_parameters={})))
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
|
||||
_build(builder, tools)
|
||||
|
||||
assert exc_info.value.error_code == "agent_tool_runtime_parameter_missing"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# invoke_from is threaded through to ToolManager
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_invoke_from_is_forwarded_to_tool_runtime_provider():
|
||||
"""``WorkflowAgentRuntimeRequestBuilder`` passes the *real* runtime
|
||||
invocation source (DEBUGGER for draft test run, SERVICE_API for published
|
||||
run, etc.). ToolManager uses ``invoke_from`` for credential quotas / rate
|
||||
limits / audit tags, so any default-falling-back here would silently
|
||||
misattribute usage. Lock in the forwarding behaviour for both
|
||||
representative invoke_from values."""
|
||||
for invoke_from in (InvokeFrom.DEBUGGER, InvokeFrom.SERVICE_API, InvokeFrom.WEB_APP):
|
||||
runtime_provider = FakeRuntimeProvider(_tool())
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
_build(builder, tools, invoke_from=invoke_from)
|
||||
|
||||
assert runtime_provider.last_invoke_from == invoke_from
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# disabled tools / plugin_id+provider fallback / unauthorized credentials
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_disabled_tools_are_skipped():
|
||||
runtime_provider = FakeRuntimeProvider(_tool())
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
"enabled": False,
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
# All entries are disabled → builder short-circuits and returns None so the
|
||||
# request_builder skips adding the tools layer entirely.
|
||||
assert _build(builder, tools) is None
|
||||
assert runtime_provider.last_agent_tool is None # ToolManager never queried
|
||||
|
||||
|
||||
def test_plugin_id_plus_provider_fallback_when_provider_id_missing():
|
||||
"""Frontend may send ``plugin_id`` + ``provider`` instead of the
|
||||
concatenated ``provider_id``; the builder must accept both shapes."""
|
||||
runtime_provider = FakeRuntimeProvider(_tool())
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"plugin_id": "langgenius/search",
|
||||
"provider": "search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
result = _build(builder, tools)
|
||||
|
||||
assert result is not None
|
||||
assert runtime_provider.last_agent_tool is not None
|
||||
assert runtime_provider.last_agent_tool.provider_id == "langgenius/search/search"
|
||||
assert result.tools[0].plugin_id == "langgenius/search"
|
||||
assert result.tools[0].provider == "search"
|
||||
|
||||
|
||||
def test_unauthorized_tool_without_credentials():
|
||||
"""``credential_type=unauthorized`` removes the ``credential_ref.id``
|
||||
requirement (e.g. public Wikipedia / current_time tools)."""
|
||||
|
||||
def _no_credentials_tool() -> FakeTool:
|
||||
tool = _tool()
|
||||
assert tool.runtime is not None
|
||||
tool.runtime.credentials = {}
|
||||
return tool
|
||||
|
||||
runtime_provider = FakeRuntimeProvider(_no_credentials_tool())
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=runtime_provider)
|
||||
tools = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/time/time",
|
||||
"tool_name": "current_time",
|
||||
"credential_type": "unauthorized",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
result = _build(builder, tools)
|
||||
assert result is not None
|
||||
assert result.tools[0].credential_type == "unauthorized"
|
||||
assert result.tools[0].credentials == {}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Error-code mapping: declaration not found / credential invalid / config
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _standard_tools_payload() -> AgentSoulToolsConfig:
|
||||
return AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"runtime_parameters": {"region": "us"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_tool_provider_not_found_maps_to_declaration_not_found():
|
||||
from core.tools.errors import ToolProviderNotFoundError
|
||||
|
||||
builder = WorkflowAgentPluginToolsBuilder(
|
||||
tool_runtime_provider=FakeRuntimeProvider(ToolProviderNotFoundError("provider gone"))
|
||||
)
|
||||
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
|
||||
_build(builder, _standard_tools_payload())
|
||||
assert exc_info.value.error_code == "agent_tool_declaration_not_found"
|
||||
|
||||
|
||||
def test_credential_validation_error_maps_to_credential_invalid():
|
||||
from core.tools.errors import ToolProviderCredentialValidationError
|
||||
|
||||
builder = WorkflowAgentPluginToolsBuilder(
|
||||
tool_runtime_provider=FakeRuntimeProvider(ToolProviderCredentialValidationError("creds expired"))
|
||||
)
|
||||
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
|
||||
_build(builder, _standard_tools_payload())
|
||||
assert exc_info.value.error_code == "agent_tool_credential_invalid"
|
||||
|
||||
|
||||
def test_generic_value_error_maps_to_config_invalid():
|
||||
"""Bare ``ValueError`` from ToolManager (e.g. "runtime not found") becomes
|
||||
``agent_tool_config_invalid`` — distinct from
|
||||
``agent_tool_declaration_not_found`` so callers can render a different
|
||||
hint."""
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(ValueError("runtime missing")))
|
||||
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
|
||||
_build(builder, _standard_tools_payload())
|
||||
assert exc_info.value.error_code == "agent_tool_config_invalid"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Non-scalar credentials rejected instead of silently str()'d
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_rejects_non_scalar_credential_value():
|
||||
"""If a credential ever shows up shaped like ``{"access_token": "..."}``,
|
||||
``str(value)`` would forward a Python repr to the plugin daemon. The
|
||||
builder should refuse and surface an explicit error code so an operator
|
||||
fixes the credential schema instead of debugging a daemon JSON parse
|
||||
failure."""
|
||||
|
||||
def _dict_credential_tool() -> FakeTool:
|
||||
tool = _tool()
|
||||
assert tool.runtime is not None
|
||||
tool.runtime.credentials = {"oauth": {"access_token": "secret", "expires_in": 3600}}
|
||||
return tool
|
||||
|
||||
builder = WorkflowAgentPluginToolsBuilder(tool_runtime_provider=FakeRuntimeProvider(_dict_credential_tool()))
|
||||
with pytest.raises(WorkflowAgentPluginToolsBuildError) as exc_info:
|
||||
_build(builder, _standard_tools_payload())
|
||||
assert exc_info.value.error_code == "agent_tool_credential_shape_invalid"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Legacy payload normalization
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_legacy_provider_name_and_tool_parameters_normalized():
|
||||
"""Old Composer save payloads used ``provider_name`` / ``tool_parameters``
|
||||
keys. The ``@model_validator(mode="before")`` on AgentSoulDifyToolConfig
|
||||
rewrites them in-place so reading historical Agent Soul snapshots from the
|
||||
DB still works."""
|
||||
config = AgentSoulToolsConfig.model_validate(
|
||||
{
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_name": "langgenius/search/search",
|
||||
"tool_name": "search",
|
||||
"credential_type": "api-key",
|
||||
"credential_id": "credential-1",
|
||||
"tool_parameters": {"region": "us"},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
tool = config.dify_tools[0]
|
||||
assert tool.provider_id == "langgenius/search/search"
|
||||
assert tool.runtime_parameters == {"region": "us"}
|
||||
assert tool.credential_ref is not None
|
||||
assert tool.credential_ref.id == "credential-1"
|
||||
@ -1,8 +1,9 @@
|
||||
from dataclasses import replace
|
||||
|
||||
import pytest
|
||||
from dify_agent.layers.dify_plugin import DifyPluginToolConfig, DifyPluginToolsLayerConfig
|
||||
|
||||
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID
|
||||
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID, DIFY_PLUGIN_TOOLS_LAYER_ID
|
||||
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom
|
||||
from core.workflow.nodes.agent_v2.runtime_request_builder import (
|
||||
WorkflowAgentRuntimeBuildContext,
|
||||
@ -26,6 +27,38 @@ class FakeCredentialsProvider:
|
||||
return {"api_key": "secret-key"}
|
||||
|
||||
|
||||
class FakePluginToolsBuilder:
|
||||
def __init__(self) -> None:
|
||||
# Capture the runtime invocation source so tests can assert it was
|
||||
# threaded through from ``DifyRunContext.invoke_from`` rather than
|
||||
# hard-coded to a placeholder like ``VALIDATION``.
|
||||
self.last_invoke_from: InvokeFrom | None = None
|
||||
|
||||
def build(self, *, tenant_id, app_id, user_id, tools, invoke_from):
|
||||
assert tenant_id == "tenant-1"
|
||||
assert app_id == "app-1"
|
||||
assert user_id == "user-1"
|
||||
self.last_invoke_from = invoke_from
|
||||
if not tools.dify_tools:
|
||||
return None
|
||||
return DifyPluginToolsLayerConfig(
|
||||
tools=[
|
||||
DifyPluginToolConfig(
|
||||
plugin_id="langgenius/time",
|
||||
provider="time",
|
||||
tool_name="current_time",
|
||||
credential_type="unauthorized",
|
||||
name="current_time",
|
||||
description="Get current time.",
|
||||
credentials={},
|
||||
runtime_parameters={},
|
||||
parameters=[],
|
||||
parameters_json_schema={"type": "object", "properties": {}, "required": []},
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class FakeVariablePool:
|
||||
def get(self, selector):
|
||||
if list(selector) == ["sys", "query"]:
|
||||
@ -155,8 +188,60 @@ def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metada
|
||||
assert output_schema["properties"]["confidence"]["type"] == "number"
|
||||
assert output_schema["required"] == ["report"]
|
||||
assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2}
|
||||
assert result.metadata["runtime_support"]["reserved_status"]["tools"] == "reserved_not_executed"
|
||||
assert result.metadata["runtime_support"]["unsupported_runtime_warnings"][0]["section"] == "agent_soul.tools"
|
||||
assert result.metadata["runtime_support"]["reserved_status"]["tools.dify_tools"] == "supported_when_config_valid"
|
||||
assert result.metadata["runtime_support"]["reserved_status"]["tools.cli_tools"] == "reserved_not_executed"
|
||||
warnings = result.metadata["runtime_support"]["unsupported_runtime_warnings"]
|
||||
assert warnings[0]["section"] == "agent_soul.tools.cli_tools"
|
||||
|
||||
|
||||
def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
|
||||
context = _context()
|
||||
snapshot = AgentConfigSnapshot(
|
||||
id="snapshot-1",
|
||||
tenant_id="tenant-1",
|
||||
agent_id="agent-1",
|
||||
version=1,
|
||||
config_snapshot=AgentSoulConfig(
|
||||
prompt={"system_prompt": "You are careful."},
|
||||
model=AgentSoulModelConfig(
|
||||
plugin_id="langgenius/openai",
|
||||
model_provider="openai",
|
||||
model="gpt-test",
|
||||
),
|
||||
tools={
|
||||
"dify_tools": [
|
||||
{
|
||||
"provider_id": "langgenius/time/time",
|
||||
"tool_name": "current_time",
|
||||
"credential_type": "unauthorized",
|
||||
}
|
||||
]
|
||||
},
|
||||
),
|
||||
)
|
||||
context = replace(context, snapshot=snapshot)
|
||||
|
||||
plugin_tools_builder = FakePluginToolsBuilder()
|
||||
result = WorkflowAgentRuntimeRequestBuilder(
|
||||
credentials_provider=FakeCredentialsProvider(),
|
||||
plugin_tools_builder=plugin_tools_builder,
|
||||
).build(context)
|
||||
|
||||
dumped = result.request.model_dump(mode="json")
|
||||
layers = {layer["name"]: layer for layer in dumped["composition"]["layers"]}
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID]["type"] == "dify.plugin.tools"
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID]["deps"] == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
|
||||
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID]["config"]["tools"][0]["tool_name"] == "current_time"
|
||||
assert result.metadata["agent_tools"] == {
|
||||
"dify_tool_count": 1,
|
||||
"dify_tool_names": ["current_time"],
|
||||
"cli_tool_count": 0,
|
||||
}
|
||||
# The runtime invocation source must flow from ``DifyRunContext.invoke_from``
|
||||
# into the plugin tools builder so ToolManager attributes credential
|
||||
# quotas / rate limits / audit tags to the real call site instead of a
|
||||
# hard-coded ``VALIDATION`` placeholder.
|
||||
assert plugin_tools_builder.last_invoke_from == context.dify_context.invoke_from
|
||||
|
||||
|
||||
def test_requires_agent_soul_model_config():
|
||||
|
||||
224
api/tests/unit_tests/services/workflow/test_inspector_events.py
Normal file
224
api/tests/unit_tests/services/workflow/test_inspector_events.py
Normal file
@ -0,0 +1,224 @@
|
||||
"""Unit tests for :mod:`services.workflow.inspector_events`.
|
||||
|
||||
The publisher and subscriber both touch redis, so we mock it out at the
|
||||
``redis_client`` boundary. The goal is to lock down:
|
||||
|
||||
1. the channel-naming convention (frontend SSE doesn't need to know it but
|
||||
tests catch accidental renames),
|
||||
2. the JSON envelope (``kind / workflow_run_id / node_id / status``),
|
||||
3. publisher robustness when redis is unavailable,
|
||||
4. subscriber's tolerance of malformed payloads and bytes-vs-str messages,
|
||||
5. subscriber's heartbeat-on-idle behaviour.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from services.workflow import inspector_events
|
||||
from services.workflow.inspector_events import InspectorMessage
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Channel + envelope
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_channel_for_returns_namespaced_key():
|
||||
assert inspector_events.channel_for("run-42") == "dify:inspector:workflow_run:run-42"
|
||||
|
||||
|
||||
def test_inspector_message_to_json_round_trip():
|
||||
msg = InspectorMessage(kind="node_changed", workflow_run_id="r1", node_id="agent-1", status="succeeded")
|
||||
parsed = json.loads(msg.to_json())
|
||||
assert parsed == {"kind": "node_changed", "workflow_run_id": "r1", "node_id": "agent-1", "status": "succeeded"}
|
||||
|
||||
|
||||
def test_inspector_message_from_json_rejects_bad_kind():
|
||||
blob = json.dumps({"kind": "something_else", "workflow_run_id": "r1"})
|
||||
assert InspectorMessage.from_json(blob) is None
|
||||
|
||||
|
||||
def test_inspector_message_from_json_rejects_bad_workflow_run_id():
|
||||
blob = json.dumps({"kind": "node_changed", "workflow_run_id": ""})
|
||||
assert InspectorMessage.from_json(blob) is None
|
||||
|
||||
|
||||
def test_inspector_message_from_json_rejects_non_string_node_id():
|
||||
blob = json.dumps({"kind": "node_changed", "workflow_run_id": "r1", "node_id": 42})
|
||||
assert InspectorMessage.from_json(blob) is None
|
||||
|
||||
|
||||
def test_inspector_message_from_json_returns_none_for_invalid_json():
|
||||
assert InspectorMessage.from_json("{not json") is None
|
||||
|
||||
|
||||
def test_inspector_message_from_json_rejects_non_dict_payload():
|
||||
"""Defensive: a JSON array or scalar is not an InspectorMessage."""
|
||||
assert InspectorMessage.from_json("[1, 2, 3]") is None
|
||||
assert InspectorMessage.from_json('"plain string"') is None
|
||||
|
||||
|
||||
def test_inspector_message_from_json_rejects_non_string_status():
|
||||
"""Status field, if present, must be a string."""
|
||||
blob = json.dumps({"kind": "workflow_completed", "workflow_run_id": "r1", "status": 42})
|
||||
assert InspectorMessage.from_json(blob) is None
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Publisher
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_publish_node_changed_writes_to_run_channel():
|
||||
fake_redis = MagicMock()
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
inspector_events.publish_node_changed(workflow_run_id="run-1", node_id="agent-1", status="running")
|
||||
|
||||
fake_redis.publish.assert_called_once()
|
||||
channel, blob = fake_redis.publish.call_args.args
|
||||
assert channel == "dify:inspector:workflow_run:run-1"
|
||||
msg = InspectorMessage.from_json(blob)
|
||||
assert msg is not None
|
||||
assert msg.kind == "node_changed"
|
||||
assert msg.node_id == "agent-1"
|
||||
assert msg.status == "running"
|
||||
|
||||
|
||||
def test_publish_workflow_completed_emits_terminal_message():
|
||||
fake_redis = MagicMock()
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
inspector_events.publish_workflow_completed(workflow_run_id="run-1", status="succeeded")
|
||||
|
||||
blob = fake_redis.publish.call_args.args[1]
|
||||
msg = InspectorMessage.from_json(blob)
|
||||
assert msg is not None
|
||||
assert msg.kind == "workflow_completed"
|
||||
assert msg.node_id is None
|
||||
assert msg.status == "succeeded"
|
||||
|
||||
|
||||
def test_publish_swallows_redis_errors():
|
||||
"""Persistence must not crash if redis blows up — we publish best-effort."""
|
||||
|
||||
class _BrokenRedis:
|
||||
def publish(self, *_args: Any, **_kwargs: Any) -> None:
|
||||
raise RuntimeError("redis offline")
|
||||
|
||||
with patch.object(inspector_events, "redis_client", _BrokenRedis()):
|
||||
# No exception should escape.
|
||||
inspector_events.publish_node_changed(workflow_run_id="run-1", node_id="agent-1", status="running")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Subscriber
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_fake_pubsub(messages: list[dict[str, Any] | None]) -> MagicMock:
|
||||
"""Build a redis pubsub stub that replays ``messages`` then raises StopIteration."""
|
||||
pubsub = MagicMock()
|
||||
it: Iterator[dict[str, Any] | None] = iter(messages)
|
||||
pubsub.get_message.side_effect = lambda **_kwargs: next(it, None)
|
||||
return pubsub
|
||||
|
||||
|
||||
def test_subscribe_yields_heartbeat_then_real_message():
|
||||
"""Idle ticks (``get_message`` returns None) surface as a sentinel; real
|
||||
payloads decode to ``InspectorMessage`` instances."""
|
||||
payload = json.dumps(
|
||||
{"kind": "node_changed", "workflow_run_id": "run-1", "node_id": "agent-1", "status": "succeeded"}
|
||||
)
|
||||
fake_redis = MagicMock()
|
||||
fake_redis.pubsub.return_value = _make_fake_pubsub(
|
||||
[
|
||||
None, # heartbeat tick
|
||||
{"data": payload.encode("utf-8")}, # bytes payload, real message
|
||||
None, # heartbeat
|
||||
]
|
||||
)
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
|
||||
first = next(gen)
|
||||
second = next(gen)
|
||||
third = next(gen)
|
||||
|
||||
# First message is the heartbeat sentinel (both node_id and status are None).
|
||||
assert first.node_id is None
|
||||
assert first.status is None
|
||||
# Second is the real one.
|
||||
assert second.kind == "node_changed"
|
||||
assert second.node_id == "agent-1"
|
||||
assert second.status == "succeeded"
|
||||
# Third is another heartbeat.
|
||||
assert third.node_id is None
|
||||
|
||||
|
||||
def test_subscribe_skips_malformed_payloads():
|
||||
fake_redis = MagicMock()
|
||||
fake_redis.pubsub.return_value = _make_fake_pubsub(
|
||||
[
|
||||
{"data": b"not json at all"},
|
||||
{"data": json.dumps({"kind": "node_changed", "workflow_run_id": "run-1"}).encode("utf-8")},
|
||||
]
|
||||
)
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
|
||||
msg = next(gen)
|
||||
assert msg.kind == "node_changed"
|
||||
assert msg.node_id is None
|
||||
|
||||
|
||||
def test_subscribe_unsubscribes_on_teardown():
|
||||
fake_pubsub = _make_fake_pubsub([None])
|
||||
fake_redis = MagicMock()
|
||||
fake_redis.pubsub.return_value = fake_pubsub
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
|
||||
next(gen)
|
||||
gen.close()
|
||||
fake_pubsub.unsubscribe.assert_called_once_with("dify:inspector:workflow_run:run-1")
|
||||
fake_pubsub.close.assert_called_once()
|
||||
|
||||
|
||||
def test_subscribe_swallows_teardown_errors():
|
||||
"""``unsubscribe`` / ``close`` failures must not propagate out of the
|
||||
generator — they're best-effort cleanup."""
|
||||
fake_pubsub = MagicMock()
|
||||
fake_pubsub.get_message.return_value = None
|
||||
fake_pubsub.unsubscribe.side_effect = RuntimeError("redis offline")
|
||||
fake_pubsub.close.side_effect = RuntimeError("close failed")
|
||||
fake_redis = MagicMock()
|
||||
fake_redis.pubsub.return_value = fake_pubsub
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
|
||||
next(gen)
|
||||
# The teardown path runs in ``finally``; closing the generator
|
||||
# exercises it. No exception should escape.
|
||||
gen.close()
|
||||
|
||||
|
||||
def test_subscribe_skips_non_string_data_payloads():
|
||||
"""``raw["data"]`` can be ``None`` / int / bytes — only str is decodable
|
||||
and the rest are silently skipped."""
|
||||
fake_pubsub = MagicMock()
|
||||
msgs: list[dict[str, Any] | None] = [
|
||||
{"data": None}, # missing payload
|
||||
{"data": 12345}, # int payload (shouldn't happen, defensive)
|
||||
{
|
||||
"data": json.dumps(
|
||||
{"kind": "node_changed", "workflow_run_id": "run-1", "node_id": "agent-1", "status": "running"}
|
||||
)
|
||||
},
|
||||
]
|
||||
it = iter(msgs)
|
||||
fake_pubsub.get_message.side_effect = lambda **_kw: next(it, None)
|
||||
fake_redis = MagicMock()
|
||||
fake_redis.pubsub.return_value = fake_pubsub
|
||||
with patch.object(inspector_events, "redis_client", fake_redis):
|
||||
gen = inspector_events.subscribe("run-1", timeout_seconds=0.0)
|
||||
msg = next(gen)
|
||||
assert msg.kind == "node_changed"
|
||||
assert msg.node_id == "agent-1"
|
||||
@ -0,0 +1,499 @@
|
||||
"""Unit tests for NodeOutputInspectorService (Stage 4 §8).
|
||||
|
||||
The service reads from postgres and resolves agent v2 bindings; this suite
|
||||
mocks ``session_factory`` and the binding resolver so we exercise the
|
||||
view-construction logic without DB / network access.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
|
||||
from models.agent_config_entities import (
|
||||
DeclaredArrayItem,
|
||||
DeclaredOutputConfig,
|
||||
DeclaredOutputType,
|
||||
)
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from services.workflow.node_output_inspector_service import (
|
||||
NodeOutputInspectorError,
|
||||
NodeOutputInspectorService,
|
||||
NodeOutputStatus,
|
||||
NodeStatus,
|
||||
)
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Fixtures
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _app_model(*, tenant_id: str = "tenant-1", app_id: str = "app-1"):
|
||||
return SimpleNamespace(tenant_id=tenant_id, id=app_id)
|
||||
|
||||
|
||||
def _workflow_run(
|
||||
*,
|
||||
run_id: str = "run-1",
|
||||
workflow_id: str = "workflow-1",
|
||||
tenant_id: str = "tenant-1",
|
||||
app_id: str = "app-1",
|
||||
triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
|
||||
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING,
|
||||
nodes: list[dict[str, Any]] | None = None,
|
||||
):
|
||||
return SimpleNamespace(
|
||||
id=run_id,
|
||||
workflow_id=workflow_id,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
triggered_from=triggered_from,
|
||||
status=status,
|
||||
graph=json.dumps({"nodes": nodes or []}),
|
||||
)
|
||||
|
||||
|
||||
def _execution(
|
||||
*,
|
||||
node_id: str,
|
||||
node_type: str = "agent",
|
||||
title: str = "",
|
||||
status: WorkflowNodeExecutionStatus = WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
outputs: dict[str, Any] | None = None,
|
||||
execution_metadata: dict[str, Any] | None = None,
|
||||
index: int = 1,
|
||||
created_at: datetime | None = None,
|
||||
finished_at: datetime | None = None,
|
||||
):
|
||||
return SimpleNamespace(
|
||||
node_id=node_id,
|
||||
node_type=node_type,
|
||||
title=title or node_id,
|
||||
status=status,
|
||||
outputs=json.dumps(outputs) if outputs is not None else None,
|
||||
execution_metadata=json.dumps(execution_metadata) if execution_metadata is not None else None,
|
||||
index=index,
|
||||
created_at=created_at or datetime.now(UTC),
|
||||
finished_at=finished_at,
|
||||
)
|
||||
|
||||
|
||||
def _agent_v2_node(*, node_id: str = "agent-node-1", title: str = "My Agent") -> dict[str, Any]:
|
||||
return {
|
||||
"id": node_id,
|
||||
"data": {"type": "agent", "version": "2", "title": title},
|
||||
}
|
||||
|
||||
|
||||
def _non_agent_node(*, node_id: str = "tool-node-1", node_type: str = "tool", title: str = "Slack") -> dict[str, Any]:
|
||||
return {
|
||||
"id": node_id,
|
||||
"data": {"type": node_type, "title": title},
|
||||
}
|
||||
|
||||
|
||||
def _patch_session(
|
||||
*,
|
||||
workflow_run: SimpleNamespace | None,
|
||||
executions: list[SimpleNamespace] | None = None,
|
||||
):
|
||||
"""Patch ``session_factory.create_session`` to return the configured rows.
|
||||
|
||||
Returns a context manager that the test uses with ``with``.
|
||||
"""
|
||||
executions = executions or []
|
||||
mock_session = MagicMock()
|
||||
mock_session.scalar.return_value = workflow_run
|
||||
mock_session.scalars.return_value.all.return_value = executions
|
||||
cm = MagicMock()
|
||||
cm.__enter__.return_value = mock_session
|
||||
cm.__exit__.return_value = False
|
||||
return patch(
|
||||
"services.workflow.node_output_inspector_service.session_factory.create_session",
|
||||
return_value=cm,
|
||||
)
|
||||
|
||||
|
||||
def _stub_binding_resolver(*, declared_outputs: list[DeclaredOutputConfig]):
|
||||
"""Build a fake ``WorkflowAgentBindingResolver`` whose ``.resolve`` returns
|
||||
a binding with ``node_job_config_dict.declared_outputs``."""
|
||||
binding = SimpleNamespace(
|
||||
id="binding-1",
|
||||
node_job_config_dict={
|
||||
"workflow_prompt": "stub",
|
||||
"declared_outputs": [o.model_dump() for o in declared_outputs],
|
||||
},
|
||||
)
|
||||
bundle = SimpleNamespace(binding=binding, agent=None, snapshot=None)
|
||||
resolver = MagicMock()
|
||||
resolver.resolve.return_value = bundle
|
||||
return resolver
|
||||
|
||||
|
||||
def _make_service(declared_outputs: list[DeclaredOutputConfig] | None = None) -> NodeOutputInspectorService:
|
||||
return NodeOutputInspectorService(binding_resolver=_stub_binding_resolver(declared_outputs=declared_outputs or []))
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# 404 paths
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_snapshot_404_when_workflow_run_missing():
|
||||
service = _make_service()
|
||||
with _patch_session(workflow_run=None):
|
||||
with pytest.raises(NodeOutputInspectorError) as exc:
|
||||
service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="missing")
|
||||
assert exc.value.code == "workflow_run_not_found"
|
||||
|
||||
|
||||
def test_snapshot_accepts_published_run_d1_lifted():
|
||||
"""D-1 was lifted 2026-05-26: any ``triggered_from`` is now accepted."""
|
||||
service = _make_service()
|
||||
run = _workflow_run(
|
||||
nodes=[_agent_v2_node(node_id="agent-1")],
|
||||
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
|
||||
)
|
||||
with _patch_session(workflow_run=run, executions=[]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
assert snapshot.workflow_run_id == "run-1"
|
||||
assert [n.node_id for n in snapshot.node_outputs] == ["agent-1"]
|
||||
|
||||
|
||||
def test_snapshot_accepts_webhook_triggered_run():
|
||||
"""Webhook / schedule / plugin triggers are also published-side."""
|
||||
service = _make_service()
|
||||
run = _workflow_run(
|
||||
nodes=[_agent_v2_node(node_id="agent-1")],
|
||||
triggered_from=WorkflowRunTriggeredFrom.WEBHOOK,
|
||||
)
|
||||
with _patch_session(workflow_run=run, executions=[]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
assert snapshot.workflow_run_id == "run-1"
|
||||
|
||||
|
||||
def test_node_detail_404_when_node_id_absent_from_graph():
|
||||
service = _make_service()
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
with _patch_session(workflow_run=run, executions=[]):
|
||||
with pytest.raises(NodeOutputInspectorError) as exc:
|
||||
service.node_detail(app_model=_app_model(), workflow_run_id="run-1", node_id="ghost")
|
||||
assert exc.value.code == "node_not_in_workflow_run"
|
||||
|
||||
|
||||
def test_output_preview_404_when_output_name_unknown():
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(node_id="agent-1", outputs={"text": "hello"})
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
with pytest.raises(NodeOutputInspectorError) as exc:
|
||||
service.output_preview(
|
||||
app_model=_app_model(),
|
||||
workflow_run_id="run-1",
|
||||
node_id="agent-1",
|
||||
output_name="missing",
|
||||
)
|
||||
assert exc.value.code == "node_output_not_declared"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Snapshot happy path
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_snapshot_status_pending_when_node_has_no_execution():
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
with _patch_session(workflow_run=run, executions=[]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
|
||||
assert len(snapshot.node_outputs) == 1
|
||||
node = snapshot.node_outputs[0]
|
||||
assert node.node_status == NodeStatus.IDLE
|
||||
assert node.outputs[0].status == NodeOutputStatus.PENDING
|
||||
|
||||
|
||||
def test_snapshot_status_running():
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(node_id="agent-1", status=WorkflowNodeExecutionStatus.RUNNING)
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
assert snapshot.node_outputs[0].node_status == NodeStatus.RUNNING
|
||||
assert snapshot.node_outputs[0].outputs[0].status == NodeOutputStatus.RUNNING
|
||||
|
||||
|
||||
def test_snapshot_status_failed_node_marks_all_outputs_failed():
|
||||
service = _make_service(
|
||||
declared_outputs=[
|
||||
DeclaredOutputConfig(name="a", type=DeclaredOutputType.STRING),
|
||||
DeclaredOutputConfig(name="b", type=DeclaredOutputType.NUMBER),
|
||||
],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(node_id="agent-1", status=WorkflowNodeExecutionStatus.FAILED)
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
statuses = {o.name: o.status for o in snapshot.node_outputs[0].outputs}
|
||||
assert statuses == {"a": NodeOutputStatus.FAILED, "b": NodeOutputStatus.FAILED}
|
||||
|
||||
|
||||
def test_snapshot_status_ready_when_outputs_present_and_no_failure_metadata():
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(node_id="agent-1", outputs={"text": "hello"})
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
output = snapshot.node_outputs[0].outputs[0]
|
||||
assert output.status == NodeOutputStatus.READY
|
||||
assert output.value_preview == "hello"
|
||||
|
||||
|
||||
def test_snapshot_marks_type_check_failure():
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(
|
||||
node_id="agent-1",
|
||||
outputs={"text": "ok"},
|
||||
execution_metadata={
|
||||
"output_type_check": {
|
||||
"passed": False,
|
||||
"results": [{"name": "text", "type": "string", "status": "type_check_failed", "reason": "wrong shape"}],
|
||||
}
|
||||
},
|
||||
)
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
output = snapshot.node_outputs[0].outputs[0]
|
||||
assert output.status == NodeOutputStatus.TYPE_CHECK_FAILED
|
||||
assert output.type_check is not None
|
||||
assert output.type_check.passed is False
|
||||
assert output.type_check.reason == "wrong shape"
|
||||
|
||||
|
||||
def test_snapshot_marks_output_check_failure_when_type_check_passed():
|
||||
service = _make_service(
|
||||
declared_outputs=[
|
||||
DeclaredOutputConfig(
|
||||
name="report",
|
||||
type=DeclaredOutputType.FILE,
|
||||
)
|
||||
],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(
|
||||
node_id="agent-1",
|
||||
outputs={"report": {"file_id": "550e8400-e29b-41d4-a716-446655440000"}},
|
||||
execution_metadata={
|
||||
"output_type_check": {"passed": True, "results": [{"name": "report", "status": "ready"}]},
|
||||
"output_check": {
|
||||
"passed": False,
|
||||
"results": [{"name": "report", "status": "failed", "reason": "benchmark mismatch"}],
|
||||
},
|
||||
},
|
||||
)
|
||||
with (
|
||||
_patch_session(workflow_run=run, executions=[ex]),
|
||||
patch(
|
||||
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
|
||||
return_value="https://signed.example/x",
|
||||
),
|
||||
):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
output = snapshot.node_outputs[0].outputs[0]
|
||||
assert output.status == NodeOutputStatus.OUTPUT_CHECK_FAILED
|
||||
assert output.output_check is not None
|
||||
assert output.output_check.passed is False
|
||||
assert output.output_check.reason == "benchmark mismatch"
|
||||
|
||||
|
||||
def test_snapshot_marks_not_produced_when_declared_output_missing_from_payload():
|
||||
service = _make_service(
|
||||
declared_outputs=[
|
||||
DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING),
|
||||
DeclaredOutputConfig(name="optional_meta", type=DeclaredOutputType.OBJECT, required=False),
|
||||
],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(node_id="agent-1", outputs={"text": "hi"}) # optional_meta missing
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
statuses = {o.name: o.status for o in snapshot.node_outputs[0].outputs}
|
||||
assert statuses == {"text": NodeOutputStatus.READY, "optional_meta": NodeOutputStatus.NOT_PRODUCED}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Non-agent node — outputs inferred from execution payload
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_non_agent_node_outputs_inferred_from_payload_keys():
|
||||
service = _make_service()
|
||||
run = _workflow_run(nodes=[_non_agent_node(node_id="tool-1", node_type="tool")])
|
||||
ex = _execution(
|
||||
node_id="tool-1",
|
||||
node_type="tool",
|
||||
outputs={"message": "sent", "thread_ts": "1234"},
|
||||
)
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
output_names = sorted(o.name for o in snapshot.node_outputs[0].outputs)
|
||||
assert output_names == ["message", "thread_ts"]
|
||||
# All inferred outputs should have ``type=None`` since we don't know the
|
||||
# schema yet.
|
||||
assert all(o.type is None for o in snapshot.node_outputs[0].outputs)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# File preview / signed URL
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_file_output_preview_includes_signed_url():
|
||||
service = _make_service(
|
||||
declared_outputs=[
|
||||
DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE),
|
||||
],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
file_payload = {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}
|
||||
ex = _execution(node_id="agent-1", outputs={"report": file_payload})
|
||||
with (
|
||||
_patch_session(workflow_run=run, executions=[ex]),
|
||||
patch(
|
||||
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
|
||||
return_value="https://signed.example/x.pdf",
|
||||
),
|
||||
):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
preview_value = snapshot.node_outputs[0].outputs[0].value_preview
|
||||
assert isinstance(preview_value, dict)
|
||||
assert preview_value["preview_url"] == "https://signed.example/x.pdf"
|
||||
assert preview_value["filename"] == "x.pdf"
|
||||
|
||||
|
||||
def test_file_output_preview_endpoint_returns_full_value_with_signed_url():
|
||||
service = _make_service(
|
||||
declared_outputs=[
|
||||
DeclaredOutputConfig(name="report", type=DeclaredOutputType.FILE),
|
||||
],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
file_payload = {"file_id": "550e8400-e29b-41d4-a716-446655440000", "filename": "x.pdf"}
|
||||
ex = _execution(node_id="agent-1", outputs={"report": file_payload})
|
||||
with (
|
||||
_patch_session(workflow_run=run, executions=[ex]),
|
||||
patch(
|
||||
"services.workflow.node_output_inspector_service.file_helpers.get_signed_file_url",
|
||||
return_value="https://signed.example/x.pdf",
|
||||
),
|
||||
):
|
||||
preview = service.output_preview(
|
||||
app_model=_app_model(),
|
||||
workflow_run_id="run-1",
|
||||
node_id="agent-1",
|
||||
output_name="report",
|
||||
)
|
||||
assert preview.output_name == "report"
|
||||
assert preview.status == NodeOutputStatus.READY
|
||||
assert isinstance(preview.value, dict)
|
||||
assert preview.value["preview_url"] == "https://signed.example/x.pdf"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Retry / metadata
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_retried_count_pulled_from_attempt_metadata():
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(
|
||||
node_id="agent-1",
|
||||
outputs={"text": "ok"},
|
||||
execution_metadata={"attempt": 2},
|
||||
)
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
assert snapshot.node_outputs[0].outputs[0].retried == 2
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Latest-execution-per-node grouping
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_keeps_latest_execution_per_node_by_index():
|
||||
"""When a node has multiple executions (retries / iterations) keep the
|
||||
canonical one — the row with the highest ``index``."""
|
||||
service = _make_service(
|
||||
declared_outputs=[DeclaredOutputConfig(name="text", type=DeclaredOutputType.STRING)],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
older = _execution(node_id="agent-1", outputs={"text": "old"}, index=1)
|
||||
newer = _execution(node_id="agent-1", outputs={"text": "new"}, index=5)
|
||||
with _patch_session(workflow_run=run, executions=[older, newer]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
assert snapshot.node_outputs[0].outputs[0].value_preview == "new"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Array item declarations round-trip correctly
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_array_typed_output_with_array_item_renders_correctly():
|
||||
service = _make_service(
|
||||
declared_outputs=[
|
||||
DeclaredOutputConfig(
|
||||
name="files",
|
||||
type=DeclaredOutputType.ARRAY,
|
||||
array_item=DeclaredArrayItem(type=DeclaredOutputType.FILE),
|
||||
)
|
||||
],
|
||||
)
|
||||
run = _workflow_run(nodes=[_agent_v2_node(node_id="agent-1")])
|
||||
ex = _execution(node_id="agent-1", outputs={"files": []})
|
||||
with _patch_session(workflow_run=run, executions=[ex]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
output = snapshot.node_outputs[0].outputs[0]
|
||||
assert output.type == DeclaredOutputType.ARRAY
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Graph parsing edge cases
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_unparseable_graph_blob_yields_empty_snapshot_not_500():
|
||||
service = _make_service()
|
||||
run = SimpleNamespace(
|
||||
id="run-1",
|
||||
workflow_id="workflow-1",
|
||||
tenant_id="tenant-1",
|
||||
app_id="app-1",
|
||||
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
|
||||
status=WorkflowExecutionStatus.RUNNING,
|
||||
graph="{not valid json",
|
||||
)
|
||||
with _patch_session(workflow_run=run, executions=[]):
|
||||
snapshot = service.snapshot_workflow_run(app_model=_app_model(), workflow_run_id="run-1")
|
||||
assert snapshot.node_outputs == []
|
||||
6
api/uv.lock
generated
6
api/uv.lock
generated
@ -1724,7 +1724,7 @@ storage = [
|
||||
{ name = "opendal", specifier = "==0.46.0" },
|
||||
{ name = "oss2", specifier = ">=2.19.1,<3.0.0" },
|
||||
{ name = "supabase", specifier = ">=2.30.0,<3.0.0" },
|
||||
{ name = "tos", specifier = ">=2.9.1,<3.0.0" },
|
||||
{ name = "tos", specifier = ">=2.9.0,<3.0.0" },
|
||||
]
|
||||
tools = [
|
||||
{ name = "cloudscraper", specifier = ">=1.2.71,<2.0.0" },
|
||||
@ -6564,7 +6564,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "tos"
|
||||
version = "2.9.1"
|
||||
version = "2.9.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "crcmod" },
|
||||
@ -6574,7 +6574,7 @@ dependencies = [
|
||||
{ name = "six" },
|
||||
{ name = "wrapt" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/60/42/079680659e1f6c20f7b837e6b13f16d4c1c389889071f49c18baea9ac2ac/tos-2.9.1.tar.gz", hash = "sha256:06a5cc095d5b3f0e52b04aee8f7e60f8ddcf0c94c4408213e3485e40070d54ef", size = 163709, upload-time = "2026-05-26T03:29:27.747Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/9a/b3/13451226f564f88d9db2323e9b7eabcced792a0ad5ee1e333751a7634257/tos-2.9.0.tar.gz", hash = "sha256:861cfc348e770f099f911cb96b2c41774ada6c9c51b7a89d97e0c426074dd99e", size = 157071, upload-time = "2026-01-06T04:13:08.921Z" }
|
||||
|
||||
[[package]]
|
||||
name = "tqdm"
|
||||
|
||||
@ -121,9 +121,7 @@ export type AgentSoulToolsConfig = {
|
||||
cli_tools?: Array<{
|
||||
[key: string]: unknown
|
||||
}>
|
||||
dify_tools?: Array<{
|
||||
[key: string]: unknown
|
||||
}>
|
||||
dify_tools?: Array<AgentSoulDifyToolConfig>
|
||||
}
|
||||
|
||||
export type AgentKnowledgeQueryMode = 'generated_query' | 'user_query'
|
||||
@ -134,6 +132,28 @@ export type AgentSoulModelCredentialRef = {
|
||||
type: string
|
||||
}
|
||||
|
||||
export type AgentSoulDifyToolConfig = {
|
||||
credential_ref?: AgentSoulDifyToolCredentialRef
|
||||
credential_type?: 'api-key' | 'oauth2' | 'unauthorized'
|
||||
description?: string | null
|
||||
enabled?: boolean
|
||||
name?: string | null
|
||||
plugin_id?: string | null
|
||||
provider?: string | null
|
||||
provider_id?: string | null
|
||||
provider_type?: string
|
||||
runtime_parameters?: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
tool_name: string
|
||||
}
|
||||
|
||||
export type AgentSoulDifyToolCredentialRef = {
|
||||
id?: string | null
|
||||
provider?: string | null
|
||||
type?: 'provider' | 'tool'
|
||||
}
|
||||
|
||||
export type GetAgentsData = {
|
||||
body?: never
|
||||
path?: never
|
||||
|
||||
@ -78,14 +78,6 @@ export const zAgentSoulSkillsFilesConfig = z.object({
|
||||
skills: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulToolsConfig
|
||||
*/
|
||||
export const zAgentSoulToolsConfig = z.object({
|
||||
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
dify_tools: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentKnowledgeQueryMode
|
||||
*/
|
||||
@ -124,6 +116,53 @@ export const zAgentSoulModelConfig = z.object({
|
||||
plugin_id: z.string().min(1).max(255),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulDifyToolCredentialRef
|
||||
*
|
||||
* Reference to a stored Dify Plugin Tool credential.
|
||||
*
|
||||
* Secret values are resolved only at runtime. The legacy ``credential_id``
|
||||
* field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
|
||||
* old Agent tool payloads can be read while new payloads stay explicit.
|
||||
*/
|
||||
export const zAgentSoulDifyToolCredentialRef = z.object({
|
||||
id: z.string().max(255).nullish(),
|
||||
provider: z.string().max(255).nullish(),
|
||||
type: z.enum(['provider', 'tool']).optional().default('tool'),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulDifyToolConfig
|
||||
*
|
||||
* One Dify Plugin Tool configured on Agent Soul.
|
||||
*
|
||||
* The API backend prepares this persisted product shape into
|
||||
* ``DifyPluginToolConfig`` before sending a run request to Agent backend.
|
||||
* ``provider_id`` keeps compatibility with existing Agent tool config payloads;
|
||||
* new callers should send ``plugin_id`` + ``provider`` when available.
|
||||
*/
|
||||
export const zAgentSoulDifyToolConfig = z.object({
|
||||
credential_ref: zAgentSoulDifyToolCredentialRef.optional(),
|
||||
credential_type: z.enum(['api-key', 'oauth2', 'unauthorized']).optional().default('api-key'),
|
||||
description: z.string().nullish(),
|
||||
enabled: z.boolean().optional().default(true),
|
||||
name: z.string().max(255).nullish(),
|
||||
plugin_id: z.string().max(255).nullish(),
|
||||
provider: z.string().max(255).nullish(),
|
||||
provider_id: z.string().max(255).nullish(),
|
||||
provider_type: z.string().optional().default('plugin'),
|
||||
runtime_parameters: z.record(z.string(), z.unknown()).optional(),
|
||||
tool_name: z.string().min(1).max(255),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulToolsConfig
|
||||
*/
|
||||
export const zAgentSoulToolsConfig = z.object({
|
||||
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
dify_tools: z.array(zAgentSoulDifyToolConfig).optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulConfig
|
||||
*/
|
||||
|
||||
@ -167,6 +167,14 @@ import {
|
||||
zGetAppsByAppIdWorkflowsDraftNodesByNodeIdVariablesResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftPath,
|
||||
zGetAppsByAppIdWorkflowsDraftResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath,
|
||||
zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftSystemVariablesPath,
|
||||
zGetAppsByAppIdWorkflowsDraftSystemVariablesResponse,
|
||||
zGetAppsByAppIdWorkflowsDraftVariablesByVariableIdPath,
|
||||
@ -175,6 +183,14 @@ import {
|
||||
zGetAppsByAppIdWorkflowsDraftVariablesQuery,
|
||||
zGetAppsByAppIdWorkflowsDraftVariablesResponse,
|
||||
zGetAppsByAppIdWorkflowsPath,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath,
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse,
|
||||
zGetAppsByAppIdWorkflowsPublishPath,
|
||||
zGetAppsByAppIdWorkflowsPublishResponse,
|
||||
zGetAppsByAppIdWorkflowsQuery,
|
||||
@ -3787,13 +3803,132 @@ export const run10 = {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get system variables for workflow
|
||||
* Server-Sent Events stream of inspector deltas for a draft workflow run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get59 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'Server-Sent Events stream of inspector deltas for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEvents',
|
||||
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath }))
|
||||
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse)
|
||||
|
||||
export const events = {
|
||||
get: get59,
|
||||
}
|
||||
|
||||
/**
|
||||
* Full value for one declared output, including signed download URL for files.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get60 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'Full value for one declared output, including signed download URL for files.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreview',
|
||||
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(
|
||||
z.object({
|
||||
params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
|
||||
}),
|
||||
)
|
||||
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse)
|
||||
|
||||
export const preview3 = {
|
||||
get: get60,
|
||||
}
|
||||
|
||||
export const byOutputName = {
|
||||
preview: preview3,
|
||||
}
|
||||
|
||||
/**
|
||||
* One node's declared outputs for a draft workflow run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get61 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'One node\'s declared outputs for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeId',
|
||||
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath }))
|
||||
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse)
|
||||
|
||||
export const byNodeId8 = {
|
||||
get: get61,
|
||||
byOutputName,
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot of every node's declared outputs for a draft workflow run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get62 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'Snapshot of every node\'s declared outputs for a draft workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputs',
|
||||
path: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath }))
|
||||
.output(zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse)
|
||||
|
||||
export const nodeOutputs = {
|
||||
get: get62,
|
||||
events,
|
||||
byNodeId: byNodeId8,
|
||||
}
|
||||
|
||||
export const byRunId2 = {
|
||||
nodeOutputs,
|
||||
}
|
||||
|
||||
export const runs = {
|
||||
byRunId: byRunId2,
|
||||
}
|
||||
|
||||
/**
|
||||
* Get system variables for workflow
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get63 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -3808,7 +3943,7 @@ export const get59 = oc
|
||||
.output(zGetAppsByAppIdWorkflowsDraftSystemVariablesResponse)
|
||||
|
||||
export const systemVariables = {
|
||||
get: get59,
|
||||
get: get63,
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3930,7 +4065,7 @@ export const delete9 = oc
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get60 = oc
|
||||
export const get64 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -3972,7 +4107,7 @@ export const patch2 = oc
|
||||
|
||||
export const byVariableId = {
|
||||
delete: delete9,
|
||||
get: get60,
|
||||
get: get64,
|
||||
patch: patch2,
|
||||
reset,
|
||||
}
|
||||
@ -4002,7 +4137,7 @@ export const delete10 = oc
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get61 = oc
|
||||
export const get65 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4024,7 +4159,7 @@ export const get61 = oc
|
||||
|
||||
export const variables2 = {
|
||||
delete: delete10,
|
||||
get: get61,
|
||||
get: get65,
|
||||
byVariableId,
|
||||
}
|
||||
|
||||
@ -4037,7 +4172,7 @@ export const variables2 = {
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get62 = oc
|
||||
export const get66 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4082,7 +4217,7 @@ export const post55 = oc
|
||||
.output(zPostAppsByAppIdWorkflowsDraftResponse)
|
||||
|
||||
export const draft2 = {
|
||||
get: get62,
|
||||
get: get66,
|
||||
post: post55,
|
||||
conversationVariables: conversationVariables2,
|
||||
environmentVariables,
|
||||
@ -4092,6 +4227,7 @@ export const draft2 = {
|
||||
loop: loop2,
|
||||
nodes: nodes7,
|
||||
run: run10,
|
||||
runs,
|
||||
systemVariables,
|
||||
trigger: trigger2,
|
||||
variables: variables2,
|
||||
@ -4106,7 +4242,7 @@ export const draft2 = {
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get63 = oc
|
||||
export const get67 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4149,10 +4285,137 @@ export const post56 = oc
|
||||
.output(zPostAppsByAppIdWorkflowsPublishResponse)
|
||||
|
||||
export const publish = {
|
||||
get: get63,
|
||||
get: get67,
|
||||
post: post56,
|
||||
}
|
||||
|
||||
/**
|
||||
* Server-Sent Events stream of inspector deltas for a published workflow run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get68 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'Server-Sent Events stream of inspector deltas for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEvents',
|
||||
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath }))
|
||||
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse)
|
||||
|
||||
export const events2 = {
|
||||
get: get68,
|
||||
}
|
||||
|
||||
/**
|
||||
* Full value for one declared output of a published run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get69 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'Full value for one declared output of a published run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId:
|
||||
'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreview',
|
||||
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(
|
||||
z.object({
|
||||
params:
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath,
|
||||
}),
|
||||
)
|
||||
.output(
|
||||
zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse,
|
||||
)
|
||||
|
||||
export const preview4 = {
|
||||
get: get69,
|
||||
}
|
||||
|
||||
export const byOutputName2 = {
|
||||
preview: preview4,
|
||||
}
|
||||
|
||||
/**
|
||||
* One node's declared outputs for a published workflow run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get70 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'One node\'s declared outputs for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeId',
|
||||
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath }))
|
||||
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse)
|
||||
|
||||
export const byNodeId9 = {
|
||||
get: get70,
|
||||
byOutputName: byOutputName2,
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot of every node's declared outputs for a published workflow run.
|
||||
*
|
||||
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get71 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
'Snapshot of every node\'s declared outputs for a published workflow run.\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputs',
|
||||
path: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath }))
|
||||
.output(zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse)
|
||||
|
||||
export const nodeOutputs2 = {
|
||||
get: get71,
|
||||
events: events2,
|
||||
byNodeId: byNodeId9,
|
||||
}
|
||||
|
||||
export const byRunId3 = {
|
||||
nodeOutputs: nodeOutputs2,
|
||||
}
|
||||
|
||||
export const runs2 = {
|
||||
byRunId: byRunId3,
|
||||
}
|
||||
|
||||
export const published = {
|
||||
runs: runs2,
|
||||
}
|
||||
|
||||
/**
|
||||
* Get webhook trigger for a node
|
||||
*
|
||||
@ -4160,7 +4423,7 @@ export const publish = {
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get64 = oc
|
||||
export const get72 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4181,7 +4444,7 @@ export const get64 = oc
|
||||
.output(zGetAppsByAppIdWorkflowsTriggersWebhookResponse)
|
||||
|
||||
export const webhook = {
|
||||
get: get64,
|
||||
get: get72,
|
||||
}
|
||||
|
||||
export const triggers2 = {
|
||||
@ -4279,7 +4542,7 @@ export const byWorkflowId = {
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get65 = oc
|
||||
export const get73 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4300,10 +4563,11 @@ export const get65 = oc
|
||||
.output(zGetAppsByAppIdWorkflowsResponse)
|
||||
|
||||
export const workflows3 = {
|
||||
get: get65,
|
||||
get: get73,
|
||||
defaultWorkflowBlockConfigs,
|
||||
draft: draft2,
|
||||
publish,
|
||||
published,
|
||||
triggers: triggers2,
|
||||
byWorkflowId,
|
||||
}
|
||||
@ -4336,7 +4600,7 @@ export const delete12 = oc
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get66 = oc
|
||||
export const get74 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4377,7 +4641,7 @@ export const put7 = oc
|
||||
|
||||
export const byAppId2 = {
|
||||
delete: delete12,
|
||||
get: get66,
|
||||
get: get74,
|
||||
put: put7,
|
||||
advancedChat,
|
||||
agentComposer,
|
||||
@ -4446,7 +4710,7 @@ export const byApiKeyId = {
|
||||
*
|
||||
* Get all API keys for an app
|
||||
*/
|
||||
export const get67 = oc
|
||||
export const get75 = oc
|
||||
.route({
|
||||
description: 'Get all API keys for an app',
|
||||
inputStructure: 'detailed',
|
||||
@ -4479,7 +4743,7 @@ export const post58 = oc
|
||||
.output(zPostAppsByResourceIdApiKeysResponse)
|
||||
|
||||
export const apiKeys = {
|
||||
get: get67,
|
||||
get: get75,
|
||||
post: post58,
|
||||
byApiKeyId,
|
||||
}
|
||||
@ -4495,7 +4759,7 @@ export const byResourceId = {
|
||||
*
|
||||
* @deprecated
|
||||
*/
|
||||
export const get68 = oc
|
||||
export const get76 = oc
|
||||
.route({
|
||||
deprecated: true,
|
||||
description:
|
||||
@ -4510,7 +4774,7 @@ export const get68 = oc
|
||||
.output(zGetAppsByServerIdServerRefreshResponse)
|
||||
|
||||
export const refresh = {
|
||||
get: get68,
|
||||
get: get76,
|
||||
}
|
||||
|
||||
export const server2 = {
|
||||
@ -4526,7 +4790,7 @@ export const byServerId = {
|
||||
*
|
||||
* Get list of applications with pagination and filtering
|
||||
*/
|
||||
export const get69 = oc
|
||||
export const get77 = oc
|
||||
.route({
|
||||
description: 'Get list of applications with pagination and filtering',
|
||||
inputStructure: 'detailed',
|
||||
@ -4565,7 +4829,7 @@ export const post59 = oc
|
||||
.output(zPostAppsResponse)
|
||||
|
||||
export const apps = {
|
||||
get: get69,
|
||||
get: get77,
|
||||
post: post59,
|
||||
imports,
|
||||
workflows,
|
||||
|
||||
@ -1430,9 +1430,7 @@ export type AgentSoulToolsConfig = {
|
||||
cli_tools?: Array<{
|
||||
[key: string]: unknown
|
||||
}>
|
||||
dify_tools?: Array<{
|
||||
[key: string]: unknown
|
||||
}>
|
||||
dify_tools?: Array<AgentSoulDifyToolConfig>
|
||||
}
|
||||
|
||||
export type DeclaredOutputConfig = {
|
||||
@ -1525,6 +1523,22 @@ export type AgentSoulModelCredentialRef = {
|
||||
type: string
|
||||
}
|
||||
|
||||
export type AgentSoulDifyToolConfig = {
|
||||
credential_ref?: AgentSoulDifyToolCredentialRef
|
||||
credential_type?: 'api-key' | 'oauth2' | 'unauthorized'
|
||||
description?: string | null
|
||||
enabled?: boolean
|
||||
name?: string | null
|
||||
plugin_id?: string | null
|
||||
provider?: string | null
|
||||
provider_id?: string | null
|
||||
provider_type?: string
|
||||
runtime_parameters?: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
tool_name: string
|
||||
}
|
||||
|
||||
export type DeclaredArrayItem = {
|
||||
description?: string | null
|
||||
type: DeclaredOutputType
|
||||
@ -1562,6 +1576,12 @@ export type UserActionConfig = {
|
||||
|
||||
export type FormInputConfig = unknown
|
||||
|
||||
export type AgentSoulDifyToolCredentialRef = {
|
||||
id?: string | null
|
||||
provider?: string | null
|
||||
type?: 'provider' | 'tool'
|
||||
}
|
||||
|
||||
export type OutputErrorStrategy = 'default_value' | 'fail_branch' | 'stop'
|
||||
|
||||
export type DeclaredOutputRetryConfig = {
|
||||
@ -4750,6 +4770,122 @@ export type PostAppsByAppIdWorkflowsDraftRunResponses = {
|
||||
export type PostAppsByAppIdWorkflowsDraftRunResponse
|
||||
= PostAppsByAppIdWorkflowsDraftRunResponses[keyof PostAppsByAppIdWorkflowsDraftRunResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsError
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses = {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/events'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsError
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses = {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
node_id: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdError
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses = {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
node_id: string
|
||||
output_name: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/draft/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewError
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses
|
||||
= {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
|
||||
= GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses[keyof GetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsDraftSystemVariablesData = {
|
||||
body?: never
|
||||
path: {
|
||||
@ -5006,6 +5142,124 @@ export type PostAppsByAppIdWorkflowsPublishResponses = {
|
||||
export type PostAppsByAppIdWorkflowsPublishResponse
|
||||
= PostAppsByAppIdWorkflowsPublishResponses[keyof PostAppsByAppIdWorkflowsPublishResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsError
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses = {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/events'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsError
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses = {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdData = {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
node_id: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors = {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdError
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses = {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewData
|
||||
= {
|
||||
body?: never
|
||||
path: {
|
||||
app_id: string
|
||||
node_id: string
|
||||
output_name: string
|
||||
run_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/apps/{app_id}/workflows/published/runs/{run_id}/node-outputs/{node_id}/{output_name}/preview'
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors
|
||||
= {
|
||||
404: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewError
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewErrors]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses
|
||||
= {
|
||||
200: {
|
||||
[key: string]: unknown
|
||||
}
|
||||
}
|
||||
|
||||
export type GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
|
||||
= GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses[keyof GetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponses]
|
||||
|
||||
export type GetAppsByAppIdWorkflowsTriggersWebhookData = {
|
||||
body?: never
|
||||
path: {
|
||||
|
||||
@ -1533,14 +1533,6 @@ export const zAgentSoulSkillsFilesConfig = z.object({
|
||||
skills: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulToolsConfig
|
||||
*/
|
||||
export const zAgentSoulToolsConfig = z.object({
|
||||
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
dify_tools: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* WorkflowNodeJobMode
|
||||
*/
|
||||
@ -1771,25 +1763,6 @@ export const zAgentSoulModelConfig = z.object({
|
||||
plugin_id: z.string().min(1).max(255),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulConfig
|
||||
*/
|
||||
export const zAgentSoulConfig = z.object({
|
||||
app_features: z.record(z.string(), z.unknown()).optional(),
|
||||
app_variables: z.array(zAppVariableConfig).optional(),
|
||||
env: zAgentSoulEnvConfig.optional(),
|
||||
human: zAgentSoulHumanConfig.optional(),
|
||||
knowledge: zAgentSoulKnowledgeConfig.optional(),
|
||||
memory: zAgentSoulMemoryConfig.optional(),
|
||||
misc_legacy: z.record(z.string(), z.unknown()).optional(),
|
||||
model: zAgentSoulModelConfig.optional(),
|
||||
prompt: zAgentSoulPromptConfig.optional(),
|
||||
sandbox: zAgentSoulSandboxConfig.optional(),
|
||||
schema_version: z.int().optional().default(1),
|
||||
skills_files: zAgentSoulSkillsFilesConfig.optional(),
|
||||
tools: zAgentSoulToolsConfig.optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* DeclaredOutputCheckConfig
|
||||
*
|
||||
@ -1842,6 +1815,72 @@ export const zDeclaredArrayItem = z.object({
|
||||
|
||||
export const zFormInputConfig = z.unknown()
|
||||
|
||||
/**
|
||||
* AgentSoulDifyToolCredentialRef
|
||||
*
|
||||
* Reference to a stored Dify Plugin Tool credential.
|
||||
*
|
||||
* Secret values are resolved only at runtime. The legacy ``credential_id``
|
||||
* field is accepted by :class:`AgentSoulDifyToolConfig` and normalized here so
|
||||
* old Agent tool payloads can be read while new payloads stay explicit.
|
||||
*/
|
||||
export const zAgentSoulDifyToolCredentialRef = z.object({
|
||||
id: z.string().max(255).nullish(),
|
||||
provider: z.string().max(255).nullish(),
|
||||
type: z.enum(['provider', 'tool']).optional().default('tool'),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulDifyToolConfig
|
||||
*
|
||||
* One Dify Plugin Tool configured on Agent Soul.
|
||||
*
|
||||
* The API backend prepares this persisted product shape into
|
||||
* ``DifyPluginToolConfig`` before sending a run request to Agent backend.
|
||||
* ``provider_id`` keeps compatibility with existing Agent tool config payloads;
|
||||
* new callers should send ``plugin_id`` + ``provider`` when available.
|
||||
*/
|
||||
export const zAgentSoulDifyToolConfig = z.object({
|
||||
credential_ref: zAgentSoulDifyToolCredentialRef.optional(),
|
||||
credential_type: z.enum(['api-key', 'oauth2', 'unauthorized']).optional().default('api-key'),
|
||||
description: z.string().nullish(),
|
||||
enabled: z.boolean().optional().default(true),
|
||||
name: z.string().max(255).nullish(),
|
||||
plugin_id: z.string().max(255).nullish(),
|
||||
provider: z.string().max(255).nullish(),
|
||||
provider_id: z.string().max(255).nullish(),
|
||||
provider_type: z.string().optional().default('plugin'),
|
||||
runtime_parameters: z.record(z.string(), z.unknown()).optional(),
|
||||
tool_name: z.string().min(1).max(255),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulToolsConfig
|
||||
*/
|
||||
export const zAgentSoulToolsConfig = z.object({
|
||||
cli_tools: z.array(z.record(z.string(), z.unknown())).optional(),
|
||||
dify_tools: z.array(zAgentSoulDifyToolConfig).optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentSoulConfig
|
||||
*/
|
||||
export const zAgentSoulConfig = z.object({
|
||||
app_features: z.record(z.string(), z.unknown()).optional(),
|
||||
app_variables: z.array(zAppVariableConfig).optional(),
|
||||
env: zAgentSoulEnvConfig.optional(),
|
||||
human: zAgentSoulHumanConfig.optional(),
|
||||
knowledge: zAgentSoulKnowledgeConfig.optional(),
|
||||
memory: zAgentSoulMemoryConfig.optional(),
|
||||
misc_legacy: z.record(z.string(), z.unknown()).optional(),
|
||||
model: zAgentSoulModelConfig.optional(),
|
||||
prompt: zAgentSoulPromptConfig.optional(),
|
||||
sandbox: zAgentSoulSandboxConfig.optional(),
|
||||
schema_version: z.int().optional().default(1),
|
||||
skills_files: zAgentSoulSkillsFilesConfig.optional(),
|
||||
tools: zAgentSoulToolsConfig.optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* OutputErrorStrategy
|
||||
*
|
||||
@ -3834,6 +3873,60 @@ export const zPostAppsByAppIdWorkflowsDraftRunPath = z.object({
|
||||
*/
|
||||
export const zPostAppsByAppIdWorkflowsDraftRunResponse = z.record(z.string(), z.unknown())
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsPath = z.object({
|
||||
app_id: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsResponse = z.record(
|
||||
z.string(),
|
||||
z.unknown(),
|
||||
)
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsPath = z.object({
|
||||
app_id: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsEventsResponse = z.record(
|
||||
z.string(),
|
||||
z.unknown(),
|
||||
)
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdPath = z.object({
|
||||
app_id: z.string(),
|
||||
node_id: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdResponse = z.record(
|
||||
z.string(),
|
||||
z.unknown(),
|
||||
)
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath
|
||||
= z.object({
|
||||
app_id: z.string(),
|
||||
node_id: z.string(),
|
||||
output_name: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsDraftRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
|
||||
= z.record(z.string(), z.unknown())
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsDraftSystemVariablesPath = z.object({
|
||||
app_id: z.string(),
|
||||
})
|
||||
@ -3954,6 +4047,60 @@ export const zPostAppsByAppIdWorkflowsPublishPath = z.object({
|
||||
*/
|
||||
export const zPostAppsByAppIdWorkflowsPublishResponse = z.record(z.string(), z.unknown())
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsPath = z.object({
|
||||
app_id: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsResponse = z.record(
|
||||
z.string(),
|
||||
z.unknown(),
|
||||
)
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsPath = z.object({
|
||||
app_id: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsEventsResponse = z.record(
|
||||
z.string(),
|
||||
z.unknown(),
|
||||
)
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdPath = z.object({
|
||||
app_id: z.string(),
|
||||
node_id: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdResponse = z.record(
|
||||
z.string(),
|
||||
z.unknown(),
|
||||
)
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewPath
|
||||
= z.object({
|
||||
app_id: z.string(),
|
||||
node_id: z.string(),
|
||||
output_name: z.string(),
|
||||
run_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Success
|
||||
*/
|
||||
export const zGetAppsByAppIdWorkflowsPublishedRunsByRunIdNodeOutputsByNodeIdByOutputNamePreviewResponse
|
||||
= z.record(z.string(), z.unknown())
|
||||
|
||||
export const zGetAppsByAppIdWorkflowsTriggersWebhookPath = z.object({
|
||||
app_id: z.string(),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user