mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 17:38:04 +08:00
Merge branch 'fix/workflow-sync-draft' into deploy/dev
This commit is contained in:
@ -166,18 +166,22 @@ class WorkflowBasedAppRunner:
|
||||
|
||||
# Determine which type of single node execution and get graph/variable_pool
|
||||
if single_iteration_run:
|
||||
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
|
||||
graph, variable_pool = self._get_graph_and_variable_pool_for_single_node_run(
|
||||
workflow=workflow,
|
||||
node_id=single_iteration_run.node_id,
|
||||
user_inputs=dict(single_iteration_run.inputs),
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
node_type_filter_key="iteration_id",
|
||||
node_type_label="iteration",
|
||||
)
|
||||
elif single_loop_run:
|
||||
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
|
||||
graph, variable_pool = self._get_graph_and_variable_pool_for_single_node_run(
|
||||
workflow=workflow,
|
||||
node_id=single_loop_run.node_id,
|
||||
user_inputs=dict(single_loop_run.inputs),
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
node_type_filter_key="loop_id",
|
||||
node_type_label="loop",
|
||||
)
|
||||
else:
|
||||
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
|
||||
@ -314,44 +318,6 @@ class WorkflowBasedAppRunner:
|
||||
|
||||
return graph, variable_pool
|
||||
|
||||
def _get_graph_and_variable_pool_of_single_iteration(
|
||||
self,
|
||||
workflow: Workflow,
|
||||
node_id: str,
|
||||
user_inputs: dict[str, Any],
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
) -> tuple[Graph, VariablePool]:
|
||||
"""
|
||||
Get variable pool of single iteration
|
||||
"""
|
||||
return self._get_graph_and_variable_pool_for_single_node_run(
|
||||
workflow=workflow,
|
||||
node_id=node_id,
|
||||
user_inputs=user_inputs,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
node_type_filter_key="iteration_id",
|
||||
node_type_label="iteration",
|
||||
)
|
||||
|
||||
def _get_graph_and_variable_pool_of_single_loop(
|
||||
self,
|
||||
workflow: Workflow,
|
||||
node_id: str,
|
||||
user_inputs: dict[str, Any],
|
||||
graph_runtime_state: GraphRuntimeState,
|
||||
) -> tuple[Graph, VariablePool]:
|
||||
"""
|
||||
Get variable pool of single loop
|
||||
"""
|
||||
return self._get_graph_and_variable_pool_for_single_node_run(
|
||||
workflow=workflow,
|
||||
node_id=node_id,
|
||||
user_inputs=user_inputs,
|
||||
graph_runtime_state=graph_runtime_state,
|
||||
node_type_filter_key="loop_id",
|
||||
node_type_label="loop",
|
||||
)
|
||||
|
||||
def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent):
|
||||
"""
|
||||
Handle event
|
||||
|
||||
@ -154,7 +154,7 @@ class IrisConnectionPool:
|
||||
# Add to cache to skip future checks
|
||||
self._schemas_initialized.add(schema)
|
||||
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
logger.exception("Failed to ensure schema %s exists", schema)
|
||||
raise
|
||||
@ -177,6 +177,9 @@ class IrisConnectionPool:
|
||||
class IrisVector(BaseVector):
|
||||
"""IRIS vector database implementation using native VECTOR type and HNSW indexing."""
|
||||
|
||||
# Fallback score for full-text search when Rank function unavailable or TEXT_INDEX disabled
|
||||
_FULL_TEXT_FALLBACK_SCORE = 0.5
|
||||
|
||||
def __init__(self, collection_name: str, config: IrisVectorConfig) -> None:
|
||||
super().__init__(collection_name)
|
||||
self.config = config
|
||||
@ -272,41 +275,131 @@ class IrisVector(BaseVector):
|
||||
return docs
|
||||
|
||||
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
|
||||
"""Search documents by full-text using iFind index or fallback to LIKE search."""
|
||||
"""Search documents by full-text using iFind index with BM25 relevance scoring.
|
||||
|
||||
When IRIS_TEXT_INDEX is enabled, this method uses the auto-generated Rank
|
||||
function from %iFind.Index.Basic to calculate BM25 relevance scores. The Rank
|
||||
function is automatically created with naming: {schema}.{table_name}_{index}Rank
|
||||
|
||||
Args:
|
||||
query: Search query string
|
||||
**kwargs: Optional parameters including top_k, document_ids_filter
|
||||
|
||||
Returns:
|
||||
List of Document objects with relevance scores in metadata["score"]
|
||||
"""
|
||||
top_k = kwargs.get("top_k", 5)
|
||||
document_ids_filter = kwargs.get("document_ids_filter")
|
||||
|
||||
with self._get_cursor() as cursor:
|
||||
if self.config.IRIS_TEXT_INDEX:
|
||||
# Use iFind full-text search with index
|
||||
# Use iFind full-text search with auto-generated Rank function
|
||||
text_index_name = f"idx_{self.table_name}_text"
|
||||
# IRIS removes underscores from function names
|
||||
table_no_underscore = self.table_name.replace("_", "")
|
||||
index_no_underscore = text_index_name.replace("_", "")
|
||||
rank_function = f"{self.schema}.{table_no_underscore}_{index_no_underscore}Rank"
|
||||
|
||||
# Build WHERE clause with document ID filter if provided
|
||||
where_clause = f"WHERE %ID %FIND search_index({text_index_name}, ?)"
|
||||
# First param for Rank function, second for FIND
|
||||
params = [query, query]
|
||||
|
||||
if document_ids_filter:
|
||||
# Add document ID filter
|
||||
placeholders = ",".join("?" * len(document_ids_filter))
|
||||
where_clause += f" AND JSON_VALUE(meta, '$.document_id') IN ({placeholders})"
|
||||
params.extend(document_ids_filter)
|
||||
|
||||
sql = f"""
|
||||
SELECT TOP {top_k} id, text, meta
|
||||
SELECT TOP {top_k}
|
||||
id,
|
||||
text,
|
||||
meta,
|
||||
{rank_function}(%ID, ?) AS score
|
||||
FROM {self.schema}.{self.table_name}
|
||||
WHERE %ID %FIND search_index({text_index_name}, ?)
|
||||
{where_clause}
|
||||
ORDER BY score DESC
|
||||
"""
|
||||
cursor.execute(sql, (query,))
|
||||
|
||||
logger.debug(
|
||||
"iFind search: query='%s', index='%s', rank='%s'",
|
||||
query,
|
||||
text_index_name,
|
||||
rank_function,
|
||||
)
|
||||
|
||||
try:
|
||||
cursor.execute(sql, params)
|
||||
except Exception: # pylint: disable=broad-exception-caught
|
||||
# Fallback to query without Rank function if it fails
|
||||
logger.warning(
|
||||
"Rank function '%s' failed, using fixed score",
|
||||
rank_function,
|
||||
exc_info=True,
|
||||
)
|
||||
sql_fallback = f"""
|
||||
SELECT TOP {top_k} id, text, meta, {self._FULL_TEXT_FALLBACK_SCORE} AS score
|
||||
FROM {self.schema}.{self.table_name}
|
||||
{where_clause}
|
||||
"""
|
||||
# Skip first param (for Rank function)
|
||||
cursor.execute(sql_fallback, params[1:])
|
||||
else:
|
||||
# Fallback to LIKE search (inefficient for large datasets)
|
||||
# Escape special characters for LIKE clause to prevent SQL injection
|
||||
from libs.helper import escape_like_pattern
|
||||
# Fallback to LIKE search (IRIS_TEXT_INDEX disabled)
|
||||
from libs.helper import ( # pylint: disable=import-outside-toplevel
|
||||
escape_like_pattern,
|
||||
)
|
||||
|
||||
escaped_query = escape_like_pattern(query)
|
||||
query_pattern = f"%{escaped_query}%"
|
||||
|
||||
# Build WHERE clause with document ID filter if provided
|
||||
where_clause = "WHERE text LIKE ? ESCAPE '\\\\'"
|
||||
params = [query_pattern]
|
||||
|
||||
if document_ids_filter:
|
||||
placeholders = ",".join("?" * len(document_ids_filter))
|
||||
where_clause += f" AND JSON_VALUE(meta, '$.document_id') IN ({placeholders})"
|
||||
params.extend(document_ids_filter)
|
||||
|
||||
sql = f"""
|
||||
SELECT TOP {top_k} id, text, meta
|
||||
SELECT TOP {top_k} id, text, meta, {self._FULL_TEXT_FALLBACK_SCORE} AS score
|
||||
FROM {self.schema}.{self.table_name}
|
||||
WHERE text LIKE ? ESCAPE '\\'
|
||||
{where_clause}
|
||||
ORDER BY LENGTH(text) ASC
|
||||
"""
|
||||
cursor.execute(sql, (query_pattern,))
|
||||
|
||||
logger.debug(
|
||||
"LIKE fallback (TEXT_INDEX disabled): query='%s'",
|
||||
query_pattern,
|
||||
)
|
||||
cursor.execute(sql, params)
|
||||
|
||||
docs = []
|
||||
for row in cursor.fetchall():
|
||||
if len(row) >= 3:
|
||||
metadata = json.loads(row[2]) if row[2] else {}
|
||||
docs.append(Document(page_content=row[1], metadata=metadata))
|
||||
# Expecting 4 columns: id, text, meta, score
|
||||
if len(row) >= 4:
|
||||
text_content = row[1]
|
||||
meta_str = row[2]
|
||||
score_value = row[3]
|
||||
|
||||
metadata = json.loads(meta_str) if meta_str else {}
|
||||
# Add score to metadata for hybrid search compatibility
|
||||
score = float(score_value) if score_value is not None else 0.0
|
||||
metadata["score"] = score
|
||||
|
||||
docs.append(Document(page_content=text_content, metadata=metadata))
|
||||
|
||||
logger.info(
|
||||
"Full-text search completed: query='%s', results=%d/%d",
|
||||
query,
|
||||
len(docs),
|
||||
top_k,
|
||||
)
|
||||
|
||||
if not docs:
|
||||
logger.info("Full-text search for '%s' returned no results", query)
|
||||
logger.warning("Full-text search for '%s' returned no results", query)
|
||||
|
||||
return docs
|
||||
|
||||
@ -370,7 +463,11 @@ class IrisVector(BaseVector):
|
||||
AS %iFind.Index.Basic
|
||||
(LANGUAGE = '{language}', LOWER = 1, INDEXOPTION = 0)
|
||||
"""
|
||||
logger.info("Creating text index: %s with language: %s", text_index_name, language)
|
||||
logger.info(
|
||||
"Creating text index: %s with language: %s",
|
||||
text_index_name,
|
||||
language,
|
||||
)
|
||||
logger.info("SQL for text index: %s", sql_text_index)
|
||||
cursor.execute(sql_text_index)
|
||||
logger.info("Text index created successfully: %s", text_index_name)
|
||||
|
||||
@ -130,7 +130,7 @@ class ToolInvokeMessage(BaseModel):
|
||||
text: str
|
||||
|
||||
class JsonMessage(BaseModel):
|
||||
json_object: dict
|
||||
json_object: dict | list
|
||||
suppress_output: bool = Field(default=False, description="Whether to suppress JSON output in result string")
|
||||
|
||||
class BlobMessage(BaseModel):
|
||||
@ -144,7 +144,14 @@ class ToolInvokeMessage(BaseModel):
|
||||
end: bool = Field(..., description="Whether the chunk is the last chunk")
|
||||
|
||||
class FileMessage(BaseModel):
|
||||
pass
|
||||
file_marker: str = Field(default="file_marker")
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_file_message(cls, values):
|
||||
if isinstance(values, dict) and "file_marker" not in values:
|
||||
raise ValueError("Invalid FileMessage: missing file_marker")
|
||||
return values
|
||||
|
||||
class VariableMessage(BaseModel):
|
||||
variable_name: str = Field(..., description="The name of the variable")
|
||||
@ -234,10 +241,22 @@ class ToolInvokeMessage(BaseModel):
|
||||
|
||||
@field_validator("message", mode="before")
|
||||
@classmethod
|
||||
def decode_blob_message(cls, v):
|
||||
def decode_blob_message(cls, v, info: ValidationInfo):
|
||||
# 处理 blob 解码
|
||||
if isinstance(v, dict) and "blob" in v:
|
||||
with contextlib.suppress(Exception):
|
||||
v["blob"] = base64.b64decode(v["blob"])
|
||||
|
||||
# Force correct message type based on type field
|
||||
# Only wrap dict types to avoid wrapping already parsed Pydantic model objects
|
||||
if info.data and isinstance(info.data, dict) and isinstance(v, dict):
|
||||
msg_type = info.data.get("type")
|
||||
if msg_type == cls.MessageType.JSON:
|
||||
if "json_object" not in v:
|
||||
v = {"json_object": v}
|
||||
elif msg_type == cls.MessageType.FILE:
|
||||
v = {"file_marker": "file_marker"}
|
||||
|
||||
return v
|
||||
|
||||
@field_serializer("message")
|
||||
|
||||
@ -494,7 +494,7 @@ class AgentNode(Node[AgentNodeData]):
|
||||
|
||||
text = ""
|
||||
files: list[File] = []
|
||||
json_list: list[dict] = []
|
||||
json_list: list[dict | list] = []
|
||||
|
||||
agent_logs: list[AgentLogEvent] = []
|
||||
agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {}
|
||||
@ -568,13 +568,18 @@ class AgentNode(Node[AgentNodeData]):
|
||||
elif message.type == ToolInvokeMessage.MessageType.JSON:
|
||||
assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
|
||||
if node_type == NodeType.AGENT:
|
||||
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
|
||||
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))
|
||||
agent_execution_metadata = {
|
||||
WorkflowNodeExecutionMetadataKey(key): value
|
||||
for key, value in msg_metadata.items()
|
||||
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
|
||||
}
|
||||
if isinstance(message.message.json_object, dict):
|
||||
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
|
||||
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))
|
||||
agent_execution_metadata = {
|
||||
WorkflowNodeExecutionMetadataKey(key): value
|
||||
for key, value in msg_metadata.items()
|
||||
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
|
||||
}
|
||||
else:
|
||||
msg_metadata = {}
|
||||
llm_usage = LLMUsage.empty_usage()
|
||||
agent_execution_metadata = {}
|
||||
if message.message.json_object:
|
||||
json_list.append(message.message.json_object)
|
||||
elif message.type == ToolInvokeMessage.MessageType.LINK:
|
||||
@ -683,7 +688,7 @@ class AgentNode(Node[AgentNodeData]):
|
||||
yield agent_log
|
||||
|
||||
# Add agent_logs to outputs['json'] to ensure frontend can access thinking process
|
||||
json_output: list[dict[str, Any]] = []
|
||||
json_output: list[dict[str, Any] | list[Any]] = []
|
||||
|
||||
# Step 1: append each agent log as its own dict.
|
||||
if agent_logs:
|
||||
|
||||
@ -301,7 +301,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
|
||||
|
||||
text = ""
|
||||
files: list[File] = []
|
||||
json: list[dict] = []
|
||||
json: list[dict | list] = []
|
||||
|
||||
variables: dict[str, Any] = {}
|
||||
|
||||
|
||||
@ -244,7 +244,7 @@ class ToolNode(Node[ToolNodeData]):
|
||||
|
||||
text = ""
|
||||
files: list[File] = []
|
||||
json: list[dict] = []
|
||||
json: list[dict | list] = []
|
||||
|
||||
variables: dict[str, Any] = {}
|
||||
|
||||
@ -400,7 +400,7 @@ class ToolNode(Node[ToolNodeData]):
|
||||
message.message.metadata = dict_metadata
|
||||
|
||||
# Add agent_logs to outputs['json'] to ensure frontend can access thinking process
|
||||
json_output: list[dict[str, Any]] = []
|
||||
json_output: list[dict[str, Any] | list[Any]] = []
|
||||
|
||||
# Step 2: normalize JSON into {"data": [...]}.change json to list[dict]
|
||||
if json:
|
||||
|
||||
Reference in New Issue
Block a user