mirror of
https://github.com/langgenius/dify.git
synced 2026-05-01 07:58:02 +08:00
Merge branch 'main' into fix/chore-fix
This commit is contained in:
@ -361,6 +361,7 @@ class WorkflowBasedAppRunner(AppRunner):
|
||||
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
|
||||
output=event.pre_iteration_output,
|
||||
parallel_mode_run_id=event.parallel_mode_run_id,
|
||||
duration=event.duration,
|
||||
)
|
||||
)
|
||||
elif isinstance(event, (IterationRunSucceededEvent | IterationRunFailedEvent)):
|
||||
|
||||
@ -111,6 +111,7 @@ class QueueIterationNextEvent(AppQueueEvent):
|
||||
"""iteratoin run in parallel mode run id"""
|
||||
node_run_index: int
|
||||
output: Optional[Any] = None # output for the current iteration
|
||||
duration: Optional[float] = None
|
||||
|
||||
@field_validator("output", mode="before")
|
||||
@classmethod
|
||||
@ -307,6 +308,8 @@ class QueueNodeSucceededEvent(AppQueueEvent):
|
||||
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
|
||||
|
||||
error: Optional[str] = None
|
||||
"""single iteration duration map"""
|
||||
iteration_duration_map: Optional[dict[str, float]] = None
|
||||
|
||||
|
||||
class QueueNodeInIterationFailedEvent(AppQueueEvent):
|
||||
|
||||
@ -434,6 +434,7 @@ class IterationNodeNextStreamResponse(StreamResponse):
|
||||
parallel_id: Optional[str] = None
|
||||
parallel_start_node_id: Optional[str] = None
|
||||
parallel_mode_run_id: Optional[str] = None
|
||||
duration: Optional[float] = None
|
||||
|
||||
event: StreamEvent = StreamEvent.ITERATION_NEXT
|
||||
workflow_run_id: str
|
||||
|
||||
@ -624,6 +624,7 @@ class WorkflowCycleManage:
|
||||
parallel_id=event.parallel_id,
|
||||
parallel_start_node_id=event.parallel_start_node_id,
|
||||
parallel_mode_run_id=event.parallel_mode_run_id,
|
||||
duration=event.duration,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@ -0,0 +1,95 @@
|
||||
model: Qwen2.5-72B-Instruct
|
||||
label:
|
||||
zh_Hans: Qwen2.5-72B-Instruct
|
||||
en_US: Qwen2.5-72B-Instruct
|
||||
model_type: llm
|
||||
features:
|
||||
- agent-thought
|
||||
- tool-call
|
||||
- stream-tool-call
|
||||
model_properties:
|
||||
mode: chat
|
||||
context_size: 32768
|
||||
parameter_rules:
|
||||
- name: max_tokens
|
||||
use_template: max_tokens
|
||||
label:
|
||||
en_US: "Max Tokens"
|
||||
zh_Hans: "最大Token数"
|
||||
type: int
|
||||
default: 512
|
||||
min: 1
|
||||
required: true
|
||||
help:
|
||||
en_US: "The maximum number of tokens that can be generated by the model varies depending on the model."
|
||||
zh_Hans: "模型可生成的最大 token 个数,不同模型上限不同。"
|
||||
|
||||
- name: temperature
|
||||
use_template: temperature
|
||||
label:
|
||||
en_US: "Temperature"
|
||||
zh_Hans: "采样温度"
|
||||
type: float
|
||||
default: 0.7
|
||||
min: 0.0
|
||||
max: 1.0
|
||||
precision: 1
|
||||
required: true
|
||||
help:
|
||||
en_US: "The randomness of the sampling temperature control output. The temperature value is within the range of [0.0, 1.0]. The higher the value, the more random and creative the output; the lower the value, the more stable it is. It is recommended to adjust either top_p or temperature parameters according to your needs to avoid adjusting both at the same time."
|
||||
zh_Hans: "采样温度控制输出的随机性。温度值在 [0.0, 1.0] 范围内,值越高,输出越随机和创造性;值越低,输出越稳定。建议根据需求调整 top_p 或 temperature 参数,避免同时调整两者。"
|
||||
|
||||
- name: top_p
|
||||
use_template: top_p
|
||||
label:
|
||||
en_US: "Top P"
|
||||
zh_Hans: "Top P"
|
||||
type: float
|
||||
default: 0.7
|
||||
min: 0.0
|
||||
max: 1.0
|
||||
precision: 1
|
||||
required: true
|
||||
help:
|
||||
en_US: "The value range of the sampling method is [0.0, 1.0]. The top_p value determines that the model selects tokens from the top p% of candidate words with the highest probability; when top_p is 0, this parameter is invalid. It is recommended to adjust either top_p or temperature parameters according to your needs to avoid adjusting both at the same time."
|
||||
zh_Hans: "采样方法的取值范围为 [0.0,1.0]。top_p 值确定模型从概率最高的前p%的候选词中选取 tokens;当 top_p 为 0 时,此参数无效。建议根据需求调整 top_p 或 temperature 参数,避免同时调整两者。"
|
||||
|
||||
- name: top_k
|
||||
use_template: top_k
|
||||
label:
|
||||
en_US: "Top K"
|
||||
zh_Hans: "Top K"
|
||||
type: int
|
||||
default: 50
|
||||
min: 0
|
||||
max: 100
|
||||
required: true
|
||||
help:
|
||||
en_US: "The value range is [0,100], which limits the model to only select from the top k words with the highest probability when choosing the next word at each step. The larger the value, the more diverse text generation will be."
|
||||
zh_Hans: "取值范围为 [0,100],限制模型在每一步选择下一个词时,只从概率最高的前 k 个词中选取。数值越大,文本生成越多样。"
|
||||
|
||||
- name: frequency_penalty
|
||||
use_template: frequency_penalty
|
||||
label:
|
||||
en_US: "Frequency Penalty"
|
||||
zh_Hans: "频率惩罚"
|
||||
type: float
|
||||
default: 0
|
||||
min: -1.0
|
||||
max: 1.0
|
||||
precision: 1
|
||||
required: false
|
||||
help:
|
||||
en_US: "Used to adjust the frequency of repeated content in automatically generated text. Positive numbers reduce repetition, while negative numbers increase repetition. After setting this parameter, if a word has already appeared in the text, the model will decrease the probability of choosing that word for subsequent generation."
|
||||
zh_Hans: "用于调整自动生成文本中重复内容的频率。正数减少重复,负数增加重复。设置此参数后,如果一个词在文本中已经出现过,模型在后续生成中选择该词的概率会降低。"
|
||||
|
||||
- name: user
|
||||
use_template: text
|
||||
label:
|
||||
en_US: "User"
|
||||
zh_Hans: "用户"
|
||||
type: string
|
||||
required: false
|
||||
help:
|
||||
en_US: "Used to track and differentiate conversation requests from different users."
|
||||
zh_Hans: "用于追踪和区分不同用户的对话请求。"
|
||||
@ -1,3 +1,4 @@
|
||||
- Qwen2.5-72B-Instruct
|
||||
- Qwen2-7B-Instruct
|
||||
- Qwen2-72B-Instruct
|
||||
- Yi-1.5-34B-Chat
|
||||
|
||||
@ -6,6 +6,7 @@ from core.model_runtime.entities.message_entities import (
|
||||
PromptMessage,
|
||||
PromptMessageTool,
|
||||
)
|
||||
from core.model_runtime.entities.model_entities import ModelFeature
|
||||
from core.model_runtime.model_providers.openai_api_compatible.llm.llm import OAIAPICompatLargeLanguageModel
|
||||
|
||||
|
||||
@ -28,14 +29,13 @@ class GiteeAILargeLanguageModel(OAIAPICompatLargeLanguageModel):
|
||||
user: Optional[str] = None,
|
||||
) -> Union[LLMResult, Generator]:
|
||||
self._add_custom_parameters(credentials, model, model_parameters)
|
||||
return super()._invoke(model, credentials, prompt_messages, model_parameters, tools, stop, stream)
|
||||
return super()._invoke(model, credentials, prompt_messages, model_parameters, tools, stop, stream, user)
|
||||
|
||||
def validate_credentials(self, model: str, credentials: dict) -> None:
|
||||
self._add_custom_parameters(credentials, model, None)
|
||||
super().validate_credentials(model, credentials)
|
||||
|
||||
@staticmethod
|
||||
def _add_custom_parameters(credentials: dict, model: str, model_parameters: dict) -> None:
|
||||
def _add_custom_parameters(self, credentials: dict, model: str, model_parameters: dict) -> None:
|
||||
if model is None:
|
||||
model = "bge-large-zh-v1.5"
|
||||
|
||||
@ -45,3 +45,7 @@ class GiteeAILargeLanguageModel(OAIAPICompatLargeLanguageModel):
|
||||
credentials["mode"] = LLMMode.COMPLETION.value
|
||||
else:
|
||||
credentials["mode"] = LLMMode.CHAT.value
|
||||
|
||||
schema = self.get_model_schema(model, credentials)
|
||||
if ModelFeature.TOOL_CALL in schema.features or ModelFeature.MULTI_TOOL_CALL in schema.features:
|
||||
credentials["function_calling_type"] = "tool_call"
|
||||
|
||||
@ -178,6 +178,7 @@ class ElasticSearchVector(BaseVector):
|
||||
Field.VECTOR.value: { # Make sure the dimension is correct here
|
||||
"type": "dense_vector",
|
||||
"dims": dim,
|
||||
"index": True,
|
||||
"similarity": "cosine",
|
||||
},
|
||||
Field.METADATA_KEY.value: {
|
||||
|
||||
@ -50,9 +50,9 @@ class WordExtractor(BaseExtractor):
|
||||
|
||||
self.web_path = self.file_path
|
||||
# TODO: use a better way to handle the file
|
||||
with tempfile.NamedTemporaryFile(delete=False) as self.temp_file:
|
||||
self.temp_file.write(r.content)
|
||||
self.file_path = self.temp_file.name
|
||||
self.temp_file = tempfile.NamedTemporaryFile() # noqa: SIM115
|
||||
self.temp_file.write(r.content)
|
||||
self.file_path = self.temp_file.name
|
||||
elif not os.path.isfile(self.file_path):
|
||||
raise ValueError(f"File path {self.file_path} is not a valid file or url")
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
from typing import Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.tools.__base.tool import ToolParameter
|
||||
@ -37,6 +38,11 @@ class ToolProviderApiEntity(BaseModel):
|
||||
tools: list[ToolApiEntity] = Field(default_factory=list)
|
||||
labels: list[str] = Field(default_factory=list)
|
||||
|
||||
@field_validator("tools", mode="before")
|
||||
@classmethod
|
||||
def convert_none_to_empty_list(cls, v):
|
||||
return v if v is not None else []
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
# -------------
|
||||
# overwrite tool parameter types for temp fix
|
||||
|
||||
@ -698,7 +698,11 @@ class ToolManager:
|
||||
"""
|
||||
get api provider
|
||||
"""
|
||||
provider_obj: ApiToolProvider | None = (
|
||||
"""
|
||||
get tool provider
|
||||
"""
|
||||
provider_name = provider
|
||||
provider_obj: ApiToolProvider = (
|
||||
db.session.query(ApiToolProvider)
|
||||
.filter(
|
||||
ApiToolProvider.tenant_id == tenant_id,
|
||||
@ -708,7 +712,7 @@ class ToolManager:
|
||||
)
|
||||
|
||||
if provider_obj is None:
|
||||
raise ValueError(f"you have not added provider {provider}")
|
||||
raise ValueError(f"you have not added provider {provider_name}")
|
||||
|
||||
try:
|
||||
credentials = json.loads(provider_obj.credentials_str) or {}
|
||||
|
||||
@ -17,6 +17,7 @@ from .segments import (
|
||||
from .types import SegmentType
|
||||
from .variables import (
|
||||
ArrayAnyVariable,
|
||||
ArrayFileVariable,
|
||||
ArrayNumberVariable,
|
||||
ArrayObjectVariable,
|
||||
ArrayStringVariable,
|
||||
@ -58,4 +59,5 @@ __all__ = [
|
||||
"ArrayStringSegment",
|
||||
"FileSegment",
|
||||
"FileVariable",
|
||||
"ArrayFileVariable",
|
||||
]
|
||||
|
||||
@ -1,9 +1,13 @@
|
||||
from collections.abc import Sequence
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from core.helper import encrypter
|
||||
|
||||
from .segments import (
|
||||
ArrayAnySegment,
|
||||
ArrayFileSegment,
|
||||
ArrayNumberSegment,
|
||||
ArrayObjectSegment,
|
||||
ArrayStringSegment,
|
||||
@ -24,11 +28,12 @@ class Variable(Segment):
|
||||
"""
|
||||
|
||||
id: str = Field(
|
||||
default="",
|
||||
description="Unique identity for variable. It's only used by environment variables now.",
|
||||
default=lambda _: str(uuid4()),
|
||||
description="Unique identity for variable.",
|
||||
)
|
||||
name: str
|
||||
description: str = Field(default="", description="Description of the variable.")
|
||||
selector: Sequence[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class StringVariable(StringSegment, Variable):
|
||||
@ -78,3 +83,7 @@ class NoneVariable(NoneSegment, Variable):
|
||||
|
||||
class FileVariable(FileSegment, Variable):
|
||||
pass
|
||||
|
||||
|
||||
class ArrayFileVariable(ArrayFileSegment, Variable):
|
||||
pass
|
||||
|
||||
@ -24,6 +24,7 @@ class NodeRunMetadataKey(str, Enum):
|
||||
PARENT_PARALLEL_ID = "parent_parallel_id"
|
||||
PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id"
|
||||
PARALLEL_MODE_RUN_ID = "parallel_mode_run_id"
|
||||
ITERATION_DURATION_MAP = "iteration_duration_map" # single iteration duration if iteration node runs
|
||||
|
||||
|
||||
class NodeRunResult(BaseModel):
|
||||
|
||||
@ -95,13 +95,16 @@ class VariablePool(BaseModel):
|
||||
if len(selector) < 2:
|
||||
raise ValueError("Invalid selector")
|
||||
|
||||
if isinstance(value, Variable):
|
||||
variable = value
|
||||
if isinstance(value, Segment):
|
||||
v = value
|
||||
variable = variable_factory.segment_to_variable(segment=value, selector=selector)
|
||||
else:
|
||||
v = variable_factory.build_segment(value)
|
||||
segment = variable_factory.build_segment(value)
|
||||
variable = variable_factory.segment_to_variable(segment=segment, selector=selector)
|
||||
|
||||
hash_key = hash(tuple(selector[1:]))
|
||||
self.variable_dictionary[selector[0]][hash_key] = v
|
||||
self.variable_dictionary[selector[0]][hash_key] = variable
|
||||
|
||||
def get(self, selector: Sequence[str], /) -> Segment | None:
|
||||
"""
|
||||
|
||||
@ -148,6 +148,7 @@ class IterationRunStartedEvent(BaseIterationEvent):
|
||||
class IterationRunNextEvent(BaseIterationEvent):
|
||||
index: int = Field(..., description="index")
|
||||
pre_iteration_output: Optional[Any] = Field(None, description="pre iteration output")
|
||||
duration: Optional[float] = Field(None, description="duration")
|
||||
|
||||
|
||||
class IterationRunSucceededEvent(BaseIterationEvent):
|
||||
@ -156,6 +157,7 @@ class IterationRunSucceededEvent(BaseIterationEvent):
|
||||
outputs: Optional[dict[str, Any]] = None
|
||||
metadata: Optional[dict[str, Any]] = None
|
||||
steps: int = 0
|
||||
iteration_duration_map: Optional[dict[str, float]] = None
|
||||
|
||||
|
||||
class IterationRunFailedEvent(BaseIterationEvent):
|
||||
|
||||
@ -143,14 +143,14 @@ def _extract_text_by_file_extension(*, file_content: bytes, file_extension: str)
|
||||
|
||||
def _extract_text_from_plain_text(file_content: bytes) -> str:
|
||||
try:
|
||||
return file_content.decode("utf-8")
|
||||
return file_content.decode("utf-8", "ignore")
|
||||
except UnicodeDecodeError as e:
|
||||
raise TextExtractionError("Failed to decode plain text file") from e
|
||||
|
||||
|
||||
def _extract_text_from_json(file_content: bytes) -> str:
|
||||
try:
|
||||
json_data = json.loads(file_content.decode("utf-8"))
|
||||
json_data = json.loads(file_content.decode("utf-8", "ignore"))
|
||||
return json.dumps(json_data, indent=2, ensure_ascii=False)
|
||||
except (UnicodeDecodeError, json.JSONDecodeError) as e:
|
||||
raise TextExtractionError(f"Failed to decode or parse JSON file: {e}") from e
|
||||
@ -159,7 +159,7 @@ def _extract_text_from_json(file_content: bytes) -> str:
|
||||
def _extract_text_from_yaml(file_content: bytes) -> str:
|
||||
"""Extract the content from yaml file"""
|
||||
try:
|
||||
yaml_data = yaml.safe_load_all(file_content.decode("utf-8"))
|
||||
yaml_data = yaml.safe_load_all(file_content.decode("utf-8", "ignore"))
|
||||
return yaml.dump_all(yaml_data, allow_unicode=True, sort_keys=False)
|
||||
except (UnicodeDecodeError, yaml.YAMLError) as e:
|
||||
raise TextExtractionError(f"Failed to decode or parse YAML file: {e}") from e
|
||||
@ -217,7 +217,7 @@ def _extract_text_from_file(file: File):
|
||||
|
||||
def _extract_text_from_csv(file_content: bytes) -> str:
|
||||
try:
|
||||
csv_file = io.StringIO(file_content.decode("utf-8"))
|
||||
csv_file = io.StringIO(file_content.decode("utf-8", "ignore"))
|
||||
csv_reader = csv.reader(csv_file)
|
||||
rows = list(csv_reader)
|
||||
|
||||
|
||||
@ -156,6 +156,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
index=0,
|
||||
pre_iteration_output=None,
|
||||
)
|
||||
iter_run_map: dict[str, float] = {}
|
||||
outputs: list[Any] = [None] * len(iterator_list_value)
|
||||
try:
|
||||
if self.node_data.is_parallel:
|
||||
@ -175,6 +176,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
iteration_graph,
|
||||
index,
|
||||
item,
|
||||
iter_run_map,
|
||||
)
|
||||
future.add_done_callback(thread_pool.task_done_callback)
|
||||
futures.append(future)
|
||||
@ -213,6 +215,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
start_at,
|
||||
graph_engine,
|
||||
iteration_graph,
|
||||
iter_run_map,
|
||||
)
|
||||
if self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT:
|
||||
outputs = [output for output in outputs if output is not None]
|
||||
@ -230,7 +233,9 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
|
||||
yield RunCompletedEvent(
|
||||
run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"output": jsonable_encoder(outputs)}
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
outputs={"output": jsonable_encoder(outputs)},
|
||||
metadata={NodeRunMetadataKey.ITERATION_DURATION_MAP: iter_run_map},
|
||||
)
|
||||
)
|
||||
except IterationNodeError as e:
|
||||
@ -356,15 +361,19 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
start_at: datetime,
|
||||
graph_engine: "GraphEngine",
|
||||
iteration_graph: Graph,
|
||||
iter_run_map: dict[str, float],
|
||||
parallel_mode_run_id: Optional[str] = None,
|
||||
) -> Generator[NodeEvent | InNodeEvent, None, None]:
|
||||
"""
|
||||
run single iteration
|
||||
"""
|
||||
iter_start_at = datetime.now(timezone.utc).replace(tzinfo=None)
|
||||
|
||||
try:
|
||||
rst = graph_engine.run()
|
||||
# get current iteration index
|
||||
current_index = variable_pool.get([self.node_id, "index"]).value
|
||||
iteration_run_id = parallel_mode_run_id if parallel_mode_run_id is not None else f"{current_index}"
|
||||
next_index = int(current_index) + 1
|
||||
|
||||
if current_index is None:
|
||||
@ -431,6 +440,8 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
variable_pool.add([self.node_id, "index"], next_index)
|
||||
if next_index < len(iterator_list_value):
|
||||
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
|
||||
duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds()
|
||||
iter_run_map[iteration_run_id] = duration
|
||||
yield IterationRunNextEvent(
|
||||
iteration_id=self.id,
|
||||
iteration_node_id=self.node_id,
|
||||
@ -439,6 +450,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
index=next_index,
|
||||
parallel_mode_run_id=parallel_mode_run_id,
|
||||
pre_iteration_output=None,
|
||||
duration=duration,
|
||||
)
|
||||
return
|
||||
elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT:
|
||||
@ -449,6 +461,8 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
|
||||
if next_index < len(iterator_list_value):
|
||||
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
|
||||
duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds()
|
||||
iter_run_map[iteration_run_id] = duration
|
||||
yield IterationRunNextEvent(
|
||||
iteration_id=self.id,
|
||||
iteration_node_id=self.node_id,
|
||||
@ -457,6 +471,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
index=next_index,
|
||||
parallel_mode_run_id=parallel_mode_run_id,
|
||||
pre_iteration_output=None,
|
||||
duration=duration,
|
||||
)
|
||||
return
|
||||
elif self.node_data.error_handle_mode == ErrorHandleMode.TERMINATED:
|
||||
@ -474,7 +489,10 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
)
|
||||
yield metadata_event
|
||||
|
||||
current_iteration_output = variable_pool.get(self.node_data.output_selector).value
|
||||
current_output_segment = variable_pool.get(self.node_data.output_selector)
|
||||
if current_output_segment is None:
|
||||
raise IterationNodeError("iteration output selector not found")
|
||||
current_iteration_output = current_output_segment.value
|
||||
outputs[current_index] = current_iteration_output
|
||||
# remove all nodes outputs from variable pool
|
||||
for node_id in iteration_graph.node_ids:
|
||||
@ -485,6 +503,8 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
|
||||
if next_index < len(iterator_list_value):
|
||||
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
|
||||
duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds()
|
||||
iter_run_map[iteration_run_id] = duration
|
||||
yield IterationRunNextEvent(
|
||||
iteration_id=self.id,
|
||||
iteration_node_id=self.node_id,
|
||||
@ -493,6 +513,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
index=next_index,
|
||||
parallel_mode_run_id=parallel_mode_run_id,
|
||||
pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None,
|
||||
duration=duration,
|
||||
)
|
||||
|
||||
except IterationNodeError as e:
|
||||
@ -528,6 +549,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
iteration_graph: Graph,
|
||||
index: int,
|
||||
item: Any,
|
||||
iter_run_map: dict[str, float],
|
||||
) -> Generator[NodeEvent | InNodeEvent, None, None]:
|
||||
"""
|
||||
run single iteration in parallel mode
|
||||
@ -546,6 +568,7 @@ class IterationNode(BaseNode[IterationNodeData]):
|
||||
start_at=start_at,
|
||||
graph_engine=graph_engine_copy,
|
||||
iteration_graph=iteration_graph,
|
||||
iter_run_map=iter_run_map,
|
||||
parallel_mode_run_id=parallel_mode_run_id,
|
||||
):
|
||||
q.put(event)
|
||||
|
||||
@ -59,4 +59,4 @@ class ListOperatorNodeData(BaseNodeData):
|
||||
filter_by: FilterBy
|
||||
order_by: OrderBy
|
||||
limit: Limit
|
||||
extract_by: ExtractConfig
|
||||
extract_by: ExtractConfig = Field(default_factory=ExtractConfig)
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from os import path
|
||||
from typing import Any, cast
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
Reference in New Issue
Block a user