mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
feat(telemetry): add enterprise OTEL telemetry with gateway, traces, metrics, and logs
This commit is contained in:
@ -21,19 +21,25 @@ from core.ops.entities.config_entity import (
|
||||
)
|
||||
from core.ops.entities.trace_entity import (
|
||||
DatasetRetrievalTraceInfo,
|
||||
DraftNodeExecutionTrace,
|
||||
GenerateNameTraceInfo,
|
||||
MessageTraceInfo,
|
||||
ModerationTraceInfo,
|
||||
PromptGenerationTraceInfo,
|
||||
SuggestedQuestionTraceInfo,
|
||||
TaskData,
|
||||
ToolTraceInfo,
|
||||
TraceTaskName,
|
||||
WorkflowNodeTraceInfo,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from core.ops.utils import get_message_data
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_storage import storage
|
||||
from models.account import Tenant
|
||||
from models.dataset import Dataset
|
||||
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
|
||||
from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider
|
||||
from models.workflow import WorkflowAppLog
|
||||
from tasks.ops_trace_task import process_trace_tasks
|
||||
|
||||
@ -43,6 +49,44 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _lookup_app_and_workspace_names(app_id: str | None, tenant_id: str | None) -> tuple[str, str]:
|
||||
"""Return (app_name, workspace_name) for the given IDs. Falls back to empty strings."""
|
||||
app_name = ""
|
||||
workspace_name = ""
|
||||
if not app_id and not tenant_id:
|
||||
return app_name, workspace_name
|
||||
with Session(db.engine) as session:
|
||||
if app_id:
|
||||
name = session.scalar(select(App.name).where(App.id == app_id))
|
||||
if name:
|
||||
app_name = name
|
||||
if tenant_id:
|
||||
name = session.scalar(select(Tenant.name).where(Tenant.id == tenant_id))
|
||||
if name:
|
||||
workspace_name = name
|
||||
return app_name, workspace_name
|
||||
|
||||
|
||||
_PROVIDER_TYPE_TO_MODEL: dict[str, type] = {
|
||||
"builtin": BuiltinToolProvider,
|
||||
"plugin": BuiltinToolProvider,
|
||||
"api": ApiToolProvider,
|
||||
"workflow": WorkflowToolProvider,
|
||||
"mcp": MCPToolProvider,
|
||||
}
|
||||
|
||||
|
||||
def _lookup_credential_name(credential_id: str | None, provider_type: str | None) -> str:
|
||||
if not credential_id:
|
||||
return ""
|
||||
model_cls = _PROVIDER_TYPE_TO_MODEL.get(provider_type or "")
|
||||
if not model_cls:
|
||||
return ""
|
||||
with Session(db.engine) as session:
|
||||
name = session.scalar(select(model_cls.name).where(model_cls.id == credential_id))
|
||||
return str(name) if name else ""
|
||||
|
||||
|
||||
class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
|
||||
def __getitem__(self, provider: str) -> dict[str, Any]:
|
||||
match provider:
|
||||
@ -317,6 +361,10 @@ class OpsTraceManager:
|
||||
if app_id is None:
|
||||
return None
|
||||
|
||||
# Handle storage_id format (tenant-{uuid}) - not a real app_id
|
||||
if isinstance(app_id, str) and app_id.startswith("tenant-"):
|
||||
return None
|
||||
|
||||
app: App | None = db.session.query(App).where(App.id == app_id).first()
|
||||
|
||||
if app is None:
|
||||
@ -479,6 +527,56 @@ class TraceTask:
|
||||
cls._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
|
||||
return cls._workflow_run_repo
|
||||
|
||||
@classmethod
|
||||
def _get_user_id_from_metadata(cls, metadata: dict[str, Any]) -> str:
|
||||
"""Extract user ID from metadata, prioritizing end_user over account.
|
||||
|
||||
Returns the actual user ID (end_user or account) who invoked the workflow,
|
||||
regardless of invoke_from context.
|
||||
"""
|
||||
# Priority 1: End user (external users via API/WebApp)
|
||||
if user_id := metadata.get("from_end_user_id"):
|
||||
return f"end_user:{user_id}"
|
||||
|
||||
# Priority 2: Account user (internal users via console/debugger)
|
||||
if user_id := metadata.get("from_account_id"):
|
||||
return f"account:{user_id}"
|
||||
|
||||
# Priority 3: User (internal users via console/debugger)
|
||||
if user_id := metadata.get("user_id"):
|
||||
return f"user:{user_id}"
|
||||
|
||||
return "anonymous"
|
||||
|
||||
@classmethod
|
||||
def _calculate_workflow_token_split(cls, workflow_run_id: str, tenant_id: str) -> tuple[int, int]:
|
||||
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
|
||||
from models.workflow import WorkflowNodeExecutionModel
|
||||
|
||||
with Session(db.engine) as session:
|
||||
node_executions = session.scalars(
|
||||
select(WorkflowNodeExecutionModel).where(
|
||||
WorkflowNodeExecutionModel.tenant_id == tenant_id,
|
||||
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
|
||||
)
|
||||
).all()
|
||||
|
||||
total_prompt = 0
|
||||
total_completion = 0
|
||||
|
||||
for node_exec in node_executions:
|
||||
metadata = node_exec.execution_metadata_dict
|
||||
|
||||
prompt = metadata.get(WorkflowNodeExecutionMetadataKey.PROMPT_TOKENS)
|
||||
if prompt is not None:
|
||||
total_prompt += prompt
|
||||
|
||||
completion = metadata.get(WorkflowNodeExecutionMetadataKey.COMPLETION_TOKENS)
|
||||
if completion is not None:
|
||||
total_completion += completion
|
||||
|
||||
return (total_prompt, total_completion)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
trace_type: Any,
|
||||
@ -499,6 +597,8 @@ class TraceTask:
|
||||
self.app_id = None
|
||||
self.trace_id = None
|
||||
self.kwargs = kwargs
|
||||
if user_id is not None and "user_id" not in self.kwargs:
|
||||
self.kwargs["user_id"] = user_id
|
||||
external_trace_id = kwargs.get("external_trace_id")
|
||||
if external_trace_id:
|
||||
self.trace_id = external_trace_id
|
||||
@ -512,7 +612,7 @@ class TraceTask:
|
||||
TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
|
||||
workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id
|
||||
),
|
||||
TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id),
|
||||
TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id, **self.kwargs),
|
||||
TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(
|
||||
message_id=self.message_id, timer=self.timer, **self.kwargs
|
||||
),
|
||||
@ -528,6 +628,9 @@ class TraceTask:
|
||||
TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
|
||||
conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
|
||||
),
|
||||
TraceTaskName.PROMPT_GENERATION_TRACE: lambda: self.prompt_generation_trace(**self.kwargs),
|
||||
TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
|
||||
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs),
|
||||
}
|
||||
|
||||
return preprocess_map.get(self.trace_type, lambda: None)()
|
||||
@ -563,6 +666,10 @@ class TraceTask:
|
||||
|
||||
total_tokens = workflow_run.total_tokens
|
||||
|
||||
prompt_tokens, completion_tokens = self._calculate_workflow_token_split(
|
||||
workflow_run_id=workflow_run_id, tenant_id=tenant_id
|
||||
)
|
||||
|
||||
file_list = workflow_run_inputs.get("sys.file") or []
|
||||
query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
|
||||
|
||||
@ -583,7 +690,9 @@ class TraceTask:
|
||||
)
|
||||
message_id = session.scalar(message_data_stmt)
|
||||
|
||||
metadata = {
|
||||
app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id)
|
||||
|
||||
metadata: dict[str, Any] = {
|
||||
"workflow_id": workflow_id,
|
||||
"conversation_id": conversation_id,
|
||||
"workflow_run_id": workflow_run_id,
|
||||
@ -596,8 +705,14 @@ class TraceTask:
|
||||
"triggered_from": workflow_run.triggered_from,
|
||||
"user_id": user_id,
|
||||
"app_id": workflow_run.app_id,
|
||||
"app_name": app_name,
|
||||
"workspace_name": workspace_name,
|
||||
}
|
||||
|
||||
parent_trace_context = self.kwargs.get("parent_trace_context")
|
||||
if parent_trace_context:
|
||||
metadata["parent_trace_context"] = parent_trace_context
|
||||
|
||||
workflow_trace_info = WorkflowTraceInfo(
|
||||
trace_id=self.trace_id,
|
||||
workflow_data=workflow_run.to_dict(),
|
||||
@ -612,6 +727,8 @@ class TraceTask:
|
||||
workflow_run_version=workflow_run_version,
|
||||
error=error,
|
||||
total_tokens=total_tokens,
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
file_list=file_list,
|
||||
query=query,
|
||||
metadata=metadata,
|
||||
@ -619,10 +736,11 @@ class TraceTask:
|
||||
message_id=message_id,
|
||||
start_time=workflow_run.created_at,
|
||||
end_time=workflow_run.finished_at,
|
||||
invoked_by=self._get_user_id_from_metadata(metadata),
|
||||
)
|
||||
return workflow_trace_info
|
||||
|
||||
def message_trace(self, message_id: str | None):
|
||||
def message_trace(self, message_id: str | None, **kwargs):
|
||||
if not message_id:
|
||||
return {}
|
||||
message_data = get_message_data(message_id)
|
||||
@ -645,6 +763,14 @@ class TraceTask:
|
||||
|
||||
streaming_metrics = self._extract_streaming_metrics(message_data)
|
||||
|
||||
tenant_id = ""
|
||||
with Session(db.engine) as session:
|
||||
tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id))
|
||||
if tid:
|
||||
tenant_id = str(tid)
|
||||
|
||||
app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
|
||||
|
||||
metadata = {
|
||||
"conversation_id": message_data.conversation_id,
|
||||
"ls_provider": message_data.model_provider,
|
||||
@ -656,7 +782,14 @@ class TraceTask:
|
||||
"workflow_run_id": message_data.workflow_run_id,
|
||||
"from_source": message_data.from_source,
|
||||
"message_id": message_id,
|
||||
"tenant_id": tenant_id,
|
||||
"app_id": message_data.app_id,
|
||||
"user_id": message_data.from_end_user_id or message_data.from_account_id,
|
||||
"app_name": app_name,
|
||||
"workspace_name": workspace_name,
|
||||
}
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
message_tokens = message_data.message_tokens
|
||||
|
||||
@ -698,6 +831,8 @@ class TraceTask:
|
||||
"preset_response": moderation_result.preset_response,
|
||||
"query": moderation_result.query,
|
||||
}
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
# get workflow_app_log_id
|
||||
workflow_app_log_id = None
|
||||
@ -739,6 +874,8 @@ class TraceTask:
|
||||
"workflow_run_id": message_data.workflow_run_id,
|
||||
"from_source": message_data.from_source,
|
||||
}
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
# get workflow_app_log_id
|
||||
workflow_app_log_id = None
|
||||
@ -778,6 +915,36 @@ class TraceTask:
|
||||
if not message_data:
|
||||
return {}
|
||||
|
||||
tenant_id = ""
|
||||
with Session(db.engine) as session:
|
||||
tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id))
|
||||
if tid:
|
||||
tenant_id = str(tid)
|
||||
|
||||
app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
|
||||
|
||||
doc_list = [doc.model_dump() for doc in documents] if documents else []
|
||||
dataset_ids: set[str] = set()
|
||||
for doc in doc_list:
|
||||
doc_meta = doc.get("metadata") or {}
|
||||
did = doc_meta.get("dataset_id")
|
||||
if did:
|
||||
dataset_ids.add(did)
|
||||
|
||||
embedding_models: dict[str, dict[str, str]] = {}
|
||||
if dataset_ids:
|
||||
with Session(db.engine) as session:
|
||||
rows = session.execute(
|
||||
select(Dataset.id, Dataset.embedding_model, Dataset.embedding_model_provider).where(
|
||||
Dataset.id.in_(list(dataset_ids))
|
||||
)
|
||||
).all()
|
||||
for row in rows:
|
||||
embedding_models[str(row[0])] = {
|
||||
"embedding_model": row[1] or "",
|
||||
"embedding_model_provider": row[2] or "",
|
||||
}
|
||||
|
||||
metadata = {
|
||||
"message_id": message_id,
|
||||
"ls_provider": message_data.model_provider,
|
||||
@ -788,13 +955,21 @@ class TraceTask:
|
||||
"agent_based": message_data.agent_based,
|
||||
"workflow_run_id": message_data.workflow_run_id,
|
||||
"from_source": message_data.from_source,
|
||||
"tenant_id": tenant_id,
|
||||
"app_id": message_data.app_id,
|
||||
"user_id": message_data.from_end_user_id or message_data.from_account_id,
|
||||
"app_name": app_name,
|
||||
"workspace_name": workspace_name,
|
||||
"embedding_models": embedding_models,
|
||||
}
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
|
||||
trace_id=self.trace_id,
|
||||
message_id=message_id,
|
||||
inputs=message_data.query or message_data.inputs,
|
||||
documents=[doc.model_dump() for doc in documents] if documents else [],
|
||||
documents=doc_list,
|
||||
start_time=timer.get("start"),
|
||||
end_time=timer.get("end"),
|
||||
metadata=metadata,
|
||||
@ -837,6 +1012,10 @@ class TraceTask:
|
||||
"error": error,
|
||||
"tool_parameters": tool_parameters,
|
||||
}
|
||||
if message_data.workflow_run_id:
|
||||
metadata["workflow_run_id"] = message_data.workflow_run_id
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
file_url = ""
|
||||
message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
|
||||
@ -891,6 +1070,8 @@ class TraceTask:
|
||||
"conversation_id": conversation_id,
|
||||
"tenant_id": tenant_id,
|
||||
}
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
generate_name_trace_info = GenerateNameTraceInfo(
|
||||
trace_id=self.trace_id,
|
||||
@ -905,6 +1086,158 @@ class TraceTask:
|
||||
|
||||
return generate_name_trace_info
|
||||
|
||||
def prompt_generation_trace(self, **kwargs) -> PromptGenerationTraceInfo | dict:
|
||||
tenant_id = kwargs.get("tenant_id", "")
|
||||
user_id = kwargs.get("user_id", "")
|
||||
app_id = kwargs.get("app_id")
|
||||
operation_type = kwargs.get("operation_type", "")
|
||||
instruction = kwargs.get("instruction", "")
|
||||
generated_output = kwargs.get("generated_output", "")
|
||||
|
||||
prompt_tokens = kwargs.get("prompt_tokens", 0)
|
||||
completion_tokens = kwargs.get("completion_tokens", 0)
|
||||
total_tokens = kwargs.get("total_tokens", 0)
|
||||
|
||||
model_provider = kwargs.get("model_provider", "")
|
||||
model_name = kwargs.get("model_name", "")
|
||||
|
||||
latency = kwargs.get("latency", 0.0)
|
||||
|
||||
timer = kwargs.get("timer")
|
||||
start_time = timer.get("start") if timer else None
|
||||
end_time = timer.get("end") if timer else None
|
||||
|
||||
total_price = kwargs.get("total_price")
|
||||
currency = kwargs.get("currency")
|
||||
|
||||
error = kwargs.get("error")
|
||||
|
||||
app_name = None
|
||||
workspace_name = None
|
||||
if app_id:
|
||||
app_name, workspace_name = _lookup_app_and_workspace_names(app_id, tenant_id)
|
||||
|
||||
metadata = {
|
||||
"tenant_id": tenant_id,
|
||||
"user_id": user_id,
|
||||
"app_id": app_id or "",
|
||||
"app_name": app_name,
|
||||
"workspace_name": workspace_name,
|
||||
"operation_type": operation_type,
|
||||
"model_provider": model_provider,
|
||||
"model_name": model_name,
|
||||
}
|
||||
if node_execution_id := kwargs.get("node_execution_id"):
|
||||
metadata["node_execution_id"] = node_execution_id
|
||||
|
||||
return PromptGenerationTraceInfo(
|
||||
trace_id=self.trace_id,
|
||||
inputs=instruction,
|
||||
outputs=generated_output,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
metadata=metadata,
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
app_id=app_id,
|
||||
operation_type=operation_type,
|
||||
instruction=instruction,
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
total_tokens=total_tokens,
|
||||
model_provider=model_provider,
|
||||
model_name=model_name,
|
||||
latency=latency,
|
||||
total_price=total_price,
|
||||
currency=currency,
|
||||
error=error,
|
||||
)
|
||||
|
||||
def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
|
||||
node_data: dict = kwargs.get("node_execution_data", {})
|
||||
if not node_data:
|
||||
return {}
|
||||
|
||||
app_name, workspace_name = _lookup_app_and_workspace_names(node_data.get("app_id"), node_data.get("tenant_id"))
|
||||
|
||||
credential_name = _lookup_credential_name(
|
||||
node_data.get("credential_id"), node_data.get("credential_provider_type")
|
||||
)
|
||||
|
||||
metadata: dict[str, Any] = {
|
||||
"tenant_id": node_data.get("tenant_id"),
|
||||
"app_id": node_data.get("app_id"),
|
||||
"app_name": app_name,
|
||||
"workspace_name": workspace_name,
|
||||
"user_id": node_data.get("user_id"),
|
||||
"dataset_ids": node_data.get("dataset_ids"),
|
||||
"dataset_names": node_data.get("dataset_names"),
|
||||
"plugin_name": node_data.get("plugin_name"),
|
||||
"credential_name": credential_name,
|
||||
}
|
||||
|
||||
parent_trace_context = node_data.get("parent_trace_context")
|
||||
if parent_trace_context:
|
||||
metadata["parent_trace_context"] = parent_trace_context
|
||||
|
||||
message_id: str | None = None
|
||||
conversation_id = node_data.get("conversation_id")
|
||||
workflow_execution_id = node_data.get("workflow_execution_id")
|
||||
if conversation_id and workflow_execution_id and not parent_trace_context:
|
||||
with Session(db.engine) as session:
|
||||
msg_id = session.scalar(
|
||||
select(Message.id).where(
|
||||
Message.conversation_id == conversation_id,
|
||||
Message.workflow_run_id == workflow_execution_id,
|
||||
)
|
||||
)
|
||||
if msg_id:
|
||||
message_id = str(msg_id)
|
||||
metadata["message_id"] = message_id
|
||||
|
||||
return WorkflowNodeTraceInfo(
|
||||
trace_id=self.trace_id,
|
||||
message_id=message_id,
|
||||
start_time=node_data.get("created_at"),
|
||||
end_time=node_data.get("finished_at"),
|
||||
metadata=metadata,
|
||||
workflow_id=node_data.get("workflow_id", ""),
|
||||
workflow_run_id=node_data.get("workflow_execution_id", ""),
|
||||
tenant_id=node_data.get("tenant_id", ""),
|
||||
node_execution_id=node_data.get("node_execution_id", ""),
|
||||
node_id=node_data.get("node_id", ""),
|
||||
node_type=node_data.get("node_type", ""),
|
||||
title=node_data.get("title", ""),
|
||||
status=node_data.get("status", ""),
|
||||
error=node_data.get("error"),
|
||||
elapsed_time=node_data.get("elapsed_time", 0.0),
|
||||
index=node_data.get("index", 0),
|
||||
predecessor_node_id=node_data.get("predecessor_node_id"),
|
||||
total_tokens=node_data.get("total_tokens", 0),
|
||||
total_price=node_data.get("total_price", 0.0),
|
||||
currency=node_data.get("currency"),
|
||||
model_provider=node_data.get("model_provider"),
|
||||
model_name=node_data.get("model_name"),
|
||||
prompt_tokens=node_data.get("prompt_tokens"),
|
||||
completion_tokens=node_data.get("completion_tokens"),
|
||||
tool_name=node_data.get("tool_name"),
|
||||
iteration_id=node_data.get("iteration_id"),
|
||||
iteration_index=node_data.get("iteration_index"),
|
||||
loop_id=node_data.get("loop_id"),
|
||||
loop_index=node_data.get("loop_index"),
|
||||
parallel_id=node_data.get("parallel_id"),
|
||||
node_inputs=node_data.get("node_inputs"),
|
||||
node_outputs=node_data.get("node_outputs"),
|
||||
process_data=node_data.get("process_data"),
|
||||
invoked_by=self._get_user_id_from_metadata(metadata),
|
||||
)
|
||||
|
||||
def draft_node_execution_trace(self, **kwargs) -> DraftNodeExecutionTrace | dict:
|
||||
node_trace = self.node_execution_trace(**kwargs)
|
||||
if not node_trace or not isinstance(node_trace, WorkflowNodeTraceInfo):
|
||||
return node_trace
|
||||
return DraftNodeExecutionTrace(**node_trace.model_dump())
|
||||
|
||||
def _extract_streaming_metrics(self, message_data) -> dict:
|
||||
if not message_data.message_metadata:
|
||||
return {}
|
||||
@ -938,13 +1271,17 @@ class TraceQueueManager:
|
||||
self.user_id = user_id
|
||||
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
||||
self.flask_app = current_app._get_current_object() # type: ignore
|
||||
|
||||
from core.telemetry import is_enterprise_telemetry_enabled
|
||||
|
||||
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
|
||||
if trace_manager_timer is None:
|
||||
self.start_timer()
|
||||
|
||||
def add_trace_task(self, trace_task: TraceTask):
|
||||
global trace_manager_timer, trace_manager_queue
|
||||
try:
|
||||
if self.trace_instance:
|
||||
if self._enterprise_telemetry_enabled or self.trace_instance:
|
||||
trace_task.app_id = self.app_id
|
||||
trace_manager_queue.put(trace_task)
|
||||
except Exception:
|
||||
@ -980,20 +1317,27 @@ class TraceQueueManager:
|
||||
def send_to_celery(self, tasks: list[TraceTask]):
|
||||
with self.flask_app.app_context():
|
||||
for task in tasks:
|
||||
if task.app_id is None:
|
||||
continue
|
||||
storage_id = task.app_id
|
||||
if storage_id is None:
|
||||
tenant_id = task.kwargs.get("tenant_id")
|
||||
if tenant_id:
|
||||
storage_id = f"tenant-{tenant_id}"
|
||||
else:
|
||||
logger.warning("Skipping trace without app_id or tenant_id, trace_type: %s", task.trace_type)
|
||||
continue
|
||||
|
||||
file_id = uuid4().hex
|
||||
trace_info = task.execute()
|
||||
|
||||
task_data = TaskData(
|
||||
app_id=task.app_id,
|
||||
app_id=storage_id,
|
||||
trace_info_type=type(trace_info).__name__,
|
||||
trace_info=trace_info.model_dump() if trace_info else None,
|
||||
)
|
||||
file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
|
||||
file_path = f"{OPS_FILE_PATH}{storage_id}/{file_id}.json"
|
||||
storage.save(file_path, task_data.model_dump_json().encode("utf-8"))
|
||||
file_info = {
|
||||
"file_id": file_id,
|
||||
"app_id": task.app_id,
|
||||
"app_id": storage_id,
|
||||
}
|
||||
process_trace_tasks.delay(file_info) # type: ignore
|
||||
|
||||
Reference in New Issue
Block a user