Implement InsertDataset and InsertMetadata in GO (#13883)

### What problem does this PR solve?

Implement InsertDataset and InsertMetadata in GO

New internal CLI commands for Go:

INSERT DATASET FROM FILE "file_name"
INSERT METADATA FROM FILE "file_name"
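For reference, both commands read a server-local JSON file at the quoted path. A minimal sketch of how such a file could be produced is shown below; the top-level keys mirror the request parsing in the kb and tenant handlers later in this diff, while the concrete values, chunk fields, and the output file name are illustrative assumptions, not part of this PR.

// Hedged sketch: write an input file for INSERT DATASET FROM FILE.
// The keys "table_name", "knowledgebase_id" and "chunks" match the format
// expected by the kb handler; everything else is made up for illustration.
package main

import (
	"encoding/json"
	"os"
)

func main() {
	datasetFile := map[string]interface{}{
		"table_name":       "ragflow", // index/table name prefix (assumption)
		"knowledgebase_id": "kb_0001", // hypothetical knowledgebase ID
		"chunks": []map[string]interface{}{
			{
				"id":                  "chunk_0001",
				"docnm_kwd":           "example.pdf",
				"content_with_weight": "example chunk text",
			},
		},
	}
	data, err := json.MarshalIndent(datasetFile, "", "  ")
	if err != nil {
		panic(err)
	}
	if err := os.WriteFile("dataset_chunks.json", data, 0o644); err != nil {
		panic(err)
	}
	// INSERT METADATA FROM FILE expects the simpler shape {"chunks": [...]},
	// as parsed by the tenant handler below.
}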

### Type of change

- [x] Refactoring
This commit is contained in:
qinling0210
2026-04-01 16:16:25 +08:00
committed by GitHub
parent b1d28b5898
commit bb4a06f759
18 changed files with 915 additions and 8 deletions


@ -97,6 +97,8 @@ sql_command: login_user
| search_on_datasets
| get_chunk
| list_chunks
| insert_dataset_from_file
| insert_metadata_from_file
| create_chat_session
| drop_chat_session
| list_chat_sessions
@ -207,10 +209,12 @@ DOC_META: "DOC_META"i
CHUNK: "CHUNK"i
CHUNKS: "CHUNKS"i
GET: "GET"i
INSERT: "INSERT"i
PAGE: "PAGE"i
SIZE: "SIZE"i
KEYWORDS: "KEYWORDS"i
AVAILABLE: "AVAILABLE"i
FILE: "FILE"i
login_user: LOGIN USER quoted_string ";"
list_services: LIST SERVICES ";"
@ -349,6 +353,10 @@ parse_dataset_docs: PARSE quoted_string OF DATASET quoted_string ";"
parse_dataset_sync: PARSE DATASET quoted_string SYNC ";"
parse_dataset_async: PARSE DATASET quoted_string ASYNC ";"
// Internal CLI for GO
insert_dataset_from_file: INSERT DATASET FROM FILE quoted_string ";"
insert_metadata_from_file: INSERT METADATA FROM FILE quoted_string ";"
identifier_list: identifier ("," identifier)*
identifier: WORD
@ -750,6 +758,14 @@ class RAGFlowCLITransformer(Transformer):
chunk_id = items[2].children[0].strip("'\"")
return {"type": "get_chunk", "chunk_id": chunk_id}
def insert_dataset_from_file(self, items):
file_path = items[4].children[0].strip("'\"")
return {"type": "insert_dataset_from_file", "file_path": file_path}
def insert_metadata_from_file(self, items):
file_path = items[4].children[0].strip("'\"")
return {"type": "insert_metadata_from_file", "file_path": file_path}
def list_chunks(self, items):
doc_id = items[4].children[0].strip("'\"")
result = {"type": "list_chunks", "doc_id": doc_id}


@ -1520,6 +1520,48 @@ class RAGFlowClient:
else:
print(f"Fail to get chunk, code: {res_json['code']}, message: {res_json['message']}")
# Internal
def insert_dataset_from_file(self, command_dict):
if self.server_type != "user":
print("This command is only allowed in USER mode")
return
file_path = command_dict["file_path"]
payload = {"file_path": file_path}
response = self.http_client.request("POST", "/kb/insert_from_file", json_body=payload,
use_api_base=False, auth_kind="web")
res_json = response.json()
if response.status_code == 200:
if res_json["code"] == 0:
print(f"Success to insert dataset from file: {file_path}")
if res_json.get("data"):
self._print_key_value(res_json["data"])
else:
print(f"Fail to insert dataset from file, code: {res_json['code']}, message: {res_json['message']}")
else:
print(f"Fail to insert dataset from file, code: {res_json['code']}, message: {res_json['message']}")
# Internal
def insert_metadata_from_file(self, command_dict):
if self.server_type != "user":
print("This command is only allowed in USER mode")
return
file_path = command_dict["file_path"]
payload = {"file_path": file_path}
response = self.http_client.request("POST", "/tenant/insert_metadata_from_file", json_body=payload,
use_api_base=False, auth_kind="web")
res_json = response.json()
if response.status_code == 200:
if res_json["code"] == 0:
print(f"Success to insert metadata from file: {file_path}")
if res_json.get("data"):
self._print_key_value(res_json["data"])
else:
print(f"Fail to insert metadata from file, code: {res_json['code']}, message: {res_json['message']}")
else:
print(f"Fail to insert metadata from file, code: {res_json['code']}, message: {res_json['message']}")
def list_chunks(self, command_dict):
if self.server_type != "user":
print("This command is only allowed in USER mode")
@ -1903,6 +1945,10 @@ def run_command(client: RAGFlowClient, command_dict: dict):
return client.search_on_datasets(command_dict)
case "get_chunk":
return client.get_chunk(command_dict)
case "insert_dataset_from_file":
return client.insert_dataset_from_file(command_dict)
case "insert_metadata_from_file":
return client.insert_metadata_from_file(command_dict)
case "list_chunks":
return client.list_chunks(command_dict)
case "meta":

go.mod

@ -20,6 +20,8 @@ require (
github.com/spf13/viper v1.18.2
go.uber.org/zap v1.27.1
golang.org/x/crypto v0.47.0
golang.org/x/term v0.41.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.2
gorm.io/gorm v1.25.5
)
@ -98,12 +100,10 @@ require (
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/term v0.41.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/infiniflow/infinity-go-sdk => github.com/infiniflow/infinity/go v0.0.0-20260317024756-4aff48d0d843
replace github.com/infiniflow/infinity-go-sdk => github.com/infiniflow/infinity/go v0.0.0-20260331112649-9bcd52a3d364

go.sum

@ -98,8 +98,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/infiniflow/infinity/go v0.0.0-20260317024756-4aff48d0d843 h1:s5g1APIXv4c6hwVL4+DwT5JDvRpegZpxxh2ltZzgeGE=
github.com/infiniflow/infinity/go v0.0.0-20260317024756-4aff48d0d843/go.mod h1:hw3z5AwNFsGy1cdrE0Mfjot2y9jqVHTxBufUx9VzZ+0=
github.com/infiniflow/infinity/go v0.0.0-20260331112649-9bcd52a3d364 h1:0v5TjSirmCAUX3oaIV8Rd9d5B+kHPdymveETUU8OcC0=
github.com/infiniflow/infinity/go v0.0.0-20260331112649-9bcd52a3d364/go.mod h1:hw3z5AwNFsGy1cdrE0Mfjot2y9jqVHTxBufUx9VzZ+0=
github.com/iromli/go-itsdangerous v0.0.0-20220223194502-9c8bef8dac6a h1:Inib12UR9HAfBubrGNraPjKt/Cu8xPbTJbC50+0wP5U=
github.com/iromli/go-itsdangerous v0.0.0-20220223194502-9c8bef8dac6a/go.mod h1:8N0Hlye5Lzw+H/yHWpZMkT0QLA+iOHG7KLdvAm95DZg=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
@ -226,8 +226,6 @@ golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=


@ -223,6 +223,10 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) {
return c.CEList(cmd)
case "ce_search":
return c.CESearch(cmd)
case "insert_dataset_from_file":
return c.InsertDatasetFromFile(cmd)
case "insert_metadata_from_file":
return c.InsertMetadataFromFile(cmd)
// TODO: Implement other commands
default:
return nil, fmt.Errorf("command '%s' would be executed with API", cmd.Type)


@ -305,6 +305,14 @@ func (l *Lexer) lookupIdent(ident string) Token {
return Token{Type: TokenAvailable, Value: ident}
case "NAME":
return Token{Type: TokenName, Value: ident}
case "POOL":
return Token{Type: TokenPool, Value: ident}
case "INSERT":
return Token{Type: TokenInsert, Value: ident}
case "FILE":
return Token{Type: TokenFile, Value: ident}
case "METADATA":
return Token{Type: TokenMetadata, Value: ident}
default:
return Token{Type: TokenIdentifier, Value: ident}
}


@ -166,6 +166,8 @@ func (p *Parser) parseUserCommand() (*Command, error) {
return p.parseGenerateCommand()
case TokenImport:
return p.parseImportCommand()
case TokenInsert:
return p.parseInsertCommand()
case TokenSearch:
return p.parseSearchCommand()
case TokenParse:
@ -217,7 +219,7 @@ func (p *Parser) expectSemicolon() error {
}
func isKeyword(tokenType int) bool {
return tokenType >= TokenLogin && tokenType <= TokenDocMeta
return tokenType >= TokenLogin && tokenType <= TokenMetadata
}
// isCECommand checks if the given string is a ContextEngine command


@ -104,6 +104,9 @@ const (
TokenVectorSize
TokenDocMeta
TokenName // For ALTER PROVIDER <name> NAME <new_name>
TokenInsert
TokenFile
TokenMetadata
// Literals
TokenIdentifier


@ -941,3 +941,93 @@ func (c *RAGFlowClient) CESearch(cmd *Command) (ResponseIf, error) {
return &response, nil
}
// InsertDatasetFromFile inserts dataset chunks from a JSON file
func (c *RAGFlowClient) InsertDatasetFromFile(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
filePath, ok := cmd.Params["file_path"].(string)
if !ok {
return nil, fmt.Errorf("file_path not provided")
}
payload := map[string]interface{}{
"file_path": filePath,
}
resp, err := c.HTTPClient.Request("POST", "/kb/insert_from_file", false, "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to insert dataset from file: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to insert dataset from file: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
resJSON, err := resp.JSON()
if err != nil {
return nil, fmt.Errorf("invalid JSON response: %w", err)
}
code, ok := resJSON["code"].(float64)
if !ok {
return nil, fmt.Errorf("invalid response format: code is not a number")
}
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = fmt.Sprintf("Success to insert dataset from file: %s", filePath)
} else {
result.Message = fmt.Sprintf("Failed to insert dataset from file: %v", resJSON)
}
result.Duration = 0
return &result, nil
}
// InsertMetadataFromFile inserts metadata from a JSON file
func (c *RAGFlowClient) InsertMetadataFromFile(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
filePath, ok := cmd.Params["file_path"].(string)
if !ok {
return nil, fmt.Errorf("file_path not provided")
}
payload := map[string]interface{}{
"file_path": filePath,
}
resp, err := c.HTTPClient.Request("POST", "/tenant/insert_metadata_from_file", false, "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to insert metadata from file: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to insert metadata from file: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
resJSON, err := resp.JSON()
if err != nil {
return nil, fmt.Errorf("invalid JSON response: %w", err)
}
code, ok := resJSON["code"].(float64)
if !ok {
return nil, fmt.Errorf("invalid response format: code is not a number")
}
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = fmt.Sprintf("Success to insert metadata from file: %s", filePath)
} else {
result.Message = fmt.Sprintf("Failed to insert metadata from file: %v", resJSON)
}
result.Duration = 0
return &result, nil
}


@ -32,6 +32,21 @@ func (p *Parser) parseLoginUser() (*Command, error) {
cmd.Params["email"] = email
p.nextToken()
// Optional: WITH PASSWORD 'password'
if p.curToken.Type == TokenWith {
p.nextToken()
if p.curToken.Type != TokenPassword {
return nil, fmt.Errorf("expected PASSWORD after WITH")
}
p.nextToken()
password, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd.Params["password"] = password
p.nextToken()
}
// Semicolon is optional for UNSET TOKEN
if p.curToken.Type == TokenSemicolon {
p.nextToken()
@ -1531,6 +1546,88 @@ func (p *Parser) parseImportCommand() (*Command, error) {
return cmd, nil
}
// parseInsertCommand parses INSERT command and dispatches to specific handler
func (p *Parser) parseInsertCommand() (*Command, error) {
p.nextToken() // consume INSERT
// Expect DATASET or METADATA
if p.curToken.Type == TokenDataset {
return p.parseInsertDatasetFromFile()
}
if p.curToken.Type == TokenMetadata {
return p.parseInsertMetadataFromFile()
}
return nil, fmt.Errorf("expected DATASET or METADATA after INSERT, got %s", p.curToken.Value)
}
// Internal CLI for GO
// parseInsertDatasetFromFile parses: INSERT DATASET FROM FILE "file_path"
func (p *Parser) parseInsertDatasetFromFile() (*Command, error) {
p.nextToken() // consume DATASET
// Expect FROM
if p.curToken.Type != TokenFrom {
return nil, fmt.Errorf("expected FROM, got %s", p.curToken.Value)
}
p.nextToken()
// Expect FILE
if p.curToken.Type != TokenFile {
return nil, fmt.Errorf("expected FILE, got %s", p.curToken.Value)
}
p.nextToken()
// Get file path (quoted string)
filePath, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd := NewCommand("insert_dataset_from_file")
cmd.Params["file_path"] = filePath
p.nextToken()
// Semicolon is optional
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
return cmd, nil
}
// Internal CLI for GO
// parseInsertMetadataFromFile parses: INSERT METADATA FROM FILE "file_path"
func (p *Parser) parseInsertMetadataFromFile() (*Command, error) {
p.nextToken() // consume METADATA
// Expect FROM
if p.curToken.Type != TokenFrom {
return nil, fmt.Errorf("expected FROM, got %s", p.curToken.Value)
}
p.nextToken()
// Expect FILE
if p.curToken.Type != TokenFile {
return nil, fmt.Errorf("expected FILE, got %s", p.curToken.Value)
}
p.nextToken()
// Get file path (quoted string)
filePath, err := p.parseQuotedString()
if err != nil {
return nil, err
}
cmd := NewCommand("insert_metadata_from_file")
cmd.Params["file_path"] = filePath
p.nextToken()
// Semicolon is optional
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
return cmd, nil
}
func (p *Parser) parseSearchCommand() (*Command, error) {
p.nextToken() // consume SEARCH
question, err := p.parseQuotedString()
@ -1687,6 +1784,8 @@ func (p *Parser) parseUserStatement() (*Command, error) {
return p.parseParseCommand()
case TokenImport:
return p.parseImportCommand()
case TokenInsert:
return p.parseInsertCommand()
case TokenSearch:
return p.parseSearchCommand()
default:


@ -151,3 +151,15 @@ func (e *elasticsearchEngine) CreateDocMetaIndex(ctx context.Context, indexName
// TODO
return nil
}
// InsertDataset inserts documents into a dataset index
func (e *elasticsearchEngine) InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error) {
// TODO
return []string{}, nil
}
// InsertMetadata inserts documents into tenant's metadata index
func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error) {
// TODO
return []string{}, nil
}


@ -46,6 +46,10 @@ type DocEngine interface {
DeleteIndex(ctx context.Context, indexName string) error
IndexExists(ctx context.Context, indexName string) (bool, error)
// Insert operations
InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error)
InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error)
// Document operations
IndexDocument(ctx context.Context, indexName, docID string, doc interface{}) error
BulkIndex(ctx context.Context, indexName string, docs []interface{}) (interface{}, error)


@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
infinity "github.com/infiniflow/infinity-go-sdk"
@ -344,3 +345,321 @@ func (e *infinityEngine) CreateDocMetaIndex(ctx context.Context, indexName strin
return nil
}
// InsertDataset inserts chunks into a dataset table
// Table name format: {tableNamePrefix}_{knowledgebaseID}
// Auto-create the table if it doesn't exist
// Transform chunks before insert:
// - docnm_kwd -> docnm
// - title_kwd/title_sm_tks -> docnm (if docnm_kwd not set)
// - content_with_weight/content_ltks/content_sm_ltks -> content
// - important_kwd -> important_keywords (+ important_kwd_empty_count)
// - question_kwd -> questions (joined with \n)
// - kb_id: list -> str (first element)
// - position_int: list -> hex_joined string
// - chunk_data: dict -> JSON string
// - meta_fields: dict -> JSON string
// - *_feas fields -> JSON string
// - keyword fields with list values -> ### joined string
// - Missing embeddings filled with zeros
// Delete existing rows with matching IDs before insert
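// Worked example (illustrative sketch, not exhaustive): an input chunk such as
//   {"id": "c1", "docnm_kwd": "report.pdf", "content_with_weight": "some text",
//    "important_kwd": ["alpha", ""], "kb_id": ["kb1"], "position_int": [[1, 2]]}
// is transformed before insert into roughly
//   {"id": "c1", "docnm": "report.pdf", "content": "some text",
//    "important_keywords": "alpha", "important_kwd_empty_count": 1,
//    "kb_id": "kb1", "position_int": "00000001_00000002"}
// plus zero-filled vectors for any embedding column missing from the input.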
func (e *infinityEngine) InsertDataset(ctx context.Context, chunks []map[string]interface{}, tableNamePrefix string, knowledgebaseID string) ([]string, error) {
tableName := fmt.Sprintf("%s_%s", tableNamePrefix, knowledgebaseID)
logger.Info("InfinityConnection.InsertDataset called", zap.String("tableName", tableName), zap.Int("chunkCount", len(chunks)))
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return nil, fmt.Errorf("Failed to get database: %w", err)
}
table, err := db.GetTable(tableName)
if err != nil {
// Table doesn't exist, try to create it
errMsg := strings.ToLower(err.Error())
if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") {
return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err)
}
// Infer vector size from chunks
vectorSize := 0
vectorPattern := regexp.MustCompile(`q_(\d+)_vec`)
for _, chunk := range chunks {
for key := range chunk {
matches := vectorPattern.FindStringSubmatch(key)
if len(matches) >= 2 {
vectorSize, _ = strconv.Atoi(matches[1])
break
}
}
if vectorSize > 0 {
break
}
}
if vectorSize == 0 {
return nil, fmt.Errorf("cannot infer vector size from chunks")
}
// Determine parser_id from chunk structure
parserID := ""
if chunkData, ok := chunks[0]["chunk_data"].(map[string]interface{}); ok && chunkData != nil {
parserID = "table"
}
// Create table
if err := e.CreateIndex(ctx, tableNamePrefix, knowledgebaseID, vectorSize, parserID); err != nil {
return nil, fmt.Errorf("Failed to create table: %w", err)
}
table, err = db.GetTable(tableName)
if err != nil {
return nil, fmt.Errorf("Failed to get table after creation: %w", err)
}
}
// Get embedding columns and their sizes
var embeddingCols [][2]interface{}
colsResp, err := table.ShowColumns()
if err != nil {
return nil, fmt.Errorf("Failed to get columns: %w", err)
}
result, ok := colsResp.(*infinity.QueryResult)
if !ok {
return nil, fmt.Errorf("unexpected response type: %T", colsResp)
}
// ShowColumns returns a result set where Data contains arrays of column values
re := regexp.MustCompile(`Embedding\([a-z]+,(\d+)\)`)
if nameArr, ok := result.Data["name"]; ok {
if typeArr, ok := result.Data["type"]; ok {
for i := 0; i < len(nameArr); i++ {
colName, _ := nameArr[i].(string)
colType, _ := typeArr[i].(string)
matches := re.FindStringSubmatch(colType)
if len(matches) >= 2 {
size, _ := strconv.Atoi(matches[1])
embeddingCols = append(embeddingCols, [2]interface{}{colName, size})
}
}
}
}
// Transform chunks
insertChunks := make([]map[string]interface{}, len(chunks))
for i, chunk := range chunks {
d := make(map[string]interface{})
for k, v := range chunk {
switch k {
case "docnm_kwd":
d["docnm"] = v
case "title_kwd":
if _, exists := chunk["docnm_kwd"]; !exists {
d["docnm"] = utility.ConvertToString(v)
}
case "title_sm_tks":
if _, exists := chunk["docnm_kwd"]; !exists {
d["docnm"] = utility.ConvertToString(v)
}
case "important_kwd":
if list, ok := v.([]interface{}); ok {
emptyCount := 0
tokens := make([]string, 0)
for _, item := range list {
if str, ok := item.(string); ok {
if str == "" {
emptyCount++
} else {
tokens = append(tokens, str)
}
}
}
d["important_keywords"] = strings.Join(tokens, ",")
d["important_kwd_empty_count"] = emptyCount
} else {
d["important_keywords"] = utility.ConvertToString(v)
}
case "important_tks":
if _, exists := chunk["important_kwd"]; !exists {
d["important_keywords"] = v
}
case "content_with_weight":
d["content"] = v
case "content_ltks":
if _, exists := chunk["content_with_weight"]; !exists {
d["content"] = v
}
case "content_sm_ltks":
if _, exists := chunk["content_with_weight"]; !exists {
d["content"] = v
}
case "authors_tks":
d["authors"] = v
case "authors_sm_tks":
if _, exists := chunk["authors_tks"]; !exists {
d["authors"] = v
}
case "question_kwd":
d["questions"] = strings.Join(utility.ConvertToStringSlice(v), "\n")
case "question_tks":
if _, exists := chunk["question_kwd"]; !exists {
d["questions"] = utility.ConvertToString(v)
}
case "kb_id":
if list, ok := v.([]interface{}); ok && len(list) > 0 {
d["kb_id"] = list[0]
} else {
d["kb_id"] = v
}
case "position_int":
if list, ok := v.([]interface{}); ok {
d["position_int"] = utility.ConvertPositionIntArrayToHex(list)
} else {
d["position_int"] = v
}
case "page_num_int", "top_int":
if list, ok := v.([]interface{}); ok {
d[k] = utility.ConvertIntArrayToHex(list)
} else {
d[k] = v
}
case "chunk_data":
d["chunk_data"] = utility.ConvertMapToJSONString(v)
default:
// Check for *_feas fields
if strings.HasSuffix(k, "_feas") {
jsonBytes, _ := json.Marshal(v)
d[k] = string(jsonBytes)
} else if fieldKeyword(k) {
// keyword fields with list values -> ### joined
if list, ok := v.([]interface{}); ok {
d[k] = strings.Join(utility.ConvertToStringSlice(list), "###")
} else {
d[k] = v
}
} else {
d[k] = v
}
}
}
// Remove intermediate token fields
for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks",
"content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks",
"question_kwd", "question_tks"} {
delete(d, key)
}
// Fill missing embedding columns with zeros (raw slice, matching Python SDK)
for _, ec := range embeddingCols {
name, size := ec[0].(string), ec[1].(int)
if _, exists := d[name]; !exists {
zeros := make([]float64, size)
for i := range zeros {
zeros[i] = 0
}
d[name] = zeros
}
}
insertChunks[i] = d
}
// Delete existing rows with matching IDs
if len(insertChunks) > 0 {
idList := make([]string, len(insertChunks))
for i, chunk := range insertChunks {
idList[i] = fmt.Sprintf("'%v'", chunk["id"])
}
filter := fmt.Sprintf("id IN (%s)", strings.Join(idList, ", "))
logger.Debug(fmt.Sprintf("Deleting existing rows with filter: %s", filter))
delResp, delErr := table.Delete(filter)
if delErr != nil {
logger.Warn(fmt.Sprintf("Failed to delete existing rows: %v", delErr))
} else {
logger.Info(fmt.Sprintf("Deleted %d existing rows", delResp.DeletedRows))
}
}
// Insert chunks to dataset
_, err = table.Insert(insertChunks)
if err != nil {
return nil, fmt.Errorf("Failed to insert chunks to dataset: %w", err)
}
logger.Info("InfinityConnection.InsertDataset result", zap.String("tableName", tableName), zap.Int("count", len(insertChunks)))
return []string{}, nil
}
// InsertMetadata inserts document metadata into tenant's metadata table
// Table name format: ragflow_doc_meta_{tenant_id}
// Auto-create the table if it doesn't exist
// Replace existing metadata with same id and kb_id
func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID)
logger.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))
db, err := e.client.conn.GetDatabase(e.client.dbName)
if err != nil {
return nil, fmt.Errorf("Failed to get database: %w", err)
}
table, err := db.GetTable(tableName)
if err != nil {
// Table doesn't exist, try to create it
errMsg := strings.ToLower(err.Error())
if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") {
return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err)
}
// Create metadata table
if createErr := e.CreateDocMetaIndex(ctx, tableName); createErr != nil {
return nil, fmt.Errorf("Failed to create metadata table: %w", createErr)
}
table, err = db.GetTable(tableName)
if err != nil {
return nil, fmt.Errorf("Failed to get table after creation: %w", err)
}
}
// Transform metadata - convert meta_fields map to JSON string
insertMetadata := make([]map[string]interface{}, len(metadata))
for i, m := range metadata {
d := make(map[string]interface{})
for k, v := range m {
if k == "meta_fields" {
d["meta_fields"] = utility.ConvertMapToJSONString(v)
} else {
d[k] = v
}
}
insertMetadata[i] = d
}
// Delete existing metadata with same id and kb_id, then insert new
if len(insertMetadata) > 0 {
idList := make([]string, len(insertMetadata))
for i, m := range insertMetadata {
docID := fmt.Sprintf("'%v'", m["id"])
kbID := fmt.Sprintf("'%v'", m["kb_id"])
idList[i] = fmt.Sprintf("(id = %s AND kb_id = %s)", docID, kbID)
}
filter := strings.Join(idList, " OR ")
logger.Debug(fmt.Sprintf("Deleting existing metadata with filter: %s", filter))
delResp, delErr := table.Delete(filter)
if delErr != nil {
logger.Warn(fmt.Sprintf("Failed to delete existing metadata: %v", delErr))
} else if delResp.DeletedRows > 0 {
logger.Info(fmt.Sprintf("Deleted %d existing metadata entries", delResp.DeletedRows))
}
}
// Insert metadata
_, err = table.Insert(insertMetadata)
if err != nil {
return nil, fmt.Errorf("Failed to insert metadata: %w", err)
}
logger.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata)))
return []string{}, nil
}


@ -17,8 +17,11 @@
package handler
import (
"encoding/json"
"net/http"
"os"
"ragflow/internal/common"
"ragflow/internal/engine"
"ragflow/internal/service"
"strconv"
"strings"
@ -703,3 +706,92 @@ func (h *KnowledgebaseHandler) DeleteIndex(c *gin.Context) {
jsonResponse(c, common.CodeSuccess, nil, "success")
}
// InsertDatasetFromFileRequest request for inserting chunks into dataset from file
type InsertDatasetFromFileRequest struct {
FilePath string `json:"file_path" binding:"required"`
}
// @Summary Insert chunks into dataset from file
// @Description Internal: Insert into dataset table from a JSON file (table name extracted from file)
// @Tags knowledgebase
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body InsertDatasetFromFileRequest true "insert dataset request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/kb/insert_from_file [post]
func (h *KnowledgebaseHandler) InsertDatasetFromFile(c *gin.Context) {
_, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
var req InsertDatasetFromFileRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": err.Error(),
})
return
}
if req.FilePath == "" {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "file_path is required",
})
return
}
// Read the JSON file
data, err := os.ReadFile(req.FilePath)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "failed to read file: " + err.Error(),
})
return
}
// Parse JSON - format: {"table_name": ..., "knowledgebase_id": ..., "chunks": [...]}
var debugFormat struct {
TableNamePrefix string `json:"table_name"`
KnowledgebaseID string `json:"knowledgebase_id"`
Chunks []map[string]interface{} `json:"chunks"`
}
if err := json.Unmarshal(data, &debugFormat); err != nil || debugFormat.Chunks == nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "invalid JSON format: expected {\"table_name\": ..., \"knowledgebase_id\": ..., \"chunks\": [...]}",
})
return
}
if len(debugFormat.Chunks) == 0 {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "no chunks found in file",
})
return
}
// Get the document engine and insert
docEngine := engine.Get()
result, err := docEngine.InsertDataset(c.Request.Context(), debugFormat.Chunks, debugFormat.TableNamePrefix, debugFormat.KnowledgebaseID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"code": 500,
"message": "failed to insert into dataset: " + err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"data": result,
"message": "success",
})
}


@ -17,11 +17,14 @@
package handler
import (
"encoding/json"
"net/http"
"os"
"github.com/gin-gonic/gin"
"ragflow/internal/common"
"ragflow/internal/engine"
"ragflow/internal/service"
)
@ -177,3 +180,93 @@ func (h *TenantHandler) DeleteDocMetaIndex(c *gin.Context) {
"data": nil,
})
}
// InsertMetadataFromFileRequest request for inserting metadata from file
type InsertMetadataFromFileRequest struct {
FilePath string `json:"file_path" binding:"required"`
}
// @Summary Insert document metadata from JSON file
// @Description Internal: Insert metadata into tenant's metadata table from a JSON file
// @Tags tenants
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body InsertMetadataFromFileRequest true "insert metadata request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/tenant/insert_metadata_from_file [post]
func (h *TenantHandler) InsertMetadataFromFile(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
var req InsertMetadataFromFileRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": err.Error(),
})
return
}
if req.FilePath == "" {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "file_path is required",
})
return
}
// Read the JSON file
data, err := os.ReadFile(req.FilePath)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "failed to read file: " + err.Error(),
})
return
}
// Parse JSON - format: {"chunks": [...]}
var inputFormat struct {
Chunks []map[string]interface{} `json:"chunks"`
}
if err := json.Unmarshal(data, &inputFormat); err != nil || inputFormat.Chunks == nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "invalid JSON format: expected {\"chunks\": [...]}",
})
return
}
if len(inputFormat.Chunks) == 0 {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "no chunks found in file",
})
return
}
// Use user.ID as tenant ID (user IS the tenant in user mode)
tenantID := user.ID
// Get the document engine and insert
docEngine := engine.Get()
result, err := docEngine.InsertMetadata(c.Request.Context(), inputFormat.Chunks, tenantID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"code": 500,
"message": "failed to insert metadata: " + err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"data": result,
"message": "success",
})
}


@ -213,6 +213,7 @@ func (r *Router) Setup(engine *gin.Engine) {
kb.GET("/basic_info", r.knowledgebaseHandler.GetBasicInfo)
kb.POST("/index", r.knowledgebaseHandler.CreateIndex)
kb.DELETE("/index", r.knowledgebaseHandler.DeleteIndex)
kb.POST("/insert_from_file", r.knowledgebaseHandler.InsertDatasetFromFile)
// KB ID specific routes
kbByID := kb.Group("/:kb_id")
@ -230,6 +231,7 @@ func (r *Router) Setup(engine *gin.Engine) {
{
tenant.POST("/doc_meta_index", r.tenantHandler.CreateDocMetaIndex)
tenant.DELETE("/doc_meta_index", r.tenantHandler.DeleteDocMetaIndex)
tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile)
}
// Document routes


@ -17,6 +17,7 @@
package utility
import (
"encoding/json"
"fmt"
"os"
"strconv"
@ -142,6 +143,26 @@ func ConvertHexToPositionIntArray(hexStr string) interface{} {
return result
}
// ConvertPositionIntArrayToHex converts position_int list (2D) to hex string
// e.g. [[1,2],[3,4]] -> "00000001_00000002_00000003_00000004"
func ConvertPositionIntArrayToHex(list []interface{}) string {
var hexParts []string
for _, item := range list {
if inner, ok := item.([]interface{}); ok {
for _, num := range inner {
if n, ok := num.(float64); ok {
hexParts = append(hexParts, fmt.Sprintf("%08x", int64(n)))
} else if n, ok := num.(int64); ok {
hexParts = append(hexParts, fmt.Sprintf("%08x", n))
} else if n, ok := num.(int); ok {
hexParts = append(hexParts, fmt.Sprintf("%08x", n))
}
}
}
}
return strings.Join(hexParts, "_")
}
// ConvertHexToIntArray converts hex string to int array (split by "_")
func ConvertHexToIntArray(hexStr string) interface{} {
if hexStr == "" {
@ -167,6 +188,22 @@ func ConvertHexToIntArray(hexStr string) interface{} {
return result
}
// ConvertIntArrayToHex converts int array to hex string
// e.g. [1, 2] -> "00000001_00000002"
func ConvertIntArrayToHex(list []interface{}) string {
var hexParts []string
for _, num := range list {
if n, ok := num.(float64); ok {
hexParts = append(hexParts, fmt.Sprintf("%08x", int64(n)))
} else if n, ok := num.(int64); ok {
hexParts = append(hexParts, fmt.Sprintf("%08x", n))
} else if n, ok := num.(int); ok {
hexParts = append(hexParts, fmt.Sprintf("%08x", n))
}
}
return strings.Join(hexParts, "_")
}
// IsEmpty checks if value is empty (nil, empty array, or empty string)
func IsEmpty(v interface{}) bool {
if v == nil {
@ -217,3 +254,71 @@ func ToFloat64(val interface{}) (float64, bool) {
return 0, false
}
}
// ConvertToStringSlice converts an interface{} to []string
// e.g. []interface{}{"a", "b", "c"} -> []string{"a", "b", "c"}
// e.g. "hello" -> []string{"hello"}
func ConvertToStringSlice(v interface{}) []string {
if v == nil {
return nil
}
switch val := v.(type) {
case []interface{}:
result := make([]string, 0, len(val))
for _, item := range val {
if s, ok := item.(string); ok {
result = append(result, s)
} else {
result = append(result, fmt.Sprintf("%v", item))
}
}
return result
case []string:
return val
case string:
return []string{val}
default:
return nil
}
}
// ConvertToString converts an interface{} to space-separated string
// For []interface{}, joins elements with space; for other types, returns string representation
// e.g. []interface{}{"a", "b", "c"} -> "a b c"
// e.g. "hello" -> "hello"
func ConvertToString(v interface{}) string {
if v == nil {
return ""
}
switch val := v.(type) {
case []interface{}:
parts := make([]string, 0, len(val))
for _, item := range val {
if s, ok := item.(string); ok {
parts = append(parts, s)
} else {
parts = append(parts, fmt.Sprintf("%v", item))
}
}
return strings.Join(parts, " ")
default:
return fmt.Sprintf("%v", v)
}
}
// ConvertMapToJSONString converts a map to JSON string for Infinity JSON columns
// If v is a map[string]interface{}, marshals it to JSON string
// If v is nil, returns "{}"
// Otherwise returns v as-is
//
// e.g. map[string]interface{}{"key": "value"} -> "{\"key\":\"value\"}"
func ConvertMapToJSONString(v interface{}) interface{} {
if v == nil {
return "{}"
}
if m, ok := v.(map[string]interface{}); ok {
jsonBytes, _ := json.Marshal(m)
return string(jsonBytes)
}
return v
}


@ -319,6 +319,20 @@ class InfinityConnection(InfinityConnectionBase):
return res_fields.get(chunk_id, None)
def insert(self, documents: list[dict], index_name: str, knowledgebase_id: str = None) -> list[str]:
'''
# Save input to file to test inserting from file in GO
import datetime
import os
debug_file = os.path.join("/var/infinity/tmp", f"insert_{index_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json")
with open(debug_file, 'w') as f:
json.dump({
"table_name": index_name,
"knowledgebase_id": knowledgebase_id,
"chunks": documents
}, f, indent=2)
self.logger.debug(f"Saved insert input to {debug_file}")
'''
inf_conn = self.connPool.get_conn()
try:
db_instance = inf_conn.get_database(self.dbName)