mirror of
https://github.com/langgenius/dify.git
synced 2026-04-30 07:28:05 +08:00
Merge branch 'feat/r2' into deploy/rag-dev
# Conflicts: # web/i18n/zh-Hans/app.ts
This commit is contained in:
0
api/core/rag/datasource/vdb/matrixone/__init__.py
Normal file
0
api/core/rag/datasource/vdb/matrixone/__init__.py
Normal file
233
api/core/rag/datasource/vdb/matrixone/matrixone_vector.py
Normal file
233
api/core/rag/datasource/vdb/matrixone/matrixone_vector.py
Normal file
@ -0,0 +1,233 @@
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from functools import wraps
|
||||
from typing import Any, Optional
|
||||
|
||||
from mo_vector.client import MoVectorClient # type: ignore
|
||||
from pydantic import BaseModel, model_validator
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.embedding.embedding_base import Embeddings
|
||||
from core.rag.models.document import Document
|
||||
from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MatrixoneConfig(BaseModel):
|
||||
host: str = "localhost"
|
||||
port: int = 6001
|
||||
user: str = "dump"
|
||||
password: str = "111"
|
||||
database: str = "dify"
|
||||
metric: str = "l2"
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_config(cls, values: dict) -> dict:
|
||||
if not values["host"]:
|
||||
raise ValueError("config host is required")
|
||||
if not values["port"]:
|
||||
raise ValueError("config port is required")
|
||||
if not values["user"]:
|
||||
raise ValueError("config user is required")
|
||||
if not values["password"]:
|
||||
raise ValueError("config password is required")
|
||||
if not values["database"]:
|
||||
raise ValueError("config database is required")
|
||||
return values
|
||||
|
||||
|
||||
def ensure_client(func):
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
if self.client is None:
|
||||
self.client = self._get_client(None, False)
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class MatrixoneVector(BaseVector):
|
||||
"""
|
||||
Matrixone vector storage implementation.
|
||||
"""
|
||||
|
||||
def __init__(self, collection_name: str, config: MatrixoneConfig):
|
||||
super().__init__(collection_name)
|
||||
self.config = config
|
||||
self.collection_name = collection_name.lower()
|
||||
self.client = None
|
||||
|
||||
@property
|
||||
def collection_name(self):
|
||||
return self._collection_name
|
||||
|
||||
@collection_name.setter
|
||||
def collection_name(self, value):
|
||||
self._collection_name = value
|
||||
|
||||
def get_type(self) -> str:
|
||||
return VectorType.MATRIXONE
|
||||
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
if self.client is None:
|
||||
self.client = self._get_client(len(embeddings[0]), True)
|
||||
return self.add_texts(texts, embeddings)
|
||||
|
||||
def _get_client(self, dimension: Optional[int] = None, create_table: bool = False) -> MoVectorClient:
|
||||
"""
|
||||
Create a new client for the collection.
|
||||
|
||||
The collection will be created if it doesn't exist.
|
||||
"""
|
||||
lock_name = f"vector_indexing_lock_{self._collection_name}"
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
client = MoVectorClient(
|
||||
connection_string=f"mysql+pymysql://{self.config.user}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}",
|
||||
table_name=self.collection_name,
|
||||
vector_dimension=dimension,
|
||||
create_table=create_table,
|
||||
)
|
||||
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(collection_exist_cache_key):
|
||||
return client
|
||||
try:
|
||||
client.create_full_text_index()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to create full text index")
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
return client
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
if self.client is None:
|
||||
self.client = self._get_client(len(embeddings[0]), True)
|
||||
assert self.client is not None
|
||||
ids = []
|
||||
for _, doc in enumerate(documents):
|
||||
if doc.metadata is not None:
|
||||
doc_id = doc.metadata.get("doc_id", str(uuid.uuid4()))
|
||||
ids.append(doc_id)
|
||||
self.client.insert(
|
||||
texts=[doc.page_content for doc in documents],
|
||||
embeddings=embeddings,
|
||||
metadatas=[doc.metadata for doc in documents],
|
||||
ids=ids,
|
||||
)
|
||||
return ids
|
||||
|
||||
@ensure_client
|
||||
def text_exists(self, id: str) -> bool:
|
||||
assert self.client is not None
|
||||
result = self.client.get(ids=[id])
|
||||
return len(result) > 0
|
||||
|
||||
@ensure_client
|
||||
def delete_by_ids(self, ids: list[str]) -> None:
|
||||
assert self.client is not None
|
||||
if not ids:
|
||||
return
|
||||
self.client.delete(ids=ids)
|
||||
|
||||
@ensure_client
|
||||
def get_ids_by_metadata_field(self, key: str, value: str):
|
||||
assert self.client is not None
|
||||
results = self.client.query_by_metadata(filter={key: value})
|
||||
return [result.id for result in results]
|
||||
|
||||
@ensure_client
|
||||
def delete_by_metadata_field(self, key: str, value: str) -> None:
|
||||
assert self.client is not None
|
||||
self.client.delete(filter={key: value})
|
||||
|
||||
@ensure_client
|
||||
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
|
||||
assert self.client is not None
|
||||
top_k = kwargs.get("top_k", 5)
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
filter = None
|
||||
if document_ids_filter:
|
||||
filter = {"document_id": {"$in": document_ids_filter}}
|
||||
|
||||
results = self.client.query(
|
||||
query_vector=query_vector,
|
||||
k=top_k,
|
||||
filter=filter,
|
||||
)
|
||||
|
||||
docs = []
|
||||
# TODO: add the score threshold to the query
|
||||
for result in results:
|
||||
metadata = result.metadata
|
||||
docs.append(
|
||||
Document(
|
||||
page_content=result.document,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return docs
|
||||
|
||||
@ensure_client
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
assert self.client is not None
|
||||
top_k = kwargs.get("top_k", 5)
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
filter = None
|
||||
if document_ids_filter:
|
||||
filter = {"document_id": {"$in": document_ids_filter}}
|
||||
score_threshold = float(kwargs.get("score_threshold", 0.0))
|
||||
|
||||
results = self.client.full_text_query(
|
||||
keywords=[query],
|
||||
k=top_k,
|
||||
filter=filter,
|
||||
)
|
||||
|
||||
docs = []
|
||||
for result in results:
|
||||
metadata = result.metadata
|
||||
if isinstance(metadata, str):
|
||||
import json
|
||||
|
||||
metadata = json.loads(metadata)
|
||||
score = 1 - result.distance
|
||||
if score >= score_threshold:
|
||||
metadata["score"] = score
|
||||
docs.append(
|
||||
Document(
|
||||
page_content=result.document,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
return docs
|
||||
|
||||
@ensure_client
|
||||
def delete(self) -> None:
|
||||
assert self.client is not None
|
||||
self.client.delete()
|
||||
|
||||
|
||||
class MatrixoneVectorFactory(AbstractVectorFactory):
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> MatrixoneVector:
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
collection_name = class_prefix
|
||||
else:
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.MATRIXONE, collection_name))
|
||||
|
||||
config = MatrixoneConfig(
|
||||
host=dify_config.MATRIXONE_HOST or "localhost",
|
||||
port=dify_config.MATRIXONE_PORT or 6001,
|
||||
user=dify_config.MATRIXONE_USER or "dump",
|
||||
password=dify_config.MATRIXONE_PASSWORD or "111",
|
||||
database=dify_config.MATRIXONE_DATABASE or "dify",
|
||||
metric=dify_config.MATRIXONE_METRIC or "l2",
|
||||
)
|
||||
return MatrixoneVector(collection_name=collection_name, config=config)
|
||||
@ -164,6 +164,10 @@ class Vector:
|
||||
from core.rag.datasource.vdb.huawei.huawei_cloud_vector import HuaweiCloudVectorFactory
|
||||
|
||||
return HuaweiCloudVectorFactory
|
||||
case VectorType.MATRIXONE:
|
||||
from core.rag.datasource.vdb.matrixone.matrixone_vector import MatrixoneVectorFactory
|
||||
|
||||
return MatrixoneVectorFactory
|
||||
case _:
|
||||
raise ValueError(f"Vector store {vector_type} is not supported.")
|
||||
|
||||
|
||||
@ -29,3 +29,4 @@ class VectorType(StrEnum):
|
||||
OPENGAUSS = "opengauss"
|
||||
TABLESTORE = "tablestore"
|
||||
HUAWEI_CLOUD = "huawei_cloud"
|
||||
MATRIXONE = "matrixone"
|
||||
|
||||
@ -45,7 +45,8 @@ class WeaviateVector(BaseVector):
|
||||
# by changing the connection timeout to pypi.org from 1 second to 0.001 seconds.
|
||||
# TODO: This can be removed once weaviate-client is updated to 3.26.7 or higher,
|
||||
# which does not contain the deprecation check.
|
||||
weaviate.connect.connection.PYPI_TIMEOUT = 0.001
|
||||
if hasattr(weaviate.connect.connection, "PYPI_TIMEOUT"):
|
||||
weaviate.connect.connection.PYPI_TIMEOUT = 0.001
|
||||
|
||||
try:
|
||||
client = weaviate.Client(
|
||||
|
||||
@ -22,6 +22,7 @@ class FirecrawlApp:
|
||||
"formats": ["markdown"],
|
||||
"onlyMainContent": True,
|
||||
"timeout": 30000,
|
||||
"integration": "dify",
|
||||
}
|
||||
if params:
|
||||
json_data.update(params)
|
||||
@ -39,7 +40,7 @@ class FirecrawlApp:
|
||||
def crawl_url(self, url, params=None) -> str:
|
||||
# Documentation: https://docs.firecrawl.dev/api-reference/endpoint/crawl-post
|
||||
headers = self._prepare_headers()
|
||||
json_data = {"url": url}
|
||||
json_data = {"url": url, "integration": "dify"}
|
||||
if params:
|
||||
json_data.update(params)
|
||||
response = self._post_request(f"{self.base_url}/v1/crawl", json_data, headers)
|
||||
@ -49,7 +50,6 @@ class FirecrawlApp:
|
||||
return cast(str, job_id)
|
||||
else:
|
||||
self._handle_error(response, "start crawl job")
|
||||
# FIXME: unreachable code for mypy
|
||||
return "" # unreachable
|
||||
|
||||
def check_crawl_status(self, job_id) -> dict[str, Any]:
|
||||
@ -82,7 +82,6 @@ class FirecrawlApp:
|
||||
)
|
||||
else:
|
||||
self._handle_error(response, "check crawl status")
|
||||
# FIXME: unreachable code for mypy
|
||||
return {} # unreachable
|
||||
|
||||
def _format_crawl_status_response(
|
||||
@ -126,4 +125,31 @@ class FirecrawlApp:
|
||||
|
||||
def _handle_error(self, response, action) -> None:
|
||||
error_message = response.json().get("error", "Unknown error occurred")
|
||||
raise Exception(f"Failed to {action}. Status code: {response.status_code}. Error: {error_message}")
|
||||
raise Exception(f"Failed to {action}. Status code: {response.status_code}. Error: {error_message}") # type: ignore[return]
|
||||
|
||||
def search(self, query: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||
# Documentation: https://docs.firecrawl.dev/api-reference/endpoint/search
|
||||
headers = self._prepare_headers()
|
||||
json_data = {
|
||||
"query": query,
|
||||
"limit": 5,
|
||||
"lang": "en",
|
||||
"country": "us",
|
||||
"timeout": 60000,
|
||||
"ignoreInvalidURLs": False,
|
||||
"scrapeOptions": {},
|
||||
"integration": "dify",
|
||||
}
|
||||
if params:
|
||||
json_data.update(params)
|
||||
response = self._post_request(f"{self.base_url}/v1/search", json_data, headers)
|
||||
if response.status_code == 200:
|
||||
response_data = response.json()
|
||||
if not response_data.get("success"):
|
||||
raise Exception(f"Search failed. Error: {response_data.get('warning', 'Unknown error')}")
|
||||
return cast(dict[str, Any], response_data)
|
||||
elif response.status_code in {402, 409, 500, 429, 408}:
|
||||
self._handle_error(response, "perform search")
|
||||
return {} # Avoid additional exception after handling error
|
||||
else:
|
||||
raise Exception(f"Failed to perform search. Status code: {response.status_code}")
|
||||
|
||||
@ -107,7 +107,7 @@ class QAIndexProcessor(BaseIndexProcessor):
|
||||
|
||||
def format_by_template(self, file: FileStorage, **kwargs) -> list[Document]:
|
||||
# check file type
|
||||
if not file.filename or not file.filename.endswith(".csv"):
|
||||
if not file.filename or not file.filename.lower().endswith(".csv"):
|
||||
raise ValueError("Invalid file type. Only CSV files are allowed")
|
||||
|
||||
try:
|
||||
|
||||
@ -496,6 +496,8 @@ class DatasetRetrieval:
|
||||
all_documents = self.calculate_keyword_score(query, all_documents, top_k)
|
||||
elif index_type == "high_quality":
|
||||
all_documents = self.calculate_vector_score(all_documents, top_k, score_threshold)
|
||||
else:
|
||||
all_documents = all_documents[:top_k] if top_k else all_documents
|
||||
|
||||
self._on_query(query, dataset_ids, app_id, user_from, user_id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user