Compare commits

..

2 Commits

Author SHA1 Message Date
1730f900c1 fix: Add dataset_id filters to the hit_count's subqueries (#33757) 2026-03-20 10:21:45 +08:00
12178e7aec fix(api): add trigger_info to WorkflowNodeExecutionMetadataKey (#33753)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-03-19 17:58:45 +08:00
7 changed files with 105 additions and 24 deletions

View File

@ -6,6 +6,7 @@ from contextlib import ExitStack
from typing import Any, Literal, cast
from uuid import UUID
import sqlalchemy as sa
from flask import request, send_file
from flask_restx import Resource, fields, marshal, marshal_with
from pydantic import BaseModel, Field
@ -294,23 +295,15 @@ class DatasetDocumentListApi(Resource):
sort_logic = asc
if sort == "hit_count":
# sub_query = (
# sa.select(DocumentSegment.document_id,
# sa.func.sum(DocumentSegment.hit_count).label("total_hit_count"))
# .group_by(DocumentSegment.document_id)
# .subquery()
# )
sub_query = (
sa.select(DocumentSegment.document_id, sa.func.sum(DocumentSegment.hit_count).label("total_hit_count"))
.where(DocumentSegment.dataset_id == str(dataset_id))
.group_by(DocumentSegment.document_id)
.subquery()
)
# query = query.outerjoin(sub_query, sub_query.c.document_id == Document.id).order_by(
# sort_logic(sa.func.coalesce(sub_query.c.total_hit_count, 0)),
# sort_logic(Document.position),
# )
# TODO: uncomment this when we have a way to get the hit count,
# currently we don't have a way to get the hit count,
# so we use the created_at to sort the documents.
query = query.order_by(
sort_logic(Document.created_at),
query = query.outerjoin(sub_query, sub_query.c.document_id == Document.id).order_by(
sort_logic(sa.func.coalesce(sub_query.c.total_hit_count, 0)),
sort_logic(Document.position),
)
elif sort == "created_at":

View File

@ -3,7 +3,6 @@ from typing import Final
TRIGGER_WEBHOOK_NODE_TYPE: Final[str] = "trigger-webhook"
TRIGGER_SCHEDULE_NODE_TYPE: Final[str] = "trigger-schedule"
TRIGGER_PLUGIN_NODE_TYPE: Final[str] = "trigger-plugin"
TRIGGER_INFO_METADATA_KEY: Final[str] = "trigger_info"
TRIGGER_NODE_TYPES: Final[frozenset[str]] = frozenset(
{

View File

@ -1,7 +1,7 @@
from collections.abc import Mapping
from typing import Any, cast
from typing import Any
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, WorkflowNodeExecutionMetadataKey
@ -47,7 +47,7 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
# Get trigger data passed when workflow was triggered
metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
cast(WorkflowNodeExecutionMetadataKey, TRIGGER_INFO_METADATA_KEY): {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
"provider_id": self.node_data.provider_id,
"event_name": self.node_data.event_name,
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,

View File

@ -245,6 +245,9 @@ _END_STATE = frozenset(
class WorkflowNodeExecutionMetadataKey(StrEnum):
"""
Node Run Metadata Key.
Values in this enum are persisted as execution metadata and must stay in sync
with every node that writes `NodeRunResult.metadata`.
"""
TOTAL_TOKENS = "total_tokens"
@ -266,6 +269,7 @@ class WorkflowNodeExecutionMetadataKey(StrEnum):
ERROR_STRATEGY = "error_strategy" # node in continue on error mode return the field
LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output
DATASOURCE_INFO = "datasource_info"
TRIGGER_INFO = "trigger_info"
COMPLETED_REASON = "completed_reason" # completed reason for loop node

View File

@ -22,14 +22,14 @@ from sqlalchemy import (
from sqlalchemy.orm import Mapped, mapped_column
from typing_extensions import deprecated
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import (
CONVERSATION_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
)
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
from dify_graph.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType, SchedulingPause
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowExecutionStatus
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey
from dify_graph.file.constants import maybe_file_object
from dify_graph.file.models import File
from dify_graph.variables import utils as variable_utils
@ -936,8 +936,11 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
elif self.node_type == BuiltinNodeTypes.DATASOURCE and "datasource_info" in execution_metadata:
datasource_info = execution_metadata["datasource_info"]
extras["icon"] = datasource_info.get("icon")
elif self.node_type == TRIGGER_PLUGIN_NODE_TYPE and TRIGGER_INFO_METADATA_KEY in execution_metadata:
trigger_info = execution_metadata[TRIGGER_INFO_METADATA_KEY] or {}
elif (
self.node_type == TRIGGER_PLUGIN_NODE_TYPE
and WorkflowNodeExecutionMetadataKey.TRIGGER_INFO in execution_metadata
):
trigger_info = execution_metadata[WorkflowNodeExecutionMetadataKey.TRIGGER_INFO] or {}
provider_id = trigger_info.get("provider_id")
if provider_id:
extras["icon"] = TriggerManager.get_trigger_plugin_icon(

View File

@ -0,0 +1,63 @@
from collections.abc import Mapping
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.workflow.nodes.trigger_plugin.trigger_event_node import TriggerEventNode
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from tests.workflow_test_utils import build_test_graph_init_params
def _build_context(graph_config: Mapping[str, object]) -> tuple[GraphInitParams, GraphRuntimeState]:
init_params = build_test_graph_init_params(
graph_config=graph_config,
user_from="account",
invoke_from="debugger",
)
runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable(user_id="user", files=[]),
user_inputs={"payload": "value"},
),
start_at=0.0,
)
return init_params, runtime_state
def _build_node_config() -> NodeConfigDict:
return NodeConfigDictAdapter.validate_python(
{
"id": "node-1",
"data": {
"type": TRIGGER_PLUGIN_NODE_TYPE,
"title": "Trigger Event",
"plugin_id": "plugin-id",
"provider_id": "provider-id",
"event_name": "event-name",
"subscription_id": "subscription-id",
"plugin_unique_identifier": "plugin-unique-identifier",
"event_parameters": {},
},
}
)
def test_trigger_event_node_run_populates_trigger_info_metadata() -> None:
init_params, runtime_state = _build_context(graph_config={})
node = TriggerEventNode(
id="node-1",
config=_build_node_config(),
graph_init_params=init_params,
graph_runtime_state=runtime_state,
)
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.metadata[WorkflowNodeExecutionMetadataKey.TRIGGER_INFO] == {
"provider_id": "provider-id",
"event_name": "event-name",
"plugin_unique_identifier": "plugin-unique-identifier",
}

View File

@ -0,0 +1,19 @@
from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.node_events.base import NodeRunResult
def test_node_run_result_accepts_trigger_info_metadata() -> None:
result = NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
metadata={
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
"provider_id": "provider-id",
"event_name": "event-name",
}
},
)
assert result.metadata[WorkflowNodeExecutionMetadataKey.TRIGGER_INFO] == {
"provider_id": "provider-id",
"event_name": "event-name",
}