fix(api): align graph protocols for response streaming

This commit is contained in:
Yanli 盐粒
2026-01-30 22:12:34 +08:00
parent a433d5ed36
commit 1beb8802e3
2 changed files with 26 additions and 6 deletions

View File

@ -10,10 +10,10 @@ from __future__ import annotations
from dataclasses import dataclass
from core.workflow.nodes.answer.answer_node import AnswerNode
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.base.template import Template
from core.workflow.nodes.end.end_node import EndNode
from core.workflow.nodes.knowledge_index import KnowledgeIndexNode
from core.workflow.runtime.graph_runtime_state import NodeProtocol
@dataclass
@ -29,7 +29,7 @@ class ResponseSession:
index: int = 0 # Current position in the template segments
@classmethod
def from_node(cls, node: Node) -> ResponseSession:
def from_node(cls, node: NodeProtocol) -> ResponseSession:
"""
Create a ResponseSession from an AnswerNode or EndNode.

View File

@ -12,6 +12,7 @@ from pydantic.json import pydantic_encoder
from core.model_runtime.entities.llm_entities import LLMUsage
from core.workflow.entities.pause_reason import PauseReason
from core.workflow.enums import NodeExecutionType, NodeState, NodeType
from core.workflow.runtime.variable_pool import VariablePool
@ -103,14 +104,33 @@ class ResponseStreamCoordinatorProtocol(Protocol):
...
class NodeProtocol(Protocol):
"""Structural interface for graph nodes."""
id: str
state: NodeState
execution_type: NodeExecutionType
node_type: NodeType
def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool: ...
class EdgeProtocol(Protocol):
id: str
state: NodeState
tail: str
head: str
source_handle: str
class GraphProtocol(Protocol):
"""Structural interface required from graph instances attached to the runtime state."""
nodes: Mapping[str, object]
edges: Mapping[str, object]
root_node: object
nodes: Mapping[str, NodeProtocol]
edges: Mapping[str, EdgeProtocol]
root_node: NodeProtocol
def get_outgoing_edges(self, node_id: str) -> Sequence[object]: ...
def get_outgoing_edges(self, node_id: str) -> Sequence[EdgeProtocol]: ...
@dataclass(slots=True)