mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-04-24 12:45:32 +08:00
Revert "Refactor: dataset / kb API to RESTFul style" (#13646)
Reverts infiniflow/ragflow#13619
This commit is contained in:
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import json
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
@ -25,29 +26,34 @@ from api.db.services.connector_service import Connector2KbService
|
||||
from api.db.services.llm_service import LLMBundle
|
||||
from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks
|
||||
from api.db.services.doc_metadata_service import DocMetadataService
|
||||
from api.db.services.file2document_service import File2DocumentService
|
||||
from api.db.services.file_service import FileService
|
||||
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
|
||||
from api.db.services.task_service import TaskService, GRAPH_RAPTOR_FAKE_DOC_ID
|
||||
from api.db.services.user_service import UserTenantService
|
||||
from api.db.services.user_service import TenantService, UserTenantService
|
||||
from api.db.joint_services.tenant_model_service import get_model_config_by_type_and_name, get_model_config_by_id
|
||||
from api.utils.api_utils import (
|
||||
get_error_data_result,
|
||||
server_error_response,
|
||||
get_data_error_result,
|
||||
validate_request,
|
||||
not_allowed_parameters,
|
||||
get_request_json,
|
||||
)
|
||||
from common.misc_utils import thread_pool_exec
|
||||
from api.db import VALID_FILE_TYPES
|
||||
from api.db.services.knowledgebase_service import KnowledgebaseService
|
||||
from api.db.db_models import File
|
||||
from api.utils.api_utils import get_json_result
|
||||
from api.utils.tenant_utils import ensure_tenant_model_id_for_params
|
||||
from rag.nlp import search
|
||||
from api.constants import DATASET_NAME_LIMIT
|
||||
from rag.utils.redis_conn import REDIS_CONN
|
||||
from common.constants import RetCode, PipelineTaskType, VALID_TASK_STATUS, LLMType
|
||||
from common.constants import RetCode, PipelineTaskType, StatusEnum, VALID_TASK_STATUS, FileSource, LLMType, PAGERANK_FLD
|
||||
from common import settings
|
||||
from common.doc_store.doc_store_base import OrderByExpr
|
||||
from api.apps import login_required, current_user
|
||||
|
||||
"""
|
||||
Deprecated, todo delete
|
||||
@manager.route('/create', methods=['post']) # noqa: F821
|
||||
@login_required
|
||||
@validate_request("name")
|
||||
@ -180,7 +186,7 @@ async def update():
|
||||
return get_json_result(data=kb)
|
||||
except Exception as e:
|
||||
return server_error_response(e)
|
||||
"""
|
||||
|
||||
|
||||
@manager.route('/update_metadata_setting', methods=['post']) # noqa: F821
|
||||
@login_required
|
||||
@ -228,8 +234,7 @@ def detail():
|
||||
except Exception as e:
|
||||
return server_error_response(e)
|
||||
|
||||
"""
|
||||
Deprecated, todo delete
|
||||
|
||||
@manager.route('/list', methods=['POST']) # noqa: F821
|
||||
@login_required
|
||||
async def list_kbs():
|
||||
@ -324,7 +329,7 @@ async def rm():
|
||||
return await thread_pool_exec(_rm_sync)
|
||||
except Exception as e:
|
||||
return server_error_response(e)
|
||||
"""
|
||||
|
||||
|
||||
@manager.route('/<kb_id>/tags', methods=['GET']) # noqa: F821
|
||||
@login_required
|
||||
@ -400,8 +405,7 @@ async def rename_tags(kb_id):
|
||||
kb_id)
|
||||
return get_json_result(data=True)
|
||||
|
||||
"""
|
||||
Deprecated, todo delete
|
||||
|
||||
@manager.route('/<kb_id>/knowledge_graph', methods=['GET']) # noqa: F821
|
||||
@login_required
|
||||
async def knowledge_graph(kb_id):
|
||||
@ -455,7 +459,7 @@ def delete_knowledge_graph(kb_id):
|
||||
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id)
|
||||
|
||||
return get_json_result(data=True)
|
||||
"""
|
||||
|
||||
|
||||
@manager.route("/get_meta", methods=["GET"]) # noqa: F821
|
||||
@login_required
|
||||
@ -594,8 +598,6 @@ def pipeline_log_detail():
|
||||
return get_json_result(data=log.to_dict())
|
||||
|
||||
|
||||
"""
|
||||
Deprecated, todo delete
|
||||
@manager.route("/run_graphrag", methods=["POST"]) # noqa: F821
|
||||
@login_required
|
||||
async def run_graphrag():
|
||||
@ -732,7 +734,7 @@ def trace_raptor():
|
||||
return get_error_data_result(message="RAPTOR Task Not Found or Error Occurred")
|
||||
|
||||
return get_json_result(data=task.to_dict())
|
||||
"""
|
||||
|
||||
|
||||
@manager.route("/run_mindmap", methods=["POST"]) # noqa: F821
|
||||
@login_required
|
||||
|
||||
@ -1,517 +0,0 @@
|
||||
#
|
||||
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import logging
|
||||
|
||||
from peewee import OperationalError
|
||||
from quart import request
|
||||
from common.constants import RetCode
|
||||
from api.apps import login_required, current_user
|
||||
from api.utils.api_utils import get_error_argument_result, get_error_data_result, get_result, add_tenant_id_to_kwargs
|
||||
from api.utils.validation_utils import (
|
||||
CreateDatasetReq,
|
||||
DeleteDatasetReq,
|
||||
ListDatasetReq,
|
||||
UpdateDatasetReq,
|
||||
validate_and_parse_json_request,
|
||||
validate_and_parse_request_args,
|
||||
)
|
||||
from api.apps.services import dataset_api_service
|
||||
|
||||
|
||||
@manager.route("/datasets", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def create(tenant_id: str=None):
    """
    Create a new dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset creation parameters.
        required: true
        schema:
          type: object
          required:
            - name
          properties:
            name:
              type: string
              description: Dataset name (required).
            avatar:
              type: string
              description: Optional base64-encoded avatar image.
            description:
              type: string
              description: Optional dataset description.
            embedding_model:
              type: string
              description: Optional embedding model name; if omitted, the tenant's default embedding model is used.
            permission:
              type: string
              enum: ['me', 'team']
              description: Visibility of the dataset (private to me or shared with team).
            chunk_method:
              type: string
              enum: ["naive", "book", "email", "laws", "manual", "one", "paper",
                     "picture", "presentation", "qa", "table", "tag"]
              description: Chunking method; if omitted, defaults to "naive".
            parser_config:
              type: object
              description: Optional parser configuration; server-side defaults will be applied.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
          properties:
            data:
              type: object
    """
    # Stop early when the JSON body does not validate against CreateDatasetReq.
    payload, problem = await validate_and_parse_json_request(request, CreateDatasetReq)
    if problem is not None:
        return get_error_argument_result(problem)

    try:
        # Fall back to the logged-in user's id when no tenant id was injected.
        owner_id = tenant_id or current_user.id
        created, outcome = await dataset_api_service.create_dataset(owner_id, payload)
        if not created:
            # On failure the service hands back the error message as `outcome`.
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets", methods=["DELETE"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def delete(tenant_id):
    """
    Delete datasets.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset deletion parameters.
        required: true
        schema:
          type: object
          required:
            - ids
          properties:
            ids:
              type: array or null
              items:
                type: string
              description: |
                Specifies the datasets to delete:
                - If `null`, all datasets will be deleted.
                - If an array of IDs, only the specified datasets will be deleted.
                - If an empty array, no datasets will be deleted.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    # Stop early when the JSON body does not validate against DeleteDatasetReq.
    payload, problem = await validate_and_parse_json_request(request, DeleteDatasetReq)
    if problem is not None:
        return get_error_argument_result(problem)

    try:
        target_ids = payload.get("ids")
        wipe_everything = payload.get("delete_all", False)
        deleted, outcome = await dataset_api_service.delete_datasets(tenant_id, target_ids, wipe_everything)
        if not deleted:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except OperationalError as db_exc:
        # Database failures get their own message, separate from the generic one below.
        logging.exception(db_exc)
        return get_error_data_result(message="Database operation failed")
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>", methods=["PUT"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def update(tenant_id, dataset_id):
    """
    Update a dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset to update.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset update parameters.
        required: true
        schema:
          type: object
          properties:
            name:
              type: string
              description: New name of the dataset.
            avatar:
              type: string
              description: Updated base64 encoding of the avatar.
            description:
              type: string
              description: Updated description of the dataset.
            embedding_model:
              type: string
              description: Updated embedding model Name.
            permission:
              type: string
              enum: ['me', 'team']
              description: Updated dataset permission.
            chunk_method:
              type: string
              enum: ["naive", "book", "email", "laws", "manual", "one", "paper",
                     "picture", "presentation", "qa", "table", "tag"
                     ]
              description: Updated chunking method.
            pagerank:
              type: integer
              description: Updated page rank.
            parser_config:
              type: object
              description: Updated parser configuration.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    # Field names are renamed when the validated model is dumped:
    #   embedding_model -> embd_id
    #   chunk_method    -> parser_id
    payload, problem = await validate_and_parse_json_request(
        request,
        UpdateDatasetReq,
        extras={"dataset_id": dataset_id},
        exclude_unset=True,
    )
    if problem is not None:
        return get_error_argument_result(problem)

    try:
        updated, outcome = await dataset_api_service.update_dataset(tenant_id, dataset_id, payload)
        if not updated:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except OperationalError as db_exc:
        logging.exception(db_exc)
        return get_error_data_result(message="Database operation failed")
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets", methods=["GET"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
def list_datasets(tenant_id):
    """
    List datasets.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: query
        name: id
        type: string
        required: false
        description: Dataset ID to filter.
      - in: query
        name: name
        type: string
        required: false
        description: Dataset name to filter.
      - in: query
        name: page
        type: integer
        required: false
        default: 1
        description: Page number.
      - in: query
        name: page_size
        type: integer
        required: false
        default: 30
        description: Number of items per page.
      - in: query
        name: orderby
        type: string
        required: false
        default: "create_time"
        description: Field to order by.
      - in: query
        name: desc
        type: boolean
        required: false
        default: true
        description: Order in descending.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Successful operation.
        schema:
          type: array
          items:
            type: object
    """
    # Stop early when the query string does not validate against ListDatasetReq.
    query_args, problem = validate_and_parse_request_args(request, ListDatasetReq)
    if problem is not None:
        return get_error_argument_result(problem)

    try:
        listed, outcome = dataset_api_service.list_datasets(tenant_id, query_args)
        if not listed:
            return get_error_data_result(message=outcome)
        # On success `outcome` is read as a mapping with "data" and "total" entries.
        return get_result(data=outcome.get("data"), total=outcome.get("total"))
    except OperationalError as db_exc:
        logging.exception(db_exc)
        return get_error_data_result(message="Database operation failed")
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route('/datasets/<dataset_id>/knowledge_graph', methods=['GET'])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def knowledge_graph(tenant_id, dataset_id):
    # Fetch the knowledge graph of a dataset through the dataset API service.
    try:
        fetched, outcome = await dataset_api_service.get_knowledge_graph(dataset_id, tenant_id)
        if fetched:
            return get_result(data=outcome)
        # A service-level failure is reported with the authentication error code.
        return get_result(
            data=False,
            message=outcome,
            code=RetCode.AUTHENTICATION_ERROR,
        )
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route('/datasets/<dataset_id>/knowledge_graph', methods=['DELETE'])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
def delete_knowledge_graph(tenant_id, dataset_id):
    # Remove the knowledge graph of a dataset through the dataset API service.
    try:
        removed, outcome = dataset_api_service.delete_knowledge_graph(dataset_id, tenant_id)
        if removed:
            return get_result(data=outcome)
        # A service-level failure is reported with the authentication error code.
        return get_result(
            data=False,
            message=outcome,
            code=RetCode.AUTHENTICATION_ERROR,
        )
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/run_graphrag", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def run_graphrag(tenant_id, dataset_id):
    # Delegate to the dataset API service's run_graphrag for this dataset.
    try:
        started, outcome = dataset_api_service.run_graphrag(dataset_id, tenant_id)
        if not started:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/trace_graphrag", methods=["GET"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
def trace_graphrag(tenant_id, dataset_id):
    # Delegate to the dataset API service's trace_graphrag for this dataset.
    try:
        found, outcome = dataset_api_service.trace_graphrag(dataset_id, tenant_id)
        if not found:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/run_raptor", methods=["POST"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def run_raptor(tenant_id, dataset_id):
    # Delegate to the dataset API service's run_raptor for this dataset.
    try:
        started, outcome = dataset_api_service.run_raptor(dataset_id, tenant_id)
        if not started:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/trace_raptor", methods=["GET"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
def trace_raptor(tenant_id, dataset_id):
    # Delegate to the dataset API service's trace_raptor for this dataset.
    try:
        found, outcome = dataset_api_service.trace_raptor(dataset_id, tenant_id)
        if not found:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/auto_metadata", methods=["GET"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
def get_auto_metadata(tenant_id, dataset_id):
    """
    Get auto-metadata configuration for a dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    try:
        fetched, outcome = dataset_api_service.get_auto_metadata(dataset_id, tenant_id)
        if not fetched:
            # On failure the service hands back the error message as `outcome`.
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/auto_metadata", methods=["PUT"])  # noqa: F821
@login_required
@add_tenant_id_to_kwargs
async def update_auto_metadata(tenant_id, dataset_id):
    """
    Update auto-metadata configuration for a dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Auto-metadata configuration.
        required: true
        schema:
          type: object
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    # Imported locally, as in the original handler.
    from api.utils.validation_utils import AutoMetadataConfig

    # Stop early when the JSON body does not validate against AutoMetadataConfig.
    config, problem = await validate_and_parse_json_request(request, AutoMetadataConfig)
    if problem is not None:
        return get_error_argument_result(problem)

    try:
        updated, outcome = await dataset_api_service.update_auto_metadata(dataset_id, tenant_id, config)
        if not updated:
            return get_error_data_result(message=outcome)
        return get_result(data=outcome)
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Internal server error")
|
||||
@ -1,5 +1,5 @@
|
||||
#
|
||||
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
||||
798
api/apps/sdk/dataset.py
Normal file
798
api/apps/sdk/dataset.py
Normal file
@ -0,0 +1,798 @@
|
||||
#
|
||||
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
from quart import request
|
||||
from peewee import OperationalError
|
||||
from api.db.db_models import File
|
||||
from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks
|
||||
from api.db.services.file2document_service import File2DocumentService
|
||||
from api.db.services.file_service import FileService
|
||||
from api.db.services.knowledgebase_service import KnowledgebaseService
|
||||
from api.db.services.task_service import GRAPH_RAPTOR_FAKE_DOC_ID, TaskService
|
||||
from api.db.services.user_service import TenantService
|
||||
from common.constants import RetCode, FileSource, StatusEnum
|
||||
from api.utils.api_utils import (
|
||||
deep_merge,
|
||||
get_error_argument_result,
|
||||
get_error_data_result,
|
||||
get_error_permission_result,
|
||||
get_parser_config,
|
||||
get_result,
|
||||
remap_dictionary_keys,
|
||||
token_required,
|
||||
verify_embedding_availability,
|
||||
)
|
||||
from api.utils.validation_utils import (
|
||||
AutoMetadataConfig,
|
||||
CreateDatasetReq,
|
||||
DeleteDatasetReq,
|
||||
ListDatasetReq,
|
||||
UpdateDatasetReq,
|
||||
validate_and_parse_json_request,
|
||||
validate_and_parse_request_args,
|
||||
)
|
||||
from rag.nlp import search
|
||||
from common.constants import PAGERANK_FLD
|
||||
from common import settings
|
||||
|
||||
|
||||
@manager.route("/datasets", methods=["POST"])  # noqa: F821
@token_required
async def create(tenant_id):
    """
    Create a new dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset creation parameters.
        required: true
        schema:
          type: object
          required:
            - name
          properties:
            name:
              type: string
              description: Dataset name (required).
            avatar:
              type: string
              description: Optional base64-encoded avatar image.
            description:
              type: string
              description: Optional dataset description.
            embedding_model:
              type: string
              description: Optional embedding model name; if omitted, the tenant's default embedding model is used.
            permission:
              type: string
              enum: ['me', 'team']
              description: Visibility of the dataset (private to me or shared with team).
            chunk_method:
              type: string
              enum: ["naive", "book", "email", "laws", "manual", "one", "paper",
                     "picture", "presentation", "qa", "table", "tag"]
              description: Chunking method; if omitted, defaults to "naive".
            parser_config:
              type: object
              description: Optional parser configuration; server-side defaults will be applied.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
          properties:
            data:
              type: object
    """
    # Field names are renamed when the validated model is dumped:
    #   embedding_model -> embd_id
    #   chunk_method    -> parser_id
    req, err = await validate_and_parse_json_request(request, CreateDatasetReq)
    if err is not None:
        return get_error_argument_result(err)

    # Fold an optional auto_metadata_config into the parser_config structure.
    auto_meta = req.pop("auto_metadata_config", None)
    if auto_meta:
        parser_cfg = req.get("parser_config") or {}
        parser_cfg["metadata"] = [
            {
                "name": spec.get("name", ""),
                "type": spec.get("type", ""),
                "description": spec.get("description"),
                "examples": spec.get("examples"),
                "restrict_values": spec.get("restrict_values", False),
            }
            for spec in auto_meta.get("fields", [])
        ]
        parser_cfg["enable_metadata"] = auto_meta.get("enabled", True)
        req["parser_config"] = parser_cfg

    # Pop name and parser_id first so they are not passed twice through **req.
    dataset_name = req.pop("name", None)
    parser_id = req.pop("parser_id", None)
    prepared, req = KnowledgebaseService.create_with_name(
        name=dataset_name,
        tenant_id=tenant_id,
        parser_id=parser_id,
        **req,
    )
    if not prepared:
        # On failure the second element is returned to the client unchanged.
        return req

    # Resolve the embedding model: use the tenant's default when none was given,
    # otherwise verify that the requested one is available.
    found, tenant = TenantService.get_by_id(tenant_id)
    if not found:
        return get_error_permission_result(message="Tenant not found")
    if req.get("embd_id"):
        available, denial = verify_embedding_availability(req["embd_id"], tenant_id)
        if not available:
            return denial
    else:
        req["embd_id"] = tenant.embd_id

    try:
        if not KnowledgebaseService.save(**req):
            return get_error_data_result()
        found, saved_kb = KnowledgebaseService.get_by_id(req["id"])
        if not found:
            return get_error_data_result(message="Dataset created failed")
        return get_result(data=remap_dictionary_keys(saved_kb.to_dict()))
    except Exception as exc:
        logging.exception(exc)
        return get_error_data_result(message="Database operation failed")
|
||||
|
||||
|
||||
@manager.route("/datasets", methods=["DELETE"])  # noqa: F821
@token_required
async def delete(tenant_id):
    """
    Delete datasets.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset deletion parameters.
        required: true
        schema:
          type: object
          required:
            - ids
          properties:
            ids:
              type: array or null
              items:
                type: string
              description: |
                List of dataset IDs to delete.
                If `null` or an empty array is provided, no datasets will be deleted
                unless `delete_all` is set to `true`.
            delete_all:
              type: boolean
              description: |
                If `true` and `ids` is null or empty, delete all datasets owned by the current user.
                Defaults to `false`.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    req, err = await validate_and_parse_json_request(request, DeleteDatasetReq)
    if err is not None:
        return get_error_argument_result(err)

    try:
        # With no ids given, either expand to every dataset of the tenant
        # (delete_all) or succeed without deleting anything.
        if not req["ids"]:
            if not req.get("delete_all"):
                return get_result()
            req["ids"] = [kb.id for kb in KnowledgebaseService.query(tenant_id=tenant_id)]
            if not req["ids"]:
                return get_result()

        # Resolve every id before deleting anything; one id that cannot be
        # found for this tenant rejects the whole request.
        kb_id_instance_pairs = []
        error_kb_ids = []
        for kb_id in req["ids"]:
            kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id)
            if kb is None:
                error_kb_ids.append(kb_id)
                continue
            kb_id_instance_pairs.append((kb_id, kb))
        if error_kb_ids:
            return get_error_permission_result(message=f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'""")

        errors = []
        success_count = 0
        for kb_id, kb in kb_id_instance_pairs:
            for doc in DocumentService.query(kb_id=kb_id):
                if not DocumentService.remove_document(doc, tenant_id):
                    errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'")
                    continue
                f2d = File2DocumentService.get_by_document_id(doc.id)
                # A document may have no file mapping; indexing f2d[0] on an
                # empty result would raise IndexError, which the handler below
                # does not catch, so only delete the file when a mapping exists.
                if f2d:
                    FileService.filter_delete(
                        [
                            File.source_type == FileSource.KNOWLEDGEBASE,
                            File.id == f2d[0].file_id,
                        ]
                    )
                File2DocumentService.delete_by_document_id(doc.id)
            # Remove the folder entry named after the dataset.
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kb.name])

            # Drop index for this dataset; best effort, a failure is only logged.
            try:
                from rag.nlp import search

                idxnm = search.index_name(kb.tenant_id)
                settings.docStoreConn.delete_idx(idxnm, kb_id)
            except Exception as e:
                logging.warning(f"Failed to drop index for dataset {kb_id}: {e}")

            if not KnowledgebaseService.delete_by_id(kb_id):
                errors.append(f"Delete dataset error for {kb_id}")
                continue
            success_count += 1

        if not errors:
            return get_result()

        # The joined details are truncated to 128 characters in the message.
        error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..."
        if success_count == 0:
            return get_error_data_result(message=error_message)

        # Partial success: report the count and at most the first five errors.
        return get_result(data={"success_count": success_count, "errors": errors[:5]}, message=error_message)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>", methods=["PUT"])  # noqa: F821
