refactor(api): continue decoupling dify_graph from API concerns (#33580)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: WH-2099 <wh2099@pm.me>
This commit is contained in:
-LAN-
2026-03-25 20:32:24 +08:00
committed by GitHub
parent b7b9b003c9
commit 56593f20b0
487 changed files with 17999 additions and 9186 deletions

View File

@ -24,6 +24,7 @@ from core.app.apps.advanced_chat.app_runner import AdvancedChatAppRunner
from core.app.apps.advanced_chat.generate_response_converter import AdvancedChatAppGenerateResponseConverter
from core.app.apps.advanced_chat.generate_task_pipeline import AdvancedChatAppGenerateTaskPipeline
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
@ -34,13 +35,9 @@ from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import DifyCoreRepositoryFactory
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from dify_graph.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
@ -150,85 +147,87 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = AdvancedChatAppConfigManager.get_app_config(app_model=app_model, workflow=workflow)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
else:
file_objs = []
# convert to app config
app_config = AdvancedChatAppConfigManager.get_app_config(app_model=app_model, workflow=workflow)
if invoke_from == InvokeFrom.DEBUGGER:
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True # type: ignore
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
workflow_run_id=str(workflow_run_id),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
if invoke_from == InvokeFrom.DEBUGGER:
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True # type: ignore
# Create repositories
#
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
if invoke_from == InvokeFrom.DEBUGGER:
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
workflow_run_id=str(workflow_run_id),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
# Create repositories
#
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
if invoke_from == InvokeFrom.DEBUGGER:
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
return self._generate(
workflow=workflow,
user=user,
invoke_from=invoke_from,
application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=conversation,
stream=streaming,
pause_state_config=pause_state_config,
)
return self._generate(
workflow=workflow,
user=user,
invoke_from=invoke_from,
application_generate_entity=application_generate_entity,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=conversation,
stream=streaming,
pause_state_config=pause_state_config,
)
def resume(
self,
@ -460,94 +459,90 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
:param conversation: conversation
:param stream: is stream
"""
is_first_conversation = conversation is None
with self._bind_file_access_scope(
tenant_id=application_generate_entity.app_config.tenant_id,
user=user,
invoke_from=invoke_from,
):
is_first_conversation = conversation is None
if conversation is not None and message is not None:
pass
else:
conversation, message = self._init_generate_records(application_generate_entity, conversation)
if conversation is not None and message is not None:
pass
else:
conversation, message = self._init_generate_records(application_generate_entity, conversation)
if is_first_conversation:
# update conversation features
conversation.override_model_configs = workflow.features
db.session.commit()
db.session.refresh(conversation)
if is_first_conversation:
# update conversation features
conversation.override_model_configs = workflow.features
db.session.commit()
db.session.refresh(conversation)
# get conversation dialogue count
# NOTE: dialogue_count should not start from 0,
# because during the first conversation, dialogue_count should be 1.
self._dialogue_count = get_thread_messages_length(conversation.id) + 1
# get conversation dialogue count
# NOTE: dialogue_count should not start from 0,
# because during the first conversation, dialogue_count should be 1.
self._dialogue_count = get_thread_messages_length(conversation.id) + 1
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context and contextvars
context = contextvars.copy_context()
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
"context": context,
"variable_loader": variable_loader,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread.start()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
"context": context,
"variable_loader": variable_loader,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
# release database connection, because the following new thread operations may take a long time
with Session(bind=db.engine, expire_on_commit=False) as session:
workflow = _refresh_model(session, workflow)
message = _refresh_model(session, message)
# workflow_ = session.get(Workflow, workflow.id)
# assert workflow_ is not None
# workflow = workflow_
# message_ = session.get(Message, message.id)
# assert message_ is not None
# message = message_
# db.session.refresh(workflow)
# db.session.refresh(message)
# db.session.refresh(user)
db.session.close()
worker_thread.start()
# return response or stream generator
response = self._handle_advanced_chat_response(
application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from, account=user),
)
# release database connection, because the following new thread operations may take a long time
with Session(bind=db.engine, expire_on_commit=False) as session:
workflow = _refresh_model(session, workflow)
message = _refresh_model(session, message)
db.session.close()
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
# return response or stream generator
response = self._handle_advanced_chat_response(
application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from, account=user),
)
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,

View File

@ -25,14 +25,19 @@ from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, Workfl
from core.db.session_factory import session_factory
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.system_variables import (
build_bootstrap_variables,
build_system_variables,
system_variables_to_mapping,
)
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.enums import WorkflowType
from dify_graph.graph_engine.command_channels.redis_channel import RedisChannel
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import VariableLoader
from dify_graph.variables.variables import Variable
from extensions.ext_database import db
@ -90,7 +95,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)
system_inputs = SystemVariable(
system_inputs = build_system_variables(
query=self.application_generate_entity.query,
files=self.application_generate_entity.files,
conversation_id=self.conversation.id,
@ -150,7 +155,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self.application_generate_entity.inputs = new_inputs
self.application_generate_entity.query = new_query
system_inputs.query = new_query
system_inputs = build_system_variables(
system_variables_to_mapping(system_inputs),
query=new_query,
)
# annotation reply
if self.handle_annotation_reply(
@ -166,14 +174,17 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# Create a variable pool.
# init variable pool
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=new_inputs,
environment_variables=self._workflow.environment_variables,
# Based on the definition of `Variable`,
# `VariableBase` instances can be safely used as `Variable` since they are compatible.
conversation_variables=conversation_variables,
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(
system_variables=system_inputs,
environment_variables=self._workflow.environment_variables,
conversation_variables=conversation_variables,
),
)
root_node_id = get_default_root_node_id(self._workflow.graph_dict)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=new_inputs)
# init graph
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
@ -185,6 +196,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
root_node_id=root_node_id,
)
db.session.close()

View File

@ -14,6 +14,7 @@ from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.common.graph_runtime_state_support import GraphRuntimeStateSupport
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
InvokeFrom,
@ -65,14 +66,14 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from core.workflow.file_reference import resolve_file_record_id
from core.workflow.system_variables import build_system_variables
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Account, Conversation, EndUser, Message, MessageFile
@ -117,7 +118,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
else:
raise NotImplementedError(f"User type not supported: {type(user)}")
self._workflow_system_variables = SystemVariable(
self._workflow_system_variables = build_system_variables(
query=message.query,
files=application_generate_entity.files,
conversation_id=conversation.id,
@ -741,8 +742,9 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
def _load_human_input_form_id(self, *, node_id: str) -> str | None:
form_repository = HumanInputFormRepositoryImpl(
tenant_id=self._workflow_tenant_id,
workflow_execution_id=self._workflow_run_id,
)
form = form_repository.get_form(self._workflow_run_id, node_id)
form = form_repository.get_form(node_id)
if form is None:
return None
return form.id
@ -933,21 +935,23 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
metadata = self._task_state.metadata.model_dump()
message.message_metadata = json.dumps(jsonable_encoder(metadata))
message_files = [
MessageFile(
message_id=message.id,
type=file["type"],
transfer_method=file["transfer_method"],
url=file["remote_url"],
belongs_to=MessageFileBelongsTo.ASSISTANT,
upload_file_id=file["related_id"],
created_by_role=CreatorUserRole.ACCOUNT
if message.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}
else CreatorUserRole.END_USER,
created_by=message.from_account_id or message.from_end_user_id or "",
message_files: list[MessageFile] = []
for file in self._recorded_files:
reference = file.get("reference") or file.get("related_id")
message_files.append(
MessageFile(
message_id=message.id,
type=file["type"],
transfer_method=file["transfer_method"],
url=file["remote_url"],
belongs_to=MessageFileBelongsTo.ASSISTANT,
upload_file_id=resolve_file_record_id(reference if isinstance(reference, str) else None),
created_by_role=CreatorUserRole.ACCOUNT
if message.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}
else CreatorUserRole.END_USER,
created_by=message.from_account_id or message.from_end_user_id or "",
)
)
for file in self._recorded_files
]
session.add_all(message_files)
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
@ -1003,13 +1007,11 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
return message
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)
saver = self._draft_var_saver_factory(
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)

View File

@ -129,89 +129,93 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args.get("files") or []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args.get("files") or []
file_extra_config = FileUploadConfigManager.convert(
override_model_config_dict or app_model_config.to_dict()
)
else:
file_objs = []
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = AgentChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# convert to app config
app_config = AgentChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# get tracing instance
trace_manager = TraceQueueManager(app_model.id, user.id if isinstance(user, Account) else user.session_id)
# get tracing instance
trace_manager = TraceQueueManager(app_model.id, user.id if isinstance(user, Account) else user.session_id)
# init application generate entity
application_generate_entity = AgentChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
call_depth=0,
trace_manager=trace_manager,
)
# init application generate entity
application_generate_entity = AgentChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
call_depth=0,
trace_manager=trace_manager,
)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context and contextvars
context = contextvars.copy_context()
# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
},
)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
},
)
worker_thread.start()
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return AgentChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return AgentChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,

View File

@ -1,17 +1,20 @@
from collections.abc import Generator, Mapping, Sequence
from contextlib import AbstractContextManager, nullcontext
from typing import TYPE_CHECKING, Any, Union, final
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom
from dify_graph.enums import NodeType
from dify_graph.file import File, FileUploadConfig
from dify_graph.repositories.draft_variable_repository import (
from core.app.apps.draft_variable_saver import (
DraftVariableSaver,
DraftVariableSaverFactory,
NoopDraftVariableSaver,
)
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.app.file_access import DatabaseFileAccessController, FileAccessScope, bind_file_access_scope
from dify_graph.enums import NodeType
from dify_graph.file import File, FileUploadConfig
from dify_graph.variables.input_entities import VariableEntityType
from extensions.ext_database import db
from factories import file_factory
from libs.orjson import orjson_dumps
from models import Account, EndUser
@ -21,7 +24,66 @@ if TYPE_CHECKING:
from dify_graph.variables.input_entities import VariableEntity
@final
class _DebuggerDraftVariableSaver:
"""Adapter that binds SQLAlchemy session setup outside the saver port."""
def __init__(
self,
*,
account: Account,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> None:
self._account = account
self._app_id = app_id
self._node_id = node_id
self._node_type = node_type
self._node_execution_id = node_execution_id
self._enclosing_node_id = enclosing_node_id
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None) -> None:
with Session(db.engine) as session, session.begin():
DraftVariableSaverImpl(
session=session,
app_id=self._app_id,
node_id=self._node_id,
node_type=self._node_type,
node_execution_id=self._node_execution_id,
enclosing_node_id=self._enclosing_node_id,
user=self._account,
).save(process_data, outputs)
class BaseAppGenerator:
_file_access_controller: DatabaseFileAccessController = DatabaseFileAccessController()
@staticmethod
def _bind_file_access_scope(
*,
tenant_id: str,
user: Account | EndUser,
invoke_from: InvokeFrom,
) -> AbstractContextManager[None]:
"""Bind request-scoped file ownership markers for downstream file lookups."""
user_id = getattr(user, "id", None)
if not isinstance(user_id, str) or not user_id:
return nullcontext()
user_from = UserFrom.ACCOUNT if isinstance(user, Account) else UserFrom.END_USER
return bind_file_access_scope(
FileAccessScope(
tenant_id=tenant_id,
user_id=user_id,
user_from=user_from,
invoke_from=invoke_from,
)
)
def _prepare_user_inputs(
self,
*,
@ -50,6 +112,7 @@ class BaseAppGenerator:
allowed_file_upload_methods=entity_dictionary[k].allowed_file_upload_methods or [],
),
strict_type_validation=strict_type_validation,
access_controller=self._file_access_controller,
)
for k, v in user_inputs.items()
if isinstance(v, dict) and entity_dictionary[k].type == VariableEntityType.FILE
@ -64,6 +127,7 @@ class BaseAppGenerator:
allowed_file_extensions=entity_dictionary[k].allowed_file_extensions or [],
allowed_file_upload_methods=entity_dictionary[k].allowed_file_upload_methods or [],
),
access_controller=self._file_access_controller,
)
for k, v in user_inputs.items()
if isinstance(v, list)
@ -226,32 +290,30 @@ class BaseAppGenerator:
assert isinstance(account, Account)
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
return DraftVariableSaverImpl(
session=session,
return _DebuggerDraftVariableSaver(
account=account,
app_id=app_id,
node_id=node_id,
node_type=node_type,
node_execution_id=node_execution_id,
enclosing_node_id=enclosing_node_id,
user=account,
)
else:
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
_ = app_id, node_id, node_type, node_execution_id, enclosing_node_id
return NoopDraftVariableSaver()
return draft_var_saver_factory

View File

@ -61,27 +61,30 @@ class AppQueueManager(ABC):
listen_timeout = dify_config.APP_MAX_EXECUTION_TIME
start_time = time.time()
last_ping_time: int | float = 0
while True:
try:
message = self._q.get(timeout=1)
if message is None:
break
try:
while True:
try:
message = self._q.get(timeout=1)
if message is None:
break
yield message
except queue.Empty:
continue
finally:
elapsed_time = time.time() - start_time
if elapsed_time >= listen_timeout or self._is_stopped():
# publish two messages to make sure the client can receive the stop signal
# and stop listening after the stop signal processed
self.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL), PublishFrom.TASK_PIPELINE
)
yield message
except queue.Empty:
continue
finally:
elapsed_time = time.time() - start_time
if elapsed_time >= listen_timeout or self._is_stopped():
# publish two messages to make sure the client can receive the stop signal
# and stop listening after the stop signal processed
self.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL), PublishFrom.TASK_PIPELINE
)
if elapsed_time // 10 > last_ping_time:
self.publish(QueuePingEvent(), PublishFrom.TASK_PIPELINE)
last_ping_time = elapsed_time // 10
if elapsed_time // 10 > last_ping_time:
self.publish(QueuePingEvent(), PublishFrom.TASK_PIPELINE)
last_ping_time = elapsed_time // 10
finally:
self._graph_runtime_state = None # Release reference once consumers finish or close the generator.
def stop_listen(self):
"""
@ -90,7 +93,6 @@ class AppQueueManager(ABC):
"""
self._clear_task_belong_cache()
self._q.put(None)
self._graph_runtime_state = None # Release reference to allow GC to reclaim memory
def _clear_task_belong_cache(self) -> None:
"""

View File

@ -1,3 +1,4 @@
import contextvars
import logging
import threading
import uuid
@ -120,89 +121,96 @@ class ChatAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(
override_model_config_dict or app_model_config.to_dict()
)
else:
file_objs = []
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = ChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# convert to app config
app_config = ChatAppConfigManager.get_app_config(
app_model=app_model,
app_model_config=app_model_config,
conversation=conversation,
override_config_dict=override_model_config_dict,
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = ChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
stream=streaming,
)
# init application generate entity
application_generate_entity = ChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
conversation_id=conversation.id if conversation else None,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
stream=streaming,
)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context
@copy_current_request_context
def worker_with_context():
return self._generate_worker(
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
worker_thread = threading.Thread(target=worker_with_context)
context = contextvars.copy_context()
worker_thread.start()
# new thread with request context
@copy_current_request_context
def worker_with_context():
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation_id=conversation.id,
message_id=message.id,
)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
worker_thread = threading.Thread(target=worker_with_context)
return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,

View File

@ -223,7 +223,6 @@ class ChatAppRunner(AppRunner):
model_parameters=application_generate_entity.model_conf.parameters,
stop=stop,
stream=application_generate_entity.stream,
user=application_generate_entity.user_id,
)
# handle invoke result

