mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 18:08:07 +08:00
Merge remote-tracking branch 'myori/main' into feat/collaboration2
This commit is contained in:
@ -16,7 +16,7 @@ from typing import Protocol
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||
from models.workflow import WorkflowNodeExecutionModel
|
||||
from models.workflow import WorkflowNodeExecutionModel, WorkflowNodeExecutionOffload
|
||||
|
||||
|
||||
class DifyAPIWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository, Protocol):
|
||||
@ -209,3 +209,23 @@ class DifyAPIWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository, Pr
|
||||
The number of executions deleted
|
||||
"""
|
||||
...
|
||||
|
||||
def get_offloads_by_execution_ids(
|
||||
self,
|
||||
session: Session,
|
||||
node_execution_ids: Sequence[str],
|
||||
) -> Sequence[WorkflowNodeExecutionOffload]:
|
||||
"""
|
||||
Get offload records by node execution IDs.
|
||||
|
||||
This method retrieves workflow node execution offload records
|
||||
that belong to the given node execution IDs.
|
||||
|
||||
Args:
|
||||
session: The database session to use
|
||||
node_execution_ids: List of node execution IDs to filter by
|
||||
|
||||
Returns:
|
||||
A sequence of WorkflowNodeExecutionOffload instances
|
||||
"""
|
||||
...
|
||||
|
||||
@ -45,7 +45,7 @@ from core.workflow.enums import WorkflowType
|
||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||
from libs.infinite_scroll_pagination import InfiniteScrollPagination
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from models.workflow import WorkflowRun
|
||||
from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun
|
||||
from repositories.entities.workflow_pause import WorkflowPauseEntity
|
||||
from repositories.types import (
|
||||
AverageInteractionStats,
|
||||
@ -270,6 +270,58 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def get_archived_run_ids(
|
||||
self,
|
||||
session: Session,
|
||||
run_ids: Sequence[str],
|
||||
) -> set[str]:
|
||||
"""
|
||||
Fetch workflow run IDs that already have archive log records.
|
||||
"""
|
||||
...
|
||||
|
||||
def get_archived_logs_by_time_range(
|
||||
self,
|
||||
session: Session,
|
||||
tenant_ids: Sequence[str] | None,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
limit: int,
|
||||
) -> Sequence[WorkflowArchiveLog]:
|
||||
"""
|
||||
Fetch archived workflow logs by time range for restore.
|
||||
"""
|
||||
...
|
||||
|
||||
def get_archived_log_by_run_id(
|
||||
self,
|
||||
run_id: str,
|
||||
) -> WorkflowArchiveLog | None:
|
||||
"""
|
||||
Fetch a workflow archive log by workflow run ID.
|
||||
"""
|
||||
...
|
||||
|
||||
def delete_archive_log_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> int:
|
||||
"""
|
||||
Delete archive log by workflow run ID.
|
||||
|
||||
Used after restoring a workflow run to remove the archive log record,
|
||||
allowing the run to be archived again if needed.
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
run_id: Workflow run ID
|
||||
|
||||
Returns:
|
||||
Number of records deleted (0 or 1)
|
||||
"""
|
||||
...
|
||||
|
||||
def delete_runs_with_related(
|
||||
self,
|
||||
runs: Sequence[WorkflowRun],
|
||||
@ -282,6 +334,61 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def get_pause_records_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> Sequence[WorkflowPause]:
|
||||
"""
|
||||
Fetch workflow pause records by workflow run ID.
|
||||
"""
|
||||
...
|
||||
|
||||
def get_pause_reason_records_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
pause_ids: Sequence[str],
|
||||
) -> Sequence[WorkflowPauseReason]:
|
||||
"""
|
||||
Fetch workflow pause reason records by pause IDs.
|
||||
"""
|
||||
...
|
||||
|
||||
def get_app_logs_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> Sequence[WorkflowAppLog]:
|
||||
"""
|
||||
Fetch workflow app logs by workflow run ID.
|
||||
"""
|
||||
...
|
||||
|
||||
def create_archive_logs(
|
||||
self,
|
||||
session: Session,
|
||||
run: WorkflowRun,
|
||||
app_logs: Sequence[WorkflowAppLog],
|
||||
trigger_metadata: str | None,
|
||||
) -> int:
|
||||
"""
|
||||
Create archive log records for a workflow run.
|
||||
"""
|
||||
...
|
||||
|
||||
def get_archived_runs_by_time_range(
|
||||
self,
|
||||
session: Session,
|
||||
tenant_ids: Sequence[str] | None,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
limit: int,
|
||||
) -> Sequence[WorkflowRun]:
|
||||
"""
|
||||
Return workflow runs that already have archive logs, for cleanup of `workflow_runs`.
|
||||
"""
|
||||
...
|
||||
|
||||
def count_runs_with_related(
|
||||
self,
|
||||
runs: Sequence[WorkflowRun],
|
||||
|
||||
@ -351,3 +351,27 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
|
||||
)
|
||||
|
||||
return int(node_executions_count), int(offloads_count)
|
||||
|
||||
@staticmethod
|
||||
def get_by_run(
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> Sequence[WorkflowNodeExecutionModel]:
|
||||
"""
|
||||
Fetch node executions for a run using workflow_run_id.
|
||||
"""
|
||||
stmt = select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.workflow_run_id == run_id)
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
def get_offloads_by_execution_ids(
|
||||
self,
|
||||
session: Session,
|
||||
node_execution_ids: Sequence[str],
|
||||
) -> Sequence[WorkflowNodeExecutionOffload]:
|
||||
if not node_execution_ids:
|
||||
return []
|
||||
|
||||
stmt = select(WorkflowNodeExecutionOffload).where(
|
||||
WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids)
|
||||
)
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
@ -40,14 +40,7 @@ from libs.infinite_scroll_pagination import InfiniteScrollPagination
|
||||
from libs.time_parser import get_time_threshold
|
||||
from libs.uuid_utils import uuidv7
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from models.workflow import (
|
||||
WorkflowAppLog,
|
||||
WorkflowPauseReason,
|
||||
WorkflowRun,
|
||||
)
|
||||
from models.workflow import (
|
||||
WorkflowPause as WorkflowPauseModel,
|
||||
)
|
||||
from models.workflow import WorkflowAppLog, WorkflowArchiveLog, WorkflowPause, WorkflowPauseReason, WorkflowRun
|
||||
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
|
||||
from repositories.entities.workflow_pause import WorkflowPauseEntity
|
||||
from repositories.types import (
|
||||
@ -369,6 +362,53 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
|
||||
return session.scalars(stmt).all()
|
||||
|
||||
def get_archived_run_ids(
|
||||
self,
|
||||
session: Session,
|
||||
run_ids: Sequence[str],
|
||||
) -> set[str]:
|
||||
if not run_ids:
|
||||
return set()
|
||||
|
||||
stmt = select(WorkflowArchiveLog.workflow_run_id).where(WorkflowArchiveLog.workflow_run_id.in_(run_ids))
|
||||
return set(session.scalars(stmt).all())
|
||||
|
||||
def get_archived_log_by_run_id(
|
||||
self,
|
||||
run_id: str,
|
||||
) -> WorkflowArchiveLog | None:
|
||||
with self._session_maker() as session:
|
||||
stmt = select(WorkflowArchiveLog).where(WorkflowArchiveLog.workflow_run_id == run_id).limit(1)
|
||||
return session.scalar(stmt)
|
||||
|
||||
def delete_archive_log_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> int:
|
||||
stmt = delete(WorkflowArchiveLog).where(WorkflowArchiveLog.workflow_run_id == run_id)
|
||||
result = session.execute(stmt)
|
||||
return cast(CursorResult, result).rowcount or 0
|
||||
|
||||
def get_pause_records_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> Sequence[WorkflowPause]:
|
||||
stmt = select(WorkflowPause).where(WorkflowPause.workflow_run_id == run_id)
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
def get_pause_reason_records_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
pause_ids: Sequence[str],
|
||||
) -> Sequence[WorkflowPauseReason]:
|
||||
if not pause_ids:
|
||||
return []
|
||||
|
||||
stmt = select(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids))
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
def delete_runs_with_related(
|
||||
self,
|
||||
runs: Sequence[WorkflowRun],
|
||||
@ -396,9 +436,8 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
app_logs_result = session.execute(delete(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids)))
|
||||
app_logs_deleted = cast(CursorResult, app_logs_result).rowcount or 0
|
||||
|
||||
pause_ids = session.scalars(
|
||||
select(WorkflowPauseModel.id).where(WorkflowPauseModel.workflow_run_id.in_(run_ids))
|
||||
).all()
|
||||
pause_stmt = select(WorkflowPause.id).where(WorkflowPause.workflow_run_id.in_(run_ids))
|
||||
pause_ids = session.scalars(pause_stmt).all()
|
||||
pause_reasons_deleted = 0
|
||||
pauses_deleted = 0
|
||||
|
||||
@ -407,7 +446,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
delete(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids))
|
||||
)
|
||||
pause_reasons_deleted = cast(CursorResult, pause_reasons_result).rowcount or 0
|
||||
pauses_result = session.execute(delete(WorkflowPauseModel).where(WorkflowPauseModel.id.in_(pause_ids)))
|
||||
pauses_result = session.execute(delete(WorkflowPause).where(WorkflowPause.id.in_(pause_ids)))
|
||||
pauses_deleted = cast(CursorResult, pauses_result).rowcount or 0
|
||||
|
||||
trigger_logs_deleted = delete_trigger_logs(session, run_ids) if delete_trigger_logs else 0
|
||||
@ -427,6 +466,124 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
"pause_reasons": pause_reasons_deleted,
|
||||
}
|
||||
|
||||
def get_app_logs_by_run_id(
|
||||
self,
|
||||
session: Session,
|
||||
run_id: str,
|
||||
) -> Sequence[WorkflowAppLog]:
|
||||
stmt = select(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id == run_id)
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
def create_archive_logs(
|
||||
self,
|
||||
session: Session,
|
||||
run: WorkflowRun,
|
||||
app_logs: Sequence[WorkflowAppLog],
|
||||
trigger_metadata: str | None,
|
||||
) -> int:
|
||||
if not app_logs:
|
||||
archive_log = WorkflowArchiveLog(
|
||||
log_id=None,
|
||||
log_created_at=None,
|
||||
log_created_from=None,
|
||||
tenant_id=run.tenant_id,
|
||||
app_id=run.app_id,
|
||||
workflow_id=run.workflow_id,
|
||||
workflow_run_id=run.id,
|
||||
created_by_role=run.created_by_role,
|
||||
created_by=run.created_by,
|
||||
run_version=run.version,
|
||||
run_status=run.status,
|
||||
run_triggered_from=run.triggered_from,
|
||||
run_error=run.error,
|
||||
run_elapsed_time=run.elapsed_time,
|
||||
run_total_tokens=run.total_tokens,
|
||||
run_total_steps=run.total_steps,
|
||||
run_created_at=run.created_at,
|
||||
run_finished_at=run.finished_at,
|
||||
run_exceptions_count=run.exceptions_count,
|
||||
trigger_metadata=trigger_metadata,
|
||||
)
|
||||
session.add(archive_log)
|
||||
return 1
|
||||
|
||||
archive_logs = [
|
||||
WorkflowArchiveLog(
|
||||
log_id=app_log.id,
|
||||
log_created_at=app_log.created_at,
|
||||
log_created_from=app_log.created_from,
|
||||
tenant_id=run.tenant_id,
|
||||
app_id=run.app_id,
|
||||
workflow_id=run.workflow_id,
|
||||
workflow_run_id=run.id,
|
||||
created_by_role=run.created_by_role,
|
||||
created_by=run.created_by,
|
||||
run_version=run.version,
|
||||
run_status=run.status,
|
||||
run_triggered_from=run.triggered_from,
|
||||
run_error=run.error,
|
||||
run_elapsed_time=run.elapsed_time,
|
||||
run_total_tokens=run.total_tokens,
|
||||
run_total_steps=run.total_steps,
|
||||
run_created_at=run.created_at,
|
||||
run_finished_at=run.finished_at,
|
||||
run_exceptions_count=run.exceptions_count,
|
||||
trigger_metadata=trigger_metadata,
|
||||
)
|
||||
for app_log in app_logs
|
||||
]
|
||||
session.add_all(archive_logs)
|
||||
return len(archive_logs)
|
||||
|
||||
def get_archived_runs_by_time_range(
|
||||
self,
|
||||
session: Session,
|
||||
tenant_ids: Sequence[str] | None,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
limit: int,
|
||||
) -> Sequence[WorkflowRun]:
|
||||
"""
|
||||
Retrieves WorkflowRun records by joining workflow_archive_logs.
|
||||
|
||||
Used to identify runs that are already archived and ready for deletion.
|
||||
"""
|
||||
stmt = (
|
||||
select(WorkflowRun)
|
||||
.join(WorkflowArchiveLog, WorkflowArchiveLog.workflow_run_id == WorkflowRun.id)
|
||||
.where(
|
||||
WorkflowArchiveLog.run_created_at >= start_date,
|
||||
WorkflowArchiveLog.run_created_at < end_date,
|
||||
)
|
||||
.order_by(WorkflowArchiveLog.run_created_at.asc(), WorkflowArchiveLog.workflow_run_id.asc())
|
||||
.limit(limit)
|
||||
)
|
||||
if tenant_ids:
|
||||
stmt = stmt.where(WorkflowArchiveLog.tenant_id.in_(tenant_ids))
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
def get_archived_logs_by_time_range(
|
||||
self,
|
||||
session: Session,
|
||||
tenant_ids: Sequence[str] | None,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
limit: int,
|
||||
) -> Sequence[WorkflowArchiveLog]:
|
||||
# Returns WorkflowArchiveLog rows directly; use this when workflow_runs may be deleted.
|
||||
stmt = (
|
||||
select(WorkflowArchiveLog)
|
||||
.where(
|
||||
WorkflowArchiveLog.run_created_at >= start_date,
|
||||
WorkflowArchiveLog.run_created_at < end_date,
|
||||
)
|
||||
.order_by(WorkflowArchiveLog.run_created_at.asc(), WorkflowArchiveLog.workflow_run_id.asc())
|
||||
.limit(limit)
|
||||
)
|
||||
if tenant_ids:
|
||||
stmt = stmt.where(WorkflowArchiveLog.tenant_id.in_(tenant_ids))
|
||||
return list(session.scalars(stmt))
|
||||
|
||||
def count_runs_with_related(
|
||||
self,
|
||||
runs: Sequence[WorkflowRun],
|
||||
@ -459,7 +616,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
)
|
||||
|
||||
pause_ids = session.scalars(
|
||||
select(WorkflowPauseModel.id).where(WorkflowPauseModel.workflow_run_id.in_(run_ids))
|
||||
select(WorkflowPause.id).where(WorkflowPause.workflow_run_id.in_(run_ids))
|
||||
).all()
|
||||
pauses_count = len(pause_ids)
|
||||
pause_reasons_count = 0
|
||||
@ -511,9 +668,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
ValueError: If workflow_run_id is invalid or workflow run doesn't exist
|
||||
RuntimeError: If workflow is already paused or in invalid state
|
||||
"""
|
||||
previous_pause_model_query = select(WorkflowPauseModel).where(
|
||||
WorkflowPauseModel.workflow_run_id == workflow_run_id
|
||||
)
|
||||
previous_pause_model_query = select(WorkflowPause).where(WorkflowPause.workflow_run_id == workflow_run_id)
|
||||
with self._session_maker() as session, session.begin():
|
||||
# Get the workflow run
|
||||
workflow_run = session.get(WorkflowRun, workflow_run_id)
|
||||
@ -538,7 +693,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
# Upload the state file
|
||||
|
||||
# Create the pause record
|
||||
pause_model = WorkflowPauseModel()
|
||||
pause_model = WorkflowPause()
|
||||
pause_model.id = str(uuidv7())
|
||||
pause_model.workflow_id = workflow_run.workflow_id
|
||||
pause_model.workflow_run_id = workflow_run.id
|
||||
@ -710,13 +865,13 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
"""
|
||||
with self._session_maker() as session, session.begin():
|
||||
# Get the pause model by ID
|
||||
pause_model = session.get(WorkflowPauseModel, pause_entity.id)
|
||||
pause_model = session.get(WorkflowPause, pause_entity.id)
|
||||
if pause_model is None:
|
||||
raise _WorkflowRunError(f"WorkflowPause not found: {pause_entity.id}")
|
||||
self._delete_pause_model(session, pause_model)
|
||||
|
||||
@staticmethod
|
||||
def _delete_pause_model(session: Session, pause_model: WorkflowPauseModel):
|
||||
def _delete_pause_model(session: Session, pause_model: WorkflowPause):
|
||||
storage.delete(pause_model.state_object_key)
|
||||
|
||||
# Delete the pause record
|
||||
@ -751,15 +906,15 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
_limit: int = limit or 1000
|
||||
pruned_record_ids: list[str] = []
|
||||
cond = or_(
|
||||
WorkflowPauseModel.created_at < expiration,
|
||||
WorkflowPause.created_at < expiration,
|
||||
and_(
|
||||
WorkflowPauseModel.resumed_at.is_not(null()),
|
||||
WorkflowPauseModel.resumed_at < resumption_expiration,
|
||||
WorkflowPause.resumed_at.is_not(null()),
|
||||
WorkflowPause.resumed_at < resumption_expiration,
|
||||
),
|
||||
)
|
||||
# First, collect pause records to delete with their state files
|
||||
# Expired pauses (created before expiration time)
|
||||
stmt = select(WorkflowPauseModel).where(cond).limit(_limit)
|
||||
stmt = select(WorkflowPause).where(cond).limit(_limit)
|
||||
|
||||
with self._session_maker(expire_on_commit=False) as session:
|
||||
# Old resumed pauses (resumed more than resumption_duration ago)
|
||||
@ -770,7 +925,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
|
||||
# Delete state files from storage
|
||||
for pause in pauses_to_delete:
|
||||
with self._session_maker(expire_on_commit=False) as session, session.begin():
|
||||
# todo: this issues a separate query for each WorkflowPauseModel record.
|
||||
# todo: this issues a separate query for each WorkflowPause record.
|
||||
# consider batching this lookup.
|
||||
try:
|
||||
storage.delete(pause.state_object_key)
|
||||
@ -1022,7 +1177,7 @@ class _PrivateWorkflowPauseEntity(WorkflowPauseEntity):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
pause_model: WorkflowPauseModel,
|
||||
pause_model: WorkflowPause,
|
||||
reason_models: Sequence[WorkflowPauseReason],
|
||||
human_input_form: Sequence = (),
|
||||
) -> None:
|
||||
|
||||
@ -46,6 +46,11 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository):
|
||||
|
||||
return self.session.scalar(query)
|
||||
|
||||
def list_by_run_id(self, run_id: str) -> Sequence[WorkflowTriggerLog]:
|
||||
"""List trigger logs for a workflow run."""
|
||||
query = select(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id == run_id)
|
||||
return list(self.session.scalars(query).all())
|
||||
|
||||
def get_failed_for_retry(
|
||||
self, tenant_id: str, max_retry_count: int = 3, limit: int = 100
|
||||
) -> Sequence[WorkflowTriggerLog]:
|
||||
|
||||
Reference in New Issue
Block a user