Compare commits

..

53 Commits

Author SHA1 Message Date
4c0f33d0d7 fix(openapi/apps): move uuid fast-path before tag guard in list endpoint 2026-05-08 19:06:05 -07:00
668e9c7864 feat(openapi/apps): list accepts uuid in name param; dispatches to pk lookup 2026-05-08 19:03:22 -07:00
a7c481ce87 fix(openapi/apps): normalise uuid in session.get; validate workspace_id format in query 2026-05-08 18:43:23 -07:00
507eb1f52f feat(openapi/apps): describe accepts name arg; uuid-parse dispatches pk vs name lookup 2026-05-08 18:33:13 -07:00
8e2ab1367b fix(openapi): /run swallowed HTTP errors as 500
Explicit re-raise list at L230-238 only covered UnprocessableEntity +
NotChatAppError/NotWorkflowAppError. Other HTTPException subclasses
raised inside handlers (NotFound, BadRequest, ConversationCompletedError,
ProviderNotInitializeError, ProviderQuotaExceededError, ...) hit
`except Exception` and got squashed to 500. Replace with
`except HTTPException: raise`.

Refactor bundled: collapse 3x try/except ladder into
_translate_service_errors() ctxmgr, inline single-call constraint
enforcers, drop wasted dict() copy in _unpack_blocking, trim module
docstring and stale spec doc reference. -60 net lines.
2026-05-07 13:53:19 -07:00
0c568623d7 test(openapi): pin invoke_from + user-strip invariants on /run
Restores two assertions lost when the legacy per-mode unit tests
were deleted in api-3 Task 4:
- invoke_from == InvokeFrom.OPENAPI on the unified runner
- body-side user field is stripped before reaching the generator
  (Model 2: bearer is identity, body cannot spoof user)

Both run as part of test_run_chat_dispatches_to_chat_handler;
no new tests added.
2026-05-07 01:35:53 -07:00
fb7b8dc151 refactor(openapi): drop legacy per-mode bearer routes
Removes /openapi/v1/apps/<id>/{chat-messages,completion-messages,
workflows/run}. Bearer surface for runs is now the unified /run route
(api-3). Service-API /v1/* per-mode routes (app-key auth) untouched.

Also deletes the corresponding unit test files
(test_chat_messages.py, test_completion_messages.py, test_workflow_run.py)
which targeted the removed handlers; coverage of the unified route lives
in tests/unit_tests/controllers/openapi/test_app_run_dispatch.py and
tests/integration_tests/controllers/openapi/test_app_run.py.
2026-05-07 01:28:12 -07:00
4bc1046f14 fix(openapi): tighten /run handler error path + drop cargo-cult call
- Drop except ValueError: raise. Inherited from per-mode controllers
  without examining purpose; today it converts helper-internal
  ValueErrors into uncaptured 500s with no body or log. Falling
  through to except Exception: gives them a logged trace and a
  structured InternalServerError.
- Drop redundant AppMode.value_of(app_model.mode). App.mode is
  Mapped[AppMode] with an EnumText adapter that returns the enum
  directly; value_of was a no-op iteration.
- Comment the explicit re-raise block to spell out why ordering
  matters before the catch-all.
2026-05-07 01:08:59 -07:00
f2ec17be9b feat(openapi): unified POST /apps/<id>/run with per-mode dispatch
Single bearer-accepting run route on the openapi namespace. Server
reads apps.mode after AppResolver and dispatches via the _DISPATCH
table to the per-mode helper. Per-mode constraints enforced inside
helpers (422). Service-API /v1/* per-mode routes untouched.

Also fixes a pre-existing latent bug in the openapi integration
fixtures: App() rows were constructed without enable_site, which
DB INSERT rejected (column is NOT NULL with no default). Now set
enable_site=True alongside enable_api=True in the three fixtures
that construct App() rows.
2026-05-07 00:35:47 -07:00
6532b4d161 fix(openapi): tighten _DISPATCH return type + cover helper edge cases
- _DISPATCH value type: tuple[Any, dict[str, Any] | None] (was Any, Any).
  Helpers always return one of (stream_obj, None) or (None, dict);
  tightened sig lets mypy flag wrong shapes when Task 3's route handler
  unpacks the result.
- Comment _run_completion's auto_generate_name + query mutations
  (legacy parity; non-obvious without grep-archaeology).
- Unit cover _unpack_blocking (mapping / tuple / non-mapping branches)
  + AppRunRequest.conversation_id field validator (strip-to-None,
  invalid-uuid raise, valid-uuid passthrough).
2026-05-07 00:05:20 -07:00
4bfc4af590 feat(openapi): AppRunRequest schema + per-mode dispatch helpers
Lays the foundation for the unified /openapi/v1/apps/<id>/run route
without yet registering it. Helpers preserve the per-mode exception
fans + response shapes byte-for-byte from the existing chat-messages /
completion-messages / workflows-run controllers.
2026-05-06 23:57:13 -07:00
1fb7329327 fix(openapi): tighten WorkflowRunResponse.mode + outputs default
- WorkflowRunResponse.mode: Literal["workflow"] (was str) — only one
  valid value, so Literal makes the contract explicit.
- WorkflowRunData.outputs: Field(default_factory=dict) — matches the
  sibling metadata field's idiom; avoids the mutable-literal-default
  smell flagged in code review.
- Extends test_response_models_dump_per_mode with an explicit assertion
  on the WorkflowRunResponse.mode echo + exercises CompletionMessageResponse
  (was imported-but-unused).
2026-05-06 23:51:54 -07:00
40ae39a3a3 refactor(openapi): co-locate per-mode run response models in _models.py
Prepares for the api-3 unified /run route which imports all three
response shapes from one location. The per-mode files (chat_messages,
completion_messages, workflow_run) still define their own copies inline
until Task 4 deletes them — no collision since neither location is
imported anywhere else.
2026-05-06 23:46:53 -07:00
35c08f7c3d feat(enterprise): wire /apps/permitted via EE wrapper (app_ids only)
services/enterprise/app_permitted_service.list_permitted_apps calls
EnterpriseService.WebAppAuth.list_externally_accessible_apps and decodes
the camelCase wrapper response into a PermittedAppsPage carrying just
app_ids. The controller hydrates name/mode/tenant/etc. from local
App/Tenant rows.

The EE response shape is {data: [appId], total, hasMore} per ee-2. EE owns
access control only; dify/api owns app data, so the older inner-API
metadata fanout (/inner/api/enterprise/apps/batch-metadata) is removed.

- delete controllers/inner_api/app/metadata.py + its test
- service: ServiceUnavailable on EnterpriseAPIError; 5s timeout via wrapper
- controller: drop fail-fast subject check + unused g/InternalServerError
  imports
2026-05-06 22:50:10 -07:00
7b6ceaebea feat(inner_api): batch-metadata endpoint for EE permitted-apps flow 2026-05-06 03:11:58 -07:00
35d9b6a0f8 feat(openapi): merge /apps/<id>/{info,parameters} into /describe + ?fields
Collapse the openapi-namespace per-app reads into one canonical endpoint
GET /openapi/v1/apps/<id>/describe[?fields=info,parameters,input_schema]
returning a single AppDescribeResponse with all blocks Optional and a new
JSON-Schema input_schema block derived server-side from user_input_form +
app mode.

- AppDescribeQuery (Pydantic, extra=forbid) parses the ?fields allow-list;
  unknown member -> 422.
- _input_schema.build_input_schema(app) derives Draft 2020-12 JSON Schema:
  chat-family modes carry top-level query (string, minLength=1, required);
  workflow / completion only carry inputs. AppUnavailableError -> empty
  sentinel (EMPTY_INPUT_SCHEMA).
- Drop AppByIdApi (/apps/<id>) and AppParametersApi (/apps/<id>/parameters)
  route classes; delete app_info.py module + app_info_payload helper.
- AppDescribeResponse.{info,parameters,input_schema} now Optional[None].

Lock-step deploy with difyctl Phase B (/describe consumer migration).
2026-05-06 00:53:41 -07:00
d1c1c04615 fix(openapi): /apps/permitted hardening + naming
- fail-fast on missing subject_email/subject_issuer for dfoe_
  bearers (was silently coercing None -> empty string and sending
  a malformed query upstream).
- document the has_more contract: total comes from EE inner-API
  unfiltered count; locally-dropped archived rows leave
  len(items) < limit even when has_more=True.
- gate tenant-name lookup in /apps on non-empty rows so empty
  filter results skip the wasted scalar query.
- rename AppListPermittedApi -> AppPermittedListApi for word-order
  consistency with AppPermittedListQuery.
- tests: positive mode acceptance and explicit dfoa_ non-carrier
  assertion.
2026-05-05 21:12:33 -07:00
04ebf8a92f feat(openapi): /apps/permitted — external-subject app discovery (EE)
Split route for dfoe_ external-SSO discovery, separate from /apps
(dfoa_-only workspace catalog). Cross-tenant allow-list query: server
calls Enterprise inner-API POST /inner/api/webapp/permitted-apps and
hydrates app/tenant rows locally. New scope apps:read:permitted (no
dual-meaning with apps:read). Route gated by @enterprise_only — 404
on CE — and validate_bearer(accept=ACCEPT_USER_EXT_SSO) — 403 on dfoa_.
Query validator rejects workspace_id and tag (cross-tenant
unresolvable); mode/name supported.

EE inner-API wire-up depends on ee-2; the service-layer stub raises
ServiceUnavailable until that endpoint ships. CLI dispatches between
/apps and /apps/permitted client-side based on the bearer prefix in
hosts.yml — see docs/specs/v1.0/apps.md §Subject dispatch.

Verified via unit tests on AppPermittedListQuery and Scope wiring;
HTTP integration tests deferred to ee-2 once the inner-API ships.
2026-05-05 20:20:22 -07:00
6f3c2fe97b test(openapi): unit coverage for app payload helpers
app_info_payload, parameters_payload, _EMPTY_PARAMETERS are CLI
contracts. Direct unit tests pin their shape independent of DB +
HTTP plumbing — drift in the helpers fails fast at unit-test time
instead of leaking through into integration runs.
2026-05-05 20:13:03 -07:00
03cd16fc44 refactor(openapi): /account/sessions uses MAX_PAGE_LIMIT
Drops the magic 200 in favor of the shared constant introduced in
Task 1's _models.py rewrite. Behavior unchanged (still caps at 200).
Sibling endpoint /apps already wired the constant through AppListQuery
in Task 3; this closes the loop on the single-source-of-truth goal.
2026-05-05 20:10:51 -07:00
3a6901e718 fix(openapi): /apps 422 body emits JSON
ValidationError -> UnprocessableEntity(exc.json()) so CLI consumers
can parse the error body. The previous str(errors()) produced a
Python repr (single-quoted dicts), not JSON. Also align with
sibling openapi controllers: request.args.to_dict(flat=True)
and 'as exc' naming.

Test cleanup: hoist module-scope imports; add a happy-path
positive case covering every field.
2026-05-05 20:08:43 -07:00
25034612b8 feat(openapi): AppListQuery — Pydantic validation for /apps
Replaces ad-hoc int(request.args.get(...)) parsing in AppListApi.get
with a typed Pydantic query model. Bad inputs (page=abc, limit=-1,
limit=500, mode=invalid, missing workspace_id) raise ValidationError
which the handler converts to 422 with field-level error detail
instead of 500 / silent empty page. Closes the mode whitelist via
AppMode enum.

Verified via direct unit tests on AppListQuery (no HTTP integration
tests required since the model carries the validation contract).
2026-05-05 20:02:47 -07:00
87620050d7 refactor(openapi): tighten _AppReadResource refactor
- Correct docstring: Flask-RESTX iterates method_decorators forward;
  the last entry becomes outermost via composition, not via framework
  reversal.
- Extract shared _APPS_READ_DECORATORS constant; was duplicated
  verbatim between AppReadResource and AppListApi.
- Rename _AppReadResource -> AppReadResource (no longer module-private
  since app_info.py imports it). Drops the pyright ignore.
2026-05-05 19:59:04 -07:00
e006eb7a4b refactor(openapi): _AppReadResource base for per-app reads
Four per-app GETs (/apps/<id>, /info, /parameters, /describe) repeated
the same SSO-guard / app-load / membership-check pattern. Hoist into
_AppReadResource with method_decorators=[require_scope, validate_bearer]
plus _load(app_id) -> (App, AuthContext). Subclasses now 3-line bodies.
Eliminates the per-method # type: ignore[reportUntypedFunctionDecorator]
suppression by relocating the decorator chain to the class attribute.
Endpoints now build typed AppInfoResponse / AppDescribeResponse and
.model_dump() at the boundary.
2026-05-05 19:51:42 -07:00
305de57eff test(openapi): tighten PEP 695 generic assertion
The previous test asserted only that model_fields exposed the
expected names — the legacy Generic[T] form would have passed
identically. Switch to __type_params__, which is non-empty only
under PEP 695 native syntax.
2026-05-05 19:39:49 -07:00
069fdd4894 refactor(openapi): typed app response models + MAX_PAGE_LIMIT
Adds AppListRow, AppInfoResponse, AppDescribeInfo, AppDescribeResponse
per spec docs/specs/v1.0/server/endpoints.md (every response a typed
Pydantic model). Adopts PEP 695 generic syntax for PaginationEnvelope
(drops legacy TypeVar + UP046 noqa). Centralizes the per-endpoint
limit cap as MAX_PAGE_LIMIT = 200.
2026-05-05 19:34:15 -07:00
783dfe38a0 chore(test): consolidate /openapi/v1 integration fixtures
- Shared conftest at tests/integration_tests/controllers/openapi/:
  workspace_account, app_in_workspace, mint_token (factory + tracked
  cleanup of OAuthAccessToken rows), account_token convenience fixture,
  autouse disable_enterprise (default ENTERPRISE_ENABLED=False; tests
  needing the EE branch override in-test), autouse _flush_auth_redis.

- test_auth.py covers Layer 0 + per-token rate limit + scope on /info.
  other_workspace_app fixture is a generator that cleans up the second
  tenant + app on teardown.

- test_apps.py covers the read surface: /apps list with pagination
  envelope, /apps/<id>, /info, /parameters, /describe, /account/sessions
  envelope migration, plus dfoe_ scope rejection on apps:read routes.
2026-05-05 18:08:31 -07:00
86ba361ff1 feat(openapi): app reads + canonical pagination envelope
Read-side surface for difyctl describe / get / list:

- GET /openapi/v1/apps              paginated list (workspace_id required)
- GET /openapi/v1/apps/<id>         single app summary
- GET /openapi/v1/apps/<id>/parameters  port of service_api parameters
- GET /openapi/v1/apps/<id>/describe    merged { info, parameters }

All gated by validate_bearer(ACCEPT_USER_ANY) + require_scope(APPS_READ) +
require_workspace_member(ctx, tenant_id). SSO subjects 404 (account-only
helper account_or_404 deduplicates the guard across the four endpoints).

PaginationEnvelope[T] (page, limit, total, has_more, data) is the canonical
shape for every /openapi/v1/* list endpoint. has_more is computed by the
server from page * limit < total. /account/sessions migrates from the
legacy { sessions: [...] } shape to the envelope; integration tests assert
the legacy key is gone.
2026-05-05 18:08:12 -07:00
591048d7c2 feat(openapi): bearer auth pipeline + Layer 0 + per-token rate limit (CE)
Bearer auth surface for /openapi/v1/* run-routes:

- OAUTH_BEARER_PIPELINE (renamed from APP_PIPELINE for clarity outside this
  module) composes BearerCheck → ScopeCheck → AppResolver →
  WorkspaceMembershipCheck → AppAuthzCheck → CallerMount.
- BearerAuthenticator.authenticate() is the single source of identity +
  rate-limit. Both pipeline (BearerCheck) and decorator (validate_bearer)
  delegate to it, so per-token rate limit fires exactly once per request.
- Layer 0 (workspace membership) is CE-only; on EE the gateway owns
  tenant isolation. Verdicts are cached on the AuthContext entry as
  verified_tenants: dict[str, bool] (legacy "ok"/"denied" strings tolerated
  by from_cache for one TTL cycle, then removed).
- check_workspace_membership(...) is the shared core; the pipeline step
  and the inline require_workspace_member helper both delegate to it.
- Per-token rate limit: 60/min sliding window, RFC-7231-compliant 429
  with Retry-After header + JSON body { error, retry_after_ms }. Bucket
  key is sha256(token) so all replicas share state via Redis.

API hygiene:
- Scope StrEnum (FULL, APPS_READ, APPS_RUN) replaces bare string literals.
- /openapi/v1/apps/<id>/info: scope flipped from apps:run to apps:read.
- /info migrates off the pipeline to validate_bearer + require_scope +
  require_workspace_member (no AppAuthzCheck/CallerMount needed for reads).
- ResolvedRow gains to_cache() / from_cache() classmethods.
- AuthContext gains token_hash + verified_tenants, dropping the per-route
  re-hash and per-request Redis read on the cache hit path.

OPENAPI_RATE_LIMIT_PER_TOKEN config (default 60).
2026-05-05 18:07:47 -07:00
8a62c1d915 chore(api): pyright + ruff cleanup for openapi/cli surface
Type and lint pass over the openapi controllers, auth pipeline, and
oauth bearer/device-flow plumbing. Down from 36 pyright errors and 16
ruff errors to 0/0; 93 openapi unit tests pass.

Logic fixes:
- libs/oauth_bearer.py: drop private-naming on the friend-API methods
  consumed by _VariantResolver (cache_get / cache_set_positive /
  cache_set_negative / hard_expire / session_factory). They were always
  cross-class accessors — leading underscore was misleading. Add public
  registry property on BearerAuthenticator. _hard_expire row_id widened
  to UUID | str (matches the StringUUID column type).
- libs/oauth_bearer.py: type validate_bearer / bearer_feature_required
  with ParamSpec / PEP-695 so wrapped routes preserve their signature.
- libs/rate_limit.py: same — typed rate_limit decorator.
- services/oauth_device_flow.py: mint_oauth_token / _upsert accept
  Session | scoped_session (Flask-SQLAlchemy proxy). Guard row-is-None
  after upsert.
- controllers/openapi/{chat,completion,workflow}_messages.py: tuple-vs-
  Mapping shape narrowing on AppGenerateService.generate return —
  production returns Mapping, tests mock as (body, status). Validate
  through Pydantic Response model in both shapes.
- controllers/openapi/oauth_device.py: replace flask_restx.reqparse (banned)
  with Pydantic Request/Query models — DeviceCodeRequest, DevicePollRequest,
  DeviceLookupQuery, DeviceMutateRequest. Two PEP-695 generic helpers
  (_validate_json / _validate_query) translate ValidationError to BadRequest.
- controllers/openapi/auth/strategies.py: Protocol param-name match
  (subject_type), Optional narrowing on app/tenant/account_id/subject_email.
- controllers/openapi/auth/steps.py: subject_type-is-None guard before
  mounter dispatch.
- core/app/apps/workflow/generate_task_pipeline.py + models/workflow.py:
  add WorkflowAppLogCreatedFrom.OPENAPI + matching match-case branch.
  Fixes match-exhaustiveness and possibly-unbound created_from.
- libs/device_flow_security.py: pyright ignore on flask after_request
  hook (registered by the framework, pyright sees as unused).
- services/oauth_device_flow.py: rename Exceptions to *Error suffix
  (StateNotFoundError / InvalidTransitionError / UserCodeExhaustedError);
  same for libs/oauth_bearer.py (InvalidBearerError / TokenExpiredError).
  Update all callers across openapi controllers.
- controllers/openapi/{oauth_device,oauth_device_sso}.py +
  services/oauth_device_flow.py: switch logger.error in except blocks
  to logger.exception (TRY400) — keeps the traceback for ops.
- configs/feature/__init__.py: OPENAPI_KNOWN_CLIENT_IDS computed_field
  needs an @property alongside for pyright to see it as a value, not a
  method. Matches the existing line-451 pattern.

Plus ruff format + import-sort across the openapi tree (pure formatting).
2026-04-28 21:44:54 -07:00
b083c910b3 fix(web/device): bounce to authorize_account after post-login return
When an unauthenticated user submits a user_code, the chooser view
holds the typed code and redirects to /signin. After login, the page
re-mounts on /device with no URL params (already scrubbed on the
first render) and account loaded — but the existing useEffect path
only advanced when ssoVerified or urlUserCode was present.

Add an early branch: if view is chooser and account just loaded,
advance to authorize_account using the userCode stashed in view
state. Also widen the effect deps to view (not view.kind) so the
nested userCode reads stay current.
2026-04-28 20:42:06 -07:00
9b2a37ceff feat(openapi): cookie auth for device-flow approval routes
Adds the openapi blueprint branch in load_user_from_request so that
account-branch device-flow approval routes (approve / deny /
approval-context) can authenticate via the console session cookie
under @login_required.

Splits extract_access_token into two helpers:
- extract_console_cookie_token (cookie-only) — used by openapi
  approval routes that must not fall through to the Authorization
  header, where dfoa_/dfoe_ bearers live (those aren't JWTs and
  PassportService.verify would crash on them).
- extract_access_token retains both code paths for legacy callers.
2026-04-28 20:41:38 -07:00
cf5ebe9430 feat(openapi): app-run endpoints with auth pipeline
Ports service_api/app/{completion,workflow}.py to bearer-authed
/openapi/v1/apps/<app_id>/{info,chat-messages,completion-messages,workflows/run}.

Architecture:
- New controllers/openapi/auth/ package: Pipeline + Step protocol over
  one mutable Context. Endpoints attach via @APP_PIPELINE.guard(scope=...)
  — single attachment point; forgetting auth is structurally impossible.
- Pipeline order: BearerCheck -> ScopeCheck -> AppResolver -> AppAuthzCheck
  -> CallerMount.
- Strategies vary along independent axes: AclStrategy (EE webapp-auth inner
  API) vs MembershipStrategy (CE TenantAccountJoin); AccountMounter vs
  EndUserMounter dispatched by SubjectType.
- App is in URL path (not header). Each non-GET has typed Pydantic Request;
  each non-SSE response has typed Pydantic Response. Bearer-as-identity:
  body 'user' field stripped, ignored if present.

Adds InvokeFrom.OPENAPI enum variant. Emits app.run.openapi audit log
on successful invocation via standard logger extra={"audit": True, ...}
convention.
2026-04-27 17:25:17 -07:00
85c3f9cbf8 fix(device-flow): scope approval-grant cookie to /openapi/v1/oauth/device
Phase F retired the legacy /v1/oauth/device/* mounts but the cookie path
still pointed at the dead prefix. Browsers therefore dropped the cookie
on the canonical /openapi/v1/oauth/device/* requests, so SSO-branch
approval-context and approve-external returned 401 no_session even
right after sso-complete had set the cookie.
2026-04-27 01:15:44 -07:00
d98fe7916a fix(nginx): route /openapi to api backend
Phase F removed legacy /v1/oauth/device/* and /console/api/oauth/device/*
mounts in favour of /openapi/v1. Without this rule /openapi falls through
to location / and proxies to web:3000, returning 404 for every API call.
2026-04-27 01:06:19 -07:00
0b3b0b5ce8 feat(api): retire legacy /v1/* and /console/api device-flow mounts (Phase F)
Web and CLI consumers now hit /openapi/v1/* directly, so the dual-mount
shims can go:
  - controllers/oauth_device_sso.py (legacy /v1/oauth/device/sso-* + /v1/device/sso-complete)
  - controllers/service_api/oauth.py (legacy /v1/oauth/device/*, /v1/me, /v1/oauth/authorizations/self)
  - controllers/console/auth/oauth_device.py (placeholder for legacy /console/api/oauth/device/{approve,deny})
  - the deferred _register_legacy_console_mount() inside openapi/oauth_device.py

Imports in controllers/console/__init__.py, controllers/service_api/__init__.py,
and extensions/ext_blueprints.py pruned. Tests rewritten to openapi-only.
2026-04-27 00:45:10 -07:00
eb5ef3dba5 feat(web): switch /device page to /openapi/v1 paths (Phase G.21)
Approve/deny + lookup + SSO endpoints now live under /openapi/v1/oauth/device/*.
Approve/deny use direct fetch with console session cookie + CSRF instead of
the /console/api-prefixed post() helper.
2026-04-27 00:32:31 -07:00
a07b32274a feat(api): add /openapi/v1/workspaces reads (Phase E.17)
GET /openapi/v1/workspaces lists tenants the bearer's account is a
member of. GET /openapi/v1/workspaces/<id> returns one workspace
detail, member-gated (404 on non-member, never 403, so workspace IDs
don't leak across tenants).

Bearer-authed via @validate_bearer(accept=ACCEPT_USER_ANY). External
SSO bearers (no account_id) get an empty list / 404 — same posture as
GET /openapi/v1/account.

Cookie-authed /console/api/workspaces stays in console for the
dashboard SPA — different consumer, different auth model. No legacy
/v1/ remount this phase.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-27 00:10:16 -07:00
2a38df2b7f refactor(api): consolidate openapi/oauth_device into per-domain modules
Match the existing api-group convention: one module per resource family
with multiple Resource classes per file (cf service_api/dataset/dataset.py
with 7 routes, console/auth/oauth_device.py with 2 before this branch).

The Phase B-D fragmentation (one file per route under
controllers/openapi/oauth_device/) was inconsistent with the codebase.
Collapse into:

  controllers/openapi/oauth_device.py        (5 routes: code, token,
                                              lookup, approve, deny —
                                              account branch)
  controllers/openapi/oauth_device_sso.py    (4 routes: sso-initiate,
                                              sso-complete,
                                              approval-context,
                                              approve-external —
                                              EE-only SSO branch)

The split mirrors the original pre-migration layout: account branch in
console/auth/oauth_device.py, SSO branch in controllers/oauth_device_sso.py
(root). Both legacy mount files updated to import from the new modules.

No behavior change; 59 tests still green. Test files updated to import
from the consolidated module paths.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-27 00:07:15 -07:00
71e9e8dda6 feat(api): lift SSO branch device-flow handlers to /openapi/v1 (Phase D.15-16)
The four EE-only SSO handlers (sso_initiate, sso_complete,
approval_context, approve_external) move from controllers/oauth_device_sso.py
to controllers/openapi/oauth_device/. Each is registered on openapi_bp
via @bp.route at the canonical path:

  /openapi/v1/oauth/device/sso-initiate
  /openapi/v1/oauth/device/sso-complete
  /openapi/v1/oauth/device/approval-context
  /openapi/v1/oauth/device/approve-external

sso-complete moves under /oauth/device/ from its previous orphan path
/v1/device/sso-complete; the IdP-side ACS callback URL hardcoded in
sso_initiate now points to the canonical path. Operators must
re-register the ACS callback with each IdP before Phase F deletes the
legacy alias.

oauth_device_sso.py shrinks to a thin re-mount file: same legacy bp
with attach_anti_framing applied, four bp.add_url_rule() calls binding
the legacy paths to the imported view functions. Same handler runs
for both mounts — no duplicated logic.

attach_anti_framing(openapi_bp) added in controllers/openapi/__init__.py
so X-Frame-Options + frame-ancestors CSP cover the canonical paths too.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-27 00:00:24 -07:00
772f450b29 feat(api): lift device-flow approve/deny to /openapi/v1 (Phase D.13-14)
DeviceApproveApi + DeviceDenyApi (cookie-authed) move to
controllers/openapi/oauth_device/{approve,deny}.py. Decorator stack
preserved verbatim: setup_required → login_required →
account_initialization_required → bearer_feature_required →
rate_limit. Audit event names ('oauth.device_flow_approved' /
'oauth.device_flow_denied') unchanged so the OTel exporter
registration keeps routing them.

The legacy /console/api/oauth/device/{approve,deny} mounts are
re-registered on console_ns from the bottom of the new files via a
local-import _register_legacy_console_mount() helper. The local
import breaks an import cycle that would otherwise form: openapi
imports console.wraps for setup_required, console.__init__.py imports
console.auth.oauth_device, which would re-import the openapi class
mid-load. Deferring console_ns past the class definition resolves it.

console/auth/oauth_device.py becomes a 9-line placeholder (the
existing console.__init__.py `from .auth import (..., oauth_device,
...)` keeps loading until Phase F prunes the import).

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:57:28 -07:00
390f1f74db feat(api): add /openapi/v1/account/sessions endpoints (Phase C.11-12)
GET /openapi/v1/account/sessions lists the bearer's active OAuth
tokens (filtered to revoked_at IS NULL, expires_at > NOW(), token_hash
IS NOT NULL — no phantom devices). DELETE
/openapi/v1/account/sessions/<id> revokes a specific session with a
subject-match guard that returns 404 (not 403) on cross-subject so
token IDs don't leak across subjects.

Subject scoping abstracted into _subject_match(ctx): account subjects
filter by account_id; external_sso subjects filter by (email, issuer)
AND account_id IS NULL — preventing an SSO bearer from touching a
same-email account row from a federated tenant.

_revoke_token_by_id helper extracted so /sessions/self and
/sessions/<id> share the same UPDATE-where-revoked_at-IS-NULL
idempotent revoke + Redis cache invalidation.

No /v1/ equivalents — these are new endpoints (spec §Sessions list shape).

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:51:55 -07:00
b7bd9c19ed feat(api): lift identity + self-revoke to /openapi/v1/account (Phase C.9-10)
GET /v1/me moves to GET /openapi/v1/account. DELETE
/v1/oauth/authorizations/self moves to DELETE
/openapi/v1/account/sessions/self. Both classes (AccountApi,
AccountSessionsSelfApi) are now in controllers/openapi/account.py and
re-registered on service_api_ns at the legacy paths.

service_api/oauth.py is now nothing but legacy re-mount declarations
(20 lines). All in-place handler logic has moved to openapi/. Phase F
will delete the file and the legacy mounts together.

Helper functions (_load_memberships, _pick_default_workspace,
_workspace_payload, _account_payload) move with the AccountApi class.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:50:15 -07:00
e93821af46 feat(api): lift GET /oauth/device/lookup to /openapi/v1 (Phase B.8)
Same pattern as B.6 / B.7: OAuthDeviceLookupApi moves to
controllers/openapi/oauth_device/lookup.py and is re-registered on
service_api_ns to keep /v1/oauth/device/lookup serving until Phase F.

service_api/oauth.py is now down to /me + /oauth/authorizations/self
plus three legacy mounts; remaining handlers move in Phase C.
Now-unused imports (LIMIT_LOOKUP_PUBLIC, rate_limit, reqparse, request,
DEVICE_FLOW_TTL_SECONDS, DeviceFlowRedis, DeviceFlowStatus) removed.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:44:05 -07:00
9408759954 feat(api): lift POST /oauth/device/token to /openapi/v1 (Phase B.7)
Same pattern as B.6: OAuthDeviceTokenApi moves to
controllers/openapi/oauth_device/token.py and is re-registered on
service_api_ns to keep /v1/oauth/device/token serving until Phase F.

_audit_cross_ip_if_needed helper moves with the handler. Now-unused
imports removed from service_api/oauth.py.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:42:27 -07:00
fe9412af5d feat(api): lift POST /oauth/device/code to /openapi/v1 (Phase B.6)
Canonical class OAuthDeviceCodeApi now lives in
controllers/openapi/oauth_device/code.py and is registered on
openapi_ns at /openapi/v1/oauth/device/code. service_api/oauth.py
re-registers the same class object on service_api_ns at
/v1/oauth/device/code so existing callers keep working until Phase F.

KNOWN_CLIENT_IDS literal moves to dify_config.OPENAPI_KNOWN_CLIENT_IDS
(CSV-parsed, default "difyctl") so new CLIs / SDKs can be admitted
without code changes (CLAUDE.md rule 8 — no magic strings).

_verification_uri helper moves with the handler. Single source of
truth — no duplicated logic between the two mounts.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:40:58 -07:00
218ef6a447 feat(api): CORS posture for /openapi/v1 (Phase A.5)
OPENAPI_CORS_ALLOW_ORIGINS env var defaults to empty (same-origin only).
Operators expand for third-party integrations via comma-separated list.
Allowed headers: Authorization, Content-Type, X-CSRF-Token. Methods:
GET POST PATCH DELETE OPTIONS. Max-Age 600s. supports_credentials=True
so cookie-authed approve/deny work once Phase D moves them in.

Disallowed origins receive a normal 200 OPTIONS response without the
Access-Control-Allow-Origin header — flask-cors's standard behavior;
browser blocks the cross-origin request from the disallowed origin.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:30:27 -07:00
501c0b8746 feat(api): add require_scope decorator (Phase A.4)
Route-level scope gate; pairs with validate_bearer. Bearer holding the
catch-all SCOPE_FULL ('full', carried by dfoa_) passes any check;
narrower bearers (dfoe_, future PATs) need the exact scope listed in
the route decorator.

No v1.0 route applies it yet — apps/datasets controllers will be the
first consumers when those plans land. Programming-error guard: if
@require_scope runs without @validate_bearer above it, raises
RuntimeError instead of silently allowing.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:27:48 -07:00
4214583ae5 refactor(api): hoist bearer_feature_required to libs/oauth_bearer (Phase A.3)
The decorator was defined inline in console/auth/oauth_device.py. Phase
D will move approve/deny to controllers/openapi/oauth_device/ and the
new SSO branch under the same group needs the same gate. Hoist it to
libs/oauth_bearer.py now so the move stays a pure file rename later.

Behavior unchanged: 503 'bearer_auth_disabled' when ENABLE_OAUTH_BEARER
is off. console/auth/oauth_device.py imports it from libs and drops
the now-unused dify_config / wraps / ServiceUnavailable imports.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:26:13 -07:00
73771cb58c refactor(api): drop vestigial Accepts.APP from validate_bearer (Phase A.2)
Accepts.APP and the matching app- short-circuit existed to let routes
declare "I accept either OAuth or app- tokens", but no production
caller ever did, and the short-circuit returned without doing the
tenant/app/end-user setup that app- tokens actually need (that lives
in service_api/wraps.py:validate_app_token).

After this change, validate_bearer is OAuth-only. app- bearers fall
through the prefix dispatch and surface as InvalidBearer -> 401, which
is what we already promised on /openapi/* (no app- accepted) and what
the docstring claimed all along.

Pre-check rg "Accepts\\.APP" returned zero hits outside the function
being edited; no callers to update.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:24:56 -07:00
f5f224f49d feat(api): scaffold /openapi/v1 blueprint (Phase A.1)
New Flask blueprint at /openapi/v1/ that will host user-scoped
programmatic endpoints (device flow, identity, sessions, workspaces).
Ships only a smoke route GET /openapi/v1/_health for now; subsequent
phases lift handlers in from service_api, console, and the orphan
oauth_device_sso.py.

CORS is intentionally omitted here and configured in step A.5 once
the allowlist envvar lands.

Plan: docs/superpowers/plans/2026-04-26-openapi-migration.md (in difyctl repo).
2026-04-26 23:08:15 -07:00
813da349ec fix(api,web): post-review hardening for OAuth device flow
- api: account-flow stores subject_issuer="dify:account" sentinel
  instead of NULL so the rotate-in-place unique index collides as
  intended (Postgres treats NULLs as distinct in unique indices).
  mint_oauth_token validates prefix-specific issuer rules.
- api: enterprise_only inverts to an allowlist (ACTIVE / EXPIRING) so
  any future LicenseStatus value defaults to denial.
- api: consume_on_poll moved to a single Lua script (GET + status-check
  + DEL) so concurrent pollers can't both observe APPROVED.
- web: typed DeviceFlowError + central error-copy mapping; page
  surfaces rate_limited / lookup_failed view states; URL params
  scrubbed after consumption (RFC 8628 §5.4).
2026-04-26 23:05:07 -07:00
fe8510ad1a feat(api,web): OAuth 2.0 device flow + bearer auth (RFC 8628)
Adds a CLI-friendly authorization flow so difyctl (and future
non-browser clients) can obtain user-scoped tokens without copy-
pasting cookies or raw API keys. Two grant paths share one device
flow surface:

  1. Account branch — user signs in via the existing /signin
     methods, /device page calls console-authed approve, mints a
     dfoa_ token tied to (account_id, tenant).
  2. External-SSO branch (EE) — /v1/oauth/device/sso-initiate signs
     an SSOState envelope, hands off to Enterprise's external ACS,
     receives a signed external-subject assertion, mints a dfoe_
     token tied to (subject_email, subject_issuer).

API surface (all under /v1, EE-only endpoints 404 on CE):

  POST   /v1/oauth/device/code              — RFC 8628 start
  POST   /v1/oauth/device/token             — RFC 8628 poll
  GET    /v1/oauth/device/lookup            — pre-validate user_code
  GET    /v1/oauth/device/sso-initiate      — SSO branch entry
  GET    /v1/device/sso-complete            — SSO callback sink
  GET    /v1/oauth/device/approval-context  — /device cookie probe
  POST   /v1/oauth/device/approve-external  — SSO approve
  GET    /v1/me                             — bearer subject lookup
  DELETE /v1/oauth/authorizations/self      — self-revoke
  POST   /console/api/oauth/device/approve  — account approve
  POST   /console/api/oauth/device/deny     — account deny

Core primitives:
- libs/oauth_bearer.py: prefix-keyed TokenKindRegistry +
  BearerAuthenticator + validate_bearer decorator. Two-tier scope
  (full vs apps:run) stamped from the registry, never from the DB.
- libs/jws.py: HS256 compact JWS keyed on the shared Dify
  SECRET_KEY — same key-set verifies the SSOState envelope, the
  external-subject assertion (minted by Enterprise), and the
  approval-grant cookie.
- libs/device_flow_security.py: enterprise_only gate, approval-
  grant cookie mint/verify/consume (Path=/v1/oauth/device,
  HttpOnly, SameSite=Lax, Secure follows is_secure()), anti-
  framing headers.
- libs/rate_limit.py: typed RateLimit / RateLimitScope dispatch
  with composite-key buckets; both decorator + imperative form.
- services/oauth_device_flow.py: Redis state machine (PENDING ->
  APPROVED|DENIED with atomic consume-on-poll), token mint via
  partial unique index uq_oauth_active_per_device (rotates in
  place), env-driven TTL policy.

Storage: oauth_access_tokens table with partial unique index on
(subject_email, subject_issuer, client_id, device_label) WHERE
revoked_at IS NULL. account_id NULL distinguishes external-SSO
rows. Hard-expire is CAS UPDATE (revoked_at + nullify token_hash)
so audit events keep their token_id. Retention pruner DELETEs
revoked + zombie-expired rows past OAUTH_ACCESS_TOKEN_RETENTION_DAYS.

Frontend: /device page with code-entry, chooser (account vs SSO),
authorize-account, authorize-sso views. SSO branch detaches from
the URL user_code and reads everything from the cookie via
/approval-context. Anti-framing headers on all responses.

Wiring: ENABLE_OAUTH_BEARER feature flag; ext_oauth_bearer binds
the authenticator at startup; clean_oauth_access_tokens_task
scheduled in ext_celery.

Spec: docs/specs/v1.0/server/{device-flow,tokens,middleware,security}.md
2026-04-26 20:06:43 -07:00
383 changed files with 12913 additions and 8773 deletions

View File

@ -200,7 +200,7 @@ When assigned to test a directory/path, test **ALL content** within that path:
-**Import real project components** directly (including base components and siblings)
-**Only mock**: API services (`@/service/*`), `next/navigation`, complex context providers
-**DO NOT mock** base components (`@/app/components/base/*`) or dify-ui primitives (`@langgenius/dify-ui/*`)
-**DO NOT mock** base components (`@/app/components/base/*`)
-**DO NOT mock** sibling/child components in the same directory
> See [Test Structure Template](#test-structure-template) for correct import/mock patterns.
@ -325,12 +325,12 @@ For more detailed information, refer to:
### Reference Examples in Codebase
- `web/utils/classnames.spec.ts` - Utility function tests
- `web/app/components/base/radio/__tests__/index.spec.tsx` - Component tests
- `web/app/components/base/button/index.spec.tsx` - Component tests
- `web/__mocks__/provider-context.ts` - Mock factory example
### Project Configuration
- `web/vite.config.ts` - Vite/Vitest configuration
- `web/vitest.config.ts` - Vitest configuration
- `web/vitest.setup.ts` - Test environment setup
- `web/scripts/analyze-component.js` - Component analysis tool
- Modules are not mocked automatically. Global mocks live in `web/vitest.setup.ts` (for example `react-i18next`, `next/image`); mock other modules like `ky` or `mime` locally in test files.

View File

@ -36,7 +36,7 @@ Use this checklist when generating or reviewing tests for Dify frontend componen
### Integration vs Mocking
- [ ] **DO NOT mock base components or dify-ui primitives** (base `Loading`, `Input`, `Badge`; dify-ui `Button`, `Tooltip`, `Dialog`, etc.)
- [ ] **DO NOT mock base components** (`Loading`, `Button`, `Tooltip`, etc.)
- [ ] Import real project components instead of mocking
- [ ] Only mock: API calls, complex context providers, third-party libs with side effects
- [ ] Prefer integration testing when using single spec file
@ -73,7 +73,7 @@ Use this checklist when generating or reviewing tests for Dify frontend componen
### Mocks
- [ ] **DO NOT mock base components or dify-ui primitives** (`@/app/components/base/*` or `@langgenius/dify-ui/*`)
- [ ] **DO NOT mock base components** (`@/app/components/base/*`)
- [ ] `vi.clearAllMocks()` in `beforeEach` (not `afterEach`)
- [ ] Shared mock state reset in `beforeEach`
- [ ] i18n uses global mock (auto-loaded in `web/vitest.setup.ts`); only override locally for custom translations

View File

@ -2,27 +2,29 @@
## ⚠️ Important: What NOT to Mock
### DO NOT Mock Base Components or dify-ui Primitives
### DO NOT Mock Base Components
**Never mock components from `@/app/components/base/` or from `@langgenius/dify-ui/*`** such as:
**Never mock components from `@/app/components/base/`** such as:
- Legacy base (`@/app/components/base/*`): `Loading`, `Spinner`, `Input`, `Badge`, `Tag`
- dify-ui primitives (`@langgenius/dify-ui/*`): `Button`, `Tooltip`, `Dialog`, `Popover`, `DropdownMenu`, `ContextMenu`, `Select`, `AlertDialog`, `Toast`
- `Loading`, `Spinner`
- `Button`, `Input`, `Select`
- `Tooltip`, `Modal`, `Dropdown`
- `Icon`, `Badge`, `Tag`
**Why?**
- These components have their own dedicated tests
- Base components will have their own dedicated tests
- Mocking them creates false positives (tests pass but real integration fails)
- Using real components tests actual integration behavior
```typescript
// ❌ WRONG: Don't mock base components or dify-ui primitives
// ❌ WRONG: Don't mock base components
vi.mock('@/app/components/base/loading', () => () => <div>Loading</div>)
vi.mock('@langgenius/dify-ui/button', () => ({ Button: ({ children }: any) => <button>{children}</button> }))
vi.mock('@/app/components/base/button', () => ({ children }: any) => <button>{children}</button>)
// ✅ CORRECT: Import and use the real components
// ✅ CORRECT: Import and use real base components
import Loading from '@/app/components/base/loading'
import { Button } from '@langgenius/dify-ui/button'
import Button from '@/app/components/base/button'
// They will render normally in tests
```
@ -317,7 +319,7 @@ const renderWithQueryClient = (ui: React.ReactElement) => {
### ✅ DO
1. **Use real base components and dify-ui primitives** - Import from `@/app/components/base/` or `@langgenius/dify-ui/*` directly
1. **Use real base components** - Import from `@/app/components/base/` directly
1. **Use real project components** - Prefer importing over mocking
1. **Use real Zustand stores** - Set test state via `store.setState()`
1. **Reset mocks in `beforeEach`**, not `afterEach`
@ -328,7 +330,7 @@ const renderWithQueryClient = (ui: React.ReactElement) => {
### ❌ DON'T
1. **Don't mock base components or dify-ui primitives** (`Loading`, `Input`, `Button`, `Tooltip`, `Dialog`, etc.)
1. **Don't mock base components** (`Loading`, `Button`, `Tooltip`, etc.)
1. **Don't mock Zustand store modules** - Use real stores with `setState()`
1. Don't mock components you can import directly
1. Don't create overly simplified mocks that miss conditional logic
@ -340,7 +342,7 @@ const renderWithQueryClient = (ui: React.ReactElement) => {
```
Need to use a component in test?
├─ Is it from @/app/components/base/* or @langgenius/dify-ui/*?
├─ Is it from @/app/components/base/*?
│ └─ YES → Import real component, DO NOT mock
├─ Is it a project component?

19
.github/workflows/anti-slop.yml vendored Normal file
View File

@ -0,0 +1,19 @@
name: Anti-Slop PR Check
on:
pull_request_target:
types: [opened, edited, synchronize]
permissions:
pull-requests: write
contents: read
jobs:
anti-slop:
runs-on: ubuntu-latest
steps:
- uses: peakoss/anti-slop@85daca1880e9e1af197fc06ea03349daf08f4202 # v0.2.1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
close-pr: false
failure-add-pr-labels: "needs-revision"

View File

@ -35,7 +35,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}
@ -84,7 +84,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}
@ -105,7 +105,7 @@ jobs:
run: sh .github/workflows/expose_service_ports.sh
- name: Set up Sandbox
uses: hoverkraft-tech/compose-action@d2bee4f07e8ca410d6b196d00f90c12e7d48c33a # v2.6.0
uses: hoverkraft-tech/compose-action@4894d2492015c1774ee5a13a95b1072093087ec3 # v2.5.0
with:
compose-file: |
docker/docker-compose.middleware.yaml
@ -156,7 +156,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: "3.12"

View File

@ -25,7 +25,7 @@ jobs:
- name: Check Docker Compose inputs
if: github.event_name != 'merge_group'
id: docker-compose-changes
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
docker/generate_docker_compose
@ -35,7 +35,7 @@ jobs:
- name: Check web inputs
if: github.event_name != 'merge_group'
id: web-changes
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
web/**
@ -48,7 +48,7 @@ jobs:
- name: Check api inputs
if: github.event_name != 'merge_group'
id: api-changes
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
api/**
@ -58,7 +58,7 @@ jobs:
python-version: "3.11"
- if: github.event_name != 'merge_group'
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
- name: Generate Docker Compose
if: github.event_name != 'merge_group' && steps.docker-compose-changes.outputs.any_changed == 'true'
@ -123,4 +123,4 @@ jobs:
vp exec eslint --concurrency=2 --prune-suppressions --quiet || true
- if: github.event_name != 'merge_group'
uses: autofix-ci/action@c5b2d67aa2274e7b5a18224e8171550871fc7e4a # v1.3.4
uses: autofix-ci/action@7a166d7532b277f34e16238930461bf77f9d7ed8 # v1.3.3

View File

@ -19,7 +19,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: "3.12"
@ -40,7 +40,7 @@ jobs:
cp middleware.env.example middleware.env
- name: Set up Middlewares
uses: hoverkraft-tech/compose-action@d2bee4f07e8ca410d6b196d00f90c12e7d48c33a # v2.6.0
uses: hoverkraft-tech/compose-action@4894d2492015c1774ee5a13a95b1072093087ec3 # v2.5.0
with:
compose-file: |
docker/docker-compose.middleware.yaml
@ -69,7 +69,7 @@ jobs:
persist-credentials: false
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: "3.12"
@ -94,7 +94,7 @@ jobs:
sed -i 's/DB_USERNAME=postgres/DB_USERNAME=mysql/' middleware.env
- name: Set up Middlewares
uses: hoverkraft-tech/compose-action@d2bee4f07e8ca410d6b196d00f90c12e7d48c33a # v2.6.0
uses: hoverkraft-tech/compose-action@4894d2492015c1774ee5a13a95b1072093087ec3 # v2.5.0
with:
compose-file: |
docker/docker-compose.middleware.yaml

View File

@ -22,7 +22,7 @@ jobs:
fetch-depth: 0
- name: Setup Python & UV
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true

View File

@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Setup Python & UV
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true

View File

@ -22,7 +22,7 @@ jobs:
fetch-depth: 0
- name: Setup Python & UV
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true

View File

@ -25,7 +25,7 @@ jobs:
- name: Check changed files
id: changed-files
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
api/**
@ -33,7 +33,7 @@ jobs:
- name: Setup UV and Python
if: steps.changed-files.outputs.any_changed == 'true'
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: false
python-version: "3.12"
@ -73,7 +73,7 @@ jobs:
- name: Check changed files
id: changed-files
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
web/**
@ -95,7 +95,7 @@ jobs:
- name: Restore ESLint cache
if: steps.changed-files.outputs.any_changed == 'true'
id: eslint-cache-restore
uses: actions/cache/restore@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache/restore@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: .eslintcache
key: ${{ runner.os }}-eslint-${{ hashFiles('pnpm-lock.yaml', 'eslint.config.mjs', 'web/eslint.config.mjs', 'web/eslint.constants.mjs', 'web/plugins/eslint/**') }}-${{ github.sha }}
@ -124,7 +124,7 @@ jobs:
- name: Save ESLint cache
if: steps.changed-files.outputs.any_changed == 'true' && success() && steps.eslint-cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache/save@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: .eslintcache
key: ${{ steps.eslint-cache-restore.outputs.cache-primary-key }}
@ -142,7 +142,7 @@ jobs:
- name: Check changed files
id: changed-files
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
**.sh

View File

@ -30,7 +30,7 @@ jobs:
persist-credentials: false
- name: Use Node.js
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # v6.3.0
with:
node-version: 22
cache: ''

View File

@ -158,7 +158,7 @@ jobs:
- name: Run Claude Code for Translation Sync
if: steps.context.outputs.CHANGED_FILES != ''
uses: anthropics/claude-code-action@38ec876110f9fbf8b950c79f534430740c3ac009 # v1.0.101
uses: anthropics/claude-code-action@b47fd721da662d48c5680e154ad16a73ed74d2e0 # v1.0.93
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -36,7 +36,7 @@ jobs:
remove_tool_cache: true
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}
@ -65,7 +65,7 @@ jobs:
# tiflash
- name: Set up Full Vector Store Matrix
uses: hoverkraft-tech/compose-action@d2bee4f07e8ca410d6b196d00f90c12e7d48c33a # v2.6.0
uses: hoverkraft-tech/compose-action@4894d2492015c1774ee5a13a95b1072093087ec3 # v2.5.0
with:
compose-file: |
docker/docker-compose.yaml

View File

@ -33,7 +33,7 @@ jobs:
remove_tool_cache: true
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}
@ -62,7 +62,7 @@ jobs:
# tiflash
- name: Set up Vector Stores for Smoke Coverage
uses: hoverkraft-tech/compose-action@d2bee4f07e8ca410d6b196d00f90c12e7d48c33a # v2.6.0
uses: hoverkraft-tech/compose-action@4894d2492015c1774ee5a13a95b1072093087ec3 # v2.5.0
with:
compose-file: |
docker/docker-compose.yaml

View File

@ -28,7 +28,7 @@ jobs:
uses: ./.github/actions/setup-web
- name: Setup UV and Python
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0
with:
enable-cache: true
python-version: "3.12"

4
.gitignore vendored
View File

@ -237,10 +237,6 @@ scripts/stress-test/reports/
.playwright-mcp/
.serena/
# vitest browser mode attachments (failure screenshots, traces, etc.)
.vitest-attachments/
**/__screenshots__/
# settings
*.local.json
*.local.md

