chore: add random sleep for workflow cleanup
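
Between batches, the cleanup loop now sleeps for a random interval of 0 to
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL milliseconds (default 200),
so consecutive delete batches stop hitting the database back-to-back. A
minimal sketch of the pattern, with fetch_batch/process_batch as hypothetical
stand-ins for the repository calls in the diff below:

    import os
    import random
    import time

    def fetch_batch(cursor: int) -> list[int]:
        # Hypothetical stand-in for get_runs_batch_by_time_range().
        return list(range(cursor, cursor + 5)) if cursor < 15 else []

    def process_batch(rows: list[int]) -> None:
        # Hypothetical stand-in for delete_runs_with_related().
        pass

    max_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
    cursor = 0
    while True:
        rows = fetch_batch(cursor)
        if not rows:
            break
        process_batch(rows)
        cursor += len(rows)
        # Uniform jitter in [0, max_interval_ms) rather than a fixed pause, so
        # concurrent cleanup jobs do not fall into lockstep against the database.
        time.sleep(random.uniform(0, max_interval_ms) / 1000)

random.uniform is fine here because the jitter is for pacing, not security
(hence the noqa: S311 in the diff).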

Author: hj24
Date:   2026-02-06 13:58:49 +08:00
parent  b4414901d1
commit  d1cdc85a2e

@@ -1,5 +1,8 @@
import datetime
import logging
import os
import random
import time
from collections.abc import Iterable, Sequence

import click
@@ -72,7 +75,12 @@ class WorkflowRunCleanup:
        batch_index = 0
        last_seen: tuple[datetime.datetime, str] | None = None
        max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
        while True:
            batch_start = time.monotonic()
            fetch_start = time.monotonic()
            run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
                start_from=self.window_start,
                end_before=self.window_end,
@@ -80,12 +88,30 @@ class WorkflowRunCleanup:
                batch_size=self.batch_size,
            )
            if not run_rows:
                logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
                break
            batch_index += 1
            last_seen = (run_rows[-1].created_at, run_rows[-1].id)
            logger.info(
                "workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
                batch_index,
                len(run_rows),
                int((time.monotonic() - fetch_start) * 1000),
            )
            tenant_ids = {row.tenant_id for row in run_rows}
            filter_start = time.monotonic()
            free_tenants = self._filter_free_tenants(tenant_ids)
            logger.info(
                "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
                batch_index,
                len(free_tenants),
                len(tenant_ids),
                int((time.monotonic() - filter_start) * 1000),
            )
            free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
            paid_or_skipped = len(run_rows) - len(free_runs)
@@ -104,11 +130,17 @@ class WorkflowRunCleanup:
            total_runs_targeted += len(free_runs)
            if self.dry_run:
                count_start = time.monotonic()
                batch_counts = self.workflow_run_repo.count_runs_with_related(
                    free_runs,
                    count_node_executions=self._count_node_executions,
                    count_trigger_logs=self._count_trigger_logs,
                )
                logger.info(
                    "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
                    batch_index,
                    int((time.monotonic() - count_start) * 1000),
                )
                if related_totals is not None:
                    for key in related_totals:
                        related_totals[key] += batch_counts.get(key, 0)
@@ -120,14 +152,21 @@ class WorkflowRunCleanup:
                        fg="yellow",
                    )
                )
                logger.info(
                    "workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
                    batch_index,
                    int((time.monotonic() - batch_start) * 1000),
                )
                continue
            try:
                delete_start = time.monotonic()
                counts = self.workflow_run_repo.delete_runs_with_related(
                    free_runs,
                    delete_node_executions=self._delete_node_executions,
                    delete_trigger_logs=self._delete_trigger_logs,
                )
                delete_ms = int((time.monotonic() - delete_start) * 1000)
            except Exception:
                logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
                raise
@@ -143,6 +182,17 @@ class WorkflowRunCleanup:
                    fg="green",
                )
            )
            logger.info(
                "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
                batch_index,
                delete_ms,
                int((time.monotonic() - batch_start) * 1000),
            )

            # Random sleep between batches to avoid overwhelming the database
            sleep_ms = random.uniform(0, max_batch_interval_ms)  # noqa: S311
            logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
            time.sleep(sleep_ms / 1000)

        if self.dry_run:
            if self.window_start: