Compare commits

...

9 Commits

Author SHA1 Message Date
232b8eb248 fix CI 2026-03-09 15:41:54 +08:00
4c0d81029f fix CI 2026-03-09 15:29:21 +08:00
bacf70c00a [autofix.ci] apply automated fixes 2026-03-09 06:46:39 +00:00
942729ba48 Update api/extensions/otel/runtime.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-03-09 14:44:35 +08:00
4f5af0b43c Update api/extensions/otel/runtime.py
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-03-09 14:44:27 +08:00
a28cb993b8 Merge remote-tracking branch 'myori/main' into p372 2026-03-09 14:43:03 +08:00
c6dd2ef25a add task label to the cleanup task 2026-03-09 14:30:04 +08:00
b0e8becd14 add clean message metrics 2026-03-09 13:45:08 +08:00
f90e0d781a feat: add metrics to clean workflow run task 2026-03-09 09:48:24 +08:00
7 changed files with 684 additions and 142 deletions

View File

@ -937,6 +937,12 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
is_flag=True, is_flag=True,
help="Preview cleanup results without deleting any workflow run data.", help="Preview cleanup results without deleting any workflow run data.",
) )
@click.option(
"--task-label",
default="daily",
show_default=True,
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
)
def clean_workflow_runs( def clean_workflow_runs(
before_days: int, before_days: int,
batch_size: int, batch_size: int,
@ -945,10 +951,13 @@ def clean_workflow_runs(
start_from: datetime.datetime | None, start_from: datetime.datetime | None,
end_before: datetime.datetime | None, end_before: datetime.datetime | None,
dry_run: bool, dry_run: bool,
task_label: str,
): ):
""" """
Clean workflow runs and related workflow data for free tenants. Clean workflow runs and related workflow data for free tenants.
""" """
from extensions.otel.runtime import flush_telemetry
if (start_from is None) ^ (end_before is None): if (start_from is None) ^ (end_before is None):
raise click.UsageError("--start-from and --end-before must be provided together.") raise click.UsageError("--start-from and --end-before must be provided together.")
@ -968,13 +977,17 @@ def clean_workflow_runs(
start_time = datetime.datetime.now(datetime.UTC) start_time = datetime.datetime.now(datetime.UTC)
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white")) click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
WorkflowRunCleanup( try:
days=before_days, WorkflowRunCleanup(
batch_size=batch_size, days=before_days,
start_from=start_from, batch_size=batch_size,
end_before=end_before, start_from=start_from,
dry_run=dry_run, end_before=end_before,
).run() dry_run=dry_run,
task_label=task_label,
).run()
finally:
flush_telemetry()
end_time = datetime.datetime.now(datetime.UTC) end_time = datetime.datetime.now(datetime.UTC)
elapsed = end_time - start_time elapsed = end_time - start_time
@ -2630,6 +2643,12 @@ def migrate_oss(
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.", help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
) )
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting") @click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
@click.option(
"--task-label",
default="daily",
show_default=True,
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
)
def clean_expired_messages( def clean_expired_messages(
batch_size: int, batch_size: int,
graceful_period: int, graceful_period: int,
@ -2638,10 +2657,13 @@ def clean_expired_messages(
from_days_ago: int | None, from_days_ago: int | None,
before_days: int | None, before_days: int | None,
dry_run: bool, dry_run: bool,
task_label: str,
): ):
""" """
Clean expired messages and related data for tenants based on clean policy. Clean expired messages and related data for tenants based on clean policy.
""" """
from extensions.otel.runtime import flush_telemetry
click.echo(click.style("clean_messages: start clean messages.", fg="green")) click.echo(click.style("clean_messages: start clean messages.", fg="green"))
start_at = time.perf_counter() start_at = time.perf_counter()
@ -2691,6 +2713,7 @@ def clean_expired_messages(
end_before=end_before, end_before=end_before,
batch_size=batch_size, batch_size=batch_size,
dry_run=dry_run, dry_run=dry_run,
task_label=task_label,
) )
elif from_days_ago is None: elif from_days_ago is None:
assert before_days is not None assert before_days is not None
@ -2699,6 +2722,7 @@ def clean_expired_messages(
days=before_days, days=before_days,
batch_size=batch_size, batch_size=batch_size,
dry_run=dry_run, dry_run=dry_run,
task_label=task_label,
) )
else: else:
assert before_days is not None assert before_days is not None
@ -2710,6 +2734,7 @@ def clean_expired_messages(
end_before=now - datetime.timedelta(days=before_days), end_before=now - datetime.timedelta(days=before_days),
batch_size=batch_size, batch_size=batch_size,
dry_run=dry_run, dry_run=dry_run,
task_label=task_label,
) )
stats = service.run() stats = service.run()
@ -2735,6 +2760,8 @@ def clean_expired_messages(
) )
) )
raise raise
finally:
flush_telemetry()
click.echo(click.style("messages cleanup completed.", fg="green")) click.echo(click.style("messages cleanup completed.", fg="green"))

