refactor: unify download item types and eliminate extension-based branching

Merge AssetDownloadItem, AssetInlineItem into SandboxDownloadItem with
optional 'content' field. All consumers now follow a clean pipeline:
  get items → accessor.resolve_items() → AppAssetService.to_download_items() → download

Key changes:
- SandboxDownloadItem gains content: bytes | None (entities.py)
- ZipSandbox.download_items() handles both inline (base64 heredoc) and
  remote (curl) via a single pipeline — no structural branching
- AssetDownloadService.build_download_script() takes unified list
- CachedContentAccessor.resolve_items() batch-enriches items from DB
  (extension-agnostic, no 'if md' checks needed)
- AppAssetService.to_download_items() converts AssetItem → SandboxDownloadItem
- DraftAppAssetsInitializer, package_and_upload, export_bundle simplified
- file_upload/node.py switched to SandboxDownloadItem
- Deleted AssetDownloadItem and AssetInlineItem classes
This commit is contained in:
Harry
2026-03-10 17:11:41 +08:00
parent 6ac730ec2e
commit 65e89520c0
19 changed files with 492 additions and 214 deletions

View File

@ -347,3 +347,6 @@ class AppAssetFileTree(BaseModel):
build_view(root_node, "")
return [tree_views[n.id] for n in by_parent.get(None, [])]
def empty(self) -> bool:
    """Return True when the file tree holds no nodes at all."""
    return not self.nodes

View File

@ -6,12 +6,9 @@ All methods accept an AppAssetNode parameter to identify the target.
CachedContentAccessor is the primary entry point:
- Reads DB first, misses fall through to S3 with sync backfill.
- Writes go to both DB and S3 (dual-write).
- Wraps an internal StorageContentAccessor for S3 I/O.
Public helper:
- should_mirror(extension) — the ONLY place that maps file extensions to the
"should this node use DB mirror?" decision. All callers (presigned-upload
gating, etc.) should use this function instead of hard-coding extension checks.
- resolve_items() batch-enriches AssetItem lists with DB-cached content
(extension-agnostic), so callers never need to filter by extension.
- Wraps an internal _StorageAccessor for S3 I/O.
Collaborators:
- services.asset_content_service.AssetContentService (DB layer)
@ -24,29 +21,13 @@ from __future__ import annotations
import logging
from core.app.entities.app_asset_entities import AppAssetNode
from core.app_assets.entities.assets import AssetItem
from core.app_assets.storage import AssetPaths
from extensions.storage.cached_presign_storage import CachedPresignStorage
from services.asset_content_service import AssetContentService
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Extension-based policy — the single source of truth
# ---------------------------------------------------------------------------
_MIRROR_EXTENSIONS: frozenset[str] = frozenset({"md"})


def should_mirror(extension: str) -> bool:
    """Decide whether files with *extension* get a DB-cached mirror.

    This is the single source of truth for the inline-mirror policy;
    callers must use this function rather than comparing extensions
    themselves. Matching is case-insensitive.
    """
    normalized = extension.lower()
    return normalized in _MIRROR_EXTENSIONS
# ---------------------------------------------------------------------------
# S3-only implementation (internal, used as inner delegate)
# ---------------------------------------------------------------------------
@ -162,6 +143,38 @@ class CachedContentAccessor:
)
self._inner.save(node, content)
def resolve_items(self, items: list[AssetItem]) -> list[AssetItem]:
    """Batch-enrich asset items with DB-cached content.

    Queries by ``asset_id`` only — extension-agnostic. Items without a
    DB cache row keep their original *content* value (typically
    ``None``), so only genuinely cached assets (e.g. ``.md`` skill
    documents) get populated. Callers therefore never need to filter
    by file extension before reading from the cache.
    """
    if not items:
        return items

    cached = AssetContentService.get_many(
        self._tenant_id, self._app_id, [item.asset_id for item in items]
    )
    if not cached:
        return items

    def _enriched(item: AssetItem) -> AssetItem:
        # Membership test (not .get) so a cached row always wins, while
        # uncached items keep whatever *content* they already carried.
        if item.asset_id in cached:
            payload: bytes | None = cached[item.asset_id].encode("utf-8")
        else:
            payload = item.content
        return AssetItem(
            asset_id=item.asset_id,
            path=item.path,
            file_name=item.file_name,
            extension=item.extension,
            storage_key=item.storage_key,
            content=payload,
        )

    return [_enriched(item) for item in items]
def delete(self, node: AppAssetNode) -> None:
    """Delete *node*'s content from both layers: the DB cache row first,
    then the inner S3-backed accessor (dual-delete, mirroring the
    dual-write policy of this class).
    """
    AssetContentService.delete(self._tenant_id, self._app_id, node.id)
    self._inner.delete(node)

View File

