Merge branch 'main' into feat/pull-a-variable

This commit is contained in:
zhsama
2026-01-15 16:14:15 +08:00
141 changed files with 17063 additions and 4450 deletions

View File

@ -0,0 +1 @@
"""Unit tests for `controllers.console.datasets` controllers."""

View File

@ -0,0 +1,49 @@
from __future__ import annotations
"""
Unit tests for the external dataset controller payload schemas.
These tests focus on Pydantic validation rules so we can catch regressions
in request constraints (e.g. max length changes) without exercising the
full Flask/RESTX request stack.
"""
import pytest
from pydantic import ValidationError
from controllers.console.datasets.external import ExternalDatasetCreatePayload
def test_external_dataset_create_payload_allows_name_length_100() -> None:
"""Ensure the `name` field accepts up to 100 characters (inclusive)."""
# Build a request payload with a boundary-length name value.
name_100: str = "a" * 100
payload = {
"external_knowledge_api_id": "ek-api-1",
"external_knowledge_id": "ek-1",
"name": name_100,
}
model = ExternalDatasetCreatePayload.model_validate(payload)
assert model.name == name_100
def test_external_dataset_create_payload_rejects_name_length_101() -> None:
"""Ensure the `name` field rejects values longer than 100 characters."""
# Build a request payload that exceeds the max length by 1.
name_101: str = "a" * 101
payload: dict[str, object] = {
"external_knowledge_api_id": "ek-api-1",
"external_knowledge_id": "ek-1",
"name": name_101,
}
with pytest.raises(ValidationError) as exc_info:
ExternalDatasetCreatePayload.model_validate(payload)
errors = exc_info.value.errors()
assert errors[0]["loc"] == ("name",)
assert errors[0]["type"] == "string_too_long"
assert errors[0]["ctx"]["max_length"] == 100

View File

@ -0,0 +1,279 @@
"""Unit tests for PluginEndpointClient functionality.
This test module covers the endpoint client operations including:
- Successful endpoint deletion
- Idempotent delete behavior (record not found)
- Non-idempotent delete behavior (other errors)
Tests follow the Arrange-Act-Assert pattern for clarity.
"""
from unittest.mock import MagicMock, patch
import pytest
from core.plugin.impl.endpoint import PluginEndpointClient
from core.plugin.impl.exc import PluginDaemonInternalServerError
class TestPluginEndpointClientDelete:
"""Unit tests for PluginEndpointClient delete_endpoint operation.
Tests cover:
- Successful endpoint deletion
- Idempotent behavior when endpoint is already deleted (record not found)
- Non-idempotent behavior for other errors
"""
@pytest.fixture
def endpoint_client(self):
"""Create a PluginEndpointClient instance for testing."""
return PluginEndpointClient()
@pytest.fixture
def mock_config(self):
"""Mock plugin daemon configuration."""
with (
patch("core.plugin.impl.base.dify_config.PLUGIN_DAEMON_URL", "http://127.0.0.1:5002"),
patch("core.plugin.impl.base.dify_config.PLUGIN_DAEMON_KEY", "test-api-key"),
):
yield
def test_delete_endpoint_success(self, endpoint_client, mock_config):
"""Test successful endpoint deletion.
Given:
- A valid tenant_id, user_id, and endpoint_id
- The plugin daemon returns success response
When:
- delete_endpoint is called
Then:
- The method should return True
- The request should be made with correct parameters
"""
# Arrange
tenant_id = "tenant-123"
user_id = "user-456"
endpoint_id = "endpoint-789"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"code": 0,
"message": "success",
"data": True,
}
with patch("httpx.request", return_value=mock_response):
# Act
result = endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Assert
assert result is True
def test_delete_endpoint_idempotent_record_not_found(self, endpoint_client, mock_config):
"""Test idempotent delete behavior when endpoint is already deleted.
Given:
- A valid tenant_id, user_id, and endpoint_id
- The plugin daemon returns "record not found" error
When:
- delete_endpoint is called
Then:
- The method should return True (idempotent behavior)
- No exception should be raised
"""
# Arrange
tenant_id = "tenant-123"
user_id = "user-456"
endpoint_id = "endpoint-789"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"code": -1,
"message": (
'{"error_type": "PluginDaemonInternalServerError", '
'"message": "failed to remove endpoint: record not found"}'
),
}
with patch("httpx.request", return_value=mock_response):
# Act
result = endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Assert - should return True instead of raising an error
assert result is True
def test_delete_endpoint_non_idempotent_other_errors(self, endpoint_client, mock_config):
"""Test non-idempotent delete behavior for other errors.
Given:
- A valid tenant_id, user_id, and endpoint_id
- The plugin daemon returns a different error (not "record not found")
When:
- delete_endpoint is called
Then:
- The method should raise PluginDaemonInternalServerError
"""
# Arrange
tenant_id = "tenant-123"
user_id = "user-456"
endpoint_id = "endpoint-789"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"code": -1,
"message": (
'{"error_type": "PluginDaemonInternalServerError", '
'"message": "failed to remove endpoint: internal server error"}'
),
}
with patch("httpx.request", return_value=mock_response):
# Act & Assert
with pytest.raises(PluginDaemonInternalServerError) as exc_info:
endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Assert - the error message should not be "record not found"
assert "record not found" not in str(exc_info.value.description)
def test_delete_endpoint_idempotent_case_insensitive(self, endpoint_client, mock_config):
"""Test idempotent delete behavior with case-insensitive error message.
Given:
- A valid tenant_id, user_id, and endpoint_id
- The plugin daemon returns "Record Not Found" error (different case)
When:
- delete_endpoint is called
Then:
- The method should return True (idempotent behavior)
"""
# Arrange
tenant_id = "tenant-123"
user_id = "user-456"
endpoint_id = "endpoint-789"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"code": -1,
"message": '{"error_type": "PluginDaemonInternalServerError", "message": "Record Not Found"}',
}
with patch("httpx.request", return_value=mock_response):
# Act
result = endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Assert - should still return True
assert result is True
def test_delete_endpoint_multiple_calls_idempotent(self, endpoint_client, mock_config):
"""Test that multiple delete calls are idempotent.
Given:
- A valid tenant_id, user_id, and endpoint_id
- The first call succeeds
- Subsequent calls return "record not found"
When:
- delete_endpoint is called multiple times
Then:
- All calls should return True
"""
# Arrange
tenant_id = "tenant-123"
user_id = "user-456"
endpoint_id = "endpoint-789"
# First call - success
mock_response_success = MagicMock()
mock_response_success.status_code = 200
mock_response_success.json.return_value = {
"code": 0,
"message": "success",
"data": True,
}
# Second call - record not found
mock_response_not_found = MagicMock()
mock_response_not_found.status_code = 200
mock_response_not_found.json.return_value = {
"code": -1,
"message": (
'{"error_type": "PluginDaemonInternalServerError", '
'"message": "failed to remove endpoint: record not found"}'
),
}
with patch("httpx.request") as mock_request:
# Act - first call
mock_request.return_value = mock_response_success
result1 = endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Act - second call (already deleted)
mock_request.return_value = mock_response_not_found
result2 = endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Assert - both should return True
assert result1 is True
assert result2 is True
def test_delete_endpoint_non_idempotent_unauthorized_error(self, endpoint_client, mock_config):
"""Test that authorization errors are not treated as idempotent.
Given:
- A valid tenant_id, user_id, and endpoint_id
- The plugin daemon returns an unauthorized error
When:
- delete_endpoint is called
Then:
- The method should raise the appropriate error (not return True)
"""
# Arrange
tenant_id = "tenant-123"
user_id = "user-456"
endpoint_id = "endpoint-789"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"code": -1,
"message": '{"error_type": "PluginDaemonUnauthorizedError", "message": "unauthorized access"}',
}
with patch("httpx.request", return_value=mock_response):
# Act & Assert
with pytest.raises(Exception) as exc_info:
endpoint_client.delete_endpoint(
tenant_id=tenant_id,
user_id=user_id,
endpoint_id=endpoint_id,
)
# Assert - should not return True for unauthorized errors
assert exc_info.value.__class__.__name__ == "PluginDaemonUnauthorizedError"

