refactor(sandbox): async init and draft downloads

Reduce startup latency by deferring sandbox setup and downloading draft assets directly with cached presigned URLs.
This commit is contained in:
Yeuoly
2026-01-22 19:18:34 +08:00
parent 87f35efa2f
commit 5e16d85ff6
13 changed files with 448 additions and 31 deletions

View File

@ -1,19 +1,25 @@
from .bash.dify_cli import (
DifyCliBinary,
DifyCliConfig,
DifyCliEnvConfig,
DifyCliLocator,
DifyCliToolConfig,
)
from .bash.session import SandboxBashSession
from .builder import SandboxBuilder, VMConfig
from .entities import AppAssets, DifyCli, SandboxProviderApiEntity, SandboxType
from .initializer import AppAssetsInitializer, DifyCliInitializer, SandboxInitializer
from .manager import SandboxManager
from .sandbox import Sandbox
from .storage import ArchiveSandboxStorage, SandboxStorage
from .utils.debug import sandbox_debug
from .utils.encryption import create_sandbox_config_encrypter, masked_config
from __future__ import annotations
import importlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .bash.dify_cli import (
DifyCliBinary,
DifyCliConfig,
DifyCliEnvConfig,
DifyCliLocator,
DifyCliToolConfig,
)
from .bash.session import SandboxBashSession
from .builder import SandboxBuilder, VMConfig
from .entities import AppAssets, DifyCli, SandboxProviderApiEntity, SandboxType
from .initializer import AppAssetsInitializer, DifyCliInitializer, SandboxInitializer
from .manager import SandboxManager
from .sandbox import Sandbox
from .storage import ArchiveSandboxStorage, SandboxStorage
from .utils.debug import sandbox_debug
from .utils.encryption import create_sandbox_config_encrypter, masked_config
__all__ = [
"AppAssets",
@ -39,3 +45,38 @@ __all__ = [
"masked_config",
"sandbox_debug",
]
# Maps each public attribute name to (module path, attribute name) so that
# module-level __getattr__ can resolve it on first access. This keeps
# "import core.sandbox" cheap at startup by deferring submodule imports.
_LAZY_IMPORTS = {
    "AppAssets": ("core.sandbox.entities", "AppAssets"),
    "AppAssetsInitializer": ("core.sandbox.initializer", "AppAssetsInitializer"),
    "ArchiveSandboxStorage": ("core.sandbox.storage", "ArchiveSandboxStorage"),
    "DifyCli": ("core.sandbox.entities", "DifyCli"),
    "DifyCliBinary": ("core.sandbox.bash.dify_cli", "DifyCliBinary"),
    "DifyCliConfig": ("core.sandbox.bash.dify_cli", "DifyCliConfig"),
    "DifyCliEnvConfig": ("core.sandbox.bash.dify_cli", "DifyCliEnvConfig"),
    "DifyCliInitializer": ("core.sandbox.initializer", "DifyCliInitializer"),
    "DifyCliLocator": ("core.sandbox.bash.dify_cli", "DifyCliLocator"),
    "DifyCliToolConfig": ("core.sandbox.bash.dify_cli", "DifyCliToolConfig"),
    "Sandbox": ("core.sandbox.sandbox", "Sandbox"),
    "SandboxBashSession": ("core.sandbox.bash.session", "SandboxBashSession"),
    "SandboxBuilder": ("core.sandbox.builder", "SandboxBuilder"),
    "SandboxInitializer": ("core.sandbox.initializer", "SandboxInitializer"),
    "SandboxManager": ("core.sandbox.manager", "SandboxManager"),
    "SandboxProviderApiEntity": ("core.sandbox.entities", "SandboxProviderApiEntity"),
    "SandboxStorage": ("core.sandbox.storage", "SandboxStorage"),
    "SandboxType": ("core.sandbox.entities", "SandboxType"),
    "VMConfig": ("core.sandbox.builder", "VMConfig"),
    "create_sandbox_config_encrypter": ("core.sandbox.utils.encryption", "create_sandbox_config_encrypter"),
    "masked_config": ("core.sandbox.utils.encryption", "masked_config"),
    "sandbox_debug": ("core.sandbox.utils.debug", "sandbox_debug"),
}
def __getattr__(name: str):
    """Resolve public names lazily on first access (PEP 562).

    Imports the owning submodule, caches the attribute in module globals so
    subsequent lookups bypass this hook, and returns the resolved value.
    """
    try:
        module_path, attr_name = _LAZY_IMPORTS[name]
    except KeyError:
        raise AttributeError(f"module 'core.sandbox' has no attribute {name}") from None
    value = getattr(importlib.import_module(module_path), attr_name)
    # Cache the resolved attribute so __getattr__ is only hit once per name.
    globals()[name] = value
    return value

View File

@ -16,6 +16,8 @@ from .bash_tool import SandboxBashTool
logger = logging.getLogger(__name__)
SANDBOX_READY_TIMEOUT = 60 * 10
class SandboxBashSession:
def __init__(self, *, sandbox: Sandbox, node_id: str, tools: ToolArtifact | None) -> None:
@ -30,6 +32,8 @@ class SandboxBashSession:
self._assets_id = sandbox.assets_id
def __enter__(self) -> SandboxBashSession:
# Ensure sandbox initialization completes before any bash commands run.
self._sandbox.wait_ready(timeout=SANDBOX_READY_TIMEOUT)
self._cli_api_session = CliApiSessionManager().create(
tenant_id=self._tenant_id,
user_id=self._user_id,

View File

@ -1,5 +1,7 @@
from __future__ import annotations
import logging
import threading
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any
@ -13,6 +15,8 @@ from .sandbox import Sandbox
if TYPE_CHECKING:
from .storage.sandbox_storage import SandboxStorage
logger = logging.getLogger(__name__)
def _get_sandbox_class(sandbox_type: SandboxType) -> type[VirtualEnvironment]:
match sandbox_type:
@ -108,10 +112,24 @@ class SandboxBuilder:
app_id=self._app_id,
assets_id=self._assets_id,
)
for init in self._initializers:
init.initialize(sandbox)
sandbox.mount()
# Run sandbox setup asynchronously so workflow execution can proceed.
def initialize() -> None:
try:
for init in self._initializers:
if sandbox.is_cancelled():
return
init.initialize(sandbox)
if sandbox.is_cancelled():
return
sandbox.mount()
sandbox.mark_ready()
except Exception as exc:
logger.exception("Failed to initialize sandbox: tenant_id=%s, app_id=%s", self._tenant_id, self._app_id)
sandbox.mark_failed(exc)
# Background init completes or signals failure via sandbox state.
threading.Thread(target=initialize, daemon=True).start()
return sandbox
@staticmethod

View File

@ -1,9 +1,11 @@
from .app_assets_initializer import AppAssetsInitializer
from .draft_app_assets_initializer import DraftAppAssetsInitializer
from .base import SandboxInitializer
from .dify_cli_initializer import DifyCliInitializer
__all__ = [
"AppAssetsInitializer",
"DraftAppAssetsInitializer",
"DifyCliInitializer",
"SandboxInitializer",
]

View File

@ -22,18 +22,21 @@ class AppAssetsInitializer(SandboxInitializer):
self._app_id = app_id
self._assets_id = assets_id
def initialize(self, sandbox: Sandbox) -> None:
vm = sandbox.vm
# load app assets
def initialize(self, env: Sandbox) -> None:
vm = env.vm
# Load published app assets and unzip the artifact bundle.
app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
env.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id)
download_url = FilePresignStorage(storage.storage_runner).get_download_url(zip_key)
(
pipeline(vm)
.add(["wget", "-q", download_url, "-O", AppAssets.ZIP_PATH], error_message="Failed to download assets zip")
.add(
["wget", "-q", download_url, "-O", AppAssets.ZIP_PATH],
error_message="Failed to download assets zip",
)
# unzip with silent error and return 1 if the zip is empty
# FIXME(Mairuis): should use a more robust way to check if the zip is empty
.add(

View File

@ -0,0 +1,44 @@
import logging
from core.app_assets.constants import AppAssetsAttrs
from core.sandbox.entities import AppAssets
from core.sandbox.sandbox import Sandbox
from core.sandbox.services import AssetDownloadService
from core.sandbox.services.asset_download_service import AssetDownloadItem
from core.virtual_environment.__base.helpers import pipeline
from services.app_asset_service import AppAssetService
from .base import SandboxInitializer
logger = logging.getLogger(__name__)
DRAFT_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10
class DraftAppAssetsInitializer(SandboxInitializer):
    """Populates a sandbox with draft app assets.

    Unlike the published-assets initializer, this fetches each draft file
    individually via presigned URLs instead of building, uploading, and
    unzipping a zip archive.
    """

    def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
        self._tenant_id = tenant_id
        self._app_id = app_id
        self._assets_id = assets_id

    def initialize(self, env: Sandbox) -> None:
        """Download every draft asset file into the sandbox VM.

        Raises if the downloader script fails (pipeline runs with
        raise_on_error=True) or exceeds DRAFT_ASSETS_DOWNLOAD_TIMEOUT.
        """
        vm = env.vm
        # Draft assets download via presigned URLs to avoid zip build overhead.
        app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
        # Record the asset file tree on the sandbox attrs for later readers.
        env.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
        items = [
            AssetDownloadItem(path=path, url=url)
            for path, url in AppAssetService.get_cached_draft_download_urls(app_assets)
        ]
        # Run the generated downloader shell script inside the VM.
        script = AssetDownloadService.build_download_script(items, AppAssets.PATH)
        pipeline(vm).add(
            ["sh", "-lc", script],
            error_message="Failed to download draft assets",
        ).execute(timeout=DRAFT_ASSETS_DOWNLOAD_TIMEOUT, raise_on_error=True)
        logger.info(
            "Draft app assets initialized for app_id=%s, assets_id=%s",
            self._app_id,
            self._assets_id,
        )

View File

@ -9,6 +9,7 @@ from core.sandbox.entities import AppAssets, SandboxType
from core.sandbox.entities.providers import SandboxProviderEntity
from core.sandbox.initializer.app_assets_initializer import AppAssetsInitializer
from core.sandbox.initializer.dify_cli_initializer import DifyCliInitializer
from core.sandbox.initializer.draft_app_assets_initializer import DraftAppAssetsInitializer
from core.sandbox.initializer.skill_initializer import SkillInitializer
from core.sandbox.sandbox import Sandbox
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
@ -158,7 +159,7 @@ class SandboxManager:
.options(sandbox_provider.config)
.user(user_id)
.app(app_id)
.initializer(AppAssetsInitializer(tenant_id, app_id, assets.id))
.initializer(DraftAppAssetsInitializer(tenant_id, app_id, assets.id))
.initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
.initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
.storage(storage, assets.id)

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import threading
from typing import TYPE_CHECKING
from libs.attr_map import AttrMap
@ -30,6 +31,9 @@ class Sandbox:
self._app_id = app_id
self._assets_id = assets_id
self._attributes = AttrMap()
self._ready_event = threading.Event()
self._cancel_event = threading.Event()
self._init_error: Exception | None = None
@property
def attrs(self) -> AttrMap:
@ -59,6 +63,32 @@ class Sandbox:
def assets_id(self) -> str:
return self._assets_id
def mark_ready(self) -> None:
    """Signal that sandbox initialization completed successfully.

    Unblocks any threads waiting in wait_ready().
    """
    self._ready_event.set()
def mark_failed(self, error: Exception) -> None:
    """Record the initialization error and unblock wait_ready() callers."""
    # Store the error before setting the event so waiters observe it on wake-up.
    self._init_error = error
    self._ready_event.set()
def cancel_init(self) -> None:
    """Request cancellation of background initialization.

    Also sets the ready event so wait_ready() callers stop blocking; they
    will then observe the cancelled state and raise.
    """
    self._cancel_event.set()
    self._ready_event.set()
def is_cancelled(self) -> bool:
    """Return True once cancel_init() has been called."""
    return self._cancel_event.is_set()
def wait_ready(self, timeout: float | None = None) -> None:
    """Block until initialization completes, fails, or is cancelled.

    Raises:
        TimeoutError: initialization did not finish within *timeout* seconds.
        RuntimeError: initialization was cancelled, or failed (the original
            initializer exception is chained as the cause).
    """
    if not self._ready_event.wait(timeout=timeout):
        raise TimeoutError("Sandbox initialization timed out")
    # Cancellation takes precedence over a recorded initialization error.
    if self._cancel_event.is_set():
        raise RuntimeError("Sandbox initialization was cancelled")
    if self._init_error is not None:
        raise RuntimeError("Sandbox initialization failed") from self._init_error
def mount(self) -> bool:
    """Mount the sandbox storage into the VM; returns the storage's result."""
    return self._storage.mount(self._vm)
@ -66,6 +96,7 @@ class Sandbox:
return self._storage.unmount(self._vm)
def release(self) -> None:
self.cancel_init()
sandbox_id = self._vm.metadata.id
try:
self._storage.unmount(self._vm)

View File

@ -0,0 +1,3 @@
from .asset_download_service import AssetDownloadService
__all__ = ["AssetDownloadService"]

View File

@ -0,0 +1,77 @@
from __future__ import annotations
import shlex
import textwrap
from dataclasses import dataclass
def _render_download_script(root_path: str, download_commands: str) -> str:
python_download_cmd = (
"python3 - \"${url}\" \"${dest}\" <<\"PY\"\n"
"import sys\n"
"import urllib.request\n"
"url = sys.argv[1]\n"
"dest = sys.argv[2]\n"
"with urllib.request.urlopen(url) as resp:\n"
" data = resp.read()\n"
"with open(dest, \"wb\") as f:\n"
" f.write(data)\n"
"PY"
)
script = f"""
download_root={shlex.quote(root_path)}
if command -v curl >/dev/null 2>&1; then
download_cmd='curl -fsSL "${{url}}" -o "${{dest}}"'
elif command -v wget >/dev/null 2>&1; then
download_cmd='wget -q "${{url}}" -O "${{dest}}"'
elif command -v python3 >/dev/null 2>&1; then
download_cmd={shlex.quote(python_download_cmd)}
else
echo 'No downloader found (curl/wget/python3)' >&2
exit 1
fi
mkdir -p "${{download_root}}"
fail_log="$(mktemp)"
download_one() {{
file_path="$1"
url="$2"
dest="${{download_root}}${{file_path}}"
mkdir -p "$(dirname "${{dest}}")"
eval "${{download_cmd}}" || echo "${{file_path}}" >> "${{fail_log}}"
}}
{download_commands}
wait
if [ -s "${{fail_log}}" ]; then
echo 'Failed downloads:' >&2
cat "${{fail_log}}" >&2
rm -f "${{fail_log}}"
exit 1
fi
rm -f "${{fail_log}}"
"""
return textwrap.dedent(script).strip()
@dataclass(frozen=True)
class AssetDownloadItem:
    """A single asset to fetch: target file path plus its presigned URL."""

    # Destination path; joined to the download root by plain concatenation
    # in the shell script, so it is expected to start with "/".
    path: str
    # Presigned HTTP(S) URL the sandbox can fetch without credentials.
    url: str
class AssetDownloadService:
    """Builds shell scripts that download draft assets inside a sandbox."""

    @staticmethod
    def build_download_script(items: list[AssetDownloadItem], root_path: str) -> str:
        """Return a portable shell script that downloads *items* in parallel.

        Each item becomes a backgrounded ``download_one`` invocation with its
        path and URL shell-quoted; files land under *root_path*.
        """
        download_commands = "\n".join(
            f"download_one {shlex.quote(entry.path)} {shlex.quote(entry.url)} &" for entry in items
        )
        return _render_download_script(root_path, download_commands)

View File

@ -16,6 +16,7 @@ from core.app_assets.builder import AssetBuildPipeline, BuildContext
from core.app_assets.packager.zip_packager import ZipPackager
from core.app_assets.paths import AssetPaths
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from extensions.storage.file_presign_storage import FilePresignStorage
from libs.datetime_utils import naive_utc_now
@ -34,6 +35,85 @@ logger = logging.getLogger(__name__)
class AppAssetService:
MAX_PREVIEW_CONTENT_SIZE = 5 * 1024 * 1024 # 5MB
_PRESIGN_CACHE_TTL_BUFFER_SECONDS = 300
_PRESIGN_CACHE_MIN_TTL_SECONDS = 60
@staticmethod
def _draft_download_cache_key(storage_key: str) -> str:
    """Return the Redis key caching the presigned URL for one draft asset."""
    return f"app_asset:draft_download:{storage_key}"
@staticmethod
def _get_cached_download_urls(cache_keys: list[str]) -> list[str | None] | None:
    """Fetch cached download URLs for *cache_keys* in a single MGET.

    Returns None when Redis is unavailable (callers treat that as an
    all-miss). Individual entries are None on a per-key miss; present
    entries may be bytes depending on the client configuration.
    """
    try:
        cached = redis_client.mget(cache_keys)
    except Exception:
        # Best-effort cache: a Redis outage must not break downloads.
        logger.warning("Failed to read draft download cache", exc_info=True)
        return None
    return cached
@staticmethod
def _set_cached_download_url(cache_key: str, url: str, expires_in: int) -> None:
    """Cache *url* under *cache_key* with a TTL safely below the presign expiry.

    The TTL is shortened by a buffer so a cached URL is never handed out
    moments before it expires. It is also clamped to *expires_in*: previously,
    when *expires_in* was smaller than the buffer, the minimum-TTL floor could
    cache a URL longer than it actually stayed valid. Write failures are
    logged and swallowed — the cache is best effort.
    """
    ttl = max(
        expires_in - AppAssetService._PRESIGN_CACHE_TTL_BUFFER_SECONDS,
        AppAssetService._PRESIGN_CACHE_MIN_TTL_SECONDS,
    )
    # Never cache beyond the presigned URL's actual lifetime.
    ttl = min(ttl, expires_in)
    try:
        redis_client.setex(cache_key, ttl, url)
    except Exception:
        logger.warning("Failed to write draft download cache", exc_info=True)
@staticmethod
def _clear_draft_download_cache(storage_keys: list[str]) -> None:
    """Invalidate cached download URLs for the given storage keys (best effort)."""
    if not storage_keys:
        return
    cache_keys = [AppAssetService._draft_download_cache_key(key) for key in storage_keys]
    try:
        redis_client.delete(*cache_keys)
    except Exception:
        # Stale entries will age out via their TTL even if deletion fails.
        logger.warning("Failed to clear draft download cache", exc_info=True)
@staticmethod
def get_cached_draft_download_urls(app_assets: AppAssets, *, expires_in: int = 3600) -> list[tuple[str, str]]:
    """Return (file_path, presigned_url) pairs for every draft asset file.

    URLs are cached in Redis to avoid repeated presign calls; misses (or a
    Redis outage) fall back to generating a fresh presigned URL, which is
    then cached for subsequent callers.
    """
    tree = app_assets.asset_tree
    build_id = app_assets.id
    presign_storage = FilePresignStorage(storage.storage_runner)
    nodes = list(tree.walk_files())
    if not nodes:
        return []
    # Use the shared helper so md/draft key selection stays consistent with
    # the cache-invalidation call sites (create/update/rename/delete).
    storage_keys = [
        AppAssetService._draft_storage_key_for_node(app_assets.tenant_id, app_assets.app_id, build_id, node)
        for node in nodes
    ]
    cache_keys = [AppAssetService._draft_download_cache_key(key) for key in storage_keys]
    cached_values = AppAssetService._get_cached_download_urls(cache_keys)
    if cached_values is None:
        # Redis unavailable: treat every entry as a cache miss.
        cached_values = [None] * len(nodes)
    items: list[tuple[str, str]] = []
    for node, storage_key, cache_key, cached in zip(nodes, storage_keys, cache_keys, cached_values):
        path = tree.get_path(node.id)
        if cached:
            # Redis may return bytes; normalize to str.
            url = cached.decode("utf-8") if isinstance(cached, (bytes, bytearray)) else cached
        else:
            url = presign_storage.get_download_url(storage_key, expires_in)
            AppAssetService._set_cached_download_url(cache_key, url, expires_in)
        items.append((path, url))
    return items
@staticmethod
def _draft_storage_key_for_node(tenant_id: str, app_id: str, assets_id: str, node: AppAssetNode) -> str:
    """Resolve the storage key whose presigned URL serves this draft node.

    Markdown files are served from the build-resolved artifact for this
    assets build; every other file is served from its raw draft location.
    """
    if node.extension == "md":
        return AssetPaths.build_resolved_file(tenant_id, app_id, assets_id, node.id)
    return AssetPaths.draft_file(tenant_id, app_id, node.id)
@staticmethod
def get_or_create_assets(session: Session, app_model: App, account_id: str) -> AppAssets:
@ -167,6 +247,14 @@ class AppAssetService:
assets.updated_by = account_id
session.commit()
cache_key = AppAssetService._draft_storage_key_for_node(
app_model.tenant_id,
app_model.id,
assets.id,
node,
)
AppAssetService._clear_draft_download_cache([cache_key])
return node
@staticmethod
@ -211,6 +299,14 @@ class AppAssetService:
assets.updated_by = account_id
session.commit()
cache_key = AppAssetService._draft_storage_key_for_node(
app_model.tenant_id,
app_model.id,
assets.id,
node,
)
AppAssetService._clear_draft_download_cache([cache_key])
return node
@staticmethod
@ -224,6 +320,9 @@ class AppAssetService:
assets = AppAssetService.get_or_create_assets(session, app_model, account_id)
tree = assets.asset_tree
old_node = tree.get(node_id)
old_extension = old_node.extension if old_node else None
try:
node = tree.rename(node_id, new_name)
except TreeNodeNotFoundError as e:
@ -235,6 +334,25 @@ class AppAssetService:
assets.updated_by = account_id
session.commit()
if node.node_type == AssetNodeType.FILE:
cache_keys: list[str] = []
if old_extension is not None:
old_storage_key = (
AssetPaths.build_resolved_file(app_model.tenant_id, app_model.id, assets.id, node.id)
if old_extension == "md"
else AssetPaths.draft_file(app_model.tenant_id, app_model.id, node.id)
)
cache_keys.append(old_storage_key)
cache_keys.append(
AppAssetService._draft_storage_key_for_node(
app_model.tenant_id,
app_model.id,
assets.id,
node,
)
)
AppAssetService._clear_draft_download_cache(list(set(cache_keys)))
return node
@staticmethod
@ -261,6 +379,15 @@ class AppAssetService:
assets.updated_by = account_id
session.commit()
if node.node_type == AssetNodeType.FILE:
cache_key = AppAssetService._draft_storage_key_for_node(
app_model.tenant_id,
app_model.id,
assets.id,
node,
)
AppAssetService._clear_draft_download_cache([cache_key])
return node
@staticmethod
@ -291,6 +418,14 @@ class AppAssetService:
assets = AppAssetService.get_or_create_assets(session, app_model, account_id)
tree = assets.asset_tree
target_ids = [node_id] + tree.get_descendant_ids(node_id)
target_nodes = [tree.get(nid) for nid in target_ids]
cache_keys = [
AppAssetService._draft_storage_key_for_node(app_model.tenant_id, app_model.id, assets.id, node)
for node in target_nodes
if node is not None and node.node_type == AssetNodeType.FILE
]
try:
removed_ids = tree.remove(node_id)
except TreeNodeNotFoundError as e:
@ -307,6 +442,8 @@ class AppAssetService:
assets.updated_by = account_id
session.commit()
AppAssetService._clear_draft_download_cache(cache_keys)
@staticmethod
def publish(app_model: App, account_id: str) -> AppAssets:
tenant_id = app_model.tenant_id
@ -342,15 +479,11 @@ class AppAssetService:
@staticmethod
def build_assets(tenant_id: str, app_id: str, assets: AppAssets) -> None:
# Build resolved draft assets without packaging into a zip.
tree = assets.asset_tree
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
built_assets = AssetBuildPipeline().build_all(tree, ctx)
packager = ZipPackager(storage)
zip_bytes = packager.package(built_assets)
zip_key = AssetPaths.build_zip(tenant_id, app_id, assets.id)
storage.save(zip_key, zip_bytes)
AssetBuildPipeline().build_all(tree, ctx)
@staticmethod
def get_file_download_url(

View File

@ -0,0 +1,60 @@
from core.sandbox.services.asset_download_service import AssetDownloadItem, AssetDownloadService
def test_build_download_script_includes_downloader_detection() -> None:
    """The script probes curl, wget, then python3, and errors if none exist."""
    script = AssetDownloadService.build_download_script([], "skills")
    for expected in (
        "command -v curl",
        "command -v wget",
        "command -v python3",
        "No downloader found",
    ):
        assert expected in script
def test_build_download_script_contains_items_and_root() -> None:
    """Each item's path and URL, plus the root path, appear in the script."""
    items = [
        AssetDownloadItem(path="/docs/readme.md", url="https://example.com/readme.md"),
        AssetDownloadItem(path="/data/input.json", url="https://example.com/input.json"),
    ]
    script = AssetDownloadService.build_download_script(items, "skills")
    assert "download_root=skills" in script
    assert "/docs/readme.md" in script
    assert "https://example.com/readme.md" in script
    assert "/data/input.json" in script
    assert "https://example.com/input.json" in script
def test_build_download_script_escapes_paths_and_urls() -> None:
    """Shell-quoting neutralizes spaces, quotes, backslashes, and query strings."""
    items = [
        AssetDownloadItem(path='/space path/"quoted".txt', url="https://example.com/a?b=1&c=2"),
        # Raw string: this path carries two literal backslash characters.
        AssetDownloadItem(path=r"/path/with\\backslash", url="https://example.com/with space"),
    ]
    script = AssetDownloadService.build_download_script(items, "skills")
    # shlex.quote wraps unsafe tokens in single quotes.
    assert "'" in script
    # The literal backslashes survive quoting untouched.
    assert "\\\\" in script
    assert "?b=1&c=2" in script
def test_build_download_script_runs_parallel_jobs() -> None:
    """Each item becomes a backgrounded download_one job, followed by wait.

    Uses a real item: with an empty list no "download_one ... &" line is
    emitted, so the previous substring checks only matched the helper
    definition and the "2>&1" redirections, passing vacuously.
    """
    items = [AssetDownloadItem(path="/a.txt", url="https://example.com/a")]
    script = AssetDownloadService.build_download_script(items, "skills")
    assert "download_one /a.txt https://example.com/a &" in script
    assert "wait" in script
def test_build_download_script_appends_failures() -> None:
    """Failed downloads are collected in a log file and reported on exit."""
    script = AssetDownloadService.build_download_script([], "skills")
    assert "fail_log" in script
    assert "Failed downloads" in script
def test_build_download_script_contains_python_fallback() -> None:
    """A python3 + urllib heredoc fallback exists when curl/wget are missing."""
    script = AssetDownloadService.build_download_script([], "skills")
    assert "python3 -" in script
    assert "urllib.request" in script