Merge branch 'main' into fix/chore-fix

This commit is contained in:
Yeuoly
2024-12-04 15:34:39 +08:00
288 changed files with 8067 additions and 1950 deletions

View File

@ -3,7 +3,7 @@ import logging
import threading
import uuid
from collections.abc import Generator
from typing import Any, Literal, Optional, Union, overload
from typing import Any, Literal, Mapping, Optional, Union, overload
from flask import Flask, current_app
from pydantic import ValidationError
@ -23,6 +23,7 @@ from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity,
from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotAppStreamResponse
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from extensions.ext_database import db
from factories import file_factory
from models.account import Account
@ -33,16 +34,7 @@ logger = logging.getLogger(__name__)
class AdvancedChatAppGenerator(MessageBasedAppGenerator):
@overload
def generate(
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
invoke_from: InvokeFrom,
stream: Literal[True] = True,
) -> Generator[dict | str, None, None]: ...
_dialogue_count: int
@overload
def generate(
@ -50,10 +42,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: Literal[False] = False,
) -> dict: ...
streaming: Literal[True] = True,
) -> Generator[Mapping | str, None, None]: ...
@overload
def generate(
@ -61,20 +53,31 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: bool = True,
) -> Union[dict[str, Any], Generator[dict | str, None, None]]: ...
streaming: Literal[False] = False,
) -> Mapping: ...
@overload
def generate(
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: Mapping,
invoke_from: InvokeFrom,
streaming: bool,
) -> Mapping[str, Any] | Generator[str | Mapping, None, None]: ...
def generate(
self,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: bool = True,
) -> dict[str, Any] | Generator[str | dict, None, None]:
streaming: bool = True,
) -> Mapping[str, Any] | Generator[str | Mapping, None, None]:
"""
Generate App response.
@ -145,7 +148,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
files=file_objs,
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=stream,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
@ -161,12 +164,18 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
invoke_from=invoke_from,
application_generate_entity=application_generate_entity,
conversation=conversation,
stream=stream,
stream=streaming,
)
def single_iteration_generate(
self, app_model: App, workflow: Workflow, node_id: str, user: Account | EndUser, args: dict, stream: bool = True
) -> dict[str, Any] | Generator[str | dict[str, Any], Any, None]:
self,
app_model: App,
workflow: Workflow,
node_id: str,
user: Account | EndUser,
args: Mapping,
streaming: bool = True,
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
"""
Generate App response.
@ -195,7 +204,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
query="",
files=[],
user_id=user.id,
stream=stream,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
single_iteration_run=AdvancedChatAppGenerateEntity.SingleIterationRunEntity(
@ -212,7 +221,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
invoke_from=InvokeFrom.DEBUGGER,
application_generate_entity=application_generate_entity,
conversation=None,
stream=stream,
stream=streaming,
)
def _generate(
@ -224,7 +233,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
application_generate_entity: AdvancedChatAppGenerateEntity,
conversation: Optional[Conversation] = None,
stream: bool = True,
) -> dict[str, Any] | Generator[str | dict[str, Any], Any, None]:
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
"""
Generate App response.
@ -248,6 +257,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
db.session.commit()
db.session.refresh(conversation)
# get conversation dialogue count
self._dialogue_count = get_thread_messages_length(conversation.id)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
@ -318,6 +330,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
queue_manager=queue_manager,
conversation=conversation,
message=message,
dialogue_count=self._dialogue_count,
)
runner.run()
@ -371,6 +384,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
message=message,
user=user,
stream=stream,
dialogue_count=self._dialogue_count,
)
try:

View File

@ -39,12 +39,14 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
queue_manager: AppQueueManager,
conversation: Conversation,
message: Message,
dialogue_count: int,
) -> None:
super().__init__(queue_manager)
self.application_generate_entity = application_generate_entity
self.conversation = conversation
self.message = message
self._dialogue_count = dialogue_count
def run(self) -> None:
app_config = self.application_generate_entity.app_config
@ -122,19 +124,13 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
session.commit()
# Increment dialogue count.
self.conversation.dialogue_count += 1
conversation_dialogue_count = self.conversation.dialogue_count
db.session.commit()
# Create a variable pool.
system_inputs = {
SystemVariableKey.QUERY: query,
SystemVariableKey.FILES: files,
SystemVariableKey.CONVERSATION_ID: self.conversation.id,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.DIALOGUE_COUNT: conversation_dialogue_count,
SystemVariableKey.DIALOGUE_COUNT: self._dialogue_count,
SystemVariableKey.APP_ID: app_config.app_id,
SystemVariableKey.WORKFLOW_ID: app_config.workflow_id,
SystemVariableKey.WORKFLOW_RUN_ID: self.application_generate_entity.workflow_run_id,

View File

@ -88,6 +88,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
message: Message,
user: Union[Account, EndUser],
stream: bool,
dialogue_count: int,
) -> None:
"""
Initialize AdvancedChatAppGenerateTaskPipeline.
@ -98,6 +99,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
:param message: message
:param user: user
:param stream: stream
:param dialogue_count: dialogue count
"""
super().__init__(application_generate_entity, queue_manager, user, stream)
@ -114,7 +116,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.CONVERSATION_ID: conversation.id,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.DIALOGUE_COUNT: conversation.dialogue_count,
SystemVariableKey.DIALOGUE_COUNT: dialogue_count,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
@ -125,6 +127,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
self._conversation_name_generate_thread = None
self._recorded_files: list[Mapping[str, Any]] = []
self.total_tokens: int = 0
def process(self):
"""
@ -358,6 +361,8 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
if not workflow_run:
raise Exception("Workflow run not initialized.")
# FIXME for issue #11221 quick fix maybe have a better solution
self.total_tokens += event.metadata.get("total_tokens", 0) if event.metadata else 0
yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
@ -371,7 +376,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
start_at=graph_runtime_state.start_at,
total_tokens=graph_runtime_state.total_tokens,
total_tokens=graph_runtime_state.total_tokens or self.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
outputs=event.outputs,
conversation_id=self._conversation.id,

