mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 09:58:04 +08:00
feat: introduce trigger functionality (#27644)
Signed-off-by: lyzno1 <yuanyouhuilyz@gmail.com> Co-authored-by: Stream <Stream_2@qq.com> Co-authored-by: lyzno1 <92089059+lyzno1@users.noreply.github.com> Co-authored-by: zhsama <torvalds@linux.do> Co-authored-by: Harry <xh001x@hotmail.com> Co-authored-by: lyzno1 <yuanyouhuilyz@gmail.com> Co-authored-by: yessenia <yessenia.contact@gmail.com> Co-authored-by: hjlarry <hjlarry@163.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: WTW0313 <twwu@dify.ai> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@ -26,7 +26,14 @@ from .dataset import (
|
||||
TidbAuthBinding,
|
||||
Whitelist,
|
||||
)
|
||||
from .enums import CreatorUserRole, UserFrom, WorkflowRunTriggeredFrom
|
||||
from .enums import (
|
||||
AppTriggerStatus,
|
||||
AppTriggerType,
|
||||
CreatorUserRole,
|
||||
UserFrom,
|
||||
WorkflowRunTriggeredFrom,
|
||||
WorkflowTriggerStatus,
|
||||
)
|
||||
from .model import (
|
||||
ApiRequest,
|
||||
ApiToken,
|
||||
@ -79,6 +86,13 @@ from .tools import (
|
||||
ToolModelInvoke,
|
||||
WorkflowToolProvider,
|
||||
)
|
||||
from .trigger import (
|
||||
AppTrigger,
|
||||
TriggerOAuthSystemClient,
|
||||
TriggerOAuthTenantClient,
|
||||
TriggerSubscription,
|
||||
WorkflowSchedulePlan,
|
||||
)
|
||||
from .web import PinnedConversation, SavedMessage
|
||||
from .workflow import (
|
||||
ConversationVariable,
|
||||
@ -106,9 +120,12 @@ __all__ = [
|
||||
"AppAnnotationHitHistory",
|
||||
"AppAnnotationSetting",
|
||||
"AppDatasetJoin",
|
||||
"AppMCPServer", # Added
|
||||
"AppMCPServer",
|
||||
"AppMode",
|
||||
"AppModelConfig",
|
||||
"AppTrigger",
|
||||
"AppTriggerStatus",
|
||||
"AppTriggerType",
|
||||
"BuiltinToolProvider",
|
||||
"CeleryTask",
|
||||
"CeleryTaskSet",
|
||||
@ -169,6 +186,9 @@ __all__ = [
|
||||
"ToolLabelBinding",
|
||||
"ToolModelInvoke",
|
||||
"TraceAppConfig",
|
||||
"TriggerOAuthSystemClient",
|
||||
"TriggerOAuthTenantClient",
|
||||
"TriggerSubscription",
|
||||
"UploadFile",
|
||||
"UserFrom",
|
||||
"Whitelist",
|
||||
@ -181,6 +201,8 @@ __all__ = [
|
||||
"WorkflowPause",
|
||||
"WorkflowRun",
|
||||
"WorkflowRunTriggeredFrom",
|
||||
"WorkflowSchedulePlan",
|
||||
"WorkflowToolProvider",
|
||||
"WorkflowTriggerStatus",
|
||||
"WorkflowType",
|
||||
]
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
from enum import StrEnum
|
||||
|
||||
from core.workflow.enums import NodeType
|
||||
|
||||
|
||||
class CreatorUserRole(StrEnum):
|
||||
ACCOUNT = "account"
|
||||
@ -13,9 +15,12 @@ class UserFrom(StrEnum):
|
||||
|
||||
class WorkflowRunTriggeredFrom(StrEnum):
|
||||
DEBUGGING = "debugging"
|
||||
APP_RUN = "app-run"
|
||||
APP_RUN = "app-run" # webapp / service api
|
||||
RAG_PIPELINE_RUN = "rag-pipeline-run"
|
||||
RAG_PIPELINE_DEBUGGING = "rag-pipeline-debugging"
|
||||
WEBHOOK = "webhook"
|
||||
SCHEDULE = "schedule"
|
||||
PLUGIN = "plugin"
|
||||
|
||||
|
||||
class DraftVariableType(StrEnum):
|
||||
@ -38,3 +43,35 @@ class ExecutionOffLoadType(StrEnum):
|
||||
INPUTS = "inputs"
|
||||
PROCESS_DATA = "process_data"
|
||||
OUTPUTS = "outputs"
|
||||
|
||||
|
||||
class WorkflowTriggerStatus(StrEnum):
|
||||
"""Workflow Trigger Execution Status"""
|
||||
|
||||
PENDING = "pending"
|
||||
QUEUED = "queued"
|
||||
RUNNING = "running"
|
||||
SUCCEEDED = "succeeded"
|
||||
PAUSED = "paused"
|
||||
FAILED = "failed"
|
||||
RATE_LIMITED = "rate_limited"
|
||||
RETRYING = "retrying"
|
||||
|
||||
|
||||
class AppTriggerStatus(StrEnum):
|
||||
"""App Trigger Status Enum"""
|
||||
|
||||
ENABLED = "enabled"
|
||||
DISABLED = "disabled"
|
||||
UNAUTHORIZED = "unauthorized"
|
||||
|
||||
|
||||
class AppTriggerType(StrEnum):
|
||||
"""App Trigger Type Enum"""
|
||||
|
||||
TRIGGER_WEBHOOK = NodeType.TRIGGER_WEBHOOK.value
|
||||
TRIGGER_SCHEDULE = NodeType.TRIGGER_SCHEDULE.value
|
||||
TRIGGER_PLUGIN = NodeType.TRIGGER_PLUGIN.value
|
||||
|
||||
# for backward compatibility
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
@ -57,3 +57,8 @@ class ToolProviderID(GenericProviderID):
|
||||
class DatasourceProviderID(GenericProviderID):
|
||||
def __init__(self, value: str, is_hardcoded: bool = False) -> None:
|
||||
super().__init__(value, is_hardcoded)
|
||||
|
||||
|
||||
class TriggerProviderID(GenericProviderID):
|
||||
def __init__(self, value: str, is_hardcoded: bool = False) -> None:
|
||||
super().__init__(value, is_hardcoded)
|
||||
|
||||
456
api/models/trigger.py
Normal file
456
api/models/trigger.py
Normal file
@ -0,0 +1,456 @@
|
||||
import json
|
||||
import time
|
||||
from collections.abc import Mapping
|
||||
from datetime import datetime
|
||||
from functools import cached_property
|
||||
from typing import Any, cast
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
|
||||
from core.trigger.entities.entities import Subscription
|
||||
from core.trigger.utils.endpoint import generate_plugin_trigger_endpoint_url, generate_webhook_trigger_endpoint
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.base import Base
|
||||
from models.engine import db
|
||||
from models.enums import AppTriggerStatus, AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
|
||||
from models.model import Account
|
||||
from models.types import EnumText, StringUUID
|
||||
|
||||
|
||||
class TriggerSubscription(Base):
|
||||
"""
|
||||
Trigger provider model for managing credentials
|
||||
Supports multiple credential instances per provider
|
||||
"""
|
||||
|
||||
__tablename__ = "trigger_subscriptions"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="trigger_provider_pkey"),
|
||||
Index("idx_trigger_providers_tenant_provider", "tenant_id", "provider_id"),
|
||||
# Primary index for O(1) lookup by endpoint
|
||||
Index("idx_trigger_providers_endpoint", "endpoint_id", unique=True),
|
||||
# Composite index for tenant-specific queries (optional, kept for compatibility)
|
||||
Index("idx_trigger_providers_tenant_endpoint", "tenant_id", "endpoint_id"),
|
||||
UniqueConstraint("tenant_id", "provider_id", "name", name="unique_trigger_provider"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription instance name")
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
user_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
provider_id: Mapped[str] = mapped_column(
|
||||
String(255), nullable=False, comment="Provider identifier (e.g., plugin_id/provider_name)"
|
||||
)
|
||||
endpoint_id: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription endpoint")
|
||||
parameters: Mapped[dict[str, Any]] = mapped_column(sa.JSON, nullable=False, comment="Subscription parameters JSON")
|
||||
properties: Mapped[dict[str, Any]] = mapped_column(sa.JSON, nullable=False, comment="Subscription properties JSON")
|
||||
|
||||
credentials: Mapped[dict[str, Any]] = mapped_column(
|
||||
sa.JSON, nullable=False, comment="Subscription credentials JSON"
|
||||
)
|
||||
credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key")
|
||||
credential_expires_at: Mapped[int] = mapped_column(
|
||||
Integer, default=-1, comment="OAuth token expiration timestamp, -1 for never"
|
||||
)
|
||||
expires_at: Mapped[int] = mapped_column(
|
||||
Integer, default=-1, comment="Subscription instance expiration timestamp, -1 for never"
|
||||
)
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
def is_credential_expired(self) -> bool:
|
||||
"""Check if credential is expired"""
|
||||
if self.credential_expires_at == -1:
|
||||
return False
|
||||
# Check if token expires in next 3 minutes
|
||||
return (self.credential_expires_at - 180) < int(time.time())
|
||||
|
||||
def to_entity(self) -> Subscription:
|
||||
return Subscription(
|
||||
expires_at=self.expires_at,
|
||||
endpoint=generate_plugin_trigger_endpoint_url(self.endpoint_id),
|
||||
parameters=self.parameters,
|
||||
properties=self.properties,
|
||||
)
|
||||
|
||||
def to_api_entity(self) -> TriggerProviderSubscriptionApiEntity:
|
||||
return TriggerProviderSubscriptionApiEntity(
|
||||
id=self.id,
|
||||
name=self.name,
|
||||
provider=self.provider_id,
|
||||
endpoint=generate_plugin_trigger_endpoint_url(self.endpoint_id),
|
||||
parameters=self.parameters,
|
||||
properties=self.properties,
|
||||
credential_type=CredentialType(self.credential_type),
|
||||
credentials=self.credentials,
|
||||
workflows_in_use=-1,
|
||||
)
|
||||
|
||||
|
||||
# system level trigger oauth client params
|
||||
class TriggerOAuthSystemClient(Base):
|
||||
__tablename__ = "trigger_oauth_system_clients"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="trigger_oauth_system_client_pkey"),
|
||||
sa.UniqueConstraint("plugin_id", "provider", name="trigger_oauth_system_client_plugin_id_provider_idx"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
plugin_id: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||
provider: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
# oauth params of the trigger provider
|
||||
encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
|
||||
# tenant level trigger oauth client params (client_id, client_secret, etc.)
|
||||
class TriggerOAuthTenantClient(Base):
|
||||
__tablename__ = "trigger_oauth_tenant_clients"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="trigger_oauth_tenant_client_pkey"),
|
||||
sa.UniqueConstraint("tenant_id", "plugin_id", "provider", name="unique_trigger_oauth_tenant_client"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
# tenant id
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
plugin_id: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||
provider: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
|
||||
# oauth params of the trigger provider
|
||||
encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
@property
|
||||
def oauth_params(self) -> Mapping[str, Any]:
|
||||
return cast(Mapping[str, Any], json.loads(self.encrypted_oauth_params or "{}"))
|
||||
|
||||
|
||||
class WorkflowTriggerLog(Base):
|
||||
"""
|
||||
Workflow Trigger Log
|
||||
|
||||
Track async trigger workflow runs with re-invocation capability
|
||||
|
||||
Attributes:
|
||||
- id (uuid) Trigger Log ID (used as workflow_trigger_log_id)
|
||||
- tenant_id (uuid) Workspace ID
|
||||
- app_id (uuid) App ID
|
||||
- workflow_id (uuid) Workflow ID
|
||||
- workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts
|
||||
- root_node_id (string) Optional - Custom starting node ID for workflow execution
|
||||
- trigger_metadata (text) Optional - Trigger metadata (JSON)
|
||||
- trigger_type (string) Type of trigger: webhook, schedule, plugin
|
||||
- trigger_data (text) Full trigger data including inputs (JSON)
|
||||
- inputs (text) Input parameters (JSON)
|
||||
- outputs (text) Optional - Output content (JSON)
|
||||
- status (string) Execution status
|
||||
- error (text) Optional - Error message if failed
|
||||
- queue_name (string) Celery queue used
|
||||
- celery_task_id (string) Optional - Celery task ID for tracking
|
||||
- retry_count (int) Number of retry attempts
|
||||
- elapsed_time (float) Optional - Time consumption in seconds
|
||||
- total_tokens (int) Optional - Total tokens used
|
||||
- created_by_role (string) Creator role: account, end_user
|
||||
- created_by (string) Creator ID
|
||||
- created_at (timestamp) Creation time
|
||||
- triggered_at (timestamp) Optional - When actually triggered
|
||||
- finished_at (timestamp) Optional - Completion time
|
||||
"""
|
||||
|
||||
__tablename__ = "workflow_trigger_logs"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="workflow_trigger_log_pkey"),
|
||||
sa.Index("workflow_trigger_log_tenant_app_idx", "tenant_id", "app_id"),
|
||||
sa.Index("workflow_trigger_log_status_idx", "status"),
|
||||
sa.Index("workflow_trigger_log_created_at_idx", "created_at"),
|
||||
sa.Index("workflow_trigger_log_workflow_run_idx", "workflow_run_id"),
|
||||
sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
root_node_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
||||
trigger_metadata: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
|
||||
trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON
|
||||
inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing
|
||||
outputs: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
|
||||
|
||||
status: Mapped[str] = mapped_column(
|
||||
EnumText(WorkflowTriggerStatus, length=50), nullable=False, default=WorkflowTriggerStatus.PENDING
|
||||
)
|
||||
error: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
|
||||
|
||||
queue_name: Mapped[str] = mapped_column(String(100), nullable=False)
|
||||
celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
||||
retry_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=0)
|
||||
|
||||
elapsed_time: Mapped[float | None] = mapped_column(sa.Float, nullable=True)
|
||||
total_tokens: Mapped[int | None] = mapped_column(sa.Integer, nullable=True)
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_by: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
|
||||
triggered_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
|
||||
finished_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
|
||||
|
||||
@property
|
||||
def created_by_account(self):
|
||||
created_by_role = CreatorUserRole(self.created_by_role)
|
||||
return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
|
||||
|
||||
@property
|
||||
def created_by_end_user(self):
|
||||
from models.model import EndUser
|
||||
|
||||
created_by_role = CreatorUserRole(self.created_by_role)
|
||||
return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary for API responses"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"tenant_id": self.tenant_id,
|
||||
"app_id": self.app_id,
|
||||
"workflow_id": self.workflow_id,
|
||||
"workflow_run_id": self.workflow_run_id,
|
||||
"root_node_id": self.root_node_id,
|
||||
"trigger_metadata": json.loads(self.trigger_metadata) if self.trigger_metadata else None,
|
||||
"trigger_type": self.trigger_type,
|
||||
"trigger_data": json.loads(self.trigger_data),
|
||||
"inputs": json.loads(self.inputs),
|
||||
"outputs": json.loads(self.outputs) if self.outputs else None,
|
||||
"status": self.status,
|
||||
"error": self.error,
|
||||
"queue_name": self.queue_name,
|
||||
"celery_task_id": self.celery_task_id,
|
||||
"retry_count": self.retry_count,
|
||||
"elapsed_time": self.elapsed_time,
|
||||
"total_tokens": self.total_tokens,
|
||||
"created_by_role": self.created_by_role,
|
||||
"created_by": self.created_by,
|
||||
"created_at": self.created_at.isoformat() if self.created_at else None,
|
||||
"triggered_at": self.triggered_at.isoformat() if self.triggered_at else None,
|
||||
"finished_at": self.finished_at.isoformat() if self.finished_at else None,
|
||||
}
|
||||
|
||||
|
||||
class WorkflowWebhookTrigger(Base):
|
||||
"""
|
||||
Workflow Webhook Trigger
|
||||
|
||||
Attributes:
|
||||
- id (uuid) Primary key
|
||||
- app_id (uuid) App ID to bind to a specific app
|
||||
- node_id (varchar) Node ID which node in the workflow
|
||||
- tenant_id (uuid) Workspace ID
|
||||
- webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id
|
||||
- created_by (varchar) User ID of the creator
|
||||
- created_at (timestamp) Creation time
|
||||
- updated_at (timestamp) Last update time
|
||||
"""
|
||||
|
||||
__tablename__ = "workflow_webhook_triggers"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"),
|
||||
sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"),
|
||||
sa.UniqueConstraint("app_id", "node_id", name="uniq_node"),
|
||||
sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
webhook_id: Mapped[str] = mapped_column(String(24), nullable=False)
|
||||
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def webhook_url(self):
|
||||
"""
|
||||
Generated webhook url
|
||||
"""
|
||||
return generate_webhook_trigger_endpoint(self.webhook_id)
|
||||
|
||||
@cached_property
|
||||
def webhook_debug_url(self):
|
||||
"""
|
||||
Generated debug webhook url
|
||||
"""
|
||||
return generate_webhook_trigger_endpoint(self.webhook_id, True)
|
||||
|
||||
|
||||
class WorkflowPluginTrigger(Base):
|
||||
"""
|
||||
Workflow Plugin Trigger
|
||||
|
||||
Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger
|
||||
|
||||
Attributes:
|
||||
- id (uuid) Primary key
|
||||
- app_id (uuid) App ID to bind to a specific app
|
||||
- node_id (varchar) Node ID which node in the workflow
|
||||
- tenant_id (uuid) Workspace ID
|
||||
- provider_id (varchar) Plugin provider ID
|
||||
- event_name (varchar) trigger name
|
||||
- subscription_id (varchar) Subscription ID
|
||||
- created_at (timestamp) Creation time
|
||||
- updated_at (timestamp) Last update time
|
||||
"""
|
||||
|
||||
__tablename__ = "workflow_plugin_triggers"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
|
||||
sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"),
|
||||
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
provider_id: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||
event_name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
subscription_id: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
server_default=func.current_timestamp(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
|
||||
class AppTrigger(Base):
|
||||
"""
|
||||
App Trigger
|
||||
|
||||
Manages multiple triggers for an app with enable/disable and authorization states.
|
||||
|
||||
Attributes:
|
||||
- id (uuid) Primary key
|
||||
- tenant_id (uuid) Workspace ID
|
||||
- app_id (uuid) App ID
|
||||
- trigger_type (string) Type: webhook, schedule, plugin
|
||||
- title (string) Trigger title
|
||||
|
||||
- status (string) Status: enabled, disabled, unauthorized, error
|
||||
- node_id (string) Optional workflow node ID
|
||||
- created_at (timestamp) Creation time
|
||||
- updated_at (timestamp) Last update time
|
||||
"""
|
||||
|
||||
__tablename__ = "app_triggers"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"),
|
||||
sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str | None] = mapped_column(String(64), nullable=False)
|
||||
trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
|
||||
title: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True)
|
||||
status: Mapped[str] = mapped_column(
|
||||
EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.ENABLED
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime,
|
||||
nullable=False,
|
||||
default=naive_utc_now(),
|
||||
server_onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
||||
|
||||
class WorkflowSchedulePlan(Base):
|
||||
"""
|
||||
Workflow Schedule Configuration
|
||||
|
||||
Store schedule configurations for time-based workflow triggers.
|
||||
Uses cron expressions with timezone support for flexible scheduling.
|
||||
|
||||
Attributes:
|
||||
- id (uuid) Primary key
|
||||
- app_id (uuid) App ID to bind to a specific app
|
||||
- node_id (varchar) Starting node ID for workflow execution
|
||||
- tenant_id (uuid) Workspace ID for multi-tenancy
|
||||
- cron_expression (varchar) Cron expression defining schedule pattern
|
||||
- timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai')
|
||||
- next_run_at (timestamp) Next scheduled execution time
|
||||
- created_at (timestamp) Creation timestamp
|
||||
- updated_at (timestamp) Last update timestamp
|
||||
"""
|
||||
|
||||
__tablename__ = "workflow_schedule_plans"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"),
|
||||
sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"),
|
||||
sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
|
||||
# Schedule configuration
|
||||
cron_expression: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
timezone: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
|
||||
# Schedule control
|
||||
next_run_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Convert to dictionary representation"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"app_id": self.app_id,
|
||||
"node_id": self.node_id,
|
||||
"tenant_id": self.tenant_id,
|
||||
"cron_expression": self.cron_expression,
|
||||
"timezone": self.timezone,
|
||||
"next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
|
||||
"created_at": self.created_at.isoformat(),
|
||||
"updated_at": self.updated_at.isoformat(),
|
||||
}
|
||||
@ -1,6 +1,6 @@
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Mapping, Sequence
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union, cast
|
||||
@ -303,6 +303,54 @@ class Workflow(Base):
|
||||
def features_dict(self) -> dict[str, Any]:
|
||||
return json.loads(self.features) if self.features else {}
|
||||
|
||||
def walk_nodes(
|
||||
self, specific_node_type: NodeType | None = None
|
||||
) -> Generator[tuple[str, Mapping[str, Any]], None, None]:
|
||||
"""
|
||||
Walk through the workflow nodes, yield each node configuration.
|
||||
|
||||
Each node configuration is a tuple containing the node's id and the node's properties.
|
||||
|
||||
Node properties example:
|
||||
{
|
||||
"type": "llm",
|
||||
"title": "LLM",
|
||||
"desc": "",
|
||||
"variables": [],
|
||||
"model":
|
||||
{
|
||||
"provider": "langgenius/openai/openai",
|
||||
"name": "gpt-4",
|
||||
"mode": "chat",
|
||||
"completion_params": { "temperature": 0.7 },
|
||||
},
|
||||
"prompt_template": [{ "role": "system", "text": "" }],
|
||||
"context": { "enabled": false, "variable_selector": [] },
|
||||
"vision": { "enabled": false },
|
||||
"memory":
|
||||
{
|
||||
"window": { "enabled": false, "size": 10 },
|
||||
"query_prompt_template": "{{#sys.query#}}\n\n{{#sys.files#}}",
|
||||
"role_prefix": { "user": "", "assistant": "" },
|
||||
},
|
||||
"selected": false,
|
||||
}
|
||||
|
||||
For specific node type, refer to `core.workflow.nodes`
|
||||
"""
|
||||
graph_dict = self.graph_dict
|
||||
if "nodes" not in graph_dict:
|
||||
raise WorkflowDataError("nodes not found in workflow graph")
|
||||
|
||||
if specific_node_type:
|
||||
yield from (
|
||||
(node["id"], node["data"])
|
||||
for node in graph_dict["nodes"]
|
||||
if node["data"]["type"] == specific_node_type.value
|
||||
)
|
||||
else:
|
||||
yield from ((node["id"], node["data"]) for node in graph_dict["nodes"])
|
||||
|
||||
def user_input_form(self, to_old_structure: bool = False) -> list[Any]:
|
||||
# get start node from graph
|
||||
if not self.graph:
|
||||
|
||||
Reference in New Issue
Block a user