View File

@ -159,6 +159,7 @@ def initialize_extensions(app: DifyApp):
ext_logstore,
ext_mail,
ext_migrate,
ext_oauth_bearer,
ext_orjson,
ext_otel,
ext_proxy_fix,
@ -203,6 +204,7 @@ def initialize_extensions(app: DifyApp):
ext_enterprise_telemetry,
ext_request_logging,
ext_session_factory,
ext_oauth_bearer,
]
for ext in extensions:
short_name = ext.__name__.split(".")[-1]

View File

@ -499,6 +499,35 @@ class HttpConfig(BaseSettings):
def WEB_API_CORS_ALLOW_ORIGINS(self) -> list[str]:
return self.inner_WEB_API_CORS_ALLOW_ORIGINS.split(",")
inner_OPENAPI_CORS_ALLOW_ORIGINS: str = Field(
description=(
"Comma-separated allowlist for /openapi/v1/* CORS. "
"Default empty = same-origin only. Browser-cookie routes within "
"the group reject cross-origin OPTIONS regardless of this list."
),
validation_alias=AliasChoices("OPENAPI_CORS_ALLOW_ORIGINS"),
default="",
)
@computed_field
def OPENAPI_CORS_ALLOW_ORIGINS(self) -> list[str]:
return [o for o in self.inner_OPENAPI_CORS_ALLOW_ORIGINS.split(",") if o]
inner_OPENAPI_KNOWN_CLIENT_IDS: str = Field(
description=(
"Comma-separated client_id values accepted at "
"POST /openapi/v1/oauth/device/code. New CLIs / SDKs added here "
"without code changes. Unknown client_id returns 400 unsupported_client."
),
validation_alias=AliasChoices("OPENAPI_KNOWN_CLIENT_IDS"),
default="difyctl",
)
@computed_field # type: ignore[misc]
@property
def OPENAPI_KNOWN_CLIENT_IDS(self) -> frozenset[str]:
return frozenset(c for c in self.inner_OPENAPI_KNOWN_CLIENT_IDS.split(",") if c)
HTTP_REQUEST_MAX_CONNECT_TIMEOUT: int = Field(
ge=1, description="Maximum connection timeout in seconds for HTTP requests", default=10
)
@ -874,6 +903,17 @@ class AuthConfig(BaseSettings):
default=86400,
)
ENABLE_OAUTH_BEARER: bool = Field(
description="Enable OAuth bearer authentication (device-flow + Service API /v1/* bearer middleware).",
default=True,
)
OPENAPI_RATE_LIMIT_PER_TOKEN: PositiveInt = Field(
description="Per-token rate limit on /openapi/v1/* (requests per minute). "
"Bucket keyed on sha256(token), shared across api replicas via Redis.",
default=60,
)
class ModerationConfig(BaseSettings):
"""
@ -1148,6 +1188,14 @@ class CeleryScheduleTasksConfig(BaseSettings):
description="Enable scheduled workflow run cleanup task",
default=False,
)
ENABLE_CLEAN_OAUTH_ACCESS_TOKENS_TASK: bool = Field(
description="Enable scheduled cleanup of revoked/expired OAuth access-token rows past retention.",
default=True,
)
OAUTH_ACCESS_TOKEN_RETENTION_DAYS: PositiveInt = Field(
description="Days to retain revoked OAuth access-token rows before deletion.",
default=30,
)
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: bool = Field(
description="Enable mail clean document notify task",
default=False,

View File

@ -595,25 +595,13 @@ class ChangeEmailSendEmailApi(Resource):
account = None
user_email = None
email_for_sending = args.email.lower()
# Default to the initial phase; any legacy/unexpected client input is
# coerced back to `old_email` so we never trust the caller to declare
# later phases without a verified predecessor token.
send_phase = AccountService.CHANGE_EMAIL_PHASE_OLD
if args.phase is not None and args.phase == AccountService.CHANGE_EMAIL_PHASE_NEW:
send_phase = AccountService.CHANGE_EMAIL_PHASE_NEW
if args.phase is not None and args.phase == "new_email":
if args.token is None:
raise InvalidTokenError()
reset_data = AccountService.get_change_email_data(args.token)
if reset_data is None:
raise InvalidTokenError()
# The token used to request a new-email code must come from the
# old-email verification step. This prevents the bypass described
# in GHSA-4q3w-q5mc-45rq where the phase-1 token was reused here.
token_phase = reset_data.get(AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY)
if token_phase != AccountService.CHANGE_EMAIL_PHASE_OLD_VERIFIED:
raise InvalidTokenError()
user_email = reset_data.get("email", "")
if user_email.lower() != current_user.email.lower():
@ -632,7 +620,7 @@ class ChangeEmailSendEmailApi(Resource):
email=email_for_sending,
old_email=user_email,
language=language,
phase=send_phase,
phase=args.phase,
)
return {"result": "success", "data": token}
@ -667,31 +655,12 @@ class ChangeEmailCheckApi(Resource):
AccountService.add_change_email_error_rate_limit(user_email)
raise EmailCodeError()
# Only advance tokens that were minted by the matching send-code step;
# refuse tokens that have already progressed or lack a phase marker so
# the chain `old_email -> old_email_verified -> new_email -> new_email_verified`
# is strictly enforced.
phase_transitions = {
AccountService.CHANGE_EMAIL_PHASE_OLD: AccountService.CHANGE_EMAIL_PHASE_OLD_VERIFIED,
AccountService.CHANGE_EMAIL_PHASE_NEW: AccountService.CHANGE_EMAIL_PHASE_NEW_VERIFIED,
}
token_phase = token_data.get(AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY)
if not isinstance(token_phase, str):
raise InvalidTokenError()
refreshed_phase = phase_transitions.get(token_phase)
if refreshed_phase is None:
raise InvalidTokenError()
# Verified, revoke the first token
AccountService.revoke_change_email_token(args.token)
# Refresh token data by generating a new token that carries the
# upgraded phase so later steps can check it.
# Refresh token data by generating a new token
_, new_token = AccountService.generate_change_email_token(
user_email,
code=args.code,
old_email=token_data.get("old_email"),
additional_data={AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: refreshed_phase},
user_email, code=args.code, old_email=token_data.get("old_email"), additional_data={}
)
AccountService.reset_change_email_error_rate_limit(user_email)
@ -721,29 +690,13 @@ class ChangeEmailResetApi(Resource):
if not reset_data:
raise InvalidTokenError()
# Only tokens that completed both verification phases may be used to
# change the email. This closes GHSA-4q3w-q5mc-45rq where a token from
# the initial send-code step could be replayed directly here.
token_phase = reset_data.get(AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY)
if token_phase != AccountService.CHANGE_EMAIL_PHASE_NEW_VERIFIED:
raise InvalidTokenError()
# Bind the new email to the token that was mailed and verified, so a
# verified token cannot be reused with a different `new_email` value.
token_email = reset_data.get("email")
normalized_token_email = token_email.lower() if isinstance(token_email, str) else token_email
if normalized_token_email != normalized_new_email:
raise InvalidTokenError()
AccountService.revoke_change_email_token(args.token)
old_email = reset_data.get("old_email", "")
current_user, _ = current_account_with_tenant()
if current_user.email.lower() != old_email.lower():
raise AccountNotFound()
# Revoke only after all checks pass so failed attempts don't burn a
# legitimately verified token.
AccountService.revoke_change_email_token(args.token)
updated_account = AccountService.update_account_email(current_user, email=normalized_new_email)
AccountService.send_change_email_completed_notify_email(

View File

@ -0,0 +1,41 @@
from flask import Blueprint
from flask_restx import Namespace
from libs.device_flow_security import attach_anti_framing
from libs.external_api import ExternalApi
bp = Blueprint("openapi", __name__, url_prefix="/openapi/v1")
attach_anti_framing(bp)
api = ExternalApi(
bp,
version="1.0",
title="OpenAPI",
description="User-scoped programmatic API (bearer auth)",
)
openapi_ns = Namespace("openapi", description="User-scoped operations", path="/")
from . import (
account,
app_run,
apps,
apps_permitted,
index,
oauth_device,
oauth_device_sso,
workspaces,
)
__all__ = [
"account",
"app_run",
"apps",
"apps_permitted",
"index",
"oauth_device",
"oauth_device_sso",
"workspaces",
]
api.add_namespace(openapi_ns)

View File

@ -0,0 +1,33 @@
"""Audit emission for openapi app-run endpoints.
Pattern: logger.info with extra={"audit": True, "event": "app.run.openapi", ...}
matches the existing oauth_device convention. The EE OTel exporter consults
its own allowlist to decide whether to ship the line.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
EVENT_APP_RUN_OPENAPI = "app.run.openapi"
def emit_app_run(*, app_id: str, tenant_id: str, caller_kind: str, mode: str) -> None:
logger.info(
"audit: %s app_id=%s tenant_id=%s caller_kind=%s mode=%s",
EVENT_APP_RUN_OPENAPI,
app_id,
tenant_id,
caller_kind,
mode,
extra={
"audit": True,
"event": EVENT_APP_RUN_OPENAPI,
"app_id": app_id,
"tenant_id": tenant_id,
"caller_kind": caller_kind,
"mode": mode,
},
)

View File

@ -0,0 +1,143 @@
"""Server-side JSON Schema derivation from Dify `user_input_form`."""
from __future__ import annotations
from typing import Any, cast
from controllers.service_api.app.error import AppUnavailableError
from models import App
from models.model import AppMode
JSON_SCHEMA_DRAFT = "https://json-schema.org/draft/2020-12/schema"
EMPTY_INPUT_SCHEMA: dict[str, Any] = {
"$schema": JSON_SCHEMA_DRAFT,
"type": "object",
"properties": {},
"required": [],
}
_CHAT_FAMILY = frozenset({AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT})
def _file_object_shape() -> dict[str, Any]:
"""Single-file value shape. Forward-compat placeholder; refine when file-API contract pins."""
return {
"type": "object",
"properties": {
"type": {"type": "string"},
"transfer_method": {"type": "string"},
"url": {"type": "string"},
"upload_file_id": {"type": "string"},
},
"additionalProperties": True,
}
def _row_to_schema(row_type: str, row: dict[str, Any]) -> dict[str, Any] | None:
label = row.get("label") or row.get("variable", "")
base: dict[str, Any] = {"title": label} if label else {}
if row_type in ("text-input", "paragraph"):
out = {"type": "string"} | base
max_length = row.get("max_length")
if isinstance(max_length, int) and max_length > 0:
out["maxLength"] = max_length
return out
if row_type == "select":
return {"type": "string"} | base | {"enum": list(row.get("options") or [])}
if row_type == "number":
return {"type": "number"} | base
if row_type == "file":
return _file_object_shape() | base
if row_type == "file-list":
return {
"type": "array",
"items": _file_object_shape(),
} | base
return None
def _form_to_jsonschema(form: list[dict[str, Any]]) -> tuple[dict[str, Any], list[str]]:
"""Translate a user_input_form row list into (properties, required-list).
Each row is a single-key dict: `{"text-input": {variable, label, required, ...}}`.
Unknown variable types are skipped (forward-compat).
"""
properties: dict[str, Any] = {}
required: list[str] = []
for row in form:
if not isinstance(row, dict) or len(row) != 1:
continue
((row_type, row_body),) = row.items()
if not isinstance(row_body, dict):
continue
variable = row_body.get("variable")
if not variable:
continue
schema = _row_to_schema(row_type, row_body)
if schema is None:
continue
properties[variable] = schema
if row_body.get("required"):
required.append(variable)
return properties, required
def resolve_app_config(app: App) -> tuple[dict[str, Any], list[dict[str, Any]]]:
"""Resolve `(features_dict, user_input_form)` for parameters / schema derivation.
Raises `AppUnavailableError` on misconfigured apps.
"""
if app.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow = app.workflow
if workflow is None:
raise AppUnavailableError()
return (
workflow.features_dict,
cast(list[dict[str, Any]], workflow.user_input_form(to_old_structure=True)),
)
app_model_config = app.app_model_config
if app_model_config is None:
raise AppUnavailableError()
features_dict = cast(dict[str, Any], app_model_config.to_dict())
return features_dict, cast(list[dict[str, Any]], features_dict.get("user_input_form", []))
def build_input_schema(app: App) -> dict[str, Any]:
"""Derive Draft 2020-12 JSON Schema from `user_input_form` + app mode.
chat / agent-chat / advanced-chat: top-level `query` (required, minLength=1) + `inputs` object.
completion / workflow: `inputs` object only.
Raises `AppUnavailableError` on misconfigured apps.
"""
_, user_input_form = resolve_app_config(app)
inputs_props, inputs_required = _form_to_jsonschema(user_input_form)
properties: dict[str, Any] = {}
required: list[str] = []
if app.mode in _CHAT_FAMILY:
properties["query"] = {"type": "string", "minLength": 1}
required.append("query")
properties["inputs"] = {
"type": "object",
"properties": inputs_props,
"required": inputs_required,
"additionalProperties": False,
}
required.append("inputs")
return {
"$schema": JSON_SCHEMA_DRAFT,
"type": "object",
"properties": properties,
"required": required,
}