View File

@ -5,7 +5,7 @@ from typing import Union
from celery.signals import worker_init from celery.signals import worker_init
from flask_login import user_loaded_from_request, user_logged_in from flask_login import user_loaded_from_request, user_logged_in
from opentelemetry import trace from opentelemetry import metrics, trace
from opentelemetry.propagate import set_global_textmap from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3Format from opentelemetry.propagators.b3 import B3Format
from opentelemetry.propagators.composite import CompositePropagator from opentelemetry.propagators.composite import CompositePropagator
@ -31,9 +31,29 @@ def setup_context_propagation() -> None:
def shutdown_tracer() -> None: def shutdown_tracer() -> None:
flush_telemetry()
def flush_telemetry() -> None:
"""
Best-effort flush for telemetry providers.
This is mainly used by short-lived command processes (e.g. Kubernetes CronJob)
so counters/histograms are exported before the process exits.
"""
provider = trace.get_tracer_provider() provider = trace.get_tracer_provider()
if hasattr(provider, "force_flush"): if hasattr(provider, "force_flush"):
provider.force_flush() try:
provider.force_flush()
except Exception:
logger.exception("otel: failed to flush trace provider")
metric_provider = metrics.get_meter_provider()
if hasattr(metric_provider, "force_flush"):
try:
metric_provider.force_flush()
except Exception:
logger.exception("otel: failed to flush metric provider")
def is_celery_worker(): def is_celery_worker():

View File

