Files
dify/api/tests/unit_tests/services/plugin/test_plugin_service.py
2026-05-23 09:12:09 +00:00

435 lines
20 KiB
Python

import datetime
import uuid
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
from pydantic import TypeAdapter
from redis import RedisError
from core.plugin.entities.plugin_daemon import PluginInstallTask, PluginInstallTaskStatus, PluginModelProviderEntity
from graphon.model_runtime.entities.common_entities import I18nObject
from graphon.model_runtime.entities.provider_entities import ConfigurateMethod, ProviderEntity
MODULE = "core.plugin.plugin_service"
class _FakeSession:
def __init__(self) -> None:
self.execute = Mock()
self.scalars = Mock(return_value=SimpleNamespace(all=Mock(return_value=[])))
def __enter__(self) -> "_FakeSession":
return self
def __exit__(self, exc_type, exc, traceback) -> None:
return None
def begin(self) -> "_FakeSession":
return self
def _build_provider_entity(provider: str = "openai") -> ProviderEntity:
return ProviderEntity(
provider=f"langgenius/{provider}/{provider}",
label=I18nObject(en_US=provider.title()),
supported_model_types=[],
configurate_methods=[ConfigurateMethod.PREDEFINED_MODEL],
)
def _build_plugin_model_provider(*, tenant_id: str = "tenant-1", provider: str = "openai") -> PluginModelProviderEntity:
return PluginModelProviderEntity(
id=uuid.uuid4().hex,
created_at=datetime.datetime.now(),
updated_at=datetime.datetime.now(),
provider=provider,
tenant_id=tenant_id,
plugin_unique_identifier=f"langgenius/{provider}/{provider}",
plugin_id=f"langgenius/{provider}",
declaration=ProviderEntity(
provider=provider,
label=I18nObject(en_US=provider.title()),
supported_model_types=[],
configurate_methods=[ConfigurateMethod.PREDEFINED_MODEL],
),
)
def _build_install_task(*, task_id: str = "task-1", status: PluginInstallTaskStatus) -> PluginInstallTask:
now = datetime.datetime.now()
return PluginInstallTask(
id=task_id,
created_at=now,
updated_at=now,
status=status,
total_plugins=1,
completed_plugins=1 if status != PluginInstallTaskStatus.Pending else 0,
plugins=[],
)
class TestFetchLatestPluginVersion:
def test_skips_marketplace_fetch_when_disabled(self) -> None:
"""Cache misses stay None; marketplace is never called when disabled."""
with (
patch(f"{MODULE}.dify_config") as mock_cfg,
patch(f"{MODULE}.redis_client") as mock_redis,
patch(f"{MODULE}.marketplace") as mock_marketplace,
):
mock_cfg.MARKETPLACE_ENABLED = False
mock_redis.get.return_value = None # all cache misses
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_latest_plugin_version(["langgenius/openai", "langgenius/anthropic"])
mock_marketplace.batch_fetch_plugin_manifests.assert_not_called()
assert result == {"langgenius/openai": None, "langgenius/anthropic": None}
def test_calls_marketplace_fetch_when_enabled(self) -> None:
"""Cache misses trigger marketplace fetch when enabled."""
manifest = MagicMock()
manifest.plugin_id = "langgenius/openai"
manifest.latest_version = "1.0.0"
manifest.latest_package_identifier = "langgenius/openai:1.0.0@abc"
manifest.status = "active"
manifest.deprecated_reason = ""
manifest.alternative_plugin_id = ""
with (
patch(f"{MODULE}.dify_config") as mock_cfg,
patch(f"{MODULE}.redis_client") as mock_redis,
patch(f"{MODULE}.marketplace") as mock_marketplace,
):
mock_cfg.MARKETPLACE_ENABLED = True
mock_redis.get.return_value = None
mock_marketplace.batch_fetch_plugin_manifests.return_value = [manifest]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_latest_plugin_version(["langgenius/openai"])
# The list arg is mutated by remove() after the call, so check call count + result.
mock_marketplace.batch_fetch_plugin_manifests.assert_called_once()
assert result["langgenius/openai"] is not None
assert result["langgenius/openai"].version == "1.0.0"
class TestPluginModelProviderCache:
def test_fetch_plugin_model_providers_returns_cached_provider_without_calling_daemon(self) -> None:
"""A valid tenant cache entry is reused across runtime calls without plugin daemon access."""
cached_provider = _build_provider_entity()
cached_payload = TypeAdapter(list[ProviderEntity]).dump_json([cached_provider]).decode("utf-8")
with patch(f"{MODULE}.redis_client") as redis_client:
redis_client.get.return_value = cached_payload
from core.plugin.plugin_service import PluginService
client = Mock()
result = PluginService.fetch_plugin_model_providers(tenant_id="tenant-1", client=client)
assert [provider.provider for provider in result] == ["langgenius/openai/openai"]
client.fetch_model_providers.assert_not_called()
redis_client.setex.assert_not_called()
def test_fetch_plugin_model_providers_deletes_invalid_cache_and_refetches(self) -> None:
"""Invalid cache payloads are tenant-scoped invalidated before falling back to the daemon."""
with (
patch(f"{MODULE}.redis_client") as redis_client,
patch(f"{MODULE}.dify_config") as mock_config,
):
redis_client.get.return_value = "not-json"
mock_config.PLUGIN_MODEL_PROVIDERS_CACHE_TTL = 86400
client = Mock()
client.fetch_model_providers.return_value = [_build_plugin_model_provider()]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_plugin_model_providers(tenant_id="tenant-1", client=client)
cache_key = "plugin_model_providers:tenant_id:tenant-1"
redis_client.delete.assert_called_once_with(cache_key)
redis_client.setex.assert_called_once()
assert redis_client.setex.call_args.args[0] == cache_key
assert redis_client.setex.call_args.args[1] == 86400
assert [provider.provider for provider in result] == ["langgenius/openai/openai"]
def test_fetch_plugin_model_providers_refetches_when_cache_read_fails(self) -> None:
"""Redis read failures do not block provider discovery for the tenant."""
with patch(f"{MODULE}.redis_client") as redis_client:
redis_client.get.side_effect = RedisError("redis unavailable")
client = Mock()
client.fetch_model_providers.return_value = [_build_plugin_model_provider()]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_plugin_model_providers(tenant_id="tenant-1", client=client)
client.fetch_model_providers.assert_called_once_with("tenant-1")
assert [provider.provider for provider in result] == ["langgenius/openai/openai"]
def test_fetch_plugin_model_providers_returns_fresh_result_when_cache_write_fails(self) -> None:
"""Redis write failures are non-fatal after fresh provider data has been fetched."""
with patch(f"{MODULE}.redis_client") as redis_client:
redis_client.get.return_value = None
redis_client.setex.side_effect = RedisError("redis unavailable")
client = Mock()
client.fetch_model_providers.return_value = [_build_plugin_model_provider()]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_plugin_model_providers(tenant_id="tenant-1", client=client)
client.fetch_model_providers.assert_called_once_with("tenant-1")
assert [provider.provider for provider in result] == ["langgenius/openai/openai"]
def test_fetch_plugin_model_providers_creates_default_client_on_cache_miss(self) -> None:
"""The service owns plugin daemon access when no runtime-provided client is injected."""
with (
patch(f"{MODULE}.redis_client") as redis_client,
patch(f"{MODULE}.PluginModelClient") as client_cls,
):
redis_client.get.return_value = None
client = client_cls.return_value
client.fetch_model_providers.return_value = [_build_plugin_model_provider()]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_plugin_model_providers(tenant_id="tenant-1")
client_cls.assert_called_once_with()
client.fetch_model_providers.assert_called_once_with("tenant-1")
assert [provider.provider for provider in result] == ["langgenius/openai/openai"]
def test_invalidate_plugin_model_providers_cache_uses_tenant_cache_key(self) -> None:
with patch(f"{MODULE}.redis_client") as redis_client:
from core.plugin.plugin_service import PluginService
PluginService.invalidate_plugin_model_providers_cache("tenant-1")
redis_client.delete.assert_called_once_with("plugin_model_providers:tenant_id:tenant-1")
def test_invalidate_plugin_model_providers_cache_ignores_redis_delete_failure(self) -> None:
with patch(f"{MODULE}.redis_client") as redis_client:
redis_client.delete.side_effect = RedisError("redis unavailable")
from core.plugin.plugin_service import PluginService
PluginService.invalidate_plugin_model_providers_cache("tenant-1")
redis_client.delete.assert_called_once_with("plugin_model_providers:tenant_id:tenant-1")
class TestPluginModelProviderCacheInvalidation:
def test_fetch_install_task_invalidates_model_provider_cache_when_finished(self) -> None:
"""Finished plugin install tasks invalidate tenant provider cache."""
task = _build_install_task(status=PluginInstallTaskStatus.Success)
with (
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer_cls.return_value.fetch_plugin_installation_task.return_value = task
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_install_task("tenant-1", "task-1")
assert result is task
invalidate_cache.assert_called_once_with("tenant-1")
def test_fetch_install_tasks_invalidates_model_provider_cache_for_finished_tasks(self) -> None:
"""Finished tasks from task list polling also invalidate tenant provider cache."""
task = _build_install_task(status=PluginInstallTaskStatus.Success)
with (
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer_cls.return_value.fetch_plugin_installation_tasks.return_value = [task]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_install_tasks("tenant-1", 1, 256)
assert result == [task]
invalidate_cache.assert_called_once_with("tenant-1")
def test_fetch_install_tasks_ignores_running_model_provider_cache_tasks(self) -> None:
"""Running plugin install tasks do not invalidate provider cache until they reach a terminal state."""
task = _build_install_task(status=PluginInstallTaskStatus.Running)
with (
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer_cls.return_value.fetch_plugin_installation_tasks.return_value = [task]
from core.plugin.plugin_service import PluginService
result = PluginService.fetch_install_tasks("tenant-1", 1, 256)
assert result == [task]
invalidate_cache.assert_not_called()
def test_upgrade_plugin_with_marketplace_invalidates_model_provider_cache_for_tenant(self) -> None:
"""Marketplace upgrades invalidate only the mutated tenant provider cache."""
with (
patch(f"{MODULE}.dify_config") as mock_config,
patch(f"{MODULE}.FeatureService") as feature_service,
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.marketplace") as marketplace,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
mock_config.MARKETPLACE_ENABLED = True
feature_service.get_system_features.return_value = SimpleNamespace(
plugin_installation_permission=SimpleNamespace(restrict_to_marketplace_only=False)
)
installer = installer_cls.return_value
installer.fetch_plugin_manifest.return_value = MagicMock()
installer.upgrade_plugin.return_value = "task-id"
from core.plugin.plugin_service import PluginService
result = PluginService.upgrade_plugin_with_marketplace("tenant-1", "old-uid", "new-uid")
assert result == "task-id"
marketplace.record_install_plugin_event.assert_called_once_with("new-uid")
invalidate_cache.assert_called_once_with("tenant-1")
def test_install_from_local_pkg_invalidates_model_provider_cache_for_tenant(self) -> None:
"""Starting a plugin install invalidates only the mutated tenant provider cache."""
with (
patch(f"{MODULE}.PluginService._check_marketplace_only_permission"),
patch(f"{MODULE}.PluginService._check_plugin_installation_scope"),
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer = installer_cls.return_value
decode_response = MagicMock()
decode_response.verification = None
installer.decode_plugin_from_identifier.return_value = decode_response
installer.install_from_identifiers.return_value = "task-id"
from core.plugin.plugin_service import PluginService
result = PluginService.install_from_local_pkg("tenant-1", ["langgenius/openai:1.0.0"])
assert result == "task-id"
invalidate_cache.assert_called_once_with("tenant-1")
def test_upgrade_plugin_with_github_invalidates_model_provider_cache_for_tenant(self) -> None:
"""Starting a plugin upgrade invalidates only the mutated tenant provider cache."""
with (
patch(f"{MODULE}.PluginService._check_marketplace_only_permission"),
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer = installer_cls.return_value
installer.upgrade_plugin.return_value = "task-id"
from core.plugin.plugin_service import PluginService
result = PluginService.upgrade_plugin_with_github(
"tenant-1", "old-uid", "new-uid", "langgenius/openai", "1.0.0", "openai.difypkg"
)
assert result == "task-id"
invalidate_cache.assert_called_once_with("tenant-1")
def test_install_from_github_invalidates_model_provider_cache_for_tenant(self) -> None:
"""GitHub installs invalidate only the mutated tenant provider cache."""
with (
patch(f"{MODULE}.PluginService._check_marketplace_only_permission"),
patch(f"{MODULE}.PluginService._check_plugin_installation_scope"),
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer = installer_cls.return_value
decode_response = MagicMock()
decode_response.verification = None
installer.decode_plugin_from_identifier.return_value = decode_response
installer.install_from_identifiers.return_value = "task-id"
from core.plugin.plugin_service import PluginService
result = PluginService.install_from_github(
"tenant-1", "langgenius/openai:1.0.0", "langgenius/openai", "1.0.0", "openai.difypkg"
)
assert result == "task-id"
invalidate_cache.assert_called_once_with("tenant-1")
def test_install_from_marketplace_pkg_invalidates_model_provider_cache_for_tenant(self) -> None:
"""Marketplace package installs invalidate only the mutated tenant provider cache."""
with (
patch(f"{MODULE}.dify_config") as mock_config,
patch(f"{MODULE}.FeatureService") as feature_service,
patch(f"{MODULE}.PluginService._check_plugin_installation_scope"),
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
mock_config.MARKETPLACE_ENABLED = True
feature_service.get_system_features.return_value = SimpleNamespace(
plugin_installation_permission=SimpleNamespace(restrict_to_marketplace_only=False)
)
installer = installer_cls.return_value
installer.fetch_plugin_manifest.return_value = MagicMock()
decode_response = MagicMock()
decode_response.verification = None
installer.decode_plugin_from_identifier.return_value = decode_response
installer.install_from_identifiers.return_value = "task-id"
from core.plugin.plugin_service import PluginService
result = PluginService.install_from_marketplace_pkg("tenant-1", ["langgenius/openai:1.0.0"])
assert result == "task-id"
invalidate_cache.assert_called_once_with("tenant-1")
def test_uninstall_invalidates_model_provider_cache_for_tenant(self) -> None:
"""Successful uninstall invalidates only the mutated tenant provider cache."""
with (
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
installer = installer_cls.return_value
installer.list_plugins.return_value = []
installer.uninstall.return_value = True
from core.plugin.plugin_service import PluginService
result = PluginService.uninstall("tenant-1", "installation-1")
assert result is True
invalidate_cache.assert_called_once_with("tenant-1")
def test_uninstall_existing_plugin_invalidates_cache_after_credential_cleanup(self) -> None:
"""Successful uninstall with plugin metadata also invalidates the mutated tenant provider cache."""
plugin = SimpleNamespace(
installation_id="installation-1",
plugin_id="langgenius/openai",
plugin_unique_identifier="langgenius/openai:1.0.0",
)
session = _FakeSession()
with (
patch(f"{MODULE}.db", SimpleNamespace(engine=object())),
patch(f"{MODULE}.dify_config") as mock_config,
patch(f"{MODULE}.PluginInstaller") as installer_cls,
patch(f"{MODULE}.Session", return_value=session),
patch(f"{MODULE}.PluginService.invalidate_plugin_model_providers_cache") as invalidate_cache,
):
mock_config.ENTERPRISE_ENABLED = False
installer = installer_cls.return_value
installer.list_plugins.return_value = [plugin]
installer.uninstall.return_value = True
from core.plugin.plugin_service import PluginService
result = PluginService.uninstall("tenant-1", "installation-1")
assert result is True
installer.uninstall.assert_called_once_with("tenant-1", "installation-1")
invalidate_cache.assert_called_once_with("tenant-1")