Files
dify/api/tasks/workflow_node_execution_tasks.py
-LAN- e4619b5b73 refactor(workflow): consolidate persistence async writes
Route workflow persistence mode from InvokeFrom so debugger executions keep synchronous DB writes while non-debug invocations enqueue Celery tasks through the default SQLAlchemy repositories.

Remove the legacy Celery workflow execution repositories, obsolete workflow node execution storage config, and tests tied only to the removed repository classes.
2026-05-20 20:59:18 +08:00

214 lines
7.9 KiB
Python

"""
Celery tasks for asynchronous workflow node execution storage operations.
These tasks provide asynchronous storage capabilities for workflow node execution data,
improving performance by offloading storage operations to background workers.
"""
import json
import logging
from typing import Any
from celery import shared_task
from sqlalchemy import select
from core.db.session_factory import session_factory
from graphon.entities.workflow_node_execution import (
WorkflowNodeExecution,
)
from graphon.workflow_type_encoder import WorkflowRuntimeTypeConverter
from models import Account, CreatorUserRole, EndUser, WorkflowNodeExecutionModel
from models.workflow import WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__)
@shared_task(queue="workflow_storage", bind=True, max_retries=3, default_retry_delay=60)
def save_workflow_node_execution_task(
    self,
    execution_data: dict[str, Any],
    tenant_id: str,
    app_id: str,
    triggered_from: str,
    creator_user_id: str,
    creator_user_role: str,
) -> bool:
    """
    Asynchronously save or update workflow node execution metadata to the database.

    The row is looked up by the execution's ID. When it already exists, only its
    metadata fields are refreshed via ``_update_node_execution_metadata``; otherwise a
    new row is built with ``_create_node_execution_from_domain`` and inserted.

    Args:
        execution_data: Serialized WorkflowNodeExecution data
        tenant_id: Tenant ID for multi-tenancy
        app_id: Application ID
        triggered_from: Source of the execution trigger
        creator_user_id: ID of the user who created the execution
        creator_user_role: Role of the user who created the execution

    Returns:
        True once the change has been committed. This task never returns False:
        failures are reported by raising (see below).

    Raises:
        Exception: Whatever ``WorkflowNodeExecution.model_validate`` raises for a
            malformed payload. This is re-raised immediately, without a retry.
        Exception: Any other failure is handed to ``self.retry`` with an exponential
            countdown (60s, 120s, 240s for ``max_retries=3``).
    """
    # Validating the payload depends only on the message content, so a failure here
    # would repeat identically on every attempt. Fail fast instead of spending the
    # retry budget (and several minutes of backoff) on a permanent error.
    try:
        execution = WorkflowNodeExecution.model_validate(execution_data)
    except Exception:
        logger.exception("Failed to save workflow node execution %s", execution_data.get("id", "unknown"))
        raise

    try:
        with session_factory.create_session() as session:
            # NOTE(review): this lookup-then-insert is not atomic. Two tasks for the same
            # ID running concurrently could both take the insert branch; presumably the
            # loser fails on the primary key and lands in the update branch on retry.
            existing_execution = session.scalar(
                select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution.id)
            )

            if existing_execution:
                _update_node_execution_metadata(existing_execution, execution)
                logger.debug("Updated existing workflow node execution metadata: %s", execution.id)
            else:
                node_execution = _create_node_execution_from_domain(
                    execution=execution,
                    tenant_id=tenant_id,
                    app_id=app_id,
                    triggered_from=WorkflowNodeExecutionTriggeredFrom(triggered_from),
                    creator_user_id=creator_user_id,
                    creator_user_role=CreatorUserRole(creator_user_role),
                )
                session.add(node_execution)
                logger.debug("Created new workflow node execution: %s", execution.id)

            session.commit()
            return True
    except Exception as exc:
        logger.exception("Failed to save workflow node execution %s", execution_data.get("id", "unknown"))
        # Exponential backoff: the countdown doubles with every retry already performed.
        raise self.retry(exc=exc, countdown=60 * (2**self.request.retries))
