mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 09:58:04 +08:00
fix: defer sandbox SDK imports for gevent
This commit is contained in:
@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import tarfile
|
||||
@ -8,13 +10,13 @@ from functools import lru_cache
|
||||
from io import BytesIO
|
||||
from pathlib import PurePosixPath
|
||||
from queue import Empty, Queue
|
||||
from typing import Any, cast
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
from uuid import uuid4
|
||||
|
||||
import docker.errors
|
||||
from docker.models.containers import Container
|
||||
if TYPE_CHECKING:
|
||||
from docker.models.containers import Container
|
||||
|
||||
import docker
|
||||
import docker
|
||||
from configs import dify_config
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.virtual_environment.__base.entities import (
|
||||
@ -289,6 +291,9 @@ class DockerDaemonEnvironment(VirtualEnvironment):
|
||||
|
||||
@classmethod
|
||||
def validate(cls, options: Mapping[str, Any]) -> None:
|
||||
# Import Docker SDK lazily so it is loaded after gevent monkey-patching.
|
||||
import docker
|
||||
|
||||
docker_sock = options.get(cls.OptionsKey.DOCKER_SOCK, cls._DEFAULT_DOCKER_SOCK)
|
||||
try:
|
||||
client = docker.DockerClient(base_url=docker_sock)
|
||||
@ -300,6 +305,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
|
||||
"""
|
||||
Construct the Docker daemon virtual environment.
|
||||
"""
|
||||
|
||||
docker_client = self.get_docker_daemon(
|
||||
docker_sock=options.get(self.OptionsKey.DOCKER_SOCK, self._DEFAULT_DOCKER_SOCK)
|
||||
)
|
||||
@ -357,6 +363,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
|
||||
|
||||
NOTE: I guess nobody will use more than 5 different docker sockets in practice....
|
||||
"""
|
||||
import docker
|
||||
return docker.DockerClient(base_url=docker_sock)
|
||||
|
||||
@classmethod
|
||||
@ -365,6 +372,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
|
||||
"""
|
||||
Get the Docker low-level API client.
|
||||
"""
|
||||
import docker
|
||||
return docker.APIClient(base_url=docker_sock)
|
||||
|
||||
def get_docker_sock(self) -> str:
|
||||
@ -471,6 +479,8 @@ class DockerDaemonEnvironment(VirtualEnvironment):
|
||||
return BytesIO(extracted.read())
|
||||
|
||||
def list_files(self, directory_path: str, limit: int) -> Sequence[FileState]:
|
||||
import docker
|
||||
|
||||
container = self._get_container()
|
||||
container_path = self._container_path(directory_path)
|
||||
relative_base = self._relative_path(directory_path)
|
||||
@ -515,6 +525,8 @@ class DockerDaemonEnvironment(VirtualEnvironment):
|
||||
pass
|
||||
|
||||
def release_environment(self) -> None:
|
||||
import docker
|
||||
|
||||
try:
|
||||
container = self._get_container()
|
||||
except docker.errors.NotFound:
|
||||
|
||||
@ -9,8 +9,6 @@ from io import BytesIO
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from e2b_code_interpreter import Sandbox # type: ignore[import-untyped]
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.virtual_environment.__base.entities import (
|
||||
Arch,
|
||||
@ -112,9 +110,12 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
|
||||
@classmethod
|
||||
def validate(cls, options: Mapping[str, Any]) -> None:
|
||||
# Import E2B SDK lazily so it is loaded after gevent monkey-patching.
|
||||
# See `api/gunicorn.conf.py` for how we patch other third-party libs (e.g. gRPC).
|
||||
from e2b.exceptions import (
|
||||
AuthenticationException, # type: ignore[import-untyped]
|
||||
)
|
||||
from e2b_code_interpreter import Sandbox # type: ignore[import-untyped]
|
||||
|
||||
api_key = options.get(cls.OptionsKey.API_KEY, "")
|
||||
if not api_key:
|
||||
@ -131,6 +132,9 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
"""
|
||||
Construct a new E2B virtual environment.
|
||||
"""
|
||||
# Import E2B SDK lazily so it is loaded after gevent monkey-patching.
|
||||
from e2b_code_interpreter import Sandbox # type: ignore[import-untyped]
|
||||
|
||||
# TODO: add Dify as the user agent
|
||||
sandbox = Sandbox.create(
|
||||
template=options.get(self.OptionsKey.E2B_DEFAULT_TEMPLATE, "code-interpreter-v1"),
|
||||
@ -168,6 +172,8 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
"""
|
||||
Release the E2B virtual environment.
|
||||
"""
|
||||
from e2b_code_interpreter import Sandbox # type: ignore[import-untyped]
|
||||
|
||||
stop_event: threading.Event | None = self.metadata.store.get(self.StoreKey.KEEPALIVE_STOP)
|
||||
if stop_event:
|
||||
stop_event.set()
|
||||
@ -196,7 +202,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
content (BytesIO): The content of the file.
|
||||
"""
|
||||
remote_path = self._workspace_path(path)
|
||||
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
sandbox.files.write(remote_path, content) # pyright: ignore[reportUnknownMemberType] #
|
||||
|
||||
def download_file(self, path: str) -> BytesIO:
|
||||
@ -209,7 +215,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
BytesIO: The content of the file.
|
||||
"""
|
||||
remote_path = self._workspace_path(path)
|
||||
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
content = sandbox.files.read(remote_path)
|
||||
return BytesIO(content.encode())
|
||||
|
||||
@ -217,7 +223,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
"""
|
||||
List files in a directory of the E2B virtual environment.
|
||||
"""
|
||||
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
remote_dir = self._workspace_path(directory_path)
|
||||
files_info = sandbox.files.list(remote_dir, depth=self.options.get(self.OptionsKey.E2B_LIST_FILE_DEPTH, 3))
|
||||
return [
|
||||
@ -243,7 +249,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
STDIN is not yet supported. E2B's API is such a terrible mess... to support it may lead a bad design.
|
||||
as a result we leave it for future improvement.
|
||||
"""
|
||||
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
sandbox = self.metadata.store[self.StoreKey.SANDBOX]
|
||||
stdout_stream = QueueTransportReadCloser()
|
||||
stderr_stream = QueueTransportReadCloser()
|
||||
|
||||
@ -269,7 +275,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
|
||||
def _cmd_thread(
|
||||
self,
|
||||
sandbox: Sandbox,
|
||||
sandbox: Any,
|
||||
command: list[str],
|
||||
environments: Mapping[str, str] | None,
|
||||
cwd: str,
|
||||
@ -297,7 +303,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
stdout_stream.close()
|
||||
stderr_stream.close()
|
||||
|
||||
def _keepalive_thread(self, sandbox: Sandbox, stop_event: threading.Event) -> None:
|
||||
def _keepalive_thread(self, sandbox: Any, stop_event: threading.Event) -> None:
|
||||
while not stop_event.wait(timeout=60):
|
||||
try:
|
||||
sandbox.set_timeout(300)
|
||||
|
||||
Reference in New Issue
Block a user