Compare commits

..

14 Commits

Author SHA1 Message Date
bedaf0cbcd Merge branch 'main' into zhaohao1004/archive-db-resilience 2026-06-30 14:23:10 +08:00
46163c68bd fix: update documentation links in permission set and role modals (#38188) 2026-06-30 05:41:55 +00:00
54a8ff2c1d chore(i18n): update role management permission keys for multiple languages (#38186) 2026-06-30 03:28:34 +00:00
9521c7fe7d feat(mcp): support dynamic HTTP request headers in MCPClient (#37938) 2026-06-30 03:28:24 +00:00
995ba6b00e fix(api): skip uuidv7() creation when PostgreSQL 18 provides it natively (#36998)
Co-authored-by: Yunlu Wen <yunlu.wen@dify.ai>
2026-06-30 02:34:27 +00:00
49a92f096f perf: make command rbac-migrate-member-roles use less mem and make it… (#38151) 2026-06-29 12:42:10 +00:00
38602640f4 [autofix.ci] apply automated fixes 2026-06-29 12:30:07 +00:00
510408ebbc fix: harden workflow archive DB retries 2026-06-29 20:26:22 +08:00
fa1ac75922 fix: editor can view the logs (#38165) 2026-06-29 10:05:32 +00:00
cb35c6fa98 perf(api): retrieve published workflows via app.workflow_id (#38153) 2026-06-29 09:51:21 +00:00
34f62e7df6 fix: editor should not query billing subscriptions (#38157) 2026-06-29 09:45:54 +00:00
07b5dcbb19 fix(vdb): remove deprecated SQL options for ADB-PG 7.0 compatibility (#38004) 2026-06-29 09:30:01 +00:00
23917c7b3e fix: agent app log detail modal not display well (#38014) 2026-06-29 09:29:35 +00:00
8a6ce28855 fix: multimodal segment attachment indexing (#38080) 2026-06-29 09:18:29 +00:00
52 changed files with 1285 additions and 228 deletions

View File

@ -1,12 +1,52 @@
from __future__ import annotations
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor, as_completed
import click
from sqlalchemy import select
from configs import dify_config
from core.db.session_factory import session_factory
from models import TenantAccountJoin, TenantAccountRole
from services.enterprise.rbac_service import ListOption, RBACService
_LEGACY_ROLE_TO_BUILTIN_TAG = {
TenantAccountRole.OWNER.value: "owner",
TenantAccountRole.ADMIN.value: "admin",
TenantAccountRole.EDITOR.value: "editor",
TenantAccountRole.NORMAL.value: "normal",
TenantAccountRole.DATASET_OPERATOR.value: "dataset_operator",
}
def _resolve_builtin_role_ids(tenant_id: str, operator_account_id: str) -> dict[str, str]:
"""Resolve every legacy workspace role to the current tenant's builtin RBAC role id.
The migration replays the old `TenantAccountJoin.role` values onto the
RBAC member-role binding API. Builtin RBAC roles are tenant-scoped and
identified by runtime ids, so the command must look them up per tenant.
"""
roles = RBACService.Roles.list(
tenant_id=tenant_id,
account_id=operator_account_id,
options=ListOption(page_number=1, results_per_page=100),
).data
role_id_by_tag = {
role.role_tag: role.id
for role in roles
if role.is_builtin and role.category == "global_system_default" and role.role_tag
}
resolved: dict[str, str] = {}
for legacy_role, expected_builtin_tag in _LEGACY_ROLE_TO_BUILTIN_TAG.items():
role_id = role_id_by_tag.get(expected_builtin_tag)
if expected_builtin_tag == "dataset_operator" and not dify_config.DATASET_OPERATOR_ENABLED:
continue
if not role_id:
raise ValueError(f"Builtin RBAC role not found for tenant={tenant_id}, legacy_role={legacy_role}")
resolved[legacy_role] = role_id
return resolved
def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_role: str) -> str:
"""Resolve a legacy workspace role to the current tenant's builtin RBAC role id.
@ -15,26 +55,86 @@ def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_ro
RBAC member-role binding API. Builtin RBAC roles are tenant-scoped and
identified by runtime ids, so the command must look them up per tenant.
"""
expected_builtin_tag = {
TenantAccountRole.OWNER.value: "owner",
TenantAccountRole.ADMIN.value: "admin",
TenantAccountRole.EDITOR.value: "editor",
TenantAccountRole.NORMAL.value: "normal",
TenantAccountRole.DATASET_OPERATOR.value: "dataset_operator",
}.get(legacy_role)
if not expected_builtin_tag:
if legacy_role not in _LEGACY_ROLE_TO_BUILTIN_TAG:
raise ValueError(f"Unsupported legacy workspace role: {legacy_role}")
roles = RBACService.Roles.list(
return _resolve_builtin_role_ids(tenant_id, operator_account_id)[legacy_role]
def _iter_tenant_member_batches(
tenant_id: str | None,
*,
db_batch_size: int,
api_batch_size: int,
) -> Iterator[tuple[str, str, list[tuple[str, str]]]]:
"""Yield legacy member roles in tenant-scoped API-sized batches.
Rows are projected to primitive values and streamed from the database, so
the command never materializes every TenantAccountJoin ORM object. The
iterator only keeps one tenant's API-sized batches in memory while it
finds that tenant's owner account.
"""
with session_factory.create_session() as session:
stmt = (
select(TenantAccountJoin.tenant_id, TenantAccountJoin.account_id, TenantAccountJoin.role)
.order_by(TenantAccountJoin.tenant_id.asc(), TenantAccountJoin.id.asc())
.execution_options(yield_per=db_batch_size)
)
if tenant_id:
stmt = stmt.where(TenantAccountJoin.tenant_id == tenant_id)
current_tenant_id: str | None = None
owner_account_id: str | None = None
batches: list[list[tuple[str, str]]] = []
batch: list[tuple[str, str]] = []
def flush_current_tenant() -> Iterator[tuple[str, str, list[tuple[str, str]]]]:
if current_tenant_id is None:
return
if batch:
batches.append(batch.copy())
if not owner_account_id:
raise ValueError(f"Workspace owner not found for tenant={current_tenant_id}")
for item in batches:
yield current_tenant_id, owner_account_id, item
for row in session.execute(stmt):
workspace_id = str(row.tenant_id)
if current_tenant_id is not None and workspace_id != current_tenant_id:
yield from flush_current_tenant()
owner_account_id = None
batches = []
batch = []
current_tenant_id = workspace_id
account_id = str(row.account_id)
role = str(row.role)
if role == TenantAccountRole.OWNER.value:
owner_account_id = account_id
batch.append((account_id, role))
if len(batch) >= api_batch_size:
batches.append(batch)
batch = []
yield from flush_current_tenant()
def _member_already_has_role(current_roles_by_account_id: dict[str, set[str]], account_id: str, role_id: str) -> bool:
return current_roles_by_account_id.get(account_id) == {role_id}
def _replace_member_role(
tenant_id: str,
operator_account_id: str,
member_account_id: str,
role_id: str,
) -> str:
RBACService.MemberRoles.replace(
tenant_id=tenant_id,
account_id=operator_account_id,
options=ListOption(page_number=1, results_per_page=100),
).data
for role in roles:
if role.is_builtin and role.category == "global_system_default" and role.role_tag == expected_builtin_tag:
return str(role.id)
raise ValueError(f"Builtin RBAC role not found for tenant={tenant_id}, legacy_role={legacy_role}")
member_account_id=member_account_id,
role_ids=[role_id],
)
return member_account_id
@click.command(
@ -42,7 +142,16 @@ def _resolve_builtin_role_id(tenant_id: str, operator_account_id: str, legacy_ro
)
@click.option("--tenant-id", help="Only migrate a single workspace.")
@click.option("--dry-run", is_flag=True, default=False, help="Preview the migration without writing RBAC bindings.")
def migrate_member_roles_to_rbac(tenant_id: str | None, dry_run: bool) -> None:
@click.option("--db-batch-size", default=5000, show_default=True, help="Rows fetched per database batch.")
@click.option("--api-batch-size", default=200, show_default=True, help="Members checked per RBAC batch_get call.")
@click.option("--workers", default=1, show_default=True, help="Concurrent member role replace calls per tenant batch.")
def migrate_member_roles_to_rbac(
tenant_id: str | None,
dry_run: bool,
db_batch_size: int,
api_batch_size: int,
workers: int,
) -> None:
"""Backfill RBAC member-role bindings from legacy `TenantAccountJoin.role` data.
This is an offline migration command for workspaces that already have
@ -50,63 +159,102 @@ def migrate_member_roles_to_rbac(tenant_id: str | None, dry_run: bool) -> None:
member-role binding store.
"""
click.echo(click.style("Starting RBAC member-role migration.", fg="green"))
if workers < 1:
raise click.BadParameter("workers must be >= 1", param_hint="--workers")
with session_factory.create_session() as session:
stmt = select(TenantAccountJoin).order_by(TenantAccountJoin.tenant_id.asc(), TenantAccountJoin.id.asc())
if tenant_id:
stmt = stmt.where(TenantAccountJoin.tenant_id == tenant_id)
tenant_count = 0
scanned_count = 0
skipped_count = 0
migrated_count = 0
current_tenant_id: str | None = None
role_ids_by_legacy_role: dict[str, str] = {}
joins = list(session.scalars(stmt).all())
for workspace_id, owner_account_id, batch in _iter_tenant_member_batches(
tenant_id,
db_batch_size=db_batch_size,
api_batch_size=api_batch_size,
):
scanned_count += len(batch)
if workspace_id != current_tenant_id:
tenant_count += 1
current_tenant_id = workspace_id
role_ids_by_legacy_role = _resolve_builtin_role_ids(workspace_id, owner_account_id)
click.echo(f"tenant={workspace_id}")
if not joins:
current_roles_by_account_id: dict[str, set[str]] = {}
if not dry_run:
current_roles = RBACService.MemberRoles.batch_get(
tenant_id=workspace_id,
account_id=owner_account_id,
member_account_ids=[account_id for account_id, _ in batch],
)
current_roles_by_account_id = {
item.account_id: {str(role.id) for role in item.roles} for item in current_roles
}
replace_jobs: list[tuple[str, str]] = []
for member_account_id, legacy_role in batch:
resolved_role_id = role_ids_by_legacy_role.get(legacy_role)
if not resolved_role_id:
raise ValueError(f"Unsupported legacy workspace role: {legacy_role}")
if dry_run:
click.echo(
f"tenant={workspace_id} member={member_account_id} "
f"legacy_role={legacy_role} -> rbac_role_id={resolved_role_id}"
)
continue
if _member_already_has_role(current_roles_by_account_id, member_account_id, resolved_role_id):
skipped_count += 1
continue
replace_jobs.append((member_account_id, resolved_role_id))
if replace_jobs:
if workers == 1:
for member_account_id, resolved_role_id in replace_jobs:
_replace_member_role(workspace_id, owner_account_id, member_account_id, resolved_role_id)
migrated_count += 1
else:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [
executor.submit(
_replace_member_role,
workspace_id,
owner_account_id,
member_account_id,
resolved_role_id,
)
for member_account_id, resolved_role_id in replace_jobs
]
for future in as_completed(futures):
future.result()
migrated_count += 1
if scanned_count % 10000 == 0:
click.echo(
f"progress scanned={scanned_count} migrated={migrated_count} skipped={skipped_count}",
err=True,
)
if scanned_count == 0:
click.echo(click.style("No workspace members found for migration.", fg="yellow"))
return
owner_account_by_tenant: dict[str, str] = {}
resolved_role_ids: dict[tuple[str, str], str] = {}
migrated_count = 0
for join in joins:
workspace_id = str(join.tenant_id)
member_account_id = str(join.account_id)
legacy_role = str(join.role)
if workspace_id not in owner_account_by_tenant:
owner_join = next(
(
item
for item in joins
if str(item.tenant_id) == workspace_id and str(item.role) == TenantAccountRole.OWNER.value
),
None,
)
if not owner_join:
raise ValueError(f"Workspace owner not found for tenant={workspace_id}")
owner_account_by_tenant[workspace_id] = str(owner_join.account_id)
operator_account_id = owner_account_by_tenant[workspace_id]
cache_key = (workspace_id, legacy_role)
if cache_key not in resolved_role_ids:
resolved_role_ids[cache_key] = _resolve_builtin_role_id(workspace_id, operator_account_id, legacy_role)
resolved_role_id = resolved_role_ids[cache_key]
click.echo(
f"tenant={workspace_id} member={member_account_id} "
f"legacy_role={legacy_role} -> rbac_role_id={resolved_role_id}"
)
if dry_run:
continue
RBACService.MemberRoles.replace(
tenant_id=workspace_id,
account_id=operator_account_id,
member_account_id=member_account_id,
role_ids=[resolved_role_id],
)
migrated_count += 1
if dry_run:
click.echo(click.style("Dry run completed. No RBAC bindings were written.", fg="yellow"))
click.echo(
click.style(
f"Dry run completed. Scanned {scanned_count} members across {tenant_count} tenants. "
"No RBAC bindings were written.",
fg="yellow",
)
)
else:
click.echo(click.style(f"RBAC member-role migration completed. Migrated {migrated_count} members.", fg="green"))
click.echo(
click.style(
f"RBAC member-role migration completed. Scanned {scanned_count} members across {tenant_count} tenants, "
f"migrated {migrated_count}, skipped {skipped_count} already up-to-date.",
fg="green",
)
)

View File

@ -1,10 +1,12 @@
import datetime
import logging
import time
from collections.abc import Callable
from typing import TypedDict
import click
import sqlalchemy as sa
from sqlalchemy.orm import Session, sessionmaker
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
@ -12,6 +14,7 @@ from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpi
from services.retention.conversation.messages_clean_policy import create_message_clean_policy
from services.retention.conversation.messages_clean_service import MessagesCleanService
from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup
from services.retention.workflow_run.db_retry import run_with_db_retry
from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
@ -35,6 +38,12 @@ class WorkflowRunArchiveTenantPlan(TypedDict):
unpaid_tenant_ids: list[str]
class WorkflowRunArchivePrefixStats(TypedDict):
tenant_ids: list[str]
workflow_runs: int
workflow_node_executions: int
def _normalize_utc_datetime(value: datetime.datetime) -> datetime.datetime:
if value.tzinfo is None:
return value.replace(tzinfo=datetime.UTC)
@ -57,6 +66,7 @@ def _parse_tenant_prefixes(prefixes: str | None) -> list[str]:
def _get_archive_candidate_tenant_ids_by_prefix(
session: Session,
prefix: str,
*,
start_from: datetime.datetime | None,
@ -75,7 +85,7 @@ def _get_archive_candidate_tenant_ids_by_prefix(
if start_from is not None:
conditions.append(WorkflowRun.created_at >= start_from)
tenant_ids = db.session.scalars(
tenant_ids = session.scalars(
sa.select(WorkflowRun.tenant_id).where(*conditions).distinct().order_by(WorkflowRun.tenant_id)
).all()
return list(tenant_ids)
@ -102,8 +112,80 @@ def _filter_paid_workflow_archive_tenant_ids(tenant_ids: list[str]) -> tuple[lis
return paid_tenant_ids, unpaid_tenant_ids
def _run_archive_command_db_retry[T](operation_name: str, operation: Callable[[], T]) -> T:
return run_with_db_retry(operation_name, operation, logger=logger)
def _get_archive_candidate_tenant_ids_with_retry(
session_maker: sessionmaker[Session],
prefix: str,
*,
start_from: datetime.datetime | None,
end_before: datetime.datetime,
) -> list[str]:
def fetch_tenant_ids() -> list[str]:
with session_maker() as session:
return _get_archive_candidate_tenant_ids_by_prefix(
session,
prefix,
start_from=start_from,
end_before=end_before,
)
return _run_archive_command_db_retry(f"workflow archive tenant resolve for prefix {prefix}", fetch_tenant_ids)
def _get_archive_plan_prefix_stats(
session_maker: sessionmaker[Session],
prefix: str,
*,
start_from: datetime.datetime | None,
end_before: datetime.datetime,
) -> WorkflowRunArchivePrefixStats:
from graphon.enums import WorkflowExecutionStatus
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver
def fetch_prefix_stats() -> WorkflowRunArchivePrefixStats:
with session_maker() as session:
tenant_ids = _get_archive_candidate_tenant_ids_by_prefix(
session,
prefix,
start_from=start_from,
end_before=end_before,
)
run_conditions = [
WorkflowRun.created_at < end_before,
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
]
if start_from is not None:
run_conditions.append(WorkflowRun.created_at >= start_from)
workflow_runs = (
session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0
)
candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery()
workflow_node_executions = (
session.scalar(
sa.select(sa.func.count())
.select_from(WorkflowNodeExecutionModel)
.join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id)
)
or 0
)
return WorkflowRunArchivePrefixStats(
tenant_ids=tenant_ids,
workflow_runs=workflow_runs,
workflow_node_executions=workflow_node_executions,
)
return _run_archive_command_db_retry(f"workflow archive plan for prefix {prefix}", fetch_prefix_stats)
def _resolve_archive_tenant_ids_from_plan(
*,
session_maker: sessionmaker[Session],
tenant_ids: str | None,
tenant_prefixes: list[str],
start_from: datetime.datetime | None,
@ -122,7 +204,8 @@ def _resolve_archive_tenant_ids_from_plan(
requested_tenant_ids = []
for prefix in tenant_prefixes:
requested_tenant_ids.extend(
_get_archive_candidate_tenant_ids_by_prefix(
_get_archive_candidate_tenant_ids_with_retry(
session_maker,
prefix,
start_from=start_from,
end_before=end_before,
@ -143,6 +226,24 @@ def _resolve_archive_tenant_ids_from_plan(
)
def _safe_remove_scoped_session(context: str) -> None:
try:
db.session.remove()
except Exception:
logger.warning("Ignoring DB scoped-session cleanup error after %s", context, exc_info=True)
registry = getattr(db.session, "registry", None)
clear_registry = getattr(registry, "clear", None)
if callable(clear_registry):
try:
clear_registry()
except Exception:
logger.warning("Ignoring DB scoped-session registry cleanup error after %s", context, exc_info=True)
try:
db.engine.dispose()
except Exception:
logger.warning("Ignoring DB engine dispose error after %s", context, exc_info=True)
def _resolve_archive_time_range(
*,
before_days: int,
@ -349,10 +450,6 @@ def archive_workflow_runs_plan(
supported workflow types, and the requested created_at window. V2 bundle archive
does not maintain per-run archive logs, so this plan reports source-table volume.
"""
from graphon.enums import WorkflowExecutionStatus
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver
before_days, start_from, end_before = _resolve_archive_time_range(
before_days=before_days,
from_days_ago=from_days_ago,
@ -364,37 +461,25 @@ def archive_workflow_runs_plan(
if include_archived:
click.echo(click.style("--include-archived is a no-op for V2 bundle archive plans.", fg="yellow"))
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
rows: list[WorkflowRunArchivePlanRow] = []
for prefix in _HEX_PREFIXES:
tenant_ids = _get_archive_candidate_tenant_ids_by_prefix(
prefix,
start_from=start_from,
end_before=plan_end_before,
)
try:
prefix_stats = _get_archive_plan_prefix_stats(
session_maker,
prefix,
start_from=start_from,
end_before=plan_end_before,
)
except Exception as exc:
logger.exception("Failed to build workflow archive plan for prefix %s", prefix)
raise click.ClickException(f"Failed to build workflow archive plan for prefix {prefix}.") from exc
tenant_ids = prefix_stats["tenant_ids"]
workflow_runs = prefix_stats["workflow_runs"]
workflow_node_executions = prefix_stats["workflow_node_executions"]
total_tenants = len(tenant_ids)
paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(tenant_ids)
run_conditions = [
WorkflowRun.created_at < plan_end_before,
WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()),
WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE),
tenant_prefix_condition(WorkflowRun.tenant_id, prefix),
]
if start_from is not None:
run_conditions.append(WorkflowRun.created_at >= start_from)
workflow_runs = (
db.session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0
)
candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery()
workflow_node_executions = (
db.session.scalar(
sa.select(sa.func.count())
.select_from(WorkflowNodeExecutionModel)
.join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id)
)
or 0
)
rows.append(
WorkflowRunArchivePlanRow(
tenant_prefix=prefix,
@ -574,17 +659,18 @@ def archive_workflow_runs(
)
)
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
try:
tenant_plan = _resolve_archive_tenant_ids_from_plan(
session_maker=session_maker,
tenant_ids=tenant_ids,
tenant_prefixes=parsed_tenant_prefixes,
start_from=start_from,
end_before=plan_end_before,
)
except Exception:
except Exception as exc:
logger.exception("Failed to resolve workflow archive tenant plan")
click.echo(click.style("Failed to resolve workflow archive tenant plan.", fg="red"))
return
raise click.ClickException("Failed to resolve workflow archive tenant plan.") from exc
planned_tenant_ids = tenant_plan["archive_tenant_ids"]
planned_paid_tenant_ids = tenant_plan["paid_tenant_ids"] if planned_tenant_ids is not None else None
@ -616,7 +702,10 @@ def archive_workflow_runs(
dry_run=dry_run,
delete_after_archive=delete_after_archive,
)
summary = archiver.run()
try:
summary = archiver.run()
finally:
_safe_remove_scoped_session("archive workflow run command")
click.echo(
click.style(
f"Summary: processed={summary.total_runs_processed}, archived={summary.runs_archived}, "

View File

@ -1,10 +1,13 @@
import logging
import re
from collections.abc import Callable
from contextlib import AbstractContextManager, ExitStack
from types import TracebackType
from typing import Any
from urllib.parse import urlparse
from flask import has_request_context, request
from core.mcp.client.sse_client import sse_client
from core.mcp.client.streamable_client import streamablehttp_client
from core.mcp.error import MCPConnectionError
@ -23,10 +26,22 @@ class MCPClient:
sse_read_timeout: float | None = None,
):
self.server_url = server_url
self.headers = headers or {}
self.headers = headers.copy() if headers else {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
# Substitute placeholders with incoming request headers if in a request context
if has_request_context() and self.headers:
pattern = re.compile(r"\{\{\s*request\.headers?\.(.+?)\s*\}\}", re.IGNORECASE)
for key, value in list(self.headers.items()):
if isinstance(value, str):
def replace_func(match):
header_name = match.group(1)
return request.headers.get(header_name, "")
self.headers[key] = pattern.sub(replace_func, value)
# Initialize session and client objects
self._session: ClientSession | None = None
self._exit_stack = ExitStack()

View File

@ -45,34 +45,52 @@ def upgrade():
# PostgreSQL 18's `uuidv7` function. This capability is rarely needed in practice, as IDs can be
# generated and controlled within the application layer.
conn = op.get_bind()
if _is_pg(conn):
# PostgreSQL: Create uuidv7 functions
op.execute(sa.text(r"""
/* Main function to generate a uuidv7 value with millisecond precision */
CREATE FUNCTION uuidv7() RETURNS uuid
AS
$$
-- Replace the first 48 bits of a uuidv4 with the current
-- number of milliseconds since 1970-01-01 UTC
-- and set the "ver" field to 7 by setting additional bits
SELECT encode(
set_bit(
set_bit(
overlay(uuid_send(gen_random_uuid()) placing
substring(int8send((extract(epoch from clock_timestamp()) * 1000)::bigint) from
3)
from 1 for 6),
52, 1),
53, 1), 'hex')::uuid;
$$ LANGUAGE SQL VOLATILE PARALLEL SAFE;
COMMENT ON FUNCTION uuidv7 IS
'Generate a uuid-v7 value with a 48-bit timestamp (millisecond precision) and 74 bits of randomness';
if _is_pg(conn):
# PostgreSQL: Create uuidv7 functions.
# PostgreSQL 18 ships a native pg_catalog.uuidv7(), so only create our own
# implementation when the server does not already provide one. Otherwise the
# CREATE FUNCTION below and the unqualified COMMENT statement collide with the
# built-in and the migration fails.
#
# The existence check is done server-side via a DO block rather than
# conn.execute().scalar() because the latter returns None in offline
# migration mode (no real database connection), causing an AttributeError.
op.execute(sa.text(r"""
DO $do$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE p.proname = 'uuidv7' AND n.nspname = 'pg_catalog'
) THEN
/* Main function to generate a uuidv7 value with millisecond precision */
CREATE FUNCTION public.uuidv7() RETURNS uuid
AS
$func$
-- Replace the first 48 bits of a uuidv4 with the current
-- number of milliseconds since 1970-01-01 UTC
-- and set the "ver" field to 7 by setting additional bits
SELECT encode(
set_bit(
set_bit(
overlay(uuid_send(gen_random_uuid()) placing
substring(int8send((extract(epoch from clock_timestamp()) * 1000)::bigint) from
3)
from 1 for 6),
52, 1),
53, 1), 'hex')::uuid;
$func$ LANGUAGE SQL VOLATILE PARALLEL SAFE;
COMMENT ON FUNCTION public.uuidv7 IS
'Generate a uuid-v7 value with a 48-bit timestamp (millisecond precision) and 74 bits of randomness';
END IF;
END
$do$;
"""))
op.execute(sa.text(r"""
CREATE FUNCTION uuidv7_boundary(timestamptz) RETURNS uuid
CREATE FUNCTION public.uuidv7_boundary(timestamptz) RETURNS uuid
AS
$$
/* uuid fields: version=0b0111, variant=0b10 */
@ -83,7 +101,7 @@ SELECT encode(
'hex')::uuid;
$$ LANGUAGE SQL STABLE STRICT PARALLEL SAFE;
COMMENT ON FUNCTION uuidv7_boundary(timestamptz) IS
COMMENT ON FUNCTION public.uuidv7_boundary(timestamptz) IS
'Generate a non-random uuidv7 with the given timestamp (first 48 bits) and all random bits to 0. As the smallest possible uuidv7 for that timestamp, it may be used as a boundary for partitions.';
"""
))
@ -95,7 +113,10 @@ def downgrade():
conn = op.get_bind()
if _is_pg(conn):
op.execute(sa.text("DROP FUNCTION uuidv7"))
op.execute(sa.text("DROP FUNCTION uuidv7_boundary"))
# IF EXISTS keeps the downgrade a no-op on PostgreSQL 18, where the native
# pg_catalog.uuidv7() was kept and no public.uuidv7() was created. Scoping the
# drop to the public schema avoids touching the built-in.
op.execute(sa.text("DROP FUNCTION IF EXISTS public.uuidv7()"))
op.execute(sa.text("DROP FUNCTION IF EXISTS public.uuidv7_boundary(timestamptz)"))
else:
pass

View File

@ -144,7 +144,7 @@ class AnalyticdbVectorBySql:
f"id text PRIMARY KEY,"
f"vector real[], ref_doc_id text, page_content text, metadata_ jsonb, "
f"to_tsvector TSVECTOR"
f") WITH (fillfactor=70) DISTRIBUTED BY (id);"
f") DISTRIBUTED BY (id);"
)
if embedding_dimension is not None:
index_name = f"{self._collection_name}_embedding_idx"
@ -153,7 +153,7 @@ class AnalyticdbVectorBySql:
cur.execute(
f"CREATE INDEX {index_name} ON {self.table_name} USING ann(vector) "
f"WITH(dim='{embedding_dimension}', distancemeasure='{self.config.metrics}', "
f"pq_enable=0, external_storage=0)"
f"pq_enable=0)"
)
cur.execute(f"CREATE INDEX ON {self.table_name} USING gin(to_tsvector)")
except Exception as e:

View File

@ -435,6 +435,7 @@ _LEGACY_APP_EDITOR_KEYS: list[str] = [
"app.acl.delete",
"app.acl.release_and_version",
"app.acl.monitor",
"app.acl.log_and_annotation",
"app.acl.access_config",
]

View File

@ -29,12 +29,12 @@ import hashlib
import json
import logging
import time
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from enum import Enum
from threading import Lock
from typing import Any, NotRequired, TypedDict, cast
from typing import Any, NotRequired, TypedDict, TypeVar, cast
import click
import pyarrow as pa
@ -70,8 +70,10 @@ from services.retention.workflow_run.constants import (
ARCHIVE_BUNDLE_MANIFEST_NAME,
ARCHIVE_BUNDLE_SCHEMA_VERSION,
)
from services.retention.workflow_run.db_retry import is_retryable_db_disconnect, run_with_db_retry
logger = logging.getLogger(__name__)
T = TypeVar("T")
class TableStatsManifestEntry(TypedDict):
@ -208,6 +210,8 @@ class WorkflowRunArchiver:
"workflow_pause_reasons",
"workflow_trigger_logs",
]
DB_RETRY_ATTEMPTS = 3
DB_RETRY_DELAYS_SECONDS = (1.0, 2.0)
start_from: datetime.datetime | None
end_before: datetime.datetime
@ -455,16 +459,40 @@ class WorkflowRunArchiver:
"""Fetch a batch of workflow runs to archive."""
repo = self._get_workflow_run_repo()
tenant_ids = list(tenant_scope) if tenant_scope is not None else self.tenant_ids or None
return repo.get_runs_batch_by_time_range(
start_from=self.start_from,
end_before=self.end_before,
last_seen=last_seen,
batch_size=self.batch_size,
run_types=self.ARCHIVED_TYPE,
tenant_ids=tenant_ids,
tenant_prefixes=None if tenant_ids else self.tenant_prefixes or None,
run_shard_index=self.run_shard_index,
run_shard_total=self.run_shard_total,
return self._run_with_db_retry(
"workflow run batch fetch",
lambda: repo.get_runs_batch_by_time_range(
start_from=self.start_from,
end_before=self.end_before,
last_seen=last_seen,
batch_size=self.batch_size,
run_types=self.ARCHIVED_TYPE,
tenant_ids=tenant_ids,
tenant_prefixes=None if tenant_ids else self.tenant_prefixes or None,
run_shard_index=self.run_shard_index,
run_shard_total=self.run_shard_total,
),
)
@staticmethod
def _is_retryable_db_disconnect(exc: BaseException) -> bool:
return is_retryable_db_disconnect(exc)
@staticmethod
def _safe_rollback(session: Session, bundle_id: str) -> None:
try:
session.rollback()
except Exception:
logger.warning("Failed to rollback archive session for bundle %s", bundle_id, exc_info=True)
def _run_with_db_retry(self, operation_name: str, operation: Callable[[], T]) -> T:
return run_with_db_retry(
operation_name,
operation,
logger=logger,
attempts=self.DB_RETRY_ATTEMPTS,
delays_seconds=self.DB_RETRY_DELAYS_SECONDS,
)
def _tenant_scan_scopes(self) -> list[list[str] | None]:
@ -532,16 +560,14 @@ class WorkflowRunArchiver:
if self.workers == 1 or len(bundle_groups) == 1:
results: list[ArchiveResult] = []
for bundle_runs in bundle_groups:
with session_maker() as session:
results.append(self._archive_bundle(session, storage, bundle_runs))
results.append(self._archive_bundle_with_retry(session_maker, storage, bundle_runs))
return results
results = []
max_workers = min(self.workers, len(bundle_groups))
def archive_in_worker(bundle_runs: Sequence[WorkflowRun]) -> ArchiveResult:
with session_maker() as session:
return self._archive_bundle(session, storage, bundle_runs)
return self._archive_bundle_with_retry(session_maker, storage, bundle_runs)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(archive_in_worker, bundle_runs) for bundle_runs in bundle_groups]
@ -549,6 +575,39 @@ class WorkflowRunArchiver:
results.append(future.result())
return results
def _archive_bundle_with_retry(
self,
session_maker: sessionmaker[Session],
storage: ArchiveStorage | None,
runs: Sequence[WorkflowRun],
) -> ArchiveResult:
identity = self._build_bundle_identity(runs)
try:
return self._run_with_db_retry(
f"archive workflow run bundle {identity.bundle_id}",
lambda: self._archive_bundle_once(session_maker, storage, runs),
)
except Exception as exc:
logger.exception("Failed to archive workflow run bundle %s after retries", identity.bundle_id)
return ArchiveResult(
bundle_id=identity.bundle_id,
tenant_id=identity.tenant_id,
object_prefix=identity.object_prefix,
run_count=len(runs),
success=False,
error=str(exc),
)
def _archive_bundle_once(
self,
session_maker: sessionmaker[Session],
storage: ArchiveStorage | None,
runs: Sequence[WorkflowRun],
) -> ArchiveResult:
with session_maker() as session:
return self._archive_bundle(session, storage, runs)
def _archive_bundle(
self,
session: Session,
@ -645,9 +704,12 @@ class WorkflowRunArchiver:
result.success = True
except Exception as e:
if self._is_retryable_db_disconnect(e):
self._safe_rollback(session, identity.bundle_id)
raise
logger.exception("Failed to archive workflow run bundle %s", identity.bundle_id)
result.error = str(e)
session.rollback()
self._safe_rollback(session, identity.bundle_id)
result.elapsed_time = time.time() - start_time
return result
@ -794,6 +856,9 @@ class WorkflowRunArchiver:
end_before = self.end_before
if end_before is None:
raise ValueError("archive window end must be set")
formatted_end_before = self._format_window_datetime(end_before)
if formatted_end_before is None:
raise ValueError("archive window end must be set")
return ArchiveManifestDict(
schema_version=ARCHIVE_BUNDLE_SCHEMA_VERSION,
archive_format=ARCHIVE_BUNDLE_FORMAT,
@ -813,7 +878,7 @@ class WorkflowRunArchiver:
archived_at=datetime.datetime.now(datetime.UTC).isoformat(),
campaign_id=self.campaign_id,
archive_window_start=self._format_window_datetime(self.start_from),
archive_window_end=end_before.isoformat(),
archive_window_end=formatted_end_before,
run_shard=identity.shard,
tables=tables,
run_ids=[run.id for run in sorted_runs],

View File

@ -0,0 +1,66 @@
import logging
import time
from collections.abc import Callable
from sqlalchemy.exc import DBAPIError
from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
DEFAULT_DB_RETRY_ATTEMPTS = 3
DEFAULT_DB_RETRY_DELAYS_SECONDS = (1.0, 2.0)
_DB_DISCONNECT_PATTERNS = (
"server closed the connection unexpectedly",
"connection already closed",
"closed the connection",
"connection not open",
"terminating connection",
"connection reset",
"broken pipe",
"connection invalidated",
)
def is_retryable_db_disconnect(exc: BaseException) -> bool:
if isinstance(exc, DBAPIError) and exc.connection_invalidated:
return True
if not _is_db_operational_error(exc):
return False
original_exception = getattr(exc, "orig", None)
message = f"{exc} {original_exception or ''}".lower()
return any(pattern in message for pattern in _DB_DISCONNECT_PATTERNS)
def run_with_db_retry[T](
operation_name: str,
operation: Callable[[], T],
*,
logger: logging.Logger,
attempts: int = DEFAULT_DB_RETRY_ATTEMPTS,
delays_seconds: tuple[float, ...] = DEFAULT_DB_RETRY_DELAYS_SECONDS,
) -> T:
for attempt in range(1, attempts + 1):
try:
return operation()
except Exception as exc:
if not is_retryable_db_disconnect(exc) or attempt == attempts:
raise
delay = delays_seconds[min(attempt - 1, len(delays_seconds) - 1)]
logger.warning(
"Retrying %s after retryable DB disconnect (attempt %s/%s, sleep %.1fs)",
operation_name,
attempt,
attempts,
delay,
exc_info=True,
)
time.sleep(delay)
raise RuntimeError(f"{operation_name} did not complete")
def _is_db_operational_error(exc: BaseException) -> bool:
if isinstance(exc, SQLAlchemyOperationalError):
return True
return exc.__class__.__name__ == "OperationalError" and exc.__class__.__module__.startswith(("psycopg", "psycopg2"))

View File

@ -40,6 +40,7 @@ from services.errors.app import QuotaExceededError
from services.quota_service import QuotaService
from services.trigger.app_trigger_service import AppTriggerService
from services.workflow.entities import WebhookTriggerData
from services.workflow_service import WorkflowService
try:
import magic
@ -114,6 +115,7 @@ class WebhookService:
workflow = session.scalar(
select(Workflow)
.where(
Workflow.tenant_id == webhook_trigger.tenant_id,
Workflow.app_id == webhook_trigger.app_id,
Workflow.version == Workflow.VERSION_DRAFT,
)
@ -125,6 +127,7 @@ class WebhookService:
app_trigger = session.scalar(
select(AppTrigger)
.where(
AppTrigger.tenant_id == webhook_trigger.tenant_id,
AppTrigger.app_id == webhook_trigger.app_id,
AppTrigger.node_id == webhook_trigger.node_id,
AppTrigger.trigger_type == AppTriggerType.TRIGGER_WEBHOOK,
@ -145,16 +148,18 @@ class WebhookService:
if app_trigger.status != AppTriggerStatus.ENABLED:
raise ValueError(f"Webhook trigger is disabled for webhook {webhook_id}")
# Get workflow
workflow = session.scalar(
select(Workflow)
app = session.scalar(
select(App)
.where(
Workflow.app_id == webhook_trigger.app_id,
Workflow.version != Workflow.VERSION_DRAFT,
App.tenant_id == webhook_trigger.tenant_id,
App.id == webhook_trigger.app_id,
)
.order_by(Workflow.created_at.desc())
.limit(1)
)
if not app:
raise ValueError(f"App not found for webhook {webhook_id}")
workflow = WorkflowService().get_published_workflow(app, session=session)
if not workflow:
raise ValueError(f"Workflow not found for app {webhook_trigger.app_id}")

View File

@ -333,7 +333,7 @@ class VectorService:
# Add documents to vector store if any
if documents and dataset.is_multimodal:
vector.add_texts(documents, duplicate_check=True)
vector.create_multimodal(documents)
# Single commit for all operations
db.session.commit()

View File

@ -12,7 +12,7 @@ from datetime import UTC, datetime
from typing import Any
from celery import shared_task
from sqlalchemy import func, select
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.db.session_factory import session_factory
@ -35,7 +35,7 @@ from models.enums import (
WorkflowRunTriggeredFrom,
WorkflowTriggerStatus,
)
from models.model import EndUser
from models.model import App, EndUser
from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription, WorkflowPluginTrigger, WorkflowTriggerLog
from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun
@ -99,23 +99,25 @@ def dispatch_trigger_debug_event(
return 0
def _get_latest_workflows_by_app_ids(
def _get_published_workflows_by_app_ids(
session: Session, subscribers: Sequence[WorkflowPluginTrigger]
) -> Mapping[str, Workflow]:
"""Get the latest workflows by app_ids"""
workflow_query = (
select(Workflow.app_id, func.max(Workflow.created_at).label("max_created_at"))
.where(
Workflow.app_id.in_({t.app_id for t in subscribers}),
Workflow.version != Workflow.VERSION_DRAFT,
)
.group_by(Workflow.app_id)
.subquery()
)
"""Get current published workflows through apps.workflow_id."""
app_ids = {trigger.app_id for trigger in subscribers}
tenant_ids = {trigger.tenant_id for trigger in subscribers}
if not app_ids or not tenant_ids:
return {}
workflows = session.scalars(
select(Workflow).join(
workflow_query,
(Workflow.app_id == workflow_query.c.app_id) & (Workflow.created_at == workflow_query.c.max_created_at),
select(Workflow)
.join(App, App.workflow_id == Workflow.id)
.where(
App.id.in_(app_ids),
App.tenant_id.in_(tenant_ids),
App.workflow_id.isnot(None),
Workflow.app_id == App.id,
Workflow.tenant_id == App.tenant_id,
Workflow.version != Workflow.VERSION_DRAFT,
)
).all()
return {w.app_id: w for w in workflows}
@ -262,7 +264,7 @@ def dispatch_triggered_workflow(
# Ensure expire_on_commit is set to False to remain workflows available
with session_factory.create_session() as session:
workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
workflows: Mapping[str, Workflow] = _get_published_workflows_by_app_ids(session, subscribers)
end_users: Mapping[str, EndUser] = EndUserService.create_end_user_batch(
type=EndUserType.TRIGGER,

View File

@ -6,8 +6,10 @@ from unittest.mock import ANY, MagicMock, patch
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from sqlalchemy.exc import OperationalError
from services.retention.workflow_run.archive_paid_plan_workflow_run import (
ArchiveResult,
ArchiveSummary,
WorkflowRunArchiver,
)
@ -32,6 +34,30 @@ class FakeArchiveStorage:
return sorted(key for key in self.objects if key.startswith(prefix))
def _db_disconnect_error() -> OperationalError:
return OperationalError(
"select 1",
{},
RuntimeError("server closed the connection unexpectedly"),
connection_invalidated=True,
)
def _run(run_id: str = "run-1"):
run = MagicMock()
run.id = run_id
run.tenant_id = "tenant-1"
run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0)
return run
def _session_context(session):
context = MagicMock()
context.__enter__.return_value = session
context.__exit__.return_value = False
return context
class TestWorkflowRunArchiverInit:
def test_start_from_without_end_before_raises(self):
with pytest.raises(ValueError, match="start_from and end_before must be provided together"):
@ -139,6 +165,32 @@ class TestWorkflowRunArchiverInit:
repo.get_runs_batch_by_time_range.assert_called_once()
assert repo.get_runs_batch_by_time_range.call_args.kwargs["tenant_ids"] == ["tenant-b"]
def test_get_runs_batch_retries_retryable_db_disconnect(self):
repo = MagicMock()
repo.get_runs_batch_by_time_range.side_effect = [_db_disconnect_error(), []]
archiver = WorkflowRunArchiver(workflow_run_repo=repo)
with patch("services.retention.workflow_run.db_retry.time.sleep") as sleep:
runs = archiver._get_runs_batch(None)
assert runs == []
assert repo.get_runs_batch_by_time_range.call_count == 2
sleep.assert_called_once_with(1.0)
def test_get_runs_batch_does_not_retry_non_db_broken_pipe_error(self):
repo = MagicMock()
repo.get_runs_batch_by_time_range.side_effect = RuntimeError("broken pipe")
archiver = WorkflowRunArchiver(workflow_run_repo=repo)
with (
patch("services.retention.workflow_run.db_retry.time.sleep") as sleep,
pytest.raises(RuntimeError, match="broken pipe"),
):
archiver._get_runs_batch(None)
repo.get_runs_batch_by_time_range.assert_called_once()
sleep.assert_not_called()
def test_start_message_includes_shard(self):
archiver = WorkflowRunArchiver(tenant_prefixes=["0"], run_shard_index=1, run_shard_total=4)
@ -351,6 +403,72 @@ class TestDryRunArchive:
assert summary.table_stats["workflow_app_logs"].size_bytes == 32
class TestArchiveDbRetry:
def test_archive_bundle_groups_retries_with_fresh_session(self):
archiver = WorkflowRunArchiver(days=90)
run = _run()
session_maker = MagicMock(
side_effect=[
_session_context(MagicMock(name="session-1")),
_session_context(MagicMock(name="session-2")),
]
)
success = ArchiveResult(
bundle_id=archiver._build_bundle_identity([run]).bundle_id,
tenant_id=run.tenant_id,
object_prefix=archiver._build_bundle_identity([run]).object_prefix,
run_count=1,
success=True,
)
with (
patch.object(archiver, "_archive_bundle", side_effect=[_db_disconnect_error(), success]) as archive_bundle,
patch("services.retention.workflow_run.db_retry.time.sleep") as sleep,
):
results = archiver._archive_bundle_groups(session_maker, MagicMock(), [[run]])
assert results == [success]
assert archive_bundle.call_count == 2
assert session_maker.call_count == 2
sleep.assert_called_once_with(1.0)
def test_archive_bundle_groups_returns_failed_result_after_retry_exhaustion(self):
archiver = WorkflowRunArchiver(days=90)
run = _run()
session_maker = MagicMock(
side_effect=[
_session_context(MagicMock(name="session-1")),
_session_context(MagicMock(name="session-2")),
_session_context(MagicMock(name="session-3")),
]
)
with (
patch.object(archiver, "_archive_bundle", side_effect=[_db_disconnect_error()] * 3) as archive_bundle,
patch("services.retention.workflow_run.db_retry.time.sleep") as sleep,
):
results = archiver._archive_bundle_groups(session_maker, MagicMock(), [[run]])
assert len(results) == 1
assert results[0].success is False
assert "server closed the connection unexpectedly" in (results[0].error or "")
assert archive_bundle.call_count == archiver.DB_RETRY_ATTEMPTS
assert session_maker.call_count == archiver.DB_RETRY_ATTEMPTS
assert sleep.call_count == archiver.DB_RETRY_ATTEMPTS - 1
def test_archive_bundle_uses_safe_rollback_when_failure_rolls_back_badly(self):
archiver = WorkflowRunArchiver(days=90, dry_run=True)
session = MagicMock()
session.rollback.side_effect = RuntimeError("rollback failed")
with patch.object(archiver, "_extract_bundle_data", side_effect=RuntimeError("extract failed")):
result = archiver._archive_bundle(session, None, [_run()])
assert result.success is False
assert result.error == "extract failed"
session.rollback.assert_called_once()
class TestArchiveRunIdempotency:
def _index_payload(self, archiver: WorkflowRunArchiver, run_ids: list[str], run) -> tuple[str, bytes]:
identity = archiver._build_bundle_identity([run])

View File

@ -127,6 +127,9 @@ class TestWebhookService:
db_session_with_containers.add(workflow)
db_session_with_containers.flush()
app.workflow_id = workflow.id
db_session_with_containers.flush()
# Create webhook trigger
webhook_id = fake.uuid4()[:16]
webhook_trigger = WorkflowWebhookTrigger(

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import json
import logging
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from uuid import uuid4
@ -240,6 +241,40 @@ class TestWebhookServiceLookupWithContainers:
with pytest.raises(ValueError, match="Workflow not found"):
WebhookService.get_webhook_trigger_and_workflow(webhook_trigger.webhook_id)
def test_get_webhook_trigger_and_workflow_uses_app_workflow_id(
self, db_session_with_containers: Session, flask_app_with_containers: Flask
):
del flask_app_with_containers
factory = WebhookServiceRelationshipFactory
account, tenant = factory.create_account_and_tenant(db_session_with_containers)
app = factory.create_app(db_session_with_containers, tenant, account)
current_workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-14.001"
)
newer_workflow = factory.create_workflow(
db_session_with_containers, app=app, account=account, node_ids=["node-1"], version="2026-04-15.001"
)
current_workflow.created_at = datetime(2026, 4, 14)
newer_workflow.created_at = datetime(2026, 4, 15)
app.workflow_id = current_workflow.id
db_session_with_containers.commit()
webhook_trigger = factory.create_webhook_trigger(
db_session_with_containers, app=app, account=account, node_id="node-1"
)
factory.create_app_trigger(
db_session_with_containers, app=app, node_id="node-1", status=AppTriggerStatus.ENABLED
)
got_trigger, got_workflow, got_node_config = WebhookService.get_webhook_trigger_and_workflow(
webhook_trigger.webhook_id
)
assert got_trigger.id == webhook_trigger.id
assert got_workflow.id == current_workflow.id
assert got_workflow.id != newer_workflow.id
assert got_node_config["id"] == "node-1"
def test_get_webhook_trigger_and_workflow_returns_debug_draft_workflow(
self, db_session_with_containers: Session, flask_app_with_containers: Flask
):

View File

@ -0,0 +1,139 @@
import datetime
from unittest.mock import MagicMock
import click
import pytest
from sqlalchemy.exc import OperationalError
from commands import retention
def _db_disconnect_error() -> OperationalError:
return OperationalError(
"select 1",
{},
RuntimeError("server closed the connection unexpectedly"),
connection_invalidated=True,
)
def _session_context(session):
context = MagicMock()
context.__enter__.return_value = session
context.__exit__.return_value = False
return context
def test_resolve_archive_tenant_ids_from_plan_uses_explicit_sessions(monkeypatch):
end_before = datetime.datetime(2025, 4, 1, tzinfo=datetime.UTC)
sessions = [MagicMock(name="session-a"), MagicMock(name="session-b")]
session_maker = MagicMock(side_effect=[_session_context(sessions[0]), _session_context(sessions[1])])
calls = []
def get_candidate_tenants(session, prefix, *, start_from, end_before):
calls.append((session, prefix, start_from, end_before))
return [f"{prefix}-paid", f"{prefix}-free"]
monkeypatch.setattr(retention, "_get_archive_candidate_tenant_ids_by_prefix", get_candidate_tenants)
monkeypatch.setattr(
retention,
"_filter_paid_workflow_archive_tenant_ids",
lambda tenant_ids: (["a-paid", "b-paid"], ["a-free", "b-free"]),
)
tenant_plan = retention._resolve_archive_tenant_ids_from_plan(
session_maker=session_maker,
tenant_ids=None,
tenant_prefixes=["a", "b"],
start_from=None,
end_before=end_before,
)
assert tenant_plan["archive_tenant_ids"] == ["a-paid", "b-paid"]
assert tenant_plan["paid_tenant_ids"] == ["a-paid", "b-paid"]
assert tenant_plan["unpaid_tenant_ids"] == ["a-free", "b-free"]
assert calls == [
(sessions[0], "a", None, end_before),
(sessions[1], "b", None, end_before),
]
def test_safe_remove_scoped_session_discards_registry_and_disposes_after_remove_error(monkeypatch):
fake_db = MagicMock()
fake_db.session.remove.side_effect = RuntimeError("server closed the connection unexpectedly")
monkeypatch.setattr(retention, "db", fake_db)
retention._safe_remove_scoped_session("archive workflow run command")
fake_db.session.remove.assert_called_once()
fake_db.session.registry.clear.assert_called_once()
fake_db.engine.dispose.assert_called_once()
def test_archive_command_db_retry_retries_retryable_db_disconnect(monkeypatch):
operation = MagicMock(side_effect=[_db_disconnect_error(), "ok"])
sleep = MagicMock()
monkeypatch.setattr("services.retention.workflow_run.db_retry.time.sleep", sleep)
result = retention._run_archive_command_db_retry("archive plan", operation)
assert result == "ok"
assert operation.call_count == 2
sleep.assert_called_once_with(1.0)
def test_archive_plan_prefix_stats_retries_count_query_with_fresh_session(monkeypatch):
end_before = datetime.datetime(2025, 4, 1, tzinfo=datetime.UTC)
sessions = [MagicMock(name="session-1"), MagicMock(name="session-2")]
sessions[0].scalar.side_effect = _db_disconnect_error()
sessions[1].scalar.side_effect = [7, 9]
session_maker = MagicMock(side_effect=[_session_context(sessions[0]), _session_context(sessions[1])])
sleep = MagicMock()
monkeypatch.setattr(
retention,
"_get_archive_candidate_tenant_ids_by_prefix",
lambda session, prefix, *, start_from, end_before: [f"{prefix}-tenant"],
)
monkeypatch.setattr("services.retention.workflow_run.db_retry.time.sleep", sleep)
stats = retention._get_archive_plan_prefix_stats(
session_maker,
"a",
start_from=None,
end_before=end_before,
)
assert stats["tenant_ids"] == ["a-tenant"]
assert stats["workflow_runs"] == 7
assert stats["workflow_node_executions"] == 9
assert session_maker.call_count == 2
sleep.assert_called_once_with(1.0)
def test_archive_workflow_runs_raises_click_exception_when_tenant_plan_fails(monkeypatch):
fake_db = MagicMock()
monkeypatch.setattr(retention, "db", fake_db)
monkeypatch.setattr(
retention,
"_resolve_archive_tenant_ids_from_plan",
MagicMock(side_effect=RuntimeError("tenant plan failed")),
)
with pytest.raises(click.ClickException, match="Failed to resolve workflow archive tenant plan"):
retention.archive_workflow_runs.callback(
tenant_ids="tenant-1",
tenant_prefixes=None,
before_days=90,
from_days_ago=None,
to_days_ago=None,
start_from=None,
end_before=None,
batch_size=10000,
workers=1,
run_shard_index=None,
run_shard_total=None,
limit=None,
dry_run=True,
delete_after_archive=False,
)

View File

@ -43,6 +43,57 @@ class TestMCPClient:
assert client.timeout is None
assert client.sse_read_timeout is None
def test_init_with_dynamic_request_headers(self):
"""Test client initialization with dynamic request headers."""
from flask import Flask
app = Flask("test")
with app.test_request_context(headers={"X-Custom-Auth": "my-secret-token", "X-User-Id": "user123"}):
client = MCPClient(
server_url="http://test.example.com",
headers={
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
"X-Request-User": "{{request.header.X-User-Id}}",
"X-Static-Header": "static-val",
},
)
assert client.headers == {
"Authorization": "Bearer my-secret-token",
"X-Request-User": "user123",
"X-Static-Header": "static-val",
}
def test_init_with_dynamic_request_headers_missing(self):
"""Test client initialization with dynamic request headers that are missing."""
from flask import Flask
app = Flask("test")
with app.test_request_context(headers={}):
client = MCPClient(
server_url="http://test.example.com",
headers={
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
},
)
assert client.headers == {
"Authorization": "Bearer ",
}
def test_init_with_dynamic_request_headers_no_context(self):
"""Test client initialization with dynamic request headers but no request context."""
client = MCPClient(
server_url="http://test.example.com",
headers={
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
},
)
assert client.headers == {
"Authorization": "Bearer {{request.headers.X-Custom-Auth}}",
}
@patch("core.mcp.mcp_client.streamablehttp_client")
@patch("core.mcp.mcp_client.ClientSession")
def test_initialize_with_mcp_url(self, mock_client_session, mock_streamable_client):

View File

@ -0,0 +1,111 @@
"""Tests for the uuidv7 SQL migration's PostgreSQL 18 compatibility guard.
The migration file name is not a valid Python identifier (it starts with a date and
contains hyphens), so it is loaded directly from its path. The ``models`` import at the
top of the migration is stubbed because the migration never uses it during
``upgrade()``/``downgrade()`` and pulling in the real package would require a full app
context.
"""
import importlib.util
import sys
import types
from pathlib import Path
from unittest import mock
import pytest
MIGRATION_PATH = (
Path(__file__).resolve().parents[3]
/ "migrations"
/ "versions"
/ "2025_07_02_2332-1c9ba48be8e4_add_uuidv7_function_in_sql.py"
)
def _load_migration():
# The migration does `import models as models` but never references it, so a stub is
# enough and keeps the test free of any database/app configuration.
sys.modules.setdefault("models", types.ModuleType("models"))
spec = importlib.util.spec_from_file_location("uuidv7_pg18_migration_under_test", MIGRATION_PATH)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _make_bind(dialect_name):
bind = mock.MagicMock()
bind.dialect.name = dialect_name
return bind
def _executed_sql(fake_op):
return [str(call.args[0]) for call in fake_op.execute.call_args_list]
@pytest.fixture
def migration():
return _load_migration()
def test_upgrade_creates_both_functions_when_native_uuidv7_absent(migration):
# PostgreSQL 13 to 17: no native pg_catalog.uuidv7(), so both functions are created.
# The DO block contains the CREATE FUNCTION guarded by IF NOT EXISTS, and
# uuidv7_boundary is created unconditionally.
bind = _make_bind("postgresql")
with mock.patch.object(migration, "op") as fake_op:
fake_op.get_bind.return_value = bind
migration.upgrade()
sql = _executed_sql(fake_op)
assert any("CREATE FUNCTION public.uuidv7()" in stmt for stmt in sql)
assert any("CREATE FUNCTION public.uuidv7_boundary(timestamptz)" in stmt for stmt in sql)
def test_upgrade_skips_uuidv7_but_keeps_boundary_when_native_present(migration):
# PostgreSQL 18: native pg_catalog.uuidv7() exists, so the DO block must guard
# the CREATE FUNCTION with an IF NOT EXISTS check against pg_catalog.
# uuidv7_boundary is still missing and has to be created unconditionally.
bind = _make_bind("postgresql")
with mock.patch.object(migration, "op") as fake_op:
fake_op.get_bind.return_value = bind
migration.upgrade()
sql = _executed_sql(fake_op)
# The DO block must contain the pg_catalog existence check.
do_block = next((stmt for stmt in sql if "DO $do$" in stmt), None)
assert do_block is not None
assert "pg_catalog" in do_block
assert "uuidv7" in do_block
assert "IF NOT EXISTS" in do_block
# uuidv7_boundary is always created (not guarded by the DO block).
assert any("CREATE FUNCTION public.uuidv7_boundary(timestamptz)" in stmt for stmt in sql)
def test_upgrade_is_noop_on_non_postgres(migration):
bind = _make_bind("sqlite")
with mock.patch.object(migration, "op") as fake_op:
fake_op.get_bind.return_value = bind
migration.upgrade()
fake_op.execute.assert_not_called()
def test_downgrade_uses_if_exists_and_public_schema(migration):
bind = _make_bind("postgresql")
with mock.patch.object(migration, "op") as fake_op:
fake_op.get_bind.return_value = bind
migration.downgrade()
sql = _executed_sql(fake_op)
assert "DROP FUNCTION IF EXISTS public.uuidv7()" in sql
assert "DROP FUNCTION IF EXISTS public.uuidv7_boundary(timestamptz)" in sql
def test_downgrade_is_noop_on_non_postgres(migration):
bind = _make_bind("sqlite")
with mock.patch.object(migration, "op") as fake_op:
fake_op.get_bind.return_value = bind
migration.downgrade()
fake_op.execute.assert_not_called()

View File

@ -633,6 +633,8 @@ class TestMyPermissions:
assert "dataset.acl.preview" in out.workspace.permission_keys
assert "app.acl.preview" in out.app.default_permission_keys
assert "dataset.acl.preview" in out.dataset.default_permission_keys
if role == "editor":
assert "app.acl.log_and_annotation" in out.app.default_permission_keys
@pytest.mark.parametrize(
("role", "expected_snippet_keys"),

View File

@ -639,8 +639,8 @@ def test_update_multimodel_vector_adds_bindings_and_vectors_and_skips_missing_up
assert len(bindings) == 1
assert bindings[0]["attachment_id"] == "file-1"
vector_instance.add_texts.assert_called_once()
documents = vector_instance.add_texts.call_args.args[0]
vector_instance.create_multimodal.assert_called_once()
documents = vector_instance.create_multimodal.call_args.args[0]
assert len(documents) == 1
assert documents[0].page_content == "img.png"
assert documents[0].metadata["doc_id"] == "file-1"

View File

@ -98,7 +98,7 @@ class TestDispatchTriggeredWorkflow:
),
patch.object(
trigger_processing_tasks_module,
"_get_latest_workflows_by_app_ids",
"_get_published_workflows_by_app_ids",
) as get_workflows,
patch.object(
trigger_processing_tasks_module.EndUserService,

View File

@ -232,7 +232,20 @@ describe('Billing Page + Plan Integration', () => {
// Verify billing URL button visibility and behavior
describe('Billing URL button', () => {
it('should show billing button when subscription management permission is granted', () => {
it('should show billing button when manager has subscription management permission', () => {
setupProviderContext({ type: Plan.sandbox })
setupAppContext({
isCurrentWorkspaceManager: true,
workspacePermissionKeys: ['billing.subscription.manage'],
})
render(<Billing />)
expect(screen.getByText(/viewBillingTitle/i)).toBeInTheDocument()
expect(screen.getByText(/viewBillingAction/i)).toBeInTheDocument()
})
it('should hide billing button when subscription management permission is granted without manager role', () => {
setupProviderContext({ type: Plan.sandbox })
setupAppContext({
isCurrentWorkspaceManager: false,
@ -241,8 +254,7 @@ describe('Billing Page + Plan Integration', () => {
render(<Billing />)
expect(screen.getByText(/viewBillingTitle/i)).toBeInTheDocument()
expect(screen.getByText(/viewBillingAction/i)).toBeInTheDocument()
expect(screen.queryByText(/viewBillingTitle/i)).not.toBeInTheDocument()
})
it('should hide billing button when subscription management permission is missing', () => {

View File

@ -21,6 +21,7 @@ let mockChatConversationDetail: Record<string, unknown> | undefined
let mockCompletionConversationDetail: Record<string, unknown> | undefined
let mockShowMessageLogModal = false
let mockShowPromptLogModal = false
let mockShowAgentLogModal = false
let mockCurrentLogItem: Record<string, unknown> | undefined
let mockCurrentLogModalActiveTab = 'messages'
@ -81,6 +82,7 @@ vi.mock('@/app/components/app/store', () => ({
setShowAgentLogModal: mockSetShowAgentLogModal,
setShowMessageLogModal: mockSetShowMessageLogModal,
showPromptLogModal: mockShowPromptLogModal,
showAgentLogModal: mockShowAgentLogModal,
currentLogModalActiveTab: mockCurrentLogModalActiveTab,
}),
}))
@ -126,6 +128,7 @@ vi.mock('@/app/components/base/chat/chat', () => ({
onAnnotationEdited,
onAnnotationRemoved,
switchSibling,
hideLogModal,
}: {
chatList: Array<{ id: string }>
onFeedback: (mid: string, value: { rating: string, content?: string }) => Promise<boolean>
@ -133,8 +136,9 @@ vi.mock('@/app/components/base/chat/chat', () => ({
onAnnotationEdited: (query: string, answer: string, index: number) => void
onAnnotationRemoved: (index: number) => Promise<boolean>
switchSibling: (siblingMessageId: string) => void
hideLogModal?: boolean
}) => (
<div data-testid="chat-panel">
<div data-testid="chat-panel" data-hide-log-modal={String(hideLogModal)}>
<div>{chatList.length}</div>
<button onClick={() => void onFeedback('message-1', { rating: 'like', content: 'nice' })}>chat-feedback</button>
<button onClick={() => onAnnotationAdded('annotation-2', 'Admin', 'Edited question', 'Edited answer', 1)}>chat-add-annotation</button>
@ -145,6 +149,14 @@ vi.mock('@/app/components/base/chat/chat', () => ({
),
}))
vi.mock('@/app/components/base/agent-log-modal', () => ({
default: ({ floating, onCancel }: { floating?: boolean, onCancel: () => void }) => (
<div data-testid="agent-log-modal" data-floating={String(floating)}>
<button onClick={onCancel}>close-agent-log-modal</button>
</div>
),
}))
vi.mock('@/app/components/base/message-log-modal', () => ({
default: ({ onCancel }: { onCancel: () => void }) => (
<div data-testid="message-log-modal">
@ -255,6 +267,7 @@ describe('ConversationList', () => {
mockCompletionConversationDetail = undefined
mockShowMessageLogModal = false
mockShowPromptLogModal = false
mockShowAgentLogModal = false
mockCurrentLogItem = undefined
mockCurrentLogModalActiveTab = 'messages'
mockDelAnnotation.mockResolvedValue(undefined)
@ -383,6 +396,7 @@ describe('ConversationList', () => {
expect(screen.getByTestId('var-panel')).toHaveTextContent('query:Latest question')
expect(screen.getByTestId('model-info')).toHaveTextContent('gpt-4o')
expect(screen.getByTestId('chat-panel')).toHaveAttribute('data-hide-log-modal', 'true')
expect(screen.getByTestId('message-log-modal')).toBeInTheDocument()
fireEvent.click(screen.getByText('chat-feedback'))
@ -399,6 +413,61 @@ describe('ConversationList', () => {
})
})
it('should mount agent log modals from the detail panel instead of the nested chat layout', async () => {
mockChatConversationDetail = {
id: 'conversation-1',
created_at: 1710000000,
model_config: {
model: 'gpt-4o',
configs: {
introduction: 'Hello there',
},
user_input_form: [],
},
message: {
inputs: {},
},
}
mockShowAgentLogModal = true
mockCurrentLogItem = {
id: 'message-1',
conversationId: 'conversation-1',
}
mockFetchChatMessages.mockResolvedValue({
data: [
{
id: 'message-1',
answer: 'Assistant reply',
query: 'Latest question',
created_at: 1710000000,
inputs: {},
feedbacks: [],
message: [],
message_files: [],
agent_thoughts: [{ id: 'thought-1' }],
},
],
has_more: false,
})
renderConversationList({
searchParams: '?page=2&conversation_id=conversation-1',
})
await waitFor(() => {
expect(screen.getByTestId('chat-panel')).toBeInTheDocument()
})
expect(screen.getByTestId('chat-panel')).toHaveAttribute('data-hide-log-modal', 'true')
expect(screen.getByTestId('agent-log-modal')).toBeInTheDocument()
expect(screen.getByTestId('agent-log-modal')).toHaveAttribute('data-floating', 'true')
fireEvent.click(screen.getByText('close-agent-log-modal'))
expect(mockSetCurrentLogItem).toHaveBeenCalled()
expect(mockSetShowAgentLogModal).toHaveBeenCalledWith(false)
})
it('should render completion details and refetch after feedback updates', async () => {
mockCompletionConversationDetail = {
id: 'conversation-1',
@ -424,7 +493,7 @@ describe('ConversationList', () => {
},
}
mockShowPromptLogModal = true
mockCurrentLogItem = { id: 'log-2' }
mockCurrentLogItem = { id: 'log-2', log: [{ role: 'user', text: 'Prompt body' }] }
renderConversationList({
appDetail: { id: 'app-1', mode: AppModeEnum.COMPLETION } as any,
@ -626,7 +695,7 @@ describe('ConversationList', () => {
},
}
mockShowPromptLogModal = true
mockCurrentLogItem = { id: 'log-2' }
mockCurrentLogItem = { id: 'log-2', log: [{ role: 'user', text: 'Prompt body' }] }
renderConversationList({
appDetail: { id: 'app-1', mode: AppModeEnum.COMPLETION } as any,

View File

@ -36,6 +36,7 @@ import ModelInfo from '@/app/components/app/log/model-info'
import { useStore as useAppStore } from '@/app/components/app/store'
import TextGeneration from '@/app/components/app/text-generate/item'
import ActionButton from '@/app/components/base/action-button'
import AgentLogModal from '@/app/components/base/agent-log-modal'
import Chat from '@/app/components/base/chat/chat'
import CopyIcon from '@/app/components/base/copy-icon'
import Loading from '@/app/components/base/loading'
@ -165,13 +166,25 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
})
const { formatTime } = useTimestamp()
const { onClose, appDetail } = useContext(DrawerContext)
const { currentLogItem, setCurrentLogItem, showMessageLogModal, setShowMessageLogModal, showPromptLogModal, setShowPromptLogModal, currentLogModalActiveTab } = useAppStore(useShallow((state: AppStoreState) => ({
const {
currentLogItem,
setCurrentLogItem,
showMessageLogModal,
setShowMessageLogModal,
showPromptLogModal,
setShowPromptLogModal,
showAgentLogModal,
setShowAgentLogModal,
currentLogModalActiveTab,
} = useAppStore(useShallow((state: AppStoreState) => ({
currentLogItem: state.currentLogItem,
setCurrentLogItem: state.setCurrentLogItem,
showMessageLogModal: state.showMessageLogModal,
setShowMessageLogModal: state.setShowMessageLogModal,
showPromptLogModal: state.showPromptLogModal,
setShowPromptLogModal: state.setShowPromptLogModal,
showAgentLogModal: state.showAgentLogModal,
setShowAgentLogModal: state.setShowAgentLogModal,
currentLogModalActiveTab: state.currentLogModalActiveTab,
})))
const { t } = useTranslation()
@ -395,6 +408,7 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
const isChatMode = appDetail?.mode !== AppModeEnum.COMPLETION
const isAdvanced = appDetail?.mode === AppModeEnum.ADVANCED_CHAT
const shouldShowPromptLogModal = showPromptLogModal && !!currentLogItem?.log
const varList = getDetailVarList(detail, varValues)
const message_files = getCompletionMessageFiles(detail, isChatMode)
@ -507,6 +521,7 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
noChatInput
showPromptLog
hideProcessDetail
hideLogModal
chatContainerInnerClassName="px-3"
switchSibling={switchSibling}
/>
@ -546,6 +561,7 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
noChatInput
showPromptLog
hideProcessDetail
hideLogModal
chatContainerInnerClassName="px-3"
switchSibling={switchSibling}
/>
@ -574,7 +590,18 @@ function DetailPanel({ detail, onFeedback }: IDetailPanel) {
/>
</WorkflowContextProvider>
)}
{!isChatMode && showPromptLogModal && (
{showAgentLogModal && (
<AgentLogModal
floating
width={width}
currentLogItem={currentLogItem}
onCancel={() => {
setCurrentLogItem()
setShowAgentLogModal(false)
}}
/>
)}
{shouldShowPromptLogModal && (
<PromptLogModal
width={width}
currentLogItem={currentLogItem}

View File

@ -119,6 +119,17 @@ describe('AgentLogModal', () => {
})
})
it('should render the floating modal through a dialog portal', () => {
vi.mocked(fetchAgentLogDetail).mockReturnValue(new Promise(() => {}))
const { container } = render(<AgentLogModal {...mockProps} floating />)
const modal = screen.getByRole('dialog')
expect(container).not.toContainElement(modal)
expect(document.body).toContainElement(modal)
expect(modal).toHaveClass('fixed', 'z-50', 'w-[480px]!', 'left-[max(8px,calc(100vw-1136px))]!')
})
it('should call onCancel when close button is clicked', () => {
vi.mocked(fetchAgentLogDetail).mockReturnValue(new Promise(() => {}))
@ -158,4 +169,18 @@ describe('AgentLogModal', () => {
expect(mockProps.onCancel).not.toHaveBeenCalled()
})
it('should not use click-away to close the floating dialog', () => {
vi.mocked(fetchAgentLogDetail).mockReturnValue(new Promise(() => {}))
let clickAwayHandler!: (event: Event) => void
vi.mocked(useClickAway).mockImplementation((callback) => {
clickAwayHandler = callback
})
render(<AgentLogModal {...mockProps} floating />)
clickAwayHandler(new Event('click'))
expect(mockProps.onCancel).not.toHaveBeenCalled()
})
})

View File

@ -1,6 +1,7 @@
import type { FC } from 'react'
import type { IChatItem } from '@/app/components/base/chat/chat/type'
import { cn } from '@langgenius/dify-ui/cn'
import { Dialog, DialogContent, DialogTitle } from '@langgenius/dify-ui/dialog'
import { RiCloseLine } from '@remixicon/react'
import { useClickAway } from 'ahooks'
import { useEffect, useRef, useState } from 'react'
@ -10,11 +11,13 @@ import AgentLogDetail from './detail'
type AgentLogModalProps = Readonly<{
currentLogItem?: IChatItem
width: number
floating?: boolean
onCancel: () => void
}>
const AgentLogModal: FC<AgentLogModalProps> = ({
currentLogItem,
width,
floating,
onCancel,
}) => {
const { t } = useTranslation()
@ -22,7 +25,7 @@ const AgentLogModal: FC<AgentLogModalProps> = ({
const [mounted, setMounted] = useState(false)
useClickAway(() => {
if (mounted)
if (mounted && !floating)
onCancel()
}, ref)
@ -33,6 +36,44 @@ const AgentLogModal: FC<AgentLogModalProps> = ({
if (!currentLogItem || !currentLogItem.conversationId)
return null
const detailContent = (
<>
<AgentLogDetail
conversationID={currentLogItem.conversationId}
messageID={currentLogItem.id}
log={currentLogItem}
/>
</>
)
if (floating) {
return (
<Dialog
open
onOpenChange={(open) => {
if (!open)
onCancel()
}}
>
<DialogContent
backdropClassName="bg-transparent!"
className="top-16! bottom-4! left-[max(8px,calc(100vw-1136px))]! flex max-h-none! w-[480px]! max-w-[calc(100vw-16px)]! translate-x-0! translate-y-0! flex-col overflow-hidden! rounded-xl! border-[0.5px]! border-components-panel-border! bg-components-panel-bg! p-0! pt-3! pb-3! shadow-xl!"
>
<DialogTitle className="text-md shrink-0 px-4 py-1 font-semibold text-text-primary">{t('runDetail.workflowTitle', { ns: 'appLog' })}</DialogTitle>
<button
type="button"
aria-label={t('operation.close', { ns: 'common' })}
className="absolute top-4 right-3 z-20 cursor-pointer border-none bg-transparent p-1 focus-visible:ring-1 focus-visible:ring-components-input-border-active focus-visible:outline-hidden"
onClick={onCancel}
>
<RiCloseLine className="size-4 text-text-tertiary" aria-hidden="true" />
</button>
{detailContent}
</DialogContent>
</Dialog>
)
}
return (
<div
className={cn('relative z-10 flex flex-col rounded-xl border-[0.5px] border-components-panel-border bg-components-panel-bg py-3 shadow-xl')}
@ -54,11 +95,7 @@ const AgentLogModal: FC<AgentLogModalProps> = ({
>
<RiCloseLine className="size-4 text-text-tertiary" aria-hidden="true" />
</button>
<AgentLogDetail
conversationID={currentLogItem.conversationId}
messageID={currentLogItem.id}
log={currentLogItem}
/>
{detailContent}
</div>
)
}

View File

@ -6,6 +6,7 @@ let fetching = false
let isManager = true
let enableBilling = true
let workspacePermissionKeys: string[] = ['billing.subscription.manage']
let billingUrlEnabled = false
const refetchMock = vi.fn()
const openAsyncWindowMock = vi.fn()
@ -19,11 +20,14 @@ type BillingWindowOptions = {
type OpenAsyncWindowCall = [BillingUrlCallback, BillingWindowOptions]
vi.mock('@/service/use-billing', () => ({
useBillingUrl: () => ({
data: currentBillingUrl,
isFetching: fetching,
refetch: refetchMock,
}),
useBillingUrl: (enabled: boolean) => {
billingUrlEnabled = enabled
return {
data: currentBillingUrl,
isFetching: fetching,
refetch: refetchMock,
}
},
}))
vi.mock('@/hooks/use-async-window-open', () => ({
@ -54,28 +58,32 @@ describe('Billing', () => {
fetching = false
isManager = true
enableBilling = true
billingUrlEnabled = false
workspacePermissionKeys = ['billing.subscription.manage']
refetchMock.mockResolvedValue({ data: 'https://billing' })
})
it('shows the billing action when subscription management permission is granted without manager role', () => {
it('hides the billing action when subscription management permission is granted without manager role', () => {
isManager = false
render(<Billing />)
expect(screen.getByRole('button', { name: /billing\.viewBillingTitle/ })).toBeInTheDocument()
expect(screen.queryByRole('button', { name: /billing\.viewBillingTitle/ })).not.toBeInTheDocument()
expect(billingUrlEnabled).toBe(false)
})
it('hides the billing action when subscription management permission is missing or billing is disabled', () => {
workspacePermissionKeys = []
render(<Billing />)
expect(screen.queryByRole('button', { name: /billing\.viewBillingTitle/ })).not.toBeInTheDocument()
expect(billingUrlEnabled).toBe(false)
vi.clearAllMocks()
workspacePermissionKeys = ['billing.subscription.manage']
enableBilling = false
render(<Billing />)
expect(screen.queryByRole('button', { name: /billing\.viewBillingTitle/ })).not.toBeInTheDocument()
expect(billingUrlEnabled).toBe(false)
})
it('opens the billing window with the immediate url when the button is clicked', async () => {

View File

@ -11,9 +11,9 @@ import PlanComp from '../plan'
const Billing: FC = () => {
const { t } = useTranslation()
const { workspacePermissionKeys } = useAppContext()
const { isCurrentWorkspaceManager, workspacePermissionKeys } = useAppContext()
const { enableBilling } = useProviderContext()
const canManageBillingSubscription = hasPermission(workspacePermissionKeys, BillingPermission.SubscriptionManage)
const canManageBillingSubscription = isCurrentWorkspaceManager && hasPermission(workspacePermissionKeys, BillingPermission.SubscriptionManage)
const { data: billingUrl, isFetching, refetch } = useBillingUrl(enableBilling && canManageBillingSubscription)
const openAsyncWindow = useAsyncWindowOpen()

View File

@ -13,6 +13,8 @@ import { Input } from '@langgenius/dify-ui/input'
import { Textarea } from '@langgenius/dify-ui/textarea'
import { useState } from 'react'
import { useTranslation } from 'react-i18next'
import { useLocale } from '@/context/i18n'
import { getDocLanguage } from '@/i18n-config/language'
import PermissionPicker from './permission-picker'
export type PermissionSetModalMode = 'create' | 'edit' | 'view'
@ -42,6 +44,8 @@ const PermissionSetModalBody = ({
onSubmit,
}: PermissionSetModalBodyProps) => {
const { t } = useTranslation()
const locale = useLocale()
const docLanguage = getDocLanguage(locale)
const [name, setName] = useState(initialValues?.name ?? '')
const [description, setDescription] = useState(initialValues?.description ?? '')
const [permissionKeys, setPermissionKeys] = useState<string[]>(initialValues?.permissionKeys ?? [])
@ -122,7 +126,7 @@ const PermissionSetModalBody = ({
<div className="flex shrink-0 items-center justify-between gap-3 border-t border-divider-subtle px-6 py-4">
<a
href="https://docs.dify.ai/"
href={`https://enterprise-docs.dify.ai/${docLanguage}/3.11.x/use/workspace/permission-reference`}
target="_blank"
rel="noreferrer"
className="inline-flex items-center gap-1 system-xs-medium text-text-accent hover:underline"

View File

@ -13,6 +13,8 @@ import { Input } from '@langgenius/dify-ui/input'
import { Textarea } from '@langgenius/dify-ui/textarea'
import { useCallback, useState } from 'react'
import { useTranslation } from 'react-i18next'
import { useLocale } from '@/context/i18n'
import { getDocLanguage } from '@/i18n-config/language'
import PermissionField from './permission-field'
export type RoleModalMode = 'create' | 'view' | 'edit'
@ -39,6 +41,8 @@ const RoleModal = ({
onSubmit,
}: RoleModalProps) => {
const { t } = useTranslation()
const locale = useLocale()
const docLanguage = getDocLanguage(locale)
const [name, setName] = useState(role?.name ?? '')
const [desc, setDesc] = useState(role?.description ?? '')
const [permissionKeys, setPermissionKeys] = useState<string[]>(role?.permission_keys ?? [])
@ -116,7 +120,7 @@ const RoleModal = ({
</div>
<div className="flex shrink-0 items-center justify-between gap-3 border-t border-divider-subtle px-6 py-4">
<a
href="https://docs.dify.ai/"
href={`https://enterprise-docs.dify.ai/${docLanguage}/3.11.x/use/workspace/permission-reference`}
target="_blank"
rel="noreferrer"
className="inline-flex items-center gap-1 system-xs-medium text-text-accent hover:underline"

View File

@ -50,5 +50,5 @@
"snippets.management": "إدارة المقتطفات",
"tool.manage": "إدارة الأدوات",
"workspace.member.manage": "إدارة الأعضاء",
"workspace.role.manage": "إدارة أذونات الأدوار وقواعد الوصول إلى الموارد"
"workspace.role.manage": "إدارة الأدوار ومجموعات الأذونات"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Snippets verwalten",
"tool.manage": "Tools verwalten",
"workspace.member.manage": "Mitglieder verwalten",
"workspace.role.manage": "Rollenberechtigungen und Ressourcenzugriffsregeln verwalten"
"workspace.role.manage": "Rollen und Berechtigungssätze verwalten"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Manage snippets",
"tool.manage": "Manage tools",
"workspace.member.manage": "Manage members",
"workspace.role.manage": "Manage role permissions and resource access rules"
"workspace.role.manage": "Manage roles and permission sets"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Gestionar fragmentos",
"tool.manage": "Gestionar herramientas",
"workspace.member.manage": "Gestionar miembros",
"workspace.role.manage": "Gestionar permisos de roles y reglas de acceso a recursos"
"workspace.role.manage": "Gestionar roles y conjuntos de permisos"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "مدیریت قطعه‌کدها",
"tool.manage": "مدیریت ابزارها",
"workspace.member.manage": "مدیریت اعضا",
"workspace.role.manage": "مدیریت مجوزهای نقش و قوانین دسترسی به منابع"
"workspace.role.manage": "مدیریت نقش‌ها و مجموعه‌های مجوز"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Gérer les extraits",
"tool.manage": "Gérer les outils",
"workspace.member.manage": "Gérer les membres",
"workspace.role.manage": "Gérer les autorisations de rôles et les règles d'accès aux ressources"
"workspace.role.manage": "Gérer les rôles et les ensembles d'autorisations"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "स्निपेट प्रबंधित करें",
"tool.manage": "टूल प्रबंधित करें",
"workspace.member.manage": "सदस्य प्रबंधित करें",
"workspace.role.manage": "भूमिका अनुमतियाँ और संसाधन एक्सेस नियम प्रबंधित करें"
"workspace.role.manage": "भूमिकाएँ और अनुमति सेट प्रबंधित करें"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Kelola snippet",
"tool.manage": "Kelola alat",
"workspace.member.manage": "Kelola anggota",
"workspace.role.manage": "Kelola izin peran dan aturan akses sumber daya"
"workspace.role.manage": "Kelola peran dan set izin"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Gestisci snippet",
"tool.manage": "Gestisci strumenti",
"workspace.member.manage": "Gestisci i membri",
"workspace.role.manage": "Gestisci i permessi dei ruoli e le regole di accesso alle risorse"
"workspace.role.manage": "Gestisci ruoli e set di autorizzazioni"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "スニペットを管理",
"tool.manage": "ツールを管理",
"workspace.member.manage": "メンバーを管理",
"workspace.role.manage": "ロール権限とリソースアクセスルールを管理"
"workspace.role.manage": "ロール権限セットを管理"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "스니펫 관리",
"tool.manage": "도구 관리",
"workspace.member.manage": "멤버 관리",
"workspace.role.manage": "역할 권한 및 리소스 접근 규칙 관리"
"workspace.role.manage": "역할 권한 세트 관리"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Snippets beheren",
"tool.manage": "Tools beheren",
"workspace.member.manage": "Leden beheren",
"workspace.role.manage": "Rolrechten en regels voor resourcetoegang beheren"
"workspace.role.manage": "Rollen en machtigingensets beheren"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Zarządzaj fragmentami kodu",
"tool.manage": "Zarządzaj narzędziami",
"workspace.member.manage": "Zarządzaj członkami",
"workspace.role.manage": "Zarządzaj uprawnieniami ról i regułami dostępu do zasobów"
"workspace.role.manage": "Zarządzaj rolami i zestawami uprawnień"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Gerenciar snippets",
"tool.manage": "Gerenciar ferramentas",
"workspace.member.manage": "Gerenciar membros",
"workspace.role.manage": "Gerenciar permissões de função e regras de acesso a recursos"
"workspace.role.manage": "Gerenciar funções e conjuntos de permissões"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Gestionează fragmente",
"tool.manage": "Gestionează instrumente",
"workspace.member.manage": "Gestionează membrii",
"workspace.role.manage": "Gestionează permisiunile rolurilor și regulile de acces la resurse"
"workspace.role.manage": "Gestionează rolurile și seturile de permisiuni"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Управление сниппетами",
"tool.manage": "Управление инструментами",
"workspace.member.manage": "Управление участниками",
"workspace.role.manage": "Управление правами ролей и правилами доступа к ресурсам"
"workspace.role.manage": "Управление ролями и наборами разрешений"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Upravljanje izsekov",
"tool.manage": "Upravljanje orodij",
"workspace.member.manage": "Upravljanje članov",
"workspace.role.manage": "Upravljanje dovoljenj vlog in pravil za dostop do virov"
"workspace.role.manage": "Upravljanje vlog in naborov dovoljenj"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "จัดการสนิปเปต",
"tool.manage": "จัดการเครื่องมือ",
"workspace.member.manage": "จัดการสมาชิก",
"workspace.role.manage": "จัดการสิทธิ์บทบาทและกฎการเข้าถึงทรัพยากร"
"workspace.role.manage": "จัดการบทบาทและชุดสิทธิ์"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Snippet'leri yönet",
"tool.manage": "Araçları yönet",
"workspace.member.manage": "Üyeleri yönet",
"workspace.role.manage": "Rol izinlerini ve kaynak erişim kurallarını yönet"
"workspace.role.manage": "Rolleri ve izin setlerini yönet"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Керування фрагментами",
"tool.manage": "Керування інструментами",
"workspace.member.manage": "Керування учасниками",
"workspace.role.manage": "Керування дозволами ролей та правилами доступу до ресурсів"
"workspace.role.manage": "Керування ролями та наборами дозволів"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "Quản lý đoạn mã",
"tool.manage": "Quản lý công cụ",
"workspace.member.manage": "Quản lý thành viên",
"workspace.role.manage": "Quản lý quyền vai trò và quy tắc truy cập tài nguyên"
"workspace.role.manage": "Quản lý vai trò và bộ quyn"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "管理 Snippets",
"tool.manage": "管理工具",
"workspace.member.manage": "管理成员",
"workspace.role.manage": "管理角色权限与访问权限规则"
"workspace.role.manage": "管理角色与权限集"
}

View File

@ -50,5 +50,5 @@
"snippets.management": "管理 Snippets",
"tool.manage": "管理工具",
"workspace.member.manage": "管理成員",
"workspace.role.manage": "管理角色權限與訪問權限規則"
"workspace.role.manage": "管理角色與權限集"
}