View File

@ -4,6 +4,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from core.workflow.system_variables import SystemVariableKey, get_system_text
from dify_graph.runtime import GraphRuntimeState
if TYPE_CHECKING:
@ -30,10 +31,10 @@ class GraphRuntimeStateSupport:
return self._resolve_graph_runtime_state(graph_runtime_state)
def _extract_workflow_run_id(self, graph_runtime_state: GraphRuntimeState) -> str:
system_variables = graph_runtime_state.variable_pool.system_variables
if not system_variables or not system_variables.workflow_execution_id:
workflow_run_id = get_system_text(graph_runtime_state.variable_pool, SystemVariableKey.WORKFLOW_EXECUTION_ID)
if not workflow_run_id:
raise ValueError("workflow_execution_id missing from runtime state")
return str(system_variables.workflow_execution_id)
return workflow_run_id
def _resolve_graph_runtime_state(
self,

View File

@ -1,3 +1,4 @@
import json
import logging
import time
from collections.abc import Mapping, Sequence
@ -50,20 +51,21 @@ from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.trigger_manager import TriggerManager
from core.workflow.human_input_forms import load_form_tokens_by_form_id
from core.workflow.system_variables import SystemVariableKey, system_variables_to_mapping
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import (
BuiltinNodeTypes,
SystemVariableKey,
WorkflowExecutionStatus,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus,
)
from dify_graph.file import FILE_MODEL_IDENTITY, File
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
from dify_graph.variables.segments import ArrayFileSegment, FileSegment, Segment
from dify_graph.variables.variables import Variable
from dify_graph.workflow_type_encoder import WorkflowRuntimeTypeConverter
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
@ -111,11 +113,11 @@ class WorkflowResponseConverter:
*,
application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
user: Union[Account, EndUser],
system_variables: SystemVariable,
system_variables: Sequence[Variable],
):
self._application_generate_entity = application_generate_entity
self._user = user
self._system_variables = system_variables
self._system_variables = system_variables_to_mapping(system_variables)
self._workflow_inputs = self._prepare_workflow_inputs()
# Disable truncation for SERVICE_API calls to keep backward compatibility.
@ -133,7 +135,7 @@ class WorkflowResponseConverter:
# ------------------------------------------------------------------
def _prepare_workflow_inputs(self) -> Mapping[str, Any]:
inputs = dict(self._application_generate_entity.inputs)
for field_name, value in self._system_variables.to_dict().items():
for field_name, value in self._system_variables.items():
# TODO(@future-refactor): store system variables separately from user inputs so we don't
# need to flatten `sys.*` entries into the input payload just for rerun/export tooling.
if field_name == SystemVariableKey.CONVERSATION_ID:
@ -318,13 +320,23 @@ class WorkflowResponseConverter:
pause_reasons = [reason.model_dump(mode="json") for reason in event.reasons]
human_input_form_ids = [reason.form_id for reason in event.reasons if isinstance(reason, HumanInputRequired)]
expiration_times_by_form_id: dict[str, datetime] = {}
display_in_ui_by_form_id: dict[str, bool] = {}
form_token_by_form_id: dict[str, str] = {}
if human_input_form_ids:
stmt = select(HumanInputForm.id, HumanInputForm.expiration_time).where(
HumanInputForm.id.in_(human_input_form_ids)
)
stmt = select(
HumanInputForm.id,
HumanInputForm.expiration_time,
HumanInputForm.form_definition,
).where(HumanInputForm.id.in_(human_input_form_ids))
with Session(bind=db.engine) as session:
for form_id, expiration_time in session.execute(stmt):
for form_id, expiration_time, form_definition in session.execute(stmt):
expiration_times_by_form_id[str(form_id)] = expiration_time
try:
definition_payload = json.loads(form_definition) if form_definition else {}
except (TypeError, json.JSONDecodeError):
definition_payload = {}
display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui"))
form_token_by_form_id = load_form_tokens_by_form_id(human_input_form_ids, session=session)
responses: list[StreamResponse] = []
@ -344,8 +356,8 @@ class WorkflowResponseConverter:
form_content=reason.form_content,
inputs=reason.inputs,
actions=reason.actions,
display_in_ui=reason.display_in_ui,
form_token=reason.form_token,
display_in_ui=display_in_ui_by_form_id.get(reason.form_id, False),
form_token=form_token_by_form_id.get(reason.form_id),
resolved_default_values=reason.resolved_default_values,
expiration_time=int(expiration_time.timestamp()),
),