@shared_task(queue="workflow_storage", bind=True, max_retries=3, default_retry_delay=60)
def save_workflow_node_execution_data_task(
    self,
    execution_data: dict[str, Any],
    tenant_id: str,
    app_id: str,
    triggered_from: str,
    creator_user_id: str,
    creator_user_role: str,
) -> bool:
    """
    Asynchronously save full workflow node execution data to the database.

    This path preserves the SQLAlchemy repository's truncation and offload behavior while
    moving the blocking database and storage work out of the workflow engine thread.

    Args:
        execution_data: Serialized WorkflowNodeExecution data
        tenant_id: Tenant ID for multi-tenancy
        app_id: Application ID
        triggered_from: Source of the execution trigger
        creator_user_id: ID of the user who created the execution
        creator_user_role: Role of the user who created the execution

    Returns:
        True once both ``repository.save`` and ``repository.save_execution_data`` have
        completed. This task never returns False: failures are reported by raising.

    Raises:
        Exception: Whatever ``WorkflowNodeExecution.model_validate`` raises for a
            malformed payload. This is re-raised immediately, without a retry.
        Exception: Any other failure is handed to ``self.retry`` with an exponential
            countdown (60s, 120s, 240s for ``max_retries=3``).
    """
    # Validating the payload depends only on the message content, so a failure here
    # would repeat identically on every attempt. Fail fast instead of spending the
    # retry budget (and several minutes of backoff) on a permanent error.
    try:
        execution = WorkflowNodeExecution.model_validate(execution_data)
    except Exception:
        logger.exception("Failed to save workflow node execution data %s", execution_data.get("id", "unknown"))
        raise

    try:
        repository = _create_sqlalchemy_repository(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=triggered_from,
            creator_user_id=creator_user_id,
            creator_user_role=creator_user_role,
        )
        # Persist the execution record first, then its data payload.
        repository.save(execution)
        repository.save_execution_data(execution)
        return True
    except Exception as exc:
        logger.exception("Failed to save workflow node execution data %s", execution_data.get("id", "unknown"))
        # Exponential backoff: the countdown doubles with every retry already performed.
        raise self.retry(exc=exc, countdown=60 * (2**self.request.retries))
def _create_sqlalchemy_repository(
    *,
    tenant_id: str,
    app_id: str,
    triggered_from: str,
    creator_user_id: str,
    creator_user_role: str,
) -> "SQLAlchemyWorkflowNodeExecutionRepository":
    """
    Build a SQLAlchemy-backed node execution repository bound to the creator user.

    The creator is loaded as an ``Account`` when the role is ``CreatorUserRole.ACCOUNT``
    and as an ``EndUser`` for any other role. A loaded account has ``set_tenant_id``
    called with ``tenant_id`` before it is handed to the repository.

    Args:
        tenant_id: Tenant applied to the creator when it is an account
        app_id: Application ID; an empty value is passed to the repository as None
        triggered_from: Raw value converted to WorkflowNodeExecutionTriggeredFrom
        creator_user_id: Primary key of the creator user
        creator_user_role: Raw value converted to CreatorUserRole

    Returns:
        A repository sharing the session maker used for the creator lookup.

    Raises:
        ValueError: If no user with ``creator_user_id`` exists for the resolved role
            (the enum conversions may also raise for unknown values).
    """
    # Imported inside the function rather than at module level, as in the original.
    from core.repositories.sqlalchemy_workflow_node_execution_repository import (
        SQLAlchemyWorkflowNodeExecutionRepository,
    )

    make_session = session_factory.get_session_maker()
    is_account = CreatorUserRole(creator_user_role) == CreatorUserRole.ACCOUNT
    creator_model = Account if is_account else EndUser

    with make_session() as session:
        creator: Account | EndUser | None = session.get(creator_model, creator_user_id)
        # Only accounts receive a tenant; end users are used exactly as loaded.
        if is_account and creator is not None:
            creator.set_tenant_id(tenant_id)

    if creator is None:
        raise ValueError(f"Creator user {creator_user_id} not found for workflow node execution persistence")

    trigger_source = WorkflowNodeExecutionTriggeredFrom(triggered_from)
    return SQLAlchemyWorkflowNodeExecutionRepository(
        session_factory=make_session,
        user=creator,
        app_id=app_id or None,
        triggered_from=trigger_source,
    )
