[autofix.ci] apply automated fixes

This commit is contained in:
autofix-ci[bot]
2026-02-04 05:27:16 +00:00
committed by GitHub
parent 6d87424ab8
commit ef1e233c2d
4 changed files with 13 additions and 14 deletions

View File

@ -402,11 +402,11 @@ def _async_update_token_last_used_at(auth_token: str, scope: str | None):
"""
try:
from tasks.update_api_token_last_used_task import update_api_token_last_used_task
# Record the update time for concurrency control
update_time = naive_utc_now()
update_time_iso = update_time.isoformat()
# Fire and forget - don't wait for result
update_api_token_last_used_task.delay(auth_token, scope, update_time_iso)
logger.debug("Scheduled async update for last_used_at (scope: %s, update_time: %s)", scope, update_time_iso)

View File

@ -83,7 +83,7 @@ class ApiTokenCache:
if isinstance(api_token, CachedApiToken):
# Pydantic model -> dict -> orjson
return orjson.dumps(api_token.model_dump(mode="json"))
# Otherwise, convert from SQLAlchemy model
data = {
"id": str(api_token.id),

View File

@ -18,7 +18,7 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, update_
Asynchronously update the last_used_at timestamp for an API token.
Uses the unified update_token_last_used_at() method to avoid code duplication.
Queue: api_token_update (dedicated queue to isolate from other tasks and
prevent accumulation in production environment)
@ -26,20 +26,20 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, update_
token: The API token string
scope: The token type/scope (e.g., 'app', 'dataset')
update_time_iso: ISO format timestamp for the update operation
Returns:
Dict with status and metadata
Raises:
Exception: Re-raises exceptions to allow Celery retry mechanism and monitoring
"""
try:
# Parse update_time from ISO format
update_time = datetime.fromisoformat(update_time_iso)
# Use unified update method
result = update_token_last_used_at(token, scope, update_time, session=None)
if result["status"] == "updated":
logger.info("Updated last_used_at for token (async): %s... (scope: %s)", token[:10], scope)
elif result["status"] == "failed":
@ -47,14 +47,13 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, update_
error_msg = result.get("error", "Unknown error")
logger.error("Failed to update last_used_at for token (async): %s", error_msg)
raise Exception(f"Token update failed: {error_msg}")
return result
except Exception as exc:
# Log the error with full context (logger.exception includes traceback)
logger.exception("Error in update_api_token_last_used_task (token: %s..., scope: %s)",
token[:10], scope)
logger.exception("Error in update_api_token_last_used_task (token: %s..., scope: %s)", token[:10], scope)
# Raise exception to let Celery handle retry and monitoring
# This allows Flower and other monitoring tools to track failures
raise

View File

@ -100,7 +100,7 @@ class TestApiTokenCacheRedisIntegration:
assert 595 <= ttl <= 600 # Should be around 600 seconds (10 minutes)
def test_cache_null_value_for_invalid_token(self):
"""Test caching null value for invalid tokens """
"""Test caching null value for invalid tokens"""
# Cache null value
result = ApiTokenCache.set(self.test_token, self.test_scope, None)
assert result is True