From 375a910bcf3db56f4e246f5ade196ef44caac5a8 Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Thu, 12 Mar 2026 12:39:01 +0800 Subject: [PATCH] Fix: add deadlock retry (#13552) ### What problem does this PR solve? Add deadlock retry. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- api/db/services/common_service.py | 39 ++++++++++++++++++++++++++++- api/db/services/document_service.py | 15 +++++++++-- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/api/db/services/common_service.py b/api/db/services/common_service.py index df95debb5..8ef4bb94b 100644 --- a/api/db/services/common_service.py +++ b/api/db/services/common_service.py @@ -13,7 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import logging +import time from datetime import datetime +from functools import wraps + from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import peewee from peewee import InterfaceError, OperationalError @@ -22,6 +26,38 @@ from api.db.db_models import DB from common.misc_utils import get_uuid from common.time_utils import current_timestamp, datetime_format + +def _is_deadlock_error(exc: OperationalError) -> bool: + return isinstance(exc, OperationalError) and bool(getattr(exc, "args", ())) and exc.args[0] == 1213 + + +def retry_deadlock_operation(max_retries=3, retry_delay=0.1): + """Retry a full DB operation when MySQL/OceanBase aborts it due to deadlock.""" + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except OperationalError as e: + if not _is_deadlock_error(e) or attempt >= max_retries - 1: + raise + current_delay = retry_delay * (2**attempt) + logging.warning( + "%s failed due to DB deadlock, retrying (%s/%s): %s", + func.__qualname__, + attempt + 1, + max_retries, + e, + ) + time.sleep(current_delay) + + return wrapper + + return decorator + + def retry_db_operation(func): @retry( stop=stop_after_attempt(3), @@ -34,6 +70,7 @@ def retry_db_operation(func): return func(*args, **kwargs) return wrapper + class CommonService: """Base service class that provides common database operations. @@ -279,7 +316,7 @@ class CommonService: # Returns: # Number of records deleted return cls.model.delete().where(cls.model.id == pid).execute() - + @classmethod @DB.connection_context() def delete_by_ids(cls, pids): diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 83893df89..8671aa989 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -30,7 +30,7 @@ from api.constants import IMG_BASE64_PREFIX, FILE_NAME_LEN_LIMIT from api.db import PIPELINE_SPECIAL_PROGRESS_FREEZE_TASK_TYPES, FileType, UserTenantRole, CanvasCategory from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File, UserCanvas, User from api.db.db_utils import bulk_insert_into_db -from api.db.services.common_service import CommonService +from api.db.services.common_service import CommonService, retry_deadlock_operation from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.doc_metadata_service import DocMetadataService from common.misc_utils import get_uuid @@ -560,6 +560,7 @@ class DocumentService(CommonService): return num @classmethod + @retry_deadlock_operation() @DB.connection_context() def delete_document_and_update_kb_counts(cls, doc_id) -> bool: """Atomically delete the document row and update KB counters. @@ -568,7 +569,17 @@ class DocumentService(CommonService): already deleted by a concurrent request (idempotent). """ with DB.atomic(): - doc = cls.model.get_or_none(cls.model.id == doc_id) + doc = ( + cls.model.select( + cls.model.id, + cls.model.kb_id, + cls.model.token_num, + cls.model.chunk_num, + ) + .where(cls.model.id == doc_id) + .for_update() + .get_or_none() + ) if doc is None: return False deleted = cls.model.delete().where(cls.model.id == doc_id).execute()