Compare commits

..

9 Commits

47 changed files with 5494 additions and 198 deletions

View File

@ -4,6 +4,12 @@ CLI command modules extracted from `commands.py`.
from .account import create_tenant, reset_email, reset_password
from .data_migrate import data_migrate, legacy_model_types
from .data_migration import (
export_migration_data,
export_migration_data_template,
import_migration_data,
migration_data_wizard,
)
from .plugin import (
extract_plugins,
extract_unique_plugins,
@ -26,7 +32,12 @@ from .retention import (
restore_workflow_runs,
)
from .storage import clear_orphaned_file_records, file_usage, migrate_oss, remove_orphaned_files_on_storage
from .system import convert_to_agent_apps, fix_app_site_missing, reset_encrypt_key_pair, upgrade_db
from .system import (
convert_to_agent_apps,
fix_app_site_missing,
reset_encrypt_key_pair,
upgrade_db,
)
from .vector import (
add_qdrant_index,
migrate_annotation_vector_database,
@ -48,10 +59,13 @@ __all__ = [
"data_migrate",
"delete_archived_workflow_runs",
"export_app_messages",
"export_migration_data",
"export_migration_data_template",
"extract_plugins",
"extract_unique_plugins",
"file_usage",
"fix_app_site_missing",
"import_migration_data",
"install_plugins",
"install_rag_pipeline_plugins",
"legacy_model_types",
@ -59,6 +73,7 @@ __all__ = [
"migrate_data_for_plugin",
"migrate_knowledge_vector_database",
"migrate_oss",
"migration_data_wizard",
"old_metadata_migration",
"remove_orphaned_files_on_storage",
"reset_email",

View File

@ -0,0 +1,754 @@
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, cast
from uuid import UUID
import click
import sqlalchemy as sa
import yaml
from extensions.ext_database import db
from models import Tenant
from models.model import App
from models.tools import ApiToolProvider, MCPToolProvider, WorkflowToolProvider
from services.app_dsl_service import AppDslService
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService
from services.data_migration.entities import (
DependencyKind,
ImportOptions,
MigrationDataError,
ReportContext,
ResourceReportItem,
)
from services.data_migration.export_service import ExportConfigParser, MigrationExportService
from services.data_migration.import_service import ImportRequest, MigrationImportService
from services.data_migration.package_service import MigrationPackageService
from services.data_migration.report_service import MigrationReportService
ID_STRATEGY_CHOICES = ["preserve-id", "generate-new-id"]
CONFLICT_STRATEGY_CHOICES = ["fail", "skip", "update"]
SUPPORTED_WIZARD_APP_MODES = ["workflow", "advanced-chat"]
WizardToolMap = dict[str, dict[str, str | None]]
WizardToolSelection = dict[str, list[str]]
def _scripted_export_template() -> dict[str, Any]:
return {
"source_tenant": {
"mode": "single",
"id": "",
"name": "admin's Workspace",
},
"apps": {
"modes": ["workflow", "advanced-chat"],
"ids": [],
"all": True,
},
"include_referenced_tools": True,
"additional_tools": {
"api_tools": [],
"workflow_tools": [],
"mcp_tools": [],
},
"include_secrets": False,
"import_options": {
"create_app_api_token_on_import": False,
"id_strategy": "preserve-id",
"conflict_strategy": "fail",
},
}
@click.command("app-migration-template", help="Print or write a scripted export config JSON template.")
@click.option(
"--output",
"output_file",
required=False,
type=click.Path(dir_okay=False),
help="Path to write the export config JSON template. Prints to stdout when omitted.",
)
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite output if it already exists.")
def export_migration_data_template(output_file: str | None, overwrite: bool) -> None:
template_json = json.dumps(_scripted_export_template(), indent=2, ensure_ascii=False) + "\n"
if output_file is None:
click.echo(template_json, nl=False)
return
path = Path(output_file)
if path.exists() and not overwrite:
raise click.ClickException(f"Output file already exists: {output_file}")
path.write_text(template_json)
click.echo(click.style(f"Output written to {output_file}", fg="green"))
@click.command("export-app-migration", help="Export workflow migration data to a versioned JSON package.")
@click.option(
"--input",
"input_file",
required=False,
type=click.Path(exists=True, dir_okay=False),
help="Path to export config JSON.",
)
@click.option(
"--output",
"output_file",
required=False,
type=click.Path(dir_okay=False),
help="Path to migration package JSON.",
)
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite output if it already exists.")
def export_migration_data(input_file: str | None, output_file: str | None, overwrite: bool) -> None:
try:
_require_options(("--input", input_file), ("--output", output_file))
assert input_file is not None
assert output_file is not None
raw_config = _load_json_object(input_file, "Export config")
selection = ExportConfigParser().parse(raw_config)
result = MigrationExportService().export(selection)
MigrationPackageService().save_package(result.package, output_file, overwrite=overwrite)
click.echo(click.style(f"Output written to {output_file}", fg="green"))
_render_report(result.report_items, context=_with_output_path(result.report_context, output_file))
except MigrationDataError as exc:
raise click.ClickException(str(exc)) from exc
@click.command("import-app-migration", help="Import a versioned migration data package.")
@click.option(
"--input",
"input_file",
required=False,
type=click.Path(exists=True, dir_okay=False),
help="Path to migration package JSON.",
)
@click.option("--target-tenant", default=None, help="Target tenant/workspace name. Overrides package metadata.")
@click.option("--operator-email", default=None, help="Operator account email in the target tenant.")
@click.option(
"--id-strategy",
default=None,
type=click.Choice(ID_STRATEGY_CHOICES),
help="Override package ID strategy.",
)
@click.option(
"--conflict-strategy",
default=None,
type=click.Choice(CONFLICT_STRATEGY_CHOICES),
help="Override package conflict strategy.",
)
@click.option(
"--create-app-api-token-on-import/--no-create-app-api-token-on-import",
default=None,
help="Override package app API token creation behavior.",
)
def import_migration_data(
input_file: str | None,
target_tenant: str | None,
operator_email: str | None,
id_strategy: str | None,
conflict_strategy: str | None,
create_app_api_token_on_import: bool | None,
) -> None:
try:
_require_options(("--input", input_file))
assert input_file is not None
package = MigrationPackageService().load_package(input_file)
result = MigrationImportService().import_package(
ImportRequest(
package=package,
cli_target_tenant=target_tenant,
operator_email=operator_email,
options_override=_build_options_override(
package.metadata.import_options,
id_strategy=id_strategy,
conflict_strategy=conflict_strategy,
create_app_api_token_on_import=create_app_api_token_on_import,
),
)
)
_render_report(result.report_items, context=result.report_context)
except MigrationDataError as exc:
raise click.ClickException(str(exc)) from exc
def parse_index_selection(raw: str, values: list[str]) -> list[str]:
normalized = raw.strip().lower()
if normalized == "all":
return values
selected: list[str] = []
for part in raw.split(","):
stripped = part.strip()
if not stripped:
continue
try:
index = int(stripped)
except ValueError as exc:
raise click.ClickException(f"Selection must be 'all' or comma-separated numbers: {raw}") from exc
if index < 1 or index > len(values):
raise click.ClickException(f"Selection index out of range: {index}")
selected.append(values[index - 1])
return list(dict.fromkeys(selected))
def _print_wizard_step(title: str) -> None:
click.echo("")
click.echo(f"==== {title} ====")
def _print_wizard_substep(title: str) -> None:
click.echo("")
click.echo(f"-- {title} --")
@click.command("app-migration-wizard", help="Interactively export workflow migration data.")
def migration_data_wizard() -> None:
try:
tenant = _prompt_source_tenant()
apps = _eligible_apps_for_tenant(tenant.id)
app_ids = _prompt_app_ids(apps)
_print_wizard_step("Referenced Tools")
include_referenced_tools = click.confirm(
"Automatically export tools referenced by selected apps? [y/n, default: y]",
default=True,
show_default=False,
)
auto_tools = _discover_auto_tools([app for app in apps if app.id in set(app_ids)], include_referenced_tools)
auto_tools = _resolve_auto_tool_names(tenant.id, auto_tools)
_print_auto_tools(auto_tools)
additional_tools = _prompt_additional_tools(tenant.id, auto_tools)
include_secrets, create_tokens, id_strategy, conflict_strategy = _prompt_import_options()
_print_wizard_step("Output")
output_file, overwrite = _prompt_output_file()
selection = ExportConfigParser().parse(
{
"source_tenant": {"mode": "single", "id": tenant.id, "name": tenant.name},
"apps": {"ids": app_ids, "all": False},
"include_referenced_tools": include_referenced_tools,
"additional_tools": additional_tools,
"include_secrets": include_secrets,
"import_options": {
"create_app_api_token_on_import": create_tokens,
"id_strategy": id_strategy,
"conflict_strategy": conflict_strategy,
},
}
)
_confirm_wizard_summary(
tenant_name=tenant.name,
app_names=[app.name for app in apps if app.id in set(app_ids)],
auto_tools=auto_tools,
additional_tools=additional_tools,
manual_labels=_selected_tool_labels_for_tenant(tenant.id, additional_tools),
include_referenced_tools=include_referenced_tools,
include_secrets=include_secrets,
create_tokens=create_tokens,
id_strategy=id_strategy,
conflict_strategy=conflict_strategy,
output_file=output_file,
)
result = MigrationExportService().export(selection)
MigrationPackageService().save_package(result.package, output_file, overwrite=overwrite)
click.echo(click.style(f"Output written to {output_file}", fg="green"))
_print_wizard_step("Report")
_render_report(result.report_items, context=_with_output_path(result.report_context, output_file))
except MigrationDataError as exc:
raise click.ClickException(str(exc)) from exc
def _load_json_object(path: str, label: str) -> dict[str, Any]:
try:
with Path(path).open(encoding="utf-8") as file:
raw = json.load(file)
except json.JSONDecodeError as exc:
raise MigrationDataError(f"{label} JSON is invalid: {exc.msg}") from exc
if not isinstance(raw, dict):
raise MigrationDataError(f"{label} JSON must be an object.")
return raw
def _require_options(*options: tuple[str, object | None]) -> None:
missing_options = [name for name, value in options if value is None]
if missing_options:
raise click.UsageError(f"Missing option(s): {', '.join(missing_options)}.")
def _build_options_override(
package_options: ImportOptions,
*,
id_strategy: str | None,
conflict_strategy: str | None,
create_app_api_token_on_import: bool | None,
) -> ImportOptions | None:
if id_strategy is None and conflict_strategy is None and create_app_api_token_on_import is None:
return None
return ImportOptions.from_mapping(
{
"id_strategy": id_strategy or package_options.id_strategy,
"conflict_strategy": conflict_strategy or package_options.conflict_strategy,
"create_app_api_token_on_import": (
create_app_api_token_on_import
if create_app_api_token_on_import is not None
else package_options.create_app_api_token_on_import
),
}
)
def _prompt_source_tenant() -> Tenant:
tenants = list(db.session.scalars(sa.select(Tenant).order_by(Tenant.name.asc())).all())
if not tenants:
raise MigrationDataError("No tenants found.")
_print_wizard_step("Source Tenant")
click.echo("Source tenants:")
for index, tenant in enumerate(tenants, 1):
click.echo(f"{index}. {tenant.name} ({tenant.id})")
tenant_index = click.prompt("Select one source tenant by number", type=int, default=1, show_default=True)
if tenant_index < 1 or tenant_index > len(tenants):
raise click.ClickException(f"Selection index out of range: {tenant_index}")
return tenants[tenant_index - 1]
def _eligible_apps_for_tenant(tenant_id: str) -> list[App]:
return list(
db.session.scalars(
sa.select(App)
.where(App.tenant_id == tenant_id, App.mode.in_(SUPPORTED_WIZARD_APP_MODES))
.order_by(App.name.asc())
).all()
)
def _prompt_app_ids(apps: list[App]) -> list[str]:
if not apps:
raise MigrationDataError("No workflow or advanced-chat apps found for the selected tenant.")
_print_wizard_step("App Selection")
click.echo("Currently supported app types: workflow and chatflow.")
click.echo("Workflow/chatflow apps:")
for index, app in enumerate(apps, 1):
mode = app.mode.value if hasattr(app.mode, "value") else app.mode
click.echo(f"{index}. {app.name} [{mode}] ({app.id})")
app_ids = parse_index_selection(
click.prompt("Select apps by number, comma-separated numbers, or all", default="all"),
[app.id for app in apps],
)
selected_apps = [app for app in apps if app.id in set(app_ids)]
click.echo("Selected apps:")
for app in selected_apps:
click.echo(f"- {app.name} ({app.id})")
return app_ids
def _prompt_import_options() -> tuple[bool, bool, str, str]:
_print_wizard_step("Import Options")
_print_wizard_substep("Secrets")
click.echo("Secrets include workflow/app DSL secret values, custom API tool credentials,")
click.echo("and full MCP provider connection data such as server URL, headers, authentication, and tool list.")
click.echo("If you choose no, credentials are omitted or masked,")
click.echo("and MCP providers are exported as dependency metadata only.")
click.echo("Treat the output JSON as sensitive if you choose yes.")
include_secrets = click.confirm(
"Include secrets in output JSON? [y/n, default: n]",
default=False,
show_default=False,
)
_print_wizard_substep("App API Tokens")
click.echo("When enabled, import will create an app API token if the imported app has none,")
click.echo("or reuse an existing app API token if one already exists.")
create_tokens = click.confirm(
"Create or reuse app API tokens during import? [y/n, default: n]",
default=False,
show_default=False,
)
_print_wizard_substep("ID Strategy")
click.echo("ID strategy controls whether imported app and tool IDs preserve source IDs")
click.echo("or use target-generated IDs.")
click.echo("preserve-id: keep source IDs where the target service supports it.")
click.echo("generate-new-id: let the target environment generate new IDs and rewrite references via mapping.")
id_strategy = click.prompt(
"Import ID strategy. Enter one of: preserve-id, generate-new-id",
type=click.Choice(ID_STRATEGY_CHOICES),
default="preserve-id",
show_default=True,
)
_print_wizard_substep("Conflict Strategy")
click.echo("Conflict strategy controls what import does when a target resource already exists.")
click.echo("fail: stop at the first conflict; previously committed resources are not rolled back.")
click.echo("skip: keep the existing target resource and skip importing that resource.")
click.echo("update: update the existing target resource in place.")
conflict_strategy = click.prompt(
"Import conflict strategy. Enter one of: fail, skip, update",
type=click.Choice(CONFLICT_STRATEGY_CHOICES),
default="update",
show_default=True,
)
return include_secrets, create_tokens, id_strategy, conflict_strategy
def _discover_auto_tools(apps: list[App], include_referenced_tools: bool) -> WizardToolMap:
auto_tools: WizardToolMap = {"api_tools": {}, "workflow_tools": {}, "mcp_tools": {}}
if not include_referenced_tools:
return auto_tools
discovery_service = DependencyDiscoveryService()
for app in apps:
dsl_content = AppDslService.export_dsl(app_model=app, include_secret=False)
raw_dsl = yaml.safe_load(dsl_content) if dsl_content else {}
dsl = raw_dsl if isinstance(raw_dsl, dict) else {}
for dependency in discovery_service.discover_from_dsl(dsl):
if dependency.kind == DependencyKind.API_TOOL:
auto_tools["api_tools"][dependency.provider_name or dependency.provider_id] = dependency.provider_id
elif dependency.kind == DependencyKind.WORKFLOW_TOOL:
auto_tools["workflow_tools"][dependency.provider_name or dependency.provider_id] = (
dependency.provider_id
)
elif dependency.kind == DependencyKind.MCP_TOOL:
auto_tools["mcp_tools"][dependency.provider_name or dependency.provider_id] = dependency.provider_id
return auto_tools
def _resolve_auto_tool_names(tenant_id: str, auto_tools: WizardToolMap) -> WizardToolMap:
return {
"api_tools": _resolve_api_tool_names(tenant_id, auto_tools["api_tools"]),
"workflow_tools": _resolve_workflow_tool_names(tenant_id, auto_tools["workflow_tools"]),
"mcp_tools": _resolve_mcp_tool_names(tenant_id, auto_tools["mcp_tools"]),
}
def _resolve_api_tool_names(tenant_id: str, tools: dict[str, str | None]) -> dict[str, str | None]:
resolved: dict[str, str | None] = {}
for name, identifier in tools.items():
predicates = [ApiToolProvider.name == name]
if _is_uuid_string(identifier):
predicates.append(ApiToolProvider.id == identifier)
provider = db.session.scalar(
sa.select(ApiToolProvider).where(
ApiToolProvider.tenant_id == tenant_id,
sa.or_(*predicates),
)
)
resolved[provider.name if provider else name] = provider.id if provider else identifier
return resolved
def _resolve_workflow_tool_names(tenant_id: str, tools: dict[str, str | None]) -> dict[str, str | None]:
resolved: dict[str, str | None] = {}
for name, identifier in tools.items():
predicates = [WorkflowToolProvider.name == name]
if _is_uuid_string(identifier):
predicates.append(WorkflowToolProvider.id == identifier)
provider = db.session.scalar(
sa.select(WorkflowToolProvider).where(
WorkflowToolProvider.tenant_id == tenant_id,
sa.or_(*predicates),
)
)
resolved[provider.name if provider else name] = provider.id if provider else identifier
return resolved
def _resolve_mcp_tool_names(tenant_id: str, tools: dict[str, str | None]) -> dict[str, str | None]:
resolved: dict[str, str | None] = {}
for name, identifier in tools.items():
predicates = [MCPToolProvider.name == name]
if identifier:
predicates.append(MCPToolProvider.server_identifier == identifier)
if _is_uuid_string(identifier):
predicates.append(MCPToolProvider.id == identifier)
provider = db.session.scalar(
sa.select(MCPToolProvider).where(
MCPToolProvider.tenant_id == tenant_id,
sa.or_(*predicates),
)
)
resolved[provider.name if provider else name] = provider.id if provider else identifier
return resolved
def _is_uuid_string(value: str | None) -> bool:
if not value:
return False
try:
UUID(value)
except ValueError:
return False
return True
def _print_auto_tools(auto_tools: WizardToolMap) -> None:
_print_wizard_step("Automatically Discovered Tools")
click.echo("Automatically discovered tools:")
_print_auto_tool_category("Custom API tools", auto_tools["api_tools"])
_print_auto_tool_category("Workflow tools", auto_tools["workflow_tools"])
_print_auto_tool_category("MCP tools", auto_tools["mcp_tools"])
def _print_auto_tool_category(label: str, values: dict[str, str | None]) -> None:
click.echo(label)
if not values:
click.echo("- none")
return
for name, identifier in sorted(values.items()):
click.echo(f"- {_format_tool_name_id(name, identifier)}")
def _prompt_additional_tools(tenant_id: str, auto_tools: WizardToolMap) -> WizardToolSelection:
selections: WizardToolSelection = {"api_tools": [], "workflow_tools": [], "mcp_tools": []}
_print_wizard_step("Additional Tools")
if not click.confirm(
"Export additional tools manually? [y/n, default: n]",
default=False,
show_default=False,
):
_print_final_tool_selection(auto_tools, selections, {})
return selections
manual_labels: dict[str, str] = {}
api_tool_options = [
(tool.name, tool.name, tool.id)
for tool in db.session.scalars(
sa.select(ApiToolProvider).where(ApiToolProvider.tenant_id == tenant_id).order_by(ApiToolProvider.name)
).all()
]
selections["api_tools"] = _prompt_tool_category(
"Custom API tools",
api_tool_options,
auto_tools=auto_tools["api_tools"],
)
manual_labels.update(_selected_tool_labels(api_tool_options, selections["api_tools"]))
workflow_tool_options = [
(tool.id, tool.name, tool.id)
for tool in db.session.scalars(
sa.select(WorkflowToolProvider)
.where(WorkflowToolProvider.tenant_id == tenant_id)
.order_by(WorkflowToolProvider.name)
).all()
]
selections["workflow_tools"] = _prompt_tool_category(
"Workflow tools",
workflow_tool_options,
auto_tools=auto_tools["workflow_tools"],
)
manual_labels.update(_selected_tool_labels(workflow_tool_options, selections["workflow_tools"]))
mcp_tool_options = [
(tool.id, tool.name, tool.server_identifier)
for tool in db.session.scalars(
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id).order_by(MCPToolProvider.name)
).all()
]
selections["mcp_tools"] = _prompt_tool_category(
"MCP tools",
mcp_tool_options,
auto_tools=auto_tools["mcp_tools"],
)
manual_labels.update(_selected_tool_labels(mcp_tool_options, selections["mcp_tools"]))
_print_final_tool_selection(auto_tools, selections, manual_labels)
return selections
def _selected_tool_labels_for_tenant(tenant_id: str, selected_tools: WizardToolSelection) -> dict[str, str]:
labels: dict[str, str] = {}
if selected_tools["api_tools"]:
labels.update(
_selected_tool_labels(
[
(tool.name, tool.name, tool.id)
for tool in db.session.scalars(
sa.select(ApiToolProvider)
.where(ApiToolProvider.tenant_id == tenant_id)
.order_by(ApiToolProvider.name)
).all()
],
selected_tools["api_tools"],
)
)
if selected_tools["workflow_tools"]:
labels.update(
_selected_tool_labels(
[
(tool.id, tool.name, tool.id)
for tool in db.session.scalars(
sa.select(WorkflowToolProvider)
.where(WorkflowToolProvider.tenant_id == tenant_id)
.order_by(WorkflowToolProvider.name)
).all()
],
selected_tools["workflow_tools"],
)
)
if selected_tools["mcp_tools"]:
labels.update(
_selected_tool_labels(
[
(tool.id, tool.name, tool.server_identifier)
for tool in db.session.scalars(
sa.select(MCPToolProvider)
.where(MCPToolProvider.tenant_id == tenant_id)
.order_by(MCPToolProvider.name)
).all()
],
selected_tools["mcp_tools"],
)
)
return labels
def _selected_tool_labels(options: list[tuple[str, str, str]], selected_values: list[str]) -> dict[str, str]:
selected = set(selected_values)
return {value: _format_tool_name_id(name, detail) for value, name, detail in options if value in selected}
def _prompt_tool_category(
label: str,
options: list[tuple[str, str, str]],
*,
auto_tools: dict[str, str | None],
) -> list[str]:
if not options:
click.echo(f"{label}: none")
return []
_print_wizard_step(label)
for index, (value, name, detail) in enumerate(options, 1):
marker = "[auto]" if _is_auto_tool(value, name, detail, auto_tools) else "[ ]"
click.echo(f"{index}. {marker} {name} ({detail})")
raw = click.prompt(
f"Select {label.lower()} by number, comma-separated numbers, all, or empty",
default="",
show_default=cast(Any, "empty"),
)
if not raw.strip():
return []
return parse_index_selection(raw, [value for value, _, _ in options])
def _is_auto_tool(value: str, name: str, detail: str, auto_tools: dict[str, str | None]) -> bool:
return name in auto_tools or value in auto_tools or value in auto_tools.values() or detail in auto_tools.values()
def _print_final_tool_selection(
auto_tools: WizardToolMap,
additional_tools: WizardToolSelection,
manual_labels: dict[str, str],
) -> None:
_print_wizard_step("Final Tool Selection")
_print_tool_selection_body(auto_tools, additional_tools, manual_labels)
def _print_tool_selection_body(
auto_tools: WizardToolMap,
additional_tools: WizardToolSelection,
manual_labels: dict[str, str],
) -> None:
click.echo("Final tools to export:")
_print_final_tool_category(
"Custom API tools",
auto_tools["api_tools"],
additional_tools["api_tools"],
manual_labels,
)
_print_final_tool_category(
"Workflow tools",
auto_tools["workflow_tools"],
additional_tools["workflow_tools"],
manual_labels,
)
_print_final_tool_category("MCP tools", auto_tools["mcp_tools"], additional_tools["mcp_tools"], manual_labels)
def _print_final_tool_category(
label: str,
auto_tools: dict[str, str | None],
manual_values: list[str],
manual_labels: dict[str, str],
) -> None:
click.echo(label)
lines = [f"- [auto] {_format_tool_name_id(name, identifier)}" for name, identifier in sorted(auto_tools.items())]
auto_identifiers = {identifier for identifier in auto_tools.values() if identifier}
lines.extend(
f"- [manual] {manual_labels.get(value, value)}"
for value in manual_values
if value not in auto_tools and value not in auto_identifiers
)
if not lines:
click.echo("- none")
return
for line in lines:
click.echo(line)
def _format_tool_name_id(name: str, identifier: str | None) -> str:
if identifier and identifier != name:
return f"{name}: {identifier}"
return name
def _confirm_wizard_summary(
*,
tenant_name: str,
app_names: list[str],
auto_tools: WizardToolMap,
additional_tools: WizardToolSelection,
manual_labels: dict[str, str],
include_referenced_tools: bool,
include_secrets: bool,
create_tokens: bool,
id_strategy: str,
conflict_strategy: str,
output_file: str,
) -> None:
_print_wizard_step("Summary")
click.echo("Migration export summary:")
click.echo(f"source tenant: {tenant_name}")
click.echo(f"selected apps: {len(app_names)}")
for app_name in app_names:
click.echo(f"- {app_name}")
click.echo(f"auto referenced tools: {str(include_referenced_tools).lower()}")
_print_tool_selection_body(auto_tools, additional_tools, manual_labels)
click.echo(f"include secrets: {str(include_secrets).lower()}")
click.echo(f"create app api token on import: {str(create_tokens).lower()}")
click.echo(f"id strategy: {id_strategy}")
click.echo(f"conflict strategy: {conflict_strategy}")
click.echo(f"output path: {output_file}")
if not click.confirm("Write migration package? [y/n, default: y]", default=True, show_default=False):
raise click.Abort()
def _prompt_output_file() -> tuple[str, bool]:
default_output = f"migration-data-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
output_file = click.prompt("Output path", default=default_output, show_default=True)
if output_file.lower() in {"y", "yes", "n", "no"}:
raise click.ClickException("Output path must be a file path. Press Enter to use the default path.")
overwrite = False
if Path(output_file).exists():
overwrite = click.confirm(
"Output file exists. Overwrite? [y/n, default: n]",
default=False,
show_default=False,
)
if not overwrite:
raise click.ClickException(f"Output file already exists: {output_file}")
return output_file, overwrite
def _with_output_path(context: ReportContext | None, output_path: str) -> ReportContext:
if context is None:
return ReportContext(output_path=output_path)
return ReportContext(
output_path=output_path,
source_scope=context.source_scope,
selected_app_count=context.selected_app_count,
include_secrets=context.include_secrets,
target_tenant=context.target_tenant,
operator_email=context.operator_email,
app_api_tokens_created=context.app_api_tokens_created,
app_api_tokens_reused=context.app_api_tokens_reused,
id_mapping_count=context.id_mapping_count,
id_mappings=context.id_mappings,
)
def _render_report(report_items: list[ResourceReportItem], *, context: ReportContext | None = None) -> None:
for line in MigrationReportService().render(report_items, context=context):
click.echo(line)

View File

@ -30,7 +30,7 @@ def vdb_migrate(scope: str):
def migrate_annotation_vector_database():
"""
Migrate annotation datas to target vector database .
Migrate annotation data to target vector database.
"""
click.echo(click.style("Starting annotation data migration.", fg="green"))
create_count = 0
@ -140,7 +140,7 @@ def migrate_annotation_vector_database():
def migrate_knowledge_vector_database():
"""
Migrate vector database datas to target vector database .
Migrate vector database data to target vector database.
"""
click.echo(click.style("Starting vector database migration.", fg="green"))
create_count = 0

View File

@ -5,7 +5,7 @@ from uuid import UUID
from flask import request
from flask_restx import Resource, marshal
from pydantic import BaseModel, Field
from sqlalchemy import String, cast, func, or_, select
from sqlalchemy import String, case, cast, func, literal, or_, select
from sqlalchemy.dialects.postgresql import JSONB
from werkzeug.exceptions import Forbidden, NotFound
@ -169,12 +169,17 @@ class DatasetDocumentSegmentListApi(Resource):
# Use database-specific methods for JSON array search
if dify_config.SQLALCHEMY_DATABASE_URI_SCHEME == "postgresql":
# PostgreSQL: Use jsonb_array_elements_text to properly handle Unicode/Chinese text
# Guard with jsonb_typeof to avoid "cannot extract elements from a scalar" error
# when keywords is null or a non-array JSON value.
# Feed the set-returning function a JSON array in every row. Filtering in
# the subquery is not enough because PostgreSQL can still evaluate the
# SRF on scalar JSON before applying the predicate.
keywords_jsonb = cast(DocumentSegment.keywords, JSONB)
keywords_array = case(
(func.jsonb_typeof(keywords_jsonb) == "array", keywords_jsonb),
else_=cast(literal("[]"), JSONB),
)
keywords_condition = func.array_to_string(
func.array(
select(func.jsonb_array_elements_text(cast(DocumentSegment.keywords, JSONB)))
.where(func.jsonb_typeof(cast(DocumentSegment.keywords, JSONB)) == "array")
select(func.jsonb_array_elements_text(keywords_array))
.correlate(DocumentSegment)
.scalar_subquery()
),

View File

@ -863,7 +863,7 @@ class ToolManager:
return controller
@classmethod
def user_get_api_provider(cls, provider: str, tenant_id: str):
def user_get_api_provider(cls, provider: str, tenant_id: str, mask: bool = True):
"""
get api provider
"""
@ -902,8 +902,10 @@ class ToolManager:
tenant_id=tenant_id,
controller=controller,
)
masked_credentials = encrypter.mask_plugin_credentials(encrypter.decrypt(credentials))
if mask:
masked_credentials = encrypter.mask_plugin_credentials(encrypter.decrypt(credentials))
else:
masked_credentials = encrypter.decrypt(credentials)
try:
icon = emoji_icon_adapter.validate_json(provider_obj.icon)

View File

@ -6,7 +6,7 @@ from json.decoder import JSONDecodeError
from typing import Any, TypedDict
import httpx
from flask import request
from flask import has_request_context, request
from yaml import YAMLError, safe_load
from core.tools.entities.common_entities import I18nObject
@ -44,7 +44,7 @@ class ApiBasedToolSchemaParser:
raise ToolProviderNotFoundError("No server found in the openapi yaml.")
server_url = openapi["servers"][0]["url"]
request_env = request.headers.get("X-Request-Env")
request_env = request.headers.get("X-Request-Env") if has_request_context() else None
if request_env:
matched_servers = [server["url"] for server in openapi["servers"] if server["env"] == request_env]
server_url = matched_servers[0] if matched_servers else server_url

View File

@ -1,6 +1,7 @@
import logging
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.errors import ToolProviderNotFoundError
from core.tools.tool_manager import ToolManager
from core.tools.utils.configuration import ToolParameterConfigurationManager
from core.workflow.human_input_adapter import adapt_node_config_for_graph
@ -38,6 +39,14 @@ def handle(sender, **kwargs):
identity_id=f"WORKFLOW.{app.id}.{node_data.get('id')}",
)
manager.delete_tool_parameters_cache()
except ToolProviderNotFoundError as exc:
logger.info(
"Skipped deleting tool parameters cache for workflow %s node %s "
"because tool provider is missing: %s",
app.id,
node_data.get("id"),
exc,
)
except Exception:
# tool dose not exist
logger.exception(

View File

@ -15,14 +15,18 @@ def init_app(app: DifyApp):
data_migrate,
delete_archived_workflow_runs,
export_app_messages,
export_migration_data,
export_migration_data_template,
extract_plugins,
extract_unique_plugins,
file_usage,
fix_app_site_missing,
import_migration_data,
install_plugins,
install_rag_pipeline_plugins,
migrate_data_for_plugin,
migrate_oss,
migration_data_wizard,
old_metadata_migration,
remove_orphaned_files_on_storage,
reset_email,
@ -70,6 +74,10 @@ def init_app(app: DifyApp):
clean_workflow_runs,
clean_expired_messages,
export_app_messages,
export_migration_data,
export_migration_data_template,
import_migration_data,
migration_data_wizard,
]
for cmd in cmds_to_register:
app.cli.add_command(cmd)

View File

@ -1,5 +1,6 @@
import posixpath
from collections.abc import Generator
from typing import override
import oss2 as aliyun_s3
@ -29,9 +30,11 @@ class AliyunOssStorage(BaseStorage):
cloudbox_id=dify_config.ALIYUN_CLOUDBOX_ID,
)
@override
def save(self, filename, data):
self.client.put_object(self.__wrapper_folder_filename(filename), data)
@override
def load_once(self, filename: str) -> bytes:
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
data = obj.read()
@ -39,17 +42,21 @@ class AliyunOssStorage(BaseStorage):
return b""
return data
@override
def load_stream(self, filename: str) -> Generator:
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
while chunk := obj.read(4096):
yield chunk
@override
def download(self, filename: str, target_filepath):
self.client.get_object_to_file(self.__wrapper_folder_filename(filename), target_filepath)
@override
def exists(self, filename: str):
return self.client.object_exists(self.__wrapper_folder_filename(filename))
@override
def delete(self, filename: str):
self.client.delete_object(self.__wrapper_folder_filename(filename))

View File

@ -1,5 +1,6 @@
import logging
from collections.abc import Generator
from typing import override
import boto3
from botocore.client import Config
@ -48,9 +49,11 @@ class AwsS3Storage(BaseStorage):
# other error, raise exception
raise
@override
def save(self, filename, data):
self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
@override
def load_once(self, filename: str) -> bytes:
try:
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
@ -61,6 +64,7 @@ class AwsS3Storage(BaseStorage):
raise
return data
@override
def load_stream(self, filename: str) -> Generator:
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
@ -73,9 +77,11 @@ class AwsS3Storage(BaseStorage):
else:
raise
@override
def download(self, filename, target_filepath):
self.client.download_file(self.bucket_name, filename, target_filepath)
@override
def exists(self, filename):
try:
self.client.head_object(Bucket=self.bucket_name, Key=filename)
@ -83,5 +89,6 @@ class AwsS3Storage(BaseStorage):
except:
return False
@override
def delete(self, filename: str):
self.client.delete_object(Bucket=self.bucket_name, Key=filename)

View File

@ -1,5 +1,6 @@
from collections.abc import Generator
from datetime import timedelta
from typing import override
from azure.identity import ChainedTokenCredential, DefaultAzureCredential
from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas
@ -26,6 +27,7 @@ class AzureBlobStorage(BaseStorage):
else:
self.credential = None
@override
def save(self, filename, data):
if not self.bucket_name:
return
@ -34,6 +36,7 @@ class AzureBlobStorage(BaseStorage):
blob_container = client.get_container_client(container=self.bucket_name)
blob_container.upload_blob(filename, data)
@override
def load_once(self, filename: str) -> bytes:
if not self.bucket_name:
raise FileNotFoundError("Azure bucket name is not configured.")
@ -46,6 +49,7 @@ class AzureBlobStorage(BaseStorage):
raise TypeError(f"Expected bytes from blob.readall(), got {type(data).__name__}")
return data
@override
def load_stream(self, filename: str) -> Generator:
if not self.bucket_name:
raise FileNotFoundError("Azure bucket name is not configured.")
@ -55,6 +59,7 @@ class AzureBlobStorage(BaseStorage):
blob_data = blob.download_blob()
yield from blob_data.chunks()
@override
def download(self, filename, target_filepath):
if not self.bucket_name:
return
@ -66,6 +71,7 @@ class AzureBlobStorage(BaseStorage):
blob_data = blob.download_blob()
blob_data.readinto(my_blob)
@override
def exists(self, filename):
if not self.bucket_name:
return False
@ -75,6 +81,7 @@ class AzureBlobStorage(BaseStorage):
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
return blob.exists()
@override
def delete(self, filename: str):
if not self.bucket_name:
return

View File

@ -1,6 +1,7 @@
import base64
import hashlib
from collections.abc import Generator
from typing import override
from baidubce.auth.bce_credentials import BceCredentials
from baidubce.bce_client_configuration import BceClientConfiguration
@ -26,6 +27,7 @@ class BaiduObsStorage(BaseStorage):
self.client = BosClient(config=client_config)
@override
def save(self, filename, data):
md5 = hashlib.md5()
md5.update(data)
@ -34,24 +36,29 @@ class BaiduObsStorage(BaseStorage):
bucket_name=self.bucket_name, key=filename, data=data, content_length=len(data), content_md5=content_md5
)
@override
def load_once(self, filename: str) -> bytes:
response = self.client.get_object(bucket_name=self.bucket_name, key=filename)
data: bytes = response.data.read()
return data
@override
def load_stream(self, filename: str) -> Generator:
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
while chunk := response.read(4096):
yield chunk
@override
def download(self, filename, target_filepath):
self.client.get_object_to_file(bucket_name=self.bucket_name, key=filename, file_name=target_filepath)
@override
def exists(self, filename):
res = self.client.get_object_meta_data(bucket_name=self.bucket_name, key=filename)
if res is None:
return False
return True
@override
def delete(self, filename: str):
self.client.delete_object(bucket_name=self.bucket_name, key=filename)

