mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 01:18:05 +08:00
feat: add DB inline content cache for app asset draft files
Introduce app_asset_contents table as a read-through cache over S3 for text-like asset files (e.g. .md skill documents). This eliminates N individual S3 fetches during SkillBuilder builds — bulk_load pulls all content in a single SQL query with S3 fallback on miss. Key components: - CachedContentAccessor: DB-first read / dual-write / S3 fallback - AssetContentService: static DB operations (get, get_many, upsert, delete) - should_mirror(): single source of truth for extension-based policy - Alembic migration for app_asset_contents table Modified callers: - SkillBuilder uses accessor.bulk_load() instead of per-node S3 reads - AppAssetService.get/update_file_content route through accessor - delete_node cleans both DB cache and S3 - draft_app_assets_initializer uses should_mirror() instead of hardcoded .md
This commit is contained in:
@ -158,8 +158,10 @@ class AppAssetPackageService:
|
||||
session.flush()
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
accessor = AppAssetService.get_accessor(tenant_id, app_id)
|
||||
pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()])
|
||||
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id)
|
||||
built_assets = AssetBuildPipeline([SkillBuilder(storage=asset_storage), FileBuilder()]).build_all(tree, ctx)
|
||||
built_assets = pipeline.build_all(tree, ctx)
|
||||
|
||||
runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id)
|
||||
runtime_upload_url = asset_storage.get_upload_url(runtime_zip_key)
|
||||
@ -194,10 +196,10 @@ class AppAssetPackageService:
|
||||
tree = assets.asset_tree
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
accessor = AppAssetService.get_accessor(tenant_id, app_id)
|
||||
pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()])
|
||||
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
|
||||
built_assets: list[AssetItem] = AssetBuildPipeline(
|
||||
[SkillBuilder(storage=asset_storage), FileBuilder()]
|
||||
).build_all(tree, ctx)
|
||||
built_assets: list[AssetItem] = pipeline.build_all(tree, ctx)
|
||||
|
||||
user_id = getattr(assets, "updated_by", None) or getattr(assets, "created_by", None) or "system"
|
||||
key = AssetPaths.build_zip(tenant_id, app_id, assets.id)
|
||||
|
||||
@ -13,6 +13,7 @@ from core.app.entities.app_asset_entities import (
|
||||
TreeParentNotFoundError,
|
||||
TreePathConflictError,
|
||||
)
|
||||
from core.app_assets.accessor import CachedContentAccessor
|
||||
from core.app_assets.entities.assets import AssetItem
|
||||
from core.app_assets.storage import AssetPaths
|
||||
from extensions.ext_database import db
|
||||
@ -22,6 +23,7 @@ from extensions.storage.cached_presign_storage import CachedPresignStorage
|
||||
from extensions.storage.file_presign_storage import FilePresignStorage
|
||||
from models.app_asset import AppAssets
|
||||
from models.model import App
|
||||
from services.asset_content_service import AssetContentService
|
||||
|
||||
from .errors.app_asset import (
|
||||
AppAssetNodeNotFoundError,
|
||||
@ -207,6 +209,11 @@ class AppAssetService:
|
||||
|
||||
return node
|
||||
|
||||
@staticmethod
|
||||
def get_accessor(tenant_id: str, app_id: str) -> CachedContentAccessor:
|
||||
"""Get a content accessor with DB caching for the given app."""
|
||||
return CachedContentAccessor(AppAssetService.get_storage(), tenant_id, app_id)
|
||||
|
||||
@staticmethod
|
||||
def get_file_content(app_model: App, account_id: str, node_id: str) -> bytes:
|
||||
with Session(db.engine) as session:
|
||||
@ -221,9 +228,8 @@ class AppAssetService:
|
||||
max_size_mb = AppAssetService.MAX_PREVIEW_CONTENT_SIZE / 1024 / 1024
|
||||
raise AppAssetNodeTooLargeError(f"File node {node_id} size exceeded the limit: {max_size_mb} MB")
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
return asset_storage.load_once(key)
|
||||
accessor = AppAssetService.get_accessor(app_model.tenant_id, app_model.id)
|
||||
return accessor.load(node)
|
||||
|
||||
@staticmethod
|
||||
def update_file_content(
|
||||
@ -242,9 +248,8 @@ class AppAssetService:
|
||||
except TreeNodeNotFoundError as e:
|
||||
raise AppAssetNodeNotFoundError(str(e)) from e
|
||||
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id)
|
||||
asset_storage.save(key, content)
|
||||
accessor = AppAssetService.get_accessor(app_model.tenant_id, app_model.id)
|
||||
accessor.save(node, content)
|
||||
|
||||
assets.asset_tree = tree
|
||||
assets.updated_by = account_id
|
||||
@ -340,8 +345,9 @@ class AppAssetService:
|
||||
assets.updated_by = account_id
|
||||
session.commit()
|
||||
|
||||
# FIXME(Mairuis): sync deletion queue, failed is fine
|
||||
def _delete_file_from_storage(tenant_id: str, app_id: str, node_ids: list[str]) -> None:
|
||||
# Delete from both DB cache and S3 in background; failures are non-fatal.
|
||||
def _delete_files(tenant_id: str, app_id: str, node_ids: list[str]) -> None:
|
||||
AssetContentService.delete_many(tenant_id, app_id, node_ids)
|
||||
asset_storage = AppAssetService.get_storage()
|
||||
for nid in node_ids:
|
||||
key = AssetPaths.draft(tenant_id, app_id, nid)
|
||||
@ -350,9 +356,7 @@ class AppAssetService:
|
||||
except Exception:
|
||||
logger.warning("Failed to delete storage file %s", key, exc_info=True)
|
||||
|
||||
threading.Thread(
|
||||
target=lambda: _delete_file_from_storage(app_model.tenant_id, app_model.id, removed_ids)
|
||||
).start()
|
||||
threading.Thread(target=lambda: _delete_files(app_model.tenant_id, app_model.id, removed_ids)).start()
|
||||
|
||||
@staticmethod
|
||||
def get_file_download_url(
|
||||
@ -469,17 +473,13 @@ class AppAssetService:
|
||||
tree = assets.asset_tree
|
||||
|
||||
taken_by_parent: dict[str | None, set[str]] = {}
|
||||
stack: list[tuple[BatchUploadNode, str | None]] = [
|
||||
(child, None) for child in reversed(input_children)
|
||||
]
|
||||
stack: list[tuple[BatchUploadNode, str | None]] = [(child, None) for child in reversed(input_children)]
|
||||
while stack:
|
||||
node, parent_id = stack.pop()
|
||||
if node.id is None:
|
||||
node.id = str(uuid4())
|
||||
if parent_id not in taken_by_parent:
|
||||
taken_by_parent[parent_id] = {
|
||||
child.name for child in tree.get_children(parent_id)
|
||||
}
|
||||
taken_by_parent[parent_id] = {child.name for child in tree.get_children(parent_id)}
|
||||
taken = taken_by_parent[parent_id]
|
||||
unique_name = tree.ensure_unique_name(
|
||||
parent_id,
|
||||
|
||||
103
api/services/asset_content_service.py
Normal file
103
api/services/asset_content_service.py
Normal file
@ -0,0 +1,103 @@
|
||||
"""Service for the app_asset_contents table.
|
||||
|
||||
Provides single-node and batch DB operations for the inline content cache.
|
||||
All methods are static and open their own short-lived sessions.
|
||||
|
||||
Collaborators:
|
||||
- models.app_asset.AppAssetContent (SQLAlchemy model)
|
||||
- core.app_assets.accessor (accessor abstraction that calls this service)
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from extensions.ext_database import db
|
||||
from models.app_asset import AppAssetContent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AssetContentService:
    """Static DB-layer operations for the inline asset content cache.

    Every query is scoped by (tenant_id, app_id); each call opens its own
    short-lived session against ``db.engine`` and commits before returning.
    """

    @staticmethod
    def _scope(tenant_id: str, app_id: str):
        """Return the tenant/app scoping predicates shared by all queries."""
        return (
            AppAssetContent.tenant_id == tenant_id,
            AppAssetContent.app_id == app_id,
        )

    @staticmethod
    def get(tenant_id: str, app_id: str, node_id: str) -> str | None:
        """Fetch cached content for one node; ``None`` when not cached."""
        stmt = select(AppAssetContent.content).where(
            *AssetContentService._scope(tenant_id, app_id),
            AppAssetContent.node_id == node_id,
        )
        with Session(db.engine) as session:
            return session.execute(stmt).scalar_one_or_none()

    @staticmethod
    def get_many(tenant_id: str, app_id: str, node_ids: list[str]) -> dict[str, str]:
        """Batch fetch; the result maps node_id -> content for cache hits only."""
        if not node_ids:
            return {}
        stmt = select(AppAssetContent.node_id, AppAssetContent.content).where(
            *AssetContentService._scope(tenant_id, app_id),
            AppAssetContent.node_id.in_(node_ids),
        )
        with Session(db.engine) as session:
            hits = session.execute(stmt).all()
        return {hit.node_id: hit.content for hit in hits}

    @staticmethod
    def upsert(tenant_id: str, app_id: str, node_id: str, content: str, size: int) -> None:
        """Insert content for a node, or overwrite it when the row exists.

        Relies on the ``uq_asset_content_node`` unique constraint to detect
        the conflict; on conflict only ``content`` and ``size`` are rewritten.
        """
        insert_stmt = pg_insert(AppAssetContent).values(
            tenant_id=tenant_id,
            app_id=app_id,
            node_id=node_id,
            content=content,
            size=size,
        )
        upsert_stmt = insert_stmt.on_conflict_do_update(
            constraint="uq_asset_content_node",
            set_={
                "content": insert_stmt.excluded.content,
                "size": insert_stmt.excluded.size,
            },
        )
        with Session(db.engine) as session:
            session.execute(upsert_stmt)
            session.commit()

    @staticmethod
    def delete(tenant_id: str, app_id: str, node_id: str) -> None:
        """Drop the cached content row for a single node, if present."""
        stmt = delete(AppAssetContent).where(
            *AssetContentService._scope(tenant_id, app_id),
            AppAssetContent.node_id == node_id,
        )
        with Session(db.engine) as session:
            session.execute(stmt)
            session.commit()

    @staticmethod
    def delete_many(tenant_id: str, app_id: str, node_ids: list[str]) -> None:
        """Drop cached content rows for all of ``node_ids`` in one statement."""
        if not node_ids:
            return
        stmt = delete(AppAssetContent).where(
            *AssetContentService._scope(tenant_id, app_id),
            AppAssetContent.node_id.in_(node_ids),
        )
        with Session(db.engine) as session:
            session.execute(stmt)
            session.commit()
|
||||
Reference in New Issue
Block a user