From 689e8353676b329bfd19e3c7ba2b53dfd0c84254 Mon Sep 17 00:00:00 2001 From: Yansong Zhang <916125788@qq.com> Date: Tue, 19 May 2026 11:18:58 +0800 Subject: [PATCH] feat: add agent roster composer APIs --- api/controllers/console/__init__.py | 4 + api/controllers/console/agent/__init__.py | 3 + api/controllers/console/agent/composer.py | 153 ++++ api/controllers/console/agent/roster.py | 124 +++ ...c421_add_agent_config_version_revisions.py | 75 ++ api/models/__init__.py | 4 + api/models/agent.py | 63 ++ api/services/agent/__init__.py | 4 + api/services/agent/composer_service.py | 773 ++++++++++++++++++ api/services/agent/composer_validator.py | 71 ++ api/services/agent/errors.py | 29 + api/services/agent/roster_service.py | 333 ++++++++ api/services/entities/agent_entities.py | 222 +++++ api/tests/unit_tests/models/test_agent.py | 32 + .../agent/test_agent_composer_entities.py | 144 ++++ 15 files changed, 2034 insertions(+) create mode 100644 api/controllers/console/agent/__init__.py create mode 100644 api/controllers/console/agent/composer.py create mode 100644 api/controllers/console/agent/roster.py create mode 100644 api/migrations/versions/2026_05_19_1000-f8b6b7e9c421_add_agent_config_version_revisions.py create mode 100644 api/services/agent/__init__.py create mode 100644 api/services/agent/composer_service.py create mode 100644 api/services/agent/composer_validator.py create mode 100644 api/services/agent/errors.py create mode 100644 api/services/agent/roster_service.py create mode 100644 api/services/entities/agent_entities.py create mode 100644 api/tests/unit_tests/services/agent/test_agent_composer_entities.py diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index f5aeb17ba2..102d7c1a88 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -44,6 +44,8 @@ from . import ( spec, version, ) +from .agent import composer as agent_composer +from .agent import roster as agent_roster # Import app controllers from .app import ( @@ -143,7 +145,9 @@ __all__ = [ "activate", "advanced_prompt_template", "agent", + "agent_composer", "agent_providers", + "agent_roster", "annotation", "api", "apikey", diff --git a/api/controllers/console/agent/__init__.py b/api/controllers/console/agent/__init__.py new file mode 100644 index 0000000000..88b955dcba --- /dev/null +++ b/api/controllers/console/agent/__init__.py @@ -0,0 +1,3 @@ +from . import composer, roster + +__all__ = ["composer", "roster"] diff --git a/api/controllers/console/agent/composer.py b/api/controllers/console/agent/composer.py new file mode 100644 index 0000000000..2fb262ed93 --- /dev/null +++ b/api/controllers/console/agent/composer.py @@ -0,0 +1,153 @@ +from flask_restx import Resource + +from controllers.common.schema import register_schema_models +from controllers.console import console_ns +from controllers.console.app.wraps import get_app_model +from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required +from libs.login import current_account_with_tenant, login_required +from models.model import AppMode +from services.agent.composer_service import AgentComposerService +from services.agent.composer_validator import ComposerConfigValidator +from services.entities.agent_entities import ComposerSavePayload + +register_schema_models(console_ns, ComposerSavePayload) + + +@console_ns.route("/apps//workflows/draft/nodes//agent-composer") +class WorkflowAgentComposerApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) + def get(self, app_model, node_id: str): + _, tenant_id = current_account_with_tenant() + return AgentComposerService.load_workflow_composer( + tenant_id=tenant_id, + app_id=app_model.id, + node_id=node_id, + ) + + @console_ns.expect(console_ns.models[ComposerSavePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) + def put(self, app_model, node_id: str): + account, tenant_id = current_account_with_tenant() + payload = ComposerSavePayload.model_validate(console_ns.payload or {}) + return AgentComposerService.save_workflow_composer( + tenant_id=tenant_id, + app_id=app_model.id, + node_id=node_id, + account_id=account.id, + payload=payload, + ) + + +@console_ns.route("/apps//workflows/draft/nodes//agent-composer/validate") +class WorkflowAgentComposerValidateApi(Resource): + @console_ns.expect(console_ns.models[ComposerSavePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) + def post(self, app_model, node_id: str): + payload = ComposerSavePayload.model_validate(console_ns.payload or {}) + ComposerConfigValidator.validate_save_payload(payload) + return {"result": "success", "errors": []} + + +@console_ns.route("/apps//workflows/draft/nodes//agent-composer/candidates") +class WorkflowAgentComposerCandidatesApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) + def get(self, app_model, node_id: str): + return AgentComposerService.get_workflow_candidates(app_id=app_model.id) + + +@console_ns.route("/apps//workflows/draft/nodes//agent-composer/impact") +class WorkflowAgentComposerImpactApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) + def post(self, app_model, node_id: str): + _, tenant_id = current_account_with_tenant() + payload = ComposerSavePayload.model_validate(console_ns.payload or {}) + version_id = payload.binding.agent_config_version_id if payload.binding else None + if not version_id: + return {"agent_config_version_id": None, "workflow_node_count": 0, "bindings": []} + return AgentComposerService.calculate_impact(tenant_id=tenant_id, agent_config_version_id=version_id) + + +@console_ns.route("/apps//workflows/draft/nodes//agent-composer/save-to-roster") +class WorkflowAgentComposerSaveToRosterApi(Resource): + @console_ns.expect(console_ns.models[ComposerSavePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) + def post(self, app_model, node_id: str): + account, tenant_id = current_account_with_tenant() + payload = ComposerSavePayload.model_validate(console_ns.payload or {}) + return AgentComposerService.save_workflow_composer( + tenant_id=tenant_id, + app_id=app_model.id, + node_id=node_id, + account_id=account.id, + payload=payload, + ) + + +@console_ns.route("/apps//agent-composer") +class AgentAppComposerApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model() + def get(self, app_model): + _, tenant_id = current_account_with_tenant() + return AgentComposerService.load_agent_app_composer(tenant_id=tenant_id, app_id=app_model.id) + + @console_ns.expect(console_ns.models[ComposerSavePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + @get_app_model() + def put(self, app_model): + account, tenant_id = current_account_with_tenant() + payload = ComposerSavePayload.model_validate(console_ns.payload or {}) + return AgentComposerService.save_agent_app_composer( + tenant_id=tenant_id, + app_id=app_model.id, + account_id=account.id, + payload=payload, + ) + + +@console_ns.route("/apps//agent-composer/validate") +class AgentAppComposerValidateApi(Resource): + @console_ns.expect(console_ns.models[ComposerSavePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @get_app_model() + def post(self, app_model): + payload = ComposerSavePayload.model_validate(console_ns.payload or {}) + ComposerConfigValidator.validate_save_payload(payload) + return {"result": "success", "errors": []} + + +@console_ns.route("/apps//agent-composer/candidates") +class AgentAppComposerCandidatesApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model() + def get(self, app_model): + return AgentComposerService.get_agent_app_candidates(app_id=app_model.id) diff --git a/api/controllers/console/agent/roster.py b/api/controllers/console/agent/roster.py new file mode 100644 index 0000000000..e1d2ecee27 --- /dev/null +++ b/api/controllers/console/agent/roster.py @@ -0,0 +1,124 @@ +from flask import request +from flask_restx import Resource +from pydantic import BaseModel, Field + +from controllers.common.schema import register_schema_models +from controllers.console import console_ns +from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required +from libs.login import current_account_with_tenant, login_required +from services.agent.roster_service import AgentRosterService +from services.entities.agent_entities import RosterAgentCreatePayload, RosterAgentUpdatePayload, RosterListQuery + + +class AgentInviteOptionsQuery(RosterListQuery): + app_id: str | None = Field(default=None, description="Workflow app id for in-current-workflow markers") + + +class AgentIdPath(BaseModel): + agent_id: str + + +register_schema_models( + console_ns, + AgentInviteOptionsQuery, + AgentIdPath, + RosterAgentCreatePayload, + RosterAgentUpdatePayload, + RosterListQuery, +) + + +@console_ns.route("/agents") +class AgentRosterListApi(Resource): + @setup_required + @login_required + @account_initialization_required + def get(self): + _, tenant_id = current_account_with_tenant() + query = RosterListQuery.model_validate(request.args.to_dict(flat=True)) + return AgentRosterService.list_roster_agents( + tenant_id=tenant_id, page=query.page, limit=query.limit, keyword=query.keyword + ) + + @console_ns.expect(console_ns.models[RosterAgentCreatePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + def post(self): + account, tenant_id = current_account_with_tenant() + payload = RosterAgentCreatePayload.model_validate(console_ns.payload or {}) + agent = AgentRosterService.create_roster_agent(tenant_id=tenant_id, account_id=account.id, payload=payload) + return AgentRosterService.get_roster_agent_detail(tenant_id=tenant_id, agent_id=agent.id), 201 + + +@console_ns.route("/agents/invite-options") +class AgentInviteOptionsApi(Resource): + @setup_required + @login_required + @account_initialization_required + def get(self): + _, tenant_id = current_account_with_tenant() + query = AgentInviteOptionsQuery.model_validate(request.args.to_dict(flat=True)) + return AgentRosterService.list_invite_options( + tenant_id=tenant_id, + page=query.page, + limit=query.limit, + keyword=query.keyword, + app_id=query.app_id, + ) + + +@console_ns.route("/agents/") +class AgentRosterDetailApi(Resource): + @setup_required + @login_required + @account_initialization_required + def get(self, agent_id): + _, tenant_id = current_account_with_tenant() + return AgentRosterService.get_roster_agent_detail(tenant_id=tenant_id, agent_id=str(agent_id)) + + @console_ns.expect(console_ns.models[RosterAgentUpdatePayload.__name__]) + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + def patch(self, agent_id): + account, tenant_id = current_account_with_tenant() + payload = RosterAgentUpdatePayload.model_validate(console_ns.payload or {}) + return AgentRosterService.update_roster_agent( + tenant_id=tenant_id, agent_id=str(agent_id), account_id=account.id, payload=payload + ) + + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + def delete(self, agent_id): + account, tenant_id = current_account_with_tenant() + AgentRosterService.archive_roster_agent(tenant_id=tenant_id, agent_id=str(agent_id), account_id=account.id) + return "", 204 + + +@console_ns.route("/agents//versions") +class AgentRosterVersionsApi(Resource): + @setup_required + @login_required + @account_initialization_required + def get(self, agent_id): + _, tenant_id = current_account_with_tenant() + return {"data": AgentRosterService.list_agent_versions(tenant_id=tenant_id, agent_id=str(agent_id))} + + +@console_ns.route("/agents//versions/") +class AgentRosterVersionDetailApi(Resource): + @setup_required + @login_required + @account_initialization_required + def get(self, agent_id, version_id): + _, tenant_id = current_account_with_tenant() + return AgentRosterService.get_agent_version_detail( + tenant_id=tenant_id, + agent_id=str(agent_id), + version_id=str(version_id), + ) diff --git a/api/migrations/versions/2026_05_19_1000-f8b6b7e9c421_add_agent_config_version_revisions.py b/api/migrations/versions/2026_05_19_1000-f8b6b7e9c421_add_agent_config_version_revisions.py new file mode 100644 index 0000000000..09f87e0b28 --- /dev/null +++ b/api/migrations/versions/2026_05_19_1000-f8b6b7e9c421_add_agent_config_version_revisions.py @@ -0,0 +1,75 @@ +"""add agent config version revisions + +Revision ID: f8b6b7e9c421 +Revises: c6a9f4b12d3e +Create Date: 2026-05-19 10:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +import models + +# revision identifiers, used by Alembic. +revision = "f8b6b7e9c421" +down_revision = "c6a9f4b12d3e" +branch_labels = None +depends_on = None + + +def _is_pg(conn) -> bool: + return conn.dialect.name == "postgresql" + + +def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column: + kwargs = {"nullable": nullable, "primary_key": primary_key} + if primary_key and _is_pg(op.get_bind()): + kwargs["server_default"] = sa.text("uuidv7()") + return sa.Column(name, models.types.StringUUID(), **kwargs) + + +def upgrade(): + op.create_table( + "agent_config_version_revisions", + _uuid_column("id", primary_key=True), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("agent_id", models.types.StringUUID(), nullable=False), + sa.Column("agent_config_version_id", models.types.StringUUID(), nullable=False), + sa.Column("revision", sa.Integer(), nullable=False), + sa.Column("operation", sa.String(length=64), nullable=False), + sa.Column("config_snapshot", models.types.LongText(), nullable=False), + sa.Column("previous_config_snapshot", models.types.LongText(), nullable=True), + sa.Column("summary", models.types.LongText(), nullable=True), + sa.Column("version_note", models.types.LongText(), nullable=True), + sa.Column("created_by", models.types.StringUUID(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("agent_config_version_revision_pkey")), + sa.UniqueConstraint( + "agent_config_version_id", + "revision", + name=op.f("agent_config_version_revision_version_revision_unique"), + ), + ) + op.create_index( + "agent_config_version_revision_tenant_agent_created_at_idx", + "agent_config_version_revisions", + ["tenant_id", "agent_id", "created_at"], + ) + op.create_index( + "agent_config_version_revision_tenant_version_created_at_idx", + "agent_config_version_revisions", + ["tenant_id", "agent_config_version_id", "created_at"], + ) + + +def downgrade(): + op.drop_index( + "agent_config_version_revision_tenant_version_created_at_idx", + table_name="agent_config_version_revisions", + ) + op.drop_index( + "agent_config_version_revision_tenant_agent_created_at_idx", + table_name="agent_config_version_revisions", + ) + op.drop_table("agent_config_version_revisions") diff --git a/api/models/__init__.py b/api/models/__init__.py index e2cad5b606..a39b200886 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -11,6 +11,8 @@ from .account import ( from .agent import ( Agent, AgentConfigVersion, + AgentConfigVersionOperation, + AgentConfigVersionRevision, AgentKind, AgentScope, AgentSource, @@ -137,6 +139,8 @@ __all__ = [ "AccountTrialAppRecord", "Agent", "AgentConfigVersion", + "AgentConfigVersionOperation", + "AgentConfigVersionRevision", "AgentKind", "AgentScope", "AgentSource", diff --git a/api/models/agent.py b/api/models/agent.py index 7695890f3a..39d9e959d8 100644 --- a/api/models/agent.py +++ b/api/models/agent.py @@ -35,6 +35,14 @@ class AgentStatus(StrEnum): ARCHIVED = "archived" +class AgentConfigVersionOperation(StrEnum): + CREATE_VERSION = "create_version" + SAVE_CURRENT_VERSION = "save_current_version" + SAVE_NEW_VERSION = "save_new_version" + SAVE_NEW_AGENT = "save_new_agent" + SAVE_TO_ROSTER = "save_to_roster" + + class WorkflowAgentBindingType(StrEnum): ROSTER_AGENT = "roster_agent" INLINE_AGENT = "inline_agent" @@ -118,6 +126,61 @@ class AgentConfigVersion(Base): return json.loads(self.config_snapshot) if self.config_snapshot else {} +class AgentConfigVersionRevision(Base): + """Audit snapshot for every Agent Soul save operation. + + ``AgentConfigVersion`` represents a semantic version that workflow bindings + can reference. Revisions record mutable saves against that semantic version, + especially ``Save to Current Version`` where the version id must stay stable. + JSON fields are stored as ``LongText`` and must not use DB server defaults. + """ + + __tablename__ = "agent_config_version_revisions" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="agent_config_version_revision_pkey"), + UniqueConstraint( + "agent_config_version_id", + "revision", + name="agent_config_version_revision_version_revision_unique", + ), + Index("agent_config_version_revision_tenant_agent_created_at_idx", "tenant_id", "agent_id", "created_at"), + Index( + "agent_config_version_revision_tenant_version_created_at_idx", + "tenant_id", + "agent_config_version_id", + "created_at", + ), + ) + + id: Mapped[str] = mapped_column(StringUUID, primary_key=True, default=lambda: str(uuidv7())) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + agent_config_version_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + revision: Mapped[int] = mapped_column(sa.Integer, nullable=False) + operation: Mapped[AgentConfigVersionOperation] = mapped_column( + EnumText(AgentConfigVersionOperation, length=64), nullable=False + ) + config_snapshot: Mapped[str] = mapped_column(LongText, nullable=False) + previous_config_snapshot: Mapped[str | None] = mapped_column(LongText, nullable=True) + summary: Mapped[str | None] = mapped_column(LongText, nullable=True) + version_note: Mapped[str | None] = mapped_column(LongText, nullable=True) + created_by: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + default=naive_utc_now, + server_default=func.current_timestamp(), + ) + + @property + def config_snapshot_dict(self) -> dict[str, Any]: + return json.loads(self.config_snapshot) if self.config_snapshot else {} + + @property + def previous_config_snapshot_dict(self) -> dict[str, Any] | None: + return json.loads(self.previous_config_snapshot) if self.previous_config_snapshot else None + + class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base): """Binding between one workflow node and one Agent config version. diff --git a/api/services/agent/__init__.py b/api/services/agent/__init__.py new file mode 100644 index 0000000000..c3a5ebaec4 --- /dev/null +++ b/api/services/agent/__init__.py @@ -0,0 +1,4 @@ +from .composer_service import AgentComposerService +from .roster_service import AgentRosterService + +__all__ = ["AgentComposerService", "AgentRosterService"] diff --git a/api/services/agent/composer_service.py b/api/services/agent/composer_service.py new file mode 100644 index 0000000000..ac6444ab77 --- /dev/null +++ b/api/services/agent/composer_service.py @@ -0,0 +1,773 @@ +from typing import Any + +from sqlalchemy import func, select +from sqlalchemy.exc import IntegrityError + +from extensions.ext_database import db +from models.agent import ( + Agent, + AgentConfigVersion, + AgentConfigVersionOperation, + AgentConfigVersionRevision, + AgentKind, + AgentScope, + AgentSource, + AgentStatus, + WorkflowAgentBindingType, + WorkflowAgentNodeBinding, +) +from models.workflow import Workflow +from services.agent.composer_validator import ComposerConfigValidator +from services.agent.errors import AgentNameConflictError, AgentNotFoundError, AgentVersionNotFoundError +from services.agent.roster_service import _json_dump +from services.entities.agent_entities import ( + AgentSoulConfig, + ComposerCandidatesResponse, + ComposerSavePayload, + ComposerSaveStrategy, + ComposerVariant, + WorkflowNodeJobConfig, +) + + +class AgentComposerService: + @classmethod + def load_workflow_composer(cls, *, tenant_id: str, app_id: str, node_id: str) -> dict[str, Any]: + workflow = cls._get_draft_workflow(tenant_id=tenant_id, app_id=app_id) + binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow.id, node_id=node_id) + if not binding: + return cls._empty_workflow_state(app_id=app_id, workflow_id=workflow.id, node_id=node_id) + + agent = cls._get_agent_if_present(tenant_id=tenant_id, agent_id=binding.agent_id) + version = cls._get_version_if_present( + tenant_id=tenant_id, + agent_id=agent.id if agent else None, + version_id=binding.agent_config_version_id, + ) + return cls._serialize_workflow_state(binding=binding, agent=agent, version=version) + + @classmethod + def save_workflow_composer( + cls, *, tenant_id: str, app_id: str, node_id: str, account_id: str, payload: ComposerSavePayload + ) -> dict[str, Any]: + if payload.variant != ComposerVariant.WORKFLOW: + raise ValueError("Workflow composer endpoint only accepts workflow variant") + + ComposerConfigValidator.validate_save_payload(payload) + workflow = cls._get_draft_workflow(tenant_id=tenant_id, app_id=app_id) + binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow.id, node_id=node_id) + + match payload.save_strategy: + case ComposerSaveStrategy.NODE_JOB_ONLY: + binding = cls._save_node_job_only( + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow.id, + node_id=node_id, + account_id=account_id, + binding=binding, + payload=payload, + ) + case ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION: + binding = cls._save_to_current_version( + tenant_id=tenant_id, account_id=account_id, binding=binding, payload=payload + ) + case ComposerSaveStrategy.SAVE_AS_NEW_VERSION: + binding = cls._save_as_new_version( + tenant_id=tenant_id, account_id=account_id, binding=binding, payload=payload + ) + case ComposerSaveStrategy.SAVE_AS_NEW_AGENT: + binding = cls._save_as_new_agent( + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow.id, + node_id=node_id, + account_id=account_id, + binding=binding, + payload=payload, + ) + case ComposerSaveStrategy.SAVE_TO_ROSTER: + binding = cls._save_to_roster( + tenant_id=tenant_id, account_id=account_id, binding=binding, payload=payload + ) + + db.session.commit() + agent = cls._get_agent_if_present(tenant_id=tenant_id, agent_id=binding.agent_id) + version = cls._get_version_if_present( + tenant_id=tenant_id, + agent_id=agent.id if agent else None, + version_id=binding.agent_config_version_id, + ) + return cls._serialize_workflow_state(binding=binding, agent=agent, version=version) + + @classmethod + def load_agent_app_composer(cls, *, tenant_id: str, app_id: str) -> dict[str, Any]: + agent = db.session.scalar( + select(Agent) + .where( + Agent.tenant_id == tenant_id, + Agent.app_id == app_id, + Agent.scope == AgentScope.ROSTER, + Agent.status == AgentStatus.ACTIVE, + ) + .order_by(Agent.created_at.desc()) + .limit(1) + ) + if not agent: + raise AgentNotFoundError() + version = cls._require_version( + tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_version_id + ) + return { + "variant": ComposerVariant.AGENT_APP.value, + "agent": cls._serialize_agent(agent), + "active_config_version": cls._serialize_version(version), + "agent_soul": version.config_snapshot_dict, + "save_options": [ + ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION.value, + ComposerSaveStrategy.SAVE_AS_NEW_VERSION.value, + ], + } + + @classmethod + def save_agent_app_composer( + cls, *, tenant_id: str, app_id: str, account_id: str, payload: ComposerSavePayload + ) -> dict[str, Any]: + if payload.variant != ComposerVariant.AGENT_APP: + raise ValueError("Agent App composer endpoint only accepts agent_app variant") + ComposerConfigValidator.validate_save_payload(payload) + if payload.agent_soul is None: + raise ValueError("agent_soul is required") + + agent = db.session.scalar( + select(Agent) + .where( + Agent.tenant_id == tenant_id, + Agent.app_id == app_id, + Agent.scope == AgentScope.ROSTER, + Agent.status == AgentStatus.ACTIVE, + ) + .order_by(Agent.created_at.desc()) + .limit(1) + ) + if not agent: + agent = Agent( + tenant_id=tenant_id, + name=payload.new_agent_name or "Untitled Agent", + description="", + agent_kind=AgentKind.DIFY_AGENT, + scope=AgentScope.ROSTER, + source=AgentSource.AGENT_APP, + app_id=app_id, + status=AgentStatus.ACTIVE, + created_by=account_id, + updated_by=account_id, + ) + db.session.add(agent) + try: + db.session.flush() + except IntegrityError as exc: + db.session.rollback() + raise AgentNameConflictError() from exc + + if payload.save_strategy == ComposerSaveStrategy.SAVE_AS_NEW_VERSION or not agent.active_config_version_id: + version = cls._create_config_version( + tenant_id=tenant_id, + agent_id=agent.id, + account_id=account_id, + agent_soul=payload.agent_soul, + operation=AgentConfigVersionOperation.SAVE_NEW_VERSION, + version_note=payload.version_note, + ) + agent.active_config_version_id = version.id + else: + version = cls._require_version( + tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_version_id + ) + cls._update_current_version( + version=version, + account_id=account_id, + agent_soul=payload.agent_soul, + operation=AgentConfigVersionOperation.SAVE_CURRENT_VERSION, + version_note=payload.version_note, + ) + + db.session.commit() + return cls.load_agent_app_composer(tenant_id=tenant_id, app_id=app_id) + + @classmethod + def get_workflow_candidates(cls, *, app_id: str) -> dict[str, Any]: + response = ComposerCandidatesResponse( + variant=ComposerVariant.WORKFLOW, + allowed_node_job_candidates={ + "previous_node_outputs": [], + "declare_output_types": ["string", "number", "object", "array", "boolean", "file"], + "human_contacts": [], + }, + allowed_soul_candidates={ + "skills_files": [], + "dify_tools": [], + "cli_tools": [], + "knowledge_datasets": [], + "human_contacts": [], + }, + ) + return response.model_dump(mode="json") + + @classmethod + def get_agent_app_candidates(cls, *, app_id: str) -> dict[str, Any]: + response = ComposerCandidatesResponse( + variant=ComposerVariant.AGENT_APP, + allowed_node_job_candidates={}, + allowed_soul_candidates={ + "skills_files": [], + "dify_tools": [], + "cli_tools": [], + "knowledge_datasets": [], + "human_contacts": [], + }, + ) + return response.model_dump(mode="json") + + @classmethod + def calculate_impact(cls, *, tenant_id: str, agent_config_version_id: str) -> dict[str, Any]: + bindings = list( + db.session.scalars( + select(WorkflowAgentNodeBinding).where( + WorkflowAgentNodeBinding.tenant_id == tenant_id, + WorkflowAgentNodeBinding.agent_config_version_id == agent_config_version_id, + ) + ).all() + ) + return { + "agent_config_version_id": agent_config_version_id, + "workflow_node_count": len(bindings), + "bindings": [ + { + "app_id": binding.app_id, + "workflow_id": binding.workflow_id, + "workflow_version": binding.workflow_version, + "node_id": binding.node_id, + } + for binding in bindings + ], + } + + @classmethod + def _save_node_job_only( + cls, + *, + tenant_id: str, + app_id: str, + workflow_id: str, + node_id: str, + account_id: str, + binding: WorkflowAgentNodeBinding | None, + payload: ComposerSavePayload, + ) -> WorkflowAgentNodeBinding: + node_job = payload.node_job or WorkflowNodeJobConfig() + node_job_json = _json_dump(node_job.model_dump(mode="json")) + if binding: + binding.node_job_config = node_job_json + binding.updated_by = account_id + return binding + + agent_soul = payload.agent_soul or AgentSoulConfig() + agent = cls._create_workflow_only_agent( + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + node_id=node_id, + account_id=account_id, + agent_soul=agent_soul, + ) + binding = WorkflowAgentNodeBinding( + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + workflow_version=Workflow.VERSION_DRAFT, + node_id=node_id, + binding_type=WorkflowAgentBindingType.INLINE_AGENT, + agent_id=agent.id, + agent_config_version_id=agent.active_config_version_id, + node_job_config=node_job_json, + created_by=account_id, + updated_by=account_id, + ) + db.session.add(binding) + db.session.flush() + return binding + + @classmethod + def _save_to_current_version( + cls, + *, + tenant_id: str, + account_id: str, + binding: WorkflowAgentNodeBinding | None, + payload: ComposerSavePayload, + ) -> WorkflowAgentNodeBinding: + binding = cls._require_binding(binding) + if payload.agent_soul is None: + raise ValueError("agent_soul is required") + version = cls._require_version( + tenant_id=tenant_id, + agent_id=binding.agent_id, + version_id=binding.agent_config_version_id, + ) + cls._update_current_version( + version=version, + account_id=account_id, + agent_soul=payload.agent_soul, + operation=AgentConfigVersionOperation.SAVE_CURRENT_VERSION, + version_note=payload.version_note, + ) + if payload.node_job is not None: + binding.node_job_config = _json_dump(payload.node_job.model_dump(mode="json")) + binding.updated_by = account_id + return binding + + @classmethod + def _save_as_new_version( + cls, + *, + tenant_id: str, + account_id: str, + binding: WorkflowAgentNodeBinding | None, + payload: ComposerSavePayload, + ) -> WorkflowAgentNodeBinding: + binding = cls._require_binding(binding) + if not binding.agent_id or payload.agent_soul is None: + raise ValueError("agent_id and agent_soul are required") + version = cls._create_config_version( + tenant_id=tenant_id, + agent_id=binding.agent_id, + account_id=account_id, + agent_soul=payload.agent_soul, + operation=AgentConfigVersionOperation.SAVE_NEW_VERSION, + version_note=payload.version_note, + ) + agent = cls._require_agent(tenant_id=tenant_id, agent_id=binding.agent_id) + agent.active_config_version_id = version.id + agent.updated_by = account_id + binding.agent_config_version_id = version.id + binding.updated_by = account_id + if payload.node_job is not None: + binding.node_job_config = _json_dump(payload.node_job.model_dump(mode="json")) + return binding + + @classmethod + def _save_as_new_agent( + cls, + *, + tenant_id: str, + app_id: str, + workflow_id: str, + node_id: str, + account_id: str, + binding: WorkflowAgentNodeBinding | None, + payload: ComposerSavePayload, + ) -> WorkflowAgentNodeBinding: + if payload.agent_soul is None: + raise ValueError("agent_soul is required") + agent_name = payload.new_agent_name or "Untitled Agent" + agent = cls._create_roster_agent_for_composer( + tenant_id=tenant_id, + account_id=account_id, + name=agent_name, + agent_soul=payload.agent_soul, + operation=AgentConfigVersionOperation.SAVE_NEW_AGENT, + version_note=payload.version_note, + ) + node_job = payload.node_job or WorkflowNodeJobConfig() + if not binding: + binding = WorkflowAgentNodeBinding( + tenant_id=tenant_id, + app_id=app_id, + workflow_id=workflow_id, + workflow_version=Workflow.VERSION_DRAFT, + node_id=node_id, + created_by=account_id, + ) + db.session.add(binding) + binding.binding_type = WorkflowAgentBindingType.ROSTER_AGENT + binding.agent_id = agent.id + binding.agent_config_version_id = agent.active_config_version_id + binding.node_job_config = _json_dump(node_job.model_dump(mode="json")) + binding.updated_by = account_id + db.session.flush() + return binding + + @classmethod + def _save_to_roster( + cls, + *, + tenant_id: str, + account_id: str, + binding: WorkflowAgentNodeBinding | None, + payload: ComposerSavePayload, + ) -> WorkflowAgentNodeBinding: + binding = cls._require_binding(binding) + source_agent = cls._require_agent(tenant_id=tenant_id, agent_id=binding.agent_id) + source_version = cls._require_version( + tenant_id=tenant_id, + agent_id=source_agent.id, + version_id=binding.agent_config_version_id, + ) + agent_soul = payload.agent_soul or AgentSoulConfig.model_validate(source_version.config_snapshot_dict) + agent_name = payload.new_agent_name or source_agent.name + roster_agent = cls._create_roster_agent_for_composer( + tenant_id=tenant_id, + account_id=account_id, + name=agent_name, + agent_soul=agent_soul, + operation=AgentConfigVersionOperation.SAVE_TO_ROSTER, + version_note=payload.version_note, + ) + binding.binding_type = WorkflowAgentBindingType.ROSTER_AGENT + binding.agent_id = roster_agent.id + binding.agent_config_version_id = roster_agent.active_config_version_id + binding.updated_by = account_id + if payload.node_job is not None: + binding.node_job_config = _json_dump(payload.node_job.model_dump(mode="json")) + return binding + + @classmethod + def _create_workflow_only_agent( + cls, + *, + tenant_id: str, + app_id: str, + workflow_id: str, + node_id: str, + account_id: str, + agent_soul: AgentSoulConfig, + ) -> Agent: + agent = Agent( + tenant_id=tenant_id, + name=f"Workflow Agent {node_id}", + description="", + agent_kind=AgentKind.DIFY_AGENT, + scope=AgentScope.WORKFLOW_ONLY, + source=AgentSource.WORKFLOW, + app_id=app_id, + workflow_id=workflow_id, + workflow_node_id=node_id, + status=AgentStatus.ACTIVE, + created_by=account_id, + updated_by=account_id, + ) + db.session.add(agent) + db.session.flush() + version = cls._create_config_version( + tenant_id=tenant_id, + agent_id=agent.id, + account_id=account_id, + agent_soul=agent_soul, + operation=AgentConfigVersionOperation.CREATE_VERSION, + version_note=None, + ) + agent.active_config_version_id = version.id + return agent + + @classmethod + def _create_roster_agent_for_composer( + cls, + *, + tenant_id: str, + account_id: str, + name: str, + agent_soul: AgentSoulConfig, + operation: AgentConfigVersionOperation, + version_note: str | None, + ) -> Agent: + agent = Agent( + tenant_id=tenant_id, + name=name, + description="", + agent_kind=AgentKind.DIFY_AGENT, + scope=AgentScope.ROSTER, + source=AgentSource.WORKFLOW, + status=AgentStatus.ACTIVE, + created_by=account_id, + updated_by=account_id, + ) + db.session.add(agent) + try: + db.session.flush() + except IntegrityError as exc: + db.session.rollback() + raise AgentNameConflictError() from exc + version = cls._create_config_version( + tenant_id=tenant_id, + agent_id=agent.id, + account_id=account_id, + agent_soul=agent_soul, + operation=operation, + version_note=version_note, + ) + agent.active_config_version_id = version.id + return agent + + @classmethod + def _create_config_version( + cls, + *, + tenant_id: str, + agent_id: str, + account_id: str, + agent_soul: AgentSoulConfig, + operation: AgentConfigVersionOperation, + version_note: str | None, + ) -> AgentConfigVersion: + next_version = ( + db.session.scalar( + select(func.max(AgentConfigVersion.version)).where( + AgentConfigVersion.tenant_id == tenant_id, + AgentConfigVersion.agent_id == agent_id, + ) + ) + or 0 + ) + 1 + snapshot = _json_dump(agent_soul.model_dump(mode="json")) + version = AgentConfigVersion( + tenant_id=tenant_id, + agent_id=agent_id, + version=next_version, + config_snapshot=snapshot, + version_note=version_note, + created_by=account_id, + ) + db.session.add(version) + db.session.flush() + revision = AgentConfigVersionRevision( + tenant_id=tenant_id, + agent_id=agent_id, + agent_config_version_id=version.id, + revision=1, + operation=operation, + config_snapshot=snapshot, + version_note=version_note, + created_by=account_id, + ) + db.session.add(revision) + db.session.flush() + return version + + @classmethod + def _update_current_version( + cls, + *, + version: AgentConfigVersion, + account_id: str, + agent_soul: AgentSoulConfig, + operation: AgentConfigVersionOperation, + version_note: str | None, + ) -> AgentConfigVersionRevision: + previous_snapshot = version.config_snapshot + snapshot = _json_dump(agent_soul.model_dump(mode="json")) + next_revision = ( + db.session.scalar( + select(func.max(AgentConfigVersionRevision.revision)).where( + AgentConfigVersionRevision.agent_config_version_id == version.id + ) + ) + or 0 + ) + 1 + version.config_snapshot = snapshot + version.version_note = version_note + revision = AgentConfigVersionRevision( + tenant_id=version.tenant_id, + agent_id=version.agent_id, + agent_config_version_id=version.id, + revision=next_revision, + operation=operation, + config_snapshot=snapshot, + previous_config_snapshot=previous_snapshot, + version_note=version_note, + created_by=account_id, + ) + db.session.add(revision) + db.session.flush() + return revision + + @classmethod + def _get_draft_workflow(cls, *, tenant_id: str, app_id: str) -> Workflow: + workflow = db.session.scalar( + select(Workflow) + .where( + Workflow.tenant_id == tenant_id, + Workflow.app_id == app_id, + Workflow.version == Workflow.VERSION_DRAFT, + ) + .limit(1) + ) + if not workflow: + raise ValueError("Draft workflow not found") + return workflow + + @classmethod + def _get_workflow_binding( + cls, *, tenant_id: str, workflow_id: str, node_id: str + ) -> WorkflowAgentNodeBinding | None: + return db.session.scalar( + select(WorkflowAgentNodeBinding) + .where( + WorkflowAgentNodeBinding.tenant_id == tenant_id, + WorkflowAgentNodeBinding.workflow_id == workflow_id, + WorkflowAgentNodeBinding.workflow_version == Workflow.VERSION_DRAFT, + WorkflowAgentNodeBinding.node_id == node_id, + ) + .limit(1) + ) + + @classmethod + def _require_binding(cls, binding: WorkflowAgentNodeBinding | None) -> WorkflowAgentNodeBinding: + if not binding: + raise ValueError("Workflow agent binding not found") + return binding + + @classmethod + def _require_agent(cls, *, tenant_id: str, agent_id: str | None) -> Agent: + if not agent_id: + raise AgentNotFoundError() + agent = db.session.scalar(select(Agent).where(Agent.tenant_id == tenant_id, Agent.id == agent_id).limit(1)) + if not agent: + raise AgentNotFoundError() + return agent + + @classmethod + def _get_agent_if_present(cls, *, tenant_id: str, agent_id: str | None) -> Agent | None: + if not agent_id: + return None + return db.session.scalar(select(Agent).where(Agent.tenant_id == tenant_id, Agent.id == agent_id).limit(1)) + + @classmethod + def _require_version(cls, *, tenant_id: str, agent_id: str | None, version_id: str | None) -> AgentConfigVersion: + if not agent_id or not version_id: + raise AgentVersionNotFoundError() + version = db.session.scalar( + select(AgentConfigVersion) + .where( + AgentConfigVersion.tenant_id == tenant_id, + AgentConfigVersion.agent_id == agent_id, + AgentConfigVersion.id == version_id, + ) + .limit(1) + ) + if not version: + raise AgentVersionNotFoundError() + return version + + @classmethod + def _get_version_if_present( + cls, *, tenant_id: str, agent_id: str | None, version_id: str | None + ) -> AgentConfigVersion | None: + if not agent_id or not version_id: + return None + return db.session.scalar( + select(AgentConfigVersion) + .where( + AgentConfigVersion.tenant_id == tenant_id, + AgentConfigVersion.agent_id == agent_id, + AgentConfigVersion.id == version_id, + ) + .limit(1) + ) + + @classmethod + def _empty_workflow_state(cls, *, app_id: str, workflow_id: str, node_id: str) -> dict[str, Any]: + return { + "variant": ComposerVariant.WORKFLOW.value, + "agent": None, + "active_config_version": None, + "binding": None, + "soul_lock": {"locked": False, "can_unlock": False, "reason": "workflow_only_empty"}, + "agent_soul": AgentSoulConfig().model_dump(mode="json"), + "node_job": WorkflowNodeJobConfig().model_dump(mode="json"), + "save_options": [ComposerSaveStrategy.NODE_JOB_ONLY.value, ComposerSaveStrategy.SAVE_TO_ROSTER.value], + "impact_summary": None, + "app_id": app_id, + "workflow_id": workflow_id, + "node_id": node_id, + } + + @classmethod + def _serialize_workflow_state( + cls, + *, + binding: WorkflowAgentNodeBinding, + agent: Agent | None, + version: AgentConfigVersion | None, + ) -> dict[str, Any]: + locked = bool(agent and agent.scope == AgentScope.ROSTER) + save_options = [ComposerSaveStrategy.NODE_JOB_ONLY.value] + if locked: + save_options.extend( + [ + ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION.value, + ComposerSaveStrategy.SAVE_AS_NEW_VERSION.value, + ComposerSaveStrategy.SAVE_AS_NEW_AGENT.value, + ] + ) + else: + save_options.append(ComposerSaveStrategy.SAVE_TO_ROSTER.value) + return { + "variant": ComposerVariant.WORKFLOW.value, + "agent": cls._serialize_agent(agent) if agent else None, + "active_config_version": cls._serialize_version(version), + "binding": { + "id": binding.id, + "binding_type": binding.binding_type.value, + "agent_id": binding.agent_id, + "agent_config_version_id": binding.agent_config_version_id, + "workflow_id": binding.workflow_id, + "workflow_version": binding.workflow_version, + "node_id": binding.node_id, + }, + "soul_lock": { + "locked": locked, + "can_unlock": locked, + "reason": "roster_agent_shared_version" if locked else "workflow_only_agent", + }, + "agent_soul": cls._workflow_agent_soul_config(version.config_snapshot_dict) + if version + else AgentSoulConfig().model_dump(mode="json"), + "node_job": binding.node_job_config_dict, + "save_options": save_options, + "impact_summary": cls.calculate_impact( + tenant_id=binding.tenant_id, agent_config_version_id=binding.agent_config_version_id + ) + if binding.agent_config_version_id + else None, + } + + @classmethod + def _serialize_agent(cls, agent: Agent) -> dict[str, Any]: + return { + "id": agent.id, + "name": agent.name, + "description": agent.description, + "scope": agent.scope.value, + "status": agent.status.value, + "active_config_version_id": agent.active_config_version_id, + } + + @classmethod + def _serialize_version(cls, version: AgentConfigVersion | None) -> dict[str, Any] | None: + if not version: + return None + return { + "id": version.id, + "version": version.version, + "version_note": version.version_note, + "created_by": version.created_by, + "created_at": version.created_at.isoformat() if version.created_at else None, + } + + @staticmethod + def _workflow_agent_soul_config(config_snapshot: dict[str, Any]) -> dict[str, Any]: + agent_soul = dict(config_snapshot) + agent_soul["app_features"] = {} + agent_soul["app_variables"] = [] + return agent_soul diff --git a/api/services/agent/composer_validator.py b/api/services/agent/composer_validator.py new file mode 100644 index 0000000000..9c91496f68 --- /dev/null +++ b/api/services/agent/composer_validator.py @@ -0,0 +1,71 @@ +from typing import Any + +from pydantic import ValidationError + +from services.agent.errors import AgentSoulLockedError, InvalidComposerConfigError, PlaintextSecretNotAllowedError +from services.entities.agent_entities import ( + AgentSoulConfig, + ComposerSavePayload, + ComposerVariant, + WorkflowNodeJobConfig, +) + +_PLAINTEXT_SECRET_KEYS = { + "api_key", + "apikey", + "authorization", + "password", + "secret", + "secret_key", +} + + +class ComposerConfigValidator: + @classmethod + def validate_save_payload(cls, payload: ComposerSavePayload) -> None: + if payload.variant == ComposerVariant.WORKFLOW and payload.soul_lock.locked and payload.agent_soul is not None: + raise AgentSoulLockedError() + + if payload.agent_soul is not None: + cls.validate_agent_soul(payload.agent_soul) + if payload.node_job is not None: + cls.validate_node_job(payload.node_job) + + @classmethod + def validate_agent_soul(cls, agent_soul: AgentSoulConfig) -> None: + cls._reject_plaintext_secrets(agent_soul.model_dump(mode="json"), path="agent_soul") + + @classmethod + def validate_node_job(cls, node_job: WorkflowNodeJobConfig) -> None: + cls._reject_plaintext_secrets(node_job.model_dump(mode="json"), path="node_job") + + @classmethod + def validate_agent_soul_dict(cls, value: dict[str, Any]) -> AgentSoulConfig: + try: + config = AgentSoulConfig.model_validate(value) + except ValidationError as exc: + raise InvalidComposerConfigError(str(exc)) from exc + cls.validate_agent_soul(config) + return config + + @classmethod + def validate_node_job_dict(cls, value: dict[str, Any]) -> WorkflowNodeJobConfig: + try: + config = WorkflowNodeJobConfig.model_validate(value) + except ValidationError as exc: + raise InvalidComposerConfigError(str(exc)) from exc + cls.validate_node_job(config) + return config + + @classmethod + def _reject_plaintext_secrets(cls, value: Any, *, path: str) -> None: + if isinstance(value, dict): + for key, nested in value.items(): + normalized_key = key.lower().replace("-", "_") + nested_path = f"{path}.{key}" + if normalized_key in _PLAINTEXT_SECRET_KEYS and isinstance(nested, str) and nested: + raise PlaintextSecretNotAllowedError(f"Plaintext secret is not allowed at {nested_path}") + cls._reject_plaintext_secrets(nested, path=nested_path) + elif isinstance(value, list): + for index, nested in enumerate(value): + cls._reject_plaintext_secrets(nested, path=f"{path}[{index}]") diff --git a/api/services/agent/errors.py b/api/services/agent/errors.py new file mode 100644 index 0000000000..dcc8f69961 --- /dev/null +++ b/api/services/agent/errors.py @@ -0,0 +1,29 @@ +from werkzeug.exceptions import BadRequest, Conflict, NotFound + + +class AgentNotFoundError(NotFound): + description = "Agent not found." + + +class AgentVersionNotFoundError(NotFound): + description = "Agent config version not found." + + +class AgentNameConflictError(Conflict): + description = "Agent name already exists." + + +class AgentArchivedError(Conflict): + description = "Archived agent cannot be modified." + + +class AgentSoulLockedError(BadRequest): + description = "Agent Soul is locked for this workflow node." + + +class InvalidComposerConfigError(BadRequest): + description = "Invalid agent composer config." + + +class PlaintextSecretNotAllowedError(BadRequest): + description = "Plaintext secret values are not allowed in Agent config." diff --git a/api/services/agent/roster_service.py b/api/services/agent/roster_service.py new file mode 100644 index 0000000000..2144912b00 --- /dev/null +++ b/api/services/agent/roster_service.py @@ -0,0 +1,333 @@ +import json +from typing import Any + +from sqlalchemy import func, select +from sqlalchemy.exc import IntegrityError + +from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now +from models.agent import ( + Agent, + AgentConfigVersion, + AgentConfigVersionOperation, + AgentConfigVersionRevision, + AgentKind, + AgentScope, + AgentSource, + AgentStatus, + WorkflowAgentNodeBinding, +) +from models.workflow import Workflow +from services.agent.composer_validator import ComposerConfigValidator +from services.agent.errors import ( + AgentArchivedError, + AgentNameConflictError, + AgentNotFoundError, + AgentVersionNotFoundError, +) +from services.entities.agent_entities import RosterAgentCreatePayload, RosterAgentUpdatePayload + + +def _json_dump(value: dict[str, Any]) -> str: + return json.dumps(value, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + + +class AgentRosterService: + @staticmethod + def serialize_agent(agent: Agent, active_version: AgentConfigVersion | None = None) -> dict[str, Any]: + return { + "id": agent.id, + "name": agent.name, + "description": agent.description, + "icon_type": agent.icon_type, + "icon": agent.icon, + "icon_background": agent.icon_background, + "agent_kind": agent.agent_kind.value, + "scope": agent.scope.value, + "source": agent.source.value, + "app_id": agent.app_id, + "workflow_id": agent.workflow_id, + "workflow_node_id": agent.workflow_node_id, + "active_config_version_id": agent.active_config_version_id, + "active_config_version": AgentRosterService.serialize_version(active_version) if active_version else None, + "status": agent.status.value, + "created_by": agent.created_by, + "updated_by": agent.updated_by, + "archived_by": agent.archived_by, + "archived_at": agent.archived_at.isoformat() if agent.archived_at else None, + "created_at": agent.created_at.isoformat() if agent.created_at else None, + "updated_at": agent.updated_at.isoformat() if agent.updated_at else None, + } + + @staticmethod + def serialize_version(version: AgentConfigVersion | None) -> dict[str, Any] | None: + if version is None: + return None + return { + "id": version.id, + "agent_id": version.agent_id, + "version": version.version, + "summary": version.summary, + "version_note": version.version_note, + "created_by": version.created_by, + "created_at": version.created_at.isoformat() if version.created_at else None, + } + + @classmethod + def list_roster_agents( + cls, *, tenant_id: str, page: int = 1, limit: int = 20, keyword: str | None = None + ) -> dict[str, Any]: + stmt = select(Agent).where( + Agent.tenant_id == tenant_id, + Agent.scope == AgentScope.ROSTER, + Agent.status == AgentStatus.ACTIVE, + ) + if keyword: + from libs.helper import escape_like_pattern + + escaped_keyword = escape_like_pattern(keyword) + stmt = stmt.where(Agent.name.ilike(f"%{escaped_keyword}%", escape="\\")) + stmt = stmt.order_by(Agent.updated_at.desc()) + + total = db.session.scalar(select(func.count()).select_from(stmt.subquery())) or 0 + agents = list(db.session.scalars(stmt.offset((page - 1) * limit).limit(limit)).all()) + versions_by_id = cls._load_versions_by_id( + [agent.active_config_version_id for agent in agents if agent.active_config_version_id] + ) + + data = [] + for agent in agents: + active_version = ( + versions_by_id.get(agent.active_config_version_id) if agent.active_config_version_id else None + ) + data.append(cls.serialize_agent(agent, active_version)) + + return { + "data": data, + "page": page, + "limit": limit, + "total": total, + "has_more": page * limit < total, + } + + @classmethod + def list_invite_options( + cls, *, tenant_id: str, page: int = 1, limit: int = 20, keyword: str | None = None, app_id: str | None = None + ) -> dict[str, Any]: + result = cls.list_roster_agents(tenant_id=tenant_id, page=page, limit=limit, keyword=keyword) + usage_by_agent_id: dict[str, list[str]] = {} + if app_id: + draft_workflow = db.session.scalar( + select(Workflow) + .where( + Workflow.tenant_id == tenant_id, + Workflow.app_id == app_id, + Workflow.version == Workflow.VERSION_DRAFT, + ) + .limit(1) + ) + if draft_workflow: + agent_ids = [item["id"] for item in result["data"]] + if agent_ids: + bindings = db.session.scalars( + select(WorkflowAgentNodeBinding).where( + WorkflowAgentNodeBinding.tenant_id == tenant_id, + WorkflowAgentNodeBinding.workflow_id == draft_workflow.id, + WorkflowAgentNodeBinding.workflow_version == Workflow.VERSION_DRAFT, + WorkflowAgentNodeBinding.agent_id.in_(agent_ids), + ) + ).all() + for binding in bindings: + if binding.agent_id: + usage_by_agent_id.setdefault(binding.agent_id, []).append(binding.node_id) + + for item in result["data"]: + existing_node_ids = usage_by_agent_id.get(item["id"], []) + item["is_in_current_workflow"] = bool(existing_node_ids) + item["in_current_workflow_count"] = len(existing_node_ids) + item["existing_node_ids"] = existing_node_ids + return result + + @classmethod + def create_roster_agent( + cls, + *, + tenant_id: str, + account_id: str, + payload: RosterAgentCreatePayload, + source: AgentSource = AgentSource.AGENT_APP, + ) -> Agent: + ComposerConfigValidator.validate_agent_soul(payload.agent_soul) + snapshot = _json_dump(payload.agent_soul.model_dump(mode="json")) + + agent = Agent( + tenant_id=tenant_id, + name=payload.name, + description=payload.description, + icon_type=payload.icon_type, + icon=payload.icon, + icon_background=payload.icon_background, + agent_kind=AgentKind.DIFY_AGENT, + scope=AgentScope.ROSTER, + source=source, + status=AgentStatus.ACTIVE, + created_by=account_id, + updated_by=account_id, + ) + db.session.add(agent) + try: + db.session.flush() + except IntegrityError as exc: + db.session.rollback() + raise AgentNameConflictError() from exc + + version = AgentConfigVersion( + tenant_id=tenant_id, + agent_id=agent.id, + version=1, + config_snapshot=snapshot, + version_note=payload.version_note, + created_by=account_id, + ) + db.session.add(version) + db.session.flush() + + revision = AgentConfigVersionRevision( + tenant_id=tenant_id, + agent_id=agent.id, + agent_config_version_id=version.id, + revision=1, + operation=AgentConfigVersionOperation.CREATE_VERSION, + config_snapshot=snapshot, + version_note=payload.version_note, + created_by=account_id, + ) + db.session.add(revision) + agent.active_config_version_id = version.id + + try: + db.session.commit() + except IntegrityError as exc: + db.session.rollback() + raise AgentNameConflictError() from exc + return agent + + @classmethod + def get_roster_agent_detail(cls, *, tenant_id: str, agent_id: str) -> dict[str, Any]: + agent = cls._get_agent(tenant_id=tenant_id, agent_id=agent_id, roster_only=True) + active_version = cls._get_version( + tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_version_id + ) + return cls.serialize_agent(agent, active_version) + + @classmethod + def update_roster_agent( + cls, *, tenant_id: str, agent_id: str, account_id: str, payload: RosterAgentUpdatePayload + ) -> dict[str, Any]: + agent = cls._get_agent(tenant_id=tenant_id, agent_id=agent_id, roster_only=True) + if agent.status == AgentStatus.ARCHIVED: + raise AgentArchivedError() + + update_data = payload.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(agent, key, value) + agent.updated_by = account_id + + try: + db.session.commit() + except IntegrityError as exc: + db.session.rollback() + raise AgentNameConflictError() from exc + return cls.get_roster_agent_detail(tenant_id=tenant_id, agent_id=agent_id) + + @classmethod + def archive_roster_agent(cls, *, tenant_id: str, agent_id: str, account_id: str) -> None: + agent = cls._get_agent(tenant_id=tenant_id, agent_id=agent_id, roster_only=True) + if agent.status == AgentStatus.ARCHIVED: + return + agent.status = AgentStatus.ARCHIVED + agent.archived_by = account_id + agent.archived_at = naive_utc_now() + agent.updated_by = account_id + db.session.commit() + + @classmethod + def list_agent_versions(cls, *, tenant_id: str, agent_id: str) -> list[dict[str, Any]]: + cls._get_agent(tenant_id=tenant_id, agent_id=agent_id, roster_only=True) + versions = list( + db.session.scalars( + select(AgentConfigVersion) + .where(AgentConfigVersion.tenant_id == tenant_id, AgentConfigVersion.agent_id == agent_id) + .order_by(AgentConfigVersion.version.desc()) + ).all() + ) + return [ + serialized_version + for version in versions + if (serialized_version := cls.serialize_version(version)) is not None + ] + + @classmethod + def get_agent_version_detail(cls, *, tenant_id: str, agent_id: str, version_id: str) -> dict[str, Any]: + cls._get_agent(tenant_id=tenant_id, agent_id=agent_id, roster_only=True) + version = cls._get_version(tenant_id=tenant_id, agent_id=agent_id, version_id=version_id) + revisions = list( + db.session.scalars( + select(AgentConfigVersionRevision) + .where( + AgentConfigVersionRevision.tenant_id == tenant_id, + AgentConfigVersionRevision.agent_id == agent_id, + AgentConfigVersionRevision.agent_config_version_id == version_id, + ) + .order_by(AgentConfigVersionRevision.revision.desc()) + ).all() + ) + result = cls.serialize_version(version) or {} + result["config_snapshot"] = version.config_snapshot_dict + result["revisions"] = [ + { + "id": revision.id, + "revision": revision.revision, + "operation": revision.operation.value, + "summary": revision.summary, + "version_note": revision.version_note, + "created_by": revision.created_by, + "created_at": revision.created_at.isoformat() if revision.created_at else None, + } + for revision in revisions + ] + return result + + @classmethod + def _get_agent(cls, *, tenant_id: str, agent_id: str, roster_only: bool = False) -> Agent: + stmt = select(Agent).where(Agent.tenant_id == tenant_id, Agent.id == agent_id) + if roster_only: + stmt = stmt.where(Agent.scope == AgentScope.ROSTER) + agent = db.session.scalar(stmt.limit(1)) + if not agent: + raise AgentNotFoundError() + return agent + + @classmethod + def _get_version(cls, *, tenant_id: str, agent_id: str, version_id: str | None) -> AgentConfigVersion: + if not version_id: + raise AgentVersionNotFoundError() + version = db.session.scalar( + select(AgentConfigVersion) + .where( + AgentConfigVersion.tenant_id == tenant_id, + AgentConfigVersion.agent_id == agent_id, + AgentConfigVersion.id == version_id, + ) + .limit(1) + ) + if not version: + raise AgentVersionNotFoundError() + return version + + @classmethod + def _load_versions_by_id(cls, version_ids: list[str]) -> dict[str, AgentConfigVersion]: + if not version_ids: + return {} + versions = db.session.scalars(select(AgentConfigVersion).where(AgentConfigVersion.id.in_(version_ids))).all() + return {version.id: version for version in versions} diff --git a/api/services/entities/agent_entities.py b/api/services/entities/agent_entities.py new file mode 100644 index 0000000000..6ba9303644 --- /dev/null +++ b/api/services/entities/agent_entities.py @@ -0,0 +1,222 @@ +from enum import StrEnum +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class ComposerVariant(StrEnum): + WORKFLOW = "workflow" + AGENT_APP = "agent_app" + + +class ComposerSaveStrategy(StrEnum): + NODE_JOB_ONLY = "node_job_only" + SAVE_TO_CURRENT_VERSION = "save_to_current_version" + SAVE_AS_NEW_VERSION = "save_as_new_version" + SAVE_AS_NEW_AGENT = "save_as_new_agent" + SAVE_TO_ROSTER = "save_to_roster" + + +class AgentKnowledgeQueryMode(StrEnum): + USER_QUERY = "user_query" + GENERATED_QUERY = "generated_query" + + +class WorkflowNodeJobMode(StrEnum): + LET_AGENT_FIGURE_IT_OUT = "let_agent_figure_it_out" + TELL_AGENT_WHAT_TO_DO = "tell_agent_what_to_do" + + +class DeclaredOutputType(StrEnum): + STRING = "string" + NUMBER = "number" + OBJECT = "object" + ARRAY = "array" + BOOLEAN = "boolean" + FILE = "file" + + +class AgentSoulPromptConfig(BaseModel): + system_prompt: str = "" + + +class AgentSoulSkillsFilesConfig(BaseModel): + files: list[dict[str, Any]] = Field(default_factory=list) + skills: list[dict[str, Any]] = Field(default_factory=list) + + +class AgentSoulToolsConfig(BaseModel): + dify_tools: list[dict[str, Any]] = Field(default_factory=list) + cli_tools: list[dict[str, Any]] = Field(default_factory=list) + + +class AgentSoulKnowledgeConfig(BaseModel): + datasets: list[dict[str, Any]] = Field(default_factory=list) + query_mode: AgentKnowledgeQueryMode | None = None + query_config: dict[str, Any] = Field(default_factory=dict) + + +class AgentSoulHumanConfig(BaseModel): + contacts: list[dict[str, Any]] = Field(default_factory=list) + tools: list[dict[str, Any]] = Field(default_factory=list) + + +class AgentSoulEnvConfig(BaseModel): + variables: list[dict[str, Any]] = Field(default_factory=list) + secret_refs: list[dict[str, Any]] = Field(default_factory=list) + + +class AgentSoulSandboxConfig(BaseModel): + provider: str | None = None + config: dict[str, Any] = Field(default_factory=dict) + + +class AgentSoulMemoryConfig(BaseModel): + scope: str | None = None + budget: str | None = None + artifacts: list[dict[str, Any]] = Field(default_factory=list) + + +class AppVariableConfig(BaseModel): + name: str = Field(min_length=1, max_length=255) + type: str = Field(min_length=1, max_length=64) + required: bool = False + default: Any = None + + +class AgentSoulConfig(BaseModel): + model_config = ConfigDict(extra="forbid") + + schema_version: int = 1 + prompt: AgentSoulPromptConfig = Field(default_factory=AgentSoulPromptConfig) + skills_files: AgentSoulSkillsFilesConfig = Field(default_factory=AgentSoulSkillsFilesConfig) + tools: AgentSoulToolsConfig = Field(default_factory=AgentSoulToolsConfig) + knowledge: AgentSoulKnowledgeConfig = Field(default_factory=AgentSoulKnowledgeConfig) + human: AgentSoulHumanConfig = Field(default_factory=AgentSoulHumanConfig) + env: AgentSoulEnvConfig = Field(default_factory=AgentSoulEnvConfig) + sandbox: AgentSoulSandboxConfig = Field(default_factory=AgentSoulSandboxConfig) + memory: AgentSoulMemoryConfig = Field(default_factory=AgentSoulMemoryConfig) + app_features: dict[str, Any] = Field(default_factory=dict) + app_variables: list[AppVariableConfig] = Field(default_factory=list) + misc_legacy: dict[str, Any] = Field(default_factory=dict) + + +class DeclaredOutputFileConfig(BaseModel): + extensions: list[str] = Field(default_factory=list) + mime_types: list[str] = Field(default_factory=list) + + +class DeclaredOutputCheckConfig(BaseModel): + type: str = Field(min_length=1, max_length=64) + prompt: str | None = None + benchmark_file_ref: dict[str, Any] | None = None + + +class DeclaredOutputFailureStrategy(BaseModel): + on_type_check_failed: str | None = None + on_output_check_failed: str | None = None + max_retries: int = Field(default=0, ge=0, le=10) + + +class DeclaredOutputConfig(BaseModel): + id: str | None = None + name: str = Field(min_length=1, max_length=255) + type: DeclaredOutputType + description: str | None = None + required: bool = True + file: DeclaredOutputFileConfig | None = None + checks: list[DeclaredOutputCheckConfig] = Field(default_factory=list) + failure_strategy: DeclaredOutputFailureStrategy | None = None + + @model_validator(mode="after") + def validate_file_metadata(self) -> "DeclaredOutputConfig": + if self.type == DeclaredOutputType.FILE and self.file is None: + self.file = DeclaredOutputFileConfig() + if self.type != DeclaredOutputType.FILE and self.file is not None: + raise ValueError("file metadata is only allowed for file outputs") + return self + + +class WorkflowNodeJobConfig(BaseModel): + model_config = ConfigDict(extra="forbid") + + schema_version: int = 1 + mode: WorkflowNodeJobMode = WorkflowNodeJobMode.TELL_AGENT_WHAT_TO_DO + workflow_prompt: str = "" + previous_node_output_refs: list[dict[str, Any]] = Field(default_factory=list) + declared_outputs: list[DeclaredOutputConfig] = Field(default_factory=list) + human_contacts: list[dict[str, Any]] = Field(default_factory=list) + metadata: dict[str, Any] = Field(default_factory=dict) + + +class ComposerBindingPayload(BaseModel): + binding_type: Literal["roster_agent", "inline_agent"] + agent_id: str | None = None + agent_config_version_id: str | None = None + + +class ComposerSoulLockPayload(BaseModel): + locked: bool = True + unlocked_from_version_id: str | None = None + + +class ComposerSavePayload(BaseModel): + variant: ComposerVariant + binding: ComposerBindingPayload | None = None + soul_lock: ComposerSoulLockPayload = Field(default_factory=ComposerSoulLockPayload) + agent_soul: AgentSoulConfig | None = None + node_job: WorkflowNodeJobConfig | None = None + save_strategy: ComposerSaveStrategy + version_note: str | None = None + idempotency_key: str | None = None + client_revision_id: str | None = None + new_agent_name: str | None = Field(default=None, min_length=1, max_length=255) + + @model_validator(mode="after") + def validate_variant_sections(self) -> "ComposerSavePayload": + if self.variant == ComposerVariant.AGENT_APP and self.node_job is not None: + raise ValueError("Agent App Variant must not include workflow node job config") + if self.variant == ComposerVariant.AGENT_APP and self.agent_soul is not None: + if self.agent_soul.app_variables and self.save_strategy == ComposerSaveStrategy.NODE_JOB_ONLY: + raise ValueError("Agent App Variant cannot use node_job_only save strategy") + if self.variant == ComposerVariant.WORKFLOW and self.agent_soul is not None: + if self.agent_soul.app_variables: + raise ValueError("Workflow Variant must not include app variables") + if self.agent_soul.app_features: + raise ValueError("Workflow Variant must not include app features") + return self + + +class RosterAgentCreatePayload(BaseModel): + name: str = Field(min_length=1, max_length=255) + description: str = "" + icon_type: str | None = Field(default=None, max_length=255) + icon: str | None = Field(default=None, max_length=255) + icon_background: str | None = Field(default=None, max_length=255) + agent_soul: AgentSoulConfig = Field(default_factory=AgentSoulConfig) + version_note: str | None = None + + +class RosterAgentUpdatePayload(BaseModel): + name: str | None = Field(default=None, min_length=1, max_length=255) + description: str | None = None + icon_type: str | None = Field(default=None, max_length=255) + icon: str | None = Field(default=None, max_length=255) + icon_background: str | None = Field(default=None, max_length=255) + + +class RosterListQuery(BaseModel): + page: int = Field(default=1, ge=1) + limit: int = Field(default=20, ge=1, le=100) + keyword: str | None = None + + +class ComposerCandidateCapabilities(BaseModel): + human_roster_available: bool = False + + +class ComposerCandidatesResponse(BaseModel): + variant: ComposerVariant + allowed_node_job_candidates: dict[str, Any] = Field(default_factory=dict) + allowed_soul_candidates: dict[str, Any] = Field(default_factory=dict) + capabilities: ComposerCandidateCapabilities = Field(default_factory=ComposerCandidateCapabilities) diff --git a/api/tests/unit_tests/models/test_agent.py b/api/tests/unit_tests/models/test_agent.py index 9e45c671d2..7828d76862 100644 --- a/api/tests/unit_tests/models/test_agent.py +++ b/api/tests/unit_tests/models/test_agent.py @@ -7,6 +7,8 @@ from sqlalchemy.exc import IntegrityError from models.agent import ( Agent, AgentConfigVersion, + AgentConfigVersionOperation, + AgentConfigVersionRevision, AgentKind, AgentScope, AgentSource, @@ -25,6 +27,7 @@ def test_agent_enums_match_prd_boundaries(): assert AgentSource.WORKFLOW.value == "workflow" assert AgentStatus.ACTIVE.value == "active" assert AgentStatus.ARCHIVED.value == "archived" + assert AgentConfigVersionOperation.SAVE_CURRENT_VERSION.value == "save_current_version" assert WorkflowAgentBindingType.ROSTER_AGENT.value == "roster_agent" assert WorkflowAgentBindingType.INLINE_AGENT.value == "inline_agent" @@ -149,7 +152,36 @@ def test_long_text_columns_do_not_use_mysql_incompatible_server_defaults(): for column in ( Agent.__table__.c.description, AgentConfigVersion.__table__.c.config_snapshot, + AgentConfigVersionRevision.__table__.c.config_snapshot, + AgentConfigVersionRevision.__table__.c.previous_config_snapshot, WorkflowAgentNodeBinding.__table__.c.node_job_config, ): assert isinstance(column.type, LongText) assert column.server_default is None + + +def test_agent_config_version_revision_records_audit_snapshot(): + snapshot = {"schema_version": 1, "prompt": {"system_prompt": "new"}} + previous_snapshot = {"schema_version": 1, "prompt": {"system_prompt": "old"}} + revision = AgentConfigVersionRevision( + tenant_id="tenant-1", + agent_id="agent-1", + agent_config_version_id="version-1", + revision=2, + operation=AgentConfigVersionOperation.SAVE_CURRENT_VERSION, + config_snapshot=json.dumps(snapshot), + previous_config_snapshot=json.dumps(previous_snapshot), + ) + + unique_constraints = { + constraint.name: tuple(column.name for column in constraint.columns) + for constraint in AgentConfigVersionRevision.__table__.constraints + if constraint.__class__.__name__ == "UniqueConstraint" + } + + assert unique_constraints["agent_config_version_revision_version_revision_unique"] == ( + "agent_config_version_id", + "revision", + ) + assert revision.config_snapshot_dict == snapshot + assert revision.previous_config_snapshot_dict == previous_snapshot diff --git a/api/tests/unit_tests/services/agent/test_agent_composer_entities.py b/api/tests/unit_tests/services/agent/test_agent_composer_entities.py new file mode 100644 index 0000000000..1c9c7d9d69 --- /dev/null +++ b/api/tests/unit_tests/services/agent/test_agent_composer_entities.py @@ -0,0 +1,144 @@ +import pytest + +from services.agent.composer_service import AgentComposerService +from services.agent.composer_validator import ComposerConfigValidator +from services.agent.errors import AgentSoulLockedError, PlaintextSecretNotAllowedError +from services.entities.agent_entities import ( + AgentKnowledgeQueryMode, + AgentSoulConfig, + ComposerSavePayload, + ComposerSaveStrategy, + ComposerVariant, + DeclaredOutputType, + WorkflowNodeJobConfig, +) + + +def test_workflow_variant_rejects_agent_app_only_fields(): + with pytest.raises(ValueError): + ComposerSavePayload.model_validate( + { + "variant": ComposerVariant.WORKFLOW, + "save_strategy": ComposerSaveStrategy.NODE_JOB_ONLY, + "agent_soul": { + "app_variables": [{"name": "company_name", "type": "string"}], + }, + } + ) + + +def test_agent_app_variant_rejects_workflow_node_job(): + with pytest.raises(ValueError): + ComposerSavePayload.model_validate( + { + "variant": ComposerVariant.AGENT_APP, + "save_strategy": ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION, + "node_job": {"workflow_prompt": "Use the previous node output."}, + } + ) + + +def test_locked_workflow_soul_rejects_soul_changes(): + payload = ComposerSavePayload.model_validate( + { + "variant": ComposerVariant.WORKFLOW, + "save_strategy": ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION, + "soul_lock": {"locked": True}, + "agent_soul": {"prompt": {"system_prompt": "changed"}}, + } + ) + + with pytest.raises(AgentSoulLockedError): + ComposerConfigValidator.validate_save_payload(payload) + + +def test_agent_app_soul_allows_app_features_and_variables(): + payload = ComposerSavePayload.model_validate( + { + "variant": ComposerVariant.AGENT_APP, + "save_strategy": ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION, + "agent_soul": { + "app_features": { + "conversation_opener": {}, + "follow_up": {}, + "citations_and_attributions": {}, + "content_moderation": {}, + "annotation_reply": {}, + }, + "app_variables": [{"name": "company_name", "type": "string", "required": True}], + }, + } + ) + + ComposerConfigValidator.validate_save_payload(payload) + assert payload.agent_soul is not None + assert payload.agent_soul.app_variables[0].name == "company_name" + + +def test_knowledge_query_mode_uses_stable_backend_enums(): + config = AgentSoulConfig.model_validate( + { + "knowledge": { + "datasets": [{"dataset_id": "dataset-1"}], + "query_mode": "generated_query", + "query_config": {"generation_prompt": "Create a retrieval query."}, + } + } + ) + + assert config.knowledge.query_mode == AgentKnowledgeQueryMode.GENERATED_QUERY + + +def test_declared_outputs_support_file_check_and_failure_strategy(): + node_job = WorkflowNodeJobConfig.model_validate( + { + "declared_outputs": [ + { + "name": "analysis_report", + "type": "file", + "file": {"extensions": [".pdf"], "mime_types": ["application/pdf"]}, + "checks": [ + { + "type": "benchmark_file", + "prompt": "Report must include risk summary.", + "benchmark_file_ref": {"upload_file_id": "file-1"}, + } + ], + "failure_strategy": { + "on_type_check_failed": "fail_node", + "on_output_check_failed": "retry", + "max_retries": 1, + }, + } + ] + } + ) + + output = node_job.declared_outputs[0] + assert output.type == DeclaredOutputType.FILE + assert output.file is not None + assert output.file.extensions == [".pdf"] + assert output.checks[0].type == "benchmark_file" + assert output.failure_strategy is not None + assert output.failure_strategy.max_retries == 1 + + +def test_plaintext_secrets_are_rejected(): + config = AgentSoulConfig.model_validate({"env": {"variables": [{"name": "OPENAI_API_KEY", "api_key": "secret"}]}}) + + with pytest.raises(PlaintextSecretNotAllowedError): + ComposerConfigValidator.validate_agent_soul(config) + + +def test_workflow_agent_soul_config_strips_agent_app_only_fields(): + config = AgentComposerService._workflow_agent_soul_config( + { + "prompt": {"system_prompt": "answer carefully"}, + "app_features": {"conversation_opener": {"enabled": True}}, + "app_variables": [{"name": "company_name", "type": "string"}], + } + ) + + assert config["prompt"]["system_prompt"] == "answer carefully" + assert config["app_features"] == {} + assert config["app_variables"] == []