Change the is_resumption field in WorkflowStarted event into reason (vibe-kanban 19ac040e)

Reason should be an enumeration with only one member `resumption` currently.

Please update these part of events:

- Graph / Engine Event (GraphRunStartedEvent)
- Queue event (QueueWorkflowStartedEvent)
- SSE response event (WorkflowStartStreamResponse)

Besides, you should remove the `is_resumption` flag for `node_started` events; including:

- Queue Event (`QueueNodeStartedEvent`)
- SSE Event (`NodeStartStreamResponse`)
- Node event (`NodeRunStartedEvent`)

After finishing the changes above, adjust related tests.
You should run the affected tests and ensure they can pass. (You should use `uv run pytest` to run tests)
This commit is contained in:
QuantumGhost
2026-01-18 21:00:25 +08:00
parent 7bc7a8d0ab
commit afdf2397f2
21 changed files with 59 additions and 227 deletions

View File

@ -3,6 +3,7 @@ from types import SimpleNamespace
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueHumanInputFormFilledEvent
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
@ -39,7 +40,7 @@ def test_human_input_form_filled_stream_response_contains_rendered_content():
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
queue_event = QueueHumanInputFormFilledEvent(

View File

@ -1,14 +1,8 @@
from dataclasses import dataclass
from types import SimpleNamespace
import pytest
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueNodeStartedEvent
from core.app.entities.task_entities import NodeStartStreamResponse
from core.workflow.entities import AgentNodeStrategyInit
from core.workflow.enums import NodeType
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.workflow.system_variable import SystemVariable
@ -40,116 +34,23 @@ def _build_converter() -> WorkflowResponseConverter:
)
def test_node_start_stream_response_carries_resumption_flag():
converter = _build_converter()
# Seed workflow run id for converter
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
)
queue_event = QueueNodeStartedEvent(
node_execution_id="exec-1",
node_id="node-1",
node_title="Title",
node_type=NodeType.CODE,
start_at=converter._workflow_started_at, # type: ignore[attr-defined]
agent_strategy=AgentNodeStrategyInit(name="test"),
provider_type="",
provider_id="",
is_resumption=True,
)
resp = converter.workflow_node_start_to_stream_response(event=queue_event, task_id="task-1")
assert isinstance(resp, NodeStartStreamResponse)
assert resp.data.is_resumption is True
def test_node_start_stream_response_defaults_to_false():
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
)
queue_event = QueueNodeStartedEvent(
node_execution_id="exec-2",
node_id="node-2",
node_title="Title",
node_type=NodeType.CODE,
start_at=converter._workflow_started_at, # type: ignore[attr-defined]
agent_strategy=None,
provider_type="",
provider_id="",
)
resp = converter.workflow_node_start_to_stream_response(event=queue_event, task_id="task-1")
assert isinstance(resp, NodeStartStreamResponse)
assert resp.data.is_resumption is False
def test_workflow_start_stream_response_carries_resumption_flag():
def test_workflow_start_stream_response_carries_resumption_reason():
converter = _build_converter()
resp = converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=True,
reason=WorkflowStartReason.RESUMPTION,
)
assert resp.data.is_resumption is True
assert resp.data.reason is WorkflowStartReason.RESUMPTION
def test_workflow_start_stream_response_defaults_to_false():
def test_workflow_start_stream_response_carries_initial_reason():
converter = _build_converter()
resp = converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
assert resp.data.is_resumption is False
@dataclass(frozen=True)
class _IgnoreDetailCase:
execution_id: str
node_id: str
is_resumption: bool
@pytest.mark.parametrize(
"case",
[
_IgnoreDetailCase(execution_id="exec-1", node_id="node-1", is_resumption=True),
_IgnoreDetailCase(execution_id="exec-2", node_id="node-2", is_resumption=False),
],
)
def test_node_start_ignore_detail_includes_resumption_flag(case: _IgnoreDetailCase) -> None:
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
)
queue_event = QueueNodeStartedEvent(
node_execution_id=case.execution_id,
node_id=case.node_id,
node_title="Title",
node_type=NodeType.CODE,
start_at=converter._workflow_started_at, # type: ignore[attr-defined]
agent_strategy=None,
provider_type="",
provider_id="",
is_resumption=case.is_resumption,
)
resp = converter.workflow_node_start_to_stream_response(event=queue_event, task_id="task-1")
assert isinstance(resp, NodeStartStreamResponse)
ignore_detail = resp.to_ignore_detail_dict()
assert ignore_detail["data"]["is_resumption"] is case.is_resumption
assert resp.data.reason is WorkflowStartReason.INITIAL

View File

