diff --git a/cmd/server_main.go b/cmd/server_main.go index 01713473d..e7d629216 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -203,7 +203,7 @@ func startServer(config *server.Config) { // Initialize handler layer authHandler := handler.NewAuthHandler() userHandler := handler.NewUserHandler(userService) - tenantHandler := handler.NewTenantHandler(tenantService, userService) + tenantHandler := handler.NewTenantHandler(tenantService, userService, knowledgebaseService) documentHandler := handler.NewDocumentHandler(documentService, datasetsService) datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService) systemHandler := handler.NewSystemHandler(systemService) diff --git a/internal/cli/client.go b/internal/cli/client.go index 20efd2e65..15df81682 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -309,17 +309,16 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.ListTasksUserCommand(cmd) case "show_task_user_command": return c.ShowTaskUserCommand(cmd) - // Dataset, metadata commands - case "create_dataset_table": - return c.CreateDatasetInDocEngine(cmd) - case "drop_dataset_table": - return c.DropDatasetInDocEngine(cmd) - case "create_metadata_table": - return c.CreateMetadataInDocEngine(cmd) - case "drop_metadata_table": - return c.DropMetadataInDocEngine(cmd) - case "insert_dataset_from_file": - return c.InsertDatasetFromFile(cmd) + case "create_chunk_store": + return c.CreateChunkStore(cmd) + case "drop_chunk_store": + return c.DropChunkStore(cmd) + case "create_metadata_store": + return c.CreateMetadataStore(cmd) + case "drop_metadata_store": + return c.DropMetadataStore(cmd) + case "insert_chunks_from_file": + return c.InsertChunksFromFile(cmd) case "insert_metadata_from_file": return c.InsertMetadataFromFile(cmd) case "update_chunk": @@ -328,6 +327,8 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.GetChunk(cmd) case "set_meta": return c.SetMeta(cmd) + case "delete_meta": + return c.DeleteMeta(cmd) case "rm_tags": return c.RmTags(cmd) case "remove_chunks": diff --git a/internal/cli/lexer.go b/internal/cli/lexer.go index 8676d8dbf..ea9075908 100644 --- a/internal/cli/lexer.go +++ b/internal/cli/lexer.go @@ -222,18 +222,6 @@ func (l *Lexer) lookupIdent(ident string) Token { case "PASSWORD": return Token{Type: TokenPassword, Value: ident} case "DATASET": - // Check if followed by TABLE for compound token - if strings.ToUpper(l.peekToken()) == "TABLE" { - // Skip whitespace to TABLE - for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' { - l.readChar() - } - // Skip past TABLE - for isLetter(l.ch) || isDigit(l.ch) || l.ch == '_' || l.ch == '-' || l.ch == '.' { - l.readChar() - } - return Token{Type: TokenDatasetTable, Value: "DATASET TABLE"} - } return Token{Type: TokenDataset, Value: ident} case "DATASETS": return Token{Type: TokenDatasets, Value: ident} @@ -327,6 +315,8 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenHigh, Value: ident} case "MAX": return Token{Type: TokenMax, Value: ident} + case "STORE": + return Token{Type: TokenStore, Value: ident} case "STREAM": return Token{Type: TokenStream, Value: ident} case "LS": @@ -430,6 +420,18 @@ func (l *Lexer) lookupIdent(ident string) Token { case "REMOVE": return Token{Type: TokenRemove, Value: ident} case "CHUNK": + // Check if followed by STORE for compound token + if strings.ToUpper(l.peekToken()) == "STORE" { + // Skip whitespace to STORE + for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' { + l.readChar() + } + // Skip past STORE + for isLetter(l.ch) || isDigit(l.ch) || l.ch == '_' || l.ch == '-' || l.ch == '.' { + l.readChar() + } + return Token{Type: TokenChunkStore, Value: "CHUNK STORE"} + } return Token{Type: TokenChunk, Value: ident} case "CHUNKS": return Token{Type: TokenChunks, Value: ident} @@ -465,6 +467,8 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenDebug, Value: ident} case "INFO": return Token{Type: TokenInfo, Value: ident} + case "IN": + return Token{Type: TokenIn, Value: ident} case "WARN": return Token{Type: TokenWarn, Value: ident} case "ERROR": diff --git a/internal/cli/types.go b/internal/cli/types.go index 9e9f123ab..ed662843e 100644 --- a/internal/cli/types.go +++ b/internal/cli/types.go @@ -47,7 +47,7 @@ const ( TokenPassword TokenDataset TokenDatasets - TokenDatasetTable + TokenChunkStore TokenOf TokenAgents TokenRole @@ -91,6 +91,7 @@ const ( TokenParse TokenImport TokenInto + TokenIn TokenWith TokenParser TokenPipeline @@ -122,6 +123,7 @@ const ( TokenIndex TokenVector TokenSize + TokenStore TokenName // For ALTER PROVIDER NAME TokenBalance TokenInstance diff --git a/internal/cli/user_command.go b/internal/cli/user_command.go index 67618a543..448295445 100644 --- a/internal/cli/user_command.go +++ b/internal/cli/user_command.go @@ -875,8 +875,8 @@ func (c *RAGFlowClient) UnsetToken(cmd *Command) (ResponseIf, error) { return &result, nil } -// CreateDataset creates a table for a dataset -func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) { +// CreateChunkStore creates a chunk store in doc engine +func (c *RAGFlowClient) CreateChunkStore(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } @@ -902,13 +902,13 @@ func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) { "vector_size": vectorSize, } - resp, err := c.HTTPClient.Request("POST", "/kb/doc_engine_table", "web", nil, payload) + resp, err := c.HTTPClient.Request("POST", "/tenant/chunk_store", "web", nil, payload) if err != nil { - return nil, fmt.Errorf("failed to create table: %w", err) + return nil, fmt.Errorf("failed to create chunk store: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to create table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to create chunk store: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -924,73 +924,16 @@ func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) { var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = fmt.Sprintf("Success to create table for dataset: %s", datasetName) + result.Message = fmt.Sprintf("Success to create chunk store for dataset: %s", datasetName) } else { - result.Message = fmt.Sprintf("Failed to create table: %v", resJSON) + result.Message = fmt.Sprintf("Failed to create chunk store: %v", resJSON) } result.Duration = 0 return &result, nil } -// CreateDatasetInDocEngine creates a table for a dataset in doc engine -func (c *RAGFlowClient) CreateDatasetInDocEngine(cmd *Command) (ResponseIf, error) { - if c.ServerType != "user" { - return nil, fmt.Errorf("this command is only allowed in USER mode") - } - - datasetName, ok := cmd.Params["dataset_name"].(string) - if !ok { - return nil, fmt.Errorf("dataset_name not provided") - } - - vectorSize, ok := cmd.Params["vector_size"].(int) - if !ok { - return nil, fmt.Errorf("vector_size not provided") - } - - // Get dataset ID by name - datasetID, err := c.getDatasetID(datasetName) - if err != nil { - return nil, err - } - - payload := map[string]interface{}{ - "kb_id": datasetID, - "vector_size": vectorSize, - } - - resp, err := c.HTTPClient.Request("POST", "/kb/doc_engine_table", "web", nil, payload) - if err != nil { - return nil, fmt.Errorf("failed to create table: %w", err) - } - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to create table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) - } - - resJSON, err := resp.JSON() - if err != nil { - return nil, fmt.Errorf("invalid JSON response: %w", err) - } - - code, ok := resJSON["code"].(float64) - if !ok { - return nil, fmt.Errorf("invalid response format: code is not a number") - } - - var result SimpleResponse - result.Code = int(code) - if result.Code == 0 { - result.Message = fmt.Sprintf("Success to create table for dataset: %s", datasetName) - } else { - result.Message = fmt.Sprintf("Failed to create table: %v", resJSON) - } - result.Duration = 0 - return &result, nil -} - -// DropDatasetInDocEngine drops a table for a dataset in doc engine -func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error) { +// DropChunkStore drops a chunk store in doc engine +func (c *RAGFlowClient) DropChunkStore(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } @@ -1010,7 +953,7 @@ func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error) "kb_id": datasetID, } - resp, err := c.HTTPClient.Request("DELETE", "/kb/doc_engine_table", "web", nil, payload) + resp, err := c.HTTPClient.Request("DELETE", "/tenant/chunk_store", "web", nil, payload) if err != nil { return nil, fmt.Errorf("failed to drop dataset: %w", err) } @@ -1032,27 +975,27 @@ func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error) var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = fmt.Sprintf("Success to drop table for dataset: %s", datasetName) + result.Message = fmt.Sprintf("Success to drop chunk store for dataset: %s", datasetName) } else { - result.Message = fmt.Sprintf("Failed to drop table for dataset: %s: %v", datasetName, resJSON) + result.Message = fmt.Sprintf("Failed to drop chunk store for dataset: %s: %v", datasetName, resJSON) } result.Duration = 0 return &result, nil } -// CreateMetadataInDocEngine creates the document metadata table for the tenant -func (c *RAGFlowClient) CreateMetadataInDocEngine(cmd *Command) (ResponseIf, error) { +// CreateMetadataStore creates the document metadata store for the tenant +func (c *RAGFlowClient) CreateMetadataStore(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } - resp, err := c.HTTPClient.Request("POST", "/tenant/doc_engine_metadata_table", "web", nil, nil) + resp, err := c.HTTPClient.Request("POST", "/tenant/metadata_store", "web", nil, nil) if err != nil { - return nil, fmt.Errorf("failed to create metadata table: %w", err) + return nil, fmt.Errorf("failed to create metadata store: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to create metadata table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to create metadata store: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -1068,27 +1011,27 @@ func (c *RAGFlowClient) CreateMetadataInDocEngine(cmd *Command) (ResponseIf, err var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = "Success to create metadata table" + result.Message = "Success to create metadata store" } else { - result.Message = fmt.Sprintf("Failed to create metadata table: %v", resJSON) + result.Message = fmt.Sprintf("Failed to create metadata store: %v", resJSON) } result.Duration = 0 return &result, nil } -// DropMetadataInDocEngine drops the document metadata table for the tenant -func (c *RAGFlowClient) DropMetadataInDocEngine(cmd *Command) (ResponseIf, error) { +// DropMetadataStore drops the document metadata store for the tenant +func (c *RAGFlowClient) DropMetadataStore(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } - resp, err := c.HTTPClient.Request("DELETE", "/tenant/doc_engine_metadata_table", "web", nil, nil) + resp, err := c.HTTPClient.Request("DELETE", "/tenant/metadata_store", "web", nil, nil) if err != nil { - return nil, fmt.Errorf("failed to drop metadata table: %w", err) + return nil, fmt.Errorf("failed to drop metadata store: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to drop metadata table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to drop metadata store: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -1104,9 +1047,9 @@ func (c *RAGFlowClient) DropMetadataInDocEngine(cmd *Command) (ResponseIf, error var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = "Success to drop metadata table" + result.Message = "Success to drop metadata store" } else { - result.Message = fmt.Sprintf("Failed to drop metadata table: %v", resJSON) + result.Message = fmt.Sprintf("Failed to drop metadata store: %v", resJSON) } result.Duration = 0 return &result, nil @@ -2838,8 +2781,8 @@ func (c *RAGFlowClient) CESearch(cmd *Command) (ResponseIf, error) { return &response, nil } -// InsertDatasetFromFile inserts dataset chunks from a JSON file -func (c *RAGFlowClient) InsertDatasetFromFile(cmd *Command) (ResponseIf, error) { +// InsertChunksFromFile inserts chunks from a JSON file +func (c *RAGFlowClient) InsertChunksFromFile(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } @@ -2852,7 +2795,7 @@ func (c *RAGFlowClient) InsertDatasetFromFile(cmd *Command) (ResponseIf, error) "file_path": filePath, } - resp, err := c.HTTPClient.Request("POST", "/kb/insert_from_file", "web", nil, payload) + resp, err := c.HTTPClient.Request("POST", "/tenant/insert_chunks_from_file", "web", nil, payload) if err != nil { return nil, fmt.Errorf("failed to insert dataset from file: %w", err) } @@ -2938,42 +2881,25 @@ func (c *RAGFlowClient) UpdateChunk(cmd *Command) (ResponseIf, error) { return nil, fmt.Errorf("chunk_id not provided") } + docID, ok := cmd.Params["doc_id"].(string) + if !ok { + return nil, fmt.Errorf("doc_id not provided") + } + datasetName, ok := cmd.Params["dataset_name"].(string) if !ok { return nil, fmt.Errorf("dataset_name not provided") } - jsonBody, ok := cmd.Params["json_body"].(string) - if !ok { - return nil, fmt.Errorf("json_body not provided") - } - // Look up dataset_id from dataset_name datasetID, err := c.getDatasetID(datasetName) if err != nil { return nil, fmt.Errorf("failed to get dataset ID: %w", err) } - // Try to get doc_id from the chunk retrieval endpoint - getResp, err := c.HTTPClient.Request("GET", "/chunk/get?chunk_id="+chunkID, "web", nil, nil) - if err != nil { - return nil, fmt.Errorf("failed to get chunk info: %w", err) - } - - var docID string - if getResp.StatusCode == 200 { - getJSON, err := getResp.JSON() - if err == nil { - if data, ok := getJSON["data"].(map[string]interface{}); ok { - if d, ok := data["doc_id"].(string); ok { - docID = d - } - } - } - } - - if docID == "" { - return nil, fmt.Errorf("could not find document_id for chunk %s. Please provide document_id explicitly", chunkID) + jsonBody, ok := cmd.Params["json_body"].(string) + if !ok { + return nil, fmt.Errorf("json_body not provided") } // Parse the JSON body @@ -3028,21 +2954,16 @@ func (c *RAGFlowClient) GetChunk(cmd *Command) (ResponseIf, error) { return nil, fmt.Errorf("chunk_id not provided") } - datasetName, ok := cmd.Params["dataset_name"].(string) - if !ok { - return nil, fmt.Errorf("dataset_name not provided") - } - - datasetID, err := c.getDatasetID(datasetName) - if err != nil { - return nil, err - } - docID, ok := cmd.Params["doc_id"].(string) if !ok { return nil, fmt.Errorf("doc_id not provided") } + datasetID, ok := cmd.Params["dataset_id"].(string) + if !ok { + return nil, fmt.Errorf("dataset_id not provided") + } + resp, err := c.HTTPClient.Request("GET", fmt.Sprintf("/datasets/%s/documents/%s/chunks/%s", datasetID, docID, chunkID), "web", nil, nil) if err != nil { return nil, fmt.Errorf("failed to get chunk: %w", err) @@ -3116,6 +3037,57 @@ func (c *RAGFlowClient) SetMeta(cmd *Command) (ResponseIf, error) { return &result, nil } +// DeleteMeta deletes metadata for a document +// If keys is provided, deletes specific keys; otherwise deletes entire document metadata +func (c *RAGFlowClient) DeleteMeta(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + docID, ok := cmd.Params["doc_id"].(string) + if !ok { + return nil, fmt.Errorf("doc_id not provided") + } + + payload := map[string]interface{}{ + "doc_id": docID, + } + + // If keys provided, include in payload for deleting specific keys + if keysJSON, ok := cmd.Params["keys"].(string); ok { + payload["keys"] = keysJSON + } + + resp, err := c.HTTPClient.Request("POST", "/document/delete_meta", "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to delete metadata: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to delete metadata: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = fmt.Sprintf("Success to delete metadata for document: %s", docID) + } else { + result.Message = fmt.Sprintf("Failed to delete metadata: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} + // RmTags removes tags from chunks in a dataset func (c *RAGFlowClient) RmTags(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { @@ -3177,15 +3149,24 @@ func (c *RAGFlowClient) RemoveChunks(cmd *Command) (ResponseIf, error) { return nil, fmt.Errorf("this command is only allowed in USER mode") } + datasetName, ok := cmd.Params["dataset_name"].(string) + if !ok { + return nil, fmt.Errorf("dataset_name not provided") + } + docID, ok := cmd.Params["doc_id"].(string) if !ok { return nil, fmt.Errorf("doc_id not provided") } - payload := map[string]interface{}{ - "doc_id": docID, + // Look up dataset ID by name + datasetID, err := c.getDatasetID(datasetName) + if err != nil { + return nil, fmt.Errorf("dataset not found: %w", err) } + payload := map[string]interface{}{} + // Check if delete_all is set if deleteAll, ok := cmd.Params["delete_all"].(bool); ok && deleteAll { payload["delete_all"] = true @@ -3193,7 +3174,7 @@ func (c *RAGFlowClient) RemoveChunks(cmd *Command) (ResponseIf, error) { payload["chunk_ids"] = chunkIDs } - resp, err := c.HTTPClient.Request("POST", "/chunk/rm", "web", nil, payload) + resp, err := c.HTTPClient.Request("DELETE", "/datasets/"+datasetID+"/documents/"+docID+"/chunks", "web", nil, payload) if err != nil { return nil, fmt.Errorf("failed to remove chunks: %w", err) } diff --git a/internal/cli/user_parser.go b/internal/cli/user_parser.go index 6b481af66..2575d2517 100644 --- a/internal/cli/user_parser.go +++ b/internal/cli/user_parser.go @@ -621,10 +621,10 @@ func (p *Parser) parseCreateCommand() (*Command, error) { return p.parseCreateChat() case TokenToken: return p.parseCreateToken() - case TokenDatasetTable: - return p.parseCreateDatasetTable() + case TokenChunkStore: + return p.parseCreateChunkStore() case TokenMetadata: - return p.parseCreateMetadataTable() + return p.parseCreateMetadataStore() case TokenProvider: return p.parseCreateProviderInstance() default: @@ -656,9 +656,21 @@ func (p *Parser) parseCreateToken() (*Command, error) { } // Internal CLI for GO -// parseCreateDatasetTable parses: CREATE DATASET TABLE 'name' VECTOR SIZE N -func (p *Parser) parseCreateDatasetTable() (*Command, error) { - p.nextToken() // consume DATASET TABLE compound token +// parseCreateChunkStore parses: CREATE CHUNK STORE for Dataset 'name' VECTOR SIZE N +func (p *Parser) parseCreateChunkStore() (*Command, error) { + p.nextToken() // consume CHUNK STORE compound token + + // Expect FOR + if p.curToken.Type != TokenFor { + return nil, fmt.Errorf("expected FOR after CHUNK STORE, got %s", p.curToken.Value) + } + p.nextToken() + + // Expect Dataset + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected Dataset after FOR, got %s", p.curToken.Value) + } + p.nextToken() datasetName, err := p.parseQuotedString() if err != nil { @@ -688,20 +700,20 @@ func (p *Parser) parseCreateDatasetTable() (*Command, error) { p.nextToken() } - cmd := NewCommand("create_dataset_table") + cmd := NewCommand("create_chunk_store") cmd.Params["dataset_name"] = datasetName cmd.Params["vector_size"] = vectorSize return cmd, nil } // Internal CLI for GO -// parseCreateMetadataTable parses: CREATE METADATA TABLE -func (p *Parser) parseCreateMetadataTable() (*Command, error) { - // CREATE METADATA TABLE +// parseCreateMetadataStore parses: CREATE METADATA STORE +func (p *Parser) parseCreateMetadataStore() (*Command, error) { + // CREATE METADATA STORE p.nextToken() // consume METADATA - if p.curToken.Type != TokenTable { - return nil, fmt.Errorf("expected TABLE after METADATA, got %s", p.curToken.Value) + if p.curToken.Type != TokenStore { + return nil, fmt.Errorf("expected STORE after METADATA, got %s", p.curToken.Value) } p.nextToken() @@ -709,7 +721,7 @@ func (p *Parser) parseCreateMetadataTable() (*Command, error) { p.nextToken() } - return NewCommand("create_metadata_table"), nil + return NewCommand("create_metadata_store"), nil } func (p *Parser) parseCreateUser() (*Command, error) { @@ -1039,10 +1051,10 @@ func (p *Parser) parseDropCommand() (*Command, error) { return p.parseDropChat() case TokenToken: return p.parseDropToken() - case TokenDatasetTable: - return p.parseDropDatasetTable() + case TokenChunkStore: + return p.parseDropChunkStore() case TokenMetadata: - return p.parseDropMetadataTable() + return p.parseDropMetadataStore() case TokenInstance: return p.parseDropInstance() case TokenModel: @@ -1058,8 +1070,10 @@ func (p *Parser) parseDeleteCommand() (*Command, error) { switch p.curToken.Type { case TokenProvider: return p.parseDeleteProvider() + case TokenMetadata: + return p.parseDeleteMeta() default: - return nil, fmt.Errorf("unknown DROP target: %s", p.curToken.Value) + return nil, fmt.Errorf("unknown DELETE target: %s", p.curToken.Value) } } @@ -1108,9 +1122,21 @@ func (p *Parser) parseDropToken() (*Command, error) { } // Internal CLI for GO -// parseDropDatasetTable parses: DROP DATASET TABLE 'name' -func (p *Parser) parseDropDatasetTable() (*Command, error) { - p.nextToken() // consume DATASET TABLE +// parseDropChunkStore parses: DROP CHUNK STORE for Dataset 'name' +func (p *Parser) parseDropChunkStore() (*Command, error) { + p.nextToken() // consume CHUNK STORE + + // Expect FOR + if p.curToken.Type != TokenFor { + return nil, fmt.Errorf("expected FOR after CHUNK STORE, got %s", p.curToken.Value) + } + p.nextToken() + + // Expect Dataset + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected Dataset after FOR, got %s", p.curToken.Value) + } + p.nextToken() datasetName, err := p.parseQuotedString() if err != nil { @@ -1122,26 +1148,25 @@ func (p *Parser) parseDropDatasetTable() (*Command, error) { p.nextToken() } - cmd := NewCommand("drop_dataset_table") + cmd := NewCommand("drop_chunk_store") cmd.Params["dataset_name"] = datasetName return cmd, nil } -// Internal CLI for GO -// parseDropMetadataTable parses: DROP METADATA TABLE -func (p *Parser) parseDropMetadataTable() (*Command, error) { - // DROP METADATA TABLE +// parseDropMetadataStore parses: DROP METADATA STORE +func (p *Parser) parseDropMetadataStore() (*Command, error) { + // DROP METADATA STORE p.nextToken() // consume METADATA - if p.curToken.Type != TokenTable { - return nil, fmt.Errorf("expected TABLE after METADATA, got %s", p.curToken.Value) + if p.curToken.Type != TokenStore { + return nil, fmt.Errorf("expected STORE after METADATA, got %s", p.curToken.Value) } p.nextToken() if p.curToken.Type == TokenSemicolon { p.nextToken() } - cmd := NewCommand("drop_metadata_table") + cmd := NewCommand("drop_metadata_store") return cmd, nil } @@ -1545,7 +1570,7 @@ func (p *Parser) parseShowInstance() (*Command, error) { return cmd, nil } -// parseShowInstance parses SHOW BALANCE FROM +// parseShowBalance parses SHOW BALANCE FROM func (p *Parser) parseShowBalance() (*Command, error) { p.nextToken() // consume INSTANCE @@ -2166,20 +2191,20 @@ func (p *Parser) parseImportCommand() (*Command, error) { func (p *Parser) parseInsertCommand() (*Command, error) { p.nextToken() // consume INSERT - // Expect DATASET or METADATA - if p.curToken.Type == TokenDataset { - return p.parseInsertDatasetFromFile() + // Expect CHUNKS or METADATA + if p.curToken.Type == TokenChunks { + return p.parseInsertChunksFromFile() } if p.curToken.Type == TokenMetadata { return p.parseInsertMetadataFromFile() } - return nil, fmt.Errorf("expected DATASET or METADATA after INSERT, got %s", p.curToken.Value) + return nil, fmt.Errorf("expected CHUNKS or METADATA after INSERT, got %s", p.curToken.Value) } // Internal CLI for GO -// parseInsertDatasetFromFile parses: INSERT DATASET FROM FILE "file_path" -func (p *Parser) parseInsertDatasetFromFile() (*Command, error) { - p.nextToken() // consume DATASET +// parseInsertChunksFromFile parses: INSERT CHUNKS FROM FILE "file_path" +func (p *Parser) parseInsertChunksFromFile() (*Command, error) { + p.nextToken() // consume CHUNKS // Expect FROM if p.curToken.Type != TokenFrom { @@ -2199,7 +2224,7 @@ func (p *Parser) parseInsertDatasetFromFile() (*Command, error) { return nil, err } - cmd := NewCommand("insert_dataset_from_file") + cmd := NewCommand("insert_chunks_from_file") cmd.Params["file_path"] = filePath p.nextToken() @@ -3261,6 +3286,8 @@ func (p *Parser) parseUserStatement() (*Command, error) { switch p.curToken.Type { case TokenPing: return p.parsePingServer() + case TokenDelete: + return p.parseDeleteCommand() case TokenShow: return p.parseShowCommand() case TokenCreate: @@ -3389,7 +3416,7 @@ func (p *Parser) parseGetCommand() (*Command, error) { return nil, fmt.Errorf("unknown GET target: %s", p.curToken.Value) } -// parseGetChunk parses: GET CHUNK 'chunk_id' OF DATASET 'dataset_name' DOCUMENT 'doc_id' +// parseGetChunk parses: GET CHUNK 'chunk_id' OF DOCUMENT 'doc_id' IN DATASET 'dataset_id' func (p *Parser) parseGetChunk() (*Command, error) { p.nextToken() // consume CHUNK @@ -3408,21 +3435,8 @@ func (p *Parser) parseGetChunk() (*Command, error) { } p.nextToken() - if p.curToken.Type != TokenDataset { - return nil, fmt.Errorf("expected DATASET after OF") - } - p.nextToken() - - // Parse dataset_name - datasetName, err := p.parseQuotedString() - if err != nil { - return nil, fmt.Errorf("expected dataset_name: %w", err) - } - cmd.Params["dataset_name"] = datasetName - - p.nextToken() if p.curToken.Type != TokenDocument { - return nil, fmt.Errorf("expected DOCUMENT after dataset_name") + return nil, fmt.Errorf("expected DOCUMENT after OF") } p.nextToken() @@ -3433,6 +3447,24 @@ func (p *Parser) parseGetChunk() (*Command, error) { } cmd.Params["doc_id"] = docID + p.nextToken() + if p.curToken.Type != TokenIn { + return nil, fmt.Errorf("expected IN after doc_id") + } + p.nextToken() + + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected DATASET after IN") + } + p.nextToken() + + // Parse dataset_id + datasetID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected dataset_id: %w", err) + } + cmd.Params["dataset_id"] = datasetID + p.nextToken() // Semicolon is optional if p.curToken.Type == TokenSemicolon { @@ -3455,7 +3487,7 @@ func (p *Parser) parseUpdateCommand() (*Command, error) { } // Internal CLI for GO -// parseUpdateChunk parses: UPDATE CHUNK 'chunk_id' OF DATASET 'dataset_name' SET '{"content": "..."}' +// parseUpdateChunk parses: UPDATE CHUNK 'chunk_id' OF DOCUMENT 'doc_id' IN DATASET 'dataset_id' SET '{"content": "..."}' func (p *Parser) parseUpdateChunk() (*Command, error) { p.nextToken() // consume CHUNK @@ -3474,8 +3506,26 @@ func (p *Parser) parseUpdateChunk() (*Command, error) { } p.nextToken() + if p.curToken.Type != TokenDocument { + return nil, fmt.Errorf("expected DOCUMENT after OF") + } + p.nextToken() + + // Parse doc_id + docID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected doc_id: %w", err) + } + cmd.Params["doc_id"] = docID + + p.nextToken() + if p.curToken.Type != TokenIn { + return nil, fmt.Errorf("expected IN after doc_id") + } + p.nextToken() + if p.curToken.Type != TokenDataset { - return nil, fmt.Errorf("expected DATASET after OF") + return nil, fmt.Errorf("expected DATASET after IN") } p.nextToken() @@ -3555,6 +3605,65 @@ func (p *Parser) parseSetMeta() (*Command, error) { return cmd, nil } +// parseDeleteMeta parses: DELETE METADATA OF DOCUMENT 'doc_id' [KEYS '["key1", "key2"]'] +// If KEYS is not provided, deletes entire document metadata +func (p *Parser) parseDeleteMeta() (*Command, error) { + p.nextToken() // consume METADATA + + // Expect OF + if p.curToken.Type != TokenOf { + return nil, fmt.Errorf("expected OF after DELETE METADATA") + } + p.nextToken() + + // Expect DOCUMENT + if p.curToken.Type != TokenDocument { + return nil, fmt.Errorf("expected DOCUMENT after DELETE METADATA OF") + } + p.nextToken() + + // Parse doc_id + docID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected doc_id: %w", err) + } + cmd := NewCommand("delete_meta") + cmd.Params["doc_id"] = docID + + p.nextToken() + // KEYS is optional - if not provided, delete entire document metadata + if p.curToken.Type != TokenKeys { + if p.curToken.Type == TokenSemicolon { + p.nextToken() + return cmd, nil + } + if p.curToken.Type == TokenEOF { + return cmd, nil + } + return nil, fmt.Errorf("expected KEYS or end of command after doc_id") + } + + // Parse keys JSON array + p.nextToken() + keys, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected keys JSON array: %w", err) + } + cmd.Params["keys"] = keys + + p.nextToken() + // Semicolon is optional + if p.curToken.Type == TokenSemicolon { + p.nextToken() + return cmd, nil + } + if p.curToken.Type != TokenEOF { + return nil, fmt.Errorf("expected end of command after KEYS") + } + + return cmd, nil +} + // parseRemoveTags parses: REMOVE TAGS 'tag1', 'tag2' from DATASET 'dataset_name'; func (p *Parser) parseRemoveTags() (*Command, error) { p.nextToken() // consume TAGS @@ -3611,8 +3720,8 @@ func (p *Parser) parseRemoveTags() (*Command, error) { } // parseRemoveChunk parses: -// - REMOVE CHUNKS 'chunk_id1', 'chunk_id2' FROM DOCUMENT 'doc_id'; -// - REMOVE ALL CHUNKS FROM DOCUMENT 'doc_id'; +// - REMOVE CHUNKS 'chunk_id1', 'chunk_id2' FROM DOCUMENT 'doc_id' IN DATASET 'dataset_name'; +// - REMOVE ALL CHUNKS FROM DOCUMENT 'doc_id' IN DATASET 'dataset_name'; func (p *Parser) parseRemoveChunk() (*Command, error) { cmd := NewCommand("remove_chunks") @@ -3627,7 +3736,7 @@ func (p *Parser) parseRemoveChunk() (*Command, error) { } else { // curToken is TokenChunks, consume it first p.nextToken() - // Multiple chunks: REMOVE CHUNKS 'id1' 'id2' FROM DOCUMENT 'doc_id' (space-separated) + // Multiple chunks: REMOVE CHUNKS 'id1' 'id2' FROM DOCUMENT 'doc_id' IN DATASET 'dataset_name' (space-separated) // Parse first chunk ID chunkID, err := p.parseQuotedString() if err != nil { @@ -3669,6 +3778,28 @@ func (p *Parser) parseRemoveChunk() (*Command, error) { return nil, fmt.Errorf("expected doc_id: %w", err) } cmd.Params["doc_id"] = docID + + p.nextToken() + + // Expect IN + if p.curToken.Type != TokenIn { + return nil, fmt.Errorf("expected IN after doc_id") + } + p.nextToken() + + // Expect DATASET + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected DATASET after IN") + } + p.nextToken() + + // Parse dataset_name (quoted string) + datasetName, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected dataset_name: %w", err) + } + cmd.Params["dataset_name"] = datasetName + p.nextToken() // Semicolon is optional diff --git a/internal/dao/kb.go b/internal/dao/kb.go index e025b5e7c..d74c98b7b 100644 --- a/internal/dao/kb.go +++ b/internal/dao/kb.go @@ -17,16 +17,24 @@ package dao import ( + "errors" "path" "ragflow/internal/entity" "strconv" "strings" + + "gorm.io/gorm" ) // KnowledgebaseDAO knowledge base data access object type KnowledgebaseDAO struct{} +// IsNotFoundErr returns true if the error indicates a record not found +func IsNotFoundErr(err error) bool { + return errors.Is(err, gorm.ErrRecordNotFound) +} + // NewKnowledgebaseDAO create knowledge base DAO func NewKnowledgebaseDAO() *KnowledgebaseDAO { return &KnowledgebaseDAO{} diff --git a/internal/development.md b/internal/development.md index 2483e6e1e..9834ef4f7 100644 --- a/internal/development.md +++ b/internal/development.md @@ -386,19 +386,19 @@ RAGFlow(user)> ocr with 'paddleocr-vl-0.9b@test@baidu' file './internal/text.jpg ### 6.26 Chunk Management Commands -- Create a dataset table with vector size +- Create a chunk store with vector size ``` -RAGFlow(user)> CREATE DATASET TABLE 'test' VECTOR SIZE 384 +RAGFlow(user)> CREATE CHUNK STORE FOR DATASET 'test' VECTOR SIZE 384 ``` - Insert data from JSON files ``` -RAGFlow(user)> INSERT DATASET FROM FILE 'insert_kb.json' +RAGFlow(user)> INSERT CHUNKS FROM FILE 'insert_kb.json' ``` - Update a chunk's content ``` -RAGFlow(user)> UPDATE CHUNK 'deb165dc6a732a64' OF DATASET 'test' SET '{"content": "Updated chunk content here", "important_keywords": ["keyword1", "keyword2"], "questions": ["What is this about?", "Why is it important?"], "available": true, "tag_kwd": ["tag5", "tag2"]}' +RAGFlow(user)> UPDATE CHUNK 'deb165dc6a732a64' OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test' SET '{"content": "Updated chunk content here", "important_keywords": ["keyword1", "keyword2"], "questions": ["What is this about?", "Why is it important?"], "available": true, "tag_kwd": ["tag5", "tag2"]}' ``` - Remove tags from a dataset @@ -408,17 +408,17 @@ RAGFlow(user)> REMOVE TAGS 'tag1', 'tag2' FROM DATASET 'test' - Remove specific chunks from a document ``` -RAGFlow(user)> REMOVE CHUNKS '29cc4f6d7a5c6e7c' '0360e3d8519eab12' FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +RAGFlow(user)> REMOVE CHUNKS '29cc4f6d7a5c6e7c' '0360e3d8519eab12' FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test' ``` - Remove all chunks from a document ``` -RAGFlow(user)> REMOVE ALL CHUNKS FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +RAGFlow(user)> REMOVE ALL CHUNKS FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test' ``` -- Drop dataset table +- Drop chunk store ``` -RAGFlow(user)> DROP DATASET TABLE 'test' +RAGFlow(user)> DROP CHUNK STORE FOR DATASET 'test' ``` - Search chunks @@ -428,17 +428,17 @@ RAGFlow(user)> SEARCH '曹操' ON DATASETS 'test' - Get chunks ``` -RAGFlow(user)> GET CHUNK '29cc4f6d7a5c6e7c' OF DATASET 'test' DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +RAGFlow(user)> GET CHUNK '29cc4f6d7a5c6e7c' OF DATASET 'test' DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test' ``` ### 6.27 Metadata Management Commands -- Create metadata table +- Create metadata store ``` -RAGFlow(user)> CREATE METADATA TABLE +RAGFlow(user)> CREATE METADATA STORE ``` -- Insert data from JSON files +- Insert metadata from JSON files ``` RAGFlow(user)> INSERT METADATA FROM FILE 'insert_metadata.json' ``` @@ -447,9 +447,19 @@ RAGFlow(user)> INSERT METADATA FROM FILE 'insert_metadata.json' RAGFlow(user)> SET METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' TO '{"author": ["John", "Tom"], "category": "tech"}'; ``` -- Drop metadata table +- Delete metadata of a document ``` -RAGFlow(user)> DROP METADATA TABLE +DELETE METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +``` + +- Delete metadata keys of a document +``` +DELETE METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' KEYS '["key1", "key2"]' +``` + +- Drop metadata store +``` +RAGFlow(user)> DROP METADATA STORE ``` - List metadata diff --git a/internal/engine/elasticsearch/chunk.go b/internal/engine/elasticsearch/chunk.go index 26414d7e6..bc70ac8ab 100644 --- a/internal/engine/elasticsearch/chunk.go +++ b/internal/engine/elasticsearch/chunk.go @@ -23,6 +23,8 @@ import ( "fmt" "io" "os" + "slices" + "sort" "strings" "github.com/elastic/go-elasticsearch/v8/esapi" @@ -38,13 +40,14 @@ func (e *elasticsearchEngine) CreateChunkStore(ctx context.Context, baseName, da return fmt.Errorf("index name cannot be empty") } - // Check if index already exists + // Check if index already exists (matches Python create_idx behavior) exists, err := e.indexExists(ctx, baseName) if err != nil { return fmt.Errorf("failed to check index existence: %w", err) } if exists { - return fmt.Errorf("index '%s' already exists", baseName) + common.Info("Index already exists, skipping creation", zap.String("index_name", baseName)) + return nil } // Load mapping based on index type @@ -108,53 +111,61 @@ func (e *elasticsearchEngine) CreateChunkStore(ctx context.Context, baseName, da return fmt.Errorf("index creation not acknowledged") } + common.Info("Successfully created Elasticsearch index", zap.String("index_name", baseName)) return nil } -// InsertChunks inserts documents into a dataset index +// InsertChunks inserts chunks into a chunk index +// If a chunk with the same id + doc_id + kb_id already exists, it will be updated with the new value func (e *elasticsearchEngine) InsertChunks(ctx context.Context, chunks []map[string]interface{}, baseName string, datasetID string) ([]string, error) { - fullIndexName := fmt.Sprintf("%s_%s", baseName, datasetID) - common.Info("Inserting chunks into Elasticsearch index", zap.String("index_name", fullIndexName), zap.String("dataset_id", datasetID), zap.Int("doc_count", len(chunks))) + common.Info("ElasticsearchConnection.InsertChunks called", zap.String("index_name", baseName), zap.Int("chunkCount", len(chunks))) if len(chunks) == 0 { return []string{}, nil } - if fullIndexName == "" { + if baseName == "" { return nil, fmt.Errorf("index name cannot be empty") } - // Build bulk request body + // Build bulk request body with index operations (upsert behavior: insert if not exists, update if exists) var buf bytes.Buffer for _, doc := range chunks { - // Action line - index operation - action := map[string]interface{}{ - "index": map[string]interface{}{ - "_index": fullIndexName, - }, + docID, _ := doc["doc_id"].(string) + chunkID, _ := doc["id"].(string) + if docID == "" || chunkID == "" { + common.Warn("Skipping chunk without doc_id or id") + continue } - actionBytes, err := json.Marshal(action) + + compositeID := fmt.Sprintf("%s_%s_%s", docID, datasetID, chunkID) + + // Action line: use json.Marshal to properly escape string values + action, err := json.Marshal(map[string]interface{}{ + "index": map[string]interface{}{ + "_index": baseName, + "_id": compositeID, + }, + }) if err != nil { common.Error("Failed to marshal bulk action", err) return nil, fmt.Errorf("failed to marshal bulk action: %w", err) } - buf.Write(actionBytes) + buf.Write(action) buf.WriteByte('\n') - // Document line - docBytes, err := json.Marshal(doc) - if err != nil { - common.Error("Failed to marshal document", err) - return nil, fmt.Errorf("failed to marshal document: %w", err) + // Document line: work with a copy to avoid mutating the original + docCopy := copyFields(doc) + docCopy["kb_id"] = datasetID + if err := json.NewEncoder(&buf).Encode(docCopy); err != nil { + return nil, fmt.Errorf("failed to encode document: %w", err) } - buf.Write(docBytes) - buf.WriteByte('\n') } - // Execute bulk request + // Execute bulk request with refresh="wait_for" (matches Python behavior) req := esapi.BulkRequest{ Body: bytes.NewReader(buf.Bytes()), - Refresh: "false", + Refresh: "wait_for", } res, err := req.Do(ctx, e.client) @@ -165,11 +176,12 @@ func (e *elasticsearchEngine) InsertChunks(ctx context.Context, chunks []map[str defer res.Body.Close() if res.IsError() { - common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status()) - return nil, fmt.Errorf("elasticsearch bulk request returned error: %s", res.Status()) + bodyBytes, _ := io.ReadAll(res.Body) + common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status(), "body", string(bodyBytes)) + return nil, fmt.Errorf("elasticsearch bulk request returned error: %s, body: %s", res.Status(), string(bodyBytes)) } - // Parse bulk response to check for errors + // Parse bulk response var bulkResponse map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&bulkResponse); err != nil { common.Error("Failed to parse bulk response", err) @@ -182,14 +194,14 @@ func (e *elasticsearchEngine) InsertChunks(ctx context.Context, chunks []map[str // Could iterate through items to find specific errors if needed } - common.Info("Successfully inserted chunks into Elasticsearch index", zap.String("index_name", fullIndexName), zap.Int("doc_count", len(chunks))) + common.Info("ElasticsearchConnection.InsertChunks result", zap.String("index_name", baseName), zap.Int("count", len(chunks))) return []string{}, nil } // UpdateChunks updates chunks by condition func (e *elasticsearchEngine) UpdateChunks(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, baseName string, datasetID string) error { - fullIndexName := fmt.Sprintf("%s_%s", baseName, datasetID) - common.Info("Updating chunks in Elasticsearch index", zap.String("index_name", fullIndexName), zap.Any("condition", condition), zap.Any("new_value", newValue)) + fullIndexName := baseName + common.Info("ElasticsearchConnection.UpdateChunks called", zap.String("index_name", fullIndexName), zap.Any("condition", condition), zap.Any("new_value", newValue)) if fullIndexName == "" { return fmt.Errorf("index name cannot be empty") @@ -205,42 +217,296 @@ func (e *elasticsearchEngine) UpdateChunks(ctx context.Context, condition map[st return fmt.Errorf("index '%s' does not exist", fullIndexName) } - // Build query from condition - query := e.buildQueryFromCondition(condition) - if query == nil { - query = map[string]interface{}{"match_all": map[string]interface{}{}} + // Add kb_id to condition + condition["kb_id"] = datasetID + + // Case 1: Single document update (when condition["id"] is a string) + if chunkID, ok := condition["id"].(string); ok { + return e.updateSingleChunk(ctx, fullIndexName, chunkID, newValue) } - // Process remove operation if present - var removeOperations []map[string]interface{} - if removeData, ok := newValue["remove"].(map[string]interface{}); ok { - removeOperations = e.buildRemoveOperations(removeData, query, fullIndexName) - } - delete(newValue, "remove") + // Case 2: Multi-document update via UpdateByQuery + return e.updateChunksByQuery(ctx, fullIndexName, condition, newValue) +} - // Build update body +// updateSingleChunk handles single document update (matches Python lines 350-398) +func (e *elasticsearchEngine) updateSingleChunk(ctx context.Context, indexName, chunkID string, newValue map[string]interface{}) error { + common.Debug("ElasticsearchConnection.updateSingleChunk called", zap.String("indexName", indexName), zap.String("chunkID", chunkID)) + + // First find the document by id field to get the actual _id + searchReq := map[string]interface{}{ + "query": map[string]interface{}{ + "term": map[string]interface{}{"id": chunkID}, + }, + } + + body, err := json.Marshal(searchReq) + if err != nil { + return fmt.Errorf("failed to marshal search request: %w", err) + } + + res, err := e.client.Search( + e.client.Search.WithContext(ctx), + e.client.Search.WithIndex(indexName), + e.client.Search.WithBody(bytes.NewReader(body)), + ) + if err != nil { + return fmt.Errorf("failed to search for chunk: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("failed to search for chunk: %s", res.Status()) + } + + var searchResult map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil { + return fmt.Errorf("failed to parse search response: %w", err) + } + + hits, ok := searchResult["hits"].(map[string]interface{}) + if !ok { + return fmt.Errorf("elasticsearch update error: 404 Not Found") + } + + hitList, ok := hits["hits"].([]interface{}) + if !ok || len(hitList) == 0 { + return fmt.Errorf("elasticsearch update error: 404 Not Found") + } + + firstHit, ok := hitList[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("elasticsearch update error: 404 Not Found") + } + + actualID, ok := firstHit["_id"].(string) + if !ok { + return fmt.Errorf("elasticsearch update error: 404 Not Found") + } + + doc := copyFields(newValue) + delete(doc, "id") + + removeValue, _ := doc["remove"] + delete(doc, "remove") + removeField, _ := removeValue.(string) + removeDict, _ := removeValue.(map[string]interface{}) + + // Remove *_feas fields + var feasFields []string + for k := range doc { + if strings.HasSuffix(k, "feas") { + feasFields = append(feasFields, k) + } + } + for _, k := range feasFields { + scriptBody := map[string]interface{}{ + "script": map[string]interface{}{ + "source": fmt.Sprintf("ctx._source.remove(\"%s\");", k), + }, + } + body, _ := json.Marshal(scriptBody) + req := esapi.UpdateRequest{ + Index: indexName, + DocumentID: actualID, + Body: bytes.NewReader(body), + } + res, err := req.Do(ctx, e.client) + if err != nil { + common.Warn("Failed to remove feas field", zap.String("field", k), zap.Error(err)) + } else { + res.Body.Close() + } + } + + // Remove specific field if removeField is set + if removeField != "" { + scriptBody := map[string]interface{}{ + "script": map[string]interface{}{ + "source": fmt.Sprintf("ctx._source.remove('%s');", removeField), + }, + } + body, _ := json.Marshal(scriptBody) + req := esapi.UpdateRequest{ + Index: indexName, + DocumentID: actualID, + Body: bytes.NewReader(body), + } + res, err := req.Do(ctx, e.client) + if err != nil { + common.Warn("Failed to remove field", zap.String("field", removeField), zap.Error(err)) + } else { + res.Body.Close() + } + } + + // Remove specific values from array fields (removeDict) + if removeDict != nil { + scripts := []string{} + params := make(map[string]interface{}) + for kk, vv := range removeDict { + scripts = append(scripts, + fmt.Sprintf("if (ctx._source.containsKey('%s') && ctx._source.%s != null) { int i = ctx._source.%s.indexOf(params.p_%s); if (i >= 0) { ctx._source.%s.remove(i); }}", + kk, kk, kk, kk, kk)) + params[fmt.Sprintf("p_%s", kk)] = vv + } + if scripts != nil { + scriptBody := map[string]interface{}{ + "script": map[string]interface{}{ + "source": strings.Join(scripts, ""), + "params": params, + }, + } + body, _ := json.Marshal(scriptBody) + req := esapi.UpdateRequest{ + Index: indexName, + DocumentID: actualID, + Body: bytes.NewReader(body), + } + res, err := req.Do(ctx, e.client) + if err != nil { + common.Warn("Failed to remove dict fields", zap.Error(err)) + } else { + res.Body.Close() + } + } + } + + // Update document fields if any remain + if len(doc) > 0 { + updateBody := map[string]interface{}{"doc": doc} + body, _ := json.Marshal(updateBody) + req := esapi.UpdateRequest{ + Index: indexName, + DocumentID: actualID, + Body: bytes.NewReader(body), + } + res, err := req.Do(ctx, e.client) + if err != nil { + return fmt.Errorf("failed to update document: %w", err) + } + defer res.Body.Close() + if res.IsError() { + return fmt.Errorf("elasticsearch update error: %s", res.Status()) + } + } + + common.Debug("ElasticsearchConnection.updateSingleChunk completed", zap.String("indexName", indexName), zap.String("chunkID", chunkID)) + return nil +} + +// updateChunksByQuery handles multi-document update +func (e *elasticsearchEngine) updateChunksByQuery(ctx context.Context, indexName string, condition map[string]interface{}, newValue map[string]interface{}) error { + common.Debug("ElasticsearchConnection.updateChunksByQuery called", zap.String("indexName", indexName)) + + // Build bool query from condition + var mustClauses []map[string]interface{} + for k, v := range condition { + if k == "exists" { + mustClauses = append(mustClauses, map[string]interface{}{ + "exists": map[string]interface{}{"field": v}, + }) + continue + } + if v == nil || v == "" { + continue + } + if listVal, ok := v.([]interface{}); ok { + mustClauses = append(mustClauses, map[string]interface{}{ + "terms": map[string]interface{}{k: listVal}, + }) + } else if _, ok := v.(string); ok { + mustClauses = append(mustClauses, map[string]interface{}{ + "term": map[string]interface{}{k: v}, + }) + } else if _, ok := v.(int); ok { + mustClauses = append(mustClauses, map[string]interface{}{ + "term": map[string]interface{}{k: v}, + }) + } + } + + boolQuery := map[string]interface{}{ + "bool": map[string]interface{}{ + "filter": mustClauses, + }, + } + + // Build painless scripts from newValue + var scripts []string + params := make(map[string]interface{}) + + for k, v := range newValue { + if k == "remove" { + if removeStr, ok := v.(string); ok { + scripts = append(scripts, fmt.Sprintf("ctx._source.remove('%s');", removeStr)) + continue + } + if removeDict, ok := v.(map[string]interface{}); ok { + for kk, vv := range removeDict { + scripts = append(scripts, + fmt.Sprintf("if (ctx._source.containsKey('%s') && ctx._source.%s != null) { int i = ctx._source.%s.indexOf(params.p_%s); if (i >= 0) { ctx._source.%s.remove(i); }}", + kk, kk, kk, kk, kk)) + params[fmt.Sprintf("p_%s", kk)] = vv + } + } + continue + } + if k == "add" { + if addDict, ok := v.(map[string]interface{}); ok { + for kk, vv := range addDict { + vvStr, ok := vv.(string) + if ok { + vvStr = strings.TrimSpace(vvStr) + scripts = append(scripts, fmt.Sprintf("ctx._source.%s.add(params.pp_%s);", kk, kk)) + params[fmt.Sprintf("pp_%s", kk)] = vvStr + } + } + } + continue + } + if (k == "" || v == nil) && k != "available_int" { + continue + } + + switch val := v.(type) { + case string: + // Sanitize: replace ' \n \r with space + sanitized := sanitizeString(val) + params[fmt.Sprintf("pp_%s", k)] = sanitized + scripts = append(scripts, fmt.Sprintf("ctx._source.%s=params.pp_%s;", k, k)) + case int, float64: + scripts = append(scripts, fmt.Sprintf("ctx._source.%s=%v;", k, val)) + case []interface{}: + params[fmt.Sprintf("pp_%s", k)] = val + scripts = append(scripts, fmt.Sprintf("ctx._source.%s=params.pp_%s;", k, k)) + } + } + + scriptSource := strings.Join(scripts, "") + + // Build update by query body updateBody := map[string]interface{}{ - "query": query, - } - - // Handle script-based update if needed (for remove operations or transformations) - if len(removeOperations) > 0 || e.needsScriptUpdate(newValue) { - script := e.buildUpdateScript(newValue, removeOperations) - updateBody["script"] = script - } else { - updateBody["doc"] = newValue + "query": boolQuery, + "script": map[string]interface{}{ + "source": scriptSource, + "params": params, + }, } bodyBytes, err := json.Marshal(updateBody) if err != nil { - common.Error("Failed to marshal update body", err) return fmt.Errorf("failed to marshal update body: %w", err) } - // Execute update by query + // Execute update by query with refresh=true, slices=5, conflicts="proceed" + refreshTrue := true req := esapi.UpdateByQueryRequest{ - Index: []string{fullIndexName}, - Body: bytes.NewReader(bodyBytes), + Index: []string{indexName}, + Body: bytes.NewReader(bodyBytes), + Refresh: &refreshTrue, + Slices: 5, + Conflicts: "proceed", } res, err := req.Do(ctx, e.client) @@ -251,27 +517,35 @@ func (e *elasticsearchEngine) UpdateChunks(ctx context.Context, condition map[st defer res.Body.Close() if res.IsError() { - common.Sugar.Errorw("Elasticsearch update by query returned error", "status", res.Status()) - return fmt.Errorf("elasticsearch update by query returned error: %s", res.Status()) - } - - // Parse response - var result map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&result); err != nil { - common.Error("Failed to parse update response", err) - return fmt.Errorf("failed to parse update response: %w", err) - } - - if updated, ok := result["updated"].(float64); ok { - common.Info("Successfully updated chunks", zap.String("index_name", fullIndexName), zap.Float64("updated_count", updated)) + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("elasticsearch update by query error: %s, body: %s", res.Status(), string(bodyBytes)) } + common.Debug("ElasticsearchConnection.updateChunksByQuery completed", zap.String("indexName", indexName)) return nil } +// sanitizeString replaces ' \n \r with space +func sanitizeString(s string) string { + s = strings.ReplaceAll(s, "'", " ") + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + return strings.TrimSpace(s) +} + +// copyFields creates a shallow copy of a map +func copyFields(m map[string]interface{}) map[string]interface{} { + result := make(map[string]interface{}) + for k, v := range m { + result[k] = v + } + return result +} + // DeleteChunks deletes chunks from a dataset index by condition func (e *elasticsearchEngine) DeleteChunks(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) { - fullIndexName := fmt.Sprintf("%s_%s", indexName, datasetID) + // For ES, index name is just indexName (e.g., "ragflow_{tenantID}"), not indexName_datasetID + fullIndexName := indexName common.Info("Deleting chunks from Elasticsearch index", zap.String("index_name", fullIndexName), zap.Any("condition", condition)) // Check if index exists @@ -284,15 +558,97 @@ func (e *elasticsearchEngine) DeleteChunks(ctx context.Context, condition map[st return 0, nil } - // Build query from condition - query := e.buildQueryFromCondition(condition) - if query == nil { - query = map[string]interface{}{"match_all": map[string]interface{}{}} + // Build bool query from condition + var mustClauses []map[string]interface{} + var filterClauses []map[string]interface{} + var mustNotClauses []map[string]interface{} + + // Handle chunk IDs - use terms query on "id" field instead of ids query on _id + if idVal, ok := condition["id"]; ok && idVal != nil { + switch v := idVal.(type) { + case []interface{}: + ids := make([]string, 0, len(v)) + for _, id := range v { + if s, ok := id.(string); ok { + ids = append(ids, s) + } + } + if len(ids) > 0 { + mustClauses = append(mustClauses, map[string]interface{}{ + "terms": map[string]interface{}{"id": ids}, + }) + } + case string: + mustClauses = append(mustClauses, map[string]interface{}{ + "term": map[string]interface{}{"id": v}, + }) + } + } + + // Handle kb_id - add as term filter + if kbID, ok := condition["kb_id"].(string); ok && kbID != "" { + filterClauses = append(filterClauses, map[string]interface{}{ + "term": map[string]interface{}{"kb_id": kbID}, + }) + } + + // Add all other conditions as filters/must/must_not + for k, v := range condition { + if k == "id" || k == "kb_id" { + continue // Already handled above + } + if k == "exists" { + filterClauses = append(filterClauses, map[string]interface{}{ + "exists": map[string]interface{}{"field": v}, + }) + } else if k == "must_not" { + if m, ok := v.(map[string]interface{}); ok { + for kk, vv := range m { + if kk == "exists" { + mustNotClauses = append(mustNotClauses, map[string]interface{}{ + "exists": map[string]interface{}{"field": vv}, + }) + } + } + } + } else if v != nil { + if listVal, ok := v.([]interface{}); ok { + mustClauses = append(mustClauses, map[string]interface{}{ + "terms": map[string]interface{}{k: listVal}, + }) + } else if _, ok := v.(string); ok { + mustClauses = append(mustClauses, map[string]interface{}{ + "term": map[string]interface{}{k: v}, + }) + } else if _, ok := v.(int); ok { + mustClauses = append(mustClauses, map[string]interface{}{ + "term": map[string]interface{}{k: v}, + }) + } + } + } + + // Build the query + var qry map[string]interface{} + if len(filterClauses) == 0 && len(mustClauses) == 0 && len(mustNotClauses) == 0 { + qry = map[string]interface{}{"match_all": map[string]interface{}{}} + } else { + boolMap := map[string]interface{}{} + if len(filterClauses) > 0 { + boolMap["filter"] = filterClauses + } + if len(mustClauses) > 0 { + boolMap["must"] = mustClauses + } + if len(mustNotClauses) > 0 { + boolMap["must_not"] = mustNotClauses + } + qry = map[string]interface{}{"bool": boolMap} } // Build delete by query body deleteBody := map[string]interface{}{ - "query": query, + "query": qry, } bodyBytes, err := json.Marshal(deleteBody) @@ -300,20 +656,30 @@ func (e *elasticsearchEngine) DeleteChunks(ctx context.Context, condition map[st return 0, fmt.Errorf("failed to marshal delete body: %w", err) } - // Execute delete by query + // Execute delete by query with refresh=true + refreshTrue := true req := esapi.DeleteByQueryRequest{ - Index: []string{fullIndexName}, - Body: bytes.NewReader(bodyBytes), + Index: []string{fullIndexName}, + Body: bytes.NewReader(bodyBytes), + Refresh: &refreshTrue, } res, err := req.Do(ctx, e.client) if err != nil { common.Error("Failed to execute delete by query", err) + if strings.Contains(err.Error(), "not_found") { + return 0, nil + } return 0, fmt.Errorf("failed to execute delete by query: %w", err) } defer res.Body.Close() if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + errStr := string(bodyBytes) + if strings.Contains(errStr, "not_found") { + return 0, nil + } common.Sugar.Errorw("Elasticsearch delete by query returned error", "status", res.Status()) return 0, fmt.Errorf("elasticsearch delete by query returned error: %s", res.Status()) } @@ -355,220 +721,461 @@ func (e *elasticsearchEngine) Search(ctx context.Context, req *types.SearchReque } // searchUnified handles the unified types.SearchRequest +// Matches the behavior of Infinity's Search() method func (e *elasticsearchEngine) searchUnified(ctx context.Context, req *types.SearchRequest) (*types.SearchResult, error) { + common.Debug("Search in Elasticsearch started", zap.Any("indexNames", req.IndexNames)) + if len(req.IndexNames) == 0 { return nil, fmt.Errorf("index names cannot be empty") } - // Build pagination parameters + // Get retrieval parameters with defaults + pageSize := req.Limit + if pageSize <= 0 { + pageSize = 30 + } + offset := req.Offset - limit := req.Limit - if limit <= 0 { - limit = 30 // default ES size + if offset < 0 { + offset = 0 } - // Check if this is a skill index - isSkillIndex := len(req.IndexNames) > 0 && strings.HasPrefix(req.IndexNames[0], "skill_") + isMetadataTable := false + isSkillIndex := false + for _, idx := range req.IndexNames { + if strings.HasPrefix(idx, "ragflow_doc_meta_") { + isMetadataTable = true + break + } + if strings.HasPrefix(idx, "skill_") { + isSkillIndex = true + break + } + } - // Build filter clauses - var filterClauses []map[string]interface{} - if isSkillIndex { - filterClauses = buildSkillFilterClauses() + var outputColumns []string + if isMetadataTable { + outputColumns = []string{"id", "kb_id", "meta_fields"} + } else if isSkillIndex { + outputColumns = []string{ + "skill_id", "space_id", "folder_id", "name", "tags", "description", "content", + "version", "status", "create_time", "update_time", + } } else { - filterClauses = buildFilterClauses(req.KbIDs, 1) + outputColumns = []string{ + "id", "doc_id", "kb_id", "content_ltks", "content_with_weight", + "title_tks", "docnm_kwd", "img_id", "available_int", "important_kwd", + "position_int", "page_num_int", "top_int", "chunk_order_int", + "create_timestamp_flt", "knowledge_graph_kwd", "question_kwd", "question_tks", + "doc_type_kwd", "mom_id", "tag_kwd", "pagerank_fea", "tag_feas", + } } - // Add filters from req.Filter - if req.Filter != nil && len(req.Filter) > 0 { - filterClauses = append(filterClauses, buildFilterFromMap(req.Filter)...) - } - - // Build search query body - queryBody := make(map[string]interface{}) - - // Determine search type from MatchExprs - var matchText string + hasTextMatch := false + hasVectorMatch := false + var matchText *types.MatchTextExpr var matchDense *types.MatchDenseExpr - var hasVectorMatch bool - - for _, expr := range req.MatchExprs { - if expr == nil { - continue - } - switch e := expr.(type) { - case string: - matchText = e - case *types.MatchTextExpr: - matchText = e.MatchingText - case *types.MatchDenseExpr: - hasVectorMatch = true - matchDense = e - } - } - - var vectorFieldName string - if !hasVectorMatch || matchDense == nil { - // Keyword-only search - if isSkillIndex { - queryBody["query"] = buildSkillKeywordQuery(matchText, filterClauses, 1.0) - } else { - queryBody["query"] = buildESKeywordQuery(matchText, filterClauses, 1.0) - } - } else { - // Hybrid search: keyword + vector - textWeight := 0.7 // default: vector weight = 0.3 - vectorWeight := 0.3 - if matchDense.ExtraOptions != nil { - if vw, ok := matchDense.ExtraOptions["text_weight"].(float64); ok { - textWeight = vw + if req.MatchExprs != nil && len(req.MatchExprs) > 0 { + for _, expr := range req.MatchExprs { + if expr == nil { + continue } - if vw, ok := matchDense.ExtraOptions["vector_weight"].(float64); ok { - vectorWeight = vw - } - } - - // Build boolean query for text match and filters - var boolQuery map[string]interface{} - if isSkillIndex { - boolQuery = buildSkillKeywordQuery(matchText, filterClauses, 1.0) - } else { - boolQuery = buildESKeywordQuery(matchText, filterClauses, 1.0) - } - // Add boost to the bool query (as in Python code) - if boolMap, ok := boolQuery["bool"].(map[string]interface{}); ok { - boolMap["boost"] = textWeight - } - - // Build kNN query - vectorData := matchDense.EmbeddingData - vectorFieldName = matchDense.VectorColumnName - k := matchDense.TopN - if k <= 0 { - k = req.Limit - } - if k <= 0 { - k = 1024 - } - numCandidates := k * 2 - - similarity := 0.0 - if matchDense.ExtraOptions != nil { - if sim, ok := matchDense.ExtraOptions["similarity"].(float64); ok { - similarity = sim - } - } - - knnQuery := map[string]interface{}{ - "field": vectorFieldName, - "query_vector": vectorData, - "k": k, - "num_candidates": numCandidates, - "similarity": similarity, - "boost": vectorWeight, - } - - queryBody["knn"] = knnQuery - queryBody["query"] = boolQuery - - // Add vector column to Source fields (matching Python ES: src.append(f"q_{len(q_vec)}_vec")) - // Only modify Source if it was explicitly set by the caller - if vectorFieldName != "" && len(req.SelectFields) > 0 { - sourceFields := req.SelectFields - found := false - for _, f := range sourceFields { - if f == vectorFieldName { - found = true - break + switch e := expr.(type) { + case string: + if e != "" { + hasTextMatch = true + matchText = &types.MatchTextExpr{ + MatchingText: e, + TopN: pageSize, + } + } + case *types.MatchTextExpr: + if e.MatchingText != "" { + hasTextMatch = true + matchText = e + } + case *types.MatchDenseExpr: + if len(e.EmbeddingData) > 0 { + hasVectorMatch = true + matchDense = e } } - if !found { - sourceFields = append(sourceFields, vectorFieldName) + } + } + + // Extract FusionExpr if present (used for hybrid search fusion) + var fusionExpr *types.FusionExpr + if len(req.MatchExprs) > 2 { + if fe, ok := req.MatchExprs[2].(*types.FusionExpr); ok { + fusionExpr = fe + } + } + _ = fusionExpr // TODO: implement fusion for ES hybrid search + + if hasTextMatch || hasVectorMatch { + if !isSkillIndex { + if !slices.Contains(outputColumns, common.PAGERANK_FLD) { + outputColumns = append(outputColumns, common.PAGERANK_FLD) + } + if !slices.Contains(outputColumns, common.TAG_FLD) { + outputColumns = append(outputColumns, common.TAG_FLD) } - req.SelectFields = sourceFields } } - queryBody["size"] = limit - queryBody["from"] = offset - - // Add sorting if specified - if req.OrderBy != nil { - sort := parseOrderByExpr(req.OrderBy) - if len(sort) > 0 { - queryBody["sort"] = sort - } + if hasVectorMatch && matchDense != nil && matchDense.VectorColumnName != "" { + outputColumns = append(outputColumns, matchDense.VectorColumnName) } - // Serialize query - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(queryBody); err != nil { - return nil, fmt.Errorf("error encoding query: %w", err) - } + // Build filter string + var filterParts []string - // Log search details - common.Debug("Elasticsearch searching indices", zap.Strings("indices", req.IndexNames)) - common.Debug("Elasticsearch DSL", zap.Any("dsl", queryBody)) - - // Build search request - reqES := esapi.SearchRequest{ - Index: req.IndexNames, - Body: &buf, - } - - // Execute search - res, err := reqES.Do(ctx, e.client) - if err != nil { - return nil, fmt.Errorf("search failed: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - bodyBytes, err := io.ReadAll(res.Body) - if err != nil { - common.Error("Elasticsearch failed to read error response body", err) + if !isMetadataTable && (hasTextMatch || hasVectorMatch) { + if req.Filter != nil { + if availInt, ok := req.Filter["available_int"]; ok { + filterParts = append(filterParts, fmt.Sprintf("available_int=%v", availInt)) + } else if status, ok := req.Filter["status"]; ok { + filterParts = append(filterParts, fmt.Sprintf("status='%s'", status)) + } else { + if isSkillIndex { + filterParts = append(filterParts, "status='1'") + } else { + filterParts = append(filterParts, "available_int=1") + } + } } else { - common.Warn("Elasticsearch error response", zap.String("body", string(bodyBytes))) + if isSkillIndex { + filterParts = append(filterParts, "status='1'") + } else { + filterParts = append(filterParts, "available_int=1") + } } - return nil, fmt.Errorf("Elasticsearch returned error: %s", res.Status()) } - // Parse response - var esResp SearchResponse - if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { - return nil, fmt.Errorf("error parsing response: %w", err) + // Build filter string from req.Filter + if req.Filter != nil { + filterCopy := req.Filter + if !isMetadataTable { + filterCopy = make(map[string]interface{}) + for k, v := range req.Filter { + if k != "kb_id" { + filterCopy[k] = v + } + } + } + + condStr := equivalentConditionToStr(filterCopy) + if condStr != "" { + filterParts = append(filterParts, condStr) + } + } + filterStr := strings.Join(filterParts, " AND ") + + orderBy := req.OrderBy + _ = orderBy // TODO: implement rank feature for ES + + var allResults []map[string]interface{} + totalHits := int64(0) + + for _, indexName := range req.IndexNames { + var indexNames []string + if strings.HasPrefix(indexName, "ragflow_doc_meta_") { + indexNames = []string{indexName} + } else { + indexNames = []string{indexName} + } + + for _, fullIndexName := range indexNames { + // Build search query body + queryBody := make(map[string]interface{}) + + // Determine text fields for the query (used indirectly via buildESKeywordQuery) + if matchText != nil && len(matchText.Fields) > 0 { + // Use matchText.Fields for text matching + } else if isSkillIndex { + // Use skill-specific fields in buildSkillKeywordQuery + } else { + // Use default fields in buildESKeywordQuery + } + + var vectorFieldName string + if !hasVectorMatch || matchDense == nil { + // Keyword-only search (no vector match) + queryBody["query"] = map[string]interface{}{ + "match_all": map[string]interface{}{}, + } + if hasTextMatch && matchText != nil { + if isSkillIndex { + queryBody["query"] = buildSkillKeywordQuery(matchText.MatchingText, nil, 1.0) + } else { + queryBody["query"] = buildESKeywordQuery(matchText.MatchingText, nil, 1.0) + } + // Add filter if present + if filterStr != "" { + if boolQuery, ok := queryBody["query"].(map[string]interface{}); ok { + if boolMap, ok := boolQuery["bool"].(map[string]interface{}); ok { + filterClauses := buildFilterClausesFromStr(filterStr) + if existingFilter, ok := boolMap["filter"].([]map[string]interface{}); ok { + boolMap["filter"] = append(existingFilter, filterClauses...) + } else { + boolMap["filter"] = filterClauses + } + } + } + } + } + } else { + // Hybrid search: keyword + vector + textWeight := 0.7 // default: vector weight = 0.3 + vectorWeight := 0.3 + if matchDense.ExtraOptions != nil { + if vw, ok := matchDense.ExtraOptions["text_weight"].(float64); ok { + textWeight = vw + } + if vw, ok := matchDense.ExtraOptions["vector_weight"].(float64); ok { + vectorWeight = vw + } + } + + // Build boolean query for text match and filters + var boolQuery map[string]interface{} + matchingText := "" + if matchText != nil { + matchingText = matchText.MatchingText + } + if isSkillIndex { + boolQuery = buildSkillKeywordQuery(matchingText, nil, textWeight) + } else { + boolQuery = buildESKeywordQuery(matchingText, nil, textWeight) + } + + // Add filter to bool query + if filterStr != "" { + if boolMap, ok := boolQuery["bool"].(map[string]interface{}); ok { + filterClauses := buildFilterClausesFromStr(filterStr) + if existingFilter, ok := boolMap["filter"].([]map[string]interface{}); ok { + boolMap["filter"] = append(existingFilter, filterClauses...) + } else { + boolMap["filter"] = filterClauses + } + } + } + + // Build kNN query + vectorData := matchDense.EmbeddingData + vectorFieldName = matchDense.VectorColumnName + k := matchDense.TopN + if k <= 0 { + k = req.Limit + } + if k <= 0 { + k = 1024 + } + numCandidates := k * 2 + + similarity := 0.0 + if matchDense.ExtraOptions != nil { + if sim, ok := matchDense.ExtraOptions["similarity"].(float64); ok { + similarity = sim + } + } + + knnQuery := map[string]interface{}{ + "field": vectorFieldName, + "query_vector": vectorData, + "k": k, + "num_candidates": numCandidates, + "similarity": similarity, + "boost": vectorWeight, + } + + queryBody["knn"] = knnQuery + queryBody["query"] = boolQuery + + // Add vector column to output columns + if vectorFieldName != "" { + outputColumns = append(outputColumns, vectorFieldName) + } + } + + queryBody["size"] = pageSize + queryBody["from"] = offset + + // Add sorting if specified + if orderBy != nil && len(orderBy.Fields) > 0 { + sort := parseOrderByExpr(orderBy) + if len(sort) > 0 { + queryBody["sort"] = sort + } + } + + // Serialize query + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(queryBody); err != nil { + return nil, fmt.Errorf("error encoding query: %w", err) + } + + // Log search details + common.Debug("Elasticsearch searching index", zap.String("index", fullIndexName)) + common.Debug("Elasticsearch DSL", zap.Any("dsl", queryBody)) + + // Build search request + reqES := esapi.SearchRequest{ + Index: []string{fullIndexName}, + Body: &buf, + } + + // Execute search + res, err := reqES.Do(ctx, e.client) + if err != nil { + common.Warn("Elasticsearch query failed", zap.String("index", fullIndexName), zap.Error(err)) + continue + } + + if res.IsError() { + bodyBytes, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + common.Error("Elasticsearch failed to read error response body", err) + } else { + common.Warn("Elasticsearch error response", zap.String("index", fullIndexName), zap.String("body", string(bodyBytes))) + } + continue + } + + // Parse response + var esResp SearchResponse + if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { + res.Body.Close() + common.Warn("Elasticsearch failed to parse response", zap.String("index", fullIndexName), zap.Error(err)) + continue + } + + res.Body.Close() + + // Convert to unified response + searchChunks := convertESResponse(&esResp, vectorFieldName) + totalHits += esResp.Hits.Total.Value + + // Apply field name mapping and row_id handling + if !isSkillIndex { + GetFields(searchChunks, nil) + } + + allResults = append(allResults, searchChunks...) + } } - // Convert to unified response - chunks := convertESResponse(&esResp, vectorFieldName) + // Calculate scores and sort + if hasTextMatch || hasVectorMatch { + scoreColumn := "_score" + if hasTextMatch && hasVectorMatch { + scoreColumn = "SCORE" + } + + pagerankField := common.PAGERANK_FLD + if isSkillIndex { + pagerankField = "" + } + + allResults = calculateScores(allResults, scoreColumn, pagerankField) + allResults = sortByScore(allResults, len(allResults)) + } + + // Limit results + if len(allResults) > pageSize { + allResults = allResults[:pageSize] + } + + common.Debug("Search in Elasticsearch completed", zap.Int("returnedRows", len(allResults)), zap.Int64("totalHits", totalHits)) + return &types.SearchResult{ - Chunks: chunks, - Total: esResp.Hits.Total.Value, + Chunks: allResults, + Total: totalHits, }, nil } -// GetChunk gets a chunk by ID +// buildFilterClausesFromStr converts a filter string to ES filter clauses +func buildFilterClausesFromStr(filterStr string) []map[string]interface{} { + if filterStr == "" { + return nil + } + return []map[string]interface{}{ + {"query_string": map[string]interface{}{ + "query": filterStr, + }}, + } +} + +// GetChunk gets a chunk by ID using ES search API +// _id in ES is composite: {doc_id}_{kb_id}_{chunk_id} func (e *elasticsearchEngine) GetChunk(ctx context.Context, baseName, chunkID string, datasetIDs []string) (interface{}, error) { - // Build unified search request to get the chunk by ID - searchReq := &types.SearchRequest{ - IndexNames: []string{baseName}, - Limit: 1, - Offset: 0, - Filter: map[string]interface{}{ - "id": chunkID, - }, + // Try search by doc_id field (which is stored in the document) + for _, datasetID := range datasetIDs { + searchReq := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + {"term": map[string]interface{}{"id": chunkID}}, + {"term": map[string]interface{}{"kb_id": datasetID}}, + }, + }, + }, + } + + body, err := json.Marshal(searchReq) + if err != nil { + return nil, fmt.Errorf("failed to marshal search request: %w", err) + } + + res, err := e.client.Search( + e.client.Search.WithContext(ctx), + e.client.Search.WithIndex(baseName), + e.client.Search.WithBody(bytes.NewReader(body)), + ) + if err != nil { + return nil, fmt.Errorf("failed to search for chunk: %w", err) + } + + if res.IsError() { + res.Body.Close() + return nil, fmt.Errorf("failed to search for chunk: %s", res.Status()) + } + + var searchResult map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil { + res.Body.Close() + return nil, fmt.Errorf("failed to parse search response: %w", err) + } + res.Body.Close() + + hits, ok := searchResult["hits"].(map[string]interface{}) + if !ok { + continue + } + + hitList, ok := hits["hits"].([]interface{}) + if !ok || len(hitList) == 0 { + continue + } + + firstHit, ok := hitList[0].(map[string]interface{}) + if !ok { + continue + } + + source, ok := firstHit["_source"].(map[string]interface{}) + if !ok { + continue + } + + common.Info("GetChunk found hit", zap.String("baseName", baseName), zap.String("chunkID", chunkID)) + source["id"] = chunkID + return source, nil } - // Execute search - searchResp, err := e.Search(ctx, searchReq) - if err != nil { - return nil, fmt.Errorf("failed to search: %w", err) - } - - if len(searchResp.Chunks) == 0 { - return nil, nil - } - - return searchResp.Chunks[0], nil + common.Info("GetChunk no hits found", zap.String("baseName", baseName), zap.String("chunkID", chunkID)) + return nil, nil } // GetFields is not implemented for Elasticsearch @@ -1213,4 +1820,232 @@ func AddMustNot(query map[string]interface{}, clauses ...map[string]interface{}) func (e *elasticsearchEngine) GetDocIDs(chunks []map[string]interface{}) []string { common.Warn("GetDocIDs not implemented for Elasticsearch") return nil +} + +// equivalentConditionToStr converts a condition map to a filter string (for ES query_string) +func equivalentConditionToStr(condition map[string]interface{}) string { + if len(condition) == 0 { + return "" + } + + var cond []string + + for k, v := range condition { + if k == "_id" { + continue + } + if v == nil || v == "" { + continue + } + + // Handle list values + if list, ok := v.([]interface{}); ok && len(list) > 0 { + var items []string + for _, item := range list { + if s, ok := item.(string); ok { + items = append(items, fmt.Sprintf("%s:'%s'", k, strings.ReplaceAll(s, "'", "\\'"))) + } else { + items = append(items, fmt.Sprintf("%s:%v", k, item)) + } + } + if len(items) > 0 { + cond = append(cond, "("+strings.Join(items, " OR ")+")") + } + continue + } + + if list, ok := v.([]string); ok && len(list) > 0 { + var items []string + for _, item := range list { + items = append(items, fmt.Sprintf("%s:'%s'", k, strings.ReplaceAll(item, "'", "\\'"))) + } + if len(items) > 0 { + cond = append(cond, "("+strings.Join(items, " OR ")+")") + } + continue + } + + // Handle numeric values (no quotes) + if isNumericValue(v) { + cond = append(cond, fmt.Sprintf("%s:%v", k, v)) + continue + } + + // Handle string values (with quotes and escaping) + if str, ok := v.(string); ok { + cond = append(cond, fmt.Sprintf("%s:'%s'", k, strings.ReplaceAll(str, "'", "\\'"))) + continue + } + + // Fallback: treat as string + cond = append(cond, fmt.Sprintf("%s:'%v'", k, v)) + } + + if len(cond) == 0 { + return "" + } + return strings.Join(cond, " AND ") +} + +// isNumericValue checks if a value is numeric +func isNumericValue(v interface{}) bool { + switch v.(type) { + case int, int8, int16, int32, int64: + return true + case uint, uint8, uint16, uint32, uint64: + return true + case float32, float64: + return true + } + return false +} + +// calculateScores calculates _score for chunks +func calculateScores(chunks []map[string]interface{}, scoreColumn, pagerankField string) []map[string]interface{} { + for i := range chunks { + score := 0.0 + if scoreVal, ok := chunks[i][scoreColumn]; ok { + if f, ok := toFloat64(scoreVal); ok { + score += f + } + } + if pagerankField != "" { + if prVal, ok := chunks[i][pagerankField]; ok { + if f, ok := toFloat64(prVal); ok { + score += f + } + } + } + chunks[i]["_score"] = score + } + return chunks +} + +// toFloat64 converts a value to float64 +func toFloat64(v interface{}) (float64, bool) { + switch val := v.(type) { + case float64: + return val, true + case float32: + return float64(val), true + case int: + return float64(val), true + case int64: + return float64(val), true + } + return 0, false +} + +// sortByScore sorts chunks by _score descending and limits +func sortByScore(chunks []map[string]interface{}, limit int) []map[string]interface{} { + if len(chunks) == 0 { + return chunks + } + + // Sort by _score descending + sort.Slice(chunks, func(i, j int) bool { + scoreI := getChunkScore(chunks[i]) + scoreJ := getChunkScore(chunks[j]) + return scoreI > scoreJ + }) + + // Limit + if len(chunks) > limit && limit > 0 { + chunks = chunks[:limit] + } + + return chunks +} + +// getChunkScore extracts the score from a chunk +func getChunkScore(chunk map[string]interface{}) float64 { + if v, ok := chunk["_score"].(float64); ok { + return v + } + if v, ok := chunk["SCORE"].(float64); ok { + return v + } + if v, ok := chunk["SIMILARITY"].(float64); ok { + return v + } + return 0.0 +} + +// GetFields applies field mappings to chunks and returns a dict keyed by chunk ID. +// This mirrors the Infinity GetFields function behavior. +func GetFields(chunks []map[string]interface{}, fields []string) map[string]map[string]interface{} { + result := make(map[string]map[string]interface{}) + if len(chunks) == 0 { + return result + } + + // If fields is provided, create a set for lookup + fieldSet := make(map[string]bool) + for _, f := range fields { + fieldSet[f] = true + } + + for _, chunk := range chunks { + // Apply field mappings + // docnm -> docnm_kwd, title_tks, title_sm_tks + if val, ok := chunk["docnm"].(string); ok { + chunk["docnm_kwd"] = val + chunk["title_tks"] = val + chunk["title_sm_tks"] = val + } + + // important_keywords -> important_kwd (split by comma), important_tks + if val, ok := chunk["important_keywords"].(string); ok { + if val == "" { + chunk["important_kwd"] = []interface{}{} + } else { + parts := strings.Split(val, ",") + chunk["important_kwd"] = parts + } + chunk["important_tks"] = val + } else { + chunk["important_kwd"] = []interface{}{} + chunk["important_tks"] = []interface{}{} + } + + // questions -> question_kwd (split by newline), question_tks + if val, ok := chunk["questions"].(string); ok { + if val == "" { + chunk["question_kwd"] = []interface{}{} + } else { + parts := strings.Split(val, "\n") + chunk["question_kwd"] = parts + } + chunk["question_tks"] = val + } else { + chunk["question_kwd"] = []interface{}{} + chunk["question_tks"] = []interface{}{} + } + + // content -> content_with_weight, content_ltks, content_sm_ltks + if val, ok := chunk["content"].(string); ok { + chunk["content_with_weight"] = val + chunk["content_ltks"] = val + chunk["content_sm_ltks"] = val + } + + // authors -> authors_tks, authors_sm_tks + if val, ok := chunk["authors"].(string); ok { + chunk["authors_tks"] = val + chunk["authors_sm_tks"] = val + } + + // Build result map keyed by id + if id, ok := chunk["id"].(string); ok { + fieldMap := make(map[string]interface{}) + for field, value := range chunk { + if len(fieldSet) == 0 || fieldSet[field] { + fieldMap[field] = value + } + } + result[id] = fieldMap + } + } + + return result } \ No newline at end of file diff --git a/internal/engine/elasticsearch/client.go b/internal/engine/elasticsearch/client.go index b5680f065..c69d78f93 100644 --- a/internal/engine/elasticsearch/client.go +++ b/internal/engine/elasticsearch/client.go @@ -17,11 +17,16 @@ package elasticsearch import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/http" + "os" + "path/filepath" "ragflow/internal/server" + "ragflow/internal/utility" "time" "github.com/elastic/go-elasticsearch/v8" @@ -81,6 +86,16 @@ func NewEngine(cfg interface{}) (*elasticsearchEngine, error) { config: esConfig, } + // Create two index templates for different index types + // Template for chunk indices (ragflow_*) - priority 1 + if err := engine.CreateIndexTemplate(context.Background(), "ragflow_mapping", "ragflow_*", "mapping.json", 1); err != nil { + return nil, fmt.Errorf("failed to create chunk index template: %w", err) + } + // Template for doc_meta indices (ragflow_doc_meta_*) - priority 2 (higher than ragflow_*) + if err := engine.CreateIndexTemplate(context.Background(), "ragflow_doc_meta_mapping", "ragflow_doc_meta_*", "doc_meta_es_mapping.json", 2); err != nil { + return nil, fmt.Errorf("failed to create doc_meta index template: %w", err) + } + return engine, nil } @@ -109,6 +124,82 @@ func (e *elasticsearchEngine) Close() error { return nil } +// CreateIndexTemplate creates an index template with the specified mapping +// The template will be automatically applied to any new index matching the pattern +func (e *elasticsearchEngine) CreateIndexTemplate(ctx context.Context, templateName, indexPattern, mappingFileName string, priority ...int) error { + if templateName == "" || indexPattern == "" { + return fmt.Errorf("template name and index pattern cannot be empty") + } + + p := 1 + if len(priority) > 0 { + p = priority[0] + } + + if mappingFileName == "" { + mappingFileName = "mapping.json" + } + + // Read mapping from file + mappingPath := filepath.Join(utility.GetProjectRoot(), "conf", mappingFileName) + data, err := os.ReadFile(mappingPath) + if err != nil { + return fmt.Errorf("failed to read mapping file: %w", err) + } + + var mapping map[string]interface{} + if err := json.Unmarshal(data, &mapping); err != nil { + return fmt.Errorf("failed to parse mapping file: %w", err) + } + + // Separate settings and mappings from the mapping file + templateSettings := mapping["settings"] + templateMappings := mapping["mappings"] + + // Build template body with proper structure + templateBody := map[string]interface{}{ + "index_patterns": []string{indexPattern}, + "priority": p, // Configurable priority to override existing templates + "template": map[string]interface{}{ + "settings": templateSettings, + "mappings": templateMappings, + }, + } + + templateBytes, err := json.Marshal(templateBody) + if err != nil { + return fmt.Errorf("failed to marshal template: %w", err) + } + + // Create or update template + req := esapi.IndicesPutIndexTemplateRequest{ + Name: templateName, + Body: bytes.NewReader(templateBytes), + } + + res, err := req.Do(ctx, e.client) + if err != nil { + return fmt.Errorf("failed to create index template: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("failed to create index template: %s, body: %s", res.Status(), string(bodyBytes)) + } + + var result map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + + if acknowledged, ok := result["acknowledged"].(bool); !ok || !acknowledged { + return fmt.Errorf("index template creation not acknowledged") + } + + return nil +} + // GetClusterStats gets Elasticsearch cluster statistics // Reference: curl -XGET "http://{es_host}/_cluster/stats" -H "kbn-xsrf: reporting" func (e *elasticsearchEngine) GetClusterStats() (map[string]interface{}, error) { diff --git a/internal/engine/elasticsearch/metadata.go b/internal/engine/elasticsearch/metadata.go index 272708680..065ffb76e 100644 --- a/internal/engine/elasticsearch/metadata.go +++ b/internal/engine/elasticsearch/metadata.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "fmt" + "io" "strings" "github.com/elastic/go-elasticsearch/v8/esapi" @@ -32,6 +33,17 @@ import ( // CreateMetadataStore creates the document metadata index func (e *elasticsearchEngine) CreateMetadataStore(ctx context.Context, tenantID string) error { indexName := buildMetadataIndexName(tenantID) + + // Check if index already exists + exists, err := e.indexExists(ctx, indexName) + if err != nil { + return fmt.Errorf("failed to check index existence: %w", err) + } + if exists { + return nil + } + + // Index will be created with mapping from index template (ragflow_doc_meta_mapping) req := esapi.IndicesCreateRequest{ Index: indexName, } @@ -41,15 +53,29 @@ func (e *elasticsearchEngine) CreateMetadataStore(ctx context.Context, tenantID } defer res.Body.Close() if res.IsError() { - return fmt.Errorf("elasticsearch returned error: %s", res.Status()) + bodyBytes, _ := io.ReadAll(res.Body) + return fmt.Errorf("elasticsearch returned error: %s, body: %s", res.Status(), string(bodyBytes)) } + + // Parse response + var result map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + return fmt.Errorf("failed to parse response: %w", err) + } + + acknowledged, ok := result["acknowledged"].(bool) + if !ok || !acknowledged { + return fmt.Errorf("metadata index creation not acknowledged") + } + return nil } // InsertMetadata inserts documents into tenant's metadata index +// If a document with the same id and kb_id already exists, it will be updated with the new value func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) { indexName := buildMetadataIndexName(tenantID) - common.Info("Inserting metadata into Elasticsearch index", zap.String("index_name", indexName), zap.String("tenant_id", tenantID), zap.Int("doc_count", len(metadata))) + common.Info("ElasticsearchConnection.InsertMetadata called", zap.String("index_name", indexName), zap.String("tenant_id", tenantID), zap.Int("doc_count", len(metadata))) if len(metadata) == 0 { return []string{}, nil @@ -75,28 +101,34 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map // Build bulk request body var buf bytes.Buffer for _, doc := range metadata { - // Action line - index operation - action := map[string]interface{}{ + docIDRaw, hasID := doc["id"] + kbIDRaw, hasKBID := doc["kb_id"] + docID, idOK := docIDRaw.(string) + kbID, kbOK := kbIDRaw.(string) + if !hasID || !hasKBID || !idOK || !kbOK || strings.TrimSpace(docID) == "" || strings.TrimSpace(kbID) == "" { + common.Warn("Skipping metadata document without id or kb_id") + continue + } + + // Action line: use json.Marshal to properly escape string values + compositeID := fmt.Sprintf("%d:%s|%d:%s", len(docID), docID, len(kbID), kbID) + action, err := json.Marshal(map[string]interface{}{ "index": map[string]interface{}{ "_index": indexName, + "_id": compositeID, }, - } - actionBytes, err := json.Marshal(action) + }) if err != nil { common.Error("Failed to marshal bulk action", err) return nil, fmt.Errorf("failed to marshal bulk action: %w", err) } - buf.Write(actionBytes) + buf.Write(action) buf.WriteByte('\n') - // Document line - meta_fields is stored as-is (ES can handle nested objects) - docBytes, err := json.Marshal(doc) - if err != nil { - common.Error("Failed to marshal document", err) - return nil, fmt.Errorf("failed to marshal document: %w", err) + // Document line + if err := json.NewEncoder(&buf).Encode(doc); err != nil { + return nil, fmt.Errorf("failed to encode document: %w", err) } - buf.Write(docBytes) - buf.WriteByte('\n') } // Execute bulk request @@ -113,8 +145,9 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map defer res.Body.Close() if res.IsError() { - common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status()) - return nil, fmt.Errorf("elasticsearch bulk request returned error: %s", res.Status()) + bodyBytes, _ := io.ReadAll(res.Body) + common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status(), "body", string(bodyBytes)) + return nil, fmt.Errorf("elasticsearch bulk request returned error: %s, body: %s", res.Status(), string(bodyBytes)) } // Parse bulk response to check for errors @@ -129,14 +162,15 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map common.Warn("Bulk request had some errors") } - common.Info("Successfully inserted metadata into Elasticsearch index", zap.String("index_name", indexName), zap.Int("doc_count", len(metadata))) + common.Info("ElasticsearchConnection.InsertMetadata result", zap.String("index_name", indexName), zap.Int("count", len(metadata))) return []string{}, nil } // UpdateMetadata updates document metadata in tenant's metadata index +// The metaFields map will fully replace the existing meta_fields func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error { indexName := buildMetadataIndexName(tenantID) - common.Info("Updating metadata in Elasticsearch index", zap.String("index_name", indexName), zap.String("docID", docID), zap.String("datasetID", datasetID)) + common.Info("ElasticsearchConnection.UpdateMetadata called", zap.String("index_name", indexName), zap.String("docID", docID), zap.String("datasetID", datasetID)) // Check if index exists exists, err := e.indexExists(ctx, indexName) @@ -193,14 +227,16 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, return fmt.Errorf("elasticsearch update by query returned error: %s", res.Status()) } - common.Info("Successfully updated metadata in Elasticsearch index", zap.String("index_name", indexName), zap.String("docID", docID)) + common.Info("ElasticsearchConnection.UpdateMetadata completes", zap.String("index_name", indexName), zap.String("docID", docID)) return nil } // DeleteMetadata deletes metadata from tenant's metadata index by condition +// The condition is a map used to build an ES query (e.g., map["kb_id"]="xxx") +// Returns the number of deleted documents func (e *elasticsearchEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) { indexName := buildMetadataIndexName(tenantID) - common.Info("Deleting metadata from Elasticsearch index", zap.String("index_name", indexName), zap.Any("condition", condition)) + common.Info("ElasticsearchConnection.DeleteMetadata called", zap.String("index_name", indexName), zap.Any("condition", condition)) // Check if index exists exists, err := e.indexExists(ctx, indexName) @@ -258,10 +294,205 @@ func (e *elasticsearchEngine) DeleteMetadata(ctx context.Context, condition map[ deleted = int64(d) } - common.Info("Successfully deleted metadata", zap.String("index_name", indexName), zap.Int64("deleted_count", deleted)) + common.Info("ElasticsearchConnection.DeleteMetadata completes", zap.String("index_name", indexName), zap.Int64("deleted_count", deleted)) return deleted, nil } +// DeleteMetadataKeys deletes specific metadata keys from a document's meta_fields. +// If deleting those keys leaves no metadata entries, the metadata document is removed. +func (e *elasticsearchEngine) DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error { + indexName := buildMetadataIndexName(tenantID) + common.Info("ElasticsearchConnection.DeleteMetadataKeys called", zap.String("index_name", indexName), zap.String("docID", docID), zap.Any("keys", keys)) + + // Check if index exists + exists, err := e.indexExists(ctx, indexName) + if err != nil { + return fmt.Errorf("failed to check index existence: %w", err) + } + if !exists { + return fmt.Errorf("index '%s' does not exist", indexName) + } + + // Build the document ID for query (no escaping needed for ES term queries) + docIDTerm := docID + datasetIDTerm := datasetID + + // Build query to find the document + query := map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + {"term": map[string]interface{}{"id": docIDTerm}}, + {"term": map[string]interface{}{"kb_id": datasetIDTerm}}, + }, + }, + } + + // First, get the current meta_fields to check if it will be empty after deletion + getReq := map[string]interface{}{ + "query": query, + "_source": []string{"meta_fields"}, + "size": 1, + } + + getBytes, err := json.Marshal(getReq) + if err != nil { + return fmt.Errorf("failed to marshal get request: %w", err) + } + + // Use esapi.SearchRequest directly + getSearchReq := esapi.SearchRequest{ + Index: []string{indexName}, + Body: bytes.NewReader(getBytes), + } + + getRes, err := getSearchReq.Do(ctx, e.client) + if err != nil { + return fmt.Errorf("failed to get current metadata: %w", err) + } + defer getRes.Body.Close() + + if getRes.IsError() { + return fmt.Errorf("elasticsearch get request returned error: %s", getRes.Status()) + } + + var getResult map[string]interface{} + if err := json.NewDecoder(getRes.Body).Decode(&getResult); err != nil { + return fmt.Errorf("failed to parse get response: %w", err) + } + + hits, ok := getResult["hits"].(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid get response format") + } + hitsArray, ok := hits["hits"].([]interface{}) + if !ok || len(hitsArray) == 0 { + return fmt.Errorf("document not found: %s", docID) + } + + // Check current meta_fields + firstHit, ok := hitsArray[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid hit format") + } + source, ok := firstHit["_source"].(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid source format") + } + metaFieldsVal, hasMetaFields := source["meta_fields"] + + var currentMetaFields map[string]interface{} + if hasMetaFields && metaFieldsVal != nil { + switch v := metaFieldsVal.(type) { + case map[string]interface{}: + currentMetaFields = v + case string: + if unmarshalErr := json.Unmarshal([]byte(v), ¤tMetaFields); unmarshalErr != nil { + common.Warn("Failed to parse meta_fields JSON", zap.String("docID", docID), zap.Error(unmarshalErr)) + currentMetaFields = make(map[string]interface{}) + } + } + } + + // If no current meta_fields or already empty, nothing to delete + if currentMetaFields == nil || len(currentMetaFields) == 0 { + common.Info("No metadata fields to delete from document", zap.String("docID", docID)) + return nil + } + + // Calculate which keys will be removed + keysToRemove := make(map[string]bool) + for _, k := range keys { + keysToRemove[k] = true + } + + // Check if any keys actually exist and would be removed + hasKeysToRemove := false + for k := range currentMetaFields { + if keysToRemove[k] { + hasKeysToRemove = true + break + } + } + + if !hasKeysToRemove { + common.Info("No matching keys to delete from document", zap.String("docID", docID)) + return nil + } + + // Count remaining keys after deletion (keys that are NOT being removed) + remainingKeys := 0 + for k := range currentMetaFields { + if !keysToRemove[k] { + remainingKeys++ + } + } + + // If no other keys would remain after deletion, delete the document directly + if remainingKeys == 0 { + common.Info("All metadata keys would be deleted, removing document from index", zap.String("docID", docID)) + + // Build condition for deletion using docID and datasetID + condition := map[string]interface{}{ + "id": docIDTerm, + "kb_id": datasetIDTerm, + } + + // Use existing DeleteMetadata method which handles the deletion properly + _, err := e.DeleteMetadata(ctx, condition, tenantID) + if err != nil { + return fmt.Errorf("failed to delete document: %w", err) + } + + common.Info("Successfully removed document with empty meta_fields", zap.String("docID", docID)) + return nil + } + + // Some keys will remain, so remove only the specified keys + keysParam := make([]string, len(keys)) + for i, k := range keys { + keysParam[i] = k + } + + // Build update script that removes keys from meta_fields map + scriptSource := "for(int i=0;i 0 { existingMetaFieldsVal := metaFieldsData[0] - // Parse existing meta_fields if it's a string + // Parse existing meta_fields if it's a string or []uint8 var existingMetaFields map[string]interface{} if existingMetaFieldsVal != nil { switch v := existingMetaFieldsVal.(type) { @@ -237,6 +237,16 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datas common.Warn(fmt.Sprintf("Failed to parse existing meta_fields: %v", err)) existingMetaFields = make(map[string]interface{}) } + case []uint8: + // Handle raw bytes from Infinity - Infinity prefixes JSON with 4 bytes (likely length), skip them + decoded := v + if len(decoded) > 4 { + decoded = decoded[4:] + } + if err := json.Unmarshal(decoded, &existingMetaFields); err != nil { + common.Warn(fmt.Sprintf("Failed to parse existing meta_fields from []uint8: %v", err)) + existingMetaFields = make(map[string]interface{}) + } case map[string]interface{}: existingMetaFields = v } @@ -285,6 +295,7 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datas } // DeleteMetadata deletes metadata from tenant's metadata table by condition +// Returns the number of deleted documents. func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) { tableName := buildMetadataTableName(tenantID) @@ -341,6 +352,143 @@ func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[strin return delResp.DeletedRows, nil } +// DeleteMetadataKeys deletes specific metadata keys from a document's meta_fields. +// If deleting those keys leaves no metadata entries, the metadata row is removed. +func (e *infinityEngine) DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error { + tableName := buildMetadataTableName(tenantID) + common.Info("InfinityConnection.DeleteMetadataKeys called", zap.String("tableName", tableName), zap.String("docID", docID), zap.Any("keys", keys)) + + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("failed to get database: %w", err) + } + + table, err := db.GetTable(tableName) + if err != nil { + return fmt.Errorf("failed to get metadata table %s: %w", tableName, err) + } + + // Build filter to find the document + escapedDocID := strings.ReplaceAll(docID, "'", "''") + escapedDatasetID := strings.ReplaceAll(datasetID, "'", "''") + filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", escapedDocID, escapedDatasetID) + + // Query existing metadata to get current meta_fields + queryTable := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0) + result, err := queryTable.ToResult() + if err != nil { + return fmt.Errorf("failed to query existing metadata: %w", err) + } + + qr, ok := result.(*infinity.QueryResult) + if !ok || qr == nil || len(qr.Data["id"]) == 0 { + return fmt.Errorf("document not found: %s", docID) + } + + // Get existing meta_fields + var existingMetaFields map[string]interface{} + if metaFieldsData, exists := qr.Data["meta_fields"]; exists && len(metaFieldsData) > 0 { + if metaFieldsData[0] != nil { + switch v := metaFieldsData[0].(type) { + case string: + if err := json.Unmarshal([]byte(v), &existingMetaFields); err != nil { + common.Warn("Failed to parse meta_fields JSON", zap.String("docID", docID), zap.Error(err)) + existingMetaFields = make(map[string]interface{}) + } + case []uint8: + // Handle raw bytes from Infinity - Infinity prefixes JSON with 4 bytes (likely length), skip them + decoded := v + if len(decoded) > 4 { + decoded = decoded[4:] + } + if err := json.Unmarshal(decoded, &existingMetaFields); err != nil { + common.Warn("Failed to parse meta_fields JSON from []uint8", zap.String("docID", docID), zap.String("err", err.Error())) + existingMetaFields = make(map[string]interface{}) + } + case map[string]interface{}: + existingMetaFields = v + default: + common.Debug("meta_fields unexpected type", zap.String("type", fmt.Sprintf("%T", metaFieldsData[0])), zap.Any("value", metaFieldsData[0])) + } + } + } else { + common.Debug("meta_fields not found in qr.Data or empty", zap.Any("exists", exists)) + } + + if existingMetaFields == nil { + existingMetaFields = make(map[string]interface{}) + } + + // Build set of keys to remove + keysToRemove := make(map[string]bool) + for _, k := range keys { + keysToRemove[k] = true + } + + // Check if any keys actually exist and would be removed + hasKeysToRemove := false + for k := range existingMetaFields { + if keysToRemove[k] { + hasKeysToRemove = true + break + } + } + + if !hasKeysToRemove { + common.Info( + "No matching keys to delete from document", + zap.String("docID", docID), + zap.Int("existingMetaFieldCount", len(existingMetaFields)), + zap.Int("keysCount", len(keys)), + ) + return nil + } + + // Count remaining keys after deletion (keys that are NOT being removed) + remainingKeys := 0 + for k := range existingMetaFields { + if !keysToRemove[k] { + remainingKeys++ + } + } + + // If no other keys would remain after deletion, delete the document directly + if remainingKeys == 0 { + common.Info("All metadata keys would be deleted, removing document from index", zap.String("docID", docID)) + + // Use existing DeleteMetadata method which handles the deletion properly + condition := map[string]interface{}{ + "id": docID, + "kb_id": datasetID, + } + _, err := e.DeleteMetadata(ctx, condition, tenantID) + if err != nil { + return fmt.Errorf("failed to delete document: %w", err) + } + + common.Info("Successfully removed document with empty meta_fields", zap.String("docID", docID)) + return nil + } + + // Some keys will remain, so remove only the specified keys + for _, key := range keys { + delete(existingMetaFields, key) + } + + // Update with the modified metadata + updatedFields := map[string]interface{}{ + "meta_fields": utility.ConvertMapToJSONString(existingMetaFields), + } + + _, err = table.Update(filter, updatedFields) + if err != nil { + return fmt.Errorf("failed to delete metadata keys: %w", err) + } + + common.Info("InfinityConnection.DeleteMetadataKeys completed", zap.String("tableName", tableName), zap.String("docID", docID)) + return nil +} + // DropMetadataStore drops a metadata table from Infinity func (e *infinityEngine) DropMetadataStore(ctx context.Context, tenantID string) error { tableName := buildMetadataTableName(tenantID) diff --git a/internal/handler/chunk.go b/internal/handler/chunk.go index 8159ce059..71f8df4fd 100644 --- a/internal/handler/chunk.go +++ b/internal/handler/chunk.go @@ -133,7 +133,7 @@ func (h *ChunkHandler) Get(c *gin.Context) { return } - chunkID := c.Query("chunk_id") + chunkID := c.Param("chunk_id") if chunkID == "" { c.JSON(http.StatusBadRequest, gin.H{ "code": 400, @@ -337,7 +337,7 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) { }) } -// Remove handles chunk removal requests +// RemoveChunks handles chunk removal requests // @Summary Remove Chunks // @Description Remove chunks from a document // @Tags chunks @@ -345,14 +345,24 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) { // @Produce json // @Param request body service.RemoveChunksRequest true "remove chunks request" // @Success 200 {object} map[string]interface{} -// @Router /v1/chunk/rm [post] -func (h *ChunkHandler) Remove(c *gin.Context) { +// @Router /api/v1/datasets/{dataset_id}/documents/{document_id}/chunks [delete] +func (h *ChunkHandler) RemoveChunks(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) return } + // Get document_id from URL path + docID := c.Param("document_id") + if docID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "document_id is required", + }) + return + } + var req service.RemoveChunksRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{ @@ -362,6 +372,8 @@ func (h *ChunkHandler) Remove(c *gin.Context) { return } + req.DocID = docID + if req.DocID == "" { c.JSON(http.StatusBadRequest, gin.H{ "code": 400, diff --git a/internal/handler/document.go b/internal/handler/document.go index b2a5be438..7c1e5ad4a 100644 --- a/internal/handler/document.go +++ b/internal/handler/document.go @@ -565,6 +565,124 @@ func (h *DocumentHandler) SetMeta(c *gin.Context) { }) } +// DeleteMetaRequest represents the request for deleting document metadata +type DeleteMetaRequest struct { + DocID string `json:"doc_id" binding:"required"` + Keys string `json:"keys"` // optional - if provided, deletes specific keys; otherwise deletes entire document metadata +} + +// DeleteMeta handles the delete metadata request for a document +// If Keys is provided, deletes specific metadata keys; otherwise deletes entire document metadata +// @Summary Delete Document Metadata +// @Description Delete metadata keys or entire document metadata for a specific document +// @Tags documents +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body DeleteMetaRequest true "metadata keys to delete or empty to delete all" +// @Success 200 {object} map[string]interface{} +// @Router /v1/document/delete_meta [post] +func (h *DocumentHandler) DeleteMeta(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req DeleteMetaRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": err.Error(), + }) + return + } + + if req.DocID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": "doc_id is required", + }) + return + } + + // Authorization: user must be able to access the document's dataset. + doc, err := h.documentService.GetDocumentByID(req.DocID) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": "document not found", + }) + return + } + if !h.datasetService.Accessible(doc.KbID, user.ID) { + jsonError(c, common.CodeAuthenticationError, "No authorization.") + return + } + + // If Keys is provided, parse and delete specific keys; otherwise delete entire document metadata + if req.Keys != "" { + // Parse keys JSON string - expected to be a list of key names to delete + var keys []string + if err := json.Unmarshal([]byte(req.Keys), &keys); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": "Json syntax error: " + err.Error(), + }) + return + } + + if keys == nil || len(keys) == 0 { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": "keys list is required", + }) + return + } + + err := h.documentService.DeleteDocumentMetadata(req.DocID, keys) + if err != nil { + errMsg := err.Error() + if strings.Contains(errMsg, "no such document") || strings.Contains(errMsg, "document not found") { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": errMsg, + }) + } else { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 1, + "message": "Failed to delete metadata: " + errMsg, + }) + } + return + } + } else { + // Delete entire document metadata + err := h.documentService.DeleteDocumentAllMetadata(req.DocID) + if err != nil { + errMsg := err.Error() + if strings.Contains(errMsg, "no such document") || strings.Contains(errMsg, "document not found") { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": errMsg, + }) + } else { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 1, + "message": "Failed to delete metadata: " + errMsg, + }) + } + return + } + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "success", + "data": true, + }) +} + type ParseDocumentRequest struct { Documents []string `json:"documents" binding:"required"` } diff --git a/internal/handler/kb.go b/internal/handler/kb.go index a9f42a8a1..c44fa741c 100644 --- a/internal/handler/kb.go +++ b/internal/handler/kb.go @@ -17,11 +17,8 @@ package handler import ( - "encoding/json" "net/http" - "os" "ragflow/internal/common" - "ragflow/internal/engine" "ragflow/internal/service" "strings" @@ -426,174 +423,4 @@ func (h *KnowledgebaseHandler) GetBasicInfo(c *gin.Context) { } jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success") -} - -// CreateDatasetInDocEngine handles the create dataset request for a knowledge base -// @Summary Create Dataset in Doc Engine -// @Description Create the Infinity table for a knowledge base -// @Tags knowledgebase -// @Accept json -// @Produce json -// @Security ApiKeyAuth -// @Param request body service.CreateDatasetTableRequest true "create dataset request" -// @Success 200 {object} map[string]interface{} -// @Router /v1/kb/doc_engine_table [post] -func (h *KnowledgebaseHandler) CreateDatasetInDocEngine(c *gin.Context) { - user, errorCode, errorMessage := GetUser(c) - if errorCode != common.CodeSuccess { - jsonError(c, errorCode, errorMessage) - return - } - - var req service.CreateDatasetTableRequest - if err := c.ShouldBindJSON(&req); err != nil { - jsonError(c, common.CodeDataError, err.Error()) - return - } - - // Check authorization - if !h.kbService.Accessible(req.KBID, user.ID) { - jsonError(c, common.CodeAuthenticationError, "No authorization.") - return - } - - result, code, err := h.kbService.CreateDatasetInDocEngine(&req) - if err != nil { - jsonError(c, code, err.Error()) - return - } - - jsonResponse(c, common.CodeSuccess, result, "success") -} - -// DeleteDatasetInDocEngineRequest represents the request for deleting a dataset table -type DeleteDatasetInDocEngineRequest struct { - KBID string `json:"kb_id" binding:"required"` -} - -// DeleteDatasetInDocEngine handles the delete dataset request for a knowledge base -// @Summary Delete Dataset in Doc Engine -// @Description Delete the Infinity table for a knowledge base -// @Tags knowledgebase -// @Accept json -// @Produce json -// @Security ApiKeyAuth -// @Param request body DeleteDatasetInDocEngineRequest true "delete dataset request" -// @Success 200 {object} map[string]interface{} -// @Router /v1/kb/doc_engine_table [delete] -func (h *KnowledgebaseHandler) DeleteDatasetInDocEngine(c *gin.Context) { - user, errorCode, errorMessage := GetUser(c) - if errorCode != common.CodeSuccess { - jsonError(c, errorCode, errorMessage) - return - } - - var req DeleteDatasetInDocEngineRequest - if err := c.ShouldBindJSON(&req); err != nil { - jsonError(c, common.CodeDataError, err.Error()) - return - } - - // Check authorization - if !h.kbService.Accessible(req.KBID, user.ID) { - jsonError(c, common.CodeAuthenticationError, "No authorization.") - return - } - - code, err := h.kbService.DeleteDatasetInDocEngine(req.KBID) - if err != nil { - jsonError(c, code, err.Error()) - return - } - - jsonResponse(c, common.CodeSuccess, nil, "success") -} - -// InsertDatasetFromFileRequest request for inserting chunks into dataset from file -type InsertDatasetFromFileRequest struct { - FilePath string `json:"file_path" binding:"required"` -} - -// @Summary Insert chunks into dataset from file -// @Description Internal: Insert into dataset table from a JSON file (table name extracted from file) -// @Tags knowledgebase -// @Accept json -// @Produce json -// @Security ApiKeyAuth -// @Param request body InsertDatasetFromFileRequest true "insert dataset request" -// @Success 200 {object} map[string]interface{} -// @Router /v1/kb/insert_from_file [post] -func (h *KnowledgebaseHandler) InsertDatasetFromFile(c *gin.Context) { - _, errorCode, errorMessage := GetUser(c) - if errorCode != common.CodeSuccess { - jsonError(c, errorCode, errorMessage) - return - } - - var req InsertDatasetFromFileRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": err.Error(), - }) - return - } - - if req.FilePath == "" { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": "file_path is required", - }) - return - } - - // Read the JSON file - data, err := os.ReadFile(req.FilePath) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": "failed to read file: " + err.Error(), - }) - return - } - - // Parse JSON - format: {"table_name": ..., "knowledgebase_id": ..., "chunks": [...]} - var debugFormat struct { - TableNamePrefix string `json:"table_name"` - KnowledgebaseID string `json:"knowledgebase_id"` - Chunks []map[string]interface{} `json:"chunks"` - } - - if err := json.Unmarshal(data, &debugFormat); err != nil || debugFormat.Chunks == nil { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": "invalid JSON format: expected {\"table_name\": ..., \"knowledgebase_id\": ..., \"chunks\": [...]}", - }) - return - } - - if len(debugFormat.Chunks) == 0 { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": "no chunks found in file", - }) - return - } - - // Get the document engine and insert - docEngine := engine.Get() - result, err := docEngine.InsertChunks(c.Request.Context(), debugFormat.Chunks, debugFormat.TableNamePrefix, debugFormat.KnowledgebaseID) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "code": 500, - "message": "failed to insert into dataset: " + err.Error(), - }) - return - } - - c.JSON(http.StatusOK, gin.H{ - "code": 0, - "data": result, - "message": "success", - }) } \ No newline at end of file diff --git a/internal/handler/tenant.go b/internal/handler/tenant.go index 90fcde458..4505cb7bc 100644 --- a/internal/handler/tenant.go +++ b/internal/handler/tenant.go @@ -32,13 +32,15 @@ import ( type TenantHandler struct { tenantService *service.TenantService userService *service.UserService + kbService *service.KnowledgebaseService } // NewTenantHandler create tenant handler -func NewTenantHandler(tenantService *service.TenantService, userService *service.UserService) *TenantHandler { +func NewTenantHandler(tenantService *service.TenantService, userService *service.UserService, kbService *service.KnowledgebaseService) *TenantHandler { return &TenantHandler{ tenantService: tenantService, userService: userService, + kbService: kbService, } } @@ -192,16 +194,16 @@ func (h *TenantHandler) TenantList(c *gin.Context) { }) } -// CreateMetadataInDocEngine handles the create doc meta table request -// @Summary Create Doc Meta Table -// @Description Create the document metadata table for a tenant +// CreateMetadataStore handles the create metadata store request +// @Summary Create Metadata Store +// @Description Create the metadata store for a tenant // @Tags tenants // @Accept json // @Produce json // @Security ApiKeyAuth // @Success 200 {object} map[string]interface{} -// @Router /v1/tenant/doc_engine_metadata_table [post] -func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) { +// @Router /v1/tenant/metadata_store [post] +func (h *TenantHandler) CreateMetadataStore(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) @@ -211,7 +213,7 @@ func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) { // Use user.ID as tenant ID (user IS the tenant in user mode) tenantID := user.ID - code, err := h.tenantService.CreateMetadataInDocEngine(tenantID) + code, err := h.tenantService.CreateMetadataStore(tenantID) if err != nil { jsonError(c, code, err.Error()) return @@ -224,16 +226,16 @@ func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) { }) } -// DeleteMetadataInDocEngine handles the delete doc meta table request -// @Summary Delete Metadata In Doc Engine -// @Description Delete the document metadata table for a tenant +// DeleteMetadataStore handles the delete metadata store request +// @Summary Delete Metadata Store +// @Description Delete the metadata store for a tenant // @Tags tenants // @Accept json // @Produce json // @Security ApiKeyAuth // @Success 200 {object} map[string]interface{} -// @Router /v1/tenant/doc_engine_metadata_table [delete] -func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) { +// @Router /v1/tenant/metadata_store [delete] +func (h *TenantHandler) DeleteMetadataStore(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) @@ -243,7 +245,7 @@ func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) { // Use user.ID as tenant ID (user IS the tenant in user mode) tenantID := user.ID - code, err := h.tenantService.DeleteMetadataInDocEngine(tenantID) + code, err := h.tenantService.DeleteMetadataStore(tenantID) if err != nil { jsonError(c, code, err.Error()) return @@ -256,6 +258,201 @@ func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) { }) } +// CreateChunkTableRequest represents the request for creating a chunk table +type CreateChunkTableRequest struct { + KBID string `json:"kb_id" binding:"required"` + VectorSize int `json:"vector_size" binding:"required"` +} + +// CreateChunkStore handles the create chunk store request +// @Summary Create Chunk Store +// @Description Create the chunk store for a knowledge base +// @Tags tenants +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body CreateChunkTableRequest true "create chunk store request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/tenant/chunk_store [post] +func (h *TenantHandler) CreateChunkStore(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req CreateChunkTableRequest + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, err.Error()) + return + } + + // Check authorization - user must have access to this kb + if !h.kbService.Accessible(req.KBID, user.ID) { + jsonError(c, common.CodeAuthenticationError, "No authorization.") + return + } + + serviceReq := &service.CreateDatasetTableRequest{ + KBID: req.KBID, + VectorSize: req.VectorSize, + } + result, code, err := h.tenantService.CreateChunkStore(serviceReq) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "message": "success", + "data": result, + }) +} + +// DeleteChunkTableRequest represents the request for deleting a chunk table +type DeleteChunkTableRequest struct { + KBID string `json:"kb_id" binding:"required"` +} + +// DeleteChunkStore handles the delete chunk store request +// @Summary Delete Chunk Store +// @Description Delete the chunk store for a knowledge base +// @Tags tenants +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body DeleteChunkTableRequest true "delete chunk store request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/tenant/chunk_store [delete] +func (h *TenantHandler) DeleteChunkStore(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req DeleteChunkTableRequest + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, err.Error()) + return + } + + // Check authorization + if !h.kbService.Accessible(req.KBID, user.ID) { + jsonError(c, common.CodeAuthenticationError, "No authorization.") + return + } + + code, err := h.tenantService.DeleteChunkStore(req.KBID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "message": "success", + "data": nil, + }) +} + +// InsertChunksFromFileRequest request for inserting chunks from file +type InsertChunksFromFileRequest struct { + FilePath string `json:"file_path" binding:"required"` +} + +// @Summary Insert chunks into dataset from JSON file +// @Description Internal: Insert chunks into dataset table from a JSON file +// @Tags tenants +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body InsertChunksFromFileRequest true "insert chunks request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/tenant/insert_chunks_from_file [post] +func (h *TenantHandler) InsertChunksFromFile(c *gin.Context) { + _, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req InsertChunksFromFileRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": err.Error(), + }) + return + } + + if req.FilePath == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "file_path is required", + }) + return + } + + // Read the JSON file + data, err := os.ReadFile(req.FilePath) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "failed to read file: " + err.Error(), + }) + return + } + + // Parse JSON - format: {"index_name"/"table_name": ..., "knowledgebase_id": ..., "chunks": [...]} + var debugFormat struct { + IndexName string `json:"index_name"` + TableName string `json:"table_name"` + KnowledgebaseID string `json:"knowledgebase_id"` + Chunks []map[string]interface{} `json:"chunks"` + } + + if err := json.Unmarshal(data, &debugFormat); err != nil || debugFormat.Chunks == nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "invalid JSON format: expected {\"index_name\"/\"table_name\": ..., \"knowledgebase_id\": ..., \"chunks\": [...]}", + }) + return + } + + if len(debugFormat.Chunks) == 0 { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "no chunks found in file", + }) + return + } + + // Support both index_name (ES) and table_name (Infinity) in JSON + indexName := debugFormat.IndexName + if indexName == "" { + indexName = debugFormat.TableName + } + + // Get the document engine and insert + docEngine := engine.Get() + result, err := docEngine.InsertChunks(c.Request.Context(), debugFormat.Chunks, indexName, debugFormat.KnowledgebaseID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 500, + "message": "failed to insert into dataset: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "data": result, + "message": "success", + }) +} + // InsertMetadataFromFileRequest request for inserting metadata from file type InsertMetadataFromFileRequest struct { FilePath string `json:"file_path" binding:"required"` diff --git a/internal/router/router.go b/internal/router/router.go index 045f72634..d343d8ee1 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -194,6 +194,7 @@ func (r *Router) Setup(engine *gin.Engine) { // Dataset document chunk datasets.GET("/:dataset_id/documents/:document_id/chunks/:chunk_id", r.chunkHandler.Get) datasets.POST("/:dataset_id/documents/parse", r.documentHandler.ParseDocuments) + datasets.DELETE("/:dataset_id/documents/:document_id/chunks", r.chunkHandler.RemoveChunks) } // Search routes @@ -341,9 +342,6 @@ func (r *Router) Setup(engine *gin.Engine) { kb.GET("/tags", r.knowledgebaseHandler.ListTagsFromKbs) kb.GET("/get_meta", r.knowledgebaseHandler.GetMeta) kb.GET("/basic_info", r.knowledgebaseHandler.GetBasicInfo) - kb.POST("/doc_engine_table", r.knowledgebaseHandler.CreateDatasetInDocEngine) // Internal API only for GO - kb.DELETE("/doc_engine_table", r.knowledgebaseHandler.DeleteDatasetInDocEngine) // Internal API only for GO - kb.POST("/insert_from_file", r.knowledgebaseHandler.InsertDatasetFromFile) // Internal API only for GO // KB ID specific routes kbByID := kb.Group("/:kb_id") @@ -358,8 +356,11 @@ func (r *Router) Setup(engine *gin.Engine) { // Tenant routes (per-tenant resources) tenant := v1.Group("/tenant") { - tenant.POST("/doc_engine_metadata_table", r.tenantHandler.CreateMetadataInDocEngine) // Internal API only for GO - tenant.DELETE("/doc_engine_metadata_table", r.tenantHandler.DeleteMetadataInDocEngine) // Internal API only for GO + tenant.POST("/chunk_store", r.tenantHandler.CreateChunkStore) // Internal API only for GO + tenant.DELETE("/chunk_store", r.tenantHandler.DeleteChunkStore) // Internal API only for GO + tenant.POST("/metadata_store", r.tenantHandler.CreateMetadataStore) // Internal API only for GO + tenant.DELETE("/metadata_store", r.tenantHandler.DeleteMetadataStore) // Internal API only for GO + tenant.POST("/insert_chunks_from_file", r.tenantHandler.InsertChunksFromFile) // Internal API only for GO tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) // Internal API only for GO } @@ -369,6 +370,7 @@ func (r *Router) Setup(engine *gin.Engine) { doc.POST("/list", r.documentHandler.ListDocuments) doc.POST("/metadata/summary", r.documentHandler.MetadataSummary) doc.POST("/set_meta", r.documentHandler.SetMeta) + doc.POST("/delete_meta", r.documentHandler.DeleteMeta) // Internal API only for GO } // Chunk routes @@ -376,7 +378,6 @@ func (r *Router) Setup(engine *gin.Engine) { { chunk.POST("/list", r.chunkHandler.List) chunk.POST("/update", r.chunkHandler.UpdateChunk) // Internal API only for GO - chunk.POST("/rm", r.chunkHandler.Remove) } // LLM routes diff --git a/internal/service/document.go b/internal/service/document.go index 072b1d946..8950772a8 100644 --- a/internal/service/document.go +++ b/internal/service/document.go @@ -386,6 +386,58 @@ func (s *DocumentService) SetDocumentMetadata(docID string, meta map[string]inte return nil } +// DeleteDocumentMetadata deletes metadata keys for a document in the document engine +func (s *DocumentService) DeleteDocumentMetadata(docID string, keys []string) error { + // Get document to find kb_id + doc, err := s.documentDAO.GetByID(docID) + if err != nil { + return fmt.Errorf("document not found: %w", err) + } + + // Get tenant ID + tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID) + if err != nil { + return fmt.Errorf("failed to get tenant ID: %w", err) + } + + // Delete metadata using the document engine + err = s.docEngine.DeleteMetadataKeys(nil, docID, doc.KbID, keys, tenantID) + if err != nil { + return fmt.Errorf("failed to delete metadata: %w", err) + } + + return nil +} + +// DeleteDocumentAllMetadata deletes all metadata for a document in the document engine +func (s *DocumentService) DeleteDocumentAllMetadata(docID string) error { + // Get document to find kb_id + doc, err := s.documentDAO.GetByID(docID) + if err != nil { + return fmt.Errorf("document not found: %w", err) + } + + // Get tenant ID + tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID) + if err != nil { + return fmt.Errorf("failed to get tenant ID: %w", err) + } + + // Build condition to match the document + condition := map[string]interface{}{ + "id": docID, + "kb_id": doc.KbID, + } + + // Delete entire document metadata + _, err = s.docEngine.DeleteMetadata(nil, condition, tenantID) + if err != nil { + return fmt.Errorf("failed to delete document metadata: %w", err) + } + + return nil +} + // GetDocumentMetadataByID get metadata for a specific document func (s *DocumentService) GetDocumentMetadataByID(docID string) (map[string]interface{}, error) { // Get document to find kb_id diff --git a/internal/service/kb.go b/internal/service/kb.go index 5017b9393..8913bf0a2 100644 --- a/internal/service/kb.go +++ b/internal/service/kb.go @@ -78,69 +78,6 @@ type ListKbsResponse struct { Total int64 `json:"total"` } -// CreateDatasetTableRequest represents the request for creating a dataset table -type CreateDatasetTableRequest struct { - KBID string `json:"kb_id" binding:"required"` - VectorSize int `json:"vector_size" binding:"required"` - ParserID string `json:"parser_id,omitempty"` -} - -// CreateDatasetInDocEngineResponse represents the response for creating a dataset table -type CreateDatasetInDocEngineResponse struct { - KBID string `json:"kb_id"` - TableName string `json:"table_name"` - VectorSize int `json:"vector_size"` -} - -// CreateDatasetInDocEngine creates a table in the document engine for a knowledge base -func (s *KnowledgebaseService) CreateDatasetInDocEngine(req *CreateDatasetTableRequest) (*CreateDatasetInDocEngineResponse, common.ErrorCode, error) { - // Get KB to find tenant_id for building table name - kb, err := s.kbDAO.GetByID(req.KBID) - if err != nil { - return nil, common.CodeDataError, fmt.Errorf("knowledge base not found: %s", req.KBID) - } - - // vector_size is required - vecSize := req.VectorSize - if vecSize <= 0 { - return nil, common.CodeDataError, fmt.Errorf("vector_size must be positive") - } - - // Build table name prefix: ragflow_ - tableName := fmt.Sprintf("ragflow_%s", kb.TenantID) - - // Call document engine to create table - // Full table name will be built as "{tableName}_{kb_id}" - err = s.docEngine.CreateChunkStore(context.Background(), tableName, req.KBID, vecSize, req.ParserID) - if err != nil { - return nil, common.CodeServerError, fmt.Errorf("failed to create dataset: %w", err) - } - - return &CreateDatasetInDocEngineResponse{ - KBID: req.KBID, - TableName: tableName, - VectorSize: vecSize, - }, common.CodeSuccess, nil -} - -// DeleteDatasetInDocEngine deletes the table in the document engine for a knowledge base -func (s *KnowledgebaseService) DeleteDatasetInDocEngine(kbID string) (common.ErrorCode, error) { - // Get KB to find tenant_id for building table name - kb, err := s.kbDAO.GetByID(kbID) - if err != nil { - return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID) - } - - // Call document engine to delete table - err = s.docEngine.DropChunkStore(context.Background(), fmt.Sprintf("ragflow_%s", kb.TenantID), kbID) - - if err != nil { - return common.CodeServerError, fmt.Errorf("failed to delete table: %w", err) - } - - return common.CodeSuccess, nil -} - // UpdateKB updates an existing knowledge base // This matches the Python update endpoint in kb_app.py func (s *KnowledgebaseService) UpdateKB(req *UpdateKBRequest, userID string) (map[string]interface{}, common.ErrorCode, error) { diff --git a/internal/service/tenant.go b/internal/service/tenant.go index 08eee5d89..7ff19c5f3 100644 --- a/internal/service/tenant.go +++ b/internal/service/tenant.go @@ -35,6 +35,7 @@ type TenantService struct { modelDAO *dao.TenantModelDAO modelGroupDAO *dao.TenantModelGroupDAO modelGroupMappingDAO *dao.TenantModelGroupMappingDAO + kbDAO *dao.KnowledgebaseDAO docEngine engine.DocEngine } @@ -48,6 +49,7 @@ func NewTenantService() *TenantService { modelDAO: dao.NewTenantModelDAO(), modelGroupDAO: dao.NewTenantModelGroupDAO(), modelGroupMappingDAO: dao.NewTenantModelGroupMappingDAO(), + kbDAO: dao.NewKnowledgebaseDAO(), docEngine: engine.Get(), } } @@ -265,8 +267,8 @@ func (s *TenantService) GetTenantList(userID string) ([]*TenantListItem, error) return result, nil } -// CreateMetadataInDocEngine creates the document metadata table for a tenant -func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.ErrorCode, error) { +// CreateMetadataStore creates the metadata store for a tenant +func (s *TenantService) CreateMetadataStore(tenantID string) (common.ErrorCode, error) { // Call document engine to create doc meta table err := s.docEngine.CreateMetadataStore(context.Background(), tenantID) if err != nil { @@ -276,8 +278,8 @@ func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.Error return common.CodeSuccess, nil } -// DeleteMetadataInDocEngine deletes the document metadata table for a tenant -func (s *TenantService) DeleteMetadataInDocEngine(tenantID string) (common.ErrorCode, error) { +// DeleteMetadataStore deletes the metadata store for a tenant +func (s *TenantService) DeleteMetadataStore(tenantID string) (common.ErrorCode, error) { // Call document engine to delete doc meta table err := s.docEngine.DropMetadataStore(context.Background(), tenantID) if err != nil { @@ -287,6 +289,77 @@ func (s *TenantService) DeleteMetadataInDocEngine(tenantID string) (common.Error return common.CodeSuccess, nil } +// CreateDatasetTableRequest represents the request for creating a dataset table +type CreateDatasetTableRequest struct { + KBID string `json:"kb_id" binding:"required"` + VectorSize int `json:"vector_size" binding:"required"` + ParserID string `json:"parser_id,omitempty"` +} + +// CreateChunkStoreResponse represents the response for creating a chunk store +type CreateChunkStoreResponse struct { + KBID string `json:"kb_id"` + TableName string `json:"table_name"` + VectorSize int `json:"vector_size"` +} + +// CreateChunkStore creates a chunk store in the document engine for a knowledge base +func (s *TenantService) CreateChunkStore(req *CreateDatasetTableRequest) (*CreateChunkStoreResponse, common.ErrorCode, error) { + if req == nil { + return nil, common.CodeDataError, fmt.Errorf("request is required") + } + // Get KB to find tenant_id for building table name + kb, err := s.kbDAO.GetByID(req.KBID) + if err != nil { + if dao.IsNotFoundErr(err) { + return nil, common.CodeDataError, fmt.Errorf("knowledge base not found: %s", req.KBID) + } + return nil, common.CodeServerError, fmt.Errorf("failed to query knowledge base %s: %w", req.KBID, err) + } + + // vector_size is required + vecSize := req.VectorSize + if vecSize <= 0 { + return nil, common.CodeDataError, fmt.Errorf("vector_size must be positive") + } + + // Build table name prefix: ragflow_ + tableName := fmt.Sprintf("ragflow_%s", kb.TenantID) + + // Call document engine to create table + // Full table name will be built as "{tableName}_{kb_id}" + err = s.docEngine.CreateChunkStore(context.Background(), tableName, req.KBID, vecSize, req.ParserID) + if err != nil { + return nil, common.CodeServerError, fmt.Errorf("failed to create dataset: %w", err) + } + + return &CreateChunkStoreResponse{ + KBID: req.KBID, + TableName: tableName, + VectorSize: vecSize, + }, common.CodeSuccess, nil +} + +// DeleteChunkStore deletes the chunk store in the document engine for a knowledge base +func (s *TenantService) DeleteChunkStore(kbID string) (common.ErrorCode, error) { + // Get KB to find tenant_id for building table name + kb, err := s.kbDAO.GetByID(kbID) + if err != nil { + if dao.IsNotFoundErr(err) { + return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID) + } + return common.CodeServerError, fmt.Errorf("failed to query knowledge base %s: %w", kbID, err) + } + + // Call document engine to delete table + err = s.docEngine.DropChunkStore(context.Background(), fmt.Sprintf("ragflow_%s", kb.TenantID), kbID) + if err != nil { + return common.CodeServerError, fmt.Errorf("failed to delete table: %w", err) + } + + return common.CodeSuccess, nil +} + type ModelItem struct { ModelProvider *string `json:"model_provider"` ModelInstance *string `json:"model_instance"`