Mirror of https://github.com/langgenius/dify.git, synced 2026-04-30 15:38:08 +08:00
feat(app_assets): enhance asset management with CachedPresignStorage
- Introduced CachedPresignStorage to cache presigned download URLs, reducing repeated presign API calls.
- Updated AppAssetService to use CachedPresignStorage when generating asset download URLs.
- Refactored asset builders and packagers to support the new storage mechanism.
- Removed the unused AppAssetsAttrsInitializer to streamline initialization.
- Added unit tests for CachedPresignStorage.
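Note: the wrapper introduced here sits in front of a presign-capable storage and memoizes generated URLs in Redis. A minimal usage sketch, assembled from the class docstring and the assets_storage() factory in this diff (the redis_client and storage wiring are assumed to be the existing extensions):

    from extensions.ext_redis import redis_client
    from extensions.ext_storage import storage
    from extensions.storage.cached_presign_storage import CachedPresignStorage
    from extensions.storage.file_presign_storage import FilePresignStorage

    cached = CachedPresignStorage(
        storage=FilePresignStorage(storage.storage_runner),
        redis_client=redis_client,
        cache_key_prefix="app_asset:draft_download",
    )
    url = cached.get_download_url("path/to/file.txt", expires_in=3600)  # presigns, then caches
    url = cached.get_download_url("path/to/file.txt", expires_in=3600)  # served from Redis

This is what AppAssetService.assets_storage() constructs further down in this commit.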
@@ -1,6 +1,4 @@
 from core.app.entities.app_asset_entities import AppAssetFileTree
-from core.app_assets.builder.file_builder import FileBuilder
-from core.app_assets.builder.skill_builder import SkillBuilder
 from core.app_assets.entities import AssetItem
 
 from .base import AssetBuilder, BuildContext
@@ -9,8 +7,8 @@ from .base import AssetBuilder, BuildContext
 class AssetBuildPipeline:
     _builders: list[AssetBuilder]
 
-    def __init__(self, builders: list[AssetBuilder] | None = None) -> None:
-        self._builders = builders or [SkillBuilder(), FileBuilder()]
+    def __init__(self, builders: list[AssetBuilder]) -> None:
+        self._builders = builders
 
     def build_all(self, tree: AppAssetFileTree, ctx: BuildContext) -> list[AssetItem]:
         # 1. Distribute: each node goes to first accepting builder
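Note: the comment above describes a first-match dispatch. A minimal sketch of that step (the hunk truncates before the body, so the collector method name is hypothetical):

    for node in tree.nodes:
        for builder in self._builders:
            if builder.accept(node):
                builder.collect(node, tree.get_path(node.id))  # hypothetical collector
                break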
@@ -8,7 +8,7 @@ from core.app_assets.paths import AssetPaths
 from core.skill.entities.skill_document import SkillDocument
 from core.skill.skill_compiler import SkillCompiler
 from core.skill.skill_manager import SkillManager
-from extensions.ext_storage import storage
+from extensions.storage.base_storage import BaseStorage
 
 from .base import BuildContext
 
@@ -32,10 +32,12 @@ class _CompiledSkill:
 class SkillBuilder:
     _nodes: list[tuple[AppAssetNode, str]]
     _max_workers: int
+    _storage: BaseStorage
 
-    def __init__(self, max_workers: int = 8) -> None:
+    def __init__(self, storage: BaseStorage, max_workers: int = 8) -> None:
         self._nodes = []
         self._max_workers = max_workers
+        self._storage = storage
 
     def accept(self, node: AppAssetNode) -> bool:
         return node.extension == "md"
@@ -91,7 +93,7 @@ class SkillBuilder:
         def load_one(node: AppAssetNode, path: str) -> _LoadedSkill:
             draft_key = AssetPaths.draft_file(ctx.tenant_id, ctx.app_id, node.id)
             try:
-                data = json.loads(storage.load_once(draft_key))
+                data = json.loads(self._storage.load_once(draft_key))
                 content = data.get("content", "") if isinstance(data, dict) else ""
                 metadata = data.get("metadata", {}) if isinstance(data, dict) else {}
             except Exception:
@@ -105,7 +107,7 @@ class SkillBuilder:
 
     def _upload_all(self, skills: list[_CompiledSkill]) -> None:
         def upload_one(skill: _CompiledSkill) -> None:
-            storage.save(skill.resolved_key, skill.content_bytes)
+            self._storage.save(skill.resolved_key, skill.content_bytes)
 
         with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
             futures = [executor.submit(upload_one, skill) for skill in skills]
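Note: the hunk cuts off after the uploads are submitted. A typical way to drain such futures so upload errors surface, sketched here rather than taken from the file:

    for future in futures:
        future.result()  # re-raises any exception from upload_one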
@@ -5,3 +5,4 @@ from libs.attr_map import AttrKey
 class AppAssetsAttrs:
     # Skill artifact set
     FILE_TREE = AttrKey("file_tree", AppAssetFileTree)
+    APP_ASSETS_ID = AttrKey("app_assets_id", str)
@@ -2,6 +2,7 @@ from __future__ import annotations
 
 from core.app.entities.app_asset_entities import AppAssetFileTree, AssetNodeType
 from core.app_assets.entities import FileAsset
+from core.app_assets.entities.assets import AssetItem
 from core.app_assets.paths import AssetPaths
 
 
@@ -9,7 +10,7 @@ def tree_to_asset_items(
     tree: AppAssetFileTree,
     tenant_id: str,
     app_id: str,
-) -> list[FileAsset]:
+) -> list[AssetItem]:
     """
     Convert AppAssetFileTree to list of FileAsset for packaging.
 
@@ -21,7 +22,7 @@ def tree_to_asset_items(
     Returns:
         List of FileAsset items ready for packaging
     """
-    items: list[FileAsset] = []
+    items: list[AssetItem] = []
     for node in tree.nodes:
         if node.node_type == AssetNodeType.FILE:
             path = tree.get_path(node.id)
@@ -4,12 +4,9 @@ import io
 import zipfile
 from concurrent.futures import ThreadPoolExecutor
 from threading import Lock
-from typing import TYPE_CHECKING
 
 from core.app_assets.entities import AssetItem
 
-if TYPE_CHECKING:
-    from extensions.ext_storage import Storage
+from extensions.storage.base_storage import BaseStorage
 
 
 class AssetZipPackager:
@@ -18,7 +15,7 @@
     Automatically creates directory entries from asset paths.
     """
 
-    def __init__(self, storage: Storage, *, max_workers: int = 8) -> None:
+    def __init__(self, storage: BaseStorage, *, max_workers: int = 8) -> None:
         self._storage = storage
         self._max_workers = max_workers
 
@@ -1,16 +0,0 @@
-from core.app_assets.constants import AppAssetsAttrs
-from core.sandbox.initializer.base import SyncSandboxInitializer
-from core.sandbox.sandbox import Sandbox
-from services.app_asset_service import AppAssetService
-
-
-class AppAssetsAttrsInitializer(SyncSandboxInitializer):
-    def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
-        self._tenant_id = tenant_id
-        self._app_id = app_id
-        self._assets_id = assets_id
-
-    def initialize(self, sandbox: Sandbox) -> None:
-        # Load published app assets and unzip the artifact bundle.
-        app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
-        sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
@@ -1,10 +1,12 @@
 import logging
 
 from core.app_assets.constants import AppAssetsAttrs
+from core.app_assets.paths import AssetPaths
 from core.sandbox.sandbox import Sandbox
 from core.virtual_environment.__base.helpers import pipeline
 from extensions.ext_storage import storage
+from extensions.storage.file_presign_storage import FilePresignStorage
 from services.app_asset_service import AppAssetService
 
 from ..entities import AppAssets
 from .base import AsyncSandboxInitializer
@@ -21,6 +23,10 @@ class AppAssetsInitializer(AsyncSandboxInitializer):
         self._assets_id = assets_id
 
     def initialize(self, sandbox: Sandbox) -> None:
+        # Load published app assets and unzip the artifact bundle.
+        app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
+        sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
+        sandbox.attrs.set(AppAssetsAttrs.APP_ASSETS_ID, self._assets_id)
         vm = sandbox.vm
         zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id)
         download_url = FilePresignStorage(storage.storage_runner).get_download_url(zip_key)
@@ -1,5 +1,7 @@
 import logging
 
+from core.app_assets.constants import AppAssetsAttrs
+from core.app_assets.paths import AssetPaths
 from core.sandbox.entities import AppAssets
 from core.sandbox.sandbox import Sandbox
 from core.sandbox.services import AssetDownloadService
@@ -12,6 +14,7 @@ from .base import AsyncSandboxInitializer
 logger = logging.getLogger(__name__)
 
 DRAFT_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10
+DRAFT_ASSETS_EXPIRES_IN = 60 * 10
 
 
 class DraftAppAssetsInitializer(AsyncSandboxInitializer):
@@ -21,15 +24,27 @@
         self._assets_id = assets_id
 
     def initialize(self, sandbox: Sandbox) -> None:
-        vm = sandbox.vm
-        # Draft assets download via presigned URLs to avoid zip build overhead.
-        # FIXME(Yeuoly): merge 2 IO operations in DraftAppAssetsInitializer and AppAssetsAttrsInitializer
+        # Load published app assets and unzip the artifact bundle.
         app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
+        sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
+        sandbox.attrs.set(AppAssetsAttrs.APP_ASSETS_ID, self._assets_id)
 
-        items = [
-            AssetDownloadItem(path=path, url=url)
-            for path, url in AppAssetService.get_cached_draft_download_urls(app_assets)
-        ]
+        vm = sandbox.vm
+        build_id = self._assets_id
+        tree = app_assets.asset_tree
+        storage = AppAssetService.assets_storage()
+        nodes = list(tree.walk_files())
+        if not nodes:
+            return
+        # FIXME(Mairuis): should be more graceful
+        storage_keys = [
+            AssetPaths.build_resolved_file(self._tenant_id, self._app_id, build_id, node.id)
+            if node.extension == "md"
+            else AssetPaths.draft_file(self._tenant_id, self._app_id, node.id)
+            for node in nodes
+        ]
+        urls = storage.get_download_urls(storage_keys, DRAFT_ASSETS_EXPIRES_IN)
+        items = [AssetDownloadItem(path=tree.get_path(node.id).lstrip("/"), url=url) for node, url in zip(nodes, urls)]
         script = AssetDownloadService.build_download_script(items, AppAssets.PATH)
         pipeline(vm).add(
             ["sh", "-lc", script],
||||
@ -7,7 +7,6 @@ from typing import Final
|
||||
from core.sandbox.builder import SandboxBuilder
|
||||
from core.sandbox.entities import AppAssets, SandboxType
|
||||
from core.sandbox.entities.providers import SandboxProviderEntity
|
||||
from core.sandbox.initializer.app_assets_attrs_loader import AppAssetsAttrsInitializer
|
||||
from core.sandbox.initializer.app_assets_initializer import AppAssetsInitializer
|
||||
from core.sandbox.initializer.dify_cli_initializer import DifyCliInitializer
|
||||
from core.sandbox.initializer.draft_app_assets_initializer import DraftAppAssetsInitializer
|
||||
@ -124,7 +123,6 @@ class SandboxManager:
|
||||
.options(sandbox_provider.config)
|
||||
.user(user_id)
|
||||
.app(app_id)
|
||||
.initializer(AppAssetsAttrsInitializer(tenant_id, app_id, assets.id))
|
||||
.initializer(AppAssetsInitializer(tenant_id, app_id, assets.id))
|
||||
.initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
|
||||
.initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
|
||||
@ -161,7 +159,6 @@ class SandboxManager:
|
||||
.options(sandbox_provider.config)
|
||||
.user(user_id)
|
||||
.app(app_id)
|
||||
.initializer(AppAssetsAttrsInitializer(tenant_id, app_id, assets.id))
|
||||
.initializer(DraftAppAssetsInitializer(tenant_id, app_id, assets.id))
|
||||
.initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
|
||||
.initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
|
||||
@ -193,7 +190,6 @@ class SandboxManager:
|
||||
.options(sandbox_provider.config)
|
||||
.user(user_id)
|
||||
.app(app_id)
|
||||
.initializer(AppAssetsAttrsInitializer(tenant_id, app_id, assets.id))
|
||||
.initializer(AppAssetsInitializer(tenant_id, app_id, assets.id))
|
||||
.initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
|
||||
.initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
|
||||
|
||||
@@ -7,14 +7,14 @@ from dataclasses import dataclass
 
 def _render_download_script(root_path: str, download_commands: str) -> str:
     python_download_cmd = (
-        "python3 - \"${url}\" \"${dest}\" <<\"PY\"\n"
+        'python3 - "${url}" "${dest}" <<"PY"\n'
         "import sys\n"
         "import urllib.request\n"
         "url = sys.argv[1]\n"
         "dest = sys.argv[2]\n"
         "with urllib.request.urlopen(url) as resp:\n"
         "    data = resp.read()\n"
-        "with open(dest, \"wb\") as f:\n"
+        'with open(dest, "wb") as f:\n'
         "    f.write(data)\n"
         "PY"
     )
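Note: the heredoc body assembled above is ordinary Python. Run on its own (as a hypothetical download.py), the downloader is equivalent to:

    import sys
    import urllib.request

    url = sys.argv[1]
    dest = sys.argv[2]
    with urllib.request.urlopen(url) as resp:
        data = resp.read()
    with open(dest, "wb") as f:
        f.write(data)

In the rendered script it is invoked as python3 - "${url}" "${dest}" with this code fed through the PY heredoc.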
api/extensions/storage/cached_presign_storage.py (new file, 172 lines)
@@ -0,0 +1,172 @@
"""Storage wrapper that caches presigned download URLs."""

import logging
from collections.abc import Generator
from typing import Any

from extensions.storage.base_storage import BaseStorage

logger = logging.getLogger(__name__)


class CachedPresignStorage(BaseStorage):
    """Storage wrapper that caches presigned download URLs.

    Wraps a storage with presign capability and caches the generated URLs
    in Redis to reduce repeated presign API calls.

    Example:
        cached_storage = CachedPresignStorage(
            storage=FilePresignStorage(base_storage),
            redis_client=redis_client,
            cache_key_prefix="app_asset:draft_download",
        )
        url = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)
    """

    TTL_BUFFER_SECONDS = 60
    MIN_TTL_SECONDS = 60

    def __init__(
        self,
        storage: BaseStorage,
        redis_client: Any,
        cache_key_prefix: str = "presign_cache",
    ):
        super().__init__()
        self._storage = storage
        self._redis = redis_client
        self._cache_key_prefix = cache_key_prefix

    def save(self, filename: str, data: bytes):
        self._storage.save(filename, data)

    def load_once(self, filename: str) -> bytes:
        return self._storage.load_once(filename)

    def load_stream(self, filename: str) -> Generator:
        return self._storage.load_stream(filename)

    def download(self, filename: str, target_filepath: str):
        self._storage.download(filename, target_filepath)

    def exists(self, filename: str) -> bool:
        return self._storage.exists(filename)

    def delete(self, filename: str):
        self._storage.delete(filename)
        self.invalidate([filename])

    def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
        return self._storage.scan(path, files=files, directories=directories)

    def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
        return self._storage.get_upload_url(filename, expires_in)

    def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
        """Get a presigned download URL, using cache when available.

        Args:
            filename: The file path/key in storage
            expires_in: URL validity duration in seconds (default: 1 hour)

        Returns:
            Presigned URL string
        """
        cache_key = self._cache_key(filename)

        cached = self._get_cached(cache_key)
        if cached:
            return cached

        url = self._storage.get_download_url(filename, expires_in)
        self._set_cached(cache_key, url, expires_in)

        return url

    def get_download_urls(
        self,
        filenames: list[str],
        expires_in: int = 3600,
    ) -> list[str]:
        """Batch get download URLs with cache.

        Args:
            filenames: List of file paths/keys in storage
            expires_in: URL validity duration in seconds (default: 1 hour)

        Returns:
            List of presigned URLs in the same order as filenames
        """
        if not filenames:
            return []

        cache_keys = [self._cache_key(f) for f in filenames]
        cached_values = self._get_cached_batch(cache_keys)

        results: list[str] = []
        for filename, cache_key, cached in zip(filenames, cache_keys, cached_values):
            if cached:
                results.append(cached)
            else:
                url = self._storage.get_download_url(filename, expires_in)
                self._set_cached(cache_key, url, expires_in)
                results.append(url)

        return results

    def invalidate(self, filenames: list[str]) -> None:
        """Invalidate cached URLs for given filenames.

        Args:
            filenames: List of file paths/keys to invalidate
        """
        if not filenames:
            return

        cache_keys = [self._cache_key(f) for f in filenames]
        try:
            self._redis.delete(*cache_keys)
        except Exception:
            logger.warning("Failed to invalidate presign cache", exc_info=True)

    def _cache_key(self, filename: str) -> str:
        """Generate cache key for a filename."""
        return f"{self._cache_key_prefix}:{filename}"

    def _compute_ttl(self, expires_in: int) -> int:
        """Compute cache TTL from presign expiration.

        Returns TTL slightly shorter than presign expiry to ensure
        cached URLs are refreshed before they expire.
        """
        return max(expires_in - self.TTL_BUFFER_SECONDS, self.MIN_TTL_SECONDS)
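    # e.g. expires_in=3600 -> TTL 3540s; expires_in=100 -> floored to MIN_TTL_SECONDS (60s).
    # Note that for expires_in below MIN_TTL_SECONDS the floor exceeds the presign
    # lifetime, so a cached URL may briefly outlive its validity.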
    def _get_cached(self, cache_key: str) -> str | None:
        """Get a single cached URL."""
        try:
            values = self._redis.mget([cache_key])
            cached = values[0] if values else None
            if cached:
                return cached.decode("utf-8") if isinstance(cached, (bytes, bytearray)) else cached
            return None
        except Exception:
            logger.warning("Failed to read presign cache", exc_info=True)
            return None

    def _get_cached_batch(self, cache_keys: list[str]) -> list[str | None]:
        """Get multiple cached URLs."""
        try:
            cached_values = self._redis.mget(cache_keys)
            return [v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else v for v in cached_values]
        except Exception:
            logger.warning("Failed to read presign cache batch", exc_info=True)
            return [None] * len(cache_keys)

    def _set_cached(self, cache_key: str, url: str, expires_in: int) -> None:
        """Store a URL in cache with computed TTL."""
        ttl = self._compute_ttl(expires_in)
        try:
            self._redis.setex(cache_key, ttl, url)
        except Exception:
            logger.warning("Failed to write presign cache", exc_info=True)
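Note: a short sketch of the batch path as the draft initializer above uses it (variable names illustrative):

    storage = AppAssetService.assets_storage()
    urls = storage.get_download_urls(storage_keys, expires_in=600)  # cache hits skip the presign call
    items = list(zip(storage_keys, urls))  # URLs come back in input order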
@@ -77,8 +77,7 @@ class AttrMapTypeError(TypeError):
         self.expected_type = expected_type
         self.actual_type = actual_type
         super().__init__(
-            f"Attribute '{key.name}' expects type '{expected_type.__name__}', "
-            f"got '{actual_type.__name__}'"
+            f"Attribute '{key.name}' expects type '{expected_type.__name__}', got '{actual_type.__name__}'"
         )
@@ -13,12 +13,14 @@ from core.app.entities.app_asset_entities import (
     TreePathConflictError,
 )
 from core.app_assets.builder import AssetBuildPipeline, BuildContext
+from core.app_assets.builder.file_builder import FileBuilder
+from core.app_assets.builder.skill_builder import SkillBuilder
 from core.app_assets.converters import tree_to_asset_items
 from core.app_assets.packager import AssetZipPackager
 from core.app_assets.paths import AssetPaths
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
-from extensions.ext_storage import storage
+from extensions.storage.cached_presign_storage import CachedPresignStorage
 from extensions.storage.file_presign_storage import FilePresignStorage
 from models.app_asset import AppAssets
 from models.model import App
@@ -35,84 +37,22 @@ logger = logging.getLogger(__name__)
 
 class AppAssetService:
     MAX_PREVIEW_CONTENT_SIZE = 5 * 1024 * 1024  # 5MB
-    _PRESIGN_CACHE_TTL_BUFFER_SECONDS = 300
-    _PRESIGN_CACHE_MIN_TTL_SECONDS = 60
     _LOCK_TIMEOUT_SECONDS = 60
+    _DRAFT_CACHE_KEY_PREFIX = "app_asset:draft_download"
 
     @staticmethod
     def _lock(app_id: str):
         return redis_client.lock(f"app_asset:lock:{app_id}", timeout=AppAssetService._LOCK_TIMEOUT_SECONDS)
 
     @staticmethod
-    def _draft_download_cache_key(storage_key: str) -> str:
-        # Cache key for a single draft asset download URL.
-        return f"app_asset:draft_download:{storage_key}"
+    def assets_storage() -> CachedPresignStorage:
+        from extensions.ext_storage import storage
 
-    @staticmethod
-    def _get_cached_download_urls(cache_keys: list[str]) -> list[str | None] | None:
-        # Return cached draft download URLs per asset if available.
-        try:
-            cached = redis_client.mget(cache_keys)
-        except Exception:
-            logger.warning("Failed to read draft download cache", exc_info=True)
-            return None
-
-        return cached
-
-    @staticmethod
-    def _set_cached_download_url(cache_key: str, url: str, expires_in: int) -> None:
-        # Store draft download URL with TTL slightly shorter than presign expiry.
-        ttl = max(
-            expires_in - AppAssetService._PRESIGN_CACHE_TTL_BUFFER_SECONDS,
-            AppAssetService._PRESIGN_CACHE_MIN_TTL_SECONDS,
+        return CachedPresignStorage(
+            storage=FilePresignStorage(storage.storage_runner),
+            redis_client=redis_client,
+            cache_key_prefix=AppAssetService._DRAFT_CACHE_KEY_PREFIX,
         )
-        try:
-            redis_client.setex(cache_key, ttl, url)
-        except Exception:
-            logger.warning("Failed to write draft download cache", exc_info=True)
-
-    @staticmethod
-    def _clear_draft_download_cache(storage_keys: list[str]) -> None:
-        # Clear draft download URL cache for specific assets.
-        if not storage_keys:
-            return
-        cache_keys = [AppAssetService._draft_download_cache_key(key) for key in storage_keys]
-        try:
-            redis_client.delete(*cache_keys)
-        except Exception:
-            logger.warning("Failed to clear draft download cache", exc_info=True)
-
-    @staticmethod
-    def get_cached_draft_download_urls(app_assets: AppAssets, *, expires_in: int = 3600) -> list[tuple[str, str]]:
-        # Build draft download URLs with cache to avoid repeated presign calls.
-        tree = app_assets.asset_tree
-        build_id = app_assets.id
-        presign_storage = FilePresignStorage(storage.storage_runner)
-        nodes = list(tree.walk_files())
-        if not nodes:
-            return []
-        storage_keys = [
-            AssetPaths.build_resolved_file(app_assets.tenant_id, app_assets.app_id, build_id, node.id)
-            if node.extension == "md"
-            else AssetPaths.draft_file(app_assets.tenant_id, app_assets.app_id, node.id)
-            for node in nodes
-        ]
-        cache_keys = [AppAssetService._draft_download_cache_key(key) for key in storage_keys]
-        cached_values = AppAssetService._get_cached_download_urls(cache_keys)
-        if cached_values is None:
-            cached_values = [None] * len(nodes)
-
-        items: list[tuple[str, str]] = []
-        for node, storage_key, cache_key, cached in zip(nodes, storage_keys, cache_keys, cached_values):
-            path = tree.get_path(node.id)
-            if cached:
-                url = cached.decode("utf-8") if isinstance(cached, (bytes, bytearray)) else cached
-            else:
-                url = presign_storage.get_download_url(storage_key, expires_in)
-                AppAssetService._set_cached_download_url(cache_key, url, expires_in)
-            items.append((path, url))
-
-        return items
 
     @staticmethod
     def _draft_storage_key_for_node(tenant_id: str, app_id: str, assets_id: str, node: AppAssetNode) -> str:
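Note: with the factory in place, storage access in AppAssetService funnels through assets_storage(). A representative before/after, taken from the get_file_download_url hunk below:

    # before
    presign_storage = FilePresignStorage(storage.storage_runner)
    return presign_storage.get_download_url(storage_key, expires_in)
    # after
    return AppAssetService.assets_storage().get_download_url(storage_key, expires_in)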
@@ -238,9 +178,8 @@ class AppAssetService:
             raise AppAssetNodeTooLargeError(f"File node {node_id} size exceeded the limit: {max_size_mb} MB")
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        return storage.load_once(storage_key)
+        return AppAssetService.assets_storage().load_once(storage_key)
 
     # FIXME(Mairuis): migrate to presigned upload API
     @staticmethod
     def update_file_content(
         app_model: App,
@@ -259,20 +198,12 @@ class AppAssetService:
             raise AppAssetNodeNotFoundError(str(e)) from e
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        storage.save(storage_key, content)
+        AppAssetService.assets_storage().save(storage_key, content)
 
         assets.asset_tree = tree
         assets.updated_by = account_id
         session.commit()
 
-        cache_key = AppAssetService._draft_storage_key_for_node(
-            app_model.tenant_id,
-            app_model.id,
-            assets.id,
-            node,
-        )
-        AppAssetService._clear_draft_download_cache([cache_key])
-
         return node
 
     @staticmethod
@@ -286,10 +217,6 @@ class AppAssetService:
         with Session(db.engine, expire_on_commit=False) as session:
             assets = AppAssetService.get_or_create_assets(session, app_model, account_id)
             tree = assets.asset_tree
 
-            old_node = tree.get(node_id)
-            old_extension = old_node.extension if old_node else None
-
             try:
                 node = tree.rename(node_id, new_name)
             except TreeNodeNotFoundError as e:
@@ -300,26 +227,6 @@ class AppAssetService:
         assets.asset_tree = tree
         assets.updated_by = account_id
         session.commit()
 
-        if node.node_type == AssetNodeType.FILE:
-            cache_keys: list[str] = []
-            if old_extension is not None:
-                old_storage_key = (
-                    AssetPaths.build_resolved_file(app_model.tenant_id, app_model.id, assets.id, node.id)
-                    if old_extension == "md"
-                    else AssetPaths.draft_file(app_model.tenant_id, app_model.id, node.id)
-                )
-                cache_keys.append(old_storage_key)
-            cache_keys.append(
-                AppAssetService._draft_storage_key_for_node(
-                    app_model.tenant_id,
-                    app_model.id,
-                    assets.id,
-                    node,
-                )
-            )
-            AppAssetService._clear_draft_download_cache(list(set(cache_keys)))
-
         return node
 
     @staticmethod
@@ -347,15 +254,6 @@ class AppAssetService:
         assets.updated_by = account_id
         session.commit()
 
-        if node.node_type == AssetNodeType.FILE:
-            cache_key = AppAssetService._draft_storage_key_for_node(
-                app_model.tenant_id,
-                app_model.id,
-                assets.id,
-                node,
-            )
-            AppAssetService._clear_draft_download_cache([cache_key])
-
         return node
 
     @staticmethod
@@ -388,14 +286,6 @@ class AppAssetService:
             assets = AppAssetService.get_or_create_assets(session, app_model, account_id)
             tree = assets.asset_tree
 
-            target_ids = [node_id] + tree.get_descendant_ids(node_id)
-            target_nodes = [tree.get(nid) for nid in target_ids]
-            cache_keys = [
-                AppAssetService._draft_storage_key_for_node(app_model.tenant_id, app_model.id, assets.id, node)
-                for node in target_nodes
-                if node is not None and node.node_type == AssetNodeType.FILE
-            ]
-
             try:
                 removed_ids = tree.remove(node_id)
             except TreeNodeNotFoundError as e:
@@ -404,7 +294,7 @@ class AppAssetService:
             for nid in removed_ids:
                 storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, nid)
                 try:
-                    storage.delete(storage_key)
+                    AppAssetService.assets_storage().delete(storage_key)
                 except Exception:
                     logger.warning("Failed to delete storage file %s", storage_key, exc_info=True)
 
@@ -412,8 +302,6 @@ class AppAssetService:
             assets.updated_by = account_id
             session.commit()
 
-        AppAssetService._clear_draft_download_cache(cache_keys)
-
     @staticmethod
     def publish(session: Session, app_model: App, account_id: str, workflow_id: str) -> AppAssets:
         tenant_id = app_model.tenant_id
@@ -436,18 +324,20 @@ class AppAssetService:
         session.flush()
 
         ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id)
-        built_assets = AssetBuildPipeline().build_all(tree, ctx)
+        built_assets = AssetBuildPipeline(
+            [SkillBuilder(storage=AppAssetService.assets_storage()), FileBuilder()]
+        ).build_all(tree, ctx)
 
-        packager = AssetZipPackager(storage)
+        packager = AssetZipPackager(AppAssetService.assets_storage())
 
         runtime_zip_bytes = packager.package(built_assets)
         runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id)
-        storage.save(runtime_zip_key, runtime_zip_bytes)
+        AppAssetService.assets_storage().save(runtime_zip_key, runtime_zip_bytes)
 
         source_items = tree_to_asset_items(tree, tenant_id, app_id)
         source_zip_bytes = packager.package(source_items)
         source_zip_key = AssetPaths.build_source_zip(tenant_id, app_id, workflow_id)
-        storage.save(source_zip_key, source_zip_bytes)
+        AppAssetService.assets_storage().save(source_zip_key, source_zip_bytes)
 
         return published
@@ -457,12 +347,14 @@ class AppAssetService:
         tree = assets.asset_tree
 
         ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
-        built_assets = AssetBuildPipeline().build_all(tree, ctx)
+        built_assets = AssetBuildPipeline(
+            [SkillBuilder(storage=AppAssetService.assets_storage()), FileBuilder()]
+        ).build_all(tree, ctx)
 
-        packager = AssetZipPackager(storage)
+        packager = AssetZipPackager(storage=AppAssetService.assets_storage())
         zip_bytes = packager.package(built_assets)
         zip_key = AssetPaths.build_zip(tenant_id, app_id, assets.id)
-        storage.save(zip_key, zip_bytes)
+        AppAssetService.assets_storage().save(zip_key, zip_bytes)
 
     @staticmethod
     def get_file_download_url(
@@ -480,14 +372,13 @@ class AppAssetService:
             raise AppAssetNodeNotFoundError(f"File node {node_id} not found")
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        presign_storage = FilePresignStorage(storage.storage_runner)
-        return presign_storage.get_download_url(storage_key, expires_in)
+        return AppAssetService.assets_storage().get_download_url(storage_key, expires_in)
 
     @staticmethod
     def get_source_zip_bytes(tenant_id: str, app_id: str, workflow_id: str) -> bytes | None:
         source_zip_key = AssetPaths.build_source_zip(tenant_id, app_id, workflow_id)
         try:
-            return storage.load_once(source_zip_key)
+            return AppAssetService.assets_storage().load_once(source_zip_key)
         except Exception:
             logger.warning("Source zip not found: %s", source_zip_key)
             return None
@@ -545,7 +436,7 @@ class AppAssetService:
         session.commit()
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        presign_storage = FilePresignStorage(storage.storage_runner)
+        presign_storage = AppAssetService.assets_storage()
        upload_url = presign_storage.get_upload_url(storage_key, expires_in)
 
         return node, upload_url
@@ -581,12 +472,12 @@ class AppAssetService:
         assets.updated_by = account_id
         session.commit()
 
-        presign_storage = FilePresignStorage(storage.storage_runner)
+        storage = AppAssetService.assets_storage()
 
         def fill_urls(node: BatchUploadNode) -> None:
             if node.node_type == AssetNodeType.FILE and node.id:
                 storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node.id)
-                node.upload_url = presign_storage.get_upload_url(storage_key, expires_in)
+                node.upload_url = storage.get_upload_url(storage_key, expires_in)
             for child in node.children:
                 fill_urls(child)
@@ -0,0 +1,204 @@
from unittest.mock import Mock

import pytest

from extensions.storage.cached_presign_storage import CachedPresignStorage


class TestCachedPresignStorage:
    """Test suite for CachedPresignStorage class."""

    @pytest.fixture
    def mock_storage(self):
        """Create a mock underlying storage."""
        return Mock()

    @pytest.fixture
    def mock_redis(self):
        """Create a mock Redis client."""
        return Mock()

    @pytest.fixture
    def cached_storage(self, mock_storage, mock_redis):
        """Create CachedPresignStorage with mocks."""
        return CachedPresignStorage(
            storage=mock_storage,
            redis_client=mock_redis,
            cache_key_prefix="test_prefix",
        )

    def test_get_download_url_returns_cached_on_hit(self, cached_storage, mock_storage, mock_redis):
        """Test that cached URL is returned when cache hit occurs."""
        mock_redis.mget.return_value = [b"https://cached-url.com/file.txt"]

        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)

        assert result == "https://cached-url.com/file.txt"
        mock_redis.mget.assert_called_once_with(["test_prefix:path/to/file.txt"])
        mock_storage.get_download_url.assert_not_called()
        mock_redis.setex.assert_not_called()

    def test_get_download_url_calls_storage_on_miss(self, cached_storage, mock_storage, mock_redis):
        """Test that storage is called and result cached on cache miss."""
        mock_redis.mget.return_value = [None]
        mock_storage.get_download_url.return_value = "https://new-url.com/file.txt"

        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)

        assert result == "https://new-url.com/file.txt"
        mock_redis.mget.assert_called_once_with(["test_prefix:path/to/file.txt"])
        mock_storage.get_download_url.assert_called_once_with("path/to/file.txt", 3600)
        mock_redis.setex.assert_called_once()
        call_args = mock_redis.setex.call_args
        assert call_args[0][0] == "test_prefix:path/to/file.txt"
        assert call_args[0][2] == "https://new-url.com/file.txt"

    def test_get_download_urls_batch_operation(self, cached_storage, mock_storage, mock_redis):
        """Test batch URL retrieval with mixed cache hits/misses."""
        mock_redis.mget.return_value = [b"https://cached1.com", None, b"https://cached2.com"]
        mock_storage.get_download_url.return_value = "https://new.com"

        filenames = ["file1.txt", "file2.txt", "file3.txt"]
        result = cached_storage.get_download_urls(filenames, expires_in=3600)

        assert result == ["https://cached1.com", "https://new.com", "https://cached2.com"]
        mock_storage.get_download_url.assert_called_once_with("file2.txt", 3600)
        mock_redis.setex.assert_called_once()

    def test_get_download_urls_empty_list(self, cached_storage, mock_storage, mock_redis):
        """Test batch URL retrieval with empty list."""
        result = cached_storage.get_download_urls([], expires_in=3600)

        assert result == []
        mock_redis.mget.assert_not_called()
        mock_storage.get_download_url.assert_not_called()

    def test_invalidate_clears_cache(self, cached_storage, mock_redis):
        """Test that invalidate deletes the correct cache keys."""
        filenames = ["file1.txt", "file2.txt"]
        cached_storage.invalidate(filenames)

        mock_redis.delete.assert_called_once_with(
            "test_prefix:file1.txt",
            "test_prefix:file2.txt",
        )

    def test_invalidate_empty_list(self, cached_storage, mock_redis):
        """Test that invalidate does nothing for empty list."""
        cached_storage.invalidate([])

        mock_redis.delete.assert_not_called()

    def test_ttl_calculation_with_normal_expiry(self, cached_storage):
        """Test TTL is computed correctly for normal expiry values."""
        ttl = cached_storage._compute_ttl(3600)
        expected = 3600 - CachedPresignStorage.TTL_BUFFER_SECONDS
        assert ttl == expected

    def test_ttl_calculation_respects_minimum(self, cached_storage):
        """Test TTL respects minimum value for short expiry times."""
        ttl = cached_storage._compute_ttl(100)
        assert ttl == CachedPresignStorage.MIN_TTL_SECONDS

    def test_ttl_calculation_edge_case(self, cached_storage):
        """Test TTL calculation at the boundary."""
        ttl = cached_storage._compute_ttl(CachedPresignStorage.TTL_BUFFER_SECONDS + 30)
        assert ttl == CachedPresignStorage.MIN_TTL_SECONDS

    def test_graceful_degradation_on_redis_mget_error(self, cached_storage, mock_storage, mock_redis):
        """Test that storage is called when Redis mget fails."""
        mock_redis.mget.side_effect = Exception("Redis connection error")
        mock_storage.get_download_url.return_value = "https://new-url.com/file.txt"

        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)

        assert result == "https://new-url.com/file.txt"
        mock_storage.get_download_url.assert_called_once_with("path/to/file.txt", 3600)

    def test_graceful_degradation_on_redis_setex_error(self, cached_storage, mock_storage, mock_redis):
        """Test that URL is still returned when Redis setex fails."""
        mock_redis.mget.return_value = [None]
        mock_redis.setex.side_effect = Exception("Redis connection error")
        mock_storage.get_download_url.return_value = "https://new-url.com/file.txt"

        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)

        assert result == "https://new-url.com/file.txt"

    def test_graceful_degradation_on_redis_delete_error(self, cached_storage, mock_redis):
        """Test that invalidate doesn't raise when Redis delete fails."""
        mock_redis.delete.side_effect = Exception("Redis connection error")

        cached_storage.invalidate(["file.txt"])

    def test_delegates_save_to_storage(self, cached_storage, mock_storage):
        """Test that save delegates to underlying storage."""
        cached_storage.save("file.txt", b"data")
        mock_storage.save.assert_called_once_with("file.txt", b"data")

    def test_delegates_load_once_to_storage(self, cached_storage, mock_storage):
        """Test that load_once delegates to underlying storage."""
        mock_storage.load_once.return_value = b"content"
        result = cached_storage.load_once("file.txt")
        assert result == b"content"
        mock_storage.load_once.assert_called_once_with("file.txt")

    def test_delegates_exists_to_storage(self, cached_storage, mock_storage):
        """Test that exists delegates to underlying storage."""
        mock_storage.exists.return_value = True
        result = cached_storage.exists("file.txt")
        assert result is True
        mock_storage.exists.assert_called_once_with("file.txt")

    def test_delete_delegates_and_invalidates_cache(self, cached_storage, mock_storage, mock_redis):
        """Test that delete delegates to storage and invalidates cache."""
        cached_storage.delete("file.txt")

        mock_storage.delete.assert_called_once_with("file.txt")
        mock_redis.delete.assert_called_once_with("test_prefix:file.txt")

    def test_delegates_scan_to_storage(self, cached_storage, mock_storage):
        """Test that scan delegates to underlying storage."""
        mock_storage.scan.return_value = ["file1.txt", "file2.txt"]
        result = cached_storage.scan("path/", files=True, directories=False)
        assert result == ["file1.txt", "file2.txt"]
        mock_storage.scan.assert_called_once_with("path/", files=True, directories=False)

    def test_delegates_get_upload_url_to_storage(self, cached_storage, mock_storage):
        """Test that get_upload_url delegates to underlying storage."""
        mock_storage.get_upload_url.return_value = "https://upload-url.com"
        result = cached_storage.get_upload_url("file.txt", expires_in=3600)
        assert result == "https://upload-url.com"
        mock_storage.get_upload_url.assert_called_once_with("file.txt", 3600)

    def test_cache_key_generation(self, cached_storage):
        """Test cache key is generated correctly."""
        key = cached_storage._cache_key("path/to/file.txt")
        assert key == "test_prefix:path/to/file.txt"

    def test_cached_value_decoded_from_bytes(self, cached_storage, mock_storage, mock_redis):
        """Test that bytes cached values are decoded to strings."""
        mock_redis.mget.return_value = [b"https://cached-url.com"]

        result = cached_storage.get_download_url("file.txt")

        assert result == "https://cached-url.com"
        assert isinstance(result, str)

    def test_cached_value_decoded_from_bytearray(self, cached_storage, mock_storage, mock_redis):
        """Test that bytearray cached values are decoded to strings."""
        mock_redis.mget.return_value = [bytearray(b"https://cached-url.com")]

        result = cached_storage.get_download_url("file.txt")

        assert result == "https://cached-url.com"
        assert isinstance(result, str)

    def test_default_cache_key_prefix(self, mock_storage, mock_redis):
        """Test default cache key prefix is used when not specified."""
        storage = CachedPresignStorage(
            storage=mock_storage,
            redis_client=mock_redis,
        )
        key = storage._cache_key("file.txt")
        assert key == "presign_cache:file.txt"