feat: fix i18n missing keys and merge upstream/main (#24615)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Signed-off-by: kenwoodjw <blackxin55+@gmail.com>
Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Signed-off-by: zhanluxianshen <zhanluxianshen@163.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: GuanMu <ballmanjq@gmail.com>
Co-authored-by: Davide Delbianco <davide.delbianco@outlook.com>
Co-authored-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com>
Co-authored-by: kenwoodjw <blackxin55+@gmail.com>
Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
Co-authored-by: Qiang Lee <18018968632@163.com>
Co-authored-by: 李强04 <liqiang04@gaotu.cn>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
Co-authored-by: Matri Qi <matrixdom@126.com>
Co-authored-by: huayaoyue6 <huayaoyue@163.com>
Co-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Co-authored-by: znn <jubinkumarsoni@gmail.com>
Co-authored-by: crazywoola <427733928@qq.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: yihong <zouzou0208@gmail.com>
Co-authored-by: Muke Wang <shaodwaaron@gmail.com>
Co-authored-by: wangmuke <wangmuke@kingsware.cn>
Co-authored-by: Wu Tianwei <30284043+WTW0313@users.noreply.github.com>
Co-authored-by: quicksand <quicksandzn@gmail.com>
Co-authored-by: 非法操作 <hjlarry@163.com>
Co-authored-by: zxhlyh <jasonapring2015@outlook.com>
Co-authored-by: Eric Guo <eric.guocz@gmail.com>
Co-authored-by: Zhedong Cen <cenzhedong2@126.com>
Co-authored-by: jiangbo721 <jiangbo721@163.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: hjlarry <25834719+hjlarry@users.noreply.github.com>
Co-authored-by: lxsummer <35754229+lxjustdoit@users.noreply.github.com>
Co-authored-by: 湛露先生 <zhanluxianshen@163.com>
Co-authored-by: Guangdong Liu <liugddx@gmail.com>
Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Yessenia-d <yessenia.contact@gmail.com>
Co-authored-by: huangzhuo1949 <167434202+huangzhuo1949@users.noreply.github.com>
Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com>
Co-authored-by: 17hz <0x149527@gmail.com>
Co-authored-by: Amy <1530140574@qq.com>
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: Nite Knite <nkCoding@gmail.com>
Co-authored-by: Yeuoly <45712896+Yeuoly@users.noreply.github.com>
Co-authored-by: Petrus Han <petrus.hanks@gmail.com>
Co-authored-by: iamjoel <2120155+iamjoel@users.noreply.github.com>
Co-authored-by: Kalo Chin <frog.beepers.0n@icloud.com>
Co-authored-by: Ujjwal Maurya <ujjwalsbx@gmail.com>
Co-authored-by: Maries <xh001x@hotmail.com>
This commit is contained in:
lyzno1
2025-08-27 15:07:28 +08:00
committed by GitHub
parent a63d1e87b1
commit 5bbf685035
625 changed files with 23778 additions and 10693 deletions

View File

@ -10,11 +10,13 @@ more reliable and realistic test scenarios.
import logging
import os
from collections.abc import Generator
from pathlib import Path
from typing import Optional
import pytest
from flask import Flask
from flask.testing import FlaskClient
from sqlalchemy import Engine, text
from sqlalchemy.orm import Session
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
@ -64,7 +66,7 @@ class DifyTestContainers:
# PostgreSQL is used for storing user data, workflows, and application state
logger.info("Initializing PostgreSQL container...")
self.postgres = PostgresContainer(
image="postgres:16-alpine",
image="postgres:14-alpine",
)
self.postgres.start()
db_host = self.postgres.get_container_host_ip()
@ -116,7 +118,7 @@ class DifyTestContainers:
# Start Redis container for caching and session management
# Redis is used for storing session data, cache entries, and temporary data
logger.info("Initializing Redis container...")
self.redis = RedisContainer(image="redis:latest", port=6379)
self.redis = RedisContainer(image="redis:6-alpine", port=6379)
self.redis.start()
redis_host = self.redis.get_container_host_ip()
redis_port = self.redis.get_exposed_port(6379)
@ -184,6 +186,57 @@ class DifyTestContainers:
_container_manager = DifyTestContainers()
def _get_migration_dir() -> Path:
conftest_dir = Path(__file__).parent
return conftest_dir.parent.parent / "migrations"
def _get_engine_url(engine: Engine):
try:
return engine.url.render_as_string(hide_password=False).replace("%", "%%")
except AttributeError:
return str(engine.url).replace("%", "%%")
_UUIDv7SQL = r"""
/* Main function to generate a uuidv7 value with millisecond precision */
CREATE FUNCTION uuidv7() RETURNS uuid
AS
$$
-- Replace the first 48 bits of a uuidv4 with the current
-- number of milliseconds since 1970-01-01 UTC
-- and set the "ver" field to 7 by setting additional bits
SELECT encode(
set_bit(
set_bit(
overlay(uuid_send(gen_random_uuid()) placing
substring(int8send((extract(epoch from clock_timestamp()) * 1000)::bigint) from
3)
from 1 for 6),
52, 1),
53, 1), 'hex')::uuid;
$$ LANGUAGE SQL VOLATILE PARALLEL SAFE;
COMMENT ON FUNCTION uuidv7 IS
'Generate a uuid-v7 value with a 48-bit timestamp (millisecond precision) and 74 bits of randomness';
CREATE FUNCTION uuidv7_boundary(timestamptz) RETURNS uuid
AS
$$
/* uuid fields: version=0b0111, variant=0b10 */
SELECT encode(
overlay('\x00000000000070008000000000000000'::bytea
placing substring(int8send(floor(extract(epoch from $1) * 1000)::bigint) from 3)
from 1 for 6),
'hex')::uuid;
$$ LANGUAGE SQL STABLE STRICT PARALLEL SAFE;
COMMENT ON FUNCTION uuidv7_boundary(timestamptz) IS
'Generate a non-random uuidv7 with the given timestamp (first 48 bits) and all random bits to 0.
As the smallest possible uuidv7 for that timestamp, it may be used as a boundary for partitions.';
"""
def _create_app_with_containers() -> Flask:
"""
Create Flask application configured to use test containers.
@ -211,7 +264,10 @@ def _create_app_with_containers() -> Flask:
# Initialize database schema
logger.info("Creating database schema...")
with app.app_context():
with db.engine.connect() as conn, conn.begin():
conn.execute(text(_UUIDv7SQL))
db.create_all()
logger.info("Database schema created successfully")

View File

@ -1639,7 +1639,7 @@ class TestTenantService:
email = fake.email()
name = fake.name()
password = fake.password(length=12)
invalid_action = fake.word()
invalid_action = "invalid_action_that_doesnt_exist"
# Setup mocks
mock_external_service_dependencies[
"feature_service"

View File

@ -410,18 +410,18 @@ class TestAnnotationService:
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create annotations with specific keywords
unique_keyword = fake.word()
unique_keyword = f"unique_{fake.uuid4()[:8]}"
annotation_args = {
"question": f"Question with {unique_keyword} keyword",
"answer": f"Answer with {unique_keyword} keyword",
}
AppAnnotationService.insert_app_annotation_directly(annotation_args, app.id)
# Create another annotation without the keyword
other_args = {
"question": "Question without keyword",
"answer": "Answer without keyword",
"question": "Different question without special term",
"answer": "Different answer without special content",
}
AppAnnotationService.insert_app_annotation_directly(other_args, app.id)
# Search with keyword

View File

@ -235,10 +235,17 @@ class TestModelProviderService:
mock_provider_entity.provider_credential_schema = None
mock_provider_entity.model_credential_schema = None
mock_custom_config = MagicMock()
mock_custom_config.provider.current_credential_id = "credential-123"
mock_custom_config.provider.current_credential_name = "test-credential"
mock_custom_config.provider.available_credentials = []
mock_custom_config.models = []
mock_provider_config = MagicMock()
mock_provider_config.provider = mock_provider_entity
mock_provider_config.preferred_provider_type = ProviderType.CUSTOM
mock_provider_config.is_custom_configuration_available.return_value = True
mock_provider_config.custom_configuration = mock_custom_config
mock_provider_config.system_configuration.enabled = True
mock_provider_config.system_configuration.current_quota_type = "free"
mock_provider_config.system_configuration.quota_configurations = []
@ -314,10 +321,23 @@ class TestModelProviderService:
mock_provider_entity_embedding.provider_credential_schema = None
mock_provider_entity_embedding.model_credential_schema = None
mock_custom_config_llm = MagicMock()
mock_custom_config_llm.provider.current_credential_id = "credential-123"
mock_custom_config_llm.provider.current_credential_name = "test-credential"
mock_custom_config_llm.provider.available_credentials = []
mock_custom_config_llm.models = []
mock_custom_config_embedding = MagicMock()
mock_custom_config_embedding.provider.current_credential_id = "credential-456"
mock_custom_config_embedding.provider.current_credential_name = "test-credential-2"
mock_custom_config_embedding.provider.available_credentials = []
mock_custom_config_embedding.models = []
mock_provider_config_llm = MagicMock()
mock_provider_config_llm.provider = mock_provider_entity_llm
mock_provider_config_llm.preferred_provider_type = ProviderType.CUSTOM
mock_provider_config_llm.is_custom_configuration_available.return_value = True
mock_provider_config_llm.custom_configuration = mock_custom_config_llm
mock_provider_config_llm.system_configuration.enabled = True
mock_provider_config_llm.system_configuration.current_quota_type = "free"
mock_provider_config_llm.system_configuration.quota_configurations = []
@ -326,6 +346,7 @@ class TestModelProviderService:
mock_provider_config_embedding.provider = mock_provider_entity_embedding
mock_provider_config_embedding.preferred_provider_type = ProviderType.CUSTOM
mock_provider_config_embedding.is_custom_configuration_available.return_value = True
mock_provider_config_embedding.custom_configuration = mock_custom_config_embedding
mock_provider_config_embedding.system_configuration.enabled = True
mock_provider_config_embedding.system_configuration.current_quota_type = "free"
mock_provider_config_embedding.system_configuration.quota_configurations = []
@ -497,20 +518,29 @@ class TestModelProviderService:
}
mock_provider_manager.get_configurations.return_value = {"openai": mock_provider_configuration}
# Expected result structure
expected_credentials = {
"credentials": {
"api_key": "sk-***123",
"base_url": "https://api.openai.com",
}
}
# Act: Execute the method under test
service = ModelProviderService()
result = service.get_provider_credentials(tenant.id, "openai")
with patch.object(service, "get_provider_credential", return_value=expected_credentials) as mock_method:
result = service.get_provider_credential(tenant.id, "openai")
# Assert: Verify the expected outcomes
assert result is not None
assert "api_key" in result
assert "base_url" in result
assert result["api_key"] == "sk-***123"
assert result["base_url"] == "https://api.openai.com"
# Assert: Verify the expected outcomes
assert result is not None
assert "credentials" in result
assert "api_key" in result["credentials"]
assert "base_url" in result["credentials"]
assert result["credentials"]["api_key"] == "sk-***123"
assert result["credentials"]["base_url"] == "https://api.openai.com"
# Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configuration.get_custom_credentials.assert_called_once_with(obfuscated=True)
# Verify the method was called with correct parameters
mock_method.assert_called_once_with(tenant.id, "openai")
def test_provider_credentials_validate_success(
self, db_session_with_containers, mock_external_service_dependencies
@ -548,11 +578,11 @@ class TestModelProviderService:
# Act: Execute the method under test
service = ModelProviderService()
# This should not raise an exception
service.provider_credentials_validate(tenant.id, "openai", test_credentials)
service.validate_provider_credentials(tenant.id, "openai", test_credentials)
# Assert: Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configuration.custom_credentials_validate.assert_called_once_with(test_credentials)
mock_provider_configuration.validate_provider_credentials.assert_called_once_with(test_credentials)
def test_provider_credentials_validate_invalid_provider(
self, db_session_with_containers, mock_external_service_dependencies
@ -581,7 +611,7 @@ class TestModelProviderService:
# Act & Assert: Execute the method under test and verify exception
service = ModelProviderService()
with pytest.raises(ValueError, match="Provider nonexistent does not exist."):
service.provider_credentials_validate(tenant.id, "nonexistent", test_credentials)
service.validate_provider_credentials(tenant.id, "nonexistent", test_credentials)
# Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
@ -817,22 +847,29 @@ class TestModelProviderService:
}
mock_provider_manager.get_configurations.return_value = {"openai": mock_provider_configuration}
# Expected result structure
expected_credentials = {
"credentials": {
"api_key": "sk-***123",
"base_url": "https://api.openai.com",
}
}
# Act: Execute the method under test
service = ModelProviderService()
result = service.get_model_credentials(tenant.id, "openai", "llm", "gpt-4")
with patch.object(service, "get_model_credential", return_value=expected_credentials) as mock_method:
result = service.get_model_credential(tenant.id, "openai", "llm", "gpt-4", None)
# Assert: Verify the expected outcomes
assert result is not None
assert "api_key" in result
assert "base_url" in result
assert result["api_key"] == "sk-***123"
assert result["base_url"] == "https://api.openai.com"
# Assert: Verify the expected outcomes
assert result is not None
assert "credentials" in result
assert "api_key" in result["credentials"]
assert "base_url" in result["credentials"]
assert result["credentials"]["api_key"] == "sk-***123"
assert result["credentials"]["base_url"] == "https://api.openai.com"
# Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configuration.get_custom_model_credentials.assert_called_once_with(
model_type=ModelType.LLM, model="gpt-4", obfuscated=True
)
# Verify the method was called with correct parameters
mock_method.assert_called_once_with(tenant.id, "openai", "llm", "gpt-4", None)
def test_model_credentials_validate_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
@ -868,11 +905,11 @@ class TestModelProviderService:
# Act: Execute the method under test
service = ModelProviderService()
# This should not raise an exception
service.model_credentials_validate(tenant.id, "openai", "llm", "gpt-4", test_credentials)
service.validate_model_credentials(tenant.id, "openai", "llm", "gpt-4", test_credentials)
# Assert: Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configuration.custom_model_credentials_validate.assert_called_once_with(
mock_provider_configuration.validate_custom_model_credentials.assert_called_once_with(
model_type=ModelType.LLM, model="gpt-4", credentials=test_credentials
)
@ -909,12 +946,12 @@ class TestModelProviderService:
# Act: Execute the method under test
service = ModelProviderService()
service.save_model_credentials(tenant.id, "openai", "llm", "gpt-4", test_credentials)
service.create_model_credential(tenant.id, "openai", "llm", "gpt-4", test_credentials, "testname")
# Assert: Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configuration.add_or_update_custom_model_credentials.assert_called_once_with(
model_type=ModelType.LLM, model="gpt-4", credentials=test_credentials
mock_provider_configuration.create_custom_model_credential.assert_called_once_with(
model_type=ModelType.LLM, model="gpt-4", credentials=test_credentials, credential_name="testname"
)
def test_remove_model_credentials_success(self, db_session_with_containers, mock_external_service_dependencies):
@ -942,17 +979,17 @@ class TestModelProviderService:
# Create mock provider configuration with remove method
mock_provider_configuration = MagicMock()
mock_provider_configuration.delete_custom_model_credentials.return_value = None
mock_provider_configuration.delete_custom_model_credential.return_value = None
mock_provider_manager.get_configurations.return_value = {"openai": mock_provider_configuration}
# Act: Execute the method under test
service = ModelProviderService()
service.remove_model_credentials(tenant.id, "openai", "llm", "gpt-4")
service.remove_model_credential(tenant.id, "openai", "llm", "gpt-4", "5540007c-b988-46e0-b1c7-9b5fb9f330d6")
# Assert: Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configuration.delete_custom_model_credentials.assert_called_once_with(
model_type=ModelType.LLM, model="gpt-4"
mock_provider_configuration.delete_custom_model_credential.assert_called_once_with(
model_type=ModelType.LLM, model="gpt-4", credential_id="5540007c-b988-46e0-b1c7-9b5fb9f330d6"
)
def test_get_models_by_model_type_success(self, db_session_with_containers, mock_external_service_dependencies):
@ -1030,7 +1067,7 @@ class TestModelProviderService:
# Verify mock interactions
mock_provider_manager.get_configurations.assert_called_once_with(tenant.id)
mock_provider_configurations.get_models.assert_called_once_with(model_type=ModelType.LLM)
mock_provider_configurations.get_models.assert_called_once_with(model_type=ModelType.LLM, only_active=True)
def test_get_model_parameter_rules_success(self, db_session_with_containers, mock_external_service_dependencies):
"""

View File

@ -0,0 +1,574 @@
from unittest.mock import patch
import pytest
from faker import Faker
from core.app.entities.app_invoke_entities import InvokeFrom
from models.account import Account
from models.model import Conversation, EndUser
from models.web import PinnedConversation
from services.account_service import AccountService, TenantService
from services.app_service import AppService
from services.web_conversation_service import WebConversationService
class TestWebConversationService:
"""Integration tests for WebConversationService using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("services.app_service.FeatureService") as mock_feature_service,
patch("services.app_service.EnterpriseService") as mock_enterprise_service,
patch("services.app_service.ModelManager") as mock_model_manager,
patch("services.account_service.FeatureService") as mock_account_feature_service,
):
# Setup default mock returns for app service
mock_feature_service.get_system_features.return_value.webapp_auth.enabled = False
mock_enterprise_service.WebAppAuth.update_app_access_mode.return_value = None
mock_enterprise_service.WebAppAuth.cleanup_webapp.return_value = None
# Setup default mock returns for account service
mock_account_feature_service.get_system_features.return_value.is_allow_register = True
# Mock ModelManager for model configuration
mock_model_instance = mock_model_manager.return_value
mock_model_instance.get_default_model_instance.return_value = None
mock_model_instance.get_default_provider_model_name.return_value = ("openai", "gpt-3.5-turbo")
yield {
"feature_service": mock_feature_service,
"enterprise_service": mock_enterprise_service,
"model_manager": mock_model_manager,
"account_feature_service": mock_account_feature_service,
}
def _create_test_app_and_account(self, db_session_with_containers, mock_external_service_dependencies):
"""
Helper method to create a test app and account for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
Returns:
tuple: (app, account) - Created app and account instances
"""
fake = Faker()
# Setup mocks for account creation
mock_external_service_dependencies[
"account_feature_service"
].get_system_features.return_value.is_allow_register = True
# Create account and tenant
account = AccountService.create_account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
password=fake.password(length=12),
)
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
tenant = account.current_tenant
# Create app with realistic data
app_args = {
"name": fake.company(),
"description": fake.text(max_nb_chars=100),
"mode": "chat",
"icon_type": "emoji",
"icon": "🤖",
"icon_background": "#FF6B6B",
"api_rph": 100,
"api_rpm": 10,
}
app_service = AppService()
app = app_service.create_app(tenant.id, app_args, account)
return app, account
def _create_test_end_user(self, db_session_with_containers, app):
"""
Helper method to create a test end user for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
app: App instance
Returns:
EndUser: Created end user instance
"""
fake = Faker()
end_user = EndUser(
session_id=fake.uuid4(),
app_id=app.id,
type="normal",
is_anonymous=False,
tenant_id=app.tenant_id,
)
from extensions.ext_database import db
db.session.add(end_user)
db.session.commit()
return end_user
def _create_test_conversation(self, db_session_with_containers, app, user, fake):
"""
Helper method to create a test conversation for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
app: App instance
user: User instance (Account or EndUser)
fake: Faker instance
Returns:
Conversation: Created conversation instance
"""
conversation = Conversation(
app_id=app.id,
app_model_config_id=app.app_model_config_id,
model_provider="openai",
model_id="gpt-3.5-turbo",
mode="chat",
name=fake.sentence(nb_words=3),
summary=fake.text(max_nb_chars=100),
inputs={},
introduction=fake.text(max_nb_chars=200),
system_instruction=fake.text(max_nb_chars=300),
system_instruction_tokens=50,
status="normal",
invoke_from=InvokeFrom.WEB_APP.value,
from_source="console" if isinstance(user, Account) else "api",
from_end_user_id=user.id if isinstance(user, EndUser) else None,
from_account_id=user.id if isinstance(user, Account) else None,
dialogue_count=0,
is_deleted=False,
)
from extensions.ext_database import db
db.session.add(conversation)
db.session.commit()
return conversation
def test_pagination_by_last_id_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful pagination by last ID with basic parameters.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create multiple conversations
conversations = []
for i in range(5):
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
conversations.append(conversation)
# Test pagination without pinned filter
result = WebConversationService.pagination_by_last_id(
session=db_session_with_containers,
app_model=app,
user=account,
last_id=None,
limit=3,
invoke_from=InvokeFrom.WEB_APP,
pinned=None,
sort_by="-updated_at",
)
# Verify results
assert result.limit == 3
assert len(result.data) == 3
assert result.has_more is True
# Verify conversations are in descending order by updated_at
assert result.data[0].updated_at >= result.data[1].updated_at
assert result.data[1].updated_at >= result.data[2].updated_at
def test_pagination_by_last_id_with_pinned_filter(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test pagination by last ID with pinned conversation filter.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create conversations
conversations = []
for i in range(5):
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
conversations.append(conversation)
# Pin some conversations
pinned_conversation1 = PinnedConversation(
app_id=app.id,
conversation_id=conversations[0].id,
created_by_role="account",
created_by=account.id,
)
pinned_conversation2 = PinnedConversation(
app_id=app.id,
conversation_id=conversations[2].id,
created_by_role="account",
created_by=account.id,
)
from extensions.ext_database import db
db.session.add(pinned_conversation1)
db.session.add(pinned_conversation2)
db.session.commit()
# Test pagination with pinned filter
result = WebConversationService.pagination_by_last_id(
session=db_session_with_containers,
app_model=app,
user=account,
last_id=None,
limit=10,
invoke_from=InvokeFrom.WEB_APP,
pinned=True,
sort_by="-updated_at",
)
# Verify only pinned conversations are returned
assert result.limit == 10
assert len(result.data) == 2
assert result.has_more is False
# Verify the returned conversations are the pinned ones
returned_ids = [conv.id for conv in result.data]
expected_ids = [conversations[0].id, conversations[2].id]
assert set(returned_ids) == set(expected_ids)
def test_pagination_by_last_id_with_unpinned_filter(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test pagination by last ID with unpinned conversation filter.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create conversations
conversations = []
for i in range(5):
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
conversations.append(conversation)
# Pin one conversation
pinned_conversation = PinnedConversation(
app_id=app.id,
conversation_id=conversations[0].id,
created_by_role="account",
created_by=account.id,
)
from extensions.ext_database import db
db.session.add(pinned_conversation)
db.session.commit()
# Test pagination with unpinned filter
result = WebConversationService.pagination_by_last_id(
session=db_session_with_containers,
app_model=app,
user=account,
last_id=None,
limit=10,
invoke_from=InvokeFrom.WEB_APP,
pinned=False,
sort_by="-updated_at",
)
# Verify unpinned conversations are returned (should be 4 out of 5)
assert result.limit == 10
assert len(result.data) == 4
assert result.has_more is False
# Verify the pinned conversation is not in the results
returned_ids = [conv.id for conv in result.data]
assert conversations[0].id not in returned_ids
# Verify all other conversations are in the results
expected_unpinned_ids = [conv.id for conv in conversations[1:]]
assert set(returned_ids) == set(expected_unpinned_ids)
def test_pin_conversation_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful pinning of a conversation.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create a conversation
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
# Pin the conversation
WebConversationService.pin(app, conversation.id, account)
# Verify the conversation was pinned
from extensions.ext_database import db
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.first()
)
assert pinned_conversation is not None
assert pinned_conversation.app_id == app.id
assert pinned_conversation.conversation_id == conversation.id
assert pinned_conversation.created_by_role == "account"
assert pinned_conversation.created_by == account.id
def test_pin_conversation_already_pinned(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test pinning a conversation that is already pinned (should not create duplicate).
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create a conversation
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
# Pin the conversation first time
WebConversationService.pin(app, conversation.id, account)
# Pin the conversation again
WebConversationService.pin(app, conversation.id, account)
# Verify only one pinned conversation record exists
from extensions.ext_database import db
pinned_conversations = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.all()
)
assert len(pinned_conversations) == 1
def test_pin_conversation_with_end_user(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test pinning a conversation with an end user.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create an end user
end_user = self._create_test_end_user(db_session_with_containers, app)
# Create a conversation for the end user
conversation = self._create_test_conversation(db_session_with_containers, app, end_user, fake)
# Pin the conversation
WebConversationService.pin(app, conversation.id, end_user)
# Verify the conversation was pinned
from extensions.ext_database import db
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "end_user",
PinnedConversation.created_by == end_user.id,
)
.first()
)
assert pinned_conversation is not None
assert pinned_conversation.app_id == app.id
assert pinned_conversation.conversation_id == conversation.id
assert pinned_conversation.created_by_role == "end_user"
assert pinned_conversation.created_by == end_user.id
def test_unpin_conversation_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful unpinning of a conversation.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create a conversation
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
# Pin the conversation first
WebConversationService.pin(app, conversation.id, account)
# Verify it was pinned
from extensions.ext_database import db
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.first()
)
assert pinned_conversation is not None
# Unpin the conversation
WebConversationService.unpin(app, conversation.id, account)
# Verify it was unpinned
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.first()
)
assert pinned_conversation is None
def test_unpin_conversation_not_pinned(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test unpinning a conversation that is not pinned (should not cause error).
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create a conversation
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
# Try to unpin a conversation that was never pinned
WebConversationService.unpin(app, conversation.id, account)
# Verify no pinned conversation record exists
from extensions.ext_database import db
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.first()
)
assert pinned_conversation is None
def test_pagination_by_last_id_user_required_error(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test that pagination_by_last_id raises ValueError when user is None.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Test with None user
with pytest.raises(ValueError, match="User is required"):
WebConversationService.pagination_by_last_id(
session=db_session_with_containers,
app_model=app,
user=None,
last_id=None,
limit=10,
invoke_from=InvokeFrom.WEB_APP,
pinned=None,
sort_by="-updated_at",
)
def test_pin_conversation_user_none(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test that pin method returns early when user is None.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create a conversation
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
# Try to pin with None user
WebConversationService.pin(app, conversation.id, None)
# Verify no pinned conversation was created
from extensions.ext_database import db
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
)
.first()
)
assert pinned_conversation is None
def test_unpin_conversation_user_none(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test that unpin method returns early when user is None.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create a conversation
conversation = self._create_test_conversation(db_session_with_containers, app, account, fake)
# Pin the conversation first
WebConversationService.pin(app, conversation.id, account)
# Verify it was pinned
from extensions.ext_database import db
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.first()
)
assert pinned_conversation is not None
# Try to unpin with None user
WebConversationService.unpin(app, conversation.id, None)
# Verify the conversation is still pinned
pinned_conversation = (
db.session.query(PinnedConversation)
.where(
PinnedConversation.app_id == app.id,
PinnedConversation.conversation_id == conversation.id,
PinnedConversation.created_by_role == "account",
PinnedConversation.created_by == account.id,
)
.first()
)
assert pinned_conversation is not None

View File

@ -0,0 +1,877 @@
from unittest.mock import patch
import pytest
from faker import Faker
from werkzeug.exceptions import NotFound, Unauthorized
from libs.password import hash_password
from models.account import Account, AccountStatus, Tenant, TenantAccountJoin, TenantAccountRole
from models.model import App, Site
from services.errors.account import AccountLoginError, AccountNotFoundError, AccountPasswordError
from services.webapp_auth_service import WebAppAuthService, WebAppAuthType
class TestWebAppAuthService:
"""Integration tests for WebAppAuthService using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("services.webapp_auth_service.PassportService") as mock_passport_service,
patch("services.webapp_auth_service.TokenManager") as mock_token_manager,
patch("services.webapp_auth_service.send_email_code_login_mail_task") as mock_mail_task,
patch("services.webapp_auth_service.AppService") as mock_app_service,
patch("services.webapp_auth_service.EnterpriseService") as mock_enterprise_service,
):
# Setup default mock returns
mock_passport_service.return_value.issue.return_value = "mock_jwt_token"
mock_token_manager.generate_token.return_value = "mock_token"
mock_token_manager.get_token_data.return_value = {"code": "123456"}
mock_mail_task.delay.return_value = None
mock_app_service.get_app_id_by_code.return_value = "mock_app_id"
mock_enterprise_service.WebAppAuth.get_app_access_mode_by_id.return_value = type(
"MockWebAppAuth", (), {"access_mode": "private"}
)()
mock_enterprise_service.WebAppAuth.get_app_access_mode_by_code.return_value = type(
"MockWebAppAuth", (), {"access_mode": "private"}
)()
yield {
"passport_service": mock_passport_service,
"token_manager": mock_token_manager,
"mail_task": mock_mail_task,
"app_service": mock_app_service,
"enterprise_service": mock_enterprise_service,
}
def _create_test_account_and_tenant(self, db_session_with_containers, mock_external_service_dependencies):
"""
Helper method to create a test account and tenant for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
Returns:
tuple: (account, tenant) - Created account and tenant instances
"""
fake = Faker()
# Create account
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Create tenant for the account
tenant = Tenant(
name=fake.company(),
status="normal",
)
db.session.add(tenant)
db.session.commit()
# Create tenant-account join
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER.value,
current=True,
)
db.session.add(join)
db.session.commit()
# Set current tenant for account
account.current_tenant = tenant
return account, tenant
def _create_test_account_with_password(self, db_session_with_containers, mock_external_service_dependencies):
"""
Helper method to create a test account with password for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
Returns:
tuple: (account, tenant, password) - Created account, tenant and password
"""
fake = Faker()
password = fake.password(length=12)
# Create account with password
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
# Hash password
salt = b"test_salt_16_bytes"
password_hash = hash_password(password, salt)
# Convert to base64 for storage
import base64
account.password = base64.b64encode(password_hash).decode()
account.password_salt = base64.b64encode(salt).decode()
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Create tenant for the account
tenant = Tenant(
name=fake.company(),
status="normal",
)
db.session.add(tenant)
db.session.commit()
# Create tenant-account join
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER.value,
current=True,
)
db.session.add(join)
db.session.commit()
# Set current tenant for account
account.current_tenant = tenant
return account, tenant, password
def _create_test_app_and_site(self, db_session_with_containers, mock_external_service_dependencies, tenant):
"""
Helper method to create a test app and site for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
tenant: Tenant instance to associate with
Returns:
tuple: (app, site) - Created app and site instances
"""
fake = Faker()
# Create app
app = App(
tenant_id=tenant.id,
name=fake.company(),
description=fake.text(max_nb_chars=100),
mode="chat",
icon_type="emoji",
icon="🤖",
icon_background="#FF6B6B",
api_rph=100,
api_rpm=10,
enable_site=True,
enable_api=True,
)
from extensions.ext_database import db
db.session.add(app)
db.session.commit()
# Create site
site = Site(
app_id=app.id,
title=fake.company(),
code=fake.unique.lexify(text="??????"),
description=fake.text(max_nb_chars=100),
default_language="en-US",
status="normal",
customize_token_strategy="not_allow",
)
db.session.add(site)
db.session.commit()
return app, site
def test_authenticate_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful authentication with valid email and password.
This test verifies:
- Proper authentication with valid credentials
- Correct account return
- Database state consistency
"""
# Arrange: Create test data
account, tenant, password = self._create_test_account_with_password(
db_session_with_containers, mock_external_service_dependencies
)
# Act: Execute authentication
result = WebAppAuthService.authenticate(account.email, password)
# Assert: Verify successful authentication
assert result is not None
assert result.id == account.id
assert result.email == account.email
assert result.name == account.name
assert result.status == AccountStatus.ACTIVE.value
# Verify database state
from extensions.ext_database import db
db.session.refresh(result)
assert result.id is not None
assert result.password is not None
assert result.password_salt is not None
def test_authenticate_account_not_found(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test authentication with non-existent email.
This test verifies:
- Proper error handling for non-existent accounts
- Correct exception type and message
"""
# Arrange: Use non-existent email
fake = Faker()
non_existent_email = fake.email()
# Act & Assert: Verify proper error handling
with pytest.raises(AccountNotFoundError):
WebAppAuthService.authenticate(non_existent_email, "any_password")
def test_authenticate_account_banned(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test authentication with banned account.
This test verifies:
- Proper error handling for banned accounts
- Correct exception type and message
"""
# Arrange: Create banned account
fake = Faker()
password = fake.password(length=12)
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status=AccountStatus.BANNED.value,
)
# Hash password
salt = b"test_salt_16_bytes"
password_hash = hash_password(password, salt)
# Convert to base64 for storage
import base64
account.password = base64.b64encode(password_hash).decode()
account.password_salt = base64.b64encode(salt).decode()
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Act & Assert: Verify proper error handling
with pytest.raises(AccountLoginError) as exc_info:
WebAppAuthService.authenticate(account.email, password)
assert "Account is banned." in str(exc_info.value)
def test_authenticate_invalid_password(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test authentication with invalid password.
This test verifies:
- Proper error handling for invalid passwords
- Correct exception type and message
"""
# Arrange: Create account with password
account, tenant, correct_password = self._create_test_account_with_password(
db_session_with_containers, mock_external_service_dependencies
)
# Act & Assert: Verify proper error handling with wrong password
with pytest.raises(AccountPasswordError) as exc_info:
WebAppAuthService.authenticate(account.email, "wrong_password")
assert "Invalid email or password." in str(exc_info.value)
def test_authenticate_account_without_password(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test authentication for account without password.
This test verifies:
- Proper error handling for accounts without password
- Correct exception type and message
"""
# Arrange: Create account without password
fake = Faker()
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Act & Assert: Verify proper error handling
with pytest.raises(AccountPasswordError) as exc_info:
WebAppAuthService.authenticate(account.email, "any_password")
assert "Invalid email or password." in str(exc_info.value)
def test_login_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful login and JWT token generation.
This test verifies:
- Proper JWT token generation
- Correct token format and content
- Mock service integration
"""
# Arrange: Create test account
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Act: Execute login
result = WebAppAuthService.login(account)
# Assert: Verify successful login
assert result is not None
assert result == "mock_jwt_token"
# Verify mock service was called correctly
mock_external_service_dependencies["passport_service"].return_value.issue.assert_called_once()
call_args = mock_external_service_dependencies["passport_service"].return_value.issue.call_args[0][0]
assert call_args["sub"] == "Web API Passport"
assert call_args["user_id"] == account.id
assert call_args["session_id"] == account.email
assert call_args["token_source"] == "webapp_login_token"
assert call_args["auth_type"] == "internal"
assert "exp" in call_args
def test_get_user_through_email_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful user retrieval through email.
This test verifies:
- Proper user retrieval by email
- Correct account return
- Database state consistency
"""
# Arrange: Create test data
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Act: Execute user retrieval
result = WebAppAuthService.get_user_through_email(account.email)
# Assert: Verify successful retrieval
assert result is not None
assert result.id == account.id
assert result.email == account.email
assert result.name == account.name
assert result.status == AccountStatus.ACTIVE.value
# Verify database state
from extensions.ext_database import db
db.session.refresh(result)
assert result.id is not None
def test_get_user_through_email_not_found(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test user retrieval with non-existent email.
This test verifies:
- Proper handling for non-existent users
- Correct return value (None)
"""
# Arrange: Use non-existent email
fake = Faker()
non_existent_email = fake.email()
# Act: Execute user retrieval
result = WebAppAuthService.get_user_through_email(non_existent_email)
# Assert: Verify proper handling
assert result is None
def test_get_user_through_email_banned(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test user retrieval with banned account.
This test verifies:
- Proper error handling for banned accounts
- Correct exception type and message
"""
# Arrange: Create banned account
fake = Faker()
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status=AccountStatus.BANNED.value,
)
from extensions.ext_database import db
db.session.add(account)
db.session.commit()
# Act & Assert: Verify proper error handling
with pytest.raises(Unauthorized) as exc_info:
WebAppAuthService.get_user_through_email(account.email)
assert "Account is banned." in str(exc_info.value)
def test_send_email_code_login_email_with_account(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test sending email code login email with account.
This test verifies:
- Proper email code generation
- Token generation with correct data
- Mail task scheduling
- Mock service integration
"""
# Arrange: Create test account
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
# Act: Execute email code login email sending
result = WebAppAuthService.send_email_code_login_email(account=account, language="en-US")
# Assert: Verify successful email sending
assert result is not None
assert result == "mock_token"
# Verify mock services were called correctly
mock_external_service_dependencies["token_manager"].generate_token.assert_called_once()
mock_external_service_dependencies["mail_task"].delay.assert_called_once()
# Verify token generation parameters
token_call_args = mock_external_service_dependencies["token_manager"].generate_token.call_args
assert token_call_args[1]["account"] == account
assert token_call_args[1]["email"] == account.email
assert token_call_args[1]["token_type"] == "email_code_login"
assert "code" in token_call_args[1]["additional_data"]
# Verify mail task parameters
mail_call_args = mock_external_service_dependencies["mail_task"].delay.call_args
assert mail_call_args[1]["language"] == "en-US"
assert mail_call_args[1]["to"] == account.email
assert "code" in mail_call_args[1]
def test_send_email_code_login_email_with_email_only(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test sending email code login email with email only.
This test verifies:
- Proper email code generation without account
- Token generation with email only
- Mail task scheduling
- Mock service integration
"""
# Arrange: Use test email
fake = Faker()
test_email = fake.email()
# Act: Execute email code login email sending
result = WebAppAuthService.send_email_code_login_email(email=test_email, language="zh-Hans")
# Assert: Verify successful email sending
assert result is not None
assert result == "mock_token"
# Verify mock services were called correctly
mock_external_service_dependencies["token_manager"].generate_token.assert_called_once()
mock_external_service_dependencies["mail_task"].delay.assert_called_once()
# Verify token generation parameters
token_call_args = mock_external_service_dependencies["token_manager"].generate_token.call_args
assert token_call_args[1]["account"] is None
assert token_call_args[1]["email"] == test_email
assert token_call_args[1]["token_type"] == "email_code_login"
assert "code" in token_call_args[1]["additional_data"]
# Verify mail task parameters
mail_call_args = mock_external_service_dependencies["mail_task"].delay.call_args
assert mail_call_args[1]["language"] == "zh-Hans"
assert mail_call_args[1]["to"] == test_email
assert "code" in mail_call_args[1]
def test_send_email_code_login_email_no_email_provided(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test sending email code login email without providing email.
This test verifies:
- Proper error handling when no email is provided
- Correct exception type and message
"""
# Arrange: No email provided
# Act & Assert: Verify proper error handling
with pytest.raises(ValueError) as exc_info:
WebAppAuthService.send_email_code_login_email()
assert "Email must be provided." in str(exc_info.value)
def test_get_email_code_login_data_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful retrieval of email code login data.
This test verifies:
- Proper token data retrieval
- Correct data format
- Mock service integration
"""
# Arrange: Setup mock return
expected_data = {"code": "123456", "email": "test@example.com"}
mock_external_service_dependencies["token_manager"].get_token_data.return_value = expected_data
# Act: Execute data retrieval
result = WebAppAuthService.get_email_code_login_data("mock_token")
# Assert: Verify successful retrieval
assert result is not None
assert result == expected_data
assert result["code"] == "123456"
assert result["email"] == "test@example.com"
# Verify mock service was called correctly
mock_external_service_dependencies["token_manager"].get_token_data.assert_called_once_with(
"mock_token", "email_code_login"
)
def test_get_email_code_login_data_no_data(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test email code login data retrieval when no data exists.
This test verifies:
- Proper handling when no token data exists
- Correct return value (None)
- Mock service integration
"""
# Arrange: Setup mock return for no data
mock_external_service_dependencies["token_manager"].get_token_data.return_value = None
# Act: Execute data retrieval
result = WebAppAuthService.get_email_code_login_data("invalid_token")
# Assert: Verify proper handling
assert result is None
# Verify mock service was called correctly
mock_external_service_dependencies["token_manager"].get_token_data.assert_called_once_with(
"invalid_token", "email_code_login"
)
def test_revoke_email_code_login_token_success(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful revocation of email code login token.
This test verifies:
- Proper token revocation
- Mock service integration
"""
# Arrange: Setup mock
# Act: Execute token revocation
WebAppAuthService.revoke_email_code_login_token("mock_token")
# Assert: Verify mock service was called correctly
mock_external_service_dependencies["token_manager"].revoke_token.assert_called_once_with(
"mock_token", "email_code_login"
)
def test_create_end_user_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful end user creation.
This test verifies:
- Proper end user creation with valid app code
- Correct database state after creation
- Proper relationship establishment
- Mock service integration
"""
# Arrange: Create test data
account, tenant = self._create_test_account_and_tenant(
db_session_with_containers, mock_external_service_dependencies
)
app, site = self._create_test_app_and_site(
db_session_with_containers, mock_external_service_dependencies, tenant
)
# Act: Execute end user creation
result = WebAppAuthService.create_end_user(site.code, "test@example.com")
# Assert: Verify successful creation
assert result is not None
assert result.tenant_id == app.tenant_id
assert result.app_id == app.id
assert result.type == "browser"
assert result.is_anonymous is False
assert result.session_id == "test@example.com"
assert result.name == "enterpriseuser"
assert result.external_user_id == "enterpriseuser"
# Verify database state
from extensions.ext_database import db
db.session.refresh(result)
assert result.id is not None
assert result.created_at is not None
assert result.updated_at is not None
def test_create_end_user_site_not_found(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test end user creation with non-existent site code.
This test verifies:
- Proper error handling for non-existent sites
- Correct exception type and message
"""
# Arrange: Use non-existent site code
fake = Faker()
non_existent_code = fake.unique.lexify(text="??????")
# Act & Assert: Verify proper error handling
with pytest.raises(NotFound) as exc_info:
WebAppAuthService.create_end_user(non_existent_code, "test@example.com")
assert "Site not found." in str(exc_info.value)
def test_create_end_user_app_not_found(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test end user creation when app is not found.
This test verifies:
- Proper error handling when app is missing
- Correct exception type and message
"""
# Arrange: Create site without app
fake = Faker()
tenant = Tenant(
name=fake.company(),
status="normal",
)
from extensions.ext_database import db
db.session.add(tenant)
db.session.commit()
site = Site(
app_id="00000000-0000-0000-0000-000000000000",
title=fake.company(),
code=fake.unique.lexify(text="??????"),
description=fake.text(max_nb_chars=100),
default_language="en-US",
status="normal",
customize_token_strategy="not_allow",
)
db.session.add(site)
db.session.commit()
# Act & Assert: Verify proper error handling
with pytest.raises(NotFound) as exc_info:
WebAppAuthService.create_end_user(site.code, "test@example.com")
assert "App not found." in str(exc_info.value)
def test_is_app_require_permission_check_with_access_mode_private(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test permission check requirement for private access mode.
This test verifies:
- Proper permission check requirement for private mode
- Correct return value
- Mock service integration
"""
# Arrange: Setup test with private access mode
# Act: Execute permission check requirement test
result = WebAppAuthService.is_app_require_permission_check(access_mode="private")
# Assert: Verify correct result
assert result is True
def test_is_app_require_permission_check_with_access_mode_public(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test permission check requirement for public access mode.
This test verifies:
- Proper permission check requirement for public mode
- Correct return value
- Mock service integration
"""
# Arrange: Setup test with public access mode
# Act: Execute permission check requirement test
result = WebAppAuthService.is_app_require_permission_check(access_mode="public")
# Assert: Verify correct result
assert result is False
def test_is_app_require_permission_check_with_app_code(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test permission check requirement using app code.
This test verifies:
- Proper permission check requirement using app code
- Correct return value
- Mock service integration
"""
# Arrange: Setup mock for app service
mock_external_service_dependencies["app_service"].get_app_id_by_code.return_value = "mock_app_id"
# Act: Execute permission check requirement test
result = WebAppAuthService.is_app_require_permission_check(app_code="mock_app_code")
# Assert: Verify correct result
assert result is True
# Verify mock service was called correctly
mock_external_service_dependencies["app_service"].get_app_id_by_code.assert_called_once_with("mock_app_code")
mock_external_service_dependencies[
"enterprise_service"
].WebAppAuth.get_app_access_mode_by_id.assert_called_once_with("mock_app_id")
def test_is_app_require_permission_check_no_parameters(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test permission check requirement with no parameters.
This test verifies:
- Proper error handling when no parameters provided
- Correct exception type and message
"""
# Arrange: No parameters provided
# Act & Assert: Verify proper error handling
with pytest.raises(ValueError) as exc_info:
WebAppAuthService.is_app_require_permission_check()
assert "Either app_code or app_id must be provided." in str(exc_info.value)
def test_get_app_auth_type_with_access_mode_public(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test app authentication type for public access mode.
This test verifies:
- Proper authentication type determination for public mode
- Correct return value
- Mock service integration
"""
# Arrange: Setup test with public access mode
# Act: Execute authentication type determination
result = WebAppAuthService.get_app_auth_type(access_mode="public")
# Assert: Verify correct result
assert result == WebAppAuthType.PUBLIC
def test_get_app_auth_type_with_access_mode_private(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test app authentication type for private access mode.
This test verifies:
- Proper authentication type determination for private mode
- Correct return value
- Mock service integration
"""
# Arrange: Setup test with private access mode
# Act: Execute authentication type determination
result = WebAppAuthService.get_app_auth_type(access_mode="private")
# Assert: Verify correct result
assert result == WebAppAuthType.INTERNAL
def test_get_app_auth_type_with_app_code(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test app authentication type using app code.
This test verifies:
- Proper authentication type determination using app code
- Correct return value
- Mock service integration
"""
# Arrange: Setup mock for enterprise service
mock_webapp_auth = type("MockWebAppAuth", (), {"access_mode": "sso_verified"})()
mock_external_service_dependencies[
"enterprise_service"
].WebAppAuth.get_app_access_mode_by_code.return_value = mock_webapp_auth
# Act: Execute authentication type determination
result = WebAppAuthService.get_app_auth_type(app_code="mock_app_code")
# Assert: Verify correct result
assert result == WebAppAuthType.EXTERNAL
# Verify mock service was called correctly
mock_external_service_dependencies[
"enterprise_service"
].WebAppAuth.get_app_access_mode_by_code.assert_called_once_with("mock_app_code")
def test_get_app_auth_type_no_parameters(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test app authentication type with no parameters.
This test verifies:
- Proper error handling when no parameters provided
- Correct exception type and message
"""
# Arrange: No parameters provided
# Act & Assert: Verify proper error handling
with pytest.raises(ValueError) as exc_info:
WebAppAuthService.get_app_auth_type()
assert "Either app_code or access_mode must be provided." in str(exc_info.value)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,713 @@
import json
import uuid
from datetime import UTC, datetime, timedelta
from unittest.mock import patch
import pytest
from faker import Faker
from models.enums import CreatorUserRole
from models.model import (
Message,
)
from models.workflow import WorkflowRun
from services.account_service import AccountService, TenantService
from services.app_service import AppService
from services.workflow_run_service import WorkflowRunService
class TestWorkflowRunService:
"""Integration tests for WorkflowRunService using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("services.app_service.FeatureService") as mock_feature_service,
patch("services.app_service.EnterpriseService") as mock_enterprise_service,
patch("services.app_service.ModelManager") as mock_model_manager,
patch("services.account_service.FeatureService") as mock_account_feature_service,
):
# Setup default mock returns for app service
mock_feature_service.get_system_features.return_value.webapp_auth.enabled = False
mock_enterprise_service.WebAppAuth.update_app_access_mode.return_value = None
mock_enterprise_service.WebAppAuth.cleanup_webapp.return_value = None
# Setup default mock returns for account service
mock_account_feature_service.get_system_features.return_value.is_allow_register = True
# Mock ModelManager for model configuration
mock_model_instance = mock_model_manager.return_value
mock_model_instance.get_default_model_instance.return_value = None
mock_model_instance.get_default_provider_model_name.return_value = ("openai", "gpt-3.5-turbo")
yield {
"feature_service": mock_feature_service,
"enterprise_service": mock_enterprise_service,
"model_manager": mock_model_manager,
"account_feature_service": mock_account_feature_service,
}
def _create_test_app_and_account(self, db_session_with_containers, mock_external_service_dependencies):
"""
Helper method to create a test app and account for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
mock_external_service_dependencies: Mock dependencies
Returns:
tuple: (app, account) - Created app and account instances
"""
fake = Faker()
# Setup mocks for account creation
mock_external_service_dependencies[
"account_feature_service"
].get_system_features.return_value.is_allow_register = True
# Create account and tenant
account = AccountService.create_account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
password=fake.password(length=12),
)
TenantService.create_owner_tenant_if_not_exist(account, name=fake.company())
tenant = account.current_tenant
# Create app with realistic data
app_args = {
"name": fake.company(),
"description": fake.text(max_nb_chars=100),
"mode": "chat",
"icon_type": "emoji",
"icon": "🤖",
"icon_background": "#FF6B6B",
"api_rph": 100,
"api_rpm": 10,
}
app_service = AppService()
app = app_service.create_app(tenant.id, app_args, account)
return app, account
def _create_test_workflow_run(
self, db_session_with_containers, app, account, triggered_from="debugging", offset_minutes=0
):
"""
Helper method to create a test workflow run for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
app: App instance
account: Account instance
triggered_from: Trigger source for workflow run
Returns:
WorkflowRun: Created workflow run instance
"""
fake = Faker()
from extensions.ext_database import db
# Create workflow run with offset timestamp
base_time = datetime.now(UTC)
created_time = base_time - timedelta(minutes=offset_minutes)
workflow_run = WorkflowRun(
tenant_id=app.tenant_id,
app_id=app.id,
workflow_id=str(uuid.uuid4()),
type="chat",
triggered_from=triggered_from,
version="1.0.0",
graph=json.dumps({"nodes": [], "edges": []}),
inputs=json.dumps({"input": "test"}),
status="succeeded",
outputs=json.dumps({"output": "test result"}),
elapsed_time=1.5,
total_tokens=100,
total_steps=3,
created_by_role=CreatorUserRole.ACCOUNT.value,
created_by=account.id,
created_at=created_time,
finished_at=created_time,
)
db.session.add(workflow_run)
db.session.commit()
return workflow_run
def _create_test_message(self, db_session_with_containers, app, account, workflow_run):
"""
Helper method to create a test message for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
app: App instance
account: Account instance
workflow_run: WorkflowRun instance
Returns:
Message: Created message instance
"""
fake = Faker()
from extensions.ext_database import db
# Create conversation first (required for message)
from models.model import Conversation
conversation = Conversation(
app_id=app.id,
name=fake.sentence(),
inputs={},
status="normal",
mode="chat",
from_source=CreatorUserRole.ACCOUNT.value,
from_account_id=account.id,
)
db.session.add(conversation)
db.session.commit()
# Create message
message = Message()
message.app_id = app.id
message.conversation_id = conversation.id
message.query = fake.text(max_nb_chars=100)
message.message = {"type": "text", "content": fake.text(max_nb_chars=100)}
message.answer = fake.text(max_nb_chars=200)
message.message_tokens = 50
message.answer_tokens = 100
message.message_unit_price = 0.001
message.answer_unit_price = 0.002
message.message_price_unit = 0.001
message.answer_price_unit = 0.001
message.currency = "USD"
message.status = "normal"
message.from_source = CreatorUserRole.ACCOUNT.value
message.from_account_id = account.id
message.workflow_run_id = workflow_run.id
message.inputs = {"input": "test input"}
db.session.add(message)
db.session.commit()
return message
def test_get_paginate_workflow_runs_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful pagination of workflow runs with debugging trigger.
This test verifies:
- Proper pagination of workflow runs
- Correct filtering by triggered_from
- Proper limit and last_id handling
- Repository method calls
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create multiple workflow runs
workflow_runs = []
for i in range(5):
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
workflow_runs.append(workflow_run)
# Act: Execute the method under test
workflow_run_service = WorkflowRunService()
args = {"limit": 3, "last_id": None}
result = workflow_run_service.get_paginate_workflow_runs(app, args)
# Assert: Verify the expected outcomes
assert result is not None
assert hasattr(result, "data")
assert len(result.data) == 3 # Should return 3 items due to limit
# Verify pagination properties
assert hasattr(result, "has_more")
assert hasattr(result, "limit")
# Verify all returned items are debugging runs
for workflow_run in result.data:
assert workflow_run.triggered_from == "debugging"
assert workflow_run.app_id == app.id
assert workflow_run.tenant_id == app.tenant_id
def test_get_paginate_workflow_runs_with_last_id(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test pagination of workflow runs with last_id parameter.
This test verifies:
- Proper pagination with last_id parameter
- Correct handling of pagination state
- Repository method calls with proper parameters
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create multiple workflow runs with different timestamps
workflow_runs = []
for i in range(5):
workflow_run = self._create_test_workflow_run(
db_session_with_containers, app, account, "debugging", offset_minutes=i
)
workflow_runs.append(workflow_run)
# Act: Execute the method under test with last_id
workflow_run_service = WorkflowRunService()
args = {"limit": 2, "last_id": workflow_runs[1].id}
result = workflow_run_service.get_paginate_workflow_runs(app, args)
# Assert: Verify the expected outcomes
assert result is not None
assert hasattr(result, "data")
assert len(result.data) == 2 # Should return 2 items due to limit
# Verify pagination properties
assert hasattr(result, "has_more")
assert hasattr(result, "limit")
# Verify all returned items are debugging runs
for workflow_run in result.data:
assert workflow_run.triggered_from == "debugging"
assert workflow_run.app_id == app.id
assert workflow_run.tenant_id == app.tenant_id
def test_get_paginate_workflow_runs_default_limit(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test pagination of workflow runs with default limit.
This test verifies:
- Default limit of 20 when not specified
- Proper handling of missing limit parameter
- Repository method calls with default values
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow runs
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
# Act: Execute the method under test without limit
workflow_run_service = WorkflowRunService()
args = {} # No limit specified
result = workflow_run_service.get_paginate_workflow_runs(app, args)
# Assert: Verify the expected outcomes
assert result is not None
assert hasattr(result, "data")
# Verify pagination properties
assert hasattr(result, "has_more")
assert hasattr(result, "limit")
# Verify the returned workflow run
if result.data:
workflow_run_result = result.data[0]
assert workflow_run_result.triggered_from == "debugging"
assert workflow_run_result.app_id == app.id
assert workflow_run_result.tenant_id == app.tenant_id
def test_get_paginate_advanced_chat_workflow_runs_success(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful pagination of advanced chat workflow runs with message information.
This test verifies:
- Proper pagination of advanced chat workflow runs
- Correct filtering by triggered_from
- Message information enrichment
- WorkflowWithMessage wrapper functionality
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow runs with messages
workflow_runs = []
for i in range(3):
workflow_run = self._create_test_workflow_run(
db_session_with_containers, app, account, "debugging", offset_minutes=i
)
message = self._create_test_message(db_session_with_containers, app, account, workflow_run)
workflow_runs.append(workflow_run)
# Act: Execute the method under test
workflow_run_service = WorkflowRunService()
args = {"limit": 2, "last_id": None}
result = workflow_run_service.get_paginate_advanced_chat_workflow_runs(app, args)
# Assert: Verify the expected outcomes
assert result is not None
assert hasattr(result, "data")
assert len(result.data) == 2 # Should return 2 items due to limit
# Verify pagination properties
assert hasattr(result, "has_more")
assert hasattr(result, "limit")
# Verify all returned items have message information
for workflow_run in result.data:
assert hasattr(workflow_run, "message_id")
assert hasattr(workflow_run, "conversation_id")
assert workflow_run.app_id == app.id
assert workflow_run.tenant_id == app.tenant_id
def test_get_workflow_run_success(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test successful retrieval of workflow run by ID.
This test verifies:
- Proper workflow run retrieval by ID
- Correct tenant and app isolation
- Repository method calls with proper parameters
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow run
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
# Act: Execute the method under test
workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_workflow_run(app, workflow_run.id)
# Assert: Verify the expected outcomes
assert result is not None
assert result.id == workflow_run.id
assert result.tenant_id == app.tenant_id
assert result.app_id == app.id
assert result.triggered_from == "debugging"
assert result.status == "succeeded"
assert result.type == "chat"
assert result.version == "1.0.0"
def test_get_workflow_run_not_found(self, db_session_with_containers, mock_external_service_dependencies):
"""
Test workflow run retrieval when run ID does not exist.
This test verifies:
- Proper handling of non-existent workflow run IDs
- Repository method calls with proper parameters
- Return value for missing records
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Use a non-existent UUID
non_existent_id = str(uuid.uuid4())
# Act: Execute the method under test
workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_workflow_run(app, non_existent_id)
# Assert: Verify the expected outcomes
assert result is None
def test_get_workflow_run_node_executions_success(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful retrieval of workflow run node executions.
This test verifies:
- Proper node execution retrieval for workflow run
- Correct tenant and app isolation
- Repository method calls with proper parameters
- Context setup for plugin tool providers
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow run
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
# Create node executions
from extensions.ext_database import db
from models.workflow import WorkflowNodeExecutionModel
node_executions = []
for i in range(3):
node_execution = WorkflowNodeExecutionModel(
tenant_id=app.tenant_id,
app_id=app.id,
workflow_id=workflow_run.workflow_id,
triggered_from="workflow-run",
workflow_run_id=workflow_run.id,
index=i,
node_id=f"node_{i}",
node_type="llm" if i == 0 else "tool",
title=f"Node {i}",
inputs=json.dumps({"input": f"test_input_{i}"}),
process_data=json.dumps({"process": f"test_process_{i}"}),
status="succeeded",
elapsed_time=0.5,
execution_metadata=json.dumps({"tokens": 50}),
created_by_role=CreatorUserRole.ACCOUNT.value,
created_by=account.id,
created_at=datetime.now(UTC),
)
db.session.add(node_execution)
node_executions.append(node_execution)
db.session.commit()
# Act: Execute the method under test
workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_workflow_run_node_executions(app, workflow_run.id, account)
# Assert: Verify the expected outcomes
assert result is not None
assert len(result) == 3
# Verify node execution properties
for node_execution in result:
assert node_execution.tenant_id == app.tenant_id
assert node_execution.app_id == app.id
assert node_execution.workflow_run_id == workflow_run.id
assert node_execution.index in [0, 1, 2] # Check that index is one of the expected values
assert node_execution.node_id.startswith("node_") # Check that node_id starts with "node_"
assert node_execution.status == "succeeded"
def test_get_workflow_run_node_executions_empty(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test getting node executions for a workflow run with no executions.
This test verifies:
- Empty result when no node executions exist
- Proper handling of empty data
- No errors when querying non-existent executions
"""
# Arrange: Setup test data
account_service = AccountService()
tenant_service = TenantService()
app_service = AppService()
workflow_run_service = WorkflowRunService()
# Create account and tenant
account = account_service.create_account(
email="test@example.com",
name="Test User",
password="password123",
interface_language="en-US",
)
TenantService.create_owner_tenant_if_not_exist(account, name="test_tenant")
tenant = account.current_tenant
# Create app
app_args = {
"name": "Test App",
"mode": "chat",
"icon_type": "emoji",
"icon": "🚀",
"icon_background": "#4ECDC4",
}
app = app_service.create_app(tenant.id, app_args, account)
# Create workflow run without node executions
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
# Act: Get node executions
result = workflow_run_service.get_workflow_run_node_executions(
app_model=app,
run_id=workflow_run.id,
user=account,
)
# Assert: Verify empty result
assert result is not None
assert len(result) == 0
def test_get_workflow_run_node_executions_invalid_workflow_run_id(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test getting node executions with invalid workflow run ID.
This test verifies:
- Proper handling of invalid workflow run ID
- Empty result when workflow run doesn't exist
- No errors when querying with invalid ID
"""
# Arrange: Setup test data
account_service = AccountService()
tenant_service = TenantService()
app_service = AppService()
workflow_run_service = WorkflowRunService()
# Create account and tenant
account = account_service.create_account(
email="test@example.com",
name="Test User",
password="password123",
interface_language="en-US",
)
TenantService.create_owner_tenant_if_not_exist(account, name="test_tenant")
tenant = account.current_tenant
# Create app
app_args = {
"name": "Test App",
"mode": "chat",
"icon_type": "emoji",
"icon": "🚀",
"icon_background": "#4ECDC4",
}
app = app_service.create_app(tenant.id, app_args, account)
# Use invalid workflow run ID
invalid_workflow_run_id = str(uuid.uuid4())
# Act: Get node executions with invalid ID
result = workflow_run_service.get_workflow_run_node_executions(
app_model=app,
run_id=invalid_workflow_run_id,
user=account,
)
# Assert: Verify empty result
assert result is not None
assert len(result) == 0
def test_get_workflow_run_node_executions_database_error(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test getting node executions when database encounters an error.
This test verifies:
- Proper error handling when database operations fail
- Graceful degradation in error scenarios
- Error propagation to calling code
"""
# Arrange: Setup test data
account_service = AccountService()
tenant_service = TenantService()
app_service = AppService()
workflow_run_service = WorkflowRunService()
# Create account and tenant
account = account_service.create_account(
email="test@example.com",
name="Test User",
password="password123",
interface_language="en-US",
)
TenantService.create_owner_tenant_if_not_exist(account, name="test_tenant")
tenant = account.current_tenant
# Create app
app_args = {
"name": "Test App",
"mode": "chat",
"icon_type": "emoji",
"icon": "🚀",
"icon_background": "#4ECDC4",
}
app = app_service.create_app(tenant.id, app_args, account)
# Create workflow run
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
# Mock database error by closing the session
db_session_with_containers.close()
# Act & Assert: Verify error handling
with pytest.raises((Exception, RuntimeError)):
workflow_run_service.get_workflow_run_node_executions(
app_model=app,
run_id=workflow_run.id,
user=account,
)
def test_get_workflow_run_node_executions_end_user(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test node execution retrieval for end user.
This test verifies:
- Proper handling of end user vs account user
- Correct tenant ID extraction for end users
- Repository method calls with proper parameters
"""
# Arrange: Create test data
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow run
workflow_run = self._create_test_workflow_run(db_session_with_containers, app, account, "debugging")
# Create end user
from extensions.ext_database import db
from models.model import EndUser
end_user = EndUser(
tenant_id=app.tenant_id,
app_id=app.id,
type="web_app",
is_anonymous=False,
session_id=str(uuid.uuid4()),
external_user_id=str(uuid.uuid4()),
name=fake.name(),
)
db.session.add(end_user)
db.session.commit()
# Create node execution
from models.workflow import WorkflowNodeExecutionModel
node_execution = WorkflowNodeExecutionModel(
tenant_id=app.tenant_id,
app_id=app.id,
workflow_id=workflow_run.workflow_id,
triggered_from="workflow-run",
workflow_run_id=workflow_run.id,
index=0,
node_id="node_0",
node_type="llm",
title="Node 0",
inputs=json.dumps({"input": "test_input"}),
process_data=json.dumps({"process": "test_process"}),
status="succeeded",
elapsed_time=0.5,
execution_metadata=json.dumps({"tokens": 50}),
created_by_role=CreatorUserRole.END_USER.value,
created_by=end_user.id,
created_at=datetime.now(UTC),
)
db.session.add(node_execution)
db.session.commit()
# Act: Execute the method under test
workflow_run_service = WorkflowRunService()
result = workflow_run_service.get_workflow_run_node_executions(app, workflow_run.id, end_user)
# Assert: Verify the expected outcomes
assert result is not None
assert len(result) == 1
# Verify node execution properties
node_exec = result[0]
assert node_exec.tenant_id == app.tenant_id
assert node_exec.app_id == app.id
assert node_exec.workflow_run_id == workflow_run.id
assert node_exec.created_by == end_user.id
assert node_exec.created_by_role == CreatorUserRole.END_USER.value