Files
ragflow/internal/engine/infinity/metadata.go
qinling0210 af85aa9c7b Implement Elasticsearch functions in GO (#15160)
### What problem does this PR solve?

Implement Elasticsearch functions in GO (except for Search)

### Type of change

- [x] Refactoring
2026-05-25 19:15:07 +08:00

502 lines
17 KiB
Go

//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package infinity
import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"

	infinity "github.com/infiniflow/infinity-go-sdk"
	"go.uber.org/zap"
	"ragflow/internal/common"
	"ragflow/internal/utility"
)
// CreateMetadataStore creates the tenant's document-metadata table in Infinity
// and then creates secondary indexes on its "id" and "kb_id" columns.
//
// The table name is derived from tenantID via buildMetadataTableName. The
// column set is read from the JSON mapping file
// <project root>/conf/<e.docMetaMappingFileName>, which maps each field name to
// its type and default value.
//
// Columns are emitted in sorted field-name order. The mapping is decoded into a
// Go map, whose iteration order is randomised, so without the sort the column
// order of the created table would differ from one call to the next.
//
// An error is returned when the database cannot be obtained, the table already
// exists, the mapping file cannot be read or parsed, or table/index creation
// fails.
func (e *infinityEngine) CreateMetadataStore(ctx context.Context, tenantID string) error {
	tableName := buildMetadataTableName(tenantID)

	db, err := e.client.conn.GetDatabase(e.client.dbName)
	if err != nil {
		return fmt.Errorf("Failed to get database: %w", err)
	}

	// Refuse to touch a table that is already there.
	exists, err := e.tableExists(ctx, tableName)
	if err != nil {
		return fmt.Errorf("Failed to check if table exists: %w", err)
	}
	if exists {
		return fmt.Errorf("metadata table '%s' already exists", tableName)
	}

	// Load the configured doc_meta mapping file.
	fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.docMetaMappingFileName)
	schemaData, err := os.ReadFile(fpMapping)
	if err != nil {
		return fmt.Errorf("Failed to read mapping file: %w", err)
	}
	var schema map[string]fieldInfo
	if err := json.Unmarshal(schemaData, &schema); err != nil {
		return fmt.Errorf("Failed to parse mapping file: %w", err)
	}

	// Sort the field names so the column order is stable across runs.
	fieldNames := make([]string, 0, len(schema))
	for name := range schema {
		fieldNames = append(fieldNames, name)
	}
	sort.Strings(fieldNames)

	var columns infinity.TableSchema
	for _, name := range fieldNames {
		spec := schema[name]
		columns = append(columns, &infinity.ColumnDefinition{
			Name:     name,
			DataType: spec.Type,
			Default:  spec.Default,
		})
	}

	_, err = db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore)
	if err != nil {
		return fmt.Errorf("Failed to create doc meta table: %w", err)
	}
	common.Debug("Infinity created doc meta table", zap.String("tableName", tableName))

	// Re-fetch the table handle; the indexes are created through it.
	table, err := db.GetTable(tableName)
	if err != nil {
		return fmt.Errorf("Failed to get table: %w", err)
	}

	// One secondary index per lookup column, named idx_<table>_<column>.
	for _, column := range []string{"id", "kb_id"} {
		_, err = table.CreateIndex(
			fmt.Sprintf("idx_%s_%s", tableName, column),
			infinity.NewIndexInfo(column, infinity.IndexTypeSecondary, nil),
			infinity.ConflictTypeIgnore,
			"",
		)
		if err != nil {
			return fmt.Errorf("Failed to create secondary index on %s: %w", column, err)
		}
	}
	return nil
}
// InsertMetadata writes the given metadata rows into the tenant's metadata
// table.
//
// If the table lookup fails with an error whose text contains "not found" or
// "doesn't exist", the table is created through CreateMetadataStore and fetched
// again; any other lookup error is returned. Each row is copied before use, and
// its "meta_fields" value is passed through utility.ConvertMapToJSONString.
// Rows already stored under the same (id, kb_id) pairs are deleted first; a
// failure of that delete is only logged and the insert is still attempted.
//
// On success it returns an empty, non-nil string slice.
func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
	tableName := buildMetadataTableName(tenantID)
	common.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))

	db, err := e.client.conn.GetDatabase(e.client.dbName)
	if err != nil {
		return nil, fmt.Errorf("Failed to get database: %w", err)
	}

	table, err := db.GetTable(tableName)
	if err != nil {
		// Only a "missing table" style error triggers auto-creation.
		lowered := strings.ToLower(err.Error())
		tableMissing := strings.Contains(lowered, "not found") || strings.Contains(lowered, "doesn't exist")
		if !tableMissing {
			return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err)
		}
		if createErr := e.CreateMetadataStore(ctx, tenantID); createErr != nil {
			return nil, fmt.Errorf("Failed to create metadata table: %w", createErr)
		}
		if table, err = db.GetTable(tableName); err != nil {
			return nil, fmt.Errorf("Failed to get table after creation: %w", err)
		}
	}

	// quote renders a value as a single-quoted literal, doubling any embedded
	// single quotes so the value cannot break out of the filter expression.
	quote := func(v interface{}) string {
		return "'" + strings.ReplaceAll(fmt.Sprintf("%v", v), "'", "''") + "'"
	}

	// Single pass: copy every row (serialising meta_fields) and build the
	// matching "(id = ... AND kb_id = ...)" clause for the pre-insert delete.
	rows := make([]map[string]interface{}, len(metadata))
	clauses := make([]string, len(metadata))
	for i, src := range metadata {
		row := make(map[string]interface{})
		for key, val := range src {
			if key == "meta_fields" {
				val = utility.ConvertMapToJSONString(val)
			}
			row[key] = val
		}
		rows[i] = row
		clauses[i] = fmt.Sprintf("(id = %s AND kb_id = %s)", quote(row["id"]), quote(row["kb_id"]))
	}

	// Remove rows that would otherwise be duplicated by the insert below.
	if len(rows) > 0 {
		filter := strings.Join(clauses, " OR ")
		common.Debug(fmt.Sprintf("Deleting existing metadata with filter: %s", filter))
		delResp, delErr := table.Delete(filter)
		switch {
		case delErr != nil:
			common.Warn(fmt.Sprintf("Failed to delete existing metadata: %v", delErr))
		case delResp.DeletedRows > 0:
			common.Info(fmt.Sprintf("Deleted %d existing metadata entries", delResp.DeletedRows))
		}
	}

	if _, err = table.Insert(rows); err != nil {
		return nil, fmt.Errorf("Failed to insert metadata: %w", err)
	}
	common.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))
	return []string{}, nil
}
// UpdateMetadata upserts the meta_fields of one document in the tenant's
// metadata table.
//
// The row is located by id = docID AND kb_id = datasetID. When it exists, the
// stored meta_fields are decoded, the entries of metaFields are laid over them
// (new values win), and the row is updated with the merged result. When no row
// is found, or when the lookup query itself fails (which is only logged), a new
// row holding metaFields is inserted instead.
//
// Stored meta_fields that cannot be decoded are logged and treated as empty.
func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error {
	tableName := buildMetadataTableName(tenantID)
	common.Info("InfinityConnection.UpdateMetadata called", zap.String("tableName", tableName), zap.String("docID", docID), zap.String("datasetID", datasetID))

	db, err := e.client.conn.GetDatabase(e.client.dbName)
	if err != nil {
		return fmt.Errorf("failed to get database: %w", err)
	}
	table, err := db.GetTable(tableName)
	if err != nil {
		return fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
	}

	// Single quotes are doubled so the IDs stay inside their string literals.
	filter := fmt.Sprintf(
		"id = '%s' AND kb_id = '%s'",
		strings.ReplaceAll(docID, "'", "''"),
		strings.ReplaceAll(datasetID, "'", "''"),
	)

	// Look up at most one matching row.
	result, err := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0).ToResult()

	var rowExists bool
	if err != nil {
		// A failed lookup is treated the same as "no such row": fall through to insert.
		common.Warn(fmt.Sprintf("Failed to query existing metadata: %v", err))
	} else if qr, ok := result.(*infinity.QueryResult); ok && qr != nil && len(qr.Data["id"]) > 0 {
		rowExists = true
		if stored, found := qr.Data["meta_fields"]; found && len(stored) > 0 {
			var merged map[string]interface{}
			// A nil value matches none of the cases and leaves merged nil.
			switch raw := stored[0].(type) {
			case string:
				if err := json.Unmarshal([]byte(raw), &merged); err != nil {
					common.Warn(fmt.Sprintf("Failed to parse existing meta_fields: %v", err))
					merged = make(map[string]interface{})
				}
			case []uint8:
				// Raw bytes from Infinity carry a 4-byte prefix (likely a length)
				// ahead of the JSON payload; drop it before decoding.
				payload := raw
				if len(payload) > 4 {
					payload = payload[4:]
				}
				if err := json.Unmarshal(payload, &merged); err != nil {
					common.Warn(fmt.Sprintf("Failed to parse existing meta_fields from []uint8: %v", err))
					merged = make(map[string]interface{})
				}
			case map[string]interface{}:
				merged = raw
			}
			if merged == nil {
				merged = make(map[string]interface{})
			}
			// Overlay the caller's values on top of what was stored.
			for key, val := range metaFields {
				merged[key] = val
			}
			metaFields = merged
		}
	}

	updatedFields := map[string]interface{}{
		"meta_fields": utility.ConvertMapToJSONString(metaFields),
	}

	if !rowExists {
		insertFields := map[string]interface{}{
			"id":          docID,
			"kb_id":       datasetID,
			"meta_fields": utility.ConvertMapToJSONString(metaFields),
		}
		common.Info(fmt.Sprintf("UpdateMetadata: inserting new row, table=%s, newValue=%v", tableName, insertFields))
		if _, err = table.Insert(insertFields); err != nil {
			return fmt.Errorf("failed to insert metadata: %w", err)
		}
	} else {
		common.Info(fmt.Sprintf("UpdateMetadata: updating existing row, table=%s, filter=%s, newValue=%v", tableName, filter, updatedFields))
		if _, err = table.Update(filter, updatedFields); err != nil {
			return fmt.Errorf("failed to update metadata: %w", err)
		}
	}

	common.Info("InfinityConnection.UpdateMetadata completes", zap.String("tableName", tableName), zap.String("docID", docID))
	return nil
}
// DeleteMetadata deletes the rows of the tenant's metadata table that match
// condition and returns how many rows were deleted.
//
// If the table cannot be obtained, a warning is logged and (0, nil) is
// returned. Otherwise the table's column names, types and defaults are read via
// ShowColumns and passed, together with condition, to buildFilterFromCondition
// to produce the delete filter.
//
// The column listing is read defensively: a nil or unexpectedly typed
// ShowColumns result yields an empty column map, and a "type" or "default"
// array shorter than the "name" array yields zero values for the missing
// entries rather than an index-out-of-range panic.
func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
	tableName := buildMetadataTableName(tenantID)

	db, err := e.client.conn.GetDatabase(e.client.dbName)
	if err != nil {
		return 0, fmt.Errorf("failed to get database: %w", err)
	}
	table, err := db.GetTable(tableName)
	if err != nil {
		common.Warn(fmt.Sprintf("Metadata table %s does not exist, skipping delete", tableName))
		return 0, nil
	}

	// Column name -> (type, default), consumed by buildFilterFromCondition.
	clmns := make(map[string]struct {
		Type    string
		Default interface{}
	})
	colsResp, err := table.ShowColumns()
	if err != nil {
		return 0, fmt.Errorf("failed to get columns: %w", err)
	}
	if result, ok := colsResp.(*infinity.QueryResult); ok && result != nil {
		nameArr, hasName := result.Data["name"]
		typeArr, hasType := result.Data["type"]
		defArr, hasDef := result.Data["default"]
		// All three columns must be present, as before; otherwise clmns stays empty.
		if hasName && hasType && hasDef {
			for i := range nameArr {
				// A failed assertion deliberately leaves the zero value (""),
				// matching the previous behaviour for non-string cells.
				colName, _ := nameArr[i].(string)
				var colType string
				if i < len(typeArr) {
					colType, _ = typeArr[i].(string)
				}
				var colDefault interface{}
				if i < len(defArr) {
					colDefault = defArr[i]
				}
				clmns[colName] = struct {
					Type    string
					Default interface{}
				}{Type: colType, Default: colDefault}
			}
		}
	}

	filter := buildFilterFromCondition(condition, clmns)
	delResp, err := table.Delete(filter)
	if err != nil {
		return 0, fmt.Errorf("failed to delete metadata: %w", err)
	}
	return delResp.DeletedRows, nil
}
// DeleteMetadataKeys removes the given keys from the meta_fields of the row
// identified by id = docID AND kb_id = datasetID in the tenant's metadata table.
//
// Outcomes:
//   - the row is not found: an error is returned;
//   - none of the keys is present in the stored meta_fields: nothing is changed;
//   - removing the keys would leave meta_fields empty: the whole row is deleted
//     through DeleteMetadata;
//   - otherwise: the row is updated with the remaining meta_fields.
//
// Stored meta_fields that cannot be decoded are logged and treated as empty.
func (e *infinityEngine) DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error {
	tableName := buildMetadataTableName(tenantID)
	common.Info("InfinityConnection.DeleteMetadataKeys called", zap.String("tableName", tableName), zap.String("docID", docID), zap.Any("keys", keys))

	db, err := e.client.conn.GetDatabase(e.client.dbName)
	if err != nil {
		return fmt.Errorf("failed to get database: %w", err)
	}
	table, err := db.GetTable(tableName)
	if err != nil {
		return fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
	}

	// Single quotes are doubled so the IDs stay inside their string literals.
	filter := fmt.Sprintf(
		"id = '%s' AND kb_id = '%s'",
		strings.ReplaceAll(docID, "'", "''"),
		strings.ReplaceAll(datasetID, "'", "''"),
	)

	// Fetch the current row (at most one).
	result, err := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0).ToResult()
	if err != nil {
		return fmt.Errorf("failed to query existing metadata: %w", err)
	}
	qr, ok := result.(*infinity.QueryResult)
	if !ok || qr == nil || len(qr.Data["id"]) == 0 {
		return fmt.Errorf("document not found: %s", docID)
	}

	// Decode the stored meta_fields, whichever representation came back.
	var current map[string]interface{}
	column, present := qr.Data["meta_fields"]
	if !present || len(column) == 0 {
		common.Debug("meta_fields not found in qr.Data or empty", zap.Any("exists", present))
	} else {
		switch raw := column[0].(type) {
		case nil:
			// Nothing stored; current stays nil and is initialised below.
		case string:
			if err := json.Unmarshal([]byte(raw), &current); err != nil {
				common.Warn("Failed to parse meta_fields JSON", zap.String("docID", docID), zap.Error(err))
				current = make(map[string]interface{})
			}
		case []uint8:
			// Raw bytes from Infinity carry a 4-byte prefix (likely a length)
			// ahead of the JSON payload; drop it before decoding.
			payload := raw
			if len(payload) > 4 {
				payload = payload[4:]
			}
			if err := json.Unmarshal(payload, &current); err != nil {
				common.Warn("Failed to parse meta_fields JSON from []uint8", zap.String("docID", docID), zap.String("err", err.Error()))
				current = make(map[string]interface{})
			}
		case map[string]interface{}:
			current = raw
		default:
			common.Debug("meta_fields unexpected type", zap.String("type", fmt.Sprintf("%T", column[0])), zap.Any("value", column[0]))
		}
	}
	if current == nil {
		current = make(map[string]interface{})
	}

	// Set of keys requested for removal.
	drop := make(map[string]bool)
	for _, key := range keys {
		drop[key] = true
	}

	// One pass over the stored keys: how many get removed, how many survive.
	matched, remaining := 0, 0
	for key := range current {
		if drop[key] {
			matched++
		} else {
			remaining++
		}
	}

	if matched == 0 {
		common.Info(
			"No matching keys to delete from document",
			zap.String("docID", docID),
			zap.Int("existingMetaFieldCount", len(current)),
			zap.Int("keysCount", len(keys)),
		)
		return nil
	}

	if remaining == 0 {
		// Nothing would be left, so drop the whole row instead of storing "{}".
		common.Info("All metadata keys would be deleted, removing document from index", zap.String("docID", docID))
		condition := map[string]interface{}{
			"id":    docID,
			"kb_id": datasetID,
		}
		if _, err := e.DeleteMetadata(ctx, condition, tenantID); err != nil {
			return fmt.Errorf("failed to delete document: %w", err)
		}
		common.Info("Successfully removed document with empty meta_fields", zap.String("docID", docID))
		return nil
	}

	// Some keys survive: strip the requested ones and write the rest back.
	for key := range drop {
		delete(current, key)
	}
	updatedFields := map[string]interface{}{
		"meta_fields": utility.ConvertMapToJSONString(current),
	}
	if _, err = table.Update(filter, updatedFields); err != nil {
		return fmt.Errorf("failed to delete metadata keys: %w", err)
	}

	common.Info("InfinityConnection.DeleteMetadataKeys completed", zap.String("tableName", tableName), zap.String("docID", docID))
	return nil
}
// DropMetadataStore drops the metadata table belonging to tenantID.
// The table name comes from buildMetadataTableName and the drop itself is
// delegated to e.dropTable, whose error (if any) is returned unchanged.
func (e *infinityEngine) DropMetadataStore(ctx context.Context, tenantID string) error {
	return e.dropTable(ctx, buildMetadataTableName(tenantID))
}
// MetadataStoreExists reports whether the metadata table belonging to tenantID
// exists. The table name comes from buildMetadataTableName and the check is
// delegated to e.tableExists, whose result and error are returned unchanged.
func (e *infinityEngine) MetadataStoreExists(ctx context.Context, tenantID string) (bool, error) {
	return e.tableExists(ctx, buildMetadataTableName(tenantID))
}