Merge branch 'main' into feat/trigger

This commit is contained in:
Yeuoly
2025-09-01 18:05:31 +08:00
480 changed files with 10562 additions and 4040 deletions

View File

@ -674,7 +674,7 @@ class TestAnnotationService:
history = (
db.session.query(AppAnnotationHitHistory)
.filter(
.where(
AppAnnotationHitHistory.annotation_id == annotation.id, AppAnnotationHitHistory.message_id == message_id
)
.first()

View File

@ -144,127 +144,6 @@ class TestAppDslService:
}
return yaml.dump(yaml_data, allow_unicode=True)
def test_import_app_yaml_content_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful app import from YAML content.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create YAML content
yaml_content = self._create_simple_yaml_content(fake.company(), "chat")
# Import app
dsl_service = AppDslService(db_session_with_containers)
result = dsl_service.import_app(
account=account,
import_mode=ImportMode.YAML_CONTENT,
yaml_content=yaml_content,
name="Imported App",
description="Imported app description",
)
# Verify import result
assert result.status == ImportStatus.COMPLETED
assert result.app_id is not None
assert result.app_mode == "chat"
assert result.imported_dsl_version == "0.3.0"
assert result.error == ""
# Verify app was created in database
imported_app = db_session_with_containers.query(App).filter(App.id == result.app_id).first()
assert imported_app is not None
assert imported_app.name == "Imported App"
assert imported_app.description == "Imported app description"
assert imported_app.mode == "chat"
assert imported_app.tenant_id == account.current_tenant_id
assert imported_app.created_by == account.id
# Verify model config was created
model_config = (
db_session_with_containers.query(AppModelConfig).filter(AppModelConfig.app_id == result.app_id).first()
)
assert model_config is not None
# The provider and model_id are stored in the model field as JSON
model_dict = model_config.model_dict
assert model_dict["provider"] == "openai"
assert model_dict["name"] == "gpt-3.5-turbo"
def test_import_app_yaml_url_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful app import from YAML URL.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create YAML content for mock response
yaml_content = self._create_simple_yaml_content(fake.company(), "chat")
# Setup mock response
mock_response = MagicMock()
mock_response.content = yaml_content.encode("utf-8")
mock_response.raise_for_status.return_value = None
mock_external_service_dependencies["ssrf_proxy"].get.return_value = mock_response
# Import app from URL
dsl_service = AppDslService(db_session_with_containers)
result = dsl_service.import_app(
account=account,
import_mode=ImportMode.YAML_URL,
yaml_url="https://example.com/app.yaml",
name="URL Imported App",
description="App imported from URL",
)
# Verify import result
assert result.status == ImportStatus.COMPLETED
assert result.app_id is not None
assert result.app_mode == "chat"
assert result.imported_dsl_version == "0.3.0"
assert result.error == ""
# Verify app was created in database
imported_app = db_session_with_containers.query(App).filter(App.id == result.app_id).first()
assert imported_app is not None
assert imported_app.name == "URL Imported App"
assert imported_app.description == "App imported from URL"
assert imported_app.mode == "chat"
assert imported_app.tenant_id == account.current_tenant_id
# Verify ssrf_proxy was called
mock_external_service_dependencies["ssrf_proxy"].get.assert_called_once_with(
"https://example.com/app.yaml", follow_redirects=True, timeout=(10, 10)
)
def test_import_app_invalid_yaml_format(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test app import with invalid YAML format.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create invalid YAML content
invalid_yaml = "invalid: yaml: content: ["
# Import app with invalid YAML
dsl_service = AppDslService(db_session_with_containers)
result = dsl_service.import_app(
account=account,
import_mode=ImportMode.YAML_CONTENT,
yaml_content=invalid_yaml,
name="Invalid App",
)
# Verify import failed
assert result.status == ImportStatus.FAILED
assert result.app_id is None
assert "Invalid YAML format" in result.error
assert result.imported_dsl_version == ""
# Verify no app was created in database
apps_count = db_session_with_containers.query(App).filter(App.tenant_id == account.current_tenant_id).count()
assert apps_count == 1 # Only the original test app
def test_import_app_missing_yaml_content(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test app import with missing YAML content.
@ -287,7 +166,7 @@ class TestAppDslService:
assert result.imported_dsl_version == ""
# Verify no app was created in database
apps_count = db_session_with_containers.query(App).filter(App.tenant_id == account.current_tenant_id).count()
apps_count = db_session_with_containers.query(App).where(App.tenant_id == account.current_tenant_id).count()
assert apps_count == 1 # Only the original test app
def test_import_app_missing_yaml_url(self, db_session_with_containers, mock_external_service_dependencies):
@ -312,7 +191,7 @@ class TestAppDslService:
assert result.imported_dsl_version == ""
# Verify no app was created in database
apps_count = db_session_with_containers.query(App).filter(App.tenant_id == account.current_tenant_id).count()
apps_count = db_session_with_containers.query(App).where(App.tenant_id == account.current_tenant_id).count()
assert apps_count == 1 # Only the original test app
def test_import_app_invalid_import_mode(self, db_session_with_containers, mock_external_service_dependencies):
@ -336,7 +215,7 @@ class TestAppDslService:
)
# Verify no app was created in database
apps_count = db_session_with_containers.query(App).filter(App.tenant_id == account.current_tenant_id).count()
apps_count = db_session_with_containers.query(App).where(App.tenant_id == account.current_tenant_id).count()
assert apps_count == 1 # Only the original test app
def test_export_dsl_chat_app_success(self, db_session_with_containers, mock_external_service_dependencies):

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,529 @@
from unittest.mock import patch
import pytest
from faker import Faker
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from services.workspace_service import WorkspaceService
class TestWorkspaceService:
"""Integration tests for WorkspaceService using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("services.workspace_service.FeatureService") as mock_feature_service,
patch("services.workspace_service.TenantService") as mock_tenant_service,
patch("services.workspace_service.dify_config") as mock_dify_config,
):
# Setup default mock returns
mock_feature_service.get_features.return_value.can_replace_logo = True
mock_tenant_service.has_roles.return_value = True
mock_dify_config.FILES_URL = "https://example.com/files"
yield {
"feature_service": mock_feature_service,
"tenant_service": mock_tenant_service,
"dify_config": mock_dify_config,
}
def _create_test_account_and_tenant(self, db_session_with_containers, mock_external_service_dependencies):
"""
Helper method to create a test account and tenant for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
Returns:
tuple: (account, tenant) - Created account and tenant instances
"""
fake = Faker()
# Create account
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Create tenant
tenant = Tenant(
name=fake.company(),
status="normal",
plan="basic",
custom_config='{"replace_webapp_logo": true, "remove_webapp_brand": false}',
)
db.session.add(tenant)
db.session.commit()
# Create tenant-account join with owner role
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER.value,
current=True,
)
db.session.add(join)
db.session.commit()
# Set current tenant for account
account.current_tenant = tenant
return account, tenant
def test_get_tenant_info_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful retrieval of tenant information with all features enabled.
This test verifies:
- Proper tenant info retrieval with all required fields
- Correct role assignment from TenantAccountJoin
- Custom config handling when features are enabled
- Logo replacement functionality for privileged users
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Setup mocks for feature service
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
mock_external_service_dependencies["tenant_service"].has_roles.return_value = True
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert result["id"] == tenant.id
assert result["name"] == tenant.name
assert result["plan"] == tenant.plan
assert result["status"] == tenant.status
assert result["role"] == TenantAccountRole.OWNER.value
assert result["created_at"] == tenant.created_at
assert result["trial_end_reason"] is None
# Verify custom config is included for privileged users
assert "custom_config" in result
assert result["custom_config"]["remove_webapp_brand"] is False
assert "replace_webapp_logo" in result["custom_config"]
# Verify database state
from extensions.ext_database import db
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_without_custom_config(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval when custom config features are disabled.
This test verifies:
- Tenant info retrieval without custom config when features are disabled
- Proper handling of disabled logo replacement functionality
- Role assignment still works correctly
- Basic tenant information is complete
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Setup mocks to disable custom config features
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = False
mock_external_service_dependencies["tenant_service"].has_roles.return_value = False
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert result["id"] == tenant.id
assert result["name"] == tenant.name
assert result["plan"] == tenant.plan
assert result["status"] == tenant.status
assert result["role"] == TenantAccountRole.OWNER.value
assert result["created_at"] == tenant.created_at
assert result["trial_end_reason"] is None
# Verify custom config is not included when features are disabled
assert "custom_config" not in result
# Verify database state
from extensions.ext_database import db
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_with_normal_user_role(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval for normal user role without privileged features.
This test verifies:
- Tenant info retrieval for non-privileged users
- Role assignment for normal users
- Custom config is not accessible for normal users
- Proper handling of different user roles
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Update the join to have normal role
from extensions.ext_database import db
join = db.session.query(TenantAccountJoin).filter_by(tenant_id=tenant.id, account_id=account.id).first()
join.role = TenantAccountRole.NORMAL.value
db.session.commit()
# Setup mocks for feature service
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
mock_external_service_dependencies["tenant_service"].has_roles.return_value = False
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert result["id"] == tenant.id
assert result["name"] == tenant.name
assert result["plan"] == tenant.plan
assert result["status"] == tenant.status
assert result["role"] == TenantAccountRole.NORMAL.value
assert result["created_at"] == tenant.created_at
assert result["trial_end_reason"] is None
# Verify custom config is not included for normal users
assert "custom_config" not in result
# Verify database state
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_with_admin_role_and_logo_replacement(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval for admin role with logo replacement enabled.
This test verifies:
- Admin role can access custom config features
- Logo replacement functionality works for admin users
- Proper URL construction for logo replacement
- Custom config handling for admin role
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Update the join to have admin role
from extensions.ext_database import db
join = db.session.query(TenantAccountJoin).filter_by(tenant_id=tenant.id, account_id=account.id).first()
join.role = TenantAccountRole.ADMIN.value
db.session.commit()
# Setup mocks for feature service and tenant service
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
mock_external_service_dependencies["tenant_service"].has_roles.return_value = True
mock_external_service_dependencies["dify_config"].FILES_URL = "https://cdn.example.com"
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert result["role"] == TenantAccountRole.ADMIN.value
# Verify custom config is included for admin users
assert "custom_config" in result
assert result["custom_config"]["remove_webapp_brand"] is False
assert "replace_webapp_logo" in result["custom_config"]
# Verify database state
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_with_tenant_none(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test tenant info retrieval when tenant parameter is None.
This test verifies:
- Proper handling of None tenant parameter
- Method returns None for invalid input
- No exceptions are raised for None input
- Graceful degradation for invalid data
"""
# Arrange: No test data needed for this test
# Act: Execute the method under test with None tenant
result = WorkspaceService.get_tenant_info(None)
# Assert: Verify the expected outcomes
assert result is None
def test_get_tenant_info_with_custom_config_variations(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval with various custom config configurations.
This test verifies:
- Different custom config combinations work correctly
- Logo replacement URL construction with various configs
- Brand removal functionality
- Edge cases in custom config handling
"""
# Arrange: Create test data with different custom configs
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Test different custom config combinations
test_configs = [
# Case 1: Both logo and brand removal enabled
{"replace_webapp_logo": True, "remove_webapp_brand": True},
# Case 2: Only logo replacement enabled
{"replace_webapp_logo": True, "remove_webapp_brand": False},
# Case 3: Only brand removal enabled
{"replace_webapp_logo": False, "remove_webapp_brand": True},
# Case 4: Neither enabled
{"replace_webapp_logo": False, "remove_webapp_brand": False},
]
for config in test_configs:
# Update tenant custom config
import json
from extensions.ext_database import db
tenant.custom_config = json.dumps(config)
db.session.commit()
# Setup mocks
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
mock_external_service_dependencies["tenant_service"].has_roles.return_value = True
mock_external_service_dependencies["dify_config"].FILES_URL = "https://files.example.com"
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert "custom_config" in result
if config["replace_webapp_logo"]:
assert "replace_webapp_logo" in result["custom_config"]
if config["replace_webapp_logo"]:
expected_url = f"https://files.example.com/files/workspaces/{tenant.id}/webapp-logo"
assert result["custom_config"]["replace_webapp_logo"] == expected_url
else:
assert result["custom_config"]["replace_webapp_logo"] is None
assert result["custom_config"]["remove_webapp_brand"] == config["remove_webapp_brand"]
# Verify database state
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_with_editor_role_and_limited_permissions(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval for editor role with limited permissions.
This test verifies:
- Editor role has limited access to custom config features
- Proper role-based permission checking
- Custom config handling for different role levels
- Role hierarchy and permission boundaries
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Update the join to have editor role
from extensions.ext_database import db
join = db.session.query(TenantAccountJoin).filter_by(tenant_id=tenant.id, account_id=account.id).first()
join.role = TenantAccountRole.EDITOR.value
db.session.commit()
# Setup mocks for feature service and tenant service
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
# Editor role should not have admin/owner permissions
mock_external_service_dependencies["tenant_service"].has_roles.return_value = False
mock_external_service_dependencies["dify_config"].FILES_URL = "https://cdn.example.com"
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert result["role"] == TenantAccountRole.EDITOR.value
# Verify custom config is not included for editor users without admin privileges
assert "custom_config" not in result
# Verify database state
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_with_dataset_operator_role(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval for dataset operator role.
This test verifies:
- Dataset operator role handling
- Role assignment for specialized roles
- Permission boundaries for dataset operators
- Custom config access for dataset operators
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Update the join to have dataset operator role
from extensions.ext_database import db
join = db.session.query(TenantAccountJoin).filter_by(tenant_id=tenant.id, account_id=account.id).first()
join.role = TenantAccountRole.DATASET_OPERATOR.value
db.session.commit()
# Setup mocks for feature service and tenant service
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
# Dataset operator should not have admin/owner permissions
mock_external_service_dependencies["tenant_service"].has_roles.return_value = False
mock_external_service_dependencies["dify_config"].FILES_URL = "https://cdn.example.com"
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert result["role"] == TenantAccountRole.DATASET_OPERATOR.value
# Verify custom config is not included for dataset operators without admin privileges
assert "custom_config" not in result
# Verify database state
db.session.refresh(tenant)
assert tenant.id is not None
def test_get_tenant_info_with_complex_custom_config_scenarios(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test tenant info retrieval with complex custom config scenarios.
This test verifies:
- Complex custom config combinations
- Edge cases in custom config handling
- URL construction with various configs
- Error handling for malformed configs
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Test complex custom config scenarios
test_configs = [
# Case 1: Empty custom config
{},
# Case 2: Custom config with only logo replacement
{"replace_webapp_logo": True},
# Case 3: Custom config with only brand removal
{"remove_webapp_brand": True},
# Case 4: Custom config with additional fields
{
"replace_webapp_logo": True,
"remove_webapp_brand": False,
"custom_field": "custom_value",
"nested_config": {"key": "value"},
},
# Case 5: Custom config with null values
{"replace_webapp_logo": None, "remove_webapp_brand": None},
]
for config in test_configs:
# Update tenant custom config
import json
from extensions.ext_database import db
tenant.custom_config = json.dumps(config)
db.session.commit()
# Setup mocks
mock_external_service_dependencies["feature_service"].get_features.return_value.can_replace_logo = True
mock_external_service_dependencies["tenant_service"].has_roles.return_value = True
mock_external_service_dependencies["dify_config"].FILES_URL = "https://files.example.com"
# Mock current_user for flask_login
with patch("services.workspace_service.current_user", account):
# Act: Execute the method under test
result = WorkspaceService.get_tenant_info(tenant)
# Assert: Verify the expected outcomes
assert result is not None
assert "custom_config" in result
# Verify logo replacement handling
if config.get("replace_webapp_logo"):
assert "replace_webapp_logo" in result["custom_config"]
expected_url = f"https://files.example.com/files/workspaces/{tenant.id}/webapp-logo"
assert result["custom_config"]["replace_webapp_logo"] == expected_url
else:
assert result["custom_config"]["replace_webapp_logo"] is None
# Verify brand removal handling
if "remove_webapp_brand" in config:
assert result["custom_config"]["remove_webapp_brand"] == config["remove_webapp_brand"]
else:
assert result["custom_config"]["remove_webapp_brand"] is False
# Verify database state
db.session.refresh(tenant)
assert tenant.id is not None

View File

@ -0,0 +1,550 @@
from unittest.mock import patch
import pytest
from faker import Faker
from models.account import Account, Tenant
from models.tools import ApiToolProvider
from services.tools.api_tools_manage_service import ApiToolManageService
class TestApiToolManageService:
"""Integration tests for ApiToolManageService using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("services.tools.api_tools_manage_service.ToolLabelManager") as mock_tool_label_manager,
patch("services.tools.api_tools_manage_service.create_tool_provider_encrypter") as mock_encrypter,
patch("services.tools.api_tools_manage_service.ApiToolProviderController") as mock_provider_controller,
):
# Setup default mock returns
mock_tool_label_manager.update_tool_labels.return_value = None
mock_encrypter.return_value = (mock_encrypter, None)
mock_encrypter.encrypt.return_value = {"encrypted": "credentials"}
mock_provider_controller.from_db.return_value = mock_provider_controller
mock_provider_controller.load_bundled_tools.return_value = None
yield {
"tool_label_manager": mock_tool_label_manager,
"encrypter": mock_encrypter,
"provider_controller": mock_provider_controller,
}
def _create_test_account_and_tenant(self, db_session_with_containers, mock_external_service_dependencies):
"""
Helper method to create a test account and tenant for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
Returns:
tuple: (account, tenant) - Created account and tenant instances
"""
fake = Faker()
# Create account
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Create tenant for the account
tenant = Tenant(
name=fake.company(),
status="normal",
)
db.session.add(tenant)
db.session.commit()
# Create tenant-account join
from models.account import TenantAccountJoin, TenantAccountRole
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER.value,
current=True,
)
db.session.add(join)
db.session.commit()
# Set current tenant for account
account.current_tenant = tenant
return account, tenant
def _create_test_openapi_schema(self):
"""Helper method to create a test OpenAPI schema."""
return """
{
"openapi": "3.0.0",
"info": {
"title": "Test API",
"version": "1.0.0",
"description": "Test API for testing purposes"
},
"servers": [
{
"url": "https://api.example.com",
"description": "Production server"
}
],
"paths": {
"/test": {
"get": {
"operationId": "testOperation",
"summary": "Test operation",
"responses": {
"200": {
"description": "Success"
}
}
}
}
}
}
"""
def test_parser_api_schema_success(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful parsing of API schema.
This test verifies:
- Proper schema parsing with valid OpenAPI schema
- Correct credentials schema generation
- Proper warning handling
- Return value structure
"""
# Arrange: Create test schema
schema = self._create_test_openapi_schema()
# Act: Parse the schema
result = ApiToolManageService.parser_api_schema(schema)
# Assert: Verify the result structure
assert result is not None
assert "schema_type" in result
assert "parameters_schema" in result
assert "credentials_schema" in result
assert "warning" in result
# Verify credentials schema structure
credentials_schema = result["credentials_schema"]
assert len(credentials_schema) == 3
# Check auth_type field
auth_type_field = next(field for field in credentials_schema if field["name"] == "auth_type")
assert auth_type_field["required"] is True
assert auth_type_field["default"] == "none"
assert len(auth_type_field["options"]) == 2
# Check api_key_header field
api_key_header_field = next(field for field in credentials_schema if field["name"] == "api_key_header")
assert api_key_header_field["required"] is False
assert api_key_header_field["default"] == "api_key"
# Check api_key_value field
api_key_value_field = next(field for field in credentials_schema if field["name"] == "api_key_value")
assert api_key_value_field["required"] is False
assert api_key_value_field["default"] == ""
def test_parser_api_schema_invalid_schema(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test parsing of invalid API schema.
This test verifies:
- Proper error handling for invalid schemas
- Correct exception type and message
- Error propagation from underlying parser
"""
# Arrange: Create invalid schema
invalid_schema = "invalid json schema"
# Act & Assert: Verify proper error handling
with pytest.raises(ValueError) as exc_info:
ApiToolManageService.parser_api_schema(invalid_schema)
assert "invalid schema" in str(exc_info.value)
def test_parser_api_schema_malformed_json(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test parsing of malformed JSON schema.
This test verifies:
- Proper error handling for malformed JSON
- Correct exception type and message
- Error propagation from JSON parsing
"""
# Arrange: Create malformed JSON schema
malformed_schema = '{"openapi": "3.0.0", "info": {"title": "Test", "version": "1.0.0"}, "paths": {}}'
# Act & Assert: Verify proper error handling
with pytest.raises(ValueError) as exc_info:
ApiToolManageService.parser_api_schema(malformed_schema)
assert "invalid schema" in str(exc_info.value)
def test_convert_schema_to_tool_bundles_success(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful conversion of schema to tool bundles.
This test verifies:
- Proper schema conversion with valid OpenAPI schema
- Correct tool bundles generation
- Proper schema type detection
- Return value structure
"""
# Arrange: Create test schema
schema = self._create_test_openapi_schema()
# Act: Convert schema to tool bundles
tool_bundles, schema_type = ApiToolManageService.convert_schema_to_tool_bundles(schema)
# Assert: Verify the result structure
assert tool_bundles is not None
assert isinstance(tool_bundles, list)
assert len(tool_bundles) > 0
assert schema_type is not None
assert isinstance(schema_type, str)
# Verify tool bundle structure
tool_bundle = tool_bundles[0]
assert hasattr(tool_bundle, "operation_id")
assert tool_bundle.operation_id == "testOperation"
def test_convert_schema_to_tool_bundles_with_extra_info(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful conversion of schema to tool bundles with extra info.
This test verifies:
- Proper schema conversion with extra info parameter
- Correct tool bundles generation
- Extra info handling
- Return value structure
"""
# Arrange: Create test schema and extra info
schema = self._create_test_openapi_schema()
extra_info = {"description": "Custom description", "version": "2.0.0"}
# Act: Convert schema to tool bundles with extra info
tool_bundles, schema_type = ApiToolManageService.convert_schema_to_tool_bundles(schema, extra_info)
# Assert: Verify the result structure
assert tool_bundles is not None
assert isinstance(tool_bundles, list)
assert len(tool_bundles) > 0
assert schema_type is not None
assert isinstance(schema_type, str)
def test_convert_schema_to_tool_bundles_invalid_schema(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test conversion of invalid schema to tool bundles.
This test verifies:
- Proper error handling for invalid schemas
- Correct exception type and message
- Error propagation from underlying parser
"""
# Arrange: Create invalid schema
invalid_schema = "invalid schema content"
# Act & Assert: Verify proper error handling
with pytest.raises(ValueError) as exc_info:
ApiToolManageService.convert_schema_to_tool_bundles(invalid_schema)
assert "invalid schema" in str(exc_info.value)
def test_create_api_tool_provider_success(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful creation of API tool provider.
This test verifies:
- Proper provider creation with valid parameters
- Correct database state after creation
- Proper relationship establishment
- External service integration
- Return value correctness
"""
# Arrange: Create test data
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
provider_name = fake.company()
icon = {"type": "emoji", "value": "🔧"}
credentials = {"auth_type": "none", "api_key_header": "X-API-Key", "api_key_value": ""}
schema_type = "openapi"
schema = self._create_test_openapi_schema()
privacy_policy = "https://example.com/privacy"
custom_disclaimer = "Custom disclaimer text"
labels = ["test", "api"]
# Act: Create API tool provider
result = ApiToolManageService.create_api_tool_provider(
user_id=account.id,
tenant_id=tenant.id,
provider_name=provider_name,
icon=icon,
credentials=credentials,
schema_type=schema_type,
schema=schema,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
labels=labels,
)
# Assert: Verify the result
assert result == {"result": "success"}
# Verify database state
from extensions.ext_database import db
provider = (
db.session.query(ApiToolProvider)
.filter(ApiToolProvider.tenant_id == tenant.id, ApiToolProvider.name == provider_name)
.first()
)
assert provider is not None
assert provider.name == provider_name
assert provider.tenant_id == tenant.id
assert provider.user_id == account.id
assert provider.schema_type_str == schema_type
assert provider.privacy_policy == privacy_policy
assert provider.custom_disclaimer == custom_disclaimer
# Verify mock interactions
mock_external_service_dependencies["tool_label_manager"].update_tool_labels.assert_called_once()
mock_external_service_dependencies["encrypter"].assert_called_once()
mock_external_service_dependencies["provider_controller"].from_db.assert_called_once()
mock_external_service_dependencies["provider_controller"].load_bundled_tools.assert_called_once()
def test_create_api_tool_provider_duplicate_name(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test creation of API tool provider with duplicate name.
This test verifies:
- Proper error handling for duplicate provider names
- Correct exception type and message
- Database constraint enforcement
"""
# Arrange: Create test data and existing provider
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
provider_name = fake.company()
icon = {"type": "emoji", "value": "🔧"}
credentials = {"auth_type": "none"}
schema_type = "openapi"
schema = self._create_test_openapi_schema()
privacy_policy = "https://example.com/privacy"
custom_disclaimer = "Custom disclaimer text"
labels = ["test"]
# Create first provider
ApiToolManageService.create_api_tool_provider(
user_id=account.id,
tenant_id=tenant.id,
provider_name=provider_name,
icon=icon,
credentials=credentials,
schema_type=schema_type,
schema=schema,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
labels=labels,
)
# Act & Assert: Try to create duplicate provider
with pytest.raises(ValueError) as exc_info:
ApiToolManageService.create_api_tool_provider(
user_id=account.id,
tenant_id=tenant.id,
provider_name=provider_name,
icon=icon,
credentials=credentials,
schema_type=schema_type,
schema=schema,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
labels=labels,
)
assert f"provider {provider_name} already exists" in str(exc_info.value)
def test_create_api_tool_provider_invalid_schema_type(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test creation of API tool provider with invalid schema type.
This test verifies:
- Proper error handling for invalid schema types
- Correct exception type and message
- Schema type validation
"""
# Arrange: Create test data with invalid schema type
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
provider_name = fake.company()
icon = {"type": "emoji", "value": "🔧"}
credentials = {"auth_type": "none"}
schema_type = "invalid_type"
schema = self._create_test_openapi_schema()
privacy_policy = "https://example.com/privacy"
custom_disclaimer = "Custom disclaimer text"
labels = ["test"]
# Act & Assert: Try to create provider with invalid schema type
with pytest.raises(ValueError) as exc_info:
ApiToolManageService.create_api_tool_provider(
user_id=account.id,
tenant_id=tenant.id,
provider_name=provider_name,
icon=icon,
credentials=credentials,
schema_type=schema_type,
schema=schema,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
labels=labels,
)
assert "invalid schema type" in str(exc_info.value)
def test_create_api_tool_provider_missing_auth_type(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test creation of API tool provider with missing auth type.
This test verifies:
- Proper error handling for missing auth type
- Correct exception type and message
- Credentials validation
"""
# Arrange: Create test data with missing auth type
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
provider_name = fake.company()
icon = {"type": "emoji", "value": "🔧"}
credentials = {} # Missing auth_type
schema_type = "openapi"
schema = self._create_test_openapi_schema()
privacy_policy = "https://example.com/privacy"
custom_disclaimer = "Custom disclaimer text"
labels = ["test"]
# Act & Assert: Try to create provider with missing auth type
with pytest.raises(ValueError) as exc_info:
ApiToolManageService.create_api_tool_provider(
user_id=account.id,
tenant_id=tenant.id,
provider_name=provider_name,
icon=icon,
credentials=credentials,
schema_type=schema_type,
schema=schema,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
labels=labels,
)
assert "auth_type is required" in str(exc_info.value)
def test_create_api_tool_provider_with_api_key_auth(
self, flask_req_ctx_with_containers, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful creation of API tool provider with API key authentication.
This test verifies:
- Proper provider creation with API key auth
- Correct credentials handling
- Proper authentication type processing
"""
# Arrange: Create test data with API key auth
fake = Faker()
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
provider_name = fake.company()
icon = {"type": "emoji", "value": "🔑"}
credentials = {"auth_type": "api_key", "api_key_header": "X-API-Key", "api_key_value": fake.uuid4()}
schema_type = "openapi"
schema = self._create_test_openapi_schema()
privacy_policy = "https://example.com/privacy"
custom_disclaimer = "Custom disclaimer text"
labels = ["api_key", "secure"]
# Act: Create API tool provider
result = ApiToolManageService.create_api_tool_provider(
user_id=account.id,
tenant_id=tenant.id,
provider_name=provider_name,
icon=icon,
credentials=credentials,
schema_type=schema_type,
schema=schema,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
labels=labels,
)
# Assert: Verify the result
assert result == {"result": "success"}
# Verify database state
from extensions.ext_database import db
provider = (
db.session.query(ApiToolProvider)
.filter(ApiToolProvider.tenant_id == tenant.id, ApiToolProvider.name == provider_name)
.first()
)
assert provider is not None
assert provider.name == provider_name
assert provider.tenant_id == tenant.id
assert provider.user_id == account.id
assert provider.schema_type_str == schema_type
# Verify mock interactions
mock_external_service_dependencies["encrypter"].assert_called_once()
mock_external_service_dependencies["provider_controller"].from_db.assert_called_once()