diff --git a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py index 2de3a15d65..6c6dc1dea1 100644 --- a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py @@ -9,7 +9,7 @@ from collections.abc import Sequence from datetime import datetime from typing import TypedDict, cast -from sqlalchemy import asc, delete, desc, func, select, tuple_ +from sqlalchemy import asc, delete, desc, func, select from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session, sessionmaker @@ -328,39 +328,14 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut """ Delete node executions (and offloads) for the given workflow runs using indexed columns. - Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id) - by filtering on those columns with tuple IN. + Uses the workflow_run_id index to target executions by run id. """ if not runs: return 0, 0 - tuple_values = [ - ( - run["tenant_id"], - run["app_id"], - run["workflow_id"], - DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from( - run["triggered_from"] - ), - run["run_id"], - ) - for run in runs - ] - - node_execution_ids = session.scalars( - select(WorkflowNodeExecutionModel.id).where( - tuple_( - WorkflowNodeExecutionModel.tenant_id, - WorkflowNodeExecutionModel.app_id, - WorkflowNodeExecutionModel.workflow_id, - WorkflowNodeExecutionModel.triggered_from, - WorkflowNodeExecutionModel.workflow_run_id, - ).in_(tuple_values) - ) - ).all() - - if not node_execution_ids: - return 0, 0 + run_ids = [run["run_id"] for run in runs] + run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids) + node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter) offloads_deleted = ( cast( @@ -378,7 +353,7 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut cast( CursorResult, session.execute( - delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids)) + delete(WorkflowNodeExecutionModel).where(run_id_filter) ), ).rowcount or 0 @@ -394,38 +369,18 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut if not runs: return 0, 0 - tuple_values = [ - ( - run["tenant_id"], - run["app_id"], - run["workflow_id"], - DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from( - run["triggered_from"] - ), - run["run_id"], - ) - for run in runs - ] - tuple_filter = tuple_( - WorkflowNodeExecutionModel.tenant_id, - WorkflowNodeExecutionModel.app_id, - WorkflowNodeExecutionModel.workflow_id, - WorkflowNodeExecutionModel.triggered_from, - WorkflowNodeExecutionModel.workflow_run_id, - ).in_(tuple_values) + run_ids = [run["run_id"] for run in runs] + run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids) node_executions_count = ( - session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0 + session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0 ) + node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter) offloads_count = ( session.scalar( select(func.count()) .select_from(WorkflowNodeExecutionOffload) - .join( - WorkflowNodeExecutionModel, - WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id, - ) - .where(tuple_filter) + .where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids)) ) or 0 )