def _serialize_execution_metadata(execution: WorkflowNodeExecution) -> str:
    """
    Serialize a domain execution's metadata to the JSON string stored on the model.

    Shared by the create and update paths so both write ``execution_metadata`` in
    exactly the same format.

    Args:
        execution: Domain entity whose ``metadata`` mapping should be serialized

    Returns:
        ``"{}"`` when the execution has no metadata, otherwise a JSON object string.
    """
    if not execution.metadata:
        return "{}"

    # Keys exposing a ``value`` attribute (e.g. enum members) are stored by that value;
    # any other key is stored as its string form.
    metadata_for_json = {
        key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items()
    }
    json_converter = WorkflowRuntimeTypeConverter()
    return json.dumps(json_converter.to_json_encodable(metadata_for_json))


def _create_node_execution_from_domain(
    execution: WorkflowNodeExecution,
    tenant_id: str,
    app_id: str,
    triggered_from: WorkflowNodeExecutionTriggeredFrom,
    creator_user_id: str,
    creator_user_role: CreatorUserRole,
) -> WorkflowNodeExecutionModel:
    """
    Create a WorkflowNodeExecutionModel database model from a WorkflowNodeExecution domain entity.

    Args:
        execution: Domain entity supplying identity, node details, status and timing
        tenant_id: Tenant that owns the new row
        app_id: Application that owns the new row
        triggered_from: Source of the execution trigger
        creator_user_id: Stored as ``created_by``
        creator_user_role: Stored as ``created_by_role``

    Returns:
        A new model instance. It is not added to any session here.
    """
    node_execution = WorkflowNodeExecutionModel()

    # Identity and ownership.
    node_execution.id = execution.id
    node_execution.tenant_id = tenant_id
    node_execution.app_id = app_id
    node_execution.workflow_id = execution.workflow_id
    node_execution.triggered_from = triggered_from
    node_execution.workflow_run_id = execution.workflow_execution_id

    # Position and description of the node within the workflow.
    node_execution.index = execution.index
    node_execution.predecessor_node_id = execution.predecessor_node_id
    node_execution.node_id = execution.node_id
    node_execution.node_type = execution.node_type
    node_execution.title = execution.title
    node_execution.node_execution_id = execution.node_execution_id

    # The data payload columns are initialised to empty JSON objects; this function
    # never copies inputs, process data or outputs from the domain entity.
    node_execution.inputs = "{}"
    node_execution.process_data = "{}"
    node_execution.outputs = "{}"

    node_execution.execution_metadata = _serialize_execution_metadata(execution)

    # Outcome, audit and timing fields.
    node_execution.status = execution.status
    node_execution.error = execution.error
    node_execution.elapsed_time = execution.elapsed_time
    node_execution.created_by_role = creator_user_role
    node_execution.created_by = creator_user_id
    node_execution.created_at = execution.created_at
    node_execution.finished_at = execution.finished_at
    return node_execution


def _update_node_execution_metadata(node_execution: WorkflowNodeExecutionModel, execution: WorkflowNodeExecution) -> None:
    """
    Update WorkflowNodeExecutionModel metadata without changing persisted data payload fields.

    Only ``execution_metadata``, ``status``, ``error``, ``elapsed_time`` and
    ``finished_at`` are overwritten; ``inputs``, ``process_data`` and ``outputs`` are
    left as they are. An execution without metadata resets ``execution_metadata`` to
    ``"{}"``.

    Args:
        node_execution: Existing database model, mutated in place
        execution: Domain entity carrying the latest state
    """
    node_execution.execution_metadata = _serialize_execution_metadata(execution)
    node_execution.status = execution.status
    node_execution.error = execution.error
    node_execution.elapsed_time = execution.elapsed_time
    node_execution.finished_at = execution.finished_at