mirror of
https://github.com/langgenius/dify.git
synced 2026-05-01 16:08:04 +08:00
Merge branch 'main' into feat/mcp-06-18
This commit is contained in:
@ -472,6 +472,9 @@ class ProviderConfiguration(BaseModel):
|
||||
provider_model_credentials_cache.delete()
|
||||
|
||||
self.switch_preferred_provider_type(provider_type=ProviderType.CUSTOM, session=session)
|
||||
else:
|
||||
# some historical data may have a provider record but not be set as valid
|
||||
provider_record.is_valid = True
|
||||
|
||||
session.commit()
|
||||
except Exception:
|
||||
|
||||
@ -7,7 +7,7 @@ import uuid
|
||||
from collections import deque
|
||||
from collections.abc import Sequence
|
||||
from datetime import datetime
|
||||
from typing import Final
|
||||
from typing import Final, cast
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import httpx
|
||||
@ -199,7 +199,7 @@ def convert_to_trace_id(uuid_v4: str | None) -> int:
|
||||
raise ValueError("UUID cannot be None")
|
||||
try:
|
||||
uuid_obj = uuid.UUID(uuid_v4)
|
||||
return uuid_obj.int
|
||||
return cast(int, uuid_obj.int)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid UUID input: {uuid_v4}") from e
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@ class TracingProviderEnum(StrEnum):
|
||||
OPIK = "opik"
|
||||
WEAVE = "weave"
|
||||
ALIYUN = "aliyun"
|
||||
TENCENT = "tencent"
|
||||
|
||||
|
||||
class BaseTracingConfig(BaseModel):
|
||||
@ -195,5 +196,32 @@ class AliyunConfig(BaseTracingConfig):
|
||||
return validate_url_with_path(v, "https://tracing-analysis-dc-hz.aliyuncs.com")
|
||||
|
||||
|
||||
class TencentConfig(BaseTracingConfig):
|
||||
"""
|
||||
Tencent APM tracing config
|
||||
"""
|
||||
|
||||
token: str
|
||||
endpoint: str
|
||||
service_name: str
|
||||
|
||||
@field_validator("token")
|
||||
@classmethod
|
||||
def token_validator(cls, v, info: ValidationInfo):
|
||||
if not v or v.strip() == "":
|
||||
raise ValueError("Token cannot be empty")
|
||||
return v
|
||||
|
||||
@field_validator("endpoint")
|
||||
@classmethod
|
||||
def endpoint_validator(cls, v, info: ValidationInfo):
|
||||
return cls.validate_endpoint_url(v, "https://apm.tencentcloudapi.com")
|
||||
|
||||
@field_validator("service_name")
|
||||
@classmethod
|
||||
def service_name_validator(cls, v, info: ValidationInfo):
|
||||
return cls.validate_project_field(v, "dify_app")
|
||||
|
||||
|
||||
OPS_FILE_PATH = "ops_trace/"
|
||||
OPS_TRACE_FAILED_KEY = "FAILED_OPS_TRACE"
|
||||
|
||||
@ -90,6 +90,7 @@ class SuggestedQuestionTraceInfo(BaseTraceInfo):
|
||||
|
||||
class DatasetRetrievalTraceInfo(BaseTraceInfo):
|
||||
documents: Any = None
|
||||
error: str | None = None
|
||||
|
||||
|
||||
class ToolTraceInfo(BaseTraceInfo):
|
||||
|
||||
@ -120,6 +120,17 @@ class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]):
|
||||
"trace_instance": AliyunDataTrace,
|
||||
}
|
||||
|
||||
case TracingProviderEnum.TENCENT:
|
||||
from core.ops.entities.config_entity import TencentConfig
|
||||
from core.ops.tencent_trace.tencent_trace import TencentDataTrace
|
||||
|
||||
return {
|
||||
"config_class": TencentConfig,
|
||||
"secret_keys": ["token"],
|
||||
"other_keys": ["endpoint", "service_name"],
|
||||
"trace_instance": TencentDataTrace,
|
||||
}
|
||||
|
||||
case _:
|
||||
raise KeyError(f"Unsupported tracing provider: {provider}")
|
||||
|
||||
@ -723,6 +734,7 @@ class TraceTask:
|
||||
end_time=timer.get("end"),
|
||||
metadata=metadata,
|
||||
message_data=message_data.to_dict(),
|
||||
error=kwargs.get("error"),
|
||||
)
|
||||
|
||||
return dataset_retrieval_trace_info
|
||||
@ -889,6 +901,7 @@ class TraceQueueManager:
|
||||
continue
|
||||
file_id = uuid4().hex
|
||||
trace_info = task.execute()
|
||||
|
||||
task_data = TaskData(
|
||||
app_id=task.app_id,
|
||||
trace_info_type=type(trace_info).__name__,
|
||||
|
||||
0
api/core/ops/tencent_trace/__init__.py
Normal file
0
api/core/ops/tencent_trace/__init__.py
Normal file
337
api/core/ops/tencent_trace/client.py
Normal file
337
api/core/ops/tencent_trace/client.py
Normal file
@ -0,0 +1,337 @@
|
||||
"""
|
||||
Tencent APM Trace Client - handles network operations, metrics, and API communication
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
from typing import TYPE_CHECKING
|
||||
from urllib.parse import urlparse
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from opentelemetry.metrics import Meter
|
||||
from opentelemetry.metrics._internal.instrument import Histogram
|
||||
from opentelemetry.sdk.metrics.export import MetricReader
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.semconv.resource import ResourceAttributes
|
||||
from opentelemetry.trace import SpanKind
|
||||
from opentelemetry.util.types import AttributeValue
|
||||
|
||||
from configs import dify_config
|
||||
|
||||
from .entities.tencent_semconv import LLM_OPERATION_DURATION
|
||||
from .entities.tencent_trace_entity import SpanData
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TencentTraceClient:
|
||||
"""Tencent APM trace client using OpenTelemetry OTLP exporter"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
service_name: str,
|
||||
endpoint: str,
|
||||
token: str,
|
||||
max_queue_size: int = 1000,
|
||||
schedule_delay_sec: int = 5,
|
||||
max_export_batch_size: int = 50,
|
||||
metrics_export_interval_sec: int = 10,
|
||||
):
|
||||
self.endpoint = endpoint
|
||||
self.token = token
|
||||
self.service_name = service_name
|
||||
self.metrics_export_interval_sec = metrics_export_interval_sec
|
||||
|
||||
self.resource = Resource(
|
||||
attributes={
|
||||
ResourceAttributes.SERVICE_NAME: service_name,
|
||||
ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.project.version}-{dify_config.COMMIT_SHA}",
|
||||
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
|
||||
ResourceAttributes.HOST_NAME: socket.gethostname(),
|
||||
}
|
||||
)
|
||||
# Prepare gRPC endpoint/metadata
|
||||
grpc_endpoint, insecure, _, _ = self._resolve_grpc_target(endpoint)
|
||||
|
||||
headers = (("authorization", f"Bearer {token}"),)
|
||||
|
||||
self.exporter = OTLPSpanExporter(
|
||||
endpoint=grpc_endpoint,
|
||||
headers=headers,
|
||||
insecure=insecure,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
self.tracer_provider = TracerProvider(resource=self.resource)
|
||||
self.span_processor = BatchSpanProcessor(
|
||||
span_exporter=self.exporter,
|
||||
max_queue_size=max_queue_size,
|
||||
schedule_delay_millis=schedule_delay_sec * 1000,
|
||||
max_export_batch_size=max_export_batch_size,
|
||||
)
|
||||
self.tracer_provider.add_span_processor(self.span_processor)
|
||||
|
||||
self.tracer = self.tracer_provider.get_tracer("dify.tencent_apm")
|
||||
|
||||
# Store span contexts for parent-child relationships
|
||||
self.span_contexts: dict[int, trace_api.SpanContext] = {}
|
||||
|
||||
self.meter: Meter | None = None
|
||||
self.hist_llm_duration: Histogram | None = None
|
||||
self.metric_reader: MetricReader | None = None
|
||||
|
||||
# Metrics exporter and instruments
|
||||
try:
|
||||
from opentelemetry import metrics
|
||||
from opentelemetry.sdk.metrics import Histogram, MeterProvider
|
||||
from opentelemetry.sdk.metrics.export import AggregationTemporality, PeriodicExportingMetricReader
|
||||
|
||||
protocol = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "").strip().lower()
|
||||
use_http_protobuf = protocol in {"http/protobuf", "http-protobuf"}
|
||||
use_http_json = protocol in {"http/json", "http-json"}
|
||||
|
||||
# Set preferred temporality for histograms to DELTA
|
||||
preferred_temporality: dict[type, AggregationTemporality] = {Histogram: AggregationTemporality.DELTA}
|
||||
|
||||
def _create_metric_exporter(exporter_cls, **kwargs):
|
||||
"""Create metric exporter with preferred_temporality support"""
|
||||
try:
|
||||
return exporter_cls(**kwargs, preferred_temporality=preferred_temporality)
|
||||
except Exception:
|
||||
return exporter_cls(**kwargs)
|
||||
|
||||
metric_reader = None
|
||||
if use_http_json:
|
||||
exporter_cls = None
|
||||
for mod_path in (
|
||||
"opentelemetry.exporter.otlp.http.json.metric_exporter",
|
||||
"opentelemetry.exporter.otlp.json.metric_exporter",
|
||||
):
|
||||
try:
|
||||
mod = importlib.import_module(mod_path)
|
||||
exporter_cls = getattr(mod, "OTLPMetricExporter", None)
|
||||
if exporter_cls:
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if exporter_cls is not None:
|
||||
metric_exporter = _create_metric_exporter(
|
||||
exporter_cls,
|
||||
endpoint=endpoint,
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
)
|
||||
else:
|
||||
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
|
||||
OTLPMetricExporter as HttpMetricExporter,
|
||||
)
|
||||
|
||||
metric_exporter = _create_metric_exporter(
|
||||
HttpMetricExporter,
|
||||
endpoint=endpoint,
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
)
|
||||
metric_reader = PeriodicExportingMetricReader(
|
||||
metric_exporter, export_interval_millis=self.metrics_export_interval_sec * 1000
|
||||
)
|
||||
|
||||
elif use_http_protobuf:
|
||||
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
|
||||
OTLPMetricExporter as HttpMetricExporter,
|
||||
)
|
||||
|
||||
metric_exporter = _create_metric_exporter(
|
||||
HttpMetricExporter,
|
||||
endpoint=endpoint,
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
)
|
||||
metric_reader = PeriodicExportingMetricReader(
|
||||
metric_exporter, export_interval_millis=self.metrics_export_interval_sec * 1000
|
||||
)
|
||||
else:
|
||||
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
|
||||
OTLPMetricExporter as GrpcMetricExporter,
|
||||
)
|
||||
|
||||
m_grpc_endpoint, m_insecure, _, _ = self._resolve_grpc_target(endpoint)
|
||||
|
||||
metric_exporter = _create_metric_exporter(
|
||||
GrpcMetricExporter,
|
||||
endpoint=m_grpc_endpoint,
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
insecure=m_insecure,
|
||||
)
|
||||
metric_reader = PeriodicExportingMetricReader(
|
||||
metric_exporter, export_interval_millis=self.metrics_export_interval_sec * 1000
|
||||
)
|
||||
|
||||
if metric_reader is not None:
|
||||
provider = MeterProvider(resource=self.resource, metric_readers=[metric_reader])
|
||||
metrics.set_meter_provider(provider)
|
||||
self.meter = metrics.get_meter("dify-sdk", dify_config.project.version)
|
||||
self.hist_llm_duration = self.meter.create_histogram(
|
||||
name=LLM_OPERATION_DURATION,
|
||||
unit="s",
|
||||
description="LLM operation duration (seconds)",
|
||||
)
|
||||
self.metric_reader = metric_reader
|
||||
else:
|
||||
self.meter = None
|
||||
self.hist_llm_duration = None
|
||||
self.metric_reader = None
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Metrics initialization failed; metrics disabled")
|
||||
self.meter = None
|
||||
self.hist_llm_duration = None
|
||||
self.metric_reader = None
|
||||
|
||||
def add_span(self, span_data: SpanData) -> None:
|
||||
"""Create and export span using OpenTelemetry Tracer API"""
|
||||
try:
|
||||
self._create_and_export_span(span_data)
|
||||
logger.debug("[Tencent APM] Created span: %s", span_data.name)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to create span: %s", span_data.name)
|
||||
|
||||
# Metrics recording API
|
||||
def record_llm_duration(self, latency_seconds: float, attributes: dict[str, str] | None = None) -> None:
|
||||
"""Record LLM operation duration histogram in seconds."""
|
||||
try:
|
||||
if not hasattr(self, "hist_llm_duration") or self.hist_llm_duration is None:
|
||||
return
|
||||
attrs: dict[str, str] = {}
|
||||
if attributes:
|
||||
for k, v in attributes.items():
|
||||
attrs[k] = str(v) if not isinstance(v, (str, int, float, bool)) else v # type: ignore[assignment]
|
||||
self.hist_llm_duration.record(latency_seconds, attrs) # type: ignore[attr-defined]
|
||||
except Exception:
|
||||
logger.debug("[Tencent APM] Failed to record LLM duration", exc_info=True)
|
||||
|
||||
def _create_and_export_span(self, span_data: SpanData) -> None:
|
||||
"""Create span using OpenTelemetry Tracer API"""
|
||||
try:
|
||||
parent_context = None
|
||||
if span_data.parent_span_id and span_data.parent_span_id in self.span_contexts:
|
||||
parent_context = trace_api.set_span_in_context(
|
||||
trace_api.NonRecordingSpan(self.span_contexts[span_data.parent_span_id])
|
||||
)
|
||||
|
||||
span = self.tracer.start_span(
|
||||
name=span_data.name,
|
||||
context=parent_context,
|
||||
kind=SpanKind.INTERNAL,
|
||||
start_time=span_data.start_time,
|
||||
)
|
||||
self.span_contexts[span_data.span_id] = span.get_span_context()
|
||||
|
||||
if span_data.attributes:
|
||||
attributes: dict[str, AttributeValue] = {}
|
||||
for key, value in span_data.attributes.items():
|
||||
if isinstance(value, (int, float, bool)):
|
||||
attributes[key] = value
|
||||
else:
|
||||
attributes[key] = str(value)
|
||||
span.set_attributes(attributes)
|
||||
|
||||
if span_data.events:
|
||||
for event in span_data.events:
|
||||
span.add_event(event.name, event.attributes, event.timestamp)
|
||||
|
||||
if span_data.status:
|
||||
span.set_status(span_data.status)
|
||||
|
||||
# Manually end span; do not use context manager to avoid double-end warnings
|
||||
span.end(end_time=span_data.end_time)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Error creating span: %s", span_data.name)
|
||||
|
||||
def api_check(self) -> bool:
|
||||
"""Check API connectivity using socket connection test for gRPC endpoints"""
|
||||
try:
|
||||
# Resolve gRPC target consistently with exporters
|
||||
_, _, host, port = self._resolve_grpc_target(self.endpoint)
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(5)
|
||||
result = sock.connect_ex((host, port))
|
||||
sock.close()
|
||||
|
||||
if result == 0:
|
||||
logger.info("[Tencent APM] Endpoint %s:%s is accessible", host, port)
|
||||
return True
|
||||
else:
|
||||
logger.warning("[Tencent APM] Endpoint %s:%s is not accessible", host, port)
|
||||
if host in ["127.0.0.1", "localhost"]:
|
||||
logger.info("[Tencent APM] Development environment detected, allowing config save")
|
||||
return True
|
||||
return False
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] API check failed")
|
||||
if "127.0.0.1" in self.endpoint or "localhost" in self.endpoint:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_project_url(self) -> str:
|
||||
"""Get project console URL"""
|
||||
return "https://console.cloud.tencent.com/apm"
|
||||
|
||||
def shutdown(self) -> None:
|
||||
"""Shutdown the client and export remaining spans"""
|
||||
try:
|
||||
if self.span_processor:
|
||||
logger.info("[Tencent APM] Flushing remaining spans before shutdown")
|
||||
_ = self.span_processor.force_flush()
|
||||
self.span_processor.shutdown()
|
||||
|
||||
if self.tracer_provider:
|
||||
self.tracer_provider.shutdown()
|
||||
if self.metric_reader is not None:
|
||||
try:
|
||||
self.metric_reader.shutdown() # type: ignore[attr-defined]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Error during client shutdown")
|
||||
|
||||
@staticmethod
|
||||
def _resolve_grpc_target(endpoint: str, default_port: int = 4317) -> tuple[str, bool, str, int]:
|
||||
"""Normalize endpoint to gRPC target and security flag.
|
||||
|
||||
Returns:
|
||||
(grpc_endpoint, insecure, host, port)
|
||||
"""
|
||||
try:
|
||||
if endpoint.startswith(("http://", "https://")):
|
||||
parsed = urlparse(endpoint)
|
||||
host = parsed.hostname or "localhost"
|
||||
port = parsed.port or default_port
|
||||
insecure = parsed.scheme == "http"
|
||||
return f"{host}:{port}", insecure, host, port
|
||||
|
||||
host = endpoint
|
||||
port = default_port
|
||||
if ":" in endpoint:
|
||||
parts = endpoint.rsplit(":", 1)
|
||||
host = parts[0] or "localhost"
|
||||
try:
|
||||
port = int(parts[1])
|
||||
except Exception:
|
||||
port = default_port
|
||||
|
||||
insecure = ("localhost" in host) or ("127.0.0.1" in host)
|
||||
return f"{host}:{port}", insecure, host, port
|
||||
except Exception:
|
||||
host, port = "localhost", default_port
|
||||
return f"{host}:{port}", True, host, port
|
||||
1
api/core/ops/tencent_trace/entities/__init__.py
Normal file
1
api/core/ops/tencent_trace/entities/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# Tencent trace entities module
|
||||
73
api/core/ops/tencent_trace/entities/tencent_semconv.py
Normal file
73
api/core/ops/tencent_trace/entities/tencent_semconv.py
Normal file
@ -0,0 +1,73 @@
|
||||
from enum import Enum
|
||||
|
||||
# public
|
||||
GEN_AI_SESSION_ID = "gen_ai.session.id"
|
||||
|
||||
GEN_AI_USER_ID = "gen_ai.user.id"
|
||||
|
||||
GEN_AI_USER_NAME = "gen_ai.user.name"
|
||||
|
||||
GEN_AI_SPAN_KIND = "gen_ai.span.kind"
|
||||
|
||||
GEN_AI_FRAMEWORK = "gen_ai.framework"
|
||||
|
||||
GEN_AI_IS_ENTRY = "gen_ai.is_entry" # mark to count the LLM-related traces
|
||||
|
||||
# Chain
|
||||
INPUT_VALUE = "gen_ai.entity.input"
|
||||
|
||||
OUTPUT_VALUE = "gen_ai.entity.output"
|
||||
|
||||
|
||||
# Retriever
|
||||
RETRIEVAL_QUERY = "retrieval.query"
|
||||
|
||||
RETRIEVAL_DOCUMENT = "retrieval.document"
|
||||
|
||||
|
||||
# GENERATION
|
||||
GEN_AI_MODEL_NAME = "gen_ai.response.model"
|
||||
|
||||
GEN_AI_PROVIDER = "gen_ai.provider.name"
|
||||
|
||||
|
||||
GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
|
||||
|
||||
GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
|
||||
|
||||
GEN_AI_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens"
|
||||
|
||||
GEN_AI_PROMPT_TEMPLATE_TEMPLATE = "gen_ai.prompt_template.template"
|
||||
|
||||
GEN_AI_PROMPT_TEMPLATE_VARIABLE = "gen_ai.prompt_template.variable"
|
||||
|
||||
GEN_AI_PROMPT = "gen_ai.prompt"
|
||||
|
||||
GEN_AI_COMPLETION = "gen_ai.completion"
|
||||
|
||||
GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"
|
||||
|
||||
# Tool
|
||||
TOOL_NAME = "tool.name"
|
||||
|
||||
TOOL_DESCRIPTION = "tool.description"
|
||||
|
||||
TOOL_PARAMETERS = "tool.parameters"
|
||||
|
||||
# Instrumentation Library
|
||||
INSTRUMENTATION_NAME = "dify-sdk"
|
||||
INSTRUMENTATION_VERSION = "0.1.0"
|
||||
INSTRUMENTATION_LANGUAGE = "python"
|
||||
|
||||
|
||||
# Metrics
|
||||
LLM_OPERATION_DURATION = "gen_ai.client.operation.duration"
|
||||
|
||||
|
||||
class GenAISpanKind(Enum):
|
||||
WORKFLOW = "WORKFLOW" # OpenLLMetry
|
||||
RETRIEVER = "RETRIEVER" # RAG
|
||||
GENERATION = "GENERATION" # Langfuse
|
||||
TOOL = "TOOL" # OpenLLMetry
|
||||
AGENT = "AGENT" # OpenLLMetry
|
||||
TASK = "TASK" # OpenLLMetry
|
||||
21
api/core/ops/tencent_trace/entities/tencent_trace_entity.py
Normal file
21
api/core/ops/tencent_trace/entities/tencent_trace_entity.py
Normal file
@ -0,0 +1,21 @@
|
||||
from collections.abc import Sequence
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.sdk.trace import Event
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SpanData(BaseModel):
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
trace_id: int = Field(..., description="The unique identifier for the trace.")
|
||||
parent_span_id: int | None = Field(None, description="The ID of the parent span, if any.")
|
||||
span_id: int = Field(..., description="The unique identifier for this span.")
|
||||
name: str = Field(..., description="The name of the span.")
|
||||
attributes: dict[str, str] = Field(default_factory=dict, description="Attributes associated with the span.")
|
||||
events: Sequence[Event] = Field(default_factory=list, description="Events recorded in the span.")
|
||||
links: Sequence[trace_api.Link] = Field(default_factory=list, description="Links to other spans.")
|
||||
status: Status = Field(default=Status(StatusCode.UNSET), description="The status of the span.")
|
||||
start_time: int = Field(..., description="The start time of the span in nanoseconds.")
|
||||
end_time: int = Field(..., description="The end time of the span in nanoseconds.")
|
||||
372
api/core/ops/tencent_trace/span_builder.py
Normal file
372
api/core/ops/tencent_trace/span_builder.py
Normal file
@ -0,0 +1,372 @@
|
||||
"""
|
||||
Tencent APM Span Builder - handles all span construction logic
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
|
||||
from core.ops.entities.trace_entity import (
|
||||
DatasetRetrievalTraceInfo,
|
||||
MessageTraceInfo,
|
||||
ToolTraceInfo,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from core.ops.tencent_trace.entities.tencent_semconv import (
|
||||
GEN_AI_COMPLETION,
|
||||
GEN_AI_FRAMEWORK,
|
||||
GEN_AI_IS_ENTRY,
|
||||
GEN_AI_MODEL_NAME,
|
||||
GEN_AI_PROMPT,
|
||||
GEN_AI_PROVIDER,
|
||||
GEN_AI_RESPONSE_FINISH_REASON,
|
||||
GEN_AI_SESSION_ID,
|
||||
GEN_AI_SPAN_KIND,
|
||||
GEN_AI_USAGE_INPUT_TOKENS,
|
||||
GEN_AI_USAGE_OUTPUT_TOKENS,
|
||||
GEN_AI_USAGE_TOTAL_TOKENS,
|
||||
GEN_AI_USER_ID,
|
||||
INPUT_VALUE,
|
||||
OUTPUT_VALUE,
|
||||
RETRIEVAL_DOCUMENT,
|
||||
RETRIEVAL_QUERY,
|
||||
TOOL_DESCRIPTION,
|
||||
TOOL_NAME,
|
||||
TOOL_PARAMETERS,
|
||||
GenAISpanKind,
|
||||
)
|
||||
from core.ops.tencent_trace.entities.tencent_trace_entity import SpanData
|
||||
from core.ops.tencent_trace.utils import TencentTraceUtils
|
||||
from core.rag.models.document import Document
|
||||
from core.workflow.entities.workflow_node_execution import (
|
||||
WorkflowNodeExecution,
|
||||
WorkflowNodeExecutionMetadataKey,
|
||||
WorkflowNodeExecutionStatus,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TencentSpanBuilder:
|
||||
"""Builder class for constructing different types of spans"""
|
||||
|
||||
@staticmethod
|
||||
def _get_time_nanoseconds(time_value: datetime | None) -> int:
|
||||
"""Convert datetime to nanoseconds for span creation."""
|
||||
return TencentTraceUtils.convert_datetime_to_nanoseconds(time_value)
|
||||
|
||||
@staticmethod
|
||||
def build_workflow_spans(
|
||||
trace_info: WorkflowTraceInfo, trace_id: int, user_id: str, links: list | None = None
|
||||
) -> list[SpanData]:
|
||||
"""Build workflow-related spans"""
|
||||
spans = []
|
||||
links = links or []
|
||||
|
||||
message_span_id = None
|
||||
workflow_span_id = TencentTraceUtils.convert_to_span_id(trace_info.workflow_run_id, "workflow")
|
||||
|
||||
if hasattr(trace_info, "metadata") and trace_info.metadata.get("conversation_id"):
|
||||
message_span_id = TencentTraceUtils.convert_to_span_id(trace_info.workflow_run_id, "message")
|
||||
|
||||
status = Status(StatusCode.OK)
|
||||
if trace_info.error:
|
||||
status = Status(StatusCode.ERROR, trace_info.error)
|
||||
|
||||
if message_span_id:
|
||||
message_span = TencentSpanBuilder._build_message_span(
|
||||
trace_info, trace_id, message_span_id, user_id, status, links
|
||||
)
|
||||
spans.append(message_span)
|
||||
|
||||
workflow_span = TencentSpanBuilder._build_workflow_span(
|
||||
trace_info, trace_id, workflow_span_id, message_span_id, user_id, status, links
|
||||
)
|
||||
spans.append(workflow_span)
|
||||
|
||||
return spans
|
||||
|
||||
@staticmethod
|
||||
def _build_message_span(
|
||||
trace_info: WorkflowTraceInfo, trace_id: int, message_span_id: int, user_id: str, status: Status, links: list
|
||||
) -> SpanData:
|
||||
"""Build message span for chatflow"""
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=None,
|
||||
span_id=message_span_id,
|
||||
name="message",
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
|
||||
GEN_AI_USER_ID: str(user_id),
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
GEN_AI_IS_ENTRY: "true",
|
||||
INPUT_VALUE: trace_info.workflow_run_inputs.get("sys.query", ""),
|
||||
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
|
||||
},
|
||||
status=status,
|
||||
links=links,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_workflow_span(
|
||||
trace_info: WorkflowTraceInfo,
|
||||
trace_id: int,
|
||||
workflow_span_id: int,
|
||||
message_span_id: int | None,
|
||||
user_id: str,
|
||||
status: Status,
|
||||
links: list,
|
||||
) -> SpanData:
|
||||
"""Build workflow span"""
|
||||
attributes = {
|
||||
GEN_AI_USER_ID: str(user_id),
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
|
||||
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
|
||||
}
|
||||
|
||||
if message_span_id is None:
|
||||
attributes[GEN_AI_IS_ENTRY] = "true"
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=message_span_id,
|
||||
span_id=workflow_span_id,
|
||||
name="workflow",
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
|
||||
attributes=attributes,
|
||||
status=status,
|
||||
links=links,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def build_workflow_llm_span(
|
||||
trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
|
||||
) -> SpanData:
|
||||
"""Build LLM span for workflow nodes."""
|
||||
process_data = node_execution.process_data or {}
|
||||
outputs = node_execution.outputs or {}
|
||||
usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=workflow_span_id,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(node_execution.id, "node"),
|
||||
name="GENERATION",
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
|
||||
attributes={
|
||||
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
|
||||
GEN_AI_PROVIDER: process_data.get("model_provider", ""),
|
||||
GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
|
||||
GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
|
||||
GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
|
||||
GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
|
||||
GEN_AI_COMPLETION: str(outputs.get("text", "")),
|
||||
GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
|
||||
INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
|
||||
OUTPUT_VALUE: str(outputs.get("text", "")),
|
||||
},
|
||||
status=TencentSpanBuilder._get_workflow_node_status(node_execution),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def build_message_span(
|
||||
trace_info: MessageTraceInfo, trace_id: int, user_id: str, links: list | None = None
|
||||
) -> SpanData:
|
||||
"""Build message span."""
|
||||
links = links or []
|
||||
status = Status(StatusCode.OK)
|
||||
if trace_info.error:
|
||||
status = Status(StatusCode.ERROR, trace_info.error)
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=None,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(trace_info.message_id, "message"),
|
||||
name="message",
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
|
||||
GEN_AI_USER_ID: str(user_id),
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
GEN_AI_IS_ENTRY: "true",
|
||||
INPUT_VALUE: str(trace_info.inputs or ""),
|
||||
OUTPUT_VALUE: str(trace_info.outputs or ""),
|
||||
},
|
||||
status=status,
|
||||
links=links,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def build_tool_span(trace_info: ToolTraceInfo, trace_id: int, parent_span_id: int) -> SpanData:
|
||||
"""Build tool span."""
|
||||
status = Status(StatusCode.OK)
|
||||
if trace_info.error:
|
||||
status = Status(StatusCode.ERROR, trace_info.error)
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=parent_span_id,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(trace_info.message_id, "tool"),
|
||||
name=trace_info.tool_name,
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
TOOL_NAME: trace_info.tool_name,
|
||||
TOOL_DESCRIPTION: "",
|
||||
TOOL_PARAMETERS: json.dumps(trace_info.tool_parameters, ensure_ascii=False),
|
||||
INPUT_VALUE: json.dumps(trace_info.tool_inputs, ensure_ascii=False),
|
||||
OUTPUT_VALUE: str(trace_info.tool_outputs),
|
||||
},
|
||||
status=status,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def build_retrieval_span(trace_info: DatasetRetrievalTraceInfo, trace_id: int, parent_span_id: int) -> SpanData:
|
||||
"""Build dataset retrieval span."""
|
||||
status = Status(StatusCode.OK)
|
||||
if getattr(trace_info, "error", None):
|
||||
status = Status(StatusCode.ERROR, trace_info.error) # type: ignore[arg-type]
|
||||
|
||||
documents_data = TencentSpanBuilder._extract_retrieval_documents(trace_info.documents)
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=parent_span_id,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(trace_info.message_id, "retrieval"),
|
||||
name="retrieval",
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
|
||||
attributes={
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
RETRIEVAL_QUERY: str(trace_info.inputs or ""),
|
||||
RETRIEVAL_DOCUMENT: json.dumps(documents_data, ensure_ascii=False),
|
||||
INPUT_VALUE: str(trace_info.inputs or ""),
|
||||
OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
|
||||
},
|
||||
status=status,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _get_workflow_node_status(node_execution: WorkflowNodeExecution) -> Status:
|
||||
"""Get workflow node execution status."""
|
||||
if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
||||
return Status(StatusCode.OK)
|
||||
elif node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
|
||||
return Status(StatusCode.ERROR, str(node_execution.error))
|
||||
return Status(StatusCode.UNSET)
|
||||
|
||||
@staticmethod
|
||||
def build_workflow_retrieval_span(
|
||||
trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
|
||||
) -> SpanData:
|
||||
"""Build knowledge retrieval span for workflow nodes."""
|
||||
input_value = ""
|
||||
if node_execution.inputs:
|
||||
input_value = str(node_execution.inputs.get("query", ""))
|
||||
output_value = ""
|
||||
if node_execution.outputs:
|
||||
output_value = json.dumps(node_execution.outputs.get("result", []), ensure_ascii=False)
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=workflow_span_id,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(node_execution.id, "node"),
|
||||
name=node_execution.title,
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
|
||||
attributes={
|
||||
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
RETRIEVAL_QUERY: input_value,
|
||||
RETRIEVAL_DOCUMENT: output_value,
|
||||
INPUT_VALUE: input_value,
|
||||
OUTPUT_VALUE: output_value,
|
||||
},
|
||||
status=TencentSpanBuilder._get_workflow_node_status(node_execution),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def build_workflow_tool_span(
|
||||
trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
|
||||
) -> SpanData:
|
||||
"""Build tool span for workflow nodes."""
|
||||
tool_des = {}
|
||||
if node_execution.metadata:
|
||||
tool_des = node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {})
|
||||
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=workflow_span_id,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(node_execution.id, "node"),
|
||||
name=node_execution.title,
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
|
||||
attributes={
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
TOOL_NAME: node_execution.title,
|
||||
TOOL_DESCRIPTION: json.dumps(tool_des, ensure_ascii=False),
|
||||
TOOL_PARAMETERS: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
|
||||
INPUT_VALUE: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
|
||||
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
|
||||
},
|
||||
status=TencentSpanBuilder._get_workflow_node_status(node_execution),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def build_workflow_task_span(
|
||||
trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
|
||||
) -> SpanData:
|
||||
"""Build generic task span for workflow nodes."""
|
||||
return SpanData(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=workflow_span_id,
|
||||
span_id=TencentTraceUtils.convert_to_span_id(node_execution.id, "node"),
|
||||
name=node_execution.title,
|
||||
start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
|
||||
end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
|
||||
attributes={
|
||||
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
|
||||
GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value,
|
||||
GEN_AI_FRAMEWORK: "dify",
|
||||
INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False),
|
||||
OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
|
||||
},
|
||||
status=TencentSpanBuilder._get_workflow_node_status(node_execution),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_retrieval_documents(documents: list[Document]):
|
||||
"""Extract documents data for retrieval tracing."""
|
||||
documents_data = []
|
||||
for document in documents:
|
||||
document_data = {
|
||||
"content": document.page_content,
|
||||
"metadata": {
|
||||
"dataset_id": document.metadata.get("dataset_id"),
|
||||
"doc_id": document.metadata.get("doc_id"),
|
||||
"document_id": document.metadata.get("document_id"),
|
||||
},
|
||||
"score": document.metadata.get("score"),
|
||||
}
|
||||
documents_data.append(document_data)
|
||||
return documents_data
|
||||
317
api/core/ops/tencent_trace/tencent_trace.py
Normal file
317
api/core/ops/tencent_trace/tencent_trace.py
Normal file
@ -0,0 +1,317 @@
|
||||
"""
|
||||
Tencent APM tracing implementation with separated concerns
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from core.ops.base_trace_instance import BaseTraceInstance
|
||||
from core.ops.entities.config_entity import TencentConfig
|
||||
from core.ops.entities.trace_entity import (
|
||||
BaseTraceInfo,
|
||||
DatasetRetrievalTraceInfo,
|
||||
GenerateNameTraceInfo,
|
||||
MessageTraceInfo,
|
||||
ModerationTraceInfo,
|
||||
SuggestedQuestionTraceInfo,
|
||||
ToolTraceInfo,
|
||||
WorkflowTraceInfo,
|
||||
)
|
||||
from core.ops.tencent_trace.client import TencentTraceClient
|
||||
from core.ops.tencent_trace.entities.tencent_trace_entity import SpanData
|
||||
from core.ops.tencent_trace.span_builder import TencentSpanBuilder
|
||||
from core.ops.tencent_trace.utils import TencentTraceUtils
|
||||
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
|
||||
from core.workflow.entities.workflow_node_execution import (
|
||||
WorkflowNodeExecution,
|
||||
)
|
||||
from core.workflow.nodes import NodeType
|
||||
from extensions.ext_database import db
|
||||
from models import Account, App, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TencentDataTrace(BaseTraceInstance):
|
||||
"""
|
||||
Tencent APM trace implementation with single responsibility principle.
|
||||
Acts as a coordinator that delegates specific tasks to specialized classes.
|
||||
"""
|
||||
|
||||
def __init__(self, tencent_config: TencentConfig):
|
||||
super().__init__(tencent_config)
|
||||
self.trace_client = TencentTraceClient(
|
||||
service_name=tencent_config.service_name,
|
||||
endpoint=tencent_config.endpoint,
|
||||
token=tencent_config.token,
|
||||
metrics_export_interval_sec=5,
|
||||
)
|
||||
|
||||
def trace(self, trace_info: BaseTraceInfo) -> None:
|
||||
"""Main tracing entry point - coordinates different trace types."""
|
||||
if isinstance(trace_info, WorkflowTraceInfo):
|
||||
self.workflow_trace(trace_info)
|
||||
elif isinstance(trace_info, MessageTraceInfo):
|
||||
self.message_trace(trace_info)
|
||||
elif isinstance(trace_info, ModerationTraceInfo):
|
||||
pass
|
||||
elif isinstance(trace_info, SuggestedQuestionTraceInfo):
|
||||
self.suggested_question_trace(trace_info)
|
||||
elif isinstance(trace_info, DatasetRetrievalTraceInfo):
|
||||
self.dataset_retrieval_trace(trace_info)
|
||||
elif isinstance(trace_info, ToolTraceInfo):
|
||||
self.tool_trace(trace_info)
|
||||
elif isinstance(trace_info, GenerateNameTraceInfo):
|
||||
pass
|
||||
|
||||
def api_check(self) -> bool:
|
||||
return self.trace_client.api_check()
|
||||
|
||||
def get_project_url(self) -> str:
|
||||
return self.trace_client.get_project_url()
|
||||
|
||||
def workflow_trace(self, trace_info: WorkflowTraceInfo) -> None:
|
||||
"""Handle workflow tracing by coordinating data retrieval and span construction."""
|
||||
try:
|
||||
trace_id = TencentTraceUtils.convert_to_trace_id(trace_info.workflow_run_id)
|
||||
|
||||
links = []
|
||||
if trace_info.trace_id:
|
||||
links.append(TencentTraceUtils.create_link(trace_info.trace_id))
|
||||
|
||||
user_id = self._get_user_id(trace_info)
|
||||
|
||||
workflow_spans = TencentSpanBuilder.build_workflow_spans(trace_info, trace_id, str(user_id), links)
|
||||
|
||||
for span in workflow_spans:
|
||||
self.trace_client.add_span(span)
|
||||
|
||||
self._process_workflow_nodes(trace_info, trace_id)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process workflow trace")
|
||||
|
||||
def message_trace(self, trace_info: MessageTraceInfo) -> None:
|
||||
"""Handle message tracing."""
|
||||
try:
|
||||
trace_id = TencentTraceUtils.convert_to_trace_id(trace_info.message_id)
|
||||
user_id = self._get_user_id(trace_info)
|
||||
|
||||
links = []
|
||||
if trace_info.trace_id:
|
||||
links.append(TencentTraceUtils.create_link(trace_info.trace_id))
|
||||
|
||||
message_span = TencentSpanBuilder.build_message_span(trace_info, trace_id, str(user_id), links)
|
||||
|
||||
self.trace_client.add_span(message_span)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process message trace")
|
||||
|
||||
def tool_trace(self, trace_info: ToolTraceInfo) -> None:
|
||||
"""Handle tool tracing."""
|
||||
try:
|
||||
parent_span_id = None
|
||||
trace_root_id = None
|
||||
|
||||
if trace_info.message_id:
|
||||
parent_span_id = TencentTraceUtils.convert_to_span_id(trace_info.message_id, "message")
|
||||
trace_root_id = trace_info.message_id
|
||||
|
||||
if parent_span_id and trace_root_id:
|
||||
trace_id = TencentTraceUtils.convert_to_trace_id(trace_root_id)
|
||||
|
||||
tool_span = TencentSpanBuilder.build_tool_span(trace_info, trace_id, parent_span_id)
|
||||
|
||||
self.trace_client.add_span(tool_span)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process tool trace")
|
||||
|
||||
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo) -> None:
|
||||
"""Handle dataset retrieval tracing."""
|
||||
try:
|
||||
parent_span_id = None
|
||||
trace_root_id = None
|
||||
|
||||
if trace_info.message_id:
|
||||
parent_span_id = TencentTraceUtils.convert_to_span_id(trace_info.message_id, "message")
|
||||
trace_root_id = trace_info.message_id
|
||||
|
||||
if parent_span_id and trace_root_id:
|
||||
trace_id = TencentTraceUtils.convert_to_trace_id(trace_root_id)
|
||||
|
||||
retrieval_span = TencentSpanBuilder.build_retrieval_span(trace_info, trace_id, parent_span_id)
|
||||
|
||||
self.trace_client.add_span(retrieval_span)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process dataset retrieval trace")
|
||||
|
||||
def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo) -> None:
|
||||
"""Handle suggested question tracing"""
|
||||
try:
|
||||
logger.info("[Tencent APM] Processing suggested question trace")
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process suggested question trace")
|
||||
|
||||
def _process_workflow_nodes(self, trace_info: WorkflowTraceInfo, trace_id: int) -> None:
|
||||
"""Process workflow node executions."""
|
||||
try:
|
||||
workflow_span_id = TencentTraceUtils.convert_to_span_id(trace_info.workflow_run_id, "workflow")
|
||||
|
||||
node_executions = self._get_workflow_node_executions(trace_info)
|
||||
|
||||
for node_execution in node_executions:
|
||||
try:
|
||||
node_span = self._build_workflow_node_span(node_execution, trace_id, trace_info, workflow_span_id)
|
||||
if node_span:
|
||||
self.trace_client.add_span(node_span)
|
||||
|
||||
if node_execution.node_type == NodeType.LLM:
|
||||
self._record_llm_metrics(node_execution)
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process node execution: %s", node_execution.id)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to process workflow nodes")
|
||||
|
||||
def _build_workflow_node_span(
|
||||
self, node_execution: WorkflowNodeExecution, trace_id: int, trace_info: WorkflowTraceInfo, workflow_span_id: int
|
||||
) -> SpanData | None:
|
||||
"""Build span for different node types"""
|
||||
try:
|
||||
if node_execution.node_type == NodeType.LLM:
|
||||
return TencentSpanBuilder.build_workflow_llm_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
|
||||
return TencentSpanBuilder.build_workflow_retrieval_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
elif node_execution.node_type == NodeType.TOOL:
|
||||
return TencentSpanBuilder.build_workflow_tool_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
else:
|
||||
# Handle all other node types as generic tasks
|
||||
return TencentSpanBuilder.build_workflow_task_span(
|
||||
trace_id, workflow_span_id, trace_info, node_execution
|
||||
)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"[Tencent APM] Error building span for node %s: %s",
|
||||
node_execution.id,
|
||||
node_execution.node_type,
|
||||
exc_info=True,
|
||||
)
|
||||
return None
|
||||
|
||||
def _get_workflow_node_executions(self, trace_info: WorkflowTraceInfo) -> list[WorkflowNodeExecution]:
|
||||
"""Retrieve workflow node executions from database."""
|
||||
try:
|
||||
session_maker = sessionmaker(bind=db.engine)
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
app_id = trace_info.metadata.get("app_id")
|
||||
if not app_id:
|
||||
raise ValueError("No app_id found in trace_info metadata")
|
||||
|
||||
app_stmt = select(App).where(App.id == app_id)
|
||||
app = session.scalar(app_stmt)
|
||||
if not app:
|
||||
raise ValueError(f"App with id {app_id} not found")
|
||||
|
||||
if not app.created_by:
|
||||
raise ValueError(f"App with id {app_id} has no creator")
|
||||
|
||||
account_stmt = select(Account).where(Account.id == app.created_by)
|
||||
service_account = session.scalar(account_stmt)
|
||||
if not service_account:
|
||||
raise ValueError(f"Creator account not found for app {app_id}")
|
||||
|
||||
current_tenant = (
|
||||
session.query(TenantAccountJoin).filter_by(account_id=service_account.id, current=True).first()
|
||||
)
|
||||
if not current_tenant:
|
||||
raise ValueError(f"Current tenant not found for account {service_account.id}")
|
||||
|
||||
service_account.set_tenant_id(current_tenant.tenant_id)
|
||||
|
||||
repository = SQLAlchemyWorkflowNodeExecutionRepository(
|
||||
session_factory=session_maker,
|
||||
user=service_account,
|
||||
app_id=trace_info.metadata.get("app_id"),
|
||||
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
|
||||
)
|
||||
|
||||
executions = repository.get_by_workflow_run(workflow_run_id=trace_info.workflow_run_id)
|
||||
return list(executions)
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to get workflow node executions")
|
||||
return []
|
||||
|
||||
def _get_user_id(self, trace_info: BaseTraceInfo) -> str:
|
||||
"""Get user ID from trace info."""
|
||||
try:
|
||||
tenant_id = None
|
||||
user_id = None
|
||||
|
||||
if isinstance(trace_info, (WorkflowTraceInfo, GenerateNameTraceInfo)):
|
||||
tenant_id = trace_info.tenant_id
|
||||
|
||||
if hasattr(trace_info, "metadata") and trace_info.metadata:
|
||||
user_id = trace_info.metadata.get("user_id")
|
||||
|
||||
if user_id and tenant_id:
|
||||
stmt = (
|
||||
select(Account.name)
|
||||
.join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id)
|
||||
.where(Account.id == user_id, TenantAccountJoin.tenant_id == tenant_id)
|
||||
)
|
||||
|
||||
session_maker = sessionmaker(bind=db.engine)
|
||||
with session_maker() as session:
|
||||
account_name = session.scalar(stmt)
|
||||
return account_name or str(user_id)
|
||||
elif user_id:
|
||||
return str(user_id)
|
||||
|
||||
return "anonymous"
|
||||
|
||||
except Exception:
|
||||
logger.exception("[Tencent APM] Failed to get user ID")
|
||||
return "unknown"
|
||||
|
||||
def _record_llm_metrics(self, node_execution: WorkflowNodeExecution) -> None:
|
||||
"""Record LLM performance metrics"""
|
||||
try:
|
||||
if not hasattr(self.trace_client, "record_llm_duration"):
|
||||
return
|
||||
|
||||
process_data = node_execution.process_data or {}
|
||||
usage = process_data.get("usage", {})
|
||||
latency_s = float(usage.get("latency", 0.0))
|
||||
|
||||
if latency_s > 0:
|
||||
attributes = {
|
||||
"provider": process_data.get("model_provider", ""),
|
||||
"model": process_data.get("model_name", ""),
|
||||
"span_kind": "GENERATION",
|
||||
}
|
||||
self.trace_client.record_llm_duration(latency_s, attributes)
|
||||
|
||||
except Exception:
|
||||
logger.debug("[Tencent APM] Failed to record LLM metrics")
|
||||
|
||||
def __del__(self):
|
||||
"""Ensure proper cleanup on garbage collection."""
|
||||
try:
|
||||
if hasattr(self, "trace_client"):
|
||||
self.trace_client.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
65
api/core/ops/tencent_trace/utils.py
Normal file
65
api/core/ops/tencent_trace/utils.py
Normal file
@ -0,0 +1,65 @@
|
||||
"""
|
||||
Utility functions for Tencent APM tracing
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import random
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import cast
|
||||
|
||||
from opentelemetry.trace import Link, SpanContext, TraceFlags
|
||||
|
||||
|
||||
class TencentTraceUtils:
|
||||
"""Utility class for common tracing operations."""
|
||||
|
||||
INVALID_SPAN_ID = 0x0000000000000000
|
||||
INVALID_TRACE_ID = 0x00000000000000000000000000000000
|
||||
|
||||
@staticmethod
|
||||
def convert_to_trace_id(uuid_v4: str | None) -> int:
|
||||
try:
|
||||
uuid_obj = uuid.UUID(uuid_v4) if uuid_v4 else uuid.uuid4()
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid UUID input: {e}")
|
||||
return cast(int, uuid_obj.int)
|
||||
|
||||
@staticmethod
|
||||
def convert_to_span_id(uuid_v4: str | None, span_type: str) -> int:
|
||||
try:
|
||||
uuid_obj = uuid.UUID(uuid_v4) if uuid_v4 else uuid.uuid4()
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid UUID input: {e}")
|
||||
combined_key = f"{uuid_obj.hex}-{span_type}"
|
||||
hash_bytes = hashlib.sha256(combined_key.encode("utf-8")).digest()
|
||||
return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
|
||||
|
||||
@staticmethod
|
||||
def generate_span_id() -> int:
|
||||
span_id = random.getrandbits(64)
|
||||
while span_id == TencentTraceUtils.INVALID_SPAN_ID:
|
||||
span_id = random.getrandbits(64)
|
||||
return span_id
|
||||
|
||||
@staticmethod
|
||||
def convert_datetime_to_nanoseconds(start_time: datetime | None) -> int:
|
||||
if start_time is None:
|
||||
start_time = datetime.now()
|
||||
timestamp_in_seconds = start_time.timestamp()
|
||||
return int(timestamp_in_seconds * 1e9)
|
||||
|
||||
@staticmethod
|
||||
def create_link(trace_id_str: str) -> Link:
|
||||
try:
|
||||
trace_id = int(trace_id_str, 16) if len(trace_id_str) == 32 else cast(int, uuid.UUID(trace_id_str).int)
|
||||
except (ValueError, TypeError):
|
||||
trace_id = cast(int, uuid.uuid4().int)
|
||||
|
||||
span_context = SpanContext(
|
||||
trace_id=trace_id,
|
||||
span_id=TencentTraceUtils.INVALID_SPAN_ID,
|
||||
is_remote=False,
|
||||
trace_flags=TraceFlags(TraceFlags.SAMPLED),
|
||||
)
|
||||
return Link(span_context)
|
||||
@ -1,9 +1,24 @@
|
||||
"""
|
||||
Weaviate vector database implementation for Dify's RAG system.
|
||||
|
||||
This module provides integration with Weaviate vector database for storing and retrieving
|
||||
document embeddings used in retrieval-augmented generation workflows.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import uuid as _uuid
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import weaviate # type: ignore
|
||||
import weaviate
|
||||
import weaviate.classes.config as wc
|
||||
from pydantic import BaseModel, model_validator
|
||||
from weaviate.classes.data import DataObject
|
||||
from weaviate.classes.init import Auth
|
||||
from weaviate.classes.query import Filter, MetadataQuery
|
||||
from weaviate.exceptions import UnexpectedStatusCodeError
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.field import Field
|
||||
@ -15,265 +30,394 @@ from core.rag.models.document import Document
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WeaviateConfig(BaseModel):
|
||||
"""
|
||||
Configuration model for Weaviate connection settings.
|
||||
|
||||
Attributes:
|
||||
endpoint: Weaviate server endpoint URL
|
||||
api_key: Optional API key for authentication
|
||||
batch_size: Number of objects to batch per insert operation
|
||||
"""
|
||||
|
||||
endpoint: str
|
||||
api_key: str | None = None
|
||||
batch_size: int = 100
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_config(cls, values: dict):
|
||||
def validate_config(cls, values: dict) -> dict:
|
||||
"""Validates that required configuration values are present."""
|
||||
if not values["endpoint"]:
|
||||
raise ValueError("config WEAVIATE_ENDPOINT is required")
|
||||
return values
|
||||
|
||||
|
||||
class WeaviateVector(BaseVector):
|
||||
"""
|
||||
Weaviate vector database implementation for document storage and retrieval.
|
||||
|
||||
Handles creation, insertion, deletion, and querying of document embeddings
|
||||
in a Weaviate collection.
|
||||
"""
|
||||
|
||||
def __init__(self, collection_name: str, config: WeaviateConfig, attributes: list):
|
||||
"""
|
||||
Initializes the Weaviate vector store.
|
||||
|
||||
Args:
|
||||
collection_name: Name of the Weaviate collection
|
||||
config: Weaviate configuration settings
|
||||
attributes: List of metadata attributes to store
|
||||
"""
|
||||
super().__init__(collection_name)
|
||||
self._client = self._init_client(config)
|
||||
self._attributes = attributes
|
||||
|
||||
def _init_client(self, config: WeaviateConfig) -> weaviate.Client:
|
||||
auth_config = weaviate.AuthApiKey(api_key=config.api_key or "")
|
||||
def _init_client(self, config: WeaviateConfig) -> weaviate.WeaviateClient:
|
||||
"""
|
||||
Initializes and returns a connected Weaviate client.
|
||||
|
||||
weaviate.connect.connection.has_grpc = False # ty: ignore [unresolved-attribute]
|
||||
Configures both HTTP and gRPC connections with proper authentication.
|
||||
"""
|
||||
p = urlparse(config.endpoint)
|
||||
host = p.hostname or config.endpoint.replace("https://", "").replace("http://", "")
|
||||
http_secure = p.scheme == "https"
|
||||
http_port = p.port or (443 if http_secure else 80)
|
||||
|
||||
try:
|
||||
client = weaviate.Client(
|
||||
url=config.endpoint, auth_client_secret=auth_config, timeout_config=(5, 60), startup_period=None
|
||||
)
|
||||
except Exception as exc:
|
||||
raise ConnectionError("Vector database connection error") from exc
|
||||
grpc_host = host
|
||||
grpc_secure = http_secure
|
||||
grpc_port = 443 if grpc_secure else 50051
|
||||
|
||||
client.batch.configure(
|
||||
# `batch_size` takes an `int` value to enable auto-batching
|
||||
# (`None` is used for manual batching)
|
||||
batch_size=config.batch_size,
|
||||
# dynamically update the `batch_size` based on import speed
|
||||
dynamic=True,
|
||||
# `timeout_retries` takes an `int` value to retry on time outs
|
||||
timeout_retries=3,
|
||||
client = weaviate.connect_to_custom(
|
||||
http_host=host,
|
||||
http_port=http_port,
|
||||
http_secure=http_secure,
|
||||
grpc_host=grpc_host,
|
||||
grpc_port=grpc_port,
|
||||
grpc_secure=grpc_secure,
|
||||
auth_credentials=Auth.api_key(config.api_key) if config.api_key else None,
|
||||
)
|
||||
|
||||
if not client.is_ready():
|
||||
raise ConnectionError("Vector database is not ready")
|
||||
|
||||
return client
|
||||
|
||||
def get_type(self) -> str:
|
||||
"""Returns the vector database type identifier."""
|
||||
return VectorType.WEAVIATE
|
||||
|
||||
def get_collection_name(self, dataset: Dataset) -> str:
|
||||
"""
|
||||
Retrieves or generates the collection name for a dataset.
|
||||
|
||||
Uses existing index structure if available, otherwise generates from dataset ID.
|
||||
"""
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
if not class_prefix.endswith("_Node"):
|
||||
# original class_prefix
|
||||
class_prefix += "_Node"
|
||||
|
||||
return class_prefix
|
||||
|
||||
dataset_id = dataset.id
|
||||
return Dataset.gen_collection_name_by_id(dataset_id)
|
||||
|
||||
def to_index_struct(self):
|
||||
def to_index_struct(self) -> dict:
|
||||
"""Returns the index structure dictionary for persistence."""
|
||||
return {"type": self.get_type(), "vector_store": {"class_prefix": self._collection_name}}
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
# create collection
|
||||
"""
|
||||
Creates a new collection and adds initial documents with embeddings.
|
||||
"""
|
||||
self._create_collection()
|
||||
# create vector
|
||||
self.add_texts(texts, embeddings)
|
||||
|
||||
def _create_collection(self):
|
||||
"""
|
||||
Creates the Weaviate collection with required schema if it doesn't exist.
|
||||
|
||||
Uses Redis locking to prevent concurrent creation attempts.
|
||||
"""
|
||||
lock_name = f"vector_indexing_lock_{self._collection_name}"
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(collection_exist_cache_key):
|
||||
cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(cache_key):
|
||||
return
|
||||
schema = self._default_schema(self._collection_name)
|
||||
if not self._client.schema.contains(schema):
|
||||
# create collection
|
||||
self._client.schema.create_class(schema)
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
|
||||
try:
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
self._client.collections.create(
|
||||
name=self._collection_name,
|
||||
properties=[
|
||||
wc.Property(
|
||||
name=Field.TEXT_KEY.value,
|
||||
data_type=wc.DataType.TEXT,
|
||||
tokenization=wc.Tokenization.WORD,
|
||||
),
|
||||
wc.Property(name="document_id", data_type=wc.DataType.TEXT),
|
||||
wc.Property(name="doc_id", data_type=wc.DataType.TEXT),
|
||||
wc.Property(name="chunk_index", data_type=wc.DataType.INT),
|
||||
],
|
||||
vector_config=wc.Configure.Vectors.self_provided(),
|
||||
)
|
||||
|
||||
self._ensure_properties()
|
||||
redis_client.set(cache_key, 1, ex=3600)
|
||||
except Exception as e:
|
||||
logger.exception("Error creating collection %s", self._collection_name)
|
||||
raise
|
||||
|
||||
def _ensure_properties(self) -> None:
|
||||
"""
|
||||
Ensures all required properties exist in the collection schema.
|
||||
|
||||
Adds missing properties if the collection exists but lacks them.
|
||||
"""
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
return
|
||||
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
cfg = col.config.get()
|
||||
existing = {p.name for p in (cfg.properties or [])}
|
||||
|
||||
to_add = []
|
||||
if "document_id" not in existing:
|
||||
to_add.append(wc.Property(name="document_id", data_type=wc.DataType.TEXT))
|
||||
if "doc_id" not in existing:
|
||||
to_add.append(wc.Property(name="doc_id", data_type=wc.DataType.TEXT))
|
||||
if "chunk_index" not in existing:
|
||||
to_add.append(wc.Property(name="chunk_index", data_type=wc.DataType.INT))
|
||||
|
||||
for prop in to_add:
|
||||
try:
|
||||
col.config.add_property(prop)
|
||||
except Exception as e:
|
||||
logger.warning("Could not add property %s: %s", prop.name, e)
|
||||
|
||||
def _get_uuids(self, documents: list[Document]) -> list[str]:
|
||||
"""
|
||||
Generates deterministic UUIDs for documents based on their content.
|
||||
|
||||
Uses UUID5 with URL namespace to ensure consistent IDs for identical content.
|
||||
"""
|
||||
URL_NAMESPACE = _uuid.UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8")
|
||||
|
||||
uuids = []
|
||||
for doc in documents:
|
||||
uuid_val = _uuid.uuid5(URL_NAMESPACE, doc.page_content)
|
||||
uuids.append(str(uuid_val))
|
||||
|
||||
return uuids
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
"""
|
||||
Adds documents with their embeddings to the collection.
|
||||
|
||||
Batches insertions for efficiency and returns the list of inserted object IDs.
|
||||
"""
|
||||
uuids = self._get_uuids(documents)
|
||||
texts = [d.page_content for d in documents]
|
||||
metadatas = [d.metadata for d in documents]
|
||||
|
||||
ids = []
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
objs: list[DataObject] = []
|
||||
ids_out: list[str] = []
|
||||
|
||||
with self._client.batch as batch:
|
||||
for i, text in enumerate(texts):
|
||||
data_properties = {Field.TEXT_KEY: text}
|
||||
if metadatas is not None:
|
||||
# metadata maybe None
|
||||
for key, val in (metadatas[i] or {}).items():
|
||||
data_properties[key] = self._json_serializable(val)
|
||||
for i, text in enumerate(texts):
|
||||
props: dict[str, Any] = {Field.TEXT_KEY.value: text}
|
||||
meta = metadatas[i] or {}
|
||||
for k, v in meta.items():
|
||||
props[k] = self._json_serializable(v)
|
||||
|
||||
batch.add_data_object(
|
||||
data_object=data_properties,
|
||||
class_name=self._collection_name,
|
||||
uuid=uuids[i],
|
||||
vector=embeddings[i] if embeddings else None,
|
||||
candidate = uuids[i] if uuids else None
|
||||
uid = candidate if (candidate and self._is_uuid(candidate)) else str(_uuid.uuid4())
|
||||
ids_out.append(uid)
|
||||
|
||||
vec_payload = None
|
||||
if embeddings and i < len(embeddings) and embeddings[i]:
|
||||
vec_payload = {"default": embeddings[i]}
|
||||
|
||||
objs.append(
|
||||
DataObject(
|
||||
uuid=uid,
|
||||
properties=props, # type: ignore[arg-type] # mypy incorrectly infers DataObject signature
|
||||
vector=vec_payload,
|
||||
)
|
||||
ids.append(uuids[i])
|
||||
return ids
|
||||
)
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str):
|
||||
# check whether the index already exists
|
||||
schema = self._default_schema(self._collection_name)
|
||||
if self._client.schema.contains(schema):
|
||||
where_filter = {"operator": "Equal", "path": [key], "valueText": value}
|
||||
batch_size = max(1, int(dify_config.WEAVIATE_BATCH_SIZE or 100))
|
||||
with col.batch.dynamic() as batch:
|
||||
for obj in objs:
|
||||
batch.add_object(properties=obj.properties, uuid=obj.uuid, vector=obj.vector)
|
||||
|
||||
self._client.batch.delete_objects(class_name=self._collection_name, where=where_filter, output="minimal")
|
||||
return ids_out
|
||||
|
||||
def _is_uuid(self, val: str) -> bool:
|
||||
"""Validates whether a string is a valid UUID format."""
|
||||
try:
|
||||
_uuid.UUID(str(val))
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str) -> None:
|
||||
"""Deletes all objects matching a specific metadata field value."""
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
return
|
||||
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
col.data.delete_many(where=Filter.by_property(key).equal(value))
|
||||
|
||||
def delete(self):
|
||||
# check whether the index already exists
|
||||
schema = self._default_schema(self._collection_name)
|
||||
if self._client.schema.contains(schema):
|
||||
self._client.schema.delete_class(self._collection_name)
|
||||
"""Deletes the entire collection from Weaviate."""
|
||||
if self._client.collections.exists(self._collection_name):
|
||||
self._client.collections.delete(self._collection_name)
|
||||
|
||||
def text_exists(self, id: str) -> bool:
|
||||
collection_name = self._collection_name
|
||||
schema = self._default_schema(self._collection_name)
|
||||
|
||||
# check whether the index already exists
|
||||
if not self._client.schema.contains(schema):
|
||||
"""Checks if a document with the given doc_id exists in the collection."""
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
return False
|
||||
result = (
|
||||
self._client.query.get(collection_name)
|
||||
.with_additional(["id"])
|
||||
.with_where(
|
||||
{
|
||||
"path": ["doc_id"],
|
||||
"operator": "Equal",
|
||||
"valueText": id,
|
||||
}
|
||||
)
|
||||
.with_limit(1)
|
||||
.do()
|
||||
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
res = col.query.fetch_objects(
|
||||
filters=Filter.by_property("doc_id").equal(id),
|
||||
limit=1,
|
||||
return_properties=["doc_id"],
|
||||
)
|
||||
|
||||
if "errors" in result:
|
||||
raise ValueError(f"Error during query: {result['errors']}")
|
||||
return len(res.objects) > 0
|
||||
|
||||
entries = result["data"]["Get"][collection_name]
|
||||
if len(entries) == 0:
|
||||
return False
|
||||
def delete_by_ids(self, ids: list[str]) -> None:
|
||||
"""
|
||||
Deletes objects by their UUID identifiers.
|
||||
|
||||
return True
|
||||
Silently ignores 404 errors for non-existent IDs.
|
||||
"""
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
return
|
||||
|
||||
def delete_by_ids(self, ids: list[str]):
|
||||
# check whether the index already exists
|
||||
schema = self._default_schema(self._collection_name)
|
||||
if self._client.schema.contains(schema):
|
||||
for uuid in ids:
|
||||
try:
|
||||
self._client.data_object.delete(
|
||||
class_name=self._collection_name,
|
||||
uuid=uuid,
|
||||
)
|
||||
except weaviate.UnexpectedStatusCodeException as e:
|
||||
# tolerate not found error
|
||||
if e.status_code != 404:
|
||||
raise e
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
|
||||
for uid in ids:
|
||||
try:
|
||||
col.data.delete_by_id(uid)
|
||||
except UnexpectedStatusCodeError as e:
|
||||
if getattr(e, "status_code", None) != 404:
|
||||
raise
|
||||
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
"""Look up similar documents by embedding vector in Weaviate."""
|
||||
collection_name = self._collection_name
|
||||
properties = self._attributes
|
||||
properties.append(Field.TEXT_KEY)
|
||||
query_obj = self._client.query.get(collection_name, properties)
|
||||
"""
|
||||
Performs vector similarity search using the provided query vector.
|
||||
|
||||
vector = {"vector": query_vector}
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
if document_ids_filter:
|
||||
operands = []
|
||||
for document_id_filter in document_ids_filter:
|
||||
operands.append({"path": ["document_id"], "operator": "Equal", "valueText": document_id_filter})
|
||||
where_filter = {"operator": "Or", "operands": operands}
|
||||
query_obj = query_obj.with_where(where_filter)
|
||||
result = (
|
||||
query_obj.with_near_vector(vector)
|
||||
.with_limit(kwargs.get("top_k", 4))
|
||||
.with_additional(["vector", "distance"])
|
||||
.do()
|
||||
Filters by document IDs if provided and applies score threshold.
|
||||
Returns documents sorted by relevance score.
|
||||
"""
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
return []
|
||||
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
props = list({*self._attributes, "document_id", Field.TEXT_KEY.value})
|
||||
|
||||
where = None
|
||||
doc_ids = kwargs.get("document_ids_filter") or []
|
||||
if doc_ids:
|
||||
ors = [Filter.by_property("document_id").equal(x) for x in doc_ids]
|
||||
where = ors[0]
|
||||
for f in ors[1:]:
|
||||
where = where | f
|
||||
|
||||
top_k = int(kwargs.get("top_k", 4))
|
||||
score_threshold = float(kwargs.get("score_threshold") or 0.0)
|
||||
|
||||
res = col.query.near_vector(
|
||||
near_vector=query_vector,
|
||||
limit=top_k,
|
||||
return_properties=props,
|
||||
return_metadata=MetadataQuery(distance=True),
|
||||
include_vector=False,
|
||||
filters=where,
|
||||
target_vector="default",
|
||||
)
|
||||
if "errors" in result:
|
||||
raise ValueError(f"Error during query: {result['errors']}")
|
||||
|
||||
docs_and_scores = []
|
||||
for res in result["data"]["Get"][collection_name]:
|
||||
text = res.pop(Field.TEXT_KEY)
|
||||
score = 1 - res["_additional"]["distance"]
|
||||
docs_and_scores.append((Document(page_content=text, metadata=res), score))
|
||||
docs: list[Document] = []
|
||||
for obj in res.objects:
|
||||
properties = dict(obj.properties or {})
|
||||
text = properties.pop(Field.TEXT_KEY.value, "")
|
||||
distance = (obj.metadata.distance if obj.metadata else None) or 1.0
|
||||
score = 1.0 - distance
|
||||
|
||||
docs = []
|
||||
for doc, score in docs_and_scores:
|
||||
score_threshold = float(kwargs.get("score_threshold") or 0.0)
|
||||
# check score threshold
|
||||
if score >= score_threshold:
|
||||
if doc.metadata is not None:
|
||||
doc.metadata["score"] = score
|
||||
docs.append(doc)
|
||||
# Sort the documents by score in descending order
|
||||
docs = sorted(docs, key=lambda x: x.metadata.get("score", 0) if x.metadata else 0, reverse=True)
|
||||
if score > score_threshold:
|
||||
properties["score"] = score
|
||||
docs.append(Document(page_content=text, metadata=properties))
|
||||
|
||||
docs.sort(key=lambda d: d.metadata.get("score", 0.0), reverse=True)
|
||||
return docs
|
||||
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
"""Return docs using BM25F.
|
||||
|
||||
Args:
|
||||
query: Text to look up documents similar to.
|
||||
|
||||
Returns:
|
||||
List of Documents most similar to the query.
|
||||
"""
|
||||
collection_name = self._collection_name
|
||||
content: dict[str, Any] = {"concepts": [query]}
|
||||
properties = self._attributes
|
||||
properties.append(Field.TEXT_KEY)
|
||||
if kwargs.get("search_distance"):
|
||||
content["certainty"] = kwargs.get("search_distance")
|
||||
query_obj = self._client.query.get(collection_name, properties)
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
if document_ids_filter:
|
||||
operands = []
|
||||
for document_id_filter in document_ids_filter:
|
||||
operands.append({"path": ["document_id"], "operator": "Equal", "valueText": document_id_filter})
|
||||
where_filter = {"operator": "Or", "operands": operands}
|
||||
query_obj = query_obj.with_where(where_filter)
|
||||
query_obj = query_obj.with_additional(["vector"])
|
||||
properties = ["text"]
|
||||
result = query_obj.with_bm25(query=query, properties=properties).with_limit(kwargs.get("top_k", 4)).do()
|
||||
if "errors" in result:
|
||||
raise ValueError(f"Error during query: {result['errors']}")
|
||||
docs = []
|
||||
for res in result["data"]["Get"][collection_name]:
|
||||
text = res.pop(Field.TEXT_KEY)
|
||||
additional = res.pop("_additional")
|
||||
docs.append(Document(page_content=text, vector=additional["vector"], metadata=res))
|
||||
Performs BM25 full-text search on document content.
|
||||
|
||||
Filters by document IDs if provided and returns matching documents with vectors.
|
||||
"""
|
||||
if not self._client.collections.exists(self._collection_name):
|
||||
return []
|
||||
|
||||
col = self._client.collections.use(self._collection_name)
|
||||
props = list({*self._attributes, Field.TEXT_KEY.value})
|
||||
|
||||
where = None
|
||||
doc_ids = kwargs.get("document_ids_filter") or []
|
||||
if doc_ids:
|
||||
ors = [Filter.by_property("document_id").equal(x) for x in doc_ids]
|
||||
where = ors[0]
|
||||
for f in ors[1:]:
|
||||
where = where | f
|
||||
|
||||
top_k = int(kwargs.get("top_k", 4))
|
||||
|
||||
res = col.query.bm25(
|
||||
query=query,
|
||||
query_properties=[Field.TEXT_KEY.value],
|
||||
limit=top_k,
|
||||
return_properties=props,
|
||||
include_vector=True,
|
||||
filters=where,
|
||||
)
|
||||
|
||||
docs: list[Document] = []
|
||||
for obj in res.objects:
|
||||
properties = dict(obj.properties or {})
|
||||
text = properties.pop(Field.TEXT_KEY.value, "")
|
||||
|
||||
vec = obj.vector
|
||||
if isinstance(vec, dict):
|
||||
vec = vec.get("default") or next(iter(vec.values()), None)
|
||||
|
||||
docs.append(Document(page_content=text, vector=vec, metadata=properties))
|
||||
return docs
|
||||
|
||||
def _default_schema(self, index_name: str):
|
||||
return {
|
||||
"class": index_name,
|
||||
"properties": [
|
||||
{
|
||||
"name": "text",
|
||||
"dataType": ["text"],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
def _json_serializable(self, value: Any):
|
||||
def _json_serializable(self, value: Any) -> Any:
|
||||
"""Converts values to JSON-serializable format, handling datetime objects."""
|
||||
if isinstance(value, datetime.datetime):
|
||||
return value.isoformat()
|
||||
return value
|
||||
|
||||
|
||||
class WeaviateVectorFactory(AbstractVectorFactory):
|
||||
"""Factory class for creating WeaviateVector instances."""
|
||||
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> WeaviateVector:
|
||||
"""
|
||||
Initializes a WeaviateVector instance for the given dataset.
|
||||
|
||||
Uses existing collection name from dataset index structure or generates a new one.
|
||||
Updates dataset index structure if not already set.
|
||||
"""
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
collection_name = class_prefix
|
||||
@ -281,7 +425,6 @@ class WeaviateVectorFactory(AbstractVectorFactory):
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.WEAVIATE, collection_name))
|
||||
|
||||
return WeaviateVector(
|
||||
collection_name=collection_name,
|
||||
config=WeaviateConfig(
|
||||
|
||||
Reference in New Issue
Block a user