feat: Add SMTP OAuth 2.0 support for Microsoft Exchange

Add comprehensive OAuth 2.0 authentication support for SMTP to address
Microsoft's Basic Authentication retirement in September 2025.

Key features:
- OAuth 2.0 SASL XOAUTH2 authentication mechanism
- Microsoft Azure AD integration with client credentials flow
- Backward compatible with existing basic authentication
- Comprehensive configuration options in .env.example files
- Enhanced SMTP client with dependency injection for better testability
- Complete test coverage with proper mocking

Configuration:
- SMTP_AUTH_TYPE: Choose between 'basic' and 'oauth2' authentication
- Microsoft OAuth 2.0 settings for Azure AD integration
- Automatic token acquisition using client credentials flow

Files changed:
- Enhanced SMTP client with OAuth 2.0 support
- New mail module structure under libs/mail/
- Updated configuration system with OAuth settings
- Comprehensive documentation and setup instructions
- Complete test suite for OAuth functionality

This change ensures compatibility with Microsoft Exchange Online
after Basic Authentication retirement.
This commit is contained in:
-LAN-
2025-07-22 03:00:16 +08:00
parent f4522fd695
commit 69f712b713
13 changed files with 1593 additions and 62 deletions

View File

@ -0,0 +1,375 @@
"""Comprehensive tests for email OAuth implementation"""
import base64
from typing import Optional, Union
import pytest
from libs.mail.oauth_email import MicrosoftEmailOAuth, OAuthUserInfo
from libs.mail.oauth_http_client import OAuthHTTPClientProtocol
class MockHTTPClient(OAuthHTTPClientProtocol):
"""Mock HTTP client for testing OAuth without real network calls"""
def __init__(self):
self.post_responses = []
self.get_responses = []
self.post_calls = []
self.get_calls = []
self.post_index = 0
self.get_index = 0
def add_post_response(self, status_code: int, json_data: dict[str, Union[str, int]]):
"""Add a mocked POST response"""
self.post_responses.append(
{
"status_code": status_code,
"json": json_data,
"text": str(json_data),
"headers": {"content-type": "application/json"},
}
)
def add_get_response(self, json_data: dict[str, Union[str, int, dict, list]]):
"""Add a mocked GET response"""
self.get_responses.append(json_data)
def post(
self, url: str, data: dict[str, Union[str, int]], headers: Optional[dict[str, str]] = None
) -> dict[str, Union[str, int, dict, list]]:
"""Mock POST request"""
self.post_calls.append({"url": url, "data": data, "headers": headers})
if self.post_index < len(self.post_responses):
response = self.post_responses[self.post_index]
self.post_index += 1
return response
# Default error response
return {
"status_code": 500,
"json": {"error": "No mock response configured"},
"text": "No mock response configured",
"headers": {},
}
def get(self, url: str, headers: Optional[dict[str, str]] = None) -> dict[str, Union[str, int, dict, list]]:
"""Mock GET request"""
self.get_calls.append({"url": url, "headers": headers})
if self.get_index < len(self.get_responses):
response = self.get_responses[self.get_index]
self.get_index += 1
return response
# Default error response
raise Exception("No mock response configured")
class TestMicrosoftEmailOAuth:
"""Test cases for MicrosoftEmailOAuth"""
@pytest.fixture
def mock_http_client(self):
"""Create a mock HTTP client"""
return MockHTTPClient()
@pytest.fixture
def oauth_client(self, mock_http_client):
"""Create OAuth client with mock HTTP client"""
return MicrosoftEmailOAuth(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="https://example.com/callback",
tenant_id="test-tenant",
http_client=mock_http_client,
)
def test_get_authorization_url(self, oauth_client):
"""Test authorization URL generation"""
url = oauth_client.get_authorization_url()
assert "login.microsoftonline.com/test-tenant/oauth2/v2.0/authorize" in url
assert "client_id=test-client-id" in url
assert "response_type=code" in url
assert "redirect_uri=https%3A%2F%2Fexample.com%2Fcallback" in url
assert "scope=https%3A%2F%2Foutlook.office.com%2FSMTP.Send+offline_access" in url
assert "response_mode=query" in url
def test_get_authorization_url_with_state(self, oauth_client):
"""Test authorization URL with state parameter"""
url = oauth_client.get_authorization_url(invite_token="test-state-123")
assert "state=test-state-123" in url
def test_get_access_token_success(self, oauth_client, mock_http_client):
"""Test successful access token retrieval"""
# Setup mock response
mock_http_client.add_post_response(
200,
{
"access_token": "test-access-token",
"token_type": "Bearer",
"expires_in": 3600,
"refresh_token": "test-refresh-token",
},
)
result = oauth_client.get_access_token("test-auth-code")
# Verify result
assert result["access_token"] == "test-access-token"
assert result["token_type"] == "Bearer"
assert result["expires_in"] == 3600
assert result["refresh_token"] == "test-refresh-token"
# Verify HTTP call
assert len(mock_http_client.post_calls) == 1
call = mock_http_client.post_calls[0]
assert "login.microsoftonline.com/test-tenant/oauth2/v2.0/token" in call["url"]
assert call["data"]["grant_type"] == "authorization_code"
assert call["data"]["code"] == "test-auth-code"
assert call["data"]["client_id"] == "test-client-id"
assert call["data"]["client_secret"] == "test-client-secret"
def test_get_access_token_failure(self, oauth_client, mock_http_client):
"""Test access token retrieval failure"""
# Setup mock error response
mock_http_client.add_post_response(
400, {"error": "invalid_grant", "error_description": "The authorization code is invalid"}
)
with pytest.raises(ValueError, match="Error in Microsoft OAuth"):
oauth_client.get_access_token("bad-auth-code")
def test_get_access_token_client_credentials_success(self, oauth_client, mock_http_client):
"""Test successful client credentials flow"""
# Setup mock response
mock_http_client.add_post_response(
200, {"access_token": "service-access-token", "token_type": "Bearer", "expires_in": 3600}
)
result = oauth_client.get_access_token_client_credentials()
# Verify result
assert result["access_token"] == "service-access-token"
assert result["token_type"] == "Bearer"
# Verify HTTP call
call = mock_http_client.post_calls[0]
assert call["data"]["grant_type"] == "client_credentials"
assert call["data"]["scope"] == "https://outlook.office365.com/.default"
def test_get_access_token_client_credentials_custom_scope(self, oauth_client, mock_http_client):
"""Test client credentials with custom scope"""
mock_http_client.add_post_response(200, {"access_token": "custom-scope-token", "token_type": "Bearer"})
result = oauth_client.get_access_token_client_credentials(scope="https://graph.microsoft.com/.default")
assert result["access_token"] == "custom-scope-token"
# Verify custom scope was used
call = mock_http_client.post_calls[0]
assert call["data"]["scope"] == "https://graph.microsoft.com/.default"
def test_refresh_access_token_success(self, oauth_client, mock_http_client):
"""Test successful token refresh"""
# Setup mock response
mock_http_client.add_post_response(
200,
{
"access_token": "new-access-token",
"refresh_token": "new-refresh-token",
"token_type": "Bearer",
"expires_in": 3600,
},
)
result = oauth_client.refresh_access_token("old-refresh-token")
# Verify result
assert result["access_token"] == "new-access-token"
assert result["refresh_token"] == "new-refresh-token"
# Verify HTTP call
call = mock_http_client.post_calls[0]
assert call["data"]["grant_type"] == "refresh_token"
assert call["data"]["refresh_token"] == "old-refresh-token"
def test_refresh_access_token_failure(self, oauth_client, mock_http_client):
"""Test token refresh failure"""
# Setup mock error response
mock_http_client.add_post_response(
400, {"error": "invalid_grant", "error_description": "The refresh token has expired"}
)
with pytest.raises(ValueError, match="Error refreshing Microsoft OAuth token"):
oauth_client.refresh_access_token("expired-refresh-token")
def test_get_raw_user_info(self, oauth_client, mock_http_client):
"""Test getting user info from Microsoft Graph"""
# Setup mock response
mock_http_client.add_get_response(
{
"id": "12345",
"displayName": "Test User",
"mail": "test@contoso.com",
"userPrincipalName": "test@contoso.com",
}
)
result = oauth_client.get_raw_user_info("test-access-token")
# Verify result
assert result["id"] == "12345"
assert result["displayName"] == "Test User"
assert result["mail"] == "test@contoso.com"
# Verify HTTP call
call = mock_http_client.get_calls[0]
assert call["url"] == "https://graph.microsoft.com/v1.0/me"
assert call["headers"]["Authorization"] == "Bearer test-access-token"
def test_get_user_info_complete_flow(self, oauth_client, mock_http_client):
"""Test complete user info retrieval flow"""
# Setup mock response
mock_http_client.add_get_response(
{
"id": "67890",
"displayName": "John Doe",
"mail": "john.doe@contoso.com",
"userPrincipalName": "john.doe@contoso.com",
}
)
user_info = oauth_client.get_user_info("test-access-token")
# Verify transformed user info
assert isinstance(user_info, OAuthUserInfo)
assert user_info.id == "67890"
assert user_info.name == "John Doe"
assert user_info.email == "john.doe@contoso.com"
def test_transform_user_info_with_missing_mail(self, oauth_client):
"""Test user info transformation when mail field is missing"""
raw_info = {"id": "99999", "displayName": "No Mail User", "userPrincipalName": "nomail@contoso.com"}
user_info = oauth_client._transform_user_info(raw_info)
# Should fall back to userPrincipalName
assert user_info.email == "nomail@contoso.com"
def test_transform_user_info_with_no_display_name(self, oauth_client):
"""Test user info transformation when displayName is missing"""
raw_info = {"id": "11111", "mail": "anonymous@contoso.com", "userPrincipalName": "anonymous@contoso.com"}
user_info = oauth_client._transform_user_info(raw_info)
# Should have empty name
assert user_info.name == ""
assert user_info.email == "anonymous@contoso.com"
def test_create_sasl_xoauth2_string(self):
"""Test static SASL XOAUTH2 string creation"""
username = "test@contoso.com"
access_token = "test-token-456"
result = MicrosoftEmailOAuth.create_sasl_xoauth2_string(username, access_token)
# Decode and verify format
decoded = base64.b64decode(result).decode()
expected = f"user={username}\x01auth=Bearer {access_token}\x01\x01"
assert decoded == expected
def test_error_handling_with_non_json_response(self, oauth_client, mock_http_client):
"""Test handling of non-JSON error responses"""
# Setup mock HTML error response
mock_http_client.post_responses.append(
{
"status_code": 500,
"json": {},
"text": "<html>Internal Server Error</html>",
"headers": {"content-type": "text/html"},
}
)
with pytest.raises(ValueError, match="Error in Microsoft OAuth"):
oauth_client.get_access_token("test-code")
class TestOAuthIntegration:
"""Integration tests for OAuth with SMTP"""
def test_oauth_token_flow_for_smtp(self):
"""Test complete OAuth token flow for SMTP usage"""
# Create mock HTTP client
mock_http = MockHTTPClient()
# Setup mock responses for complete flow
mock_http.add_post_response(
200,
{
"access_token": "smtp-access-token",
"token_type": "Bearer",
"expires_in": 3600,
"refresh_token": "smtp-refresh-token",
"scope": "https://outlook.office.com/SMTP.Send offline_access",
},
)
# Create OAuth client
oauth_client = MicrosoftEmailOAuth(
client_id="smtp-client-id",
client_secret="smtp-client-secret",
redirect_uri="https://app.example.com/oauth/callback",
tenant_id="contoso.onmicrosoft.com",
http_client=mock_http,
)
# Get authorization URL
auth_url = oauth_client.get_authorization_url()
assert "scope=https%3A%2F%2Foutlook.office.com%2FSMTP.Send+offline_access" in auth_url
# Exchange code for token
token_response = oauth_client.get_access_token("auth-code-from-user")
assert token_response["access_token"] == "smtp-access-token"
# Create SASL string for SMTP
access_token = str(token_response["access_token"])
sasl_string = MicrosoftEmailOAuth.create_sasl_xoauth2_string("user@contoso.com", access_token)
# Verify SASL string is valid base64
try:
decoded = base64.b64decode(sasl_string)
assert b"user=user@contoso.com" in decoded
assert b"auth=Bearer smtp-access-token" in decoded
except Exception:
pytest.fail("SASL string is not valid base64")
def test_service_account_flow(self):
"""Test service account (client credentials) flow"""
mock_http = MockHTTPClient()
# Setup mock response for client credentials
mock_http.add_post_response(
200, {"access_token": "service-smtp-token", "token_type": "Bearer", "expires_in": 3600}
)
oauth_client = MicrosoftEmailOAuth(
client_id="service-client-id",
client_secret="service-client-secret",
redirect_uri="", # Not needed for service accounts
tenant_id="contoso.onmicrosoft.com",
http_client=mock_http,
)
# Get token using client credentials
token_response = oauth_client.get_access_token_client_credentials()
assert token_response["access_token"] == "service-smtp-token"
# Verify the request used correct grant type
call = mock_http.post_calls[0]
assert call["data"]["grant_type"] == "client_credentials"
assert "redirect_uri" not in call["data"] # Should not include redirect_uri

