mirror of
https://github.com/langgenius/dify.git
synced 2026-05-01 16:08:04 +08:00
feat(skill-builder): enhance skill loading and compilation with parallel processing
- Introduced threading for loading skills and uploading compiled content to improve performance.
- Added data classes for better structure and clarity in handling loaded and compiled skills.
- Refactored the skill compilation process to separate loading and uploading, enhancing maintainability.
This commit is contained in:
@ -1,4 +1,6 @@
|
||||
import json
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass
|
||||
|
||||
from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
|
||||
from core.app_assets.entities import AssetItem, FileAsset
|
||||
@ -11,11 +13,29 @@ from extensions.ext_storage import storage
|
||||
from .base import BuildContext
|
||||
|
||||
|
||||
@dataclass
|
||||
class _LoadedSkill:
|
||||
node: AppAssetNode
|
||||
path: str
|
||||
content: str
|
||||
metadata: dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class _CompiledSkill:
|
||||
node: AppAssetNode
|
||||
path: str
|
||||
resolved_key: str
|
||||
content_bytes: bytes
|
||||
|
||||
|
||||
class SkillBuilder:
|
||||
_nodes: list[tuple[AppAssetNode, str]]
|
||||
_max_workers: int
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, max_workers: int = 8) -> None:
|
||||
self._nodes = []
|
||||
self._max_workers = max_workers
|
||||
|
||||
def accept(self, node: AppAssetNode) -> bool:
|
||||
return node.extension == "md"
|
||||
@ -27,9 +47,56 @@ class SkillBuilder:
|
||||
if not self._nodes:
|
||||
return []
|
||||
|
||||
# 1. Load and create documents
|
||||
documents: list[SkillDocument] = []
|
||||
for node, _ in self._nodes:
|
||||
# 1. Load all skills (parallel IO)
|
||||
loaded = self._load_all(ctx)
|
||||
|
||||
# 2. Compile all skills (CPU-bound, single thread)
|
||||
documents = [
|
||||
SkillDocument(skill_id=s.node.id, content=s.content, metadata=s.metadata)
|
||||
for s in loaded
|
||||
]
|
||||
artifact_set = SkillCompiler().compile_all(documents, tree, ctx.build_id)
|
||||
|
||||
# 3. Save tool artifact
|
||||
SkillManager.save_tool_artifact(
|
||||
ctx.tenant_id, ctx.app_id, ctx.build_id, artifact_set.get_tool_artifact()
|
||||
)
|
||||
|
||||
# 4. Prepare compiled skills for upload
|
||||
to_upload: list[_CompiledSkill] = []
|
||||
for skill in loaded:
|
||||
artifact = artifact_set.get(skill.node.id)
|
||||
if artifact is None:
|
||||
continue
|
||||
resolved_key = AssetPaths.build_resolved_file(
|
||||
ctx.tenant_id, ctx.app_id, ctx.build_id, skill.node.id
|
||||
)
|
||||
to_upload.append(
|
||||
_CompiledSkill(
|
||||
node=skill.node,
|
||||
path=skill.path,
|
||||
resolved_key=resolved_key,
|
||||
content_bytes=artifact.content.encode("utf-8"),
|
||||
)
|
||||
)
|
||||
|
||||
# 5. Upload all compiled skills (parallel IO)
|
||||
self._upload_all(to_upload)
|
||||
|
||||
# 6. Return FileAssets
|
||||
return [
|
||||
FileAsset(
|
||||
asset_id=s.node.id,
|
||||
path=s.path,
|
||||
file_name=s.node.name,
|
||||
extension=s.node.extension or "",
|
||||
storage_key=s.resolved_key,
|
||||
)
|
||||
for s in to_upload
|
||||
]
|
||||
|
||||
def _load_all(self, ctx: BuildContext) -> list[_LoadedSkill]:
|
||||
def load_one(node: AppAssetNode, path: str) -> _LoadedSkill:
|
||||
draft_key = AssetPaths.draft_file(ctx.tenant_id, ctx.app_id, node.id)
|
||||
try:
|
||||
data = json.loads(storage.load_once(draft_key))
|
||||
@ -38,48 +105,17 @@ class SkillBuilder:
|
||||
except Exception:
|
||||
content = ""
|
||||
metadata = {}
|
||||
return _LoadedSkill(node=node, path=path, content=content, metadata=metadata)
|
||||
|
||||
documents.append(
|
||||
SkillDocument(
|
||||
skill_id=node.id,
|
||||
content=content,
|
||||
metadata=metadata,
|
||||
)
|
||||
)
|
||||
with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
|
||||
futures = [executor.submit(load_one, node, path) for node, path in self._nodes]
|
||||
return [f.result() for f in futures]
|
||||
|
||||
# 2. Compile all skills
|
||||
compiler = SkillCompiler()
|
||||
artifact_set = compiler.compile_all(documents, tree, ctx.build_id)
|
||||
def _upload_all(self, skills: list[_CompiledSkill]) -> None:
|
||||
def upload_one(skill: _CompiledSkill) -> None:
|
||||
storage.save(skill.resolved_key, skill.content_bytes)
|
||||
|
||||
# 3. Save tool artifact
|
||||
SkillManager.save_tool_artifact(
|
||||
ctx.tenant_id,
|
||||
ctx.app_id,
|
||||
ctx.build_id,
|
||||
artifact_set.get_tool_artifact(),
|
||||
)
|
||||
|
||||
# 4. Save compiled content to storage and return FileAssets
|
||||
results: list[AssetItem] = []
|
||||
for node, path in self._nodes:
|
||||
artifact = artifact_set.get(node.id)
|
||||
if artifact is None:
|
||||
continue
|
||||
|
||||
# Write compiled content to storage
|
||||
resolved_key = AssetPaths.build_resolved_file(
|
||||
ctx.tenant_id, ctx.app_id, ctx.build_id, node.id
|
||||
)
|
||||
storage.save(resolved_key, artifact.content.encode("utf-8"))
|
||||
|
||||
results.append(
|
||||
FileAsset(
|
||||
asset_id=node.id,
|
||||
path=path,
|
||||
file_name=node.name,
|
||||
extension=node.extension or "",
|
||||
storage_key=resolved_key,
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
|
||||
futures = [executor.submit(upload_one, skill) for skill in skills]
|
||||
for f in futures:
|
||||
f.result()
|
||||
|
||||
Reference in New Issue
Block a user