mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 01:48:04 +08:00
refactor workflow runner
This commit is contained in:
@ -1,34 +1,63 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from models.workflow import WorkflowNodeExecution, WorkflowRun
|
||||
from core.workflow.entities.base_node_data_entities import BaseNodeData
|
||||
from core.workflow.entities.node_entities import NodeType
|
||||
|
||||
|
||||
class BaseWorkflowCallback(ABC):
|
||||
@abstractmethod
|
||||
def on_workflow_run_started(self, workflow_run: WorkflowRun) -> None:
|
||||
def on_workflow_run_started(self) -> None:
|
||||
"""
|
||||
Workflow run started
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def on_workflow_run_finished(self, workflow_run: WorkflowRun) -> None:
|
||||
def on_workflow_run_succeeded(self) -> None:
|
||||
"""
|
||||
Workflow run finished
|
||||
Workflow run succeeded
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def on_workflow_node_execute_started(self, workflow_node_execution: WorkflowNodeExecution) -> None:
|
||||
def on_workflow_run_failed(self, error: str) -> None:
|
||||
"""
|
||||
Workflow run failed
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def on_workflow_node_execute_started(self, node_id: str,
|
||||
node_type: NodeType,
|
||||
node_data: BaseNodeData,
|
||||
node_run_index: int = 1,
|
||||
predecessor_node_id: Optional[str] = None) -> None:
|
||||
"""
|
||||
Workflow node execute started
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def on_workflow_node_execute_finished(self, workflow_node_execution: WorkflowNodeExecution) -> None:
|
||||
def on_workflow_node_execute_succeeded(self, node_id: str,
|
||||
node_type: NodeType,
|
||||
node_data: BaseNodeData,
|
||||
inputs: Optional[dict] = None,
|
||||
process_data: Optional[dict] = None,
|
||||
outputs: Optional[dict] = None,
|
||||
execution_metadata: Optional[dict] = None) -> None:
|
||||
"""
|
||||
Workflow node execute finished
|
||||
Workflow node execute succeeded
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def on_workflow_node_execute_failed(self, node_id: str,
|
||||
node_type: NodeType,
|
||||
node_data: BaseNodeData,
|
||||
error: str) -> None:
|
||||
"""
|
||||
Workflow node execute failed
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@ -38,4 +67,3 @@ class BaseWorkflowCallback(ABC):
|
||||
Publish text chunk
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@ -1,22 +1,32 @@
|
||||
from typing import Optional
|
||||
|
||||
from core.workflow.entities.node_entities import NodeRunResult
|
||||
from core.workflow.entities.variable_pool import VariablePool
|
||||
from models.workflow import WorkflowNodeExecution, WorkflowRun
|
||||
from core.workflow.nodes.base_node import BaseNode
|
||||
from models.workflow import Workflow
|
||||
|
||||
|
||||
class WorkflowNodeAndResult:
|
||||
node: BaseNode
|
||||
result: Optional[NodeRunResult] = None
|
||||
|
||||
def __init__(self, node: BaseNode, result: Optional[NodeRunResult] = None):
|
||||
self.node = node
|
||||
self.result = result
|
||||
|
||||
|
||||
class WorkflowRunState:
|
||||
workflow_run: WorkflowRun
|
||||
workflow: Workflow
|
||||
start_at: float
|
||||
user_inputs: dict
|
||||
variable_pool: VariablePool
|
||||
|
||||
total_tokens: int = 0
|
||||
|
||||
workflow_node_executions: list[WorkflowNodeExecution] = []
|
||||
workflow_nodes_and_results: list[WorkflowNodeAndResult] = []
|
||||
|
||||
def __init__(self, workflow_run: WorkflowRun,
|
||||
start_at: float,
|
||||
user_inputs: dict,
|
||||
variable_pool: VariablePool) -> None:
|
||||
self.workflow_run = workflow_run
|
||||
def __init__(self, workflow: Workflow, start_at: float, user_inputs: dict, variable_pool: VariablePool):
|
||||
self.workflow = workflow
|
||||
self.start_at = start_at
|
||||
self.user_inputs = user_inputs
|
||||
self.variable_pool = variable_pool
|
||||
|
||||
@ -43,7 +43,7 @@ class DirectAnswerNode(BaseNode):
|
||||
# publish answer as stream
|
||||
for word in answer:
|
||||
self.publish_text_chunk(word)
|
||||
time.sleep(0.01)
|
||||
time.sleep(0.01) # todo sleep 0.01
|
||||
|
||||
return NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
|
||||
@ -1,13 +1,11 @@
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional, Union
|
||||
from typing import Optional
|
||||
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.app.apps.base_app_queue_manager import GenerateTaskStoppedException
|
||||
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
|
||||
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
|
||||
from core.workflow.entities.variable_pool import VariablePool, VariableValue
|
||||
from core.workflow.entities.workflow_entities import WorkflowRunState
|
||||
from core.workflow.entities.workflow_entities import WorkflowNodeAndResult, WorkflowRunState
|
||||
from core.workflow.nodes.base_node import BaseNode
|
||||
from core.workflow.nodes.code.code_node import CodeNode
|
||||
from core.workflow.nodes.direct_answer.direct_answer_node import DirectAnswerNode
|
||||
@ -21,18 +19,9 @@ from core.workflow.nodes.start.start_node import StartNode
|
||||
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
|
||||
from core.workflow.nodes.tool.tool_node import ToolNode
|
||||
from core.workflow.nodes.variable_assigner.variable_assigner_node import VariableAssignerNode
|
||||
from extensions.ext_database import db
|
||||
from models.account import Account
|
||||
from models.model import App, EndUser
|
||||
from models.workflow import (
|
||||
CreatedByRole,
|
||||
Workflow,
|
||||
WorkflowNodeExecution,
|
||||
WorkflowNodeExecutionStatus,
|
||||
WorkflowNodeExecutionTriggeredFrom,
|
||||
WorkflowRun,
|
||||
WorkflowRunStatus,
|
||||
WorkflowRunTriggeredFrom,
|
||||
WorkflowType,
|
||||
)
|
||||
|
||||
@ -53,20 +42,6 @@ node_classes = {
|
||||
|
||||
|
||||
class WorkflowEngineManager:
|
||||
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
|
||||
"""
|
||||
Get workflow
|
||||
"""
|
||||
# fetch workflow by workflow_id
|
||||
workflow = db.session.query(Workflow).filter(
|
||||
Workflow.tenant_id == app_model.tenant_id,
|
||||
Workflow.app_id == app_model.id,
|
||||
Workflow.id == workflow_id
|
||||
).first()
|
||||
|
||||
# return workflow
|
||||
return workflow
|
||||
|
||||
def get_default_configs(self) -> list[dict]:
|
||||
"""
|
||||
Get default block configs
|
||||
@ -100,16 +75,12 @@ class WorkflowEngineManager:
|
||||
return default_config
|
||||
|
||||
def run_workflow(self, workflow: Workflow,
|
||||
triggered_from: WorkflowRunTriggeredFrom,
|
||||
user: Union[Account, EndUser],
|
||||
user_inputs: dict,
|
||||
system_inputs: Optional[dict] = None,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> None:
|
||||
"""
|
||||
Run workflow
|
||||
:param workflow: Workflow instance
|
||||
:param triggered_from: triggered from
|
||||
:param user: account or end user
|
||||
:param user_inputs: user variables inputs
|
||||
:param system_inputs: system inputs, like: query, files
|
||||
:param callbacks: workflow callbacks
|
||||
@ -130,18 +101,13 @@ class WorkflowEngineManager:
|
||||
raise ValueError('edges in workflow graph must be a list')
|
||||
|
||||
# init workflow run
|
||||
workflow_run = self._init_workflow_run(
|
||||
workflow=workflow,
|
||||
triggered_from=triggered_from,
|
||||
user=user,
|
||||
user_inputs=user_inputs,
|
||||
system_inputs=system_inputs,
|
||||
callbacks=callbacks
|
||||
)
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_run_started()
|
||||
|
||||
# init workflow run state
|
||||
workflow_run_state = WorkflowRunState(
|
||||
workflow_run=workflow_run,
|
||||
workflow=workflow,
|
||||
start_at=time.perf_counter(),
|
||||
user_inputs=user_inputs,
|
||||
variable_pool=VariablePool(
|
||||
@ -166,7 +132,7 @@ class WorkflowEngineManager:
|
||||
has_entry_node = True
|
||||
|
||||
# max steps 30 reached
|
||||
if len(workflow_run_state.workflow_node_executions) > 30:
|
||||
if len(workflow_run_state.workflow_nodes_and_results) > 30:
|
||||
raise ValueError('Max steps 30 reached.')
|
||||
|
||||
# or max execution time 10min reached
|
||||
@ -188,14 +154,14 @@ class WorkflowEngineManager:
|
||||
|
||||
if not has_entry_node:
|
||||
self._workflow_run_failed(
|
||||
workflow_run_state=workflow_run_state,
|
||||
error='Start node not found in workflow graph.',
|
||||
callbacks=callbacks
|
||||
)
|
||||
return
|
||||
except GenerateTaskStoppedException as e:
|
||||
return
|
||||
except Exception as e:
|
||||
self._workflow_run_failed(
|
||||
workflow_run_state=workflow_run_state,
|
||||
error=str(e),
|
||||
callbacks=callbacks
|
||||
)
|
||||
@ -203,112 +169,33 @@ class WorkflowEngineManager:
|
||||
|
||||
# workflow run success
|
||||
self._workflow_run_success(
|
||||
workflow_run_state=workflow_run_state,
|
||||
callbacks=callbacks
|
||||
)
|
||||
|
||||
def _init_workflow_run(self, workflow: Workflow,
|
||||
triggered_from: WorkflowRunTriggeredFrom,
|
||||
user: Union[Account, EndUser],
|
||||
user_inputs: dict,
|
||||
system_inputs: Optional[dict] = None,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
|
||||
"""
|
||||
Init workflow run
|
||||
:param workflow: Workflow instance
|
||||
:param triggered_from: triggered from
|
||||
:param user: account or end user
|
||||
:param user_inputs: user variables inputs
|
||||
:param system_inputs: system inputs, like: query, files
|
||||
:param callbacks: workflow callbacks
|
||||
:return:
|
||||
"""
|
||||
max_sequence = db.session.query(db.func.max(WorkflowRun.sequence_number)) \
|
||||
.filter(WorkflowRun.tenant_id == workflow.tenant_id) \
|
||||
.filter(WorkflowRun.app_id == workflow.app_id) \
|
||||
.scalar() or 0
|
||||
new_sequence_number = max_sequence + 1
|
||||
|
||||
# init workflow run
|
||||
workflow_run = WorkflowRun(
|
||||
tenant_id=workflow.tenant_id,
|
||||
app_id=workflow.app_id,
|
||||
sequence_number=new_sequence_number,
|
||||
workflow_id=workflow.id,
|
||||
type=workflow.type,
|
||||
triggered_from=triggered_from.value,
|
||||
version=workflow.version,
|
||||
graph=workflow.graph,
|
||||
inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}),
|
||||
status=WorkflowRunStatus.RUNNING.value,
|
||||
created_by_role=(CreatedByRole.ACCOUNT.value
|
||||
if isinstance(user, Account) else CreatedByRole.END_USER.value),
|
||||
created_by=user.id
|
||||
)
|
||||
|
||||
db.session.add(workflow_run)
|
||||
db.session.commit()
|
||||
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_run_started(workflow_run)
|
||||
|
||||
return workflow_run
|
||||
|
||||
def _workflow_run_success(self, workflow_run_state: WorkflowRunState,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
|
||||
def _workflow_run_success(self, callbacks: list[BaseWorkflowCallback] = None) -> None:
|
||||
"""
|
||||
Workflow run success
|
||||
:param workflow_run_state: workflow run state
|
||||
:param callbacks: workflow callbacks
|
||||
:return:
|
||||
"""
|
||||
workflow_run = workflow_run_state.workflow_run
|
||||
workflow_run.status = WorkflowRunStatus.SUCCEEDED.value
|
||||
|
||||
# fetch last workflow_node_executions
|
||||
last_workflow_node_execution = workflow_run_state.workflow_node_executions[-1]
|
||||
if last_workflow_node_execution:
|
||||
workflow_run.outputs = last_workflow_node_execution.outputs
|
||||
|
||||
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
|
||||
workflow_run.total_tokens = workflow_run_state.total_tokens
|
||||
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
|
||||
workflow_run.finished_at = datetime.utcnow()
|
||||
|
||||
db.session.commit()
|
||||
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_run_finished(workflow_run)
|
||||
callback.on_workflow_run_succeeded()
|
||||
|
||||
return workflow_run
|
||||
|
||||
def _workflow_run_failed(self, workflow_run_state: WorkflowRunState,
|
||||
error: str,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowRun:
|
||||
def _workflow_run_failed(self, error: str,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> None:
|
||||
"""
|
||||
Workflow run failed
|
||||
:param workflow_run_state: workflow run state
|
||||
:param error: error message
|
||||
:param callbacks: workflow callbacks
|
||||
:return:
|
||||
"""
|
||||
workflow_run = workflow_run_state.workflow_run
|
||||
workflow_run.status = WorkflowRunStatus.FAILED.value
|
||||
workflow_run.error = error
|
||||
workflow_run.elapsed_time = time.perf_counter() - workflow_run_state.start_at
|
||||
workflow_run.total_tokens = workflow_run_state.total_tokens
|
||||
workflow_run.total_steps = len(workflow_run_state.workflow_node_executions)
|
||||
workflow_run.finished_at = datetime.utcnow()
|
||||
|
||||
db.session.commit()
|
||||
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_run_finished(workflow_run)
|
||||
|
||||
return workflow_run
|
||||
callback.on_workflow_run_failed(
|
||||
error=error
|
||||
)
|
||||
|
||||
def _get_next_node(self, graph: dict,
|
||||
predecessor_node: Optional[BaseNode] = None,
|
||||
@ -384,18 +271,24 @@ class WorkflowEngineManager:
|
||||
def _run_workflow_node(self, workflow_run_state: WorkflowRunState,
|
||||
node: BaseNode,
|
||||
predecessor_node: Optional[BaseNode] = None,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
|
||||
# init workflow node execution
|
||||
start_at = time.perf_counter()
|
||||
workflow_node_execution = self._init_node_execution_from_workflow_run(
|
||||
workflow_run_state=workflow_run_state,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> None:
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_node_execute_started(
|
||||
node_id=node.node_id,
|
||||
node_type=node.node_type,
|
||||
node_data=node.node_data,
|
||||
node_run_index=len(workflow_run_state.workflow_nodes_and_results) + 1,
|
||||
predecessor_node_id=predecessor_node.node_id if predecessor_node else None
|
||||
)
|
||||
|
||||
workflow_nodes_and_result = WorkflowNodeAndResult(
|
||||
node=node,
|
||||
predecessor_node=predecessor_node,
|
||||
callbacks=callbacks
|
||||
result=None
|
||||
)
|
||||
|
||||
# add to workflow node executions
|
||||
workflow_run_state.workflow_node_executions.append(workflow_node_execution)
|
||||
# add to workflow_nodes_and_results
|
||||
workflow_run_state.workflow_nodes_and_results.append(workflow_nodes_and_result)
|
||||
|
||||
# run node, result must have inputs, process_data, outputs, execution_metadata
|
||||
node_run_result = node.run(
|
||||
@ -406,24 +299,34 @@ class WorkflowEngineManager:
|
||||
|
||||
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
|
||||
# node run failed
|
||||
self._workflow_node_execution_failed(
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
start_at=start_at,
|
||||
error=node_run_result.error,
|
||||
callbacks=callbacks
|
||||
)
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_node_execute_failed(
|
||||
node_id=node.node_id,
|
||||
node_type=node.node_type,
|
||||
node_data=node.node_data,
|
||||
error=node_run_result.error
|
||||
)
|
||||
|
||||
raise ValueError(f"Node {node.node_data.title} run failed: {node_run_result.error}")
|
||||
|
||||
# set end node output if in chat
|
||||
self._set_end_node_output_if_in_chat(workflow_run_state, node, node_run_result)
|
||||
|
||||
workflow_nodes_and_result.result = node_run_result
|
||||
|
||||
# node run success
|
||||
self._workflow_node_execution_success(
|
||||
workflow_node_execution=workflow_node_execution,
|
||||
start_at=start_at,
|
||||
result=node_run_result,
|
||||
callbacks=callbacks
|
||||
)
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_node_execute_succeeded(
|
||||
node_id=node.node_id,
|
||||
node_type=node.node_type,
|
||||
node_data=node.node_data,
|
||||
inputs=node_run_result.inputs,
|
||||
process_data=node_run_result.process_data,
|
||||
outputs=node_run_result.outputs,
|
||||
execution_metadata=node_run_result.metadata
|
||||
)
|
||||
|
||||
if node_run_result.outputs:
|
||||
for variable_key, variable_value in node_run_result.outputs.items():
|
||||
@ -438,105 +341,9 @@ class WorkflowEngineManager:
|
||||
if node_run_result.metadata and node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
|
||||
workflow_run_state.total_tokens += int(node_run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS))
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
def _init_node_execution_from_workflow_run(self, workflow_run_state: WorkflowRunState,
|
||||
node: BaseNode,
|
||||
predecessor_node: Optional[BaseNode] = None,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Init workflow node execution from workflow run
|
||||
:param workflow_run_state: workflow run state
|
||||
:param node: current node
|
||||
:param predecessor_node: predecessor node if exists
|
||||
:param callbacks: workflow callbacks
|
||||
:return:
|
||||
"""
|
||||
workflow_run = workflow_run_state.workflow_run
|
||||
|
||||
# init workflow node execution
|
||||
workflow_node_execution = WorkflowNodeExecution(
|
||||
tenant_id=workflow_run.tenant_id,
|
||||
app_id=workflow_run.app_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
|
||||
workflow_run_id=workflow_run.id,
|
||||
predecessor_node_id=predecessor_node.node_id if predecessor_node else None,
|
||||
index=len(workflow_run_state.workflow_node_executions) + 1,
|
||||
node_id=node.node_id,
|
||||
node_type=node.node_type.value,
|
||||
title=node.node_data.title,
|
||||
status=WorkflowNodeExecutionStatus.RUNNING.value,
|
||||
created_by_role=workflow_run.created_by_role,
|
||||
created_by=workflow_run.created_by
|
||||
)
|
||||
|
||||
db.session.add(workflow_node_execution)
|
||||
db.session.commit()
|
||||
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_node_execute_started(workflow_node_execution)
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
def _workflow_node_execution_success(self, workflow_node_execution: WorkflowNodeExecution,
|
||||
start_at: float,
|
||||
result: NodeRunResult,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Workflow node execution success
|
||||
:param workflow_node_execution: workflow node execution
|
||||
:param start_at: start time
|
||||
:param result: node run result
|
||||
:param callbacks: workflow callbacks
|
||||
:return:
|
||||
"""
|
||||
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
|
||||
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
|
||||
workflow_node_execution.inputs = json.dumps(result.inputs) if result.inputs else None
|
||||
workflow_node_execution.process_data = json.dumps(result.process_data) if result.process_data else None
|
||||
workflow_node_execution.outputs = json.dumps(result.outputs) if result.outputs else None
|
||||
workflow_node_execution.execution_metadata = json.dumps(jsonable_encoder(result.metadata)) \
|
||||
if result.metadata else None
|
||||
workflow_node_execution.finished_at = datetime.utcnow()
|
||||
|
||||
db.session.commit()
|
||||
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_node_execute_finished(workflow_node_execution)
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
def _workflow_node_execution_failed(self, workflow_node_execution: WorkflowNodeExecution,
|
||||
start_at: float,
|
||||
error: str,
|
||||
callbacks: list[BaseWorkflowCallback] = None) -> WorkflowNodeExecution:
|
||||
"""
|
||||
Workflow node execution failed
|
||||
:param workflow_node_execution: workflow node execution
|
||||
:param start_at: start time
|
||||
:param error: error message
|
||||
:param callbacks: workflow callbacks
|
||||
:return:
|
||||
"""
|
||||
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
|
||||
workflow_node_execution.error = error
|
||||
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
|
||||
workflow_node_execution.finished_at = datetime.utcnow()
|
||||
|
||||
db.session.commit()
|
||||
|
||||
if callbacks:
|
||||
for callback in callbacks:
|
||||
callback.on_workflow_node_execute_finished(workflow_node_execution)
|
||||
|
||||
return workflow_node_execution
|
||||
|
||||
def _set_end_node_output_if_in_chat(self, workflow_run_state: WorkflowRunState,
|
||||
node: BaseNode,
|
||||
node_run_result: NodeRunResult):
|
||||
node_run_result: NodeRunResult) -> None:
|
||||
"""
|
||||
Set end node output if in chat
|
||||
:param workflow_run_state: workflow run state
|
||||
@ -544,21 +351,19 @@ class WorkflowEngineManager:
|
||||
:param node_run_result: node run result
|
||||
:return:
|
||||
"""
|
||||
if workflow_run_state.workflow_run.type == WorkflowType.CHAT.value and node.node_type == NodeType.END:
|
||||
workflow_node_execution_before_end = workflow_run_state.workflow_node_executions[-2]
|
||||
if workflow_node_execution_before_end:
|
||||
if workflow_node_execution_before_end.node_type == NodeType.LLM.value:
|
||||
if workflow_run_state.workflow.type == WorkflowType.CHAT.value and node.node_type == NodeType.END:
|
||||
workflow_nodes_and_result_before_end = workflow_run_state.workflow_nodes_and_results[-2]
|
||||
if workflow_nodes_and_result_before_end:
|
||||
if workflow_nodes_and_result_before_end.node.node_type == NodeType.LLM.value:
|
||||
if not node_run_result.outputs:
|
||||
node_run_result.outputs = {}
|
||||
|
||||
node_run_result.outputs['text'] = workflow_node_execution_before_end.outputs_dict.get('text')
|
||||
elif workflow_node_execution_before_end.node_type == NodeType.DIRECT_ANSWER.value:
|
||||
node_run_result.outputs['text'] = workflow_nodes_and_result_before_end.result.outputs.get('text')
|
||||
elif workflow_nodes_and_result_before_end.node.node_type == NodeType.DIRECT_ANSWER.value:
|
||||
if not node_run_result.outputs:
|
||||
node_run_result.outputs = {}
|
||||
|
||||
node_run_result.outputs['text'] = workflow_node_execution_before_end.outputs_dict.get('answer')
|
||||
|
||||
return node_run_result
|
||||
node_run_result.outputs['text'] = workflow_nodes_and_result_before_end.result.outputs.get('answer')
|
||||
|
||||
def _append_variables_recursively(self, variable_pool: VariablePool,
|
||||
node_id: str,
|
||||
|
||||
Reference in New Issue
Block a user