mirror of
https://github.com/langgenius/dify.git
synced 2026-05-21 17:20:25 +08:00
72 lines
2.6 KiB
Python
72 lines
2.6 KiB
Python
from typing import Any
|
|
|
|
from pydantic import ValidationError
|
|
|
|
from services.agent.errors import AgentSoulLockedError, InvalidComposerConfigError, PlaintextSecretNotAllowedError
|
|
from services.entities.agent_entities import (
|
|
AgentSoulConfig,
|
|
ComposerSavePayload,
|
|
ComposerVariant,
|
|
WorkflowNodeJobConfig,
|
|
)
|
|
|
|
_PLAINTEXT_SECRET_KEYS = {
|
|
"api_key",
|
|
"apikey",
|
|
"authorization",
|
|
"password",
|
|
"secret",
|
|
"secret_key",
|
|
}
|
|
|
|
|
|
class ComposerConfigValidator:
|
|
@classmethod
|
|
def validate_save_payload(cls, payload: ComposerSavePayload) -> None:
|
|
if payload.variant == ComposerVariant.WORKFLOW and payload.soul_lock.locked and payload.agent_soul is not None:
|
|
raise AgentSoulLockedError()
|
|
|
|
if payload.agent_soul is not None:
|
|
cls.validate_agent_soul(payload.agent_soul)
|
|
if payload.node_job is not None:
|
|
cls.validate_node_job(payload.node_job)
|
|
|
|
@classmethod
|
|
def validate_agent_soul(cls, agent_soul: AgentSoulConfig) -> None:
|
|
cls._reject_plaintext_secrets(agent_soul.model_dump(mode="json"), path="agent_soul")
|
|
|
|
@classmethod
|
|
def validate_node_job(cls, node_job: WorkflowNodeJobConfig) -> None:
|
|
cls._reject_plaintext_secrets(node_job.model_dump(mode="json"), path="node_job")
|
|
|
|
@classmethod
|
|
def validate_agent_soul_dict(cls, value: dict[str, Any]) -> AgentSoulConfig:
|
|
try:
|
|
config = AgentSoulConfig.model_validate(value)
|
|
except ValidationError as exc:
|
|
raise InvalidComposerConfigError(str(exc)) from exc
|
|
cls.validate_agent_soul(config)
|
|
return config
|
|
|
|
@classmethod
|
|
def validate_node_job_dict(cls, value: dict[str, Any]) -> WorkflowNodeJobConfig:
|
|
try:
|
|
config = WorkflowNodeJobConfig.model_validate(value)
|
|
except ValidationError as exc:
|
|
raise InvalidComposerConfigError(str(exc)) from exc
|
|
cls.validate_node_job(config)
|
|
return config
|
|
|
|
@classmethod
|
|
def _reject_plaintext_secrets(cls, value: Any, *, path: str) -> None:
|
|
if isinstance(value, dict):
|
|
for key, nested in value.items():
|
|
normalized_key = key.lower().replace("-", "_")
|
|
nested_path = f"{path}.{key}"
|
|
if normalized_key in _PLAINTEXT_SECRET_KEYS and isinstance(nested, str) and nested:
|
|
raise PlaintextSecretNotAllowedError(f"Plaintext secret is not allowed at {nested_path}")
|
|
cls._reject_plaintext_secrets(nested, path=nested_path)
|
|
elif isinstance(value, list):
|
|
for index, nested in enumerate(value):
|
|
cls._reject_plaintext_secrets(nested, path=f"{path}[{index}]")
|