feat(api): Human Input Node (backend part) (#31646)

Add the backend part of the human-in-the-loop (HITL) feature, along with the related architecture and workflow engine changes.

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: 盐粒 Yanli <yanli@dify.ai>
Co-authored-by: CrabSAMA <40541269+CrabSAMA@users.noreply.github.com>
Co-authored-by: Stephen Zhou <38493346+hyoban@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: yihong <zouzou0208@gmail.com>
Co-authored-by: Joel <iamjoel007@gmail.com>
This commit is contained in:
QuantumGhost
2026-01-30 10:18:49 +08:00
committed by GitHub
parent fedd097f63
commit 03e3acfc71
207 changed files with 19006 additions and 373 deletions

View File

@ -1,7 +1,9 @@
from __future__ import annotations
import logging
import threading
import uuid
from collections.abc import Generator, Mapping
from collections.abc import Callable, Generator, Mapping
from typing import TYPE_CHECKING, Any, Union
from configs import dify_config
@ -9,22 +11,61 @@ from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
from core.app.apps.chat.app_generator import ChatAppGenerator
from core.app.apps.completion.app_generator import CompletionAppGenerator
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.features.rate_limiting import RateLimit
from core.app.features.rate_limiting.rate_limit import rate_limit_context
from enums.quota_type import QuotaType, unlimited
from extensions.otel import AppGenerateHandler, trace_span
from models.model import Account, App, AppMode, EndUser
from models.workflow import Workflow
from models.workflow import Workflow, WorkflowRun
from services.errors.app import QuotaExceededError, WorkflowIdFormatError, WorkflowNotFoundError
from services.errors.llm import InvokeRateLimitError
from services.workflow_service import WorkflowService
from tasks.app_generate.workflow_execute_task import AppExecutionParams, workflow_based_app_execution_task
logger = logging.getLogger(__name__)
SSE_TASK_START_FALLBACK_MS = 200
if TYPE_CHECKING:
from controllers.console.app.workflow import LoopNodeRunPayload
class AppGenerateService:
@staticmethod
def _build_streaming_task_on_subscribe(start_task: Callable[[], None]) -> Callable[[], None]:
    """Wrap ``start_task`` in a run-once callback meant to fire on subscribe.

    A fallback timer is armed as soon as this function is called, so
    ``start_task`` is still attempted after ``SSE_TASK_START_FALLBACK_MS``
    milliseconds even if the returned callback is never invoked.

    Args:
        start_task: Zero-argument callable that enqueues the streaming task.

    Returns:
        A zero-argument callback. Invoking it attempts ``start_task`` unless
        an earlier attempt already succeeded, and cancels the fallback timer
        once the task is known to have started. Exceptions raised by
        ``start_task`` are logged and swallowed; a failed attempt is not
        recorded as started, so a later trigger will try again.
    """
    state_lock = threading.Lock()
    has_started = False

    def _ensure_started() -> bool:
        # Returns True when the task has been started (now or previously),
        # False when this attempt failed.
        nonlocal has_started
        with state_lock:
            if not has_started:
                try:
                    start_task()
                except Exception:
                    logger.exception("Failed to enqueue streaming task")
                    return False
                has_started = True
            return True

    # XXX(QuantumGhost): workaround for a race between the publisher and the SSE subscriber.
    # The Celery task may publish its first event before the API side has subscribed, and
    # Redis Pub/Sub ("at most once" delivery) would then drop it. Starting the task from the
    # subscribe hook avoids that; the short fallback timer makes sure the task still runs
    # when the client never consumes the stream.
    fallback_timer = threading.Timer(SSE_TASK_START_FALLBACK_MS / 1000.0, _ensure_started)
    fallback_timer.daemon = True
    fallback_timer.start()

    def _on_subscribe() -> None:
        if not _ensure_started():
            # Leave the timer armed so the fallback can attempt the start again.
            return
        fallback_timer.cancel()

    return _on_subscribe
@classmethod
@trace_span(AppGenerateHandler)
def generate(
@ -88,15 +129,29 @@ class AppGenerateService:
elif app_model.mode == AppMode.ADVANCED_CHAT:
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
with rate_limit_context(rate_limit, request_id):
payload = AppExecutionParams.new(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
call_depth=0,
)
payload_json = payload.model_dump_json()
def on_subscribe():
workflow_based_app_execution_task.delay(payload_json)
on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe)
generator = AdvancedChatAppGenerator()
return rate_limit.generate(
AdvancedChatAppGenerator.convert_to_event_stream(
AdvancedChatAppGenerator().generate(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
generator.convert_to_event_stream(
generator.retrieve_events(
AppMode.ADVANCED_CHAT,
payload.workflow_run_id,
on_subscribe=on_subscribe,
),
),
request_id=request_id,
@ -104,6 +159,36 @@ class AppGenerateService:
elif app_model.mode == AppMode.WORKFLOW:
workflow_id = args.get("workflow_id")
workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
if streaming:
with rate_limit_context(rate_limit, request_id):
payload = AppExecutionParams.new(
app_model=app_model,
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=True,
call_depth=0,
root_node_id=root_node_id,
workflow_run_id=str(uuid.uuid4()),
)
payload_json = payload.model_dump_json()
def on_subscribe():
workflow_based_app_execution_task.delay(payload_json)
on_subscribe = cls._build_streaming_task_on_subscribe(on_subscribe)
return rate_limit.generate(
WorkflowAppGenerator.convert_to_event_stream(
MessageBasedAppGenerator.retrieve_events(
AppMode.WORKFLOW,
payload.workflow_run_id,
on_subscribe=on_subscribe,
),
),
request_id,
)
return rate_limit.generate(
WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().generate(
@ -112,7 +197,7 @@ class AppGenerateService:
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
streaming=False,
root_node_id=root_node_id,
call_depth=0,
),
@ -248,3 +333,19 @@ class AppGenerateService:
raise ValueError("Workflow not published")
return workflow
@classmethod
def get_response_generator(
    cls,
    app_model: App,
    workflow_run: WorkflowRun,
):
    """Build an event stream for the events of an existing workflow run.

    Args:
        app_model: App whose ``mode`` selects the ``AppMode`` passed to
            ``retrieve_events``.
        workflow_run: Run whose ``id`` identifies the events to retrieve.

    Returns:
        The result of ``AdvancedChatAppGenerator.convert_to_event_stream``
        applied to the retrieved events.
    """
    if workflow_run.status.is_ended():
        # TODO(QuantumGhost): handle the ended scenario.
        pass

    # NOTE(review): an AdvancedChatAppGenerator is used regardless of app_model.mode.
    chat_generator = AdvancedChatAppGenerator()
    to_event_stream = chat_generator.convert_to_event_stream
    run_events = chat_generator.retrieve_events(AppMode(app_model.mode), workflow_run.id)
    return to_event_stream(run_events)