Merge branch 'main' into feat/mcp

This commit is contained in:
Novice
2025-07-09 09:41:42 +08:00
234 changed files with 8742 additions and 1254 deletions

View File

@ -66,11 +66,21 @@ class WorkflowNodeExecution(BaseModel):
but they are not stored in the model.
"""
# Core identification fields
id: str # Unique identifier for this execution record
node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing
# --------- Core identification fields ---------
# Unique identifier for this execution record, used when persisting to storage.
# Value is a UUID string (e.g., '09b3e04c-f9ae-404c-ad82-290b8d7bd382').
id: str
# Optional secondary ID for cross-referencing purposes.
#
# NOTE: For referencing the persisted record, use `id` rather than `node_execution_id`.
# While `node_execution_id` may sometimes be a UUID string, this is not guaranteed.
# In most scenarios, `id` should be used as the primary identifier.
node_execution_id: Optional[str] = None
workflow_id: str # ID of the workflow this node belongs to
workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging)
# --------- End of core identification fields ---------
# Execution positioning and flow
index: int # Sequence number for ordering in trace visualization

View File

@ -103,7 +103,7 @@ class GraphEngine:
call_depth: int,
graph: Graph,
graph_config: Mapping[str, Any],
variable_pool: VariablePool,
graph_runtime_state: GraphRuntimeState,
max_execution_steps: int,
max_execution_time: int,
thread_pool_id: Optional[str] = None,
@ -140,7 +140,7 @@ class GraphEngine:
call_depth=call_depth,
)
self.graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
self.graph_runtime_state = graph_runtime_state
self.max_execution_steps = max_execution_steps
self.max_execution_time = max_execution_time

View File

@ -1,4 +1,5 @@
import json
import uuid
from collections.abc import Generator, Mapping, Sequence
from typing import Any, Optional, cast
@ -15,7 +16,7 @@ from core.model_runtime.entities.model_entities import AIModelEntity, ModelType
from core.plugin.impl.exc import PluginDaemonClientSideError
from core.plugin.impl.plugin import PluginInstaller
from core.provider_manager import ProviderManager
from core.tools.entities.tool_entities import ToolParameter, ToolProviderType
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolParameter, ToolProviderType
from core.tools.tool_manager import ToolManager
from core.variables.segments import StringSegment
from core.workflow.entities.node_entities import NodeRunResult
@ -106,6 +107,32 @@ class AgentNode(ToolNode):
try:
# convert tool messages
agent_thoughts: list = []
thought_log_message = ToolInvokeMessage(
type=ToolInvokeMessage.MessageType.LOG,
message=ToolInvokeMessage.LogMessage(
id=str(uuid.uuid4()),
label=f"Agent Strategy: {cast(AgentNodeData, self.node_data).agent_strategy_name}",
parent_id=None,
error=None,
status=ToolInvokeMessage.LogMessage.LogStatus.START,
data={
"strategy": cast(AgentNodeData, self.node_data).agent_strategy_name,
"parameters": parameters_for_log,
"thought_process": "Agent strategy execution started",
},
metadata={
"icon": self.agent_strategy_icon,
"agent_strategy": cast(AgentNodeData, self.node_data).agent_strategy_name,
},
),
)
def enhanced_message_stream():
yield thought_log_message
yield from message_stream
yield from self._transform_message(
message_stream,
@ -114,6 +141,7 @@ class AgentNode(ToolNode):
"agent_strategy": cast(AgentNodeData, self.node_data).agent_strategy_name,
},
parameters_for_log,
agent_thoughts,
)
except PluginDaemonClientSideError as e:
yield RunCompletedEvent(

View File

@ -2,7 +2,6 @@ import logging
from collections.abc import Generator
from typing import cast
from core.file import FILE_MODEL_IDENTITY, File
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.graph_engine.entities.event import (
GraphEngineEvent,
@ -201,44 +200,3 @@ class AnswerStreamProcessor(StreamProcessor):
stream_out_answer_node_ids.append(answer_node_id)
return stream_out_answer_node_ids
@classmethod
def _fetch_files_from_variable_value(cls, value: dict | list) -> list[dict]:
    """
    Collect every file entry found in a variable value.

    A list is scanned item by item; a dict is treated as a single candidate.
    Each candidate is handed to ``cls._get_file_var_from_value`` and kept only
    when that call returns a truthy result.

    :param value: variable value (a dict, or a list of candidates)
    :return: the file entries found, in input order; an empty list when
        ``value`` is falsy or is neither a list nor a dict
    """
    if not value:
        return []

    # Normalise both supported shapes to a sequence of candidates.
    if isinstance(value, list):
        candidates = value
    elif isinstance(value, dict):
        candidates = [value]
    else:
        candidates = []

    resolved = (cls._get_file_var_from_value(candidate) for candidate in candidates)
    return [file_var for file_var in resolved if file_var]
@classmethod
def _get_file_var_from_value(cls, value: dict | list):
    """
    Return the file representation carried by ``value``, if any.

    :param value: variable value
    :return: ``value`` itself when it is a dict whose ``"dify_model_identity"``
        entry equals ``FILE_MODEL_IDENTITY``; ``value.to_dict()`` when it is a
        ``File`` instance; ``None`` in every other case (including falsy input)
    """
    if not value:
        return None

    # Dicts are checked first; a dict without the identity marker is not a file.
    if isinstance(value, dict):
        has_file_identity = (
            "dify_model_identity" in value and value["dify_model_identity"] == FILE_MODEL_IDENTITY
        )
        return value if has_file_identity else None

    if isinstance(value, File):
        return value.to_dict()

    return None

View File

@ -8,6 +8,7 @@ from typing import Any, Literal
from urllib.parse import urlencode, urlparse
import httpx
from json_repair import repair_json
from configs import dify_config
from core.file import file_manager
@ -178,7 +179,8 @@ class Executor:
raise RequestBodyError("json body type should have exactly one item")
json_string = self.variable_pool.convert_template(data[0].value).text
try:
json_object = json.loads(json_string, strict=False)
repaired = repair_json(json_string)
json_object = json.loads(repaired, strict=False)
except json.JSONDecodeError as e:
raise RequestBodyError(f"Failed to parse JSON: {json_string}") from e
self.json = json_object
@ -333,7 +335,7 @@ class Executor:
try:
response = getattr(ssrf_proxy, self.method.lower())(**request_args)
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
raise HttpRequestNodeError(str(e))
raise HttpRequestNodeError(str(e)) from e
# FIXME: remove this type ignore; this may be an httpx typing issue
return response # type: ignore

View File

@ -1,5 +1,6 @@
import contextvars
import logging
import time
import uuid
from collections.abc import Generator, Mapping, Sequence
from concurrent.futures import Future, wait
@ -133,8 +134,11 @@ class IterationNode(BaseNode[IterationNodeData]):
variable_pool.add([self.node_id, "item"], iterator_list_value[0])
# init graph engine
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.graph_engine.graph_engine import GraphEngine, GraphEngineThreadPool
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
app_id=self.app_id,
@ -146,7 +150,7 @@ class IterationNode(BaseNode[IterationNodeData]):
call_depth=self.workflow_call_depth,
graph=iteration_graph,
graph_config=graph_config,
variable_pool=variable_pool,
graph_runtime_state=graph_runtime_state,
max_execution_steps=dify_config.WORKFLOW_MAX_EXECUTION_STEPS,
max_execution_time=dify_config.WORKFLOW_MAX_EXECUTION_TIME,
thread_pool_id=self.thread_pool_id,

View File

@ -490,6 +490,9 @@ class KnowledgeRetrievalNode(LLMNode):
def _process_metadata_filter_func(
self, sequence: int, condition: str, metadata_name: str, value: Optional[Any], filters: list
):
if value is None:
return
key = f"{metadata_name}_{sequence}"
key_value = f"{metadata_name}_{sequence}_value"
match condition:

View File

@ -221,15 +221,6 @@ class LLMNode(BaseNode[LLMNodeData]):
jinja2_variables=self.node_data.prompt_config.jinja2_variables,
)
process_data = {
"model_mode": model_config.mode,
"prompts": PromptMessageUtil.prompt_messages_to_prompt_for_saving(
model_mode=model_config.mode, prompt_messages=prompt_messages
),
"model_provider": model_config.provider,
"model_name": model_config.model,
}
# handle invoke result
generator = self._invoke_llm(
node_data_model=self.node_data.model,
@ -253,6 +244,17 @@ class LLMNode(BaseNode[LLMNodeData]):
elif isinstance(event, LLMStructuredOutput):
structured_output = event
process_data = {
"model_mode": model_config.mode,
"prompts": PromptMessageUtil.prompt_messages_to_prompt_for_saving(
model_mode=model_config.mode, prompt_messages=prompt_messages
),
"usage": jsonable_encoder(usage),
"finish_reason": finish_reason,
"model_provider": model_config.provider,
"model_name": model_config.model,
}
outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason}
if structured_output:
outputs["structured_output"] = structured_output.structured_output

View File

@ -1,5 +1,6 @@
import json
import logging
import time
from collections.abc import Generator, Mapping, Sequence
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, Literal, cast
@ -101,8 +102,11 @@ class LoopNode(BaseNode[LoopNodeData]):
loop_variable_selectors[loop_variable.label] = variable_selector
inputs[loop_variable.label] = processed_segment.value
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.graph_engine.graph_engine import GraphEngine
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph_engine = GraphEngine(
tenant_id=self.tenant_id,
app_id=self.app_id,
@ -114,7 +118,7 @@ class LoopNode(BaseNode[LoopNodeData]):
call_depth=self.workflow_call_depth,
graph=loop_graph,
graph_config=self.graph_config,
variable_pool=variable_pool,
graph_runtime_state=graph_runtime_state,
max_execution_steps=dify_config.WORKFLOW_MAX_EXECUTION_STEPS,
max_execution_time=dify_config.WORKFLOW_MAX_EXECUTION_TIME,
thread_pool_id=self.thread_pool_id,

View File

@ -253,7 +253,12 @@ class ParameterExtractorNode(BaseNode):
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=inputs,
process_data=process_data,
outputs={"__is_success": 1 if not error else 0, "__reason": error, **result},
outputs={
"__is_success": 1 if not error else 0,
"__reason": error,
"__usage": jsonable_encoder(usage),
**result,
},
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,

View File

@ -145,7 +145,11 @@ class QuestionClassifierNode(LLMNode):
"model_provider": model_config.provider,
"model_name": model_config.model,
}
outputs = {"class_name": category_name, "class_id": category_id}
outputs = {
"class_name": category_name,
"class_id": category_id,
"usage": jsonable_encoder(usage),
}
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,

View File

@ -1,11 +1,12 @@
from collections.abc import Generator, Mapping, Sequence
from typing import Any, cast
from typing import Any, Optional, cast
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.callback_handler.workflow_tool_callback_handler import DifyWorkflowCallbackHandler
from core.file import File, FileTransferMethod
from core.model_runtime.entities.llm_entities import LLMUsage
from core.plugin.impl.exc import PluginDaemonClientSideError
from core.plugin.impl.plugin import PluginInstaller
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolParameter
@ -190,6 +191,7 @@ class ToolNode(BaseNode[ToolNodeData]):
messages: Generator[ToolInvokeMessage, None, None],
tool_info: Mapping[str, Any],
parameters_for_log: dict[str, Any],
agent_thoughts: Optional[list] = None,
) -> Generator:
"""
Convert ToolInvokeMessages into tuple[plain_text, files]
@ -208,7 +210,7 @@ class ToolNode(BaseNode[ToolNodeData]):
agent_logs: list[AgentLogEvent] = []
agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {}
llm_usage: LLMUsage | None = None
variables: dict[str, Any] = {}
for message in message_stream:
@ -276,9 +278,10 @@ class ToolNode(BaseNode[ToolNodeData]):
elif message.type == ToolInvokeMessage.MessageType.JSON:
assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
if self.node_type == NodeType.AGENT:
msg_metadata = message.message.json_object.pop("execution_metadata", {})
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
llm_usage = LLMUsage.from_metadata(msg_metadata)
agent_execution_metadata = {
key: value
WorkflowNodeExecutionMetadataKey(key): value
for key, value in msg_metadata.items()
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
}
@ -366,17 +369,42 @@ class ToolNode(BaseNode[ToolNodeData]):
agent_logs.append(agent_log)
yield agent_log
# Add agent_logs to outputs['json'] to ensure frontend can access thinking process
json_output: dict[str, Any] = {}
if json:
if isinstance(json, list) and len(json) == 1:
# If json is a list with only one element, convert it to a dictionary
json_output = json[0] if isinstance(json[0], dict) else {"data": json[0]}
elif isinstance(json, list):
# If json is a list with multiple elements, create a dictionary containing all data
json_output = {"data": json}
if agent_logs:
# Add agent_logs to json output
json_output["agent_logs"] = [
{
"id": log.id,
"parent_id": log.parent_id,
"error": log.error,
"status": log.status,
"data": log.data,
"label": log.label,
"metadata": log.metadata,
"node_id": log.node_id,
}
for log in agent_logs
]
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={"text": text, "files": ArrayFileSegment(value=files), "json": json, **variables},
outputs={"text": text, "files": ArrayFileSegment(value=files), "json": json_output, **variables},
metadata={
**agent_execution_metadata,
WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info,
WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs,
},
inputs=parameters_for_log,
llm_usage=llm_usage,
)
)

