Compare commits

...

3 Commits

Author SHA1 Message Date
4fa49b27f8 [autofix.ci] apply automated fixes 2026-05-18 08:20:01 +00:00
19b334e5ca chore(api): update graphon filter revision
Point the temporary Graphon source pin at the latest langgenius/graphon#146 head after its event filter updates.
2026-05-18 16:16:54 +08:00
dce25a3909 fix(api): filter Graphon workflow response events
Apply Graphon ResponseStreamFilter to workflow-based app runners so workflow execution consumes filtered graph events in both streaming and blocking paths.

Pin Graphon to the PR revision while the change waits for the 0.5.0 release, and update unit coverage for the filtered event stream behavior.
2026-05-18 15:54:55 +08:00
13 changed files with 81 additions and 21 deletions

View File

@ -246,7 +246,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)
generator = workflow_entry.run()
generator = self._iter_workflow_events(
workflow_entry,
workflow_entry.run(),
stream=self.application_generate_entity.stream,
)
for event in generator:
self._handle_event(workflow_entry, event)

View File

@ -169,7 +169,11 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)
generator = workflow_entry.run()
generator = self._iter_workflow_events(
workflow_entry,
workflow_entry.run(),
stream=self.application_generate_entity.stream,
)
for event in generator:
self._handle_event(workflow_entry, event)

View File

