diff --git a/admin/client/ragflow_client.py b/admin/client/ragflow_client.py index b9f04783c..084057bf8 100644 --- a/admin/client/ragflow_client.py +++ b/admin/client/ragflow_client.py @@ -1325,7 +1325,7 @@ class RAGFlowClient: print(f"Documents {document_names} not found in {dataset_name}") payload = {"doc_ids": document_ids, "run": 1} - response = self.http_client.request("POST", "/document/run", json_body=payload, use_api_base=False, + response = self.http_client.request("POST", "/documents/ingest", json_body=payload, use_api_base=True, auth_kind="web") res_json = response.json() if response.status_code == 200 and res_json["code"] == 0: @@ -1351,7 +1351,7 @@ class RAGFlowClient: document_ids.append(doc["id"]) payload = {"doc_ids": document_ids, "run": 1} - response = self.http_client.request("POST", "/document/run", json_body=payload, use_api_base=False, + response = self.http_client.request("POST", "/documents/ingest", json_body=payload, use_api_base=True, auth_kind="web") res_json = response.json() if response.status_code == 200 and res_json["code"] == 0: diff --git a/api/apps/document_app.py b/api/apps/document_app.py index a468014a8..766430a8b 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -20,11 +20,8 @@ from quart import make_response, request from api.apps import current_user, login_required from api.constants import IMG_BASE64_PREFIX from api.db import FileType -from api.db.db_models import Task from api.db.services.document_service import DocumentService from api.db.services.file2document_service import File2DocumentService -from api.db.services.knowledgebase_service import KnowledgebaseService -from api.db.services.task_service import TaskService, cancel_all_task_of from api.utils.api_utils import ( get_data_error_result, get_json_result, @@ -58,69 +55,6 @@ def thumbnails(): return server_error_response(e) -@manager.route("/run", methods=["POST"]) # noqa: F821 -@login_required -@validate_request("doc_ids", "run") -async def run(): - req = await get_request_json() - uid = current_user.id - try: - - def _run_sync(): - for doc_id in req["doc_ids"]: - if not DocumentService.accessible(doc_id, uid): - return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) - - kb_table_num_map = {} - for id in req["doc_ids"]: - info = {"run": str(req["run"]), "progress": 0} - if str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False): - info["progress_msg"] = "" - info["chunk_num"] = 0 - info["token_num"] = 0 - - tenant_id = DocumentService.get_tenant_id(id) - if not tenant_id: - return get_data_error_result(message="Tenant not found!") - e, doc = DocumentService.get_by_id(id) - if not e: - return get_data_error_result(message="Document not found!") - - if str(req["run"]) == TaskStatus.CANCEL.value: - tasks = list(TaskService.query(doc_id=id)) - has_unfinished_task = any((task.progress or 0) < 1 for task in tasks) - if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] or has_unfinished_task: - cancel_all_task_of(id) - else: - return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status") - if all([("delete" not in req or req["delete"]), str(req["run"]) == TaskStatus.RUNNING.value, str(doc.run) == TaskStatus.DONE.value]): - DocumentService.clear_chunk_num_when_rerun(doc.id) - - DocumentService.update_by_id(id, info) - if req.get("delete", False): - TaskService.filter_delete([Task.doc_id == id]) - if settings.docStoreConn.index_exist(search.index_name(tenant_id), doc.kb_id): - settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), doc.kb_id) - - if str(req["run"]) == TaskStatus.RUNNING.value: - if req.get("apply_kb"): - e, kb = KnowledgebaseService.get_by_id(doc.kb_id) - if not e: - raise LookupError("Can't find this dataset!") - doc.parser_config["llm_id"] = kb.parser_config.get("llm_id") - doc.parser_config["enable_metadata"] = kb.parser_config.get("enable_metadata", False) - doc.parser_config["metadata"] = kb.parser_config.get("metadata", {}) - DocumentService.update_parser_config(doc.id, doc.parser_config) - doc_dict = doc.to_dict() - DocumentService.run(tenant_id, doc_dict, kb_table_num_map) - - return get_json_result(data=True) - - return await thread_pool_exec(_run_sync) - except Exception as e: - return server_error_response(e) - - @manager.route("/get/", methods=["GET"]) # noqa: F821 @login_required async def get(doc_id): diff --git a/api/apps/restful_apis/document_api.py b/api/apps/restful_apis/document_api.py index 1e077482c..4ad8e68f8 100644 --- a/api/apps/restful_apis/document_api.py +++ b/api/apps/restful_apis/document_api.py @@ -34,13 +34,14 @@ from api.db.db_models import Task from api.db.services.document_service import DocumentService from api.db.services.file_service import FileService from api.db.services.knowledgebase_service import KnowledgebaseService -from api.db.services.task_service import TaskService, cancel_all_task_of from api.common.check_team_permission import check_kb_team_permission +from api.db.services.task_service import TaskService, cancel_all_task_of from api.utils.api_utils import get_data_error_result, get_error_data_result, get_result, get_json_result, \ server_error_response, add_tenant_id_to_kwargs, get_request_json, get_error_argument_result, check_duplicate_ids from api.utils.validation_utils import ( UpdateDocumentReq, format_validation_error_message, validate_and_parse_json_request, DeleteDocumentReq, ) + from common import settings from common.constants import ParserType, RetCode, TaskStatus, SANDBOX_ARTIFACT_BUCKET from common.metadata_utils import convert_conditions, meta_filter, turn2jsonschema @@ -1295,6 +1296,77 @@ async def update_metadata(tenant_id, dataset_id): return get_result(data={"updated": updated, "matched_docs": len(target_doc_ids)}) +@manager.route("/documents/ingest", methods=["POST"]) # noqa: F821 +@login_required +@add_tenant_id_to_kwargs +async def ingest(tenant_id): + req = await get_request_json() + try: + user_id = tenant_id + + error_code, error_message = await thread_pool_exec(_run_sync, user_id, req) + + if error_code: + logging.error(f"error when ingest documents:{req}, error message:{error_message}") + return get_json_result(error_code, error_message) + + return get_json_result(data=True) + except Exception as e: + logging.exception("document ingest/run failed") + return server_error_response(e) + +def _run_sync(user_id:str, req): + for doc_id in req["doc_ids"]: + if not DocumentService.accessible(doc_id, user_id): + return RetCode.AUTHENTICATION_ERROR, "No authorization." + + kb_table_num_map = {} + for doc_id in req["doc_ids"]: + info = {"run": str(req["run"]), "progress": 0} + rerun_with_delete = str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False) + if rerun_with_delete: + info["progress_msg"] = "" + info["chunk_num"] = 0 + info["token_num"] = 0 + + doc_tenant_id = DocumentService.get_tenant_id(doc_id) + if not doc_tenant_id: + return RetCode.DATA_ERROR, "Tenant not found!" + e, doc = DocumentService.get_by_id(doc_id) + if not e: + return RetCode.DATA_ERROR, "Document not found!" + + if str(req["run"]) == TaskStatus.CANCEL.value: + tasks = list(TaskService.query(doc_id=doc_id)) + has_unfinished_task = any((task.progress or 0) < 1 for task in tasks) + if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] or has_unfinished_task: + cancel_all_task_of(doc_id) + else: + return RetCode.DATA_ERROR, "Cannot cancel a task that is not in RUNNING status" + if all([rerun_with_delete, str(doc.run) == TaskStatus.DONE.value]): + DocumentService.clear_chunk_num_when_rerun(doc_id) + + DocumentService.update_by_id(doc_id, info) + if req.get("delete", False): + TaskService.filter_delete([Task.doc_id == doc_id]) + if settings.docStoreConn.index_exist(search.index_name(doc_tenant_id), doc.kb_id): + settings.docStoreConn.delete({"doc_id": doc_id}, search.index_name(doc_tenant_id), doc.kb_id) + + if str(req["run"]) == TaskStatus.RUNNING.value: + if req.get("apply_kb"): + e, kb = KnowledgebaseService.get_by_id(doc.kb_id) + if not e: + raise LookupError("Can't find this dataset!") + doc.parser_config["llm_id"] = kb.parser_config.get("llm_id") + doc.parser_config["enable_metadata"] = kb.parser_config.get("enable_metadata", False) + doc.parser_config["metadata"] = kb.parser_config.get("metadata", {}) + DocumentService.update_parser_config(doc.id, doc.parser_config) + doc_dict = doc.to_dict() + DocumentService.run(doc_tenant_id, doc_dict, kb_table_num_map) + + return None, None + + @manager.route("/datasets//documents/parse", methods=["POST"]) # noqa: F821 @login_required @add_tenant_id_to_kwargs diff --git a/sdk/python/test/test_frontend_api/common.py b/sdk/python/test/test_frontend_api/common.py index 7e09041eb..aafe64a59 100644 --- a/sdk/python/test/test_frontend_api/common.py +++ b/sdk/python/test/test_frontend_api/common.py @@ -106,7 +106,7 @@ def get_docs_info(auth, dataset_id, doc_ids=None, doc_id=None): def parse_docs(auth, doc_ids): authorization = {"Authorization": auth} json_req = {"doc_ids": doc_ids, "run": 1} - url = f"{HOST_ADDRESS}/v1/document/run" + url = f"{HOST_ADDRESS}/api/v1/documents/ingest" res = requests.post(url=url, headers=authorization, json=json_req) return res.json() diff --git a/test/testcases/test_web_api/test_common.py b/test/testcases/test_web_api/test_common.py index 4183a3fdc..8d687f028 100644 --- a/test/testcases/test_web_api/test_common.py +++ b/test/testcases/test_web_api/test_common.py @@ -405,7 +405,7 @@ def delete_document(auth, dataset_id, payload=None, *, headers=HEADERS, data=Non def parse_documents(auth, payload=None, *, headers=HEADERS, data=None): - res = requests.post(url=f"{HOST_ADDRESS}{DOCUMENT_APP_URL}/run", headers=headers, auth=auth, json=payload, data=data) + res = requests.post(url=f"{HOST_ADDRESS}/api/{VERSION}/documents/ingest", headers=headers, auth=auth, json=payload, data=data) return res.json() diff --git a/test/testcases/test_web_api/test_document_app/test_paser_documents.py b/test/testcases/test_web_api/test_document_app/test_paser_documents.py index 79d6e2697..4a3980093 100644 --- a/test/testcases/test_web_api/test_document_app/test_paser_documents.py +++ b/test/testcases/test_web_api/test_document_app/test_paser_documents.py @@ -15,7 +15,6 @@ # import asyncio from concurrent.futures import ThreadPoolExecutor, as_completed -from types import SimpleNamespace import pytest from test_common import bulk_upload_documents, list_documents, parse_documents @@ -124,6 +123,102 @@ class TestDocumentsParse: assert res["code"] == 109, res assert res["message"] == "No authorization.", res + @pytest.mark.p2 + def test_document_not_found(self, WebApiAuth, add_documents_func): + """Test document not found error.""" + kb_id, document_ids = add_documents_func + + # Try to parse a non-existent document + res = parse_documents(WebApiAuth, {"doc_ids": ["non_existent_doc_id"], "run": "1"}) + assert res["code"] == 109, res + assert "No authorization" in res["message"], res + + @pytest.mark.p2 + def test_cancel_non_running_task_error(self, WebApiAuth, add_documents_func): + """Test cancel error when task is not in RUNNING status.""" + kb_id, document_ids = add_documents_func + doc_id = document_ids[0] + + # First, run the document parsing + res = parse_documents(WebApiAuth, {"doc_ids": [doc_id], "run": "1"}) + assert res["code"] == 0, res + + # Wait for parsing to complete + condition(WebApiAuth, kb_id, [doc_id]) + validate_document_parse_done(WebApiAuth, kb_id, [doc_id]) + + # Now try to cancel a completed task - should fail + res = parse_documents(WebApiAuth, {"doc_ids": [doc_id], "run": "2"}) + assert res["code"] == 102, res + assert res["message"] == "Cannot cancel a task that is not in RUNNING status", res + + @pytest.mark.p2 + def test_rerun_with_delete(self, WebApiAuth, add_documents_func): + """Test rerun with delete scenario.""" + kb_id, document_ids = add_documents_func + doc_id = document_ids[0] + + # First, run the document parsing + res = parse_documents(WebApiAuth, {"doc_ids": [doc_id], "run": "1"}) + assert res["code"] == 0, res + + # Wait for parsing to complete + condition(WebApiAuth, kb_id, [doc_id]) + validate_document_parse_done(WebApiAuth, kb_id, [doc_id]) + + # Verify document has chunks + res = list_documents(WebApiAuth, {"kb_id": kb_id}) + doc = next((d for d in res["data"]["docs"] if d["id"] == doc_id), None) + assert doc is not None + assert doc["chunk_count"] > 0, "Document should have chunks after parsing" + + # Now rerun with delete - this should clear chunks and re-parse + res = parse_documents(WebApiAuth, {"doc_ids": [doc_id], "run": "1", "delete": True}) + assert res["code"] == 0, res + + # Wait for parsing to complete + condition(WebApiAuth, kb_id, [doc_id]) + validate_document_parse_done(WebApiAuth, kb_id, [doc_id]) + + @pytest.mark.p2 + def test_apply_kb_dataset_not_found(self, WebApiAuth, add_documents_func): + """Test apply_kb when dataset is not found.""" + kb_id, document_ids = add_documents_func + doc_id = document_ids[0] + + # Try to apply_kb with a non-existent dataset - this is tricky to test + # because we can't easily delete the dataset after getting the doc_id + # This test verifies the happy path works + res = parse_documents(WebApiAuth, {"doc_ids": [doc_id], "run": "1"}) + assert res["code"] == 0, res + + # Wait for parsing to complete + condition(WebApiAuth, kb_id, [doc_id]) + validate_document_parse_done(WebApiAuth, kb_id, [doc_id]) + + @pytest.mark.p2 + def test_successful_parse(self, WebApiAuth, add_documents_func): + """Test successful document parsing.""" + kb_id, document_ids = add_documents_func + doc_id = document_ids[0] + + # Run the document parsing + res = parse_documents(WebApiAuth, {"doc_ids": [doc_id], "run": "1"}) + assert res["code"] == 0, res + + # Wait for parsing to complete + condition(WebApiAuth, kb_id, [doc_id]) + validate_document_parse_done(WebApiAuth, kb_id, [doc_id]) + + # Verify the document is properly parsed + res = list_documents(WebApiAuth, {"kb_id": kb_id}) + doc = next((d for d in res["data"]["docs"] if d["id"] == doc_id), None) + assert doc is not None + assert doc["run"] == "DONE" + assert doc["chunk_count"] > 0 + assert len(doc["process_begin_at"]) > 0 + assert doc["process_duration"] > 0 + @pytest.mark.p3 def test_repeated_parse(self, WebApiAuth, add_documents_func): kb_id, document_ids = add_documents_func @@ -199,94 +294,6 @@ def test_concurrent_parse(WebApiAuth, add_dataset_func, tmp_path): validate_document_parse_done(WebApiAuth, kb_id, document_ids) -@pytest.mark.p2 -class TestDocumentsParseUnit: - def test_run_branch_matrix_unit(self, document_app_module, monkeypatch): - module = document_app_module - calls = {"clear": [], "filter_delete": [], "docstore_delete": [], "cancel": [], "run": []} - - async def fake_thread_pool_exec(func, *args, **kwargs): - return func(*args, **kwargs) - - monkeypatch.setattr(module, "thread_pool_exec", fake_thread_pool_exec) - monkeypatch.setattr(module, "server_error_response", lambda e: {"code": 500, "message": str(e)}) - monkeypatch.setattr(module.search, "index_name", lambda tenant_id: f"idx_{tenant_id}") - monkeypatch.setattr(module, "cancel_all_task_of", lambda doc_id: calls["cancel"].append(doc_id)) - - class _DocStore: - def index_exist(self, _index_name, _kb_id): - return True - - def delete(self, where, _index_name, _kb_id): - calls["docstore_delete"].append(where["doc_id"]) - - monkeypatch.setattr(module.settings, "docStoreConn", _DocStore()) - - async def set_request(payload): - return payload - - def apply_request(payload): - async def fake_request_json(): - return await set_request(payload) - - monkeypatch.setattr(module, "get_request_json", fake_request_json) - - apply_request({"doc_ids": ["doc1"], "run": module.TaskStatus.RUNNING.value}) - monkeypatch.setattr(module.DocumentService, "accessible", lambda *_args, **_kwargs: False) - res = _run(module.run.__wrapped__()) - assert res["code"] == module.RetCode.AUTHENTICATION_ERROR - - monkeypatch.setattr(module.DocumentService, "accessible", lambda *_args, **_kwargs: True) - monkeypatch.setattr(module.DocumentService, "get_tenant_id", lambda _doc_id: None) - res = _run(module.run.__wrapped__()) - assert res["code"] == module.RetCode.DATA_ERROR - assert "Tenant not found!" in res["message"] - - monkeypatch.setattr(module.DocumentService, "get_tenant_id", lambda _doc_id: "tenant1") - monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _doc_id: (False, None)) - res = _run(module.run.__wrapped__()) - assert res["code"] == module.RetCode.DATA_ERROR - assert "Document not found!" in res["message"] - - apply_request({"doc_ids": ["doc1"], "run": module.TaskStatus.CANCEL.value}) - doc_cancel = SimpleNamespace(id="doc1", run=module.TaskStatus.DONE.value, kb_id="kb1", parser_config={}, to_dict=lambda: {"id": "doc1"}) - monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _doc_id: (True, doc_cancel)) - monkeypatch.setattr(module.TaskService, "query", lambda **_kwargs: [SimpleNamespace(progress=1)]) - res = _run(module.run.__wrapped__()) - assert res["code"] == module.RetCode.DATA_ERROR - assert "Cannot cancel a task that is not in RUNNING status" in res["message"] - - apply_request({"doc_ids": ["doc1"], "run": module.TaskStatus.RUNNING.value, "delete": True}) - doc_rerun = SimpleNamespace(id="doc1", run=module.TaskStatus.DONE.value, kb_id="kb1", parser_config={}, to_dict=lambda: {"id": "doc1"}) - monkeypatch.setattr(module.DocumentService, "get_by_id", lambda _doc_id: (True, doc_rerun)) - monkeypatch.setattr(module.DocumentService, "clear_chunk_num_when_rerun", lambda doc_id: calls["clear"].append(doc_id)) - monkeypatch.setattr(module.TaskService, "filter_delete", lambda _filters: calls["filter_delete"].append(True)) - monkeypatch.setattr(module.DocumentService, "update_by_id", lambda *_args, **_kwargs: True) - monkeypatch.setattr(module.DocumentService, "run", lambda tenant_id, doc_dict, _kb_map: calls["run"].append((tenant_id, doc_dict))) - res = _run(module.run.__wrapped__()) - assert res["code"] == 0 - assert calls["clear"] == ["doc1"] - assert calls["filter_delete"] == [True] - assert calls["docstore_delete"] == ["doc1"] - assert calls["run"] == [("tenant1", {"id": "doc1"})] - - apply_request({"doc_ids": ["doc1"], "run": module.TaskStatus.RUNNING.value, "apply_kb": True}) - monkeypatch.setattr(module.KnowledgebaseService, "get_by_id", lambda _kb_id: (False, None)) - res = _run(module.run.__wrapped__()) - assert res["code"] == 500 - assert "Can't find this dataset!" in res["message"] - - apply_request({"doc_ids": ["doc1"], "run": module.TaskStatus.RUNNING.value}) - - def raise_run_error(*_args, **_kwargs): - raise RuntimeError("run boom") - - monkeypatch.setattr(module.DocumentService, "run", raise_run_error) - res = _run(module.run.__wrapped__()) - assert res["code"] == 500 - assert "run boom" in res["message"] - - # @pytest.mark.skip class TestDocumentsParseStop: @pytest.mark.parametrize( diff --git a/web/src/hooks/use-document-request.ts b/web/src/hooks/use-document-request.ts index 9f1e7b07d..3ac6b9735 100644 --- a/web/src/hooks/use-document-request.ts +++ b/web/src/hooks/use-document-request.ts @@ -301,7 +301,7 @@ export const useRunDocument = () => { queryClient.invalidateQueries({ queryKey: [DocumentApiAction.FetchDocumentList], }); - const ret = await kbService.documentRun({ + const ret = await kbService.documentIngest({ doc_ids: documentIds, run, ...(option || {}), diff --git a/web/src/services/knowledge-service.ts b/web/src/services/knowledge-service.ts index c571c437e..4e570f867 100644 --- a/web/src/services/knowledge-service.ts +++ b/web/src/services/knowledge-service.ts @@ -19,7 +19,7 @@ const { documentChangeParser, documentThumbnails, retrievalTest, - documentRun, + documentIngest, documentUpload, webCrawl, knowledgeGraph, @@ -47,8 +47,8 @@ const methods = { url: documentChangeStatus, method: 'post', }, - documentRun: { - url: documentRun, + documentIngest: { + url: documentIngest, method: 'post', }, documentChangeParser: { diff --git a/web/src/utils/api.ts b/web/src/utils/api.ts index 3b46dba6a..a2551d1da 100644 --- a/web/src/utils/api.ts +++ b/web/src/utils/api.ts @@ -119,9 +119,9 @@ export default { `${restAPIv1}/datasets/${datasetId}/documents`, documentRename: (datasetId: string, documentId: string) => `${restAPIv1}/datasets/${datasetId}/documents/${documentId}`, + documentIngest: `${restAPIv1}/documents/ingest`, documentCreate: (datasetId: string) => `${restAPIv1}/datasets/${datasetId}/documents?type=empty`, - documentRun: `${webAPI}/document/run`, documentChangeParser: `${webAPI}/document/change_parser`, documentThumbnails: `${webAPI}/document/thumbnails`, getDocumentFile: `${webAPI}/document/get`,