mirror of
https://github.com/langgenius/dify.git
synced 2026-05-21 09:17:27 +08:00
Merge remote-tracking branch 'origin/main' into chore/upgrade-xyflow-v12
This commit is contained in:
@ -13,6 +13,8 @@ from langfuse.api import (
|
||||
TraceBody,
|
||||
)
|
||||
from langfuse.api.commons.types.usage import Usage
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from core.ops.base_trace_instance import BaseTraceInstance
|
||||
@ -52,13 +54,40 @@ class LangFuseDataTrace(BaseTraceInstance):
|
||||
langfuse_config: LangfuseConfig,
|
||||
):
|
||||
super().__init__(langfuse_config)
|
||||
# Isolated TracerProvider prevents the langfuse v3 SDK from attaching its
|
||||
# SpanProcessor to the global OpenTelemetry TracerProvider, which would
|
||||
# otherwise siphon every Flask/Celery/SQLAlchemy span in the process into
|
||||
# this tenant's Langfuse project. See langfuse upgrade guide v2 -> v3.
|
||||
self._tracer_provider: TracerProvider | None = TracerProvider(
|
||||
resource=Resource.create({"service.name": "dify-langfuse-app-trace"}),
|
||||
)
|
||||
self.langfuse_client = Langfuse(
|
||||
public_key=langfuse_config.public_key,
|
||||
secret_key=langfuse_config.secret_key,
|
||||
host=langfuse_config.host,
|
||||
tracer_provider=self._tracer_provider,
|
||||
)
|
||||
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
|
||||
|
||||
def close(self) -> None:
|
||||
"""Flush and shut down the isolated TracerProvider.
|
||||
|
||||
Called explicitly when the trace instance is evicted from the cache, or
|
||||
implicitly via ``__del__`` on garbage collection. Idempotent.
|
||||
"""
|
||||
provider = getattr(self, "_tracer_provider", None)
|
||||
if provider is None:
|
||||
return
|
||||
try:
|
||||
provider.shutdown()
|
||||
except Exception:
|
||||
logger.debug("Failed to shut down Langfuse TracerProvider", exc_info=True)
|
||||
finally:
|
||||
self._tracer_provider = None
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.close()
|
||||
|
||||
@staticmethod
|
||||
def _get_completion_start_time(
|
||||
start_time: datetime | None, time_to_first_token: float | int | None
|
||||
|
||||
@ -50,20 +50,92 @@ def trace_instance(langfuse_config, monkeypatch: pytest.MonkeyPatch):
|
||||
|
||||
|
||||
def test_init(langfuse_config, monkeypatch: pytest.MonkeyPatch):
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
|
||||
mock_langfuse = MagicMock()
|
||||
monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", mock_langfuse)
|
||||
monkeypatch.setenv("FILES_URL", "http://test.url")
|
||||
|
||||
instance = LangFuseDataTrace(langfuse_config)
|
||||
|
||||
mock_langfuse.assert_called_once_with(
|
||||
public_key=langfuse_config.public_key,
|
||||
secret_key=langfuse_config.secret_key,
|
||||
host=langfuse_config.host,
|
||||
)
|
||||
mock_langfuse.assert_called_once()
|
||||
kwargs = mock_langfuse.call_args.kwargs
|
||||
assert kwargs["public_key"] == langfuse_config.public_key
|
||||
assert kwargs["secret_key"] == langfuse_config.secret_key
|
||||
assert kwargs["host"] == langfuse_config.host
|
||||
assert isinstance(kwargs["tracer_provider"], TracerProvider)
|
||||
assert kwargs["tracer_provider"] is instance._tracer_provider
|
||||
assert instance.file_base_url == "http://test.url"
|
||||
|
||||
|
||||
def test_init_passes_isolated_tracer_provider_to_langfuse(langfuse_config, monkeypatch: pytest.MonkeyPatch):
|
||||
"""Regression test for langfuse v3 SDK side effect.
|
||||
|
||||
Without an explicit ``tracer_provider=`` kwarg, the Langfuse v3 SDK
|
||||
attaches a ``LangfuseSpanProcessor`` to the *global* OpenTelemetry
|
||||
TracerProvider — siphoning every Flask / Celery / SQLAlchemy span in the
|
||||
process into the tenant's Langfuse project. See langfuse upgrade-path
|
||||
docs (v2 -> v3) and GitHub discussion #9136.
|
||||
|
||||
The fix is to construct an isolated ``TracerProvider`` and pass it via
|
||||
``tracer_provider=`` so the SDK never touches the global one.
|
||||
"""
|
||||
from opentelemetry import trace as otel_trace_api
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def fake_langfuse(**kwargs):
|
||||
captured.update(kwargs)
|
||||
return MagicMock()
|
||||
|
||||
monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", fake_langfuse)
|
||||
|
||||
instance = LangFuseDataTrace(langfuse_config)
|
||||
|
||||
# 1. tracer_provider kwarg must be supplied (drives the no-pollution branch
|
||||
# in langfuse.LangfuseResourceManager._init_tracer_provider).
|
||||
assert "tracer_provider" in captured, (
|
||||
"Langfuse() must receive an explicit tracer_provider=; without it the "
|
||||
"v3 SDK attaches its SpanProcessor to the global OTEL TracerProvider."
|
||||
)
|
||||
|
||||
passed_provider = captured["tracer_provider"]
|
||||
assert isinstance(passed_provider, TracerProvider)
|
||||
assert passed_provider is instance._tracer_provider
|
||||
|
||||
# 2. The instance's provider must not be the global one.
|
||||
global_provider = otel_trace_api.get_tracer_provider()
|
||||
assert passed_provider is not global_provider
|
||||
|
||||
|
||||
def test_close_shuts_down_tracer_provider(langfuse_config, monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", lambda **kwargs: MagicMock())
|
||||
|
||||
instance = LangFuseDataTrace(langfuse_config)
|
||||
provider = instance._tracer_provider
|
||||
provider_shutdown = MagicMock()
|
||||
monkeypatch.setattr(provider, "shutdown", provider_shutdown)
|
||||
|
||||
instance.close()
|
||||
|
||||
provider_shutdown.assert_called_once()
|
||||
assert instance._tracer_provider is None
|
||||
|
||||
|
||||
def test_close_is_idempotent(langfuse_config, monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr("dify_trace_langfuse.langfuse_trace.Langfuse", lambda **kwargs: MagicMock())
|
||||
|
||||
instance = LangFuseDataTrace(langfuse_config)
|
||||
provider_shutdown = MagicMock()
|
||||
monkeypatch.setattr(instance._tracer_provider, "shutdown", provider_shutdown)
|
||||
|
||||
instance.close()
|
||||
instance.close()
|
||||
|
||||
provider_shutdown.assert_called_once()
|
||||
|
||||
|
||||
def test_trace_dispatch(trace_instance, monkeypatch: pytest.MonkeyPatch):
|
||||
methods = [
|
||||
"workflow_trace",
|
||||
|
||||
12
api/uv.lock
generated
12
api/uv.lock
generated
@ -382,14 +382,14 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "authlib"
|
||||
version = "1.6.11"
|
||||
version = "1.6.12"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "cryptography" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/28/10/b325d58ffe86815b399334a101e63bc6fa4e1953921cb23703b48a0a0220/authlib-1.6.11.tar.gz", hash = "sha256:64db35b9b01aeccb4715a6c9a6613a06f2bd7be2ab9d2eb89edd1dfc7580a38f", size = 165359, upload-time = "2026-04-16T07:22:50.279Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/d3/30/6691fdc63b35f54a5a65e04fa1e59d827f4d4e8f4a39678ba7d3088ce0c8/authlib-1.6.12.tar.gz", hash = "sha256:0656d8482f28fc8221929d5f35b2bde5d13e10555ebc06b4561b0d622e83b1bd", size = 165368, upload-time = "2026-05-04T08:11:31.826Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/57/2f/55fca558f925a51db046e5b929deb317ddb05afed74b22d89f4eca578980/authlib-1.6.11-py2.py3-none-any.whl", hash = "sha256:c8687a9a26451c51a34a06fa17bb97cb15bba46a6a626755e2d7f50da8bff3e3", size = 244469, upload-time = "2026-04-16T07:22:48.413Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/cd/51/9b0b5cd4cf683a02db937a6f9bbebcdc9c56558a7bb3763ce7d3512103c3/authlib-1.6.12-py2.py3-none-any.whl", hash = "sha256:e9229ad7fde610b139dd12f5edbe97eab9ee78bfb85691247e767727850b99ab", size = 244473, upload-time = "2026-05-04T08:11:30.354Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3607,7 +3607,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "langsmith"
|
||||
version = "0.7.31"
|
||||
version = "0.7.38"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "httpx" },
|
||||
@ -3620,9 +3620,9 @@ dependencies = [
|
||||
{ name = "xxhash" },
|
||||
{ name = "zstandard" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/e6/11/696019490992db5c87774dc20515529ef42a01e1d770fb754ed6d9b12fb0/langsmith-0.7.31.tar.gz", hash = "sha256:331ee4f7c26bb5be4022b9859b7d7b122cbf8c9d01d9f530114c1914b0349ffb", size = 1178480, upload-time = "2026-04-14T17:55:41.242Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/af/c9/b3e54cfcb480876dfe33ecfdd64feeb621a86d9e6f4a6b9eb46851807018/langsmith-0.7.38.tar.gz", hash = "sha256:0db529b768d66c45f22fe959a0af7151342704fefafdecf3c60b14097c14fdb1", size = 4431914, upload-time = "2026-04-29T00:21:42.865Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/1d/a1/a013cf458c301cda86a213dd153ce0a01c93f1ab5833f951e6a44c9763ce/langsmith-0.7.31-py3-none-any.whl", hash = "sha256:0291d49203f6e80dda011af1afda61eb0595a4d697adb684590a8805e1d61fb6", size = 373276, upload-time = "2026-04-14T17:55:39.677Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/86/bc/a19d0a6d5575c637796675831dbef3555568e84d913f14ec579f92162ffa/langsmith-0.7.38-py3-none-any.whl", hash = "sha256:9c400ad508c0e4edc37bd55987047c6b8aac36ddd55f6096e3806f4d6a100618", size = 392310, upload-time = "2026-04-29T00:21:40.534Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
Reference in New Issue
Block a user