diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py
index 69f25de48..20d304beb 100644
--- a/api/db/services/doc_metadata_service.py
+++ b/api/db/services/doc_metadata_service.py
@@ -102,13 +102,13 @@ class DocMetadataService:
     @classmethod
     def _iter_search_results(cls, results):
         """
-        Iterate over search results in various formats (DataFrame, ES, list).
+        Iterate over search results in various formats (DataFrame, ES, OceanBase, list).
 
         Yields:
             Tuple of (doc_id, doc_dict) for each document
 
         Args:
-            results: Search results from ES/Infinity in any format
+            results: Search results from ES/Infinity/OceanBase in any format
         """
         # Handle tuple return from Infinity: (DataFrame, int)
         # Check this FIRST because pandas DataFrames also have __getitem__
@@ -148,6 +148,14 @@ class DocMetadataService:
                 if doc_id:
                     yield doc_id, doc
 
+        # Check if OceanBase SearchResult format
+        elif hasattr(results, 'chunks') and hasattr(results, 'total'):
+            # OceanBase format: SearchResult(total=int, chunks=[{...}, {...}])
+            for doc in results.chunks:
+                doc_id = cls._extract_doc_id(doc)
+                if doc_id:
+                    yield doc_id, doc
+
     @classmethod
     def _search_metadata(cls, kb_id: str, condition: Dict = None, limit: int = 10000):
         """
@@ -367,7 +375,7 @@ class DocMetadataService:
         logging.debug(f"[update_document_metadata] Updating doc_id: {doc_id}, kb_id: {kb_id}, meta_fields: {processed_meta}")
 
         # For Elasticsearch, use efficient partial update
-        if not settings.DOC_ENGINE_INFINITY:
+        if not settings.DOC_ENGINE_INFINITY and not settings.DOC_ENGINE_OCEANBASE:
             try:
                 # Use ES partial update API - much more efficient than delete+insert
                 settings.docStoreConn.es.update(
diff --git a/common/doc_store/ob_conn_base.py b/common/doc_store/ob_conn_base.py
index 0b95770ca..c42868249 100644
--- a/common/doc_store/ob_conn_base.py
+++ b/common/doc_store/ob_conn_base.py
@@ -24,7 +24,8 @@ from typing import Any
 from pymysql.converters import escape_string
 from pyobvector import ObVecClient, FtsIndexParam, FtsParser, VECTOR
-from sqlalchemy import Column, Table
+from sqlalchemy import Column, JSON, Table
+from sqlalchemy.dialects.mysql import VARCHAR
 
 from common.doc_store.doc_store_base import DocStoreConnection, MatchExpr, OrderByExpr
 
@@ -37,6 +38,15 @@ fulltext_search_template = "MATCH (%s) AGAINST ('%s' IN NATURAL LANGUAGE MODE)"
 vector_search_template = "cosine_distance(%s, '%s')"
 vector_column_pattern = re.compile(r"q_(?P<vector_size>\d+)_vec")
 
+# Document metadata table columns
+doc_meta_columns = [
+    Column("id", VARCHAR(256), primary_key=True, comment="document id"),
+    Column("kb_id", VARCHAR(256), nullable=False, comment="knowledge base id"),
+    Column("meta_fields", JSON, nullable=True, comment="document metadata fields"),
+]
+doc_meta_column_names = [col.name for col in doc_meta_columns]
+doc_meta_column_types = {col.name: col.type for col in doc_meta_columns}
+
 
 def get_value_str(value: Any) -> str:
     """Convert value to SQL string representation."""
@@ -266,19 +276,9 @@ class OBConnectionBase(DocStoreConnection):
         Table name pattern: ragflow_doc_meta_{tenant_id}
         - Per-tenant metadata table for storing document metadata fields
         """
-        from sqlalchemy import JSON
-        from sqlalchemy.dialects.mysql import VARCHAR
-
         table_name = index_name
         lock_prefix = self.get_lock_prefix()
 
-        # Define columns for document metadata table
-        doc_meta_columns = [
-            Column("id", VARCHAR(256), primary_key=True, comment="document id"),
-            Column("kb_id", VARCHAR(256), nullable=False, comment="knowledge base id"),
-            Column("meta_fields", JSON, nullable=True, comment="document metadata fields"),
-        ]
-
         try:
             # Create table with distributed lock
             _try_with_lock(
@@ -319,11 +319,17 @@ class OBConnectionBase(DocStoreConnection):
 
     def index_exist(self, index_name: str, dataset_id: str = None) -> bool:
         """Check if index/table exists."""
-        # For doc_meta tables, use index_name directly as table name
+        # For doc_meta tables, use index_name directly and only check table existence
+        # (metadata tables don't have fulltext/vector indexes that chunk tables have)
         if index_name.startswith("ragflow_doc_meta_"):
-            table_name = index_name
-        else:
-            table_name = self.get_table_name(index_name, dataset_id) if dataset_id else index_name
+            if index_name in self._table_exists_cache:
+                return True
+            if not self.client.check_table_exists(index_name):
+                return False
+            with self._table_exists_cache_lock:
+                self._table_exists_cache.add(index_name)
+            return True
+        table_name = self.get_table_name(index_name, dataset_id) if dataset_id else index_name
         return self._check_table_exists_cached(table_name)
 
     """
diff --git a/rag/utils/ob_conn.py b/rag/utils/ob_conn.py
index 1ee47aceb..c20a4a45b 100644
--- a/rag/utils/ob_conn.py
+++ b/rag/utils/ob_conn.py
@@ -34,7 +34,8 @@ from common.doc_store.doc_store_base import MatchExpr, OrderByExpr, FusionExpr,
 from common.doc_store.ob_conn_base import (
     OBConnectionBase, get_value_str,
     vector_search_template, vector_column_pattern,
-    fulltext_index_name_template,
+    fulltext_index_name_template, doc_meta_column_names,
+    doc_meta_column_types,
 )
 from common.float_utils import get_float
 from rag.nlp import rag_tokenizer
@@ -135,8 +136,9 @@ class SearchResult(BaseModel):
 
 
 def get_column_value(column_name: str, value: Any) -> Any:
-    if column_name in column_types:
-        column_type = column_types[column_name]
+    # Check chunk table columns first, then doc_meta table columns
+    column_type = column_types.get(column_name) or doc_meta_column_types.get(column_name)
+    if column_type:
         if isinstance(column_type, String):
             return str(value)
         elif isinstance(column_type, Integer):
@@ -658,6 +660,12 @@ class OBConnection(OBConnectionOBase):
             return result
 
         output_fields = select_fields.copy()
+        if "*" in output_fields:
+            if index_names[0].startswith("ragflow_doc_meta_"):
+                output_fields = doc_meta_column_names.copy()
+            else:
+                output_fields = column_names.copy()
+
        if "id" not in output_fields:
             output_fields = ["id"] + output_fields
         if "_score" in output_fields: