refactor(graph_engine): inline output_registry into response_coordinator

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN-
2025-09-01 03:59:53 +08:00
parent 64c1234724
commit a5cb9d2b73
8 changed files with 110 additions and 726 deletions

View File

@ -35,7 +35,6 @@ from .event_management import EventCollector, EventEmitter, EventHandlerRegistry
from .graph_traversal import BranchHandler, EdgeProcessor, NodeReadinessChecker, SkipPropagator
from .layers.base import Layer
from .orchestration import Dispatcher, ExecutionCoordinator
from .output_registry import OutputRegistry
from .protocols.command_channel import CommandChannel
from .response_coordinator import ResponseStreamCoordinator
from .state_management import UnifiedStateManager
@ -122,8 +121,9 @@ class GraphEngine:
self.state_manager = UnifiedStateManager(self.graph, self.ready_queue)
# Response coordination
self.output_registry = OutputRegistry(self.graph_runtime_state.variable_pool)
self.response_coordinator = ResponseStreamCoordinator(registry=self.output_registry, graph=self.graph)
self.response_coordinator = ResponseStreamCoordinator(
variable_pool=self.graph_runtime_state.variable_pool, graph=self.graph
)
# Event management
self.event_collector = EventCollector()

View File

@ -1,10 +0,0 @@
"""
OutputRegistry - Thread-safe storage for node outputs (streams and scalars)
This component provides thread-safe storage and retrieval of node outputs,
supporting both scalar values and streaming chunks with proper state management.
"""
from .registry import OutputRegistry
__all__ = ["OutputRegistry"]

View File

@ -1,148 +0,0 @@
"""
Main OutputRegistry implementation.
This module contains the public OutputRegistry class that provides
thread-safe storage for node outputs.
"""
from collections.abc import Sequence
from threading import RLock
from typing import TYPE_CHECKING, Any, Union, final
from core.variables import Segment
from core.workflow.entities.variable_pool import VariablePool
from .stream import Stream
if TYPE_CHECKING:
from core.workflow.graph_events import NodeRunStreamChunkEvent
@final
class OutputRegistry:
    """
    Thread-safe registry for storing and retrieving node outputs.

    Scalar values are delegated to the shared VariablePool; streaming chunks
    are buffered in per-selector Stream objects created lazily. Every public
    method serializes on a single reentrant lock, so methods may safely be
    called from multiple threads.
    """

    def __init__(self, variable_pool: VariablePool) -> None:
        """Initialize empty registry with thread-safe storage.

        Args:
            variable_pool: Backing store used for scalar values.
        """
        self._lock = RLock()
        self._scalars = variable_pool
        # Streams are keyed by the selector tuple and created on first use.
        self._streams: dict[tuple[str, ...], Stream] = {}

    def _selector_to_key(self, selector: Sequence[str]) -> tuple[str, ...]:
        """Convert selector sequence to a hashable tuple key for internal storage."""
        return tuple(selector)

    def set_scalar(
        self, selector: Sequence[str], value: Union[str, int, float, bool, dict[str, Any], list[Any]]
    ) -> None:
        """
        Set a scalar value for the given selector.

        Args:
            selector: Sequence of strings identifying the output location
            value: The scalar value to store
        """
        with self._lock:
            self._scalars.add(selector, value)

    def get_scalar(self, selector: Sequence[str]) -> "Segment | None":
        """
        Get a scalar value for the given selector.

        Args:
            selector: Sequence of strings identifying the output location

        Returns:
            The stored Segment, or None if not found
        """
        with self._lock:
            return self._scalars.get(selector)

    def append_chunk(self, selector: Sequence[str], event: "NodeRunStreamChunkEvent") -> None:
        """
        Append a NodeRunStreamChunkEvent to the stream for the given selector.

        Args:
            selector: Sequence of strings identifying the stream location
            event: The NodeRunStreamChunkEvent to append

        Raises:
            ValueError: If the stream is already closed
        """
        key = self._selector_to_key(selector)
        with self._lock:
            stream = self._streams.setdefault(key, Stream())
            try:
                stream.append(event)
            except ValueError:
                # Re-raise with the selector in the message; drop the original
                # context (`from None`) so tracebacks show a single error.
                raise ValueError(f"Stream {'.'.join(selector)} is already closed") from None

    def pop_chunk(self, selector: Sequence[str]) -> "NodeRunStreamChunkEvent | None":
        """
        Pop the next unread NodeRunStreamChunkEvent from the stream.

        Args:
            selector: Sequence of strings identifying the stream location

        Returns:
            The next event, or None if no unread events available
        """
        key = self._selector_to_key(selector)
        with self._lock:
            stream = self._streams.get(key)
            return None if stream is None else stream.pop_next()

    def has_unread(self, selector: Sequence[str]) -> bool:
        """
        Check if the stream has unread events.

        Args:
            selector: Sequence of strings identifying the stream location

        Returns:
            True if there are unread events, False otherwise
        """
        key = self._selector_to_key(selector)
        with self._lock:
            stream = self._streams.get(key)
            return stream is not None and stream.has_unread()

    def close_stream(self, selector: Sequence[str]) -> None:
        """
        Mark a stream as closed (no more chunks can be appended).

        Args:
            selector: Sequence of strings identifying the stream location
        """
        key = self._selector_to_key(selector)
        with self._lock:
            # Create the stream if needed so the closed state is recorded even
            # when no chunk was ever appended.
            self._streams.setdefault(key, Stream()).close()

    def stream_closed(self, selector: Sequence[str]) -> bool:
        """
        Check if a stream is closed.

        Args:
            selector: Sequence of strings identifying the stream location

        Returns:
            True if the stream is closed, False otherwise
        """
        key = self._selector_to_key(selector)
        with self._lock:
            stream = self._streams.get(key)
            return stream is not None and stream.is_closed