View File

@ -1,3 +1,4 @@
import contextvars
import logging
import threading
import uuid
@ -108,83 +109,90 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(
override_model_config_dict or app_model_config.to_dict()
)
else:
file_objs = []
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = CompletionAppConfigManager.get_app_config(
app_model=app_model, app_model_config=app_model_config, override_config_dict=override_model_config_dict
)
# convert to app config
app_config = CompletionAppConfigManager.get_app_config(
app_model=app_model, app_model_config=app_model_config, override_config_dict=override_model_config_dict
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras={},
trace_manager=trace_manager,
)
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
query=query,
files=list(file_objs),
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
extras={},
trace_manager=trace_manager,
)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context
@copy_current_request_context
def worker_with_context():
return self._generate_worker(
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
worker_thread = threading.Thread(target=worker_with_context)
context = contextvars.copy_context()
worker_thread.start()
# new thread with request context
@copy_current_request_context
def worker_with_context():
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
message_id=message.id,
)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
worker_thread = threading.Thread(target=worker_with_context)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=streaming,
)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def _generate_worker(
self,
@ -280,71 +288,76 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
model_dict["completion_params"] = completion_params
override_model_config_dict["model"] = model_dict
# parse files
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict)
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=message.message_files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
# parse files
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict)
if file_extra_config:
file_objs = file_factory.build_from_mappings(
mappings=message.message_files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
access_controller=self._file_access_controller,
)
else:
file_objs = []
# convert to app config
app_config = CompletionAppConfigManager.get_app_config(
app_model=app_model, app_model_config=app_model_config, override_config_dict=override_model_config_dict
)
else:
file_objs = []
# convert to app config
app_config = CompletionAppConfigManager.get_app_config(
app_model=app_model, app_model_config=app_model_config, override_config_dict=override_model_config_dict
)
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
inputs=message.inputs,
query=message.query,
files=list(file_objs),
user_id=user.id,
stream=stream,
invoke_from=invoke_from,
extras={},
)
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config),
inputs=message.inputs,
query=message.query,
files=list(file_objs),
user_id=user.id,
stream=stream,
invoke_from=invoke_from,
extras={},
)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity)
# init generate records
(conversation, message) = self._init_generate_records(application_generate_entity)
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
# new thread with request context
@copy_current_request_context
def worker_with_context():
return self._generate_worker(
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
# init queue manager
queue_manager = MessageBasedAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
conversation_id=conversation.id,
app_mode=conversation.mode,
message_id=message.id,
)
worker_thread = threading.Thread(target=worker_with_context)
context = contextvars.copy_context()
worker_thread.start()
# new thread with request context
@copy_current_request_context
def worker_with_context():
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
message_id=message.id,
)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream,
)
worker_thread = threading.Thread(target=worker_with_context)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream,
)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

