Refactor: unify document create flows under REST documents API (#14345)

### What problem does this PR solve?

unify document create flows under REST documents API

### Type of change

- [x] Refactoring
This commit is contained in:
buua436
2026-04-27 10:18:16 +08:00
committed by GitHub
parent 4dcc42e0e1
commit a9e5724b46
10 changed files with 454 additions and 258 deletions

View File

@ -15,16 +15,14 @@
#
import os.path
import re
from pathlib import Path, PurePosixPath, PureWindowsPath
from pathlib import PurePosixPath, PureWindowsPath
from quart import make_response, request
from api.apps import current_user, login_required
from api.common.check_team_permission import check_kb_team_permission
from api.constants import FILE_NAME_LEN_LIMIT, IMG_BASE64_PREFIX
from api.constants import IMG_BASE64_PREFIX
from api.db import FileType
from api.db.db_models import Task
from api.db.services import duplicate_name
from api.db.services.document_service import DocumentService, doc_upload_and_parse
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
@ -37,12 +35,11 @@ from api.utils.api_utils import (
server_error_response,
validate_request,
)
from api.utils.file_utils import filename_type, thumbnail
from api.utils.web_utils import CONTENT_TYPE_MAP, apply_safe_file_response_headers, html2pdf, is_valid_url
from api.utils.web_utils import CONTENT_TYPE_MAP, apply_safe_file_response_headers, is_valid_url
from common import settings
from common.constants import SANDBOX_ARTIFACT_BUCKET, ParserType, RetCode, TaskStatus
from common.constants import SANDBOX_ARTIFACT_BUCKET, RetCode, TaskStatus
from common.file_utils import get_project_base_directory
from common.misc_utils import get_uuid, thread_pool_exec
from common.misc_utils import thread_pool_exec
from common.ssrf_guard import assert_url_is_safe
from deepdoc.parser.html_parser import RAGFlowHtmlParser
from rag.nlp import search
@ -60,128 +57,6 @@ def _is_safe_download_filename(name: str) -> bool:
return True
@manager.route("/web_crawl", methods=["POST"]) # noqa: F821
@login_required
@validate_request("kb_id", "name", "url")
async def web_crawl():
    """Download a web page as a PDF and register it as a document in a dataset.

    Form fields: ``kb_id`` (target dataset), ``name`` (document base name,
    ``.pdf`` is appended) and ``url`` (page to fetch; must pass
    ``is_valid_url``). Returns a JSON result with ``data=True`` on success.
    """
    form = await request.form
    kb_id = form.get("kb_id")
    if not kb_id:
        return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
    name = form.get("name")
    url = form.get("url")
    if not is_valid_url(url):
        return get_json_result(data=False, message="The URL format is invalid", code=RetCode.ARGUMENT_ERROR)
    e, kb = KnowledgebaseService.get_by_id(kb_id)
    if not e:
        raise LookupError("Can't find this dataset!")
    # Only members of the dataset's team may add documents to it.
    if not check_kb_team_permission(kb, current_user.id):
        return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
    blob = html2pdf(url)
    if not blob:
        return server_error_response(ValueError("Download failure."))
    # Ensure the user's file tree has the knowledgebase folder structure,
    # then obtain (or create) the folder mirroring this dataset.
    root_folder = FileService.get_root_folder(current_user.id)
    pf_id = root_folder["id"]
    FileService.init_knowledgebase_docs(pf_id, current_user.id)
    kb_root_folder = FileService.get_kb_folder(current_user.id)
    kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
    try:
        # De-duplicate the document name within the dataset before storing.
        filename = duplicate_name(DocumentService.query, name=name + ".pdf", kb_id=kb.id)
        filetype = filename_type(filename)
        if filetype == FileType.OTHER.value:
            raise RuntimeError("This type of file has not been supported yet!")
        # Probe for a storage key that is not yet taken in the dataset's bucket.
        location = filename
        while settings.STORAGE_IMPL.obj_exist(kb_id, location):
            location += "_"
        settings.STORAGE_IMPL.put(kb_id, location, blob)
        doc = {
            "id": get_uuid(),
            "kb_id": kb.id,
            "parser_id": kb.parser_id,
            "parser_config": kb.parser_config,
            "created_by": current_user.id,
            "type": filetype,
            "name": filename,
            "location": location,
            "size": len(blob),
            "thumbnail": thumbnail(filename, blob),
            "suffix": Path(filename).suffix.lstrip("."),
        }
        # Override the dataset's default parser for special file types.
        # NOTE(review): the OTHER check above compares against ``.value`` while
        # the checks below compare against the enum members — confirm FileType
        # is a str-valued enum so both comparison styles behave the same.
        if doc["type"] == FileType.VISUAL:
            doc["parser_id"] = ParserType.PICTURE.value
        if doc["type"] == FileType.AURAL:
            doc["parser_id"] = ParserType.AUDIO.value
        if re.search(r"\.(ppt|pptx|pages)$", filename):
            doc["parser_id"] = ParserType.PRESENTATION.value
        if re.search(r"\.(eml)$", filename):
            doc["parser_id"] = ParserType.EMAIL.value
        DocumentService.insert(doc)
        # Mirror the stored document into the user's file tree.
        FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
    except Exception as e:
        return server_error_response(e)
    return get_json_result(data=True)
@manager.route("/create", methods=["POST"]) # noqa: F821
@login_required
@validate_request("name", "kb_id")
async def create():
    """Create an empty (virtual) document inside a dataset.

    JSON body: ``name`` (document name, stripped of surrounding whitespace)
    and ``kb_id`` (target dataset). Returns the created document record as
    JSON on success.
    """
    payload = await get_request_json()
    dataset_id = payload["kb_id"]
    if not dataset_id:
        return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
    raw_name = payload["name"]
    # The byte-length limit applies to the name exactly as submitted.
    if len(raw_name.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
        return get_json_result(data=False, message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.", code=RetCode.ARGUMENT_ERROR)
    if raw_name.strip() == "":
        return get_json_result(data=False, message="File name can't be empty.", code=RetCode.ARGUMENT_ERROR)
    payload["name"] = raw_name.strip()
    try:
        found, kb = KnowledgebaseService.get_by_id(dataset_id)
        if not found:
            return get_data_error_result(message="Can't find this dataset!")
        # An empty document never auto-renames: a name clash is an error.
        if DocumentService.query(name=payload["name"], kb_id=dataset_id):
            return get_data_error_result(message="Duplicated document name in the same dataset.")
        root = FileService.get_kb_folder(kb.tenant_id)
        if not root:
            return get_data_error_result(message="Cannot find the root folder.")
        folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, root["id"])
        if not folder:
            return get_data_error_result(message="Cannot find the kb folder for this file.")
        record = {
            "id": get_uuid(),
            "kb_id": kb.id,
            "parser_id": kb.parser_id,
            "pipeline_id": kb.pipeline_id,
            "parser_config": kb.parser_config,
            "created_by": current_user.id,
            "type": FileType.VIRTUAL,
            "name": payload["name"],
            "suffix": Path(payload["name"]).suffix.lstrip("."),
            "location": "",
            "size": 0,
        }
        doc = DocumentService.insert(record)
        # Mirror the new document into the owner's file tree.
        FileService.add_file_from_kb(doc.to_dict(), folder["id"], kb.tenant_id)
        return get_json_result(data=doc.to_json())
    except Exception as e:
        return server_error_response(e)
@manager.route("/thumbnails", methods=["GET"]) # noqa: F821
# @login_required

View File

@ -15,6 +15,8 @@
#
import logging
import json
import re
from pathlib import Path
from quart import request
from peewee import OperationalError
@ -23,8 +25,9 @@ from pydantic import ValidationError
from api.apps import login_required
from api.apps.services.document_api_service import validate_document_update_fields, map_doc_keys, \
map_doc_keys_with_run_status, update_document_name_only, update_chunk_method_only, update_document_status_only
from api.constants import IMG_BASE64_PREFIX
from api.db import VALID_FILE_TYPES
from api.constants import FILE_NAME_LEN_LIMIT, IMG_BASE64_PREFIX
from api.db import FileType, VALID_FILE_TYPES
from api.db.services import duplicate_name
from api.db.services.doc_metadata_service import DocMetadataService
from api.db.db_models import Task
from api.db.services.document_service import DocumentService
@ -38,9 +41,11 @@ from api.utils.validation_utils import (
UpdateDocumentReq, format_validation_error_message, validate_and_parse_json_request, DeleteDocumentReq,
)
from common import settings
from common.constants import RetCode, TaskStatus
from common.constants import ParserType, RetCode, TaskStatus
from common.metadata_utils import convert_conditions, meta_filter, turn2jsonschema
from common.misc_utils import thread_pool_exec
from common.misc_utils import get_uuid, thread_pool_exec
from api.utils.file_utils import filename_type, thumbnail
from api.utils.web_utils import html2pdf, is_valid_url
from rag.nlp import search
@manager.route("/datasets/<dataset_id>/documents/<document_id>", methods=["PATCH"]) # noqa: F821
@ -348,13 +353,144 @@ async def upload_document(dataset_id, tenant_id):
type: string
description: Processing status.
"""
from api.constants import FILE_NAME_LEN_LIMIT
from api.db.services.file_service import FileService
upload_type = (request.args.get("type") or "local").lower()
e, kb = KnowledgebaseService.get_by_id(dataset_id)
if not e:
logging.error(f"Can't find the dataset with ID {dataset_id}!")
return get_error_data_result(message=f"Can't find the dataset with ID {dataset_id}!", code=RetCode.DATA_ERROR)
if not check_kb_team_permission(kb, tenant_id):
logging.error("No authorization.")
return get_error_data_result(message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
if upload_type == "web":
return await _upload_web_document(dataset_id, kb, tenant_id)
if upload_type == "empty":
return await _upload_empty_document(dataset_id, kb, tenant_id)
if upload_type != "local":
return get_error_data_result(
message='`type` must be one of "local", "web", or "empty".',
code=RetCode.ARGUMENT_ERROR,
)
return await _upload_local_documents(kb, tenant_id)
async def _upload_web_document(dataset_id, kb, tenant_id):
    """Fetch a URL from the request form, render it to PDF and store it as a
    document in *kb*.

    Form fields: ``name`` (document base name, ``.pdf`` is appended) and
    ``url`` (page to fetch; must pass ``is_valid_url``). Returns the stored
    document mapped to API keys with an initial run status of "0".
    """
    form = await request.form
    name = (form.get("name") or "").strip()
    url = form.get("url")
    if not name:
        return get_error_data_result(message='Lack of "name"', code=RetCode.ARGUMENT_ERROR)
    if not url:
        return get_error_data_result(message='Lack of "url"', code=RetCode.ARGUMENT_ERROR)
    if len(name.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
        return get_error_data_result(
            message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.",
            code=RetCode.ARGUMENT_ERROR,
        )
    if not is_valid_url(url):
        return get_error_data_result(message="The URL format is invalid", code=RetCode.ARGUMENT_ERROR)
    blob = html2pdf(url)
    if not blob:
        return server_error_response(ValueError("Download failure."))
    # Ensure the owner's file tree has the knowledgebase folder structure,
    # then obtain (or create) the folder mirroring this dataset.
    root_folder = FileService.get_root_folder(tenant_id)
    FileService.init_knowledgebase_docs(root_folder["id"], tenant_id)
    kb_root_folder = FileService.get_kb_folder(tenant_id)
    kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
    try:
        # De-duplicate the document name within the dataset before storing.
        filename = duplicate_name(DocumentService.query, name=f"{name}.pdf", kb_id=kb.id)
        filetype = filename_type(filename)
        if filetype == FileType.OTHER.value:
            raise RuntimeError("This type of file has not been supported yet!")
        # Probe for a storage key that is not yet taken in the dataset's bucket.
        location = filename
        while settings.STORAGE_IMPL.obj_exist(dataset_id, location):
            location += "_"
        settings.STORAGE_IMPL.put(dataset_id, location, blob)
        doc = {
            "id": get_uuid(),
            "kb_id": kb.id,
            "parser_id": kb.parser_id,
            "pipeline_id": kb.pipeline_id,
            "parser_config": kb.parser_config,
            "created_by": tenant_id,
            "type": filetype,
            "name": filename,
            "location": location,
            "size": len(blob),
            "thumbnail": thumbnail(filename, blob),
            "suffix": Path(filename).suffix.lstrip("."),
        }
        # Override the dataset's default parser for special file types.
        # NOTE(review): the OTHER check above compares against ``.value`` while
        # the checks below compare against the enum members — confirm FileType
        # is a str-valued enum so both comparison styles behave the same.
        if doc["type"] == FileType.VISUAL:
            doc["parser_id"] = ParserType.PICTURE.value
        if doc["type"] == FileType.AURAL:
            doc["parser_id"] = ParserType.AUDIO.value
        if re.search(r"\.(ppt|pptx|pages)$", filename):
            doc["parser_id"] = ParserType.PRESENTATION.value
        if re.search(r"\.(eml)$", filename):
            doc["parser_id"] = ParserType.EMAIL.value
        DocumentService.insert(doc)
        # Mirror the stored document into the owner's file tree.
        FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
        return get_result(data=map_doc_keys_with_run_status(doc, run_status="0"))
    except Exception as e:
        return server_error_response(e)
async def _upload_empty_document(dataset_id, kb, tenant_id):
    """Create an empty (virtual) document in *kb* from a JSON request body.

    Expects a JSON payload with a non-empty ``name``; rejects over-long or
    duplicated names. Returns the created document mapped to API keys.
    """
    body = await get_request_json()
    doc_name = (body.get("name") or "").strip()
    if not doc_name:
        return get_error_data_result(message="File name can't be empty.", code=RetCode.ARGUMENT_ERROR)
    if len(doc_name.encode("utf-8")) > FILE_NAME_LEN_LIMIT:
        return get_error_data_result(
            message=f"File name must be {FILE_NAME_LEN_LIMIT} bytes or less.",
            code=RetCode.ARGUMENT_ERROR,
        )
    # Unlike web/local uploads, an empty document never auto-renames:
    # a name clash is an error.
    if DocumentService.query(name=doc_name, kb_id=dataset_id):
        return get_error_data_result(message="Duplicated document name in the same dataset.")
    try:
        root_folder = FileService.get_kb_folder(kb.tenant_id)
        if not root_folder:
            return get_error_data_result(message="Cannot find the root folder.")
        dataset_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, root_folder["id"])
        if not dataset_folder:
            return get_error_data_result(message="Cannot find the kb folder for this file.")
        record = {
            "id": get_uuid(),
            "kb_id": kb.id,
            "parser_id": kb.parser_id,
            "pipeline_id": kb.pipeline_id,
            "parser_config": kb.parser_config,
            "created_by": tenant_id,
            "type": FileType.VIRTUAL,
            "name": doc_name,
            "suffix": Path(doc_name).suffix.lstrip("."),
            "location": "",
            "size": 0,
        }
        doc = DocumentService.insert(record)
        # Mirror the new document into the owner's file tree.
        FileService.add_file_from_kb(doc.to_dict(), dataset_folder["id"], kb.tenant_id)
        return get_result(data=map_doc_keys(doc))
    except Exception as e:
        return server_error_response(e)
async def _upload_local_documents(kb, tenant_id):
form = await request.form
files = await request.files
# Validation
if "file" not in files:
logging.error("No file part!")
return get_error_data_result(message="No file part!", code=RetCode.ARGUMENT_ERROR)
@ -369,18 +505,6 @@ async def upload_document(dataset_id, tenant_id):
logging.error(msg)
return get_error_data_result(message=msg, code=RetCode.ARGUMENT_ERROR)
# KB Lookup
e, kb = KnowledgebaseService.get_by_id(dataset_id)
if not e:
logging.error(f"Can't find the dataset with ID {dataset_id}!")
return get_error_data_result(message=f"Can't find the dataset with ID {dataset_id}!", code=RetCode.DATA_ERROR)
# Permission Check
if not check_kb_team_permission(kb, tenant_id):
logging.error("No authorization.")
return get_error_data_result(message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
# File Upload (async)
err, files = await thread_pool_exec(
FileService.upload_document, kb, file_objs, tenant_id,
parent_path=form.get("parent_path")
@ -396,8 +520,6 @@ async def upload_document(dataset_id, tenant_id):
return get_error_data_result(message=msg, code=RetCode.DATA_ERROR)
files = [f[0] for f in files] # remove the blob
# Check if we should return raw files without document key mapping
return_raw_files = request.args.get("return_raw_files", "false").lower() == "true"
if return_raw_files: