test: migrate retention delete archived workflow run tests to testcon… (#34020)

This commit is contained in:
Desel72
2026-03-24 08:52:10 -05:00
committed by GitHub
parent 2a35f8a625
commit 4f87625df5
2 changed files with 70 additions and 216 deletions

View File

@ -141,3 +141,73 @@ class TestArchivedWorkflowRunDeletion:
db_session_with_containers.expunge_all()
deleted_run = db_session_with_containers.get(WorkflowRun, run_id)
assert deleted_run is None
def test_delete_run_dry_run(self, db_session_with_containers):
"""Dry run should return success without actually deleting."""
tenant_id = str(uuid4())
run = self._create_workflow_run(
db_session_with_containers,
tenant_id=tenant_id,
created_at=datetime.now(UTC),
)
run_id = run.id
deleter = ArchivedWorkflowRunDeletion(dry_run=True)
result = deleter._delete_run(run)
assert result.success is True
assert result.run_id == run_id
# Run should still exist because it's a dry run
db_session_with_containers.expire_all()
assert db_session_with_containers.get(WorkflowRun, run_id) is not None
def test_delete_run_exception_returns_error(self, db_session_with_containers):
"""Exception during deletion should return failure result."""
from unittest.mock import MagicMock, patch
tenant_id = str(uuid4())
run = self._create_workflow_run(
db_session_with_containers,
tenant_id=tenant_id,
created_at=datetime.now(UTC),
)
deleter = ArchivedWorkflowRunDeletion(dry_run=False)
with patch.object(deleter, "_get_workflow_run_repo") as mock_get_repo:
mock_repo = MagicMock()
mock_get_repo.return_value = mock_repo
mock_repo.delete_runs_with_related.side_effect = Exception("Database error")
result = deleter._delete_run(run)
assert result.success is False
assert result.error == "Database error"
def test_delete_by_run_id_success(self, db_session_with_containers):
"""Successfully delete an archived workflow run by ID."""
tenant_id = str(uuid4())
base_time = datetime.now(UTC)
run = self._create_workflow_run(
db_session_with_containers,
tenant_id=tenant_id,
created_at=base_time,
)
self._create_archive_log(db_session_with_containers, run=run)
run_id = run.id
deleter = ArchivedWorkflowRunDeletion()
result = deleter.delete_by_run_id(run_id)
assert result.success is True
db_session_with_containers.expunge_all()
assert db_session_with_containers.get(WorkflowRun, run_id) is None
def test_get_workflow_run_repo_caches_instance(self, db_session_with_containers):
"""_get_workflow_run_repo should return a cached repo on subsequent calls."""
deleter = ArchivedWorkflowRunDeletion()
repo1 = deleter._get_workflow_run_repo()
repo2 = deleter._get_workflow_run_repo()
assert repo1 is repo2
assert deleter.workflow_run_repo is repo1