fix: resolve failure when deleting expired logs (#36151)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
wangxiaolei
2026-05-14 16:02:31 +08:00
committed by GitHub
parent a9bcec013f
commit d9ccfcbc6e
2 changed files with 47 additions and 11 deletions

View File

@ -35,6 +35,21 @@ logger = logging.getLogger(__name__)
class ClearFreePlanTenantExpiredLogs:
@staticmethod
def _serialize_record(record: object) -> dict[str, object]:
if hasattr(record, "to_dict"):
return record.to_dict() # type: ignore[no-any-return]
table = getattr(record, "__table__", None)
columns = getattr(table, "columns", None)
if columns is None:
raise TypeError(f"Unsupported record type for serialization: {type(record)!r}")
record_dict: dict[str, object] = {}
for column in columns:
record_dict[column.name] = getattr(record, column.name)
return record_dict
@classmethod
def _clear_message_related_tables(cls, session: Session, tenant_id: str, batch_message_ids: list[str]):
"""
@ -77,14 +92,7 @@ class ClearFreePlanTenantExpiredLogs:
record_data = []
for record in records:
try:
if hasattr(record, "to_dict"):
record_data.append(record.to_dict())
else:
# if record doesn't have to_dict method, we need to transform it to dict manually
record_dict = {}
for column in record.__table__.columns:
record_dict[column.name] = getattr(record, column.name)
record_data.append(record_dict)
record_data.append(cls._serialize_record(record))
except Exception:
logger.exception("Failed to transform %s record: %s", table_name, record.id)
continue
@ -222,7 +230,12 @@ class ClearFreePlanTenantExpiredLogs:
f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}"
f"-{time.time()}.json",
json.dumps(
jsonable_encoder(workflow_node_executions),
jsonable_encoder(
[
cls._serialize_record(workflow_node_execution)
for workflow_node_execution in workflow_node_executions
]
),
).encode("utf-8"),
)

View File

@ -304,8 +304,11 @@ def test_process_tenant_processes_all_batches(monkeypatch: pytest.MonkeyPatch) -
monkeypatch.setattr(service_module, "select", fake_select)
# Repositories for workflow node executions and workflow runs
node_execution = SimpleNamespace(id="ne-1")
node_execution.__table__ = SimpleNamespace(columns=[SimpleNamespace(name="id")])
node_repo = MagicMock()
node_repo.get_expired_executions_batch.side_effect = [[SimpleNamespace(id="ne-1")], []]
node_repo.get_expired_executions_batch.side_effect = [[node_execution], []]
node_repo.delete_executions_by_ids.return_value = 1
run_repo = MagicMock()
@ -329,6 +332,21 @@ def test_process_tenant_processes_all_batches(monkeypatch: pytest.MonkeyPatch) -
clear_related.assert_called()
def test_serialize_record_falls_back_to_table_columns() -> None:
record = SimpleNamespace(id="ne-1", node_id="node-1")
record.__table__ = SimpleNamespace(
columns=[
SimpleNamespace(name="id"),
SimpleNamespace(name="node_id"),
]
)
assert ClearFreePlanTenantExpiredLogs._serialize_record(record) == {
"id": "ne-1",
"node_id": "node-1",
}
def test_process_with_tenant_ids_filters_by_plan_and_logs_errors(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(service_module, "db", SimpleNamespace(engine=object()))
@ -533,9 +551,14 @@ def test_process_tenant_repo_loops_break_on_empty_second_batch(monkeypatch: pyte
monkeypatch.setattr(service_module, "select", fake_select)
# Repos: first returns exactly batch items -> no "< batch" break, second returns [] -> hit the len==0 break.
node_execution_1 = SimpleNamespace(id="ne-1")
node_execution_1.__table__ = SimpleNamespace(columns=[SimpleNamespace(name="id")])
node_execution_2 = SimpleNamespace(id="ne-2")
node_execution_2.__table__ = SimpleNamespace(columns=[SimpleNamespace(name="id")])
node_repo = MagicMock()
node_repo.get_expired_executions_batch.side_effect = [
[SimpleNamespace(id="ne-1"), SimpleNamespace(id="ne-2")],
[node_execution_1, node_execution_2],
[],
]
node_repo.delete_executions_by_ids.return_value = 2