mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 01:48:04 +08:00
feat: enterprise OTEL telemetry exporter (squash merge from feat/otel-telemetry-ee)
This commit is contained in:
52
api/tasks/enterprise_telemetry_task.py
Normal file
52
api/tasks/enterprise_telemetry_task.py
Normal file
@ -0,0 +1,52 @@
|
||||
"""Celery worker for enterprise metric/log telemetry events.
|
||||
|
||||
This module defines the Celery task that processes telemetry envelopes
|
||||
from the enterprise_telemetry queue. It deserializes envelopes and
|
||||
dispatches them to the EnterpriseMetricHandler.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from celery import shared_task
|
||||
|
||||
from enterprise.telemetry.contracts import TelemetryEnvelope
|
||||
from enterprise.telemetry.metric_handler import EnterpriseMetricHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(queue="enterprise_telemetry")
|
||||
def process_enterprise_telemetry(envelope_json: str) -> None:
|
||||
"""Process enterprise metric/log telemetry envelope.
|
||||
|
||||
This task is enqueued by the TelemetryGateway for metric/log-only
|
||||
events. It deserializes the envelope and dispatches to the handler.
|
||||
|
||||
Best-effort processing: logs errors but never raises, to avoid
|
||||
failing user requests due to telemetry issues.
|
||||
|
||||
Args:
|
||||
envelope_json: JSON-serialized TelemetryEnvelope.
|
||||
"""
|
||||
try:
|
||||
# Deserialize envelope
|
||||
envelope_dict = json.loads(envelope_json)
|
||||
envelope = TelemetryEnvelope.model_validate(envelope_dict)
|
||||
|
||||
# Process through handler
|
||||
handler = EnterpriseMetricHandler()
|
||||
handler.handle(envelope)
|
||||
|
||||
logger.debug(
|
||||
"Successfully processed telemetry envelope: tenant_id=%s, event_id=%s, case=%s",
|
||||
envelope.tenant_id,
|
||||
envelope.event_id,
|
||||
envelope.case,
|
||||
)
|
||||
except Exception:
|
||||
# Best-effort: log and drop on error, never fail user request
|
||||
logger.warning(
|
||||
"Failed to process enterprise telemetry envelope, dropping event",
|
||||
exc_info=True,
|
||||
)
|
||||
@ -39,17 +39,36 @@ def process_trace_tasks(file_info):
|
||||
trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]
|
||||
|
||||
try:
|
||||
trace_type = trace_info_info_map.get(trace_info_type)
|
||||
if trace_type:
|
||||
trace_info = trace_type(**trace_info)
|
||||
|
||||
from extensions.ext_enterprise_telemetry import is_enabled as is_ee_telemetry_enabled
|
||||
|
||||
if is_ee_telemetry_enabled():
|
||||
from enterprise.telemetry.enterprise_trace import EnterpriseOtelTrace
|
||||
|
||||
try:
|
||||
EnterpriseOtelTrace().trace(trace_info)
|
||||
except Exception:
|
||||
logger.exception("Enterprise trace failed for app_id: %s", app_id)
|
||||
|
||||
if trace_instance:
|
||||
with current_app.app_context():
|
||||
trace_type = trace_info_info_map.get(trace_info_type)
|
||||
if trace_type:
|
||||
trace_info = trace_type(**trace_info)
|
||||
trace_instance.trace(trace_info)
|
||||
|
||||
logger.info("Processing trace tasks success, app_id: %s", app_id)
|
||||
except Exception as e:
|
||||
logger.info("error:\n\n\n%s\n\n\n\n", e)
|
||||
logger.exception("Processing trace tasks failed, app_id: %s", app_id)
|
||||
failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}"
|
||||
redis_client.incr(failed_key)
|
||||
logger.info("Processing trace tasks failed, app_id: %s", app_id)
|
||||
finally:
|
||||
storage.delete(file_path)
|
||||
try:
|
||||
storage.delete(file_path)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to delete trace file %s for app_id %s: %s",
|
||||
file_path,
|
||||
app_id,
|
||||
e,
|
||||
)
|
||||
|
||||
@ -20,13 +20,14 @@ from core.db.session_factory import session_factory
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.entities.request import TriggerInvokeEventResponse
|
||||
from core.plugin.impl.exc import PluginInvokeError
|
||||
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
|
||||
from core.trigger.debug.event_bus import TriggerDebugEventBus
|
||||
from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key
|
||||
from core.trigger.entities.entities import TriggerProviderEntity
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
from core.trigger.trigger_manager import TriggerManager
|
||||
from dify_graph.enums import NodeType, WorkflowExecutionStatus
|
||||
from dify_graph.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from dify_graph.enums import WorkflowExecutionStatus
|
||||
from enums.quota_type import QuotaType, unlimited
|
||||
from models.enums import (
|
||||
AppTriggerType,
|
||||
@ -164,7 +165,7 @@ def _record_trigger_failure_log(
|
||||
elapsed_time=0.0,
|
||||
total_tokens=0,
|
||||
total_steps=0,
|
||||
created_by_role=created_by_role.value,
|
||||
created_by_role=created_by_role,
|
||||
created_by=created_by,
|
||||
created_at=now,
|
||||
finished_at=now,
|
||||
@ -179,7 +180,7 @@ def _record_trigger_failure_log(
|
||||
workflow_id=workflow.id,
|
||||
workflow_run_id=workflow_run.id,
|
||||
created_from=WorkflowAppLogCreatedFrom.SERVICE_API.value,
|
||||
created_by_role=created_by_role.value,
|
||||
created_by_role=created_by_role,
|
||||
created_by=created_by,
|
||||
)
|
||||
session.add(workflow_app_log)
|
||||
@ -212,7 +213,7 @@ def _record_trigger_failure_log(
|
||||
error=error_message,
|
||||
queue_name=queue_name,
|
||||
retry_count=0,
|
||||
created_by_role=created_by_role.value,
|
||||
created_by_role=created_by_role,
|
||||
created_by=created_by,
|
||||
triggered_at=now,
|
||||
finished_at=now,
|
||||
@ -278,7 +279,7 @@ def dispatch_triggered_workflow(
|
||||
|
||||
# Find the trigger node in the workflow
|
||||
event_node = None
|
||||
for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN):
|
||||
for node_id, node_config in workflow.walk_nodes(TRIGGER_PLUGIN_NODE_TYPE):
|
||||
if node_id == plugin_trigger.node_id:
|
||||
event_node = node_config
|
||||
break
|
||||
|
||||
@ -94,13 +94,15 @@ def _create_workflow_run_from_execution(
|
||||
workflow_run.tenant_id = tenant_id
|
||||
workflow_run.app_id = app_id
|
||||
workflow_run.workflow_id = execution.workflow_id
|
||||
workflow_run.type = execution.workflow_type.value
|
||||
workflow_run.triggered_from = triggered_from.value
|
||||
from models.workflow import WorkflowType as ModelWorkflowType
|
||||
|
||||
workflow_run.type = ModelWorkflowType(execution.workflow_type.value)
|
||||
workflow_run.triggered_from = triggered_from
|
||||
workflow_run.version = execution.workflow_version
|
||||
json_converter = WorkflowRuntimeTypeConverter()
|
||||
workflow_run.graph = json.dumps(json_converter.to_json_encodable(execution.graph))
|
||||
workflow_run.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs))
|
||||
workflow_run.status = execution.status.value
|
||||
workflow_run.status = execution.status
|
||||
workflow_run.outputs = (
|
||||
json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
|
||||
)
|
||||
@ -108,7 +110,7 @@ def _create_workflow_run_from_execution(
|
||||
workflow_run.elapsed_time = execution.elapsed_time
|
||||
workflow_run.total_tokens = execution.total_tokens
|
||||
workflow_run.total_steps = execution.total_steps
|
||||
workflow_run.created_by_role = creator_user_role.value
|
||||
workflow_run.created_by_role = creator_user_role
|
||||
workflow_run.created_by = creator_user_id
|
||||
workflow_run.created_at = execution.started_at
|
||||
workflow_run.finished_at = execution.finished_at
|
||||
@ -121,7 +123,7 @@ def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: Wo
|
||||
Update a WorkflowRun database model from a WorkflowExecution domain entity.
|
||||
"""
|
||||
json_converter = WorkflowRuntimeTypeConverter()
|
||||
workflow_run.status = execution.status.value
|
||||
workflow_run.status = execution.status
|
||||
workflow_run.outputs = (
|
||||
json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
|
||||
)
|
||||
|
||||
@ -98,12 +98,12 @@ def _create_node_execution_from_domain(
|
||||
node_execution.tenant_id = tenant_id
|
||||
node_execution.app_id = app_id
|
||||
node_execution.workflow_id = execution.workflow_id
|
||||
node_execution.triggered_from = triggered_from.value
|
||||
node_execution.triggered_from = triggered_from
|
||||
node_execution.workflow_run_id = execution.workflow_execution_id
|
||||
node_execution.index = execution.index
|
||||
node_execution.predecessor_node_id = execution.predecessor_node_id
|
||||
node_execution.node_id = execution.node_id
|
||||
node_execution.node_type = execution.node_type.value
|
||||
node_execution.node_type = execution.node_type
|
||||
node_execution.title = execution.title
|
||||
node_execution.node_execution_id = execution.node_execution_id
|
||||
|
||||
@ -128,7 +128,7 @@ def _create_node_execution_from_domain(
|
||||
node_execution.status = execution.status.value
|
||||
node_execution.error = execution.error
|
||||
node_execution.elapsed_time = execution.elapsed_time
|
||||
node_execution.created_by_role = creator_user_role.value
|
||||
node_execution.created_by_role = creator_user_role
|
||||
node_execution.created_by = creator_user_id
|
||||
node_execution.created_at = execution.created_at
|
||||
node_execution.finished_at = execution.finished_at
|
||||
|
||||
@ -3,7 +3,7 @@ import logging
|
||||
from celery import shared_task
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
from dify_graph.nodes.trigger_schedule.exc import (
|
||||
from core.workflow.nodes.trigger_schedule.exc import (
|
||||
ScheduleExecutionError,
|
||||
ScheduleNotFoundError,
|
||||
TenantOwnerNotFoundError,
|
||||
|
||||
Reference in New Issue
Block a user