mirror of https://github.com/langgenius/dify.git
synced 2026-03-13 19:17:43 +08:00
import json

import click
from flask import current_app
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker

from configs import dify_config
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.index_processor.constant.built_in_field import BuiltInField
from core.rag.models.document import ChildDocument, Document
from extensions.ext_database import db
from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import App, AppAnnotationSetting, MessageAnnotation


@click.command("vdb-migrate", help="Migrate the vector database.")
@click.option("--scope", default="all", prompt=False, help="The scope of the vector database to migrate. Default is all.")
def vdb_migrate(scope: str):
    if scope in {"knowledge", "all"}:
        migrate_knowledge_vector_database()
    if scope in {"annotation", "all"}:
        migrate_annotation_vector_database()


def migrate_annotation_vector_database():
    """
    Migrate annotation data to the target vector database.
    """
    click.echo(click.style("Starting annotation data migration.", fg="green"))
    create_count = 0
    skipped_count = 0
    total_count = 0
    page = 1
    while True:
        try:
            # get apps info
            per_page = 50
            with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
                apps = (
                    session.query(App)
                    .where(App.status == "normal")
                    .order_by(App.created_at.desc())
                    .limit(per_page)
                    .offset((page - 1) * per_page)
                    .all()
                )
            if not apps:
                break
        except SQLAlchemyError:
            raise

        page += 1
        for app in apps:
            total_count = total_count + 1
            click.echo(
                f"Processing app {total_count}: {app.id}. {create_count} created, {skipped_count} skipped."
            )
            try:
                click.echo(f"Creating app annotation index: {app.id}")
                with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
                    app_annotation_setting = (
                        session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app.id).first()
                    )

                    if not app_annotation_setting:
                        skipped_count = skipped_count + 1
                        click.echo(f"App annotation setting disabled: {app.id}")
                        continue
                    # get dataset_collection_binding info
                    dataset_collection_binding = (
                        session.query(DatasetCollectionBinding)
                        .where(DatasetCollectionBinding.id == app_annotation_setting.collection_binding_id)
                        .first()
                    )
                    if not dataset_collection_binding:
                        click.echo(f"App annotation collection binding not found: {app.id}")
                        continue
                    annotations = session.scalars(
                        select(MessageAnnotation).where(MessageAnnotation.app_id == app.id)
                    ).all()
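                # Build a transient Dataset (reusing the app's id) so the Vector
                # factory can resolve the annotation collection; it is never persisted.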
                dataset = Dataset(
                    id=app.id,
                    tenant_id=app.tenant_id,
                    indexing_technique="high_quality",
                    embedding_model_provider=dataset_collection_binding.provider_name,
                    embedding_model=dataset_collection_binding.model_name,
                    collection_binding_id=dataset_collection_binding.id,
                )
                documents = []
                if annotations:
                    for annotation in annotations:
                        document = Document(
                            page_content=annotation.question_text,
                            metadata={"annotation_id": annotation.id, "app_id": app.id, "doc_id": annotation.id},
                        )
                        documents.append(document)

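                # `attributes` names the metadata keys forwarded to the vector store
                # so annotations can later be filtered and deleted by these fields.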
                vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
                click.echo(f"Migrating annotations for app: {app.id}.")

                try:
                    vector.delete()
                    click.echo(click.style(f"Deleted vector index for app {app.id}.", fg="green"))
                except Exception as e:
                    click.echo(click.style(f"Failed to delete vector index for app {app.id}.", fg="red"))
                    raise e
                if documents:
                    try:
                        click.echo(
                            click.style(
                                f"Creating vector index with {len(documents)} annotations for app {app.id}.",
                                fg="green",
                            )
                        )
                        vector.create(documents)
                        click.echo(click.style(f"Created vector index for app {app.id}.", fg="green"))
                    except Exception as e:
                        click.echo(click.style(f"Failed to create vector index for app {app.id}.", fg="red"))
                        raise e
                click.echo(f"Successfully migrated app annotation {app.id}.")
                create_count += 1
            except Exception as e:
                click.echo(
                    click.style(f"Error creating app annotation index: {e.__class__.__name__} {str(e)}", fg="red")
                )
                continue

    click.echo(
        click.style(
            f"Migration complete. Created {create_count} app annotation indexes. Skipped {skipped_count} apps.",
            fg="green",
        )
    )


def migrate_knowledge_vector_database():
    """
    Migrate vector database data to the target vector database.
    """
    click.echo(click.style("Starting vector database migration.", fg="green"))
    create_count = 0
    skipped_count = 0
    total_count = 0
    vector_type = dify_config.VECTOR_STORE
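    # These stores keep the generated collection name's casing as-is; the
    # `lower_collection_vector_types` below lowercase it instead.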
    upper_collection_vector_types = {
        VectorType.MILVUS,
        VectorType.PGVECTOR,
        VectorType.VASTBASE,
        VectorType.RELYT,
        VectorType.WEAVIATE,
        VectorType.ORACLE,
        VectorType.ELASTICSEARCH,
        VectorType.OPENGAUSS,
        VectorType.TABLESTORE,
        VectorType.MATRIXONE,
    }
    lower_collection_vector_types = {
        VectorType.ANALYTICDB,
        VectorType.CHROMA,
        VectorType.MYSCALE,
        VectorType.PGVECTO_RS,
        VectorType.TIDB_VECTOR,
        VectorType.OPENSEARCH,
        VectorType.TENCENT,
        VectorType.BAIDU,
        VectorType.VIKINGDB,
        VectorType.UPSTASH,
        VectorType.COUCHBASE,
        VectorType.OCEANBASE,
    }
    page = 1
    while True:
        try:
            stmt = (
                select(Dataset).where(Dataset.indexing_technique == "high_quality").order_by(Dataset.created_at.desc())
            )

            datasets = db.paginate(select=stmt, page=page, per_page=50, max_per_page=50, error_out=False)
            if not datasets.items:
                break
        except SQLAlchemyError:
            raise

        page += 1
        for dataset in datasets:
            total_count = total_count + 1
            click.echo(
                f"Processing dataset {total_count}: {dataset.id}. {create_count} created, {skipped_count} skipped."
            )
            try:
                click.echo(f"Creating dataset vector database index: {dataset.id}")
                if dataset.index_struct_dict:
                    if dataset.index_struct_dict["type"] == vector_type:
                        # Already on the target vector store; nothing to migrate.
                        skipped_count = skipped_count + 1
                        continue
                collection_name = ""
                dataset_id = dataset.id
                if vector_type in upper_collection_vector_types:
                    collection_name = Dataset.gen_collection_name_by_id(dataset_id)
                elif vector_type == VectorType.QDRANT:
                    if dataset.collection_binding_id:
                        dataset_collection_binding = (
                            db.session.query(DatasetCollectionBinding)
                            .where(DatasetCollectionBinding.id == dataset.collection_binding_id)
                            .one_or_none()
                        )
                        if dataset_collection_binding:
                            collection_name = dataset_collection_binding.collection_name
                        else:
                            raise ValueError("Dataset Collection Binding not found")
                    else:
                        collection_name = Dataset.gen_collection_name_by_id(dataset_id)

                elif vector_type in lower_collection_vector_types:
                    collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
                else:
                    raise ValueError(f"Vector store {vector_type} is not supported.")

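                # Record the target store type and collection name on the dataset;
                # this is only committed after the index rebuild below succeeds.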
                index_struct_dict = {"type": vector_type, "vector_store": {"class_prefix": collection_name}}
                dataset.index_struct = json.dumps(index_struct_dict)
                vector = Vector(dataset)
                click.echo(f"Migrating dataset {dataset.id}.")

                try:
                    vector.delete()
                    click.echo(
                        click.style(f"Deleted vector index {collection_name} for dataset {dataset.id}.", fg="green")
                    )
                except Exception as e:
                    click.echo(
                        click.style(
                            f"Failed to delete vector index {collection_name} for dataset {dataset.id}.", fg="red"
                        )
                    )
                    raise e

                dataset_documents = db.session.scalars(
                    select(DatasetDocument).where(
                        DatasetDocument.dataset_id == dataset.id,
                        DatasetDocument.indexing_status == "completed",
                        DatasetDocument.enabled == True,
                        DatasetDocument.archived == False,
                    )
                ).all()

                documents = []
                segments_count = 0
                for dataset_document in dataset_documents:
                    segments = db.session.scalars(
                        select(DocumentSegment).where(
                            DocumentSegment.document_id == dataset_document.id,
                            DocumentSegment.status == "completed",
                            DocumentSegment.enabled == True,
                        )
                    ).all()

                    for segment in segments:
                        document = Document(
                            page_content=segment.content,
                            metadata={
                                "doc_id": segment.index_node_id,
                                "doc_hash": segment.index_node_hash,
                                "document_id": segment.document_id,
                                "dataset_id": segment.dataset_id,
                            },
                        )
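                        # "hierarchical_model" datasets use parent-child chunking: each
                        # segment carries child chunks that are embedded separately below.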
                        if dataset_document.doc_form == "hierarchical_model":
                            child_chunks = segment.get_child_chunks()
                            if child_chunks:
                                child_documents = []
                                for child_chunk in child_chunks:
                                    child_document = ChildDocument(
                                        page_content=child_chunk.content,
                                        metadata={
                                            "doc_id": child_chunk.index_node_id,
                                            "doc_hash": child_chunk.index_node_hash,
                                            "document_id": segment.document_id,
                                            "dataset_id": segment.dataset_id,
                                        },
                                    )
                                    child_documents.append(child_document)
                                document.children = child_documents

                        documents.append(document)
                        segments_count = segments_count + 1

                if documents:
                    try:
                        click.echo(
                            click.style(
                                f"Creating vector index with {len(documents)} documents of {segments_count}"
                                f" segments for dataset {dataset.id}.",
                                fg="green",
                            )
                        )
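                        # Collect the child chunks so parent and child documents are
                        # written to the store in two separate create() passes.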
                        all_child_documents = []
                        for doc in documents:
                            if doc.children:
                                all_child_documents.extend(doc.children)
                        vector.create(documents)
                        if all_child_documents:
                            vector.create(all_child_documents)
                        click.echo(click.style(f"Created vector index for dataset {dataset.id}.", fg="green"))
                    except Exception as e:
                        click.echo(click.style(f"Failed to create vector index for dataset {dataset.id}.", fg="red"))
                        raise e
                db.session.add(dataset)
                db.session.commit()
                click.echo(f"Successfully migrated dataset {dataset.id}.")
                create_count += 1
            except Exception as e:
                db.session.rollback()
                click.echo(click.style(f"Error creating dataset index: {e.__class__.__name__} {str(e)}", fg="red"))
                continue

    click.echo(
        click.style(
            f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets.", fg="green"
        )
    )


@click.command("add-qdrant-index", help="Add Qdrant index.")
@click.option("--field", default="metadata.doc_id", prompt=False, help="Index field. Default is metadata.doc_id.")
def add_qdrant_index(field: str):
    click.echo(click.style("Starting Qdrant index creation.", fg="green"))

    create_count = 0

    try:
        bindings = db.session.query(DatasetCollectionBinding).all()
        if not bindings:
            click.echo(click.style("No dataset collection bindings found.", fg="red"))
            return
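        # Import Qdrant dependencies lazily so this module loads even when the
        # qdrant-client package is only needed for this command.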
        import qdrant_client
        from qdrant_client.http.exceptions import UnexpectedResponse
        from qdrant_client.http.models import PayloadSchemaType

        from core.rag.datasource.vdb.qdrant.qdrant_vector import PathQdrantParams, QdrantConfig

        for binding in bindings:
            if dify_config.QDRANT_URL is None:
                raise ValueError("Qdrant URL is required.")
            qdrant_config = QdrantConfig(
                endpoint=dify_config.QDRANT_URL,
                api_key=dify_config.QDRANT_API_KEY,
                root_path=current_app.root_path,
                timeout=dify_config.QDRANT_CLIENT_TIMEOUT,
                grpc_port=dify_config.QDRANT_GRPC_PORT,
                prefer_grpc=dify_config.QDRANT_GRPC_ENABLED,
            )
            try:
                params = qdrant_config.to_qdrant_params()
                # Check the params type before using it
                if isinstance(params, PathQdrantParams):
                    # PathQdrantParams: local, file-based Qdrant
                    client = qdrant_client.QdrantClient(path=params.path)
                else:
                    # UrlQdrantParams: remote Qdrant server
                    client = qdrant_client.QdrantClient(
                        url=params.url,
                        api_key=params.api_key,
                        timeout=int(params.timeout),
                        verify=params.verify,
                        grpc_port=params.grpc_port,
                        prefer_grpc=params.prefer_grpc,
                    )
                # create the payload index on the requested field
                client.create_payload_index(binding.collection_name, field, field_schema=PayloadSchemaType.KEYWORD)
                create_count += 1
            except UnexpectedResponse as e:
                # Collection does not exist, so skip this binding
                if e.status_code == 404:
                    click.echo(click.style(f"Collection not found: {binding.collection_name}.", fg="red"))
                    continue
                # Some other error occurred: report it and move on to the next binding
                else:
                    click.echo(
                        click.style(
                            f"Failed to create Qdrant index for collection: {binding.collection_name}.", fg="red"
                        )
                    )

    except Exception:
        click.echo(click.style("Failed to create Qdrant client.", fg="red"))

    click.echo(click.style(f"Index creation complete. Created {create_count} collection indexes.", fg="green"))


@click.command("old-metadata-migration", help="Old metadata migration.")
def old_metadata_migration():
    """
    Migrate legacy document metadata into DatasetMetadata and DatasetMetadataBinding records.
    """
    click.echo(click.style("Starting old metadata migration.", fg="green"))

    page = 1
    while True:
        try:
            stmt = (
                select(DatasetDocument)
                .where(DatasetDocument.doc_metadata.is_not(None))
                .order_by(DatasetDocument.created_at.desc())
            )
            documents = db.paginate(select=stmt, page=page, per_page=50, max_per_page=50, error_out=False)
        except SQLAlchemyError:
            raise
        if not documents.items:
            break
        for document in documents:
            if document.doc_metadata:
                doc_metadata = document.doc_metadata
                for key in doc_metadata:
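                    # for/else: the else branch runs only when no break occurred,
                    # i.e. `key` matched no built-in field and is custom metadata.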
                    for field in BuiltInField:
                        if field.value == key:
                            break
                    else:
                        dataset_metadata = (
                            db.session.query(DatasetMetadata)
                            .where(DatasetMetadata.dataset_id == document.dataset_id, DatasetMetadata.name == key)
                            .first()
                        )
                        if not dataset_metadata:
                            dataset_metadata = DatasetMetadata(
                                tenant_id=document.tenant_id,
                                dataset_id=document.dataset_id,
                                name=key,
                                type="string",
                                created_by=document.created_by,
                            )
                            db.session.add(dataset_metadata)
                            db.session.flush()
                            dataset_metadata_binding = DatasetMetadataBinding(
                                tenant_id=document.tenant_id,
                                dataset_id=document.dataset_id,
                                metadata_id=dataset_metadata.id,
                                document_id=document.id,
                                created_by=document.created_by,
                            )
                            db.session.add(dataset_metadata_binding)
                        else:
                            dataset_metadata_binding = (
                                db.session.query(DatasetMetadataBinding)  # type: ignore
                                .where(
                                    DatasetMetadataBinding.dataset_id == document.dataset_id,
                                    DatasetMetadataBinding.document_id == document.id,
                                    DatasetMetadataBinding.metadata_id == dataset_metadata.id,
                                )
                                .first()
                            )
                            if not dataset_metadata_binding:
                                dataset_metadata_binding = DatasetMetadataBinding(
                                    tenant_id=document.tenant_id,
                                    dataset_id=document.dataset_id,
                                    metadata_id=dataset_metadata.id,
                                    document_id=document.id,
                                    created_by=document.created_by,
                                )
                                db.session.add(dataset_metadata_binding)
                db.session.commit()
        page += 1
    click.echo(click.style("Old metadata migration completed.", fg="green"))