mirror of
https://github.com/langgenius/dify.git
synced 2026-03-23 07:17:55 +08:00
WIP: P5 api
This commit is contained in:
@ -3,10 +3,14 @@ from typing import Literal, cast
|
||||
from flask import request
|
||||
from flask_restx import Resource, fields, marshal_with
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from controllers.console import console_ns
|
||||
from controllers.console.app.wraps import get_app_model
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from controllers.web.error import NotFoundError
|
||||
from core.workflow.enums import WorkflowExecutionStatus
|
||||
from extensions.ext_database import db
|
||||
from fields.end_user_fields import simple_end_user_fields
|
||||
from fields.member_fields import simple_account_fields
|
||||
from fields.workflow_run_fields import (
|
||||
@ -21,8 +25,10 @@ from fields.workflow_run_fields import (
|
||||
)
|
||||
from libs.custom_inputs import time_duration
|
||||
from libs.helper import uuid_value
|
||||
from libs.login import current_user, login_required
|
||||
from libs.login import current_account_with_tenant, current_user, login_required
|
||||
from models import Account, App, AppMode, EndUser, WorkflowRunTriggeredFrom
|
||||
from models.workflow import WorkflowRun
|
||||
from repositories.factory import DifyAPIRepositoryFactory
|
||||
from services.workflow_run_service import WorkflowRunService
|
||||
|
||||
# Workflow run status choices for filtering
|
||||
@ -375,3 +381,67 @@ class WorkflowRunNodeExecutionListApi(Resource):
|
||||
)
|
||||
|
||||
return {"data": node_executions}
|
||||
|
||||
|
||||
@console_ns.route("/workflow/<string:workflow_run_id>/pause-details")
|
||||
class ConsoleWorkflowPauseDetailsApi(Resource):
|
||||
"""Console API for getting workflow pause details."""
|
||||
|
||||
@account_initialization_required
|
||||
@login_required
|
||||
def get(self, workflow_run_id: str):
|
||||
"""
|
||||
Get workflow pause details.
|
||||
|
||||
GET /console/api/workflow/<workflow_run_id>/pause-details
|
||||
|
||||
Returns information about why and where the workflow is paused.
|
||||
"""
|
||||
|
||||
# Query WorkflowRun to determine if workflow is suspended
|
||||
account, tenant = current_account_with_tenant()
|
||||
session_maker = sessionmaker(bind=db.engine)
|
||||
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker=session_maker)
|
||||
workflow_run = db.session.get(WorkflowRun, workflow_run_id)
|
||||
if not workflow_run:
|
||||
raise NotFoundError("Workflow run not found")
|
||||
|
||||
# Check if workflow is suspended
|
||||
is_paused = workflow_run.status == WorkflowExecutionStatus.PAUSED
|
||||
if not is_paused:
|
||||
return {"is_suspended": False, "paused_at": None, "paused_nodes": [], "pending_human_inputs": []}, 200
|
||||
|
||||
# Get pending Human Input forms for this workflow run
|
||||
service = HumanInputFormService(db.session())
|
||||
pending_forms = service.get_pending_forms_for_workflow_run(workflow_run_id)
|
||||
|
||||
# Build response
|
||||
response = {
|
||||
"is_suspended": True,
|
||||
"paused_at": workflow_run.created_at.isoformat() + "Z" if workflow_run.created_at else None,
|
||||
"paused_nodes": [],
|
||||
"pending_human_inputs": [],
|
||||
}
|
||||
|
||||
# Add pending human input forms
|
||||
for form in pending_forms:
|
||||
form_definition = json.loads(form.form_definition) if form.form_definition else {}
|
||||
response["pending_human_inputs"].append(
|
||||
{
|
||||
"form_id": form.id,
|
||||
"node_id": form_definition.get("node_id", "unknown"),
|
||||
"node_title": form_definition.get("title", "Human Input"),
|
||||
"created_at": form.created_at.isoformat() + "Z" if form.created_at else None,
|
||||
}
|
||||
)
|
||||
|
||||
# Also add to paused_nodes for backward compatibility
|
||||
response["paused_nodes"].append(
|
||||
{
|
||||
"node_id": form_definition.get("node_id", "unknown"),
|
||||
"node_title": form_definition.get("title", "Human Input"),
|
||||
"pause_type": {"type": "human_input", "form_id": form.id},
|
||||
}
|
||||
)
|
||||
|
||||
return response, 200
|
||||
|
||||
@ -204,68 +204,6 @@ class ConsoleWorkflowEventsApi(Resource):
|
||||
)
|
||||
|
||||
|
||||
@console_ns.route("/workflow/<string:workflow_run_id>/pause-details")
|
||||
class ConsoleWorkflowPauseDetailsApi(Resource):
|
||||
"""Console API for getting workflow pause details."""
|
||||
|
||||
@account_initialization_required
|
||||
@login_required
|
||||
def get(self, workflow_run_id: str):
|
||||
"""
|
||||
Get workflow pause details.
|
||||
|
||||
GET /console/api/workflow/<workflow_run_id>/pause-details
|
||||
|
||||
Returns information about why and where the workflow is paused.
|
||||
"""
|
||||
|
||||
# Query WorkflowRun to determine if workflow is suspended
|
||||
workflow_run = db.session.get(WorkflowRun, workflow_run_id)
|
||||
if not workflow_run:
|
||||
raise NotFoundError("Workflow run not found")
|
||||
|
||||
# Check if workflow is suspended
|
||||
is_suspended = workflow_run.status == "running" and workflow_run.pause_details is not None
|
||||
|
||||
if not is_suspended:
|
||||
return {"is_suspended": False, "paused_at": None, "paused_nodes": [], "pending_human_inputs": []}, 200
|
||||
|
||||
# Get pending Human Input forms for this workflow run
|
||||
service = HumanInputFormService(db.session())
|
||||
pending_forms = service.get_pending_forms_for_workflow_run(workflow_run_id)
|
||||
|
||||
# Build response
|
||||
response = {
|
||||
"is_suspended": True,
|
||||
"paused_at": workflow_run.created_at.isoformat() + "Z" if workflow_run.created_at else None,
|
||||
"paused_nodes": [],
|
||||
"pending_human_inputs": [],
|
||||
}
|
||||
|
||||
# Add pending human input forms
|
||||
for form in pending_forms:
|
||||
form_definition = json.loads(form.form_definition) if form.form_definition else {}
|
||||
response["pending_human_inputs"].append(
|
||||
{
|
||||
"form_id": form.id,
|
||||
"node_id": form_definition.get("node_id", "unknown"),
|
||||
"node_title": form_definition.get("title", "Human Input"),
|
||||
"created_at": form.created_at.isoformat() + "Z" if form.created_at else None,
|
||||
}
|
||||
)
|
||||
|
||||
# Also add to paused_nodes for backward compatibility
|
||||
response["paused_nodes"].append(
|
||||
{
|
||||
"node_id": form_definition.get("node_id", "unknown"),
|
||||
"node_title": form_definition.get("title", "Human Input"),
|
||||
"pause_type": {"type": "human_input", "form_id": form.id},
|
||||
}
|
||||
)
|
||||
|
||||
return response, 200
|
||||
|
||||
|
||||
def _retrieve_app_for_workflow_run(session: Session, workflow_run: WorkflowRun):
|
||||
query = select(App).where(
|
||||
App.id == workflow_run.app_id,
|
||||
|
||||
@ -284,6 +284,13 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
# while creating pause.
|
||||
...
|
||||
|
||||
def get_workflow_pause(self, workflow_run_id: str) -> WorkflowPauseEntity | None:
|
||||
"""Retrieve the current pause for a workflow execution.
|
||||
|
||||
If there is no current pause, this method would return `None`.
|
||||
"""
|
||||
...
|
||||
|
||||
def resume_workflow_pause(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
|
||||
Reference in New Issue
Block a user