View File

@ -1,5 +1,6 @@
import uuid
from typing import Optional
from collections.abc import Mapping
from typing import Any, Optional
from core.agent.entities import AgentEntity
from core.app.app_config.base_app_config_manager import BaseAppConfigManager
@ -85,7 +86,7 @@ class AgentChatAppConfigManager(BaseAppConfigManager):
return app_config
@classmethod
def config_validate(cls, tenant_id: str, config: dict) -> dict:
def config_validate(cls, tenant_id: str, config: Mapping[str, Any]) -> dict:
"""
Validate for agent chat app model config

View File

@ -2,7 +2,7 @@ import contextvars
import logging
import threading
import uuid
from collections.abc import Generator
from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload
from flask import Flask, current_app
@ -32,36 +32,37 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
@overload
def generate(
self,
*,
app_model: App,
user: Union[Account, EndUser],
args: dict,
args: Mapping[str, Any],
invoke_from: InvokeFrom,
stream: Literal[True] = True,
) -> Generator[dict | str, None, None]: ...
streaming: Literal[True] = True,
) -> Generator[Mapping | str, None, None]: ...
@overload
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: Literal[False] = False,
) -> dict: ...
streaming: Literal[False] = False,
) -> Mapping: ...
@overload
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: bool = False,
) -> dict | Generator[dict | str, None, None]: ...
streaming: bool,
) -> Union[Mapping, Generator[Mapping | str, None, None]]: ...
def generate(
self, app_model: App, user: Union[Account, EndUser], args: Any, invoke_from: InvokeFrom, stream: bool = True
) -> Union[dict, Generator[dict | str, None, None]]:
self, app_model: App, user: Union[Account, EndUser], args: Any, invoke_from: InvokeFrom, streaming: bool = True
) -> Union[Mapping, Generator[Mapping | str, None, None]]:
"""
Generate App response.
@ -71,7 +72,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
:param invoke_from: invoke from source
:param stream: is stream
"""
if not stream:
if not streaming:
raise ValueError("Agent Chat App does not support blocking mode")
if not args.get("query"):
@ -102,7 +103,8 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
# validate config
override_model_config_dict = AgentChatAppConfigManager.config_validate(
tenant_id=app_model.tenant_id, config=args.get("model_config")
tenant_id=app_model.tenant_id,
config=args["model_config"],
)
# always enable retriever resource in debugger mode
@ -147,7 +149,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
files=file_objs,
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=stream,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
call_depth=0,
@ -189,7 +191,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
conversation=conversation,
message=message,
user=user,
stream=stream,
stream=streaming,
)
return AgentChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

View File

@ -141,7 +141,7 @@ class BaseAppGenerator:
return value
@classmethod
def convert_to_event_stream(cls, generator: Union[dict, Generator[dict | str, None, None]]):
def convert_to_event_stream(cls, generator: Union[Mapping, Generator[Mapping | str, None, None]]):
"""
Convert messages into event stream
"""
@ -151,7 +151,7 @@ class BaseAppGenerator:
def gen():
for message in generator:
if isinstance(message, dict):
if isinstance(message, (Mapping, dict)):
yield f"data: {json.dumps(message)}\n\n"
else:
yield f"event: {message}\n\n"

View File

@ -36,7 +36,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
args: Any,
invoke_from: InvokeFrom,
stream: Literal[True] = True,
streaming: Literal[True] = True,
) -> Generator[dict | str, None, None]: ...
@overload
@ -46,7 +46,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
args: Any,
invoke_from: InvokeFrom,
stream: Literal[False] = False,
streaming: Literal[False] = False,
) -> dict: ...
@overload
@ -56,7 +56,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
args: Any,
invoke_from: InvokeFrom,
stream: bool = False,
streaming: bool = False,
) -> Union[dict, Generator[dict | str, None, None]]: ...
def generate(
@ -65,7 +65,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
args: Any,
invoke_from: InvokeFrom,
stream: bool = True,
streaming: bool = True,
) -> Union[dict, Generator[dict | str, None, None]]:
"""
Generate App response.
@ -152,7 +152,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
stream=stream,
stream=streaming,
)
# init generate records
@ -172,7 +172,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(),
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
@ -189,7 +189,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
conversation=conversation,
message=message,
user=user,
stream=stream,
stream=streaming,
)
return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

