mirror of
https://github.com/langgenius/dify.git
synced 2026-03-23 15:27:53 +08:00
fix start_time -> update_time
This commit is contained in:
@ -4,11 +4,11 @@ API Token Cache Module
|
||||
Provides Redis-based caching for API token validation to reduce database load.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import orjson
|
||||
from pydantic import BaseModel
|
||||
|
||||
from extensions.ext_redis import redis_client, redis_fallback
|
||||
@ -44,7 +44,7 @@ class CachedApiToken(BaseModel):
|
||||
# Cache configuration
|
||||
CACHE_KEY_PREFIX = "api_token"
|
||||
CACHE_TTL_SECONDS = 600 # 10 minutes
|
||||
CACHE_NULL_TTL_SECONDS = 60 # 1 minute for non-existent tokens (防穿透)
|
||||
CACHE_NULL_TTL_SECONDS = 60 # 1 minute for non-existent tokens
|
||||
|
||||
|
||||
class ApiTokenCache:
|
||||
@ -69,20 +69,21 @@ class ApiTokenCache:
|
||||
return f"{CACHE_KEY_PREFIX}:{scope_str}:{token}"
|
||||
|
||||
@staticmethod
|
||||
def _serialize_token(api_token: Any) -> str:
|
||||
def _serialize_token(api_token: Any) -> bytes:
|
||||
"""
|
||||
Serialize ApiToken object to JSON string.
|
||||
Serialize ApiToken object to JSON bytes using orjson for better performance.
|
||||
|
||||
Args:
|
||||
api_token: ApiToken model instance or CachedApiToken
|
||||
|
||||
Returns:
|
||||
JSON string representation
|
||||
JSON bytes representation
|
||||
"""
|
||||
# If it's already a Pydantic model, use model_dump_json
|
||||
# If it's already a Pydantic model, use model_dump
|
||||
if isinstance(api_token, CachedApiToken):
|
||||
return api_token.model_dump_json()
|
||||
|
||||
# Pydantic model -> dict -> orjson
|
||||
return orjson.dumps(api_token.model_dump(mode="json"))
|
||||
|
||||
# Otherwise, convert from SQLAlchemy model
|
||||
data = {
|
||||
"id": str(api_token.id),
|
||||
@ -93,28 +94,30 @@ class ApiTokenCache:
|
||||
"last_used_at": api_token.last_used_at.isoformat() if api_token.last_used_at else None,
|
||||
"created_at": api_token.created_at.isoformat() if api_token.created_at else None,
|
||||
}
|
||||
return json.dumps(data)
|
||||
return orjson.dumps(data)
|
||||
|
||||
@staticmethod
|
||||
def _deserialize_token(cached_data: str) -> Any:
|
||||
def _deserialize_token(cached_data: bytes | str) -> Any:
|
||||
"""
|
||||
Deserialize JSON string back to a CachedApiToken Pydantic model.
|
||||
Deserialize JSON bytes/string back to a CachedApiToken Pydantic model using orjson.
|
||||
|
||||
Args:
|
||||
cached_data: JSON string from cache
|
||||
cached_data: JSON bytes or string from cache
|
||||
|
||||
Returns:
|
||||
CachedApiToken instance or None
|
||||
"""
|
||||
if cached_data == "null":
|
||||
if cached_data in {b"null", "null"}:
|
||||
# Cached null value (token doesn't exist)
|
||||
return None
|
||||
|
||||
try:
|
||||
# Use Pydantic's model_validate_json for automatic validation
|
||||
token_obj = CachedApiToken.model_validate_json(cached_data)
|
||||
# orjson.loads accepts bytes or str
|
||||
data = orjson.loads(cached_data)
|
||||
# Use Pydantic's model_validate for automatic validation
|
||||
token_obj = CachedApiToken.model_validate(data)
|
||||
return token_obj
|
||||
except (json.JSONDecodeError, ValueError) as e:
|
||||
except (ValueError, orjson.JSONDecodeError) as e:
|
||||
logger.warning("Failed to deserialize token from cache: %s", e)
|
||||
return None
|
||||
|
||||
@ -138,10 +141,7 @@ class ApiTokenCache:
|
||||
logger.debug("Cache miss for token key: %s", cache_key)
|
||||
return None
|
||||
|
||||
# Decode bytes to string
|
||||
if isinstance(cached_data, bytes):
|
||||
cached_data = cached_data.decode("utf-8")
|
||||
|
||||
# orjson.loads handles both bytes and str automatically
|
||||
logger.debug("Cache hit for token key: %s", cache_key)
|
||||
return ApiTokenCache._deserialize_token(cached_data)
|
||||
|
||||
@ -207,7 +207,7 @@ class ApiTokenCache:
|
||||
|
||||
if api_token is None:
|
||||
# Cache null value to prevent cache penetration
|
||||
cached_value = "null"
|
||||
cached_value = b"null"
|
||||
ttl = CACHE_NULL_TTL_SECONDS
|
||||
else:
|
||||
cached_value = ApiTokenCache._serialize_token(api_token)
|
||||
@ -259,9 +259,7 @@ class ApiTokenCache:
|
||||
try:
|
||||
cached_data = redis_client.get(cache_key)
|
||||
if cached_data and cached_data != b"null":
|
||||
if isinstance(cached_data, bytes):
|
||||
cached_data = cached_data.decode("utf-8")
|
||||
data = json.loads(cached_data)
|
||||
data = orjson.loads(cached_data)
|
||||
tenant_id = data.get("tenant_id")
|
||||
except Exception as e:
|
||||
# If we can't get tenant_id, just delete the key without index cleanup
|
||||
|
||||
@ -29,6 +29,9 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, update_
|
||||
|
||||
Returns:
|
||||
Dict with status and metadata
|
||||
|
||||
Raises:
|
||||
Exception: Re-raises exceptions to allow Celery retry mechanism and monitoring
|
||||
"""
|
||||
try:
|
||||
# Parse update_time from ISO format
|
||||
@ -39,9 +42,19 @@ def update_api_token_last_used_task(self, token: str, scope: str | None, update_
|
||||
|
||||
if result["status"] == "updated":
|
||||
logger.info("Updated last_used_at for token (async): %s... (scope: %s)", token[:10], scope)
|
||||
elif result["status"] == "failed":
|
||||
# If update failed, log and raise for retry
|
||||
error_msg = result.get("error", "Unknown error")
|
||||
logger.error("Failed to update last_used_at for token (async): %s", error_msg)
|
||||
raise Exception(f"Token update failed: {error_msg}")
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Failed to update last_used_at for token (async): %s", e)
|
||||
return {"status": "failed", "error": str(e)}
|
||||
except Exception as exc:
|
||||
# Log the error with full context (logger.exception includes traceback)
|
||||
logger.exception("Error in update_api_token_last_used_task (token: %s..., scope: %s)",
|
||||
token[:10], scope)
|
||||
|
||||
# Raise exception to let Celery handle retry and monitoring
|
||||
# This allows Flower and other monitoring tools to track failures
|
||||
raise
|
||||
|
||||
@ -100,7 +100,7 @@ class TestApiTokenCacheRedisIntegration:
|
||||
assert 595 <= ttl <= 600 # Should be around 600 seconds (10 minutes)
|
||||
|
||||
def test_cache_null_value_for_invalid_token(self):
|
||||
"""Test caching null value for invalid tokens (防穿透)."""
|
||||
"""Test caching null value for invalid tokens """
|
||||
# Cache null value
|
||||
result = ApiTokenCache.set(self.test_token, self.test_scope, None)
|
||||
assert result is True
|
||||
|
||||
Reference in New Issue
Block a user