mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 09:58:04 +08:00
feat(trigger): implement plugin trigger synchronization and subscription management in workflow
- Added a new event handler for syncing plugin trigger relationships when a draft workflow is synced, ensuring that the database reflects the current state of plugin triggers.
- Introduced subscription management features in the frontend, allowing users to select, add, and remove subscriptions for trigger plugins.
- Updated various components to support subscription handling, including the addition of new UI elements for subscription selection and removal.
- Enhanced internationalization support by adding new translation keys related to subscription management.

These changes improve the overall functionality and user experience of trigger plugins within workflows.
This commit is contained in:
@ -52,6 +52,7 @@ class QueueEvent(StrEnum):
|
||||
STOP = "stop"
|
||||
RETRY = "retry"
|
||||
|
||||
|
||||
class AppQueueEvent(BaseModel):
|
||||
"""
|
||||
QueueEvent abstract entity
|
||||
|
||||
@ -4,6 +4,8 @@ from .create_document_index import handle
|
||||
from .create_installed_app_when_app_created import handle
|
||||
from .create_site_record_when_app_created import handle
|
||||
from .delete_tool_parameters_cache_when_sync_draft_workflow import handle
|
||||
from .sync_plugin_trigger_when_app_created import handle
|
||||
from .sync_webhook_when_app_created import handle
|
||||
from .sync_workflow_schedule_when_app_published import handle
|
||||
from .update_app_dataset_join_when_app_model_config_updated import handle
|
||||
from .update_app_dataset_join_when_app_published_workflow_updated import handle
|
||||
|
||||
@ -0,0 +1,22 @@
|
||||
import logging
|
||||
|
||||
from events.app_event import app_draft_workflow_was_synced
|
||||
from models.model import App, AppMode
|
||||
from models.workflow import Workflow
|
||||
from services.workflow_plugin_trigger_service import WorkflowPluginTriggerService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app_draft_workflow_was_synced.connect
def handle(sender, synced_draft_workflow: Workflow, **kwargs):
    """
    Keep plugin trigger relationships in sync with a draft workflow.

    Fired whenever a draft workflow is created or updated; delegates the
    actual DB reconciliation to WorkflowPluginTriggerService.
    """
    app: App = sender
    # Only plain workflow apps are handled; chatflow is not supported yet.
    if app.mode == AppMode.WORKFLOW.value:
        WorkflowPluginTriggerService.sync_plugin_trigger_relationships(app, synced_draft_workflow)
|
||||
@ -1484,7 +1484,7 @@ class WorkflowPluginTrigger(Base):
|
||||
- node_id (varchar) Node ID which node in the workflow
|
||||
- tenant_id (uuid) Workspace ID
|
||||
- provider_id (varchar) Plugin provider ID
|
||||
- trigger_name (varchar) trigger name (e.g. github_issues_trigger)
|
||||
- subscription_id (varchar) Subscription ID
|
||||
- created_at (timestamp) Creation time
|
||||
- updated_at (timestamp) Last update time
|
||||
|
||||
@ -13,6 +13,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
TRIGGER_DEBUG_EVENT_TTL = 300
|
||||
|
||||
|
||||
class TriggerDebugEvent(BaseModel):
|
||||
subscription_id: str
|
||||
request_id: str
|
||||
|
||||
@ -1,17 +1,24 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import NotFound
|
||||
|
||||
from core.workflow.nodes.enums import NodeType
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.model import App
|
||||
from models.trigger import TriggerSubscription
|
||||
from models.workflow import WorkflowPluginTrigger
|
||||
from models.workflow import Workflow, WorkflowPluginTrigger
|
||||
|
||||
|
||||
class WorkflowPluginTriggerService:
|
||||
"""Service for managing workflow plugin triggers"""
|
||||
|
||||
__PLUGIN_TRIGGER_NODE_CACHE_KEY__ = "plugin_trigger_nodes"
|
||||
MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW = 5 # Maximum allowed plugin trigger nodes per workflow
|
||||
|
||||
@classmethod
|
||||
def create_plugin_trigger(
|
||||
cls,
|
||||
@ -379,3 +386,161 @@ class WorkflowPluginTriggerService:
|
||||
session.commit()
|
||||
|
||||
return count
|
||||
|
||||
@classmethod
def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow):
    """
    Sync plugin trigger relationships in DB with the current workflow graph.

    Steps:
    1. Walk the workflow graph and collect its plugin trigger nodes.
    2. Fetch the existing plugin trigger records (Redis is consulted first as a
       cache so that frequent draft syncs do not always hit the DB).
    3. Diff graph nodes against DB records and create/update/delete records.

    Limits:
    - Maximum 5 plugin trigger nodes per workflow
      (MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW).

    :param app: owning App; provides the app_id / tenant_id scoping for records
    :param workflow: the synced draft workflow whose graph is inspected
    :raises ValueError: if the workflow exceeds the plugin trigger node limit
    """

    class Cache(BaseModel):
        """Redis cache payload describing one plugin trigger record."""

        record_id: str
        node_id: str
        provider_id: str
        trigger_name: str
        subscription_id: str

    # Walk nodes to find plugin triggers
    nodes_in_graph = []
    for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
        # Extract plugin trigger configuration from node
        plugin_id = node_config.get("plugin_id", "")
        provider_id = node_config.get("provider_id", "")
        trigger_name = node_config.get("trigger_name", "")
        subscription_id = node_config.get("subscription_id", "")

        # A node without a subscription is not bound to anything yet; skip it.
        if not subscription_id:
            continue

        nodes_in_graph.append(
            {
                "node_id": node_id,
                "plugin_id": plugin_id,
                "provider_id": provider_id,
                "trigger_name": trigger_name,
                "subscription_id": subscription_id,
            }
        )

    # Check plugin trigger node limit
    if len(nodes_in_graph) > cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW:
        raise ValueError(
            f"Workflow exceeds maximum plugin trigger node limit. "
            f"Found {len(nodes_in_graph)} plugin trigger nodes, "
            f"maximum allowed is {cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW}"
        )

    # Nodes with no cache entry must be reconciled against the DB below.
    not_found_in_cache: list[dict] = [
        node_info
        for node_info in nodes_in_graph
        if not redis_client.get(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_info['node_id']}")
    ]

    # BUG FIX: the original code called redis_client.lock(...) without ever
    # acquiring it, so concurrent syncs for the same app were never actually
    # serialized, and the `finally` block deleted the lock key unconditionally
    # (which would clobber a lock held by another worker). Acquire the lock
    # explicitly and release it safely instead.
    lock = redis_client.lock(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock", timeout=10)
    lock.acquire(blocking=True)
    try:
        with Session(db.engine) as session:
            try:
                # fetch the non-cached nodes from DB
                all_records = session.scalars(
                    select(WorkflowPluginTrigger).where(
                        WorkflowPluginTrigger.app_id == app.id,
                        WorkflowPluginTrigger.tenant_id == app.tenant_id,
                    )
                ).all()

                nodes_id_in_db = {node.node_id: node for node in all_records}
                nodes_id_in_graph = {node["node_id"] for node in nodes_in_graph}

                # get the nodes not found both in cache and DB
                nodes_not_found = [
                    node_info for node_info in not_found_in_cache if node_info["node_id"] not in nodes_id_in_db
                ]

                # create new plugin trigger records
                for node_info in nodes_not_found:
                    plugin_trigger = WorkflowPluginTrigger(
                        app_id=app.id,
                        tenant_id=app.tenant_id,
                        node_id=node_info["node_id"],
                        provider_id=node_info["provider_id"],
                        trigger_name=node_info["trigger_name"],
                        subscription_id=node_info["subscription_id"],
                    )
                    session.add(plugin_trigger)
                    session.flush()  # Get the ID for caching

                    cache = Cache(
                        record_id=plugin_trigger.id,
                        node_id=node_info["node_id"],
                        provider_id=node_info["provider_id"],
                        trigger_name=node_info["trigger_name"],
                        subscription_id=node_info["subscription_id"],
                    )
                    redis_client.set(
                        f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_info['node_id']}",
                        cache.model_dump_json(),
                        ex=60 * 60,
                    )
                session.commit()

                # Update existing records if subscription_id changed
                for node_info in nodes_in_graph:
                    node_id = node_info["node_id"]
                    if node_id in nodes_id_in_db:
                        existing_record = nodes_id_in_db[node_id]
                        if (
                            existing_record.subscription_id != node_info["subscription_id"]
                            or existing_record.provider_id != node_info["provider_id"]
                            or existing_record.trigger_name != node_info["trigger_name"]
                        ):
                            existing_record.subscription_id = node_info["subscription_id"]
                            existing_record.provider_id = node_info["provider_id"]
                            existing_record.trigger_name = node_info["trigger_name"]
                            session.add(existing_record)

                            # Update cache
                            cache = Cache(
                                record_id=existing_record.id,
                                node_id=node_id,
                                provider_id=node_info["provider_id"],
                                trigger_name=node_info["trigger_name"],
                                subscription_id=node_info["subscription_id"],
                            )
                            redis_client.set(
                                f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}",
                                cache.model_dump_json(),
                                ex=60 * 60,
                            )
                session.commit()

                # delete the nodes not found in the graph
                for node_id in nodes_id_in_db:
                    if node_id not in nodes_id_in_graph:
                        session.delete(nodes_id_in_db[node_id])
                        redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}")
                session.commit()
            except Exception:
                # NOTE(review): logging is imported locally because the module's
                # top-level imports (not fully visible here) may not include it.
                import logging

                logging.getLogger(__name__).exception(
                    "Failed to sync plugin trigger relationships for app %s", app.id
                )
                raise
    finally:
        try:
            lock.release()
        except Exception:
            # The lock may already have expired (timeout=10s) and been taken
            # over by another worker; releasing then raises — ignore it.
            pass
|
||||
|
||||
Reference in New Issue
Block a user