mirror of
https://github.com/langgenius/dify.git
synced 2026-05-02 16:38:04 +08:00
Merge remote-tracking branch 'origin/main' into feat/support-agent-sandbox
# Conflicts: # api/core/workflow/graph_events/__init__.py
This commit is contained in:
@ -8,7 +8,7 @@ intercept and respond to GraphEngine events.
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from core.workflow.graph_engine.protocols.command_channel import CommandChannel
|
||||
from core.workflow.graph_events import GraphEngineEvent
|
||||
from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.runtime import ReadOnlyGraphRuntimeState
|
||||
|
||||
@ -98,7 +98,7 @@ class GraphEngineLayer(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_node_run_start(self, node: Node) -> None: # noqa: B027
|
||||
def on_node_run_start(self, node: Node) -> None:
|
||||
"""
|
||||
Called immediately before a node begins execution.
|
||||
|
||||
@ -109,9 +109,11 @@ class GraphEngineLayer(ABC):
|
||||
Args:
|
||||
node: The node instance about to be executed
|
||||
"""
|
||||
pass
|
||||
return
|
||||
|
||||
def on_node_run_end(self, node: Node, error: Exception | None) -> None: # noqa: B027
|
||||
def on_node_run_end(
|
||||
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
|
||||
) -> None:
|
||||
"""
|
||||
Called after a node finishes execution.
|
||||
|
||||
@ -121,5 +123,6 @@ class GraphEngineLayer(ABC):
|
||||
Args:
|
||||
node: The node instance that just finished execution
|
||||
error: Exception instance if the node failed, otherwise None
|
||||
result_event: The final result event from node execution (succeeded/failed/paused), if any
|
||||
"""
|
||||
pass
|
||||
return
|
||||
|
||||
@ -1,61 +0,0 @@
|
||||
"""
|
||||
Node-level OpenTelemetry parser interfaces and defaults.
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Protocol
|
||||
|
||||
from opentelemetry.trace import Span
|
||||
from opentelemetry.trace.status import Status, StatusCode
|
||||
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.tool.entities import ToolNodeData
|
||||
|
||||
|
||||
class NodeOTelParser(Protocol):
|
||||
"""Parser interface for node-specific OpenTelemetry enrichment."""
|
||||
|
||||
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None: ...
|
||||
|
||||
|
||||
class DefaultNodeOTelParser:
|
||||
"""Fallback parser used when no node-specific parser is registered."""
|
||||
|
||||
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None:
|
||||
span.set_attribute("node.id", node.id)
|
||||
if node.execution_id:
|
||||
span.set_attribute("node.execution_id", node.execution_id)
|
||||
if hasattr(node, "node_type") and node.node_type:
|
||||
span.set_attribute("node.type", node.node_type.value)
|
||||
|
||||
if error:
|
||||
span.record_exception(error)
|
||||
span.set_status(Status(StatusCode.ERROR, str(error)))
|
||||
else:
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
|
||||
|
||||
class ToolNodeOTelParser:
|
||||
"""Parser for tool nodes that captures tool-specific metadata."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._delegate = DefaultNodeOTelParser()
|
||||
|
||||
def parse(self, *, node: Node, span: "Span", error: Exception | None) -> None:
|
||||
self._delegate.parse(node=node, span=span, error=error)
|
||||
|
||||
tool_data = getattr(node, "_node_data", None)
|
||||
if not isinstance(tool_data, ToolNodeData):
|
||||
return
|
||||
|
||||
span.set_attribute("tool.provider.id", tool_data.provider_id)
|
||||
span.set_attribute("tool.provider.type", tool_data.provider_type.value)
|
||||
span.set_attribute("tool.provider.name", tool_data.provider_name)
|
||||
span.set_attribute("tool.name", tool_data.tool_name)
|
||||
span.set_attribute("tool.label", tool_data.tool_label)
|
||||
if tool_data.plugin_unique_identifier:
|
||||
span.set_attribute("tool.plugin.id", tool_data.plugin_unique_identifier)
|
||||
if tool_data.credential_id:
|
||||
span.set_attribute("tool.credential.id", tool_data.credential_id)
|
||||
if tool_data.tool_configurations:
|
||||
span.set_attribute("tool.config", json.dumps(tool_data.tool_configurations, ensure_ascii=False))
|
||||
@ -18,12 +18,15 @@ from typing_extensions import override
|
||||
from configs import dify_config
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_engine.layers.node_parsers import (
|
||||
from core.workflow.graph_events import GraphNodeEventBase
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from extensions.otel.parser import (
|
||||
DefaultNodeOTelParser,
|
||||
LLMNodeOTelParser,
|
||||
NodeOTelParser,
|
||||
RetrievalNodeOTelParser,
|
||||
ToolNodeOTelParser,
|
||||
)
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from extensions.otel.runtime import is_instrument_flag_enabled
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -72,6 +75,8 @@ class ObservabilityLayer(GraphEngineLayer):
|
||||
"""Initialize parser registry for node types."""
|
||||
self._parsers = {
|
||||
NodeType.TOOL: ToolNodeOTelParser(),
|
||||
NodeType.LLM: LLMNodeOTelParser(),
|
||||
NodeType.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
|
||||
}
|
||||
|
||||
def _get_parser(self, node: Node) -> NodeOTelParser:
|
||||
@ -119,7 +124,9 @@ class ObservabilityLayer(GraphEngineLayer):
|
||||
logger.warning("Failed to create OpenTelemetry span for node %s: %s", node.id, e)
|
||||
|
||||
@override
|
||||
def on_node_run_end(self, node: Node, error: Exception | None) -> None:
|
||||
def on_node_run_end(
|
||||
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
|
||||
) -> None:
|
||||
"""
|
||||
Called when a node finishes execution.
|
||||
|
||||
@ -139,7 +146,7 @@ class ObservabilityLayer(GraphEngineLayer):
|
||||
span = node_context.span
|
||||
parser = self._get_parser(node)
|
||||
try:
|
||||
parser.parse(node=node, span=span, error=error)
|
||||
parser.parse(node=node, span=span, error=error, result_event=result_event)
|
||||
span.end()
|
||||
finally:
|
||||
token = node_context.token
|
||||
|
||||
@ -17,7 +17,7 @@ from typing_extensions import override
|
||||
from core.workflow.context import IExecutionContext
|
||||
from core.workflow.graph import Graph
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
|
||||
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, is_node_result_event
|
||||
from core.workflow.nodes.base.node import Node
|
||||
|
||||
from .ready_queue import ReadyQueue
|
||||
@ -131,6 +131,7 @@ class Worker(threading.Thread):
|
||||
node.ensure_execution_id()
|
||||
|
||||
error: Exception | None = None
|
||||
result_event: GraphNodeEventBase | None = None
|
||||
|
||||
# Execute the node with preserved context if execution context is provided
|
||||
if self._execution_context is not None:
|
||||
@ -140,22 +141,26 @@ class Worker(threading.Thread):
|
||||
node_events = node.run()
|
||||
for event in node_events:
|
||||
self._event_queue.put(event)
|
||||
if is_node_result_event(event):
|
||||
result_event = event
|
||||
except Exception as exc:
|
||||
error = exc
|
||||
raise
|
||||
finally:
|
||||
self._invoke_node_run_end_hooks(node, error)
|
||||
self._invoke_node_run_end_hooks(node, error, result_event)
|
||||
else:
|
||||
self._invoke_node_run_start_hooks(node)
|
||||
try:
|
||||
node_events = node.run()
|
||||
for event in node_events:
|
||||
self._event_queue.put(event)
|
||||
if is_node_result_event(event):
|
||||
result_event = event
|
||||
except Exception as exc:
|
||||
error = exc
|
||||
raise
|
||||
finally:
|
||||
self._invoke_node_run_end_hooks(node, error)
|
||||
self._invoke_node_run_end_hooks(node, error, result_event)
|
||||
|
||||
def _invoke_node_run_start_hooks(self, node: Node) -> None:
|
||||
"""Invoke on_node_run_start hooks for all layers."""
|
||||
@ -166,11 +171,13 @@ class Worker(threading.Thread):
|
||||
# Silently ignore layer errors to prevent disrupting node execution
|
||||
continue
|
||||
|
||||
def _invoke_node_run_end_hooks(self, node: Node, error: Exception | None) -> None:
|
||||
def _invoke_node_run_end_hooks(
|
||||
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
|
||||
) -> None:
|
||||
"""Invoke on_node_run_end hooks for all layers."""
|
||||
for layer in self._layers:
|
||||
try:
|
||||
layer.on_node_run_end(node, error)
|
||||
layer.on_node_run_end(node, error, result_event)
|
||||
except Exception:
|
||||
# Silently ignore layer errors to prevent disrupting node execution
|
||||
continue
|
||||
|
||||
@ -47,6 +47,7 @@ from .node import (
|
||||
NodeRunSucceededEvent,
|
||||
ToolCall,
|
||||
ToolResult,
|
||||
is_node_result_event,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
@ -79,4 +80,5 @@ __all__ = [
|
||||
"NodeRunSucceededEvent",
|
||||
"ToolCall",
|
||||
"ToolResult",
|
||||
"is_node_result_event",
|
||||
]
|
||||
|
||||
@ -83,3 +83,26 @@ class NodeRunRetryEvent(NodeRunStartedEvent):
|
||||
|
||||
class NodeRunPauseRequestedEvent(GraphNodeEventBase):
|
||||
reason: PauseReason = Field(..., description="pause reason")
|
||||
|
||||
|
||||
def is_node_result_event(event: GraphNodeEventBase) -> bool:
|
||||
"""
|
||||
Check if an event is a final result event from node execution.
|
||||
|
||||
A result event indicates the completion of a node execution and contains
|
||||
runtime information such as inputs, outputs, or error details.
|
||||
|
||||
Args:
|
||||
event: The event to check
|
||||
|
||||
Returns:
|
||||
True if the event is a node result event (succeeded/failed/paused), False otherwise
|
||||
"""
|
||||
return isinstance(
|
||||
event,
|
||||
(
|
||||
NodeRunSucceededEvent,
|
||||
NodeRunFailedEvent,
|
||||
NodeRunPauseRequestedEvent,
|
||||
),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user