feat: implement transport abstractions for virtual environments and add E2B environment provider

This commit is contained in:
Yeuoly
2025-12-31 17:51:38 +08:00
parent 29dc083d8d
commit c9610e9949
10 changed files with 518 additions and 41 deletions

View File

@ -4,7 +4,7 @@ from io import BytesIO
from typing import Any
from core.virtual_environment.__base.entities import CommandStatus, ConnectionHandle, FileState, Metadata
from core.virtual_environment.channel.transport import Transport
from core.virtual_environment.channel.transport import TransportReadCloser, TransportWriteCloser
class VirtualEnvironment(ABC):
@ -12,7 +12,7 @@ class VirtualEnvironment(ABC):
Base class for virtual environment implementations.
"""
def __init__(self, options: Mapping[str, Any], environments: Mapping[str, Any] | None = None) -> None:
def __init__(self, options: Mapping[str, Any], environments: Mapping[str, str] | None = None) -> None:
"""
Initialize the virtual environment with metadata.
"""
@ -21,7 +21,7 @@ class VirtualEnvironment(ABC):
self.metadata = self.construct_environment(options, environments or {})
@abstractmethod
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, Any]) -> Metadata:
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
"""
Construct the unique identifier for the virtual environment.
@ -118,8 +118,8 @@ class VirtualEnvironment(ABC):
@abstractmethod
def execute_command(
self, connection_handle: ConnectionHandle, command: list[str]
) -> tuple[str, Transport, Transport, Transport]:
self, connection_handle: ConnectionHandle, command: list[str], environments: Mapping[str, str] | None = None
) -> tuple[str, TransportWriteCloser, TransportReadCloser, TransportReadCloser]:
"""
Execute a command in the virtual environment.
@ -128,8 +128,8 @@ class VirtualEnvironment(ABC):
command (list[str]): The command to execute as a list of strings.
Returns:
tuple[int, Transport, Transport, Transport]: A tuple containing pid and 3 handle
to os.pipe(): (stdin, stdout, stderr).
tuple[int, TransportWriteCloser, TransportReadCloser, TransportReadCloser]
a tuple containing pid and 3 handle to os.pipe(): (stdin, stdout, stderr).
After exuection, the 3 handles will be closed by caller.
"""

View File

@ -1,6 +1,6 @@
import os
from core.virtual_environment.channel.transport import Transport
from core.virtual_environment.channel.transport import Transport, TransportReadCloser, TransportWriteCloser
class PipeTransport(Transport):
@ -26,3 +26,33 @@ class PipeTransport(Transport):
def close(self) -> None:
os.close(self.r_fd)
os.close(self.w_fd)
class PipeReadCloser(TransportReadCloser):
"""
A Transport implementation using OS pipe for reading.
"""
def __init__(self, r_fd: int):
self.r_fd = r_fd
def read(self, n: int) -> bytes:
return os.read(self.r_fd, n)
def close(self) -> None:
os.close(self.r_fd)
class PipeWriteCloser(TransportWriteCloser):
"""
A Transport implementation using OS pipe for writing.
"""
def __init__(self, w_fd: int):
self.w_fd = w_fd
def write(self, data: bytes) -> None:
os.write(self.w_fd, data)
def close(self) -> None:
os.close(self.w_fd)

View File

@ -0,0 +1,66 @@
from queue import Queue
from core.virtual_environment.channel.transport import TransportReadCloser
class QueueTransportReadCloser(TransportReadCloser):
"""
Transport implementation using queues for inter-thread communication.
Usage:
q_transport = QueueTransportReadCloser()
write_handler = q_transport.get_write_handler()
# In writer thread
write_handler.write(b"data")
# In reader thread
data = q_transport.read(1024)
# Close transport when done
q_transport.close()
"""
class WriteHandler:
"""
A write handler that writes data to a queue.
"""
def __init__(self, queue: Queue[bytes | None]) -> None:
self.queue = queue
def write(self, data: bytes) -> None:
self.queue.put(data)
def __init__(
self,
) -> None:
"""
Initialize the QueueTransportReadCloser with write function.
"""
self.q = Queue[bytes | None]()
def get_write_handler(self) -> WriteHandler:
"""
Get a write handler that writes to the internal queue.
"""
return QueueTransportReadCloser.WriteHandler(self.q)
def close(self) -> None:
"""
Close the transport by putting a sentinel value in the queue.
"""
self.q.put(None)
def read(self, n: int) -> bytes:
"""
Read up to n bytes from the queue.
"""
data = bytearray()
while len(data) < n:
chunk = self.q.get()
if chunk is None:
break
data.extend(chunk)
return bytes(data)

