fix(api): ensure is_resumption is properly propagated to SSE events

While running workflow / chatflow from "Installed Apps" / "Web App"
pages, the `node_started` SSE event is manually serialized from the
pydantic model. This causes the lack of `is_resumption` flag in SSE
events.

This PR addresses the problem by adding a `is_resumption` field to
the serialized dict.
This commit is contained in:
QuantumGhost
2026-01-14 10:26:57 +08:00
parent 8e0e5d2974
commit 552b65e36b
2 changed files with 45 additions and 0 deletions

View File

@ -349,6 +349,7 @@ class NodeStartStreamResponse(StreamResponse):
"extras": {},
"iteration_id": self.data.iteration_id,
"loop_id": self.data.loop_id,
"is_resumption": self.data.is_resumption,
},
}

View File

@ -1,5 +1,8 @@
from dataclasses import dataclass
from types import SimpleNamespace
import pytest
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueNodeStartedEvent
@ -109,3 +112,44 @@ def test_workflow_start_stream_response_defaults_to_false():
is_resumption=False,
)
assert resp.data.is_resumption is False
@dataclass(frozen=True)
class _IgnoreDetailCase:
execution_id: str
node_id: str
is_resumption: bool
@pytest.mark.parametrize(
"case",
[
_IgnoreDetailCase(execution_id="exec-1", node_id="node-1", is_resumption=True),
_IgnoreDetailCase(execution_id="exec-2", node_id="node-2", is_resumption=False),
],
)
def test_node_start_ignore_detail_includes_resumption_flag(case: _IgnoreDetailCase) -> None:
converter = _build_converter()
converter.workflow_start_to_stream_response(
task_id="task-1",
workflow_run_id="run-1",
workflow_id="wf-1",
is_resumption=False,
)
queue_event = QueueNodeStartedEvent(
node_execution_id=case.execution_id,
node_id=case.node_id,
node_title="Title",
node_type=NodeType.CODE,
start_at=converter._workflow_started_at, # type: ignore[attr-defined]
agent_strategy=None,
provider_type="",
provider_id="",
is_resumption=case.is_resumption,
)
resp = converter.workflow_node_start_to_stream_response(event=queue_event, task_id="task-1")
assert isinstance(resp, NodeStartStreamResponse)
ignore_detail = resp.to_ignore_detail_dict()
assert ignore_detail["data"]["is_resumption"] is case.is_resumption