Files
dify/api/services/agent/composer_validator.py
zyssyz123 d9e90d0fa0 feat: add new agent (#36284)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-19 10:43:23 +00:00

72 lines
2.6 KiB
Python

from typing import Any
from pydantic import ValidationError
from services.agent.errors import AgentSoulLockedError, InvalidComposerConfigError, PlaintextSecretNotAllowedError
from services.entities.agent_entities import (
AgentSoulConfig,
ComposerSavePayload,
ComposerVariant,
WorkflowNodeJobConfig,
)
# Key names whose non-empty string values count as plaintext secrets.
# Entries are stored in normalized form: lower-case, with "_" instead of "-",
# which is how ComposerConfigValidator normalizes keys before the lookup.
_PLAINTEXT_SECRET_KEYS = {"api_key", "apikey", "authorization", "password", "secret", "secret_key"}
class ComposerConfigValidator:
    """Validation entry points for composer configuration.

    Every method is a classmethod; the class holds no state. The checks
    implemented here are the soul-lock guard on save payloads and a recursive
    scan that rejects plaintext secrets inside agent-soul and node-job configs.
    """

    @classmethod
    def validate_save_payload(cls, payload: ComposerSavePayload) -> None:
        """Validate a composer save payload.

        Raises:
            AgentSoulLockedError: the variant is ``ComposerVariant.WORKFLOW``,
                ``payload.soul_lock.locked`` is truthy and the payload carries
                an ``agent_soul``.
            PlaintextSecretNotAllowedError: ``agent_soul`` or ``node_job``
                contains a plaintext secret.
        """
        # ``soul_lock`` is only consulted for the workflow variant (short-circuit).
        workflow_soul_locked = payload.variant == ComposerVariant.WORKFLOW and payload.soul_lock.locked
        soul = payload.agent_soul
        if workflow_soul_locked and soul is not None:
            raise AgentSoulLockedError()

        if soul is not None:
            cls.validate_agent_soul(soul)

        job = payload.node_job
        if job is not None:
            cls.validate_node_job(job)

    @classmethod
    def validate_agent_soul(cls, agent_soul: AgentSoulConfig) -> None:
        """Reject plaintext secrets anywhere in the JSON-mode dump of ``agent_soul``.

        Raises:
            PlaintextSecretNotAllowedError: a secret-like key holds a non-empty string.
        """
        dumped = agent_soul.model_dump(mode="json")
        cls._reject_plaintext_secrets(dumped, path="agent_soul")

    @classmethod
    def validate_node_job(cls, node_job: WorkflowNodeJobConfig) -> None:
        """Reject plaintext secrets anywhere in the JSON-mode dump of ``node_job``.

        Raises:
            PlaintextSecretNotAllowedError: a secret-like key holds a non-empty string.
        """
        dumped = node_job.model_dump(mode="json")
        cls._reject_plaintext_secrets(dumped, path="node_job")

    @classmethod
    def validate_agent_soul_dict(cls, value: dict[str, Any]) -> AgentSoulConfig:
        """Parse ``value`` into an ``AgentSoulConfig`` and validate the result.

        Returns:
            The parsed config, once it has passed the plaintext-secret check.

        Raises:
            InvalidComposerConfigError: model validation failed; the message is
                the string form of the underlying ``ValidationError``.
            PlaintextSecretNotAllowedError: the parsed config contains a plaintext secret.
        """
        try:
            parsed = AgentSoulConfig.model_validate(value)
        except ValidationError as exc:
            raise InvalidComposerConfigError(str(exc)) from exc
        else:
            cls.validate_agent_soul(parsed)
            return parsed

    @classmethod
    def validate_node_job_dict(cls, value: dict[str, Any]) -> WorkflowNodeJobConfig:
        """Parse ``value`` into a ``WorkflowNodeJobConfig`` and validate the result.

        Returns:
            The parsed config, once it has passed the plaintext-secret check.

        Raises:
            InvalidComposerConfigError: model validation failed; the message is
                the string form of the underlying ``ValidationError``.
            PlaintextSecretNotAllowedError: the parsed config contains a plaintext secret.
        """
        try:
            parsed = WorkflowNodeJobConfig.model_validate(value)
        except ValidationError as exc:
            raise InvalidComposerConfigError(str(exc)) from exc
        else:
            cls.validate_node_job(parsed)
            return parsed

    @classmethod
    def _reject_plaintext_secrets(cls, value: Any, *, path: str) -> None:
        """Walk ``value`` depth-first and raise on the first plaintext secret.

        Dicts and lists are traversed recursively; any other value is ignored.
        A dict entry is a violation when its key, lower-cased and with ``-``
        replaced by ``_``, is in ``_PLAINTEXT_SECRET_KEYS`` and its value is a
        non-empty string.

        Args:
            value: The structure to scan.
            path: Location of ``value``, used in the error message. Dict
                entries extend it as ``path.key``, list items as ``path[index]``.

        Raises:
            PlaintextSecretNotAllowedError: at the first violation encountered.
        """
        if isinstance(value, list):
            for position, item in enumerate(value):
                cls._reject_plaintext_secrets(item, path=f"{path}[{position}]")
            return

        if not isinstance(value, dict):
            # Scalars (and any other type) cannot contain keyed secrets.
            return

        for name, child in value.items():
            canonical_name = name.lower().replace("-", "_")
            child_path = f"{path}.{name}"
            holds_text = isinstance(child, str) and bool(child)
            if canonical_name in _PLAINTEXT_SECRET_KEYS and holds_text:
                raise PlaintextSecretNotAllowedError(f"Plaintext secret is not allowed at {child_path}")
            # Keep descending: a non-string value under a secret-like key may
            # still contain nested secret keys.
            cls._reject_plaintext_secrets(child, path=child_path)