View File

@ -10,7 +10,7 @@ import tempfile
from collections.abc import Generator
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import Any, override
import clickzetta
from pydantic import BaseModel, model_validator
@ -251,6 +251,7 @@ class ClickZettaVolumeStorage(BaseStorage):
# Don't raise exception, let the operation continue
# The table might exist but not be visible due to permissions
@override
def save(self, filename: str, data: bytes):
"""Save data to ClickZetta Volume.
@ -304,6 +305,7 @@ class ClickZettaVolumeStorage(BaseStorage):
# Clean up temporary file
Path(temp_file_path).unlink(missing_ok=True)
@override
def load_once(self, filename: str) -> bytes:
"""Load file content from ClickZetta Volume.
@ -364,6 +366,7 @@ class ClickZettaVolumeStorage(BaseStorage):
logger.debug("File %s loaded from ClickZetta Volume", filename)
return content
@override
def load_stream(self, filename: str) -> Generator:
"""Load file as stream from ClickZetta Volume.
@ -382,6 +385,7 @@ class ClickZettaVolumeStorage(BaseStorage):
logger.debug("File %s loaded as stream from ClickZetta Volume", filename)
@override
def download(self, filename: str, target_filepath: str):
"""Download file from ClickZetta Volume to local path.
@ -395,6 +399,7 @@ class ClickZettaVolumeStorage(BaseStorage):
logger.debug("File %s downloaded from ClickZetta Volume to %s", filename, target_filepath)
@override
def exists(self, filename: str) -> bool:
"""Check if file exists in ClickZetta Volume.
@ -436,6 +441,7 @@ class ClickZettaVolumeStorage(BaseStorage):
logger.warning("Error checking file existence for %s: %s", filename, e)
return False
@override
def delete(self, filename: str):
"""Delete file from ClickZetta Volume.
@ -472,6 +478,7 @@ class ClickZettaVolumeStorage(BaseStorage):
logger.debug("File %s deleted from ClickZetta Volume", filename)
@override
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
"""Scan files and directories in ClickZetta Volume.

View File

@ -1,7 +1,7 @@
import base64
import io
from collections.abc import Generator
from typing import Any
from typing import Any, override
from google.cloud import storage as google_cloud_storage # type: ignore
from pydantic import TypeAdapter
@ -29,12 +29,14 @@ class GoogleCloudStorage(BaseStorage):
else:
self.client = google_cloud_storage.Client()
@override
def save(self, filename, data):
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.blob(filename)
with io.BytesIO(data) as stream:
blob.upload_from_file(stream)
@override
def load_once(self, filename: str) -> bytes:
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.get_blob(filename)
@ -43,6 +45,7 @@ class GoogleCloudStorage(BaseStorage):
data: bytes = blob.download_as_bytes()
return data
@override
def load_stream(self, filename: str) -> Generator:
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.get_blob(filename)
@ -52,6 +55,7 @@ class GoogleCloudStorage(BaseStorage):
while chunk := blob_stream.read(4096):
yield chunk
@override
def download(self, filename, target_filepath):
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.get_blob(filename)
@ -59,11 +63,13 @@ class GoogleCloudStorage(BaseStorage):
raise FileNotFoundError("File not found")
blob.download_to_filename(target_filepath)
@override
def exists(self, filename):
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.blob(filename)
return blob.exists()
@override
def delete(self, filename: str):
bucket = self.client.get_bucket(self.bucket_name)
bucket.delete_blob(filename)

View File

@ -1,4 +1,5 @@
from collections.abc import Generator
from typing import override
from obs import ObsClient
@ -20,27 +21,33 @@ class HuaweiObsStorage(BaseStorage):
path_style=dify_config.HUAWEI_OBS_PATH_STYLE,
)
@override
def save(self, filename, data):
self.client.putObject(bucketName=self.bucket_name, objectKey=filename, content=data)
@override
def load_once(self, filename: str) -> bytes:
data: bytes = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response.read()
return data
@override
def load_stream(self, filename: str) -> Generator:
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
while chunk := response.read(4096):
yield chunk
@override
def download(self, filename, target_filepath):
self.client.getObject(bucketName=self.bucket_name, objectKey=filename, downloadPath=target_filepath)
@override
def exists(self, filename):
res = self._get_meta(filename)
if res is None:
return False
return True
@override
def delete(self, filename: str):
self.client.deleteObject(bucketName=self.bucket_name, objectKey=filename)

View File

@ -2,7 +2,7 @@ import logging
import os
from collections.abc import Generator
from pathlib import Path
from typing import Any
from typing import Any, override
import opendal
from dotenv import dotenv_values
@ -41,10 +41,12 @@ class OpenDALStorage(BaseStorage):
logger.debug("opendal operator created with scheme %s", scheme)
logger.debug("added retry layer to opendal operator")
@override
def save(self, filename: str, data: bytes):
self.op.write(path=filename, bs=data)
logger.debug("file %s saved", filename)
@override
def load_once(self, filename: str) -> bytes:
if not self.exists(filename):
raise FileNotFoundError("File not found")
@ -53,6 +55,7 @@ class OpenDALStorage(BaseStorage):
logger.debug("file %s loaded", filename)
return content
@override
def load_stream(self, filename: str) -> Generator:
if not self.exists(filename):
raise FileNotFoundError("File not found")
@ -67,6 +70,7 @@ class OpenDALStorage(BaseStorage):
yield chunk
logger.debug("file %s loaded as stream", filename)
@override
def download(self, filename: str, target_filepath: str):
if not self.exists(filename):
raise FileNotFoundError("File not found")
@ -74,9 +78,11 @@ class OpenDALStorage(BaseStorage):
Path(target_filepath).write_bytes(self.op.read(path=filename))
logger.debug("file %s downloaded to %s", filename, target_filepath)
@override
def exists(self, filename: str) -> bool:
return self.op.exists(path=filename)
@override
def delete(self, filename: str):
if self.exists(filename):
self.op.delete(path=filename)
@ -84,6 +90,7 @@ class OpenDALStorage(BaseStorage):
return
logger.debug("file %s not found, skip delete", filename)
@override
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
if not self.exists(path):
raise FileNotFoundError("Path not found")

View File

@ -1,4 +1,5 @@
from collections.abc import Generator
from typing import override
import boto3
from botocore.exceptions import ClientError
@ -22,9 +23,11 @@ class OracleOCIStorage(BaseStorage):
region_name=dify_config.OCI_REGION,
)
@override
def save(self, filename, data):
self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
@override
def load_once(self, filename: str) -> bytes:
try:
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
@ -35,6 +38,7 @@ class OracleOCIStorage(BaseStorage):
raise
return data
@override
def load_stream(self, filename: str) -> Generator:
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
@ -45,9 +49,11 @@ class OracleOCIStorage(BaseStorage):
else:
raise
@override
def download(self, filename, target_filepath):
self.client.download_file(self.bucket_name, filename, target_filepath)
@override
def exists(self, filename):
try:
self.client.head_object(Bucket=self.bucket_name, Key=filename)
@ -55,5 +61,6 @@ class OracleOCIStorage(BaseStorage):
except:
return False
@override
def delete(self, filename: str):
self.client.delete_object(Bucket=self.bucket_name, Key=filename)

View File

@ -1,6 +1,7 @@
import io
from collections.abc import Generator
from pathlib import Path
from typing import override
from supabase import Client
@ -28,29 +29,35 @@ class SupabaseStorage(BaseStorage):
if not self.bucket_exists():
self.client.storage.create_bucket(id=id, name=bucket_name)
@override
def save(self, filename, data):
self.client.storage.from_(self.bucket_name).upload(filename, data)
@override
def load_once(self, filename: str) -> bytes:
content: bytes = self.client.storage.from_(self.bucket_name).download(filename)
return content
@override
def load_stream(self, filename: str) -> Generator:
result = self.client.storage.from_(self.bucket_name).download(filename)
byte_stream = io.BytesIO(result)
while chunk := byte_stream.read(4096): # Read in chunks of 4KB
yield chunk
@override
def download(self, filename, target_filepath):
result = self.client.storage.from_(self.bucket_name).download(filename)
Path(target_filepath).write_bytes(result)
@override
def exists(self, filename):
result = self.client.storage.from_(self.bucket_name).list(path=filename)
if len(result) > 0:
return True
return False
@override
def delete(self, filename: str):
self.client.storage.from_(self.bucket_name).remove([filename])

View File

@ -1,4 +1,5 @@
from collections.abc import Generator
from typing import override
from qcloud_cos import CosConfig, CosS3Client
@ -29,23 +30,29 @@ class TencentCosStorage(BaseStorage):
)
self.client = CosS3Client(config)
@override
def save(self, filename, data):
self.client.put_object(Bucket=self.bucket_name, Body=data, Key=filename)
@override
def load_once(self, filename: str) -> bytes:
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].get_raw_stream().read()
return data
@override
def load_stream(self, filename: str) -> Generator:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].get_stream(chunk_size=4096)
@override
def download(self, filename, target_filepath):
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
response["Body"].get_stream_to_file(target_filepath)
@override
def exists(self, filename):
return self.client.object_exists(Bucket=self.bucket_name, Key=filename)
@override
def delete(self, filename: str):
self.client.delete_object(Bucket=self.bucket_name, Key=filename)

View File

@ -1,4 +1,5 @@
from collections.abc import Generator
from typing import override
import tos
@ -27,11 +28,13 @@ class VolcengineTosStorage(BaseStorage):
region=dify_config.VOLCENGINE_TOS_REGION,
)
@override
def save(self, filename, data):
if not self.bucket_name:
raise ValueError("VOLCENGINE_TOS_BUCKET_NAME is not set")
self.client.put_object(bucket=self.bucket_name, key=filename, content=data)
@override
def load_once(self, filename: str) -> bytes:
if not self.bucket_name:
raise FileNotFoundError("VOLCENGINE_TOS_BUCKET_NAME is not set")
@ -40,6 +43,7 @@ class VolcengineTosStorage(BaseStorage):
raise TypeError(f"Expected bytes, got {type(data).__name__}")
return data
@override
def load_stream(self, filename: str) -> Generator:
if not self.bucket_name:
raise FileNotFoundError("VOLCENGINE_TOS_BUCKET_NAME is not set")
@ -47,11 +51,13 @@ class VolcengineTosStorage(BaseStorage):
while chunk := response.read(4096):
yield chunk
@override
def download(self, filename, target_filepath):
if not self.bucket_name:
raise ValueError("VOLCENGINE_TOS_BUCKET_NAME is not set")
self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)
@override
def exists(self, filename):
if not self.bucket_name:
return False
@ -60,6 +66,7 @@ class VolcengineTosStorage(BaseStorage):
return False
return True
@override
def delete(self, filename: str):
if not self.bucket_name:
return

View File

@ -97,6 +97,7 @@ class AppDslService:
icon: str | None = None,
icon_background: str | None = None,
app_id: str | None = None,
import_app_id: str | None = None,
) -> Import:
"""Import an app from YAML content or URL."""
import_id = str(uuid.uuid4())
@ -262,6 +263,7 @@ class AppDslService:
icon=icon,
icon_background=icon_background,
dependencies=check_dependencies_pending_data,
import_app_id=import_app_id,
)
draft_var_srv = WorkflowDraftVariableService(session=self._session)
@ -385,6 +387,7 @@ class AppDslService:
icon: str | None = None,
icon_background: str | None = None,
dependencies: list[PluginDependency] | None = None,
import_app_id: str | None = None,
) -> App:
"""Create a new app or update an existing one."""
app_data = data.get("app", {})
@ -417,7 +420,7 @@ class AppDslService:
# Create new app
app = App()
app.id = str(uuid4())
app.id = import_app_id or str(uuid4())
app.tenant_id = account.current_tenant_id
app.mode = app_mode
app.name = name or app_data.get("name", "")

View File

@ -0,0 +1,17 @@
from services.data_migration.entities import (
ConflictStrategy,
ExportSelection,
IdStrategy,
ImportOptions,
MigrationDataError,
MigrationPackage,
)
__all__ = [
"ConflictStrategy",
"ExportSelection",
"IdStrategy",
"ImportOptions",
"MigrationDataError",
"MigrationPackage",
]

View File

@ -0,0 +1,92 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from services.data_migration.entities import DependencyKind
@dataclass(frozen=True)
class DiscoveredDependency:
kind: DependencyKind
provider_id: str
provider_name: str | None = None
source: str | None = None
class DependencyDiscoveryService:
def discover_from_dsl(self, dsl: dict[str, Any]) -> list[DiscoveredDependency]:
seen: set[tuple[DependencyKind, str]] = set()
result: list[DiscoveredDependency] = []
for node in self._nodes_from_dsl(dsl):
data = node.get("data", {}) if isinstance(node, dict) else {}
for dependency in self._dependencies_from_node(data):
key = (dependency.kind, dependency.provider_id)
if dependency.provider_id and key not in seen:
seen.add(key)
result.append(dependency)
return result
def _nodes_from_dsl(self, dsl: dict[str, Any]) -> list[dict[str, Any]]:
nodes: list[dict[str, Any]] = []
graph = dsl.get("graph") if isinstance(dsl, dict) else None
if isinstance(graph, dict) and isinstance(graph.get("nodes"), list):
nodes.extend(node for node in graph["nodes"] if isinstance(node, dict))
workflow = dsl.get("workflow") if isinstance(dsl, dict) else None
workflow_graph = workflow.get("graph") if isinstance(workflow, dict) else None
if isinstance(workflow_graph, dict) and isinstance(workflow_graph.get("nodes"), list):
nodes.extend(node for node in workflow_graph["nodes"] if isinstance(node, dict))
return nodes
def _dependencies_from_node(self, data: dict[str, Any]) -> list[DiscoveredDependency]:
dependencies: list[DiscoveredDependency] = []
node_type = data.get("type")
if node_type == "tool":
dependency = self._from_tool_config(data, source="tool_node")
if dependency:
dependencies.append(dependency)
if node_type == "agent":
for tool_config in self._agent_tool_configs(data):
if isinstance(tool_config, dict):
dependency = self._from_tool_config(tool_config, source="agent_node")
if dependency:
dependencies.append(dependency)
return dependencies
def _agent_tool_configs(self, data: dict[str, Any]) -> list[dict[str, Any]]:
configs = data.get("tools")
if isinstance(configs, list):
return [config for config in configs if isinstance(config, dict)]
agent_parameters = data.get("agent_parameters")
if not isinstance(agent_parameters, dict):
return []
tools_parameter = agent_parameters.get("tools")
if not isinstance(tools_parameter, dict):
return []
value = tools_parameter.get("value", [])
if not isinstance(value, list):
return []
return [config for config in value if isinstance(config, dict)]
def _from_tool_config(self, config: dict[str, Any], *, source: str) -> DiscoveredDependency | None:
provider_id = config.get("provider_id") or config.get("provider_name") or config.get("provider")
if not provider_id:
return None
provider_type = str(config.get("provider_type") or config.get("type") or "")
kind = self._kind_from_provider_type(provider_type)
return DiscoveredDependency(
kind=kind,
provider_id=str(provider_id),
provider_name=config.get("provider_name"),
source=source,
)
def _kind_from_provider_type(self, provider_type: str) -> DependencyKind:
normalized = provider_type.lower()
if normalized in {"api", "custom", "api_tool"}:
return DependencyKind.API_TOOL
if normalized in {"workflow", "workflow_tool"}:
return DependencyKind.WORKFLOW_TOOL
if normalized == "mcp":
return DependencyKind.MCP_TOOL
return DependencyKind.BUILTIN_OR_PLUGIN_TOOL

View File

@ -0,0 +1,241 @@
"""Typed entities for versioned cross-environment migration packages.
This module is intentionally side-effect free. It owns only value objects and
validation for migration package/config shapes; command output and database I/O
belong in adapter and service modules built on top of these entities.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any, Literal, TypedDict
class MigrationDataError(ValueError):
"""Raised when migration config or package data is invalid."""
class IdStrategy(StrEnum):
PRESERVE_ID = "preserve-id"
GENERATE_NEW_ID = "generate-new-id"
class ConflictStrategy(StrEnum):
FAIL = "fail"
SKIP = "skip"
UPDATE = "update"
class ResourceType(StrEnum):
WORKFLOW = "workflow"
API_TOOL = "api_tool"
WORKFLOW_TOOL = "workflow_tool"
MCP_TOOL = "mcp_tool"
DEPENDENCY = "dependency"
class DependencyKind(StrEnum):
API_TOOL = "api_tool"
WORKFLOW_TOOL = "workflow_tool"
MCP_TOOL = "mcp_tool"
BUILTIN_OR_PLUGIN_TOOL = "builtin_or_plugin_tool"
UNRESOLVED = "unresolved"
class TargetTenantSelector(TypedDict, total=False):
id: str
name: str
def _parse_target_tenant(value: Any) -> TargetTenantSelector | None:
if value is None:
return None
if not isinstance(value, dict):
raise MigrationDataError("metadata.target_tenant must be an object when provided.")
target: TargetTenantSelector = {}
target_id = value.get("id")
if target_id is not None:
if not isinstance(target_id, str):
raise MigrationDataError("metadata.target_tenant.id must be a string.")
target["id"] = target_id
target_name = value.get("name")
if target_name is not None:
if not isinstance(target_name, str):
raise MigrationDataError("metadata.target_tenant.name must be a string.")
target["name"] = target_name
unsupported_keys = sorted(set(value.keys()) - {"id", "name"})
if unsupported_keys:
raise MigrationDataError(f"metadata.target_tenant contains unsupported fields: {unsupported_keys}")
return target
def _parse_package_section(value: Any, section: str) -> list[dict[str, Any]]:
if value is None:
return []
if not isinstance(value, list):
raise MigrationDataError(f"Migration package field '{section}' must be a list.")
for item in value:
if not isinstance(item, dict):
raise MigrationDataError(f"Migration package field '{section}' must contain only objects.")
return value
@dataclass(frozen=True)
class SourceTenant:
id: str
name: str
@classmethod
def from_mapping(cls, value: dict[str, Any]) -> SourceTenant:
return cls(id=str(value.get("id", "")), name=str(value.get("name", "")))
@dataclass(frozen=True)
class ImportOptions:
create_app_api_token_on_import: bool = False
id_strategy: IdStrategy = IdStrategy.PRESERVE_ID
conflict_strategy: ConflictStrategy = ConflictStrategy.FAIL
@classmethod
def from_mapping(cls, value: dict[str, Any] | None) -> ImportOptions:
value = value or {}
try:
id_strategy = IdStrategy(value.get("id_strategy", IdStrategy.PRESERVE_ID))
except ValueError as exc:
raise MigrationDataError(f"Unsupported import_options.id_strategy: {value.get('id_strategy')}") from exc
try:
conflict_strategy = ConflictStrategy(value.get("conflict_strategy", ConflictStrategy.FAIL))
except ValueError as exc:
raise MigrationDataError(
f"Unsupported import_options.conflict_strategy: {value.get('conflict_strategy')}"
) from exc
return cls(
create_app_api_token_on_import=bool(value.get("create_app_api_token_on_import", False)),
id_strategy=id_strategy,
conflict_strategy=conflict_strategy,
)
@dataclass(frozen=True)
class MigrationMetadata:
version: str
source_scope: Literal["single"]
source_tenants: list[SourceTenant]
target_tenant: TargetTenantSelector | None = None
created_at: str | None = None
include_secrets: bool = False
import_options: ImportOptions = field(default_factory=ImportOptions)
@classmethod
def from_mapping(cls, value: dict[str, Any]) -> MigrationMetadata:
version = value.get("version")
if not version:
raise MigrationDataError("Migration package must include metadata.version.")
source_scope = value.get("source_scope", "single")
if source_scope != "single":
raise MigrationDataError(f"Unsupported source_scope: {source_scope}")
source_tenants = [
SourceTenant.from_mapping(item) for item in value.get("source_tenants", []) if isinstance(item, dict)
]
return cls(
version=str(version),
source_scope="single",
source_tenants=source_tenants,
target_tenant=_parse_target_tenant(value.get("target_tenant")),
created_at=value.get("created_at"),
include_secrets=bool(value.get("include_secrets", False)),
import_options=ImportOptions.from_mapping(value.get("import_options")),
)
@dataclass(frozen=True)
class MigrationPackage:
metadata: MigrationMetadata
workflows: list[dict[str, Any]] = field(default_factory=list)
tools: list[dict[str, Any]] = field(default_factory=list)
workflow_tools: list[dict[str, Any]] = field(default_factory=list)
mcp_tools: list[dict[str, Any]] = field(default_factory=list)
dependencies: list[dict[str, Any]] = field(default_factory=list)
@classmethod
def from_mapping(cls, value: dict[str, Any]) -> MigrationPackage:
metadata_value = value.get("metadata")
if not isinstance(metadata_value, dict):
raise MigrationDataError("Migration package must include metadata.version.")
return cls(
metadata=MigrationMetadata.from_mapping(metadata_value),
workflows=_parse_package_section(value.get("workflows"), "workflows"),
tools=_parse_package_section(value.get("tools"), "tools"),
workflow_tools=_parse_package_section(value.get("workflow_tools"), "workflow_tools"),
mcp_tools=_parse_package_section(value.get("mcp_tools"), "mcp_tools"),
dependencies=_parse_package_section(value.get("dependencies"), "dependencies"),
)
@dataclass(frozen=True)
class ExportSelection:
source_tenant_name: str
app_ids: list[str]
source_tenant_id: str | None = None
export_all_apps: bool = False
include_referenced_tools: bool = True
additional_api_tools: list[str] = field(default_factory=list)
additional_workflow_tools: list[str] = field(default_factory=list)
additional_mcp_tools: list[str] = field(default_factory=list)
include_secrets: bool = False
import_options: ImportOptions = field(default_factory=ImportOptions)
@dataclass(frozen=True)
class ResourceReportItem:
resource_type: ResourceType
identifier: str
name: str | None
status: str
message: str | None = None
@dataclass(frozen=True)
class ResourceIdMapping:
resource_type: ResourceType
name: str | None
source_id: str
target_id: str
@dataclass(frozen=True)
class ExportResult:
package: MigrationPackage
report_items: list[ResourceReportItem]
report_context: ReportContext | None = None
@dataclass(frozen=True)
class ImportTarget:
tenant_id: str
tenant_name: str
operator_id: str
operator_email: str | None
@dataclass(frozen=True)
class ImportResult:
report_items: list[ResourceReportItem]
id_mapping: dict[str, str] = field(default_factory=dict)
report_context: ReportContext | None = None
@dataclass(frozen=True)
class ReportContext:
output_path: str | None = None
source_scope: str | None = None
selected_app_count: int | None = None
include_secrets: bool | None = None
target_tenant: str | None = None
operator_email: str | None = None
app_api_tokens_created: int = 0
app_api_tokens_reused: int = 0
id_mapping_count: int = 0
id_mappings: dict[str, str] = field(default_factory=dict)
id_mapping_details: list[ResourceIdMapping] = field(default_factory=list)

