mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-05-20 08:16:41 +08:00
### What problem does this PR solve? Refact functions in engine in GO ### Type of change - [x] Refactoring
310 lines
8.2 KiB
Go
310 lines
8.2 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package infinity
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"ragflow/internal/common"
|
|
"strings"
|
|
|
|
infinity "github.com/infiniflow/infinity-go-sdk"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// dropTable drops a table from Infinity
|
|
func (e *infinityEngine) dropTable(ctx context.Context, tableName string) error {
|
|
if tableName == "" {
|
|
return fmt.Errorf("table name cannot be empty")
|
|
}
|
|
|
|
// Check if table exists
|
|
exists, err := e.tableExists(ctx, tableName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check table existence: %w", err)
|
|
}
|
|
if !exists {
|
|
return fmt.Errorf("table '%s' does not exist", tableName)
|
|
}
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get database: %w", err)
|
|
}
|
|
|
|
_, err = db.DropTable(tableName, infinity.ConflictTypeError)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to drop table: %w", err)
|
|
}
|
|
|
|
common.Info("Infinity dropped table", zap.String("tableName", tableName))
|
|
return nil
|
|
}
|
|
|
|
// tableExists checks if a table exists in Infinity
|
|
func (e *infinityEngine) tableExists(ctx context.Context, tableName string) (bool, error) {
|
|
if tableName == "" {
|
|
return false, fmt.Errorf("table name cannot be empty")
|
|
}
|
|
|
|
db, err := e.client.conn.GetDatabase(e.client.dbName)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to get database: %w", err)
|
|
}
|
|
|
|
// Try to get the table - if it exists, no error
|
|
_, err = db.GetTable(tableName)
|
|
if err != nil {
|
|
errMsg := strings.ToLower(err.Error())
|
|
if strings.Contains(errMsg, "not found") || strings.Contains(errMsg, "doesn't exist") {
|
|
return false, nil
|
|
}
|
|
return false, fmt.Errorf("failed to check table existence: %w", err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// fieldInfo represents a field in the infinity mapping schema
|
|
type fieldInfo struct {
|
|
Type string `json:"type"`
|
|
Default interface{} `json:"default"`
|
|
Analyzer interface{} `json:"analyzer"` // string or []string
|
|
IndexType interface{} `json:"index_type"` // string or map
|
|
Comment string `json:"comment"`
|
|
}
|
|
|
|
// orderedFields preserves the order of fields as defined in JSON
|
|
type orderedFields struct {
|
|
Keys []string
|
|
Fields map[string]fieldInfo
|
|
}
|
|
|
|
func (o *orderedFields) UnmarshalJSON(data []byte) error {
|
|
// Parse JSON manually to preserve key order
|
|
// Look for key names by scanning the JSON string
|
|
// This is a simple approach: find {"key": value, "key2": value2...}
|
|
o.Fields = make(map[string]fieldInfo)
|
|
o.Keys = make([]string, 0)
|
|
|
|
// Use a streaming JSON parser approach
|
|
dec := json.NewDecoder(bytes.NewReader(data))
|
|
tok, err := dec.Token()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if delim, ok := tok.(json.Delim); ok && delim == '{' {
|
|
for dec.More() {
|
|
// Read key
|
|
tok, err := dec.Token()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key, ok := tok.(string)
|
|
if !ok {
|
|
continue
|
|
}
|
|
o.Keys = append(o.Keys, key)
|
|
|
|
// Read value into fieldInfo
|
|
var field fieldInfo
|
|
if err := dec.Decode(&field); err != nil {
|
|
return err
|
|
}
|
|
o.Fields[key] = field
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// fieldKeyword checks if field is a keyword field
|
|
func fieldKeyword(fieldName string) bool {
|
|
if fieldName == "source_id" {
|
|
return true
|
|
}
|
|
if strings.HasSuffix(fieldName, "_kwd") &&
|
|
fieldName != "knowledge_graph_kwd" &&
|
|
fieldName != "docnm_kwd" &&
|
|
fieldName != "important_kwd" &&
|
|
fieldName != "question_kwd" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// existsCondition builds a NOT EXISTS or field!=" condition
|
|
func existsCondition(field string, tableColumns map[string]struct {
|
|
Type string
|
|
Default interface{}
|
|
}) string {
|
|
col, colOk := tableColumns[field]
|
|
if !colOk {
|
|
common.Warn(fmt.Sprintf("Column '%s' not found in table columns", field))
|
|
return fmt.Sprintf("%s!=null", field)
|
|
}
|
|
if strings.Contains(strings.ToLower(col.Type), "char") {
|
|
if col.Default != nil {
|
|
return fmt.Sprintf(" %s!='%v' ", field, col.Default)
|
|
}
|
|
return fmt.Sprintf(" %s!='' ", field)
|
|
}
|
|
if col.Default != nil {
|
|
return fmt.Sprintf("%s!=%v", field, col.Default)
|
|
}
|
|
return fmt.Sprintf("%s!=null", field)
|
|
}
|
|
|
|
func buildFilterFromCondition(condition map[string]interface{}, tableColumns map[string]struct {
|
|
Type string
|
|
Default interface{}
|
|
}) string {
|
|
var conditions []string
|
|
|
|
for k, v := range condition {
|
|
if v == nil {
|
|
continue
|
|
}
|
|
if strVal, ok := v.(string); ok && strVal == "" {
|
|
continue
|
|
}
|
|
|
|
// Handle must_not conditions -> NOT (...)
|
|
if k == "must_not" {
|
|
if mustNotMap, ok := v.(map[string]interface{}); ok {
|
|
for kk, vv := range mustNotMap {
|
|
if kk == "exists" {
|
|
if existsField, ok := vv.(string); ok {
|
|
conditions = append(conditions, fmt.Sprintf("NOT (%s)", existsCondition(existsField, tableColumns)))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Handle keyword fields -> filter_fulltext with converted field name
|
|
if fieldKeyword(k) {
|
|
var orConds []string
|
|
addFullText := func(item string) {
|
|
item = strings.ReplaceAll(item, "'", "''")
|
|
orConds = append(orConds, fmt.Sprintf("filter_fulltext('%s', '%s')", convertMatchingField(k), item))
|
|
}
|
|
|
|
switch val := v.(type) {
|
|
case []string:
|
|
for _, item := range val {
|
|
addFullText(item)
|
|
}
|
|
case []interface{}:
|
|
for _, item := range val {
|
|
addFullText(fmt.Sprintf("%v", item))
|
|
}
|
|
case string:
|
|
addFullText(val)
|
|
default:
|
|
addFullText(fmt.Sprintf("%v", val))
|
|
}
|
|
|
|
if len(orConds) > 0 {
|
|
conditions = append(conditions, "("+strings.Join(orConds, " OR ")+")")
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Handle list values (IN condition)
|
|
if listVal, ok := v.([]interface{}); ok {
|
|
var inVals []string
|
|
for _, item := range listVal {
|
|
if strItem, ok := item.(string); ok {
|
|
strItem = strings.ReplaceAll(strItem, "'", "''")
|
|
inVals = append(inVals, fmt.Sprintf("'%s'", strItem))
|
|
} else {
|
|
inVals = append(inVals, fmt.Sprintf("%v", item))
|
|
}
|
|
}
|
|
if len(inVals) > 0 {
|
|
conditions = append(conditions, fmt.Sprintf("%s IN (%s)", k, strings.Join(inVals, ", ")))
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Handle exists condition
|
|
if k == "exists" {
|
|
if existsField, ok := v.(string); ok {
|
|
conditions = append(conditions, existsCondition(existsField, tableColumns))
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Handle string values
|
|
if strVal, ok := v.(string); ok {
|
|
strVal = strings.ReplaceAll(strVal, "'", "''")
|
|
conditions = append(conditions, fmt.Sprintf("%s='%s'", k, strVal))
|
|
continue
|
|
}
|
|
|
|
// Handle other values
|
|
conditions = append(conditions, fmt.Sprintf("%s=%v", k, v))
|
|
}
|
|
|
|
if len(conditions) == 0 {
|
|
return "1=1"
|
|
}
|
|
return strings.Join(conditions, " AND ")
|
|
}
|
|
|
|
// columnExists checks if a column exists in the table
|
|
func (e *infinityEngine) columnExists(table *infinity.Table, columnName string) (bool, error) {
|
|
colsResp, err := table.ShowColumns()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
result, ok := colsResp.(*infinity.QueryResult)
|
|
if !ok {
|
|
return false, fmt.Errorf("unexpected response type: %T", colsResp)
|
|
}
|
|
|
|
// ShowColumns returns a result set where Data contains arrays of column values
|
|
if nameArr, ok := result.Data["name"]; ok {
|
|
for i := 0; i < len(nameArr); i++ {
|
|
colName, _ := nameArr[i].(string)
|
|
if colName == columnName {
|
|
return true, nil
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// buildChunkTableName returns the chunk table name for a dataset
|
|
// Skill Table: table name is just baseName (e.g., "skill_abc123_def456")
|
|
// Regular chunk Table: table name is {baseName}_{datasetID}
|
|
func buildChunkTableName(baseName, datasetID string) string {
|
|
if datasetID == "skill" {
|
|
return baseName
|
|
}
|
|
return fmt.Sprintf("%s_%s", baseName, datasetID)
|
|
}
|
|
|
|
// buildMetadataTableName returns the metadata table name for a tenant
|
|
func buildMetadataTableName(tenantID string) string {
|
|
return fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
|
|
}
|