diff --git a/api/services/clear_free_plan_tenant_expired_logs.py b/api/services/clear_free_plan_tenant_expired_logs.py index dcc93b4b0f..3fb340d3a7 100644 --- a/api/services/clear_free_plan_tenant_expired_logs.py +++ b/api/services/clear_free_plan_tenant_expired_logs.py @@ -35,6 +35,21 @@ logger = logging.getLogger(__name__) class ClearFreePlanTenantExpiredLogs: + @staticmethod + def _serialize_record(record: object) -> dict[str, object]: + if hasattr(record, "to_dict"): + return record.to_dict() # type: ignore[no-any-return] + + table = getattr(record, "__table__", None) + columns = getattr(table, "columns", None) + if columns is None: + raise TypeError(f"Unsupported record type for serialization: {type(record)!r}") + + record_dict: dict[str, object] = {} + for column in columns: + record_dict[column.name] = getattr(record, column.name) + return record_dict + @classmethod def _clear_message_related_tables(cls, session: Session, tenant_id: str, batch_message_ids: list[str]): """ @@ -77,14 +92,7 @@ class ClearFreePlanTenantExpiredLogs: record_data = [] for record in records: try: - if hasattr(record, "to_dict"): - record_data.append(record.to_dict()) - else: - # if record doesn't have to_dict method, we need to transform it to dict manually - record_dict = {} - for column in record.__table__.columns: - record_dict[column.name] = getattr(record, column.name) - record_data.append(record_dict) + record_data.append(cls._serialize_record(record)) except Exception: logger.exception("Failed to transform %s record: %s", table_name, record.id) continue @@ -222,7 +230,12 @@ class ClearFreePlanTenantExpiredLogs: f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}" f"-{time.time()}.json", json.dumps( - jsonable_encoder(workflow_node_executions), + jsonable_encoder( + [ + cls._serialize_record(workflow_node_execution) + for workflow_node_execution in workflow_node_executions + ] + ), ).encode("utf-8"), ) diff --git a/api/tests/unit_tests/services/test_clear_free_plan_tenant_expired_logs.py b/api/tests/unit_tests/services/test_clear_free_plan_tenant_expired_logs.py index 1bbd214110..6c54e9c572 100644 --- a/api/tests/unit_tests/services/test_clear_free_plan_tenant_expired_logs.py +++ b/api/tests/unit_tests/services/test_clear_free_plan_tenant_expired_logs.py @@ -304,8 +304,11 @@ def test_process_tenant_processes_all_batches(monkeypatch: pytest.MonkeyPatch) - monkeypatch.setattr(service_module, "select", fake_select) # Repositories for workflow node executions and workflow runs + node_execution = SimpleNamespace(id="ne-1") + node_execution.__table__ = SimpleNamespace(columns=[SimpleNamespace(name="id")]) + node_repo = MagicMock() - node_repo.get_expired_executions_batch.side_effect = [[SimpleNamespace(id="ne-1")], []] + node_repo.get_expired_executions_batch.side_effect = [[node_execution], []] node_repo.delete_executions_by_ids.return_value = 1 run_repo = MagicMock() @@ -329,6 +332,21 @@ def test_process_tenant_processes_all_batches(monkeypatch: pytest.MonkeyPatch) - clear_related.assert_called() +def test_serialize_record_falls_back_to_table_columns() -> None: + record = SimpleNamespace(id="ne-1", node_id="node-1") + record.__table__ = SimpleNamespace( + columns=[ + SimpleNamespace(name="id"), + SimpleNamespace(name="node_id"), + ] + ) + + assert ClearFreePlanTenantExpiredLogs._serialize_record(record) == { + "id": "ne-1", + "node_id": "node-1", + } + + def test_process_with_tenant_ids_filters_by_plan_and_logs_errors(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(service_module, "db", SimpleNamespace(engine=object())) @@ -533,9 +551,14 @@ def test_process_tenant_repo_loops_break_on_empty_second_batch(monkeypatch: pyte monkeypatch.setattr(service_module, "select", fake_select) # Repos: first returns exactly batch items -> no "< batch" break, second returns [] -> hit the len==0 break. + node_execution_1 = SimpleNamespace(id="ne-1") + node_execution_1.__table__ = SimpleNamespace(columns=[SimpleNamespace(name="id")]) + node_execution_2 = SimpleNamespace(id="ne-2") + node_execution_2.__table__ = SimpleNamespace(columns=[SimpleNamespace(name="id")]) + node_repo = MagicMock() node_repo.get_expired_executions_batch.side_effect = [ - [SimpleNamespace(id="ne-1"), SimpleNamespace(id="ne-2")], + [node_execution_1, node_execution_2], [], ] node_repo.delete_executions_by_ids.return_value = 2