refactor: remove keepalive thread and utilize E2B's native timeout management

This commit is contained in:
Harry
2026-03-12 17:22:12 +08:00
parent e1510a64c9
commit 7bd6dd588e

View File

@ -1,4 +1,3 @@
import logging
import posixpath import posixpath
import shlex import shlex
import threading import threading
@ -9,6 +8,7 @@ from io import BytesIO
from typing import Any from typing import Any
from uuid import uuid4 from uuid import uuid4
from configs import dify_config
from core.entities.provider_entities import BasicProviderConfig from core.entities.provider_entities import BasicProviderConfig
from core.virtual_environment.__base.entities import ( from core.virtual_environment.__base.entities import (
Arch, Arch,
@ -32,9 +32,6 @@ from core.virtual_environment.channel.transport import (
) )
from core.virtual_environment.constants import COMMAND_EXECUTION_TIMEOUT_SECONDS from core.virtual_environment.constants import COMMAND_EXECUTION_TIMEOUT_SECONDS
logger = logging.getLogger(__name__)
""" """
import logging import logging
from collections.abc import Mapping from collections.abc import Mapping
@ -99,7 +96,6 @@ class E2BEnvironment(VirtualEnvironment):
class StoreKey(StrEnum): class StoreKey(StrEnum):
SANDBOX = "sandbox" SANDBOX = "sandbox"
KEEPALIVE_STOP = "keepalive_stop"
@classmethod @classmethod
def get_config_schema(cls) -> list[BasicProviderConfig]: def get_config_schema(cls) -> list[BasicProviderConfig]:
@ -132,6 +128,10 @@ class E2BEnvironment(VirtualEnvironment):
def _construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata: def _construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
""" """
Construct a new E2B virtual environment. Construct a new E2B virtual environment.
The sandbox lifetime is capped by ``WORKFLOW_MAX_EXECUTION_TIME`` so the
provider can rely on E2B's native timeout instead of a background
keepalive thread that continuously extends the session.
""" """
# Import E2B SDK lazily so it is loaded after gevent monkey-patching. # Import E2B SDK lazily so it is loaded after gevent monkey-patching.
from e2b_code_interpreter import Sandbox # type: ignore[import-untyped] from e2b_code_interpreter import Sandbox # type: ignore[import-untyped]
@ -139,6 +139,7 @@ class E2BEnvironment(VirtualEnvironment):
# TODO: add Dify as the user agent # TODO: add Dify as the user agent
sandbox = Sandbox.create( sandbox = Sandbox.create(
template=options.get(self.OptionsKey.E2B_DEFAULT_TEMPLATE, "code-interpreter-v1"), template=options.get(self.OptionsKey.E2B_DEFAULT_TEMPLATE, "code-interpreter-v1"),
timeout=dify_config.WORKFLOW_MAX_EXECUTION_TIME,
api_key=options.get(self.OptionsKey.API_KEY, ""), api_key=options.get(self.OptionsKey.API_KEY, ""),
api_url=options.get(self.OptionsKey.E2B_API_URL, self._E2B_API_URL), api_url=options.get(self.OptionsKey.E2B_API_URL, self._E2B_API_URL),
envs=dict(environments), envs=dict(environments),
@ -152,20 +153,12 @@ class E2BEnvironment(VirtualEnvironment):
arch_part = system_parts[0] arch_part = system_parts[0]
os_part = system_parts[1] if len(system_parts) > 1 else "" os_part = system_parts[1] if len(system_parts) > 1 else ""
stop_event = threading.Event()
threading.Thread(
target=self._keepalive_thread,
args=(sandbox, stop_event),
daemon=True,
).start()
return Metadata( return Metadata(
id=info.sandbox_id, id=info.sandbox_id,
arch=self._convert_architecture(arch_part.strip()), arch=self._convert_architecture(arch_part.strip()),
os=self._convert_operating_system(os_part.strip()), os=self._convert_operating_system(os_part.strip()),
store={ store={
self.StoreKey.SANDBOX: sandbox, self.StoreKey.SANDBOX: sandbox,
self.StoreKey.KEEPALIVE_STOP: stop_event,
}, },
) )
@ -175,10 +168,6 @@ class E2BEnvironment(VirtualEnvironment):
""" """
from e2b_code_interpreter import Sandbox # type: ignore[import-untyped] from e2b_code_interpreter import Sandbox # type: ignore[import-untyped]
stop_event: threading.Event | None = self.metadata.store.get(self.StoreKey.KEEPALIVE_STOP)
if stop_event:
stop_event.set()
if not Sandbox.kill(api_key=self.api_key, sandbox_id=self.metadata.id): if not Sandbox.kill(api_key=self.api_key, sandbox_id=self.metadata.id):
raise Exception(f"Failed to release E2B sandbox with ID: {self.metadata.id}") raise Exception(f"Failed to release E2B sandbox with ID: {self.metadata.id}")
@ -305,14 +294,6 @@ class E2BEnvironment(VirtualEnvironment):
stdout_stream.close() stdout_stream.close()
stderr_stream.close() stderr_stream.close()
def _keepalive_thread(self, sandbox: Any, stop_event: threading.Event) -> None:
while not stop_event.wait(timeout=60):
try:
sandbox.set_timeout(300)
except Exception:
logger.warning("Failed to refresh E2B sandbox timeout, sandbox may have been killed")
break
@cached_property @cached_property
def api_key(self) -> str: def api_key(self) -> str:
""" """