View File

@ -0,0 +1,492 @@
from __future__ import annotations
from collections.abc import Iterable
from typing import Any
from uuid import UUID
import sqlalchemy as sa
import yaml
from core.tools.tool_manager import ToolManager
from extensions.ext_database import db
from graphon.model_runtime.utils.encoders import jsonable_encoder
from models import Account, Tenant
from models.account import TenantAccountJoin
from models.model import App
from models.tools import MCPToolProvider
from services.app_dsl_service import AppDslService
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService, DiscoveredDependency
from services.data_migration.entities import (
DependencyKind,
ExportResult,
ExportSelection,
ImportOptions,
MigrationDataError,
ReportContext,
ResourceReportItem,
ResourceType,
)
from services.data_migration.package_service import MigrationPackageService
from services.tools.workflow_tools_manage_service import WorkflowToolManageService
SUPPORTED_APP_MODES = {"workflow", "advanced-chat"}
class ExportConfigParser:
def parse(self, data: dict[str, Any]) -> ExportSelection:
if not isinstance(data, dict):
raise MigrationDataError("Export config JSON must be an object.")
source_tenant = self._source_tenant(data)
source_tenant_name = self._source_tenant_name(source_tenant, data)
apps = self._mapping(data.get("apps"), field_name="apps")
self._validate_source_scope(data)
self._validate_app_modes(apps.get("modes", []))
additional_tools = self._mapping(data.get("additional_tools"), field_name="additional_tools")
return ExportSelection(
source_tenant_name=source_tenant_name,
app_ids=self._string_list(apps.get("ids", data.get("workflows", [])), field_name="apps.ids"),
source_tenant_id=source_tenant.get("id"),
export_all_apps=bool(apps.get("all", data.get("export_all_workflows", False))),
include_referenced_tools=bool(data.get("include_referenced_tools", True)),
additional_api_tools=self._string_list(
additional_tools.get("api_tools", data.get("tools", [])), field_name="additional_tools.api_tools"
),
additional_workflow_tools=self._string_list(
additional_tools.get("workflow_tools", data.get("workflow_tools", [])),
field_name="additional_tools.workflow_tools",
),
additional_mcp_tools=self._string_list(
additional_tools.get("mcp_tools", data.get("mcp_tools", [])),
field_name="additional_tools.mcp_tools",
),
include_secrets=bool(data.get("include_secrets", False)),
import_options=ImportOptions.from_mapping(data.get("import_options")),
)
def _source_tenant(self, data: dict[str, Any]) -> dict[str, Any]:
if "source_tenant" in data:
return self._mapping(data.get("source_tenant"), field_name="source_tenant")
return {}
def _source_tenant_name(self, source_tenant: dict[str, Any], data: dict[str, Any]) -> str:
if source_tenant:
source_tenant_name = source_tenant.get("name")
if not source_tenant_name:
raise MigrationDataError("Export config must include source_tenant.name.")
return str(source_tenant_name)
source_tenant_name = data.get("tenant_name")
if not source_tenant_name:
raise MigrationDataError("Export config must include source_tenant.name.")
return str(source_tenant_name)
def _validate_source_scope(self, data: dict[str, Any]) -> None:
source_tenant = data.get("source_tenant")
if not isinstance(source_tenant, dict):
return
mode = source_tenant.get("mode", "single")
if mode != "single":
raise MigrationDataError(f"Unsupported source_tenant.mode: {mode}")
def _validate_app_modes(self, modes: Any) -> None:
app_modes = self._string_list(modes, field_name="apps.modes") if modes else []
unsupported_modes = sorted(set(app_modes) - SUPPORTED_APP_MODES)
if unsupported_modes:
raise MigrationDataError(f"Unsupported app modes for export: {unsupported_modes}")
def _mapping(self, value: Any, *, field_name: str) -> dict[str, Any]:
if value is None:
return {}
if not isinstance(value, dict):
raise MigrationDataError(f"Export config field '{field_name}' must be an object.")
return value
def _string_list(self, value: Any, *, field_name: str) -> list[str]:
if value is None:
return []
if not isinstance(value, list):
raise MigrationDataError(f"Export config field '{field_name}' must be a list.")
return [str(item) for item in value]
class MigrationExportService:
def __init__(
self,
*,
package_service: MigrationPackageService | None = None,
dependency_discovery_service: DependencyDiscoveryService | None = None,
) -> None:
self.package_service = package_service or MigrationPackageService()
self.dependency_discovery_service = dependency_discovery_service or DependencyDiscoveryService()
def export(self, selection: ExportSelection) -> ExportResult:
tenant = self._get_tenant(selection)
package = self.package_service.build_empty_package(
source_tenant_id=tenant.id,
source_tenant_name=tenant.name,
include_secrets=selection.include_secrets,
import_options=selection.import_options,
)
report_items: list[ResourceReportItem] = []
discovered_dependencies: list[DiscoveredDependency] = []
apps = self._selected_apps(tenant.id, selection)
exported_app_ids = {app.id for app in apps}
for app in apps:
dsl_content = AppDslService.export_dsl(app_model=app, include_secret=selection.include_secrets)
package.workflows.append(
{
"id": app.id,
"name": app.name,
"mode": app.mode.value if hasattr(app.mode, "value") else app.mode,
"dsl": dsl_content,
"source_tenant_id": tenant.id,
"create_app_api_token_on_import": selection.import_options.create_app_api_token_on_import,
}
)
report_items.append(ResourceReportItem(ResourceType.WORKFLOW, app.id, app.name, "exported"))
if selection.include_referenced_tools:
discovered_dependencies.extend(self._discover_dependencies(dsl_content))
self._export_api_tools(
tenant.id,
self._provider_ids(selection.additional_api_tools, discovered_dependencies, DependencyKind.API_TOOL),
include_secrets=selection.include_secrets,
exported_tools=package.tools,
report_items=report_items,
)
self._export_workflow_tools(
tenant,
self._provider_ids(
selection.additional_workflow_tools, discovered_dependencies, DependencyKind.WORKFLOW_TOOL
),
exported_app_ids=exported_app_ids,
exported_workflow_tools=package.workflow_tools,
dependencies=package.dependencies,
report_items=report_items,
)
self._export_mcp_tools(
tenant_id=tenant.id,
provider_ids=self._provider_ids(
selection.additional_mcp_tools,
discovered_dependencies,
DependencyKind.MCP_TOOL,
),
include_secrets=selection.include_secrets,
exported_mcp_tools=package.mcp_tools,
dependencies=package.dependencies,
report_items=report_items,
)
self._record_dependency_metadata(
self._dependencies_by_kind(discovered_dependencies, DependencyKind.BUILTIN_OR_PLUGIN_TOOL),
package.dependencies,
report_items,
)
return ExportResult(
package=package,
report_items=report_items,
report_context=ReportContext(
source_scope=package.metadata.source_scope,
selected_app_count=len(apps),
include_secrets=selection.include_secrets,
),
)
def _get_tenant(self, selection: ExportSelection) -> Tenant:
if selection.source_tenant_id:
tenant = db.session.get(Tenant, selection.source_tenant_id)
if tenant is None:
raise MigrationDataError(f"Source tenant not found: {selection.source_tenant_id}")
if tenant.name != selection.source_tenant_name:
raise MigrationDataError(
f"Source tenant id/name mismatch: {selection.source_tenant_id} / {selection.source_tenant_name}"
)
return tenant
tenants = list(db.session.scalars(sa.select(Tenant).where(Tenant.name == selection.source_tenant_name)).all())
if not tenants:
raise MigrationDataError(f"Source tenant not found: {selection.source_tenant_name}")
if len(tenants) > 1:
raise MigrationDataError(
f"Source tenant name is ambiguous; use source_tenant.id: {selection.source_tenant_name}"
)
return tenants[0]
def _selected_apps(self, tenant_id: str, selection: ExportSelection) -> list[App]:
query = sa.select(App).where(App.tenant_id == tenant_id, App.mode.in_(SUPPORTED_APP_MODES))
if not selection.export_all_apps:
if not selection.app_ids:
return []
query = query.where(App.id.in_(selection.app_ids))
apps = list(db.session.scalars(query).all())
if not selection.export_all_apps and len(apps) != len(set(selection.app_ids)):
found_ids = {app.id for app in apps}
missing_ids = [app_id for app_id in selection.app_ids if app_id not in found_ids]
raise MigrationDataError(
f"Selected app IDs not found in source tenant or unsupported app mode: {missing_ids}"
)
return apps
def _discover_dependencies(self, dsl_content: str | dict[str, Any]) -> list[DiscoveredDependency]:
if isinstance(dsl_content, dict):
dsl = dsl_content
else:
raw_dsl = yaml.safe_load(dsl_content) if dsl_content else {}
dsl = raw_dsl if isinstance(raw_dsl, dict) else {}
return self.dependency_discovery_service.discover_from_dsl(dsl)
def _export_api_tools(
self,
tenant_id: str,
provider_ids: Iterable[str],
*,
include_secrets: bool,
exported_tools: list[dict[str, Any]],
report_items: list[ResourceReportItem],
) -> None:
for provider_id in self._dedupe(provider_ids):
try:
tool_data = ToolManager.user_get_api_provider(
provider=provider_id,
tenant_id=tenant_id,
mask=not include_secrets,
)
if not include_secrets:
tool_data.pop("credentials", None)
tool_data.pop("tools", None)
tool_data["provider_name"] = provider_id
tool_data["source_tenant_id"] = tenant_id
exported_tools.append(tool_data)
report_items.append(ResourceReportItem(ResourceType.API_TOOL, provider_id, provider_id, "exported"))
except Exception as exc:
report_items.append(
ResourceReportItem(ResourceType.API_TOOL, provider_id, provider_id, "unresolved", str(exc))
)
def _export_workflow_tools(
self,
tenant: Tenant,
provider_ids: Iterable[str],
*,
exported_app_ids: set[str],
exported_workflow_tools: list[dict[str, Any]],
dependencies: list[dict[str, Any]],
report_items: list[ResourceReportItem],
) -> None:
provider_ids = self._dedupe(provider_ids)
if not provider_ids:
return
owner = self._get_tenant_owner(tenant.id)
if owner is None:
for provider_id in provider_ids:
report_items.append(
ResourceReportItem(
ResourceType.WORKFLOW_TOOL,
provider_id,
provider_id,
"unresolved",
f"No owner account found for source tenant: {tenant.name}",
)
)
return
for provider_id in provider_ids:
try:
tool_data = WorkflowToolManageService.get_workflow_tool_by_tool_id(
user_id=owner.id,
tenant_id=tenant.id,
workflow_tool_id=provider_id,
)
tool_info = jsonable_encoder(tool_data)
tool_info["id"] = provider_id
tool_info["app_id"] = tool_info.get("workflow_app_id")
tool_info["source_tenant_id"] = tenant.id
for field_name in ("workflow_tool_id", "workflow_app_id", "tool"):
tool_info.pop(field_name, None)
exported_workflow_tools.append(tool_info)
if tool_info.get("app_id") not in exported_app_ids:
workflow_app_id = str(tool_info.get("app_id") or "")
workflow_app = db.session.get(App, workflow_app_id) if workflow_app_id else None
self._record_dependency_metadata(
[
DiscoveredDependency(
DependencyKind.WORKFLOW_TOOL,
workflow_app_id,
provider_name=workflow_app.name if workflow_app else tool_info.get("name"),
source="workflow_tool_app",
)
],
dependencies,
report_items,
)
report_items.append(
ResourceReportItem(ResourceType.WORKFLOW_TOOL, provider_id, tool_info.get("name"), "exported")
)
except Exception as exc:
report_items.append(
ResourceReportItem(ResourceType.WORKFLOW_TOOL, provider_id, provider_id, "unresolved", str(exc))
)
def _get_tenant_owner(self, tenant_id: str) -> Account | None:
return db.session.scalar(
sa.select(Account)
.join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id)
.where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == "owner")
.order_by(TenantAccountJoin.created_at.asc())
.limit(1)
)
def _export_mcp_tools(
self,
*,
tenant_id: str,
provider_ids: Iterable[str],
include_secrets: bool,
exported_mcp_tools: list[dict[str, Any]],
dependencies: list[dict[str, Any]],
report_items: list[ResourceReportItem],
) -> None:
for provider_id in self._dedupe(provider_ids):
if not include_secrets:
self._record_dependency_metadata(
[DiscoveredDependency(DependencyKind.MCP_TOOL, provider_id, source="mcp_provider")],
dependencies,
report_items,
)
continue
try:
provider = self._get_mcp_provider(tenant_id, provider_id)
exported_mcp_tools.append(self._serialize_mcp_provider(provider))
report_items.append(ResourceReportItem(ResourceType.MCP_TOOL, provider_id, provider.name, "exported"))
except Exception as exc:
report_items.append(
ResourceReportItem(ResourceType.MCP_TOOL, provider_id, provider_id, "unresolved", str(exc))
)
def _get_mcp_provider(self, tenant_id: str, provider_id: str) -> MCPToolProvider:
predicates = [MCPToolProvider.server_identifier == provider_id]
if self._is_uuid_string(provider_id):
predicates.append(MCPToolProvider.id == provider_id)
provider = db.session.scalar(
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id, sa.or_(*predicates))
)
if provider is None:
raise MigrationDataError(f"MCP provider not found: {provider_id}")
return provider
def _is_uuid_string(self, value: str) -> bool:
try:
UUID(value)
except ValueError:
return False
return True
def _serialize_mcp_provider(self, provider: MCPToolProvider) -> dict[str, Any]:
provider_entity = provider.to_entity()
provider_icon = provider_entity.provider_icon
if isinstance(provider_icon, dict):
icon = provider_icon.get("content")
icon_background = provider_icon.get("background")
icon_type = "emoji"
else:
icon = provider_icon
icon_background = None
icon_type = "url"
return {
"id": provider.id,
"name": provider.name,
"server_url": provider_entity.decrypt_server_url(),
"server_identifier": provider.server_identifier,
"icon": icon,
"icon_background": icon_background,
"icon_type": icon_type,
"configuration": {"timeout": provider.timeout, "sse_read_timeout": provider.sse_read_timeout},
"headers": provider_entity.decrypt_headers(),
"authentication": self._serialize_mcp_authentication(provider_entity.decrypt_authentication()),
"tools": provider.tool_dict,
"source_tenant_id": provider.tenant_id,
}
def _serialize_mcp_authentication(self, authentication: dict[str, Any] | None) -> dict[str, Any] | None:
if not authentication or not authentication.get("client_id"):
return None
return {
"client_id": authentication["client_id"],
"client_secret": authentication.get("client_secret"),
}
def _record_dependency_metadata(
self,
dependencies_to_record: Iterable[DiscoveredDependency],
dependencies: list[dict[str, Any]],
report_items: list[ResourceReportItem],
) -> None:
existing = {(item.get("kind"), item.get("provider_id")) for item in dependencies}
for dependency in dependencies_to_record:
key = (dependency.kind.value, dependency.provider_id)
if key in existing:
continue
existing.add(key)
dependencies.append(
{
"kind": dependency.kind.value,
"provider_id": dependency.provider_id,
"provider_name": dependency.provider_name,
"source": dependency.source,
}
)
report_items.append(
ResourceReportItem(
ResourceType.DEPENDENCY,
dependency.provider_id,
self._dependency_report_name(dependency),
"dependency-only",
self._dependency_message(dependency.kind),
)
)
def _provider_ids(
self,
manual_provider_ids: Iterable[str],
discovered_dependencies: Iterable[DiscoveredDependency],
kind: DependencyKind,
) -> list[str]:
provider_ids = list(manual_provider_ids)
provider_ids.extend(
self._provider_export_identifier(dependency)
for dependency in discovered_dependencies
if dependency.kind == kind
)
return self._dedupe(provider_ids)
def _provider_export_identifier(self, dependency: DiscoveredDependency) -> str:
if dependency.kind == DependencyKind.API_TOOL and dependency.provider_name:
return dependency.provider_name
return dependency.provider_id
def _dependencies_by_kind(
self, discovered_dependencies: Iterable[DiscoveredDependency], kind: DependencyKind
) -> list[DiscoveredDependency]:
return [dependency for dependency in discovered_dependencies if dependency.kind == kind]
def _dedupe(self, values: Iterable[str]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
if value and value not in seen:
seen.add(value)
result.append(value)
return result
def _dependency_message(self, kind: DependencyKind) -> str:
if kind == DependencyKind.MCP_TOOL:
return "Configure MCP provider manually in the target tenant unless exporting with secrets enabled."
if kind == DependencyKind.BUILTIN_OR_PLUGIN_TOOL:
return "Ensure the built-in or plugin tool exists in the target environment."
return "Dependency metadata only; ensure the resource exists in the target environment."
def _dependency_report_name(self, dependency: DiscoveredDependency) -> str:
name = dependency.provider_name or dependency.provider_id
if dependency.kind == DependencyKind.WORKFLOW_TOOL:
return f"workflow {name}"
return f"{dependency.kind.value} {name}"

View File

@ -0,0 +1,938 @@
"""Apply versioned migration packages to an explicitly resolved target tenant.
Import target resolution is deliberately performed before any resource import
work. The service does not write Click output; callers receive structured
report items and can decide how to render them.
"""
from __future__ import annotations
import json
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, cast
from uuid import UUID
import sqlalchemy as sa
import yaml
from sqlalchemy import or_
from sqlalchemy.orm import Session, sessionmaker
from core.entities.mcp_provider import MCPAuthentication, MCPConfiguration
from core.tools.entities.tool_entities import ApiProviderSchemaType, WorkflowToolParameterConfiguration
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Account, ApiToken, Tenant, TenantAccountJoin, TenantAccountRole
from models.enums import ApiTokenType
from models.model import App
from models.tools import ApiToolProvider, MCPToolProvider, WorkflowToolProvider
from services.app_dsl_service import AppDslService
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService
from services.data_migration.entities import (
ConflictStrategy,
DependencyKind,
IdStrategy,
ImportOptions,
ImportResult,
ImportTarget,
MigrationDataError,
MigrationPackage,
ReportContext,
ResourceIdMapping,
ResourceReportItem,
ResourceType,
)
from services.entities.dsl_entities import ImportStatus
from services.tools.api_tools_manage_service import ApiToolManageService
from services.tools.mcp_tools_manage_service import MCPToolManageService
from services.tools.workflow_tools_manage_service import WorkflowToolManageService
from services.workflow_service import WorkflowService
@dataclass(frozen=True)
class ImportRequest:
"""Structured input for package import.
`cli_target_tenant` and `config_target_tenant` are target tenant names from
outer adapters. They intentionally override package metadata, because a
migration package may be reused across environments.
"""
package: MigrationPackage
cli_target_tenant: str | None = None
config_target_tenant: str | None = None
operator_email: str | None = None
options_override: ImportOptions | None = None
class ImportTargetResolver:
"""Resolve the target tenant and operator before import side effects begin."""
def select_target_tenant_name(self, request: ImportRequest) -> str:
if request.cli_target_tenant:
return request.cli_target_tenant
if request.config_target_tenant:
return request.config_target_tenant
package_target = request.package.metadata.target_tenant or {}
if package_target.get("name"):
return package_target["name"]
if package_target.get("id"):
return package_target["id"]
raise MigrationDataError(
"Target tenant must be provided by --target-tenant, import config, or package metadata."
)
def resolve(self, request: ImportRequest) -> ImportTarget:
target_tenant_name = self.select_target_tenant_name(request)
package_target = request.package.metadata.target_tenant or {}
if request.cli_target_tenant or request.config_target_tenant:
tenant = self._resolve_tenant_by_id_or_name(target_tenant_name)
elif package_target.get("id") and self._is_uuid(package_target["id"]):
tenant = db.session.get(Tenant, package_target["id"])
if tenant is not None and package_target.get("name") and tenant.name != package_target.get("name"):
raise MigrationDataError(
f"Target tenant id/name mismatch: {package_target['id']} / {package_target['name']}"
)
else:
tenant = self._resolve_tenant_by_id_or_name(target_tenant_name)
if tenant is None:
raise MigrationDataError(f"Target tenant not found: {target_tenant_name}")
account_query = (
db.session.query(Account)
.join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id)
.filter(TenantAccountJoin.tenant_id == tenant.id)
)
if request.operator_email:
account_query = account_query.filter(Account.email == request.operator_email)
identity = request.operator_email
else:
account_query = account_query.filter(TenantAccountJoin.role == TenantAccountRole.OWNER).order_by(
TenantAccountJoin.created_at.asc()
)
identity = "earliest owner"
account = account_query.first()
if account is None:
raise MigrationDataError(f"No operator account found for target tenant {target_tenant_name}: {identity}")
return ImportTarget(
tenant_id=tenant.id,
tenant_name=tenant.name,
operator_id=account.id,
operator_email=account.email,
)
def _resolve_tenant_by_id_or_name(self, value: str) -> Tenant | None:
if self._is_uuid(value):
tenant = db.session.get(Tenant, value)
if tenant is not None:
return tenant
tenants = list(db.session.scalars(sa.select(Tenant).where(Tenant.name == value)).all())
if len(tenants) > 1:
raise MigrationDataError(f"Target tenant name is ambiguous; use target_tenant.id: {value}")
return tenants[0] if tenants else None
def _is_uuid(self, value: str) -> bool:
try:
UUID(value)
except ValueError:
return False
return True
class MigrationImportService:
"""Apply package resources using Dify service APIs and structured reporting."""
target_resolver: ImportTargetResolver
def __init__(self, *, target_resolver: ImportTargetResolver | None = None) -> None:
self.target_resolver = target_resolver or ImportTargetResolver()
def import_package(self, request: ImportRequest) -> ImportResult:
target = self.target_resolver.resolve(request)
options = request.options_override or request.package.metadata.import_options
report_items = [
ResourceReportItem(
resource_type=ResourceType.DEPENDENCY,
identifier=target.tenant_id,
name=target.tenant_name,
status="resolved",
message=f"operator: {target.operator_email or target.operator_id}",
)
]
id_mapping: dict[str, str] = {}
id_mapping_details: list[ResourceIdMapping] = []
self._import_api_tools(
request.package,
target,
options,
report_items,
id_mapping,
id_mapping_details,
self._source_api_provider_ids_by_name(request.package),
)
self._import_mcp_tools(request.package, target, options, report_items, id_mapping, id_mapping_details)
self._preflight_dependency_only_mcp(request.package, target, report_items)
workflow_tool_app_ids = self._workflow_tool_source_app_ids(request.package)
imported_workflow_ids: set[str] = set()
if workflow_tool_app_ids:
self._import_workflows(
request.package,
target,
options,
report_items,
id_mapping,
id_mapping_details=id_mapping_details,
imported_workflow_ids=imported_workflow_ids,
only_app_ids=workflow_tool_app_ids,
)
self._import_workflow_tools(request.package, target, options, id_mapping, id_mapping_details, report_items)
self._import_workflows(
request.package,
target,
options,
report_items,
id_mapping,
id_mapping_details=id_mapping_details,
imported_workflow_ids=imported_workflow_ids,
skip_app_ids=imported_workflow_ids,
)
return ImportResult(
report_items=report_items,
id_mapping=id_mapping,
report_context=ReportContext(
target_tenant=target.tenant_name,
operator_email=target.operator_email,
id_mapping_count=len(id_mapping),
id_mappings=dict(id_mapping),
id_mapping_details=id_mapping_details,
),
)
def _import_workflows(
self,
package: MigrationPackage,
target: ImportTarget,
options: ImportOptions,
report_items: list[ResourceReportItem],
id_mapping: dict[str, str],
id_mapping_details: list[ResourceIdMapping],
imported_workflow_ids: set[str] | None = None,
only_app_ids: set[str] | None = None,
skip_app_ids: set[str] | None = None,
) -> None:
account = db.session.get(Account, target.operator_id)
tenant = db.session.get(Tenant, target.tenant_id)
if account is None:
raise MigrationDataError(f"Operator account not found: {target.operator_id}")
if tenant is None:
raise MigrationDataError(f"Target tenant not found: {target.tenant_id}")
account.current_tenant = tenant
for workflow_data in package.workflows:
app_id = self._optional_string(workflow_data.get("id"))
if only_app_ids and app_id not in only_app_ids:
continue
if skip_app_ids and app_id in skip_app_ids:
continue
dsl_content = self._rewrite_workflow_dsl_provider_ids(
self._required_string(workflow_data, "dsl", "workflow"),
id_mapping,
)
existing_app = (
self._find_existing_app(app_id, target.tenant_id)
if options.id_strategy == IdStrategy.PRESERVE_ID
else None
)
if existing_app is not None and options.conflict_strategy == ConflictStrategy.FAIL:
raise MigrationDataError(f"App already exists and conflict_strategy=fail: {app_id}")
if existing_app is not None and options.conflict_strategy == ConflictStrategy.SKIP:
if app_id:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.WORKFLOW,
workflow_data.get("name") if isinstance(workflow_data.get("name"), str) else None,
{app_id},
existing_app.id,
)
report_items.append(
ResourceReportItem(ResourceType.WORKFLOW, str(app_id), workflow_data.get("name"), "skipped")
)
continue
imported_app_id = self._import_workflow_app(
account=account,
workflow_data=workflow_data,
dsl_content=dsl_content,
app_id=app_id,
existing_app=existing_app,
options=options,
)
if app_id:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.WORKFLOW,
workflow_data.get("name") if isinstance(workflow_data.get("name"), str) else None,
{app_id},
imported_app_id,
)
if imported_workflow_ids is not None:
imported_workflow_ids.add(app_id)
if options.create_app_api_token_on_import:
self._create_or_reuse_app_api_token(imported_app_id, target.tenant_id)
report_items.append(
ResourceReportItem(
ResourceType.WORKFLOW,
imported_app_id,
workflow_data.get("name"),
"updated" if existing_app is not None else "created",
)
)
def _workflow_tool_source_app_ids(self, package: MigrationPackage) -> set[str]:
app_ids: set[str] = set()
for workflow_tool_data in package.workflow_tools:
app_id = self._optional_string(workflow_tool_data.get("app_id"))
if app_id:
app_ids.add(app_id)
return app_ids
def _import_workflow_app(
self,
*,
account: Account,
workflow_data: dict[str, object],
dsl_content: str,
app_id: str | None,
existing_app: App | None,
options: ImportOptions,
) -> str:
import_service = AppDslService(cast(Session, db.session))
if existing_app is not None:
import_result = import_service.import_app(
account=account,
import_mode="yaml-content",
yaml_content=dsl_content,
app_id=existing_app.id,
)
else:
import_app_id = app_id if self._should_preserve_source_app_id(options) else None
import_result = import_service.import_app(
account=account,
import_mode="yaml-content",
yaml_content=dsl_content,
import_app_id=import_app_id,
)
if import_result.status not in {ImportStatus.COMPLETED, ImportStatus.COMPLETED_WITH_WARNINGS}:
error = import_result.error or f"unexpected import status {import_result.status}"
raise MigrationDataError(f"Workflow import failed: {error}")
if import_result.app_id is None:
raise MigrationDataError(f"Workflow import did not return an app id: {workflow_data.get('name')}")
db.session.commit()
return import_result.app_id
def _rewrite_workflow_dsl_provider_ids(self, dsl_content: str, id_mapping: dict[str, str]) -> str:
if not id_mapping:
return dsl_content
parsed = yaml.safe_load(dsl_content) if dsl_content else {}
if not isinstance(parsed, dict):
return dsl_content
for node in self._workflow_nodes(parsed):
data = node.get("data") if isinstance(node, dict) else None
if not isinstance(data, dict):
continue
self._rewrite_tool_config_provider_id(data, id_mapping)
for tool_config in self._agent_tool_configs(data):
self._rewrite_tool_config_provider_id(tool_config, id_mapping)
return yaml.safe_dump(parsed, sort_keys=False, allow_unicode=True)
def _rewrite_tool_config_provider_id(self, tool_config: dict[str, Any], id_mapping: dict[str, str]) -> None:
provider_id = self._optional_string(tool_config.get("provider_id"))
if provider_id and provider_id in id_mapping:
tool_config["provider_id"] = id_mapping[provider_id]
def _source_api_provider_ids_by_name(self, package: MigrationPackage) -> dict[str, set[str]]:
provider_ids_by_name: dict[str, set[str]] = {}
discovery_service = DependencyDiscoveryService()
for workflow_data in package.workflows:
dsl_content = self._optional_string(workflow_data.get("dsl"))
if not dsl_content:
continue
parsed = yaml.safe_load(dsl_content) if dsl_content else {}
if not isinstance(parsed, dict):
continue
for dependency in discovery_service.discover_from_dsl(parsed):
if dependency.kind != DependencyKind.API_TOOL or not dependency.provider_name:
continue
provider_ids_by_name.setdefault(dependency.provider_name, set()).add(dependency.provider_id)
return provider_ids_by_name
def _workflow_nodes(self, dsl: dict[str, Any]) -> list[dict[str, Any]]:
nodes: list[dict[str, Any]] = []
graph = dsl.get("graph")
if isinstance(graph, dict) and isinstance(graph.get("nodes"), list):
nodes.extend(node for node in graph["nodes"] if isinstance(node, dict))
workflow = dsl.get("workflow")
workflow_graph = workflow.get("graph") if isinstance(workflow, dict) else None
if isinstance(workflow_graph, dict) and isinstance(workflow_graph.get("nodes"), list):
nodes.extend(node for node in workflow_graph["nodes"] if isinstance(node, dict))
return nodes
def _agent_tool_configs(self, data: dict[str, Any]) -> list[dict[str, Any]]:
configs = data.get("tools")
if isinstance(configs, list):
return [config for config in configs if isinstance(config, dict)]
agent_parameters = data.get("agent_parameters")
if not isinstance(agent_parameters, dict):
return []
tools_parameter = agent_parameters.get("tools")
if not isinstance(tools_parameter, dict):
return []
value = tools_parameter.get("value", [])
if not isinstance(value, list):
return []
return [config for config in value if isinstance(config, dict)]
def _should_preserve_source_app_id(self, options: ImportOptions) -> bool:
return options.id_strategy == IdStrategy.PRESERVE_ID
def _find_existing_app(self, app_id: str | None, tenant_id: str) -> App | None:
if not self._is_uuid_string(app_id):
return None
return db.session.scalar(sa.select(App).where(App.id == app_id, App.tenant_id == tenant_id))
def _create_or_reuse_app_api_token(self, app_id: str, tenant_id: str) -> None:
existing = db.session.scalar(
sa.select(ApiToken).where(
ApiToken.type == ApiTokenType.APP,
ApiToken.app_id == app_id,
ApiToken.tenant_id == tenant_id,
)
)
if existing is not None:
return
api_token = ApiToken()
api_token.app_id = app_id
api_token.tenant_id = tenant_id
api_token.token = ApiToken.generate_api_key("app", 24)
api_token.type = ApiTokenType.APP
db.session.add(api_token)
db.session.commit()
def _import_api_tools(
self,
package: MigrationPackage,
target: ImportTarget,
options: ImportOptions,
report_items: list[ResourceReportItem],
id_mapping: dict[str, str],
id_mapping_details: list[ResourceIdMapping],
source_provider_ids_by_name: dict[str, set[str]],
) -> None:
for tool_data in package.tools:
provider_name = self._required_string(tool_data, "provider_name", "api_tool")
schema = self._required_string(tool_data, "schema", "api_tool")
existing = db.session.scalar(
sa.select(ApiToolProvider).where(
ApiToolProvider.tenant_id == target.tenant_id,
ApiToolProvider.name == provider_name,
)
)
if existing is not None and options.conflict_strategy == ConflictStrategy.FAIL:
raise MigrationDataError(f"API tool already exists and conflict_strategy=fail: {provider_name}")
if existing is not None and options.conflict_strategy == ConflictStrategy.SKIP:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.API_TOOL,
provider_name,
self._api_tool_source_ids(provider_name, tool_data, source_provider_ids_by_name),
existing.id,
)
report_items.append(ResourceReportItem(ResourceType.API_TOOL, provider_name, provider_name, "skipped"))
continue
schema_info = ApiToolManageService.parser_api_schema(schema=schema)
schema_type = cast(ApiProviderSchemaType, schema_info["schema_type"])
credentials = (
cast(dict[str, Any], tool_data.get("credentials"))
if isinstance(tool_data.get("credentials"), dict)
else {}
)
credentials = credentials or {"auth_type": "none"}
raw_icon = tool_data.get("icon")
icon = (
cast(dict[str, Any], raw_icon)
if isinstance(raw_icon, dict)
else {"content": "tool", "background": "#FEF7C3"}
)
raw_labels = tool_data.get("labels")
labels = [str(label) for label in raw_labels] if isinstance(raw_labels, list) else []
if existing is not None:
ApiToolManageService.update_api_tool_provider(
user_id=target.operator_id,
tenant_id=target.tenant_id,
provider_name=provider_name,
original_provider=existing.name,
_schema_type=schema_type,
schema=schema,
privacy_policy=self._optional_string(tool_data.get("privacy_policy")) or "",
credentials=credentials,
custom_disclaimer=self._optional_string(tool_data.get("custom_disclaimer")) or "",
labels=labels,
icon=icon,
)
status = "updated"
else:
ApiToolManageService.create_api_tool_provider(
user_id=target.operator_id,
tenant_id=target.tenant_id,
provider_name=provider_name,
schema_type=schema_type,
schema=schema,
privacy_policy=self._optional_string(tool_data.get("privacy_policy")) or "",
credentials=credentials,
custom_disclaimer=self._optional_string(tool_data.get("custom_disclaimer")) or "",
labels=labels,
icon=icon,
)
status = "created"
target_provider = self._find_api_tool_provider(target.tenant_id, provider_name)
if target_provider is not None:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.API_TOOL,
provider_name,
self._api_tool_source_ids(provider_name, tool_data, source_provider_ids_by_name),
target_provider.id,
)
report_items.append(ResourceReportItem(ResourceType.API_TOOL, provider_name, provider_name, status))
def _find_api_tool_provider(self, tenant_id: str, provider_name: str) -> ApiToolProvider | None:
return db.session.scalar(
sa.select(ApiToolProvider).where(
ApiToolProvider.tenant_id == tenant_id,
ApiToolProvider.name == provider_name,
)
)
def _api_tool_source_ids(
self,
provider_name: str,
tool_data: dict[str, Any],
source_provider_ids_by_name: dict[str, set[str]],
) -> set[str]:
source_ids = set(source_provider_ids_by_name.get(provider_name, set()))
source_id = self._optional_string(tool_data.get("id"))
if source_id:
source_ids.add(source_id)
return source_ids
def _record_id_mappings(
self,
id_mapping: dict[str, str],
id_mapping_details: list[ResourceIdMapping],
resource_type: ResourceType,
name: str | None,
source_ids: Iterable[str],
target_id: str,
) -> None:
for source_id in source_ids:
id_mapping[source_id] = target_id
id_mapping_details[:] = [item for item in id_mapping_details if item.source_id != source_id]
id_mapping_details.append(ResourceIdMapping(resource_type, name, source_id, target_id))
def _import_workflow_tools(
self,
package: MigrationPackage,
target: ImportTarget,
options: ImportOptions,
id_mapping: dict[str, str],
id_mapping_details: list[ResourceIdMapping],
report_items: list[ResourceReportItem],
) -> None:
if not package.workflow_tools:
return
account = db.session.get(Account, target.operator_id)
if account is None:
raise MigrationDataError(f"Operator account not found: {target.operator_id}")
for workflow_tool_data in package.workflow_tools:
app_id = self._optional_string(workflow_tool_data.get("app_id"))
resolved_app_id = id_mapping.get(app_id or "", app_id)
if not resolved_app_id or self._find_existing_app(resolved_app_id, target.tenant_id) is None:
report_items.append(
ResourceReportItem(
ResourceType.WORKFLOW_TOOL,
str(workflow_tool_data.get("id", workflow_tool_data.get("name", ""))),
self._optional_string(workflow_tool_data.get("name")),
"unresolved",
"Referenced workflow app was not found in the target tenant; workflow tool was skipped.",
)
)
continue
try:
self._ensure_workflow_app_is_published(target, account, resolved_app_id)
except Exception as exc:
report_items.append(
ResourceReportItem(
ResourceType.WORKFLOW_TOOL,
str(workflow_tool_data.get("id", workflow_tool_data.get("name", ""))),
self._optional_string(workflow_tool_data.get("name")),
"unresolved",
f"Referenced workflow app could not be published: {exc}",
)
)
continue
workflow_tool_id = self._optional_string(workflow_tool_data.get("id"))
tool_name = self._required_string(workflow_tool_data, "name", "workflow_tool")
lookup_workflow_tool_id = workflow_tool_id if options.id_strategy == IdStrategy.PRESERVE_ID else None
existing = self._find_existing_workflow_tool(
target.tenant_id, lookup_workflow_tool_id, tool_name, resolved_app_id
)
if existing is not None and options.conflict_strategy == ConflictStrategy.FAIL:
raise MigrationDataError(f"Workflow tool already exists and conflict_strategy=fail: {tool_name}")
if existing is not None and options.conflict_strategy == ConflictStrategy.SKIP:
if workflow_tool_id:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.WORKFLOW_TOOL,
tool_name,
{workflow_tool_id},
existing.id,
)
report_items.append(ResourceReportItem(ResourceType.WORKFLOW_TOOL, existing.id, tool_name, "skipped"))
continue
raw_icon = workflow_tool_data.get("icon")
icon = (
cast(dict[str, Any], raw_icon)
if isinstance(raw_icon, dict)
else {"content": "🤖", "background": "#FFEAD5"}
)
raw_parameters = workflow_tool_data.get("parameters")
parameters = [
parameter
if isinstance(parameter, WorkflowToolParameterConfiguration)
else WorkflowToolParameterConfiguration(**parameter)
for parameter in (raw_parameters if isinstance(raw_parameters, list) else [])
if isinstance(parameter, dict | WorkflowToolParameterConfiguration)
]
raw_labels = workflow_tool_data.get("labels")
labels = [str(label) for label in raw_labels] if isinstance(raw_labels, list) else []
label = self._optional_string(workflow_tool_data.get("label")) or tool_name
description = self._optional_string(workflow_tool_data.get("description")) or ""
privacy_policy = self._optional_string(workflow_tool_data.get("privacy_policy")) or ""
if existing is not None:
WorkflowToolManageService.update_workflow_tool(
user_id=account.id,
tenant_id=target.tenant_id,
workflow_tool_id=existing.id,
name=tool_name,
label=label,
icon=icon,
description=description,
parameters=parameters,
privacy_policy=privacy_policy,
labels=labels,
)
status = "updated"
identifier = existing.id
else:
import_id = workflow_tool_id if options.id_strategy == IdStrategy.PRESERVE_ID else ""
WorkflowToolManageService.create_workflow_tool(
user_id=account.id,
tenant_id=target.tenant_id,
workflow_app_id=resolved_app_id,
name=tool_name,
label=label,
icon=icon,
description=description,
parameters=parameters,
privacy_policy=privacy_policy,
labels=labels,
import_id=import_id or "",
)
status = "created"
target_provider = self._find_existing_workflow_tool(
target.tenant_id, import_id or None, tool_name, resolved_app_id
)
if target_provider is None:
raise MigrationDataError(f"Workflow tool was not created: {tool_name}")
identifier = target_provider.id
if workflow_tool_id:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.WORKFLOW_TOOL,
tool_name,
{workflow_tool_id},
identifier,
)
report_items.append(ResourceReportItem(ResourceType.WORKFLOW_TOOL, identifier, tool_name, status))
def _ensure_workflow_app_is_published(self, target: ImportTarget, account: Account, app_id: str) -> None:
app = self._find_existing_app(app_id, target.tenant_id)
if app is None:
raise MigrationDataError(f"Referenced workflow app was not found in target tenant: {app_id}")
if app.workflow_id:
return
workflow_service = WorkflowService()
with sessionmaker(db.engine).begin() as session:
app_in_session = session.get(App, app_id)
account_in_session = session.get(Account, account.id)
if app_in_session is None:
raise MigrationDataError(f"Referenced workflow app was not found in target tenant: {app_id}")
if account_in_session is None:
raise MigrationDataError(f"Operator account not found: {account.id}")
workflow = workflow_service.publish_workflow(
session=session,
app_model=app_in_session,
account=account_in_session,
marked_name="Migration import",
marked_comment="Published automatically for workflow tool import.",
)
app_in_session.workflow_id = workflow.id
app_in_session.updated_by = account.id
app_in_session.updated_at = naive_utc_now()
def _import_mcp_tools(
self,
package: MigrationPackage,
target: ImportTarget,
options: ImportOptions,
report_items: list[ResourceReportItem],
id_mapping: dict[str, str],
id_mapping_details: list[ResourceIdMapping],
) -> None:
for mcp_data in package.mcp_tools:
name = self._required_string(mcp_data, "name", "mcp_tool")
server_identifier = self._required_string(mcp_data, "server_identifier", "mcp_tool")
provider_id = self._optional_string(mcp_data.get("id"))
lookup_provider_id = provider_id if options.id_strategy == IdStrategy.PRESERVE_ID else None
existing = self._find_existing_mcp_tool(target.tenant_id, lookup_provider_id, server_identifier)
if existing is not None and options.conflict_strategy == ConflictStrategy.FAIL:
raise MigrationDataError(f"MCP tool already exists and conflict_strategy=fail: {name}")
if existing is not None and options.conflict_strategy == ConflictStrategy.SKIP:
if provider_id:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.MCP_TOOL,
name,
{provider_id},
existing.id,
)
report_items.append(ResourceReportItem(ResourceType.MCP_TOOL, existing.id, name, "skipped"))
continue
service = MCPToolManageService(session=cast(Session, db.session))
configuration = MCPConfiguration.model_validate(mcp_data.get("configuration") or {})
authentication = (
MCPAuthentication.model_validate(mcp_data["authentication"]) if mcp_data.get("authentication") else None
)
if existing is not None:
service.update_provider(
tenant_id=target.tenant_id,
provider_id=existing.id,
server_url=self._required_string(mcp_data, "server_url", "mcp_tool"),
name=name,
icon=self._optional_string(mcp_data.get("icon")) or "",
icon_type=self._optional_string(mcp_data.get("icon_type")) or "emoji",
icon_background=self._optional_string(mcp_data.get("icon_background")) or "",
server_identifier=server_identifier,
headers=mcp_data.get("headers") if isinstance(mcp_data.get("headers"), dict) else {},
configuration=configuration,
authentication=authentication,
)
db.session.commit()
status = "updated"
identifier = existing.id
provider = existing
else:
service.create_provider(
tenant_id=target.tenant_id,
user_id=target.operator_id,
server_url=self._required_string(mcp_data, "server_url", "mcp_tool"),
name=name,
icon=self._optional_string(mcp_data.get("icon")) or "",
icon_type=self._optional_string(mcp_data.get("icon_type")) or "emoji",
icon_background=self._optional_string(mcp_data.get("icon_background")) or "",
server_identifier=server_identifier,
headers=mcp_data.get("headers") if isinstance(mcp_data.get("headers"), dict) else {},
configuration=configuration,
authentication=authentication,
)
created_provider = self._find_existing_mcp_tool(target.tenant_id, lookup_provider_id, server_identifier)
if created_provider is None:
raise MigrationDataError(f"MCP provider was not created: {name}")
status = "created"
provider = created_provider
identifier = provider.id
self._restore_mcp_provider_tools(provider, mcp_data)
db.session.commit()
if provider_id:
self._record_id_mappings(
id_mapping,
id_mapping_details,
ResourceType.MCP_TOOL,
name,
{provider_id},
identifier,
)
report_items.append(ResourceReportItem(ResourceType.MCP_TOOL, identifier, name, status))
def _restore_mcp_provider_tools(self, provider: MCPToolProvider, mcp_data: dict[str, object]) -> None:
tools = mcp_data.get("tools")
if not isinstance(tools, list):
return
provider.tools = json.dumps(tools)
provider.authed = True
def _find_existing_mcp_tool(
self, tenant_id: str, provider_id: str | None, server_identifier: str
) -> MCPToolProvider | None:
predicates = [MCPToolProvider.server_identifier == server_identifier]
if self._is_uuid_string(provider_id):
predicates.append(MCPToolProvider.id == provider_id)
return db.session.scalar(
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id, or_(*predicates)).limit(1)
)
def _is_uuid_string(self, value: str | None) -> bool:
if not value:
return False
try:
UUID(value)
except ValueError:
return False
return True
def _find_existing_workflow_tool(
self, tenant_id: str, workflow_tool_id: str | None, tool_name: str, app_id: str
) -> WorkflowToolProvider | None:
predicates = [WorkflowToolProvider.name == tool_name, WorkflowToolProvider.app_id == app_id]
if self._is_uuid_string(workflow_tool_id):
predicates.append(WorkflowToolProvider.id == workflow_tool_id)
return db.session.scalar(
sa.select(WorkflowToolProvider)
.where(WorkflowToolProvider.tenant_id == tenant_id, or_(*predicates))
.limit(1)
)
def _preflight_dependency_only_mcp(
self, package: MigrationPackage, target: ImportTarget, report_items: list[ResourceReportItem]
) -> None:
for dependency in package.dependencies:
if dependency.get("kind") != DependencyKind.MCP_TOOL.value:
continue
provider_id = str(dependency.get("provider_id", dependency.get("id", "")))
provider_name = self._optional_string(dependency.get("provider_name") or dependency.get("name"))
existing = self._find_dependency_only_mcp_provider(target.tenant_id, provider_id, provider_name)
report_name = f"mcp_tool {provider_name or getattr(existing, 'name', None) or provider_id}"
if existing is not None:
report_items.append(
ResourceReportItem(
ResourceType.DEPENDENCY,
provider_id,
report_name,
"available",
"MCP provider exists in target tenant.",
)
)
continue
reference_summary = self._dependency_only_mcp_reference_summary(package, provider_id, provider_name)
message = "missing in target tenant"
if reference_summary:
message = f"{message}; referenced by {reference_summary}"
message = f"{message}; configure it manually before running the workflow."
report_items.append(
ResourceReportItem(
ResourceType.DEPENDENCY,
provider_id,
report_name,
"skipped",
message,
)
)
def _find_dependency_only_mcp_provider(
self, tenant_id: str, provider_id: str, provider_name: str | None
) -> MCPToolProvider | None:
predicates = [MCPToolProvider.server_identifier == provider_id]
if self._is_uuid_string(provider_id):
predicates.append(MCPToolProvider.id == provider_id)
return db.session.scalar(
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id, or_(*predicates)).limit(1)
)
def _dependency_only_mcp_reference_summary(
self, package: MigrationPackage, provider_id: str, provider_name: str | None
) -> str:
references = self._dependency_only_mcp_references(package, provider_id, provider_name)
return "; ".join(references)
def _dependency_only_mcp_references(
self, package: MigrationPackage, provider_id: str, provider_name: str | None
) -> list[str]:
references: list[str] = []
seen: set[str] = set()
for workflow_data in package.workflows:
workflow_name = self._optional_string(workflow_data.get("name"))
workflow_id = self._optional_string(workflow_data.get("id"))
workflow_label = workflow_name or workflow_id or "unknown workflow"
dsl_content = self._optional_string(workflow_data.get("dsl"))
if not dsl_content:
continue
parsed = yaml.safe_load(dsl_content) if dsl_content else {}
if not isinstance(parsed, dict):
continue
for node in self._workflow_nodes(parsed):
data = node.get("data") if isinstance(node, dict) else None
if not isinstance(data, dict):
continue
for tool_config in [data, *self._agent_tool_configs(data)]:
if not self._is_mcp_dependency_reference(tool_config, provider_id, provider_name):
continue
tool_label = self._mcp_tool_reference_label(node, tool_config)
reference = f"{workflow_label} / {tool_label}"
if reference not in seen:
seen.add(reference)
references.append(reference)
return references
def _is_mcp_dependency_reference(
self, tool_config: dict[str, Any], provider_id: str, provider_name: str | None
) -> bool:
provider_type = str(tool_config.get("provider_type") or tool_config.get("type") or "").lower()
if provider_type != "mcp":
return False
config_provider_id = self._optional_string(
tool_config.get("provider_id") or tool_config.get("provider_name") or tool_config.get("provider")
)
if config_provider_id == provider_id:
return True
return bool(provider_name and config_provider_id == provider_name)
def _mcp_tool_reference_label(self, node: dict[str, Any], tool_config: dict[str, Any]) -> str:
for key in ("tool_name", "tool", "name"):
value = self._optional_string(tool_config.get(key))
if value:
return value
node_id = self._optional_string(node.get("id"))
return node_id or "unknown tool"
def _required_string(self, value: dict[str, object], field_name: str, resource_name: str) -> str:
field_value = value.get(field_name)
if not isinstance(field_value, str) or not field_value:
raise MigrationDataError(f"Missing required {resource_name} field: {field_name}")
return field_value
def _optional_string(self, value: object) -> str | None:
if isinstance(value, str) and value:
return value
return None

