mirror of
https://github.com/langgenius/dify.git
synced 2026-05-29 05:07:55 +08:00
Compare commits
9 Commits
deploy/ent
...
fix/device
| Author | SHA1 | Date | |
|---|---|---|---|
| 4a2f90e7ec | |||
| f5ab5e7eb3 | |||
| 0c40e1c2a0 | |||
| c29d76757e | |||
| 91c1d3ad81 | |||
| 57b02e341c | |||
| b94ff65e9f | |||
| 678260e34e | |||
| 739e34d08a |
@ -4,6 +4,12 @@ CLI command modules extracted from `commands.py`.
|
||||
|
||||
from .account import create_tenant, reset_email, reset_password
|
||||
from .data_migrate import data_migrate, legacy_model_types
|
||||
from .data_migration import (
|
||||
export_migration_data,
|
||||
export_migration_data_template,
|
||||
import_migration_data,
|
||||
migration_data_wizard,
|
||||
)
|
||||
from .plugin import (
|
||||
extract_plugins,
|
||||
extract_unique_plugins,
|
||||
@ -26,7 +32,12 @@ from .retention import (
|
||||
restore_workflow_runs,
|
||||
)
|
||||
from .storage import clear_orphaned_file_records, file_usage, migrate_oss, remove_orphaned_files_on_storage
|
||||
from .system import convert_to_agent_apps, fix_app_site_missing, reset_encrypt_key_pair, upgrade_db
|
||||
from .system import (
|
||||
convert_to_agent_apps,
|
||||
fix_app_site_missing,
|
||||
reset_encrypt_key_pair,
|
||||
upgrade_db,
|
||||
)
|
||||
from .vector import (
|
||||
add_qdrant_index,
|
||||
migrate_annotation_vector_database,
|
||||
@ -48,10 +59,13 @@ __all__ = [
|
||||
"data_migrate",
|
||||
"delete_archived_workflow_runs",
|
||||
"export_app_messages",
|
||||
"export_migration_data",
|
||||
"export_migration_data_template",
|
||||
"extract_plugins",
|
||||
"extract_unique_plugins",
|
||||
"file_usage",
|
||||
"fix_app_site_missing",
|
||||
"import_migration_data",
|
||||
"install_plugins",
|
||||
"install_rag_pipeline_plugins",
|
||||
"legacy_model_types",
|
||||
@ -59,6 +73,7 @@ __all__ = [
|
||||
"migrate_data_for_plugin",
|
||||
"migrate_knowledge_vector_database",
|
||||
"migrate_oss",
|
||||
"migration_data_wizard",
|
||||
"old_metadata_migration",
|
||||
"remove_orphaned_files_on_storage",
|
||||
"reset_email",
|
||||
|
||||
754
api/commands/data_migration.py
Normal file
754
api/commands/data_migration.py
Normal file
@ -0,0 +1,754 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
from uuid import UUID
|
||||
|
||||
import click
|
||||
import sqlalchemy as sa
|
||||
import yaml
|
||||
|
||||
from extensions.ext_database import db
|
||||
from models import Tenant
|
||||
from models.model import App
|
||||
from models.tools import ApiToolProvider, MCPToolProvider, WorkflowToolProvider
|
||||
from services.app_dsl_service import AppDslService
|
||||
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService
|
||||
from services.data_migration.entities import (
|
||||
DependencyKind,
|
||||
ImportOptions,
|
||||
MigrationDataError,
|
||||
ReportContext,
|
||||
ResourceReportItem,
|
||||
)
|
||||
from services.data_migration.export_service import ExportConfigParser, MigrationExportService
|
||||
from services.data_migration.import_service import ImportRequest, MigrationImportService
|
||||
from services.data_migration.package_service import MigrationPackageService
|
||||
from services.data_migration.report_service import MigrationReportService
|
||||
|
||||
ID_STRATEGY_CHOICES = ["preserve-id", "generate-new-id"]
|
||||
CONFLICT_STRATEGY_CHOICES = ["fail", "skip", "update"]
|
||||
SUPPORTED_WIZARD_APP_MODES = ["workflow", "advanced-chat"]
|
||||
WizardToolMap = dict[str, dict[str, str | None]]
|
||||
WizardToolSelection = dict[str, list[str]]
|
||||
|
||||
|
||||
def _scripted_export_template() -> dict[str, Any]:
|
||||
return {
|
||||
"source_tenant": {
|
||||
"mode": "single",
|
||||
"id": "",
|
||||
"name": "admin's Workspace",
|
||||
},
|
||||
"apps": {
|
||||
"modes": ["workflow", "advanced-chat"],
|
||||
"ids": [],
|
||||
"all": True,
|
||||
},
|
||||
"include_referenced_tools": True,
|
||||
"additional_tools": {
|
||||
"api_tools": [],
|
||||
"workflow_tools": [],
|
||||
"mcp_tools": [],
|
||||
},
|
||||
"include_secrets": False,
|
||||
"import_options": {
|
||||
"create_app_api_token_on_import": False,
|
||||
"id_strategy": "preserve-id",
|
||||
"conflict_strategy": "fail",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@click.command("app-migration-template", help="Print or write a scripted export config JSON template.")
|
||||
@click.option(
|
||||
"--output",
|
||||
"output_file",
|
||||
required=False,
|
||||
type=click.Path(dir_okay=False),
|
||||
help="Path to write the export config JSON template. Prints to stdout when omitted.",
|
||||
)
|
||||
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite output if it already exists.")
|
||||
def export_migration_data_template(output_file: str | None, overwrite: bool) -> None:
|
||||
template_json = json.dumps(_scripted_export_template(), indent=2, ensure_ascii=False) + "\n"
|
||||
if output_file is None:
|
||||
click.echo(template_json, nl=False)
|
||||
return
|
||||
path = Path(output_file)
|
||||
if path.exists() and not overwrite:
|
||||
raise click.ClickException(f"Output file already exists: {output_file}")
|
||||
path.write_text(template_json)
|
||||
click.echo(click.style(f"Output written to {output_file}", fg="green"))
|
||||
|
||||
|
||||
@click.command("export-app-migration", help="Export workflow migration data to a versioned JSON package.")
|
||||
@click.option(
|
||||
"--input",
|
||||
"input_file",
|
||||
required=False,
|
||||
type=click.Path(exists=True, dir_okay=False),
|
||||
help="Path to export config JSON.",
|
||||
)
|
||||
@click.option(
|
||||
"--output",
|
||||
"output_file",
|
||||
required=False,
|
||||
type=click.Path(dir_okay=False),
|
||||
help="Path to migration package JSON.",
|
||||
)
|
||||
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite output if it already exists.")
|
||||
def export_migration_data(input_file: str | None, output_file: str | None, overwrite: bool) -> None:
|
||||
try:
|
||||
_require_options(("--input", input_file), ("--output", output_file))
|
||||
assert input_file is not None
|
||||
assert output_file is not None
|
||||
raw_config = _load_json_object(input_file, "Export config")
|
||||
selection = ExportConfigParser().parse(raw_config)
|
||||
result = MigrationExportService().export(selection)
|
||||
MigrationPackageService().save_package(result.package, output_file, overwrite=overwrite)
|
||||
click.echo(click.style(f"Output written to {output_file}", fg="green"))
|
||||
_render_report(result.report_items, context=_with_output_path(result.report_context, output_file))
|
||||
except MigrationDataError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
|
||||
|
||||
@click.command("import-app-migration", help="Import a versioned migration data package.")
|
||||
@click.option(
|
||||
"--input",
|
||||
"input_file",
|
||||
required=False,
|
||||
type=click.Path(exists=True, dir_okay=False),
|
||||
help="Path to migration package JSON.",
|
||||
)
|
||||
@click.option("--target-tenant", default=None, help="Target tenant/workspace name. Overrides package metadata.")
|
||||
@click.option("--operator-email", default=None, help="Operator account email in the target tenant.")
|
||||
@click.option(
|
||||
"--id-strategy",
|
||||
default=None,
|
||||
type=click.Choice(ID_STRATEGY_CHOICES),
|
||||
help="Override package ID strategy.",
|
||||
)
|
||||
@click.option(
|
||||
"--conflict-strategy",
|
||||
default=None,
|
||||
type=click.Choice(CONFLICT_STRATEGY_CHOICES),
|
||||
help="Override package conflict strategy.",
|
||||
)
|
||||
@click.option(
|
||||
"--create-app-api-token-on-import/--no-create-app-api-token-on-import",
|
||||
default=None,
|
||||
help="Override package app API token creation behavior.",
|
||||
)
|
||||
def import_migration_data(
|
||||
input_file: str | None,
|
||||
target_tenant: str | None,
|
||||
operator_email: str | None,
|
||||
id_strategy: str | None,
|
||||
conflict_strategy: str | None,
|
||||
create_app_api_token_on_import: bool | None,
|
||||
) -> None:
|
||||
try:
|
||||
_require_options(("--input", input_file))
|
||||
assert input_file is not None
|
||||
package = MigrationPackageService().load_package(input_file)
|
||||
result = MigrationImportService().import_package(
|
||||
ImportRequest(
|
||||
package=package,
|
||||
cli_target_tenant=target_tenant,
|
||||
operator_email=operator_email,
|
||||
options_override=_build_options_override(
|
||||
package.metadata.import_options,
|
||||
id_strategy=id_strategy,
|
||||
conflict_strategy=conflict_strategy,
|
||||
create_app_api_token_on_import=create_app_api_token_on_import,
|
||||
),
|
||||
)
|
||||
)
|
||||
_render_report(result.report_items, context=result.report_context)
|
||||
except MigrationDataError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
|
||||
|
||||
def parse_index_selection(raw: str, values: list[str]) -> list[str]:
|
||||
normalized = raw.strip().lower()
|
||||
if normalized == "all":
|
||||
return values
|
||||
|
||||
selected: list[str] = []
|
||||
for part in raw.split(","):
|
||||
stripped = part.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
try:
|
||||
index = int(stripped)
|
||||
except ValueError as exc:
|
||||
raise click.ClickException(f"Selection must be 'all' or comma-separated numbers: {raw}") from exc
|
||||
if index < 1 or index > len(values):
|
||||
raise click.ClickException(f"Selection index out of range: {index}")
|
||||
selected.append(values[index - 1])
|
||||
return list(dict.fromkeys(selected))
|
||||
|
||||
|
||||
def _print_wizard_step(title: str) -> None:
|
||||
click.echo("")
|
||||
click.echo(f"==== {title} ====")
|
||||
|
||||
|
||||
def _print_wizard_substep(title: str) -> None:
|
||||
click.echo("")
|
||||
click.echo(f"-- {title} --")
|
||||
|
||||
|
||||
@click.command("app-migration-wizard", help="Interactively export workflow migration data.")
|
||||
def migration_data_wizard() -> None:
|
||||
try:
|
||||
tenant = _prompt_source_tenant()
|
||||
apps = _eligible_apps_for_tenant(tenant.id)
|
||||
app_ids = _prompt_app_ids(apps)
|
||||
_print_wizard_step("Referenced Tools")
|
||||
include_referenced_tools = click.confirm(
|
||||
"Automatically export tools referenced by selected apps? [y/n, default: y]",
|
||||
default=True,
|
||||
show_default=False,
|
||||
)
|
||||
auto_tools = _discover_auto_tools([app for app in apps if app.id in set(app_ids)], include_referenced_tools)
|
||||
auto_tools = _resolve_auto_tool_names(tenant.id, auto_tools)
|
||||
_print_auto_tools(auto_tools)
|
||||
additional_tools = _prompt_additional_tools(tenant.id, auto_tools)
|
||||
include_secrets, create_tokens, id_strategy, conflict_strategy = _prompt_import_options()
|
||||
_print_wizard_step("Output")
|
||||
output_file, overwrite = _prompt_output_file()
|
||||
|
||||
selection = ExportConfigParser().parse(
|
||||
{
|
||||
"source_tenant": {"mode": "single", "id": tenant.id, "name": tenant.name},
|
||||
"apps": {"ids": app_ids, "all": False},
|
||||
"include_referenced_tools": include_referenced_tools,
|
||||
"additional_tools": additional_tools,
|
||||
"include_secrets": include_secrets,
|
||||
"import_options": {
|
||||
"create_app_api_token_on_import": create_tokens,
|
||||
"id_strategy": id_strategy,
|
||||
"conflict_strategy": conflict_strategy,
|
||||
},
|
||||
}
|
||||
)
|
||||
_confirm_wizard_summary(
|
||||
tenant_name=tenant.name,
|
||||
app_names=[app.name for app in apps if app.id in set(app_ids)],
|
||||
auto_tools=auto_tools,
|
||||
additional_tools=additional_tools,
|
||||
manual_labels=_selected_tool_labels_for_tenant(tenant.id, additional_tools),
|
||||
include_referenced_tools=include_referenced_tools,
|
||||
include_secrets=include_secrets,
|
||||
create_tokens=create_tokens,
|
||||
id_strategy=id_strategy,
|
||||
conflict_strategy=conflict_strategy,
|
||||
output_file=output_file,
|
||||
)
|
||||
result = MigrationExportService().export(selection)
|
||||
MigrationPackageService().save_package(result.package, output_file, overwrite=overwrite)
|
||||
click.echo(click.style(f"Output written to {output_file}", fg="green"))
|
||||
_print_wizard_step("Report")
|
||||
_render_report(result.report_items, context=_with_output_path(result.report_context, output_file))
|
||||
except MigrationDataError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
|
||||
|
||||
def _load_json_object(path: str, label: str) -> dict[str, Any]:
|
||||
try:
|
||||
with Path(path).open(encoding="utf-8") as file:
|
||||
raw = json.load(file)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise MigrationDataError(f"{label} JSON is invalid: {exc.msg}") from exc
|
||||
if not isinstance(raw, dict):
|
||||
raise MigrationDataError(f"{label} JSON must be an object.")
|
||||
return raw
|
||||
|
||||
|
||||
def _require_options(*options: tuple[str, object | None]) -> None:
|
||||
missing_options = [name for name, value in options if value is None]
|
||||
if missing_options:
|
||||
raise click.UsageError(f"Missing option(s): {', '.join(missing_options)}.")
|
||||
|
||||
|
||||
def _build_options_override(
|
||||
package_options: ImportOptions,
|
||||
*,
|
||||
id_strategy: str | None,
|
||||
conflict_strategy: str | None,
|
||||
create_app_api_token_on_import: bool | None,
|
||||
) -> ImportOptions | None:
|
||||
if id_strategy is None and conflict_strategy is None and create_app_api_token_on_import is None:
|
||||
return None
|
||||
return ImportOptions.from_mapping(
|
||||
{
|
||||
"id_strategy": id_strategy or package_options.id_strategy,
|
||||
"conflict_strategy": conflict_strategy or package_options.conflict_strategy,
|
||||
"create_app_api_token_on_import": (
|
||||
create_app_api_token_on_import
|
||||
if create_app_api_token_on_import is not None
|
||||
else package_options.create_app_api_token_on_import
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _prompt_source_tenant() -> Tenant:
|
||||
tenants = list(db.session.scalars(sa.select(Tenant).order_by(Tenant.name.asc())).all())
|
||||
if not tenants:
|
||||
raise MigrationDataError("No tenants found.")
|
||||
|
||||
_print_wizard_step("Source Tenant")
|
||||
click.echo("Source tenants:")
|
||||
for index, tenant in enumerate(tenants, 1):
|
||||
click.echo(f"{index}. {tenant.name} ({tenant.id})")
|
||||
|
||||
tenant_index = click.prompt("Select one source tenant by number", type=int, default=1, show_default=True)
|
||||
if tenant_index < 1 or tenant_index > len(tenants):
|
||||
raise click.ClickException(f"Selection index out of range: {tenant_index}")
|
||||
return tenants[tenant_index - 1]
|
||||
|
||||
|
||||
def _eligible_apps_for_tenant(tenant_id: str) -> list[App]:
|
||||
return list(
|
||||
db.session.scalars(
|
||||
sa.select(App)
|
||||
.where(App.tenant_id == tenant_id, App.mode.in_(SUPPORTED_WIZARD_APP_MODES))
|
||||
.order_by(App.name.asc())
|
||||
).all()
|
||||
)
|
||||
|
||||
|
||||
def _prompt_app_ids(apps: list[App]) -> list[str]:
|
||||
if not apps:
|
||||
raise MigrationDataError("No workflow or advanced-chat apps found for the selected tenant.")
|
||||
|
||||
_print_wizard_step("App Selection")
|
||||
click.echo("Currently supported app types: workflow and chatflow.")
|
||||
click.echo("Workflow/chatflow apps:")
|
||||
for index, app in enumerate(apps, 1):
|
||||
mode = app.mode.value if hasattr(app.mode, "value") else app.mode
|
||||
click.echo(f"{index}. {app.name} [{mode}] ({app.id})")
|
||||
app_ids = parse_index_selection(
|
||||
click.prompt("Select apps by number, comma-separated numbers, or all", default="all"),
|
||||
[app.id for app in apps],
|
||||
)
|
||||
selected_apps = [app for app in apps if app.id in set(app_ids)]
|
||||
click.echo("Selected apps:")
|
||||
for app in selected_apps:
|
||||
click.echo(f"- {app.name} ({app.id})")
|
||||
return app_ids
|
||||
|
||||
|
||||
def _prompt_import_options() -> tuple[bool, bool, str, str]:
|
||||
_print_wizard_step("Import Options")
|
||||
_print_wizard_substep("Secrets")
|
||||
click.echo("Secrets include workflow/app DSL secret values, custom API tool credentials,")
|
||||
click.echo("and full MCP provider connection data such as server URL, headers, authentication, and tool list.")
|
||||
click.echo("If you choose no, credentials are omitted or masked,")
|
||||
click.echo("and MCP providers are exported as dependency metadata only.")
|
||||
click.echo("Treat the output JSON as sensitive if you choose yes.")
|
||||
include_secrets = click.confirm(
|
||||
"Include secrets in output JSON? [y/n, default: n]",
|
||||
default=False,
|
||||
show_default=False,
|
||||
)
|
||||
_print_wizard_substep("App API Tokens")
|
||||
click.echo("When enabled, import will create an app API token if the imported app has none,")
|
||||
click.echo("or reuse an existing app API token if one already exists.")
|
||||
create_tokens = click.confirm(
|
||||
"Create or reuse app API tokens during import? [y/n, default: n]",
|
||||
default=False,
|
||||
show_default=False,
|
||||
)
|
||||
_print_wizard_substep("ID Strategy")
|
||||
click.echo("ID strategy controls whether imported app and tool IDs preserve source IDs")
|
||||
click.echo("or use target-generated IDs.")
|
||||
click.echo("preserve-id: keep source IDs where the target service supports it.")
|
||||
click.echo("generate-new-id: let the target environment generate new IDs and rewrite references via mapping.")
|
||||
id_strategy = click.prompt(
|
||||
"Import ID strategy. Enter one of: preserve-id, generate-new-id",
|
||||
type=click.Choice(ID_STRATEGY_CHOICES),
|
||||
default="preserve-id",
|
||||
show_default=True,
|
||||
)
|
||||
_print_wizard_substep("Conflict Strategy")
|
||||
click.echo("Conflict strategy controls what import does when a target resource already exists.")
|
||||
click.echo("fail: stop at the first conflict; previously committed resources are not rolled back.")
|
||||
click.echo("skip: keep the existing target resource and skip importing that resource.")
|
||||
click.echo("update: update the existing target resource in place.")
|
||||
conflict_strategy = click.prompt(
|
||||
"Import conflict strategy. Enter one of: fail, skip, update",
|
||||
type=click.Choice(CONFLICT_STRATEGY_CHOICES),
|
||||
default="update",
|
||||
show_default=True,
|
||||
)
|
||||
return include_secrets, create_tokens, id_strategy, conflict_strategy
|
||||
|
||||
|
||||
def _discover_auto_tools(apps: list[App], include_referenced_tools: bool) -> WizardToolMap:
|
||||
auto_tools: WizardToolMap = {"api_tools": {}, "workflow_tools": {}, "mcp_tools": {}}
|
||||
if not include_referenced_tools:
|
||||
return auto_tools
|
||||
discovery_service = DependencyDiscoveryService()
|
||||
for app in apps:
|
||||
dsl_content = AppDslService.export_dsl(app_model=app, include_secret=False)
|
||||
raw_dsl = yaml.safe_load(dsl_content) if dsl_content else {}
|
||||
dsl = raw_dsl if isinstance(raw_dsl, dict) else {}
|
||||
for dependency in discovery_service.discover_from_dsl(dsl):
|
||||
if dependency.kind == DependencyKind.API_TOOL:
|
||||
auto_tools["api_tools"][dependency.provider_name or dependency.provider_id] = dependency.provider_id
|
||||
elif dependency.kind == DependencyKind.WORKFLOW_TOOL:
|
||||
auto_tools["workflow_tools"][dependency.provider_name or dependency.provider_id] = (
|
||||
dependency.provider_id
|
||||
)
|
||||
elif dependency.kind == DependencyKind.MCP_TOOL:
|
||||
auto_tools["mcp_tools"][dependency.provider_name or dependency.provider_id] = dependency.provider_id
|
||||
return auto_tools
|
||||
|
||||
|
||||
def _resolve_auto_tool_names(tenant_id: str, auto_tools: WizardToolMap) -> WizardToolMap:
|
||||
return {
|
||||
"api_tools": _resolve_api_tool_names(tenant_id, auto_tools["api_tools"]),
|
||||
"workflow_tools": _resolve_workflow_tool_names(tenant_id, auto_tools["workflow_tools"]),
|
||||
"mcp_tools": _resolve_mcp_tool_names(tenant_id, auto_tools["mcp_tools"]),
|
||||
}
|
||||
|
||||
|
||||
def _resolve_api_tool_names(tenant_id: str, tools: dict[str, str | None]) -> dict[str, str | None]:
|
||||
resolved: dict[str, str | None] = {}
|
||||
for name, identifier in tools.items():
|
||||
predicates = [ApiToolProvider.name == name]
|
||||
if _is_uuid_string(identifier):
|
||||
predicates.append(ApiToolProvider.id == identifier)
|
||||
provider = db.session.scalar(
|
||||
sa.select(ApiToolProvider).where(
|
||||
ApiToolProvider.tenant_id == tenant_id,
|
||||
sa.or_(*predicates),
|
||||
)
|
||||
)
|
||||
resolved[provider.name if provider else name] = provider.id if provider else identifier
|
||||
return resolved
|
||||
|
||||
|
||||
def _resolve_workflow_tool_names(tenant_id: str, tools: dict[str, str | None]) -> dict[str, str | None]:
|
||||
resolved: dict[str, str | None] = {}
|
||||
for name, identifier in tools.items():
|
||||
predicates = [WorkflowToolProvider.name == name]
|
||||
if _is_uuid_string(identifier):
|
||||
predicates.append(WorkflowToolProvider.id == identifier)
|
||||
provider = db.session.scalar(
|
||||
sa.select(WorkflowToolProvider).where(
|
||||
WorkflowToolProvider.tenant_id == tenant_id,
|
||||
sa.or_(*predicates),
|
||||
)
|
||||
)
|
||||
resolved[provider.name if provider else name] = provider.id if provider else identifier
|
||||
return resolved
|
||||
|
||||
|
||||
def _resolve_mcp_tool_names(tenant_id: str, tools: dict[str, str | None]) -> dict[str, str | None]:
|
||||
resolved: dict[str, str | None] = {}
|
||||
for name, identifier in tools.items():
|
||||
predicates = [MCPToolProvider.name == name]
|
||||
if identifier:
|
||||
predicates.append(MCPToolProvider.server_identifier == identifier)
|
||||
if _is_uuid_string(identifier):
|
||||
predicates.append(MCPToolProvider.id == identifier)
|
||||
provider = db.session.scalar(
|
||||
sa.select(MCPToolProvider).where(
|
||||
MCPToolProvider.tenant_id == tenant_id,
|
||||
sa.or_(*predicates),
|
||||
)
|
||||
)
|
||||
resolved[provider.name if provider else name] = provider.id if provider else identifier
|
||||
return resolved
|
||||
|
||||
|
||||
def _is_uuid_string(value: str | None) -> bool:
|
||||
if not value:
|
||||
return False
|
||||
try:
|
||||
UUID(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _print_auto_tools(auto_tools: WizardToolMap) -> None:
|
||||
_print_wizard_step("Automatically Discovered Tools")
|
||||
click.echo("Automatically discovered tools:")
|
||||
_print_auto_tool_category("Custom API tools", auto_tools["api_tools"])
|
||||
_print_auto_tool_category("Workflow tools", auto_tools["workflow_tools"])
|
||||
_print_auto_tool_category("MCP tools", auto_tools["mcp_tools"])
|
||||
|
||||
|
||||
def _print_auto_tool_category(label: str, values: dict[str, str | None]) -> None:
|
||||
click.echo(label)
|
||||
if not values:
|
||||
click.echo("- none")
|
||||
return
|
||||
for name, identifier in sorted(values.items()):
|
||||
click.echo(f"- {_format_tool_name_id(name, identifier)}")
|
||||
|
||||
|
||||
def _prompt_additional_tools(tenant_id: str, auto_tools: WizardToolMap) -> WizardToolSelection:
|
||||
selections: WizardToolSelection = {"api_tools": [], "workflow_tools": [], "mcp_tools": []}
|
||||
_print_wizard_step("Additional Tools")
|
||||
if not click.confirm(
|
||||
"Export additional tools manually? [y/n, default: n]",
|
||||
default=False,
|
||||
show_default=False,
|
||||
):
|
||||
_print_final_tool_selection(auto_tools, selections, {})
|
||||
return selections
|
||||
manual_labels: dict[str, str] = {}
|
||||
api_tool_options = [
|
||||
(tool.name, tool.name, tool.id)
|
||||
for tool in db.session.scalars(
|
||||
sa.select(ApiToolProvider).where(ApiToolProvider.tenant_id == tenant_id).order_by(ApiToolProvider.name)
|
||||
).all()
|
||||
]
|
||||
selections["api_tools"] = _prompt_tool_category(
|
||||
"Custom API tools",
|
||||
api_tool_options,
|
||||
auto_tools=auto_tools["api_tools"],
|
||||
)
|
||||
manual_labels.update(_selected_tool_labels(api_tool_options, selections["api_tools"]))
|
||||
workflow_tool_options = [
|
||||
(tool.id, tool.name, tool.id)
|
||||
for tool in db.session.scalars(
|
||||
sa.select(WorkflowToolProvider)
|
||||
.where(WorkflowToolProvider.tenant_id == tenant_id)
|
||||
.order_by(WorkflowToolProvider.name)
|
||||
).all()
|
||||
]
|
||||
selections["workflow_tools"] = _prompt_tool_category(
|
||||
"Workflow tools",
|
||||
workflow_tool_options,
|
||||
auto_tools=auto_tools["workflow_tools"],
|
||||
)
|
||||
manual_labels.update(_selected_tool_labels(workflow_tool_options, selections["workflow_tools"]))
|
||||
mcp_tool_options = [
|
||||
(tool.id, tool.name, tool.server_identifier)
|
||||
for tool in db.session.scalars(
|
||||
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id).order_by(MCPToolProvider.name)
|
||||
).all()
|
||||
]
|
||||
selections["mcp_tools"] = _prompt_tool_category(
|
||||
"MCP tools",
|
||||
mcp_tool_options,
|
||||
auto_tools=auto_tools["mcp_tools"],
|
||||
)
|
||||
manual_labels.update(_selected_tool_labels(mcp_tool_options, selections["mcp_tools"]))
|
||||
_print_final_tool_selection(auto_tools, selections, manual_labels)
|
||||
return selections
|
||||
|
||||
|
||||
def _selected_tool_labels_for_tenant(tenant_id: str, selected_tools: WizardToolSelection) -> dict[str, str]:
|
||||
labels: dict[str, str] = {}
|
||||
if selected_tools["api_tools"]:
|
||||
labels.update(
|
||||
_selected_tool_labels(
|
||||
[
|
||||
(tool.name, tool.name, tool.id)
|
||||
for tool in db.session.scalars(
|
||||
sa.select(ApiToolProvider)
|
||||
.where(ApiToolProvider.tenant_id == tenant_id)
|
||||
.order_by(ApiToolProvider.name)
|
||||
).all()
|
||||
],
|
||||
selected_tools["api_tools"],
|
||||
)
|
||||
)
|
||||
if selected_tools["workflow_tools"]:
|
||||
labels.update(
|
||||
_selected_tool_labels(
|
||||
[
|
||||
(tool.id, tool.name, tool.id)
|
||||
for tool in db.session.scalars(
|
||||
sa.select(WorkflowToolProvider)
|
||||
.where(WorkflowToolProvider.tenant_id == tenant_id)
|
||||
.order_by(WorkflowToolProvider.name)
|
||||
).all()
|
||||
],
|
||||
selected_tools["workflow_tools"],
|
||||
)
|
||||
)
|
||||
if selected_tools["mcp_tools"]:
|
||||
labels.update(
|
||||
_selected_tool_labels(
|
||||
[
|
||||
(tool.id, tool.name, tool.server_identifier)
|
||||
for tool in db.session.scalars(
|
||||
sa.select(MCPToolProvider)
|
||||
.where(MCPToolProvider.tenant_id == tenant_id)
|
||||
.order_by(MCPToolProvider.name)
|
||||
).all()
|
||||
],
|
||||
selected_tools["mcp_tools"],
|
||||
)
|
||||
)
|
||||
return labels
|
||||
|
||||
|
||||
def _selected_tool_labels(options: list[tuple[str, str, str]], selected_values: list[str]) -> dict[str, str]:
|
||||
selected = set(selected_values)
|
||||
return {value: _format_tool_name_id(name, detail) for value, name, detail in options if value in selected}
|
||||
|
||||
|
||||
def _prompt_tool_category(
|
||||
label: str,
|
||||
options: list[tuple[str, str, str]],
|
||||
*,
|
||||
auto_tools: dict[str, str | None],
|
||||
) -> list[str]:
|
||||
if not options:
|
||||
click.echo(f"{label}: none")
|
||||
return []
|
||||
_print_wizard_step(label)
|
||||
for index, (value, name, detail) in enumerate(options, 1):
|
||||
marker = "[auto]" if _is_auto_tool(value, name, detail, auto_tools) else "[ ]"
|
||||
click.echo(f"{index}. {marker} {name} ({detail})")
|
||||
raw = click.prompt(
|
||||
f"Select {label.lower()} by number, comma-separated numbers, all, or empty",
|
||||
default="",
|
||||
show_default=cast(Any, "empty"),
|
||||
)
|
||||
if not raw.strip():
|
||||
return []
|
||||
return parse_index_selection(raw, [value for value, _, _ in options])
|
||||
|
||||
|
||||
def _is_auto_tool(value: str, name: str, detail: str, auto_tools: dict[str, str | None]) -> bool:
|
||||
return name in auto_tools or value in auto_tools or value in auto_tools.values() or detail in auto_tools.values()
|
||||
|
||||
|
||||
def _print_final_tool_selection(
|
||||
auto_tools: WizardToolMap,
|
||||
additional_tools: WizardToolSelection,
|
||||
manual_labels: dict[str, str],
|
||||
) -> None:
|
||||
_print_wizard_step("Final Tool Selection")
|
||||
_print_tool_selection_body(auto_tools, additional_tools, manual_labels)
|
||||
|
||||
|
||||
def _print_tool_selection_body(
|
||||
auto_tools: WizardToolMap,
|
||||
additional_tools: WizardToolSelection,
|
||||
manual_labels: dict[str, str],
|
||||
) -> None:
|
||||
click.echo("Final tools to export:")
|
||||
_print_final_tool_category(
|
||||
"Custom API tools",
|
||||
auto_tools["api_tools"],
|
||||
additional_tools["api_tools"],
|
||||
manual_labels,
|
||||
)
|
||||
_print_final_tool_category(
|
||||
"Workflow tools",
|
||||
auto_tools["workflow_tools"],
|
||||
additional_tools["workflow_tools"],
|
||||
manual_labels,
|
||||
)
|
||||
_print_final_tool_category("MCP tools", auto_tools["mcp_tools"], additional_tools["mcp_tools"], manual_labels)
|
||||
|
||||
|
||||
def _print_final_tool_category(
|
||||
label: str,
|
||||
auto_tools: dict[str, str | None],
|
||||
manual_values: list[str],
|
||||
manual_labels: dict[str, str],
|
||||
) -> None:
|
||||
click.echo(label)
|
||||
lines = [f"- [auto] {_format_tool_name_id(name, identifier)}" for name, identifier in sorted(auto_tools.items())]
|
||||
auto_identifiers = {identifier for identifier in auto_tools.values() if identifier}
|
||||
lines.extend(
|
||||
f"- [manual] {manual_labels.get(value, value)}"
|
||||
for value in manual_values
|
||||
if value not in auto_tools and value not in auto_identifiers
|
||||
)
|
||||
if not lines:
|
||||
click.echo("- none")
|
||||
return
|
||||
for line in lines:
|
||||
click.echo(line)
|
||||
|
||||
|
||||
def _format_tool_name_id(name: str, identifier: str | None) -> str:
|
||||
if identifier and identifier != name:
|
||||
return f"{name}: {identifier}"
|
||||
return name
|
||||
|
||||
|
||||
def _confirm_wizard_summary(
|
||||
*,
|
||||
tenant_name: str,
|
||||
app_names: list[str],
|
||||
auto_tools: WizardToolMap,
|
||||
additional_tools: WizardToolSelection,
|
||||
manual_labels: dict[str, str],
|
||||
include_referenced_tools: bool,
|
||||
include_secrets: bool,
|
||||
create_tokens: bool,
|
||||
id_strategy: str,
|
||||
conflict_strategy: str,
|
||||
output_file: str,
|
||||
) -> None:
|
||||
_print_wizard_step("Summary")
|
||||
click.echo("Migration export summary:")
|
||||
click.echo(f"source tenant: {tenant_name}")
|
||||
click.echo(f"selected apps: {len(app_names)}")
|
||||
for app_name in app_names:
|
||||
click.echo(f"- {app_name}")
|
||||
click.echo(f"auto referenced tools: {str(include_referenced_tools).lower()}")
|
||||
_print_tool_selection_body(auto_tools, additional_tools, manual_labels)
|
||||
click.echo(f"include secrets: {str(include_secrets).lower()}")
|
||||
click.echo(f"create app api token on import: {str(create_tokens).lower()}")
|
||||
click.echo(f"id strategy: {id_strategy}")
|
||||
click.echo(f"conflict strategy: {conflict_strategy}")
|
||||
click.echo(f"output path: {output_file}")
|
||||
if not click.confirm("Write migration package? [y/n, default: y]", default=True, show_default=False):
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
def _prompt_output_file() -> tuple[str, bool]:
|
||||
default_output = f"migration-data-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
|
||||
output_file = click.prompt("Output path", default=default_output, show_default=True)
|
||||
if output_file.lower() in {"y", "yes", "n", "no"}:
|
||||
raise click.ClickException("Output path must be a file path. Press Enter to use the default path.")
|
||||
overwrite = False
|
||||
if Path(output_file).exists():
|
||||
overwrite = click.confirm(
|
||||
"Output file exists. Overwrite? [y/n, default: n]",
|
||||
default=False,
|
||||
show_default=False,
|
||||
)
|
||||
if not overwrite:
|
||||
raise click.ClickException(f"Output file already exists: {output_file}")
|
||||
return output_file, overwrite
|
||||
|
||||
|
||||
def _with_output_path(context: ReportContext | None, output_path: str) -> ReportContext:
|
||||
if context is None:
|
||||
return ReportContext(output_path=output_path)
|
||||
return ReportContext(
|
||||
output_path=output_path,
|
||||
source_scope=context.source_scope,
|
||||
selected_app_count=context.selected_app_count,
|
||||
include_secrets=context.include_secrets,
|
||||
target_tenant=context.target_tenant,
|
||||
operator_email=context.operator_email,
|
||||
app_api_tokens_created=context.app_api_tokens_created,
|
||||
app_api_tokens_reused=context.app_api_tokens_reused,
|
||||
id_mapping_count=context.id_mapping_count,
|
||||
id_mappings=context.id_mappings,
|
||||
)
|
||||
|
||||
|
||||
def _render_report(report_items: list[ResourceReportItem], *, context: ReportContext | None = None) -> None:
|
||||
for line in MigrationReportService().render(report_items, context=context):
|
||||
click.echo(line)
|
||||
@ -30,7 +30,7 @@ def vdb_migrate(scope: str):
|
||||
|
||||
def migrate_annotation_vector_database():
|
||||
"""
|
||||
Migrate annotation datas to target vector database .
|
||||
Migrate annotation data to target vector database.
|
||||
"""
|
||||
click.echo(click.style("Starting annotation data migration.", fg="green"))
|
||||
create_count = 0
|
||||
@ -140,7 +140,7 @@ def migrate_annotation_vector_database():
|
||||
|
||||
def migrate_knowledge_vector_database():
|
||||
"""
|
||||
Migrate vector database datas to target vector database .
|
||||
Migrate vector database data to target vector database.
|
||||
"""
|
||||
click.echo(click.style("Starting vector database migration.", fg="green"))
|
||||
create_count = 0
|
||||
|
||||
@ -5,7 +5,7 @@ from uuid import UUID
|
||||
from flask import request
|
||||
from flask_restx import Resource, marshal
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy import String, cast, func, or_, select
|
||||
from sqlalchemy import String, case, cast, func, literal, or_, select
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from werkzeug.exceptions import Forbidden, NotFound
|
||||
|
||||
@ -169,12 +169,17 @@ class DatasetDocumentSegmentListApi(Resource):
|
||||
# Use database-specific methods for JSON array search
|
||||
if dify_config.SQLALCHEMY_DATABASE_URI_SCHEME == "postgresql":
|
||||
# PostgreSQL: Use jsonb_array_elements_text to properly handle Unicode/Chinese text
|
||||
# Guard with jsonb_typeof to avoid "cannot extract elements from a scalar" error
|
||||
# when keywords is null or a non-array JSON value.
|
||||
# Feed the set-returning function a JSON array in every row. Filtering in
|
||||
# the subquery is not enough because PostgreSQL can still evaluate the
|
||||
# SRF on scalar JSON before applying the predicate.
|
||||
keywords_jsonb = cast(DocumentSegment.keywords, JSONB)
|
||||
keywords_array = case(
|
||||
(func.jsonb_typeof(keywords_jsonb) == "array", keywords_jsonb),
|
||||
else_=cast(literal("[]"), JSONB),
|
||||
)
|
||||
keywords_condition = func.array_to_string(
|
||||
func.array(
|
||||
select(func.jsonb_array_elements_text(cast(DocumentSegment.keywords, JSONB)))
|
||||
.where(func.jsonb_typeof(cast(DocumentSegment.keywords, JSONB)) == "array")
|
||||
select(func.jsonb_array_elements_text(keywords_array))
|
||||
.correlate(DocumentSegment)
|
||||
.scalar_subquery()
|
||||
),
|
||||
|
||||
@ -863,7 +863,7 @@ class ToolManager:
|
||||
return controller
|
||||
|
||||
@classmethod
|
||||
def user_get_api_provider(cls, provider: str, tenant_id: str):
|
||||
def user_get_api_provider(cls, provider: str, tenant_id: str, mask: bool = True):
|
||||
"""
|
||||
get api provider
|
||||
"""
|
||||
@ -902,8 +902,10 @@ class ToolManager:
|
||||
tenant_id=tenant_id,
|
||||
controller=controller,
|
||||
)
|
||||
|
||||
masked_credentials = encrypter.mask_plugin_credentials(encrypter.decrypt(credentials))
|
||||
if mask:
|
||||
masked_credentials = encrypter.mask_plugin_credentials(encrypter.decrypt(credentials))
|
||||
else:
|
||||
masked_credentials = encrypter.decrypt(credentials)
|
||||
|
||||
try:
|
||||
icon = emoji_icon_adapter.validate_json(provider_obj.icon)
|
||||
|
||||
@ -6,7 +6,7 @@ from json.decoder import JSONDecodeError
|
||||
from typing import Any, TypedDict
|
||||
|
||||
import httpx
|
||||
from flask import request
|
||||
from flask import has_request_context, request
|
||||
from yaml import YAMLError, safe_load
|
||||
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
@ -44,7 +44,7 @@ class ApiBasedToolSchemaParser:
|
||||
raise ToolProviderNotFoundError("No server found in the openapi yaml.")
|
||||
|
||||
server_url = openapi["servers"][0]["url"]
|
||||
request_env = request.headers.get("X-Request-Env")
|
||||
request_env = request.headers.get("X-Request-Env") if has_request_context() else None
|
||||
if request_env:
|
||||
matched_servers = [server["url"] for server in openapi["servers"] if server["env"] == request_env]
|
||||
server_url = matched_servers[0] if matched_servers else server_url
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import logging
|
||||
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.tools.errors import ToolProviderNotFoundError
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from core.tools.utils.configuration import ToolParameterConfigurationManager
|
||||
from core.workflow.human_input_adapter import adapt_node_config_for_graph
|
||||
@ -38,6 +39,14 @@ def handle(sender, **kwargs):
|
||||
identity_id=f"WORKFLOW.{app.id}.{node_data.get('id')}",
|
||||
)
|
||||
manager.delete_tool_parameters_cache()
|
||||
except ToolProviderNotFoundError as exc:
|
||||
logger.info(
|
||||
"Skipped deleting tool parameters cache for workflow %s node %s "
|
||||
"because tool provider is missing: %s",
|
||||
app.id,
|
||||
node_data.get("id"),
|
||||
exc,
|
||||
)
|
||||
except Exception:
|
||||
# tool dose not exist
|
||||
logger.exception(
|
||||
|
||||
@ -15,14 +15,18 @@ def init_app(app: DifyApp):
|
||||
data_migrate,
|
||||
delete_archived_workflow_runs,
|
||||
export_app_messages,
|
||||
export_migration_data,
|
||||
export_migration_data_template,
|
||||
extract_plugins,
|
||||
extract_unique_plugins,
|
||||
file_usage,
|
||||
fix_app_site_missing,
|
||||
import_migration_data,
|
||||
install_plugins,
|
||||
install_rag_pipeline_plugins,
|
||||
migrate_data_for_plugin,
|
||||
migrate_oss,
|
||||
migration_data_wizard,
|
||||
old_metadata_migration,
|
||||
remove_orphaned_files_on_storage,
|
||||
reset_email,
|
||||
@ -70,6 +74,10 @@ def init_app(app: DifyApp):
|
||||
clean_workflow_runs,
|
||||
clean_expired_messages,
|
||||
export_app_messages,
|
||||
export_migration_data,
|
||||
export_migration_data_template,
|
||||
import_migration_data,
|
||||
migration_data_wizard,
|
||||
]
|
||||
for cmd in cmds_to_register:
|
||||
app.cli.add_command(cmd)
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import posixpath
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
import oss2 as aliyun_s3
|
||||
|
||||
@ -29,9 +30,11 @@ class AliyunOssStorage(BaseStorage):
|
||||
cloudbox_id=dify_config.ALIYUN_CLOUDBOX_ID,
|
||||
)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
self.client.put_object(self.__wrapper_folder_filename(filename), data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
|
||||
data = obj.read()
|
||||
@ -39,17 +42,21 @@ class AliyunOssStorage(BaseStorage):
|
||||
return b""
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
|
||||
while chunk := obj.read(4096):
|
||||
yield chunk
|
||||
|
||||
@override
|
||||
def download(self, filename: str, target_filepath):
|
||||
self.client.get_object_to_file(self.__wrapper_folder_filename(filename), target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename: str):
|
||||
return self.client.object_exists(self.__wrapper_folder_filename(filename))
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.delete_object(self.__wrapper_folder_filename(filename))
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import logging
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
@ -48,9 +49,11 @@ class AwsS3Storage(BaseStorage):
|
||||
# other error, raise exception
|
||||
raise
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
try:
|
||||
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
|
||||
@ -61,6 +64,7 @@ class AwsS3Storage(BaseStorage):
|
||||
raise
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
try:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
@ -73,9 +77,11 @@ class AwsS3Storage(BaseStorage):
|
||||
else:
|
||||
raise
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.download_file(self.bucket_name, filename, target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
try:
|
||||
self.client.head_object(Bucket=self.bucket_name, Key=filename)
|
||||
@ -83,5 +89,6 @@ class AwsS3Storage(BaseStorage):
|
||||
except:
|
||||
return False
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
from collections.abc import Generator
|
||||
from datetime import timedelta
|
||||
from typing import override
|
||||
|
||||
from azure.identity import ChainedTokenCredential, DefaultAzureCredential
|
||||
from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas
|
||||
@ -26,6 +27,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
else:
|
||||
self.credential = None
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
@ -34,6 +36,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
blob_container = client.get_container_client(container=self.bucket_name)
|
||||
blob_container.upload_blob(filename, data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("Azure bucket name is not configured.")
|
||||
@ -46,6 +49,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
raise TypeError(f"Expected bytes from blob.readall(), got {type(data).__name__}")
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("Azure bucket name is not configured.")
|
||||
@ -55,6 +59,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
blob_data = blob.download_blob()
|
||||
yield from blob_data.chunks()
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
@ -66,6 +71,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
blob_data = blob.download_blob()
|
||||
blob_data.readinto(my_blob)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
if not self.bucket_name:
|
||||
return False
|
||||
@ -75,6 +81,7 @@ class AzureBlobStorage(BaseStorage):
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
return blob.exists()
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import base64
|
||||
import hashlib
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
from baidubce.auth.bce_credentials import BceCredentials
|
||||
from baidubce.bce_client_configuration import BceClientConfiguration
|
||||
@ -26,6 +27,7 @@ class BaiduObsStorage(BaseStorage):
|
||||
|
||||
self.client = BosClient(config=client_config)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
md5 = hashlib.md5()
|
||||
md5.update(data)
|
||||
@ -34,24 +36,29 @@ class BaiduObsStorage(BaseStorage):
|
||||
bucket_name=self.bucket_name, key=filename, data=data, content_length=len(data), content_md5=content_md5
|
||||
)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
response = self.client.get_object(bucket_name=self.bucket_name, key=filename)
|
||||
data: bytes = response.data.read()
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.get_object_to_file(bucket_name=self.bucket_name, key=filename, file_name=target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
res = self.client.get_object_meta_data(bucket_name=self.bucket_name, key=filename)
|
||||
if res is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.delete_object(bucket_name=self.bucket_name, key=filename)
|
||||
|
||||
@ -10,7 +10,7 @@ import tempfile
|
||||
from collections.abc import Generator
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, override
|
||||
|
||||
import clickzetta
|
||||
from pydantic import BaseModel, model_validator
|
||||
@ -251,6 +251,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
# Don't raise exception, let the operation continue
|
||||
# The table might exist but not be visible due to permissions
|
||||
|
||||
@override
|
||||
def save(self, filename: str, data: bytes):
|
||||
"""Save data to ClickZetta Volume.
|
||||
|
||||
@ -304,6 +305,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
# Clean up temporary file
|
||||
Path(temp_file_path).unlink(missing_ok=True)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
"""Load file content from ClickZetta Volume.
|
||||
|
||||
@ -364,6 +366,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
logger.debug("File %s loaded from ClickZetta Volume", filename)
|
||||
return content
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
"""Load file as stream from ClickZetta Volume.
|
||||
|
||||
@ -382,6 +385,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
|
||||
logger.debug("File %s loaded as stream from ClickZetta Volume", filename)
|
||||
|
||||
@override
|
||||
def download(self, filename: str, target_filepath: str):
|
||||
"""Download file from ClickZetta Volume to local path.
|
||||
|
||||
@ -395,6 +399,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
|
||||
logger.debug("File %s downloaded from ClickZetta Volume to %s", filename, target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename: str) -> bool:
|
||||
"""Check if file exists in ClickZetta Volume.
|
||||
|
||||
@ -436,6 +441,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
logger.warning("Error checking file existence for %s: %s", filename, e)
|
||||
return False
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
"""Delete file from ClickZetta Volume.
|
||||
|
||||
@ -472,6 +478,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
|
||||
logger.debug("File %s deleted from ClickZetta Volume", filename)
|
||||
|
||||
@override
|
||||
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
|
||||
"""Scan files and directories in ClickZetta Volume.
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import base64
|
||||
import io
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
from typing import Any, override
|
||||
|
||||
from google.cloud import storage as google_cloud_storage # type: ignore
|
||||
from pydantic import TypeAdapter
|
||||
@ -29,12 +29,14 @@ class GoogleCloudStorage(BaseStorage):
|
||||
else:
|
||||
self.client = google_cloud_storage.Client()
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.blob(filename)
|
||||
with io.BytesIO(data) as stream:
|
||||
blob.upload_from_file(stream)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
@ -43,6 +45,7 @@ class GoogleCloudStorage(BaseStorage):
|
||||
data: bytes = blob.download_as_bytes()
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
@ -52,6 +55,7 @@ class GoogleCloudStorage(BaseStorage):
|
||||
while chunk := blob_stream.read(4096):
|
||||
yield chunk
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
@ -59,11 +63,13 @@ class GoogleCloudStorage(BaseStorage):
|
||||
raise FileNotFoundError("File not found")
|
||||
blob.download_to_filename(target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.blob(filename)
|
||||
return blob.exists()
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
bucket.delete_blob(filename)
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
from obs import ObsClient
|
||||
|
||||
@ -20,27 +21,33 @@ class HuaweiObsStorage(BaseStorage):
|
||||
path_style=dify_config.HUAWEI_OBS_PATH_STYLE,
|
||||
)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
self.client.putObject(bucketName=self.bucket_name, objectKey=filename, content=data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
data: bytes = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response.read()
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.getObject(bucketName=self.bucket_name, objectKey=filename, downloadPath=target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
res = self._get_meta(filename)
|
||||
if res is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.deleteObject(bucketName=self.bucket_name, objectKey=filename)
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ import logging
|
||||
import os
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, override
|
||||
|
||||
import opendal
|
||||
from dotenv import dotenv_values
|
||||
@ -41,10 +41,12 @@ class OpenDALStorage(BaseStorage):
|
||||
logger.debug("opendal operator created with scheme %s", scheme)
|
||||
logger.debug("added retry layer to opendal operator")
|
||||
|
||||
@override
|
||||
def save(self, filename: str, data: bytes):
|
||||
self.op.write(path=filename, bs=data)
|
||||
logger.debug("file %s saved", filename)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
@ -53,6 +55,7 @@ class OpenDALStorage(BaseStorage):
|
||||
logger.debug("file %s loaded", filename)
|
||||
return content
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
if not self.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
@ -67,6 +70,7 @@ class OpenDALStorage(BaseStorage):
|
||||
yield chunk
|
||||
logger.debug("file %s loaded as stream", filename)
|
||||
|
||||
@override
|
||||
def download(self, filename: str, target_filepath: str):
|
||||
if not self.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
@ -74,9 +78,11 @@ class OpenDALStorage(BaseStorage):
|
||||
Path(target_filepath).write_bytes(self.op.read(path=filename))
|
||||
logger.debug("file %s downloaded to %s", filename, target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename: str) -> bool:
|
||||
return self.op.exists(path=filename)
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
if self.exists(filename):
|
||||
self.op.delete(path=filename)
|
||||
@ -84,6 +90,7 @@ class OpenDALStorage(BaseStorage):
|
||||
return
|
||||
logger.debug("file %s not found, skip delete", filename)
|
||||
|
||||
@override
|
||||
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
|
||||
if not self.exists(path):
|
||||
raise FileNotFoundError("Path not found")
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
@ -22,9 +23,11 @@ class OracleOCIStorage(BaseStorage):
|
||||
region_name=dify_config.OCI_REGION,
|
||||
)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
try:
|
||||
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
|
||||
@ -35,6 +38,7 @@ class OracleOCIStorage(BaseStorage):
|
||||
raise
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
try:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
@ -45,9 +49,11 @@ class OracleOCIStorage(BaseStorage):
|
||||
else:
|
||||
raise
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.download_file(self.bucket_name, filename, target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
try:
|
||||
self.client.head_object(Bucket=self.bucket_name, Key=filename)
|
||||
@ -55,5 +61,6 @@ class OracleOCIStorage(BaseStorage):
|
||||
except:
|
||||
return False
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import io
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from typing import override
|
||||
|
||||
from supabase import Client
|
||||
|
||||
@ -28,29 +29,35 @@ class SupabaseStorage(BaseStorage):
|
||||
if not self.bucket_exists():
|
||||
self.client.storage.create_bucket(id=id, name=bucket_name)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
self.client.storage.from_(self.bucket_name).upload(filename, data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
content: bytes = self.client.storage.from_(self.bucket_name).download(filename)
|
||||
return content
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
result = self.client.storage.from_(self.bucket_name).download(filename)
|
||||
byte_stream = io.BytesIO(result)
|
||||
while chunk := byte_stream.read(4096): # Read in chunks of 4KB
|
||||
yield chunk
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
result = self.client.storage.from_(self.bucket_name).download(filename)
|
||||
Path(target_filepath).write_bytes(result)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
result = self.client.storage.from_(self.bucket_name).list(path=filename)
|
||||
if len(result) > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.storage.from_(self.bucket_name).remove([filename])
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
from qcloud_cos import CosConfig, CosS3Client
|
||||
|
||||
@ -29,23 +30,29 @@ class TencentCosStorage(BaseStorage):
|
||||
)
|
||||
self.client = CosS3Client(config)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
self.client.put_object(Bucket=self.bucket_name, Body=data, Key=filename)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].get_raw_stream().read()
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].get_stream(chunk_size=4096)
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
response["Body"].get_stream_to_file(target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
return self.client.object_exists(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
self.client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from collections.abc import Generator
|
||||
from typing import override
|
||||
|
||||
import tos
|
||||
|
||||
@ -27,11 +28,13 @@ class VolcengineTosStorage(BaseStorage):
|
||||
region=dify_config.VOLCENGINE_TOS_REGION,
|
||||
)
|
||||
|
||||
@override
|
||||
def save(self, filename, data):
|
||||
if not self.bucket_name:
|
||||
raise ValueError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
self.client.put_object(bucket=self.bucket_name, key=filename, content=data)
|
||||
|
||||
@override
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
@ -40,6 +43,7 @@ class VolcengineTosStorage(BaseStorage):
|
||||
raise TypeError(f"Expected bytes, got {type(data).__name__}")
|
||||
return data
|
||||
|
||||
@override
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
@ -47,11 +51,13 @@ class VolcengineTosStorage(BaseStorage):
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
@override
|
||||
def download(self, filename, target_filepath):
|
||||
if not self.bucket_name:
|
||||
raise ValueError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)
|
||||
|
||||
@override
|
||||
def exists(self, filename):
|
||||
if not self.bucket_name:
|
||||
return False
|
||||
@ -60,6 +66,7 @@ class VolcengineTosStorage(BaseStorage):
|
||||
return False
|
||||
return True
|
||||
|
||||
@override
|
||||
def delete(self, filename: str):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
|
||||
@ -97,6 +97,7 @@ class AppDslService:
|
||||
icon: str | None = None,
|
||||
icon_background: str | None = None,
|
||||
app_id: str | None = None,
|
||||
import_app_id: str | None = None,
|
||||
) -> Import:
|
||||
"""Import an app from YAML content or URL."""
|
||||
import_id = str(uuid.uuid4())
|
||||
@ -262,6 +263,7 @@ class AppDslService:
|
||||
icon=icon,
|
||||
icon_background=icon_background,
|
||||
dependencies=check_dependencies_pending_data,
|
||||
import_app_id=import_app_id,
|
||||
)
|
||||
|
||||
draft_var_srv = WorkflowDraftVariableService(session=self._session)
|
||||
@ -385,6 +387,7 @@ class AppDslService:
|
||||
icon: str | None = None,
|
||||
icon_background: str | None = None,
|
||||
dependencies: list[PluginDependency] | None = None,
|
||||
import_app_id: str | None = None,
|
||||
) -> App:
|
||||
"""Create a new app or update an existing one."""
|
||||
app_data = data.get("app", {})
|
||||
@ -417,7 +420,7 @@ class AppDslService:
|
||||
|
||||
# Create new app
|
||||
app = App()
|
||||
app.id = str(uuid4())
|
||||
app.id = import_app_id or str(uuid4())
|
||||
app.tenant_id = account.current_tenant_id
|
||||
app.mode = app_mode
|
||||
app.name = name or app_data.get("name", "")
|
||||
|
||||
17
api/services/data_migration/__init__.py
Normal file
17
api/services/data_migration/__init__.py
Normal file
@ -0,0 +1,17 @@
|
||||
from services.data_migration.entities import (
|
||||
ConflictStrategy,
|
||||
ExportSelection,
|
||||
IdStrategy,
|
||||
ImportOptions,
|
||||
MigrationDataError,
|
||||
MigrationPackage,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"ConflictStrategy",
|
||||
"ExportSelection",
|
||||
"IdStrategy",
|
||||
"ImportOptions",
|
||||
"MigrationDataError",
|
||||
"MigrationPackage",
|
||||
]
|
||||
92
api/services/data_migration/dependency_discovery_service.py
Normal file
92
api/services/data_migration/dependency_discovery_service.py
Normal file
@ -0,0 +1,92 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from services.data_migration.entities import DependencyKind
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DiscoveredDependency:
|
||||
kind: DependencyKind
|
||||
provider_id: str
|
||||
provider_name: str | None = None
|
||||
source: str | None = None
|
||||
|
||||
|
||||
class DependencyDiscoveryService:
|
||||
def discover_from_dsl(self, dsl: dict[str, Any]) -> list[DiscoveredDependency]:
|
||||
seen: set[tuple[DependencyKind, str]] = set()
|
||||
result: list[DiscoveredDependency] = []
|
||||
for node in self._nodes_from_dsl(dsl):
|
||||
data = node.get("data", {}) if isinstance(node, dict) else {}
|
||||
for dependency in self._dependencies_from_node(data):
|
||||
key = (dependency.kind, dependency.provider_id)
|
||||
if dependency.provider_id and key not in seen:
|
||||
seen.add(key)
|
||||
result.append(dependency)
|
||||
return result
|
||||
|
||||
def _nodes_from_dsl(self, dsl: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
nodes: list[dict[str, Any]] = []
|
||||
graph = dsl.get("graph") if isinstance(dsl, dict) else None
|
||||
if isinstance(graph, dict) and isinstance(graph.get("nodes"), list):
|
||||
nodes.extend(node for node in graph["nodes"] if isinstance(node, dict))
|
||||
workflow = dsl.get("workflow") if isinstance(dsl, dict) else None
|
||||
workflow_graph = workflow.get("graph") if isinstance(workflow, dict) else None
|
||||
if isinstance(workflow_graph, dict) and isinstance(workflow_graph.get("nodes"), list):
|
||||
nodes.extend(node for node in workflow_graph["nodes"] if isinstance(node, dict))
|
||||
return nodes
|
||||
|
||||
def _dependencies_from_node(self, data: dict[str, Any]) -> list[DiscoveredDependency]:
|
||||
dependencies: list[DiscoveredDependency] = []
|
||||
node_type = data.get("type")
|
||||
if node_type == "tool":
|
||||
dependency = self._from_tool_config(data, source="tool_node")
|
||||
if dependency:
|
||||
dependencies.append(dependency)
|
||||
if node_type == "agent":
|
||||
for tool_config in self._agent_tool_configs(data):
|
||||
if isinstance(tool_config, dict):
|
||||
dependency = self._from_tool_config(tool_config, source="agent_node")
|
||||
if dependency:
|
||||
dependencies.append(dependency)
|
||||
return dependencies
|
||||
|
||||
def _agent_tool_configs(self, data: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
configs = data.get("tools")
|
||||
if isinstance(configs, list):
|
||||
return [config for config in configs if isinstance(config, dict)]
|
||||
agent_parameters = data.get("agent_parameters")
|
||||
if not isinstance(agent_parameters, dict):
|
||||
return []
|
||||
tools_parameter = agent_parameters.get("tools")
|
||||
if not isinstance(tools_parameter, dict):
|
||||
return []
|
||||
value = tools_parameter.get("value", [])
|
||||
if not isinstance(value, list):
|
||||
return []
|
||||
return [config for config in value if isinstance(config, dict)]
|
||||
|
||||
def _from_tool_config(self, config: dict[str, Any], *, source: str) -> DiscoveredDependency | None:
|
||||
provider_id = config.get("provider_id") or config.get("provider_name") or config.get("provider")
|
||||
if not provider_id:
|
||||
return None
|
||||
provider_type = str(config.get("provider_type") or config.get("type") or "")
|
||||
kind = self._kind_from_provider_type(provider_type)
|
||||
return DiscoveredDependency(
|
||||
kind=kind,
|
||||
provider_id=str(provider_id),
|
||||
provider_name=config.get("provider_name"),
|
||||
source=source,
|
||||
)
|
||||
|
||||
def _kind_from_provider_type(self, provider_type: str) -> DependencyKind:
|
||||
normalized = provider_type.lower()
|
||||
if normalized in {"api", "custom", "api_tool"}:
|
||||
return DependencyKind.API_TOOL
|
||||
if normalized in {"workflow", "workflow_tool"}:
|
||||
return DependencyKind.WORKFLOW_TOOL
|
||||
if normalized == "mcp":
|
||||
return DependencyKind.MCP_TOOL
|
||||
return DependencyKind.BUILTIN_OR_PLUGIN_TOOL
|
||||
241
api/services/data_migration/entities.py
Normal file
241
api/services/data_migration/entities.py
Normal file
@ -0,0 +1,241 @@
|
||||
"""Typed entities for versioned cross-environment migration packages.
|
||||
|
||||
This module is intentionally side-effect free. It owns only value objects and
|
||||
validation for migration package/config shapes; command output and database I/O
|
||||
belong in adapter and service modules built on top of these entities.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import StrEnum
|
||||
from typing import Any, Literal, TypedDict
|
||||
|
||||
|
||||
class MigrationDataError(ValueError):
|
||||
"""Raised when migration config or package data is invalid."""
|
||||
|
||||
|
||||
class IdStrategy(StrEnum):
|
||||
PRESERVE_ID = "preserve-id"
|
||||
GENERATE_NEW_ID = "generate-new-id"
|
||||
|
||||
|
||||
class ConflictStrategy(StrEnum):
|
||||
FAIL = "fail"
|
||||
SKIP = "skip"
|
||||
UPDATE = "update"
|
||||
|
||||
|
||||
class ResourceType(StrEnum):
|
||||
WORKFLOW = "workflow"
|
||||
API_TOOL = "api_tool"
|
||||
WORKFLOW_TOOL = "workflow_tool"
|
||||
MCP_TOOL = "mcp_tool"
|
||||
DEPENDENCY = "dependency"
|
||||
|
||||
|
||||
class DependencyKind(StrEnum):
|
||||
API_TOOL = "api_tool"
|
||||
WORKFLOW_TOOL = "workflow_tool"
|
||||
MCP_TOOL = "mcp_tool"
|
||||
BUILTIN_OR_PLUGIN_TOOL = "builtin_or_plugin_tool"
|
||||
UNRESOLVED = "unresolved"
|
||||
|
||||
|
||||
class TargetTenantSelector(TypedDict, total=False):
|
||||
id: str
|
||||
name: str
|
||||
|
||||
|
||||
def _parse_target_tenant(value: Any) -> TargetTenantSelector | None:
|
||||
if value is None:
|
||||
return None
|
||||
if not isinstance(value, dict):
|
||||
raise MigrationDataError("metadata.target_tenant must be an object when provided.")
|
||||
target: TargetTenantSelector = {}
|
||||
target_id = value.get("id")
|
||||
if target_id is not None:
|
||||
if not isinstance(target_id, str):
|
||||
raise MigrationDataError("metadata.target_tenant.id must be a string.")
|
||||
target["id"] = target_id
|
||||
target_name = value.get("name")
|
||||
if target_name is not None:
|
||||
if not isinstance(target_name, str):
|
||||
raise MigrationDataError("metadata.target_tenant.name must be a string.")
|
||||
target["name"] = target_name
|
||||
unsupported_keys = sorted(set(value.keys()) - {"id", "name"})
|
||||
if unsupported_keys:
|
||||
raise MigrationDataError(f"metadata.target_tenant contains unsupported fields: {unsupported_keys}")
|
||||
return target
|
||||
|
||||
|
||||
def _parse_package_section(value: Any, section: str) -> list[dict[str, Any]]:
|
||||
if value is None:
|
||||
return []
|
||||
if not isinstance(value, list):
|
||||
raise MigrationDataError(f"Migration package field '{section}' must be a list.")
|
||||
for item in value:
|
||||
if not isinstance(item, dict):
|
||||
raise MigrationDataError(f"Migration package field '{section}' must contain only objects.")
|
||||
return value
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SourceTenant:
|
||||
id: str
|
||||
name: str
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls, value: dict[str, Any]) -> SourceTenant:
|
||||
return cls(id=str(value.get("id", "")), name=str(value.get("name", "")))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ImportOptions:
|
||||
create_app_api_token_on_import: bool = False
|
||||
id_strategy: IdStrategy = IdStrategy.PRESERVE_ID
|
||||
conflict_strategy: ConflictStrategy = ConflictStrategy.FAIL
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls, value: dict[str, Any] | None) -> ImportOptions:
|
||||
value = value or {}
|
||||
try:
|
||||
id_strategy = IdStrategy(value.get("id_strategy", IdStrategy.PRESERVE_ID))
|
||||
except ValueError as exc:
|
||||
raise MigrationDataError(f"Unsupported import_options.id_strategy: {value.get('id_strategy')}") from exc
|
||||
try:
|
||||
conflict_strategy = ConflictStrategy(value.get("conflict_strategy", ConflictStrategy.FAIL))
|
||||
except ValueError as exc:
|
||||
raise MigrationDataError(
|
||||
f"Unsupported import_options.conflict_strategy: {value.get('conflict_strategy')}"
|
||||
) from exc
|
||||
return cls(
|
||||
create_app_api_token_on_import=bool(value.get("create_app_api_token_on_import", False)),
|
||||
id_strategy=id_strategy,
|
||||
conflict_strategy=conflict_strategy,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MigrationMetadata:
|
||||
version: str
|
||||
source_scope: Literal["single"]
|
||||
source_tenants: list[SourceTenant]
|
||||
target_tenant: TargetTenantSelector | None = None
|
||||
created_at: str | None = None
|
||||
include_secrets: bool = False
|
||||
import_options: ImportOptions = field(default_factory=ImportOptions)
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls, value: dict[str, Any]) -> MigrationMetadata:
|
||||
version = value.get("version")
|
||||
if not version:
|
||||
raise MigrationDataError("Migration package must include metadata.version.")
|
||||
source_scope = value.get("source_scope", "single")
|
||||
if source_scope != "single":
|
||||
raise MigrationDataError(f"Unsupported source_scope: {source_scope}")
|
||||
source_tenants = [
|
||||
SourceTenant.from_mapping(item) for item in value.get("source_tenants", []) if isinstance(item, dict)
|
||||
]
|
||||
return cls(
|
||||
version=str(version),
|
||||
source_scope="single",
|
||||
source_tenants=source_tenants,
|
||||
target_tenant=_parse_target_tenant(value.get("target_tenant")),
|
||||
created_at=value.get("created_at"),
|
||||
include_secrets=bool(value.get("include_secrets", False)),
|
||||
import_options=ImportOptions.from_mapping(value.get("import_options")),
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MigrationPackage:
|
||||
metadata: MigrationMetadata
|
||||
workflows: list[dict[str, Any]] = field(default_factory=list)
|
||||
tools: list[dict[str, Any]] = field(default_factory=list)
|
||||
workflow_tools: list[dict[str, Any]] = field(default_factory=list)
|
||||
mcp_tools: list[dict[str, Any]] = field(default_factory=list)
|
||||
dependencies: list[dict[str, Any]] = field(default_factory=list)
|
||||
|
||||
@classmethod
|
||||
def from_mapping(cls, value: dict[str, Any]) -> MigrationPackage:
|
||||
metadata_value = value.get("metadata")
|
||||
if not isinstance(metadata_value, dict):
|
||||
raise MigrationDataError("Migration package must include metadata.version.")
|
||||
return cls(
|
||||
metadata=MigrationMetadata.from_mapping(metadata_value),
|
||||
workflows=_parse_package_section(value.get("workflows"), "workflows"),
|
||||
tools=_parse_package_section(value.get("tools"), "tools"),
|
||||
workflow_tools=_parse_package_section(value.get("workflow_tools"), "workflow_tools"),
|
||||
mcp_tools=_parse_package_section(value.get("mcp_tools"), "mcp_tools"),
|
||||
dependencies=_parse_package_section(value.get("dependencies"), "dependencies"),
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ExportSelection:
|
||||
source_tenant_name: str
|
||||
app_ids: list[str]
|
||||
source_tenant_id: str | None = None
|
||||
export_all_apps: bool = False
|
||||
include_referenced_tools: bool = True
|
||||
additional_api_tools: list[str] = field(default_factory=list)
|
||||
additional_workflow_tools: list[str] = field(default_factory=list)
|
||||
additional_mcp_tools: list[str] = field(default_factory=list)
|
||||
include_secrets: bool = False
|
||||
import_options: ImportOptions = field(default_factory=ImportOptions)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ResourceReportItem:
|
||||
resource_type: ResourceType
|
||||
identifier: str
|
||||
name: str | None
|
||||
status: str
|
||||
message: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ResourceIdMapping:
|
||||
resource_type: ResourceType
|
||||
name: str | None
|
||||
source_id: str
|
||||
target_id: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ExportResult:
|
||||
package: MigrationPackage
|
||||
report_items: list[ResourceReportItem]
|
||||
report_context: ReportContext | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ImportTarget:
|
||||
tenant_id: str
|
||||
tenant_name: str
|
||||
operator_id: str
|
||||
operator_email: str | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ImportResult:
|
||||
report_items: list[ResourceReportItem]
|
||||
id_mapping: dict[str, str] = field(default_factory=dict)
|
||||
report_context: ReportContext | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ReportContext:
|
||||
output_path: str | None = None
|
||||
source_scope: str | None = None
|
||||
selected_app_count: int | None = None
|
||||
include_secrets: bool | None = None
|
||||
target_tenant: str | None = None
|
||||
operator_email: str | None = None
|
||||
app_api_tokens_created: int = 0
|
||||
app_api_tokens_reused: int = 0
|
||||
id_mapping_count: int = 0
|
||||
id_mappings: dict[str, str] = field(default_factory=dict)
|
||||
id_mapping_details: list[ResourceIdMapping] = field(default_factory=list)
|
||||
492
api/services/data_migration/export_service.py
Normal file
492
api/services/data_migration/export_service.py
Normal file
@ -0,0 +1,492 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
import sqlalchemy as sa
|
||||
import yaml
|
||||
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from extensions.ext_database import db
|
||||
from graphon.model_runtime.utils.encoders import jsonable_encoder
|
||||
from models import Account, Tenant
|
||||
from models.account import TenantAccountJoin
|
||||
from models.model import App
|
||||
from models.tools import MCPToolProvider
|
||||
from services.app_dsl_service import AppDslService
|
||||
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService, DiscoveredDependency
|
||||
from services.data_migration.entities import (
|
||||
DependencyKind,
|
||||
ExportResult,
|
||||
ExportSelection,
|
||||
ImportOptions,
|
||||
MigrationDataError,
|
||||
ReportContext,
|
||||
ResourceReportItem,
|
||||
ResourceType,
|
||||
)
|
||||
from services.data_migration.package_service import MigrationPackageService
|
||||
from services.tools.workflow_tools_manage_service import WorkflowToolManageService
|
||||
|
||||
SUPPORTED_APP_MODES = {"workflow", "advanced-chat"}
|
||||
|
||||
|
||||
class ExportConfigParser:
|
||||
def parse(self, data: dict[str, Any]) -> ExportSelection:
|
||||
if not isinstance(data, dict):
|
||||
raise MigrationDataError("Export config JSON must be an object.")
|
||||
|
||||
source_tenant = self._source_tenant(data)
|
||||
source_tenant_name = self._source_tenant_name(source_tenant, data)
|
||||
apps = self._mapping(data.get("apps"), field_name="apps")
|
||||
self._validate_source_scope(data)
|
||||
self._validate_app_modes(apps.get("modes", []))
|
||||
|
||||
additional_tools = self._mapping(data.get("additional_tools"), field_name="additional_tools")
|
||||
return ExportSelection(
|
||||
source_tenant_name=source_tenant_name,
|
||||
app_ids=self._string_list(apps.get("ids", data.get("workflows", [])), field_name="apps.ids"),
|
||||
source_tenant_id=source_tenant.get("id"),
|
||||
export_all_apps=bool(apps.get("all", data.get("export_all_workflows", False))),
|
||||
include_referenced_tools=bool(data.get("include_referenced_tools", True)),
|
||||
additional_api_tools=self._string_list(
|
||||
additional_tools.get("api_tools", data.get("tools", [])), field_name="additional_tools.api_tools"
|
||||
),
|
||||
additional_workflow_tools=self._string_list(
|
||||
additional_tools.get("workflow_tools", data.get("workflow_tools", [])),
|
||||
field_name="additional_tools.workflow_tools",
|
||||
),
|
||||
additional_mcp_tools=self._string_list(
|
||||
additional_tools.get("mcp_tools", data.get("mcp_tools", [])),
|
||||
field_name="additional_tools.mcp_tools",
|
||||
),
|
||||
include_secrets=bool(data.get("include_secrets", False)),
|
||||
import_options=ImportOptions.from_mapping(data.get("import_options")),
|
||||
)
|
||||
|
||||
def _source_tenant(self, data: dict[str, Any]) -> dict[str, Any]:
|
||||
if "source_tenant" in data:
|
||||
return self._mapping(data.get("source_tenant"), field_name="source_tenant")
|
||||
return {}
|
||||
|
||||
def _source_tenant_name(self, source_tenant: dict[str, Any], data: dict[str, Any]) -> str:
|
||||
if source_tenant:
|
||||
source_tenant_name = source_tenant.get("name")
|
||||
if not source_tenant_name:
|
||||
raise MigrationDataError("Export config must include source_tenant.name.")
|
||||
return str(source_tenant_name)
|
||||
source_tenant_name = data.get("tenant_name")
|
||||
if not source_tenant_name:
|
||||
raise MigrationDataError("Export config must include source_tenant.name.")
|
||||
return str(source_tenant_name)
|
||||
|
||||
def _validate_source_scope(self, data: dict[str, Any]) -> None:
|
||||
source_tenant = data.get("source_tenant")
|
||||
if not isinstance(source_tenant, dict):
|
||||
return
|
||||
mode = source_tenant.get("mode", "single")
|
||||
if mode != "single":
|
||||
raise MigrationDataError(f"Unsupported source_tenant.mode: {mode}")
|
||||
|
||||
def _validate_app_modes(self, modes: Any) -> None:
|
||||
app_modes = self._string_list(modes, field_name="apps.modes") if modes else []
|
||||
unsupported_modes = sorted(set(app_modes) - SUPPORTED_APP_MODES)
|
||||
if unsupported_modes:
|
||||
raise MigrationDataError(f"Unsupported app modes for export: {unsupported_modes}")
|
||||
|
||||
def _mapping(self, value: Any, *, field_name: str) -> dict[str, Any]:
|
||||
if value is None:
|
||||
return {}
|
||||
if not isinstance(value, dict):
|
||||
raise MigrationDataError(f"Export config field '{field_name}' must be an object.")
|
||||
return value
|
||||
|
||||
def _string_list(self, value: Any, *, field_name: str) -> list[str]:
|
||||
if value is None:
|
||||
return []
|
||||
if not isinstance(value, list):
|
||||
raise MigrationDataError(f"Export config field '{field_name}' must be a list.")
|
||||
return [str(item) for item in value]
|
||||
|
||||
|
||||
class MigrationExportService:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
package_service: MigrationPackageService | None = None,
|
||||
dependency_discovery_service: DependencyDiscoveryService | None = None,
|
||||
) -> None:
|
||||
self.package_service = package_service or MigrationPackageService()
|
||||
self.dependency_discovery_service = dependency_discovery_service or DependencyDiscoveryService()
|
||||
|
||||
def export(self, selection: ExportSelection) -> ExportResult:
|
||||
tenant = self._get_tenant(selection)
|
||||
package = self.package_service.build_empty_package(
|
||||
source_tenant_id=tenant.id,
|
||||
source_tenant_name=tenant.name,
|
||||
include_secrets=selection.include_secrets,
|
||||
import_options=selection.import_options,
|
||||
)
|
||||
report_items: list[ResourceReportItem] = []
|
||||
discovered_dependencies: list[DiscoveredDependency] = []
|
||||
|
||||
apps = self._selected_apps(tenant.id, selection)
|
||||
exported_app_ids = {app.id for app in apps}
|
||||
for app in apps:
|
||||
dsl_content = AppDslService.export_dsl(app_model=app, include_secret=selection.include_secrets)
|
||||
package.workflows.append(
|
||||
{
|
||||
"id": app.id,
|
||||
"name": app.name,
|
||||
"mode": app.mode.value if hasattr(app.mode, "value") else app.mode,
|
||||
"dsl": dsl_content,
|
||||
"source_tenant_id": tenant.id,
|
||||
"create_app_api_token_on_import": selection.import_options.create_app_api_token_on_import,
|
||||
}
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.WORKFLOW, app.id, app.name, "exported"))
|
||||
if selection.include_referenced_tools:
|
||||
discovered_dependencies.extend(self._discover_dependencies(dsl_content))
|
||||
|
||||
self._export_api_tools(
|
||||
tenant.id,
|
||||
self._provider_ids(selection.additional_api_tools, discovered_dependencies, DependencyKind.API_TOOL),
|
||||
include_secrets=selection.include_secrets,
|
||||
exported_tools=package.tools,
|
||||
report_items=report_items,
|
||||
)
|
||||
self._export_workflow_tools(
|
||||
tenant,
|
||||
self._provider_ids(
|
||||
selection.additional_workflow_tools, discovered_dependencies, DependencyKind.WORKFLOW_TOOL
|
||||
),
|
||||
exported_app_ids=exported_app_ids,
|
||||
exported_workflow_tools=package.workflow_tools,
|
||||
dependencies=package.dependencies,
|
||||
report_items=report_items,
|
||||
)
|
||||
self._export_mcp_tools(
|
||||
tenant_id=tenant.id,
|
||||
provider_ids=self._provider_ids(
|
||||
selection.additional_mcp_tools,
|
||||
discovered_dependencies,
|
||||
DependencyKind.MCP_TOOL,
|
||||
),
|
||||
include_secrets=selection.include_secrets,
|
||||
exported_mcp_tools=package.mcp_tools,
|
||||
dependencies=package.dependencies,
|
||||
report_items=report_items,
|
||||
)
|
||||
self._record_dependency_metadata(
|
||||
self._dependencies_by_kind(discovered_dependencies, DependencyKind.BUILTIN_OR_PLUGIN_TOOL),
|
||||
package.dependencies,
|
||||
report_items,
|
||||
)
|
||||
return ExportResult(
|
||||
package=package,
|
||||
report_items=report_items,
|
||||
report_context=ReportContext(
|
||||
source_scope=package.metadata.source_scope,
|
||||
selected_app_count=len(apps),
|
||||
include_secrets=selection.include_secrets,
|
||||
),
|
||||
)
|
||||
|
||||
def _get_tenant(self, selection: ExportSelection) -> Tenant:
|
||||
if selection.source_tenant_id:
|
||||
tenant = db.session.get(Tenant, selection.source_tenant_id)
|
||||
if tenant is None:
|
||||
raise MigrationDataError(f"Source tenant not found: {selection.source_tenant_id}")
|
||||
if tenant.name != selection.source_tenant_name:
|
||||
raise MigrationDataError(
|
||||
f"Source tenant id/name mismatch: {selection.source_tenant_id} / {selection.source_tenant_name}"
|
||||
)
|
||||
return tenant
|
||||
tenants = list(db.session.scalars(sa.select(Tenant).where(Tenant.name == selection.source_tenant_name)).all())
|
||||
if not tenants:
|
||||
raise MigrationDataError(f"Source tenant not found: {selection.source_tenant_name}")
|
||||
if len(tenants) > 1:
|
||||
raise MigrationDataError(
|
||||
f"Source tenant name is ambiguous; use source_tenant.id: {selection.source_tenant_name}"
|
||||
)
|
||||
return tenants[0]
|
||||
|
||||
def _selected_apps(self, tenant_id: str, selection: ExportSelection) -> list[App]:
|
||||
query = sa.select(App).where(App.tenant_id == tenant_id, App.mode.in_(SUPPORTED_APP_MODES))
|
||||
if not selection.export_all_apps:
|
||||
if not selection.app_ids:
|
||||
return []
|
||||
query = query.where(App.id.in_(selection.app_ids))
|
||||
apps = list(db.session.scalars(query).all())
|
||||
if not selection.export_all_apps and len(apps) != len(set(selection.app_ids)):
|
||||
found_ids = {app.id for app in apps}
|
||||
missing_ids = [app_id for app_id in selection.app_ids if app_id not in found_ids]
|
||||
raise MigrationDataError(
|
||||
f"Selected app IDs not found in source tenant or unsupported app mode: {missing_ids}"
|
||||
)
|
||||
return apps
|
||||
|
||||
def _discover_dependencies(self, dsl_content: str | dict[str, Any]) -> list[DiscoveredDependency]:
|
||||
if isinstance(dsl_content, dict):
|
||||
dsl = dsl_content
|
||||
else:
|
||||
raw_dsl = yaml.safe_load(dsl_content) if dsl_content else {}
|
||||
dsl = raw_dsl if isinstance(raw_dsl, dict) else {}
|
||||
return self.dependency_discovery_service.discover_from_dsl(dsl)
|
||||
|
||||
def _export_api_tools(
|
||||
self,
|
||||
tenant_id: str,
|
||||
provider_ids: Iterable[str],
|
||||
*,
|
||||
include_secrets: bool,
|
||||
exported_tools: list[dict[str, Any]],
|
||||
report_items: list[ResourceReportItem],
|
||||
) -> None:
|
||||
for provider_id in self._dedupe(provider_ids):
|
||||
try:
|
||||
tool_data = ToolManager.user_get_api_provider(
|
||||
provider=provider_id,
|
||||
tenant_id=tenant_id,
|
||||
mask=not include_secrets,
|
||||
)
|
||||
if not include_secrets:
|
||||
tool_data.pop("credentials", None)
|
||||
tool_data.pop("tools", None)
|
||||
tool_data["provider_name"] = provider_id
|
||||
tool_data["source_tenant_id"] = tenant_id
|
||||
exported_tools.append(tool_data)
|
||||
report_items.append(ResourceReportItem(ResourceType.API_TOOL, provider_id, provider_id, "exported"))
|
||||
except Exception as exc:
|
||||
report_items.append(
|
||||
ResourceReportItem(ResourceType.API_TOOL, provider_id, provider_id, "unresolved", str(exc))
|
||||
)
|
||||
|
||||
def _export_workflow_tools(
|
||||
self,
|
||||
tenant: Tenant,
|
||||
provider_ids: Iterable[str],
|
||||
*,
|
||||
exported_app_ids: set[str],
|
||||
exported_workflow_tools: list[dict[str, Any]],
|
||||
dependencies: list[dict[str, Any]],
|
||||
report_items: list[ResourceReportItem],
|
||||
) -> None:
|
||||
provider_ids = self._dedupe(provider_ids)
|
||||
if not provider_ids:
|
||||
return
|
||||
owner = self._get_tenant_owner(tenant.id)
|
||||
if owner is None:
|
||||
for provider_id in provider_ids:
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.WORKFLOW_TOOL,
|
||||
provider_id,
|
||||
provider_id,
|
||||
"unresolved",
|
||||
f"No owner account found for source tenant: {tenant.name}",
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
for provider_id in provider_ids:
|
||||
try:
|
||||
tool_data = WorkflowToolManageService.get_workflow_tool_by_tool_id(
|
||||
user_id=owner.id,
|
||||
tenant_id=tenant.id,
|
||||
workflow_tool_id=provider_id,
|
||||
)
|
||||
tool_info = jsonable_encoder(tool_data)
|
||||
tool_info["id"] = provider_id
|
||||
tool_info["app_id"] = tool_info.get("workflow_app_id")
|
||||
tool_info["source_tenant_id"] = tenant.id
|
||||
for field_name in ("workflow_tool_id", "workflow_app_id", "tool"):
|
||||
tool_info.pop(field_name, None)
|
||||
exported_workflow_tools.append(tool_info)
|
||||
if tool_info.get("app_id") not in exported_app_ids:
|
||||
workflow_app_id = str(tool_info.get("app_id") or "")
|
||||
workflow_app = db.session.get(App, workflow_app_id) if workflow_app_id else None
|
||||
self._record_dependency_metadata(
|
||||
[
|
||||
DiscoveredDependency(
|
||||
DependencyKind.WORKFLOW_TOOL,
|
||||
workflow_app_id,
|
||||
provider_name=workflow_app.name if workflow_app else tool_info.get("name"),
|
||||
source="workflow_tool_app",
|
||||
)
|
||||
],
|
||||
dependencies,
|
||||
report_items,
|
||||
)
|
||||
report_items.append(
|
||||
ResourceReportItem(ResourceType.WORKFLOW_TOOL, provider_id, tool_info.get("name"), "exported")
|
||||
)
|
||||
except Exception as exc:
|
||||
report_items.append(
|
||||
ResourceReportItem(ResourceType.WORKFLOW_TOOL, provider_id, provider_id, "unresolved", str(exc))
|
||||
)
|
||||
|
||||
def _get_tenant_owner(self, tenant_id: str) -> Account | None:
|
||||
return db.session.scalar(
|
||||
sa.select(Account)
|
||||
.join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id)
|
||||
.where(TenantAccountJoin.tenant_id == tenant_id, TenantAccountJoin.role == "owner")
|
||||
.order_by(TenantAccountJoin.created_at.asc())
|
||||
.limit(1)
|
||||
)
|
||||
|
||||
def _export_mcp_tools(
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
provider_ids: Iterable[str],
|
||||
include_secrets: bool,
|
||||
exported_mcp_tools: list[dict[str, Any]],
|
||||
dependencies: list[dict[str, Any]],
|
||||
report_items: list[ResourceReportItem],
|
||||
) -> None:
|
||||
for provider_id in self._dedupe(provider_ids):
|
||||
if not include_secrets:
|
||||
self._record_dependency_metadata(
|
||||
[DiscoveredDependency(DependencyKind.MCP_TOOL, provider_id, source="mcp_provider")],
|
||||
dependencies,
|
||||
report_items,
|
||||
)
|
||||
continue
|
||||
try:
|
||||
provider = self._get_mcp_provider(tenant_id, provider_id)
|
||||
exported_mcp_tools.append(self._serialize_mcp_provider(provider))
|
||||
report_items.append(ResourceReportItem(ResourceType.MCP_TOOL, provider_id, provider.name, "exported"))
|
||||
except Exception as exc:
|
||||
report_items.append(
|
||||
ResourceReportItem(ResourceType.MCP_TOOL, provider_id, provider_id, "unresolved", str(exc))
|
||||
)
|
||||
|
||||
def _get_mcp_provider(self, tenant_id: str, provider_id: str) -> MCPToolProvider:
|
||||
predicates = [MCPToolProvider.server_identifier == provider_id]
|
||||
if self._is_uuid_string(provider_id):
|
||||
predicates.append(MCPToolProvider.id == provider_id)
|
||||
provider = db.session.scalar(
|
||||
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id, sa.or_(*predicates))
|
||||
)
|
||||
if provider is None:
|
||||
raise MigrationDataError(f"MCP provider not found: {provider_id}")
|
||||
return provider
|
||||
|
||||
def _is_uuid_string(self, value: str) -> bool:
|
||||
try:
|
||||
UUID(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _serialize_mcp_provider(self, provider: MCPToolProvider) -> dict[str, Any]:
|
||||
provider_entity = provider.to_entity()
|
||||
provider_icon = provider_entity.provider_icon
|
||||
if isinstance(provider_icon, dict):
|
||||
icon = provider_icon.get("content")
|
||||
icon_background = provider_icon.get("background")
|
||||
icon_type = "emoji"
|
||||
else:
|
||||
icon = provider_icon
|
||||
icon_background = None
|
||||
icon_type = "url"
|
||||
return {
|
||||
"id": provider.id,
|
||||
"name": provider.name,
|
||||
"server_url": provider_entity.decrypt_server_url(),
|
||||
"server_identifier": provider.server_identifier,
|
||||
"icon": icon,
|
||||
"icon_background": icon_background,
|
||||
"icon_type": icon_type,
|
||||
"configuration": {"timeout": provider.timeout, "sse_read_timeout": provider.sse_read_timeout},
|
||||
"headers": provider_entity.decrypt_headers(),
|
||||
"authentication": self._serialize_mcp_authentication(provider_entity.decrypt_authentication()),
|
||||
"tools": provider.tool_dict,
|
||||
"source_tenant_id": provider.tenant_id,
|
||||
}
|
||||
|
||||
def _serialize_mcp_authentication(self, authentication: dict[str, Any] | None) -> dict[str, Any] | None:
|
||||
if not authentication or not authentication.get("client_id"):
|
||||
return None
|
||||
return {
|
||||
"client_id": authentication["client_id"],
|
||||
"client_secret": authentication.get("client_secret"),
|
||||
}
|
||||
|
||||
def _record_dependency_metadata(
|
||||
self,
|
||||
dependencies_to_record: Iterable[DiscoveredDependency],
|
||||
dependencies: list[dict[str, Any]],
|
||||
report_items: list[ResourceReportItem],
|
||||
) -> None:
|
||||
existing = {(item.get("kind"), item.get("provider_id")) for item in dependencies}
|
||||
for dependency in dependencies_to_record:
|
||||
key = (dependency.kind.value, dependency.provider_id)
|
||||
if key in existing:
|
||||
continue
|
||||
existing.add(key)
|
||||
dependencies.append(
|
||||
{
|
||||
"kind": dependency.kind.value,
|
||||
"provider_id": dependency.provider_id,
|
||||
"provider_name": dependency.provider_name,
|
||||
"source": dependency.source,
|
||||
}
|
||||
)
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
dependency.provider_id,
|
||||
self._dependency_report_name(dependency),
|
||||
"dependency-only",
|
||||
self._dependency_message(dependency.kind),
|
||||
)
|
||||
)
|
||||
|
||||
def _provider_ids(
|
||||
self,
|
||||
manual_provider_ids: Iterable[str],
|
||||
discovered_dependencies: Iterable[DiscoveredDependency],
|
||||
kind: DependencyKind,
|
||||
) -> list[str]:
|
||||
provider_ids = list(manual_provider_ids)
|
||||
provider_ids.extend(
|
||||
self._provider_export_identifier(dependency)
|
||||
for dependency in discovered_dependencies
|
||||
if dependency.kind == kind
|
||||
)
|
||||
return self._dedupe(provider_ids)
|
||||
|
||||
def _provider_export_identifier(self, dependency: DiscoveredDependency) -> str:
|
||||
if dependency.kind == DependencyKind.API_TOOL and dependency.provider_name:
|
||||
return dependency.provider_name
|
||||
return dependency.provider_id
|
||||
|
||||
def _dependencies_by_kind(
|
||||
self, discovered_dependencies: Iterable[DiscoveredDependency], kind: DependencyKind
|
||||
) -> list[DiscoveredDependency]:
|
||||
return [dependency for dependency in discovered_dependencies if dependency.kind == kind]
|
||||
|
||||
def _dedupe(self, values: Iterable[str]) -> list[str]:
|
||||
seen: set[str] = set()
|
||||
result: list[str] = []
|
||||
for value in values:
|
||||
if value and value not in seen:
|
||||
seen.add(value)
|
||||
result.append(value)
|
||||
return result
|
||||
|
||||
def _dependency_message(self, kind: DependencyKind) -> str:
|
||||
if kind == DependencyKind.MCP_TOOL:
|
||||
return "Configure MCP provider manually in the target tenant unless exporting with secrets enabled."
|
||||
if kind == DependencyKind.BUILTIN_OR_PLUGIN_TOOL:
|
||||
return "Ensure the built-in or plugin tool exists in the target environment."
|
||||
return "Dependency metadata only; ensure the resource exists in the target environment."
|
||||
|
||||
def _dependency_report_name(self, dependency: DiscoveredDependency) -> str:
|
||||
name = dependency.provider_name or dependency.provider_id
|
||||
if dependency.kind == DependencyKind.WORKFLOW_TOOL:
|
||||
return f"workflow {name}"
|
||||
return f"{dependency.kind.value} {name}"
|
||||
938
api/services/data_migration/import_service.py
Normal file
938
api/services/data_migration/import_service.py
Normal file
@ -0,0 +1,938 @@
|
||||
"""Apply versioned migration packages to an explicitly resolved target tenant.
|
||||
|
||||
Import target resolution is deliberately performed before any resource import
|
||||
work. The service does not write Click output; callers receive structured
|
||||
report items and can decide how to render them.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, cast
|
||||
from uuid import UUID
|
||||
|
||||
import sqlalchemy as sa
|
||||
import yaml
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from core.entities.mcp_provider import MCPAuthentication, MCPConfiguration
|
||||
from core.tools.entities.tool_entities import ApiProviderSchemaType, WorkflowToolParameterConfiguration
|
||||
from extensions.ext_database import db
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models import Account, ApiToken, Tenant, TenantAccountJoin, TenantAccountRole
|
||||
from models.enums import ApiTokenType
|
||||
from models.model import App
|
||||
from models.tools import ApiToolProvider, MCPToolProvider, WorkflowToolProvider
|
||||
from services.app_dsl_service import AppDslService
|
||||
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService
|
||||
from services.data_migration.entities import (
|
||||
ConflictStrategy,
|
||||
DependencyKind,
|
||||
IdStrategy,
|
||||
ImportOptions,
|
||||
ImportResult,
|
||||
ImportTarget,
|
||||
MigrationDataError,
|
||||
MigrationPackage,
|
||||
ReportContext,
|
||||
ResourceIdMapping,
|
||||
ResourceReportItem,
|
||||
ResourceType,
|
||||
)
|
||||
from services.entities.dsl_entities import ImportStatus
|
||||
from services.tools.api_tools_manage_service import ApiToolManageService
|
||||
from services.tools.mcp_tools_manage_service import MCPToolManageService
|
||||
from services.tools.workflow_tools_manage_service import WorkflowToolManageService
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ImportRequest:
|
||||
"""Structured input for package import.
|
||||
|
||||
`cli_target_tenant` and `config_target_tenant` are target tenant names from
|
||||
outer adapters. They intentionally override package metadata, because a
|
||||
migration package may be reused across environments.
|
||||
"""
|
||||
|
||||
package: MigrationPackage
|
||||
cli_target_tenant: str | None = None
|
||||
config_target_tenant: str | None = None
|
||||
operator_email: str | None = None
|
||||
options_override: ImportOptions | None = None
|
||||
|
||||
|
||||
class ImportTargetResolver:
|
||||
"""Resolve the target tenant and operator before import side effects begin."""
|
||||
|
||||
def select_target_tenant_name(self, request: ImportRequest) -> str:
|
||||
if request.cli_target_tenant:
|
||||
return request.cli_target_tenant
|
||||
if request.config_target_tenant:
|
||||
return request.config_target_tenant
|
||||
package_target = request.package.metadata.target_tenant or {}
|
||||
if package_target.get("name"):
|
||||
return package_target["name"]
|
||||
if package_target.get("id"):
|
||||
return package_target["id"]
|
||||
raise MigrationDataError(
|
||||
"Target tenant must be provided by --target-tenant, import config, or package metadata."
|
||||
)
|
||||
|
||||
def resolve(self, request: ImportRequest) -> ImportTarget:
|
||||
target_tenant_name = self.select_target_tenant_name(request)
|
||||
package_target = request.package.metadata.target_tenant or {}
|
||||
if request.cli_target_tenant or request.config_target_tenant:
|
||||
tenant = self._resolve_tenant_by_id_or_name(target_tenant_name)
|
||||
elif package_target.get("id") and self._is_uuid(package_target["id"]):
|
||||
tenant = db.session.get(Tenant, package_target["id"])
|
||||
if tenant is not None and package_target.get("name") and tenant.name != package_target.get("name"):
|
||||
raise MigrationDataError(
|
||||
f"Target tenant id/name mismatch: {package_target['id']} / {package_target['name']}"
|
||||
)
|
||||
else:
|
||||
tenant = self._resolve_tenant_by_id_or_name(target_tenant_name)
|
||||
if tenant is None:
|
||||
raise MigrationDataError(f"Target tenant not found: {target_tenant_name}")
|
||||
|
||||
account_query = (
|
||||
db.session.query(Account)
|
||||
.join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id)
|
||||
.filter(TenantAccountJoin.tenant_id == tenant.id)
|
||||
)
|
||||
if request.operator_email:
|
||||
account_query = account_query.filter(Account.email == request.operator_email)
|
||||
identity = request.operator_email
|
||||
else:
|
||||
account_query = account_query.filter(TenantAccountJoin.role == TenantAccountRole.OWNER).order_by(
|
||||
TenantAccountJoin.created_at.asc()
|
||||
)
|
||||
identity = "earliest owner"
|
||||
|
||||
account = account_query.first()
|
||||
if account is None:
|
||||
raise MigrationDataError(f"No operator account found for target tenant {target_tenant_name}: {identity}")
|
||||
|
||||
return ImportTarget(
|
||||
tenant_id=tenant.id,
|
||||
tenant_name=tenant.name,
|
||||
operator_id=account.id,
|
||||
operator_email=account.email,
|
||||
)
|
||||
|
||||
def _resolve_tenant_by_id_or_name(self, value: str) -> Tenant | None:
|
||||
if self._is_uuid(value):
|
||||
tenant = db.session.get(Tenant, value)
|
||||
if tenant is not None:
|
||||
return tenant
|
||||
tenants = list(db.session.scalars(sa.select(Tenant).where(Tenant.name == value)).all())
|
||||
if len(tenants) > 1:
|
||||
raise MigrationDataError(f"Target tenant name is ambiguous; use target_tenant.id: {value}")
|
||||
return tenants[0] if tenants else None
|
||||
|
||||
def _is_uuid(self, value: str) -> bool:
|
||||
try:
|
||||
UUID(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class MigrationImportService:
|
||||
"""Apply package resources using Dify service APIs and structured reporting."""
|
||||
|
||||
target_resolver: ImportTargetResolver
|
||||
|
||||
def __init__(self, *, target_resolver: ImportTargetResolver | None = None) -> None:
|
||||
self.target_resolver = target_resolver or ImportTargetResolver()
|
||||
|
||||
def import_package(self, request: ImportRequest) -> ImportResult:
|
||||
target = self.target_resolver.resolve(request)
|
||||
options = request.options_override or request.package.metadata.import_options
|
||||
report_items = [
|
||||
ResourceReportItem(
|
||||
resource_type=ResourceType.DEPENDENCY,
|
||||
identifier=target.tenant_id,
|
||||
name=target.tenant_name,
|
||||
status="resolved",
|
||||
message=f"operator: {target.operator_email or target.operator_id}",
|
||||
)
|
||||
]
|
||||
id_mapping: dict[str, str] = {}
|
||||
id_mapping_details: list[ResourceIdMapping] = []
|
||||
|
||||
self._import_api_tools(
|
||||
request.package,
|
||||
target,
|
||||
options,
|
||||
report_items,
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
self._source_api_provider_ids_by_name(request.package),
|
||||
)
|
||||
self._import_mcp_tools(request.package, target, options, report_items, id_mapping, id_mapping_details)
|
||||
self._preflight_dependency_only_mcp(request.package, target, report_items)
|
||||
workflow_tool_app_ids = self._workflow_tool_source_app_ids(request.package)
|
||||
imported_workflow_ids: set[str] = set()
|
||||
if workflow_tool_app_ids:
|
||||
self._import_workflows(
|
||||
request.package,
|
||||
target,
|
||||
options,
|
||||
report_items,
|
||||
id_mapping,
|
||||
id_mapping_details=id_mapping_details,
|
||||
imported_workflow_ids=imported_workflow_ids,
|
||||
only_app_ids=workflow_tool_app_ids,
|
||||
)
|
||||
self._import_workflow_tools(request.package, target, options, id_mapping, id_mapping_details, report_items)
|
||||
self._import_workflows(
|
||||
request.package,
|
||||
target,
|
||||
options,
|
||||
report_items,
|
||||
id_mapping,
|
||||
id_mapping_details=id_mapping_details,
|
||||
imported_workflow_ids=imported_workflow_ids,
|
||||
skip_app_ids=imported_workflow_ids,
|
||||
)
|
||||
return ImportResult(
|
||||
report_items=report_items,
|
||||
id_mapping=id_mapping,
|
||||
report_context=ReportContext(
|
||||
target_tenant=target.tenant_name,
|
||||
operator_email=target.operator_email,
|
||||
id_mapping_count=len(id_mapping),
|
||||
id_mappings=dict(id_mapping),
|
||||
id_mapping_details=id_mapping_details,
|
||||
),
|
||||
)
|
||||
|
||||
def _import_workflows(
|
||||
self,
|
||||
package: MigrationPackage,
|
||||
target: ImportTarget,
|
||||
options: ImportOptions,
|
||||
report_items: list[ResourceReportItem],
|
||||
id_mapping: dict[str, str],
|
||||
id_mapping_details: list[ResourceIdMapping],
|
||||
imported_workflow_ids: set[str] | None = None,
|
||||
only_app_ids: set[str] | None = None,
|
||||
skip_app_ids: set[str] | None = None,
|
||||
) -> None:
|
||||
account = db.session.get(Account, target.operator_id)
|
||||
tenant = db.session.get(Tenant, target.tenant_id)
|
||||
if account is None:
|
||||
raise MigrationDataError(f"Operator account not found: {target.operator_id}")
|
||||
if tenant is None:
|
||||
raise MigrationDataError(f"Target tenant not found: {target.tenant_id}")
|
||||
account.current_tenant = tenant
|
||||
|
||||
for workflow_data in package.workflows:
|
||||
app_id = self._optional_string(workflow_data.get("id"))
|
||||
if only_app_ids and app_id not in only_app_ids:
|
||||
continue
|
||||
if skip_app_ids and app_id in skip_app_ids:
|
||||
continue
|
||||
dsl_content = self._rewrite_workflow_dsl_provider_ids(
|
||||
self._required_string(workflow_data, "dsl", "workflow"),
|
||||
id_mapping,
|
||||
)
|
||||
existing_app = (
|
||||
self._find_existing_app(app_id, target.tenant_id)
|
||||
if options.id_strategy == IdStrategy.PRESERVE_ID
|
||||
else None
|
||||
)
|
||||
if existing_app is not None and options.conflict_strategy == ConflictStrategy.FAIL:
|
||||
raise MigrationDataError(f"App already exists and conflict_strategy=fail: {app_id}")
|
||||
if existing_app is not None and options.conflict_strategy == ConflictStrategy.SKIP:
|
||||
if app_id:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.WORKFLOW,
|
||||
workflow_data.get("name") if isinstance(workflow_data.get("name"), str) else None,
|
||||
{app_id},
|
||||
existing_app.id,
|
||||
)
|
||||
report_items.append(
|
||||
ResourceReportItem(ResourceType.WORKFLOW, str(app_id), workflow_data.get("name"), "skipped")
|
||||
)
|
||||
continue
|
||||
|
||||
imported_app_id = self._import_workflow_app(
|
||||
account=account,
|
||||
workflow_data=workflow_data,
|
||||
dsl_content=dsl_content,
|
||||
app_id=app_id,
|
||||
existing_app=existing_app,
|
||||
options=options,
|
||||
)
|
||||
if app_id:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.WORKFLOW,
|
||||
workflow_data.get("name") if isinstance(workflow_data.get("name"), str) else None,
|
||||
{app_id},
|
||||
imported_app_id,
|
||||
)
|
||||
if imported_workflow_ids is not None:
|
||||
imported_workflow_ids.add(app_id)
|
||||
if options.create_app_api_token_on_import:
|
||||
self._create_or_reuse_app_api_token(imported_app_id, target.tenant_id)
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.WORKFLOW,
|
||||
imported_app_id,
|
||||
workflow_data.get("name"),
|
||||
"updated" if existing_app is not None else "created",
|
||||
)
|
||||
)
|
||||
|
||||
def _workflow_tool_source_app_ids(self, package: MigrationPackage) -> set[str]:
|
||||
app_ids: set[str] = set()
|
||||
for workflow_tool_data in package.workflow_tools:
|
||||
app_id = self._optional_string(workflow_tool_data.get("app_id"))
|
||||
if app_id:
|
||||
app_ids.add(app_id)
|
||||
return app_ids
|
||||
|
||||
def _import_workflow_app(
|
||||
self,
|
||||
*,
|
||||
account: Account,
|
||||
workflow_data: dict[str, object],
|
||||
dsl_content: str,
|
||||
app_id: str | None,
|
||||
existing_app: App | None,
|
||||
options: ImportOptions,
|
||||
) -> str:
|
||||
import_service = AppDslService(cast(Session, db.session))
|
||||
if existing_app is not None:
|
||||
import_result = import_service.import_app(
|
||||
account=account,
|
||||
import_mode="yaml-content",
|
||||
yaml_content=dsl_content,
|
||||
app_id=existing_app.id,
|
||||
)
|
||||
else:
|
||||
import_app_id = app_id if self._should_preserve_source_app_id(options) else None
|
||||
import_result = import_service.import_app(
|
||||
account=account,
|
||||
import_mode="yaml-content",
|
||||
yaml_content=dsl_content,
|
||||
import_app_id=import_app_id,
|
||||
)
|
||||
if import_result.status not in {ImportStatus.COMPLETED, ImportStatus.COMPLETED_WITH_WARNINGS}:
|
||||
error = import_result.error or f"unexpected import status {import_result.status}"
|
||||
raise MigrationDataError(f"Workflow import failed: {error}")
|
||||
if import_result.app_id is None:
|
||||
raise MigrationDataError(f"Workflow import did not return an app id: {workflow_data.get('name')}")
|
||||
db.session.commit()
|
||||
return import_result.app_id
|
||||
|
||||
def _rewrite_workflow_dsl_provider_ids(self, dsl_content: str, id_mapping: dict[str, str]) -> str:
|
||||
if not id_mapping:
|
||||
return dsl_content
|
||||
parsed = yaml.safe_load(dsl_content) if dsl_content else {}
|
||||
if not isinstance(parsed, dict):
|
||||
return dsl_content
|
||||
for node in self._workflow_nodes(parsed):
|
||||
data = node.get("data") if isinstance(node, dict) else None
|
||||
if not isinstance(data, dict):
|
||||
continue
|
||||
self._rewrite_tool_config_provider_id(data, id_mapping)
|
||||
for tool_config in self._agent_tool_configs(data):
|
||||
self._rewrite_tool_config_provider_id(tool_config, id_mapping)
|
||||
return yaml.safe_dump(parsed, sort_keys=False, allow_unicode=True)
|
||||
|
||||
def _rewrite_tool_config_provider_id(self, tool_config: dict[str, Any], id_mapping: dict[str, str]) -> None:
|
||||
provider_id = self._optional_string(tool_config.get("provider_id"))
|
||||
if provider_id and provider_id in id_mapping:
|
||||
tool_config["provider_id"] = id_mapping[provider_id]
|
||||
|
||||
def _source_api_provider_ids_by_name(self, package: MigrationPackage) -> dict[str, set[str]]:
|
||||
provider_ids_by_name: dict[str, set[str]] = {}
|
||||
discovery_service = DependencyDiscoveryService()
|
||||
for workflow_data in package.workflows:
|
||||
dsl_content = self._optional_string(workflow_data.get("dsl"))
|
||||
if not dsl_content:
|
||||
continue
|
||||
parsed = yaml.safe_load(dsl_content) if dsl_content else {}
|
||||
if not isinstance(parsed, dict):
|
||||
continue
|
||||
for dependency in discovery_service.discover_from_dsl(parsed):
|
||||
if dependency.kind != DependencyKind.API_TOOL or not dependency.provider_name:
|
||||
continue
|
||||
provider_ids_by_name.setdefault(dependency.provider_name, set()).add(dependency.provider_id)
|
||||
return provider_ids_by_name
|
||||
|
||||
def _workflow_nodes(self, dsl: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
nodes: list[dict[str, Any]] = []
|
||||
graph = dsl.get("graph")
|
||||
if isinstance(graph, dict) and isinstance(graph.get("nodes"), list):
|
||||
nodes.extend(node for node in graph["nodes"] if isinstance(node, dict))
|
||||
workflow = dsl.get("workflow")
|
||||
workflow_graph = workflow.get("graph") if isinstance(workflow, dict) else None
|
||||
if isinstance(workflow_graph, dict) and isinstance(workflow_graph.get("nodes"), list):
|
||||
nodes.extend(node for node in workflow_graph["nodes"] if isinstance(node, dict))
|
||||
return nodes
|
||||
|
||||
def _agent_tool_configs(self, data: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
configs = data.get("tools")
|
||||
if isinstance(configs, list):
|
||||
return [config for config in configs if isinstance(config, dict)]
|
||||
agent_parameters = data.get("agent_parameters")
|
||||
if not isinstance(agent_parameters, dict):
|
||||
return []
|
||||
tools_parameter = agent_parameters.get("tools")
|
||||
if not isinstance(tools_parameter, dict):
|
||||
return []
|
||||
value = tools_parameter.get("value", [])
|
||||
if not isinstance(value, list):
|
||||
return []
|
||||
return [config for config in value if isinstance(config, dict)]
|
||||
|
||||
def _should_preserve_source_app_id(self, options: ImportOptions) -> bool:
|
||||
return options.id_strategy == IdStrategy.PRESERVE_ID
|
||||
|
||||
def _find_existing_app(self, app_id: str | None, tenant_id: str) -> App | None:
|
||||
if not self._is_uuid_string(app_id):
|
||||
return None
|
||||
return db.session.scalar(sa.select(App).where(App.id == app_id, App.tenant_id == tenant_id))
|
||||
|
||||
def _create_or_reuse_app_api_token(self, app_id: str, tenant_id: str) -> None:
|
||||
existing = db.session.scalar(
|
||||
sa.select(ApiToken).where(
|
||||
ApiToken.type == ApiTokenType.APP,
|
||||
ApiToken.app_id == app_id,
|
||||
ApiToken.tenant_id == tenant_id,
|
||||
)
|
||||
)
|
||||
if existing is not None:
|
||||
return
|
||||
api_token = ApiToken()
|
||||
api_token.app_id = app_id
|
||||
api_token.tenant_id = tenant_id
|
||||
api_token.token = ApiToken.generate_api_key("app", 24)
|
||||
api_token.type = ApiTokenType.APP
|
||||
db.session.add(api_token)
|
||||
db.session.commit()
|
||||
|
||||
def _import_api_tools(
|
||||
self,
|
||||
package: MigrationPackage,
|
||||
target: ImportTarget,
|
||||
options: ImportOptions,
|
||||
report_items: list[ResourceReportItem],
|
||||
id_mapping: dict[str, str],
|
||||
id_mapping_details: list[ResourceIdMapping],
|
||||
source_provider_ids_by_name: dict[str, set[str]],
|
||||
) -> None:
|
||||
for tool_data in package.tools:
|
||||
provider_name = self._required_string(tool_data, "provider_name", "api_tool")
|
||||
schema = self._required_string(tool_data, "schema", "api_tool")
|
||||
existing = db.session.scalar(
|
||||
sa.select(ApiToolProvider).where(
|
||||
ApiToolProvider.tenant_id == target.tenant_id,
|
||||
ApiToolProvider.name == provider_name,
|
||||
)
|
||||
)
|
||||
if existing is not None and options.conflict_strategy == ConflictStrategy.FAIL:
|
||||
raise MigrationDataError(f"API tool already exists and conflict_strategy=fail: {provider_name}")
|
||||
if existing is not None and options.conflict_strategy == ConflictStrategy.SKIP:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.API_TOOL,
|
||||
provider_name,
|
||||
self._api_tool_source_ids(provider_name, tool_data, source_provider_ids_by_name),
|
||||
existing.id,
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.API_TOOL, provider_name, provider_name, "skipped"))
|
||||
continue
|
||||
|
||||
schema_info = ApiToolManageService.parser_api_schema(schema=schema)
|
||||
schema_type = cast(ApiProviderSchemaType, schema_info["schema_type"])
|
||||
credentials = (
|
||||
cast(dict[str, Any], tool_data.get("credentials"))
|
||||
if isinstance(tool_data.get("credentials"), dict)
|
||||
else {}
|
||||
)
|
||||
credentials = credentials or {"auth_type": "none"}
|
||||
raw_icon = tool_data.get("icon")
|
||||
icon = (
|
||||
cast(dict[str, Any], raw_icon)
|
||||
if isinstance(raw_icon, dict)
|
||||
else {"content": "tool", "background": "#FEF7C3"}
|
||||
)
|
||||
raw_labels = tool_data.get("labels")
|
||||
labels = [str(label) for label in raw_labels] if isinstance(raw_labels, list) else []
|
||||
if existing is not None:
|
||||
ApiToolManageService.update_api_tool_provider(
|
||||
user_id=target.operator_id,
|
||||
tenant_id=target.tenant_id,
|
||||
provider_name=provider_name,
|
||||
original_provider=existing.name,
|
||||
_schema_type=schema_type,
|
||||
schema=schema,
|
||||
privacy_policy=self._optional_string(tool_data.get("privacy_policy")) or "",
|
||||
credentials=credentials,
|
||||
custom_disclaimer=self._optional_string(tool_data.get("custom_disclaimer")) or "",
|
||||
labels=labels,
|
||||
icon=icon,
|
||||
)
|
||||
status = "updated"
|
||||
else:
|
||||
ApiToolManageService.create_api_tool_provider(
|
||||
user_id=target.operator_id,
|
||||
tenant_id=target.tenant_id,
|
||||
provider_name=provider_name,
|
||||
schema_type=schema_type,
|
||||
schema=schema,
|
||||
privacy_policy=self._optional_string(tool_data.get("privacy_policy")) or "",
|
||||
credentials=credentials,
|
||||
custom_disclaimer=self._optional_string(tool_data.get("custom_disclaimer")) or "",
|
||||
labels=labels,
|
||||
icon=icon,
|
||||
)
|
||||
status = "created"
|
||||
target_provider = self._find_api_tool_provider(target.tenant_id, provider_name)
|
||||
if target_provider is not None:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.API_TOOL,
|
||||
provider_name,
|
||||
self._api_tool_source_ids(provider_name, tool_data, source_provider_ids_by_name),
|
||||
target_provider.id,
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.API_TOOL, provider_name, provider_name, status))
|
||||
|
||||
def _find_api_tool_provider(self, tenant_id: str, provider_name: str) -> ApiToolProvider | None:
|
||||
return db.session.scalar(
|
||||
sa.select(ApiToolProvider).where(
|
||||
ApiToolProvider.tenant_id == tenant_id,
|
||||
ApiToolProvider.name == provider_name,
|
||||
)
|
||||
)
|
||||
|
||||
def _api_tool_source_ids(
|
||||
self,
|
||||
provider_name: str,
|
||||
tool_data: dict[str, Any],
|
||||
source_provider_ids_by_name: dict[str, set[str]],
|
||||
) -> set[str]:
|
||||
source_ids = set(source_provider_ids_by_name.get(provider_name, set()))
|
||||
source_id = self._optional_string(tool_data.get("id"))
|
||||
if source_id:
|
||||
source_ids.add(source_id)
|
||||
return source_ids
|
||||
|
||||
def _record_id_mappings(
|
||||
self,
|
||||
id_mapping: dict[str, str],
|
||||
id_mapping_details: list[ResourceIdMapping],
|
||||
resource_type: ResourceType,
|
||||
name: str | None,
|
||||
source_ids: Iterable[str],
|
||||
target_id: str,
|
||||
) -> None:
|
||||
for source_id in source_ids:
|
||||
id_mapping[source_id] = target_id
|
||||
id_mapping_details[:] = [item for item in id_mapping_details if item.source_id != source_id]
|
||||
id_mapping_details.append(ResourceIdMapping(resource_type, name, source_id, target_id))
|
||||
|
||||
def _import_workflow_tools(
|
||||
self,
|
||||
package: MigrationPackage,
|
||||
target: ImportTarget,
|
||||
options: ImportOptions,
|
||||
id_mapping: dict[str, str],
|
||||
id_mapping_details: list[ResourceIdMapping],
|
||||
report_items: list[ResourceReportItem],
|
||||
) -> None:
|
||||
if not package.workflow_tools:
|
||||
return
|
||||
account = db.session.get(Account, target.operator_id)
|
||||
if account is None:
|
||||
raise MigrationDataError(f"Operator account not found: {target.operator_id}")
|
||||
for workflow_tool_data in package.workflow_tools:
|
||||
app_id = self._optional_string(workflow_tool_data.get("app_id"))
|
||||
resolved_app_id = id_mapping.get(app_id or "", app_id)
|
||||
if not resolved_app_id or self._find_existing_app(resolved_app_id, target.tenant_id) is None:
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.WORKFLOW_TOOL,
|
||||
str(workflow_tool_data.get("id", workflow_tool_data.get("name", ""))),
|
||||
self._optional_string(workflow_tool_data.get("name")),
|
||||
"unresolved",
|
||||
"Referenced workflow app was not found in the target tenant; workflow tool was skipped.",
|
||||
)
|
||||
)
|
||||
continue
|
||||
try:
|
||||
self._ensure_workflow_app_is_published(target, account, resolved_app_id)
|
||||
except Exception as exc:
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.WORKFLOW_TOOL,
|
||||
str(workflow_tool_data.get("id", workflow_tool_data.get("name", ""))),
|
||||
self._optional_string(workflow_tool_data.get("name")),
|
||||
"unresolved",
|
||||
f"Referenced workflow app could not be published: {exc}",
|
||||
)
|
||||
)
|
||||
continue
|
||||
workflow_tool_id = self._optional_string(workflow_tool_data.get("id"))
|
||||
tool_name = self._required_string(workflow_tool_data, "name", "workflow_tool")
|
||||
lookup_workflow_tool_id = workflow_tool_id if options.id_strategy == IdStrategy.PRESERVE_ID else None
|
||||
existing = self._find_existing_workflow_tool(
|
||||
target.tenant_id, lookup_workflow_tool_id, tool_name, resolved_app_id
|
||||
)
|
||||
if existing is not None and options.conflict_strategy == ConflictStrategy.FAIL:
|
||||
raise MigrationDataError(f"Workflow tool already exists and conflict_strategy=fail: {tool_name}")
|
||||
if existing is not None and options.conflict_strategy == ConflictStrategy.SKIP:
|
||||
if workflow_tool_id:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.WORKFLOW_TOOL,
|
||||
tool_name,
|
||||
{workflow_tool_id},
|
||||
existing.id,
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.WORKFLOW_TOOL, existing.id, tool_name, "skipped"))
|
||||
continue
|
||||
raw_icon = workflow_tool_data.get("icon")
|
||||
icon = (
|
||||
cast(dict[str, Any], raw_icon)
|
||||
if isinstance(raw_icon, dict)
|
||||
else {"content": "🤖", "background": "#FFEAD5"}
|
||||
)
|
||||
raw_parameters = workflow_tool_data.get("parameters")
|
||||
parameters = [
|
||||
parameter
|
||||
if isinstance(parameter, WorkflowToolParameterConfiguration)
|
||||
else WorkflowToolParameterConfiguration(**parameter)
|
||||
for parameter in (raw_parameters if isinstance(raw_parameters, list) else [])
|
||||
if isinstance(parameter, dict | WorkflowToolParameterConfiguration)
|
||||
]
|
||||
raw_labels = workflow_tool_data.get("labels")
|
||||
labels = [str(label) for label in raw_labels] if isinstance(raw_labels, list) else []
|
||||
label = self._optional_string(workflow_tool_data.get("label")) or tool_name
|
||||
description = self._optional_string(workflow_tool_data.get("description")) or ""
|
||||
privacy_policy = self._optional_string(workflow_tool_data.get("privacy_policy")) or ""
|
||||
if existing is not None:
|
||||
WorkflowToolManageService.update_workflow_tool(
|
||||
user_id=account.id,
|
||||
tenant_id=target.tenant_id,
|
||||
workflow_tool_id=existing.id,
|
||||
name=tool_name,
|
||||
label=label,
|
||||
icon=icon,
|
||||
description=description,
|
||||
parameters=parameters,
|
||||
privacy_policy=privacy_policy,
|
||||
labels=labels,
|
||||
)
|
||||
status = "updated"
|
||||
identifier = existing.id
|
||||
else:
|
||||
import_id = workflow_tool_id if options.id_strategy == IdStrategy.PRESERVE_ID else ""
|
||||
WorkflowToolManageService.create_workflow_tool(
|
||||
user_id=account.id,
|
||||
tenant_id=target.tenant_id,
|
||||
workflow_app_id=resolved_app_id,
|
||||
name=tool_name,
|
||||
label=label,
|
||||
icon=icon,
|
||||
description=description,
|
||||
parameters=parameters,
|
||||
privacy_policy=privacy_policy,
|
||||
labels=labels,
|
||||
import_id=import_id or "",
|
||||
)
|
||||
status = "created"
|
||||
target_provider = self._find_existing_workflow_tool(
|
||||
target.tenant_id, import_id or None, tool_name, resolved_app_id
|
||||
)
|
||||
if target_provider is None:
|
||||
raise MigrationDataError(f"Workflow tool was not created: {tool_name}")
|
||||
identifier = target_provider.id
|
||||
if workflow_tool_id:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.WORKFLOW_TOOL,
|
||||
tool_name,
|
||||
{workflow_tool_id},
|
||||
identifier,
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.WORKFLOW_TOOL, identifier, tool_name, status))
|
||||
|
||||
def _ensure_workflow_app_is_published(self, target: ImportTarget, account: Account, app_id: str) -> None:
|
||||
app = self._find_existing_app(app_id, target.tenant_id)
|
||||
if app is None:
|
||||
raise MigrationDataError(f"Referenced workflow app was not found in target tenant: {app_id}")
|
||||
if app.workflow_id:
|
||||
return
|
||||
workflow_service = WorkflowService()
|
||||
with sessionmaker(db.engine).begin() as session:
|
||||
app_in_session = session.get(App, app_id)
|
||||
account_in_session = session.get(Account, account.id)
|
||||
if app_in_session is None:
|
||||
raise MigrationDataError(f"Referenced workflow app was not found in target tenant: {app_id}")
|
||||
if account_in_session is None:
|
||||
raise MigrationDataError(f"Operator account not found: {account.id}")
|
||||
workflow = workflow_service.publish_workflow(
|
||||
session=session,
|
||||
app_model=app_in_session,
|
||||
account=account_in_session,
|
||||
marked_name="Migration import",
|
||||
marked_comment="Published automatically for workflow tool import.",
|
||||
)
|
||||
app_in_session.workflow_id = workflow.id
|
||||
app_in_session.updated_by = account.id
|
||||
app_in_session.updated_at = naive_utc_now()
|
||||
|
||||
def _import_mcp_tools(
|
||||
self,
|
||||
package: MigrationPackage,
|
||||
target: ImportTarget,
|
||||
options: ImportOptions,
|
||||
report_items: list[ResourceReportItem],
|
||||
id_mapping: dict[str, str],
|
||||
id_mapping_details: list[ResourceIdMapping],
|
||||
) -> None:
|
||||
for mcp_data in package.mcp_tools:
|
||||
name = self._required_string(mcp_data, "name", "mcp_tool")
|
||||
server_identifier = self._required_string(mcp_data, "server_identifier", "mcp_tool")
|
||||
provider_id = self._optional_string(mcp_data.get("id"))
|
||||
lookup_provider_id = provider_id if options.id_strategy == IdStrategy.PRESERVE_ID else None
|
||||
existing = self._find_existing_mcp_tool(target.tenant_id, lookup_provider_id, server_identifier)
|
||||
if existing is not None and options.conflict_strategy == ConflictStrategy.FAIL:
|
||||
raise MigrationDataError(f"MCP tool already exists and conflict_strategy=fail: {name}")
|
||||
if existing is not None and options.conflict_strategy == ConflictStrategy.SKIP:
|
||||
if provider_id:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.MCP_TOOL,
|
||||
name,
|
||||
{provider_id},
|
||||
existing.id,
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.MCP_TOOL, existing.id, name, "skipped"))
|
||||
continue
|
||||
|
||||
service = MCPToolManageService(session=cast(Session, db.session))
|
||||
configuration = MCPConfiguration.model_validate(mcp_data.get("configuration") or {})
|
||||
authentication = (
|
||||
MCPAuthentication.model_validate(mcp_data["authentication"]) if mcp_data.get("authentication") else None
|
||||
)
|
||||
if existing is not None:
|
||||
service.update_provider(
|
||||
tenant_id=target.tenant_id,
|
||||
provider_id=existing.id,
|
||||
server_url=self._required_string(mcp_data, "server_url", "mcp_tool"),
|
||||
name=name,
|
||||
icon=self._optional_string(mcp_data.get("icon")) or "",
|
||||
icon_type=self._optional_string(mcp_data.get("icon_type")) or "emoji",
|
||||
icon_background=self._optional_string(mcp_data.get("icon_background")) or "",
|
||||
server_identifier=server_identifier,
|
||||
headers=mcp_data.get("headers") if isinstance(mcp_data.get("headers"), dict) else {},
|
||||
configuration=configuration,
|
||||
authentication=authentication,
|
||||
)
|
||||
db.session.commit()
|
||||
status = "updated"
|
||||
identifier = existing.id
|
||||
provider = existing
|
||||
else:
|
||||
service.create_provider(
|
||||
tenant_id=target.tenant_id,
|
||||
user_id=target.operator_id,
|
||||
server_url=self._required_string(mcp_data, "server_url", "mcp_tool"),
|
||||
name=name,
|
||||
icon=self._optional_string(mcp_data.get("icon")) or "",
|
||||
icon_type=self._optional_string(mcp_data.get("icon_type")) or "emoji",
|
||||
icon_background=self._optional_string(mcp_data.get("icon_background")) or "",
|
||||
server_identifier=server_identifier,
|
||||
headers=mcp_data.get("headers") if isinstance(mcp_data.get("headers"), dict) else {},
|
||||
configuration=configuration,
|
||||
authentication=authentication,
|
||||
)
|
||||
created_provider = self._find_existing_mcp_tool(target.tenant_id, lookup_provider_id, server_identifier)
|
||||
if created_provider is None:
|
||||
raise MigrationDataError(f"MCP provider was not created: {name}")
|
||||
status = "created"
|
||||
provider = created_provider
|
||||
identifier = provider.id
|
||||
self._restore_mcp_provider_tools(provider, mcp_data)
|
||||
db.session.commit()
|
||||
if provider_id:
|
||||
self._record_id_mappings(
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
ResourceType.MCP_TOOL,
|
||||
name,
|
||||
{provider_id},
|
||||
identifier,
|
||||
)
|
||||
report_items.append(ResourceReportItem(ResourceType.MCP_TOOL, identifier, name, status))
|
||||
|
||||
def _restore_mcp_provider_tools(self, provider: MCPToolProvider, mcp_data: dict[str, object]) -> None:
|
||||
tools = mcp_data.get("tools")
|
||||
if not isinstance(tools, list):
|
||||
return
|
||||
provider.tools = json.dumps(tools)
|
||||
provider.authed = True
|
||||
|
||||
def _find_existing_mcp_tool(
|
||||
self, tenant_id: str, provider_id: str | None, server_identifier: str
|
||||
) -> MCPToolProvider | None:
|
||||
predicates = [MCPToolProvider.server_identifier == server_identifier]
|
||||
if self._is_uuid_string(provider_id):
|
||||
predicates.append(MCPToolProvider.id == provider_id)
|
||||
return db.session.scalar(
|
||||
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id, or_(*predicates)).limit(1)
|
||||
)
|
||||
|
||||
def _is_uuid_string(self, value: str | None) -> bool:
|
||||
if not value:
|
||||
return False
|
||||
try:
|
||||
UUID(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _find_existing_workflow_tool(
|
||||
self, tenant_id: str, workflow_tool_id: str | None, tool_name: str, app_id: str
|
||||
) -> WorkflowToolProvider | None:
|
||||
predicates = [WorkflowToolProvider.name == tool_name, WorkflowToolProvider.app_id == app_id]
|
||||
if self._is_uuid_string(workflow_tool_id):
|
||||
predicates.append(WorkflowToolProvider.id == workflow_tool_id)
|
||||
return db.session.scalar(
|
||||
sa.select(WorkflowToolProvider)
|
||||
.where(WorkflowToolProvider.tenant_id == tenant_id, or_(*predicates))
|
||||
.limit(1)
|
||||
)
|
||||
|
||||
def _preflight_dependency_only_mcp(
|
||||
self, package: MigrationPackage, target: ImportTarget, report_items: list[ResourceReportItem]
|
||||
) -> None:
|
||||
for dependency in package.dependencies:
|
||||
if dependency.get("kind") != DependencyKind.MCP_TOOL.value:
|
||||
continue
|
||||
provider_id = str(dependency.get("provider_id", dependency.get("id", "")))
|
||||
provider_name = self._optional_string(dependency.get("provider_name") or dependency.get("name"))
|
||||
existing = self._find_dependency_only_mcp_provider(target.tenant_id, provider_id, provider_name)
|
||||
report_name = f"mcp_tool {provider_name or getattr(existing, 'name', None) or provider_id}"
|
||||
if existing is not None:
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
provider_id,
|
||||
report_name,
|
||||
"available",
|
||||
"MCP provider exists in target tenant.",
|
||||
)
|
||||
)
|
||||
continue
|
||||
reference_summary = self._dependency_only_mcp_reference_summary(package, provider_id, provider_name)
|
||||
message = "missing in target tenant"
|
||||
if reference_summary:
|
||||
message = f"{message}; referenced by {reference_summary}"
|
||||
message = f"{message}; configure it manually before running the workflow."
|
||||
report_items.append(
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
provider_id,
|
||||
report_name,
|
||||
"skipped",
|
||||
message,
|
||||
)
|
||||
)
|
||||
|
||||
def _find_dependency_only_mcp_provider(
|
||||
self, tenant_id: str, provider_id: str, provider_name: str | None
|
||||
) -> MCPToolProvider | None:
|
||||
predicates = [MCPToolProvider.server_identifier == provider_id]
|
||||
if self._is_uuid_string(provider_id):
|
||||
predicates.append(MCPToolProvider.id == provider_id)
|
||||
return db.session.scalar(
|
||||
sa.select(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant_id, or_(*predicates)).limit(1)
|
||||
)
|
||||
|
||||
def _dependency_only_mcp_reference_summary(
|
||||
self, package: MigrationPackage, provider_id: str, provider_name: str | None
|
||||
) -> str:
|
||||
references = self._dependency_only_mcp_references(package, provider_id, provider_name)
|
||||
return "; ".join(references)
|
||||
|
||||
def _dependency_only_mcp_references(
|
||||
self, package: MigrationPackage, provider_id: str, provider_name: str | None
|
||||
) -> list[str]:
|
||||
references: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for workflow_data in package.workflows:
|
||||
workflow_name = self._optional_string(workflow_data.get("name"))
|
||||
workflow_id = self._optional_string(workflow_data.get("id"))
|
||||
workflow_label = workflow_name or workflow_id or "unknown workflow"
|
||||
dsl_content = self._optional_string(workflow_data.get("dsl"))
|
||||
if not dsl_content:
|
||||
continue
|
||||
parsed = yaml.safe_load(dsl_content) if dsl_content else {}
|
||||
if not isinstance(parsed, dict):
|
||||
continue
|
||||
for node in self._workflow_nodes(parsed):
|
||||
data = node.get("data") if isinstance(node, dict) else None
|
||||
if not isinstance(data, dict):
|
||||
continue
|
||||
for tool_config in [data, *self._agent_tool_configs(data)]:
|
||||
if not self._is_mcp_dependency_reference(tool_config, provider_id, provider_name):
|
||||
continue
|
||||
tool_label = self._mcp_tool_reference_label(node, tool_config)
|
||||
reference = f"{workflow_label} / {tool_label}"
|
||||
if reference not in seen:
|
||||
seen.add(reference)
|
||||
references.append(reference)
|
||||
return references
|
||||
|
||||
def _is_mcp_dependency_reference(
|
||||
self, tool_config: dict[str, Any], provider_id: str, provider_name: str | None
|
||||
) -> bool:
|
||||
provider_type = str(tool_config.get("provider_type") or tool_config.get("type") or "").lower()
|
||||
if provider_type != "mcp":
|
||||
return False
|
||||
config_provider_id = self._optional_string(
|
||||
tool_config.get("provider_id") or tool_config.get("provider_name") or tool_config.get("provider")
|
||||
)
|
||||
if config_provider_id == provider_id:
|
||||
return True
|
||||
return bool(provider_name and config_provider_id == provider_name)
|
||||
|
||||
def _mcp_tool_reference_label(self, node: dict[str, Any], tool_config: dict[str, Any]) -> str:
|
||||
for key in ("tool_name", "tool", "name"):
|
||||
value = self._optional_string(tool_config.get(key))
|
||||
if value:
|
||||
return value
|
||||
node_id = self._optional_string(node.get("id"))
|
||||
return node_id or "unknown tool"
|
||||
|
||||
def _required_string(self, value: dict[str, object], field_name: str, resource_name: str) -> str:
|
||||
field_value = value.get(field_name)
|
||||
if not isinstance(field_value, str) or not field_value:
|
||||
raise MigrationDataError(f"Missing required {resource_name} field: {field_name}")
|
||||
return field_value
|
||||
|
||||
def _optional_string(self, value: object) -> str | None:
|
||||
if isinstance(value, str) and value:
|
||||
return value
|
||||
return None
|
||||
71
api/services/data_migration/package_service.py
Normal file
71
api/services/data_migration/package_service.py
Normal file
@ -0,0 +1,71 @@
|
||||
"""JSON persistence for versioned cross-environment migration packages.
|
||||
|
||||
The package service validates file shape and serializes only structured package
|
||||
entities. It does not perform CLI rendering or database access, keeping it safe
|
||||
to reuse from Click adapters, tests, and future import/export services.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import asdict
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from services.data_migration.entities import (
|
||||
ImportOptions,
|
||||
MigrationDataError,
|
||||
MigrationMetadata,
|
||||
MigrationPackage,
|
||||
SourceTenant,
|
||||
TargetTenantSelector,
|
||||
)
|
||||
|
||||
PACKAGE_VERSION = "1"
|
||||
|
||||
|
||||
class MigrationPackageService:
|
||||
def load_package(self, path: str | Path) -> MigrationPackage:
|
||||
package_path = Path(path)
|
||||
with package_path.open(encoding="utf-8") as file:
|
||||
raw = json.load(file)
|
||||
if not isinstance(raw, dict):
|
||||
raise MigrationDataError("Migration package JSON must be an object.")
|
||||
package = MigrationPackage.from_mapping(raw)
|
||||
if package.metadata.version != PACKAGE_VERSION:
|
||||
raise MigrationDataError(f"Unsupported migration package version: {package.metadata.version}")
|
||||
return package
|
||||
|
||||
def save_package(self, package: MigrationPackage, path: str | Path, *, overwrite: bool) -> None:
|
||||
package_path = Path(path)
|
||||
if package_path.exists() and not overwrite:
|
||||
raise MigrationDataError(f"Output file already exists: {package_path}")
|
||||
package_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with package_path.open("w", encoding="utf-8") as file:
|
||||
json.dump(self.to_mapping(package), file, ensure_ascii=False, indent=2)
|
||||
file.write("\n")
|
||||
|
||||
def build_empty_package(
|
||||
self,
|
||||
*,
|
||||
source_tenant_id: str,
|
||||
source_tenant_name: str,
|
||||
include_secrets: bool,
|
||||
import_options: ImportOptions | None = None,
|
||||
target_tenant: TargetTenantSelector | None = None,
|
||||
) -> MigrationPackage:
|
||||
return MigrationPackage(
|
||||
metadata=MigrationMetadata(
|
||||
version=PACKAGE_VERSION,
|
||||
source_scope="single",
|
||||
source_tenants=[SourceTenant(id=source_tenant_id, name=source_tenant_name)],
|
||||
target_tenant=target_tenant,
|
||||
created_at=datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
|
||||
include_secrets=include_secrets,
|
||||
import_options=import_options or ImportOptions(),
|
||||
)
|
||||
)
|
||||
|
||||
def to_mapping(self, package: MigrationPackage) -> dict[str, Any]:
|
||||
return asdict(package)
|
||||
76
api/services/data_migration/report_service.py
Normal file
76
api/services/data_migration/report_service.py
Normal file
@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import Counter
|
||||
|
||||
from services.data_migration.entities import ReportContext, ResourceIdMapping, ResourceReportItem
|
||||
|
||||
|
||||
class MigrationReportService:
|
||||
"""Render structured migration resource results into CLI-friendly summary lines."""
|
||||
|
||||
def render(self, items: list[ResourceReportItem], *, context: ReportContext | None = None) -> list[str]:
|
||||
counts = Counter((item.resource_type.value, item.status) for item in items)
|
||||
lines = self._render_context(context)
|
||||
lines.extend(
|
||||
[f"{resource_type} {status}: {count}" for (resource_type, status), count in sorted(counts.items())]
|
||||
)
|
||||
actionable_items = [
|
||||
item for item in items if item.status in {"dependency-only", "skipped", "unresolved"} and item.message
|
||||
]
|
||||
for item in actionable_items:
|
||||
lines.append(self._render_actionable_detail(item))
|
||||
return lines
|
||||
|
||||
def _render_context(self, context: ReportContext | None) -> list[str]:
|
||||
if context is None:
|
||||
return []
|
||||
lines: list[str] = []
|
||||
if context.output_path:
|
||||
lines.append(f"output: {context.output_path}")
|
||||
if context.source_scope:
|
||||
lines.append(f"source scope: {context.source_scope}")
|
||||
if context.selected_app_count is not None:
|
||||
lines.append(f"selected apps: {context.selected_app_count}")
|
||||
if context.include_secrets is not None:
|
||||
lines.append(f"include secrets: {str(context.include_secrets).lower()}")
|
||||
if context.target_tenant:
|
||||
lines.append(f"target tenant: {context.target_tenant}")
|
||||
if context.operator_email:
|
||||
lines.append(f"operator: {context.operator_email}")
|
||||
if context.app_api_tokens_created or context.app_api_tokens_reused:
|
||||
lines.append(
|
||||
f"app api tokens: {context.app_api_tokens_created} created, {context.app_api_tokens_reused} reused"
|
||||
)
|
||||
if context.id_mappings:
|
||||
lines.append(f"resource references resolved: {len(context.id_mappings)}")
|
||||
if context.id_mapping_details:
|
||||
lines.extend(
|
||||
self._render_id_mapping_detail(item)
|
||||
for item in sorted(
|
||||
context.id_mapping_details,
|
||||
key=lambda item: (item.resource_type.value, item.name or "", item.source_id),
|
||||
)
|
||||
)
|
||||
else:
|
||||
lines.extend(
|
||||
f"- {source_id} -> {target_id}" for source_id, target_id in sorted(context.id_mappings.items())
|
||||
)
|
||||
elif context.id_mapping_count:
|
||||
lines.append(f"resource references resolved: {context.id_mapping_count}")
|
||||
return lines
|
||||
|
||||
def _render_id_mapping_detail(self, item: ResourceIdMapping) -> str:
|
||||
label = item.resource_type.value
|
||||
if item.name:
|
||||
label = f"{label} {item.name}"
|
||||
return f"- {label}: {item.source_id} -> {item.target_id}"
|
||||
|
||||
def _render_actionable_detail(self, item: ResourceReportItem) -> str:
|
||||
if item.resource_type.value == "dependency" and item.name and self._has_dependency_type_prefix(item.name):
|
||||
if item.identifier and item.identifier not in item.name:
|
||||
return f"dependency {item.name}: {item.identifier}: {item.message}"
|
||||
return f"dependency {item.name}: {item.message}"
|
||||
return f"{item.resource_type.value} {item.identifier}: {item.message}"
|
||||
|
||||
def _has_dependency_type_prefix(self, name: str) -> bool:
|
||||
return name.startswith(("workflow ", "api_tool ", "workflow_tool ", "mcp_tool ", "builtin_or_plugin_tool "))
|
||||
@ -41,6 +41,7 @@ class WorkflowToolManageService:
|
||||
parameters: list[WorkflowToolParameterConfiguration],
|
||||
privacy_policy: str = "",
|
||||
labels: list[str] | None = None,
|
||||
import_id: str = "",
|
||||
):
|
||||
# check if the name is unique
|
||||
existing_workflow_tool_provider: WorkflowToolProvider | None = None
|
||||
@ -92,7 +93,8 @@ class WorkflowToolManageService:
|
||||
privacy_policy=privacy_policy,
|
||||
version=workflow.version,
|
||||
)
|
||||
|
||||
if import_id:
|
||||
workflow_tool_provider.id = import_id
|
||||
try:
|
||||
WorkflowToolProviderController.from_db(workflow_tool_provider)
|
||||
except Exception as e:
|
||||
|
||||
@ -0,0 +1,298 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
import services
|
||||
from controllers.console.auth.error import MemberNotInTenantError
|
||||
from controllers.console.workspace import members as members_module
|
||||
from controllers.console.workspace.members import MemberCancelInviteApi, MemberUpdateRoleApi, OwnerTransfer
|
||||
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole, TenantStatus
|
||||
|
||||
|
||||
def unwrap(func):
|
||||
while hasattr(func, "__wrapped__"):
|
||||
func = func.__wrapped__
|
||||
return func
|
||||
|
||||
|
||||
class WorkspaceMembersIntegrationFactory:
|
||||
@staticmethod
|
||||
def create_tenant(db_session_with_containers) -> Tenant:
|
||||
tenant = Tenant(name=f"Tenant {uuid4()}", plan="basic", status=TenantStatus.NORMAL)
|
||||
db_session_with_containers.add(tenant)
|
||||
db_session_with_containers.commit()
|
||||
return tenant
|
||||
|
||||
@staticmethod
|
||||
def create_account(
|
||||
db_session_with_containers,
|
||||
*,
|
||||
email_prefix: str,
|
||||
tenant: Tenant | None = None,
|
||||
role: TenantAccountRole = TenantAccountRole.NORMAL,
|
||||
current: bool = False,
|
||||
) -> Account:
|
||||
account = Account(
|
||||
name=f"Account {uuid4()}",
|
||||
email=f"{email_prefix}-{uuid4()}@example.com",
|
||||
password="hashed-password",
|
||||
password_salt="salt",
|
||||
interface_language="en-US",
|
||||
timezone="UTC",
|
||||
)
|
||||
db_session_with_containers.add(account)
|
||||
db_session_with_containers.commit()
|
||||
|
||||
if tenant is not None:
|
||||
join = TenantAccountJoin(
|
||||
tenant_id=tenant.id,
|
||||
account_id=account.id,
|
||||
role=role,
|
||||
current=current,
|
||||
)
|
||||
db_session_with_containers.add(join)
|
||||
db_session_with_containers.commit()
|
||||
account.current_tenant = tenant
|
||||
return account
|
||||
|
||||
@staticmethod
|
||||
def create_owner_workspace(db_session_with_containers) -> tuple[Tenant, Account]:
|
||||
tenant = WorkspaceMembersIntegrationFactory.create_tenant(db_session_with_containers)
|
||||
owner = WorkspaceMembersIntegrationFactory.create_account(
|
||||
db_session_with_containers,
|
||||
email_prefix="owner",
|
||||
tenant=tenant,
|
||||
role=TenantAccountRole.OWNER,
|
||||
current=True,
|
||||
)
|
||||
return tenant, owner
|
||||
|
||||
@staticmethod
|
||||
def create_owner_transfer_token(account: Account) -> str:
|
||||
_, token = members_module.AccountService.generate_owner_transfer_token(
|
||||
account.email,
|
||||
account=account,
|
||||
code="123456",
|
||||
additional_data={},
|
||||
)
|
||||
return token
|
||||
|
||||
@staticmethod
|
||||
def get_join(db_session_with_containers, *, tenant: Tenant, account: Account) -> TenantAccountJoin:
|
||||
tenant_id = tenant.id
|
||||
account_id = account.id
|
||||
db_session_with_containers.expire_all()
|
||||
join = (
|
||||
db_session_with_containers.query(TenantAccountJoin)
|
||||
.filter_by(tenant_id=tenant_id, account_id=account_id)
|
||||
.one()
|
||||
)
|
||||
return join
|
||||
|
||||
|
||||
class TestMemberCancelInviteApiWithContainers:
|
||||
def test_cancel_success(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(db_session_with_containers, email_prefix="member")
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/"),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
patch.object(members_module.TenantService, "remove_member_from_tenant") as mock_remove_member,
|
||||
):
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 200
|
||||
assert result["result"] == "success"
|
||||
mock_remove_member.assert_called_once()
|
||||
called_tenant, called_member, called_current_user = mock_remove_member.call_args.args
|
||||
assert called_tenant.id == tenant.id
|
||||
assert called_member.id == member.id
|
||||
assert called_current_user.id == current_user.id
|
||||
|
||||
def test_cancel_not_found(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/"),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
):
|
||||
with pytest.raises(HTTPException):
|
||||
method(api, str(uuid4()))
|
||||
|
||||
def test_cancel_cannot_operate_self(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(db_session_with_containers, email_prefix="member")
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/"),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
patch.object(
|
||||
members_module.TenantService,
|
||||
"remove_member_from_tenant",
|
||||
side_effect=services.errors.account.CannotOperateSelfError("x"),
|
||||
),
|
||||
):
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 400
|
||||
assert result["code"] == "cannot-operate-self"
|
||||
|
||||
def test_cancel_no_permission(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(db_session_with_containers, email_prefix="member")
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/"),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
patch.object(
|
||||
members_module.TenantService,
|
||||
"remove_member_from_tenant",
|
||||
side_effect=services.errors.account.NoPermissionError("x"),
|
||||
),
|
||||
):
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 403
|
||||
assert result["code"] == "forbidden"
|
||||
|
||||
def test_cancel_member_not_in_tenant(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(db_session_with_containers, email_prefix="member")
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/"),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
patch.object(
|
||||
members_module.TenantService,
|
||||
"remove_member_from_tenant",
|
||||
side_effect=services.errors.account.MemberNotInTenantError(),
|
||||
),
|
||||
):
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 404
|
||||
assert result["code"] == "member-not-found"
|
||||
|
||||
|
||||
class TestMemberUpdateRoleApiWithContainers:
|
||||
def test_update_success(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberUpdateRoleApi()
|
||||
method = unwrap(api.put)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(
|
||||
db_session_with_containers,
|
||||
email_prefix="member",
|
||||
tenant=tenant,
|
||||
role=TenantAccountRole.EDITOR,
|
||||
)
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/", json={"role": "normal"}),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
):
|
||||
result = method(api, member.id)
|
||||
|
||||
if isinstance(result, tuple):
|
||||
result = result[0]
|
||||
|
||||
assert result["result"] == "success"
|
||||
assert (
|
||||
factory.get_join(db_session_with_containers, tenant=tenant, account=member).role == TenantAccountRole.NORMAL
|
||||
)
|
||||
|
||||
def test_update_member_not_found(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = MemberUpdateRoleApi()
|
||||
method = unwrap(api.put)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/", json={"role": "normal"}),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
):
|
||||
with pytest.raises(HTTPException):
|
||||
method(api, str(uuid4()))
|
||||
|
||||
|
||||
class TestOwnerTransferApiWithContainers:
|
||||
def test_member_not_in_tenant(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = OwnerTransfer()
|
||||
method = unwrap(api.post)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(db_session_with_containers, email_prefix="member")
|
||||
token = factory.create_owner_transfer_token(current_user)
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/", json={"token": token}),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
):
|
||||
with pytest.raises(MemberNotInTenantError):
|
||||
method(api, member.id)
|
||||
|
||||
def test_member_not_found(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = OwnerTransfer()
|
||||
method = unwrap(api.post)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
token = factory.create_owner_transfer_token(current_user)
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/", json={"token": token}),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
):
|
||||
with pytest.raises(HTTPException):
|
||||
method(api, str(uuid4()))
|
||||
|
||||
def test_transfer_success(self, flask_app_with_containers, db_session_with_containers):
|
||||
api = OwnerTransfer()
|
||||
method = unwrap(api.post)
|
||||
factory = WorkspaceMembersIntegrationFactory
|
||||
tenant, current_user = factory.create_owner_workspace(db_session_with_containers)
|
||||
member = factory.create_account(
|
||||
db_session_with_containers,
|
||||
email_prefix="member",
|
||||
tenant=tenant,
|
||||
role=TenantAccountRole.NORMAL,
|
||||
)
|
||||
token = factory.create_owner_transfer_token(current_user)
|
||||
|
||||
with (
|
||||
flask_app_with_containers.test_request_context("/", json={"token": token}),
|
||||
patch.object(members_module, "current_account_with_tenant", return_value=(current_user, tenant.id)),
|
||||
patch.object(members_module.AccountService, "send_new_owner_transfer_notify_email") as mock_new_owner_email,
|
||||
patch.object(members_module.AccountService, "send_old_owner_transfer_notify_email") as mock_old_owner_email,
|
||||
):
|
||||
result = method(api, member.id)
|
||||
|
||||
assert result["result"] == "success"
|
||||
assert (
|
||||
factory.get_join(db_session_with_containers, tenant=tenant, account=member).role == TenantAccountRole.OWNER
|
||||
)
|
||||
assert (
|
||||
factory.get_join(db_session_with_containers, tenant=tenant, account=current_user).role
|
||||
== TenantAccountRole.ADMIN
|
||||
)
|
||||
mock_new_owner_email.assert_called_once()
|
||||
mock_old_owner_email.assert_called_once()
|
||||
@ -0,0 +1,71 @@
|
||||
import json
|
||||
|
||||
from click.testing import CliRunner
|
||||
|
||||
from commands.data_migration import (
|
||||
ID_STRATEGY_CHOICES,
|
||||
export_migration_data,
|
||||
export_migration_data_template,
|
||||
import_migration_data,
|
||||
)
|
||||
|
||||
|
||||
def test_export_command_requires_input_and_output():
|
||||
result = CliRunner().invoke(export_migration_data, [])
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert export_migration_data.name == "export-app-migration"
|
||||
assert "--input" in result.output
|
||||
assert "--output" in result.output
|
||||
|
||||
|
||||
def test_import_command_requires_input_and_target_tenant_or_package_metadata():
|
||||
result = CliRunner().invoke(import_migration_data, [])
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert import_migration_data.name == "import-app-migration"
|
||||
assert "--input" in result.output
|
||||
|
||||
|
||||
def test_import_command_does_not_expose_unimplemented_map_id_strategy():
|
||||
assert ID_STRATEGY_CHOICES == ["preserve-id", "generate-new-id"]
|
||||
|
||||
|
||||
def test_export_template_command_prints_scripted_json_template():
|
||||
result = CliRunner().invoke(export_migration_data_template, [])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert export_migration_data_template.name == "app-migration-template"
|
||||
template = json.loads(result.output)
|
||||
assert template == {
|
||||
"source_tenant": {"mode": "single", "id": "", "name": "admin's Workspace"},
|
||||
"apps": {"modes": ["workflow", "advanced-chat"], "ids": [], "all": True},
|
||||
"include_referenced_tools": True,
|
||||
"additional_tools": {"api_tools": [], "workflow_tools": [], "mcp_tools": []},
|
||||
"include_secrets": False,
|
||||
"import_options": {
|
||||
"create_app_api_token_on_import": False,
|
||||
"id_strategy": "preserve-id",
|
||||
"conflict_strategy": "fail",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def test_export_template_command_writes_output_file(tmp_path):
|
||||
output_file = tmp_path / "export-template.json"
|
||||
|
||||
result = CliRunner().invoke(export_migration_data_template, ["--output", str(output_file)])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert f"Output written to {output_file}" in result.output
|
||||
assert json.loads(output_file.read_text())["apps"]["all"] is True
|
||||
|
||||
|
||||
def test_export_template_command_requires_overwrite_for_existing_output(tmp_path):
|
||||
output_file = tmp_path / "export-template.json"
|
||||
output_file.write_text("{}")
|
||||
|
||||
result = CliRunner().invoke(export_migration_data_template, ["--output", str(output_file)])
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "already exists" in result.output
|
||||
384
api/tests/unit_tests/commands/test_data_migration_wizard.py
Normal file
384
api/tests/unit_tests/commands/test_data_migration_wizard.py
Normal file
@ -0,0 +1,384 @@
|
||||
from commands.data_migration import (
|
||||
CONFLICT_STRATEGY_CHOICES,
|
||||
ID_STRATEGY_CHOICES,
|
||||
_confirm_wizard_summary,
|
||||
_print_auto_tools,
|
||||
_print_final_tool_selection,
|
||||
_print_wizard_step,
|
||||
_prompt_additional_tools,
|
||||
_prompt_output_file,
|
||||
_prompt_tool_category,
|
||||
_resolve_mcp_tool_names,
|
||||
migration_data_wizard,
|
||||
parse_index_selection,
|
||||
)
|
||||
|
||||
|
||||
def test_parse_index_selection_supports_all():
|
||||
assert parse_index_selection("all", ["a", "b", "c"]) == ["a", "b", "c"]
|
||||
|
||||
|
||||
def test_wizard_command_uses_app_migration_name():
|
||||
assert migration_data_wizard.name == "app-migration-wizard"
|
||||
|
||||
|
||||
def test_parse_index_selection_supports_comma_indexes():
|
||||
assert parse_index_selection("1, 3", ["a", "b", "c"]) == ["a", "c"]
|
||||
|
||||
|
||||
def test_print_wizard_step_adds_separator(monkeypatch):
|
||||
output_lines = []
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
|
||||
_print_wizard_step("App Selection")
|
||||
|
||||
assert output_lines == ["", "==== App Selection ===="]
|
||||
|
||||
|
||||
def test_conflict_strategy_choices_exclude_replace():
|
||||
assert CONFLICT_STRATEGY_CHOICES == ["fail", "skip", "update"]
|
||||
|
||||
|
||||
def test_prompt_app_ids_explains_comma_selection_and_default(monkeypatch):
|
||||
from commands.data_migration import _prompt_app_ids
|
||||
|
||||
prompts = []
|
||||
output_lines = []
|
||||
apps = [
|
||||
type("App", (), {"id": "app-1", "name": "embedded", "mode": "workflow"})(),
|
||||
type("App", (), {"id": "app-2", "name": "main", "mode": "advanced-chat"})(),
|
||||
]
|
||||
|
||||
def capture_prompt(text, **kwargs):
|
||||
prompts.append((text, kwargs))
|
||||
return "1,2"
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
|
||||
|
||||
assert _prompt_app_ids(apps) == ["app-1", "app-2"]
|
||||
assert prompts == [("Select apps by number, comma-separated numbers, or all", {"default": "all"})]
|
||||
assert "Currently supported app types: workflow and chatflow." in output_lines
|
||||
|
||||
|
||||
def test_prompt_tool_category_marks_auto_discovered_tools(monkeypatch):
|
||||
output_lines = []
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", lambda *args, **kwargs: "")
|
||||
|
||||
selected = _prompt_tool_category(
|
||||
"Custom API tools",
|
||||
[("weather", "weather", "tool-id"), ("calendar", "calendar", "calendar-id")],
|
||||
auto_tools={"weather": "tool-id"},
|
||||
)
|
||||
|
||||
assert selected == []
|
||||
assert "1. [auto] weather (tool-id)" in output_lines
|
||||
assert "2. [ ] calendar (calendar-id)" in output_lines
|
||||
assert output_lines[:2] == ["", "==== Custom API tools ===="]
|
||||
|
||||
|
||||
def test_prompt_tool_category_explains_comma_selection_and_default(monkeypatch):
|
||||
prompts = []
|
||||
|
||||
def capture_prompt(text, **kwargs):
|
||||
prompts.append((text, kwargs))
|
||||
return ""
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", lambda *_args, **_kwargs: None)
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
|
||||
|
||||
selected = _prompt_tool_category(
|
||||
"Custom API tools",
|
||||
[("weather", "weather", "tool-id")],
|
||||
auto_tools={},
|
||||
)
|
||||
|
||||
assert selected == []
|
||||
assert prompts == [
|
||||
(
|
||||
"Select custom api tools by number, comma-separated numbers, all, or empty",
|
||||
{"default": "", "show_default": "empty"},
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_prompt_output_file_shows_default(monkeypatch):
|
||||
prompts = []
|
||||
|
||||
def capture_prompt(text, **kwargs):
|
||||
prompts.append((text, kwargs))
|
||||
return "migration-data.json"
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
|
||||
|
||||
assert _prompt_output_file() == ("migration-data.json", False)
|
||||
assert prompts[0][0] == "Output path"
|
||||
assert prompts[0][1]["show_default"] is True
|
||||
|
||||
|
||||
def test_prompt_tool_category_marks_auto_by_detail_and_supports_multi_select(monkeypatch):
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", lambda *_args, **_kwargs: None)
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", lambda *args, **kwargs: "1,2")
|
||||
|
||||
selected = _prompt_tool_category(
|
||||
"Workflow tools",
|
||||
[("tool-1", "embedded", "app-1"), ("tool-2", "other", "app-2")],
|
||||
auto_tools={"embedded": "app-1"},
|
||||
)
|
||||
|
||||
assert selected == ["tool-1", "tool-2"]
|
||||
|
||||
|
||||
def test_prompt_tool_category_marks_auto_by_value():
|
||||
output_lines = []
|
||||
|
||||
from commands import data_migration
|
||||
|
||||
original_echo = data_migration.click.echo
|
||||
original_prompt = data_migration.click.prompt
|
||||
data_migration.click.echo = output_lines.append
|
||||
data_migration.click.prompt = lambda *args, **kwargs: ""
|
||||
try:
|
||||
_prompt_tool_category(
|
||||
"Workflow tools",
|
||||
[("tool-1", "embedded_workflow_as_tool", "tool-1")],
|
||||
auto_tools={"embedded_workflow_as_tool": "tool-1"},
|
||||
)
|
||||
finally:
|
||||
data_migration.click.echo = original_echo
|
||||
data_migration.click.prompt = original_prompt
|
||||
|
||||
assert "1. [auto] embedded_workflow_as_tool (tool-1)" in output_lines
|
||||
|
||||
|
||||
def test_print_auto_tools_lists_each_category(monkeypatch):
|
||||
output_lines = []
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
|
||||
_print_auto_tools(
|
||||
{
|
||||
"api_tools": {"weather": "3bac3aa9-dd87-4351-9459-a7099137b028"},
|
||||
"workflow_tools": {"embedded_workflow_as_tool": "e6024578-41b7-4fb5-a81f-9201358e5835"},
|
||||
"mcp_tools": {},
|
||||
}
|
||||
)
|
||||
|
||||
assert "Automatically discovered tools:" in output_lines
|
||||
assert "Custom API tools" in output_lines
|
||||
assert "- weather: 3bac3aa9-dd87-4351-9459-a7099137b028" in output_lines
|
||||
assert "Workflow tools" in output_lines
|
||||
assert "- embedded_workflow_as_tool: e6024578-41b7-4fb5-a81f-9201358e5835" in output_lines
|
||||
assert "MCP tools" in output_lines
|
||||
assert "- none" in output_lines
|
||||
|
||||
|
||||
def test_resolve_mcp_tool_names_does_not_compare_non_uuid_identifier_to_uuid_id(monkeypatch):
|
||||
statements = []
|
||||
|
||||
def capture_scalar(statement):
|
||||
statements.append(str(statement))
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.db.session.scalar", capture_scalar)
|
||||
|
||||
assert _resolve_mcp_tool_names("49a99e46-bc2c-4885-91fa-47615f6192b5", {"my-test-mcp": "my-test-mcp"}) == {
|
||||
"my-test-mcp": "my-test-mcp"
|
||||
}
|
||||
assert "tool_mcp_providers.id =" not in statements[0]
|
||||
assert "tool_mcp_providers.server_identifier =" in statements[0]
|
||||
|
||||
|
||||
def test_prompt_additional_tools_prints_final_selection_when_skipped(monkeypatch):
|
||||
output_lines = []
|
||||
confirm_prompts = []
|
||||
|
||||
def capture_confirm(prompt, **kwargs):
|
||||
confirm_prompts.append((prompt, kwargs))
|
||||
return False
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.confirm", capture_confirm)
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
|
||||
selected = _prompt_additional_tools(
|
||||
"tenant-id",
|
||||
{
|
||||
"api_tools": {"weather": "3bac3aa9-dd87-4351-9459-a7099137b028"},
|
||||
"workflow_tools": {},
|
||||
"mcp_tools": {},
|
||||
},
|
||||
)
|
||||
|
||||
assert selected == {"api_tools": [], "workflow_tools": [], "mcp_tools": []}
|
||||
assert confirm_prompts == [
|
||||
("Export additional tools manually? [y/n, default: n]", {"default": False, "show_default": False})
|
||||
]
|
||||
assert "Final tools to export:" in output_lines
|
||||
assert "- [auto] weather: 3bac3aa9-dd87-4351-9459-a7099137b028" in output_lines
|
||||
|
||||
|
||||
def test_final_tool_selection_deduplicates_manual_tool_already_auto(monkeypatch):
|
||||
output_lines = []
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
|
||||
_print_final_tool_selection(
|
||||
{
|
||||
"api_tools": {},
|
||||
"workflow_tools": {"embedded_workflow_as_tool": "e6024578-41b7-4fb5-a81f-9201358e5835"},
|
||||
"mcp_tools": {},
|
||||
},
|
||||
{
|
||||
"api_tools": [],
|
||||
"workflow_tools": ["e6024578-41b7-4fb5-a81f-9201358e5835"],
|
||||
"mcp_tools": [],
|
||||
},
|
||||
{"e6024578-41b7-4fb5-a81f-9201358e5835": "embedded_workflow_as_tool: e6024578"},
|
||||
)
|
||||
|
||||
assert "- [auto] embedded_workflow_as_tool: e6024578-41b7-4fb5-a81f-9201358e5835" in output_lines
|
||||
assert not any(line.startswith("- [manual]") for line in output_lines)
|
||||
|
||||
|
||||
def test_prompt_output_file_rejects_yes_no_typo(monkeypatch):
|
||||
import click
|
||||
import pytest
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", lambda *args, **kwargs: "y")
|
||||
|
||||
with pytest.raises(click.ClickException, match="Output path must be a file path"):
|
||||
_prompt_output_file()
|
||||
|
||||
|
||||
def test_confirm_wizard_summary_shows_conflict_strategy(monkeypatch):
|
||||
output_lines = []
|
||||
confirm_prompts = []
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
monkeypatch.setattr(
|
||||
"commands.data_migration.click.confirm",
|
||||
lambda prompt, **kwargs: confirm_prompts.append((prompt, kwargs)) or True,
|
||||
)
|
||||
|
||||
_confirm_wizard_summary(
|
||||
tenant_name="admin's Workspace",
|
||||
app_names=["main_chatflow"],
|
||||
auto_tools={"api_tools": {}, "workflow_tools": {}, "mcp_tools": {}},
|
||||
additional_tools={"api_tools": [], "workflow_tools": [], "mcp_tools": []},
|
||||
manual_labels={},
|
||||
include_referenced_tools=True,
|
||||
include_secrets=False,
|
||||
create_tokens=True,
|
||||
id_strategy="preserve-id",
|
||||
conflict_strategy="fail",
|
||||
output_file="migration-data.json",
|
||||
)
|
||||
|
||||
assert "id strategy: preserve-id" in output_lines
|
||||
assert "conflict strategy: fail" in output_lines
|
||||
assert confirm_prompts == [("Write migration package? [y/n, default: y]", {"default": True, "show_default": False})]
|
||||
|
||||
|
||||
def test_confirm_wizard_summary_shows_final_deduplicated_tool_selection(monkeypatch):
|
||||
output_lines = []
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
monkeypatch.setattr("commands.data_migration.click.confirm", lambda *args, **kwargs: True)
|
||||
|
||||
_confirm_wizard_summary(
|
||||
tenant_name="admin's Workspace",
|
||||
app_names=["main_chatflow"],
|
||||
auto_tools={
|
||||
"api_tools": {"weather": "weather-id"},
|
||||
"workflow_tools": {"embedded_workflow_as_tool": "workflow-tool-id"},
|
||||
"mcp_tools": {},
|
||||
},
|
||||
additional_tools={
|
||||
"api_tools": ["weather-id", "calendar"],
|
||||
"workflow_tools": [],
|
||||
"mcp_tools": ["mcp-id"],
|
||||
},
|
||||
manual_labels={
|
||||
"calendar": "calendar: calendar-id",
|
||||
"mcp-id": "my-test-mcp: mcp-id",
|
||||
},
|
||||
include_referenced_tools=True,
|
||||
include_secrets=False,
|
||||
create_tokens=False,
|
||||
id_strategy="preserve-id",
|
||||
conflict_strategy="update",
|
||||
output_file="migration-data.json",
|
||||
)
|
||||
|
||||
assert "Final tools to export:" in output_lines
|
||||
assert "Custom API tools" in output_lines
|
||||
assert "- [auto] weather: weather-id" in output_lines
|
||||
assert "- [manual] calendar: calendar-id" in output_lines
|
||||
assert "Workflow tools" in output_lines
|
||||
assert "- [auto] embedded_workflow_as_tool: workflow-tool-id" in output_lines
|
||||
assert "MCP tools" in output_lines
|
||||
assert "- [manual] my-test-mcp: mcp-id" in output_lines
|
||||
assert not any(line.startswith("additional api tools:") for line in output_lines)
|
||||
assert not any(line.startswith("additional workflow tools:") for line in output_lines)
|
||||
assert not any(line.startswith("additional mcp tools:") for line in output_lines)
|
||||
assert "- [manual] weather-id" not in output_lines
|
||||
|
||||
|
||||
def test_import_options_prompts_explain_secrets_reuse_and_conflicts(monkeypatch):
|
||||
from commands.data_migration import _prompt_import_options
|
||||
|
||||
output_lines = []
|
||||
confirm_prompts = []
|
||||
prompt_calls = []
|
||||
|
||||
def capture_confirm(prompt, **kwargs):
|
||||
confirm_prompts.append((prompt, kwargs))
|
||||
return False
|
||||
|
||||
def capture_prompt(prompt, **kwargs):
|
||||
prompt_calls.append((prompt, kwargs))
|
||||
return kwargs["default"]
|
||||
|
||||
monkeypatch.setattr("commands.data_migration.click.echo", output_lines.append)
|
||||
monkeypatch.setattr("commands.data_migration.click.confirm", capture_confirm)
|
||||
monkeypatch.setattr("commands.data_migration.click.prompt", capture_prompt)
|
||||
|
||||
include_secrets, create_tokens, id_strategy, conflict_strategy = _prompt_import_options()
|
||||
|
||||
assert include_secrets is False
|
||||
assert create_tokens is False
|
||||
assert id_strategy == "preserve-id"
|
||||
assert conflict_strategy == "update"
|
||||
assert "Secrets include workflow/app DSL secret values, custom API tool credentials," in output_lines
|
||||
assert "-- Secrets --" in output_lines
|
||||
assert "If you choose no, credentials are omitted or masked," in output_lines
|
||||
assert "-- App API Tokens --" in output_lines
|
||||
assert "When enabled, import will create an app API token if the imported app has none," in output_lines
|
||||
assert "or reuse an existing app API token if one already exists." in output_lines
|
||||
assert "-- ID Strategy --" in output_lines
|
||||
assert "ID strategy controls whether imported app and tool IDs preserve source IDs" in output_lines
|
||||
assert "or use target-generated IDs." in output_lines
|
||||
assert "preserve-id: keep source IDs where the target service supports it." in output_lines
|
||||
assert (
|
||||
"generate-new-id: let the target environment generate new IDs and rewrite references via mapping."
|
||||
in output_lines
|
||||
)
|
||||
assert "-- Conflict Strategy --" in output_lines
|
||||
assert "Conflict strategy controls what import does when a target resource already exists." in output_lines
|
||||
assert "fail: stop at the first conflict; previously committed resources are not rolled back." in output_lines
|
||||
assert "skip: keep the existing target resource and skip importing that resource." in output_lines
|
||||
assert "update: update the existing target resource in place." in output_lines
|
||||
assert confirm_prompts == [
|
||||
("Include secrets in output JSON? [y/n, default: n]", {"default": False, "show_default": False}),
|
||||
("Create or reuse app API tokens during import? [y/n, default: n]", {"default": False, "show_default": False}),
|
||||
]
|
||||
assert prompt_calls[0][0] == "Import ID strategy. Enter one of: preserve-id, generate-new-id"
|
||||
assert prompt_calls[0][1]["default"] == "preserve-id"
|
||||
assert prompt_calls[0][1]["show_default"] is True
|
||||
assert prompt_calls[0][1]["type"].choices == ID_STRATEGY_CHOICES
|
||||
assert prompt_calls[1][0] == "Import conflict strategy. Enter one of: fail, skip, update"
|
||||
assert prompt_calls[1][1]["default"] == "update"
|
||||
assert prompt_calls[1][1]["show_default"] is True
|
||||
assert prompt_calls[1][1]["type"].choices == CONFLICT_STRATEGY_CHOICES
|
||||
@ -1036,6 +1036,48 @@ class TestSegmentListAdvancedCases:
|
||||
assert status == 200
|
||||
assert response["total"] == 1
|
||||
|
||||
def test_segment_list_postgres_keyword_filter_handles_scalar_keywords(self, app: Flask):
|
||||
api = DatasetDocumentSegmentListApi()
|
||||
method = unwrap(api.get)
|
||||
|
||||
dataset = MagicMock()
|
||||
document = MagicMock()
|
||||
pagination = MagicMock(items=[], total=0, pages=0)
|
||||
|
||||
with (
|
||||
app.test_request_context("/?keyword=test"),
|
||||
patch(
|
||||
"controllers.console.datasets.datasets_segments.current_account_with_tenant",
|
||||
return_value=(MagicMock(), "11111111-1111-1111-1111-111111111111"),
|
||||
),
|
||||
patch(
|
||||
"controllers.console.datasets.datasets_segments.DatasetService.get_dataset",
|
||||
return_value=dataset,
|
||||
),
|
||||
patch(
|
||||
"controllers.console.datasets.datasets_segments.DatasetService.check_dataset_permission",
|
||||
return_value=None,
|
||||
),
|
||||
patch(
|
||||
"controllers.console.datasets.datasets_segments.DocumentService.get_document",
|
||||
return_value=document,
|
||||
),
|
||||
patch(
|
||||
"controllers.console.datasets.datasets_segments.dify_config",
|
||||
SimpleNamespace(SQLALCHEMY_DATABASE_URI_SCHEME="postgresql"),
|
||||
),
|
||||
patch(
|
||||
"controllers.console.datasets.datasets_segments.db.paginate",
|
||||
return_value=pagination,
|
||||
) as paginate_mock,
|
||||
):
|
||||
method(api, "22222222-2222-2222-2222-222222222222", "33333333-3333-3333-3333-333333333333")
|
||||
|
||||
query = paginate_mock.call_args.kwargs["select"]
|
||||
sql = str(query.compile(compile_kwargs={"literal_binds": True}))
|
||||
assert "jsonb_array_elements_text(CASE" in sql
|
||||
assert "ELSE CAST('[]' AS JSONB)" in sql
|
||||
|
||||
def test_segment_list_permission_denied(self, app: Flask):
|
||||
"""Test segment list with permission denied"""
|
||||
api = DatasetDocumentSegmentListApi()
|
||||
|
||||
@ -3,22 +3,18 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
import services
|
||||
from controllers.console.auth.error import (
|
||||
CannotTransferOwnerToSelfError,
|
||||
EmailCodeError,
|
||||
InvalidEmailError,
|
||||
InvalidTokenError,
|
||||
MemberNotInTenantError,
|
||||
NotOwnerError,
|
||||
OwnerTransferLimitError,
|
||||
)
|
||||
from controllers.console.error import EmailSendIpLimitError, WorkspaceMembersLimitExceeded
|
||||
from controllers.console.workspace.members import (
|
||||
DatasetOperatorMemberListApi,
|
||||
MemberCancelInviteApi,
|
||||
MemberInviteEmailApi,
|
||||
MemberListApi,
|
||||
MemberUpdateRoleApi,
|
||||
@ -251,135 +247,7 @@ class TestMemberInviteEmailApi:
|
||||
assert result["invitation_results"][0]["status"] == "failed"
|
||||
|
||||
|
||||
class TestMemberCancelInviteApi:
|
||||
def test_cancel_success(self, app: Flask):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
|
||||
tenant = MagicMock(id="t1")
|
||||
user = MagicMock(current_tenant=tenant)
|
||||
member = MagicMock()
|
||||
|
||||
with (
|
||||
app.test_request_context("/"),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.db.session.get") as get_mock,
|
||||
patch("controllers.console.workspace.members.TenantService.remove_member_from_tenant"),
|
||||
):
|
||||
get_mock.return_value = member
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 200
|
||||
assert result["result"] == "success"
|
||||
|
||||
def test_cancel_not_found(self, app: Flask):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
|
||||
tenant = MagicMock(id="t1")
|
||||
user = MagicMock(current_tenant=tenant)
|
||||
|
||||
with (
|
||||
app.test_request_context("/"),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.db.session.get") as get_mock,
|
||||
):
|
||||
get_mock.return_value = None
|
||||
|
||||
with pytest.raises(HTTPException):
|
||||
method(api, "x")
|
||||
|
||||
def test_cancel_cannot_operate_self(self, app: Flask):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
|
||||
tenant = MagicMock(id="t1")
|
||||
user = MagicMock(current_tenant=tenant)
|
||||
member = MagicMock()
|
||||
|
||||
with (
|
||||
app.test_request_context("/"),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.db.session.get") as get_mock,
|
||||
patch(
|
||||
"controllers.console.workspace.members.TenantService.remove_member_from_tenant",
|
||||
side_effect=services.errors.account.CannotOperateSelfError("x"),
|
||||
),
|
||||
):
|
||||
get_mock.return_value = member
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 400
|
||||
|
||||
def test_cancel_no_permission(self, app: Flask):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
|
||||
tenant = MagicMock(id="t1")
|
||||
user = MagicMock(current_tenant=tenant)
|
||||
member = MagicMock()
|
||||
|
||||
with (
|
||||
app.test_request_context("/"),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.db.session.get") as get_mock,
|
||||
patch(
|
||||
"controllers.console.workspace.members.TenantService.remove_member_from_tenant",
|
||||
side_effect=services.errors.account.NoPermissionError("x"),
|
||||
),
|
||||
):
|
||||
get_mock.return_value = member
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 403
|
||||
|
||||
def test_cancel_member_not_in_tenant(self, app: Flask):
|
||||
api = MemberCancelInviteApi()
|
||||
method = unwrap(api.delete)
|
||||
|
||||
tenant = MagicMock(id="t1")
|
||||
user = MagicMock(current_tenant=tenant)
|
||||
member = MagicMock()
|
||||
|
||||
with (
|
||||
app.test_request_context("/"),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.db.session.get") as get_mock,
|
||||
patch(
|
||||
"controllers.console.workspace.members.TenantService.remove_member_from_tenant",
|
||||
side_effect=services.errors.account.MemberNotInTenantError(),
|
||||
),
|
||||
):
|
||||
get_mock.return_value = member
|
||||
result, status = method(api, member.id)
|
||||
|
||||
assert status == 404
|
||||
|
||||
|
||||
class TestMemberUpdateRoleApi:
|
||||
def test_update_success(self, app: Flask):
|
||||
api = MemberUpdateRoleApi()
|
||||
method = unwrap(api.put)
|
||||
|
||||
tenant = MagicMock()
|
||||
user = MagicMock(current_tenant=tenant)
|
||||
member = MagicMock()
|
||||
|
||||
payload = {"role": "normal"}
|
||||
|
||||
with (
|
||||
app.test_request_context("/", json=payload),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.db.session.get", return_value=member),
|
||||
patch("controllers.console.workspace.members.TenantService.update_member_role"),
|
||||
):
|
||||
result = method(api, "id")
|
||||
|
||||
if isinstance(result, tuple):
|
||||
result = result[0]
|
||||
|
||||
assert result["result"] == "success"
|
||||
|
||||
def test_update_invalid_role(self, app: Flask):
|
||||
api = MemberUpdateRoleApi()
|
||||
method = unwrap(api.put)
|
||||
@ -391,23 +259,6 @@ class TestMemberUpdateRoleApi:
|
||||
|
||||
assert status == 400
|
||||
|
||||
def test_update_member_not_found(self, app: Flask):
|
||||
api = MemberUpdateRoleApi()
|
||||
method = unwrap(api.put)
|
||||
|
||||
payload = {"role": "normal"}
|
||||
|
||||
with (
|
||||
app.test_request_context("/", json=payload),
|
||||
patch(
|
||||
"controllers.console.workspace.members.current_account_with_tenant",
|
||||
return_value=(MagicMock(current_tenant=MagicMock()), "t1"),
|
||||
),
|
||||
patch("controllers.console.workspace.members.db.session.get", return_value=None),
|
||||
):
|
||||
with pytest.raises(HTTPException):
|
||||
method(api, "id")
|
||||
|
||||
|
||||
class TestDatasetOperatorMemberListApi:
|
||||
def test_get_success(self, app: Flask):
|
||||
@ -637,27 +488,3 @@ class TestOwnerTransferApi:
|
||||
):
|
||||
with pytest.raises(InvalidTokenError):
|
||||
method(api, "2")
|
||||
|
||||
def test_member_not_in_tenant(self, app: Flask):
|
||||
api = OwnerTransfer()
|
||||
method = unwrap(api.post)
|
||||
|
||||
tenant = MagicMock()
|
||||
user = MagicMock(id="1", email="a@test.com", current_tenant=tenant)
|
||||
member = MagicMock()
|
||||
|
||||
payload = {"token": "t"}
|
||||
|
||||
with (
|
||||
app.test_request_context("/", json=payload),
|
||||
patch("controllers.console.workspace.members.current_account_with_tenant", return_value=(user, "t1")),
|
||||
patch("controllers.console.workspace.members.TenantService.is_owner", return_value=True),
|
||||
patch(
|
||||
"controllers.console.workspace.members.AccountService.get_owner_transfer_data",
|
||||
return_value={"email": "a@test.com"},
|
||||
),
|
||||
patch("controllers.console.workspace.members.db.session.get", return_value=member),
|
||||
patch("controllers.console.workspace.members.TenantService.is_member", return_value=False),
|
||||
):
|
||||
with pytest.raises(MemberNotInTenantError):
|
||||
method(api, "2")
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
import logging
|
||||
from types import SimpleNamespace
|
||||
|
||||
from core.tools.errors import ToolProviderNotFoundError
|
||||
from events.event_handlers import delete_tool_parameters_cache_when_sync_draft_workflow as handler_module
|
||||
|
||||
|
||||
def test_missing_tool_provider_does_not_log_error_traceback(monkeypatch, caplog):
|
||||
app = SimpleNamespace(id="workflow-id", tenant_id="tenant-id")
|
||||
workflow = SimpleNamespace(
|
||||
graph_dict={
|
||||
"nodes": [
|
||||
{
|
||||
"id": "node-id",
|
||||
"data": {
|
||||
"type": "tool",
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
tool_entity = SimpleNamespace(
|
||||
provider_type=SimpleNamespace(value="mcp"),
|
||||
provider_id="my-test-mcp-server",
|
||||
provider_name="my-test-mcp-server",
|
||||
tool_name="echo",
|
||||
credential_id=None,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(handler_module, "adapt_node_config_for_graph", lambda node_data: {"data": node_data["data"]})
|
||||
monkeypatch.setattr(handler_module.ToolEntity, "model_validate", lambda data: tool_entity)
|
||||
monkeypatch.setattr(
|
||||
handler_module.ToolManager,
|
||||
"get_tool_runtime",
|
||||
lambda **kwargs: (_ for _ in ()).throw(ToolProviderNotFoundError("mcp provider not found")),
|
||||
)
|
||||
|
||||
with caplog.at_level(logging.INFO, logger=handler_module.logger.name):
|
||||
handler_module.handle(app, synced_draft_workflow=workflow)
|
||||
|
||||
assert not [record for record in caplog.records if record.levelno >= logging.ERROR]
|
||||
assert "Skipped deleting tool parameters cache" in caplog.text
|
||||
@ -0,0 +1,115 @@
|
||||
from services.data_migration.dependency_discovery_service import DependencyDiscoveryService
|
||||
from services.data_migration.entities import DependencyKind
|
||||
|
||||
|
||||
def test_discovers_and_deduplicates_standalone_tool_nodes():
|
||||
graph = {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{"data": {"type": "tool", "provider_type": "api", "provider_id": "weather"}},
|
||||
{"data": {"type": "tool", "provider_type": "api", "provider_id": "weather"}},
|
||||
{"data": {"type": "tool", "provider_type": "workflow", "provider_id": "wf-tool-1"}},
|
||||
{"data": {"type": "tool", "provider_type": "builtin", "provider_id": "google_search"}},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
dependencies = DependencyDiscoveryService().discover_from_dsl(graph)
|
||||
|
||||
assert [(item.kind, item.provider_id) for item in dependencies] == [
|
||||
(DependencyKind.API_TOOL, "weather"),
|
||||
(DependencyKind.WORKFLOW_TOOL, "wf-tool-1"),
|
||||
(DependencyKind.BUILTIN_OR_PLUGIN_TOOL, "google_search"),
|
||||
]
|
||||
|
||||
|
||||
def test_discovers_agent_node_tools():
|
||||
graph = {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"data": {
|
||||
"type": "agent",
|
||||
"tools": [
|
||||
{"provider_type": "mcp", "provider_id": "mcp-1"},
|
||||
{"provider_type": "api", "provider_id": "api-1"},
|
||||
],
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
dependencies = DependencyDiscoveryService().discover_from_dsl(graph)
|
||||
|
||||
assert [(item.kind, item.provider_id) for item in dependencies] == [
|
||||
(DependencyKind.MCP_TOOL, "mcp-1"),
|
||||
(DependencyKind.API_TOOL, "api-1"),
|
||||
]
|
||||
|
||||
|
||||
def test_discovers_tool_nodes_from_exported_workflow_dsl_shape():
|
||||
dsl = {
|
||||
"workflow": {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"data": {
|
||||
"type": "tool",
|
||||
"provider_type": "api",
|
||||
"provider_id": "api-provider-id",
|
||||
"provider_name": "weather",
|
||||
}
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"type": "tool",
|
||||
"provider_type": "workflow",
|
||||
"provider_id": "workflow-tool-id",
|
||||
"provider_name": "embedded_workflow",
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dependencies = DependencyDiscoveryService().discover_from_dsl(dsl)
|
||||
|
||||
assert [(item.kind, item.provider_id, item.provider_name) for item in dependencies] == [
|
||||
(DependencyKind.API_TOOL, "api-provider-id", "weather"),
|
||||
(DependencyKind.WORKFLOW_TOOL, "workflow-tool-id", "embedded_workflow"),
|
||||
]
|
||||
|
||||
|
||||
def test_discovers_agent_tools_from_exported_agent_parameter_shape():
|
||||
dsl = {
|
||||
"workflow": {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"data": {
|
||||
"type": "agent",
|
||||
"agent_parameters": {
|
||||
"tools": {
|
||||
"value": [
|
||||
{
|
||||
"provider_type": "api",
|
||||
"provider_id": "api-provider-id",
|
||||
"provider_name": "weather",
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dependencies = DependencyDiscoveryService().discover_from_dsl(dsl)
|
||||
|
||||
assert [(item.kind, item.provider_id, item.provider_name) for item in dependencies] == [
|
||||
(DependencyKind.API_TOOL, "api-provider-id", "weather"),
|
||||
]
|
||||
@ -0,0 +1,71 @@
|
||||
import pytest
|
||||
|
||||
from services.data_migration.entities import (
|
||||
ConflictStrategy,
|
||||
IdStrategy,
|
||||
ImportOptions,
|
||||
MigrationDataError,
|
||||
MigrationMetadata,
|
||||
MigrationPackage,
|
||||
SourceTenant,
|
||||
)
|
||||
|
||||
|
||||
def test_import_options_defaults_match_spec():
|
||||
options = ImportOptions()
|
||||
|
||||
assert options.create_app_api_token_on_import is False
|
||||
assert options.id_strategy == IdStrategy.PRESERVE_ID
|
||||
assert options.conflict_strategy == ConflictStrategy.FAIL
|
||||
|
||||
|
||||
def test_metadata_requires_version():
|
||||
with pytest.raises(MigrationDataError, match="metadata.version"):
|
||||
MigrationMetadata.from_mapping({})
|
||||
|
||||
|
||||
def test_metadata_parses_source_tenants_and_import_options():
|
||||
metadata = MigrationMetadata.from_mapping(
|
||||
{
|
||||
"version": "1",
|
||||
"source_scope": "single",
|
||||
"source_tenants": [{"id": "tenant-1", "name": "source"}],
|
||||
"target_tenant": {"name": "prod"},
|
||||
"include_secrets": False,
|
||||
"import_options": {"conflict_strategy": "update"},
|
||||
}
|
||||
)
|
||||
|
||||
assert metadata.version == "1"
|
||||
assert metadata.source_scope == "single"
|
||||
assert metadata.source_tenants == [SourceTenant(id="tenant-1", name="source")]
|
||||
assert metadata.target_tenant == {"name": "prod"}
|
||||
assert metadata.import_options.conflict_strategy == ConflictStrategy.UPDATE
|
||||
assert metadata.import_options.id_strategy == IdStrategy.PRESERVE_ID
|
||||
|
||||
|
||||
def test_import_options_invalid_strategy_raises_domain_error():
|
||||
with pytest.raises(MigrationDataError, match="id_strategy"):
|
||||
ImportOptions.from_mapping({"id_strategy": "unknown"})
|
||||
|
||||
with pytest.raises(MigrationDataError, match="id_strategy"):
|
||||
ImportOptions.from_mapping({"id_strategy": "map-id"})
|
||||
|
||||
with pytest.raises(MigrationDataError, match="conflict_strategy"):
|
||||
ImportOptions.from_mapping({"conflict_strategy": "unknown"})
|
||||
|
||||
with pytest.raises(MigrationDataError, match="conflict_strategy"):
|
||||
ImportOptions.from_mapping({"conflict_strategy": "replace"})
|
||||
|
||||
|
||||
def test_metadata_rejects_invalid_target_tenant_shape():
|
||||
with pytest.raises(MigrationDataError, match="target_tenant"):
|
||||
MigrationMetadata.from_mapping({"version": "1", "target_tenant": "prod"})
|
||||
|
||||
|
||||
def test_migration_package_sections_must_be_lists_of_objects():
|
||||
with pytest.raises(MigrationDataError, match="workflows"):
|
||||
MigrationPackage.from_mapping({"metadata": {"version": "1"}, "workflows": {"id": "app-1"}})
|
||||
|
||||
with pytest.raises(MigrationDataError, match="tools"):
|
||||
MigrationPackage.from_mapping({"metadata": {"version": "1"}, "tools": ["weather"]})
|
||||
@ -0,0 +1,201 @@
|
||||
import pytest
|
||||
|
||||
from services.data_migration.dependency_discovery_service import DiscoveredDependency
|
||||
from services.data_migration.entities import (
|
||||
ConflictStrategy,
|
||||
DependencyKind,
|
||||
IdStrategy,
|
||||
MigrationDataError,
|
||||
ResourceType,
|
||||
)
|
||||
from services.data_migration.export_service import ExportConfigParser, MigrationExportService
|
||||
|
||||
|
||||
def test_export_config_parser_accepts_new_scripted_shape():
|
||||
selection = ExportConfigParser().parse(
|
||||
{
|
||||
"source_tenant": {"mode": "single", "name": "admin's Workspace"},
|
||||
"apps": {"modes": ["workflow", "advanced-chat"], "ids": ["app-1"], "all": False},
|
||||
"include_referenced_tools": True,
|
||||
"additional_tools": {
|
||||
"api_tools": ["weather"],
|
||||
"workflow_tools": ["workflow-tool-1"],
|
||||
"mcp_tools": ["mcp-1"],
|
||||
},
|
||||
"include_secrets": False,
|
||||
"import_options": {
|
||||
"create_app_api_token_on_import": True,
|
||||
"id_strategy": "preserve-id",
|
||||
"conflict_strategy": "fail",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
assert selection.source_tenant_name == "admin's Workspace"
|
||||
assert selection.app_ids == ["app-1"]
|
||||
assert selection.export_all_apps is False
|
||||
assert selection.additional_api_tools == ["weather"]
|
||||
assert selection.additional_workflow_tools == ["workflow-tool-1"]
|
||||
assert selection.additional_mcp_tools == ["mcp-1"]
|
||||
assert selection.include_secrets is False
|
||||
assert selection.import_options.create_app_api_token_on_import is True
|
||||
assert selection.import_options.id_strategy == IdStrategy.PRESERVE_ID
|
||||
assert selection.import_options.conflict_strategy == ConflictStrategy.FAIL
|
||||
|
||||
|
||||
def test_export_config_parser_defaults_to_secret_free_all_apps():
|
||||
selection = ExportConfigParser().parse(
|
||||
{
|
||||
"source_tenant": {"name": "source"},
|
||||
"apps": {"all": True},
|
||||
}
|
||||
)
|
||||
|
||||
assert selection.export_all_apps is True
|
||||
assert selection.include_referenced_tools is True
|
||||
assert selection.include_secrets is False
|
||||
|
||||
|
||||
def test_export_config_parser_requires_explicit_source_tenant_name_for_new_shape():
|
||||
with pytest.raises(MigrationDataError, match="source_tenant.name"):
|
||||
ExportConfigParser().parse({"source_tenant": {"mode": "single"}, "apps": {"all": True}})
|
||||
|
||||
|
||||
def test_export_config_parser_accepts_limited_backwards_draft_shape():
|
||||
selection = ExportConfigParser().parse(
|
||||
{
|
||||
"tenant_name": "legacy-source",
|
||||
"workflows": ["app-1"],
|
||||
"tools": ["weather"],
|
||||
"workflow_tools": ["wf-tool-1"],
|
||||
"mcp_tools": ["mcp-1"],
|
||||
"export_all_workflows": False,
|
||||
}
|
||||
)
|
||||
|
||||
assert selection.source_tenant_name == "legacy-source"
|
||||
assert selection.app_ids == ["app-1"]
|
||||
assert selection.additional_api_tools == ["weather"]
|
||||
assert selection.additional_workflow_tools == ["wf-tool-1"]
|
||||
assert selection.additional_mcp_tools == ["mcp-1"]
|
||||
assert selection.include_secrets is False
|
||||
|
||||
|
||||
def test_export_config_parser_rejects_unsupported_app_modes():
|
||||
with pytest.raises(MigrationDataError, match="Unsupported app modes"):
|
||||
ExportConfigParser().parse(
|
||||
{
|
||||
"source_tenant": {"name": "source"},
|
||||
"apps": {"modes": ["chat"], "all": True},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_secret_free_api_tool_export_uses_masking_and_omits_credentials(monkeypatch):
|
||||
calls = []
|
||||
|
||||
def fake_get_api_provider(provider: str, tenant_id: str, mask: bool = True):
|
||||
calls.append((provider, tenant_id, mask))
|
||||
return {"credentials": {"api_key": "masked"}, "schema": {"openapi": "3.0.0"}, "tools": ["unused"]}
|
||||
|
||||
monkeypatch.setattr(
|
||||
"services.data_migration.export_service.ToolManager.user_get_api_provider",
|
||||
fake_get_api_provider,
|
||||
)
|
||||
service = MigrationExportService()
|
||||
tools: list[dict] = []
|
||||
report_items = []
|
||||
|
||||
service._export_api_tools(
|
||||
"tenant-1",
|
||||
["weather"],
|
||||
include_secrets=False,
|
||||
exported_tools=tools,
|
||||
report_items=report_items,
|
||||
)
|
||||
|
||||
assert calls == [("weather", "tenant-1", True)]
|
||||
assert tools == [{"schema": {"openapi": "3.0.0"}, "provider_name": "weather", "source_tenant_id": "tenant-1"}]
|
||||
assert report_items[0].resource_type == ResourceType.API_TOOL
|
||||
|
||||
|
||||
def test_secret_free_mcp_dependencies_are_dependency_only():
|
||||
service = MigrationExportService()
|
||||
dependencies: list[dict] = []
|
||||
mcp_tools: list[dict] = []
|
||||
report_items = []
|
||||
|
||||
service._export_mcp_tools(
|
||||
tenant_id="tenant-1",
|
||||
provider_ids=["mcp-1"],
|
||||
include_secrets=False,
|
||||
exported_mcp_tools=mcp_tools,
|
||||
dependencies=dependencies,
|
||||
report_items=report_items,
|
||||
)
|
||||
|
||||
assert mcp_tools == []
|
||||
assert dependencies == [
|
||||
{
|
||||
"kind": DependencyKind.MCP_TOOL.value,
|
||||
"provider_id": "mcp-1",
|
||||
"provider_name": None,
|
||||
"source": "mcp_provider",
|
||||
}
|
||||
]
|
||||
assert report_items[0].status == "dependency-only"
|
||||
assert report_items[0].name == "mcp_tool mcp-1"
|
||||
|
||||
|
||||
def test_get_mcp_provider_does_not_compare_non_uuid_identifier_to_uuid_id(monkeypatch):
|
||||
statements = []
|
||||
|
||||
def capture_scalar(statement):
|
||||
statements.append(str(statement))
|
||||
|
||||
monkeypatch.setattr("services.data_migration.export_service.db.session.scalar", capture_scalar)
|
||||
|
||||
with pytest.raises(MigrationDataError, match="MCP provider not found"):
|
||||
MigrationExportService()._get_mcp_provider("tenant-1", "my-test-mcp")
|
||||
|
||||
assert len(statements) == 1
|
||||
assert "tool_mcp_providers.id =" not in statements[0]
|
||||
assert "tool_mcp_providers.server_identifier =" in statements[0]
|
||||
|
||||
|
||||
def test_dependency_ids_are_deduplicated_with_manual_selection_first():
|
||||
service = MigrationExportService()
|
||||
provider_ids = service._provider_ids(
|
||||
manual_provider_ids=["weather", "weather", "manual"],
|
||||
discovered_dependencies=[
|
||||
DiscoveredDependency(DependencyKind.API_TOOL, "weather"),
|
||||
DiscoveredDependency(DependencyKind.API_TOOL, "forecast"),
|
||||
DiscoveredDependency(DependencyKind.WORKFLOW_TOOL, "workflow-tool"),
|
||||
],
|
||||
kind=DependencyKind.API_TOOL,
|
||||
)
|
||||
|
||||
assert provider_ids == ["weather", "manual", "forecast"]
|
||||
|
||||
|
||||
def test_api_provider_ids_use_provider_name_from_discovered_dependencies():
|
||||
service = MigrationExportService()
|
||||
provider_ids = service._provider_ids(
|
||||
manual_provider_ids=[],
|
||||
discovered_dependencies=[
|
||||
DiscoveredDependency(DependencyKind.API_TOOL, "api-provider-id", provider_name="weather"),
|
||||
],
|
||||
kind=DependencyKind.API_TOOL,
|
||||
)
|
||||
|
||||
assert provider_ids == ["weather"]
|
||||
|
||||
|
||||
def test_mcp_authentication_export_omits_runtime_header_shape():
|
||||
service = MigrationExportService()
|
||||
|
||||
assert service._serialize_mcp_authentication({"Authorization": "Bearer token"}) is None
|
||||
assert service._serialize_mcp_authentication({"client_id": "id", "client_secret": "secret"}) == {
|
||||
"client_id": "id",
|
||||
"client_secret": "secret",
|
||||
}
|
||||
@ -0,0 +1,973 @@
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
from models.tools import MCPToolProvider, WorkflowToolProvider
|
||||
from services.app_dsl_service import Import
|
||||
from services.data_migration.entities import (
|
||||
ConflictStrategy,
|
||||
IdStrategy,
|
||||
ImportOptions,
|
||||
ImportTarget,
|
||||
MigrationDataError,
|
||||
MigrationPackage,
|
||||
ResourceIdMapping,
|
||||
ResourceReportItem,
|
||||
ResourceType,
|
||||
)
|
||||
from services.data_migration.import_service import ImportRequest, ImportTargetResolver, MigrationImportService
|
||||
from services.entities.dsl_entities import ImportStatus
|
||||
|
||||
|
||||
def test_target_tenant_precedence_cli_then_config_then_package():
|
||||
package = MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {
|
||||
"version": "1",
|
||||
"source_scope": "single",
|
||||
"source_tenants": [],
|
||||
"target_tenant": {"name": "from-package"},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
resolver = ImportTargetResolver()
|
||||
|
||||
assert (
|
||||
resolver.select_target_tenant_name(
|
||||
ImportRequest(package=package, cli_target_tenant="from-cli", config_target_tenant="from-config")
|
||||
)
|
||||
== "from-cli"
|
||||
)
|
||||
assert (
|
||||
resolver.select_target_tenant_name(
|
||||
ImportRequest(package=package, cli_target_tenant=None, config_target_tenant="from-config")
|
||||
)
|
||||
== "from-config"
|
||||
)
|
||||
assert (
|
||||
resolver.select_target_tenant_name(
|
||||
ImportRequest(package=package, cli_target_tenant=None, config_target_tenant=None)
|
||||
)
|
||||
== "from-package"
|
||||
)
|
||||
|
||||
|
||||
def test_target_tenant_missing_fails_before_import():
|
||||
package = MigrationPackage.from_mapping({"metadata": {"version": "1", "source_scope": "single"}})
|
||||
|
||||
with pytest.raises(MigrationDataError, match="Target tenant"):
|
||||
ImportTargetResolver().select_target_tenant_name(
|
||||
ImportRequest(package=package, cli_target_tenant=None, config_target_tenant=None)
|
||||
)
|
||||
|
||||
|
||||
def test_package_target_tenant_id_can_be_used_without_name():
|
||||
package = MigrationPackage.from_mapping(
|
||||
{"metadata": {"version": "1", "source_scope": "single", "target_tenant": {"id": "tenant-id"}}}
|
||||
)
|
||||
|
||||
assert ImportTargetResolver().select_target_tenant_name(ImportRequest(package=package)) == "tenant-id"
|
||||
|
||||
|
||||
def test_target_tenant_name_is_not_treated_as_uuid():
|
||||
resolver = ImportTargetResolver()
|
||||
|
||||
assert resolver._is_uuid("admin's Workspace") is False
|
||||
assert resolver._is_uuid("49a99e46-bc2c-4885-91fa-47615f6192b5") is True
|
||||
|
||||
|
||||
def test_package_target_tenant_id_ignores_invalid_uuid(monkeypatch):
|
||||
package = MigrationPackage.from_mapping(
|
||||
{"metadata": {"version": "1", "source_scope": "single", "target_tenant": {"id": "not-a-uuid"}}}
|
||||
)
|
||||
|
||||
class StubSession:
|
||||
def get(self, model, identifier):
|
||||
raise AssertionError("invalid UUID should not be passed to session.get")
|
||||
|
||||
def scalars(self, statement):
|
||||
class EmptyResult:
|
||||
def all(self):
|
||||
return []
|
||||
|
||||
return EmptyResult()
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
|
||||
with pytest.raises(MigrationDataError, match="Target tenant not found"):
|
||||
ImportTargetResolver().resolve(ImportRequest(package=package))
|
||||
|
||||
|
||||
def test_options_override_replaces_package_defaults():
|
||||
package = MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {
|
||||
"version": "1",
|
||||
"source_scope": "single",
|
||||
"target_tenant": {"name": "target"},
|
||||
"import_options": {
|
||||
"create_app_api_token_on_import": True,
|
||||
"conflict_strategy": "update",
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
captured_options: list[ImportOptions] = []
|
||||
|
||||
class StubResolver(ImportTargetResolver):
|
||||
def resolve(self, request: ImportRequest) -> ImportTarget:
|
||||
return ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
)
|
||||
|
||||
class CapturingImportService(MigrationImportService):
|
||||
def _import_workflows(
|
||||
self,
|
||||
package: MigrationPackage,
|
||||
target: ImportTarget,
|
||||
options: ImportOptions,
|
||||
report_items: list[ResourceReportItem],
|
||||
id_mapping: dict[str, str],
|
||||
**kwargs,
|
||||
) -> None:
|
||||
captured_options.append(options)
|
||||
|
||||
override = ImportOptions(create_app_api_token_on_import=False, conflict_strategy=ConflictStrategy.SKIP)
|
||||
|
||||
CapturingImportService(target_resolver=StubResolver()).import_package(
|
||||
ImportRequest(package=package, options_override=override)
|
||||
)
|
||||
|
||||
assert captured_options == [override]
|
||||
|
||||
|
||||
def test_only_preserve_id_strategy_reuses_source_app_id():
|
||||
service = MigrationImportService()
|
||||
|
||||
assert service._should_preserve_source_app_id(ImportOptions(id_strategy=IdStrategy.PRESERVE_ID)) is True
|
||||
assert service._should_preserve_source_app_id(ImportOptions(id_strategy=IdStrategy.GENERATE_NEW_ID)) is False
|
||||
|
||||
|
||||
def test_find_existing_app_ignores_invalid_uuid(monkeypatch):
|
||||
class StubSession:
|
||||
def scalar(self, statement):
|
||||
raise AssertionError("invalid UUID should not be queried against App.id")
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
|
||||
assert MigrationImportService()._find_existing_app("not-a-uuid", "tenant-1") is None
|
||||
|
||||
|
||||
def test_find_existing_workflow_tool_does_not_compare_invalid_uuid(monkeypatch):
|
||||
captured = []
|
||||
|
||||
class StubSession:
|
||||
def scalar(self, statement):
|
||||
captured.append(statement)
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
|
||||
MigrationImportService()._find_existing_workflow_tool("tenant-1", "not-a-uuid", "tool-name", "app-id")
|
||||
|
||||
where_clause = str(captured[0].whereclause)
|
||||
assert f"{WorkflowToolProvider.__tablename__}.id" not in where_clause
|
||||
|
||||
|
||||
def test_find_existing_mcp_tool_does_not_compare_invalid_uuid(monkeypatch):
|
||||
captured = []
|
||||
|
||||
class StubSession:
|
||||
def scalar(self, statement):
|
||||
captured.append(statement)
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
|
||||
MigrationImportService()._find_existing_mcp_tool("tenant-1", "my-test-mcp", "my-test-mcp")
|
||||
|
||||
where_clause = str(captured[0].whereclause)
|
||||
assert f"{MCPToolProvider.__tablename__}.id" not in where_clause
|
||||
assert f"{MCPToolProvider.__tablename__}.name" not in where_clause
|
||||
|
||||
|
||||
def test_workflow_app_import_does_not_wrap_app_dsl_import_in_nested_transaction(monkeypatch):
|
||||
class FailingNestedTransaction:
|
||||
def __enter__(self):
|
||||
raise AssertionError("nested transaction should not be opened")
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
return False
|
||||
|
||||
class StubSession:
|
||||
def begin_nested(self):
|
||||
return FailingNestedTransaction()
|
||||
|
||||
def commit(self):
|
||||
return None
|
||||
|
||||
class StubAppDslService:
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
||||
def import_app(self, **kwargs):
|
||||
return Import(id="import-id", status=ImportStatus.COMPLETED, app_id="imported-app-id")
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(import_service, "AppDslService", StubAppDslService)
|
||||
|
||||
imported_app_id = MigrationImportService()._import_workflow_app(
|
||||
account=object(),
|
||||
workflow_data={"name": "main_chatflow"},
|
||||
dsl_content="app:\n mode: workflow\n",
|
||||
app_id="source-app-id",
|
||||
existing_app=None,
|
||||
options=ImportOptions(id_strategy=IdStrategy.PRESERVE_ID),
|
||||
)
|
||||
|
||||
assert imported_app_id == "imported-app-id"
|
||||
|
||||
|
||||
def test_rewrite_workflow_dsl_replaces_tool_provider_ids():
|
||||
dsl_content = yaml.safe_dump(
|
||||
{
|
||||
"app": {"mode": "workflow"},
|
||||
"workflow": {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"data": {
|
||||
"type": "tool",
|
||||
"provider_id": "source-api-provider-id",
|
||||
"provider_name": "weather",
|
||||
}
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"type": "agent",
|
||||
"agent_parameters": {
|
||||
"tools": {
|
||||
"value": [
|
||||
{
|
||||
"provider_id": "source-agent-provider-id",
|
||||
"provider_name": "agent_weather",
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
rewritten = MigrationImportService()._rewrite_workflow_dsl_provider_ids(
|
||||
dsl_content,
|
||||
{
|
||||
"source-api-provider-id": "target-api-provider-id",
|
||||
"source-agent-provider-id": "target-agent-provider-id",
|
||||
},
|
||||
)
|
||||
graph = yaml.safe_load(rewritten)["workflow"]["graph"]
|
||||
|
||||
assert graph["nodes"][0]["data"]["provider_id"] == "target-api-provider-id"
|
||||
assert (
|
||||
graph["nodes"][1]["data"]["agent_parameters"]["tools"]["value"][0]["provider_id"] == "target-agent-provider-id"
|
||||
)
|
||||
|
||||
|
||||
def test_source_api_provider_ids_are_discovered_from_workflow_dsl():
|
||||
package = MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"workflows": [
|
||||
{
|
||||
"dsl": yaml.safe_dump(
|
||||
{
|
||||
"workflow": {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"data": {
|
||||
"type": "tool",
|
||||
"provider_id": "source-api-provider-id",
|
||||
"provider_name": "weather",
|
||||
"provider_type": "api",
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
assert MigrationImportService()._source_api_provider_ids_by_name(package) == {"weather": {"source-api-provider-id"}}
|
||||
|
||||
|
||||
def test_workflow_tool_import_publishes_referenced_app_before_create(monkeypatch):
|
||||
events = []
|
||||
account = type("Account", (), {"id": "account-1"})()
|
||||
|
||||
class StubSession:
|
||||
def get(self, model, identifier):
|
||||
return account
|
||||
|
||||
class PublishingImportService(MigrationImportService):
|
||||
def _find_existing_app(self, app_id, tenant_id):
|
||||
return object()
|
||||
|
||||
def _find_existing_workflow_tool(self, tenant_id, workflow_tool_id, tool_name, app_id):
|
||||
if ("created", app_id) in events:
|
||||
return type("WorkflowToolProvider", (), {"id": workflow_tool_id or "created-workflow-tool-id"})()
|
||||
return None
|
||||
|
||||
def _ensure_workflow_app_is_published(self, target, account, app_id):
|
||||
events.append(("published", app_id))
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(
|
||||
import_service.WorkflowToolManageService,
|
||||
"create_workflow_tool",
|
||||
lambda **kwargs: events.append(("created", kwargs["workflow_app_id"])),
|
||||
)
|
||||
|
||||
PublishingImportService()._import_workflow_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"workflow_tools": [
|
||||
{
|
||||
"id": "workflow-tool-1",
|
||||
"name": "embedded_workflow_as_tool",
|
||||
"app_id": "workflow-app-1",
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(),
|
||||
{},
|
||||
[],
|
||||
[],
|
||||
)
|
||||
|
||||
assert events == [("published", "workflow-app-1"), ("created", "workflow-app-1")]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("id_strategy", "expected_import_id"),
|
||||
[
|
||||
(IdStrategy.PRESERVE_ID, "source-workflow-tool-id"),
|
||||
(IdStrategy.GENERATE_NEW_ID, ""),
|
||||
],
|
||||
)
|
||||
def test_workflow_tool_import_id_follows_id_strategy(monkeypatch, id_strategy, expected_import_id):
|
||||
created_kwargs = []
|
||||
target_provider = type("WorkflowToolProvider", (), {"id": "target-workflow-tool-id"})()
|
||||
account = type("Account", (), {"id": "account-1"})()
|
||||
id_mapping = {"source-app-id": "target-app-id"}
|
||||
id_mapping_details = []
|
||||
|
||||
class StubSession:
|
||||
def get(self, model, identifier):
|
||||
return account
|
||||
|
||||
class StrategyImportService(MigrationImportService):
|
||||
def _find_existing_app(self, app_id, tenant_id):
|
||||
return object()
|
||||
|
||||
def _find_existing_workflow_tool(self, tenant_id, workflow_tool_id, tool_name, app_id):
|
||||
return target_provider if created_kwargs else None
|
||||
|
||||
def _ensure_workflow_app_is_published(self, target, account, app_id):
|
||||
return None
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(
|
||||
import_service.WorkflowToolManageService,
|
||||
"create_workflow_tool",
|
||||
lambda **kwargs: created_kwargs.append(kwargs),
|
||||
)
|
||||
|
||||
StrategyImportService()._import_workflow_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"workflow_tools": [
|
||||
{
|
||||
"id": "source-workflow-tool-id",
|
||||
"name": "embedded_workflow_as_tool",
|
||||
"app_id": "source-app-id",
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(id_strategy=id_strategy),
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
[],
|
||||
)
|
||||
|
||||
assert created_kwargs[0]["import_id"] == expected_import_id
|
||||
assert id_mapping["source-workflow-tool-id"] == "target-workflow-tool-id"
|
||||
assert id_mapping_details == [
|
||||
ResourceIdMapping(
|
||||
ResourceType.WORKFLOW_TOOL,
|
||||
"embedded_workflow_as_tool",
|
||||
"source-workflow-tool-id",
|
||||
"target-workflow-tool-id",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_workflow_tool_skip_records_id_mapping(monkeypatch):
|
||||
account = type("Account", (), {"id": "account-1"})()
|
||||
existing_provider = type("WorkflowToolProvider", (), {"id": "existing-workflow-tool-id"})()
|
||||
id_mapping = {"source-app-id": "target-app-id"}
|
||||
|
||||
class StubSession:
|
||||
def get(self, model, identifier):
|
||||
return account
|
||||
|
||||
class SkipImportService(MigrationImportService):
|
||||
def _find_existing_app(self, app_id, tenant_id):
|
||||
return object()
|
||||
|
||||
def _find_existing_workflow_tool(self, tenant_id, workflow_tool_id, tool_name, app_id):
|
||||
return existing_provider
|
||||
|
||||
def _ensure_workflow_app_is_published(self, target, account, app_id):
|
||||
return None
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
|
||||
SkipImportService()._import_workflow_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"workflow_tools": [
|
||||
{
|
||||
"id": "source-workflow-tool-id",
|
||||
"name": "embedded_workflow_as_tool",
|
||||
"app_id": "source-app-id",
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(conflict_strategy=ConflictStrategy.SKIP, id_strategy=IdStrategy.GENERATE_NEW_ID),
|
||||
id_mapping,
|
||||
[],
|
||||
[],
|
||||
)
|
||||
|
||||
assert id_mapping["source-workflow-tool-id"] == "existing-workflow-tool-id"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("conflict_strategy", [ConflictStrategy.SKIP, ConflictStrategy.UPDATE])
|
||||
def test_api_tool_existing_provider_records_id_mapping(monkeypatch, conflict_strategy):
|
||||
target_provider = type("ApiToolProvider", (), {"id": "target-api-provider-id", "name": "weather"})()
|
||||
id_mapping = {}
|
||||
id_mapping_details = []
|
||||
report_items = []
|
||||
|
||||
class ExistingApiImportService(MigrationImportService):
|
||||
def _find_api_tool_provider(self, tenant_id, provider_name):
|
||||
return target_provider
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db.session, "scalar", lambda statement: target_provider)
|
||||
monkeypatch.setattr(
|
||||
import_service.ApiToolManageService, "parser_api_schema", lambda schema: {"schema_type": "openapi"}
|
||||
)
|
||||
monkeypatch.setattr(import_service.ApiToolManageService, "update_api_tool_provider", lambda **kwargs: None)
|
||||
|
||||
ExistingApiImportService()._import_api_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"tools": [{"id": "source-api-provider-id", "provider_name": "weather", "schema": "openapi: 3.0.0"}],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(conflict_strategy=conflict_strategy),
|
||||
report_items,
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
{"weather": {"source-api-provider-id-from-dsl"}},
|
||||
)
|
||||
|
||||
assert id_mapping == {
|
||||
"source-api-provider-id": "target-api-provider-id",
|
||||
"source-api-provider-id-from-dsl": "target-api-provider-id",
|
||||
}
|
||||
assert (
|
||||
ResourceIdMapping(ResourceType.API_TOOL, "weather", "source-api-provider-id", "target-api-provider-id")
|
||||
in id_mapping_details
|
||||
)
|
||||
|
||||
|
||||
def test_api_tool_create_records_id_mapping(monkeypatch):
|
||||
target_provider = type("ApiToolProvider", (), {"id": "target-api-provider-id", "name": "weather"})()
|
||||
id_mapping = {}
|
||||
|
||||
class StubSession:
|
||||
def scalar(self, statement):
|
||||
return None
|
||||
|
||||
class CreatedApiImportService(MigrationImportService):
|
||||
def _find_api_tool_provider(self, tenant_id, provider_name):
|
||||
return target_provider
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(
|
||||
import_service.ApiToolManageService, "parser_api_schema", lambda schema: {"schema_type": "openapi"}
|
||||
)
|
||||
monkeypatch.setattr(import_service.ApiToolManageService, "create_api_tool_provider", lambda **kwargs: None)
|
||||
|
||||
CreatedApiImportService()._import_api_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"tools": [{"id": "source-api-provider-id", "provider_name": "weather", "schema": "openapi: 3.0.0"}],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(),
|
||||
[],
|
||||
id_mapping,
|
||||
[],
|
||||
{},
|
||||
)
|
||||
|
||||
assert id_mapping["source-api-provider-id"] == "target-api-provider-id"
|
||||
|
||||
|
||||
def test_mcp_tool_import_restores_exported_tool_list(monkeypatch):
|
||||
provider = type("Provider", (), {"id": "target-provider-id", "tools": "[]", "authed": False})()
|
||||
report_items = []
|
||||
|
||||
class StubSession:
|
||||
def scalar(self, statement):
|
||||
return provider
|
||||
|
||||
def commit(self):
|
||||
return None
|
||||
|
||||
class StubMCPToolManageService:
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
||||
def update_provider(self, **kwargs):
|
||||
return None
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(import_service, "MCPToolManageService", StubMCPToolManageService)
|
||||
|
||||
MigrationImportService()._import_mcp_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"mcp_tools": [
|
||||
{
|
||||
"id": "source-provider-id",
|
||||
"name": "my-test-mcp",
|
||||
"server_identifier": "my-test-mcp",
|
||||
"server_url": "http://localhost:3000/mcp",
|
||||
"configuration": {},
|
||||
"tools": [{"name": "echo"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(conflict_strategy=ConflictStrategy.UPDATE),
|
||||
report_items,
|
||||
{},
|
||||
[],
|
||||
)
|
||||
|
||||
assert provider.tools == '[{"name": "echo"}]'
|
||||
assert provider.authed is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize("conflict_strategy", [ConflictStrategy.SKIP, ConflictStrategy.UPDATE])
|
||||
def test_mcp_tool_existing_provider_records_id_mapping(monkeypatch, conflict_strategy):
|
||||
provider = type("Provider", (), {"id": "target-mcp-provider-id", "tools": "[]", "authed": False})()
|
||||
id_mapping = {}
|
||||
id_mapping_details = []
|
||||
|
||||
class StubSession:
|
||||
def commit(self):
|
||||
return None
|
||||
|
||||
class ExistingMCPImportService(MigrationImportService):
|
||||
def _find_existing_mcp_tool(self, tenant_id, provider_id, server_identifier):
|
||||
return provider
|
||||
|
||||
class StubMCPToolManageService:
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
||||
def update_provider(self, **kwargs):
|
||||
return None
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(import_service, "MCPToolManageService", StubMCPToolManageService)
|
||||
|
||||
ExistingMCPImportService()._import_mcp_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"mcp_tools": [
|
||||
{
|
||||
"id": "source-mcp-provider-id",
|
||||
"name": "my-test-mcp",
|
||||
"server_identifier": "my-test-mcp",
|
||||
"server_url": "http://localhost:3000/mcp",
|
||||
"configuration": {},
|
||||
"tools": [{"name": "echo"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(conflict_strategy=conflict_strategy),
|
||||
[],
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
)
|
||||
|
||||
assert id_mapping["source-mcp-provider-id"] == "target-mcp-provider-id"
|
||||
assert "my-test-mcp" not in id_mapping
|
||||
assert id_mapping_details == [
|
||||
ResourceIdMapping(ResourceType.MCP_TOOL, "my-test-mcp", "source-mcp-provider-id", "target-mcp-provider-id")
|
||||
]
|
||||
|
||||
|
||||
def test_mcp_tool_create_records_id_mapping(monkeypatch):
|
||||
provider = type("Provider", (), {"id": "target-mcp-provider-id", "tools": "[]", "authed": False})()
|
||||
id_mapping = {}
|
||||
provider_created = False
|
||||
|
||||
class StubSession:
|
||||
def commit(self):
|
||||
return None
|
||||
|
||||
class CreatedMCPImportService(MigrationImportService):
|
||||
def _find_existing_mcp_tool(self, tenant_id, provider_id, server_identifier):
|
||||
return provider if provider_created else None
|
||||
|
||||
class StubMCPToolManageService:
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
||||
def create_provider(self, **kwargs):
|
||||
nonlocal provider_created
|
||||
provider_created = True
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
monkeypatch.setattr(import_service, "MCPToolManageService", StubMCPToolManageService)
|
||||
|
||||
CreatedMCPImportService()._import_mcp_tools(
|
||||
MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"mcp_tools": [
|
||||
{
|
||||
"id": "source-mcp-provider-id",
|
||||
"name": "my-test-mcp",
|
||||
"server_identifier": "my-test-mcp",
|
||||
"server_url": "http://localhost:3000/mcp",
|
||||
"configuration": {},
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
ImportOptions(),
|
||||
[],
|
||||
id_mapping,
|
||||
[],
|
||||
)
|
||||
|
||||
assert id_mapping["source-mcp-provider-id"] == "target-mcp-provider-id"
|
||||
|
||||
|
||||
def test_dependency_only_mcp_preflight_reports_missing_target_provider_with_workflow_context(monkeypatch):
|
||||
report_items = []
|
||||
package = MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"dependencies": [
|
||||
{
|
||||
"kind": "mcp_tool",
|
||||
"provider_id": "my-test-mcp-server",
|
||||
"provider_name": "my-test-mcp",
|
||||
}
|
||||
],
|
||||
"workflows": [
|
||||
{
|
||||
"name": "workflow2",
|
||||
"dsl": yaml.safe_dump(
|
||||
{
|
||||
"workflow": {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"id": "node-1",
|
||||
"data": {
|
||||
"type": "tool",
|
||||
"provider_type": "mcp",
|
||||
"provider_id": "my-test-mcp-server",
|
||||
"tool_name": "echo",
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
),
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db.session, "scalar", lambda statement: None)
|
||||
|
||||
MigrationImportService()._preflight_dependency_only_mcp(
|
||||
package,
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
report_items,
|
||||
)
|
||||
|
||||
assert report_items == [
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
"my-test-mcp-server",
|
||||
"mcp_tool my-test-mcp",
|
||||
"skipped",
|
||||
"missing in target tenant; referenced by workflow2 / echo; "
|
||||
"configure it manually before running the workflow.",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_dependency_only_mcp_lookup_does_not_compare_non_uuid_identifier_to_uuid_id(monkeypatch):
|
||||
captured = []
|
||||
|
||||
class StubSession:
|
||||
def scalar(self, statement):
|
||||
captured.append(statement)
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db, "session", StubSession())
|
||||
|
||||
MigrationImportService()._find_dependency_only_mcp_provider(
|
||||
"tenant-1",
|
||||
"my-test-mcp-server",
|
||||
"my-test-mcp",
|
||||
)
|
||||
|
||||
where_clause = str(captured[0].whereclause)
|
||||
assert f"{MCPToolProvider.__tablename__}.id" not in where_clause
|
||||
|
||||
|
||||
def test_dependency_only_mcp_preflight_reports_available_target_provider(monkeypatch):
|
||||
report_items = []
|
||||
package = MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"dependencies": [{"kind": "mcp_tool", "provider_id": "my-test-mcp-server"}],
|
||||
}
|
||||
)
|
||||
provider = type(
|
||||
"Provider",
|
||||
(),
|
||||
{"id": "target-provider-id", "name": "my-test-mcp", "server_identifier": "my-test-mcp-server"},
|
||||
)()
|
||||
|
||||
from services.data_migration import import_service
|
||||
|
||||
monkeypatch.setattr(import_service.db.session, "scalar", lambda statement: provider)
|
||||
|
||||
MigrationImportService()._preflight_dependency_only_mcp(
|
||||
package,
|
||||
ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
),
|
||||
report_items,
|
||||
)
|
||||
|
||||
assert report_items == [
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
"my-test-mcp-server",
|
||||
"mcp_tool my-test-mcp",
|
||||
"available",
|
||||
"MCP provider exists in target tenant.",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_import_package_imports_workflow_tool_provider_apps_before_consumers():
|
||||
events = []
|
||||
|
||||
class StubResolver(ImportTargetResolver):
|
||||
def resolve(self, request):
|
||||
return ImportTarget(
|
||||
tenant_id="tenant-1",
|
||||
tenant_name="target",
|
||||
operator_id="account-1",
|
||||
operator_email="owner@example.com",
|
||||
)
|
||||
|
||||
class OrderedImportService(MigrationImportService):
|
||||
def _import_api_tools(
|
||||
self,
|
||||
package,
|
||||
target,
|
||||
options,
|
||||
report_items,
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
source_provider_ids_by_name,
|
||||
):
|
||||
events.append(("api_tools", "imported"))
|
||||
|
||||
def _import_workflows(
|
||||
self,
|
||||
package,
|
||||
target,
|
||||
options,
|
||||
report_items,
|
||||
id_mapping,
|
||||
id_mapping_details,
|
||||
*,
|
||||
imported_workflow_ids=None,
|
||||
only_app_ids=None,
|
||||
skip_app_ids=None,
|
||||
):
|
||||
only_app_ids = set(only_app_ids or [])
|
||||
skip_app_ids = set(skip_app_ids or [])
|
||||
for workflow_data in package.workflows:
|
||||
app_id = workflow_data["id"]
|
||||
if only_app_ids and app_id not in only_app_ids:
|
||||
continue
|
||||
if app_id in skip_app_ids:
|
||||
continue
|
||||
events.append(("workflow", app_id))
|
||||
id_mapping[app_id] = app_id
|
||||
if imported_workflow_ids is not None:
|
||||
imported_workflow_ids.add(app_id)
|
||||
|
||||
def _import_workflow_tools(self, package, target, options, id_mapping, id_mapping_details, report_items):
|
||||
events.append(("workflow_tool", package.workflow_tools[0]["id"]))
|
||||
|
||||
def _import_mcp_tools(self, package, target, options, report_items, id_mapping, id_mapping_details):
|
||||
events.append(("mcp_tools", "imported"))
|
||||
|
||||
package = MigrationPackage.from_mapping(
|
||||
{
|
||||
"metadata": {"version": "1", "source_scope": "single"},
|
||||
"workflows": [
|
||||
{"id": "provider-app", "name": "embedded", "dsl": "app:\n mode: workflow\n"},
|
||||
{"id": "consumer-app", "name": "main", "dsl": "app:\n mode: advanced-chat\n"},
|
||||
],
|
||||
"workflow_tools": [{"id": "workflow-tool", "name": "embedded_tool", "app_id": "provider-app"}],
|
||||
}
|
||||
)
|
||||
|
||||
OrderedImportService(target_resolver=StubResolver()).import_package(ImportRequest(package=package))
|
||||
|
||||
assert events == [
|
||||
("api_tools", "imported"),
|
||||
("mcp_tools", "imported"),
|
||||
("workflow", "provider-app"),
|
||||
("workflow_tool", "workflow-tool"),
|
||||
("workflow", "consumer-app"),
|
||||
]
|
||||
@ -0,0 +1,53 @@
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from services.data_migration.entities import MigrationDataError
|
||||
from services.data_migration.package_service import MigrationPackageService
|
||||
|
||||
|
||||
def test_load_package_rejects_branch_draft_shape(tmp_path):
|
||||
package_file = tmp_path / "draft.json"
|
||||
package_file.write_text(json.dumps({"workflows": [], "tools": []}), encoding="utf-8")
|
||||
|
||||
with pytest.raises(MigrationDataError, match="metadata.version"):
|
||||
MigrationPackageService().load_package(package_file)
|
||||
|
||||
|
||||
def test_load_package_rejects_unsupported_version(tmp_path):
|
||||
package_file = tmp_path / "future.json"
|
||||
package_file.write_text(json.dumps({"metadata": {"version": "999", "source_scope": "single"}}), encoding="utf-8")
|
||||
|
||||
with pytest.raises(MigrationDataError, match="Unsupported migration package version"):
|
||||
MigrationPackageService().load_package(package_file)
|
||||
|
||||
|
||||
def test_save_package_writes_versioned_shape(tmp_path):
|
||||
output_file = tmp_path / "migration-data.json"
|
||||
package = MigrationPackageService().build_empty_package(
|
||||
source_tenant_id="tenant-1",
|
||||
source_tenant_name="source",
|
||||
include_secrets=False,
|
||||
)
|
||||
|
||||
MigrationPackageService().save_package(package, output_file, overwrite=False)
|
||||
|
||||
data = json.loads(output_file.read_text(encoding="utf-8"))
|
||||
assert data["metadata"]["version"] == "1"
|
||||
assert data["metadata"]["source_tenants"] == [{"id": "tenant-1", "name": "source"}]
|
||||
assert data["metadata"]["include_secrets"] is False
|
||||
assert data["workflows"] == []
|
||||
assert data["dependencies"] == []
|
||||
|
||||
|
||||
def test_save_package_without_overwrite_fails_when_file_exists(tmp_path):
|
||||
output_file = tmp_path / "migration-data.json"
|
||||
output_file.write_text("{}", encoding="utf-8")
|
||||
package = MigrationPackageService().build_empty_package(
|
||||
source_tenant_id="tenant-1",
|
||||
source_tenant_name="source",
|
||||
include_secrets=False,
|
||||
)
|
||||
|
||||
with pytest.raises(MigrationDataError, match="already exists"):
|
||||
MigrationPackageService().save_package(package, output_file, overwrite=False)
|
||||
@ -0,0 +1,111 @@
|
||||
from services.data_migration.entities import ReportContext, ResourceIdMapping, ResourceReportItem, ResourceType
|
||||
from services.data_migration.report_service import MigrationReportService
|
||||
|
||||
|
||||
def test_report_summarizes_by_resource_type_and_status():
|
||||
lines = MigrationReportService().render(
|
||||
[
|
||||
ResourceReportItem(ResourceType.WORKFLOW, "app-1", "App", "exported"),
|
||||
ResourceReportItem(ResourceType.API_TOOL, "weather", "Weather", "exported"),
|
||||
ResourceReportItem(ResourceType.DEPENDENCY, "mcp-1", "MCP", "dependency-only"),
|
||||
]
|
||||
)
|
||||
|
||||
assert "workflow exported: 1" in lines
|
||||
assert "api_tool exported: 1" in lines
|
||||
assert "dependency dependency-only: 1" in lines
|
||||
|
||||
|
||||
def test_report_includes_actionable_lines_for_skipped_and_unresolved_items():
|
||||
lines = MigrationReportService().render(
|
||||
[
|
||||
ResourceReportItem(ResourceType.WORKFLOW, "app-1", "App", "skipped", "App already exists"),
|
||||
ResourceReportItem(ResourceType.DEPENDENCY, "mcp-1", None, "unresolved", "MCP provider not found"),
|
||||
ResourceReportItem(ResourceType.API_TOOL, "weather", "Weather", "exported"),
|
||||
]
|
||||
)
|
||||
|
||||
assert "workflow skipped: 1" in lines
|
||||
assert "dependency unresolved: 1" in lines
|
||||
assert "workflow app-1: App already exists" in lines
|
||||
assert "dependency mcp-1: MCP provider not found" in lines
|
||||
|
||||
|
||||
def test_report_dependency_detail_uses_type_and_name_when_available():
|
||||
lines = MigrationReportService().render(
|
||||
[
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
"785e52f1-06bf-483c-8dcf-712e59fd43b9",
|
||||
"workflow embedded_workflow",
|
||||
"dependency-only",
|
||||
"Dependency metadata only; ensure the resource exists in the target environment.",
|
||||
),
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
"my-test-mcp",
|
||||
"mcp_tool my-test-mcp",
|
||||
"dependency-only",
|
||||
"Configure MCP provider manually in the target tenant unless exporting with secrets enabled.",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
assert (
|
||||
"dependency workflow embedded_workflow: 785e52f1-06bf-483c-8dcf-712e59fd43b9: "
|
||||
"Dependency metadata only; ensure the resource exists in the target environment."
|
||||
) in lines
|
||||
assert (
|
||||
"dependency mcp_tool my-test-mcp: "
|
||||
"Configure MCP provider manually in the target tenant unless exporting with secrets enabled."
|
||||
) in lines
|
||||
|
||||
|
||||
def test_report_includes_dependency_only_detail_lines():
|
||||
lines = MigrationReportService().render(
|
||||
[
|
||||
ResourceReportItem(
|
||||
ResourceType.DEPENDENCY,
|
||||
"mcp-1",
|
||||
"MCP",
|
||||
"dependency-only",
|
||||
"Configure manually in target tenant.",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
assert "dependency dependency-only: 1" in lines
|
||||
assert "dependency mcp-1: Configure manually in target tenant." in lines
|
||||
|
||||
|
||||
def test_report_includes_export_and_import_context():
|
||||
lines = MigrationReportService().render(
|
||||
[],
|
||||
context=ReportContext(
|
||||
output_path="migration-data.json",
|
||||
source_scope="single",
|
||||
selected_app_count=2,
|
||||
include_secrets=False,
|
||||
target_tenant="prod",
|
||||
operator_email="admin@example.com",
|
||||
app_api_tokens_created=1,
|
||||
app_api_tokens_reused=2,
|
||||
id_mapping_count=3,
|
||||
id_mappings={"source-app": "target-app", "source-tool": "target-tool"},
|
||||
id_mapping_details=[
|
||||
ResourceIdMapping(ResourceType.WORKFLOW, "Main workflow", "source-app", "target-app"),
|
||||
ResourceIdMapping(ResourceType.API_TOOL, "weather", "source-tool", "target-tool"),
|
||||
],
|
||||
),
|
||||
)
|
||||
|
||||
assert "output: migration-data.json" in lines
|
||||
assert "source scope: single" in lines
|
||||
assert "selected apps: 2" in lines
|
||||
assert "include secrets: false" in lines
|
||||
assert "target tenant: prod" in lines
|
||||
assert "operator: admin@example.com" in lines
|
||||
assert "app api tokens: 1 created, 2 reused" in lines
|
||||
assert "resource references resolved: 2" in lines
|
||||
assert "- workflow Main workflow: source-app -> target-app" in lines
|
||||
assert "- api_tool weather: source-tool -> target-tool" in lines
|
||||
286
docs/cross-env-app-migration/README.md
Normal file
286
docs/cross-env-app-migration/README.md
Normal file
@ -0,0 +1,286 @@
|
||||
# Cross-Environment Data Migration
|
||||
|
||||
This guide explains how to move workflow apps, chatflow apps, and their custom tool dependencies from one Dify environment to another.
|
||||
|
||||
The recommended best-practice path is:
|
||||
|
||||
1. Run the export wizard in the source environment.
|
||||
2. Import the generated migration package in the target environment.
|
||||
|
||||
Use scripted export only when you need repeatable automation.
|
||||
|
||||
## What Gets Migrated
|
||||
|
||||
The migration package can contain:
|
||||
|
||||
- Workflow apps and advanced-chat apps.
|
||||
- Custom API tool providers.
|
||||
- Workflow tool providers.
|
||||
- MCP tool providers when secrets are explicitly included.
|
||||
- Dependency metadata for MCP tools, built-in tools, and plugin tools that must already exist or be configured in the target environment.
|
||||
|
||||
Only single source workspace exports are supported. The source and target workspace names do not need to match.
|
||||
|
||||
## Recommended Flow: Wizard Export + Import
|
||||
|
||||
Run the wizard from the source environment:
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask app-migration-wizard
|
||||
```
|
||||
|
||||
The wizard asks you to select the source workspace, choose workflow/chatflow apps, discover referenced tools, set import behavior, and write the final migration package JSON.
|
||||
|
||||
Then copy the generated JSON package to the target environment and import it:
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask import-app-migration \
|
||||
--input migration-data-20260528-120000.json \
|
||||
--target-tenant "production Workspace"
|
||||
```
|
||||
|
||||
This wizard + import combination is the recommended migration workflow because the wizard shows the available apps and tools, discovers app dependencies, summarizes the package before writing it, and uses the same export service as scripted export.
|
||||
|
||||
## Wizard Command
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask app-migration-wizard
|
||||
```
|
||||
|
||||
The wizard has no CLI options. Its interactive inputs are:
|
||||
|
||||
- `Source tenant`: the source workspace to export from. Select one tenant by number.
|
||||
- `App selection`: apps to export. Supported app types are `workflow` and `advanced-chat`. Enter `all`, one number, or comma-separated numbers.
|
||||
- `Automatically export tools referenced by selected apps?`: recommended `yes`. This scans selected app graphs and automatically includes referenced custom API tools, workflow tools, and MCP tool references.
|
||||
- `Export additional tools manually?`: optional. Use this when you need to migrate tools that are not referenced by the selected apps.
|
||||
- `Include secrets in output JSON?`: default `no`. When `no`, workflow/app DSL secrets are omitted or masked, API tool credentials are omitted, and MCP providers are exported as dependency metadata only. When `yes`, treat the output JSON as sensitive.
|
||||
- `Create or reuse app API tokens during import?`: default `no`. When `yes`, import creates an app API token for imported apps that have none, or reuses an existing token.
|
||||
- `Import ID strategy`: `preserve-id` or `generate-new-id`. Default is `preserve-id`.
|
||||
- `Import conflict strategy`: `fail`, `skip`, or `update`. Wizard default is `update`.
|
||||
- `Output path`: output migration package path. The default is `migration-data-YYYYMMDD-HHMMSS.json`.
|
||||
- `Overwrite existing output`: shown only if the selected output file already exists.
|
||||
- `Write migration package?`: final confirmation after the wizard prints the summary.
|
||||
|
||||
## Import Command
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask import-app-migration --input migration-package.json --target-tenant "target Workspace"
|
||||
```
|
||||
|
||||
Options:
|
||||
|
||||
- `--input`: required. Path to the migration package JSON generated by the wizard or scripted export.
|
||||
- `--target-tenant`: optional only when package metadata already contains a target tenant. Target workspace name or workspace UUID. This overrides package metadata and is recommended for reusable packages.
|
||||
- `--operator-email`: optional. Email of the target-tenant account used as the import operator. If omitted, import uses the earliest owner account in the target tenant.
|
||||
- `--id-strategy`: optional override for the package import option. Allowed values:
|
||||
- `preserve-id`: keep source app/tool IDs when supported. This is the recommended default for preserving workflow references across environments.
|
||||
- `generate-new-id`: let the target environment generate new IDs and rewrite references through the migration ID mapping.
|
||||
- `--conflict-strategy`: optional override for the package import option. Allowed values:
|
||||
- `fail`: stop at the first existing target resource conflict. Previously committed resources are not rolled back.
|
||||
- `skip`: keep the existing target resource and skip importing that resource.
|
||||
- `update`: update the existing target resource in place.
|
||||
- `--create-app-api-token-on-import`: optional override. Create or reuse app API tokens during import.
|
||||
- `--no-create-app-api-token-on-import`: optional override. Do not create app API tokens during import.
|
||||
|
||||
Import prints a report that includes the resolved target tenant, operator, created/updated/skipped resources, unresolved dependencies, app API token counts, and ID mappings used to rewrite references.
|
||||
|
||||
## Optional Flow: Scripted Export
|
||||
|
||||
Scripted export is supported for automation. The recommended scripted path is:
|
||||
|
||||
1. Generate an export config template.
|
||||
2. Edit the template.
|
||||
3. Run scripted export.
|
||||
4. Import the generated package.
|
||||
|
||||
Generate the template:
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask app-migration-template --output export-config.json
|
||||
```
|
||||
|
||||
Edit `export-config.json`, then run:
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask export-app-migration \
|
||||
--input export-config.json \
|
||||
--output migration-package.json
|
||||
```
|
||||
|
||||
Import it:
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask import-app-migration \
|
||||
--input migration-package.json \
|
||||
--target-tenant "production Workspace"
|
||||
```
|
||||
|
||||
### Template Command
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask app-migration-template [--output export-config.json] [--overwrite]
|
||||
```
|
||||
|
||||
Options:
|
||||
|
||||
- `--output`: optional. Path to write the scripted export config JSON template. If omitted, the template is printed to stdout.
|
||||
- `--overwrite`: optional. Allows replacing an existing output file. Without this option, the command fails if `--output` already exists.
|
||||
|
||||
### Scripted Export Command
|
||||
|
||||
```bash
|
||||
cd api
|
||||
source .venv/bin/activate
|
||||
uv run flask export-app-migration --input export-config.json --output migration-package.json
|
||||
```
|
||||
|
||||
Options:
|
||||
|
||||
- `--input`: required. Path to the export config JSON.
|
||||
- `--output`: required. Path to write the migration package JSON.
|
||||
- `--overwrite`: optional. Allows replacing an existing output package. Without this option, the command fails if `--output` already exists.
|
||||
|
||||
### Export Config Fields
|
||||
|
||||
The generated template uses this shape:
|
||||
|
||||
```json
|
||||
{
|
||||
"source_tenant": {
|
||||
"mode": "single",
|
||||
"id": "",
|
||||
"name": "admin's Workspace"
|
||||
},
|
||||
"apps": {
|
||||
"modes": ["workflow", "advanced-chat"],
|
||||
"ids": [],
|
||||
"all": true
|
||||
},
|
||||
"include_referenced_tools": true,
|
||||
"additional_tools": {
|
||||
"api_tools": [],
|
||||
"workflow_tools": [],
|
||||
"mcp_tools": []
|
||||
},
|
||||
"include_secrets": false,
|
||||
"import_options": {
|
||||
"create_app_api_token_on_import": false,
|
||||
"id_strategy": "preserve-id",
|
||||
"conflict_strategy": "fail"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Fields:
|
||||
|
||||
- `source_tenant.mode`: must be `single`.
|
||||
- `source_tenant.id`: optional source workspace UUID. Use this when workspace names are duplicated or when you want strict source selection.
|
||||
- `source_tenant.name`: required source workspace name. When `source_tenant.id` is set, the ID and name must match.
|
||||
- `apps.modes`: optional list of app modes for validation. Supported values are `workflow` and `advanced-chat`.
|
||||
- `apps.ids`: app IDs to export when `apps.all` is `false`.
|
||||
- `apps.all`: when `true`, export all supported apps in the selected source workspace. When `false`, export only `apps.ids`.
|
||||
- `include_referenced_tools`: recommended `true`. Automatically discovers tools referenced by selected workflow/chatflow apps.
|
||||
- `additional_tools.api_tools`: optional custom API tool provider names to export in addition to referenced tools.
|
||||
- `additional_tools.workflow_tools`: optional workflow tool provider IDs to export in addition to referenced tools.
|
||||
- `additional_tools.mcp_tools`: optional MCP provider IDs or server identifiers to export in addition to referenced tools.
|
||||
- `include_secrets`: default `false`. When `false`, credentials are omitted and MCP providers are recorded as dependency metadata. When `true`, custom API credentials, workflow DSL secrets, and full MCP connection data can be written to the package.
|
||||
- `import_options.create_app_api_token_on_import`: default `false`. Package-level default for import token creation.
|
||||
- `import_options.id_strategy`: package-level default ID strategy. Allowed values are `preserve-id` and `generate-new-id`.
|
||||
- `import_options.conflict_strategy`: package-level default conflict strategy. Allowed values are `fail`, `skip`, and `update`.
|
||||
|
||||
## Referenced Tools
|
||||
|
||||
Use automatic referenced-tool discovery whenever possible.
|
||||
|
||||
When enabled, Dify scans selected workflow/chatflow graphs and agent tool configs, then de-duplicates discovered custom API tools, workflow tools, and MCP tool references before export. This reduces the chance of importing an app whose workflow references missing providers.
|
||||
|
||||
Built-in and plugin tools are not serialized into the package. The migration report records them as dependency metadata; make sure the target environment has the required built-in or plugin tools installed and configured.
|
||||
|
||||
MCP providers are also dependency-only unless `include_secrets` is enabled. If MCP secrets are not exported, configure the corresponding MCP provider manually in the target workspace before running migrated workflows.
|
||||
|
||||
## Secret Handling
|
||||
|
||||
The safe default is `include_secrets: false`.
|
||||
|
||||
Only enable secret export when you have a controlled transfer path for the JSON package. With secrets enabled, the package may include API tool credentials, workflow/app DSL secret values, MCP server URLs, MCP headers, authentication data, and cached MCP tool lists.
|
||||
|
||||
## FAQ
|
||||
|
||||
### What happens when `include_secrets` is `false`?
|
||||
|
||||
This is the recommended default, but it means the migration package is intentionally incomplete for sensitive runtime configuration.
|
||||
|
||||
Important behavior:
|
||||
|
||||
- MCP tools are not exported as full tool providers. They are recorded as dependency metadata only.
|
||||
- Custom API tool credentials are not exported. The provider schema can be exported, but credentials must be configured again in the target workspace.
|
||||
- Workflow/app DSL secrets are omitted or masked. Any app variables, credentials, or secret-backed settings must be reviewed in the target workspace after import.
|
||||
- Built-in and plugin tools are never serialized as custom migration data. They are recorded as dependencies only.
|
||||
|
||||
How to handle it:
|
||||
|
||||
- Before importing, install or enable the required built-in/plugin tools in the target environment.
|
||||
- For MCP tools, manually create or configure the corresponding MCP provider in the target workspace before running migrated workflows.
|
||||
- For custom API tools, open the imported provider in the target workspace and re-enter credentials.
|
||||
- After import, review each migrated workflow/chatflow before production use. Pay special attention to tool nodes, agent tool configs, environment variables, app variables, and credential-backed settings.
|
||||
- Use the import report. Items marked `dependency-only`, `skipped`, or `unresolved` usually need manual follow-up.
|
||||
|
||||
If you need the package to carry full MCP provider configuration or credentials, rerun export with `include_secrets` set to `true` and transfer the generated JSON as sensitive data.
|
||||
|
||||
### What should I consider when `include_secrets` is `true`?
|
||||
|
||||
The package can include sensitive runtime configuration such as API tool credentials, workflow/app DSL secret values, MCP server URLs, headers, authentication data, and MCP tool lists.
|
||||
|
||||
How to handle it:
|
||||
|
||||
- Store and transfer the JSON package as a secret.
|
||||
- Delete the package after import if your security policy requires it.
|
||||
- Prefer using this only for controlled one-time migrations where manual target-side credential setup is harder than securing the package.
|
||||
|
||||
### Should I use `preserve-id` or `generate-new-id`?
|
||||
|
||||
The ID strategy controls whether imported resources keep source IDs or receive new IDs in the target environment. It applies to workflow apps and custom tool resources where the target service supports explicit IDs.
|
||||
|
||||
Use `preserve-id` by default. It is recommended for cross-environment app migration because imported apps and tools try to keep their source IDs, which makes workflow references easier to preserve.
|
||||
|
||||
Use `preserve-id` when:
|
||||
|
||||
- You are migrating between environments that are meant to mirror each other, such as staging to production.
|
||||
- You want workflow tool references and provider references to remain as stable as possible.
|
||||
- The target environment does not already contain unrelated resources with the same IDs.
|
||||
|
||||
Watch out for:
|
||||
|
||||
- If the target already has a resource with the same ID, `conflict_strategy=fail` stops the import, `skip` keeps the target resource, and `update` updates it in place.
|
||||
- If the same ID was reused for a different resource in the target environment, review carefully before using `update`.
|
||||
|
||||
Use `generate-new-id` when the target environment should keep its own local IDs.
|
||||
|
||||
Use `generate-new-id` when:
|
||||
|
||||
- The target environment already has resources that may conflict with source IDs.
|
||||
- You are importing a package as a copy rather than trying to mirror environments.
|
||||
- You want to avoid preserving source database IDs in the target environment.
|
||||
|
||||
Watch out for:
|
||||
|
||||
- Import records source-to-target ID mappings. Always review the import report's ID mappings.
|
||||
- Workflow DSL provider references are rewritten using the generated ID mapping where possible.
|
||||
- Dependencies that are metadata-only, such as MCP providers exported with `include_secrets=false`, still require manual target-side configuration.
|
||||
- Built-in/plugin tools are not remapped as migrated custom resources; the target environment must provide them separately.
|
||||
@ -1,5 +1,5 @@
|
||||
# base image
|
||||
FROM node:22-alpine AS base
|
||||
FROM node:22.22.1-alpine AS base
|
||||
LABEL maintainer="takatost@gmail.com"
|
||||
|
||||
# if you located in China, you can use aliyun mirror to speed up
|
||||
|
||||
@ -465,6 +465,14 @@ describe('Card', () => {
|
||||
|
||||
expect(screen.queryByTestId('partner-badge')).not.toBeInTheDocument()
|
||||
})
|
||||
|
||||
it('should handle null badges from the marketplace API', () => {
|
||||
const plugin = createMockPlugin({ badges: null })
|
||||
|
||||
render(<Card payload={plugin} />)
|
||||
|
||||
expect(screen.queryByTestId('partner-badge')).not.toBeInTheDocument()
|
||||
})
|
||||
})
|
||||
|
||||
// ================================
|
||||
|
||||
@ -53,7 +53,8 @@ const Card = ({
|
||||
const { t } = useTranslation()
|
||||
const { categoriesMap } = useCategories(true)
|
||||
const currentWorkspaceId = useSelector(s => s.currentWorkspace.id)
|
||||
const { category, type, name, org, label, brief, icon, icon_dark, verified, badges = [], from } = payload
|
||||
const { category, type, name, org, label, brief, icon, icon_dark, verified, from } = payload
|
||||
const badges = payload.badges ?? []
|
||||
const { theme } = useTheme()
|
||||
const iconSrc = getPluginCardIconUrl(
|
||||
{ from, name, org, type },
|
||||
|
||||
@ -190,7 +190,7 @@ export type PluginManifestInMarket = {
|
||||
introduction: string
|
||||
verified: boolean
|
||||
install_count: number
|
||||
badges: string[]
|
||||
badges: string[] | null
|
||||
verification: {
|
||||
authorized_category: 'langgenius' | 'partner' | 'community'
|
||||
}
|
||||
@ -255,7 +255,7 @@ export type Plugin = {
|
||||
settings: CredentialFormSchemaBase[]
|
||||
}
|
||||
tags: { name: string }[]
|
||||
badges: string[]
|
||||
badges: string[] | null
|
||||
verification: {
|
||||
authorized_category: 'langgenius' | 'partner' | 'community'
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ export default function DevicePage() {
|
||||
const pathname = usePathname()
|
||||
const urlUserCode = (searchParams.get('user_code') || '').trim().toUpperCase()
|
||||
const ssoVerified = searchParams.get('sso_verified') === '1'
|
||||
const ssoError = searchParams.get('sso_error') || ''
|
||||
|
||||
const [typed, setTyped] = useState('')
|
||||
const [view, setView] = useState<View>({ kind: 'code_entry' })
|
||||
@ -81,7 +82,11 @@ export default function DevicePage() {
|
||||
return
|
||||
}
|
||||
let consumed = false
|
||||
if (ssoVerified) {
|
||||
if (ssoError) {
|
||||
setErrMsg(ssoError) // eslint-disable-line react/set-state-in-effect
|
||||
consumed = true
|
||||
}
|
||||
else if (ssoVerified) {
|
||||
setView({ kind: 'authorize_sso' }) // eslint-disable-line react/set-state-in-effect
|
||||
consumed = true
|
||||
}
|
||||
@ -92,9 +97,9 @@ export default function DevicePage() {
|
||||
setView({ kind: 'chooser', userCode: urlUserCode }) // eslint-disable-line react/set-state-in-effect
|
||||
consumed = true
|
||||
}
|
||||
if (consumed && (urlUserCode || ssoVerified))
|
||||
if (consumed && (urlUserCode || ssoVerified || ssoError))
|
||||
router.replace(pathname)
|
||||
}, [urlUserCode, ssoVerified, account, view, router, pathname])
|
||||
}, [urlUserCode, ssoVerified, ssoError, account, view, router, pathname])
|
||||
|
||||
const onContinue = async () => {
|
||||
if (!isValidUserCode(typed))
|
||||
|
||||
Reference in New Issue
Block a user