mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 18:08:07 +08:00
merge feat/plugins
This commit is contained in:
@ -54,10 +54,12 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
|
||||
if response.status_code not in STATUS_FORCELIST:
|
||||
return response
|
||||
else:
|
||||
logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")
|
||||
logging.warning(
|
||||
f"Received status code {response.status_code} for URL {url} which is in the force list")
|
||||
|
||||
except httpx.RequestError as e:
|
||||
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
|
||||
logging.warning(
|
||||
f"Request to URL {url} failed on attempt {retries + 1}: {e}")
|
||||
|
||||
retries += 1
|
||||
if retries <= max_retries:
|
||||
|
||||
@ -20,7 +20,8 @@ from extensions.ext_redis import redis_client
|
||||
from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
logging.getLogger("lindorm").setLevel(logging.WARN)
|
||||
|
||||
|
||||
@ -76,7 +77,8 @@ class LindormVectorStore(BaseVector):
|
||||
@retry(stop=stop_after_attempt(3), wait=wait_fixed(60))
|
||||
def __fetch_existing_ids(batch_ids: list[str]) -> set[str]:
|
||||
try:
|
||||
existing_docs = self._client.mget(index=self._collection_name, body={"ids": batch_ids}, _source=False)
|
||||
existing_docs = self._client.mget(index=self._collection_name, body={
|
||||
"ids": batch_ids}, _source=False)
|
||||
return {doc["_id"] for doc in existing_docs["docs"] if doc["found"]}
|
||||
except Exception as e:
|
||||
logger.exception(f"Error fetching batch {batch_ids}: {e}")
|
||||
@ -88,7 +90,8 @@ class LindormVectorStore(BaseVector):
|
||||
existing_docs = self._client.mget(
|
||||
body={
|
||||
"docs": [
|
||||
{"_index": self._collection_name, "_id": id, "routing": routing}
|
||||
{"_index": self._collection_name,
|
||||
"_id": id, "routing": routing}
|
||||
for id, routing in zip(batch_ids, route_ids)
|
||||
]
|
||||
},
|
||||
@ -112,12 +115,13 @@ class LindormVectorStore(BaseVector):
|
||||
def batch(iterable, n):
|
||||
length = len(iterable)
|
||||
for idx in range(0, length, n):
|
||||
yield iterable[idx : min(idx + n, length)]
|
||||
yield iterable[idx: min(idx + n, length)]
|
||||
|
||||
for ids_batch, texts_batch, metadatas_batch in zip(
|
||||
batch(ids, bulk_size),
|
||||
batch(texts, bulk_size),
|
||||
batch(metadatas, bulk_size) if metadatas is not None else batch([None] * len(ids), bulk_size),
|
||||
batch(metadatas, bulk_size) if metadatas is not None else batch(
|
||||
[None] * len(ids), bulk_size),
|
||||
):
|
||||
existing_ids_set = __fetch_existing_ids(ids_batch)
|
||||
for text, metadata, doc_id in zip(texts_batch, metadatas_batch, ids_batch):
|
||||
@ -139,7 +143,8 @@ class LindormVectorStore(BaseVector):
|
||||
"_id": uuids[i],
|
||||
"_source": {
|
||||
Field.CONTENT_KEY.value: documents[i].page_content,
|
||||
Field.VECTOR.value: embeddings[i], # Make sure you pass an array here
|
||||
# Make sure you pass an array here
|
||||
Field.VECTOR.value: embeddings[i],
|
||||
Field.METADATA_KEY.value: documents[i].metadata,
|
||||
},
|
||||
}
|
||||
@ -148,7 +153,8 @@ class LindormVectorStore(BaseVector):
|
||||
self.refresh()
|
||||
|
||||
def get_ids_by_metadata_field(self, key: str, value: str):
|
||||
query = {"query": {"term": {f"{Field.METADATA_KEY.value}.{key}.keyword": value}}}
|
||||
query = {
|
||||
"query": {"term": {f"{Field.METADATA_KEY.value}.{key}.keyword": value}}}
|
||||
response = self._client.search(index=self._collection_name, body=query)
|
||||
if response["hits"]["hits"]:
|
||||
return [hit["_id"] for hit in response["hits"]["hits"]]
|
||||
@ -157,7 +163,8 @@ class LindormVectorStore(BaseVector):
|
||||
|
||||
def delete_by_metadata_field(self, key: str, value: str):
|
||||
query_str = {"query": {"match": {f"metadata.{key}": f"{value}"}}}
|
||||
results = self._client.search(index=self._collection_name, body=query_str)
|
||||
results = self._client.search(
|
||||
index=self._collection_name, body=query_str)
|
||||
ids = [hit["_id"] for hit in results["hits"]["hits"]]
|
||||
if ids:
|
||||
self.delete_by_ids(ids)
|
||||
@ -167,15 +174,18 @@ class LindormVectorStore(BaseVector):
|
||||
if self._client.exists(index=self._collection_name, id=id):
|
||||
self._client.delete(index=self._collection_name, id=id)
|
||||
else:
|
||||
logger.warning(f"DELETE BY ID: ID {id} does not exist in the index.")
|
||||
logger.warning(
|
||||
f"DELETE BY ID: ID {id} does not exist in the index.")
|
||||
|
||||
def delete(self) -> None:
|
||||
try:
|
||||
if self._client.indices.exists(index=self._collection_name):
|
||||
self._client.indices.delete(index=self._collection_name, params={"timeout": 60})
|
||||
self._client.indices.delete(
|
||||
index=self._collection_name, params={"timeout": 60})
|
||||
logger.info("Delete index success")
|
||||
else:
|
||||
logger.warning(f"Index '{self._collection_name}' does not exist. No deletion performed.")
|
||||
logger.warning(
|
||||
f"Index '{self._collection_name}' does not exist. No deletion performed.")
|
||||
except Exception as e:
|
||||
logger.exception(f"Error occurred while deleting the index: {e}")
|
||||
raise e
|
||||
@ -197,9 +207,11 @@ class LindormVectorStore(BaseVector):
|
||||
raise ValueError("All elements in query_vector should be floats")
|
||||
|
||||
top_k = kwargs.get("top_k", 10)
|
||||
query = default_vector_search_query(query_vector=query_vector, k=top_k, **kwargs)
|
||||
query = default_vector_search_query(
|
||||
query_vector=query_vector, k=top_k, **kwargs)
|
||||
try:
|
||||
response = self._client.search(index=self._collection_name, body=query)
|
||||
response = self._client.search(
|
||||
index=self._collection_name, body=query)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error executing search: {e}")
|
||||
raise
|
||||
@ -244,7 +256,8 @@ class LindormVectorStore(BaseVector):
|
||||
filters=filters,
|
||||
routing=routing,
|
||||
)
|
||||
response = self._client.search(index=self._collection_name, body=full_text_query)
|
||||
response = self._client.search(
|
||||
index=self._collection_name, body=full_text_query)
|
||||
docs = []
|
||||
for hit in response["hits"]["hits"]:
|
||||
docs.append(
|
||||
@ -262,7 +275,8 @@ class LindormVectorStore(BaseVector):
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
|
||||
if redis_client.get(collection_exist_cache_key):
|
||||
logger.info(f"Collection {self._collection_name} already exists.")
|
||||
logger.info(
|
||||
f"Collection {self._collection_name} already exists.")
|
||||
return
|
||||
if self._client.indices.exists(index=self._collection_name):
|
||||
logger.info("{self._collection_name.lower()} already exists.")
|
||||
@ -281,10 +295,13 @@ class LindormVectorStore(BaseVector):
|
||||
hnsw_ef_construction = kwargs.pop("hnsw_ef_construction", 500)
|
||||
ivfpq_m = kwargs.pop("ivfpq_m", dimension)
|
||||
nlist = kwargs.pop("nlist", 1000)
|
||||
centroids_use_hnsw = kwargs.pop("centroids_use_hnsw", True if nlist >= 5000 else False)
|
||||
centroids_use_hnsw = kwargs.pop(
|
||||
"centroids_use_hnsw", True if nlist >= 5000 else False)
|
||||
centroids_hnsw_m = kwargs.pop("centroids_hnsw_m", 24)
|
||||
centroids_hnsw_ef_construct = kwargs.pop("centroids_hnsw_ef_construct", 500)
|
||||
centroids_hnsw_ef_search = kwargs.pop("centroids_hnsw_ef_search", 100)
|
||||
centroids_hnsw_ef_construct = kwargs.pop(
|
||||
"centroids_hnsw_ef_construct", 500)
|
||||
centroids_hnsw_ef_search = kwargs.pop(
|
||||
"centroids_hnsw_ef_search", 100)
|
||||
mapping = default_text_mapping(
|
||||
dimension,
|
||||
method_name,
|
||||
@ -303,7 +320,8 @@ class LindormVectorStore(BaseVector):
|
||||
centroids_hnsw_ef_search=centroids_hnsw_ef_search,
|
||||
**kwargs,
|
||||
)
|
||||
self._client.indices.create(index=self._collection_name.lower(), body=mapping)
|
||||
self._client.indices.create(
|
||||
index=self._collection_name.lower(), body=mapping)
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
# logger.info(f"create index success: {self._collection_name}")
|
||||
|
||||
@ -364,7 +382,8 @@ def default_text_mapping(dimension: int, method_name: str, **kwargs: Any) -> dic
|
||||
}
|
||||
|
||||
if excludes_from_source:
|
||||
mapping["mappings"]["_source"] = {"excludes": excludes_from_source} # e.g. {"excludes": ["vector_field"]}
|
||||
# e.g. {"excludes": ["vector_field"]}
|
||||
mapping["mappings"]["_source"] = {"excludes": excludes_from_source}
|
||||
|
||||
if method_name == "ivfpq" and routing_field is not None:
|
||||
mapping["settings"]["index"]["knn_routing"] = True
|
||||
@ -405,7 +424,8 @@ def default_text_search_query(
|
||||
# build complex search_query when either of must/must_not/should/filter is specified
|
||||
if must:
|
||||
if not isinstance(must, list):
|
||||
raise RuntimeError(f"unexpected [must] clause with {type(filters)}")
|
||||
raise RuntimeError(
|
||||
f"unexpected [must] clause with {type(filters)}")
|
||||
if query_clause not in must:
|
||||
must.append(query_clause)
|
||||
else:
|
||||
@ -415,19 +435,22 @@ def default_text_search_query(
|
||||
|
||||
if must_not:
|
||||
if not isinstance(must_not, list):
|
||||
raise RuntimeError(f"unexpected [must_not] clause with {type(filters)}")
|
||||
raise RuntimeError(
|
||||
f"unexpected [must_not] clause with {type(filters)}")
|
||||
boolean_query["must_not"] = must_not
|
||||
|
||||
if should:
|
||||
if not isinstance(should, list):
|
||||
raise RuntimeError(f"unexpected [should] clause with {type(filters)}")
|
||||
raise RuntimeError(
|
||||
f"unexpected [should] clause with {type(filters)}")
|
||||
boolean_query["should"] = should
|
||||
if minimum_should_match != 0:
|
||||
boolean_query["minimum_should_match"] = minimum_should_match
|
||||
|
||||
if filters:
|
||||
if not isinstance(filters, list):
|
||||
raise RuntimeError(f"unexpected [filter] clause with {type(filters)}")
|
||||
raise RuntimeError(
|
||||
f"unexpected [filter] clause with {type(filters)}")
|
||||
boolean_query["filter"] = filters
|
||||
|
||||
search_query = {"size": k, "query": {"bool": boolean_query}}
|
||||
@ -471,8 +494,10 @@ def default_vector_search_query(
|
||||
|
||||
if filters is not None:
|
||||
# when using filter, transform filter from List[Dict] to Dict as valid format
|
||||
filters = {"bool": {"must": filters}} if len(filters) > 1 else filters[0]
|
||||
search_query["query"]["knn"][vector_field]["filter"] = filters # filter should be Dict
|
||||
filters = {"bool": {"must": filters}} if len(
|
||||
filters) > 1 else filters[0]
|
||||
# filter should be Dict
|
||||
search_query["query"]["knn"][vector_field]["filter"] = filters
|
||||
if filter_type:
|
||||
final_ext["lvector"]["filter_type"] = filter_type
|
||||
|
||||
@ -489,7 +514,8 @@ class LindormVectorStoreFactory(AbstractVectorFactory):
|
||||
else:
|
||||
dataset_id = dataset.id
|
||||
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
|
||||
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.LINDORM, collection_name))
|
||||
dataset.index_struct = json.dumps(
|
||||
self.gen_index_struct_dict(VectorType.LINDORM, collection_name))
|
||||
lindorm_config = LindormVectorStoreConfig(
|
||||
hosts=dify_config.LINDORM_URL,
|
||||
username=dify_config.LINDORM_USERNAME,
|
||||
|
||||
@ -49,11 +49,13 @@ class QuestionClassifierNode(LLMNode):
|
||||
variable_pool = self.graph_runtime_state.variable_pool
|
||||
|
||||
# extract variables
|
||||
variable = variable_pool.get(node_data.query_variable_selector) if node_data.query_variable_selector else None
|
||||
variable = variable_pool.get(
|
||||
node_data.query_variable_selector) if node_data.query_variable_selector else None
|
||||
query = variable.value if variable else None
|
||||
variables = {"query": query}
|
||||
# fetch model config
|
||||
model_instance, model_config = self._fetch_model_config(node_data.model)
|
||||
model_instance, model_config = self._fetch_model_config(
|
||||
node_data.model)
|
||||
# fetch memory
|
||||
memory = self._fetch_memory(
|
||||
node_data_memory=node_data.memory,
|
||||
@ -61,7 +63,8 @@ class QuestionClassifierNode(LLMNode):
|
||||
)
|
||||
# fetch instruction
|
||||
node_data.instruction = node_data.instruction or ""
|
||||
node_data.instruction = variable_pool.convert_template(node_data.instruction).text
|
||||
node_data.instruction = variable_pool.convert_template(
|
||||
node_data.instruction).text
|
||||
|
||||
files: Sequence[File] = (
|
||||
self._fetch_files(
|
||||
@ -184,12 +187,15 @@ class QuestionClassifierNode(LLMNode):
|
||||
variable_mapping = {"query": node_data.query_variable_selector}
|
||||
variable_selectors = []
|
||||
if node_data.instruction:
|
||||
variable_template_parser = VariableTemplateParser(template=node_data.instruction)
|
||||
variable_selectors.extend(variable_template_parser.extract_variable_selectors())
|
||||
variable_template_parser = VariableTemplateParser(
|
||||
template=node_data.instruction)
|
||||
variable_selectors.extend(
|
||||
variable_template_parser.extract_variable_selectors())
|
||||
for variable_selector in variable_selectors:
|
||||
variable_mapping[variable_selector.variable] = variable_selector.value_selector
|
||||
|
||||
variable_mapping = {node_id + "." + key: value for key, value in variable_mapping.items()}
|
||||
variable_mapping = {node_id + "." + key: value for key,
|
||||
value in variable_mapping.items()}
|
||||
|
||||
return variable_mapping
|
||||
|
||||
@ -210,7 +216,8 @@ class QuestionClassifierNode(LLMNode):
|
||||
context: Optional[str],
|
||||
) -> int:
|
||||
prompt_transform = AdvancedPromptTransform(with_variable_tmpl=True)
|
||||
prompt_template = self._get_prompt_template(node_data, query, None, 2000)
|
||||
prompt_template = self._get_prompt_template(
|
||||
node_data, query, None, 2000)
|
||||
prompt_messages = prompt_transform.get_prompt(
|
||||
prompt_template=prompt_template,
|
||||
inputs={},
|
||||
@ -223,13 +230,15 @@ class QuestionClassifierNode(LLMNode):
|
||||
)
|
||||
rest_tokens = 2000
|
||||
|
||||
model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
|
||||
model_context_tokens = model_config.model_schema.model_properties.get(
|
||||
ModelPropertyKey.CONTEXT_SIZE)
|
||||
if model_context_tokens:
|
||||
model_instance = ModelInstance(
|
||||
provider_model_bundle=model_config.provider_model_bundle, model=model_config.model
|
||||
)
|
||||
|
||||
curr_message_tokens = model_instance.get_llm_num_tokens(prompt_messages)
|
||||
curr_message_tokens = model_instance.get_llm_num_tokens(
|
||||
prompt_messages)
|
||||
|
||||
max_tokens = 0
|
||||
for parameter_rule in model_config.model_schema.parameter_rules:
|
||||
@ -270,7 +279,8 @@ class QuestionClassifierNode(LLMNode):
|
||||
prompt_messages: list[LLMNodeChatModelMessage] = []
|
||||
if model_mode == ModelMode.CHAT:
|
||||
system_prompt_messages = LLMNodeChatModelMessage(
|
||||
role=PromptMessageRole.SYSTEM, text=QUESTION_CLASSIFIER_SYSTEM_PROMPT.format(histories=memory_str)
|
||||
role=PromptMessageRole.SYSTEM, text=QUESTION_CLASSIFIER_SYSTEM_PROMPT.format(
|
||||
histories=memory_str)
|
||||
)
|
||||
prompt_messages.append(system_prompt_messages)
|
||||
user_prompt_message_1 = LLMNodeChatModelMessage(
|
||||
@ -311,4 +321,5 @@ class QuestionClassifierNode(LLMNode):
|
||||
)
|
||||
|
||||
else:
|
||||
raise InvalidModelTypeError(f"Model mode {model_mode} not support.")
|
||||
raise InvalidModelTypeError(
|
||||
f"Model mode {model_mode} not support.")
|
||||
|
||||
@ -68,7 +68,8 @@ def test_executor_with_json_body_and_object_variable():
|
||||
system_variables={},
|
||||
user_inputs={},
|
||||
)
|
||||
variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"})
|
||||
variable_pool.add(["pre_node_id", "object"], {
|
||||
"name": "John Doe", "age": 30, "email": "john@example.com"})
|
||||
|
||||
# Prepare the node data
|
||||
node_data = HttpRequestNodeData(
|
||||
@ -102,7 +103,8 @@ def test_executor_with_json_body_and_object_variable():
|
||||
assert executor.url == "https://api.example.com/data"
|
||||
assert executor.headers == {"Content-Type": "application/json"}
|
||||
assert executor.params == {}
|
||||
assert executor.json == {"name": "John Doe", "age": 30, "email": "john@example.com"}
|
||||
assert executor.json == {"name": "John Doe",
|
||||
"age": 30, "email": "john@example.com"}
|
||||
assert executor.data is None
|
||||
assert executor.files is None
|
||||
assert executor.content is None
|
||||
@ -123,7 +125,8 @@ def test_executor_with_json_body_and_nested_object_variable():
|
||||
system_variables={},
|
||||
user_inputs={},
|
||||
)
|
||||
variable_pool.add(["pre_node_id", "object"], {"name": "John Doe", "age": 30, "email": "john@example.com"})
|
||||
variable_pool.add(["pre_node_id", "object"], {
|
||||
"name": "John Doe", "age": 30, "email": "john@example.com"})
|
||||
|
||||
# Prepare the node data
|
||||
node_data = HttpRequestNodeData(
|
||||
@ -157,7 +160,8 @@ def test_executor_with_json_body_and_nested_object_variable():
|
||||
assert executor.url == "https://api.example.com/data"
|
||||
assert executor.headers == {"Content-Type": "application/json"}
|
||||
assert executor.params == {}
|
||||
assert executor.json == {"object": {"name": "John Doe", "age": 30, "email": "john@example.com"}}
|
||||
assert executor.json == {"object": {
|
||||
"name": "John Doe", "age": 30, "email": "john@example.com"}}
|
||||
assert executor.data is None
|
||||
assert executor.files is None
|
||||
assert executor.content is None
|
||||
|
||||
@ -23,7 +23,8 @@ def test_plain_text_to_dict():
|
||||
assert _plain_text_to_dict("aa\n cc:") == {"aa": "", "cc": ""}
|
||||
assert _plain_text_to_dict("aa:bb\n cc:dd") == {"aa": "bb", "cc": "dd"}
|
||||
assert _plain_text_to_dict("aa:bb\n cc:dd\n") == {"aa": "bb", "cc": "dd"}
|
||||
assert _plain_text_to_dict("aa:bb\n\n cc : dd\n\n") == {"aa": "bb", "cc": "dd"}
|
||||
assert _plain_text_to_dict("aa:bb\n\n cc : dd\n\n") == {
|
||||
"aa": "bb", "cc": "dd"}
|
||||
|
||||
|
||||
def test_http_request_node_binary_file(monkeypatch):
|
||||
@ -189,7 +190,8 @@ def test_http_request_node_form_with_file(monkeypatch):
|
||||
|
||||
def attr_checker(*args, **kwargs):
|
||||
assert kwargs["data"] == {"name": "test"}
|
||||
assert kwargs["files"] == {"file": (None, b"test", "application/octet-stream")}
|
||||
assert kwargs["files"] == {
|
||||
"file": (None, b"test", "application/octet-stream")}
|
||||
return httpx.Response(200, content=b"")
|
||||
|
||||
monkeypatch.setattr(
|
||||
|
||||
Reference in New Issue
Block a user