fix(graph_engine): Cannot run single iteration or loop node

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN-
2026-01-23 23:49:31 +08:00
parent 8d45755303
commit bd64062e8b
12 changed files with 56 additions and 51 deletions

View File

@ -157,7 +157,7 @@ class WorkflowBasedAppRunner:
# Create initial runtime state with variable pool containing environment variables # Create initial runtime state with variable pool containing environment variables
graph_runtime_state = GraphRuntimeState( graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool( variable_pool=VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
environment_variables=workflow.environment_variables, environment_variables=workflow.environment_variables,
), ),
@ -268,7 +268,9 @@ class WorkflowBasedAppRunner:
) )
# init graph # init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id) graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph: if not graph:
raise ValueError("graph not found in workflow") raise ValueError("graph not found in workflow")

View File

@ -288,6 +288,7 @@ class Graph:
graph_config: Mapping[str, object], graph_config: Mapping[str, object],
node_factory: NodeFactory, node_factory: NodeFactory,
root_node_id: str | None = None, root_node_id: str | None = None,
skip_validation: bool = False,
) -> Graph: ) -> Graph:
""" """
Initialize graph Initialize graph
@ -339,8 +340,9 @@ class Graph:
root_node=root_node, root_node=root_node,
) )
# Validate the graph structure using built-in validators if not skip_validation:
get_graph_validator().validate(graph) # Validate the graph structure using built-in validators
get_graph_validator().validate(graph)
return graph return graph

View File

@ -44,7 +44,7 @@ class VariablePool(BaseModel):
) )
system_variables: SystemVariable = Field( system_variables: SystemVariable = Field(
description="System variables", description="System variables",
default_factory=SystemVariable.empty, default_factory=SystemVariable.default,
) )
environment_variables: Sequence[Variable] = Field( environment_variables: Sequence[Variable] = Field(
description="Environment variables.", description="Environment variables.",
@ -271,4 +271,4 @@ class VariablePool(BaseModel):
@classmethod @classmethod
def empty(cls) -> VariablePool: def empty(cls) -> VariablePool:
"""Create an empty variable pool.""" """Create an empty variable pool."""
return cls(system_variables=SystemVariable.empty()) return cls(system_variables=SystemVariable.default())

View File

@ -3,6 +3,7 @@ from __future__ import annotations
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
from types import MappingProxyType from types import MappingProxyType
from typing import Any from typing import Any
from uuid import uuid4
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator
@ -72,8 +73,8 @@ class SystemVariable(BaseModel):
return data return data
@classmethod @classmethod
def empty(cls) -> SystemVariable: def default(cls) -> SystemVariable:
return cls() return cls(workflow_execution_id=str(uuid4()))
def to_dict(self) -> dict[SystemVariableKey, Any]: def to_dict(self) -> dict[SystemVariableKey, Any]:
# NOTE: This method is provided for compatibility with legacy code. # NOTE: This method is provided for compatibility with legacy code.

View File

@ -276,7 +276,7 @@ class WorkflowEntry:
# init variable pool # init variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
environment_variables=[], environment_variables=[],
) )

View File

