Merge remote-tracking branch 'origin/main' into feat/trigger

This commit is contained in:
lyzno1
2025-10-30 12:14:47 +08:00
37 changed files with 851 additions and 149 deletions

View File

@ -1,3 +1,4 @@
import json
import logging
import re
import time
@ -60,6 +61,7 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas
from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.enums import WorkflowExecutionStatus
from core.workflow.nodes import NodeType
@ -391,6 +393,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
if should_direct_answer:
return
current_time = time.perf_counter()
if self._task_state.first_token_time is None and delta_text.strip():
self._task_state.first_token_time = current_time
self._task_state.is_streaming_response = True
if delta_text.strip():
self._task_state.last_token_time = current_time
# Only publish tts message at text chunk streaming
if tts_publisher and queue_message:
tts_publisher.publish(queue_message)
@ -772,7 +782,33 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
message.answer = answer_text
message.updated_at = naive_utc_now()
message.provider_response_latency = time.perf_counter() - self._base_task_pipeline.start_at
message.message_metadata = self._task_state.metadata.model_dump_json()
# Set usage first before dumping metadata
if graph_runtime_state and graph_runtime_state.llm_usage:
usage = graph_runtime_state.llm_usage
message.message_tokens = usage.prompt_tokens
message.message_unit_price = usage.prompt_unit_price
message.message_price_unit = usage.prompt_price_unit
message.answer_tokens = usage.completion_tokens
message.answer_unit_price = usage.completion_unit_price
message.answer_price_unit = usage.completion_price_unit
message.total_price = usage.total_price
message.currency = usage.currency
self._task_state.metadata.usage = usage
else:
usage = LLMUsage.empty_usage()
self._task_state.metadata.usage = usage
# Add streaming metrics to usage if available
if self._task_state.is_streaming_response and self._task_state.first_token_time:
start_time = self._base_task_pipeline.start_at
first_token_time = self._task_state.first_token_time
last_token_time = self._task_state.last_token_time or first_token_time
usage.time_to_first_token = round(first_token_time - start_time, 3)
usage.time_to_generate = round(last_token_time - first_token_time, 3)
metadata = self._task_state.metadata.model_dump()
message.message_metadata = json.dumps(jsonable_encoder(metadata))
message_files = [
MessageFile(
message_id=message.id,
@ -790,20 +826,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
]
session.add_all(message_files)
if graph_runtime_state and graph_runtime_state.llm_usage:
usage = graph_runtime_state.llm_usage
message.message_tokens = usage.prompt_tokens
message.message_unit_price = usage.prompt_unit_price
message.message_price_unit = usage.prompt_price_unit
message.answer_tokens = usage.completion_tokens
message.answer_unit_price = usage.completion_unit_price
message.answer_price_unit = usage.completion_price_unit
message.total_price = usage.total_price
message.currency = usage.currency
self._task_state.metadata.usage = usage
else:
self._task_state.metadata.usage = LLMUsage.empty_usage()
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
"""Bootstrap the cached runtime state from the queue manager when present."""
candidate = self._base_task_pipeline.queue_manager.graph_runtime_state

View File

@ -48,6 +48,9 @@ class WorkflowTaskState(TaskState):
"""
answer: str = ""
first_token_time: float | None = None
last_token_time: float | None = None
is_streaming_response: bool = False
class StreamEvent(StrEnum):

View File

@ -29,6 +29,18 @@ def batch_fetch_plugin_manifests(plugin_ids: list[str]) -> Sequence[MarketplaceP
return [MarketplacePluginDeclaration.model_validate(plugin) for plugin in response.json()["data"]["plugins"]]
def batch_fetch_plugin_by_ids(plugin_ids: list[str]) -> list[dict]:
if not plugin_ids:
return []
url = str(marketplace_api_url / "api/v1/plugins/batch")
response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version})
response.raise_for_status()
data = response.json()
return data.get("data", {}).get("plugins", [])
def batch_fetch_plugin_manifests_ignore_deserialization_error(
plugin_ids: list[str],
) -> Sequence[MarketplacePluginDeclaration]:

View File

@ -38,6 +38,8 @@ class LLMUsageMetadata(TypedDict, total=False):
prompt_price: Union[float, str]
completion_price: Union[float, str]
latency: float
time_to_first_token: float
time_to_generate: float
class LLMUsage(ModelUsage):
@ -57,6 +59,8 @@ class LLMUsage(ModelUsage):
total_price: Decimal
currency: str
latency: float
time_to_first_token: float | None = None
time_to_generate: float | None = None
@classmethod
def empty_usage(cls):
@ -73,6 +77,8 @@ class LLMUsage(ModelUsage):
total_price=Decimal("0.0"),
currency="USD",
latency=0.0,
time_to_first_token=None,
time_to_generate=None,
)
@classmethod
@ -108,6 +114,8 @@ class LLMUsage(ModelUsage):
prompt_price=Decimal(str(metadata.get("prompt_price", 0))),
completion_price=Decimal(str(metadata.get("completion_price", 0))),
latency=metadata.get("latency", 0.0),
time_to_first_token=metadata.get("time_to_first_token"),
time_to_generate=metadata.get("time_to_generate"),
)
def plus(self, other: LLMUsage) -> LLMUsage:
@ -133,6 +141,8 @@ class LLMUsage(ModelUsage):
total_price=self.total_price + other.total_price,
currency=other.currency,
latency=self.latency + other.latency,
time_to_first_token=other.time_to_first_token,
time_to_generate=other.time_to_generate,
)
def __add__(self, other: LLMUsage) -> LLMUsage:

View File

@ -62,6 +62,9 @@ class MessageTraceInfo(BaseTraceInfo):
file_list: Union[str, dict[str, Any], list] | None = None
message_file_data: Any | None = None
conversation_mode: str
gen_ai_server_time_to_first_token: float | None = None
llm_streaming_time_to_generate: float | None = None
is_streaming_request: bool = False
class ModerationTraceInfo(BaseTraceInfo):

View File

@ -14,7 +14,7 @@ from flask import current_app
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
from core.helper.encrypter import batch_decrypt_token, encrypt_token, obfuscated_token
from core.ops.entities.config_entity import (
OPS_FILE_PATH,
TracingProviderEnum,
@ -141,6 +141,8 @@ provider_config_map = OpsTraceProviderConfigMap()
class OpsTraceManager:
ops_trace_instances_cache: LRUCache = LRUCache(maxsize=128)
decrypted_configs_cache: LRUCache = LRUCache(maxsize=128)
_decryption_cache_lock = threading.RLock()
@classmethod
def encrypt_tracing_config(
@ -161,7 +163,7 @@ class OpsTraceManager:
provider_config_map[tracing_provider]["other_keys"],
)
new_config = {}
new_config: dict[str, Any] = {}
# Encrypt necessary keys
for key in secret_keys:
if key in tracing_config:
@ -191,20 +193,41 @@ class OpsTraceManager:
:param tracing_config: tracing config
:return:
"""
config_class, secret_keys, other_keys = (
provider_config_map[tracing_provider]["config_class"],
provider_config_map[tracing_provider]["secret_keys"],
provider_config_map[tracing_provider]["other_keys"],
config_json = json.dumps(tracing_config, sort_keys=True)
decrypted_config_key = (
tenant_id,
tracing_provider,
config_json,
)
new_config = {}
for key in secret_keys:
if key in tracing_config:
new_config[key] = decrypt_token(tenant_id, tracing_config[key])
for key in other_keys:
new_config[key] = tracing_config.get(key, "")
# First check without lock for performance
cached_config = cls.decrypted_configs_cache.get(decrypted_config_key)
if cached_config is not None:
return dict(cached_config)
return config_class(**new_config).model_dump()
with cls._decryption_cache_lock:
# Second check (double-checked locking) to prevent race conditions
cached_config = cls.decrypted_configs_cache.get(decrypted_config_key)
if cached_config is not None:
return dict(cached_config)
config_class, secret_keys, other_keys = (
provider_config_map[tracing_provider]["config_class"],
provider_config_map[tracing_provider]["secret_keys"],
provider_config_map[tracing_provider]["other_keys"],
)
new_config: dict[str, Any] = {}
keys_to_decrypt = [key for key in secret_keys if key in tracing_config]
if keys_to_decrypt:
decrypted_values = batch_decrypt_token(tenant_id, [tracing_config[key] for key in keys_to_decrypt])
new_config.update(zip(keys_to_decrypt, decrypted_values))
for key in other_keys:
new_config[key] = tracing_config.get(key, "")
decrypted_config = config_class(**new_config).model_dump()
cls.decrypted_configs_cache[decrypted_config_key] = decrypted_config
return dict(decrypted_config)
@classmethod
def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
@ -219,7 +242,7 @@ class OpsTraceManager:
provider_config_map[tracing_provider]["secret_keys"],
provider_config_map[tracing_provider]["other_keys"],
)
new_config = {}
new_config: dict[str, Any] = {}
for key in secret_keys:
if key in decrypt_tracing_config:
new_config[key] = obfuscated_token(decrypt_tracing_config[key])
@ -596,6 +619,8 @@ class TraceTask:
file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
file_list.append(file_url)
streaming_metrics = self._extract_streaming_metrics(message_data)
metadata = {
"conversation_id": message_data.conversation_id,
"ls_provider": message_data.model_provider,
@ -628,6 +653,9 @@ class TraceTask:
metadata=metadata,
message_file_data=message_file_data,
conversation_mode=conversation_mode,
gen_ai_server_time_to_first_token=streaming_metrics.get("gen_ai_server_time_to_first_token"),
llm_streaming_time_to_generate=streaming_metrics.get("llm_streaming_time_to_generate"),
is_streaming_request=streaming_metrics.get("is_streaming_request", False),
)
return message_trace_info
@ -853,6 +881,24 @@ class TraceTask:
return generate_name_trace_info
def _extract_streaming_metrics(self, message_data) -> dict:
if not message_data.message_metadata:
return {}
try:
metadata = json.loads(message_data.message_metadata)
usage = metadata.get("usage", {})
time_to_first_token = usage.get("time_to_first_token")
time_to_generate = usage.get("time_to_generate")
return {
"gen_ai_server_time_to_first_token": time_to_first_token,
"llm_streaming_time_to_generate": time_to_generate,
"is_streaming_request": time_to_first_token is not None,
}
except (json.JSONDecodeError, AttributeError):
return {}
trace_manager_timer: threading.Timer | None = None
trace_manager_queue: queue.Queue = queue.Queue()

View File

@ -11,6 +11,11 @@ import socket
from typing import TYPE_CHECKING
from urllib.parse import urlparse
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version # type: ignore[import-not-found]
if TYPE_CHECKING:
from opentelemetry.metrics import Meter
from opentelemetry.metrics._internal.instrument import Histogram
@ -27,12 +32,27 @@ from opentelemetry.util.types import AttributeValue
from configs import dify_config
from .entities.tencent_semconv import LLM_OPERATION_DURATION
from .entities.semconv import (
GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
GEN_AI_STREAMING_TIME_TO_GENERATE,
GEN_AI_TOKEN_USAGE,
GEN_AI_TRACE_DURATION,
LLM_OPERATION_DURATION,
)
from .entities.tencent_trace_entity import SpanData
logger = logging.getLogger(__name__)
def _get_opentelemetry_sdk_version() -> str:
"""Get OpenTelemetry SDK version dynamically."""
try:
return version("opentelemetry-sdk")
except Exception:
logger.debug("Failed to get opentelemetry-sdk version, using default")
return "1.27.0" # fallback version
class TencentTraceClient:
"""Tencent APM trace client using OpenTelemetry OTLP exporter"""
@ -57,6 +77,9 @@ class TencentTraceClient:
ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.project.version}-{dify_config.COMMIT_SHA}",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
ResourceAttributes.HOST_NAME: socket.gethostname(),
ResourceAttributes.TELEMETRY_SDK_LANGUAGE: "python",
ResourceAttributes.TELEMETRY_SDK_NAME: "opentelemetry",
ResourceAttributes.TELEMETRY_SDK_VERSION: _get_opentelemetry_sdk_version(),
}
)
# Prepare gRPC endpoint/metadata
@ -80,13 +103,18 @@ class TencentTraceClient:
)
self.tracer_provider.add_span_processor(self.span_processor)
self.tracer = self.tracer_provider.get_tracer("dify.tencent_apm")
# use dify api version as tracer version
self.tracer = self.tracer_provider.get_tracer("dify-sdk", dify_config.project.version)
# Store span contexts for parent-child relationships
self.span_contexts: dict[int, trace_api.SpanContext] = {}
self.meter: Meter | None = None
self.hist_llm_duration: Histogram | None = None
self.hist_token_usage: Histogram | None = None
self.hist_time_to_first_token: Histogram | None = None
self.hist_time_to_generate: Histogram | None = None
self.hist_trace_duration: Histogram | None = None
self.metric_reader: MetricReader | None = None
# Metrics exporter and instruments
@ -99,7 +127,7 @@ class TencentTraceClient:
use_http_protobuf = protocol in {"http/protobuf", "http-protobuf"}
use_http_json = protocol in {"http/json", "http-json"}
# Set preferred temporality for histograms to DELTA
# Tencent APM works best with delta aggregation temporality
preferred_temporality: dict[type, AggregationTemporality] = {Histogram: AggregationTemporality.DELTA}
def _create_metric_exporter(exporter_cls, **kwargs):
@ -177,20 +205,59 @@ class TencentTraceClient:
provider = MeterProvider(resource=self.resource, metric_readers=[metric_reader])
metrics.set_meter_provider(provider)
self.meter = metrics.get_meter("dify-sdk", dify_config.project.version)
# LLM operation duration histogram
self.hist_llm_duration = self.meter.create_histogram(
name=LLM_OPERATION_DURATION,
unit="s",
description="LLM operation duration (seconds)",
)
# Token usage histogram with exponential buckets
self.hist_token_usage = self.meter.create_histogram(
name=GEN_AI_TOKEN_USAGE,
unit="token",
description="Number of tokens used in prompt and completions",
)
# Time to first token histogram
self.hist_time_to_first_token = self.meter.create_histogram(
name=GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
unit="s",
description="Time to first token for streaming LLM responses (seconds)",
)
# Time to generate histogram
self.hist_time_to_generate = self.meter.create_histogram(
name=GEN_AI_STREAMING_TIME_TO_GENERATE,
unit="s",
description="Total time to generate streaming LLM responses (seconds)",
)
# Trace duration histogram
self.hist_trace_duration = self.meter.create_histogram(
name=GEN_AI_TRACE_DURATION,
unit="s",
description="End-to-end GenAI trace duration (seconds)",
)
self.metric_reader = metric_reader
else:
self.meter = None
self.hist_llm_duration = None
self.hist_token_usage = None
self.hist_time_to_first_token = None
self.hist_time_to_generate = None
self.hist_trace_duration = None
self.metric_reader = None
except Exception:
logger.exception("[Tencent APM] Metrics initialization failed; metrics disabled")
self.meter = None
self.hist_llm_duration = None
self.hist_token_usage = None
self.hist_time_to_first_token = None
self.hist_time_to_generate = None
self.hist_trace_duration = None
self.metric_reader = None
def add_span(self, span_data: SpanData) -> None:
@ -216,6 +283,117 @@ class TencentTraceClient:
except Exception:
logger.debug("[Tencent APM] Failed to record LLM duration", exc_info=True)
def record_token_usage(
self,
token_count: int,
token_type: str,
operation_name: str,
request_model: str,
response_model: str,
server_address: str,
provider: str,
) -> None:
"""Record token usage histogram.
Args:
token_count: Number of tokens used
token_type: "input" or "output"
operation_name: Operation name (e.g., "chat")
request_model: Model used in request
response_model: Model used in response
server_address: Server address
provider: Model provider name
"""
try:
if not hasattr(self, "hist_token_usage") or self.hist_token_usage is None:
return
attributes = {
"gen_ai.operation.name": operation_name,
"gen_ai.request.model": request_model,
"gen_ai.response.model": response_model,
"gen_ai.system": provider,
"gen_ai.token.type": token_type,
"server.address": server_address,
}
self.hist_token_usage.record(token_count, attributes) # type: ignore[attr-defined]
except Exception:
logger.debug("[Tencent APM] Failed to record token usage", exc_info=True)
def record_time_to_first_token(
self, ttft_seconds: float, provider: str, model: str, operation_name: str = "chat"
) -> None:
"""Record time to first token histogram.
Args:
ttft_seconds: Time to first token in seconds
provider: Model provider name
model: Model name
operation_name: Operation name (default: "chat")
"""
try:
if not hasattr(self, "hist_time_to_first_token") or self.hist_time_to_first_token is None:
return
attributes = {
"gen_ai.operation.name": operation_name,
"gen_ai.system": provider,
"gen_ai.request.model": model,
"gen_ai.response.model": model,
"stream": "true",
}
self.hist_time_to_first_token.record(ttft_seconds, attributes) # type: ignore[attr-defined]
except Exception:
logger.debug("[Tencent APM] Failed to record time to first token", exc_info=True)
def record_time_to_generate(
self, ttg_seconds: float, provider: str, model: str, operation_name: str = "chat"
) -> None:
"""Record time to generate histogram.
Args:
ttg_seconds: Time to generate in seconds
provider: Model provider name
model: Model name
operation_name: Operation name (default: "chat")
"""
try:
if not hasattr(self, "hist_time_to_generate") or self.hist_time_to_generate is None:
return
attributes = {
"gen_ai.operation.name": operation_name,
"gen_ai.system": provider,
"gen_ai.request.model": model,
"gen_ai.response.model": model,
"stream": "true",
}
self.hist_time_to_generate.record(ttg_seconds, attributes) # type: ignore[attr-defined]
except Exception:
logger.debug("[Tencent APM] Failed to record time to generate", exc_info=True)
def record_trace_duration(self, duration_seconds: float, attributes: dict[str, str] | None = None) -> None:
"""Record end-to-end trace duration histogram in seconds.
Args:
duration_seconds: Trace duration in seconds
attributes: Optional attributes (e.g., conversation_mode, app_id)
"""
try:
if not hasattr(self, "hist_trace_duration") or self.hist_trace_duration is None:
return
attrs: dict[str, str] = {}
if attributes:
for k, v in attributes.items():
attrs[k] = str(v) if not isinstance(v, (str, int, float, bool)) else v # type: ignore[assignment]
self.hist_trace_duration.record(duration_seconds, attrs) # type: ignore[attr-defined]
except Exception:
logger.debug("[Tencent APM] Failed to record trace duration", exc_info=True)
def _create_and_export_span(self, span_data: SpanData) -> None:
"""Create span using OpenTelemetry Tracer API"""
try:

