fix: cannot run single step on iteration node

This commit is contained in:
-LAN-
2025-12-16 22:56:32 +08:00
parent a93eecaeee
commit bd1f9bb735
8 changed files with 269 additions and 29 deletions

View File

@ -0,0 +1,94 @@
from types import SimpleNamespace
from unittest.mock import MagicMock
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.workflow.enums import NodeType, SystemVariableKey
from core.workflow.system_variable import SystemVariable
def test_prepare_single_iteration_injects_system_variables_and_fake_workflow():
node_id = "iteration_node"
execution_id = "workflow-exec-123"
workflow = SimpleNamespace(
id="workflow-id",
tenant_id="tenant-id",
app_id="app-id",
environment_variables=[],
graph_dict={
"nodes": [
{
"id": node_id,
"type": "custom",
"data": {
"type": NodeType.ITERATION,
"title": "Iteration",
"version": "1",
"iterator_selector": ["start", "items"],
"output_selector": [node_id, "output"],
},
}
],
"edges": [],
},
)
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
graph, _, runtime_state = runner._prepare_single_node_execution(
workflow=workflow,
single_iteration_run=SimpleNamespace(node_id=node_id, inputs={"input_selector": [1, 2, 3]}),
system_variables=system_inputs,
)
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
assert runtime_state.variable_pool.get_by_prefix("sys")[SystemVariableKey.WORKFLOW_EXECUTION_ID] == execution_id
assert graph.root_node.id == f"{node_id}_single_step_start"
assert f"{node_id}_single_step_end" in graph.nodes
def test_prepare_single_loop_injects_system_variables_and_fake_workflow():
node_id = "loop_node"
execution_id = "workflow-exec-456"
workflow = SimpleNamespace(
id="workflow-id",
tenant_id="tenant-id",
app_id="app-id",
environment_variables=[],
graph_dict={
"nodes": [
{
"id": node_id,
"type": "custom",
"data": {
"type": NodeType.LOOP,
"title": "Loop",
"version": "1",
"loop_count": 1,
"break_conditions": [],
"logical_operator": "and",
"loop_variables": [],
"outputs": {},
},
}
],
"edges": [],
},
)
runner = WorkflowBasedAppRunner(queue_manager=MagicMock(), app_id="app-id")
system_inputs = SystemVariable(app_id="app-id", workflow_id="workflow-id", workflow_execution_id=execution_id)
graph, _, runtime_state = runner._prepare_single_node_execution(
workflow=workflow,
single_loop_run=SimpleNamespace(node_id=node_id, inputs={}),
system_variables=system_inputs,
)
assert runtime_state.variable_pool.system_variables.workflow_execution_id == execution_id
assert graph.root_node.id == f"{node_id}_single_step_start"
assert f"{node_id}_single_step_end" in graph.nodes

View File

@ -0,0 +1,51 @@
from core.workflow.entities import GraphInitParams
from core.workflow.graph_events import (
NodeRunIterationStartedEvent,
NodeRunIterationSucceededEvent,
NodeRunSucceededEvent,
)
from core.workflow.nodes.iteration.iteration_node import IterationNode
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
def test_iteration_node_emits_iteration_events_when_iterator_empty():
init_params = GraphInitParams(
tenant_id="tenant",
app_id="app",
workflow_id="workflow",
graph_config={},
user_id="user",
user_from="account",
invoke_from="debugger",
call_depth=0,
)
runtime_state = GraphRuntimeState(
variable_pool=VariablePool(system_variables=SystemVariable.empty(), user_inputs={}),
start_at=0.0,
)
runtime_state.variable_pool.add(("start", "items"), [])
node = IterationNode(
id="iteration-node",
config={
"id": "iteration-node",
"data": {
"title": "Iteration",
"iterator_selector": ["start", "items"],
"output_selector": ["iteration-node", "output"],
},
},
graph_init_params=init_params,
graph_runtime_state=runtime_state,
)
events = list(node.run())
assert any(isinstance(event, NodeRunIterationStartedEvent) for event in events)
iteration_succeeded_event = next(event for event in events if isinstance(event, NodeRunIterationSucceededEvent))
assert iteration_succeeded_event.steps == 0
assert iteration_succeeded_event.outputs == {"output": []}
assert any(isinstance(event, NodeRunSucceededEvent) for event in events)