From 06c31dfdb25430a7be1db4abb84b993d66e477bd Mon Sep 17 00:00:00 2001 From: Harry Date: Thu, 5 Feb 2026 15:30:18 +0800 Subject: [PATCH] feat: refactor sandbox file handling to use pipeline execution and improve script utilities --- api/core/sandbox/inspector/archive_source.py | 97 +++------- api/core/sandbox/inspector/runtime_source.py | 179 ++++++------------- api/core/sandbox/inspector/script_utils.py | 98 ++++++++++ 3 files changed, 170 insertions(+), 204 deletions(-) create mode 100644 api/core/sandbox/inspector/script_utils.py diff --git a/api/core/sandbox/inspector/archive_source.py b/api/core/sandbox/inspector/archive_source.py index 99100f2b1f..9c769a087b 100644 --- a/api/core/sandbox/inspector/archive_source.py +++ b/api/core/sandbox/inspector/archive_source.py @@ -1,6 +1,5 @@ from __future__ import annotations -import json import logging import os from typing import TYPE_CHECKING @@ -8,9 +7,15 @@ from uuid import uuid4 from core.sandbox.entities.files import SandboxFileDownloadTicket, SandboxFileNode from core.sandbox.inspector.base import SandboxFileSource +from core.sandbox.inspector.script_utils import ( + build_detect_kind_command, + build_list_command, + parse_kind_output, + parse_list_output, +) from core.sandbox.storage import SandboxFilePaths -from core.virtual_environment.__base.exec import CommandExecutionError, PipelineExecutionError -from core.virtual_environment.__base.helpers import execute, pipeline +from core.virtual_environment.__base.exec import PipelineExecutionError +from core.virtual_environment.__base.helpers import pipeline from extensions.ext_storage import storage if TYPE_CHECKING: @@ -20,53 +25,6 @@ logger = logging.getLogger(__name__) class SandboxFileArchiveSource(SandboxFileSource): - _PYTHON_EXEC_CMD = 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"' - _LIST_SCRIPT = r""" -import json -import os -import sys - -path = sys.argv[1] -recursive = sys.argv[2] == "1" - -def norm(rel: str) -> str: - rel = rel.replace("\\", "/") - rel = rel.lstrip("./") - return rel or "." - -def stat_entry(full_path: str, rel_path: str) -> dict: - st = os.stat(full_path) - is_dir = os.path.isdir(full_path) - return { - "path": norm(rel_path), - "is_dir": is_dir, - "size": None if is_dir else int(st.st_size), - "mtime": int(st.st_mtime), - } - -entries = [] -if recursive: - for root, dirs, files in os.walk(path): - for d in dirs: - fp = os.path.join(root, d) - rp = os.path.relpath(fp, ".") - entries.append(stat_entry(fp, rp)) - for f in files: - fp = os.path.join(root, f) - rp = os.path.relpath(fp, ".") - entries.append(stat_entry(fp, rp)) -else: - if os.path.isfile(path): - rel_path = os.path.relpath(path, ".") - entries.append(stat_entry(path, rel_path)) - else: - for item in os.scandir(path): - rel_path = os.path.relpath(item.path, ".") - entries.append(stat_entry(item.path, rel_path)) - -print(json.dumps(entries)) -""" - def _get_archive_download_url(self) -> str: """Get a pre-signed download URL for the sandbox archive.""" from extensions.storage.file_presign_storage import FilePresignStorage @@ -97,26 +55,19 @@ print(json.dumps(entries)) # List files using Python script in sandbox try: - result = execute( - zs.vm, - [ - "sh", - "-c", - self._PYTHON_EXEC_CMD, - self._LIST_SCRIPT, - f"workspace/{path}" if path not in (".", "") else "workspace", - "1" if recursive else "0", - ], - timeout=self._LIST_TIMEOUT_SECONDS, - error_message="Failed to list sandbox files", + list_path = f"workspace/{path}" if path not in (".", "") else "workspace" + results = ( + pipeline(zs.vm) + .add( + build_list_command(list_path, recursive), + error_message="Failed to list sandbox files", + ) + .execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True) ) - except CommandExecutionError as exc: + except PipelineExecutionError as exc: raise RuntimeError(str(exc)) from exc - try: - raw = json.loads(result.stdout.decode("utf-8")) - except Exception as exc: - raise RuntimeError("Malformed sandbox file list output") from exc + raw = parse_list_output(results[0].stdout) entries: list[SandboxFileNode] = [] for item in raw: @@ -165,13 +116,7 @@ print(json.dumps(entries)) results = ( pipeline(zs.vm) .add( - [ - "sh", - "-c", - 'if [ -d "$1" ]; then echo dir; elif [ -f "$1" ]; then echo file; else exit 2; fi', - "sh", - target_path, - ], + build_detect_kind_command(target_path), error_message="Failed to check path in sandbox", ) .execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True) @@ -179,9 +124,7 @@ print(json.dumps(entries)) except PipelineExecutionError as exc: raise ValueError(str(exc)) from exc - kind = results[0].stdout.decode("utf-8", errors="replace").strip() - if kind not in ("dir", "file"): - raise ValueError("File not found in sandbox archive") + kind = parse_kind_output(results[0].stdout, not_found_message="File not found in sandbox archive") sandbox_storage = SandboxFileService.get_storage() is_file = kind == "file" diff --git a/api/core/sandbox/inspector/runtime_source.py b/api/core/sandbox/inspector/runtime_source.py index 639b1d66d5..13cf65e39b 100644 --- a/api/core/sandbox/inspector/runtime_source.py +++ b/api/core/sandbox/inspector/runtime_source.py @@ -1,15 +1,20 @@ from __future__ import annotations -import json import logging import os from uuid import uuid4 from core.sandbox.entities.files import SandboxFileDownloadTicket, SandboxFileNode from core.sandbox.inspector.base import SandboxFileSource +from core.sandbox.inspector.script_utils import ( + build_detect_kind_command, + build_list_command, + parse_kind_output, + parse_list_output, +) from core.sandbox.storage import SandboxFilePaths -from core.virtual_environment.__base.exec import CommandExecutionError -from core.virtual_environment.__base.helpers import execute +from core.virtual_environment.__base.exec import PipelineExecutionError +from core.virtual_environment.__base.helpers import pipeline from core.virtual_environment.__base.virtual_environment import VirtualEnvironment logger = logging.getLogger(__name__) @@ -25,73 +30,19 @@ class SandboxFileRuntimeSource(SandboxFileSource): return self._runtime is not None def list_files(self, *, path: str, recursive: bool) -> list[SandboxFileNode]: - script = r""" -import json -import os -import sys - -path = sys.argv[1] -recursive = sys.argv[2] == "1" - -def norm(rel: str) -> str: - rel = rel.replace("\\\\", "/") - rel = rel.lstrip("./") - return rel or "." - -def stat_entry(full_path: str, rel_path: str) -> dict: - st = os.stat(full_path) - is_dir = os.path.isdir(full_path) - return { - "path": norm(rel_path), - "is_dir": is_dir, - "size": None if is_dir else int(st.st_size), - "mtime": int(st.st_mtime), - } - -entries = [] -if recursive: - for root, dirs, files in os.walk(path): - for d in dirs: - fp = os.path.join(root, d) - rp = os.path.relpath(fp, ".") - entries.append(stat_entry(fp, rp)) - for f in files: - fp = os.path.join(root, f) - rp = os.path.relpath(fp, ".") - entries.append(stat_entry(fp, rp)) -else: - if os.path.isfile(path): - rel_path = os.path.relpath(path, ".") - entries.append(stat_entry(path, rel_path)) - else: - for item in os.scandir(path): - rel_path = os.path.relpath(item.path, ".") - entries.append(stat_entry(item.path, rel_path)) - -print(json.dumps(entries)) -""" - try: - result = execute( - self._runtime, - [ - "sh", - "-c", - 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"', - script, - path, - "1" if recursive else "0", - ], - timeout=self._LIST_TIMEOUT_SECONDS, - error_message="Failed to list sandbox files", + results = ( + pipeline(self._runtime) + .add( + build_list_command(path, recursive), + error_message="Failed to list sandbox files", + ) + .execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True) ) - except CommandExecutionError as exc: + except PipelineExecutionError as exc: raise RuntimeError(str(exc)) from exc - try: - raw = json.loads(result.stdout.decode("utf-8")) - except Exception as exc: - raise RuntimeError("Malformed sandbox file list output") from exc + raw = parse_list_output(results[0].stdout) entries: list[SandboxFileNode] = [] for item in raw: @@ -115,7 +66,19 @@ print(json.dumps(entries)) def download_file(self, *, path: str) -> SandboxFileDownloadTicket: from services.sandbox.sandbox_file_service import SandboxFileService - kind = self._detect_path_kind(path) + try: + results = ( + pipeline(self._runtime) + .add( + build_detect_kind_command(path), + error_message="Failed to check path in sandbox", + ) + .execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True) + ) + except PipelineExecutionError as exc: + raise ValueError(str(exc)) from exc + + kind = parse_kind_output(results[0].stdout, not_found_message="File not found in sandbox") export_name = os.path.basename(path.rstrip("/")) or "workspace" filename = f"{export_name}.tar.gz" if kind == "dir" else (os.path.basename(path) or "file") @@ -134,40 +97,39 @@ print(json.dumps(entries)) if kind == "dir": archive_path = f"/tmp/{export_id}.tar.gz" try: - execute( - self._runtime, - ["tar", "-czf", archive_path, "-C", ".", path], - timeout=self._UPLOAD_TIMEOUT_SECONDS, - error_message="Failed to archive directory in sandbox", + ( + pipeline(self._runtime) + .add( + ["tar", "-czf", archive_path, "-C", ".", path], + error_message="Failed to archive directory in sandbox", + ) + .add( + ["curl", "-s", "-f", "-X", "PUT", "-T", archive_path, upload_url], + error_message="Failed to upload directory archive from sandbox", + ) + .execute(timeout=self._UPLOAD_TIMEOUT_SECONDS, raise_on_error=True) ) - execute( - self._runtime, - ["curl", "-s", "-f", "-X", "PUT", "-T", archive_path, upload_url], - timeout=self._UPLOAD_TIMEOUT_SECONDS, - error_message="Failed to upload directory archive from sandbox", - ) - except CommandExecutionError as exc: + except PipelineExecutionError as exc: raise RuntimeError(str(exc)) from exc finally: try: - execute( - self._runtime, - ["rm", "-f", archive_path], - timeout=self._LIST_TIMEOUT_SECONDS, - error_message="Failed to cleanup temp archive", + pipeline(self._runtime).add(["rm", "-f", archive_path]).execute( + timeout=self._LIST_TIMEOUT_SECONDS ) except Exception as exc: # Best-effort cleanup; do not fail the download on cleanup issues. logger.debug("Failed to cleanup temp archive %s: %s", archive_path, exc) else: try: - execute( - self._runtime, - ["curl", "-s", "-f", "-X", "PUT", "-T", path, upload_url], - timeout=self._UPLOAD_TIMEOUT_SECONDS, - error_message="Failed to upload file from sandbox", + ( + pipeline(self._runtime) + .add( + ["curl", "-s", "-f", "-X", "PUT", "-T", path, upload_url], + error_message="Failed to upload file from sandbox", + ) + .execute(timeout=self._UPLOAD_TIMEOUT_SECONDS, raise_on_error=True) ) - except CommandExecutionError as exc: + except PipelineExecutionError as exc: raise RuntimeError(str(exc)) from exc download_url = sandbox_storage.get_download_url(export_key, self._EXPORT_EXPIRES_IN_SECONDS) @@ -176,40 +138,3 @@ print(json.dumps(entries)) expires_in=self._EXPORT_EXPIRES_IN_SECONDS, export_id=export_id, ) - - def _detect_path_kind(self, path: str) -> str: - script = r""" -import os -import sys - -p = sys.argv[1] -if os.path.isdir(p): - print("dir") - raise SystemExit(0) -if os.path.isfile(p): - print("file") - raise SystemExit(0) -print("none") -raise SystemExit(2) -""" - - try: - result = execute( - self._runtime, - [ - "sh", - "-c", - 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"', - script, - path, - ], - timeout=self._LIST_TIMEOUT_SECONDS, - error_message="Failed to check path in sandbox", - ) - except CommandExecutionError as exc: - raise ValueError(str(exc)) from exc - - kind = result.stdout.decode("utf-8", errors="replace").strip() - if kind not in ("dir", "file"): - raise ValueError("File not found in sandbox") - return kind diff --git a/api/core/sandbox/inspector/script_utils.py b/api/core/sandbox/inspector/script_utils.py new file mode 100644 index 0000000000..2846a27a45 --- /dev/null +++ b/api/core/sandbox/inspector/script_utils.py @@ -0,0 +1,98 @@ +"""Shared helpers for sandbox inspector shell commands.""" + +from __future__ import annotations + +import json +from typing import TypedDict, cast + +_PYTHON_EXEC_CMD = 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"' +_LIST_SCRIPT = r""" +import json +import os +import sys + +path = sys.argv[1] +recursive = sys.argv[2] == "1" + +def norm(rel: str) -> str: + rel = rel.replace("\\\\", "/") + rel = rel.lstrip("./") + return rel or "." + +def stat_entry(full_path: str, rel_path: str) -> dict[str, object]: + st = os.stat(full_path) + is_dir = os.path.isdir(full_path) + return { + "path": norm(rel_path), + "is_dir": is_dir, + "size": None if is_dir else int(st.st_size), + "mtime": int(st.st_mtime), + } + +entries = [] +if recursive: + for root, dirs, files in os.walk(path): + for d in dirs: + fp = os.path.join(root, d) + rp = os.path.relpath(fp, ".") + entries.append(stat_entry(fp, rp)) + for f in files: + fp = os.path.join(root, f) + rp = os.path.relpath(fp, ".") + entries.append(stat_entry(fp, rp)) +else: + if os.path.isfile(path): + rel_path = os.path.relpath(path, ".") + entries.append(stat_entry(path, rel_path)) + else: + for item in os.scandir(path): + rel_path = os.path.relpath(item.path, ".") + entries.append(stat_entry(item.path, rel_path)) + +print(json.dumps(entries)) +""" + + +class ListedEntry(TypedDict): + path: str + is_dir: bool + size: int | None + mtime: int + + +def build_list_command(path: str, recursive: bool) -> list[str]: + return [ + "sh", + "-c", + _PYTHON_EXEC_CMD, + _LIST_SCRIPT, + path, + "1" if recursive else "0", + ] + + +def parse_list_output(stdout: bytes) -> list[ListedEntry]: + try: + raw = json.loads(stdout.decode("utf-8")) + except Exception as exc: + raise RuntimeError("Malformed sandbox file list output") from exc + if not isinstance(raw, list): + raise RuntimeError("Malformed sandbox file list output") + return cast(list[ListedEntry], raw) + + +def build_detect_kind_command(path: str) -> list[str]: + return [ + "sh", + "-c", + 'if [ -d "$1" ]; then echo dir; elif [ -f "$1" ]; then echo file; else exit 2; fi', + "sh", + path, + ] + + +def parse_kind_output(stdout: bytes, *, not_found_message: str) -> str: + kind = stdout.decode("utf-8", errors="replace").strip() + if kind not in ("dir", "file"): + raise ValueError(not_found_message) + return kind