mirror of
https://github.com/langgenius/dify.git
synced 2026-02-05 03:05:32 +08:00
Compare commits
1 Commits
feat/auto-
...
fix/consol
| Author | SHA1 | Date | |
|---|---|---|---|
| ce644e1549 |
@ -10,7 +10,7 @@ import services
|
||||
from controllers.common.fields import Parameters as ParametersResponse
|
||||
from controllers.common.fields import Site as SiteResponse
|
||||
from controllers.common.schema import get_or_create_model
|
||||
from controllers.console import api
|
||||
from controllers.console import api, console_ns
|
||||
from controllers.console.app.error import (
|
||||
AppUnavailableError,
|
||||
AudioTooLargeError,
|
||||
|
||||
@ -6,7 +6,7 @@ from yarl import URL
|
||||
|
||||
from configs import dify_config
|
||||
from core.helper.download import download_with_size_limit
|
||||
from core.plugin.entities.marketplace import MarketplacePluginDeclaration, MarketplacePluginSnapshot
|
||||
from core.plugin.entities.marketplace import MarketplacePluginDeclaration
|
||||
|
||||
marketplace_api_url = URL(str(dify_config.MARKETPLACE_API_URL))
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -45,17 +45,17 @@ def batch_fetch_plugin_by_ids(plugin_ids: list[str]) -> list[dict]:
|
||||
|
||||
def batch_fetch_plugin_manifests_ignore_deserialization_error(
|
||||
plugin_ids: list[str],
|
||||
) -> Sequence[MarketplacePluginSnapshot]:
|
||||
) -> Sequence[MarketplacePluginDeclaration]:
|
||||
if len(plugin_ids) == 0:
|
||||
return []
|
||||
|
||||
url = str(marketplace_api_url / "api/v1/plugins/batch")
|
||||
response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version})
|
||||
response.raise_for_status()
|
||||
result: list[MarketplacePluginSnapshot] = []
|
||||
result: list[MarketplacePluginDeclaration] = []
|
||||
for plugin in response.json()["data"]["plugins"]:
|
||||
try:
|
||||
result.append(MarketplacePluginSnapshot.model_validate(plugin))
|
||||
result.append(MarketplacePluginDeclaration.model_validate(plugin))
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to deserialize marketplace plugin manifest for %s", plugin.get("plugin_id", "unknown")
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from pydantic import BaseModel, Field, computed_field, model_validator
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
from core.model_runtime.entities.provider_entities import ProviderEntity
|
||||
from core.plugin.entities.endpoint import EndpointProviderDeclaration
|
||||
@ -48,16 +48,3 @@ class MarketplacePluginDeclaration(BaseModel):
|
||||
if "tool" in data and not data["tool"]:
|
||||
del data["tool"]
|
||||
return data
|
||||
|
||||
|
||||
class MarketplacePluginSnapshot(BaseModel):
|
||||
org: str
|
||||
name: str
|
||||
latest_version: str
|
||||
latest_package_identifier: str
|
||||
latest_package_url: str
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def plugin_id(self) -> str:
|
||||
return self.latest_package_identifier.split(":")[0]
|
||||
|
||||
@ -7,7 +7,6 @@ import app
|
||||
from extensions.ext_database import db
|
||||
from models.account import TenantPluginAutoUpgradeStrategy
|
||||
from tasks import process_tenant_plugin_autoupgrade_check_task as check_task
|
||||
from tasks.process_tenant_plugin_autoupgrade_check_task import fetch_global_plugin_manifest
|
||||
|
||||
AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes
|
||||
MAX_CONCURRENT_CHECK_TASKS = 20
|
||||
@ -41,8 +40,6 @@ def check_upgradable_plugin_task():
|
||||
) # make sure all strategies are checked in this interval
|
||||
batch_interval_time = (AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL / batch_chunk_count) if batch_chunk_count > 0 else 0
|
||||
|
||||
fetch_global_plugin_manifest()
|
||||
|
||||
for i in range(0, total_strategies, MAX_CONCURRENT_CHECK_TASKS):
|
||||
batch_strategies = strategies[i : i + MAX_CONCURRENT_CHECK_TASKS]
|
||||
for strategy in batch_strategies:
|
||||
|
||||
@ -4,13 +4,10 @@ import operator
|
||||
import typing
|
||||
|
||||
import click
|
||||
import httpx
|
||||
from celery import shared_task
|
||||
|
||||
from configs import dify_config
|
||||
from core.helper import marketplace
|
||||
from core.helper.marketplace import marketplace_api_url
|
||||
from core.plugin.entities.marketplace import MarketplacePluginSnapshot
|
||||
from core.helper.marketplace import MarketplacePluginDeclaration
|
||||
from core.plugin.entities.plugin import PluginInstallationSource
|
||||
from core.plugin.impl.plugin import PluginInstaller
|
||||
from extensions.ext_redis import redis_client
|
||||
@ -28,11 +25,11 @@ def _get_redis_cache_key(plugin_id: str) -> str:
|
||||
return f"{CACHE_REDIS_KEY_PREFIX}{plugin_id}"
|
||||
|
||||
|
||||
def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginSnapshot, None, bool]:
|
||||
def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginDeclaration, None, bool]:
|
||||
"""
|
||||
Get cached plugin manifest from Redis.
|
||||
Returns:
|
||||
- MarketplacePluginSnapshot: if found in cache
|
||||
- MarketplacePluginDeclaration: if found in cache
|
||||
- None: if cached as not found (marketplace returned no result)
|
||||
- False: if not in cache at all
|
||||
"""
|
||||
@ -46,13 +43,13 @@ def _get_cached_manifest(plugin_id: str) -> typing.Union[MarketplacePluginSnapsh
|
||||
if cached_json is None:
|
||||
return None
|
||||
|
||||
return MarketplacePluginSnapshot.model_validate(cached_json)
|
||||
return MarketplacePluginDeclaration.model_validate(cached_json)
|
||||
except Exception:
|
||||
logger.exception("Failed to get cached manifest for plugin %s", plugin_id)
|
||||
return False
|
||||
|
||||
|
||||
def _set_cached_manifest(plugin_id: str, manifest: typing.Union[MarketplacePluginSnapshot, None]) -> None:
|
||||
def _set_cached_manifest(plugin_id: str, manifest: typing.Union[MarketplacePluginDeclaration, None]) -> None:
|
||||
"""
|
||||
Cache plugin manifest in Redis.
|
||||
Args:
|
||||
@ -75,9 +72,9 @@ def _set_cached_manifest(plugin_id: str, manifest: typing.Union[MarketplacePlugi
|
||||
|
||||
def marketplace_batch_fetch_plugin_manifests(
|
||||
plugin_ids_plain_list: list[str],
|
||||
) -> list[MarketplacePluginSnapshot]:
|
||||
) -> list[MarketplacePluginDeclaration]:
|
||||
"""Fetch plugin manifests with Redis caching support."""
|
||||
cached_manifests: dict[str, typing.Union[MarketplacePluginSnapshot, None]] = {}
|
||||
cached_manifests: dict[str, typing.Union[MarketplacePluginDeclaration, None]] = {}
|
||||
not_cached_plugin_ids: list[str] = []
|
||||
|
||||
# Check Redis cache for each plugin
|
||||
@ -111,29 +108,15 @@ def marketplace_batch_fetch_plugin_manifests(
|
||||
_set_cached_manifest(plugin_id, None)
|
||||
|
||||
# Build result list from cached manifests
|
||||
result: list[MarketplacePluginSnapshot] = []
|
||||
result: list[MarketplacePluginDeclaration] = []
|
||||
for plugin_id in plugin_ids_plain_list:
|
||||
cached_manifest: typing.Union[MarketplacePluginSnapshot, None] = cached_manifests.get(plugin_id)
|
||||
cached_manifest: typing.Union[MarketplacePluginDeclaration, None] = cached_manifests.get(plugin_id)
|
||||
if cached_manifest is not None:
|
||||
result.append(cached_manifest)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def fetch_global_plugin_manifest():
|
||||
url = str(marketplace_api_url / "api/v1/dist/plugins/manifest.json")
|
||||
response = httpx.get(url, headers={"X-Dify-Version": dify_config.project.version}, timeout=30)
|
||||
response.raise_for_status()
|
||||
raw_json = response.json()
|
||||
plugin_snapshots = [MarketplacePluginSnapshot.model_validate(plugin) for plugin in raw_json["plugins"]]
|
||||
for plugin_snapshot in plugin_snapshots:
|
||||
redis_client.setex(
|
||||
name=f"{CACHE_REDIS_KEY_PREFIX}{plugin_snapshot.plugin_id}",
|
||||
time=CACHE_REDIS_TTL,
|
||||
value=plugin_snapshot.model_dump_json(),
|
||||
)
|
||||
|
||||
|
||||
@shared_task(queue="plugin")
|
||||
def process_tenant_plugin_autoupgrade_check_task(
|
||||
tenant_id: str,
|
||||
|
||||
Reference in New Issue
Block a user