mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-05-21 08:37:05 +08:00
Fix: retry RocksDB metadata contention on concurrent CREATE/DROP (#14563)
Concurrent CREATE TABLE / CREATE INDEX / DROP TABLE on the same Infinity instance can race on the catalog counter (e.g. db|1|next_table_id) and fail with error 9003 "Resource busy" instead of waiting on a lock. Two users creating a knowledge base at the same instant, or any deployment with multiple backend workers behind one Infinity, can hit it. Wrap the metadata paths in create_idx, create_doc_meta_idx, and delete_idx with exponential backoff + jitter (5 attempts, 50ms base). The wrapped operations already use ConflictType.Ignore, so retrying is idempotent — worst case the second attempt is a no-op against an already-created table. Tunable via INFINITY_META_RETRY_MAX / INFINITY_META_RETRY_BASE_DELAY_MS. Repro: stress 30 concurrent POST /api/v1/datasets against a 4-worker backend → ~50% of requests fail without the patch (Resource busy from the second worker that hits the counter), 100% succeed with it. At 100 concurrent requests, all 100 succeed in ~1.2s; the retry budget never exhausted in our tests. Scope is limited to metadata paths only — data-path operations (INSERT chunks, SELECT for retrieval) go through per-table code paths and don't share the contended counter. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Co-authored-by: yoan sapienza <Yoan Sapienza yoan.sapienza@orange.fr Yoan Sapienza zappy@macbookpro.home>
This commit is contained in:
@ -16,10 +16,12 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import json
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from typing import Callable, TypeVar
|
||||
|
||||
import infinity
|
||||
from infinity.common import ConflictType
|
||||
@ -32,6 +34,117 @@ from common import settings
|
||||
from common.doc_store.doc_store_base import DocStoreConnection, MatchExpr, OrderByExpr
|
||||
|
||||
|
||||
# Concurrent CREATE/DROP TABLE on the same Infinity instance can race on
|
||||
# Infinity's RocksDB-backed catalog counters (e.g. ``db|1|next_table_id``).
|
||||
# When two writers touch the counter at the same instant, Infinity surfaces
|
||||
# error 9003 / "Resource busy" instead of waiting on a lock — turning a
|
||||
# user-visible operation into an avoidable failure under modest concurrency
|
||||
# (two users creating a knowledge base at the same time, batch onboarding,
|
||||
# multi-replica deployments, …).
|
||||
#
|
||||
# We retry the metadata path (CREATE TABLE / CREATE INDEX / DROP TABLE) on
|
||||
# this specific error with exponential backoff + jitter. The wrapped calls
|
||||
# already use ``ConflictType.Ignore``, so re-running them on retry is
|
||||
# idempotent. The retry budget is intentionally bounded (5 attempts,
|
||||
# ~1.5s worst case) so a genuine outage still surfaces quickly.
|
||||
#
|
||||
# Tunable from the environment:
|
||||
# INFINITY_META_RETRY_MAX default 5
|
||||
# INFINITY_META_RETRY_BASE_DELAY_MS default 50
|
||||
|
||||
_T = TypeVar("_T")
|
||||
|
||||
# Infinity error code 9003 is raised on RocksDB transaction contention. It is
|
||||
# not in the SDK's ErrorCode enum yet, so we keep the literal here.
|
||||
_INFINITY_RESOURCE_BUSY_CODE = 9003
|
||||
|
||||
|
||||
def _int_env(name: str, default: int) -> int:
|
||||
"""Read an int from the environment without crashing on bad input.
|
||||
|
||||
A misconfigured ``INFINITY_META_RETRY_MAX=`` (empty value) or non-numeric
|
||||
string would otherwise raise ``ValueError`` at module import time and
|
||||
take down every backend worker. We log and fall back to the default
|
||||
instead.
|
||||
"""
|
||||
raw = os.getenv(name)
|
||||
if raw is None or raw == "":
|
||||
return default
|
||||
try:
|
||||
return int(raw)
|
||||
except ValueError:
|
||||
logging.getLogger(__name__).warning(
|
||||
"Ignoring invalid %s=%r, falling back to %d", name, raw, default,
|
||||
)
|
||||
return default
|
||||
|
||||
|
||||
_META_RETRY_MAX = _int_env("INFINITY_META_RETRY_MAX", 5)
|
||||
_META_RETRY_BASE_DELAY_MS = _int_env("INFINITY_META_RETRY_BASE_DELAY_MS", 50)
|
||||
|
||||
|
||||
def _is_meta_contention_error(exc: BaseException) -> bool:
|
||||
"""Return True iff ``exc`` is the RocksDB metadata-counter "Resource busy".
|
||||
|
||||
Prefer the numeric error code when the SDK exposes one — substring matching
|
||||
on ``str(exc)`` is the fallback for older SDKs that surface only a tuple
|
||||
or a plain string. Both surfaces are observed in the wild today.
|
||||
"""
|
||||
code = getattr(exc, "error_code", None)
|
||||
if code is None:
|
||||
# Some Infinity SDK paths raise a plain ``Exception((9003, "..."))``
|
||||
# whose ``args[0]`` carries the code.
|
||||
args = getattr(exc, "args", None)
|
||||
if args and isinstance(args, tuple) and args:
|
||||
code = args[0]
|
||||
if code == _INFINITY_RESOURCE_BUSY_CODE:
|
||||
return True
|
||||
msg = str(exc)
|
||||
return "Resource busy" in msg and "rocksdb" in msg.lower()
|
||||
|
||||
|
||||
def _retry_on_meta_contention(
|
||||
op_name: str,
|
||||
operation: Callable[[], _T],
|
||||
*,
|
||||
logger: logging.Logger | None = None,
|
||||
max_attempts: int = _META_RETRY_MAX,
|
||||
base_delay_ms: int = _META_RETRY_BASE_DELAY_MS,
|
||||
) -> _T:
|
||||
"""Run ``operation`` and retry on RocksDB "Resource busy" errors.
|
||||
|
||||
Exponential backoff with ±50% jitter to avoid a thundering herd when many
|
||||
workers retry simultaneously. Any exception that does not match
|
||||
:func:`_is_meta_contention_error` is re-raised immediately so genuine
|
||||
failures still surface fast.
|
||||
"""
|
||||
log = logger or logging.getLogger(__name__)
|
||||
last_exc: BaseException | None = None
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
return operation()
|
||||
except Exception as exc:
|
||||
if not _is_meta_contention_error(exc):
|
||||
raise
|
||||
last_exc = exc
|
||||
if attempt == max_attempts - 1:
|
||||
break
|
||||
base = (base_delay_ms / 1000.0) * (2 ** attempt)
|
||||
sleep_for = base + random.uniform(0, base * 0.5)
|
||||
log.info(
|
||||
"INFINITY meta contention on %s (attempt %d/%d), "
|
||||
"retrying in %.3fs: %s",
|
||||
op_name, attempt + 1, max_attempts, sleep_for, exc,
|
||||
)
|
||||
time.sleep(sleep_for)
|
||||
log.warning(
|
||||
"INFINITY meta contention on %s exhausted %d attempts: %s",
|
||||
op_name, max_attempts, last_exc,
|
||||
)
|
||||
assert last_exc is not None
|
||||
raise last_exc
|
||||
|
||||
|
||||
class InfinityConnectionBase(DocStoreConnection):
|
||||
def __init__(self, mapping_file_name: str = "infinity_mapping.json", logger_name: str = "ragflow.infinity_conn", table_name_prefix: str="ragflow_"):
|
||||
from common.doc_store.infinity_conn_pool import INFINITY_CONN
|
||||
@ -274,7 +387,11 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
|
||||
inf_conn = self.connPool.get_conn()
|
||||
try:
|
||||
inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
|
||||
inf_db = _retry_on_meta_contention(
|
||||
f"create_database({self.dbName})",
|
||||
lambda: inf_conn.create_database(self.dbName, ConflictType.Ignore),
|
||||
logger=self.logger,
|
||||
)
|
||||
|
||||
# Use configured schema
|
||||
fp_mapping = os.path.join(get_project_base_directory(), "conf", self.mapping_file_name)
|
||||
@ -293,24 +410,32 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
|
||||
vector_name = f"q_{vector_size}_vec"
|
||||
schema[vector_name] = {"type": f"vector,{vector_size},float"}
|
||||
inf_table = inf_db.create_table(
|
||||
table_name,
|
||||
schema,
|
||||
ConflictType.Ignore,
|
||||
)
|
||||
inf_table.create_index(
|
||||
"q_vec_idx",
|
||||
IndexInfo(
|
||||
vector_name,
|
||||
IndexType.Hnsw,
|
||||
{
|
||||
"M": "16",
|
||||
"ef_construction": "50",
|
||||
"metric": "cosine",
|
||||
"encode": "lvq",
|
||||
},
|
||||
inf_table = _retry_on_meta_contention(
|
||||
f"create_table({table_name})",
|
||||
lambda: inf_db.create_table(
|
||||
table_name,
|
||||
schema,
|
||||
ConflictType.Ignore,
|
||||
),
|
||||
ConflictType.Ignore,
|
||||
logger=self.logger,
|
||||
)
|
||||
_retry_on_meta_contention(
|
||||
f"create_index(q_vec_idx, {table_name})",
|
||||
lambda: inf_table.create_index(
|
||||
"q_vec_idx",
|
||||
IndexInfo(
|
||||
vector_name,
|
||||
IndexType.Hnsw,
|
||||
{
|
||||
"M": "16",
|
||||
"ef_construction": "50",
|
||||
"metric": "cosine",
|
||||
"encode": "lvq",
|
||||
},
|
||||
),
|
||||
ConflictType.Ignore,
|
||||
),
|
||||
logger=self.logger,
|
||||
)
|
||||
for field_name, field_info in schema.items():
|
||||
if field_info["type"] != "varchar" or "analyzer" not in field_info:
|
||||
@ -319,10 +444,15 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
if isinstance(analyzers, str):
|
||||
analyzers = [analyzers]
|
||||
for analyzer in analyzers:
|
||||
inf_table.create_index(
|
||||
f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}",
|
||||
IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
|
||||
ConflictType.Ignore,
|
||||
idx_name = f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}"
|
||||
_retry_on_meta_contention(
|
||||
f"create_index({idx_name}, {table_name})",
|
||||
lambda fn=field_name, an=analyzer, name=idx_name: inf_table.create_index(
|
||||
name,
|
||||
IndexInfo(fn, IndexType.FullText, {"ANALYZER": an}),
|
||||
ConflictType.Ignore,
|
||||
),
|
||||
logger=self.logger,
|
||||
)
|
||||
|
||||
# Create secondary indexes for fields with index_type
|
||||
@ -331,10 +461,14 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
continue
|
||||
index_config = field_info["index_type"]
|
||||
if isinstance(index_config, str) and index_config == "secondary":
|
||||
inf_table.create_index(
|
||||
f"sec_{field_name}",
|
||||
IndexInfo(field_name, IndexType.Secondary),
|
||||
ConflictType.Ignore,
|
||||
_retry_on_meta_contention(
|
||||
f"create_index(sec_{field_name}, {table_name})",
|
||||
lambda fn=field_name: inf_table.create_index(
|
||||
f"sec_{fn}",
|
||||
IndexInfo(fn, IndexType.Secondary),
|
||||
ConflictType.Ignore,
|
||||
),
|
||||
logger=self.logger,
|
||||
)
|
||||
self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
|
||||
elif isinstance(index_config, dict):
|
||||
@ -342,10 +476,14 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
params = {}
|
||||
if "cardinality" in index_config:
|
||||
params = {"cardinality": index_config["cardinality"]}
|
||||
inf_table.create_index(
|
||||
f"sec_{field_name}",
|
||||
IndexInfo(field_name, IndexType.Secondary, params),
|
||||
ConflictType.Ignore,
|
||||
_retry_on_meta_contention(
|
||||
f"create_index(sec_{field_name}, {table_name})",
|
||||
lambda fn=field_name, p=params: inf_table.create_index(
|
||||
f"sec_{fn}",
|
||||
IndexInfo(fn, IndexType.Secondary, p),
|
||||
ConflictType.Ignore,
|
||||
),
|
||||
logger=self.logger,
|
||||
)
|
||||
self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
|
||||
|
||||
@ -363,18 +501,26 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
"""
|
||||
table_name = index_name
|
||||
inf_conn = self.connPool.get_conn()
|
||||
inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
|
||||
try:
|
||||
inf_db = _retry_on_meta_contention(
|
||||
f"create_database({self.dbName})",
|
||||
lambda: inf_conn.create_database(self.dbName, ConflictType.Ignore),
|
||||
logger=self.logger,
|
||||
)
|
||||
fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_infinity_mapping.json")
|
||||
if not os.path.exists(fp_mapping):
|
||||
self.logger.error(f"Document metadata mapping file not found at {fp_mapping}")
|
||||
return False
|
||||
with open(fp_mapping) as f:
|
||||
schema = json.load(f)
|
||||
inf_db.create_table(
|
||||
table_name,
|
||||
schema,
|
||||
ConflictType.Ignore,
|
||||
_retry_on_meta_contention(
|
||||
f"create_table({table_name})",
|
||||
lambda: inf_db.create_table(
|
||||
table_name,
|
||||
schema,
|
||||
ConflictType.Ignore,
|
||||
),
|
||||
logger=self.logger,
|
||||
)
|
||||
|
||||
# Create secondary indexes on id and kb_id for better query performance
|
||||
@ -400,14 +546,14 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Failed to create index on kb_id for {table_name}: {e}")
|
||||
|
||||
self.connPool.release_conn(inf_conn)
|
||||
self.logger.debug(f"INFINITY created document metadata table {table_name} with secondary indexes")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
self.connPool.release_conn(inf_conn)
|
||||
self.logger.exception(f"Error creating document metadata table {table_name}: {e}")
|
||||
return False
|
||||
finally:
|
||||
self.connPool.release_conn(inf_conn)
|
||||
|
||||
def delete_idx(self, index_name: str, dataset_id: str):
|
||||
if index_name.startswith("ragflow_doc_meta_"):
|
||||
@ -417,7 +563,11 @@ class InfinityConnectionBase(DocStoreConnection):
|
||||
inf_conn = self.connPool.get_conn()
|
||||
try:
|
||||
db_instance = inf_conn.get_database(self.dbName)
|
||||
db_instance.drop_table(table_name, ConflictType.Ignore)
|
||||
_retry_on_meta_contention(
|
||||
f"drop_table({table_name})",
|
||||
lambda: db_instance.drop_table(table_name, ConflictType.Ignore),
|
||||
logger=self.logger,
|
||||
)
|
||||
self.logger.info(f"INFINITY dropped table {table_name}")
|
||||
finally:
|
||||
self.connPool.release_conn(inf_conn)
|
||||
|
||||
Reference in New Issue
Block a user