View File

@ -0,0 +1 @@
"""LogStore extension unit tests."""

View File

@ -0,0 +1,469 @@
"""
Unit tests for SQL escape utility functions.
These tests ensure that SQL injection attacks are properly prevented
in LogStore queries, particularly for cross-tenant access scenarios.
"""
import pytest
from extensions.logstore.sql_escape import escape_identifier, escape_logstore_query_value, escape_sql_string
class TestEscapeSQLString:
"""Test escape_sql_string function."""
def test_escape_empty_string(self):
"""Test escaping empty string."""
assert escape_sql_string("") == ""
def test_escape_normal_string(self):
"""Test escaping string without special characters."""
assert escape_sql_string("tenant_abc123") == "tenant_abc123"
assert escape_sql_string("app-uuid-1234") == "app-uuid-1234"
def test_escape_single_quote(self):
"""Test escaping single quote."""
# Single quote should be doubled
assert escape_sql_string("tenant'id") == "tenant''id"
assert escape_sql_string("O'Reilly") == "O''Reilly"
def test_escape_multiple_quotes(self):
"""Test escaping multiple single quotes."""
assert escape_sql_string("a'b'c") == "a''b''c"
assert escape_sql_string("'''") == "''''''"
# === SQL Injection Attack Scenarios ===
def test_prevent_boolean_injection(self):
"""Test prevention of boolean injection attacks."""
# Classic OR 1=1 attack
malicious_input = "tenant' OR '1'='1"
escaped = escape_sql_string(malicious_input)
assert escaped == "tenant'' OR ''1''=''1"
# When used in SQL, this becomes a safe string literal
sql = f"WHERE tenant_id='{escaped}'"
assert sql == "WHERE tenant_id='tenant'' OR ''1''=''1'"
# The entire input is now a string literal that won't match any tenant
def test_prevent_or_injection(self):
"""Test prevention of OR-based injection."""
malicious_input = "tenant_a' OR tenant_id='tenant_b"
escaped = escape_sql_string(malicious_input)
assert escaped == "tenant_a'' OR tenant_id=''tenant_b"
sql = f"WHERE tenant_id='{escaped}'"
# The OR is now part of the string literal, not SQL logic
assert "OR tenant_id=" in sql
# The SQL has: opening ', doubled internal quotes '', and closing '
assert sql == "WHERE tenant_id='tenant_a'' OR tenant_id=''tenant_b'"
def test_prevent_union_injection(self):
"""Test prevention of UNION-based injection."""
malicious_input = "xxx' UNION SELECT password FROM users WHERE '1'='1"
escaped = escape_sql_string(malicious_input)
assert escaped == "xxx'' UNION SELECT password FROM users WHERE ''1''=''1"
# UNION becomes part of the string literal
assert "UNION" in escaped
assert escaped.count("''") == 4 # All internal quotes are doubled
def test_prevent_comment_injection(self):
"""Test prevention of comment-based injection."""
# SQL comment to bypass remaining conditions
malicious_input = "tenant' --"
escaped = escape_sql_string(malicious_input)
assert escaped == "tenant'' --"
sql = f"WHERE tenant_id='{escaped}' AND deleted=false"
# The -- is now inside the string, not a SQL comment
assert "--" in sql
assert "AND deleted=false" in sql # This part is NOT commented out
def test_prevent_semicolon_injection(self):
"""Test prevention of semicolon-based multi-statement injection."""
malicious_input = "tenant'; DROP TABLE users; --"
escaped = escape_sql_string(malicious_input)
assert escaped == "tenant''; DROP TABLE users; --"
# Semicolons and DROP are now part of the string
assert "DROP TABLE" in escaped
def test_prevent_time_based_blind_injection(self):
"""Test prevention of time-based blind SQL injection."""
malicious_input = "tenant' AND SLEEP(5) --"
escaped = escape_sql_string(malicious_input)
assert escaped == "tenant'' AND SLEEP(5) --"
# SLEEP becomes part of the string
assert "SLEEP" in escaped
def test_prevent_wildcard_injection(self):
"""Test prevention of wildcard-based injection."""
malicious_input = "tenant' OR tenant_id LIKE '%"
escaped = escape_sql_string(malicious_input)
assert escaped == "tenant'' OR tenant_id LIKE ''%"
# The LIKE and wildcard are now part of the string
assert "LIKE" in escaped
def test_prevent_null_byte_injection(self):
"""Test handling of null bytes."""
# Null bytes can sometimes bypass filters
malicious_input = "tenant\x00' OR '1'='1"
escaped = escape_sql_string(malicious_input)
# Null byte is preserved, but quote is escaped
assert "''1''=''1" in escaped
# === Real-world SAAS Scenarios ===
def test_cross_tenant_access_attempt(self):
"""Test prevention of cross-tenant data access."""
# Attacker tries to access another tenant's data
attacker_input = "tenant_b' OR tenant_id='tenant_a"
escaped = escape_sql_string(attacker_input)
sql = f"SELECT * FROM workflow_runs WHERE tenant_id='{escaped}'"
# The query will look for a tenant literally named "tenant_b' OR tenant_id='tenant_a"
# which doesn't exist - preventing access to either tenant's data
assert "tenant_b'' OR tenant_id=''tenant_a" in sql
def test_cross_app_access_attempt(self):
"""Test prevention of cross-application data access."""
attacker_input = "app1' OR app_id='app2"
escaped = escape_sql_string(attacker_input)
sql = f"WHERE app_id='{escaped}'"
# Cannot access app2's data
assert "app1'' OR app_id=''app2" in sql
def test_bypass_status_filter(self):
"""Test prevention of bypassing status filters."""
# Try to see all statuses instead of just 'running'
attacker_input = "running' OR status LIKE '%"
escaped = escape_sql_string(attacker_input)
sql = f"WHERE status='{escaped}'"
# Status condition is not bypassed
assert "running'' OR status LIKE ''%" in sql
# === Edge Cases ===
def test_escape_only_quotes(self):
"""Test string with only quotes."""
assert escape_sql_string("'") == "''"
assert escape_sql_string("''") == "''''"
def test_escape_mixed_content(self):
"""Test string with mixed quotes and other chars."""
input_str = "It's a 'test' of O'Reilly's code"
escaped = escape_sql_string(input_str)
assert escaped == "It''s a ''test'' of O''Reilly''s code"
def test_escape_unicode_with_quotes(self):
"""Test Unicode strings with quotes."""
input_str = "租户' OR '1'='1"
escaped = escape_sql_string(input_str)
assert escaped == "租户'' OR ''1''=''1"
class TestEscapeIdentifier:
"""Test escape_identifier function."""
def test_escape_uuid(self):
"""Test escaping UUID identifiers."""
uuid = "550e8400-e29b-41d4-a716-446655440000"
assert escape_identifier(uuid) == uuid
def test_escape_alphanumeric_id(self):
"""Test escaping alphanumeric identifiers."""
assert escape_identifier("tenant_123") == "tenant_123"
assert escape_identifier("app-abc-123") == "app-abc-123"
def test_escape_identifier_with_quote(self):
"""Test escaping identifier with single quote."""
malicious = "tenant' OR '1'='1"
escaped = escape_identifier(malicious)
assert escaped == "tenant'' OR ''1''=''1"
def test_identifier_injection_attempt(self):
"""Test prevention of injection through identifiers."""
# Common identifier injection patterns
test_cases = [
("id' OR '1'='1", "id'' OR ''1''=''1"),
("id'; DROP TABLE", "id''; DROP TABLE"),
("id' UNION SELECT", "id'' UNION SELECT"),
]
for malicious, expected in test_cases:
assert escape_identifier(malicious) == expected
class TestSQLInjectionIntegration:
"""Integration tests simulating real SQL construction scenarios."""
def test_complete_where_clause_safety(self):
"""Test that a complete WHERE clause is safe from injection."""
# Simulating typical query construction
tenant_id = "tenant' OR '1'='1"
app_id = "app' UNION SELECT"
run_id = "run' --"
escaped_tenant = escape_identifier(tenant_id)
escaped_app = escape_identifier(app_id)
escaped_run = escape_identifier(run_id)
sql = f"""
SELECT * FROM workflow_runs
WHERE tenant_id='{escaped_tenant}'
AND app_id='{escaped_app}'
AND id='{escaped_run}'
"""
# Verify all special characters are escaped
assert "tenant'' OR ''1''=''1" in sql
assert "app'' UNION SELECT" in sql
assert "run'' --" in sql
# Verify SQL structure is preserved (3 conditions with AND)
assert sql.count("AND") == 2
def test_multiple_conditions_with_injection_attempts(self):
"""Test multiple conditions all attempting injection."""
conditions = {
"tenant_id": "t1' OR tenant_id='t2",
"app_id": "a1' OR app_id='a2",
"status": "running' OR '1'='1",
}
where_parts = []
for field, value in conditions.items():
escaped = escape_sql_string(value)
where_parts.append(f"{field}='{escaped}'")
where_clause = " AND ".join(where_parts)
# All injection attempts are neutralized
assert "t1'' OR tenant_id=''t2" in where_clause
assert "a1'' OR app_id=''a2" in where_clause
assert "running'' OR ''1''=''1" in where_clause
# AND structure is preserved
assert where_clause.count(" AND ") == 2
@pytest.mark.parametrize(
("attack_vector", "description"),
[
("' OR '1'='1", "Boolean injection"),
("' OR '1'='1' --", "Boolean with comment"),
("' UNION SELECT * FROM users --", "Union injection"),
("'; DROP TABLE workflow_runs; --", "Destructive command"),
("' AND SLEEP(10) --", "Time-based blind"),
("' OR tenant_id LIKE '%", "Wildcard injection"),
("admin' --", "Comment bypass"),
("' OR 1=1 LIMIT 1 --", "Limit bypass"),
],
)
def test_common_injection_vectors(self, attack_vector, description):
"""Test protection against common injection attack vectors."""
escaped = escape_sql_string(attack_vector)
# Build SQL
sql = f"WHERE tenant_id='{escaped}'"
# Verify the attack string is now a safe literal
# The key indicator: all internal single quotes are doubled
internal_quotes = escaped.count("''")
original_quotes = attack_vector.count("'")
# Each original quote should be doubled
assert internal_quotes == original_quotes
# Verify SQL has exactly 2 quotes (opening and closing)
assert sql.count("'") >= 2 # At least opening and closing
def test_logstore_specific_scenario(self):
"""Test SQL injection prevention in LogStore-specific scenarios."""
# Simulate LogStore query with window function
tenant_id = "tenant' OR '1'='1"
app_id = "app' UNION SELECT"
escaped_tenant = escape_identifier(tenant_id)
escaped_app = escape_identifier(app_id)
sql = f"""
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY log_version DESC) as rn
FROM workflow_execution_logstore
WHERE tenant_id='{escaped_tenant}'
AND app_id='{escaped_app}'
AND __time__ > 0
) AS subquery WHERE rn = 1
"""
# Complex query structure is maintained
assert "ROW_NUMBER()" in sql
assert "PARTITION BY id" in sql
# Injection attempts are escaped
assert "tenant'' OR ''1''=''1" in sql
assert "app'' UNION SELECT" in sql
# ====================================================================================
# Tests for LogStore Query Syntax (SDK Mode)
# ====================================================================================
class TestLogStoreQueryEscape:
"""Test escape_logstore_query_value for SDK mode query syntax."""
def test_normal_value(self):
"""Test escaping normal alphanumeric value."""
value = "550e8400-e29b-41d4-a716-446655440000"
escaped = escape_logstore_query_value(value)
# Should be wrapped in double quotes
assert escaped == '"550e8400-e29b-41d4-a716-446655440000"'
def test_empty_value(self):
"""Test escaping empty string."""
assert escape_logstore_query_value("") == '""'
def test_value_with_and_keyword(self):
"""Test that 'and' keyword is neutralized when quoted."""
malicious = "value and field:evil"
escaped = escape_logstore_query_value(malicious)
# Should be wrapped in quotes, making 'and' a literal
assert escaped == '"value and field:evil"'
# Simulate using in query
query = f"tenant_id:{escaped}"
assert query == 'tenant_id:"value and field:evil"'
def test_value_with_or_keyword(self):
"""Test that 'or' keyword is neutralized when quoted."""
malicious = "tenant_a or tenant_id:tenant_b"
escaped = escape_logstore_query_value(malicious)
assert escaped == '"tenant_a or tenant_id:tenant_b"'
query = f"tenant_id:{escaped}"
assert "or" in query # Present but as literal string
def test_value_with_not_keyword(self):
"""Test that 'not' keyword is neutralized when quoted."""
malicious = "not field:value"
escaped = escape_logstore_query_value(malicious)
assert escaped == '"not field:value"'
def test_value_with_parentheses(self):
"""Test that parentheses are neutralized when quoted."""
malicious = "(tenant_a or tenant_b)"
escaped = escape_logstore_query_value(malicious)
assert escaped == '"(tenant_a or tenant_b)"'
assert "(" in escaped # Present as literal
assert ")" in escaped # Present as literal
def test_value_with_colon(self):
"""Test that colons are neutralized when quoted."""
malicious = "field:value"
escaped = escape_logstore_query_value(malicious)
assert escaped == '"field:value"'
assert ":" in escaped # Present as literal
def test_value_with_double_quotes(self):
"""Test that internal double quotes are escaped."""
value_with_quotes = 'tenant"test"value'
escaped = escape_logstore_query_value(value_with_quotes)
# Double quotes should be escaped with backslash
assert escaped == '"tenant\\"test\\"value"'
# Should have outer quotes plus escaped inner quotes
assert '\\"' in escaped
def test_value_with_backslash(self):
"""Test that backslashes are escaped."""
value_with_backslash = "tenant\\test"
escaped = escape_logstore_query_value(value_with_backslash)
# Backslash should be escaped
assert escaped == '"tenant\\\\test"'
assert "\\\\" in escaped
def test_value_with_backslash_and_quote(self):
"""Test escaping both backslash and double quote."""
value = 'path\\to\\"file"'
escaped = escape_logstore_query_value(value)
# Both should be escaped
assert escaped == '"path\\\\to\\\\\\"file\\""'
# Verify escape order is correct
assert "\\\\" in escaped # Escaped backslash
assert '\\"' in escaped # Escaped double quote
def test_complex_injection_attempt(self):
"""Test complex injection combining multiple operators."""
malicious = 'tenant_a" or (tenant_id:"tenant_b" and app_id:"evil")'
escaped = escape_logstore_query_value(malicious)
# All special chars should be literals or escaped
assert escaped.startswith('"')
assert escaped.endswith('"')
# Inner double quotes escaped, operators become literals
assert "or" in escaped
assert "and" in escaped
assert '\\"' in escaped # Escaped quotes
def test_only_backslash(self):
"""Test escaping a single backslash."""
assert escape_logstore_query_value("\\") == '"\\\\"'
def test_only_double_quote(self):
"""Test escaping a single double quote."""
assert escape_logstore_query_value('"') == '"\\""'
def test_multiple_backslashes(self):
"""Test escaping multiple consecutive backslashes."""
assert escape_logstore_query_value("\\\\\\") == '"\\\\\\\\\\\\"' # 3 backslashes -> 6
def test_escape_sequence_like_input(self):
"""Test that existing escape sequences are properly escaped."""
# Input looks like already escaped, but we still escape it
value = 'value\\"test'
escaped = escape_logstore_query_value(value)
# \\ -> \\\\, " -> \"
assert escaped == '"value\\\\\\"test"'
@pytest.mark.parametrize(
("attack_scenario", "field", "malicious_value"),
[
("Cross-tenant via OR", "tenant_id", "tenant_a or tenant_id:tenant_b"),
("Cross-app via AND", "app_id", "app_a and (app_id:app_b or app_id:app_c)"),
("Boolean logic", "status", "succeeded or status:failed"),
("Negation", "tenant_id", "not tenant_a"),
("Field injection", "run_id", "run123 and tenant_id:evil_tenant"),
("Parentheses grouping", "app_id", "app1 or (app_id:app2 and tenant_id:tenant2)"),
("Quote breaking attempt", "tenant_id", 'tenant" or "1"="1'),
("Backslash escape bypass", "app_id", "app\\ and app_id:evil"),
],
)
def test_logstore_query_injection_scenarios(attack_scenario: str, field: str, malicious_value: str):
"""Test that various LogStore query injection attempts are neutralized."""
escaped = escape_logstore_query_value(malicious_value)
# Build query
query = f"{field}:{escaped}"
# All operators should be within quoted string (literals)
assert escaped.startswith('"')
assert escaped.endswith('"')
# Verify the full query structure is safe
assert query.count(":") >= 1 # At least the main field:value separator

