diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index aa6d25509..66a6060d2 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -572,6 +572,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs): check_llm_ts = timer() langfuse_tracer = None + langfuse_generation = None trace_context = {} langfuse_keys = TenantLangfuseService.filter_by_tenant(tenant_id=dialog.tenant_id) if langfuse_keys: @@ -783,7 +784,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs): gen_conf["max_tokens"] = min(gen_conf["max_tokens"], max_tokens - used_token_count) async def decorate_answer(answer): - nonlocal embd_mdl, prompt_config, knowledges, kwargs, kbinfos, prompt, retrieval_ts, questions, langfuse_tracer + nonlocal embd_mdl, prompt_config, knowledges, kwargs, kbinfos, prompt, retrieval_ts, questions, langfuse_generation refs = [] ans = answer.split("") @@ -855,8 +856,8 @@ async def async_chat(dialog, messages, stream=True, **kwargs): f" - Token speed: {int(tk_num / (generate_result_time_cost / 1000.0))}/s" ) - # Add a condition check to call the end method only if langfuse_tracer exists - if langfuse_tracer and "langfuse_generation" in locals(): + # Add a condition check to call the end method only if langfuse_generation exists + if langfuse_generation is not None: langfuse_output = "\n" + re.sub(r"^.*?(### Query:.*)", r"\1", prompt, flags=re.DOTALL) langfuse_output = {"time_elapsed:": re.sub(r"\n", " \n", langfuse_output), "created_at": time.time()} langfuse_generation.update( @@ -872,9 +873,18 @@ async def async_chat(dialog, messages, stream=True, **kwargs): return {"answer": think + answer, "reference": refs, "prompt": re.sub(r"\n", " \n", prompt), "created_at": time.time()} if langfuse_tracer: - langfuse_generation = langfuse_tracer.start_generation( - trace_context=trace_context, name="chat", model=llm_model_config["llm_name"], input={"prompt": prompt, "prompt4citation": prompt4citation, "messages": msg} - ) + try: + langfuse_generation = langfuse_tracer.start_observation( + as_type="generation", + trace_context=trace_context, + name="chat", + model=llm_model_config["llm_name"], + input={"prompt": prompt, "prompt4citation": prompt4citation, "messages": msg}, + ) + except Exception as e: # noqa: BLE001 - tracing must not break chat flow + logger.warning("Langfuse start_observation failed; continuing without tracing: %s", e) + langfuse_tracer = None + langfuse_generation = None if stream: if llm_type == "chat": diff --git a/test/unit_test/api/db/services/test_dialog_service_final_answer.py b/test/unit_test/api/db/services/test_dialog_service_final_answer.py index d38d15705..30fb1e4c3 100644 --- a/test/unit_test/api/db/services/test_dialog_service_final_answer.py +++ b/test/unit_test/api/db/services/test_dialog_service_final_answer.py @@ -140,6 +140,41 @@ class _StubRetriever: return answer, set() +class _FakeLangfuseObservation: + def __init__(self): + self.updates = [] + self.ended = False + + def update(self, **kwargs): + self.updates.append(kwargs) + + def end(self): + self.ended = True + + +class _FakeLangfuseClient: + instances = [] + fail_start_observation = False + + def __init__(self, **kwargs): + self.init_kwargs = kwargs + self.observation_kwargs = None + self.observation = _FakeLangfuseObservation() + self.instances.append(self) + + def auth_check(self): + return True + + def create_trace_id(self): + return "trace-id" + + def start_observation(self, **kwargs): + if self.fail_start_observation: + raise RuntimeError("langfuse unavailable") + self.observation_kwargs = kwargs + return self.observation + + def _collect(async_gen): async def _run(): return [ev async for ev in async_gen] @@ -356,3 +391,132 @@ def test_async_chat_final_event_carries_decorated_answer(monkeypatch): assert llm_answer in final["answer"], ( f"LLM answer text expected in final event, got: {final['answer']!r}" ) + + +@pytest.mark.p2 +def test_async_chat_langfuse_uses_start_observation(monkeypatch): + """ + Langfuse v4 exposes start_observation(as_type="generation"), not + start_generation(). Keep async_chat() on the migrated API. + """ + _FakeLangfuseClient.instances = [] + monkeypatch.setattr(_FakeLangfuseClient, "fail_start_observation", False) + llm_answer = "RAGFlow traces chat answers through Langfuse." + chat_mdl = _StreamingChatModel(llm_answer) + retriever = _StubRetriever() + + monkeypatch.setattr( + dialog_service.TenantLLMService, "llm_id2llm_type", lambda _llm_id: "chat" + ) + monkeypatch.setattr( + dialog_service.TenantLLMService, "get_model_config", + lambda _tid, _type, _llm_id: _LLM_CONFIG, + ) + monkeypatch.setattr( + dialog_service.TenantLangfuseService, "filter_by_tenant", + lambda tenant_id: SimpleNamespace( + public_key="public", + secret_key="secret", + host="http://langfuse.local", + ), + ) + monkeypatch.setattr(dialog_service, "Langfuse", _FakeLangfuseClient) + monkeypatch.setattr( + dialog_service, + "get_models", + lambda _dialog: ([_KB], chat_mdl, None, chat_mdl, None), + ) + monkeypatch.setattr( + dialog_service.KnowledgebaseService, "get_field_map", lambda _kb_ids: {} + ) + monkeypatch.setattr( + dialog_service.KnowledgebaseService, "get_by_ids", lambda _ids: [_KB] + ) + monkeypatch.setattr(dialog_service.settings, "retriever", retriever, raising=False) + monkeypatch.setattr(dialog_service, "label_question", lambda _q, _kbs: "") + monkeypatch.setattr( + dialog_service, + "kb_prompt", + lambda _kbinfos, _max_tokens, **_kw: ["RAGFlow is a RAG engine."], + ) + + dialog = _make_dialog(chat_mdl) + messages = [{"role": "user", "content": "What is RAGFlow?"}] + + events = _collect(dialog_service.async_chat(dialog, messages, stream=True, quote=True)) + + assert any(e.get("final") is True for e in events) + assert len(_FakeLangfuseClient.instances) == 1 + langfuse = _FakeLangfuseClient.instances[0] + assert langfuse.observation_kwargs["as_type"] == "generation" + assert langfuse.observation_kwargs["trace_context"] == {"trace_id": "trace-id"} + assert langfuse.observation_kwargs["name"] == "chat" + assert langfuse.observation_kwargs["model"] == _LLM_CONFIG["llm_name"] + input_payload = langfuse.observation_kwargs["input"] + assert set(input_payload.keys()) == {"prompt", "prompt4citation", "messages"} + assert input_payload["prompt"] == "You are helpful. \n------\nRAGFlow is a RAG engine." + assert input_payload["prompt4citation"] == dialog_service.citation_prompt() + assert input_payload["messages"][0]["role"] == "system" + assert input_payload["messages"][0]["content"] == input_payload["prompt"] + assert input_payload["messages"][1] == {"role": "user", "content": "What is RAGFlow?"} + assert langfuse.observation.ended is True + + +@pytest.mark.p2 +def test_async_chat_continues_when_langfuse_observation_start_fails(monkeypatch): + """ + Langfuse tracing is best-effort; observation startup errors must not break + chat responses. + """ + _FakeLangfuseClient.instances = [] + monkeypatch.setattr(_FakeLangfuseClient, "fail_start_observation", True) + llm_answer = "RAGFlow still answers when tracing is unavailable." + chat_mdl = _StreamingChatModel(llm_answer) + retriever = _StubRetriever() + + monkeypatch.setattr( + dialog_service.TenantLLMService, "llm_id2llm_type", lambda _llm_id: "chat" + ) + monkeypatch.setattr( + dialog_service.TenantLLMService, "get_model_config", + lambda _tid, _type, _llm_id: _LLM_CONFIG, + ) + monkeypatch.setattr( + dialog_service.TenantLangfuseService, "filter_by_tenant", + lambda tenant_id: SimpleNamespace( + public_key="public", + secret_key="secret", + host="http://langfuse.local", + ), + ) + monkeypatch.setattr(dialog_service, "Langfuse", _FakeLangfuseClient) + monkeypatch.setattr( + dialog_service, + "get_models", + lambda _dialog: ([_KB], chat_mdl, None, chat_mdl, None), + ) + monkeypatch.setattr( + dialog_service.KnowledgebaseService, "get_field_map", lambda _kb_ids: {} + ) + monkeypatch.setattr( + dialog_service.KnowledgebaseService, "get_by_ids", lambda _ids: [_KB] + ) + monkeypatch.setattr(dialog_service.settings, "retriever", retriever, raising=False) + monkeypatch.setattr(dialog_service, "label_question", lambda _q, _kbs: "") + monkeypatch.setattr( + dialog_service, + "kb_prompt", + lambda _kbinfos, _max_tokens, **_kw: ["RAGFlow is a RAG engine."], + ) + + dialog = _make_dialog(chat_mdl) + messages = [{"role": "user", "content": "What is RAGFlow?"}] + + events = _collect(dialog_service.async_chat(dialog, messages, stream=True, quote=True)) + + final_events = [e for e in events if e.get("final") is True] + assert len(final_events) == 1 + assert llm_answer in final_events[0]["answer"] + assert len(_FakeLangfuseClient.instances) == 1 + assert _FakeLangfuseClient.instances[0].observation_kwargs is None + assert _FakeLangfuseClient.instances[0].observation.ended is False