View File

@ -47,6 +47,9 @@ GEN_AI_COMPLETION = "gen_ai.completion"
GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"
# Streaming Span Attributes
GEN_AI_IS_STREAMING_REQUEST = "llm.is_streaming" # Same as OpenLLMetry semconv
# Tool
TOOL_NAME = "tool.name"
@ -62,6 +65,19 @@ INSTRUMENTATION_LANGUAGE = "python"
# Metrics
LLM_OPERATION_DURATION = "gen_ai.client.operation.duration"
GEN_AI_TOKEN_USAGE = "gen_ai.client.token.usage"
GEN_AI_SERVER_TIME_TO_FIRST_TOKEN = "gen_ai.server.time_to_first_token"
GEN_AI_STREAMING_TIME_TO_GENERATE = "gen_ai.streaming.time_to_generate"
# The LLM trace duration which is exclusive to tencent apm
GEN_AI_TRACE_DURATION = "gen_ai.trace.duration"
# Token Usage Attributes
GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
GEN_AI_RESPONSE_MODEL = "gen_ai.response.model"
GEN_AI_SYSTEM = "gen_ai.system"
GEN_AI_TOKEN_TYPE = "gen_ai.token.type"
SERVER_ADDRESS = "server.address"
class GenAISpanKind(Enum):

