mirror of
https://github.com/langgenius/dify.git
synced 2026-02-19 01:25:40 +08:00
Compare commits
26 Commits
feat/trigg
...
feat/chunk
| Author | SHA1 | Date | |
|---|---|---|---|
| 626e71cb3b | |||
| 07047487c3 | |||
| c7064d44af | |||
| e93bfe3d41 | |||
| ab910c736c | |||
| 4047a6bb12 | |||
| df2478dc26 | |||
| 4cc3f6045b | |||
| 1550316b8d | |||
| 87394d2512 | |||
| bad59c95bc | |||
| 9f138ef246 | |||
| 6453fc4973 | |||
| f62f926537 | |||
| b3dafd913b | |||
| b2d8a7eaf1 | |||
| 3e54414191 | |||
| a173546c8d | |||
| aa69d90489 | |||
| 4ba1292455 | |||
| bb01c31f30 | |||
| cd90b2ca9e | |||
| 9a65350cf7 | |||
| 680eb7a9f6 | |||
| 878420463c | |||
| 4692e20daf |
@ -1,16 +1,16 @@
|
||||
#!/bin/bash
|
||||
WORKSPACE_ROOT=$(pwd)
|
||||
|
||||
npm add -g pnpm@10.15.0
|
||||
corepack enable
|
||||
cd web && pnpm install
|
||||
pipx install uv
|
||||
|
||||
echo 'alias start-api="cd /workspaces/dify/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
|
||||
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage"' >> ~/.bashrc
|
||||
echo 'alias start-web="cd /workspaces/dify/web && pnpm dev"' >> ~/.bashrc
|
||||
echo 'alias start-web-prod="cd /workspaces/dify/web && pnpm build && pnpm start"' >> ~/.bashrc
|
||||
echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d"' >> ~/.bashrc
|
||||
echo 'alias stop-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env down"' >> ~/.bashrc
|
||||
echo "alias start-api=\"cd $WORKSPACE_ROOT/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug\"" >> ~/.bashrc
|
||||
echo "alias start-worker=\"cd $WORKSPACE_ROOT/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage\"" >> ~/.bashrc
|
||||
echo "alias start-web=\"cd $WORKSPACE_ROOT/web && pnpm dev\"" >> ~/.bashrc
|
||||
echo "alias start-web-prod=\"cd $WORKSPACE_ROOT/web && pnpm build && pnpm start\"" >> ~/.bashrc
|
||||
echo "alias start-containers=\"cd $WORKSPACE_ROOT/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d\"" >> ~/.bashrc
|
||||
echo "alias stop-containers=\"cd $WORKSPACE_ROOT/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env down\"" >> ~/.bashrc
|
||||
|
||||
source /home/vscode/.bashrc
|
||||
|
||||
|
||||
2
.github/workflows/autofix.yml
vendored
2
.github/workflows/autofix.yml
vendored
@ -2,8 +2,6 @@ name: autofix.ci
|
||||
on:
|
||||
pull_request:
|
||||
branches: ["main"]
|
||||
push:
|
||||
branches: ["main"]
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@ -147,6 +147,7 @@ api/.idea
|
||||
|
||||
api/.env
|
||||
api/storage/*
|
||||
api/Dockerfile.local
|
||||
|
||||
docker-legacy/volumes/app/storage/*
|
||||
docker-legacy/volumes/db/data/*
|
||||
@ -231,5 +232,7 @@ api/.env.backup
|
||||
# Benchmark
|
||||
scripts/stress-test/setup/config/
|
||||
scripts/stress-test/reports/
|
||||
|
||||
# mcp
|
||||
.serena
|
||||
.playwright-mcp/
|
||||
.serena/
|
||||
@ -85,4 +85,3 @@ pnpm test # Run Jest tests
|
||||
|
||||
- All async tasks use Celery with Redis as broker
|
||||
- **Internationalization**: Frontend supports multiple languages with English (`web/i18n/en-US/`) as the source. All user-facing text must use i18n keys, no hardcoded strings. Edit corresponding module files in `en-US/` directory for translations.
|
||||
- **Logging**: Never use `str(e)` in `logger.exception()` calls. Use `logger.exception("message", exc_info=e)` instead
|
||||
|
||||
5
Makefile
5
Makefile
@ -61,8 +61,9 @@ check:
|
||||
@echo "✅ Code check complete"
|
||||
|
||||
lint:
|
||||
@echo "🔧 Running ruff format and check with fixes..."
|
||||
@uv run --directory api --dev sh -c 'ruff format ./api && ruff check --fix ./api'
|
||||
@echo "🔧 Running ruff format, check with fixes, and import linter..."
|
||||
@uv run --project api --dev sh -c 'ruff format ./api && ruff check --fix ./api'
|
||||
@uv run --directory api --dev lint-imports
|
||||
@echo "✅ Linting complete"
|
||||
|
||||
type-check:
|
||||
|
||||
@ -436,9 +436,6 @@ HTTP_REQUEST_NODE_MAX_BINARY_SIZE=10485760
|
||||
HTTP_REQUEST_NODE_MAX_TEXT_SIZE=1048576
|
||||
HTTP_REQUEST_NODE_SSL_VERIFY=True
|
||||
|
||||
# Webhook request configuration
|
||||
WEBHOOK_REQUEST_BODY_MAX_SIZE=10485760
|
||||
|
||||
# Respect X-* headers to redirect clients
|
||||
RESPECT_XFORWARD_HEADERS_ENABLED=false
|
||||
|
||||
@ -517,12 +514,6 @@ ENABLE_CLEAN_MESSAGES=false
|
||||
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
|
||||
ENABLE_DATASETS_QUEUE_MONITOR=false
|
||||
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true
|
||||
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK=true
|
||||
# Interval time in minutes for polling scheduled workflows(default: 1 min)
|
||||
WORKFLOW_SCHEDULE_POLLER_INTERVAL=1
|
||||
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE=100
|
||||
# Maximum number of scheduled workflows to dispatch per tick (0 for unlimited)
|
||||
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK=0
|
||||
|
||||
# Position configuration
|
||||
POSITION_TOOL_PINS=
|
||||
|
||||
@ -30,6 +30,7 @@ select = [
|
||||
"RUF022", # unsorted-dunder-all
|
||||
"S506", # unsafe-yaml-load
|
||||
"SIM", # flake8-simplify rules
|
||||
"T201", # print-found
|
||||
"TRY400", # error-instead-of-exception
|
||||
"TRY401", # verbose-log-message
|
||||
"UP", # pyupgrade rules
|
||||
@ -91,11 +92,18 @@ ignore = [
|
||||
"configs/*" = [
|
||||
"N802", # invalid-function-name
|
||||
]
|
||||
"core/model_runtime/callbacks/base_callback.py" = [
|
||||
"T201",
|
||||
]
|
||||
"core/workflow/callbacks/workflow_logging_callback.py" = [
|
||||
"T201",
|
||||
]
|
||||
"libs/gmpy2_pkcs10aep_cipher.py" = [
|
||||
"N803", # invalid-argument-name
|
||||
]
|
||||
"tests/*" = [
|
||||
"F811", # redefined-while-unused
|
||||
"T201", # allow print in tests
|
||||
]
|
||||
|
||||
[lint.pyflakes]
|
||||
|
||||
2
api/.vscode/launch.json.example
vendored
2
api/.vscode/launch.json.example
vendored
@ -54,7 +54,7 @@
|
||||
"--loglevel",
|
||||
"DEBUG",
|
||||
"-Q",
|
||||
"dataset,generation,mail,ops_trace,app_deletion,workflow"
|
||||
"dataset,generation,mail,ops_trace,app_deletion"
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
@ -7,7 +7,7 @@ _logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _log(message: str):
|
||||
print(message, flush=True)
|
||||
_logger.debug(message)
|
||||
|
||||
|
||||
# grpc gevent
|
||||
|
||||
@ -14,12 +14,12 @@ from sqlalchemy.exc import SQLAlchemyError
|
||||
from configs import dify_config
|
||||
from constants.languages import languages
|
||||
from core.helper import encrypter
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.impl.plugin import PluginInstaller
|
||||
from core.rag.datasource.vdb.vector_factory import Vector
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.index_processor.constant.built_in_field import BuiltInField
|
||||
from core.rag.models.document import Document
|
||||
from core.tools.entities.tool_entities import CredentialType
|
||||
from core.tools.utils.system_oauth_encryption import encrypt_system_oauth_params
|
||||
from events.app_event import app_was_created
|
||||
from extensions.ext_database import db
|
||||
@ -739,18 +739,18 @@ where sites.id is null limit 1000"""
|
||||
try:
|
||||
app = db.session.query(App).where(App.id == app_id).first()
|
||||
if not app:
|
||||
print(f"App {app_id} not found")
|
||||
logger.info("App %s not found", app_id)
|
||||
continue
|
||||
|
||||
tenant = app.tenant
|
||||
if tenant:
|
||||
accounts = tenant.get_accounts()
|
||||
if not accounts:
|
||||
print(f"Fix failed for app {app.id}")
|
||||
logger.info("Fix failed for app %s", app.id)
|
||||
continue
|
||||
|
||||
account = accounts[0]
|
||||
print(f"Fixing missing site for app {app.id}")
|
||||
logger.info("Fixing missing site for app %s", app.id)
|
||||
app_was_created.send(app, account=account)
|
||||
except Exception:
|
||||
failed_app_ids.append(app_id)
|
||||
@ -1227,55 +1227,6 @@ def setup_system_tool_oauth_client(provider, client_params):
|
||||
click.echo(click.style(f"OAuth client params setup successfully. id: {oauth_client.id}", fg="green"))
|
||||
|
||||
|
||||
@click.command("setup-system-trigger-oauth-client", help="Setup system trigger oauth client.")
|
||||
@click.option("--provider", prompt=True, help="Provider name")
|
||||
@click.option("--client-params", prompt=True, help="Client Params")
|
||||
def setup_system_trigger_oauth_client(provider, client_params):
|
||||
"""
|
||||
Setup system trigger oauth client
|
||||
"""
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from models.trigger import TriggerOAuthSystemClient
|
||||
|
||||
provider_id = TriggerProviderID(provider)
|
||||
provider_name = provider_id.provider_name
|
||||
plugin_id = provider_id.plugin_id
|
||||
|
||||
try:
|
||||
# json validate
|
||||
click.echo(click.style(f"Validating client params: {client_params}", fg="yellow"))
|
||||
client_params_dict = TypeAdapter(dict[str, Any]).validate_json(client_params)
|
||||
click.echo(click.style("Client params validated successfully.", fg="green"))
|
||||
|
||||
click.echo(click.style(f"Encrypting client params: {client_params}", fg="yellow"))
|
||||
click.echo(click.style(f"Using SECRET_KEY: `{dify_config.SECRET_KEY}`", fg="yellow"))
|
||||
oauth_client_params = encrypt_system_oauth_params(client_params_dict)
|
||||
click.echo(click.style("Client params encrypted successfully.", fg="green"))
|
||||
except Exception as e:
|
||||
click.echo(click.style(f"Error parsing client params: {str(e)}", fg="red"))
|
||||
return
|
||||
|
||||
deleted_count = (
|
||||
db.session.query(TriggerOAuthSystemClient)
|
||||
.filter_by(
|
||||
provider=provider_name,
|
||||
plugin_id=plugin_id,
|
||||
)
|
||||
.delete()
|
||||
)
|
||||
if deleted_count > 0:
|
||||
click.echo(click.style(f"Deleted {deleted_count} existing oauth client params.", fg="yellow"))
|
||||
|
||||
oauth_client = TriggerOAuthSystemClient(
|
||||
provider=provider_name,
|
||||
plugin_id=plugin_id,
|
||||
encrypted_oauth_params=oauth_client_params,
|
||||
)
|
||||
db.session.add(oauth_client)
|
||||
db.session.commit()
|
||||
click.echo(click.style(f"OAuth client params setup successfully. id: {oauth_client.id}", fg="green"))
|
||||
|
||||
|
||||
def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]:
|
||||
"""
|
||||
Find draft variables that reference non-existent apps.
|
||||
@ -1593,7 +1544,7 @@ def transform_datasource_credentials():
|
||||
if jina_plugin_id not in installed_plugins_ids:
|
||||
if jina_plugin_unique_identifier:
|
||||
# install jina plugin
|
||||
print(jina_plugin_unique_identifier)
|
||||
logger.debug("Installing Jina plugin %s", jina_plugin_unique_identifier)
|
||||
PluginService.install_from_marketplace_pkg(tenant_id, [jina_plugin_unique_identifier])
|
||||
|
||||
auth_count = 0
|
||||
|
||||
@ -154,17 +154,6 @@ class CodeExecutionSandboxConfig(BaseSettings):
|
||||
)
|
||||
|
||||
|
||||
class TriggerConfig(BaseSettings):
|
||||
"""
|
||||
Configuration for trigger
|
||||
"""
|
||||
|
||||
WEBHOOK_REQUEST_BODY_MAX_SIZE: PositiveInt = Field(
|
||||
description="Maximum allowed size for webhook request bodies in bytes",
|
||||
default=10485760,
|
||||
)
|
||||
|
||||
|
||||
class PluginConfig(BaseSettings):
|
||||
"""
|
||||
Plugin configs
|
||||
@ -961,22 +950,6 @@ class CeleryScheduleTasksConfig(BaseSettings):
|
||||
description="Enable check upgradable plugin task",
|
||||
default=True,
|
||||
)
|
||||
ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: bool = Field(
|
||||
description="Enable workflow schedule poller task",
|
||||
default=True,
|
||||
)
|
||||
WORKFLOW_SCHEDULE_POLLER_INTERVAL: int = Field(
|
||||
description="Workflow schedule poller interval in minutes",
|
||||
default=1,
|
||||
)
|
||||
WORKFLOW_SCHEDULE_POLLER_BATCH_SIZE: int = Field(
|
||||
description="Maximum number of schedules to process in each poll batch",
|
||||
default=100,
|
||||
)
|
||||
WORKFLOW_SCHEDULE_MAX_DISPATCH_PER_TICK: int = Field(
|
||||
description="Maximum schedules to dispatch per tick (0=unlimited, circuit breaker)",
|
||||
default=0,
|
||||
)
|
||||
|
||||
|
||||
class PositionConfig(BaseSettings):
|
||||
@ -1100,7 +1073,6 @@ class FeatureConfig(
|
||||
AuthConfig, # Changed from OAuthConfig to AuthConfig
|
||||
BillingConfig,
|
||||
CodeExecutionSandboxConfig,
|
||||
TriggerConfig,
|
||||
PluginConfig,
|
||||
MarketplaceConfig,
|
||||
DataSetConfig,
|
||||
|
||||
@ -9,8 +9,6 @@ if TYPE_CHECKING:
|
||||
from core.model_runtime.entities.model_entities import AIModelEntity
|
||||
from core.plugin.entities.plugin_daemon import PluginModelProviderEntity
|
||||
from core.tools.plugin_tool.provider import PluginToolProviderController
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
from core.workflow.entities.variable_pool import VariablePool
|
||||
|
||||
|
||||
"""
|
||||
@ -43,11 +41,3 @@ datasource_plugin_providers: RecyclableContextVar[dict[str, "DatasourcePluginPro
|
||||
datasource_plugin_providers_lock: RecyclableContextVar[Lock] = RecyclableContextVar(
|
||||
ContextVar("datasource_plugin_providers_lock")
|
||||
)
|
||||
|
||||
plugin_trigger_providers: RecyclableContextVar[dict[str, "PluginTriggerProviderController"]] = RecyclableContextVar(
|
||||
ContextVar("plugin_trigger_providers")
|
||||
)
|
||||
|
||||
plugin_trigger_providers_lock: RecyclableContextVar[Lock] = RecyclableContextVar(
|
||||
ContextVar("plugin_trigger_providers_lock")
|
||||
)
|
||||
|
||||
@ -87,7 +87,6 @@ from .app import (
|
||||
workflow_draft_variable,
|
||||
workflow_run,
|
||||
workflow_statistic,
|
||||
workflow_trigger,
|
||||
)
|
||||
|
||||
# Import auth controllers
|
||||
@ -224,21 +223,6 @@ api.add_resource(
|
||||
|
||||
api.add_namespace(console_ns)
|
||||
|
||||
# Import workspace controllers
|
||||
from .workspace import (
|
||||
account,
|
||||
agent_providers,
|
||||
endpoint,
|
||||
load_balancing_config,
|
||||
members,
|
||||
model_providers,
|
||||
models,
|
||||
plugin,
|
||||
tool_providers,
|
||||
trigger_providers,
|
||||
workspace,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"account",
|
||||
"activate",
|
||||
@ -304,7 +288,6 @@ __all__ = [
|
||||
"statistic",
|
||||
"tags",
|
||||
"tool_providers",
|
||||
"trigger_providers",
|
||||
"version",
|
||||
"website",
|
||||
"workflow",
|
||||
|
||||
@ -12,7 +12,6 @@ from controllers.console.app.error import (
|
||||
)
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
|
||||
from core.helper.code_executor.code_node_provider import CodeNodeProvider
|
||||
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
|
||||
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
|
||||
from core.llm_generator.llm_generator import LLMGenerator
|
||||
@ -199,11 +198,13 @@ class InstructionGenerateApi(Resource):
|
||||
parser.add_argument("model_config", type=dict, required=True, nullable=False, location="json")
|
||||
parser.add_argument("ideal_output", type=str, required=False, default="", location="json")
|
||||
args = parser.parse_args()
|
||||
providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
|
||||
code_provider: type[CodeNodeProvider] | None = next(
|
||||
(p for p in providers if p.is_accept_language(args["language"])), None
|
||||
code_template = (
|
||||
Python3CodeProvider.get_default_code()
|
||||
if args["language"] == "python"
|
||||
else (JavascriptCodeProvider.get_default_code())
|
||||
if args["language"] == "javascript"
|
||||
else ""
|
||||
)
|
||||
code_template = code_provider.get_default_code() if code_provider else ""
|
||||
try:
|
||||
# Generate from nothing for a workflow node
|
||||
if (args["current"] == code_template or args["current"] == "") and args["node_id"] != "":
|
||||
|
||||
@ -62,6 +62,9 @@ class ChatMessageListApi(Resource):
|
||||
@account_initialization_required
|
||||
@marshal_with(message_infinite_scroll_pagination_fields)
|
||||
def get(self, app_model):
|
||||
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("conversation_id", required=True, type=uuid_value, location="args")
|
||||
parser.add_argument("first_id", type=uuid_value, location="args")
|
||||
|
||||
@ -20,7 +20,6 @@ from core.app.apps.base_app_queue_manager import AppQueueManager
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.file.models import File
|
||||
from core.helper.trace_id_helper import get_external_trace_id
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from factories import file_factory, variable_factory
|
||||
@ -36,7 +35,6 @@ from models.workflow import Workflow
|
||||
from services.app_generate_service import AppGenerateService
|
||||
from services.errors.app import WorkflowHashNotEqualError
|
||||
from services.errors.llm import InvokeRateLimitError
|
||||
from services.trigger_debug_service import TriggerDebugService
|
||||
from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -1006,165 +1004,3 @@ class DraftWorkflowNodeLastRunApi(Resource):
|
||||
if node_exec is None:
|
||||
raise NotFound("last run not found")
|
||||
return node_exec
|
||||
|
||||
|
||||
class DraftWorkflowTriggerNodeApi(Resource):
|
||||
"""
|
||||
Single node debug - Polling API for trigger events
|
||||
Path: /apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger
|
||||
"""
|
||||
|
||||
@api.doc("poll_draft_workflow_trigger_node")
|
||||
@api.doc(description="Poll for trigger events and execute single node when event arrives")
|
||||
@api.doc(params={
|
||||
"app_id": "Application ID",
|
||||
"node_id": "Node ID"
|
||||
})
|
||||
@api.expect(
|
||||
api.model(
|
||||
"DraftWorkflowTriggerNodeRequest",
|
||||
{
|
||||
"trigger_name": fields.String(required=True, description="Trigger name"),
|
||||
"subscription_id": fields.String(required=True, description="Subscription ID"),
|
||||
}
|
||||
)
|
||||
)
|
||||
@api.response(200, "Trigger event received and node executed successfully")
|
||||
@api.response(403, "Permission denied")
|
||||
@api.response(500, "Internal server error")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.WORKFLOW])
|
||||
def post(self, app_model: App, node_id: str):
|
||||
"""
|
||||
Poll for trigger events and execute single node when event arrives
|
||||
"""
|
||||
if not isinstance(current_user, Account) or not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("trigger_name", type=str, required=True, location="json", nullable=False)
|
||||
parser.add_argument("subscription_id", type=str, required=True, location="json", nullable=False)
|
||||
args = parser.parse_args()
|
||||
trigger_name = args["trigger_name"]
|
||||
subscription_id = args["subscription_id"]
|
||||
|
||||
event = TriggerDebugService.poll_event(
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
subscription_id=subscription_id,
|
||||
node_id=node_id,
|
||||
trigger_name=trigger_name,
|
||||
)
|
||||
if not event:
|
||||
return jsonable_encoder({"status": "waiting"})
|
||||
|
||||
try:
|
||||
workflow_service = WorkflowService()
|
||||
draft_workflow = workflow_service.get_draft_workflow(app_model)
|
||||
if not draft_workflow:
|
||||
raise ValueError("Workflow not found")
|
||||
|
||||
user_inputs = event.model_dump()
|
||||
node_execution = workflow_service.run_draft_workflow_node(
|
||||
app_model=app_model,
|
||||
draft_workflow=draft_workflow,
|
||||
node_id=node_id,
|
||||
user_inputs=user_inputs,
|
||||
account=current_user,
|
||||
query="",
|
||||
files=[],
|
||||
)
|
||||
return jsonable_encoder(node_execution)
|
||||
except Exception:
|
||||
logger.exception("Error running draft workflow trigger node")
|
||||
return jsonable_encoder(
|
||||
{
|
||||
"status": "error",
|
||||
}
|
||||
), 500
|
||||
|
||||
|
||||
class DraftWorkflowTriggerRunApi(Resource):
|
||||
"""
|
||||
Full workflow debug - Polling API for trigger events
|
||||
Path: /apps/<uuid:app_id>/workflows/draft/trigger/run
|
||||
"""
|
||||
|
||||
@api.doc("poll_draft_workflow_trigger_run")
|
||||
@api.doc(description="Poll for trigger events and execute full workflow when event arrives")
|
||||
@api.doc(params={"app_id": "Application ID"})
|
||||
@api.expect(
|
||||
api.model(
|
||||
"DraftWorkflowTriggerRunRequest",
|
||||
{
|
||||
"node_id": fields.String(required=True, description="Node ID"),
|
||||
"trigger_name": fields.String(required=True, description="Trigger name"),
|
||||
"subscription_id": fields.String(required=True, description="Subscription ID"),
|
||||
}
|
||||
)
|
||||
)
|
||||
@api.response(200, "Trigger event received and workflow executed successfully")
|
||||
@api.response(403, "Permission denied")
|
||||
@api.response(500, "Internal server error")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=[AppMode.WORKFLOW])
|
||||
def post(self, app_model: App):
|
||||
"""
|
||||
Poll for trigger events and execute full workflow when event arrives
|
||||
"""
|
||||
if not isinstance(current_user, Account) or not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, location="json", nullable=False)
|
||||
parser.add_argument("trigger_name", type=str, required=True, location="json", nullable=False)
|
||||
parser.add_argument("subscription_id", type=str, required=True, location="json", nullable=False)
|
||||
args = parser.parse_args()
|
||||
node_id = args["node_id"]
|
||||
trigger_name = args["trigger_name"]
|
||||
subscription_id = args["subscription_id"]
|
||||
|
||||
event = TriggerDebugService.poll_event(
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
subscription_id=subscription_id,
|
||||
node_id=node_id,
|
||||
trigger_name=trigger_name,
|
||||
)
|
||||
if not event:
|
||||
return jsonable_encoder({"status": "waiting"})
|
||||
|
||||
workflow_args = {
|
||||
"inputs": event.model_dump(),
|
||||
"query": "",
|
||||
"files": [],
|
||||
}
|
||||
external_trace_id = get_external_trace_id(request)
|
||||
if external_trace_id:
|
||||
workflow_args["external_trace_id"] = external_trace_id
|
||||
|
||||
try:
|
||||
response = AppGenerateService.generate(
|
||||
app_model=app_model,
|
||||
user=current_user,
|
||||
args=workflow_args,
|
||||
invoke_from=InvokeFrom.DEBUGGER,
|
||||
streaming=True,
|
||||
)
|
||||
return helper.compact_generate_response(response)
|
||||
except InvokeRateLimitError as ex:
|
||||
raise InvokeRateLimitHttpError(ex.description)
|
||||
except Exception:
|
||||
logger.exception("Error running draft workflow trigger run")
|
||||
return jsonable_encoder(
|
||||
{
|
||||
"status": "error",
|
||||
}
|
||||
), 500
|
||||
|
||||
|
||||
@ -1,249 +0,0 @@
|
||||
import logging
|
||||
|
||||
from flask_restx import Resource, marshal_with, reqparse
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import Forbidden, NotFound
|
||||
|
||||
from configs import dify_config
|
||||
from controllers.console import api
|
||||
from controllers.console.app.wraps import get_app_model
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from extensions.ext_database import db
|
||||
from fields.workflow_trigger_fields import trigger_fields, triggers_list_fields, webhook_trigger_fields
|
||||
from libs.login import current_user, login_required
|
||||
from models.model import Account, AppMode
|
||||
from models.workflow import AppTrigger, AppTriggerStatus, WorkflowWebhookTrigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from services.workflow_plugin_trigger_service import WorkflowPluginTriggerService
|
||||
|
||||
|
||||
class PluginTriggerApi(Resource):
|
||||
"""Workflow Plugin Trigger API"""
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
def post(self, app_model):
|
||||
"""Create plugin trigger"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=False, location="json")
|
||||
parser.add_argument("provider_id", type=str, required=False, location="json")
|
||||
parser.add_argument("trigger_name", type=str, required=False, location="json")
|
||||
parser.add_argument("subscription_id", type=str, required=False, location="json")
|
||||
args = parser.parse_args()
|
||||
|
||||
assert isinstance(current_user, Account)
|
||||
assert current_user.current_tenant_id is not None
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
plugin_trigger = WorkflowPluginTriggerService.create_plugin_trigger(
|
||||
app_id=app_model.id,
|
||||
tenant_id=current_user.current_tenant_id,
|
||||
node_id=args["node_id"],
|
||||
provider_id=args["provider_id"],
|
||||
trigger_name=args["trigger_name"],
|
||||
subscription_id=args["subscription_id"],
|
||||
)
|
||||
|
||||
return jsonable_encoder(plugin_trigger)
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
def get(self, app_model):
|
||||
"""Get plugin trigger"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
|
||||
args = parser.parse_args()
|
||||
|
||||
plugin_trigger = WorkflowPluginTriggerService.get_plugin_trigger(
|
||||
app_id=app_model.id,
|
||||
node_id=args["node_id"],
|
||||
)
|
||||
|
||||
return jsonable_encoder(plugin_trigger)
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
def put(self, app_model):
|
||||
"""Update plugin trigger"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
|
||||
parser.add_argument("subscription_id", type=str, required=True, location="json", help="Subscription ID")
|
||||
args = parser.parse_args()
|
||||
|
||||
assert isinstance(current_user, Account)
|
||||
assert current_user.current_tenant_id is not None
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
plugin_trigger = WorkflowPluginTriggerService.update_plugin_trigger(
|
||||
app_id=app_model.id,
|
||||
node_id=args["node_id"],
|
||||
subscription_id=args["subscription_id"],
|
||||
)
|
||||
|
||||
return jsonable_encoder(plugin_trigger)
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
def delete(self, app_model):
|
||||
"""Delete plugin trigger"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
|
||||
args = parser.parse_args()
|
||||
|
||||
assert isinstance(current_user, Account)
|
||||
assert current_user.current_tenant_id is not None
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
WorkflowPluginTriggerService.delete_plugin_trigger(
|
||||
app_id=app_model.id,
|
||||
node_id=args["node_id"],
|
||||
)
|
||||
|
||||
return {"result": "success"}, 204
|
||||
|
||||
|
||||
class WebhookTriggerApi(Resource):
|
||||
"""Webhook Trigger API"""
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
@marshal_with(webhook_trigger_fields)
|
||||
def get(self, app_model):
|
||||
"""Get webhook trigger for a node"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
|
||||
args = parser.parse_args()
|
||||
|
||||
node_id = args["node_id"]
|
||||
|
||||
with Session(db.engine) as session:
|
||||
# Get webhook trigger for this app and node
|
||||
webhook_trigger = (
|
||||
session.query(WorkflowWebhookTrigger)
|
||||
.filter(
|
||||
WorkflowWebhookTrigger.app_id == app_model.id,
|
||||
WorkflowWebhookTrigger.node_id == node_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not webhook_trigger:
|
||||
raise NotFound("Webhook trigger not found for this node")
|
||||
|
||||
# Add computed fields for marshal_with
|
||||
base_url = dify_config.SERVICE_API_URL
|
||||
webhook_trigger.webhook_url = f"{base_url}/triggers/webhook/{webhook_trigger.webhook_id}" # type: ignore
|
||||
webhook_trigger.webhook_debug_url = f"{base_url}/triggers/webhook-debug/{webhook_trigger.webhook_id}" # type: ignore
|
||||
|
||||
return webhook_trigger
|
||||
|
||||
|
||||
class AppTriggersApi(Resource):
|
||||
"""App Triggers list API"""
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
@marshal_with(triggers_list_fields)
|
||||
def get(self, app_model):
|
||||
"""Get app triggers list"""
|
||||
assert isinstance(current_user, Account)
|
||||
assert current_user.current_tenant_id is not None
|
||||
|
||||
with Session(db.engine) as session:
|
||||
# Get all triggers for this app using select API
|
||||
triggers = (
|
||||
session.execute(
|
||||
select(AppTrigger)
|
||||
.where(
|
||||
AppTrigger.tenant_id == current_user.current_tenant_id,
|
||||
AppTrigger.app_id == app_model.id,
|
||||
)
|
||||
.order_by(AppTrigger.created_at.desc(), AppTrigger.id.desc())
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
# Add computed icon field for each trigger
|
||||
url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/builtin/"
|
||||
for trigger in triggers:
|
||||
if trigger.trigger_type == "trigger-plugin":
|
||||
trigger.icon = url_prefix + trigger.provider_name + "/icon" # type: ignore
|
||||
else:
|
||||
trigger.icon = "" # type: ignore
|
||||
|
||||
return {"data": triggers}
|
||||
|
||||
|
||||
class AppTriggerEnableApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model(mode=AppMode.WORKFLOW)
|
||||
@marshal_with(trigger_fields)
|
||||
def post(self, app_model):
|
||||
"""Update app trigger (enable/disable)"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("trigger_id", type=str, required=True, nullable=False, location="json")
|
||||
parser.add_argument("enable_trigger", type=bool, required=True, nullable=False, location="json")
|
||||
args = parser.parse_args()
|
||||
|
||||
assert isinstance(current_user, Account)
|
||||
assert current_user.current_tenant_id is not None
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
trigger_id = args["trigger_id"]
|
||||
|
||||
with Session(db.engine) as session:
|
||||
# Find the trigger using select
|
||||
trigger = session.execute(
|
||||
select(AppTrigger).where(
|
||||
AppTrigger.id == trigger_id,
|
||||
AppTrigger.tenant_id == current_user.current_tenant_id,
|
||||
AppTrigger.app_id == app_model.id,
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
|
||||
if not trigger:
|
||||
raise NotFound("Trigger not found")
|
||||
|
||||
# Update status based on enable_trigger boolean
|
||||
trigger.status = AppTriggerStatus.ENABLED if args["enable_trigger"] else AppTriggerStatus.DISABLED
|
||||
|
||||
session.commit()
|
||||
session.refresh(trigger)
|
||||
|
||||
# Add computed icon field
|
||||
url_prefix = dify_config.CONSOLE_API_URL + "/console/api/workspaces/current/tool-provider/builtin/"
|
||||
if trigger.trigger_type == "trigger-plugin":
|
||||
trigger.icon = url_prefix + trigger.provider_name + "/icon" # type: ignore
|
||||
else:
|
||||
trigger.icon = "" # type: ignore
|
||||
|
||||
return trigger
|
||||
|
||||
|
||||
api.add_resource(WebhookTriggerApi, "/apps/<uuid:app_id>/workflows/triggers/webhook")
|
||||
api.add_resource(PluginTriggerApi, "/apps/<uuid:app_id>/workflows/triggers/plugin")
|
||||
api.add_resource(AppTriggersApi, "/apps/<uuid:app_id>/triggers")
|
||||
api.add_resource(AppTriggerEnableApi, "/apps/<uuid:app_id>/trigger-enable")
|
||||
@ -516,20 +516,18 @@ class PluginFetchDynamicSelectOptionsApi(Resource):
|
||||
parser.add_argument("provider", type=str, required=True, location="args")
|
||||
parser.add_argument("action", type=str, required=True, location="args")
|
||||
parser.add_argument("parameter", type=str, required=True, location="args")
|
||||
parser.add_argument("credential_id", type=str, required=False, location="args")
|
||||
parser.add_argument("provider_type", type=str, required=True, location="args")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
options = PluginParameterService.get_dynamic_select_options(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
plugin_id=args["plugin_id"],
|
||||
provider=args["provider"],
|
||||
action=args["action"],
|
||||
parameter=args["parameter"],
|
||||
credential_id=args["credential_id"],
|
||||
provider_type=args["provider_type"],
|
||||
tenant_id,
|
||||
user_id,
|
||||
args["plugin_id"],
|
||||
args["provider"],
|
||||
args["action"],
|
||||
args["parameter"],
|
||||
args["provider_type"],
|
||||
)
|
||||
except PluginDaemonClientSideError as e:
|
||||
raise ValueError(e)
|
||||
|
||||
@ -21,8 +21,8 @@ from core.mcp.auth.auth_provider import OAuthClientProvider
|
||||
from core.mcp.error import MCPAuthError, MCPError
|
||||
from core.mcp.mcp_client import MCPClient
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.impl.oauth import OAuthHandler
|
||||
from core.tools.entities.tool_entities import CredentialType
|
||||
from libs.helper import StrLen, alphanumeric, uuid_value
|
||||
from libs.login import login_required
|
||||
from models.provider_ids import ToolProviderID
|
||||
|
||||
@ -1,589 +0,0 @@
|
||||
import logging
|
||||
|
||||
from flask import make_response, redirect, request
|
||||
from flask_restx import Resource, reqparse
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import BadRequest, Forbidden
|
||||
|
||||
from configs import dify_config
|
||||
from controllers.console import api
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.impl.oauth import OAuthHandler
|
||||
from core.trigger.entities.entities import SubscriptionBuilderUpdater
|
||||
from core.trigger.trigger_manager import TriggerManager
|
||||
from extensions.ext_database import db
|
||||
from libs.login import current_user, login_required
|
||||
from models.account import Account
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from services.plugin.oauth_service import OAuthProxyService
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_subscription_builder_service import TriggerSubscriptionBuilderService
|
||||
from services.workflow_plugin_trigger_service import WorkflowPluginTriggerService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TriggerProviderListApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self):
|
||||
"""List all trigger providers for the current tenant"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
return jsonable_encoder(TriggerProviderService.list_trigger_providers(user.current_tenant_id))
|
||||
|
||||
|
||||
class TriggerProviderInfoApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, provider):
|
||||
"""Get info for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
return jsonable_encoder(
|
||||
TriggerProviderService.get_trigger_provider(user.current_tenant_id, TriggerProviderID(provider))
|
||||
)
|
||||
|
||||
|
||||
class TriggerSubscriptionListApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, provider):
|
||||
"""List all trigger subscriptions for the current tenant's provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
try:
|
||||
return jsonable_encoder(
|
||||
TriggerProviderService.list_trigger_provider_subscriptions(
|
||||
tenant_id=user.current_tenant_id, provider_id=TriggerProviderID(provider)
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Error listing trigger providers", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerSubscriptionBuilderCreateApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, provider):
|
||||
"""Add a new subscription instance for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("credential_type", type=str, required=False, nullable=True, location="json")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
credential_type = CredentialType.of(args.get("credential_type") or CredentialType.UNAUTHORIZED.value)
|
||||
subscription_builder = TriggerSubscriptionBuilderService.create_trigger_subscription_builder(
|
||||
tenant_id=user.current_tenant_id,
|
||||
user_id=user.id,
|
||||
provider_id=TriggerProviderID(provider),
|
||||
credential_type=credential_type,
|
||||
)
|
||||
return jsonable_encoder({"subscription_builder": subscription_builder})
|
||||
except ValueError as e:
|
||||
raise BadRequest(str(e))
|
||||
except Exception as e:
|
||||
logger.exception("Error adding provider credential", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerSubscriptionBuilderGetApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, provider, subscription_builder_id):
|
||||
"""Get a subscription instance for a trigger provider"""
|
||||
return jsonable_encoder(
|
||||
TriggerSubscriptionBuilderService.get_subscription_builder_by_id(subscription_builder_id)
|
||||
)
|
||||
|
||||
|
||||
class TriggerSubscriptionBuilderVerifyApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, provider, subscription_builder_id):
|
||||
"""Verify a subscription instance for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
# The credentials of the subscription builder
|
||||
parser.add_argument("credentials", type=dict, required=False, nullable=True, location="json")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
TriggerSubscriptionBuilderService.update_trigger_subscription_builder(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=TriggerProviderID(provider),
|
||||
subscription_builder_id=subscription_builder_id,
|
||||
subscription_builder_updater=SubscriptionBuilderUpdater(
|
||||
credentials=args.get("credentials", None),
|
||||
),
|
||||
)
|
||||
return TriggerSubscriptionBuilderService.verify_trigger_subscription_builder(
|
||||
tenant_id=user.current_tenant_id,
|
||||
user_id=user.id,
|
||||
provider_id=TriggerProviderID(provider),
|
||||
subscription_builder_id=subscription_builder_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Error verifying provider credential", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerSubscriptionBuilderUpdateApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, provider, subscription_builder_id):
|
||||
"""Update a subscription instance for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
# The name of the subscription builder
|
||||
parser.add_argument("name", type=str, required=False, nullable=True, location="json")
|
||||
# The parameters of the subscription builder
|
||||
parser.add_argument("parameters", type=dict, required=False, nullable=True, location="json")
|
||||
# The properties of the subscription builder
|
||||
parser.add_argument("properties", type=dict, required=False, nullable=True, location="json")
|
||||
# The credentials of the subscription builder
|
||||
parser.add_argument("credentials", type=dict, required=False, nullable=True, location="json")
|
||||
args = parser.parse_args()
|
||||
try:
|
||||
return jsonable_encoder(
|
||||
TriggerSubscriptionBuilderService.update_trigger_subscription_builder(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=TriggerProviderID(provider),
|
||||
subscription_builder_id=subscription_builder_id,
|
||||
subscription_builder_updater=SubscriptionBuilderUpdater(
|
||||
name=args.get("name", None),
|
||||
parameters=args.get("parameters", None),
|
||||
properties=args.get("properties", None),
|
||||
credentials=args.get("credentials", None),
|
||||
),
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Error updating provider credential", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerSubscriptionBuilderLogsApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, provider, subscription_builder_id):
|
||||
"""Get the request logs for a subscription instance for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
|
||||
try:
|
||||
logs = TriggerSubscriptionBuilderService.list_logs(subscription_builder_id)
|
||||
return jsonable_encoder({"logs": [log.model_dump(mode="json") for log in logs]})
|
||||
except Exception as e:
|
||||
logger.exception("Error getting request logs for subscription builder", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerSubscriptionBuilderBuildApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, provider, subscription_builder_id):
|
||||
"""Build a subscription instance for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
# The name of the subscription builder
|
||||
parser.add_argument("name", type=str, required=False, nullable=True, location="json")
|
||||
# The parameters of the subscription builder
|
||||
parser.add_argument("parameters", type=dict, required=False, nullable=True, location="json")
|
||||
# The properties of the subscription builder
|
||||
parser.add_argument("properties", type=dict, required=False, nullable=True, location="json")
|
||||
# The credentials of the subscription builder
|
||||
parser.add_argument("credentials", type=dict, required=False, nullable=True, location="json")
|
||||
args = parser.parse_args()
|
||||
try:
|
||||
TriggerSubscriptionBuilderService.update_trigger_subscription_builder(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=TriggerProviderID(provider),
|
||||
subscription_builder_id=subscription_builder_id,
|
||||
subscription_builder_updater=SubscriptionBuilderUpdater(
|
||||
name=args.get("name", None),
|
||||
parameters=args.get("parameters", None),
|
||||
properties=args.get("properties", None),
|
||||
),
|
||||
)
|
||||
TriggerSubscriptionBuilderService.build_trigger_subscription_builder(
|
||||
tenant_id=user.current_tenant_id,
|
||||
user_id=user.id,
|
||||
provider_id=TriggerProviderID(provider),
|
||||
subscription_builder_id=subscription_builder_id,
|
||||
)
|
||||
return 200
|
||||
except ValueError as e:
|
||||
raise BadRequest(str(e))
|
||||
except Exception as e:
|
||||
logger.exception("Error building provider credential", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerSubscriptionDeleteApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, subscription_id):
|
||||
"""Delete a subscription instance"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
try:
|
||||
with Session(db.engine) as session:
|
||||
# Delete trigger provider subscription
|
||||
TriggerProviderService.delete_trigger_provider(
|
||||
session=session,
|
||||
tenant_id=user.current_tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
)
|
||||
# Delete plugin triggers
|
||||
WorkflowPluginTriggerService.delete_plugin_trigger_by_subscription(
|
||||
session=session,
|
||||
tenant_id=user.current_tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
)
|
||||
session.commit()
|
||||
return {"result": "success"}
|
||||
except ValueError as e:
|
||||
raise BadRequest(str(e))
|
||||
except Exception as e:
|
||||
logger.exception("Error deleting provider credential", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerOAuthAuthorizeApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, provider):
|
||||
"""Initiate OAuth authorization flow for a trigger provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
|
||||
try:
|
||||
provider_id = TriggerProviderID(provider)
|
||||
plugin_id = provider_id.plugin_id
|
||||
provider_name = provider_id.provider_name
|
||||
tenant_id = user.current_tenant_id
|
||||
|
||||
# Get OAuth client configuration
|
||||
oauth_client_params = TriggerProviderService.get_oauth_client(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=provider_id,
|
||||
)
|
||||
|
||||
if oauth_client_params is None:
|
||||
raise Forbidden("No OAuth client configuration found for this trigger provider")
|
||||
|
||||
# Create subscription builder
|
||||
subscription_builder = TriggerSubscriptionBuilderService.create_trigger_subscription_builder(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user.id,
|
||||
provider_id=provider_id,
|
||||
credential_type=CredentialType.OAUTH2,
|
||||
)
|
||||
|
||||
# Create OAuth handler and proxy context
|
||||
oauth_handler = OAuthHandler()
|
||||
context_id = OAuthProxyService.create_proxy_context(
|
||||
user_id=user.id,
|
||||
tenant_id=tenant_id,
|
||||
plugin_id=plugin_id,
|
||||
provider=provider_name,
|
||||
extra_data={
|
||||
"subscription_builder_id": subscription_builder.id,
|
||||
},
|
||||
)
|
||||
|
||||
# Build redirect URI for callback
|
||||
redirect_uri = f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{provider}/trigger/callback"
|
||||
|
||||
# Get authorization URL
|
||||
authorization_url_response = oauth_handler.get_authorization_url(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user.id,
|
||||
plugin_id=plugin_id,
|
||||
provider=provider_name,
|
||||
redirect_uri=redirect_uri,
|
||||
system_credentials=oauth_client_params,
|
||||
)
|
||||
|
||||
# Create response with cookie
|
||||
response = make_response(
|
||||
jsonable_encoder(
|
||||
{
|
||||
"authorization_url": authorization_url_response.authorization_url,
|
||||
"subscription_builder_id": subscription_builder.id,
|
||||
"subscription_builder": subscription_builder,
|
||||
}
|
||||
)
|
||||
)
|
||||
response.set_cookie(
|
||||
"context_id",
|
||||
context_id,
|
||||
httponly=True,
|
||||
samesite="Lax",
|
||||
max_age=OAuthProxyService.__MAX_AGE__,
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Error initiating OAuth flow", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
class TriggerOAuthCallbackApi(Resource):
|
||||
@setup_required
|
||||
def get(self, provider):
|
||||
"""Handle OAuth callback for trigger provider"""
|
||||
context_id = request.cookies.get("context_id")
|
||||
if not context_id:
|
||||
raise Forbidden("context_id not found")
|
||||
|
||||
# Use and validate proxy context
|
||||
context = OAuthProxyService.use_proxy_context(context_id)
|
||||
if context is None:
|
||||
raise Forbidden("Invalid context_id")
|
||||
|
||||
# Parse provider ID
|
||||
provider_id = TriggerProviderID(provider)
|
||||
plugin_id = provider_id.plugin_id
|
||||
provider_name = provider_id.provider_name
|
||||
user_id = context.get("user_id")
|
||||
tenant_id = context.get("tenant_id")
|
||||
subscription_builder_id = context.get("subscription_builder_id")
|
||||
|
||||
# Get OAuth client configuration
|
||||
oauth_client_params = TriggerProviderService.get_oauth_client(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=provider_id,
|
||||
)
|
||||
|
||||
if oauth_client_params is None:
|
||||
raise Forbidden("No OAuth client configuration found for this trigger provider")
|
||||
|
||||
# Get OAuth credentials from callback
|
||||
oauth_handler = OAuthHandler()
|
||||
redirect_uri = f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{provider}/trigger/callback"
|
||||
|
||||
credentials_response = oauth_handler.get_credentials(
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
plugin_id=plugin_id,
|
||||
provider=provider_name,
|
||||
redirect_uri=redirect_uri,
|
||||
system_credentials=oauth_client_params,
|
||||
request=request,
|
||||
)
|
||||
|
||||
credentials = credentials_response.credentials
|
||||
expires_at = credentials_response.expires_at
|
||||
|
||||
if not credentials:
|
||||
raise Exception("Failed to get OAuth credentials")
|
||||
|
||||
# Update subscription builder
|
||||
TriggerSubscriptionBuilderService.update_trigger_subscription_builder(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=provider_id,
|
||||
subscription_builder_id=subscription_builder_id,
|
||||
subscription_builder_updater=SubscriptionBuilderUpdater(
|
||||
credentials=credentials,
|
||||
credential_expires_at=expires_at,
|
||||
),
|
||||
)
|
||||
# Redirect to OAuth callback page
|
||||
return redirect(f"{dify_config.CONSOLE_WEB_URL}/oauth-callback")
|
||||
|
||||
|
||||
class TriggerOAuthClientManageApi(Resource):
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, provider):
|
||||
"""Get OAuth client configuration for a provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
try:
|
||||
provider_id = TriggerProviderID(provider)
|
||||
|
||||
# Get custom OAuth client params if exists
|
||||
custom_params = TriggerProviderService.get_custom_oauth_client_params(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=provider_id,
|
||||
)
|
||||
|
||||
# Check if custom client is enabled
|
||||
is_custom_enabled = TriggerProviderService.is_oauth_custom_client_enabled(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=provider_id,
|
||||
)
|
||||
|
||||
# Check if there's a system OAuth client
|
||||
system_client = TriggerProviderService.get_oauth_client(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=provider_id,
|
||||
)
|
||||
provider_controller = TriggerManager.get_trigger_provider(user.current_tenant_id, provider_id)
|
||||
redirect_uri = f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{provider}/trigger/callback"
|
||||
return jsonable_encoder(
|
||||
{
|
||||
"configured": bool(custom_params or system_client),
|
||||
"oauth_client_schema": provider_controller.get_oauth_client_schema(),
|
||||
"custom_configured": bool(custom_params),
|
||||
"custom_enabled": is_custom_enabled,
|
||||
"redirect_uri": redirect_uri,
|
||||
"params": custom_params or {},
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Error getting OAuth client", exc_info=e)
|
||||
raise
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, provider):
|
||||
"""Configure custom OAuth client for a provider"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("client_params", type=dict, required=False, nullable=True, location="json")
|
||||
parser.add_argument("enabled", type=bool, required=False, nullable=True, location="json")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
provider_id = TriggerProviderID(provider)
|
||||
return TriggerProviderService.save_custom_oauth_client_params(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=provider_id,
|
||||
client_params=args.get("client_params"),
|
||||
enabled=args.get("enabled"),
|
||||
)
|
||||
|
||||
except ValueError as e:
|
||||
raise BadRequest(str(e))
|
||||
except Exception as e:
|
||||
logger.exception("Error configuring OAuth client", exc_info=e)
|
||||
raise
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def delete(self, provider):
|
||||
"""Remove custom OAuth client configuration"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
assert user.current_tenant_id is not None
|
||||
if not user.is_admin_or_owner:
|
||||
raise Forbidden()
|
||||
|
||||
try:
|
||||
provider_id = TriggerProviderID(provider)
|
||||
|
||||
return TriggerProviderService.delete_custom_oauth_client_params(
|
||||
tenant_id=user.current_tenant_id,
|
||||
provider_id=provider_id,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise BadRequest(str(e))
|
||||
except Exception as e:
|
||||
logger.exception("Error removing OAuth client", exc_info=e)
|
||||
raise
|
||||
|
||||
|
||||
# Trigger Subscription
|
||||
api.add_resource(TriggerProviderListApi, "/workspaces/current/triggers")
|
||||
api.add_resource(TriggerProviderInfoApi, "/workspaces/current/trigger-provider/<path:provider>/info")
|
||||
api.add_resource(TriggerSubscriptionListApi, "/workspaces/current/trigger-provider/<path:provider>/subscriptions/list")
|
||||
api.add_resource(
|
||||
TriggerSubscriptionDeleteApi,
|
||||
"/workspaces/current/trigger-provider/<path:subscription_id>/subscriptions/delete",
|
||||
)
|
||||
|
||||
# Trigger Subscription Builder
|
||||
api.add_resource(
|
||||
TriggerSubscriptionBuilderCreateApi,
|
||||
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/create",
|
||||
)
|
||||
api.add_resource(
|
||||
TriggerSubscriptionBuilderGetApi,
|
||||
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/<path:subscription_builder_id>",
|
||||
)
|
||||
api.add_resource(
|
||||
TriggerSubscriptionBuilderUpdateApi,
|
||||
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/update/<path:subscription_builder_id>",
|
||||
)
|
||||
api.add_resource(
|
||||
TriggerSubscriptionBuilderVerifyApi,
|
||||
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/verify/<path:subscription_builder_id>",
|
||||
)
|
||||
api.add_resource(
|
||||
TriggerSubscriptionBuilderBuildApi,
|
||||
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/build/<path:subscription_builder_id>",
|
||||
)
|
||||
api.add_resource(
|
||||
TriggerSubscriptionBuilderLogsApi,
|
||||
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/logs/<path:subscription_builder_id>",
|
||||
)
|
||||
|
||||
|
||||
# OAuth
|
||||
api.add_resource(
|
||||
TriggerOAuthAuthorizeApi, "/workspaces/current/trigger-provider/<path:provider>/subscriptions/oauth/authorize"
|
||||
)
|
||||
api.add_resource(TriggerOAuthCallbackApi, "/oauth/plugin/<path:provider>/trigger/callback")
|
||||
api.add_resource(TriggerOAuthClientManageApi, "/workspaces/current/trigger-provider/<path:provider>/oauth/client")
|
||||
@ -9,9 +9,10 @@ from controllers.console.app.mcp_server import AppMCPServerStatus
|
||||
from controllers.mcp import mcp_ns
|
||||
from core.app.app_config.entities import VariableEntity
|
||||
from core.mcp import types as mcp_types
|
||||
from core.mcp.server.streamable_http import handle_mcp_request
|
||||
from extensions.ext_database import db
|
||||
from libs import helper
|
||||
from models.model import App, AppMCPServer, AppMode
|
||||
from models.model import App, AppMCPServer, AppMode, EndUser
|
||||
|
||||
|
||||
class MCPRequestError(Exception):
|
||||
@ -194,6 +195,50 @@ class MCPAppApi(Resource):
|
||||
except ValidationError as e:
|
||||
raise MCPRequestError(mcp_types.INVALID_PARAMS, f"Invalid MCP request: {str(e)}")
|
||||
|
||||
mcp_server_handler = MCPServerStreamableHTTPRequestHandler(app, request, converted_user_input_form)
|
||||
response = mcp_server_handler.handle()
|
||||
return helper.compact_generate_response(response)
|
||||
def _retrieve_end_user(self, tenant_id: str, mcp_server_id: str, session: Session) -> EndUser | None:
|
||||
"""Get end user from existing session - optimized query"""
|
||||
return (
|
||||
session.query(EndUser)
|
||||
.where(EndUser.tenant_id == tenant_id)
|
||||
.where(EndUser.session_id == mcp_server_id)
|
||||
.where(EndUser.type == "mcp")
|
||||
.first()
|
||||
)
|
||||
|
||||
def _create_end_user(
|
||||
self, client_name: str, tenant_id: str, app_id: str, mcp_server_id: str, session: Session
|
||||
) -> EndUser:
|
||||
"""Create end user in existing session"""
|
||||
end_user = EndUser(
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
type="mcp",
|
||||
name=client_name,
|
||||
session_id=mcp_server_id,
|
||||
)
|
||||
session.add(end_user)
|
||||
session.flush() # Use flush instead of commit to keep transaction open
|
||||
session.refresh(end_user)
|
||||
return end_user
|
||||
|
||||
def _handle_mcp_request(
|
||||
self,
|
||||
app: App,
|
||||
mcp_server: AppMCPServer,
|
||||
mcp_request: mcp_types.ClientRequest,
|
||||
user_input_form: list[VariableEntity],
|
||||
session: Session,
|
||||
request_id: Union[int, str],
|
||||
) -> mcp_types.JSONRPCResponse | mcp_types.JSONRPCError | None:
|
||||
"""Handle MCP request and return response"""
|
||||
end_user = self._retrieve_end_user(mcp_server.tenant_id, mcp_server.id, session)
|
||||
|
||||
if not end_user and isinstance(mcp_request.root, mcp_types.InitializeRequest):
|
||||
client_info = mcp_request.root.params.clientInfo
|
||||
client_name = f"{client_info.name}@{client_info.version}"
|
||||
# Commit the session before creating end user to avoid transaction conflicts
|
||||
session.commit()
|
||||
with Session(db.engine, expire_on_commit=False) as create_session, create_session.begin():
|
||||
end_user = self._create_end_user(client_name, app.tenant_id, app.id, mcp_server.id, create_session)
|
||||
|
||||
return handle_mcp_request(app, mcp_request, user_input_form, mcp_server, end_user, request_id)
|
||||
|
||||
@ -1,7 +0,0 @@
|
||||
from flask import Blueprint
|
||||
|
||||
# Create trigger blueprint
|
||||
bp = Blueprint("trigger", __name__, url_prefix="/triggers")
|
||||
|
||||
# Import routes after blueprint creation to avoid circular imports
|
||||
from . import trigger, webhook
|
||||
@ -1,41 +0,0 @@
|
||||
import logging
|
||||
import re
|
||||
|
||||
from flask import jsonify, request
|
||||
from werkzeug.exceptions import NotFound
|
||||
|
||||
from controllers.trigger import bp
|
||||
from services.trigger.trigger_subscription_builder_service import TriggerSubscriptionBuilderService
|
||||
from services.trigger_service import TriggerService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
UUID_PATTERN = r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
|
||||
UUID_MATCHER = re.compile(UUID_PATTERN)
|
||||
|
||||
|
||||
@bp.route("/plugin/<string:endpoint_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
|
||||
def trigger_endpoint(endpoint_id: str):
|
||||
"""
|
||||
Handle endpoint trigger calls.
|
||||
"""
|
||||
# endpoint_id must be UUID
|
||||
if not UUID_MATCHER.match(endpoint_id):
|
||||
raise NotFound("Invalid endpoint ID")
|
||||
handling_chain = [
|
||||
TriggerService.process_endpoint,
|
||||
TriggerSubscriptionBuilderService.process_builder_validation_endpoint,
|
||||
]
|
||||
try:
|
||||
for handler in handling_chain:
|
||||
response = handler(endpoint_id, request)
|
||||
if response:
|
||||
break
|
||||
if not response:
|
||||
raise NotFound("Endpoint not found")
|
||||
return response
|
||||
except ValueError as e:
|
||||
raise NotFound(str(e))
|
||||
except Exception as e:
|
||||
logger.exception("Webhook processing failed for {endpoint_id}")
|
||||
return jsonify({"error": "Internal server error", "message": str(e)}), 500
|
||||
@ -1,46 +0,0 @@
|
||||
import logging
|
||||
|
||||
from flask import jsonify
|
||||
from werkzeug.exceptions import NotFound, RequestEntityTooLarge
|
||||
|
||||
from controllers.trigger import bp
|
||||
from services.webhook_service import WebhookService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@bp.route("/webhook/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
|
||||
@bp.route("/webhook-debug/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
|
||||
def handle_webhook(webhook_id: str):
|
||||
"""
|
||||
Handle webhook trigger calls.
|
||||
|
||||
This endpoint receives webhook calls and processes them according to the
|
||||
configured webhook trigger settings.
|
||||
"""
|
||||
try:
|
||||
# Get webhook trigger, workflow, and node configuration
|
||||
webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id)
|
||||
|
||||
# Extract request data
|
||||
webhook_data = WebhookService.extract_webhook_data(webhook_trigger)
|
||||
|
||||
# Validate request against node configuration
|
||||
validation_result = WebhookService.validate_webhook_request(webhook_data, node_config)
|
||||
if not validation_result["valid"]:
|
||||
return jsonify({"error": "Bad Request", "message": validation_result["error"]}), 400
|
||||
|
||||
# Process webhook call (send to Celery)
|
||||
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow)
|
||||
|
||||
# Return configured response
|
||||
response_data, status_code = WebhookService.generate_webhook_response(node_config)
|
||||
return jsonify(response_data), status_code
|
||||
|
||||
except ValueError as e:
|
||||
raise NotFound(str(e))
|
||||
except RequestEntityTooLarge:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception("Webhook processing failed for %s", webhook_id)
|
||||
return jsonify({"error": "Internal server error", "message": str(e)}), 500
|
||||
@ -261,6 +261,8 @@ class MessageSuggestedQuestionApi(WebApiResource):
|
||||
questions = MessageService.get_suggested_questions_after_answer(
|
||||
app_model=app_model, user=end_user, message_id=message_id, invoke_from=InvokeFrom.WEB_APP
|
||||
)
|
||||
# questions is a list of strings, not a list of Message objects
|
||||
# so we can directly return it
|
||||
except MessageNotExistsError:
|
||||
raise NotFound("Message not found")
|
||||
except ConversationNotExistsError:
|
||||
|
||||
@ -3,7 +3,7 @@ import logging
|
||||
import threading
|
||||
import uuid
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from typing import Any, Literal, Optional, Union, overload
|
||||
from typing import Any, Literal, Union, overload
|
||||
|
||||
from flask import Flask, current_app
|
||||
from pydantic import ValidationError
|
||||
@ -53,8 +53,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
invoke_from: InvokeFrom,
|
||||
streaming: Literal[True],
|
||||
call_depth: int,
|
||||
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> Generator[Mapping | str, None, None]: ...
|
||||
|
||||
@overload
|
||||
@ -68,8 +66,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
invoke_from: InvokeFrom,
|
||||
streaming: Literal[False],
|
||||
call_depth: int,
|
||||
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> Mapping[str, Any]: ...
|
||||
|
||||
@overload
|
||||
@ -83,8 +79,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
invoke_from: InvokeFrom,
|
||||
streaming: bool,
|
||||
call_depth: int,
|
||||
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ...
|
||||
|
||||
def generate(
|
||||
@ -97,8 +91,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
invoke_from: InvokeFrom,
|
||||
streaming: bool = True,
|
||||
call_depth: int = 0,
|
||||
triggered_from: Optional[WorkflowRunTriggeredFrom] = None,
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]:
|
||||
files: Sequence[Mapping[str, Any]] = args.get("files") or []
|
||||
|
||||
@ -127,26 +119,24 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
app_id=app_model.id,
|
||||
user_id=user.id if isinstance(user, Account) else user.session_id,
|
||||
)
|
||||
|
||||
inputs: Mapping[str, Any] = args["inputs"]
|
||||
|
||||
extras = {
|
||||
**extract_external_trace_id_from_args(args),
|
||||
}
|
||||
workflow_run_id = str(uuid.uuid4())
|
||||
if triggered_from in (WorkflowRunTriggeredFrom.DEBUGGING, WorkflowRunTriggeredFrom.APP_RUN):
|
||||
# start node get inputs
|
||||
inputs = self._prepare_user_inputs(
|
||||
user_inputs=inputs,
|
||||
variables=app_config.variables,
|
||||
tenant_id=app_model.tenant_id,
|
||||
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
|
||||
)
|
||||
# init application generate entity
|
||||
application_generate_entity = WorkflowAppGenerateEntity(
|
||||
task_id=str(uuid.uuid4()),
|
||||
app_config=app_config,
|
||||
file_upload_config=file_extra_config,
|
||||
inputs=inputs,
|
||||
inputs=self._prepare_user_inputs(
|
||||
user_inputs=inputs,
|
||||
variables=app_config.variables,
|
||||
tenant_id=app_model.tenant_id,
|
||||
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
|
||||
),
|
||||
files=list(system_files),
|
||||
user_id=user.id,
|
||||
stream=streaming,
|
||||
@ -165,10 +155,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
# Create session factory
|
||||
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
|
||||
# Create workflow execution(aka workflow run) repository
|
||||
if triggered_from is not None:
|
||||
# Use explicitly provided triggered_from (for async triggers)
|
||||
workflow_triggered_from = triggered_from
|
||||
elif invoke_from == InvokeFrom.DEBUGGER:
|
||||
if invoke_from == InvokeFrom.DEBUGGER:
|
||||
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
|
||||
else:
|
||||
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
|
||||
@ -195,7 +182,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
workflow_execution_repository=workflow_execution_repository,
|
||||
workflow_node_execution_repository=workflow_node_execution_repository,
|
||||
streaming=streaming,
|
||||
root_node_id=root_node_id,
|
||||
)
|
||||
|
||||
def _generate(
|
||||
@ -210,7 +196,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
|
||||
streaming: bool = True,
|
||||
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
|
||||
"""
|
||||
Generate App response.
|
||||
@ -246,7 +231,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
"queue_manager": queue_manager,
|
||||
"context": context,
|
||||
"variable_loader": variable_loader,
|
||||
"root_node_id": root_node_id,
|
||||
},
|
||||
)
|
||||
|
||||
@ -440,16 +424,15 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
queue_manager: AppQueueManager,
|
||||
context: contextvars.Context,
|
||||
variable_loader: VariableLoader,
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Generate worker in a new thread.
|
||||
:param flask_app: Flask app
|
||||
:param application_generate_entity: application generate entity
|
||||
:param queue_manager: queue manager
|
||||
:param workflow_thread_pool_id: workflow thread pool id
|
||||
:return:
|
||||
"""
|
||||
|
||||
with preserve_flask_contexts(flask_app, context_vars=context):
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
workflow = session.scalar(
|
||||
@ -482,7 +465,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
variable_loader=variable_loader,
|
||||
workflow=workflow,
|
||||
system_user_id=system_user_id,
|
||||
root_node_id=root_node_id,
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
@ -34,7 +34,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
variable_loader: VariableLoader,
|
||||
workflow: Workflow,
|
||||
system_user_id: str,
|
||||
root_node_id: str | None = None,
|
||||
):
|
||||
super().__init__(
|
||||
queue_manager=queue_manager,
|
||||
@ -44,7 +43,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
self.application_generate_entity = application_generate_entity
|
||||
self._workflow = workflow
|
||||
self._sys_user_id = system_user_id
|
||||
self._root_node_id = root_node_id
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
@ -107,7 +105,6 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
workflow_id=self._workflow.id,
|
||||
tenant_id=self._workflow.tenant_id,
|
||||
root_node_id=self._root_node_id,
|
||||
user_id=self.application_generate_entity.user_id,
|
||||
)
|
||||
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Optional, cast
|
||||
from typing import Any, cast
|
||||
|
||||
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
@ -79,7 +79,6 @@ class WorkflowBasedAppRunner:
|
||||
workflow_id: str = "",
|
||||
tenant_id: str = "",
|
||||
user_id: str = "",
|
||||
root_node_id: Optional[str] = None,
|
||||
) -> Graph:
|
||||
"""
|
||||
Init graph
|
||||
@ -113,7 +112,7 @@ class WorkflowBasedAppRunner:
|
||||
)
|
||||
|
||||
# init graph
|
||||
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
|
||||
graph = Graph.init(graph_config=graph_config, node_factory=node_factory)
|
||||
|
||||
if not graph:
|
||||
raise ValueError("graph not found in workflow")
|
||||
@ -162,7 +161,7 @@ class WorkflowBasedAppRunner:
|
||||
edge
|
||||
for edge in graph_config.get("edges", [])
|
||||
if (edge.get("source") is None or edge.get("source") in node_ids)
|
||||
and (edge.get("target") is None or edge.get("target") in node_ids)
|
||||
and (edge.get("target") is None or edge.get("target") in node_ids)
|
||||
]
|
||||
|
||||
graph_config["edges"] = edge_configs
|
||||
@ -277,7 +276,7 @@ class WorkflowBasedAppRunner:
|
||||
edge
|
||||
for edge in graph_config.get("edges", [])
|
||||
if (edge.get("source") is None or edge.get("source") in node_ids)
|
||||
and (edge.get("target") is None or edge.get("target") in node_ids)
|
||||
and (edge.get("target") is None or edge.get("target") in node_ids)
|
||||
]
|
||||
|
||||
graph_config["edges"] = edge_configs
|
||||
|
||||
@ -207,7 +207,6 @@ class ProviderConfig(BasicProviderConfig):
|
||||
required: bool = False
|
||||
default: Union[int, str, float, bool] | None = None
|
||||
options: list[Option] | None = None
|
||||
multiple: bool | None = False
|
||||
label: I18nObject | None = None
|
||||
help: I18nObject | None = None
|
||||
url: str | None = None
|
||||
|
||||
@ -3,7 +3,7 @@ import re
|
||||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.tools.entities.tool_entities import CredentialType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -1,128 +0,0 @@
|
||||
import contextlib
|
||||
from copy import deepcopy
|
||||
from typing import Any, Optional, Protocol
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.helper import encrypter
|
||||
|
||||
|
||||
class ProviderConfigCache(Protocol):
|
||||
"""
|
||||
Interface for provider configuration cache operations
|
||||
"""
|
||||
|
||||
def get(self) -> Optional[dict]:
|
||||
"""Get cached provider configuration"""
|
||||
...
|
||||
|
||||
def set(self, config: dict[str, Any]) -> None:
|
||||
"""Cache provider configuration"""
|
||||
...
|
||||
|
||||
def delete(self) -> None:
|
||||
"""Delete cached provider configuration"""
|
||||
...
|
||||
|
||||
|
||||
class ProviderConfigEncrypter:
|
||||
tenant_id: str
|
||||
config: list[BasicProviderConfig]
|
||||
provider_config_cache: ProviderConfigCache
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tenant_id: str,
|
||||
config: list[BasicProviderConfig],
|
||||
provider_config_cache: ProviderConfigCache,
|
||||
):
|
||||
self.tenant_id = tenant_id
|
||||
self.config = config
|
||||
self.provider_config_cache = provider_config_cache
|
||||
|
||||
def _deep_copy(self, data: dict[str, str]) -> dict[str, str]:
|
||||
"""
|
||||
deep copy data
|
||||
"""
|
||||
return deepcopy(data)
|
||||
|
||||
def encrypt(self, data: dict[str, str]) -> dict[str, str]:
|
||||
"""
|
||||
encrypt tool credentials with tenant id
|
||||
|
||||
return a deep copy of credentials with encrypted values
|
||||
"""
|
||||
data = self._deep_copy(data)
|
||||
|
||||
# get fields need to be decrypted
|
||||
fields = dict[str, BasicProviderConfig]()
|
||||
for credential in self.config:
|
||||
fields[credential.name] = credential
|
||||
|
||||
for field_name, field in fields.items():
|
||||
if field.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if field_name in data:
|
||||
encrypted = encrypter.encrypt_token(self.tenant_id, data[field_name] or "")
|
||||
data[field_name] = encrypted
|
||||
|
||||
return data
|
||||
|
||||
def mask_credentials(self, data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
mask credentials
|
||||
|
||||
return a deep copy of credentials with masked values
|
||||
"""
|
||||
data = self._deep_copy(data)
|
||||
|
||||
# get fields need to be decrypted
|
||||
fields = dict[str, BasicProviderConfig]()
|
||||
for credential in self.config:
|
||||
fields[credential.name] = credential
|
||||
|
||||
for field_name, field in fields.items():
|
||||
if field.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if field_name in data:
|
||||
if len(data[field_name]) > 6:
|
||||
data[field_name] = (
|
||||
data[field_name][:2] + "*" * (len(data[field_name]) - 4) + data[field_name][-2:]
|
||||
)
|
||||
else:
|
||||
data[field_name] = "*" * len(data[field_name])
|
||||
|
||||
return data
|
||||
|
||||
def mask_tool_credentials(self, data: dict[str, Any]) -> dict[str, Any]:
|
||||
return self.mask_credentials(data)
|
||||
|
||||
def decrypt(self, data: dict[str, str]) -> dict[str, Any]:
|
||||
"""
|
||||
decrypt tool credentials with tenant id
|
||||
|
||||
return a deep copy of credentials with decrypted values
|
||||
"""
|
||||
cached_credentials = self.provider_config_cache.get()
|
||||
if cached_credentials:
|
||||
return cached_credentials
|
||||
|
||||
data = self._deep_copy(data)
|
||||
# get fields need to be decrypted
|
||||
fields = dict[str, BasicProviderConfig]()
|
||||
for credential in self.config:
|
||||
fields[credential.name] = credential
|
||||
|
||||
for field_name, field in fields.items():
|
||||
if field.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if field_name in data:
|
||||
with contextlib.suppress(Exception):
|
||||
# if the value is None or empty string, skip decrypt
|
||||
if not data[field_name]:
|
||||
continue
|
||||
|
||||
data[field_name] = encrypter.decrypt_token(self.tenant_id, data[field_name])
|
||||
|
||||
self.provider_config_cache.set(data)
|
||||
return data
|
||||
|
||||
|
||||
def create_provider_encrypter(tenant_id: str, config: list[BasicProviderConfig], cache: ProviderConfigCache):
|
||||
return ProviderConfigEncrypter(tenant_id=tenant_id, config=config, provider_config_cache=cache), cache
|
||||
@ -417,7 +417,7 @@ class WeaveDataTrace(BaseTraceInstance):
|
||||
if not login_status:
|
||||
raise ValueError("Weave login failed")
|
||||
else:
|
||||
print("Weave login successful")
|
||||
logger.info("Weave login successful")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug("Weave API check failed: %s", str(e))
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import datetime
|
||||
from collections.abc import Mapping
|
||||
from enum import StrEnum, auto
|
||||
from typing import Any, Optional
|
||||
from typing import Any
|
||||
|
||||
from packaging.version import InvalidVersion, Version
|
||||
from pydantic import BaseModel, Field, field_validator, model_validator
|
||||
@ -13,7 +13,6 @@ from core.plugin.entities.base import BasePluginEntity
|
||||
from core.plugin.entities.endpoint import EndpointProviderDeclaration
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.tools.entities.tool_entities import ToolProviderEntity
|
||||
from core.trigger.entities.entities import TriggerProviderEntity
|
||||
|
||||
|
||||
class PluginInstallationSource(StrEnum):
|
||||
@ -28,56 +27,54 @@ class PluginResourceRequirements(BaseModel):
|
||||
|
||||
class Permission(BaseModel):
|
||||
class Tool(BaseModel):
|
||||
enabled: Optional[bool] = Field(default=False)
|
||||
enabled: bool | None = Field(default=False)
|
||||
|
||||
class Model(BaseModel):
|
||||
enabled: Optional[bool] = Field(default=False)
|
||||
llm: Optional[bool] = Field(default=False)
|
||||
text_embedding: Optional[bool] = Field(default=False)
|
||||
rerank: Optional[bool] = Field(default=False)
|
||||
tts: Optional[bool] = Field(default=False)
|
||||
speech2text: Optional[bool] = Field(default=False)
|
||||
moderation: Optional[bool] = Field(default=False)
|
||||
enabled: bool | None = Field(default=False)
|
||||
llm: bool | None = Field(default=False)
|
||||
text_embedding: bool | None = Field(default=False)
|
||||
rerank: bool | None = Field(default=False)
|
||||
tts: bool | None = Field(default=False)
|
||||
speech2text: bool | None = Field(default=False)
|
||||
moderation: bool | None = Field(default=False)
|
||||
|
||||
class Node(BaseModel):
|
||||
enabled: Optional[bool] = Field(default=False)
|
||||
enabled: bool | None = Field(default=False)
|
||||
|
||||
class Endpoint(BaseModel):
|
||||
enabled: Optional[bool] = Field(default=False)
|
||||
enabled: bool | None = Field(default=False)
|
||||
|
||||
class Storage(BaseModel):
|
||||
enabled: Optional[bool] = Field(default=False)
|
||||
enabled: bool | None = Field(default=False)
|
||||
size: int = Field(ge=1024, le=1073741824, default=1048576)
|
||||
|
||||
tool: Optional[Tool] = Field(default=None)
|
||||
model: Optional[Model] = Field(default=None)
|
||||
node: Optional[Node] = Field(default=None)
|
||||
endpoint: Optional[Endpoint] = Field(default=None)
|
||||
storage: Optional[Storage] = Field(default=None)
|
||||
tool: Tool | None = Field(default=None)
|
||||
model: Model | None = Field(default=None)
|
||||
node: Node | None = Field(default=None)
|
||||
endpoint: Endpoint | None = Field(default=None)
|
||||
storage: Storage | None = Field(default=None)
|
||||
|
||||
permission: Optional[Permission] = Field(default=None)
|
||||
permission: Permission | None = Field(default=None)
|
||||
|
||||
|
||||
class PluginCategory(StrEnum):
|
||||
Tool = auto()
|
||||
Model = auto()
|
||||
Extension = auto()
|
||||
AgentStrategy = auto()
|
||||
Datasource = auto()
|
||||
Trigger = auto()
|
||||
AgentStrategy = "agent-strategy"
|
||||
Datasource = "datasource"
|
||||
|
||||
|
||||
class PluginDeclaration(BaseModel):
|
||||
class Plugins(BaseModel):
|
||||
tools: Optional[list[str]] = Field(default_factory=list[str])
|
||||
models: Optional[list[str]] = Field(default_factory=list[str])
|
||||
endpoints: Optional[list[str]] = Field(default_factory=list[str])
|
||||
triggers: Optional[list[str]] = Field(default_factory=list[str])
|
||||
tools: list[str] | None = Field(default_factory=list[str])
|
||||
models: list[str] | None = Field(default_factory=list[str])
|
||||
endpoints: list[str] | None = Field(default_factory=list[str])
|
||||
datasources: list[str] | None = Field(default_factory=list[str])
|
||||
|
||||
class Meta(BaseModel):
|
||||
minimum_dify_version: Optional[str] = Field(default=None, pattern=r"^\d{1,4}(\.\d{1,4}){1,3}(-\w{1,16})?$")
|
||||
version: Optional[str] = Field(default=None)
|
||||
minimum_dify_version: str | None = Field(default=None)
|
||||
version: str | None = Field(default=None)
|
||||
|
||||
@field_validator("minimum_dify_version")
|
||||
@classmethod
|
||||
@ -90,26 +87,25 @@ class PluginDeclaration(BaseModel):
|
||||
except InvalidVersion as e:
|
||||
raise ValueError(f"Invalid version format: {v}") from e
|
||||
|
||||
version: str = Field(..., pattern=r"^\d{1,4}(\.\d{1,4}){1,3}(-\w{1,16})?$")
|
||||
author: Optional[str] = Field(..., pattern=r"^[a-zA-Z0-9_-]{1,64}$")
|
||||
version: str = Field(...)
|
||||
author: str | None = Field(..., pattern=r"^[a-zA-Z0-9_-]{1,64}$")
|
||||
name: str = Field(..., pattern=r"^[a-z0-9_-]{1,128}$")
|
||||
description: I18nObject
|
||||
icon: str
|
||||
icon_dark: Optional[str] = Field(default=None)
|
||||
icon_dark: str | None = Field(default=None)
|
||||
label: I18nObject
|
||||
category: PluginCategory
|
||||
created_at: datetime.datetime
|
||||
resource: PluginResourceRequirements
|
||||
plugins: Plugins
|
||||
tags: list[str] = Field(default_factory=list)
|
||||
repo: Optional[str] = Field(default=None)
|
||||
repo: str | None = Field(default=None)
|
||||
verified: bool = Field(default=False)
|
||||
tool: Optional[ToolProviderEntity] = None
|
||||
model: Optional[ProviderEntity] = None
|
||||
endpoint: Optional[EndpointProviderDeclaration] = None
|
||||
agent_strategy: Optional[AgentStrategyProviderEntity] = None
|
||||
tool: ToolProviderEntity | None = None
|
||||
model: ProviderEntity | None = None
|
||||
endpoint: EndpointProviderDeclaration | None = None
|
||||
agent_strategy: AgentStrategyProviderEntity | None = None
|
||||
datasource: DatasourceProviderEntity | None = None
|
||||
trigger: Optional[TriggerProviderEntity] = None
|
||||
meta: Meta
|
||||
|
||||
@field_validator("version")
|
||||
@ -123,7 +119,7 @@ class PluginDeclaration(BaseModel):
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_category(cls, values: dict) -> dict:
|
||||
def validate_category(cls, values: dict):
|
||||
# auto detect category
|
||||
if values.get("tool"):
|
||||
values["category"] = PluginCategory.Tool
|
||||
@ -133,8 +129,6 @@ class PluginDeclaration(BaseModel):
|
||||
values["category"] = PluginCategory.Datasource
|
||||
elif values.get("agent_strategy"):
|
||||
values["category"] = PluginCategory.AgentStrategy
|
||||
elif values.get("trigger"):
|
||||
values["category"] = PluginCategory.Trigger
|
||||
else:
|
||||
values["category"] = PluginCategory.Extension
|
||||
return values
|
||||
@ -196,9 +190,9 @@ class PluginDependency(BaseModel):
|
||||
|
||||
type: Type
|
||||
value: Github | Marketplace | Package
|
||||
current_identifier: Optional[str] = None
|
||||
current_identifier: str | None = None
|
||||
|
||||
|
||||
class MissingPluginDependency(BaseModel):
|
||||
plugin_unique_identifier: str
|
||||
current_identifier: Optional[str] = None
|
||||
current_identifier: str | None = None
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
from collections.abc import Mapping, Sequence
|
||||
from datetime import datetime
|
||||
from enum import StrEnum, auto
|
||||
from enum import StrEnum
|
||||
from typing import Any, Generic, TypeVar
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
@ -14,7 +14,6 @@ from core.plugin.entities.parameters import PluginParameterOption
|
||||
from core.plugin.entities.plugin import PluginDeclaration, PluginEntity
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.tools.entities.tool_entities import ToolProviderEntityWithPlugin
|
||||
from core.trigger.entities.entities import TriggerProviderEntity
|
||||
|
||||
T = TypeVar("T", bound=(BaseModel | dict | list | bool | str))
|
||||
|
||||
@ -206,49 +205,3 @@ class PluginListResponse(BaseModel):
|
||||
|
||||
class PluginDynamicSelectOptionsResponse(BaseModel):
|
||||
options: Sequence[PluginParameterOption] = Field(description="The options of the dynamic select.")
|
||||
|
||||
|
||||
class PluginTriggerProviderEntity(BaseModel):
|
||||
provider: str
|
||||
plugin_unique_identifier: str
|
||||
plugin_id: str
|
||||
declaration: TriggerProviderEntity
|
||||
|
||||
|
||||
class CredentialType(StrEnum):
|
||||
API_KEY = "api-key"
|
||||
OAUTH2 = auto()
|
||||
UNAUTHORIZED = auto()
|
||||
|
||||
def get_name(self):
|
||||
if self == CredentialType.API_KEY:
|
||||
return "API KEY"
|
||||
elif self == CredentialType.OAUTH2:
|
||||
return "AUTH"
|
||||
elif self == CredentialType.UNAUTHORIZED:
|
||||
return "UNAUTHORIZED"
|
||||
else:
|
||||
return self.value.replace("-", " ").upper()
|
||||
|
||||
def is_editable(self):
|
||||
return self == CredentialType.API_KEY
|
||||
|
||||
def is_validate_allowed(self):
|
||||
return self == CredentialType.API_KEY
|
||||
|
||||
@classmethod
|
||||
def values(cls):
|
||||
return [item.value for item in cls]
|
||||
|
||||
@classmethod
|
||||
def of(cls, credential_type: str) -> "CredentialType":
|
||||
type_name = credential_type.lower()
|
||||
if type_name in {"api-key", "api_key"}:
|
||||
return cls.API_KEY
|
||||
elif type_name in {"oauth2", "oauth"}:
|
||||
return cls.OAUTH2
|
||||
elif type_name == "unauthorized":
|
||||
return cls.UNAUTHORIZED
|
||||
else:
|
||||
raise ValueError(f"Invalid credential type: {credential_type}")
|
||||
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Literal
|
||||
|
||||
from flask import Response
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
@ -239,33 +237,3 @@ class RequestFetchAppInfo(BaseModel):
|
||||
"""
|
||||
|
||||
app_id: str
|
||||
|
||||
|
||||
class Event(BaseModel):
|
||||
variables: Mapping[str, Any]
|
||||
|
||||
|
||||
class TriggerInvokeResponse(BaseModel):
|
||||
event: Event
|
||||
|
||||
|
||||
class PluginTriggerDispatchResponse(BaseModel):
|
||||
triggers: list[str]
|
||||
raw_http_response: str
|
||||
|
||||
|
||||
class TriggerSubscriptionResponse(BaseModel):
|
||||
subscription: dict[str, Any]
|
||||
|
||||
|
||||
class TriggerValidateProviderCredentialsResponse(BaseModel):
|
||||
result: bool
|
||||
|
||||
|
||||
class TriggerDispatchResponse:
|
||||
triggers: list[str]
|
||||
response: Response
|
||||
|
||||
def __init__(self, triggers: list[str], response: Response):
|
||||
self.triggers = triggers
|
||||
self.response = response
|
||||
|
||||
@ -15,7 +15,6 @@ class DynamicSelectClient(BasePluginClient):
|
||||
provider: str,
|
||||
action: str,
|
||||
credentials: Mapping[str, Any],
|
||||
credential_type: str,
|
||||
parameter: str,
|
||||
) -> PluginDynamicSelectOptionsResponse:
|
||||
"""
|
||||
@ -30,7 +29,6 @@ class DynamicSelectClient(BasePluginClient):
|
||||
"data": {
|
||||
"provider": GenericProviderID(provider).provider_name,
|
||||
"credentials": credentials,
|
||||
"credential_type": credential_type,
|
||||
"provider_action": action,
|
||||
"parameter": parameter,
|
||||
},
|
||||
|
||||
@ -4,14 +4,13 @@ from typing import Any
|
||||
from pydantic import BaseModel
|
||||
|
||||
from core.plugin.entities.plugin_daemon import (
|
||||
CredentialType,
|
||||
PluginBasicBooleanResponse,
|
||||
PluginToolProviderEntity,
|
||||
)
|
||||
from core.plugin.impl.base import BasePluginClient
|
||||
from core.plugin.utils.chunk_merger import merge_blob_chunks
|
||||
from core.schemas.resolver import resolve_dify_schema_refs
|
||||
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolParameter
|
||||
from core.tools.entities.tool_entities import CredentialType, ToolInvokeMessage, ToolParameter
|
||||
from models.provider_ids import GenericProviderID, ToolProviderID
|
||||
|
||||
|
||||
|
||||
@ -1,301 +0,0 @@
|
||||
import binascii
|
||||
from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from flask import Request
|
||||
|
||||
from core.plugin.entities.plugin_daemon import CredentialType, PluginTriggerProviderEntity
|
||||
from core.plugin.entities.request import (
|
||||
PluginTriggerDispatchResponse,
|
||||
TriggerDispatchResponse,
|
||||
TriggerInvokeResponse,
|
||||
TriggerSubscriptionResponse,
|
||||
TriggerValidateProviderCredentialsResponse,
|
||||
)
|
||||
from core.plugin.impl.base import BasePluginClient
|
||||
from core.plugin.utils.http_parser import deserialize_response, serialize_request
|
||||
from core.trigger.entities.entities import Subscription
|
||||
from models.provider_ids import GenericProviderID, TriggerProviderID
|
||||
|
||||
|
||||
class PluginTriggerManager(BasePluginClient):
|
||||
def fetch_trigger_providers(self, tenant_id: str) -> list[PluginTriggerProviderEntity]:
|
||||
"""
|
||||
Fetch trigger providers for the given tenant.
|
||||
"""
|
||||
|
||||
def transformer(json_response: dict[str, Any]) -> dict:
|
||||
for provider in json_response.get("data", []):
|
||||
declaration = provider.get("declaration", {}) or {}
|
||||
provider_id = provider.get("plugin_id") + "/" + provider.get("provider")
|
||||
for trigger in declaration.get("triggers", []):
|
||||
trigger["identity"]["provider"] = provider_id
|
||||
|
||||
return json_response
|
||||
|
||||
response = self._request_with_plugin_daemon_response(
|
||||
"GET",
|
||||
f"plugin/{tenant_id}/management/triggers",
|
||||
list[PluginTriggerProviderEntity],
|
||||
params={"page": 1, "page_size": 256},
|
||||
transformer=transformer,
|
||||
)
|
||||
|
||||
for provider in response:
|
||||
provider.declaration.identity.name = f"{provider.plugin_id}/{provider.declaration.identity.name}"
|
||||
|
||||
# override the provider name for each trigger to plugin_id/provider_name
|
||||
for trigger in provider.declaration.triggers:
|
||||
trigger.identity.provider = provider.declaration.identity.name
|
||||
|
||||
return response
|
||||
|
||||
def fetch_trigger_provider(self, tenant_id: str, provider_id: TriggerProviderID) -> PluginTriggerProviderEntity:
|
||||
"""
|
||||
Fetch trigger provider for the given tenant and plugin.
|
||||
"""
|
||||
|
||||
def transformer(json_response: dict[str, Any]) -> dict:
|
||||
data = json_response.get("data")
|
||||
if data:
|
||||
for trigger in data.get("declaration", {}).get("triggers", []):
|
||||
trigger["identity"]["provider"] = str(provider_id)
|
||||
|
||||
return json_response
|
||||
|
||||
response = self._request_with_plugin_daemon_response(
|
||||
"GET",
|
||||
f"plugin/{tenant_id}/management/trigger",
|
||||
PluginTriggerProviderEntity,
|
||||
params={"provider": provider_id.provider_name, "plugin_id": provider_id.plugin_id},
|
||||
transformer=transformer,
|
||||
)
|
||||
|
||||
response.declaration.identity.name = str(provider_id)
|
||||
|
||||
# override the provider name for each trigger to plugin_id/provider_name
|
||||
for trigger in response.declaration.triggers:
|
||||
trigger.identity.provider = str(provider_id)
|
||||
|
||||
return response
|
||||
|
||||
def invoke_trigger(
|
||||
self,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider: str,
|
||||
trigger: str,
|
||||
credentials: Mapping[str, str],
|
||||
credential_type: CredentialType,
|
||||
request: Request,
|
||||
parameters: Mapping[str, Any],
|
||||
) -> TriggerInvokeResponse:
|
||||
"""
|
||||
Invoke a trigger with the given parameters.
|
||||
"""
|
||||
trigger_provider_id = GenericProviderID(provider)
|
||||
|
||||
response = self._request_with_plugin_daemon_response_stream(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/dispatch/trigger/invoke",
|
||||
TriggerInvokeResponse,
|
||||
data={
|
||||
"user_id": user_id,
|
||||
"data": {
|
||||
"provider": trigger_provider_id.provider_name,
|
||||
"trigger": trigger,
|
||||
"credentials": credentials,
|
||||
"credential_type": credential_type,
|
||||
"raw_http_request": binascii.hexlify(serialize_request(request)).decode(),
|
||||
"parameters": parameters,
|
||||
},
|
||||
},
|
||||
headers={
|
||||
"X-Plugin-ID": trigger_provider_id.plugin_id,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
for resp in response:
|
||||
return TriggerInvokeResponse(event=resp.event)
|
||||
|
||||
raise ValueError("No response received from plugin daemon for invoke trigger")
|
||||
|
||||
def validate_provider_credentials(
|
||||
self, tenant_id: str, user_id: str, provider: str, credentials: Mapping[str, str]
|
||||
) -> bool:
|
||||
"""
|
||||
Validate the credentials of the trigger provider.
|
||||
"""
|
||||
trigger_provider_id = GenericProviderID(provider)
|
||||
|
||||
response = self._request_with_plugin_daemon_response_stream(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/dispatch/trigger/validate_credentials",
|
||||
TriggerValidateProviderCredentialsResponse,
|
||||
data={
|
||||
"user_id": user_id,
|
||||
"data": {
|
||||
"provider": trigger_provider_id.provider_name,
|
||||
"credentials": credentials,
|
||||
},
|
||||
},
|
||||
headers={
|
||||
"X-Plugin-ID": trigger_provider_id.plugin_id,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
for resp in response:
|
||||
return resp.result
|
||||
|
||||
raise ValueError("No response received from plugin daemon for validate provider credentials")
|
||||
|
||||
def dispatch_event(
|
||||
self,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider: str,
|
||||
subscription: Mapping[str, Any],
|
||||
request: Request,
|
||||
) -> TriggerDispatchResponse:
|
||||
"""
|
||||
Dispatch an event to triggers.
|
||||
"""
|
||||
trigger_provider_id = GenericProviderID(provider)
|
||||
|
||||
response = self._request_with_plugin_daemon_response_stream(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/dispatch/trigger/dispatch_event",
|
||||
PluginTriggerDispatchResponse,
|
||||
data={
|
||||
"user_id": user_id,
|
||||
"data": {
|
||||
"provider": trigger_provider_id.provider_name,
|
||||
"subscription": subscription,
|
||||
"raw_http_request": binascii.hexlify(serialize_request(request)).decode(),
|
||||
},
|
||||
},
|
||||
headers={
|
||||
"X-Plugin-ID": trigger_provider_id.plugin_id,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
for resp in response:
|
||||
return TriggerDispatchResponse(
|
||||
triggers=resp.triggers,
|
||||
response=deserialize_response(binascii.unhexlify(resp.raw_http_response.encode())),
|
||||
)
|
||||
|
||||
raise ValueError("No response received from plugin daemon for dispatch event")
|
||||
|
||||
def subscribe(
|
||||
self,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider: str,
|
||||
credentials: Mapping[str, str],
|
||||
endpoint: str,
|
||||
parameters: Mapping[str, Any],
|
||||
) -> TriggerSubscriptionResponse:
|
||||
"""
|
||||
Subscribe to a trigger.
|
||||
"""
|
||||
trigger_provider_id = GenericProviderID(provider)
|
||||
|
||||
response = self._request_with_plugin_daemon_response_stream(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/dispatch/trigger/subscribe",
|
||||
TriggerSubscriptionResponse,
|
||||
data={
|
||||
"user_id": user_id,
|
||||
"data": {
|
||||
"provider": trigger_provider_id.provider_name,
|
||||
"credentials": credentials,
|
||||
"endpoint": endpoint,
|
||||
"parameters": parameters,
|
||||
},
|
||||
},
|
||||
headers={
|
||||
"X-Plugin-ID": trigger_provider_id.plugin_id,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
for resp in response:
|
||||
return resp
|
||||
|
||||
raise ValueError("No response received from plugin daemon for subscribe")
|
||||
|
||||
def unsubscribe(
|
||||
self,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider: str,
|
||||
subscription: Subscription,
|
||||
credentials: Mapping[str, str],
|
||||
) -> TriggerSubscriptionResponse:
|
||||
"""
|
||||
Unsubscribe from a trigger.
|
||||
"""
|
||||
trigger_provider_id = GenericProviderID(provider)
|
||||
|
||||
response = self._request_with_plugin_daemon_response_stream(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/dispatch/trigger/unsubscribe",
|
||||
TriggerSubscriptionResponse,
|
||||
data={
|
||||
"user_id": user_id,
|
||||
"data": {
|
||||
"provider": trigger_provider_id.provider_name,
|
||||
"subscription": subscription.model_dump(),
|
||||
"credentials": credentials,
|
||||
},
|
||||
},
|
||||
headers={
|
||||
"X-Plugin-ID": trigger_provider_id.plugin_id,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
for resp in response:
|
||||
return resp
|
||||
|
||||
raise ValueError("No response received from plugin daemon for unsubscribe")
|
||||
|
||||
def refresh(
|
||||
self,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider: str,
|
||||
subscription: Subscription,
|
||||
credentials: Mapping[str, str],
|
||||
) -> TriggerSubscriptionResponse:
|
||||
"""
|
||||
Refresh a trigger subscription.
|
||||
"""
|
||||
trigger_provider_id = GenericProviderID(provider)
|
||||
|
||||
response = self._request_with_plugin_daemon_response_stream(
|
||||
"POST",
|
||||
f"plugin/{tenant_id}/dispatch/trigger/refresh",
|
||||
TriggerSubscriptionResponse,
|
||||
data={
|
||||
"user_id": user_id,
|
||||
"data": {
|
||||
"provider": trigger_provider_id.provider_name,
|
||||
"subscription": subscription.model_dump(),
|
||||
"credentials": credentials,
|
||||
},
|
||||
},
|
||||
headers={
|
||||
"X-Plugin-ID": trigger_provider_id.plugin_id,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
for resp in response:
|
||||
return resp
|
||||
|
||||
raise ValueError("No response received from plugin daemon for refresh")
|
||||
@ -1,159 +0,0 @@
|
||||
from io import BytesIO
|
||||
|
||||
from flask import Request, Response
|
||||
from werkzeug.datastructures import Headers
|
||||
|
||||
|
||||
def serialize_request(request: Request) -> bytes:
|
||||
method = request.method
|
||||
path = request.full_path.rstrip("?")
|
||||
raw = f"{method} {path} HTTP/1.1\r\n".encode()
|
||||
|
||||
for name, value in request.headers.items():
|
||||
raw += f"{name}: {value}\r\n".encode()
|
||||
|
||||
raw += b"\r\n"
|
||||
|
||||
body = request.get_data(as_text=False)
|
||||
if body:
|
||||
raw += body
|
||||
|
||||
return raw
|
||||
|
||||
|
||||
def deserialize_request(raw_data: bytes) -> Request:
|
||||
header_end = raw_data.find(b"\r\n\r\n")
|
||||
if header_end == -1:
|
||||
header_end = raw_data.find(b"\n\n")
|
||||
if header_end == -1:
|
||||
header_data = raw_data
|
||||
body = b""
|
||||
else:
|
||||
header_data = raw_data[:header_end]
|
||||
body = raw_data[header_end + 2 :]
|
||||
else:
|
||||
header_data = raw_data[:header_end]
|
||||
body = raw_data[header_end + 4 :]
|
||||
|
||||
lines = header_data.split(b"\r\n")
|
||||
if len(lines) == 1 and b"\n" in lines[0]:
|
||||
lines = header_data.split(b"\n")
|
||||
|
||||
if not lines or not lines[0]:
|
||||
raise ValueError("Empty HTTP request")
|
||||
|
||||
request_line = lines[0].decode("utf-8", errors="ignore")
|
||||
parts = request_line.split(" ", 2)
|
||||
if len(parts) < 2:
|
||||
raise ValueError(f"Invalid request line: {request_line}")
|
||||
|
||||
method = parts[0]
|
||||
full_path = parts[1]
|
||||
protocol = parts[2] if len(parts) > 2 else "HTTP/1.1"
|
||||
|
||||
if "?" in full_path:
|
||||
path, query_string = full_path.split("?", 1)
|
||||
else:
|
||||
path = full_path
|
||||
query_string = ""
|
||||
|
||||
headers = Headers()
|
||||
for line in lines[1:]:
|
||||
if not line:
|
||||
continue
|
||||
line_str = line.decode("utf-8", errors="ignore")
|
||||
if ":" not in line_str:
|
||||
continue
|
||||
name, value = line_str.split(":", 1)
|
||||
headers.add(name, value.strip())
|
||||
|
||||
host = headers.get("Host", "localhost")
|
||||
if ":" in host:
|
||||
server_name, server_port = host.rsplit(":", 1)
|
||||
else:
|
||||
server_name = host
|
||||
server_port = "80"
|
||||
|
||||
environ = {
|
||||
"REQUEST_METHOD": method,
|
||||
"PATH_INFO": path,
|
||||
"QUERY_STRING": query_string,
|
||||
"SERVER_NAME": server_name,
|
||||
"SERVER_PORT": server_port,
|
||||
"SERVER_PROTOCOL": protocol,
|
||||
"wsgi.input": BytesIO(body),
|
||||
"wsgi.url_scheme": "http",
|
||||
}
|
||||
|
||||
if "Content-Type" in headers:
|
||||
environ["CONTENT_TYPE"] = headers.get("Content-Type")
|
||||
|
||||
if "Content-Length" in headers:
|
||||
environ["CONTENT_LENGTH"] = headers.get("Content-Length")
|
||||
elif body:
|
||||
environ["CONTENT_LENGTH"] = str(len(body))
|
||||
|
||||
for name, value in headers.items():
|
||||
if name.upper() in ("CONTENT-TYPE", "CONTENT-LENGTH"):
|
||||
continue
|
||||
env_name = f"HTTP_{name.upper().replace('-', '_')}"
|
||||
environ[env_name] = value
|
||||
|
||||
return Request(environ)
|
||||
|
||||
|
||||
def serialize_response(response: Response) -> bytes:
|
||||
raw = f"HTTP/1.1 {response.status}\r\n".encode()
|
||||
|
||||
for name, value in response.headers.items():
|
||||
raw += f"{name}: {value}\r\n".encode()
|
||||
|
||||
raw += b"\r\n"
|
||||
|
||||
body = response.get_data(as_text=False)
|
||||
if body:
|
||||
raw += body
|
||||
|
||||
return raw
|
||||
|
||||
|
||||
def deserialize_response(raw_data: bytes) -> Response:
|
||||
header_end = raw_data.find(b"\r\n\r\n")
|
||||
if header_end == -1:
|
||||
header_end = raw_data.find(b"\n\n")
|
||||
if header_end == -1:
|
||||
header_data = raw_data
|
||||
body = b""
|
||||
else:
|
||||
header_data = raw_data[:header_end]
|
||||
body = raw_data[header_end + 2 :]
|
||||
else:
|
||||
header_data = raw_data[:header_end]
|
||||
body = raw_data[header_end + 4 :]
|
||||
|
||||
lines = header_data.split(b"\r\n")
|
||||
if len(lines) == 1 and b"\n" in lines[0]:
|
||||
lines = header_data.split(b"\n")
|
||||
|
||||
if not lines or not lines[0]:
|
||||
raise ValueError("Empty HTTP response")
|
||||
|
||||
status_line = lines[0].decode("utf-8", errors="ignore")
|
||||
parts = status_line.split(" ", 2)
|
||||
if len(parts) < 2:
|
||||
raise ValueError(f"Invalid status line: {status_line}")
|
||||
|
||||
status_code = int(parts[1])
|
||||
|
||||
response = Response(response=body, status=status_code)
|
||||
|
||||
for line in lines[1:]:
|
||||
if not line:
|
||||
continue
|
||||
line_str = line.decode("utf-8", errors="ignore")
|
||||
if ":" not in line_str:
|
||||
continue
|
||||
name, value = line_str.split(":", 1)
|
||||
response.headers[name] = value.strip()
|
||||
|
||||
return response
|
||||
@ -229,7 +229,7 @@ class OceanBaseVector(BaseVector):
|
||||
try:
|
||||
metadata = json.loads(metadata_str)
|
||||
except json.JSONDecodeError:
|
||||
print(f"Invalid JSON metadata: {metadata_str}")
|
||||
logger.warning("Invalid JSON metadata: %s", metadata_str)
|
||||
metadata = {}
|
||||
metadata["score"] = score
|
||||
docs.append(Document(page_content=_text, metadata=metadata))
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import array
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from typing import Any
|
||||
@ -19,6 +20,8 @@ from core.rag.models.document import Document
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
oracledb.defaults.fetch_lobs = False
|
||||
|
||||
|
||||
@ -180,8 +183,8 @@ class OracleVector(BaseVector):
|
||||
value,
|
||||
)
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
except Exception:
|
||||
logger.exception("Failed to insert record %s into %s", value[0], self.table_name)
|
||||
conn.close()
|
||||
return pks
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
@ -23,6 +24,8 @@ from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
from core.rag.models.document import Document
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
Base = declarative_base() # type: Any
|
||||
|
||||
|
||||
@ -187,8 +190,8 @@ class RelytVector(BaseVector):
|
||||
delete_condition = chunks_table.c.id.in_(ids)
|
||||
conn.execute(chunks_table.delete().where(delete_condition))
|
||||
return True
|
||||
except Exception as e:
|
||||
print("Delete operation failed:", str(e))
|
||||
except Exception:
|
||||
logger.exception("Delete operation failed for collection %s", self._collection_name)
|
||||
return False
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str):
|
||||
|
||||
@ -164,8 +164,8 @@ class TiDBVector(BaseVector):
|
||||
delete_condition = table.c.id.in_(ids)
|
||||
conn.execute(table.delete().where(delete_condition))
|
||||
return True
|
||||
except Exception as e:
|
||||
print("Delete operation failed:", str(e))
|
||||
except Exception:
|
||||
logger.exception("Delete operation failed for collection %s", self._collection_name)
|
||||
return False
|
||||
|
||||
def get_ids_by_metadata_field(self, key: str, value: str):
|
||||
|
||||
@ -93,6 +93,17 @@ class DatasetDocumentStore:
|
||||
|
||||
segment_document = self.get_document_segment(doc_id=doc.metadata["doc_id"])
|
||||
|
||||
# Check if a segment with the same content hash already exists in the dataset
|
||||
existing_segment_by_hash = db.session.query(DocumentSegment).filter_by(
|
||||
dataset_id=self._dataset.id,
|
||||
index_node_hash=doc.metadata["doc_hash"],
|
||||
enabled=True
|
||||
).first()
|
||||
|
||||
if existing_segment_by_hash:
|
||||
# Skip creating duplicate segment with same content hash
|
||||
continue
|
||||
|
||||
# NOTE: doc could already exist in the store, but we overwrite it
|
||||
if not allow_update and segment_document:
|
||||
raise ValueError(
|
||||
|
||||
@ -417,12 +417,10 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
|
||||
|
||||
if db_model is not None:
|
||||
offload_data = db_model.offload_data
|
||||
|
||||
else:
|
||||
db_model = self._to_db_model(domain_model)
|
||||
offload_data = []
|
||||
offload_data = db_model.offload_data
|
||||
|
||||
offload_data = db_model.offload_data
|
||||
if domain_model.inputs is not None:
|
||||
result = self._truncate_and_upload(
|
||||
domain_model.inputs,
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Mapping, MutableMapping
|
||||
from pathlib import Path
|
||||
@ -8,6 +9,8 @@ from typing import Any, ClassVar, Optional
|
||||
class SchemaRegistry:
|
||||
"""Schema registry manages JSON schemas with version support"""
|
||||
|
||||
logger: ClassVar[logging.Logger] = logging.getLogger(__name__)
|
||||
|
||||
_default_instance: ClassVar[Optional["SchemaRegistry"]] = None
|
||||
_lock: ClassVar[threading.Lock] = threading.Lock()
|
||||
|
||||
@ -83,7 +86,7 @@ class SchemaRegistry:
|
||||
self.metadata[uri] = metadata
|
||||
|
||||
except (OSError, json.JSONDecodeError) as e:
|
||||
print(f"Warning: failed to load schema {version}/{schema_name}: {e}")
|
||||
self.logger.warning("Failed to load schema %s/%s: %s", version, schema_name, e)
|
||||
|
||||
def get_schema(self, uri: str) -> Any | None:
|
||||
"""Retrieves a schema by URI with version support"""
|
||||
|
||||
@ -4,8 +4,7 @@ from openai import BaseModel
|
||||
from pydantic import Field
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.tools.entities.tool_entities import ToolInvokeFrom
|
||||
from core.tools.entities.tool_entities import CredentialType, ToolInvokeFrom
|
||||
|
||||
|
||||
class ToolRuntime(BaseModel):
|
||||
|
||||
@ -4,11 +4,11 @@ from typing import Any
|
||||
|
||||
from core.entities.provider_entities import ProviderConfig
|
||||
from core.helper.module_import_helper import load_single_subclass_from_source
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.tools.__base.tool_provider import ToolProviderController
|
||||
from core.tools.__base.tool_runtime import ToolRuntime
|
||||
from core.tools.builtin_tool.tool import BuiltinTool
|
||||
from core.tools.entities.tool_entities import (
|
||||
CredentialType,
|
||||
OAuthSchema,
|
||||
ToolEntity,
|
||||
ToolProviderEntity,
|
||||
|
||||
@ -396,6 +396,10 @@ class ApiTool(Tool):
|
||||
# assemble invoke message based on response type
|
||||
if parsed_response.is_json and isinstance(parsed_response.content, dict):
|
||||
yield self.create_json_message(parsed_response.content)
|
||||
|
||||
# FIXES: https://github.com/langgenius/dify/pull/23456#issuecomment-3182413088
|
||||
# We need never break the original flows
|
||||
yield self.create_text_message(response.text)
|
||||
else:
|
||||
# Convert to string if needed and create text message
|
||||
text_response = (
|
||||
|
||||
@ -5,10 +5,9 @@ from typing import Any, Literal
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.tools.__base.tool import ToolParameter
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.tools.entities.tool_entities import CredentialType, ToolProviderType
|
||||
|
||||
|
||||
class ToolApiEntity(BaseModel):
|
||||
|
||||
@ -477,4 +477,37 @@ class ToolSelector(BaseModel):
|
||||
tool_parameters: Mapping[str, Parameter] = Field(..., description="Parameters, type llm")
|
||||
|
||||
def to_plugin_parameter(self) -> dict[str, Any]:
|
||||
return self.model_dump()
|
||||
return self.model_dump()
|
||||
|
||||
|
||||
class CredentialType(StrEnum):
|
||||
API_KEY = "api-key"
|
||||
OAUTH2 = auto()
|
||||
|
||||
def get_name(self):
|
||||
if self == CredentialType.API_KEY:
|
||||
return "API KEY"
|
||||
elif self == CredentialType.OAUTH2:
|
||||
return "AUTH"
|
||||
else:
|
||||
return self.value.replace("-", " ").upper()
|
||||
|
||||
def is_editable(self):
|
||||
return self == CredentialType.API_KEY
|
||||
|
||||
def is_validate_allowed(self):
|
||||
return self == CredentialType.API_KEY
|
||||
|
||||
@classmethod
|
||||
def values(cls):
|
||||
return [item.value for item in cls]
|
||||
|
||||
@classmethod
|
||||
def of(cls, credential_type: str) -> "CredentialType":
|
||||
type_name = credential_type.lower()
|
||||
if type_name in {"api-key", "api_key"}:
|
||||
return cls.API_KEY
|
||||
elif type_name in {"oauth2", "oauth"}:
|
||||
return cls.OAUTH2
|
||||
else:
|
||||
raise ValueError(f"Invalid credential type: {credential_type}")
|
||||
|
||||
@ -21,7 +21,6 @@ from core.helper.module_import_helper import load_single_subclass_from_source
|
||||
from core.helper.position_helper import is_filtered
|
||||
from core.helper.provider_cache import ToolProviderCredentialsCache
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.impl.tool import PluginToolManager
|
||||
from core.tools.__base.tool import Tool
|
||||
from core.tools.__base.tool_provider import ToolProviderController
|
||||
@ -35,6 +34,7 @@ from core.tools.entities.api_entities import ToolProviderApiEntity, ToolProvider
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.tools.entities.tool_entities import (
|
||||
ApiProviderAuthType,
|
||||
CredentialType,
|
||||
ToolInvokeFrom,
|
||||
ToolParameter,
|
||||
ToolProviderType,
|
||||
|
||||
@ -1,24 +1,137 @@
|
||||
# Import generic components from provider_encryption module
|
||||
from core.helper.provider_encryption import (
|
||||
ProviderConfigCache,
|
||||
ProviderConfigEncrypter,
|
||||
create_provider_encrypter,
|
||||
)
|
||||
import contextlib
|
||||
from copy import deepcopy
|
||||
from typing import Any, Protocol
|
||||
|
||||
# Re-export for backward compatibility
|
||||
__all__ = [
|
||||
"ProviderConfigCache",
|
||||
"ProviderConfigEncrypter",
|
||||
"create_provider_encrypter",
|
||||
"create_tool_provider_encrypter",
|
||||
]
|
||||
|
||||
# Tool-specific imports
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.helper import encrypter
|
||||
from core.helper.provider_cache import SingletonProviderCredentialsCache
|
||||
from core.tools.__base.tool_provider import ToolProviderController
|
||||
|
||||
|
||||
def create_tool_provider_encrypter(tenant_id: str, controller: ToolProviderController):
|
||||
class ProviderConfigCache(Protocol):
|
||||
"""
|
||||
Interface for provider configuration cache operations
|
||||
"""
|
||||
|
||||
def get(self) -> dict | None:
|
||||
"""Get cached provider configuration"""
|
||||
...
|
||||
|
||||
def set(self, config: dict[str, Any]):
|
||||
"""Cache provider configuration"""
|
||||
...
|
||||
|
||||
def delete(self):
|
||||
"""Delete cached provider configuration"""
|
||||
...
|
||||
|
||||
|
||||
class ProviderConfigEncrypter:
|
||||
tenant_id: str
|
||||
config: list[BasicProviderConfig]
|
||||
provider_config_cache: ProviderConfigCache
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tenant_id: str,
|
||||
config: list[BasicProviderConfig],
|
||||
provider_config_cache: ProviderConfigCache,
|
||||
):
|
||||
self.tenant_id = tenant_id
|
||||
self.config = config
|
||||
self.provider_config_cache = provider_config_cache
|
||||
|
||||
def _deep_copy(self, data: dict[str, str]) -> dict[str, str]:
|
||||
"""
|
||||
deep copy data
|
||||
"""
|
||||
return deepcopy(data)
|
||||
|
||||
def encrypt(self, data: dict[str, str]) -> dict[str, str]:
|
||||
"""
|
||||
encrypt tool credentials with tenant id
|
||||
|
||||
return a deep copy of credentials with encrypted values
|
||||
"""
|
||||
data = self._deep_copy(data)
|
||||
|
||||
# get fields need to be decrypted
|
||||
fields = dict[str, BasicProviderConfig]()
|
||||
for credential in self.config:
|
||||
fields[credential.name] = credential
|
||||
|
||||
for field_name, field in fields.items():
|
||||
if field.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if field_name in data:
|
||||
encrypted = encrypter.encrypt_token(self.tenant_id, data[field_name] or "")
|
||||
data[field_name] = encrypted
|
||||
|
||||
return data
|
||||
|
||||
def mask_tool_credentials(self, data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
mask tool credentials
|
||||
|
||||
return a deep copy of credentials with masked values
|
||||
"""
|
||||
data = self._deep_copy(data)
|
||||
|
||||
# get fields need to be decrypted
|
||||
fields = dict[str, BasicProviderConfig]()
|
||||
for credential in self.config:
|
||||
fields[credential.name] = credential
|
||||
|
||||
for field_name, field in fields.items():
|
||||
if field.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if field_name in data:
|
||||
if len(data[field_name]) > 6:
|
||||
data[field_name] = (
|
||||
data[field_name][:2] + "*" * (len(data[field_name]) - 4) + data[field_name][-2:]
|
||||
)
|
||||
else:
|
||||
data[field_name] = "*" * len(data[field_name])
|
||||
|
||||
return data
|
||||
|
||||
def decrypt(self, data: dict[str, str]) -> dict[str, Any]:
|
||||
"""
|
||||
decrypt tool credentials with tenant id
|
||||
|
||||
return a deep copy of credentials with decrypted values
|
||||
"""
|
||||
cached_credentials = self.provider_config_cache.get()
|
||||
if cached_credentials:
|
||||
return cached_credentials
|
||||
|
||||
data = self._deep_copy(data)
|
||||
# get fields need to be decrypted
|
||||
fields = dict[str, BasicProviderConfig]()
|
||||
for credential in self.config:
|
||||
fields[credential.name] = credential
|
||||
|
||||
for field_name, field in fields.items():
|
||||
if field.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if field_name in data:
|
||||
with contextlib.suppress(Exception):
|
||||
# if the value is None or empty string, skip decrypt
|
||||
if not data[field_name]:
|
||||
continue
|
||||
|
||||
data[field_name] = encrypter.decrypt_token(self.tenant_id, data[field_name])
|
||||
|
||||
self.provider_config_cache.set(data)
|
||||
return data
|
||||
|
||||
|
||||
def create_provider_encrypter(
|
||||
tenant_id: str, config: list[BasicProviderConfig], cache: ProviderConfigCache
|
||||
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
|
||||
return ProviderConfigEncrypter(tenant_id=tenant_id, config=config, provider_config_cache=cache), cache
|
||||
|
||||
|
||||
def create_tool_provider_encrypter(
|
||||
tenant_id: str, controller: ToolProviderController
|
||||
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
|
||||
cache = SingletonProviderCredentialsCache(
|
||||
tenant_id=tenant_id,
|
||||
provider_type=controller.provider_type.value,
|
||||
|
||||
@ -1 +0,0 @@
|
||||
# Core trigger module initialization
|
||||
@ -1,76 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.entities.provider_entities import ProviderConfig
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.trigger.entities.entities import (
|
||||
SubscriptionSchema,
|
||||
TriggerCreationMethod,
|
||||
TriggerDescription,
|
||||
TriggerIdentity,
|
||||
TriggerParameter,
|
||||
)
|
||||
|
||||
|
||||
class TriggerProviderSubscriptionApiEntity(BaseModel):
|
||||
id: str = Field(description="The unique id of the subscription")
|
||||
name: str = Field(description="The name of the subscription")
|
||||
provider: str = Field(description="The provider id of the subscription")
|
||||
credential_type: CredentialType = Field(description="The type of the credential")
|
||||
credentials: dict = Field(description="The credentials of the subscription")
|
||||
endpoint: str = Field(description="The endpoint of the subscription")
|
||||
parameters: dict = Field(description="The parameters of the subscription")
|
||||
properties: dict = Field(description="The properties of the subscription")
|
||||
workflows_in_use: int = Field(description="The number of workflows using this subscription")
|
||||
|
||||
|
||||
class TriggerApiEntity(BaseModel):
|
||||
name: str = Field(description="The name of the trigger")
|
||||
identity: TriggerIdentity = Field(description="The identity of the trigger")
|
||||
description: TriggerDescription = Field(description="The description of the trigger")
|
||||
parameters: list[TriggerParameter] = Field(description="The parameters of the trigger")
|
||||
output_schema: Optional[Mapping[str, Any]] = Field(description="The output schema of the trigger")
|
||||
|
||||
|
||||
class TriggerProviderApiEntity(BaseModel):
|
||||
author: str = Field(..., description="The author of the trigger provider")
|
||||
name: str = Field(..., description="The name of the trigger provider")
|
||||
label: I18nObject = Field(..., description="The label of the trigger provider")
|
||||
description: I18nObject = Field(..., description="The description of the trigger provider")
|
||||
icon: Optional[str] = Field(default=None, description="The icon of the trigger provider")
|
||||
icon_dark: Optional[str] = Field(default=None, description="The dark icon of the trigger provider")
|
||||
tags: list[str] = Field(default_factory=list, description="The tags of the trigger provider")
|
||||
|
||||
plugin_id: Optional[str] = Field(default="", description="The plugin id of the tool")
|
||||
plugin_unique_identifier: Optional[str] = Field(default="", description="The unique identifier of the tool")
|
||||
|
||||
supported_creation_methods: list[TriggerCreationMethod] = Field(
|
||||
default_factory=list,
|
||||
description="Supported creation methods for the trigger provider. Possible values: 'OAUTH', 'APIKEY', 'MANUAL'."
|
||||
)
|
||||
|
||||
credentials_schema: list[ProviderConfig] = Field(description="The credentials schema of the trigger provider")
|
||||
oauth_client_schema: list[ProviderConfig] = Field(
|
||||
default_factory=list, description="The schema of the OAuth client"
|
||||
)
|
||||
subscription_schema: Optional[SubscriptionSchema] = Field(
|
||||
description="The subscription schema of the trigger provider"
|
||||
)
|
||||
triggers: list[TriggerApiEntity] = Field(description="The triggers of the trigger provider")
|
||||
|
||||
|
||||
class SubscriptionBuilderApiEntity(BaseModel):
|
||||
id: str = Field(description="The id of the subscription builder")
|
||||
name: str = Field(description="The name of the subscription builder")
|
||||
provider: str = Field(description="The provider id of the subscription builder")
|
||||
endpoint: str = Field(description="The endpoint id of the subscription builder")
|
||||
parameters: Mapping[str, Any] = Field(description="The parameters of the subscription builder")
|
||||
properties: Mapping[str, Any] = Field(description="The properties of the subscription builder")
|
||||
credentials: Mapping[str, str] = Field(description="The credentials of the subscription builder")
|
||||
credential_type: CredentialType = Field(description="The credential type of the subscription builder")
|
||||
|
||||
|
||||
__all__ = ["TriggerApiEntity", "TriggerProviderApiEntity", "TriggerProviderSubscriptionApiEntity"]
|
||||
@ -1,309 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from core.entities.provider_entities import ProviderConfig
|
||||
from core.plugin.entities.parameters import PluginParameterAutoGenerate, PluginParameterOption, PluginParameterTemplate
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
|
||||
|
||||
class TriggerParameterType(StrEnum):
|
||||
"""The type of the parameter"""
|
||||
|
||||
STRING = "string"
|
||||
NUMBER = "number"
|
||||
BOOLEAN = "boolean"
|
||||
SELECT = "select"
|
||||
FILE = "file"
|
||||
FILES = "files"
|
||||
MODEL_SELECTOR = "model-selector"
|
||||
APP_SELECTOR = "app-selector"
|
||||
OBJECT = "object"
|
||||
ARRAY = "array"
|
||||
DYNAMIC_SELECT = "dynamic-select"
|
||||
|
||||
|
||||
class TriggerParameter(BaseModel):
|
||||
"""
|
||||
The parameter of the trigger
|
||||
"""
|
||||
|
||||
name: str = Field(..., description="The name of the parameter")
|
||||
label: I18nObject = Field(..., description="The label presented to the user")
|
||||
type: TriggerParameterType = Field(..., description="The type of the parameter")
|
||||
auto_generate: Optional[PluginParameterAutoGenerate] = Field(
|
||||
default=None, description="The auto generate of the parameter"
|
||||
)
|
||||
template: Optional[PluginParameterTemplate] = Field(default=None, description="The template of the parameter")
|
||||
scope: Optional[str] = None
|
||||
required: Optional[bool] = False
|
||||
multiple: bool | None = Field(
|
||||
default=False,
|
||||
description="Whether the parameter is multiple select, only valid for select or dynamic-select type",
|
||||
)
|
||||
default: Union[int, float, str, list, None] = None
|
||||
min: Union[float, int, None] = None
|
||||
max: Union[float, int, None] = None
|
||||
precision: Optional[int] = None
|
||||
options: Optional[list[PluginParameterOption]] = None
|
||||
description: Optional[I18nObject] = None
|
||||
|
||||
|
||||
class TriggerProviderIdentity(BaseModel):
|
||||
"""
|
||||
The identity of the trigger provider
|
||||
"""
|
||||
|
||||
author: str = Field(..., description="The author of the trigger provider")
|
||||
name: str = Field(..., description="The name of the trigger provider")
|
||||
label: I18nObject = Field(..., description="The label of the trigger provider")
|
||||
description: I18nObject = Field(..., description="The description of the trigger provider")
|
||||
icon: Optional[str] = Field(default=None, description="The icon of the trigger provider")
|
||||
icon_dark: Optional[str] = Field(default=None, description="The dark icon of the trigger provider")
|
||||
tags: list[str] = Field(default_factory=list, description="The tags of the trigger provider")
|
||||
|
||||
|
||||
class TriggerIdentity(BaseModel):
|
||||
"""
|
||||
The identity of the trigger
|
||||
"""
|
||||
|
||||
author: str = Field(..., description="The author of the trigger")
|
||||
name: str = Field(..., description="The name of the trigger")
|
||||
label: I18nObject = Field(..., description="The label of the trigger")
|
||||
provider: Optional[str] = Field(default=None, description="The provider of the trigger")
|
||||
|
||||
|
||||
class TriggerDescription(BaseModel):
|
||||
"""
|
||||
The description of the trigger
|
||||
"""
|
||||
|
||||
human: I18nObject = Field(..., description="Human readable description")
|
||||
llm: I18nObject = Field(..., description="LLM readable description")
|
||||
|
||||
|
||||
class TriggerEntity(BaseModel):
|
||||
"""
|
||||
The configuration of a trigger
|
||||
"""
|
||||
|
||||
identity: TriggerIdentity = Field(..., description="The identity of the trigger")
|
||||
parameters: list[TriggerParameter] = Field(default=[], description="The parameters of the trigger")
|
||||
description: TriggerDescription = Field(..., description="The description of the trigger")
|
||||
output_schema: Optional[Mapping[str, Any]] = Field(
|
||||
default=None, description="The output schema that this trigger produces"
|
||||
)
|
||||
|
||||
|
||||
class OAuthSchema(BaseModel):
|
||||
client_schema: list[ProviderConfig] = Field(default_factory=list, description="The schema of the OAuth client")
|
||||
credentials_schema: list[ProviderConfig] = Field(
|
||||
default_factory=list, description="The schema of the OAuth credentials"
|
||||
)
|
||||
|
||||
|
||||
class SubscriptionSchema(BaseModel):
|
||||
"""
|
||||
The subscription schema of the trigger provider
|
||||
"""
|
||||
|
||||
parameters_schema: list[TriggerParameter] | None = Field(
|
||||
default_factory=list,
|
||||
description="The parameters schema required to create a subscription",
|
||||
)
|
||||
|
||||
properties_schema: list[ProviderConfig] | None = Field(
|
||||
default_factory=list,
|
||||
description="The configuration schema stored in the subscription entity",
|
||||
)
|
||||
|
||||
def get_default_parameters(self) -> Mapping[str, Any]:
|
||||
"""Get the default parameters from the parameters schema"""
|
||||
if not self.parameters_schema:
|
||||
return {}
|
||||
return {param.name: param.default for param in self.parameters_schema if param.default}
|
||||
|
||||
def get_default_properties(self) -> Mapping[str, Any]:
|
||||
"""Get the default properties from the properties schema"""
|
||||
if not self.properties_schema:
|
||||
return {}
|
||||
return {prop.name: prop.default for prop in self.properties_schema if prop.default}
|
||||
|
||||
|
||||
class TriggerProviderEntity(BaseModel):
|
||||
"""
|
||||
The configuration of a trigger provider
|
||||
"""
|
||||
|
||||
identity: TriggerProviderIdentity = Field(..., description="The identity of the trigger provider")
|
||||
credentials_schema: list[ProviderConfig] = Field(
|
||||
default_factory=list,
|
||||
description="The credentials schema of the trigger provider",
|
||||
)
|
||||
oauth_schema: Optional[OAuthSchema] = Field(
|
||||
default=None,
|
||||
description="The OAuth schema of the trigger provider if OAuth is supported",
|
||||
)
|
||||
subscription_schema: SubscriptionSchema = Field(
|
||||
description="The subscription schema for trigger(webhook, polling, etc.) subscription parameters",
|
||||
)
|
||||
triggers: list[TriggerEntity] = Field(default=[], description="The triggers of the trigger provider")
|
||||
|
||||
|
||||
class Subscription(BaseModel):
|
||||
"""
|
||||
Result of a successful trigger subscription operation.
|
||||
|
||||
Contains all information needed to manage the subscription lifecycle.
|
||||
"""
|
||||
|
||||
expires_at: int = Field(
|
||||
..., description="The timestamp when the subscription will expire, this for refresh the subscription"
|
||||
)
|
||||
|
||||
endpoint: str = Field(..., description="The webhook endpoint URL allocated by Dify for receiving events")
|
||||
properties: Mapping[str, Any] = Field(
|
||||
..., description="Subscription data containing all properties and provider-specific information"
|
||||
)
|
||||
|
||||
|
||||
class Unsubscription(BaseModel):
|
||||
"""
|
||||
Result of a trigger unsubscription operation.
|
||||
|
||||
Provides detailed information about the unsubscription attempt,
|
||||
including success status and error details if failed.
|
||||
"""
|
||||
|
||||
success: bool = Field(..., description="Whether the unsubscription was successful")
|
||||
|
||||
message: Optional[str] = Field(
|
||||
None,
|
||||
description="Human-readable message about the operation result. "
|
||||
"Success message for successful operations, "
|
||||
"detailed error information for failures.",
|
||||
)
|
||||
|
||||
|
||||
class RequestLog(BaseModel):
|
||||
id: str = Field(..., description="The id of the request log")
|
||||
endpoint: str = Field(..., description="The endpoint of the request log")
|
||||
request: dict = Field(..., description="The request of the request log")
|
||||
response: dict = Field(..., description="The response of the request log")
|
||||
created_at: datetime = Field(..., description="The created at of the request log")
|
||||
|
||||
|
||||
class SubscriptionBuilder(BaseModel):
|
||||
id: str = Field(..., description="The id of the subscription builder")
|
||||
name: str | None = Field(default=None, description="The name of the subscription builder")
|
||||
tenant_id: str = Field(..., description="The tenant id of the subscription builder")
|
||||
user_id: str = Field(..., description="The user id of the subscription builder")
|
||||
provider_id: str = Field(..., description="The provider id of the subscription builder")
|
||||
endpoint_id: str = Field(..., description="The endpoint id of the subscription builder")
|
||||
parameters: Mapping[str, Any] = Field(..., description="The parameters of the subscription builder")
|
||||
properties: Mapping[str, Any] = Field(..., description="The properties of the subscription builder")
|
||||
credentials: Mapping[str, str] = Field(..., description="The credentials of the subscription builder")
|
||||
credential_type: str | None = Field(default=None, description="The credential type of the subscription builder")
|
||||
credential_expires_at: int | None = Field(
|
||||
default=None, description="The credential expires at of the subscription builder"
|
||||
)
|
||||
expires_at: int = Field(..., description="The expires at of the subscription builder")
|
||||
|
||||
def to_subscription(self) -> Subscription:
|
||||
return Subscription(
|
||||
expires_at=self.expires_at,
|
||||
endpoint=self.endpoint_id,
|
||||
properties=self.properties,
|
||||
)
|
||||
|
||||
|
||||
class SubscriptionBuilderUpdater(BaseModel):
|
||||
name: str | None = Field(default=None, description="The name of the subscription builder")
|
||||
parameters: Mapping[str, Any] | None = Field(default=None, description="The parameters of the subscription builder")
|
||||
properties: Mapping[str, Any] | None = Field(default=None, description="The properties of the subscription builder")
|
||||
credentials: Mapping[str, str] | None = Field(
|
||||
default=None, description="The credentials of the subscription builder"
|
||||
)
|
||||
credential_type: str | None = Field(default=None, description="The credential type of the subscription builder")
|
||||
credential_expires_at: int | None = Field(
|
||||
default=None, description="The credential expires at of the subscription builder"
|
||||
)
|
||||
expires_at: int | None = Field(default=None, description="The expires at of the subscription builder")
|
||||
|
||||
def update(self, subscription_builder: SubscriptionBuilder) -> None:
|
||||
if self.name:
|
||||
subscription_builder.name = self.name
|
||||
if self.parameters:
|
||||
subscription_builder.parameters = self.parameters
|
||||
if self.properties:
|
||||
subscription_builder.properties = self.properties
|
||||
if self.credentials:
|
||||
subscription_builder.credentials = self.credentials
|
||||
if self.credential_type:
|
||||
subscription_builder.credential_type = self.credential_type
|
||||
if self.credential_expires_at:
|
||||
subscription_builder.credential_expires_at = self.credential_expires_at
|
||||
if self.expires_at:
|
||||
subscription_builder.expires_at = self.expires_at
|
||||
|
||||
|
||||
class TriggerEventData(BaseModel):
|
||||
"""Event data dispatched to trigger sessions."""
|
||||
|
||||
subscription_id: str
|
||||
triggers: list[str]
|
||||
request_id: str
|
||||
timestamp: float
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
|
||||
class TriggerInputs(BaseModel):
|
||||
"""Standard inputs for trigger nodes."""
|
||||
|
||||
request_id: str
|
||||
trigger_name: str
|
||||
subscription_id: str
|
||||
|
||||
@classmethod
|
||||
def from_trigger_entity(cls, request_id: str, subscription_id: str, trigger: TriggerEntity) -> "TriggerInputs":
|
||||
"""Create from trigger entity (for production)."""
|
||||
return cls(request_id=request_id, trigger_name=trigger.identity.name, subscription_id=subscription_id)
|
||||
|
||||
def to_workflow_args(self) -> dict[str, Any]:
|
||||
"""Convert to workflow arguments format."""
|
||||
return {"inputs": self.model_dump(), "files": []}
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dict (alias for model_dump)."""
|
||||
return self.model_dump()
|
||||
|
||||
|
||||
class TriggerCreationMethod(StrEnum):
|
||||
OAUTH = "OAUTH"
|
||||
APIKEY = "APIKEY"
|
||||
MANUAL = "MANUAL"
|
||||
|
||||
|
||||
# Export all entities
|
||||
__all__ = [
|
||||
"OAuthSchema",
|
||||
"RequestLog",
|
||||
"Subscription",
|
||||
"SubscriptionBuilder",
|
||||
"TriggerCreationMethod",
|
||||
"TriggerDescription",
|
||||
"TriggerEntity",
|
||||
"TriggerEventData",
|
||||
"TriggerIdentity",
|
||||
"TriggerInputs",
|
||||
"TriggerParameter",
|
||||
"TriggerParameterType",
|
||||
"TriggerProviderEntity",
|
||||
"TriggerProviderIdentity",
|
||||
"Unsubscription",
|
||||
]
|
||||
@ -1,2 +0,0 @@
|
||||
class TriggerProviderCredentialValidationError(ValueError):
|
||||
pass
|
||||
@ -1,358 +0,0 @@
|
||||
"""
|
||||
Trigger Provider Controller for managing trigger providers
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Optional
|
||||
|
||||
from flask import Request
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.entities.request import (
|
||||
TriggerDispatchResponse,
|
||||
TriggerInvokeResponse,
|
||||
)
|
||||
from core.plugin.impl.trigger import PluginTriggerManager
|
||||
from core.trigger.entities.api_entities import TriggerApiEntity, TriggerProviderApiEntity
|
||||
from core.trigger.entities.entities import (
|
||||
ProviderConfig,
|
||||
Subscription,
|
||||
SubscriptionSchema,
|
||||
TriggerCreationMethod,
|
||||
TriggerEntity,
|
||||
TriggerProviderEntity,
|
||||
TriggerProviderIdentity,
|
||||
Unsubscription,
|
||||
)
|
||||
from core.trigger.errors import TriggerProviderCredentialValidationError
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from services.plugin.plugin_service import PluginService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PluginTriggerProviderController:
|
||||
"""
|
||||
Controller for plugin trigger providers
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
entity: TriggerProviderEntity,
|
||||
plugin_id: str,
|
||||
plugin_unique_identifier: str,
|
||||
provider_id: TriggerProviderID,
|
||||
tenant_id: str,
|
||||
):
|
||||
"""
|
||||
Initialize plugin trigger provider controller
|
||||
|
||||
:param entity: Trigger provider entity
|
||||
:param plugin_id: Plugin ID
|
||||
:param plugin_unique_identifier: Plugin unique identifier
|
||||
:param provider_id: Provider ID
|
||||
:param tenant_id: Tenant ID
|
||||
"""
|
||||
self.entity = entity
|
||||
self.tenant_id = tenant_id
|
||||
self.plugin_id = plugin_id
|
||||
self.provider_id = provider_id
|
||||
self.plugin_unique_identifier = plugin_unique_identifier
|
||||
|
||||
def get_provider_id(self) -> TriggerProviderID:
|
||||
"""
|
||||
Get provider ID
|
||||
"""
|
||||
return self.provider_id
|
||||
|
||||
def to_api_entity(self) -> TriggerProviderApiEntity:
|
||||
"""
|
||||
Convert to API entity
|
||||
"""
|
||||
icon = (
|
||||
PluginService.get_plugin_icon_url(self.tenant_id, self.entity.identity.icon)
|
||||
if self.entity.identity.icon
|
||||
else None
|
||||
)
|
||||
icon_dark = (
|
||||
PluginService.get_plugin_icon_url(self.tenant_id, self.entity.identity.icon_dark)
|
||||
if self.entity.identity.icon_dark
|
||||
else None
|
||||
)
|
||||
supported_creation_methods = []
|
||||
if self.entity.oauth_schema:
|
||||
supported_creation_methods.append(TriggerCreationMethod.OAUTH)
|
||||
if self.entity.credentials_schema:
|
||||
supported_creation_methods.append(TriggerCreationMethod.APIKEY)
|
||||
if self.entity.subscription_schema:
|
||||
supported_creation_methods.append(TriggerCreationMethod.MANUAL)
|
||||
return TriggerProviderApiEntity(
|
||||
author=self.entity.identity.author,
|
||||
name=self.entity.identity.name,
|
||||
label=self.entity.identity.label,
|
||||
description=self.entity.identity.description,
|
||||
icon=icon,
|
||||
icon_dark=icon_dark,
|
||||
tags=self.entity.identity.tags,
|
||||
plugin_id=self.plugin_id,
|
||||
plugin_unique_identifier=self.plugin_unique_identifier,
|
||||
credentials_schema=self.entity.credentials_schema,
|
||||
oauth_client_schema=self.entity.oauth_schema.client_schema if self.entity.oauth_schema else [],
|
||||
subscription_schema=self.entity.subscription_schema,
|
||||
supported_creation_methods=supported_creation_methods,
|
||||
triggers=[
|
||||
TriggerApiEntity(
|
||||
name=trigger.identity.name,
|
||||
identity=trigger.identity,
|
||||
description=trigger.description,
|
||||
parameters=trigger.parameters,
|
||||
output_schema=trigger.output_schema,
|
||||
)
|
||||
for trigger in self.entity.triggers
|
||||
],
|
||||
)
|
||||
|
||||
@property
|
||||
def identity(self) -> TriggerProviderIdentity:
|
||||
"""Get provider identity"""
|
||||
return self.entity.identity
|
||||
|
||||
def get_triggers(self) -> list[TriggerEntity]:
|
||||
"""
|
||||
Get all triggers for this provider
|
||||
|
||||
:return: List of trigger entities
|
||||
"""
|
||||
return self.entity.triggers
|
||||
|
||||
def get_trigger(self, trigger_name: str) -> Optional[TriggerEntity]:
|
||||
"""
|
||||
Get a specific trigger by name
|
||||
|
||||
:param trigger_name: Trigger name
|
||||
:return: Trigger entity or None
|
||||
"""
|
||||
for trigger in self.entity.triggers:
|
||||
if trigger.identity.name == trigger_name:
|
||||
return trigger
|
||||
return None
|
||||
|
||||
def get_subscription_schema(self) -> SubscriptionSchema:
|
||||
"""
|
||||
Get subscription schema for this provider
|
||||
|
||||
:return: List of subscription config schemas
|
||||
"""
|
||||
return self.entity.subscription_schema
|
||||
|
||||
def validate_credentials(self, user_id: str, credentials: Mapping[str, str]) -> None:
|
||||
"""
|
||||
Validate credentials against schema
|
||||
|
||||
:param credentials: Credentials to validate
|
||||
:return: Validation response
|
||||
"""
|
||||
# First validate against schema
|
||||
for config in self.entity.credentials_schema:
|
||||
if config.required and config.name not in credentials:
|
||||
raise TriggerProviderCredentialValidationError(f"Missing required credential field: {config.name}")
|
||||
|
||||
# Then validate with the plugin daemon
|
||||
manager = PluginTriggerManager()
|
||||
provider_id = self.get_provider_id()
|
||||
response = manager.validate_provider_credentials(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id=user_id,
|
||||
provider=str(provider_id),
|
||||
credentials=credentials,
|
||||
)
|
||||
if not response:
|
||||
raise TriggerProviderCredentialValidationError(
|
||||
"Invalid credentials",
|
||||
)
|
||||
|
||||
def get_supported_credential_types(self) -> list[CredentialType]:
|
||||
"""
|
||||
Get supported credential types for this provider.
|
||||
|
||||
:return: List of supported credential types
|
||||
"""
|
||||
types = []
|
||||
if self.entity.oauth_schema:
|
||||
types.append(CredentialType.OAUTH2)
|
||||
if self.entity.credentials_schema:
|
||||
types.append(CredentialType.API_KEY)
|
||||
return types
|
||||
|
||||
def get_credentials_schema(self, credential_type: CredentialType | str) -> list[ProviderConfig]:
|
||||
"""
|
||||
Get credentials schema by credential type
|
||||
|
||||
:param credential_type: The type of credential (oauth or api_key)
|
||||
:return: List of provider config schemas
|
||||
"""
|
||||
credential_type = CredentialType.of(credential_type) if isinstance(credential_type, str) else credential_type
|
||||
if credential_type == CredentialType.OAUTH2:
|
||||
return self.entity.oauth_schema.credentials_schema.copy() if self.entity.oauth_schema else []
|
||||
if credential_type == CredentialType.API_KEY:
|
||||
return self.entity.credentials_schema.copy() if self.entity.credentials_schema else []
|
||||
if credential_type == CredentialType.UNAUTHORIZED:
|
||||
return []
|
||||
raise ValueError(f"Invalid credential type: {credential_type}")
|
||||
|
||||
def get_credential_schema_config(self, credential_type: CredentialType | str) -> list[BasicProviderConfig]:
|
||||
"""
|
||||
Get credential schema config by credential type
|
||||
"""
|
||||
return [x.to_basic_provider_config() for x in self.get_credentials_schema(credential_type)]
|
||||
|
||||
def get_oauth_client_schema(self) -> list[ProviderConfig]:
|
||||
"""
|
||||
Get OAuth client schema for this provider
|
||||
|
||||
:return: List of OAuth client config schemas
|
||||
"""
|
||||
return self.entity.oauth_schema.client_schema.copy() if self.entity.oauth_schema else []
|
||||
|
||||
def get_properties_schema(self) -> list[BasicProviderConfig]:
|
||||
"""
|
||||
Get properties schema for this provider
|
||||
|
||||
:return: List of properties config schemas
|
||||
"""
|
||||
return (
|
||||
[x.to_basic_provider_config() for x in self.entity.subscription_schema.properties_schema.copy()]
|
||||
if self.entity.subscription_schema.properties_schema
|
||||
else []
|
||||
)
|
||||
|
||||
def dispatch(self, user_id: str, request: Request, subscription: Subscription) -> TriggerDispatchResponse:
|
||||
"""
|
||||
Dispatch a trigger through plugin runtime
|
||||
|
||||
:param user_id: User ID
|
||||
:param request: Flask request object
|
||||
:param subscription: Subscription
|
||||
:return: Dispatch response with triggers and raw HTTP response
|
||||
"""
|
||||
manager = PluginTriggerManager()
|
||||
provider_id = self.get_provider_id()
|
||||
|
||||
response = manager.dispatch_event(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id=user_id,
|
||||
provider=str(provider_id),
|
||||
subscription=subscription.model_dump(),
|
||||
request=request,
|
||||
)
|
||||
return response
|
||||
|
||||
def invoke_trigger(
|
||||
self,
|
||||
user_id: str,
|
||||
trigger_name: str,
|
||||
parameters: Mapping[str, Any],
|
||||
credentials: Mapping[str, str],
|
||||
credential_type: CredentialType,
|
||||
request: Request,
|
||||
) -> TriggerInvokeResponse:
|
||||
"""
|
||||
Execute a trigger through plugin runtime
|
||||
|
||||
:param user_id: User ID
|
||||
:param trigger_name: Trigger name
|
||||
:param parameters: Trigger parameters
|
||||
:param credentials: Provider credentials
|
||||
:param credential_type: Credential type
|
||||
:param request: Request
|
||||
:return: Trigger execution result
|
||||
"""
|
||||
manager = PluginTriggerManager()
|
||||
provider_id = self.get_provider_id()
|
||||
|
||||
return manager.invoke_trigger(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id=user_id,
|
||||
provider=str(provider_id),
|
||||
trigger=trigger_name,
|
||||
credentials=credentials,
|
||||
credential_type=credential_type,
|
||||
request=request,
|
||||
parameters=parameters,
|
||||
)
|
||||
|
||||
def subscribe_trigger(
|
||||
self, user_id: str, endpoint: str, parameters: Mapping[str, Any], credentials: Mapping[str, str]
|
||||
) -> Subscription:
|
||||
"""
|
||||
Subscribe to a trigger through plugin runtime
|
||||
|
||||
:param user_id: User ID
|
||||
:param endpoint: Subscription endpoint
|
||||
:param subscription_params: Subscription parameters
|
||||
:param credentials: Provider credentials
|
||||
:return: Subscription result
|
||||
"""
|
||||
manager = PluginTriggerManager()
|
||||
provider_id = self.get_provider_id()
|
||||
|
||||
response = manager.subscribe(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id=user_id,
|
||||
provider=str(provider_id),
|
||||
credentials=credentials,
|
||||
endpoint=endpoint,
|
||||
parameters=parameters,
|
||||
)
|
||||
|
||||
return Subscription.model_validate(response.subscription)
|
||||
|
||||
def unsubscribe_trigger(
|
||||
self, user_id: str, subscription: Subscription, credentials: Mapping[str, str]
|
||||
) -> Unsubscription:
|
||||
"""
|
||||
Unsubscribe from a trigger through plugin runtime
|
||||
|
||||
:param user_id: User ID
|
||||
:param subscription: Subscription metadata
|
||||
:param credentials: Provider credentials
|
||||
:return: Unsubscription result
|
||||
"""
|
||||
manager = PluginTriggerManager()
|
||||
provider_id = self.get_provider_id()
|
||||
|
||||
response = manager.unsubscribe(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id=user_id,
|
||||
provider=str(provider_id),
|
||||
subscription=subscription,
|
||||
credentials=credentials,
|
||||
)
|
||||
|
||||
return Unsubscription.model_validate(response.subscription)
|
||||
|
||||
def refresh_trigger(self, subscription: Subscription, credentials: Mapping[str, str]) -> Subscription:
|
||||
"""
|
||||
Refresh a trigger subscription through plugin runtime
|
||||
|
||||
:param subscription: Subscription metadata
|
||||
:param credentials: Provider credentials
|
||||
:return: Refreshed subscription result
|
||||
"""
|
||||
manager = PluginTriggerManager()
|
||||
provider_id = self.get_provider_id()
|
||||
|
||||
response = manager.refresh(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id="system", # System refresh
|
||||
provider=str(provider_id),
|
||||
subscription=subscription,
|
||||
credentials=credentials,
|
||||
)
|
||||
|
||||
return Subscription.model_validate(response.subscription)
|
||||
|
||||
|
||||
__all__ = ["PluginTriggerProviderController"]
|
||||
@ -1,254 +0,0 @@
|
||||
"""
|
||||
Trigger Manager for loading and managing trigger providers and triggers
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections.abc import Mapping
|
||||
from threading import Lock
|
||||
from typing import Any, Optional
|
||||
|
||||
from flask import Request
|
||||
|
||||
import contexts
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.entities.request import TriggerInvokeResponse
|
||||
from core.plugin.impl.trigger import PluginTriggerManager
|
||||
from core.trigger.entities.entities import (
|
||||
Subscription,
|
||||
SubscriptionSchema,
|
||||
TriggerEntity,
|
||||
Unsubscription,
|
||||
)
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
from models.provider_ids import TriggerProviderID
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TriggerManager:
|
||||
"""
|
||||
Manager for trigger providers and triggers
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def list_plugin_trigger_providers(cls, tenant_id: str) -> list[PluginTriggerProviderController]:
|
||||
"""
|
||||
List all plugin trigger providers for a tenant
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:return: List of trigger provider controllers
|
||||
"""
|
||||
manager = PluginTriggerManager()
|
||||
provider_entities = manager.fetch_trigger_providers(tenant_id)
|
||||
|
||||
controllers = []
|
||||
for provider in provider_entities:
|
||||
try:
|
||||
controller = PluginTriggerProviderController(
|
||||
entity=provider.declaration,
|
||||
plugin_id=provider.plugin_id,
|
||||
plugin_unique_identifier=provider.plugin_unique_identifier,
|
||||
provider_id=TriggerProviderID(provider.provider),
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
controllers.append(controller)
|
||||
except Exception:
|
||||
logger.exception("Failed to load trigger provider %s", provider.plugin_id)
|
||||
continue
|
||||
|
||||
return controllers
|
||||
|
||||
@classmethod
|
||||
def get_trigger_provider(cls, tenant_id: str, provider_id: TriggerProviderID) -> PluginTriggerProviderController:
|
||||
"""
|
||||
Get a specific plugin trigger provider
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param provider_id: Provider ID
|
||||
:return: Trigger provider controller or None
|
||||
"""
|
||||
# check if context is set
|
||||
try:
|
||||
contexts.plugin_trigger_providers.get()
|
||||
except LookupError:
|
||||
contexts.plugin_trigger_providers.set({})
|
||||
contexts.plugin_trigger_providers_lock.set(Lock())
|
||||
|
||||
plugin_trigger_providers = contexts.plugin_trigger_providers.get()
|
||||
provider_id_str = str(provider_id)
|
||||
if provider_id_str in plugin_trigger_providers:
|
||||
return plugin_trigger_providers[provider_id_str]
|
||||
|
||||
with contexts.plugin_trigger_providers_lock.get():
|
||||
# double check
|
||||
plugin_trigger_providers = contexts.plugin_trigger_providers.get()
|
||||
if provider_id_str in plugin_trigger_providers:
|
||||
return plugin_trigger_providers[provider_id_str]
|
||||
|
||||
manager = PluginTriggerManager()
|
||||
provider = manager.fetch_trigger_provider(tenant_id, provider_id)
|
||||
|
||||
if not provider:
|
||||
raise ValueError(f"Trigger provider {provider_id} not found")
|
||||
|
||||
try:
|
||||
controller = PluginTriggerProviderController(
|
||||
entity=provider.declaration,
|
||||
plugin_id=provider.plugin_id,
|
||||
plugin_unique_identifier=provider.plugin_unique_identifier,
|
||||
provider_id=provider_id,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
plugin_trigger_providers[provider_id_str] = controller
|
||||
return controller
|
||||
except Exception as e:
|
||||
logger.exception("Failed to load trigger provider")
|
||||
raise e
|
||||
|
||||
@classmethod
|
||||
def list_all_trigger_providers(cls, tenant_id: str) -> list[PluginTriggerProviderController]:
|
||||
"""
|
||||
List all trigger providers (plugin)
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:return: List of all trigger provider controllers
|
||||
"""
|
||||
return cls.list_plugin_trigger_providers(tenant_id)
|
||||
|
||||
@classmethod
|
||||
def list_triggers_by_provider(cls, tenant_id: str, provider_id: TriggerProviderID) -> list[TriggerEntity]:
|
||||
"""
|
||||
List all triggers for a specific provider
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param provider_id: Provider ID
|
||||
:return: List of trigger entities
|
||||
"""
|
||||
provider = cls.get_trigger_provider(tenant_id, provider_id)
|
||||
return provider.get_triggers()
|
||||
|
||||
@classmethod
|
||||
def get_trigger(cls, tenant_id: str, provider_id: TriggerProviderID, trigger_name: str) -> Optional[TriggerEntity]:
|
||||
"""
|
||||
Get a specific trigger
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param provider_id: Provider ID
|
||||
:param trigger_name: Trigger name
|
||||
:return: Trigger entity or None
|
||||
"""
|
||||
return cls.get_trigger_provider(tenant_id, provider_id).get_trigger(trigger_name)
|
||||
|
||||
@classmethod
|
||||
def invoke_trigger(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider_id: TriggerProviderID,
|
||||
trigger_name: str,
|
||||
parameters: Mapping[str, Any],
|
||||
credentials: Mapping[str, str],
|
||||
credential_type: CredentialType,
|
||||
request: Request,
|
||||
) -> TriggerInvokeResponse:
|
||||
"""
|
||||
Execute a trigger
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param user_id: User ID
|
||||
:param provider_id: Provider ID
|
||||
:param trigger_name: Trigger name
|
||||
:param parameters: Trigger parameters
|
||||
:param credentials: Provider credentials
|
||||
:param credential_type: Credential type
|
||||
:param request: Request
|
||||
:return: Trigger execution result
|
||||
"""
|
||||
provider = cls.get_trigger_provider(tenant_id, provider_id)
|
||||
trigger = provider.get_trigger(trigger_name)
|
||||
if not trigger:
|
||||
raise ValueError(f"Trigger {trigger_name} not found in provider {provider_id}")
|
||||
return provider.invoke_trigger(user_id, trigger_name, parameters, credentials, credential_type, request)
|
||||
|
||||
@classmethod
|
||||
def subscribe_trigger(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider_id: TriggerProviderID,
|
||||
endpoint: str,
|
||||
parameters: Mapping[str, Any],
|
||||
credentials: Mapping[str, str],
|
||||
) -> Subscription:
|
||||
"""
|
||||
Subscribe to a trigger (e.g., register webhook)
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param user_id: User ID
|
||||
:param provider_id: Provider ID
|
||||
:param endpoint: Subscription endpoint
|
||||
:param parameters: Subscription parameters
|
||||
:param credentials: Provider credentials
|
||||
:return: Subscription result
|
||||
"""
|
||||
provider = cls.get_trigger_provider(tenant_id, provider_id)
|
||||
return provider.subscribe_trigger(
|
||||
user_id=user_id, endpoint=endpoint, parameters=parameters, credentials=credentials
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unsubscribe_trigger(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
provider_id: TriggerProviderID,
|
||||
subscription: Subscription,
|
||||
credentials: Mapping[str, str],
|
||||
) -> Unsubscription:
|
||||
"""
|
||||
Unsubscribe from a trigger
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param user_id: User ID
|
||||
:param provider_id: Provider ID
|
||||
:param subscription: Subscription metadata from subscribe operation
|
||||
:param credentials: Provider credentials
|
||||
:return: Unsubscription result
|
||||
"""
|
||||
provider = cls.get_trigger_provider(tenant_id, provider_id)
|
||||
return provider.unsubscribe_trigger(user_id=user_id, subscription=subscription, credentials=credentials)
|
||||
|
||||
@classmethod
|
||||
def get_provider_subscription_schema(cls, tenant_id: str, provider_id: TriggerProviderID) -> SubscriptionSchema:
|
||||
"""
|
||||
Get provider subscription schema
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param provider_id: Provider ID
|
||||
:return: List of subscription config schemas
|
||||
"""
|
||||
return cls.get_trigger_provider(tenant_id, provider_id).get_subscription_schema()
|
||||
|
||||
@classmethod
|
||||
def refresh_trigger(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
provider_id: TriggerProviderID,
|
||||
subscription: Subscription,
|
||||
credentials: Mapping[str, str],
|
||||
) -> Subscription:
|
||||
"""
|
||||
Refresh a trigger subscription
|
||||
|
||||
:param tenant_id: Tenant ID
|
||||
:param provider_id: Provider ID
|
||||
:param trigger_name: Trigger name
|
||||
:param subscription: Subscription metadata from subscribe operation
|
||||
:param credentials: Provider credentials
|
||||
:return: Refreshed subscription result
|
||||
"""
|
||||
return cls.get_trigger_provider(tenant_id, provider_id).refresh_trigger(subscription, credentials)
|
||||
|
||||
|
||||
# Export
|
||||
__all__ = ["TriggerManager"]
|
||||
@ -1,145 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Union
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig, ProviderConfig
|
||||
from core.helper.provider_cache import ProviderCredentialsCache
|
||||
from core.helper.provider_encryption import ProviderConfigCache, ProviderConfigEncrypter, create_provider_encrypter
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
from models.trigger import TriggerSubscription
|
||||
|
||||
|
||||
class TriggerProviderCredentialsCache(ProviderCredentialsCache):
|
||||
"""Cache for trigger provider credentials"""
|
||||
|
||||
def __init__(self, tenant_id: str, provider_id: str, credential_id: str):
|
||||
super().__init__(tenant_id=tenant_id, provider_id=provider_id, credential_id=credential_id)
|
||||
|
||||
def _generate_cache_key(self, **kwargs) -> str:
|
||||
tenant_id = kwargs["tenant_id"]
|
||||
provider_id = kwargs["provider_id"]
|
||||
credential_id = kwargs["credential_id"]
|
||||
return f"trigger_credentials:tenant_id:{tenant_id}:provider_id:{provider_id}:credential_id:{credential_id}"
|
||||
|
||||
|
||||
class TriggerProviderOAuthClientParamsCache(ProviderCredentialsCache):
|
||||
"""Cache for trigger provider OAuth client"""
|
||||
|
||||
def __init__(self, tenant_id: str, provider_id: str):
|
||||
super().__init__(tenant_id=tenant_id, provider_id=provider_id)
|
||||
|
||||
def _generate_cache_key(self, **kwargs) -> str:
|
||||
tenant_id = kwargs["tenant_id"]
|
||||
provider_id = kwargs["provider_id"]
|
||||
return f"trigger_oauth_client:tenant_id:{tenant_id}:provider_id:{provider_id}"
|
||||
|
||||
|
||||
class TriggerProviderPropertiesCache(ProviderCredentialsCache):
|
||||
"""Cache for trigger provider properties"""
|
||||
|
||||
def __init__(self, tenant_id: str, provider_id: str, subscription_id: str):
|
||||
super().__init__(tenant_id=tenant_id, provider_id=provider_id, subscription_id=subscription_id)
|
||||
|
||||
def _generate_cache_key(self, **kwargs) -> str:
|
||||
tenant_id = kwargs["tenant_id"]
|
||||
provider_id = kwargs["provider_id"]
|
||||
subscription_id = kwargs["subscription_id"]
|
||||
return f"trigger_properties:tenant_id:{tenant_id}:provider_id:{provider_id}:subscription_id:{subscription_id}"
|
||||
|
||||
|
||||
def create_trigger_provider_encrypter_for_subscription(
|
||||
tenant_id: str,
|
||||
controller: PluginTriggerProviderController,
|
||||
subscription: Union[TriggerSubscription, TriggerProviderSubscriptionApiEntity],
|
||||
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
|
||||
cache = TriggerProviderCredentialsCache(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=str(controller.get_provider_id()),
|
||||
credential_id=subscription.id,
|
||||
)
|
||||
encrypter, _ = create_provider_encrypter(
|
||||
tenant_id=tenant_id,
|
||||
config=controller.get_credential_schema_config(subscription.credential_type),
|
||||
cache=cache,
|
||||
)
|
||||
return encrypter, cache
|
||||
|
||||
|
||||
def delete_cache_for_subscription(tenant_id: str, provider_id: str, subscription_id: str):
|
||||
cache = TriggerProviderCredentialsCache(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=provider_id,
|
||||
credential_id=subscription_id,
|
||||
)
|
||||
cache.delete()
|
||||
|
||||
|
||||
def create_trigger_provider_encrypter_for_properties(
|
||||
tenant_id: str,
|
||||
controller: PluginTriggerProviderController,
|
||||
subscription: Union[TriggerSubscription, TriggerProviderSubscriptionApiEntity],
|
||||
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
|
||||
cache = TriggerProviderPropertiesCache(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=str(controller.get_provider_id()),
|
||||
subscription_id=subscription.id,
|
||||
)
|
||||
encrypter, _ = create_provider_encrypter(
|
||||
tenant_id=tenant_id,
|
||||
config=controller.get_properties_schema(),
|
||||
cache=cache,
|
||||
)
|
||||
return encrypter, cache
|
||||
|
||||
|
||||
def create_trigger_provider_encrypter(
|
||||
tenant_id: str, controller: PluginTriggerProviderController, credential_id: str, credential_type: CredentialType
|
||||
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
|
||||
cache = TriggerProviderCredentialsCache(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=str(controller.get_provider_id()),
|
||||
credential_id=credential_id,
|
||||
)
|
||||
encrypter, _ = create_provider_encrypter(
|
||||
tenant_id=tenant_id,
|
||||
config=controller.get_credential_schema_config(credential_type),
|
||||
cache=cache,
|
||||
)
|
||||
return encrypter, cache
|
||||
|
||||
|
||||
def create_trigger_provider_oauth_encrypter(
|
||||
tenant_id: str, controller: PluginTriggerProviderController
|
||||
) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
|
||||
cache = TriggerProviderOAuthClientParamsCache(
|
||||
tenant_id=tenant_id,
|
||||
provider_id=str(controller.get_provider_id()),
|
||||
)
|
||||
encrypter, _ = create_provider_encrypter(
|
||||
tenant_id=tenant_id,
|
||||
config=[x.to_basic_provider_config() for x in controller.get_oauth_client_schema()],
|
||||
cache=cache,
|
||||
)
|
||||
return encrypter, cache
|
||||
|
||||
|
||||
def masked_credentials(
|
||||
schemas: list[ProviderConfig],
|
||||
credentials: Mapping[str, str],
|
||||
) -> Mapping[str, str]:
|
||||
masked_credentials = {}
|
||||
configs = {x.name: x.to_basic_provider_config() for x in schemas}
|
||||
for key, value in credentials.items():
|
||||
config = configs.get(key)
|
||||
if not config:
|
||||
masked_credentials[key] = value
|
||||
continue
|
||||
if config.type == BasicProviderConfig.Type.SECRET_INPUT:
|
||||
if len(value) <= 4:
|
||||
masked_credentials[key] = "*" * len(value)
|
||||
else:
|
||||
masked_credentials[key] = value[:2] + "*" * (len(value) - 4) + value[-2:]
|
||||
else:
|
||||
masked_credentials[key] = value
|
||||
return masked_credentials
|
||||
@ -1,5 +0,0 @@
|
||||
from configs import dify_config
|
||||
|
||||
|
||||
def parse_endpoint_id(endpoint_id: str) -> str:
|
||||
return f"{dify_config.CONSOLE_API_URL}/triggers/plugin/{endpoint_id}"
|
||||
@ -58,18 +58,6 @@ class NodeType(StrEnum):
|
||||
DOCUMENT_EXTRACTOR = "document-extractor"
|
||||
LIST_OPERATOR = "list-operator"
|
||||
AGENT = "agent"
|
||||
TRIGGER_WEBHOOK = "trigger-webhook"
|
||||
TRIGGER_SCHEDULE = "trigger-schedule"
|
||||
TRIGGER_PLUGIN = "trigger-plugin"
|
||||
|
||||
@property
|
||||
def is_start_node(self) -> bool:
|
||||
return self in [
|
||||
NodeType.START,
|
||||
NodeType.TRIGGER_WEBHOOK,
|
||||
NodeType.TRIGGER_SCHEDULE,
|
||||
NodeType.TRIGGER_PLUGIN,
|
||||
]
|
||||
|
||||
|
||||
class NodeExecutionType(StrEnum):
|
||||
@ -134,7 +122,6 @@ class WorkflowNodeExecutionMetadataKey(StrEnum):
|
||||
ERROR_STRATEGY = "error_strategy" # node in continue on error mode return the field
|
||||
LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output
|
||||
DATASOURCE_INFO = "datasource_info"
|
||||
TRIGGER_INFO = "trigger_info"
|
||||
|
||||
|
||||
class WorkflowNodeExecutionStatus(StrEnum):
|
||||
|
||||
@ -147,4 +147,4 @@ class ExecutionLimitsLayer(GraphEngineLayer):
|
||||
self.logger.debug("Abort command sent to engine")
|
||||
|
||||
except Exception:
|
||||
self.logger.exception("Failed to send abort command: %s")
|
||||
self.logger.exception("Failed to send abort command")
|
||||
|
||||
@ -21,9 +21,6 @@ from core.workflow.nodes.question_classifier import QuestionClassifierNode
|
||||
from core.workflow.nodes.start import StartNode
|
||||
from core.workflow.nodes.template_transform import TemplateTransformNode
|
||||
from core.workflow.nodes.tool import ToolNode
|
||||
from core.workflow.nodes.trigger_plugin import TriggerPluginNode
|
||||
from core.workflow.nodes.trigger_schedule import TriggerScheduleNode
|
||||
from core.workflow.nodes.trigger_webhook import TriggerWebhookNode
|
||||
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
|
||||
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
|
||||
from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode as VariableAssignerNodeV2
|
||||
@ -145,16 +142,4 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[Node]]] = {
|
||||
LATEST_VERSION: KnowledgeIndexNode,
|
||||
"1": KnowledgeIndexNode,
|
||||
},
|
||||
NodeType.TRIGGER_WEBHOOK: {
|
||||
LATEST_VERSION: TriggerWebhookNode,
|
||||
"1": TriggerWebhookNode,
|
||||
},
|
||||
NodeType.TRIGGER_PLUGIN: {
|
||||
LATEST_VERSION: TriggerPluginNode,
|
||||
"1": TriggerPluginNode,
|
||||
},
|
||||
NodeType.TRIGGER_SCHEDULE: {
|
||||
LATEST_VERSION: TriggerScheduleNode,
|
||||
"1": TriggerScheduleNode,
|
||||
},
|
||||
}
|
||||
|
||||
@ -1,3 +0,0 @@
|
||||
from .trigger_plugin_node import TriggerPluginNode
|
||||
|
||||
__all__ = ["TriggerPluginNode"]
|
||||
@ -1,28 +0,0 @@
|
||||
from typing import Any, Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from core.workflow.enums import ErrorStrategy
|
||||
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
|
||||
|
||||
|
||||
class PluginTriggerData(BaseNodeData):
|
||||
"""Plugin trigger node data"""
|
||||
|
||||
title: str
|
||||
desc: Optional[str] = None
|
||||
plugin_id: str = Field(..., description="Plugin ID")
|
||||
provider_id: str = Field(..., description="Provider ID")
|
||||
trigger_name: str = Field(..., description="Trigger name")
|
||||
subscription_id: str = Field(..., description="Subscription ID")
|
||||
plugin_unique_identifier: str = Field(..., description="Plugin unique identifier")
|
||||
parameters: dict[str, Any] = Field(default_factory=dict, description="Trigger parameters")
|
||||
|
||||
# Error handling
|
||||
error_strategy: Optional[ErrorStrategy] = Field(
|
||||
default=ErrorStrategy.FAIL_BRANCH, description="Error handling strategy"
|
||||
)
|
||||
retry_config: RetryConfig = Field(default_factory=lambda: RetryConfig(), description="Retry configuration")
|
||||
default_value_dict: dict[str, Any] = Field(
|
||||
default_factory=dict, description="Default values for outputs when error occurs"
|
||||
)
|
||||
@ -1,153 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Optional
|
||||
|
||||
from elastic_transport import BaseNode
|
||||
|
||||
from core.plugin.impl.exc import PluginDaemonClientSideError, PluginInvokeError
|
||||
from core.plugin.utils.http_parser import deserialize_request
|
||||
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
|
||||
from core.trigger.trigger_manager import TriggerManager
|
||||
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
from core.workflow.enums import ErrorStrategy
|
||||
from core.workflow.node_events.base import NodeRunResult
|
||||
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
|
||||
from core.workflow.nodes.node_mapping import NodeType
|
||||
from extensions.ext_storage import storage
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
|
||||
from .entities import PluginTriggerData
|
||||
|
||||
|
||||
class TriggerPluginNode(BaseNode):
|
||||
_node_type = NodeType.TRIGGER_PLUGIN
|
||||
|
||||
_node_data: PluginTriggerData
|
||||
|
||||
def init_node_data(self, data: Mapping[str, Any]) -> None:
|
||||
self._node_data = PluginTriggerData.model_validate(data)
|
||||
|
||||
def _get_error_strategy(self) -> Optional[ErrorStrategy]:
|
||||
return self._node_data.error_strategy
|
||||
|
||||
def _get_retry_config(self) -> RetryConfig:
|
||||
return self._node_data.retry_config
|
||||
|
||||
def _get_title(self) -> str:
|
||||
return self._node_data.title
|
||||
|
||||
def _get_description(self) -> Optional[str]:
|
||||
return self._node_data.desc
|
||||
|
||||
def _get_default_value_dict(self) -> dict[str, Any]:
|
||||
return self._node_data.default_value_dict
|
||||
|
||||
def get_base_node_data(self) -> BaseNodeData:
|
||||
return self._node_data
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls, filters: Optional[dict[str, Any]] = None) -> dict:
|
||||
return {
|
||||
"type": "plugin",
|
||||
"config": {
|
||||
"plugin_id": "",
|
||||
"provider_id": "",
|
||||
"trigger_name": "",
|
||||
"subscription_id": "",
|
||||
"parameters": {},
|
||||
},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
return "1"
|
||||
|
||||
def _run(self) -> NodeRunResult:
|
||||
"""
|
||||
Run the plugin trigger node.
|
||||
|
||||
This node invokes the trigger to convert request data into events
|
||||
and makes them available to downstream nodes.
|
||||
"""
|
||||
|
||||
# Get trigger data passed when workflow was triggered
|
||||
trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
|
||||
metadata = {
|
||||
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
|
||||
**trigger_inputs,
|
||||
"provider_id": self._node_data.provider_id,
|
||||
"trigger_name": self._node_data.trigger_name,
|
||||
"plugin_unique_identifier": self._node_data.plugin_unique_identifier,
|
||||
},
|
||||
}
|
||||
|
||||
request_id = trigger_inputs.get("request_id")
|
||||
trigger_name = trigger_inputs.get("trigger_name", "")
|
||||
subscription_id = trigger_inputs.get("subscription_id", "")
|
||||
|
||||
if not request_id or not subscription_id:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=trigger_inputs,
|
||||
outputs={"error": "No request ID or subscription ID available"},
|
||||
)
|
||||
try:
|
||||
subscription: TriggerProviderSubscriptionApiEntity | None = TriggerProviderService.get_subscription_by_id(
|
||||
tenant_id=self.tenant_id, subscription_id=subscription_id
|
||||
)
|
||||
if not subscription:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=trigger_inputs,
|
||||
outputs={"error": f"Invalid subscription {subscription_id} not found"},
|
||||
)
|
||||
except Exception as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=trigger_inputs,
|
||||
outputs={"error": f"Failed to get subscription: {str(e)}"},
|
||||
)
|
||||
|
||||
try:
|
||||
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
|
||||
parameters = self._node_data.parameters if hasattr(self, "_node_data") and self._node_data else {}
|
||||
invoke_response = TriggerManager.invoke_trigger(
|
||||
tenant_id=self.tenant_id,
|
||||
user_id=self.user_id,
|
||||
provider_id=TriggerProviderID(subscription.provider),
|
||||
trigger_name=trigger_name,
|
||||
parameters=parameters,
|
||||
credentials=subscription.credentials,
|
||||
credential_type=subscription.credential_type,
|
||||
request=request,
|
||||
)
|
||||
outputs = invoke_response.event.variables or {}
|
||||
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=trigger_inputs, outputs=outputs)
|
||||
except PluginInvokeError as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=trigger_inputs,
|
||||
metadata=metadata,
|
||||
error="An error occurred in the plugin, "
|
||||
f"please contact the author of {subscription.provider} for help, "
|
||||
f"error type: {e.get_error_type()}, "
|
||||
f"error details: {e.get_error_message()}",
|
||||
error_type=type(e).__name__,
|
||||
)
|
||||
except PluginDaemonClientSideError as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=trigger_inputs,
|
||||
metadata=metadata,
|
||||
error=f"Failed to invoke trigger, error: {e.description}",
|
||||
error_type=type(e).__name__,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
inputs=trigger_inputs,
|
||||
metadata=metadata,
|
||||
error=f"Failed to invoke trigger: {str(e)}",
|
||||
error_type=type(e).__name__,
|
||||
)
|
||||
@ -1,3 +0,0 @@
|
||||
from core.workflow.nodes.trigger_schedule.trigger_schedule_node import TriggerScheduleNode
|
||||
|
||||
__all__ = ["TriggerScheduleNode"]
|
||||
@ -1,51 +0,0 @@
|
||||
from typing import Literal, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.workflow.nodes.base import BaseNodeData
|
||||
|
||||
|
||||
class TriggerScheduleNodeData(BaseNodeData):
|
||||
"""
|
||||
Trigger Schedule Node Data
|
||||
"""
|
||||
|
||||
mode: str = Field(default="visual", description="Schedule mode: visual or cron")
|
||||
frequency: Optional[str] = Field(
|
||||
default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly"
|
||||
)
|
||||
cron_expression: Optional[str] = Field(default=None, description="Cron expression for cron mode")
|
||||
visual_config: Optional[dict] = Field(default=None, description="Visual configuration details")
|
||||
timezone: str = Field(default="UTC", description="Timezone for schedule execution")
|
||||
|
||||
|
||||
class ScheduleConfig(BaseModel):
|
||||
node_id: str
|
||||
cron_expression: str
|
||||
timezone: str = "UTC"
|
||||
|
||||
|
||||
class SchedulePlanUpdate(BaseModel):
|
||||
node_id: Optional[str] = None
|
||||
cron_expression: Optional[str] = None
|
||||
timezone: Optional[str] = None
|
||||
|
||||
|
||||
class VisualConfig(BaseModel):
|
||||
"""Visual configuration for schedule trigger"""
|
||||
|
||||
# For hourly frequency
|
||||
on_minute: Optional[int] = Field(default=0, ge=0, le=59, description="Minute of the hour (0-59)")
|
||||
|
||||
# For daily, weekly, monthly frequencies
|
||||
time: Optional[str] = Field(default="12:00 AM", description="Time in 12-hour format (e.g., '2:30 PM')")
|
||||
|
||||
# For weekly frequency
|
||||
weekdays: Optional[list[Literal["sun", "mon", "tue", "wed", "thu", "fri", "sat"]]] = Field(
|
||||
default=None, description="List of weekdays to run on"
|
||||
)
|
||||
|
||||
# For monthly frequency
|
||||
monthly_days: Optional[list[Union[int, Literal["last"]]]] = Field(
|
||||
default=None, description="Days of month to run on (1-31 or 'last')"
|
||||
)
|
||||
@ -1,31 +0,0 @@
|
||||
from core.workflow.nodes.base.exc import BaseNodeError
|
||||
|
||||
|
||||
class ScheduleNodeError(BaseNodeError):
|
||||
"""Base schedule node error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class ScheduleNotFoundError(ScheduleNodeError):
|
||||
"""Schedule not found error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class ScheduleConfigError(ScheduleNodeError):
|
||||
"""Schedule configuration error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class ScheduleExecutionError(ScheduleNodeError):
|
||||
"""Schedule execution error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class TenantOwnerNotFoundError(ScheduleExecutionError):
|
||||
"""Tenant owner not found error for schedule execution."""
|
||||
|
||||
pass
|
||||
@ -1,63 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any, Optional
|
||||
|
||||
from elastic_transport import BaseNode
|
||||
|
||||
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from core.workflow.enums import ErrorStrategy, NodeType
|
||||
from core.workflow.node_events.base import NodeRunResult
|
||||
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
|
||||
from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData
|
||||
|
||||
|
||||
class TriggerScheduleNode(BaseNode):
|
||||
_node_type = NodeType.TRIGGER_SCHEDULE
|
||||
|
||||
_node_data: TriggerScheduleNodeData
|
||||
|
||||
def init_node_data(self, data: Mapping[str, Any]) -> None:
|
||||
self._node_data = TriggerScheduleNodeData(**data)
|
||||
|
||||
def _get_error_strategy(self) -> Optional[ErrorStrategy]:
|
||||
return self._node_data.error_strategy
|
||||
|
||||
def _get_retry_config(self) -> RetryConfig:
|
||||
return self._node_data.retry_config
|
||||
|
||||
def _get_title(self) -> str:
|
||||
return self._node_data.title
|
||||
|
||||
def _get_description(self) -> Optional[str]:
|
||||
return self._node_data.desc
|
||||
|
||||
def _get_default_value_dict(self) -> dict[str, Any]:
|
||||
return self._node_data.default_value_dict
|
||||
|
||||
def get_base_node_data(self) -> BaseNodeData:
|
||||
return self._node_data
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
return "1"
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls, filters: Optional[dict] = None) -> dict:
|
||||
return {
|
||||
"type": "trigger-schedule",
|
||||
"config": {
|
||||
"mode": "visual",
|
||||
"frequency": "daily",
|
||||
"visual_config": {"time": "12:00 AM", "on_minute": 0, "weekdays": ["sun"], "monthly_days": [1]},
|
||||
"timezone": "UTC",
|
||||
},
|
||||
}
|
||||
|
||||
def _run(self) -> NodeRunResult:
|
||||
current_time = datetime.now(UTC)
|
||||
node_outputs = {"current_time": current_time.isoformat()}
|
||||
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
outputs=node_outputs,
|
||||
)
|
||||
@ -1,3 +0,0 @@
|
||||
from .node import TriggerWebhookNode
|
||||
|
||||
__all__ = ["TriggerWebhookNode"]
|
||||
@ -1,79 +0,0 @@
|
||||
from collections.abc import Sequence
|
||||
from enum import StrEnum
|
||||
from typing import Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from core.workflow.nodes.base import BaseNodeData
|
||||
|
||||
|
||||
class Method(StrEnum):
|
||||
GET = "get"
|
||||
POST = "post"
|
||||
HEAD = "head"
|
||||
PATCH = "patch"
|
||||
PUT = "put"
|
||||
DELETE = "delete"
|
||||
|
||||
|
||||
class ContentType(StrEnum):
|
||||
JSON = "application/json"
|
||||
FORM_DATA = "multipart/form-data"
|
||||
FORM_URLENCODED = "application/x-www-form-urlencoded"
|
||||
TEXT = "text/plain"
|
||||
BINARY = "application/octet-stream"
|
||||
|
||||
|
||||
class WebhookParameter(BaseModel):
|
||||
"""Parameter definition for headers, query params, or body."""
|
||||
|
||||
name: str
|
||||
required: bool = False
|
||||
|
||||
|
||||
class WebhookBodyParameter(BaseModel):
|
||||
"""Body parameter with type information."""
|
||||
|
||||
name: str
|
||||
type: Literal[
|
||||
"string",
|
||||
"number",
|
||||
"boolean",
|
||||
"object",
|
||||
"array[string]",
|
||||
"array[number]",
|
||||
"array[boolean]",
|
||||
"array[object]",
|
||||
"file",
|
||||
] = "string"
|
||||
required: bool = False
|
||||
|
||||
|
||||
class WebhookData(BaseNodeData):
|
||||
"""
|
||||
Webhook Node Data.
|
||||
"""
|
||||
|
||||
class SyncMode(StrEnum):
|
||||
SYNC = "async" # only support
|
||||
|
||||
method: Method = Method.GET
|
||||
content_type: ContentType = Field(default=ContentType.JSON)
|
||||
headers: Sequence[WebhookParameter] = Field(default_factory=list)
|
||||
params: Sequence[WebhookParameter] = Field(default_factory=list) # query parameters
|
||||
body: Sequence[WebhookBodyParameter] = Field(default_factory=list)
|
||||
|
||||
@field_validator("method", mode="before")
|
||||
@classmethod
|
||||
def normalize_method(cls, v) -> str:
|
||||
"""Normalize HTTP method to lowercase to support both uppercase and lowercase input."""
|
||||
if isinstance(v, str):
|
||||
return v.lower()
|
||||
return v
|
||||
|
||||
status_code: int = 200 # Expected status code for response
|
||||
response_body: str = "" # Template for response body
|
||||
|
||||
# Webhook specific fields (not from client data, set internally)
|
||||
webhook_id: Optional[str] = None # Set when webhook trigger is created
|
||||
timeout: int = 30 # Timeout in seconds to wait for webhook response
|
||||
@ -1,25 +0,0 @@
|
||||
from core.workflow.nodes.base.exc import BaseNodeError
|
||||
|
||||
|
||||
class WebhookNodeError(BaseNodeError):
|
||||
"""Base webhook node error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class WebhookTimeoutError(WebhookNodeError):
|
||||
"""Webhook timeout error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class WebhookNotFoundError(WebhookNodeError):
|
||||
"""Webhook not found error."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class WebhookConfigError(WebhookNodeError):
|
||||
"""Webhook configuration error."""
|
||||
|
||||
pass
|
||||
@ -1,127 +0,0 @@
|
||||
from collections.abc import Mapping
|
||||
from typing import Any, Optional
|
||||
|
||||
from elastic_transport import BaseNode
|
||||
|
||||
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from core.workflow.enums import ErrorStrategy, NodeType
|
||||
from core.workflow.node_events.base import NodeRunResult
|
||||
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
|
||||
|
||||
from .entities import ContentType, WebhookData
|
||||
|
||||
|
||||
class TriggerWebhookNode(BaseNode):
|
||||
_node_type = NodeType.TRIGGER_WEBHOOK
|
||||
|
||||
_node_data: WebhookData
|
||||
|
||||
def init_node_data(self, data: Mapping[str, Any]) -> None:
|
||||
self._node_data = WebhookData.model_validate(data)
|
||||
|
||||
def _get_error_strategy(self) -> Optional[ErrorStrategy]:
|
||||
return self._node_data.error_strategy
|
||||
|
||||
def _get_retry_config(self) -> RetryConfig:
|
||||
return self._node_data.retry_config
|
||||
|
||||
def _get_title(self) -> str:
|
||||
return self._node_data.title
|
||||
|
||||
def _get_description(self) -> Optional[str]:
|
||||
return self._node_data.desc
|
||||
|
||||
def _get_default_value_dict(self) -> dict[str, Any]:
|
||||
return self._node_data.default_value_dict
|
||||
|
||||
def get_base_node_data(self) -> BaseNodeData:
|
||||
return self._node_data
|
||||
|
||||
@classmethod
|
||||
def get_default_config(cls, filters: Optional[dict[str, Any]] = None) -> dict:
|
||||
return {
|
||||
"type": "webhook",
|
||||
"config": {
|
||||
"method": "get",
|
||||
"content_type": "application/json",
|
||||
"headers": [],
|
||||
"params": [],
|
||||
"body": [],
|
||||
"async_mode": True,
|
||||
"status_code": 200,
|
||||
"response_body": "",
|
||||
"timeout": 30,
|
||||
},
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def version(cls) -> str:
|
||||
return "1"
|
||||
|
||||
def _run(self) -> NodeRunResult:
|
||||
"""
|
||||
Run the webhook node.
|
||||
|
||||
Like the start node, this simply takes the webhook data from the variable pool
|
||||
and makes it available to downstream nodes. The actual webhook handling
|
||||
happens in the trigger controller.
|
||||
"""
|
||||
# Get webhook data from variable pool (injected by Celery task)
|
||||
webhook_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
|
||||
|
||||
# Extract webhook-specific outputs based on node configuration
|
||||
outputs = self._extract_configured_outputs(webhook_inputs)
|
||||
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
inputs=webhook_inputs,
|
||||
outputs=outputs,
|
||||
)
|
||||
|
||||
def _extract_configured_outputs(self, webhook_inputs: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Extract outputs based on node configuration from webhook inputs."""
|
||||
outputs = {}
|
||||
|
||||
# Get the raw webhook data (should be injected by Celery task)
|
||||
webhook_data = webhook_inputs.get("webhook_data", {})
|
||||
|
||||
# Extract configured headers (case-insensitive)
|
||||
webhook_headers = webhook_data.get("headers", {})
|
||||
webhook_headers_lower = {k.lower(): v for k, v in webhook_headers.items()}
|
||||
|
||||
for header in self._node_data.headers:
|
||||
header_name = header.name
|
||||
# Try exact match first, then case-insensitive match
|
||||
value = webhook_headers.get(header_name) or webhook_headers_lower.get(header_name.lower())
|
||||
outputs[header_name] = value
|
||||
|
||||
# Extract configured query parameters
|
||||
for param in self._node_data.params:
|
||||
param_name = param.name
|
||||
outputs[param_name] = webhook_data.get("query_params", {}).get(param_name)
|
||||
|
||||
# Extract configured body parameters
|
||||
for body_param in self._node_data.body:
|
||||
param_name = body_param.name
|
||||
param_type = body_param.type
|
||||
|
||||
if self._node_data.content_type == ContentType.TEXT:
|
||||
# For text/plain, the entire body is a single string parameter
|
||||
outputs[param_name] = str(webhook_data.get("body", {}).get("raw", ""))
|
||||
continue
|
||||
elif self._node_data.content_type == ContentType.BINARY:
|
||||
outputs[param_name] = webhook_data.get("body", {}).get("raw", b"")
|
||||
continue
|
||||
|
||||
if param_type == "file":
|
||||
# Get File object (already processed by webhook controller)
|
||||
file_obj = webhook_data.get("files", {}).get(param_name)
|
||||
outputs[param_name] = file_obj
|
||||
else:
|
||||
# Get regular body parameter
|
||||
outputs[param_name] = webhook_data.get("body", {}).get(param_name)
|
||||
|
||||
# Include raw webhook data for debugging/advanced use
|
||||
outputs["_webhook_raw"] = webhook_data
|
||||
|
||||
return outputs
|
||||
@ -30,41 +30,9 @@ if [[ "${MODE}" == "worker" ]]; then
|
||||
CONCURRENCY_OPTION="-c ${CELERY_WORKER_AMOUNT:-1}"
|
||||
fi
|
||||
|
||||
# Configure queues based on edition if not explicitly set
|
||||
if [[ -z "${CELERY_QUEUES}" ]]; then
|
||||
if [[ "${EDITION}" == "CLOUD" ]]; then
|
||||
# Cloud edition: separate queues for dataset and trigger tasks
|
||||
DEFAULT_QUEUES="dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor"
|
||||
else
|
||||
# Community edition (SELF_HOSTED): dataset, pipeline and workflow have separate queues
|
||||
DEFAULT_QUEUES="dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor"
|
||||
fi
|
||||
else
|
||||
DEFAULT_QUEUES="${CELERY_QUEUES}"
|
||||
fi
|
||||
|
||||
# Support for Kubernetes deployment with specific queue workers
|
||||
# Environment variables that can be set:
|
||||
# - CELERY_WORKER_QUEUES: Comma-separated list of queues (overrides CELERY_QUEUES)
|
||||
# - CELERY_WORKER_CONCURRENCY: Number of worker processes (overrides CELERY_WORKER_AMOUNT)
|
||||
# - CELERY_WORKER_POOL: Pool implementation (overrides CELERY_WORKER_CLASS)
|
||||
|
||||
if [[ -n "${CELERY_WORKER_QUEUES}" ]]; then
|
||||
DEFAULT_QUEUES="${CELERY_WORKER_QUEUES}"
|
||||
echo "Using CELERY_WORKER_QUEUES: ${DEFAULT_QUEUES}"
|
||||
fi
|
||||
|
||||
if [[ -n "${CELERY_WORKER_CONCURRENCY}" ]]; then
|
||||
CONCURRENCY_OPTION="-c ${CELERY_WORKER_CONCURRENCY}"
|
||||
echo "Using CELERY_WORKER_CONCURRENCY: ${CELERY_WORKER_CONCURRENCY}"
|
||||
fi
|
||||
|
||||
WORKER_POOL="${CELERY_WORKER_POOL:-${CELERY_WORKER_CLASS:-gevent}}"
|
||||
echo "Starting Celery worker with queues: ${DEFAULT_QUEUES}"
|
||||
|
||||
exec celery -A app.celery worker -P ${WORKER_POOL} $CONCURRENCY_OPTION \
|
||||
exec celery -A celery_entrypoint.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION \
|
||||
--max-tasks-per-child ${MAX_TASKS_PER_CHILD:-50} --loglevel ${LOG_LEVEL:-INFO} \
|
||||
-Q ${DEFAULT_QUEUES}
|
||||
-Q ${CELERY_QUEUES:-dataset,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation}
|
||||
|
||||
elif [[ "${MODE}" == "beat" ]]; then
|
||||
exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}
|
||||
|
||||
@ -6,18 +6,12 @@ from .create_site_record_when_app_created import handle as handle_create_site_re
|
||||
from .delete_tool_parameters_cache_when_sync_draft_workflow import (
|
||||
handle as handle_delete_tool_parameters_cache_when_sync_draft_workflow,
|
||||
)
|
||||
from .sync_plugin_trigger_when_app_created import handle as handle_sync_plugin_trigger_when_app_created
|
||||
from .sync_webhook_when_app_created import handle as handle_sync_webhook_when_app_created
|
||||
from .sync_workflow_schedule_when_app_published import handle as handle_sync_workflow_schedule_when_app_published
|
||||
from .update_app_dataset_join_when_app_model_config_updated import (
|
||||
handle as handle_update_app_dataset_join_when_app_model_config_updated,
|
||||
)
|
||||
from .update_app_dataset_join_when_app_published_workflow_updated import (
|
||||
handle as handle_update_app_dataset_join_when_app_published_workflow_updated,
|
||||
)
|
||||
from .update_app_triggers_when_app_published_workflow_updated import (
|
||||
handle as handle_update_app_triggers_when_app_published_workflow_updated,
|
||||
)
|
||||
|
||||
# Consolidated handler replaces both deduct_quota_when_message_created and
|
||||
# update_provider_last_used_at_when_message_created
|
||||
@ -30,11 +24,7 @@ __all__ = [
|
||||
"handle_create_installed_app_when_app_created",
|
||||
"handle_create_site_record_when_app_created",
|
||||
"handle_delete_tool_parameters_cache_when_sync_draft_workflow",
|
||||
"handle_sync_plugin_trigger_when_app_created",
|
||||
"handle_sync_webhook_when_app_created",
|
||||
"handle_sync_workflow_schedule_when_app_published",
|
||||
"handle_update_app_dataset_join_when_app_model_config_updated",
|
||||
"handle_update_app_dataset_join_when_app_published_workflow_updated",
|
||||
"handle_update_app_triggers_when_app_published_workflow_updated",
|
||||
"handle_update_provider_when_message_created",
|
||||
]
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
import logging
|
||||
|
||||
from events.app_event import app_draft_workflow_was_synced
|
||||
from models.model import App, AppMode
|
||||
from models.workflow import Workflow
|
||||
from services.workflow_plugin_trigger_service import WorkflowPluginTriggerService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app_draft_workflow_was_synced.connect
|
||||
def handle(sender, synced_draft_workflow: Workflow, **kwargs):
|
||||
"""
|
||||
While creating a workflow or updating a workflow, we may need to sync
|
||||
its plugin trigger relationships in DB.
|
||||
"""
|
||||
app: App = sender
|
||||
if app.mode != AppMode.WORKFLOW.value:
|
||||
# only handle workflow app, chatflow is not supported yet
|
||||
return
|
||||
|
||||
WorkflowPluginTriggerService.sync_plugin_trigger_relationships(app, synced_draft_workflow)
|
||||
@ -1,22 +0,0 @@
|
||||
import logging
|
||||
|
||||
from events.app_event import app_draft_workflow_was_synced
|
||||
from models.model import App, AppMode
|
||||
from models.workflow import Workflow
|
||||
from services.webhook_service import WebhookService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app_draft_workflow_was_synced.connect
|
||||
def handle(sender, synced_draft_workflow: Workflow, **kwargs):
|
||||
"""
|
||||
While creating a workflow or updating a workflow, we may need to sync
|
||||
its webhook relationships in DB.
|
||||
"""
|
||||
app: App = sender
|
||||
if app.mode != AppMode.WORKFLOW.value:
|
||||
# only handle workflow app, chatflow is not supported yet
|
||||
return
|
||||
|
||||
WebhookService.sync_webhook_relationships(app, synced_draft_workflow)
|
||||
@ -1,86 +0,0 @@
|
||||
import logging
|
||||
from typing import Optional, cast
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.workflow.nodes.trigger_schedule.entities import SchedulePlanUpdate
|
||||
from events.app_event import app_published_workflow_was_updated
|
||||
from extensions.ext_database import db
|
||||
from models import AppMode, Workflow, WorkflowSchedulePlan
|
||||
from services.schedule_service import ScheduleService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app_published_workflow_was_updated.connect
|
||||
def handle(sender, **kwargs):
|
||||
"""
|
||||
Handle app published workflow update event to sync workflow_schedule_plans table.
|
||||
|
||||
When a workflow is published, this handler will:
|
||||
1. Extract schedule trigger nodes from the workflow graph
|
||||
2. Compare with existing workflow_schedule_plans records
|
||||
3. Create/update/delete schedule plans as needed
|
||||
"""
|
||||
app = sender
|
||||
if app.mode != AppMode.WORKFLOW.value:
|
||||
return
|
||||
|
||||
published_workflow = kwargs.get("published_workflow")
|
||||
published_workflow = cast(Workflow, published_workflow)
|
||||
|
||||
sync_schedule_from_workflow(tenant_id=app.tenant_id, app_id=app.id, workflow=published_workflow)
|
||||
|
||||
|
||||
def sync_schedule_from_workflow(tenant_id: str, app_id: str, workflow: Workflow) -> Optional[WorkflowSchedulePlan]:
|
||||
"""
|
||||
Sync schedule plan from workflow graph configuration.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
app_id: App ID
|
||||
workflow: Published workflow instance
|
||||
|
||||
Returns:
|
||||
Updated or created WorkflowSchedulePlan, or None if no schedule node
|
||||
"""
|
||||
with Session(db.engine) as session:
|
||||
schedule_config = ScheduleService.extract_schedule_config(workflow)
|
||||
|
||||
existing_plan = session.scalar(
|
||||
select(WorkflowSchedulePlan).where(
|
||||
WorkflowSchedulePlan.tenant_id == tenant_id,
|
||||
WorkflowSchedulePlan.app_id == app_id,
|
||||
)
|
||||
)
|
||||
|
||||
if not schedule_config:
|
||||
if existing_plan:
|
||||
logger.info("No schedule node in workflow for app %s, removing schedule plan", app_id)
|
||||
ScheduleService.delete_schedule(session=session, schedule_id=existing_plan.id)
|
||||
session.commit()
|
||||
return None
|
||||
|
||||
if existing_plan:
|
||||
updates = SchedulePlanUpdate(
|
||||
node_id=schedule_config.node_id,
|
||||
cron_expression=schedule_config.cron_expression,
|
||||
timezone=schedule_config.timezone,
|
||||
)
|
||||
updated_plan = ScheduleService.update_schedule(
|
||||
session=session,
|
||||
schedule_id=existing_plan.id,
|
||||
updates=updates,
|
||||
)
|
||||
session.commit()
|
||||
return updated_plan
|
||||
else:
|
||||
new_plan = ScheduleService.create_schedule(
|
||||
session=session,
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
config=schedule_config,
|
||||
)
|
||||
session.commit()
|
||||
return new_plan
|
||||
@ -1,111 +0,0 @@
|
||||
from typing import cast
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.workflow.nodes import NodeType
|
||||
from events.app_event import app_published_workflow_was_updated
|
||||
from extensions.ext_database import db
|
||||
from models import AppMode, AppTrigger, AppTriggerStatus, Workflow
|
||||
|
||||
|
||||
@app_published_workflow_was_updated.connect
|
||||
def handle(sender, **kwargs):
|
||||
"""
|
||||
Handle app published workflow update event to sync app_triggers table.
|
||||
|
||||
When a workflow is published, this handler will:
|
||||
1. Extract trigger nodes from the workflow graph
|
||||
2. Compare with existing app_triggers records
|
||||
3. Add new triggers and remove obsolete ones
|
||||
"""
|
||||
app = sender
|
||||
if app.mode != AppMode.WORKFLOW.value:
|
||||
return
|
||||
|
||||
published_workflow = kwargs.get("published_workflow")
|
||||
published_workflow = cast(Workflow, published_workflow)
|
||||
# Extract trigger info from workflow
|
||||
trigger_infos = get_trigger_infos_from_workflow(published_workflow)
|
||||
|
||||
with Session(db.engine) as session:
|
||||
# Get existing app triggers
|
||||
existing_triggers = (
|
||||
session.execute(
|
||||
select(AppTrigger).where(AppTrigger.tenant_id == app.tenant_id, AppTrigger.app_id == app.id)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
# Convert existing triggers to dict for easy lookup
|
||||
existing_triggers_map = {trigger.node_id: trigger for trigger in existing_triggers}
|
||||
|
||||
# Get current and new node IDs
|
||||
existing_node_ids = set(existing_triggers_map.keys())
|
||||
new_node_ids = {info["node_id"] for info in trigger_infos}
|
||||
|
||||
# Calculate changes
|
||||
added_node_ids = new_node_ids - existing_node_ids
|
||||
removed_node_ids = existing_node_ids - new_node_ids
|
||||
|
||||
# Remove obsolete triggers
|
||||
for node_id in removed_node_ids:
|
||||
session.delete(existing_triggers_map[node_id])
|
||||
|
||||
for trigger_info in trigger_infos:
|
||||
node_id = trigger_info["node_id"]
|
||||
|
||||
if node_id in added_node_ids:
|
||||
# Create new trigger
|
||||
app_trigger = AppTrigger(
|
||||
tenant_id=app.tenant_id,
|
||||
app_id=app.id,
|
||||
trigger_type=trigger_info["node_type"],
|
||||
title=trigger_info["node_title"],
|
||||
node_id=node_id,
|
||||
provider_name=trigger_info.get("node_provider_name", ""),
|
||||
status=AppTriggerStatus.DISABLED,
|
||||
)
|
||||
session.add(app_trigger)
|
||||
elif node_id in existing_node_ids:
|
||||
# Update existing trigger if needed
|
||||
existing_trigger = existing_triggers_map[node_id]
|
||||
new_title = trigger_info["node_title"]
|
||||
if new_title and existing_trigger.title != new_title:
|
||||
existing_trigger.title = new_title
|
||||
session.add(existing_trigger)
|
||||
|
||||
session.commit()
|
||||
|
||||
|
||||
def get_trigger_infos_from_workflow(published_workflow: Workflow) -> list[dict]:
|
||||
"""
|
||||
Extract trigger node information from the workflow graph.
|
||||
|
||||
Returns:
|
||||
List of trigger info dictionaries containing:
|
||||
- node_type: The type of the trigger node ('trigger-webhook', 'trigger-schedule', 'trigger-plugin')
|
||||
- node_id: The node ID in the workflow
|
||||
- node_title: The title of the node
|
||||
- node_provider_name: The name of the node's provider, only for plugin
|
||||
"""
|
||||
graph = published_workflow.graph_dict
|
||||
if not graph:
|
||||
return []
|
||||
|
||||
nodes = graph.get("nodes", [])
|
||||
trigger_types = {NodeType.TRIGGER_WEBHOOK.value, NodeType.TRIGGER_SCHEDULE.value, NodeType.TRIGGER_PLUGIN.value}
|
||||
|
||||
trigger_infos = [
|
||||
{
|
||||
"node_type": node.get("data", {}).get("type"),
|
||||
"node_id": node.get("id"),
|
||||
"node_title": node.get("data", {}).get("title"),
|
||||
"node_provider_name": node.get("data", {}).get("provider_name"),
|
||||
}
|
||||
for node in nodes
|
||||
if node.get("data", {}).get("type") in trigger_types
|
||||
]
|
||||
|
||||
return trigger_infos
|
||||
@ -12,7 +12,6 @@ def init_app(app: DifyApp):
|
||||
from controllers.inner_api import bp as inner_api_bp
|
||||
from controllers.mcp import bp as mcp_bp
|
||||
from controllers.service_api import bp as service_api_bp
|
||||
from controllers.trigger import bp as trigger_bp
|
||||
from controllers.web import bp as web_bp
|
||||
|
||||
CORS(
|
||||
@ -51,11 +50,3 @@ def init_app(app: DifyApp):
|
||||
|
||||
app.register_blueprint(inner_api_bp)
|
||||
app.register_blueprint(mcp_bp)
|
||||
|
||||
# Register trigger blueprint with CORS for webhook calls
|
||||
CORS(
|
||||
trigger_bp,
|
||||
allow_headers=["Content-Type", "Authorization", "X-App-Code"],
|
||||
methods=["GET", "PUT", "POST", "DELETE", "OPTIONS", "PATCH", "HEAD"],
|
||||
)
|
||||
app.register_blueprint(trigger_bp)
|
||||
|
||||
@ -96,9 +96,7 @@ def init_app(app: DifyApp) -> Celery:
|
||||
celery_app.set_default()
|
||||
app.extensions["celery"] = celery_app
|
||||
|
||||
imports = [
|
||||
"tasks.async_workflow_tasks", # trigger workers
|
||||
]
|
||||
imports = []
|
||||
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
|
||||
|
||||
# if you add a new task, please add the switch to CeleryScheduleTasksConfig
|
||||
@ -158,12 +156,6 @@ def init_app(app: DifyApp) -> Celery:
|
||||
"task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise",
|
||||
"schedule": crontab(minute="0", hour="2"),
|
||||
}
|
||||
if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:
|
||||
imports.append("schedule.workflow_schedule_task")
|
||||
beat_schedule["workflow_schedule_task"] = {
|
||||
"task": "schedule.workflow_schedule_task.poll_workflow_schedules",
|
||||
"schedule": timedelta(minutes=dify_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL),
|
||||
}
|
||||
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
|
||||
|
||||
return celery_app
|
||||
|
||||
@ -23,7 +23,6 @@ def init_app(app: DifyApp):
|
||||
reset_password,
|
||||
setup_datasource_oauth_client,
|
||||
setup_system_tool_oauth_client,
|
||||
setup_system_trigger_oauth_client,
|
||||
transform_datasource_credentials,
|
||||
upgrade_db,
|
||||
vdb_migrate,
|
||||
@ -48,7 +47,6 @@ def init_app(app: DifyApp):
|
||||
clear_orphaned_file_records,
|
||||
remove_orphaned_files_on_storage,
|
||||
setup_system_tool_oauth_client,
|
||||
setup_system_trigger_oauth_client,
|
||||
cleanup_orphaned_draft_variables,
|
||||
migrate_oss,
|
||||
setup_datasource_oauth_client,
|
||||
|
||||
@ -8,7 +8,6 @@ workflow_run_for_log_fields = {
|
||||
"id": fields.String,
|
||||
"version": fields.String,
|
||||
"status": fields.String,
|
||||
"triggered_from": fields.String,
|
||||
"error": fields.String,
|
||||
"elapsed_time": fields.Float,
|
||||
"total_tokens": fields.Integer,
|
||||
|
||||
@ -1,25 +0,0 @@
|
||||
from flask_restx import fields
|
||||
|
||||
trigger_fields = {
|
||||
"id": fields.String,
|
||||
"trigger_type": fields.String,
|
||||
"title": fields.String,
|
||||
"node_id": fields.String,
|
||||
"provider_name": fields.String,
|
||||
"icon": fields.String,
|
||||
"status": fields.String,
|
||||
"created_at": fields.DateTime(dt_format="iso8601"),
|
||||
"updated_at": fields.DateTime(dt_format="iso8601"),
|
||||
}
|
||||
|
||||
triggers_list_fields = {"data": fields.List(fields.Nested(trigger_fields))}
|
||||
|
||||
|
||||
webhook_trigger_fields = {
|
||||
"id": fields.String,
|
||||
"webhook_id": fields.String,
|
||||
"webhook_url": fields.String,
|
||||
"webhook_debug_url": fields.String,
|
||||
"node_id": fields.String,
|
||||
"created_at": fields.DateTime(dt_format="iso8601"),
|
||||
}
|
||||
@ -1,109 +0,0 @@
|
||||
from datetime import UTC, datetime
|
||||
from typing import Optional
|
||||
|
||||
import pytz
|
||||
from croniter import croniter
|
||||
|
||||
|
||||
def calculate_next_run_at(
|
||||
cron_expression: str,
|
||||
timezone: str,
|
||||
base_time: Optional[datetime] = None,
|
||||
) -> datetime:
|
||||
"""
|
||||
Calculate the next run time for a cron expression in a specific timezone.
|
||||
|
||||
Args:
|
||||
cron_expression: Standard 5-field cron expression or predefined expression
|
||||
timezone: Timezone string (e.g., 'UTC', 'America/New_York')
|
||||
base_time: Base time to calculate from (defaults to current UTC time)
|
||||
|
||||
Returns:
|
||||
Next run time in UTC
|
||||
|
||||
Note:
|
||||
Supports enhanced cron syntax including:
|
||||
- Month abbreviations: JAN, FEB, MAR-JUN, JAN,JUN,DEC
|
||||
- Day abbreviations: MON, TUE, MON-FRI, SUN,WED,FRI
|
||||
- Predefined expressions: @daily, @weekly, @monthly, @yearly, @hourly
|
||||
- Special characters: ? wildcard, L (last day), Sunday as 7
|
||||
- Standard 5-field format only (minute hour day month dayOfWeek)
|
||||
"""
|
||||
# Validate cron expression format to match frontend behavior
|
||||
parts = cron_expression.strip().split()
|
||||
|
||||
# Support both 5-field format and predefined expressions (matching frontend)
|
||||
if len(parts) != 5 and not cron_expression.startswith('@'):
|
||||
raise ValueError(
|
||||
f"Cron expression must have exactly 5 fields or be a predefined expression "
|
||||
f"(@daily, @weekly, etc.). Got {len(parts)} fields: '{cron_expression}'"
|
||||
)
|
||||
|
||||
tz = pytz.timezone(timezone)
|
||||
|
||||
if base_time is None:
|
||||
base_time = datetime.now(UTC)
|
||||
|
||||
base_time_tz = base_time.astimezone(tz)
|
||||
cron = croniter(cron_expression, base_time_tz)
|
||||
next_run_tz = cron.get_next(datetime)
|
||||
next_run_utc = next_run_tz.astimezone(UTC)
|
||||
|
||||
return next_run_utc
|
||||
|
||||
|
||||
def convert_12h_to_24h(time_str: str) -> tuple[int, int]:
|
||||
"""
|
||||
Parse 12-hour time format to 24-hour format for cron compatibility.
|
||||
|
||||
Args:
|
||||
time_str: Time string in format "HH:MM AM/PM" (e.g., "12:30 PM")
|
||||
|
||||
Returns:
|
||||
Tuple of (hour, minute) in 24-hour format
|
||||
|
||||
Raises:
|
||||
ValueError: If time string format is invalid or values are out of range
|
||||
|
||||
Examples:
|
||||
- "12:00 AM" -> (0, 0) # Midnight
|
||||
- "12:00 PM" -> (12, 0) # Noon
|
||||
- "1:30 PM" -> (13, 30)
|
||||
- "11:59 PM" -> (23, 59)
|
||||
"""
|
||||
if not time_str or not time_str.strip():
|
||||
raise ValueError("Time string cannot be empty")
|
||||
|
||||
parts = time_str.strip().split()
|
||||
if len(parts) != 2:
|
||||
raise ValueError(f"Invalid time format: '{time_str}'. Expected 'HH:MM AM/PM'")
|
||||
|
||||
time_part, period = parts
|
||||
period = period.upper()
|
||||
|
||||
if period not in ["AM", "PM"]:
|
||||
raise ValueError(f"Invalid period: '{period}'. Must be 'AM' or 'PM'")
|
||||
|
||||
time_parts = time_part.split(":")
|
||||
if len(time_parts) != 2:
|
||||
raise ValueError(f"Invalid time format: '{time_part}'. Expected 'HH:MM'")
|
||||
|
||||
try:
|
||||
hour = int(time_parts[0])
|
||||
minute = int(time_parts[1])
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid time values: {e}")
|
||||
|
||||
if hour < 1 or hour > 12:
|
||||
raise ValueError(f"Invalid hour: {hour}. Must be between 1 and 12")
|
||||
|
||||
if minute < 0 or minute > 59:
|
||||
raise ValueError(f"Invalid minute: {minute}. Must be between 0 and 59")
|
||||
|
||||
# Handle 12-hour to 24-hour edge cases
|
||||
if period == "PM" and hour != 12:
|
||||
hour += 12
|
||||
elif period == "AM" and hour == 12:
|
||||
hour = 0
|
||||
|
||||
return hour, minute
|
||||
@ -1,67 +0,0 @@
|
||||
"""Add workflow trigger logs table
|
||||
|
||||
Revision ID: 4558cfabe44e
|
||||
Revises: 0e154742a5fa
|
||||
Create Date: 2025-08-23 20:38:20.059323
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '4558cfabe44e'
|
||||
down_revision = '8d289573e1da'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('workflow_trigger_logs',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('app_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('workflow_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True),
|
||||
sa.Column('root_node_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('trigger_type', sa.String(length=50), nullable=False),
|
||||
sa.Column('trigger_data', sa.Text(), nullable=False),
|
||||
sa.Column('inputs', sa.Text(), nullable=False),
|
||||
sa.Column('outputs', sa.Text(), nullable=True),
|
||||
sa.Column('status', sa.String(length=50), nullable=False),
|
||||
sa.Column('error', sa.Text(), nullable=True),
|
||||
sa.Column('queue_name', sa.String(length=100), nullable=False),
|
||||
sa.Column('celery_task_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('retry_count', sa.Integer(), nullable=False),
|
||||
sa.Column('elapsed_time', sa.Float(), nullable=True),
|
||||
sa.Column('total_tokens', sa.Integer(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('created_by_role', sa.String(length=255), nullable=False),
|
||||
sa.Column('created_by', sa.String(length=255), nullable=False),
|
||||
sa.Column('triggered_at', sa.DateTime(), nullable=True),
|
||||
sa.Column('finished_at', sa.DateTime(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('id', name='workflow_trigger_log_pkey')
|
||||
)
|
||||
with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op:
|
||||
batch_op.create_index('workflow_trigger_log_created_at_idx', ['created_at'], unique=False)
|
||||
batch_op.create_index('workflow_trigger_log_status_idx', ['status'], unique=False)
|
||||
batch_op.create_index('workflow_trigger_log_tenant_app_idx', ['tenant_id', 'app_id'], unique=False)
|
||||
batch_op.create_index('workflow_trigger_log_workflow_id_idx', ['workflow_id'], unique=False)
|
||||
batch_op.create_index('workflow_trigger_log_workflow_run_idx', ['workflow_run_id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op:
|
||||
batch_op.drop_index('workflow_trigger_log_workflow_run_idx')
|
||||
batch_op.drop_index('workflow_trigger_log_workflow_id_idx')
|
||||
batch_op.drop_index('workflow_trigger_log_tenant_app_idx')
|
||||
batch_op.drop_index('workflow_trigger_log_status_idx')
|
||||
batch_op.drop_index('workflow_trigger_log_created_at_idx')
|
||||
|
||||
op.drop_table('workflow_trigger_logs')
|
||||
# ### end Alembic commands ###
|
||||
@ -1,47 +0,0 @@
|
||||
"""Add workflow webhook table
|
||||
|
||||
Revision ID: 5871f634954d
|
||||
Revises: fa8b0fa6f407
|
||||
Create Date: 2025-08-23 20:39:20.704501
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '5871f634954d'
|
||||
down_revision = '4558cfabe44e'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('workflow_webhook_triggers',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('app_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('node_id', sa.String(length=64), nullable=False),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('webhook_id', sa.String(length=24), nullable=False),
|
||||
sa.Column('created_by', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='workflow_webhook_trigger_pkey'),
|
||||
sa.UniqueConstraint('app_id', 'node_id', name='uniq_node'),
|
||||
sa.UniqueConstraint('webhook_id', name='uniq_webhook_id')
|
||||
)
|
||||
with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op:
|
||||
batch_op.create_index('workflow_webhook_trigger_tenant_idx', ['tenant_id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op:
|
||||
batch_op.drop_index('workflow_webhook_trigger_tenant_idx')
|
||||
|
||||
op.drop_table('workflow_webhook_triggers')
|
||||
# ### end Alembic commands ###
|
||||
@ -1,47 +0,0 @@
|
||||
"""Add app triggers table
|
||||
|
||||
Revision ID: 9ee7d347f4c1
|
||||
Revises: 5871f634954d
|
||||
Create Date: 2025-08-27 17:33:30.082812
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '9ee7d347f4c1'
|
||||
down_revision = '5871f634954d'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('app_triggers',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('app_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('node_id', sa.String(length=64), nullable=False),
|
||||
sa.Column('trigger_type', sa.String(length=50), nullable=False),
|
||||
sa.Column('title', sa.String(length=255), nullable=False),
|
||||
sa.Column('provider_name', sa.String(length=255), server_default='', nullable=True),
|
||||
sa.Column('status', sa.String(length=50), nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='app_trigger_pkey')
|
||||
)
|
||||
with op.batch_alter_table('app_triggers', schema=None) as batch_op:
|
||||
batch_op.create_index('app_trigger_tenant_app_idx', ['tenant_id', 'app_id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('app_triggers', schema=None) as batch_op:
|
||||
batch_op.drop_index('app_trigger_tenant_app_idx')
|
||||
|
||||
op.drop_table('app_triggers')
|
||||
# ### end Alembic commands ###
|
||||
@ -1,47 +0,0 @@
|
||||
"""Add workflow schedule plan table
|
||||
|
||||
Revision ID: c19938f630b6
|
||||
Revises: 9ee7d347f4c1
|
||||
Create Date: 2025-08-28 20:52:41.300028
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'c19938f630b6'
|
||||
down_revision = '875c659da2f8'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('workflow_schedule_plans',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('app_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('node_id', sa.String(length=64), nullable=False),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('cron_expression', sa.String(length=255), nullable=False),
|
||||
sa.Column('timezone', sa.String(length=64), nullable=False),
|
||||
sa.Column('next_run_at', sa.DateTime(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='workflow_schedule_plan_pkey'),
|
||||
sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node')
|
||||
)
|
||||
with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op:
|
||||
batch_op.create_index('workflow_schedule_plan_next_idx', ['next_run_at'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op:
|
||||
batch_op.drop_index('workflow_schedule_plan_next_idx')
|
||||
|
||||
op.drop_table('workflow_schedule_plans')
|
||||
# ### end Alembic commands ###
|
||||
@ -1,104 +0,0 @@
|
||||
"""plugin_trigger
|
||||
|
||||
Revision ID: 132392a2635f
|
||||
Revises: 9ee7d347f4c1
|
||||
Create Date: 2025-09-03 15:00:57.326868
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '132392a2635f'
|
||||
down_revision = '9ee7d347f4c1'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('trigger_oauth_system_clients',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('plugin_id', sa.String(length=512), nullable=False),
|
||||
sa.Column('provider', sa.String(length=255), nullable=False),
|
||||
sa.Column('encrypted_oauth_params', sa.Text(), nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='trigger_oauth_system_client_pkey'),
|
||||
sa.UniqueConstraint('plugin_id', 'provider', name='trigger_oauth_system_client_plugin_id_provider_idx')
|
||||
)
|
||||
op.create_table('trigger_oauth_tenant_clients',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('plugin_id', sa.String(length=512), nullable=False),
|
||||
sa.Column('provider', sa.String(length=255), nullable=False),
|
||||
sa.Column('enabled', sa.Boolean(), server_default=sa.text('true'), nullable=False),
|
||||
sa.Column('encrypted_oauth_params', sa.Text(), nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='trigger_oauth_tenant_client_pkey'),
|
||||
sa.UniqueConstraint('tenant_id', 'plugin_id', 'provider', name='unique_trigger_oauth_tenant_client')
|
||||
)
|
||||
op.create_table('trigger_subscriptions',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('name', sa.String(length=255), nullable=False, comment='Subscription instance name'),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('user_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('provider_id', sa.String(length=255), nullable=False, comment='Provider identifier (e.g., plugin_id/provider_name)'),
|
||||
sa.Column('endpoint_id', sa.String(length=255), nullable=False, comment='Subscription endpoint'),
|
||||
sa.Column('parameters', sa.JSON(), nullable=False, comment='Subscription parameters JSON'),
|
||||
sa.Column('properties', sa.JSON(), nullable=False, comment='Subscription properties JSON'),
|
||||
sa.Column('credentials', sa.JSON(), nullable=False, comment='Subscription credentials JSON'),
|
||||
sa.Column('credential_type', sa.String(length=50), nullable=False, comment='oauth or api_key'),
|
||||
sa.Column('credential_expires_at', sa.Integer(), nullable=False, comment='OAuth token expiration timestamp, -1 for never'),
|
||||
sa.Column('expires_at', sa.Integer(), nullable=False, comment='Subscription instance expiration timestamp, -1 for never'),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='trigger_provider_pkey'),
|
||||
sa.UniqueConstraint('tenant_id', 'provider_id', 'name', name='unique_trigger_provider')
|
||||
)
|
||||
with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op:
|
||||
batch_op.create_index('idx_trigger_providers_endpoint', ['endpoint_id'], unique=True)
|
||||
batch_op.create_index('idx_trigger_providers_tenant_endpoint', ['tenant_id', 'endpoint_id'], unique=False)
|
||||
batch_op.create_index('idx_trigger_providers_tenant_provider', ['tenant_id', 'provider_id'], unique=False)
|
||||
|
||||
op.create_table('workflow_plugin_triggers',
|
||||
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
|
||||
sa.Column('app_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('node_id', sa.String(length=64), nullable=False),
|
||||
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
|
||||
sa.Column('provider_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('trigger_id', sa.String(length=510), nullable=False),
|
||||
sa.Column('triggered_by', sa.String(length=16), nullable=False),
|
||||
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'),
|
||||
sa.UniqueConstraint('app_id', 'node_id', 'triggered_by', name='uniq_plugin_node'),
|
||||
sa.UniqueConstraint('trigger_id', 'node_id', name='uniq_trigger_node')
|
||||
)
|
||||
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
|
||||
batch_op.create_index('workflow_plugin_trigger_tenant_idx', ['tenant_id'], unique=False)
|
||||
batch_op.create_index('workflow_plugin_trigger_trigger_idx', ['trigger_id'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
|
||||
batch_op.drop_index('workflow_plugin_trigger_trigger_idx')
|
||||
batch_op.drop_index('workflow_plugin_trigger_tenant_idx')
|
||||
|
||||
op.drop_table('workflow_plugin_triggers')
|
||||
with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op:
|
||||
batch_op.drop_index('idx_trigger_providers_tenant_provider')
|
||||
batch_op.drop_index('idx_trigger_providers_tenant_endpoint')
|
||||
batch_op.drop_index('idx_trigger_providers_endpoint')
|
||||
|
||||
op.drop_table('trigger_subscriptions')
|
||||
op.drop_table('trigger_oauth_tenant_clients')
|
||||
op.drop_table('trigger_oauth_system_clients')
|
||||
|
||||
# ### end Alembic commands ###
|
||||
@ -1,62 +0,0 @@
|
||||
"""plugin_trigger_workflow
|
||||
|
||||
Revision ID: 86f068bf56fb
|
||||
Revises: 132392a2635f
|
||||
Create Date: 2025-09-04 12:12:44.661875
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '86f068bf56fb'
|
||||
down_revision = '132392a2635f'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('subscription_id', sa.String(length=255), nullable=False))
|
||||
batch_op.alter_column('provider_id',
|
||||
existing_type=sa.VARCHAR(length=255),
|
||||
type_=sa.String(length=512),
|
||||
existing_nullable=False)
|
||||
batch_op.alter_column('trigger_id',
|
||||
existing_type=sa.VARCHAR(length=510),
|
||||
type_=sa.String(length=255),
|
||||
existing_nullable=False)
|
||||
batch_op.drop_constraint(batch_op.f('uniq_plugin_node'), type_='unique')
|
||||
batch_op.drop_constraint(batch_op.f('uniq_trigger_node'), type_='unique')
|
||||
batch_op.drop_index(batch_op.f('workflow_plugin_trigger_tenant_idx'))
|
||||
batch_op.drop_index(batch_op.f('workflow_plugin_trigger_trigger_idx'))
|
||||
batch_op.create_unique_constraint('uniq_app_node_subscription', ['app_id', 'node_id'])
|
||||
batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id'], unique=False)
|
||||
batch_op.drop_column('triggered_by')
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('triggered_by', sa.VARCHAR(length=16), autoincrement=False, nullable=False))
|
||||
batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx')
|
||||
batch_op.drop_constraint('uniq_app_node_subscription', type_='unique')
|
||||
batch_op.create_index(batch_op.f('workflow_plugin_trigger_trigger_idx'), ['trigger_id'], unique=False)
|
||||
batch_op.create_index(batch_op.f('workflow_plugin_trigger_tenant_idx'), ['tenant_id'], unique=False)
|
||||
batch_op.create_unique_constraint(batch_op.f('uniq_trigger_node'), ['trigger_id', 'node_id'], postgresql_nulls_not_distinct=False)
|
||||
batch_op.create_unique_constraint(batch_op.f('uniq_plugin_node'), ['app_id', 'node_id', 'triggered_by'], postgresql_nulls_not_distinct=False)
|
||||
batch_op.alter_column('trigger_id',
|
||||
existing_type=sa.String(length=255),
|
||||
type_=sa.VARCHAR(length=510),
|
||||
existing_nullable=False)
|
||||
batch_op.alter_column('provider_id',
|
||||
existing_type=sa.String(length=512),
|
||||
type_=sa.VARCHAR(length=255),
|
||||
existing_nullable=False)
|
||||
batch_op.drop_column('subscription_id')
|
||||
# ### end Alembic commands ###
|
||||
@ -1,37 +0,0 @@
|
||||
"""plugin_trigger_idx
|
||||
|
||||
Revision ID: 875c659da2f8
|
||||
Revises: 86f068bf56fb
|
||||
Create Date: 2025-09-05 15:51:08.635283
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import models as models
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '875c659da2f8'
|
||||
down_revision = '86f068bf56fb'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('trigger_name', sa.String(length=255), nullable=False))
|
||||
batch_op.drop_index(batch_op.f('workflow_plugin_trigger_tenant_subscription_idx'))
|
||||
batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'trigger_name'], unique=False)
|
||||
batch_op.drop_column('trigger_id')
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('trigger_id', sa.VARCHAR(length=255), autoincrement=False, nullable=False))
|
||||
batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx')
|
||||
batch_op.create_index(batch_op.f('workflow_plugin_trigger_tenant_subscription_idx'), ['tenant_id', 'subscription_id'], unique=False)
|
||||
batch_op.drop_column('trigger_name')
|
||||
# ### end Alembic commands ###
|
||||
@ -79,12 +79,8 @@ from .tools import (
|
||||
ToolModelInvoke,
|
||||
WorkflowToolProvider,
|
||||
)
|
||||
from .trigger import TriggerOAuthSystemClient, TriggerOAuthTenantClient, TriggerSubscription
|
||||
from .web import PinnedConversation, SavedMessage
|
||||
from .workflow import (
|
||||
AppTrigger,
|
||||
AppTriggerStatus,
|
||||
AppTriggerType,
|
||||
ConversationVariable,
|
||||
Workflow,
|
||||
WorkflowAppLog,
|
||||
@ -93,7 +89,6 @@ from .workflow import (
|
||||
WorkflowNodeExecutionOffload,
|
||||
WorkflowNodeExecutionTriggeredFrom,
|
||||
WorkflowRun,
|
||||
WorkflowSchedulePlan,
|
||||
WorkflowType,
|
||||
)
|
||||
|
||||
@ -110,12 +105,9 @@ __all__ = [
|
||||
"AppAnnotationHitHistory",
|
||||
"AppAnnotationSetting",
|
||||
"AppDatasetJoin",
|
||||
"AppMCPServer",
|
||||
"AppMCPServer", # Added
|
||||
"AppMode",
|
||||
"AppModelConfig",
|
||||
"AppTrigger",
|
||||
"AppTriggerStatus",
|
||||
"AppTriggerType",
|
||||
"BuiltinToolProvider",
|
||||
"CeleryTask",
|
||||
"CeleryTaskSet",
|
||||
@ -176,9 +168,6 @@ __all__ = [
|
||||
"ToolLabelBinding",
|
||||
"ToolModelInvoke",
|
||||
"TraceAppConfig",
|
||||
"TriggerOAuthSystemClient",
|
||||
"TriggerOAuthTenantClient",
|
||||
"TriggerSubscription",
|
||||
"UploadFile",
|
||||
"UserFrom",
|
||||
"Whitelist",
|
||||
@ -190,7 +179,6 @@ __all__ = [
|
||||
"WorkflowNodeExecutionTriggeredFrom",
|
||||
"WorkflowRun",
|
||||
"WorkflowRunTriggeredFrom",
|
||||
"WorkflowSchedulePlan",
|
||||
"WorkflowToolProvider",
|
||||
"WorkflowType",
|
||||
]
|
||||
|
||||
@ -689,6 +689,7 @@ class DocumentSegment(Base):
|
||||
sa.Index("document_segment_tenant_document_idx", "document_id", "tenant_id"),
|
||||
sa.Index("document_segment_node_dataset_idx", "index_node_id", "dataset_id"),
|
||||
sa.Index("document_segment_tenant_idx", "tenant_id"),
|
||||
sa.Index("document_segment_dataset_hash_idx", "dataset_id", "index_node_hash"),
|
||||
)
|
||||
|
||||
# initial fields
|
||||
|
||||
@ -16,9 +16,6 @@ class WorkflowRunTriggeredFrom(StrEnum):
|
||||
APP_RUN = "app-run"
|
||||
RAG_PIPELINE_RUN = "rag-pipeline-run"
|
||||
RAG_PIPELINE_DEBUGGING = "rag-pipeline-debugging"
|
||||
WEBHOOK = "webhook"
|
||||
SCHEDULE = "schedule"
|
||||
PLUGIN = "plugin"
|
||||
|
||||
|
||||
class DraftVariableType(StrEnum):
|
||||
|
||||
@ -57,8 +57,3 @@ class ToolProviderID(GenericProviderID):
|
||||
class DatasourceProviderID(GenericProviderID):
|
||||
def __init__(self, value: str, is_hardcoded: bool = False) -> None:
|
||||
super().__init__(value, is_hardcoded)
|
||||
|
||||
|
||||
class TriggerProviderID(GenericProviderID):
|
||||
def __init__(self, value: str, is_hardcoded: bool = False) -> None:
|
||||
super().__init__(value, is_hardcoded)
|
||||
|
||||
@ -1,139 +0,0 @@
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import cast
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
|
||||
from core.trigger.entities.entities import Subscription
|
||||
from core.trigger.utils.endpoint import parse_endpoint_id
|
||||
from models.base import Base
|
||||
from models.types import StringUUID
|
||||
|
||||
|
||||
class TriggerSubscription(Base):
|
||||
"""
|
||||
Trigger provider model for managing credentials
|
||||
Supports multiple credential instances per provider
|
||||
"""
|
||||
|
||||
__tablename__ = "trigger_subscriptions"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="trigger_provider_pkey"),
|
||||
Index("idx_trigger_providers_tenant_provider", "tenant_id", "provider_id"),
|
||||
# Primary index for O(1) lookup by endpoint
|
||||
Index("idx_trigger_providers_endpoint", "endpoint_id", unique=True),
|
||||
# Composite index for tenant-specific queries (optional, kept for compatibility)
|
||||
Index("idx_trigger_providers_tenant_endpoint", "tenant_id", "endpoint_id"),
|
||||
UniqueConstraint("tenant_id", "provider_id", "name", name="unique_trigger_provider"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription instance name")
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
user_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
provider_id: Mapped[str] = mapped_column(
|
||||
String(255), nullable=False, comment="Provider identifier (e.g., plugin_id/provider_name)"
|
||||
)
|
||||
endpoint_id: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription endpoint")
|
||||
parameters: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON")
|
||||
properties: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription properties JSON")
|
||||
|
||||
credentials: Mapped[dict] = mapped_column(sa.JSON, nullable=False, comment="Subscription credentials JSON")
|
||||
credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key")
|
||||
credential_expires_at: Mapped[int] = mapped_column(
|
||||
Integer, default=-1, comment="OAuth token expiration timestamp, -1 for never"
|
||||
)
|
||||
expires_at: Mapped[int] = mapped_column(
|
||||
Integer, default=-1, comment="Subscription instance expiration timestamp, -1 for never"
|
||||
)
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
def is_credential_expired(self) -> bool:
|
||||
"""Check if credential is expired"""
|
||||
if self.credential_expires_at == -1:
|
||||
return False
|
||||
# Check if token expires in next 3 minutes
|
||||
return (self.credential_expires_at - 180) < int(time.time())
|
||||
|
||||
def to_entity(self) -> Subscription:
|
||||
return Subscription(
|
||||
expires_at=self.expires_at,
|
||||
endpoint=parse_endpoint_id(self.endpoint_id),
|
||||
properties=self.properties,
|
||||
)
|
||||
|
||||
def to_api_entity(self) -> TriggerProviderSubscriptionApiEntity:
|
||||
return TriggerProviderSubscriptionApiEntity(
|
||||
id=self.id,
|
||||
name=self.name,
|
||||
provider=self.provider_id,
|
||||
endpoint=parse_endpoint_id(self.endpoint_id),
|
||||
parameters=self.parameters,
|
||||
properties=self.properties,
|
||||
credential_type=CredentialType(self.credential_type),
|
||||
credentials=self.credentials,
|
||||
workflows_in_use=-1,
|
||||
)
|
||||
|
||||
|
||||
# system level trigger oauth client params
|
||||
class TriggerOAuthSystemClient(Base):
|
||||
__tablename__ = "trigger_oauth_system_clients"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="trigger_oauth_system_client_pkey"),
|
||||
sa.UniqueConstraint("plugin_id", "provider", name="trigger_oauth_system_client_plugin_id_provider_idx"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
plugin_id: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||
provider: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
# oauth params of the trigger provider
|
||||
encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
|
||||
# tenant level trigger oauth client params (client_id, client_secret, etc.)
|
||||
class TriggerOAuthTenantClient(Base):
|
||||
__tablename__ = "trigger_oauth_tenant_clients"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="trigger_oauth_tenant_client_pkey"),
|
||||
sa.UniqueConstraint("tenant_id", "plugin_id", "provider", name="unique_trigger_oauth_tenant_client"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
# tenant id
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
plugin_id: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||
provider: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
|
||||
# oauth params of the trigger provider
|
||||
encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
@property
|
||||
def oauth_params(self) -> dict:
|
||||
return cast(dict, json.loads(self.encrypted_oauth_params or "{}"))
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user