@ -1,6 +1,6 @@
import logging
import time
from collections.abc import Mapping, Sequence
from collections.abc import Iterable, Mapping, Sequence
from typing import Any, cast
from pydantic import ValidationError
@ -51,6 +51,7 @@ from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_run_outputs import project_node_outputs_for_workflow_run
from graphon.entities.graph_config import NodeConfigDictAdapter
from graphon.entities.pause_reason import HumanInputRequired
from graphon.filters import GraphEventFilterContext, ResponseStreamFilter, filter_graph_events
from graphon.graph import Graph
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import (
@ -381,6 +382,21 @@ class WorkflowBasedAppRunner:
return graph, variable_pool
@staticmethod
def _iter_workflow_events(
workflow_entry: WorkflowEntry,
events: Iterable[GraphEngineEvent],
*,
stream: bool,
) -> Iterable[GraphEngineEvent]:
_ = stream
return filter_graph_events(
events,
context=GraphEventFilterContext.from_engine(workflow_entry.graph_engine),
filters=[ResponseStreamFilter()],
)
@staticmethod
def _build_agent_strategy_info(event: NodeRunStartedEvent) -> AgentStrategyInfo | None:
raw_agent_strategy = event.extras.get("agent_strategy")

View File

@ -97,6 +97,7 @@ dify-trace-mlflow = { workspace = true }
dify-trace-opik = { workspace = true }
dify-trace-tencent = { workspace = true }
dify-trace-weave = { workspace = true }
graphon = { git = "https://github.com/langgenius/graphon.git", rev = "a24cf1f227c4b91494779be40c754cb4107c60ed" }
[tool.uv]
default-groups = ["storage", "tools", "vdb-all", "trace-all"]

View File

@ -100,6 +100,7 @@ class TestAdvancedChatAppRunnerConversationVariables:
mock_app_generate_entity.single_iteration_run = None
mock_app_generate_entity.single_loop_run = None
mock_app_generate_entity.trace_manager = None
mock_app_generate_entity.stream = False
# Create runner
runner = AdvancedChatAppRunner(
@ -245,6 +246,7 @@ class TestAdvancedChatAppRunnerConversationVariables:
mock_app_generate_entity.single_iteration_run = None
mock_app_generate_entity.single_loop_run = None
mock_app_generate_entity.trace_manager = None
mock_app_generate_entity.stream = False
# Create runner
runner = AdvancedChatAppRunner(
@ -405,6 +407,7 @@ class TestAdvancedChatAppRunnerConversationVariables:
mock_app_generate_entity.single_iteration_run = None
mock_app_generate_entity.single_loop_run = None
mock_app_generate_entity.trace_manager = None
mock_app_generate_entity.stream = False
# Create runner
runner = AdvancedChatAppRunner(

View File

@ -64,6 +64,7 @@ def build_runner():
gen.single_iteration_run = None
gen.single_loop_run = None
gen.trace_manager = None
gen.stream = False
runner = AdvancedChatAppRunner(
application_generate_entity=gen,

View File

@ -234,6 +234,39 @@ class TestWorkflowBasedAppRunner:
assert graph is not None
assert variable_pool.get(["sys", "conversation_id"]).value == "conv-1"
@pytest.mark.parametrize("stream", [False, True])
def test_iter_workflow_events_filters_response_stream(self, stream: bool):
runner = WorkflowBasedAppRunner(queue_manager=SimpleNamespace(), app_id="app")
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.from_bootstrap(system_variables=default_system_variables()),
start_at=0.0,
)
workflow_entry = SimpleNamespace(
graph_engine=SimpleNamespace(
graph=SimpleNamespace(nodes={}),
graph_runtime_state=graph_runtime_state,
)
)
events = iter(
[
GraphRunStartedEvent(),
NodeRunStreamChunkEvent(
id="exec",
node_id="llm",
node_type=BuiltinNodeTypes.LLM,
selector=["llm", "text"],
chunk="raw",
is_final=False,
),
GraphRunSucceededEvent(outputs={"answer": "done"}),
]
)
filtered_events = list(runner._iter_workflow_events(workflow_entry, events, stream=stream))
assert [type(event) for event in filtered_events] == [GraphRunStartedEvent, GraphRunSucceededEvent]
def test_handle_graph_run_events_and_pause_notifications(self, monkeypatch: pytest.MonkeyPatch):
published: list[object] = []

View File

@ -53,6 +53,7 @@ def test_run_uses_single_node_execution_branch(
app_generate_entity.trace_manager = None
app_generate_entity.single_iteration_run = single_iteration_run
app_generate_entity.single_loop_run = single_loop_run
app_generate_entity.stream = False
workflow = MagicMock(spec=Workflow)
workflow.tenant_id = "tenant"

View File

@ -1,3 +1,4 @@
from graphon.filters import GraphEventFilterContext, ResponseStreamFilter, filter_graph_events
from graphon.graph_engine import GraphEngine, GraphEngineConfig
from graphon.graph_engine.command_channels import InMemoryChannel
from graphon.graph_events import (
@ -31,7 +32,13 @@ def test_tool_in_chatflow():
config=GraphEngineConfig(),
)
events = list(engine.run())
events = list(
filter_graph_events(
engine.run(),
context=GraphEventFilterContext.from_engine(engine),
filters=[ResponseStreamFilter()],
)
)
# Check for successful completion
success_events = [e for e in events if isinstance(e, GraphRunSucceededEvent)]

8
api/uv.lock generated
View File

@ -1628,7 +1628,7 @@ requires-dist = [
{ name = "gmpy2", specifier = ">=2.3.0" },
{ name = "google-api-python-client", specifier = ">=2.196.0" },
{ name = "google-cloud-aiplatform", specifier = ">=1.151.0,<2.0.0" },
{ name = "graphon", specifier = "~=0.4.0" },
{ name = "graphon", git = "https://github.com/langgenius/graphon.git?rev=a24cf1f227c4b91494779be40c754cb4107c60ed" },
{ name = "gunicorn", specifier = ">=26.0.0" },
{ name = "httpx", extras = ["socks"], specifier = ">=0.28.1,<1.0.0" },
{ name = "httpx-sse", specifier = "~=0.4.0" },
@ -2985,7 +2985,7 @@ httpx = [
[[package]]
name = "graphon"
version = "0.4.0"
source = { registry = "https://pypi.org/simple" }
source = { git = "https://github.com/langgenius/graphon.git?rev=a24cf1f227c4b91494779be40c754cb4107c60ed#a24cf1f227c4b91494779be40c754cb4107c60ed" }
dependencies = [
{ name = "charset-normalizer" },
{ name = "httpx" },
@ -3005,10 +3005,6 @@ dependencies = [
{ name = "unstructured", extra = ["docx", "epub", "md", "ppt", "pptx"] },
{ name = "webvtt-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/76/24/eb1e7983404dcac84816b76ea450e1bb97023e55e00c699d609340bc361e/graphon-0.4.0.tar.gz", hash = "sha256:afb0c7a58f89e09cfa585296429b4d08cd0df80b9ac54d550f88e7d76ec48ee0", size = 261812, upload-time = "2026-05-13T11:48:39.198Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/de/bad6b3fd1e4b4defc16e6ea106e55c44725a159f1d191a99877bce1c9931/graphon-0.4.0-py3-none-any.whl", hash = "sha256:b33f95886da823d5b1b53d663a4f5f8fa383c37740f3bd19297b8d140fcb804c", size = 372711, upload-time = "2026-05-13T11:48:37.712Z" },
]
[[package]]
name = "graphql-core"

View File

@ -8,14 +8,14 @@
Snapshot generated from `packages/contracts/generated/api/readiness.json` after running `pnpm -C packages/contracts gen-api-contract-from-openapi`.
Are we OpenAPI ready? **No.** Current generated API contracts are **16.6% ready**.
Are we OpenAPI ready? **No.** Current generated API contracts are **16.7% ready**.
| Surface | Ready | Not ready | Total | Ready % |
| --------- | ------: | --------: | ------: | --------: |
| console | 95 | 475 | 570 | 16.7% |
| console | 96 | 474 | 570 | 16.8% |
| service | 16 | 72 | 88 | 18.2% |
| web | 5 | 36 | 41 | 12.2% |
| **total** | **116** | **583** | **699** | **16.6%** |
| **total** | **117** | **582** | **699** | **16.7%** |
Readiness here means the generated contract operation is not marked with:

View File

@ -426,16 +426,10 @@ export const imports = {
/**
* Get workflow online users
*
* Generated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.
*
* @deprecated
*/
export const post3 = oc
.route({
deprecated: true,
description:
'Get workflow online users\n\nGenerated contract types may be inaccurate because backend OpenAPI annotations are incomplete. Do not migrate callers until the generated contract is accurate.',
description: 'Get workflow online users',
inputStructure: 'detailed',
method: 'POST',
operationId: 'postAppsWorkflowsOnlineUsers',

View File

@ -1,7 +1,7 @@
{
"surfaces": {
"console": {
"notReady": 475,
"notReady": 474,
"total": 570
},
"service": {