refactor: rename mention node to nested_node for generic sub-graph support

This commit is contained in:
Novice
2026-01-22 13:15:13 +08:00
parent c7d106cfa4
commit 5cb8d4cc11
35 changed files with 319 additions and 289 deletions

View File

@ -288,59 +288,45 @@ class Node(Generic[NodeDataT]):
extractor_configs.append(node_config)
return extractor_configs
def _execute_mention_nodes(self) -> Generator[GraphNodeEventBase, None, None]:
def _execute_nested_nodes(self) -> Generator[GraphNodeEventBase, None, None]:
"""
Execute all extractor nodes associated with this node.
Execute all nested nodes associated with this node.
Extractor nodes are nodes with parent_node_id == self._node_id.
Nested nodes are nodes with parent_node_id == self._node_id.
They are executed before the main node to extract values from list[PromptMessage].
"""
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.node_factory import DifyNodeFactory
extractor_configs = self._find_extractor_node_configs()
logger.debug("[Extractor] Found %d extractor nodes for parent '%s'", len(extractor_configs), self._node_id)
logger.debug("[NestedNode] Found %d nested nodes for parent '%s'", len(extractor_configs), self._node_id)
if not extractor_configs:
return
# Use DifyNodeFactory to properly instantiate nodes with required dependencies
node_factory = DifyNodeFactory(
graph_init_params=self._graph_init_params,
graph_runtime_state=self.graph_runtime_state,
)
for config in extractor_configs:
node_id = config.get("id")
node_data = config.get("data", {})
node_type_str = node_data.get("type")
if not node_id or not node_type_str:
if not node_id:
continue
# Get node class
try:
node_type = NodeType(node_type_str)
nested_node = node_factory.create_node(config)
except ValueError:
# Skip nodes that cannot be created (e.g., unknown type)
continue
node_mapping = NODE_TYPE_CLASSES_MAPPING.get(node_type)
if not node_mapping:
continue
node_version = str(node_data.get("version", "1"))
node_cls = node_mapping.get(node_version) or node_mapping.get(LATEST_VERSION)
if not node_cls:
continue
# Instantiate and execute the extractor node
extractor_node = node_cls(
id=node_id,
config=config,
graph_init_params=self._graph_init_params,
graph_runtime_state=self.graph_runtime_state,
)
# Execute and process extractor node events
for event in extractor_node.run():
# Execute and process nested node events
for event in nested_node.run():
# Tag event with parent node id for stream ordering and history tracking
if isinstance(event, GraphNodeEventBase):
event.in_mention_parent_id = self._node_id
event.in_parent_node_id = self._node_id
if isinstance(event, NodeRunSucceededEvent):
# Store extractor node outputs in variable pool
# Store nested node outputs in variable pool
outputs: Mapping[str, Any] = event.node_run_result.outputs
for variable_name, variable_value in outputs.items():
self.graph_runtime_state.variable_pool.add((node_id, variable_name), variable_value)
@ -351,8 +337,8 @@ class Node(Generic[NodeDataT]):
execution_id = self.ensure_execution_id()
self._start_at = naive_utc_now()
# Step 1: Execute associated extractor nodes before main node execution
yield from self._execute_mention_nodes()
# Step 1: Execute associated nested nodes before main node execution
yield from self._execute_nested_nodes()
# Create and push start event with required fields
start_event = NodeRunStartedEvent(