View File

@ -1,6 +1,6 @@
import socket
from core.virtual_environment.channel.transport import Transport
from core.virtual_environment.channel.transport import Transport, TransportReadCloser, TransportWriteCloser
class SocketTransport(Transport):
@ -19,3 +19,33 @@ class SocketTransport(Transport):
def close(self) -> None:
self.sock.close()
class SocketReadCloser(TransportReadCloser):
"""
A Transport implementation using a socket for reading.
"""
def __init__(self, sock: socket.SocketIO):
self.sock = sock
def read(self, n: int) -> bytes:
return self.sock.read(n)
def close(self) -> None:
self.sock.close()
class SocketWriteCloser(TransportWriteCloser):
"""
A Transport implementation using a socket for writing.
"""
def __init__(self, sock: socket.SocketIO):
self.sock = sock
def write(self, data: bytes) -> None:
self.sock.write(data)
def close(self) -> None:
self.sock.close()

View File

@ -2,24 +2,75 @@ from abc import abstractmethod
from typing import Protocol
class Transport(Protocol):
@abstractmethod
def write(self, data: bytes) -> None:
"""
Write data to the transport.
"""
pass
@abstractmethod
def read(self, n: int) -> bytes:
"""
Read up to n bytes from the transport.
"""
pass
class TransportCloser(Protocol):
"""
Transport that can be closed.
"""
@abstractmethod
def close(self) -> None:
"""
Close the transport.
"""
class TransportWriter(Protocol):
"""
Transport that can be written to.
"""
@abstractmethod
def write(self, data: bytes) -> None:
"""
Write data to the transport.
"""
class TransportReader(Protocol):
"""
Transport that can be read from.
"""
@abstractmethod
def read(self, n: int) -> bytes:
"""
Read up to n bytes from the transport.
"""
class TransportReadCloser(TransportReader, TransportCloser):
"""
Transport that can be read from and closed.
"""
class TransportWriteCloser(TransportWriter, TransportCloser):
"""
Transport that can be written to and closed.
"""
class Transport(TransportReader, TransportWriter, TransportCloser):
"""
Transport that can be read from, written to, and closed.
"""
class NopTransportWriteCloser(TransportWriteCloser):
"""
A no-operation TransportWriteCloser implementation.
This transport does nothing on write and close operations.
"""
def write(self, data: bytes) -> None:
"""
No-operation write method.
"""
pass
def close(self) -> None:
"""
No-operation close method.
"""
pass

View File