View File

@ -181,7 +181,6 @@ class CompletionAppRunner(AppRunner):
model_parameters=application_generate_entity.model_conf.parameters,
stop=stop,
stream=application_generate_entity.stream,
user=application_generate_entity.user_id,
)
# handle invoke result

View File

@ -0,0 +1,33 @@
from __future__ import annotations
import abc
from collections.abc import Mapping
from typing import Any, Protocol
from dify_graph.enums import NodeType
class DraftVariableSaver(Protocol):
@abc.abstractmethod
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None) -> None:
"""Persist node draft variables for a completed execution."""
raise NotImplementedError
class DraftVariableSaverFactory(Protocol):
@abc.abstractmethod
def __call__(
self,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
"""Build a saver bound to a concrete node execution."""
raise NotImplementedError
class NoopDraftVariableSaver(DraftVariableSaver):
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None) -> None:
return None

View File

@ -28,6 +28,7 @@ from core.app.entities.task_entities import (
)
from core.app.task_pipeline.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from core.workflow.file_reference import resolve_file_record_id
from extensions.ext_database import db
from extensions.ext_redis import get_pubsub_broadcast_channel
from libs.broadcast_channel.channel import Topic
@ -227,7 +228,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
transfer_method=file.transfer_method,
belongs_to=MessageFileBelongsTo.USER,
url=file.remote_url,
upload_file_id=file.related_id,
upload_file_id=resolve_file_record_id(file.reference),
created_by_role=(CreatorUserRole.ACCOUNT if account_id else CreatorUserRole.END_USER),
created_by=account_id or end_user_id or "",
)