@ -1,16 +1,16 @@
import datetime import datetime
import logging import logging
import os
import random import random
import time import time
from collections.abc import Sequence from collections.abc import Sequence
from typing import cast from typing import TYPE_CHECKING, cast
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import delete, select, tuple_ from sqlalchemy import delete, select, tuple_
from sqlalchemy.engine import CursorResult from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from configs import dify_config
from extensions.ext_database import db from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now from libs.datetime_utils import naive_utc_now
from models.model import ( from models.model import (
@ -33,6 +33,128 @@ from services.retention.conversation.messages_clean_policy import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from opentelemetry.metrics import Counter, Histogram
class MessagesCleanupMetrics:
"""
Records low-cardinality OpenTelemetry metrics for expired message cleanup jobs.
We keep labels stable (dry_run/window_mode/task_label/status) so these metrics remain
dashboard-friendly for long-running CronJob executions.
"""
_job_runs_total: "Counter | None"
_batches_total: "Counter | None"
_messages_scanned_total: "Counter | None"
_messages_filtered_total: "Counter | None"
_messages_deleted_total: "Counter | None"
_job_duration_seconds: "Histogram | None"
_batch_duration_seconds: "Histogram | None"
_base_attributes: dict[str, str]
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
self._job_runs_total = None
self._batches_total = None
self._messages_scanned_total = None
self._messages_filtered_total = None
self._messages_deleted_total = None
self._job_duration_seconds = None
self._batch_duration_seconds = None
self._base_attributes = {
"job_name": "messages_cleanup",
"dry_run": str(dry_run).lower(),
"window_mode": "between" if has_window else "before_cutoff",
"task_label": task_label,
}
self._init_instruments()
def _init_instruments(self) -> None:
try:
from opentelemetry.metrics import get_meter
meter = get_meter("messages_cleanup", version=dify_config.project.version)
self._job_runs_total = meter.create_counter(
"messages_cleanup_jobs_total",
description="Total number of expired message cleanup jobs by status.",
unit="{job}",
)
self._batches_total = meter.create_counter(
"messages_cleanup_batches_total",
description="Total number of message cleanup batches processed.",
unit="{batch}",
)
self._messages_scanned_total = meter.create_counter(
"messages_cleanup_scanned_messages_total",
description="Total messages scanned by cleanup jobs.",
unit="{message}",
)
self._messages_filtered_total = meter.create_counter(
"messages_cleanup_filtered_messages_total",
description="Total messages selected by cleanup policy.",
unit="{message}",
)
self._messages_deleted_total = meter.create_counter(
"messages_cleanup_deleted_messages_total",
description="Total messages deleted by cleanup jobs.",
unit="{message}",
)
self._job_duration_seconds = meter.create_histogram(
"messages_cleanup_job_duration_seconds",
description="Duration of expired message cleanup jobs in seconds.",
unit="s",
)
self._batch_duration_seconds = meter.create_histogram(
"messages_cleanup_batch_duration_seconds",
description="Duration of expired message cleanup batch processing in seconds.",
unit="s",
)
except Exception:
logger.exception("messages_cleanup_metrics: failed to initialize instruments")
def _attrs(self, **extra: str) -> dict[str, str]:
return {**self._base_attributes, **extra}
@staticmethod
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
if not counter or value <= 0:
return
try:
counter.add(value, attributes)
except Exception:
logger.exception("messages_cleanup_metrics: failed to add counter value")
@staticmethod
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
if not histogram:
return
try:
histogram.record(value, attributes)
except Exception:
logger.exception("messages_cleanup_metrics: failed to record histogram value")
def record_batch(
self,
*,
scanned_messages: int,
filtered_messages: int,
deleted_messages: int,
batch_duration_seconds: float,
) -> None:
attributes = self._attrs()
self._add(self._batches_total, 1, attributes)
self._add(self._messages_scanned_total, scanned_messages, attributes)
self._add(self._messages_filtered_total, filtered_messages, attributes)
self._add(self._messages_deleted_total, deleted_messages, attributes)
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
attributes = self._attrs(status=status)
self._add(self._job_runs_total, 1, attributes)
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
class MessagesCleanService: class MessagesCleanService:
""" """
Service for cleaning expired messages based on retention policies. Service for cleaning expired messages based on retention policies.
@ -48,6 +170,7 @@ class MessagesCleanService:
start_from: datetime.datetime | None = None, start_from: datetime.datetime | None = None,
batch_size: int = 1000, batch_size: int = 1000,
dry_run: bool = False, dry_run: bool = False,
task_label: str = "daily",
) -> None: ) -> None:
""" """
Initialize the service with cleanup parameters. Initialize the service with cleanup parameters.
@ -58,12 +181,20 @@ class MessagesCleanService:
start_from: Optional start time (inclusive) of the range start_from: Optional start time (inclusive) of the range
batch_size: Number of messages to process per batch batch_size: Number of messages to process per batch
dry_run: Whether to perform a dry run (no actual deletion) dry_run: Whether to perform a dry run (no actual deletion)
task_label: Stable task label to distinguish multiple cleanup CronJobs
""" """
self._policy = policy self._policy = policy
self._end_before = end_before self._end_before = end_before
self._start_from = start_from self._start_from = start_from
self._batch_size = batch_size self._batch_size = batch_size
self._dry_run = dry_run self._dry_run = dry_run
normalized_task_label = task_label.strip()
self._task_label = normalized_task_label or "daily"
self._metrics = MessagesCleanupMetrics(
dry_run=dry_run,
has_window=bool(start_from),
task_label=self._task_label,
)
@classmethod @classmethod
def from_time_range( def from_time_range(
@ -73,6 +204,7 @@ class MessagesCleanService:
end_before: datetime.datetime, end_before: datetime.datetime,
batch_size: int = 1000, batch_size: int = 1000,
dry_run: bool = False, dry_run: bool = False,
task_label: str = "daily",
) -> "MessagesCleanService": ) -> "MessagesCleanService":
""" """
Create a service instance for cleaning messages within a specific time range. Create a service instance for cleaning messages within a specific time range.
@ -85,6 +217,7 @@ class MessagesCleanService:
end_before: End time (exclusive) of the range end_before: End time (exclusive) of the range
batch_size: Number of messages to process per batch batch_size: Number of messages to process per batch
dry_run: Whether to perform a dry run (no actual deletion) dry_run: Whether to perform a dry run (no actual deletion)
task_label: Stable task label to distinguish multiple cleanup CronJobs
Returns: Returns:
MessagesCleanService instance MessagesCleanService instance
@ -112,6 +245,7 @@ class MessagesCleanService:
start_from=start_from, start_from=start_from,
batch_size=batch_size, batch_size=batch_size,
dry_run=dry_run, dry_run=dry_run,
task_label=task_label,
) )
@classmethod @classmethod
@ -121,6 +255,7 @@ class MessagesCleanService:
days: int = 30, days: int = 30,
batch_size: int = 1000, batch_size: int = 1000,
dry_run: bool = False, dry_run: bool = False,
task_label: str = "daily",
) -> "MessagesCleanService": ) -> "MessagesCleanService":
""" """
Create a service instance for cleaning messages older than specified days. Create a service instance for cleaning messages older than specified days.
@ -130,6 +265,7 @@ class MessagesCleanService:
days: Number of days to look back from now days: Number of days to look back from now
batch_size: Number of messages to process per batch batch_size: Number of messages to process per batch
dry_run: Whether to perform a dry run (no actual deletion) dry_run: Whether to perform a dry run (no actual deletion)
task_label: Stable task label to distinguish multiple cleanup CronJobs
Returns: Returns:
MessagesCleanService instance MessagesCleanService instance
@ -153,7 +289,14 @@ class MessagesCleanService:
policy.__class__.__name__, policy.__class__.__name__,
) )
return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run) return cls(
policy=policy,
end_before=end_before,
start_from=None,
batch_size=batch_size,
dry_run=dry_run,
task_label=task_label,
)
def run(self) -> dict[str, int]: def run(self) -> dict[str, int]:
""" """
@ -162,7 +305,18 @@ class MessagesCleanService:
Returns: Returns:
Dict with statistics: batches, filtered_messages, total_deleted Dict with statistics: batches, filtered_messages, total_deleted
""" """
return self._clean_messages_by_time_range() status = "success"
run_start = time.monotonic()
try:
return self._clean_messages_by_time_range()
except Exception:
status = "failed"
raise
finally:
self._metrics.record_completion(
status=status,
job_duration_seconds=time.monotonic() - run_start,
)
def _clean_messages_by_time_range(self) -> dict[str, int]: def _clean_messages_by_time_range(self) -> dict[str, int]:
""" """
@ -197,11 +351,14 @@ class MessagesCleanService:
self._end_before, self._end_before,
) )
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
while True: while True:
stats["batches"] += 1 stats["batches"] += 1
batch_start = time.monotonic() batch_start = time.monotonic()
batch_scanned_messages = 0
batch_filtered_messages = 0
batch_deleted_messages = 0
# Step 1: Fetch a batch of messages using cursor # Step 1: Fetch a batch of messages using cursor
with Session(db.engine, expire_on_commit=False) as session: with Session(db.engine, expire_on_commit=False) as session:
@ -240,9 +397,16 @@ class MessagesCleanService:
# Track total messages fetched across all batches # Track total messages fetched across all batches
stats["total_messages"] += len(messages) stats["total_messages"] += len(messages)
batch_scanned_messages = len(messages)
if not messages: if not messages:
logger.info("clean_messages (batch %s): no more messages to process", stats["batches"]) logger.info("clean_messages (batch %s): no more messages to process", stats["batches"])
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
break break
# Update cursor to the last message's (created_at, id) # Update cursor to the last message's (created_at, id)
@ -268,6 +432,12 @@ class MessagesCleanService:
if not apps: if not apps:
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"]) logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
continue continue
# Build app_id -> tenant_id mapping # Build app_id -> tenant_id mapping
@ -286,9 +456,16 @@ class MessagesCleanService:
if not message_ids_to_delete: if not message_ids_to_delete:
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"]) logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
continue continue
stats["filtered_messages"] += len(message_ids_to_delete) stats["filtered_messages"] += len(message_ids_to_delete)
batch_filtered_messages = len(message_ids_to_delete)
# Step 4: Batch delete messages and their relations # Step 4: Batch delete messages and their relations
if not self._dry_run: if not self._dry_run:
@ -309,6 +486,7 @@ class MessagesCleanService:
commit_ms = int((time.monotonic() - commit_start) * 1000) commit_ms = int((time.monotonic() - commit_start) * 1000)
stats["total_deleted"] += messages_deleted stats["total_deleted"] += messages_deleted
batch_deleted_messages = messages_deleted
logger.info( logger.info(
"clean_messages (batch %s): processed %s messages, deleted %s messages", "clean_messages (batch %s): processed %s messages, deleted %s messages",
@ -343,6 +521,13 @@ class MessagesCleanService:
for msg_id in sampled_ids: for msg_id in sampled_ids:
logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id) logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id)
self._metrics.record_batch(
scanned_messages=batch_scanned_messages,
filtered_messages=batch_filtered_messages,
deleted_messages=batch_deleted_messages,
batch_duration_seconds=time.monotonic() - batch_start,
)
logger.info( logger.info(
"clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s", "clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s",
stats["batches"], stats["batches"],

View File

@ -1,9 +1,9 @@
import datetime import datetime
import logging import logging
import os
import random import random
import time import time
from collections.abc import Iterable, Sequence from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING
import click import click
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session, sessionmaker
@ -20,6 +20,156 @@ from services.billing_service import BillingService, SubscriptionPlan
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from opentelemetry.metrics import Counter, Histogram
class WorkflowRunCleanupMetrics:
"""
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
Metrics are emitted with stable labels only (dry_run/window_mode/task_label/status)
to keep dashboard and alert cardinality predictable in production clusters.
"""
_job_runs_total: "Counter | None"
_batches_total: "Counter | None"
_runs_scanned_total: "Counter | None"
_runs_targeted_total: "Counter | None"
_runs_deleted_total: "Counter | None"
_runs_skipped_total: "Counter | None"
_related_records_total: "Counter | None"
_job_duration_seconds: "Histogram | None"
_batch_duration_seconds: "Histogram | None"
_base_attributes: dict[str, str]
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
self._job_runs_total = None
self._batches_total = None
self._runs_scanned_total = None
self._runs_targeted_total = None
self._runs_deleted_total = None
self._runs_skipped_total = None
self._related_records_total = None
self._job_duration_seconds = None
self._batch_duration_seconds = None
self._base_attributes = {
"job_name": "workflow_run_cleanup",
"dry_run": str(dry_run).lower(),
"window_mode": "between" if has_window else "before_cutoff",
"task_label": task_label,
}
self._init_instruments()
def _init_instruments(self) -> None:
try:
from opentelemetry.metrics import get_meter
meter = get_meter("workflow_run_cleanup", version=dify_config.project.version)
self._job_runs_total = meter.create_counter(
"workflow_run_cleanup_jobs_total",
description="Total number of workflow run cleanup jobs by status.",
unit="{job}",
)
self._batches_total = meter.create_counter(
"workflow_run_cleanup_batches_total",
description="Total number of processed cleanup batches.",
unit="{batch}",
)
self._runs_scanned_total = meter.create_counter(
"workflow_run_cleanup_scanned_runs_total",
description="Total workflow runs scanned by cleanup jobs.",
unit="{run}",
)
self._runs_targeted_total = meter.create_counter(
"workflow_run_cleanup_targeted_runs_total",
description="Total workflow runs targeted by cleanup policy.",
unit="{run}",
)
self._runs_deleted_total = meter.create_counter(
"workflow_run_cleanup_deleted_runs_total",
description="Total workflow runs deleted by cleanup jobs.",
unit="{run}",
)
self._runs_skipped_total = meter.create_counter(
"workflow_run_cleanup_skipped_runs_total",
description="Total workflow runs skipped because tenant is paid/unknown.",
unit="{run}",
)
self._related_records_total = meter.create_counter(
"workflow_run_cleanup_related_records_total",
description="Total related records processed by cleanup jobs.",
unit="{record}",
)
self._job_duration_seconds = meter.create_histogram(
"workflow_run_cleanup_job_duration_seconds",
description="Duration of workflow run cleanup jobs in seconds.",
unit="s",
)
self._batch_duration_seconds = meter.create_histogram(
"workflow_run_cleanup_batch_duration_seconds",
description="Duration of workflow run cleanup batch processing in seconds.",
unit="s",
)
except Exception:
logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments")
def _attrs(self, **extra: str) -> dict[str, str]:
return {**self._base_attributes, **extra}
@staticmethod
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
if not counter or value <= 0:
return
try:
counter.add(value, attributes)
except Exception:
logger.exception("workflow_run_cleanup_metrics: failed to add counter value")
@staticmethod
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
if not histogram:
return
try:
histogram.record(value, attributes)
except Exception:
logger.exception("workflow_run_cleanup_metrics: failed to record histogram value")
def record_batch(
self,
*,
batch_rows: int,
targeted_runs: int,
skipped_runs: int,
deleted_runs: int,
related_counts: dict[str, int] | None,
related_action: str | None,
batch_duration_seconds: float,
) -> None:
attributes = self._attrs()
self._add(self._batches_total, 1, attributes)
self._add(self._runs_scanned_total, batch_rows, attributes)
self._add(self._runs_targeted_total, targeted_runs, attributes)
self._add(self._runs_skipped_total, skipped_runs, attributes)
self._add(self._runs_deleted_total, deleted_runs, attributes)
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
if not related_counts or not related_action:
return
for record_type, count in related_counts.items():
self._add(
self._related_records_total,
count,
self._attrs(action=related_action, record_type=record_type),
)
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
attributes = self._attrs(status=status)
self._add(self._job_runs_total, 1, attributes)
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
class WorkflowRunCleanup: class WorkflowRunCleanup:
def __init__( def __init__(
self, self,
@ -29,6 +179,7 @@ class WorkflowRunCleanup:
end_before: datetime.datetime | None = None, end_before: datetime.datetime | None = None,
workflow_run_repo: APIWorkflowRunRepository | None = None, workflow_run_repo: APIWorkflowRunRepository | None = None,
dry_run: bool = False, dry_run: bool = False,
task_label: str = "daily",
): ):
if (start_from is None) ^ (end_before is None): if (start_from is None) ^ (end_before is None):
raise ValueError("start_from and end_before must be both set or both omitted.") raise ValueError("start_from and end_before must be both set or both omitted.")
@ -46,6 +197,13 @@ class WorkflowRunCleanup:
self.batch_size = batch_size self.batch_size = batch_size
self._cleanup_whitelist: set[str] | None = None self._cleanup_whitelist: set[str] | None = None
self.dry_run = dry_run self.dry_run = dry_run
normalized_task_label = task_label.strip()
self.task_label = normalized_task_label or "daily"
self._metrics = WorkflowRunCleanupMetrics(
dry_run=dry_run,
has_window=bool(start_from),
task_label=self.task_label,
)
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
self.workflow_run_repo: APIWorkflowRunRepository self.workflow_run_repo: APIWorkflowRunRepository
if workflow_run_repo: if workflow_run_repo:
@ -74,153 +232,193 @@ class WorkflowRunCleanup:
related_totals = self._empty_related_counts() if self.dry_run else None related_totals = self._empty_related_counts() if self.dry_run else None
batch_index = 0 batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None last_seen: tuple[datetime.datetime, str] | None = None
status = "success"
run_start = time.monotonic()
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) try:
while True:
batch_start = time.monotonic()
while True: fetch_start = time.monotonic()
batch_start = time.monotonic() run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
start_from=self.window_start,
fetch_start = time.monotonic() end_before=self.window_end,
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( last_seen=last_seen,
start_from=self.window_start, batch_size=self.batch_size,
end_before=self.window_end,
last_seen=last_seen,
batch_size=self.batch_size,
)
if not run_rows:
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
break
batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
logger.info(
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
batch_index,
len(run_rows),
int((time.monotonic() - fetch_start) * 1000),
)
tenant_ids = {row.tenant_id for row in run_rows}
filter_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
logger.info(
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
batch_index,
len(free_tenants),
len(tenant_ids),
int((time.monotonic() - filter_start) * 1000),
)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
skipped_message = (
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
) )
click.echo( if not run_rows:
click.style( logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
skipped_message, break
fg="yellow",
)
)
continue
total_runs_targeted += len(free_runs) batch_index += 1
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
logger.info( logger.info(
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms", "workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
batch_index, batch_index,
int((time.monotonic() - count_start) * 1000), len(run_rows),
int((time.monotonic() - fetch_start) * 1000),
) )
if related_totals is not None:
for key in related_totals: tenant_ids = {row.tenant_id for row in run_rows}
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5]) filter_start = time.monotonic()
free_tenants = self._filter_free_tenants(tenant_ids)
logger.info(
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
batch_index,
len(free_tenants),
len(tenant_ids),
int((time.monotonic() - filter_start) * 1000),
)
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
paid_or_skipped = len(run_rows) - len(free_runs)
if not free_runs:
skipped_message = (
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
)
click.echo(
click.style(
skipped_message,
fg="yellow",
)
)
self._metrics.record_batch(
batch_rows=len(run_rows),
targeted_runs=0,
skipped_runs=paid_or_skipped,
deleted_runs=0,
related_counts=None,
related_action=None,
batch_duration_seconds=time.monotonic() - batch_start,
)
continue
total_runs_targeted += len(free_runs)
if self.dry_run:
count_start = time.monotonic()
batch_counts = self.workflow_run_repo.count_runs_with_related(
free_runs,
count_node_executions=self._count_node_executions,
count_trigger_logs=self._count_trigger_logs,
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
batch_index,
int((time.monotonic() - count_start) * 1000),
)
if related_totals is not None:
for key in related_totals:
related_totals[key] += batch_counts.get(key, 0)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
fg="yellow",
)
)
logger.info(
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
batch_index,
int((time.monotonic() - batch_start) * 1000),
)
self._metrics.record_batch(
batch_rows=len(run_rows),
targeted_runs=len(free_runs),
skipped_runs=paid_or_skipped,
deleted_runs=0,
related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()},
related_action="would_delete",
batch_duration_seconds=time.monotonic() - batch_start,
)
continue
try:
delete_start = time.monotonic()
counts = self.workflow_run_repo.delete_runs_with_related(
free_runs,
delete_node_executions=self._delete_node_executions,
delete_trigger_logs=self._delete_trigger_logs,
)
delete_ms = int((time.monotonic() - delete_start) * 1000)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"]
click.echo( click.echo(
click.style( click.style(
f"[batch #{batch_index}] would delete {len(free_runs)} runs " f"[batch #{batch_index}] deleted runs: {counts['runs']} "
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
fg="yellow", f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
) )
) )
logger.info( logger.info(
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms", "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
batch_index, batch_index,
delete_ms,
int((time.monotonic() - batch_start) * 1000), int((time.monotonic() - batch_start) * 1000),
) )
continue self._metrics.record_batch(
batch_rows=len(run_rows),
try: targeted_runs=len(free_runs),
delete_start = time.monotonic() skipped_runs=paid_or_skipped,
counts = self.workflow_run_repo.delete_runs_with_related( deleted_runs=counts["runs"],
free_runs, related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()},
delete_node_executions=self._delete_node_executions, related_action="deleted",
delete_trigger_logs=self._delete_trigger_logs, batch_duration_seconds=time.monotonic() - batch_start,
) )
delete_ms = int((time.monotonic() - delete_start) * 1000)
except Exception:
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
raise
total_runs_deleted += counts["runs"] # Random sleep between batches to avoid overwhelming the database
click.echo( sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
click.style( logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
f"[batch #{batch_index}] deleted runs: {counts['runs']} " time.sleep(sleep_ms / 1000)
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
f"skipped {paid_or_skipped} paid/unknown",
fg="green",
)
)
logger.info(
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
batch_index,
delete_ms,
int((time.monotonic() - batch_start) * 1000),
)
# Random sleep between batches to avoid overwhelming the database if self.dry_run:
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 if self.window_start:
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms) summary_message = (
time.sleep(sleep_ms / 1000) f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
if self.dry_run: )
if self.window_start: else:
summary_message = ( summary_message = (
f"Dry run complete. Would delete {total_runs_targeted} workflow runs " f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" f"before {self.window_end.isoformat()}"
) )
if related_totals is not None:
summary_message = (
f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
)
summary_color = "yellow"
else: else:
summary_message = ( if self.window_start:
f"Dry run complete. Would delete {total_runs_targeted} workflow runs " summary_message = (
f"before {self.window_end.isoformat()}" f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
) f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
if related_totals is not None: )
summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}" else:
summary_color = "yellow" summary_message = (
else: f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
if self.window_start: f"before {self.window_end.isoformat()}"
summary_message = ( )
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " summary_color = "white"
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
)
else:
summary_message = (
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}"
)
summary_color = "white"
click.echo(click.style(summary_message, fg=summary_color)) click.echo(click.style(summary_message, fg=summary_color))
except Exception:
status = "failed"
raise
finally:
self._metrics.record_completion(
status=status,
job_duration_seconds=time.monotonic() - run_start,
)
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]: def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
tenant_id_list = list(tenant_ids) tenant_id_list = list(tenant_ids)