View File

@ -0,0 +1,71 @@
"""JSON persistence for versioned cross-environment migration packages.
The package service validates file shape and serializes only structured package
entities. It does not perform CLI rendering or database access, keeping it safe
to reuse from Click adapters, tests, and future import/export services.
"""
from __future__ import annotations
import json
from dataclasses import asdict
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from services.data_migration.entities import (
ImportOptions,
MigrationDataError,
MigrationMetadata,
MigrationPackage,
SourceTenant,
TargetTenantSelector,
)
PACKAGE_VERSION = "1"
class MigrationPackageService:
def load_package(self, path: str | Path) -> MigrationPackage:
package_path = Path(path)
with package_path.open(encoding="utf-8") as file:
raw = json.load(file)
if not isinstance(raw, dict):
raise MigrationDataError("Migration package JSON must be an object.")
package = MigrationPackage.from_mapping(raw)
if package.metadata.version != PACKAGE_VERSION:
raise MigrationDataError(f"Unsupported migration package version: {package.metadata.version}")
return package
def save_package(self, package: MigrationPackage, path: str | Path, *, overwrite: bool) -> None:
package_path = Path(path)
if package_path.exists() and not overwrite:
raise MigrationDataError(f"Output file already exists: {package_path}")
package_path.parent.mkdir(parents=True, exist_ok=True)
with package_path.open("w", encoding="utf-8") as file:
json.dump(self.to_mapping(package), file, ensure_ascii=False, indent=2)
file.write("\n")
def build_empty_package(
self,
*,
source_tenant_id: str,
source_tenant_name: str,
include_secrets: bool,
import_options: ImportOptions | None = None,
target_tenant: TargetTenantSelector | None = None,
) -> MigrationPackage:
return MigrationPackage(
metadata=MigrationMetadata(
version=PACKAGE_VERSION,
source_scope="single",
source_tenants=[SourceTenant(id=source_tenant_id, name=source_tenant_name)],
target_tenant=target_tenant,
created_at=datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
include_secrets=include_secrets,
import_options=import_options or ImportOptions(),
)
)
def to_mapping(self, package: MigrationPackage) -> dict[str, Any]:
return asdict(package)

