diff --git a/admin/client/parser.py b/admin/client/parser.py index 471ff699a..91b09e138 100644 --- a/admin/client/parser.py +++ b/admin/client/parser.py @@ -77,6 +77,9 @@ sql_command: login_user | drop_user_dataset | list_user_datasets | list_user_dataset_files + | list_user_dataset_documents + | list_user_datasets_metadata + | list_user_documents_metadata_summary | list_user_agents | list_user_chats | create_user_chat @@ -161,10 +164,14 @@ DEFAULT: "DEFAULT"i CHATS: "CHATS"i CHAT: "CHAT"i FILES: "FILES"i +DOCUMENTS: "DOCUMENTS"i +METADATA: "METADATA"i +SUMMARY: "SUMMARY"i AS: "AS"i PARSE: "PARSE"i IMPORT: "IMPORT"i INTO: "INTO"i +IN: "IN"i WITH: "WITH"i PARSER: "PARSER"i PIPELINE: "PIPELINE"i @@ -299,6 +306,9 @@ create_user_dataset_with_parser: CREATE DATASET quoted_string WITH EMBEDDING quo create_user_dataset_with_pipeline: CREATE DATASET quoted_string WITH EMBEDDING quoted_string PIPELINE quoted_string ";" drop_user_dataset: DROP DATASET quoted_string ";" list_user_dataset_files: LIST FILES OF DATASET quoted_string ";" +list_user_dataset_documents: LIST DOCUMENTS OF DATASET quoted_string ";" +list_user_datasets_metadata: LIST METADATA OF DATASETS quoted_string ("," quoted_string)* ";" +list_user_documents_metadata_summary: LIST METADATA SUMMARY OF DATASET quoted_string (DOCUMENTS quoted_string ("," quoted_string)*)? ";" list_user_agents: LIST AGENTS ";" list_user_chats: LIST CHATS ";" create_user_chat: CREATE CHAT quoted_string ";" @@ -592,6 +602,28 @@ class RAGFlowCLITransformer(Transformer): dataset_name = items[4].children[0].strip("'\"") return {"type": "list_user_dataset_files", "dataset_name": dataset_name} + def list_user_dataset_documents(self, items): + dataset_name = items[4].children[0].strip("'\"") + return {"type": "list_user_dataset_documents", "dataset_name": dataset_name} + + def list_user_datasets_metadata(self, items): + dataset_names = [] + dataset_names.append(items[4].children[0].strip("'\"")) + for i in range(5, len(items)): + if items[i] and hasattr(items[i], 'children') and items[i].children: + dataset_names.append(items[i].children[0].strip("'\"")) + return {"type": "list_user_datasets_metadata", "dataset_names": dataset_names} + + def list_user_documents_metadata_summary(self, items): + dataset_name = items[5].children[0].strip("'\"") + doc_ids = [] + if len(items) > 6 and items[6] == "DOCUMENTS": + for i in range(7, len(items)): + if items[i] and hasattr(items[i], 'children') and items[i].children: + doc_id = items[i].children[0].strip("'\"") + doc_ids.append(doc_id) + return {"type": "list_user_documents_metadata_summary", "dataset_name": dataset_name, "document_ids": doc_ids} + def list_user_agents(self, items): return {"type": "list_user_agents"} diff --git a/admin/client/ragflow_client.py b/admin/client/ragflow_client.py index 86236bb52..a6fc85a44 100644 --- a/admin/client/ragflow_client.py +++ b/admin/client/ragflow_client.py @@ -63,7 +63,7 @@ class RAGFlowClient: print("Can't access server for login (connection failed)") return - email : str = command["email"] + email: str = command["email"] user_password = getpass.getpass(f"password for {email}: ").strip() try: token = login_user(self.http_client, self.server_type, email, user_password) @@ -597,7 +597,8 @@ class RAGFlowClient: if self.server_type != "admin": print("This command is only allowed in ADMIN mode") license = command["license"] - response = self.http_client.request("POST", "/admin/license", json_body={"license": license}, use_api_base=True, auth_kind="admin") + response = 
self.http_client.request("POST", "/admin/license", json_body={"license": license}, use_api_base=True, + auth_kind="admin") res_json = response.json() if response.status_code == 200: print("Set license successfully") @@ -609,7 +610,9 @@ class RAGFlowClient: print("This command is only allowed in ADMIN mode") value1 = command["value1"] value2 = command["value2"] - response = self.http_client.request("POST", "/admin/license/config", json_body={"value1": value1, "value2": value2}, use_api_base=True, auth_kind="admin") + response = self.http_client.request("POST", "/admin/license/config", + json_body={"value1": value1, "value2": value2}, use_api_base=True, + auth_kind="admin") res_json = response.json() if response.status_code == 200: print("Set license successfully") @@ -636,7 +639,6 @@ class RAGFlowClient: else: print(f"Fail to show license, code: {res_json['code']}, message: {res_json['message']}") - def list_server_configs(self, command): """List server configs by calling /system/configs API and flattening the JSON response.""" response = self.http_client.request("GET", "/system/configs", use_api_base=False, auth_kind="web") @@ -825,6 +827,130 @@ class RAGFlowClient: return self._print_table_simple(res_json) + def list_user_dataset_documents(self, command_dict): + if self.server_type != "user": + print("This command is only allowed in USER mode") + + dataset_name = command_dict["dataset_name"] + dataset_id = self._get_dataset_id(dataset_name) + if dataset_id is None: + return + + docs = self._list_documents(dataset_name, dataset_id) + if docs is None: + return + + if not docs: + print(f"No documents found in dataset {dataset_name}") + return + + print(f"Documents in dataset: {dataset_name}") + print("-" * 60) + # Select key fields for display + display_docs = [] + for doc in docs: + meta_fields = doc.get("meta_fields", {}) + # Convert meta_fields dict to string for display + meta_fields_str = "" + if meta_fields: + meta_fields_str = str(meta_fields) + display_doc = { + "name": doc.get("name", ""), + "id": doc.get("id", ""), + "size": doc.get("size", 0), + "status": doc.get("status", ""), + "created_at": doc.get("created_at", ""), + } + if meta_fields_str: + display_doc["meta_fields"] = meta_fields_str + display_docs.append(display_doc) + self._print_table_simple(display_docs) + + def list_user_datasets_metadata(self, command_dict): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + + dataset_names = command_dict["dataset_names"] + valid_datasets = [] + for dataset_name in dataset_names: + dataset_id = self._get_dataset_id(dataset_name) + if dataset_id is None: + print(f"Dataset not found: {dataset_name}") + continue + valid_datasets.append((dataset_name, dataset_id)) + + if not valid_datasets: + print("No valid datasets found") + return + + dataset_ids = [dataset_id for _, dataset_id in valid_datasets] + kb_ids_param = ",".join(dataset_ids) + response = self.http_client.request("GET", f"/kb/get_meta?kb_ids={kb_ids_param}", + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code != 200: + print(f"Fail to get metadata, code: {res_json.get('code')}, message: {res_json.get('message')}") + return + + meta = res_json.get("data", {}) + if not meta: + print("No metadata found") + return + + table_data = [] + for field_name, values_dict in meta.items(): + for value, docs in values_dict.items(): + table_data.append({ + "field": field_name, + "value": value, + "doc_ids": ", ".join(docs) + }) + 
self._print_table_simple(table_data) + + def list_user_documents_metadata_summary(self, command_dict): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + + dataset_name = command_dict["dataset_name"] + doc_ids = command_dict.get("document_ids", []) + + kb_id = self._get_dataset_id(dataset_name) + if kb_id is None: + return + + payload = {"kb_id": kb_id} + if doc_ids: + payload["doc_ids"] = doc_ids + response = self.http_client.request("POST", "/document/metadata/summary", json_body=payload, + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200: + summary = res_json.get("data", {}).get("summary", {}) + if not summary: + if doc_ids: + print(f"No metadata summary found for documents: {', '.join(doc_ids)}") + else: + print(f"No metadata summary found in dataset {dataset_name}") + return + if doc_ids: + print(f"Metadata summary for document(s): {', '.join(doc_ids)}") + else: + print(f"Metadata summary for all documents in dataset: {dataset_name}") + print("-" * 60) + for field_name, field_info in summary.items(): + field_type = field_info.get("type", "unknown") + values = field_info.get("values", []) + print(f"\nField: {field_name} (type: {field_type})") + print(f" Total unique values: {len(values)}") + if values: + print(" Values:") + for value, count in values: + print(f" {value}: {count}") + else: + print(f"Fail to get metadata summary, code: {res_json.get('code')}, message: {res_json.get('message')}") + def list_user_agents(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") @@ -1013,7 +1139,8 @@ class RAGFlowClient: if response.status_code == 200 and res_json["code"] == 0: print(f"Success to create chat session for chat: {chat_name}") else: - print(f"Fail to create chat session for chat {chat_name}, code: {res_json['code']}, message: {res_json['message']}") + print( + f"Fail to create chat session for chat {chat_name}, code: {res_json['code']}, message: {res_json['message']}") def drop_chat_session(self, command): if self.server_type != "user": @@ -1040,7 +1167,8 @@ class RAGFlowClient: if response.status_code == 200 and res_json["code"] == 0: print(f"Success to drop chat session '{session_id}' from chat: {chat_name}") else: - print(f"Fail to drop chat session '{session_id}' from chat {chat_name}, code: {res_json['code']}, message: {res_json['message']}") + print( + f"Fail to drop chat session '{session_id}' from chat {chat_name}, code: {res_json['code']}, message: {res_json['message']}") def list_chat_sessions(self, command): if self.server_type != "user": @@ -1094,7 +1222,8 @@ class RAGFlowClient: try: data_json = json.loads(data_str) if data_json.get("code") != 0: - print(f"\nFail to chat on session, code: {data_json.get('code')}, message: {data_json.get('message', '')}") + print( + f"\nFail to chat on session, code: {data_json.get('code')}, message: {data_json.get('message', '')}") return # Check if it's the final message if data_json.get("data") is True: @@ -1598,6 +1727,12 @@ def run_command(client: RAGFlowClient, command_dict: dict): client.drop_user_dataset(command_dict) case "list_user_dataset_files": return client.list_user_dataset_files(command_dict) + case "list_user_dataset_documents": + return client.list_user_dataset_documents(command_dict) + case "list_user_datasets_metadata": + return client.list_user_datasets_metadata(command_dict) + case "list_user_documents_metadata_summary": + return 
client.list_user_documents_metadata_summary(command_dict) case "list_user_agents": return client.list_user_agents(command_dict) case "list_user_chats": @@ -1677,6 +1812,13 @@ GENERATE KEY FOR USER LIST KEYS OF DROP KEY OF +User Commands (use -t user): +LIST DATASETS +LIST DOCUMENTS OF DATASET +SEARCH ON DATASETS +LIST METADATA OF DATASETS [, ]* +LIST METADATA SUMMARY OF DATASET DOCUMENTS [, ]* + Meta Commands: \\?, \\h, \\help Show this help \\q, \\quit, \\exit Quit the CLI diff --git a/cmd/server_main.go b/cmd/server_main.go index c22f19670..5d0fa5798 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -183,7 +183,7 @@ func startServer(config *server.Config) { documentHandler := handler.NewDocumentHandler(documentService) datasetsHandler := handler.NewDatasetsHandler(datasetsService) systemHandler := handler.NewSystemHandler(systemService) - kbHandler := handler.NewKnowledgebaseHandler(kbService, userService) + kbHandler := handler.NewKnowledgebaseHandler(kbService, userService, documentService) chunkHandler := handler.NewChunkHandler(chunkService, userService) llmHandler := handler.NewLLMHandler(llmService, userService) chatHandler := handler.NewChatHandler(chatService, userService) diff --git a/internal/dao/document.go b/internal/dao/document.go index c275427c2..8a7e61bec 100644 --- a/internal/dao/document.go +++ b/internal/dao/document.go @@ -36,7 +36,7 @@ func (dao *DocumentDAO) Create(document *model.Document) error { // GetByID get document by ID func (dao *DocumentDAO) GetByID(id string) (*model.Document, error) { var document model.Document - err := DB.Preload("Author").First(&document, "id = ?", id).Error + err := DB.First(&document, "id = ?", id).Error if err != nil { return nil, err } @@ -80,6 +80,19 @@ func (dao *DocumentDAO) List(offset, limit int) ([]*model.Document, int64, error return documents, total, err } +// ListByKBID list documents by knowledge base ID +func (dao *DocumentDAO) ListByKBID(kbID string, offset, limit int) ([]*model.Document, int64, error) { + var documents []*model.Document + var total int64 + + if err := DB.Model(&model.Document{}).Where("kb_id = ?", kbID).Count(&total).Error; err != nil { + return nil, 0, err + } + + err := DB.Where("kb_id = ?", kbID).Offset(offset).Limit(limit).Find(&documents).Error + return documents, total, err +} + // DeleteByTenantID deletes all documents by tenant ID (hard delete) func (dao *DocumentDAO) DeleteByTenantID(tenantID string) (int64, error) { result := DB.Unscoped().Where("tenant_id = ?", tenantID).Delete(&model.Document{}) diff --git a/internal/engine/infinity/search.go b/internal/engine/infinity/search.go index 53223cf37..2f026ebd1 100644 --- a/internal/engine/infinity/search.go +++ b/internal/engine/infinity/search.go @@ -381,25 +381,40 @@ func (e *infinityEngine) searchUnified(ctx context.Context, req *types.SearchReq return nil, fmt.Errorf("failed to get database: %w", err) } + // Determine if this is a metadata table + isMetadataTable := false + for _, idx := range req.IndexNames { + if strings.HasPrefix(idx, "ragflow_doc_meta_") { + isMetadataTable = true + break + } + } + // Build output columns - // Include all fields needed for retrieval test results - outputColumns := []string{ - "id", - "doc_id", - "kb_id", - "content", - "content_ltks", - "content_with_weight", - "title_tks", - "docnm_kwd", - "img_id", - "available_int", - "important_kwd", - "position_int", - "page_num_int", - "doc_type_kwd", - "mom_id", - "question_tks", + // For metadata tables, only use: id, kb_id, meta_fields + // For chunk 
tables, use all the standard fields + var outputColumns []string + if isMetadataTable { + outputColumns = []string{"id", "kb_id", "meta_fields"} + } else { + outputColumns = []string{ + "id", + "doc_id", + "kb_id", + "content", + "content_ltks", + "content_with_weight", + "title_tks", + "docnm_kwd", + "img_id", + "available_int", + "important_kwd", + "position_int", + "page_num_int", + "doc_type_kwd", + "mom_id", + "question_tks", + } } outputColumns = convertSelectFields(outputColumns) @@ -431,16 +446,31 @@ func (e *infinityEngine) searchUnified(ctx context.Context, req *types.SearchReq // Build filter string var filterParts []string - if len(req.DocIDs) > 0 { - if len(req.DocIDs) == 1 { - filterParts = append(filterParts, fmt.Sprintf("doc_id = '%s'", req.DocIDs[0])) + + // For metadata tables, add kb_id filter if provided + if isMetadataTable && len(req.KbIDs) > 0 && req.KbIDs[0] != "" { + kbIDs := req.KbIDs + if len(kbIDs) == 1 { + filterParts = append(filterParts, fmt.Sprintf("kb_id = '%s'", kbIDs[0])) } else { - docIDs := strings.Join(req.DocIDs, "', '") - filterParts = append(filterParts, fmt.Sprintf("doc_id IN ('%s')", docIDs)) + kbIDStr := strings.Join(kbIDs, "', '") + filterParts = append(filterParts, fmt.Sprintf("kb_id IN ('%s')", kbIDStr)) } } - // Default filter for available chunks - filterParts = append(filterParts, "available_int=1") + + if len(req.DocIDs) > 0 { + if len(req.DocIDs) == 1 { + filterParts = append(filterParts, fmt.Sprintf("id = '%s'", req.DocIDs[0])) + } else { + docIDs := strings.Join(req.DocIDs, "', '") + filterParts = append(filterParts, fmt.Sprintf("id IN ('%s')", docIDs)) + } + } + + if !isMetadataTable { + // Default filter for available chunks + filterParts = append(filterParts, "available_int=1") + } filterStr := strings.Join(filterParts, " AND ") @@ -693,7 +723,7 @@ func toFloat64(val interface{}) (float64, bool) { // executeTableSearch executes search on a single table func (e *infinityEngine) executeTableSearch(db *infinity.Database, tableName string, outputColumns []string, question string, vector []float64, filterStr string, topK, pageSize, offset int, orderBy *OrderByExpr, rankFeature map[string]float64, similarityThreshold float64, minMatch float64) (*types.SearchResponse, error) { // Debug logging - fmt.Printf("[DEBUG] executeTableSearch: question=%s, topK=%d, pageSize=%d, similarityThreshold=%f, rankFeature=%v\n", question, topK, pageSize, similarityThreshold, rankFeature) + fmt.Printf("[DEBUG] executeTableSearch: question=%s, topK=%d, pageSize=%d, similarityThreshold=%f, filterStr=%s\n", question, topK, pageSize, similarityThreshold, filterStr) // Get table table, err := db.GetTable(tableName) @@ -806,6 +836,12 @@ func (e *infinityEngine) executeTableSearch(db *infinity.Database, tableName str table = table.Sort(sortFields) } + // Add filter when there's no text/vector match (like metadata queries) + if !hasTextMatch && !hasVectorMatch && filterStr != "" { + fmt.Printf("[DEBUG] Adding filter for no-match query: %s\n", filterStr) + table = table.Filter(filterStr) + } + // Set limit and offset // Use topK to get more results from Infinity, then filter/sort in Go table = table.Limit(topK) diff --git a/internal/handler/document.go b/internal/handler/document.go index dedd4146a..3cd6f8d11 100644 --- a/internal/handler/document.go +++ b/internal/handler/document.go @@ -203,7 +203,7 @@ func (h *DocumentHandler) DeleteDocument(c *gin.Context) { // @Param page query int false "page number" default(1) // @Param page_size query int false "items per 
page" default(10) // @Success 200 {object} map[string]interface{} -// @Router /api/v1/documents [get] +// @Router /api/v1/document/list [post] func (h *DocumentHandler) ListDocuments(c *gin.Context) { _, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { @@ -211,6 +211,16 @@ func (h *DocumentHandler) ListDocuments(c *gin.Context) { return } + kbID := c.Query("kb_id") + if kbID == "" { + c.JSON(http.StatusOK, gin.H{ + "code": 1, + "message": "Lack of KB ID", + "data": false, + }) + return + } + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "10")) @@ -221,20 +231,41 @@ func (h *DocumentHandler) ListDocuments(c *gin.Context) { pageSize = 10 } - documents, total, err := h.documentService.ListDocuments(page, pageSize) + // Use kbID to filter documents + documents, total, err := h.documentService.ListDocumentsByKBID(kbID, page, pageSize) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "error": "failed to get documents", + c.JSON(http.StatusOK, gin.H{ + "code": 1, + "message": "failed to get documents", + "data": map[string]interface{}{"total": 0, "docs": []interface{}{}}, }) return } + docs := make([]map[string]interface{}, 0, len(documents)) + for _, doc := range documents { + metaFields, err := h.documentService.GetDocumentMetadataByID(doc.ID) + if err != nil { + metaFields = make(map[string]interface{}) + } + + docs = append(docs, map[string]interface{}{ + "id": doc.ID, + "name": doc.Name, + "size": doc.Size, + "type": doc.Type, + "status": doc.Status, + "created_at": doc.CreatedAt, + "meta_fields": metaFields, + }) + } + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "success", "data": gin.H{ - "items": documents, - "total": total, - "page": page, - "page_size": pageSize, + "total": total, + "docs": docs, }, }) } @@ -293,3 +324,51 @@ func (h *DocumentHandler) GetDocumentsByAuthorID(c *gin.Context) { }, }) } + +// MetadataSummary handles the metadata summary request +func (h *DocumentHandler) MetadataSummary(c *gin.Context) { + _, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var requestBody struct { + KBID string `json:"kb_id" binding:"required"` + DocIDs []string `json:"doc_ids"` + } + + if err := c.ShouldBindJSON(&requestBody); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": "kb_id is required", + }) + return + } + + kbID := requestBody.KBID + if kbID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": "kb_id is required", + }) + return + } + + summary, err := h.documentService.GetMetadataSummary(kbID, requestBody.DocIDs) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 1, + "message": "Failed to get metadata summary: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "success", + "data": gin.H{ + "summary": summary, + }, + }) +} diff --git a/internal/handler/kb.go b/internal/handler/kb.go index ef608a263..ab37158e3 100644 --- a/internal/handler/kb.go +++ b/internal/handler/kb.go @@ -28,15 +28,17 @@ import ( // KnowledgebaseHandler handles knowledge base HTTP requests type KnowledgebaseHandler struct { - kbService *service.KnowledgebaseService - userService *service.UserService + kbService *service.KnowledgebaseService + userService *service.UserService + documentService *service.DocumentService } // NewKnowledgebaseHandler creates a new knowledge base handler -func 
NewKnowledgebaseHandler(kbService *service.KnowledgebaseService, userService *service.UserService) *KnowledgebaseHandler { +func NewKnowledgebaseHandler(kbService *service.KnowledgebaseService, userService *service.UserService, documentService *service.DocumentService) *KnowledgebaseHandler { return &KnowledgebaseHandler{ - kbService: kbService, - userService: userService, + kbService: kbService, + userService: userService, + documentService: documentService, } } @@ -581,7 +583,13 @@ func (h *KnowledgebaseHandler) GetMeta(c *gin.Context) { } } - jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success") + meta, err := h.documentService.GetMetadataByKBs(kbIDs) + if err != nil { + jsonError(c, common.CodeExceptionError, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, meta, "success") } // GetBasicInfo handles the get basic info request diff --git a/internal/router/router.go b/internal/router/router.go index b6319d080..1810cb639 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -177,6 +177,13 @@ func (r *Router) Setup(engine *gin.Engine) { } } + // Document routes + doc := authorized.Group("/v1/document") + { + doc.POST("/list", r.documentHandler.ListDocuments) + doc.POST("/metadata/summary", r.documentHandler.MetadataSummary) + } + // Chunk routes chunk := authorized.Group("/v1/chunk") { diff --git a/internal/service/document.go b/internal/service/document.go index ad94ce647..a7961a386 100644 --- a/internal/service/document.go +++ b/internal/service/document.go @@ -17,22 +17,36 @@ package service import ( + "encoding/json" "fmt" + "regexp" + "sort" "time" "ragflow/internal/dao" + "ragflow/internal/engine" "ragflow/internal/model" + "ragflow/internal/server" ) // DocumentService document service type DocumentService struct { documentDAO *dao.DocumentDAO + kbDAO *dao.KnowledgebaseDAO + docEngine engine.DocEngine + engineType server.EngineType + metadataSvc *MetadataService } // NewDocumentService create document service func NewDocumentService() *DocumentService { + cfg := server.GetConfig() return &DocumentService{ documentDAO: dao.NewDocumentDAO(), + kbDAO: dao.NewKnowledgebaseDAO(), + docEngine: engine.Get(), + engineType: cfg.DocEngine.Type, + metadataSvc: NewMetadataService(), } } @@ -160,6 +174,22 @@ func (s *DocumentService) ListDocuments(page, pageSize int) ([]*DocumentResponse return responses, total, nil } +// ListDocumentsByKBID list documents by knowledge base ID +func (s *DocumentService) ListDocumentsByKBID(kbID string, page, pageSize int) ([]*DocumentResponse, int64, error) { + offset := (page - 1) * pageSize + documents, total, err := s.documentDAO.ListByKBID(kbID, offset, pageSize) + if err != nil { + return nil, 0, err + } + + responses := make([]*DocumentResponse, len(documents)) + for i, doc := range documents { + responses[i] = s.toResponse(doc) + } + + return responses, total, nil +} + // GetDocumentsByAuthorID get documents by author ID func (s *DocumentService) GetDocumentsByAuthorID(authorID, page, pageSize int) ([]*DocumentResponse, int64, error) { offset := (page - 1) * pageSize @@ -180,7 +210,15 @@ func (s *DocumentService) GetDocumentsByAuthorID(authorID, page, pageSize int) ( func (s *DocumentService) toResponse(doc *model.Document) *DocumentResponse { createdAt := "" if doc.CreateTime != nil { - createdAt = time.Unix(*doc.CreateTime, 0).Format("2006-01-02 15:04:05") + // Check if timestamp is in milliseconds (13 digits) or seconds (10 digits) + var ts int64 + if *doc.CreateTime > 1000000000000 { + // 
Milliseconds - convert to seconds + ts = *doc.CreateTime / 1000 + } else { + ts = *doc.CreateTime + } + createdAt = time.Unix(ts, 0).Format("2006-01-02 15:04:05") } updatedAt := "" if doc.UpdateTime != nil { @@ -209,3 +247,413 @@ func (s *DocumentService) toResponse(doc *model.Document) *DocumentResponse { UpdatedAt: updatedAt, } } + +// GetMetadataSummaryRequest request for metadata summary +type GetMetadataSummaryRequest struct { + KBID string `json:"kb_id" binding:"required"` + DocIDs []string `json:"doc_ids"` +} + +// GetMetadataSummaryResponse response for metadata summary +type GetMetadataSummaryResponse struct { + Summary map[string]interface{} `json:"summary"` +} + +// GetMetadataSummary get metadata summary for documents +func (s *DocumentService) GetMetadataSummary(kbID string, docIDs []string) (map[string]interface{}, error) { + tenantID, err := s.metadataSvc.GetTenantIDByKBID(kbID) + if err != nil { + return nil, err + } + + searchResult, err := s.metadataSvc.SearchMetadata(kbID, tenantID, docIDs, 1000) + if err != nil { + return nil, err + } + + // Aggregate metadata from results + return aggregateMetadata(searchResult.Chunks), nil +} + +// GetDocumentMetadataByID get metadata for a specific document +func (s *DocumentService) GetDocumentMetadataByID(docID string) (map[string]interface{}, error) { + // Get document to find kb_id + doc, err := s.documentDAO.GetByID(docID) + if err != nil { + return nil, fmt.Errorf("document not found: %w", err) + } + + tenantID, err := s.metadataSvc.GetTenantIDByKBID(doc.KbID) + if err != nil { + return nil, err + } + + searchResult, err := s.metadataSvc.SearchMetadata(doc.KbID, tenantID, []string{docID}, 1) + if err != nil { + return nil, err + } + + // Return metadata if found + if len(searchResult.Chunks) > 0 { + chunk := searchResult.Chunks[0] + return ExtractMetaFields(chunk) + } + + return make(map[string]interface{}), nil +} + +// GetMetadataByKBs get metadata for knowledge bases +func (s *DocumentService) GetMetadataByKBs(kbIDs []string) (map[string]interface{}, error) { + if len(kbIDs) == 0 { + return make(map[string]interface{}), nil + } + + searchResult, err := s.metadataSvc.SearchMetadataByKBs(kbIDs, 10000) + if err != nil { + return nil, err + } + + flattenedMeta := make(map[string]map[string][]string) + numChunks := len(searchResult.Chunks) + + var allMetaFields []map[string]interface{} + if numChunks > 1 && len(searchResult.Chunks) > 0 { + firstChunk := searchResult.Chunks[0] + if metaFieldsVal := firstChunk["meta_fields"]; metaFieldsVal != nil { + if v, ok := metaFieldsVal.([]byte); ok { + allMetaFields = ParseAllLengthPrefixedJSON(v) + } + } + } + + for idx, chunk := range searchResult.Chunks { + docID, ok := ExtractDocumentID(chunk) + if !ok { + continue + } + + var metaFields map[string]interface{} + var metaFieldsVal interface{} + + if len(allMetaFields) > 0 && idx < len(allMetaFields) { + // Use pre-parsed meta_fields from concatenated data + metaFields = allMetaFields[idx] + } else { + // Normal case - get from chunk + metaFieldsVal = chunk["meta_fields"] + if metaFieldsVal != nil { + switch v := metaFieldsVal.(type) { + case string: + if err := json.Unmarshal([]byte(v), &metaFields); err != nil { + continue + } + case []byte: + // Try direct JSON parse first + if err := json.Unmarshal(v, &metaFields); err != nil { + // Try to parse as concatenated JSON objects + metaFields = ParseLengthPrefixedJSON(v) + } + case map[string]interface{}: + metaFields = v + default: + continue + } + } + } + + if metaFields == nil { + 
continue + } + + // Process each metadata field + for fieldName, fieldValue := range metaFields { + if fieldName == "kb_id" || fieldName == "id" { + continue + } + + if _, ok := flattenedMeta[fieldName]; !ok { + flattenedMeta[fieldName] = make(map[string][]string) + } + + // Handle list and single values + var values []interface{} + switch v := fieldValue.(type) { + case []interface{}: + values = v + default: + values = []interface{}{v} + } + + for _, val := range values { + if val == nil { + continue + } + strVal := fmt.Sprintf("%v", val) + flattenedMeta[fieldName][strVal] = append(flattenedMeta[fieldName][strVal], docID) + } + } + } + + // Convert to map[string]interface{} for return + var metaResult map[string]interface{} = make(map[string]interface{}) + for k, v := range flattenedMeta { + metaResult[k] = v + } + + return metaResult, nil +} + +// valueInfo holds count and order of first appearance +type valueInfo struct { + count int + firstOrder int +} + +// aggregateMetadata aggregates metadata from search results +func aggregateMetadata(chunks []map[string]interface{}) map[string]interface{} { + // summary: map[fieldName]map[value]valueInfo + summary := make(map[string]map[string]valueInfo) + typeCounter := make(map[string]map[string]int) + orderCounter := 0 + + for _, chunk := range chunks { + // For metadata table, the actual metadata is in the "meta_fields" JSON field + // Extract it first + metaFieldsVal := chunk["meta_fields"] + if metaFieldsVal == nil { + continue + } + + // Parse meta_fields - could be a string (JSON) or a map + var metaFields map[string]interface{} + switch v := metaFieldsVal.(type) { + case string: + // Parse JSON string + if err := json.Unmarshal([]byte(v), &metaFields); err != nil { + continue + } + case []byte: + // Handle byte slice - Infinity returns concatenated JSON objects with length prefixes + rawBytes := v + + // Try to detect and handle length-prefixed format + // Format: [4-byte length][JSON][4-byte length][JSON]... + parsedMetaFields := make(map[string]interface{}) + offset := 0 + for offset < len(rawBytes) { + // Need at least 4 bytes for length prefix + if offset+4 > len(rawBytes) { + break + } + + // Read 4-byte length (little-endian, not big-endian!) + length := uint32(rawBytes[offset]) | uint32(rawBytes[offset+1])<<8 | + uint32(rawBytes[offset+2])<<16 | uint32(rawBytes[offset+3])<<24 + + // Check if length looks valid (not too large) + if length > 10000 || length == 0 { + // Try to find next '{' from current position + nextBrace := -1 + for i := offset; i < len(rawBytes) && i < offset+100; i++ { + if rawBytes[i] == '{' { + nextBrace = i + break + } + } + if nextBrace > offset { + // Skip to the next '{' + offset = nextBrace + continue + } + break + } + + // Extract JSON data + jsonStart := offset + 4 + jsonEnd := jsonStart + int(length) + if jsonEnd > len(rawBytes) { + jsonEnd = len(rawBytes) + } + + jsonBytes := rawBytes[jsonStart:jsonEnd] + + // Try to parse this JSON + var singleMeta map[string]interface{} + if err := json.Unmarshal(jsonBytes, &singleMeta); err == nil { + // Merge metadata from this document + for k, vv := range singleMeta { + if existing, ok := parsedMetaFields[k]; ok { + // Combine values + if existList, ok := existing.([]interface{}); ok { + if newList, ok := vv.([]interface{}); ok { + parsedMetaFields[k] = append(existList, newList...) 
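+							// The branches below handle the mixed shapes: list + scalar appends the
+							// scalar, a scalar existing value is promoted to a two-element list, and
+							// a new key is stored as-is, so repeated fields across rows accumulate.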
+ } else { + parsedMetaFields[k] = append(existList, vv) + } + } else { + parsedMetaFields[k] = []interface{}{existing, vv} + } + } else { + parsedMetaFields[k] = vv + } + } + } + + offset = jsonEnd + } + + // If we successfully parsed multiple JSON objects, use the merged result + if len(parsedMetaFields) > 0 { + metaFields = parsedMetaFields + } else { + // Fallback: try the original parsing method + startIdx := -1 + for i, b := range rawBytes { + if b == '{' { + startIdx = i + break + } + } + if startIdx > 0 { + strVal := string(rawBytes[startIdx:]) + if err := json.Unmarshal([]byte(strVal), &metaFields); err != nil { + metaFields = map[string]interface{}{"raw": strVal} + } + } else if err := json.Unmarshal(rawBytes, &metaFields); err != nil { + metaFields = map[string]interface{}{"raw": string(rawBytes)} + } + } + case map[string]interface{}: + metaFields = v + default: + continue + } + + // Now iterate over the extracted metadata fields + for k, v := range metaFields { + // Skip nil values + if v == nil { + continue + } + + // Determine value type + valueType := getMetaValueType(v) + + // Track type counts + if valueType != "" { + if _, ok := typeCounter[k]; !ok { + typeCounter[k] = make(map[string]int) + } + typeCounter[k][valueType] = typeCounter[k][valueType] + 1 + } + + // Aggregate value counts + values := v + if v, ok := v.([]interface{}); ok { + values = v + } else { + values = []interface{}{v} + } + + for _, vv := range values.([]interface{}) { + if vv == nil { + continue + } + sv := fmt.Sprintf("%v", vv) + + if _, ok := summary[k]; !ok { + summary[k] = make(map[string]valueInfo) + } + + if existing, ok := summary[k][sv]; ok { + // Already exists, just increment count + existing.count++ + summary[k][sv] = existing + } else { + // First time seeing this value - record order + summary[k][sv] = valueInfo{count: 1, firstOrder: orderCounter} + orderCounter++ + } + } + } + } + + // Build result with type information and sorted values + result := make(map[string]interface{}) + for k, v := range summary { + // Sort by count descending, then by firstOrder ascending (to match Python stable sort) + // values: [value, count, firstOrder] + values := make([][3]interface{}, 0, len(v)) + for val, info := range v { + values = append(values, [3]interface{}{val, info.count, info.firstOrder}) + } + // Use stable sort - sort by count descending, then by firstOrder + sort.SliceStable(values, func(i, j int) bool { + cntI := values[i][1].(int) + cntJ := values[j][1].(int) + if cntI != cntJ { + return cntI > cntJ // count descending + } + // If counts equal, use firstOrder ascending (earlier appearance first) + return values[i][2].(int) < values[j][2].(int) + }) + + // Determine dominant type + valueType := "string" + if typeCounts, ok := typeCounter[k]; ok { + maxCount := 0 + for t, c := range typeCounts { + if c > maxCount { + maxCount = c + valueType = t + } + } + } + + // Convert from [value, count, firstOrder] to [value, count] for output + outputValues := make([][2]interface{}, len(values)) + for i, val := range values { + outputValues[i] = [2]interface{}{val[0], val[1]} + } + + result[k] = map[string]interface{}{ + "type": valueType, + "values": outputValues, + } + } + + return result +} + +// getMetaValueType determines the type of a metadata value +func getMetaValueType(value interface{}) string { + if value == nil { + return "" + } + + switch v := value.(type) { + case []interface{}: + if len(v) > 0 { + return "list" + } + return "" + case bool: + return "string" + case int, int8, int16, 
int32, int64: + return "number" + case float32, float64: + return "number" + case string: + if isTimeString(v) { + return "time" + } + return "string" + } + return "string" +} + +// isTimeString checks if a string is an ISO 8601 datetime +func isTimeString(s string) bool { + matched, _ := regexp.MatchString(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$`, s) + return matched +} diff --git a/internal/service/metadata.go b/internal/service/metadata.go new file mode 100644 index 000000000..7f21775a1 --- /dev/null +++ b/internal/service/metadata.go @@ -0,0 +1,269 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "context" + "encoding/json" + "fmt" + + "ragflow/internal/dao" + "ragflow/internal/engine" + "ragflow/internal/engine/types" +) + +// MetadataService provides common metadata operations +type MetadataService struct { + kbDAO *dao.KnowledgebaseDAO + docEngine engine.DocEngine +} + +// NewMetadataService creates a new metadata service +func NewMetadataService() *MetadataService { + return &MetadataService{ + kbDAO: dao.NewKnowledgebaseDAO(), + docEngine: engine.Get(), + } +} + +// BuildMetadataIndexName constructs the metadata index name for a tenant +func BuildMetadataIndexName(tenantID string) string { + return fmt.Sprintf("ragflow_doc_meta_%s", tenantID) +} + +// GetTenantIDByKBID retrieves tenant ID from knowledge base ID +func (s *MetadataService) GetTenantIDByKBID(kbID string) (string, error) { + kb, err := s.kbDAO.GetByID(kbID) + if err != nil { + return "", fmt.Errorf("knowledgebase not found: %w", err) + } + return kb.TenantID, nil +} + +// GetTenantIDByKBIDs retrieves tenant ID from the first knowledge base ID in the list +func (s *MetadataService) GetTenantIDByKBIDs(kbIDs []string) (string, error) { + if len(kbIDs) == 0 { + return "", fmt.Errorf("no kb_ids provided") + } + kb, err := s.kbDAO.GetByID(kbIDs[0]) + if err != nil { + return "", fmt.Errorf("knowledgebase not found: %w", err) + } + return kb.TenantID, nil +} + +// SearchMetadataResult holds the result of a metadata search +type SearchMetadataResult struct { + IndexName string + Chunks []map[string]interface{} +} + +// SearchMetadata searches the metadata index with the given parameters +func (s *MetadataService) SearchMetadata(kbID, tenantID string, docIDs []string, size int) (*SearchMetadataResult, error) { + indexName := BuildMetadataIndexName(tenantID) + + searchReq := &types.SearchRequest{ + IndexNames: []string{indexName}, + KbIDs: []string{kbID}, + DocIDs: docIDs, + Page: 1, + Size: size, + KeywordOnly: true, + } + + result, err := s.docEngine.Search(context.Background(), searchReq) + if err != nil { + return nil, fmt.Errorf("search failed: %w", err) + } + + searchResp, ok := result.(*types.SearchResponse) + if !ok { + return nil, fmt.Errorf("invalid search response type") + } + + return &SearchMetadataResult{ + IndexName: indexName, + Chunks: searchResp.Chunks, + }, nil +} + +// SearchMetadataByKBs 
searches the metadata index for multiple knowledge bases +func (s *MetadataService) SearchMetadataByKBs(kbIDs []string, size int) (*SearchMetadataResult, error) { + if len(kbIDs) == 0 { + return &SearchMetadataResult{Chunks: []map[string]interface{}{}}, nil + } + + tenantID, err := s.GetTenantIDByKBIDs(kbIDs) + if err != nil { + return nil, err + } + + indexName := BuildMetadataIndexName(tenantID) + + searchReq := &types.SearchRequest{ + IndexNames: []string{indexName}, + KbIDs: kbIDs, + Page: 1, + Size: size, + KeywordOnly: true, + } + + result, err := s.docEngine.Search(context.Background(), searchReq) + if err != nil { + return nil, fmt.Errorf("search failed: %w", err) + } + + searchResp, ok := result.(*types.SearchResponse) + if !ok { + return nil, fmt.Errorf("invalid search response type") + } + + return &SearchMetadataResult{ + IndexName: indexName, + Chunks: searchResp.Chunks, + }, nil +} + +// ExtractDocumentID extracts the document ID from a chunk +func ExtractDocumentID(chunk map[string]interface{}) (string, bool) { + docID, ok := chunk["id"].(string) + return docID, ok +} + +// ExtractMetaFields extracts meta_fields from a chunk, handling different types +func ExtractMetaFields(chunk map[string]interface{}) (map[string]interface{}, error) { + metaFieldsVal := chunk["meta_fields"] + if metaFieldsVal == nil { + return make(map[string]interface{}), nil + } + + var metaFields map[string]interface{} + switch v := metaFieldsVal.(type) { + case map[string]interface{}: + metaFields = v + case string: + if err := json.Unmarshal([]byte(v), &metaFields); err != nil { + return make(map[string]interface{}), nil + } + case []byte: + metaFields = ParseLengthPrefixedJSON(v) + if metaFields == nil { + if err := json.Unmarshal(v, &metaFields); err != nil { + return make(map[string]interface{}), nil + } + } + default: + return make(map[string]interface{}), nil + } + + return metaFields, nil +} + +// ParseLengthPrefixedJSON parses Infinity's length-prefixed JSON format +// Format: [4-byte length (little-endian)][JSON][4-byte length][JSON]... +// Returns the FIRST valid JSON object found +func ParseLengthPrefixedJSON(data []byte) map[string]interface{} { + if len(data) < 4 { + return nil + } + + // Try to find the first valid JSON object by skipping length prefixes + offset := 0 + for offset < len(data) { + // Skip non-'{' bytes + for offset < len(data) && data[offset] != '{' { + offset++ + } + if offset >= len(data) { + break + } + + // Try to parse JSON from current position + var result map[string]interface{} + err := json.Unmarshal(data[offset:], &result) + if err == nil { + return result + } + + // Move forward to try next position + offset++ + } + return nil +} + +// ParseAllLengthPrefixedJSON parses Infinity's length-prefixed JSON format +// and returns ALL JSON objects found (for cases where multiple rows are concatenated) +// Format: [4-byte length (little-endian)][JSON][4-byte length][JSON]... 
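+// Rows whose length prefix cannot be decoded are recovered by scanning ahead (within a
+// bounded window) for the next '{'. Results are returned in buffer order, which
+// GetMetadataByKBs relies on when pairing parsed metadata with chunks by index.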
+func ParseAllLengthPrefixedJSON(data []byte) []map[string]interface{} { + if len(data) < 4 { + return nil + } + + var results []map[string]interface{} + offset := 0 + + // Use length prefix to extract each JSON + for offset+4 <= len(data) { + // Read 4-byte length (little-endian) + length := uint32(data[offset]) | uint32(data[offset+1])<<8 | + uint32(data[offset+2])<<16 | uint32(data[offset+3])<<24 + + // Check if length looks reasonable + if length == 0 || offset+4+int(length) > len(data) { + // Length invalid, try to find next '{' + nextBrace := -1 + for i := offset + 4; i < len(data) && i < offset+104; i++ { + if data[i] == '{' { + nextBrace = i + break + } + } + if nextBrace > offset { + offset = nextBrace + continue + } + break + } + + // Extract JSON bytes (skip the 4-byte length prefix) + jsonStart := offset + 4 + jsonEnd := jsonStart + int(length) + jsonBytes := data[jsonStart:jsonEnd] + + var result map[string]interface{} + if err := json.Unmarshal(jsonBytes, &result); err == nil { + results = append(results, result) + offset = jsonEnd + continue + } else { + // Try to find next '{' + nextBrace := -1 + for i := offset + 4; i < len(data) && i < offset+104; i++ { + if data[i] == '{' { + nextBrace = i + break + } + } + if nextBrace > offset { + offset = nextBrace + continue + } + break + } + } + return results +}
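
For reference, below is a minimal standalone sketch (not the service code) of the [4-byte little-endian length][JSON] framing that ParseAllLengthPrefixedJSON and aggregateMetadata assume for Infinity's meta_fields column, written with encoding/binary instead of manual byte shifts; the '{'-scanning error recovery is omitted and the field names are invented.

    package main

    import (
        "encoding/binary"
        "encoding/json"
        "fmt"
    )

    // decodeFramedRows walks a buffer of [4-byte little-endian length][JSON object]
    // records and returns one map per record, in buffer order.
    func decodeFramedRows(buf []byte) []map[string]interface{} {
        var rows []map[string]interface{}
        for len(buf) >= 4 {
            n := binary.LittleEndian.Uint32(buf[:4])
            if n == 0 || int(n) > len(buf)-4 {
                break // malformed prefix; the real parser scans ahead for the next '{'
            }
            end := 4 + int(n)
            var row map[string]interface{}
            if err := json.Unmarshal(buf[4:end], &row); err == nil {
                rows = append(rows, row)
            }
            buf = buf[end:]
        }
        return rows
    }

    func main() {
        // Frame two rows the way the metadata table is assumed to store them.
        var buf []byte
        for _, doc := range []string{`{"author":"alice"}`, `{"author":"bob","year":2024}`} {
            prefix := make([]byte, 4)
            binary.LittleEndian.PutUint32(prefix, uint32(len(doc)))
            buf = append(buf, prefix...)
            buf = append(buf, doc...)
        }
        fmt.Println(decodeFramedRows(buf)) // [map[author:alice] map[author:bob year:2024]]
    }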
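For orientation, the /document/metadata/summary body that LIST METADATA SUMMARY walks is expected to look roughly like the following; the field names and counts here are illustrative only:

    {
      "code": 0,
      "message": "success",
      "data": {
        "summary": {
          "author": {"type": "string", "values": [["alice", 3], ["bob", 1]]},
          "year":   {"type": "number", "values": [["2024", 2], ["2023", 2]]}
        }
      }
    }

Values are stringified and sorted by count descending, with ties kept in first-seen order, mirroring the stable sort in aggregateMetadata.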
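Example invocations of the new statements added to the client grammar (run in USER mode; the dataset names and document IDs below are placeholders):

    LIST DOCUMENTS OF DATASET 'papers';
    LIST METADATA OF DATASETS 'papers', 'reports';
    LIST METADATA SUMMARY OF DATASET 'papers';
    LIST METADATA SUMMARY OF DATASET 'papers' DOCUMENTS 'doc-id-1', 'doc-id-2';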