@ -15,8 +15,8 @@ import docker
from core.virtual_environment.__base.entities import Arch, CommandStatus, ConnectionHandle, FileState, Metadata
from core.virtual_environment.__base.exec import VirtualEnvironmentLaunchFailedError
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.channel.socket_transport import SocketTransport
from core.virtual_environment.channel.transport import Transport
from core.virtual_environment.channel.socket_transport import SocketReadCloser, SocketWriteCloser
from core.virtual_environment.channel.transport import TransportReadCloser, TransportWriteCloser
"""
EXAMPLE:
@ -69,7 +69,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
DOCKER_IMAGE = "docker_image"
DOCKER_COMMAND = "docker_command"
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, Any]) -> Metadata:
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
"""
Construct the Docker daemon virtual environment.
"""
@ -253,7 +253,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
def execute_command(
self, connection_handle: ConnectionHandle, command: list[str], environments: Mapping[str, str] | None = None
) -> tuple[str, Transport, Transport, Transport]:
) -> tuple[str, TransportWriteCloser, TransportReadCloser, TransportReadCloser]:
container = self._get_container()
container_id = container.id
if not isinstance(container_id, str) or not container_id:
@ -279,8 +279,10 @@ class DockerDaemonEnvironment(VirtualEnvironment):
exec_id: str = str(exec_info.get("Id"))
raw_sock: socket.SocketIO = cast(socket.SocketIO, api_client.exec_start(exec_id, socket=True, tty=False)) # pyright: ignore[reportUnknownMemberType] #
transport = SocketTransport(raw_sock)
return exec_id, transport, transport, transport
stdin_transport = SocketWriteCloser(raw_sock)
stdout_transport = SocketReadCloser(raw_sock)
return exec_id, stdin_transport, stdout_transport, stdout_transport
def get_command_status(self, connection_handle: ConnectionHandle, pid: str) -> CommandStatus:
api_client = self.get_docker_api_client(self.get_docker_sock())

View File

@ -0,0 +1,230 @@
import os
import threading
from collections.abc import Mapping, Sequence
from enum import StrEnum
from functools import cached_property
from io import BytesIO
from typing import Any
from uuid import uuid4
from e2b_code_interpreter import Sandbox
from core.virtual_environment.__base.entities import Arch, CommandStatus, ConnectionHandle, FileState, Metadata
from core.virtual_environment.__base.exec import ArchNotSupportedError
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.channel.queue_transport import QueueTransportReadCloser
from core.virtual_environment.channel.transport import (
NopTransportWriteCloser,
TransportReadCloser,
TransportWriteCloser,
)
"""
import logging
from collections.abc import Mapping
from typing import Any
from core.virtual_environment.providers.e2b_sandbox import E2BEnvironment
options: Mapping[str, Any] = {E2BEnvironment.OptionsKey.API_KEY: "?????????"}
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
# environment = DockerDaemonEnvironment(options=options)
# environment = LocalVirtualEnvironment(options=options)
environment = E2BEnvironment(options=options)
connection_handle = environment.establish_connection()
pid, transport_stdin, transport_stdout, transport_stderr = environment.execute_command(
connection_handle, ["uname", "-a"]
)
logger.info("Executed command with PID: %s", pid)
# consume stdout
output = transport_stdout.read(1024)
logger.info("Command output: %s", output.decode().strip())
environment.release_connection(connection_handle)
environment.release_environment()
"""
class E2BEnvironment(VirtualEnvironment):
"""
E2B virtual environment provider.
"""
_WORKDIR = "/home/user"
class OptionsKey(StrEnum):
API_KEY = "api_key"
E2B_LIST_FILE_DEPTH = "e2b_list_file_depth"
E2B_DEFAULT_TEMPLATE = "code-interpreter-v1"
class StoreKey(StrEnum):
SANDBOX = "sandbox"
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
"""
Construct a new E2B virtual environment.
"""
# TODO: add Dify as the user agent
sandbox = Sandbox.create(
template=options.get(self.OptionsKey.E2B_DEFAULT_TEMPLATE, "code-interpreter-v1"),
api_key=options.get(self.OptionsKey.API_KEY, ""),
)
info = sandbox.get_info(api_key=options.get(self.OptionsKey.API_KEY, ""))
output = sandbox.commands.run("uname -m").stdout.strip()
return Metadata(
id=info.sandbox_id,
arch=self._convert_architecture(output),
store={
self.StoreKey.SANDBOX: sandbox,
},
)
def release_environment(self) -> None:
"""
Release the E2B virtual environment.
"""
if not Sandbox.kill(api_key=self.api_key, sandbox_id=self.metadata.id):
raise Exception(f"Failed to release E2B sandbox with ID: {self.metadata.id}")
def establish_connection(self) -> ConnectionHandle:
"""
Establish a connection to the E2B virtual environment.
"""
return ConnectionHandle(id=uuid4().hex)
def release_connection(self, connection_handle: ConnectionHandle) -> None:
"""
Release the connection to the E2B virtual environment.
"""
pass
def upload_file(self, path: str, content: BytesIO) -> None:
"""
Upload a file to the E2B virtual environment.
Args:
path (str): The path to upload the file to.
content (BytesIO): The content of the file.
"""
path = os.path.join(self._WORKDIR, path.lstrip("/"))
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
sandbox.files.write(path, content) # pyright: ignore[reportUnknownMemberType] #
def download_file(self, path: str) -> BytesIO:
"""
Download a file from the E2B virtual environment.
Args:
path (str): The path to download the file from.
Returns:
BytesIO: The content of the file.
"""
path = os.path.join(self._WORKDIR, path.lstrip("/"))
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
content = sandbox.files.read(path)
return BytesIO(content.encode())
def list_files(self, directory_path: str, limit: int) -> Sequence[FileState]:
"""
List files in a directory of the E2B virtual environment.
"""
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
directory_path = os.path.join(self._WORKDIR, directory_path.lstrip("/"))
files_info = sandbox.files.list(directory_path, depth=self.options.get(self.OptionsKey.E2B_LIST_FILE_DEPTH, 3))
return [
FileState(
path=os.path.relpath(file_info.path, self._WORKDIR),
size=file_info.size,
created_at=int(file_info.modified_time.timestamp()),
updated_at=int(file_info.modified_time.timestamp()),
)
for file_info in files_info
]
def execute_command(
self, connection_handle: ConnectionHandle, command: list[str], environments: Mapping[str, str] | None = None
) -> tuple[str, TransportWriteCloser, TransportReadCloser, TransportReadCloser]:
"""
Execute a command in the E2B virtual environment.
STDIN is not yet supported. E2B's API is such a terrible mess... to support it may lead a bad design.
as a result we leave it for future improvement.
"""
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
stdout_stream = QueueTransportReadCloser()
stderr_stream = QueueTransportReadCloser()
threading.Thread(
target=self._cmd_thread,
args=(sandbox, command, environments, stdout_stream, stderr_stream),
).start()
return (
"N/A",
NopTransportWriteCloser(), # stdin not supported yet
stdout_stream,
stderr_stream,
)
def get_command_status(self, connection_handle: ConnectionHandle, pid: str) -> CommandStatus:
return super().get_command_status(connection_handle, pid)
def _cmd_thread(
self,
sandbox: Sandbox,
command: list[str],
environments: Mapping[str, str] | None,
stdout_stream: QueueTransportReadCloser,
stderr_stream: QueueTransportReadCloser,
) -> None:
""" """
stdout_stream_write_handler = stdout_stream.get_write_handler()
stderr_stream_write_handler = stderr_stream.get_write_handler()
sandbox.commands.run(
cmd=" ".join(command),
envs=dict(environments or {}),
# stdin=True,
on_stdout=lambda data: stdout_stream_write_handler.write(data.encode()),
on_stderr=lambda data: stderr_stream_write_handler.write(data.encode()),
)
# Close the write handlers to signal EOF
stdout_stream.close()
stderr_stream.close()
@cached_property
def api_key(self) -> str:
"""
Get the API key for the E2B environment.
"""
return self.options.get(self.OptionsKey.API_KEY, "")
def _convert_architecture(self, arch_str: str) -> Arch:
"""
Convert architecture string to standard format.
"""
arch_map = {
"x86_64": Arch.AMD64,
"aarch64": Arch.ARM64,
"armv7l": Arch.ARM64,
"arm64": Arch.ARM64,
"amd64": Arch.AMD64,
"arm64v8": Arch.ARM64,
"arm64v7": Arch.ARM64,
}
if arch_str in arch_map:
return arch_map[arch_str]
raise ArchNotSupportedError(f"Unsupported architecture: {arch_str}")

View File

@ -11,8 +11,8 @@ from uuid import uuid4
from core.virtual_environment.__base.entities import Arch, CommandStatus, ConnectionHandle, FileState, Metadata
from core.virtual_environment.__base.exec import ArchNotSupportedError
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.channel.pipe_transport import PipeTransport
from core.virtual_environment.channel.transport import Transport
from core.virtual_environment.channel.pipe_transport import PipeReadCloser, PipeWriteCloser
from core.virtual_environment.channel.transport import TransportReadCloser, TransportWriteCloser
class LocalVirtualEnvironment(VirtualEnvironment):
@ -23,7 +23,7 @@ class LocalVirtualEnvironment(VirtualEnvironment):
NEVER USE IT IN PRODUCTION ENVIRONMENTS.
"""
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, Any]) -> Metadata:
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
"""
Construct the local virtual environment.
@ -118,7 +118,7 @@ class LocalVirtualEnvironment(VirtualEnvironment):
def execute_command(
self, connection_handle: ConnectionHandle, command: list[str], environments: Mapping[str, str] | None = None
) -> tuple[str, Transport, Transport, Transport]:
) -> tuple[str, TransportWriteCloser, TransportReadCloser, TransportReadCloser]:
"""
Execute a command in the local virtual environment.
@ -162,9 +162,9 @@ class LocalVirtualEnvironment(VirtualEnvironment):
os.close(stderr_write_fd)
# Create PipeTransport instances for stdin, stdout, and stderr
stdin_transport = PipeTransport(r_fd=stdin_read_fd, w_fd=stdin_write_fd)
stdout_transport = PipeTransport(r_fd=stdout_read_fd, w_fd=stdout_write_fd)
stderr_transport = PipeTransport(r_fd=stderr_read_fd, w_fd=stderr_write_fd)
stdin_transport = PipeWriteCloser(w_fd=stdin_write_fd)
stdout_transport = PipeReadCloser(r_fd=stdout_read_fd)
stderr_transport = PipeReadCloser(r_fd=stderr_read_fd)
# Return the process ID and file descriptors for stdin, stdout, and stderr
return str(process.pid), stdin_transport, stdout_transport, stderr_transport