@token_required
async def update(tenant_id, dataset_id):
    """
    Update a dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset to update.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset update parameters.
        required: true
        schema:
          type: object
          properties:
            name:
              type: string
              description: New name of the dataset.
            avatar:
              type: string
              description: Updated base64 encoding of the avatar.
            description:
              type: string
              description: Updated description of the dataset.
            embedding_model:
              type: string
              description: Updated embedding model Name.
            permission:
              type: string
              enum: ['me', 'team']
              description: Updated dataset permission.
            chunk_method:
              type: string
              enum: ["naive", "book", "email", "laws", "manual", "one", "paper",
                     "picture", "presentation", "qa", "table", "tag"
                     ]
              description: Updated chunking method.
            pagerank:
              type: integer
              description: Updated page rank.
            parser_config:
              type: object
              description: Updated parser configuration.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    # Field name transformations during model dump:
    # | Original        | Dump Output |
    # |-----------------|-------------|
    # | embedding_model | embd_id     |
    # | chunk_method    | parser_id   |
    extras = {"dataset_id": dataset_id}
    req, err = await validate_and_parse_json_request(request, UpdateDatasetReq, extras=extras, exclude_unset=True)
    if err is not None:
        return get_error_argument_result(err)

    # With exclude_unset=True an empty dict means the caller sent nothing to change.
    if not req:
        return get_error_argument_result(message="No properties were modified")

    try:
        # Looking the dataset up by (id, tenant_id) doubles as the permission check.
        kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
        if kb is None:
            return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'")

        # Map auto_metadata_config into parser_config if present
        auto_meta = req.pop("auto_metadata_config", None)
        if auto_meta:
            parser_cfg = req.get("parser_config") or {}
            parser_cfg["metadata"] = [
                {
                    "name": f.get("name", ""),
                    "type": f.get("type", ""),
                    "description": f.get("description"),
                    "examples": f.get("examples"),
                    "restrict_values": f.get("restrict_values", False),
                }
                for f in auto_meta.get("fields", [])
            ]
            parser_cfg["enable_metadata"] = auto_meta.get("enabled", True)
            req["parser_config"] = parser_cfg

        # A partial parser_config is layered on top of the stored one.
        if req.get("parser_config"):
            req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"])

        if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id:
            # Switching chunk method without an explicit config: ask get_parser_config for one.
            if not req.get("parser_config"):
                req["parser_config"] = get_parser_config(chunk_method, None)
        elif "parser_config" in req and not req["parser_config"]:
            # An empty parser_config must not overwrite the stored configuration.
            del req["parser_config"]

        # Dataset names are compared case-insensitively against the current name.
        if "name" in req and req["name"].lower() != kb.name.lower():
            exists = KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value)
            if exists:
                return get_error_data_result(message=f"Dataset name '{req['name']}' already exists")

        if "embd_id" in req:
            if not req["embd_id"]:
                req["embd_id"] = kb.embd_id
            # The embedding model is locked once the dataset holds chunks.
            if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id:
                return get_error_data_result(message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}")
            ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
            if not ok:
                return err

        if "pagerank" in req and req["pagerank"] != kb.pagerank:
            if os.environ.get("DOC_ENGINE", "elasticsearch") == "infinity":
                return get_error_argument_result(message="'pagerank' can only be set when doc_engine is elasticsearch")

            if req["pagerank"] > 0:
                settings.docStoreConn.update({"kb_id": kb.id}, {PAGERANK_FLD: req["pagerank"]}, search.index_name(kb.tenant_id), kb.id)
            else:
                # Elasticsearch requires PAGERANK_FLD be non-zero!
                settings.docStoreConn.update({"exists": PAGERANK_FLD}, {"remove": PAGERANK_FLD}, search.index_name(kb.tenant_id), kb.id)

        if not KnowledgebaseService.update_by_id(kb.id, req):
            return get_error_data_result(message="Update dataset error.(Database error)")

        # Reload so the response reflects what is actually stored.
        ok, k = KnowledgebaseService.get_by_id(kb.id)
        if not ok:
            return get_error_data_result(message="Dataset updated failed")

        response_data = remap_dictionary_keys(k.to_dict())
        return get_result(data=response_data)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
|
||||
|
||||
|
||||
@manager.route("/datasets", methods=["GET"])  # noqa: F821
@token_required
def list_datasets(tenant_id):
    """
    List datasets.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: query
        name: id
        type: string
        required: false
        description: Dataset ID to filter.
      - in: query
        name: name
        type: string
        required: false
        description: Dataset name to filter.
      - in: query
        name: page
        type: integer
        required: false
        default: 1
        description: Page number.
      - in: query
        name: page_size
        type: integer
        required: false
        default: 30
        description: Number of items per page.
      - in: query
        name: orderby
        type: string
        required: false
        default: "create_time"
        description: Field to order by.
      - in: query
        name: desc
        type: boolean
        required: false
        default: true
        description: Order in descending.
      - in: query
        name: include_parsing_status
        type: boolean
        required: false
        default: false
        description: |
          Whether to include document parsing status counts in the response.
          When true, each dataset object will include: unstart_count, running_count,
          cancel_count, done_count, and fail_count.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Successful operation.
        schema:
          type: array
          items:
            type: object
    """
    args, err = validate_and_parse_request_args(request, ListDatasetReq)
    if err is not None:
        return get_error_argument_result(err)

    include_parsing_status = args.get("include_parsing_status", False)

    try:
        kb_id = request.args.get("id")
        name = args.get("name")

        # An explicit id or name filter must point at a dataset the caller can see.
        if kb_id and not KnowledgebaseService.get_kb_by_id(kb_id, tenant_id):
            return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{kb_id}'")
        if name and not KnowledgebaseService.get_kb_by_name(name, tenant_id):
            return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{name}'")

        joined = TenantService.get_joined_tenants_by_user_id(tenant_id)
        owner_ids = [member["tenant_id"] for member in joined]
        kbs, total = KnowledgebaseService.get_list(
            owner_ids,
            tenant_id,
            args["page"],
            args["page_size"],
            args["orderby"],
            args["desc"],
            kb_id,
            name,
        )

        # Parsing-status counts are only fetched on request and when the page is non-empty.
        status_by_kb = {}
        if include_parsing_status and kbs:
            status_by_kb = DocumentService.get_parsing_status_by_kb_ids([kb["id"] for kb in kbs])

        payload = []
        for kb in kbs:
            item = remap_dictionary_keys(kb)
            if include_parsing_status:
                item.update(status_by_kb.get(kb["id"], {}))
            payload.append(item)
        return get_result(data=payload, total=total)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/auto_metadata", methods=["GET"])  # noqa: F821
@token_required
def get_auto_metadata(tenant_id, dataset_id):
    """
    Get auto-metadata configuration for a dataset.
    """
    try:
        kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
        if kb is None:
            return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'")

        parser_cfg = kb.parser_config or {}
        stored_fields = parser_cfg.get("metadata") or []
        # When no explicit flag is stored, the feature counts as enabled iff fields exist.
        enabled = parser_cfg.get("enable_metadata", bool(stored_fields))

        # Normalize to AutoMetadataConfig-like JSON, ignoring entries that are not dicts.
        fields = [
            {
                "name": entry.get("name", ""),
                "type": entry.get("type", ""),
                "description": entry.get("description"),
                "examples": entry.get("examples"),
                "restrict_values": entry.get("restrict_values", False),
            }
            for entry in stored_fields
            if isinstance(entry, dict)
        ]
        return get_result(data={"enabled": enabled, "fields": fields})
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/auto_metadata", methods=["PUT"])  # noqa: F821
@token_required
async def update_auto_metadata(tenant_id, dataset_id):
    """
    Update auto-metadata configuration for a dataset.
    """
    cfg, err = await validate_and_parse_json_request(request, AutoMetadataConfig)
    if err is not None:
        return get_error_argument_result(err)

    try:
        kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
        if kb is None:
            return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'")

        # Only the two auto-metadata keys are replaced; other parser settings are kept.
        parser_cfg = kb.parser_config or {}
        fields = [
            {
                "name": entry.get("name", ""),
                "type": entry.get("type", ""),
                "description": entry.get("description"),
                "examples": entry.get("examples"),
                "restrict_values": entry.get("restrict_values", False),
            }
            for entry in cfg.get("fields", [])
        ]
        parser_cfg["metadata"] = fields
        parser_cfg["enable_metadata"] = cfg.get("enabled", True)

        saved = KnowledgebaseService.update_by_id(kb.id, {"parser_config": parser_cfg})
        if not saved:
            return get_error_data_result(message="Update auto-metadata error.(Database error)")

        return get_result(data={"enabled": parser_cfg["enable_metadata"], "fields": fields})
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/knowledge_graph", methods=["GET"])  # noqa: F821
@token_required
async def knowledge_graph(tenant_id, dataset_id):
    """
    Get the knowledge graph stored for a dataset.

    Responds with ``{"graph": ..., "mind_map": ...}``. Both entries stay empty
    dicts when the dataset has no index or no graph chunk. A returned graph is
    trimmed to the 256 nodes with the highest ``pagerank`` and the 128 edges
    with the highest ``weight`` that connect two distinct retained nodes.
    """
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return get_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
    # Check the lookup result instead of dereferencing a possibly missing record.
    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not ok:
        return get_error_data_result(message="Invalid Dataset ID")

    req = {"kb_id": [dataset_id], "knowledge_graph_kwd": ["graph"]}
    obj = {"graph": {}, "mind_map": {}}
    index_name = search.index_name(kb.tenant_id)

    if not settings.docStoreConn.index_exist(index_name, dataset_id):
        return get_result(data=obj)
    sres = await settings.retriever.search(req, index_name, [dataset_id])
    if not len(sres.ids):
        return get_result(data=obj)

    # Only the first hit is used.
    chunk_id = sres.ids[0]
    ty = sres.field[chunk_id]["knowledge_graph_kwd"]
    try:
        content_json = json.loads(sres.field[chunk_id]["content_with_weight"])
    except Exception as e:
        # Best effort: an unreadable payload leaves the empty skeleton in place.
        logging.warning(f"Failed to parse knowledge graph content for dataset {dataset_id}: {e}")
    else:
        obj[ty] = content_json

    if "nodes" in obj["graph"]:
        obj["graph"]["nodes"] = sorted(obj["graph"]["nodes"], key=lambda x: x.get("pagerank", 0), reverse=True)[:256]
        if "edges" in obj["graph"]:
            # Drop self-loops and edges whose endpoints were trimmed away above.
            node_id_set = {o["id"] for o in obj["graph"]["nodes"]}
            filtered_edges = [o for o in obj["graph"]["edges"] if o["source"] != o["target"] and o["source"] in node_id_set and o["target"] in node_id_set]
            obj["graph"]["edges"] = sorted(filtered_edges, key=lambda x: x.get("weight", 0), reverse=True)[:128]
    return get_result(data=obj)
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/knowledge_graph", methods=["DELETE"])  # noqa: F821
@token_required
def delete_knowledge_graph(tenant_id, dataset_id):
    """
    Delete the knowledge graph chunks of a dataset.

    Removes every chunk whose ``knowledge_graph_kwd`` is one of ``graph``,
    ``subgraph``, ``entity`` or ``relation`` from the dataset's index.
    """
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return get_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)
    # Check the lookup result instead of dereferencing a possibly missing record.
    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not ok:
        return get_error_data_result(message="Invalid Dataset ID")
    settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), dataset_id)

    return get_result(data=True)
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/run_graphrag", methods=["POST"])  # noqa: F821
@token_required
def run_graphrag(tenant_id, dataset_id):
    """
    Queue a GraphRAG task covering every document of a dataset.

    Refuses to start while the previously recorded task has a progress value
    other than -1 or 1. Responds with the new ``graphrag_task_id``.
    """
    if not dataset_id:
        return get_error_data_result(message='Lack of "Dataset ID"')
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return get_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return get_error_data_result(message="Invalid Dataset ID")

    task_id = kb.graphrag_task_id
    if task_id:
        task_found, task = TaskService.get_by_id(task_id)
        if not task_found:
            logging.warning(f"A valid GraphRAG task id is expected for Dataset {dataset_id}")

        if task and task.progress not in [-1, 1]:
            return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A Graph Task is already running.")

    # Fetch the dataset's documents with no filters applied.
    query = {
        "kb_id": dataset_id,
        "page_number": 0,
        "items_per_page": 0,
        "orderby": "create_time",
        "desc": False,
        "keywords": "",
        "run_status": [],
        "types": [],
        "suffix": [],
    }
    documents, _ = DocumentService.get_by_kb_id(**query)
    if not documents:
        return get_error_data_result(message=f"No documents in Dataset {dataset_id}")

    task_id = queue_raptor_o_graphrag_tasks(
        sample_doc_id=documents[0],
        ty="graphrag",
        priority=0,
        fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID,
        doc_ids=[document["id"] for document in documents],
    )

    # Failing to persist the id is only logged; the task itself is already queued.
    if not KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": task_id}):
        logging.warning(f"Cannot save graphrag_task_id for Dataset {dataset_id}")

    return get_result(data={"graphrag_task_id": task_id})
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/trace_graphrag", methods=["GET"])  # noqa: F821
@token_required
def trace_graphrag(tenant_id, dataset_id):
    """
    Report the GraphRAG task recorded on a dataset.

    Responds with the task as a dict, or an empty dict when no task id is
    recorded or the task cannot be found.
    """
    if not dataset_id:
        return get_error_data_result(message='Lack of "Dataset ID"')
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return get_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return get_error_data_result(message="Invalid Dataset ID")

    recorded_task_id = kb.graphrag_task_id
    if not recorded_task_id:
        return get_result(data={})

    task_found, task = TaskService.get_by_id(recorded_task_id)
    payload = task.to_dict() if task_found else {}
    return get_result(data=payload)
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/run_raptor", methods=["POST"])  # noqa: F821
@token_required
def run_raptor(tenant_id, dataset_id):
    """
    Queue a RAPTOR task covering every document of a dataset.

    Refuses to start while the previously recorded task has a progress value
    other than -1 or 1. Responds with the new ``raptor_task_id``.
    """
    if not dataset_id:
        return get_error_data_result(message='Lack of "Dataset ID"')
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return get_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR)

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return get_error_data_result(message="Invalid Dataset ID")

    task_id = kb.raptor_task_id
    if task_id:
        task_found, task = TaskService.get_by_id(task_id)
        if not task_found:
            logging.warning(f"A valid RAPTOR task id is expected for Dataset {dataset_id}")

        if task and task.progress not in [-1, 1]:
            return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A RAPTOR Task is already running.")

    # Fetch the dataset's documents with no filters applied.
    query = {
        "kb_id": dataset_id,
        "page_number": 0,
        "items_per_page": 0,
        "orderby": "create_time",
        "desc": False,
        "keywords": "",
        "run_status": [],
        "types": [],
        "suffix": [],
    }
    documents, _ = DocumentService.get_by_kb_id(**query)
    if not documents:
        return get_error_data_result(message=f"No documents in Dataset {dataset_id}")

    task_id = queue_raptor_o_graphrag_tasks(
        sample_doc_id=documents[0],
        ty="raptor",
        priority=0,
        fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID,
        doc_ids=[document["id"] for document in documents],
    )

    # Failing to persist the id is only logged; the task itself is already queued.
    if not KnowledgebaseService.update_by_id(kb.id, {"raptor_task_id": task_id}):
        logging.warning(f"Cannot save raptor_task_id for Dataset {dataset_id}")

    return get_result(data={"raptor_task_id": task_id})
|
||||
|
||||
|
||||
@manager.route("/datasets/<dataset_id>/trace_raptor", methods=["GET"])  # noqa: F821
@token_required
def trace_raptor(tenant_id, dataset_id):
    """
    Report the RAPTOR task recorded on a dataset.

    Responds with the task as a dict, an empty dict when no task id is
    recorded, or an error when the recorded task cannot be found.
    """
    if not dataset_id:
        return get_error_data_result(message='Lack of "Dataset ID"')

    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return get_result(data=False, message='No authorization.', code=RetCode.AUTHENTICATION_ERROR)

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return get_error_data_result(message="Invalid Dataset ID")

    recorded_task_id = kb.raptor_task_id
    if not recorded_task_id:
        return get_result(data={})

    task_found, task = TaskService.get_by_id(recorded_task_id)
    if task_found:
        return get_result(data=task.to_dict())
    return get_error_data_result(message="RAPTOR Task Not Found or Error Occurred")
|
||||
@ -1,613 +0,0 @@
|
||||
#
|
||||
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
from common.constants import PAGERANK_FLD
|
||||
from common import settings
|
||||
from api.db.db_models import File
|
||||
from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks
|
||||
from api.db.services.file2document_service import File2DocumentService
|
||||
from api.db.services.file_service import FileService
|
||||
from api.db.services.knowledgebase_service import KnowledgebaseService
|
||||
from api.db.services.connector_service import Connector2KbService
|
||||
from api.db.services.task_service import GRAPH_RAPTOR_FAKE_DOC_ID, TaskService
|
||||
from api.db.services.user_service import TenantService, UserService
|
||||
from common.constants import FileSource, StatusEnum
|
||||
from api.utils.api_utils import deep_merge, get_parser_config, remap_dictionary_keys, verify_embedding_availability
|
||||
|
||||
|
||||
async def create_dataset(tenant_id: str, req: dict):
    """
    Create a new dataset.

    ``req`` is consumed in place: ``ext``, ``auto_metadata_config``, ``name``
    and ``parser_id`` are popped from it.

    :param tenant_id: tenant ID
    :param req: dataset creation request
    :return: (True, dataset dict) on success, (False, error) otherwise
    """
    # Extract ext field for additional parameters. ``or {}`` also covers an
    # explicit null, which would otherwise make req.update() raise TypeError.
    ext_fields = req.pop("ext", None) or {}

    # Map auto_metadata_config (if provided) into parser_config structure
    auto_meta = req.pop("auto_metadata_config", None) or {}
    if auto_meta:
        parser_cfg = req.get("parser_config") or {}
        parser_cfg["metadata"] = [
            {
                "name": f.get("name", ""),
                "type": f.get("type", ""),
                "description": f.get("description"),
                "examples": f.get("examples"),
                "restrict_values": f.get("restrict_values", False),
            }
            for f in auto_meta.get("fields", [])
        ]
        parser_cfg["enable_metadata"] = auto_meta.get("enabled", True)
        req["parser_config"] = parser_cfg
    req.update(ext_fields)

    e, create_dict = KnowledgebaseService.create_with_name(
        name=req.pop("name", None),
        tenant_id=tenant_id,
        parser_id=req.pop("parser_id", None),
        **req
    )

    # On failure the second element carries the error instead of the field dict.
    if not e:
        return False, create_dict

    # Insert embedding model(embd id): default to the tenant's model when none was
    # requested, otherwise make sure the requested one is usable by this tenant.
    ok, t = TenantService.get_by_id(tenant_id)
    if not ok:
        return False, "Tenant not found"
    if not create_dict.get("embd_id"):
        create_dict["embd_id"] = t.embd_id
    else:
        ok, err = verify_embedding_availability(create_dict["embd_id"], tenant_id)
        if not ok:
            return False, err

    if not KnowledgebaseService.save(**create_dict):
        return False, "Failed to save dataset"
    # Reload so the response reflects what is actually stored.
    ok, k = KnowledgebaseService.get_by_id(create_dict["id"])
    if not ok:
        return False, "Dataset created failed"
    response_data = remap_dictionary_keys(k.to_dict())
    return True, response_data
|
||||
|
||||
|
||||
async def delete_datasets(tenant_id: str, ids: list = None, delete_all: bool = False):
    """
    Delete datasets.

    Nothing is deleted unless every requested id belongs to ``tenant_id``. For
    each dataset the documents, their file records, the dataset folder record,
    the index and finally the dataset row are removed.

    :param tenant_id: tenant ID
    :param ids: list of dataset IDs
    :param delete_all: whether to delete all datasets of the tenant (if ids is not provided)
    :return: (True, {"success_count": n[, "errors": [...]]}) when at least one
        dataset was deleted or there was nothing to do, (False, error_message) otherwise
    """
    kb_id_instance_pairs = []
    if not ids:
        if not delete_all:
            return True, {"success_count": 0}
        ids = [kb.id for kb in KnowledgebaseService.query(tenant_id=tenant_id)]

    # Resolve every id first so a single foreign id aborts the whole request.
    error_kb_ids = []
    for kb_id in ids:
        kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id)
        if kb is None:
            error_kb_ids.append(kb_id)
            continue
        kb_id_instance_pairs.append((kb_id, kb))
    if error_kb_ids:
        return False, f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'"""

    errors = []
    success_count = 0
    for kb_id, kb in kb_id_instance_pairs:
        for doc in DocumentService.query(kb_id=kb_id):
            if not DocumentService.remove_document(doc, tenant_id):
                errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'")
                continue
            f2d = File2DocumentService.get_by_document_id(doc.id)
            # A document may have no file mapping; indexing an empty result would
            # raise IndexError and abort the whole deletion.
            if f2d:
                FileService.filter_delete(
                    [
                        File.source_type == FileSource.KNOWLEDGEBASE,
                        File.id == f2d[0].file_id,
                    ]
                )
            File2DocumentService.delete_by_document_id(doc.id)
        FileService.filter_delete(
            [File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kb.name])

        # Drop index for this dataset. A failure is recorded but does not stop
        # the dataset row from being deleted.
        try:
            from rag.nlp import search
            idxnm = search.index_name(kb.tenant_id)
            settings.docStoreConn.delete_idx(idxnm, kb_id)
        except Exception as e:
            errors.append(f"Failed to drop index for dataset {kb_id}: {e}")

        if not KnowledgebaseService.delete_by_id(kb_id):
            errors.append(f"Delete dataset error for {kb_id}")
            continue
        success_count += 1

    if not errors:
        return True, {"success_count": success_count}

    if success_count == 0:
        # The detail text is cut to 128 characters to keep the message short.
        error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..."
        return False, error_message

    # Partial success: report at most the first five errors.
    return True, {"success_count": success_count, "errors": errors[:5]}
