mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 08:58:09 +08:00
Merge remote-tracking branch 'origin/main' into feat/trigger
This commit is contained in:
@ -427,8 +427,8 @@ CODE_EXECUTION_POOL_MAX_KEEPALIVE_CONNECTIONS=20
|
||||
CODE_EXECUTION_POOL_KEEPALIVE_EXPIRY=5.0
|
||||
CODE_MAX_NUMBER=9223372036854775807
|
||||
CODE_MIN_NUMBER=-9223372036854775808
|
||||
CODE_MAX_STRING_LENGTH=80000
|
||||
TEMPLATE_TRANSFORM_MAX_LENGTH=80000
|
||||
CODE_MAX_STRING_LENGTH=400000
|
||||
TEMPLATE_TRANSFORM_MAX_LENGTH=400000
|
||||
CODE_MAX_STRING_ARRAY_LENGTH=30
|
||||
CODE_MAX_OBJECT_ARRAY_LENGTH=30
|
||||
CODE_MAX_NUMBER_ARRAY_LENGTH=1000
|
||||
|
||||
@ -150,7 +150,7 @@ class CodeExecutionSandboxConfig(BaseSettings):
|
||||
|
||||
CODE_MAX_STRING_LENGTH: PositiveInt = Field(
|
||||
description="Maximum allowed length for strings in code execution",
|
||||
default=80000,
|
||||
default=400_000,
|
||||
)
|
||||
|
||||
CODE_MAX_STRING_ARRAY_LENGTH: PositiveInt = Field(
|
||||
@ -593,6 +593,11 @@ class WorkflowConfig(BaseSettings):
|
||||
default=200 * 1024,
|
||||
)
|
||||
|
||||
TEMPLATE_TRANSFORM_MAX_LENGTH: PositiveInt = Field(
|
||||
description="Maximum number of characters allowed in Template Transform node output",
|
||||
default=400_000,
|
||||
)
|
||||
|
||||
# GraphEngine Worker Pool Configuration
|
||||
GRAPH_ENGINE_MIN_WORKERS: PositiveInt = Field(
|
||||
description="Minimum number of workers per GraphEngine instance",
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
from openai import BaseModel
|
||||
from pydantic import Field
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# Import InvokeFrom locally to avoid circular import
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
from typing import Any
|
||||
|
||||
from openai import BaseModel
|
||||
from pydantic import Field
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import os
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import Any
|
||||
|
||||
from configs import dify_config
|
||||
from core.helper.code_executor.code_executor import CodeExecutionError, CodeExecutor, CodeLanguage
|
||||
from core.workflow.enums import ErrorStrategy, NodeType, WorkflowNodeExecutionStatus
|
||||
from core.workflow.node_events import NodeRunResult
|
||||
@ -9,7 +9,7 @@ from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.template_transform.entities import TemplateTransformNodeData
|
||||
|
||||
MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH = int(os.environ.get("TEMPLATE_TRANSFORM_MAX_LENGTH", "80000"))
|
||||
MAX_TEMPLATE_TRANSFORM_OUTPUT_LENGTH = dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH
|
||||
|
||||
|
||||
class TemplateTransformNode(Node):
|
||||
|
||||
@ -10,14 +10,14 @@ from dify_app import DifyApp
|
||||
|
||||
def init_app(app: DifyApp):
|
||||
@app.after_request
|
||||
def after_request(response):
|
||||
def after_request(response): # pyright: ignore[reportUnusedFunction]
|
||||
"""Add Version headers to the response."""
|
||||
response.headers.add("X-Version", dify_config.project.version)
|
||||
response.headers.add("X-Env", dify_config.DEPLOY_ENV)
|
||||
return response
|
||||
|
||||
@app.route("/health")
|
||||
def health():
|
||||
def health(): # pyright: ignore[reportUnusedFunction]
|
||||
return Response(
|
||||
json.dumps({"pid": os.getpid(), "status": "ok", "version": dify_config.project.version}),
|
||||
status=200,
|
||||
@ -25,7 +25,7 @@ def init_app(app: DifyApp):
|
||||
)
|
||||
|
||||
@app.route("/threads")
|
||||
def threads():
|
||||
def threads(): # pyright: ignore[reportUnusedFunction]
|
||||
num_threads = threading.active_count()
|
||||
threads = threading.enumerate()
|
||||
|
||||
@ -50,7 +50,7 @@ def init_app(app: DifyApp):
|
||||
}
|
||||
|
||||
@app.route("/db-pool-stat")
|
||||
def pool_stat():
|
||||
def pool_stat(): # pyright: ignore[reportUnusedFunction]
|
||||
from extensions.ext_database import db
|
||||
|
||||
engine = db.engine
|
||||
|
||||
@ -10,7 +10,7 @@ from models.engine import db
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global flag to avoid duplicate registration of event listener
|
||||
_GEVENT_COMPATIBILITY_SETUP: bool = False
|
||||
_gevent_compatibility_setup: bool = False
|
||||
|
||||
|
||||
def _safe_rollback(connection):
|
||||
@ -26,14 +26,14 @@ def _safe_rollback(connection):
|
||||
|
||||
|
||||
def _setup_gevent_compatibility():
|
||||
global _GEVENT_COMPATIBILITY_SETUP # pylint: disable=global-statement
|
||||
global _gevent_compatibility_setup # pylint: disable=global-statement
|
||||
|
||||
# Avoid duplicate registration
|
||||
if _GEVENT_COMPATIBILITY_SETUP:
|
||||
if _gevent_compatibility_setup:
|
||||
return
|
||||
|
||||
@event.listens_for(Pool, "reset")
|
||||
def _safe_reset(dbapi_connection, connection_record, reset_state): # pylint: disable=unused-argument
|
||||
def _safe_reset(dbapi_connection, connection_record, reset_state): # pyright: ignore[reportUnusedFunction]
|
||||
if reset_state.terminate_only:
|
||||
return
|
||||
|
||||
@ -47,7 +47,7 @@ def _setup_gevent_compatibility():
|
||||
except (AttributeError, ImportError):
|
||||
_safe_rollback(dbapi_connection)
|
||||
|
||||
_GEVENT_COMPATIBILITY_SETUP = True
|
||||
_gevent_compatibility_setup = True
|
||||
|
||||
|
||||
def init_app(app: DifyApp):
|
||||
|
||||
@ -2,4 +2,4 @@ from dify_app import DifyApp
|
||||
|
||||
|
||||
def init_app(app: DifyApp):
|
||||
from events import event_handlers # noqa: F401
|
||||
from events import event_handlers # noqa: F401 # pyright: ignore[reportUnusedImport]
|
||||
|
||||
@ -4,7 +4,6 @@ from dify_app import DifyApp
|
||||
|
||||
def init_app(app: DifyApp):
|
||||
if dify_config.SENTRY_DSN:
|
||||
import openai
|
||||
import sentry_sdk
|
||||
from langfuse import parse_error # type: ignore
|
||||
from sentry_sdk.integrations.celery import CeleryIntegration
|
||||
@ -28,7 +27,6 @@ def init_app(app: DifyApp):
|
||||
HTTPException,
|
||||
ValueError,
|
||||
FileNotFoundError,
|
||||
openai.APIStatusError,
|
||||
InvokeRateLimitError,
|
||||
parse_error.defaultErrorResponse,
|
||||
],
|
||||
|
||||
@ -33,7 +33,9 @@ class AliyunOssStorage(BaseStorage):
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
|
||||
data: bytes = obj.read()
|
||||
data = obj.read()
|
||||
if not isinstance(data, bytes):
|
||||
return b""
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
|
||||
@ -39,10 +39,10 @@ class AwsS3Storage(BaseStorage):
|
||||
self.client.head_bucket(Bucket=self.bucket_name)
|
||||
except ClientError as e:
|
||||
# if bucket not exists, create it
|
||||
if e.response["Error"]["Code"] == "404":
|
||||
if e.response.get("Error", {}).get("Code") == "404":
|
||||
self.client.create_bucket(Bucket=self.bucket_name)
|
||||
# if bucket is not accessible, pass, maybe the bucket is existing but not accessible
|
||||
elif e.response["Error"]["Code"] == "403":
|
||||
elif e.response.get("Error", {}).get("Code") == "403":
|
||||
pass
|
||||
else:
|
||||
# other error, raise exception
|
||||
@ -55,7 +55,7 @@ class AwsS3Storage(BaseStorage):
|
||||
try:
|
||||
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
if ex.response.get("Error", {}).get("Code") == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
@ -66,7 +66,7 @@ class AwsS3Storage(BaseStorage):
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].iter_chunks()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
if ex.response.get("Error", {}).get("Code") == "NoSuchKey":
|
||||
raise FileNotFoundError("file not found")
|
||||
elif "reached max retries" in str(ex):
|
||||
raise ValueError("please do not request the same file too frequently")
|
||||
|
||||
@ -27,24 +27,38 @@ class AzureBlobStorage(BaseStorage):
|
||||
self.credential = None
|
||||
|
||||
def save(self, filename, data):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
|
||||
client = self._sync_client()
|
||||
blob_container = client.get_container_client(container=self.bucket_name)
|
||||
blob_container.upload_blob(filename, data)
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("Azure bucket name is not configured.")
|
||||
|
||||
client = self._sync_client()
|
||||
blob = client.get_container_client(container=self.bucket_name)
|
||||
blob = blob.get_blob_client(blob=filename)
|
||||
data: bytes = blob.download_blob().readall()
|
||||
data = blob.download_blob().readall()
|
||||
if not isinstance(data, bytes):
|
||||
raise TypeError(f"Expected bytes from blob.readall(), got {type(data).__name__}")
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("Azure bucket name is not configured.")
|
||||
|
||||
client = self._sync_client()
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
blob_data = blob.download_blob()
|
||||
yield from blob_data.chunks()
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
|
||||
client = self._sync_client()
|
||||
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
@ -53,12 +67,18 @@ class AzureBlobStorage(BaseStorage):
|
||||
blob_data.readinto(my_blob)
|
||||
|
||||
def exists(self, filename):
|
||||
if not self.bucket_name:
|
||||
return False
|
||||
|
||||
client = self._sync_client()
|
||||
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
return blob.exists()
|
||||
|
||||
def delete(self, filename):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
|
||||
client = self._sync_client()
|
||||
|
||||
blob_container = client.get_container_client(container=self.bucket_name)
|
||||
|
||||
@ -430,7 +430,7 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
|
||||
rows = self._execute_sql(sql, fetch=True)
|
||||
|
||||
exists = len(rows) > 0
|
||||
exists = len(rows) > 0 if rows else False
|
||||
logger.debug("File %s exists check: %s", filename, exists)
|
||||
return exists
|
||||
except Exception as e:
|
||||
@ -509,16 +509,17 @@ class ClickZettaVolumeStorage(BaseStorage):
|
||||
rows = self._execute_sql(sql, fetch=True)
|
||||
|
||||
result = []
|
||||
for row in rows:
|
||||
file_path = row[0] # relative_path column
|
||||
if rows:
|
||||
for row in rows:
|
||||
file_path = row[0] # relative_path column
|
||||
|
||||
# For User Volume, remove dify prefix from results
|
||||
dify_prefix_with_slash = f"{self._config.dify_prefix}/"
|
||||
if volume_prefix == "USER VOLUME" and file_path.startswith(dify_prefix_with_slash):
|
||||
file_path = file_path[len(dify_prefix_with_slash) :] # Remove prefix
|
||||
# For User Volume, remove dify prefix from results
|
||||
dify_prefix_with_slash = f"{self._config.dify_prefix}/"
|
||||
if volume_prefix == "USER VOLUME" and file_path.startswith(dify_prefix_with_slash):
|
||||
file_path = file_path[len(dify_prefix_with_slash) :] # Remove prefix
|
||||
|
||||
if files and not file_path.endswith("/") or directories and file_path.endswith("/"):
|
||||
result.append(file_path)
|
||||
if files and not file_path.endswith("/") or directories and file_path.endswith("/"):
|
||||
result.append(file_path)
|
||||
|
||||
logger.debug("Scanned %d items in path %s", len(result), path)
|
||||
return result
|
||||
|
||||
@ -439,6 +439,11 @@ class VolumePermissionManager:
|
||||
self._permission_cache.clear()
|
||||
logger.debug("Permission cache cleared")
|
||||
|
||||
@property
|
||||
def volume_type(self) -> str | None:
|
||||
"""Get the volume type."""
|
||||
return self._volume_type
|
||||
|
||||
def get_permission_summary(self, dataset_id: str | None = None) -> dict[str, bool]:
|
||||
"""Get permission summary
|
||||
|
||||
@ -632,13 +637,13 @@ def check_volume_permission(permission_manager: VolumePermissionManager, operati
|
||||
VolumePermissionError: If no permission
|
||||
"""
|
||||
if not permission_manager.validate_operation(operation, dataset_id):
|
||||
error_message = f"Permission denied for operation '{operation}' on {permission_manager._volume_type} volume"
|
||||
error_message = f"Permission denied for operation '{operation}' on {permission_manager.volume_type} volume"
|
||||
if dataset_id:
|
||||
error_message += f" (dataset: {dataset_id})"
|
||||
|
||||
raise VolumePermissionError(
|
||||
error_message,
|
||||
operation=operation,
|
||||
volume_type=permission_manager._volume_type or "unknown",
|
||||
volume_type=permission_manager.volume_type or "unknown",
|
||||
dataset_id=dataset_id,
|
||||
)
|
||||
|
||||
@ -35,12 +35,16 @@ class GoogleCloudStorage(BaseStorage):
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
if blob is None:
|
||||
raise FileNotFoundError("File not found")
|
||||
data: bytes = blob.download_as_bytes()
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
if blob is None:
|
||||
raise FileNotFoundError("File not found")
|
||||
with blob.open(mode="rb") as blob_stream:
|
||||
while chunk := blob_stream.read(4096):
|
||||
yield chunk
|
||||
@ -48,6 +52,8 @@ class GoogleCloudStorage(BaseStorage):
|
||||
def download(self, filename, target_filepath):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
if blob is None:
|
||||
raise FileNotFoundError("File not found")
|
||||
blob.download_to_filename(target_filepath)
|
||||
|
||||
def exists(self, filename):
|
||||
|
||||
@ -45,7 +45,7 @@ class HuaweiObsStorage(BaseStorage):
|
||||
|
||||
def _get_meta(self, filename):
|
||||
res = self.client.getObjectMetadata(bucketName=self.bucket_name, objectKey=filename)
|
||||
if res.status < 300:
|
||||
if res and res.status and res.status < 300:
|
||||
return res
|
||||
else:
|
||||
return None
|
||||
|
||||
@ -3,9 +3,9 @@ import os
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
|
||||
import opendal
|
||||
from dotenv import dotenv_values
|
||||
from opendal import Operator
|
||||
from opendal.layers import RetryLayer
|
||||
|
||||
from extensions.storage.base_storage import BaseStorage
|
||||
|
||||
@ -35,7 +35,7 @@ class OpenDALStorage(BaseStorage):
|
||||
root = kwargs.get("root", "storage")
|
||||
Path(root).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
retry_layer = RetryLayer(max_times=3, factor=2.0, jitter=True)
|
||||
retry_layer = opendal.layers.RetryLayer(max_times=3, factor=2.0, jitter=True)
|
||||
self.op = Operator(scheme=scheme, **kwargs).layer(retry_layer)
|
||||
logger.debug("opendal operator created with scheme %s", scheme)
|
||||
logger.debug("added retry layer to opendal operator")
|
||||
|
||||
@ -29,7 +29,7 @@ class OracleOCIStorage(BaseStorage):
|
||||
try:
|
||||
data: bytes = self.client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
if ex.response.get("Error", {}).get("Code") == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
@ -40,7 +40,7 @@ class OracleOCIStorage(BaseStorage):
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].iter_chunks()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
if ex.response.get("Error", {}).get("Code") == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
|
||||
@ -46,13 +46,13 @@ class SupabaseStorage(BaseStorage):
|
||||
Path(target_filepath).write_bytes(result)
|
||||
|
||||
def exists(self, filename):
|
||||
result = self.client.storage.from_(self.bucket_name).list(filename)
|
||||
if result.count() > 0:
|
||||
result = self.client.storage.from_(self.bucket_name).list(path=filename)
|
||||
if len(result) > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete(self, filename):
|
||||
self.client.storage.from_(self.bucket_name).remove(filename)
|
||||
self.client.storage.from_(self.bucket_name).remove([filename])
|
||||
|
||||
def bucket_exists(self):
|
||||
buckets = self.client.storage.list_buckets()
|
||||
|
||||
@ -11,6 +11,14 @@ class VolcengineTosStorage(BaseStorage):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
if not dify_config.VOLCENGINE_TOS_ACCESS_KEY:
|
||||
raise ValueError("VOLCENGINE_TOS_ACCESS_KEY is not set")
|
||||
if not dify_config.VOLCENGINE_TOS_SECRET_KEY:
|
||||
raise ValueError("VOLCENGINE_TOS_SECRET_KEY is not set")
|
||||
if not dify_config.VOLCENGINE_TOS_ENDPOINT:
|
||||
raise ValueError("VOLCENGINE_TOS_ENDPOINT is not set")
|
||||
if not dify_config.VOLCENGINE_TOS_REGION:
|
||||
raise ValueError("VOLCENGINE_TOS_REGION is not set")
|
||||
self.bucket_name = dify_config.VOLCENGINE_TOS_BUCKET_NAME
|
||||
self.client = tos.TosClientV2(
|
||||
ak=dify_config.VOLCENGINE_TOS_ACCESS_KEY,
|
||||
@ -20,27 +28,39 @@ class VolcengineTosStorage(BaseStorage):
|
||||
)
|
||||
|
||||
def save(self, filename, data):
|
||||
if not self.bucket_name:
|
||||
raise ValueError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
self.client.put_object(bucket=self.bucket_name, key=filename, content=data)
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
data = self.client.get_object(bucket=self.bucket_name, key=filename).read()
|
||||
if not isinstance(data, bytes):
|
||||
raise TypeError(f"Expected bytes, got {type(data).__name__}")
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
if not self.bucket_name:
|
||||
raise FileNotFoundError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
response = self.client.get_object(bucket=self.bucket_name, key=filename)
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
if not self.bucket_name:
|
||||
raise ValueError("VOLCENGINE_TOS_BUCKET_NAME is not set")
|
||||
self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)
|
||||
|
||||
def exists(self, filename):
|
||||
if not self.bucket_name:
|
||||
return False
|
||||
res = self.client.head_object(bucket=self.bucket_name, key=filename)
|
||||
if res.status_code != 200:
|
||||
return False
|
||||
return True
|
||||
|
||||
def delete(self, filename):
|
||||
if not self.bucket_name:
|
||||
return
|
||||
self.client.delete_object(bucket=self.bucket_name, key=filename)
|
||||
|
||||
@ -37,7 +37,6 @@ dependencies = [
|
||||
"mailchimp-transactional~=1.0.50",
|
||||
"markdown~=3.5.1",
|
||||
"numpy~=1.26.4",
|
||||
"openai~=1.61.0",
|
||||
"openpyxl~=3.1.5",
|
||||
"opik~=1.7.25",
|
||||
"opentelemetry-api==1.27.0",
|
||||
|
||||
@ -5,7 +5,6 @@
|
||||
".venv",
|
||||
"migrations/",
|
||||
"core/rag",
|
||||
"extensions",
|
||||
"core/app/app_config/easy_ui_based_app/dataset"
|
||||
],
|
||||
"typeCheckingMode": "strict",
|
||||
|
||||
@ -7,7 +7,7 @@ env =
|
||||
CHATGLM_API_BASE = http://a.abc.com:11451
|
||||
CODE_EXECUTION_API_KEY = dify-sandbox
|
||||
CODE_EXECUTION_ENDPOINT = http://127.0.0.1:8194
|
||||
CODE_MAX_STRING_LENGTH = 80000
|
||||
CODE_MAX_STRING_LENGTH = 400000
|
||||
PLUGIN_DAEMON_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
|
||||
PLUGIN_DAEMON_URL=http://127.0.0.1:5002
|
||||
PLUGIN_MAX_PACKAGE_SIZE=15728640
|
||||
|
||||
@ -2,8 +2,6 @@ import uuid
|
||||
from collections.abc import Generator, Mapping
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from openai._exceptions import RateLimitError
|
||||
|
||||
from configs import dify_config
|
||||
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
|
||||
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
|
||||
@ -124,8 +122,6 @@ class AppGenerateService:
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Invalid app mode {app_model.mode}")
|
||||
except RateLimitError as e:
|
||||
raise InvokeRateLimitError(str(e))
|
||||
except Exception:
|
||||
rate_limit.exit(request_id)
|
||||
raise
|
||||
|
||||
@ -79,7 +79,6 @@ class WorkflowConverter:
|
||||
new_app.updated_by = account.id
|
||||
db.session.add(new_app)
|
||||
db.session.flush()
|
||||
db.session.commit()
|
||||
|
||||
workflow.app_id = new_app.id
|
||||
db.session.commit()
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import time
|
||||
import uuid
|
||||
from os import getenv
|
||||
|
||||
import pytest
|
||||
|
||||
from configs import dify_config
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.workflow.entities import GraphInitParams, GraphRuntimeState, VariablePool
|
||||
from core.workflow.enums import WorkflowNodeExecutionStatus
|
||||
@ -15,7 +15,7 @@ from core.workflow.system_variable import SystemVariable
|
||||
from models.enums import UserFrom
|
||||
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock
|
||||
|
||||
CODE_MAX_STRING_LENGTH = int(getenv("CODE_MAX_STRING_LENGTH", "10000"))
|
||||
CODE_MAX_STRING_LENGTH = dify_config.CODE_MAX_STRING_LENGTH
|
||||
|
||||
|
||||
def init_code_node(code_config: dict):
|
||||
|
||||
@ -18,6 +18,7 @@ from flask.testing import FlaskClient
|
||||
from sqlalchemy import Engine, text
|
||||
from sqlalchemy.orm import Session
|
||||
from testcontainers.core.container import DockerContainer
|
||||
from testcontainers.core.network import Network
|
||||
from testcontainers.core.waiting_utils import wait_for_logs
|
||||
from testcontainers.postgres import PostgresContainer
|
||||
from testcontainers.redis import RedisContainer
|
||||
@ -41,6 +42,7 @@ class DifyTestContainers:
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize container management with default configurations."""
|
||||
self.network: Network | None = None
|
||||
self.postgres: PostgresContainer | None = None
|
||||
self.redis: RedisContainer | None = None
|
||||
self.dify_sandbox: DockerContainer | None = None
|
||||
@ -62,12 +64,18 @@ class DifyTestContainers:
|
||||
|
||||
logger.info("Starting test containers for Dify integration tests...")
|
||||
|
||||
# Create Docker network for container communication
|
||||
logger.info("Creating Docker network for container communication...")
|
||||
self.network = Network()
|
||||
self.network.create()
|
||||
logger.info("Docker network created successfully with name: %s", self.network.name)
|
||||
|
||||
# Start PostgreSQL container for main application database
|
||||
# PostgreSQL is used for storing user data, workflows, and application state
|
||||
logger.info("Initializing PostgreSQL container...")
|
||||
self.postgres = PostgresContainer(
|
||||
image="postgres:14-alpine",
|
||||
)
|
||||
).with_network(self.network)
|
||||
self.postgres.start()
|
||||
db_host = self.postgres.get_container_host_ip()
|
||||
db_port = self.postgres.get_exposed_port(5432)
|
||||
@ -137,7 +145,7 @@ class DifyTestContainers:
|
||||
# Start Redis container for caching and session management
|
||||
# Redis is used for storing session data, cache entries, and temporary data
|
||||
logger.info("Initializing Redis container...")
|
||||
self.redis = RedisContainer(image="redis:6-alpine", port=6379)
|
||||
self.redis = RedisContainer(image="redis:6-alpine", port=6379).with_network(self.network)
|
||||
self.redis.start()
|
||||
redis_host = self.redis.get_container_host_ip()
|
||||
redis_port = self.redis.get_exposed_port(6379)
|
||||
@ -153,7 +161,7 @@ class DifyTestContainers:
|
||||
# Start Dify Sandbox container for code execution environment
|
||||
# Dify Sandbox provides a secure environment for executing user code
|
||||
logger.info("Initializing Dify Sandbox container...")
|
||||
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:latest")
|
||||
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:latest").with_network(self.network)
|
||||
self.dify_sandbox.with_exposed_ports(8194)
|
||||
self.dify_sandbox.env = {
|
||||
"API_KEY": "test_api_key",
|
||||
@ -173,22 +181,28 @@ class DifyTestContainers:
|
||||
# Start Dify Plugin Daemon container for plugin management
|
||||
# Dify Plugin Daemon provides plugin lifecycle management and execution
|
||||
logger.info("Initializing Dify Plugin Daemon container...")
|
||||
self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.3.0-local")
|
||||
self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.3.0-local").with_network(
|
||||
self.network
|
||||
)
|
||||
self.dify_plugin_daemon.with_exposed_ports(5002)
|
||||
# Get container internal network addresses
|
||||
postgres_container_name = self.postgres.get_wrapped_container().name
|
||||
redis_container_name = self.redis.get_wrapped_container().name
|
||||
|
||||
self.dify_plugin_daemon.env = {
|
||||
"DB_HOST": db_host,
|
||||
"DB_PORT": str(db_port),
|
||||
"DB_HOST": postgres_container_name, # Use container name for internal network communication
|
||||
"DB_PORT": "5432", # Use internal port
|
||||
"DB_USERNAME": self.postgres.username,
|
||||
"DB_PASSWORD": self.postgres.password,
|
||||
"DB_DATABASE": "dify_plugin",
|
||||
"REDIS_HOST": redis_host,
|
||||
"REDIS_PORT": str(redis_port),
|
||||
"REDIS_HOST": redis_container_name, # Use container name for internal network communication
|
||||
"REDIS_PORT": "6379", # Use internal port
|
||||
"REDIS_PASSWORD": "",
|
||||
"SERVER_PORT": "5002",
|
||||
"SERVER_KEY": "test_plugin_daemon_key",
|
||||
"MAX_PLUGIN_PACKAGE_SIZE": "52428800",
|
||||
"PPROF_ENABLED": "false",
|
||||
"DIFY_INNER_API_URL": f"http://{db_host}:5001",
|
||||
"DIFY_INNER_API_URL": f"http://{postgres_container_name}:5001",
|
||||
"DIFY_INNER_API_KEY": "test_inner_api_key",
|
||||
"PLUGIN_REMOTE_INSTALLING_HOST": "0.0.0.0",
|
||||
"PLUGIN_REMOTE_INSTALLING_PORT": "5003",
|
||||
@ -253,6 +267,15 @@ class DifyTestContainers:
|
||||
# Log error but don't fail the test cleanup
|
||||
logger.warning("Failed to stop container %s: %s", container, e)
|
||||
|
||||
# Stop and remove the network
|
||||
if self.network:
|
||||
try:
|
||||
logger.info("Removing Docker network...")
|
||||
self.network.remove()
|
||||
logger.info("Successfully removed Docker network")
|
||||
except Exception as e:
|
||||
logger.warning("Failed to remove Docker network: %s", e)
|
||||
|
||||
self._containers_started = False
|
||||
logger.info("All test containers stopped and cleaned up successfully")
|
||||
|
||||
|
||||
@ -3,7 +3,6 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from faker import Faker
|
||||
from openai._exceptions import RateLimitError
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from models.model import EndUser
|
||||
@ -484,36 +483,6 @@ class TestAppGenerateService:
|
||||
# Verify error message
|
||||
assert "Rate limit exceeded" in str(exc_info.value)
|
||||
|
||||
def test_generate_with_rate_limit_error_from_openai(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test generation when OpenAI rate limit error occurs.
|
||||
"""
|
||||
fake = Faker()
|
||||
app, account = self._create_test_app_and_account(
|
||||
db_session_with_containers, mock_external_service_dependencies, mode="completion"
|
||||
)
|
||||
|
||||
# Setup completion generator to raise RateLimitError
|
||||
mock_response = MagicMock()
|
||||
mock_response.request = MagicMock()
|
||||
mock_external_service_dependencies["completion_generator"].return_value.generate.side_effect = RateLimitError(
|
||||
"Rate limit exceeded", response=mock_response, body=None
|
||||
)
|
||||
|
||||
# Setup test arguments
|
||||
args = {"inputs": {"query": fake.text(max_nb_chars=50)}, "response_mode": "streaming"}
|
||||
|
||||
# Execute the method under test and expect rate limit error
|
||||
with pytest.raises(InvokeRateLimitError) as exc_info:
|
||||
AppGenerateService.generate(
|
||||
app_model=app, user=account, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=True
|
||||
)
|
||||
|
||||
# Verify error message
|
||||
assert "Rate limit exceeded" in str(exc_info.value)
|
||||
|
||||
def test_generate_with_invalid_app_mode(self, db_session_with_containers, mock_external_service_dependencies):
|
||||
"""
|
||||
Test generation with invalid app mode.
|
||||
|
||||
@ -784,133 +784,6 @@ class TestCleanDatasetTask:
|
||||
print(f"Total cleanup time: {cleanup_duration:.3f} seconds")
|
||||
print(f"Average time per document: {cleanup_duration / len(documents):.3f} seconds")
|
||||
|
||||
def test_clean_dataset_task_concurrent_cleanup_scenarios(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test dataset cleanup with concurrent cleanup scenarios and race conditions.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Handle multiple cleanup operations on the same dataset
|
||||
2. Prevent data corruption during concurrent access
|
||||
3. Maintain data consistency across multiple cleanup attempts
|
||||
4. Handle race conditions gracefully
|
||||
5. Ensure idempotent cleanup operations
|
||||
"""
|
||||
# Create test data
|
||||
account, tenant = self._create_test_account_and_tenant(db_session_with_containers)
|
||||
dataset = self._create_test_dataset(db_session_with_containers, account, tenant)
|
||||
document = self._create_test_document(db_session_with_containers, account, tenant, dataset)
|
||||
segment = self._create_test_segment(db_session_with_containers, account, tenant, dataset, document)
|
||||
upload_file = self._create_test_upload_file(db_session_with_containers, account, tenant)
|
||||
|
||||
# Update document with file reference
|
||||
import json
|
||||
|
||||
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
|
||||
from extensions.ext_database import db
|
||||
|
||||
db.session.commit()
|
||||
|
||||
# Save IDs for verification
|
||||
dataset_id = dataset.id
|
||||
tenant_id = tenant.id
|
||||
upload_file_id = upload_file.id
|
||||
|
||||
# Mock storage to simulate slow operations
|
||||
mock_storage = mock_external_service_dependencies["storage"]
|
||||
original_delete = mock_storage.delete
|
||||
|
||||
def slow_delete(key):
|
||||
import time
|
||||
|
||||
time.sleep(0.1) # Simulate slow storage operation
|
||||
return original_delete(key)
|
||||
|
||||
mock_storage.delete.side_effect = slow_delete
|
||||
|
||||
# Execute multiple cleanup operations concurrently
|
||||
import threading
|
||||
|
||||
cleanup_results = []
|
||||
cleanup_errors = []
|
||||
|
||||
def run_cleanup():
|
||||
try:
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=str(uuid.uuid4()),
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
cleanup_results.append("success")
|
||||
except Exception as e:
|
||||
cleanup_errors.append(str(e))
|
||||
|
||||
# Start multiple cleanup threads
|
||||
threads = []
|
||||
for i in range(3):
|
||||
thread = threading.Thread(target=run_cleanup)
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Verify results
|
||||
# Check that all documents were deleted (only once)
|
||||
remaining_documents = db.session.query(Document).filter_by(dataset_id=dataset_id).all()
|
||||
assert len(remaining_documents) == 0
|
||||
|
||||
# Check that all segments were deleted (only once)
|
||||
remaining_segments = db.session.query(DocumentSegment).filter_by(dataset_id=dataset_id).all()
|
||||
assert len(remaining_segments) == 0
|
||||
|
||||
# Check that upload file was deleted (only once)
|
||||
# Note: In concurrent scenarios, the first thread deletes documents and segments,
|
||||
# subsequent threads may not find the related data to clean up upload files
|
||||
# This demonstrates the idempotent nature of the cleanup process
|
||||
remaining_files = db.session.query(UploadFile).filter_by(id=upload_file_id).all()
|
||||
# The upload file should be deleted by the first successful cleanup operation
|
||||
# However, in concurrent scenarios, this may not always happen due to race conditions
|
||||
# This test demonstrates the idempotent nature of the cleanup process
|
||||
if len(remaining_files) > 0:
|
||||
print(f"Warning: Upload file {upload_file_id} was not deleted in concurrent scenario")
|
||||
print("This is expected behavior demonstrating the idempotent nature of cleanup")
|
||||
# We don't assert here as the behavior depends on timing and race conditions
|
||||
|
||||
# Verify that storage.delete was called (may be called multiple times in concurrent scenarios)
|
||||
# In concurrent scenarios, storage operations may be called multiple times due to race conditions
|
||||
assert mock_storage.delete.call_count > 0
|
||||
|
||||
# Verify that index processor was called (may be called multiple times in concurrent scenarios)
|
||||
mock_index_processor = mock_external_service_dependencies["index_processor"]
|
||||
assert mock_index_processor.clean.call_count > 0
|
||||
|
||||
# Check cleanup results
|
||||
assert len(cleanup_results) == 3, "All cleanup operations should complete"
|
||||
assert len(cleanup_errors) == 0, "No cleanup errors should occur"
|
||||
|
||||
# Verify idempotency by running cleanup again on the same dataset
|
||||
# This should not perform any additional operations since data is already cleaned
|
||||
clean_dataset_task(
|
||||
dataset_id=dataset_id,
|
||||
tenant_id=tenant_id,
|
||||
indexing_technique="high_quality",
|
||||
index_struct='{"type": "paragraph"}',
|
||||
collection_binding_id=str(uuid.uuid4()),
|
||||
doc_form="paragraph_index",
|
||||
)
|
||||
|
||||
# Verify that no additional storage operations were performed
|
||||
# Note: In concurrent scenarios, the exact count may vary due to race conditions
|
||||
print(f"Final storage delete calls: {mock_storage.delete.call_count}")
|
||||
print(f"Final index processor calls: {mock_index_processor.clean.call_count}")
|
||||
print("Note: Multiple calls in concurrent scenarios are expected due to race conditions")
|
||||
|
||||
def test_clean_dataset_task_storage_exception_handling(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
|
||||
@ -148,61 +148,6 @@ class TestEnableSegmentsToIndexTask:
|
||||
db.session.commit()
|
||||
return segments
|
||||
|
||||
def test_enable_segments_to_index_success(self, db_session_with_containers, mock_external_service_dependencies):
|
||||
"""
|
||||
Test successful segments indexing with paragraph index type.
|
||||
|
||||
This test verifies:
|
||||
- Proper dataset and document retrieval from database
|
||||
- Correct segment processing and document creation
|
||||
- Index processor integration
|
||||
- Database state updates
|
||||
- Redis cache key deletion
|
||||
"""
|
||||
# Arrange: Create test data
|
||||
dataset, document = self._create_test_dataset_and_document(
|
||||
db_session_with_containers, mock_external_service_dependencies
|
||||
)
|
||||
segments = self._create_test_segments(db_session_with_containers, document, dataset)
|
||||
|
||||
# Set up Redis cache keys to simulate indexing in progress
|
||||
segment_ids = [segment.id for segment in segments]
|
||||
for segment in segments:
|
||||
indexing_cache_key = f"segment_{segment.id}_indexing"
|
||||
redis_client.set(indexing_cache_key, "processing", ex=300) # 5 minutes expiry
|
||||
|
||||
# Verify cache keys exist
|
||||
for segment in segments:
|
||||
indexing_cache_key = f"segment_{segment.id}_indexing"
|
||||
assert redis_client.exists(indexing_cache_key) == 1
|
||||
|
||||
# Act: Execute the task
|
||||
enable_segments_to_index_task(segment_ids, dataset.id, document.id)
|
||||
|
||||
# Assert: Verify the expected outcomes
|
||||
# Verify index processor was called correctly
|
||||
mock_external_service_dependencies["index_processor_factory"].assert_called_once_with(IndexType.PARAGRAPH_INDEX)
|
||||
mock_external_service_dependencies["index_processor"].load.assert_called_once()
|
||||
|
||||
# Verify the load method was called with correct parameters
|
||||
call_args = mock_external_service_dependencies["index_processor"].load.call_args
|
||||
assert call_args is not None
|
||||
documents = call_args[0][1] # Second argument should be documents list
|
||||
assert len(documents) == 3
|
||||
|
||||
# Verify document structure
|
||||
for i, doc in enumerate(documents):
|
||||
assert doc.page_content == segments[i].content
|
||||
assert doc.metadata["doc_id"] == segments[i].index_node_id
|
||||
assert doc.metadata["doc_hash"] == segments[i].index_node_hash
|
||||
assert doc.metadata["document_id"] == document.id
|
||||
assert doc.metadata["dataset_id"] == dataset.id
|
||||
|
||||
# Verify Redis cache keys were deleted
|
||||
for segment in segments:
|
||||
indexing_cache_key = f"segment_{segment.id}_indexing"
|
||||
assert redis_client.exists(indexing_cache_key) == 0
|
||||
|
||||
def test_enable_segments_to_index_with_different_index_type(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
|
||||
@ -0,0 +1,242 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from faker import Faker
|
||||
|
||||
from extensions.ext_database import db
|
||||
from libs.email_i18n import EmailType
|
||||
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
|
||||
from tasks.mail_account_deletion_task import send_account_deletion_verification_code, send_deletion_success_task
|
||||
|
||||
|
||||
class TestMailAccountDeletionTask:
|
||||
"""Integration tests for mail account deletion tasks using testcontainers."""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_external_service_dependencies(self):
|
||||
"""Mock setup for external service dependencies."""
|
||||
with (
|
||||
patch("tasks.mail_account_deletion_task.mail") as mock_mail,
|
||||
patch("tasks.mail_account_deletion_task.get_email_i18n_service") as mock_get_email_service,
|
||||
):
|
||||
# Setup mock mail service
|
||||
mock_mail.is_inited.return_value = True
|
||||
|
||||
# Setup mock email service
|
||||
mock_email_service = MagicMock()
|
||||
mock_get_email_service.return_value = mock_email_service
|
||||
|
||||
yield {
|
||||
"mail": mock_mail,
|
||||
"get_email_service": mock_get_email_service,
|
||||
"email_service": mock_email_service,
|
||||
}
|
||||
|
||||
def _create_test_account(self, db_session_with_containers):
|
||||
"""
|
||||
Helper method to create a test account for testing.
|
||||
|
||||
Args:
|
||||
db_session_with_containers: Database session from testcontainers infrastructure
|
||||
|
||||
Returns:
|
||||
Account: Created account instance
|
||||
"""
|
||||
fake = Faker()
|
||||
|
||||
# Create account
|
||||
account = Account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
status="active",
|
||||
)
|
||||
db.session.add(account)
|
||||
db.session.commit()
|
||||
|
||||
# Create tenant
|
||||
tenant = Tenant(
|
||||
name=fake.company(),
|
||||
status="normal",
|
||||
)
|
||||
db.session.add(tenant)
|
||||
db.session.commit()
|
||||
|
||||
# Create tenant-account join
|
||||
join = TenantAccountJoin(
|
||||
tenant_id=tenant.id,
|
||||
account_id=account.id,
|
||||
role=TenantAccountRole.OWNER.value,
|
||||
current=True,
|
||||
)
|
||||
db.session.add(join)
|
||||
db.session.commit()
|
||||
|
||||
return account
|
||||
|
||||
def test_send_deletion_success_task_success(self, db_session_with_containers, mock_external_service_dependencies):
|
||||
"""
|
||||
Test successful account deletion success email sending.
|
||||
|
||||
This test verifies:
|
||||
- Proper email service initialization check
|
||||
- Correct email service method calls
|
||||
- Template context is properly formatted
|
||||
- Email type is correctly specified
|
||||
"""
|
||||
# Arrange: Create test data
|
||||
account = self._create_test_account(db_session_with_containers)
|
||||
test_email = account.email
|
||||
test_language = "en-US"
|
||||
|
||||
# Act: Execute the task
|
||||
send_deletion_success_task(test_email, test_language)
|
||||
|
||||
# Assert: Verify the expected outcomes
|
||||
# Verify mail service was checked
|
||||
mock_external_service_dependencies["mail"].is_inited.assert_called_once()
|
||||
|
||||
# Verify email service was retrieved
|
||||
mock_external_service_dependencies["get_email_service"].assert_called_once()
|
||||
|
||||
# Verify email was sent with correct parameters
|
||||
mock_external_service_dependencies["email_service"].send_email.assert_called_once_with(
|
||||
email_type=EmailType.ACCOUNT_DELETION_SUCCESS,
|
||||
language_code=test_language,
|
||||
to=test_email,
|
||||
template_context={
|
||||
"to": test_email,
|
||||
"email": test_email,
|
||||
},
|
||||
)
|
||||
|
||||
def test_send_deletion_success_task_mail_not_initialized(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test account deletion success email when mail service is not initialized.
|
||||
|
||||
This test verifies:
|
||||
- Early return when mail service is not initialized
|
||||
- No email service calls are made
|
||||
- No exceptions are raised
|
||||
"""
|
||||
# Arrange: Setup mail service to return not initialized
|
||||
mock_external_service_dependencies["mail"].is_inited.return_value = False
|
||||
account = self._create_test_account(db_session_with_containers)
|
||||
test_email = account.email
|
||||
|
||||
# Act: Execute the task
|
||||
send_deletion_success_task(test_email)
|
||||
|
||||
# Assert: Verify no email service calls were made
|
||||
mock_external_service_dependencies["get_email_service"].assert_not_called()
|
||||
mock_external_service_dependencies["email_service"].send_email.assert_not_called()
|
||||
|
||||
def test_send_deletion_success_task_email_service_exception(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test account deletion success email when email service raises exception.
|
||||
|
||||
This test verifies:
|
||||
- Exception is properly caught and logged
|
||||
- Task completes without raising exception
|
||||
- Error logging is recorded
|
||||
"""
|
||||
# Arrange: Setup email service to raise exception
|
||||
mock_external_service_dependencies["email_service"].send_email.side_effect = Exception("Email service failed")
|
||||
account = self._create_test_account(db_session_with_containers)
|
||||
test_email = account.email
|
||||
|
||||
# Act: Execute the task (should not raise exception)
|
||||
send_deletion_success_task(test_email)
|
||||
|
||||
# Assert: Verify email service was called but exception was handled
|
||||
mock_external_service_dependencies["email_service"].send_email.assert_called_once()
|
||||
|
||||
def test_send_account_deletion_verification_code_success(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful account deletion verification code email sending.
|
||||
|
||||
This test verifies:
|
||||
- Proper email service initialization check
|
||||
- Correct email service method calls
|
||||
- Template context includes verification code
|
||||
- Email type is correctly specified
|
||||
"""
|
||||
# Arrange: Create test data
|
||||
account = self._create_test_account(db_session_with_containers)
|
||||
test_email = account.email
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Act: Execute the task
|
||||
send_account_deletion_verification_code(test_email, test_code, test_language)
|
||||
|
||||
# Assert: Verify the expected outcomes
|
||||
# Verify mail service was checked
|
||||
mock_external_service_dependencies["mail"].is_inited.assert_called_once()
|
||||
|
||||
# Verify email service was retrieved
|
||||
mock_external_service_dependencies["get_email_service"].assert_called_once()
|
||||
|
||||
# Verify email was sent with correct parameters
|
||||
mock_external_service_dependencies["email_service"].send_email.assert_called_once_with(
|
||||
email_type=EmailType.ACCOUNT_DELETION_VERIFICATION,
|
||||
language_code=test_language,
|
||||
to=test_email,
|
||||
template_context={
|
||||
"to": test_email,
|
||||
"code": test_code,
|
||||
},
|
||||
)
|
||||
|
||||
def test_send_account_deletion_verification_code_mail_not_initialized(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test account deletion verification code email when mail service is not initialized.
|
||||
|
||||
This test verifies:
|
||||
- Early return when mail service is not initialized
|
||||
- No email service calls are made
|
||||
- No exceptions are raised
|
||||
"""
|
||||
# Arrange: Setup mail service to return not initialized
|
||||
mock_external_service_dependencies["mail"].is_inited.return_value = False
|
||||
account = self._create_test_account(db_session_with_containers)
|
||||
test_email = account.email
|
||||
test_code = "123456"
|
||||
|
||||
# Act: Execute the task
|
||||
send_account_deletion_verification_code(test_email, test_code)
|
||||
|
||||
# Assert: Verify no email service calls were made
|
||||
mock_external_service_dependencies["get_email_service"].assert_not_called()
|
||||
mock_external_service_dependencies["email_service"].send_email.assert_not_called()
|
||||
|
||||
def test_send_account_deletion_verification_code_email_service_exception(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test account deletion verification code email when email service raises exception.
|
||||
|
||||
This test verifies:
|
||||
- Exception is properly caught and logged
|
||||
- Task completes without raising exception
|
||||
- Error logging is recorded
|
||||
"""
|
||||
# Arrange: Setup email service to raise exception
|
||||
mock_external_service_dependencies["email_service"].send_email.side_effect = Exception("Email service failed")
|
||||
account = self._create_test_account(db_session_with_containers)
|
||||
test_email = account.email
|
||||
test_code = "123456"
|
||||
|
||||
# Act: Execute the task (should not raise exception)
|
||||
send_account_deletion_verification_code(test_email, test_code)
|
||||
|
||||
# Assert: Verify email service was called but exception was handled
|
||||
mock_external_service_dependencies["email_service"].send_email.assert_called_once()
|
||||
@ -0,0 +1,598 @@
|
||||
"""
|
||||
TestContainers-based integration tests for send_email_code_login_mail_task.
|
||||
|
||||
This module provides comprehensive integration tests for the email code login mail task
|
||||
using TestContainers infrastructure. The tests ensure that the task properly sends
|
||||
email verification codes for login with internationalization support and handles
|
||||
various error scenarios in a real database environment.
|
||||
|
||||
All tests use the testcontainers infrastructure to ensure proper database isolation
|
||||
and realistic testing scenarios with actual PostgreSQL and Redis instances.
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from faker import Faker
|
||||
|
||||
from libs.email_i18n import EmailType
|
||||
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
|
||||
from tasks.mail_email_code_login import send_email_code_login_mail_task
|
||||
|
||||
|
||||
class TestSendEmailCodeLoginMailTask:
|
||||
"""
|
||||
Comprehensive integration tests for send_email_code_login_mail_task using testcontainers.
|
||||
|
||||
This test class covers all major functionality of the email code login mail task:
|
||||
- Successful email sending with different languages
|
||||
- Email service integration and template rendering
|
||||
- Error handling for various failure scenarios
|
||||
- Performance metrics and logging verification
|
||||
- Edge cases and boundary conditions
|
||||
|
||||
All tests use the testcontainers infrastructure to ensure proper database isolation
|
||||
and realistic testing environment with actual database interactions.
|
||||
"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_database(self, db_session_with_containers):
|
||||
"""Clean up database before each test to ensure isolation."""
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
# Clear all test data
|
||||
db_session_with_containers.query(TenantAccountJoin).delete()
|
||||
db_session_with_containers.query(Tenant).delete()
|
||||
db_session_with_containers.query(Account).delete()
|
||||
db_session_with_containers.commit()
|
||||
|
||||
# Clear Redis cache
|
||||
redis_client.flushdb()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_external_service_dependencies(self):
|
||||
"""Mock setup for external service dependencies."""
|
||||
with (
|
||||
patch("tasks.mail_email_code_login.mail") as mock_mail,
|
||||
patch("tasks.mail_email_code_login.get_email_i18n_service") as mock_email_service,
|
||||
):
|
||||
# Setup default mock returns
|
||||
mock_mail.is_inited.return_value = True
|
||||
|
||||
# Mock email service
|
||||
mock_email_service_instance = MagicMock()
|
||||
mock_email_service_instance.send_email.return_value = None
|
||||
mock_email_service.return_value = mock_email_service_instance
|
||||
|
||||
yield {
|
||||
"mail": mock_mail,
|
||||
"email_service": mock_email_service,
|
||||
"email_service_instance": mock_email_service_instance,
|
||||
}
|
||||
|
||||
def _create_test_account(self, db_session_with_containers, fake=None):
|
||||
"""
|
||||
Helper method to create a test account for testing.
|
||||
|
||||
Args:
|
||||
db_session_with_containers: Database session from testcontainers infrastructure
|
||||
fake: Faker instance for generating test data
|
||||
|
||||
Returns:
|
||||
Account: Created account instance
|
||||
"""
|
||||
if fake is None:
|
||||
fake = Faker()
|
||||
|
||||
# Create account
|
||||
account = Account(
|
||||
email=fake.email(),
|
||||
name=fake.name(),
|
||||
interface_language="en-US",
|
||||
status="active",
|
||||
)
|
||||
|
||||
db_session_with_containers.add(account)
|
||||
db_session_with_containers.commit()
|
||||
|
||||
return account
|
||||
|
||||
def _create_test_tenant_and_account(self, db_session_with_containers, fake=None):
|
||||
"""
|
||||
Helper method to create a test tenant and account for testing.
|
||||
|
||||
Args:
|
||||
db_session_with_containers: Database session from testcontainers infrastructure
|
||||
fake: Faker instance for generating test data
|
||||
|
||||
Returns:
|
||||
tuple: (Account, Tenant) created instances
|
||||
"""
|
||||
if fake is None:
|
||||
fake = Faker()
|
||||
|
||||
# Create account using the existing helper method
|
||||
account = self._create_test_account(db_session_with_containers, fake)
|
||||
|
||||
# Create tenant
|
||||
tenant = Tenant(
|
||||
name=fake.company(),
|
||||
plan="basic",
|
||||
status="active",
|
||||
)
|
||||
|
||||
db_session_with_containers.add(tenant)
|
||||
db_session_with_containers.commit()
|
||||
|
||||
# Create tenant-account relationship
|
||||
tenant_account_join = TenantAccountJoin(
|
||||
tenant_id=tenant.id,
|
||||
account_id=account.id,
|
||||
role=TenantAccountRole.OWNER,
|
||||
)
|
||||
|
||||
db_session_with_containers.add(tenant_account_join)
|
||||
db_session_with_containers.commit()
|
||||
|
||||
return account, tenant
|
||||
|
||||
def test_send_email_code_login_mail_task_success_english(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful email code login mail sending in English.
|
||||
|
||||
This test verifies that the task can successfully:
|
||||
1. Send email code login mail with English language
|
||||
2. Use proper email service integration
|
||||
3. Pass correct template context to email service
|
||||
4. Log performance metrics correctly
|
||||
5. Complete task execution without errors
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_email = fake.email()
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Act: Execute the task
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_mail = mock_external_service_dependencies["mail"]
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify mail service was checked for initialization
|
||||
mock_mail.is_inited.assert_called_once()
|
||||
|
||||
# Verify email service was called with correct parameters
|
||||
mock_email_service_instance.send_email.assert_called_once_with(
|
||||
email_type=EmailType.EMAIL_CODE_LOGIN,
|
||||
language_code=test_language,
|
||||
to=test_email,
|
||||
template_context={
|
||||
"to": test_email,
|
||||
"code": test_code,
|
||||
},
|
||||
)
|
||||
|
||||
def test_send_email_code_login_mail_task_success_chinese(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful email code login mail sending in Chinese.
|
||||
|
||||
This test verifies that the task can successfully:
|
||||
1. Send email code login mail with Chinese language
|
||||
2. Handle different language codes properly
|
||||
3. Use correct template context for Chinese emails
|
||||
4. Complete task execution without errors
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_email = fake.email()
|
||||
test_code = "789012"
|
||||
test_language = "zh-Hans"
|
||||
|
||||
# Act: Execute the task
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify email service was called with Chinese language
|
||||
mock_email_service_instance.send_email.assert_called_once_with(
|
||||
email_type=EmailType.EMAIL_CODE_LOGIN,
|
||||
language_code=test_language,
|
||||
to=test_email,
|
||||
template_context={
|
||||
"to": test_email,
|
||||
"code": test_code,
|
||||
},
|
||||
)
|
||||
|
||||
def test_send_email_code_login_mail_task_success_multiple_languages(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test successful email code login mail sending with multiple languages.
|
||||
|
||||
This test verifies that the task can successfully:
|
||||
1. Handle various language codes correctly
|
||||
2. Send emails with different language configurations
|
||||
3. Maintain proper template context for each language
|
||||
4. Complete multiple task executions without conflicts
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_languages = ["en-US", "zh-Hans", "zh-CN", "ja-JP", "ko-KR"]
|
||||
test_emails = [fake.email() for _ in test_languages]
|
||||
test_codes = [fake.numerify("######") for _ in test_languages]
|
||||
|
||||
# Act: Execute the task for each language
|
||||
for i, language in enumerate(test_languages):
|
||||
send_email_code_login_mail_task(
|
||||
language=language,
|
||||
to=test_emails[i],
|
||||
code=test_codes[i],
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify email service was called for each language
|
||||
assert mock_email_service_instance.send_email.call_count == len(test_languages)
|
||||
|
||||
# Verify each call had correct parameters
|
||||
for i, language in enumerate(test_languages):
|
||||
call_args = mock_email_service_instance.send_email.call_args_list[i]
|
||||
assert call_args[1]["email_type"] == EmailType.EMAIL_CODE_LOGIN
|
||||
assert call_args[1]["language_code"] == language
|
||||
assert call_args[1]["to"] == test_emails[i]
|
||||
assert call_args[1]["template_context"]["code"] == test_codes[i]
|
||||
|
||||
def test_send_email_code_login_mail_task_mail_not_initialized(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test email code login mail task when mail service is not initialized.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Check mail service initialization status
|
||||
2. Return early when mail is not initialized
|
||||
3. Not attempt to send email when service is unavailable
|
||||
4. Handle gracefully without errors
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_email = fake.email()
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Mock mail service as not initialized
|
||||
mock_mail = mock_external_service_dependencies["mail"]
|
||||
mock_mail.is_inited.return_value = False
|
||||
|
||||
# Act: Execute the task
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify mail service was checked for initialization
|
||||
mock_mail.is_inited.assert_called_once()
|
||||
|
||||
# Verify email service was not called
|
||||
mock_email_service_instance.send_email.assert_not_called()
|
||||
|
||||
def test_send_email_code_login_mail_task_email_service_exception(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test email code login mail task when email service raises an exception.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Handle email service exceptions gracefully
|
||||
2. Log appropriate error messages
|
||||
3. Continue execution without crashing
|
||||
4. Maintain proper error handling
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_email = fake.email()
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Mock email service to raise an exception
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
mock_email_service_instance.send_email.side_effect = Exception("Email service unavailable")
|
||||
|
||||
# Act: Execute the task - it should handle the exception gracefully
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_mail = mock_external_service_dependencies["mail"]
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify mail service was checked for initialization
|
||||
mock_mail.is_inited.assert_called_once()
|
||||
|
||||
# Verify email service was called (and failed)
|
||||
mock_email_service_instance.send_email.assert_called_once_with(
|
||||
email_type=EmailType.EMAIL_CODE_LOGIN,
|
||||
language_code=test_language,
|
||||
to=test_email,
|
||||
template_context={
|
||||
"to": test_email,
|
||||
"code": test_code,
|
||||
},
|
||||
)
|
||||
|
||||
def test_send_email_code_login_mail_task_invalid_parameters(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test email code login mail task with invalid parameters.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Handle empty or None email addresses
|
||||
2. Process empty or None verification codes
|
||||
3. Handle invalid language codes
|
||||
4. Maintain proper error handling for invalid inputs
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_language = "en-US"
|
||||
|
||||
# Test cases for invalid parameters
|
||||
invalid_test_cases = [
|
||||
{"email": "", "code": "123456", "description": "empty email"},
|
||||
{"email": None, "code": "123456", "description": "None email"},
|
||||
{"email": fake.email(), "code": "", "description": "empty code"},
|
||||
{"email": fake.email(), "code": None, "description": "None code"},
|
||||
{"email": "invalid-email", "code": "123456", "description": "invalid email format"},
|
||||
]
|
||||
|
||||
for test_case in invalid_test_cases:
|
||||
# Reset mocks for each test case
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
mock_email_service_instance.reset_mock()
|
||||
|
||||
# Act: Execute the task with invalid parameters
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_case["email"],
|
||||
code=test_case["code"],
|
||||
)
|
||||
|
||||
# Assert: Verify that email service was still called
|
||||
# The task should pass parameters to email service as-is
|
||||
# and let the email service handle validation
|
||||
mock_email_service_instance.send_email.assert_called_once()
|
||||
|
||||
def test_send_email_code_login_mail_task_edge_cases(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test email code login mail task with edge cases and boundary conditions.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Handle very long email addresses
|
||||
2. Process very long verification codes
|
||||
3. Handle special characters in parameters
|
||||
4. Process extreme language codes
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_language = "en-US"
|
||||
|
||||
# Edge case test data
|
||||
edge_cases = [
|
||||
{
|
||||
"email": "a" * 100 + "@example.com", # Very long email
|
||||
"code": "1" * 20, # Very long code
|
||||
"description": "very long email and code",
|
||||
},
|
||||
{
|
||||
"email": "test+tag@example.com", # Email with special characters
|
||||
"code": "123-456", # Code with special characters
|
||||
"description": "special characters",
|
||||
},
|
||||
{
|
||||
"email": "test@sub.domain.example.com", # Complex domain
|
||||
"code": "000000", # All zeros
|
||||
"description": "complex domain and all zeros code",
|
||||
},
|
||||
{
|
||||
"email": "test@example.co.uk", # International domain
|
||||
"code": "999999", # All nines
|
||||
"description": "international domain and all nines code",
|
||||
},
|
||||
]
|
||||
|
||||
for test_case in edge_cases:
|
||||
# Reset mocks for each test case
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
mock_email_service_instance.reset_mock()
|
||||
|
||||
# Act: Execute the task with edge case data
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_case["email"],
|
||||
code=test_case["code"],
|
||||
)
|
||||
|
||||
# Assert: Verify that email service was called with edge case data
|
||||
mock_email_service_instance.send_email.assert_called_once_with(
|
||||
email_type=EmailType.EMAIL_CODE_LOGIN,
|
||||
language_code=test_language,
|
||||
to=test_case["email"],
|
||||
template_context={
|
||||
"to": test_case["email"],
|
||||
"code": test_case["code"],
|
||||
},
|
||||
)
|
||||
|
||||
def test_send_email_code_login_mail_task_database_integration(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test email code login mail task with database integration.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Work with real database connections
|
||||
2. Handle database session management
|
||||
3. Maintain proper database state
|
||||
4. Complete without database-related errors
|
||||
"""
|
||||
# Arrange: Setup test data with database
|
||||
fake = Faker()
|
||||
account, tenant = self._create_test_tenant_and_account(db_session_with_containers, fake)
|
||||
|
||||
test_email = account.email
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Act: Execute the task
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify email service was called with database account email
|
||||
mock_email_service_instance.send_email.assert_called_once_with(
|
||||
email_type=EmailType.EMAIL_CODE_LOGIN,
|
||||
language_code=test_language,
|
||||
to=test_email,
|
||||
template_context={
|
||||
"to": test_email,
|
||||
"code": test_code,
|
||||
},
|
||||
)
|
||||
|
||||
# Verify database state is maintained
|
||||
db_session_with_containers.refresh(account)
|
||||
assert account.email == test_email
|
||||
assert account.status == "active"
|
||||
|
||||
def test_send_email_code_login_mail_task_redis_integration(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test email code login mail task with Redis integration.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Work with Redis cache connections
|
||||
2. Handle Redis operations without errors
|
||||
3. Maintain proper cache state
|
||||
4. Complete without Redis-related errors
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_email = fake.email()
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Setup Redis cache data
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
cache_key = f"email_code_login_test_{test_email}"
|
||||
redis_client.set(cache_key, "test_value", ex=300)
|
||||
|
||||
# Act: Execute the task
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify expected outcomes
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
|
||||
# Verify email service was called
|
||||
mock_email_service_instance.send_email.assert_called_once()
|
||||
|
||||
# Verify Redis cache is still accessible
|
||||
assert redis_client.exists(cache_key) == 1
|
||||
assert redis_client.get(cache_key) == b"test_value"
|
||||
|
||||
# Clean up Redis cache
|
||||
redis_client.delete(cache_key)
|
||||
|
||||
def test_send_email_code_login_mail_task_error_handling_comprehensive(
|
||||
self, db_session_with_containers, mock_external_service_dependencies
|
||||
):
|
||||
"""
|
||||
Test comprehensive error handling for email code login mail task.
|
||||
|
||||
This test verifies that the task can properly:
|
||||
1. Handle various types of exceptions
|
||||
2. Log appropriate error messages
|
||||
3. Continue execution despite errors
|
||||
4. Maintain proper error reporting
|
||||
"""
|
||||
# Arrange: Setup test data
|
||||
fake = Faker()
|
||||
test_email = fake.email()
|
||||
test_code = "123456"
|
||||
test_language = "en-US"
|
||||
|
||||
# Test different exception types
|
||||
exception_types = [
|
||||
("ValueError", ValueError("Invalid email format")),
|
||||
("RuntimeError", RuntimeError("Service unavailable")),
|
||||
("ConnectionError", ConnectionError("Network error")),
|
||||
("TimeoutError", TimeoutError("Request timeout")),
|
||||
("Exception", Exception("Generic error")),
|
||||
]
|
||||
|
||||
for error_name, exception in exception_types:
|
||||
# Reset mocks for each test case
|
||||
mock_email_service_instance = mock_external_service_dependencies["email_service_instance"]
|
||||
mock_email_service_instance.reset_mock()
|
||||
mock_email_service_instance.send_email.side_effect = exception
|
||||
|
||||
# Mock logging to capture error messages
|
||||
with patch("tasks.mail_email_code_login.logger") as mock_logger:
|
||||
# Act: Execute the task - it should handle the exception gracefully
|
||||
send_email_code_login_mail_task(
|
||||
language=test_language,
|
||||
to=test_email,
|
||||
code=test_code,
|
||||
)
|
||||
|
||||
# Assert: Verify error handling
|
||||
# Verify email service was called (and failed)
|
||||
mock_email_service_instance.send_email.assert_called_once()
|
||||
|
||||
# Verify error was logged
|
||||
error_calls = [
|
||||
call
|
||||
for call in mock_logger.exception.call_args_list
|
||||
if f"Send email code login mail to {test_email} failed" in str(call)
|
||||
]
|
||||
# Check if any exception call was made (the exact message format may vary)
|
||||
assert mock_logger.exception.call_count >= 1, f"Error should be logged for {error_name}"
|
||||
|
||||
# Reset side effect for next iteration
|
||||
mock_email_service_instance.send_email.side_effect = None
|
||||
@ -33,6 +33,7 @@ def test_dify_config(monkeypatch: pytest.MonkeyPatch):
|
||||
assert config.EDITION == "SELF_HOSTED"
|
||||
assert config.API_COMPRESSION_ENABLED is False
|
||||
assert config.SENTRY_TRACES_SAMPLE_RATE == 1.0
|
||||
assert config.TEMPLATE_TRANSFORM_MAX_LENGTH == 400_000
|
||||
|
||||
# annotated field with default value
|
||||
assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 600
|
||||
|
||||
@ -172,73 +172,31 @@ class TestSupabaseStorage:
|
||||
assert "test-bucket" in [call[0][0] for call in mock_client.storage.from_.call_args_list if call[0]]
|
||||
mock_client.storage.from_().download.assert_called_with("test.txt")
|
||||
|
||||
def test_exists_with_list_containing_items(self, storage_with_mock_client):
|
||||
"""Test exists returns True when list() returns items (using len() > 0)."""
|
||||
def test_exists_returns_true_when_file_found(self, storage_with_mock_client):
|
||||
"""Test exists returns True when list() returns items."""
|
||||
storage, mock_client = storage_with_mock_client
|
||||
|
||||
# Mock list return with special object that has count() method
|
||||
mock_list_result = Mock()
|
||||
mock_list_result.count.return_value = 1
|
||||
mock_client.storage.from_().list.return_value = mock_list_result
|
||||
mock_client.storage.from_().list.return_value = [{"name": "test.txt"}]
|
||||
|
||||
result = storage.exists("test.txt")
|
||||
|
||||
assert result is True
|
||||
# from_ gets called during init too, so just check it was called with the right bucket
|
||||
assert "test-bucket" in [call[0][0] for call in mock_client.storage.from_.call_args_list if call[0]]
|
||||
mock_client.storage.from_().list.assert_called_with("test.txt")
|
||||
mock_client.storage.from_().list.assert_called_with(path="test.txt")
|
||||
|
||||
def test_exists_with_count_method_greater_than_zero(self, storage_with_mock_client):
|
||||
"""Test exists returns True when list result has count() > 0."""
|
||||
def test_exists_returns_false_when_file_not_found(self, storage_with_mock_client):
|
||||
"""Test exists returns False when list() returns an empty list."""
|
||||
storage, mock_client = storage_with_mock_client
|
||||
|
||||
# Mock list return with count() method
|
||||
mock_list_result = Mock()
|
||||
mock_list_result.count.return_value = 1
|
||||
mock_client.storage.from_().list.return_value = mock_list_result
|
||||
|
||||
result = storage.exists("test.txt")
|
||||
|
||||
assert result is True
|
||||
# Verify the correct calls were made
|
||||
assert "test-bucket" in [call[0][0] for call in mock_client.storage.from_.call_args_list if call[0]]
|
||||
mock_client.storage.from_().list.assert_called_with("test.txt")
|
||||
mock_list_result.count.assert_called()
|
||||
|
||||
def test_exists_with_count_method_zero(self, storage_with_mock_client):
|
||||
"""Test exists returns False when list result has count() == 0."""
|
||||
storage, mock_client = storage_with_mock_client
|
||||
|
||||
# Mock list return with count() method returning 0
|
||||
mock_list_result = Mock()
|
||||
mock_list_result.count.return_value = 0
|
||||
mock_client.storage.from_().list.return_value = mock_list_result
|
||||
mock_client.storage.from_().list.return_value = []
|
||||
|
||||
result = storage.exists("test.txt")
|
||||
|
||||
assert result is False
|
||||
# Verify the correct calls were made
|
||||
assert "test-bucket" in [call[0][0] for call in mock_client.storage.from_.call_args_list if call[0]]
|
||||
mock_client.storage.from_().list.assert_called_with("test.txt")
|
||||
mock_list_result.count.assert_called()
|
||||
mock_client.storage.from_().list.assert_called_with(path="test.txt")
|
||||
|
||||
def test_exists_with_empty_list(self, storage_with_mock_client):
|
||||
"""Test exists returns False when list() returns empty list."""
|
||||
storage, mock_client = storage_with_mock_client
|
||||
|
||||
# Mock list return with special object that has count() method returning 0
|
||||
mock_list_result = Mock()
|
||||
mock_list_result.count.return_value = 0
|
||||
mock_client.storage.from_().list.return_value = mock_list_result
|
||||
|
||||
result = storage.exists("test.txt")
|
||||
|
||||
assert result is False
|
||||
# Verify the correct calls were made
|
||||
assert "test-bucket" in [call[0][0] for call in mock_client.storage.from_.call_args_list if call[0]]
|
||||
mock_client.storage.from_().list.assert_called_with("test.txt")
|
||||
|
||||
def test_delete_calls_remove_with_filename(self, storage_with_mock_client):
|
||||
def test_delete_calls_remove_with_filename_in_list(self, storage_with_mock_client):
|
||||
"""Test delete calls remove([...]) (some client versions require a list)."""
|
||||
storage, mock_client = storage_with_mock_client
|
||||
|
||||
@ -247,7 +205,7 @@ class TestSupabaseStorage:
|
||||
storage.delete(filename)
|
||||
|
||||
mock_client.storage.from_.assert_called_once_with("test-bucket")
|
||||
mock_client.storage.from_().remove.assert_called_once_with(filename)
|
||||
mock_client.storage.from_().remove.assert_called_once_with([filename])
|
||||
|
||||
def test_bucket_exists_returns_true_when_bucket_found(self):
|
||||
"""Test bucket_exists returns True when bucket is found in list."""
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from tos import TosClientV2 # type: ignore
|
||||
|
||||
@ -13,7 +15,13 @@ class TestVolcengineTos(BaseStorageTest):
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_method(self, setup_volcengine_tos_mock):
|
||||
"""Executed before each test method."""
|
||||
self.storage = VolcengineTosStorage()
|
||||
with patch("extensions.storage.volcengine_tos_storage.dify_config") as mock_config:
|
||||
mock_config.VOLCENGINE_TOS_ACCESS_KEY = "test_access_key"
|
||||
mock_config.VOLCENGINE_TOS_SECRET_KEY = "test_secret_key"
|
||||
mock_config.VOLCENGINE_TOS_ENDPOINT = "test_endpoint"
|
||||
mock_config.VOLCENGINE_TOS_REGION = "test_region"
|
||||
self.storage = VolcengineTosStorage()
|
||||
|
||||
self.storage.bucket_name = get_example_bucket()
|
||||
self.storage.client = TosClientV2(
|
||||
ak="dify",
|
||||
|
||||
2
api/uv.lock
generated
2
api/uv.lock
generated
@ -1328,7 +1328,6 @@ dependencies = [
|
||||
{ name = "mailchimp-transactional" },
|
||||
{ name = "markdown" },
|
||||
{ name = "numpy" },
|
||||
{ name = "openai" },
|
||||
{ name = "openpyxl" },
|
||||
{ name = "opentelemetry-api" },
|
||||
{ name = "opentelemetry-distro" },
|
||||
@ -1523,7 +1522,6 @@ requires-dist = [
|
||||
{ name = "mailchimp-transactional", specifier = "~=1.0.50" },
|
||||
{ name = "markdown", specifier = "~=3.5.1" },
|
||||
{ name = "numpy", specifier = "~=1.26.4" },
|
||||
{ name = "openai", specifier = "~=1.61.0" },
|
||||
{ name = "openpyxl", specifier = "~=3.1.5" },
|
||||
{ name = "opentelemetry-api", specifier = "==1.27.0" },
|
||||
{ name = "opentelemetry-distro", specifier = "==0.48b0" },
|
||||
|
||||
Reference in New Issue
Block a user