View File

@ -14,10 +14,11 @@ from core.ops.entities.trace_entity import (
ToolTraceInfo,
WorkflowTraceInfo,
)
from core.ops.tencent_trace.entities.tencent_semconv import (
from core.ops.tencent_trace.entities.semconv import (
GEN_AI_COMPLETION,
GEN_AI_FRAMEWORK,
GEN_AI_IS_ENTRY,
GEN_AI_IS_STREAMING_REQUEST,
GEN_AI_MODEL_NAME,
GEN_AI_PROMPT,
GEN_AI_PROVIDER,
@ -156,6 +157,25 @@ class TencentSpanBuilder:
outputs = node_execution.outputs or {}
usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})
attributes = {
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
GEN_AI_PROVIDER: process_data.get("model_provider", ""),
GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
GEN_AI_COMPLETION: str(outputs.get("text", "")),
GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
OUTPUT_VALUE: str(outputs.get("text", "")),
}
if usage_data.get("time_to_first_token") is not None:
attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
return SpanData(
trace_id=trace_id,
parent_span_id=workflow_span_id,
@ -163,21 +183,7 @@ class TencentSpanBuilder:
name="GENERATION",
start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
GEN_AI_PROVIDER: process_data.get("model_provider", ""),
GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
GEN_AI_COMPLETION: str(outputs.get("text", "")),
GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
OUTPUT_VALUE: str(outputs.get("text", "")),
},
attributes=attributes,
status=TencentSpanBuilder._get_workflow_node_status(node_execution),
)
@ -191,6 +197,19 @@ class TencentSpanBuilder:
if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
attributes = {
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_IS_ENTRY: "true",
INPUT_VALUE: str(trace_info.inputs or ""),
OUTPUT_VALUE: str(trace_info.outputs or ""),
}
if trace_info.is_streaming_request:
attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
return SpanData(
trace_id=trace_id,
parent_span_id=None,
@ -198,15 +217,7 @@ class TencentSpanBuilder:
name="message",
start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_IS_ENTRY: "true",
INPUT_VALUE: str(trace_info.inputs or ""),
OUTPUT_VALUE: str(trace_info.outputs or ""),
},
attributes=attributes,
status=status,
links=links,
)

