feat: archive workflow runs backend

This commit is contained in:
hjlarry
2026-01-20 20:45:45 +08:00
parent b63dfbf654
commit b4d29ad6ee
28 changed files with 2659 additions and 49 deletions

View File

@ -9,7 +9,9 @@ from sqlalchemy import delete
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from extensions.ext_database import db
from libs.archive_storage import ArchiveStorageNotConfiguredError, get_archive_storage
from models import (
ApiToken,
AppAnnotationHitHistory,
@ -40,6 +42,7 @@ from models.workflow import (
ConversationVariable,
Workflow,
WorkflowAppLog,
WorkflowArchiveLog,
)
from repositories.factory import DifyAPIRepositoryFactory
@ -64,6 +67,9 @@ def remove_app_and_related_data_task(self, tenant_id: str, app_id: str):
_delete_app_workflow_runs(tenant_id, app_id)
_delete_app_workflow_node_executions(tenant_id, app_id)
_delete_app_workflow_app_logs(tenant_id, app_id)
if dify_config.BILLING_ENABLED and dify_config.ARCHIVE_STORAGE_ENABLED:
_delete_app_workflow_archive_logs(tenant_id, app_id)
_delete_archived_workflow_run_files(tenant_id, app_id)
_delete_app_conversations(tenant_id, app_id)
_delete_app_messages(tenant_id, app_id)
_delete_workflow_tool_providers(tenant_id, app_id)
@ -254,6 +260,45 @@ def _delete_app_workflow_app_logs(tenant_id: str, app_id: str):
)
def _delete_app_workflow_archive_logs(tenant_id: str, app_id: str):
def del_workflow_archive_log(workflow_archive_log_id: str):
db.session.query(WorkflowArchiveLog).where(WorkflowArchiveLog.id == workflow_archive_log_id).delete(
synchronize_session=False
)
_delete_records(
"""select id from workflow_archive_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""",
{"tenant_id": tenant_id, "app_id": app_id},
del_workflow_archive_log,
"workflow archive log",
)
def _delete_archived_workflow_run_files(tenant_id: str, app_id: str):
prefix = f"{tenant_id}/app_id={app_id}/"
try:
archive_storage = get_archive_storage()
except ArchiveStorageNotConfiguredError as e:
logger.info("Archive storage not configured, skipping archive file cleanup: %s", e)
return
try:
keys = archive_storage.list_objects(prefix)
except Exception:
logger.exception("Failed to list archive files for app %s", app_id)
return
deleted = 0
for key in keys:
try:
archive_storage.delete_object(key)
deleted += 1
except Exception:
logger.exception("Failed to delete archive object %s", key)
logger.info("Deleted %s archive objects for app %s", deleted, app_id)
def _delete_app_conversations(tenant_id: str, app_id: str):
def del_conversation(conversation_id: str):
db.session.query(PinnedConversation).where(PinnedConversation.conversation_id == conversation_id).delete(