From dbef3e361ffabe763dc3712bf4155ff3ac314948 Mon Sep 17 00:00:00 2001 From: qinling0210 <88864212+qinling0210@users.noreply.github.com> Date: Wed, 20 May 2026 20:32:06 +0800 Subject: [PATCH] Update chunk/metadata cli (#15055) ### What problem does this PR solve? Update chunk/metadata cli ### Type of change - [ ] Refactoring --- cmd/server_main.go | 7 +- internal/cli/benchmark.go | 4 +- internal/cli/client.go | 4 + internal/cli/filesystem/dataset.go | 16 ++-- internal/cli/lexer.go | 2 + internal/cli/parser.go | 2 + internal/cli/response.go | 103 +++++++++++++++++++++- internal/cli/types.go | 2 + internal/cli/user_command.go | 96 ++++++++++++++++++++- internal/cli/user_parser.go | 131 +++++++++++++++++++++++++--- internal/development.md | 73 ++++++++++++++++ internal/engine/infinity/chunk.go | 20 ++--- internal/handler/datasets.go | 132 ++++++++++++++++++++++++++++- internal/handler/kb.go | 68 --------------- internal/router/router.go | 50 ++++++----- internal/service/chunk.go | 1 - rag/utils/infinity_conn.py | 38 +++++---- 17 files changed, 602 insertions(+), 147 deletions(-) diff --git a/cmd/server_main.go b/cmd/server_main.go index ab14dacd8..8c5e53b8f 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -167,6 +167,8 @@ func startServer(config *server.Config) { userService := service.NewUserService() documentService := service.NewDocumentService() datasetsService := service.NewDatasetService() + knowledgebaseService := service.NewKnowledgebaseService() + metadataService := service.NewMetadataService() chunkService := service.NewChunkService() llmService := service.NewLLMService() tenantService := service.NewTenantService() @@ -187,8 +189,9 @@ func startServer(config *server.Config) { userHandler := handler.NewUserHandler(userService) tenantHandler := handler.NewTenantHandler(tenantService, userService) documentHandler := handler.NewDocumentHandler(documentService, datasetsService) - datasetsHandler := handler.NewDatasetsHandler(datasetsService) + datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService) systemHandler := handler.NewSystemHandler(systemService) + knowledgebaseHandler := handler.NewKnowledgebaseHandler(knowledgebaseService, userService, documentService) chunkHandler := handler.NewChunkHandler(chunkService, userService) llmHandler := handler.NewLLMHandler(llmService, userService) chatHandler := handler.NewChatHandler(chatService, userService) @@ -201,7 +204,7 @@ func startServer(config *server.Config) { providerHandler := handler.NewProviderHandler(userService, modelProviderService) // Initialize router - r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, chunkHandler, llmHandler, chatHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, skillSearchHandler, providerHandler) + r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, skillSearchHandler, providerHandler) // Create Gin engine ginEngine := gin.New() diff --git a/internal/cli/benchmark.go b/internal/cli/benchmark.go index 1315ce171..a6c3d6de9 100644 --- a/internal/cli/benchmark.go +++ b/internal/cli/benchmark.go @@ -237,12 +237,12 @@ func (c *RAGFlowClient) executeBenchmarkSilent(cmd *Command, iterations int) []* question, _ := cmd.Params["question"].(string) datasetIDs, _ := cmd.Params["dataset_ids"].([]string) payload := map[string]interface{}{ - "kb_id": datasetIDs, + "dataset_ids": datasetIDs, "question": question, "similarity_threshold": 0.2, "vector_similarity_weight": 0.3, } - resp, err = c.HTTPClient.Request("POST", "/chunk/retrieval_test", "web", nil, payload) + resp, err = c.HTTPClient.Request("POST", "/datasets/search", "web", nil, payload) default: // For other commands, we would need to add specific handling // For now, mark as failed diff --git a/internal/cli/client.go b/internal/cli/client.go index eebfb33d0..53e53bcee 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -314,12 +314,16 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.InsertMetadataFromFile(cmd) case "update_chunk": return c.UpdateChunk(cmd) + case "get_chunk": + return c.GetChunk(cmd) case "set_meta": return c.SetMeta(cmd) case "rm_tags": return c.RmTags(cmd) case "remove_chunks": return c.RemoveChunks(cmd) + case "list_metadata": + return c.ListMetadata(cmd) // ContextEngine commands case "ce_ls": return c.CEList(cmd) diff --git a/internal/cli/filesystem/dataset.go b/internal/cli/filesystem/dataset.go index 06fa6b073..dc201ef00 100644 --- a/internal/cli/filesystem/dataset.go +++ b/internal/cli/filesystem/dataset.go @@ -305,8 +305,8 @@ func (p *DatasetProvider) searchWithRetrieval(ctx stdctx.Context, opts *SearchOp // Build retrieval request payload := map[string]interface{}{ - "kb_id": kbIDs, - "question": opts.Query, + "dataset_ids": kbIDs, + "question": opts.Query, } // Set top_k (default to 10 if not specified) @@ -323,8 +323,8 @@ func (p *DatasetProvider) searchWithRetrieval(ctx stdctx.Context, opts *SearchOp } payload["similarity_threshold"] = threshold - // Call retrieval API (useAPIBase=false because the route is /v1/chunk/retrieval_test, not /api/v1/...) - resp, err := p.httpClient.Request("POST", "/chunk/retrieval_test", "auto", nil, payload) + // Call retrieval API + resp, err := p.httpClient.Request("POST", "/datasets/search", "auto", nil, payload) if err != nil { return nil, fmt.Errorf("retrieval request failed: %w", err) } @@ -589,8 +589,8 @@ func (p *DatasetProvider) searchDocuments(ctx stdctx.Context, datasetName string // Build retrieval request for specific dataset payload := map[string]interface{}{ - "kb_id": []string{kbID}, - "question": opts.Query, + "dataset_ids": []string{kbID}, + "question": opts.Query, } // Set top_k (default to 10 if not specified) @@ -607,8 +607,8 @@ func (p *DatasetProvider) searchDocuments(ctx stdctx.Context, datasetName string } payload["similarity_threshold"] = threshold - // Call retrieval API (useAPIBase=false because the route is /v1/chunk/retrieval_test, not /api/v1/...) - resp, err := p.httpClient.Request("POST", "/chunk/retrieval_test", "auto", nil, payload) + // Call retrieval API + resp, err := p.httpClient.Request("POST", "/datasets/search", "auto", nil, payload) if err != nil { return nil, fmt.Errorf("retrieval request failed: %w", err) } diff --git a/internal/cli/lexer.go b/internal/cli/lexer.go index b3bd1c8b3..48e283a31 100644 --- a/internal/cli/lexer.go +++ b/internal/cli/lexer.go @@ -349,6 +349,8 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenParser, Value: ident} case "PIPELINE": return Token{Type: TokenPipeline, Value: ident} + case "GET": + return Token{Type: TokenGet, Value: ident} case "SEARCH": return Token{Type: TokenSearch, Value: ident} case "CURRENT": diff --git a/internal/cli/parser.go b/internal/cli/parser.go index 02723f546..a23a9f358 100644 --- a/internal/cli/parser.go +++ b/internal/cli/parser.go @@ -219,6 +219,8 @@ func (p *Parser) parseUserCommand() (*Command, error) { return p.parseUpdateCommand() case TokenRemove: return p.parseRemoveCommand() + case TokenGet: + return p.parseGetCommand() default: return nil, fmt.Errorf("unknown command: %s", p.curToken.Value) diff --git a/internal/cli/response.go b/internal/cli/response.go index 19daffc9e..76337e770 100644 --- a/internal/cli/response.go +++ b/internal/cli/response.go @@ -16,7 +16,10 @@ package cli -import "fmt" +import ( + "fmt" + "strings" +) type ResponseIf interface { Type() string @@ -121,6 +124,104 @@ func (r *ListDocumentsResponse) PrintOut() { } } +type ChunkResponse struct { + Code int `json:"code"` + Data map[string]interface{} `json:"data"` + Message string `json:"message"` + Duration float64 + OutputFormat OutputFormat +} + +func (r *ChunkResponse) Type() string { + return "chunk" +} + +func (r *ChunkResponse) TimeCost() float64 { + return r.Duration +} + +func (r *ChunkResponse) SetOutputFormat(format OutputFormat) { + r.OutputFormat = format +} + +func (r *ChunkResponse) PrintOut() { + if r.Code == 0 { + for k, v := range r.Data { + fmt.Printf("%s: %v\n", k, v) + } + } else { + fmt.Println("ERROR") + fmt.Printf("%d, %s\n", r.Code, r.Message) + } +} + +type MetadataResponse struct { + Code int `json:"code"` + Data map[string]interface{} `json:"data"` + Message string `json:"message"` + Duration float64 + OutputFormat OutputFormat +} + +func (r *MetadataResponse) Type() string { + return "metadata" +} + +func (r *MetadataResponse) TimeCost() float64 { + return r.Duration +} + +func (r *MetadataResponse) SetOutputFormat(format OutputFormat) { + r.OutputFormat = format +} + +func (r *MetadataResponse) PrintOut() { + if r.Code == 0 { + // Data is map[field]map[value][]doc_id - print flattened metadata + if r.Data != nil { + printFlattenedMetadata(r.Data, r.OutputFormat) + } + } else { + fmt.Println("ERROR") + fmt.Printf("%d, %s\n", r.Code, r.Message) + } +} + +func printFlattenedMetadata(data map[string]interface{}, format OutputFormat) { + // Convert flattened metadata to table format + // {field: {value: [doc_ids]}} -> [{field, value, document_ids}, ...] + tableData := make([]map[string]interface{}, 0) + for field, values := range data { + valueMap, ok := values.(map[string]interface{}) + if !ok { + continue + } + for value, docIDs := range valueMap { + var docIDStr string + switch v := docIDs.(type) { + case []string: + docIDStr = strings.Join(v, ", ") + case []interface{}: + docStrs := make([]string, 0, len(v)) + for _, d := range v { + if s, ok := d.(string); ok { + docStrs = append(docStrs, s) + } + } + docIDStr = strings.Join(docStrs, ", ") + default: + docIDStr = fmt.Sprintf("%v", docIDs) + } + tableData = append(tableData, map[string]interface{}{ + "field": field, + "value": value, + "document_ids": docIDStr, + }) + } + } + PrintTableSimpleByFormat(tableData, format) +} + type SimpleResponse struct { Code int `json:"code"` Message string `json:"message"` diff --git a/internal/cli/types.go b/internal/cli/types.go index 3f3ef2742..df52d30d2 100644 --- a/internal/cli/types.go +++ b/internal/cli/types.go @@ -145,6 +145,7 @@ const ( TokenFile TokenMetadata TokenTable + TokenGet TokenUpdate TokenRemove TokenChunk @@ -172,6 +173,7 @@ const ( TokenNumber = TokenInteger // Alias for integer tokens in path parsing (e.g., version numbers like 1.0.0) // Special + _ = iota TokenSemicolon TokenComma TokenSlash diff --git a/internal/cli/user_command.go b/internal/cli/user_command.go index 4960dbbb6..a20d99b93 100644 --- a/internal/cli/user_command.go +++ b/internal/cli/user_command.go @@ -498,6 +498,52 @@ func (c *RAGFlowClient) getDatasetID(datasetName string) (string, error) { return "", fmt.Errorf("dataset '%s' not found", datasetName) } +// ListMetadata lists metadata for datasets +func (c *RAGFlowClient) ListMetadata(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + datasetNames, ok := cmd.Params["dataset_names"].([]string) + if !ok || len(datasetNames) == 0 { + return nil, fmt.Errorf("dataset_names not provided") + } + + // Convert dataset names to IDs + datasetIDs := make([]string, 0, len(datasetNames)) + for _, name := range datasetNames { + id, err := c.getDatasetID(name) + if err != nil { + return nil, err + } + datasetIDs = append(datasetIDs, id) + } + + // Build comma-separated dataset_ids for query param + datasetIDsStr := strings.Join(datasetIDs, ",") + + resp, err := c.HTTPClient.Request("GET", "/datasets/metadata/flattened?dataset_ids="+datasetIDsStr, "web", nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to list metadata: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to list metadata: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + var result MetadataResponse + if err = json.Unmarshal(resp.Body, &result); err != nil { + return nil, fmt.Errorf("list metadata failed: invalid JSON (%w)", err) + } + + if result.Code != 0 { + return nil, fmt.Errorf("%s", result.Message) + } + result.Duration = resp.Duration + + return &result, nil +} + // formatEmptyArray converts empty arrays to "[]" string func formatEmptyArray(v interface{}) string { if v == nil { @@ -2971,6 +3017,54 @@ func (c *RAGFlowClient) UpdateChunk(cmd *Command) (ResponseIf, error) { return &result, nil } +// GetChunk retrieves a chunk by ID +func (c *RAGFlowClient) GetChunk(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + chunkID, ok := cmd.Params["chunk_id"].(string) + if !ok { + return nil, fmt.Errorf("chunk_id not provided") + } + + datasetName, ok := cmd.Params["dataset_name"].(string) + if !ok { + return nil, fmt.Errorf("dataset_name not provided") + } + + datasetID, err := c.getDatasetID(datasetName) + if err != nil { + return nil, err + } + + docID, ok := cmd.Params["doc_id"].(string) + if !ok { + return nil, fmt.Errorf("doc_id not provided") + } + + resp, err := c.HTTPClient.Request("GET", fmt.Sprintf("/datasets/%s/documents/%s/chunks/%s", datasetID, docID, chunkID), "web", nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get chunk: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to get chunk: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + var result ChunkResponse + if err = json.Unmarshal(resp.Body, &result); err != nil { + return nil, fmt.Errorf("get chunk failed: invalid JSON (%w)", err) + } + + if result.Code != 0 { + return nil, fmt.Errorf("%s", result.Message) + } + result.Duration = resp.Duration + + return &result, nil +} + // SetMeta sets metadata for a document func (c *RAGFlowClient) SetMeta(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { @@ -3047,7 +3141,7 @@ func (c *RAGFlowClient) RmTags(cmd *Command) (ResponseIf, error) { "tags": tags, } - resp, err := c.HTTPClient.Request("POST", "/kb/"+kbID+"/rm_tags", "web", nil, payload) + resp, err := c.HTTPClient.Request("DELETE", "/datasets/"+kbID+"/tags", "web", nil, payload) if err != nil { return nil, fmt.Errorf("failed to remove tags: %w", err) } diff --git a/internal/cli/user_parser.go b/internal/cli/user_parser.go index 64357abee..307628ddf 100644 --- a/internal/cli/user_parser.go +++ b/internal/cli/user_parser.go @@ -138,6 +138,8 @@ func (p *Parser) parseListCommand() (*Command, error) { return p.parseListDatasets() case TokenDocuments: return p.parseListDatasetDocuments() + case TokenMetadata: + return p.parseListMetadata() case TokenAgents: return p.parseListAgents() case TokenTokens: @@ -208,6 +210,50 @@ func (p *Parser) parseListDatasetDocuments() (*Command, error) { return cmd, nil } +func (p *Parser) parseListMetadata() (*Command, error) { + p.nextToken() // consume METADATA + + if p.curToken.Type != TokenOf { + return nil, fmt.Errorf("expected OF after METADATA") + } + p.nextToken() + + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected DATASET after OF") + } + p.nextToken() + + // Parse dataset names (space-separated) + var datasetNames []string + for { + name, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected dataset name: %w", err) + } + datasetNames = append(datasetNames, name) + + p.nextToken() + // Stop at semicolon or non-quoted (dataset name must be quoted) + if p.curToken.Type == TokenSemicolon { + break + } + // If next token is not a quoted string, stop parsing dataset names + if p.curToken.Type != TokenQuotedString { + break + } + } + + cmd := NewCommand("list_metadata") + cmd.Params["dataset_names"] = datasetNames + + // Semicolon is optional + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + return cmd, nil +} + func (p *Parser) parseListAgents() (*Command, error) { p.nextToken() // consume AGENTS @@ -3234,6 +3280,8 @@ func (p *Parser) parseUserStatement() (*Command, error) { return p.parseInsertCommand() case TokenSearch: return p.parseSearchCommand() + case TokenGet: + return p.parseGetCommand() case TokenUpdate: return p.parseUpdateCommand() case TokenRemove: @@ -3327,6 +3375,70 @@ func (p *Parser) parseUnsetCommand() (*Command, error) { return NewCommand("unset_token"), nil } +// parseGetCommand parses: GET CHUNK 'chunk_id' +func (p *Parser) parseGetCommand() (*Command, error) { + p.nextToken() // consume GET + + if p.curToken.Type == TokenChunk { + return p.parseGetChunk() + } + + return nil, fmt.Errorf("unknown GET target: %s", p.curToken.Value) +} + +// parseGetChunk parses: GET CHUNK 'chunk_id' OF DATASET 'dataset_name' DOCUMENT 'doc_id' +func (p *Parser) parseGetChunk() (*Command, error) { + p.nextToken() // consume CHUNK + + // Parse chunk_id + chunkID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected chunk_id: %w", err) + } + + cmd := NewCommand("get_chunk") + cmd.Params["chunk_id"] = chunkID + + p.nextToken() + if p.curToken.Type != TokenOf { + return nil, fmt.Errorf("expected OF after chunk_id") + } + p.nextToken() + + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected DATASET after OF") + } + p.nextToken() + + // Parse dataset_name + datasetName, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected dataset_name: %w", err) + } + cmd.Params["dataset_name"] = datasetName + + p.nextToken() + if p.curToken.Type != TokenDocument { + return nil, fmt.Errorf("expected DOCUMENT after dataset_name") + } + p.nextToken() + + // Parse doc_id + docID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected doc_id: %w", err) + } + cmd.Params["doc_id"] = docID + + p.nextToken() + // Semicolon is optional + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + return cmd, nil +} + // Internal // parseUpdateCommand parses: UPDATE CHUNK 'chunk_id' OF DATASET 'dataset_name' SET '{"content": "..."}' func (p *Parser) parseUpdateCommand() (*Command, error) { @@ -3512,7 +3624,7 @@ func (p *Parser) parseRemoveChunk() (*Command, error) { } else { // curToken is TokenChunks, consume it first p.nextToken() - // Multiple chunks: REMOVE CHUNKS 'id1', 'id2' FROM DOCUMENT 'doc_id' + // Multiple chunks: REMOVE CHUNKS 'id1' 'id2' FROM DOCUMENT 'doc_id' (space-separated) // Parse first chunk ID chunkID, err := p.parseQuotedString() if err != nil { @@ -3520,19 +3632,18 @@ func (p *Parser) parseRemoveChunk() (*Command, error) { } chunkIDs := []string{chunkID} - // Parse additional chunk IDs separated by commas + // Parse additional chunk IDs separated by spaces (each quoted) for { p.nextToken() - if p.curToken.Type == TokenComma { - p.nextToken() - chunkID, err := p.parseQuotedString() - if err != nil { - return nil, fmt.Errorf("expected chunk_id after comma: %w", err) - } - chunkIDs = append(chunkIDs, chunkID) - } else { + // Stop if we hit FROM or non-quoted token + if p.curToken.Type == TokenFrom || p.curToken.Type != TokenQuotedString { break } + chunkID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected chunk_id: %w", err) + } + chunkIDs = append(chunkIDs, chunkID) } cmd.Params["chunk_ids"] = chunkIDs } diff --git a/internal/development.md b/internal/development.md index ae2758cb5..2483e6e1e 100644 --- a/internal/development.md +++ b/internal/development.md @@ -382,4 +382,77 @@ RAGFlow(user)> ocr with 'paddleocr-vl-0.9b@test@baidu' file './internal/text.jpg +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Parallel to these organizational innovations there were significant complementary technical innovations (e.g., improved methods of manufacturing cast-iron pipe and of coating interiors for pressure maintenance, and newer paving and construction material... | +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +``` + +### 6.26 Chunk Management Commands + +- Create a dataset table with vector size +``` +RAGFlow(user)> CREATE DATASET TABLE 'test' VECTOR SIZE 384 +``` + +- Insert data from JSON files +``` +RAGFlow(user)> INSERT DATASET FROM FILE 'insert_kb.json' +``` + +- Update a chunk's content +``` +RAGFlow(user)> UPDATE CHUNK 'deb165dc6a732a64' OF DATASET 'test' SET '{"content": "Updated chunk content here", "important_keywords": ["keyword1", "keyword2"], "questions": ["What is this about?", "Why is it important?"], "available": true, "tag_kwd": ["tag5", "tag2"]}' +``` + +- Remove tags from a dataset +``` +RAGFlow(user)> REMOVE TAGS 'tag1', 'tag2' FROM DATASET 'test' +``` + +- Remove specific chunks from a document +``` +RAGFlow(user)> REMOVE CHUNKS '29cc4f6d7a5c6e7c' '0360e3d8519eab12' FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +``` + +- Remove all chunks from a document +``` +RAGFlow(user)> REMOVE ALL CHUNKS FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +``` + +- Drop dataset table +``` +RAGFlow(user)> DROP DATASET TABLE 'test' +``` + +- Search chunks +``` +RAGFlow(user)> SEARCH '曹操' ON DATASETS 'test' +``` + +- Get chunks +``` +RAGFlow(user)> GET CHUNK '29cc4f6d7a5c6e7c' OF DATASET 'test' DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' +``` + +### 6.27 Metadata Management Commands + +- Create metadata table +``` +RAGFlow(user)> CREATE METADATA TABLE +``` + +- Insert data from JSON files +``` +RAGFlow(user)> INSERT METADATA FROM FILE 'insert_metadata.json' +``` +- Set metadata for a document +``` +RAGFlow(user)> SET METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' TO '{"author": ["John", "Tom"], "category": "tech"}'; +``` + +- Drop metadata table +``` +RAGFlow(user)> DROP METADATA TABLE +``` + +- List metadata +``` +RAGFlow(user)> LIST METADATA OF DATASET 'test' 'test2' ``` \ No newline at end of file diff --git a/internal/engine/infinity/chunk.go b/internal/engine/infinity/chunk.go index 2532fef5c..0a914dcbe 100644 --- a/internal/engine/infinity/chunk.go +++ b/internal/engine/infinity/chunk.go @@ -1094,19 +1094,15 @@ func (e *infinityEngine) GetChunk(ctx context.Context, tableName, chunkID string return nil, fmt.Errorf("Infinity client not initialized") } + common.Info("Infinity get chunk start", + zap.String("chunkID", chunkID), + zap.String("tableName", tableName), + zap.Strings("datasetIDs", datasetIDs)) + // Build list of table names to search - var tableNames []string - if strings.HasPrefix(tableName, "ragflow_doc_meta_") { - tableNames = []string{tableName} - } else { - // Search in tables like _ for each datasetID - if len(datasetIDs) > 0 { - for _, datasetID := range datasetIDs { - tableNames = append(tableNames, fmt.Sprintf("%s_%s", tableName, datasetID)) - } - } - // Also try the base tableName - tableNames = append(tableNames, tableName) + tableNames := make([]string, 0, len(datasetIDs)) + for _, datasetID := range datasetIDs { + tableNames = append(tableNames, fmt.Sprintf("%s_%s", tableName, datasetID)) } // Try each table and collect results from all tables diff --git a/internal/handler/datasets.go b/internal/handler/datasets.go index 250e0ea20..4efc8df12 100644 --- a/internal/handler/datasets.go +++ b/internal/handler/datasets.go @@ -34,7 +34,8 @@ import ( // DatasetsHandler handles the RESTful dataset endpoints. type DatasetsHandler struct { - datasetsService *service.DatasetService + datasetsService *service.DatasetService + metadataService *service.MetadataService } type listDatasetsExt struct { @@ -44,8 +45,11 @@ type listDatasetsExt struct { } // NewDatasetsHandler creates a new datasets handler. -func NewDatasetsHandler(datasetsService *service.DatasetService) *DatasetsHandler { - return &DatasetsHandler{datasetsService: datasetsService} +func NewDatasetsHandler(datasetsService *service.DatasetService, metadataService *service.MetadataService) *DatasetsHandler { + return &DatasetsHandler{ + datasetsService: datasetsService, + metadataService: metadataService, + } } // ListDatasets handles GET /api/v1/datasets. @@ -345,6 +349,128 @@ func (h *DatasetsHandler) DeleteKnowledgeGraph(c *gin.Context) { jsonResponse(c, common.CodeSuccess, true, "success") } +// RemoveTags handles DELETE /api/v1/datasets/:dataset_id/tags. +// @Summary Remove Tags +// @Description Remove tags from a dataset +// @Tags datasets +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param dataset_id path string true "Dataset ID" +// @Param request body object{tags []string} true "tags to remove" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/datasets/{dataset_id}/tags [delete] +func (h *DatasetsHandler) RemoveTags(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetID := strings.TrimSpace(c.Param("dataset_id")) + if datasetID == "" { + jsonError(c, common.CodeDataError, "dataset_id is required") + return + } + + dataset, code, err := h.datasetsService.GetDataset(datasetID, user.ID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + tenantID, _ := dataset["tenant_id"].(string) + if tenantID == "" { + jsonError(c, common.CodeDataError, "tenant_id is required") + return + } + + var req struct { + Tags []string `json:"tags" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, err.Error()) + return + } + + indexName := fmt.Sprintf("ragflow_%s", tenantID) + docEngine := engine.Get() + if docEngine == nil { + jsonError(c, common.CodeServerError, "Document engine is not initialized") + return + } + + for _, tag := range req.Tags { + condition := map[string]interface{}{ + "tag_kwd": tag, + "kb_id": datasetID, + } + newValue := map[string]interface{}{ + "remove": map[string]interface{}{ + "tag_kwd": tag, + }, + } + if err := docEngine.UpdateChunks(c.Request.Context(), condition, newValue, indexName, datasetID); err != nil { + jsonError(c, common.CodeServerError, "Failed to remove tag: "+err.Error()) + return + } + } + + jsonResponse(c, common.CodeSuccess, true, "success") +} + +// ListMetadataFlattened handles GET /api/v1/datasets/metadata/flattened. +// @Summary List flattened metadata for datasets +// @Description Get flattened metadata for multiple datasets +// @Tags datasets +// @Produce json +// @Security ApiKeyAuth +// @Param dataset_ids query string true "Comma-separated dataset IDs" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/datasets/metadata/flattened [get] +func (h *DatasetsHandler) ListMetadataFlattened(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetIDsStr := c.Query("dataset_ids") + if datasetIDsStr == "" { + jsonError(c, common.CodeDataError, "dataset_ids is required") + return + } + + rawIDs := strings.Split(datasetIDsStr, ",") + datasetIDs := make([]string, 0, len(rawIDs)) + for _, id := range rawIDs { + id = strings.TrimSpace(id) + if id != "" { + datasetIDs = append(datasetIDs, id) + } + } + if len(datasetIDs) == 0 { + jsonError(c, common.CodeDataError, "dataset_ids is required") + return + } + + // Check access for each dataset + for _, datasetID := range datasetIDs { + if !h.datasetsService.Accessible(datasetID, user.ID) { + jsonError(c, common.CodeAuthenticationError, "No authorization for dataset: "+datasetID) + return + } + } + + flattenedMeta, err := h.metadataService.GetFlattedMetaByKBs(datasetIDs) + if err != nil { + jsonError(c, common.CodeServerError, "Failed to get metadata: "+err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, flattenedMeta, "success") +} + func firstStringValue(value interface{}) string { switch v := value.(type) { case string: diff --git a/internal/handler/kb.go b/internal/handler/kb.go index 3debfc969..a9f42a8a1 100644 --- a/internal/handler/kb.go +++ b/internal/handler/kb.go @@ -249,74 +249,6 @@ func (h *KnowledgebaseHandler) ListTagsFromKbs(c *gin.Context) { jsonResponse(c, common.CodeSuccess, []string{}, "success") } -// RemoveTags handles the remove tags request -// @Summary Remove Tags -// @Description Remove tags from a knowledge base -// @Tags knowledgebase -// @Accept json -// @Produce json -// @Security ApiKeyAuth -// @Param kb_id path string true "Knowledge Base ID" -// @Param request body object{tags []string} true "tags to remove" -// @Success 200 {object} map[string]interface{} -// @Router /v1/kb/{kb_id}/rm_tags [post] -func (h *KnowledgebaseHandler) RemoveTags(c *gin.Context) { - user, errorCode, errorMessage := GetUser(c) - if errorCode != common.CodeSuccess { - jsonError(c, errorCode, errorMessage) - return - } - - kbID := c.Param("kb_id") - if kbID == "" { - jsonError(c, common.CodeDataError, "kb_id is required") - return - } - - if !h.kbService.Accessible(kbID, user.ID) { - jsonError(c, common.CodeAuthenticationError, "No authorization.") - return - } - - var req struct { - Tags []string `json:"tags" binding:"required"` - } - if err := c.ShouldBindJSON(&req); err != nil { - jsonError(c, common.CodeDataError, err.Error()) - return - } - - // Get KB to find tenant_id and build index name - kb, err := h.kbService.GetByID(kbID) - if err != nil { - jsonError(c, common.CodeDataError, "knowledge base not found") - return - } - - // Build index name prefix: ragflow_ - indexName := "ragflow_" + kb.TenantID - - // For each tag, call UpdateChunk to remove it from documents - for _, tag := range req.Tags { - condition := map[string]interface{}{ - "tag_kwd": tag, - "kb_id": kbID, - } - newValue := map[string]interface{}{ - "remove": map[string]interface{}{ - "tag_kwd": tag, - }, - } - err := h.kbService.RemoveTag(condition, newValue, indexName, kbID) - if err != nil { - jsonError(c, common.CodeServerError, "Failed to remove tag: "+err.Error()) - return - } - } - - jsonResponse(c, common.CodeSuccess, true, "success") -} - // RenameTag handles the rename tag request // @Summary Rename Tag // @Description Rename a tag in a knowledge base diff --git a/internal/router/router.go b/internal/router/router.go index 5b8a840e7..143877bab 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -50,6 +50,7 @@ func NewRouter( documentHandler *handler.DocumentHandler, datasetsHandler *handler.DatasetsHandler, systemHandler *handler.SystemHandler, + knowledgebaseHandler *handler.KnowledgebaseHandler, chunkHandler *handler.ChunkHandler, llmHandler *handler.LLMHandler, chatHandler *handler.ChatHandler, @@ -62,22 +63,23 @@ func NewRouter( providerHandler *handler.ProviderHandler, ) *Router { return &Router{ - authHandler: authHandler, - userHandler: userHandler, - tenantHandler: tenantHandler, - documentHandler: documentHandler, - datasetsHandler: datasetsHandler, - systemHandler: systemHandler, - chunkHandler: chunkHandler, - llmHandler: llmHandler, - chatHandler: chatHandler, - chatSessionHandler: chatSessionHandler, - connectorHandler: connectorHandler, - searchHandler: searchHandler, - fileHandler: fileHandler, - memoryHandler: memoryHandler, - skillSearchHandler: skillSearchHandler, - providerHandler: providerHandler, + authHandler: authHandler, + userHandler: userHandler, + tenantHandler: tenantHandler, + documentHandler: documentHandler, + datasetsHandler: datasetsHandler, + systemHandler: systemHandler, + knowledgebaseHandler: knowledgebaseHandler, + chunkHandler: chunkHandler, + llmHandler: llmHandler, + chatHandler: chatHandler, + chatSessionHandler: chatSessionHandler, + connectorHandler: connectorHandler, + searchHandler: searchHandler, + fileHandler: fileHandler, + memoryHandler: memoryHandler, + skillSearchHandler: skillSearchHandler, + providerHandler: providerHandler, } } @@ -180,13 +182,18 @@ func (r *Router) Setup(engine *gin.Engine) { datasets.GET("", r.datasetsHandler.ListDatasets) datasets.GET("/:dataset_id", r.datasetsHandler.GetDataset) datasets.GET("/:dataset_id/graph", r.datasetsHandler.GetKnowledgeGraph) + datasets.DELETE("/:dataset_id/tags", r.datasetsHandler.RemoveTags) datasets.DELETE("/:dataset_id/graph", r.datasetsHandler.DeleteKnowledgeGraph) datasets.POST("", r.datasetsHandler.CreateDataset) datasets.DELETE("", r.datasetsHandler.DeleteDatasets) datasets.POST("/search", r.chunkHandler.RetrievalTest) + datasets.GET("/metadata/flattened", r.datasetsHandler.ListMetadataFlattened) // Dataset documents datasets.GET("/:dataset_id/documents", r.documentHandler.ListDocuments) + + // Dataset document chunk + datasets.GET("/:dataset_id/documents/:document_id/chunks/:chunk_id", r.chunkHandler.Get) } // Search routes @@ -326,7 +333,7 @@ func (r *Router) Setup(engine *gin.Engine) { } // Knowledge base routes - kb := authorized.Group("/v1/kb") + kb := v1.Group("/kb") { kb.POST("/update", r.knowledgebaseHandler.UpdateKB) kb.POST("/update_metadata_setting", r.knowledgebaseHandler.UpdateMetadataSetting) @@ -342,7 +349,6 @@ func (r *Router) Setup(engine *gin.Engine) { kbByID := kb.Group("/:kb_id") { kbByID.GET("/tags", r.knowledgebaseHandler.ListTags) - kbByID.POST("/rm_tags", r.knowledgebaseHandler.RemoveTags) kbByID.POST("/rename_tag", r.knowledgebaseHandler.RenameTag) kbByID.GET("/knowledge_graph", r.knowledgebaseHandler.KnowledgeGraph) kbByID.DELETE("/knowledge_graph", r.knowledgebaseHandler.DeleteKnowledgeGraph) @@ -350,7 +356,7 @@ func (r *Router) Setup(engine *gin.Engine) { } // Tenant routes (per-tenant resources) - tenant := authorized.Group("/v1/tenant") + tenant := v1.Group("/tenant") { tenant.POST("/doc_engine_metadata_table", r.tenantHandler.CreateMetadataInDocEngine) // Internal API only for GO tenant.DELETE("/doc_engine_metadata_table", r.tenantHandler.DeleteMetadataInDocEngine) // Internal API only for GO @@ -358,7 +364,7 @@ func (r *Router) Setup(engine *gin.Engine) { } // Document routes - doc := authorized.Group("/v1/document") + doc := v1.Group("/document") { doc.POST("/list", r.documentHandler.ListDocuments) doc.POST("/metadata/summary", r.documentHandler.MetadataSummary) @@ -366,10 +372,8 @@ func (r *Router) Setup(engine *gin.Engine) { } // Chunk routes - chunk := authorized.Group("/v1/chunk") + chunk := v1.Group("/chunk") { - chunk.POST("/retrieval_test", r.chunkHandler.RetrievalTest) - chunk.GET("/get", r.chunkHandler.Get) chunk.POST("/list", r.chunkHandler.List) chunk.POST("/update", r.chunkHandler.UpdateChunk) // Internal API only for GO chunk.POST("/rm", r.chunkHandler.Remove) diff --git a/internal/service/chunk.go b/internal/service/chunk.go index 5cc3dcf97..6f7d08f54 100644 --- a/internal/service/chunk.go +++ b/internal/service/chunk.go @@ -89,7 +89,6 @@ type RetrievalTestResponse struct { } // RetrievalTest performs retrieval test for a given question against specified knowledge bases. -// Corresponds to Python's api/apps/chunk_app.py:retrieval_test() // // Flow: // 1. Validate kbs permissions and embedding model diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 7ffd9f13d..806fb1ba3 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -294,10 +294,13 @@ class InfinityConnection(InfinityConnectionBase): df_list = list() assert isinstance(knowledgebase_ids, list) table_list = list() - if index_name.startswith("ragflow_doc_meta_"): - table_names_to_search = [index_name] - else: - table_names_to_search = [f"{index_name}_{kb_id}" for kb_id in knowledgebase_ids] + if not knowledgebase_ids: + self.logger.warning("INFINITY get called with empty knowledgebase_ids for index %s", index_name) + return None + table_names_to_search = [f"{index_name}_{kb_id}" for kb_id in knowledgebase_ids if kb_id] + if not table_names_to_search: + self.logger.warning("INFINITY get has only blank knowledgebase_ids for index %s", index_name) + return None for table_name in table_names_to_search: table_list.append(table_name) try: @@ -455,7 +458,8 @@ class InfinityConnection(InfinityConnectionBase): d[k] = v if v else "{}" else: d[k] = v - for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", + for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", + "content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", "question_tks"]: if k in d: @@ -575,7 +579,8 @@ class InfinityConnection(InfinityConnectionBase): else: new_value[k] = v for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", - "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", "question_tks"]: + "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", + "question_tks"]: if k in new_value: del new_value[k] @@ -583,7 +588,8 @@ class InfinityConnection(InfinityConnectionBase): if removeValue: col_to_remove = list(removeValue.keys()) row_to_opt = table_instance.output(col_to_remove + ["id"]).filter(filter).to_df() - self.logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}") + self.logger.debug( + f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}") row_to_opt = self.get_fields(row_to_opt, col_to_remove) for id, old_v in row_to_opt.items(): for k, remove_v in removeValue.items(): @@ -608,15 +614,15 @@ class InfinityConnection(InfinityConnectionBase): return True def adjust_chunk_pagerank_fea( - self, - chunk_id: str, - index_name: str, - knowledgebase_id: str, - delta: int, - min_weight: int, - max_weight: int, - row_id: int | None = None, - max_retries: int = 2, + self, + chunk_id: str, + index_name: str, + knowledgebase_id: str, + delta: int, + min_weight: int, + max_weight: int, + row_id: int | None = None, + max_retries: int = 2, ) -> bool: """Adjust pagerank_fea on one chunk row in Infinity.