mirror of
https://github.com/langgenius/dify.git
synced 2026-05-02 16:38:04 +08:00
chore: add performance logs
This commit is contained in:
@ -16,6 +16,7 @@ class SavedMessage(TypeBase):
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="saved_message_pkey"),
|
||||
sa.Index("saved_message_message_idx", "app_id", "message_id", "created_by_role", "created_by"),
|
||||
sa.Index("saved_message_message_id_idx", "message_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from typing import cast
|
||||
|
||||
@ -195,9 +196,11 @@ class MessagesCleanService:
|
||||
|
||||
while True:
|
||||
stats["batches"] += 1
|
||||
batch_start = time.monotonic()
|
||||
|
||||
# Step 1: Fetch a batch of messages using cursor
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
fetch_messages_start = time.monotonic()
|
||||
msg_stmt = (
|
||||
select(Message.id, Message.app_id, Message.created_at)
|
||||
.where(Message.created_at < self._end_before)
|
||||
@ -223,6 +226,12 @@ class MessagesCleanService:
|
||||
SimpleMessage(id=msg_id, app_id=app_id, created_at=msg_created_at)
|
||||
for msg_id, app_id, msg_created_at in raw_messages
|
||||
]
|
||||
logger.info(
|
||||
"clean_messages (batch %s): fetched %s messages in %sms",
|
||||
stats["batches"],
|
||||
len(messages),
|
||||
int((time.monotonic() - fetch_messages_start) * 1000),
|
||||
)
|
||||
|
||||
# Track total messages fetched across all batches
|
||||
stats["total_messages"] += len(messages)
|
||||
@ -241,8 +250,16 @@ class MessagesCleanService:
|
||||
logger.info("clean_messages (batch %s): no app_ids found, skip", stats["batches"])
|
||||
continue
|
||||
|
||||
fetch_apps_start = time.monotonic()
|
||||
app_stmt = select(App.id, App.tenant_id).where(App.id.in_(app_ids))
|
||||
apps = list(session.execute(app_stmt).all())
|
||||
logger.info(
|
||||
"clean_messages (batch %s): fetched %s apps for %s app_ids in %sms",
|
||||
stats["batches"],
|
||||
len(apps),
|
||||
len(app_ids),
|
||||
int((time.monotonic() - fetch_apps_start) * 1000),
|
||||
)
|
||||
|
||||
if not apps:
|
||||
logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
|
||||
@ -252,7 +269,15 @@ class MessagesCleanService:
|
||||
app_to_tenant: dict[str, str] = {app.id: app.tenant_id for app in apps}
|
||||
|
||||
# Step 3: Delegate to policy to determine which messages to delete
|
||||
policy_start = time.monotonic()
|
||||
message_ids_to_delete = self._policy.filter_message_ids(messages, app_to_tenant)
|
||||
logger.info(
|
||||
"clean_messages (batch %s): policy selected %s/%s messages in %sms",
|
||||
stats["batches"],
|
||||
len(message_ids_to_delete),
|
||||
len(messages),
|
||||
int((time.monotonic() - policy_start) * 1000),
|
||||
)
|
||||
|
||||
if not message_ids_to_delete:
|
||||
logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
|
||||
@ -263,14 +288,20 @@ class MessagesCleanService:
|
||||
# Step 4: Batch delete messages and their relations
|
||||
if not self._dry_run:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
delete_relations_start = time.monotonic()
|
||||
# Delete related records first
|
||||
self._batch_delete_message_relations(session, message_ids_to_delete)
|
||||
delete_relations_ms = int((time.monotonic() - delete_relations_start) * 1000)
|
||||
|
||||
# Delete messages
|
||||
delete_messages_start = time.monotonic()
|
||||
delete_stmt = delete(Message).where(Message.id.in_(message_ids_to_delete))
|
||||
delete_result = cast(CursorResult, session.execute(delete_stmt))
|
||||
messages_deleted = delete_result.rowcount
|
||||
delete_messages_ms = int((time.monotonic() - delete_messages_start) * 1000)
|
||||
commit_start = time.monotonic()
|
||||
session.commit()
|
||||
commit_ms = int((time.monotonic() - commit_start) * 1000)
|
||||
|
||||
stats["total_deleted"] += messages_deleted
|
||||
|
||||
@ -280,6 +311,14 @@ class MessagesCleanService:
|
||||
len(messages),
|
||||
messages_deleted,
|
||||
)
|
||||
logger.info(
|
||||
"clean_messages (batch %s): relations %sms, messages %sms, commit %sms, batch total %sms",
|
||||
stats["batches"],
|
||||
delete_relations_ms,
|
||||
delete_messages_ms,
|
||||
commit_ms,
|
||||
int((time.monotonic() - batch_start) * 1000),
|
||||
)
|
||||
else:
|
||||
# Log random sample of message IDs that would be deleted (up to 10)
|
||||
sample_size = min(10, len(message_ids_to_delete))
|
||||
|
||||
Reference in New Issue
Block a user