Feat: add delete all support for delete operations (#13530)

### What problem does this PR solve?

Add delete all support for delete operations.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Documentation Update

---------

Co-authored-by: writinwaters <cai.keith@gmail.com>
This commit is contained in:
Yongteng Lei
2026-03-12 09:47:42 +08:00
committed by GitHub
parent d201a81db7
commit e1b632a7bb
19 changed files with 1042 additions and 975 deletions

View File

@ -241,14 +241,16 @@ class InfinityConnectionBase(DocStoreConnection):
Return the health status of the database.
"""
inf_conn = self.connPool.get_conn()
res = inf_conn.show_current_node()
self.connPool.release_conn(inf_conn)
res2 = {
"type": "infinity",
"status": "green" if res.error_code == 0 and res.server_status in ["started", "alive"] else "red",
"error": res.error_msg,
}
return res2
try:
res = inf_conn.show_current_node()
res2 = {
"type": "infinity",
"status": "green" if res.error_code == 0 and res.server_status in ["started", "alive"] else "red",
"error": res.error_msg,
}
return res2
finally:
self.connPool.release_conn(inf_conn)
"""
Table operations
@ -259,83 +261,85 @@ class InfinityConnectionBase(DocStoreConnection):
self.logger.debug(f"CREATE_IDX: Creating table {table_name}, parser_id: {parser_id}")
inf_conn = self.connPool.get_conn()
inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
try:
inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
# Use configured schema
fp_mapping = os.path.join(get_project_base_directory(), "conf", self.mapping_file_name)
if not os.path.exists(fp_mapping):
raise Exception(f"Mapping file not found at {fp_mapping}")
schema = json.load(open(fp_mapping))
# Use configured schema
fp_mapping = os.path.join(get_project_base_directory(), "conf", self.mapping_file_name)
if not os.path.exists(fp_mapping):
raise Exception(f"Mapping file not found at {fp_mapping}")
schema = json.load(open(fp_mapping))
if parser_id is not None:
from common.constants import ParserType
if parser_id is not None:
from common.constants import ParserType
if parser_id == ParserType.TABLE.value:
# Table parser: add chunk_data JSON column to store table-specific fields
schema["chunk_data"] = {"type": "json", "default": "{}"}
self.logger.info("Added chunk_data column for TABLE parser")
if parser_id == ParserType.TABLE.value:
# Table parser: add chunk_data JSON column to store table-specific fields
schema["chunk_data"] = {"type": "json", "default": "{}"}
self.logger.info("Added chunk_data column for TABLE parser")
vector_name = f"q_{vector_size}_vec"
schema[vector_name] = {"type": f"vector,{vector_size},float"}
inf_table = inf_db.create_table(
table_name,
schema,
ConflictType.Ignore,
)
inf_table.create_index(
"q_vec_idx",
IndexInfo(
vector_name,
IndexType.Hnsw,
{
"M": "16",
"ef_construction": "50",
"metric": "cosine",
"encode": "lvq",
},
),
ConflictType.Ignore,
)
for field_name, field_info in schema.items():
if field_info["type"] != "varchar" or "analyzer" not in field_info:
continue
analyzers = field_info["analyzer"]
if isinstance(analyzers, str):
analyzers = [analyzers]
for analyzer in analyzers:
inf_table.create_index(
f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}",
IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
ConflictType.Ignore,
)
# Create secondary indexes for fields with index_type
for field_name, field_info in schema.items():
if "index_type" not in field_info:
continue
index_config = field_info["index_type"]
if isinstance(index_config, str) and index_config == "secondary":
inf_table.create_index(
f"sec_{field_name}",
IndexInfo(field_name, IndexType.Secondary),
ConflictType.Ignore,
)
self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
elif isinstance(index_config, dict):
if index_config.get("type") == "secondary":
params = {}
if "cardinality" in index_config:
params = {"cardinality": index_config["cardinality"]}
vector_name = f"q_{vector_size}_vec"
schema[vector_name] = {"type": f"vector,{vector_size},float"}
inf_table = inf_db.create_table(
table_name,
schema,
ConflictType.Ignore,
)
inf_table.create_index(
"q_vec_idx",
IndexInfo(
vector_name,
IndexType.Hnsw,
{
"M": "16",
"ef_construction": "50",
"metric": "cosine",
"encode": "lvq",
},
),
ConflictType.Ignore,
)
for field_name, field_info in schema.items():
if field_info["type"] != "varchar" or "analyzer" not in field_info:
continue
analyzers = field_info["analyzer"]
if isinstance(analyzers, str):
analyzers = [analyzers]
for analyzer in analyzers:
inf_table.create_index(
f"sec_{field_name}",
IndexInfo(field_name, IndexType.Secondary, params),
f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}",
IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
ConflictType.Ignore,
)
self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
self.connPool.release_conn(inf_conn)
self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}")
return True
# Create secondary indexes for fields with index_type
for field_name, field_info in schema.items():
if "index_type" not in field_info:
continue
index_config = field_info["index_type"]
if isinstance(index_config, str) and index_config == "secondary":
inf_table.create_index(
f"sec_{field_name}",
IndexInfo(field_name, IndexType.Secondary),
ConflictType.Ignore,
)
self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
elif isinstance(index_config, dict):
if index_config.get("type") == "secondary":
params = {}
if "cardinality" in index_config:
params = {"cardinality": index_config["cardinality"]}
inf_table.create_index(
f"sec_{field_name}",
IndexInfo(field_name, IndexType.Secondary, params),
ConflictType.Ignore,
)
self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}")
return True
finally:
self.connPool.release_conn(inf_conn)
def create_doc_meta_idx(self, index_name: str):
"""
@ -398,25 +402,28 @@ class InfinityConnectionBase(DocStoreConnection):
else:
table_name = f"{index_name}_{dataset_id}"
inf_conn = self.connPool.get_conn()
db_instance = inf_conn.get_database(self.dbName)
db_instance.drop_table(table_name, ConflictType.Ignore)
self.connPool.release_conn(inf_conn)
self.logger.info(f"INFINITY dropped table {table_name}")
try:
db_instance = inf_conn.get_database(self.dbName)
db_instance.drop_table(table_name, ConflictType.Ignore)
self.logger.info(f"INFINITY dropped table {table_name}")
finally:
self.connPool.release_conn(inf_conn)
def index_exist(self, index_name: str, dataset_id: str) -> bool:
if index_name.startswith("ragflow_doc_meta_"):
table_name = index_name
else:
table_name = f"{index_name}_{dataset_id}"
inf_conn = self.connPool.get_conn()
try:
inf_conn = self.connPool.get_conn()
db_instance = inf_conn.get_database(self.dbName)
_ = db_instance.get_table(table_name)
self.connPool.release_conn(inf_conn)
return True
except Exception as e:
self.logger.warning(f"INFINITY indexExist {str(e)}")
return False
return False
finally:
self.connPool.release_conn(inf_conn)
"""
CRUD operations
@ -453,21 +460,23 @@ class InfinityConnectionBase(DocStoreConnection):
def delete(self, condition: dict, index_name: str, dataset_id: str) -> int:
inf_conn = self.connPool.get_conn()
db_instance = inf_conn.get_database(self.dbName)
if index_name.startswith("ragflow_doc_meta_"):
table_name = index_name
else:
table_name = f"{index_name}_{dataset_id}"
try:
table_instance = db_instance.get_table(table_name)
except Exception:
self.logger.warning(f"Skipped deleting from table {table_name} since the table doesn't exist.")
return 0
filter = self.equivalent_condition_to_str(condition, table_instance)
self.logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
res = table_instance.delete(filter)
self.connPool.release_conn(inf_conn)
return res.deleted_rows
db_instance = inf_conn.get_database(self.dbName)
if index_name.startswith("ragflow_doc_meta_"):
table_name = index_name
else:
table_name = f"{index_name}_{dataset_id}"
try:
table_instance = db_instance.get_table(table_name)
except Exception:
self.logger.warning(f"Skipped deleting from table {table_name} since the table doesn't exist.")
return 0
filter = self.equivalent_condition_to_str(condition, table_instance)
self.logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
res = table_instance.delete(filter)
return res.deleted_rows
finally:
self.connPool.release_conn(inf_conn)
"""
Helper functions for search result