View File

@ -0,0 +1,112 @@
"""Shared response substructures for openapi endpoints."""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
# Server-side cap on `limit` query param for any /openapi/v1/* list endpoint.
# Sibling endpoints (`/apps`, `/account/sessions`, future routes) all clamp to
# this; do not introduce per-endpoint caps without raising the constant.
MAX_PAGE_LIMIT = 200
class UsageInfo(BaseModel):
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
class MessageMetadata(BaseModel):
usage: UsageInfo | None = None
retriever_resources: list[dict[str, Any]] = []
class PaginationEnvelope[T](BaseModel):
"""Canonical pagination envelope for `/openapi/v1/*` list endpoints."""
page: int
limit: int
total: int
has_more: bool
data: list[T]
@classmethod
def build(cls, *, page: int, limit: int, total: int, items: list[T]) -> PaginationEnvelope[T]:
return cls(page=page, limit=limit, total=total, has_more=page * limit < total, data=items)
class AppListRow(BaseModel):
id: str
name: str
description: str | None = None
mode: str
tags: list[dict[str, str]] = []
updated_at: str | None = None
created_by_name: str | None = None
workspace_id: str | None = None
workspace_name: str | None = None
class AppInfoResponse(BaseModel):
id: str
name: str
description: str | None = None
mode: str
author: str | None = None
tags: list[dict[str, str]] = []
class AppDescribeInfo(AppInfoResponse):
updated_at: str | None = None
service_api_enabled: bool
class AppDescribeResponse(BaseModel):
info: AppDescribeInfo | None = None
parameters: dict[str, Any] | None = None
input_schema: dict[str, Any] | None = None
class ChatMessageResponse(BaseModel):
event: str
task_id: str
id: str
message_id: str
conversation_id: str
mode: str
answer: str
metadata: MessageMetadata = Field(default_factory=MessageMetadata)
created_at: int
class CompletionMessageResponse(BaseModel):
event: str
task_id: str
id: str
message_id: str
mode: str
answer: str
metadata: MessageMetadata = Field(default_factory=MessageMetadata)
created_at: int
class WorkflowRunData(BaseModel):
id: str
workflow_id: str
status: str
outputs: dict[str, Any] = Field(default_factory=dict)
error: str | None = None
elapsed_time: float | None = None
total_tokens: int | None = None
total_steps: int | None = None
created_at: int | None = None
finished_at: int | None = None
class WorkflowRunResponse(BaseModel):
workflow_run_id: str
task_id: str
mode: Literal["workflow"] = "workflow"
data: WorkflowRunData

View File

@ -0,0 +1,236 @@
"""User-scoped account endpoints. /account is the bearer-authed
identity read; /account/sessions and /account/sessions/<id> manage
the user's active OAuth tokens.
"""
from __future__ import annotations
from datetime import UTC, datetime
from flask import g, request
from flask_restx import Resource
from sqlalchemy import and_, select, update
from werkzeug.exceptions import BadRequest, NotFound
from controllers.openapi import openapi_ns
from controllers.openapi._models import MAX_PAGE_LIMIT, PaginationEnvelope
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
TOKEN_CACHE_KEY_FMT,
AuthContext,
SubjectType,
validate_bearer,
)
from libs.rate_limit import (
LIMIT_ME_PER_ACCOUNT,
LIMIT_ME_PER_EMAIL,
enforce,
)
from models import Account, OAuthAccessToken, Tenant, TenantAccountJoin
@openapi_ns.route("/account")
class AccountApi(Resource):
@validate_bearer(accept=ACCEPT_USER_ANY)
def get(self):
ctx = g.auth_ctx
if ctx.subject_type == SubjectType.EXTERNAL_SSO:
enforce(LIMIT_ME_PER_EMAIL, key=f"subject:{ctx.subject_email}")
else:
enforce(LIMIT_ME_PER_ACCOUNT, key=f"account:{ctx.account_id}")
if ctx.subject_type == SubjectType.EXTERNAL_SSO:
return {
"subject_type": ctx.subject_type,
"subject_email": ctx.subject_email,
"subject_issuer": ctx.subject_issuer,
"account": None,
"workspaces": [],
"default_workspace_id": None,
}
account = (
db.session.query(Account).filter(Account.id == ctx.account_id).one_or_none() if ctx.account_id else None
)
memberships = _load_memberships(ctx.account_id) if ctx.account_id else []
default_ws_id = _pick_default_workspace(memberships)
return {
"subject_type": ctx.subject_type,
"subject_email": ctx.subject_email or (account.email if account else None),
"account": _account_payload(account) if account else None,
"workspaces": [_workspace_payload(m) for m in memberships],
"default_workspace_id": default_ws_id,
}
@openapi_ns.route("/account/sessions/self")
class AccountSessionsSelfApi(Resource):
@validate_bearer(accept=ACCEPT_USER_ANY)
def delete(self):
ctx = g.auth_ctx
_require_oauth_subject(ctx)
_revoke_token_by_id(str(ctx.token_id))
return {"status": "revoked"}, 200
@openapi_ns.route("/account/sessions")
class AccountSessionsApi(Resource):
@validate_bearer(accept=ACCEPT_USER_ANY)
def get(self):
ctx = g.auth_ctx
now = datetime.now(UTC)
page = int(request.args.get("page", "1"))
limit = min(int(request.args.get("limit", "100")), MAX_PAGE_LIMIT)
all_rows = db.session.execute(
select(
OAuthAccessToken.id,
OAuthAccessToken.prefix,
OAuthAccessToken.client_id,
OAuthAccessToken.device_label,
OAuthAccessToken.created_at,
OAuthAccessToken.last_used_at,
OAuthAccessToken.expires_at,
)
.where(
and_(
*_subject_match(ctx),
OAuthAccessToken.revoked_at.is_(None),
OAuthAccessToken.token_hash.is_not(None),
OAuthAccessToken.expires_at > now,
)
)
.order_by(OAuthAccessToken.created_at.desc())
).all()
total = len(all_rows)
sliced = all_rows[(page - 1) * limit : page * limit]
items = [
{
"id": str(r.id),
"prefix": r.prefix,
"client_id": r.client_id,
"device_label": r.device_label,
"created_at": _iso(r.created_at),
"last_used_at": _iso(r.last_used_at),
"expires_at": _iso(r.expires_at),
}
for r in sliced
]
return (
PaginationEnvelope.build(page=page, limit=limit, total=total, items=items).model_dump(mode="json"),
200,
)
@openapi_ns.route("/account/sessions/<string:session_id>")
class AccountSessionByIdApi(Resource):
@validate_bearer(accept=ACCEPT_USER_ANY)
def delete(self, session_id: str):
ctx = g.auth_ctx
_require_oauth_subject(ctx)
# Subject-match guard. 404 (not 403) on cross-subject so the
# endpoint doesn't leak token IDs that belong to other subjects.
owns = db.session.execute(
select(OAuthAccessToken.id).where(
and_(
OAuthAccessToken.id == session_id,
*_subject_match(ctx),
)
)
).first()
if owns is None:
raise NotFound("session not found")
_revoke_token_by_id(session_id)
return {"status": "revoked"}, 200
def _subject_match(ctx: AuthContext) -> tuple:
"""Where-clauses that scope a query to the bearer's subject. Works
for both account (account_id) and external_sso (email + issuer).
"""
if ctx.subject_type == SubjectType.ACCOUNT:
return (OAuthAccessToken.account_id == str(ctx.account_id),)
return (
OAuthAccessToken.subject_email == ctx.subject_email,
OAuthAccessToken.subject_issuer == ctx.subject_issuer,
OAuthAccessToken.account_id.is_(None),
)
def _require_oauth_subject(ctx: AuthContext) -> None:
if not ctx.source.startswith("oauth"):
raise BadRequest(
"this endpoint revokes OAuth bearer tokens; use /openapi/v1/personal-access-tokens/self for PATs"
)
def _revoke_token_by_id(token_id: str) -> None:
# Snapshot pre-revoke hash for cache invalidation; UPDATE WHERE
# makes double-revoke idempotent.
row = (
db.session.query(OAuthAccessToken.token_hash)
.filter(
OAuthAccessToken.id == token_id,
OAuthAccessToken.revoked_at.is_(None),
)
.one_or_none()
)
pre_revoke_hash = row[0] if row else None
stmt = (
update(OAuthAccessToken)
.where(
OAuthAccessToken.id == token_id,
OAuthAccessToken.revoked_at.is_(None),
)
.values(revoked_at=datetime.now(UTC), token_hash=None)
)
db.session.execute(stmt)
db.session.commit()
if pre_revoke_hash:
redis_client.delete(TOKEN_CACHE_KEY_FMT.format(hash=pre_revoke_hash))
def _iso(dt: datetime | None) -> str | None:
if dt is None:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt.isoformat().replace("+00:00", "Z")
def _load_memberships(account_id):
return (
db.session.query(TenantAccountJoin, Tenant)
.join(Tenant, Tenant.id == TenantAccountJoin.tenant_id)
.filter(TenantAccountJoin.account_id == account_id)
.all()
)
def _pick_default_workspace(memberships) -> str | None:
if not memberships:
return None
for join, tenant in memberships:
if getattr(join, "current", False):
return str(tenant.id)
return str(memberships[0][1].id)
def _workspace_payload(row) -> dict:
join, tenant = row
return {"id": str(tenant.id), "name": tenant.name, "role": getattr(join, "role", "")}
def _account_payload(account) -> dict:
return {"id": str(account.id), "email": account.email, "name": account.name}

View File

@ -0,0 +1,198 @@
"""POST /openapi/v1/apps/<app_id>/run — mode-agnostic runner."""
from __future__ import annotations
import logging
from collections.abc import Callable, Iterator, Mapping
from contextlib import contextmanager
from typing import Any, Literal
from uuid import UUID
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, ValidationError, field_validator
from werkzeug.exceptions import BadRequest, HTTPException, InternalServerError, NotFound, UnprocessableEntity
import services
from controllers.openapi import openapi_ns
from controllers.openapi._audit import emit_app_run
from controllers.openapi._models import (
ChatMessageResponse,
CompletionMessageResponse,
WorkflowRunResponse,
)
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
from controllers.service_api.app.error import (
AppUnavailableError,
CompletionRequestError,
ConversationCompletedError,
ProviderModelCurrentlyNotSupportError,
ProviderNotInitializeError,
ProviderQuotaExceededError,
)
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
)
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import UUIDStrOrEmpty
from libs.oauth_bearer import Scope
from models.model import App, AppMode
from services.app_generate_service import AppGenerateService
from services.errors.app import (
IsDraftWorkflowError,
WorkflowIdFormatError,
WorkflowNotFoundError,
)
from services.errors.llm import InvokeRateLimitError
logger = logging.getLogger(__name__)
class AppRunRequest(BaseModel):
inputs: dict[str, Any]
query: str | None = None
files: list[dict[str, Any]] | None = None
response_mode: Literal["blocking", "streaming"] | None = None
conversation_id: UUIDStrOrEmpty | None = None
auto_generate_name: bool = True
workflow_id: str | None = None
@field_validator("conversation_id", mode="before")
@classmethod
def _normalize_conv(cls, value: str | UUID | None) -> str | None:
if isinstance(value, str):
value = value.strip()
if not value:
return None
try:
return helper.uuid_value(value)
except ValueError as exc:
raise ValueError("conversation_id must be a valid UUID") from exc
@contextmanager
def _translate_service_errors() -> Iterator[None]:
try:
yield
except WorkflowNotFoundError as ex:
raise NotFound(str(ex))
except (IsDraftWorkflowError, WorkflowIdFormatError) as ex:
raise BadRequest(str(ex))
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except services.errors.app_model_config.AppModelConfigBrokenError:
logger.exception("App model config broken.")
raise AppUnavailableError()
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except InvokeError as e:
raise CompletionRequestError(e.description)
def _unpack_blocking(response: Any) -> Mapping[str, Any]:
if isinstance(response, tuple):
response = response[0]
if not isinstance(response, Mapping):
raise InternalServerError("blocking generate returned non-mapping response")
return response
def _generate(app: App, caller: Any, args: dict[str, Any], streaming: bool):
return AppGenerateService.generate(
app_model=app,
user=caller,
args=args,
invoke_from=InvokeFrom.OPENAPI,
streaming=streaming,
)
def _run_chat(app: App, caller: Any, payload: AppRunRequest, streaming: bool):
if not payload.query or not payload.query.strip():
raise UnprocessableEntity("query_required_for_chat")
args = payload.model_dump(exclude_none=True)
with _translate_service_errors():
response = _generate(app, caller, args, streaming)
if streaming:
return response, None
return None, ChatMessageResponse.model_validate(_unpack_blocking(response)).model_dump(mode="json")
def _run_completion(app: App, caller: Any, payload: AppRunRequest, streaming: bool):
args = payload.model_dump(exclude_none=True)
args["auto_generate_name"] = False
args.setdefault("query", "")
with _translate_service_errors():
response = _generate(app, caller, args, streaming)
if streaming:
return response, None
return None, CompletionMessageResponse.model_validate(_unpack_blocking(response)).model_dump(mode="json")
def _run_workflow(app: App, caller: Any, payload: AppRunRequest, streaming: bool):
if payload.query is not None:
raise UnprocessableEntity("query_not_supported_for_workflow")
args = payload.model_dump(exclude={"query", "conversation_id", "auto_generate_name"}, exclude_none=True)
with _translate_service_errors():
response = _generate(app, caller, args, streaming)
if streaming:
return response, None
return None, WorkflowRunResponse.model_validate(_unpack_blocking(response)).model_dump(mode="json")
_DISPATCH: dict[AppMode, Callable[[App, Any, AppRunRequest, bool], tuple[Any, dict[str, Any] | None]]] = {
AppMode.CHAT: _run_chat,
AppMode.AGENT_CHAT: _run_chat,
AppMode.ADVANCED_CHAT: _run_chat,
AppMode.COMPLETION: _run_completion,
AppMode.WORKFLOW: _run_workflow,
}
@openapi_ns.route("/apps/<string:app_id>/run")
class AppRunApi(Resource):
@OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
def post(self, app_id: str, app_model: App, caller, caller_kind: str):
body = request.get_json(silent=True) or {}
body.pop("user", None)
try:
payload = AppRunRequest.model_validate(body)
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
handler = _DISPATCH.get(app_model.mode)
if handler is None:
raise UnprocessableEntity("mode_not_runnable")
streaming = payload.response_mode == "streaming"
try:
stream_obj, blocking_body = handler(app_model, caller, payload, streaming)
except HTTPException:
raise
except Exception:
logger.exception("internal server error.")
raise InternalServerError()
emit_app_run(
app_id=app_model.id,
tenant_id=app_model.tenant_id,
caller_kind=caller_kind,
mode=str(app_model.mode),
)
if streaming:
return helper.compact_generate_response(stream_obj)
return blocking_body, 200

View File

@ -0,0 +1,315 @@
"""GET /openapi/v1/apps and per-app reads.
Decorator order: `method_decorators` is innermost-first. `validate_bearer`
is last → outermost → sets `g.auth_ctx` before `require_scope` reads it.
"""
from __future__ import annotations
import uuid as _uuid
from typing import Any
import sqlalchemy as sa
from flask import g, request
from flask_restx import Resource
from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator
from werkzeug.exceptions import Conflict, NotFound, UnprocessableEntity
from controllers.common.fields import Parameters
from controllers.openapi import openapi_ns
from controllers.openapi._input_schema import EMPTY_INPUT_SCHEMA, build_input_schema, resolve_app_config
from controllers.openapi._models import (
MAX_PAGE_LIMIT,
AppDescribeInfo,
AppDescribeResponse,
AppListRow,
PaginationEnvelope,
)
from controllers.service_api.app.error import AppUnavailableError
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from extensions.ext_database import db
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
AuthContext,
Scope,
SubjectType,
require_scope,
require_workspace_member,
validate_bearer,
)
from models import App, Tenant
from models.model import AppMode
from services.app_service import AppService
from services.tag_service import TagService
_APPS_READ_DECORATORS = [
require_scope(Scope.APPS_READ),
validate_bearer(accept=ACCEPT_USER_ANY),
]
_ALLOWED_DESCRIBE_FIELDS: frozenset[str] = frozenset({"info", "parameters", "input_schema"})
class AppDescribeQuery(BaseModel):
"""`?fields=` allow-list for GET /apps/<id>/describe.
Empty / omitted → all blocks. Unknown member → ValidationError → 422.
"""
model_config = ConfigDict(extra="forbid")
fields: set[str] | None = None
workspace_id: str | None = None
@field_validator("workspace_id", mode="before")
@classmethod
def _validate_workspace_id(cls, v: object) -> str | None:
if v is None or v == "":
return None
if not isinstance(v, str):
raise ValueError("workspace_id must be a string")
try:
_uuid.UUID(v)
except ValueError:
raise ValueError("workspace_id must be a valid UUID")
return v
@field_validator("fields", mode="before")
@classmethod
def _parse_fields(cls, v: object) -> set[str] | None:
if v is None or v == "":
return None
if not isinstance(v, str):
raise ValueError("fields must be a comma-separated string")
members = {m.strip() for m in v.split(",") if m.strip()}
unknown = members - _ALLOWED_DESCRIBE_FIELDS
if unknown:
raise ValueError(f"unknown field(s): {sorted(unknown)}")
return members
_EMPTY_PARAMETERS: dict[str, Any] = {
"opening_statement": None,
"suggested_questions": [],
"user_input_form": [],
"file_upload": None,
"system_parameters": {},
}
class AppReadResource(Resource):
"""Base for per-app read endpoints; subclasses call `_load()` for SSO/membership/exists checks."""
method_decorators = _APPS_READ_DECORATORS
def _load(self, app_id: str, workspace_id: str | None = None) -> tuple[App, AuthContext]:
ctx = g.auth_ctx
if ctx.subject_type != SubjectType.ACCOUNT or ctx.account_id is None:
raise NotFound("app not found")
try:
parsed_uuid = _uuid.UUID(app_id)
is_uuid = True
except ValueError:
parsed_uuid = None
is_uuid = False
if is_uuid:
app = db.session.get(App, str(parsed_uuid)) # normalised dashed form
if not app or app.status != "normal":
raise NotFound("app not found")
else:
if not workspace_id:
raise UnprocessableEntity("workspace_id is required for name-based lookup")
matches = list(
db.session.execute(
sa.select(App).where(
App.name == app_id,
App.tenant_id == workspace_id,
App.status == "normal",
)
).scalars()
)
if len(matches) == 0:
raise NotFound("app not found")
if len(matches) > 1:
lines = [f"app name {app_id!r} is ambiguous — re-run with a UUID:\n\n"]
lines.append(f" {'ID':<36} {'MODE':<12} NAME\n")
for m in matches:
lines.append(f" {str(m.id):<36} {str(m.mode.value):<12} {m.name}\n")
raise Conflict("".join(lines))
app = matches[0]
require_workspace_member(ctx, str(app.tenant_id))
return app, ctx
def parameters_payload(app: App) -> dict:
"""Mirrors service_api/app/app.py::AppParameterApi response body."""
features_dict, user_input_form = resolve_app_config(app)
parameters = get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form)
return Parameters.model_validate(parameters).model_dump(mode="json")
@openapi_ns.route("/apps/<string:app_id>/describe")
class AppDescribeApi(AppReadResource):
def get(self, app_id: str):
try:
query = AppDescribeQuery.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
app, _ = self._load(app_id, workspace_id=query.workspace_id)
requested = query.fields
want_info = requested is None or "info" in requested
want_params = requested is None or "parameters" in requested
want_schema = requested is None or "input_schema" in requested
info = (
AppDescribeInfo(
id=str(app.id),
name=app.name,
mode=app.mode,
description=app.description,
tags=[{"name": t.name} for t in app.tags],
author=app.author_name,
updated_at=app.updated_at.isoformat() if app.updated_at else None,
service_api_enabled=bool(app.enable_api),
)
if want_info
else None
)
parameters: dict[str, Any] | None = None
input_schema: dict[str, Any] | None = None
if want_params:
try:
parameters = parameters_payload(app)
except AppUnavailableError:
parameters = dict(_EMPTY_PARAMETERS)
if want_schema:
try:
input_schema = build_input_schema(app)
except AppUnavailableError:
input_schema = dict(EMPTY_INPUT_SCHEMA)
return (
AppDescribeResponse(
info=info,
parameters=parameters,
input_schema=input_schema,
).model_dump(mode="json", exclude_none=False),
200,
)
class AppListQuery(BaseModel):
"""`mode` is a closed enum — unknown values 422 instead of silently-empty data."""
workspace_id: str
page: int = Field(1, ge=1)
limit: int = Field(20, ge=1, le=MAX_PAGE_LIMIT)
mode: AppMode | None = None
name: str | None = Field(None, max_length=200)
tag: str | None = Field(None, max_length=100)
@openapi_ns.route("/apps")
class AppListApi(Resource):
method_decorators = _APPS_READ_DECORATORS
def get(self):
ctx = g.auth_ctx
if ctx.subject_type != SubjectType.ACCOUNT or ctx.account_id is None:
return PaginationEnvelope[AppListRow].build(page=1, limit=0, total=0, items=[]).model_dump(mode="json"), 200
try:
query = AppListQuery.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
workspace_id = query.workspace_id
require_workspace_member(ctx, workspace_id)
empty = (
PaginationEnvelope[AppListRow]
.build(page=query.page, limit=query.limit, total=0, items=[])
.model_dump(mode="json"),
200,
)
if query.name:
try:
parsed_uuid = _uuid.UUID(query.name)
except ValueError:
parsed_uuid = None
else:
parsed_uuid = None
if parsed_uuid is not None:
app = db.session.get(App, str(parsed_uuid))
if not app or app.status != "normal" or str(app.tenant_id) != workspace_id:
return empty
tenant_name = db.session.execute(
sa.select(Tenant.name).where(Tenant.id == workspace_id)
).scalar_one_or_none()
item = AppListRow(
id=str(app.id),
name=app.name,
description=app.description,
mode=app.mode,
tags=[{"name": t.name} for t in app.tags],
updated_at=app.updated_at.isoformat() if app.updated_at else None,
created_by_name=getattr(app, "author_name", None),
workspace_id=str(workspace_id),
workspace_name=tenant_name,
)
env = PaginationEnvelope[AppListRow].build(page=1, limit=1, total=1, items=[item])
return env.model_dump(mode="json"), 200
tag_ids: list[str] | None = None
if query.tag:
tags = TagService.get_tag_by_tag_name("app", workspace_id, query.tag)
if not tags:
return empty
tag_ids = [tag.id for tag in tags]
args: dict[str, Any] = {
"page": query.page,
"limit": query.limit,
"mode": query.mode.value if query.mode else "",
"name": query.name,
"status": "normal",
}
if tag_ids:
args["tag_ids"] = tag_ids
pagination = AppService().get_paginate_apps(ctx.account_id, workspace_id, args)
if pagination is None:
return empty
tenant_name: str | None = None
if pagination.items:
tenant_name = db.session.execute(
sa.select(Tenant.name).where(Tenant.id == workspace_id)
).scalar_one_or_none()
items = [
AppListRow(
id=str(r.id),
name=r.name,
description=r.description,
mode=r.mode,
tags=[{"name": t.name} for t in r.tags],
updated_at=r.updated_at.isoformat() if r.updated_at else None,
created_by_name=getattr(r, "author_name", None),
workspace_id=str(workspace_id),
workspace_name=tenant_name,
)
for r in pagination.items
]
env = PaginationEnvelope[AppListRow].build(
page=query.page, limit=query.limit, total=int(pagination.total), items=items
)
return env.model_dump(mode="json"), 200

View File

@ -0,0 +1,101 @@
"""GET /openapi/v1/apps/permitted — external-subject app discovery (EE only)."""
from __future__ import annotations
import sqlalchemy as sa
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from werkzeug.exceptions import UnprocessableEntity
from controllers.openapi import openapi_ns
from controllers.openapi._models import (
MAX_PAGE_LIMIT,
AppListRow,
PaginationEnvelope,
)
from extensions.ext_database import db
from libs.device_flow_security import enterprise_only
from libs.oauth_bearer import (
ACCEPT_USER_EXT_SSO,
Scope,
require_scope,
validate_bearer,
)
from models import App, Tenant
from models.model import AppMode
from services.enterprise.app_permitted_service import list_permitted_apps
class AppPermittedListQuery(BaseModel):
"""Strict (`extra='forbid'`) — rejects `workspace_id`/`tag`/etc. that are valid on /apps but not here."""
model_config = ConfigDict(extra="forbid")
page: int = Field(1, ge=1)
limit: int = Field(20, ge=1, le=MAX_PAGE_LIMIT)
mode: AppMode | None = None
name: str | None = Field(None, max_length=200)
@openapi_ns.route("/apps/permitted")
class AppPermittedListApi(Resource):
method_decorators = [
require_scope(Scope.APPS_READ_PERMITTED),
validate_bearer(accept=ACCEPT_USER_EXT_SSO),
enterprise_only,
]
def get(self):
try:
query = AppPermittedListQuery.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
page_result = list_permitted_apps(
page=query.page,
limit=query.limit,
mode=query.mode.value if query.mode else None,
name=query.name,
)
if not page_result.app_ids:
env = PaginationEnvelope[AppListRow].build(
page=query.page, limit=query.limit, total=page_result.total, items=[]
)
return env.model_dump(mode="json"), 200
apps_by_id = {
str(a.id): a
for a in db.session.execute(sa.select(App).where(App.id.in_(page_result.app_ids))).scalars().all()
}
tenant_ids = list({a.tenant_id for a in apps_by_id.values()})
tenants_by_id = {
str(t.id): t for t in db.session.execute(sa.select(Tenant).where(Tenant.id.in_(tenant_ids))).scalars().all()
}
items: list[AppListRow] = []
for app_id in page_result.app_ids:
app = apps_by_id.get(app_id)
if not app or app.status != "normal":
continue
tenant = tenants_by_id.get(str(app.tenant_id))
items.append(
AppListRow(
id=str(app.id),
name=app.name,
description=app.description,
mode=app.mode,
tags=[], # tenant-scoped; not surfaced cross-tenant
updated_at=app.updated_at.isoformat() if app.updated_at else None,
created_by_name=None, # cross-tenant author leak prevention
workspace_id=str(app.tenant_id),
workspace_name=tenant.name if tenant else None,
)
)
# total/has_more reflect the EE-side allow-list; len(items) may be < limit when local rows are dropped.
env = PaginationEnvelope[AppListRow].build(
page=query.page, limit=query.limit, total=page_result.total, items=items
)
return env.model_dump(mode="json"), 200

View File

@ -0,0 +1,3 @@
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
__all__ = ["OAUTH_BEARER_PIPELINE"]

View File

@ -0,0 +1,43 @@
"""`OAUTH_BEARER_PIPELINE` — the auth scheme for openapi `/run` endpoints.
Endpoints attach via `@OAUTH_BEARER_PIPELINE.guard(scope=…)`. No alternative
paths. Read endpoints (`/apps`, `/info`, `/parameters`, `/describe`) skip
the pipeline and use `validate_bearer + require_scope + require_workspace_member`
inline — they don't need `AppAuthzCheck`/`CallerMount`.
"""
from __future__ import annotations
from controllers.openapi.auth.pipeline import Pipeline
from controllers.openapi.auth.steps import (
AppAuthzCheck,
AppResolver,
BearerCheck,
CallerMount,
ScopeCheck,
WorkspaceMembershipCheck,
)
from controllers.openapi.auth.strategies import (
AccountMounter,
AclStrategy,
AppAuthzStrategy,
EndUserMounter,
MembershipStrategy,
)
from services.feature_service import FeatureService
def _resolve_app_authz_strategy() -> AppAuthzStrategy:
if FeatureService.get_system_features().webapp_auth.enabled:
return AclStrategy()
return MembershipStrategy()
OAUTH_BEARER_PIPELINE = Pipeline(
BearerCheck(),
ScopeCheck(),
AppResolver(),
WorkspaceMembershipCheck(),
AppAuthzCheck(_resolve_app_authz_strategy),
CallerMount(AccountMounter(), EndUserMounter()),
)

View File

@ -0,0 +1,46 @@
"""Mutable per-request context for the openapi auth pipeline.
Every field starts None / empty and is filled in by a step. The pipeline
is the only thing that should construct or mutate Context — handlers
read populated values via the decorator's kwargs unpacking.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Literal, Protocol
from flask import Request
from libs.oauth_bearer import Scope, SubjectType
if TYPE_CHECKING:
from models import App, Tenant
@dataclass
class Context:
request: Request
required_scope: Scope
subject_type: SubjectType | None = None
subject_email: str | None = None
subject_issuer: str | None = None
account_id: uuid.UUID | None = None
scopes: frozenset[Scope] = field(default_factory=frozenset)
token_id: uuid.UUID | None = None
token_hash: str | None = None
cached_verified_tenants: dict[str, bool] | None = None
source: str | None = None
expires_at: datetime | None = None
app: App | None = None
tenant: Tenant | None = None
caller: object | None = None
caller_kind: Literal["account", "end_user"] | None = None
class Step(Protocol):
"""One responsibility. Mutate ctx or raise to short-circuit."""
def __call__(self, ctx: Context) -> None: ...

View File

@ -0,0 +1,41 @@
"""Pipeline IS the auth scheme.
`Pipeline.guard(scope=…)` is the only attachment point for endpoints —
that is the design lock-in: forgetting an auth layer is structurally
impossible because there is no "sometimes wrap, sometimes don't" choice.
"""
from __future__ import annotations
from functools import wraps
from flask import request
from controllers.openapi.auth.context import Context, Step
from libs.oauth_bearer import Scope
class Pipeline:
def __init__(self, *steps: Step) -> None:
self._steps = steps
def run(self, ctx: Context) -> None:
for step in self._steps:
step(ctx)
def guard(self, *, scope: Scope):
def decorator(view):
@wraps(view)
def decorated(*args, **kwargs):
ctx = Context(request=request, required_scope=scope)
self.run(ctx)
kwargs.update(
app_model=ctx.app,
caller=ctx.caller,
caller_kind=ctx.caller_kind,
)
return view(*args, **kwargs)
return decorated
return decorator

View File

@ -0,0 +1,131 @@
"""Pipeline steps. Each is one responsibility.
`BearerCheck` is the only step that touches the token registry; downstream
steps see only the populated `Context`.
"""
from __future__ import annotations
from collections.abc import Callable
from werkzeug.exceptions import BadRequest, Forbidden, NotFound, Unauthorized
from configs import dify_config
from controllers.openapi.auth.context import Context
from controllers.openapi.auth.strategies import AppAuthzStrategy, CallerMounter
from extensions.ext_database import db
from libs.oauth_bearer import (
InvalidBearerError,
Scope,
SubjectType,
_extract_bearer, # type: ignore[attr-defined]
check_workspace_membership,
get_authenticator,
)
from models import App, Tenant, TenantStatus
class BearerCheck:
"""Resolve bearer → populate identity fields. Rate-limit is enforced
inside `BearerAuthenticator.authenticate`, so no separate step here."""
def __call__(self, ctx: Context) -> None:
token = _extract_bearer(ctx.request)
if not token:
raise Unauthorized("bearer required")
try:
authn = get_authenticator().authenticate(token)
except InvalidBearerError as e:
raise Unauthorized(str(e))
ctx.subject_type = authn.subject_type
ctx.subject_email = authn.subject_email
ctx.subject_issuer = authn.subject_issuer
ctx.account_id = authn.account_id
ctx.scopes = frozenset(authn.scopes)
ctx.source = authn.source
ctx.token_id = authn.token_id
ctx.expires_at = authn.expires_at
ctx.token_hash = authn.token_hash
ctx.cached_verified_tenants = dict(authn.verified_tenants)
class ScopeCheck:
"""Verify ctx.scopes (already populated by BearerCheck) covers required."""
def __call__(self, ctx: Context) -> None:
if Scope.FULL in ctx.scopes or ctx.required_scope in ctx.scopes:
return
raise Forbidden("insufficient_scope")
class AppResolver:
"""Read app_id from request.view_args, populate ctx.app + ctx.tenant.
Every endpoint using the OAuth bearer pipeline must declare
``<string:app_id>`` in its route — that is the design lock-in (no body /
header coupling).
"""
def __call__(self, ctx: Context) -> None:
app_id = (ctx.request.view_args or {}).get("app_id")
if not app_id:
raise BadRequest("app_id is required in path")
app = db.session.get(App, app_id)
if not app or app.status != "normal":
raise NotFound("app not found")
if not app.enable_api:
raise Forbidden("service_api_disabled")
tenant = db.session.get(Tenant, app.tenant_id)
if tenant is None or tenant.status == TenantStatus.ARCHIVE:
raise Forbidden("workspace unavailable")
ctx.app, ctx.tenant = app, tenant
class WorkspaceMembershipCheck:
"""Layer 0 — workspace membership gate.
CE-only (skipped when ENTERPRISE_ENABLED). Account-subject bearers
(dfoa_) only — SSO subjects skip.
"""
def __call__(self, ctx: Context) -> None:
if dify_config.ENTERPRISE_ENABLED:
return
if ctx.subject_type != SubjectType.ACCOUNT:
return
if ctx.account_id is None or ctx.tenant is None:
raise Unauthorized("account_id or tenant unset — BearerCheck or AppResolver did not run")
if ctx.token_hash is None:
raise Unauthorized("token_hash unset — BearerCheck did not run")
check_workspace_membership(
account_id=ctx.account_id,
tenant_id=ctx.tenant.id,
token_hash=ctx.token_hash,
cached_verdicts=ctx.cached_verified_tenants or {},
)
class AppAuthzCheck:
def __init__(self, resolve_strategy: Callable[[], AppAuthzStrategy]) -> None:
self._resolve = resolve_strategy
def __call__(self, ctx: Context) -> None:
if not self._resolve().authorize(ctx):
raise Forbidden("subject_no_app_access")
class CallerMount:
def __init__(self, *mounters: CallerMounter) -> None:
self._mounters = mounters
def __call__(self, ctx: Context) -> None:
if ctx.subject_type is None:
raise Unauthorized("subject_type unset — BearerCheck did not run")
for m in self._mounters:
if m.applies_to(ctx.subject_type):
m.mount(ctx)
return
raise Unauthorized("no caller mounter for subject type")

View File

@ -0,0 +1,115 @@
"""Strategy classes for the openapi auth pipeline.
App authorization (Acl/Membership) and caller mounting (Account/EndUser)
vary along independent axes; each strategy is one class so the pipeline
composition stays a flat list.
"""
from __future__ import annotations
import uuid
from typing import Protocol
from flask import current_app
from flask_login import user_logged_in
from sqlalchemy import select
from controllers.openapi.auth.context import Context
from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from libs.oauth_bearer import SubjectType
from models import Account, TenantAccountJoin
from services.end_user_service import EndUserService
from services.enterprise.enterprise_service import EnterpriseService
class AppAuthzStrategy(Protocol):
def authorize(self, ctx: Context) -> bool: ...
class AclStrategy:
"""Per-app ACL via the workspace-auth inner API.
Used when webapp-auth is enabled (EE deployment). The inner-API
allowlist is the source of truth.
"""
def authorize(self, ctx: Context) -> bool:
if ctx.subject_email is None or ctx.app is None:
return False
return EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(
user_id=ctx.subject_email,
app_id=ctx.app.id,
)
class MembershipStrategy:
"""Tenant-membership fallback.
Used when webapp-auth is disabled (CE deployment). Account-bearing
subjects pass if they have a TenantAccountJoin row; EXTERNAL_SSO is
denied (it requires the webapp-auth surface).
"""
def authorize(self, ctx: Context) -> bool:
if ctx.subject_type == SubjectType.EXTERNAL_SSO:
return False
if ctx.tenant is None:
return False
return _has_tenant_membership(ctx.account_id, ctx.tenant.id)
def _has_tenant_membership(account_id: uuid.UUID | str | None, tenant_id: str) -> bool:
if not account_id:
return False
row = db.session.execute(
select(TenantAccountJoin.id).where(
TenantAccountJoin.tenant_id == tenant_id,
TenantAccountJoin.account_id == account_id,
)
).scalar_one_or_none()
return row is not None
def _login_as(user) -> None:
"""Set Flask-Login request user so downstream services see the caller."""
current_app.login_manager._update_request_context_with_user(user)
user_logged_in.send(current_app._get_current_object(), user=user)
class CallerMounter(Protocol):
def applies_to(self, subject_type: SubjectType) -> bool: ...
def mount(self, ctx: Context) -> None: ...
class AccountMounter:
def applies_to(self, subject_type: SubjectType) -> bool:
return subject_type == SubjectType.ACCOUNT
def mount(self, ctx: Context) -> None:
if ctx.account_id is None:
raise RuntimeError("AccountMounter: account_id unset — BearerCheck did not run")
account = db.session.get(Account, ctx.account_id)
if account is None:
raise RuntimeError("AccountMounter: account row missing for resolved bearer")
account.current_tenant = ctx.tenant
_login_as(account)
ctx.caller, ctx.caller_kind = account, "account"
class EndUserMounter:
def applies_to(self, subject_type: SubjectType) -> bool:
return subject_type == SubjectType.EXTERNAL_SSO
def mount(self, ctx: Context) -> None:
if ctx.tenant is None or ctx.app is None or ctx.subject_email is None:
raise RuntimeError("EndUserMounter: tenant/app/subject_email unset — earlier steps did not run")
end_user = EndUserService.get_or_create_end_user_by_type(
InvokeFrom.OPENAPI,
tenant_id=ctx.tenant.id,
app_id=ctx.app.id,
user_id=ctx.subject_email,
)
_login_as(end_user)
ctx.caller, ctx.caller_kind = end_user, "end_user"

View File

@ -0,0 +1,9 @@
from flask_restx import Resource
from controllers.openapi import openapi_ns
@openapi_ns.route("/_health")
class HealthApi(Resource):
def get(self):
return {"ok": True}

View File

@ -0,0 +1,392 @@
"""Device-flow endpoints under /openapi/v1/oauth/device/*. Two
sub-groups in one module:
Protocol (RFC 8628, public + rate-limited):
POST /oauth/device/code
POST /oauth/device/token
GET /oauth/device/lookup
Approval (account branch, console-cookie authed):
POST /oauth/device/approve
POST /oauth/device/deny
SSO branch lives in oauth_device_sso.py.
"""
from __future__ import annotations
import logging
from flask import request
from flask_login import login_required
from flask_restx import Resource
from pydantic import BaseModel, ValidationError
from werkzeug.exceptions import BadRequest
from configs import dify_config
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.openapi import openapi_ns
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.helper import extract_remote_ip
from libs.login import current_account_with_tenant
from libs.oauth_bearer import SubjectType, bearer_feature_required
from libs.rate_limit import (
LIMIT_APPROVE_CONSOLE,
LIMIT_DEVICE_CODE_PER_IP,
LIMIT_LOOKUP_PUBLIC,
rate_limit,
)
from services.oauth_device_flow import (
ACCOUNT_ISSUER_SENTINEL,
DEFAULT_POLL_INTERVAL_SECONDS,
DEVICE_FLOW_TTL_SECONDS,
PREFIX_OAUTH_ACCOUNT,
DeviceFlowRedis,
DeviceFlowStatus,
InvalidTransitionError,
SlowDownDecision,
StateNotFoundError,
mint_oauth_token,
oauth_ttl_days,
)
logger = logging.getLogger(__name__)
# =========================================================================
# Request / query schemas
# =========================================================================
class DeviceCodeRequest(BaseModel):
client_id: str
device_label: str
class DevicePollRequest(BaseModel):
device_code: str
client_id: str
class DeviceLookupQuery(BaseModel):
user_code: str
class DeviceMutateRequest(BaseModel):
user_code: str
def _validate_json[M: BaseModel](model: type[M]) -> M:
body = request.get_json(silent=True) or {}
try:
return model.model_validate(body)
except ValidationError as exc:
raise BadRequest(str(exc))
def _validate_query[M: BaseModel](model: type[M]) -> M:
try:
return model.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise BadRequest(str(exc))
# =========================================================================
# Protocol endpoints — RFC 8628 (public + per-IP rate limit)
# =========================================================================
@openapi_ns.route("/oauth/device/code")
class OAuthDeviceCodeApi(Resource):
@rate_limit(LIMIT_DEVICE_CODE_PER_IP)
def post(self):
payload = _validate_json(DeviceCodeRequest)
client_id = payload.client_id
device_label = payload.device_label
if client_id not in dify_config.OPENAPI_KNOWN_CLIENT_IDS:
return {"error": "unsupported_client"}, 400
store = DeviceFlowRedis(redis_client)
ip = extract_remote_ip(request)
device_code, user_code, expires_in = store.start(client_id, device_label, created_ip=ip)
return {
"device_code": device_code,
"user_code": user_code,
"verification_uri": _verification_uri(),
"expires_in": expires_in,
"interval": DEFAULT_POLL_INTERVAL_SECONDS,
}, 200
@openapi_ns.route("/oauth/device/token")
class OAuthDeviceTokenApi(Resource):
"""RFC 8628 poll."""
def post(self):
payload = _validate_json(DevicePollRequest)
device_code = payload.device_code
store = DeviceFlowRedis(redis_client)
# slow_down beats every other branch — polling-too-fast clients
# see only that response regardless of underlying state.
if store.record_poll(device_code, DEFAULT_POLL_INTERVAL_SECONDS) is SlowDownDecision.SLOW_DOWN:
return {"error": "slow_down"}, 400
state = store.load_by_device_code(device_code)
if state is None:
return {"error": "expired_token"}, 400
if state.status is DeviceFlowStatus.PENDING:
return {"error": "authorization_pending"}, 400
terminal = store.consume_on_poll(device_code)
if terminal is None:
return {"error": "expired_token"}, 400
if terminal.status is DeviceFlowStatus.DENIED:
return {"error": "access_denied"}, 400
poll_payload = terminal.poll_payload or {}
if "token" not in poll_payload:
logger.error("device_flow: approved state missing poll_payload for %s", device_code)
return {"error": "expired_token"}, 400
_audit_cross_ip_if_needed(state)
return poll_payload, 200
@openapi_ns.route("/oauth/device/lookup")
class OAuthDeviceLookupApi(Resource):
"""Read-only — public for pre-validate before login. user_code is
high-entropy + short-TTL; per-IP rate limit blocks enumeration.
"""
@rate_limit(LIMIT_LOOKUP_PUBLIC)
def get(self):
payload = _validate_query(DeviceLookupQuery)
user_code = payload.user_code.strip().upper()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
return {"valid": False, "expires_in_remaining": 0, "client_id": None}, 200
_device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
return {"valid": False, "expires_in_remaining": 0, "client_id": state.client_id}, 200
return {
"valid": True,
"expires_in_remaining": DEVICE_FLOW_TTL_SECONDS,
"client_id": state.client_id,
}, 200
# =========================================================================
# Approval endpoints — account branch (cookie-authed)
# =========================================================================
_APPROVE_GUARD_KEY_FMT = "device_code:{code}:approving"
_APPROVE_GUARD_TTL_SECONDS = 10
@openapi_ns.route("/oauth/device/approve")
class DeviceApproveApi(Resource):
@setup_required
@login_required
@account_initialization_required
@bearer_feature_required
@rate_limit(LIMIT_APPROVE_CONSOLE)
def post(self):
payload = _validate_json(DeviceMutateRequest)
user_code = payload.user_code.strip().upper()
account, tenant = current_account_with_tenant()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
return {"error": "expired_or_unknown"}, 404
device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
return {"error": "already_resolved"}, 409
# SET NX guard — without it, two in-flight approves both pass
# PENDING, both mint, and the second upsert silently rotates the
# first caller into an already-revoked token.
guard_key = _APPROVE_GUARD_KEY_FMT.format(code=device_code)
if not redis_client.set(guard_key, "1", nx=True, ex=_APPROVE_GUARD_TTL_SECONDS):
return {"error": "approve_in_progress"}, 409
try:
ttl_days = oauth_ttl_days(tenant_id=tenant)
mint = mint_oauth_token(
db.session,
redis_client,
subject_email=account.email,
subject_issuer=ACCOUNT_ISSUER_SENTINEL,
account_id=str(account.id),
client_id=state.client_id,
device_label=state.device_label,
prefix=PREFIX_OAUTH_ACCOUNT,
ttl_days=ttl_days,
)
poll_payload = _build_account_poll_payload(account, tenant, mint)
try:
store.approve(
device_code,
subject_email=account.email,
account_id=str(account.id),
subject_issuer=ACCOUNT_ISSUER_SENTINEL,
minted_token=mint.token,
token_id=str(mint.token_id),
poll_payload=poll_payload,
)
except (StateNotFoundError, InvalidTransitionError):
# Row minted but state vanished — roll forward; the orphan
# token is revocable via auth devices list / Authorized Apps.
logger.exception("device_flow: approve raced on %s", device_code)
return {"error": "state_lost"}, 409
finally:
redis_client.delete(guard_key)
_emit_approve_audit(state, account, tenant, mint)
return {"status": "approved"}, 200
@openapi_ns.route("/oauth/device/deny")
class DeviceDenyApi(Resource):
@setup_required
@login_required
@account_initialization_required
@bearer_feature_required
@rate_limit(LIMIT_APPROVE_CONSOLE)
def post(self):
payload = _validate_json(DeviceMutateRequest)
user_code = payload.user_code.strip().upper()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
return {"error": "expired_or_unknown"}, 404
device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
return {"error": "already_resolved"}, 409
try:
store.deny(device_code)
except (StateNotFoundError, InvalidTransitionError):
logger.exception("device_flow: deny raced on %s", device_code)
return {"error": "state_lost"}, 409
_emit_deny_audit(state)
return {"status": "denied"}, 200
# =========================================================================
# Helpers
# =========================================================================
def _verification_uri() -> str:
base = getattr(dify_config, "CONSOLE_WEB_URL", None)
if base:
return f"{base.rstrip('/')}/device"
return f"{request.host_url.rstrip('/')}/device"
def _audit_cross_ip_if_needed(state) -> None:
poll_ip = extract_remote_ip(request)
if state.created_ip and poll_ip and poll_ip != state.created_ip:
logger.warning(
"audit: oauth.device_code_cross_ip_poll token_id=%s creation_ip=%s poll_ip=%s",
state.token_id,
state.created_ip,
poll_ip,
extra={
"audit": True,
"token_id": state.token_id,
"creation_ip": state.created_ip,
"poll_ip": poll_ip,
},
)
def _build_account_poll_payload(account, tenant, mint) -> dict:
"""Pre-render the poll-response body so the unauthenticated poll
handler doesn't re-query accounts/tenants for authz data.
"""
from models import Tenant, TenantAccountJoin
rows = (
db.session.query(Tenant, TenantAccountJoin)
.join(TenantAccountJoin, TenantAccountJoin.tenant_id == Tenant.id)
.filter(TenantAccountJoin.account_id == account.id)
.all()
)
workspaces = [{"id": str(t.id), "name": t.name, "role": getattr(m, "role", "")} for t, m in rows]
# Prefer active session tenant → DB-flagged current join → first membership.
default_ws_id = None
if tenant and any(w["id"] == str(tenant) for w in workspaces):
default_ws_id = str(tenant)
if default_ws_id is None:
for _t, m in rows:
if getattr(m, "current", False):
default_ws_id = str(m.tenant_id)
break
if default_ws_id is None and workspaces:
default_ws_id = workspaces[0]["id"]
return {
"token": mint.token,
"expires_at": mint.expires_at.isoformat(),
"subject_type": SubjectType.ACCOUNT,
"account": {"id": str(account.id), "email": account.email, "name": account.name},
"workspaces": workspaces,
"default_workspace_id": default_ws_id,
"token_id": str(mint.token_id),
}
def _emit_approve_audit(state, account, tenant, mint) -> None:
logger.warning(
"audit: oauth.device_flow_approved token_id=%s subject=%s client_id=%s device_label=%s rotated=? expires_at=%s",
mint.token_id,
account.email,
state.client_id,
state.device_label,
mint.expires_at,
extra={
"audit": True,
"event": "oauth.device_flow_approved",
"token_id": str(mint.token_id),
"subject_type": SubjectType.ACCOUNT,
"subject_email": account.email,
"account_id": str(account.id),
"tenant_id": tenant,
"client_id": state.client_id,
"device_label": state.device_label,
"scopes": ["full"],
"expires_at": mint.expires_at.isoformat(),
},
)
def _emit_deny_audit(state) -> None:
logger.warning(
"audit: oauth.device_flow_denied client_id=%s device_label=%s",
state.client_id,
state.device_label,
extra={
"audit": True,
"event": "oauth.device_flow_denied",
"client_id": state.client_id,
"device_label": state.device_label,
},
)

View File

@ -0,0 +1,287 @@
"""SSO-branch device-flow endpoints under /openapi/v1/oauth/device/*.
EE-only. Browser flow:
GET /oauth/device/sso-initiate → 302 to IdP authorize URL
GET /oauth/device/sso-complete → ACS callback, sets approval-grant cookie
GET /oauth/device/approval-context → SPA reads cookie claims (idempotent)
POST /oauth/device/approve-external → mints dfoe_ token + clears cookie
Function-based (raw @bp.route) rather than Resource classes because the
handlers do redirects + cookie kwargs that don't fit the Resource shape.
"""
from __future__ import annotations
import logging
import secrets
from flask import jsonify, make_response, redirect, request
from werkzeug.exceptions import (
BadGateway,
BadRequest,
Conflict,
Forbidden,
NotFound,
Unauthorized,
)
from controllers.openapi import bp
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs import jws
from libs.device_flow_security import (
APPROVAL_GRANT_COOKIE_NAME,
ApprovalGrantClaims,
approval_grant_cleared_cookie_kwargs,
approval_grant_cookie_kwargs,
consume_approval_grant_nonce,
consume_sso_assertion_nonce,
enterprise_only,
mint_approval_grant,
verify_approval_grant,
)
from libs.oauth_bearer import SubjectType
from libs.rate_limit import (
LIMIT_APPROVE_EXT_PER_EMAIL,
LIMIT_SSO_INITIATE_PER_IP,
enforce,
rate_limit,
)
from services.enterprise.enterprise_service import EnterpriseService
from services.oauth_device_flow import (
PREFIX_OAUTH_EXTERNAL_SSO,
DeviceFlowRedis,
DeviceFlowStatus,
InvalidTransitionError,
StateNotFoundError,
mint_oauth_token,
oauth_ttl_days,
)
logger = logging.getLogger(__name__)
# Matches DEVICE_FLOW_TTL_SECONDS so the signed state can't outlive the
# device_code it references.
STATE_ENVELOPE_TTL_SECONDS = 15 * 60
# Canonical sso-complete path. IdP-side ACS callback URL must point here.
_SSO_COMPLETE_PATH = "/openapi/v1/oauth/device/sso-complete"
@bp.route("/oauth/device/sso-initiate", methods=["GET"])
@enterprise_only
@rate_limit(LIMIT_SSO_INITIATE_PER_IP)
def sso_initiate():
user_code = (request.args.get("user_code") or "").strip().upper()
if not user_code:
raise BadRequest("user_code required")
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
raise BadRequest("invalid_user_code")
_, state = found
if state.status is not DeviceFlowStatus.PENDING:
raise BadRequest("invalid_user_code")
keyset = jws.KeySet.from_shared_secret()
signed_state = jws.sign(
keyset,
payload={
"redirect_url": "",
"app_code": "",
"intent": "device_flow",
"user_code": user_code,
"nonce": secrets.token_urlsafe(16),
"return_to": "",
"idp_callback_url": f"{request.host_url.rstrip('/')}{_SSO_COMPLETE_PATH}",
},
aud=jws.AUD_STATE_ENVELOPE,
ttl_seconds=STATE_ENVELOPE_TTL_SECONDS,
)
try:
reply = EnterpriseService.initiate_device_flow_sso(signed_state)
except Exception as e:
logger.warning("sso-initiate: enterprise call failed: %s", e)
raise BadGateway("sso_initiate_failed") from e
url = (reply or {}).get("url")
if not url:
raise BadGateway("sso_initiate_missing_url")
# Clear stale approval-grant — defends against cross-tab/back-button mixing.
resp = redirect(url, code=302)
resp.set_cookie(**approval_grant_cleared_cookie_kwargs())
return resp
@bp.route("/oauth/device/sso-complete", methods=["GET"])
@enterprise_only
def sso_complete():
blob = request.args.get("sso_assertion")
if not blob:
raise BadRequest("sso_assertion required")
keyset = jws.KeySet.from_shared_secret()
try:
claims = jws.verify(keyset, blob, expected_aud=jws.AUD_EXT_SUBJECT_ASSERTION)
except jws.VerifyError as e:
logger.warning("sso-complete: rejected assertion: %s", e)
raise BadRequest("invalid_sso_assertion") from e
if not consume_sso_assertion_nonce(redis_client, claims.get("nonce", "")):
raise BadRequest("invalid_sso_assertion")
user_code = (claims.get("user_code") or "").strip().upper()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
raise Conflict("user_code_not_pending")
_, state = found
if state.status is not DeviceFlowStatus.PENDING:
raise Conflict("user_code_not_pending")
iss = request.host_url.rstrip("/")
cookie_value, _ = mint_approval_grant(
keyset=keyset,
iss=iss,
subject_email=claims["email"],
subject_issuer=claims["issuer"],
user_code=user_code,
)
resp = redirect("/device?sso_verified=1", code=302)
resp.set_cookie(**approval_grant_cookie_kwargs(cookie_value))
return resp
@bp.route("/oauth/device/approval-context", methods=["GET"])
@enterprise_only
def approval_context():
token = request.cookies.get(APPROVAL_GRANT_COOKIE_NAME)
if not token:
raise Unauthorized("no_session")
keyset = jws.KeySet.from_shared_secret()
try:
claims = verify_approval_grant(keyset, token)
except jws.VerifyError as e:
logger.warning("approval-context: bad cookie: %s", e)
raise Unauthorized("no_session") from e
return jsonify(
{
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"user_code": claims.user_code,
"csrf_token": claims.csrf_token,
"expires_at": claims.expires_at.isoformat(),
}
), 200
@bp.route("/oauth/device/approve-external", methods=["POST"])
@enterprise_only
def approve_external():
token = request.cookies.get(APPROVAL_GRANT_COOKIE_NAME)
if not token:
raise Unauthorized("invalid_session")
keyset = jws.KeySet.from_shared_secret()
try:
claims: ApprovalGrantClaims = verify_approval_grant(keyset, token)
except jws.VerifyError as e:
logger.warning("approve-external: bad cookie: %s", e)
raise Unauthorized("invalid_session") from e
enforce(LIMIT_APPROVE_EXT_PER_EMAIL, key=f"subject:{claims.subject_email}")
csrf_header = request.headers.get("X-CSRF-Token", "")
if not csrf_header or csrf_header != claims.csrf_token:
raise Forbidden("csrf_mismatch")
data = request.get_json(silent=True) or {}
body_user_code = (data.get("user_code") or "").strip().upper()
if body_user_code != claims.user_code:
raise BadRequest("user_code_mismatch")
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(claims.user_code)
if found is None:
raise NotFound("user_code_not_pending")
device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
raise Conflict("user_code_not_pending")
if not consume_approval_grant_nonce(redis_client, claims.nonce):
raise Unauthorized("session_already_consumed")
ttl_days = oauth_ttl_days(tenant_id=None)
mint = mint_oauth_token(
db.session,
redis_client,
subject_email=claims.subject_email,
subject_issuer=claims.subject_issuer,
account_id=None,
client_id=state.client_id,
device_label=state.device_label,
prefix=PREFIX_OAUTH_EXTERNAL_SSO,
ttl_days=ttl_days,
)
poll_payload = {
"token": mint.token,
"expires_at": mint.expires_at.isoformat(),
"subject_type": SubjectType.EXTERNAL_SSO,
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"account": None,
"workspaces": [],
"default_workspace_id": None,
"token_id": str(mint.token_id),
}
try:
store.approve(
device_code,
subject_email=claims.subject_email,
account_id=None,
subject_issuer=claims.subject_issuer,
minted_token=mint.token,
token_id=str(mint.token_id),
poll_payload=poll_payload,
)
except (StateNotFoundError, InvalidTransitionError) as e:
logger.exception("approve-external: state transition raced")
raise Conflict("state_lost") from e
_emit_approve_external_audit(state, claims, mint)
resp = make_response(jsonify({"status": "approved"}), 200)
resp.set_cookie(**approval_grant_cleared_cookie_kwargs())
return resp
def _emit_approve_external_audit(state, claims, mint) -> None:
logger.warning(
"audit: oauth.device_flow_approved subject_type=%s subject_email=%s subject_issuer=%s token_id=%s",
SubjectType.EXTERNAL_SSO,
claims.subject_email,
claims.subject_issuer,
mint.token_id,
extra={
"audit": True,
"event": "oauth.device_flow_approved",
"subject_type": SubjectType.EXTERNAL_SSO,
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"token_id": str(mint.token_id),
"client_id": state.client_id,
"device_label": state.device_label,
"scopes": ["apps:run"],
"expires_at": mint.expires_at.isoformat(),
},
)

View File

@ -0,0 +1,89 @@
"""User-scoped workspace reads under /openapi/v1/workspaces. Bearer-authed
counterparts to the cookie-authed /console/api/workspaces endpoints.
Account bearers (dfoa_) see every tenant they're a member of. External
SSO bearers (dfoe_) have no account_id and so see an empty list — that
matches /openapi/v1/account.
"""
from __future__ import annotations
from itertools import starmap
from flask import g
from flask_restx import Resource
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from controllers.openapi import openapi_ns
from extensions.ext_database import db
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
SubjectType,
validate_bearer,
)
from models import Tenant, TenantAccountJoin
@openapi_ns.route("/workspaces")
class WorkspacesApi(Resource):
@validate_bearer(accept=ACCEPT_USER_ANY)
def get(self):
ctx = g.auth_ctx
if ctx.subject_type != SubjectType.ACCOUNT or not ctx.account_id:
return {"workspaces": []}, 200
rows = db.session.execute(
select(Tenant, TenantAccountJoin)
.join(TenantAccountJoin, TenantAccountJoin.tenant_id == Tenant.id)
.where(TenantAccountJoin.account_id == str(ctx.account_id))
.order_by(Tenant.created_at.asc())
).all()
return {"workspaces": list(starmap(_workspace_summary, rows))}, 200
@openapi_ns.route("/workspaces/<string:workspace_id>")
class WorkspaceByIdApi(Resource):
@validate_bearer(accept=ACCEPT_USER_ANY)
def get(self, workspace_id: str):
ctx = g.auth_ctx
# External SSO + missing account → never a member of anything; 404.
if ctx.subject_type != SubjectType.ACCOUNT or not ctx.account_id:
raise NotFound("workspace not found")
row = db.session.execute(
select(Tenant, TenantAccountJoin)
.join(TenantAccountJoin, TenantAccountJoin.tenant_id == Tenant.id)
.where(
Tenant.id == workspace_id,
TenantAccountJoin.account_id == str(ctx.account_id),
)
).first()
# 404 (not 403) on non-member so workspace IDs don't leak across tenants.
if row is None:
raise NotFound("workspace not found")
tenant, membership = row
return _workspace_detail(tenant, membership), 200
def _workspace_summary(tenant: Tenant, membership: TenantAccountJoin) -> dict:
return {
"id": str(tenant.id),
"name": tenant.name,
"role": getattr(membership, "role", ""),
"status": tenant.status,
"current": getattr(membership, "current", False),
}
def _workspace_detail(tenant: Tenant, membership: TenantAccountJoin) -> dict:
return {
"id": str(tenant.id),
"name": tenant.name,
"role": getattr(membership, "role", ""),
"status": tenant.status,
"current": getattr(membership, "current", False),
"created_at": tenant.created_at.isoformat() if tenant.created_at else None,
}

View File

@ -84,9 +84,6 @@ class WorkflowEventsApi(WebApiResource):
def _generate_stream_events():
if include_state_snapshot:
# TODO(wylswz): events between shapshot and live tail may be lost.
# TODO(wylswz): previous message chunks are not replayed. In order to support replay, we need
# to figure out a way to deduplicate events between snapshot and stream.
return generator.convert_to_event_stream(
build_workflow_event_stream(
app_mode=app_mode,

View File

@ -311,7 +311,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout: float = 300,
idle_timeout=300,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)

View File

@ -23,7 +23,7 @@ class MessageGenerator:
cls,
app_mode: AppMode,
workflow_run_id: str,
idle_timeout: float = 300,
idle_timeout=300,
ping_interval: float = 10.0,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:

View File

@ -27,9 +27,6 @@ def stream_topic_events(
terminal_values = _normalize_terminal_events(terminal_events)
last_msg_time = time.time()
last_ping_time = last_msg_time
# The application layer intentionally does not use broadcast-channel replay;
# callers that need historical events should compose them from persisted state
# (see ``build_workflow_event_stream``) and then tail the live stream.
with topic.subscribe() as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.

View File

@ -685,6 +685,8 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
match invoke_from:
case InvokeFrom.SERVICE_API:
created_from = WorkflowAppLogCreatedFrom.SERVICE_API
case InvokeFrom.OPENAPI:
created_from = WorkflowAppLogCreatedFrom.OPENAPI
case InvokeFrom.EXPLORE:
created_from = WorkflowAppLogCreatedFrom.INSTALLED_APP
case InvokeFrom.WEB_APP:

View File

@ -410,7 +410,7 @@ class WorkflowBasedAppRunner:
elif isinstance(event, GraphRunFailedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, GraphRunAbortedEvent):
self._publish_event(QueueWorkflowPartialSuccessEvent(outputs={}, exceptions_count=0))
self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0))
elif isinstance(event, GraphRunPausedEvent):
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()

View File

@ -24,6 +24,7 @@ class UserFrom(StrEnum):
class InvokeFrom(StrEnum):
SERVICE_API = "service-api"
OPENAPI = "openapi"
WEB_APP = "web-app"
TRIGGER = "trigger"
EXPLORE = "explore"

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Generator # Changed from Iterator
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
@ -32,7 +32,7 @@ def get_current_file_access_scope() -> FileAccessScope | None:
@contextmanager
def bind_file_access_scope(scope: FileAccessScope) -> Generator[None, None, None]: # Changed from Iterator[None]
def bind_file_access_scope(scope: FileAccessScope) -> Iterator[None]:
token = _current_file_access_scope.set(scope)
try:
yield

View File

@ -70,32 +70,12 @@ class ProviderManager:
Request-bound managers may carry caller identity in that runtime, and the
resulting ``ProviderConfiguration`` objects must reuse it for downstream
model-type and schema lookups.
Configuration assembly is cached per manager instance so call chains that
share one request-scoped manager can reuse the same provider graph instead
of rebuilding it for every lookup. Call ``clear_configurations_cache()``
when a long-lived manager needs to observe writes performed within the same
instance scope.
"""
decoding_rsa_key: Any | None
decoding_cipher_rsa: Any | None
_model_runtime: ModelRuntime
_configurations_cache: dict[str, ProviderConfigurations]
def __init__(self, model_runtime: ModelRuntime):
self.decoding_rsa_key = None
self.decoding_cipher_rsa = None
self._model_runtime = model_runtime
self._configurations_cache = {}
def clear_configurations_cache(self, tenant_id: str | None = None) -> None:
"""Drop assembled provider configurations cached on this manager instance."""
if tenant_id is None:
self._configurations_cache.clear()
return
self._configurations_cache.pop(tenant_id, None)
def get_configurations(self, tenant_id: str) -> ProviderConfigurations:
"""
@ -134,10 +114,6 @@ class ProviderManager:
:param tenant_id:
:return:
"""
cached_configurations = self._configurations_cache.get(tenant_id)
if cached_configurations is not None:
return cached_configurations
# Get all provider records of the workspace
provider_name_to_provider_records_dict = self._get_all_providers(tenant_id)
@ -297,8 +273,6 @@ class ProviderManager:
provider_configurations[str(provider_id_entity)] = provider_configuration
self._configurations_cache[tenant_id] = provider_configurations
# Return the encapsulated object
return provider_configurations

View File

@ -139,10 +139,8 @@ class Jieba(BaseKeyword):
"__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table},
}
dataset_keyword_table = self.dataset.dataset_keyword_table
keyword_data_source_type = dataset_keyword_table.data_source_type if dataset_keyword_table else "file"
keyword_data_source_type = dataset_keyword_table.data_source_type
if keyword_data_source_type == "database":
if dataset_keyword_table is None:
return
dataset_keyword_table.keyword_table = dumps_with_sets(keyword_table_dict)
db.session.commit()
else:

View File

@ -1,5 +1,4 @@
import re
from collections.abc import Callable
from operator import itemgetter
from typing import cast
@ -81,14 +80,12 @@ class JiebaKeywordTableHandler:
def extract_tags(self, sentence: str, top_k: int | None = 20, **kwargs):
# Basic frequency-based keyword extraction as a fallback when TF-IDF is unavailable.
top_k = cast(int | None, kwargs.pop("topK", top_k))
if top_k is None:
top_k = 20
top_k = kwargs.pop("topK", top_k)
cut = getattr(jieba, "cut", None)
if self._lcut:
tokens = self._lcut(sentence)
elif callable(cut):
tokens = list(cast(Callable[[str], list[str]], cut)(sentence))
tokens = list(cut(sentence))
else:
tokens = re.findall(r"\w+", sentence)
@ -111,7 +108,7 @@ class JiebaKeywordTableHandler:
sentence=text,
topK=max_keywords_per_chunk,
)
# jieba.analyse.extract_tags returns an untyped list when withFlag is False by default.
# jieba.analyse.extract_tags returns list[Any] when withFlag is False by default.
keywords = cast(list[str], keywords)
return set(self._expand_tokens_with_subtokens(set(keywords)))

View File

@ -158,7 +158,7 @@ class RetrievalService:
)
if futures:
for _ in concurrent.futures.as_completed(futures, timeout=3600):
for future in concurrent.futures.as_completed(futures, timeout=3600):
if exceptions:
for f in futures:
f.cancel()

View File

@ -94,7 +94,6 @@ class ExtractProcessor:
cls, extract_setting: ExtractSetting, is_automatic: bool = False, file_path: str | None = None
) -> list[Document]:
if extract_setting.datasource_type == DatasourceType.FILE:
upload_file = extract_setting.upload_file
with tempfile.TemporaryDirectory() as temp_dir:
upload_file = extract_setting.upload_file
if not file_path:
@ -105,7 +104,6 @@ class ExtractProcessor:
storage.download(upload_file.key, file_path)
input_file = Path(file_path)
file_extension = input_file.suffix.lower()
assert upload_file is not None, "upload_file is required"
etl_type = dify_config.ETL_TYPE
extractor: BaseExtractor | None = None
if etl_type == "Unstructured":

View File

@ -28,7 +28,7 @@ class FunctionCallMultiDatasetRouter:
SystemPromptMessage(content="You are a helpful AI assistant."),
UserPromptMessage(content=query),
]
result: LLMResult = model_instance.invoke_llm( # pyright: ignore[reportCallIssue, reportArgumentType]
result: LLMResult = model_instance.invoke_llm(
prompt_messages=prompt_messages,
tools=dataset_tools,
stream=False,

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import codecs
import re
from collections.abc import Set as AbstractSet
from collections.abc import Collection
from typing import Any, Literal
from core.model_manager import ModelInstance
@ -21,8 +21,8 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
def from_encoder[T: EnhanceRecursiveCharacterTextSplitter](
cls: type[T],
embedding_model_instance: ModelInstance | None,
allowed_special: Literal["all"] | AbstractSet[str] = frozenset(),
disallowed_special: Literal["all"] | AbstractSet[str] = "all",
allowed_special: Literal["all"] | set[str] = set(),
disallowed_special: Literal["all"] | Collection[str] = "all",
**kwargs: Any,
) -> T:
def _token_encoder(texts: list[str]) -> list[int]:
@ -40,7 +40,6 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
return [len(text) for text in texts]
_ = _token_encoder # kept for future token-length wiring
return cls(length_function=_character_encoder, **kwargs)

View File

@ -4,8 +4,7 @@ import copy
import logging
import re
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sequence
from collections.abc import Set as AbstractSet
from collections.abc import Callable, Collection, Iterable, Sequence, Set
from dataclasses import dataclass
from typing import Any, Literal
@ -188,8 +187,8 @@ class TokenTextSplitter(TextSplitter):
self,
encoding_name: str = "gpt2",
model_name: str | None = None,
allowed_special: Literal["all"] | AbstractSet[str] = frozenset(),
disallowed_special: Literal["all"] | AbstractSet[str] = "all",
allowed_special: Literal["all"] | Set[str] = set(),
disallowed_special: Literal["all"] | Collection[str] = "all",
**kwargs: Any,
):
"""Create a new TextSplitter."""
@ -208,8 +207,8 @@ class TokenTextSplitter(TextSplitter):
else:
enc = tiktoken.get_encoding(encoding_name)
self._tokenizer = enc
self._allowed_special: Literal["all"] | AbstractSet[str] = allowed_special
self._disallowed_special: Literal["all"] | AbstractSet[str] = disallowed_special
self._allowed_special = allowed_special
self._disallowed_special = disallowed_special
def split_text(self, text: str) -> list[str]:
def _encode(_text: str) -> list[int]:

View File

@ -41,10 +41,6 @@ def safe_json_value(v):
return v.hex()
elif isinstance(v, memoryview):
return v.tobytes().hex()
elif isinstance(v, np.integer):
return int(v)
elif isinstance(v, np.floating):
return float(v)
elif isinstance(v, np.ndarray):
return v.tolist()
elif isinstance(v, dict):

View File

@ -105,7 +105,7 @@ class Article:
def extract_using_readabilipy(html: str):
json_article: dict[str, Any] = simple_json_from_html_string(html, use_readability=False)
json_article: dict[str, Any] = simple_json_from_html_string(html, use_readability=True)
article = Article(
title=json_article.get("title") or "",
author=json_article.get("byline") or "",

View File

@ -8,6 +8,8 @@ AUTHENTICATED_HEADERS: tuple[str, ...] = (*SERVICE_API_HEADERS, HEADER_NAME_CSRF
FILES_HEADERS: tuple[str, ...] = (*BASE_CORS_HEADERS, HEADER_NAME_CSRF_TOKEN)
EMBED_HEADERS: tuple[str, ...] = ("Content-Type", HEADER_NAME_APP_CODE)
EXPOSED_HEADERS: tuple[str, ...] = ("X-Version", "X-Env", "X-Trace-Id")
OPENAPI_HEADERS: tuple[str, ...] = ("Authorization", "Content-Type", HEADER_NAME_CSRF_TOKEN)
OPENAPI_MAX_AGE_SECONDS: int = 600
def _apply_cors_once(bp, /, **cors_kwargs):
@ -29,6 +31,7 @@ def init_app(app: DifyApp):
from controllers.files import bp as files_bp
from controllers.inner_api import bp as inner_api_bp
from controllers.mcp import bp as mcp_bp
from controllers.openapi import bp as openapi_bp
from controllers.service_api import bp as service_api_bp
from controllers.trigger import bp as trigger_bp
from controllers.web import bp as web_bp
@ -41,6 +44,22 @@ def init_app(app: DifyApp):
)
app.register_blueprint(service_api_bp)
# User-scoped programmatic API. Default empty allowlist = same-origin
# only; expand via OPENAPI_CORS_ALLOW_ORIGINS for third-party
# integrations. supports_credentials so cookie-authed approve/deny
# work; cross-origin OPTIONS without an allowed origin will fail
# the same as on the console blueprint.
_apply_cors_once(
openapi_bp,
resources={r"/*": {"origins": dify_config.OPENAPI_CORS_ALLOW_ORIGINS}},
supports_credentials=True,
allow_headers=list(OPENAPI_HEADERS),
methods=["GET", "POST", "PATCH", "DELETE", "OPTIONS"],
expose_headers=list(EXPOSED_HEADERS),
max_age=OPENAPI_MAX_AGE_SECONDS,
)
app.register_blueprint(openapi_bp)
_apply_cors_once(
web_bp,
resources={

View File

@ -222,6 +222,12 @@ def init_app(app: DifyApp) -> Celery:
"task": "schedule.clean_workflow_runs_task.clean_workflow_runs_task",
"schedule": crontab(minute="0", hour="0"),
}
if dify_config.ENABLE_CLEAN_OAUTH_ACCESS_TOKENS_TASK:
imports.append("schedule.clean_oauth_access_tokens_task")
beat_schedule["clean_oauth_access_tokens_task"] = {
"task": "schedule.clean_oauth_access_tokens_task.clean_oauth_access_tokens_task",
"schedule": crontab(minute="0", hour="5", day_of_month=f"*/{day}"),
}
if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:
imports.append("schedule.workflow_schedule_task")
beat_schedule["workflow_schedule_task"] = {

View File

@ -12,7 +12,7 @@ from constants import HEADER_NAME_APP_CODE
from dify_app import DifyApp
from extensions.ext_database import db
from libs.passport import PassportService
from libs.token import extract_access_token, extract_webapp_passport
from libs.token import extract_access_token, extract_console_cookie_token, extract_webapp_passport
from models import Account, Tenant, TenantAccountJoin
from models.model import AppMCPServer, EndUser
from services.account_service import AccountService
@ -84,6 +84,24 @@ def load_user_from_request(request_from_flask_login: Request) -> LoginUser | Non
logged_in_account = AccountService.load_logged_in_account(account_id=user_id)
return logged_in_account
elif request.blueprint == "openapi":
# Account-branch device-flow approval routes (approve / deny /
# approval-context) sit under @login_required and authenticate via
# the console session cookie. Cookie-only on purpose — bearer
# tokens (dfoa_/dfoe_) live on the Authorization header and are
# validated by AppPipeline, not flask-login.
cookie_token = extract_console_cookie_token(request)
if not cookie_token:
return None
try:
decoded = PassportService().verify(cookie_token)
except Exception:
return None
user_id = decoded.get("user_id")
source = decoded.get("token_source")
if source or not user_id:
return None
return AccountService.load_logged_in_account(account_id=user_id)
elif request.blueprint == "web":
app_code = request.headers.get(HEADER_NAME_APP_CODE)
webapp_token = extract_webapp_passport(app_code, request) if app_code else None

View File

@ -0,0 +1,23 @@
"""Bind the bearer authenticator at startup. Must run after ext_database
and ext_redis (needs both factories).
"""
from __future__ import annotations
from configs import dify_config
from dify_app import DifyApp
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.oauth_bearer import build_and_bind
def is_enabled() -> bool:
return dify_config.ENABLE_OAUTH_BEARER
def init_app(app: DifyApp) -> None:
# scoped_session isn't a context manager; request teardown closes it.
def session_factory():
return db.session
build_and_bind(session_factory=session_factory, redis_client=redis_client)

View File

@ -58,7 +58,6 @@ class MessageListItem(ResponseModel):
message_files: list[MessageFile]
status: str
error: str | None = None
workflow_run_id: str | None = None
extra_contents: list[ExecutionExtraContentDomainModel]
@field_validator("inputs", mode="before")

View File

@ -76,6 +76,7 @@ class _StreamsSubscription(Subscription):
# reading and writing the _listener / `_closed` attribute.
self._lock = threading.Lock()
self._closed: bool = False
# self._closed = threading.Event()
self._listener: threading.Thread | None = None
def _listen(self) -> None:

View File

@ -0,0 +1,196 @@
"""Device-flow security primitives: enterprise_only gate, approval-grant
cookie mint/verify/consume, and anti-framing headers.
"""
from __future__ import annotations
import logging
import secrets
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from functools import wraps
from flask import Blueprint
from werkzeug.exceptions import NotFound
from libs import jws
from libs.token import is_secure
from services.feature_service import FeatureService, LicenseStatus
logger = logging.getLogger(__name__)
# ============================================================================
# enterprise_only decorator
# ============================================================================
# Fail-closed: any non-EE-active status (default NONE on CE, plus INACTIVE / EXPIRED / LOST)
# is denied. Future LicenseStatus values default to denial unless explicitly admitted.
_EE_ENABLED_STATUSES = {LicenseStatus.ACTIVE, LicenseStatus.EXPIRING}
def enterprise_only[**P, R](view: Callable[P, R]) -> Callable[P, R]:
"""404 on CE, passthrough on EE. Apply before rate-limit so CE
responses don't consume the bucket.
"""
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
settings = FeatureService.get_system_features()
if settings.license.status not in _EE_ENABLED_STATUSES:
raise NotFound()
return view(*args, **kwargs)
return decorated
# ============================================================================
# approval_grant cookie
# ============================================================================
APPROVAL_GRANT_COOKIE_NAME = "device_approval_grant"
APPROVAL_GRANT_COOKIE_PATH = "/openapi/v1/oauth/device"
APPROVAL_GRANT_COOKIE_TTL_SECONDS = 300 # 5 min
NONCE_TTL_SECONDS = 600 # 2x cookie TTL — defeats clock-skew late replay
NONCE_KEY_FMT = "device_approval_grant_nonce:{nonce}"
SSO_ASSERTION_NONCE_KEY_FMT = "sso_assertion_nonce:{nonce}"
@dataclass(frozen=True, slots=True)
class ApprovalGrantClaims:
subject_email: str
subject_issuer: str
user_code: str
nonce: str
csrf_token: str
expires_at: datetime
def mint_approval_grant(
*,
keyset: jws.KeySet,
iss: str,
subject_email: str,
subject_issuer: str,
user_code: str,
) -> tuple[str, ApprovalGrantClaims]:
"""Use ``approval_grant_cookie_kwargs`` to set the cookie — single
source of truth for Path/HttpOnly/Secure/SameSite.
"""
now = datetime.now(UTC)
exp = now + timedelta(seconds=APPROVAL_GRANT_COOKIE_TTL_SECONDS)
nonce = _random_opaque()
csrf_token = _random_opaque()
payload = {
"iss": iss,
"subject_email": subject_email,
"subject_issuer": subject_issuer,
"user_code": user_code,
"nonce": nonce,
"csrf_token": csrf_token,
}
token = jws.sign(keyset, payload, aud=jws.AUD_APPROVAL_GRANT, ttl_seconds=APPROVAL_GRANT_COOKIE_TTL_SECONDS)
return token, ApprovalGrantClaims(
subject_email=subject_email,
subject_issuer=subject_issuer,
user_code=user_code,
nonce=nonce,
csrf_token=csrf_token,
expires_at=exp,
)
def verify_approval_grant(keyset: jws.KeySet, token: str) -> ApprovalGrantClaims:
"""Sig + aud + exp only — nonce consumption is the caller's job."""
data = jws.verify(keyset, token, expected_aud=jws.AUD_APPROVAL_GRANT)
return ApprovalGrantClaims(
subject_email=data["subject_email"],
subject_issuer=data["subject_issuer"],
user_code=data["user_code"],
nonce=data["nonce"],
csrf_token=data["csrf_token"],
expires_at=datetime.fromtimestamp(data["exp"], tz=UTC),
)
def consume_approval_grant_nonce(redis_client, nonce: str) -> bool:
if not nonce:
return False
return bool(
redis_client.set(
NONCE_KEY_FMT.format(nonce=nonce),
"1",
nx=True,
ex=NONCE_TTL_SECONDS,
)
)
def consume_sso_assertion_nonce(redis_client, nonce: str) -> bool:
if not nonce:
return False
return bool(
redis_client.set(
SSO_ASSERTION_NONCE_KEY_FMT.format(nonce=nonce),
"1",
nx=True,
ex=NONCE_TTL_SECONDS,
)
)
def approval_grant_cookie_kwargs(value: str) -> dict:
"""``secure`` follows is_secure() so HTTP-only deployments don't
silently drop the cookie.
"""
return {
"key": APPROVAL_GRANT_COOKIE_NAME,
"value": value,
"max_age": APPROVAL_GRANT_COOKIE_TTL_SECONDS,
"path": APPROVAL_GRANT_COOKIE_PATH,
"secure": is_secure(),
"httponly": True,
"samesite": "Lax",
}
def approval_grant_cleared_cookie_kwargs() -> dict:
return {
"key": APPROVAL_GRANT_COOKIE_NAME,
"value": "",
"max_age": 0,
"path": APPROVAL_GRANT_COOKIE_PATH,
"secure": is_secure(),
"httponly": True,
"samesite": "Lax",
}
def _random_opaque() -> str:
return secrets.token_urlsafe(16)
# ============================================================================
# Anti-framing headers
# ============================================================================
_ANTI_FRAMING_HEADERS = {
"X-Frame-Options": "DENY",
"Content-Security-Policy": "frame-ancestors 'none'",
}
def attach_anti_framing(bp: Blueprint) -> None:
"""X-Frame-Options + CSP on every response from ``bp`` (CI invariant #4)."""
@bp.after_request
def _apply_headers(response): # pyright: ignore[reportUnusedFunction]
for name, value in _ANTI_FRAMING_HEADERS.items():
response.headers.setdefault(name, value)
return response

View File

@ -1,5 +1,5 @@
import contextvars
from collections.abc import Generator # Changed from Iterator
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING
@ -13,7 +13,7 @@ if TYPE_CHECKING:
def preserve_flask_contexts(
flask_app: Flask,
context_vars: contextvars.Context,
) -> Generator[None, None, None]: # Changed from Iterator[None]
) -> Iterator[None]:
"""
A context manager that handles:
1. flask-login's UserProxy copy

View File

@ -542,3 +542,18 @@ class RateLimiter:
self._redis_client.zadd(key, {member: current_time})
self._redis_client.expire(key, self.time_window * 2)
def seconds_until_available(self, email: str) -> int:
"""Seconds until the oldest in-window entry expires, freeing a slot.
Defensive floor of 1 second. Caller should only invoke this after
is_rate_limited() returned True.
"""
key = self._get_key(email)
oldest = cast(Any, self._redis_client).zrange(key, 0, 0, withscores=True)
if not oldest:
return 1
_member, score = oldest[0]
free_at = int(score) + self.time_window
remaining = free_at - int(time.time())
return max(remaining, 1)

108
api/libs/jws.py Normal file
View File

@ -0,0 +1,108 @@
"""HS256 compact JWS keyed on the shared Dify SECRET_KEY. Used by the SSO
state envelope, external subject assertion, and approval-grant cookie —
all three share one key-set so api ↔ enterprise can verify each other.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import jwt
from configs import dify_config
AUD_STATE_ENVELOPE = "api.sso.state_envelope"
AUD_EXT_SUBJECT_ASSERTION = "api.device_flow.external_subject_assertion"
AUD_APPROVAL_GRANT = "api.device_flow.approval_grant"
ACTIVE_KID_V1 = "dify-shared-v1"
class KeySetError(Exception):
pass
class KeySet:
"""``from_entries`` reserves multi-kid construction for rotation slots."""
def __init__(self, entries: dict[str, bytes], active_kid: str) -> None:
if active_kid not in entries:
raise KeySetError(f"active kid {active_kid!r} missing from key-set")
if not entries[active_kid]:
raise KeySetError(f"active kid {active_kid!r} has empty secret")
self._entries: dict[str, bytes] = {k: bytes(v) for k, v in entries.items()}
self._active_kid = active_kid
@classmethod
def from_shared_secret(cls) -> KeySet:
secret = dify_config.SECRET_KEY
if not secret:
raise KeySetError("dify_config.SECRET_KEY is empty; cannot build key-set")
return cls({ACTIVE_KID_V1: secret.encode("utf-8")}, ACTIVE_KID_V1)
@classmethod
def from_entries(cls, entries: dict[str, bytes], active_kid: str) -> KeySet:
return cls(entries, active_kid)
@property
def active_kid(self) -> str:
return self._active_kid
def lookup(self, kid: str) -> bytes | None:
return self._entries.get(kid)
def sign(keyset: KeySet, payload: dict, aud: str, ttl_seconds: int) -> str:
"""``iat`` + ``exp`` are injected here; callers must not set them."""
if "aud" in payload or "iat" in payload or "exp" in payload:
raise ValueError("reserved claim present in payload (aud/iat/exp)")
if ttl_seconds <= 0:
raise ValueError("ttl_seconds must be positive")
kid = keyset.active_kid
secret = keyset.lookup(kid)
if secret is None:
raise KeySetError(f"active kid {kid!r} lookup miss")
iat = datetime.now(UTC)
exp = iat + timedelta(seconds=ttl_seconds)
claims = {**payload, "aud": aud, "iat": iat, "exp": exp}
return jwt.encode(
claims,
secret,
algorithm="HS256",
headers={"kid": kid, "typ": "JWT"},
)
class VerifyError(Exception):
pass
def verify(keyset: KeySet, token: str, expected_aud: str) -> dict:
"""Unknown kid is rejected — never fall back to the active kid, since
a past kid value would otherwise be forgeable by anyone who saw it.
"""
try:
header = jwt.get_unverified_header(token)
except jwt.PyJWTError as e:
raise VerifyError(f"decode header: {e}") from e
kid = header.get("kid")
if not kid:
raise VerifyError("no kid in header")
secret = keyset.lookup(kid)
if secret is None:
raise VerifyError(f"unknown kid {kid!r}")
try:
return jwt.decode(
token,
secret,
algorithms=["HS256"],
audience=expected_aud,
)
except jwt.ExpiredSignatureError as e:
raise VerifyError("token expired") from e
except jwt.InvalidAudienceError as e:
raise VerifyError("aud mismatch") from e
except jwt.PyJWTError as e:
raise VerifyError(f"decode: {e}") from e