View File

@ -1,4 +1,4 @@
from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch
import pytest
@ -17,7 +17,7 @@ def test_smtp_plain_success(mock_smtp_cls: MagicMock):
client = SMTPClient(server="smtp.example.com", port=25, username="", password="", _from="noreply@example.com")
client.send(_mail())
mock_smtp_cls.assert_called_once_with("smtp.example.com", 25, timeout=10)
mock_smtp_cls.assert_called_once_with("smtp.example.com", 25, timeout=10, local_hostname=ANY)
mock_smtp.sendmail.assert_called_once()
mock_smtp.quit.assert_called_once()
@ -38,7 +38,7 @@ def test_smtp_tls_opportunistic_success(mock_smtp_cls: MagicMock):
)
client.send(_mail())
mock_smtp_cls.assert_called_once_with("smtp.example.com", 587, timeout=10)
mock_smtp_cls.assert_called_once_with("smtp.example.com", 587, timeout=10, local_hostname=ANY)
assert mock_smtp.ehlo.call_count == 2
mock_smtp.starttls.assert_called_once()
mock_smtp.login.assert_called_once_with("user", "pass")

View File

@ -0,0 +1,627 @@
import datetime
from unittest.mock import MagicMock, patch
import pytest
from enums.cloud_plan import CloudPlan
from services.retention.conversation.messages_clean_policy import (
BillingDisabledPolicy,
BillingSandboxPolicy,
SimpleMessage,
create_message_clean_policy,
)
from services.retention.conversation.messages_clean_service import MessagesCleanService
def make_simple_message(msg_id: str, app_id: str) -> SimpleMessage:
"""Helper to create a SimpleMessage with a fixed created_at timestamp."""
return SimpleMessage(id=msg_id, app_id=app_id, created_at=datetime.datetime(2024, 1, 1))
def make_plan_provider(tenant_plans: dict) -> MagicMock:
"""Helper to create a mock plan_provider that returns the given tenant_plans."""
provider = MagicMock()
provider.return_value = tenant_plans
return provider
class TestBillingSandboxPolicyFilterMessageIds:
"""Unit tests for BillingSandboxPolicy.filter_message_ids method."""
# Fixed timestamp for deterministic tests
CURRENT_TIMESTAMP = 1000000
GRACEFUL_PERIOD_DAYS = 8
GRACEFUL_PERIOD_SECONDS = GRACEFUL_PERIOD_DAYS * 24 * 60 * 60
def test_missing_tenant_mapping_excluded(self):
"""Test that messages with missing app-to-tenant mapping are excluded."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
]
app_to_tenant = {} # No mapping
tenant_plans = {"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1}}
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert
assert list(result) == []
def test_missing_tenant_plan_excluded(self):
"""Test that messages with missing tenant plan are excluded (safe default)."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
tenant_plans = {} # No plans
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert
assert list(result) == []
def test_non_sandbox_plan_excluded(self):
"""Test that messages from non-sandbox plans (PROFESSIONAL/TEAM) are excluded."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
make_simple_message("msg3", "app3"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant3"}
tenant_plans = {
"tenant1": {"plan": CloudPlan.PROFESSIONAL, "expiration_date": -1},
"tenant2": {"plan": CloudPlan.TEAM, "expiration_date": -1},
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": -1}, # Only this one
}
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - only msg3 (sandbox tenant) should be included
assert set(result) == {"msg3"}
def test_whitelist_skip(self):
"""Test that whitelisted tenants are excluded even if sandbox + expired."""
# Arrange
messages = [
make_simple_message("msg1", "app1"), # Whitelisted - excluded
make_simple_message("msg2", "app2"), # Not whitelisted - included
make_simple_message("msg3", "app3"), # Whitelisted - excluded
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant3"}
tenant_plans = {
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
}
plan_provider = make_plan_provider(tenant_plans)
tenant_whitelist = ["tenant1", "tenant3"]
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
tenant_whitelist=tenant_whitelist,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - only msg2 should be included
assert set(result) == {"msg2"}
def test_no_previous_subscription_included(self):
"""Test that messages with expiration_date=-1 (no previous subscription) are included."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
tenant_plans = {
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
}
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - all messages should be included
assert set(result) == {"msg1", "msg2"}
def test_within_grace_period_excluded(self):
"""Test that messages within grace period are excluded."""
# Arrange
now = self.CURRENT_TIMESTAMP
expired_1_day_ago = now - (1 * 24 * 60 * 60)
expired_5_days_ago = now - (5 * 24 * 60 * 60)
expired_7_days_ago = now - (7 * 24 * 60 * 60)
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
make_simple_message("msg3", "app3"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant3"}
tenant_plans = {
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_1_day_ago},
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_5_days_ago},
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_7_days_ago},
}
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS, # 8 days
current_timestamp=now,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - all within 8-day grace period, none should be included
assert list(result) == []
def test_exactly_at_boundary_excluded(self):
"""Test that messages exactly at grace period boundary are excluded (code uses >)."""
# Arrange
now = self.CURRENT_TIMESTAMP
expired_exactly_8_days_ago = now - self.GRACEFUL_PERIOD_SECONDS # Exactly at boundary
messages = [make_simple_message("msg1", "app1")]
app_to_tenant = {"app1": "tenant1"}
tenant_plans = {
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_exactly_8_days_ago},
}
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=now,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - exactly at boundary (==) should be excluded (code uses >)
assert list(result) == []
def test_beyond_grace_period_included(self):
"""Test that messages beyond grace period are included."""
# Arrange
now = self.CURRENT_TIMESTAMP
expired_9_days_ago = now - (9 * 24 * 60 * 60) # Just beyond 8-day grace
expired_30_days_ago = now - (30 * 24 * 60 * 60) # Well beyond
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
tenant_plans = {
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_9_days_ago},
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": expired_30_days_ago},
}
plan_provider = make_plan_provider(tenant_plans)
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=now,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - both beyond grace period, should be included
assert set(result) == {"msg1", "msg2"}
def test_empty_messages_returns_empty(self):
"""Test that empty messages returns empty list."""
# Arrange
messages: list[SimpleMessage] = []
app_to_tenant = {"app1": "tenant1"}
plan_provider = make_plan_provider({"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1}})
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert
assert list(result) == []
def test_plan_provider_called_with_correct_tenant_ids(self):
"""Test that plan_provider is called with correct tenant_ids."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
make_simple_message("msg3", "app3"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2", "app3": "tenant1"} # tenant1 appears twice
plan_provider = make_plan_provider({})
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
current_timestamp=self.CURRENT_TIMESTAMP,
)
# Act
policy.filter_message_ids(messages, app_to_tenant)
# Assert - plan_provider should be called once with unique tenant_ids
plan_provider.assert_called_once()
called_tenant_ids = set(plan_provider.call_args[0][0])
assert called_tenant_ids == {"tenant1", "tenant2"}
def test_complex_mixed_scenario(self):
"""Test complex scenario with mixed plans, expirations, whitelist, and missing mappings."""
# Arrange
now = self.CURRENT_TIMESTAMP
sandbox_expired_old = now - (15 * 24 * 60 * 60) # Beyond grace
sandbox_expired_recent = now - (3 * 24 * 60 * 60) # Within grace
future_expiration = now + (30 * 24 * 60 * 60)
messages = [
make_simple_message("msg1", "app1"), # Sandbox, no subscription - included
make_simple_message("msg2", "app2"), # Sandbox, expired old - included
make_simple_message("msg3", "app3"), # Sandbox, within grace - excluded
make_simple_message("msg4", "app4"), # Team plan, active - excluded
make_simple_message("msg5", "app5"), # No tenant mapping - excluded
make_simple_message("msg6", "app6"), # No plan info - excluded
make_simple_message("msg7", "app7"), # Sandbox, expired old, whitelisted - excluded
]
app_to_tenant = {
"app1": "tenant1",
"app2": "tenant2",
"app3": "tenant3",
"app4": "tenant4",
"app6": "tenant6", # Has mapping but no plan
"app7": "tenant7",
# app5 has no mapping
}
tenant_plans = {
"tenant1": {"plan": CloudPlan.SANDBOX, "expiration_date": -1},
"tenant2": {"plan": CloudPlan.SANDBOX, "expiration_date": sandbox_expired_old},
"tenant3": {"plan": CloudPlan.SANDBOX, "expiration_date": sandbox_expired_recent},
"tenant4": {"plan": CloudPlan.TEAM, "expiration_date": future_expiration},
"tenant7": {"plan": CloudPlan.SANDBOX, "expiration_date": sandbox_expired_old},
# tenant6 has no plan
}
plan_provider = make_plan_provider(tenant_plans)
tenant_whitelist = ["tenant7"]
policy = BillingSandboxPolicy(
plan_provider=plan_provider,
graceful_period_days=self.GRACEFUL_PERIOD_DAYS,
tenant_whitelist=tenant_whitelist,
current_timestamp=now,
)
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - only msg1 and msg2 should be included
assert set(result) == {"msg1", "msg2"}
class TestBillingDisabledPolicyFilterMessageIds:
"""Unit tests for BillingDisabledPolicy.filter_message_ids method."""
def test_returns_all_message_ids(self):
"""Test that all message IDs are returned (order-preserving)."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
make_simple_message("msg3", "app3"),
]
app_to_tenant = {"app1": "tenant1", "app2": "tenant2"}
policy = BillingDisabledPolicy()
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - all message IDs returned in order
assert list(result) == ["msg1", "msg2", "msg3"]
def test_ignores_app_to_tenant(self):
"""Test that app_to_tenant mapping is ignored."""
# Arrange
messages = [
make_simple_message("msg1", "app1"),
make_simple_message("msg2", "app2"),
]
app_to_tenant: dict[str, str] = {} # Empty - should be ignored
policy = BillingDisabledPolicy()
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert - all message IDs still returned
assert list(result) == ["msg1", "msg2"]
def test_empty_messages_returns_empty(self):
"""Test that empty messages returns empty list."""
# Arrange
messages: list[SimpleMessage] = []
app_to_tenant = {"app1": "tenant1"}
policy = BillingDisabledPolicy()
# Act
result = policy.filter_message_ids(messages, app_to_tenant)
# Assert
assert list(result) == []
class TestCreateMessageCleanPolicy:
"""Unit tests for create_message_clean_policy factory function."""
@patch("services.retention.conversation.messages_clean_policy.dify_config")
def test_billing_disabled_returns_billing_disabled_policy(self, mock_config):
"""Test that BILLING_ENABLED=False returns BillingDisabledPolicy."""
# Arrange
mock_config.BILLING_ENABLED = False
# Act
policy = create_message_clean_policy(graceful_period_days=21)
# Assert
assert isinstance(policy, BillingDisabledPolicy)
@patch("services.retention.conversation.messages_clean_policy.BillingService")
@patch("services.retention.conversation.messages_clean_policy.dify_config")
def test_billing_enabled_policy_has_correct_internals(self, mock_config, mock_billing_service):
"""Test that BillingSandboxPolicy is created with correct internal values."""
# Arrange
mock_config.BILLING_ENABLED = True
whitelist = ["tenant1", "tenant2"]
mock_billing_service.get_expired_subscription_cleanup_whitelist.return_value = whitelist
mock_plan_provider = MagicMock()
mock_billing_service.get_plan_bulk_with_cache = mock_plan_provider
# Act
policy = create_message_clean_policy(graceful_period_days=14, current_timestamp=1234567)
# Assert
mock_billing_service.get_expired_subscription_cleanup_whitelist.assert_called_once()
assert isinstance(policy, BillingSandboxPolicy)
assert policy._graceful_period_days == 14
assert list(policy._tenant_whitelist) == whitelist
assert policy._plan_provider == mock_plan_provider
assert policy._current_timestamp == 1234567
class TestMessagesCleanServiceFromTimeRange:
"""Unit tests for MessagesCleanService.from_time_range factory method."""
def test_start_from_end_before_raises_value_error(self):
"""Test that start_from == end_before raises ValueError."""
policy = BillingDisabledPolicy()
# Arrange
same_time = datetime.datetime(2024, 1, 1, 12, 0, 0)
# Act & Assert
with pytest.raises(ValueError, match="start_from .* must be less than end_before"):
MessagesCleanService.from_time_range(
policy=policy,
start_from=same_time,
end_before=same_time,
)
# Arrange
start_from = datetime.datetime(2024, 12, 31)
end_before = datetime.datetime(2024, 1, 1)
# Act & Assert
with pytest.raises(ValueError, match="start_from .* must be less than end_before"):
MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
)
def test_batch_size_raises_value_error(self):
"""Test that batch_size=0 raises ValueError."""
# Arrange
start_from = datetime.datetime(2024, 1, 1)
end_before = datetime.datetime(2024, 2, 1)
policy = BillingDisabledPolicy()
# Act & Assert
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
batch_size=0,
)
start_from = datetime.datetime(2024, 1, 1)
end_before = datetime.datetime(2024, 2, 1)
policy = BillingDisabledPolicy()
# Act & Assert
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
batch_size=-100,
)
def test_valid_params_creates_instance(self):
"""Test that valid parameters create a correctly configured instance."""
# Arrange
start_from = datetime.datetime(2024, 1, 1, 0, 0, 0)
end_before = datetime.datetime(2024, 12, 31, 23, 59, 59)
policy = BillingDisabledPolicy()
batch_size = 500
dry_run = True
# Act
service = MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
batch_size=batch_size,
dry_run=dry_run,
)
# Assert
assert isinstance(service, MessagesCleanService)
assert service._policy is policy
assert service._start_from == start_from
assert service._end_before == end_before
assert service._batch_size == batch_size
assert service._dry_run == dry_run
def test_default_params(self):
"""Test that default parameters are applied correctly."""
# Arrange
start_from = datetime.datetime(2024, 1, 1)
end_before = datetime.datetime(2024, 2, 1)
policy = BillingDisabledPolicy()
# Act
service = MessagesCleanService.from_time_range(
policy=policy,
start_from=start_from,
end_before=end_before,
)
# Assert
assert service._batch_size == 1000 # default
assert service._dry_run is False # default
class TestMessagesCleanServiceFromDays:
"""Unit tests for MessagesCleanService.from_days factory method."""
def test_days_raises_value_error(self):
"""Test that days < 0 raises ValueError."""
# Arrange
policy = BillingDisabledPolicy()
# Act & Assert
with pytest.raises(ValueError, match="days .* must be greater than or equal to 0"):
MessagesCleanService.from_days(policy=policy, days=-1)
# Act
with patch("services.retention.conversation.messages_clean_service.datetime") as mock_datetime:
fixed_now = datetime.datetime(2024, 6, 15, 14, 0, 0)
mock_datetime.datetime.now.return_value = fixed_now
mock_datetime.timedelta = datetime.timedelta
service = MessagesCleanService.from_days(policy=policy, days=0)
# Assert
assert service._end_before == fixed_now
def test_batch_size_raises_value_error(self):
"""Test that batch_size=0 raises ValueError."""
# Arrange
policy = BillingDisabledPolicy()
# Act & Assert
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
MessagesCleanService.from_days(policy=policy, days=30, batch_size=0)
# Act & Assert
with pytest.raises(ValueError, match="batch_size .* must be greater than 0"):
MessagesCleanService.from_days(policy=policy, days=30, batch_size=-500)
def test_valid_params_creates_instance(self):
"""Test that valid parameters create a correctly configured instance."""
# Arrange
policy = BillingDisabledPolicy()
days = 90
batch_size = 500
dry_run = True
# Act
with patch("services.retention.conversation.messages_clean_service.datetime") as mock_datetime:
fixed_now = datetime.datetime(2024, 6, 15, 10, 30, 0)
mock_datetime.datetime.now.return_value = fixed_now
mock_datetime.timedelta = datetime.timedelta
service = MessagesCleanService.from_days(
policy=policy,
days=days,
batch_size=batch_size,
dry_run=dry_run,
)
# Assert
expected_end_before = fixed_now - datetime.timedelta(days=days)
assert isinstance(service, MessagesCleanService)
assert service._policy is policy
assert service._start_from is None
assert service._end_before == expected_end_before
assert service._batch_size == batch_size
assert service._dry_run == dry_run
def test_default_params(self):
"""Test that default parameters are applied correctly."""
# Arrange
policy = BillingDisabledPolicy()
# Act
with patch("services.retention.conversation.messages_clean_service.datetime") as mock_datetime:
fixed_now = datetime.datetime(2024, 6, 15, 10, 30, 0)
mock_datetime.datetime.now.return_value = fixed_now
mock_datetime.timedelta = datetime.timedelta
service = MessagesCleanService.from_days(policy=policy)
# Assert
expected_end_before = fixed_now - datetime.timedelta(days=30) # default days=30
assert service._end_before == expected_end_before
assert service._batch_size == 1000 # default
assert service._dry_run is False # default

