mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 17:08:03 +08:00
Merge branch 'plugins/beta' into dev/plugin-deploy
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Generator
|
||||
from typing import Optional
|
||||
|
||||
from core.workflow.entities.variable_pool import VariablePool
|
||||
from core.workflow.graph_engine.entities.event import GraphEngineEvent, NodeRunExceptionEvent, NodeRunSucceededEvent
|
||||
@ -48,25 +49,35 @@ class StreamProcessor(ABC):
|
||||
# we remove the node maybe shortcut the answer node, so comment this code for now
|
||||
# there is not effect on the answer node and the workflow, when we have a better solution
|
||||
# we can open this code. Issues: #11542 #9560 #10638 #10564
|
||||
ids = self._fetch_node_ids_in_reachable_branch(edge.target_node_id)
|
||||
if "answer" in ids:
|
||||
continue
|
||||
else:
|
||||
reachable_node_ids.extend(ids)
|
||||
# ids = self._fetch_node_ids_in_reachable_branch(edge.target_node_id)
|
||||
# if "answer" in ids:
|
||||
# continue
|
||||
# else:
|
||||
# reachable_node_ids.extend(ids)
|
||||
|
||||
# The branch_identify parameter is added to ensure that
|
||||
# only nodes in the correct logical branch are included.
|
||||
ids = self._fetch_node_ids_in_reachable_branch(edge.target_node_id, run_result.edge_source_handle)
|
||||
reachable_node_ids.extend(ids)
|
||||
else:
|
||||
unreachable_first_node_ids.append(edge.target_node_id)
|
||||
|
||||
for node_id in unreachable_first_node_ids:
|
||||
self._remove_node_ids_in_unreachable_branch(node_id, reachable_node_ids)
|
||||
|
||||
def _fetch_node_ids_in_reachable_branch(self, node_id: str) -> list[str]:
|
||||
def _fetch_node_ids_in_reachable_branch(self, node_id: str, branch_identify: Optional[str] = None) -> list[str]:
|
||||
node_ids = []
|
||||
for edge in self.graph.edge_mapping.get(node_id, []):
|
||||
if edge.target_node_id == self.graph.root_node_id:
|
||||
continue
|
||||
|
||||
# Only follow edges that match the branch_identify or have no run_condition
|
||||
if edge.run_condition and edge.run_condition.branch_identify:
|
||||
if not branch_identify or edge.run_condition.branch_identify != branch_identify:
|
||||
continue
|
||||
|
||||
node_ids.append(edge.target_node_id)
|
||||
node_ids.extend(self._fetch_node_ids_in_reachable_branch(edge.target_node_id))
|
||||
node_ids.extend(self._fetch_node_ids_in_reachable_branch(edge.target_node_id, branch_identify))
|
||||
return node_ids
|
||||
|
||||
def _remove_node_ids_in_unreachable_branch(self, node_id: str, reachable_node_ids: list[str]) -> None:
|
||||
|
||||
@ -253,9 +253,9 @@ class Executor:
|
||||
)
|
||||
if executor_response.size > threshold_size:
|
||||
raise ResponseSizeError(
|
||||
f'{"File" if executor_response.is_file else "Text"} size is too large,'
|
||||
f' max size is {threshold_size / 1024 / 1024:.2f} MB,'
|
||||
f' but current size is {executor_response.readable_size}.'
|
||||
f"{'File' if executor_response.is_file else 'Text'} size is too large,"
|
||||
f" max size is {threshold_size / 1024 / 1024:.2f} MB,"
|
||||
f" but current size is {executor_response.readable_size}."
|
||||
)
|
||||
|
||||
return executor_response
|
||||
@ -338,7 +338,7 @@ class Executor:
|
||||
if self.auth.config and self.auth.config.header:
|
||||
authorization_header = self.auth.config.header
|
||||
if k.lower() == authorization_header.lower():
|
||||
raw += f'{k}: {"*" * len(v)}\r\n'
|
||||
raw += f"{k}: {'*' * len(v)}\r\n"
|
||||
continue
|
||||
raw += f"{k}: {v}\r\n"
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import json
|
||||
from collections.abc import Sequence
|
||||
from typing import Any, cast
|
||||
|
||||
from core.variables import SegmentType, Variable
|
||||
@ -31,7 +32,7 @@ class VariableAssignerNode(BaseNode[VariableAssignerNodeData]):
|
||||
inputs = self.node_data.model_dump()
|
||||
process_data: dict[str, Any] = {}
|
||||
# NOTE: This node has no outputs
|
||||
updated_variables: list[Variable] = []
|
||||
updated_variable_selectors: list[Sequence[str]] = []
|
||||
|
||||
try:
|
||||
for item in self.node_data.items:
|
||||
@ -98,7 +99,8 @@ class VariableAssignerNode(BaseNode[VariableAssignerNodeData]):
|
||||
value=item.value,
|
||||
)
|
||||
variable = variable.model_copy(update={"value": updated_value})
|
||||
updated_variables.append(variable)
|
||||
self.graph_runtime_state.variable_pool.add(variable.selector, variable)
|
||||
updated_variable_selectors.append(variable.selector)
|
||||
except VariableOperatorNodeError as e:
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.FAILED,
|
||||
@ -107,9 +109,15 @@ class VariableAssignerNode(BaseNode[VariableAssignerNodeData]):
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
# The `updated_variable_selectors` is a list contains list[str] which not hashable,
|
||||
# remove the duplicated items first.
|
||||
updated_variable_selectors = list(set(map(tuple, updated_variable_selectors)))
|
||||
|
||||
# Update variables
|
||||
for variable in updated_variables:
|
||||
self.graph_runtime_state.variable_pool.add(variable.selector, variable)
|
||||
for selector in updated_variable_selectors:
|
||||
variable = self.graph_runtime_state.variable_pool.get(selector)
|
||||
if not isinstance(variable, Variable):
|
||||
raise VariableNotFoundError(variable_selector=selector)
|
||||
process_data[variable.name] = variable.value
|
||||
|
||||
if variable.selector[0] == CONVERSATION_VARIABLE_NODE_ID:
|
||||
|
||||
Reference in New Issue
Block a user