608
api/libs/oauth_bearer.py Normal file
View File

@ -0,0 +1,608 @@
"""OAuth bearer primitives.
To add a token kind: write a Resolver, add a SubjectType + Accepts member,
append a TokenKind to build_registry, and update _SUBJECT_TO_ACCEPT.
Authenticator + validate_bearer stay untouched.
"""
from __future__ import annotations
import hashlib
import json
import logging
import uuid
from collections.abc import Callable, Iterable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from functools import wraps
from typing import Literal, ParamSpec, Protocol, TypeVar
from flask import g, request
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, ServiceUnavailable, Unauthorized
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.rate_limit import enforce_bearer_rate_limit
from models import Account, OAuthAccessToken, TenantAccountJoin
logger = logging.getLogger(__name__)
# ============================================================================
# Contract — types, enums, protocols
# ============================================================================
class SubjectType(StrEnum):
ACCOUNT = "account"
EXTERNAL_SSO = "external_sso"
class Scope(StrEnum):
"""Catalog of bearer scopes recognised by the openapi surface.
`FULL` is the catch-all carried by `dfoa_` account tokens — it satisfies
any per-route `require_scope`. `dfoe_` tokens carry the per-feature scopes
(`APPS_RUN`, `APPS_READ_PERMITTED`).
"""
FULL = "full"
APPS_READ = "apps:read"
APPS_READ_PERMITTED = "apps:read:permitted"
APPS_RUN = "apps:run"
class Accepts(StrEnum):
"""Subject types a route is willing to accept as caller."""
USER_ACCOUNT = "user_account"
USER_EXT_SSO = "user_ext_sso"
ACCEPT_USER_ANY: frozenset[Accepts] = frozenset({Accepts.USER_ACCOUNT, Accepts.USER_EXT_SSO})
ACCEPT_USER_EXT_SSO: frozenset[Accepts] = frozenset({Accepts.USER_EXT_SSO})
_SUBJECT_TO_ACCEPT: dict[SubjectType, Accepts] = {
SubjectType.ACCOUNT: Accepts.USER_ACCOUNT,
SubjectType.EXTERNAL_SSO: Accepts.USER_EXT_SSO,
}
@dataclass(frozen=True, slots=True)
class AuthContext:
"""Attached to ``g.auth_ctx``. ``scopes`` / ``subject_type`` / ``source``
come from the TokenKind, not the DB — corrupt rows can't elevate scope.
`verified_tenants` is a snapshot of the Layer-0 verdict cache at
authenticate time. Per-request mutations write through to Redis via
`record_layer0_verdict`; this snapshot is not updated in place (frozen).
"""
subject_type: SubjectType
subject_email: str | None
subject_issuer: str | None
account_id: uuid.UUID | None
scopes: frozenset[Scope]
token_id: uuid.UUID
source: str
expires_at: datetime | None
token_hash: str
verified_tenants: dict[str, bool] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class ResolvedRow:
subject_email: str | None
subject_issuer: str | None
account_id: uuid.UUID | None
token_id: uuid.UUID
expires_at: datetime | None
verified_tenants: dict[str, bool] = field(default_factory=dict)
def to_cache(self) -> dict:
return {
"subject_email": self.subject_email,
"subject_issuer": self.subject_issuer,
"account_id": str(self.account_id) if self.account_id else None,
"token_id": str(self.token_id),
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"verified_tenants": dict(self.verified_tenants),
}
@classmethod
def from_cache(cls, data: dict) -> ResolvedRow:
return cls(
subject_email=data["subject_email"],
subject_issuer=data["subject_issuer"],
account_id=uuid.UUID(data["account_id"]) if data["account_id"] else None,
token_id=uuid.UUID(data["token_id"]),
expires_at=datetime.fromisoformat(data["expires_at"]) if data["expires_at"] else None,
verified_tenants=_coerce_verified_tenants(data.get("verified_tenants")),
)
def _coerce_verified_tenants(raw: object) -> dict[str, bool]:
"""Tolerate legacy entries that stored 'ok'/'denied' string verdicts.
TODO(post-v1.0): remove once the AuthContext cache TTL has fully cycled
on all live deployments (60s TTL → safe to drop one release after rollout).
"""
if not isinstance(raw, dict):
return {}
out: dict[str, bool] = {}
for k, v in raw.items():
if isinstance(v, bool):
out[k] = v
elif v == "ok":
out[k] = True
elif v == "denied":
out[k] = False
return out
class Resolver(Protocol):
def resolve(self, token_hash: str) -> ResolvedRow | None: # pragma: no cover - contract
...
@dataclass(frozen=True, slots=True)
class TokenKind:
prefix: str
subject_type: SubjectType
scopes: frozenset[Scope]
source: str
resolver: Resolver
def matches(self, token: str) -> bool:
return token.startswith(self.prefix)
class InvalidBearerError(Exception):
"""Token missing, unknown prefix, or no live row."""
class TokenExpiredError(Exception):
"""Hard-expire bookkeeping is the resolver's job before raising."""
# ============================================================================
# Registry
# ============================================================================
class TokenKindRegistry:
def __init__(self, kinds: Iterable[TokenKind]) -> None:
self._kinds: tuple[TokenKind, ...] = tuple(kinds)
prefixes = [k.prefix for k in self._kinds]
if len(set(prefixes)) != len(prefixes):
raise ValueError(f"duplicate prefix in registry: {prefixes}")
def find(self, token: str) -> TokenKind | None:
for k in self._kinds:
if k.matches(token):
return k
return None
def kinds(self) -> tuple[TokenKind, ...]:
return self._kinds
# ============================================================================
# Authenticator
# ============================================================================
def sha256_hex(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
class BearerAuthenticator:
def __init__(self, registry: TokenKindRegistry) -> None:
self._registry = registry
@property
def registry(self) -> TokenKindRegistry:
return self._registry
def authenticate(self, token: str) -> AuthContext:
"""Identity + per-token rate limit (single source).
Both the openapi pipeline (`BearerCheck`) and the decorator
(`validate_bearer`) call this — rate-limit fires exactly once per
request regardless of which path hosts the route.
"""
kind = self._registry.find(token)
if kind is None:
raise InvalidBearerError("unknown token prefix")
token_hash = sha256_hex(token)
row = kind.resolver.resolve(token_hash)
if row is None:
raise InvalidBearerError("token unknown or revoked")
enforce_bearer_rate_limit(token_hash)
return AuthContext(
subject_type=kind.subject_type,
subject_email=row.subject_email,
subject_issuer=row.subject_issuer,
account_id=row.account_id,
scopes=kind.scopes,
token_id=row.token_id,
source=kind.source,
expires_at=row.expires_at,
token_hash=token_hash,
verified_tenants=dict(row.verified_tenants),
)
# ============================================================================
# OAuth access token resolver (PAT resolver would be a sibling class)
# ============================================================================
TOKEN_CACHE_KEY_FMT = "auth:token:{hash}"
POSITIVE_TTL_SECONDS = 60
NEGATIVE_TTL_SECONDS = 10
AUDIT_OAUTH_EXPIRED = "oauth.token_expired"
ScopeVariant = Literal["account", "external_sso"]
class OAuthAccessTokenResolver:
"""``.for_account()`` / ``.for_external_sso()`` are variant-scoped views
sharing DB + cache plumbing.
"""
def __init__(
self,
session_factory,
redis_client,
positive_ttl: int = POSITIVE_TTL_SECONDS,
negative_ttl: int = NEGATIVE_TTL_SECONDS,
) -> None:
self.session_factory = session_factory
self._redis = redis_client
self._positive_ttl = positive_ttl
self._negative_ttl = negative_ttl
def for_account(self) -> Resolver:
return _VariantResolver(self, variant="account")
def for_external_sso(self) -> Resolver:
return _VariantResolver(self, variant="external_sso")
def _cache_key(self, token_hash: str) -> str:
return TOKEN_CACHE_KEY_FMT.format(hash=token_hash)
def cache_get(self, token_hash: str) -> ResolvedRow | None | Literal["invalid"]:
raw = self._redis.get(self._cache_key(token_hash))
if raw is None:
return None
text = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
if text == "invalid":
return "invalid"
try:
return ResolvedRow.from_cache(json.loads(text))
except (ValueError, KeyError):
logger.warning("auth:token cache entry malformed; treating as miss")
return None
def cache_set_positive(self, token_hash: str, row: ResolvedRow) -> None:
self._redis.setex(
self._cache_key(token_hash),
self._positive_ttl,
json.dumps(row.to_cache()),
)
def cache_set_negative(self, token_hash: str) -> None:
self._redis.setex(self._cache_key(token_hash), self._negative_ttl, "invalid")
def hard_expire(self, session: Session, row_id: uuid.UUID | str, token_hash: str) -> None:
"""Atomic CAS — only the worker that flips revoked_at emits audit;
replays are idempotent.
"""
stmt = (
update(OAuthAccessToken)
.where(OAuthAccessToken.id == row_id, OAuthAccessToken.revoked_at.is_(None))
.values(revoked_at=datetime.now(UTC), token_hash=None)
)
result = session.execute(stmt)
session.commit()
if result.rowcount == 1:
logger.warning(
"audit: %s token_id=%s",
AUDIT_OAUTH_EXPIRED,
row_id,
extra={"audit": True, "token_id": str(row_id)},
)
self._redis.delete(self._cache_key(token_hash))
self.cache_set_negative(token_hash)
class _VariantResolver:
def __init__(self, parent: OAuthAccessTokenResolver, variant: ScopeVariant) -> None:
self._parent = parent
self._variant = variant
def resolve(self, token_hash: str) -> ResolvedRow | None:
cached = self._parent.cache_get(token_hash)
if cached == "invalid":
return None
if cached is not None and not isinstance(cached, str):
if not self._matches_variant(cached):
return None
return cached
# Flask-SQLAlchemy's scoped_session is request-bound and not a
# context manager; use it directly.
session = self._parent.session_factory()
row = self._load_from_db(session, token_hash)
if row is None:
self._parent.cache_set_negative(token_hash)
return None
now = datetime.now(UTC)
if row.expires_at is not None and row.expires_at <= now:
self._parent.hard_expire(session, row.id, token_hash)
return None
if not self._matches_variant_model(row):
logger.error(
"internal_state_invariant: account_id/prefix mismatch token_id=%s prefix=%s",
row.id,
row.prefix,
)
return None
resolved = ResolvedRow(
subject_email=row.subject_email,
subject_issuer=row.subject_issuer,
account_id=uuid.UUID(str(row.account_id)) if row.account_id else None,
token_id=uuid.UUID(str(row.id)),
expires_at=row.expires_at,
)
self._parent.cache_set_positive(token_hash, resolved)
return resolved
def _matches_variant(self, row: ResolvedRow) -> bool:
has_account = row.account_id is not None
if self._variant == "account":
return has_account
return not has_account
def _matches_variant_model(self, row: OAuthAccessToken) -> bool:
has_account = row.account_id is not None
if self._variant == "account":
return has_account and row.prefix == "dfoa_"
return (not has_account) and row.prefix == "dfoe_"
def _load_from_db(self, session: Session, token_hash: str) -> OAuthAccessToken | None:
return (
session.query(OAuthAccessToken)
.filter(
OAuthAccessToken.token_hash == token_hash,
OAuthAccessToken.revoked_at.is_(None),
)
.one_or_none()
)
# ============================================================================
# Layer 0 — workspace membership cache + helper
# ============================================================================
def record_layer0_verdict(token_hash: str, tenant_id: str, verdict: bool) -> None:
"""Merge a Layer-0 membership verdict into the AuthContext cache entry at
`auth:token:{hash}`. No-op if entry missing/expired/invalid — next request
rebuilds via authenticate() and re-runs Layer 0.
"""
cache_key = TOKEN_CACHE_KEY_FMT.format(hash=token_hash)
raw = redis_client.get(cache_key)
if raw is None:
return
text = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
if text == "invalid":
return
try:
data = json.loads(text)
except (ValueError, KeyError):
return
ttl = redis_client.ttl(cache_key)
if ttl <= 0:
return
data.setdefault("verified_tenants", {})[tenant_id] = verdict
redis_client.setex(cache_key, ttl, json.dumps(data))
def check_workspace_membership(
*,
account_id: uuid.UUID | str,
tenant_id: str,
token_hash: str,
cached_verdicts: dict[str, bool],
) -> None:
"""Layer-0 enforcement core. Raises `Forbidden` on deny, returns on allow.
Shared by the pipeline step (`WorkspaceMembershipCheck`) and the
inline helper (`require_workspace_member`). Caller is responsible for
short-circuiting on EE / SSO subjects before invoking — this function
runs the membership + active-status checks unconditionally.
"""
cached = cached_verdicts.get(tenant_id)
if cached is True:
return
if cached is False:
raise Forbidden("workspace_membership_revoked")
join = db.session.execute(
select(TenantAccountJoin.id).where(
TenantAccountJoin.account_id == account_id,
TenantAccountJoin.tenant_id == tenant_id,
)
).scalar_one_or_none()
if join is None:
record_layer0_verdict(token_hash, tenant_id, False)
raise Forbidden("workspace_membership_revoked")
status = db.session.execute(select(Account.status).where(Account.id == account_id)).scalar_one_or_none()
if status != "active":
record_layer0_verdict(token_hash, tenant_id, False)
raise Forbidden("workspace_membership_revoked")
record_layer0_verdict(token_hash, tenant_id, True)
def require_workspace_member(ctx: AuthContext, tenant_id: str) -> None:
"""AuthContext-flavoured wrapper around `check_workspace_membership`.
No-op on EE (gateway RBAC owns tenant isolation) and for SSO subjects
(no `tenant_account_joins` row by definition).
"""
if dify_config.ENTERPRISE_ENABLED:
return
if ctx.subject_type != SubjectType.ACCOUNT or ctx.account_id is None:
return
check_workspace_membership(
account_id=ctx.account_id,
tenant_id=tenant_id,
token_hash=ctx.token_hash,
cached_verdicts=ctx.verified_tenants,
)
# ============================================================================
# Decorator — route-level bearer gate
# ============================================================================
_authenticator: BearerAuthenticator | None = None
def bind_authenticator(authenticator: BearerAuthenticator) -> None:
global _authenticator
_authenticator = authenticator
def get_authenticator() -> BearerAuthenticator:
if _authenticator is None:
raise RuntimeError("BearerAuthenticator not bound; call bind_authenticator at startup")
return _authenticator
def _extract_bearer(req) -> str | None:
header = req.headers.get("Authorization", "")
scheme, _, value = header.partition(" ")
if scheme.lower() != "bearer" or not value:
return None
return value.strip()
_DP = ParamSpec("_DP")
_DR = TypeVar("_DR")
def validate_bearer(*, accept: frozenset[Accepts]) -> Callable[[Callable[_DP, _DR]], Callable[_DP, _DR]]:
"""Opt-in: omitting it leaves the route unauthenticated.
Resolves user-level OAuth bearers (``dfoa_`` / ``dfoe_``). Legacy
``app-`` keys belong to ``service_api/wraps.py:validate_app_token``
and are rejected here as the wrong auth scheme for this surface.
"""
def wrap(fn: Callable[_DP, _DR]) -> Callable[_DP, _DR]:
@wraps(fn)
def inner(*args: _DP.args, **kwargs: _DP.kwargs) -> _DR:
token = _extract_bearer(request)
if token is None:
raise Unauthorized("missing bearer token")
if _authenticator is None:
raise ServiceUnavailable("bearer_auth_disabled: set ENABLE_OAUTH_BEARER=true to enable")
try:
ctx = get_authenticator().authenticate(token)
except InvalidBearerError as e:
raise Unauthorized(str(e))
if _SUBJECT_TO_ACCEPT[ctx.subject_type] not in accept:
raise Forbidden("token subject type not accepted here")
g.auth_ctx = ctx
return fn(*args, **kwargs)
return inner
return wrap
def bearer_feature_required[**P, R](fn: Callable[P, R]) -> Callable[P, R]:
"""503 if ENABLE_OAUTH_BEARER is off — minted tokens would be unusable
without the authenticator, so fail fast instead of approving silently.
"""
@wraps(fn)
def inner(*args: P.args, **kwargs: P.kwargs) -> R:
if not dify_config.ENABLE_OAUTH_BEARER:
raise ServiceUnavailable("bearer_auth_disabled: set ENABLE_OAUTH_BEARER=true to enable")
return fn(*args, **kwargs)
return inner
def require_scope(scope: Scope) -> Callable:
"""Route-level scope gate — must run AFTER validate_bearer so that
g.auth_ctx is set. Raises Forbidden('insufficient_scope: <scope>')
when the bearer lacks both the requested scope and `Scope.FULL`.
"""
def wrap(fn: Callable) -> Callable:
@wraps(fn)
def inner(*args, **kwargs):
ctx = getattr(g, "auth_ctx", None)
if ctx is None:
raise RuntimeError(
"require_scope used without validate_bearer; stack @validate_bearer above @require_scope"
)
if Scope.FULL not in ctx.scopes and scope not in ctx.scopes:
raise Forbidden(f"insufficient_scope: {scope}")
return fn(*args, **kwargs)
return inner
return wrap
# ============================================================================
# Wiring — called once from the app factory
# ============================================================================
def build_registry(session_factory, redis_client) -> TokenKindRegistry:
oauth = OAuthAccessTokenResolver(session_factory, redis_client)
return TokenKindRegistry(
[
TokenKind(
prefix="dfoa_",
subject_type=SubjectType.ACCOUNT,
scopes=frozenset({Scope.FULL}),
source="oauth_account",
resolver=oauth.for_account(),
),
TokenKind(
prefix="dfoe_",
subject_type=SubjectType.EXTERNAL_SSO,
scopes=frozenset({Scope.APPS_RUN, Scope.APPS_READ_PERMITTED}),
source="oauth_external_sso",
resolver=oauth.for_external_sso(),
),
]
)
def build_and_bind(session_factory, redis_client) -> BearerAuthenticator:
registry = build_registry(session_factory, redis_client)
auth = BearerAuthenticator(registry)
bind_authenticator(auth)
return auth

140
api/libs/rate_limit.py Normal file
View File

@ -0,0 +1,140 @@
"""Typed rate-limit decorator over ``libs.helper.RateLimiter`` (sliding-
window Redis ZSET). Apply after auth decorators so scopes can read
``g.auth_ctx``. Use :func:`enforce` when the bucket key is computed
in-handler. RFC-8628 ``slow_down`` is inline — its response shape isn't
generic 429.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import timedelta
from enum import StrEnum
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import g, jsonify, make_response, request, session
from werkzeug.exceptions import TooManyRequests
from configs import dify_config
from libs.helper import RateLimiter, extract_remote_ip
class RateLimitScope(StrEnum):
IP = "ip"
SESSION = "session"
ACCOUNT = "account"
SUBJECT_EMAIL = "subject_email"
TOKEN_ID = "token_id"
@dataclass(frozen=True, slots=True)
class RateLimit:
limit: int
window: timedelta
scopes: tuple[RateLimitScope, ...]
LIMIT_DEVICE_CODE_PER_IP = RateLimit(60, timedelta(hours=1), (RateLimitScope.IP,))
LIMIT_SSO_INITIATE_PER_IP = RateLimit(60, timedelta(hours=1), (RateLimitScope.IP,))
LIMIT_APPROVE_EXT_PER_EMAIL = RateLimit(10, timedelta(hours=1), (RateLimitScope.SUBJECT_EMAIL,))
LIMIT_APPROVE_CONSOLE = RateLimit(10, timedelta(hours=1), (RateLimitScope.SESSION,))
LIMIT_LOOKUP_PUBLIC = RateLimit(60, timedelta(minutes=5), (RateLimitScope.IP,))
LIMIT_ME_PER_ACCOUNT = RateLimit(60, timedelta(minutes=1), (RateLimitScope.ACCOUNT,))
LIMIT_ME_PER_EMAIL = RateLimit(60, timedelta(minutes=1), (RateLimitScope.SUBJECT_EMAIL,))
LIMIT_BEARER_PER_TOKEN = RateLimit(
limit=dify_config.OPENAPI_RATE_LIMIT_PER_TOKEN,
window=timedelta(minutes=1),
scopes=(RateLimitScope.TOKEN_ID,), # bucket key composed by caller from sha256(token)
)
def _one_key(scope: RateLimitScope) -> str:
match scope:
case RateLimitScope.IP:
return f"ip:{extract_remote_ip(request) or 'unknown'}"
case RateLimitScope.SESSION:
return f"session:{session.get('_id', 'anon')}"
case RateLimitScope.ACCOUNT:
ctx = getattr(g, "auth_ctx", None)
if ctx and ctx.account_id:
return f"account:{ctx.account_id}"
return "account:anon"
case RateLimitScope.SUBJECT_EMAIL:
ctx = getattr(g, "auth_ctx", None)
if ctx and ctx.subject_email:
return f"subject:{ctx.subject_email}"
return "subject:anon"
case RateLimitScope.TOKEN_ID:
ctx = getattr(g, "auth_ctx", None)
if ctx and ctx.token_id:
return f"token:{ctx.token_id}"
return "token:anon"
def _composite_key(scopes: tuple[RateLimitScope, ...]) -> str:
return "|".join(_one_key(s) for s in scopes)
def _limiter_prefix(scopes: tuple[RateLimitScope, ...]) -> str:
return "rl:" + "+".join(s.value for s in scopes)
def _build_limiter(spec: RateLimit) -> RateLimiter:
return RateLimiter(
prefix=_limiter_prefix(spec.scopes),
max_attempts=spec.limit,
time_window=int(spec.window.total_seconds()),
)
_P = ParamSpec("_P")
_R = TypeVar("_R")
def rate_limit(spec: RateLimit) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
"""Apply after auth decorators that the scopes read from."""
limiter = _build_limiter(spec)
def wrap(fn: Callable[_P, _R]) -> Callable[_P, _R]:
@wraps(fn)
def inner(*args: _P.args, **kwargs: _P.kwargs) -> _R:
key = _composite_key(spec.scopes)
if limiter.is_rate_limited(key):
raise TooManyRequests("rate_limited")
limiter.increment_rate_limit(key)
return fn(*args, **kwargs)
return inner
return wrap
def enforce(spec: RateLimit, *, key: str) -> None:
"""Imperative form — caller composes the bucket key to match scope
semantics (the key is opaque here).
"""
limiter = _build_limiter(spec)
if limiter.is_rate_limited(key):
raise TooManyRequests("rate_limited")
limiter.increment_rate_limit(key)
def enforce_bearer_rate_limit(token_hash: str) -> None:
"""Per-token rate limit on /openapi/v1/* bearer-authed routes.
Bucket key = ``token:<sha256_hex>`` so the same token shares one
bucket across api replicas (Redis-backed sliding window).
"""
limiter = _build_limiter(LIMIT_BEARER_PER_TOKEN)
key = f"token:{token_hash}"
if limiter.is_rate_limited(key):
retry_after = limiter.seconds_until_available(key)
response = make_response(
jsonify({"error": "rate_limited", "retry_after_ms": retry_after * 1000}),
429,
)
response.headers["Retry-After"] = str(retry_after)
raise TooManyRequests(response=response)
limiter.increment_rate_limit(key)

View File

@ -72,11 +72,15 @@ def extract_csrf_token_from_cookie(request: Request) -> str | None:
return request.cookies.get(_real_cookie_name(COOKIE_NAME_CSRF_TOKEN))
def extract_access_token(request: Request) -> str | None:
def _try_extract_from_cookie(request: Request) -> str | None:
return request.cookies.get(_real_cookie_name(COOKIE_NAME_ACCESS_TOKEN))
def extract_console_cookie_token(request: Request) -> str | None:
"""Cookie-only console session token. Used by /openapi/v1/oauth/device/*
approval routes, which must not fall through to the Authorization header
(that's where dfoa_/dfoe_ bearers live — they aren't JWTs)."""
return request.cookies.get(_real_cookie_name(COOKIE_NAME_ACCESS_TOKEN))
return _try_extract_from_cookie(request) or _try_extract_from_header(request)
def extract_access_token(request: Request) -> str | None:
return extract_console_cookie_token(request) or _try_extract_from_header(request)
def extract_webapp_access_token(request: Request) -> str | None:

View File

@ -0,0 +1,104 @@
"""add oauth_access_tokens table
Revision ID: d4a5e1f3c9b7
Revises: 227822d22895, b69ca54b9208, 2a3aebbbf4bb
Create Date: 2026-04-23 22:00:00.000000
Merges the three open heads at time of authoring (add_workflow_comments_table,
add_chatbot_color_theme, add_app_tracing) into a single parent so the new
oauth_access_tokens table sits on a definite linear chain thereafter.
Table stores user-level OAuth bearer tokens minted via the device-flow grant
(difyctl auth login). PAT storage (personal_access_tokens) is a separate
table not added in this migration.
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "d4a5e1f3c9b7"
down_revision = ("227822d22895", "b69ca54b9208", "2a3aebbbf4bb")
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"oauth_access_tokens",
sa.Column(
"id",
postgresql.UUID(as_uuid=True),
server_default=sa.text("gen_random_uuid()"),
nullable=False,
primary_key=True,
),
sa.Column("subject_email", sa.Text(), nullable=False),
sa.Column("subject_issuer", sa.Text(), nullable=True),
sa.Column("account_id", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("client_id", sa.String(length=64), nullable=False),
sa.Column("device_label", sa.Text(), nullable=False),
sa.Column("prefix", sa.String(length=8), nullable=False),
sa.Column("token_hash", sa.String(length=64), nullable=True, unique=True),
sa.Column(
"created_at",
sa.TIMESTAMP(timezone=True),
server_default=sa.text("NOW()"),
nullable=False,
),
sa.Column("last_used_at", sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column("expires_at", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("revoked_at", sa.TIMESTAMP(timezone=True), nullable=True),
sa.ForeignKeyConstraint(
["account_id"],
["accounts.id"],
name="fk_oauth_access_tokens_account_id",
ondelete="SET NULL",
),
)
op.create_index(
"idx_oauth_subject_email",
"oauth_access_tokens",
["subject_email"],
postgresql_where=sa.text("revoked_at IS NULL"),
)
op.create_index(
"idx_oauth_account",
"oauth_access_tokens",
["account_id"],
postgresql_where=sa.text("revoked_at IS NULL AND account_id IS NOT NULL"),
)
op.create_index(
"idx_oauth_client",
"oauth_access_tokens",
["subject_email", "client_id"],
postgresql_where=sa.text("revoked_at IS NULL"),
)
op.create_index(
"idx_oauth_token_hash",
"oauth_access_tokens",
["token_hash"],
postgresql_where=sa.text("revoked_at IS NULL"),
)
# Partial unique index — rotate-in-place keyed on (subject, client, device).
# The app always writes a non-NULL subject_issuer (account flow uses a
# sentinel, external-SSO uses the verified IdP issuer); without that the
# composite key would never collide because Postgres treats NULLs as
# distinct in unique indices.
op.create_index(
"uq_oauth_active_per_device",
"oauth_access_tokens",
["subject_email", "subject_issuer", "client_id", "device_label"],
unique=True,
postgresql_where=sa.text("revoked_at IS NULL"),
)
def downgrade():
op.drop_index("uq_oauth_active_per_device", table_name="oauth_access_tokens")
op.drop_index("idx_oauth_token_hash", table_name="oauth_access_tokens")
op.drop_index("idx_oauth_client", table_name="oauth_access_tokens")
op.drop_index("idx_oauth_account", table_name="oauth_access_tokens")
op.drop_index("idx_oauth_subject_email", table_name="oauth_access_tokens")
op.drop_table("oauth_access_tokens")

View File

@ -73,7 +73,7 @@ from .model import (
TrialApp,
UploadFile,
)
from .oauth import DatasourceOauthParamConfig, DatasourceProvider
from .oauth import DatasourceOauthParamConfig, DatasourceProvider, OAuthAccessToken
from .provider import (
LoadBalancingModelConfig,
Provider,
@ -177,6 +177,7 @@ __all__ = [
"MessageChain",
"MessageFeedback",
"MessageFile",
"OAuthAccessToken",
"OperationLog",
"PinnedConversation",
"Provider",

View File

@ -84,3 +84,35 @@ class DatasourceOauthTenantParamConfig(TypeBase):
onupdate=func.current_timestamp(),
init=False,
)
class OAuthAccessToken(TypeBase):
"""Device-flow bearer. account_id NOT NULL ⇒ dfoa_ (Dify account,
subject_issuer = "dify:account" sentinel); account_id NULL +
subject_issuer = verified IdP issuer ⇒ dfoe_ (external SSO, EE-only).
subject_issuer is non-NULL for all rows the app writes — Postgres
treats NULLs as distinct in unique indices, so the partial unique
index on (subject_email, subject_issuer, client_id, device_label)
WHERE revoked_at IS NULL would otherwise fail to rotate in place.
"""
__tablename__ = "oauth_access_tokens"
__table_args__ = (sa.PrimaryKeyConstraint("id", name="oauth_access_tokens_pkey"),)
id: Mapped[str] = mapped_column(
StringUUID, insert_default=lambda: str(uuidv7()), default_factory=lambda: str(uuidv7()), init=False
)
subject_email: Mapped[str] = mapped_column(sa.Text, nullable=False)
client_id: Mapped[str] = mapped_column(sa.String(64), nullable=False)
device_label: Mapped[str] = mapped_column(sa.Text, nullable=False)
prefix: Mapped[str] = mapped_column(sa.String(8), nullable=False)
expires_at: Mapped[datetime] = mapped_column(sa.DateTime(timezone=True), nullable=False)
subject_issuer: Mapped[str | None] = mapped_column(sa.Text, nullable=True, default=None)
account_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None)
token_hash: Mapped[str | None] = mapped_column(sa.String(64), nullable=True, default=None)
last_used_at: Mapped[datetime | None] = mapped_column(sa.DateTime(timezone=True), nullable=True, default=None)
revoked_at: Mapped[datetime | None] = mapped_column(sa.DateTime(timezone=True), nullable=True, default=None)
created_at: Mapped[datetime] = mapped_column(
sa.DateTime(timezone=True), nullable=False, server_default=func.now(), init=False
)

View File

@ -1206,6 +1206,7 @@ class WorkflowAppLogCreatedFrom(StrEnum):
SERVICE_API = "service-api"
WEB_APP = "web-app"
INSTALLED_APP = "installed-app"
OPENAPI = "openapi"
@classmethod
def value_of(cls, value: str) -> "WorkflowAppLogCreatedFrom":

View File

@ -1,6 +1,6 @@
import json
import uuid
from collections.abc import Generator # Added Generator
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
@ -75,7 +75,7 @@ class AnalyticdbVectorBySql:
)
@contextmanager
def _get_cursor(self) -> Generator[Any, None, None]: # Changed from Iterator[Any]
def _get_cursor(self) -> Iterator[Any]:
assert self.pool is not None, "Connection pool is not initialized"
conn = self.pool.getconn()
cur = conn.cursor()

View File

@ -6,7 +6,7 @@ requires-python = "~=3.12.0"
dependencies = [
# Legacy: mature and widely deployed
"bleach>=6.3.0",
"boto3>=1.42.91",
"boto3>=1.42.88",
"celery>=5.6.3",
"croniter>=6.2.2",
"flask-cors>=6.0.2",
@ -30,7 +30,7 @@ dependencies = [
"flask-migrate>=4.1.0,<5.0.0",
"flask-orjson>=2.0.0,<3.0.0",
"flask-restx>=1.3.2,<2.0.0",
"google-cloud-aiplatform>=1.148.1,<2.0.0",
"google-cloud-aiplatform>=1.147.0,<2.0.0",
"httpx[socks]>=0.28.1,<1.0.0",
"opentelemetry-distro>=0.62b0,<1.0.0",
"opentelemetry-instrumentation-celery>=0.62b0,<1.0.0",
@ -46,7 +46,7 @@ dependencies = [
"fastopenapi[flask]~=0.7.0",
"graphon~=0.2.2",
"httpx-sse~=0.4.0",
"json-repair~=0.59.4",
"json-repair~=0.59.2",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.
@ -114,10 +114,10 @@ override-dependencies = [
dev = [
"coverage>=7.13.4",
"dotenv-linter>=0.7.0",
"faker>=40.15.0",
"faker>=20.1.0",
"lxml-stubs>=0.5.1",
"basedpyright>=1.39.3",
"ruff>=0.15.11",
"basedpyright>=1.39.0",
"ruff>=0.15.10",
"pytest>=9.0.3",
"pytest-benchmark>=5.2.3",
"pytest-cov>=7.1.0",
@ -157,14 +157,14 @@ dev = [
"types-tensorflow>=2.18.0.20260408",
"types-tqdm>=4.67.3.20260408",
"types-ujson>=5.10.0",
"boto3-stubs>=1.42.92",
"boto3-stubs>=1.42.88",
"types-jmespath>=1.1.0.20260408",
"hypothesis>=6.152.1",
"hypothesis>=6.151.12",
"types_pyOpenSSL>=24.1.0",
"types_cffi>=2.0.0.20260408",
"types_setuptools>=82.0.0.20260408",
"pandas-stubs>=3.0.0",
"scipy-stubs>=1.17.1.4",
"scipy-stubs>=1.15.3.0",
"types-python-http-client>=3.3.7.20260408",
"import-linter>=2.3",
"types-redis>=4.6.0.20241004",
@ -174,7 +174,7 @@ dev = [
"pytest-timeout>=2.4.0",
"pytest-xdist>=3.8.0",
"pyrefly>=0.61.1",
"xinference-client>=2.5.0",
"xinference-client>=2.4.0",
]
############################################################
@ -183,13 +183,13 @@ dev = [
############################################################
storage = [
"azure-storage-blob>=12.28.0",
"bce-python-sdk>=0.9.70",
"bce-python-sdk>=0.9.69",
"cos-python-sdk-v5>=1.9.41",
"esdk-obs-python>=3.22.2",
"google-cloud-storage>=3.10.1",
"opendal>=0.46.0",
"oss2>=2.19.1",
"supabase>=2.28.3",
"supabase>=2.18.1",
"tos>=2.9.0",
]
@ -266,7 +266,7 @@ vdb-vastbase = ["dify-vdb-vastbase"]
vdb-vikingdb = ["dify-vdb-vikingdb"]
vdb-weaviate = ["dify-vdb-weaviate"]
# Optional client used by some tests / integrations (not a vector backend plugin)
vdb-xinference = ["xinference-client>=2.5.0"]
vdb-xinference = ["xinference-client>=2.4.0"]
trace-all = [
"dify-trace-aliyun",

View File

@ -0,0 +1,54 @@
"""DELETE oauth_access_tokens past retention. Revocation is UPDATE
(token_id stays for audits) so rows accumulate across re-logins, and
expired-but-never-presented rows have no hard-expire trigger — both get
pruned here. Spec: docs/specs/v1.0/server/tokens.md §Hard-expire.
"""
from __future__ import annotations
import logging
import time
from datetime import UTC, datetime, timedelta
import click
from sqlalchemy import delete, or_, select
import app
from configs import dify_config
from extensions.ext_database import db
from models.oauth import OAuthAccessToken
logger = logging.getLogger(__name__)
DELETE_BATCH_SIZE = 500
@app.celery.task(queue="retention")
def clean_oauth_access_tokens_task():
click.echo(click.style("Start clean oauth_access_tokens.", fg="green"))
retention_days = int(dify_config.OAUTH_ACCESS_TOKEN_RETENTION_DAYS)
cutoff = datetime.now(UTC) - timedelta(days=retention_days)
start_at = time.perf_counter()
candidates = or_(
OAuthAccessToken.revoked_at < cutoff,
# Zombies: expired but never re-presented, so middleware never flipped them.
(OAuthAccessToken.revoked_at.is_(None)) & (OAuthAccessToken.expires_at < cutoff),
)
total = 0
while True:
ids = db.session.scalars(select(OAuthAccessToken.id).where(candidates).limit(DELETE_BATCH_SIZE)).all()
if not ids:
break
db.session.execute(delete(OAuthAccessToken).where(OAuthAccessToken.id.in_(ids)))
db.session.commit()
total += len(ids)
end_at = time.perf_counter()
click.echo(
click.style(
f"Cleaned {total} oauth_access_tokens rows older than {retention_days}d in {end_at - start_at:.2f}s",
fg="green",
)
)

View File

@ -112,14 +112,6 @@ REFRESH_TOKEN_EXPIRY = timedelta(days=dify_config.REFRESH_TOKEN_EXPIRE_DAYS)
class AccountService:
# Phase-bound token metadata for the change-email flow. Tokens carry the
# current phase so that downstream endpoints can enforce proper progression
CHANGE_EMAIL_TOKEN_PHASE_KEY = "email_change_phase"
CHANGE_EMAIL_PHASE_OLD = "old_email"
CHANGE_EMAIL_PHASE_OLD_VERIFIED = "old_email_verified"
CHANGE_EMAIL_PHASE_NEW = "new_email"
CHANGE_EMAIL_PHASE_NEW_VERIFIED = "new_email_verified"
reset_password_rate_limiter = RateLimiter(prefix="reset_password_rate_limit", max_attempts=1, time_window=60 * 1)
email_register_rate_limiter = RateLimiter(prefix="email_register_rate_limit", max_attempts=1, time_window=60 * 1)
email_code_login_rate_limiter = RateLimiter(
@ -584,20 +576,13 @@ class AccountService:
raise ValueError("Email must be provided.")
if not phase:
raise ValueError("phase must be provided.")
if phase not in (cls.CHANGE_EMAIL_PHASE_OLD, cls.CHANGE_EMAIL_PHASE_NEW):
raise ValueError("phase must be one of old_email or new_email.")
if cls.change_email_rate_limiter.is_rate_limited(account_email):
from controllers.console.auth.error import EmailChangeRateLimitExceededError
raise EmailChangeRateLimitExceededError(int(cls.change_email_rate_limiter.time_window / 60))
code, token = cls.generate_change_email_token(
account_email,
account,
old_email=old_email,
additional_data={cls.CHANGE_EMAIL_TOKEN_PHASE_KEY: phase},
)
code, token = cls.generate_change_email_token(account_email, account, old_email=old_email)
send_change_mail_task.delay(
language=language,

View File

@ -37,7 +37,7 @@ class AppService:
Get app list with pagination
:param user_id: user id
:param tenant_id: tenant id
:param args: request args
:param args: request args. Optional keys: status (e.g. "normal") restricts App.status.
:return:
"""
filters = [App.tenant_id == tenant_id, App.is_universal == False]
@ -53,6 +53,8 @@ class AppService:
elif args["mode"] == "agent-chat":
filters.append(App.mode == AppMode.AGENT_CHAT)
if args.get("status"):
filters.append(App.status == args["status"])
if args.get("is_created_by_me", False):
filters.append(App.created_by == user_id)
if args.get("name"):

View File

@ -37,13 +37,10 @@ class AppTaskService:
Returns:
None
"""
# Legacy mechanism: Set stop flag in Redis
AppQueueManager.set_stop_flag(task_id, invoke_from, user_id)
# New mechanism: Send stop command via GraphEngine for workflow-based apps
# This ensures proper workflow status recording in the persistence layer
if app_mode in (AppMode.ADVANCED_CHAT, AppMode.WORKFLOW):
# Let the event handler process the Graphon abort event instead of
# stopping the queue listener immediately. Otherwise, events may be
# lost and the workflow run can remain stuck in the running state.
GraphEngineManager(redis_client).send_stop_command(task_id)
else:
# Legacy mechanism: Set stop flag in Redis
AppQueueManager.set_stop_flag(task_id, invoke_from, user_id)

View File

@ -0,0 +1,44 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from werkzeug.exceptions import ServiceUnavailable
from services.enterprise.enterprise_service import EnterpriseService
from services.errors.enterprise import EnterpriseAPIError
logger = logging.getLogger(__name__)
@dataclass(frozen=True, slots=True)
class PermittedAppsPage:
app_ids: list[str]
total: int
has_more: bool
def list_permitted_apps(
*,
page: int,
limit: int,
mode: str | None = None,
name: str | None = None,
) -> PermittedAppsPage:
try:
body = EnterpriseService.WebAppAuth.list_externally_accessible_apps(
page=page, limit=limit, mode=mode, name=name
)
except EnterpriseAPIError as exc:
logger.warning(
"permitted_apps EE call failed: status=%s message=%s",
getattr(exc, "status_code", None),
str(exc),
)
raise ServiceUnavailable("permitted_apps_unavailable") from exc
return PermittedAppsPage(
app_ids=[row["appId"] for row in body.get("data", [])],
total=int(body.get("total", 0)),
has_more=bool(body.get("hasMore", False)),
)

View File

@ -106,6 +106,15 @@ class EnterpriseService:
def get_workspace_info(cls, tenant_id: str):
return EnterpriseRequest.send_request("GET", f"/workspace/{tenant_id}/info")
@classmethod
def initiate_device_flow_sso(cls, signed_state: str) -> dict:
return EnterpriseRequest.send_request(
"POST",
"/device-flow/sso-initiate",
json={"signed_state": signed_state},
raise_for_status=True,
)
@classmethod
def join_default_workspace(cls, *, account_id: str) -> DefaultWorkspaceJoinResult:
"""
@ -234,6 +243,32 @@ class EnterpriseService:
params = {"appId": app_id}
EnterpriseRequest.send_request("DELETE", "/webapp/clean", params=params)
@classmethod
def list_externally_accessible_apps(
cls,
*,
page: int,
limit: int,
mode: str | None = None,
name: str | None = None,
) -> dict:
"""Call EE InnerListExternallyAccessibleApps; returns raw camelCase response.
Response shape: ``{"data": [{"appId", "tenantId", "mode", "name", "updatedAt"}],
"total": int, "hasMore": bool}``.
"""
body: dict[str, str | int] = {"page": page, "limit": limit}
if mode is not None:
body["mode"] = mode
if name is not None:
body["name"] = name
return EnterpriseRequest.send_request(
"POST",
"/webapp/externally-accessible-apps",
json=body,
timeout=5.0,
)
@classmethod
def get_cached_license_status(cls) -> LicenseStatus | None:
"""Get enterprise license status with Redis caching to reduce HTTP calls.

View File

@ -2,7 +2,7 @@ import base64
import hashlib
import os
import uuid
from collections.abc import Generator, Sequence # Changed Iterator to Generator
from collections.abc import Iterator, Sequence
from contextlib import contextmanager, suppress
from tempfile import NamedTemporaryFile
from typing import Literal
@ -324,7 +324,7 @@ class FileService:
def build_upload_files_zip_tempfile(
*,
upload_files: Sequence[UploadFile],
) -> Generator[str, None, None]: # Changed from Iterator[str]
) -> Iterator[str]:
"""
Build a ZIP from `UploadFile`s and yield a tempfile path.

View File

@ -1,10 +1,10 @@
import json
import logging
import time
from typing import Any, TypedDict, cast
from typing import Any, TypedDict
from core.app.app_config.entities import ModelConfig
from core.rag.datasource.retrieval_service import DefaultRetrievalModelDict, RetrievalService
from core.rag.datasource.retrieval_service import RetrievalService
from core.rag.index_processor.constant.query_type import QueryType
from core.rag.models.document import Document
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
@ -36,10 +36,6 @@ default_retrieval_model = {
}
class HitTestingRetrievalModelDict(DefaultRetrievalModelDict, total=False):
metadata_filtering_conditions: dict[str, Any]
class HitTestingService:
@classmethod
def retrieve(
@ -55,18 +51,17 @@ class HitTestingService:
start = time.perf_counter()
# get retrieval model , if the model is not setting , using default
resolved_retrieval_model = cast(
HitTestingRetrievalModelDict,
retrieval_model or dataset.retrieval_model or default_retrieval_model,
)
if not retrieval_model:
retrieval_model = dataset.retrieval_model or default_retrieval_model
assert isinstance(retrieval_model, dict)
document_ids_filter = None
metadata_filtering_conditions_raw = resolved_retrieval_model.get("metadata_filtering_conditions", {})
if metadata_filtering_conditions_raw and query:
metadata_filtering_conditions = retrieval_model.get("metadata_filtering_conditions", {})
if metadata_filtering_conditions and query:
dataset_retrieval = DatasetRetrieval()
from core.rag.entities import MetadataFilteringCondition
metadata_filtering_conditions = MetadataFilteringCondition.model_validate(metadata_filtering_conditions_raw)
metadata_filtering_conditions = MetadataFilteringCondition.model_validate(metadata_filtering_conditions)
metadata_filter_document_ids, metadata_condition = dataset_retrieval.get_metadata_filter_condition(
dataset_ids=[dataset.id],
@ -83,21 +78,19 @@ class HitTestingService:
if metadata_condition and not document_ids_filter:
return cls.compact_retrieve_response(query, [])
all_documents = RetrievalService.retrieve(
retrieval_method=RetrievalMethod(
resolved_retrieval_model.get("search_method", RetrievalMethod.SEMANTIC_SEARCH)
),
retrieval_method=RetrievalMethod(retrieval_model.get("search_method", RetrievalMethod.SEMANTIC_SEARCH)),
dataset_id=dataset.id,
query=query,
attachment_ids=attachment_ids,
top_k=resolved_retrieval_model.get("top_k", 4),
score_threshold=resolved_retrieval_model.get("score_threshold", 0.0)
if resolved_retrieval_model["score_threshold_enabled"]
top_k=retrieval_model.get("top_k", 4),
score_threshold=retrieval_model.get("score_threshold", 0.0)
if retrieval_model["score_threshold_enabled"]
else 0.0,
reranking_model=resolved_retrieval_model.get("reranking_model", None)
if resolved_retrieval_model["reranking_enable"]
reranking_model=retrieval_model.get("reranking_model", None)
if retrieval_model["reranking_enable"]
else None,
reranking_mode=resolved_retrieval_model.get("reranking_mode") or "reranking_model",
weights=resolved_retrieval_model.get("weights", None),
reranking_mode=retrieval_model.get("reranking_mode") or "reranking_model",
weights=retrieval_model.get("weights", None),
document_ids_filter=document_ids_filter,
)

View File

@ -0,0 +1,467 @@
"""Device-flow service layer: Redis state machine, OAuth token mint
(DB upsert + plaintext generation), and TTL policy. Specs:
docs/specs/v1.0/server/{device-flow.md, tokens.md}.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import secrets
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from sqlalchemy import func, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session, scoped_session
from libs.oauth_bearer import TOKEN_CACHE_KEY_FMT
from models.oauth import OAuthAccessToken
logger = logging.getLogger(__name__)
# ============================================================================
# Redis state machine — device_code + user_code ephemeral state
# ============================================================================
_DEVICE_CODE_KEY_PREFIX = "device_code:"
_USER_CODE_KEY_PREFIX = "user_code:"
DEVICE_CODE_KEY_FMT = _DEVICE_CODE_KEY_PREFIX + "{code}"
USER_CODE_KEY_FMT = _USER_CODE_KEY_PREFIX + "{code}"
# Atomic GET → status-check → DEL(both keys). Two concurrent pollers must
# not both observe APPROVED — only the winner gets the plaintext token,
# the loser sees nil and the caller maps that to expired_token.
_CONSUME_ON_POLL_LUA = """
local raw = redis.call('GET', KEYS[1])
if not raw then return nil end
local ok, decoded = pcall(cjson.decode, raw)
if not ok then return nil end
if decoded.status == 'pending' then return nil end
if decoded.user_code then
redis.call('DEL', ARGV[1] .. decoded.user_code)
end
redis.call('DEL', KEYS[1])
return raw
"""
DEVICE_FLOW_TTL_SECONDS = 15 * 60 # RFC 8628 expires_in
APPROVED_TTL_SECONDS_MIN = 60 # plaintext-token lifetime floor
USER_CODE_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXY3456789" # ambiguous chars dropped
USER_CODE_SEGMENT_LEN = 4
USER_CODE_MAX_CLAIM_ATTEMPTS = 5
DEFAULT_POLL_INTERVAL_SECONDS = 5 # RFC 8628 minimum
class DeviceFlowStatus(StrEnum):
PENDING = "pending"
APPROVED = "approved"
DENIED = "denied"
class SlowDownDecision(StrEnum):
OK = "ok"
SLOW_DOWN = "slow_down"
@dataclass
class DeviceFlowState:
"""``minted_token`` is plaintext between approve and the next poll;
DEL'd after the poll reads it.
"""
user_code: str
client_id: str
device_label: str
status: DeviceFlowStatus
subject_email: str | None = None
account_id: str | None = None
subject_issuer: str | None = None
minted_token: str | None = None
token_id: str | None = None
created_at: str = ""
created_ip: str = ""
last_poll_at: str = ""
poll_payload: dict | None = field(default=None)
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, raw: str) -> DeviceFlowState:
data = json.loads(raw)
if "status" in data:
data["status"] = DeviceFlowStatus(data["status"])
return cls(**data)
def _random_device_code() -> str:
return "dc_" + secrets.token_urlsafe(24)
def _random_user_code_segment() -> str:
return "".join(secrets.choice(USER_CODE_ALPHABET) for _ in range(USER_CODE_SEGMENT_LEN))
def _random_user_code() -> str:
return f"{_random_user_code_segment()}-{_random_user_code_segment()}"
class StateNotFoundError(Exception):
pass
class InvalidTransitionError(Exception):
pass
class UserCodeExhaustedError(Exception):
pass
class DeviceFlowRedis:
def __init__(self, redis_client) -> None:
self._redis = redis_client
self._consume_on_poll_script = redis_client.register_script(_CONSUME_ON_POLL_LUA)
def start(self, client_id: str, device_label: str, created_ip: str) -> tuple[str, str, int]:
device_code = _random_device_code()
user_code = self._claim_user_code(device_code)
state = DeviceFlowState(
user_code=user_code,
client_id=client_id,
device_label=device_label,
status=DeviceFlowStatus.PENDING,
created_at=datetime.now(UTC).isoformat(),
created_ip=created_ip,
)
self._redis.setex(
DEVICE_CODE_KEY_FMT.format(code=device_code),
DEVICE_FLOW_TTL_SECONDS,
state.to_json(),
)
return device_code, user_code, DEVICE_FLOW_TTL_SECONDS
def _claim_user_code(self, device_code: str) -> str:
for _ in range(USER_CODE_MAX_CLAIM_ATTEMPTS):
user_code = _random_user_code()
key = USER_CODE_KEY_FMT.format(code=user_code)
ok = self._redis.set(key, device_code, nx=True, ex=DEVICE_FLOW_TTL_SECONDS)
if ok:
return user_code
raise UserCodeExhaustedError("could not allocate a unique user_code in 5 attempts")
def load_by_user_code(self, user_code: str) -> tuple[str, DeviceFlowState] | None:
raw_dc = self._redis.get(USER_CODE_KEY_FMT.format(code=user_code))
if not raw_dc:
return None
device_code = raw_dc.decode() if isinstance(raw_dc, (bytes, bytearray)) else raw_dc
state = self._load_state(device_code)
if state is None:
return None
return device_code, state
def load_by_device_code(self, device_code: str) -> DeviceFlowState | None:
return self._load_state(device_code)
def _load_state(self, device_code: str) -> DeviceFlowState | None:
raw = self._redis.get(DEVICE_CODE_KEY_FMT.format(code=device_code))
if not raw:
return None
text_ = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
try:
return DeviceFlowState.from_json(text_)
except (ValueError, KeyError):
logger.exception("device_flow: corrupt state for %s", device_code)
return None
def approve(
self,
device_code: str,
subject_email: str,
account_id: str | None,
minted_token: str,
token_id: str,
subject_issuer: str | None = None,
poll_payload: dict | None = None,
) -> None:
state = self._load_state(device_code)
if state is None:
raise StateNotFoundError(device_code)
if state.status is not DeviceFlowStatus.PENDING:
raise InvalidTransitionError(f"cannot approve {state.status}")
state.status = DeviceFlowStatus.APPROVED
state.subject_email = subject_email
state.account_id = account_id
state.subject_issuer = subject_issuer
state.minted_token = minted_token
state.token_id = token_id
state.poll_payload = poll_payload
new_ttl = self._remaining_ttl(device_code, floor=APPROVED_TTL_SECONDS_MIN)
self._redis.setex(DEVICE_CODE_KEY_FMT.format(code=device_code), new_ttl, state.to_json())
def deny(self, device_code: str) -> None:
state = self._load_state(device_code)
if state is None:
raise StateNotFoundError(device_code)
if state.status is not DeviceFlowStatus.PENDING:
raise InvalidTransitionError(f"cannot deny {state.status}")
state.status = DeviceFlowStatus.DENIED
self._redis.setex(
DEVICE_CODE_KEY_FMT.format(code=device_code),
self._remaining_ttl(device_code, floor=1),
state.to_json(),
)
def consume_on_poll(self, device_code: str) -> DeviceFlowState | None:
"""Race-safe via Lua EVAL: GET + status-check + DEL execute in a
single Redis transaction so only one of N concurrent pollers
observes the APPROVED state. Losers get None, mapped to
expired_token by the caller.
"""
raw = self._consume_on_poll_script(
keys=[DEVICE_CODE_KEY_FMT.format(code=device_code)],
args=[_USER_CODE_KEY_PREFIX],
)
if raw is None:
return None
text_ = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
try:
return DeviceFlowState.from_json(text_)
except (ValueError, KeyError):
logger.exception("device_flow: corrupt state on consume %s", device_code)
return None
def record_poll(self, device_code: str, interval_seconds: int) -> SlowDownDecision:
now = time.time()
key = f"device_code:{device_code}:last_poll"
prev_raw = self._redis.get(key)
self._redis.setex(key, DEVICE_FLOW_TTL_SECONDS, str(now))
if prev_raw is None:
return SlowDownDecision.OK
prev_s = prev_raw.decode() if isinstance(prev_raw, (bytes, bytearray)) else prev_raw
try:
prev = float(prev_s)
except ValueError:
return SlowDownDecision.OK
if now - prev < interval_seconds:
return SlowDownDecision.SLOW_DOWN
return SlowDownDecision.OK
def _remaining_ttl(self, device_code: str, floor: int) -> int:
"""``max(remaining, floor)`` — guarantees the CLI has at least
``floor`` seconds to poll after a near-expiry approve.
"""
ttl = self._redis.ttl(DEVICE_CODE_KEY_FMT.format(code=device_code))
if ttl is None or ttl < 0:
return floor
return max(int(ttl), floor)
# ============================================================================
# Token mint — generate + upsert
# ============================================================================
OAUTH_BODY_BYTES = 32 # ~256 bits entropy
PREFIX_OAUTH_ACCOUNT = "dfoa_"
PREFIX_OAUTH_EXTERNAL_SSO = "dfoe_"
# Sentinel issuer for account-flow rows. Postgres' default partial unique
# index treats NULLs as distinct, which would let two live `dfoa_` rows
# share (email, client, device) and break rotate-in-place. Storing a
# non-empty literal makes the composite key collide as intended.
ACCOUNT_ISSUER_SENTINEL = "dify:account"
@dataclass(frozen=True, slots=True)
class MintResult:
"""Plaintext token surfaces to the caller once."""
token: str
token_id: uuid.UUID
expires_at: datetime
@dataclass(frozen=True, slots=True)
class UpsertOutcome:
token_id: uuid.UUID
rotated: bool
old_hash: str | None
def generate_token(prefix: str) -> str:
return prefix + secrets.token_urlsafe(OAUTH_BODY_BYTES)
def sha256_hex(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
def mint_oauth_token(
# Accept either Session or Flask-SQLAlchemy's request-scoped wrapper —
# the wrapper proxies the same execute/commit surface.
session: Session | scoped_session,
redis_client,
*,
subject_email: str,
subject_issuer: str | None,
account_id: str | None,
client_id: str,
device_label: str,
prefix: str,
ttl_days: int,
) -> MintResult:
"""Live row rotates in place via partial unique index
``uq_oauth_active_per_device``; hard-expired rows are excluded by the
index predicate so re-login INSERTs fresh. Pre-rotate Redis entry is
deleted so stale AuthContext drops immediately.
"""
if prefix == PREFIX_OAUTH_ACCOUNT:
# Account flow always writes the sentinel — caller may pass None
# (for clarity) or the sentinel itself; nothing else is valid.
if subject_issuer not in (None, ACCOUNT_ISSUER_SENTINEL):
raise ValueError(f"account-flow token must use ACCOUNT_ISSUER_SENTINEL, got {subject_issuer!r}")
subject_issuer = ACCOUNT_ISSUER_SENTINEL
elif prefix == PREFIX_OAUTH_EXTERNAL_SSO:
# Defense in depth: enterprise canonicalises + rejects empty,
# but a regression there must not yield a NULL composite key here.
if not subject_issuer or not subject_issuer.strip():
raise ValueError("external-SSO token requires non-empty subject_issuer")
else:
raise ValueError(f"unknown oauth prefix: {prefix!r}")
token = generate_token(prefix)
new_hash = sha256_hex(token)
expires_at = datetime.now(UTC) + timedelta(days=ttl_days)
outcome = _upsert(
session,
subject_email=subject_email,
subject_issuer=subject_issuer,
account_id=account_id,
client_id=client_id,
device_label=device_label,
prefix=prefix,
new_hash=new_hash,
expires_at=expires_at,
)
if outcome.rotated and outcome.old_hash:
redis_client.delete(TOKEN_CACHE_KEY_FMT.format(hash=outcome.old_hash))
return MintResult(token=token, token_id=outcome.token_id, expires_at=expires_at)
def _upsert(
session: Session | scoped_session,
*,
subject_email: str,
subject_issuer: str | None,
account_id: str | None,
client_id: str,
device_label: str,
prefix: str,
new_hash: str,
expires_at: datetime,
) -> UpsertOutcome:
# Snapshot prior live row's hash for Redis invalidation post-rotate.
# subject_issuer is always non-null here (account flow uses sentinel,
# external-SSO is validated upstream), so equality matches the index.
prior = session.execute(
select(OAuthAccessToken.id, OAuthAccessToken.token_hash)
.where(
OAuthAccessToken.subject_email == subject_email,
OAuthAccessToken.subject_issuer == subject_issuer,
OAuthAccessToken.client_id == client_id,
OAuthAccessToken.device_label == device_label,
OAuthAccessToken.revoked_at.is_(None),
)
.limit(1)
).first()
old_hash = prior.token_hash if prior else None
insert_stmt = pg_insert(OAuthAccessToken).values(
subject_email=subject_email,
subject_issuer=subject_issuer,
account_id=account_id,
client_id=client_id,
device_label=device_label,
prefix=prefix,
token_hash=new_hash,
expires_at=expires_at,
)
upsert_stmt = insert_stmt.on_conflict_do_update(
index_elements=["subject_email", "subject_issuer", "client_id", "device_label"],
index_where=OAuthAccessToken.revoked_at.is_(None),
set_={
"token_hash": insert_stmt.excluded.token_hash,
"prefix": insert_stmt.excluded.prefix,
"account_id": insert_stmt.excluded.account_id,
"expires_at": insert_stmt.excluded.expires_at,
"created_at": func.now(),
"last_used_at": None,
},
).returning(OAuthAccessToken.id)
row = session.execute(upsert_stmt).first()
session.commit()
if row is None:
raise RuntimeError("oauth_token upsert returned no row")
token_id = uuid.UUID(str(row.id))
return UpsertOutcome(
token_id=token_id,
rotated=prior is not None,
old_hash=old_hash,
)
# ============================================================================
# TTL policy — days new OAuth tokens live
# ============================================================================
DEFAULT_OAUTH_TTL_DAYS = 14
MIN_TTL_DAYS = 1
MAX_TTL_DAYS = 365
_TTL_ENV_VAR = "OAUTH_TTL_DAYS"
def oauth_ttl_days(tenant_id: str | None = None) -> int:
"""``OAUTH_TTL_DAYS`` env, else default. EE tenant-level lookup
is deferred; when it lands it wins over the env (Redis-cached 60s).
"""
_ = tenant_id
raw = os.environ.get(_TTL_ENV_VAR)
if raw is None:
return DEFAULT_OAUTH_TTL_DAYS
try:
value = int(raw)
except ValueError:
logger.warning(
"%s=%r is not an int; falling back to %d",
_TTL_ENV_VAR,
raw,
DEFAULT_OAUTH_TTL_DAYS,
)
return DEFAULT_OAUTH_TTL_DAYS
if value < MIN_TTL_DAYS:
logger.warning("%s=%d below min %d; clamping", _TTL_ENV_VAR, value, MIN_TTL_DAYS)
return MIN_TTL_DAYS
if value > MAX_TTL_DAYS:
logger.warning("%s=%d above max %d; clamping", _TTL_ENV_VAR, value, MAX_TTL_DAYS)
return MAX_TTL_DAYS
return value

View File

@ -23,7 +23,7 @@ class PluginAutoUpgradeService:
exclude_plugins: list[str],
include_plugins: list[str],
) -> bool:
with session_factory.create_session() as session, session.begin():
with session_factory.create_session() as session:
exist_strategy = session.scalar(
select(TenantPluginAutoUpgradeStrategy)
.where(TenantPluginAutoUpgradeStrategy.tenant_id == tenant_id)
@ -50,7 +50,7 @@ class PluginAutoUpgradeService:
@staticmethod
def exclude_plugin(tenant_id: str, plugin_id: str) -> bool:
with session_factory.create_session() as session, session.begin():
with session_factory.create_session() as session:
exist_strategy = session.scalar(
select(TenantPluginAutoUpgradeStrategy)
.where(TenantPluginAutoUpgradeStrategy.tenant_id == tenant_id)

View File

@ -62,19 +62,6 @@ def build_workflow_event_stream(
idle_timeout: float = 300,
ping_interval: float = 10.0,
) -> Generator[Mapping[str, Any] | str, None, None]:
"""Yield a stream of workflow events composed of a DB-derived snapshot followed by live tail.
The stream is assembled in two phases that are kept **structurally disjoint** so no
per-event deduplication is required:
1. Snapshot phase: events rebuilt from persistent state via ``_build_snapshot_events``
(``workflow_started``, optional ``message_replace``, ``node_started``/``node_finished``
for each persisted execution, and an optional terminal ``workflow_paused``). This
represents the history that already happened from the client's point of view.
2. Tail phase: events delivered by the broadcast subscription **from the moment of
subscription onward only**. Anything that was published before the subscription was
established is ignored here, since message id is not tracked in current implementation.
"""
topic = MessageGenerator.get_response_topic(app_mode, workflow_run.id)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
node_execution_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(session_maker)

View File

@ -0,0 +1,125 @@
"""Shared fixtures for /openapi/v1/* integration tests."""
from __future__ import annotations
import hashlib
import uuid
from collections.abc import Generator
from datetime import UTC, datetime, timedelta
import pytest
from flask import Flask
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models import Account, App, OAuthAccessToken, Tenant, TenantAccountJoin
from models.account import AccountStatus
def _sha256(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
@pytest.fixture(autouse=True)
def disable_enterprise(monkeypatch):
"""Default to CE behaviour for /openapi/v1 tests. Tests that exercise the
EE branch override this with their own monkeypatch in-test."""
from configs import dify_config
monkeypatch.setattr(dify_config, "ENTERPRISE_ENABLED", False)
@pytest.fixture
def workspace_account(flask_app: Flask) -> Generator[tuple[Account, Tenant, TenantAccountJoin], None, None]:
with flask_app.app_context():
tenant = Tenant(name="t1", status="normal")
account = Account(email="u@example.com", name="u")
db.session.add_all([tenant, account])
db.session.commit()
account.status = AccountStatus.ACTIVE
join = TenantAccountJoin(tenant_id=tenant.id, account_id=account.id, role="owner")
db.session.add(join)
db.session.commit()
yield account, tenant, join
db.session.delete(join)
db.session.delete(account)
db.session.delete(tenant)
db.session.commit()
@pytest.fixture
def app_in_workspace(flask_app: Flask, workspace_account) -> Generator[App, None, None]:
_, tenant, _ = workspace_account
with flask_app.app_context():
app = App(tenant_id=tenant.id, name="a", mode="chat", status="normal", enable_site=True, enable_api=True)
db.session.add(app)
db.session.commit()
yield app
db.session.delete(app)
db.session.commit()
@pytest.fixture
def mint_token(flask_app: Flask):
"""Factory fixture; tracks minted rows and deletes them on teardown so
the auth-related test runs don't accumulate `oauth_access_tokens` rows."""
minted: list[OAuthAccessToken] = []
def _mint(
token: str,
*,
account_id: str | None,
prefix: str,
subject_email: str,
subject_issuer: str | None,
) -> OAuthAccessToken:
with flask_app.app_context():
row = OAuthAccessToken(
token_hash=_sha256(token),
prefix=prefix,
account_id=account_id,
subject_email=subject_email,
subject_issuer=subject_issuer,
client_id="difyctl",
device_label="test-device",
expires_at=datetime.now(UTC) + timedelta(hours=1),
)
db.session.add(row)
db.session.commit()
minted.append(row)
return row
yield _mint
with flask_app.app_context():
for row in minted:
db.session.delete(db.session.merge(row))
db.session.commit()
@pytest.fixture
def account_token(workspace_account, mint_token) -> str:
account, _, _ = workspace_account
token = "dfoa_" + uuid.uuid4().hex
mint_token(
token,
account_id=account.id,
prefix="dfoa_",
subject_email=account.email,
subject_issuer="dify:account",
)
return token
@pytest.fixture(autouse=True)
def _flush_auth_redis(flask_app: Flask) -> Generator[None, None, None]:
def _flush():
with flask_app.app_context():
for k in redis_client.keys("auth:*"):
redis_client.delete(k)
for k in redis_client.keys("rl:*"):
redis_client.delete(k)
_flush()
yield
_flush()

View File

@ -0,0 +1,252 @@
"""Integration tests for POST /openapi/v1/apps/<id>/run."""
from __future__ import annotations
import uuid
from collections.abc import Generator
import pytest
from flask import Flask
from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from models import App
def test_run_chat_dispatches_to_chat_handler(flask_app, account_token, app_in_workspace, monkeypatch):
captured = {}
def _fake_generate(*, app_model, user, args, invoke_from, streaming):
captured["mode"] = app_model.mode
captured["args"] = args
captured["invoke_from"] = invoke_from
return {
"event": "message",
"task_id": "t",
"id": "m",
"message_id": "m",
"conversation_id": "c",
"mode": "chat",
"answer": "ok",
"created_at": 0,
}
monkeypatch.setattr(
"controllers.openapi.app_run.AppGenerateService.generate", staticmethod(_fake_generate)
)
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app_in_workspace.id}/run",
json={"inputs": {}, "query": "hi", "response_mode": "blocking", "user": "spoof@x.com"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
assert res.get_json()["mode"] == "chat"
assert captured["mode"] == "chat"
assert captured["invoke_from"] == InvokeFrom.OPENAPI
assert "user" not in captured["args"], "server must strip body.user; identity comes from bearer"
@pytest.fixture
def app_with_mode(flask_app: Flask, workspace_account):
"""Factory that creates an App row in the workspace_account tenant with
a specified mode. Tracks rows for teardown.
"""
_, tenant, _ = workspace_account
created: list[App] = []
def _make(mode: str) -> App:
with flask_app.app_context():
app = App(
tenant_id=tenant.id,
name=f"a-{mode}",
mode=mode,
status="normal",
enable_site=True,
enable_api=True,
)
db.session.add(app)
db.session.commit()
db.session.refresh(app)
db.session.expunge(app)
created.append(app)
return app
yield _make
with flask_app.app_context():
for app in created:
db.session.delete(db.session.merge(app))
db.session.commit()
def test_run_chat_without_query_returns_422(flask_app, account_token, app_in_workspace, monkeypatch):
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app_in_workspace.id}/run",
json={"inputs": {}, "response_mode": "blocking"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 422
assert b"query_required_for_chat" in res.data
def test_run_completion_dispatches_to_completion_handler(
flask_app, account_token, app_with_mode, monkeypatch
):
app = app_with_mode("completion")
captured: dict = {}
def _fake_generate(*, app_model, user, args, invoke_from, streaming):
captured["mode"] = app_model.mode
captured["args"] = args
return {
"event": "message",
"task_id": "t",
"id": "m",
"message_id": "m",
"mode": "completion",
"answer": "ok",
"created_at": 0,
}
monkeypatch.setattr(
"controllers.openapi.app_run.AppGenerateService.generate", staticmethod(_fake_generate)
)
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app.id}/run",
json={"inputs": {}, "response_mode": "blocking"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
assert res.get_json()["mode"] == "completion"
assert captured["mode"] == "completion"
def test_run_workflow_with_query_returns_422(flask_app, account_token, app_with_mode, monkeypatch):
app = app_with_mode("workflow")
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app.id}/run",
json={"inputs": {}, "query": "hi", "response_mode": "blocking"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 422
assert b"query_not_supported_for_workflow" in res.data
def test_run_workflow_no_query_dispatches_to_workflow_handler(
flask_app, account_token, app_with_mode, monkeypatch
):
app = app_with_mode("workflow")
def _fake_generate(*, app_model, user, args, invoke_from, streaming):
return {
"workflow_run_id": "wfr",
"task_id": "t",
"data": {"id": "wf-d", "workflow_id": "wf", "status": "succeeded"},
}
monkeypatch.setattr(
"controllers.openapi.app_run.AppGenerateService.generate", staticmethod(_fake_generate)
)
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app.id}/run",
json={"inputs": {}, "response_mode": "blocking"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.get_json()
assert body["mode"] == "workflow"
assert body["workflow_run_id"] == "wfr"
def test_run_unsupported_mode_returns_422(flask_app, account_token, app_with_mode, monkeypatch):
app = app_with_mode("channel")
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app.id}/run",
json={"inputs": {}, "response_mode": "blocking"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 422
assert b"mode_not_runnable" in res.data
def test_run_without_bearer_returns_401(flask_app, app_in_workspace):
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app_in_workspace.id}/run",
json={"inputs": {}, "query": "hi"},
)
assert res.status_code == 401
def test_run_with_insufficient_scope_returns_403(
flask_app, account_token, app_in_workspace, monkeypatch
):
"""Stub the authenticator to return an AuthContext with empty scopes."""
from libs import oauth_bearer
real_authenticate = oauth_bearer.BearerAuthenticator.authenticate
def _stub_authenticate(self, token: str):
ctx = real_authenticate(self, token)
from dataclasses import replace
return replace(ctx, scopes=frozenset())
monkeypatch.setattr(oauth_bearer.BearerAuthenticator, "authenticate", _stub_authenticate)
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app_in_workspace.id}/run",
json={"inputs": {}, "query": "hi"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 403
def test_run_with_unknown_app_returns_404(flask_app, account_token):
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{uuid.uuid4()}/run",
json={"inputs": {}, "query": "hi"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 404
def test_run_streaming_returns_event_stream(
flask_app, account_token, app_in_workspace, monkeypatch
):
def _stream() -> Generator[str, None, None]:
yield "event: message\ndata: {\"x\": 1}\n\n"
monkeypatch.setattr(
"controllers.openapi.app_run.AppGenerateService.generate",
staticmethod(lambda **kw: _stream()),
)
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app_in_workspace.id}/run",
json={"inputs": {}, "query": "hi", "response_mode": "streaming"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
assert res.headers["Content-Type"].startswith("text/event-stream")
assert b"event: message" in res.data
def test_run_without_inputs_returns_422(flask_app, account_token, app_in_workspace):
client = flask_app.test_client()
res = client.post(
f"/openapi/v1/apps/{app_in_workspace.id}/run",
json={"query": "hi"},
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 422

View File

@ -0,0 +1,210 @@
"""Integration tests for /openapi/v1/apps* read surface."""
from __future__ import annotations
from flask.testing import FlaskClient
from models import App
def test_apps_bare_id_route_404(test_client, app_in_workspace, account_token):
resp = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}",
headers={"Authorization": f"Bearer {account_token}"},
)
assert resp.status_code == 404
def test_apps_parameters_route_404(test_client, app_in_workspace, account_token):
resp = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/parameters",
headers={"Authorization": f"Bearer {account_token}"},
)
assert resp.status_code == 404
def test_apps_info_route_404(test_client, app_in_workspace, account_token):
resp = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/info",
headers={"Authorization": f"Bearer {account_token}"},
)
assert resp.status_code == 404
def test_apps_describe_returns_merged_shape(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["info"]["id"] == app_in_workspace.id
assert body["info"]["mode"] == "chat"
assert isinstance(body["parameters"], dict)
def test_apps_describe_full_includes_input_schema(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["info"] is not None
assert body["parameters"] is not None
assert body["input_schema"] is not None
assert body["input_schema"]["$schema"] == "https://json-schema.org/draft/2020-12/schema"
def test_apps_describe_fields_info_only(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe?fields=info",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["info"] is not None
assert body["parameters"] is None
assert body["input_schema"] is None
def test_apps_describe_fields_parameters_only(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe?fields=parameters",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["info"] is None
assert body["parameters"] is not None
assert body["input_schema"] is None
def test_apps_describe_fields_input_schema_only(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe?fields=input_schema",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["info"] is None
assert body["parameters"] is None
assert body["input_schema"] is not None
def test_apps_describe_fields_combined(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe?fields=info,input_schema",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["info"] is not None
assert body["parameters"] is None
assert body["input_schema"] is not None
def test_apps_describe_fields_unknown_returns_422(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe?fields=garbage",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 422
def test_apps_describe_fields_extra_param_returns_422(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
):
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/describe?fields=info&page=1",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 422
def test_apps_list_returns_pagination_envelope(
test_client: FlaskClient,
workspace_account,
app_in_workspace: App,
account_token: str,
):
_, tenant, _ = workspace_account
res = test_client.get(
f"/openapi/v1/apps?workspace_id={tenant.id}&page=1&limit=20",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
body = res.json
assert body["page"] == 1
assert body["limit"] == 20
assert body["total"] >= 1
assert any(d["id"] == app_in_workspace.id for d in body["data"])
def test_apps_list_requires_workspace_id(test_client: FlaskClient, account_token: str):
res = test_client.get("/openapi/v1/apps", headers={"Authorization": f"Bearer {account_token}"})
assert res.status_code == 400
def test_apps_list_tag_no_match_returns_empty_data_not_400(
test_client: FlaskClient,
workspace_account,
app_in_workspace: App,
account_token: str,
):
_, tenant, _ = workspace_account
res = test_client.get(
f"/openapi/v1/apps?workspace_id={tenant.id}&tag=nonexistent",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
assert res.json["data"] == []
def test_account_sessions_returns_envelope(
test_client: FlaskClient,
account_token: str,
):
res = test_client.get("/openapi/v1/account/sessions", headers={"Authorization": f"Bearer {account_token}"})
assert res.status_code == 200
body = res.json
# canonical envelope shape
assert isinstance(body["data"], list)
assert "page" in body
assert "limit" in body
assert "total" in body
assert "has_more" in body
# the bearer's own minted session must appear
assert any(s["prefix"] == "dfoa_" for s in body["data"])
# legacy "sessions" key must NOT appear
assert "sessions" not in body

View File

@ -0,0 +1,127 @@
"""Integration tests for the /openapi/v1 bearer auth surface.
Layer 0 (workspace membership), per-token rate limit, and read-scope (`apps:read`)
acceptance/rejection on app-scoped routes.
"""
from __future__ import annotations
from collections.abc import Generator
import pytest
from flask import Flask
from flask.testing import FlaskClient
from extensions.ext_database import db
from models import App, Tenant
def test_info_accepts_account_bearer_with_apps_read_scope(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
) -> None:
res = test_client.get(
f"/openapi/v1/apps/{app_in_workspace.id}/info",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
assert res.json["id"] == app_in_workspace.id
@pytest.fixture
def other_workspace_app(flask_app: Flask) -> Generator[App, None, None]:
"""A fresh app under a *different* tenant — caller has no membership row."""
with flask_app.app_context():
other_tenant = Tenant(name="other", status="normal")
db.session.add(other_tenant)
db.session.commit()
app = App(
tenant_id=other_tenant.id,
name="b",
mode="chat",
status="normal",
enable_site=True,
enable_api=True,
)
db.session.add(app)
db.session.commit()
yield app
db.session.delete(app)
db.session.delete(other_tenant)
db.session.commit()
def test_layer0_denies_account_bearer_without_membership(
test_client: FlaskClient,
account_token: str,
other_workspace_app: App,
) -> None:
"""Account A bearer hitting an app under tenant B — Layer 0 denies on CE."""
res = test_client.get(
f"/openapi/v1/apps/{other_workspace_app.id}/info",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 403
assert res.json.get("message") == "workspace_membership_revoked"
def test_layer0_skipped_when_enterprise_enabled(
test_client: FlaskClient,
account_token: str,
other_workspace_app: App,
monkeypatch,
) -> None:
"""On EE, Layer 0 short-circuits — gateway RBAC owns tenant isolation.
/info uses validate_bearer + require_workspace_member inline (no
AppAuthzCheck), so a cross-tenant bearer reaches the app lookup and
gets 200 — gateway is expected to enforce isolation upstream.
"""
from configs import dify_config
# Override the conftest autouse default for this test only.
monkeypatch.setattr(dify_config, "ENTERPRISE_ENABLED", True)
res = test_client.get(
f"/openapi/v1/apps/{other_workspace_app.id}/info",
headers={"Authorization": f"Bearer {account_token}"},
)
assert res.status_code == 200
assert res.json.get("message") != "workspace_membership_revoked"
def test_rate_limit_returns_429_after_60_requests(
test_client: FlaskClient,
account_token: str,
) -> None:
"""61st sequential GET to /account on the same bearer → 429 with Retry-After."""
headers = {"Authorization": f"Bearer {account_token}"}
for i in range(60):
r = test_client.get("/openapi/v1/account", headers=headers)
assert r.status_code == 200, f"unexpected fail at i={i}"
r = test_client.get("/openapi/v1/account", headers=headers)
assert r.status_code == 429
assert r.headers.get("Retry-After"), "Retry-After header missing"
assert int(r.headers["Retry-After"]) >= 1
body = r.json or {}
assert body.get("error") == "rate_limited"
assert isinstance(body.get("retry_after_ms"), int)
assert body["retry_after_ms"] >= 1000
def test_rate_limit_bucket_shared_across_surfaces(
test_client: FlaskClient,
app_in_workspace: App,
account_token: str,
) -> None:
"""30 calls to /account + 30 calls to /apps/<id>/info on same token → 61st 429s."""
headers = {"Authorization": f"Bearer {account_token}"}
for _ in range(30):
assert test_client.get("/openapi/v1/account", headers=headers).status_code == 200
for _ in range(30):
assert test_client.get(f"/openapi/v1/apps/{app_in_workspace.id}/info", headers=headers).status_code == 200
r = test_client.get("/openapi/v1/account", headers=headers)
assert r.status_code == 429

View File

@ -50,7 +50,6 @@ def make_message():
msg.user_feedback = MagicMock(rating=None)
msg.status = "normal"
msg.error = None
msg.workflow_run_id = "22222222-2222-2222-2222-222222222222"
return msg
@ -85,8 +84,6 @@ class TestMessageListApi:
assert result["limit"] == 20
assert result["has_more"] is False
assert len(result["data"]) == 2
assert result["data"][0]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
assert result["data"][1]["workflow_run_id"] == "22222222-2222-2222-2222-222222222222"
def test_get_not_chat_app(self):
api = module.MessageListApi()

View File

@ -68,10 +68,7 @@ class TestChangeEmailSend:
mock_features.return_value = SimpleNamespace(enable_change_email=True)
mock_account = _build_account("current@example.com", "acc1")
mock_current_account.return_value = (mock_account, None)
mock_get_change_data.return_value = {
"email": "current@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_OLD_VERIFIED,
}
mock_get_change_data.return_value = {"email": "current@example.com"}
mock_send_email.return_value = "token-abc"
with app.test_request_context(
@ -88,55 +85,12 @@ class TestChangeEmailSend:
email="new@example.com",
old_email="current@example.com",
language="en-US",
phase=AccountService.CHANGE_EMAIL_PHASE_NEW,
phase="new_email",
)
mock_extract_ip.assert_called_once()
mock_is_ip_limit.assert_called_once_with("127.0.0.1")
mock_csrf.assert_called_once()
@patch("controllers.console.wraps.db")
@patch("controllers.console.workspace.account.current_account_with_tenant")
@patch("controllers.console.workspace.account.AccountService.get_change_email_data")
@patch("controllers.console.workspace.account.AccountService.send_change_email_email")
@patch("controllers.console.workspace.account.AccountService.is_email_send_ip_limit", return_value=False)
@patch("controllers.console.workspace.account.extract_remote_ip", return_value="127.0.0.1")
@patch("libs.login.check_csrf_token", return_value=None)
@patch("controllers.console.wraps.FeatureService.get_system_features")
def test_should_reject_new_email_phase_when_token_phase_is_not_old_verified(
self,
mock_features,
mock_csrf,
mock_extract_ip,
mock_is_ip_limit,
mock_send_email,
mock_get_change_data,
mock_current_account,
mock_db,
app,
):
"""GHSA-4q3w-q5mc-45rq: a phase-1 token must not unlock the new-email send step."""
from controllers.console.auth.error import InvalidTokenError
_mock_wraps_db(mock_db)
mock_features.return_value = SimpleNamespace(enable_change_email=True)
mock_account = _build_account("current@example.com", "acc1")
mock_current_account.return_value = (mock_account, None)
mock_get_change_data.return_value = {
"email": "current@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_OLD,
}
with app.test_request_context(
"/account/change-email",
method="POST",
json={"email": "New@Example.com", "language": "en-US", "phase": "new_email", "token": "token-123"},
):
_set_logged_in_user(_build_account("tester@example.com", "tester"))
with pytest.raises(InvalidTokenError):
ChangeEmailSendEmailApi().post()
mock_send_email.assert_not_called()
class TestChangeEmailValidity:
@patch("controllers.console.wraps.db")
@ -168,12 +122,7 @@ class TestChangeEmailValidity:
mock_account = _build_account("user@example.com", "acc2")
mock_current_account.return_value = (mock_account, None)
mock_is_rate_limit.return_value = False
mock_get_data.return_value = {
"email": "user@example.com",
"code": "1234",
"old_email": "old@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_OLD,
}
mock_get_data.return_value = {"email": "user@example.com", "code": "1234", "old_email": "old@example.com"}
mock_generate_token.return_value = (None, "new-token")
with app.test_request_context(
@ -189,169 +138,11 @@ class TestChangeEmailValidity:
mock_add_rate.assert_not_called()
mock_revoke_token.assert_called_once_with("token-123")
mock_generate_token.assert_called_once_with(
"user@example.com",
code="1234",
old_email="old@example.com",
additional_data={
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_OLD_VERIFIED,
},
"user@example.com", code="1234", old_email="old@example.com", additional_data={}
)
mock_reset_rate.assert_called_once_with("user@example.com")
mock_csrf.assert_called_once()
@patch("controllers.console.wraps.db")
@patch("controllers.console.workspace.account.current_account_with_tenant")
@patch("controllers.console.workspace.account.AccountService.reset_change_email_error_rate_limit")
@patch("controllers.console.workspace.account.AccountService.generate_change_email_token")
@patch("controllers.console.workspace.account.AccountService.revoke_change_email_token")
@patch("controllers.console.workspace.account.AccountService.add_change_email_error_rate_limit")
@patch("controllers.console.workspace.account.AccountService.get_change_email_data")
@patch("controllers.console.workspace.account.AccountService.is_change_email_error_rate_limit")
@patch("libs.login.check_csrf_token", return_value=None)
@patch("controllers.console.wraps.FeatureService.get_system_features")
def test_should_upgrade_new_phase_token_to_new_verified(
self,
mock_features,
mock_csrf,
mock_is_rate_limit,
mock_get_data,
mock_add_rate,
mock_revoke_token,
mock_generate_token,
mock_reset_rate,
mock_current_account,
mock_db,
app,
):
_mock_wraps_db(mock_db)
mock_features.return_value = SimpleNamespace(enable_change_email=True)
mock_current_account.return_value = (_build_account("old@example.com", "acc"), None)
mock_is_rate_limit.return_value = False
mock_get_data.return_value = {
"email": "new@example.com",
"code": "1234",
"old_email": "old@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_NEW,
}
mock_generate_token.return_value = (None, "new-verified-token")
with app.test_request_context(
"/account/change-email/validity",
method="POST",
json={"email": "new@example.com", "code": "1234", "token": "token-123"},
):
_set_logged_in_user(_build_account("tester@example.com", "tester"))
response = ChangeEmailCheckApi().post()
assert response == {"is_valid": True, "email": "new@example.com", "token": "new-verified-token"}
mock_generate_token.assert_called_once_with(
"new@example.com",
code="1234",
old_email="old@example.com",
additional_data={
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_NEW_VERIFIED,
},
)
@patch("controllers.console.wraps.db")
@patch("controllers.console.workspace.account.current_account_with_tenant")
@patch("controllers.console.workspace.account.AccountService.reset_change_email_error_rate_limit")
@patch("controllers.console.workspace.account.AccountService.generate_change_email_token")
@patch("controllers.console.workspace.account.AccountService.revoke_change_email_token")
@patch("controllers.console.workspace.account.AccountService.add_change_email_error_rate_limit")
@patch("controllers.console.workspace.account.AccountService.get_change_email_data")
@patch("controllers.console.workspace.account.AccountService.is_change_email_error_rate_limit")
@patch("libs.login.check_csrf_token", return_value=None)
@patch("controllers.console.wraps.FeatureService.get_system_features")
def test_should_reject_validity_when_token_phase_is_unknown(
self,
mock_features,
mock_csrf,
mock_is_rate_limit,
mock_get_data,
mock_add_rate,
mock_revoke_token,
mock_generate_token,
mock_reset_rate,
mock_current_account,
mock_db,
app,
):
"""A token whose phase marker is a string but not a known transition must be rejected."""
from controllers.console.auth.error import InvalidTokenError
_mock_wraps_db(mock_db)
mock_features.return_value = SimpleNamespace(enable_change_email=True)
mock_current_account.return_value = (_build_account("old@example.com", "acc"), None)
mock_is_rate_limit.return_value = False
mock_get_data.return_value = {
"email": "user@example.com",
"code": "1234",
"old_email": "old@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: "something_else",
}
with app.test_request_context(
"/account/change-email/validity",
method="POST",
json={"email": "user@example.com", "code": "1234", "token": "token-123"},
):
_set_logged_in_user(_build_account("tester@example.com", "tester"))
with pytest.raises(InvalidTokenError):
ChangeEmailCheckApi().post()
mock_revoke_token.assert_not_called()
mock_generate_token.assert_not_called()
@patch("controllers.console.wraps.db")
@patch("controllers.console.workspace.account.current_account_with_tenant")
@patch("controllers.console.workspace.account.AccountService.reset_change_email_error_rate_limit")
@patch("controllers.console.workspace.account.AccountService.generate_change_email_token")
@patch("controllers.console.workspace.account.AccountService.revoke_change_email_token")
@patch("controllers.console.workspace.account.AccountService.add_change_email_error_rate_limit")
@patch("controllers.console.workspace.account.AccountService.get_change_email_data")
@patch("controllers.console.workspace.account.AccountService.is_change_email_error_rate_limit")
@patch("libs.login.check_csrf_token", return_value=None)
@patch("controllers.console.wraps.FeatureService.get_system_features")
def test_should_reject_validity_when_token_has_no_phase(
self,
mock_features,
mock_csrf,
mock_is_rate_limit,
mock_get_data,
mock_add_rate,
mock_revoke_token,
mock_generate_token,
mock_reset_rate,
mock_current_account,
mock_db,
app,
):
"""A token minted without a phase marker (e.g. a hand-crafted token) must not validate."""
from controllers.console.auth.error import InvalidTokenError
_mock_wraps_db(mock_db)
mock_features.return_value = SimpleNamespace(enable_change_email=True)
mock_current_account.return_value = (_build_account("old@example.com", "acc"), None)
mock_is_rate_limit.return_value = False
mock_get_data.return_value = {
"email": "user@example.com",
"code": "1234",
"old_email": "old@example.com",
}
with app.test_request_context(
"/account/change-email/validity",
method="POST",
json={"email": "user@example.com", "code": "1234", "token": "token-123"},
):
_set_logged_in_user(_build_account("tester@example.com", "tester"))
with pytest.raises(InvalidTokenError):
ChangeEmailCheckApi().post()
mock_revoke_token.assert_not_called()
mock_generate_token.assert_not_called()
class TestChangeEmailReset:
@patch("controllers.console.wraps.db")
@ -384,11 +175,7 @@ class TestChangeEmailReset:
mock_current_account.return_value = (current_user, None)
mock_is_freeze.return_value = False
mock_check_unique.return_value = True
mock_get_data.return_value = {
"email": "new@example.com",
"old_email": "OLD@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_NEW_VERIFIED,
}
mock_get_data.return_value = {"old_email": "OLD@example.com"}
mock_account_after_update = _build_account("new@example.com", "acc3-updated")
mock_update_account.return_value = mock_account_after_update
@ -407,155 +194,6 @@ class TestChangeEmailReset:
mock_send_notify.assert_called_once_with(email="new@example.com")
mock_csrf.assert_called_once()
@patch("controllers.console.wraps.db")
@patch("controllers.console.workspace.account.current_account_with_tenant")
@patch("controllers.console.workspace.account.AccountService.send_change_email_completed_notify_email")
@patch("controllers.console.workspace.account.AccountService.update_account_email")
@patch("controllers.console.workspace.account.AccountService.revoke_change_email_token")
@patch("controllers.console.workspace.account.AccountService.get_change_email_data")
@patch("controllers.console.workspace.account.AccountService.check_email_unique")
@patch("controllers.console.workspace.account.AccountService.is_account_in_freeze")
@patch("libs.login.check_csrf_token", return_value=None)
@patch("controllers.console.wraps.FeatureService.get_system_features")
def test_should_reject_reset_when_token_phase_is_not_new_verified(
self,
mock_features,
mock_csrf,
mock_is_freeze,
mock_check_unique,
mock_get_data,
mock_revoke_token,
mock_update_account,
mock_send_notify,
mock_current_account,
mock_db,
app,
):
"""GHSA-4q3w-q5mc-45rq PoC: phase-1 token must not be usable against /reset."""
from controllers.console.auth.error import InvalidTokenError
_mock_wraps_db(mock_db)
mock_features.return_value = SimpleNamespace(enable_change_email=True)
current_user = _build_account("old@example.com", "acc3")
mock_current_account.return_value = (current_user, None)
mock_is_freeze.return_value = False
mock_check_unique.return_value = True
# Simulate a token straight out of step #1 (phase=old_email) — exactly
# the replay used in the advisory PoC.
mock_get_data.return_value = {
"email": "old@example.com",
"old_email": "old@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_OLD,
}
with app.test_request_context(
"/account/change-email/reset",
method="POST",
json={"new_email": "attacker@example.com", "token": "token-from-step1"},
):
_set_logged_in_user(_build_account("tester@example.com", "tester"))
with pytest.raises(InvalidTokenError):
ChangeEmailResetApi().post()
mock_revoke_token.assert_not_called()
mock_update_account.assert_not_called()
mock_send_notify.assert_not_called()
@patch("controllers.console.wraps.db")
@patch("controllers.console.workspace.account.current_account_with_tenant")
@patch("controllers.console.workspace.account.AccountService.send_change_email_completed_notify_email")
@patch("controllers.console.workspace.account.AccountService.update_account_email")
@patch("controllers.console.workspace.account.AccountService.revoke_change_email_token")
@patch("controllers.console.workspace.account.AccountService.get_change_email_data")
@patch("controllers.console.workspace.account.AccountService.check_email_unique")
@patch("controllers.console.workspace.account.AccountService.is_account_in_freeze")
@patch("libs.login.check_csrf_token", return_value=None)
@patch("controllers.console.wraps.FeatureService.get_system_features")
def test_should_reject_reset_when_token_email_differs_from_payload_new_email(
self,
mock_features,
mock_csrf,
mock_is_freeze,
mock_check_unique,
mock_get_data,
mock_revoke_token,
mock_update_account,
mock_send_notify,
mock_current_account,
mock_db,
app,
):
"""A verified token for address A must not be replayed to change to address B."""
from controllers.console.auth.error import InvalidTokenError
_mock_wraps_db(mock_db)
mock_features.return_value = SimpleNamespace(enable_change_email=True)
current_user = _build_account("old@example.com", "acc3")
mock_current_account.return_value = (current_user, None)
mock_is_freeze.return_value = False
mock_check_unique.return_value = True
mock_get_data.return_value = {
"email": "verified@example.com",
"old_email": "old@example.com",
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_NEW_VERIFIED,
}
with app.test_request_context(
"/account/change-email/reset",
method="POST",
json={"new_email": "attacker@example.com", "token": "token-verified"},
):
_set_logged_in_user(_build_account("tester@example.com", "tester"))
with pytest.raises(InvalidTokenError):
ChangeEmailResetApi().post()
mock_revoke_token.assert_not_called()
mock_update_account.assert_not_called()
mock_send_notify.assert_not_called()
class TestAccountServiceSendChangeEmailEmail:
"""Service-level coverage for the phase-bound changes in `send_change_email_email`."""
def test_should_raise_value_error_for_invalid_phase(self):
with pytest.raises(ValueError, match="phase must be one of"):
AccountService.send_change_email_email(
email="user@example.com",
old_email="user@example.com",
phase="old_email_verified",
)
@patch("services.account_service.send_change_mail_task")
@patch("services.account_service.AccountService.change_email_rate_limiter")
@patch("services.account_service.AccountService.generate_change_email_token")
def test_should_stamp_phase_into_generated_token(
self,
mock_generate_token,
mock_rate_limiter,
mock_mail_task,
):
mock_rate_limiter.is_rate_limited.return_value = False
mock_generate_token.return_value = ("123456", "the-token")
returned = AccountService.send_change_email_email(
email="user@example.com",
old_email="user@example.com",
language="en-US",
phase=AccountService.CHANGE_EMAIL_PHASE_NEW,
)
assert returned == "the-token"
mock_generate_token.assert_called_once_with(
"user@example.com",
None,
old_email="user@example.com",
additional_data={
AccountService.CHANGE_EMAIL_TOKEN_PHASE_KEY: AccountService.CHANGE_EMAIL_PHASE_NEW,
},
)
mock_mail_task.delay.assert_called_once()
mock_rate_limiter.increment_rate_limit.assert_called_once_with("user@example.com")
class TestAccountDeletionFeedback:
@patch("controllers.console.wraps.db")

View File

@ -0,0 +1,54 @@
from unittest.mock import patch
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE, _resolve_app_authz_strategy
from controllers.openapi.auth.pipeline import Pipeline
from controllers.openapi.auth.steps import (
AppAuthzCheck,
AppResolver,
BearerCheck,
CallerMount,
ScopeCheck,
WorkspaceMembershipCheck,
)
from controllers.openapi.auth.strategies import (
AccountMounter,
AclStrategy,
EndUserMounter,
MembershipStrategy,
)
def test_pipeline_is_composed():
assert isinstance(OAUTH_BEARER_PIPELINE, Pipeline)
def test_pipeline_step_order():
"""BearerCheck → ScopeCheck → AppResolver → WorkspaceMembershipCheck →
AppAuthzCheck → CallerMount. Rate-limit is enforced inside
`BearerAuthenticator.authenticate`, not as a separate pipeline step."""
steps = OAUTH_BEARER_PIPELINE._steps
assert isinstance(steps[0], BearerCheck)
assert isinstance(steps[1], ScopeCheck)
assert isinstance(steps[2], AppResolver)
assert isinstance(steps[3], WorkspaceMembershipCheck)
assert isinstance(steps[4], AppAuthzCheck)
assert isinstance(steps[5], CallerMount)
def test_caller_mount_has_both_mounters():
cm = OAUTH_BEARER_PIPELINE._steps[5]
kinds = {type(m) for m in cm._mounters}
assert AccountMounter in kinds
assert EndUserMounter in kinds
@patch("controllers.openapi.auth.composition.FeatureService")
def test_strategy_resolver_picks_acl_when_enabled(fs):
fs.get_system_features.return_value.webapp_auth.enabled = True
assert isinstance(_resolve_app_authz_strategy(), AclStrategy)
@patch("controllers.openapi.auth.composition.FeatureService")
def test_strategy_resolver_picks_membership_when_disabled(fs):
fs.get_system_features.return_value.webapp_auth.enabled = False
assert isinstance(_resolve_app_authz_strategy(), MembershipStrategy)

View File

@ -0,0 +1,21 @@
from unittest.mock import MagicMock
from controllers.openapi.auth.context import Context
def test_context_starts_unpopulated():
ctx = Context(request=MagicMock(), required_scope="apps:run")
assert ctx.subject_type is None
assert ctx.subject_email is None
assert ctx.account_id is None
assert ctx.scopes == frozenset()
assert ctx.app is None
assert ctx.tenant is None
assert ctx.caller is None
assert ctx.caller_kind is None
def test_context_fields_are_mutable():
ctx = Context(request=MagicMock(), required_scope="apps:run")
ctx.scopes = frozenset({"full"})
assert "full" in ctx.scopes

View File

@ -0,0 +1,61 @@
from unittest.mock import MagicMock
import pytest
from flask import Flask
from controllers.openapi.auth.context import Context
from controllers.openapi.auth.pipeline import Pipeline
def test_run_invokes_each_step_in_order():
calls = []
class S:
def __init__(self, tag):
self.tag = tag
def __call__(self, ctx):
calls.append(self.tag)
Pipeline(S("a"), S("b"), S("c")).run(Context(request=MagicMock(), required_scope="x"))
assert calls == ["a", "b", "c"]
def test_run_short_circuits_on_raise():
calls = []
class Boom:
def __call__(self, ctx):
raise RuntimeError("boom")
class Tail:
def __call__(self, ctx):
calls.append("ran")
with pytest.raises(RuntimeError):
Pipeline(Boom(), Tail()).run(Context(request=MagicMock(), required_scope="x"))
assert calls == []
def test_guard_decorator_runs_pipeline_and_unpacks_handler_kwargs():
seen = {}
class FakeStep:
def __call__(self, ctx):
ctx.app = "APP"
ctx.caller = "CALLER"
ctx.caller_kind = "account"
pipeline = Pipeline(FakeStep())
@pipeline.guard(scope="apps:run")
def handler(app_model, caller, caller_kind):
seen["app_model"] = app_model
seen["caller"] = caller
seen["caller_kind"] = caller_kind
return "ok"
app = Flask(__name__)
with app.test_request_context("/x", method="POST"):
assert handler() == "ok"
assert seen == {"app_model": "APP", "caller": "CALLER", "caller_kind": "account"}

View File

@ -0,0 +1,64 @@
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
from controllers.openapi.auth.context import Context
from controllers.openapi.auth.steps import AppResolver
from models import TenantStatus
def _ctx(view_args):
req = MagicMock()
req.view_args = view_args
return Context(request=req, required_scope="apps:run")
def _app(*, status="normal", enable_api=True):
return SimpleNamespace(id="app1", tenant_id="t1", status=status, enable_api=enable_api)
def _tenant(*, status=TenantStatus.NORMAL):
return SimpleNamespace(id="t1", status=status)
def test_resolver_rejects_missing_path_param():
with pytest.raises(BadRequest):
AppResolver()(_ctx({}))
def test_resolver_rejects_none_view_args():
with pytest.raises(BadRequest):
AppResolver()(_ctx(None))
@patch("controllers.openapi.auth.steps.db")
def test_resolver_404_when_app_missing(db):
db.session.get.side_effect = [None]
with pytest.raises(NotFound):
AppResolver()(_ctx({"app_id": "x"}))
@patch("controllers.openapi.auth.steps.db")
def test_resolver_403_when_disabled(db):
db.session.get.side_effect = [_app(enable_api=False)]
with pytest.raises(Forbidden) as exc:
AppResolver()(_ctx({"app_id": "x"}))
assert "service_api_disabled" in str(exc.value.description)
@patch("controllers.openapi.auth.steps.db")
def test_resolver_403_when_tenant_archived(db):
db.session.get.side_effect = [_app(), _tenant(status=TenantStatus.ARCHIVE)]
with pytest.raises(Forbidden):
AppResolver()(_ctx({"app_id": "x"}))
@patch("controllers.openapi.auth.steps.db")
def test_resolver_populates_app_and_tenant(db):
db.session.get.side_effect = [_app(), _tenant()]
ctx = _ctx({"app_id": "x"})
AppResolver()(ctx)
assert ctx.app.id == "app1"
assert ctx.tenant.id == "t1"

View File

@ -0,0 +1,53 @@
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from werkzeug.exceptions import Forbidden
from controllers.openapi.auth.context import Context
from controllers.openapi.auth.steps import AppAuthzCheck
from controllers.openapi.auth.strategies import AclStrategy, MembershipStrategy
from libs.oauth_bearer import SubjectType
def _ctx(*, subject_type, account_id="acc1"):
c = Context(request=MagicMock(), required_scope="apps:run")
c.subject_type = subject_type
c.subject_email = "alice@example.com"
c.account_id = account_id
c.app = SimpleNamespace(id="app1")
c.tenant = SimpleNamespace(id="t1")
return c
@patch("controllers.openapi.auth.strategies.EnterpriseService")
def test_acl_strategy_calls_inner_api(ent):
ent.WebAppAuth.is_user_allowed_to_access_webapp.return_value = True
assert AclStrategy().authorize(_ctx(subject_type=SubjectType.ACCOUNT)) is True
ent.WebAppAuth.is_user_allowed_to_access_webapp.assert_called_once_with(
user_id="alice@example.com",
app_id="app1",
)
@patch("controllers.openapi.auth.strategies._has_tenant_membership")
def test_membership_strategy_uses_join_lookup(member):
member.return_value = True
assert MembershipStrategy().authorize(_ctx(subject_type=SubjectType.ACCOUNT)) is True
member.assert_called_once_with("acc1", "t1")
def test_membership_strategy_rejects_external_sso():
assert MembershipStrategy().authorize(_ctx(subject_type=SubjectType.EXTERNAL_SSO, account_id=None)) is False
def test_app_authz_check_raises_when_strategy_denies():
deny = SimpleNamespace(authorize=lambda c: False)
with pytest.raises(Forbidden) as exc:
AppAuthzCheck(lambda: deny)(_ctx(subject_type=SubjectType.ACCOUNT))
assert "subject_no_app_access" in str(exc.value.description)
def test_app_authz_check_passes_when_strategy_allows():
allow = SimpleNamespace(authorize=lambda c: True)
AppAuthzCheck(lambda: allow)(_ctx(subject_type=SubjectType.ACCOUNT))

Some files were not shown because too many files have changed in this diff Show More