View File

@ -90,6 +90,9 @@ class TencentDataTrace(BaseTraceInstance):
self._process_workflow_nodes(trace_info, trace_id)
# Record trace duration for entry span
self._record_workflow_trace_duration(trace_info)
except Exception:
logger.exception("[Tencent APM] Failed to process workflow trace")
@ -107,6 +110,11 @@ class TencentDataTrace(BaseTraceInstance):
self.trace_client.add_span(message_span)
self._record_message_llm_metrics(trace_info)
# Record trace duration for entry span
self._record_message_trace_duration(trace_info)
except Exception:
logger.exception("[Tencent APM] Failed to process message trace")
@ -290,24 +298,219 @@ class TencentDataTrace(BaseTraceInstance):
def _record_llm_metrics(self, node_execution: WorkflowNodeExecution) -> None:
"""Record LLM performance metrics"""
try:
if not hasattr(self.trace_client, "record_llm_duration"):
return
process_data = node_execution.process_data or {}
usage = process_data.get("usage", {})
latency_s = float(usage.get("latency", 0.0))
outputs = node_execution.outputs or {}
usage = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})
if latency_s > 0:
attributes = {
"provider": process_data.get("model_provider", ""),
"model": process_data.get("model_name", ""),
"span_kind": "GENERATION",
}
self.trace_client.record_llm_duration(latency_s, attributes)
model_provider = process_data.get("model_provider", "unknown")
model_name = process_data.get("model_name", "unknown")
model_mode = process_data.get("model_mode", "chat")
# Record LLM duration
if hasattr(self.trace_client, "record_llm_duration"):
latency_s = float(usage.get("latency", 0.0))
if latency_s > 0:
# Determine if streaming from usage metrics
is_streaming = usage.get("time_to_first_token") is not None
attributes = {
"gen_ai.system": model_provider,
"gen_ai.response.model": model_name,
"gen_ai.operation.name": model_mode,
"stream": "true" if is_streaming else "false",
}
self.trace_client.record_llm_duration(latency_s, attributes)
# Record streaming metrics from usage
time_to_first_token = usage.get("time_to_first_token")
if time_to_first_token is not None and hasattr(self.trace_client, "record_time_to_first_token"):
ttft_seconds = float(time_to_first_token)
if ttft_seconds > 0:
self.trace_client.record_time_to_first_token(
ttft_seconds=ttft_seconds, provider=model_provider, model=model_name, operation_name=model_mode
)
time_to_generate = usage.get("time_to_generate")
if time_to_generate is not None and hasattr(self.trace_client, "record_time_to_generate"):
ttg_seconds = float(time_to_generate)
if ttg_seconds > 0:
self.trace_client.record_time_to_generate(
ttg_seconds=ttg_seconds, provider=model_provider, model=model_name, operation_name=model_mode
)
# Record token usage
if hasattr(self.trace_client, "record_token_usage"):
# Extract token counts
input_tokens = int(usage.get("prompt_tokens", 0))
output_tokens = int(usage.get("completion_tokens", 0))
if input_tokens > 0 or output_tokens > 0:
server_address = f"{model_provider}"
# Record input tokens
if input_tokens > 0:
self.trace_client.record_token_usage(
token_count=input_tokens,
token_type="input",
operation_name=model_mode,
request_model=model_name,
response_model=model_name,
server_address=server_address,
provider=model_provider,
)
# Record output tokens
if output_tokens > 0:
self.trace_client.record_token_usage(
token_count=output_tokens,
token_type="output",
operation_name=model_mode,
request_model=model_name,
response_model=model_name,
server_address=server_address,
provider=model_provider,
)
except Exception:
logger.debug("[Tencent APM] Failed to record LLM metrics")
def _record_message_llm_metrics(self, trace_info: MessageTraceInfo) -> None:
"""Record LLM metrics for message traces"""
try:
trace_metadata = trace_info.metadata or {}
message_data = trace_info.message_data or {}
provider_latency = 0.0
if isinstance(message_data, dict):
provider_latency = float(message_data.get("provider_response_latency", 0.0) or 0.0)
else:
provider_latency = float(getattr(message_data, "provider_response_latency", 0.0) or 0.0)
model_provider = trace_metadata.get("ls_provider") or (
message_data.get("model_provider", "") if isinstance(message_data, dict) else ""
)
model_name = trace_metadata.get("ls_model_name") or (
message_data.get("model_id", "") if isinstance(message_data, dict) else ""
)
# Record LLM duration
if provider_latency > 0 and hasattr(self.trace_client, "record_llm_duration"):
is_streaming = trace_info.is_streaming_request
duration_attributes = {
"gen_ai.system": model_provider,
"gen_ai.response.model": model_name,
"gen_ai.operation.name": "chat", # Message traces are always chat
"stream": "true" if is_streaming else "false",
}
self.trace_client.record_llm_duration(provider_latency, duration_attributes)
# Record streaming metrics for message traces
if trace_info.is_streaming_request:
# Record time to first token
if trace_info.gen_ai_server_time_to_first_token is not None and hasattr(
self.trace_client, "record_time_to_first_token"
):
ttft_seconds = float(trace_info.gen_ai_server_time_to_first_token)
if ttft_seconds > 0:
self.trace_client.record_time_to_first_token(
ttft_seconds=ttft_seconds, provider=str(model_provider or ""), model=str(model_name or "")
)
# Record time to generate
if trace_info.llm_streaming_time_to_generate is not None and hasattr(
self.trace_client, "record_time_to_generate"
):
ttg_seconds = float(trace_info.llm_streaming_time_to_generate)
if ttg_seconds > 0:
self.trace_client.record_time_to_generate(
ttg_seconds=ttg_seconds, provider=str(model_provider or ""), model=str(model_name or "")
)
# Record token usage
if hasattr(self.trace_client, "record_token_usage"):
input_tokens = int(trace_info.message_tokens or 0)
output_tokens = int(trace_info.answer_tokens or 0)
if input_tokens > 0:
self.trace_client.record_token_usage(
token_count=input_tokens,
token_type="input",
operation_name="chat",
request_model=str(model_name or ""),
response_model=str(model_name or ""),
server_address=str(model_provider or ""),
provider=str(model_provider or ""),
)
if output_tokens > 0:
self.trace_client.record_token_usage(
token_count=output_tokens,
token_type="output",
operation_name="chat",
request_model=str(model_name or ""),
response_model=str(model_name or ""),
server_address=str(model_provider or ""),
provider=str(model_provider or ""),
)
except Exception:
logger.debug("[Tencent APM] Failed to record message LLM metrics")
def _record_workflow_trace_duration(self, trace_info: WorkflowTraceInfo) -> None:
"""Record end-to-end workflow trace duration."""
try:
if not hasattr(self.trace_client, "record_trace_duration"):
return
# Calculate duration from start_time and end_time to match span duration
if trace_info.start_time and trace_info.end_time:
duration_s = (trace_info.end_time - trace_info.start_time).total_seconds()
else:
# Fallback to workflow_run_elapsed_time if timestamps not available
duration_s = float(trace_info.workflow_run_elapsed_time)
if duration_s > 0:
attributes = {
"conversation_mode": "workflow",
"workflow_status": trace_info.workflow_run_status,
}
# Add conversation_id if available
if trace_info.conversation_id:
attributes["has_conversation"] = "true"
else:
attributes["has_conversation"] = "false"
self.trace_client.record_trace_duration(duration_s, attributes)
except Exception:
logger.debug("[Tencent APM] Failed to record workflow trace duration")
def _record_message_trace_duration(self, trace_info: MessageTraceInfo) -> None:
"""Record end-to-end message trace duration."""
try:
if not hasattr(self.trace_client, "record_trace_duration"):
return
# Calculate duration from start_time and end_time
if trace_info.start_time and trace_info.end_time:
duration = (trace_info.end_time - trace_info.start_time).total_seconds()
if duration > 0:
attributes = {
"conversation_mode": trace_info.conversation_mode,
}
# Add streaming flag if available
if hasattr(trace_info, "is_streaming_request"):
attributes["stream"] = "true" if trace_info.is_streaming_request else "false"
self.trace_client.record_trace_duration(duration, attributes)
except Exception:
logger.debug("[Tencent APM] Failed to record message trace duration")
def __del__(self):
"""Ensure proper cleanup on garbage collection."""
try:

