fix: OceanBase metadata not returned in document list API (#13209)

### What problem does this PR solve?

Fix #13144.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
He Wang
2026-02-25 15:29:17 +08:00
committed by GitHub
parent 4ceb668d40
commit 394ff16b66
3 changed files with 43 additions and 21 deletions

View File

@@ -102,13 +102,13 @@ class DocMetadataService:
@classmethod
def _iter_search_results(cls, results):
"""
Iterate over search results in various formats (DataFrame, ES, list).
Iterate over search results in various formats (DataFrame, ES, OceanBase, list).
Yields:
Tuple of (doc_id, doc_dict) for each document
Args:
results: Search results from ES/Infinity in any format
results: Search results from ES/Infinity/OceanBase in any format
"""
# Handle tuple return from Infinity: (DataFrame, int)
# Check this FIRST because pandas DataFrames also have __getitem__
@@ -148,6 +148,14 @@ class DocMetadataService:
if doc_id:
yield doc_id, doc
# Check if OceanBase SearchResult format
elif hasattr(results, 'chunks') and hasattr(results, 'total'):
# OceanBase format: SearchResult(total=int, chunks=[{...}, {...}])
for doc in results.chunks:
doc_id = cls._extract_doc_id(doc)
if doc_id:
yield doc_id, doc
@classmethod
def _search_metadata(cls, kb_id: str, condition: Dict = None, limit: int = 10000):
"""
@@ -367,7 +375,7 @@ class DocMetadataService:
logging.debug(f"[update_document_metadata] Updating doc_id: {doc_id}, kb_id: {kb_id}, meta_fields: {processed_meta}")
# For Elasticsearch, use efficient partial update
if not settings.DOC_ENGINE_INFINITY:
if not settings.DOC_ENGINE_INFINITY and not settings.DOC_ENGINE_OCEANBASE:
try:
# Use ES partial update API - much more efficient than delete+insert
settings.docStoreConn.es.update(

View File

@@ -24,7 +24,8 @@ from typing import Any
from pymysql.converters import escape_string
from pyobvector import ObVecClient, FtsIndexParam, FtsParser, VECTOR
from sqlalchemy import Column, Table
from sqlalchemy import Column, JSON, Table
from sqlalchemy.dialects.mysql import VARCHAR
from common.doc_store.doc_store_base import DocStoreConnection, MatchExpr, OrderByExpr
@@ -37,6 +38,15 @@ fulltext_search_template = "MATCH (%s) AGAINST ('%s' IN NATURAL LANGUAGE MODE)"
vector_search_template = "cosine_distance(%s, '%s')"
vector_column_pattern = re.compile(r"q_(?P<vector_size>\d+)_vec")
# Document metadata table columns.
# Module-level so the schema is defined once and other modules can import the
# derived name/type lookups instead of redefining them.
doc_meta_columns = [
    Column("id", VARCHAR(256), primary_key=True, comment="document id"),
    Column("kb_id", VARCHAR(256), nullable=False, comment="knowledge base id"),
    Column("meta_fields", JSON, nullable=True, comment="document metadata fields"),
]
# Convenience views over the schema above: ordered column names, and a
# name -> SQLAlchemy type mapping (used for value conversion on read).
doc_meta_column_names = [col.name for col in doc_meta_columns]
doc_meta_column_types = {col.name: col.type for col in doc_meta_columns}
def get_value_str(value: Any) -> str:
"""Convert value to SQL string representation."""
@@ -266,19 +276,9 @@ class OBConnectionBase(DocStoreConnection):
Table name pattern: ragflow_doc_meta_{tenant_id}
- Per-tenant metadata table for storing document metadata fields
"""
from sqlalchemy import JSON
from sqlalchemy.dialects.mysql import VARCHAR
table_name = index_name
lock_prefix = self.get_lock_prefix()
# Define columns for document metadata table
doc_meta_columns = [
Column("id", VARCHAR(256), primary_key=True, comment="document id"),
Column("kb_id", VARCHAR(256), nullable=False, comment="knowledge base id"),
Column("meta_fields", JSON, nullable=True, comment="document metadata fields"),
]
try:
# Create table with distributed lock
_try_with_lock(
@@ -319,11 +319,17 @@ class OBConnectionBase(DocStoreConnection):
def index_exist(self, index_name: str, dataset_id: str = None) -> bool:
"""Check if index/table exists."""
# For doc_meta tables, use index_name directly as table name
# For doc_meta tables, use index_name directly and only check table existence
# (metadata tables don't have fulltext/vector indexes that chunk tables have)
if index_name.startswith("ragflow_doc_meta_"):
table_name = index_name
else:
table_name = self.get_table_name(index_name, dataset_id) if dataset_id else index_name
if index_name in self._table_exists_cache:
return True
if not self.client.check_table_exists(index_name):
return False
with self._table_exists_cache_lock:
self._table_exists_cache.add(index_name)
return True
table_name = self.get_table_name(index_name, dataset_id) if dataset_id else index_name
return self._check_table_exists_cached(table_name)
"""

View File

@@ -34,7 +34,8 @@ from common.doc_store.doc_store_base import MatchExpr, OrderByExpr, FusionExpr,
from common.doc_store.ob_conn_base import (
OBConnectionBase, get_value_str,
vector_search_template, vector_column_pattern,
fulltext_index_name_template,
fulltext_index_name_template, doc_meta_column_names,
doc_meta_column_types,
)
from common.float_utils import get_float
from rag.nlp import rag_tokenizer
@@ -135,8 +136,9 @@ class SearchResult(BaseModel):
def get_column_value(column_name: str, value: Any) -> Any:
if column_name in column_types:
column_type = column_types[column_name]
# Check chunk table columns first, then doc_meta table columns
column_type = column_types.get(column_name) or doc_meta_column_types.get(column_name)
if column_type:
if isinstance(column_type, String):
return str(value)
elif isinstance(column_type, Integer):
@@ -658,6 +660,12 @@ class OBConnection(OBConnectionBase):
return result
output_fields = select_fields.copy()
if "*" in output_fields:
if index_names[0].startswith("ragflow_doc_meta_"):
output_fields = doc_meta_column_names.copy()
else:
output_fields = column_names.copy()
if "id" not in output_fields:
output_fields = ["id"] + output_fields
if "_score" in output_fields: