From dcba86b7078b56ebf1a2bbd2a7672e2c3c9c0295 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 4 Feb 2026 03:22:17 +0000 Subject: [PATCH] [autofix.ci] apply automated fixes --- api/controllers/service_api/wraps.py | 2 +- api/libs/api_token_updater.py | 19 ++-- api/tasks/update_api_token_last_used_task.py | 12 +-- .../libs/test_api_token_cache_integration.py | 86 ++++++++----------- 4 files changed, 53 insertions(+), 66 deletions(-) diff --git a/api/controllers/service_api/wraps.py b/api/controllers/service_api/wraps.py index df0ee8d69f..6a71ed9b3f 100644 --- a/api/controllers/service_api/wraps.py +++ b/api/controllers/service_api/wraps.py @@ -332,7 +332,7 @@ def validate_and_get_api_token(scope: str | None = None): with Session(db.engine, expire_on_commit=False) as session: # Use unified update method to avoid code duplication with Celery task update_token_last_used_at(auth_token, scope, current_time, session=session) - + # Query the token stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope) api_token = session.scalar(stmt) diff --git a/api/libs/api_token_updater.py b/api/libs/api_token_updater.py index 78ce681d3a..371811bb58 100644 --- a/api/libs/api_token_updater.py +++ b/api/libs/api_token_updater.py @@ -19,29 +19,26 @@ logger = logging.getLogger(__name__) def update_token_last_used_at( - token: str, - scope: str | None, - start_time: datetime, - session: Session | None = None + token: str, scope: str | None, start_time: datetime, session: Session | None = None ) -> dict: """ Unified method to update API token last_used_at timestamp. - + This method is used by both: 1. Direct database update (cache miss scenario) 2. Async Celery task (cache hit scenario) - + Args: token: The API token string scope: The token type/scope (e.g., 'app', 'dataset') start_time: The request start time (for concurrency control) session: Optional existing session to use (if None, creates new one) - + Returns: Dict with status, rowcount, and other metadata """ current_time = naive_utc_now() - + def _do_update(s: Session) -> dict: """Execute the update within the session.""" update_stmt = ( @@ -55,7 +52,7 @@ def update_token_last_used_at( .values(last_used_at=current_time) ) result = s.execute(update_stmt) - + rowcount = getattr(result, "rowcount", 0) if rowcount > 0: s.commit() @@ -64,7 +61,7 @@ def update_token_last_used_at( else: logger.debug("No update needed for token: %s... (already up-to-date)", token[:10]) return {"status": "no_update_needed", "reason": "last_used_at >= start_time"} - + try: if session: # Use provided session (sync path) @@ -73,7 +70,7 @@ def update_token_last_used_at( # Create new session (async path) with Session(db.engine, expire_on_commit=False) as new_session: return _do_update(new_session) - + except Exception as e: logger.warning("Failed to update last_used_at for token: %s", e) return {"status": "failed", "error": str(e)} diff --git a/api/tasks/update_api_token_last_used_task.py b/api/tasks/update_api_token_last_used_task.py index f3c7e4aad4..2f65014c8c 100644 --- a/api/tasks/update_api_token_last_used_task.py +++ b/api/tasks/update_api_token_last_used_task.py @@ -18,7 +18,7 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, start_t Asynchronously update the last_used_at timestamp for an API token. Uses the unified update_token_last_used_at() method to avoid code duplication. - + Queue: api_token_update (dedicated queue to isolate from other tasks and prevent accumulation in production environment) @@ -26,22 +26,22 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, start_t token: The API token string scope: The token type/scope (e.g., 'app', 'dataset') start_time_iso: ISO format timestamp of when the request started - + Returns: Dict with status and metadata """ try: # Parse start_time from ISO format start_time = datetime.fromisoformat(start_time_iso) - + # Use unified update method result = update_token_last_used_at(token, scope, start_time, session=None) - + if result["status"] == "updated": logger.info("Updated last_used_at for token (async): %s... (scope: %s)", token[:10], scope) - + return result - + except Exception as e: logger.warning("Failed to update last_used_at for token (async): %s", e) return {"status": "failed", "error": str(e)} diff --git a/api/tests/integration_tests/libs/test_api_token_cache_integration.py b/api/tests/integration_tests/libs/test_api_token_cache_integration.py index 8026ca6902..6e25605bdf 100644 --- a/api/tests/integration_tests/libs/test_api_token_cache_integration.py +++ b/api/tests/integration_tests/libs/test_api_token_cache_integration.py @@ -27,7 +27,7 @@ class TestApiTokenCacheRedisIntegration: self.test_token = "test-integration-token-123" self.test_scope = "app" self.cache_key = f"api_token:{self.test_scope}:{self.test_token}" - + # Clean up any existing test data self._cleanup() @@ -51,7 +51,7 @@ class TestApiTokenCacheRedisIntegration: """Test cache set and get operations with real Redis.""" # Create a mock token from unittest.mock import MagicMock - + mock_token = MagicMock() mock_token.id = "test-id-123" mock_token.app_id = "test-app-456" @@ -82,7 +82,7 @@ class TestApiTokenCacheRedisIntegration: def test_cache_ttl_with_real_redis(self): """Test cache TTL is set correctly.""" from unittest.mock import MagicMock - + mock_token = MagicMock() mock_token.id = "test-id" mock_token.app_id = "test-app" @@ -120,7 +120,7 @@ class TestApiTokenCacheRedisIntegration: def test_cache_delete_with_real_redis(self): """Test cache deletion with real Redis.""" from unittest.mock import MagicMock - + mock_token = MagicMock() mock_token.id = "test-id" mock_token.app_id = "test-app" @@ -144,7 +144,7 @@ class TestApiTokenCacheRedisIntegration: def test_tenant_index_creation(self): """Test tenant index is created when caching token.""" from unittest.mock import MagicMock - + tenant_id = "test-tenant-id" mock_token = MagicMock() mock_token.id = "test-id" @@ -164,15 +164,15 @@ class TestApiTokenCacheRedisIntegration: # Verify cache key is in the index members = redis_client.smembers(index_key) - cache_keys = [m.decode('utf-8') if isinstance(m, bytes) else m for m in members] + cache_keys = [m.decode("utf-8") if isinstance(m, bytes) else m for m in members] assert self.cache_key in cache_keys def test_invalidate_by_tenant_via_index(self): """Test tenant-wide cache invalidation using index (fast path).""" from unittest.mock import MagicMock - + tenant_id = "test-tenant-id" - + # Create multiple tokens for the same tenant for i in range(3): token_value = f"test-token-{i}" @@ -184,7 +184,7 @@ class TestApiTokenCacheRedisIntegration: mock_token.token = token_value mock_token.last_used_at = None mock_token.created_at = datetime.now() - + ApiTokenCache.set(token_value, "app", mock_token) # Verify all cached @@ -208,7 +208,7 @@ class TestApiTokenCacheRedisIntegration: """Test concurrent cache access doesn't cause issues.""" import concurrent.futures from unittest.mock import MagicMock - + mock_token = MagicMock() mock_token.id = "test-id" mock_token.app_id = "test-app" @@ -251,19 +251,14 @@ class TestApiTokenUpdaterIntegration: test_token.tenant_id = "test-tenant" test_token.last_used_at = datetime.now() - timedelta(minutes=10) test_token.created_at = datetime.now() - timedelta(days=30) - + db_session.add(test_token) db_session.commit() try: # Update using unified method start_time = datetime.now() - result = update_token_last_used_at( - test_token.token, - test_token.type, - start_time, - session=db_session - ) + result = update_token_last_used_at(test_token.token, test_token.type, start_time, session=db_session) # Verify result assert result["status"] == "updated" @@ -283,7 +278,7 @@ class TestApiTokenUpdaterIntegration: class TestCeleryTaskIntegration: """ Integration tests for Celery task. - + Requires Celery worker running with api_token_update queue. Run with: pytest -m celery_integration """ @@ -292,7 +287,7 @@ class TestCeleryTaskIntegration: def test_celery_task_execution(self, db_session): """Test Celery task can be executed successfully.""" from tasks.update_api_token_last_used_task import update_api_token_last_used_task - + # Create a test token in database test_token = ApiToken() test_token.id = "test-celery-id" @@ -302,18 +297,14 @@ class TestCeleryTaskIntegration: test_token.tenant_id = "test-tenant" test_token.last_used_at = datetime.now() - timedelta(minutes=10) test_token.created_at = datetime.now() - timedelta(days=30) - + db_session.add(test_token) db_session.commit() try: # Send task start_time_iso = datetime.now().isoformat() - result = update_api_token_last_used_task.delay( - test_token.token, - test_token.type, - start_time_iso - ) + result = update_api_token_last_used_task.delay(test_token.token, test_token.type, start_time_iso) # Wait for task to complete (with timeout) task_result = result.get(timeout=10) @@ -335,7 +326,7 @@ class TestCeleryTaskIntegration: def test_concurrent_celery_tasks_with_redis_lock(self, db_session): """Test multiple Celery tasks with Redis lock (é˜²ęŠ–).""" from tasks.update_api_token_last_used_task import update_api_token_last_used_task - + # Create a test token test_token = ApiToken() test_token.id = "test-concurrent-id" @@ -345,7 +336,7 @@ class TestCeleryTaskIntegration: test_token.tenant_id = "test-tenant" test_token.last_used_at = datetime.now() - timedelta(minutes=10) test_token.created_at = datetime.now() - timedelta(days=30) - + db_session.add(test_token) db_session.commit() @@ -354,11 +345,7 @@ class TestCeleryTaskIntegration: start_time_iso = datetime.now().isoformat() tasks = [] for _ in range(10): - result = update_api_token_last_used_task.delay( - test_token.token, - test_token.type, - start_time_iso - ) + result = update_api_token_last_used_task.delay(test_token.token, test_token.type, start_time_iso) tasks.append(result) # Wait for all tasks @@ -389,10 +376,10 @@ class TestEndToEndCacheFlow: 2. Second request (cache hit) -> return from cache 3. Verify Redis state """ - + test_token_value = "test-e2e-token" test_scope = "app" - + # Create test token in DB test_token = ApiToken() test_token.id = "test-e2e-id" @@ -402,7 +389,7 @@ class TestEndToEndCacheFlow: test_token.tenant_id = "test-tenant" test_token.last_used_at = None test_token.created_at = datetime.now() - + db_session.add(test_token) db_session.commit() @@ -442,10 +429,10 @@ class TestEndToEndCacheFlow: """Simulate high concurrency access to cache.""" import concurrent.futures from unittest.mock import MagicMock - + test_token_value = "test-concurrent-token" test_scope = "app" - + # Setup cache mock_token = MagicMock() mock_token.id = "concurrent-id" @@ -455,7 +442,7 @@ class TestEndToEndCacheFlow: mock_token.token = test_token_value mock_token.last_used_at = datetime.now() mock_token.created_at = datetime.now() - + ApiTokenCache.set(test_token_value, test_scope, mock_token) try: @@ -472,10 +459,10 @@ class TestEndToEndCacheFlow: # All should succeed assert len(results) == 100 assert all(r is not None for r in results) - + # Should be fast (< 1 second for 100 reads) assert elapsed < 1.0, f"Too slow: {elapsed}s for 100 cache reads" - + print(f"\nāœ“ 100 concurrent cache reads in {elapsed:.3f}s") print(f"āœ“ Average: {(elapsed / 100) * 1000:.2f}ms per read") @@ -492,7 +479,7 @@ class TestRedisFailover: def test_graceful_degradation_when_redis_fails(self, mock_redis): """Test system degrades gracefully when Redis is unavailable.""" from redis import RedisError - + # Simulate Redis failure mock_redis.get.side_effect = RedisError("Connection failed") mock_redis.setex.side_effect = RedisError("Connection failed") @@ -509,10 +496,13 @@ class TestRedisFailover: if __name__ == "__main__": # Run integration tests - pytest.main([ - __file__, - "-v", - "-s", - "--tb=short", - "-m", "not celery_integration" # Skip Celery tests by default - ]) + pytest.main( + [ + __file__, + "-v", + "-s", + "--tb=short", + "-m", + "not celery_integration", # Skip Celery tests by default + ] + )