From c0fc8b32f2edadca8dc490dcbc37e2c71dbc33d1 Mon Sep 17 00:00:00 2001 From: sapienza yoan <102799524+Zzappy24@users.noreply.github.com> Date: Wed, 6 May 2026 07:32:20 +0200 Subject: [PATCH] Fix: retry RocksDB metadata contention on concurrent CREATE/DROP (#14563) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concurrent CREATE TABLE / CREATE INDEX / DROP TABLE on the same Infinity instance can race on the catalog counter (e.g. db|1|next_table_id) and fail with error 9003 "Resource busy" instead of waiting on a lock. Two users creating a knowledge base at the same instant, or any deployment with multiple backend workers behind one Infinity, can hit it. Wrap the metadata paths in create_idx, create_doc_meta_idx, and delete_idx with exponential backoff + jitter (5 attempts, 50ms base). The wrapped operations already use ConflictType.Ignore, so retrying is idempotent — worst case the second attempt is a no-op against an already-created table. Tunable via INFINITY_META_RETRY_MAX / INFINITY_META_RETRY_BASE_DELAY_MS. Repro: stress 30 concurrent POST /api/v1/datasets against a 4-worker backend → ~50% of requests fail without the patch (Resource busy from the second worker that hits the counter), 100% succeed with it. At 100 concurrent requests, all 100 succeed in ~1.2s; the retry budget never exhausted in our tests. Scope is limited to metadata paths only — data-path operations (INSERT chunks, SELECT for retrieval) go through per-table code paths and don't share the contended counter. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Co-authored-by: yoan sapienza --- common/doc_store/infinity_conn_base.py | 226 ++++++++++++++++++++----- 1 file changed, 188 insertions(+), 38 deletions(-) diff --git a/common/doc_store/infinity_conn_base.py b/common/doc_store/infinity_conn_base.py index 72d3b9367..af8493b82 100644 --- a/common/doc_store/infinity_conn_base.py +++ b/common/doc_store/infinity_conn_base.py @@ -16,10 +16,12 @@ import logging import os +import random import re import json import time from abc import abstractmethod +from typing import Callable, TypeVar import infinity from infinity.common import ConflictType @@ -32,6 +34,117 @@ from common import settings from common.doc_store.doc_store_base import DocStoreConnection, MatchExpr, OrderByExpr +# Concurrent CREATE/DROP TABLE on the same Infinity instance can race on +# Infinity's RocksDB-backed catalog counters (e.g. ``db|1|next_table_id``). +# When two writers touch the counter at the same instant, Infinity surfaces +# error 9003 / "Resource busy" instead of waiting on a lock — turning a +# user-visible operation into an avoidable failure under modest concurrency +# (two users creating a knowledge base at the same time, batch onboarding, +# multi-replica deployments, …). +# +# We retry the metadata path (CREATE TABLE / CREATE INDEX / DROP TABLE) on +# this specific error with exponential backoff + jitter. The wrapped calls +# already use ``ConflictType.Ignore``, so re-running them on retry is +# idempotent. The retry budget is intentionally bounded (5 attempts, +# ~1.5s worst case) so a genuine outage still surfaces quickly. +# +# Tunable from the environment: +# INFINITY_META_RETRY_MAX default 5 +# INFINITY_META_RETRY_BASE_DELAY_MS default 50 + +_T = TypeVar("_T") + +# Infinity error code 9003 is raised on RocksDB transaction contention. It is +# not in the SDK's ErrorCode enum yet, so we keep the literal here. +_INFINITY_RESOURCE_BUSY_CODE = 9003 + + +def _int_env(name: str, default: int) -> int: + """Read an int from the environment without crashing on bad input. + + A misconfigured ``INFINITY_META_RETRY_MAX=`` (empty value) or non-numeric + string would otherwise raise ``ValueError`` at module import time and + take down every backend worker. We log and fall back to the default + instead. + """ + raw = os.getenv(name) + if raw is None or raw == "": + return default + try: + return int(raw) + except ValueError: + logging.getLogger(__name__).warning( + "Ignoring invalid %s=%r, falling back to %d", name, raw, default, + ) + return default + + +_META_RETRY_MAX = _int_env("INFINITY_META_RETRY_MAX", 5) +_META_RETRY_BASE_DELAY_MS = _int_env("INFINITY_META_RETRY_BASE_DELAY_MS", 50) + + +def _is_meta_contention_error(exc: BaseException) -> bool: + """Return True iff ``exc`` is the RocksDB metadata-counter "Resource busy". + + Prefer the numeric error code when the SDK exposes one — substring matching + on ``str(exc)`` is the fallback for older SDKs that surface only a tuple + or a plain string. Both surfaces are observed in the wild today. + """ + code = getattr(exc, "error_code", None) + if code is None: + # Some Infinity SDK paths raise a plain ``Exception((9003, "..."))`` + # whose ``args[0]`` carries the code. + args = getattr(exc, "args", None) + if args and isinstance(args, tuple) and args: + code = args[0] + if code == _INFINITY_RESOURCE_BUSY_CODE: + return True + msg = str(exc) + return "Resource busy" in msg and "rocksdb" in msg.lower() + + +def _retry_on_meta_contention( + op_name: str, + operation: Callable[[], _T], + *, + logger: logging.Logger | None = None, + max_attempts: int = _META_RETRY_MAX, + base_delay_ms: int = _META_RETRY_BASE_DELAY_MS, +) -> _T: + """Run ``operation`` and retry on RocksDB "Resource busy" errors. + + Exponential backoff with ±50% jitter to avoid a thundering herd when many + workers retry simultaneously. Any exception that does not match + :func:`_is_meta_contention_error` is re-raised immediately so genuine + failures still surface fast. + """ + log = logger or logging.getLogger(__name__) + last_exc: BaseException | None = None + for attempt in range(max_attempts): + try: + return operation() + except Exception as exc: + if not _is_meta_contention_error(exc): + raise + last_exc = exc + if attempt == max_attempts - 1: + break + base = (base_delay_ms / 1000.0) * (2 ** attempt) + sleep_for = base + random.uniform(0, base * 0.5) + log.info( + "INFINITY meta contention on %s (attempt %d/%d), " + "retrying in %.3fs: %s", + op_name, attempt + 1, max_attempts, sleep_for, exc, + ) + time.sleep(sleep_for) + log.warning( + "INFINITY meta contention on %s exhausted %d attempts: %s", + op_name, max_attempts, last_exc, + ) + assert last_exc is not None + raise last_exc + + class InfinityConnectionBase(DocStoreConnection): def __init__(self, mapping_file_name: str = "infinity_mapping.json", logger_name: str = "ragflow.infinity_conn", table_name_prefix: str="ragflow_"): from common.doc_store.infinity_conn_pool import INFINITY_CONN @@ -274,7 +387,11 @@ class InfinityConnectionBase(DocStoreConnection): inf_conn = self.connPool.get_conn() try: - inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore) + inf_db = _retry_on_meta_contention( + f"create_database({self.dbName})", + lambda: inf_conn.create_database(self.dbName, ConflictType.Ignore), + logger=self.logger, + ) # Use configured schema fp_mapping = os.path.join(get_project_base_directory(), "conf", self.mapping_file_name) @@ -293,24 +410,32 @@ class InfinityConnectionBase(DocStoreConnection): vector_name = f"q_{vector_size}_vec" schema[vector_name] = {"type": f"vector,{vector_size},float"} - inf_table = inf_db.create_table( - table_name, - schema, - ConflictType.Ignore, - ) - inf_table.create_index( - "q_vec_idx", - IndexInfo( - vector_name, - IndexType.Hnsw, - { - "M": "16", - "ef_construction": "50", - "metric": "cosine", - "encode": "lvq", - }, + inf_table = _retry_on_meta_contention( + f"create_table({table_name})", + lambda: inf_db.create_table( + table_name, + schema, + ConflictType.Ignore, ), - ConflictType.Ignore, + logger=self.logger, + ) + _retry_on_meta_contention( + f"create_index(q_vec_idx, {table_name})", + lambda: inf_table.create_index( + "q_vec_idx", + IndexInfo( + vector_name, + IndexType.Hnsw, + { + "M": "16", + "ef_construction": "50", + "metric": "cosine", + "encode": "lvq", + }, + ), + ConflictType.Ignore, + ), + logger=self.logger, ) for field_name, field_info in schema.items(): if field_info["type"] != "varchar" or "analyzer" not in field_info: @@ -319,10 +444,15 @@ class InfinityConnectionBase(DocStoreConnection): if isinstance(analyzers, str): analyzers = [analyzers] for analyzer in analyzers: - inf_table.create_index( - f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}", - IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}), - ConflictType.Ignore, + idx_name = f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}" + _retry_on_meta_contention( + f"create_index({idx_name}, {table_name})", + lambda fn=field_name, an=analyzer, name=idx_name: inf_table.create_index( + name, + IndexInfo(fn, IndexType.FullText, {"ANALYZER": an}), + ConflictType.Ignore, + ), + logger=self.logger, ) # Create secondary indexes for fields with index_type @@ -331,10 +461,14 @@ class InfinityConnectionBase(DocStoreConnection): continue index_config = field_info["index_type"] if isinstance(index_config, str) and index_config == "secondary": - inf_table.create_index( - f"sec_{field_name}", - IndexInfo(field_name, IndexType.Secondary), - ConflictType.Ignore, + _retry_on_meta_contention( + f"create_index(sec_{field_name}, {table_name})", + lambda fn=field_name: inf_table.create_index( + f"sec_{fn}", + IndexInfo(fn, IndexType.Secondary), + ConflictType.Ignore, + ), + logger=self.logger, ) self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}") elif isinstance(index_config, dict): @@ -342,10 +476,14 @@ class InfinityConnectionBase(DocStoreConnection): params = {} if "cardinality" in index_config: params = {"cardinality": index_config["cardinality"]} - inf_table.create_index( - f"sec_{field_name}", - IndexInfo(field_name, IndexType.Secondary, params), - ConflictType.Ignore, + _retry_on_meta_contention( + f"create_index(sec_{field_name}, {table_name})", + lambda fn=field_name, p=params: inf_table.create_index( + f"sec_{fn}", + IndexInfo(fn, IndexType.Secondary, p), + ConflictType.Ignore, + ), + logger=self.logger, ) self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}") @@ -363,18 +501,26 @@ class InfinityConnectionBase(DocStoreConnection): """ table_name = index_name inf_conn = self.connPool.get_conn() - inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore) try: + inf_db = _retry_on_meta_contention( + f"create_database({self.dbName})", + lambda: inf_conn.create_database(self.dbName, ConflictType.Ignore), + logger=self.logger, + ) fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_infinity_mapping.json") if not os.path.exists(fp_mapping): self.logger.error(f"Document metadata mapping file not found at {fp_mapping}") return False with open(fp_mapping) as f: schema = json.load(f) - inf_db.create_table( - table_name, - schema, - ConflictType.Ignore, + _retry_on_meta_contention( + f"create_table({table_name})", + lambda: inf_db.create_table( + table_name, + schema, + ConflictType.Ignore, + ), + logger=self.logger, ) # Create secondary indexes on id and kb_id for better query performance @@ -400,14 +546,14 @@ class InfinityConnectionBase(DocStoreConnection): except Exception as e: self.logger.warning(f"Failed to create index on kb_id for {table_name}: {e}") - self.connPool.release_conn(inf_conn) self.logger.debug(f"INFINITY created document metadata table {table_name} with secondary indexes") return True except Exception as e: - self.connPool.release_conn(inf_conn) self.logger.exception(f"Error creating document metadata table {table_name}: {e}") return False + finally: + self.connPool.release_conn(inf_conn) def delete_idx(self, index_name: str, dataset_id: str): if index_name.startswith("ragflow_doc_meta_"): @@ -417,7 +563,11 @@ class InfinityConnectionBase(DocStoreConnection): inf_conn = self.connPool.get_conn() try: db_instance = inf_conn.get_database(self.dbName) - db_instance.drop_table(table_name, ConflictType.Ignore) + _retry_on_meta_contention( + f"drop_table({table_name})", + lambda: db_instance.drop_table(table_name, ConflictType.Ignore), + logger=self.logger, + ) self.logger.info(f"INFINITY dropped table {table_name}") finally: self.connPool.release_conn(inf_conn)