Merge branch 'feat/summary-index' into deploy/dev

This commit is contained in:
zxhlyh
2026-01-28 09:50:05 +08:00
301 changed files with 40720 additions and 8308 deletions

View File

@ -21,6 +21,7 @@ from core.app.entities.queue_entities import (
)
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
from core.app.layers.conversation_variable_persist_layer import ConversationVariablePersistenceLayer
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.db.session_factory import session_factory
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
@ -28,7 +29,6 @@ from core.variables.variables import Variable
from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool

View File

@ -9,12 +9,12 @@ from core.app.entities.app_invoke_entities import (
InvokeFrom,
RagPipelineGenerateEntity,
)
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.app.workflow.node_factory import DifyNodeFactory
from core.variables.variables import RAGPipelineVariable, RAGPipelineVariableInput
from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.enums import WorkflowType
from core.workflow.graph import Graph
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.graph_events import GraphEngineEvent, GraphRunFailedEvent
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository

View File

@ -7,10 +7,10 @@ from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfig
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.enums import WorkflowType
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.runtime import GraphRuntimeState, VariablePool

View File

@ -157,7 +157,7 @@ class WorkflowBasedAppRunner:
# Create initial runtime state with variable pool containing environment variables
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
environment_variables=workflow.environment_variables,
),
@ -272,7 +272,9 @@ class WorkflowBasedAppRunner:
)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph:
raise ValueError("graph not found in workflow")

View File

@ -0,0 +1,10 @@
"""Workflow-level GraphEngine layers that depend on outer infrastructure."""
from .observability import ObservabilityLayer
from .persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
__all__ = [
"ObservabilityLayer",
"PersistenceWorkflowInfo",
"WorkflowPersistenceLayer",
]

View File

@ -18,12 +18,15 @@ from typing_extensions import override
from configs import dify_config
from core.workflow.enums import NodeType
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_engine.layers.node_parsers import (
from core.workflow.graph_events import GraphNodeEventBase
from core.workflow.nodes.base.node import Node
from extensions.otel.parser import (
DefaultNodeOTelParser,
LLMNodeOTelParser,
NodeOTelParser,
RetrievalNodeOTelParser,
ToolNodeOTelParser,
)
from core.workflow.nodes.base.node import Node
from extensions.otel.runtime import is_instrument_flag_enabled
logger = logging.getLogger(__name__)
@ -72,6 +75,8 @@ class ObservabilityLayer(GraphEngineLayer):
"""Initialize parser registry for node types."""
self._parsers = {
NodeType.TOOL: ToolNodeOTelParser(),
NodeType.LLM: LLMNodeOTelParser(),
NodeType.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
}
def _get_parser(self, node: Node) -> NodeOTelParser:
@ -119,7 +124,9 @@ class ObservabilityLayer(GraphEngineLayer):
logger.warning("Failed to create OpenTelemetry span for node %s: %s", node.id, e)
@override
def on_node_run_end(self, node: Node, error: Exception | None) -> None:
def on_node_run_end(
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
) -> None:
"""
Called when a node finishes execution.
@ -139,7 +146,7 @@ class ObservabilityLayer(GraphEngineLayer):
span = node_context.span
parser = self._get_parser(node)
try:
parser.parse(node=node, span=span, error=error)
parser.parse(node=node, span=span, error=error, result_event=result_event)
span.end()
finally:
token = node_context.token

View File

@ -45,7 +45,6 @@ from core.workflow.graph_events import (
from core.workflow.node_events import NodeRunResult
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_entry import WorkflowEntry
from libs.datetime_utils import naive_utc_now
@ -316,6 +315,9 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
# workflow inputs stay reusable without binding future runs to this conversation.
continue
inputs[f"sys.{field_name}"] = value
# Local import to avoid circular dependency during app bootstrapping.
from core.workflow.workflow_entry import WorkflowEntry
handled = WorkflowEntry.handle_special_values(inputs)
return handled or {}

View File

@ -1,4 +1,4 @@
from collections.abc import Generator, Mapping
from collections.abc import Generator
from typing import Any
from core.datasource.__base.datasource_plugin import DatasourcePlugin
@ -34,7 +34,7 @@ class OnlineDocumentDatasourcePlugin(DatasourcePlugin):
def get_online_document_pages(
self,
user_id: str,
datasource_parameters: Mapping[str, Any],
datasource_parameters: dict[str, Any],
provider_type: str,
) -> Generator[OnlineDocumentPagesMessage, None, None]:
manager = PluginDatasourceManager()

View File

@ -288,6 +288,7 @@ class Graph:
graph_config: Mapping[str, object],
node_factory: NodeFactory,
root_node_id: str | None = None,
skip_validation: bool = False,
) -> Graph:
"""
Initialize graph
@ -339,8 +340,9 @@ class Graph:
root_node=root_node,
)
# Validate the graph structure using built-in validators
get_graph_validator().validate(graph)
if not skip_validation:
# Validate the graph structure using built-in validators
get_graph_validator().validate(graph)
return graph