@ -1,14 +1,25 @@
"""Builder that compiles ``.md`` skill documents into resolved content.
The builder reads raw draft content from the DB-backed accessor, parses
each into a ``SkillDocument``, assembles a ``SkillBundle`` (with
transitive tool/file dependency resolution), and returns ``AssetItem``
objects whose *content* field carries the resolved bytes in-process.
No S3 writes happen here — the only persistence is the ``SkillBundle``
saved via ``SkillManager`` (S3 + Redis cache invalidation) so that
downstream consumers (``SkillInitializer``, ``DifyCliInitializer``) can
load it later.
"""
import json
import logging
from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
from core.app_assets.accessor import CachedContentAccessor
from core.app_assets.entities import AssetItem
from core.app_assets.storage import AssetPaths
from core.skill.assembler import SkillBundleAssembler
from core.skill.entities.skill_bundle import SkillBundle
from core.skill.entities.skill_document import SkillDocument
from extensions.storage.base_storage import BaseStorage
from .base import BuildContext
@ -18,12 +29,10 @@ logger = logging.getLogger(__name__)
class SkillBuilder:
_nodes: list[tuple[AppAssetNode, str]]
_accessor: CachedContentAccessor
_storage: BaseStorage
def __init__(self, accessor: CachedContentAccessor, storage: BaseStorage) -> None:
def __init__(self, accessor: CachedContentAccessor) -> None:
self._nodes = []
self._accessor = accessor
self._storage = storage
def accept(self, node: AppAssetNode) -> bool:
return node.extension == "md"
@ -66,15 +75,14 @@ class SkillBuilder:
skill = bundle.get(node.id)
if skill is None:
continue
storage_key = AssetPaths.resolved(ctx.tenant_id, ctx.app_id, ctx.build_id, node.id)
self._storage.save(storage_key, skill.content.encode("utf-8"))
items.append(
AssetItem(
asset_id=node.id,
path=path,
file_name=node.name,
extension=node.extension or "",
storage_key=storage_key,
storage_key="",
content=skill.content.encode("utf-8"),
)
)
return items

View File

@ -1,10 +1,20 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
@dataclass
class AssetItem:
    """A single asset file produced by the build pipeline.

    When *content* is set the payload is available in-process and can be
    written directly into a ZIP or uploaded to a sandbox VM without an
    extra S3 round-trip. When *content* is ``None`` the caller should
    fetch the bytes from *storage_key* (the traditional presigned-URL
    path).
    """

    # Identifier of the asset node this file was built from.
    asset_id: str
    # Destination path of the file as used by download/ZIP consumers.
    path: str
    # Base file name of the asset.
    file_name: str
    # File extension without a leading dot (e.g. "md"); may be "".
    extension: str
    # Storage key used to fetch the bytes when *content* is None.
    storage_key: str
    # In-process payload; excluded from repr to keep logs readable.
    content: bytes | None = field(default=None, repr=False)

View File

@ -37,15 +37,6 @@ class AssetPaths:
_check_uuid(assets_id, "assets_id")
return f"{_BASE}/{tenant_id}/{app_id}/artifacts/{assets_id}.zip"
@staticmethod
def resolved(tenant_id: str, app_id: str, assets_id: str, node_id: str) -> str:
    """app_assets/{tenant}/{app}/artifacts/{assets_id}/resolved/{node_id}"""
    # Validate every path component before building the key.
    for value, label in (
        (tenant_id, "tenant_id"),
        (app_id, "app_id"),
        (assets_id, "assets_id"),
        (node_id, "node_id"),
    ):
        _check_uuid(value, label)
    return "/".join([_BASE, tenant_id, app_id, "artifacts", assets_id, "resolved", node_id])
@staticmethod
def skill_bundle(tenant_id: str, app_id: str, assets_id: str) -> str:
"""app_assets/{tenant}/{app}/artifacts/{assets_id}/skill_artifact_set.json"""

View File