|
||||
|
||||
|
||||
async def update_dataset(tenant_id: str, dataset_id: str, req: dict):
    """
    Update a dataset.

    ``req`` is consumed in place: ``ext``, ``auto_metadata_config`` and
    ``connectors`` are popped from it before the remaining keys are stored.

    :param tenant_id: tenant ID
    :param dataset_id: dataset ID
    :param req: dataset update request
    :return: (True, dataset dict including "connectors") on success,
        (False, error) otherwise
    """
    if not req:
        return False, "No properties were modified"

    # Looking the dataset up by (id, tenant_id) doubles as the permission check.
    kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
    if kb is None:
        return False, f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'"

    # Extract ext field for additional parameters. ``or {}`` also covers an
    # explicit null, which would otherwise make req.update() raise TypeError.
    ext_fields = req.pop("ext", None) or {}

    # Map auto_metadata_config into parser_config if present
    auto_meta = req.pop("auto_metadata_config", None) or {}
    if auto_meta:
        parser_cfg = req.get("parser_config") or {}
        parser_cfg["metadata"] = [
            {
                "name": f.get("name", ""),
                "type": f.get("type", ""),
                "description": f.get("description"),
                "examples": f.get("examples"),
                "restrict_values": f.get("restrict_values", False),
            }
            for f in auto_meta.get("fields", [])
        ]
        parser_cfg["enable_metadata"] = auto_meta.get("enabled", True)
        req["parser_config"] = parser_cfg

    # Merge ext fields with req
    req.update(ext_fields)

    # Extract connectors from request
    connectors = req.pop("connectors", [])

    if req.get("parser_config"):
        parser_config = req["parser_config"]
        # Flatten a nested "ext" section before merging with the stored config.
        req_ext_fields = parser_config.pop("ext", None) or {}
        parser_config.update(req_ext_fields)
        req["parser_config"] = deep_merge(kb.parser_config, parser_config)

    if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id:
        # Switching chunk method without an explicit config: ask get_parser_config for one.
        if not req.get("parser_config"):
            req["parser_config"] = get_parser_config(chunk_method, None)
    elif "parser_config" in req and not req["parser_config"]:
        # An empty parser_config must not overwrite the stored configuration.
        del req["parser_config"]

    # Dataset names are compared case-insensitively against the current name.
    if "name" in req and req["name"].lower() != kb.name.lower():
        exists = KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id,
                                                  status=StatusEnum.VALID.value)
        if exists:
            return False, f"Dataset name '{req['name']}' already exists"

    if "embd_id" in req:
        if not req["embd_id"]:
            req["embd_id"] = kb.embd_id
        # The embedding model is locked once the dataset holds chunks.
        if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id:
            return False, f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}"
        ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
        if not ok:
            return False, err

    if "pagerank" in req and req["pagerank"] != kb.pagerank:
        if os.environ.get("DOC_ENGINE", "elasticsearch") == "infinity":
            return False, "'pagerank' can only be set when doc_engine is elasticsearch"

        # Imported locally, as elsewhere in this module.
        from rag.nlp import search
        index_name = search.index_name(kb.tenant_id)
        if req["pagerank"] > 0:
            settings.docStoreConn.update({"kb_id": kb.id}, {PAGERANK_FLD: req["pagerank"]},
                                         index_name, kb.id)
        else:
            # Elasticsearch requires PAGERANK_FLD be non-zero!
            settings.docStoreConn.update({"exists": PAGERANK_FLD}, {"remove": PAGERANK_FLD},
                                         index_name, kb.id)

    if not KnowledgebaseService.update_by_id(kb.id, req):
        return False, "Update dataset error.(Database error)"

    # Reload so the response reflects what is actually stored.
    ok, k = KnowledgebaseService.get_by_id(kb.id)
    if not ok:
        return False, "Dataset updated failed"

    # Link connectors to the dataset
    # NOTE(review): this runs with an empty list when the request carried no
    # "connectors" key; confirm link_connectors treats that as "no change".
    errors = Connector2KbService.link_connectors(kb.id, list(connectors), tenant_id)
    if errors:
        logging.error("Link KB errors: %s", errors)

    response_data = remap_dictionary_keys(k.to_dict())
    response_data["connectors"] = connectors
    return True, response_data