View File

@ -0,0 +1,32 @@
import abc
from collections.abc import Mapping
from typing import Any, Protocol
from sqlalchemy.orm import Session
from core.workflow.nodes.enums import NodeType
class DraftVariableSaver(Protocol):
@abc.abstractmethod
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None):
pass
class DraftVariableSaverFactory(Protocol):
    """Callable protocol: given a session and the identifiers of a node
    execution, return a ``DraftVariableSaver``."""

    @abc.abstractmethod
    def __call__(
        self,
        session: Session,
        app_id: str,
        node_id: str,
        node_type: NodeType,
        node_execution_id: str,
        enclosing_node_id: str | None = None,
    ) -> "DraftVariableSaver":
        """Build a saver for the given node execution.

        ``enclosing_node_id`` is optional and defaults to ``None``.
        The protocol supplies no behavior of its own.
        """
        ...
class NoopDraftVariableSaver(DraftVariableSaver):
    """``DraftVariableSaver`` implementation whose ``save`` does nothing."""

    def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None):
        """Ignore both arguments and return ``None``."""
        return None

View File

@ -27,6 +27,7 @@ from core.workflow.enums import SystemVariableKey
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_entry import WorkflowEntry
from libs.datetime_utils import naive_utc_now
@dataclass
@ -160,12 +161,13 @@ class WorkflowCycleManager:
exceptions_count: int = 0,
) -> WorkflowExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id)
now = naive_utc_now()
workflow_execution.status = WorkflowExecutionStatus(status.value)
workflow_execution.error_message = error_message
workflow_execution.total_tokens = total_tokens
workflow_execution.total_steps = total_steps
workflow_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_execution.finished_at = now
workflow_execution.exceptions_count = exceptions_count
# Use the instance repository to find running executions for a workflow run
@ -174,7 +176,6 @@ class WorkflowCycleManager:
)
# Update the domain models
now = datetime.now(UTC).replace(tzinfo=None)
for node_execution in running_node_executions:
if node_execution.node_execution_id:
# Update the domain model

View File

@ -69,6 +69,7 @@ class WorkflowEntry:
raise ValueError("Max workflow call depth {} reached.".format(workflow_call_max_depth))
# init workflow run state
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
self.graph_engine = GraphEngine(
tenant_id=tenant_id,
app_id=app_id,
@ -80,7 +81,7 @@ class WorkflowEntry:
call_depth=call_depth,
graph=graph,
graph_config=graph_config,
variable_pool=variable_pool,
graph_runtime_state=graph_runtime_state,
max_execution_steps=dify_config.WORKFLOW_MAX_EXECUTION_STEPS,
max_execution_time=dify_config.WORKFLOW_MAX_EXECUTION_TIME,
thread_pool_id=thread_pool_id,