mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 09:58:04 +08:00
Merge branch 'main' into deploy/dev
This commit is contained in:
@ -90,6 +90,7 @@ class AppQueueManager:
|
||||
"""
|
||||
self._clear_task_belong_cache()
|
||||
self._q.put(None)
|
||||
self._graph_runtime_state = None # Release reference to allow GC to reclaim memory
|
||||
|
||||
def _clear_task_belong_cache(self) -> None:
|
||||
"""
|
||||
|
||||
@ -1,9 +1,14 @@
|
||||
from collections.abc import Mapping
|
||||
from textwrap import dedent
|
||||
from typing import Any
|
||||
|
||||
from core.helper.code_executor.template_transformer import TemplateTransformer
|
||||
|
||||
|
||||
class Jinja2TemplateTransformer(TemplateTransformer):
|
||||
# Use separate placeholder for base64-encoded template to avoid confusion
|
||||
_template_b64_placeholder: str = "{{template_b64}}"
|
||||
|
||||
@classmethod
|
||||
def transform_response(cls, response: str):
|
||||
"""
|
||||
@ -13,18 +18,35 @@ class Jinja2TemplateTransformer(TemplateTransformer):
|
||||
"""
|
||||
return {"result": cls.extract_result_str_from_response(response)}
|
||||
|
||||
@classmethod
|
||||
def assemble_runner_script(cls, code: str, inputs: Mapping[str, Any]) -> str:
|
||||
"""
|
||||
Override base class to use base64 encoding for template code.
|
||||
This prevents issues with special characters (quotes, newlines) in templates
|
||||
breaking the generated Python script. Fixes #26818.
|
||||
"""
|
||||
script = cls.get_runner_script()
|
||||
# Encode template as base64 to safely embed any content including quotes
|
||||
code_b64 = cls.serialize_code(code)
|
||||
script = script.replace(cls._template_b64_placeholder, code_b64)
|
||||
inputs_str = cls.serialize_inputs(inputs)
|
||||
script = script.replace(cls._inputs_placeholder, inputs_str)
|
||||
return script
|
||||
|
||||
@classmethod
|
||||
def get_runner_script(cls) -> str:
|
||||
runner_script = dedent(f"""
|
||||
# declare main function
|
||||
def main(**inputs):
|
||||
import jinja2
|
||||
template = jinja2.Template('''{cls._code_placeholder}''')
|
||||
return template.render(**inputs)
|
||||
|
||||
import jinja2
|
||||
import json
|
||||
from base64 import b64decode
|
||||
|
||||
# declare main function
|
||||
def main(**inputs):
|
||||
# Decode base64-encoded template to handle special characters safely
|
||||
template_code = b64decode('{cls._template_b64_placeholder}').decode('utf-8')
|
||||
template = jinja2.Template(template_code)
|
||||
return template.render(**inputs)
|
||||
|
||||
# decode and prepare input dict
|
||||
inputs_obj = json.loads(b64decode('{cls._inputs_placeholder}').decode('utf-8'))
|
||||
|
||||
|
||||
@ -13,6 +13,15 @@ class TemplateTransformer(ABC):
|
||||
_inputs_placeholder: str = "{{inputs}}"
|
||||
_result_tag: str = "<<RESULT>>"
|
||||
|
||||
@classmethod
|
||||
def serialize_code(cls, code: str) -> str:
|
||||
"""
|
||||
Serialize template code to base64 to safely embed in generated script.
|
||||
This prevents issues with special characters like quotes breaking the script.
|
||||
"""
|
||||
code_bytes = code.encode("utf-8")
|
||||
return b64encode(code_bytes).decode("utf-8")
|
||||
|
||||
@classmethod
|
||||
def transform_caller(cls, code: str, inputs: Mapping[str, Any]) -> tuple[str, str]:
|
||||
"""
|
||||
|
||||
@ -1,56 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from core.tools.entities.api_entities import ToolProviderTypeApiLiteral
|
||||
from extensions.ext_redis import redis_client, redis_fallback
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ToolProviderListCache:
|
||||
"""Cache for tool provider lists"""
|
||||
|
||||
CACHE_TTL = 300 # 5 minutes
|
||||
|
||||
@staticmethod
|
||||
def _generate_cache_key(tenant_id: str, typ: ToolProviderTypeApiLiteral = None) -> str:
|
||||
"""Generate cache key for tool providers list"""
|
||||
type_filter = typ or "all"
|
||||
return f"tool_providers:tenant_id:{tenant_id}:type:{type_filter}"
|
||||
|
||||
@staticmethod
|
||||
@redis_fallback(default_return=None)
|
||||
def get_cached_providers(tenant_id: str, typ: ToolProviderTypeApiLiteral = None) -> list[dict[str, Any]] | None:
|
||||
"""Get cached tool providers"""
|
||||
cache_key = ToolProviderListCache._generate_cache_key(tenant_id, typ)
|
||||
cached_data = redis_client.get(cache_key)
|
||||
if cached_data:
|
||||
try:
|
||||
return json.loads(cached_data.decode("utf-8"))
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
logger.warning("Failed to decode cached tool providers data")
|
||||
return None
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
@redis_fallback()
|
||||
def set_cached_providers(tenant_id: str, typ: ToolProviderTypeApiLiteral, providers: list[dict[str, Any]]):
|
||||
"""Cache tool providers"""
|
||||
cache_key = ToolProviderListCache._generate_cache_key(tenant_id, typ)
|
||||
redis_client.setex(cache_key, ToolProviderListCache.CACHE_TTL, json.dumps(providers))
|
||||
|
||||
@staticmethod
|
||||
@redis_fallback()
|
||||
def invalidate_cache(tenant_id: str, typ: ToolProviderTypeApiLiteral = None):
|
||||
"""Invalidate cache for tool providers"""
|
||||
if typ:
|
||||
# Invalidate specific type cache
|
||||
cache_key = ToolProviderListCache._generate_cache_key(tenant_id, typ)
|
||||
redis_client.delete(cache_key)
|
||||
else:
|
||||
# Invalidate all caches for this tenant
|
||||
pattern = f"tool_providers:tenant_id:{tenant_id}:*"
|
||||
keys = list(redis_client.scan_iter(pattern))
|
||||
if keys:
|
||||
redis_client.delete(*keys)
|
||||
@ -313,17 +313,20 @@ class StreamableHTTPTransport:
|
||||
if is_initialization:
|
||||
self._maybe_extract_session_id_from_response(response)
|
||||
|
||||
content_type = cast(str, response.headers.get(CONTENT_TYPE, "").lower())
|
||||
# Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications:
|
||||
# The server MUST NOT send a response to notifications.
|
||||
if isinstance(message.root, JSONRPCRequest):
|
||||
content_type = cast(str, response.headers.get(CONTENT_TYPE, "").lower())
|
||||
|
||||
if content_type.startswith(JSON):
|
||||
self._handle_json_response(response, ctx.server_to_client_queue)
|
||||
elif content_type.startswith(SSE):
|
||||
self._handle_sse_response(response, ctx)
|
||||
else:
|
||||
self._handle_unexpected_content_type(
|
||||
content_type,
|
||||
ctx.server_to_client_queue,
|
||||
)
|
||||
if content_type.startswith(JSON):
|
||||
self._handle_json_response(response, ctx.server_to_client_queue)
|
||||
elif content_type.startswith(SSE):
|
||||
self._handle_sse_response(response, ctx)
|
||||
else:
|
||||
self._handle_unexpected_content_type(
|
||||
content_type,
|
||||
ctx.server_to_client_queue,
|
||||
)
|
||||
|
||||
def _handle_json_response(
|
||||
self,
|
||||
|
||||
@ -76,7 +76,7 @@ class PluginParameter(BaseModel):
|
||||
auto_generate: PluginParameterAutoGenerate | None = None
|
||||
template: PluginParameterTemplate | None = None
|
||||
required: bool = False
|
||||
default: Union[float, int, str, bool] | None = None
|
||||
default: Union[float, int, str, bool, list, dict] | None = None
|
||||
min: Union[float, int] | None = None
|
||||
max: Union[float, int] | None = None
|
||||
precision: int | None = None
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import concurrent.futures
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any
|
||||
|
||||
@ -13,7 +14,7 @@ from core.model_runtime.entities.model_entities import ModelType
|
||||
from core.rag.data_post_processor.data_post_processor import DataPostProcessor
|
||||
from core.rag.datasource.keyword.keyword_factory import Keyword
|
||||
from core.rag.datasource.vdb.vector_factory import Vector
|
||||
from core.rag.embedding.retrieval import RetrievalSegments
|
||||
from core.rag.embedding.retrieval import RetrievalChildChunk, RetrievalSegments
|
||||
from core.rag.entities.metadata_entities import MetadataCondition
|
||||
from core.rag.index_processor.constant.doc_type import DocType
|
||||
from core.rag.index_processor.constant.index_type import IndexStructureType
|
||||
@ -36,6 +37,8 @@ default_retrieval_model = {
|
||||
"score_threshold_enabled": False,
|
||||
}
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RetrievalService:
|
||||
# Cache precompiled regular expressions to avoid repeated compilation
|
||||
@ -106,7 +109,12 @@ class RetrievalService:
|
||||
)
|
||||
)
|
||||
|
||||
concurrent.futures.wait(futures, timeout=3600, return_when=concurrent.futures.ALL_COMPLETED)
|
||||
if futures:
|
||||
for future in concurrent.futures.as_completed(futures, timeout=3600):
|
||||
if exceptions:
|
||||
for f in futures:
|
||||
f.cancel()
|
||||
break
|
||||
|
||||
if exceptions:
|
||||
raise ValueError(";\n".join(exceptions))
|
||||
@ -210,6 +218,7 @@ class RetrievalService:
|
||||
)
|
||||
all_documents.extend(documents)
|
||||
except Exception as e:
|
||||
logger.error(e, exc_info=True)
|
||||
exceptions.append(str(e))
|
||||
|
||||
@classmethod
|
||||
@ -303,6 +312,7 @@ class RetrievalService:
|
||||
else:
|
||||
all_documents.extend(documents)
|
||||
except Exception as e:
|
||||
logger.error(e, exc_info=True)
|
||||
exceptions.append(str(e))
|
||||
|
||||
@classmethod
|
||||
@ -351,6 +361,7 @@ class RetrievalService:
|
||||
else:
|
||||
all_documents.extend(documents)
|
||||
except Exception as e:
|
||||
logger.error(e, exc_info=True)
|
||||
exceptions.append(str(e))
|
||||
|
||||
@staticmethod
|
||||
@ -381,10 +392,9 @@ class RetrievalService:
|
||||
records = []
|
||||
include_segment_ids = set()
|
||||
segment_child_map = {}
|
||||
segment_file_map = {}
|
||||
|
||||
valid_dataset_documents = {}
|
||||
image_doc_ids = []
|
||||
image_doc_ids: list[Any] = []
|
||||
child_index_node_ids = []
|
||||
index_node_ids = []
|
||||
doc_to_document_map = {}
|
||||
@ -417,28 +427,39 @@ class RetrievalService:
|
||||
child_index_node_ids = [i for i in child_index_node_ids if i]
|
||||
index_node_ids = [i for i in index_node_ids if i]
|
||||
|
||||
segment_ids = []
|
||||
segment_ids: list[str] = []
|
||||
index_node_segments: list[DocumentSegment] = []
|
||||
segments: list[DocumentSegment] = []
|
||||
attachment_map = {}
|
||||
child_chunk_map = {}
|
||||
doc_segment_map = {}
|
||||
attachment_map: dict[str, list[dict[str, Any]]] = {}
|
||||
child_chunk_map: dict[str, list[ChildChunk]] = {}
|
||||
doc_segment_map: dict[str, list[str]] = {}
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
attachments = cls.get_segment_attachment_infos(image_doc_ids, session)
|
||||
|
||||
for attachment in attachments:
|
||||
segment_ids.append(attachment["segment_id"])
|
||||
attachment_map[attachment["segment_id"]] = attachment
|
||||
doc_segment_map[attachment["segment_id"]] = attachment["attachment_id"]
|
||||
|
||||
if attachment["segment_id"] in attachment_map:
|
||||
attachment_map[attachment["segment_id"]].append(attachment["attachment_info"])
|
||||
else:
|
||||
attachment_map[attachment["segment_id"]] = [attachment["attachment_info"]]
|
||||
if attachment["segment_id"] in doc_segment_map:
|
||||
doc_segment_map[attachment["segment_id"]].append(attachment["attachment_id"])
|
||||
else:
|
||||
doc_segment_map[attachment["segment_id"]] = [attachment["attachment_id"]]
|
||||
child_chunk_stmt = select(ChildChunk).where(ChildChunk.index_node_id.in_(child_index_node_ids))
|
||||
child_index_nodes = session.execute(child_chunk_stmt).scalars().all()
|
||||
|
||||
for i in child_index_nodes:
|
||||
segment_ids.append(i.segment_id)
|
||||
child_chunk_map[i.segment_id] = i
|
||||
doc_segment_map[i.segment_id] = i.index_node_id
|
||||
if i.segment_id in child_chunk_map:
|
||||
child_chunk_map[i.segment_id].append(i)
|
||||
else:
|
||||
child_chunk_map[i.segment_id] = [i]
|
||||
if i.segment_id in doc_segment_map:
|
||||
doc_segment_map[i.segment_id].append(i.index_node_id)
|
||||
else:
|
||||
doc_segment_map[i.segment_id] = [i.index_node_id]
|
||||
|
||||
if index_node_ids:
|
||||
document_segment_stmt = select(DocumentSegment).where(
|
||||
@ -448,7 +469,7 @@ class RetrievalService:
|
||||
)
|
||||
index_node_segments = session.execute(document_segment_stmt).scalars().all() # type: ignore
|
||||
for index_node_segment in index_node_segments:
|
||||
doc_segment_map[index_node_segment.id] = index_node_segment.index_node_id
|
||||
doc_segment_map[index_node_segment.id] = [index_node_segment.index_node_id]
|
||||
if segment_ids:
|
||||
document_segment_stmt = select(DocumentSegment).where(
|
||||
DocumentSegment.enabled == True,
|
||||
@ -461,95 +482,86 @@ class RetrievalService:
|
||||
segments.extend(index_node_segments)
|
||||
|
||||
for segment in segments:
|
||||
doc_id = doc_segment_map.get(segment.id)
|
||||
child_chunk = child_chunk_map.get(segment.id)
|
||||
attachment_info = attachment_map.get(segment.id)
|
||||
child_chunks: list[ChildChunk] = child_chunk_map.get(segment.id, [])
|
||||
attachment_infos: list[dict[str, Any]] = attachment_map.get(segment.id, [])
|
||||
ds_dataset_document: DatasetDocument | None = valid_dataset_documents.get(segment.document_id)
|
||||
|
||||
if doc_id:
|
||||
document = doc_to_document_map[doc_id]
|
||||
ds_dataset_document: DatasetDocument | None = valid_dataset_documents.get(
|
||||
document.metadata.get("document_id")
|
||||
)
|
||||
|
||||
if ds_dataset_document and ds_dataset_document.doc_form == IndexStructureType.PARENT_CHILD_INDEX:
|
||||
if segment.id not in include_segment_ids:
|
||||
include_segment_ids.add(segment.id)
|
||||
if child_chunk:
|
||||
if ds_dataset_document and ds_dataset_document.doc_form == IndexStructureType.PARENT_CHILD_INDEX:
|
||||
if segment.id not in include_segment_ids:
|
||||
include_segment_ids.add(segment.id)
|
||||
if child_chunks or attachment_infos:
|
||||
child_chunk_details = []
|
||||
max_score = 0.0
|
||||
for child_chunk in child_chunks:
|
||||
document = doc_to_document_map[child_chunk.index_node_id]
|
||||
child_chunk_detail = {
|
||||
"id": child_chunk.id,
|
||||
"content": child_chunk.content,
|
||||
"position": child_chunk.position,
|
||||
"score": document.metadata.get("score", 0.0) if document else 0.0,
|
||||
}
|
||||
map_detail = {
|
||||
"max_score": document.metadata.get("score", 0.0) if document else 0.0,
|
||||
"child_chunks": [child_chunk_detail],
|
||||
}
|
||||
segment_child_map[segment.id] = map_detail
|
||||
record = {
|
||||
"segment": segment,
|
||||
child_chunk_details.append(child_chunk_detail)
|
||||
max_score = max(max_score, document.metadata.get("score", 0.0) if document else 0.0)
|
||||
for attachment_info in attachment_infos:
|
||||
file_document = doc_to_document_map[attachment_info["id"]]
|
||||
max_score = max(
|
||||
max_score, file_document.metadata.get("score", 0.0) if file_document else 0.0
|
||||
)
|
||||
|
||||
map_detail = {
|
||||
"max_score": max_score,
|
||||
"child_chunks": child_chunk_details,
|
||||
}
|
||||
if attachment_info:
|
||||
segment_file_map[segment.id] = [attachment_info]
|
||||
records.append(record)
|
||||
else:
|
||||
if child_chunk:
|
||||
child_chunk_detail = {
|
||||
"id": child_chunk.id,
|
||||
"content": child_chunk.content,
|
||||
"position": child_chunk.position,
|
||||
"score": document.metadata.get("score", 0.0),
|
||||
}
|
||||
if segment.id in segment_child_map:
|
||||
segment_child_map[segment.id]["child_chunks"].append(child_chunk_detail) # type: ignore
|
||||
segment_child_map[segment.id]["max_score"] = max(
|
||||
segment_child_map[segment.id]["max_score"],
|
||||
document.metadata.get("score", 0.0) if document else 0.0,
|
||||
)
|
||||
else:
|
||||
segment_child_map[segment.id] = {
|
||||
"max_score": document.metadata.get("score", 0.0) if document else 0.0,
|
||||
"child_chunks": [child_chunk_detail],
|
||||
}
|
||||
if attachment_info:
|
||||
if segment.id in segment_file_map:
|
||||
segment_file_map[segment.id].append(attachment_info)
|
||||
else:
|
||||
segment_file_map[segment.id] = [attachment_info]
|
||||
else:
|
||||
if segment.id not in include_segment_ids:
|
||||
include_segment_ids.add(segment.id)
|
||||
record = {
|
||||
"segment": segment,
|
||||
"score": document.metadata.get("score", 0.0), # type: ignore
|
||||
}
|
||||
if attachment_info:
|
||||
segment_file_map[segment.id] = [attachment_info]
|
||||
records.append(record)
|
||||
else:
|
||||
if attachment_info:
|
||||
attachment_infos = segment_file_map.get(segment.id, [])
|
||||
if attachment_info not in attachment_infos:
|
||||
attachment_infos.append(attachment_info)
|
||||
segment_file_map[segment.id] = attachment_infos
|
||||
segment_child_map[segment.id] = map_detail
|
||||
record: dict[str, Any] = {
|
||||
"segment": segment,
|
||||
}
|
||||
records.append(record)
|
||||
else:
|
||||
if segment.id not in include_segment_ids:
|
||||
include_segment_ids.add(segment.id)
|
||||
max_score = 0.0
|
||||
segment_document = doc_to_document_map.get(segment.index_node_id)
|
||||
if segment_document:
|
||||
max_score = max(max_score, segment_document.metadata.get("score", 0.0))
|
||||
for attachment_info in attachment_infos:
|
||||
file_doc = doc_to_document_map.get(attachment_info["id"])
|
||||
if file_doc:
|
||||
max_score = max(max_score, file_doc.metadata.get("score", 0.0))
|
||||
record = {
|
||||
"segment": segment,
|
||||
"score": max_score,
|
||||
}
|
||||
records.append(record)
|
||||
|
||||
# Add child chunks information to records
|
||||
for record in records:
|
||||
if record["segment"].id in segment_child_map:
|
||||
record["child_chunks"] = segment_child_map[record["segment"].id].get("child_chunks") # type: ignore
|
||||
record["score"] = segment_child_map[record["segment"].id]["max_score"] # type: ignore
|
||||
if record["segment"].id in segment_file_map:
|
||||
record["files"] = segment_file_map[record["segment"].id] # type: ignore[assignment]
|
||||
if record["segment"].id in attachment_map:
|
||||
record["files"] = attachment_map[record["segment"].id] # type: ignore[assignment]
|
||||
|
||||
result = []
|
||||
result: list[RetrievalSegments] = []
|
||||
for record in records:
|
||||
# Extract segment
|
||||
segment = record["segment"]
|
||||
|
||||
# Extract child_chunks, ensuring it's a list or None
|
||||
child_chunks = record.get("child_chunks")
|
||||
if not isinstance(child_chunks, list):
|
||||
child_chunks = None
|
||||
raw_child_chunks = record.get("child_chunks")
|
||||
child_chunks_list: list[RetrievalChildChunk] | None = None
|
||||
if isinstance(raw_child_chunks, list):
|
||||
# Sort by score descending
|
||||
sorted_chunks = sorted(raw_child_chunks, key=lambda x: x.get("score", 0.0), reverse=True)
|
||||
child_chunks_list = [
|
||||
RetrievalChildChunk(
|
||||
id=chunk["id"],
|
||||
content=chunk["content"],
|
||||
score=chunk.get("score", 0.0),
|
||||
position=chunk["position"],
|
||||
)
|
||||
for chunk in sorted_chunks
|
||||
]
|
||||
|
||||
# Extract files, ensuring it's a list or None
|
||||
files = record.get("files")
|
||||
@ -566,11 +578,11 @@ class RetrievalService:
|
||||
|
||||
# Create RetrievalSegments object
|
||||
retrieval_segment = RetrievalSegments(
|
||||
segment=segment, child_chunks=child_chunks, score=score, files=files
|
||||
segment=segment, child_chunks=child_chunks_list, score=score, files=files
|
||||
)
|
||||
result.append(retrieval_segment)
|
||||
|
||||
return result
|
||||
return sorted(result, key=lambda x: x.score if x.score is not None else 0.0, reverse=True)
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
raise e
|
||||
@ -662,7 +674,14 @@ class RetrievalService:
|
||||
document_ids_filter=document_ids_filter,
|
||||
)
|
||||
)
|
||||
concurrent.futures.wait(futures, timeout=300, return_when=concurrent.futures.ALL_COMPLETED)
|
||||
# Use as_completed for early error propagation - cancel remaining futures on first error
|
||||
if futures:
|
||||
for future in concurrent.futures.as_completed(futures, timeout=300):
|
||||
if future.exception():
|
||||
# Cancel remaining futures to avoid unnecessary waiting
|
||||
for f in futures:
|
||||
f.cancel()
|
||||
break
|
||||
|
||||
if exceptions:
|
||||
raise ValueError(";\n".join(exceptions))
|
||||
|
||||
@ -255,7 +255,10 @@ class PGVector(BaseVector):
|
||||
return
|
||||
|
||||
with self._get_cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
|
||||
cur.execute("SELECT 1 FROM pg_extension WHERE extname = 'vector'")
|
||||
if not cur.fetchone():
|
||||
cur.execute("CREATE EXTENSION vector")
|
||||
|
||||
cur.execute(SQL_CREATE_TABLE.format(table_name=self.table_name, dimension=dimension))
|
||||
# PG hnsw index only support 2000 dimension or less
|
||||
# ref: https://github.com/pgvector/pgvector?tab=readme-ov-file#indexing
|
||||
|
||||
@ -7,7 +7,7 @@ from collections.abc import Generator, Mapping
|
||||
from typing import Any, Union, cast
|
||||
|
||||
from flask import Flask, current_app
|
||||
from sqlalchemy import and_, or_, select
|
||||
from sqlalchemy import and_, literal, or_, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.app_config.entities import (
|
||||
@ -516,6 +516,9 @@ class DatasetRetrieval:
|
||||
].embedding_model_provider
|
||||
weights["vector_setting"]["embedding_model_name"] = available_datasets[0].embedding_model
|
||||
with measure_time() as timer:
|
||||
cancel_event = threading.Event()
|
||||
thread_exceptions: list[Exception] = []
|
||||
|
||||
if query:
|
||||
query_thread = threading.Thread(
|
||||
target=self._multiple_retrieve_thread,
|
||||
@ -534,6 +537,8 @@ class DatasetRetrieval:
|
||||
"score_threshold": score_threshold,
|
||||
"query": query,
|
||||
"attachment_id": None,
|
||||
"cancel_event": cancel_event,
|
||||
"thread_exceptions": thread_exceptions,
|
||||
},
|
||||
)
|
||||
all_threads.append(query_thread)
|
||||
@ -557,12 +562,25 @@ class DatasetRetrieval:
|
||||
"score_threshold": score_threshold,
|
||||
"query": None,
|
||||
"attachment_id": attachment_id,
|
||||
"cancel_event": cancel_event,
|
||||
"thread_exceptions": thread_exceptions,
|
||||
},
|
||||
)
|
||||
all_threads.append(attachment_thread)
|
||||
attachment_thread.start()
|
||||
for thread in all_threads:
|
||||
thread.join()
|
||||
|
||||
# Poll threads with short timeout to detect errors quickly (fail-fast)
|
||||
while any(t.is_alive() for t in all_threads):
|
||||
for thread in all_threads:
|
||||
thread.join(timeout=0.1)
|
||||
if thread_exceptions:
|
||||
cancel_event.set()
|
||||
break
|
||||
if thread_exceptions:
|
||||
break
|
||||
|
||||
if thread_exceptions:
|
||||
raise thread_exceptions[0]
|
||||
self._on_query(query, attachment_ids, dataset_ids, app_id, user_from, user_id)
|
||||
|
||||
if all_documents:
|
||||
@ -1036,7 +1054,7 @@ class DatasetRetrieval:
|
||||
if automatic_metadata_filters:
|
||||
conditions = []
|
||||
for sequence, filter in enumerate(automatic_metadata_filters):
|
||||
self._process_metadata_filter_func(
|
||||
self.process_metadata_filter_func(
|
||||
sequence,
|
||||
filter.get("condition"), # type: ignore
|
||||
filter.get("metadata_name"), # type: ignore
|
||||
@ -1072,7 +1090,7 @@ class DatasetRetrieval:
|
||||
value=expected_value,
|
||||
)
|
||||
)
|
||||
filters = self._process_metadata_filter_func(
|
||||
filters = self.process_metadata_filter_func(
|
||||
sequence,
|
||||
condition.comparison_operator,
|
||||
metadata_name,
|
||||
@ -1168,8 +1186,9 @@ class DatasetRetrieval:
|
||||
return None
|
||||
return automatic_metadata_filters
|
||||
|
||||
def _process_metadata_filter_func(
|
||||
self, sequence: int, condition: str, metadata_name: str, value: Any | None, filters: list
|
||||
@classmethod
|
||||
def process_metadata_filter_func(
|
||||
cls, sequence: int, condition: str, metadata_name: str, value: Any | None, filters: list
|
||||
):
|
||||
if value is None and condition not in ("empty", "not empty"):
|
||||
return filters
|
||||
@ -1218,6 +1237,20 @@ class DatasetRetrieval:
|
||||
|
||||
case "≥" | ">=":
|
||||
filters.append(DatasetDocument.doc_metadata[metadata_name].as_float() >= value)
|
||||
case "in" | "not in":
|
||||
if isinstance(value, str):
|
||||
value_list = [v.strip() for v in value.split(",") if v.strip()]
|
||||
elif isinstance(value, (list, tuple)):
|
||||
value_list = [str(v) for v in value if v is not None]
|
||||
else:
|
||||
value_list = [str(value)] if value is not None else []
|
||||
|
||||
if not value_list:
|
||||
# `field in []` is False, `field not in []` is True
|
||||
filters.append(literal(condition == "not in"))
|
||||
else:
|
||||
op = json_field.in_ if condition == "in" else json_field.notin_
|
||||
filters.append(op(value_list))
|
||||
case _:
|
||||
pass
|
||||
|
||||
@ -1389,40 +1422,53 @@ class DatasetRetrieval:
|
||||
score_threshold: float,
|
||||
query: str | None,
|
||||
attachment_id: str | None,
|
||||
cancel_event: threading.Event | None = None,
|
||||
thread_exceptions: list[Exception] | None = None,
|
||||
):
|
||||
with flask_app.app_context():
|
||||
threads = []
|
||||
all_documents_item: list[Document] = []
|
||||
index_type = None
|
||||
for dataset in available_datasets:
|
||||
index_type = dataset.indexing_technique
|
||||
document_ids_filter = None
|
||||
if dataset.provider != "external":
|
||||
if metadata_condition and not metadata_filter_document_ids:
|
||||
continue
|
||||
if metadata_filter_document_ids:
|
||||
document_ids = metadata_filter_document_ids.get(dataset.id, [])
|
||||
if document_ids:
|
||||
document_ids_filter = document_ids
|
||||
else:
|
||||
try:
|
||||
with flask_app.app_context():
|
||||
threads = []
|
||||
all_documents_item: list[Document] = []
|
||||
index_type = None
|
||||
for dataset in available_datasets:
|
||||
# Check for cancellation signal
|
||||
if cancel_event and cancel_event.is_set():
|
||||
break
|
||||
index_type = dataset.indexing_technique
|
||||
document_ids_filter = None
|
||||
if dataset.provider != "external":
|
||||
if metadata_condition and not metadata_filter_document_ids:
|
||||
continue
|
||||
retrieval_thread = threading.Thread(
|
||||
target=self._retriever,
|
||||
kwargs={
|
||||
"flask_app": flask_app,
|
||||
"dataset_id": dataset.id,
|
||||
"query": query,
|
||||
"top_k": top_k,
|
||||
"all_documents": all_documents_item,
|
||||
"document_ids_filter": document_ids_filter,
|
||||
"metadata_condition": metadata_condition,
|
||||
"attachment_ids": [attachment_id] if attachment_id else None,
|
||||
},
|
||||
)
|
||||
threads.append(retrieval_thread)
|
||||
retrieval_thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
if metadata_filter_document_ids:
|
||||
document_ids = metadata_filter_document_ids.get(dataset.id, [])
|
||||
if document_ids:
|
||||
document_ids_filter = document_ids
|
||||
else:
|
||||
continue
|
||||
retrieval_thread = threading.Thread(
|
||||
target=self._retriever,
|
||||
kwargs={
|
||||
"flask_app": flask_app,
|
||||
"dataset_id": dataset.id,
|
||||
"query": query,
|
||||
"top_k": top_k,
|
||||
"all_documents": all_documents_item,
|
||||
"document_ids_filter": document_ids_filter,
|
||||
"metadata_condition": metadata_condition,
|
||||
"attachment_ids": [attachment_id] if attachment_id else None,
|
||||
},
|
||||
)
|
||||
threads.append(retrieval_thread)
|
||||
retrieval_thread.start()
|
||||
|
||||
# Poll threads with short timeout to respond quickly to cancellation
|
||||
while any(t.is_alive() for t in threads):
|
||||
for thread in threads:
|
||||
thread.join(timeout=0.1)
|
||||
if cancel_event and cancel_event.is_set():
|
||||
break
|
||||
if cancel_event and cancel_event.is_set():
|
||||
break
|
||||
|
||||
if reranking_enable:
|
||||
# do rerank for searched documents
|
||||
@ -1455,3 +1501,8 @@ class DatasetRetrieval:
|
||||
all_documents_item = all_documents_item[:top_k] if top_k else all_documents_item
|
||||
if all_documents_item:
|
||||
all_documents.extend(all_documents_item)
|
||||
except Exception as e:
|
||||
if cancel_event:
|
||||
cancel_event.set()
|
||||
if thread_exceptions is not None:
|
||||
thread_exceptions.append(e)
|
||||
|
||||
@ -6,7 +6,15 @@ from typing import Any
|
||||
|
||||
from core.mcp.auth_client import MCPClientWithAuthRetry
|
||||
from core.mcp.error import MCPConnectionError
|
||||
from core.mcp.types import AudioContent, CallToolResult, ImageContent, TextContent
|
||||
from core.mcp.types import (
|
||||
AudioContent,
|
||||
BlobResourceContents,
|
||||
CallToolResult,
|
||||
EmbeddedResource,
|
||||
ImageContent,
|
||||
TextContent,
|
||||
TextResourceContents,
|
||||
)
|
||||
from core.tools.__base.tool import Tool
|
||||
from core.tools.__base.tool_runtime import ToolRuntime
|
||||
from core.tools.entities.tool_entities import ToolEntity, ToolInvokeMessage, ToolProviderType
|
||||
@ -53,10 +61,19 @@ class MCPTool(Tool):
|
||||
for content in result.content:
|
||||
if isinstance(content, TextContent):
|
||||
yield from self._process_text_content(content)
|
||||
elif isinstance(content, ImageContent):
|
||||
yield self._process_image_content(content)
|
||||
elif isinstance(content, AudioContent):
|
||||
yield self._process_audio_content(content)
|
||||
elif isinstance(content, ImageContent | AudioContent):
|
||||
yield self.create_blob_message(
|
||||
blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType}
|
||||
)
|
||||
elif isinstance(content, EmbeddedResource):
|
||||
resource = content.resource
|
||||
if isinstance(resource, TextResourceContents):
|
||||
yield self.create_text_message(resource.text)
|
||||
elif isinstance(resource, BlobResourceContents):
|
||||
mime_type = resource.mimeType or "application/octet-stream"
|
||||
yield self.create_blob_message(blob=base64.b64decode(resource.blob), meta={"mime_type": mime_type})
|
||||
else:
|
||||
raise ToolInvokeError(f"Unsupported embedded resource type: {type(resource)}")
|
||||
else:
|
||||
logger.warning("Unsupported content type=%s", type(content))
|
||||
|
||||
@ -101,14 +118,6 @@ class MCPTool(Tool):
|
||||
for item in json_list:
|
||||
yield self.create_json_message(item)
|
||||
|
||||
def _process_image_content(self, content: ImageContent) -> ToolInvokeMessage:
|
||||
"""Process image content and return a blob message."""
|
||||
return self.create_blob_message(blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType})
|
||||
|
||||
def _process_audio_content(self, content: AudioContent) -> ToolInvokeMessage:
|
||||
"""Process audio content and return a blob message."""
|
||||
return self.create_blob_message(blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType})
|
||||
|
||||
def fork_tool_runtime(self, runtime: ToolRuntime) -> "MCPTool":
|
||||
return MCPTool(
|
||||
entity=self.entity,
|
||||
|
||||
@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.app_config.entities import VariableEntity, VariableEntityType
|
||||
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
|
||||
from core.db.session_factory import session_factory
|
||||
from core.plugin.entities.parameters import PluginParameterOption
|
||||
from core.tools.__base.tool_provider import ToolProviderController
|
||||
from core.tools.__base.tool_runtime import ToolRuntime
|
||||
@ -47,33 +48,30 @@ class WorkflowToolProviderController(ToolProviderController):
|
||||
|
||||
@classmethod
|
||||
def from_db(cls, db_provider: WorkflowToolProvider) -> "WorkflowToolProviderController":
|
||||
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
||||
provider = session.get(WorkflowToolProvider, db_provider.id) if db_provider.id else None
|
||||
if not provider:
|
||||
raise ValueError("workflow provider not found")
|
||||
app = session.get(App, provider.app_id)
|
||||
with session_factory.create_session() as session, session.begin():
|
||||
app = session.get(App, db_provider.app_id)
|
||||
if not app:
|
||||
raise ValueError("app not found")
|
||||
|
||||
user = session.get(Account, provider.user_id) if provider.user_id else None
|
||||
user = session.get(Account, db_provider.user_id) if db_provider.user_id else None
|
||||
|
||||
controller = WorkflowToolProviderController(
|
||||
entity=ToolProviderEntity(
|
||||
identity=ToolProviderIdentity(
|
||||
author=user.name if user else "",
|
||||
name=provider.label,
|
||||
label=I18nObject(en_US=provider.label, zh_Hans=provider.label),
|
||||
description=I18nObject(en_US=provider.description, zh_Hans=provider.description),
|
||||
icon=provider.icon,
|
||||
name=db_provider.label,
|
||||
label=I18nObject(en_US=db_provider.label, zh_Hans=db_provider.label),
|
||||
description=I18nObject(en_US=db_provider.description, zh_Hans=db_provider.description),
|
||||
icon=db_provider.icon,
|
||||
),
|
||||
credentials_schema=[],
|
||||
plugin_id=None,
|
||||
),
|
||||
provider_id=provider.id or "",
|
||||
provider_id="",
|
||||
)
|
||||
|
||||
controller.tools = [
|
||||
controller._get_db_provider_tool(provider, app, session=session, user=user),
|
||||
controller._get_db_provider_tool(db_provider, app, session=session, user=user),
|
||||
]
|
||||
|
||||
return controller
|
||||
|
||||
@ -6,7 +6,7 @@ from collections import defaultdict
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from sqlalchemy import and_, func, literal, or_, select
|
||||
from sqlalchemy import and_, func, or_, select
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from core.app.app_config.entities import DatasetRetrieveConfigEntity
|
||||
@ -460,7 +460,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
|
||||
if automatic_metadata_filters:
|
||||
conditions = []
|
||||
for sequence, filter in enumerate(automatic_metadata_filters):
|
||||
self._process_metadata_filter_func(
|
||||
DatasetRetrieval.process_metadata_filter_func(
|
||||
sequence,
|
||||
filter.get("condition", ""),
|
||||
filter.get("metadata_name", ""),
|
||||
@ -504,7 +504,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
|
||||
value=expected_value,
|
||||
)
|
||||
)
|
||||
filters = self._process_metadata_filter_func(
|
||||
filters = DatasetRetrieval.process_metadata_filter_func(
|
||||
sequence,
|
||||
condition.comparison_operator,
|
||||
metadata_name,
|
||||
@ -603,87 +603,6 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
|
||||
return [], usage
|
||||
return automatic_metadata_filters, usage
|
||||
|
||||
def _process_metadata_filter_func(
|
||||
self, sequence: int, condition: str, metadata_name: str, value: Any, filters: list[Any]
|
||||
) -> list[Any]:
|
||||
if value is None and condition not in ("empty", "not empty"):
|
||||
return filters
|
||||
|
||||
json_field = Document.doc_metadata[metadata_name].as_string()
|
||||
|
||||
match condition:
|
||||
case "contains":
|
||||
filters.append(json_field.like(f"%{value}%"))
|
||||
|
||||
case "not contains":
|
||||
filters.append(json_field.notlike(f"%{value}%"))
|
||||
|
||||
case "start with":
|
||||
filters.append(json_field.like(f"{value}%"))
|
||||
|
||||
case "end with":
|
||||
filters.append(json_field.like(f"%{value}"))
|
||||
case "in":
|
||||
if isinstance(value, str):
|
||||
value_list = [v.strip() for v in value.split(",") if v.strip()]
|
||||
elif isinstance(value, (list, tuple)):
|
||||
value_list = [str(v) for v in value if v is not None]
|
||||
else:
|
||||
value_list = [str(value)] if value is not None else []
|
||||
|
||||
if not value_list:
|
||||
filters.append(literal(False))
|
||||
else:
|
||||
filters.append(json_field.in_(value_list))
|
||||
|
||||
case "not in":
|
||||
if isinstance(value, str):
|
||||
value_list = [v.strip() for v in value.split(",") if v.strip()]
|
||||
elif isinstance(value, (list, tuple)):
|
||||
value_list = [str(v) for v in value if v is not None]
|
||||
else:
|
||||
value_list = [str(value)] if value is not None else []
|
||||
|
||||
if not value_list:
|
||||
filters.append(literal(True))
|
||||
else:
|
||||
filters.append(json_field.notin_(value_list))
|
||||
|
||||
case "is" | "=":
|
||||
if isinstance(value, str):
|
||||
filters.append(json_field == value)
|
||||
elif isinstance(value, (int, float)):
|
||||
filters.append(Document.doc_metadata[metadata_name].as_float() == value)
|
||||
|
||||
case "is not" | "≠":
|
||||
if isinstance(value, str):
|
||||
filters.append(json_field != value)
|
||||
elif isinstance(value, (int, float)):
|
||||
filters.append(Document.doc_metadata[metadata_name].as_float() != value)
|
||||
|
||||
case "empty":
|
||||
filters.append(Document.doc_metadata[metadata_name].is_(None))
|
||||
|
||||
case "not empty":
|
||||
filters.append(Document.doc_metadata[metadata_name].isnot(None))
|
||||
|
||||
case "before" | "<":
|
||||
filters.append(Document.doc_metadata[metadata_name].as_float() < value)
|
||||
|
||||
case "after" | ">":
|
||||
filters.append(Document.doc_metadata[metadata_name].as_float() > value)
|
||||
|
||||
case "≤" | "<=":
|
||||
filters.append(Document.doc_metadata[metadata_name].as_float() <= value)
|
||||
|
||||
case "≥" | ">=":
|
||||
filters.append(Document.doc_metadata[metadata_name].as_float() >= value)
|
||||
|
||||
case _:
|
||||
pass
|
||||
|
||||
return filters
|
||||
|
||||
@classmethod
|
||||
def _extract_variable_selector_to_variable_mapping(
|
||||
cls,
|
||||
|
||||
Reference in New Issue
Block a user