@ -13,12 +13,12 @@ from core.virtual_environment.__base.helpers import pipeline
from ..bash.dify_cli import DifyCliConfig, DifyCliLocator
from ..entities import DifyCli
from .base import AsyncSandboxInitializer
from .base import SyncSandboxInitializer
logger = logging.getLogger(__name__)
class DifyCliInitializer(AsyncSandboxInitializer):
class DifyCliInitializer(SyncSandboxInitializer):
def __init__(
self,
tenant_id: str,

View File

@ -1,23 +1,46 @@
"""Async initializer that populates a draft sandbox with app asset files.
Unlike ``AppAssetsInitializer`` (which downloads a pre-built ZIP for
published assets), this initializer runs the build pipeline on the fly
so that ``.md`` skill documents are compiled and their resolved content
is embedded directly into the download script — avoiding the S3
round-trip that was previously required for resolved keys.
Execution order guarantee:
This runs as an ``AsyncSandboxInitializer`` in the background thread.
By the time it finishes, ``SkillManager.save_bundle()`` has been
called (inside ``SkillBuilder.build()``), so subsequent initializers
like ``DifyCliInitializer`` can safely load the bundle from Redis/S3.
"""
import logging
from core.app_assets.accessor import should_mirror
from core.app_assets.builder.base import BuildContext
from core.app_assets.builder.file_builder import FileBuilder
from core.app_assets.builder.pipeline import AssetBuildPipeline
from core.app_assets.builder.skill_builder import SkillBuilder
from core.app_assets.constants import AppAssetsAttrs
from core.app_assets.storage import AssetPaths
from core.sandbox.entities import AppAssets
from core.sandbox.sandbox import Sandbox
from core.sandbox.services import AssetDownloadService
from core.sandbox.services.asset_download_service import AssetDownloadItem
from core.virtual_environment.__base.helpers import pipeline
from services.app_asset_service import AppAssetService
from .base import AsyncSandboxInitializer
from .base import SyncSandboxInitializer
logger = logging.getLogger(__name__)
_TIMEOUT = 600 # 10 minutes
class DraftAppAssetsInitializer(AsyncSandboxInitializer):
class DraftAppAssetsInitializer(SyncSandboxInitializer):
"""Compile draft assets and push them into the sandbox VM.
``.md`` (skill) files are compiled in-process and their resolved
content is embedded as base64 heredocs in the download script.
All other files are fetched from S3 via presigned URLs.
"""
def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
self._tenant_id = tenant_id
self._app_id = app_id
@ -25,22 +48,22 @@ class DraftAppAssetsInitializer(AsyncSandboxInitializer):
def initialize(self, sandbox: Sandbox) -> None:
vm = sandbox.vm
build_id = self._assets_id
tree = sandbox.attrs.get(AppAssetsAttrs.FILE_TREE)
asset_storage = AppAssetService.get_storage()
nodes = list(tree.walk_files())
if not nodes:
if tree.empty():
return
# Inline-mirror nodes use the resolved (compiled) key; others use draft.
keys = [
AssetPaths.resolved(self._tenant_id, self._app_id, build_id, node.id)
if should_mirror(node.extension)
else AssetPaths.draft(self._tenant_id, self._app_id, node.id)
for node in nodes
]
urls = asset_storage.get_download_urls(keys, _TIMEOUT)
items = [AssetDownloadItem(path=tree.get_path(node.id).lstrip("/"), url=url) for node, url in zip(nodes, urls)]
script = AssetDownloadService.build_download_script(items, AppAssets.PATH)
# --- 1. Run the build pipeline (SkillBuilder compiles .md inline) ---
accessor = AppAssetService.get_accessor(self._tenant_id, self._app_id)
build_pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor), FileBuilder()])
ctx = BuildContext(tenant_id=self._tenant_id, app_id=self._app_id, build_id=self._assets_id)
built_assets = build_pipeline.build_all(tree, ctx)
if not built_assets:
return
# --- 2. Convert to unified download items and execute ---
download_items = AppAssetService.to_download_items(built_assets)
script = AssetDownloadService.build_download_script(download_items, AppAssets.PATH)
pipeline(vm).add(
["sh", "-c", script],
error_message="Failed to download draft assets",

View File

@ -1,11 +1,46 @@
"""Shell script builder for downloading / writing assets into a sandbox VM.
Generates a self-contained POSIX shell script that handles two kinds of
``SandboxDownloadItem``:
- Items with *content* — written via base64 heredoc (sequential).
- Items with *url* — fetched via ``curl``/``wget``/``python3`` with
auto-detection, run as parallel background jobs.
Both kinds can be mixed freely in a single call.
"""
from __future__ import annotations
import base64
import shlex
import textwrap
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.zip_sandbox.entities import SandboxDownloadItem
def _render_download_script(root_path: str, download_commands: str) -> str:
def _build_inline_commands(items: list[SandboxDownloadItem], root_var: str) -> str:
"""Generate shell commands that write base64-encoded content to files."""
lines: list[str] = []
for idx, item in enumerate(items):
assert item.content is not None
dest = f"${{{root_var}}}/{shlex.quote(item.path)}"
encoded = base64.b64encode(item.content).decode("ascii")
lines.append(f'mkdir -p "$(dirname "{dest}")"')
lines.append(f"base64 -d <<'_INLINE_{idx}' > \"{dest}\"")
lines.append(encoded)
lines.append(f"_INLINE_{idx}")
return "\n".join(lines)
def _render_download_script(
root_path: str,
inline_commands: str,
download_commands: str,
need_downloader: bool,
) -> str:
python_download_cmd = (
'python3 - "${url}" "${dest}" <<"PY"\n'
"import sys\n"
@ -18,59 +53,88 @@ def _render_download_script(root_path: str, download_commands: str) -> str:
" f.write(data)\n"
"PY"
)
script = f"""
download_root={shlex.quote(root_path)}
if command -v curl >/dev/null 2>&1; then
download_cmd='curl -fsSL "${{url}}" -o "${{dest}}"'
elif command -v wget >/dev/null 2>&1; then
download_cmd='wget -q "${{url}}" -O "${{dest}}"'
elif command -v python3 >/dev/null 2>&1; then
download_cmd={shlex.quote(python_download_cmd)}
else
echo 'No downloader found (curl/wget/python3)' >&2
exit 1
fi
# Only emit the downloader-detection block when there are remote items.
if need_downloader:
downloader_block = f"""\
if command -v curl >/dev/null 2>&1; then
download_cmd='curl -fsSL "${{url}}" -o "${{dest}}"'
elif command -v wget >/dev/null 2>&1; then
download_cmd='wget -q "${{url}}" -O "${{dest}}"'
elif command -v python3 >/dev/null 2>&1; then
download_cmd={shlex.quote(python_download_cmd)}
else
echo 'No downloader found (curl/wget/python3)' >&2
exit 1
fi
mkdir -p "${{download_root}}"
fail_log="$(mktemp)"
fail_log="$(mktemp)"
download_one() {{
file_path="$1"
url="$2"
dest="${{download_root}}/${{file_path}}"
mkdir -p "$(dirname "${{dest}}")"
eval "${{download_cmd}}" 2>/dev/null || echo "${{file_path}}" >> "${{fail_log}}"
}}
download_one() {{
file_path="$1"
url="$2"
dest="${{download_root}}/${{file_path}}"
mkdir -p "$(dirname "${{dest}}")"
eval "${{download_cmd}}" 2>/dev/null || echo "${{file_path}}" >> "${{fail_log}}"
}}"""
else:
downloader_block = ""
{download_commands}
# The failure-check block is only meaningful when downloads occurred.
if need_downloader:
wait_block = textwrap.dedent("""\
wait
wait
if [ -s "${fail_log}" ]; then
mv "${fail_log}" "${download_root}/DOWNLOAD_FAILURES.txt"
else
rm -f "${fail_log}"
fi""")
else:
wait_block = ""
if [ -s "${{fail_log}}" ]; then
mv "${{fail_log}}" "${{download_root}}/DOWNLOAD_FAILURES.txt"
else
rm -f "${{fail_log}}"
fi
exit 0
"""
return textwrap.dedent(script).strip()
script = f"""\
download_root={shlex.quote(root_path)}
mkdir -p "${{download_root}}"
{downloader_block}
@dataclass(frozen=True)
class AssetDownloadItem:
path: str
url: str
{inline_commands}
{download_commands}
{wait_block}
exit 0"""
return script
class AssetDownloadService:
@staticmethod
def build_download_script(items: list[AssetDownloadItem], root_path: str) -> str:
# Build a portable shell script to download assets in parallel.
def build_download_script(
items: list[SandboxDownloadItem],
root_path: str,
) -> str:
"""Build a portable shell script to write inline assets and download remote ones.
Items with *content* are written first (sequential base64 decode),
then items with *url* are fetched in parallel background jobs.
The two kinds can be mixed freely in a single list.
"""
inline = [item for item in items if item.content is not None]
remote = [item for item in items if item.content is None]
inline_commands = _build_inline_commands(inline, "download_root") if inline else ""
commands: list[str] = []
for item in items:
for item in remote:
path = shlex.quote(item.path)
url = shlex.quote(item.url)
commands.append(f"download_one {path} {url} &")
download_commands = "\n".join(commands)
return _render_download_script(root_path, download_commands)
return _render_download_script(
root_path,
inline_commands,
download_commands,
need_downloader=bool(remote),
)

View File

@ -7,7 +7,6 @@ from typing import Any, cast
from core.file import File, FileTransferMethod
from core.sandbox.bash.session import SANDBOX_READY_TIMEOUT
from core.sandbox.services.asset_download_service import AssetDownloadItem
from core.variables import ArrayFileSegment
from core.variables.segments import ArrayStringSegment, FileSegment
from core.virtual_environment.__base.command_future import CommandCancelledError, CommandTimeoutError
@ -15,6 +14,7 @@ from core.virtual_environment.__base.helpers import pipeline
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
from core.zip_sandbox import SandboxDownloadItem
from .entities import FileUploadNodeData
from .exc import FileUploadDownloadError, FileUploadNodeError
@ -90,7 +90,7 @@ class FileUploadNode(Node[FileUploadNodeData]):
try:
sandbox.wait_ready(timeout=SANDBOX_READY_TIMEOUT)
download_items: list[AssetDownloadItem] = self._build_download_items(files)
download_items: list[SandboxDownloadItem] = self._build_download_items(files)
sandbox_paths = self._upload(sandbox.vm, download_items)
file_names = [PurePosixPath(path).name for path in sandbox_paths]
process_data = {
@ -178,9 +178,9 @@ class FileUploadNode(Node[FileUploadNodeData]):
return files
return []
def _build_download_items(self, files: Sequence[File]) -> list[AssetDownloadItem]:
def _build_download_items(self, files: Sequence[File]) -> list[SandboxDownloadItem]:
used_paths: set[str] = set()
items: list[AssetDownloadItem] = []
items: list[SandboxDownloadItem] = []
for index, file in enumerate(files):
file_url = self._get_download_url(file)
@ -198,7 +198,7 @@ class FileUploadNode(Node[FileUploadNodeData]):
dedupe += 1
used_paths.add(filename)
items.append(AssetDownloadItem(path=filename, url=file_url))
items.append(SandboxDownloadItem(path=filename, url=file_url))
return items
@staticmethod
@ -208,7 +208,7 @@ class FileUploadNode(Node[FileUploadNodeData]):
normalized = normalized.lstrip("/")
return normalized or "."
def _upload(self, vm: Any, items: list[AssetDownloadItem]) -> list[str]:
def _upload(self, vm: Any, items: list[SandboxDownloadItem]) -> list[str]:
p = pipeline(vm)
out_paths: list[str] = []
for item in items:

View File

@ -1,4 +1,11 @@
from .zip_sandbox import SandboxDownloadItem, SandboxFile, SandboxUploadItem, ZipSandbox
from __future__ import annotations
from typing import TYPE_CHECKING
from .entities import SandboxDownloadItem, SandboxFile, SandboxUploadItem
if TYPE_CHECKING:
from .zip_sandbox import ZipSandbox
__all__ = [
"SandboxDownloadItem",
@ -6,3 +13,11 @@ __all__ = [
"SandboxUploadItem",
"ZipSandbox",
]
def __getattr__(name: str):
if name == "ZipSandbox":
from .zip_sandbox import ZipSandbox
return ZipSandbox
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@ -0,0 +1,39 @@
"""Data classes for ZipSandbox file operations.
Separated from ``zip_sandbox.py`` so that lightweight consumers (tests,
shell-script builders) can import the types without pulling in the full
sandbox provider chain.
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass(frozen=True)
class SandboxDownloadItem:
    """Unified download/inline item for sandbox file operations.

    For remote files, *url* is set and the item is fetched via ``curl``.
    For inline content, *content* is set and the bytes are written
    directly inside the VM via a ``base64 -d`` heredoc command — no
    network round-trip.
    """

    # Destination path inside the sandbox.
    path: str
    # Presigned/remote URL; empty string for inline items.
    url: str = ""
    # In-process payload; None means "fetch from *url*". Hidden from repr.
    content: bytes | None = field(default=None, repr=False)
@dataclass(frozen=True)
class SandboxUploadItem:
    """Item for uploading: sandbox path -> URL."""

    # Source path inside the sandbox.
    path: str
    # Destination URL the file is uploaded to.
    url: str
@dataclass(frozen=True)
class SandboxFile:
    """A handle to a file in the sandbox."""

    # Path of the file inside the sandbox.
    path: str

View File

@ -1,7 +1,8 @@
from __future__ import annotations
import base64
import posixpath
from dataclasses import dataclass
import shlex
from io import BytesIO
from pathlib import PurePosixPath
from types import TracebackType
@ -20,34 +21,12 @@ from core.virtual_environment.__base.virtual_environment import VirtualEnvironme
from services.sandbox.sandbox_provider_service import SandboxProviderService
from .cli_strategy import CliZipStrategy
from .entities import SandboxDownloadItem, SandboxFile, SandboxUploadItem
from .node_strategy import NodeZipStrategy
from .python_strategy import PythonZipStrategy
from .strategy import ZipStrategy
@dataclass(frozen=True)
class SandboxDownloadItem:
"""Item for downloading: URL -> sandbox path."""
url: str
path: str
@dataclass(frozen=True)
class SandboxUploadItem:
"""Item for uploading: sandbox path -> URL."""
path: str
url: str
@dataclass(frozen=True)
class SandboxFile:
"""A handle to a file in the sandbox."""
path: str
class ZipSandbox:
"""A sandbox for archive (zip) operations.
@ -221,6 +200,12 @@ class ZipSandbox:
# ========== Download operations ==========
def download_items(self, items: list[SandboxDownloadItem], *, dest_dir: str = ".") -> list[str]:
"""Download or write items into the sandbox via a single pipeline.
Remote items (with *url*) are fetched via ``curl``. Inline items
(with *content*) are written via ``base64 -d`` heredoc. Both go
through the same pipeline — no branching at the structural level.
"""
if not items:
return []
@ -238,7 +223,10 @@ class ZipSandbox:
out_dir = posixpath.dirname(out_path)
if out_dir not in ("", "."):
p.add(["mkdir", "-p", out_dir], error_message="Failed to create download directory")
p.add(["curl", "-fsSL", item.url, "-o", out_path], error_message="Failed to download file")
p.add(
self.to_download_command(item, out_path),
error_message=f"Failed to write {item.path}",
)
try:
p.execute(timeout=self._DEFAULT_TIMEOUT_SECONDS, raise_on_error=True)
@ -247,6 +235,14 @@ class ZipSandbox:
return out_paths
@staticmethod
def to_download_command(item: SandboxDownloadItem, out_path: str) -> list[str]:
    """Return the shell command to materialise *item* at *out_path*."""
    if item.content is None:
        # Remote item: fetch via curl (argv list, no shell quoting needed).
        return ["curl", "-fsSL", item.url, "-o", out_path]
    payload = base64.b64encode(item.content).decode("ascii")
    heredoc = f"base64 -d <<'_B64_' > {shlex.quote(out_path)}\n{payload}\n_B64_"
    return ["sh", "-c", heredoc]
def download_archive(self, archive_url: str, *, path: str = "input.tar.gz") -> str:
path = self._normalize_path(path)

View File

@ -6,6 +6,13 @@ separated from AppAssetService to avoid circular imports.
Dependency flow:
core/* -> AppAssetPackageService -> AppAssetService
(core modules can import this service without circular dependency)
Inline content optimisation:
``AssetItem`` objects returned by the build pipeline may carry an
in-process *content* field (e.g. resolved ``.md`` skill documents).
``AppAssetService.to_download_items()`` converts these into unified
``SandboxDownloadItem`` instances, and ``ZipSandbox.download_items()``
handles both inline and remote items natively.
"""
import logging
@ -19,7 +26,7 @@ from core.app_assets.builder.file_builder import FileBuilder
from core.app_assets.builder.skill_builder import SkillBuilder
from core.app_assets.entities.assets import AssetItem
from core.app_assets.storage import AssetPaths
from core.zip_sandbox import SandboxDownloadItem, ZipSandbox
from core.zip_sandbox import ZipSandbox
from models.app_asset import AppAssets
from models.model import App
@ -84,15 +91,12 @@ class AppAssetPackageService:
) -> None:
"""Package assets into a ZIP and upload directly to the given URL.
When *assets* is empty an empty ZIP is written directly to storage
using *storage_key*, bypassing the HTTP ticket URL. This avoids a
``ConnectionError`` when the api process cannot reach the ticket
endpoint (e.g. ``localhost:80`` inside a Docker container where nginx
runs in a separate service).
Uses ``AppAssetService.to_download_items()`` to convert assets
into unified download items, then ``ZipSandbox.download_items()``
handles both inline content and remote presigned URLs natively.
For non-empty assets the ZIP is built inside a remote sandbox VM
which uploads via ``curl`` to *upload_url* (the sandbox container
*can* reach the ticket endpoint thanks to socat forwarding).
When *assets* is empty an empty ZIP is written directly to storage
using *storage_key*, bypassing the HTTP ticket URL.
"""
from services.app_asset_service import AppAssetService
@ -119,12 +123,8 @@ class AppAssetPackageService:
requests.put(upload_url, data=buf.getvalue(), timeout=30)
return
asset_storage = AppAssetService.get_storage()
keys = [AssetPaths.draft(tenant_id, app_id, asset.asset_id) for asset in assets]
download_urls = asset_storage.get_download_urls(keys)
download_items = [
SandboxDownloadItem(url=url, path=asset.path) for asset, url in zip(assets, download_urls, strict=True)
]
download_items = AppAssetService.to_download_items(assets)
with ZipSandbox(tenant_id=tenant_id, user_id=user_id, app_id="asset-packager") as zs:
zs.download_items(download_items)
archive = zs.zip()
@ -134,7 +134,11 @@ class AppAssetPackageService:
def publish(session: Session, app_model: App, account_id: str, workflow_id: str) -> AppAssets:
"""Publish app assets for a workflow.
Creates a versioned copy of draft assets and packages them for runtime use.
Creates a versioned copy of draft assets and packages them for
runtime use. The build ZIP contains resolved ``.md`` content
(inline from ``SkillBuilder``) and raw draft content for all
other files. A separate source ZIP snapshots the raw drafts for
later export.
"""
from services.app_asset_service import AppAssetService
@ -159,10 +163,11 @@ class AppAssetPackageService:
asset_storage = AppAssetService.get_storage()
accessor = AppAssetService.get_accessor(tenant_id, app_id)
pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()])
build_pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor), FileBuilder()])
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id)
built_assets = pipeline.build_all(tree, ctx)
built_assets = build_pipeline.build_all(tree, ctx)
# Runtime ZIP: resolved .md (inline) + raw draft (remote).
runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id)
runtime_upload_url = asset_storage.get_upload_url(runtime_zip_key)
AppAssetPackageService.package_and_upload(
@ -174,6 +179,7 @@ class AppAssetPackageService:
storage_key=runtime_zip_key,
)
# Source ZIP: all raw draft content (for export/restore).
source_items = AppAssetService.get_draft_assets(tenant_id, app_id)
source_key = AssetPaths.source_zip(tenant_id, app_id, workflow_id)
source_upload_url = asset_storage.get_upload_url(source_key)
@ -187,28 +193,3 @@ class AppAssetPackageService:
)
return published
@staticmethod
def build_assets(tenant_id: str, app_id: str, assets: AppAssets) -> None:
"""Build resolved draft assets without packaging into a zip."""
from services.app_asset_service import AppAssetService
tree = assets.asset_tree
asset_storage = AppAssetService.get_storage()
accessor = AppAssetService.get_accessor(tenant_id, app_id)
pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()])
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
built_assets: list[AssetItem] = pipeline.build_all(tree, ctx)
user_id = getattr(assets, "updated_by", None) or getattr(assets, "created_by", None) or "system"
key = AssetPaths.build_zip(tenant_id, app_id, assets.id)
upload_url = asset_storage.get_upload_url(key)
AppAssetPackageService.package_and_upload(
assets=built_assets,
upload_url=upload_url,
tenant_id=tenant_id,
app_id=app_id,
user_id=user_id,
storage_key=key,
)

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import logging
import threading
from uuid import uuid4
@ -16,6 +18,7 @@ from core.app.entities.app_asset_entities import (
from core.app_assets.accessor import CachedContentAccessor
from core.app_assets.entities.assets import AssetItem
from core.app_assets.storage import AssetPaths
from core.zip_sandbox import SandboxDownloadItem
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
@ -214,6 +217,46 @@ class AppAssetService:
"""Get a content accessor with DB caching for the given app."""
return CachedContentAccessor(AppAssetService.get_storage(), tenant_id, app_id)
# Default TTL for presigned download URLs generated by to_download_items().
_DOWNLOAD_URL_TTL_SECONDS = 600
@staticmethod
def to_download_items(
    items: list[AssetItem],
    *,
    path_prefix: str = "",
) -> list[SandboxDownloadItem]:
    """Convert asset items to unified download items.

    Items carrying in-process *content* become inline
    ``SandboxDownloadItem`` instances (no presigned URL needed); the
    remaining items are resolved to presigned download URLs in one
    batched storage call. Inline items come first in the result, then
    remote items in their original order.

    *path_prefix*, when set, is prepended to every item path
    (e.g. ``"my-app"`` -> ``"my-app/skills/foo.md"``).
    """
    from core.zip_sandbox import SandboxDownloadItem

    def _dest(item: AssetItem) -> str:
        return f"{path_prefix}/{item.path}" if path_prefix else item.path

    result = [
        SandboxDownloadItem(path=_dest(item), content=item.content)
        for item in items
        if item.content is not None
    ]
    pending = [item for item in items if item.content is None]
    if pending:
        storage_backend = AppAssetService.get_storage()
        urls = storage_backend.get_download_urls(
            [item.storage_key for item in pending],
            AppAssetService._DOWNLOAD_URL_TTL_SECONDS,
        )
        result.extend(
            SandboxDownloadItem(path=_dest(item), url=url)
            for item, url in zip(pending, urls, strict=True)
        )
    return result
@staticmethod
def get_file_content(app_model: App, account_id: str, node_id: str) -> bytes:
with Session(db.engine) as session:

View File

@ -37,7 +37,7 @@ from core.app.entities.app_bundle_entities import (
BundleManifest,
)
from core.app_assets.storage import AssetPaths
from core.zip_sandbox import SandboxDownloadItem, SandboxUploadItem, ZipSandbox
from core.zip_sandbox import SandboxUploadItem, ZipSandbox
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.storage.cached_presign_storage import CachedPresignStorage
@ -131,16 +131,10 @@ class AppBundleService:
else:
asset_items = AppAssetService.get_draft_assets(tenant_id, app_id)
if asset_items:
asset_urls = asset_storage.get_download_urls(
[AssetPaths.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in
)
zs.download_items(
[
SandboxDownloadItem(url=url, path=f"{safe_name}/{a.path}")
for a, url in zip(asset_items, asset_urls, strict=True)
],
dest_dir="bundle_root",
)
accessor = AppAssetService.get_accessor(tenant_id, app_id)
resolved = accessor.resolve_items(asset_items)
download_items = AppAssetService.to_download_items(resolved, path_prefix=safe_name)
zs.download_items(download_items, dest_dir="bundle_root")
archive = zs.zip(src="bundle_root", include_base=False)
zs.upload(archive, upload_url)

View File

@ -1,3 +1,19 @@
"""Service for creating and managing sandbox instances.
Three creation paths:
- ``create()`` — published runtime. Downloads the pre-built ZIP via
``AppAssetsInitializer`` and loads the ``SkillBundle`` via
``SkillInitializer``.
- ``create_draft()`` / ``create_for_single_step()`` — draft runtime.
``DraftAppAssetsInitializer`` runs the build pipeline on the fly,
compiles ``.md`` skills (saving the ``SkillBundle`` to Redis/S3 as a
side-effect), and pushes resolved content as inline base64 into the
sandbox. ``SkillInitializer`` then loads the bundle from Redis/S3.
No separate ``build_assets()`` call is needed.
"""
import logging
from core.sandbox.builder import SandboxBuilder
@ -11,7 +27,6 @@ from core.sandbox.initializer.skill_initializer import SkillInitializer
from core.sandbox.sandbox import Sandbox
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
from extensions.ext_storage import storage
from services.app_asset_package_service import AppAssetPackageService
from services.app_asset_service import AppAssetService
logger = logging.getLogger(__name__)
@ -67,7 +82,6 @@ class SandboxService:
if not assets:
raise ValueError(f"No assets found for tid={tenant_id}, app_id={app_id}")
AppAssetPackageService.build_assets(tenant_id, app_id, assets)
sandbox_id = SandboxBuilder.draft_id(user_id)
archive_storage = ArchiveSandboxStorage(
tenant_id, app_id, sandbox_id, storage.storage_runner, exclude_patterns=[AppAssets.PATH]
@ -101,7 +115,6 @@ class SandboxService:
if not assets:
raise ValueError(f"No assets found for tid={tenant_id}, app_id={app_id}")
AppAssetPackageService.build_assets(tenant_id, app_id, assets)
sandbox_id = SandboxBuilder.draft_id(user_id)
archive_storage = ArchiveSandboxStorage(
tenant_id, app_id, sandbox_id, storage.storage_runner, exclude_patterns=[AppAssets.PATH]

View File

@ -104,12 +104,6 @@ def test_build_zip_storage_key():
assert key == f"app_assets/{tid}/{aid}/artifacts/{assets_id}.zip"
def test_resolved_storage_key():
    # Per-node resolved content must be keyed under the artifact's resolved/ prefix.
    tid, aid, assets_id, nid = (str(uuid4()) for _ in range(4))
    expected = f"app_assets/{tid}/{aid}/artifacts/{assets_id}/resolved/{nid}"
    assert AssetPaths.resolved(tid, aid, assets_id, nid) == expected
def test_skill_bundle_storage_key():
tid, aid, assets_id = str(uuid4()), str(uuid4()), str(uuid4())
key = AssetPaths.skill_bundle(tid, aid, assets_id)

View File

@ -1,8 +1,22 @@
from core.sandbox.services.asset_download_service import AssetDownloadItem, AssetDownloadService
"""Tests for AssetDownloadService shell script generation.
Covers four scenarios:
1. Remote-only items (presigned URL download)
2. Inline-only items (base64 heredoc write)
3. Mixed (inline written first, then parallel downloads)
4. Empty item list (still emits a valid no-op script)
"""
import base64
from core.sandbox.services.asset_download_service import AssetDownloadService
from core.zip_sandbox.entities import SandboxDownloadItem
# --- Remote-only tests ---
# NOTE(review): pre-rename variant left over in this diff view; it builds the
# script for an empty item list and asserts nothing — superseded by the
# test_remote_only_includes_downloader_detection version directly below.
def test_build_download_script_includes_downloader_detection() -> None:
    script = AssetDownloadService.build_download_script([], "skills")
def test_remote_only_includes_downloader_detection() -> None:
items = [SandboxDownloadItem(path="file.txt", url="https://example.com/file.txt")]
script = AssetDownloadService.build_download_script(items, "skills")
assert "command -v curl" in script
assert "command -v wget" in script
@ -10,10 +24,10 @@ def test_build_download_script_includes_downloader_detection() -> None:
assert "No downloader found" in script
def test_build_download_script_contains_items_and_root() -> None:
def test_remote_only_contains_items_and_root() -> None:
items = [
AssetDownloadItem(path="/docs/readme.md", url="https://example.com/readme.md"),
AssetDownloadItem(path="/data/input.json", url="https://example.com/input.json"),
SandboxDownloadItem(path="/docs/readme.md", url="https://example.com/readme.md"),
SandboxDownloadItem(path="/data/input.json", url="https://example.com/input.json"),
]
script = AssetDownloadService.build_download_script(items, "skills")
@ -25,10 +39,10 @@ def test_build_download_script_contains_items_and_root() -> None:
assert "https://example.com/input.json" in script
def test_build_download_script_escapes_paths_and_urls() -> None:
def test_remote_only_escapes_paths_and_urls() -> None:
items = [
AssetDownloadItem(path='/space path/"quoted".txt', url="https://example.com/a?b=1&c=2"),
AssetDownloadItem(path=r"/path/with\\backslash", url="https://example.com/with space"),
SandboxDownloadItem(path='/space path/"quoted".txt', url="https://example.com/a?b=1&c=2"),
SandboxDownloadItem(path=r"/path/with\\backslash", url="https://example.com/with space"),
]
script = AssetDownloadService.build_download_script(items, "skills")
@ -38,23 +52,100 @@ def test_build_download_script_escapes_paths_and_urls() -> None:
assert "?b=1&c=2" in script
# NOTE(review): old-name variant (diff residue); builds the script with no items
# and makes no assertions — superseded by test_remote_only_runs_parallel_jobs.
def test_build_download_script_runs_parallel_jobs() -> None:
    script = AssetDownloadService.build_download_script([], "skills")
def test_remote_only_runs_parallel_jobs() -> None:
    """Two remote items should produce backgrounded download calls plus a wait."""
    remote = [
        SandboxDownloadItem(path=p, url=u)
        for p, u in (("a.txt", "https://example.com/a"), ("b.txt", "https://example.com/b"))
    ]
    script = AssetDownloadService.build_download_script(remote, "skills")
    for marker in ("download_one", "&", "wait"):
        assert marker in script
# NOTE(review): old-name variant (diff residue); no assertions — superseded by
# test_remote_only_appends_failures directly below.
def test_build_download_script_appends_failures() -> None:
    script = AssetDownloadService.build_download_script([], "skills")
def test_remote_only_appends_failures() -> None:
    """Failure bookkeeping (log file + summary variable) is wired into the script."""
    single = SandboxDownloadItem(path="a.txt", url="https://example.com/a")
    script = AssetDownloadService.build_download_script([single], "skills")
    for marker in ("fail_log", "Failed downloads", "DOWNLOAD_FAILURES"):
        assert marker in script
# NOTE(review): old-name variant (diff residue); no assertions — superseded by
# test_remote_only_contains_python_fallback directly below.
def test_build_download_script_contains_python_fallback() -> None:
    script = AssetDownloadService.build_download_script([], "skills")
def test_remote_only_contains_python_fallback() -> None:
    """The script carries a python3/urllib fallback for hosts without curl/wget."""
    only_item = SandboxDownloadItem(path="a.txt", url="https://example.com/a")
    script = AssetDownloadService.build_download_script([only_item], "skills")
    for fallback_marker in ("python3 -", "urllib.request"):
        assert fallback_marker in script
# --- Inline-only tests ---
def test_inline_only_no_downloader_detection() -> None:
    """Purely inline content needs none of the remote-download machinery."""
    inline_item = SandboxDownloadItem(path="skill.md", content=b"hello world")
    script = AssetDownloadService.build_download_script([inline_item], "skills")
    # Without remote items the script must omit downloader detection entirely.
    for remote_marker in ("command -v curl", "download_one", "wait"):
        assert remote_marker not in script
def test_inline_only_base64_content() -> None:
    """Inline bytes are embedded base64-encoded and written via a decode heredoc."""
    payload = b'{"content": "test skill", "metadata": {}}'
    script = AssetDownloadService.build_download_script(
        [SandboxDownloadItem(path="docs/skill.md", content=payload)], "skills"
    )
    # The exact base64 of the payload, the decode command, the destination path,
    # and the first heredoc marker must all be present.
    assert base64.b64encode(payload).decode("ascii") in script
    assert "base64 -d" in script
    assert "docs/skill.md" in script
    assert "_INLINE_0" in script
def test_inline_multiple_items() -> None:
    """Each inline item gets its own numbered heredoc marker and its own payload."""
    payloads = {"a.md": b"aaa", "b.md": b"bbb"}
    inline = [SandboxDownloadItem(path=p, content=c) for p, c in payloads.items()]
    script = AssetDownloadService.build_download_script(inline, "skills")
    for index, raw in enumerate(payloads.values()):
        assert f"_INLINE_{index}" in script
        assert base64.b64encode(raw).decode() in script
# --- Mixed tests ---
def test_mixed_inline_and_remote() -> None:
    """Inline and remote items coexist: heredoc write plus the download pipeline."""
    inline_bytes = b"resolved content"
    mixed = [
        SandboxDownloadItem(path="skill.md", content=inline_bytes),
        SandboxDownloadItem(path="data.py", url="https://example.com/data.py"),
    ]
    script = AssetDownloadService.build_download_script(mixed, "skills")
    # Inline half: payload embedded as base64 with a decode step.
    assert base64.b64encode(inline_bytes).decode() in script
    assert "base64 -d" in script
    # Remote half: downloader detection plus the per-item download call.
    remote_markers = ("command -v curl", "download_one", "data.py", "https://example.com/data.py")
    for remote_marker in remote_markers:
        assert remote_marker in script
# --- Empty items ---
def test_empty_items_produces_valid_script() -> None:
    """An empty item list still yields a runnable script that prepares the root."""
    script = AssetDownloadService.build_download_script([], "skills")
    for required in ("download_root=skills", "mkdir -p", "exit 0"):
        assert required in script