View File

@ -2,7 +2,7 @@ import logging
import threading
import uuid
from collections.abc import Generator
from typing import Any, Literal, Union, overload
from typing import Any, Literal, Mapping, Union, overload
from flask import Flask, current_app
from pydantic import ValidationError
@ -34,9 +34,9 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: Literal[True] = True,
streaming: Literal[True] = True,
) -> Generator[str, None, None]: ...
@overload
@ -44,24 +44,24 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
self,
app_model: App,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: Literal[False] = False,
) -> dict: ...
streaming: Literal[False] = False,
) -> Mapping: ...
@overload
def generate(
self,
app_model: App,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: bool = False,
) -> dict | Generator[str, None, None]: ...
streaming: bool = False,
) -> Mapping | Generator[str, None, None]: ...
def generate(
self, app_model: App, user: Union[Account, EndUser], args: Any, invoke_from: InvokeFrom, stream: bool = True
) -> Union[dict, Generator[str, None, None]]:
self, app_model: App, user: Union[Account, EndUser], args: Any, invoke_from: InvokeFrom, streaming: bool = True
) -> Union[Mapping, Generator[str, None, None]]:
"""
Generate App response.
@ -129,7 +129,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
query=query,
files=file_objs,
user_id=user.id,
stream=stream,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
@ -168,7 +168,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
conversation=conversation,
message=message,
user=user,
stream=stream,
stream=streaming,
)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
@ -226,7 +226,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
invoke_from: InvokeFrom,
stream: bool = True,
) -> Union[dict, Generator[str, None, None]]:
) -> Union[Mapping, Generator[str, None, None]]:
"""
Generate App response.

View File

@ -33,15 +33,16 @@ class WorkflowAppGenerator(BaseAppGenerator):
@overload
def generate(
self,
*,
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: Literal[True] = True,
streaming: Literal[True] = True,
call_depth: int = 0,
workflow_thread_pool_id: Optional[str] = None,
) -> Generator[dict | str, None, None]: ...
) -> Generator[Mapping | str, None, None]: ...
@overload
def generate(
@ -49,12 +50,12 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: Literal[False] = False,
streaming: Literal[False] = False,
call_depth: int = 0,
workflow_thread_pool_id: Optional[str] = None,
) -> dict: ...
) -> Mapping: ...
@overload
def generate(
@ -62,11 +63,12 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_model: App,
workflow: Workflow,
user: Union[Account, EndUser],
args: dict,
args: Mapping,
invoke_from: InvokeFrom,
stream: bool = False,
streaming: bool,
call_depth: int = 0,
) -> dict | Generator[dict | str, None, None]: ...
workflow_thread_pool_id: Optional[str] = None,
) -> Mapping | Generator[Mapping | str, None, None]: ...
def generate(
self,
@ -75,10 +77,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
user: Union[Account, EndUser],
args: Mapping[str, Any],
invoke_from: InvokeFrom,
stream: bool = True,
streaming: bool = True,
call_depth: int = 0,
workflow_thread_pool_id: Optional[str] = None,
):
) -> Mapping | Generator[Mapping | str, None, None]:
files: Sequence[Mapping[str, Any]] = args.get("files") or []
# parse files
@ -113,7 +115,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
),
files=system_files,
user_id=user.id,
stream=stream,
stream=streaming,
invoke_from=invoke_from,
call_depth=call_depth,
trace_manager=trace_manager,
@ -130,7 +132,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user=user,
application_generate_entity=application_generate_entity,
invoke_from=invoke_from,
stream=stream,
streaming=streaming,
workflow_thread_pool_id=workflow_thread_pool_id,
)
@ -142,7 +144,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user: Union[Account, EndUser],
application_generate_entity: WorkflowAppGenerateEntity,
invoke_from: InvokeFrom,
stream: bool = True,
streaming: bool = True,
workflow_thread_pool_id: Optional[str] = None,
) -> Union[dict, Generator[str | dict, None, None]]:
"""
@ -184,13 +186,19 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow=workflow,
queue_manager=queue_manager,
user=user,
stream=stream,
stream=streaming,
)
return WorkflowAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def single_iteration_generate(
self, app_model: App, workflow: Workflow, node_id: str, user: Account | EndUser, args: dict, stream: bool = True
self,
app_model: App,
workflow: Workflow,
node_id: str,
user: Account | EndUser,
args: dict,
streaming: bool = True,
) -> dict[str, Any] | Generator[str | dict, Any, None]:
"""
Generate App response.
@ -218,7 +226,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
inputs={},
files=[],
user_id=user.id,
stream=stream,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
single_iteration_run=WorkflowAppGenerateEntity.SingleIterationRunEntity(
@ -235,7 +243,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user=user,
invoke_from=InvokeFrom.DEBUGGER,
application_generate_entity=application_generate_entity,
stream=stream,
streaming=streaming,
)
def _generate_worker(

View File

@ -106,6 +106,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}
self.total_tokens: int = 0
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
"""
@ -319,6 +320,8 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
if not workflow_run:
raise Exception("Workflow run not initialized.")
# FIXME for issue #11221 quick fix maybe have a better solution
self.total_tokens += event.metadata.get("total_tokens", 0) if event.metadata else 0
yield self._workflow_iteration_completed_to_stream_response(
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
)
@ -332,7 +335,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
workflow_run = self._handle_workflow_run_success(
workflow_run=workflow_run,
start_at=graph_runtime_state.start_at,
total_tokens=graph_runtime_state.total_tokens,
total_tokens=graph_runtime_state.total_tokens or self.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
outputs=event.outputs,
conversation_id=None,

View File

@ -43,7 +43,7 @@ from core.workflow.graph_engine.entities.event import (
)
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.nodes import NodeType
from core.workflow.nodes.node_mapping import node_type_classes_mapping
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.model import App
@ -138,7 +138,8 @@ class WorkflowBasedAppRunner(AppRunner):
# Get node class
node_type = NodeType(iteration_node_config.get("data", {}).get("type"))
node_cls = node_type_classes_mapping[node_type]
node_version = iteration_node_config.get("data", {}).get("version", "1")
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
# init variable pool
variable_pool = VariablePool(