View File

@ -8,11 +8,9 @@ with middleware-like components that can observe events and interact with execut
from .base import GraphEngineLayer
from .debug_logging import DebugLoggingLayer
from .execution_limits import ExecutionLimitsLayer
from .observability import ObservabilityLayer
__all__ = [
"DebugLoggingLayer",
"ExecutionLimitsLayer",
"GraphEngineLayer",
"ObservabilityLayer",
]

View File

@ -8,7 +8,7 @@ intercept and respond to GraphEngine events.
from abc import ABC, abstractmethod
from core.workflow.graph_engine.protocols.command_channel import CommandChannel
from core.workflow.graph_events import GraphEngineEvent
from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase
from core.workflow.nodes.base.node import Node
from core.workflow.runtime import ReadOnlyGraphRuntimeState
@ -98,7 +98,7 @@ class GraphEngineLayer(ABC):
"""
pass
def on_node_run_start(self, node: Node) -> None: # noqa: B027
def on_node_run_start(self, node: Node) -> None:
"""
Called immediately before a node begins execution.
@ -109,9 +109,11 @@ class GraphEngineLayer(ABC):
Args:
node: The node instance about to be executed
"""
pass
return
def on_node_run_end(self, node: Node, error: Exception | None) -> None: # noqa: B027
def on_node_run_end(
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
) -> None:
"""
Called after a node finishes execution.
@ -121,5 +123,6 @@ class GraphEngineLayer(ABC):
Args:
node: The node instance that just finished execution
error: Exception instance if the node failed, otherwise None
result_event: The final result event from node execution (succeeded/failed/paused), if any
"""
pass
return

View File

@ -1,61 +0,0 @@
"""
Node-level OpenTelemetry parser interfaces and defaults.
"""
import json
from typing import Protocol
from opentelemetry.trace import Span
from opentelemetry.trace.status import Status, StatusCode
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.tool.entities import ToolNodeData
class NodeOTelParser(Protocol):
"""Parser interface for node-specific OpenTelemetry enrichment."""
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None: ...
class DefaultNodeOTelParser:
"""Fallback parser used when no node-specific parser is registered."""
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None:
span.set_attribute("node.id", node.id)
if node.execution_id:
span.set_attribute("node.execution_id", node.execution_id)
if hasattr(node, "node_type") and node.node_type:
span.set_attribute("node.type", node.node_type.value)
if error:
span.record_exception(error)
span.set_status(Status(StatusCode.ERROR, str(error)))
else:
span.set_status(Status(StatusCode.OK))
class ToolNodeOTelParser:
"""Parser for tool nodes that captures tool-specific metadata."""
def __init__(self) -> None:
self._delegate = DefaultNodeOTelParser()
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None:
self._delegate.parse(node=node, span=span, error=error)
tool_data = getattr(node, "_node_data", None)
if not isinstance(tool_data, ToolNodeData):
return
span.set_attribute("tool.provider.id", tool_data.provider_id)
span.set_attribute("tool.provider.type", tool_data.provider_type.value)
span.set_attribute("tool.provider.name", tool_data.provider_name)
span.set_attribute("tool.name", tool_data.tool_name)
span.set_attribute("tool.label", tool_data.tool_label)
if tool_data.plugin_unique_identifier:
span.set_attribute("tool.plugin.id", tool_data.plugin_unique_identifier)
if tool_data.credential_id:
span.set_attribute("tool.credential.id", tool_data.credential_id)
if tool_data.tool_configurations:
span.set_attribute("tool.config", json.dumps(tool_data.tool_configurations, ensure_ascii=False))

View File

@ -17,7 +17,7 @@ from typing_extensions import override
from core.workflow.context import IExecutionContext
from core.workflow.graph import Graph
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, is_node_result_event
from core.workflow.nodes.base.node import Node
from .ready_queue import ReadyQueue
@ -131,6 +131,7 @@ class Worker(threading.Thread):
node.ensure_execution_id()
error: Exception | None = None
result_event: GraphNodeEventBase | None = None
# Execute the node with preserved context if execution context is provided
if self._execution_context is not None:
@ -140,22 +141,26 @@ class Worker(threading.Thread):
node_events = node.run()
for event in node_events:
self._event_queue.put(event)
if is_node_result_event(event):
result_event = event
except Exception as exc:
error = exc
raise
finally:
self._invoke_node_run_end_hooks(node, error)
self._invoke_node_run_end_hooks(node, error, result_event)
else:
self._invoke_node_run_start_hooks(node)
try:
node_events = node.run()
for event in node_events:
self._event_queue.put(event)
if is_node_result_event(event):
result_event = event
except Exception as exc:
error = exc
raise
finally:
self._invoke_node_run_end_hooks(node, error)
self._invoke_node_run_end_hooks(node, error, result_event)
def _invoke_node_run_start_hooks(self, node: Node) -> None:
"""Invoke on_node_run_start hooks for all layers."""
@ -166,11 +171,13 @@ class Worker(threading.Thread):
# Silently ignore layer errors to prevent disrupting node execution
continue
def _invoke_node_run_end_hooks(self, node: Node, error: Exception | None) -> None:
def _invoke_node_run_end_hooks(
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
) -> None:
"""Invoke on_node_run_end hooks for all layers."""
for layer in self._layers:
try:
layer.on_node_run_end(node, error)
layer.on_node_run_end(node, error, result_event)
except Exception:
# Silently ignore layer errors to prevent disrupting node execution
continue

View File

@ -44,6 +44,7 @@ from .node import (
NodeRunStartedEvent,
NodeRunStreamChunkEvent,
NodeRunSucceededEvent,
is_node_result_event,
)
__all__ = [
@ -73,4 +74,5 @@ __all__ = [
"NodeRunStartedEvent",
"NodeRunStreamChunkEvent",
"NodeRunSucceededEvent",
"is_node_result_event",
]

View File

@ -56,3 +56,26 @@ class NodeRunRetryEvent(NodeRunStartedEvent):
class NodeRunPauseRequestedEvent(GraphNodeEventBase):
reason: PauseReason = Field(..., description="pause reason")
def is_node_result_event(event: GraphNodeEventBase) -> bool:
"""
Check if an event is a final result event from node execution.
A result event indicates the completion of a node execution and contains
runtime information such as inputs, outputs, or error details.
Args:
event: The event to check
Returns:
True if the event is a node result event (succeeded/failed/paused), False otherwise
"""
return isinstance(
event,
(
NodeRunSucceededEvent,
NodeRunFailedEvent,
NodeRunPauseRequestedEvent,
),
)

View File

@ -44,7 +44,7 @@ class VariablePool(BaseModel):
)
system_variables: SystemVariable = Field(
description="System variables",
default_factory=SystemVariable.empty,
default_factory=SystemVariable.default,
)
environment_variables: Sequence[Variable] = Field(
description="Environment variables.",
@ -271,4 +271,4 @@ class VariablePool(BaseModel):
@classmethod
def empty(cls) -> VariablePool:
"""Create an empty variable pool."""
return cls(system_variables=SystemVariable.empty())
return cls(system_variables=SystemVariable.default())

View File

@ -3,6 +3,7 @@ from __future__ import annotations
from collections.abc import Mapping, Sequence
from types import MappingProxyType
from typing import Any
from uuid import uuid4
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator
@ -72,8 +73,8 @@ class SystemVariable(BaseModel):
return data
@classmethod
def empty(cls) -> SystemVariable:
return cls()
def default(cls) -> SystemVariable:
return cls(workflow_execution_id=str(uuid4()))
def to_dict(self) -> dict[SystemVariableKey, Any]:
# NOTE: This method is provided for compatibility with legacy code.

View File

@ -7,6 +7,7 @@ from typing import Any
from configs import dify_config
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.workflow.layers.observability import ObservabilityLayer
from core.app.workflow.node_factory import DifyNodeFactory
from core.file.models import File
from core.workflow.constants import ENVIRONMENT_VARIABLE_NODE_ID
@ -15,7 +16,7 @@ from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.graph import Graph
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLayer, ObservabilityLayer
from core.workflow.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLayer
from core.workflow.graph_engine.protocols.command_channel import CommandChannel
from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
from core.workflow.nodes import NodeType
@ -276,7 +277,7 @@ class WorkflowEntry:
# init variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
system_variables=SystemVariable.default(),
user_inputs={},
environment_variables=[],
)