Implement Elasticsearch functions in GO (#15160)

### What problem does this PR solve?

Implement Elasticsearch functions in GO (except for Search)

### Type of change

- [x] Refactoring
This commit is contained in:
qinling0210
2026-05-25 19:15:07 +08:00
committed by GitHub
parent 7d200d5bd7
commit af85aa9c7b
21 changed files with 2418 additions and 758 deletions

View File

@ -203,7 +203,7 @@ func startServer(config *server.Config) {
// Initialize handler layer
authHandler := handler.NewAuthHandler()
userHandler := handler.NewUserHandler(userService)
tenantHandler := handler.NewTenantHandler(tenantService, userService)
tenantHandler := handler.NewTenantHandler(tenantService, userService, knowledgebaseService)
documentHandler := handler.NewDocumentHandler(documentService, datasetsService)
datasetsHandler := handler.NewDatasetsHandler(datasetsService, metadataService)
systemHandler := handler.NewSystemHandler(systemService)

View File

@ -309,17 +309,16 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) {
return c.ListTasksUserCommand(cmd)
case "show_task_user_command":
return c.ShowTaskUserCommand(cmd)
// Dataset, metadata commands
case "create_dataset_table":
return c.CreateDatasetInDocEngine(cmd)
case "drop_dataset_table":
return c.DropDatasetInDocEngine(cmd)
case "create_metadata_table":
return c.CreateMetadataInDocEngine(cmd)
case "drop_metadata_table":
return c.DropMetadataInDocEngine(cmd)
case "insert_dataset_from_file":
return c.InsertDatasetFromFile(cmd)
case "create_chunk_store":
return c.CreateChunkStore(cmd)
case "drop_chunk_store":
return c.DropChunkStore(cmd)
case "create_metadata_store":
return c.CreateMetadataStore(cmd)
case "drop_metadata_store":
return c.DropMetadataStore(cmd)
case "insert_chunks_from_file":
return c.InsertChunksFromFile(cmd)
case "insert_metadata_from_file":
return c.InsertMetadataFromFile(cmd)
case "update_chunk":
@ -328,6 +327,8 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) {
return c.GetChunk(cmd)
case "set_meta":
return c.SetMeta(cmd)
case "delete_meta":
return c.DeleteMeta(cmd)
case "rm_tags":
return c.RmTags(cmd)
case "remove_chunks":

View File

@ -222,18 +222,6 @@ func (l *Lexer) lookupIdent(ident string) Token {
case "PASSWORD":
return Token{Type: TokenPassword, Value: ident}
case "DATASET":
// Check if followed by TABLE for compound token
if strings.ToUpper(l.peekToken()) == "TABLE" {
// Skip whitespace to TABLE
for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' {
l.readChar()
}
// Skip past TABLE
for isLetter(l.ch) || isDigit(l.ch) || l.ch == '_' || l.ch == '-' || l.ch == '.' {
l.readChar()
}
return Token{Type: TokenDatasetTable, Value: "DATASET TABLE"}
}
return Token{Type: TokenDataset, Value: ident}
case "DATASETS":
return Token{Type: TokenDatasets, Value: ident}
@ -327,6 +315,8 @@ func (l *Lexer) lookupIdent(ident string) Token {
return Token{Type: TokenHigh, Value: ident}
case "MAX":
return Token{Type: TokenMax, Value: ident}
case "STORE":
return Token{Type: TokenStore, Value: ident}
case "STREAM":
return Token{Type: TokenStream, Value: ident}
case "LS":
@ -430,6 +420,18 @@ func (l *Lexer) lookupIdent(ident string) Token {
case "REMOVE":
return Token{Type: TokenRemove, Value: ident}
case "CHUNK":
// Check if followed by STORE for compound token
if strings.ToUpper(l.peekToken()) == "STORE" {
// Skip whitespace to STORE
for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' {
l.readChar()
}
// Skip past STORE
for isLetter(l.ch) || isDigit(l.ch) || l.ch == '_' || l.ch == '-' || l.ch == '.' {
l.readChar()
}
return Token{Type: TokenChunkStore, Value: "CHUNK STORE"}
}
return Token{Type: TokenChunk, Value: ident}
case "CHUNKS":
return Token{Type: TokenChunks, Value: ident}
@ -465,6 +467,8 @@ func (l *Lexer) lookupIdent(ident string) Token {
return Token{Type: TokenDebug, Value: ident}
case "INFO":
return Token{Type: TokenInfo, Value: ident}
case "IN":
return Token{Type: TokenIn, Value: ident}
case "WARN":
return Token{Type: TokenWarn, Value: ident}
case "ERROR":

View File

@ -47,7 +47,7 @@ const (
TokenPassword
TokenDataset
TokenDatasets
TokenDatasetTable
TokenChunkStore
TokenOf
TokenAgents
TokenRole
@ -91,6 +91,7 @@ const (
TokenParse
TokenImport
TokenInto
TokenIn
TokenWith
TokenParser
TokenPipeline
@ -122,6 +123,7 @@ const (
TokenIndex
TokenVector
TokenSize
TokenStore
TokenName // For ALTER PROVIDER <name> NAME <new_name>
TokenBalance
TokenInstance

View File

@ -875,8 +875,8 @@ func (c *RAGFlowClient) UnsetToken(cmd *Command) (ResponseIf, error) {
return &result, nil
}
// CreateDataset creates a table for a dataset
func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) {
// CreateChunkStore creates a chunk store in doc engine
func (c *RAGFlowClient) CreateChunkStore(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
@ -902,13 +902,13 @@ func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) {
"vector_size": vectorSize,
}
resp, err := c.HTTPClient.Request("POST", "/kb/doc_engine_table", "web", nil, payload)
resp, err := c.HTTPClient.Request("POST", "/tenant/chunk_store", "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to create table: %w", err)
return nil, fmt.Errorf("failed to create chunk store: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to create table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
return nil, fmt.Errorf("failed to create chunk store: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
resJSON, err := resp.JSON()
@ -924,73 +924,16 @@ func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) {
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = fmt.Sprintf("Success to create table for dataset: %s", datasetName)
result.Message = fmt.Sprintf("Success to create chunk store for dataset: %s", datasetName)
} else {
result.Message = fmt.Sprintf("Failed to create table: %v", resJSON)
result.Message = fmt.Sprintf("Failed to create chunk store: %v", resJSON)
}
result.Duration = 0
return &result, nil
}
// CreateDatasetInDocEngine creates a table for a dataset in doc engine
func (c *RAGFlowClient) CreateDatasetInDocEngine(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
datasetName, ok := cmd.Params["dataset_name"].(string)
if !ok {
return nil, fmt.Errorf("dataset_name not provided")
}
vectorSize, ok := cmd.Params["vector_size"].(int)
if !ok {
return nil, fmt.Errorf("vector_size not provided")
}
// Get dataset ID by name
datasetID, err := c.getDatasetID(datasetName)
if err != nil {
return nil, err
}
payload := map[string]interface{}{
"kb_id": datasetID,
"vector_size": vectorSize,
}
resp, err := c.HTTPClient.Request("POST", "/kb/doc_engine_table", "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to create table: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to create table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
resJSON, err := resp.JSON()
if err != nil {
return nil, fmt.Errorf("invalid JSON response: %w", err)
}
code, ok := resJSON["code"].(float64)
if !ok {
return nil, fmt.Errorf("invalid response format: code is not a number")
}
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = fmt.Sprintf("Success to create table for dataset: %s", datasetName)
} else {
result.Message = fmt.Sprintf("Failed to create table: %v", resJSON)
}
result.Duration = 0
return &result, nil
}
// DropDatasetInDocEngine drops a table for a dataset in doc engine
func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error) {
// DropChunkStore drops a chunk store in doc engine
func (c *RAGFlowClient) DropChunkStore(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
@ -1010,7 +953,7 @@ func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error)
"kb_id": datasetID,
}
resp, err := c.HTTPClient.Request("DELETE", "/kb/doc_engine_table", "web", nil, payload)
resp, err := c.HTTPClient.Request("DELETE", "/tenant/chunk_store", "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to drop dataset: %w", err)
}
@ -1032,27 +975,27 @@ func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error)
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = fmt.Sprintf("Success to drop table for dataset: %s", datasetName)
result.Message = fmt.Sprintf("Success to drop chunk store for dataset: %s", datasetName)
} else {
result.Message = fmt.Sprintf("Failed to drop table for dataset: %s: %v", datasetName, resJSON)
result.Message = fmt.Sprintf("Failed to drop chunk store for dataset: %s: %v", datasetName, resJSON)
}
result.Duration = 0
return &result, nil
}
// CreateMetadataInDocEngine creates the document metadata table for the tenant
func (c *RAGFlowClient) CreateMetadataInDocEngine(cmd *Command) (ResponseIf, error) {
// CreateMetadataStore creates the document metadata store for the tenant
func (c *RAGFlowClient) CreateMetadataStore(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
resp, err := c.HTTPClient.Request("POST", "/tenant/doc_engine_metadata_table", "web", nil, nil)
resp, err := c.HTTPClient.Request("POST", "/tenant/metadata_store", "web", nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to create metadata table: %w", err)
return nil, fmt.Errorf("failed to create metadata store: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to create metadata table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
return nil, fmt.Errorf("failed to create metadata store: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
resJSON, err := resp.JSON()
@ -1068,27 +1011,27 @@ func (c *RAGFlowClient) CreateMetadataInDocEngine(cmd *Command) (ResponseIf, err
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = "Success to create metadata table"
result.Message = "Success to create metadata store"
} else {
result.Message = fmt.Sprintf("Failed to create metadata table: %v", resJSON)
result.Message = fmt.Sprintf("Failed to create metadata store: %v", resJSON)
}
result.Duration = 0
return &result, nil
}
// DropMetadataInDocEngine drops the document metadata table for the tenant
func (c *RAGFlowClient) DropMetadataInDocEngine(cmd *Command) (ResponseIf, error) {
// DropMetadataStore drops the document metadata store for the tenant
func (c *RAGFlowClient) DropMetadataStore(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
resp, err := c.HTTPClient.Request("DELETE", "/tenant/doc_engine_metadata_table", "web", nil, nil)
resp, err := c.HTTPClient.Request("DELETE", "/tenant/metadata_store", "web", nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to drop metadata table: %w", err)
return nil, fmt.Errorf("failed to drop metadata store: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to drop metadata table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
return nil, fmt.Errorf("failed to drop metadata store: HTTP %d, body: %s", resp.StatusCode, string(resp.Body))
}
resJSON, err := resp.JSON()
@ -1104,9 +1047,9 @@ func (c *RAGFlowClient) DropMetadataInDocEngine(cmd *Command) (ResponseIf, error
var result SimpleResponse
result.Code = int(code)
if result.Code == 0 {
result.Message = "Success to drop metadata table"
result.Message = "Success to drop metadata store"
} else {
result.Message = fmt.Sprintf("Failed to drop metadata table: %v", resJSON)
result.Message = fmt.Sprintf("Failed to drop metadata store: %v", resJSON)
}
result.Duration = 0
return &result, nil
@ -2838,8 +2781,8 @@ func (c *RAGFlowClient) CESearch(cmd *Command) (ResponseIf, error) {
return &response, nil
}
// InsertDatasetFromFile inserts dataset chunks from a JSON file
func (c *RAGFlowClient) InsertDatasetFromFile(cmd *Command) (ResponseIf, error) {
// InsertChunksFromFile inserts chunks from a JSON file
func (c *RAGFlowClient) InsertChunksFromFile(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
@ -2852,7 +2795,7 @@ func (c *RAGFlowClient) InsertDatasetFromFile(cmd *Command) (ResponseIf, error)
"file_path": filePath,
}
resp, err := c.HTTPClient.Request("POST", "/kb/insert_from_file", "web", nil, payload)
resp, err := c.HTTPClient.Request("POST", "/tenant/insert_chunks_from_file", "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to insert dataset from file: %w", err)
}
@ -2938,42 +2881,25 @@ func (c *RAGFlowClient) UpdateChunk(cmd *Command) (ResponseIf, error) {
return nil, fmt.Errorf("chunk_id not provided")
}
docID, ok := cmd.Params["doc_id"].(string)
if !ok {
return nil, fmt.Errorf("doc_id not provided")
}
datasetName, ok := cmd.Params["dataset_name"].(string)
if !ok {
return nil, fmt.Errorf("dataset_name not provided")
}
jsonBody, ok := cmd.Params["json_body"].(string)
if !ok {
return nil, fmt.Errorf("json_body not provided")
}
// Look up dataset_id from dataset_name
datasetID, err := c.getDatasetID(datasetName)
if err != nil {
return nil, fmt.Errorf("failed to get dataset ID: %w", err)
}
// Try to get doc_id from the chunk retrieval endpoint
getResp, err := c.HTTPClient.Request("GET", "/chunk/get?chunk_id="+chunkID, "web", nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to get chunk info: %w", err)
}
var docID string
if getResp.StatusCode == 200 {
getJSON, err := getResp.JSON()
if err == nil {
if data, ok := getJSON["data"].(map[string]interface{}); ok {
if d, ok := data["doc_id"].(string); ok {
docID = d
}
}
}
}
if docID == "" {
return nil, fmt.Errorf("could not find document_id for chunk %s. Please provide document_id explicitly", chunkID)
jsonBody, ok := cmd.Params["json_body"].(string)
if !ok {
return nil, fmt.Errorf("json_body not provided")
}
// Parse the JSON body
@ -3028,21 +2954,16 @@ func (c *RAGFlowClient) GetChunk(cmd *Command) (ResponseIf, error) {
return nil, fmt.Errorf("chunk_id not provided")
}
datasetName, ok := cmd.Params["dataset_name"].(string)
if !ok {
return nil, fmt.Errorf("dataset_name not provided")
}
datasetID, err := c.getDatasetID(datasetName)
if err != nil {
return nil, err
}
docID, ok := cmd.Params["doc_id"].(string)
if !ok {
return nil, fmt.Errorf("doc_id not provided")
}
datasetID, ok := cmd.Params["dataset_id"].(string)
if !ok {
return nil, fmt.Errorf("dataset_id not provided")
}
resp, err := c.HTTPClient.Request("GET", fmt.Sprintf("/datasets/%s/documents/%s/chunks/%s", datasetID, docID, chunkID), "web", nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to get chunk: %w", err)
@ -3116,6 +3037,57 @@ func (c *RAGFlowClient) SetMeta(cmd *Command) (ResponseIf, error) {
return &result, nil
}
// DeleteMeta asks the server to delete metadata attached to a document.
//
// Command parameters:
//   - "doc_id" (string, required): the document whose metadata is deleted.
//   - "keys" (string, optional): forwarded unchanged as the "keys" field of
//     the request payload; when absent the payload carries only "doc_id".
//
// The request is a POST to /document/delete_meta. An error is returned when
// the client is not in USER mode, when "doc_id" is missing, when the request
// fails, when the HTTP status is not 200, or when the response body is not
// JSON with a numeric "code" field. Otherwise a SimpleResponse is returned
// whose Code mirrors the server's "code" and whose Message reports success
// (code 0) or failure (any other code).
func (c *RAGFlowClient) DeleteMeta(cmd *Command) (ResponseIf, error) {
	if c.ServerType != "user" {
		return nil, fmt.Errorf("this command is only allowed in USER mode")
	}

	documentID, hasDocID := cmd.Params["doc_id"].(string)
	if !hasDocID {
		return nil, fmt.Errorf("doc_id not provided")
	}

	// Only attach "keys" when the parser supplied it as a string.
	body := map[string]interface{}{"doc_id": documentID}
	if keys, hasKeys := cmd.Params["keys"].(string); hasKeys {
		body["keys"] = keys
	}

	httpResp, err := c.HTTPClient.Request("POST", "/document/delete_meta", "web", nil, body)
	if err != nil {
		return nil, fmt.Errorf("failed to delete metadata: %w", err)
	}
	if httpResp.StatusCode != 200 {
		return nil, fmt.Errorf("failed to delete metadata: HTTP %d, body: %s", httpResp.StatusCode, string(httpResp.Body))
	}

	decoded, err := httpResp.JSON()
	if err != nil {
		return nil, fmt.Errorf("invalid JSON response: %w", err)
	}

	// The type assertion expects the "code" field to be a float64.
	rawCode, isNumber := decoded["code"].(float64)
	if !isNumber {
		return nil, fmt.Errorf("invalid response format: code is not a number")
	}

	result := new(SimpleResponse)
	result.Code = int(rawCode)
	result.Duration = 0
	if result.Code != 0 {
		result.Message = fmt.Sprintf("Failed to delete metadata: %v", decoded)
	} else {
		result.Message = fmt.Sprintf("Success to delete metadata for document: %s", documentID)
	}
	return result, nil
}
// RmTags removes tags from chunks in a dataset
func (c *RAGFlowClient) RmTags(cmd *Command) (ResponseIf, error) {
if c.ServerType != "user" {
@ -3177,15 +3149,24 @@ func (c *RAGFlowClient) RemoveChunks(cmd *Command) (ResponseIf, error) {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
datasetName, ok := cmd.Params["dataset_name"].(string)
if !ok {
return nil, fmt.Errorf("dataset_name not provided")
}
docID, ok := cmd.Params["doc_id"].(string)
if !ok {
return nil, fmt.Errorf("doc_id not provided")
}
payload := map[string]interface{}{
"doc_id": docID,
// Look up dataset ID by name
datasetID, err := c.getDatasetID(datasetName)
if err != nil {
return nil, fmt.Errorf("dataset not found: %w", err)
}
payload := map[string]interface{}{}
// Check if delete_all is set
if deleteAll, ok := cmd.Params["delete_all"].(bool); ok && deleteAll {
payload["delete_all"] = true
@ -3193,7 +3174,7 @@ func (c *RAGFlowClient) RemoveChunks(cmd *Command) (ResponseIf, error) {
payload["chunk_ids"] = chunkIDs
}
resp, err := c.HTTPClient.Request("POST", "/chunk/rm", "web", nil, payload)
resp, err := c.HTTPClient.Request("DELETE", "/datasets/"+datasetID+"/documents/"+docID+"/chunks", "web", nil, payload)
if err != nil {
return nil, fmt.Errorf("failed to remove chunks: %w", err)
}

View File

@ -621,10 +621,10 @@ func (p *Parser) parseCreateCommand() (*Command, error) {
return p.parseCreateChat()
case TokenToken:
return p.parseCreateToken()
case TokenDatasetTable:
return p.parseCreateDatasetTable()
case TokenChunkStore:
return p.parseCreateChunkStore()
case TokenMetadata:
return p.parseCreateMetadataTable()
return p.parseCreateMetadataStore()
case TokenProvider:
return p.parseCreateProviderInstance()
default:
@ -656,9 +656,21 @@ func (p *Parser) parseCreateToken() (*Command, error) {
}
// Internal CLI for GO
// parseCreateDatasetTable parses: CREATE DATASET TABLE 'name' VECTOR SIZE N
func (p *Parser) parseCreateDatasetTable() (*Command, error) {
p.nextToken() // consume DATASET TABLE compound token
// parseCreateChunkStore parses: CREATE CHUNK STORE FOR DATASET 'name' VECTOR SIZE N
func (p *Parser) parseCreateChunkStore() (*Command, error) {
p.nextToken() // consume CHUNK STORE compound token
// Expect FOR
if p.curToken.Type != TokenFor {
return nil, fmt.Errorf("expected FOR after CHUNK STORE, got %s", p.curToken.Value)
}
p.nextToken()
// Expect Dataset
if p.curToken.Type != TokenDataset {
return nil, fmt.Errorf("expected Dataset after FOR, got %s", p.curToken.Value)
}
p.nextToken()
datasetName, err := p.parseQuotedString()
if err != nil {
@ -688,20 +700,20 @@ func (p *Parser) parseCreateDatasetTable() (*Command, error) {
p.nextToken()
}
cmd := NewCommand("create_dataset_table")
cmd := NewCommand("create_chunk_store")
cmd.Params["dataset_name"] = datasetName
cmd.Params["vector_size"] = vectorSize
return cmd, nil
}
// Internal CLI for GO
// parseCreateMetadataTable parses: CREATE METADATA TABLE
func (p *Parser) parseCreateMetadataTable() (*Command, error) {
// CREATE METADATA TABLE
// parseCreateMetadataStore parses: CREATE METADATA STORE
func (p *Parser) parseCreateMetadataStore() (*Command, error) {
// CREATE METADATA STORE
p.nextToken() // consume METADATA
if p.curToken.Type != TokenTable {
return nil, fmt.Errorf("expected TABLE after METADATA, got %s", p.curToken.Value)
if p.curToken.Type != TokenStore {
return nil, fmt.Errorf("expected STORE after METADATA, got %s", p.curToken.Value)
}
p.nextToken()
@ -709,7 +721,7 @@ func (p *Parser) parseCreateMetadataTable() (*Command, error) {
p.nextToken()
}
return NewCommand("create_metadata_table"), nil
return NewCommand("create_metadata_store"), nil
}
func (p *Parser) parseCreateUser() (*Command, error) {
@ -1039,10 +1051,10 @@ func (p *Parser) parseDropCommand() (*Command, error) {
return p.parseDropChat()
case TokenToken:
return p.parseDropToken()
case TokenDatasetTable:
return p.parseDropDatasetTable()
case TokenChunkStore:
return p.parseDropChunkStore()
case TokenMetadata:
return p.parseDropMetadataTable()
return p.parseDropMetadataStore()
case TokenInstance:
return p.parseDropInstance()
case TokenModel:
@ -1058,8 +1070,10 @@ func (p *Parser) parseDeleteCommand() (*Command, error) {
switch p.curToken.Type {
case TokenProvider:
return p.parseDeleteProvider()
case TokenMetadata:
return p.parseDeleteMeta()
default:
return nil, fmt.Errorf("unknown DROP target: %s", p.curToken.Value)
return nil, fmt.Errorf("unknown DELETE target: %s", p.curToken.Value)
}
}
@ -1108,9 +1122,21 @@ func (p *Parser) parseDropToken() (*Command, error) {
}
// Internal CLI for GO
// parseDropDatasetTable parses: DROP DATASET TABLE 'name'
func (p *Parser) parseDropDatasetTable() (*Command, error) {
p.nextToken() // consume DATASET TABLE
// parseDropChunkStore parses: DROP CHUNK STORE FOR DATASET 'name'
func (p *Parser) parseDropChunkStore() (*Command, error) {
p.nextToken() // consume CHUNK STORE
// Expect FOR
if p.curToken.Type != TokenFor {
return nil, fmt.Errorf("expected FOR after CHUNK STORE, got %s", p.curToken.Value)
}
p.nextToken()
// Expect Dataset
if p.curToken.Type != TokenDataset {
return nil, fmt.Errorf("expected Dataset after FOR, got %s", p.curToken.Value)
}
p.nextToken()
datasetName, err := p.parseQuotedString()
if err != nil {
@ -1122,26 +1148,25 @@ func (p *Parser) parseDropDatasetTable() (*Command, error) {
p.nextToken()
}
cmd := NewCommand("drop_dataset_table")
cmd := NewCommand("drop_chunk_store")
cmd.Params["dataset_name"] = datasetName
return cmd, nil
}
// Internal CLI for GO
// parseDropMetadataTable parses: DROP METADATA TABLE
func (p *Parser) parseDropMetadataTable() (*Command, error) {
// DROP METADATA TABLE
// parseDropMetadataStore parses: DROP METADATA STORE
func (p *Parser) parseDropMetadataStore() (*Command, error) {
// DROP METADATA STORE
p.nextToken() // consume METADATA
if p.curToken.Type != TokenTable {
return nil, fmt.Errorf("expected TABLE after METADATA, got %s", p.curToken.Value)
if p.curToken.Type != TokenStore {
return nil, fmt.Errorf("expected STORE after METADATA, got %s", p.curToken.Value)
}
p.nextToken()
if p.curToken.Type == TokenSemicolon {
p.nextToken()
}
cmd := NewCommand("drop_metadata_table")
cmd := NewCommand("drop_metadata_store")
return cmd, nil
}
@ -1545,7 +1570,7 @@ func (p *Parser) parseShowInstance() (*Command, error) {
return cmd, nil
}
// parseShowInstance parses SHOW BALANCE FROM <provider_name> <instance_name>
// parseShowBalance parses SHOW BALANCE FROM <provider_name> <instance_name>
func (p *Parser) parseShowBalance() (*Command, error) {
p.nextToken() // consume INSTANCE
@ -2166,20 +2191,20 @@ func (p *Parser) parseImportCommand() (*Command, error) {
func (p *Parser) parseInsertCommand() (*Command, error) {
p.nextToken() // consume INSERT
// Expect DATASET or METADATA
if p.curToken.Type == TokenDataset {
return p.parseInsertDatasetFromFile()
// Expect CHUNKS or METADATA
if p.curToken.Type == TokenChunks {
return p.parseInsertChunksFromFile()
}
if p.curToken.Type == TokenMetadata {
return p.parseInsertMetadataFromFile()
}
return nil, fmt.Errorf("expected DATASET or METADATA after INSERT, got %s", p.curToken.Value)
return nil, fmt.Errorf("expected CHUNKS or METADATA after INSERT, got %s", p.curToken.Value)
}
// Internal CLI for GO
// parseInsertDatasetFromFile parses: INSERT DATASET FROM FILE "file_path"
func (p *Parser) parseInsertDatasetFromFile() (*Command, error) {
p.nextToken() // consume DATASET
// parseInsertChunksFromFile parses: INSERT CHUNKS FROM FILE "file_path"
func (p *Parser) parseInsertChunksFromFile() (*Command, error) {
p.nextToken() // consume CHUNKS
// Expect FROM
if p.curToken.Type != TokenFrom {
@ -2199,7 +2224,7 @@ func (p *Parser) parseInsertDatasetFromFile() (*Command, error) {
return nil, err
}
cmd := NewCommand("insert_dataset_from_file")
cmd := NewCommand("insert_chunks_from_file")
cmd.Params["file_path"] = filePath
p.nextToken()
@ -3261,6 +3286,8 @@ func (p *Parser) parseUserStatement() (*Command, error) {
switch p.curToken.Type {
case TokenPing:
return p.parsePingServer()
case TokenDelete:
return p.parseDeleteCommand()
case TokenShow:
return p.parseShowCommand()
case TokenCreate:
@ -3389,7 +3416,7 @@ func (p *Parser) parseGetCommand() (*Command, error) {
return nil, fmt.Errorf("unknown GET target: %s", p.curToken.Value)
}
// parseGetChunk parses: GET CHUNK 'chunk_id' OF DATASET 'dataset_name' DOCUMENT 'doc_id'
// parseGetChunk parses: GET CHUNK 'chunk_id' OF DOCUMENT 'doc_id' IN DATASET 'dataset_id'
func (p *Parser) parseGetChunk() (*Command, error) {
p.nextToken() // consume CHUNK
@ -3408,21 +3435,8 @@ func (p *Parser) parseGetChunk() (*Command, error) {
}
p.nextToken()
if p.curToken.Type != TokenDataset {
return nil, fmt.Errorf("expected DATASET after OF")
}
p.nextToken()
// Parse dataset_name
datasetName, err := p.parseQuotedString()
if err != nil {
return nil, fmt.Errorf("expected dataset_name: %w", err)
}
cmd.Params["dataset_name"] = datasetName
p.nextToken()
if p.curToken.Type != TokenDocument {
return nil, fmt.Errorf("expected DOCUMENT after dataset_name")
return nil, fmt.Errorf("expected DOCUMENT after OF")
}
p.nextToken()
@ -3433,6 +3447,24 @@ func (p *Parser) parseGetChunk() (*Command, error) {
}
cmd.Params["doc_id"] = docID
p.nextToken()
if p.curToken.Type != TokenIn {
return nil, fmt.Errorf("expected IN after doc_id")
}
p.nextToken()
if p.curToken.Type != TokenDataset {
return nil, fmt.Errorf("expected DATASET after IN")
}
p.nextToken()
// Parse dataset_id
datasetID, err := p.parseQuotedString()
if err != nil {
return nil, fmt.Errorf("expected dataset_id: %w", err)
}
cmd.Params["dataset_id"] = datasetID
p.nextToken()
// Semicolon is optional
if p.curToken.Type == TokenSemicolon {
@ -3455,7 +3487,7 @@ func (p *Parser) parseUpdateCommand() (*Command, error) {
}
// Internal CLI for GO
// parseUpdateChunk parses: UPDATE CHUNK 'chunk_id' OF DATASET 'dataset_name' SET '{"content": "..."}'
// parseUpdateChunk parses: UPDATE CHUNK 'chunk_id' OF DOCUMENT 'doc_id' IN DATASET 'dataset_id' SET '{"content": "..."}'
func (p *Parser) parseUpdateChunk() (*Command, error) {
p.nextToken() // consume CHUNK
@ -3474,8 +3506,26 @@ func (p *Parser) parseUpdateChunk() (*Command, error) {
}
p.nextToken()
if p.curToken.Type != TokenDocument {
return nil, fmt.Errorf("expected DOCUMENT after OF")
}
p.nextToken()
// Parse doc_id
docID, err := p.parseQuotedString()
if err != nil {
return nil, fmt.Errorf("expected doc_id: %w", err)
}
cmd.Params["doc_id"] = docID
p.nextToken()
if p.curToken.Type != TokenIn {
return nil, fmt.Errorf("expected IN after doc_id")
}
p.nextToken()
if p.curToken.Type != TokenDataset {
return nil, fmt.Errorf("expected DATASET after OF")
return nil, fmt.Errorf("expected DATASET after IN")
}
p.nextToken()
@ -3555,6 +3605,65 @@ func (p *Parser) parseSetMeta() (*Command, error) {
return cmd, nil
}
// parseDeleteMeta parses:
//
//	DELETE METADATA OF DOCUMENT 'doc_id' [KEYS '["key1", "key2"]'] [;]
//
// It is entered with the current token on METADATA. The resulting
// "delete_meta" command always carries "doc_id"; "keys" is set only when the
// KEYS clause is present and holds the quoted string exactly as parsed (it is
// not decoded here). Without KEYS the command targets the document's entire
// metadata. Anything other than an optional semicolon or end of input after
// the last clause is rejected.
func (p *Parser) parseDeleteMeta() (*Command, error) {
	p.nextToken() // consume METADATA

	if p.curToken.Type != TokenOf {
		return nil, fmt.Errorf("expected OF after DELETE METADATA")
	}
	p.nextToken()

	if p.curToken.Type != TokenDocument {
		return nil, fmt.Errorf("expected DOCUMENT after DELETE METADATA OF")
	}
	p.nextToken()

	documentID, err := p.parseQuotedString()
	if err != nil {
		return nil, fmt.Errorf("expected doc_id: %w", err)
	}
	cmd := NewCommand("delete_meta")
	cmd.Params["doc_id"] = documentID
	p.nextToken()

	// Decide between the short form (no KEYS) and the KEYS clause.
	switch p.curToken.Type {
	case TokenKeys:
		// handled below
	case TokenSemicolon:
		p.nextToken()
		return cmd, nil
	case TokenEOF:
		return cmd, nil
	default:
		return nil, fmt.Errorf("expected KEYS or end of command after doc_id")
	}

	p.nextToken() // consume KEYS
	keyList, err := p.parseQuotedString()
	if err != nil {
		return nil, fmt.Errorf("expected keys JSON array: %w", err)
	}
	cmd.Params["keys"] = keyList
	p.nextToken()

	// A trailing semicolon is optional; nothing else may follow.
	switch p.curToken.Type {
	case TokenSemicolon:
		p.nextToken()
	case TokenEOF:
	default:
		return nil, fmt.Errorf("expected end of command after KEYS")
	}
	return cmd, nil
}
// parseRemoveTags parses: REMOVE TAGS 'tag1', 'tag2' from DATASET 'dataset_name';
func (p *Parser) parseRemoveTags() (*Command, error) {
p.nextToken() // consume TAGS
@ -3611,8 +3720,8 @@ func (p *Parser) parseRemoveTags() (*Command, error) {
}
// parseRemoveChunk parses:
// - REMOVE CHUNKS 'chunk_id1', 'chunk_id2' FROM DOCUMENT 'doc_id';
// - REMOVE ALL CHUNKS FROM DOCUMENT 'doc_id';
// - REMOVE CHUNKS 'chunk_id1', 'chunk_id2' FROM DOCUMENT 'doc_id' IN DATASET 'dataset_name';
// - REMOVE ALL CHUNKS FROM DOCUMENT 'doc_id' IN DATASET 'dataset_name';
func (p *Parser) parseRemoveChunk() (*Command, error) {
cmd := NewCommand("remove_chunks")
@ -3627,7 +3736,7 @@ func (p *Parser) parseRemoveChunk() (*Command, error) {
} else {
// curToken is TokenChunks, consume it first
p.nextToken()
// Multiple chunks: REMOVE CHUNKS 'id1' 'id2' FROM DOCUMENT 'doc_id' (space-separated)
// Multiple chunks: REMOVE CHUNKS 'id1' 'id2' FROM DOCUMENT 'doc_id' IN DATASET 'dataset_name' (space-separated)
// Parse first chunk ID
chunkID, err := p.parseQuotedString()
if err != nil {
@ -3669,6 +3778,28 @@ func (p *Parser) parseRemoveChunk() (*Command, error) {
return nil, fmt.Errorf("expected doc_id: %w", err)
}
cmd.Params["doc_id"] = docID
p.nextToken()
// Expect IN
if p.curToken.Type != TokenIn {
return nil, fmt.Errorf("expected IN after doc_id")
}
p.nextToken()
// Expect DATASET
if p.curToken.Type != TokenDataset {
return nil, fmt.Errorf("expected DATASET after IN")
}
p.nextToken()
// Parse dataset_name (quoted string)
datasetName, err := p.parseQuotedString()
if err != nil {
return nil, fmt.Errorf("expected dataset_name: %w", err)
}
cmd.Params["dataset_name"] = datasetName
p.nextToken()
// Semicolon is optional

View File

@ -17,16 +17,24 @@
package dao
import (
"errors"
"path"
"ragflow/internal/entity"
"strconv"
"strings"
"gorm.io/gorm"
)
// KnowledgebaseDAO knowledge base data access object
type KnowledgebaseDAO struct{}
// IsNotFoundErr reports whether err is, or wraps, gorm.ErrRecordNotFound.
// A nil err yields false.
func IsNotFoundErr(err error) bool {
	notFound := errors.Is(err, gorm.ErrRecordNotFound)
	return notFound
}
// NewKnowledgebaseDAO create knowledge base DAO
func NewKnowledgebaseDAO() *KnowledgebaseDAO {
return &KnowledgebaseDAO{}

View File

@ -386,19 +386,19 @@ RAGFlow(user)> ocr with 'paddleocr-vl-0.9b@test@baidu' file './internal/text.jpg
### 6.26 Chunk Management Commands
- Create a dataset table with vector size
- Create a chunk store with vector size
```
RAGFlow(user)> CREATE DATASET TABLE 'test' VECTOR SIZE 384
RAGFlow(user)> CREATE CHUNK STORE FOR DATASET 'test' VECTOR SIZE 384
```
- Insert data from JSON files
```
RAGFlow(user)> INSERT DATASET FROM FILE 'insert_kb.json'
RAGFlow(user)> INSERT CHUNKS FROM FILE 'insert_kb.json'
```
- Update a chunk's content
```
RAGFlow(user)> UPDATE CHUNK 'deb165dc6a732a64' OF DATASET 'test' SET '{"content": "Updated chunk content here", "important_keywords": ["keyword1", "keyword2"], "questions": ["What is this about?", "Why is it important?"], "available": true, "tag_kwd": ["tag5", "tag2"]}'
RAGFlow(user)> UPDATE CHUNK 'deb165dc6a732a64' OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test' SET '{"content": "Updated chunk content here", "important_keywords": ["keyword1", "keyword2"], "questions": ["What is this about?", "Why is it important?"], "available": true, "tag_kwd": ["tag5", "tag2"]}'
```
- Remove tags from a dataset
@ -408,17 +408,17 @@ RAGFlow(user)> REMOVE TAGS 'tag1', 'tag2' FROM DATASET 'test'
- Remove specific chunks from a document
```
RAGFlow(user)> REMOVE CHUNKS '29cc4f6d7a5c6e7c' '0360e3d8519eab12' FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3'
RAGFlow(user)> REMOVE CHUNKS '29cc4f6d7a5c6e7c' '0360e3d8519eab12' FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test'
```
- Remove all chunks from a document
```
RAGFlow(user)> REMOVE ALL CHUNKS FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3'
RAGFlow(user)> REMOVE ALL CHUNKS FROM DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test'
```
- Drop dataset table
- Drop chunk store
```
RAGFlow(user)> DROP DATASET TABLE 'test'
RAGFlow(user)> DROP CHUNK STORE FOR DATASET 'test'
```
- Search chunks
@ -428,17 +428,17 @@ RAGFlow(user)> SEARCH '曹操' ON DATASETS 'test'
- Get chunks
```
RAGFlow(user)> GET CHUNK '29cc4f6d7a5c6e7c' OF DATASET 'test' DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3'
RAGFlow(user)> GET CHUNK '29cc4f6d7a5c6e7c' OF DATASET 'test' DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' IN DATASET 'test'
```
### 6.27 Metadata Management Commands
- Create metadata table
- Create metadata store
```
RAGFlow(user)> CREATE METADATA TABLE
RAGFlow(user)> CREATE METADATA STORE
```
- Insert data from JSON files
- Insert metadata from JSON files
```
RAGFlow(user)> INSERT METADATA FROM FILE 'insert_metadata.json'
```
@ -447,9 +447,19 @@ RAGFlow(user)> INSERT METADATA FROM FILE 'insert_metadata.json'
RAGFlow(user)> SET METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' TO '{"author": ["John", "Tom"], "category": "tech"}';
```
- Drop metadata table
- Delete metadata of a document
```
RAGFlow(user)> DROP METADATA TABLE
RAGFlow(user)> DELETE METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3'
```
- Delete metadata keys of a document
```
RAGFlow(user)> DELETE METADATA OF DOCUMENT 'bbe55942535e11f1bc5184ba59049aa3' KEYS '["key1", "key2"]'
```
- Drop metadata store
```
RAGFlow(user)> DROP METADATA STORE
```
- List metadata

File diff suppressed because it is too large Load Diff

View File

@ -17,11 +17,16 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"ragflow/internal/server"
"ragflow/internal/utility"
"time"
"github.com/elastic/go-elasticsearch/v8"
@ -81,6 +86,16 @@ func NewEngine(cfg interface{}) (*elasticsearchEngine, error) {
config: esConfig,
}
// Create two index templates for different index types
// Template for chunk indices (ragflow_*) - priority 1
if err := engine.CreateIndexTemplate(context.Background(), "ragflow_mapping", "ragflow_*", "mapping.json", 1); err != nil {
return nil, fmt.Errorf("failed to create chunk index template: %w", err)
}
// Template for doc_meta indices (ragflow_doc_meta_*) - priority 2 (higher than ragflow_*)
if err := engine.CreateIndexTemplate(context.Background(), "ragflow_doc_meta_mapping", "ragflow_doc_meta_*", "doc_meta_es_mapping.json", 2); err != nil {
return nil, fmt.Errorf("failed to create doc_meta index template: %w", err)
}
return engine, nil
}
@ -109,6 +124,82 @@ func (e *elasticsearchEngine) Close() error {
return nil
}
// CreateIndexTemplate creates or updates the index template named templateName
// so that it applies to new indices whose name matches indexPattern.
//
// The template content is read from the JSON file conf/<mappingFileName> under
// the project root: its top-level "settings" and "mappings" values become the
// template's settings and mappings. A section that is missing (or null) in the
// file is left out of the template rather than being sent as JSON null. An
// empty mappingFileName falls back to "mapping.json".
//
// priority is optional; only its first value is used and it defaults to 1. It
// is sent as the template's "priority" field.
//
// An error is returned when templateName or indexPattern is empty, when the
// mapping file cannot be read or parsed, when the request fails or
// Elasticsearch answers with an error status (the response body is included
// in the message), or when the response is not acknowledged.
func (e *elasticsearchEngine) CreateIndexTemplate(ctx context.Context, templateName, indexPattern, mappingFileName string, priority ...int) error {
	if templateName == "" || indexPattern == "" {
		return fmt.Errorf("template name and index pattern cannot be empty")
	}

	p := 1
	if len(priority) > 0 {
		p = priority[0]
	}

	if mappingFileName == "" {
		mappingFileName = "mapping.json"
	}

	// Read mapping from file
	mappingPath := filepath.Join(utility.GetProjectRoot(), "conf", mappingFileName)
	data, err := os.ReadFile(mappingPath)
	if err != nil {
		return fmt.Errorf("failed to read mapping file: %w", err)
	}

	var mapping map[string]interface{}
	if err := json.Unmarshal(data, &mapping); err != nil {
		return fmt.Errorf("failed to parse mapping file: %w", err)
	}

	// Copy only the sections the file actually provides, so an absent
	// section is omitted instead of being serialized as null.
	template := make(map[string]interface{}, 2)
	for _, section := range []string{"settings", "mappings"} {
		if value, ok := mapping[section]; ok && value != nil {
			template[section] = value
		}
	}

	templateBody := map[string]interface{}{
		"index_patterns": []string{indexPattern},
		"priority":       p,
		"template":       template,
	}

	templateBytes, err := json.Marshal(templateBody)
	if err != nil {
		return fmt.Errorf("failed to marshal template: %w", err)
	}

	// Create or update template
	req := esapi.IndicesPutIndexTemplateRequest{
		Name: templateName,
		Body: bytes.NewReader(templateBytes),
	}

	res, err := req.Do(ctx, e.client)
	if err != nil {
		return fmt.Errorf("failed to create index template: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		// Best effort: the body only enriches the error message, so a read
		// failure here is deliberately ignored.
		bodyBytes, _ := io.ReadAll(res.Body)
		return fmt.Errorf("failed to create index template: %s, body: %s", res.Status(), string(bodyBytes))
	}

	var result map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("failed to parse response: %w", err)
	}

	if acknowledged, ok := result["acknowledged"].(bool); !ok || !acknowledged {
		return fmt.Errorf("index template creation not acknowledged")
	}

	return nil
}
// GetClusterStats gets Elasticsearch cluster statistics
// Reference: curl -XGET "http://{es_host}/_cluster/stats" -H "kbn-xsrf: reporting"
func (e *elasticsearchEngine) GetClusterStats() (map[string]interface{}, error) {

View File

@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"github.com/elastic/go-elasticsearch/v8/esapi"
@ -32,6 +33,17 @@ import (
// CreateMetadataStore creates the document metadata index
func (e *elasticsearchEngine) CreateMetadataStore(ctx context.Context, tenantID string) error {
indexName := buildMetadataIndexName(tenantID)
// Check if index already exists
exists, err := e.indexExists(ctx, indexName)
if err != nil {
return fmt.Errorf("failed to check index existence: %w", err)
}
if exists {
return nil
}
// Index will be created with mapping from index template (ragflow_doc_meta_mapping)
req := esapi.IndicesCreateRequest{
Index: indexName,
}
@ -41,15 +53,29 @@ func (e *elasticsearchEngine) CreateMetadataStore(ctx context.Context, tenantID
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("elasticsearch returned error: %s", res.Status())
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("elasticsearch returned error: %s, body: %s", res.Status(), string(bodyBytes))
}
// Parse response
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("failed to parse response: %w", err)
}
acknowledged, ok := result["acknowledged"].(bool)
if !ok || !acknowledged {
return fmt.Errorf("metadata index creation not acknowledged")
}
return nil
}
// InsertMetadata inserts documents into tenant's metadata index
// If a document with the same id and kb_id already exists, it will be updated with the new value
func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) {
indexName := buildMetadataIndexName(tenantID)
common.Info("Inserting metadata into Elasticsearch index", zap.String("index_name", indexName), zap.String("tenant_id", tenantID), zap.Int("doc_count", len(metadata)))
common.Info("ElasticsearchConnection.InsertMetadata called", zap.String("index_name", indexName), zap.String("tenant_id", tenantID), zap.Int("doc_count", len(metadata)))
if len(metadata) == 0 {
return []string{}, nil
@ -75,28 +101,34 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map
// Build bulk request body
var buf bytes.Buffer
for _, doc := range metadata {
// Action line - index operation
action := map[string]interface{}{
docIDRaw, hasID := doc["id"]
kbIDRaw, hasKBID := doc["kb_id"]
docID, idOK := docIDRaw.(string)
kbID, kbOK := kbIDRaw.(string)
if !hasID || !hasKBID || !idOK || !kbOK || strings.TrimSpace(docID) == "" || strings.TrimSpace(kbID) == "" {
common.Warn("Skipping metadata document without id or kb_id")
continue
}
// Action line: use json.Marshal to properly escape string values
compositeID := fmt.Sprintf("%d:%s|%d:%s", len(docID), docID, len(kbID), kbID)
action, err := json.Marshal(map[string]interface{}{
"index": map[string]interface{}{
"_index": indexName,
"_id": compositeID,
},
}
actionBytes, err := json.Marshal(action)
})
if err != nil {
common.Error("Failed to marshal bulk action", err)
return nil, fmt.Errorf("failed to marshal bulk action: %w", err)
}
buf.Write(actionBytes)
buf.Write(action)
buf.WriteByte('\n')
// Document line - meta_fields is stored as-is (ES can handle nested objects)
docBytes, err := json.Marshal(doc)
if err != nil {
common.Error("Failed to marshal document", err)
return nil, fmt.Errorf("failed to marshal document: %w", err)
// Document line
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
return nil, fmt.Errorf("failed to encode document: %w", err)
}
buf.Write(docBytes)
buf.WriteByte('\n')
}
// Execute bulk request
@ -113,8 +145,9 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map
defer res.Body.Close()
if res.IsError() {
common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status())
return nil, fmt.Errorf("elasticsearch bulk request returned error: %s", res.Status())
bodyBytes, _ := io.ReadAll(res.Body)
common.Sugar.Errorw("Elasticsearch bulk request returned error", "status", res.Status(), "body", string(bodyBytes))
return nil, fmt.Errorf("elasticsearch bulk request returned error: %s, body: %s", res.Status(), string(bodyBytes))
}
// Parse bulk response to check for errors
@ -129,14 +162,15 @@ func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, metadata []map
common.Warn("Bulk request had some errors")
}
common.Info("Successfully inserted metadata into Elasticsearch index", zap.String("index_name", indexName), zap.Int("doc_count", len(metadata)))
common.Info("ElasticsearchConnection.InsertMetadata result", zap.String("index_name", indexName), zap.Int("count", len(metadata)))
return []string{}, nil
}
// UpdateMetadata updates document metadata in tenant's metadata index
// The metaFields map will fully replace the existing meta_fields
func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error {
indexName := buildMetadataIndexName(tenantID)
common.Info("Updating metadata in Elasticsearch index", zap.String("index_name", indexName), zap.String("docID", docID), zap.String("datasetID", datasetID))
common.Info("ElasticsearchConnection.UpdateMetadata called", zap.String("index_name", indexName), zap.String("docID", docID), zap.String("datasetID", datasetID))
// Check if index exists
exists, err := e.indexExists(ctx, indexName)
@ -193,14 +227,16 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string,
return fmt.Errorf("elasticsearch update by query returned error: %s", res.Status())
}
common.Info("Successfully updated metadata in Elasticsearch index", zap.String("index_name", indexName), zap.String("docID", docID))
common.Info("ElasticsearchConnection.UpdateMetadata completes", zap.String("index_name", indexName), zap.String("docID", docID))
return nil
}
// DeleteMetadata deletes metadata from tenant's metadata index by condition
// The condition is a map used to build an ES query (e.g., map["kb_id"]="xxx")
// Returns the number of deleted documents
func (e *elasticsearchEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
indexName := buildMetadataIndexName(tenantID)
common.Info("Deleting metadata from Elasticsearch index", zap.String("index_name", indexName), zap.Any("condition", condition))
common.Info("ElasticsearchConnection.DeleteMetadata called", zap.String("index_name", indexName), zap.Any("condition", condition))
// Check if index exists
exists, err := e.indexExists(ctx, indexName)
@ -258,10 +294,205 @@ func (e *elasticsearchEngine) DeleteMetadata(ctx context.Context, condition map[
deleted = int64(d)
}
common.Info("Successfully deleted metadata", zap.String("index_name", indexName), zap.Int64("deleted_count", deleted))
common.Info("ElasticsearchConnection.DeleteMetadata completes", zap.String("index_name", indexName), zap.Int64("deleted_count", deleted))
return deleted, nil
}
// DeleteMetadataKeys removes the given keys from the meta_fields of the
// metadata document matching docID and datasetID in the tenant's metadata
// index.
//
// Outcomes:
//   - The index does not exist, or no document matches: an error is returned.
//   - The document has no meta_fields, or none of keys is present in them:
//     nothing is modified and nil is returned.
//   - Removing keys would leave meta_fields empty: the whole metadata document
//     is deleted through DeleteMetadata.
//   - Otherwise the keys are removed in place by an update-by-query script.
//
// Error statuses returned by Elasticsearch are reported together with the
// response body so the failure reason is visible to the caller.
func (e *elasticsearchEngine) DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error {
	indexName := buildMetadataIndexName(tenantID)
	common.Info("ElasticsearchConnection.DeleteMetadataKeys called", zap.String("index_name", indexName), zap.String("docID", docID), zap.Any("keys", keys))

	// Check if index exists
	exists, err := e.indexExists(ctx, indexName)
	if err != nil {
		return fmt.Errorf("failed to check index existence: %w", err)
	}
	if !exists {
		return fmt.Errorf("index '%s' does not exist", indexName)
	}

	// The raw IDs are used as term values; the JSON encoder handles escaping.
	query := map[string]interface{}{
		"bool": map[string]interface{}{
			"must": []map[string]interface{}{
				{"term": map[string]interface{}{"id": docID}},
				{"term": map[string]interface{}{"kb_id": datasetID}},
			},
		},
	}

	// Fetch the current meta_fields first, to decide between a no-op, a
	// partial update and a full document deletion.
	getBytes, err := json.Marshal(map[string]interface{}{
		"query":   query,
		"_source": []string{"meta_fields"},
		"size":    1,
	})
	if err != nil {
		return fmt.Errorf("failed to marshal get request: %w", err)
	}

	getSearchReq := esapi.SearchRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(getBytes),
	}
	getRes, err := getSearchReq.Do(ctx, e.client)
	if err != nil {
		return fmt.Errorf("failed to get current metadata: %w", err)
	}
	defer getRes.Body.Close()

	if getRes.IsError() {
		// Best effort: the body only enriches the error message.
		bodyBytes, _ := io.ReadAll(getRes.Body)
		return fmt.Errorf("elasticsearch get request returned error: %s, body: %s", getRes.Status(), string(bodyBytes))
	}

	var getResult map[string]interface{}
	if err := json.NewDecoder(getRes.Body).Decode(&getResult); err != nil {
		return fmt.Errorf("failed to parse get response: %w", err)
	}

	hits, ok := getResult["hits"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("invalid get response format")
	}
	hitsArray, ok := hits["hits"].([]interface{})
	if !ok || len(hitsArray) == 0 {
		return fmt.Errorf("document not found: %s", docID)
	}
	firstHit, ok := hitsArray[0].(map[string]interface{})
	if !ok {
		return fmt.Errorf("invalid hit format")
	}
	source, ok := firstHit["_source"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("invalid source format")
	}

	// meta_fields is accepted either as an object or as a JSON-encoded string.
	var currentMetaFields map[string]interface{}
	if metaFieldsVal, hasMetaFields := source["meta_fields"]; hasMetaFields && metaFieldsVal != nil {
		switch v := metaFieldsVal.(type) {
		case map[string]interface{}:
			currentMetaFields = v
		case string:
			if unmarshalErr := json.Unmarshal([]byte(v), &currentMetaFields); unmarshalErr != nil {
				common.Warn("Failed to parse meta_fields JSON", zap.String("docID", docID), zap.Error(unmarshalErr))
				currentMetaFields = make(map[string]interface{})
			}
		}
	}

	// len of a nil map is 0, so this covers both "missing" and "empty".
	if len(currentMetaFields) == 0 {
		common.Info("No metadata fields to delete from document", zap.String("docID", docID))
		return nil
	}

	keysToRemove := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		keysToRemove[k] = struct{}{}
	}

	// One pass over the stored keys: how many would be removed, how many stay.
	matchedKeys, remainingKeys := 0, 0
	for k := range currentMetaFields {
		if _, remove := keysToRemove[k]; remove {
			matchedKeys++
		} else {
			remainingKeys++
		}
	}

	if matchedKeys == 0 {
		common.Info("No matching keys to delete from document", zap.String("docID", docID))
		return nil
	}

	// If no other keys would remain after deletion, delete the document directly
	if remainingKeys == 0 {
		common.Info("All metadata keys would be deleted, removing document from index", zap.String("docID", docID))

		condition := map[string]interface{}{
			"id":    docID,
			"kb_id": datasetID,
		}
		if _, err := e.DeleteMetadata(ctx, condition, tenantID); err != nil {
			return fmt.Errorf("failed to delete document: %w", err)
		}

		common.Info("Successfully removed document with empty meta_fields", zap.String("docID", docID))
		return nil
	}

	// Some keys will remain, so remove only the specified keys.
	// keys is non-empty here because at least one of them matched above.
	scriptSource := "for(int i=0;i<params.keys.length;i++){if(ctx._source.meta_fields.containsKey(params.keys[i])){ctx._source.meta_fields.remove(params.keys[i])}}"
	updateBytes, err := json.Marshal(map[string]interface{}{
		"query": query,
		"script": map[string]interface{}{
			"source": scriptSource,
			"params": map[string]interface{}{
				"keys": keys,
			},
		},
	})
	if err != nil {
		return fmt.Errorf("failed to marshal update request: %w", err)
	}

	req := esapi.UpdateByQueryRequest{
		Index: []string{indexName},
		Body:  bytes.NewReader(updateBytes),
	}
	res, err := req.Do(ctx, e.client)
	if err != nil {
		common.Error("Failed to execute update by query", err)
		return fmt.Errorf("failed to execute update by query: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		// Best effort: the body only enriches the log entry and error message.
		bodyBytes, _ := io.ReadAll(res.Body)
		common.Sugar.Errorw("Elasticsearch update by query returned error", "status", res.Status(), "body", string(bodyBytes))
		return fmt.Errorf("elasticsearch update by query returned error: %s, body: %s", res.Status(), string(bodyBytes))
	}

	common.Info("ElasticsearchConnection.DeleteMetadataKeys completes", zap.String("index_name", indexName), zap.String("docID", docID))
	return nil
}
// DropMetadataStore drops a metadata index from Elasticsearch
func (e *elasticsearchEngine) DropMetadataStore(ctx context.Context, tenantID string) error {
indexName := buildMetadataIndexName(tenantID)

View File

@ -47,6 +47,7 @@ type DocEngine interface {
InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error)
UpdateMetadata(ctx context.Context, docID string, datasetID string, metaFields map[string]interface{}, tenantID string) error
DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error)
DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error
DropMetadataStore(ctx context.Context, tenantID string) error
MetadataStoreExists(ctx context.Context, tenantID string) (bool, error)

View File

@ -228,7 +228,7 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datas
if metaFieldsData, exists := qr.Data["meta_fields"]; exists && len(metaFieldsData) > 0 {
existingMetaFieldsVal := metaFieldsData[0]
// Parse existing meta_fields if it's a string
// Parse existing meta_fields if it's a string or []uint8
var existingMetaFields map[string]interface{}
if existingMetaFieldsVal != nil {
switch v := existingMetaFieldsVal.(type) {
@ -237,6 +237,16 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datas
common.Warn(fmt.Sprintf("Failed to parse existing meta_fields: %v", err))
existingMetaFields = make(map[string]interface{})
}
case []uint8:
// Handle raw bytes from Infinity - Infinity prefixes JSON with 4 bytes (likely length), skip them
decoded := v
if len(decoded) > 4 {
decoded = decoded[4:]
}
if err := json.Unmarshal(decoded, &existingMetaFields); err != nil {
common.Warn(fmt.Sprintf("Failed to parse existing meta_fields from []uint8: %v", err))
existingMetaFields = make(map[string]interface{})
}
case map[string]interface{}:
existingMetaFields = v
}
@ -285,6 +295,7 @@ func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, datas
}
// DeleteMetadata deletes metadata from tenant's metadata table by condition
// Returns the number of deleted documents.
func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[string]interface{}, tenantID string) (int64, error) {
tableName := buildMetadataTableName(tenantID)
@ -341,6 +352,143 @@ func (e *infinityEngine) DeleteMetadata(ctx context.Context, condition map[strin
return delResp.DeletedRows, nil
}
// DeleteMetadataKeys strips the listed keys from the meta_fields column of the
// metadata row identified by docID and datasetID in the tenant's metadata
// table.
//
// If none of the keys is present the row is left untouched. If every stored
// key would be stripped, the row itself is deleted via DeleteMetadata.
// Otherwise the row is updated with the reduced meta_fields. An error is
// returned when the database, table or row cannot be obtained, or when the
// delete or update call fails.
func (e *infinityEngine) DeleteMetadataKeys(ctx context.Context, docID string, datasetID string, keys []string, tenantID string) error {
	tableName := buildMetadataTableName(tenantID)
	common.Info("InfinityConnection.DeleteMetadataKeys called", zap.String("tableName", tableName), zap.String("docID", docID), zap.Any("keys", keys))

	db, err := e.client.conn.GetDatabase(e.client.dbName)
	if err != nil {
		return fmt.Errorf("failed to get database: %w", err)
	}

	table, err := db.GetTable(tableName)
	if err != nil {
		return fmt.Errorf("failed to get metadata table %s: %w", tableName, err)
	}

	// Single quotes are doubled before the IDs are embedded in the filter.
	filter := fmt.Sprintf(
		"id = '%s' AND kb_id = '%s'",
		strings.ReplaceAll(docID, "'", "''"),
		strings.ReplaceAll(datasetID, "'", "''"),
	)

	// Load the row's current meta_fields.
	result, err := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0).ToResult()
	if err != nil {
		return fmt.Errorf("failed to query existing metadata: %w", err)
	}

	qr, ok := result.(*infinity.QueryResult)
	if !ok || qr == nil || len(qr.Data["id"]) == 0 {
		return fmt.Errorf("document not found: %s", docID)
	}

	// Decode meta_fields; the column value may arrive as a string, raw bytes
	// or an already-decoded map.
	var existingMetaFields map[string]interface{}
	metaFieldsData, exists := qr.Data["meta_fields"]
	if !exists || len(metaFieldsData) == 0 {
		common.Debug("meta_fields not found in qr.Data or empty", zap.Any("exists", exists))
	} else if raw := metaFieldsData[0]; raw != nil {
		switch v := raw.(type) {
		case string:
			if parseErr := json.Unmarshal([]byte(v), &existingMetaFields); parseErr != nil {
				common.Warn("Failed to parse meta_fields JSON", zap.String("docID", docID), zap.Error(parseErr))
				existingMetaFields = make(map[string]interface{})
			}
		case []uint8:
			// Raw bytes from Infinity carry a 4-byte prefix before the JSON
			// (likely a length); it is dropped before decoding.
			payload := v
			if len(payload) > 4 {
				payload = payload[4:]
			}
			if parseErr := json.Unmarshal(payload, &existingMetaFields); parseErr != nil {
				common.Warn("Failed to parse meta_fields JSON from []uint8", zap.String("docID", docID), zap.String("err", parseErr.Error()))
				existingMetaFields = make(map[string]interface{})
			}
		case map[string]interface{}:
			existingMetaFields = v
		default:
			common.Debug("meta_fields unexpected type", zap.String("type", fmt.Sprintf("%T", raw)), zap.Any("value", raw))
		}
	}
	if existingMetaFields == nil {
		existingMetaFields = make(map[string]interface{})
	}

	// Set of keys requested for removal.
	keysToRemove := make(map[string]bool)
	for _, name := range keys {
		keysToRemove[name] = true
	}

	// Classify every stored key as either going away or staying.
	matched, remainingKeys := 0, 0
	for name := range existingMetaFields {
		if keysToRemove[name] {
			matched++
		} else {
			remainingKeys++
		}
	}

	if matched == 0 {
		common.Info(
			"No matching keys to delete from document",
			zap.String("docID", docID),
			zap.Int("existingMetaFieldCount", len(existingMetaFields)),
			zap.Int("keysCount", len(keys)),
		)
		return nil
	}

	// Nothing would be left: drop the whole row through DeleteMetadata.
	if remainingKeys == 0 {
		common.Info("All metadata keys would be deleted, removing document from index", zap.String("docID", docID))

		condition := map[string]interface{}{
			"id":    docID,
			"kb_id": datasetID,
		}
		if _, delErr := e.DeleteMetadata(ctx, condition, tenantID); delErr != nil {
			return fmt.Errorf("failed to delete document: %w", delErr)
		}

		common.Info("Successfully removed document with empty meta_fields", zap.String("docID", docID))
		return nil
	}

	// Partial removal: strip the keys and write the reduced map back.
	for _, name := range keys {
		delete(existingMetaFields, name)
	}

	updatedFields := map[string]interface{}{
		"meta_fields": utility.ConvertMapToJSONString(existingMetaFields),
	}
	if _, updErr := table.Update(filter, updatedFields); updErr != nil {
		return fmt.Errorf("failed to delete metadata keys: %w", updErr)
	}

	common.Info("InfinityConnection.DeleteMetadataKeys completed", zap.String("tableName", tableName), zap.String("docID", docID))
	return nil
}
// DropMetadataStore drops a metadata table from Infinity
func (e *infinityEngine) DropMetadataStore(ctx context.Context, tenantID string) error {
tableName := buildMetadataTableName(tenantID)

View File

@ -133,7 +133,7 @@ func (h *ChunkHandler) Get(c *gin.Context) {
return
}
chunkID := c.Query("chunk_id")
chunkID := c.Param("chunk_id")
if chunkID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
@ -337,7 +337,7 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) {
})
}
// Remove handles chunk removal requests
// RemoveChunks handles chunk removal requests
// @Summary Remove Chunks
// @Description Remove chunks from a document
// @Tags chunks
@ -345,14 +345,24 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) {
// @Produce json
// @Param request body service.RemoveChunksRequest true "remove chunks request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/chunk/rm [post]
func (h *ChunkHandler) Remove(c *gin.Context) {
// @Router /api/v1/datasets/{dataset_id}/documents/{document_id}/chunks [delete]
func (h *ChunkHandler) RemoveChunks(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
// Get document_id from URL path
docID := c.Param("document_id")
if docID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"message": "document_id is required",
})
return
}
var req service.RemoveChunksRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
@ -362,6 +372,8 @@ func (h *ChunkHandler) Remove(c *gin.Context) {
return
}
req.DocID = docID
if req.DocID == "" {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,

View File

@ -565,6 +565,124 @@ func (h *DocumentHandler) SetMeta(c *gin.Context) {
})
}
// DeleteMetaRequest is the JSON request body accepted by DeleteMeta.
type DeleteMetaRequest struct {
// DocID identifies the document whose metadata is deleted; the binding tag
// makes it mandatory.
DocID string `json:"doc_id" binding:"required"`
// Keys is optional. When non-empty it must be a JSON-encoded array of key
// names (a string such as `["key1", "key2"]`), and only those keys are
// deleted. When empty, the entire document metadata is deleted.
Keys string `json:"keys"`
}
// DeleteMeta serves the delete-metadata request for a single document.
// A non-empty Keys field (a JSON-encoded list of key names) deletes just those
// metadata keys; an empty Keys field deletes the document's entire metadata.
// The caller must have access to the dataset the document belongs to.
// @Summary Delete Document Metadata
// @Description Delete metadata keys or entire document metadata for a specific document
// @Tags documents
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body DeleteMetaRequest true "metadata keys to delete or empty to delete all"
// @Success 200 {object} map[string]interface{}
// @Router /v1/document/delete_meta [post]
func (h *DocumentHandler) DeleteMeta(c *gin.Context) {
	user, errorCode, errorMessage := GetUser(c)
	if errorCode != common.CodeSuccess {
		jsonError(c, errorCode, errorMessage)
		return
	}

	// badRequest writes the 400 response shape shared by every client-side failure.
	badRequest := func(message string) {
		c.JSON(http.StatusBadRequest, gin.H{
			"code":    1,
			"message": message,
		})
	}

	// reportFailure maps a service error message to 400 (unknown document)
	// or 500 (anything else).
	reportFailure := func(errMsg string) {
		if strings.Contains(errMsg, "no such document") || strings.Contains(errMsg, "document not found") {
			badRequest(errMsg)
			return
		}
		c.JSON(http.StatusInternalServerError, gin.H{
			"code":    1,
			"message": "Failed to delete metadata: " + errMsg,
		})
	}

	var req DeleteMetaRequest
	if bindErr := c.ShouldBindJSON(&req); bindErr != nil {
		badRequest(bindErr.Error())
		return
	}
	if req.DocID == "" {
		badRequest("doc_id is required")
		return
	}

	// Authorization: user must be able to access the document's dataset.
	doc, err := h.documentService.GetDocumentByID(req.DocID)
	if err != nil {
		badRequest("document not found")
		return
	}
	if !h.datasetService.Accessible(doc.KbID, user.ID) {
		jsonError(c, common.CodeAuthenticationError, "No authorization.")
		return
	}

	if req.Keys == "" {
		// No key list supplied: delete the entire document metadata.
		if err := h.documentService.DeleteDocumentAllMetadata(req.DocID); err != nil {
			reportFailure(err.Error())
			return
		}
	} else {
		// Keys carries a JSON-encoded list of the key names to delete.
		var keys []string
		if parseErr := json.Unmarshal([]byte(req.Keys), &keys); parseErr != nil {
			badRequest("Json syntax error: " + parseErr.Error())
			return
		}
		if len(keys) == 0 {
			badRequest("keys list is required")
			return
		}
		if err := h.documentService.DeleteDocumentMetadata(req.DocID, keys); err != nil {
			reportFailure(err.Error())
			return
		}
	}

	c.JSON(http.StatusOK, gin.H{
		"code":    0,
		"message": "success",
		"data":    true,
	})
}
// ParseDocumentRequest is a JSON request body carrying a mandatory
// "documents" list of strings.
// NOTE(review): the entries are presumably document IDs; confirm against the
// handler that binds this type.
type ParseDocumentRequest struct {
Documents []string `json:"documents" binding:"required"`
}

View File

@ -17,11 +17,8 @@
package handler
import (
"encoding/json"
"net/http"
"os"
"ragflow/internal/common"
"ragflow/internal/engine"
"ragflow/internal/service"
"strings"
@ -426,174 +423,4 @@ func (h *KnowledgebaseHandler) GetBasicInfo(c *gin.Context) {
}
jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success")
}
// CreateDatasetInDocEngine serves the create-dataset request for a knowledge
// base: it authenticates the caller, binds the JSON body, verifies the caller
// may access the knowledge base, and delegates creation to the service layer.
// @Summary Create Dataset in Doc Engine
// @Description Create the Infinity table for a knowledge base
// @Tags knowledgebase
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body service.CreateDatasetTableRequest true "create dataset request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/kb/doc_engine_table [post]
func (h *KnowledgebaseHandler) CreateDatasetInDocEngine(c *gin.Context) {
	user, authCode, authMessage := GetUser(c)
	if authCode != common.CodeSuccess {
		jsonError(c, authCode, authMessage)
		return
	}

	var req service.CreateDatasetTableRequest
	if bindErr := c.ShouldBindJSON(&req); bindErr != nil {
		jsonError(c, common.CodeDataError, bindErr.Error())
		return
	}

	// Only callers with access to the knowledge base may create its table.
	if allowed := h.kbService.Accessible(req.KBID, user.ID); !allowed {
		jsonError(c, common.CodeAuthenticationError, "No authorization.")
		return
	}

	result, code, err := h.kbService.CreateDatasetInDocEngine(&req)
	if err == nil {
		jsonResponse(c, common.CodeSuccess, result, "success")
		return
	}
	jsonError(c, code, err.Error())
}
// DeleteDatasetInDocEngineRequest is the JSON request body for deleting a
// dataset table. KBID is bound from the "kb_id" key and is mandatory.
type DeleteDatasetInDocEngineRequest struct {
KBID string `json:"kb_id" binding:"required"`
}
// DeleteDatasetInDocEngine serves the delete-dataset request for a knowledge
// base: it authenticates the caller, binds the JSON body, verifies the caller
// may access the knowledge base, and delegates deletion to the service layer.
// @Summary Delete Dataset in Doc Engine
// @Description Delete the Infinity table for a knowledge base
// @Tags knowledgebase
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body DeleteDatasetInDocEngineRequest true "delete dataset request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/kb/doc_engine_table [delete]
func (h *KnowledgebaseHandler) DeleteDatasetInDocEngine(c *gin.Context) {
	user, authCode, authMessage := GetUser(c)
	if authCode != common.CodeSuccess {
		jsonError(c, authCode, authMessage)
		return
	}

	var req DeleteDatasetInDocEngineRequest
	if bindErr := c.ShouldBindJSON(&req); bindErr != nil {
		jsonError(c, common.CodeDataError, bindErr.Error())
		return
	}

	// Only callers with access to the knowledge base may delete its table.
	if allowed := h.kbService.Accessible(req.KBID, user.ID); !allowed {
		jsonError(c, common.CodeAuthenticationError, "No authorization.")
		return
	}

	code, err := h.kbService.DeleteDatasetInDocEngine(req.KBID)
	if err == nil {
		jsonResponse(c, common.CodeSuccess, nil, "success")
		return
	}
	jsonError(c, code, err.Error())
}
// InsertDatasetFromFileRequest is the JSON body bound by the
// InsertDatasetFromFile handler. "file_path" is required; the handler passes
// it to os.ReadFile, so it is resolved on the server's filesystem.
type InsertDatasetFromFileRequest struct {
FilePath string `json:"file_path" binding:"required"`
}
// InsertDatasetFromFile reads a JSON file from the server's filesystem and
// inserts the chunks it contains through the document engine.
//
// Expected file layout:
// {"table_name": ..., "knowledgebase_id": ..., "chunks": [...]}
// @Summary Insert chunks into dataset from file
// @Description Internal: Insert into dataset table from a JSON file (table name extracted from file)
// @Tags knowledgebase
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body InsertDatasetFromFileRequest true "insert dataset request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/kb/insert_from_file [post]
func (h *KnowledgebaseHandler) InsertDatasetFromFile(c *gin.Context) {
	// The caller only needs to be authenticated; the user itself is not used.
	if _, authCode, authMsg := GetUser(c); authCode != common.CodeSuccess {
		jsonError(c, authCode, authMsg)
		return
	}

	// fail writes an error payload whose "code" mirrors the HTTP status.
	fail := func(status int, msg string) {
		c.JSON(status, gin.H{
			"code":    status,
			"message": msg,
		})
	}

	var body InsertDatasetFromFileRequest
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		fail(http.StatusBadRequest, bindErr.Error())
		return
	}
	if body.FilePath == "" {
		fail(http.StatusBadRequest, "file_path is required")
		return
	}

	raw, readErr := os.ReadFile(body.FilePath)
	if readErr != nil {
		fail(http.StatusBadRequest, "failed to read file: "+readErr.Error())
		return
	}

	var parsed struct {
		TableNamePrefix string                   `json:"table_name"`
		KnowledgebaseID string                   `json:"knowledgebase_id"`
		Chunks          []map[string]interface{} `json:"chunks"`
	}
	// A missing or null "chunks" key is reported the same way as malformed JSON.
	decodeErr := json.Unmarshal(raw, &parsed)
	if decodeErr != nil || parsed.Chunks == nil {
		fail(http.StatusBadRequest, "invalid JSON format: expected {\"table_name\": ..., \"knowledgebase_id\": ..., \"chunks\": [...]}")
		return
	}
	// An explicit empty array gets its own message.
	if len(parsed.Chunks) == 0 {
		fail(http.StatusBadRequest, "no chunks found in file")
		return
	}

	inserted, insertErr := engine.Get().InsertChunks(c.Request.Context(), parsed.Chunks, parsed.TableNamePrefix, parsed.KnowledgebaseID)
	if insertErr != nil {
		fail(http.StatusInternalServerError, "failed to insert into dataset: "+insertErr.Error())
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"code":    0,
		"data":    inserted,
		"message": "success",
	})
}

View File

@ -32,13 +32,15 @@ import (
// TenantHandler holds the services used by the tenant HTTP handlers.
type TenantHandler struct {
tenantService *service.TenantService
userService *service.UserService
// kbService is used by the chunk store handlers for their Accessible checks.
kbService *service.KnowledgebaseService
}
// NewTenantHandler creates a TenantHandler that stores the given services.
func NewTenantHandler(tenantService *service.TenantService, userService *service.UserService) *TenantHandler {
func NewTenantHandler(tenantService *service.TenantService, userService *service.UserService, kbService *service.KnowledgebaseService) *TenantHandler {
return &TenantHandler{
tenantService: tenantService,
userService: userService,
kbService: kbService,
}
}
@ -192,16 +194,16 @@ func (h *TenantHandler) TenantList(c *gin.Context) {
})
}
// CreateMetadataInDocEngine handles the create doc meta table request
// @Summary Create Doc Meta Table
// @Description Create the document metadata table for a tenant
// CreateMetadataStore handles the create metadata store request
// @Summary Create Metadata Store
// @Description Create the metadata store for a tenant
// @Tags tenants
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Success 200 {object} map[string]interface{}
// @Router /v1/tenant/doc_engine_metadata_table [post]
func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) {
// @Router /v1/tenant/metadata_store [post]
func (h *TenantHandler) CreateMetadataStore(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
@ -211,7 +213,7 @@ func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) {
// Use user.ID as tenant ID (user IS the tenant in user mode)
tenantID := user.ID
code, err := h.tenantService.CreateMetadataInDocEngine(tenantID)
code, err := h.tenantService.CreateMetadataStore(tenantID)
if err != nil {
jsonError(c, code, err.Error())
return
@ -224,16 +226,16 @@ func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) {
})
}
// DeleteMetadataInDocEngine handles the delete doc meta table request
// @Summary Delete Metadata In Doc Engine
// @Description Delete the document metadata table for a tenant
// DeleteMetadataStore handles the delete metadata store request
// @Summary Delete Metadata Store
// @Description Delete the metadata store for a tenant
// @Tags tenants
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Success 200 {object} map[string]interface{}
// @Router /v1/tenant/doc_engine_metadata_table [delete]
func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) {
// @Router /v1/tenant/metadata_store [delete]
func (h *TenantHandler) DeleteMetadataStore(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
@ -243,7 +245,7 @@ func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) {
// Use user.ID as tenant ID (user IS the tenant in user mode)
tenantID := user.ID
code, err := h.tenantService.DeleteMetadataInDocEngine(tenantID)
code, err := h.tenantService.DeleteMetadataStore(tenantID)
if err != nil {
jsonError(c, code, err.Error())
return
@ -256,6 +258,201 @@ func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) {
})
}
// CreateChunkTableRequest represents the request for creating a chunk table.
type CreateChunkTableRequest struct {
	// KBID identifies the knowledge base; binding fails when it is missing.
	KBID string `json:"kb_id" binding:"required"`
	// VectorSize is required; the service rejects non-positive values.
	VectorSize int `json:"vector_size" binding:"required"`
	// ParserID is optional and is forwarded to the service unchanged. The
	// previous /v1/kb/doc_engine_table endpoint accepted it, so it is kept here.
	ParserID string `json:"parser_id,omitempty"`
}

// CreateChunkStore handles the create chunk store request. It authenticates
// the caller, binds the JSON body, checks that the caller may access the
// knowledge base and delegates creation to the tenant service.
// @Summary Create Chunk Store
// @Description Create the chunk store for a knowledge base
// @Tags tenants
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body CreateChunkTableRequest true "create chunk store request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/tenant/chunk_store [post]
func (h *TenantHandler) CreateChunkStore(c *gin.Context) {
	user, errorCode, errorMessage := GetUser(c)
	if errorCode != common.CodeSuccess {
		jsonError(c, errorCode, errorMessage)
		return
	}

	var req CreateChunkTableRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		jsonError(c, common.CodeDataError, err.Error())
		return
	}

	// Check authorization - user must have access to this kb
	if !h.kbService.Accessible(req.KBID, user.ID) {
		jsonError(c, common.CodeAuthenticationError, "No authorization.")
		return
	}

	// Forward every field the service understands, including the optional
	// parser_id, which was previously dropped on the way to the service.
	serviceReq := &service.CreateDatasetTableRequest{
		KBID:       req.KBID,
		VectorSize: req.VectorSize,
		ParserID:   req.ParserID,
	}

	result, code, err := h.tenantService.CreateChunkStore(serviceReq)
	if err != nil {
		jsonError(c, code, err.Error())
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"code":    common.CodeSuccess,
		"message": "success",
		"data":    result,
	})
}
// DeleteChunkTableRequest is the JSON body bound by the DeleteChunkStore
// handler. The "kb_id" field is marked required, so binding fails when it is
// absent.
type DeleteChunkTableRequest struct {
KBID string `json:"kb_id" binding:"required"`
}
// DeleteChunkStore is the HTTP entry point for deleting the chunk store of a
// knowledge base. It authenticates the caller, binds the JSON body, verifies
// access to the knowledge base and delegates the deletion to the tenant
// service.
// @Summary Delete Chunk Store
// @Description Delete the chunk store for a knowledge base
// @Tags tenants
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body DeleteChunkTableRequest true "delete chunk store request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/tenant/chunk_store [delete]
func (h *TenantHandler) DeleteChunkStore(c *gin.Context) {
	caller, authCode, authMsg := GetUser(c)
	if authCode != common.CodeSuccess {
		jsonError(c, authCode, authMsg)
		return
	}

	payload := DeleteChunkTableRequest{}
	bindErr := c.ShouldBindJSON(&payload)
	if bindErr != nil {
		jsonError(c, common.CodeDataError, bindErr.Error())
		return
	}

	// Only users with access to the knowledge base may drop its chunk store.
	if allowed := h.kbService.Accessible(payload.KBID, caller.ID); !allowed {
		jsonError(c, common.CodeAuthenticationError, "No authorization.")
		return
	}

	svcCode, svcErr := h.tenantService.DeleteChunkStore(payload.KBID)
	if svcErr != nil {
		// The service picks the error code; pass it through unchanged.
		jsonError(c, svcCode, svcErr.Error())
		return
	}

	okBody := gin.H{
		"code":    common.CodeSuccess,
		"message": "success",
		"data":    nil,
	}
	c.JSON(http.StatusOK, okBody)
}
// InsertChunksFromFileRequest is the JSON body bound by the
// InsertChunksFromFile handler. "file_path" is required; the handler passes it
// to os.ReadFile, so it is resolved on the server's filesystem.
type InsertChunksFromFileRequest struct {
FilePath string `json:"file_path" binding:"required"`
}
// InsertChunksFromFile reads a JSON file from the server's filesystem and
// inserts the chunks it contains through the document engine.
//
// Expected file layout:
// {"index_name"/"table_name": ..., "knowledgebase_id": ..., "chunks": [...]}
//
// The caller must be authenticated and must have access to the knowledge base
// named in the file, matching the check done by CreateChunkStore and
// DeleteChunkStore.
//
// NOTE(review): file_path comes from the request body and is handed to
// os.ReadFile as-is, so an authenticated caller can probe arbitrary server
// paths. Consider restricting it to a dedicated directory.
// @Summary Insert chunks into dataset from JSON file
// @Description Internal: Insert chunks into dataset table from a JSON file
// @Tags tenants
// @Accept json
// @Produce json
// @Security ApiKeyAuth
// @Param request body InsertChunksFromFileRequest true "insert chunks request"
// @Success 200 {object} map[string]interface{}
// @Router /v1/tenant/insert_chunks_from_file [post]
func (h *TenantHandler) InsertChunksFromFile(c *gin.Context) {
	user, errorCode, errorMessage := GetUser(c)
	if errorCode != common.CodeSuccess {
		jsonError(c, errorCode, errorMessage)
		return
	}

	var req InsertChunksFromFileRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{
			"code":    400,
			"message": err.Error(),
		})
		return
	}

	if req.FilePath == "" {
		c.JSON(http.StatusBadRequest, gin.H{
			"code":    400,
			"message": "file_path is required",
		})
		return
	}

	// Read the JSON file
	data, err := os.ReadFile(req.FilePath)
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{
			"code":    400,
			"message": "failed to read file: " + err.Error(),
		})
		return
	}

	// Parse JSON - format: {"index_name"/"table_name": ..., "knowledgebase_id": ..., "chunks": [...]}
	var debugFormat struct {
		IndexName       string                   `json:"index_name"`
		TableName       string                   `json:"table_name"`
		KnowledgebaseID string                   `json:"knowledgebase_id"`
		Chunks          []map[string]interface{} `json:"chunks"`
	}
	// A missing or null "chunks" key is reported the same way as malformed JSON.
	if err := json.Unmarshal(data, &debugFormat); err != nil || debugFormat.Chunks == nil {
		c.JSON(http.StatusBadRequest, gin.H{
			"code":    400,
			"message": "invalid JSON format: expected {\"index_name\"/\"table_name\": ..., \"knowledgebase_id\": ..., \"chunks\": [...]}",
		})
		return
	}

	if len(debugFormat.Chunks) == 0 {
		c.JSON(http.StatusBadRequest, gin.H{
			"code":    400,
			"message": "no chunks found in file",
		})
		return
	}

	// Check authorization - the target knowledge base is named inside the
	// file, so the check can only happen after parsing. Without it any
	// authenticated user could write chunks into another user's knowledge base.
	if !h.kbService.Accessible(debugFormat.KnowledgebaseID, user.ID) {
		jsonError(c, common.CodeAuthenticationError, "No authorization.")
		return
	}

	// Support both index_name (ES) and table_name (Infinity) in JSON
	indexName := debugFormat.IndexName
	if indexName == "" {
		indexName = debugFormat.TableName
	}

	// Get the document engine and insert
	docEngine := engine.Get()
	result, err := docEngine.InsertChunks(c.Request.Context(), debugFormat.Chunks, indexName, debugFormat.KnowledgebaseID)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{
			"code":    500,
			"message": "failed to insert into dataset: " + err.Error(),
		})
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"code":    0,
		"data":    result,
		"message": "success",
	})
}
// InsertMetadataFromFileRequest request for inserting metadata from file
type InsertMetadataFromFileRequest struct {
FilePath string `json:"file_path" binding:"required"`

View File

@ -194,6 +194,7 @@ func (r *Router) Setup(engine *gin.Engine) {
// Dataset document chunk
datasets.GET("/:dataset_id/documents/:document_id/chunks/:chunk_id", r.chunkHandler.Get)
datasets.POST("/:dataset_id/documents/parse", r.documentHandler.ParseDocuments)
datasets.DELETE("/:dataset_id/documents/:document_id/chunks", r.chunkHandler.RemoveChunks)
}
// Search routes
@ -341,9 +342,6 @@ func (r *Router) Setup(engine *gin.Engine) {
kb.GET("/tags", r.knowledgebaseHandler.ListTagsFromKbs)
kb.GET("/get_meta", r.knowledgebaseHandler.GetMeta)
kb.GET("/basic_info", r.knowledgebaseHandler.GetBasicInfo)
kb.POST("/doc_engine_table", r.knowledgebaseHandler.CreateDatasetInDocEngine) // Internal API only for GO
kb.DELETE("/doc_engine_table", r.knowledgebaseHandler.DeleteDatasetInDocEngine) // Internal API only for GO
kb.POST("/insert_from_file", r.knowledgebaseHandler.InsertDatasetFromFile) // Internal API only for GO
// KB ID specific routes
kbByID := kb.Group("/:kb_id")
@ -358,8 +356,11 @@ func (r *Router) Setup(engine *gin.Engine) {
// Tenant routes (per-tenant resources)
tenant := v1.Group("/tenant")
{
tenant.POST("/doc_engine_metadata_table", r.tenantHandler.CreateMetadataInDocEngine) // Internal API only for GO
tenant.DELETE("/doc_engine_metadata_table", r.tenantHandler.DeleteMetadataInDocEngine) // Internal API only for GO
tenant.POST("/chunk_store", r.tenantHandler.CreateChunkStore) // Internal API only for GO
tenant.DELETE("/chunk_store", r.tenantHandler.DeleteChunkStore) // Internal API only for GO
tenant.POST("/metadata_store", r.tenantHandler.CreateMetadataStore) // Internal API only for GO
tenant.DELETE("/metadata_store", r.tenantHandler.DeleteMetadataStore) // Internal API only for GO
tenant.POST("/insert_chunks_from_file", r.tenantHandler.InsertChunksFromFile) // Internal API only for GO
tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) // Internal API only for GO
}
@ -369,6 +370,7 @@ func (r *Router) Setup(engine *gin.Engine) {
doc.POST("/list", r.documentHandler.ListDocuments)
doc.POST("/metadata/summary", r.documentHandler.MetadataSummary)
doc.POST("/set_meta", r.documentHandler.SetMeta)
doc.POST("/delete_meta", r.documentHandler.DeleteMeta) // Internal API only for GO
}
// Chunk routes
@ -376,7 +378,6 @@ func (r *Router) Setup(engine *gin.Engine) {
{
chunk.POST("/list", r.chunkHandler.List)
chunk.POST("/update", r.chunkHandler.UpdateChunk) // Internal API only for GO
chunk.POST("/rm", r.chunkHandler.Remove)
}
// LLM routes

View File

@ -386,6 +386,58 @@ func (s *DocumentService) SetDocumentMetadata(docID string, meta map[string]inte
return nil
}
// DeleteDocumentMetadata removes the given metadata keys of one document from
// the document engine. The document record supplies the knowledge base ID,
// which in turn is used to resolve the tenant ID required by the engine call.
func (s *DocumentService) DeleteDocumentMetadata(docID string, keys []string) error {
	document, lookupErr := s.documentDAO.GetByID(docID)
	if lookupErr != nil {
		return fmt.Errorf("document not found: %w", lookupErr)
	}
	kbID := document.KbID

	tenantID, tenantErr := s.metadataSvc.GetTenantIDByKBID(kbID)
	if tenantErr != nil {
		return fmt.Errorf("failed to get tenant ID: %w", tenantErr)
	}

	// NOTE(review): nil is passed as the first argument. If that parameter is
	// a context.Context (as in the other engine calls), a non-nil context such
	// as context.Background() should be passed instead — confirm.
	if deleteErr := s.docEngine.DeleteMetadataKeys(nil, docID, kbID, keys, tenantID); deleteErr != nil {
		return fmt.Errorf("failed to delete metadata: %w", deleteErr)
	}
	return nil
}
// DeleteDocumentAllMetadata removes the whole metadata entry of one document
// from the document engine. The entry is selected by document ID and knowledge
// base ID; the tenant ID is resolved from the knowledge base.
func (s *DocumentService) DeleteDocumentAllMetadata(docID string) error {
	document, lookupErr := s.documentDAO.GetByID(docID)
	if lookupErr != nil {
		return fmt.Errorf("document not found: %w", lookupErr)
	}
	kbID := document.KbID

	tenantID, tenantErr := s.metadataSvc.GetTenantIDByKBID(kbID)
	if tenantErr != nil {
		return fmt.Errorf("failed to get tenant ID: %w", tenantErr)
	}

	// Selector matching exactly this document inside its knowledge base.
	selector := map[string]interface{}{}
	selector["id"] = docID
	selector["kb_id"] = kbID

	// NOTE(review): nil is passed as the first argument. If that parameter is
	// a context.Context (as in the other engine calls), a non-nil context such
	// as context.Background() should be passed instead — confirm.
	// The first return value of the engine call is intentionally ignored.
	if _, deleteErr := s.docEngine.DeleteMetadata(nil, selector, tenantID); deleteErr != nil {
		return fmt.Errorf("failed to delete document metadata: %w", deleteErr)
	}
	return nil
}
// GetDocumentMetadataByID get metadata for a specific document
func (s *DocumentService) GetDocumentMetadataByID(docID string) (map[string]interface{}, error) {
// Get document to find kb_id

View File

@ -78,69 +78,6 @@ type ListKbsResponse struct {
Total int64 `json:"total"`
}
// CreateDatasetTableRequest represents the request for creating a dataset table.
// "kb_id" and "vector_size" are marked required for binding; "parser_id" is
// optional and omitted from JSON output when empty.
type CreateDatasetTableRequest struct {
KBID string `json:"kb_id" binding:"required"`
// VectorSize must be positive; CreateDatasetInDocEngine rejects other values.
VectorSize int `json:"vector_size" binding:"required"`
// ParserID is passed through to the document engine unchanged.
ParserID string `json:"parser_id,omitempty"`
}
// CreateDatasetInDocEngineResponse represents the response for creating a dataset table.
type CreateDatasetInDocEngineResponse struct {
KBID string `json:"kb_id"`
// TableName is filled with the prefix "ragflow_<tenant_id>" by
// CreateDatasetInDocEngine, not with the full "<prefix>_<kb_id>" name.
TableName string `json:"table_name"`
VectorSize int `json:"vector_size"`
}
// CreateDatasetInDocEngine asks the document engine to create the chunk store
// of a knowledge base.
//
// The knowledge base is loaded to obtain its tenant ID, from which the table
// name prefix "ragflow_<tenant_id>" is derived. Any lookup failure is reported
// as CodeDataError, a non-positive vector size as CodeDataError and an engine
// failure as CodeServerError.
func (s *KnowledgebaseService) CreateDatasetInDocEngine(req *CreateDatasetTableRequest) (*CreateDatasetInDocEngineResponse, common.ErrorCode, error) {
	kbID := req.KBID

	kb, lookupErr := s.kbDAO.GetByID(kbID)
	if lookupErr != nil {
		return nil, common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID)
	}

	dims := req.VectorSize
	if dims <= 0 {
		return nil, common.CodeDataError, fmt.Errorf("vector_size must be positive")
	}

	// Only the prefix is built here; the engine receives the knowledge base ID
	// separately (the full table name is expected to be "{prefix}_{kb_id}").
	prefix := fmt.Sprintf("ragflow_%s", kb.TenantID)

	if createErr := s.docEngine.CreateChunkStore(context.Background(), prefix, kbID, dims, req.ParserID); createErr != nil {
		return nil, common.CodeServerError, fmt.Errorf("failed to create dataset: %w", createErr)
	}

	resp := &CreateDatasetInDocEngineResponse{
		KBID:       kbID,
		TableName:  prefix,
		VectorSize: dims,
	}
	return resp, common.CodeSuccess, nil
}
// DeleteDatasetInDocEngine asks the document engine to drop the chunk store of
// a knowledge base. The knowledge base is loaded to obtain its tenant ID, which
// forms the table name prefix "ragflow_<tenant_id>". Any lookup failure is
// reported as CodeDataError and an engine failure as CodeServerError.
func (s *KnowledgebaseService) DeleteDatasetInDocEngine(kbID string) (common.ErrorCode, error) {
	kb, lookupErr := s.kbDAO.GetByID(kbID)
	if lookupErr != nil {
		return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID)
	}

	prefix := fmt.Sprintf("ragflow_%s", kb.TenantID)
	if dropErr := s.docEngine.DropChunkStore(context.Background(), prefix, kbID); dropErr != nil {
		return common.CodeServerError, fmt.Errorf("failed to delete table: %w", dropErr)
	}
	return common.CodeSuccess, nil
}
// UpdateKB updates an existing knowledge base
// This matches the Python update endpoint in kb_app.py
func (s *KnowledgebaseService) UpdateKB(req *UpdateKBRequest, userID string) (map[string]interface{}, common.ErrorCode, error) {

View File

@ -35,6 +35,7 @@ type TenantService struct {
modelDAO *dao.TenantModelDAO
modelGroupDAO *dao.TenantModelGroupDAO
modelGroupMappingDAO *dao.TenantModelGroupMappingDAO
kbDAO *dao.KnowledgebaseDAO
docEngine engine.DocEngine
}
@ -48,6 +49,7 @@ func NewTenantService() *TenantService {
modelDAO: dao.NewTenantModelDAO(),
modelGroupDAO: dao.NewTenantModelGroupDAO(),
modelGroupMappingDAO: dao.NewTenantModelGroupMappingDAO(),
kbDAO: dao.NewKnowledgebaseDAO(),
docEngine: engine.Get(),
}
}
@ -265,8 +267,8 @@ func (s *TenantService) GetTenantList(userID string) ([]*TenantListItem, error)
return result, nil
}
// CreateMetadataInDocEngine creates the document metadata table for a tenant
func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.ErrorCode, error) {
// CreateMetadataStore creates the metadata store for a tenant
func (s *TenantService) CreateMetadataStore(tenantID string) (common.ErrorCode, error) {
// Call document engine to create doc meta table
err := s.docEngine.CreateMetadataStore(context.Background(), tenantID)
if err != nil {
@ -276,8 +278,8 @@ func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.Error
return common.CodeSuccess, nil
}
// DeleteMetadataInDocEngine deletes the document metadata table for a tenant
func (s *TenantService) DeleteMetadataInDocEngine(tenantID string) (common.ErrorCode, error) {
// DeleteMetadataStore deletes the metadata store for a tenant
func (s *TenantService) DeleteMetadataStore(tenantID string) (common.ErrorCode, error) {
// Call document engine to delete doc meta table
err := s.docEngine.DropMetadataStore(context.Background(), tenantID)
if err != nil {
@ -287,6 +289,77 @@ func (s *TenantService) DeleteMetadataInDocEngine(tenantID string) (common.Error
return common.CodeSuccess, nil
}
// CreateDatasetTableRequest represents the request for creating a dataset table.
// It is the input of TenantService.CreateChunkStore.
type CreateDatasetTableRequest struct {
KBID string `json:"kb_id" binding:"required"`
// VectorSize must be positive; CreateChunkStore rejects other values.
VectorSize int `json:"vector_size" binding:"required"`
// ParserID is optional and passed through to the document engine unchanged.
ParserID string `json:"parser_id,omitempty"`
}
// CreateChunkStoreResponse represents the response for creating a chunk store.
type CreateChunkStoreResponse struct {
KBID string `json:"kb_id"`
// TableName is filled with the prefix "ragflow_<tenant_id>" by
// CreateChunkStore, not with the full "<prefix>_<kb_id>" name.
TableName string `json:"table_name"`
VectorSize int `json:"vector_size"`
}
// CreateChunkStore asks the document engine to create the chunk store of a
// knowledge base.
//
// A nil request and a non-positive vector size are rejected with
// CodeDataError. The knowledge base is loaded to obtain its tenant ID: a
// not-found lookup yields CodeDataError, any other lookup failure and any
// engine failure yield CodeServerError.
func (s *TenantService) CreateChunkStore(req *CreateDatasetTableRequest) (*CreateChunkStoreResponse, common.ErrorCode, error) {
	if req == nil {
		return nil, common.CodeDataError, fmt.Errorf("request is required")
	}
	kbID := req.KBID

	kb, lookupErr := s.kbDAO.GetByID(kbID)
	switch {
	case lookupErr == nil:
		// Knowledge base found; continue below.
	case dao.IsNotFoundErr(lookupErr):
		return nil, common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID)
	default:
		return nil, common.CodeServerError, fmt.Errorf("failed to query knowledge base %s: %w", kbID, lookupErr)
	}

	dims := req.VectorSize
	if dims <= 0 {
		return nil, common.CodeDataError, fmt.Errorf("vector_size must be positive")
	}

	// Only the prefix is built here; the engine receives the knowledge base ID
	// separately (the full table name is expected to be "{prefix}_{kb_id}").
	prefix := fmt.Sprintf("ragflow_%s", kb.TenantID)

	if createErr := s.docEngine.CreateChunkStore(context.Background(), prefix, kbID, dims, req.ParserID); createErr != nil {
		return nil, common.CodeServerError, fmt.Errorf("failed to create dataset: %w", createErr)
	}

	resp := &CreateChunkStoreResponse{
		KBID:       kbID,
		TableName:  prefix,
		VectorSize: dims,
	}
	return resp, common.CodeSuccess, nil
}
// DeleteChunkStore asks the document engine to drop the chunk store of a
// knowledge base. The knowledge base is loaded to obtain its tenant ID, which
// forms the table name prefix "ragflow_<tenant_id>". A not-found lookup yields
// CodeDataError; any other lookup failure and any engine failure yield
// CodeServerError.
func (s *TenantService) DeleteChunkStore(kbID string) (common.ErrorCode, error) {
	kb, lookupErr := s.kbDAO.GetByID(kbID)
	switch {
	case lookupErr == nil:
		// Knowledge base found; continue below.
	case dao.IsNotFoundErr(lookupErr):
		return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID)
	default:
		return common.CodeServerError, fmt.Errorf("failed to query knowledge base %s: %w", kbID, lookupErr)
	}

	prefix := fmt.Sprintf("ragflow_%s", kb.TenantID)
	if dropErr := s.docEngine.DropChunkStore(context.Background(), prefix, kbID); dropErr != nil {
		return common.CodeServerError, fmt.Errorf("failed to delete table: %w", dropErr)
	}
	return common.CodeSuccess, nil
}
type ModelItem struct {
ModelProvider *string `json:"model_provider"`
ModelInstance *string `json:"model_instance"`