[autofix.ci] apply automated fixes

This commit is contained in:
autofix-ci[bot]
2026-02-04 03:22:17 +00:00
committed by GitHub
parent d02ed82854
commit dcba86b707
4 changed files with 53 additions and 66 deletions

View File

@ -332,7 +332,7 @@ def validate_and_get_api_token(scope: str | None = None):
with Session(db.engine, expire_on_commit=False) as session:
# Use unified update method to avoid code duplication with Celery task
update_token_last_used_at(auth_token, scope, current_time, session=session)
# Query the token
stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
api_token = session.scalar(stmt)

View File

@ -19,29 +19,26 @@ logger = logging.getLogger(__name__)
def update_token_last_used_at(
token: str,
scope: str | None,
start_time: datetime,
session: Session | None = None
token: str, scope: str | None, start_time: datetime, session: Session | None = None
) -> dict:
"""
Unified method to update API token last_used_at timestamp.
This method is used by both:
1. Direct database update (cache miss scenario)
2. Async Celery task (cache hit scenario)
Args:
token: The API token string
scope: The token type/scope (e.g., 'app', 'dataset')
start_time: The request start time (for concurrency control)
session: Optional existing session to use (if None, creates new one)
Returns:
Dict with status, rowcount, and other metadata
"""
current_time = naive_utc_now()
def _do_update(s: Session) -> dict:
"""Execute the update within the session."""
update_stmt = (
@ -55,7 +52,7 @@ def update_token_last_used_at(
.values(last_used_at=current_time)
)
result = s.execute(update_stmt)
rowcount = getattr(result, "rowcount", 0)
if rowcount > 0:
s.commit()
@ -64,7 +61,7 @@ def update_token_last_used_at(
else:
logger.debug("No update needed for token: %s... (already up-to-date)", token[:10])
return {"status": "no_update_needed", "reason": "last_used_at >= start_time"}
try:
if session:
# Use provided session (sync path)
@ -73,7 +70,7 @@ def update_token_last_used_at(
# Create new session (async path)
with Session(db.engine, expire_on_commit=False) as new_session:
return _do_update(new_session)
except Exception as e:
logger.warning("Failed to update last_used_at for token: %s", e)
return {"status": "failed", "error": str(e)}

View File

@ -18,7 +18,7 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, start_t
Asynchronously update the last_used_at timestamp for an API token.
Uses the unified update_token_last_used_at() method to avoid code duplication.
Queue: api_token_update (dedicated queue to isolate from other tasks and
prevent accumulation in production environment)
@ -26,22 +26,22 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, start_t
token: The API token string
scope: The token type/scope (e.g., 'app', 'dataset')
start_time_iso: ISO format timestamp of when the request started
Returns:
Dict with status and metadata
"""
try:
# Parse start_time from ISO format
start_time = datetime.fromisoformat(start_time_iso)
# Use unified update method
result = update_token_last_used_at(token, scope, start_time, session=None)
if result["status"] == "updated":
logger.info("Updated last_used_at for token (async): %s... (scope: %s)", token[:10], scope)
return result
except Exception as e:
logger.warning("Failed to update last_used_at for token (async): %s", e)
return {"status": "failed", "error": str(e)}

View File

@ -27,7 +27,7 @@ class TestApiTokenCacheRedisIntegration:
self.test_token = "test-integration-token-123"
self.test_scope = "app"
self.cache_key = f"api_token:{self.test_scope}:{self.test_token}"
# Clean up any existing test data
self._cleanup()
@ -51,7 +51,7 @@ class TestApiTokenCacheRedisIntegration:
"""Test cache set and get operations with real Redis."""
# Create a mock token
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id-123"
mock_token.app_id = "test-app-456"
@ -82,7 +82,7 @@ class TestApiTokenCacheRedisIntegration:
def test_cache_ttl_with_real_redis(self):
"""Test cache TTL is set correctly."""
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
@ -120,7 +120,7 @@ class TestApiTokenCacheRedisIntegration:
def test_cache_delete_with_real_redis(self):
"""Test cache deletion with real Redis."""
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
@ -144,7 +144,7 @@ class TestApiTokenCacheRedisIntegration:
def test_tenant_index_creation(self):
"""Test tenant index is created when caching token."""
from unittest.mock import MagicMock
tenant_id = "test-tenant-id"
mock_token = MagicMock()
mock_token.id = "test-id"
@ -164,15 +164,15 @@ class TestApiTokenCacheRedisIntegration:
# Verify cache key is in the index
members = redis_client.smembers(index_key)
cache_keys = [m.decode('utf-8') if isinstance(m, bytes) else m for m in members]
cache_keys = [m.decode("utf-8") if isinstance(m, bytes) else m for m in members]
assert self.cache_key in cache_keys
def test_invalidate_by_tenant_via_index(self):
"""Test tenant-wide cache invalidation using index (fast path)."""
from unittest.mock import MagicMock
tenant_id = "test-tenant-id"
# Create multiple tokens for the same tenant
for i in range(3):
token_value = f"test-token-{i}"
@ -184,7 +184,7 @@ class TestApiTokenCacheRedisIntegration:
mock_token.token = token_value
mock_token.last_used_at = None
mock_token.created_at = datetime.now()
ApiTokenCache.set(token_value, "app", mock_token)
# Verify all cached
@ -208,7 +208,7 @@ class TestApiTokenCacheRedisIntegration:
"""Test concurrent cache access doesn't cause issues."""
import concurrent.futures
from unittest.mock import MagicMock
mock_token = MagicMock()
mock_token.id = "test-id"
mock_token.app_id = "test-app"
@ -251,19 +251,14 @@ class TestApiTokenUpdaterIntegration:
test_token.tenant_id = "test-tenant"
test_token.last_used_at = datetime.now() - timedelta(minutes=10)
test_token.created_at = datetime.now() - timedelta(days=30)
db_session.add(test_token)
db_session.commit()
try:
# Update using unified method
start_time = datetime.now()
result = update_token_last_used_at(
test_token.token,
test_token.type,
start_time,
session=db_session
)
result = update_token_last_used_at(test_token.token, test_token.type, start_time, session=db_session)
# Verify result
assert result["status"] == "updated"
@ -283,7 +278,7 @@ class TestApiTokenUpdaterIntegration:
class TestCeleryTaskIntegration:
"""
Integration tests for Celery task.
Requires Celery worker running with api_token_update queue.
Run with: pytest -m celery_integration
"""
@ -292,7 +287,7 @@ class TestCeleryTaskIntegration:
def test_celery_task_execution(self, db_session):
"""Test Celery task can be executed successfully."""
from tasks.update_api_token_last_used_task import update_api_token_last_used_task
# Create a test token in database
test_token = ApiToken()
test_token.id = "test-celery-id"
@ -302,18 +297,14 @@ class TestCeleryTaskIntegration:
test_token.tenant_id = "test-tenant"
test_token.last_used_at = datetime.now() - timedelta(minutes=10)
test_token.created_at = datetime.now() - timedelta(days=30)
db_session.add(test_token)
db_session.commit()
try:
# Send task
start_time_iso = datetime.now().isoformat()
result = update_api_token_last_used_task.delay(
test_token.token,
test_token.type,
start_time_iso
)
result = update_api_token_last_used_task.delay(test_token.token, test_token.type, start_time_iso)
# Wait for task to complete (with timeout)
task_result = result.get(timeout=10)
@ -335,7 +326,7 @@ class TestCeleryTaskIntegration:
def test_concurrent_celery_tasks_with_redis_lock(self, db_session):
"""Test multiple Celery tasks with Redis lock (防抖)."""
from tasks.update_api_token_last_used_task import update_api_token_last_used_task
# Create a test token
test_token = ApiToken()
test_token.id = "test-concurrent-id"
@ -345,7 +336,7 @@ class TestCeleryTaskIntegration:
test_token.tenant_id = "test-tenant"
test_token.last_used_at = datetime.now() - timedelta(minutes=10)
test_token.created_at = datetime.now() - timedelta(days=30)
db_session.add(test_token)
db_session.commit()
@ -354,11 +345,7 @@ class TestCeleryTaskIntegration:
start_time_iso = datetime.now().isoformat()
tasks = []
for _ in range(10):
result = update_api_token_last_used_task.delay(
test_token.token,
test_token.type,
start_time_iso
)
result = update_api_token_last_used_task.delay(test_token.token, test_token.type, start_time_iso)
tasks.append(result)
# Wait for all tasks
@ -389,10 +376,10 @@ class TestEndToEndCacheFlow:
2. Second request (cache hit) -> return from cache
3. Verify Redis state
"""
test_token_value = "test-e2e-token"
test_scope = "app"
# Create test token in DB
test_token = ApiToken()
test_token.id = "test-e2e-id"
@ -402,7 +389,7 @@ class TestEndToEndCacheFlow:
test_token.tenant_id = "test-tenant"
test_token.last_used_at = None
test_token.created_at = datetime.now()
db_session.add(test_token)
db_session.commit()
@ -442,10 +429,10 @@ class TestEndToEndCacheFlow:
"""Simulate high concurrency access to cache."""
import concurrent.futures
from unittest.mock import MagicMock
test_token_value = "test-concurrent-token"
test_scope = "app"
# Setup cache
mock_token = MagicMock()
mock_token.id = "concurrent-id"
@ -455,7 +442,7 @@ class TestEndToEndCacheFlow:
mock_token.token = test_token_value
mock_token.last_used_at = datetime.now()
mock_token.created_at = datetime.now()
ApiTokenCache.set(test_token_value, test_scope, mock_token)
try:
@ -472,10 +459,10 @@ class TestEndToEndCacheFlow:
# All should succeed
assert len(results) == 100
assert all(r is not None for r in results)
# Should be fast (< 1 second for 100 reads)
assert elapsed < 1.0, f"Too slow: {elapsed}s for 100 cache reads"
print(f"\n✓ 100 concurrent cache reads in {elapsed:.3f}s")
print(f"✓ Average: {(elapsed / 100) * 1000:.2f}ms per read")
@ -492,7 +479,7 @@ class TestRedisFailover:
def test_graceful_degradation_when_redis_fails(self, mock_redis):
"""Test system degrades gracefully when Redis is unavailable."""
from redis import RedisError
# Simulate Redis failure
mock_redis.get.side_effect = RedisError("Connection failed")
mock_redis.setex.side_effect = RedisError("Connection failed")
@ -509,10 +496,13 @@ class TestRedisFailover:
if __name__ == "__main__":
# Run integration tests
pytest.main([
__file__,
"-v",
"-s",
"--tb=short",
"-m", "not celery_integration" # Skip Celery tests by default
])
pytest.main(
[
__file__,
"-v",
"-s",
"--tb=short",
"-m",
"not celery_integration", # Skip Celery tests by default
]
)