From e1b632a7bb1018c7c88e10e0197a8d949074f85d Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Thu, 12 Mar 2026 09:47:42 +0800 Subject: [PATCH] Feat: add delete all support for delete operations (#13530) ### What problem does this PR solve? Add delete all support for delete operations. ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Documentation Update --------- Co-authored-by: writinwaters --- api/apps/chunk_app.py | 24 +- api/apps/sdk/chat.py | 7 +- api/apps/sdk/dataset.py | 15 +- api/apps/sdk/doc.py | 19 +- api/apps/sdk/session.py | 14 +- api/db/services/file2document_service.py | 4 +- api/utils/validation_utils.py | 3 +- common/doc_store/infinity_conn_base.py | 205 ++--- docs/references/http_api_reference.md | 84 ++- docs/references/python_api_reference.md | 54 +- memory/utils/infinity_conn.py | 578 +++++++------- rag/utils/infinity_conn.py | 826 +++++++++++---------- sdk/python/ragflow_sdk/modules/agent.py | 9 +- sdk/python/ragflow_sdk/modules/chat.py | 4 +- sdk/python/ragflow_sdk/modules/dataset.py | 4 +- sdk/python/ragflow_sdk/modules/document.py | 4 +- sdk/python/ragflow_sdk/ragflow.py | 8 +- test/testcases/test_http_api/common.py | 90 +-- test/testcases/test_sdk_api/common.py | 65 +- 19 files changed, 1042 insertions(+), 975 deletions(-) diff --git a/api/apps/chunk_app.py b/api/apps/chunk_app.py index 4d806eb32..229017696 100644 --- a/api/apps/chunk_app.py +++ b/api/apps/chunk_app.py @@ -235,19 +235,37 @@ async def switch(): @manager.route('/rm', methods=['POST']) # noqa: F821 @login_required -@validate_request("chunk_ids", "doc_id") +@validate_request("doc_id") async def rm(): req = await get_request_json() try: def _rm_sync(): - deleted_chunk_ids = req["chunk_ids"] + deleted_chunk_ids = req.get("chunk_ids") if isinstance(deleted_chunk_ids, list): unique_chunk_ids = list(dict.fromkeys(deleted_chunk_ids)) has_ids = len(unique_chunk_ids) > 0 - else: + elif deleted_chunk_ids is not None: unique_chunk_ids = [deleted_chunk_ids] has_ids = deleted_chunk_ids not in (None, "") + else: + unique_chunk_ids = [] + has_ids = False if not has_ids: + if req.get("delete_all") is True: + e, doc = DocumentService.get_by_id(req["doc_id"]) + if not e: + return get_data_error_result(message="Document not found!") + tenant_id = DocumentService.get_tenant_id(req["doc_id"]) + # Clean up storage assets while index rows still exist for discovery + DocumentService.delete_chunk_images(doc, tenant_id) + condition = {"doc_id": req["doc_id"]} + try: + deleted_count = settings.docStoreConn.delete(condition, search.index_name(tenant_id), doc.kb_id) + except Exception: + return get_data_error_result(message="Chunk deleting failure") + if deleted_count > 0: + DocumentService.decrement_chunk_num(doc.id, doc.kb_id, 1, deleted_count, 0) + return get_json_result(data=True) return get_json_result(data=True) e, doc = DocumentService.get_by_id(req["doc_id"]) diff --git a/api/apps/sdk/chat.py b/api/apps/sdk/chat.py index e1142de25..aad3fb980 100644 --- a/api/apps/sdk/chat.py +++ b/api/apps/sdk/chat.py @@ -239,7 +239,12 @@ async def delete_chats(tenant_id): ids = req.get("ids") if not ids: - return get_result() + if req.get("delete_all") is True: + ids = [d.id for d in DialogService.query(tenant_id=tenant_id, status=StatusEnum.VALID.value)] + if not ids: + return get_result() + else: + return get_result() id_list = ids diff --git a/api/apps/sdk/dataset.py b/api/apps/sdk/dataset.py index 58f0442b6..bda151671 100644 --- a/api/apps/sdk/dataset.py +++ b/api/apps/sdk/dataset.py @@ -198,7 +198,13 @@ async def delete(tenant_id): type: string description: | List of dataset IDs to delete. - If `null` or an empty array is provided, no datasets will be deleted. + If `null` or an empty array is provided, no datasets will be deleted + unless `delete_all` is set to `true`. + delete_all: + type: boolean + description: | + If `true` and `ids` is null or empty, delete all datasets owned by the current user. + Defaults to `false`. responses: 200: description: Successful operation. @@ -212,7 +218,12 @@ async def delete(tenant_id): try: kb_id_instance_pairs = [] if req["ids"] is None or len(req["ids"]) == 0: - return get_result() + if req.get("delete_all"): + req["ids"] = [kb.id for kb in KnowledgebaseService.query(tenant_id=tenant_id)] + if not req["ids"]: + return get_result() + else: + return get_result() error_kb_ids = [] for kb_id in req["ids"]: diff --git a/api/apps/sdk/doc.py b/api/apps/sdk/doc.py index 364a959cd..2537ec4c0 100644 --- a/api/apps/sdk/doc.py +++ b/api/apps/sdk/doc.py @@ -750,7 +750,12 @@ async def delete(tenant_id, dataset_id): doc_ids = req.get("ids") if not doc_ids: - return get_result() + if req.get("delete_all") is True: + doc_ids = [doc.id for doc in DocumentService.query(kb_id=dataset_id)] + if not doc_ids: + return get_result() + else: + return get_result() doc_list = doc_ids @@ -1343,7 +1348,17 @@ async def rm_chunk(tenant_id, dataset_id, document_id): chunk_ids = req.get("chunk_ids") if not chunk_ids: - return get_result() + if req.get("delete_all") is True: + doc = docs[0] + # Clean up storage assets while index rows still exist for discovery + DocumentService.delete_chunk_images(doc, tenant_id) + condition = {"doc_id": document_id} + chunk_number = settings.docStoreConn.delete(condition, search.index_name(tenant_id), dataset_id) + if chunk_number != 0: + DocumentService.decrement_chunk_num(document_id, dataset_id, 1, chunk_number, 0) + return get_result(message=f"deleted {chunk_number} chunks") + else: + return get_result() condition = {"doc_id": document_id} unique_chunk_ids, duplicate_messages = check_duplicate_ids(chunk_ids, "chunk") diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index e628a3c96..722965812 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -751,7 +751,12 @@ async def delete(tenant_id, chat_id): ids = req.get("ids") if not ids: - return get_result() + if req.get("delete_all") is True: + ids = [conv.id for conv in ConversationService.query(dialog_id=chat_id)] + if not ids: + return get_result() + else: + return get_result() conv_list = ids @@ -799,7 +804,12 @@ async def delete_agent_session(tenant_id, agent_id): ids = req.get("ids") if not ids: - return get_result() + if req.get("delete_all") is True: + ids = [conv.id for conv in API4ConversationService.query(dialog_id=agent_id)] + if not ids: + return get_result() + else: + return get_result() conv_list = ids diff --git a/api/db/services/file2document_service.py b/api/db/services/file2document_service.py index 079ea783f..0ee151079 100644 --- a/api/db/services/file2document_service.py +++ b/api/db/services/file2document_service.py @@ -30,13 +30,13 @@ class File2DocumentService(CommonService): @DB.connection_context() def get_by_file_id(cls, file_id): objs = cls.model.select().where(cls.model.file_id == file_id) - return objs + return list(objs) @classmethod @DB.connection_context() def get_by_document_id(cls, document_id): objs = cls.model.select().where(cls.model.document_id == document_id) - return objs + return list(objs) @classmethod @DB.connection_context() diff --git a/api/utils/validation_utils.py b/api/utils/validation_utils.py index 5864e6b4d..54d5f67dc 100644 --- a/api/utils/validation_utils.py +++ b/api/utils/validation_utils.py @@ -649,7 +649,8 @@ class UpdateDatasetReq(CreateDatasetReq): class DeleteReq(Base): - ids: Annotated[list[str] | None, Field(...)] + ids: Annotated[list[str] | None, Field(default=None)] + delete_all: Annotated[bool, Field(default=False)] @field_validator("ids", mode="after") @classmethod diff --git a/common/doc_store/infinity_conn_base.py b/common/doc_store/infinity_conn_base.py index 327f518f5..6a95a634e 100644 --- a/common/doc_store/infinity_conn_base.py +++ b/common/doc_store/infinity_conn_base.py @@ -241,14 +241,16 @@ class InfinityConnectionBase(DocStoreConnection): Return the health status of the database. """ inf_conn = self.connPool.get_conn() - res = inf_conn.show_current_node() - self.connPool.release_conn(inf_conn) - res2 = { - "type": "infinity", - "status": "green" if res.error_code == 0 and res.server_status in ["started", "alive"] else "red", - "error": res.error_msg, - } - return res2 + try: + res = inf_conn.show_current_node() + res2 = { + "type": "infinity", + "status": "green" if res.error_code == 0 and res.server_status in ["started", "alive"] else "red", + "error": res.error_msg, + } + return res2 + finally: + self.connPool.release_conn(inf_conn) """ Table operations @@ -259,83 +261,85 @@ class InfinityConnectionBase(DocStoreConnection): self.logger.debug(f"CREATE_IDX: Creating table {table_name}, parser_id: {parser_id}") inf_conn = self.connPool.get_conn() - inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore) + try: + inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore) - # Use configured schema - fp_mapping = os.path.join(get_project_base_directory(), "conf", self.mapping_file_name) - if not os.path.exists(fp_mapping): - raise Exception(f"Mapping file not found at {fp_mapping}") - schema = json.load(open(fp_mapping)) + # Use configured schema + fp_mapping = os.path.join(get_project_base_directory(), "conf", self.mapping_file_name) + if not os.path.exists(fp_mapping): + raise Exception(f"Mapping file not found at {fp_mapping}") + schema = json.load(open(fp_mapping)) - if parser_id is not None: - from common.constants import ParserType + if parser_id is not None: + from common.constants import ParserType - if parser_id == ParserType.TABLE.value: - # Table parser: add chunk_data JSON column to store table-specific fields - schema["chunk_data"] = {"type": "json", "default": "{}"} - self.logger.info("Added chunk_data column for TABLE parser") + if parser_id == ParserType.TABLE.value: + # Table parser: add chunk_data JSON column to store table-specific fields + schema["chunk_data"] = {"type": "json", "default": "{}"} + self.logger.info("Added chunk_data column for TABLE parser") - vector_name = f"q_{vector_size}_vec" - schema[vector_name] = {"type": f"vector,{vector_size},float"} - inf_table = inf_db.create_table( - table_name, - schema, - ConflictType.Ignore, - ) - inf_table.create_index( - "q_vec_idx", - IndexInfo( - vector_name, - IndexType.Hnsw, - { - "M": "16", - "ef_construction": "50", - "metric": "cosine", - "encode": "lvq", - }, - ), - ConflictType.Ignore, - ) - for field_name, field_info in schema.items(): - if field_info["type"] != "varchar" or "analyzer" not in field_info: - continue - analyzers = field_info["analyzer"] - if isinstance(analyzers, str): - analyzers = [analyzers] - for analyzer in analyzers: - inf_table.create_index( - f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}", - IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}), - ConflictType.Ignore, - ) - - # Create secondary indexes for fields with index_type - for field_name, field_info in schema.items(): - if "index_type" not in field_info: - continue - index_config = field_info["index_type"] - if isinstance(index_config, str) and index_config == "secondary": - inf_table.create_index( - f"sec_{field_name}", - IndexInfo(field_name, IndexType.Secondary), - ConflictType.Ignore, - ) - self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}") - elif isinstance(index_config, dict): - if index_config.get("type") == "secondary": - params = {} - if "cardinality" in index_config: - params = {"cardinality": index_config["cardinality"]} + vector_name = f"q_{vector_size}_vec" + schema[vector_name] = {"type": f"vector,{vector_size},float"} + inf_table = inf_db.create_table( + table_name, + schema, + ConflictType.Ignore, + ) + inf_table.create_index( + "q_vec_idx", + IndexInfo( + vector_name, + IndexType.Hnsw, + { + "M": "16", + "ef_construction": "50", + "metric": "cosine", + "encode": "lvq", + }, + ), + ConflictType.Ignore, + ) + for field_name, field_info in schema.items(): + if field_info["type"] != "varchar" or "analyzer" not in field_info: + continue + analyzers = field_info["analyzer"] + if isinstance(analyzers, str): + analyzers = [analyzers] + for analyzer in analyzers: inf_table.create_index( - f"sec_{field_name}", - IndexInfo(field_name, IndexType.Secondary, params), + f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}", + IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}), ConflictType.Ignore, ) - self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}") - self.connPool.release_conn(inf_conn) - self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}") - return True + # Create secondary indexes for fields with index_type + for field_name, field_info in schema.items(): + if "index_type" not in field_info: + continue + index_config = field_info["index_type"] + if isinstance(index_config, str) and index_config == "secondary": + inf_table.create_index( + f"sec_{field_name}", + IndexInfo(field_name, IndexType.Secondary), + ConflictType.Ignore, + ) + self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}") + elif isinstance(index_config, dict): + if index_config.get("type") == "secondary": + params = {} + if "cardinality" in index_config: + params = {"cardinality": index_config["cardinality"]} + inf_table.create_index( + f"sec_{field_name}", + IndexInfo(field_name, IndexType.Secondary, params), + ConflictType.Ignore, + ) + self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}") + + self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}") + return True + finally: + self.connPool.release_conn(inf_conn) def create_doc_meta_idx(self, index_name: str): """ @@ -398,25 +402,28 @@ class InfinityConnectionBase(DocStoreConnection): else: table_name = f"{index_name}_{dataset_id}" inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - db_instance.drop_table(table_name, ConflictType.Ignore) - self.connPool.release_conn(inf_conn) - self.logger.info(f"INFINITY dropped table {table_name}") + try: + db_instance = inf_conn.get_database(self.dbName) + db_instance.drop_table(table_name, ConflictType.Ignore) + self.logger.info(f"INFINITY dropped table {table_name}") + finally: + self.connPool.release_conn(inf_conn) def index_exist(self, index_name: str, dataset_id: str) -> bool: if index_name.startswith("ragflow_doc_meta_"): table_name = index_name else: table_name = f"{index_name}_{dataset_id}" + inf_conn = self.connPool.get_conn() try: - inf_conn = self.connPool.get_conn() db_instance = inf_conn.get_database(self.dbName) _ = db_instance.get_table(table_name) - self.connPool.release_conn(inf_conn) return True except Exception as e: self.logger.warning(f"INFINITY indexExist {str(e)}") - return False + return False + finally: + self.connPool.release_conn(inf_conn) """ CRUD operations @@ -453,21 +460,23 @@ class InfinityConnectionBase(DocStoreConnection): def delete(self, condition: dict, index_name: str, dataset_id: str) -> int: inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - if index_name.startswith("ragflow_doc_meta_"): - table_name = index_name - else: - table_name = f"{index_name}_{dataset_id}" try: - table_instance = db_instance.get_table(table_name) - except Exception: - self.logger.warning(f"Skipped deleting from table {table_name} since the table doesn't exist.") - return 0 - filter = self.equivalent_condition_to_str(condition, table_instance) - self.logger.debug(f"INFINITY delete table {table_name}, filter {filter}.") - res = table_instance.delete(filter) - self.connPool.release_conn(inf_conn) - return res.deleted_rows + db_instance = inf_conn.get_database(self.dbName) + if index_name.startswith("ragflow_doc_meta_"): + table_name = index_name + else: + table_name = f"{index_name}_{dataset_id}" + try: + table_instance = db_instance.get_table(table_name) + except Exception: + self.logger.warning(f"Skipped deleting from table {table_name} since the table doesn't exist.") + return 0 + filter = self.equivalent_condition_to_str(condition, table_instance) + self.logger.debug(f"INFINITY delete table {table_name}, filter {filter}.") + res = table_instance.delete(filter) + return res.deleted_rows + finally: + self.connPool.release_conn(inf_conn) """ Helper functions for search result diff --git a/docs/references/http_api_reference.md b/docs/references/http_api_reference.md index 8e7199b11..0ba84c930 100644 --- a/docs/references/http_api_reference.md +++ b/docs/references/http_api_reference.md @@ -657,8 +657,9 @@ Deletes datasets by ID. - Headers: - `'content-Type: application/json'` - `'Authorization: Bearer '` - - Body: - - `"ids"`: `list[string]` or `null` +- Body: + - `"ids"`: `list[string]` or `null` + - `"delete_all"`: `boolean` ##### Request example @@ -672,12 +673,24 @@ curl --request DELETE \ }' ``` +```bash +curl --request DELETE \ + --url http://{address}/api/v1/datasets \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "delete_all": true + }' +``` + ##### Request parameters -- `"ids"`: (*Body parameter*), `list[string]` or `null`, *Required* +- `"ids"`: (*Body parameter*), `list[string]` or `null` Specifies the datasets to delete: - If omitted, or set to `null` or an empty array, no datasets are deleted. - If an array of IDs is provided, only the datasets matching those IDs are deleted. +- `"delete_all"`: (*Body parameter*), `boolean` + Whether to delete all datasets owned by the current user when`"ids"` is omitted, or set to `null` or an empty array. Defaults to `false`. #### Response @@ -1801,6 +1814,7 @@ Deletes documents by ID. - `'Authorization: Bearer '` - Body: - `"ids"`: `list[string]` + - `"delete_all"`: `boolean` ##### Request example @@ -1815,6 +1829,16 @@ curl --request DELETE \ }' ``` +```bash +curl --request DELETE \ + --url http://{address}/api/v1/datasets/{dataset_id}/documents \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "delete_all": true + }' +``` + ##### Request parameters - `dataset_id`: (*Path parameter*) @@ -1823,6 +1847,8 @@ curl --request DELETE \ The IDs of the documents to delete. - If omitted, or set to `null` or an empty array, no documents are deleted. - If an array of IDs is provided, only the documents matching those IDs are deleted. +- `"delete_all"`: (*Body parameter*), `boolean` + Whether to delete all documents in the specified dataset when `"ids"` is omitted, or set to `null` or an empty array. Defaults to `false`. #### Response @@ -2161,6 +2187,7 @@ Deletes chunks by ID. - `'Authorization: Bearer '` - Body: - `"chunk_ids"`: `list[string]` + - `"delete_all"`: `boolean` ##### Request example @@ -2175,6 +2202,16 @@ curl --request DELETE \ }' ``` +```bash +curl --request DELETE \ + --url http://{address}/api/v1/datasets/{dataset_id}/documents/{document_id}/chunks \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "delete_all": true + }' +``` + ##### Request parameters - `dataset_id`: (*Path parameter*) @@ -2185,6 +2222,8 @@ curl --request DELETE \ The IDs of the chunks to delete. - If omitted, or set to `null` or an empty array, no chunks are deleted. - If an array of IDs is provided, only the chunks matching those IDs are deleted. +- `"delete_all"`: (*Body parameter*), `boolean` + Whether to delete all chunks of the specified documen when `"chunk_ids"` is omitted, or set to`null` or an empty array. Defaults to `false`. #### Response @@ -2938,6 +2977,7 @@ Deletes chat assistants by ID. - `'Authorization: Bearer '` - Body: - `"ids"`: `list[string]` + - `"delete_all"`: `boolean` ##### Request example @@ -2952,12 +2992,24 @@ curl --request DELETE \ }' ``` +```bash +curl --request DELETE \ + --url http://{address}/api/v1/chats \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "delete_all": true + }' +``` + ##### Request parameters - `"ids"`: (*Body parameter*), `list[string]` The IDs of the chat assistants to delete. - If omitted, or set to `null` or an empty array, no chat assistants are deleted. - If an array of IDs is provided, only the chat assistants matching those IDs are deleted. +- `"delete_all"`: (*Body parameter*), `boolean` + Whether to delete all chat assistants owned by the current user when `"ids"` is omitted, or set to`null` or an empty array. Defaults to `false`. #### Response @@ -3316,6 +3368,7 @@ Deletes sessions of a chat assistant by ID. - `'Authorization: Bearer '` - Body: - `"ids"`: `list[string]` + - `"delete_all"`: `boolean` ##### Request example @@ -3330,6 +3383,16 @@ curl --request DELETE \ }' ``` +```bash +curl --request DELETE \ + --url http://{address}/api/v1/chats/{chat_id}/sessions \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "delete_all": true + }' +``` + ##### Request Parameters - `chat_id`: (*Path parameter*) @@ -3338,6 +3401,8 @@ curl --request DELETE \ The IDs of the sessions to delete. - If omitted, or set to `null` or an empty array, no sessions are deleted. - If an array of IDs is provided, only the sessions matching those IDs are deleted. +- `"delete_all"`: (*Body Parameter*), `boolean` + Whether to delete all sessions of the specified chat assistant when `"ids"` is omitted, or set to `null` or an empty array. Defaults to `false`. #### Response @@ -4682,6 +4747,7 @@ Deletes sessions of an agent by ID. - `'Authorization: Bearer '` - Body: - `"ids"`: `list[string]` + - `"delete_all"`: `boolean` ##### Request example @@ -4696,6 +4762,16 @@ curl --request DELETE \ }' ``` +```bash +curl --request DELETE \ + --url http://{address}/api/v1/agents/{agent_id}/sessions \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "delete_all": true + }' +``` + ##### Request Parameters - `agent_id`: (*Path parameter*) @@ -4704,6 +4780,8 @@ curl --request DELETE \ The IDs of the sessions to delete. - If omitted, or set to `null` or an empty array, no sessions are deleted. - If an array of IDs is provided, only the sessions matching those IDs are deleted. +- `"delete_all"`: (*Body Parameter*), `boolean` + Whether to delete all sessions of the specified agent when `"ids"` is omitted, or set to `null` or an empty array. Defaults to `false`. #### Response diff --git a/docs/references/python_api_reference.md b/docs/references/python_api_reference.md index 8fa97f3d5..f3a87d397 100644 --- a/docs/references/python_api_reference.md +++ b/docs/references/python_api_reference.md @@ -230,20 +230,24 @@ dataset = rag_object.create_dataset(name="kb_1") ### Delete datasets ```python -RAGFlow.delete_datasets(ids: list[str] | None = None) +RAGFlow.delete_datasets(ids: list[str] | None = None, delete_all: bool = False) ``` Deletes datasets by ID. #### Parameters -##### ids: `list[str]` or `None`, *Required* +##### ids: `list[str]` or `None` The IDs of the datasets to delete. Defaults to `None`. - If omitted, or set to `null` or an empty array, no datasets are deleted. - If an array of IDs is provided, only the datasets matching those IDs are deleted. +##### delete_all: `bool` + +Whether to delete all datasets owned by the current user when `ids` is omitted, or set to `None` or an empty list. Defaults to `False`. + #### Returns - Success: No value is returned. @@ -253,6 +257,7 @@ The IDs of the datasets to delete. Defaults to `None`. ```python rag_object.delete_datasets(ids=["d94a8dc02c9711f0930f7fbc369eab6d","e94a8dc02c9711f0930f7fbc369eab6e"]) +rag_object.delete_datasets(delete_all=True) ``` --- @@ -672,7 +677,7 @@ for doc in dataset.list_documents(keywords="rag", page=0, page_size=12): ### Delete documents ```python -DataSet.delete_documents(ids: list[str] = None) +DataSet.delete_documents(ids: list[str] | None = None, delete_all: bool = False) ``` Deletes documents by ID. @@ -686,6 +691,10 @@ The IDs of the documents to delete. Defaults to `None`. - If omitted, or set to `null` or an empty array, no documents are deleted. - If an array of IDs is provided, only the documents matching those IDs are deleted. +##### delete_all: `bool` + +Whether to delete all documents in the current dataset when `ids` is omitted, or set to `None` or an empty list. Defaults to `False`. + #### Returns - Success: No value is returned. @@ -700,6 +709,7 @@ rag_object = RAGFlow(api_key="", base_url="http://: dataset = rag_object.list_datasets(name="kb_1") dataset = dataset[0] dataset.delete_documents(ids=["id_1","id_2"]) +dataset.delete_documents(delete_all=True) ``` --- @@ -943,20 +953,24 @@ for chunk in docs[0].list_chunks(keywords="rag", page=0, page_size=12): ### Delete chunks ```python -Document.delete_chunks(chunk_ids: list[str]) +Document.delete_chunks(ids: list[str] | None = None, delete_all: bool = False) ``` Deletes chunks by ID. #### Parameters -##### chunk_ids: `list[str]` +##### ids: `list[str]` or `None` The IDs of the chunks to delete. Defaults to `None`. - If omitted, or set to `null` or an empty array, no chunks are deleted. - If an array of IDs is provided, only the chunks matching those IDs are deleted. +##### delete_all: `bool` + +Whether to delete all chunks in the current document when `ids` is omitted, or set to `None` or an empty list. Defaults to `False`. + #### Returns - Success: No value is returned. @@ -974,6 +988,7 @@ doc = dataset.list_documents(id="wdfxb5t547d") doc = doc[0] chunk = doc.add_chunk(content="xxxxxxx") doc.delete_chunks(["id_1","id_2"]) +doc.delete_chunks(delete_all=True) ``` --- @@ -1249,20 +1264,24 @@ assistant.update({"name": "Stefan", "llm": {"temperature": 0.8}, "prompt": {"top ### Delete chat assistants ```python -RAGFlow.delete_chats(ids: list[str] = None) +RAGFlow.delete_chats(ids: list[str] | None = None, delete_all: bool = False) ``` Deletes chat assistants by ID. #### Parameters -##### ids: `list[str]` +##### ids: `list[str]` or `None` The IDs of the chat assistants to delete. Defaults to `None`. - If omitted, or set to `null` or an empty array, no chat assistants are deleted. - If an array of IDs is provided, only the chat assistants matching those IDs are deleted. +##### delete_all: `bool` + +Whether to delete all chat assistants owned by the current user when `ids` is omitted, or set to `None` or an empty list. Defaults to `False`. + #### Returns - Success: No value is returned. @@ -1275,6 +1294,7 @@ from ragflow_sdk import RAGFlow rag_object = RAGFlow(api_key="", base_url="http://:9380") rag_object.delete_chats(ids=["id_1","id_2"]) +rag_object.delete_chats(delete_all=True) ``` --- @@ -1481,20 +1501,24 @@ for session in assistant.list_sessions(): ### Delete chat assistant's sessions ```python -Chat.delete_sessions(ids:list[str] = None) +Chat.delete_sessions(ids: list[str] | None = None, delete_all: bool = False) ``` Deletes sessions of the current chat assistant by ID. #### Parameters -##### ids: `list[str]` +##### ids: `list[str]` or `None` The IDs of the sessions to delete. Defaults to `None`. - If omitted, or set to `null` or an empty array, no sessions are deleted. - If an array of IDs is provided, only the sessions matching those IDs are deleted. +##### delete_all: `bool` + +Whether to delete all sessions of the current chat assistant when `ids` is omitted, or set to `None` or an empty list. Defaults to `False`. + #### Returns - Success: No value is returned. @@ -1509,6 +1533,7 @@ rag_object = RAGFlow(api_key="", base_url="http://: assistant = rag_object.list_chats(name="Miss R") assistant = assistant[0] assistant.delete_sessions(ids=["id_1","id_2"]) +assistant.delete_sessions(delete_all=True) ``` --- @@ -1802,20 +1827,24 @@ for session in sessions: ### Delete agent's sessions ```python -Agent.delete_sessions(ids: list[str] = None) +Agent.delete_sessions(ids: list[str] | None = None, delete_all: bool = False) ``` Deletes sessions of an agent by ID. #### Parameters -##### ids: `list[str]` +##### ids: `list[str]` or `None` The IDs of the sessions to delete. Defaults to `None`. -- If omitted, or set to `null` or an empty array, no sessions are deleted. +- If omitted, or set to `None` or an empty array, no sessions are deleted. - If an array of IDs is provided, only the sessions matching those IDs are deleted. +##### delete_all: `bool` + +Whether to delete all sessions of the current agent when `ids` is omitted, or set to `None` or an empty list. Defaults to `False`. + #### Returns - Success: No value is returned. @@ -1830,6 +1859,7 @@ rag_object = RAGFlow(api_key="", base_url="http://: AGENT_id = "AGENT_ID" agent = rag_object.list_agents(id = AGENT_id)[0] agent.delete_sessions(ids=["id_1","id_2"]) +agent.delete_sessions(delete_all=True) ``` --- diff --git a/memory/utils/infinity_conn.py b/memory/utils/infinity_conn.py index 826fbadfb..93402fa1a 100644 --- a/memory/utils/infinity_conn.py +++ b/memory/utils/infinity_conn.py @@ -122,151 +122,153 @@ class InfinityConnection(InfinityConnectionBase): index_names = index_names.split(",") assert isinstance(index_names, list) and len(index_names) > 0 inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - df_list = list() - table_list = list() - if hide_forgotten: - condition.update({"must_not": {"exists": "forget_at_flt"}}) - output = select_fields.copy() - if agg_fields is None: - agg_fields = [] - for essential_field in ["id"] + agg_fields: - if essential_field not in output: - output.append(essential_field) - score_func = "" - score_column = "" - for matchExpr in match_expressions: - if isinstance(matchExpr, MatchTextExpr): - score_func = "score()" - score_column = "SCORE" - break - if not score_func: + try: + db_instance = inf_conn.get_database(self.dbName) + df_list = list() + table_list = list() + if hide_forgotten: + condition.update({"must_not": {"exists": "forget_at_flt"}}) + output = select_fields.copy() + if agg_fields is None: + agg_fields = [] + for essential_field in ["id"] + agg_fields: + if essential_field not in output: + output.append(essential_field) + score_func = "" + score_column = "" for matchExpr in match_expressions: - if isinstance(matchExpr, MatchDenseExpr): - score_func = "similarity()" - score_column = "SIMILARITY" + if isinstance(matchExpr, MatchTextExpr): + score_func = "score()" + score_column = "SCORE" break - if match_expressions: - if score_func not in output: - output.append(score_func) - output = [f for f in output if f != "_score"] - if limit <= 0: - # ElasticSearch default limit is 10000 - limit = 10000 - - # Prepare expressions common to all tables - filter_cond = None - filter_fulltext = "" - if condition: - condition_dict = {self.convert_condition_and_order_field(k): v for k, v in condition.items()} - table_found = False - for indexName in index_names: - for mem_id in memory_ids: - table_name = f"{indexName}_{mem_id}" - try: - filter_cond = self.equivalent_condition_to_str(condition_dict, db_instance.get_table(table_name)) - table_found = True + if not score_func: + for matchExpr in match_expressions: + if isinstance(matchExpr, MatchDenseExpr): + score_func = "similarity()" + score_column = "SIMILARITY" break + if match_expressions: + if score_func not in output: + output.append(score_func) + output = [f for f in output if f != "_score"] + if limit <= 0: + # ElasticSearch default limit is 10000 + limit = 10000 + + # Prepare expressions common to all tables + filter_cond = None + filter_fulltext = "" + if condition: + condition_dict = {self.convert_condition_and_order_field(k): v for k, v in condition.items()} + table_found = False + for indexName in index_names: + for mem_id in memory_ids: + table_name = f"{indexName}_{mem_id}" + try: + filter_cond = self.equivalent_condition_to_str(condition_dict, db_instance.get_table(table_name)) + table_found = True + break + except Exception: + pass + if table_found: + break + if not table_found: + self.logger.error(f"No valid tables found for indexNames {index_names} and memoryIds {memory_ids}") + return pd.DataFrame(), 0 + + for matchExpr in match_expressions: + if isinstance(matchExpr, MatchTextExpr): + if filter_cond and "filter" not in matchExpr.extra_options: + matchExpr.extra_options.update({"filter": filter_cond}) + matchExpr.fields = [self.convert_matching_field(field) for field in matchExpr.fields] + fields = ",".join(matchExpr.fields) + filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')" + if filter_cond: + filter_fulltext = f"({filter_cond}) AND {filter_fulltext}" + minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0) + if isinstance(minimum_should_match, float): + str_minimum_should_match = str(int(minimum_should_match * 100)) + "%" + matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match + + for k, v in matchExpr.extra_options.items(): + if not isinstance(v, str): + matchExpr.extra_options[k] = str(v) + self.logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}") + elif isinstance(matchExpr, MatchDenseExpr): + if filter_fulltext and "filter" not in matchExpr.extra_options: + matchExpr.extra_options.update({"filter": filter_fulltext}) + for k, v in matchExpr.extra_options.items(): + if not isinstance(v, str): + matchExpr.extra_options[k] = str(v) + similarity = matchExpr.extra_options.get("similarity") + if similarity: + matchExpr.extra_options["threshold"] = similarity + del matchExpr.extra_options["similarity"] + self.logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}") + elif isinstance(matchExpr, FusionExpr): + if matchExpr.method == "weighted_sum": + # The default is "minmax" which gives a zero score for the last doc. + matchExpr.fusion_params["normalize"] = "atan" + self.logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}") + + order_by_expr_list = list() + if order_by.fields: + for order_field in order_by.fields: + order_field_name = self.convert_condition_and_order_field(order_field[0]) + if order_field[1] == 0: + order_by_expr_list.append((order_field_name, SortType.Asc)) + else: + order_by_expr_list.append((order_field_name, SortType.Desc)) + + total_hits_count = 0 + # Scatter search tables and gather the results + column_name_list = [] + for indexName in index_names: + for memory_id in memory_ids: + table_name = f"{indexName}_{memory_id}" + try: + table_instance = db_instance.get_table(table_name) except Exception: - pass - if table_found: - break - if not table_found: - self.logger.error(f"No valid tables found for indexNames {index_names} and memoryIds {memory_ids}") - return pd.DataFrame(), 0 - - for matchExpr in match_expressions: - if isinstance(matchExpr, MatchTextExpr): - if filter_cond and "filter" not in matchExpr.extra_options: - matchExpr.extra_options.update({"filter": filter_cond}) - matchExpr.fields = [self.convert_matching_field(field) for field in matchExpr.fields] - fields = ",".join(matchExpr.fields) - filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')" - if filter_cond: - filter_fulltext = f"({filter_cond}) AND {filter_fulltext}" - minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0) - if isinstance(minimum_should_match, float): - str_minimum_should_match = str(int(minimum_should_match * 100)) + "%" - matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match - - for k, v in matchExpr.extra_options.items(): - if not isinstance(v, str): - matchExpr.extra_options[k] = str(v) - self.logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}") - elif isinstance(matchExpr, MatchDenseExpr): - if filter_fulltext and "filter" not in matchExpr.extra_options: - matchExpr.extra_options.update({"filter": filter_fulltext}) - for k, v in matchExpr.extra_options.items(): - if not isinstance(v, str): - matchExpr.extra_options[k] = str(v) - similarity = matchExpr.extra_options.get("similarity") - if similarity: - matchExpr.extra_options["threshold"] = similarity - del matchExpr.extra_options["similarity"] - self.logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}") - elif isinstance(matchExpr, FusionExpr): - if matchExpr.method == "weighted_sum": - # The default is "minmax" which gives a zero score for the last doc. - matchExpr.fusion_params["normalize"] = "atan" - self.logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}") - - order_by_expr_list = list() - if order_by.fields: - for order_field in order_by.fields: - order_field_name = self.convert_condition_and_order_field(order_field[0]) - if order_field[1] == 0: - order_by_expr_list.append((order_field_name, SortType.Asc)) - else: - order_by_expr_list.append((order_field_name, SortType.Desc)) - - total_hits_count = 0 - # Scatter search tables and gather the results - column_name_list = [] - for indexName in index_names: - for memory_id in memory_ids: - table_name = f"{indexName}_{memory_id}" - try: - table_instance = db_instance.get_table(table_name) - except Exception: - continue - table_list.append(table_name) - if not column_name_list: - column_name_list = [r[0] for r in table_instance.show_columns().rows()] - output = self.convert_select_fields(output, column_name_list) - builder = table_instance.output(output) - if len(match_expressions) > 0: - for matchExpr in match_expressions: - if isinstance(matchExpr, MatchTextExpr): - fields = ",".join(matchExpr.fields) - builder = builder.match_text( - fields, - matchExpr.matching_text, - matchExpr.topn, - matchExpr.extra_options.copy(), - ) - elif isinstance(matchExpr, MatchDenseExpr): - builder = builder.match_dense( - matchExpr.vector_column_name, - matchExpr.embedding_data, - matchExpr.embedding_data_type, - matchExpr.distance_type, - matchExpr.topn, - matchExpr.extra_options.copy(), - ) - elif isinstance(matchExpr, FusionExpr): - builder = builder.fusion(matchExpr.method, matchExpr.topn, matchExpr.fusion_params) - else: - if filter_cond and len(filter_cond) > 0: - builder.filter(filter_cond) - if order_by.fields: - builder.sort(order_by_expr_list) - builder.offset(offset).limit(limit) - mem_res, extra_result = builder.option({"total_hits_count": True}).to_df() - if extra_result: - total_hits_count += int(extra_result["total_hits_count"]) - self.logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(mem_res)}") - df_list.append(mem_res) - self.connPool.release_conn(inf_conn) + continue + table_list.append(table_name) + if not column_name_list: + column_name_list = [r[0] for r in table_instance.show_columns().rows()] + output = self.convert_select_fields(output, column_name_list) + builder = table_instance.output(output) + if len(match_expressions) > 0: + for matchExpr in match_expressions: + if isinstance(matchExpr, MatchTextExpr): + fields = ",".join(matchExpr.fields) + builder = builder.match_text( + fields, + matchExpr.matching_text, + matchExpr.topn, + matchExpr.extra_options.copy(), + ) + elif isinstance(matchExpr, MatchDenseExpr): + builder = builder.match_dense( + matchExpr.vector_column_name, + matchExpr.embedding_data, + matchExpr.embedding_data_type, + matchExpr.distance_type, + matchExpr.topn, + matchExpr.extra_options.copy(), + ) + elif isinstance(matchExpr, FusionExpr): + builder = builder.fusion(matchExpr.method, matchExpr.topn, matchExpr.fusion_params) + else: + if filter_cond and len(filter_cond) > 0: + builder.filter(filter_cond) + if order_by.fields: + builder.sort(order_by_expr_list) + builder.offset(offset).limit(limit) + mem_res, extra_result = builder.option({"total_hits_count": True}).to_df() + if extra_result: + total_hits_count += int(extra_result["total_hits_count"]) + self.logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(mem_res)}") + df_list.append(mem_res) + finally: + self.connPool.release_conn(inf_conn) res = self.concat_dataframes(df_list, output) if match_expressions: res["_score"] = res[score_column] @@ -281,28 +283,30 @@ class InfinityConnection(InfinityConnectionBase): order_by.asc("forget_at_flt") # query inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - table_name = f"{index_name}_{memory_id}" - table_instance = db_instance.get_table(table_name) - column_name_list = [r[0] for r in table_instance.show_columns().rows()] - output_fields = [self.convert_message_field_to_infinity(f, column_name_list) for f in select_fields] - builder = table_instance.output(output_fields) - filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) - builder.filter(filter_cond) - order_by_expr_list = list() - if order_by.fields: - for order_field in order_by.fields: - order_field_name = self.convert_condition_and_order_field(order_field[0]) - if order_field[1] == 0: - order_by_expr_list.append((order_field_name, SortType.Asc)) - else: - order_by_expr_list.append((order_field_name, SortType.Desc)) - builder.sort(order_by_expr_list) - builder.offset(0).limit(limit) - mem_res, _ = builder.option({"total_hits_count": True}).to_df() - res = self.concat_dataframes(mem_res, output_fields) - res.head(limit) - self.connPool.release_conn(inf_conn) + try: + db_instance = inf_conn.get_database(self.dbName) + table_name = f"{index_name}_{memory_id}" + table_instance = db_instance.get_table(table_name) + column_name_list = [r[0] for r in table_instance.show_columns().rows()] + output_fields = [self.convert_message_field_to_infinity(f, column_name_list) for f in select_fields] + builder = table_instance.output(output_fields) + filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) + builder.filter(filter_cond) + order_by_expr_list = list() + if order_by.fields: + for order_field in order_by.fields: + order_field_name = self.convert_condition_and_order_field(order_field[0]) + if order_field[1] == 0: + order_by_expr_list.append((order_field_name, SortType.Asc)) + else: + order_by_expr_list.append((order_field_name, SortType.Desc)) + builder.sort(order_by_expr_list) + builder.offset(0).limit(limit) + mem_res, _ = builder.option({"total_hits_count": True}).to_df() + res = self.concat_dataframes(mem_res, output_fields) + res.head(limit) + finally: + self.connPool.release_conn(inf_conn) return res def get_missing_field_message(self, select_fields: list[str], index_name: str, memory_id: str, field_name: str, limit: int=512): @@ -311,48 +315,52 @@ class InfinityConnection(InfinityConnectionBase): order_by.asc("valid_at_flt") # query inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - table_name = f"{index_name}_{memory_id}" - table_instance = db_instance.get_table(table_name) - column_name_list = [r[0] for r in table_instance.show_columns().rows()] - output_fields = [self.convert_message_field_to_infinity(f, column_name_list) for f in select_fields] - builder = table_instance.output(output_fields) - filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) - builder.filter(filter_cond) - order_by_expr_list = list() - if order_by.fields: - for order_field in order_by.fields: - order_field_name = self.convert_condition_and_order_field(order_field[0]) - if order_field[1] == 0: - order_by_expr_list.append((order_field_name, SortType.Asc)) - else: - order_by_expr_list.append((order_field_name, SortType.Desc)) - builder.sort(order_by_expr_list) - builder.offset(0).limit(limit) - mem_res, _ = builder.option({"total_hits_count": True}).to_df() - res = self.concat_dataframes(mem_res, output_fields) - res.head(limit) - self.connPool.release_conn(inf_conn) + try: + db_instance = inf_conn.get_database(self.dbName) + table_name = f"{index_name}_{memory_id}" + table_instance = db_instance.get_table(table_name) + column_name_list = [r[0] for r in table_instance.show_columns().rows()] + output_fields = [self.convert_message_field_to_infinity(f, column_name_list) for f in select_fields] + builder = table_instance.output(output_fields) + filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) + builder.filter(filter_cond) + order_by_expr_list = list() + if order_by.fields: + for order_field in order_by.fields: + order_field_name = self.convert_condition_and_order_field(order_field[0]) + if order_field[1] == 0: + order_by_expr_list.append((order_field_name, SortType.Asc)) + else: + order_by_expr_list.append((order_field_name, SortType.Desc)) + builder.sort(order_by_expr_list) + builder.offset(0).limit(limit) + mem_res, _ = builder.option({"total_hits_count": True}).to_df() + res = self.concat_dataframes(mem_res, output_fields) + res.head(limit) + finally: + self.connPool.release_conn(inf_conn) return res def get(self, message_id: str, index_name: str, memory_ids: list[str]) -> dict | None: inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - df_list = list() - assert isinstance(memory_ids, list) - table_list = list() - for memoryId in memory_ids: - table_name = f"{index_name}_{memoryId}" - table_list.append(table_name) - try: - table_instance = db_instance.get_table(table_name) - except Exception: - self.logger.warning(f"Table not found: {table_name}, this memory isn't created in Infinity. Maybe it is created in other document engine.") - continue - mem_res, _ = table_instance.output(["*"]).filter(f"id = '{message_id}'").to_df() - self.logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(mem_res)}") - df_list.append(mem_res) - self.connPool.release_conn(inf_conn) + try: + db_instance = inf_conn.get_database(self.dbName) + df_list = list() + assert isinstance(memory_ids, list) + table_list = list() + for memoryId in memory_ids: + table_name = f"{index_name}_{memoryId}" + table_list.append(table_name) + try: + table_instance = db_instance.get_table(table_name) + except Exception: + self.logger.warning(f"Table not found: {table_name}, this memory isn't created in Infinity. Maybe it is created in other document engine.") + continue + mem_res, _ = table_instance.output(["*"]).filter(f"id = '{message_id}'").to_df() + self.logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(mem_res)}") + df_list.append(mem_res) + finally: + self.connPool.release_conn(inf_conn) res = self.concat_dataframes(df_list, ["id"]) fields = set(res.columns.tolist()) res_fields = self.get_fields(res, list(fields)) @@ -362,102 +370,106 @@ class InfinityConnection(InfinityConnectionBase): if not documents: return [] inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - table_name = f"{index_name}_{memory_id}" - vector_size = int(len(documents[0]["content_embed"])) try: - table_instance = db_instance.get_table(table_name) - except InfinityException as e: - # src/common/status.cppm, kTableNotExist = 3022 - if e.error_code != ErrorCode.TABLE_NOT_EXIST: - raise - if vector_size == 0: - raise ValueError("Cannot infer vector size from documents") - self.create_idx(index_name, memory_id, vector_size) - table_instance = db_instance.get_table(table_name) + db_instance = inf_conn.get_database(self.dbName) + table_name = f"{index_name}_{memory_id}" + vector_size = int(len(documents[0]["content_embed"])) + try: + table_instance = db_instance.get_table(table_name) + except InfinityException as e: + # src/common/status.cppm, kTableNotExist = 3022 + if e.error_code != ErrorCode.TABLE_NOT_EXIST: + raise + if vector_size == 0: + raise ValueError("Cannot infer vector size from documents") + self.create_idx(index_name, memory_id, vector_size) + table_instance = db_instance.get_table(table_name) - # embedding fields can't have a default value.... - embedding_columns = [] - table_columns = table_instance.show_columns().rows() - for n, ty, _, _ in table_columns: - r = re.search(r"Embedding\([a-z]+,([0-9]+)\)", ty) - if not r: - continue - embedding_columns.append((n, int(r.group(1)))) - - docs = copy.deepcopy(documents) - for d in docs: - assert "_id" not in d - assert "id" in d - for k, v in list(d.items()): - if k == "content_embed": - d[f"q_{vector_size}_vec"] = d["content_embed"] - d.pop("content_embed") + # embedding fields can't have a default value.... + embedding_columns = [] + table_columns = table_instance.show_columns().rows() + for n, ty, _, _ in table_columns: + r = re.search(r"Embedding\([a-z]+,([0-9]+)\)", ty) + if not r: continue - field_name = self.convert_message_field_to_infinity(k) - if field_name in ["valid_at", "invalid_at", "forget_at"]: - d[f"{field_name}_flt"] = date_string_to_timestamp(v) if v else 0 - if v is None: - d[field_name] = "" - elif self.field_keyword(k): - if isinstance(v, list): - d[k] = "###".join(v) + embedding_columns.append((n, int(r.group(1)))) + + docs = copy.deepcopy(documents) + for d in docs: + assert "_id" not in d + assert "id" in d + for k, v in list(d.items()): + if k == "content_embed": + d[f"q_{vector_size}_vec"] = d["content_embed"] + d.pop("content_embed") + continue + field_name = self.convert_message_field_to_infinity(k) + if field_name in ["valid_at", "invalid_at", "forget_at"]: + d[f"{field_name}_flt"] = date_string_to_timestamp(v) if v else 0 + if v is None: + d[field_name] = "" + elif self.field_keyword(k): + if isinstance(v, list): + d[k] = "###".join(v) + else: + d[k] = v + elif k == "memory_id": + if isinstance(d[k], list): + d[k] = d[k][0] # since d[k] is a list, but we need a str else: - d[k] = v - elif k == "memory_id": - if isinstance(d[k], list): - d[k] = d[k][0] # since d[k] is a list, but we need a str - else: - d[field_name] = v - if k != field_name: - d.pop(k) + d[field_name] = v + if k != field_name: + d.pop(k) - for n, vs in embedding_columns: - if n in d: - continue - d[n] = [0] * vs - ids = ["'{}'".format(d["id"]) for d in docs] - str_ids = ", ".join(ids) - str_filter = f"id IN ({str_ids})" - table_instance.delete(str_filter) - table_instance.insert(docs) - self.connPool.release_conn(inf_conn) + for n, vs in embedding_columns: + if n in d: + continue + d[n] = [0] * vs + ids = ["'{}'".format(d["id"]) for d in docs] + str_ids = ", ".join(ids) + str_filter = f"id IN ({str_ids})" + table_instance.delete(str_filter) + table_instance.insert(docs) + finally: + self.connPool.release_conn(inf_conn) self.logger.debug(f"INFINITY inserted into {table_name} {str_ids}.") return [] def update(self, condition: dict, new_value: dict, index_name: str, memory_id: str) -> bool: inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - table_name = f"{index_name}_{memory_id}" - table_instance = db_instance.get_table(table_name) + try: + db_instance = inf_conn.get_database(self.dbName) + table_name = f"{index_name}_{memory_id}" + table_instance = db_instance.get_table(table_name) - columns = {} - if table_instance: - for n, ty, de, _ in table_instance.show_columns().rows(): - columns[n] = (ty, de) - condition_dict = {self.convert_condition_and_order_field(k): v for k, v in condition.items()} - filter = self.equivalent_condition_to_str(condition_dict, table_instance) - update_dict = {self.convert_message_field_to_infinity(k): v for k, v in new_value.items()} - date_floats = {} - for k, v in update_dict.items(): - if k in ["valid_at", "invalid_at", "forget_at"]: - date_floats[f"{k}_flt"] = date_string_to_timestamp(v) if v else 0 - elif self.field_keyword(k): - if isinstance(v, list): - update_dict[k] = "###".join(v) + columns = {} + if table_instance: + for n, ty, de, _ in table_instance.show_columns().rows(): + columns[n] = (ty, de) + condition_dict = {self.convert_condition_and_order_field(k): v for k, v in condition.items()} + filter = self.equivalent_condition_to_str(condition_dict, table_instance) + update_dict = {self.convert_message_field_to_infinity(k): v for k, v in new_value.items()} + date_floats = {} + for k, v in update_dict.items(): + if k in ["valid_at", "invalid_at", "forget_at"]: + date_floats[f"{k}_flt"] = date_string_to_timestamp(v) if v else 0 + elif self.field_keyword(k): + if isinstance(v, list): + update_dict[k] = "###".join(v) + else: + update_dict[k] = v + elif k == "memory_id": + if isinstance(update_dict[k], list): + update_dict[k] = update_dict[k][0] # since d[k] is a list, but we need a str else: update_dict[k] = v - elif k == "memory_id": - if isinstance(update_dict[k], list): - update_dict[k] = update_dict[k][0] # since d[k] is a list, but we need a str - else: - update_dict[k] = v - if date_floats: - update_dict.update(date_floats) + if date_floats: + update_dict.update(date_floats) - self.logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {new_value}.") - table_instance.update(filter, update_dict) - self.connPool.release_conn(inf_conn) + self.logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {new_value}.") + table_instance.update(filter, update_dict) + finally: + self.connPool.release_conn(inf_conn) return True """ diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 59773052e..1976e1427 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -110,47 +110,123 @@ class InfinityConnection(InfinityConnectionBase): index_names = index_names.split(",") assert isinstance(index_names, list) and len(index_names) > 0 inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - df_list = list() - table_list = list() - output = select_fields.copy() - output = self.convert_select_fields(output) - if agg_fields is None: - agg_fields = [] - for essential_field in ["id"] + agg_fields: - if essential_field not in output: - output.append(essential_field) - score_func = "" - score_column = "" - for matchExpr in match_expressions: - if isinstance(matchExpr, MatchTextExpr): - score_func = "score()" - score_column = "SCORE" - break - if not score_func: + try: + db_instance = inf_conn.get_database(self.dbName) + df_list = list() + table_list = list() + output = select_fields.copy() + output = self.convert_select_fields(output) + if agg_fields is None: + agg_fields = [] + for essential_field in ["id"] + agg_fields: + if essential_field not in output: + output.append(essential_field) + score_func = "" + score_column = "" for matchExpr in match_expressions: - if isinstance(matchExpr, MatchDenseExpr): - score_func = "similarity()" - score_column = "SIMILARITY" + if isinstance(matchExpr, MatchTextExpr): + score_func = "score()" + score_column = "SCORE" break - if match_expressions: - if score_func and score_func not in output: - output.append(score_func) - if PAGERANK_FLD not in output: - output.append(PAGERANK_FLD) - output = [f for f in output if f and f != "_score"] - if limit <= 0: - # ElasticSearch default limit is 10000 - limit = 10000 + if not score_func: + for matchExpr in match_expressions: + if isinstance(matchExpr, MatchDenseExpr): + score_func = "similarity()" + score_column = "SIMILARITY" + break + if match_expressions: + if score_func and score_func not in output: + output.append(score_func) + if PAGERANK_FLD not in output: + output.append(PAGERANK_FLD) + output = [f for f in output if f and f != "_score"] + if limit <= 0: + # ElasticSearch default limit is 10000 + limit = 10000 - # Prepare expressions common to all tables - filter_cond = None - filter_fulltext = "" - if condition: - # Remove kb_id filter for Infinity (it uses table separation instead) - condition = {k: v for k, v in condition.items() if k != "kb_id"} + # Prepare expressions common to all tables + filter_cond = None + filter_fulltext = "" + if condition: + # Remove kb_id filter for Infinity (it uses table separation instead) + condition = {k: v for k, v in condition.items() if k != "kb_id"} - table_found = False + table_found = False + for indexName in index_names: + if indexName.startswith("ragflow_doc_meta_"): + table_names_to_search = [indexName] + else: + table_names_to_search = [f"{indexName}_{kb_id}" for kb_id in knowledgebase_ids] + for table_name in table_names_to_search: + try: + filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) + table_found = True + break + except Exception: + pass + if table_found: + break + if not table_found: + self.logger.error( + f"No valid tables found for indexNames {index_names} and knowledgebaseIds {knowledgebase_ids}") + return pd.DataFrame(), 0 + + for matchExpr in match_expressions: + if isinstance(matchExpr, MatchTextExpr): + if filter_cond and "filter" not in matchExpr.extra_options: + matchExpr.extra_options.update({"filter": filter_cond}) + matchExpr.fields = [self.convert_matching_field(field) for field in matchExpr.fields] + fields = ",".join(matchExpr.fields) + filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')" + if filter_cond: + filter_fulltext = f"({filter_cond}) AND {filter_fulltext}" + minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0) + if isinstance(minimum_should_match, float): + str_minimum_should_match = str(int(minimum_should_match * 100)) + "%" + matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match + + # Add rank_feature support + if rank_feature and "rank_features" not in matchExpr.extra_options: + # Convert rank_feature dict to Infinity's rank_features string format + # Format: "field^feature_name^weight,field^feature_name^weight" + rank_features_list = [] + for feature_name, weight in rank_feature.items(): + # Use TAG_FLD as the field containing rank features + rank_features_list.append(f"{TAG_FLD}^{feature_name}^{weight}") + if rank_features_list: + matchExpr.extra_options["rank_features"] = ",".join(rank_features_list) + + for k, v in matchExpr.extra_options.items(): + if not isinstance(v, str): + matchExpr.extra_options[k] = str(v) + self.logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}") + elif isinstance(matchExpr, MatchDenseExpr): + if filter_fulltext and "filter" not in matchExpr.extra_options: + matchExpr.extra_options.update({"filter": filter_fulltext}) + for k, v in matchExpr.extra_options.items(): + if not isinstance(v, str): + matchExpr.extra_options[k] = str(v) + similarity = matchExpr.extra_options.get("similarity") + if similarity: + matchExpr.extra_options["threshold"] = similarity + del matchExpr.extra_options["similarity"] + self.logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}") + elif isinstance(matchExpr, FusionExpr): + if matchExpr.method == "weighted_sum": + # The default is "minmax" which gives a zero score for the last doc. + matchExpr.fusion_params["normalize"] = "atan" + self.logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}") + + order_by_expr_list = list() + if order_by.fields: + for order_field in order_by.fields: + if order_field[1] == 0: + order_by_expr_list.append((order_field[0], SortType.Asc)) + else: + order_by_expr_list.append((order_field[0], SortType.Desc)) + + total_hits_count = 0 + # Scatter search tables and gather the results for indexName in index_names: if indexName.startswith("ragflow_doc_meta_"): table_names_to_search = [indexName] @@ -158,149 +234,77 @@ class InfinityConnection(InfinityConnectionBase): table_names_to_search = [f"{indexName}_{kb_id}" for kb_id in knowledgebase_ids] for table_name in table_names_to_search: try: - filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) - table_found = True - break + table_instance = db_instance.get_table(table_name) except Exception: - pass - if table_found: - break - if not table_found: - self.logger.error( - f"No valid tables found for indexNames {index_names} and knowledgebaseIds {knowledgebase_ids}") - return pd.DataFrame(), 0 - - for matchExpr in match_expressions: - if isinstance(matchExpr, MatchTextExpr): - if filter_cond and "filter" not in matchExpr.extra_options: - matchExpr.extra_options.update({"filter": filter_cond}) - matchExpr.fields = [self.convert_matching_field(field) for field in matchExpr.fields] - fields = ",".join(matchExpr.fields) - filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')" - if filter_cond: - filter_fulltext = f"({filter_cond}) AND {filter_fulltext}" - minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0) - if isinstance(minimum_should_match, float): - str_minimum_should_match = str(int(minimum_should_match * 100)) + "%" - matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match - - # Add rank_feature support - if rank_feature and "rank_features" not in matchExpr.extra_options: - # Convert rank_feature dict to Infinity's rank_features string format - # Format: "field^feature_name^weight,field^feature_name^weight" - rank_features_list = [] - for feature_name, weight in rank_feature.items(): - # Use TAG_FLD as the field containing rank features - rank_features_list.append(f"{TAG_FLD}^{feature_name}^{weight}") - if rank_features_list: - matchExpr.extra_options["rank_features"] = ",".join(rank_features_list) - - for k, v in matchExpr.extra_options.items(): - if not isinstance(v, str): - matchExpr.extra_options[k] = str(v) - self.logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}") - elif isinstance(matchExpr, MatchDenseExpr): - if filter_fulltext and "filter" not in matchExpr.extra_options: - matchExpr.extra_options.update({"filter": filter_fulltext}) - for k, v in matchExpr.extra_options.items(): - if not isinstance(v, str): - matchExpr.extra_options[k] = str(v) - similarity = matchExpr.extra_options.get("similarity") - if similarity: - matchExpr.extra_options["threshold"] = similarity - del matchExpr.extra_options["similarity"] - self.logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}") - elif isinstance(matchExpr, FusionExpr): - if matchExpr.method == "weighted_sum": - # The default is "minmax" which gives a zero score for the last doc. - matchExpr.fusion_params["normalize"] = "atan" - self.logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}") - - order_by_expr_list = list() - if order_by.fields: - for order_field in order_by.fields: - if order_field[1] == 0: - order_by_expr_list.append((order_field[0], SortType.Asc)) - else: - order_by_expr_list.append((order_field[0], SortType.Desc)) - - total_hits_count = 0 - # Scatter search tables and gather the results - for indexName in index_names: - if indexName.startswith("ragflow_doc_meta_"): - table_names_to_search = [indexName] - else: - table_names_to_search = [f"{indexName}_{kb_id}" for kb_id in knowledgebase_ids] - for table_name in table_names_to_search: - try: - table_instance = db_instance.get_table(table_name) - except Exception: - continue - table_list.append(table_name) - builder = table_instance.output(output) - if len(match_expressions) > 0: - for matchExpr in match_expressions: - if isinstance(matchExpr, MatchTextExpr): - fields = ",".join(matchExpr.fields) - builder = builder.match_text( - fields, - matchExpr.matching_text, - matchExpr.topn, - matchExpr.extra_options.copy(), - ) - elif isinstance(matchExpr, MatchDenseExpr): - builder = builder.match_dense( - matchExpr.vector_column_name, - matchExpr.embedding_data, - matchExpr.embedding_data_type, - matchExpr.distance_type, - matchExpr.topn, - matchExpr.extra_options.copy(), - ) - elif isinstance(matchExpr, FusionExpr): - builder = builder.fusion(matchExpr.method, matchExpr.topn, matchExpr.fusion_params) - else: - if filter_cond and len(filter_cond) > 0: - builder.filter(filter_cond) - if order_by.fields: - builder.sort(order_by_expr_list) - builder.offset(offset).limit(limit) - kb_res, extra_result = builder.option({"total_hits_count": True}).to_df() - if extra_result: - total_hits_count += int(extra_result["total_hits_count"]) - self.logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}") - df_list.append(kb_res) - self.connPool.release_conn(inf_conn) - res = self.concat_dataframes(df_list, output) - if match_expressions and score_column: - res["_score"] = res[score_column] + res[PAGERANK_FLD] - res = res.sort_values(by="_score", ascending=False).reset_index(drop=True) - res = res.head(limit) - self.logger.debug(f"INFINITY search final result: {str(res)}") - return res, total_hits_count + continue + table_list.append(table_name) + builder = table_instance.output(output) + if len(match_expressions) > 0: + for matchExpr in match_expressions: + if isinstance(matchExpr, MatchTextExpr): + fields = ",".join(matchExpr.fields) + builder = builder.match_text( + fields, + matchExpr.matching_text, + matchExpr.topn, + matchExpr.extra_options.copy(), + ) + elif isinstance(matchExpr, MatchDenseExpr): + builder = builder.match_dense( + matchExpr.vector_column_name, + matchExpr.embedding_data, + matchExpr.embedding_data_type, + matchExpr.distance_type, + matchExpr.topn, + matchExpr.extra_options.copy(), + ) + elif isinstance(matchExpr, FusionExpr): + builder = builder.fusion(matchExpr.method, matchExpr.topn, matchExpr.fusion_params) + else: + if filter_cond and len(filter_cond) > 0: + builder.filter(filter_cond) + if order_by.fields: + builder.sort(order_by_expr_list) + builder.offset(offset).limit(limit) + kb_res, extra_result = builder.option({"total_hits_count": True}).to_df() + if extra_result: + total_hits_count += int(extra_result["total_hits_count"]) + self.logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}") + df_list.append(kb_res) + res = self.concat_dataframes(df_list, output) + if match_expressions and score_column: + res["_score"] = res[score_column] + res[PAGERANK_FLD] + res = res.sort_values(by="_score", ascending=False).reset_index(drop=True) + res = res.head(limit) + self.logger.debug(f"INFINITY search final result: {str(res)}") + return res, total_hits_count + finally: + self.connPool.release_conn(inf_conn) def get(self, chunk_id: str, index_name: str, knowledgebase_ids: list[str]) -> dict | None: inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - df_list = list() - assert isinstance(knowledgebase_ids, list) - table_list = list() - if index_name.startswith("ragflow_doc_meta_"): - table_names_to_search = [index_name] - else: - table_names_to_search = [f"{index_name}_{kb_id}" for kb_id in knowledgebase_ids] - for table_name in table_names_to_search: - table_list.append(table_name) - try: - table_instance = db_instance.get_table(table_name) - except Exception: - self.logger.warning( - f"Table not found: {table_name}, this dataset isn't created in Infinity. Maybe it is created in other document engine.") - continue - kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunk_id}'").to_df() - self.logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}") - df_list.append(kb_res) - self.connPool.release_conn(inf_conn) + try: + db_instance = inf_conn.get_database(self.dbName) + df_list = list() + assert isinstance(knowledgebase_ids, list) + table_list = list() + if index_name.startswith("ragflow_doc_meta_"): + table_names_to_search = [index_name] + else: + table_names_to_search = [f"{index_name}_{kb_id}" for kb_id in knowledgebase_ids] + for table_name in table_names_to_search: + table_list.append(table_name) + try: + table_instance = db_instance.get_table(table_name) + except Exception: + self.logger.warning( + f"Table not found: {table_name}, this dataset isn't created in Infinity. Maybe it is created in other document engine.") + continue + kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunk_id}'").to_df() + self.logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}") + df_list.append(kb_res) + finally: + self.connPool.release_conn(inf_conn) res = self.concat_dataframes(df_list, ["id"]) fields = set(res.columns.tolist()) for field in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "question_kwd", @@ -312,140 +316,142 @@ class InfinityConnection(InfinityConnectionBase): def insert(self, documents: list[dict], index_name: str, knowledgebase_id: str = None) -> list[str]: inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - if index_name.startswith("ragflow_doc_meta_"): - table_name = index_name - else: - table_name = f"{index_name}_{knowledgebase_id}" try: - table_instance = db_instance.get_table(table_name) - except InfinityException as e: - # src/common/status.cppm, kTableNotExist = 3022 - if e.error_code != ErrorCode.TABLE_NOT_EXIST: - raise - vector_size = 0 - patt = re.compile(r"q_(?P\d+)_vec") - for k in documents[0].keys(): - m = patt.match(k) - if m: - vector_size = int(m.group("vector_size")) - break - if vector_size == 0: - raise ValueError("Cannot infer vector size from documents") + db_instance = inf_conn.get_database(self.dbName) + if index_name.startswith("ragflow_doc_meta_"): + table_name = index_name + else: + table_name = f"{index_name}_{knowledgebase_id}" + try: + table_instance = db_instance.get_table(table_name) + except InfinityException as e: + # src/common/status.cppm, kTableNotExist = 3022 + if e.error_code != ErrorCode.TABLE_NOT_EXIST: + raise + vector_size = 0 + patt = re.compile(r"q_(?P\d+)_vec") + for k in documents[0].keys(): + m = patt.match(k) + if m: + vector_size = int(m.group("vector_size")) + break + if vector_size == 0: + raise ValueError("Cannot infer vector size from documents") - # Determine parser_id from document structure - # Table parser documents have 'chunk_data' field - parser_id = None - if "chunk_data" in documents[0] and isinstance(documents[0].get("chunk_data"), dict): - from common.constants import ParserType - parser_id = ParserType.TABLE.value - self.logger.debug("Detected TABLE parser from document structure") + # Determine parser_id from document structure + # Table parser documents have 'chunk_data' field + parser_id = None + if "chunk_data" in documents[0] and isinstance(documents[0].get("chunk_data"), dict): + from common.constants import ParserType + parser_id = ParserType.TABLE.value + self.logger.debug("Detected TABLE parser from document structure") - # Fallback: Create table with base schema (shouldn't normally happen as init_kb() creates it) - self.logger.debug(f"Fallback: Creating table {table_name} with base schema, parser_id: {parser_id}") - self.create_idx(index_name, knowledgebase_id, vector_size, parser_id) - table_instance = db_instance.get_table(table_name) + # Fallback: Create table with base schema (shouldn't normally happen as init_kb() creates it) + self.logger.debug(f"Fallback: Creating table {table_name} with base schema, parser_id: {parser_id}") + self.create_idx(index_name, knowledgebase_id, vector_size, parser_id) + table_instance = db_instance.get_table(table_name) - # embedding fields can't have a default value.... - embedding_clmns = [] - clmns = table_instance.show_columns().rows() - for n, ty, _, _ in clmns: - r = re.search(r"Embedding\([a-z]+,([0-9]+)\)", ty) - if not r: - continue - embedding_clmns.append((n, int(r.group(1)))) - - docs = copy.deepcopy(documents) - for d in docs: - assert "_id" not in d - assert "id" in d - for k, v in list(d.items()): - if k == "docnm_kwd": - d["docnm"] = v - elif k == "title_kwd": - if not d.get("docnm_kwd"): - d["docnm"] = self.list2str(v) - elif k == "title_sm_tks": - if not d.get("docnm_kwd"): - d["docnm"] = self.list2str(v) - elif k == "important_kwd": - if isinstance(v, list): - empty_count = sum(1 for kw in v if kw == "") - tokens = [kw for kw in v if kw != ""] - d["important_keywords"] = self.list2str(tokens, ",") - d["important_kwd_empty_count"] = empty_count - else: - d["important_keywords"] = self.list2str(v, ",") - elif k == "important_tks": - if not d.get("important_kwd"): - d["important_keywords"] = v - elif k == "content_with_weight": - d["content"] = v - elif k == "content_ltks": - if not d.get("content_with_weight"): - d["content"] = v - elif k == "content_sm_ltks": - if not d.get("content_with_weight"): - d["content"] = v - elif k == "authors_tks": - d["authors"] = v - elif k == "authors_sm_tks": - if not d.get("authors_tks"): - d["authors"] = v - elif k == "question_kwd": - d["questions"] = self.list2str(v, "\n") - elif k == "question_tks": - if not d.get("question_kwd"): - d["questions"] = self.list2str(v) - elif self.field_keyword(k): - if isinstance(v, list): - d[k] = "###".join(v) - else: - d[k] = v - elif re.search(r"_feas$", k): - d[k] = json.dumps(v) - elif k == "chunk_data": - # Convert data dict to JSON string for storage - if isinstance(v, dict): - d[k] = json.dumps(v) - else: - d[k] = v - elif k == "kb_id": - if isinstance(d[k], list): - d[k] = d[k][0] # since d[k] is a list, but we need a str - elif k == "position_int": - assert isinstance(v, list) - arr = [num for row in v for num in row] - d[k] = "_".join(f"{num:08x}" for num in arr) - elif k in ["page_num_int", "top_int"]: - assert isinstance(v, list) - d[k] = "_".join(f"{num:08x}" for num in v) - elif k == "meta_fields": - if isinstance(v, dict): - d[k] = json.dumps(v, ensure_ascii=False) - else: - d[k] = v if v else "{}" - else: - d[k] = v - for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", - "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", - "question_tks"]: - if k in d: - del d[k] - - for n, vs in embedding_clmns: - if n in d: + # embedding fields can't have a default value.... + embedding_clmns = [] + clmns = table_instance.show_columns().rows() + for n, ty, _, _ in clmns: + r = re.search(r"Embedding\([a-z]+,([0-9]+)\)", ty) + if not r: continue - d[n] = [0] * vs - ids = ["'{}'".format(d["id"]) for d in docs] - str_ids = ", ".join(ids) - str_filter = f"id IN ({str_ids})" - table_instance.delete(str_filter) - # for doc in documents: - # logger.info(f"insert position_int: {doc['position_int']}") - # logger.info(f"InfinityConnection.insert {json.dumps(documents)}") - table_instance.insert(docs) - self.connPool.release_conn(inf_conn) + embedding_clmns.append((n, int(r.group(1)))) + + docs = copy.deepcopy(documents) + for d in docs: + assert "_id" not in d + assert "id" in d + for k, v in list(d.items()): + if k == "docnm_kwd": + d["docnm"] = v + elif k == "title_kwd": + if not d.get("docnm_kwd"): + d["docnm"] = self.list2str(v) + elif k == "title_sm_tks": + if not d.get("docnm_kwd"): + d["docnm"] = self.list2str(v) + elif k == "important_kwd": + if isinstance(v, list): + empty_count = sum(1 for kw in v if kw == "") + tokens = [kw for kw in v if kw != ""] + d["important_keywords"] = self.list2str(tokens, ",") + d["important_kwd_empty_count"] = empty_count + else: + d["important_keywords"] = self.list2str(v, ",") + elif k == "important_tks": + if not d.get("important_kwd"): + d["important_keywords"] = v + elif k == "content_with_weight": + d["content"] = v + elif k == "content_ltks": + if not d.get("content_with_weight"): + d["content"] = v + elif k == "content_sm_ltks": + if not d.get("content_with_weight"): + d["content"] = v + elif k == "authors_tks": + d["authors"] = v + elif k == "authors_sm_tks": + if not d.get("authors_tks"): + d["authors"] = v + elif k == "question_kwd": + d["questions"] = self.list2str(v, "\n") + elif k == "question_tks": + if not d.get("question_kwd"): + d["questions"] = self.list2str(v) + elif self.field_keyword(k): + if isinstance(v, list): + d[k] = "###".join(v) + else: + d[k] = v + elif re.search(r"_feas$", k): + d[k] = json.dumps(v) + elif k == "chunk_data": + # Convert data dict to JSON string for storage + if isinstance(v, dict): + d[k] = json.dumps(v) + else: + d[k] = v + elif k == "kb_id": + if isinstance(d[k], list): + d[k] = d[k][0] # since d[k] is a list, but we need a str + elif k == "position_int": + assert isinstance(v, list) + arr = [num for row in v for num in row] + d[k] = "_".join(f"{num:08x}" for num in arr) + elif k in ["page_num_int", "top_int"]: + assert isinstance(v, list) + d[k] = "_".join(f"{num:08x}" for num in v) + elif k == "meta_fields": + if isinstance(v, dict): + d[k] = json.dumps(v, ensure_ascii=False) + else: + d[k] = v if v else "{}" + else: + d[k] = v + for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", + "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", + "question_tks"]: + if k in d: + del d[k] + + for n, vs in embedding_clmns: + if n in d: + continue + d[n] = [0] * vs + ids = ["'{}'".format(d["id"]) for d in docs] + str_ids = ", ".join(ids) + str_filter = f"id IN ({str_ids})" + table_instance.delete(str_filter) + # for doc in documents: + # logger.info(f"insert position_int: {doc['position_int']}") + # logger.info(f"InfinityConnection.insert {json.dumps(documents)}") + table_instance.insert(docs) + finally: + self.connPool.release_conn(inf_conn) self.logger.debug(f"INFINITY inserted into {table_name} {str_ids}.") return [] @@ -453,120 +459,122 @@ class InfinityConnection(InfinityConnectionBase): # if 'position_int' in newValue: # logger.info(f"update position_int: {newValue['position_int']}") inf_conn = self.connPool.get_conn() - db_instance = inf_conn.get_database(self.dbName) - if index_name.startswith("ragflow_doc_meta_"): - table_name = index_name - else: - table_name = f"{index_name}_{knowledgebase_id}" - table_instance = db_instance.get_table(table_name) - # if "exists" in condition: - # del condition["exists"] + try: + db_instance = inf_conn.get_database(self.dbName) + if index_name.startswith("ragflow_doc_meta_"): + table_name = index_name + else: + table_name = f"{index_name}_{knowledgebase_id}" + table_instance = db_instance.get_table(table_name) + # if "exists" in condition: + # del condition["exists"] - clmns = {} - if table_instance: - for n, ty, de, _ in table_instance.show_columns().rows(): - clmns[n] = (ty, de) - filter = self.equivalent_condition_to_str(condition, table_instance) - removeValue = {} - for k, v in list(new_value.items()): - if k == "docnm_kwd": - new_value["docnm"] = self.list2str(v) - elif k == "title_kwd": - if not new_value.get("docnm_kwd"): + clmns = {} + if table_instance: + for n, ty, de, _ in table_instance.show_columns().rows(): + clmns[n] = (ty, de) + filter = self.equivalent_condition_to_str(condition, table_instance) + removeValue = {} + for k, v in list(new_value.items()): + if k == "docnm_kwd": new_value["docnm"] = self.list2str(v) - elif k == "title_sm_tks": - if not new_value.get("docnm_kwd"): - new_value["docnm"] = v - elif k == "important_kwd": - if isinstance(v, list): - empty_count = sum(1 for kw in v if kw == "") - tokens = [kw for kw in v if kw != ""] - new_value["important_keywords"] = self.list2str(tokens, ",") - new_value["important_kwd_empty_count"] = empty_count - else: - new_value["important_keywords"] = self.list2str(v, ",") - elif k == "important_tks": - if not new_value.get("important_kwd"): - new_value["important_keywords"] = v - elif k == "content_with_weight": - new_value["content"] = v - elif k == "content_ltks": - if not new_value.get("content_with_weight"): + elif k == "title_kwd": + if not new_value.get("docnm_kwd"): + new_value["docnm"] = self.list2str(v) + elif k == "title_sm_tks": + if not new_value.get("docnm_kwd"): + new_value["docnm"] = v + elif k == "important_kwd": + if isinstance(v, list): + empty_count = sum(1 for kw in v if kw == "") + tokens = [kw for kw in v if kw != ""] + new_value["important_keywords"] = self.list2str(tokens, ",") + new_value["important_kwd_empty_count"] = empty_count + else: + new_value["important_keywords"] = self.list2str(v, ",") + elif k == "important_tks": + if not new_value.get("important_kwd"): + new_value["important_keywords"] = v + elif k == "content_with_weight": new_value["content"] = v - elif k == "content_sm_ltks": - if not new_value.get("content_with_weight"): - new_value["content"] = v - elif k == "authors_tks": - new_value["authors"] = v - elif k == "authors_sm_tks": - if not new_value.get("authors_tks"): + elif k == "content_ltks": + if not new_value.get("content_with_weight"): + new_value["content"] = v + elif k == "content_sm_ltks": + if not new_value.get("content_with_weight"): + new_value["content"] = v + elif k == "authors_tks": new_value["authors"] = v - elif k == "question_kwd": - new_value["questions"] = "\n".join(v) - elif k == "question_tks": - if not new_value.get("question_kwd"): - new_value["questions"] = self.list2str(v) - elif self.field_keyword(k): - if isinstance(v, list): - new_value[k] = "###".join(v) + elif k == "authors_sm_tks": + if not new_value.get("authors_tks"): + new_value["authors"] = v + elif k == "question_kwd": + new_value["questions"] = "\n".join(v) + elif k == "question_tks": + if not new_value.get("question_kwd"): + new_value["questions"] = self.list2str(v) + elif self.field_keyword(k): + if isinstance(v, list): + new_value[k] = "###".join(v) + else: + new_value[k] = v + elif re.search(r"_feas$", k): + new_value[k] = json.dumps(v) + elif k == "kb_id": + if isinstance(new_value[k], list): + new_value[k] = new_value[k][0] # since d[k] is a list, but we need a str + elif k == "position_int": + assert isinstance(v, list) + arr = [num for row in v for num in row] + new_value[k] = "_".join(f"{num:08x}" for num in arr) + elif k in ["page_num_int", "top_int"]: + assert isinstance(v, list) + new_value[k] = "_".join(f"{num:08x}" for num in v) + elif k == "remove": + if isinstance(v, str): + assert v in clmns, f"'{v}' should be in '{clmns}'." + ty, de = clmns[v] + if ty.lower().find("cha"): + if not de: + de = "" + new_value[v] = de + else: + for kk, vv in v.items(): + removeValue[kk] = vv + del new_value[k] else: new_value[k] = v - elif re.search(r"_feas$", k): - new_value[k] = json.dumps(v) - elif k == "kb_id": - if isinstance(new_value[k], list): - new_value[k] = new_value[k][0] # since d[k] is a list, but we need a str - elif k == "position_int": - assert isinstance(v, list) - arr = [num for row in v for num in row] - new_value[k] = "_".join(f"{num:08x}" for num in arr) - elif k in ["page_num_int", "top_int"]: - assert isinstance(v, list) - new_value[k] = "_".join(f"{num:08x}" for num in v) - elif k == "remove": - if isinstance(v, str): - assert v in clmns, f"'{v}' should be in '{clmns}'." - ty, de = clmns[v] - if ty.lower().find("cha"): - if not de: - de = "" - new_value[v] = de - else: - for kk, vv in v.items(): - removeValue[kk] = vv + for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", + "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", "question_tks"]: + if k in new_value: del new_value[k] - else: - new_value[k] = v - for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", - "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", "question_kwd", "question_tks"]: - if k in new_value: - del new_value[k] - remove_opt = {} # "[k,new_value]": [id_to_update, ...] - if removeValue: - col_to_remove = list(removeValue.keys()) - row_to_opt = table_instance.output(col_to_remove + ["id"]).filter(filter).to_df() - self.logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}") - row_to_opt = self.get_fields(row_to_opt, col_to_remove) - for id, old_v in row_to_opt.items(): - for k, remove_v in removeValue.items(): - if remove_v in old_v[k]: - new_v = old_v[k].copy() - new_v.remove(remove_v) - kv_key = json.dumps([k, new_v]) - if kv_key not in remove_opt: - remove_opt[kv_key] = [id] - else: - remove_opt[kv_key].append(id) + remove_opt = {} # "[k,new_value]": [id_to_update, ...] + if removeValue: + col_to_remove = list(removeValue.keys()) + row_to_opt = table_instance.output(col_to_remove + ["id"]).filter(filter).to_df() + self.logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}") + row_to_opt = self.get_fields(row_to_opt, col_to_remove) + for id, old_v in row_to_opt.items(): + for k, remove_v in removeValue.items(): + if remove_v in old_v[k]: + new_v = old_v[k].copy() + new_v.remove(remove_v) + kv_key = json.dumps([k, new_v]) + if kv_key not in remove_opt: + remove_opt[kv_key] = [id] + else: + remove_opt[kv_key].append(id) - self.logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {new_value}.") - for update_kv, ids in remove_opt.items(): - k, v = json.loads(update_kv) - table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), - {k: "###".join(v)}) + self.logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {new_value}.") + for update_kv, ids in remove_opt.items(): + k, v = json.loads(update_kv) + table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), + {k: "###".join(v)}) - table_instance.update(filter, new_value) - self.connPool.release_conn(inf_conn) + table_instance.update(filter, new_value) + finally: + self.connPool.release_conn(inf_conn) return True """ diff --git a/sdk/python/ragflow_sdk/modules/agent.py b/sdk/python/ragflow_sdk/modules/agent.py index 42b97a88e..5e67f40d9 100644 --- a/sdk/python/ragflow_sdk/modules/agent.py +++ b/sdk/python/ragflow_sdk/modules/agent.py @@ -87,8 +87,11 @@ class Agent(Base): return result_list raise Exception(res.get("message")) - def delete_sessions(self, ids: list[str] | None = None): - res = self.rm(f"/agents/{self.id}/sessions", {"ids": ids}) + def delete_sessions(self, ids: list[str] | None = None, delete_all: bool = False): + payload = {"ids": ids} + if delete_all: + payload["delete_all"] = True + res = self.rm(f"/agents/{self.id}/sessions", payload) res = res.json() if res.get("code") != 0: - raise Exception(res.get("message")) \ No newline at end of file + raise Exception(res.get("message")) diff --git a/sdk/python/ragflow_sdk/modules/chat.py b/sdk/python/ragflow_sdk/modules/chat.py index 474fa54b8..7b5c94725 100644 --- a/sdk/python/ragflow_sdk/modules/chat.py +++ b/sdk/python/ragflow_sdk/modules/chat.py @@ -88,8 +88,8 @@ class Chat(Base): return result_list raise Exception(res["message"]) - def delete_sessions(self, ids: list[str] | None = None): - res = self.rm(f"/chats/{self.id}/sessions", {"ids": ids}) + def delete_sessions(self, ids: list[str] | None = None, delete_all: bool = False): + res = self.rm(f"/chats/{self.id}/sessions", {"ids": ids, "delete_all": delete_all}) res = res.json() if res.get("code") != 0: raise Exception(res.get("message")) diff --git a/sdk/python/ragflow_sdk/modules/dataset.py b/sdk/python/ragflow_sdk/modules/dataset.py index d2d689da3..b686dceec 100644 --- a/sdk/python/ragflow_sdk/modules/dataset.py +++ b/sdk/python/ragflow_sdk/modules/dataset.py @@ -95,8 +95,8 @@ class DataSet(Base): return documents raise Exception(res["message"]) - def delete_documents(self, ids: list[str] | None = None): - res = self.rm(f"/datasets/{self.id}/documents", {"ids": ids}) + def delete_documents(self, ids: list[str] | None = None, delete_all: bool = False): + res = self.rm(f"/datasets/{self.id}/documents", {"ids": ids, "delete_all": delete_all}) res = res.json() if res.get("code") != 0: raise Exception(res["message"]) diff --git a/sdk/python/ragflow_sdk/modules/document.py b/sdk/python/ragflow_sdk/modules/document.py index c96698079..ebbf553c8 100644 --- a/sdk/python/ragflow_sdk/modules/document.py +++ b/sdk/python/ragflow_sdk/modules/document.py @@ -94,8 +94,8 @@ class Document(Base): return Chunk(self.rag, res["data"].get("chunk")) raise Exception(res.get("message")) - def delete_chunks(self, ids: list[str] | None = None): - res = self.rm(f"/datasets/{self.dataset_id}/documents/{self.id}/chunks", {"chunk_ids": ids}) + def delete_chunks(self, ids: list[str] | None = None, delete_all: bool = False): + res = self.rm(f"/datasets/{self.dataset_id}/documents/{self.id}/chunks", {"chunk_ids": ids, "delete_all": delete_all}) res = res.json() if res.get("code") != 0: raise Exception(res.get("message")) diff --git a/sdk/python/ragflow_sdk/ragflow.py b/sdk/python/ragflow_sdk/ragflow.py index 764bf8d7e..ff4f423c4 100644 --- a/sdk/python/ragflow_sdk/ragflow.py +++ b/sdk/python/ragflow_sdk/ragflow.py @@ -79,8 +79,8 @@ class RAGFlow: return DataSet(self, res["data"]) raise Exception(res["message"]) - def delete_datasets(self, ids: list[str] | None = None): - res = self.delete("/datasets", {"ids": ids}) + def delete_datasets(self, ids: list[str] | None = None, delete_all: bool = False): + res = self.delete("/datasets", {"ids": ids, "delete_all": delete_all}) res = res.json() if res.get("code") != 0: raise Exception(res["message"]) @@ -185,8 +185,8 @@ class RAGFlow: return Chat(self, res["data"]) raise Exception(res["message"]) - def delete_chats(self, ids: list[str] | None = None): - res = self.delete("/chats", {"ids": ids}) + def delete_chats(self, ids: list[str] | None = None, delete_all: bool = False): + res = self.delete("/chats", {"ids": ids, "delete_all": delete_all}) res = res.json() if res.get("code") != 0: raise Exception(res["message"]) diff --git a/test/testcases/test_http_api/common.py b/test/testcases/test_http_api/common.py index 592d35c3c..2678879f9 100644 --- a/test/testcases/test_http_api/common.py +++ b/test/testcases/test_http_api/common.py @@ -59,20 +59,7 @@ def delete_datasets(auth, payload=None, *, headers=HEADERS, data=None): def delete_all_datasets(auth, *, page_size=1000): - # Dataset DELETE now treats null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - dataset_ids = [] - while True: - res = list_datasets(auth, {"page": page, "page_size": page_size}) - data = res.get("data") or [] - dataset_ids.extend(dataset["id"] for dataset in data) - if len(data) < page_size: - break - page += 1 - - if not dataset_ids: - return {"code": 0, "message": ""} - return delete_datasets(auth, {"ids": dataset_ids}) + return delete_datasets(auth, {"ids": None, "delete_all": True}) def batch_create_datasets(auth, num): @@ -146,20 +133,7 @@ def delete_documents(auth, dataset_id, payload=None): def delete_all_documents(auth, dataset_id, *, page_size=1000): - # Document DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - document_ids = [] - while True: - res = list_documents(auth, dataset_id, {"page": page, "page_size": page_size}) - docs = (res.get("data") or {}).get("docs") or [] - document_ids.extend(doc["id"] for doc in docs) - if len(docs) < page_size: - break - page += 1 - - if not document_ids: - return {"code": 0, "message": ""} - return delete_documents(auth, dataset_id, {"ids": document_ids}) + return delete_documents(auth, dataset_id, {"ids": None, "delete_all": True}) def parse_documents(auth, dataset_id, payload=None): @@ -212,20 +186,7 @@ def delete_chunks(auth, dataset_id, document_id, payload=None): def delete_all_chunks(auth, dataset_id, document_id, *, page_size=1000): - # Chunk DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - chunk_ids = [] - while True: - res = list_chunks(auth, dataset_id, document_id, {"page": page, "page_size": page_size}) - chunks = (res.get("data") or {}).get("chunks") or [] - chunk_ids.extend(chunk["id"] for chunk in chunks) - if len(chunks) < page_size: - break - page += 1 - - if not chunk_ids: - return {"code": 0, "message": ""} - return delete_chunks(auth, dataset_id, document_id, {"chunk_ids": chunk_ids}) + return delete_chunks(auth, dataset_id, document_id, {"chunk_ids": None, "delete_all": True}) def retrieval_chunks(auth, payload=None): @@ -268,20 +229,7 @@ def delete_chat_assistants(auth, payload=None): def delete_all_chat_assistants(auth, *, page_size=1000): - # Chat DELETE now treats null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - chat_ids = [] - while True: - res = list_chat_assistants(auth, {"page": page, "page_size": page_size}) - data = res.get("data") or [] - chat_ids.extend(chat["id"] for chat in data) - if len(data) < page_size: - break - page += 1 - - if not chat_ids: - return {"code": 0, "message": ""} - return delete_chat_assistants(auth, {"ids": chat_ids}) + return delete_chat_assistants(auth, {"ids": None, "delete_all": True}) def batch_create_chat_assistants(auth, num): @@ -318,20 +266,7 @@ def delete_session_with_chat_assistants(auth, chat_assistant_id, payload=None): def delete_all_sessions_with_chat_assistant(auth, chat_assistant_id, *, page_size=1000): - # Session DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - session_ids = [] - while True: - res = list_session_with_chat_assistants(auth, chat_assistant_id, {"page": page, "page_size": page_size}) - data = res.get("data") or [] - session_ids.extend(session["id"] for session in data) - if len(data) < page_size: - break - page += 1 - - if not session_ids: - return {"code": 0, "message": ""} - return delete_session_with_chat_assistants(auth, chat_assistant_id, {"ids": session_ids}) + return delete_session_with_chat_assistants(auth, chat_assistant_id, {"ids": None, "delete_all": True}) def batch_add_sessions_with_chat_assistant(auth, chat_assistant_id, num): @@ -439,20 +374,7 @@ def delete_agent_sessions(auth, agent_id, payload=None): def delete_all_agent_sessions(auth, agent_id, *, page_size=1000): - # Agent session DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - session_ids = [] - while True: - res = list_agent_sessions(auth, agent_id, {"page": page, "page_size": page_size}) - data = res.get("data") or [] - session_ids.extend(session["id"] for session in data) - if len(data) < page_size: - break - page += 1 - - if not session_ids: - return {"code": 0, "message": ""} - return delete_agent_sessions(auth, agent_id, {"ids": session_ids}) + return delete_agent_sessions(auth, agent_id, {"ids": None, "delete_all": True}) def agent_completions(auth, agent_id, payload=None): diff --git a/test/testcases/test_sdk_api/common.py b/test/testcases/test_sdk_api/common.py index 84354fc91..eebb83523 100644 --- a/test/testcases/test_sdk_api/common.py +++ b/test/testcases/test_sdk_api/common.py @@ -26,33 +26,11 @@ def batch_create_datasets(client: RAGFlow, num: int) -> list[DataSet]: def delete_all_datasets(client: RAGFlow, *, page_size: int = 1000) -> None: - # Dataset DELETE now treats null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - dataset_ids: list[str] = [] - while True: - datasets = client.list_datasets(page=page, page_size=page_size) - dataset_ids.extend(dataset.id for dataset in datasets) - if len(datasets) < page_size: - break - page += 1 - - if dataset_ids: - client.delete_datasets(ids=dataset_ids) + client.delete_datasets(delete_all=True) def delete_all_chats(client: RAGFlow, *, page_size: int = 1000) -> None: - # Chat DELETE now treats null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - chat_ids: list[str] = [] - while True: - chats = client.list_chats(page=page, page_size=page_size) - chat_ids.extend(chat.id for chat in chats) - if len(chats) < page_size: - break - page += 1 - - if chat_ids: - client.delete_chats(ids=chat_ids) + client.delete_chats(delete_all=True) # FILE MANAGEMENT WITHIN DATASET @@ -68,48 +46,15 @@ def bulk_upload_documents(dataset: DataSet, num: int, tmp_path: Path) -> list[Do def delete_all_documents(dataset: DataSet, *, page_size: int = 1000) -> None: - # Document DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - document_ids: list[str] = [] - while True: - documents = dataset.list_documents(page=page, page_size=page_size) - document_ids.extend(document.id for document in documents) - if len(documents) < page_size: - break - page += 1 - - if document_ids: - dataset.delete_documents(ids=document_ids) + dataset.delete_documents(delete_all=True) def delete_all_sessions(chat_assistant: Chat, *, page_size: int = 1000) -> None: - # Session DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - session_ids: list[str] = [] - while True: - sessions = chat_assistant.list_sessions(page=page, page_size=page_size) - session_ids.extend(session.id for session in sessions) - if len(sessions) < page_size: - break - page += 1 - - if session_ids: - chat_assistant.delete_sessions(ids=session_ids) + chat_assistant.delete_sessions(delete_all=True) def delete_all_chunks(document: Document, *, page_size: int = 1000) -> None: - # Chunk DELETE now treats missing/null/empty ids as a no-op, so cleanup must enumerate explicit ids. - page = 1 - chunk_ids: list[str] = [] - while True: - chunks = document.list_chunks(page=page, page_size=page_size) - chunk_ids.extend(chunk.id for chunk in chunks) - if len(chunks) < page_size: - break - page += 1 - - if chunk_ids: - document.delete_chunks(ids=chunk_ids) + document.delete_chunks(delete_all=True) # CHUNK MANAGEMENT WITHIN DATASET