mirror of
https://github.com/langgenius/dify.git
synced 2026-07-01 11:26:49 +08:00
Compare commits
14 Commits
deploy/dev
...
deploy/arc
| Author | SHA1 | Date | |
|---|---|---|---|
| bedaf0cbcd | |||
| 46163c68bd | |||
| 54a8ff2c1d | |||
| 9521c7fe7d | |||
| 995ba6b00e | |||
| 49a92f096f | |||
| 38602640f4 | |||
| 510408ebbc | |||
| fa1ac75922 | |||
| cb35c6fa98 | |||
| 34f62e7df6 | |||
| 07b5dcbb19 | |||
| 23917c7b3e | |||
| 8a6ce28855 |
@ -1,12 +1,52 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
import click
|
||||
from sqlalchemy import select
|
||||
|
||||
from configs import dify_config
|
||||
from core.db.session_factory import session_factory
|
||||
from models import TenantAccountJoin, TenantAccountRole
|
||||
from services.enterprise.rbac_service import ListOption, RBACService
|
||||
|
||||
_LEGACY_ROLE_TO_BUILTIN_TAG = {
|
||||
TenantAccountRole.OWNER.value: "owner",
|
||||
TenantAccountRole.ADMIN.value: "admin",
|
||||
TenantAccountRole.EDITOR.value: "editor",
|
||||
TenantAccountRole.NORMAL.value: "normal",
|
||||
TenantAccountRole.DATASET_OPERATOR.value: "dataset_operator",
|
||||
}
|
||||
|
||||
|
||||
def _resolve_builtin_role_ids(tenant_id: str, operator_account_id: str) -> dict[str, str]:
|
||||
"""Resolve every legacy workspace role to the current tenant's builtin RBAC role id.
|
||||
|
||||
The migration replays the old `TenantAccountJoin.role` values onto the
|
||||
RBAC member-role binding API. Builtin RBAC roles are tenant-scoped and
|
||||
identified by runtime ids, so the command must look them up per tenant.
|
||||
"""
|
||||
roles = RBACService.Roles.list(
|
||||
tenant_id=tenant_id,
|
||||
account_id=operator_account_id,
|
||||
options=ListOption(page_number=1, results_per_page=100),
|
||||
).data
|
||||
role_id_by_tag = {
|
||||
role.role_tag: role.id
|
||||
for role in roles
|
||||
if role.is_builtin and role.category == "global_system_default" and role.role_tag
|
||||
}
|
||||
resolved: dict[str, str] = {}
|
||||
for legacy_role, expected_builtin_tag in _LEGACY_ROLE_TO_BUILTIN_TAG.items():
|
||||
role_id = role_id_by_tag.get(expected_builtin_tag)
|
||||
if expected_builtin_tag == "dataset_operator" and not dify_config.DATASET_OPERATOR_ENABLED:
|
||||
continue
|
||||
if not role_id:
|
||||
raise ValueError(f"Builtin RBAC role not found for tenant={tenant_id}, legacy_role={legacy_role}")
|
||||
resolved[legacy_role] = role_id
|
||||
return resolved
|
||||
|
||||
|
||||
def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_role: str) -> str:
|
||||
"""Resolve a legacy workspace role to the current tenant's builtin RBAC role id.
|
||||
@ -15,26 +55,86 @@ def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_ro
|
||||
RBAC member-role binding API. Builtin RBAC roles are tenant-scoped and
|
||||
identified by runtime ids, so the command must look them up per tenant.
|
||||
"""
|
||||
expected_builtin_tag = {
|
||||
TenantAccountRole.OWNER.value: "owner",
|
||||
TenantAccountRole.ADMIN.value: "admin",
|
||||
TenantAccountRole.EDITOR.value: "editor",
|
||||
TenantAccountRole.NORMAL.value: "normal",
|
||||
TenantAccountRole.DATASET_OPERATOR.value: "dataset_operator",
|
||||
}.get(legacy_role)
|
||||
if not expected_builtin_tag:
|
||||
if legacy_role not in _LEGACY_ROLE_TO_BUILTIN_TAG:
|
||||
raise ValueError(f"Unsupported legacy workspace role: {legacy_role}")
|
||||
|
||||
roles = RBACService.Roles.list(
|
||||
return _resolve_builtin_role_ids(tenant_id, operator_account_id)[legacy_role]
|
||||
|
||||
|
||||
def _iter_tenant_member_batches(
|
||||
tenant_id: str | None,
|
||||
*,
|
||||
db_batch_size: int,
|
||||
api_batch_size: int,
|
||||
) -> Iterator[tuple[str, str, list[tuple[str, str]]]]:
|
||||
"""Yield legacy member roles in tenant-scoped API-sized batches.
|
||||
|
||||
Rows are projected to primitive values and streamed from the database, so
|
||||
the command never materializes every TenantAccountJoin ORM object. The
|
||||
iterator only keeps one tenant's API-sized batches in memory while it
|
||||
finds that tenant's owner account.
|
||||
"""
|
||||
with session_factory.create_session() as session:
|
||||
stmt = (
|
||||
select(TenantAccountJoin.tenant_id, TenantAccountJoin.account_id, TenantAccountJoin.role)
|
||||
.order_by(TenantAccountJoin.tenant_id.asc(), TenantAccountJoin.id.asc())
|
||||
.execution_options(yield_per=db_batch_size)
|
||||
)
|
||||
if tenant_id:
|
||||
stmt = stmt.where(TenantAccountJoin.tenant_id == tenant_id)
|
||||
|
||||
current_tenant_id: str | None = None
|
||||
owner_account_id: str | None = None
|
||||
batches: list[list[tuple[str, str]]] = []
|
||||
batch: list[tuple[str, str]] = []
|
||||
|
||||
def flush_current_tenant() -> Iterator[tuple[str, str, list[tuple[str, str]]]]:
|
||||
if current_tenant_id is None:
|
||||
return
|
||||
if batch:
|
||||
batches.append(batch.copy())
|
||||
if not owner_account_id:
|
||||
raise ValueError(f"Workspace owner not found for tenant={current_tenant_id}")
|
||||
for item in batches:
|
||||
yield current_tenant_id, owner_account_id, item
|
||||
|
||||
for row in session.execute(stmt):
|
||||
workspace_id = str(row.tenant_id)
|
||||
if current_tenant_id is not None and workspace_id != current_tenant_id:
|
||||
yield from flush_current_tenant()
|
||||
owner_account_id = None
|
||||
batches = []
|
||||
batch = []
|
||||
current_tenant_id = workspace_id
|
||||
account_id = str(row.account_id)
|
||||
role = str(row.role)
|
||||
if role == TenantAccountRole.OWNER.value:
|
||||
owner_account_id = account_id
|
||||
batch.append((account_id, role))
|
||||
if len(batch) >= api_batch_size:
|
||||
batches.append(batch)
|
||||
batch = []
|
||||
|
||||
yield from flush_current_tenant()
|
||||
|
||||
|
||||
def _member_already_has_role(current_roles_by_account_id: dict[str, set[str]], account_id: str, role_id: str) -> bool:
|
||||
return current_roles_by_account_id.get(account_id) == {role_id}
|
||||
|
||||
|
||||
def _replace_member_role(
|
||||
tenant_id: str,
|
||||
operator_account_id: str,
|
||||
member_account_id: str,
|
||||
role_id: str,
|
||||
) -> str:
|
||||
RBACService.MemberRoles.replace(
|
||||
tenant_id=tenant_id,
|
||||
account_id=operator_account_id,
|
||||
options=ListOption(page_number=1, results_per_page=100),
|
||||
).data
|
||||
for role in roles:
|
||||
if role.is_builtin and role.category == "global_system_default" and role.role_tag == expected_builtin_tag:
|
||||
return str(role.id)
|
||||
|
||||
raise ValueError(f"Builtin RBAC role not found for tenant={tenant_id}, legacy_role={legacy_role}")
|
||||
member_account_id=member_account_id,
|
||||
role_ids=[role_id],
|
||||
)
|
||||
return member_account_id
|
||||
|
||||
|
||||
@click.command(
|
||||
@ -42,7 +142,16 @@ def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_ro
|
||||
)
|
||||
@click.option("--tenant-id", help="Only migrate a single workspace.")
|
||||
@click.option("--dry-run", is_flag=True, default=False, help="Preview the migration without writing RBAC bindings.")
|
||||
def migrate_member_roles_to_rbac(tenant_id: str | None, dry_run: bool) -> None:
|
||||
@click.option("--db-batch-size", default=5000, show_default=True, help="Rows fetched per database batch.")
|
||||
@click.option("--api-batch-size", default=200, show_default=True, help="Members checked per RBAC batch_get call.")
|
||||
@click.option("--workers", default=1, show_default=True, help="Concurrent member role replace calls per tenant batch.")
|
||||
def migrate_member_roles_to_rbac(
|
||||
tenant_id: str | None,
|
||||
dry_run: bool,
|
||||
db_batch_size: int,
|
||||
api_batch_size: int,
|
||||
workers: int,
|
||||
) -> None:
|
||||
"""Backfill RBAC member-role bindings from legacy `TenantAccountJoin.role` data.
|
||||
|
||||
This is an offline migration command for workspaces that already have
|
||||
@ -50,63 +159,102 @@ def migrate_member_roles_to_rbac(tenant_id: str | None, dry_run: bool) -> None:
|
||||
member-role binding store.
|
||||
"""
|
||||
click.echo(click.style("Starting RBAC member-role migration.", fg="green"))
|
||||
if workers < 1:
|
||||
raise click.BadParameter("workers must be >= 1", param_hint="--workers")
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
stmt = select(TenantAccountJoin).order_by(TenantAccountJoin.tenant_id.asc(), TenantAccountJoin.id.asc())
|
||||
if tenant_id:
|
||||
stmt = stmt.where(TenantAccountJoin.tenant_id == tenant_id)
|
||||
tenant_count = 0
|
||||
scanned_count = 0
|
||||
skipped_count = 0
|
||||
migrated_count = 0
|
||||
current_tenant_id: str | None = None
|
||||
role_ids_by_legacy_role: dict[str, str] = {}
|
||||
|
||||
joins = list(session.scalars(stmt).all())
|
||||
for workspace_id, owner_account_id, batch in _iter_tenant_member_batches(
|
||||
tenant_id,
|
||||
db_batch_size=db_batch_size,
|
||||
api_batch_size=api_batch_size,
|
||||
):
|
||||
scanned_count += len(batch)
|
||||
if workspace_id != current_tenant_id:
|
||||
tenant_count += 1
|
||||
current_tenant_id = workspace_id
|
||||
role_ids_by_legacy_role = _resolve_builtin_role_ids(workspace_id, owner_account_id)
|
||||
click.echo(f"tenant={workspace_id}")
|
||||
|
||||
if not joins:
|
||||
current_roles_by_account_id: dict[str, set[str]] = {}
|
||||
if not dry_run:
|
||||
current_roles = RBACService.MemberRoles.batch_get(
|
||||
tenant_id=workspace_id,
|
||||
account_id=owner_account_id,
|
||||
member_account_ids=[account_id for account_id, _ in batch],
|
||||
)
|
||||
current_roles_by_account_id = {
|
||||
item.account_id: {str(role.id) for role in item.roles} for item in current_roles
|
||||
}
|
||||
|
||||
replace_jobs: list[tuple[str, str]] = []
|
||||
for member_account_id, legacy_role in batch:
|
||||
resolved_role_id = role_ids_by_legacy_role.get(legacy_role)
|
||||
if not resolved_role_id:
|
||||
raise ValueError(f"Unsupported legacy workspace role: {legacy_role}")
|
||||
|
||||
if dry_run:
|
||||
click.echo(
|
||||
f"tenant={workspace_id} member={member_account_id} "
|
||||
f"legacy_role={legacy_role} -> rbac_role_id={resolved_role_id}"
|
||||
)
|
||||
continue
|
||||
|
||||
if _member_already_has_role(current_roles_by_account_id, member_account_id, resolved_role_id):
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
replace_jobs.append((member_account_id, resolved_role_id))
|
||||
|
||||
if replace_jobs:
|
||||
if workers == 1:
|
||||
for member_account_id, resolved_role_id in replace_jobs:
|
||||
_replace_member_role(workspace_id, owner_account_id, member_account_id, resolved_role_id)
|
||||
migrated_count += 1
|
||||
else:
|
||||
with ThreadPoolExecutor(max_workers=workers) as executor:
|
||||
futures = [
|
||||
executor.submit(
|
||||
_replace_member_role,
|
||||
workspace_id,
|
||||
owner_account_id,
|
||||
member_account_id,
|
||||
resolved_role_id,
|
||||
)
|
||||
for member_account_id, resolved_role_id in replace_jobs
|
||||
]
|
||||
for future in as_completed(futures):
|
||||
future.result()
|
||||
migrated_count += 1
|
||||
|
||||
if scanned_count % 10000 == 0:
|
||||
click.echo(
|
||||
f"progress scanned={scanned_count} migrated={migrated_count} skipped={skipped_count}",
|
||||
err=True,
|
||||
)
|
||||
|
||||
if scanned_count == 0:
|
||||
click.echo(click.style("No workspace members found for migration.", fg="yellow"))
|
||||
return
|
||||
|
||||
owner_account_by_tenant: dict[str, str] = {}
|
||||
resolved_role_ids: dict[tuple[str, str], str] = {}
|
||||
migrated_count = 0
|
||||
|
||||
for join in joins:
|
||||
workspace_id = str(join.tenant_id)
|
||||
member_account_id = str(join.account_id)
|
||||
legacy_role = str(join.role)
|
||||
|
||||
if workspace_id not in owner_account_by_tenant:
|
||||
owner_join = next(
|
||||
(
|
||||
item
|
||||
for item in joins
|
||||
if str(item.tenant_id) == workspace_id and str(item.role) == TenantAccountRole.OWNER.value
|
||||
),
|
||||
None,
|
||||
)
|
||||
if not owner_join:
|
||||
raise ValueError(f"Workspace owner not found for tenant={workspace_id}")
|
||||
owner_account_by_tenant[workspace_id] = str(owner_join.account_id)
|
||||
|
||||
operator_account_id = owner_account_by_tenant[workspace_id]
|
||||
cache_key = (workspace_id, legacy_role)
|
||||
if cache_key not in resolved_role_ids:
|
||||
resolved_role_ids[cache_key] = _resolve_builtin_role_id(workspace_id, operator_account_id, legacy_role)
|
||||
|
||||
resolved_role_id = resolved_role_ids[cache_key]
|
||||
click.echo(
|
||||
f"tenant={workspace_id} member={member_account_id} "
|
||||
f"legacy_role={legacy_role} -> rbac_role_id={resolved_role_id}"
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
continue
|
||||
|
||||
RBACService.MemberRoles.replace(
|
||||
tenant_id=workspace_id,
|
||||
account_id=operator_account_id,
|
||||
member_account_id=member_account_id,
|
||||
role_ids=[resolved_role_id],
|
||||
)
|
||||
migrated_count += 1
|
||||
|
||||
if dry_run:
|
||||
click.echo(click.style("Dry run completed. No RBAC bindings were written.", fg="yellow"))
|
||||
click.echo(
|
||||
click.style(
|
||||
f"Dry run completed. Scanned {scanned_count} members across {tenant_count} tenants. "
|
||||
"No RBAC bindings were written.",
|
||||
fg="yellow",
|
||||
)
|
||||
)
|
||||
else:
|
||||
click.echo(click.style(f"RBAC member-role migration completed. Migrated {migrated_count} members.", fg="green"))
|
||||
click.echo(
|
||||
click.style(
|
||||
f"RBAC member-role migration completed. Scanned {scanned_count} members across {tenant_count} tenants, "
|
||||
f"migrated {migrated_count}, skipped {skipped_count} already up-to-date.",
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
import datetime
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from typing import TypedDict
|
||||
|
||||
import click
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from extensions.ext_database import db
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
@ -12,6 +14,7 @@ from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpi
|
||||
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
|
||||
from services.retention.conversation.messages_clean_service import MessagesCleanService
|
||||
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
|
||||
from services.retention.workflow_run.db_retry import run_with_db_retry
|
||||
from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition
|
||||
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
|
||||
|
||||
@ -35,6 +38,12 @@ class WorkflowRunArchiveTenantPlan(TypedDict):
|
||||
unpaid_tenant_ids: list[str]
|
||||
|
||||
|
||||
class WorkflowRunArchivePrefixStats(TypedDict):
|
||||
tenant_ids: list[str]
|
||||
workflow_runs: int
|
||||
workflow_node_executions: int
|
||||
|
||||
|
||||
def _normalize_utc_datetime(value: datetime.datetime) -> datetime.datetime:
|
||||
if value.tzinfo is None:
|
||||
return value.replace(tzinfo=datetime.UTC)
|
||||
@ -57,6 +66,7 @@ def _parse_tenant_prefixes(prefixes: str | None) -> list[str]:
|
||||
|
||||
|
||||
def _get_archive_candidate_tenant_ids_by_prefix(
|
||||
session: Session,
|
||||
prefix: str,
|
||||
*,
|
||||
start_from: datetime.datetime | None,
|
||||
@ -75,7 +85,7 @@ def _get_archive_candidate_tenant_ids_by_prefix(
|
||||
if start_from is not None:
|
||||
conditions.append(WorkflowRun.created_at >= start_from)
|
||||
|
||||
tenant_ids = db.session.scalars(
|
||||
tenant_ids = session.scalars(
|
||||
sa.select(WorkflowRun.tenant_id).where(*conditions).distinct().order_by(WorkflowRun.tenant_id)
|
||||
).all()
|
||||
return list(tenant_ids)
|
||||
@ -102,8 +112,80 @@ def _filter_paid_workflow_archive_tenant_ids(tenant_ids: list[str]) -> tuple[lis
|
||||
return paid_tenant_ids, unpaid_tenant_ids
|
||||
|
||||
|
||||
def _run_archive_command_db_retry[T](operation_name: str, operation: Callable[[], T]) -> T:
|
||||
return run_with_db_retry(operation_name, operation, logger=logger)
|
||||
|
||||
|
||||
def _get_archive_candidate_tenant_ids_with_retry(
|
||||
session_maker: sessionmaker[Session],
|
||||
prefix: str,
|
||||
*,
|
||||
start_from: datetime.datetime | None,
|
||||
end_before: datetime.datetime,
|
||||
) -> list[str]:
|
||||
def fetch_tenant_ids() -> list[str]:
|
||||
with session_maker() as session:
|
||||
return _get_archive_candidate_tenant_ids_by_prefix(
|
||||
session,
|
||||
prefix,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
)
|
||||
|
||||
return _run_archive_command_db_retry(f"workflow archive tenant resolve for prefix {prefix}", fetch_tenant_ids)
|
||||
|
||||
|
||||
def _get_archive_plan_prefix_stats(
|
||||
session_maker: sessionmaker[Session],
|
||||
prefix: str,
|
||||
*,
|
||||
start_from: datetime.datetime | None,
|
||||
end_before: datetime.datetime,
|
||||
) -> WorkflowRunArchivePrefixStats:
|
||||
from graphon.enums import WorkflowExecutionStatus
|
||||
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
|
||||
from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver
|
||||
|
||||
def fetch_prefix_stats() -> WorkflowRunArchivePrefixStats:
|
||||
with session_maker() as session:
|
||||
tenant_ids = _get_archive_candidate_tenant_ids_by_prefix(
|
||||
session,
|
||||
prefix,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
)
|
||||
run_conditions = [
|
||||
WorkflowRun.created_at < end_before,
|
||||
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
|
||||
WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
|
||||
tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
|
||||
]
|
||||
if start_from is not None:
|
||||
run_conditions.append(WorkflowRun.created_at >= start_from)
|
||||
workflow_runs = (
|
||||
session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0
|
||||
)
|
||||
candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery()
|
||||
workflow_node_executions = (
|
||||
session.scalar(
|
||||
sa.select(sa.func.count())
|
||||
.select_from(WorkflowNodeExecutionModel)
|
||||
.join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id)
|
||||
)
|
||||
or 0
|
||||
)
|
||||
return WorkflowRunArchivePrefixStats(
|
||||
tenant_ids=tenant_ids,
|
||||
workflow_runs=workflow_runs,
|
||||
workflow_node_executions=workflow_node_executions,
|
||||
)
|
||||
|
||||
return _run_archive_command_db_retry(f"workflow archive plan for prefix {prefix}", fetch_prefix_stats)
|
||||
|
||||
|
||||
def _resolve_archive_tenant_ids_from_plan(
|
||||
*,
|
||||
session_maker: sessionmaker[Session],
|
||||
tenant_ids: str | None,
|
||||
tenant_prefixes: list[str],
|
||||
start_from: datetime.datetime | None,
|
||||
@ -122,7 +204,8 @@ def _resolve_archive_tenant_ids_from_plan(
|
||||
requested_tenant_ids = []
|
||||
for prefix in tenant_prefixes:
|
||||
requested_tenant_ids.extend(
|
||||
_get_archive_candidate_tenant_ids_by_prefix(
|
||||
_get_archive_candidate_tenant_ids_with_retry(
|
||||
session_maker,
|
||||
prefix,
|
||||
start_from=start_from,
|
||||
end_before=end_before,
|
||||
@ -143,6 +226,24 @@ def _resolve_archive_tenant_ids_from_plan(
|
||||
)
|
||||
|
||||
|
||||
def _safe_remove_scoped_session(context: str) -> None:
|
||||
try:
|
||||
db.session.remove()
|
||||
except Exception:
|
||||
logger.warning("Ignoring DB scoped-session cleanup error after %s", context, exc_info=True)
|
||||
registry = getattr(db.session, "registry", None)
|
||||
clear_registry = getattr(registry, "clear", None)
|
||||
if callable(clear_registry):
|
||||
try:
|
||||
clear_registry()
|
||||
except Exception:
|
||||
logger.warning("Ignoring DB scoped-session registry cleanup error after %s", context, exc_info=True)
|
||||
try:
|
||||
db.engine.dispose()
|
||||
except Exception:
|
||||
logger.warning("Ignoring DB engine dispose error after %s", context, exc_info=True)
|
||||
|
||||
|
||||
def _resolve_archive_time_range(
|
||||
*,
|
||||
before_days: int,
|
||||
@ -349,10 +450,6 @@ def archive_workflow_runs_plan(
|
||||
supported workflow types, and the requested created_at window. V2 bundle archive
|
||||
does not maintain per-run archive logs, so this plan reports source-table volume.
|
||||
"""
|
||||
from graphon.enums import WorkflowExecutionStatus
|
||||
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
|
||||
from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver
|
||||
|
||||
before_days, start_from, end_before = _resolve_archive_time_range(
|
||||
before_days=before_days,
|
||||
from_days_ago=from_days_ago,
|
||||
@ -364,37 +461,25 @@ def archive_workflow_runs_plan(
|
||||
if include_archived:
|
||||
click.echo(click.style("--include-archived is a no-op for V2 bundle archive plans.", fg="yellow"))
|
||||
|
||||
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
|
||||
rows: list[WorkflowRunArchivePlanRow] = []
|
||||
for prefix in _HEX_PREFIXES:
|
||||
tenant_ids = _get_archive_candidate_tenant_ids_by_prefix(
|
||||
prefix,
|
||||
start_from=start_from,
|
||||
end_before=plan_end_before,
|
||||
)
|
||||
try:
|
||||
prefix_stats = _get_archive_plan_prefix_stats(
|
||||
session_maker,
|
||||
prefix,
|
||||
start_from=start_from,
|
||||
end_before=plan_end_before,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Failed to build workflow archive plan for prefix %s", prefix)
|
||||
raise click.ClickException(f"Failed to build workflow archive plan for prefix {prefix}.") from exc
|
||||
tenant_ids = prefix_stats["tenant_ids"]
|
||||
workflow_runs = prefix_stats["workflow_runs"]
|
||||
workflow_node_executions = prefix_stats["workflow_node_executions"]
|
||||
total_tenants = len(tenant_ids)
|
||||
paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(tenant_ids)
|
||||
|
||||
run_conditions = [
|
||||
WorkflowRun.created_at < plan_end_before,
|
||||
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
|
||||
WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
|
||||
tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
|
||||
]
|
||||
if start_from is not None:
|
||||
run_conditions.append(WorkflowRun.created_at >= start_from)
|
||||
workflow_runs = (
|
||||
db.session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0
|
||||
)
|
||||
candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery()
|
||||
workflow_node_executions = (
|
||||
db.session.scalar(
|
||||
sa.select(sa.func.count())
|
||||
.select_from(WorkflowNodeExecutionModel)
|
||||
.join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id)
|
||||
)
|
||||
or 0
|
||||
)
|
||||
|
||||
rows.append(
|
||||
WorkflowRunArchivePlanRow(
|
||||
tenant_prefix=prefix,
|
||||
@ -574,17 +659,18 @@ def archive_workflow_runs(
|
||||
)
|
||||
)
|
||||
|
||||
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
|
||||
try:
|
||||
tenant_plan = _resolve_archive_tenant_ids_from_plan(
|
||||
session_maker=session_maker,
|
||||
tenant_ids=tenant_ids,
|
||||
tenant_prefixes=parsed_tenant_prefixes,
|
||||
start_from=start_from,
|
||||
end_before=plan_end_before,
|
||||
)
|
||||
except Exception:
|
||||
except Exception as exc:
|
||||
logger.exception("Failed to resolve workflow archive tenant plan")
|
||||
click.echo(click.style("Failed to resolve workflow archive tenant plan.", fg="red"))
|
||||
return
|
||||
raise click.ClickException("Failed to resolve workflow archive tenant plan.") from exc
|
||||
|
||||
planned_tenant_ids = tenant_plan["archive_tenant_ids"]
|
||||
planned_paid_tenant_ids = tenant_plan["paid_tenant_ids"] if planned_tenant_ids is not None else None
|
||||
@ -616,7 +702,10 @@ def archive_workflow_runs(
|
||||
dry_run=dry_run,
|
||||
delete_after_archive=delete_after_archive,
|
||||
)
|
||||
summary = archiver.run()
|
||||
try:
|
||||
summary = archiver.run()
|
||||
finally:
|
||||
_safe_remove_scoped_session("archive workflow run command")
|
||||
click.echo(
|
||||
click.style(
|
||||
f"Summary: processed={summary.total_runs_processed}, archived={summary.runs_archived}, "
|
||||
|
||||
@ -1,10 +1,13 @@
|
||||
import logging
|
||||
import re
|
||||
from collections.abc import Callable
|
||||
from contextlib import AbstractContextManager, ExitStack
|
||||
from types import TracebackType
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from flask import has_request_context, request
|
||||
|
||||
from core.mcp.client.sse_client import sse_client
|
||||
from core.mcp.client.streamable_client import streamablehttp_client
|
||||
from core.mcp.error import MCPConnectionError
|
||||
@ -23,10 +26,22 @@ class MCPClient:
|
||||
sse_read_timeout: float | None = None,
|
||||
):
|
||||
self.server_url = server_url
|
||||
self.headers = headers or {}
|
||||
self.headers = headers.copy() if headers else {}
|
||||
self.timeout = timeout
|
||||
self.sse_read_timeout = sse_read_timeout
|
||||
|
||||
# Substitute placeholders with incoming request headers if in a request context
|
||||
if has_request_context() and self.headers:
|
||||
pattern = re.compile(r"\{\{\s*request\.headers?\.(.+?)\s*\}\}", re.IGNORECASE)
|
||||
for key, value in list(self.headers.items()):
|
||||
if isinstance(value, str):
|
||||
|
||||
def replace_func(match):
|
||||
header_name = match.group(1)
|
||||
return request.headers.get(header_name, "")
|
||||
|
||||
self.headers[key] = pattern.sub(replace_func, value)
|
||||
|
||||
# Initialize session and client objects
|
||||
self._session: ClientSession | None = None
|
||||
self._exit_stack = ExitStack()
|
||||
|
||||
@ -45,34 +45,52 @@ def upgrade():
|
||||
# PostgreSQL 18's `uuidv7` function. This capability is rarely needed in practice, as IDs can be
|
||||
# generated and controlled within the application layer.
|
||||
conn = op.get_bind()
|
||||
|
||||
if _is_pg(conn):
|
||||
# PostgreSQL: Create uuidv7 functions
|
||||
op.execute(sa.text(r"""
|
||||
/* Main function to generate a uuidv7 value with millisecond precision */
|
||||
CREATE FUNCTION uuidv7() RETURNS uuid
|
||||
AS
|
||||
$$
|
||||
-- Replace the first 48 bits of a uuidv4 with the current
|
||||
-- number of milliseconds since 1970-01-01 UTC
|
||||
-- and set the "ver" field to 7 by setting additional bits
|
||||
SELECT encode(
|
||||
set_bit(
|
||||
set_bit(
|
||||
overlay(uuid_send(gen_random_uuid()) placing
|
||||
substring(int8send((extract(epoch from clock_timestamp()) * 1000)::bigint) from
|
||||
3)
|
||||
from 1 for 6),
|
||||
52, 1),
|
||||
53, 1), 'hex')::uuid;
|
||||
$$ LANGUAGE SQL VOLATILE PARALLEL SAFE;
|
||||
|
||||
COMMENT ON FUNCTION uuidv7 IS
|
||||
'Generate a uuid-v7 value with a 48-bit timestamp (millisecond precision) and 74 bits of randomness';
|
||||
if _is_pg(conn):
|
||||
# PostgreSQL: Create uuidv7 functions.
|
||||
# PostgreSQL 18 ships a native pg_catalog.uuidv7(), so only create our own
|
||||
# implementation when the server does not already provide one. Otherwise the
|
||||
# CREATE FUNCTION below and the unqualified COMMENT statement collide with the
|
||||
# built-in and the migration fails.
|
||||
#
|
||||
# The existence check is done server-side via a DO block rather than
|
||||
# conn.execute().scalar() because the latter returns None in offline
|
||||
# migration mode (no real database connection), causing an AttributeError.
|
||||
op.execute(sa.text(r"""
|
||||
DO $do$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_proc p
|
||||
JOIN pg_namespace n ON p.pronamespace = n.oid
|
||||
WHERE p.proname = 'uuidv7' AND n.nspname = 'pg_catalog'
|
||||
) THEN
|
||||
/* Main function to generate a uuidv7 value with millisecond precision */
|
||||
CREATE FUNCTION public.uuidv7() RETURNS uuid
|
||||
AS
|
||||
$func$
|
||||
-- Replace the first 48 bits of a uuidv4 with the current
|
||||
-- number of milliseconds since 1970-01-01 UTC
|
||||
-- and set the "ver" field to 7 by setting additional bits
|
||||
SELECT encode(
|
||||
set_bit(
|
||||
set_bit(
|
||||
overlay(uuid_send(gen_random_uuid()) placing
|
||||
substring(int8send((extract(epoch from clock_timestamp()) * 1000)::bigint) from
|
||||
3)
|
||||
from 1 for 6),
|
||||
52, 1),
|
||||
53, 1), 'hex')::uuid;
|
||||
$func$ LANGUAGE SQL VOLATILE PARALLEL SAFE;
|
||||
|
||||
COMMENT ON FUNCTION public.uuidv7 IS
|
||||
'Generate a uuid-v7 value with a 48-bit timestamp (millisecond precision) and 74 bits of randomness';
|
||||
END IF;
|
||||
END
|
||||
$do$;
|
||||
"""))
|
||||
|
||||
op.execute(sa.text(r"""
|
||||
CREATE FUNCTION uuidv7_boundary(timestamptz) RETURNS uuid
|
||||
CREATE FUNCTION public.uuidv7_boundary(timestamptz) RETURNS uuid
|
||||
AS
|
||||
$$
|
||||
/* uuid fields: version=0b0111, variant=0b10 */
|
||||
@ -83,7 +101,7 @@ SELECT encode(
|
||||
'hex')::uuid;
|
||||
$$ LANGUAGE SQL STABLE STRICT PARALLEL SAFE;
|
||||
|
||||
COMMENT ON FUNCTION uuidv7_boundary(timestamptz) IS
|
||||
COMMENT ON FUNCTION public.uuidv7_boundary(timestamptz) IS
|
||||
'Generate a non-random uuidv7 with the given timestamp (first 48 bits) and all random bits to 0. As the smallest possible uuidv7 for that timestamp, it may be used as a boundary for partitions.';
|
||||
"""
|
||||
))
|
||||
@ -95,7 +113,10 @@ def downgrade():
|
||||
conn = op.get_bind()
|
||||
|
||||
if _is_pg(conn):
|
||||
op.execute(sa.text("DROP FUNCTION uuidv7"))
|
||||
op.execute(sa.text("DROP FUNCTION uuidv7_boundary"))
|
||||
# IF EXISTS keeps the downgrade a no-op on PostgreSQL 18, where the native
|
||||
# pg_catalog.uuidv7() was kept and no public.uuidv7() was created. Scoping the
|
||||
# drop to the public schema avoids touching the built-in.
|
||||
op.execute(sa.text("DROP FUNCTION IF EXISTS public.uuidv7()"))
|
||||
op.execute(sa.text("DROP FUNCTION IF EXISTS public.uuidv7_boundary(timestamptz)"))
|
||||
else:
|
||||
pass
|
||||
|
||||
@ -144,7 +144,7 @@ class AnalyticdbVectorBySql:
|
||||
f"id text PRIMARY KEY,"
|
||||
f"vector real[], ref_doc_id text, page_content text, metadata_ jsonb, "
|
||||
f"to_tsvector TSVECTOR"
|
||||
f") WITH (fillfactor=70) DISTRIBUTED BY (id);"
|
||||
f") DISTRIBUTED BY (id);"
|
||||
)
|
||||
if embedding_dimension is not None:
|
||||
index_name = f"{self._collection_name}_embedding_idx"
|
||||
@ -153,7 +153,7 @@ class AnalyticdbVectorBySql:
|
||||
cur.execute(
|
||||
f"CREATE INDEX {index_name} ON {self.table_name} USING ann(vector) "
|
||||
f"WITH(dim='{embedding_dimension}', distancemeasure='{self.config.metrics}', "
|
||||
f"pq_enable=0, external_storage=0)"
|
||||
f"pq_enable=0)"
|
||||
)
|
||||
cur.execute(f"CREATE INDEX ON {self.table_name} USING gin(to_tsvector)")
|
||||
except Exception as e:
|
||||
|
||||
@ -435,6 +435,7 @@ _LEGACY_APP_EDITOR_KEYS: list[str] = [
|
||||
"app.acl.delete",
|
||||
"app.acl.release_and_version",
|
||||
"app.acl.monitor",
|
||||
"app.acl.log_and_annotation",
|
||||
"app.acl.access_config",
|
||||
]
|
||||
|
||||
|
||||
@ -29,12 +29,12 @@ import hashlib
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from collections.abc import Callable, Sequence
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from threading import Lock
|
||||
from typing import Any, NotRequired, TypedDict, cast
|
||||
from typing import Any, NotRequired, TypedDict, TypeVar, cast
|
||||
|
||||
import click
|
||||
import pyarrow as pa
|
||||
@ -70,8 +70,10 @@ from services.retention.workflow_run.constants import (
|
||||
ARCHIVE_BUNDLE_MANIFEST_NAME,
|
||||
ARCHIVE_BUNDLE_SCHEMA_VERSION,
|
||||
)
|
||||
from services.retention.workflow_run.db_retry import is_retryable_db_disconnect, run_with_db_retry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class TableStatsManifestEntry(TypedDict):
|
||||
@ -208,6 +210,8 @@ class WorkflowRunArchiver:
|
||||
"workflow_pause_reasons",
|
||||
"workflow_trigger_logs",
|
||||
]
|
||||
DB_RETRY_ATTEMPTS = 3
|
||||
DB_RETRY_DELAYS_SECONDS = (1.0, 2.0)
|
||||
|
||||
start_from: datetime.datetime | None
|
||||
end_before: datetime.datetime
|
||||
@ -455,16 +459,40 @@ class WorkflowRunArchiver:
|
||||
"""Fetch a batch of workflow runs to archive."""
|
||||
repo = self._get_workflow_run_repo()
|
||||
tenant_ids = list(tenant_scope) if tenant_scope is not None else self.tenant_ids or None
|
||||
return repo.get_runs_batch_by_time_range(
|
||||
start_from=self.start_from,
|
||||
end_before=self.end_before,
|
||||
last_seen=last_seen,
|
||||
batch_size=self.batch_size,
|
||||
run_types=self.ARCHIVED_TYPE,
|
||||
tenant_ids=tenant_ids,
|
||||
tenant_prefixes=None if tenant_ids else self.tenant_prefixes or None,
|
||||
run_shard_index=self.run_shard_index,
|
||||
run_shard_total=self.run_shard_total,
|
||||
|
||||
return self._run_with_db_retry(
|
||||
"workflow run batch fetch",
|
||||
lambda: repo.get_runs_batch_by_time_range(
|
||||
start_from=self.start_from,
|
||||
end_before=self.end_before,
|
||||
last_seen=last_seen,
|
||||
batch_size=self.batch_size,
|
||||
run_types=self.ARCHIVED_TYPE,
|
||||
tenant_ids=tenant_ids,
|
||||
tenant_prefixes=None if tenant_ids else self.tenant_prefixes or None,
|
||||
run_shard_index=self.run_shard_index,
|
||||
run_shard_total=self.run_shard_total,
|
||||
),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _is_retryable_db_disconnect(exc: BaseException) -> bool:
|
||||
return is_retryable_db_disconnect(exc)
|
||||
|
||||
@staticmethod
|
||||
def _safe_rollback(session: Session, bundle_id: str) -> None:
|
||||
try:
|
||||
session.rollback()
|
||||
except Exception:
|
||||
logger.warning("Failed to rollback archive session for bundle %s", bundle_id, exc_info=True)
|
||||
|
||||
def _run_with_db_retry(self, operation_name: str, operation: Callable[[], T]) -> T:
|
||||
return run_with_db_retry(
|
||||
operation_name,
|
||||
operation,
|
||||
logger=logger,
|
||||
attempts=self.DB_RETRY_ATTEMPTS,
|
||||
delays_seconds=self.DB_RETRY_DELAYS_SECONDS,
|
||||
)
|
||||
|
||||
def _tenant_scan_scopes(self) -> list[list[str] | None]:
|
||||
@ -532,16 +560,14 @@ class WorkflowRunArchiver:
|
||||
if self.workers == 1 or len(bundle_groups) == 1:
|
||||
results: list[ArchiveResult] = []
|
||||
for bundle_runs in bundle_groups:
|
||||
with session_maker() as session:
|
||||
results.append(self._archive_bundle(session, storage, bundle_runs))
|
||||
results.append(self._archive_bundle_with_retry(session_maker, storage, bundle_runs))
|
||||
return results
|
||||
|
||||
results = []
|
||||
max_workers = min(self.workers, len(bundle_groups))
|
||||
|
||||
def archive_in_worker(bundle_runs: Sequence[WorkflowRun]) -> ArchiveResult:
|
||||
with session_maker() as session:
|
||||
return self._archive_bundle(session, storage, bundle_runs)
|
||||
return self._archive_bundle_with_retry(session_maker, storage, bundle_runs)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = [executor.submit(archive_in_worker, bundle_runs) for bundle_runs in bundle_groups]
|
||||
@ -549,6 +575,39 @@ class WorkflowRunArchiver:
|
||||
results.append(future.result())
|
||||
return results
|
||||
|
||||
def _archive_bundle_with_retry(
|
||||
self,
|
||||
session_maker: sessionmaker[Session],
|
||||
storage: ArchiveStorage | None,
|
||||
runs: Sequence[WorkflowRun],
|
||||
) -> ArchiveResult:
|
||||
identity = self._build_bundle_identity(runs)
|
||||
|
||||
try:
|
||||
return self._run_with_db_retry(
|
||||
f"archive workflow run bundle {identity.bundle_id}",
|
||||
lambda: self._archive_bundle_once(session_maker, storage, runs),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Failed to archive workflow run bundle %s after retries", identity.bundle_id)
|
||||
return ArchiveResult(
|
||||
bundle_id=identity.bundle_id,
|
||||
tenant_id=identity.tenant_id,
|
||||
object_prefix=identity.object_prefix,
|
||||
run_count=len(runs),
|
||||
success=False,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
def _archive_bundle_once(
|
||||
self,
|
||||
session_maker: sessionmaker[Session],
|
||||
storage: ArchiveStorage | None,
|
||||
runs: Sequence[WorkflowRun],
|
||||
) -> ArchiveResult:
|
||||
with session_maker() as session:
|
||||
return self._archive_bundle(session, storage, runs)
|
||||
|
||||
def _archive_bundle(
|
||||
self,
|
||||
session: Session,
|
||||
@ -645,9 +704,12 @@ class WorkflowRunArchiver:
|
||||
result.success = True
|
||||
|
||||
except Exception as e:
|
||||
if self._is_retryable_db_disconnect(e):
|
||||
self._safe_rollback(session, identity.bundle_id)
|
||||
raise
|
||||
logger.exception("Failed to archive workflow run bundle %s", identity.bundle_id)
|
||||
result.error = str(e)
|
||||
session.rollback()
|
||||
self._safe_rollback(session, identity.bundle_id)
|
||||
|
||||
result.elapsed_time = time.time() - start_time
|
||||
return result
|
||||
@ -794,6 +856,9 @@ class WorkflowRunArchiver:
|
||||
end_before = self.end_before
|
||||
if end_before is None:
|
||||
raise ValueError("archive window end must be set")
|
||||
formatted_end_before = self._format_window_datetime(end_before)
|
||||
if formatted_end_before is None:
|
||||
raise ValueError("archive window end must be set")
|
||||
return ArchiveManifestDict(
|
||||
schema_version=ARCHIVE_BUNDLE_SCHEMA_VERSION,
|
||||
archive_format=ARCHIVE_BUNDLE_FORMAT,
|
||||
@ -813,7 +878,7 @@ class WorkflowRunArchiver:
|
||||
archived_at=datetime.datetime.now(datetime.UTC).isoformat(),
|
||||
campaign_id=self.campaign_id,
|
||||
archive_window_start=self._format_window_datetime(self.start_from),
|
||||
archive_window_end=end_before.isoformat(),
|
||||
archive_window_end=formatted_end_before,
|
||||
run_shard=identity.shard,
|
||||
tables=tables,
|
||||
run_ids=[run.id for run in sorted_runs],
|
||||
|
||||
66
api/services/retention/workflow_run/db_retry.py
Normal file
66
api/services/retention/workflow_run/db_retry.py
Normal file
@ -0,0 +1,66 @@
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
|
||||
from sqlalchemy.exc import DBAPIError
|
||||
from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
|
||||
|
||||
DEFAULT_DB_RETRY_ATTEMPTS = 3
|
||||
DEFAULT_DB_RETRY_DELAYS_SECONDS = (1.0, 2.0)
|
||||
|
||||
_DB_DISCONNECT_PATTERNS = (
|
||||
"server closed the connection unexpectedly",
|
||||
"connection already closed",
|
||||
"closed the connection",
|
||||
"connection not open",
|
||||
"terminating connection",
|
||||
"connection reset",
|
||||
"broken pipe",
|
||||
"connection invalidated",
|
||||
)
|
||||
|
||||
|
||||
def is_retryable_db_disconnect(exc: BaseException) -> bool:
|
||||
if isinstance(exc, DBAPIError) and exc.connection_invalidated:
|
||||
return True
|
||||
|
||||
if not _is_db_operational_error(exc):
|
||||
return False
|
||||
|
||||
original_exception = getattr(exc, "orig", None)
|
||||
message = f"{exc} {original_exception or ''}".lower()
|
||||
return any(pattern in message for pattern in _DB_DISCONNECT_PATTERNS)
|
||||
|
||||
|
||||
def run_with_db_retry[T](
|
||||
operation_name: str,
|
||||
operation: Callable[[], T],
|
||||
*,
|
||||
logger: logging.Logger,
|
||||
attempts: int = DEFAULT_DB_RETRY_ATTEMPTS,
|
||||
delays_seconds: tuple[float, ...] = DEFAULT_DB_RETRY_DELAYS_SECONDS,
|
||||
) -> T:
|
||||
for attempt in range(1, attempts + 1):
|
||||
try:
|
||||
return operation()
|
||||
except Exception as exc:
|
||||
if not is_retryable_db_disconnect(exc) or attempt == attempts:
|
||||
raise
|
||||
delay = delays_seconds[min(attempt - 1, len(delays_seconds) - 1)]
|
||||
logger.warning(
|
||||
"Retrying %s after retryable DB disconnect (attempt %s/%s, sleep %.1fs)",
|
||||
operation_name,
|
||||
attempt,
|
||||
attempts,
|
||||
delay,
|
||||
exc_info=True,
|
||||
)
|
||||
time.sleep(delay)
|
||||
raise RuntimeError(f"{operation_name} did not complete")
|
||||
|
||||
|
||||
def _is_db_operational_error(exc: BaseException) -> bool:
|
||||
if isinstance(exc, SQLAlchemyOperationalError):
|
||||
return True
|
||||
|
||||
return exc.__class__.__name__ == "OperationalError" and exc.__class__.__module__.startswith(("psycopg", "psycopg2"))
|
||||
@ -40,6 +40,7 @@ from services.errors.app import QuotaExceededError
|
||||
from services.quota_service import QuotaService
|
||||
from services.trigger.app_trigger_service import AppTriggerService
|
||||
from services.workflow.entities import WebhookTriggerData
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
try:
|
||||
import magic
|
||||
@ -114,6 +115,7 @@ class WebhookService:
|
||||
workflow = session.scalar(
|
||||
select(Workflow)
|
||||
.where(
|
||||
Workflow.tenant_id == webhook_trigger.tenant_id,
|
||||
Workflow.app_id == webhook_trigger.app_id,
|
||||
Workflow.version == Workflow.VERSION_DRAFT,
|
||||
)
|
||||
@ -125,6 +127,7 @@ class WebhookService:
|
||||
app_trigger = session.scalar(
|
||||
select(AppTrigger)
|
||||
.where(
|
||||
AppTrigger.tenant_id == webhook_trigger.tenant_id,
|
||||
AppTrigger.app_id == webhook_trigger.app_id,
|
||||
AppTrigger.node_id == webhook_trigger.node_id,
|
||||
AppTrigger.trigger_type == AppTriggerType.TRIGGER_WEBHOOK,
|
||||
@ -145,16 +148,18 @@ class WebhookService:
|
||||
if app_trigger.status != AppTriggerStatus.ENABLED:
|
||||
raise ValueError(f"Webhook trigger is disabled for webhook {webhook_id}")
|
||||
|
||||
# Get workflow
|
||||
workflow = session.scalar(
|
||||
select(Workflow)
|
||||
app = session.scalar(
|
||||
select(App)
|
||||
.where(
|
||||
Workflow.app_id == webhook_trigger.app_id,
|
||||
Workflow.version != Workflow.VERSION_DRAFT,
|
||||
App.tenant_id == webhook_trigger.tenant_id,
|
||||
App.id == webhook_trigger.app_id,
|
||||
)
|
||||
.order_by(Workflow.created_at.desc())
|
||||
.limit(1)
|
||||
)
|
||||
if not app:
|
||||
raise ValueError(f"App not found for webhook {webhook_id}")
|
||||
|
||||
workflow = WorkflowService().get_published_workflow(app, session=session)
|
||||
if not workflow:
|
||||
raise ValueError(f"Workflow not found for app {webhook_trigger.app_id}")
|
||||
|
||||
|
||||
@ -333,7 +333,7 @@ class VectorService:
|
||||
|
||||
# Add documents to vector store if any
|
||||
if documents and dataset.is_multimodal:
|
||||
vector.add_texts(documents, duplicate_check=True)
|
||||
vector.create_multimodal(documents)
|
||||
|
||||
# Single commit for all operations
|
||||
db.session.commit()
|
||||
|
||||
@ -12,7 +12,7 @@ from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from celery import shared_task
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.db.session_factory import session_factory
|
||||
@ -35,7 +35,7 @@ from models.enums import (
|
||||
WorkflowRunTriggeredFrom,
|
||||
WorkflowTriggerStatus,
|
||||
)
|
||||
from models.model import EndUser
|
||||
from models.model import App, EndUser
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from models.trigger import TriggerSubscription, WorkflowPluginTrigger, WorkflowTriggerLog
|
||||
from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun
|
||||
@ -99,23 +99,25 @@ def dispatch_trigger_debug_event(
|
||||
return 0
|
||||
|
||||
|
||||
def _get_latest_workflows_by_app_ids(
|
||||
def _get_published_workflows_by_app_ids(
|
||||
session: Session, subscribers: Sequence[WorkflowPluginTrigger]
|
||||
) -> Mapping[str, Workflow]:
|
||||
"""Get the latest workflows by app_ids"""
|
||||
workflow_query = (
|
||||
select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at"))
|
||||
.where(
|
||||
Workflow.app_id.in_({t.app_id for t in subscribers}),
|
||||
Workflow.version != Workflow.VERSION_DRAFT,
|
||||
)
|
||||
.group_by(Workflow.app_id)
|
||||
.subquery()
|
||||
)
|
||||
"""Get current published workflows through apps.workflow_id."""
|
||||
app_ids = {trigger.app_id for trigger in subscribers}
|
||||
tenant_ids = {trigger.tenant_id for trigger in subscribers}
|
||||
if not app_ids or not tenant_ids:
|
||||
return {}
|
||||
|
||||
workflows = session.scalars(
|
||||
select(Workflow).join(
|
||||
workflow_query,
|
||||
(Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at),
|
||||
select(Workflow)
|
||||
.join(App, App.workflow_id == Workflow.id)
|
||||
.where(
|
||||
App.id.in_(app_ids),
|
||||
App.tenant_id.in_(tenant_ids),
|
||||
App.workflow_id.isnot(None),
|
||||
Workflow.app_id == App.id,
|
||||
Workflow.tenant_id == App.tenant_id,
|
||||
Workflow.version != Workflow.VERSION_DRAFT,
|
||||
)
|
||||
).all()
|
||||
return {w.app_id: w for w in workflows}
|
||||
@ -262,7 +264,7 @@ def dispatch_triggered_workflow(
|
||||
|
||||
# Ensure expire_on_commit is set to False to remain workflows available
|
||||
with session_factory.create_session() as session:
|
||||
workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
|
||||
workflows: Mapping[str, Workflow] = _get_published_workflows_by_app_ids(session, subscribers)
|
||||
|
||||
end_users: Mapping[str, EndUser] = EndUserService.create_end_user_batch(
|
||||
type=EndUserType.TRIGGER,
|
||||
|
||||
@ -6,8 +6,10 @@ from unittest.mock import ANY, MagicMock, patch
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import pytest
|
||||
from sqlalchemy.exc import OperationalError
|
||||
|
||||
from services.retention.workflow_run.archive_paid_plan_workflow_run import (
|
||||
ArchiveResult,
|
||||
ArchiveSummary,
|
||||
WorkflowRunArchiver,
|
||||
)
|
||||
@ -32,6 +34,30 @@ class FakeArchiveStorage:
|
||||
return sorted(key for key in self.objects if key.startswith(prefix))
|
||||
|
||||
|
||||
def _db_disconnect_error() -> OperationalError:
|
||||
return OperationalError(
|
||||
"select 1",
|
||||
{},
|
||||
RuntimeError("server closed the connection unexpectedly"),
|
||||
connection_invalidated=True,
|
||||
)
|
||||
|
||||
|
||||
def _run(run_id: str = "run-1"):
|
||||
run = MagicMock()
|
||||
run.id = run_id
|
||||
run.tenant_id = "tenant-1"
|
||||
run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0)
|
||||
return run
|
||||
|
||||
|
||||
def _session_context(session):
|
||||
context = MagicMock()
|
||||
context.__enter__.return_value = session
|
||||
context.__exit__.return_value = False
|
||||
return context
|
||||
|
||||
|
||||
class TestWorkflowRunArchiverInit:
|
||||
def test_start_from_without_end_before_raises(self):
|
||||
with pytest.raises(ValueError, match="start_from and end_before must be provided together"):
|
||||
@ -139,6 +165,32 @@ class TestWorkflowRunArchiverInit:
|
||||
repo.get_runs_batch_by_time_range.assert_called_once()
|
||||
assert repo.get_runs_batch_by_time_range.call_args.kwargs["tenant_ids"] == ["tenant-b"]
|
||||
|
||||
def test_get_runs_batch_retries_retryable_db_disconnect(self):
|
||||
repo = MagicMock()
|
||||
repo.get_runs_batch_by_time_range.side_effect = [_db_disconnect_error(), []]
|
||||
archiver = WorkflowRunArchiver(workflow_run_repo=repo)
|
||||
|
||||
with patch("services.retention.workflow_run.db_retry.time.sleep") as sleep:
|
||||
runs = archiver._get_runs_batch(None)
|
||||
|
||||
assert runs == []
|
||||
assert repo.get_runs_batch_by_time_range.call_count == 2
|
||||
sleep.assert_called_once_with(1.0)
|
||||
|
||||
def test_get_runs_batch_does_not_retry_non_db_broken_pipe_error(self):
|
||||
repo = MagicMock()
|
||||
repo.get_runs_batch_by_time_range.side_effect = RuntimeError("broken pipe")
|
||||
archiver = WorkflowRunArchiver(workflow_run_repo=repo)
|
||||
|
||||
with (
|
||||
patch("services.retention.workflow_run.db_retry.time.sleep") as sleep,
|
||||
pytest.raises(RuntimeError, match="broken pipe"),
|
||||
):
|
||||
archiver._get_runs_batch(None)
|
||||
|
||||
repo.get_runs_batch_by_time_range.assert_called_once()
|
||||
sleep.assert_not_called()
|
||||
|
||||
def test_start_message_includes_shard(self):
|
||||
archiver = WorkflowRunArchiver(tenant_prefixes=["0"], run_shard_index=1, run_shard_total=4)
|
||||
|
||||
@ -351,6 +403,72 @@ class TestDryRunArchive:
|
||||
assert summary.table_stats["workflow_app_logs"].size_bytes == 32
|
||||
|
||||
|
||||
class TestArchiveDbRetry:
|
||||
def test_archive_bundle_groups_retries_with_fresh_session(self):
|
||||
archiver = WorkflowRunArchiver(days=90)
|
||||
run = _run()
|
||||
session_maker = MagicMock(
|
||||
side_effect=[
|
||||
_session_context(MagicMock(name="session-1")),
|
||||
_session_context(MagicMock(name="session-2")),
|
||||
]
|
||||
)
|
||||
success = ArchiveResult(
|
||||
bundle_id=archiver._build_bundle_identity([run]).bundle_id,
|
||||
tenant_id=run.tenant_id,
|
||||
object_prefix=archiver._build_bundle_identity([run]).object_prefix,
|
||||
run_count=1,
|
||||
success=True,
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(archiver, "_archive_bundle", side_effect=[_db_disconnect_error(), success]) as archive_bundle,
|
||||
patch("services.retention.workflow_run.db_retry.time.sleep") as sleep,
|
||||
):
|
||||
results = archiver._archive_bundle_groups(session_maker, MagicMock(), [[run]])
|
||||
|
||||
assert results == [success]
|
||||
assert archive_bundle.call_count == 2
|
||||
assert session_maker.call_count == 2
|
||||
sleep.assert_called_once_with(1.0)
|
||||
|
||||
def test_archive_bundle_groups_returns_failed_result_after_retry_exhaustion(self):
|
||||
archiver = WorkflowRunArchiver(days=90)
|
||||
run = _run()
|
||||
session_maker = MagicMock(
|
||||
side_effect=[
|
||||
_session_context(MagicMock(name="session-1")),
|
||||
_session_context(MagicMock(name="session-2")),
|
||||
_session_context(MagicMock(name="session-3")),
|
||||
]
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(archiver, "_archive_bundle", side_effect=[_db_disconnect_error()] * 3) as archive_bundle,
|
||||
patch("services.retention.workflow_run.db_retry.time.sleep") as sleep,
|
||||
):
|
||||
results = archiver._archive_bundle_groups(session_maker, MagicMock(), [[run]])
|
||||
|
||||
assert len(results) == 1
|
||||
assert results[0].success is False
|
||||
assert "server closed the connection unexpectedly" in (results[0].error or "")
|
||||
assert archive_bundle.call_count == archiver.DB_RETRY_ATTEMPTS
|
||||
assert session_maker.call_count == archiver.DB_RETRY_ATTEMPTS
|
||||
assert sleep.call_count == archiver.DB_RETRY_ATTEMPTS - 1
|
||||
|
||||
def test_archive_bundle_uses_safe_rollback_when_failure_rolls_back_badly(self):
|
||||
archiver = WorkflowRunArchiver(days=90, dry_run=True)
|
||||
session = MagicMock()
|
||||
session.rollback.side_effect = RuntimeError("rollback failed")
|
||||
|
||||
with patch.object(archiver, "_extract_bundle_data", side_effect=RuntimeError("extract failed")):
|
||||
result = archiver._archive_bundle(session, None, [_run()])
|
||||
|
||||
assert result.success is False
|
||||
assert result.error == "extract failed"
|
||||
session.rollback.assert_called_once()
|
||||
|
||||
|
||||
class TestArchiveRunIdempotency:
|
||||
def _index_payload(self, archiver: WorkflowRunArchiver, run_ids: list[str], run) -> tuple[str, bytes]:
|
||||
identity = archiver._build_bundle_identity([run])
|
||||
|
||||
@ -127,6 +127,9 @@ class TestWebhookService:
|
||||
db_session_with_containers.add(workflow)
|
||||
db_session_with_containers.flush()
|
||||
|
||||
app.workflow_id = workflow.id
|
||||
db_session_with_containers.flush()
|
||||
|
||||
# Create webhook trigger
|
||||
webhook_id = fake.uuid4()[:16]
|
||||
webhook_trigger = WorkflowWebhookTrigger(
|
||||
|
||||
@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import uuid4
|
||||
@ -240,6 +241,40 @@ class TestWebhookServiceLookupWithContainers:
|
||||
with pytest.raises(ValueError, match="Workflow not found"):
|
||||
WebhookService.get_webhook_trigger_and_workflow(webhook_trigger.webhook_id)
|
||||
|
||||
def test_get_webhook_trigger_and_workflow_uses_app_workflow_id(
|
||||
self, db_session_with_containers: Session, flask_app_with_containers: Flask
|
||||
):
|
||||
del flask_app_with_containers
|
||||
factory = WebhookServiceRelationshipFactory
|
||||
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
|
||||
app = factory.create_app(db_session_with_containers, tenant, account)
|
||||
current_workflow = factory.create_workflow(
|
||||
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
|
||||
)
|
||||
newer_workflow = factory.create_workflow(
|
||||
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-15.001"
|
||||
)
|
||||
current_workflow.created_at = datetime(2026, 4, 14)
|
||||
newer_workflow.created_at = datetime(2026, 4, 15)
|
||||
app.workflow_id = current_workflow.id
|
||||
db_session_with_containers.commit()
|
||||
|
||||
webhook_trigger = factory.create_webhook_trigger(
|
||||
db_session_with_containers, app=app, account=account, node_id="node-1"
|
||||
)
|
||||
factory.create_app_trigger(
|
||||
db_session_with_containers, app=app, node_id="node-1", status=AppTriggerStatus.ENABLED
|
||||
)
|
||||
|
||||
got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow(
|
||||
webhook_trigger.webhook_id
|
||||
)
|
||||
|
||||
assert got_trigger.id == webhook_trigger.id
|
||||
assert got_workflow.id == current_workflow.id
|
||||
assert got_workflow.id != newer_workflow.id
|
||||
assert got_node_config["id"] == "node-1"
|
||||
|
||||
def test_get_webhook_trigger_and_workflow_returns_debug_draft_workflow(
|
||||
self, db_session_with_containers: Session, flask_app_with_containers: Flask
|
||||
):
|
||||
|
||||
139
api/tests/unit_tests/commands/test_archive_workflow_runs.py
Normal file
139
api/tests/unit_tests/commands/test_archive_workflow_runs.py
Normal file
@ -0,0 +1,139 @@
|
||||
import datetime
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import click
|
||||
import pytest
|
||||
from sqlalchemy.exc import OperationalError
|
||||
|
||||
from commands import retention
|
||||
|
||||
|
||||
def _db_disconnect_error() -> OperationalError:
|
||||
return OperationalError(
|
||||
"select 1",
|
||||
{},
|
||||
RuntimeError("server closed the connection unexpectedly"),
|
||||
connection_invalidated=True,
|
||||
)
|
||||
|
||||
|
||||
def _session_context(session):
|
||||
context = MagicMock()
|
||||
context.__enter__.return_value = session
|
||||
context.__exit__.return_value = False
|
||||
return context
|
||||
|
||||
|
||||
def test_resolve_archive_tenant_ids_from_plan_uses_explicit_sessions(monkeypatch):
|
||||
end_before = datetime.datetime(2025, 4, 1, tzinfo=datetime.UTC)
|
||||
sessions = [MagicMock(name="session-a"), MagicMock(name="session-b")]
|
||||
session_maker = MagicMock(side_effect=[_session_context(sessions[0]), _session_context(sessions[1])])
|
||||
calls = []
|
||||
|
||||
def get_candidate_tenants(session, prefix, *, start_from, end_before):
|
||||
calls.append((session, prefix, start_from, end_before))
|
||||
return [f"{prefix}-paid", f"{prefix}-free"]
|
||||
|
||||
monkeypatch.setattr(retention, "_get_archive_candidate_tenant_ids_by_prefix", get_candidate_tenants)
|
||||
monkeypatch.setattr(
|
||||
retention,
|
||||
"_filter_paid_workflow_archive_tenant_ids",
|
||||
lambda tenant_ids: (["a-paid", "b-paid"], ["a-free", "b-free"]),
|
||||
)
|
||||
|
||||
tenant_plan = retention._resolve_archive_tenant_ids_from_plan(
|
||||
session_maker=session_maker,
|
||||
tenant_ids=None,
|
||||
tenant_prefixes=["a", "b"],
|
||||
start_from=None,
|
||||
end_before=end_before,
|
||||
)
|
||||
|
||||
assert tenant_plan["archive_tenant_ids"] == ["a-paid", "b-paid"]
|
||||
assert tenant_plan["paid_tenant_ids"] == ["a-paid", "b-paid"]
|
||||
assert tenant_plan["unpaid_tenant_ids"] == ["a-free", "b-free"]
|
||||
assert calls == [
|
||||
(sessions[0], "a", None, end_before),
|
||||
(sessions[1], "b", None, end_before),
|
||||
]
|
||||
|
||||
|
||||
def test_safe_remove_scoped_session_discards_registry_and_disposes_after_remove_error(monkeypatch):
|
||||
fake_db = MagicMock()
|
||||
fake_db.session.remove.side_effect = RuntimeError("server closed the connection unexpectedly")
|
||||
monkeypatch.setattr(retention, "db", fake_db)
|
||||
|
||||
retention._safe_remove_scoped_session("archive workflow run command")
|
||||
|
||||
fake_db.session.remove.assert_called_once()
|
||||
fake_db.session.registry.clear.assert_called_once()
|
||||
fake_db.engine.dispose.assert_called_once()
|
||||
|
||||
|
||||
def test_archive_command_db_retry_retries_retryable_db_disconnect(monkeypatch):
|
||||
operation = MagicMock(side_effect=[_db_disconnect_error(), "ok"])
|
||||
sleep = MagicMock()
|
||||
monkeypatch.setattr("services.retention.workflow_run.db_retry.time.sleep", sleep)
|
||||
|
||||
result = retention._run_archive_command_db_retry("archive plan", operation)
|
||||
|
||||
assert result == "ok"
|
||||
assert operation.call_count == 2
|
||||
sleep.assert_called_once_with(1.0)
|
||||
|
||||
|
||||
def test_archive_plan_prefix_stats_retries_count_query_with_fresh_session(monkeypatch):
|
||||
end_before = datetime.datetime(2025, 4, 1, tzinfo=datetime.UTC)
|
||||
sessions = [MagicMock(name="session-1"), MagicMock(name="session-2")]
|
||||
sessions[0].scalar.side_effect = _db_disconnect_error()
|
||||
sessions[1].scalar.side_effect = [7, 9]
|
||||
session_maker = MagicMock(side_effect=[_session_context(sessions[0]), _session_context(sessions[1])])
|
||||
sleep = MagicMock()
|
||||
|
||||
monkeypatch.setattr(
|
||||
retention,
|
||||
"_get_archive_candidate_tenant_ids_by_prefix",
|
||||
lambda session, prefix, *, start_from, end_before: [f"{prefix}-tenant"],
|
||||
)
|
||||
monkeypatch.setattr("services.retention.workflow_run.db_retry.time.sleep", sleep)
|
||||
|
||||
stats = retention._get_archive_plan_prefix_stats(
|
||||
session_maker,
|
||||
"a",
|
||||
start_from=None,
|
||||
end_before=end_before,
|
||||
)
|
||||
|
||||
assert stats["tenant_ids"] == ["a-tenant"]
|
||||
assert stats["workflow_runs"] == 7
|
||||
assert stats["workflow_node_executions"] == 9
|
||||
assert session_maker.call_count == 2
|
||||
sleep.assert_called_once_with(1.0)
|
||||
|
||||
|
||||
def test_archive_workflow_runs_raises_click_exception_when_tenant_plan_fails(monkeypatch):
|
||||
fake_db = MagicMock()
|
||||
monkeypatch.setattr(retention, "db", fake_db)
|
||||
monkeypatch.setattr(
|
||||
retention,
|
||||
"_resolve_archive_tenant_ids_from_plan",
|
||||
MagicMock(side_effect=RuntimeError("tenant plan failed")),
|
||||
)
|
||||
|
||||
with pytest.raises(click.ClickException, match="Failed to resolve workflow archive tenant plan"):
|
||||
retention.archive_workflow_runs.callback(
|
||||
tenant_ids="tenant-1",
|
||||
tenant_prefixes=None,
|
||||
before_days=90,
|
||||
from_days_ago=None,
|
||||
to_days_ago=None,
|
||||
start_from=None,
|
||||
end_before=None,
|
||||
batch_size=10000,
|
||||
workers=1,
|
||||
run_shard_index=None,
|
||||
run_shard_total=None,
|
||||
limit=None,
|
||||
dry_run=True,
|
||||
delete_after_archive=False,
|
||||
)
|
||||
@ -43,6 +43,57 @@ class TestMCPClient:
|
||||
assert client.timeout is None
|
||||
assert client.sse_read_timeout is None
|
||||
|
||||
def test_init_with_dynamic_request_headers(self):
|
||||
"""Test client initialization with dynamic request headers."""
|
||||
from flask import Flask
|
||||
|
||||
app = Flask("test")
|
||||
with app.test_request_context(headers={"X-Custom-Auth": "my-secret-token", "X-User-Id": "user123"}):
|
||||
client = MCPClient(
|
||||
server_url="http://test.example.com",
|
||||
headers={
|
||||
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
|
||||
"X-Request-User": "{{request.header.X-User-Id}}",
|
||||
"X-Static-Header": "static-val",
|
||||
},
|
||||
)
|
||||
|
||||
assert client.headers == {
|
||||
"Authorization": "Bearer my-secret-token",
|
||||
"X-Request-User": "user123",
|
||||
"X-Static-Header": "static-val",
|
||||
}
|
||||
|
||||
def test_init_with_dynamic_request_headers_missing(self):
|
||||
"""Test client initialization with dynamic request headers that are missing."""
|
||||
from flask import Flask
|
||||
|
||||
app = Flask("test")
|
||||
with app.test_request_context(headers={}):
|
||||
client = MCPClient(
|
||||
server_url="http://test.example.com",
|
||||
headers={
|
||||
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
|
||||
},
|
||||
)
|
||||
|
||||
assert client.headers == {
|
||||
"Authorization": "Bearer ",
|
||||
}
|
||||
|
||||
def test_init_with_dynamic_request_headers_no_context(self):
|
||||
"""Test client initialization with dynamic request headers but no request context."""
|
||||
client = MCPClient(
|
||||
server_url="http://test.example.com",
|
||||
headers={
|
||||
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
|
||||
},
|
||||
)
|
||||
|
||||
assert client.headers == {
|
||||
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
|
||||
}
|
||||
|
||||
@patch("core.mcp.mcp_client.streamablehttp_client")
|
||||
@patch("core.mcp.mcp_client.ClientSession")
|
||||
def test_initialize_with_mcp_url(self, mock_client_session, mock_streamable_client):
|
||||
|
||||
111
api/tests/unit_tests/migrations/test_uuidv7_pg18_migration.py
Normal file
111
api/tests/unit_tests/migrations/test_uuidv7_pg18_migration.py
Normal file
@ -0,0 +1,111 @@
|
||||
"""Tests for the uuidv7 SQL migration's PostgreSQL 18 compatibility guard.
|
||||
|
||||
The migration file name is not a valid Python identifier (it starts with a date and
|
||||
contains hyphens), so it is loaded directly from its path. The ``models`` import at the
|
||||
top of the migration is stubbed because the migration never uses it during
|
||||
``upgrade()``/``downgrade()`` and pulling in the real package would require a full app
|
||||
context.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
MIGRATION_PATH = (
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "migrations"
|
||||
/ "versions"
|
||||
/ "2025_07_02_2332-1c9ba48be8e4_add_uuidv7_function_in_sql.py"
|
||||
)
|
||||
|
||||
|
||||
def _load_migration():
|
||||
# The migration does `import models as models` but never references it, so a stub is
|
||||
# enough and keeps the test free of any database/app configuration.
|
||||
sys.modules.setdefault("models", types.ModuleType("models"))
|
||||
spec = importlib.util.spec_from_file_location("uuidv7_pg18_migration_under_test", MIGRATION_PATH)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _make_bind(dialect_name):
|
||||
bind = mock.MagicMock()
|
||||
bind.dialect.name = dialect_name
|
||||
return bind
|
||||
|
||||
|
||||
def _executed_sql(fake_op):
|
||||
return [str(call.args[0]) for call in fake_op.execute.call_args_list]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def migration():
|
||||
return _load_migration()
|
||||
|
||||
|
||||
def test_upgrade_creates_both_functions_when_native_uuidv7_absent(migration):
|
||||
# PostgreSQL 13 to 17: no native pg_catalog.uuidv7(), so both functions are created.
|
||||
# The DO block contains the CREATE FUNCTION guarded by IF NOT EXISTS, and
|
||||
# uuidv7_boundary is created unconditionally.
|
||||
bind = _make_bind("postgresql")
|
||||
with mock.patch.object(migration, "op") as fake_op:
|
||||
fake_op.get_bind.return_value = bind
|
||||
migration.upgrade()
|
||||
|
||||
sql = _executed_sql(fake_op)
|
||||
assert any("CREATE FUNCTION public.uuidv7()" in stmt for stmt in sql)
|
||||
assert any("CREATE FUNCTION public.uuidv7_boundary(timestamptz)" in stmt for stmt in sql)
|
||||
|
||||
|
||||
def test_upgrade_skips_uuidv7_but_keeps_boundary_when_native_present(migration):
|
||||
# PostgreSQL 18: native pg_catalog.uuidv7() exists, so the DO block must guard
|
||||
# the CREATE FUNCTION with an IF NOT EXISTS check against pg_catalog.
|
||||
# uuidv7_boundary is still missing and has to be created unconditionally.
|
||||
bind = _make_bind("postgresql")
|
||||
with mock.patch.object(migration, "op") as fake_op:
|
||||
fake_op.get_bind.return_value = bind
|
||||
migration.upgrade()
|
||||
|
||||
sql = _executed_sql(fake_op)
|
||||
# The DO block must contain the pg_catalog existence check.
|
||||
do_block = next((stmt for stmt in sql if "DO $do$" in stmt), None)
|
||||
assert do_block is not None
|
||||
assert "pg_catalog" in do_block
|
||||
assert "uuidv7" in do_block
|
||||
assert "IF NOT EXISTS" in do_block
|
||||
# uuidv7_boundary is always created (not guarded by the DO block).
|
||||
assert any("CREATE FUNCTION public.uuidv7_boundary(timestamptz)" in stmt for stmt in sql)
|
||||
|
||||
|
||||
def test_upgrade_is_noop_on_non_postgres(migration):
|
||||
bind = _make_bind("sqlite")
|
||||
with mock.patch.object(migration, "op") as fake_op:
|
||||
fake_op.get_bind.return_value = bind
|
||||
migration.upgrade()
|
||||
|
||||
fake_op.execute.assert_not_called()
|
||||
|
||||
|
||||
def test_downgrade_uses_if_exists_and_public_schema(migration):
|
||||
bind = _make_bind("postgresql")
|
||||
with mock.patch.object(migration, "op") as fake_op:
|
||||
fake_op.get_bind.return_value = bind
|
||||
migration.downgrade()
|
||||
|
||||
sql = _executed_sql(fake_op)
|
||||
assert "DROP FUNCTION IF EXISTS public.uuidv7()" in sql
|
||||
assert "DROP FUNCTION IF EXISTS public.uuidv7_boundary(timestamptz)" in sql
|
||||
|
||||
|
||||
def test_downgrade_is_noop_on_non_postgres(migration):
|
||||
bind = _make_bind("sqlite")
|
||||
with mock.patch.object(migration, "op") as fake_op:
|
||||
fake_op.get_bind.return_value = bind
|
||||
migration.downgrade()
|
||||
|
||||
fake_op.execute.assert_not_called()
|
||||
@ -633,6 +633,8 @@ class TestMyPermissions:
|
||||
assert "dataset.acl.preview" in out.workspace.permission_keys
|
||||
assert "app.acl.preview" in out.app.default_permission_keys
|
||||
assert "dataset.acl.preview" in out.dataset.default_permission_keys
|
||||
if role == "editor":
|
||||
assert "app.acl.log_and_annotation" in out.app.default_permission_keys
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("role", "expected_snippet_keys"),
|
||||
|
||||
@ -639,8 +639,8 @@ def test_update_multimodel_vector_adds_bindings_and_vectors_and_skips_missing_up
|
||||
assert len(bindings) == 1
|
||||
assert bindings[0]["attachment_id"] == "file-1"
|
||||
|
||||
vector_instance.add_texts.assert_called_once()
|
||||
documents = vector_instance.add_texts.call_args.args[0]
|
||||
vector_instance.create_multimodal.assert_called_once()
|
||||
documents = vector_instance.create_multimodal.call_args.args[0]
|
||||
assert len(documents) == 1
|
||||
assert documents[0].page_content == "img.png"
|
||||
assert documents[0].metadata["doc_id"] == "file-1"
|
||||
|
||||
@ -98,7 +98,7 @@ class TestDispatchTriggeredWorkflow:
|
||||
),
|
||||
patch.object(
|
||||
trigger_processing_tasks_module,
|
||||
"_get_latest_workflows_by_app_ids",
|
||||
"_get_published_workflows_by_app_ids",
|
||||
) as get_workflows,
|
||||
patch.object(
|
||||
trigger_processing_tasks_module.EndUserService,
|
||||
|
||||
@ -232,7 +232,20 @@ describe('Billing Page + Plan Integration', () => {
|
||||
|
||||
// Verify billing URL button visibility and behavior
|
||||
describe('Billing URL button', () => {
|
||||
it('should show billing button when subscription management permission is granted', () => {
|
||||
it('should show billing button when manager has subscription management permission', () => {
|
||||
setupProviderContext({ type: Plan.sandbox })
|
||||
setupAppContext({
|
||||
isCurrentWorkspaceManager: true,
|
||||
workspacePermissionKeys: ['billing.subscription.manage'],
|
||||
})
|
||||
|
||||
render(<Billing />)
|
||||
|
||||
expect(screen.getByText(/viewBillingTitle/i)).toBeInTheDocument()
|
||||
expect(screen.getByText(/viewBillingAction/i)).toBeInTheDocument()
|
||||
})
|
||||
|
||||
it('should hide billing button when subscription management permission is granted without manager role', () => {
|
||||
setupProviderContext({ type: Plan.sandbox })
|
||||
setupAppContext({
|
||||
isCurrentWorkspaceManager: false,
|
||||
@ -241,8 +254,7 @@ describe('Billing Page + Plan Integration', () => {
|
||||
|
||||
render(<Billing />)
|
||||
|
||||
expect(screen.getByText(/viewBillingTitle/i)).toBeInTheDocument()
|
||||
expect(screen.getByText(/viewBillingAction/i)).toBeInTheDocument()
|
||||
expect(screen.queryByText(/viewBillingTitle/i)).not.toBeInTheDocument()
|
||||
})
|
||||
|
||||
it('should hide billing button when subscription management permission is missing', () => {
|
||||
|
||||
@ -21,6 +21,7 @@ let mockChatConversationDetail: Record<string, unknown> | undefined
|
||||
let mockCompletionConversationDetail: Record<string, unknown> | undefined
|
||||
let mockShowMessageLogModal = false
|
||||
let mockShowPromptLogModal = false
|
||||
let mockShowAgentLogModal = false
|
||||
let mockCurrentLogItem: Record<string, unknown> | undefined
|
||||
let mockCurrentLogModalActiveTab = 'messages'
|
||||
|
||||
@ -81,6 +82,7 @@ vi.mock('@/app/components/app/store', () => ({
|
||||
setShowAgentLogModal: mockSetShowAgentLogModal,
|
||||
setShowMessageLogModal: mockSetShowMessageLogModal,
|
||||
showPromptLogModal: mockShowPromptLogModal,
|
||||
showAgentLogModal: mockShowAgentLogModal,
|
||||
currentLogModalActiveTab: mockCurrentLogModalActiveTab,
|
||||
}),
|
||||
}))
|
||||
@ -126,6 +128,7 @@ vi.mock('@/app/components/base/chat/chat', () => ({
|
||||
onAnnotationEdited,
|
||||
onAnnotationRemoved,
|
||||
switchSibling,
|
||||
hideLogModal,
|
||||
}: {
|
||||
chatList: Array<{ id: string }>
|
||||
onFeedback: (mid: string, value: { rating: string, content?: string }) => Promise<boolean>
|
||||
@ -133,8 +136,9 @@ vi.mock('@/app/components/base/chat/chat', () => ({
|
||||
onAnnotationEdited: (query: string, answer: string, index: number) => void
|
||||
onAnnotationRemoved: (index: number) => Promise<boolean>
|
||||
switchSibling: (siblingMessageId: string) => void
|
||||
hideLogModal?: boolean
|
||||
}) => (
|
||||
<div data-testid="chat-panel">
|
||||
<div data-testid="chat-panel" data-hide-log-modal={String(hideLogModal)}>
|
||||
<div>{chatList.length}</div>
|
||||
<button onClick={() => void onFeedback('message-1', { rating: 'like', content: 'nice' })}>chat-feedback</button>
|
||||
<button onClick={() => onAnnotationAdded('annotation-2', 'Admin', 'Edited question', 'Edited answer', 1)}>chat-add-annotation</button>
|
||||
@ -145,6 +149,14 @@ vi.mock('@/app/components/base/chat/chat', () => ({
|
||||
),
|
||||
}))
|
||||
|
||||
vi.mock('@/app/components/base/agent-log-modal', () => ({
|
||||
default: ({ floating, onCancel }: { floating?: boolean, onCancel: () => void }) => (
|
||||
<div data-testid="agent-log-modal" data-floating={String(floating)}>
|
||||
<button onClick={onCancel}>close-agent-log-modal</button>
|
||||
</div>
|
||||
),
|
||||
}))
|
||||
|
||||
vi.mock('@/app/components/base/message-log-modal', () => ({
|
||||
default: ({ onCancel }: { onCancel: () => void }) => (
|
||||
<div data-testid="message-log-modal">
|
||||
@ -255,6 +267,7 @@ describe('ConversationList', () => {
|
||||
mockCompletionConversationDetail = undefined
|
||||
mockShowMessageLogModal = false
|
||||
mockShowPromptLogModal = false
|
||||
mockShowAgentLogModal = false
|
||||
mockCurrentLogItem = undefined
|
||||
mockCurrentLogModalActiveTab = 'messages'
|
||||
mockDelAnnotation.mockResolvedValue(undefined)
|
||||
@ -383,6 +396,7 @@ describe('ConversationList', () => {
|
||||
|
||||
expect(screen.getByTestId('var-panel')).toHaveTextContent('query:Latest question')
|
||||
expect(screen.getByTestId('model-info')).toHaveTextContent('gpt-4o')
|
||||
expect(screen.getByTestId('chat-panel')).toHaveAttribute('data-hide-log-modal', 'true')
|
||||
expect(screen.getByTestId('message-log-modal')).toBeInTheDocument()
|
||||
|
||||
fireEvent.click(screen.getByText('chat-feedback'))
|
||||
@ -399,6 +413,61 @@ describe('ConversationList', () => {
|
||||
})
|
||||
})
|
||||
|
||||
it('should mount agent log modals from the detail panel instead of the nested chat layout', async () => {
|
||||
mockChatConversationDetail = {
|
||||
id: 'conversation-1',
|
||||
created_at: 1710000000,
|
||||
model_config: {
|
||||
model: 'gpt-4o',
|
||||
configs: {
|
||||
introduction: 'Hello there',
|
||||
},
|
||||
user_input_form: [],
|
||||
},
|
||||
message: {
|
||||
inputs: {},
|
||||
},
|
||||
}
|
||||
mockShowAgentLogModal = true
|
||||
mockCurrentLogItem = {
|
||||
id: 'message-1',
|
||||
conversationId: 'conversation-1',
|
||||
}
|
||||
mockFetchChatMessages.mockResolvedValue({
|
||||
data: [
|
||||
{
|
||||
id: 'message-1',
|
||||
answer: 'Assistant reply',
|
||||
query: 'Latest question',
|
||||
created_at: 1710000000,
|
||||
inputs: {},
|
||||
feedbacks: [],
|
||||
message: [],
|
||||
message_files: [],
|
||||
agent_thoughts: [{ id: 'thought-1' }],
|
||||
},
|
||||
],
|
||||
has_more: false,
|
||||
})
|
||||
|
||||
renderConversationList({
|
||||
searchParams: '?page=2&conversation_id=conversation-1',
|
||||
})
|
||||
|
||||
await waitFor(() => {
|
||||
expect(screen.getByTestId('chat-panel')).toBeInTheDocument()
|
||||
})
|
||||
|
||||
expect(screen.getByTestId('chat-panel')).toHaveAttribute('data-hide-log-modal', 'true')
|
||||
expect(screen.getByTestId('agent-log-modal')).toBeInTheDocument()
|
||||
expect(screen.getByTestId('agent-log-modal')).toHaveAttribute('data-floating', 'true')
|
||||
|
||||
fireEvent.click(screen.getByText('close-agent-log-modal'))
|
||||
|
||||
expect(mockSetCurrentLogItem).toHaveBeenCalled()
|
||||
expect(mockSetShowAgentLogModal).toHaveBeenCalledWith(false)
|
||||
})
|
||||
|
||||
it('should render completion details and refetch after feedback updates', async () => {
|
||||
mockCompletionConversationDetail = {
|
||||
id: 'conversation-1',
|
||||
@ -424,7 +493,7 @@ describe('ConversationList', () => {
|
||||
},
|
||||
}
|
||||
mockShowPromptLogModal = true
|
||||
mockCurrentLogItem = { id: 'log-2' }
|
||||
mockCurrentLogItem = { id: 'log-2', log: [{ role: 'user', text: 'Prompt body' }] }
|
||||
|
||||
renderConversationList({
|
||||
appDetail: { id: 'app-1', mode: AppModeEnum.COMPLETION } as any,
|
||||
@ -626,7 +695,7 @@ describe('ConversationList', () => {
|
||||
},
|
||||
}
|
||||
mockShowPromptLogModal = true
|
||||
mockCurrentLogItem = { id: 'log-2' }
|
||||
mockCurrentLogItem = { id: 'log-2', log: [{ role: 'user', text: 'Prompt body' }] }
|
||||
|
||||
renderConversationList({
|
||||
appDetail: { id: 'app-1', mode: AppModeEnum.COMPLETION } as any,
|
||||
|
||||
@ -36,6 +36,7 @@ import ModelInfo from '@/app/components/app/log/model-info'
|
||||
import { useStore as useAppStore } from '@/app/components/app/store'
|
||||
import TextGeneration from '@/app/components/app/text-generate/item'
|
||||
import ActionButton from '@/app/components/base/action-button'
|
||||
import AgentLogModal from '@/app/components/base/agent-log-modal'
|
||||
import Chat from '@/app/components/base/chat/chat'
|
||||
import CopyIcon from '@/app/components/base/copy-icon'
|
||||
import Loading from '@/app/components/base/loading'
|
||||
@ -165,13 +166,25 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
|
||||
})
|
||||
const { formatTime } = useTimestamp()
|
||||
const { onClose, appDetail } = useContext(DrawerContext)
|
||||
const { currentLogItem, setCurrentLogItem, showMessageLogModal, setShowMessageLogModal, showPromptLogModal, setShowPromptLogModal, currentLogModalActiveTab } = useAppStore(useShallow((state: AppStoreState) => ({
|
||||
const {
|
||||
currentLogItem,
|
||||
setCurrentLogItem,
|
||||
showMessageLogModal,
|
||||
setShowMessageLogModal,
|
||||
showPromptLogModal,
|
||||
setShowPromptLogModal,
|
||||
showAgentLogModal,
|
||||
setShowAgentLogModal,
|
||||
currentLogModalActiveTab,
|
||||
} = useAppStore(useShallow((state: AppStoreState) => ({
|
||||
currentLogItem: state.currentLogItem,
|
||||
setCurrentLogItem: state.setCurrentLogItem,
|
||||
showMessageLogModal: state.showMessageLogModal,
|
||||
setShowMessageLogModal: state.setShowMessageLogModal,
|
||||
showPromptLogModal: state.showPromptLogModal,
|
||||
setShowPromptLogModal: state.setShowPromptLogModal,
|
||||
showAgentLogModal: state.showAgentLogModal,
|
||||
setShowAgentLogModal: state.setShowAgentLogModal,
|
||||
currentLogModalActiveTab: state.currentLogModalActiveTab,
|
||||
})))
|
||||
const { t } = useTranslation()
|
||||
@ -395,6 +408,7 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
|
||||
|
||||
const isChatMode = appDetail?.mode !== AppModeEnum.COMPLETION
|
||||
const isAdvanced = appDetail?.mode === AppModeEnum.ADVANCED_CHAT
|
||||
const shouldShowPromptLogModal = showPromptLogModal && !!currentLogItem?.log
|
||||
|
||||
const varList = getDetailVarList(detail, varValues)
|
||||
const message_files = getCompletionMessageFiles(detail, isChatMode)
|
||||
@ -507,6 +521,7 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
|
||||
noChatInput
|
||||
showPromptLog
|
||||
hideProcessDetail
|
||||
hideLogModal
|
||||
chatContainerInnerClassName="px-3"
|
||||
switchSibling={switchSibling}
|
||||
/>
|
||||
@ -546,6 +561,7 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
|
||||
noChatInput
|
||||
showPromptLog
|
||||
hideProcessDetail
|
||||
hideLogModal
|
||||
chatContainerInnerClassName="px-3"
|
||||
switchSibling={switchSibling}
|
||||
/>
|
||||
@ -574,7 +590,18 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
|
||||
/>
|
||||
</WorkflowContextProvider>
|
||||
)}
|
||||
{!isChatMode && showPromptLogModal && (
|
||||
{showAgentLogModal && (
|
||||
<AgentLogModal
|
||||
floating
|
||||
width={width}
|
||||
currentLogItem={currentLogItem}
|
||||
onCancel={() => {
|
||||
setCurrentLogItem()
|
||||
setShowAgentLogModal(false)
|
||||
}}
|
||||
/>
|
||||
)}
|
||||
{shouldShowPromptLogModal && (
|
||||
<PromptLogModal
|
||||
width={width}
|
||||
currentLogItem={currentLogItem}
|
||||
|
||||
@ -119,6 +119,17 @@ describe('AgentLogModal', () => {
|
||||
})
|
||||
})
|
||||
|
||||
it('should render the floating modal through a dialog portal', () => {
|
||||
vi.mocked(fetchAgentLogDetail).mockReturnValue(new Promise(() => {}))
|
||||
|
||||
const { container } = render(<AgentLogModal {...mockProps} floating />)
|
||||
|
||||
const modal = screen.getByRole('dialog')
|
||||
expect(container).not.toContainElement(modal)
|
||||
expect(document.body).toContainElement(modal)
|
||||
expect(modal).toHaveClass('fixed', 'z-50', 'w-[480px]!', 'left-[max(8px,calc(100vw-1136px))]!')
|
||||
})
|
||||
|
||||
it('should call onCancel when close button is clicked', () => {
|
||||
vi.mocked(fetchAgentLogDetail).mockReturnValue(new Promise(() => {}))
|
||||
|
||||
@ -158,4 +169,18 @@ describe('AgentLogModal', () => {
|
||||
|
||||
expect(mockProps.onCancel).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should not use click-away to close the floating dialog', () => {
|
||||
vi.mocked(fetchAgentLogDetail).mockReturnValue(new Promise(() => {}))
|
||||
|
||||
let clickAwayHandler!: (event: Event) => void
|
||||
vi.mocked(useClickAway).mockImplementation((callback) => {
|
||||
clickAwayHandler = callback
|
||||
})
|
||||
|
||||
render(<AgentLogModal {...mockProps} floating />)
|
||||
clickAwayHandler(new Event('click'))
|
||||
|
||||
expect(mockProps.onCancel).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import type { FC } from 'react'
|
||||
import type { IChatItem } from '@/app/components/base/chat/chat/type'
|
||||
import { cn } from '@langgenius/dify-ui/cn'
|
||||
import { Dialog, DialogContent, DialogTitle } from '@langgenius/dify-ui/dialog'
|
||||
import { RiCloseLine } from '@remixicon/react'
|
||||
import { useClickAway } from 'ahooks'
|
||||
import { useEffect, useRef, useState } from 'react'
|
||||
@ -10,11 +11,13 @@ import AgentLogDetail from './detail'
|
||||
type AgentLogModalProps = Readonly<{
|
||||
currentLogItem?: IChatItem
|
||||
width: number
|
||||
floating?: boolean
|
||||
onCancel: () => void
|
||||
}>
|
||||
const AgentLogModal: FC<AgentLogModalProps> = ({
|
||||
currentLogItem,
|
||||
width,
|
||||
floating,
|
||||
onCancel,
|
||||
}) => {
|
||||
const { t } = useTranslation()
|
||||
@ -22,7 +25,7 @@ const AgentLogModal: FC<AgentLogModalProps> = ({
|
||||
const [mounted, setMounted] = useState(false)
|
||||
|
||||
useClickAway(() => {
|
||||
if (mounted)
|
||||
if (mounted && !floating)
|
||||
onCancel()
|
||||
}, ref)
|
||||
|
||||
@ -33,6 +36,44 @@ const AgentLogModal: FC<AgentLogModalProps> = ({
|
||||
if (!currentLogItem || !currentLogItem.conversationId)
|
||||
return null
|
||||
|
||||
const detailContent = (
|
||||
<>
|
||||
<AgentLogDetail
|
||||
conversationID={currentLogItem.conversationId}
|
||||
messageID={currentLogItem.id}
|
||||
log={currentLogItem}
|
||||
/>
|
||||
</>
|
||||
)
|
||||
|
||||
if (floating) {
|
||||
return (
|
||||
<Dialog
|
||||
open
|
||||
onOpenChange={(open) => {
|
||||
if (!open)
|
||||
onCancel()
|
||||
}}
|
||||
>
|
||||
<DialogContent
|
||||
backdropClassName="bg-transparent!"
|
||||
className="top-16! bottom-4! left-[max(8px,calc(100vw-1136px))]! flex max-h-none! w-[480px]! max-w-[calc(100vw-16px)]! translate-x-0! translate-y-0! flex-col overflow-hidden! rounded-xl! border-[0.5px]! border-components-panel-border! bg-components-panel-bg! p-0! pt-3! pb-3! shadow-xl!"
|
||||
>
|
||||
<DialogTitle className="text-md shrink-0 px-4 py-1 font-semibold text-text-primary">{t('runDetail.workflowTitle', { ns: 'appLog' })}</DialogTitle>
|
||||
<button
|
||||
type="button"
|
||||
aria-label={t('operation.close', { ns: 'common' })}
|
||||
className="absolute top-4 right-3 z-20 cursor-pointer border-none bg-transparent p-1 focus-visible:ring-1 focus-visible:ring-components-input-border-active focus-visible:outline-hidden"
|
||||
onClick={onCancel}
|
||||
>
|
||||
<RiCloseLine className="size-4 text-text-tertiary" aria-hidden="true" />
|
||||
</button>
|
||||
{detailContent}
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
)
|
||||
}
|
||||
|
||||
return (
|
||||
<div
|
||||
className={cn('relative z-10 flex flex-col rounded-xl border-[0.5px] border-components-panel-border bg-components-panel-bg py-3 shadow-xl')}
|
||||
@ -54,11 +95,7 @@ const AgentLogModal: FC<AgentLogModalProps> = ({
|
||||
>
|
||||
<RiCloseLine className="size-4 text-text-tertiary" aria-hidden="true" />
|
||||
</button>
|
||||
<AgentLogDetail
|
||||
conversationID={currentLogItem.conversationId}
|
||||
messageID={currentLogItem.id}
|
||||
log={currentLogItem}
|
||||
/>
|
||||
{detailContent}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ let fetching = false
|
||||
let isManager = true
|
||||
let enableBilling = true
|
||||
let workspacePermissionKeys: string[] = ['billing.subscription.manage']
|
||||
let billingUrlEnabled = false
|
||||
|
||||
const refetchMock = vi.fn()
|
||||
const openAsyncWindowMock = vi.fn()
|
||||
@ -19,11 +20,14 @@ type BillingWindowOptions = {
|
||||
type OpenAsyncWindowCall = [BillingUrlCallback, BillingWindowOptions]
|
||||
|
||||
vi.mock('@/service/use-billing', () => ({
|
||||
useBillingUrl: () => ({
|
||||
data: currentBillingUrl,
|
||||
isFetching: fetching,
|
||||
refetch: refetchMock,
|
||||
}),
|
||||
useBillingUrl: (enabled: boolean) => {
|
||||
billingUrlEnabled = enabled
|
||||
return {
|
||||
data: currentBillingUrl,
|
||||
isFetching: fetching,
|
||||
refetch: refetchMock,
|
||||
}
|
||||
},
|
||||
}))
|
||||
|
||||
vi.mock('@/hooks/use-async-window-open', () => ({
|
||||
@ -54,28 +58,32 @@ describe('Billing', () => {
|
||||
fetching = false
|
||||
isManager = true
|
||||
enableBilling = true
|
||||
billingUrlEnabled = false
|
||||
workspacePermissionKeys = ['billing.subscription.manage']
|
||||
refetchMock.mockResolvedValue({ data: 'https://billing' })
|
||||
})
|
||||
|
||||
it('shows the billing action when subscription management permission is granted without manager role', () => {
|
||||
it('hides the billing action when subscription management permission is granted without manager role', () => {
|
||||
isManager = false
|
||||
|
||||
render(<Billing />)
|
||||
|
||||
expect(screen.getByRole('button', { name: /billing\.viewBillingTitle/ })).toBeInTheDocument()
|
||||
expect(screen.queryByRole('button', { name: /billing\.viewBillingTitle/ })).not.toBeInTheDocument()
|
||||
expect(billingUrlEnabled).toBe(false)
|
||||
})
|
||||
|
||||
it('hides the billing action when subscription management permission is missing or billing is disabled', () => {
|
||||
workspacePermissionKeys = []
|
||||
render(<Billing />)
|
||||
expect(screen.queryByRole('button', { name: /billing\.viewBillingTitle/ })).not.toBeInTheDocument()
|
||||
expect(billingUrlEnabled).toBe(false)
|
||||
|
||||
vi.clearAllMocks()
|
||||
workspacePermissionKeys = ['billing.subscription.manage']
|
||||
enableBilling = false
|
||||
render(<Billing />)
|
||||
expect(screen.queryByRole('button', { name: /billing\.viewBillingTitle/ })).not.toBeInTheDocument()
|
||||
expect(billingUrlEnabled).toBe(false)
|
||||
})
|
||||
|
||||
it('opens the billing window with the immediate url when the button is clicked', async () => {
|
||||
|
||||
@ -11,9 +11,9 @@ import PlanComp from '../plan'
|
||||
|
||||
const Billing: FC = () => {
|
||||
const { t } = useTranslation()
|
||||
const { workspacePermissionKeys } = useAppContext()
|
||||
const { isCurrentWorkspaceManager, workspacePermissionKeys } = useAppContext()
|
||||
const { enableBilling } = useProviderContext()
|
||||
const canManageBillingSubscription = hasPermission(workspacePermissionKeys, BillingPermission.SubscriptionManage)
|
||||
const canManageBillingSubscription = isCurrentWorkspaceManager && hasPermission(workspacePermissionKeys, BillingPermission.SubscriptionManage)
|
||||
const { data: billingUrl, isFetching, refetch } = useBillingUrl(enableBilling && canManageBillingSubscription)
|
||||
const openAsyncWindow = useAsyncWindowOpen()
|
||||
|
||||
|
||||
@ -13,6 +13,8 @@ import { Input } from '@langgenius/dify-ui/input'
|
||||
import { Textarea } from '@langgenius/dify-ui/textarea'
|
||||
import { useState } from 'react'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import { useLocale } from '@/context/i18n'
|
||||
import { getDocLanguage } from '@/i18n-config/language'
|
||||
import PermissionPicker from './permission-picker'
|
||||
|
||||
export type PermissionSetModalMode = 'create' | 'edit' | 'view'
|
||||
@ -42,6 +44,8 @@ const PermissionSetModalBody = ({
|
||||
onSubmit,
|
||||
}: PermissionSetModalBodyProps) => {
|
||||
const { t } = useTranslation()
|
||||
const locale = useLocale()
|
||||
const docLanguage = getDocLanguage(locale)
|
||||
const [name, setName] = useState(initialValues?.name ?? '')
|
||||
const [description, setDescription] = useState(initialValues?.description ?? '')
|
||||
const [permissionKeys, setPermissionKeys] = useState<string[]>(initialValues?.permissionKeys ?? [])
|
||||
@ -122,7 +126,7 @@ const PermissionSetModalBody = ({
|
||||
|
||||
<div className="flex shrink-0 items-center justify-between gap-3 border-t border-divider-subtle px-6 py-4">
|
||||
<a
|
||||
href="https://docs.dify.ai/"
|
||||
href={`https://enterprise-docs.dify.ai/${docLanguage}/3.11.x/use/workspace/permission-reference`}
|
||||
target="_blank"
|
||||
rel="noreferrer"
|
||||
className="inline-flex items-center gap-1 system-xs-medium text-text-accent hover:underline"
|
||||
|
||||
@ -13,6 +13,8 @@ import { Input } from '@langgenius/dify-ui/input'
|
||||
import { Textarea } from '@langgenius/dify-ui/textarea'
|
||||
import { useCallback, useState } from 'react'
|
||||
import { useTranslation } from 'react-i18next'
|
||||
import { useLocale } from '@/context/i18n'
|
||||
import { getDocLanguage } from '@/i18n-config/language'
|
||||
import PermissionField from './permission-field'
|
||||
|
||||
export type RoleModalMode = 'create' | 'view' | 'edit'
|
||||
@ -39,6 +41,8 @@ const RoleModal = ({
|
||||
onSubmit,
|
||||
}: RoleModalProps) => {
|
||||
const { t } = useTranslation()
|
||||
const locale = useLocale()
|
||||
const docLanguage = getDocLanguage(locale)
|
||||
const [name, setName] = useState(role?.name ?? '')
|
||||
const [desc, setDesc] = useState(role?.description ?? '')
|
||||
const [permissionKeys, setPermissionKeys] = useState<string[]>(role?.permission_keys ?? [])
|
||||
@ -116,7 +120,7 @@ const RoleModal = ({
|
||||
</div>
|
||||
<div className="flex shrink-0 items-center justify-between gap-3 border-t border-divider-subtle px-6 py-4">
|
||||
<a
|
||||
href="https://docs.dify.ai/"
|
||||
href={`https://enterprise-docs.dify.ai/${docLanguage}/3.11.x/use/workspace/permission-reference`}
|
||||
target="_blank"
|
||||
rel="noreferrer"
|
||||
className="inline-flex items-center gap-1 system-xs-medium text-text-accent hover:underline"
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "إدارة المقتطفات",
|
||||
"tool.manage": "إدارة الأدوات",
|
||||
"workspace.member.manage": "إدارة الأعضاء",
|
||||
"workspace.role.manage": "إدارة أذونات الأدوار وقواعد الوصول إلى الموارد"
|
||||
"workspace.role.manage": "إدارة الأدوار ومجموعات الأذونات"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Snippets verwalten",
|
||||
"tool.manage": "Tools verwalten",
|
||||
"workspace.member.manage": "Mitglieder verwalten",
|
||||
"workspace.role.manage": "Rollenberechtigungen und Ressourcenzugriffsregeln verwalten"
|
||||
"workspace.role.manage": "Rollen und Berechtigungssätze verwalten"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Manage snippets",
|
||||
"tool.manage": "Manage tools",
|
||||
"workspace.member.manage": "Manage members",
|
||||
"workspace.role.manage": "Manage role permissions and resource access rules"
|
||||
"workspace.role.manage": "Manage roles and permission sets"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Gestionar fragmentos",
|
||||
"tool.manage": "Gestionar herramientas",
|
||||
"workspace.member.manage": "Gestionar miembros",
|
||||
"workspace.role.manage": "Gestionar permisos de roles y reglas de acceso a recursos"
|
||||
"workspace.role.manage": "Gestionar roles y conjuntos de permisos"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "مدیریت قطعهکدها",
|
||||
"tool.manage": "مدیریت ابزارها",
|
||||
"workspace.member.manage": "مدیریت اعضا",
|
||||
"workspace.role.manage": "مدیریت مجوزهای نقش و قوانین دسترسی به منابع"
|
||||
"workspace.role.manage": "مدیریت نقشها و مجموعههای مجوز"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Gérer les extraits",
|
||||
"tool.manage": "Gérer les outils",
|
||||
"workspace.member.manage": "Gérer les membres",
|
||||
"workspace.role.manage": "Gérer les autorisations de rôles et les règles d'accès aux ressources"
|
||||
"workspace.role.manage": "Gérer les rôles et les ensembles d'autorisations"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "स्निपेट प्रबंधित करें",
|
||||
"tool.manage": "टूल प्रबंधित करें",
|
||||
"workspace.member.manage": "सदस्य प्रबंधित करें",
|
||||
"workspace.role.manage": "भूमिका अनुमतियाँ और संसाधन एक्सेस नियम प्रबंधित करें"
|
||||
"workspace.role.manage": "भूमिकाएँ और अनुमति सेट प्रबंधित करें"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Kelola snippet",
|
||||
"tool.manage": "Kelola alat",
|
||||
"workspace.member.manage": "Kelola anggota",
|
||||
"workspace.role.manage": "Kelola izin peran dan aturan akses sumber daya"
|
||||
"workspace.role.manage": "Kelola peran dan set izin"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Gestisci snippet",
|
||||
"tool.manage": "Gestisci strumenti",
|
||||
"workspace.member.manage": "Gestisci i membri",
|
||||
"workspace.role.manage": "Gestisci i permessi dei ruoli e le regole di accesso alle risorse"
|
||||
"workspace.role.manage": "Gestisci ruoli e set di autorizzazioni"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "スニペットを管理",
|
||||
"tool.manage": "ツールを管理",
|
||||
"workspace.member.manage": "メンバーを管理",
|
||||
"workspace.role.manage": "ロール権限とリソースアクセスルールを管理"
|
||||
"workspace.role.manage": "ロールと権限セットを管理"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "스니펫 관리",
|
||||
"tool.manage": "도구 관리",
|
||||
"workspace.member.manage": "멤버 관리",
|
||||
"workspace.role.manage": "역할 권한 및 리소스 접근 규칙 관리"
|
||||
"workspace.role.manage": "역할 및 권한 세트 관리"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Snippets beheren",
|
||||
"tool.manage": "Tools beheren",
|
||||
"workspace.member.manage": "Leden beheren",
|
||||
"workspace.role.manage": "Rolrechten en regels voor resourcetoegang beheren"
|
||||
"workspace.role.manage": "Rollen en machtigingensets beheren"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Zarządzaj fragmentami kodu",
|
||||
"tool.manage": "Zarządzaj narzędziami",
|
||||
"workspace.member.manage": "Zarządzaj członkami",
|
||||
"workspace.role.manage": "Zarządzaj uprawnieniami ról i regułami dostępu do zasobów"
|
||||
"workspace.role.manage": "Zarządzaj rolami i zestawami uprawnień"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Gerenciar snippets",
|
||||
"tool.manage": "Gerenciar ferramentas",
|
||||
"workspace.member.manage": "Gerenciar membros",
|
||||
"workspace.role.manage": "Gerenciar permissões de função e regras de acesso a recursos"
|
||||
"workspace.role.manage": "Gerenciar funções e conjuntos de permissões"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Gestionează fragmente",
|
||||
"tool.manage": "Gestionează instrumente",
|
||||
"workspace.member.manage": "Gestionează membrii",
|
||||
"workspace.role.manage": "Gestionează permisiunile rolurilor și regulile de acces la resurse"
|
||||
"workspace.role.manage": "Gestionează rolurile și seturile de permisiuni"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Управление сниппетами",
|
||||
"tool.manage": "Управление инструментами",
|
||||
"workspace.member.manage": "Управление участниками",
|
||||
"workspace.role.manage": "Управление правами ролей и правилами доступа к ресурсам"
|
||||
"workspace.role.manage": "Управление ролями и наборами разрешений"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Upravljanje izsekov",
|
||||
"tool.manage": "Upravljanje orodij",
|
||||
"workspace.member.manage": "Upravljanje članov",
|
||||
"workspace.role.manage": "Upravljanje dovoljenj vlog in pravil za dostop do virov"
|
||||
"workspace.role.manage": "Upravljanje vlog in naborov dovoljenj"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "จัดการสนิปเปต",
|
||||
"tool.manage": "จัดการเครื่องมือ",
|
||||
"workspace.member.manage": "จัดการสมาชิก",
|
||||
"workspace.role.manage": "จัดการสิทธิ์บทบาทและกฎการเข้าถึงทรัพยากร"
|
||||
"workspace.role.manage": "จัดการบทบาทและชุดสิทธิ์"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Snippet'leri yönet",
|
||||
"tool.manage": "Araçları yönet",
|
||||
"workspace.member.manage": "Üyeleri yönet",
|
||||
"workspace.role.manage": "Rol izinlerini ve kaynak erişim kurallarını yönet"
|
||||
"workspace.role.manage": "Rolleri ve izin setlerini yönet"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Керування фрагментами",
|
||||
"tool.manage": "Керування інструментами",
|
||||
"workspace.member.manage": "Керування учасниками",
|
||||
"workspace.role.manage": "Керування дозволами ролей та правилами доступу до ресурсів"
|
||||
"workspace.role.manage": "Керування ролями та наборами дозволів"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "Quản lý đoạn mã",
|
||||
"tool.manage": "Quản lý công cụ",
|
||||
"workspace.member.manage": "Quản lý thành viên",
|
||||
"workspace.role.manage": "Quản lý quyền vai trò và quy tắc truy cập tài nguyên"
|
||||
"workspace.role.manage": "Quản lý vai trò và bộ quyền"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "管理 Snippets",
|
||||
"tool.manage": "管理工具",
|
||||
"workspace.member.manage": "管理成员",
|
||||
"workspace.role.manage": "管理角色权限与访问权限规则"
|
||||
"workspace.role.manage": "管理角色与权限集"
|
||||
}
|
||||
|
||||
@ -50,5 +50,5 @@
|
||||
"snippets.management": "管理 Snippets",
|
||||
"tool.manage": "管理工具",
|
||||
"workspace.member.manage": "管理成員",
|
||||
"workspace.role.manage": "管理角色權限與訪問權限規則"
|
||||
"workspace.role.manage": "管理角色與權限集"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user