View File

@ -18,6 +18,7 @@ import contexts
from configs import dify_config
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.pipeline.pipeline_config_manager import PipelineConfigManager
from core.app.apps.pipeline.pipeline_queue_manager import PipelineQueueManager
@ -34,11 +35,12 @@ from core.datasource.entities.datasource_entities import (
from core.datasource.online_drive.online_drive_plugin import OnlineDriveDatasourcePlugin
from core.entities.knowledge_entities import PipelineDataset, PipelineDocument
from core.rag.index_processor.constant.built_in_field import BuiltInField
from core.repositories.factory import DifyCoreRepositoryFactory
from core.repositories.factory import (
DifyCoreRepositoryFactory,
WorkflowExecutionRepository,
WorkflowNodeExecutionRepository,
)
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
from libs.flask_utils import preserve_flask_contexts

View File

@ -12,16 +12,16 @@ from core.app.entities.app_invoke_entities import (
build_dify_run_context,
)
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.graph_init_params import GraphInitParams
from dify_graph.enums import WorkflowType
from dify_graph.graph import Graph
from dify_graph.graph_events import GraphEngineEvent, GraphRunFailedEvent
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import VariableLoader
from dify_graph.variables.variables import RAGPipelineVariable, RAGPipelineVariableInput
from extensions.ext_database import db
@ -112,7 +112,7 @@ class PipelineRunner(WorkflowBasedAppRunner):
files = self.application_generate_entity.files
# Create a variable pool.
system_inputs = SystemVariable(
system_inputs = build_system_variables(
files=files,
user_id=user_id,
app_id=app_config.app_id,
@ -142,19 +142,25 @@ class PipelineRunner(WorkflowBasedAppRunner):
)
)
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=inputs,
environment_variables=workflow.environment_variables,
conversation_variables=[],
rag_pipeline_variables=rag_pipeline_variables,
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(
system_variables=system_inputs,
environment_variables=workflow.environment_variables,
rag_pipeline_variables=rag_pipeline_variables,
),
)
root_node_id = self.application_generate_entity.start_node_id or get_default_root_node_id(
workflow.graph_dict
)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
# init graph
graph = self._init_rag_pipeline_graph(
graph_runtime_state=graph_runtime_state,
start_node_id=self.application_generate_entity.start_node_id,
start_node_id=root_node_id,
workflow=workflow,
user_from=user_from,
invoke_from=invoke_from,

View File

@ -17,6 +17,7 @@ from configs import dify_config
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager
@ -30,11 +31,9 @@ from core.db.session_factory import session_factory
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import DifyCoreRepositoryFactory
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.model_runtime.errors.invoke import InvokeAuthorizationError
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
@ -129,107 +128,109 @@ class WorkflowAppGenerator(BaseAppGenerator):
graph_engine_layers: Sequence[GraphEngineLayer] = (),
pause_state_config: PauseStateLayerConfig | None = None,
) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
files: Sequence[Mapping[str, Any]] = args.get("files") or []
with self._bind_file_access_scope(tenant_id=app_model.tenant_id, user=user, invoke_from=invoke_from):
files: Sequence[Mapping[str, Any]] = args.get("files") or []
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
system_files = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
)
# convert to app config
app_config = WorkflowAppConfigManager.get_app_config(
app_model=app_model,
workflow=workflow,
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id,
user_id=user.id if isinstance(user, Account) else user.session_id,
)
inputs: Mapping[str, Any] = args["inputs"]
extras = {
**extract_external_trace_id_from_args(args),
}
workflow_run_id = str(workflow_run_id or uuid.uuid4())
# FIXME (Yeuoly): we need to remove the SKIP_PREPARE_USER_INPUTS_KEY from the args
# trigger shouldn't prepare user inputs
if self._should_prepare_user_inputs(args):
inputs = self._prepare_user_inputs(
user_inputs=inputs,
variables=app_config.variables,
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
system_files = file_factory.build_from_mappings(
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
access_controller=self._file_access_controller,
)
# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
file_upload_config=file_extra_config,
inputs=inputs,
files=list(system_files),
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
call_depth=call_depth,
trace_manager=trace_manager,
workflow_execution_id=workflow_run_id,
extras=extras,
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
# convert to app config
app_config = WorkflowAppConfigManager.get_app_config(
app_model=app_model,
workflow=workflow,
)
# Create repositories
#
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
if triggered_from is not None:
# Use explicitly provided triggered_from (for async triggers)
workflow_triggered_from = triggered_from
elif invoke_from == InvokeFrom.DEBUGGER:
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id,
user_id=user.id if isinstance(user, Account) else user.session_id,
)
return self._generate(
app_model=app_model,
workflow=workflow,
user=user,
application_generate_entity=application_generate_entity,
invoke_from=invoke_from,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
root_node_id=root_node_id,
graph_engine_layers=graph_engine_layers,
pause_state_config=pause_state_config,
)
inputs: Mapping[str, Any] = args["inputs"]
extras = {
**extract_external_trace_id_from_args(args),
}
workflow_run_id = str(workflow_run_id or uuid.uuid4())
# FIXME (Yeuoly): we need to remove the SKIP_PREPARE_USER_INPUTS_KEY from the args
# trigger shouldn't prepare user inputs
if self._should_prepare_user_inputs(args):
inputs = self._prepare_user_inputs(
user_inputs=inputs,
variables=app_config.variables,
tenant_id=app_model.tenant_id,
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
)
# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
file_upload_config=file_extra_config,
inputs=inputs,
files=list(system_files),
user_id=user.id,
stream=streaming,
invoke_from=invoke_from,
call_depth=call_depth,
trace_manager=trace_manager,
workflow_execution_id=workflow_run_id,
extras=extras,
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
# Create repositories
#
# Create session factory
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
# Create workflow execution(aka workflow run) repository
if triggered_from is not None:
# Use explicitly provided triggered_from (for async triggers)
workflow_triggered_from = triggered_from
elif invoke_from == InvokeFrom.DEBUGGER:
workflow_triggered_from = WorkflowRunTriggeredFrom.DEBUGGING
else:
workflow_triggered_from = WorkflowRunTriggeredFrom.APP_RUN
workflow_execution_repository = DifyCoreRepositoryFactory.create_workflow_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=workflow_triggered_from,
)
# Create workflow node execution repository
workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository(
session_factory=session_factory,
user=user,
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
return self._generate(
app_model=app_model,
workflow=workflow,
user=user,
application_generate_entity=application_generate_entity,
invoke_from=invoke_from,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
root_node_id=root_node_id,
graph_engine_layers=graph_engine_layers,
pause_state_config=pause_state_config,
)
def resume(
self,
@ -292,62 +293,67 @@ class WorkflowAppGenerator(BaseAppGenerator):
:param workflow_node_execution_repository: repository for workflow node execution
:param streaming: is stream
"""
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
with self._bind_file_access_scope(
tenant_id=application_generate_entity.app_config.tenant_id,
user=user,
invoke_from=invoke_from,
):
graph_layers: list[GraphEngineLayer] = list(graph_engine_layers)
# init queue manager
queue_manager = WorkflowAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
app_mode=app_model.mode,
)
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
# init queue manager
queue_manager = WorkflowAppQueueManager(
task_id=application_generate_entity.task_id,
user_id=application_generate_entity.user_id,
invoke_from=application_generate_entity.invoke_from,
app_mode=app_model.mode,
)
# new thread with request context and contextvars
context = contextvars.copy_context()
if pause_state_config is not None:
graph_layers.append(
PauseStatePersistenceLayer(
session_factory=pause_state_config.session_factory,
generate_entity=application_generate_entity,
state_owner_user_id=pause_state_config.state_owner_user_id,
)
)
# release database connection, because the following new thread operations may take a long time
db.session.close()
# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"context": context,
"variable_loader": variable_loader,
"root_node_id": root_node_id,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
# release database connection, because the following new thread operations may take a long time
db.session.close()
worker_thread.start()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"context": context,
"variable_loader": variable_loader,
"root_node_id": root_node_id,
"workflow_execution_repository": workflow_execution_repository,
"workflow_node_execution_repository": workflow_node_execution_repository,
"graph_engine_layers": tuple(graph_layers),
"graph_runtime_state": graph_runtime_state,
},
)
draft_var_saver_factory = self._get_draft_var_saver_factory(invoke_from, user)
worker_thread.start()
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager,
user=user,
draft_var_saver_factory=draft_var_saver_factory,
stream=streaming,
)
draft_var_saver_factory = self._get_draft_var_saver_factory(invoke_from, user)
return WorkflowAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
workflow=workflow,
queue_manager=queue_manager,
user=user,
draft_var_saver_factory=draft_var_saver_factory,
stream=streaming,
)
return WorkflowAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
def single_iteration_generate(
self,

View File

@ -8,14 +8,15 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfig
from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.enums import WorkflowType
from dify_graph.graph_engine.command_channels.redis_channel import RedisChannel
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.repositories.workflow_execution_repository import WorkflowExecutionRepository
from dify_graph.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import VariableLoader
from extensions.ext_redis import redis_client
from extensions.otel import WorkflowAppRunnerHandler, trace_span
@ -96,7 +97,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
inputs = self.application_generate_entity.inputs
# Create a variable pool.
system_inputs = SystemVariable(
system_inputs = build_system_variables(
files=self.application_generate_entity.files,
user_id=self._sys_user_id,
app_id=app_config.app_id,
@ -104,12 +105,16 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
)
variable_pool = VariablePool(
system_variables=system_inputs,
user_inputs=inputs,
environment_variables=self._workflow.environment_variables,
conversation_variables=[],
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(
system_variables=system_inputs,
environment_variables=self._workflow.environment_variables,
),
)
root_node_id = self._root_node_id or get_default_root_node_id(self._workflow.graph_dict)
add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
graph = self._init_graph(
@ -120,7 +125,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
user_id=self.application_generate_entity.user_id,
user_from=user_from,
invoke_from=invoke_from,
root_node_id=self._root_node_id,
root_node_id=root_node_id,
)
# RUN WORKFLOW

View File

@ -10,6 +10,7 @@ from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.common.graph_runtime_state_support import GraphRuntimeStateSupport
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.draft_variable_saver import DraftVariableSaverFactory
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import (
AppQueueEvent,
@ -55,11 +56,10 @@ from core.app.entities.task_entities import (
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.system_variables import build_system_variables
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
from extensions.ext_database import db
from models import Account
from models.enums import CreatorUserRole
@ -104,7 +104,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._invoke_from = queue_manager.invoke_from
self._draft_var_saver_factory = draft_var_saver_factory
self._workflow = workflow
self._workflow_system_variables = SystemVariable(
self._workflow_system_variables = build_system_variables(
files=application_generate_entity.files,
user_id=user_session_id,
app_id=application_generate_entity.app_config.app_id,
@ -728,13 +728,11 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
return response
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)
saver = self._draft_var_saver_factory(
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)

View File

@ -34,7 +34,16 @@ from core.app.entities.queue_entities import (
)
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.system_variables import (
build_bootstrap_variables,
default_system_variables,
get_node_creation_preload_selectors,
inject_default_system_variable_mappings,
preload_node_creation_variables,
)
from core.workflow.variable_pool_initializer import add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
from core.workflow.workflow_run_outputs import project_node_outputs_for_workflow_run
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
from dify_graph.entities.pause_reason import HumanInputRequired
@ -68,7 +77,6 @@ from dify_graph.graph_events import (
)
from dify_graph.graph_events.graph import GraphRunAbortedEvent
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
from models.workflow import Workflow
from tasks.mail_human_input_delivery_task import dispatch_human_input_email_task
@ -173,14 +181,15 @@ class WorkflowBasedAppRunner:
ValueError: If neither single_iteration_run nor single_loop_run is specified
"""
# Create initial runtime state with variable pool containing environment variables
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(
system_variables=default_system_variables(),
environment_variables=workflow.environment_variables,
),
start_at=time.time(),
)
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time())
# Determine which type of single node execution and get graph/variable_pool
if single_iteration_run:
@ -272,6 +281,8 @@ class WorkflowBasedAppRunner:
graph_config["edges"] = edge_configs
typed_node_configs = [NodeConfigDictAdapter.validate_python(node) for node in node_configs]
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
workflow_id=workflow.id,
@ -291,26 +302,15 @@ class WorkflowBasedAppRunner:
graph_runtime_state=graph_runtime_state,
)
# init graph
graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph:
raise ValueError("graph not found in workflow")
# fetch node config from node id
target_node_config = None
for node in node_configs:
if node.get("id") == node_id:
for node in typed_node_configs:
if node["id"] == node_id:
target_node_config = node
break
if not target_node_config:
raise ValueError(f"{node_type_label} node id not found in workflow graph")
target_node_config = NodeConfigDictAdapter.validate_python(target_node_config)
# Get node class
node_type = target_node_config["data"].type
node_version = str(target_node_config["data"].version)
@ -319,12 +319,31 @@ class WorkflowBasedAppRunner:
# Use the variable pool from graph_runtime_state instead of creating a new one
variable_pool = graph_runtime_state.variable_pool
preload_node_creation_variables(
variable_loader=self._variable_loader,
variable_pool=variable_pool,
selectors=[
selector
for node_config in typed_node_configs
for selector in get_node_creation_preload_selectors(
node_type=node_config["data"].type,
node_data=node_config["data"],
)
],
)
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
graph_config=workflow.graph_dict, config=target_node_config
)
except NotImplementedError:
variable_mapping = {}
variable_mapping = inject_default_system_variable_mappings(
node_id=target_node_config["id"],
node_type=node_type,
node_data=target_node_config["data"],
variable_mapping=variable_mapping,
)
load_into_variable_pool(
variable_loader=self._variable_loader,
@ -340,6 +359,14 @@ class WorkflowBasedAppRunner:
tenant_id=workflow.tenant_id,
)
# init graph after constructor-time context has been loaded
graph = Graph.init(
graph_config=graph_config, node_factory=node_factory, root_node_id=node_id, skip_validation=True
)
if not graph:
raise ValueError("graph not found in workflow")
return graph, variable_pool
@staticmethod
@ -408,7 +435,11 @@ class WorkflowBasedAppRunner:
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = node_run_result.outputs
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=inputs,
outputs=node_run_result.outputs,
)
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeRetryEvent(
@ -448,7 +479,11 @@ class WorkflowBasedAppRunner:
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = node_run_result.outputs
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=inputs,
outputs=node_run_result.outputs,
)
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeSucceededEvent(
@ -466,6 +501,11 @@ class WorkflowBasedAppRunner:
)
)
elif isinstance(event, NodeRunFailedEvent):
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=event.node_run_result.inputs,
outputs=event.node_run_result.outputs,
)
self._publish_event(
QueueNodeFailedEvent(
node_execution_id=event.id,
@ -475,7 +515,7 @@ class WorkflowBasedAppRunner:
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=event.node_run_result.outputs,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,
@ -483,6 +523,11 @@ class WorkflowBasedAppRunner:
)
)
elif isinstance(event, NodeRunExceptionEvent):
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=event.node_run_result.inputs,
outputs=event.node_run_result.outputs,
)
self._publish_event(
QueueNodeExceptionEvent(
node_execution_id=event.id,
@ -492,7 +537,7 @@ class WorkflowBasedAppRunner:
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=event.node_run_result.outputs,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,