|
||||
|
||||
|
||||
def list_datasets(tenant_id: str, args: dict):
    """
    List datasets.

    Each returned dataset is enriched with the owner's ``nickname`` and
    ``tenant_avatar`` before its keys are remapped for the response.

    :param tenant_id: tenant ID
    :param args: query arguments
    :return: (True, {"data": [...], "total": n}) on success,
        (False, error_message) otherwise
    """
    kb_id = args.get("id")
    name = args.get("name")
    page = int(args.get("page", 1))
    page_size = int(args.get("page_size", 30))
    ext_fields = args.get("ext", {})
    parser_id = ext_fields.get("parser_id")
    keywords = ext_fields.get("keywords", "")
    orderby = args.get("orderby", "create_time")

    # "desc" may arrive as a query string or an already parsed bool; only the
    # literal string "false" (any case) turns descending order off.
    raw_desc = args.get("desc", "true")
    if isinstance(raw_desc, bool):
        desc = raw_desc
    elif isinstance(raw_desc, str):
        desc = raw_desc.lower() != "false"
    else:
        # unknown type, default to True
        desc = True

    # An explicit id or name filter must point at a dataset the caller can see.
    if kb_id and not KnowledgebaseService.get_kb_by_id(kb_id, tenant_id):
        return False, f"User '{tenant_id}' lacks permission for dataset '{kb_id}'"
    if name and not KnowledgebaseService.get_kb_by_name(name, tenant_id):
        return False, f"User '{tenant_id}' lacks permission for dataset '{name}'"

    # Caller-supplied owner ids replace the default of every joined tenant.
    if ext_fields.get("owner_ids", []):
        tenant_ids = ext_fields["owner_ids"]
    else:
        joined = TenantService.get_joined_tenants_by_user_id(tenant_id)
        tenant_ids = [member["tenant_id"] for member in joined]

    kbs, total = KnowledgebaseService.get_list(
        tenant_ids,
        tenant_id,
        page,
        page_size,
        orderby,
        desc,
        kb_id,
        name,
        keywords,
        parser_id
    )

    owners = UserService.get_by_ids([kb["tenant_id"] for kb in kbs])
    owner_by_id = {owner.id: owner.to_dict() for owner in owners}

    response_data_list = []
    for kb in kbs:
        owner = owner_by_id.get(kb["tenant_id"], {})
        kb.update(nickname=owner.get("nickname", ""), tenant_avatar=owner.get("avatar", ""))
        response_data_list.append(remap_dictionary_keys(kb))
    return True, {"data": response_data_list, "total": total}