@ -23,6 +23,7 @@ from core.app.entities.queue_entities import (
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
)
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.enums import NodeType
from core.workflow.system_variable import SystemVariable
from libs.datetime_utils import naive_utc_now
@ -128,7 +129,7 @@ class TestWorkflowResponseConverter:
task_id="bootstrap",
workflow_run_id="run-id",
workflow_id="wf-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
start_event = self.create_node_started_event()
converter.workflow_node_start_to_stream_response(
@ -169,7 +170,7 @@ class TestWorkflowResponseConverter:
task_id="bootstrap",
workflow_run_id="run-id",
workflow_id="wf-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
start_event = self.create_node_started_event()
converter.workflow_node_start_to_stream_response(
@ -205,7 +206,7 @@ class TestWorkflowResponseConverter:
task_id="bootstrap",
workflow_run_id="run-id",
workflow_id="wf-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
start_event = self.create_node_started_event()
converter.workflow_node_start_to_stream_response(
@ -244,7 +245,7 @@ class TestWorkflowResponseConverter:
task_id="bootstrap",
workflow_run_id="run-id",
workflow_id="wf-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
start_event = self.create_node_started_event()
converter.workflow_node_start_to_stream_response(
@ -285,7 +286,7 @@ class TestWorkflowResponseConverter:
task_id="bootstrap",
workflow_run_id="run-id",
workflow_id="wf-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
start_event = self.create_node_started_event()
converter.workflow_node_start_to_stream_response(
@ -425,7 +426,7 @@ class TestWorkflowResponseConverterServiceApiTruncation:
task_id="test-task-id",
workflow_run_id="test-workflow-run-id",
workflow_id="test-workflow-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
return converter

View File

@ -7,6 +7,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueWorkflowPausedEvent
from core.app.entities.task_entities import HumanInputRequiredResponse, WorkflowPauseStreamResponse
from core.workflow.entities.pause_reason import HumanInputRequired
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.graph_events.graph import GraphRunPausedEvent
from core.workflow.nodes.human_input.entities import FormInput, UserAction
from core.workflow.nodes.human_input.enums import FormInputType
@ -116,7 +117,7 @@ def test_queue_workflow_paused_event_to_stream_responses():
task_id="task",
workflow_run_id="run-id",
workflow_id="workflow-id",
is_resumption=False,
reason=WorkflowStartReason.INITIAL,
)
reason = HumanInputRequired(

View File

@ -2,10 +2,6 @@
from __future__ import annotations
from dataclasses import dataclass
import pytest
from core.workflow.enums import NodeExecutionType, NodeState, NodeType, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_engine.domain.graph_execution import GraphExecution
@ -121,72 +117,3 @@ def test_retry_does_not_emit_additional_start_event() -> None:
node_execution = graph_execution.get_or_create_node_execution(node_id)
assert node_execution.retry_count == 1
@dataclass(frozen=True)
class _ResumptionFlagCase:
node_id: str
execution_id: str
node_title: str
is_resumption: bool
@pytest.mark.parametrize(
"case",
[
_ResumptionFlagCase(
node_id="resumed-node",
execution_id="exec-1",
node_title="Resumed Node",
is_resumption=True,
),
_ResumptionFlagCase(
node_id="fresh-node",
execution_id="exec-2",
node_title="Fresh Node",
is_resumption=False,
),
],
)
def test_node_start_preserves_resumption_flag(case: _ResumptionFlagCase) -> None:
"""Ensure NodeRunStartedEvent preserves resumption flag."""
handler, event_manager, _ = _build_event_handler(case.node_id)
start_event = NodeRunStartedEvent(
id=case.execution_id,
node_id=case.node_id,
node_type=NodeType.CODE,
node_title=case.node_title,
start_at=naive_utc_now(),
is_resumption=case.is_resumption,
)
handler.dispatch(start_event)
collected = event_manager._events # type: ignore[attr-defined]
assert len(collected) == 1
emitted_event = collected[0]
assert isinstance(emitted_event, NodeRunStartedEvent)
assert emitted_event.is_resumption is case.is_resumption
def test_node_start_marks_fresh_run_as_not_resumption() -> None:
"""Ensure fresh NodeRunStartedEvent carries is_resumption=False."""
node_id = "fresh-node"
handler, event_manager, _ = _build_event_handler(node_id)
start_event = NodeRunStartedEvent(
id="exec-2",
node_id=node_id,
node_type=NodeType.CODE,
node_title="Fresh Node",
start_at=naive_utc_now(),
)
handler.dispatch(start_event)
collected = event_manager._events # type: ignore[attr-defined]
assert len(collected) == 1
emitted_event = collected[0]
assert isinstance(emitted_event, NodeRunStartedEvent)
assert emitted_event.is_resumption is False

View File

@ -7,6 +7,7 @@ from typing import Any
from core.model_runtime.entities.llm_entities import LLMMode
from core.model_runtime.entities.message_entities import PromptMessageRole
from core.workflow.entities import GraphInitParams
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.graph import Graph
from core.workflow.graph_engine.command_channels.in_memory_channel import InMemoryChannel
from core.workflow.graph_engine.graph_engine import GraphEngine
@ -313,7 +314,7 @@ def test_parallel_human_input_pause_preserves_node_finished_after_snapshot_resum
events = list(engine.run())
start_event = next(e for e in events if isinstance(e, GraphRunStartedEvent))
assert start_event.is_resumption is True
assert start_event.reason is WorkflowStartReason.RESUMPTION
llm_started = any(isinstance(e, NodeRunStartedEvent) and e.node_id == "llm_a" for e in events)
llm_succeeded = any(isinstance(e, NodeRunSucceededEvent) and e.node_id == "llm_a" for e in events)

View File

@ -7,6 +7,7 @@ from typing import Any
from core.model_runtime.entities.llm_entities import LLMMode
from core.model_runtime.entities.message_entities import PromptMessageRole
from core.workflow.entities import GraphInitParams
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.graph import Graph
from core.workflow.graph_engine.command_channels.in_memory_channel import InMemoryChannel
from core.workflow.graph_engine.graph_engine import GraphEngine
@ -294,9 +295,8 @@ def test_pause_defers_ready_nodes_until_resume() -> None:
resumed_events = list(resumed_engine.run())
start_event = next(e for e in resumed_events if isinstance(e, GraphRunStartedEvent))
assert start_event.is_resumption is True
assert start_event.reason is WorkflowStartReason.RESUMPTION
llm_b_started = _get_node_started_event(resumed_events, "llm_b")
assert llm_b_started is not None
assert llm_b_started.is_resumption is False
assert any(isinstance(e, NodeRunSucceededEvent) and e.node_id == "llm_b" for e in resumed_events)

View File

@ -4,6 +4,7 @@ from typing import Any
from unittest.mock import MagicMock
from core.workflow.entities import GraphInitParams
from core.workflow.entities.workflow_start_reason import WorkflowStartReason
from core.workflow.graph import Graph
from core.workflow.graph_engine.command_channels.in_memory_channel import InMemoryChannel
from core.workflow.graph_engine.graph_engine import GraphEngine
@ -172,7 +173,7 @@ def test_engine_resume_restores_state_and_completion():
assert baseline_events
first_paused_event = baseline_events[0]
assert isinstance(first_paused_event, GraphRunStartedEvent)
assert first_paused_event.is_resumption is False
assert first_paused_event.reason is WorkflowStartReason.INITIAL
assert isinstance(baseline_events[-1], GraphRunSucceededEvent)
baseline_success_nodes = _node_successes(baseline_events)
@ -184,7 +185,7 @@ def test_engine_resume_restores_state_and_completion():
assert paused_events
first_paused_event = paused_events[0]
assert isinstance(first_paused_event, GraphRunStartedEvent)
assert first_paused_event.is_resumption is False
assert first_paused_event.reason is WorkflowStartReason.INITIAL
assert isinstance(paused_events[-1], GraphRunPausedEvent)
snapshot = paused_state.dumps()
@ -196,7 +197,7 @@ def test_engine_resume_restores_state_and_completion():
assert resumed_events
first_resumed_event = resumed_events[0]
assert isinstance(first_resumed_event, GraphRunStartedEvent)
assert first_resumed_event.is_resumption is True
assert first_resumed_event.reason is WorkflowStartReason.RESUMPTION
assert isinstance(resumed_events[-1], GraphRunSucceededEvent)
combined_success_nodes = _node_successes(paused_events) + _node_successes(resumed_events)
@ -207,8 +208,6 @@ def test_engine_resume_restores_state_and_completion():
assert paused_human_started is not None
assert resumed_human_started is not None
assert paused_human_started.id == resumed_human_started.id
assert paused_human_started.is_resumption is False
assert resumed_human_started.is_resumption is True
assert baseline_state.outputs == resumed_state.outputs
assert _segment_value(baseline_state.variable_pool, ("human", "__action_id")) == _segment_value(

View File

@ -149,10 +149,7 @@ class TestUserAction:
UserAction(**data)
errors = exc_info.value.errors()
assert any(
error["loc"] == (field_name,) and error["type"] == "string_too_long"
for error in errors
)
assert any(error["loc"] == (field_name,) and error["type"] == "string_too_long" for error in errors)
class TestHumanInputNodeData: