|
|
|
|
@ -5,7 +5,7 @@ from typing import Optional, Union
|
|
|
|
|
|
|
|
|
|
from core.model_runtime.utils.encoders import jsonable_encoder
|
|
|
|
|
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
|
|
|
|
|
from core.workflow.entities.node_entities import NodeRunResult, NodeType
|
|
|
|
|
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
|
|
|
|
|
from core.workflow.entities.variable_pool import VariablePool, VariableValue
|
|
|
|
|
from core.workflow.entities.workflow_entities import WorkflowRunState
|
|
|
|
|
from core.workflow.nodes.base_node import BaseNode
|
|
|
|
|
@ -122,10 +122,10 @@ class WorkflowEngineManager:
|
|
|
|
|
if 'nodes' not in graph or 'edges' not in graph:
|
|
|
|
|
raise ValueError('nodes or edges not found in workflow graph')
|
|
|
|
|
|
|
|
|
|
if isinstance(graph.get('nodes'), list):
|
|
|
|
|
if not isinstance(graph.get('nodes'), list):
|
|
|
|
|
raise ValueError('nodes in workflow graph must be a list')
|
|
|
|
|
|
|
|
|
|
if isinstance(graph.get('edges'), list):
|
|
|
|
|
if not isinstance(graph.get('edges'), list):
|
|
|
|
|
raise ValueError('edges in workflow graph must be a list')
|
|
|
|
|
|
|
|
|
|
# init workflow run
|
|
|
|
|
@ -150,6 +150,7 @@ class WorkflowEngineManager:
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
predecessor_node = None
|
|
|
|
|
has_entry_node = False
|
|
|
|
|
while True:
|
|
|
|
|
# get next node, multiple target nodes in the future
|
|
|
|
|
next_node = self._get_next_node(
|
|
|
|
|
@ -161,6 +162,8 @@ class WorkflowEngineManager:
|
|
|
|
|
if not next_node:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
has_entry_node = True
|
|
|
|
|
|
|
|
|
|
# max steps 30 reached
|
|
|
|
|
if len(workflow_run_state.workflow_node_executions) > 30:
|
|
|
|
|
raise ValueError('Max steps 30 reached.')
|
|
|
|
|
@ -182,7 +185,7 @@ class WorkflowEngineManager:
|
|
|
|
|
|
|
|
|
|
predecessor_node = next_node
|
|
|
|
|
|
|
|
|
|
if not predecessor_node and not next_node:
|
|
|
|
|
if not has_entry_node:
|
|
|
|
|
self._workflow_run_failed(
|
|
|
|
|
workflow_run_state=workflow_run_state,
|
|
|
|
|
error='Start node not found in workflow graph.',
|
|
|
|
|
@ -219,38 +222,31 @@ class WorkflowEngineManager:
|
|
|
|
|
:param callbacks: workflow callbacks
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
db.session.begin()
|
|
|
|
|
max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \
|
|
|
|
|
.filter(WorkflowRun.tenant_id == workflow.tenant_id) \
|
|
|
|
|
.filter(WorkflowRun.app_id == workflow.app_id) \
|
|
|
|
|
.scalar() or 0
|
|
|
|
|
new_sequence_number = max_sequence + 1
|
|
|
|
|
|
|
|
|
|
max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \
|
|
|
|
|
.filter(WorkflowRun.tenant_id == workflow.tenant_id) \
|
|
|
|
|
.filter(WorkflowRun.app_id == workflow.app_id) \
|
|
|
|
|
.for_update() \
|
|
|
|
|
.scalar() or 0
|
|
|
|
|
new_sequence_number = max_sequence + 1
|
|
|
|
|
# init workflow run
|
|
|
|
|
workflow_run = WorkflowRun(
|
|
|
|
|
tenant_id=workflow.tenant_id,
|
|
|
|
|
app_id=workflow.app_id,
|
|
|
|
|
sequence_number=new_sequence_number,
|
|
|
|
|
workflow_id=workflow.id,
|
|
|
|
|
type=workflow.type,
|
|
|
|
|
triggered_from=triggered_from.value,
|
|
|
|
|
version=workflow.version,
|
|
|
|
|
graph=workflow.graph,
|
|
|
|
|
inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}),
|
|
|
|
|
status=WorkflowRunStatus.RUNNING.value,
|
|
|
|
|
created_by_role=(CreatedByRole.ACCOUNT.value
|
|
|
|
|
if isinstance(user, Account) else CreatedByRole.END_USER.value),
|
|
|
|
|
created_by=user.id
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# init workflow run
|
|
|
|
|
workflow_run = WorkflowRun(
|
|
|
|
|
tenant_id=workflow.tenant_id,
|
|
|
|
|
app_id=workflow.app_id,
|
|
|
|
|
sequence_number=new_sequence_number,
|
|
|
|
|
workflow_id=workflow.id,
|
|
|
|
|
type=workflow.type,
|
|
|
|
|
triggered_from=triggered_from.value,
|
|
|
|
|
version=workflow.version,
|
|
|
|
|
graph=workflow.graph,
|
|
|
|
|
inputs=json.dumps({**user_inputs, **system_inputs}),
|
|
|
|
|
status=WorkflowRunStatus.RUNNING.value,
|
|
|
|
|
created_by_role=(CreatedByRole.ACCOUNT.value
|
|
|
|
|
if isinstance(user, Account) else CreatedByRole.END_USER.value),
|
|
|
|
|
created_by=user.id
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
db.session.add(workflow_run)
|
|
|
|
|
db.session.commit()
|
|
|
|
|
except:
|
|
|
|
|
db.session.rollback()
|
|
|
|
|
raise
|
|
|
|
|
db.session.add(workflow_run)
|
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
|
|
if callbacks:
|
|
|
|
|
for callback in callbacks:
|
|
|
|
|
@ -330,7 +326,7 @@ class WorkflowEngineManager:
|
|
|
|
|
|
|
|
|
|
if not predecessor_node:
|
|
|
|
|
for node_config in nodes:
|
|
|
|
|
if node_config.get('type') == NodeType.START.value:
|
|
|
|
|
if node_config.get('data', {}).get('type', '') == NodeType.START.value:
|
|
|
|
|
return StartNode(config=node_config)
|
|
|
|
|
else:
|
|
|
|
|
edges = graph.get('edges')
|
|
|
|
|
@ -368,7 +364,7 @@ class WorkflowEngineManager:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# get next node
|
|
|
|
|
target_node = node_classes.get(NodeType.value_of(target_node_config.get('type')))
|
|
|
|
|
target_node = node_classes.get(NodeType.value_of(target_node_config.get('data', {}).get('type')))
|
|
|
|
|
|
|
|
|
|
return target_node(
|
|
|
|
|
config=target_node_config,
|
|
|
|
|
@ -424,17 +420,18 @@ class WorkflowEngineManager:
|
|
|
|
|
callbacks=callbacks
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for variable_key, variable_value in node_run_result.outputs.items():
|
|
|
|
|
# append variables to variable pool recursively
|
|
|
|
|
self._append_variables_recursively(
|
|
|
|
|
variable_pool=workflow_run_state.variable_pool,
|
|
|
|
|
node_id=node.node_id,
|
|
|
|
|
variable_key_list=[variable_key],
|
|
|
|
|
variable_value=variable_value
|
|
|
|
|
)
|
|
|
|
|
if node_run_result.outputs:
|
|
|
|
|
for variable_key, variable_value in node_run_result.outputs.items():
|
|
|
|
|
# append variables to variable pool recursively
|
|
|
|
|
self._append_variables_recursively(
|
|
|
|
|
variable_pool=workflow_run_state.variable_pool,
|
|
|
|
|
node_id=node.node_id,
|
|
|
|
|
variable_key_list=[variable_key],
|
|
|
|
|
variable_value=variable_value
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if node_run_result.metadata.get('total_tokens'):
|
|
|
|
|
workflow_run_state.total_tokens += int(node_run_result.metadata.get('total_tokens'))
|
|
|
|
|
if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
|
|
|
|
|
workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS))
|
|
|
|
|
|
|
|
|
|
return workflow_node_execution
|
|
|
|
|
|
|
|
|
|
@ -464,7 +461,6 @@ class WorkflowEngineManager:
|
|
|
|
|
node_id=node.node_id,
|
|
|
|
|
node_type=node.node_type.value,
|
|
|
|
|
title=node.node_data.title,
|
|
|
|
|
type=node.node_type.value,
|
|
|
|
|
status=WorkflowNodeExecutionStatus.RUNNING.value,
|
|
|
|
|
created_by_role=workflow_run.created_by_role,
|
|
|
|
|
created_by=workflow_run.created_by
|
|
|
|
|
@ -493,10 +489,11 @@ class WorkflowEngineManager:
|
|
|
|
|
"""
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
|
|
|
|
|
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
|
|
|
|
|
workflow_node_execution.inputs = json.dumps(result.inputs)
|
|
|
|
|
workflow_node_execution.process_data = json.dumps(result.process_data)
|
|
|
|
|
workflow_node_execution.outputs = json.dumps(result.outputs)
|
|
|
|
|
workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(result.metadata))
|
|
|
|
|
workflow_node_execution.inputs = json.dumps(result.inputs) if result.inputs else None
|
|
|
|
|
workflow_node_execution.process_data = json.dumps(result.process_data) if result.process_data else None
|
|
|
|
|
workflow_node_execution.outputs = json.dumps(result.outputs) if result.outputs else None
|
|
|
|
|
workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(result.metadata)) \
|
|
|
|
|
if result.metadata else None
|
|
|
|
|
workflow_node_execution.finished_at = datetime.utcnow()
|
|
|
|
|
|
|
|
|
|
db.session.commit()
|
|
|
|
|
|