mirror of
https://github.com/langgenius/dify.git
synced 2026-05-01 07:58:02 +08:00
feat: webhook trigger backend api (#24387)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
@ -67,6 +67,7 @@ from .app import (
|
||||
workflow_draft_variable,
|
||||
workflow_run,
|
||||
workflow_statistic,
|
||||
workflow_trigger,
|
||||
)
|
||||
|
||||
# Import auth controllers
|
||||
|
||||
151
api/controllers/console/app/workflow_trigger.py
Normal file
151
api/controllers/console/app/workflow_trigger.py
Normal file
@ -0,0 +1,151 @@
|
||||
import logging
|
||||
import secrets
|
||||
|
||||
from flask_restful import Resource, reqparse
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
|
||||
|
||||
from configs import dify_config
|
||||
from controllers.console import api
|
||||
from controllers.console.app.wraps import get_app_model
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from extensions.ext_database import db
|
||||
from libs.login import current_user, login_required
|
||||
from models.workflow import WorkflowWebhookTrigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebhookTriggerApi(Resource):
|
||||
"""Webhook Trigger API"""
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model
|
||||
def post(self, app_model):
|
||||
"""Create webhook trigger"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
|
||||
parser.add_argument(
|
||||
"triggered_by",
|
||||
type=str,
|
||||
required=False,
|
||||
default="production",
|
||||
choices=["debugger", "production"],
|
||||
help="triggered_by must be debugger or production",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if app_model.mode != "workflow":
|
||||
raise BadRequest("Invalid app mode, only workflow can add webhook node")
|
||||
|
||||
# The role of the current user in the ta table must be admin, owner, or editor
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
node_id = args["node_id"]
|
||||
triggered_by = args["triggered_by"]
|
||||
|
||||
with Session(db.engine) as session:
|
||||
# Check if webhook trigger already exists for this app, node, and environment
|
||||
existing_trigger = (
|
||||
session.query(WorkflowWebhookTrigger)
|
||||
.filter(
|
||||
WorkflowWebhookTrigger.app_id == app_model.id,
|
||||
WorkflowWebhookTrigger.node_id == node_id,
|
||||
WorkflowWebhookTrigger.triggered_by == triggered_by,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing_trigger:
|
||||
raise BadRequest("Webhook trigger already exists for this node and environment")
|
||||
|
||||
# Generate unique webhook_id
|
||||
webhook_id = self._generate_webhook_id(session)
|
||||
|
||||
# Create new webhook trigger
|
||||
webhook_trigger = WorkflowWebhookTrigger(
|
||||
app_id=app_model.id,
|
||||
node_id=node_id,
|
||||
tenant_id=current_user.current_tenant_id,
|
||||
webhook_id=webhook_id,
|
||||
triggered_by=triggered_by,
|
||||
)
|
||||
|
||||
session.add(webhook_trigger)
|
||||
session.commit()
|
||||
session.refresh(webhook_trigger)
|
||||
|
||||
return {
|
||||
"id": webhook_trigger.id,
|
||||
"webhook_id": webhook_trigger.webhook_id,
|
||||
"webhook_url": f"{dify_config.SERVICE_API_URL}/triggers/webhook/{webhook_trigger.webhook_id}",
|
||||
"node_id": webhook_trigger.node_id,
|
||||
"triggered_by": webhook_trigger.triggered_by,
|
||||
"created_at": webhook_trigger.created_at.isoformat(),
|
||||
}
|
||||
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_app_model
|
||||
def delete(self, app_model):
|
||||
"""Delete webhook trigger"""
|
||||
parser = reqparse.RequestParser()
|
||||
parser.add_argument("node_id", type=str, required=True, help="Node ID is required")
|
||||
parser.add_argument(
|
||||
"triggered_by",
|
||||
type=str,
|
||||
required=False,
|
||||
default="production",
|
||||
choices=["debugger", "production"],
|
||||
help="triggered_by must be debugger or production",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# The role of the current user in the ta table must be admin, owner, or editor
|
||||
if not current_user.is_editor:
|
||||
raise Forbidden()
|
||||
|
||||
node_id = args["node_id"]
|
||||
triggered_by = args["triggered_by"]
|
||||
|
||||
with Session(db.engine) as session:
|
||||
# Find webhook trigger
|
||||
webhook_trigger = (
|
||||
session.query(WorkflowWebhookTrigger)
|
||||
.filter(
|
||||
WorkflowWebhookTrigger.app_id == app_model.id,
|
||||
WorkflowWebhookTrigger.node_id == node_id,
|
||||
WorkflowWebhookTrigger.triggered_by == triggered_by,
|
||||
WorkflowWebhookTrigger.tenant_id == current_user.current_tenant_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not webhook_trigger:
|
||||
raise NotFound("Webhook trigger not found")
|
||||
|
||||
session.delete(webhook_trigger)
|
||||
session.commit()
|
||||
|
||||
return {"result": "success"}, 204
|
||||
|
||||
def _generate_webhook_id(self, session: Session) -> str:
|
||||
"""Generate unique 24-character webhook ID"""
|
||||
while True:
|
||||
# Generate 24-character random string
|
||||
webhook_id = secrets.token_urlsafe(18)[:24] # token_urlsafe gives base64url, take first 24 chars
|
||||
|
||||
# Check if it already exists
|
||||
existing = (
|
||||
session.query(WorkflowWebhookTrigger).filter(WorkflowWebhookTrigger.webhook_id == webhook_id).first()
|
||||
)
|
||||
|
||||
if not existing:
|
||||
return webhook_id
|
||||
|
||||
|
||||
api.add_resource(WebhookTriggerApi, "/apps/<uuid:app_id>/workflows/triggers/webhook")
|
||||
7
api/controllers/trigger/__init__.py
Normal file
7
api/controllers/trigger/__init__.py
Normal file
@ -0,0 +1,7 @@
|
||||
from flask import Blueprint
|
||||
|
||||
# Create trigger blueprint
|
||||
bp = Blueprint("trigger", __name__, url_prefix="/triggers")
|
||||
|
||||
# Import routes after blueprint creation to avoid circular imports
|
||||
from . import webhook
|
||||
43
api/controllers/trigger/webhook.py
Normal file
43
api/controllers/trigger/webhook.py
Normal file
@ -0,0 +1,43 @@
|
||||
import logging
|
||||
|
||||
from flask import jsonify
|
||||
from werkzeug.exceptions import BadRequest, NotFound
|
||||
|
||||
from controllers.trigger import bp
|
||||
from services.webhook_service import WebhookService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@bp.route("/webhook/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
|
||||
def handle_webhook(webhook_id: str):
|
||||
"""
|
||||
Handle webhook trigger calls.
|
||||
|
||||
This endpoint receives webhook calls and processes them according to the
|
||||
configured webhook trigger settings.
|
||||
"""
|
||||
try:
|
||||
# Get webhook trigger, workflow, and node configuration
|
||||
webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id)
|
||||
|
||||
# Extract request data
|
||||
webhook_data = WebhookService.extract_webhook_data(webhook_trigger)
|
||||
|
||||
# Validate request against node configuration
|
||||
validation_result = WebhookService.validate_webhook_request(webhook_data, node_config)
|
||||
if not validation_result["valid"]:
|
||||
raise BadRequest(validation_result["error"])
|
||||
|
||||
# Process webhook call (send to Celery)
|
||||
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow)
|
||||
|
||||
# Return configured response
|
||||
response_data, status_code = WebhookService.generate_webhook_response(node_config)
|
||||
return jsonify(response_data), status_code
|
||||
|
||||
except ValueError as e:
|
||||
raise NotFound(str(e))
|
||||
except Exception as e:
|
||||
logger.exception(f"Webhook processing failed for {webhook_id}: {str(e)}")
|
||||
return jsonify({"error": "Internal server error", "message": str(e)}), 500
|
||||
Reference in New Issue
Block a user