View File

@ -38,6 +38,7 @@ def test_absolute_mode_calls_from_time_range():
from_days_ago=None, from_days_ago=None,
before_days=None, before_days=None,
dry_run=True, dry_run=True,
task_label="daily",
) )
mock_from_time_range.assert_called_once_with( mock_from_time_range.assert_called_once_with(
@ -46,6 +47,7 @@ def test_absolute_mode_calls_from_time_range():
end_before=end_before, end_before=end_before,
batch_size=200, batch_size=200,
dry_run=True, dry_run=True,
task_label="daily",
) )
mock_from_days.assert_not_called() mock_from_days.assert_not_called()
@ -67,6 +69,7 @@ def test_relative_mode_before_days_only_calls_from_days():
from_days_ago=None, from_days_ago=None,
before_days=30, before_days=30,
dry_run=False, dry_run=False,
task_label="daily",
) )
mock_from_days.assert_called_once_with( mock_from_days.assert_called_once_with(
@ -74,6 +77,7 @@ def test_relative_mode_before_days_only_calls_from_days():
days=30, days=30,
batch_size=500, batch_size=500,
dry_run=False, dry_run=False,
task_label="daily",
) )
mock_from_time_range.assert_not_called() mock_from_time_range.assert_not_called()
@ -97,6 +101,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
from_days_ago=60, from_days_ago=60,
before_days=30, before_days=30,
dry_run=False, dry_run=False,
task_label="daily",
) )
mock_from_time_range.assert_called_once_with( mock_from_time_range.assert_called_once_with(
@ -105,6 +110,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
end_before=fixed_now - datetime.timedelta(days=30), end_before=fixed_now - datetime.timedelta(days=30),
batch_size=1000, batch_size=1000,
dry_run=False, dry_run=False,
task_label="daily",
) )
mock_from_days.assert_not_called() mock_from_days.assert_not_called()
@ -178,4 +184,5 @@ def test_invalid_inputs_raise_usage_error(kwargs: dict, message: str):
from_days_ago=kwargs["from_days_ago"], from_days_ago=kwargs["from_days_ago"],
before_days=kwargs["before_days"], before_days=kwargs["before_days"],
dry_run=False, dry_run=False,
task_label="daily",
) )

