Add SQL timing logs to workflow run cleanup

This commit is contained in:
hjlarry
2026-01-16 13:58:45 +08:00
parent 7aab4529e6
commit 65a9233559
2 changed files with 137 additions and 62 deletions

View File

@ -881,12 +881,25 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
is_flag=True,
help="Preview cleanup results without deleting any workflow run data.",
)
@click.option(
"--log-sql",
is_flag=True,
help="Log SQL statements and timings for cleanup queries.",
)
@click.option(
"--log-sql-min-ms",
default=0,
show_default=True,
help="Only log SQL statements slower than N milliseconds (0 logs all).",
)
def clean_workflow_runs(
days: int,
batch_size: int,
start_from: datetime.datetime | None,
end_before: datetime.datetime | None,
dry_run: bool,
log_sql: bool,
log_sql_min_ms: int,
):
"""
Clean workflow runs and related workflow data for free tenants.
@ -903,6 +916,8 @@ def clean_workflow_runs(
start_from=start_from,
end_before=end_before,
dry_run=dry_run,
log_sql=log_sql,
log_sql_min_ms=log_sql_min_ms,
).run()
end_time = datetime.datetime.now(datetime.UTC)

View File

@ -1,8 +1,11 @@
import datetime
import logging
import time
from collections.abc import Iterable, Sequence
from contextlib import contextmanager
import click
from sqlalchemy import event
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
@ -28,6 +31,8 @@ class WorkflowRunCleanup:
end_before: datetime.datetime | None = None,
workflow_run_repo: APIWorkflowRunRepository | None = None,
dry_run: bool = False,
log_sql: bool = False,
log_sql_min_ms: int = 0,
):
if (start_from is None) ^ (end_before is None):
raise ValueError("start_from and end_before must be both set or both omitted.")
@ -45,6 +50,8 @@ class WorkflowRunCleanup:
self.batch_size = batch_size
self._cleanup_whitelist: set[str] | None = None
self.dry_run = dry_run
self.log_sql = log_sql
self.log_sql_min_ms = max(0, log_sql_min_ms)
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
self.workflow_run_repo: APIWorkflowRunRepository
if workflow_run_repo:
@ -56,6 +63,38 @@ class WorkflowRunCleanup:
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self.workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@contextmanager
def _sql_logger(self):
if not self.log_sql:
yield
return
def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None:
context._dify_sql_start_time = time.monotonic()
context._dify_sql_statement = statement
context._dify_sql_parameters = parameters
def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany) -> None:
start = getattr(context, "_dify_sql_start_time", None)
if start is None:
return
elapsed_ms = (time.monotonic() - start) * 1000
if elapsed_ms < self.log_sql_min_ms:
return
logged_statement = getattr(context, "_dify_sql_statement", statement)
logged_parameters = getattr(context, "_dify_sql_parameters", parameters)
click.echo(f"[sql] {elapsed_ms:.1f} ms {logged_statement}")
if logged_parameters:
click.echo(f"[sql] params: {logged_parameters}")
event.listen(db.engine, "before_cursor_execute", _before_cursor_execute)
event.listen(db.engine, "after_cursor_execute", _after_cursor_execute)
try:
yield
finally:
event.remove(db.engine, "before_cursor_execute", _before_cursor_execute)
event.remove(db.engine, "after_cursor_execute", _after_cursor_execute)
def run(self) -> None:
click.echo(
click.style(
@ -74,74 +113,95 @@ class WorkflowRunCleanup:
batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None
while True:
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
if not run_rows:
break
with self._sql_logger():
while True:
batch_start = time.monotonic()
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
fetch_ms = (time.monotonic() - batch_start) * 1000
if not run_rows:
break
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
tenant_ids = {row.tenant_id for row in run_rows}
free_tenants = self._filter_free_tenants(tenant_ids)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
tenant_ids = {row.tenant_id for row in run_rows}
billing_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
billing_ms = (time.monotonic() - billing_start) * 1000
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
if self.log_sql:
click.echo(
click.style(
f"[batch #{batch_index}] fetch_ms={fetch_ms:.1f} billing_ms={billing_ms:.1f}",
fg="white",
)
)
if not free_runs:
skipped_message = (
f"[batch #{batch_index}] skipped (no sandbox runs in batch, "
f"{paid_or_skipped} paid/unknown)"
)
click.echo(
click.style(
skipped_message,
fg="yellow",
)
)
continue
total_runs_targeted += len(free_runs)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
count_ms = (time.monotonic() - count_start) * 1000
if self.log_sql:
click.echo(click.style(f"[batch #{batch_index}] count_ms={count_ms:.1f}", fg="white"))
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
)
)
continue
try:
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo(
click.style(
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)",
fg="yellow",
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
continue
total_runs_targeted += len(free_runs)
if self.dry_run:
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
)
)
continue
try:
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo(
click.style(
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
if self.dry_run:
if self.window_start: