mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 00:48:04 +08:00
feat: refactor initializers to support async and sync execution
This commit is contained in:
@ -9,7 +9,7 @@ from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
||||
|
||||
from .entities.sandbox_type import SandboxType
|
||||
from .initializer import SandboxInitializer
|
||||
from .initializer import AsyncSandboxInitializer, SandboxInitializer, SyncSandboxInitializer
|
||||
from .sandbox import Sandbox
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -113,19 +113,16 @@ class SandboxBuilder:
|
||||
assets_id=self._assets_id,
|
||||
)
|
||||
|
||||
"""
|
||||
# Run synchronous initializers before marking sandbox as ready.
|
||||
"""
|
||||
for init in self._initializers:
|
||||
if init.async_initialize():
|
||||
continue
|
||||
init.initialize(sandbox)
|
||||
if isinstance(init, SyncSandboxInitializer):
|
||||
init.initialize(sandbox)
|
||||
|
||||
# Run sandbox setup asynchronously so workflow execution can proceed.
|
||||
def initialize() -> None:
|
||||
try:
|
||||
for init in self._initializers:
|
||||
if not init.async_initialize():
|
||||
if not isinstance(init, AsyncSandboxInitializer):
|
||||
continue
|
||||
|
||||
if sandbox.is_cancelled():
|
||||
|
||||
@ -1,11 +1,13 @@
|
||||
from .app_assets_initializer import AppAssetsInitializer
|
||||
from .base import SandboxInitializer
|
||||
from .base import AsyncSandboxInitializer, SandboxInitializer, SyncSandboxInitializer
|
||||
from .dify_cli_initializer import DifyCliInitializer
|
||||
from .draft_app_assets_initializer import DraftAppAssetsInitializer
|
||||
|
||||
__all__ = [
|
||||
"AppAssetsInitializer",
|
||||
"AsyncSandboxInitializer",
|
||||
"DifyCliInitializer",
|
||||
"DraftAppAssetsInitializer",
|
||||
"SandboxInitializer",
|
||||
"SyncSandboxInitializer",
|
||||
]
|
||||
|
||||
@ -1,16 +1,16 @@
|
||||
from core.app_assets.constants import AppAssetsAttrs
|
||||
from core.sandbox.initializer.base import SandboxInitializer
|
||||
from core.sandbox.initializer.base import SyncSandboxInitializer
|
||||
from core.sandbox.sandbox import Sandbox
|
||||
from services.app_asset_service import AppAssetService
|
||||
|
||||
|
||||
class AppAssetsAttrsInitializer(SandboxInitializer):
|
||||
class AppAssetsAttrsInitializer(SyncSandboxInitializer):
|
||||
def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
|
||||
self._tenant_id = tenant_id
|
||||
self._app_id = app_id
|
||||
self._assets_id = assets_id
|
||||
|
||||
def initialize(self, env: Sandbox) -> None:
|
||||
def initialize(self, sandbox: Sandbox) -> None:
|
||||
# Load published app assets and unzip the artifact bundle.
|
||||
app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
|
||||
env.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
|
||||
sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
|
||||
|
||||
@ -7,21 +7,21 @@ from extensions.ext_storage import storage
|
||||
from extensions.storage.file_presign_storage import FilePresignStorage
|
||||
|
||||
from ..entities import AppAssets
|
||||
from .base import SandboxInitializer
|
||||
from .base import AsyncSandboxInitializer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
APP_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10
|
||||
|
||||
|
||||
class AppAssetsInitializer(SandboxInitializer):
|
||||
class AppAssetsInitializer(AsyncSandboxInitializer):
|
||||
def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
|
||||
self._tenant_id = tenant_id
|
||||
self._app_id = app_id
|
||||
self._assets_id = assets_id
|
||||
|
||||
def initialize(self, env: Sandbox) -> None:
|
||||
vm = env.vm
|
||||
def initialize(self, sandbox: Sandbox) -> None:
|
||||
vm = sandbox.vm
|
||||
zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id)
|
||||
download_url = FilePresignStorage(storage.storage_runner).get_download_url(zip_key)
|
||||
|
||||
@ -45,6 +45,3 @@ class AppAssetsInitializer(SandboxInitializer):
|
||||
self._app_id,
|
||||
self._assets_id,
|
||||
)
|
||||
|
||||
def async_initialize(self) -> bool:
|
||||
return True
|
||||
|
||||
@ -5,10 +5,12 @@ from core.sandbox.sandbox import Sandbox
|
||||
|
||||
class SandboxInitializer(ABC):
|
||||
@abstractmethod
|
||||
def initialize(self, env: Sandbox) -> None: ...
|
||||
def initialize(self, sandbox: Sandbox) -> None: ...
|
||||
|
||||
def async_initialize(self) -> bool:
|
||||
"""
|
||||
Whether the initializer needs to run asynchronously.
|
||||
"""
|
||||
return False
|
||||
|
||||
class SyncSandboxInitializer(SandboxInitializer):
|
||||
"""Marker class for initializers that must run before async setup."""
|
||||
|
||||
|
||||
class AsyncSandboxInitializer(SandboxInitializer):
|
||||
"""Marker class for initializers that can run in the background."""
|
||||
|
||||
@ -12,12 +12,12 @@ from core.virtual_environment.__base.helpers import pipeline
|
||||
|
||||
from ..bash.dify_cli import DifyCliConfig, DifyCliLocator
|
||||
from ..entities import DifyCli
|
||||
from .base import SandboxInitializer
|
||||
from .base import AsyncSandboxInitializer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DifyCliInitializer(SandboxInitializer):
|
||||
class DifyCliInitializer(AsyncSandboxInitializer):
|
||||
def __init__(
|
||||
self,
|
||||
tenant_id: str,
|
||||
@ -82,6 +82,3 @@ class DifyCliInitializer(SandboxInitializer):
|
||||
).execute(raise_on_error=True)
|
||||
|
||||
logger.info("Global tools initialized, path=%s, tool_count=%d", DifyCli.GLOBAL_TOOLS_PATH, len(self._tools))
|
||||
|
||||
def async_initialize(self) -> bool:
|
||||
return True
|
||||
|
||||
@ -7,21 +7,21 @@ from core.sandbox.services.asset_download_service import AssetDownloadItem
|
||||
from core.virtual_environment.__base.helpers import pipeline
|
||||
from services.app_asset_service import AppAssetService
|
||||
|
||||
from .base import SandboxInitializer
|
||||
from .base import AsyncSandboxInitializer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DRAFT_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10
|
||||
|
||||
|
||||
class DraftAppAssetsInitializer(SandboxInitializer):
|
||||
class DraftAppAssetsInitializer(AsyncSandboxInitializer):
|
||||
def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
|
||||
self._tenant_id = tenant_id
|
||||
self._app_id = app_id
|
||||
self._assets_id = assets_id
|
||||
|
||||
def initialize(self, env: Sandbox) -> None:
|
||||
vm = env.vm
|
||||
def initialize(self, sandbox: Sandbox) -> None:
|
||||
vm = sandbox.vm
|
||||
# Draft assets download via presigned URLs to avoid zip build overhead.
|
||||
# FIXME(Yeuoly): merge 2 IO operations in DraftAppAssetsInitializer and AppAssetsAttrsInitializer
|
||||
app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
|
||||
@ -41,6 +41,3 @@ class DraftAppAssetsInitializer(SandboxInitializer):
|
||||
self._app_id,
|
||||
self._assets_id,
|
||||
)
|
||||
|
||||
def async_initialize(self) -> bool:
|
||||
return True
|
||||
|
||||
@ -6,12 +6,12 @@ from core.sandbox.sandbox import Sandbox
|
||||
from core.skill import SkillAttrs
|
||||
from core.skill.skill_manager import SkillManager
|
||||
|
||||
from .base import SandboxInitializer
|
||||
from .base import SyncSandboxInitializer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SkillInitializer(SandboxInitializer):
|
||||
class SkillInitializer(SyncSandboxInitializer):
|
||||
def __init__(
|
||||
self,
|
||||
tenant_id: str,
|
||||
|
||||
Reference in New Issue
Block a user