View File

@ -265,6 +265,61 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
cleanup.run() cleanup.run()
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful cleanup run emits one batch metric and one 'success' completion metric."""
    created_at = datetime.datetime.now()
    fake_repo = FakeRepo(
        batches=[[FakeRun("run-free", "t_free", created_at)]],
        delete_result={
            "runs": 0,
            "node_executions": 2,
            "offloads": 1,
            "app_logs": 3,
            "trigger_logs": 4,
            "pauses": 5,
            "pause_reasons": 6,
        },
    )
    cleanup = create_cleanup(monkeypatch, repo=fake_repo, days=30, batch_size=10)
    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)

    recorded_batches: list[dict[str, object]] = []
    recorded_completions: list[dict[str, object]] = []

    def _capture(sink: list[dict[str, object]]):
        # Stub that appends the keyword arguments of each metrics call to `sink`.
        def _recorder(**kwargs: object) -> None:
            sink.append(kwargs)

        return _recorder

    monkeypatch.setattr(cleanup._metrics, "record_batch", _capture(recorded_batches))
    monkeypatch.setattr(cleanup._metrics, "record_completion", _capture(recorded_completions))

    cleanup.run()

    assert len(recorded_batches) == 1
    first_batch = recorded_batches[0]
    assert first_batch["batch_rows"] == 1
    assert first_batch["targeted_runs"] == 1
    # NOTE(review): delete_result["runs"] above is 0 while 1 is asserted here —
    # presumably FakeRepo derives the deleted-run count from the runs passed in
    # rather than the canned dict; confirm against FakeRepo's implementation.
    assert first_batch["deleted_runs"] == 1
    assert first_batch["related_action"] == "deleted"
    assert len(recorded_completions) == 1
    assert recorded_completions[0]["status"] == "success"
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
    """A delete failure propagates out of run() and still emits a 'failed' completion metric."""

    class FailingRepo(FakeRepo):
        def delete_runs_with_related(
            self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
        ) -> dict[str, int]:
            raise RuntimeError("delete failed")

    created_at = datetime.datetime.now()
    cleanup = create_cleanup(
        monkeypatch,
        repo=FailingRepo(batches=[[FakeRun("run-free", "t_free", created_at)]]),
        days=30,
        batch_size=10,
    )
    monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)

    recorded_completions: list[dict[str, object]] = []
    monkeypatch.setattr(
        cleanup._metrics,
        "record_completion",
        lambda **kwargs: recorded_completions.append(kwargs),
    )

    with pytest.raises(RuntimeError, match="delete failed"):
        cleanup.run()

    assert len(recorded_completions) == 1
    assert recorded_completions[0]["status"] == "failed"
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
cutoff = datetime.datetime.now() cutoff = datetime.datetime.now()
repo = FakeRepo( repo = FakeRepo(

View File

@ -619,3 +619,53 @@ class TestMessagesCleanServiceFromDays:
assert service._end_before == expected_end_before assert service._end_before == expected_end_before
assert service._batch_size == 1000 # default assert service._batch_size == 1000 # default
assert service._dry_run is False # default assert service._dry_run is False # default
class TestMessagesCleanServiceRun:
    """Unit tests for MessagesCleanService.run instrumentation behavior."""

    @staticmethod
    def _build_service():
        # Shared fixture: a one-day window with billing disabled; batch size
        # and dry_run match what both tests below expect.
        return MessagesCleanService(
            policy=BillingDisabledPolicy(),
            start_from=datetime.datetime(2024, 1, 1),
            end_before=datetime.datetime(2024, 1, 2),
            batch_size=100,
            dry_run=False,
        )

    def test_run_records_completion_metrics_on_success(self):
        # Arrange
        service = self._build_service()
        expected_stats = {
            "batches": 1,
            "total_messages": 10,
            "filtered_messages": 5,
            "total_deleted": 5,
        }
        service._clean_messages_by_time_range = MagicMock(return_value=expected_stats)  # type: ignore[method-assign]
        recorded: list[dict[str, object]] = []
        service._metrics.record_completion = lambda **kwargs: recorded.append(kwargs)  # type: ignore[method-assign]

        # Act
        stats = service.run()

        # Assert
        assert stats == expected_stats
        assert len(recorded) == 1
        assert recorded[0]["status"] == "success"

    def test_run_records_completion_metrics_on_failure(self):
        # Arrange
        service = self._build_service()
        service._clean_messages_by_time_range = MagicMock(side_effect=RuntimeError("clean failed"))  # type: ignore[method-assign]
        recorded: list[dict[str, object]] = []
        service._metrics.record_completion = lambda **kwargs: recorded.append(kwargs)  # type: ignore[method-assign]

        # Act & Assert
        with pytest.raises(RuntimeError, match="clean failed"):
            service.run()
        assert len(recorded) == 1
        assert recorded[0]["status"] == "failed"