View File

@ -0,0 +1,368 @@
"""Comprehensive tests for SMTP implementation with OAuth 2.0 support"""
import base64
import smtplib
from unittest.mock import MagicMock, Mock
import pytest
from libs.mail.smtp import SMTPAuthenticator, SMTPClient, SMTPMessageBuilder
from libs.mail.smtp_connection import SMTPConnectionFactory, SMTPConnectionProtocol
class MockSMTPConnection:
"""Mock SMTP connection for testing"""
def __init__(self):
self.ehlo_called = 0
self.starttls_called = False
self.login_called = False
self.docmd_called = False
self.sendmail_called = False
self.quit_called = False
self.last_docmd_args = None
self.last_login_args = None
self.last_sendmail_args = None
def ehlo(self, name: str = "") -> tuple:
self.ehlo_called += 1
return (250, b"OK")
def starttls(self) -> tuple:
self.starttls_called = True
return (220, b"TLS started")
def login(self, user: str, password: str) -> tuple:
self.login_called = True
self.last_login_args = (user, password)
return (235, b"Authentication successful")
def docmd(self, cmd: str, args: str = "") -> tuple:
self.docmd_called = True
self.last_docmd_args = (cmd, args)
return (235, b"Authentication successful")
def sendmail(self, from_addr: str, to_addrs: str, msg: str) -> dict:
self.sendmail_called = True
self.last_sendmail_args = (from_addr, to_addrs, msg)
return {}
def quit(self) -> tuple:
self.quit_called = True
return (221, b"Bye")
class MockSMTPConnectionFactory(SMTPConnectionFactory):
"""Mock factory for creating mock SMTP connections"""
def __init__(self, connection: MockSMTPConnection):
self.connection = connection
self.create_called = False
def create_connection(self, server: str, port: int, timeout: int = 10) -> SMTPConnectionProtocol:
self.create_called = True
self.last_create_args = (server, port, timeout)
return self.connection
class TestSMTPAuthenticator:
"""Test cases for SMTPAuthenticator"""
def test_create_sasl_xoauth2_string(self):
"""Test SASL XOAUTH2 string creation"""
authenticator = SMTPAuthenticator()
username = "test@example.com"
access_token = "test_token_123"
result = authenticator.create_sasl_xoauth2_string(username, access_token)
# Decode and verify
decoded = base64.b64decode(result).decode()
expected = f"user={username}\x01auth=Bearer {access_token}\x01\x01"
assert decoded == expected
def test_authenticate_basic_with_valid_credentials(self):
"""Test basic authentication with valid credentials"""
authenticator = SMTPAuthenticator()
connection = MockSMTPConnection()
authenticator.authenticate_basic(connection, "user@example.com", "password123")
assert connection.login_called
assert connection.last_login_args == ("user@example.com", "password123")
def test_authenticate_basic_with_empty_credentials(self):
"""Test basic authentication skips with empty credentials"""
authenticator = SMTPAuthenticator()
connection = MockSMTPConnection()
authenticator.authenticate_basic(connection, "", "")
assert not connection.login_called
def test_authenticate_oauth2_success(self):
"""Test successful OAuth2 authentication"""
authenticator = SMTPAuthenticator()
connection = MockSMTPConnection()
authenticator.authenticate_oauth2(connection, "user@example.com", "oauth_token_123")
assert connection.docmd_called
assert connection.last_docmd_args[0] == "AUTH"
assert connection.last_docmd_args[1].startswith("XOAUTH2 ")
# Verify the auth string
auth_string = connection.last_docmd_args[1].split(" ")[1]
decoded = base64.b64decode(auth_string).decode()
assert "user=user@example.com" in decoded
assert "auth=Bearer oauth_token_123" in decoded
def test_authenticate_oauth2_missing_credentials(self):
"""Test OAuth2 authentication fails with missing credentials"""
authenticator = SMTPAuthenticator()
connection = MockSMTPConnection()
with pytest.raises(ValueError, match="Username and OAuth access token are required"):
authenticator.authenticate_oauth2(connection, "", "token")
with pytest.raises(ValueError, match="Username and OAuth access token are required"):
authenticator.authenticate_oauth2(connection, "user", "")
def test_authenticate_oauth2_auth_failure(self):
"""Test OAuth2 authentication handles auth errors"""
authenticator = SMTPAuthenticator()
connection = Mock()
connection.docmd.side_effect = smtplib.SMTPAuthenticationError(535, b"Authentication failed")
with pytest.raises(ValueError, match="OAuth2 authentication failed"):
authenticator.authenticate_oauth2(connection, "user@example.com", "bad_token")
class TestSMTPMessageBuilder:
"""Test cases for SMTPMessageBuilder"""
def test_build_message(self):
"""Test message building"""
builder = SMTPMessageBuilder()
mail_data = {"to": "recipient@example.com", "subject": "Test Subject", "html": "<p>Test HTML content</p>"}
from_addr = "sender@example.com"
msg = builder.build_message(mail_data, from_addr)
assert msg["To"] == "recipient@example.com"
assert msg["From"] == "sender@example.com"
assert msg["Subject"] == "Test Subject"
assert "<p>Test HTML content</p>" in msg.as_string()
class TestSMTPClient:
"""Test cases for SMTPClient"""
@pytest.fixture
def mock_connection(self):
"""Create a mock SMTP connection"""
return MockSMTPConnection()
@pytest.fixture
def mock_factories(self, mock_connection):
"""Create mock connection factories"""
return {
"connection_factory": MockSMTPConnectionFactory(mock_connection),
"ssl_connection_factory": MockSMTPConnectionFactory(mock_connection),
}
def test_basic_auth_send_success(self, mock_connection, mock_factories):
"""Test successful email send with basic auth"""
client = SMTPClient(
server="smtp.example.com",
port=587,
username="user@example.com",
password="password123",
from_addr="sender@example.com",
use_tls=True,
opportunistic_tls=True,
auth_type="basic",
**mock_factories,
)
mail_data = {"to": "recipient@example.com", "subject": "Test Subject", "html": "<p>Test content</p>"}
client.send(mail_data)
# Verify connection sequence
assert mock_connection.ehlo_called == 2 # Before and after STARTTLS
assert mock_connection.starttls_called
assert mock_connection.login_called
assert mock_connection.last_login_args == ("user@example.com", "password123")
assert mock_connection.sendmail_called
assert mock_connection.quit_called
def test_oauth2_send_success(self, mock_connection, mock_factories):
"""Test successful email send with OAuth2"""
client = SMTPClient(
server="smtp.office365.com",
port=587,
username="user@contoso.com",
password="",
from_addr="sender@contoso.com",
use_tls=True,
opportunistic_tls=True,
oauth_access_token="oauth_token_123",
auth_type="oauth2",
**mock_factories,
)
mail_data = {"to": "recipient@example.com", "subject": "OAuth Test", "html": "<p>OAuth test content</p>"}
client.send(mail_data)
# Verify OAuth authentication was used
assert mock_connection.docmd_called
assert not mock_connection.login_called
assert mock_connection.sendmail_called
assert mock_connection.quit_called
def test_ssl_connection_used_when_configured(self, mock_connection):
"""Test SSL connection is used when configured"""
ssl_factory = MockSMTPConnectionFactory(mock_connection)
regular_factory = MockSMTPConnectionFactory(mock_connection)
client = SMTPClient(
server="smtp.example.com",
port=465,
username="user@example.com",
password="password123",
from_addr="sender@example.com",
use_tls=True,
opportunistic_tls=False, # Use SSL, not STARTTLS
connection_factory=regular_factory,
ssl_connection_factory=ssl_factory,
)
mail_data = {"to": "recipient@example.com", "subject": "SSL Test", "html": "<p>SSL test content</p>"}
client.send(mail_data)
# Verify SSL factory was used
assert ssl_factory.create_called
assert not regular_factory.create_called
# No STARTTLS with SSL connection
assert not mock_connection.starttls_called
def test_connection_cleanup_on_error(self, mock_connection, mock_factories):
"""Test connection is cleaned up even on error"""
# Make sendmail fail
mock_connection.sendmail = Mock(side_effect=smtplib.SMTPException("Send failed"))
client = SMTPClient(
server="smtp.example.com",
port=587,
username="user@example.com",
password="password123",
from_addr="sender@example.com",
**mock_factories,
)
mail_data = {"to": "recipient@example.com", "subject": "Test", "html": "<p>Test</p>"}
with pytest.raises(smtplib.SMTPException):
client.send(mail_data)
# Verify quit was still called
assert mock_connection.quit_called
def test_custom_authenticator_injection(self, mock_connection, mock_factories):
"""Test custom authenticator can be injected"""
custom_authenticator = Mock(spec=SMTPAuthenticator)
client = SMTPClient(
server="smtp.example.com",
port=587,
username="user@example.com",
password="password123",
from_addr="sender@example.com",
authenticator=custom_authenticator,
**mock_factories,
)
mail_data = {"to": "recipient@example.com", "subject": "Test", "html": "<p>Test</p>"}
client.send(mail_data)
# Verify custom authenticator was used
custom_authenticator.authenticate_basic.assert_called_once()
def test_custom_message_builder_injection(self, mock_connection, mock_factories):
"""Test custom message builder can be injected"""
custom_builder = Mock(spec=SMTPMessageBuilder)
custom_msg = MagicMock()
custom_msg.as_string.return_value = "custom message"
custom_builder.build_message.return_value = custom_msg
client = SMTPClient(
server="smtp.example.com",
port=587,
username="user@example.com",
password="password123",
from_addr="sender@example.com",
message_builder=custom_builder,
**mock_factories,
)
mail_data = {"to": "recipient@example.com", "subject": "Test", "html": "<p>Test</p>"}
client.send(mail_data)
# Verify custom builder was used
custom_builder.build_message.assert_called_once_with(mail_data, "sender@example.com")
assert mock_connection.last_sendmail_args[2] == "custom message"
class TestIntegration:
"""Integration tests showing how components work together"""
def test_complete_oauth_flow_without_io(self):
"""Test complete OAuth flow without any real I/O"""
# Create all mocks
mock_connection = MockSMTPConnection()
connection_factory = MockSMTPConnectionFactory(mock_connection)
# Create client with OAuth
client = SMTPClient(
server="smtp.office365.com",
port=587,
username="test@contoso.com",
password="",
from_addr="test@contoso.com",
use_tls=True,
opportunistic_tls=True,
oauth_access_token="mock_oauth_token",
auth_type="oauth2",
connection_factory=connection_factory,
ssl_connection_factory=connection_factory,
)
# Send email
mail_data = {
"to": "recipient@example.com",
"subject": "OAuth Integration Test",
"html": "<h1>Hello OAuth!</h1>",
}
client.send(mail_data)
# Verify complete flow
assert connection_factory.create_called
assert mock_connection.ehlo_called == 2
assert mock_connection.starttls_called
assert mock_connection.docmd_called
assert "XOAUTH2" in mock_connection.last_docmd_args[1]
assert mock_connection.sendmail_called
assert mock_connection.quit_called
# Verify email data
from_addr, to_addr, msg_str = mock_connection.last_sendmail_args
assert from_addr == "test@contoso.com"
assert to_addr == "recipient@example.com"
assert "OAuth Integration Test" in msg_str
assert "Hello OAuth!" in msg_str