|
||||
|
||||
|
||||
async def get_knowledge_graph(dataset_id: str, tenant_id: str):
    """
    Get knowledge graph for a dataset.

    Searches the dataset's document-store index for the stored "graph" entry
    and returns it trimmed for display: at most 256 nodes (highest
    ``pagerank`` first) and at most 128 edges (highest ``weight`` first) that
    connect two distinct retained nodes.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, {"graph": ..., "mind_map": ...}) on success, or
        (False, error_message) when access is denied or the dataset is gone
    """
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return False, "No authorization."
    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not ok:
        # Guard against the dataset vanishing between the access check and
        # this lookup; otherwise kb.tenant_id below would raise.
        return False, "Invalid Dataset ID"

    req = {
        "kb_id": [dataset_id],
        "knowledge_graph_kwd": ["graph"]
    }

    obj = {"graph": {}, "mind_map": {}}
    from rag.nlp import search
    index_name = search.index_name(kb.tenant_id)
    if not settings.docStoreConn.index_exist(index_name, dataset_id):
        return True, obj
    sres = await settings.retriever.search(req, index_name, [dataset_id])
    if not len(sres.ids):
        return True, obj

    # Only the first hit is used.
    for chunk_id in sres.ids[:1]:
        ty = sres.field[chunk_id]["knowledge_graph_kwd"]
        try:
            content_json = json.loads(sres.field[chunk_id]["content_with_weight"])
        except Exception:
            # Best effort: an unreadable payload yields an empty graph rather
            # than an error, but leave a trace for operators.
            logging.warning(f"Cannot parse knowledge graph content for Dataset {dataset_id}")
            continue

        obj[ty] = content_json

    graph = obj["graph"]
    if "nodes" in graph:
        graph["nodes"] = sorted(graph["nodes"], key=lambda x: x.get("pagerank", 0), reverse=True)[:256]
        # Edges are filtered against the retained nodes, so they are only
        # processed when nodes are present.
        if "edges" in graph:
            node_id_set = {o["id"] for o in graph["nodes"]}
            filtered_edges = [
                o for o in graph["edges"]
                if o["source"] != o["target"] and o["source"] in node_id_set and o["target"] in node_id_set
            ]
            graph["edges"] = sorted(filtered_edges, key=lambda x: x.get("weight", 0), reverse=True)[:128]
    return True, obj
