mirror of
https://github.com/langgenius/dify.git
synced 2026-04-25 13:16:16 +08:00
feat: sync main branch (#31938)
Signed-off-by: majiayu000 <1835304752@qq.com> Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com> Signed-off-by: -LAN- <laipz8200@outlook.com> Signed-off-by: yihong0618 <zouzou0208@gmail.com> Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com> Co-authored-by: 盐粒 Yanli <yanli@dify.ai> Co-authored-by: wangxiaolei <fatelei@gmail.com> Co-authored-by: Stephen Zhou <38493346+hyoban@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Cursx <33718736+Cursx@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: lif <1835304752@qq.com> Co-authored-by: 非法操作 <hjlarry@163.com> Co-authored-by: Asuka Minato <i@asukaminato.eu.org> Co-authored-by: fenglin <790872612@qq.com> Co-authored-by: qiaofenglin <qiaofenglin@baidu.com> Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: TomoOkuyama <49631611+TomoOkuyama@users.noreply.github.com> Co-authored-by: Tomo Okuyama <tomo.okuyama@intersystems.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: zyssyz123 <916125788@qq.com> Co-authored-by: hj24 <mambahj24@gmail.com> Co-authored-by: Coding On Star <447357187@qq.com> Co-authored-by: CodingOnStar <hanxujiang@dify.ai> Co-authored-by: yyh <92089059+lyzno1@users.noreply.github.com> Co-authored-by: Xiangxuan Qu <fghpdf@outlook.com> Co-authored-by: fghpdf <fghpdf@users.noreply.github.com> Co-authored-by: coopercoder <whitetiger0127@163.com> Co-authored-by: zhaiguangpeng <zhaiguangpeng@didiglobal.com> Co-authored-by: Junyan Qin (Chin) <rockchinq@gmail.com> Co-authored-by: E.G <146701565+GlobalStar117@users.noreply.github.com> Co-authored-by: GlobalStar117 <GlobalStar117@users.noreply.github.com> Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com> Co-authored-by: CodingOnStar <hanxujiang@dify.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: heyszt <270985384@qq.com> Co-authored-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com> Co-authored-by: Yeuoly <45712896+Yeuoly@users.noreply.github.com> Co-authored-by: zxhlyh <jasonapring2015@outlook.com> Co-authored-by: moonpanda <chuanzegao@163.com> Co-authored-by: warlocgao <warlocgao@tencent.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: KVOJJJin <jzongcode@gmail.com> Co-authored-by: eux <euxx@users.noreply.github.com> Co-authored-by: bangjiehan <bangjiehan@gmail.com> Co-authored-by: FFXN <31929997+FFXN@users.noreply.github.com> Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com> Co-authored-by: Nie Ronghua <nieronghua@sf-express.com> Co-authored-by: JQSevenMiao <141806521+JQSevenMiao@users.noreply.github.com> Co-authored-by: jiasiqi <jiasiqi3@tal.com> Co-authored-by: Seokrin Taron Sung <sungsjade@gmail.com> Co-authored-by: CrabSAMA <40541269+CrabSAMA@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: yihong <zouzou0208@gmail.com> Co-authored-by: Joel <iamjoel007@gmail.com> Co-authored-by: Wu Tianwei <30284043+WTW0313@users.noreply.github.com> Co-authored-by: yessenia <yessenia.contact@gmail.com> Co-authored-by: Jax <anobaka@qq.com> Co-authored-by: niveshdandyan <155956228+niveshdandyan@users.noreply.github.com> Co-authored-by: OSS Contributor <oss-contributor@example.com> Co-authored-by: niveshdandyan <niveshdandyan@users.noreply.github.com> Co-authored-by: Sean Kenneth Doherty <Smaster7772@gmail.com>
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
from .config import GraphEngineConfig
|
||||
from .graph_engine import GraphEngine
|
||||
|
||||
__all__ = ["GraphEngine"]
|
||||
__all__ = ["GraphEngine", "GraphEngineConfig"]
|
||||
|
||||
14
api/core/workflow/graph_engine/config.py
Normal file
14
api/core/workflow/graph_engine/config.py
Normal file
@ -0,0 +1,14 @@
|
||||
"""
|
||||
GraphEngine configuration models.
|
||||
"""
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class GraphEngineConfig(BaseModel):
|
||||
"""Configuration for GraphEngine worker pool scaling."""
|
||||
|
||||
min_workers: int = 1
|
||||
max_workers: int = 5
|
||||
scale_up_threshold: int = 3
|
||||
scale_down_idle_time: float = 5.0
|
||||
@ -37,6 +37,7 @@ from .command_processing import (
|
||||
PauseCommandHandler,
|
||||
UpdateVariablesCommandHandler,
|
||||
)
|
||||
from .config import GraphEngineConfig
|
||||
from .entities.commands import AbortCommand, PauseCommand, UpdateVariablesCommand
|
||||
from .error_handler import ErrorHandler
|
||||
from .event_management import EventHandler, EventManager
|
||||
@ -45,7 +46,6 @@ from .graph_traversal import EdgeProcessor, SkipPropagator
|
||||
from .layers.base import GraphEngineLayer
|
||||
from .orchestration import Dispatcher, ExecutionCoordinator
|
||||
from .protocols.command_channel import CommandChannel
|
||||
from .ready_queue import ReadyQueue
|
||||
from .worker_management import WorkerPool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -70,10 +70,7 @@ class GraphEngine:
|
||||
graph: Graph,
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
command_channel: CommandChannel,
|
||||
min_workers: int | None = None,
|
||||
max_workers: int | None = None,
|
||||
scale_up_threshold: int | None = None,
|
||||
scale_down_idle_time: float | None = None,
|
||||
config: GraphEngineConfig,
|
||||
) -> None:
|
||||
"""Initialize the graph engine with all subsystems and dependencies."""
|
||||
# stop event
|
||||
@ -85,20 +82,14 @@ class GraphEngine:
|
||||
self._graph_runtime_state.stop_event = self._stop_event
|
||||
self._graph_runtime_state.configure(graph=cast("GraphProtocol", graph))
|
||||
self._command_channel = command_channel
|
||||
self._config = config
|
||||
|
||||
# Graph execution tracks the overall execution state
|
||||
self._graph_execution = cast("GraphExecution", self._graph_runtime_state.graph_execution)
|
||||
self._graph_execution.workflow_id = workflow_id
|
||||
|
||||
# === Worker Management Parameters ===
|
||||
# Parameters for dynamic worker pool scaling
|
||||
self._min_workers = min_workers
|
||||
self._max_workers = max_workers
|
||||
self._scale_up_threshold = scale_up_threshold
|
||||
self._scale_down_idle_time = scale_down_idle_time
|
||||
|
||||
# === Execution Queues ===
|
||||
self._ready_queue = cast(ReadyQueue, self._graph_runtime_state.ready_queue)
|
||||
self._ready_queue = self._graph_runtime_state.ready_queue
|
||||
|
||||
# Queue for events generated during execution
|
||||
self._event_queue: queue.Queue[GraphNodeEventBase] = queue.Queue()
|
||||
@ -167,10 +158,7 @@ class GraphEngine:
|
||||
graph=self._graph,
|
||||
layers=self._layers,
|
||||
execution_context=execution_context,
|
||||
min_workers=self._min_workers,
|
||||
max_workers=self._max_workers,
|
||||
scale_up_threshold=self._scale_up_threshold,
|
||||
scale_down_idle_time=self._scale_down_idle_time,
|
||||
config=self._config,
|
||||
stop_event=self._stop_event,
|
||||
)
|
||||
|
||||
|
||||
@ -8,11 +8,9 @@ with middleware-like components that can observe events and interact with execut
|
||||
from .base import GraphEngineLayer
|
||||
from .debug_logging import DebugLoggingLayer
|
||||
from .execution_limits import ExecutionLimitsLayer
|
||||
from .observability import ObservabilityLayer
|
||||
|
||||
__all__ = [
|
||||
"DebugLoggingLayer",
|
||||
"ExecutionLimitsLayer",
|
||||
"GraphEngineLayer",
|
||||
"ObservabilityLayer",
|
||||
]
|
||||
|
||||
@ -8,7 +8,7 @@ intercept and respond to GraphEngine events.
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from core.workflow.graph_engine.protocols.command_channel import CommandChannel
|
||||
from core.workflow.graph_events import GraphEngineEvent
|
||||
from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.runtime import ReadOnlyGraphRuntimeState
|
||||
|
||||
@ -98,7 +98,7 @@ class GraphEngineLayer(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_node_run_start(self, node: Node) -> None: # noqa: B027
|
||||
def on_node_run_start(self, node: Node) -> None:
|
||||
"""
|
||||
Called immediately before a node begins execution.
|
||||
|
||||
@ -109,9 +109,11 @@ class GraphEngineLayer(ABC):
|
||||
Args:
|
||||
node: The node instance about to be executed
|
||||
"""
|
||||
pass
|
||||
return
|
||||
|
||||
def on_node_run_end(self, node: Node, error: Exception | None) -> None: # noqa: B027
|
||||
def on_node_run_end(
|
||||
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
|
||||
) -> None:
|
||||
"""
|
||||
Called after a node finishes execution.
|
||||
|
||||
@ -121,5 +123,6 @@ class GraphEngineLayer(ABC):
|
||||
Args:
|
||||
node: The node instance that just finished execution
|
||||
error: Exception instance if the node failed, otherwise None
|
||||
result_event: The final result event from node execution (succeeded/failed/paused), if any
|
||||
"""
|
||||
pass
|
||||
return
|
||||
|
||||
@ -1,61 +0,0 @@
|
||||
"""
|
||||
Node-level OpenTelemetry parser interfaces and defaults.
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Protocol
|
||||
|
||||
from opentelemetry.trace import Span
|
||||
from opentelemetry.trace.status import Status, StatusCode
|
||||
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.tool.entities import ToolNodeData
|
||||
|
||||
|
||||
class NodeOTelParser(Protocol):
|
||||
"""Parser interface for node-specific OpenTelemetry enrichment."""
|
||||
|
||||
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None: ...
|
||||
|
||||
|
||||
class DefaultNodeOTelParser:
|
||||
"""Fallback parser used when no node-specific parser is registered."""
|
||||
|
||||
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None:
|
||||
span.set_attribute("node.id", node.id)
|
||||
if node.execution_id:
|
||||
span.set_attribute("node.execution_id", node.execution_id)
|
||||
if hasattr(node, "node_type") and node.node_type:
|
||||
span.set_attribute("node.type", node.node_type.value)
|
||||
|
||||
if error:
|
||||
span.record_exception(error)
|
||||
span.set_status(Status(StatusCode.ERROR, str(error)))
|
||||
else:
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
|
||||
|
||||
class ToolNodeOTelParser:
|
||||
"""Parser for tool nodes that captures tool-specific metadata."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._delegate = DefaultNodeOTelParser()
|
||||
|
||||
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None:
|
||||
self._delegate.parse(node=node, span=span, error=error)
|
||||
|
||||
tool_data = getattr(node, "_node_data", None)
|
||||
if not isinstance(tool_data, ToolNodeData):
|
||||
return
|
||||
|
||||
span.set_attribute("tool.provider.id", tool_data.provider_id)
|
||||
span.set_attribute("tool.provider.type", tool_data.provider_type.value)
|
||||
span.set_attribute("tool.provider.name", tool_data.provider_name)
|
||||
span.set_attribute("tool.name", tool_data.tool_name)
|
||||
span.set_attribute("tool.label", tool_data.tool_label)
|
||||
if tool_data.plugin_unique_identifier:
|
||||
span.set_attribute("tool.plugin.id", tool_data.plugin_unique_identifier)
|
||||
if tool_data.credential_id:
|
||||
span.set_attribute("tool.credential.id", tool_data.credential_id)
|
||||
if tool_data.tool_configurations:
|
||||
span.set_attribute("tool.config", json.dumps(tool_data.tool_configurations, ensure_ascii=False))
|
||||
@ -1,169 +0,0 @@
|
||||
"""
|
||||
Observability layer for GraphEngine.
|
||||
|
||||
This layer creates OpenTelemetry spans for node execution, enabling distributed
|
||||
tracing of workflow execution. It establishes OTel context during node execution
|
||||
so that automatic instrumentation (HTTP requests, DB queries, etc.) automatically
|
||||
associates with the node span.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import cast, final
|
||||
|
||||
from opentelemetry import context as context_api
|
||||
from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_context
|
||||
from typing_extensions import override
|
||||
|
||||
from configs import dify_config
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_engine.layers.node_parsers import (
|
||||
DefaultNodeOTelParser,
|
||||
NodeOTelParser,
|
||||
ToolNodeOTelParser,
|
||||
)
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from extensions.otel.runtime import is_instrument_flag_enabled
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _NodeSpanContext:
|
||||
span: "Span"
|
||||
token: object
|
||||
|
||||
|
||||
@final
|
||||
class ObservabilityLayer(GraphEngineLayer):
|
||||
"""
|
||||
Layer that creates OpenTelemetry spans for node execution.
|
||||
|
||||
This layer:
|
||||
- Creates a span when a node starts execution
|
||||
- Establishes OTel context so automatic instrumentation associates with the span
|
||||
- Sets complete attributes and status when node execution ends
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._node_contexts: dict[str, _NodeSpanContext] = {}
|
||||
self._parsers: dict[NodeType, NodeOTelParser] = {}
|
||||
self._default_parser: NodeOTelParser = cast(NodeOTelParser, DefaultNodeOTelParser())
|
||||
self._is_disabled: bool = False
|
||||
self._tracer: Tracer | None = None
|
||||
self._build_parser_registry()
|
||||
self._init_tracer()
|
||||
|
||||
def _init_tracer(self) -> None:
|
||||
"""Initialize OpenTelemetry tracer in constructor."""
|
||||
if not (dify_config.ENABLE_OTEL or is_instrument_flag_enabled()):
|
||||
self._is_disabled = True
|
||||
return
|
||||
|
||||
try:
|
||||
self._tracer = get_tracer(__name__)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to get OpenTelemetry tracer: %s", e)
|
||||
self._is_disabled = True
|
||||
|
||||
def _build_parser_registry(self) -> None:
|
||||
"""Initialize parser registry for node types."""
|
||||
self._parsers = {
|
||||
NodeType.TOOL: ToolNodeOTelParser(),
|
||||
}
|
||||
|
||||
def _get_parser(self, node: Node) -> NodeOTelParser:
|
||||
node_type = getattr(node, "node_type", None)
|
||||
if isinstance(node_type, NodeType):
|
||||
return self._parsers.get(node_type, self._default_parser)
|
||||
return self._default_parser
|
||||
|
||||
@override
|
||||
def on_graph_start(self) -> None:
|
||||
"""Called when graph execution starts."""
|
||||
self._node_contexts.clear()
|
||||
|
||||
@override
|
||||
def on_node_run_start(self, node: Node) -> None:
|
||||
"""
|
||||
Called when a node starts execution.
|
||||
|
||||
Creates a span and establishes OTel context for automatic instrumentation.
|
||||
"""
|
||||
if self._is_disabled:
|
||||
return
|
||||
|
||||
try:
|
||||
if not self._tracer:
|
||||
return
|
||||
|
||||
execution_id = node.execution_id
|
||||
if not execution_id:
|
||||
return
|
||||
|
||||
parent_context = context_api.get_current()
|
||||
span = self._tracer.start_span(
|
||||
f"{node.title}",
|
||||
kind=SpanKind.INTERNAL,
|
||||
context=parent_context,
|
||||
)
|
||||
|
||||
new_context = set_span_in_context(span)
|
||||
token = context_api.attach(new_context)
|
||||
|
||||
self._node_contexts[execution_id] = _NodeSpanContext(span=span, token=token)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Failed to create OpenTelemetry span for node %s: %s", node.id, e)
|
||||
|
||||
@override
|
||||
def on_node_run_end(self, node: Node, error: Exception | None) -> None:
|
||||
"""
|
||||
Called when a node finishes execution.
|
||||
|
||||
Sets complete attributes, records exceptions, and ends the span.
|
||||
"""
|
||||
if self._is_disabled:
|
||||
return
|
||||
|
||||
try:
|
||||
execution_id = node.execution_id
|
||||
if not execution_id:
|
||||
return
|
||||
node_context = self._node_contexts.get(execution_id)
|
||||
if not node_context:
|
||||
return
|
||||
|
||||
span = node_context.span
|
||||
parser = self._get_parser(node)
|
||||
try:
|
||||
parser.parse(node=node, span=span, error=error)
|
||||
span.end()
|
||||
finally:
|
||||
token = node_context.token
|
||||
if token is not None:
|
||||
try:
|
||||
context_api.detach(token)
|
||||
except Exception:
|
||||
logger.warning("Failed to detach OpenTelemetry token: %s", token)
|
||||
self._node_contexts.pop(execution_id, None)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Failed to end OpenTelemetry span for node %s: %s", node.id, e)
|
||||
|
||||
@override
|
||||
def on_event(self, event) -> None:
|
||||
"""Not used in this layer."""
|
||||
pass
|
||||
|
||||
@override
|
||||
def on_graph_end(self, error: Exception | None) -> None:
|
||||
"""Called when graph execution ends."""
|
||||
if self._node_contexts:
|
||||
logger.warning(
|
||||
"ObservabilityLayer: %d node spans were not properly ended",
|
||||
len(self._node_contexts),
|
||||
)
|
||||
self._node_contexts.clear()
|
||||
@ -1,405 +0,0 @@
|
||||
"""Workflow persistence layer for GraphEngine.
|
||||
|
||||
This layer mirrors the former ``WorkflowCycleManager`` responsibilities by
|
||||
listening to ``GraphEngineEvent`` instances directly and persisting workflow
|
||||
and node execution state via the injected repositories.
|
||||
|
||||
The design keeps domain persistence concerns inside the engine thread, while
|
||||
allowing presentation layers to remain read-only observers of repository
|
||||
state.
|
||||
"""
|
||||
|
||||
from collections.abc import Mapping
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Union
|
||||
|
||||
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
|
||||
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
|
||||
from core.workflow.entities import WorkflowExecution, WorkflowNodeExecution
|
||||
from core.workflow.enums import (
|
||||
SystemVariableKey,
|
||||
WorkflowExecutionStatus,
|
||||
WorkflowNodeExecutionMetadataKey,
|
||||
WorkflowNodeExecutionStatus,
|
||||
WorkflowType,
|
||||
)
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_events import (
|
||||
GraphEngineEvent,
|
||||
GraphRunAbortedEvent,
|
||||
GraphRunFailedEvent,
|
||||
GraphRunPartialSucceededEvent,
|
||||
GraphRunPausedEvent,
|
||||
GraphRunStartedEvent,
|
||||
GraphRunSucceededEvent,
|
||||
NodeRunExceptionEvent,
|
||||
NodeRunFailedEvent,
|
||||
NodeRunPauseRequestedEvent,
|
||||
NodeRunRetryEvent,
|
||||
NodeRunStartedEvent,
|
||||
NodeRunSucceededEvent,
|
||||
)
|
||||
from core.workflow.node_events import NodeRunResult
|
||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PersistenceWorkflowInfo:
|
||||
"""Static workflow metadata required for persistence."""
|
||||
|
||||
workflow_id: str
|
||||
workflow_type: WorkflowType
|
||||
version: str
|
||||
graph_data: Mapping[str, Any]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _NodeRuntimeSnapshot:
|
||||
"""Lightweight cache to keep node metadata across event phases."""
|
||||
|
||||
node_id: str
|
||||
title: str
|
||||
predecessor_node_id: str | None
|
||||
iteration_id: str | None
|
||||
loop_id: str | None
|
||||
created_at: datetime
|
||||
|
||||
|
||||
class WorkflowPersistenceLayer(GraphEngineLayer):
|
||||
"""GraphEngine layer that persists workflow and node execution state."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
|
||||
workflow_info: PersistenceWorkflowInfo,
|
||||
workflow_execution_repository: WorkflowExecutionRepository,
|
||||
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
|
||||
trace_manager: TraceQueueManager | None = None,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._application_generate_entity = application_generate_entity
|
||||
self._workflow_info = workflow_info
|
||||
self._workflow_execution_repository = workflow_execution_repository
|
||||
self._workflow_node_execution_repository = workflow_node_execution_repository
|
||||
self._trace_manager = trace_manager
|
||||
|
||||
self._workflow_execution: WorkflowExecution | None = None
|
||||
self._node_execution_cache: dict[str, WorkflowNodeExecution] = {}
|
||||
self._node_snapshots: dict[str, _NodeRuntimeSnapshot] = {}
|
||||
self._node_sequence: int = 0
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# GraphEngineLayer lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
def on_graph_start(self) -> None:
|
||||
self._workflow_execution = None
|
||||
self._node_execution_cache.clear()
|
||||
self._node_snapshots.clear()
|
||||
self._node_sequence = 0
|
||||
|
||||
def on_event(self, event: GraphEngineEvent) -> None:
|
||||
if isinstance(event, GraphRunStartedEvent):
|
||||
self._handle_graph_run_started()
|
||||
return
|
||||
|
||||
if isinstance(event, GraphRunSucceededEvent):
|
||||
self._handle_graph_run_succeeded(event)
|
||||
return
|
||||
|
||||
if isinstance(event, GraphRunPartialSucceededEvent):
|
||||
self._handle_graph_run_partial_succeeded(event)
|
||||
return
|
||||
|
||||
if isinstance(event, GraphRunFailedEvent):
|
||||
self._handle_graph_run_failed(event)
|
||||
return
|
||||
|
||||
if isinstance(event, GraphRunAbortedEvent):
|
||||
self._handle_graph_run_aborted(event)
|
||||
return
|
||||
|
||||
if isinstance(event, GraphRunPausedEvent):
|
||||
self._handle_graph_run_paused(event)
|
||||
return
|
||||
|
||||
if isinstance(event, NodeRunStartedEvent):
|
||||
self._handle_node_started(event)
|
||||
return
|
||||
|
||||
if isinstance(event, NodeRunRetryEvent):
|
||||
self._handle_node_retry(event)
|
||||
return
|
||||
|
||||
if isinstance(event, NodeRunSucceededEvent):
|
||||
self._handle_node_succeeded(event)
|
||||
return
|
||||
|
||||
if isinstance(event, NodeRunFailedEvent):
|
||||
self._handle_node_failed(event)
|
||||
return
|
||||
|
||||
if isinstance(event, NodeRunExceptionEvent):
|
||||
self._handle_node_exception(event)
|
||||
return
|
||||
|
||||
if isinstance(event, NodeRunPauseRequestedEvent):
|
||||
self._handle_node_pause_requested(event)
|
||||
|
||||
def on_graph_end(self, error: Exception | None) -> None:
|
||||
return
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Graph-level handlers
|
||||
# ------------------------------------------------------------------
|
||||
def _handle_graph_run_started(self) -> None:
|
||||
execution_id = self._get_execution_id()
|
||||
workflow_execution = WorkflowExecution.new(
|
||||
id_=execution_id,
|
||||
workflow_id=self._workflow_info.workflow_id,
|
||||
workflow_type=self._workflow_info.workflow_type,
|
||||
workflow_version=self._workflow_info.version,
|
||||
graph=self._workflow_info.graph_data,
|
||||
inputs=self._prepare_workflow_inputs(),
|
||||
started_at=naive_utc_now(),
|
||||
)
|
||||
|
||||
self._workflow_execution_repository.save(workflow_execution)
|
||||
self._workflow_execution = workflow_execution
|
||||
|
||||
def _handle_graph_run_succeeded(self, event: GraphRunSucceededEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
execution.outputs = event.outputs
|
||||
execution.status = WorkflowExecutionStatus.SUCCEEDED
|
||||
self._populate_completion_statistics(execution)
|
||||
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
|
||||
def _handle_graph_run_partial_succeeded(self, event: GraphRunPartialSucceededEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
execution.outputs = event.outputs
|
||||
execution.status = WorkflowExecutionStatus.PARTIAL_SUCCEEDED
|
||||
execution.exceptions_count = event.exceptions_count
|
||||
self._populate_completion_statistics(execution)
|
||||
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
|
||||
def _handle_graph_run_failed(self, event: GraphRunFailedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
execution.status = WorkflowExecutionStatus.FAILED
|
||||
execution.error_message = event.error
|
||||
execution.exceptions_count = event.exceptions_count
|
||||
self._populate_completion_statistics(execution)
|
||||
|
||||
self._fail_running_node_executions(error_message=event.error)
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
|
||||
def _handle_graph_run_aborted(self, event: GraphRunAbortedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
execution.status = WorkflowExecutionStatus.STOPPED
|
||||
execution.error_message = event.reason or "Workflow execution aborted"
|
||||
self._populate_completion_statistics(execution)
|
||||
|
||||
self._fail_running_node_executions(error_message=execution.error_message or "")
|
||||
self._workflow_execution_repository.save(execution)
|
||||
self._enqueue_trace_task(execution)
|
||||
|
||||
def _handle_graph_run_paused(self, event: GraphRunPausedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
execution.status = WorkflowExecutionStatus.PAUSED
|
||||
execution.outputs = event.outputs
|
||||
self._populate_completion_statistics(execution, update_finished=False)
|
||||
|
||||
self._workflow_execution_repository.save(execution)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Node-level handlers
|
||||
# ------------------------------------------------------------------
|
||||
def _handle_node_started(self, event: NodeRunStartedEvent) -> None:
|
||||
execution = self._get_workflow_execution()
|
||||
|
||||
metadata = {
|
||||
WorkflowNodeExecutionMetadataKey.ITERATION_ID: event.in_iteration_id,
|
||||
WorkflowNodeExecutionMetadataKey.LOOP_ID: event.in_loop_id,
|
||||
}
|
||||
|
||||
domain_execution = WorkflowNodeExecution(
|
||||
id=event.id,
|
||||
node_execution_id=event.id,
|
||||
workflow_id=execution.workflow_id,
|
||||
workflow_execution_id=execution.id_,
|
||||
predecessor_node_id=event.predecessor_node_id,
|
||||
index=self._next_node_sequence(),
|
||||
node_id=event.node_id,
|
||||
node_type=event.node_type,
|
||||
title=event.node_title,
|
||||
status=WorkflowNodeExecutionStatus.RUNNING,
|
||||
metadata=metadata,
|
||||
created_at=event.start_at,
|
||||
)
|
||||
|
||||
self._node_execution_cache[event.id] = domain_execution
|
||||
self._workflow_node_execution_repository.save(domain_execution)
|
||||
|
||||
snapshot = _NodeRuntimeSnapshot(
|
||||
node_id=event.node_id,
|
||||
title=event.node_title,
|
||||
predecessor_node_id=event.predecessor_node_id,
|
||||
iteration_id=event.in_iteration_id,
|
||||
loop_id=event.in_loop_id,
|
||||
created_at=event.start_at,
|
||||
)
|
||||
self._node_snapshots[event.id] = snapshot
|
||||
|
||||
def _handle_node_retry(self, event: NodeRunRetryEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
domain_execution.status = WorkflowNodeExecutionStatus.RETRY
|
||||
domain_execution.error = event.error
|
||||
self._workflow_node_execution_repository.save(domain_execution)
|
||||
self._workflow_node_execution_repository.save_execution_data(domain_execution)
|
||||
|
||||
def _handle_node_succeeded(self, event: NodeRunSucceededEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
self._update_node_execution(domain_execution, event.node_run_result, WorkflowNodeExecutionStatus.SUCCEEDED)
|
||||
|
||||
def _handle_node_failed(self, event: NodeRunFailedEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
self._update_node_execution(
|
||||
domain_execution,
|
||||
event.node_run_result,
|
||||
WorkflowNodeExecutionStatus.FAILED,
|
||||
error=event.error,
|
||||
)
|
||||
|
||||
def _handle_node_exception(self, event: NodeRunExceptionEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
self._update_node_execution(
|
||||
domain_execution,
|
||||
event.node_run_result,
|
||||
WorkflowNodeExecutionStatus.EXCEPTION,
|
||||
error=event.error,
|
||||
)
|
||||
|
||||
def _handle_node_pause_requested(self, event: NodeRunPauseRequestedEvent) -> None:
|
||||
domain_execution = self._get_node_execution(event.id)
|
||||
self._update_node_execution(
|
||||
domain_execution,
|
||||
event.node_run_result,
|
||||
WorkflowNodeExecutionStatus.PAUSED,
|
||||
error="",
|
||||
update_outputs=False,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ------------------------------------------------------------------
|
||||
def _get_execution_id(self) -> str:
|
||||
workflow_execution_id = self._system_variables().get(SystemVariableKey.WORKFLOW_EXECUTION_ID)
|
||||
if not workflow_execution_id:
|
||||
raise ValueError("workflow_execution_id must be provided in system variables for pause/resume flows")
|
||||
return str(workflow_execution_id)
|
||||
|
||||
def _prepare_workflow_inputs(self) -> Mapping[str, Any]:
|
||||
inputs = {**self._application_generate_entity.inputs}
|
||||
for field_name, value in self._system_variables().items():
|
||||
if field_name == SystemVariableKey.CONVERSATION_ID.value:
|
||||
# Conversation IDs are tied to the current session; omit them so persisted
|
||||
# workflow inputs stay reusable without binding future runs to this conversation.
|
||||
continue
|
||||
inputs[f"sys.{field_name}"] = value
|
||||
handled = WorkflowEntry.handle_special_values(inputs)
|
||||
return handled or {}
|
||||
|
||||
def _get_workflow_execution(self) -> WorkflowExecution:
|
||||
if self._workflow_execution is None:
|
||||
raise ValueError("workflow execution not initialized")
|
||||
return self._workflow_execution
|
||||
|
||||
def _get_node_execution(self, node_execution_id: str) -> WorkflowNodeExecution:
|
||||
if node_execution_id not in self._node_execution_cache:
|
||||
raise ValueError(f"Node execution not found for id={node_execution_id}")
|
||||
return self._node_execution_cache[node_execution_id]
|
||||
|
||||
def _next_node_sequence(self) -> int:
|
||||
self._node_sequence += 1
|
||||
return self._node_sequence
|
||||
|
||||
def _populate_completion_statistics(self, execution: WorkflowExecution, *, update_finished: bool = True) -> None:
|
||||
if update_finished:
|
||||
execution.finished_at = naive_utc_now()
|
||||
runtime_state = self.graph_runtime_state
|
||||
execution.total_tokens = runtime_state.total_tokens
|
||||
execution.total_steps = runtime_state.node_run_steps
|
||||
execution.outputs = execution.outputs or runtime_state.outputs
|
||||
execution.exceptions_count = runtime_state.exceptions_count
|
||||
|
||||
def _update_node_execution(
|
||||
self,
|
||||
domain_execution: WorkflowNodeExecution,
|
||||
node_result: NodeRunResult,
|
||||
status: WorkflowNodeExecutionStatus,
|
||||
*,
|
||||
error: str | None = None,
|
||||
update_outputs: bool = True,
|
||||
) -> None:
|
||||
finished_at = naive_utc_now()
|
||||
snapshot = self._node_snapshots.get(domain_execution.id)
|
||||
start_at = snapshot.created_at if snapshot else domain_execution.created_at
|
||||
domain_execution.status = status
|
||||
domain_execution.finished_at = finished_at
|
||||
domain_execution.elapsed_time = max((finished_at - start_at).total_seconds(), 0.0)
|
||||
|
||||
if error:
|
||||
domain_execution.error = error
|
||||
|
||||
if update_outputs:
|
||||
domain_execution.update_from_mapping(
|
||||
inputs=node_result.inputs,
|
||||
process_data=node_result.process_data,
|
||||
outputs=node_result.outputs,
|
||||
metadata=node_result.metadata,
|
||||
)
|
||||
|
||||
self._workflow_node_execution_repository.save(domain_execution)
|
||||
self._workflow_node_execution_repository.save_execution_data(domain_execution)
|
||||
|
||||
def _fail_running_node_executions(self, *, error_message: str) -> None:
|
||||
now = naive_utc_now()
|
||||
for execution in self._node_execution_cache.values():
|
||||
if execution.status == WorkflowNodeExecutionStatus.RUNNING:
|
||||
execution.status = WorkflowNodeExecutionStatus.FAILED
|
||||
execution.error = error_message
|
||||
execution.finished_at = now
|
||||
execution.elapsed_time = max((now - execution.created_at).total_seconds(), 0.0)
|
||||
self._workflow_node_execution_repository.save(execution)
|
||||
|
||||
def _enqueue_trace_task(self, execution: WorkflowExecution) -> None:
|
||||
if not self._trace_manager:
|
||||
return
|
||||
|
||||
conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
|
||||
external_trace_id = None
|
||||
if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
|
||||
external_trace_id = self._application_generate_entity.extras.get("external_trace_id")
|
||||
|
||||
trace_task = TraceTask(
|
||||
TraceTaskName.WORKFLOW_TRACE,
|
||||
workflow_execution=execution,
|
||||
conversation_id=conversation_id,
|
||||
user_id=self._trace_manager.user_id,
|
||||
external_trace_id=external_trace_id,
|
||||
)
|
||||
self._trace_manager.add_trace_task(trace_task)
|
||||
|
||||
def _system_variables(self) -> Mapping[str, Any]:
|
||||
runtime_state = self.graph_runtime_state
|
||||
return runtime_state.variable_pool.get_by_prefix(SYSTEM_VARIABLE_NODE_ID)
|
||||
@ -15,10 +15,10 @@ from uuid import uuid4
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.workflow.enums import NodeExecutionType, NodeState
|
||||
from core.workflow.graph import Graph
|
||||
from core.workflow.graph_events import NodeRunStreamChunkEvent, NodeRunSucceededEvent
|
||||
from core.workflow.nodes.base.template import TextSegment, VariableSegment
|
||||
from core.workflow.runtime import VariablePool
|
||||
from core.workflow.runtime.graph_runtime_state import GraphProtocol
|
||||
|
||||
from .path import Path
|
||||
from .session import ResponseSession
|
||||
@ -75,7 +75,7 @@ class ResponseStreamCoordinator:
|
||||
Ensures ordered streaming of responses based on upstream node outputs and constants.
|
||||
"""
|
||||
|
||||
def __init__(self, variable_pool: "VariablePool", graph: "Graph") -> None:
|
||||
def __init__(self, variable_pool: "VariablePool", graph: GraphProtocol) -> None:
|
||||
"""
|
||||
Initialize coordinator with variable pool.
|
||||
|
||||
|
||||
@ -10,10 +10,10 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
|
||||
from core.workflow.nodes.answer.answer_node import AnswerNode
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.base.template import Template
|
||||
from core.workflow.nodes.end.end_node import EndNode
|
||||
from core.workflow.nodes.knowledge_index import KnowledgeIndexNode
|
||||
from core.workflow.runtime.graph_runtime_state import NodeProtocol
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -29,21 +29,26 @@ class ResponseSession:
|
||||
index: int = 0 # Current position in the template segments
|
||||
|
||||
@classmethod
|
||||
def from_node(cls, node: Node) -> ResponseSession:
|
||||
def from_node(cls, node: NodeProtocol) -> ResponseSession:
|
||||
"""
|
||||
Create a ResponseSession from an AnswerNode or EndNode.
|
||||
Create a ResponseSession from a response-capable node.
|
||||
|
||||
The parameter is typed as `NodeProtocol` because the graph is exposed behind a protocol at the runtime layer,
|
||||
but at runtime this must be an `AnswerNode`, `EndNode`, or `KnowledgeIndexNode` that provides:
|
||||
- `id: str`
|
||||
- `get_streaming_template() -> Template`
|
||||
|
||||
Args:
|
||||
node: Must be either an AnswerNode or EndNode instance
|
||||
node: Node from the materialized workflow graph.
|
||||
|
||||
Returns:
|
||||
ResponseSession configured with the node's streaming template
|
||||
|
||||
Raises:
|
||||
TypeError: If node is not an AnswerNode or EndNode
|
||||
TypeError: If node is not a supported response node type.
|
||||
"""
|
||||
if not isinstance(node, AnswerNode | EndNode | KnowledgeIndexNode):
|
||||
raise TypeError
|
||||
raise TypeError("ResponseSession.from_node only supports AnswerNode, EndNode, or KnowledgeIndexNode")
|
||||
return cls(
|
||||
node_id=node.id,
|
||||
template=node.get_streaming_template(),
|
||||
|
||||
@ -17,7 +17,7 @@ from typing_extensions import override
|
||||
from core.workflow.context import IExecutionContext
|
||||
from core.workflow.graph import Graph
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
|
||||
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, is_node_result_event
|
||||
from core.workflow.nodes.base.node import Node
|
||||
|
||||
from .ready_queue import ReadyQueue
|
||||
@ -131,6 +131,7 @@ class Worker(threading.Thread):
|
||||
node.ensure_execution_id()
|
||||
|
||||
error: Exception | None = None
|
||||
result_event: GraphNodeEventBase | None = None
|
||||
|
||||
# Execute the node with preserved context if execution context is provided
|
||||
if self._execution_context is not None:
|
||||
@ -140,22 +141,26 @@ class Worker(threading.Thread):
|
||||
node_events = node.run()
|
||||
for event in node_events:
|
||||
self._event_queue.put(event)
|
||||
if is_node_result_event(event):
|
||||
result_event = event
|
||||
except Exception as exc:
|
||||
error = exc
|
||||
raise
|
||||
finally:
|
||||
self._invoke_node_run_end_hooks(node, error)
|
||||
self._invoke_node_run_end_hooks(node, error, result_event)
|
||||
else:
|
||||
self._invoke_node_run_start_hooks(node)
|
||||
try:
|
||||
node_events = node.run()
|
||||
for event in node_events:
|
||||
self._event_queue.put(event)
|
||||
if is_node_result_event(event):
|
||||
result_event = event
|
||||
except Exception as exc:
|
||||
error = exc
|
||||
raise
|
||||
finally:
|
||||
self._invoke_node_run_end_hooks(node, error)
|
||||
self._invoke_node_run_end_hooks(node, error, result_event)
|
||||
|
||||
def _invoke_node_run_start_hooks(self, node: Node) -> None:
|
||||
"""Invoke on_node_run_start hooks for all layers."""
|
||||
@ -166,11 +171,13 @@ class Worker(threading.Thread):
|
||||
# Silently ignore layer errors to prevent disrupting node execution
|
||||
continue
|
||||
|
||||
def _invoke_node_run_end_hooks(self, node: Node, error: Exception | None) -> None:
|
||||
def _invoke_node_run_end_hooks(
|
||||
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
|
||||
) -> None:
|
||||
"""Invoke on_node_run_end hooks for all layers."""
|
||||
for layer in self._layers:
|
||||
try:
|
||||
layer.on_node_run_end(node, error)
|
||||
layer.on_node_run_end(node, error, result_event)
|
||||
except Exception:
|
||||
# Silently ignore layer errors to prevent disrupting node execution
|
||||
continue
|
||||
|
||||
@ -10,11 +10,11 @@ import queue
|
||||
import threading
|
||||
from typing import final
|
||||
|
||||
from configs import dify_config
|
||||
from core.workflow.context import IExecutionContext
|
||||
from core.workflow.graph import Graph
|
||||
from core.workflow.graph_events import GraphNodeEventBase
|
||||
|
||||
from ..config import GraphEngineConfig
|
||||
from ..layers.base import GraphEngineLayer
|
||||
from ..ready_queue import ReadyQueue
|
||||
from ..worker import Worker
|
||||
@ -38,11 +38,8 @@ class WorkerPool:
|
||||
graph: Graph,
|
||||
layers: list[GraphEngineLayer],
|
||||
stop_event: threading.Event,
|
||||
config: GraphEngineConfig,
|
||||
execution_context: IExecutionContext | None = None,
|
||||
min_workers: int | None = None,
|
||||
max_workers: int | None = None,
|
||||
scale_up_threshold: int | None = None,
|
||||
scale_down_idle_time: float | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the simple worker pool.
|
||||
@ -52,23 +49,15 @@ class WorkerPool:
|
||||
event_queue: Queue for worker events
|
||||
graph: The workflow graph
|
||||
layers: Graph engine layers for node execution hooks
|
||||
config: GraphEngine worker pool configuration
|
||||
execution_context: Optional execution context for context preservation
|
||||
min_workers: Minimum number of workers
|
||||
max_workers: Maximum number of workers
|
||||
scale_up_threshold: Queue depth to trigger scale up
|
||||
scale_down_idle_time: Seconds before scaling down idle workers
|
||||
"""
|
||||
self._ready_queue = ready_queue
|
||||
self._event_queue = event_queue
|
||||
self._graph = graph
|
||||
self._execution_context = execution_context
|
||||
self._layers = layers
|
||||
|
||||
# Scaling parameters with defaults
|
||||
self._min_workers = min_workers or dify_config.GRAPH_ENGINE_MIN_WORKERS
|
||||
self._max_workers = max_workers or dify_config.GRAPH_ENGINE_MAX_WORKERS
|
||||
self._scale_up_threshold = scale_up_threshold or dify_config.GRAPH_ENGINE_SCALE_UP_THRESHOLD
|
||||
self._scale_down_idle_time = scale_down_idle_time or dify_config.GRAPH_ENGINE_SCALE_DOWN_IDLE_TIME
|
||||
self._config = config
|
||||
|
||||
# Worker management
|
||||
self._workers: list[Worker] = []
|
||||
@ -96,18 +85,18 @@ class WorkerPool:
|
||||
if initial_count is None:
|
||||
node_count = len(self._graph.nodes)
|
||||
if node_count < 10:
|
||||
initial_count = self._min_workers
|
||||
initial_count = self._config.min_workers
|
||||
elif node_count < 50:
|
||||
initial_count = min(self._min_workers + 1, self._max_workers)
|
||||
initial_count = min(self._config.min_workers + 1, self._config.max_workers)
|
||||
else:
|
||||
initial_count = min(self._min_workers + 2, self._max_workers)
|
||||
initial_count = min(self._config.min_workers + 2, self._config.max_workers)
|
||||
|
||||
logger.debug(
|
||||
"Starting worker pool: %d workers (nodes=%d, min=%d, max=%d)",
|
||||
initial_count,
|
||||
node_count,
|
||||
self._min_workers,
|
||||
self._max_workers,
|
||||
self._config.min_workers,
|
||||
self._config.max_workers,
|
||||
)
|
||||
|
||||
# Create initial workers
|
||||
@ -176,7 +165,7 @@ class WorkerPool:
|
||||
Returns:
|
||||
True if scaled up, False otherwise
|
||||
"""
|
||||
if queue_depth > self._scale_up_threshold and current_count < self._max_workers:
|
||||
if queue_depth > self._config.scale_up_threshold and current_count < self._config.max_workers:
|
||||
old_count = current_count
|
||||
self._create_worker()
|
||||
|
||||
@ -185,7 +174,7 @@ class WorkerPool:
|
||||
old_count,
|
||||
len(self._workers),
|
||||
queue_depth,
|
||||
self._scale_up_threshold,
|
||||
self._config.scale_up_threshold,
|
||||
)
|
||||
return True
|
||||
return False
|
||||
@ -204,7 +193,7 @@ class WorkerPool:
|
||||
True if scaled down, False otherwise
|
||||
"""
|
||||
# Skip if we're at minimum or have no idle workers
|
||||
if current_count <= self._min_workers or idle_count == 0:
|
||||
if current_count <= self._config.min_workers or idle_count == 0:
|
||||
return False
|
||||
|
||||
# Check if we have excess capacity
|
||||
@ -222,10 +211,10 @@ class WorkerPool:
|
||||
|
||||
for worker in self._workers:
|
||||
# Check if worker is idle and has exceeded idle time threshold
|
||||
if worker.is_idle and worker.idle_duration >= self._scale_down_idle_time:
|
||||
if worker.is_idle and worker.idle_duration >= self._config.scale_down_idle_time:
|
||||
# Don't remove if it would leave us unable to handle the queue
|
||||
remaining_workers = current_count - len(workers_to_remove) - 1
|
||||
if remaining_workers >= self._min_workers and remaining_workers >= max(1, queue_depth // 2):
|
||||
if remaining_workers >= self._config.min_workers and remaining_workers >= max(1, queue_depth // 2):
|
||||
workers_to_remove.append((worker, worker.worker_id))
|
||||
# Only remove one worker per check to avoid aggressive scaling
|
||||
break
|
||||
@ -242,7 +231,7 @@ class WorkerPool:
|
||||
old_count,
|
||||
len(self._workers),
|
||||
len(workers_to_remove),
|
||||
self._scale_down_idle_time,
|
||||
self._config.scale_down_idle_time,
|
||||
queue_depth,
|
||||
active_count,
|
||||
idle_count - len(workers_to_remove),
|
||||
@ -286,6 +275,6 @@ class WorkerPool:
|
||||
return {
|
||||
"total_workers": len(self._workers),
|
||||
"queue_depth": self._ready_queue.qsize(),
|
||||
"min_workers": self._min_workers,
|
||||
"max_workers": self._max_workers,
|
||||
"min_workers": self._config.min_workers,
|
||||
"max_workers": self._config.max_workers,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user