View File

@ -1,70 +0,0 @@
"""
Internal stream implementation for OutputRegistry.
This module contains the private Stream class used internally by OutputRegistry
to manage streaming data chunks.
"""
from typing import TYPE_CHECKING, final
if TYPE_CHECKING:
from core.workflow.graph_events import NodeRunStreamChunkEvent
@final
class Stream:
    """
    An append-only buffer of NodeRunStreamChunkEvent objects with a read cursor.

    Consumption state lives in ``read_position`` (index of the next unread
    event) and producer completion in ``is_closed``.

    Note: This is an internal class not exposed in the public API.
    """

    def __init__(self) -> None:
        """Create a stream with no events, cursor at zero, and open state."""
        self.events: list[NodeRunStreamChunkEvent] = []
        self.read_position: int = 0
        self.is_closed: bool = False

    def append(self, event: "NodeRunStreamChunkEvent") -> None:
        """
        Add one event to the end of the stream.

        Args:
            event: The NodeRunStreamChunkEvent to append

        Raises:
            ValueError: If the stream is already closed
        """
        if self.is_closed:
            raise ValueError("Cannot append to a closed stream")
        self.events.append(event)

    def pop_next(self) -> "NodeRunStreamChunkEvent | None":
        """
        Return the next unconsumed event and advance the cursor.

        Returns:
            The next event, or None when everything appended has been read.
        """
        try:
            chunk = self.events[self.read_position]
        except IndexError:
            # Cursor has caught up with the producer; nothing unread.
            return None
        self.read_position += 1
        return chunk

    def has_unread(self) -> bool:
        """
        Check if the stream has unread events.

        Returns:
            True if there are unread events, False otherwise
        """
        return len(self.events) > self.read_position

    def close(self) -> None:
        """Mark the stream as closed (no more chunks can be appended)."""
        self.is_closed = True

View File

