Refactor: Doc batch change status (#14337)

### What problem does this PR solve?

Before migration (Web API):
POST /v1/document/change_status

After consolidation (RESTful API):
POST /api/v1/datasets/<dataset_id>/documents/batch-update-status

### Type of change

- [x] Refactoring
This commit is contained in:
Jack
2026-04-27 20:00:23 +08:00
committed by GitHub
parent c949096db0
commit a536980e22
10 changed files with 851 additions and 88 deletions

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License
#
import logging
import re
from quart import make_response, request
@ -59,80 +58,6 @@ def thumbnails():
return server_error_response(e)
@manager.route("/change_status", methods=["POST"]) # noqa: F821
@login_required
@validate_request("doc_ids", "status")
async def change_status():
req = await get_request_json()
doc_ids = req.get("doc_ids", [])
status = str(req.get("status", ""))
if status not in ["0", "1"]:
return get_json_result(data=False, message='"Status" must be either 0 or 1!', code=RetCode.ARGUMENT_ERROR)
result = {}
has_error = False
for doc_id in doc_ids:
if not DocumentService.accessible(doc_id, current_user.id):
result[doc_id] = {"error": "No authorization."}
has_error = True
continue
try:
e, doc = DocumentService.get_by_id(doc_id)
if not e:
result[doc_id] = {"error": "No authorization."}
has_error = True
continue
e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
if not e:
result[doc_id] = {"error": "Can't find this dataset!"}
has_error = True
continue
current_status = str(doc.status)
if current_status == status:
result[doc_id] = {"status": status}
continue
if not DocumentService.update_by_id(doc_id, {"status": str(status)}):
result[doc_id] = {"error": "Database error (Document update)!"}
has_error = True
continue
status_int = int(status)
if getattr(doc, "chunk_num", 0) > 0:
try:
ok = settings.docStoreConn.update(
{"doc_id": doc_id},
{"available_int": status_int},
search.index_name(kb.tenant_id),
doc.kb_id,
)
except Exception:
logging.exception(
"Document store update failed in change_status: doc_id=%s kb_id=%s status=%s",
doc_id, doc.kb_id, status_int,
)
result[doc_id] = {"error": "Document store update failed."}
has_error = True
continue
if not ok:
logging.warning(
"Document store update returned False in change_status: doc_id=%s kb_id=%s status=%s",
doc_id, doc.kb_id, status_int,
)
result[doc_id] = {"error": "Document store table missing or update failed."}
has_error = True
continue
result[doc_id] = {"status": status}
except Exception as e:
result[doc_id] = {"error": f"Internal server error: {str(e)}"}
has_error = True
if has_error:
return get_json_result(data=result, message="Partial failure", code=RetCode.SERVER_ERROR)
return get_json_result(data=result)
@manager.route("/run", methods=["POST"]) # noqa: F821
@login_required
@validate_request("doc_ids", "run")
@ -195,6 +120,7 @@ async def run():
except Exception as e:
return server_error_response(e)
@manager.route("/get/<doc_id>", methods=["GET"]) # noqa: F821
@login_required
async def get(doc_id):

View File

@ -15,11 +15,11 @@
#
import logging
import json
import os.path
import os
import re
from pathlib import Path
from quart import make_response, request
from quart import request, make_response
from peewee import OperationalError
from pydantic import ValidationError
@ -42,14 +42,13 @@ from api.utils.validation_utils import (
UpdateDocumentReq, format_validation_error_message, validate_and_parse_json_request, DeleteDocumentReq,
)
from common import settings
from common.constants import ParserType, RetCode, SANDBOX_ARTIFACT_BUCKET, TaskStatus
from common.constants import ParserType, RetCode, TaskStatus, SANDBOX_ARTIFACT_BUCKET
from common.metadata_utils import convert_conditions, meta_filter, turn2jsonschema
from common.misc_utils import get_uuid, thread_pool_exec
from common.ssrf_guard import assert_url_is_safe
from api.utils.file_utils import filename_type, thumbnail
from api.utils.web_utils import html2pdf, is_valid_url
from api.utils.web_utils import html2pdf, is_valid_url, apply_safe_file_response_headers
from common.ssrf_guard import assert_url_is_safe
from rag.nlp import search
from api.utils.web_utils import apply_safe_file_response_headers
@manager.route("/documents/upload", methods=["POST"]) # noqa: F821
@ -1570,3 +1569,124 @@ async def get_artifact(filename):
return response
except Exception as e:
return server_error_response(e)
@manager.route("/datasets/<dataset_id>/documents/batch-update-status", methods=["POST"]) # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def batch_update_document_status(tenant_id, dataset_id):
"""
Batch update status of documents within a dataset.
---
tags:
- Documents
security:
- ApiKeyAuth: []
parameters:
- in: path
name: dataset_id
type: string
required: true
description: ID of the dataset.
- in: header
name: Authorization
type: string
required: true
description: Bearer token for authentication.
- in: body
name: body
description: Document status update parameters.
required: true
schema:
type: object
required:
- doc_ids
- status
properties:
doc_ids:
type: array
items:
type: string
description: List of document IDs to update.
status:
type: string
enum: ["0", "1"]
description: New status (0 = disabled, 1 = enabled).
responses:
200:
description: Document statuses updated successfully.
"""
req = await get_request_json()
doc_ids = req.get("doc_ids", [])
if not isinstance(doc_ids, list) or not doc_ids:
return get_error_argument_result(message='"doc_ids" must be a non-empty list.')
if any(not isinstance(doc_id, str) or not doc_id for doc_id in doc_ids):
return get_error_argument_result(message='"doc_ids" must contain non-empty document IDs.')
status = str(req.get("status", -1))
if status not in ["0", "1"]:
return get_error_argument_result(message=f'"Status" must be either 0 or 1:{status}!')
# Verify dataset ownership
if not KnowledgebaseService.query(id=dataset_id, tenant_id=tenant_id):
return get_error_data_result(message="You don't own the dataset.")
e, kb = KnowledgebaseService.get_by_id(dataset_id)
if not e:
return get_error_data_result(message="Can't find this dataset!")
result = {}
has_error = False
for doc_id in doc_ids:
try:
e, doc = DocumentService.get_by_id(doc_id)
if not e:
result[doc_id] = {"error": "Document not found"}
has_error = True
continue
if doc.kb_id != dataset_id:
logging.warning(f"Document {doc.kb_id} not in dataset {dataset_id}")
result[doc_id] = {"error": "Document not found in this dataset."}
has_error = True
continue
current_status = str(doc.status)
if current_status == status:
result[doc_id] = {"status": status}
continue
if not DocumentService.update_by_id(doc_id, {"status": str(status)}):
result[doc_id] = {"error": "Database error (Document update)!"}
has_error = True
continue
status_int = int(status)
if getattr(doc, "chunk_num", 0) > 0:
try:
ok = settings.docStoreConn.update(
{"doc_id": doc_id},
{"available_int": status_int},
search.index_name(kb.tenant_id),
doc.kb_id,
)
except Exception as exc:
msg = str(exc)
if "3022" in msg:
result[doc_id] = {"error": "Document store table missing."}
else:
result[doc_id] = {"error": f"Document store update failed: {msg}"}
has_error = True
continue
if not ok:
result[doc_id] = {"error": "Database error (docStore update)!"}
has_error = True
continue
result[doc_id] = {"status": status}
except Exception as e:
result[doc_id] = {"error": f"Internal server error: {str(e)}"}
has_error = True
if has_error:
return get_json_result(data=result, message="Partial failure", code=RetCode.SERVER_ERROR)
return get_json_result(data=result)