mirror of
https://github.com/langgenius/dify.git
synced 2026-05-01 07:58:02 +08:00
feat(sandbox): draft storage
This commit is contained in:
@ -23,6 +23,7 @@ from core.rag.datasource.vdb.vector_factory import Vector
|
||||
from core.rag.datasource.vdb.vector_type import VectorType
|
||||
from core.rag.index_processor.constant.built_in_field import BuiltInField
|
||||
from core.rag.models.document import Document
|
||||
from core.sandbox.vm import SandboxBuilder, SandboxType
|
||||
from core.tools.utils.system_encryption import encrypt_system_params
|
||||
from events.app_event import app_was_created
|
||||
from extensions.ext_database import db
|
||||
@ -1508,7 +1509,6 @@ def setup_sandbox_system_config(provider_type: str, config: str):
|
||||
flask setup-sandbox-system-config --provider-type local --config '{}'
|
||||
"""
|
||||
from models.sandbox import SandboxProviderSystemConfig
|
||||
from services.sandbox.sandbox_provider_service import PROVIDER_CONFIG_MODELS
|
||||
|
||||
try:
|
||||
click.echo(click.style(f"Validating config: {config}", fg="yellow"))
|
||||
@ -1516,9 +1516,7 @@ def setup_sandbox_system_config(provider_type: str, config: str):
|
||||
click.echo(click.style("Config validated successfully.", fg="green"))
|
||||
|
||||
click.echo(click.style(f"Validating config schema for provider type: {provider_type}", fg="yellow"))
|
||||
model_class = PROVIDER_CONFIG_MODELS.get(provider_type)
|
||||
if model_class:
|
||||
model_class.model_validate(config_dict)
|
||||
SandboxBuilder.validate(SandboxType(provider_type), config_dict)
|
||||
click.echo(click.style("Config schema validated successfully.", fg="green"))
|
||||
|
||||
click.echo(click.style("Encrypting config...", fg="yellow"))
|
||||
|
||||
@ -30,7 +30,6 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
|
||||
from core.workflow.repositories.draft_variable_repository import (
|
||||
DraftVariableSaverFactory,
|
||||
)
|
||||
@ -525,11 +524,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
|
||||
app_id=application_generate_entity.app_config.app_id,
|
||||
user_id=application_generate_entity.user_id,
|
||||
workflow_version=workflow.version,
|
||||
sandbox_id=application_generate_entity.workflow_run_id,
|
||||
sandbox_storage=ArchiveSandboxStorage(
|
||||
tenant_id=application_generate_entity.app_config.tenant_id,
|
||||
sandbox_id=application_generate_entity.workflow_run_id,
|
||||
),
|
||||
workflow_execution_id=application_generate_entity.workflow_run_id,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@ -29,7 +29,6 @@ from core.helper.trace_id_helper import extract_external_trace_id_from_args
|
||||
from core.model_runtime.errors.invoke import InvokeAuthorizationError
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
|
||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||
@ -499,11 +498,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
|
||||
app_id=application_generate_entity.app_config.app_id,
|
||||
user_id=application_generate_entity.user_id,
|
||||
workflow_version=workflow.version,
|
||||
sandbox_id=application_generate_entity.workflow_execution_id,
|
||||
sandbox_storage=ArchiveSandboxStorage(
|
||||
tenant_id=application_generate_entity.app_config.tenant_id,
|
||||
sandbox_id=application_generate_entity.workflow_execution_id,
|
||||
),
|
||||
workflow_execution_id=application_generate_entity.workflow_execution_id,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
import logging
|
||||
|
||||
from core.sandbox import AppAssetsInitializer, DifyCliInitializer, SandboxManager
|
||||
from core.sandbox.storage.sandbox_storage import SandboxStorage
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
|
||||
from core.sandbox.vm import SandboxBuilder
|
||||
from core.workflow.graph_engine.layers.base import GraphEngineLayer
|
||||
from core.workflow.graph_events.base import GraphEngineEvent
|
||||
from core.workflow.graph_events.graph import GraphRunPausedEvent
|
||||
@ -25,32 +25,30 @@ class SandboxLayer(GraphEngineLayer):
|
||||
app_id: str,
|
||||
user_id: str,
|
||||
workflow_version: str,
|
||||
sandbox_id: str,
|
||||
sandbox_storage: SandboxStorage,
|
||||
workflow_execution_id: str,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._tenant_id = tenant_id
|
||||
self._app_id = app_id
|
||||
self._user_id = user_id
|
||||
self._workflow_version = workflow_version
|
||||
self._sandbox_id = sandbox_id
|
||||
self._sandbox_storage = sandbox_storage
|
||||
|
||||
@property
|
||||
def sandbox(self) -> VirtualEnvironment:
|
||||
sandbox = SandboxManager.get(self._sandbox_id)
|
||||
if sandbox is None:
|
||||
raise RuntimeError(f"Sandbox not found or not initialized for sandbox_id={self._sandbox_id}")
|
||||
return sandbox
|
||||
self._workflow_execution_id = workflow_execution_id
|
||||
self._sandbox_id = (
|
||||
self._workflow_execution_id
|
||||
if self._workflow_version == Workflow.VERSION_DRAFT
|
||||
else SandboxBuilder.draft_id(self._user_id)
|
||||
)
|
||||
self._sandbox_storage = ArchiveSandboxStorage(self._tenant_id, self._sandbox_id)
|
||||
|
||||
def on_graph_start(self) -> None:
|
||||
try:
|
||||
is_draft = self._workflow_version == Workflow.VERSION_DRAFT
|
||||
assets = AppAssetService.get_assets(self._tenant_id, self._app_id, is_draft=is_draft)
|
||||
assets = AppAssetService.get_assets(self._tenant_id, self._app_id, self._user_id, is_draft=is_draft)
|
||||
if not assets:
|
||||
raise ValueError(
|
||||
f"No assets found for tid={self._tenant_id}, app_id={self._app_id}, wf={self._workflow_version}"
|
||||
)
|
||||
|
||||
self._assets_id = assets.id
|
||||
|
||||
if is_draft:
|
||||
@ -63,20 +61,21 @@ class SandboxLayer(GraphEngineLayer):
|
||||
)
|
||||
AppAssetService.build_assets(self._tenant_id, self._app_id, assets)
|
||||
|
||||
logger.info(
|
||||
"Initializing sandbox for tenant_id=%s, app_id=%s, workflow_version=%s, assets_id=%s",
|
||||
self._tenant_id,
|
||||
self._app_id,
|
||||
self._workflow_version,
|
||||
assets.id,
|
||||
)
|
||||
|
||||
builder = (
|
||||
SandboxProviderService.create_sandbox_builder(self._tenant_id)
|
||||
.initializer(AppAssetsInitializer(self._tenant_id, self._app_id, assets.id))
|
||||
.initializer(DifyCliInitializer(self._tenant_id, self._user_id, self._app_id, assets.id))
|
||||
)
|
||||
sandbox = builder.build()
|
||||
try:
|
||||
sandbox = builder.build()
|
||||
logger.info(
|
||||
"Sandbox initialized, workflow_execution_id=%s, sandbox_id=%s, sandbox_arch=%s",
|
||||
self._sandbox_id,
|
||||
sandbox.metadata.id,
|
||||
sandbox.metadata.arch,
|
||||
)
|
||||
except Exception as e:
|
||||
raise SandboxInitializationError(f"Failed to build sandbox: {e}") from e
|
||||
|
||||
SandboxManager.register(self._sandbox_id, sandbox)
|
||||
logger.info(
|
||||
@ -86,9 +85,10 @@ class SandboxLayer(GraphEngineLayer):
|
||||
sandbox.metadata.arch,
|
||||
)
|
||||
|
||||
# Check if sandbox is initialized
|
||||
if self._sandbox_storage.mount(sandbox):
|
||||
logger.info("Sandbox files restored, sandbox_id=%s", self._sandbox_id)
|
||||
# mount sandbox files from storage
|
||||
mounted = self._sandbox_storage.mount(sandbox)
|
||||
logger.info("Sandbox files mount status: %s", mounted)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Failed to initialize sandbox")
|
||||
raise SandboxInitializationError(f"Failed to initialize sandbox: {e}") from e
|
||||
@ -108,13 +108,6 @@ class SandboxLayer(GraphEngineLayer):
|
||||
logger.debug("No sandbox to release for sandbox_id=%s", self._sandbox_id)
|
||||
return
|
||||
|
||||
sandbox_id = sandbox.metadata.id
|
||||
logger.info(
|
||||
"Releasing sandbox, workflow_execution_id=%s, sandbox_id=%s",
|
||||
self._sandbox_id,
|
||||
sandbox_id,
|
||||
)
|
||||
|
||||
try:
|
||||
self._sandbox_storage.unmount(sandbox)
|
||||
logger.info("Sandbox files persisted, sandbox_id=%s", self._sandbox_id)
|
||||
@ -123,6 +116,6 @@ class SandboxLayer(GraphEngineLayer):
|
||||
|
||||
try:
|
||||
sandbox.release_environment()
|
||||
logger.info("Sandbox released, sandbox_id=%s", sandbox_id)
|
||||
logger.info("Sandbox released, sandbox_id=%s", self._sandbox_id)
|
||||
except Exception:
|
||||
logger.exception("Failed to release sandbox, sandbox_id=%s", sandbox_id)
|
||||
logger.exception("Failed to release sandbox, sandbox_id=%s", self._sandbox_id)
|
||||
|
||||
@ -8,7 +8,7 @@ from types import TracebackType
|
||||
from core.session.cli_api import CliApiSessionManager
|
||||
from core.skill.entities.tool_artifact import ToolArtifact
|
||||
from core.skill.skill_manager import SkillManager
|
||||
from core.virtual_environment.__base.helpers import execute, with_connection
|
||||
from core.virtual_environment.__base.helpers import pipeline
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
|
||||
from ..bash.dify_cli import DifyCliConfig
|
||||
@ -73,63 +73,47 @@ class SandboxBashSession:
|
||||
node_id: str,
|
||||
allow_tools: list[tuple[str, str]],
|
||||
) -> str | None:
|
||||
with with_connection(sandbox) as conn:
|
||||
artifact: ToolArtifact | None = SkillManager.load_tool_artifact(
|
||||
self._tenant_id,
|
||||
self._app_id,
|
||||
self._assets_id,
|
||||
)
|
||||
artifact: ToolArtifact | None = SkillManager.load_tool_artifact(
|
||||
self._tenant_id,
|
||||
self._app_id,
|
||||
self._assets_id,
|
||||
)
|
||||
|
||||
if artifact is None or artifact.is_empty():
|
||||
logger.info("No tools found in artifact for assets_id=%s", self._assets_id)
|
||||
return None
|
||||
if artifact is None or artifact.is_empty():
|
||||
logger.info("No tools found in artifact for assets_id=%s", self._assets_id)
|
||||
return None
|
||||
|
||||
artifact = artifact.filter(allow_tools)
|
||||
if artifact.is_empty():
|
||||
logger.info("No tools found in artifact for assets_id=%s", self._assets_id)
|
||||
return None
|
||||
artifact = artifact.filter(allow_tools)
|
||||
if artifact.is_empty():
|
||||
logger.info("No tools found in artifact for assets_id=%s", self._assets_id)
|
||||
return None
|
||||
|
||||
self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id)
|
||||
self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id)
|
||||
node_tools_path = f"{DIFY_CLI_TOOLS_ROOT}/{node_id}"
|
||||
|
||||
execute(
|
||||
sandbox,
|
||||
["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH],
|
||||
connection=conn,
|
||||
error_message="Failed to create Dify CLI global tools directory",
|
||||
)
|
||||
(
|
||||
pipeline(sandbox)
|
||||
.add(["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir")
|
||||
.add(["mkdir", "-p", node_tools_path], error_message="Failed to create node tools dir")
|
||||
.execute(raise_on_error=True)
|
||||
)
|
||||
|
||||
execute(
|
||||
sandbox,
|
||||
["mkdir", "-p", f"{DIFY_CLI_TOOLS_ROOT}/{node_id}"],
|
||||
connection=conn,
|
||||
error_message="Failed to create Dify CLI node tools directory",
|
||||
)
|
||||
config_json = json.dumps(
|
||||
DifyCliConfig.create(
|
||||
session=self._cli_api_session, tenant_id=self._tenant_id, artifact=artifact
|
||||
).model_dump(mode="json"),
|
||||
ensure_ascii=False,
|
||||
)
|
||||
sandbox.upload_file(f"{node_tools_path}/{DIFY_CLI_CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8")))
|
||||
|
||||
config_json = json.dumps(
|
||||
DifyCliConfig.create(
|
||||
session=self._cli_api_session, tenant_id=self._tenant_id, artifact=artifact
|
||||
).model_dump(mode="json"),
|
||||
ensure_ascii=False,
|
||||
)
|
||||
sandbox.upload_file(
|
||||
f"{DIFY_CLI_TOOLS_ROOT}/{node_id}/{DIFY_CLI_CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8"))
|
||||
)
|
||||
pipeline(sandbox, cwd=node_tools_path).add(
|
||||
[DIFY_CLI_PATH, "init"], error_message="Failed to initialize Dify CLI"
|
||||
).execute(raise_on_error=True)
|
||||
|
||||
execute(
|
||||
sandbox,
|
||||
[DIFY_CLI_PATH, "init"],
|
||||
connection=conn,
|
||||
cwd=f"{DIFY_CLI_TOOLS_ROOT}/{node_id}",
|
||||
error_message="Failed to initialize Dify CLI",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Node %s tools initialized, path=%s, tool_count=%d",
|
||||
node_id,
|
||||
f"{DIFY_CLI_TOOLS_ROOT}/{node_id}",
|
||||
len(artifact.references),
|
||||
)
|
||||
return f"{DIFY_CLI_TOOLS_ROOT}/{node_id}"
|
||||
logger.info(
|
||||
"Node %s tools initialized, path=%s, tool_count=%d", node_id, node_tools_path, len(artifact.references)
|
||||
)
|
||||
return node_tools_path
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
|
||||
@ -1,16 +1,18 @@
|
||||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
from core.app_assets.paths import AssetPaths
|
||||
from core.virtual_environment.__base.helpers import execute, with_connection
|
||||
from core.virtual_environment.__base.helpers import pipeline
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
from extensions.ext_storage import storage
|
||||
from extensions.storage.file_presign_storage import FilePresignStorage
|
||||
|
||||
from ..constants import APP_ASSETS_PATH, APP_ASSETS_ZIP_PATH
|
||||
from ..constants import APP_ASSETS_ZIP_PATH
|
||||
from .base import SandboxInitializer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
APP_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10
|
||||
|
||||
|
||||
class AppAssetsInitializer(SandboxInitializer):
|
||||
def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
|
||||
@ -20,33 +22,20 @@ class AppAssetsInitializer(SandboxInitializer):
|
||||
|
||||
def initialize(self, env: VirtualEnvironment) -> None:
|
||||
zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id)
|
||||
try:
|
||||
zip_data = storage.load_once(zip_key)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to load assets zip for app_id=%s, key=%s",
|
||||
self._app_id,
|
||||
zip_key,
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
download_url = FilePresignStorage(storage.storage_runner).get_download_url(zip_key)
|
||||
|
||||
env.upload_file(APP_ASSETS_ZIP_PATH, BytesIO(zip_data))
|
||||
|
||||
with with_connection(env) as conn:
|
||||
execute(
|
||||
env,
|
||||
["unzip", "-o", APP_ASSETS_ZIP_PATH, "-d", APP_ASSETS_PATH],
|
||||
connection=conn,
|
||||
timeout=60,
|
||||
(
|
||||
pipeline(env)
|
||||
.add(["wget", "-q", download_url, "-O", APP_ASSETS_ZIP_PATH], error_message="Failed to download assets zip")
|
||||
# unzip with silent error and return 1 if the zip is empty
|
||||
# FIXME(Mairuis): should use a more robust way to check if the zip is empty
|
||||
.add(
|
||||
["sh", "-c", f"unzip {APP_ASSETS_ZIP_PATH} 2>/dev/null || [ $? -eq 1 ]"],
|
||||
error_message="Failed to unzip assets",
|
||||
)
|
||||
execute(
|
||||
env,
|
||||
["rm", "-f", APP_ASSETS_ZIP_PATH],
|
||||
connection=conn,
|
||||
error_message="Failed to cleanup temp zip file",
|
||||
)
|
||||
.add(["rm", "-f", APP_ASSETS_ZIP_PATH], error_message="Failed to cleanup temp zip file")
|
||||
.execute(timeout=APP_ASSETS_DOWNLOAD_TIMEOUT, raise_on_error=True)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"App assets initialized for app_id=%s, published_id=%s",
|
||||
|
||||
@ -7,7 +7,7 @@ from pathlib import Path
|
||||
|
||||
from core.session.cli_api import CliApiSessionManager
|
||||
from core.skill.skill_manager import SkillManager
|
||||
from core.virtual_environment.__base.helpers import execute, with_connection
|
||||
from core.virtual_environment.__base.helpers import pipeline
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
|
||||
from ..bash.dify_cli import DifyCliConfig, DifyCliLocator
|
||||
@ -43,61 +43,37 @@ class DifyCliInitializer(SandboxInitializer):
|
||||
def initialize(self, env: VirtualEnvironment) -> None:
|
||||
binary = self._locator.resolve(env.metadata.os, env.metadata.arch)
|
||||
|
||||
with with_connection(env) as conn:
|
||||
execute(
|
||||
env,
|
||||
["mkdir", "-p", f"{DIFY_CLI_ROOT}/bin"],
|
||||
connection=conn,
|
||||
error_message="Failed to create dify CLI directory",
|
||||
)
|
||||
pipeline(env).add(
|
||||
["mkdir", "-p", f"{DIFY_CLI_ROOT}/bin"], error_message="Failed to create dify CLI directory"
|
||||
).execute(raise_on_error=True)
|
||||
|
||||
env.upload_file(DIFY_CLI_PATH, BytesIO(binary.path.read_bytes()))
|
||||
env.upload_file(DIFY_CLI_PATH, BytesIO(binary.path.read_bytes()))
|
||||
|
||||
execute(
|
||||
env,
|
||||
["chmod", "+x", DIFY_CLI_PATH],
|
||||
connection=conn,
|
||||
error_message="Failed to mark dify CLI as executable",
|
||||
)
|
||||
pipeline(env).add(
|
||||
["chmod", "+x", DIFY_CLI_PATH], error_message="Failed to mark dify CLI as executable"
|
||||
).execute(raise_on_error=True)
|
||||
|
||||
logger.info("Dify CLI uploaded to sandbox, path=%s", DIFY_CLI_PATH)
|
||||
logger.info("Dify CLI uploaded to sandbox, path=%s", DIFY_CLI_PATH)
|
||||
|
||||
artifact = SkillManager.load_tool_artifact(
|
||||
self._tenant_id,
|
||||
self._app_id,
|
||||
self._assets_id,
|
||||
)
|
||||
artifact = SkillManager.load_tool_artifact(self._tenant_id, self._app_id, self._assets_id)
|
||||
if artifact is None or not artifact.references:
|
||||
logger.info("No tools found in artifact for assets_id=%s", self._assets_id)
|
||||
return
|
||||
|
||||
if artifact is None or not artifact.references:
|
||||
logger.info("No tools found in artifact for assets_id=%s", self._assets_id)
|
||||
return
|
||||
# FIXME(Mairuis): store it in workflow context
|
||||
self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id)
|
||||
|
||||
# FIXME(Mairuis): store it in workflow context
|
||||
self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id)
|
||||
pipeline(env).add(
|
||||
["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir"
|
||||
).execute(raise_on_error=True)
|
||||
|
||||
execute(
|
||||
env,
|
||||
["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH],
|
||||
connection=conn,
|
||||
error_message="Failed to create Dify CLI global tools directory",
|
||||
)
|
||||
config = DifyCliConfig.create(self._cli_api_session, self._tenant_id, artifact)
|
||||
config_json = json.dumps(config.model_dump(mode="json"), ensure_ascii=False)
|
||||
config_path = f"{DIFY_CLI_GLOBAL_TOOLS_PATH}/{DIFY_CLI_CONFIG_FILENAME}"
|
||||
env.upload_file(config_path, BytesIO(config_json.encode("utf-8")))
|
||||
|
||||
config = DifyCliConfig.create(self._cli_api_session, self._tenant_id, artifact)
|
||||
config_json = json.dumps(config.model_dump(mode="json"), ensure_ascii=False)
|
||||
env.upload_file(
|
||||
f"{DIFY_CLI_GLOBAL_TOOLS_PATH}/{DIFY_CLI_CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8"))
|
||||
)
|
||||
pipeline(env, cwd=DIFY_CLI_GLOBAL_TOOLS_PATH).add(
|
||||
[DIFY_CLI_PATH, "init"], error_message="Failed to initialize Dify CLI"
|
||||
).execute(raise_on_error=True)
|
||||
|
||||
execute(
|
||||
env,
|
||||
[DIFY_CLI_PATH, "init"],
|
||||
connection=conn,
|
||||
cwd=DIFY_CLI_GLOBAL_TOOLS_PATH,
|
||||
error_message="Failed to initialize Dify CLI",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Global tools initialized, path=%s, tool_count=%d",
|
||||
DIFY_CLI_GLOBAL_TOOLS_PATH,
|
||||
len(self._tools),
|
||||
)
|
||||
logger.info("Global tools initialized, path=%s, tool_count=%d", DIFY_CLI_GLOBAL_TOOLS_PATH, len(self._tools))
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
from core.virtual_environment.__base.helpers import try_execute
|
||||
from core.virtual_environment.__base.exec import PipelineExecutionError
|
||||
from core.virtual_environment.__base.helpers import pipeline
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
from extensions.ext_storage import storage
|
||||
from extensions.storage.file_presign_storage import FilePresignStorage
|
||||
|
||||
from .sandbox_storage import SandboxStorage
|
||||
|
||||
@ -13,10 +14,12 @@ ARCHIVE_NAME = "workspace.tar.gz"
|
||||
WORKSPACE_DIR = "."
|
||||
ARCHIVE_PATH = f"/tmp/{ARCHIVE_NAME}"
|
||||
|
||||
ARCHIVE_DOWNLOAD_TIMEOUT = 60 * 5
|
||||
ARCHIVE_UPLOAD_TIMEOUT = 60 * 5
|
||||
|
||||
|
||||
class ArchiveSandboxStorage(SandboxStorage):
|
||||
def __init__(self, tenant_id: str, sandbox_id: str):
|
||||
self._storage = storage
|
||||
self._tenant_id = tenant_id
|
||||
self._sandbox_id = sandbox_id
|
||||
|
||||
@ -29,39 +32,40 @@ class ArchiveSandboxStorage(SandboxStorage):
|
||||
logger.debug("No archive found for sandbox %s, skipping mount", self._sandbox_id)
|
||||
return False
|
||||
|
||||
archive_data = self._storage.load_once(self._storage_key)
|
||||
sandbox.upload_file(ARCHIVE_NAME, BytesIO(archive_data))
|
||||
|
||||
result = try_execute(sandbox, ["tar", "-xzf", ARCHIVE_NAME], timeout=60)
|
||||
if result.is_error:
|
||||
logger.error("Failed to extract archive: %s", result.error_message)
|
||||
download_url = FilePresignStorage(storage.storage_runner).get_download_url(self._storage_key)
|
||||
try:
|
||||
(
|
||||
pipeline(sandbox)
|
||||
.add(["wget", download_url, "-O", ARCHIVE_NAME], error_message="Failed to download archive")
|
||||
.add(["tar", "-xzf", ARCHIVE_NAME], error_message="Failed to extract archive")
|
||||
.add(["rm", ARCHIVE_NAME], error_message="Failed to cleanup archive")
|
||||
.execute(timeout=ARCHIVE_DOWNLOAD_TIMEOUT, raise_on_error=True)
|
||||
)
|
||||
except PipelineExecutionError:
|
||||
logger.exception("Failed to extract archive")
|
||||
return False
|
||||
|
||||
try_execute(sandbox, ["rm", ARCHIVE_NAME], timeout=10)
|
||||
|
||||
logger.info("Mounted archive for sandbox %s", self._sandbox_id)
|
||||
return True
|
||||
|
||||
def unmount(self, sandbox: VirtualEnvironment) -> bool:
|
||||
result = try_execute(
|
||||
sandbox,
|
||||
["tar", "-czf", ARCHIVE_PATH, "--warning=no-file-changed", "-C", WORKSPACE_DIR, "."],
|
||||
timeout=120,
|
||||
upload_url = FilePresignStorage(storage.storage_runner).get_upload_url(self._storage_key)
|
||||
(
|
||||
pipeline(sandbox)
|
||||
.add(
|
||||
["tar", "-czf", ARCHIVE_PATH, "--warning=no-file-changed", "-C", WORKSPACE_DIR, "."],
|
||||
error_message="Failed to create archive",
|
||||
)
|
||||
.add(["wget", upload_url, "-O", ARCHIVE_PATH], error_message="Failed to upload archive")
|
||||
.execute(timeout=ARCHIVE_UPLOAD_TIMEOUT, raise_on_error=True)
|
||||
)
|
||||
if result.is_error:
|
||||
logger.error("Failed to create archive: %s", result.error_message)
|
||||
return False
|
||||
|
||||
archive_content = sandbox.download_file(ARCHIVE_PATH)
|
||||
self._storage.save(self._storage_key, archive_content.getvalue())
|
||||
|
||||
logger.info("Unmounted archive for sandbox %s", self._sandbox_id)
|
||||
return True
|
||||
|
||||
def exists(self) -> bool:
|
||||
return self._storage.exists(self._storage_key)
|
||||
return storage.exists(self._storage_key)
|
||||
|
||||
def delete(self) -> None:
|
||||
if self.exists():
|
||||
self._storage.delete(self._storage_key)
|
||||
storage.delete(self._storage_key)
|
||||
logger.info("Deleted archive for sandbox %s", self._sandbox_id)
|
||||
|
||||
@ -102,6 +102,10 @@ class SandboxBuilder:
|
||||
vm_class = _get_sandbox_class(vm_type)
|
||||
vm_class.validate(options)
|
||||
|
||||
@classmethod
def draft_id(cls, user_id: str) -> str:
    """Return the sandbox identifier used for ``user_id``'s draft sandbox.

    The identifier is the fixed prefix ``sandbox_draft_`` followed by the
    user id, so the same user always maps to the same draft sandbox id.
    """
    return f"sandbox_draft_{user_id}"
|
||||
|
||||
|
||||
class VMConfig:
|
||||
@staticmethod
|
||||
|
||||
0
api/core/skill/entities/skill_artifact.py
Normal file
0
api/core/skill/entities/skill_artifact.py
Normal file
@ -33,6 +33,8 @@ class SandboxConfigValidationError(ValueError):
|
||||
class CommandExecutionError(Exception):
|
||||
"""Raised when a command execution fails."""
|
||||
|
||||
result: CommandResult
|
||||
|
||||
def __init__(self, message: str, result: CommandResult):
|
||||
super().__init__(message)
|
||||
self.result = result
|
||||
@ -44,3 +46,19 @@ class CommandExecutionError(Exception):
|
||||
@property
|
||||
def stderr(self) -> bytes:
|
||||
return self.result.stderr
|
||||
|
||||
|
||||
class PipelineExecutionError(CommandExecutionError):
    """Raised when a pipeline command fails in strict mode.

    In addition to the ``message`` and ``result`` handled by the parent
    constructor, the error records which step failed and the results that
    were passed in by the raiser.
    """

    # Position of the failing step, as supplied by the raiser.
    index: int
    # Argument vector of the failing step.
    command: list[str]
    # Results handed over by the raiser alongside the failing one.
    results: list[CommandResult]

    def __init__(
        self, message: str, result: CommandResult, *, index: int, command: list[str], results: list[CommandResult]
    ):
        super().__init__(message, result)
        self.index = index
        self.command = command
        self.results = results
|
||||
|
||||
@ -1,15 +1,19 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import shlex
|
||||
from collections.abc import Generator, Mapping
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from functools import partial
|
||||
|
||||
from core.virtual_environment.__base.command_future import CommandFuture
|
||||
from core.virtual_environment.__base.entities import CommandResult, ConnectionHandle
|
||||
from core.virtual_environment.__base.exec import CommandExecutionError
|
||||
from core.virtual_environment.__base.exec import CommandExecutionError, PipelineExecutionError
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
|
||||
_PIPE_SENTINEL = "__DIFY_PIPE__"
|
||||
|
||||
|
||||
@contextmanager
|
||||
def with_connection(env: VirtualEnvironment) -> Generator[ConnectionHandle, None, None]:
|
||||
@ -147,3 +151,127 @@ def try_execute(
|
||||
|
||||
with with_connection(env) as conn:
|
||||
return _execute_with_connection(env, conn, command, timeout, cwd)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _PipelineStep:
|
||||
argv: list[str]
|
||||
error_message: str = "Command failed"
|
||||
|
||||
|
||||
@dataclass
class CommandPipeline:
    """Batch multiple commands into a single shell execution (Redis pipeline style).

    Steps are queued with :meth:`add` and run together by :meth:`execute`,
    which wraps them in one ``sh -lc`` script and parses a per-step
    :class:`CommandResult` back out of the combined stdout.

    Example:
        results = pipeline(env).add(["echo", "hi"]).add(["ls"]).execute()
        # Strict mode: raise on first failure
        pipeline(env).add(["mkdir", "/x"], error_message="mkdir failed").execute(raise_on_error=True)
    """

    env: VirtualEnvironment
    connection: ConnectionHandle | None = None
    cwd: str | None = None
    environments: Mapping[str, str] | None = None

    _steps: list[_PipelineStep] = field(default_factory=list)  # pyright: ignore[reportUnknownVariableType]

    def add(self, command: list[str], *, error_message: str = "Command failed") -> CommandPipeline:
        """Queue ``command`` and return ``self`` so calls can be chained."""
        self._steps.append(_PipelineStep(argv=command, error_message=error_message))
        return self

    def execute(self, *, timeout: float | None = 30, raise_on_error: bool = False) -> list[CommandResult]:
        """Run every queued step in one shell invocation.

        Args:
            timeout: Timeout forwarded to ``try_execute`` for the whole batch.
            raise_on_error: When true, the script stops at the first failing
                step and a :class:`PipelineExecutionError` is raised.

        Returns:
            One :class:`CommandResult` per step that reported a result; an
            empty list when no steps were queued.

        Raises:
            PipelineExecutionError: In strict mode, when a step fails, when
                the batch shell itself fails, or when fewer results than
                steps come back.
            ValueError: When the batch output cannot be parsed or an
                environment variable name is not a valid shell name.
        """
        if not self._steps:
            return []

        script = self._build_script(fail_fast=raise_on_error)
        batch_cmd = ["sh", "-lc", script]

        if self.connection is not None:
            batch_result = try_execute(self.env, batch_cmd, timeout=timeout, cwd=self.cwd, connection=self.connection)
        else:
            with with_connection(self.env) as conn:
                batch_result = try_execute(self.env, batch_cmd, timeout=timeout, cwd=self.cwd, connection=conn)

        results = self._parse_results(batch_result.stdout, batch_result.pid)

        if raise_on_error:
            for i, r in enumerate(results):
                if r.is_error:
                    step = self._steps[i]
                    raise PipelineExecutionError(
                        f"{step.error_message}: {r.error_message}",
                        r,
                        index=i,
                        command=step.argv,
                        results=results,
                    )

            # No step reported a failure, yet the batch failed or stopped
            # early. Treating that as success would hide the problem, so
            # blame the first step that never reported a result.
            if batch_result.is_error or len(results) < len(self._steps):
                index = min(len(results), len(self._steps) - 1)
                step = self._steps[index]
                detail = (
                    batch_result.error_message
                    if batch_result.is_error
                    else "pipeline output ended before this step reported a result"
                )
                raise PipelineExecutionError(
                    f"{step.error_message}: {detail}",
                    batch_result,
                    index=index,
                    command=step.argv,
                    results=results,
                )

        return results

    def _build_script(self, *, fail_fast: bool = False) -> str:
        """Render the shell script that runs all steps.

        Each step goes through a ``run`` shell function that captures stdout
        and stderr into temp files, prints a sentinel header carrying the
        step index, exit code and both byte counts, then emits the captured
        bytes. With ``fail_fast`` every step is followed by ``|| exit $?``.
        """
        lines: list[str] = []

        # Export the requested environment before any step runs.
        for name, value in (self.environments or {}).items():
            # Only plain ASCII identifiers are emitted unquoted into the script.
            if not (name.isascii() and name.isidentifier()):
                raise ValueError(f"Invalid environment variable name: {name!r}")
            lines.append(f"export {name}={shlex.quote(value)}")

        lines.extend(
            [
                "run() {",
                ' i="$1"; shift',
                ' out="$(mktemp)"; err="$(mktemp)"',
                ' ("$@") >"$out" 2>"$err"; ec="$?"',
                ' os="$(wc -c <"$out" | tr -d \' \')"',
                ' es="$(wc -c <"$err" | tr -d \' \')"',
                f' printf \'{_PIPE_SENTINEL} %s %s %s %s\\n\' "$i" "$ec" "$os" "$es"',
                ' cat "$out"',
                ' cat "$err"',
                ' rm -f "$out" "$err"',
                ' return "$ec"',
                "}",
            ]
        )
        suffix = " || exit $?" if fail_fast else ""
        for i, step in enumerate(self._steps):
            quoted = " ".join(shlex.quote(arg) for arg in step.argv)
            lines.append(f"run {i} {quoted}{suffix}")
        return "\n".join(lines)

    @staticmethod
    def _parse_results(stdout: bytes, pid: str) -> list[CommandResult]:
        """Split the batch stdout back into one result per reported step.

        The stream is a sequence of records: a header line
        ``<sentinel> <index> <exit_code> <stdout_len> <stderr_len>`` followed
        by exactly that many stdout bytes and then stderr bytes.

        Raises:
            ValueError: When a header lacks the sentinel, has the wrong
                number of fields, or carries non-integer numbers.
        """
        results: list[CommandResult] = []
        pos = 0
        sentinel = _PIPE_SENTINEL.encode() + b" "

        while pos < len(stdout):
            nl = stdout.find(b"\n", pos)
            if nl == -1:
                break
            header = stdout[pos : nl + 1]
            pos = nl + 1

            if not header.startswith(sentinel):
                raise ValueError("Malformed pipeline output: missing sentinel")

            parts = header.decode().strip().split(" ")
            if len(parts) != 5:
                raise ValueError("Malformed pipeline output: unexpected header field count")
            _, idx, ec, os_len, es_len = parts
            out_len, err_len = int(os_len), int(es_len)

            # Payload lengths are byte counts, so slice by position rather
            # than by line: command output may itself contain newlines.
            out_bytes = stdout[pos : pos + out_len]
            pos += out_len
            err_bytes = stdout[pos : pos + err_len]
            pos += err_len

            results.append(
                CommandResult(
                    stdout=out_bytes,
                    stderr=err_bytes,
                    exit_code=int(ec),
                    pid=f"{pid}:{idx}",
                )
            )

        return results
|
||||
|
||||
|
||||
def pipeline(
    env: VirtualEnvironment,
    connection: ConnectionHandle | None = None,
    *,
    cwd: str | None = None,
    environments: Mapping[str, str] | None = None,
) -> CommandPipeline:
    """Create an empty :class:`CommandPipeline` bound to ``env``.

    All arguments are stored on the returned pipeline unchanged; commands
    are queued afterwards with ``.add(...)`` and run with ``.execute(...)``.
    """
    return CommandPipeline(env=env, connection=connection, cwd=cwd, environments=environments)
|
||||
|
||||
@ -93,3 +93,11 @@ class AwsS3Storage(BaseStorage):
|
||||
ExpiresIn=expires_in,
|
||||
)
|
||||
return url
|
||||
|
||||
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
    """Return a pre-signed ``put_object`` URL for ``filename`` in this bucket.

    Args:
        filename: Object key the URL grants upload access to.
        expires_in: Value passed as ``ExpiresIn`` to the client.
    """
    url: str = self.client.generate_presigned_url(
        ClientMethod="put_object",
        Params={"Bucket": self.bucket_name, "Key": filename},
        ExpiresIn=expires_in,
    )
    return url
|
||||
|
||||
@ -57,3 +57,12 @@ class BaseStorage(ABC):
|
||||
NotImplementedError: If this storage backend doesn't support pre-signed URLs
|
||||
"""
|
||||
raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
|
||||
|
||||
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
    """
    Generate a pre-signed URL for uploading a file.

    Storage backends that support pre-signed URLs (e.g., S3, Azure Blob, GCS)
    should override this method to return a direct upload URL.

    Raises:
        NotImplementedError: If this storage backend doesn't support pre-signed URLs
    """
    raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
