mirror of
https://github.com/langgenius/dify.git
synced 2026-04-26 21:55:58 +08:00
fix: harden sandbox builder cleanup
This commit is contained in:
@ -3,9 +3,10 @@ from __future__ import annotations
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from contextlib import nullcontext
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from flask import current_app
|
||||
from flask import Flask, current_app, has_app_context
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
@ -100,6 +101,11 @@ class SandboxBuilder:
|
||||
return self
|
||||
|
||||
def build(self) -> Sandbox:
|
||||
"""Create a sandbox and start background initialization.
|
||||
|
||||
The builder is responsible for cleaning up any VM or sandbox that was
|
||||
successfully created if a later setup step fails.
|
||||
"""
|
||||
if self._storage is None:
|
||||
raise ValueError("storage is required, call .storage() before .build()")
|
||||
if self._assets_id is None:
|
||||
@ -109,41 +115,55 @@ class SandboxBuilder:
|
||||
if self._app_id is None:
|
||||
raise ValueError("app_id is required, call .app() before .build()")
|
||||
|
||||
vm_class = _get_sandbox_class(self._sandbox_type)
|
||||
vm = vm_class(
|
||||
tenant_id=self._tenant_id,
|
||||
options=self._options,
|
||||
environments=self._environments,
|
||||
user_id=self._user_id,
|
||||
)
|
||||
sandbox = Sandbox(
|
||||
vm=vm,
|
||||
storage=self._storage,
|
||||
tenant_id=self._tenant_id,
|
||||
user_id=self._user_id,
|
||||
app_id=self._app_id,
|
||||
assets_id=self._assets_id,
|
||||
)
|
||||
|
||||
ctx = SandboxInitializeContext(
|
||||
tenant_id=self._tenant_id,
|
||||
app_id=self._app_id,
|
||||
assets_id=self._assets_id,
|
||||
user_id=self._user_id,
|
||||
)
|
||||
vm: VirtualEnvironment | None = None
|
||||
sandbox: Sandbox | None = None
|
||||
try:
|
||||
vm_class = _get_sandbox_class(self._sandbox_type)
|
||||
vm = vm_class(
|
||||
tenant_id=self._tenant_id,
|
||||
options=self._options,
|
||||
environments=self._environments,
|
||||
user_id=self._user_id,
|
||||
)
|
||||
sandbox = Sandbox(
|
||||
vm=vm,
|
||||
storage=self._storage,
|
||||
tenant_id=self._tenant_id,
|
||||
user_id=self._user_id,
|
||||
app_id=self._app_id,
|
||||
assets_id=self._assets_id,
|
||||
)
|
||||
|
||||
# Run synchronous initializers before marking sandbox as ready.
|
||||
for init in self._initializers:
|
||||
if isinstance(init, SyncSandboxInitializer):
|
||||
init.initialize(sandbox, ctx)
|
||||
for init in self._initializers:
|
||||
if isinstance(init, SyncSandboxInitializer):
|
||||
init.initialize(sandbox, ctx)
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"Failed to initialize sandbox synchronously: tenant_id=%s, app_id=%s", self._tenant_id, self._app_id
|
||||
)
|
||||
if sandbox is not None:
|
||||
sandbox.release()
|
||||
elif vm is not None:
|
||||
try:
|
||||
vm.release_environment()
|
||||
except Exception:
|
||||
logger.exception("Failed to release sandbox VM during builder cleanup")
|
||||
raise RuntimeError("Sandbox initialization failed") from exc
|
||||
|
||||
# Run sandbox setup asynchronously so workflow execution can proceed.
|
||||
# Capture the Flask app before starting the thread for database access.
|
||||
flask_app = current_app._get_current_object() # type: ignore
|
||||
flask_app: Flask | None = cast(Any, current_app)._get_current_object() if has_app_context() else None
|
||||
|
||||
def initialize() -> None:
|
||||
with flask_app.app_context():
|
||||
try:
|
||||
try:
|
||||
app_context = flask_app.app_context() if flask_app is not None else nullcontext()
|
||||
with app_context:
|
||||
for init in self._initializers:
|
||||
if not isinstance(init, AsyncSandboxInitializer):
|
||||
continue
|
||||
@ -151,18 +171,36 @@ class SandboxBuilder:
|
||||
if sandbox.is_cancelled():
|
||||
return
|
||||
init.initialize(sandbox, ctx)
|
||||
|
||||
if sandbox.is_cancelled():
|
||||
return
|
||||
sandbox.mount()
|
||||
sandbox.mark_ready()
|
||||
except Exception as exc:
|
||||
except Exception as exc:
|
||||
try:
|
||||
logger.exception(
|
||||
"Failed to initialize sandbox: tenant_id=%s, app_id=%s", self._tenant_id, self._app_id
|
||||
)
|
||||
sandbox.release()
|
||||
sandbox.mark_failed(exc)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to mark sandbox initialization failure: tenant_id=%s, app_id=%s",
|
||||
self._tenant_id,
|
||||
self._app_id,
|
||||
)
|
||||
|
||||
# Background init completes or signals failure via sandbox state.
|
||||
threading.Thread(target=initialize, daemon=True).start()
|
||||
try:
|
||||
threading.Thread(target=initialize, daemon=True).start()
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to start sandbox initialization thread: tenant_id=%s, app_id=%s",
|
||||
self._tenant_id,
|
||||
self._app_id,
|
||||
)
|
||||
sandbox.release()
|
||||
raise RuntimeError("Sandbox initialization failed")
|
||||
return sandbox
|
||||
|
||||
@staticmethod
|
||||
|
||||
Reference in New Issue
Block a user