fix(workflow): separate runtime outputs from debug bootstrap

This commit is contained in:
WH-2099
2026-03-24 14:11:33 +08:00
parent 9764a8e0ee
commit 57f9053b3a
9 changed files with 486 additions and 31 deletions

View File

@ -37,10 +37,13 @@ from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.system_variables import (
build_bootstrap_variables,
default_system_variables,
get_node_creation_preload_selectors,
inject_default_system_variable_mappings,
preload_node_creation_variables,
)
from core.workflow.variable_pool_initializer import add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_run_outputs import project_node_outputs_for_workflow_run
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
from dify_graph.entities.pause_reason import HumanInputRequired
@ -278,6 +281,8 @@ class WorkflowBasedAppRunner:
graph_config["edges"] = edge_configs
typed_node_configs = [NodeConfigDictAdapter.validate_python(node) for node in node_configs]
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
workflow_id=workflow.id,
@ -297,26 +302,15 @@ class WorkflowBasedAppRunner:
graph_runtime_state=graph_runtime_state,
)
# init graph
graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph:
raise ValueError("graph not found in workflow")
# fetch node config from node id
target_node_config = None
for node in node_configs:
if node.get("id") == node_id:
for node in typed_node_configs:
if node["id"] == node_id:
target_node_config = node
break
if not target_node_config:
raise ValueError(f"{node_type_label} node id not found in workflow graph")
target_node_config = NodeConfigDictAdapter.validate_python(target_node_config)
# Get node class
node_type = target_node_config["data"].type
node_version = str(target_node_config["data"].version)
@ -325,6 +319,19 @@ class WorkflowBasedAppRunner:
# Use the variable pool from graph_runtime_state instead of creating a new one
variable_pool = graph_runtime_state.variable_pool
preload_node_creation_variables(
variable_loader=self._variable_loader,
variable_pool=variable_pool,
selectors=[
selector
for node_config in typed_node_configs
for selector in get_node_creation_preload_selectors(
node_type=node_config["data"].type,
node_data=node_config["data"],
)
],
)
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
graph_config=workflow.graph_dict, config=target_node_config
@ -352,6 +359,14 @@ class WorkflowBasedAppRunner:
tenant_id=workflow.tenant_id,
)
# init graph after constructor-time context has been loaded
graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph:
raise ValueError("graph not found in workflow")
return graph, variable_pool
@staticmethod
@ -420,7 +435,11 @@ class WorkflowBasedAppRunner:
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = node_run_result.outputs
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=inputs,
outputs=node_run_result.outputs,
)
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeRetryEvent(
@ -460,7 +479,11 @@ class WorkflowBasedAppRunner:
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = node_run_result.outputs
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=inputs,
outputs=node_run_result.outputs,
)
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeSucceededEvent(
@ -478,6 +501,11 @@ class WorkflowBasedAppRunner:
)
)
elif isinstance(event, NodeRunFailedEvent):
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=event.node_run_result.inputs,
outputs=event.node_run_result.outputs,
)
self._publish_event(
QueueNodeFailedEvent(
node_execution_id=event.id,
@ -487,7 +515,7 @@ class WorkflowBasedAppRunner:
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=event.node_run_result.outputs,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,
@ -495,6 +523,11 @@ class WorkflowBasedAppRunner:
)
)
elif isinstance(event, NodeRunExceptionEvent):
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=event.node_run_result.inputs,
outputs=event.node_run_result.outputs,
)
self._publish_event(
QueueNodeExceptionEvent(
node_execution_id=event.id,
@ -504,7 +537,7 @@ class WorkflowBasedAppRunner:
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=event.node_run_result.outputs,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,

View File

@ -20,6 +20,7 @@ from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.system_variables import SystemVariableKey
from core.workflow.variable_prefixes import SYSTEM_VARIABLE_NODE_ID
from core.workflow.workflow_run_outputs import project_node_outputs_for_workflow_run
from dify_graph.entities import WorkflowExecution, WorkflowNodeExecution
from dify_graph.enums import (
WorkflowExecutionStatus,
@ -371,10 +372,15 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
domain_execution.error = error
if update_outputs:
projected_outputs = project_node_outputs_for_workflow_run(
node_type=domain_execution.node_type,
inputs=node_result.inputs,
outputs=node_result.outputs,
)
domain_execution.update_from_mapping(
inputs=node_result.inputs,
process_data=node_result.process_data,
outputs=node_result.outputs,
outputs=projected_outputs,
metadata=node_result.metadata,
)

View File

@ -44,6 +44,14 @@ class _VariablePoolReader(Protocol):
def get_by_prefix(self, prefix: str, /) -> Mapping[str, object]: ...
class _VariablePoolWriter(_VariablePoolReader, Protocol):
    """Writable view of a variable pool: reader operations plus `add`."""

    def add(self, selector: Sequence[str], value: object, /) -> None: ...
class _VariableLoader(Protocol):
    """Structural interface for bulk-loading variables by selector."""

    def load_variables(self, selectors: list[list[str]]) -> Sequence[object]: ...
def system_variable_name(key: str | SystemVariableKey) -> str:
    """Normalize a system-variable key to its plain string name."""
    if isinstance(key, SystemVariableKey):
        return key.value
    return key
@ -156,6 +164,57 @@ def get_all_system_variables(variable_pool: _VariablePoolReader) -> Mapping[str,
return variable_pool.get_by_prefix(SYSTEM_VARIABLE_NODE_ID)
# Node types whose construction may depend on conversation memory and
# therefore need sys.conversation_id in the pool before instantiation.
_MEMORY_BOOTSTRAP_NODE_TYPES = frozenset(
    (
        BuiltinNodeTypes.LLM,
        BuiltinNodeTypes.QUESTION_CLASSIFIER,
        BuiltinNodeTypes.PARAMETER_EXTRACTOR,
    )
)
def get_node_creation_preload_selectors(
    *,
    node_type: str,
    node_data: object,
) -> tuple[tuple[str, str], ...]:
    """Return selectors that must exist before node construction begins.

    Only memory-capable node types with a configured ``memory`` attribute
    need anything ahead of time: the conversation id selector. Every other
    node gets an empty tuple.
    """
    needs_memory_bootstrap = (
        node_type in _MEMORY_BOOTSTRAP_NODE_TYPES
        and getattr(node_data, "memory", None) is not None
    )
    if not needs_memory_bootstrap:
        return ()
    return (system_variable_selector(SystemVariableKey.CONVERSATION_ID),)
def preload_node_creation_variables(
    *,
    variable_loader: _VariableLoader,
    variable_pool: _VariablePoolWriter,
    selectors: Sequence[Sequence[str]],
) -> None:
    """Load constructor-time variables before node or graph creation.

    Deduplicates ``selectors``, skips any selector whose value is already
    present in ``variable_pool``, then bulk-loads the remainder through
    ``variable_loader`` and writes each loaded variable back into the pool
    under its two-segment (node_id, variable_name) address.

    Raises:
        ValueError: if a requested or loaded selector has fewer than two
            segments — a valid pool selector needs node id plus name.
    """
    seen_selectors: set[tuple[str, ...]] = set()
    selectors_to_load: list[list[str]] = []
    for selector in selectors:
        normalized_selector = tuple(selector)
        if len(normalized_selector) < 2:
            raise ValueError(f"Invalid preload selector: {selector}")
        if normalized_selector in seen_selectors:
            continue
        seen_selectors.add(normalized_selector)
        # Only fetch values the pool does not already hold.
        if variable_pool.get(normalized_selector) is None:
            selectors_to_load.append(list(normalized_selector))
    # Skip the loader round trip entirely when everything is cached.
    if not selectors_to_load:
        return
    loaded_variables = variable_loader.load_variables(selectors_to_load)
    for variable in loaded_variables:
        raw_selector = getattr(variable, "selector", ())
        loaded_selector = list(raw_selector)
        if len(loaded_selector) < 2:
            raise ValueError(f"Invalid loaded variable selector: {raw_selector}")
        # Pool addressing uses exactly (node_id, variable_name).
        variable_pool.add(loaded_selector[:2], variable)
def inject_default_system_variable_mappings(
*,
node_id: str,

View File

@ -13,7 +13,9 @@ from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory, is_start_node_type, resolve_workflow_node_class
from core.workflow.system_variables import (
default_system_variables,
get_node_creation_preload_selectors,
inject_default_system_variable_mappings,
preload_node_creation_variables,
)
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.variable_prefixes import ENVIRONMENT_VARIABLE_NODE_ID
@ -226,6 +228,8 @@ class WorkflowEntry:
# Get node type
node_type = node_config_data.type
node_version = str(node_config_data.version)
node_cls = resolve_workflow_node_class(node_type=node_type, node_version=node_version)
# init graph init params and runtime state
graph_init_params = GraphInitParams(
@ -249,13 +253,14 @@ class WorkflowEntry:
if is_start_node_type(node_type):
add_node_inputs_to_pool(variable_pool, node_id=node_id, inputs=user_inputs)
# init workflow run state
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
preload_node_creation_variables(
variable_loader=variable_loader,
variable_pool=variable_pool,
selectors=get_node_creation_preload_selectors(
node_type=node_type,
node_data=node_config_data,
),
)
node = node_factory.create_node(node_config)
node_cls = type(node)
try:
# variable selector to variable mapping
@ -287,6 +292,13 @@ class WorkflowEntry:
tenant_id=workflow.tenant_id,
)
# init workflow run state
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
node = node_factory.create_node(node_config)
try:
generator = cls._traced_node_run(node)
except Exception as e:

View File

@ -0,0 +1,18 @@
from collections.abc import Mapping
from typing import Any
from dify_graph.enums import BuiltinNodeTypes, NodeType
def project_node_outputs_for_workflow_run(
    *,
    node_type: NodeType,
    inputs: Mapping[str, Any],
    outputs: Mapping[str, Any],
) -> dict[str, Any]:
    """Project internal node outputs onto the workflow-run public contract.

    START nodes publish their inputs as the run-visible outputs; every
    other node type passes its outputs through unchanged. A fresh dict is
    returned so callers may mutate it safely.
    """
    source = inputs if node_type == BuiltinNodeTypes.START else outputs
    return dict(source)

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from datetime import datetime
from datetime import UTC, datetime
from types import SimpleNamespace
import pytest
@ -11,6 +11,10 @@ from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueIterationCompletedEvent,
QueueLoopCompletedEvent,
QueueNodeExceptionEvent,
QueueNodeFailedEvent,
QueueNodeRetryEvent,
QueueNodeSucceededEvent,
QueueTextChunkEvent,
QueueWorkflowPausedEvent,
QueueWorkflowStartedEvent,
@ -24,12 +28,18 @@ from dify_graph.graph_events import (
GraphRunStartedEvent,
GraphRunSucceededEvent,
NodeRunAgentLogEvent,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunIterationSucceededEvent,
NodeRunLoopFailedEvent,
NodeRunRetryEvent,
NodeRunStartedEvent,
NodeRunStreamChunkEvent,
NodeRunSucceededEvent,
)
from dify_graph.node_events import NodeRunResult
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.variables.variables import StringVariable
class TestWorkflowBasedAppRunner:
@ -131,6 +141,96 @@ class TestWorkflowBasedAppRunner:
assert graph is not None
assert variable_pool is graph_runtime_state.variable_pool
def test_get_graph_and_variable_pool_preloads_constructor_variables_before_graph_init(self, monkeypatch):
    """Graph.init must only run after constructor-time variables are preloaded.

    The workflow contains a memory-enabled LLM child of the loop node, so
    sys.conversation_id has to be in the variable pool before any node is
    constructed; the stubbed Graph.init asserts exactly that.
    """
    # Loader yields the conversation id only when selectors were actually requested.
    variable_loader = SimpleNamespace(
        load_variables=lambda selectors: (
            [
                StringVariable(
                    name="conversation_id",
                    value="conv-1",
                    selector=["sys", "conversation_id"],
                )
            ]
            if selectors
            else []
        )
    )
    runner = WorkflowBasedAppRunner(
        queue_manager=SimpleNamespace(),
        variable_loader=variable_loader,
        app_id="app",
    )
    graph_runtime_state = GraphRuntimeState(
        variable_pool=VariablePool(system_variables=default_system_variables()),
        start_at=0.0,
    )
    # Minimal workflow: a loop node plus a memory-enabled LLM inside the loop.
    workflow = SimpleNamespace(
        tenant_id="tenant",
        id="workflow",
        graph_dict={
            "nodes": [
                {"id": "loop-node", "data": {"type": "loop", "version": "1", "title": "Loop"}},
                {
                    "id": "llm-child",
                    "data": {
                        "type": "llm",
                        "version": "1",
                        "loop_id": "loop-node",
                        "memory": object(),
                    },
                },
            ],
            "edges": [],
        },
    )

    class _LoopNodeCls:
        # Stub node class: no variable selectors to extract.
        @staticmethod
        def extract_variable_selector_to_variable_mapping(graph_config, config):
            return {}

    def _validate_node_config(value):
        # Mimic NodeConfigDictAdapter: expose node data via attribute access.
        return {"id": value["id"], "data": SimpleNamespace(**value["data"])}

    def _graph_init(**kwargs):
        # Core assertion: the preload happened before graph initialization.
        variable_pool = graph_runtime_state.variable_pool
        assert variable_pool.get(["sys", "conversation_id"]) is not None
        return SimpleNamespace()

    monkeypatch.setattr(
        "core.app.apps.workflow_app_runner.NodeConfigDictAdapter.validate_python",
        _validate_node_config,
    )
    monkeypatch.setattr(
        "core.app.apps.workflow_app_runner.Graph.init",
        _graph_init,
    )
    monkeypatch.setattr(
        "core.app.apps.workflow_app_runner.resolve_workflow_node_class",
        lambda **_kwargs: _LoopNodeCls,
    )
    monkeypatch.setattr(
        "core.app.apps.workflow_app_runner.load_into_variable_pool",
        lambda **kwargs: None,
    )
    monkeypatch.setattr(
        "core.app.apps.workflow_app_runner.WorkflowEntry.mapping_user_inputs_to_variable_pool",
        lambda **kwargs: None,
    )

    graph, variable_pool = runner._get_graph_and_variable_pool_for_single_node_run(
        workflow=workflow,
        node_id="loop-node",
        user_inputs={},
        graph_runtime_state=graph_runtime_state,
        node_type_filter_key="loop_id",
        node_type_label="loop",
    )

    assert graph is not None
    assert variable_pool.get(["sys", "conversation_id"]).value == "conv-1"
def test_handle_graph_run_events_and_pause_notifications(self, monkeypatch):
published: list[object] = []
@ -195,7 +295,7 @@ class TestWorkflowBasedAppRunner:
node_id="node",
node_type=BuiltinNodeTypes.START,
node_title="Start",
start_at=datetime.utcnow(),
start_at=datetime.now(UTC),
),
)
runner._handle_event(
@ -232,7 +332,7 @@ class TestWorkflowBasedAppRunner:
node_id="node",
node_type=BuiltinNodeTypes.LLM,
node_title="Iter",
start_at=datetime.utcnow(),
start_at=datetime.now(UTC),
inputs={},
outputs={"ok": True},
metadata={},
@ -246,7 +346,7 @@ class TestWorkflowBasedAppRunner:
node_id="node",
node_type=BuiltinNodeTypes.LLM,
node_title="Loop",
start_at=datetime.utcnow(),
start_at=datetime.now(UTC),
inputs={},
outputs={},
metadata={},
@ -259,3 +359,87 @@ class TestWorkflowBasedAppRunner:
assert any(isinstance(event, QueueAgentLogEvent) for event in published)
assert any(isinstance(event, QueueIterationCompletedEvent) for event in published)
assert any(isinstance(event, QueueLoopCompletedEvent) for event in published)
@pytest.mark.parametrize(
    ("event_factory", "queue_event_cls"),
    [
        (
            lambda result, start_at, finished_at: NodeRunSucceededEvent(
                id="exec",
                node_id="node",
                node_type=BuiltinNodeTypes.START,
                start_at=start_at,
                finished_at=finished_at,
                node_run_result=result,
            ),
            QueueNodeSucceededEvent,
        ),
        (
            lambda result, start_at, finished_at: NodeRunFailedEvent(
                id="exec",
                node_id="node",
                node_type=BuiltinNodeTypes.START,
                start_at=start_at,
                finished_at=finished_at,
                error="boom",
                node_run_result=result,
            ),
            QueueNodeFailedEvent,
        ),
        (
            lambda result, start_at, finished_at: NodeRunExceptionEvent(
                id="exec",
                node_id="node",
                node_type=BuiltinNodeTypes.START,
                start_at=start_at,
                finished_at=finished_at,
                error="boom",
                node_run_result=result,
            ),
            QueueNodeExceptionEvent,
        ),
        (
            # Retry events carry no finished_at; the factory ignores it.
            lambda result, start_at, _finished_at: NodeRunRetryEvent(
                id="exec",
                node_id="node",
                node_type=BuiltinNodeTypes.START,
                node_title="Start",
                start_at=start_at,
                error="boom",
                retry_index=1,
                node_run_result=result,
            ),
            QueueNodeRetryEvent,
        ),
    ],
)
def test_handle_start_node_result_events_project_outputs(self, event_factory, queue_event_cls):
    """Every terminal node event type projects START outputs to its inputs."""
    published: list[object] = []

    class _QueueManager:
        # Capture published queue events for later inspection.
        def publish(self, event, publish_from):
            published.append(event)

    runner = WorkflowBasedAppRunner(queue_manager=_QueueManager(), app_id="app")
    graph_runtime_state = GraphRuntimeState(
        variable_pool=VariablePool(system_variables=default_system_variables()),
        start_at=0.0,
    )
    workflow_entry = SimpleNamespace(graph_engine=SimpleNamespace(graph_runtime_state=graph_runtime_state))
    started_at = datetime.now(UTC)
    finished_at = datetime.now(UTC)
    # Raw node outputs include internal sys./env./conversation. entries
    # that must not leak into the public queue event.
    result = NodeRunResult(
        inputs={"question": "hello"},
        outputs={
            "question": "hello",
            "sys.query": "hello",
            "env.API_KEY": "secret",
            "conversation.session_id": "session-1",
        },
    )

    runner._handle_event(workflow_entry, event_factory(result, started_at, finished_at))

    queue_event = published[-1]
    assert isinstance(queue_event, queue_event_cls)
    # Only the user-facing key survives projection for START nodes.
    assert queue_event.outputs == {"question": "hello"}

View File

@ -8,7 +8,7 @@ from core.app.workflow.layers.persistence import (
WorkflowPersistenceLayer,
_NodeRuntimeSnapshot,
)
from dify_graph.enums import WorkflowNodeExecutionStatus, WorkflowType
from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus, WorkflowType
from dify_graph.node_events import NodeRunResult
@ -58,3 +58,42 @@ def test_update_node_execution_prefers_event_finished_at(monkeypatch: pytest.Mon
assert node_execution.finished_at == event_finished_at
assert node_execution.elapsed_time == 2.0
def test_update_node_execution_projects_start_outputs() -> None:
    """Persistence layer stores projected outputs (inputs) for START nodes."""
    layer = _build_layer()
    node_execution = Mock()
    node_execution.id = "node-exec-2"
    node_execution.node_type = BuiltinNodeTypes.START
    # Naive datetime mirrors how created_at is stored by the repository.
    node_execution.created_at = datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC).replace(tzinfo=None)
    node_execution.update_from_mapping = Mock()
    layer._node_snapshots[node_execution.id] = _NodeRuntimeSnapshot(
        node_id="start",
        title="Start",
        predecessor_node_id=None,
        iteration_id=None,
        loop_id=None,
        created_at=node_execution.created_at,
    )
    layer._update_node_execution(
        node_execution,
        NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            inputs={"question": "hello"},
            outputs={
                "question": "hello",
                "sys.query": "hello",
                "env.API_KEY": "secret",
            },
        ),
        WorkflowNodeExecutionStatus.SUCCEEDED,
    )
    # Internal sys.*/env.* entries must not leak into persisted outputs.
    node_execution.update_from_mapping.assert_called_once_with(
        inputs={"question": "hello"},
        process_data={},
        outputs={"question": "hello"},
        metadata={},
    )

View File

@ -1,6 +1,14 @@
from core.workflow.system_variables import build_system_variables, default_system_variables, system_variables_to_mapping
from types import SimpleNamespace
from core.workflow.system_variables import (
build_system_variables,
default_system_variables,
get_node_creation_preload_selectors,
system_variables_to_mapping,
)
from dify_graph.file.enums import FileTransferMethod, FileType
from dify_graph.file.models import File
from dify_graph.nodes import BuiltinNodeTypes
def test_build_system_variables_normalizes_workflow_execution_id():
@ -56,3 +64,21 @@ def test_default_system_variables_generates_workflow_run_id():
assert isinstance(system_values["workflow_run_id"], str)
assert system_values["workflow_run_id"]
assert system_values["files"] == []
def test_get_node_creation_preload_selectors_requires_conversation_for_memory_nodes():
    """A memory-enabled classifier must preload the conversation id selector."""
    memory_node_data = SimpleNamespace(memory=object())
    result = get_node_creation_preload_selectors(
        node_type=BuiltinNodeTypes.QUESTION_CLASSIFIER,
        node_data=memory_node_data,
    )
    assert result == (("sys", "conversation_id"),)
def test_get_node_creation_preload_selectors_skips_non_memory_nodes():
    """Nodes without memory require no constructor-time preloading."""
    plain_node_data = SimpleNamespace(memory=None)
    result = get_node_creation_preload_selectors(
        node_type=BuiltinNodeTypes.START,
        node_data=plain_node_data,
    )
    assert result == ()

View File

@ -9,6 +9,7 @@ from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.model_manager import ModelInstance
from core.workflow import workflow_entry
from core.workflow.system_variables import default_system_variables
from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.entities.graph_config import NodeConfigDictAdapter
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus
@ -21,7 +22,8 @@ from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.nodes.base.node import Node
from dify_graph.runtime import ChildGraphNotFoundError
from dify_graph.runtime import ChildGraphNotFoundError, VariablePool
from dify_graph.variables.variables import StringVariable
from tests.workflow_test_utils import build_test_graph_init_params, build_test_variable_pool
@ -323,6 +325,79 @@ class TestWorkflowEntryRun:
class TestWorkflowEntrySingleStepRun:
def test_preloads_constructor_variables_before_creating_memory_node(self):
    """single_step_run must preload sys.conversation_id before node creation.

    The fake node factory asserts the conversation id is already in the
    variable pool at the moment create_node is invoked.
    """

    class FakeLLMNode:
        # Minimal node stub: memory-enabled LLM with no selector mapping.
        id = "node-id"
        title = "Node Title"
        node_type = BuiltinNodeTypes.LLM

        @staticmethod
        def version():
            return "1"

        @staticmethod
        def extract_variable_selector_to_variable_mapping(**_kwargs):
            return {}

    variable_pool = VariablePool(system_variables=default_system_variables(), user_inputs={})
    variable_loader = MagicMock()
    variable_loader.load_variables.return_value = [
        StringVariable(
            name="conversation_id",
            value="conv-1",
            selector=["sys", "conversation_id"],
        )
    ]
    with (
        patch.object(workflow_entry, "GraphInitParams", return_value=sentinel.graph_init_params),
        patch.object(
            workflow_entry,
            "GraphRuntimeState",
            return_value=SimpleNamespace(variable_pool=variable_pool),
        ),
        patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
        patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
        patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeLLMNode),
        patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
        patch.object(workflow_entry, "load_into_variable_pool"),
        patch.object(workflow_entry.WorkflowEntry, "mapping_user_inputs_to_variable_pool"),
        patch.object(
            workflow_entry.WorkflowEntry,
            "_traced_node_run",
            return_value=iter(["event"]),
        ),
    ):

        def _create_node(_node_config):
            # Core assertion: preloading happened before node construction.
            assert variable_pool.get(["sys", "conversation_id"]) is not None
            return FakeLLMNode()

        dify_node_factory.return_value.create_node.side_effect = _create_node
        workflow = SimpleNamespace(
            tenant_id="tenant-id",
            app_id="app-id",
            id="workflow-id",
            graph_dict={"nodes": [], "edges": []},
            get_node_config_by_id=lambda _node_id: {
                "id": "node-id",
                "data": SimpleNamespace(type=BuiltinNodeTypes.LLM, version="1", memory=object()),
            },
        )
        node, generator = workflow_entry.WorkflowEntry.single_step_run(
            workflow=workflow,
            node_id="node-id",
            user_id="user-id",
            user_inputs={},
            variable_pool=variable_pool,
            variable_loader=variable_loader,
        )
        assert node.id == "node-id"
        assert list(generator) == ["event"]
        variable_loader.load_variables.assert_called_once_with([["sys", "conversation_id"]])
def test_uses_empty_mapping_when_selector_extraction_is_not_implemented(self):
class FakeNode:
id = "node-id"
@ -342,6 +417,7 @@ class TestWorkflowEntrySingleStepRun:
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry, "add_node_inputs_to_pool") as add_node_inputs_to_pool,
patch.object(workflow_entry, "load_into_variable_pool") as load_into_variable_pool,
@ -410,6 +486,7 @@ class TestWorkflowEntrySingleStepRun:
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeDatasourceNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry, "add_node_inputs_to_pool") as add_node_inputs_to_pool,
patch.object(workflow_entry, "load_into_variable_pool") as load_into_variable_pool,
@ -469,6 +546,7 @@ class TestWorkflowEntrySingleStepRun:
patch.object(workflow_entry, "GraphRuntimeState", return_value=sentinel.graph_runtime_state),
patch.object(workflow_entry, "build_dify_run_context", return_value={"_dify": "context"}),
patch.object(workflow_entry.time, "perf_counter", return_value=123.0),
patch.object(workflow_entry, "resolve_workflow_node_class", return_value=FakeNode),
patch.object(workflow_entry, "DifyNodeFactory") as dify_node_factory,
patch.object(workflow_entry, "add_node_inputs_to_pool"),
patch.object(workflow_entry, "load_into_variable_pool"),