Files
dify/api/tasks/ops_trace_task.py
GareArc 6405a30278 feat(enterprise): Add node execution trace integration and complete parent context wiring
Integrate enterprise per-node telemetry into workflow execution pipeline and
complete parent trace context propagation through the trace system.

Enterprise node execution tracing (GATED):
- Add WorkflowNodeTraceInfo entity with full node execution metadata
- Emit node trace on every node completion (succeeded/failed/exception/paused)
- Include LLM tokens, tool info, iteration/loop context, and timing data
- Hook into workflow persistence layer via _enqueue_node_trace_task()
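A minimal sketch of what the node trace entity might look like, based on the bullets above. Only the class name `WorkflowNodeTraceInfo` comes from the commit; every field name here is an assumption, and the real class in `core/ops/entities/trace_entity.py` may differ (it likely uses pydantic rather than a dataclass).

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional


# Hypothetical sketch only; field names are illustrative, not the real schema.
@dataclass
class WorkflowNodeTraceInfo:
    workflow_run_id: str
    node_id: str
    node_type: str
    status: str                         # succeeded / failed / exception / paused
    start_time: datetime
    end_time: datetime
    total_tokens: int = 0               # LLM token usage, if the node called a model
    tool_name: Optional[str] = None     # populated for tool nodes
    iteration_id: Optional[str] = None  # set when the node ran inside an iteration
    loop_id: Optional[str] = None       # set when the node ran inside a loop
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def elapsed_ms(self) -> float:
        """Node wall-clock duration in milliseconds."""
        return (self.end_time - self.start_time).total_seconds() * 1000.0
```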

Parent trace context wiring (COMMUNITY):
- Pass parent_trace_context through TraceTask to WorkflowTraceInfo.metadata
- Enables child workflows to include parent attributes for all trace providers
- Completes the distributed tracing feature started in the first commit
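The propagation step can be sketched as a small merge helper. This is an assumption about the shape of the data, not the actual code path: the commit only says `parent_trace_context` is passed through `TraceTask` into `WorkflowTraceInfo.metadata`, so the function and key names below are illustrative.

```python
from typing import Optional


# Hypothetical sketch: fold a parent trace context into a child workflow's
# trace metadata so downstream trace providers can see parent attributes.
def attach_parent_context(metadata: dict, parent_trace_context: Optional[dict]) -> dict:
    """Return a copy of `metadata` carrying the parent trace attributes."""
    if not parent_trace_context:
        return metadata
    merged = dict(metadata)  # do not mutate the caller's dict
    merged["parent_trace_context"] = dict(parent_trace_context)
    return merged
```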

Dual processing architecture:
- TraceQueueManager processes traces when enterprise OR per-app tracing enabled
- Celery task calls both EnterpriseDataTrace AND per-app trace providers
- Single queue, dual dispatch pattern
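The single-queue, dual-dispatch pattern can be sketched as one consumer fanning a trace out to independent handlers, where a failure in one path never blocks the other. This generalizes the concrete code in the task below; the function and handler names are illustrative.

```python
import logging

logger = logging.getLogger(__name__)


# Sketch of dual dispatch: one queued trace, many handlers (e.g. the
# enterprise exporter and a per-app trace provider), failures isolated.
def dispatch_trace(trace_info, handlers):
    """Fan a single queued trace out to every enabled handler.

    `handlers` is a list of (name, callable) pairs; a failing handler is
    logged and skipped so the remaining handlers still receive the trace.
    Returns the names of the handlers that succeeded.
    """
    delivered = []
    for name, handler in handlers:
        try:
            handler(trace_info)
            delivered.append(name)
        except Exception:
            logger.warning("Trace handler %s failed", name, exc_info=True)
    return delivered
```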

Files changed:
- core/ops/entities/trace_entity.py: Add WorkflowNodeTraceInfo + NODE_EXECUTION_TRACE
- core/app/workflow/layers/persistence.py: Emit node traces + parent context
- core/ops/ops_trace_manager.py: node_execution_trace() + dual dispatch
- tasks/ops_trace_task.py: Call enterprise trace handler
2026-01-29 17:07:58 -08:00


import json
import logging

from celery import shared_task
from flask import current_app

from core.ops.entities.config_entity import OPS_FILE_PATH, OPS_TRACE_FAILED_KEY
from core.ops.entities.trace_entity import trace_info_info_map
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.model import Message
from models.workflow import WorkflowRun

logger = logging.getLogger(__name__)


@shared_task(queue="ops_trace")
def process_trace_tasks(file_info):
    """
    Async process trace tasks
    Usage: process_trace_tasks.delay(tasks_data)
    """
    from core.ops.ops_trace_manager import OpsTraceManager

    app_id = file_info.get("app_id")
    file_id = file_info.get("file_id")
    file_path = f"{OPS_FILE_PATH}{app_id}/{file_id}.json"
    file_data = json.loads(storage.load(file_path))
    trace_info = file_data.get("trace_info")
    trace_info_type = file_data.get("trace_info_type")
    trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)

    # Rehydrate serialized model objects before handing off to trace providers.
    if trace_info.get("message_data"):
        trace_info["message_data"] = Message.from_dict(data=trace_info["message_data"])
    if trace_info.get("workflow_data"):
        trace_info["workflow_data"] = WorkflowRun.from_dict(data=trace_info["workflow_data"])
    if trace_info.get("documents"):
        trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]

    try:
        trace_type = trace_info_info_map.get(trace_info_type)
        if trace_type:
            trace_info = trace_type(**trace_info)

        # Process the enterprise trace separately from per-app providers;
        # a failure here must not block the per-app dispatch below.
        from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled

        if is_enterprise_telemetry_enabled():
            from enterprise.telemetry.enterprise_trace import EnterpriseDataTrace

            try:
                EnterpriseDataTrace().trace(trace_info)
            except Exception:
                logger.warning("Enterprise trace failed for app_id: %s", app_id, exc_info=True)

        if trace_instance:
            with current_app.app_context():
                trace_instance.trace(trace_info)
        logger.info("Processing trace tasks success, app_id: %s", app_id)
    except Exception:
        failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}"
        redis_client.incr(failed_key)
        logger.exception("Processing trace tasks failed, app_id: %s", app_id)
    finally:
        storage.delete(file_path)
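The docstring's `process_trace_tasks.delay(tasks_data)` usage implies a producer that first writes the trace payload to shared storage and enqueues only a lightweight pointer, keeping the Celery message small. A hedged sketch of that producer side follows; everything except `process_trace_tasks` (the path prefix, function name, and payload keys) is an assumption about how `TraceQueueManager` might do it.

```python
import json
import uuid


# Hypothetical producer-side sketch: persist the payload, enqueue a pointer.
def enqueue_trace(storage, app_id: str, trace_info: dict, trace_info_type: str) -> dict:
    """Write the trace payload to storage and return the pointer for the task.

    `storage` is any object with a `save(path, data)` method; the path
    prefix "ops_trace/" stands in for the real OPS_FILE_PATH constant.
    """
    file_id = str(uuid.uuid4())
    file_path = f"ops_trace/{app_id}/{file_id}.json"
    storage.save(file_path, json.dumps({
        "trace_info": trace_info,
        "trace_info_type": trace_info_type,
    }))
    file_info = {"app_id": app_id, "file_id": file_id}
    # process_trace_tasks.delay(file_info)  # hand off to the ops_trace queue
    return file_info
```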