View File

@ -0,0 +1,76 @@
from __future__ import annotations
from collections import Counter
from services.data_migration.entities import ReportContext, ResourceIdMapping, ResourceReportItem
class MigrationReportService:
"""Render structured migration resource results into CLI-friendly summary lines."""
def render(self, items: list[ResourceReportItem], *, context: ReportContext | None = None) -> list[str]:
counts = Counter((item.resource_type.value, item.status) for item in items)
lines = self._render_context(context)
lines.extend(
[f"{resource_type} {status}: {count}" for (resource_type, status), count in sorted(counts.items())]
)
actionable_items = [
item for item in items if item.status in {"dependency-only", "skipped", "unresolved"} and item.message
]
for item in actionable_items:
lines.append(self._render_actionable_detail(item))
return lines
def _render_context(self, context: ReportContext | None) -> list[str]:
if context is None:
return []
lines: list[str] = []
if context.output_path:
lines.append(f"output: {context.output_path}")
if context.source_scope:
lines.append(f"source scope: {context.source_scope}")
if context.selected_app_count is not None:
lines.append(f"selected apps: {context.selected_app_count}")
if context.include_secrets is not None:
lines.append(f"include secrets: {str(context.include_secrets).lower()}")
if context.target_tenant:
lines.append(f"target tenant: {context.target_tenant}")
if context.operator_email:
lines.append(f"operator: {context.operator_email}")
if context.app_api_tokens_created or context.app_api_tokens_reused:
lines.append(
f"app api tokens: {context.app_api_tokens_created} created, {context.app_api_tokens_reused} reused"
)
if context.id_mappings:
lines.append(f"resource references resolved: {len(context.id_mappings)}")
if context.id_mapping_details:
lines.extend(
self._render_id_mapping_detail(item)
for item in sorted(
context.id_mapping_details,
key=lambda item: (item.resource_type.value, item.name or "", item.source_id),
)
)
else:
lines.extend(
f"- {source_id} -> {target_id}" for source_id, target_id in sorted(context.id_mappings.items())
)
elif context.id_mapping_count:
lines.append(f"resource references resolved: {context.id_mapping_count}")
return lines
def _render_id_mapping_detail(self, item: ResourceIdMapping) -> str:
label = item.resource_type.value
if item.name:
label = f"{label} {item.name}"
return f"- {label}: {item.source_id} -> {item.target_id}"
def _render_actionable_detail(self, item: ResourceReportItem) -> str:
if item.resource_type.value == "dependency" and item.name and self._has_dependency_type_prefix(item.name):
if item.identifier and item.identifier not in item.name:
return f"dependency {item.name}: {item.identifier}: {item.message}"
return f"dependency {item.name}: {item.message}"
return f"{item.resource_type.value} {item.identifier}: {item.message}"
def _has_dependency_type_prefix(self, name: str) -> bool:
return name.startswith(("workflow ", "api_tool ", "workflow_tool ", "mcp_tool ", "builtin_or_plugin_tool "))

View File

@ -41,6 +41,7 @@ class WorkflowToolManageService:
parameters: list[WorkflowToolParameterConfiguration],
privacy_policy: str = "",
labels: list[str] | None = None,
import_id: str = "",
):
# check if the name is unique
existing_workflow_tool_provider: WorkflowToolProvider | None = None
@ -92,7 +93,8 @@ class WorkflowToolManageService:
privacy_policy=privacy_policy,
version=workflow.version,
)
if import_id:
workflow_tool_provider.id = import_id
try:
WorkflowToolProviderController.from_db(workflow_tool_provider)
except Exception as e:

View File

@ -0,0 +1,298 @@
from __future__ import annotations
from unittest.mock import patch
from uuid import uuid4
import pytest
from werkzeug.exceptions import HTTPException
import services
from controllers.console.auth.error import MemberNotInTenantError
from controllers.console.workspace import members as members_module
from controllers.console.workspace.members import MemberCancelInviteApi, MemberUpdateRoleApi, OwnerTransfer
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole, TenantStatus
def unwrap(func):
while hasattr(func, "__wrapped__"):
func = func.__wrapped__
return func
class WorkspaceMembersIntegrationFactory:
@staticmethod
def create_tenant(db_session_with_containers) -> Tenant:
tenant = Tenant(name=f"Tenant {uuid4()}", plan="basic", status=TenantStatus.NORMAL)
db_session_with_containers.add(tenant)
db_session_with_containers.commit()
return tenant
@staticmethod
def create_account(
db_session_with_containers,
*,
email_prefix: str,
tenant: Tenant | None = None,
role: TenantAccountRole = TenantAccountRole.NORMAL,
current: bool = False,
) -> Account:
account = Account(
name=f"Account {uuid4()}",
email=f"{email_prefix}-{uuid4()}@example.com",
password="hashed-password",
password_salt="salt",
interface_language="en-US",
timezone="UTC",
)
db_session_with_containers.add(account)
db_session_with_containers.commit()
if tenant is not None:
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=role,
current=current,
)
db_session_with_containers.add(join)
db_session_with_containers.commit()
account.current_tenant = tenant
return account
@staticmethod
def create_owner_workspace(db_session_with_containers) -> tuple[Tenant, Account]:
tenant = WorkspaceMembersIntegrationFactory.create_tenant(db_session_with_containers)
owner = WorkspaceMembersIntegrationFactory.create_account(
db_session_with_containers,
email_prefix="owner",
tenant=tenant,
role=TenantAccountRole.OWNER,
current=True,
)
return tenant, owner
@staticmethod
def create_owner_transfer_token(account: Account) -> str:
_, token = members_module.AccountService.generate_owner_transfer_token(
account.email,
account=account,
code="123456",
additional_data={},
)
return token
@staticmethod
def get_join(db_session_with_containers, *, tenant: Tenant, account: Account) -> TenantAccountJoin:
tenant_id = tenant.id
account_id = account.id
db_session_with_containers.expire_all()
join = (
db_session_with_containers.query(TenantAccountJoin)
.filter_by(tenant_id=tenant_id, account_id=account_id)
.one()
)
return join
class TestMemberCancelInviteApiWithContainers:
def test_cancel_success(self, flask_app_with_containers, db_session_with_containers):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(db_session_with_containers, email_prefix="member")
with (
flask_app_with_containers.test_request_context("/"),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
patch.object(members_module.TenantService, "remove_member_from_tenant") as mock_remove_member,
):
result, status = method(api, member.id)
assert status == 200
assert result["result"] == "success"
mock_remove_member.assert_called_once()
called_tenant, called_member, called_current_user = mock_remove_member.call_args.args
assert called_tenant.id == tenant.id
assert called_member.id == member.id
assert called_current_user.id == current_user.id
def test_cancel_not_found(self, flask_app_with_containers, db_session_with_containers):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
with (
flask_app_with_containers.test_request_context("/"),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
):
with pytest.raises(HTTPException):
method(api, str(uuid4()))
def test_cancel_cannot_operate_self(self, flask_app_with_containers, db_session_with_containers):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(db_session_with_containers, email_prefix="member")
with (
flask_app_with_containers.test_request_context("/"),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
patch.object(
members_module.TenantService,
"remove_member_from_tenant",
side_effect=services.errors.account.CannotOperateSelfError("x"),
),
):
result, status = method(api, member.id)
assert status == 400
assert result["code"] == "cannot-operate-self"
def test_cancel_no_permission(self, flask_app_with_containers, db_session_with_containers):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(db_session_with_containers, email_prefix="member")
with (
flask_app_with_containers.test_request_context("/"),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
patch.object(
members_module.TenantService,
"remove_member_from_tenant",
side_effect=services.errors.account.NoPermissionError("x"),
),
):
result, status = method(api, member.id)
assert status == 403
assert result["code"] == "forbidden"
def test_cancel_member_not_in_tenant(self, flask_app_with_containers, db_session_with_containers):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(db_session_with_containers, email_prefix="member")
with (
flask_app_with_containers.test_request_context("/"),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
patch.object(
members_module.TenantService,
"remove_member_from_tenant",
side_effect=services.errors.account.MemberNotInTenantError(),
),
):
result, status = method(api, member.id)
assert status == 404
assert result["code"] == "member-not-found"
class TestMemberUpdateRoleApiWithContainers:
def test_update_success(self, flask_app_with_containers, db_session_with_containers):
api = MemberUpdateRoleApi()
method = unwrap(api.put)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(
db_session_with_containers,
email_prefix="member",
tenant=tenant,
role=TenantAccountRole.EDITOR,
)
with (
flask_app_with_containers.test_request_context("/", json={"role": "normal"}),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
):
result = method(api, member.id)
if isinstance(result, tuple):
result = result[0]
assert result["result"] == "success"
assert (
factory.get_join(db_session_with_containers, tenant=tenant, account=member).role == TenantAccountRole.NORMAL
)
def test_update_member_not_found(self, flask_app_with_containers, db_session_with_containers):
api = MemberUpdateRoleApi()
method = unwrap(api.put)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
with (
flask_app_with_containers.test_request_context("/", json={"role": "normal"}),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
):
with pytest.raises(HTTPException):
method(api, str(uuid4()))
class TestOwnerTransferApiWithContainers:
def test_member_not_in_tenant(self, flask_app_with_containers, db_session_with_containers):
api = OwnerTransfer()
method = unwrap(api.post)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(db_session_with_containers, email_prefix="member")
token = factory.create_owner_transfer_token(current_user)
with (
flask_app_with_containers.test_request_context("/", json={"token": token}),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
):
with pytest.raises(MemberNotInTenantError):
method(api, member.id)
def test_member_not_found(self, flask_app_with_containers, db_session_with_containers):
api = OwnerTransfer()
method = unwrap(api.post)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
token = factory.create_owner_transfer_token(current_user)
with (
flask_app_with_containers.test_request_context("/", json={"token": token}),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
):
with pytest.raises(HTTPException):
method(api, str(uuid4()))
def test_transfer_success(self, flask_app_with_containers, db_session_with_containers):
api = OwnerTransfer()
method = unwrap(api.post)
factory = WorkspaceMembersIntegrationFactory
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
member = factory.create_account(
db_session_with_containers,
email_prefix="member",
tenant=tenant,
role=TenantAccountRole.NORMAL,
)
token = factory.create_owner_transfer_token(current_user)
with (
flask_app_with_containers.test_request_context("/", json={"token": token}),
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
patch.object(members_module.AccountService, "send_new_owner_transfer_notify_email") as mock_new_owner_email,
patch.object(members_module.AccountService, "send_old_owner_transfer_notify_email") as mock_old_owner_email,
):
result = method(api, member.id)
assert result["result"] == "success"
assert (
factory.get_join(db_session_with_containers, tenant=tenant, account=member).role == TenantAccountRole.OWNER
)
assert (
factory.get_join(db_session_with_containers, tenant=tenant, account=current_user).role
== TenantAccountRole.ADMIN
)
mock_new_owner_email.assert_called_once()
mock_old_owner_email.assert_called_once()

View File

@ -0,0 +1,71 @@
import json
from click.testing import CliRunner
from commands.data_migration import (
ID_STRATEGY_CHOICES,
export_migration_data,
export_migration_data_template,
import_migration_data,
)
def test_export_command_requires_input_and_output():
result = CliRunner().invoke(export_migration_data, [])
assert result.exit_code != 0
assert export_migration_data.name == "export-app-migration"
assert "--input" in result.output
assert "--output" in result.output
def test_import_command_requires_input_and_target_tenant_or_package_metadata():
result = CliRunner().invoke(import_migration_data, [])
assert result.exit_code != 0
assert import_migration_data.name == "import-app-migration"
assert "--input" in result.output
def test_import_command_does_not_expose_unimplemented_map_id_strategy():
assert ID_STRATEGY_CHOICES == ["preserve-id", "generate-new-id"]
def test_export_template_command_prints_scripted_json_template():
result = CliRunner().invoke(export_migration_data_template, [])
assert result.exit_code == 0
assert export_migration_data_template.name == "app-migration-template"
template = json.loads(result.output)
assert template == {
"source_tenant": {"mode": "single", "id": "", "name": "admin's Workspace"},
"apps": {"modes": ["workflow", "advanced-chat"], "ids": [], "all": True},
"include_referenced_tools": True,
"additional_tools": {"api_tools": [], "workflow_tools": [], "mcp_tools": []},
"include_secrets": False,
"import_options": {
"create_app_api_token_on_import": False,
"id_strategy": "preserve-id",
"conflict_strategy": "fail",
},
}
def test_export_template_command_writes_output_file(tmp_path):
output_file = tmp_path / "export-template.json"
result = CliRunner().invoke(export_migration_data_template, ["--output", str(output_file)])
assert result.exit_code == 0
assert f"Output written to {output_file}" in result.output
assert json.loads(output_file.read_text())["apps"]["all"] is True
def test_export_template_command_requires_overwrite_for_existing_output(tmp_path):
output_file = tmp_path / "export-template.json"
output_file.write_text("{}")
result = CliRunner().invoke(export_migration_data_template, ["--output", str(output_file)])
assert result.exit_code != 0
assert "already exists" in result.output

View File

