mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
add redis for api token
This commit is contained in:
@ -17,6 +17,7 @@ from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.api_token_cache import ApiTokenCache
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from libs.login import current_user
|
||||
from models import Account, Tenant, TenantAccountJoin, TenantStatus
|
||||
@ -296,7 +297,14 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
|
||||
|
||||
def validate_and_get_api_token(scope: str | None = None):
|
||||
"""
|
||||
Validate and get API token.
|
||||
Validate and get API token with Redis caching.
|
||||
|
||||
This function uses a two-tier approach:
|
||||
1. First checks Redis cache for the token
|
||||
2. If not cached, queries database and caches the result
|
||||
|
||||
The last_used_at field is updated asynchronously via Celery task
|
||||
to avoid blocking the request.
|
||||
"""
|
||||
auth_header = request.headers.get("Authorization")
|
||||
if auth_header is None or " " not in auth_header:
|
||||
@ -308,8 +316,20 @@ def validate_and_get_api_token(scope: str | None = None):
|
||||
if auth_scheme != "bearer":
|
||||
raise Unauthorized("Authorization scheme must be 'Bearer'")
|
||||
|
||||
# Try to get token from cache first
|
||||
# Returns a CachedApiToken (plain Python object), not a SQLAlchemy model
|
||||
cached_token = ApiTokenCache.get(auth_token, scope)
|
||||
if cached_token is not None:
|
||||
logger.debug("Token validation served from cache for scope: %s", scope)
|
||||
# Asynchronously update last_used_at (non-blocking)
|
||||
_async_update_token_last_used_at(auth_token, scope)
|
||||
return cached_token
|
||||
|
||||
# Cache miss - query database
|
||||
logger.debug("Token cache miss, querying database for scope: %s", scope)
|
||||
current_time = naive_utc_now()
|
||||
cutoff_time = current_time - timedelta(minutes=1)
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
update_stmt = (
|
||||
update(ApiToken)
|
||||
@ -329,10 +349,35 @@ def validate_and_get_api_token(scope: str | None = None):
|
||||
|
||||
if not api_token:
|
||||
raise Unauthorized("Access token is invalid")
|
||||
# Cache the valid token
|
||||
ApiTokenCache.set(auth_token, scope, api_token)
|
||||
|
||||
return api_token
|
||||
|
||||
|
||||
def _async_update_token_last_used_at(auth_token: str, scope: str | None):
|
||||
"""
|
||||
Asynchronously update the last_used_at timestamp for a token.
|
||||
|
||||
This schedules a Celery task to update the database without blocking
|
||||
the current request. The start time is passed to ensure only older
|
||||
records are updated, providing natural concurrency control.
|
||||
"""
|
||||
try:
|
||||
from tasks.update_api_token_last_used_task import update_api_token_last_used_task
|
||||
|
||||
# Record the request start time for concurrency control
|
||||
start_time = naive_utc_now()
|
||||
start_time_iso = start_time.isoformat()
|
||||
|
||||
# Fire and forget - don't wait for result
|
||||
update_api_token_last_used_task.delay(auth_token, scope, start_time_iso)
|
||||
logger.debug("Scheduled async update for last_used_at (scope: %s, start_time: %s)", scope, start_time_iso)
|
||||
except Exception as e:
|
||||
# Don't fail the request if task scheduling fails
|
||||
logger.warning("Failed to schedule last_used_at update task: %s", e)
|
||||
|
||||
|
||||
class DatasetApiResource(Resource):
|
||||
method_decorators = [validate_dataset_token]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user