feat: refactor sandbox file handling to use pipeline execution and improve script utilities

This commit is contained in:
Harry
2026-02-05 15:30:18 +08:00
parent 7161c3dd80
commit 06c31dfdb2
3 changed files with 170 additions and 204 deletions

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import json
import logging
import os
from typing import TYPE_CHECKING
@ -8,9 +7,15 @@ from uuid import uuid4
from core.sandbox.entities.files import SandboxFileDownloadTicket, SandboxFileNode
from core.sandbox.inspector.base import SandboxFileSource
from core.sandbox.inspector.script_utils import (
build_detect_kind_command,
build_list_command,
parse_kind_output,
parse_list_output,
)
from core.sandbox.storage import SandboxFilePaths
from core.virtual_environment.__base.exec import CommandExecutionError, PipelineExecutionError
from core.virtual_environment.__base.helpers import execute, pipeline
from core.virtual_environment.__base.exec import PipelineExecutionError
from core.virtual_environment.__base.helpers import pipeline
from extensions.ext_storage import storage
if TYPE_CHECKING:
@ -20,53 +25,6 @@ logger = logging.getLogger(__name__)
class SandboxFileArchiveSource(SandboxFileSource):
_PYTHON_EXEC_CMD = 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"'
_LIST_SCRIPT = r"""
import json
import os
import sys
path = sys.argv[1]
recursive = sys.argv[2] == "1"
def norm(rel: str) -> str:
rel = rel.replace("\\", "/")
rel = rel.lstrip("./")
return rel or "."
def stat_entry(full_path: str, rel_path: str) -> dict:
st = os.stat(full_path)
is_dir = os.path.isdir(full_path)
return {
"path": norm(rel_path),
"is_dir": is_dir,
"size": None if is_dir else int(st.st_size),
"mtime": int(st.st_mtime),
}
entries = []
if recursive:
for root, dirs, files in os.walk(path):
for d in dirs:
fp = os.path.join(root, d)
rp = os.path.relpath(fp, ".")
entries.append(stat_entry(fp, rp))
for f in files:
fp = os.path.join(root, f)
rp = os.path.relpath(fp, ".")
entries.append(stat_entry(fp, rp))
else:
if os.path.isfile(path):
rel_path = os.path.relpath(path, ".")
entries.append(stat_entry(path, rel_path))
else:
for item in os.scandir(path):
rel_path = os.path.relpath(item.path, ".")
entries.append(stat_entry(item.path, rel_path))
print(json.dumps(entries))
"""
def _get_archive_download_url(self) -> str:
"""Get a pre-signed download URL for the sandbox archive."""
from extensions.storage.file_presign_storage import FilePresignStorage
@ -97,26 +55,19 @@ print(json.dumps(entries))
# List files using Python script in sandbox
try:
result = execute(
zs.vm,
[
"sh",
"-c",
self._PYTHON_EXEC_CMD,
self._LIST_SCRIPT,
f"workspace/{path}" if path not in (".", "") else "workspace",
"1" if recursive else "0",
],
timeout=self._LIST_TIMEOUT_SECONDS,
error_message="Failed to list sandbox files",
list_path = f"workspace/{path}" if path not in (".", "") else "workspace"
results = (
pipeline(zs.vm)
.add(
build_list_command(list_path, recursive),
error_message="Failed to list sandbox files",
)
.execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True)
)
except CommandExecutionError as exc:
except PipelineExecutionError as exc:
raise RuntimeError(str(exc)) from exc
try:
raw = json.loads(result.stdout.decode("utf-8"))
except Exception as exc:
raise RuntimeError("Malformed sandbox file list output") from exc
raw = parse_list_output(results[0].stdout)
entries: list[SandboxFileNode] = []
for item in raw:
@ -165,13 +116,7 @@ print(json.dumps(entries))
results = (
pipeline(zs.vm)
.add(
[
"sh",
"-c",
'if [ -d "$1" ]; then echo dir; elif [ -f "$1" ]; then echo file; else exit 2; fi',
"sh",
target_path,
],
build_detect_kind_command(target_path),
error_message="Failed to check path in sandbox",
)
.execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True)
@ -179,9 +124,7 @@ print(json.dumps(entries))
except PipelineExecutionError as exc:
raise ValueError(str(exc)) from exc
kind = results[0].stdout.decode("utf-8", errors="replace").strip()
if kind not in ("dir", "file"):
raise ValueError("File not found in sandbox archive")
kind = parse_kind_output(results[0].stdout, not_found_message="File not found in sandbox archive")
sandbox_storage = SandboxFileService.get_storage()
is_file = kind == "file"

View File