@ -436,7 +436,7 @@ class RagPipelineService:
user_inputs=user_inputs, user_inputs=user_inputs,
user_id=account.id, user_id=account.id,
variable_pool=VariablePool( variable_pool=VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs=user_inputs, user_inputs=user_inputs,
environment_variables=[], environment_variables=[],
conversation_variables=[], conversation_variables=[],

View File

@ -675,7 +675,7 @@ class WorkflowService:
else: else:
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs=user_inputs, user_inputs=user_inputs,
environment_variables=draft_workflow.environment_variables, environment_variables=draft_workflow.environment_variables,
conversation_variables=[], conversation_variables=[],
@ -1063,7 +1063,7 @@ def _setup_variable_pool(
system_variable.conversation_id = conversation_id system_variable.conversation_id = conversation_id
system_variable.dialogue_count = 1 system_variable.dialogue_count = 1
else: else:
system_variable = SystemVariable.empty() system_variable = SystemVariable.default()
# init variable pool # init variable pool
variable_pool = VariablePool( variable_pool = VariablePool(

View File

@ -16,7 +16,7 @@ from core.workflow.system_variable import SystemVariable
def test_executor_with_json_body_and_number_variable(): def test_executor_with_json_body_and_number_variable():
# Prepare the variable pool # Prepare the variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
variable_pool.add(["pre_node_id", "number"], 42) variable_pool.add(["pre_node_id", "number"], 42)
@ -69,7 +69,7 @@ def test_executor_with_json_body_and_number_variable():
def test_executor_with_json_body_and_object_variable(): def test_executor_with_json_body_and_object_variable():
# Prepare the variable pool # Prepare the variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"}) variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"})
@ -124,7 +124,7 @@ def test_executor_with_json_body_and_object_variable():
def test_executor_with_json_body_and_nested_object_variable(): def test_executor_with_json_body_and_nested_object_variable():
# Prepare the variable pool # Prepare the variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"}) variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"})
@ -178,7 +178,7 @@ def test_executor_with_json_body_and_nested_object_variable():
def test_extract_selectors_from_template_with_newline(): def test_extract_selectors_from_template_with_newline():
variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool = VariablePool(system_variables=SystemVariable.default())
variable_pool.add(("node_id", "custom_query"), "line1\nline2") variable_pool.add(("node_id", "custom_query"), "line1\nline2")
node_data = HttpRequestNodeData( node_data = HttpRequestNodeData(
title="Test JSON Body with Nested Object Variable", title="Test JSON Body with Nested Object Variable",
@ -205,7 +205,7 @@ def test_extract_selectors_from_template_with_newline():
def test_executor_with_form_data(): def test_executor_with_form_data():
# Prepare the variable pool # Prepare the variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
variable_pool.add(["pre_node_id", "text_field"], "Hello, World!") variable_pool.add(["pre_node_id", "text_field"], "Hello, World!")
@ -290,7 +290,7 @@ def test_init_headers():
return Executor( return Executor(
node_data=node_data, node_data=node_data,
timeout=timeout, timeout=timeout,
variable_pool=VariablePool(system_variables=SystemVariable.empty()), variable_pool=VariablePool(system_variables=SystemVariable.default()),
) )
executor = create_executor("aa\n cc:") executor = create_executor("aa\n cc:")
@ -324,7 +324,7 @@ def test_init_params():
return Executor( return Executor(
node_data=node_data, node_data=node_data,
timeout=timeout, timeout=timeout,
variable_pool=VariablePool(system_variables=SystemVariable.empty()), variable_pool=VariablePool(system_variables=SystemVariable.default()),
) )
# Test basic key-value pairs # Test basic key-value pairs
@ -355,7 +355,7 @@ def test_init_params():
def test_empty_api_key_raises_error_bearer(): def test_empty_api_key_raises_error_bearer():
"""Test that empty API key raises AuthorizationConfigError for bearer auth.""" """Test that empty API key raises AuthorizationConfigError for bearer auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData( node_data = HttpRequestNodeData(
title="test", title="test",
method="get", method="get",
@ -379,7 +379,7 @@ def test_empty_api_key_raises_error_bearer():
def test_empty_api_key_raises_error_basic(): def test_empty_api_key_raises_error_basic():
"""Test that empty API key raises AuthorizationConfigError for basic auth.""" """Test that empty API key raises AuthorizationConfigError for basic auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData( node_data = HttpRequestNodeData(
title="test", title="test",
method="get", method="get",
@ -403,7 +403,7 @@ def test_empty_api_key_raises_error_basic():
def test_empty_api_key_raises_error_custom(): def test_empty_api_key_raises_error_custom():
"""Test that empty API key raises AuthorizationConfigError for custom auth.""" """Test that empty API key raises AuthorizationConfigError for custom auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData( node_data = HttpRequestNodeData(
title="test", title="test",
method="get", method="get",
@ -427,7 +427,7 @@ def test_empty_api_key_raises_error_custom():
def test_whitespace_only_api_key_raises_error(): def test_whitespace_only_api_key_raises_error():
"""Test that whitespace-only API key raises AuthorizationConfigError.""" """Test that whitespace-only API key raises AuthorizationConfigError."""
variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData( node_data = HttpRequestNodeData(
title="test", title="test",
method="get", method="get",
@ -451,7 +451,7 @@ def test_whitespace_only_api_key_raises_error():
def test_valid_api_key_works(): def test_valid_api_key_works():
"""Test that valid API key works correctly for bearer auth.""" """Test that valid API key works correctly for bearer auth."""
variable_pool = VariablePool(system_variables=SystemVariable.empty()) variable_pool = VariablePool(system_variables=SystemVariable.default())
node_data = HttpRequestNodeData( node_data = HttpRequestNodeData(
title="test", title="test",
method="get", method="get",

View File

@ -86,7 +86,7 @@ def graph_init_params() -> GraphInitParams:
@pytest.fixture @pytest.fixture
def graph_runtime_state() -> GraphRuntimeState: def graph_runtime_state() -> GraphRuntimeState:
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
return GraphRuntimeState( return GraphRuntimeState(

View File

@ -111,7 +111,7 @@ def test_webhook_node_file_conversion_to_file_variable():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -184,7 +184,7 @@ def test_webhook_node_file_conversion_with_missing_files():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -219,7 +219,7 @@ def test_webhook_node_file_conversion_with_none_file():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -256,7 +256,7 @@ def test_webhook_node_file_conversion_with_non_dict_file():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -300,7 +300,7 @@ def test_webhook_node_file_conversion_mixed_parameters():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -370,7 +370,7 @@ def test_webhook_node_different_file_types():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -430,7 +430,7 @@ def test_webhook_node_file_conversion_with_non_dict_wrapper():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},

View File

@ -75,7 +75,7 @@ def test_webhook_node_basic_initialization():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
@ -118,7 +118,7 @@ def test_webhook_node_run_with_headers():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": { "headers": {
@ -154,7 +154,7 @@ def test_webhook_node_run_with_query_params():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -190,7 +190,7 @@ def test_webhook_node_run_with_body_params():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -249,7 +249,7 @@ def test_webhook_node_run_with_file_params():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},
@ -302,7 +302,7 @@ def test_webhook_node_run_mixed_parameters():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {"Authorization": "Bearer token"}, "headers": {"Authorization": "Bearer token"},
@ -342,7 +342,7 @@ def test_webhook_node_run_empty_webhook_data():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, # No webhook_data user_inputs={}, # No webhook_data
) )
@ -368,7 +368,7 @@ def test_webhook_node_run_case_insensitive_headers():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": { "headers": {
@ -398,7 +398,7 @@ def test_webhook_node_variable_pool_user_inputs():
# Add some additional variables to the pool # Add some additional variables to the pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": {"headers": {}, "query_params": {}, "body": {}, "files": {}}, "webhook_data": {"headers": {}, "query_params": {}, "body": {}, "files": {}},
"other_var": "should_be_included", "other_var": "should_be_included",
@ -429,7 +429,7 @@ def test_webhook_node_different_methods(method):
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={ user_inputs={
"webhook_data": { "webhook_data": {
"headers": {}, "headers": {},

View File

@ -127,7 +127,7 @@ class TestWorkflowEntry:
return node_config return node_config
workflow = StubWorkflow() workflow = StubWorkflow()
variable_pool = VariablePool(system_variables=SystemVariable.empty(), user_inputs={}) variable_pool = VariablePool(system_variables=SystemVariable.default(), user_inputs={})
expected_limits = CodeNodeLimits( expected_limits = CodeNodeLimits(
max_string_length=dify_config.CODE_MAX_STRING_LENGTH, max_string_length=dify_config.CODE_MAX_STRING_LENGTH,
max_number=dify_config.CODE_MAX_NUMBER, max_number=dify_config.CODE_MAX_NUMBER,
@ -157,7 +157,7 @@ class TestWorkflowEntry:
# Initialize variable pool with environment variables # Initialize variable pool with environment variables
env_var = StringVariable(name="API_KEY", value="existing_key") env_var = StringVariable(name="API_KEY", value="existing_key")
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
environment_variables=[env_var], environment_variables=[env_var],
user_inputs={}, user_inputs={},
) )
@ -198,7 +198,7 @@ class TestWorkflowEntry:
# Initialize variable pool with conversation variables # Initialize variable pool with conversation variables
conv_var = StringVariable(name="last_message", value="Hello") conv_var = StringVariable(name="last_message", value="Hello")
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
conversation_variables=[conv_var], conversation_variables=[conv_var],
user_inputs={}, user_inputs={},
) )
@ -239,7 +239,7 @@ class TestWorkflowEntry:
"""Test mapping regular node variables from user inputs to variable pool.""" """Test mapping regular node variables from user inputs to variable pool."""
# Initialize empty variable pool # Initialize empty variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
@ -281,7 +281,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_with_file_handling(self): def test_mapping_user_inputs_with_file_handling(self):
"""Test mapping file inputs from user inputs to variable pool.""" """Test mapping file inputs from user inputs to variable pool."""
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
@ -340,7 +340,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_missing_variable_error(self): def test_mapping_user_inputs_missing_variable_error(self):
"""Test that mapping raises error when required variable is missing.""" """Test that mapping raises error when required variable is missing."""
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
@ -366,7 +366,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_with_alternative_key_format(self): def test_mapping_user_inputs_with_alternative_key_format(self):
"""Test mapping with alternative key format (without node prefix).""" """Test mapping with alternative key format (without node prefix)."""
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
@ -396,7 +396,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_with_complex_selectors(self): def test_mapping_user_inputs_with_complex_selectors(self):
"""Test mapping with complex node variable keys.""" """Test mapping with complex node variable keys."""
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )
@ -432,7 +432,7 @@ class TestWorkflowEntry:
def test_mapping_user_inputs_invalid_node_variable(self): def test_mapping_user_inputs_invalid_node_variable(self):
"""Test that mapping handles invalid node variable format.""" """Test that mapping handles invalid node variable format."""
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables=SystemVariable.empty(), system_variables=SystemVariable.default(),
user_inputs={}, user_inputs={},
) )