|
||||
|
||||
@ -44,6 +44,16 @@ class FilePresignStorage(BaseStorage):
|
||||
except NotImplementedError:
|
||||
return self._generate_signed_proxy_url(filename)
|
||||
|
||||
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
    """Return an upload URL for ``filename``.

    Delegates to the wrapped storage first; when that backend raises
    ``NotImplementedError``, falls back to ``_generate_signed_upload_url``.
    """
    try:
        return self._storage.get_upload_url(filename, expires_in)
    except NotImplementedError:
        return self._generate_signed_upload_url(filename)
|
||||
|
||||
def _generate_signed_upload_url(self, filename: str) -> str:
|
||||
# TODO: Implement this
|
||||
raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
|
||||
|
||||
def _generate_signed_proxy_url(self, filename: str) -> str:
|
||||
base_url = dify_config.FILES_URL
|
||||
encoded_filename = urllib.parse.quote(filename, safe="")
|
||||
|
||||
@ -19,6 +19,7 @@ class AppAssets(Base):
|
||||
)
|
||||
|
||||
VERSION_DRAFT = "draft"
|
||||
VERSION_PUBLISHED = "published"
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
|
||||
@ -62,14 +62,24 @@ class AppAssetService:
|
||||
return assets
|
||||
|
||||
@staticmethod
|
||||
def get_assets(tenant_id: str, app_id: str, *, is_draft: bool) -> AppAssets | None:
|
||||
with Session(db.engine) as session:
|
||||
def get_assets(tenant_id: str, app_id: str, user_id: str, *, is_draft: bool) -> AppAssets | None:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
if is_draft:
|
||||
stmt = session.query(AppAssets).filter(
|
||||
AppAssets.tenant_id == tenant_id,
|
||||
AppAssets.app_id == app_id,
|
||||
AppAssets.version == AppAssets.VERSION_DRAFT,
|
||||
)
|
||||
if not stmt.first():
|
||||
assets = AppAssets(
|
||||
id=str(uuid4()),
|
||||
tenant_id=tenant_id,
|
||||
app_id=app_id,
|
||||
version=AppAssets.VERSION_DRAFT,
|
||||
created_by=user_id,
|
||||
)
|
||||
session.add(assets)
|
||||
session.commit()
|
||||
else:
|
||||
stmt = (
|
||||
session.query(AppAssets)
|
||||
@ -308,7 +318,7 @@ class AppAssetService:
|
||||
parser = AssetParser(tree, tenant_id, app_id)
|
||||
parser.register(
|
||||
"md",
|
||||
SkillAssetParser(tenant_id, app_id, publish_id),
|
||||
SkillAssetParser(tenant_id, app_id, publish_id, tree),
|
||||
)
|
||||
|
||||
assets = parser.parse()
|
||||
|
||||
@ -15,6 +15,8 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
|
||||
from core.file import File
|
||||
from core.repositories import DifyCoreRepositoryFactory
|
||||
from core.sandbox import SandboxManager
|
||||
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
|
||||
from core.sandbox.storage.sandbox_storage import SandboxStorage
|
||||
from core.variables import Variable, VariableBase
|
||||
from core.workflow.entities import WorkflowNodeExecution
|
||||
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
|
||||
@ -704,7 +706,7 @@ class WorkflowService:
|
||||
from core.sandbox import AppAssetsInitializer, DifyCliInitializer
|
||||
from services.app_asset_service import AppAssetService
|
||||
|
||||
assets = AppAssetService.get_assets(draft_workflow.tenant_id, app_model.id, is_draft=True)
|
||||
assets = AppAssetService.get_or_create_assets(draft_workflow.tenant_id, app_model.id, is_draft=True)
|
||||
if not assets:
|
||||
raise ValueError(f"No assets found for tid={draft_workflow.tenant_id}, app_id={app_model.id}")
|
||||
|
||||
@ -715,6 +717,7 @@ class WorkflowService:
|
||||
SandboxProviderService.create_sandbox_builder(draft_workflow.tenant_id)
|
||||
.initializer(DifyCliInitializer(draft_workflow.tenant_id, account.id, app_model.id, assets.id))
|
||||
.initializer(AppAssetsInitializer(draft_workflow.tenant_id, app_model.id, assets.id))
|
||||
.storage(ArchiveSandboxStorage(draft_workflow.tenant_id, SandboxStorage.draft_id(account.id)))
|
||||
.build()
|
||||
)
|
||||
single_step_execution_id = f"single-step-{uuid.uuid4()}"
|
||||
|
||||
Reference in New Issue
Block a user