|
||||
|
||||
|
||||
def delete_knowledge_graph(dataset_id: str, tenant_id: str):
    """
    Delete knowledge graph for a dataset.

    Removes every document-store entry of the dataset whose
    ``knowledge_graph_kwd`` is "graph", "subgraph", "entity" or "relation".

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, True) on success, or (False, error_message) when access
        is denied or the dataset cannot be loaded
    """
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return False, "No authorization."
    ok, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not ok:
        # Guard against the dataset vanishing between the access check and
        # this lookup; otherwise kb.tenant_id below would raise.
        return False, "Invalid Dataset ID"

    from rag.nlp import search
    settings.docStoreConn.delete(
        {"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]},
        search.index_name(kb.tenant_id),
        dataset_id,
    )

    return True, True
|
||||
|
||||
|
||||
def run_graphrag(dataset_id: str, tenant_id: str):
    """
    Run GraphRAG for a dataset.

    Refuses to start while a previously recorded GraphRAG task is still in
    progress; otherwise queues one task covering all documents of the dataset
    and stores the new task id on the dataset.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, {"graphrag_task_id": ...}) or (False, error_message)
    """
    if not dataset_id:
        return False, 'Lack of "Dataset ID"'
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return False, "No authorization."

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return False, "Invalid Dataset ID"

    previous_task_id = kb.graphrag_task_id
    if previous_task_id:
        task_found, previous_task = TaskService.get_by_id(previous_task_id)
        if not task_found:
            logging.warning(f"A valid GraphRAG task id is expected for Dataset {dataset_id}")

        # Any progress value other than -1 or 1 is treated as "still running".
        if previous_task and previous_task.progress not in [-1, 1]:
            return False, f"Task {previous_task_id} in progress with status {previous_task.progress}. A Graph Task is already running."

    # Unfiltered query over the whole dataset, oldest document first.
    query = {
        "kb_id": dataset_id,
        "page_number": 0,
        "items_per_page": 0,
        "orderby": "create_time",
        "desc": False,
        "keywords": "",
        "run_status": [],
        "types": [],
        "suffix": [],
    }
    documents, _ = DocumentService.get_by_kb_id(**query)
    if not documents:
        return False, f"No documents in Dataset {dataset_id}"

    new_task_id = queue_raptor_o_graphrag_tasks(
        sample_doc_id=documents[0],
        ty="graphrag",
        priority=0,
        fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID,
        doc_ids=[doc["id"] for doc in documents],
    )

    # Failing to persist the id is logged but does not fail the request.
    if not KnowledgebaseService.update_by_id(kb.id, {"graphrag_task_id": new_task_id}):
        logging.warning(f"Cannot save graphrag_task_id for Dataset {dataset_id}")

    return True, {"graphrag_task_id": new_task_id}
|
||||
|
||||
|
||||
def trace_graphrag(dataset_id: str, tenant_id: str):
    """
    Trace GraphRAG task for a dataset.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, task_dict) when a task is recorded and found,
        (True, {}) when no task is recorded or it cannot be loaded,
        or (False, error_message)
    """
    if not dataset_id:
        return False, 'Lack of "Dataset ID"'
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return False, "No authorization."

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return False, "Invalid Dataset ID"

    graphrag_task_id = kb.graphrag_task_id
    if graphrag_task_id:
        task_found, task = TaskService.get_by_id(graphrag_task_id)
        if task_found:
            return True, task.to_dict()

    # No task recorded, or the recorded task could not be loaded.
    return True, {}
|
||||
|
||||
|
||||
def run_raptor(dataset_id: str, tenant_id: str):
    """
    Run RAPTOR for a dataset.

    Refuses to start while a previously recorded RAPTOR task is still in
    progress; otherwise queues one task covering all documents of the dataset
    and stores the new task id on the dataset.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, {"raptor_task_id": ...}) or (False, error_message)
    """
    if not dataset_id:
        return False, 'Lack of "Dataset ID"'
    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return False, "No authorization."

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return False, "Invalid Dataset ID"

    previous_task_id = kb.raptor_task_id
    if previous_task_id:
        task_found, previous_task = TaskService.get_by_id(previous_task_id)
        if not task_found:
            logging.warning(f"A valid RAPTOR task id is expected for Dataset {dataset_id}")

        # Any progress value other than -1 or 1 is treated as "still running".
        if previous_task and previous_task.progress not in [-1, 1]:
            return False, f"Task {previous_task_id} in progress with status {previous_task.progress}. A RAPTOR Task is already running."

    # Unfiltered query over the whole dataset, oldest document first.
    query = {
        "kb_id": dataset_id,
        "page_number": 0,
        "items_per_page": 0,
        "orderby": "create_time",
        "desc": False,
        "keywords": "",
        "run_status": [],
        "types": [],
        "suffix": [],
    }
    documents, _ = DocumentService.get_by_kb_id(**query)
    if not documents:
        return False, f"No documents in Dataset {dataset_id}"

    new_task_id = queue_raptor_o_graphrag_tasks(
        sample_doc_id=documents[0],
        ty="raptor",
        priority=0,
        fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID,
        doc_ids=[doc["id"] for doc in documents],
    )

    # Failing to persist the id is logged but does not fail the request.
    if not KnowledgebaseService.update_by_id(kb.id, {"raptor_task_id": new_task_id}):
        logging.warning(f"Cannot save raptor_task_id for Dataset {dataset_id}")

    return True, {"raptor_task_id": new_task_id}
|
||||
|
||||
|
||||
def trace_raptor(dataset_id: str, tenant_id: str):
    """
    Trace RAPTOR task for a dataset.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, task_dict) when the recorded task is found,
        (True, {}) when no task is recorded,
        or (False, error_message) otherwise
    """
    if not dataset_id:
        return False, 'Lack of "Dataset ID"'

    if not KnowledgebaseService.accessible(dataset_id, tenant_id):
        return False, "No authorization."

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        return False, "Invalid Dataset ID"

    raptor_task_id = kb.raptor_task_id
    if not raptor_task_id:
        return True, {}

    task_found, task = TaskService.get_by_id(raptor_task_id)
    # A recorded id that cannot be resolved is reported as an error.
    if task_found:
        return True, task.to_dict()
    return False, "RAPTOR Task Not Found or Error Occurred"
|
||||
|
||||
|
||||
def get_auto_metadata(dataset_id: str, tenant_id: str):
    """
    Get auto-metadata configuration for a dataset.

    Reads ``metadata`` and ``enable_metadata`` from the dataset's parser
    configuration. When ``enable_metadata`` is absent, the feature counts as
    enabled exactly when at least one metadata entry is stored.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :return: (True, {"enabled": ..., "fields": [...]}) or (False, error_message)
    """
    kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
    if kb is None:
        return False, f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'"

    parser_cfg = kb.parser_config or {}
    stored_fields = parser_cfg.get("metadata") or []
    is_enabled = parser_cfg.get("enable_metadata", bool(stored_fields))

    # Normalize to AutoMetadataConfig-like JSON; non-dict entries are dropped.
    normalized = [
        {
            "name": entry.get("name", ""),
            "type": entry.get("type", ""),
            "description": entry.get("description"),
            "examples": entry.get("examples"),
            "restrict_values": entry.get("restrict_values", False),
        }
        for entry in stored_fields
        if isinstance(entry, dict)
    ]
    return True, {"enabled": is_enabled, "fields": normalized}
|
||||
|
||||
|
||||
async def update_auto_metadata(dataset_id: str, tenant_id: str, cfg: dict):
    """
    Update auto-metadata configuration for a dataset.

    Replaces the ``metadata`` and ``enable_metadata`` entries of the dataset's
    parser configuration with the values from ``cfg``; other parser settings
    are kept.

    :param dataset_id: dataset ID
    :param tenant_id: tenant ID
    :param cfg: auto-metadata configuration; ``fields`` is a list of field
        objects (missing or ``None`` means no fields) and ``enabled``
        defaults to True
    :return: (True, {"enabled": ..., "fields": [...]}) or (False, error_message)
    """
    kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
    if kb is None:
        return False, f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'"

    fields = []
    # `or []` tolerates an explicit "fields": None instead of failing on iteration.
    for f in cfg.get("fields") or []:
        if not isinstance(f, dict):
            # Reject rather than crash on .get() or silently drop user input.
            return False, f"Invalid auto-metadata field: expected an object, got {type(f).__name__}"
        fields.append(
            {
                "name": f.get("name", ""),
                "type": f.get("type", ""),
                "description": f.get("description"),
                "examples": f.get("examples"),
                "restrict_values": f.get("restrict_values", False),
            }
        )

    # Work on a copy so the loaded dataset object is not modified when the
    # database update below fails.
    parser_cfg = dict(kb.parser_config or {})
    parser_cfg["metadata"] = fields
    parser_cfg["enable_metadata"] = cfg.get("enabled", True)

    if not KnowledgebaseService.update_by_id(kb.id, {"parser_config": parser_cfg}):
        return False, "Update auto-metadata error.(Database error)"

    return True, {"enabled": parser_cfg["enable_metadata"], "fields": fields}
|
||||
@ -1,5 +1,5 @@
|
||||
#
|
||||
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
||||
@ -433,7 +433,7 @@ class KnowledgebaseService(CommonService):
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_list(cls, joined_tenant_ids, user_id,
|
||||
page_number, items_per_page, orderby, desc, id, name, keywords, parser_id=None):
|
||||
page_number, items_per_page, orderby, desc, id, name):
|
||||
# Get list of knowledge bases with filtering and pagination
|
||||
# Args:
|
||||
# joined_tenant_ids: List of tenant IDs
|
||||
@ -444,8 +444,6 @@ class KnowledgebaseService(CommonService):
|
||||
# desc: Boolean indicating descending order
|
||||
# id: Optional ID filter
|
||||
# name: Optional name filter
|
||||
# keywords: Optional keywords filter
|
||||
# parser_id: Optional parser ID filter
|
||||
# Returns:
|
||||
# List of knowledge bases
|
||||
# Total count of knowledge bases
|
||||
@ -454,11 +452,6 @@ class KnowledgebaseService(CommonService):
|
||||
kbs = kbs.where(cls.model.id == id)
|
||||
if name:
|
||||
kbs = kbs.where(cls.model.name == name)
|
||||
if keywords:
|
||||
kbs = kbs.where(fn.LOWER(cls.model.name).contains(keywords.lower()))
|
||||
if parser_id:
|
||||
kbs = kbs.where(cls.model.parser_id == parser_id)
|
||||
|
||||
kbs = kbs.where(
|
||||
((cls.model.tenant_id.in_(joined_tenant_ids) & (cls.model.permission ==
|
||||
TenantPermission.TEAM.value)) | (
|
||||
|
||||
@ -28,6 +28,7 @@ from typing import Any
|
||||
|
||||
import requests
|
||||
from quart import (
|
||||
Response,
|
||||
jsonify,
|
||||
request,
|
||||
has_app_context,
|
||||
@ -233,17 +234,6 @@ def active_required(func):
|
||||
return wrapper
|
||||
|
||||
|
||||
def add_tenant_id_to_kwargs(func):
    """
    Decorator that injects the current user's id as the ``tenant_id`` keyword.

    The returned wrapper is always a coroutine function. It sets
    ``kwargs["tenant_id"]`` to ``current_user.id`` (overriding any value
    passed by the caller) and then calls ``func``, awaiting it when ``func``
    is itself a coroutine function. Positional arguments are passed through
    unchanged.

    :param func: sync or async callable accepting a ``tenant_id`` keyword
    :return: async wrapper around ``func``
    """
    # Whether func is a coroutine function never changes; decide once.
    is_async = inspect.iscoroutinefunction(func)

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Imported at call time, as in the original code.
        from api.apps import current_user

        kwargs["tenant_id"] = current_user.id
        if is_async:
            return await func(*args, **kwargs)
        return func(*args, **kwargs)

    return wrapper
|
||||
|
||||
|
||||
def get_json_result(code: RetCode = RetCode.SUCCESS, message="success", data=None):
|
||||
response = {"code": code, "message": message, "data": data}
|
||||
return _safe_jsonify(response)
|
||||
@ -523,7 +513,7 @@ def check_duplicate_ids(ids, id_type="item"):
|
||||
return list(set(ids)), duplicate_messages
|
||||
|
||||
|
||||
def verify_embedding_availability(embd_id: str, tenant_id: str) -> tuple[bool, str | None]:
|
||||
def verify_embedding_availability(embd_id: str, tenant_id: str) -> tuple[bool, Response | None]:
|
||||
from api.db.services.llm_service import LLMService
|
||||
from api.db.services.tenant_llm_service import TenantLLMService
|
||||
|
||||
@ -569,16 +559,13 @@ def verify_embedding_availability(embd_id: str, tenant_id: str) -> tuple[bool, s
|
||||
|
||||
is_builtin_model = llm_factory == "Builtin"
|
||||
if not (is_builtin_model or is_tenant_model or in_llm_service):
|
||||
return False, f"Unsupported model: <{embd_id}>"
|
||||
return False, get_error_argument_result(f"Unsupported model: <{embd_id}>")
|
||||
|
||||
if not (is_builtin_model or is_tenant_model):
|
||||
return False, f"Unauthorized model: <{embd_id}>"
|
||||
return False, get_error_argument_result(f"Unauthorized model: <{embd_id}>")
|
||||
except OperationalError as e:
|
||||
logging.exception(e)
|
||||
return False, "Database operation failed"
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
return False, "Internal server error"
|
||||
return False, get_error_data_result(message="Database operation failed")
|
||||
|
||||
return True, None
|
||||
|
||||
|
||||
@ -27,7 +27,6 @@ from pydantic import (
|
||||
ValidationError,
|
||||
field_validator,
|
||||
model_validator,
|
||||
ValidationInfo
|
||||
)
|
||||
from pydantic_core import PydanticCustomError
|
||||
from werkzeug.exceptions import BadRequest, UnsupportedMediaType
|
||||
@ -163,15 +162,6 @@ def validate_and_parse_request_args(request: Request, validator: type[BaseModel]
|
||||
- Preserves type conversion from Pydantic validation
|
||||
"""
|
||||
args = request.args.to_dict(flat=True)
|
||||
|
||||
# Handle ext parameter: parse JSON string to dict if it's a string
|
||||
if 'ext' in args and isinstance(args['ext'], str):
|
||||
import json
|
||||
try:
|
||||
args['ext'] = json.loads(args['ext'])
|
||||
except json.JSONDecodeError:
|
||||
pass # Keep the string and let validation handle the error
|
||||
|
||||
try:
|
||||
if extras is not None:
|
||||
args.update(extras)
|
||||
@ -346,7 +336,6 @@ class RaptorConfig(Base):
|
||||
max_cluster: Annotated[int, Field(default=64, ge=1, le=1024)]
|
||||
random_seed: Annotated[int, Field(default=0, ge=0)]
|
||||
auto_disable_for_structured_data: Annotated[bool, Field(default=True)]
|
||||
ext: Annotated[dict, Field(default={})]
|
||||
|
||||
|
||||
class GraphragConfig(Base):
|
||||
@ -388,7 +377,6 @@ class ParserConfig(Base):
|
||||
filename_embd_weight: Annotated[float | None, Field(default=0.1, ge=0.0, le=1.0)]
|
||||
task_page_size: Annotated[int | None, Field(default=None, ge=1)]
|
||||
pages: Annotated[list[list[int]] | None, Field(default=None)]
|
||||
ext: Annotated[dict, Field(default={})]
|
||||
|
||||
|
||||
class CreateDatasetReq(Base):
|
||||
@ -402,25 +390,6 @@ class CreateDatasetReq(Base):
|
||||
pipeline_id: Annotated[str | None, Field(default=None, min_length=32, max_length=32, serialization_alias="pipeline_id")]
|
||||
parser_config: Annotated[ParserConfig | None, Field(default=None)]
|
||||
auto_metadata_config: Annotated[AutoMetadataConfig | None, Field(default=None)]
|
||||
ext: Annotated[dict, Field(default={})]
|
||||
|
||||
@field_validator("pipeline_id", mode="before")
@classmethod
def handle_pipeline_id(cls, v: str | None, info: ValidationInfo):
    """
    Drop ``pipeline_id`` when a ``chunk_method`` is also present.

    :param v: raw ``pipeline_id`` value
    :param info: validation context; ``info.data`` is consulted for ``chunk_method``
    :return: ``None`` when ``v`` is a string and ``chunk_method`` is set,
        otherwise ``v`` unchanged
    """
    # NOTE(review): relies on chunk_method already being in info.data when
    # this runs — confirm the field declaration order in the model.
    if v is not None and info.data.get("chunk_method") is not None and isinstance(v, str):
        return None
    return v
|
||||
|
||||
@field_validator("parse_type", mode="before")
@classmethod
def handle_parse_type(cls, v: int | None, info: ValidationInfo):
    """
    Drop ``parse_type`` when a ``chunk_method`` is also present.

    :param v: raw ``parse_type`` value
    :param info: validation context; ``info.data`` is consulted for ``chunk_method``
    :return: ``None`` when ``v`` is an int and ``chunk_method`` is set,
        otherwise ``v`` unchanged
    """
    # NOTE(review): relies on chunk_method already being in info.data when
    # this runs — confirm the field declaration order in the model.
    if v is not None and info.data.get("chunk_method") is not None and isinstance(v, int):
        return None
    return v
|
||||
|
||||
@field_validator("avatar", mode="after")
|
||||
@classmethod
|
||||
@ -778,4 +747,3 @@ class BaseListReq(BaseModel):
|
||||
|
||||
class ListDatasetReq(BaseListReq):
|
||||
include_parsing_status: Annotated[bool, Field(default=False)]
|
||||
ext: Annotated[dict, Field(default={})]
|
||||
|
||||
Reference in New Issue
Block a user