mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 09:58:04 +08:00
refactor all
This commit is contained in:
@ -17,8 +17,8 @@ from core.app.entities.app_asset_entities import AppAssetFileTree
|
||||
from core.app_assets.builder import AssetBuildPipeline, BuildContext
|
||||
from core.app_assets.builder.file_builder import FileBuilder
|
||||
from core.app_assets.builder.skill_builder import SkillBuilder
|
||||
from core.app_assets.entities.assets import AssetItem, FileAsset
|
||||
from core.app_assets.storage import AssetPath
|
||||
from core.app_assets.entities.assets import AssetItem
|
||||
from core.app_assets.storage import AssetPaths
|
||||
from core.zip_sandbox import SandboxDownloadItem, ZipSandbox
|
||||
from models.app_asset import AppAssets
|
||||
from models.model import App
|
||||
@ -62,12 +62,12 @@ class AppAssetPackageService:
|
||||
"""Convert file tree to asset items for packaging."""
|
||||
files = file_tree.walk_files()
|
||||
return [
|
||||
FileAsset(
|
||||
AssetItem(
|
||||
asset_id=f.id,
|
||||
path=file_tree.get_path(f.id),
|
||||
file_name=f.name,
|
||||
extension=f.extension,
|
||||
storage_key=AssetPath.draft(tenant_id, app_id, f.id).get_storage_key(),
|
||||
storage_key=AssetPaths.draft(tenant_id, app_id, f.id),
|
||||
)
|
||||
for f in files
|
||||
]
|
||||
@ -98,8 +98,8 @@ class AppAssetPackageService:
|
||||
return
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
asset_paths = [AssetPath.draft(tenant_id, app_id, asset.asset_id) for asset in assets]
|
||||
download_urls = asset_storage.get_download_urls(asset_paths)
|
||||
keys = [AssetPaths.draft(tenant_id, app_id, asset.asset_id) for asset in assets]
|
||||
download_urls = asset_storage.get_download_urls(keys)
|
||||
download_items = [
|
||||
SandboxDownloadItem(url=url, path=asset.path) for asset, url in zip(assets, download_urls, strict=True)
|
||||
]
|
||||
@ -139,8 +139,8 @@ class AppAssetPackageService:
|
||||
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id)
|
||||
built_assets = AssetBuildPipeline([SkillBuilder(storage=asset_storage), FileBuilder()]).build_all(tree, ctx)
|
||||
|
||||
runtime_zip_path = AssetPath.build_zip(tenant_id, app_id, publish_id)
|
||||
runtime_upload_url = asset_storage.get_upload_url(runtime_zip_path)
|
||||
runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id)
|
||||
runtime_upload_url = asset_storage.get_upload_url(runtime_zip_key)
|
||||
AppAssetPackageService.package_and_upload(
|
||||
assets=built_assets,
|
||||
upload_url=runtime_upload_url,
|
||||
@ -150,8 +150,8 @@ class AppAssetPackageService:
|
||||
)
|
||||
|
||||
source_items = AppAssetService.get_draft_assets(tenant_id, app_id)
|
||||
source_zip_path = AssetPath.source_zip(tenant_id, app_id, workflow_id)
|
||||
source_upload_url = asset_storage.get_upload_url(source_zip_path)
|
||||
source_key = AssetPaths.source_zip(tenant_id, app_id, workflow_id)
|
||||
source_upload_url = asset_storage.get_upload_url(source_key)
|
||||
AppAssetPackageService.package_and_upload(
|
||||
assets=source_items,
|
||||
upload_url=source_upload_url,
|
||||
@ -176,8 +176,8 @@ class AppAssetPackageService:
|
||||
).build_all(tree, ctx)
|
||||
|
||||
user_id = getattr(assets, "updated_by", None) or getattr(assets, "created_by", None) or "system"
|
||||
zip_path = AssetPath.build_zip(tenant_id, app_id, assets.id)
|
||||
upload_url = asset_storage.get_upload_url(zip_path)
|
||||
key = AssetPaths.build_zip(tenant_id, app_id, assets.id)
|
||||
upload_url = asset_storage.get_upload_url(key)
|
||||
AppAssetPackageService.package_and_upload(
|
||||
assets=built_assets,
|
||||
upload_url=upload_url,
|
||||
|
||||
@ -13,11 +13,13 @@ from core.app.entities.app_asset_entities import (
|
||||
TreeParentNotFoundError,
|
||||
TreePathConflictError,
|
||||
)
|
||||
from core.app_assets.entities.assets import AssetItem, FileAsset
|
||||
from core.app_assets.storage import AppAssetStorage, AssetPath
|
||||
from core.app_assets.entities.assets import AssetItem
|
||||
from core.app_assets.storage import AssetPaths
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from extensions.ext_storage import storage
|
||||
from extensions.storage.cached_presign_storage import CachedPresignStorage
|
||||
from extensions.storage.file_presign_storage import FilePresignStorage
|
||||
from models.app_asset import AppAssets
|
||||
from models.model import App
|
||||
|
||||
@ -34,20 +36,17 @@ logger = logging.getLogger(__name__)
|
||||
class AppAssetService:
|
||||
MAX_PREVIEW_CONTENT_SIZE = 5 * 1024 * 1024 # 5MB
|
||||
_LOCK_TIMEOUT_SECONDS = 60
|
||||
_DRAFT_CACHE_KEY_PREFIX = "app_asset:draft_download"
|
||||
|
||||
@staticmethod
|
||||
def get_storage() -> AppAssetStorage:
|
||||
"""Get a lazily-initialized AppAssetStorage instance.
|
||||
def get_storage() -> CachedPresignStorage:
|
||||
"""Get a lazily-initialized storage instance for app assets.
|
||||
|
||||
This method creates an AppAssetStorage each time it's called,
|
||||
ensuring storage.storage_runner is only accessed after init_app.
|
||||
|
||||
The storage is wrapped with FilePresignStorage for presign fallback support
|
||||
and CachedPresignStorage for URL caching.
|
||||
Returns a CachedPresignStorage wrapping FilePresignStorage,
|
||||
providing presign fallback and URL caching.
|
||||
"""
|
||||
return AppAssetStorage(
|
||||
storage=storage.storage_runner,
|
||||
return CachedPresignStorage(
|
||||
storage=FilePresignStorage(storage.storage_runner),
|
||||
cache_key_prefix="app_assets",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@ -90,12 +89,12 @@ class AppAssetService:
|
||||
def get_draft_asset_items(tenant_id: str, app_id: str, file_tree: AppAssetFileTree) -> list[AssetItem]:
|
||||
files = file_tree.walk_files()
|
||||
return [
|
||||
FileAsset(
|
||||
AssetItem(
|
||||
asset_id=f.id,
|
||||
path=file_tree.get_path(f.id),
|
||||
file_name=f.name,
|
||||
extension=f.extension,
|
||||
storage_key=AssetPath.draft(tenant_id, app_id, f.id).get_storage_key(),
|
||||
storage_key=AssetPaths.draft(tenant_id, app_id, f.id),
|
||||
)
|
||||
for f in files
|
||||
]
|
||||
@ -218,8 +217,8 @@ class AppAssetService:
|
||||
raise AppAssetNodeTooLargeError(f"File node {node_id} size exceeded the limit: {max_size_mb} MB")
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
asset_path = AssetPath.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
return asset_storage.load(asset_path)
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
return asset_storage.load_once(key)
|
||||
|
||||
@staticmethod
|
||||
def update_file_content(
|
||||
@ -239,8 +238,8 @@ class AppAssetService:
|
||||
raise AppAssetNodeNotFoundError(str(e)) from e
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
asset_path = AssetPath.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
asset_storage.save(asset_path, content)
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
asset_storage.save(key, content)
|
||||
|
||||
assets.asset_tree = tree
|
||||
assets.updated_by = account_id
|
||||
@ -340,15 +339,11 @@ class AppAssetService:
|
||||
def _delete_file_from_storage(tenant_id: str, app_id: str, node_ids: list[str]) -> None:
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
for nid in node_ids:
|
||||
asset_path = AssetPath.draft(tenant_id, app_id, nid)
|
||||
key = AssetPaths.draft(tenant_id, app_id, nid)
|
||||
try:
|
||||
asset_storage.delete(asset_path)
|
||||
asset_storage.delete(key)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to delete storage file %s",
|
||||
asset_path.get_storage_key(),
|
||||
exc_info=True,
|
||||
)
|
||||
logger.warning("Failed to delete storage file %s", key, exc_info=True)
|
||||
|
||||
threading.Thread(
|
||||
target=lambda: _delete_file_from_storage(app_model.tenant_id, app_model.id, removed_ids)
|
||||
@ -370,17 +365,18 @@ class AppAssetService:
|
||||
raise AppAssetNodeNotFoundError(f"File node {node_id} not found")
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
asset_path = AssetPath.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
return asset_storage.get_download_url(asset_path, expires_in)
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
return asset_storage.get_download_url(key, expires_in)
|
||||
|
||||
@staticmethod
|
||||
def get_source_zip_bytes(tenant_id: str, app_id: str, workflow_id: str) -> bytes | None:
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
asset_path = AssetPath.source_zip(tenant_id, app_id, workflow_id)
|
||||
source_zip = asset_storage.load_or_none(asset_path)
|
||||
if source_zip is None:
|
||||
logger.warning("Source zip not found: %s", asset_path.get_storage_key())
|
||||
return source_zip
|
||||
key = AssetPaths.source_zip(tenant_id, app_id, workflow_id)
|
||||
try:
|
||||
return asset_storage.load_once(key)
|
||||
except FileNotFoundError:
|
||||
logger.warning("Source zip not found: %s", key)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def set_draft_assets(
|
||||
@ -434,15 +430,15 @@ class AppAssetService:
|
||||
assets.updated_by = account_id
|
||||
session.commit()
|
||||
|
||||
asset_path = AssetPath.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
|
||||
# put empty content to create the file record
|
||||
# which avoids file not found error when uploading via presigned URL is never touched
|
||||
# resulting in inconsistent state
|
||||
asset_storage.save(asset_path, b"")
|
||||
asset_storage.save(key, b"")
|
||||
|
||||
upload_url = asset_storage.get_upload_url(asset_path, expires_in)
|
||||
upload_url = asset_storage.get_upload_url(key, expires_in)
|
||||
|
||||
return node, upload_url
|
||||
|
||||
@ -481,8 +477,8 @@ class AppAssetService:
|
||||
|
||||
def fill_urls(node: BatchUploadNode) -> None:
|
||||
if node.node_type == AssetNodeType.FILE and node.id:
|
||||
asset_path = AssetPath.draft(app_model.tenant_id, app_model.id, node.id)
|
||||
node.upload_url = asset_storage.get_upload_url(asset_path, expires_in)
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node.id)
|
||||
node.upload_url = asset_storage.get_upload_url(key, expires_in)
|
||||
for child in node.children:
|
||||
fill_urls(child)
|
||||
|
||||
|
||||
@ -36,10 +36,11 @@ from core.app.entities.app_bundle_entities import (
|
||||
BundleFormatError,
|
||||
BundleManifest,
|
||||
)
|
||||
from core.app_assets.storage import AppAssetStorage, AssetPath, BundleImportZipPath
|
||||
from core.app_assets.storage import AssetPaths
|
||||
from core.zip_sandbox import SandboxDownloadItem, SandboxUploadItem, ZipSandbox
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from extensions.storage.cached_presign_storage import CachedPresignStorage
|
||||
from models.account import Account
|
||||
from models.model import App
|
||||
|
||||
@ -108,9 +109,9 @@ class AppBundleService:
|
||||
manifest = BundleManifest.from_tree(app_assets.asset_tree, dsl_filename)
|
||||
|
||||
export_id = uuid4().hex
|
||||
export_path = AssetPath.bundle_export_zip(tenant_id, app_id, export_id)
|
||||
export_key = AssetPaths.bundle_export(tenant_id, app_id, export_id)
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
upload_url = asset_storage.get_upload_url(export_path, expires_in)
|
||||
upload_url = asset_storage.get_upload_url(export_key, expires_in)
|
||||
|
||||
dsl_content = AppDslService.export_dsl(
|
||||
app_model=app_model,
|
||||
@ -123,15 +124,15 @@ class AppBundleService:
|
||||
zs.write_file(f"bundle_root/{MANIFEST_FILENAME}", manifest.model_dump_json(indent=2).encode("utf-8"))
|
||||
|
||||
if workflow_id is not None:
|
||||
source_zip_path = AssetPath.source_zip(tenant_id, app_id, workflow_id)
|
||||
source_url = asset_storage.get_download_url(source_zip_path, expires_in)
|
||||
source_key = AssetPaths.source_zip(tenant_id, app_id, workflow_id)
|
||||
source_url = asset_storage.get_download_url(source_key, expires_in)
|
||||
zs.download_archive(source_url, path="tmp/source_assets.zip")
|
||||
zs.unzip(archive_path="tmp/source_assets.zip", dest_dir=f"bundle_root/{safe_name}")
|
||||
else:
|
||||
asset_items = AppAssetService.get_draft_assets(tenant_id, app_id)
|
||||
if asset_items:
|
||||
asset_urls = asset_storage.get_download_urls(
|
||||
[AssetPath.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in
|
||||
[AssetPaths.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in
|
||||
)
|
||||
zs.download_items(
|
||||
[
|
||||
@ -144,7 +145,7 @@ class AppBundleService:
|
||||
archive = zs.zip(src="bundle_root", include_base=False)
|
||||
zs.upload(archive, upload_url)
|
||||
|
||||
download_url = asset_storage.get_download_url(export_path, expires_in)
|
||||
download_url = asset_storage.get_download_url(export_key, expires_in)
|
||||
return BundleExportResult(download_url=download_url, filename=f"{safe_name}.zip")
|
||||
|
||||
# ========== Import ==========
|
||||
@ -153,9 +154,9 @@ class AppBundleService:
|
||||
def prepare_import(tenant_id: str, account_id: str) -> ImportPrepareResult:
|
||||
"""Prepare import: generate import_id and upload URL."""
|
||||
import_id = uuid4().hex
|
||||
import_path = AssetPath.bundle_import_zip(tenant_id, import_id)
|
||||
import_key = AssetPaths.bundle_import(tenant_id, import_id)
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
upload_url = asset_storage.get_import_upload_url(import_path, _IMPORT_TTL_SECONDS)
|
||||
upload_url = asset_storage.get_upload_url(import_key, _IMPORT_TTL_SECONDS)
|
||||
|
||||
redis_client.setex(
|
||||
f"{_IMPORT_REDIS_PREFIX}{import_id}",
|
||||
@ -188,14 +189,14 @@ class AppBundleService:
|
||||
if tenant_id != account.current_tenant_id:
|
||||
raise BundleFormatError("Import session tenant mismatch")
|
||||
|
||||
import_path = AssetPath.bundle_import_zip(tenant_id, import_id)
|
||||
import_key = AssetPaths.bundle_import(tenant_id, import_id)
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
|
||||
try:
|
||||
result = AppBundleService.import_bundle(
|
||||
tenant_id=tenant_id,
|
||||
account=account,
|
||||
import_path=import_path,
|
||||
import_key=import_key,
|
||||
asset_storage=asset_storage,
|
||||
name=name,
|
||||
description=description,
|
||||
@ -205,7 +206,10 @@ class AppBundleService:
|
||||
)
|
||||
finally:
|
||||
redis_client.delete(redis_key)
|
||||
asset_storage.delete_import_zip(import_path)
|
||||
try:
|
||||
asset_storage.delete(import_key)
|
||||
except Exception: # noqa: S110
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
@ -214,8 +218,8 @@ class AppBundleService:
|
||||
*,
|
||||
tenant_id: str,
|
||||
account: Account,
|
||||
import_path: BundleImportZipPath,
|
||||
asset_storage: AppAssetStorage,
|
||||
import_key: str,
|
||||
asset_storage: CachedPresignStorage,
|
||||
name: str | None,
|
||||
description: str | None,
|
||||
icon_type: str | None,
|
||||
@ -223,7 +227,7 @@ class AppBundleService:
|
||||
icon_background: str | None,
|
||||
) -> Import:
|
||||
"""Execute import in sandbox."""
|
||||
download_url = asset_storage.get_import_download_url(import_path, _IMPORT_TTL_SECONDS)
|
||||
download_url = asset_storage.get_download_url(import_key, _IMPORT_TTL_SECONDS)
|
||||
|
||||
with ZipSandbox(tenant_id=tenant_id, user_id=account.id, app_id="app-bundle-import") as zs:
|
||||
zs.download_archive(download_url, path="import.zip")
|
||||
@ -260,8 +264,8 @@ class AppBundleService:
|
||||
|
||||
upload_items: list[SandboxUploadItem] = []
|
||||
for file_entry in manifest.files:
|
||||
asset_path = AssetPath.draft(tenant_id, app_id, file_entry.node_id)
|
||||
file_upload_url = asset_storage.get_upload_url(asset_path, _IMPORT_TTL_SECONDS)
|
||||
key = AssetPaths.draft(tenant_id, app_id, file_entry.node_id)
|
||||
file_upload_url = asset_storage.get_upload_url(key, _IMPORT_TTL_SECONDS)
|
||||
src_path = f"{manifest.assets_prefix}/{file_entry.path}"
|
||||
upload_items.append(SandboxUploadItem(path=src_path, url=file_upload_url))
|
||||
|
||||
|
||||
@ -2,9 +2,24 @@ from __future__ import annotations
|
||||
|
||||
from core.sandbox.entities.files import SandboxFileDownloadTicket, SandboxFileNode
|
||||
from core.sandbox.inspector import SandboxFileBrowser
|
||||
from extensions.ext_storage import storage
|
||||
from extensions.storage.cached_presign_storage import CachedPresignStorage
|
||||
from extensions.storage.file_presign_storage import FilePresignStorage
|
||||
|
||||
|
||||
class SandboxFileService:
|
||||
@staticmethod
|
||||
def get_storage() -> CachedPresignStorage:
|
||||
"""Get a lazily-initialized storage instance for sandbox files.
|
||||
|
||||
Returns a CachedPresignStorage wrapping FilePresignStorage,
|
||||
providing presign fallback and URL caching.
|
||||
"""
|
||||
return CachedPresignStorage(
|
||||
storage=FilePresignStorage(storage.storage_runner),
|
||||
cache_key_prefix="sandbox_files",
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def list_files(
|
||||
cls,
|
||||
|
||||
Reference in New Issue
Block a user