View File

@ -9,7 +9,7 @@ This module tests the mail sending functionality including:
"""
import smtplib
from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch
import pytest
@ -151,7 +151,7 @@ class TestSMTPIntegration:
client.send(mail_data)
# Assert
mock_smtp_ssl.assert_called_once_with("smtp.example.com", 465, timeout=10)
mock_smtp_ssl.assert_called_once_with("smtp.example.com", 465, timeout=10, local_hostname=ANY)
mock_server.login.assert_called_once_with("user@example.com", "password123")
mock_server.sendmail.assert_called_once()
mock_server.quit.assert_called_once()
@ -181,7 +181,7 @@ class TestSMTPIntegration:
client.send(mail_data)
# Assert
mock_smtp.assert_called_once_with("smtp.example.com", 587, timeout=10)
mock_smtp.assert_called_once_with("smtp.example.com", 587, timeout=10, local_hostname=ANY)
mock_server.ehlo.assert_called()
mock_server.starttls.assert_called_once()
assert mock_server.ehlo.call_count == 2 # Before and after STARTTLS
@ -213,7 +213,7 @@ class TestSMTPIntegration:
client.send(mail_data)
# Assert
mock_smtp.assert_called_once_with("smtp.example.com", 25, timeout=10)
mock_smtp.assert_called_once_with("smtp.example.com", 25, timeout=10, local_hostname=ANY)
mock_server.login.assert_called_once()
mock_server.sendmail.assert_called_once()
mock_server.quit.assert_called_once()