@ -12,12 +12,12 @@ from threading import RLock
from typing import TypeAlias, final
from uuid import uuid4
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import NodeExecutionType, NodeState
from core.workflow.graph import Graph
from core.workflow.graph_events import NodeRunStreamChunkEvent, NodeRunSucceededEvent
from core.workflow.nodes.base.template import TextSegment, VariableSegment
from ..output_registry import OutputRegistry
from .path import Path
from .session import ResponseSession
@ -36,20 +36,25 @@ class ResponseStreamCoordinator:
Ensures ordered streaming of responses based on upstream node outputs and constants.
"""
def __init__(self, registry: OutputRegistry, graph: "Graph") -> None:
def __init__(self, variable_pool: "VariablePool", graph: "Graph") -> None:
"""
Initialize coordinator with output registry.
Initialize coordinator with variable pool.
Args:
registry: OutputRegistry instance for accessing node outputs
variable_pool: VariablePool instance for accessing node variables
graph: Graph instance for looking up node information
"""
self.registry = registry
self.variable_pool = variable_pool
self.graph = graph
self.active_session: ResponseSession | None = None
self.waiting_sessions: deque[ResponseSession] = deque()
self.lock = RLock()
# Internal stream management (replacing OutputRegistry)
self._stream_buffers: dict[tuple[str, ...], list[NodeRunStreamChunkEvent]] = {}
self._stream_positions: dict[tuple[str, ...], int] = {}
self._closed_streams: set[tuple[str, ...]] = set()
# Track response nodes
self._response_nodes: set[NodeID] = set()
@ -256,15 +261,15 @@ class ResponseStreamCoordinator:
) -> Sequence[NodeRunStreamChunkEvent]:
with self.lock:
if isinstance(event, NodeRunStreamChunkEvent):
self.registry.append_chunk(event.selector, event)
self._append_stream_chunk(event.selector, event)
if event.is_final:
self.registry.close_stream(event.selector)
self._close_stream(event.selector)
return self.try_flush()
else:
# Skip because we share the same variable pool.
#
# for variable_name, variable_value in event.node_run_result.outputs.items():
# self.registry.set_scalar((event.node_id, variable_name), variable_value)
# self.variable_pool.add((event.node_id, variable_name), variable_value)
return self.try_flush()
return []
@ -327,8 +332,8 @@ class ResponseStreamCoordinator:
execution_id = self._get_or_create_execution_id(output_node_id)
# Stream all available chunks
while self.registry.has_unread(segment.selector):
if event := self.registry.pop_chunk(segment.selector):
while self._has_unread_stream(segment.selector):
if event := self._pop_stream_chunk(segment.selector):
# For special selectors, we need to update the event to use
# the active response node's information
if self.active_session and source_selector_prefix not in self.graph.nodes:
@ -349,12 +354,12 @@ class ResponseStreamCoordinator:
events.append(event)
# Check if this is the last chunk by looking ahead
stream_closed = self.registry.stream_closed(segment.selector)
stream_closed = self._is_stream_closed(segment.selector)
# Check if stream is closed to determine if segment is complete
if stream_closed:
is_complete = True
elif value := self.registry.get_scalar(segment.selector):
elif value := self.variable_pool.get(segment.selector):
# Process scalar value
is_last_segment = bool(
self.active_session and self.active_session.index == len(self.active_session.template.segments) - 1
@ -464,3 +469,93 @@ class ResponseStreamCoordinator:
events = self.try_flush()
return events
# ============= Internal Stream Management Methods =============
def _append_stream_chunk(self, selector: Sequence[str], event: NodeRunStreamChunkEvent) -> None:
    """
    Append a stream chunk to the internal buffer.

    Args:
        selector: List of strings identifying the stream location
        event: The NodeRunStreamChunkEvent to append

    Raises:
        ValueError: If the stream is already closed
    """
    stream_key = tuple(selector)
    # Closed streams reject further producer writes.
    if stream_key in self._closed_streams:
        raise ValueError(f"Stream {'.'.join(selector)} is already closed")
    buffer = self._stream_buffers.setdefault(stream_key, [])
    self._stream_positions.setdefault(stream_key, 0)
    buffer.append(event)
def _pop_stream_chunk(self, selector: Sequence[str]) -> NodeRunStreamChunkEvent | None:
    """
    Pop the next unread stream chunk from the buffer.

    Args:
        selector: List of strings identifying the stream location

    Returns:
        The next event, or None if no unread events available
    """
    stream_key = tuple(selector)
    buffer = self._stream_buffers.get(stream_key)
    if buffer is None:
        return None
    cursor = self._stream_positions.get(stream_key, 0)
    if cursor >= len(buffer):
        # Reader has caught up with the producer.
        return None
    self._stream_positions[stream_key] = cursor + 1
    return buffer[cursor]
def _has_unread_stream(self, selector: Sequence[str]) -> bool:
    """
    Check if the stream has unread events.

    Args:
        selector: List of strings identifying the stream location

    Returns:
        True if there are unread events, False otherwise
    """
    stream_key = tuple(selector)
    buffer = self._stream_buffers.get(stream_key)
    if buffer is None:
        return False
    return self._stream_positions.get(stream_key, 0) < len(buffer)
def _close_stream(self, selector: Sequence[str]) -> None:
    """
    Mark a stream as closed (no more chunks can be appended).

    Args:
        selector: List of strings identifying the stream location
    """
    # Membership in the closed set is the sole closed-state marker; already
    # buffered chunks remain readable after closing.
    self._closed_streams.add(tuple(selector))
def _is_stream_closed(self, selector: Sequence[str]) -> bool:
    """
    Check if a stream is closed.

    Args:
        selector: List of strings identifying the stream location

    Returns:
        True if the stream is closed, False otherwise
    """
    return tuple(selector) in self._closed_streams