@ -1,15 +1,20 @@
from __future__ import annotations
import json
import logging
import os
from uuid import uuid4
from core.sandbox.entities.files import SandboxFileDownloadTicket, SandboxFileNode
from core.sandbox.inspector.base import SandboxFileSource
from core.sandbox.inspector.script_utils import (
build_detect_kind_command,
build_list_command,
parse_kind_output,
parse_list_output,
)
from core.sandbox.storage import SandboxFilePaths
from core.virtual_environment.__base.exec import CommandExecutionError
from core.virtual_environment.__base.helpers import execute
from core.virtual_environment.__base.exec import PipelineExecutionError
from core.virtual_environment.__base.helpers import pipeline
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
logger = logging.getLogger(__name__)
@ -25,73 +30,19 @@ class SandboxFileRuntimeSource(SandboxFileSource):
return self._runtime is not None
def list_files(self, *, path: str, recursive: bool) -> list[SandboxFileNode]:
script = r"""
import json
import os
import sys
path = sys.argv[1]
recursive = sys.argv[2] == "1"
def norm(rel: str) -> str:
rel = rel.replace("\\\\", "/")
rel = rel.lstrip("./")
return rel or "."
def stat_entry(full_path: str, rel_path: str) -> dict:
st = os.stat(full_path)
is_dir = os.path.isdir(full_path)
return {
"path": norm(rel_path),
"is_dir": is_dir,
"size": None if is_dir else int(st.st_size),
"mtime": int(st.st_mtime),
}
entries = []
if recursive:
for root, dirs, files in os.walk(path):
for d in dirs:
fp = os.path.join(root, d)
rp = os.path.relpath(fp, ".")
entries.append(stat_entry(fp, rp))
for f in files:
fp = os.path.join(root, f)
rp = os.path.relpath(fp, ".")
entries.append(stat_entry(fp, rp))
else:
if os.path.isfile(path):
rel_path = os.path.relpath(path, ".")
entries.append(stat_entry(path, rel_path))
else:
for item in os.scandir(path):
rel_path = os.path.relpath(item.path, ".")
entries.append(stat_entry(item.path, rel_path))
print(json.dumps(entries))
"""
try:
result = execute(
self._runtime,
[
"sh",
"-c",
'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"',
script,
path,
"1" if recursive else "0",
],
timeout=self._LIST_TIMEOUT_SECONDS,
error_message="Failed to list sandbox files",
results = (
pipeline(self._runtime)
.add(
build_list_command(path, recursive),
error_message="Failed to list sandbox files",
)
.execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True)
)
except CommandExecutionError as exc:
except PipelineExecutionError as exc:
raise RuntimeError(str(exc)) from exc
try:
raw = json.loads(result.stdout.decode("utf-8"))
except Exception as exc:
raise RuntimeError("Malformed sandbox file list output") from exc
raw = parse_list_output(results[0].stdout)
entries: list[SandboxFileNode] = []
for item in raw:
@ -115,7 +66,19 @@ print(json.dumps(entries))
def download_file(self, *, path: str) -> SandboxFileDownloadTicket:
from services.sandbox.sandbox_file_service import SandboxFileService
kind = self._detect_path_kind(path)
try:
results = (
pipeline(self._runtime)
.add(
build_detect_kind_command(path),
error_message="Failed to check path in sandbox",
)
.execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True)
)
except PipelineExecutionError as exc:
raise ValueError(str(exc)) from exc
kind = parse_kind_output(results[0].stdout, not_found_message="File not found in sandbox")
export_name = os.path.basename(path.rstrip("/")) or "workspace"
filename = f"{export_name}.tar.gz" if kind == "dir" else (os.path.basename(path) or "file")
@ -134,40 +97,39 @@ print(json.dumps(entries))
if kind == "dir":
archive_path = f"/tmp/{export_id}.tar.gz"
try:
execute(
self._runtime,
["tar", "-czf", archive_path, "-C", ".", path],
timeout=self._UPLOAD_TIMEOUT_SECONDS,
error_message="Failed to archive directory in sandbox",
(
pipeline(self._runtime)
.add(
["tar", "-czf", archive_path, "-C", ".", path],
error_message="Failed to archive directory in sandbox",
)
.add(
["curl", "-s", "-f", "-X", "PUT", "-T", archive_path, upload_url],
error_message="Failed to upload directory archive from sandbox",
)
.execute(timeout=self._UPLOAD_TIMEOUT_SECONDS, raise_on_error=True)
)
execute(
self._runtime,
["curl", "-s", "-f", "-X", "PUT", "-T", archive_path, upload_url],
timeout=self._UPLOAD_TIMEOUT_SECONDS,
error_message="Failed to upload directory archive from sandbox",
)
except CommandExecutionError as exc:
except PipelineExecutionError as exc:
raise RuntimeError(str(exc)) from exc
finally:
try:
execute(
self._runtime,
["rm", "-f", archive_path],
timeout=self._LIST_TIMEOUT_SECONDS,
error_message="Failed to cleanup temp archive",
pipeline(self._runtime).add(["rm", "-f", archive_path]).execute(
timeout=self._LIST_TIMEOUT_SECONDS
)
except Exception as exc:
# Best-effort cleanup; do not fail the download on cleanup issues.
logger.debug("Failed to cleanup temp archive %s: %s", archive_path, exc)
else:
try:
execute(
self._runtime,
["curl", "-s", "-f", "-X", "PUT", "-T", path, upload_url],
timeout=self._UPLOAD_TIMEOUT_SECONDS,
error_message="Failed to upload file from sandbox",
(
pipeline(self._runtime)
.add(
["curl", "-s", "-f", "-X", "PUT", "-T", path, upload_url],
error_message="Failed to upload file from sandbox",
)
.execute(timeout=self._UPLOAD_TIMEOUT_SECONDS, raise_on_error=True)
)
except CommandExecutionError as exc:
except PipelineExecutionError as exc:
raise RuntimeError(str(exc)) from exc
download_url = sandbox_storage.get_download_url(export_key, self._EXPORT_EXPIRES_IN_SECONDS)
@ -176,40 +138,3 @@ print(json.dumps(entries))
expires_in=self._EXPORT_EXPIRES_IN_SECONDS,
export_id=export_id,
)
def _detect_path_kind(self, path: str) -> str:
script = r"""
import os
import sys
p = sys.argv[1]
if os.path.isdir(p):
print("dir")
raise SystemExit(0)
if os.path.isfile(p):
print("file")
raise SystemExit(0)
print("none")
raise SystemExit(2)
"""
try:
result = execute(
self._runtime,
[
"sh",
"-c",
'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"',
script,
path,
],
timeout=self._LIST_TIMEOUT_SECONDS,
error_message="Failed to check path in sandbox",
)
except CommandExecutionError as exc:
raise ValueError(str(exc)) from exc
kind = result.stdout.decode("utf-8", errors="replace").strip()
if kind not in ("dir", "file"):
raise ValueError("File not found in sandbox")
return kind

View File

@ -0,0 +1,98 @@
"""Shared helpers for sandbox inspector shell commands."""
from __future__ import annotations
import json
from typing import TypedDict, cast
_PYTHON_EXEC_CMD = 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"'
_LIST_SCRIPT = r"""
import json
import os
import sys
path = sys.argv[1]
recursive = sys.argv[2] == "1"
def norm(rel: str) -> str:
rel = rel.replace("\\\\", "/")
rel = rel.lstrip("./")
return rel or "."
def stat_entry(full_path: str, rel_path: str) -> dict[str, object]:
st = os.stat(full_path)
is_dir = os.path.isdir(full_path)
return {
"path": norm(rel_path),
"is_dir": is_dir,
"size": None if is_dir else int(st.st_size),
"mtime": int(st.st_mtime),
}
entries = []
if recursive:
for root, dirs, files in os.walk(path):
for d in dirs:
fp = os.path.join(root, d)
rp = os.path.relpath(fp, ".")
entries.append(stat_entry(fp, rp))
for f in files:
fp = os.path.join(root, f)
rp = os.path.relpath(fp, ".")
entries.append(stat_entry(fp, rp))
else:
if os.path.isfile(path):
rel_path = os.path.relpath(path, ".")
entries.append(stat_entry(path, rel_path))
else:
for item in os.scandir(path):
rel_path = os.path.relpath(item.path, ".")
entries.append(stat_entry(item.path, rel_path))
print(json.dumps(entries))
"""
class ListedEntry(TypedDict):
path: str
is_dir: bool
size: int | None
mtime: int
def build_list_command(path: str, recursive: bool) -> list[str]:
return [
"sh",
"-c",
_PYTHON_EXEC_CMD,
_LIST_SCRIPT,
path,
"1" if recursive else "0",
]
def parse_list_output(stdout: bytes) -> list[ListedEntry]:
try:
raw = json.loads(stdout.decode("utf-8"))
except Exception as exc:
raise RuntimeError("Malformed sandbox file list output") from exc
if not isinstance(raw, list):
raise RuntimeError("Malformed sandbox file list output")
return cast(list[ListedEntry], raw)
def build_detect_kind_command(path: str) -> list[str]:
return [
"sh",
"-c",
'if [ -d "$1" ]; then echo dir; elif [ -f "$1" ]; then echo file; else exit 2; fi',
"sh",
path,
]
def parse_kind_output(stdout: bytes, *, not_found_message: str) -> str:
kind = stdout.decode("utf-8", errors="replace").strip()
if kind not in ("dir", "file"):
raise ValueError(not_found_message)
return kind