@ -0,0 +1,384 @@
from commands.data_migration import (
CONFLICT_STRATEGY_CHOICES,
ID_STRATEGY_CHOICES,
_confirm_wizard_summary,
_print_auto_tools,
_print_final_tool_selection,
_print_wizard_step,
_prompt_additional_tools,
_prompt_output_file,
_prompt_tool_category,
_resolve_mcp_tool_names,
migration_data_wizard,
parse_index_selection,
)
def test_parse_index_selection_supports_all():
assert parse_index_selection("all", ["a", "b", "c"]) == ["a", "b", "c"]
def test_wizard_command_uses_app_migration_name():
assert migration_data_wizard.name == "app-migration-wizard"
def test_parse_index_selection_supports_comma_indexes():
assert parse_index_selection("1, 3", ["a", "b", "c"]) == ["a", "c"]
def test_print_wizard_step_adds_separator(monkeypatch):
output_lines = []
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
_print_wizard_step("App Selection")
assert output_lines == ["", "==== App Selection ===="]
def test_conflict_strategy_choices_exclude_replace():
assert CONFLICT_STRATEGY_CHOICES == ["fail", "skip", "update"]
def test_prompt_app_ids_explains_comma_selection_and_default(monkeypatch):
from commands.data_migration import _prompt_app_ids
prompts = []
output_lines = []
apps = [
type("App", (), {"id": "app-1", "name": "embedded", "mode": "workflow"})(),
type("App", (), {"id": "app-2", "name": "main", "mode": "advanced-chat"})(),
]
def capture_prompt(text, **kwargs):
prompts.append((text, kwargs))
return "1,2"
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
assert _prompt_app_ids(apps) == ["app-1", "app-2"]
assert prompts == [("Select apps by number, comma-separated numbers, or all", {"default": "all"})]
assert "Currently supported app types: workflow and chatflow." in output_lines
def test_prompt_tool_category_marks_auto_discovered_tools(monkeypatch):
output_lines = []
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
monkeypatch.setattr("commands.data_migration.click.prompt", lambda *args, **kwargs: "")
selected = _prompt_tool_category(
"Custom API tools",
[("weather", "weather", "tool-id"), ("calendar", "calendar", "calendar-id")],
auto_tools={"weather": "tool-id"},
)
assert selected == []
assert "1. [auto] weather (tool-id)" in output_lines
assert "2. [ ] calendar (calendar-id)" in output_lines
assert output_lines[:2] == ["", "==== Custom API tools ===="]
def test_prompt_tool_category_explains_comma_selection_and_default(monkeypatch):
prompts = []
def capture_prompt(text, **kwargs):
prompts.append((text, kwargs))
return ""
monkeypatch.setattr("commands.data_migration.click.echo", lambda *_args, **_kwargs: None)
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
selected = _prompt_tool_category(
"Custom API tools",
[("weather", "weather", "tool-id")],
auto_tools={},
)
assert selected == []
assert prompts == [
(
"Select custom api tools by number, comma-separated numbers, all, or empty",
{"default": "", "show_default": "empty"},
)
]
def test_prompt_output_file_shows_default(monkeypatch):
prompts = []
def capture_prompt(text, **kwargs):
prompts.append((text, kwargs))
return "migration-data.json"
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
assert _prompt_output_file() == ("migration-data.json", False)
assert prompts[0][0] == "Output path"
assert prompts[0][1]["show_default"] is True
def test_prompt_tool_category_marks_auto_by_detail_and_supports_multi_select(monkeypatch):
monkeypatch.setattr("commands.data_migration.click.echo", lambda *_args, **_kwargs: None)
monkeypatch.setattr("commands.data_migration.click.prompt", lambda *args, **kwargs: "1,2")
selected = _prompt_tool_category(
"Workflow tools",
[("tool-1", "embedded", "app-1"), ("tool-2", "other", "app-2")],
auto_tools={"embedded": "app-1"},
)
assert selected == ["tool-1", "tool-2"]
def test_prompt_tool_category_marks_auto_by_value():
output_lines = []
from commands import data_migration
original_echo = data_migration.click.echo
original_prompt = data_migration.click.prompt
data_migration.click.echo = output_lines.append
data_migration.click.prompt = lambda *args, **kwargs: ""
try:
_prompt_tool_category(
"Workflow tools",
[("tool-1", "embedded_workflow_as_tool", "tool-1")],
auto_tools={"embedded_workflow_as_tool": "tool-1"},
)
finally:
data_migration.click.echo = original_echo
data_migration.click.prompt = original_prompt
assert "1. [auto] embedded_workflow_as_tool (tool-1)" in output_lines
def test_print_auto_tools_lists_each_category(monkeypatch):
output_lines = []
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
_print_auto_tools(
{
"api_tools": {"weather": "3bac3aa9-dd87-4351-9459-a7099137b028"},
"workflow_tools": {"embedded_workflow_as_tool": "e6024578-41b7-4fb5-a81f-9201358e5835"},
"mcp_tools": {},
}
)
assert "Automatically discovered tools:" in output_lines
assert "Custom API tools" in output_lines
assert "- weather: 3bac3aa9-dd87-4351-9459-a7099137b028" in output_lines
assert "Workflow tools" in output_lines
assert "- embedded_workflow_as_tool: e6024578-41b7-4fb5-a81f-9201358e5835" in output_lines
assert "MCP tools" in output_lines
assert "- none" in output_lines
def test_resolve_mcp_tool_names_does_not_compare_non_uuid_identifier_to_uuid_id(monkeypatch):
statements = []
def capture_scalar(statement):
statements.append(str(statement))
monkeypatch.setattr("commands.data_migration.db.session.scalar", capture_scalar)
assert _resolve_mcp_tool_names("49a99e46-bc2c-4885-91fa-47615f6192b5", {"my-test-mcp": "my-test-mcp"}) == {
"my-test-mcp": "my-test-mcp"
}
assert "tool_mcp_providers.id =" not in statements[0]
assert "tool_mcp_providers.server_identifier =" in statements[0]
def test_prompt_additional_tools_prints_final_selection_when_skipped(monkeypatch):
output_lines = []
confirm_prompts = []
def capture_confirm(prompt, **kwargs):
confirm_prompts.append((prompt, kwargs))
return False
monkeypatch.setattr("commands.data_migration.click.confirm", capture_confirm)
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
selected = _prompt_additional_tools(
"tenant-id",
{
"api_tools": {"weather": "3bac3aa9-dd87-4351-9459-a7099137b028"},
"workflow_tools": {},
"mcp_tools": {},
},
)
assert selected == {"api_tools": [], "workflow_tools": [], "mcp_tools": []}
assert confirm_prompts == [
("Export additional tools manually? [y/n, default: n]", {"default": False, "show_default": False})
]
assert "Final tools to export:" in output_lines
assert "- [auto] weather: 3bac3aa9-dd87-4351-9459-a7099137b028" in output_lines
def test_final_tool_selection_deduplicates_manual_tool_already_auto(monkeypatch):
output_lines = []
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
_print_final_tool_selection(
{
"api_tools": {},
"workflow_tools": {"embedded_workflow_as_tool": "e6024578-41b7-4fb5-a81f-9201358e5835"},
"mcp_tools": {},
},
{
"api_tools": [],
"workflow_tools": ["e6024578-41b7-4fb5-a81f-9201358e5835"],
"mcp_tools": [],
},
{"e6024578-41b7-4fb5-a81f-9201358e5835": "embedded_workflow_as_tool: e6024578"},
)
assert "- [auto] embedded_workflow_as_tool: e6024578-41b7-4fb5-a81f-9201358e5835" in output_lines
assert not any(line.startswith("- [manual]") for line in output_lines)
def test_prompt_output_file_rejects_yes_no_typo(monkeypatch):
import click
import pytest
monkeypatch.setattr("commands.data_migration.click.prompt", lambda *args, **kwargs: "y")
with pytest.raises(click.ClickException, match="Output path must be a file path"):
_prompt_output_file()
def test_confirm_wizard_summary_shows_conflict_strategy(monkeypatch):
output_lines = []
confirm_prompts = []
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
monkeypatch.setattr(
"commands.data_migration.click.confirm",
lambda prompt, **kwargs: confirm_prompts.append((prompt, kwargs)) or True,
)
_confirm_wizard_summary(
tenant_name="admin's Workspace",
app_names=["main_chatflow"],
auto_tools={"api_tools": {}, "workflow_tools": {}, "mcp_tools": {}},
additional_tools={"api_tools": [], "workflow_tools": [], "mcp_tools": []},
manual_labels={},
include_referenced_tools=True,
include_secrets=False,
create_tokens=True,
id_strategy="preserve-id",
conflict_strategy="fail",
output_file="migration-data.json",
)
assert "id strategy: preserve-id" in output_lines
assert "conflict strategy: fail" in output_lines
assert confirm_prompts == [("Write migration package? [y/n, default: y]", {"default": True, "show_default": False})]
def test_confirm_wizard_summary_shows_final_deduplicated_tool_selection(monkeypatch):
output_lines = []
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
monkeypatch.setattr("commands.data_migration.click.confirm", lambda *args, **kwargs: True)
_confirm_wizard_summary(
tenant_name="admin's Workspace",
app_names=["main_chatflow"],
auto_tools={
"api_tools": {"weather": "weather-id"},
"workflow_tools": {"embedded_workflow_as_tool": "workflow-tool-id"},
"mcp_tools": {},
},
additional_tools={
"api_tools": ["weather-id", "calendar"],
"workflow_tools": [],
"mcp_tools": ["mcp-id"],
},
manual_labels={
"calendar": "calendar: calendar-id",
"mcp-id": "my-test-mcp: mcp-id",
},
include_referenced_tools=True,
include_secrets=False,
create_tokens=False,
id_strategy="preserve-id",
conflict_strategy="update",
output_file="migration-data.json",
)
assert "Final tools to export:" in output_lines
assert "Custom API tools" in output_lines
assert "- [auto] weather: weather-id" in output_lines
assert "- [manual] calendar: calendar-id" in output_lines
assert "Workflow tools" in output_lines
assert "- [auto] embedded_workflow_as_tool: workflow-tool-id" in output_lines
assert "MCP tools" in output_lines
assert "- [manual] my-test-mcp: mcp-id" in output_lines
assert not any(line.startswith("additional api tools:") for line in output_lines)
assert not any(line.startswith("additional workflow tools:") for line in output_lines)
assert not any(line.startswith("additional mcp tools:") for line in output_lines)
assert "- [manual] weather-id" not in output_lines
def test_import_options_prompts_explain_secrets_reuse_and_conflicts(monkeypatch):
from commands.data_migration import _prompt_import_options
output_lines = []
confirm_prompts = []
prompt_calls = []
def capture_confirm(prompt, **kwargs):
confirm_prompts.append((prompt, kwargs))
return False
def capture_prompt(prompt, **kwargs):
prompt_calls.append((prompt, kwargs))
return kwargs["default"]
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
monkeypatch.setattr("commands.data_migration.click.confirm", capture_confirm)
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
include_secrets, create_tokens, id_strategy, conflict_strategy = _prompt_import_options()
assert include_secrets is False
assert create_tokens is False
assert id_strategy == "preserve-id"
assert conflict_strategy == "update"
assert "Secrets include workflow/app DSL secret values, custom API tool credentials," in output_lines
assert "-- Secrets --" in output_lines
assert "If you choose no, credentials are omitted or masked," in output_lines
assert "-- App API Tokens --" in output_lines
assert "When enabled, import will create an app API token if the imported app has none," in output_lines
assert "or reuse an existing app API token if one already exists." in output_lines
assert "-- ID Strategy --" in output_lines
assert "ID strategy controls whether imported app and tool IDs preserve source IDs" in output_lines
assert "or use target-generated IDs." in output_lines
assert "preserve-id: keep source IDs where the target service supports it." in output_lines
assert (
"generate-new-id: let the target environment generate new IDs and rewrite references via mapping."
in output_lines
)
assert "-- Conflict Strategy --" in output_lines
assert "Conflict strategy controls what import does when a target resource already exists." in output_lines
assert "fail: stop at the first conflict; previously committed resources are not rolled back." in output_lines
assert "skip: keep the existing target resource and skip importing that resource." in output_lines
assert "update: update the existing target resource in place." in output_lines
assert confirm_prompts == [
("Include secrets in output JSON? [y/n, default: n]", {"default": False, "show_default": False}),
("Create or reuse app API tokens during import? [y/n, default: n]", {"default": False, "show_default": False}),
]
assert prompt_calls[0][0] == "Import ID strategy. Enter one of: preserve-id, generate-new-id"
assert prompt_calls[0][1]["default"] == "preserve-id"
assert prompt_calls[0][1]["show_default"] is True
assert prompt_calls[0][1]["type"].choices == ID_STRATEGY_CHOICES
assert prompt_calls[1][0] == "Import conflict strategy. Enter one of: fail, skip, update"
assert prompt_calls[1][1]["default"] == "update"
assert prompt_calls[1][1]["show_default"] is True
assert prompt_calls[1][1]["type"].choices == CONFLICT_STRATEGY_CHOICES

View File

@ -1036,6 +1036,48 @@ class TestSegmentListAdvancedCases:
assert status == 200
assert response["total"] == 1
def test_segment_list_postgres_keyword_filter_handles_scalar_keywords(self, app: Flask):
api = DatasetDocumentSegmentListApi()
method = unwrap(api.get)
dataset = MagicMock()
document = MagicMock()
pagination = MagicMock(items=[], total=0, pages=0)
with (
app.test_request_context("/?keyword=test"),
patch(
"controllers.console.datasets.datasets_segments.current_account_with_tenant",
return_value=(MagicMock(), "11111111-1111-1111-1111-111111111111"),
),
patch(
"controllers.console.datasets.datasets_segments.DatasetService.get_dataset",
return_value=dataset,
),
patch(
"controllers.console.datasets.datasets_segments.DatasetService.check_dataset_permission",
return_value=None,
),
patch(
"controllers.console.datasets.datasets_segments.DocumentService.get_document",
return_value=document,
),
patch(
"controllers.console.datasets.datasets_segments.dify_config",
SimpleNamespace(SQLALCHEMY_DATABASE_URI_SCHEME="postgresql"),
),
patch(
"controllers.console.datasets.datasets_segments.db.paginate",
return_value=pagination,
) as paginate_mock,
):
method(api, "22222222-2222-2222-2222-222222222222", "33333333-3333-3333-3333-333333333333")
query = paginate_mock.call_args.kwargs["select"]
sql = str(query.compile(compile_kwargs={"literal_binds": True}))
assert "jsonb_array_elements_text(CASE" in sql
assert "ELSE CAST('[]' AS JSONB)" in sql
def test_segment_list_permission_denied(self, app: Flask):
"""Test segment list with permission denied"""
api = DatasetDocumentSegmentListApi()

View File

@ -3,22 +3,18 @@ from unittest.mock import MagicMock, patch
import pytest
from flask import Flask
from werkzeug.exceptions import HTTPException
import services
from controllers.console.auth.error import (
CannotTransferOwnerToSelfError,
EmailCodeError,
InvalidEmailError,
InvalidTokenError,
MemberNotInTenantError,
NotOwnerError,
OwnerTransferLimitError,
)
from controllers.console.error import EmailSendIpLimitError, WorkspaceMembersLimitExceeded
from controllers.console.workspace.members import (
DatasetOperatorMemberListApi,
MemberCancelInviteApi,
MemberInviteEmailApi,
MemberListApi,
MemberUpdateRoleApi,
@ -251,135 +247,7 @@ class TestMemberInviteEmailApi:
assert result["invitation_results"][0]["status"] == "failed"
class TestMemberCancelInviteApi:
def test_cancel_success(self, app: Flask):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
tenant = MagicMock(id="t1")
user = MagicMock(current_tenant=tenant)
member = MagicMock()
with (
app.test_request_context("/"),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.db.session.get") as get_mock,
patch("controllers.console.workspace.members.TenantService.remove_member_from_tenant"),
):
get_mock.return_value = member
result, status = method(api, member.id)
assert status == 200
assert result["result"] == "success"
def test_cancel_not_found(self, app: Flask):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
tenant = MagicMock(id="t1")
user = MagicMock(current_tenant=tenant)
with (
app.test_request_context("/"),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.db.session.get") as get_mock,
):
get_mock.return_value = None
with pytest.raises(HTTPException):
method(api, "x")
def test_cancel_cannot_operate_self(self, app: Flask):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
tenant = MagicMock(id="t1")
user = MagicMock(current_tenant=tenant)
member = MagicMock()
with (
app.test_request_context("/"),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.db.session.get") as get_mock,
patch(
"controllers.console.workspace.members.TenantService.remove_member_from_tenant",
side_effect=services.errors.account.CannotOperateSelfError("x"),
),
):
get_mock.return_value = member
result, status = method(api, member.id)
assert status == 400
def test_cancel_no_permission(self, app: Flask):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
tenant = MagicMock(id="t1")
user = MagicMock(current_tenant=tenant)
member = MagicMock()
with (
app.test_request_context("/"),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.db.session.get") as get_mock,
patch(
"controllers.console.workspace.members.TenantService.remove_member_from_tenant",
side_effect=services.errors.account.NoPermissionError("x"),
),
):
get_mock.return_value = member
result, status = method(api, member.id)
assert status == 403
def test_cancel_member_not_in_tenant(self, app: Flask):
api = MemberCancelInviteApi()
method = unwrap(api.delete)
tenant = MagicMock(id="t1")
user = MagicMock(current_tenant=tenant)
member = MagicMock()
with (
app.test_request_context("/"),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.db.session.get") as get_mock,
patch(
"controllers.console.workspace.members.TenantService.remove_member_from_tenant",
side_effect=services.errors.account.MemberNotInTenantError(),
),
):
get_mock.return_value = member
result, status = method(api, member.id)
assert status == 404
class TestMemberUpdateRoleApi:
def test_update_success(self, app: Flask):
api = MemberUpdateRoleApi()
method = unwrap(api.put)
tenant = MagicMock()
user = MagicMock(current_tenant=tenant)
member = MagicMock()
payload = {"role": "normal"}
with (
app.test_request_context("/", json=payload),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.db.session.get", return_value=member),
patch("controllers.console.workspace.members.TenantService.update_member_role"),
):
result = method(api, "id")
if isinstance(result, tuple):
result = result[0]
assert result["result"] == "success"
def test_update_invalid_role(self, app: Flask):
api = MemberUpdateRoleApi()
method = unwrap(api.put)
@ -391,23 +259,6 @@ class TestMemberUpdateRoleApi:
assert status == 400
def test_update_member_not_found(self, app: Flask):
api = MemberUpdateRoleApi()
method = unwrap(api.put)
payload = {"role": "normal"}
with (
app.test_request_context("/", json=payload),
patch(
"controllers.console.workspace.members.current_account_with_tenant",
return_value=(MagicMock(current_tenant=MagicMock()), "t1"),
),
patch("controllers.console.workspace.members.db.session.get", return_value=None),
):
with pytest.raises(HTTPException):
method(api, "id")
class TestDatasetOperatorMemberListApi:
def test_get_success(self, app: Flask):
@ -637,27 +488,3 @@ class TestOwnerTransferApi:
):
with pytest.raises(InvalidTokenError):
method(api, "2")
def test_member_not_in_tenant(self, app: Flask):
api = OwnerTransfer()
method = unwrap(api.post)
tenant = MagicMock()
user = MagicMock(id="1", email="a@test.com", current_tenant=tenant)
member = MagicMock()
payload = {"token": "t"}
with (
app.test_request_context("/", json=payload),
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
patch("controllers.console.workspace.members.TenantService.is_owner", return_value=True),
patch(
"controllers.console.workspace.members.AccountService.get_owner_transfer_data",
return_value={"email": "a@test.com"},
),
patch("controllers.console.workspace.members.db.session.get", return_value=member),
patch("controllers.console.workspace.members.TenantService.is_member", return_value=False),
):
with pytest.raises(MemberNotInTenantError):
method(api, "2")

View File

@ -0,0 +1,42 @@
import logging
from types import SimpleNamespace
from core.tools.errors import ToolProviderNotFoundError
from events.event_handlers import delete_tool_parameters_cache_when_sync_draft_workflow as handler_module
def test_missing_tool_provider_does_not_log_error_traceback(monkeypatch, caplog):
app = SimpleNamespace(id="workflow-id", tenant_id="tenant-id")
workflow = SimpleNamespace(
graph_dict={
"nodes": [
{
"id": "node-id",
"data": {
"type": "tool",
},
}
]
}
)
tool_entity = SimpleNamespace(
provider_type=SimpleNamespace(value="mcp"),
provider_id="my-test-mcp-server",
provider_name="my-test-mcp-server",
tool_name="echo",
credential_id=None,
)
monkeypatch.setattr(handler_module, "adapt_node_config_for_graph", lambda node_data: {"data": node_data["data"]})
monkeypatch.setattr(handler_module.ToolEntity, "model_validate", lambda data: tool_entity)
monkeypatch.setattr(
handler_module.ToolManager,
"get_tool_runtime",
lambda **kwargs: (_ for _ in ()).throw(ToolProviderNotFoundError("mcp provider not found")),
)
with caplog.at_level(logging.INFO, logger=handler_module.logger.name):
handler_module.handle(app, synced_draft_workflow=workflow)
assert not [record for record in caplog.records if record.levelno >= logging.ERROR]
assert "Skipped deleting tool parameters cache" in caplog.text

View File

@ -0,0 +1,115 @@
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService
from services.data_migration.entities import DependencyKind
def test_discovers_and_deduplicates_standalone_tool_nodes():
graph = {
"graph": {
"nodes": [
{"data": {"type": "tool", "provider_type": "api", "provider_id": "weather"}},
{"data": {"type": "tool", "provider_type": "api", "provider_id": "weather"}},
{"data": {"type": "tool", "provider_type": "workflow", "provider_id": "wf-tool-1"}},
{"data": {"type": "tool", "provider_type": "builtin", "provider_id": "google_search"}},
]
}
}
dependencies = DependencyDiscoveryService().discover_from_dsl(graph)
assert [(item.kind, item.provider_id) for item in dependencies] == [
(DependencyKind.API_TOOL, "weather"),
(DependencyKind.WORKFLOW_TOOL, "wf-tool-1"),
(DependencyKind.BUILTIN_OR_PLUGIN_TOOL, "google_search"),
]
def test_discovers_agent_node_tools():
graph = {
"graph": {
"nodes": [
{
"data": {
"type": "agent",
"tools": [
{"provider_type": "mcp", "provider_id": "mcp-1"},
{"provider_type": "api", "provider_id": "api-1"},
],
}
}
]
}
}
dependencies = DependencyDiscoveryService().discover_from_dsl(graph)
assert [(item.kind, item.provider_id) for item in dependencies] == [
(DependencyKind.MCP_TOOL, "mcp-1"),
(DependencyKind.API_TOOL, "api-1"),
]
def test_discovers_tool_nodes_from_exported_workflow_dsl_shape():
dsl = {
"workflow": {
"graph": {
"nodes": [
{
"data": {
"type": "tool",
"provider_type": "api",
"provider_id": "api-provider-id",
"provider_name": "weather",
}
},
{
"data": {
"type": "tool",
"provider_type": "workflow",
"provider_id": "workflow-tool-id",
"provider_name": "embedded_workflow",
}
},
]
}
}
}
dependencies = DependencyDiscoveryService().discover_from_dsl(dsl)
assert [(item.kind, item.provider_id, item.provider_name) for item in dependencies] == [
(DependencyKind.API_TOOL, "api-provider-id", "weather"),
(DependencyKind.WORKFLOW_TOOL, "workflow-tool-id", "embedded_workflow"),
]
def test_discovers_agent_tools_from_exported_agent_parameter_shape():
dsl = {
"workflow": {
"graph": {
"nodes": [
{
"data": {
"type": "agent",
"agent_parameters": {
"tools": {
"value": [
{
"provider_type": "api",
"provider_id": "api-provider-id",
"provider_name": "weather",
}
]
}
},
}
}
]
}
}
}
dependencies = DependencyDiscoveryService().discover_from_dsl(dsl)
assert [(item.kind, item.provider_id, item.provider_name) for item in dependencies] == [
(DependencyKind.API_TOOL, "api-provider-id", "weather"),
]

View File