View File

@ -100,6 +100,7 @@ class WeaviateVector(BaseVector):
grpc_port=grpc_port,
grpc_secure=grpc_secure,
auth_credentials=Auth.api_key(config.api_key) if config.api_key else None,
skip_init_checks=True, # Skip PyPI version check to avoid unnecessary HTTP requests
)
if not client.is_ready():

View File

@ -104,7 +104,7 @@ class HttpRequestNode(Node):
status=WorkflowNodeExecutionStatus.FAILED,
outputs={
"status_code": response.status_code,
"body": response.text if not files else "",
"body": response.text if not files.value else "",
"headers": response.headers,
"files": files,
},

View File

@ -3,6 +3,7 @@ import io
import json
import logging
import re
import time
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal
@ -384,6 +385,8 @@ class LLMNode(Node):
output_schema = LLMNode.fetch_structured_output_schema(
structured_output=structured_output or {},
)
request_start_time = time.perf_counter()
invoke_result = invoke_llm_with_structured_output(
provider=model_instance.provider,
model_schema=model_schema,
@ -396,6 +399,8 @@ class LLMNode(Node):
user=user_id,
)
else:
request_start_time = time.perf_counter()
invoke_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages),
model_parameters=node_data_model.completion_params,
@ -411,6 +416,7 @@ class LLMNode(Node):
node_id=node_id,
node_type=node_type,
reasoning_format=reasoning_format,
request_start_time=request_start_time,
)
@staticmethod
@ -422,14 +428,20 @@ class LLMNode(Node):
node_id: str,
node_type: NodeType,
reasoning_format: Literal["separated", "tagged"] = "tagged",
request_start_time: float | None = None,
) -> Generator[NodeEventBase | LLMStructuredOutput, None, None]:
# For blocking mode
if isinstance(invoke_result, LLMResult):
duration = None
if request_start_time is not None:
duration = time.perf_counter() - request_start_time
invoke_result.usage.latency = round(duration, 3)
event = LLMNode.handle_blocking_result(
invoke_result=invoke_result,
saver=file_saver,
file_outputs=file_outputs,
reasoning_format=reasoning_format,
request_latency=duration,
)
yield event
return
@ -441,6 +453,12 @@ class LLMNode(Node):
usage = LLMUsage.empty_usage()
finish_reason = None
full_text_buffer = io.StringIO()
# Initialize streaming metrics tracking
start_time = request_start_time if request_start_time is not None else time.perf_counter()
first_token_time = None
has_content = False
collected_structured_output = None # Collect structured_output from streaming chunks
# Consume the invoke result and handle generator exception
try:
@ -457,6 +475,11 @@ class LLMNode(Node):
file_saver=file_saver,
file_outputs=file_outputs,
):
# Detect first token for TTFT calculation
if text_part and not has_content:
first_token_time = time.perf_counter()
has_content = True
full_text_buffer.write(text_part)
yield StreamChunkEvent(
selector=[node_id, "text"],
@ -489,6 +512,16 @@ class LLMNode(Node):
# Extract clean text and reasoning from <think> tags
clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)
# Calculate streaming metrics
end_time = time.perf_counter()
total_duration = end_time - start_time
usage.latency = round(total_duration, 3)
if has_content and first_token_time:
gen_ai_server_time_to_first_token = first_token_time - start_time
llm_streaming_time_to_generate = end_time - first_token_time
usage.time_to_first_token = round(gen_ai_server_time_to_first_token, 3)
usage.time_to_generate = round(llm_streaming_time_to_generate, 3)
yield ModelInvokeCompletedEvent(
# Use clean_text for separated mode, full_text for tagged mode
text=clean_text if reasoning_format == "separated" else full_text,
@ -1068,6 +1101,7 @@ class LLMNode(Node):
saver: LLMFileSaver,
file_outputs: list["File"],
reasoning_format: Literal["separated", "tagged"] = "tagged",
request_latency: float | None = None,
) -> ModelInvokeCompletedEvent:
buffer = io.StringIO()
for text_part in LLMNode._save_multimodal_output_and_convert_result_to_markdown(
@ -1088,7 +1122,7 @@ class LLMNode(Node):
# Extract clean text and reasoning from <think> tags
clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)
return ModelInvokeCompletedEvent(
event = ModelInvokeCompletedEvent(
# Use clean_text for separated mode, full_text for tagged mode
text=clean_text if reasoning_format == "separated" else full_text,
usage=invoke_result.usage,
@ -1098,6 +1132,9 @@ class LLMNode(Node):
# Pass structured output if enabled
structured_output=getattr(invoke_result, "structured_output", None),
)
if request_latency is not None:
event.usage.latency = round(request_latency, 3)
return event
@staticmethod
def save_multimodal_image_output(