From 5da4160cb8bd2e0dcc4b735bdda5bc34f64f062e Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Tue, 26 May 2026 15:26:05 +0800 Subject: [PATCH] feat(api): preserve api exception traceback in event --- api/services/legacy_model_type_migration.py | 39 +++++-- .../test_legacy_model_type_migration.py | 100 ++++++++++++++++++ 2 files changed, 133 insertions(+), 6 deletions(-) diff --git a/api/services/legacy_model_type_migration.py b/api/services/legacy_model_type_migration.py index 79c755c9ab..8a0b98a270 100644 --- a/api/services/legacy_model_type_migration.py +++ b/api/services/legacy_model_type_migration.py @@ -19,6 +19,7 @@ import io import json import sys import threading +import traceback import uuid from collections.abc import Iterable, Iterator, Sequence from concurrent.futures import ThreadPoolExecutor, as_completed @@ -95,6 +96,10 @@ def _normalize_log_payload(value: object) -> object: return f"<{type(value).__module__}.{type(value).__qualname__}>" +def _format_exception_stacktrace(exc: BaseException) -> str: + return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + + @dataclass(frozen=True, slots=True) class _RowWithRawModelType[T: TypeBase]: row: T @@ -1584,10 +1589,11 @@ class Migration: ).delete() self._log_event("cache_deleted", "Deleted related cache entry.", attrs) except Exception as exc: - self._log_event( + self._log_exception_event( "cache_delete_failed", "Failed to delete related cache entry.", - {**attrs, "error": str(exc)}, + attrs, + exc, ) def _process_load_balancing_model_config_row( @@ -2150,11 +2156,15 @@ class Migration: "table_name": table_name, "id": row_id, "tx_id": tx_id, - "error": str(exc), } if business_key is not None: attrs["business_key"] = self._business_key_to_dict(business_key) - self._log_event("lock_timeout_skipped", "Skipped transaction because row lock timed out.", attrs) + self._log_exception_event( + "lock_timeout_skipped", + "Skipped transaction because row lock timed out.", + attrs, + exc, + ) 
def _business_key_to_dict(self, business_key: _BusinessKey) -> dict[str, object]: return cast(dict[str, object], asdict(business_key)) @@ -2260,7 +2270,7 @@ class Migration: }, ) except Exception as exc: - self._log_event( + self._log_exception_event( "cache_delete_failed", "Failed to delete related cache entry.", { @@ -2271,8 +2281,8 @@ class Migration: "cache_type": cache_plan.cache_type.value, "tx_id": cache_plan.tx_id, "business_key": self._business_key_to_dict(cache_plan.business_key), - "error": str(exc), }, + exc, ) else: self._log_event( @@ -2289,6 +2299,23 @@ class Migration: }, ) + def _log_exception_event( + self, + event: str, + message: str, + attrs: dict[str, object], + exc: BaseException, + ) -> None: + self._log_event( + event, + message, + { + **attrs, + "error": str(exc), + "stacktrace": _format_exception_stacktrace(exc), + }, + ) + def _log_event(self, event: str, message: str, attrs: dict[str, object]) -> None: record = { "event": event, diff --git a/api/tests/unit_tests/commands/test_legacy_model_type_migration.py b/api/tests/unit_tests/commands/test_legacy_model_type_migration.py index d4c6e863d1..80b3e96b87 100644 --- a/api/tests/unit_tests/commands/test_legacy_model_type_migration.py +++ b/api/tests/unit_tests/commands/test_legacy_model_type_migration.py @@ -964,6 +964,71 @@ def test_is_lock_timeout_error_prefers_structured_backend_codes( assert migration._is_lock_timeout_error(exc) is expected +def test_process_load_balancing_model_config_row_logs_stacktrace_for_lock_timeout( + migration_module, + sqlite_engine: sa.Engine, + monkeypatch: pytest.MonkeyPatch, +) -> None: + output = io.StringIO() + migration = migration_module.Migration( + tenant_id="tenant-1", + engine=sqlite_engine, + apply=True, + output=output, + model_types=(ModelType.LLM,), + orm_models=(migration_module.LoadBalancingModelConfig,), + ) + candidate = migration_module._RowWithRawModelType( + row=SimpleNamespace(id="lb-row-1"), + raw_model_type="text-generation", + 
canonical_model_type=ModelType.LLM, + ) + lock_timeout_exc = OperationalError("SELECT 1", {}, SimpleNamespace(pgcode="55P03")) + + class _FakeBeginContext: + def __enter__(self) -> None: + return None + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + class _FakeSession: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + def begin(self) -> _FakeBeginContext: + return _FakeBeginContext() + + def _fake_session_factory(engine: sa.Engine) -> _FakeSession: + return _FakeSession() + + def _fake_reload(self, session, original_candidate, *, lock_rows: bool): + raise lock_timeout_exc + + monkeypatch.setattr(migration_module, "_session_factory", _fake_session_factory) + monkeypatch.setattr(migration_module.Migration, "_configure_lock_timeout", lambda self, session: None) + monkeypatch.setattr( + migration_module.Migration, + "_reload_load_balancing_model_config_candidate", + _fake_reload, + ) + + migration._process_load_balancing_model_config_row(candidate) + + lines = _parse_json_lines(output) + assert len(lines) == 1 + assert lines[0]["event"] == "lock_timeout_skipped" + attrs = cast(dict[str, object], lines[0]["attrs"]) + assert attrs["table_name"] == "load_balancing_model_configs" + assert attrs["id"] == "lb-row-1" + assert attrs["error"] == str(lock_timeout_exc) + assert isinstance(attrs["stacktrace"], str) + assert "OperationalError" in attrs["stacktrace"] + + def test_process_load_balancing_model_config_row_logs_update_after_sql_execution( migration_module, sqlite_engine: sa.Engine, @@ -1046,6 +1111,41 @@ def test_process_load_balancing_model_config_row_logs_update_after_sql_execution ] +def test_load_balancing_model_config_cache_delete_failure_logs_stacktrace( + migration_module, + sqlite_engine: sa.Engine, + dirty_fixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + def _raise_delete_failure(self) -> None: + raise RuntimeError("cache delete boom") + + 
monkeypatch.setattr(migration_module.ProviderCredentialsCache, "delete", _raise_delete_failure) + + output = io.StringIO() + migration_module.LegacyModelTypeMigrationService( + engine=sqlite_engine, + apply=True, + output=output, + tables=("load_balancing_model_configs",), + model_types=(ModelType.LLM,), + tenant_ids=(dirty_fixture.primary.tenant_id,), + ).migrate() + + failed_events = [ + cast(dict[str, object], line["attrs"]) + for line in _parse_json_lines(output) + if line.get("event") == "cache_delete_failed" + and isinstance(line.get("attrs"), dict) + and cast(dict[str, object], line["attrs"]).get("table_name") == "load_balancing_model_configs" + ] + + assert len(failed_events) == 1 + assert failed_events[0]["error"] == "cache delete boom" + assert isinstance(failed_events[0]["stacktrace"], str) + assert "RuntimeError: cache delete boom" in cast(str, failed_events[0]["stacktrace"]) + + def test_group_completed_logs_exist_for_all_grouped_tables_and_use_canonical_model_type( migration_module, sqlite_engine: sa.Engine,