@ -0,0 +1,71 @@
import pytest
from services.data_migration.entities import (
ConflictStrategy,
IdStrategy,
ImportOptions,
MigrationDataError,
MigrationMetadata,
MigrationPackage,
SourceTenant,
)
def test_import_options_defaults_match_spec():
options = ImportOptions()
assert options.create_app_api_token_on_import is False
assert options.id_strategy == IdStrategy.PRESERVE_ID
assert options.conflict_strategy == ConflictStrategy.FAIL
def test_metadata_requires_version():
with pytest.raises(MigrationDataError, match="metadata.version"):
MigrationMetadata.from_mapping({})
def test_metadata_parses_source_tenants_and_import_options():
metadata = MigrationMetadata.from_mapping(
{
"version": "1",
"source_scope": "single",
"source_tenants": [{"id": "tenant-1", "name": "source"}],
"target_tenant": {"name": "prod"},
"include_secrets": False,
"import_options": {"conflict_strategy": "update"},
}
)
assert metadata.version == "1"
assert metadata.source_scope == "single"
assert metadata.source_tenants == [SourceTenant(id="tenant-1", name="source")]
assert metadata.target_tenant == {"name": "prod"}
assert metadata.import_options.conflict_strategy == ConflictStrategy.UPDATE
assert metadata.import_options.id_strategy == IdStrategy.PRESERVE_ID
def test_import_options_invalid_strategy_raises_domain_error():
with pytest.raises(MigrationDataError, match="id_strategy"):
ImportOptions.from_mapping({"id_strategy": "unknown"})
with pytest.raises(MigrationDataError, match="id_strategy"):
ImportOptions.from_mapping({"id_strategy": "map-id"})
with pytest.raises(MigrationDataError, match="conflict_strategy"):
ImportOptions.from_mapping({"conflict_strategy": "unknown"})
with pytest.raises(MigrationDataError, match="conflict_strategy"):
ImportOptions.from_mapping({"conflict_strategy": "replace"})
def test_metadata_rejects_invalid_target_tenant_shape():
with pytest.raises(MigrationDataError, match="target_tenant"):
MigrationMetadata.from_mapping({"version": "1", "target_tenant": "prod"})
def test_migration_package_sections_must_be_lists_of_objects():
with pytest.raises(MigrationDataError, match="workflows"):
MigrationPackage.from_mapping({"metadata": {"version": "1"}, "workflows": {"id": "app-1"}})
with pytest.raises(MigrationDataError, match="tools"):
MigrationPackage.from_mapping({"metadata": {"version": "1"}, "tools": ["weather"]})

View File

@ -0,0 +1,201 @@
import pytest
from services.data_migration.dependency_discovery_service import DiscoveredDependency
from services.data_migration.entities import (
ConflictStrategy,
DependencyKind,
IdStrategy,
MigrationDataError,
ResourceType,
)
from services.data_migration.export_service import ExportConfigParser, MigrationExportService
def test_export_config_parser_accepts_new_scripted_shape():
selection = ExportConfigParser().parse(
{
"source_tenant": {"mode": "single", "name": "admin's Workspace"},
"apps": {"modes": ["workflow", "advanced-chat"], "ids": ["app-1"], "all": False},
"include_referenced_tools": True,
"additional_tools": {
"api_tools": ["weather"],
"workflow_tools": ["workflow-tool-1"],
"mcp_tools": ["mcp-1"],
},
"include_secrets": False,
"import_options": {
"create_app_api_token_on_import": True,
"id_strategy": "preserve-id",
"conflict_strategy": "fail",
},
}
)
assert selection.source_tenant_name == "admin's Workspace"
assert selection.app_ids == ["app-1"]
assert selection.export_all_apps is False
assert selection.additional_api_tools == ["weather"]
assert selection.additional_workflow_tools == ["workflow-tool-1"]
assert selection.additional_mcp_tools == ["mcp-1"]
assert selection.include_secrets is False
assert selection.import_options.create_app_api_token_on_import is True
assert selection.import_options.id_strategy == IdStrategy.PRESERVE_ID
assert selection.import_options.conflict_strategy == ConflictStrategy.FAIL
def test_export_config_parser_defaults_to_secret_free_all_apps():
selection = ExportConfigParser().parse(
{
"source_tenant": {"name": "source"},
"apps": {"all": True},
}
)
assert selection.export_all_apps is True
assert selection.include_referenced_tools is True
assert selection.include_secrets is False
def test_export_config_parser_requires_explicit_source_tenant_name_for_new_shape():
with pytest.raises(MigrationDataError, match="source_tenant.name"):
ExportConfigParser().parse({"source_tenant": {"mode": "single"}, "apps": {"all": True}})
def test_export_config_parser_accepts_limited_backwards_draft_shape():
selection = ExportConfigParser().parse(
{
"tenant_name": "legacy-source",
"workflows": ["app-1"],
"tools": ["weather"],
"workflow_tools": ["wf-tool-1"],
"mcp_tools": ["mcp-1"],
"export_all_workflows": False,
}
)
assert selection.source_tenant_name == "legacy-source"
assert selection.app_ids == ["app-1"]
assert selection.additional_api_tools == ["weather"]
assert selection.additional_workflow_tools == ["wf-tool-1"]
assert selection.additional_mcp_tools == ["mcp-1"]
assert selection.include_secrets is False
def test_export_config_parser_rejects_unsupported_app_modes():
with pytest.raises(MigrationDataError, match="Unsupported app modes"):
ExportConfigParser().parse(
{
"source_tenant": {"name": "source"},
"apps": {"modes": ["chat"], "all": True},
}
)
def test_secret_free_api_tool_export_uses_masking_and_omits_credentials(monkeypatch):
calls = []
def fake_get_api_provider(provider: str, tenant_id: str, mask: bool = True):
calls.append((provider, tenant_id, mask))
return {"credentials": {"api_key": "masked"}, "schema": {"openapi": "3.0.0"}, "tools": ["unused"]}
monkeypatch.setattr(
"services.data_migration.export_service.ToolManager.user_get_api_provider",
fake_get_api_provider,
)
service = MigrationExportService()
tools: list[dict] = []
report_items = []
service._export_api_tools(
"tenant-1",
["weather"],
include_secrets=False,
exported_tools=tools,
report_items=report_items,
)
assert calls == [("weather", "tenant-1", True)]
assert tools == [{"schema": {"openapi": "3.0.0"}, "provider_name": "weather", "source_tenant_id": "tenant-1"}]
assert report_items[0].resource_type == ResourceType.API_TOOL
def test_secret_free_mcp_dependencies_are_dependency_only():
service = MigrationExportService()
dependencies: list[dict] = []
mcp_tools: list[dict] = []
report_items = []
service._export_mcp_tools(
tenant_id="tenant-1",
provider_ids=["mcp-1"],
include_secrets=False,
exported_mcp_tools=mcp_tools,
dependencies=dependencies,
report_items=report_items,
)
assert mcp_tools == []
assert dependencies == [
{
"kind": DependencyKind.MCP_TOOL.value,
"provider_id": "mcp-1",
"provider_name": None,
"source": "mcp_provider",
}
]
assert report_items[0].status == "dependency-only"
assert report_items[0].name == "mcp_tool mcp-1"
def test_get_mcp_provider_does_not_compare_non_uuid_identifier_to_uuid_id(monkeypatch):
statements = []
def capture_scalar(statement):
statements.append(str(statement))
monkeypatch.setattr("services.data_migration.export_service.db.session.scalar", capture_scalar)
with pytest.raises(MigrationDataError, match="MCP provider not found"):
MigrationExportService()._get_mcp_provider("tenant-1", "my-test-mcp")
assert len(statements) == 1
assert "tool_mcp_providers.id =" not in statements[0]
assert "tool_mcp_providers.server_identifier =" in statements[0]
def test_dependency_ids_are_deduplicated_with_manual_selection_first():
service = MigrationExportService()
provider_ids = service._provider_ids(
manual_provider_ids=["weather", "weather", "manual"],
discovered_dependencies=[
DiscoveredDependency(DependencyKind.API_TOOL, "weather"),
DiscoveredDependency(DependencyKind.API_TOOL, "forecast"),
DiscoveredDependency(DependencyKind.WORKFLOW_TOOL, "workflow-tool"),
],
kind=DependencyKind.API_TOOL,
)
assert provider_ids == ["weather", "manual", "forecast"]
def test_api_provider_ids_use_provider_name_from_discovered_dependencies():
service = MigrationExportService()
provider_ids = service._provider_ids(
manual_provider_ids=[],
discovered_dependencies=[
DiscoveredDependency(DependencyKind.API_TOOL, "api-provider-id", provider_name="weather"),
],
kind=DependencyKind.API_TOOL,
)
assert provider_ids == ["weather"]
def test_mcp_authentication_export_omits_runtime_header_shape():
service = MigrationExportService()
assert service._serialize_mcp_authentication({"Authorization": "Bearer token"}) is None
assert service._serialize_mcp_authentication({"client_id": "id", "client_secret": "secret"}) == {
"client_id": "id",
"client_secret": "secret",
}

View File

@ -0,0 +1,973 @@
import pytest
import yaml
from models.tools import MCPToolProvider, WorkflowToolProvider
from services.app_dsl_service import Import
from services.data_migration.entities import (
ConflictStrategy,
IdStrategy,
ImportOptions,
ImportTarget,
MigrationDataError,
MigrationPackage,
ResourceIdMapping,
ResourceReportItem,
ResourceType,
)
from services.data_migration.import_service import ImportRequest, ImportTargetResolver, MigrationImportService
from services.entities.dsl_entities import ImportStatus
def test_target_tenant_precedence_cli_then_config_then_package():
package = MigrationPackage.from_mapping(
{
"metadata": {
"version": "1",
"source_scope": "single",
"source_tenants": [],
"target_tenant": {"name": "from-package"},
}
}
)
resolver = ImportTargetResolver()
assert (
resolver.select_target_tenant_name(
ImportRequest(package=package, cli_target_tenant="from-cli", config_target_tenant="from-config")
)
== "from-cli"
)
assert (
resolver.select_target_tenant_name(
ImportRequest(package=package, cli_target_tenant=None, config_target_tenant="from-config")
)
== "from-config"
)
assert (
resolver.select_target_tenant_name(
ImportRequest(package=package, cli_target_tenant=None, config_target_tenant=None)
)
== "from-package"
)
def test_target_tenant_missing_fails_before_import():
package = MigrationPackage.from_mapping({"metadata": {"version": "1", "source_scope": "single"}})
with pytest.raises(MigrationDataError, match="Target tenant"):
ImportTargetResolver().select_target_tenant_name(
ImportRequest(package=package, cli_target_tenant=None, config_target_tenant=None)
)
def test_package_target_tenant_id_can_be_used_without_name():
package = MigrationPackage.from_mapping(
{"metadata": {"version": "1", "source_scope": "single", "target_tenant": {"id": "tenant-id"}}}
)
assert ImportTargetResolver().select_target_tenant_name(ImportRequest(package=package)) == "tenant-id"
def test_target_tenant_name_is_not_treated_as_uuid():
resolver = ImportTargetResolver()
assert resolver._is_uuid("admin's Workspace") is False
assert resolver._is_uuid("49a99e46-bc2c-4885-91fa-47615f6192b5") is True
def test_package_target_tenant_id_ignores_invalid_uuid(monkeypatch):
package = MigrationPackage.from_mapping(
{"metadata": {"version": "1", "source_scope": "single", "target_tenant": {"id": "not-a-uuid"}}}
)
class StubSession:
def get(self, model, identifier):
raise AssertionError("invalid UUID should not be passed to session.get")
def scalars(self, statement):
class EmptyResult:
def all(self):
return []
return EmptyResult()
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
with pytest.raises(MigrationDataError, match="Target tenant not found"):
ImportTargetResolver().resolve(ImportRequest(package=package))
def test_options_override_replaces_package_defaults():
package = MigrationPackage.from_mapping(
{
"metadata": {
"version": "1",
"source_scope": "single",
"target_tenant": {"name": "target"},
"import_options": {
"create_app_api_token_on_import": True,
"conflict_strategy": "update",
},
}
}
)
captured_options: list[ImportOptions] = []
class StubResolver(ImportTargetResolver):
def resolve(self, request: ImportRequest) -> ImportTarget:
return ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
)
class CapturingImportService(MigrationImportService):
def _import_workflows(
self,
package: MigrationPackage,
target: ImportTarget,
options: ImportOptions,
report_items: list[ResourceReportItem],
id_mapping: dict[str, str],
**kwargs,
) -> None:
captured_options.append(options)
override = ImportOptions(create_app_api_token_on_import=False, conflict_strategy=ConflictStrategy.SKIP)
CapturingImportService(target_resolver=StubResolver()).import_package(
ImportRequest(package=package, options_override=override)
)
assert captured_options == [override]
def test_only_preserve_id_strategy_reuses_source_app_id():
service = MigrationImportService()
assert service._should_preserve_source_app_id(ImportOptions(id_strategy=IdStrategy.PRESERVE_ID)) is True
assert service._should_preserve_source_app_id(ImportOptions(id_strategy=IdStrategy.GENERATE_NEW_ID)) is False
def test_find_existing_app_ignores_invalid_uuid(monkeypatch):
class StubSession:
def scalar(self, statement):
raise AssertionError("invalid UUID should not be queried against App.id")
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
assert MigrationImportService()._find_existing_app("not-a-uuid", "tenant-1") is None
def test_find_existing_workflow_tool_does_not_compare_invalid_uuid(monkeypatch):
captured = []
class StubSession:
def scalar(self, statement):
captured.append(statement)
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
MigrationImportService()._find_existing_workflow_tool("tenant-1", "not-a-uuid", "tool-name", "app-id")
where_clause = str(captured[0].whereclause)
assert f"{WorkflowToolProvider.__tablename__}.id" not in where_clause
def test_find_existing_mcp_tool_does_not_compare_invalid_uuid(monkeypatch):
captured = []
class StubSession:
def scalar(self, statement):
captured.append(statement)
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
MigrationImportService()._find_existing_mcp_tool("tenant-1", "my-test-mcp", "my-test-mcp")
where_clause = str(captured[0].whereclause)
assert f"{MCPToolProvider.__tablename__}.id" not in where_clause
assert f"{MCPToolProvider.__tablename__}.name" not in where_clause
def test_workflow_app_import_does_not_wrap_app_dsl_import_in_nested_transaction(monkeypatch):
class FailingNestedTransaction:
def __enter__(self):
raise AssertionError("nested transaction should not be opened")
def __exit__(self, exc_type, exc_value, traceback):
return False
class StubSession:
def begin_nested(self):
return FailingNestedTransaction()
def commit(self):
return None
class StubAppDslService:
def __init__(self, session):
self.session = session
def import_app(self, **kwargs):
return Import(id="import-id", status=ImportStatus.COMPLETED, app_id="imported-app-id")
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(import_service, "AppDslService", StubAppDslService)
imported_app_id = MigrationImportService()._import_workflow_app(
account=object(),
workflow_data={"name": "main_chatflow"},
dsl_content="app:\n mode: workflow\n",
app_id="source-app-id",
existing_app=None,
options=ImportOptions(id_strategy=IdStrategy.PRESERVE_ID),
)
assert imported_app_id == "imported-app-id"
def test_rewrite_workflow_dsl_replaces_tool_provider_ids():
dsl_content = yaml.safe_dump(
{
"app": {"mode": "workflow"},
"workflow": {
"graph": {
"nodes": [
{
"data": {
"type": "tool",
"provider_id": "source-api-provider-id",
"provider_name": "weather",
}
},
{
"data": {
"type": "agent",
"agent_parameters": {
"tools": {
"value": [
{
"provider_id": "source-agent-provider-id",
"provider_name": "agent_weather",
}
]
}
},
}
},
]
}
},
}
)
rewritten = MigrationImportService()._rewrite_workflow_dsl_provider_ids(
dsl_content,
{
"source-api-provider-id": "target-api-provider-id",
"source-agent-provider-id": "target-agent-provider-id",
},
)
graph = yaml.safe_load(rewritten)["workflow"]["graph"]
assert graph["nodes"][0]["data"]["provider_id"] == "target-api-provider-id"
assert (
graph["nodes"][1]["data"]["agent_parameters"]["tools"]["value"][0]["provider_id"] == "target-agent-provider-id"
)
def test_source_api_provider_ids_are_discovered_from_workflow_dsl():
package = MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"workflows": [
{
"dsl": yaml.safe_dump(
{
"workflow": {
"graph": {
"nodes": [
{
"data": {
"type": "tool",
"provider_id": "source-api-provider-id",
"provider_name": "weather",
"provider_type": "api",
}
}
]
}
}
}
)
}
],
}
)
assert MigrationImportService()._source_api_provider_ids_by_name(package) == {"weather": {"source-api-provider-id"}}
def test_workflow_tool_import_publishes_referenced_app_before_create(monkeypatch):
events = []
account = type("Account", (), {"id": "account-1"})()
class StubSession:
def get(self, model, identifier):
return account
class PublishingImportService(MigrationImportService):
def _find_existing_app(self, app_id, tenant_id):
return object()
def _find_existing_workflow_tool(self, tenant_id, workflow_tool_id, tool_name, app_id):
if ("created", app_id) in events:
return type("WorkflowToolProvider", (), {"id": workflow_tool_id or "created-workflow-tool-id"})()
return None
def _ensure_workflow_app_is_published(self, target, account, app_id):
events.append(("published", app_id))
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(
import_service.WorkflowToolManageService,
"create_workflow_tool",
lambda **kwargs: events.append(("created", kwargs["workflow_app_id"])),
)
PublishingImportService()._import_workflow_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"workflow_tools": [
{
"id": "workflow-tool-1",
"name": "embedded_workflow_as_tool",
"app_id": "workflow-app-1",
}
],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(),
{},
[],
[],
)
assert events == [("published", "workflow-app-1"), ("created", "workflow-app-1")]
@pytest.mark.parametrize(
("id_strategy", "expected_import_id"),
[
(IdStrategy.PRESERVE_ID, "source-workflow-tool-id"),
(IdStrategy.GENERATE_NEW_ID, ""),
],
)
def test_workflow_tool_import_id_follows_id_strategy(monkeypatch, id_strategy, expected_import_id):
created_kwargs = []
target_provider = type("WorkflowToolProvider", (), {"id": "target-workflow-tool-id"})()
account = type("Account", (), {"id": "account-1"})()
id_mapping = {"source-app-id": "target-app-id"}
id_mapping_details = []
class StubSession:
def get(self, model, identifier):
return account
class StrategyImportService(MigrationImportService):
def _find_existing_app(self, app_id, tenant_id):
return object()
def _find_existing_workflow_tool(self, tenant_id, workflow_tool_id, tool_name, app_id):
return target_provider if created_kwargs else None
def _ensure_workflow_app_is_published(self, target, account, app_id):
return None
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(
import_service.WorkflowToolManageService,
"create_workflow_tool",
lambda **kwargs: created_kwargs.append(kwargs),
)
StrategyImportService()._import_workflow_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"workflow_tools": [
{
"id": "source-workflow-tool-id",
"name": "embedded_workflow_as_tool",
"app_id": "source-app-id",
}
],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(id_strategy=id_strategy),
id_mapping,
id_mapping_details,
[],
)
assert created_kwargs[0]["import_id"] == expected_import_id
assert id_mapping["source-workflow-tool-id"] == "target-workflow-tool-id"
assert id_mapping_details == [
ResourceIdMapping(
ResourceType.WORKFLOW_TOOL,
"embedded_workflow_as_tool",
"source-workflow-tool-id",
"target-workflow-tool-id",
)
]
def test_workflow_tool_skip_records_id_mapping(monkeypatch):
account = type("Account", (), {"id": "account-1"})()
existing_provider = type("WorkflowToolProvider", (), {"id": "existing-workflow-tool-id"})()
id_mapping = {"source-app-id": "target-app-id"}
class StubSession:
def get(self, model, identifier):
return account
class SkipImportService(MigrationImportService):
def _find_existing_app(self, app_id, tenant_id):
return object()
def _find_existing_workflow_tool(self, tenant_id, workflow_tool_id, tool_name, app_id):
return existing_provider
def _ensure_workflow_app_is_published(self, target, account, app_id):
return None
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
SkipImportService()._import_workflow_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"workflow_tools": [
{
"id": "source-workflow-tool-id",
"name": "embedded_workflow_as_tool",
"app_id": "source-app-id",
}
],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(conflict_strategy=ConflictStrategy.SKIP, id_strategy=IdStrategy.GENERATE_NEW_ID),
id_mapping,
[],
[],
)
assert id_mapping["source-workflow-tool-id"] == "existing-workflow-tool-id"
@pytest.mark.parametrize("conflict_strategy", [ConflictStrategy.SKIP, ConflictStrategy.UPDATE])
def test_api_tool_existing_provider_records_id_mapping(monkeypatch, conflict_strategy):
target_provider = type("ApiToolProvider", (), {"id": "target-api-provider-id", "name": "weather"})()
id_mapping = {}
id_mapping_details = []
report_items = []
class ExistingApiImportService(MigrationImportService):
def _find_api_tool_provider(self, tenant_id, provider_name):
return target_provider
from services.data_migration import import_service
monkeypatch.setattr(import_service.db.session, "scalar", lambda statement: target_provider)
monkeypatch.setattr(
import_service.ApiToolManageService, "parser_api_schema", lambda schema: {"schema_type": "openapi"}
)
monkeypatch.setattr(import_service.ApiToolManageService, "update_api_tool_provider", lambda **kwargs: None)
ExistingApiImportService()._import_api_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"tools": [{"id": "source-api-provider-id", "provider_name": "weather", "schema": "openapi: 3.0.0"}],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(conflict_strategy=conflict_strategy),
report_items,
id_mapping,
id_mapping_details,
{"weather": {"source-api-provider-id-from-dsl"}},
)
assert id_mapping == {
"source-api-provider-id": "target-api-provider-id",
"source-api-provider-id-from-dsl": "target-api-provider-id",
}
assert (
ResourceIdMapping(ResourceType.API_TOOL, "weather", "source-api-provider-id", "target-api-provider-id")
in id_mapping_details
)
def test_api_tool_create_records_id_mapping(monkeypatch):
target_provider = type("ApiToolProvider", (), {"id": "target-api-provider-id", "name": "weather"})()
id_mapping = {}
class StubSession:
def scalar(self, statement):
return None
class CreatedApiImportService(MigrationImportService):
def _find_api_tool_provider(self, tenant_id, provider_name):
return target_provider
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(
import_service.ApiToolManageService, "parser_api_schema", lambda schema: {"schema_type": "openapi"}
)
monkeypatch.setattr(import_service.ApiToolManageService, "create_api_tool_provider", lambda **kwargs: None)
CreatedApiImportService()._import_api_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"tools": [{"id": "source-api-provider-id", "provider_name": "weather", "schema": "openapi: 3.0.0"}],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(),
[],
id_mapping,
[],
{},
)
assert id_mapping["source-api-provider-id"] == "target-api-provider-id"
def test_mcp_tool_import_restores_exported_tool_list(monkeypatch):
provider = type("Provider", (), {"id": "target-provider-id", "tools": "[]", "authed": False})()
report_items = []
class StubSession:
def scalar(self, statement):
return provider
def commit(self):
return None
class StubMCPToolManageService:
def __init__(self, session):
self.session = session
def update_provider(self, **kwargs):
return None
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(import_service, "MCPToolManageService", StubMCPToolManageService)
MigrationImportService()._import_mcp_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"mcp_tools": [
{
"id": "source-provider-id",
"name": "my-test-mcp",
"server_identifier": "my-test-mcp",
"server_url": "http://localhost:3000/mcp",
"configuration": {},
"tools": [{"name": "echo"}],
}
],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(conflict_strategy=ConflictStrategy.UPDATE),
report_items,
{},
[],
)
assert provider.tools == '[{"name": "echo"}]'
assert provider.authed is True
@pytest.mark.parametrize("conflict_strategy", [ConflictStrategy.SKIP, ConflictStrategy.UPDATE])
def test_mcp_tool_existing_provider_records_id_mapping(monkeypatch, conflict_strategy):
provider = type("Provider", (), {"id": "target-mcp-provider-id", "tools": "[]", "authed": False})()
id_mapping = {}
id_mapping_details = []
class StubSession:
def commit(self):
return None
class ExistingMCPImportService(MigrationImportService):
def _find_existing_mcp_tool(self, tenant_id, provider_id, server_identifier):
return provider
class StubMCPToolManageService:
def __init__(self, session):
self.session = session
def update_provider(self, **kwargs):
return None
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(import_service, "MCPToolManageService", StubMCPToolManageService)
ExistingMCPImportService()._import_mcp_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"mcp_tools": [
{
"id": "source-mcp-provider-id",
"name": "my-test-mcp",
"server_identifier": "my-test-mcp",
"server_url": "http://localhost:3000/mcp",
"configuration": {},
"tools": [{"name": "echo"}],
}
],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(conflict_strategy=conflict_strategy),
[],
id_mapping,
id_mapping_details,
)
assert id_mapping["source-mcp-provider-id"] == "target-mcp-provider-id"
assert "my-test-mcp" not in id_mapping
assert id_mapping_details == [
ResourceIdMapping(ResourceType.MCP_TOOL, "my-test-mcp", "source-mcp-provider-id", "target-mcp-provider-id")
]
def test_mcp_tool_create_records_id_mapping(monkeypatch):
provider = type("Provider", (), {"id": "target-mcp-provider-id", "tools": "[]", "authed": False})()
id_mapping = {}
provider_created = False
class StubSession:
def commit(self):
return None
class CreatedMCPImportService(MigrationImportService):
def _find_existing_mcp_tool(self, tenant_id, provider_id, server_identifier):
return provider if provider_created else None
class StubMCPToolManageService:
def __init__(self, session):
self.session = session
def create_provider(self, **kwargs):
nonlocal provider_created
provider_created = True
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
monkeypatch.setattr(import_service, "MCPToolManageService", StubMCPToolManageService)
CreatedMCPImportService()._import_mcp_tools(
MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"mcp_tools": [
{
"id": "source-mcp-provider-id",
"name": "my-test-mcp",
"server_identifier": "my-test-mcp",
"server_url": "http://localhost:3000/mcp",
"configuration": {},
}
],
}
),
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
ImportOptions(),
[],
id_mapping,
[],
)
assert id_mapping["source-mcp-provider-id"] == "target-mcp-provider-id"
def test_dependency_only_mcp_preflight_reports_missing_target_provider_with_workflow_context(monkeypatch):
report_items = []
package = MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"dependencies": [
{
"kind": "mcp_tool",
"provider_id": "my-test-mcp-server",
"provider_name": "my-test-mcp",
}
],
"workflows": [
{
"name": "workflow2",
"dsl": yaml.safe_dump(
{
"workflow": {
"graph": {
"nodes": [
{
"id": "node-1",
"data": {
"type": "tool",
"provider_type": "mcp",
"provider_id": "my-test-mcp-server",
"tool_name": "echo",
},
}
]
}
}
}
),
}
],
}
)
from services.data_migration import import_service
monkeypatch.setattr(import_service.db.session, "scalar", lambda statement: None)
MigrationImportService()._preflight_dependency_only_mcp(
package,
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
report_items,
)
assert report_items == [
ResourceReportItem(
ResourceType.DEPENDENCY,
"my-test-mcp-server",
"mcp_tool my-test-mcp",
"skipped",
"missing in target tenant; referenced by workflow2 / echo; "
"configure it manually before running the workflow.",
)
]
def test_dependency_only_mcp_lookup_does_not_compare_non_uuid_identifier_to_uuid_id(monkeypatch):
captured = []
class StubSession:
def scalar(self, statement):
captured.append(statement)
from services.data_migration import import_service
monkeypatch.setattr(import_service.db, "session", StubSession())
MigrationImportService()._find_dependency_only_mcp_provider(
"tenant-1",
"my-test-mcp-server",
"my-test-mcp",
)
where_clause = str(captured[0].whereclause)
assert f"{MCPToolProvider.__tablename__}.id" not in where_clause
def test_dependency_only_mcp_preflight_reports_available_target_provider(monkeypatch):
report_items = []
package = MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"dependencies": [{"kind": "mcp_tool", "provider_id": "my-test-mcp-server"}],
}
)
provider = type(
"Provider",
(),
{"id": "target-provider-id", "name": "my-test-mcp", "server_identifier": "my-test-mcp-server"},
)()
from services.data_migration import import_service
monkeypatch.setattr(import_service.db.session, "scalar", lambda statement: provider)
MigrationImportService()._preflight_dependency_only_mcp(
package,
ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
),
report_items,
)
assert report_items == [
ResourceReportItem(
ResourceType.DEPENDENCY,
"my-test-mcp-server",
"mcp_tool my-test-mcp",
"available",
"MCP provider exists in target tenant.",
)
]
def test_import_package_imports_workflow_tool_provider_apps_before_consumers():
events = []
class StubResolver(ImportTargetResolver):
def resolve(self, request):
return ImportTarget(
tenant_id="tenant-1",
tenant_name="target",
operator_id="account-1",
operator_email="owner@example.com",
)
class OrderedImportService(MigrationImportService):
def _import_api_tools(
self,
package,
target,
options,
report_items,
id_mapping,
id_mapping_details,
source_provider_ids_by_name,
):
events.append(("api_tools", "imported"))
def _import_workflows(
self,
package,
target,
options,
report_items,
id_mapping,
id_mapping_details,
*,
imported_workflow_ids=None,
only_app_ids=None,
skip_app_ids=None,
):
only_app_ids = set(only_app_ids or [])
skip_app_ids = set(skip_app_ids or [])
for workflow_data in package.workflows:
app_id = workflow_data["id"]
if only_app_ids and app_id not in only_app_ids:
continue
if app_id in skip_app_ids:
continue
events.append(("workflow", app_id))
id_mapping[app_id] = app_id
if imported_workflow_ids is not None:
imported_workflow_ids.add(app_id)
def _import_workflow_tools(self, package, target, options, id_mapping, id_mapping_details, report_items):
events.append(("workflow_tool", package.workflow_tools[0]["id"]))
def _import_mcp_tools(self, package, target, options, report_items, id_mapping, id_mapping_details):
events.append(("mcp_tools", "imported"))
package = MigrationPackage.from_mapping(
{
"metadata": {"version": "1", "source_scope": "single"},
"workflows": [
{"id": "provider-app", "name": "embedded", "dsl": "app:\n mode: workflow\n"},
{"id": "consumer-app", "name": "main", "dsl": "app:\n mode: advanced-chat\n"},
],
"workflow_tools": [{"id": "workflow-tool", "name": "embedded_tool", "app_id": "provider-app"}],
}
)
OrderedImportService(target_resolver=StubResolver()).import_package(ImportRequest(package=package))
assert events == [
("api_tools", "imported"),
("mcp_tools", "imported"),
("workflow", "provider-app"),
("workflow_tool", "workflow-tool"),
("workflow", "consumer-app"),
]

