fix(api): fix connection closing issue for event subscription

The previous implementation does not close the connection once
`WORKFLOW_PAUSED` event is met, which breaks the contract with
frontend.
This commit is contained in:
QuantumGhost
2026-01-21 15:29:00 +08:00
parent 20be1dd819
commit 004df1c159
3 changed files with 10 additions and 73 deletions

View File

@ -5,7 +5,7 @@ import logging
import queue
import threading
import time
from collections.abc import Generator, Iterable, Mapping, Sequence
from collections.abc import Generator, Mapping, Sequence
from dataclasses import dataclass
from typing import Any
@ -25,7 +25,6 @@ from core.workflow.entities import WorkflowStartReason
from core.workflow.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus
from core.workflow.runtime import GraphRuntimeState
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
from models.enums import WorkflowRunTriggeredFrom
from models.model import AppMode, Message
from models.workflow import WorkflowNodeExecutionTriggeredFrom, WorkflowRun
from repositories.api_workflow_node_execution_repository import WorkflowNodeExecutionSnapshot
@ -109,7 +108,6 @@ def build_workflow_event_stream(
pause_entity=pause_entity,
resumption_context=resumption_context,
)
snapshot_keys = _collect_snapshot_keys(snapshot_events)
for event in snapshot_events:
last_msg_time = time.time()
@ -137,12 +135,10 @@ def build_workflow_event_stream(
last_ping_time = current_time
continue
if _is_duplicate_event(event, snapshot_keys):
continue
last_msg_time = time.time()
last_ping_time = last_msg_time
yield event
if _is_terminal_event(event):
if _is_terminal_event(event, include_paused=True):
return
finally:
buffer_state.stop_event.set()
@ -421,71 +417,12 @@ def _parse_event_message(message: bytes) -> Mapping[str, Any] | None:
return event
def _is_terminal_event(event: Mapping[str, Any] | str) -> bool:
def _is_terminal_event(event: Mapping[str, Any] | str, include_paused=False) -> bool:
if not isinstance(event, Mapping):
return False
event_type = event.get("event")
return event_type == StreamEvent.WORKFLOW_FINISHED.value
def _collect_snapshot_keys(events: Iterable[Mapping[str, Any]]) -> set[tuple[str, str]]:
keys: set[tuple[str, str]] = set()
for event in events:
key = _event_snapshot_key(event)
if key is not None:
keys.add(key)
return keys
def _filter_buffered_events(
events: Sequence[Mapping[str, Any]],
snapshot_keys: set[tuple[str, str]],
) -> Iterable[Mapping[str, Any]]:
for event in events:
if _is_duplicate_event(event, snapshot_keys):
continue
yield event
def _is_duplicate_event(event: Mapping[str, Any], snapshot_keys: set[tuple[str, str]]) -> bool:
key = _event_snapshot_key(event)
if key is None:
return False
return key in snapshot_keys
def _event_snapshot_key(event: Mapping[str, Any]) -> tuple[str, str] | None:
event_type = event.get("event")
if not event_type:
return None
if event_type == StreamEvent.WORKFLOW_STARTED.value:
return (event_type, event.get("workflow_run_id") or "")
if event_type in {StreamEvent.NODE_STARTED.value, StreamEvent.NODE_FINISHED.value}:
data = event.get("data") or {}
return (event_type, str(data.get("id") or ""))
if event_type == StreamEvent.WORKFLOW_PAUSED.value:
return (event_type, event.get("workflow_run_id") or "")
return None
def _resolve_node_triggered_from(workflow_run_triggered_from: str | None) -> str:
if not workflow_run_triggered_from:
return WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
mapping = {
WorkflowRunTriggeredFrom.DEBUGGING.value: WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowRunTriggeredFrom.APP_RUN.value: WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowRunTriggeredFrom.WEBHOOK.value: WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowRunTriggeredFrom.SCHEDULE.value: WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowRunTriggeredFrom.PLUGIN.value: WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value: WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN.value,
WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING.value: WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value,
}
if workflow_run_triggered_from in mapping:
return mapping[workflow_run_triggered_from]
logger.warning(
"Unknown workflow run triggered_from %s, defaulting node executions to workflow-run",
workflow_run_triggered_from,
)
return WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
if event_type == StreamEvent.WORKFLOW_FINISHED.value:
return True
if include_paused:
return event_type == StreamEvent.WORKFLOW_PAUSED.value
return False

View File

@ -10,9 +10,9 @@ from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext
from core.workflow.nodes.human_input.entities import EmailDeliveryConfig, EmailDeliveryMethod
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext
from extensions.ext_database import db
from extensions.ext_mail import mail
from libs.email_template_renderer import render_email_template

View File

@ -8,13 +8,13 @@ from core.workflow.nodes.human_input.entities import (
EmailRecipients,
ExternalRecipient,
)
from core.workflow.runtime import VariablePool
from services import human_input_delivery_test_service as service_module
from services.human_input_delivery_test_service import (
DeliveryTestContext,
DeliveryTestError,
EmailDeliveryTestHandler,
)
from core.workflow.runtime import VariablePool
def _make_email_method() -> EmailDeliveryMethod: