mirror of
https://github.com/langgenius/dify.git
synced 2026-04-26 21:55:58 +08:00
feat(sandbox-zip-service): using sandbox to zip files
- refactor allllllllll!!!!!!
This commit is contained in:
117
api/core/zip_sandbox/python_strategy.py
Normal file
117
api/core/zip_sandbox/python_strategy.py
Normal file
@ -0,0 +1,117 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from core.virtual_environment.__base.helpers import execute, try_execute
|
||||
|
||||
from .strategy import ZipStrategy
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
|
||||
|
||||
ZIP_SCRIPT = r"""
|
||||
import os
|
||||
import sys
|
||||
import zipfile
|
||||
|
||||
src = sys.argv[1]
|
||||
out_path = sys.argv[2]
|
||||
|
||||
def is_cwd(p: str) -> bool:
|
||||
return p in (".", "")
|
||||
|
||||
src = src.rstrip("/")
|
||||
|
||||
if is_cwd(src):
|
||||
base = "."
|
||||
root = "."
|
||||
prefix = ""
|
||||
else:
|
||||
base = os.path.dirname(src) or "."
|
||||
prefix = os.path.basename(src)
|
||||
root = os.path.join(base, prefix)
|
||||
|
||||
def add_empty_dir(zf: zipfile.ZipFile, arc_dir: str) -> None:
|
||||
name = arc_dir.rstrip("/") + "/"
|
||||
if name != "/":
|
||||
zf.writestr(name, b"")
|
||||
|
||||
with zipfile.ZipFile(out_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
|
||||
if os.path.isfile(root):
|
||||
zf.write(root, arcname=os.path.basename(root))
|
||||
else:
|
||||
for dirpath, dirnames, filenames in os.walk(root):
|
||||
rel_dir = os.path.relpath(dirpath, base)
|
||||
rel_dir = "" if rel_dir == "." else rel_dir
|
||||
if not dirnames and not filenames:
|
||||
add_empty_dir(zf, rel_dir)
|
||||
for fn in filenames:
|
||||
fp = os.path.join(dirpath, fn)
|
||||
arcname = os.path.join(rel_dir, fn) if rel_dir else fn
|
||||
zf.write(fp, arcname=arcname)
|
||||
"""
|
||||
|
||||
UNZIP_SCRIPT = r"""
|
||||
import sys
|
||||
import zipfile
|
||||
|
||||
archive_path = sys.argv[1]
|
||||
dest_dir = sys.argv[2]
|
||||
|
||||
with zipfile.ZipFile(archive_path, "r") as zf:
|
||||
zf.extractall(dest_dir)
|
||||
"""
|
||||
|
||||
|
||||
class PythonZipStrategy(ZipStrategy):
|
||||
"""Strategy using Python's zipfile module."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._python_cmd: str | None = None
|
||||
|
||||
def is_available(self, vm: VirtualEnvironment) -> bool:
|
||||
for cmd in ("python3", "python"):
|
||||
result = try_execute(vm, ["which", cmd], timeout=10)
|
||||
if result.stdout and result.stdout.strip():
|
||||
self._python_cmd = cmd
|
||||
return True
|
||||
return False
|
||||
|
||||
def zip(
|
||||
self,
|
||||
vm: VirtualEnvironment,
|
||||
*,
|
||||
src: str,
|
||||
out_path: str,
|
||||
cwd: str | None,
|
||||
timeout: float,
|
||||
) -> None:
|
||||
if self._python_cmd is None:
|
||||
raise RuntimeError("Python not available")
|
||||
|
||||
execute(
|
||||
vm,
|
||||
[self._python_cmd, "-c", ZIP_SCRIPT, src, out_path],
|
||||
timeout=timeout,
|
||||
cwd=cwd,
|
||||
error_message="Failed to create zip archive",
|
||||
)
|
||||
|
||||
def unzip(
|
||||
self,
|
||||
vm: VirtualEnvironment,
|
||||
*,
|
||||
archive_path: str,
|
||||
dest_dir: str,
|
||||
timeout: float,
|
||||
) -> None:
|
||||
if self._python_cmd is None:
|
||||
raise RuntimeError("Python not available")
|
||||
|
||||
execute(
|
||||
vm,
|
||||
[self._python_cmd, "-c", UNZIP_SCRIPT, archive_path, dest_dir],
|
||||
timeout=timeout,
|
||||
error_message="Failed to unzip archive",
|
||||
)
|
||||
Reference in New Issue
Block a user