View File

@ -0,0 +1,53 @@
import json
import pytest
from services.data_migration.entities import MigrationDataError
from services.data_migration.package_service import MigrationPackageService
def test_load_package_rejects_branch_draft_shape(tmp_path):
package_file = tmp_path / "draft.json"
package_file.write_text(json.dumps({"workflows": [], "tools": []}), encoding="utf-8")
with pytest.raises(MigrationDataError, match="metadata.version"):
MigrationPackageService().load_package(package_file)
def test_load_package_rejects_unsupported_version(tmp_path):
package_file = tmp_path / "future.json"
package_file.write_text(json.dumps({"metadata": {"version": "999", "source_scope": "single"}}), encoding="utf-8")
with pytest.raises(MigrationDataError, match="Unsupported migration package version"):
MigrationPackageService().load_package(package_file)
def test_save_package_writes_versioned_shape(tmp_path):
output_file = tmp_path / "migration-data.json"
package = MigrationPackageService().build_empty_package(
source_tenant_id="tenant-1",
source_tenant_name="source",
include_secrets=False,
)
MigrationPackageService().save_package(package, output_file, overwrite=False)
data = json.loads(output_file.read_text(encoding="utf-8"))
assert data["metadata"]["version"] == "1"
assert data["metadata"]["source_tenants"] == [{"id": "tenant-1", "name": "source"}]
assert data["metadata"]["include_secrets"] is False
assert data["workflows"] == []
assert data["dependencies"] == []
def test_save_package_without_overwrite_fails_when_file_exists(tmp_path):
output_file = tmp_path / "migration-data.json"
output_file.write_text("{}", encoding="utf-8")
package = MigrationPackageService().build_empty_package(
source_tenant_id="tenant-1",
source_tenant_name="source",
include_secrets=False,
)
with pytest.raises(MigrationDataError, match="already exists"):
MigrationPackageService().save_package(package, output_file, overwrite=False)

View File

@ -0,0 +1,111 @@
from services.data_migration.entities import ReportContext, ResourceIdMapping, ResourceReportItem, ResourceType
from services.data_migration.report_service import MigrationReportService
def test_report_summarizes_by_resource_type_and_status():
lines = MigrationReportService().render(
[
ResourceReportItem(ResourceType.WORKFLOW, "app-1", "App", "exported"),
ResourceReportItem(ResourceType.API_TOOL, "weather", "Weather", "exported"),
ResourceReportItem(ResourceType.DEPENDENCY, "mcp-1", "MCP", "dependency-only"),
]
)
assert "workflow exported: 1" in lines
assert "api_tool exported: 1" in lines
assert "dependency dependency-only: 1" in lines
def test_report_includes_actionable_lines_for_skipped_and_unresolved_items():
lines = MigrationReportService().render(
[
ResourceReportItem(ResourceType.WORKFLOW, "app-1", "App", "skipped", "App already exists"),
ResourceReportItem(ResourceType.DEPENDENCY, "mcp-1", None, "unresolved", "MCP provider not found"),
ResourceReportItem(ResourceType.API_TOOL, "weather", "Weather", "exported"),
]
)
assert "workflow skipped: 1" in lines
assert "dependency unresolved: 1" in lines
assert "workflow app-1: App already exists" in lines
assert "dependency mcp-1: MCP provider not found" in lines
def test_report_dependency_detail_uses_type_and_name_when_available():
lines = MigrationReportService().render(
[
ResourceReportItem(
ResourceType.DEPENDENCY,
"785e52f1-06bf-483c-8dcf-712e59fd43b9",
"workflow embedded_workflow",
"dependency-only",
"Dependency metadata only; ensure the resource exists in the target environment.",
),
ResourceReportItem(
ResourceType.DEPENDENCY,
"my-test-mcp",
"mcp_tool my-test-mcp",
"dependency-only",
"Configure MCP provider manually in the target tenant unless exporting with secrets enabled.",
),
]
)
assert (
"dependency workflow embedded_workflow: 785e52f1-06bf-483c-8dcf-712e59fd43b9: "
"Dependency metadata only; ensure the resource exists in the target environment."
) in lines
assert (
"dependency mcp_tool my-test-mcp: "
"Configure MCP provider manually in the target tenant unless exporting with secrets enabled."
) in lines
def test_report_includes_dependency_only_detail_lines():
lines = MigrationReportService().render(
[
ResourceReportItem(
ResourceType.DEPENDENCY,
"mcp-1",
"MCP",
"dependency-only",
"Configure manually in target tenant.",
),
]
)
assert "dependency dependency-only: 1" in lines
assert "dependency mcp-1: Configure manually in target tenant." in lines
def test_report_includes_export_and_import_context():
lines = MigrationReportService().render(
[],
context=ReportContext(
output_path="migration-data.json",
source_scope="single",
selected_app_count=2,
include_secrets=False,
target_tenant="prod",
operator_email="admin@example.com",
app_api_tokens_created=1,
app_api_tokens_reused=2,
id_mapping_count=3,
id_mappings={"source-app": "target-app", "source-tool": "target-tool"},
id_mapping_details=[
ResourceIdMapping(ResourceType.WORKFLOW, "Main workflow", "source-app", "target-app"),
ResourceIdMapping(ResourceType.API_TOOL, "weather", "source-tool", "target-tool"),
],
),
)
assert "output: migration-data.json" in lines
assert "source scope: single" in lines
assert "selected apps: 2" in lines
assert "include secrets: false" in lines
assert "target tenant: prod" in lines
assert "operator: admin@example.com" in lines
assert "app api tokens: 1 created, 2 reused" in lines
assert "resource references resolved: 2" in lines
assert "- workflow Main workflow: source-app -> target-app" in lines
assert "- api_tool weather: source-tool -> target-tool" in lines

View File

@ -0,0 +1,286 @@
# Cross-Environment Data Migration
This guide explains how to move workflow apps, chatflow apps, and their custom tool dependencies from one Dify environment to another.
The recommended best-practice path is:
1. Run the export wizard in the source environment.
2. Import the generated migration package in the target environment.
Use scripted export only when you need repeatable automation.
## What Gets Migrated
The migration package can contain:
- Workflow apps and advanced-chat apps.
- Custom API tool providers.
- Workflow tool providers.
- MCP tool providers when secrets are explicitly included.
- Dependency metadata for MCP tools, built-in tools, and plugin tools that must already exist or be configured in the target environment.
Only single source workspace exports are supported. The source and target workspace names do not need to match.
## Recommended Flow: Wizard Export + Import
Run the wizard from the source environment:
```bash
cd api
source .venv/bin/activate
uv run flask app-migration-wizard
```
The wizard asks you to select the source workspace, choose workflow/chatflow apps, discover referenced tools, set import behavior, and write the final migration package JSON.
Then copy the generated JSON package to the target environment and import it:
```bash
cd api
source .venv/bin/activate
uv run flask import-app-migration \
--input migration-data-20260528-120000.json \
--target-tenant "production Workspace"
```
This wizard + import combination is the recommended migration workflow because the wizard shows the available apps and tools, discovers app dependencies, summarizes the package before writing it, and uses the same export service as scripted export.
## Wizard Command
```bash
cd api
source .venv/bin/activate
uv run flask app-migration-wizard
```
The wizard has no CLI options. Its interactive inputs are:
- `Source tenant`: the source workspace to export from. Select one tenant by number.
- `App selection`: apps to export. Supported app types are `workflow` and `advanced-chat`. Enter `all`, one number, or comma-separated numbers.
- `Automatically export tools referenced by selected apps?`: recommended `yes`. This scans selected app graphs and automatically includes referenced custom API tools, workflow tools, and MCP tool references.
- `Export additional tools manually?`: optional. Use this when you need to migrate tools that are not referenced by the selected apps.
- `Include secrets in output JSON?`: default `no`. When `no`, workflow/app DSL secrets are omitted or masked, API tool credentials are omitted, and MCP providers are exported as dependency metadata only. When `yes`, treat the output JSON as sensitive.
- `Create or reuse app API tokens during import?`: default `no`. When `yes`, import creates an app API token for imported apps that have none, or reuses an existing token.
- `Import ID strategy`: `preserve-id` or `generate-new-id`. Default is `preserve-id`.
- `Import conflict strategy`: `fail`, `skip`, or `update`. Wizard default is `update`.
- `Output path`: output migration package path. The default is `migration-data-YYYYMMDD-HHMMSS.json`.
- `Overwrite existing output`: shown only if the selected output file already exists.
- `Write migration package?`: final confirmation after the wizard prints the summary.
## Import Command
```bash
cd api
source .venv/bin/activate
uv run flask import-app-migration --input migration-package.json --target-tenant "target Workspace"
```
Options:
- `--input`: required. Path to the migration package JSON generated by the wizard or scripted export.
- `--target-tenant`: optional only when package metadata already contains a target tenant. Target workspace name or workspace UUID. This overrides package metadata and is recommended for reusable packages.
- `--operator-email`: optional. Email of the target-tenant account used as the import operator. If omitted, import uses the earliest owner account in the target tenant.
- `--id-strategy`: optional override for the package import option. Allowed values:
- `preserve-id`: keep source app/tool IDs when supported. This is the recommended default for preserving workflow references across environments.
- `generate-new-id`: let the target environment generate new IDs and rewrite references through the migration ID mapping.
- `--conflict-strategy`: optional override for the package import option. Allowed values:
- `fail`: stop at the first existing target resource conflict. Previously committed resources are not rolled back.
- `skip`: keep the existing target resource and skip importing that resource.
- `update`: update the existing target resource in place.
- `--create-app-api-token-on-import`: optional override. Create or reuse app API tokens during import.
- `--no-create-app-api-token-on-import`: optional override. Do not create app API tokens during import.
Import prints a report that includes the resolved target tenant, operator, created/updated/skipped resources, unresolved dependencies, app API token counts, and ID mappings used to rewrite references.
## Optional Flow: Scripted Export
Scripted export is supported for automation. The recommended scripted path is:
1. Generate an export config template.
2. Edit the template.
3. Run scripted export.
4. Import the generated package.
Generate the template:
```bash
cd api
source .venv/bin/activate
uv run flask app-migration-template --output export-config.json
```
Edit `export-config.json`, then run:
```bash
cd api
source .venv/bin/activate
uv run flask export-app-migration \
--input export-config.json \
--output migration-package.json
```
Import it:
```bash
cd api
source .venv/bin/activate
uv run flask import-app-migration \
--input migration-package.json \
--target-tenant "production Workspace"
```
### Template Command
```bash
cd api
source .venv/bin/activate
uv run flask app-migration-template [--output export-config.json] [--overwrite]
```
Options:
- `--output`: optional. Path to write the scripted export config JSON template. If omitted, the template is printed to stdout.
- `--overwrite`: optional. Allows replacing an existing output file. Without this option, the command fails if `--output` already exists.
### Scripted Export Command
```bash
cd api
source .venv/bin/activate
uv run flask export-app-migration --input export-config.json --output migration-package.json
```
Options:
- `--input`: required. Path to the export config JSON.
- `--output`: required. Path to write the migration package JSON.
- `--overwrite`: optional. Allows replacing an existing output package. Without this option, the command fails if `--output` already exists.
### Export Config Fields
The generated template uses this shape:
```json
{
"source_tenant": {
"mode": "single",
"id": "",
"name": "admin's Workspace"
},
"apps": {
"modes": ["workflow", "advanced-chat"],
"ids": [],
"all": true
},
"include_referenced_tools": true,
"additional_tools": {
"api_tools": [],
"workflow_tools": [],
"mcp_tools": []
},
"include_secrets": false,
"import_options": {
"create_app_api_token_on_import": false,
"id_strategy": "preserve-id",
"conflict_strategy": "fail"
}
}
```
Fields:
- `source_tenant.mode`: must be `single`.
- `source_tenant.id`: optional source workspace UUID. Use this when workspace names are duplicated or when you want strict source selection.
- `source_tenant.name`: required source workspace name. When `source_tenant.id` is set, the ID and name must match.
- `apps.modes`: optional list of app modes for validation. Supported values are `workflow` and `advanced-chat`.
- `apps.ids`: app IDs to export when `apps.all` is `false`.
- `apps.all`: when `true`, export all supported apps in the selected source workspace. When `false`, export only `apps.ids`.
- `include_referenced_tools`: recommended `true`. Automatically discovers tools referenced by selected workflow/chatflow apps.
- `additional_tools.api_tools`: optional custom API tool provider names to export in addition to referenced tools.
- `additional_tools.workflow_tools`: optional workflow tool provider IDs to export in addition to referenced tools.
- `additional_tools.mcp_tools`: optional MCP provider IDs or server identifiers to export in addition to referenced tools.
- `include_secrets`: default `false`. When `false`, credentials are omitted and MCP providers are recorded as dependency metadata. When `true`, custom API credentials, workflow DSL secrets, and full MCP connection data can be written to the package.
- `import_options.create_app_api_token_on_import`: default `false`. Package-level default for import token creation.
- `import_options.id_strategy`: package-level default ID strategy. Allowed values are `preserve-id` and `generate-new-id`.
- `import_options.conflict_strategy`: package-level default conflict strategy. Allowed values are `fail`, `skip`, and `update`.
## Referenced Tools
Use automatic referenced-tool discovery whenever possible.
When enabled, Dify scans selected workflow/chatflow graphs and agent tool configs, then de-duplicates discovered custom API tools, workflow tools, and MCP tool references before export. This reduces the chance of importing an app whose workflow references missing providers.
Built-in and plugin tools are not serialized into the package. The migration report records them as dependency metadata; make sure the target environment has the required built-in or plugin tools installed and configured.
MCP providers are also dependency-only unless `include_secrets` is enabled. If MCP secrets are not exported, configure the corresponding MCP provider manually in the target workspace before running migrated workflows.
## Secret Handling
The safe default is `include_secrets: false`.
Only enable secret export when you have a controlled transfer path for the JSON package. With secrets enabled, the package may include API tool credentials, workflow/app DSL secret values, MCP server URLs, MCP headers, authentication data, and cached MCP tool lists.
## FAQ
### What happens when `include_secrets` is `false`?
This is the recommended default, but it means the migration package is intentionally incomplete for sensitive runtime configuration.
Important behavior:
- MCP tools are not exported as full tool providers. They are recorded as dependency metadata only.
- Custom API tool credentials are not exported. The provider schema can be exported, but credentials must be configured again in the target workspace.
- Workflow/app DSL secrets are omitted or masked. Any app variables, credentials, or secret-backed settings must be reviewed in the target workspace after import.
- Built-in and plugin tools are never serialized as custom migration data. They are recorded as dependencies only.
How to handle it:
- Before importing, install or enable the required built-in/plugin tools in the target environment.
- For MCP tools, manually create or configure the corresponding MCP provider in the target workspace before running migrated workflows.
- For custom API tools, open the imported provider in the target workspace and re-enter credentials.
- After import, review each migrated workflow/chatflow before production use. Pay special attention to tool nodes, agent tool configs, environment variables, app variables, and credential-backed settings.
- Use the import report. Items marked `dependency-only`, `skipped`, or `unresolved` usually need manual follow-up.
If you need the package to carry full MCP provider configuration or credentials, rerun export with `include_secrets` set to `true` and transfer the generated JSON as sensitive data.
### What should I consider when `include_secrets` is `true`?
The package can include sensitive runtime configuration such as API tool credentials, workflow/app DSL secret values, MCP server URLs, headers, authentication data, and MCP tool lists.
How to handle it:
- Store and transfer the JSON package as a secret.
- Delete the package after import if your security policy requires it.
- Prefer using this only for controlled one-time migrations where manual target-side credential setup is harder than securing the package.
### Should I use `preserve-id` or `generate-new-id`?
The ID strategy controls whether imported resources keep source IDs or receive new IDs in the target environment. It applies to workflow apps and custom tool resources where the target service supports explicit IDs.
Use `preserve-id` by default. It is recommended for cross-environment app migration because imported apps and tools try to keep their source IDs, which makes workflow references easier to preserve.
Use `preserve-id` when:
- You are migrating between environments that are meant to mirror each other, such as staging to production.
- You want workflow tool references and provider references to remain as stable as possible.
- The target environment does not already contain unrelated resources with the same IDs.
Watch out for:
- If the target already has a resource with the same ID, `conflict_strategy=fail` stops the import, `skip` keeps the target resource, and `update` updates it in place.
- If the same ID was reused for a different resource in the target environment, review carefully before using `update`.
Use `generate-new-id` when the target environment should keep its own local IDs.
Use `generate-new-id` when:
- The target environment already has resources that may conflict with source IDs.
- You are importing a package as a copy rather than trying to mirror environments.
- You want to avoid preserving source database IDs in the target environment.
Watch out for:
- Import records source-to-target ID mappings. Always review the import report's ID mappings.
- Workflow DSL provider references are rewritten using the generated ID mapping where possible.
- Dependencies that are metadata-only, such as MCP providers exported with `include_secrets=false`, still require manual target-side configuration.
- Built-in/plugin tools are not remapped as migrated custom resources; the target environment must provide them separately.

View File

@ -1,5 +1,5 @@
# base image
FROM node:22-alpine AS base
FROM node:22.22.1-alpine AS base
LABEL maintainer="takatost@gmail.com"
# if you located in China, you can use aliyun mirror to speed up

View File

@ -465,6 +465,14 @@ describe('Card', () => {
expect(screen.queryByTestId('partner-badge')).not.toBeInTheDocument()
})
it('should handle null badges from the marketplace API', () => {
const plugin = createMockPlugin({ badges: null })
render(<Card payload={plugin} />)
expect(screen.queryByTestId('partner-badge')).not.toBeInTheDocument()
})
})
// ================================

View File

@ -53,7 +53,8 @@ const Card = ({
const { t } = useTranslation()
const { categoriesMap } = useCategories(true)
const currentWorkspaceId = useSelector(s => s.currentWorkspace.id)
const { category, type, name, org, label, brief, icon, icon_dark, verified, badges = [], from } = payload
const { category, type, name, org, label, brief, icon, icon_dark, verified, from } = payload
const badges = payload.badges ?? []
const { theme } = useTheme()
const iconSrc = getPluginCardIconUrl(
{ from, name, org, type },

View File

@ -190,7 +190,7 @@ export type PluginManifestInMarket = {
introduction: string
verified: boolean
install_count: number
badges: string[]
badges: string[] | null
verification: {
authorized_category: 'langgenius' | 'partner' | 'community'
}
@ -255,7 +255,7 @@ export type Plugin = {
settings: CredentialFormSchemaBase[]
}
tags: { name: string }[]
badges: string[]
badges: string[] | null
verification: {
authorized_category: 'langgenius' | 'partner' | 'community'
}

View File

@ -33,6 +33,7 @@ export default function DevicePage() {
const pathname = usePathname()
const urlUserCode = (searchParams.get('user_code') || '').trim().toUpperCase()
const ssoVerified = searchParams.get('sso_verified') === '1'
const ssoError = searchParams.get('sso_error') || ''
const [typed, setTyped] = useState('')
const [view, setView] = useState<View>({ kind: 'code_entry' })
@ -81,7 +82,11 @@ export default function DevicePage() {
return
}
let consumed = false
if (ssoVerified) {
if (ssoError) {
setErrMsg(ssoError) // eslint-disable-line react/set-state-in-effect
consumed = true
}
else if (ssoVerified) {
setView({ kind: 'authorize_sso' }) // eslint-disable-line react/set-state-in-effect
consumed = true
}
@ -92,9 +97,9 @@ export default function DevicePage() {
setView({ kind: 'chooser', userCode: urlUserCode }) // eslint-disable-line react/set-state-in-effect
consumed = true
}
if (consumed && (urlUserCode || ssoVerified))
if (consumed && (urlUserCode || ssoVerified || ssoError))
router.replace(pathname)
}, [urlUserCode, ssoVerified, account, view, router, pathname])
}, [urlUserCode, ssoVerified, ssoError, account, view, router, pathname])
const onContinue = async () => {
if (!isValidUserCode(typed))