Feat/plugins (#12547)

Co-authored-by: AkaraChen <akarachen@outlook.com>
Co-authored-by: Yi <yxiaoisme@gmail.com>
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: JzoNg <jzongcode@gmail.com>
Co-authored-by: twwu <twwu@dify.ai>
Co-authored-by: kurokobo <kuro664@gmail.com>
Co-authored-by: Hiroshi Fujita <fujita-h@users.noreply.github.com>
This commit is contained in:
zxhlyh
2025-01-09 18:47:41 +08:00
committed by GitHub
parent e4c4490175
commit 3c014f3ae5
719 changed files with 48585 additions and 8553 deletions

View File

@ -17,7 +17,8 @@ from extensions.ext_redis import redis_client
from models.dataset import Dataset
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s")
logging.getLogger("lindorm").setLevel(logging.WARN)
ROUTING_FIELD = "routing_field"
@ -134,7 +135,8 @@ class LindormVectorStore(BaseVector):
self._client.delete(index=self._collection_name, id=id, params=params)
self.refresh()
else:
logger.warning(f"DELETE BY ID: ID {id} does not exist in the index.")
logger.warning(
f"DELETE BY ID: ID {id} does not exist in the index.")
def delete(self) -> None:
if self._using_ugc:
@ -145,7 +147,8 @@ class LindormVectorStore(BaseVector):
self.refresh()
else:
if self._client.indices.exists(index=self._collection_name):
self._client.indices.delete(index=self._collection_name, params={"timeout": 60})
self._client.indices.delete(
index=self._collection_name, params={"timeout": 60})
logger.info("Delete index success")
else:
logger.warning(f"Index '{self._collection_name}' does not exist. No deletion performed.")
@ -168,7 +171,8 @@ class LindormVectorStore(BaseVector):
raise ValueError("All elements in query_vector should be floats")
top_k = kwargs.get("top_k", 10)
query = default_vector_search_query(query_vector=query_vector, k=top_k, **kwargs)
query = default_vector_search_query(
query_vector=query_vector, k=top_k, **kwargs)
try:
params = {}
if self._using_ugc:
@ -220,7 +224,8 @@ class LindormVectorStore(BaseVector):
routing=routing,
routing_field=self._routing_field,
)
response = self._client.search(index=self._collection_name, body=full_text_query)
response = self._client.search(
index=self._collection_name, body=full_text_query)
docs = []
for hit in response["hits"]["hits"]:
docs.append(
@ -238,7 +243,8 @@ class LindormVectorStore(BaseVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
logger.info(f"Collection {self._collection_name} already exists.")
logger.info(
f"Collection {self._collection_name} already exists.")
return
if self._client.indices.exists(index=self._collection_name):
logger.info(f"{self._collection_name.lower()} already exists.")
@ -258,10 +264,13 @@ class LindormVectorStore(BaseVector):
hnsw_ef_construction = kwargs.pop("hnsw_ef_construction", 500)
ivfpq_m = kwargs.pop("ivfpq_m", dimension)
nlist = kwargs.pop("nlist", 1000)
centroids_use_hnsw = kwargs.pop("centroids_use_hnsw", True if nlist >= 5000 else False)
centroids_use_hnsw = kwargs.pop(
"centroids_use_hnsw", True if nlist >= 5000 else False)
centroids_hnsw_m = kwargs.pop("centroids_hnsw_m", 24)
centroids_hnsw_ef_construct = kwargs.pop("centroids_hnsw_ef_construct", 500)
centroids_hnsw_ef_search = kwargs.pop("centroids_hnsw_ef_search", 100)
centroids_hnsw_ef_construct = kwargs.pop(
"centroids_hnsw_ef_construct", 500)
centroids_hnsw_ef_search = kwargs.pop(
"centroids_hnsw_ef_search", 100)
mapping = default_text_mapping(
dimension,
method_name,
@ -281,7 +290,8 @@ class LindormVectorStore(BaseVector):
using_ugc=self._using_ugc,
**kwargs,
)
self._client.indices.create(index=self._collection_name.lower(), body=mapping)
self._client.indices.create(
index=self._collection_name.lower(), body=mapping)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
# logger.info(f"create index success: {self._collection_name}")
@ -347,7 +357,8 @@ def default_text_mapping(dimension: int, method_name: str, **kwargs: Any) -> dic
}
if excludes_from_source:
mapping["mappings"]["_source"] = {"excludes": excludes_from_source} # e.g. {"excludes": ["vector_field"]}
# e.g. {"excludes": ["vector_field"]}
mapping["mappings"]["_source"] = {"excludes": excludes_from_source}
if using_ugc and method_name == "ivfpq":
mapping["settings"]["index"]["knn_routing"] = True
@ -385,7 +396,8 @@ def default_text_search_query(
# build complex search_query when either of must/must_not/should/filter is specified
if must:
if not isinstance(must, list):
raise RuntimeError(f"unexpected [must] clause with {type(filters)}")
raise RuntimeError(
f"unexpected [must] clause with {type(filters)}")
if query_clause not in must:
must.append(query_clause)
else:
@ -395,19 +407,22 @@ def default_text_search_query(
if must_not:
if not isinstance(must_not, list):
raise RuntimeError(f"unexpected [must_not] clause with {type(filters)}")
raise RuntimeError(
f"unexpected [must_not] clause with {type(filters)}")
boolean_query["must_not"] = must_not
if should:
if not isinstance(should, list):
raise RuntimeError(f"unexpected [should] clause with {type(filters)}")
raise RuntimeError(
f"unexpected [should] clause with {type(filters)}")
boolean_query["should"] = should
if minimum_should_match != 0:
boolean_query["minimum_should_match"] = minimum_should_match
if filters:
if not isinstance(filters, list):
raise RuntimeError(f"unexpected [filter] clause with {type(filters)}")
raise RuntimeError(
f"unexpected [filter] clause with {type(filters)}")
boolean_query["filter"] = filters
search_query = {"size": k, "query": {"bool": boolean_query}}