improve query workflowNodeExecution

This commit is contained in:
hjlarry
2026-01-16 14:46:46 +08:00
parent 65a9233559
commit c5eeb929cf

View File

@ -9,7 +9,7 @@ from collections.abc import Sequence
from datetime import datetime
from typing import TypedDict, cast
from sqlalchemy import asc, delete, desc, func, select, tuple_
from sqlalchemy import asc, delete, desc, func, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, sessionmaker
@ -328,39 +328,14 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
"""
Delete node executions (and offloads) for the given workflow runs using indexed columns.
Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id)
by filtering on those columns with tuple IN.
Uses the workflow_run_id index to target executions by run id.
"""
if not runs:
return 0, 0
tuple_values = [
(
run["tenant_id"],
run["app_id"],
run["workflow_id"],
DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
run["triggered_from"]
),
run["run_id"],
)
for run in runs
]
node_execution_ids = session.scalars(
select(WorkflowNodeExecutionModel.id).where(
tuple_(
WorkflowNodeExecutionModel.tenant_id,
WorkflowNodeExecutionModel.app_id,
WorkflowNodeExecutionModel.workflow_id,
WorkflowNodeExecutionModel.triggered_from,
WorkflowNodeExecutionModel.workflow_run_id,
).in_(tuple_values)
)
).all()
if not node_execution_ids:
return 0, 0
run_ids = [run["run_id"] for run in runs]
run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
offloads_deleted = (
cast(
@ -378,7 +353,7 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
cast(
CursorResult,
session.execute(
delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids))
delete(WorkflowNodeExecutionModel).where(run_id_filter)
),
).rowcount
or 0
@ -394,38 +369,18 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
if not runs:
return 0, 0
tuple_values = [
(
run["tenant_id"],
run["app_id"],
run["workflow_id"],
DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
run["triggered_from"]
),
run["run_id"],
)
for run in runs
]
tuple_filter = tuple_(
WorkflowNodeExecutionModel.tenant_id,
WorkflowNodeExecutionModel.app_id,
WorkflowNodeExecutionModel.workflow_id,
WorkflowNodeExecutionModel.triggered_from,
WorkflowNodeExecutionModel.workflow_run_id,
).in_(tuple_values)
run_ids = [run["run_id"] for run in runs]
run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
node_executions_count = (
session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0
session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0
)
node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
offloads_count = (
session.scalar(
select(func.count())
.select_from(WorkflowNodeExecutionOffload)
.join(
WorkflowNodeExecutionModel,
WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id,
)
.where(tuple_filter)
.where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids))
)
or 0
)