mirror of
https://github.com/langgenius/dify.git
synced 2026-05-20 16:57:01 +08:00
Bearer auth surface for /openapi/v1/* run-routes:
- OAUTH_BEARER_PIPELINE (renamed from APP_PIPELINE for clarity outside this
module) composes BearerCheck → ScopeCheck → AppResolver →
WorkspaceMembershipCheck → AppAuthzCheck → CallerMount.
- BearerAuthenticator.authenticate() is the single source of identity +
rate-limit. Both pipeline (BearerCheck) and decorator (validate_bearer)
delegate to it, so per-token rate limit fires exactly once per request.
- Layer 0 (workspace membership) is CE-only; on EE the gateway owns
tenant isolation. Verdicts are cached on the AuthContext entry as
verified_tenants: dict[str, bool] (legacy "ok"/"denied" strings tolerated
by from_cache for one TTL cycle, then removed).
- check_workspace_membership(...) is the shared core; the pipeline step
and the inline require_workspace_member helper both delegate to it.
- Per-token rate limit: 60/min sliding window, RFC-7231-compliant 429
with Retry-After header + JSON body { error, retry_after_ms }. Bucket
key is sha256(token) so all replicas share state via Redis.
API hygiene:
- Scope StrEnum (FULL, APPS_READ, APPS_RUN) replaces bare string literals.
- /openapi/v1/apps/<id>/info: scope flipped from apps:run to apps:read.
- /info migrates off the pipeline to validate_bearer + require_scope +
require_workspace_member (no AppAuthzCheck/CallerMount needed for reads).
- ResolvedRow gains to_cache() / from_cache() classmethods.
- AuthContext gains token_hash + verified_tenants, dropping the per-route
re-hash and per-request Redis read on the cache hit path.
OPENAPI_RATE_LIMIT_PER_TOKEN config (default 60).
132 lines
4.4 KiB
Python
132 lines
4.4 KiB
Python
"""Pipeline steps. Each is one responsibility.
|
|
|
|
`BearerCheck` is the only step that touches the token registry; downstream
|
|
steps see only the populated `Context`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
|
|
from werkzeug.exceptions import BadRequest, Forbidden, NotFound, Unauthorized
|
|
|
|
from configs import dify_config
|
|
from controllers.openapi.auth.context import Context
|
|
from controllers.openapi.auth.strategies import AppAuthzStrategy, CallerMounter
|
|
from extensions.ext_database import db
|
|
from libs.oauth_bearer import (
|
|
InvalidBearerError,
|
|
Scope,
|
|
SubjectType,
|
|
_extract_bearer, # type: ignore[attr-defined]
|
|
check_workspace_membership,
|
|
get_authenticator,
|
|
)
|
|
from models import App, Tenant, TenantStatus
|
|
|
|
|
|
class BearerCheck:
|
|
"""Resolve bearer → populate identity fields. Rate-limit is enforced
|
|
inside `BearerAuthenticator.authenticate`, so no separate step here."""
|
|
|
|
def __call__(self, ctx: Context) -> None:
|
|
token = _extract_bearer(ctx.request)
|
|
if not token:
|
|
raise Unauthorized("bearer required")
|
|
|
|
try:
|
|
authn = get_authenticator().authenticate(token)
|
|
except InvalidBearerError as e:
|
|
raise Unauthorized(str(e))
|
|
|
|
ctx.subject_type = authn.subject_type
|
|
ctx.subject_email = authn.subject_email
|
|
ctx.subject_issuer = authn.subject_issuer
|
|
ctx.account_id = authn.account_id
|
|
ctx.scopes = frozenset(authn.scopes)
|
|
ctx.source = authn.source
|
|
ctx.token_id = authn.token_id
|
|
ctx.expires_at = authn.expires_at
|
|
ctx.token_hash = authn.token_hash
|
|
ctx.cached_verified_tenants = dict(authn.verified_tenants)
|
|
|
|
|
|
class ScopeCheck:
|
|
"""Verify ctx.scopes (already populated by BearerCheck) covers required."""
|
|
|
|
def __call__(self, ctx: Context) -> None:
|
|
if Scope.FULL in ctx.scopes or ctx.required_scope in ctx.scopes:
|
|
return
|
|
raise Forbidden("insufficient_scope")
|
|
|
|
|
|
class AppResolver:
|
|
"""Read app_id from request.view_args, populate ctx.app + ctx.tenant.
|
|
|
|
Every endpoint using the OAuth bearer pipeline must declare
|
|
``<string:app_id>`` in its route — that is the design lock-in (no body /
|
|
header coupling).
|
|
"""
|
|
|
|
def __call__(self, ctx: Context) -> None:
|
|
app_id = (ctx.request.view_args or {}).get("app_id")
|
|
if not app_id:
|
|
raise BadRequest("app_id is required in path")
|
|
app = db.session.get(App, app_id)
|
|
if not app or app.status != "normal":
|
|
raise NotFound("app not found")
|
|
if not app.enable_api:
|
|
raise Forbidden("service_api_disabled")
|
|
tenant = db.session.get(Tenant, app.tenant_id)
|
|
if tenant is None or tenant.status == TenantStatus.ARCHIVE:
|
|
raise Forbidden("workspace unavailable")
|
|
ctx.app, ctx.tenant = app, tenant
|
|
|
|
|
|
class WorkspaceMembershipCheck:
|
|
"""Layer 0 — workspace membership gate.
|
|
|
|
CE-only (skipped when ENTERPRISE_ENABLED). Account-subject bearers
|
|
(dfoa_) only — SSO subjects skip.
|
|
"""
|
|
|
|
def __call__(self, ctx: Context) -> None:
|
|
if dify_config.ENTERPRISE_ENABLED:
|
|
return
|
|
if ctx.subject_type != SubjectType.ACCOUNT:
|
|
return
|
|
if ctx.account_id is None or ctx.tenant is None:
|
|
raise Unauthorized("account_id or tenant unset — BearerCheck or AppResolver did not run")
|
|
if ctx.token_hash is None:
|
|
raise Unauthorized("token_hash unset — BearerCheck did not run")
|
|
|
|
check_workspace_membership(
|
|
account_id=ctx.account_id,
|
|
tenant_id=ctx.tenant.id,
|
|
token_hash=ctx.token_hash,
|
|
cached_verdicts=ctx.cached_verified_tenants or {},
|
|
)
|
|
|
|
|
|
class AppAuthzCheck:
|
|
def __init__(self, resolve_strategy: Callable[[], AppAuthzStrategy]) -> None:
|
|
self._resolve = resolve_strategy
|
|
|
|
def __call__(self, ctx: Context) -> None:
|
|
if not self._resolve().authorize(ctx):
|
|
raise Forbidden("subject_no_app_access")
|
|
|
|
|
|
class CallerMount:
|
|
def __init__(self, *mounters: CallerMounter) -> None:
|
|
self._mounters = mounters
|
|
|
|
def __call__(self, ctx: Context) -> None:
|
|
if ctx.subject_type is None:
|
|
raise Unauthorized("subject_type unset — BearerCheck did not run")
|
|
for m in self._mounters:
|
|
if m.applies_to(ctx.subject_type):
|
|
m.mount(ctx)
|
|
return
|
|
raise Unauthorized("no caller mounter for subject type")
|