mirror of
https://github.com/langgenius/dify.git
synced 2026-04-19 10:17:26 +08:00
Compare commits
9 Commits
1.13.2
...
deploy/cle
| Author | SHA1 | Date | |
|---|---|---|---|
| 232b8eb248 | |||
| 4c0d81029f | |||
| bacf70c00a | |||
| 942729ba48 | |||
| 4f5af0b43c | |||
| a28cb993b8 | |||
| c6dd2ef25a | |||
| b0e8becd14 | |||
| f90e0d781a |
@ -937,6 +937,12 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
|
|||||||
is_flag=True,
|
is_flag=True,
|
||||||
help="Preview cleanup results without deleting any workflow run data.",
|
help="Preview cleanup results without deleting any workflow run data.",
|
||||||
)
|
)
|
||||||
|
@click.option(
|
||||||
|
"--task-label",
|
||||||
|
default="daily",
|
||||||
|
show_default=True,
|
||||||
|
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
|
||||||
|
)
|
||||||
def clean_workflow_runs(
|
def clean_workflow_runs(
|
||||||
before_days: int,
|
before_days: int,
|
||||||
batch_size: int,
|
batch_size: int,
|
||||||
@ -945,10 +951,13 @@ def clean_workflow_runs(
|
|||||||
start_from: datetime.datetime | None,
|
start_from: datetime.datetime | None,
|
||||||
end_before: datetime.datetime | None,
|
end_before: datetime.datetime | None,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
|
task_label: str,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Clean workflow runs and related workflow data for free tenants.
|
Clean workflow runs and related workflow data for free tenants.
|
||||||
"""
|
"""
|
||||||
|
from extensions.otel.runtime import flush_telemetry
|
||||||
|
|
||||||
if (start_from is None) ^ (end_before is None):
|
if (start_from is None) ^ (end_before is None):
|
||||||
raise click.UsageError("--start-from and --end-before must be provided together.")
|
raise click.UsageError("--start-from and --end-before must be provided together.")
|
||||||
|
|
||||||
@ -968,13 +977,17 @@ def clean_workflow_runs(
|
|||||||
start_time = datetime.datetime.now(datetime.UTC)
|
start_time = datetime.datetime.now(datetime.UTC)
|
||||||
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
|
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
|
||||||
|
|
||||||
WorkflowRunCleanup(
|
try:
|
||||||
days=before_days,
|
WorkflowRunCleanup(
|
||||||
batch_size=batch_size,
|
days=before_days,
|
||||||
start_from=start_from,
|
batch_size=batch_size,
|
||||||
end_before=end_before,
|
start_from=start_from,
|
||||||
dry_run=dry_run,
|
end_before=end_before,
|
||||||
).run()
|
dry_run=dry_run,
|
||||||
|
task_label=task_label,
|
||||||
|
).run()
|
||||||
|
finally:
|
||||||
|
flush_telemetry()
|
||||||
|
|
||||||
end_time = datetime.datetime.now(datetime.UTC)
|
end_time = datetime.datetime.now(datetime.UTC)
|
||||||
elapsed = end_time - start_time
|
elapsed = end_time - start_time
|
||||||
@ -2630,6 +2643,12 @@ def migrate_oss(
|
|||||||
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
|
help="Graceful period in days after subscription expiration, will be ignored when billing is disabled.",
|
||||||
)
|
)
|
||||||
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
|
@click.option("--dry-run", is_flag=True, default=False, help="Show messages logs would be cleaned without deleting")
|
||||||
|
@click.option(
|
||||||
|
"--task-label",
|
||||||
|
default="daily",
|
||||||
|
show_default=True,
|
||||||
|
help="Stable label value used to distinguish multiple cleanup CronJobs in metrics.",
|
||||||
|
)
|
||||||
def clean_expired_messages(
|
def clean_expired_messages(
|
||||||
batch_size: int,
|
batch_size: int,
|
||||||
graceful_period: int,
|
graceful_period: int,
|
||||||
@ -2638,10 +2657,13 @@ def clean_expired_messages(
|
|||||||
from_days_ago: int | None,
|
from_days_ago: int | None,
|
||||||
before_days: int | None,
|
before_days: int | None,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
|
task_label: str,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Clean expired messages and related data for tenants based on clean policy.
|
Clean expired messages and related data for tenants based on clean policy.
|
||||||
"""
|
"""
|
||||||
|
from extensions.otel.runtime import flush_telemetry
|
||||||
|
|
||||||
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
|
click.echo(click.style("clean_messages: start clean messages.", fg="green"))
|
||||||
|
|
||||||
start_at = time.perf_counter()
|
start_at = time.perf_counter()
|
||||||
@ -2691,6 +2713,7 @@ def clean_expired_messages(
|
|||||||
end_before=end_before,
|
end_before=end_before,
|
||||||
batch_size=batch_size,
|
batch_size=batch_size,
|
||||||
dry_run=dry_run,
|
dry_run=dry_run,
|
||||||
|
task_label=task_label,
|
||||||
)
|
)
|
||||||
elif from_days_ago is None:
|
elif from_days_ago is None:
|
||||||
assert before_days is not None
|
assert before_days is not None
|
||||||
@ -2699,6 +2722,7 @@ def clean_expired_messages(
|
|||||||
days=before_days,
|
days=before_days,
|
||||||
batch_size=batch_size,
|
batch_size=batch_size,
|
||||||
dry_run=dry_run,
|
dry_run=dry_run,
|
||||||
|
task_label=task_label,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
assert before_days is not None
|
assert before_days is not None
|
||||||
@ -2710,6 +2734,7 @@ def clean_expired_messages(
|
|||||||
end_before=now - datetime.timedelta(days=before_days),
|
end_before=now - datetime.timedelta(days=before_days),
|
||||||
batch_size=batch_size,
|
batch_size=batch_size,
|
||||||
dry_run=dry_run,
|
dry_run=dry_run,
|
||||||
|
task_label=task_label,
|
||||||
)
|
)
|
||||||
stats = service.run()
|
stats = service.run()
|
||||||
|
|
||||||
@ -2735,6 +2760,8 @@ def clean_expired_messages(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
flush_telemetry()
|
||||||
|
|
||||||
click.echo(click.style("messages cleanup completed.", fg="green"))
|
click.echo(click.style("messages cleanup completed.", fg="green"))
|
||||||
|
|
||||||
|
|||||||
@ -5,7 +5,7 @@ from typing import Union
|
|||||||
|
|
||||||
from celery.signals import worker_init
|
from celery.signals import worker_init
|
||||||
from flask_login import user_loaded_from_request, user_logged_in
|
from flask_login import user_loaded_from_request, user_logged_in
|
||||||
from opentelemetry import trace
|
from opentelemetry import metrics, trace
|
||||||
from opentelemetry.propagate import set_global_textmap
|
from opentelemetry.propagate import set_global_textmap
|
||||||
from opentelemetry.propagators.b3 import B3Format
|
from opentelemetry.propagators.b3 import B3Format
|
||||||
from opentelemetry.propagators.composite import CompositePropagator
|
from opentelemetry.propagators.composite import CompositePropagator
|
||||||
@ -31,9 +31,29 @@ def setup_context_propagation() -> None:
|
|||||||
|
|
||||||
|
|
||||||
def shutdown_tracer() -> None:
|
def shutdown_tracer() -> None:
|
||||||
|
flush_telemetry()
|
||||||
|
|
||||||
|
|
||||||
|
def flush_telemetry() -> None:
|
||||||
|
"""
|
||||||
|
Best-effort flush for telemetry providers.
|
||||||
|
|
||||||
|
This is mainly used by short-lived command processes (e.g. Kubernetes CronJob)
|
||||||
|
so counters/histograms are exported before the process exits.
|
||||||
|
"""
|
||||||
provider = trace.get_tracer_provider()
|
provider = trace.get_tracer_provider()
|
||||||
if hasattr(provider, "force_flush"):
|
if hasattr(provider, "force_flush"):
|
||||||
provider.force_flush()
|
try:
|
||||||
|
provider.force_flush()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("otel: failed to flush trace provider")
|
||||||
|
|
||||||
|
metric_provider = metrics.get_meter_provider()
|
||||||
|
if hasattr(metric_provider, "force_flush"):
|
||||||
|
try:
|
||||||
|
metric_provider.force_flush()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("otel: failed to flush metric provider")
|
||||||
|
|
||||||
|
|
||||||
def is_celery_worker():
|
def is_celery_worker():
|
||||||
|
|||||||
@ -1,16 +1,16 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
from typing import cast
|
from typing import TYPE_CHECKING, cast
|
||||||
|
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from sqlalchemy import delete, select, tuple_
|
from sqlalchemy import delete, select, tuple_
|
||||||
from sqlalchemy.engine import CursorResult
|
from sqlalchemy.engine import CursorResult
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from configs import dify_config
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from libs.datetime_utils import naive_utc_now
|
from libs.datetime_utils import naive_utc_now
|
||||||
from models.model import (
|
from models.model import (
|
||||||
@ -33,6 +33,128 @@ from services.retention.conversation.messages_clean_policy import (
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from opentelemetry.metrics import Counter, Histogram
|
||||||
|
|
||||||
|
|
||||||
|
class MessagesCleanupMetrics:
|
||||||
|
"""
|
||||||
|
Records low-cardinality OpenTelemetry metrics for expired message cleanup jobs.
|
||||||
|
|
||||||
|
We keep labels stable (dry_run/window_mode/task_label/status) so these metrics remain
|
||||||
|
dashboard-friendly for long-running CronJob executions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_job_runs_total: "Counter | None"
|
||||||
|
_batches_total: "Counter | None"
|
||||||
|
_messages_scanned_total: "Counter | None"
|
||||||
|
_messages_filtered_total: "Counter | None"
|
||||||
|
_messages_deleted_total: "Counter | None"
|
||||||
|
_job_duration_seconds: "Histogram | None"
|
||||||
|
_batch_duration_seconds: "Histogram | None"
|
||||||
|
_base_attributes: dict[str, str]
|
||||||
|
|
||||||
|
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
|
||||||
|
self._job_runs_total = None
|
||||||
|
self._batches_total = None
|
||||||
|
self._messages_scanned_total = None
|
||||||
|
self._messages_filtered_total = None
|
||||||
|
self._messages_deleted_total = None
|
||||||
|
self._job_duration_seconds = None
|
||||||
|
self._batch_duration_seconds = None
|
||||||
|
self._base_attributes = {
|
||||||
|
"job_name": "messages_cleanup",
|
||||||
|
"dry_run": str(dry_run).lower(),
|
||||||
|
"window_mode": "between" if has_window else "before_cutoff",
|
||||||
|
"task_label": task_label,
|
||||||
|
}
|
||||||
|
self._init_instruments()
|
||||||
|
|
||||||
|
def _init_instruments(self) -> None:
|
||||||
|
try:
|
||||||
|
from opentelemetry.metrics import get_meter
|
||||||
|
|
||||||
|
meter = get_meter("messages_cleanup", version=dify_config.project.version)
|
||||||
|
self._job_runs_total = meter.create_counter(
|
||||||
|
"messages_cleanup_jobs_total",
|
||||||
|
description="Total number of expired message cleanup jobs by status.",
|
||||||
|
unit="{job}",
|
||||||
|
)
|
||||||
|
self._batches_total = meter.create_counter(
|
||||||
|
"messages_cleanup_batches_total",
|
||||||
|
description="Total number of message cleanup batches processed.",
|
||||||
|
unit="{batch}",
|
||||||
|
)
|
||||||
|
self._messages_scanned_total = meter.create_counter(
|
||||||
|
"messages_cleanup_scanned_messages_total",
|
||||||
|
description="Total messages scanned by cleanup jobs.",
|
||||||
|
unit="{message}",
|
||||||
|
)
|
||||||
|
self._messages_filtered_total = meter.create_counter(
|
||||||
|
"messages_cleanup_filtered_messages_total",
|
||||||
|
description="Total messages selected by cleanup policy.",
|
||||||
|
unit="{message}",
|
||||||
|
)
|
||||||
|
self._messages_deleted_total = meter.create_counter(
|
||||||
|
"messages_cleanup_deleted_messages_total",
|
||||||
|
description="Total messages deleted by cleanup jobs.",
|
||||||
|
unit="{message}",
|
||||||
|
)
|
||||||
|
self._job_duration_seconds = meter.create_histogram(
|
||||||
|
"messages_cleanup_job_duration_seconds",
|
||||||
|
description="Duration of expired message cleanup jobs in seconds.",
|
||||||
|
unit="s",
|
||||||
|
)
|
||||||
|
self._batch_duration_seconds = meter.create_histogram(
|
||||||
|
"messages_cleanup_batch_duration_seconds",
|
||||||
|
description="Duration of expired message cleanup batch processing in seconds.",
|
||||||
|
unit="s",
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("messages_cleanup_metrics: failed to initialize instruments")
|
||||||
|
|
||||||
|
def _attrs(self, **extra: str) -> dict[str, str]:
|
||||||
|
return {**self._base_attributes, **extra}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
|
||||||
|
if not counter or value <= 0:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
counter.add(value, attributes)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("messages_cleanup_metrics: failed to add counter value")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
|
||||||
|
if not histogram:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
histogram.record(value, attributes)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("messages_cleanup_metrics: failed to record histogram value")
|
||||||
|
|
||||||
|
def record_batch(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
scanned_messages: int,
|
||||||
|
filtered_messages: int,
|
||||||
|
deleted_messages: int,
|
||||||
|
batch_duration_seconds: float,
|
||||||
|
) -> None:
|
||||||
|
attributes = self._attrs()
|
||||||
|
self._add(self._batches_total, 1, attributes)
|
||||||
|
self._add(self._messages_scanned_total, scanned_messages, attributes)
|
||||||
|
self._add(self._messages_filtered_total, filtered_messages, attributes)
|
||||||
|
self._add(self._messages_deleted_total, deleted_messages, attributes)
|
||||||
|
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
|
||||||
|
|
||||||
|
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
|
||||||
|
attributes = self._attrs(status=status)
|
||||||
|
self._add(self._job_runs_total, 1, attributes)
|
||||||
|
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
|
||||||
|
|
||||||
|
|
||||||
class MessagesCleanService:
|
class MessagesCleanService:
|
||||||
"""
|
"""
|
||||||
Service for cleaning expired messages based on retention policies.
|
Service for cleaning expired messages based on retention policies.
|
||||||
@ -48,6 +170,7 @@ class MessagesCleanService:
|
|||||||
start_from: datetime.datetime | None = None,
|
start_from: datetime.datetime | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
dry_run: bool = False,
|
dry_run: bool = False,
|
||||||
|
task_label: str = "daily",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize the service with cleanup parameters.
|
Initialize the service with cleanup parameters.
|
||||||
@ -58,12 +181,20 @@ class MessagesCleanService:
|
|||||||
start_from: Optional start time (inclusive) of the range
|
start_from: Optional start time (inclusive) of the range
|
||||||
batch_size: Number of messages to process per batch
|
batch_size: Number of messages to process per batch
|
||||||
dry_run: Whether to perform a dry run (no actual deletion)
|
dry_run: Whether to perform a dry run (no actual deletion)
|
||||||
|
task_label: Stable task label to distinguish multiple cleanup CronJobs
|
||||||
"""
|
"""
|
||||||
self._policy = policy
|
self._policy = policy
|
||||||
self._end_before = end_before
|
self._end_before = end_before
|
||||||
self._start_from = start_from
|
self._start_from = start_from
|
||||||
self._batch_size = batch_size
|
self._batch_size = batch_size
|
||||||
self._dry_run = dry_run
|
self._dry_run = dry_run
|
||||||
|
normalized_task_label = task_label.strip()
|
||||||
|
self._task_label = normalized_task_label or "daily"
|
||||||
|
self._metrics = MessagesCleanupMetrics(
|
||||||
|
dry_run=dry_run,
|
||||||
|
has_window=bool(start_from),
|
||||||
|
task_label=self._task_label,
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_time_range(
|
def from_time_range(
|
||||||
@ -73,6 +204,7 @@ class MessagesCleanService:
|
|||||||
end_before: datetime.datetime,
|
end_before: datetime.datetime,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
dry_run: bool = False,
|
dry_run: bool = False,
|
||||||
|
task_label: str = "daily",
|
||||||
) -> "MessagesCleanService":
|
) -> "MessagesCleanService":
|
||||||
"""
|
"""
|
||||||
Create a service instance for cleaning messages within a specific time range.
|
Create a service instance for cleaning messages within a specific time range.
|
||||||
@ -85,6 +217,7 @@ class MessagesCleanService:
|
|||||||
end_before: End time (exclusive) of the range
|
end_before: End time (exclusive) of the range
|
||||||
batch_size: Number of messages to process per batch
|
batch_size: Number of messages to process per batch
|
||||||
dry_run: Whether to perform a dry run (no actual deletion)
|
dry_run: Whether to perform a dry run (no actual deletion)
|
||||||
|
task_label: Stable task label to distinguish multiple cleanup CronJobs
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
MessagesCleanService instance
|
MessagesCleanService instance
|
||||||
@ -112,6 +245,7 @@ class MessagesCleanService:
|
|||||||
start_from=start_from,
|
start_from=start_from,
|
||||||
batch_size=batch_size,
|
batch_size=batch_size,
|
||||||
dry_run=dry_run,
|
dry_run=dry_run,
|
||||||
|
task_label=task_label,
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -121,6 +255,7 @@ class MessagesCleanService:
|
|||||||
days: int = 30,
|
days: int = 30,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
dry_run: bool = False,
|
dry_run: bool = False,
|
||||||
|
task_label: str = "daily",
|
||||||
) -> "MessagesCleanService":
|
) -> "MessagesCleanService":
|
||||||
"""
|
"""
|
||||||
Create a service instance for cleaning messages older than specified days.
|
Create a service instance for cleaning messages older than specified days.
|
||||||
@ -130,6 +265,7 @@ class MessagesCleanService:
|
|||||||
days: Number of days to look back from now
|
days: Number of days to look back from now
|
||||||
batch_size: Number of messages to process per batch
|
batch_size: Number of messages to process per batch
|
||||||
dry_run: Whether to perform a dry run (no actual deletion)
|
dry_run: Whether to perform a dry run (no actual deletion)
|
||||||
|
task_label: Stable task label to distinguish multiple cleanup CronJobs
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
MessagesCleanService instance
|
MessagesCleanService instance
|
||||||
@ -153,7 +289,14 @@ class MessagesCleanService:
|
|||||||
policy.__class__.__name__,
|
policy.__class__.__name__,
|
||||||
)
|
)
|
||||||
|
|
||||||
return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run)
|
return cls(
|
||||||
|
policy=policy,
|
||||||
|
end_before=end_before,
|
||||||
|
start_from=None,
|
||||||
|
batch_size=batch_size,
|
||||||
|
dry_run=dry_run,
|
||||||
|
task_label=task_label,
|
||||||
|
)
|
||||||
|
|
||||||
def run(self) -> dict[str, int]:
|
def run(self) -> dict[str, int]:
|
||||||
"""
|
"""
|
||||||
@ -162,7 +305,18 @@ class MessagesCleanService:
|
|||||||
Returns:
|
Returns:
|
||||||
Dict with statistics: batches, filtered_messages, total_deleted
|
Dict with statistics: batches, filtered_messages, total_deleted
|
||||||
"""
|
"""
|
||||||
return self._clean_messages_by_time_range()
|
status = "success"
|
||||||
|
run_start = time.monotonic()
|
||||||
|
try:
|
||||||
|
return self._clean_messages_by_time_range()
|
||||||
|
except Exception:
|
||||||
|
status = "failed"
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._metrics.record_completion(
|
||||||
|
status=status,
|
||||||
|
job_duration_seconds=time.monotonic() - run_start,
|
||||||
|
)
|
||||||
|
|
||||||
def _clean_messages_by_time_range(self) -> dict[str, int]:
|
def _clean_messages_by_time_range(self) -> dict[str, int]:
|
||||||
"""
|
"""
|
||||||
@ -197,11 +351,14 @@ class MessagesCleanService:
|
|||||||
self._end_before,
|
self._end_before,
|
||||||
)
|
)
|
||||||
|
|
||||||
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
|
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
stats["batches"] += 1
|
stats["batches"] += 1
|
||||||
batch_start = time.monotonic()
|
batch_start = time.monotonic()
|
||||||
|
batch_scanned_messages = 0
|
||||||
|
batch_filtered_messages = 0
|
||||||
|
batch_deleted_messages = 0
|
||||||
|
|
||||||
# Step 1: Fetch a batch of messages using cursor
|
# Step 1: Fetch a batch of messages using cursor
|
||||||
with Session(db.engine, expire_on_commit=False) as session:
|
with Session(db.engine, expire_on_commit=False) as session:
|
||||||
@ -240,9 +397,16 @@ class MessagesCleanService:
|
|||||||
|
|
||||||
# Track total messages fetched across all batches
|
# Track total messages fetched across all batches
|
||||||
stats["total_messages"] += len(messages)
|
stats["total_messages"] += len(messages)
|
||||||
|
batch_scanned_messages = len(messages)
|
||||||
|
|
||||||
if not messages:
|
if not messages:
|
||||||
logger.info("clean_messages (batch %s): no more messages to process", stats["batches"])
|
logger.info("clean_messages (batch %s): no more messages to process", stats["batches"])
|
||||||
|
self._metrics.record_batch(
|
||||||
|
scanned_messages=batch_scanned_messages,
|
||||||
|
filtered_messages=batch_filtered_messages,
|
||||||
|
deleted_messages=batch_deleted_messages,
|
||||||
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Update cursor to the last message's (created_at, id)
|
# Update cursor to the last message's (created_at, id)
|
||||||
@ -268,6 +432,12 @@ class MessagesCleanService:
|
|||||||
|
|
||||||
if not apps:
|
if not apps:
|
||||||
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
|
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
|
||||||
|
self._metrics.record_batch(
|
||||||
|
scanned_messages=batch_scanned_messages,
|
||||||
|
filtered_messages=batch_filtered_messages,
|
||||||
|
deleted_messages=batch_deleted_messages,
|
||||||
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Build app_id -> tenant_id mapping
|
# Build app_id -> tenant_id mapping
|
||||||
@ -286,9 +456,16 @@ class MessagesCleanService:
|
|||||||
|
|
||||||
if not message_ids_to_delete:
|
if not message_ids_to_delete:
|
||||||
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
|
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
|
||||||
|
self._metrics.record_batch(
|
||||||
|
scanned_messages=batch_scanned_messages,
|
||||||
|
filtered_messages=batch_filtered_messages,
|
||||||
|
deleted_messages=batch_deleted_messages,
|
||||||
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
stats["filtered_messages"] += len(message_ids_to_delete)
|
stats["filtered_messages"] += len(message_ids_to_delete)
|
||||||
|
batch_filtered_messages = len(message_ids_to_delete)
|
||||||
|
|
||||||
# Step 4: Batch delete messages and their relations
|
# Step 4: Batch delete messages and their relations
|
||||||
if not self._dry_run:
|
if not self._dry_run:
|
||||||
@ -309,6 +486,7 @@ class MessagesCleanService:
|
|||||||
commit_ms = int((time.monotonic() - commit_start) * 1000)
|
commit_ms = int((time.monotonic() - commit_start) * 1000)
|
||||||
|
|
||||||
stats["total_deleted"] += messages_deleted
|
stats["total_deleted"] += messages_deleted
|
||||||
|
batch_deleted_messages = messages_deleted
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"clean_messages (batch %s): processed %s messages, deleted %s messages",
|
"clean_messages (batch %s): processed %s messages, deleted %s messages",
|
||||||
@ -343,6 +521,13 @@ class MessagesCleanService:
|
|||||||
for msg_id in sampled_ids:
|
for msg_id in sampled_ids:
|
||||||
logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id)
|
logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id)
|
||||||
|
|
||||||
|
self._metrics.record_batch(
|
||||||
|
scanned_messages=batch_scanned_messages,
|
||||||
|
filtered_messages=batch_filtered_messages,
|
||||||
|
deleted_messages=batch_deleted_messages,
|
||||||
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s",
|
"clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s",
|
||||||
stats["batches"],
|
stats["batches"],
|
||||||
|
|||||||
@ -1,9 +1,9 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
from collections.abc import Iterable, Sequence
|
from collections.abc import Iterable, Sequence
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import click
|
import click
|
||||||
from sqlalchemy.orm import Session, sessionmaker
|
from sqlalchemy.orm import Session, sessionmaker
|
||||||
@ -20,6 +20,156 @@ from services.billing_service import BillingService, SubscriptionPlan
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from opentelemetry.metrics import Counter, Histogram
|
||||||
|
|
||||||
|
|
||||||
|
class WorkflowRunCleanupMetrics:
|
||||||
|
"""
|
||||||
|
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
|
||||||
|
|
||||||
|
Metrics are emitted with stable labels only (dry_run/window_mode/task_label/status)
|
||||||
|
to keep dashboard and alert cardinality predictable in production clusters.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_job_runs_total: "Counter | None"
|
||||||
|
_batches_total: "Counter | None"
|
||||||
|
_runs_scanned_total: "Counter | None"
|
||||||
|
_runs_targeted_total: "Counter | None"
|
||||||
|
_runs_deleted_total: "Counter | None"
|
||||||
|
_runs_skipped_total: "Counter | None"
|
||||||
|
_related_records_total: "Counter | None"
|
||||||
|
_job_duration_seconds: "Histogram | None"
|
||||||
|
_batch_duration_seconds: "Histogram | None"
|
||||||
|
_base_attributes: dict[str, str]
|
||||||
|
|
||||||
|
def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None:
|
||||||
|
self._job_runs_total = None
|
||||||
|
self._batches_total = None
|
||||||
|
self._runs_scanned_total = None
|
||||||
|
self._runs_targeted_total = None
|
||||||
|
self._runs_deleted_total = None
|
||||||
|
self._runs_skipped_total = None
|
||||||
|
self._related_records_total = None
|
||||||
|
self._job_duration_seconds = None
|
||||||
|
self._batch_duration_seconds = None
|
||||||
|
self._base_attributes = {
|
||||||
|
"job_name": "workflow_run_cleanup",
|
||||||
|
"dry_run": str(dry_run).lower(),
|
||||||
|
"window_mode": "between" if has_window else "before_cutoff",
|
||||||
|
"task_label": task_label,
|
||||||
|
}
|
||||||
|
self._init_instruments()
|
||||||
|
|
||||||
|
def _init_instruments(self) -> None:
|
||||||
|
try:
|
||||||
|
from opentelemetry.metrics import get_meter
|
||||||
|
|
||||||
|
meter = get_meter("workflow_run_cleanup", version=dify_config.project.version)
|
||||||
|
self._job_runs_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_jobs_total",
|
||||||
|
description="Total number of workflow run cleanup jobs by status.",
|
||||||
|
unit="{job}",
|
||||||
|
)
|
||||||
|
self._batches_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_batches_total",
|
||||||
|
description="Total number of processed cleanup batches.",
|
||||||
|
unit="{batch}",
|
||||||
|
)
|
||||||
|
self._runs_scanned_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_scanned_runs_total",
|
||||||
|
description="Total workflow runs scanned by cleanup jobs.",
|
||||||
|
unit="{run}",
|
||||||
|
)
|
||||||
|
self._runs_targeted_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_targeted_runs_total",
|
||||||
|
description="Total workflow runs targeted by cleanup policy.",
|
||||||
|
unit="{run}",
|
||||||
|
)
|
||||||
|
self._runs_deleted_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_deleted_runs_total",
|
||||||
|
description="Total workflow runs deleted by cleanup jobs.",
|
||||||
|
unit="{run}",
|
||||||
|
)
|
||||||
|
self._runs_skipped_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_skipped_runs_total",
|
||||||
|
description="Total workflow runs skipped because tenant is paid/unknown.",
|
||||||
|
unit="{run}",
|
||||||
|
)
|
||||||
|
self._related_records_total = meter.create_counter(
|
||||||
|
"workflow_run_cleanup_related_records_total",
|
||||||
|
description="Total related records processed by cleanup jobs.",
|
||||||
|
unit="{record}",
|
||||||
|
)
|
||||||
|
self._job_duration_seconds = meter.create_histogram(
|
||||||
|
"workflow_run_cleanup_job_duration_seconds",
|
||||||
|
description="Duration of workflow run cleanup jobs in seconds.",
|
||||||
|
unit="s",
|
||||||
|
)
|
||||||
|
self._batch_duration_seconds = meter.create_histogram(
|
||||||
|
"workflow_run_cleanup_batch_duration_seconds",
|
||||||
|
description="Duration of workflow run cleanup batch processing in seconds.",
|
||||||
|
unit="s",
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments")
|
||||||
|
|
||||||
|
def _attrs(self, **extra: str) -> dict[str, str]:
|
||||||
|
return {**self._base_attributes, **extra}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None:
|
||||||
|
if not counter or value <= 0:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
counter.add(value, attributes)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("workflow_run_cleanup_metrics: failed to add counter value")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None:
|
||||||
|
if not histogram:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
histogram.record(value, attributes)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("workflow_run_cleanup_metrics: failed to record histogram value")
|
||||||
|
|
||||||
|
def record_batch(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
batch_rows: int,
|
||||||
|
targeted_runs: int,
|
||||||
|
skipped_runs: int,
|
||||||
|
deleted_runs: int,
|
||||||
|
related_counts: dict[str, int] | None,
|
||||||
|
related_action: str | None,
|
||||||
|
batch_duration_seconds: float,
|
||||||
|
) -> None:
|
||||||
|
attributes = self._attrs()
|
||||||
|
self._add(self._batches_total, 1, attributes)
|
||||||
|
self._add(self._runs_scanned_total, batch_rows, attributes)
|
||||||
|
self._add(self._runs_targeted_total, targeted_runs, attributes)
|
||||||
|
self._add(self._runs_skipped_total, skipped_runs, attributes)
|
||||||
|
self._add(self._runs_deleted_total, deleted_runs, attributes)
|
||||||
|
self._record(self._batch_duration_seconds, batch_duration_seconds, attributes)
|
||||||
|
|
||||||
|
if not related_counts or not related_action:
|
||||||
|
return
|
||||||
|
|
||||||
|
for record_type, count in related_counts.items():
|
||||||
|
self._add(
|
||||||
|
self._related_records_total,
|
||||||
|
count,
|
||||||
|
self._attrs(action=related_action, record_type=record_type),
|
||||||
|
)
|
||||||
|
|
||||||
|
def record_completion(self, *, status: str, job_duration_seconds: float) -> None:
|
||||||
|
attributes = self._attrs(status=status)
|
||||||
|
self._add(self._job_runs_total, 1, attributes)
|
||||||
|
self._record(self._job_duration_seconds, job_duration_seconds, attributes)
|
||||||
|
|
||||||
|
|
||||||
class WorkflowRunCleanup:
|
class WorkflowRunCleanup:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -29,6 +179,7 @@ class WorkflowRunCleanup:
|
|||||||
end_before: datetime.datetime | None = None,
|
end_before: datetime.datetime | None = None,
|
||||||
workflow_run_repo: APIWorkflowRunRepository | None = None,
|
workflow_run_repo: APIWorkflowRunRepository | None = None,
|
||||||
dry_run: bool = False,
|
dry_run: bool = False,
|
||||||
|
task_label: str = "daily",
|
||||||
):
|
):
|
||||||
if (start_from is None) ^ (end_before is None):
|
if (start_from is None) ^ (end_before is None):
|
||||||
raise ValueError("start_from and end_before must be both set or both omitted.")
|
raise ValueError("start_from and end_before must be both set or both omitted.")
|
||||||
@ -46,6 +197,13 @@ class WorkflowRunCleanup:
|
|||||||
self.batch_size = batch_size
|
self.batch_size = batch_size
|
||||||
self._cleanup_whitelist: set[str] | None = None
|
self._cleanup_whitelist: set[str] | None = None
|
||||||
self.dry_run = dry_run
|
self.dry_run = dry_run
|
||||||
|
normalized_task_label = task_label.strip()
|
||||||
|
self.task_label = normalized_task_label or "daily"
|
||||||
|
self._metrics = WorkflowRunCleanupMetrics(
|
||||||
|
dry_run=dry_run,
|
||||||
|
has_window=bool(start_from),
|
||||||
|
task_label=self.task_label,
|
||||||
|
)
|
||||||
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
|
self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD
|
||||||
self.workflow_run_repo: APIWorkflowRunRepository
|
self.workflow_run_repo: APIWorkflowRunRepository
|
||||||
if workflow_run_repo:
|
if workflow_run_repo:
|
||||||
@ -74,153 +232,193 @@ class WorkflowRunCleanup:
|
|||||||
related_totals = self._empty_related_counts() if self.dry_run else None
|
related_totals = self._empty_related_counts() if self.dry_run else None
|
||||||
batch_index = 0
|
batch_index = 0
|
||||||
last_seen: tuple[datetime.datetime, str] | None = None
|
last_seen: tuple[datetime.datetime, str] | None = None
|
||||||
|
status = "success"
|
||||||
|
run_start = time.monotonic()
|
||||||
|
max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL
|
||||||
|
|
||||||
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
|
try:
|
||||||
|
while True:
|
||||||
|
batch_start = time.monotonic()
|
||||||
|
|
||||||
while True:
|
fetch_start = time.monotonic()
|
||||||
batch_start = time.monotonic()
|
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
||||||
|
start_from=self.window_start,
|
||||||
fetch_start = time.monotonic()
|
end_before=self.window_end,
|
||||||
run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
|
last_seen=last_seen,
|
||||||
start_from=self.window_start,
|
batch_size=self.batch_size,
|
||||||
end_before=self.window_end,
|
|
||||||
last_seen=last_seen,
|
|
||||||
batch_size=self.batch_size,
|
|
||||||
)
|
|
||||||
if not run_rows:
|
|
||||||
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
|
||||||
break
|
|
||||||
|
|
||||||
batch_index += 1
|
|
||||||
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
|
||||||
logger.info(
|
|
||||||
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
|
||||||
batch_index,
|
|
||||||
len(run_rows),
|
|
||||||
int((time.monotonic() - fetch_start) * 1000),
|
|
||||||
)
|
|
||||||
|
|
||||||
tenant_ids = {row.tenant_id for row in run_rows}
|
|
||||||
|
|
||||||
filter_start = time.monotonic()
|
|
||||||
free_tenants = self._filter_free_tenants(tenant_ids)
|
|
||||||
logger.info(
|
|
||||||
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
|
|
||||||
batch_index,
|
|
||||||
len(free_tenants),
|
|
||||||
len(tenant_ids),
|
|
||||||
int((time.monotonic() - filter_start) * 1000),
|
|
||||||
)
|
|
||||||
|
|
||||||
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
|
||||||
paid_or_skipped = len(run_rows) - len(free_runs)
|
|
||||||
|
|
||||||
if not free_runs:
|
|
||||||
skipped_message = (
|
|
||||||
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
|
||||||
)
|
)
|
||||||
click.echo(
|
if not run_rows:
|
||||||
click.style(
|
logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
|
||||||
skipped_message,
|
break
|
||||||
fg="yellow",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
total_runs_targeted += len(free_runs)
|
batch_index += 1
|
||||||
|
last_seen = (run_rows[-1].created_at, run_rows[-1].id)
|
||||||
if self.dry_run:
|
|
||||||
count_start = time.monotonic()
|
|
||||||
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
|
||||||
free_runs,
|
|
||||||
count_node_executions=self._count_node_executions,
|
|
||||||
count_trigger_logs=self._count_trigger_logs,
|
|
||||||
)
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
|
"workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
|
||||||
batch_index,
|
batch_index,
|
||||||
int((time.monotonic() - count_start) * 1000),
|
len(run_rows),
|
||||||
|
int((time.monotonic() - fetch_start) * 1000),
|
||||||
)
|
)
|
||||||
if related_totals is not None:
|
|
||||||
for key in related_totals:
|
tenant_ids = {row.tenant_id for row in run_rows}
|
||||||
related_totals[key] += batch_counts.get(key, 0)
|
|
||||||
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
filter_start = time.monotonic()
|
||||||
|
free_tenants = self._filter_free_tenants(tenant_ids)
|
||||||
|
logger.info(
|
||||||
|
"workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
|
||||||
|
batch_index,
|
||||||
|
len(free_tenants),
|
||||||
|
len(tenant_ids),
|
||||||
|
int((time.monotonic() - filter_start) * 1000),
|
||||||
|
)
|
||||||
|
|
||||||
|
free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
|
||||||
|
paid_or_skipped = len(run_rows) - len(free_runs)
|
||||||
|
|
||||||
|
if not free_runs:
|
||||||
|
skipped_message = (
|
||||||
|
f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
|
||||||
|
)
|
||||||
|
click.echo(
|
||||||
|
click.style(
|
||||||
|
skipped_message,
|
||||||
|
fg="yellow",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self._metrics.record_batch(
|
||||||
|
batch_rows=len(run_rows),
|
||||||
|
targeted_runs=0,
|
||||||
|
skipped_runs=paid_or_skipped,
|
||||||
|
deleted_runs=0,
|
||||||
|
related_counts=None,
|
||||||
|
related_action=None,
|
||||||
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
total_runs_targeted += len(free_runs)
|
||||||
|
|
||||||
|
if self.dry_run:
|
||||||
|
count_start = time.monotonic()
|
||||||
|
batch_counts = self.workflow_run_repo.count_runs_with_related(
|
||||||
|
free_runs,
|
||||||
|
count_node_executions=self._count_node_executions,
|
||||||
|
count_trigger_logs=self._count_trigger_logs,
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
|
||||||
|
batch_index,
|
||||||
|
int((time.monotonic() - count_start) * 1000),
|
||||||
|
)
|
||||||
|
if related_totals is not None:
|
||||||
|
for key in related_totals:
|
||||||
|
related_totals[key] += batch_counts.get(key, 0)
|
||||||
|
sample_ids = ", ".join(run.id for run in free_runs[:5])
|
||||||
|
click.echo(
|
||||||
|
click.style(
|
||||||
|
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
||||||
|
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
||||||
|
fg="yellow",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
|
||||||
|
batch_index,
|
||||||
|
int((time.monotonic() - batch_start) * 1000),
|
||||||
|
)
|
||||||
|
self._metrics.record_batch(
|
||||||
|
batch_rows=len(run_rows),
|
||||||
|
targeted_runs=len(free_runs),
|
||||||
|
skipped_runs=paid_or_skipped,
|
||||||
|
deleted_runs=0,
|
||||||
|
related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()},
|
||||||
|
related_action="would_delete",
|
||||||
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
delete_start = time.monotonic()
|
||||||
|
counts = self.workflow_run_repo.delete_runs_with_related(
|
||||||
|
free_runs,
|
||||||
|
delete_node_executions=self._delete_node_executions,
|
||||||
|
delete_trigger_logs=self._delete_trigger_logs,
|
||||||
|
)
|
||||||
|
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
||||||
|
raise
|
||||||
|
|
||||||
|
total_runs_deleted += counts["runs"]
|
||||||
click.echo(
|
click.echo(
|
||||||
click.style(
|
click.style(
|
||||||
f"[batch #{batch_index}] would delete {len(free_runs)} runs "
|
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
|
||||||
f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown",
|
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
|
||||||
fg="yellow",
|
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
|
||||||
|
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
|
||||||
|
f"skipped {paid_or_skipped} paid/unknown",
|
||||||
|
fg="green",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
"workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
|
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
|
||||||
batch_index,
|
batch_index,
|
||||||
|
delete_ms,
|
||||||
int((time.monotonic() - batch_start) * 1000),
|
int((time.monotonic() - batch_start) * 1000),
|
||||||
)
|
)
|
||||||
continue
|
self._metrics.record_batch(
|
||||||
|
batch_rows=len(run_rows),
|
||||||
try:
|
targeted_runs=len(free_runs),
|
||||||
delete_start = time.monotonic()
|
skipped_runs=paid_or_skipped,
|
||||||
counts = self.workflow_run_repo.delete_runs_with_related(
|
deleted_runs=counts["runs"],
|
||||||
free_runs,
|
related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()},
|
||||||
delete_node_executions=self._delete_node_executions,
|
related_action="deleted",
|
||||||
delete_trigger_logs=self._delete_trigger_logs,
|
batch_duration_seconds=time.monotonic() - batch_start,
|
||||||
)
|
)
|
||||||
delete_ms = int((time.monotonic() - delete_start) * 1000)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
|
|
||||||
raise
|
|
||||||
|
|
||||||
total_runs_deleted += counts["runs"]
|
# Random sleep between batches to avoid overwhelming the database
|
||||||
click.echo(
|
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
|
||||||
click.style(
|
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
|
||||||
f"[batch #{batch_index}] deleted runs: {counts['runs']} "
|
time.sleep(sleep_ms / 1000)
|
||||||
f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, "
|
|
||||||
f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, "
|
|
||||||
f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); "
|
|
||||||
f"skipped {paid_or_skipped} paid/unknown",
|
|
||||||
fg="green",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
|
|
||||||
batch_index,
|
|
||||||
delete_ms,
|
|
||||||
int((time.monotonic() - batch_start) * 1000),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Random sleep between batches to avoid overwhelming the database
|
if self.dry_run:
|
||||||
sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311
|
if self.window_start:
|
||||||
logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
|
summary_message = (
|
||||||
time.sleep(sleep_ms / 1000)
|
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||||
|
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||||
if self.dry_run:
|
)
|
||||||
if self.window_start:
|
else:
|
||||||
summary_message = (
|
summary_message = (
|
||||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
||||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
f"before {self.window_end.isoformat()}"
|
||||||
)
|
)
|
||||||
|
if related_totals is not None:
|
||||||
|
summary_message = (
|
||||||
|
f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
|
||||||
|
)
|
||||||
|
summary_color = "yellow"
|
||||||
else:
|
else:
|
||||||
summary_message = (
|
if self.window_start:
|
||||||
f"Dry run complete. Would delete {total_runs_targeted} workflow runs "
|
summary_message = (
|
||||||
f"before {self.window_end.isoformat()}"
|
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||||
)
|
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
||||||
if related_totals is not None:
|
)
|
||||||
summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}"
|
else:
|
||||||
summary_color = "yellow"
|
summary_message = (
|
||||||
else:
|
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
||||||
if self.window_start:
|
f"before {self.window_end.isoformat()}"
|
||||||
summary_message = (
|
)
|
||||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs "
|
summary_color = "white"
|
||||||
f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
summary_message = (
|
|
||||||
f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}"
|
|
||||||
)
|
|
||||||
summary_color = "white"
|
|
||||||
|
|
||||||
click.echo(click.style(summary_message, fg=summary_color))
|
click.echo(click.style(summary_message, fg=summary_color))
|
||||||
|
except Exception:
|
||||||
|
status = "failed"
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._metrics.record_completion(
|
||||||
|
status=status,
|
||||||
|
job_duration_seconds=time.monotonic() - run_start,
|
||||||
|
)
|
||||||
|
|
||||||
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
|
def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]:
|
||||||
tenant_id_list = list(tenant_ids)
|
tenant_id_list = list(tenant_ids)
|
||||||
|
|||||||
@ -38,6 +38,7 @@ def test_absolute_mode_calls_from_time_range():
|
|||||||
from_days_ago=None,
|
from_days_ago=None,
|
||||||
before_days=None,
|
before_days=None,
|
||||||
dry_run=True,
|
dry_run=True,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_from_time_range.assert_called_once_with(
|
mock_from_time_range.assert_called_once_with(
|
||||||
@ -46,6 +47,7 @@ def test_absolute_mode_calls_from_time_range():
|
|||||||
end_before=end_before,
|
end_before=end_before,
|
||||||
batch_size=200,
|
batch_size=200,
|
||||||
dry_run=True,
|
dry_run=True,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
mock_from_days.assert_not_called()
|
mock_from_days.assert_not_called()
|
||||||
|
|
||||||
@ -67,6 +69,7 @@ def test_relative_mode_before_days_only_calls_from_days():
|
|||||||
from_days_ago=None,
|
from_days_ago=None,
|
||||||
before_days=30,
|
before_days=30,
|
||||||
dry_run=False,
|
dry_run=False,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_from_days.assert_called_once_with(
|
mock_from_days.assert_called_once_with(
|
||||||
@ -74,6 +77,7 @@ def test_relative_mode_before_days_only_calls_from_days():
|
|||||||
days=30,
|
days=30,
|
||||||
batch_size=500,
|
batch_size=500,
|
||||||
dry_run=False,
|
dry_run=False,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
mock_from_time_range.assert_not_called()
|
mock_from_time_range.assert_not_called()
|
||||||
|
|
||||||
@ -97,6 +101,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
|
|||||||
from_days_ago=60,
|
from_days_ago=60,
|
||||||
before_days=30,
|
before_days=30,
|
||||||
dry_run=False,
|
dry_run=False,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_from_time_range.assert_called_once_with(
|
mock_from_time_range.assert_called_once_with(
|
||||||
@ -105,6 +110,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range():
|
|||||||
end_before=fixed_now - datetime.timedelta(days=30),
|
end_before=fixed_now - datetime.timedelta(days=30),
|
||||||
batch_size=1000,
|
batch_size=1000,
|
||||||
dry_run=False,
|
dry_run=False,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
mock_from_days.assert_not_called()
|
mock_from_days.assert_not_called()
|
||||||
|
|
||||||
@ -178,4 +184,5 @@ def test_invalid_inputs_raise_usage_error(kwargs: dict, message: str):
|
|||||||
from_days_ago=kwargs["from_days_ago"],
|
from_days_ago=kwargs["from_days_ago"],
|
||||||
before_days=kwargs["before_days"],
|
before_days=kwargs["before_days"],
|
||||||
dry_run=False,
|
dry_run=False,
|
||||||
|
task_label="daily",
|
||||||
)
|
)
|
||||||
|
|||||||
@ -265,6 +265,61 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None:
|
|||||||
cleanup.run()
|
cleanup.run()
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
cutoff = datetime.datetime.now()
|
||||||
|
repo = FakeRepo(
|
||||||
|
batches=[[FakeRun("run-free", "t_free", cutoff)]],
|
||||||
|
delete_result={
|
||||||
|
"runs": 0,
|
||||||
|
"node_executions": 2,
|
||||||
|
"offloads": 1,
|
||||||
|
"app_logs": 3,
|
||||||
|
"trigger_logs": 4,
|
||||||
|
"pauses": 5,
|
||||||
|
"pause_reasons": 6,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||||
|
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||||
|
|
||||||
|
batch_calls: list[dict[str, object]] = []
|
||||||
|
completion_calls: list[dict[str, object]] = []
|
||||||
|
monkeypatch.setattr(cleanup._metrics, "record_batch", lambda **kwargs: batch_calls.append(kwargs))
|
||||||
|
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
|
||||||
|
|
||||||
|
cleanup.run()
|
||||||
|
|
||||||
|
assert len(batch_calls) == 1
|
||||||
|
assert batch_calls[0]["batch_rows"] == 1
|
||||||
|
assert batch_calls[0]["targeted_runs"] == 1
|
||||||
|
assert batch_calls[0]["deleted_runs"] == 1
|
||||||
|
assert batch_calls[0]["related_action"] == "deleted"
|
||||||
|
assert len(completion_calls) == 1
|
||||||
|
assert completion_calls[0]["status"] == "success"
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||||
|
class FailingRepo(FakeRepo):
|
||||||
|
def delete_runs_with_related(
|
||||||
|
self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None
|
||||||
|
) -> dict[str, int]:
|
||||||
|
raise RuntimeError("delete failed")
|
||||||
|
|
||||||
|
cutoff = datetime.datetime.now()
|
||||||
|
repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]])
|
||||||
|
cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10)
|
||||||
|
monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False)
|
||||||
|
|
||||||
|
completion_calls: list[dict[str, object]] = []
|
||||||
|
monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs))
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="delete failed"):
|
||||||
|
cleanup.run()
|
||||||
|
|
||||||
|
assert len(completion_calls) == 1
|
||||||
|
assert completion_calls[0]["status"] == "failed"
|
||||||
|
|
||||||
|
|
||||||
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
||||||
cutoff = datetime.datetime.now()
|
cutoff = datetime.datetime.now()
|
||||||
repo = FakeRepo(
|
repo = FakeRepo(
|
||||||
|
|||||||
@ -619,3 +619,53 @@ class TestMessagesCleanServiceFromDays:
|
|||||||
assert service._end_before == expected_end_before
|
assert service._end_before == expected_end_before
|
||||||
assert service._batch_size == 1000 # default
|
assert service._batch_size == 1000 # default
|
||||||
assert service._dry_run is False # default
|
assert service._dry_run is False # default
|
||||||
|
|
||||||
|
|
||||||
|
class TestMessagesCleanServiceRun:
|
||||||
|
"""Unit tests for MessagesCleanService.run instrumentation behavior."""
|
||||||
|
|
||||||
|
def test_run_records_completion_metrics_on_success(self):
|
||||||
|
# Arrange
|
||||||
|
service = MessagesCleanService(
|
||||||
|
policy=BillingDisabledPolicy(),
|
||||||
|
start_from=datetime.datetime(2024, 1, 1),
|
||||||
|
end_before=datetime.datetime(2024, 1, 2),
|
||||||
|
batch_size=100,
|
||||||
|
dry_run=False,
|
||||||
|
)
|
||||||
|
expected_stats = {
|
||||||
|
"batches": 1,
|
||||||
|
"total_messages": 10,
|
||||||
|
"filtered_messages": 5,
|
||||||
|
"total_deleted": 5,
|
||||||
|
}
|
||||||
|
service._clean_messages_by_time_range = MagicMock(return_value=expected_stats) # type: ignore[method-assign]
|
||||||
|
completion_calls: list[dict[str, object]] = []
|
||||||
|
service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign]
|
||||||
|
|
||||||
|
# Act
|
||||||
|
result = service.run()
|
||||||
|
|
||||||
|
# Assert
|
||||||
|
assert result == expected_stats
|
||||||
|
assert len(completion_calls) == 1
|
||||||
|
assert completion_calls[0]["status"] == "success"
|
||||||
|
|
||||||
|
def test_run_records_completion_metrics_on_failure(self):
|
||||||
|
# Arrange
|
||||||
|
service = MessagesCleanService(
|
||||||
|
policy=BillingDisabledPolicy(),
|
||||||
|
start_from=datetime.datetime(2024, 1, 1),
|
||||||
|
end_before=datetime.datetime(2024, 1, 2),
|
||||||
|
batch_size=100,
|
||||||
|
dry_run=False,
|
||||||
|
)
|
||||||
|
service._clean_messages_by_time_range = MagicMock(side_effect=RuntimeError("clean failed")) # type: ignore[method-assign]
|
||||||
|
completion_calls: list[dict[str, object]] = []
|
||||||
|
service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign]
|
||||||
|
|
||||||
|
# Act & Assert
|
||||||
|
with pytest.raises(RuntimeError, match="clean failed"):
|
||||||
|
service.run()
|
||||||
|
assert len(completion_calls) == 1
|
||||||
|
assert completion_calls[0]["status"] == "failed"
|
||||||
|
|||||||
Reference in New Issue
Block a user