Compare commits


55 Commits

Author SHA1 Message Date
31cf656b35 feat(cli): app run overhaul — always stream, --inputs JSON, HITL pause/resume, Ctrl+C stop
- Remove blocking mode; all apps stream SSE, --stream controls live vs collect output
- Replace --input k=v with --inputs '{json}' (single object, mutually exclusive with --inputs-file)
- Add --workflow-id, --file flags
- HITL: human_input_required → pause JSON to stdout + hint to stderr + exit 2
- Ctrl+C: captures task_id from SSE, calls stop-task, exits 1
- New difyctl run app resume subcommand: POST form, reconnect SSE, stream to completion
- resume: --action (auto-select), --with-history (include_state_snapshot), --stream flags
- Delete BlockingStrategy; simplify pickStrategy(isText, livePrint)
- Add HitlPauseError, SILENT_EVENTS handling in sse-collector and stream-handlers
- Update dify-mock: always SSE, hitl-pause/hitl-resume scenarios, stop/form/events handlers
- Update agent guide: --inputs JSON syntax, HITL pause/resume instructions
2026-05-15 02:53:19 -07:00
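
A minimal sketch of the resulting run/pause/resume loop, assuming only the flags and exit codes listed above; the app name, input key, pause-JSON field name, and the argument the resume subcommand takes are hypothetical:

```bash
# Sketch only: flags and exit codes come from the commit message above;
# the app name, input key, pause-JSON field, and resume argument are assumed.
difyctl run app my-app --inputs '{"query": "hello"}' > run-output.json
status=$?

if [ "$status" -eq 2 ]; then
  # HITL pause: pause JSON went to stdout, a resume hint to stderr
  task_id=$(jq -r '.task_id' run-output.json)   # field name assumed
  difyctl run app resume "$task_id" --action approve --with-history --stream
elif [ "$status" -eq 1 ]; then
  echo "run stopped (Ctrl+C triggered stop-task) or failed" >&2
fi
```
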
8be6665d22 feat(api,cli): openapi HITL endpoints — always-stream, human_input_form, workflow_events, stop-task
- Remove response_mode from AppRunRequest; openapi /run always streams
- Add POST /apps/<id>/tasks/<task_id>/stop (SIGINT hook target)
- Add GET/POST /apps/<id>/form/human_input/<token> (HITL form fetch/submit)
- Add GET /apps/<id>/tasks/<task_id>/events (SSE reconnect after resume)
- Add HumanInputSurface.OPENAPI; map to STANDALONE_WEB_APP recipient type
- Regenerate cli/src/types/data-contracts.ts via pnpm sync-models
2026-05-15 02:50:54 -07:00
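
Hedged curl sketches of the four endpoint shapes; only the paths come from the commit message, and placing them under the /openapi/v1 prefix, plus the host, ids, token, and form values, are assumptions:

```bash
BASE="http://localhost:5001/openapi/v1"   # host and prefix assumed
AUTH="Authorization: Bearer $TOKEN"

# Stop a running task (the SIGINT hook target)
curl -X POST -H "$AUTH" "$BASE/apps/$APP_ID/tasks/$TASK_ID/stop"

# Fetch, then submit, a human-input form (submit is form-encoded per the CLI commit)
curl -H "$AUTH" "$BASE/apps/$APP_ID/form/human_input/$FORM_TOKEN"
curl -X POST -H "$AUTH" "$BASE/apps/$APP_ID/form/human_input/$FORM_TOKEN" \
  --data-urlencode 'inputs={"approved": true}' \
  --data-urlencode 'action=approve'   # field names from HumanInputFormSubmitPayload

# Reconnect to the SSE stream after resume
curl -N -H "$AUTH" "$BASE/apps/$APP_ID/tasks/$TASK_ID/events"
```
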
c2b91d849d ci(cli): replace Makefile with pnpm scripts, add Dockerfile.dev and CI workflows
- Replace Makefile with package.json scripts (ci, clean, docker:build-dev,
  sync-models, version:info); update cli-tests.yml to use pnpm ci
- Add cli/Dockerfile.dev: multi-stage local build from monorepo source,
  proper cache layer order, non-root user, corepack-driven pnpm version
- Add .dockerignore at repo root to exclude node_modules/dist/.git from
  build context
- Add cli-docker-build.yml: PR + merge_group validation of Dockerfile.dev,
  amd64+arm64 via Depot for org; amd64-only for forks
- cli-release.yml: pin action SHAs, depot-ubuntu-24.04 runner, move
  verify-release step before build for fast-fail
- main-ci.yml: add packages/tsconfig/** and cli-docker-build.yml to cli
  path filter
- Drop cli/docs/specs/** and cli/docs/*.md (superseded)
2026-05-14 22:10:42 -07:00
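
The script names are listed in the commit message; invoked locally from cli/ they would look like this:

```bash
cd cli
pnpm ci                # typecheck, lint, coverage, build (same entry CI uses)
pnpm docker:build-dev  # build Dockerfile.dev locally
pnpm sync-models       # regenerate src/types/data-contracts.ts from the swagger spec
pnpm version:info
pnpm clean
```
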
e0f4e98a2f chore(cli): pre-merge cleanup — docker images, comments, tsconfig lib
- docker-compose.yaml: revert api/web from build: back to image tags
  (1.14.1); fix api_websocket/worker/worker_beat downgraded to 1.14.0
- Remove verbose internal design comments from openapi controllers
- web/next.config.ts: trim anti-framing comment to one line
- cli/tsconfig.json: drop lib:ES2015 override (broke Error.cause typing)
- eslint.config.mjs: ignore cli/context/** and cli/docs/** (local caches)
- pnpm-lock.yaml: regenerate after fresh install
2026-05-14 20:44:51 -07:00
9d554495cf feat(cli): add OPENAPI_ENABLED env switch, default false
Gates the entire /openapi/v1/* blueprint on a single env var.
When false (default), the blueprint is never registered so all
CLI requests return 404 before any auth or logic runs.

Set OPENAPI_ENABLED=true to activate the endpoint group.
2026-05-14 20:16:14 -07:00
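
A quick way to observe the gate, assuming a local api on port 5001 and using the device-code endpoint this branch adds elsewhere:

```bash
# With OPENAPI_ENABLED unset or false the blueprint is never registered,
# so this returns 404; with OPENAPI_ENABLED=true the endpoint group answers.
curl -s -o /dev/null -w '%{http_code}\n' \
  -X POST http://localhost:5001/openapi/v1/oauth/device/code \
  -d 'client_id=difyctl'
```
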
c2868075fa Merge remote-tracking branch 'origin/main' into feat/cli
# Conflicts:
#	docker/docker-compose.yaml
#	pnpm-lock.yaml
#	pnpm-workspace.yaml
2026-05-14 20:11:59 -07:00
7654f14241 fix: replace deprecated testcontainers log waits (#36125) 2026-05-15 01:30:59 +00:00
yyh 194b54bae4 fix: allow tag rename without type payload (#36182)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-15 01:29:17 +00:00
0e16d36edb fix(commands): purge tenant tool credentials on reset-encrypt-key-pair (#35396) (#35843)
Co-authored-by: xr843 <xianren843@protonmail.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
2026-05-14 16:25:54 +00:00
432a6412a3 fix(security): tenant-scope FilePreviewApi text-extract endpoint (GHSA-2qwc-c2cc-2xwv) (#35797)
Signed-off-by: xr843 <137012659+xr843@users.noreply.github.com>
Co-authored-by: Ido Shani <ido@zafran.io>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
2026-05-14 16:13:04 +00:00
55d05fe52d fix(security): enforce tenant scoping on app trace-config endpoints (GHSA-48xc-wmw8-3jr3) (#35793)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Ido Shani <ido@zafran.io>
Co-authored-by: -LAN- <laipz8200@outlook.com>
2026-05-14 15:59:31 +00:00
0d500e6965 fix(api): allow LLM nodes to access retrieved knowledge files (#36175) 2026-05-14 13:09:25 +00:00
5798610f27 refactor(api): migrate console.app.workflow_comment to BaseModel (#36180) 2026-05-14 12:13:47 +00:00
a35b28dbef refactor: cleanup duplicate code (#36173) 2026-05-14 10:34:31 +00:00
1a4288c811 fix: action btn is hidden if there are many packages to install (#36176) 2026-05-14 10:21:32 +00:00
9dc32f2318 chore: increase default graph engine min workers (#35650)
Signed-off-by: kenwoodjw <blackxin55+@gmail.com>
2026-05-14 09:27:45 +00:00
7210f856c9 fix: pipeline template render (#36168)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-14 09:11:18 +00:00
ebcc1200a3 feat(MessageLogModal): refactor modal structure and improve tab handling (#36169)
Co-authored-by: CodingOnStar <hanxujiang@dify.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 08:50:21 +00:00
e660d7af38 fix(api): gracefully handle credential fetch failures in rag pipeline (#36165)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 08:27:19 +00:00
d9ccfcbc6e fix: fix delete logs failed (#36151)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 08:02:31 +00:00
a9bcec013f feat: allow disabling run time cred check (#36031)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 07:12:10 +00:00
aeb7687e2c fix: add null check in get_recommend_app_detail before accessing result['id'] (#36153) 2026-05-14 06:42:22 +00:00
9355d36718 chore(deps): bump urllib3 from 2.6.3 to 2.7.0 in /dify-agent (#36160)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-14 06:41:22 +00:00
a03ee828a3 fix: get recommend_app categories should not re-order it (#36161) 2026-05-14 06:36:28 +00:00
7066372892 feat(workflow): enhance workflow run callbacks with additional data tracking (#36149)
Co-authored-by: CodingOnStar <hanxujiang@dify.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 06:20:12 +00:00
55f95dbc36 feat(agent): init agent server (#36087)
Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 06:04:44 +00:00
8b40de3c4e chore: enhance notify link ui (#36155) 2026-05-14 06:03:44 +00:00
af4b9bfa8f chore: Remove pyright in favor of pyrefly (#36154) 2026-05-14 05:49:08 +00:00
b9e3130388 chore: drop unnecessary | None on LLMError and Mail.send (#36147)
Co-authored-by: Brian Wang <20699847+BrianWang1990@users.noreply.github.com>
2026-05-14 03:22:00 +00:00
12d33652b6 fix(errors): clean unnecessary | None in error classes (#36135) 2026-05-14 03:21:41 +00:00
fe8cf2aff4 fix: fix pydantic union type error (#36134) 2026-05-14 02:54:23 +00:00
d1d190374d fix: credit pool access outside flask context (#36143) 2026-05-14 02:45:53 +00:00
e1be4e6aa8 chore(deps): bump langsmith from 0.7.31 to 0.8.0 in /api (#36142)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 02:36:02 +00:00
301a470e7a fix: isolate Langfuse v3 SDK TracerProvider to prevent cross-task interference (#36136)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-14 01:46:23 +00:00
91251ad5a5 chore(deps): bump authlib from 1.6.11 to 1.6.12 in /api (#36121)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-13 14:56:20 +00:00
3f6644a615 chore(i18n): sync translations with en-US (#36115)
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
2026-05-13 09:20:47 +00:00
yyh 5edc682c4a fix(web): refine account avatar interactions (#36111)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-13 08:36:04 +00:00
13c00ecfc4 chore(deps): bump ujson from 5.12.0 to 5.12.1 in /api (#36112)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-13 08:34:32 +00:00
9d545144ce chore: remove obsolete admin console routes (#35637)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-13 08:08:50 +00:00
2afa39cdcb fix: knowledge hit-testing render failed. (#36106)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 07:31:38 +00:00
bb1c883be4 ci: Update pyrefly dependency version to 1.0.0 (#36100)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-13 07:13:59 +00:00
yyh 03861bcee3 refactor: stabilize selector preview cards (#36105)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-13 06:16:26 +00:00
c34fc429ae fix: use Trans component for delete app confirmation text on editing page (#36092)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2026-05-13 06:08:59 +00:00
1a83dfaf1f refactor: use BaseModel in openapi group. Generate ts code from swagger (#36076)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 12:56:42 +08:00
d110112863 chore: update deps (#36091) 2026-05-13 03:51:21 +00:00
934a20e745 fix: restore tracing after HITL workflow resume (#36064)
Co-authored-by: -LAN- <laipz8200@outlook.com>
2026-05-13 03:23:32 +00:00
7e56a244a8 fix: fix plugin miss version and checksum (#36083)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-13 01:39:27 +00:00
6facd9360c chore: some match case (#36080)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-12 14:45:58 +00:00
a18d7f51eb fix: fixed relative (#36078)
Co-authored-by: CodingOnStar <hanxujiang@dify.com>
2026-05-12 11:05:57 +00:00
680ef077ae chore: admin also has the permission of changing role (#36069)
Co-authored-by: Yansong Zhang <916125788@qq.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-05-12 10:15:05 +00:00
c26be9d3f4 fix: redirect unauthorized dataset access to /datasets for knowledge editors (#36073)
Co-authored-by: yyh <92089059+lyzno1@users.noreply.github.com>
2026-05-12 09:38:49 +00:00
83d14e0540 ci(cli): use depot-ubuntu-24.04 for cli-tests jobs
Aligns cli-tests.yml and the cli-tests-skip/finalizer jobs in main-ci.yml
with the rest of the pipeline; all other jobs already use depot-ubuntu-24.04.
2026-05-11 19:46:11 -07:00
1f7da9c191 Merge branch 'main' into feat/cli
Conflicts resolved:
- api/services/app_service.py: extend AppListParams with status + openapi_visible fields so the openapi caller's per-page visibility gate survives the dict->BaseModel refactor; openapi controller now constructs AppListParams.
- pnpm-workspace.yaml: union of CLI-only entries (@napi-rs/keyring, @oclif/*) with main's bumped versions (@next/*, @orpc/*, eslint-plugin-sonarjs, eslint-plugin-storybook); kept eventsource-parser.
- pnpm-lock.yaml: regenerated.
- web/app/signin/utils/post-login-redirect.ts: union impl — keep main's resolvePostLoginRedirect(searchParams) + setOAuthPendingRedirect; add hardened sessionStorage-based setPostLoginRedirect for device flow with same-origin + path whitelist; device redirect takes precedence over oauth pending.
2026-05-11 19:29:37 -07:00
b21d0ae32d fix(cli): call this.parse in arg-less commands to silence oclif UnparsedCommand warning
oclif v4 warns "did not parse its arguments" for any command class whose
run() never calls this.parse(ClassName), independent of whether the
command declares args/flags. Add the call in the five arg-less commands.
2026-05-11 19:01:14 -07:00
6779366dca feat(api,web,cli): difyctl v1.0 — OAuth device flow, /openapi/v1 auth pipeline, CLI client 2026-05-11 18:40:39 -07:00
666 changed files with 49630 additions and 4586 deletions

0
.codex Normal file

15
.dockerignore Normal file

@@ -0,0 +1,15 @@
**/node_modules
**/.pnpm-store
**/dist
**/.next
**/.turbo
**/.cache
**/__pycache__
**/*.pyc
**/.mypy_cache
**/.ruff_cache
.git
.github
*.md
!web/README.md
!api/README.md

4
.github/CODEOWNERS vendored

@@ -18,6 +18,10 @@
# Docs
/docs/ @crazywoola
# CLI
/cli/ @langgenius/maintainers
/.github/workflows/cli-tests.yml @langgenius/maintainers
# Backend (default owner, more specific rules below will override)
/api/ @QuantumGhost

63
.github/workflows/cli-docker-build.yml vendored Normal file

@@ -0,0 +1,63 @@
name: CLI Docker Build (dev)
on:
pull_request:
branches:
- "main"
paths:
- "cli/**"
- "packages/tsconfig/**"
- "pnpm-lock.yaml"
- "pnpm-workspace.yaml"
merge_group:
branches:
- "main"
types: [checks_requested]
concurrency:
group: cli-docker-build-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
build:
name: Build CLI dev image
if: github.event_name == 'merge_group' || github.event.pull_request.head.repo.full_name == github.repository
runs-on: depot-ubuntu-24.04-4
permissions:
contents: read
id-token: write
steps:
- name: Set up Depot CLI
uses: depot/setup-action@15c09a5f77a0840ad4bce955686522a257853461 # v1.7.1
- name: Build CLI Dockerfile.dev
uses: depot/build-push-action@5f3b3c2e5a00f0093de47f657aeaefcedff27d18 # v1.17.0
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
push: false
context: "{{defaultContext}}"
file: "cli/Dockerfile.dev"
platforms: linux/amd64,linux/arm64
build-fork:
name: Build CLI dev image (fork)
if: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository
runs-on: ubuntu-24.04
permissions:
contents: read
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # v4.0.0
- name: Build CLI Dockerfile.dev
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # v7.1.0
with:
push: false
context: "."
file: "cli/Dockerfile.dev"
platforms: linux/amd64

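The local equivalent of the job above is a plain build from the repo root, so the monorepo source and the root .dockerignore apply; the image tag is arbitrary:

```bash
docker build -f cli/Dockerfile.dev -t difyctl:dev .
```
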
131
.github/workflows/cli-release.yml vendored Normal file

@@ -0,0 +1,131 @@
name: CLI Release
on:
release:
types: [published]
workflow_dispatch:
inputs:
dify_release_tag:
description: "dify release tag to attach cli artifacts to (e.g. 1.14.0). Bare semver — dify tags are NOT v-prefixed."
type: string
required: true
concurrency:
group: cli-release-${{ github.event.release.tag_name || inputs.dify_release_tag }}
cancel-in-progress: true
jobs:
release:
runs-on: depot-ubuntu-24.04
if: >-
github.repository == 'langgenius/dify' &&
(github.event_name == 'workflow_dispatch' ||
(vars.CLI_AUTO_RELEASE == 'true' && !github.event.release.prerelease))
env:
DIFY_TAG: ${{ github.event.release.tag_name || inputs.dify_release_tag }}
permissions:
contents: write
id-token: write
defaults:
run:
shell: bash
working-directory: ./cli
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
fetch-depth: 0
- name: Setup web environment
uses: ./.github/actions/setup-web
- name: Setup Node registry auth
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
with:
node-version-file: .nvmrc
registry-url: 'https://registry.npmjs.org'
- name: Read cli/package.json
id: manifest
run: |
version=$(node -p "require('./package.json').version")
channel=$(node -p "require('./package.json').difyctl.channel")
minDify=$(node -p "require('./package.json').difyctl.compat.minDify")
maxDify=$(node -p "require('./package.json').difyctl.compat.maxDify")
{
echo "version=$version"
echo "channel=$channel"
echo "minDify=$minDify"
echo "maxDify=$maxDify"
} >> "$GITHUB_OUTPUT"
- name: Validate manifest
run: scripts/release-validate-manifest.sh
- name: Bump guard (auto-path only)
if: github.event_name == 'release'
run: scripts/release-bump-guard.sh
env:
NEW_VERSION: ${{ steps.manifest.outputs.version }}
NEW_MIN_DIFY: ${{ steps.manifest.outputs.minDify }}
NEW_MAX_DIFY: ${{ steps.manifest.outputs.maxDify }}
- name: Verify target dify release exists
run: gh release view "$DIFY_TAG" --repo langgenius/dify > /dev/null
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Build cli
run: |
DIFYCTL_VERSION="${{ steps.manifest.outputs.version }}" \
DIFYCTL_CHANNEL="${{ steps.manifest.outputs.channel }}" \
DIFYCTL_MIN_DIFY="${{ steps.manifest.outputs.minDify }}" \
DIFYCTL_MAX_DIFY="${{ steps.manifest.outputs.maxDify }}" \
DIFYCTL_COMMIT="$(git rev-parse HEAD)" \
DIFYCTL_BUILD_DATE="$(git log -1 --format=%cI HEAD)" \
pnpm build
- name: Pack tarballs
run: pnpm pack:tarballs
- name: Publish to npm (idempotent)
run: scripts/release-npm-publish.sh
env:
CHANNEL: ${{ steps.manifest.outputs.channel }}
NEW_VERSION: ${{ steps.manifest.outputs.version }}
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
- name: Generate sha256 checksum file
run: scripts/release-write-checksums.sh
env:
CLI_VERSION: ${{ steps.manifest.outputs.version }}
- name: Install cosign
uses: sigstore/cosign-installer@3454372f43399081ed03b604cb2d021dabca52bb # v3.8.2
- name: Keyless-sign tarballs + checksum file (Sigstore)
run: scripts/release-cosign-sign.sh
env:
CLI_VERSION: ${{ steps.manifest.outputs.version }}
COSIGN_EXPERIMENTAL: '1'
- name: Snapshot tarballs + checksum + signatures as workflow artifact
if: always()
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: difyctl-${{ steps.manifest.outputs.version }}-${{ env.DIFY_TAG }}
path: |
cli/dist/difyctl-v*.tar.xz
cli/dist/difyctl-v*-checksums.txt
cli/dist/difyctl-v*.sig
cli/dist/difyctl-v*.pem
retention-days: 90
if-no-files-found: error
- name: Upload tarballs + checksum + signatures to dify GH release (idempotent)
run: scripts/release-upload-tarballs.sh
env:
CLI_VERSION: ${{ steps.manifest.outputs.version }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

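A consumer-side verification sketch: the file names follow the difyctl-v*.tar.xz / *-checksums.txt / *.sig / *.pem patterns uploaded above, while the concrete version, the .sig/.pem pairing with the checksum file, and the certificate identity are assumptions:

```bash
# Verify tarball digests against the checksum file
sha256sum -c difyctl-v1.0.0-checksums.txt

# Keyless verification of the checksum file's Sigstore signature
cosign verify-blob \
  --signature difyctl-v1.0.0-checksums.txt.sig \
  --certificate difyctl-v1.0.0-checksums.txt.pem \
  --certificate-identity-regexp 'github\.com/langgenius/dify' \
  --certificate-oidc-issuer https://token.actions.githubusercontent.com \
  difyctl-v1.0.0-checksums.txt
```
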
57
.github/workflows/cli-smoke.yml vendored Normal file

@@ -0,0 +1,57 @@
name: CLI Smoke (live dify)
on:
workflow_dispatch:
inputs:
dify_version:
description: "Dify image tag to test against (e.g. 1.7.0)"
type: string
required: true
cli_ref:
description: "Git ref to build the cli from (default: current branch)"
type: string
required: false
jobs:
smoke:
runs-on: ubuntu-latest
timeout-minutes: 30
defaults:
run:
shell: bash
steps:
- name: Checkout cli ref
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
ref: ${{ inputs.cli_ref || github.ref }}
persist-credentials: false
- name: Setup web environment
uses: ./.github/actions/setup-web
- name: Bring up dify
env:
DIFY_VERSION: ${{ inputs.dify_version }}
run: |
cd docker
cp .env.example .env
DIFY_API_IMAGE_TAG="$DIFY_VERSION" \
DIFY_WEB_IMAGE_TAG="$DIFY_VERSION" \
docker compose up -d api worker web db redis
for i in $(seq 1 60); do
if curl -fsS http://localhost:5001/health >/dev/null 2>&1; then
echo "dify api ready after ${i}s"
break
fi
sleep 1
done
- name: Run smoke against live dify
working-directory: ./cli
run: pnpm exec tsx scripts/run-smoke.ts --base-url http://localhost:5001
- name: Dump dify logs on failure
if: failure()
run: |
cd docker
docker compose logs api worker web --tail=200

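Manual dispatch with the GitHub CLI might look like this; the input names come from the workflow_dispatch block above and the values are examples:

```bash
gh workflow run cli-smoke.yml \
  --repo langgenius/dify \
  -f dify_version=1.7.0 \
  -f cli_ref=feat/cli   # optional; defaults to the current branch
```
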
46
.github/workflows/cli-tests.yml vendored Normal file

@@ -0,0 +1,46 @@
name: CLI Tests
on:
workflow_call:
secrets:
CODECOV_TOKEN:
required: false
permissions:
contents: read
concurrency:
group: cli-tests-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
test:
name: CLI Tests
runs-on: depot-ubuntu-24.04
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
defaults:
run:
shell: bash
working-directory: ./cli
steps:
- name: Checkout code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Setup web environment
uses: ./.github/actions/setup-web
- name: CI pipeline (typecheck, lint, coverage, build)
run: pnpm ci
- name: Report coverage
if: ${{ env.CODECOV_TOKEN != '' }}
uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # v6.0.0
with:
directory: cli/coverage
flags: cli
env:
CODECOV_TOKEN: ${{ env.CODECOV_TOKEN }}


@@ -42,6 +42,7 @@ jobs:
runs-on: depot-ubuntu-24.04
outputs:
api-changed: ${{ steps.changes.outputs.api }}
cli-changed: ${{ steps.changes.outputs.cli }}
e2e-changed: ${{ steps.changes.outputs.e2e }}
web-changed: ${{ steps.changes.outputs.web }}
vdb-changed: ${{ steps.changes.outputs.vdb }}
@@ -63,6 +64,18 @@
- 'docker/generate_docker_compose'
- 'docker/ssrf_proxy/**'
- 'docker/volumes/sandbox/conf/**'
cli:
- 'cli/**'
- 'packages/tsconfig/**'
- 'package.json'
- 'pnpm-lock.yaml'
- 'pnpm-workspace.yaml'
- 'eslint.config.mjs'
- '.npmrc'
- '.nvmrc'
- '.github/workflows/cli-tests.yml'
- '.github/workflows/cli-docker-build.yml'
- '.github/actions/setup-web/**'
web:
- 'web/**'
- 'packages/**'
@@ -184,6 +197,66 @@
echo "API tests were not required, but the skip job finished with result: $SKIP_RESULT" >&2
exit 1
cli-tests-run:
name: Run CLI Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.cli-changed == 'true'
uses: ./.github/workflows/cli-tests.yml
secrets: inherit
cli-tests-skip:
name: Skip CLI Tests
needs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.cli-changed != 'true'
runs-on: depot-ubuntu-24.04
steps:
- name: Report skipped CLI tests
run: echo "No CLI-related changes detected; skipping CLI tests."
cli-tests:
name: CLI Tests
if: ${{ always() }}
needs:
- pre_job
- check-changes
- cli-tests-run
- cli-tests-skip
runs-on: depot-ubuntu-24.04
steps:
- name: Finalize CLI Tests status
env:
SHOULD_SKIP_WORKFLOW: ${{ needs.pre_job.outputs.should_skip }}
TESTS_CHANGED: ${{ needs.check-changes.outputs.cli-changed }}
RUN_RESULT: ${{ needs.cli-tests-run.result }}
SKIP_RESULT: ${{ needs.cli-tests-skip.result }}
run: |
if [[ "$SHOULD_SKIP_WORKFLOW" == 'true' ]]; then
echo "CLI tests were skipped because this workflow run duplicated a successful or newer run."
exit 0
fi
if [[ "$TESTS_CHANGED" == 'true' ]]; then
if [[ "$RUN_RESULT" == 'success' ]]; then
echo "CLI tests ran successfully."
exit 0
fi
echo "CLI tests were required but finished with result: $RUN_RESULT" >&2
exit 1
fi
if [[ "$SKIP_RESULT" == 'success' ]]; then
echo "CLI tests were skipped because no CLI-related files changed."
exit 0
fi
echo "CLI tests were not required, but the skip job finished with result: $SKIP_RESULT" >&2
exit 1
web-tests-run:
name: Run Web Tests
needs:

9
.gitignore vendored

@@ -115,6 +115,12 @@ venv/
ENV/
env.bak/
venv.bak/
# cli/ has a src/env/ module (DIFY_* registry) — don't treat it as a venv
!/cli/src/env/
!/cli/src/commands/env/
# cli/scripts/lib/ holds TS build helpers (resolve-buildinfo etc.) — don't treat as Python lib/
!/cli/scripts/lib/
.conda/
# Spyder project settings
@@ -247,8 +253,9 @@ scripts/stress-test/reports/
# settings
*.local.json
*.local.md
*.local.toml
# Code Agent Folder
.qoder/*
.context/*
.eslintcache


@@ -9,6 +9,7 @@ The codebase is split into:
- **Backend API** (`/api`): Python Flask application organized with Domain-Driven Design
- **Frontend Web** (`/web`): Next.js application using TypeScript and React
- **Docker deployment** (`/docker`): Containerized deployment configurations
- **Dify Agent Backend** (`/dify-agent`): Backend services for managing and executing agents
## Backend Workflow


@@ -83,16 +83,15 @@ lint:
@echo "✅ Linting complete"
type-check:
@echo "📝 Running type checks (basedpyright + pyrefly + mypy)..."
@./dev/basedpyright-check $(PATH_TO_CHECK)
@./dev/pyrefly-check-local
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --exclude 'dev/generate_swagger_specs.py' --check-untyped-defs --disable-error-code=import-untyped .
@echo "📝 Running type checks (pyrefly + mypy)..."
@./dev/pyrefly-check-local $(PATH_TO_CHECK)
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --exclude 'dev/generate_swagger_specs.py' --exclude 'dev/generate_fastopenapi_specs.py' --check-untyped-defs --disable-error-code=import-untyped .
@echo "✅ Type checks complete"
type-check-core:
@echo "📝 Running core type checks (basedpyright + mypy)..."
@./dev/basedpyright-check $(PATH_TO_CHECK)
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --exclude 'dev/generate_swagger_specs.py' --exclude 'dev/generate_fastopenapi_specs.py' --check-untyped-defs --disable-error-code=import-untyped .
@echo "📝 Running core type checks (pyrefly + mypy)..."
@./dev/pyrefly-check-local $(PATH_TO_CHECK)
@uv --directory api run mypy --exclude-gitignore --exclude 'tests/' --exclude 'migrations/' --exclude 'dev/generate_swagger_specs.py' --exclude 'dev/generate_fastopenapi_specs.py' --check-untyped-defs --disable-error-code=import-untyped .
@echo "✅ Core type checks complete"
test:
@@ -153,8 +152,8 @@ help:
@echo " make format - Format code with ruff"
@echo " make check - Check code with ruff"
@echo " make lint - Format, fix, and lint code (ruff, imports, dotenv)"
@echo " make type-check - Run type checks (basedpyright, pyrefly, mypy)"
@echo " make type-check-core - Run core type checks (basedpyright, mypy)"
@echo " make type-check - Run type checks (pyrefly, mypy)"
@echo " make type-check-core - Run core type checks (pyrefly, mypy)"
@echo " make test - Run backend unit tests (or TARGET_TESTS=./api/tests/<target_tests>)"
@echo ""
@echo "Docker Build Targets:"


@@ -557,7 +557,7 @@ MAX_VARIABLE_SIZE=204800
# GraphEngine Worker Pool Configuration
# Minimum number of workers per GraphEngine instance (default: 1)
GRAPH_ENGINE_MIN_WORKERS=1
GRAPH_ENGINE_MIN_WORKERS=3
# Maximum number of workers per GraphEngine instance (default: 10)
GRAPH_ENGINE_MAX_WORKERS=10
# Queue depth threshold that triggers worker scale up (default: 3)


@@ -99,7 +99,7 @@ The scripts resolve paths relative to their location, so you can run them from a
./dev/reformat # Run all formatters and linters
uv run ruff check --fix ./ # Fix linting issues
uv run ruff format ./ # Format code
uv run basedpyright . # Type checking
uv run pyrefly check # Type checking
```
## Generate TS stub


@@ -117,7 +117,7 @@ def create_flask_app_with_configs() -> DifyApp:
logger.warning("Failed to add trace headers to response", exc_info=True)
return response
# Capture the decorator's return value to avoid pyright reportUnusedFunction
# Capture the decorator return values so static checkers do not treat the hooks as unused.
_ = before_request
_ = add_trace_headers
@@ -159,6 +159,7 @@ def initialize_extensions(app: DifyApp):
ext_logstore,
ext_mail,
ext_migrate,
ext_oauth_bearer,
ext_orjson,
ext_otel,
ext_proxy_fix,
@@ -203,6 +204,7 @@ def initialize_extensions(app: DifyApp):
ext_enterprise_telemetry,
ext_request_logging,
ext_session_factory,
ext_oauth_bearer,
]
for ext in extensions:
short_name = ext.__name__.split(".")[-1]


@@ -185,9 +185,9 @@ def transform_datasource_credentials(environment: str):
firecrawl_plugin_id = "langgenius/firecrawl_datasource"
jina_plugin_id = "langgenius/jina_datasource"
if environment == "online":
notion_plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(notion_plugin_id) # pyright: ignore[reportPrivateUsage]
firecrawl_plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(firecrawl_plugin_id) # pyright: ignore[reportPrivateUsage]
jina_plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(jina_plugin_id) # pyright: ignore[reportPrivateUsage]
notion_plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(notion_plugin_id)
firecrawl_plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(firecrawl_plugin_id)
jina_plugin_unique_identifier = plugin_migration._fetch_plugin_unique_identifier(jina_plugin_id)
else:
notion_plugin_unique_identifier = None
firecrawl_plugin_unique_identifier = None


@@ -14,6 +14,7 @@ from libs.rsa import generate_key_pair
from models import Tenant
from models.model import App, AppMode, Conversation
from models.provider import Provider, ProviderModel
from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider
logger = logging.getLogger(__name__)
@@ -23,13 +24,16 @@ DB_UPGRADE_LOCK_TTL_SECONDS = 60
@click.command(
"reset-encrypt-key-pair",
help="Reset the asymmetric key pair of workspace for encrypt LLM credentials. "
"After the reset, all LLM credentials will become invalid, "
"requiring re-entry."
"After the reset, all LLM credentials and tool provider credentials "
"(builtin / API / MCP) will be purged, requiring re-entry. "
"Only support SELF_HOSTED mode.",
)
@click.confirmation_option(
prompt=click.style(
"Are you sure you want to reset encrypt key pair? This operation cannot be rolled back!", fg="red"
"Are you sure you want to reset encrypt key pair? "
"This will also purge builtin / API / MCP tool provider records for every tenant. "
"This operation cannot be rolled back!",
fg="red",
)
)
def reset_encrypt_key_pair():
@@ -53,6 +57,13 @@ def reset_encrypt_key_pair():
session.execute(delete(Provider).where(Provider.provider_type == "custom", Provider.tenant_id == tenant.id))
session.execute(delete(ProviderModel).where(ProviderModel.tenant_id == tenant.id))
# Purge tool provider records that hold credentials encrypted under the
# tenant key. Leaving them in place causes /console/api/workspaces/current/
# tool-providers to 500 because decryption fails on stale ciphertext (#35396).
session.execute(delete(BuiltinToolProvider).where(BuiltinToolProvider.tenant_id == tenant.id))
session.execute(delete(ApiToolProvider).where(ApiToolProvider.tenant_id == tenant.id))
session.execute(delete(MCPToolProvider).where(MCPToolProvider.tenant_id == tenant.id))
click.echo(
click.style(
f"Congratulations! The asymmetric key pair of workspace {tenant.id} has been reset.",

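An invocation sketch, assuming the command is exposed through the Flask CLI like dify's other maintenance commands:

```bash
uv --directory api run flask reset-encrypt-key-pair
# Red confirmation prompt, then per tenant: custom Provider/ProviderModel rows
# plus builtin / API / MCP tool provider records are purged (see #35396).
```
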

@@ -23,6 +23,12 @@ class EnterpriseFeatureConfig(BaseSettings):
ge=1, description="Maximum timeout in seconds for enterprise requests", default=5
)
ENTERPRISE_DISABLE_RUNTIME_CREDENTIAL_CHECK: bool = Field(
default=False,
description="If disabled, credential policy check is only performed when saving workflows."
"This helps gain runtime performance by trading off consistency.",
)
class EnterpriseTelemetryConfig(BaseSettings):
"""


@@ -520,6 +520,44 @@ class HttpConfig(BaseSettings):
def WEB_API_CORS_ALLOW_ORIGINS(self) -> list[str]:
return self.inner_WEB_API_CORS_ALLOW_ORIGINS.split(",")
OPENAPI_ENABLED: bool = Field(
description=(
"Enable the /openapi/v1/* endpoint group used by difyctl and other "
"programmatic clients. Set to true to activate; disabled by default."
),
validation_alias=AliasChoices("OPENAPI_ENABLED"),
default=False,
)
inner_OPENAPI_CORS_ALLOW_ORIGINS: str = Field(
description=(
"Comma-separated allowlist for /openapi/v1/* CORS. "
"Default empty = same-origin only. Browser-cookie routes within "
"the group reject cross-origin OPTIONS regardless of this list."
),
validation_alias=AliasChoices("OPENAPI_CORS_ALLOW_ORIGINS"),
default="",
)
@computed_field
def OPENAPI_CORS_ALLOW_ORIGINS(self) -> list[str]:
return [o for o in self.inner_OPENAPI_CORS_ALLOW_ORIGINS.split(",") if o]
inner_OPENAPI_KNOWN_CLIENT_IDS: str = Field(
description=(
"Comma-separated client_id values accepted at "
"POST /openapi/v1/oauth/device/code. New CLIs / SDKs added here "
"without code changes. Unknown client_id returns 400 unsupported_client."
),
validation_alias=AliasChoices("OPENAPI_KNOWN_CLIENT_IDS"),
default="difyctl",
)
@computed_field # type: ignore[misc]
@property
def OPENAPI_KNOWN_CLIENT_IDS(self) -> frozenset[str]:
return frozenset(c for c in self.inner_OPENAPI_KNOWN_CLIENT_IDS.split(",") if c)
HTTP_REQUEST_MAX_CONNECT_TIMEOUT: int = Field(
ge=1, description="Maximum connection timeout in seconds for HTTP requests", default=10
)
@@ -761,7 +799,7 @@ class WorkflowConfig(BaseSettings):
# GraphEngine Worker Pool Configuration
GRAPH_ENGINE_MIN_WORKERS: PositiveInt = Field(
description="Minimum number of workers per GraphEngine instance",
default=1,
default=3,
)
GRAPH_ENGINE_MAX_WORKERS: PositiveInt = Field(
@@ -895,6 +933,17 @@ class AuthConfig(BaseSettings):
default=86400,
)
ENABLE_OAUTH_BEARER: bool = Field(
description="Enable OAuth bearer authentication (device-flow + Service API /v1/* bearer middleware).",
default=True,
)
OPENAPI_RATE_LIMIT_PER_TOKEN: PositiveInt = Field(
description="Per-token rate limit on /openapi/v1/* (requests per minute). "
"Bucket keyed on sha256(token), shared across api replicas via Redis.",
default=60,
)
class ModerationConfig(BaseSettings):
"""
@@ -1181,6 +1230,14 @@ class CeleryScheduleTasksConfig(BaseSettings):
description="Enable scheduled workflow run cleanup task",
default=False,
)
ENABLE_CLEAN_OAUTH_ACCESS_TOKENS_TASK: bool = Field(
description="Enable scheduled cleanup of revoked/expired OAuth access-token rows past retention.",
default=True,
)
OAUTH_ACCESS_TOKEN_RETENTION_DAYS: PositiveInt = Field(
description="Days to retain revoked OAuth access-token rows before deletion.",
default=30,
)
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: bool = Field(
description="Enable mail clean document notify task",
default=False,

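A rough probe of the OPENAPI_RATE_LIMIT_PER_TOKEN limit defined above (60 requests per minute per token, bucket keyed on sha256 of the token and shared via Redis); the listing path and the 429 over-limit status are assumptions:

```bash
for i in $(seq 1 61); do
  curl -s -o /dev/null -w '%{http_code}\n' \
    -H "Authorization: Bearer $TOKEN" \
    http://localhost:5001/openapi/v1/apps   # endpoint path assumed
done | sort | uniq -c   # expect mostly 200s, then 429 once the bucket empties
```
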

@@ -1,5 +1,5 @@
import os
from typing import Any, Literal, TypedDict
from typing import Any, Literal, TypedDict, cast
from urllib.parse import parse_qsl, quote_plus
from pydantic import Field, NonNegativeFloat, NonNegativeInt, PositiveFloat, PositiveInt, computed_field
@@ -50,28 +50,30 @@ from .vdb.vastbase_vector_config import VastbaseVectorConfig
from .vdb.vikingdb_config import VikingDBConfig
from .vdb.weaviate_config import WeaviateConfig
_VALID_STORAGE_TYPE = Literal[
"opendal",
"s3",
"aliyun-oss",
"azure-blob",
"baidu-obs",
"clickzetta-volume",
"google-storage",
"huawei-obs",
"oci-storage",
"tencent-cos",
"volcengine-tos",
"supabase",
"local",
]
class StorageConfig(BaseSettings):
STORAGE_TYPE: Literal[
"opendal",
"s3",
"aliyun-oss",
"azure-blob",
"baidu-obs",
"clickzetta-volume",
"google-storage",
"huawei-obs",
"oci-storage",
"tencent-cos",
"volcengine-tos",
"supabase",
"local",
] = Field(
STORAGE_TYPE: _VALID_STORAGE_TYPE = Field(
description="Type of storage to use."
" Options: 'opendal', '(deprecated) local', 's3', 'aliyun-oss', 'azure-blob', 'baidu-obs', "
"'clickzetta-volume', 'google-storage', 'huawei-obs', 'oci-storage', 'tencent-cos', "
"'volcengine-tos', 'supabase'. Default is 'opendal'.",
default="opendal",
default=cast(_VALID_STORAGE_TYPE, "opendal"),
)
STORAGE_LOCAL_PATH: str = Field(


@@ -1,6 +1,21 @@
import json
from pydantic import BaseModel, JsonValue
class HumanInputFormSubmitPayload(BaseModel):
inputs: dict[str, JsonValue]
action: str
def stringify_form_default_values(values: dict[str, object]) -> dict[str, str]:
"""Serialize default values into strings expected by human-input form clients."""
result: dict[str, str] = {}
for key, value in values.items():
if value is None:
result[key] = ""
elif isinstance(value, (dict, list)):
result[key] = json.dumps(value, ensure_ascii=False)
else:
result[key] = str(value)
return result


@@ -33,7 +33,6 @@ for module_name in RESOURCE_MODULES:
# Ensure resource modules are imported so route decorators are evaluated.
# Import other controllers
from . import (
admin,
apikey,
extension,
feature,
@@ -117,7 +116,7 @@ from .explore import (
saved_message,
trial,
)
from .socketio import workflow as socketio_workflow # pyright: ignore[reportUnusedImport]
from .socketio import workflow as socketio_workflow
# Import tag controllers
from .tag import tags
@@ -142,7 +141,6 @@ api.add_namespace(console_ns)
__all__ = [
"account",
"activate",
"admin",
"advanced_prompt_template",
"agent",
"agent_providers",


@@ -1,64 +1,11 @@
import csv
import io
from collections.abc import Callable
from functools import wraps
from typing import cast
from uuid import UUID
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import select
from werkzeug.exceptions import BadRequest, NotFound, Unauthorized
from werkzeug.exceptions import Unauthorized
from configs import dify_config
from constants.languages import supported_language
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.wraps import only_edition_cloud
from core.db.session_factory import session_factory
from extensions.ext_database import db
from libs.token import extract_access_token
from models.model import App, ExporleBanner, InstalledApp, RecommendedApp, TrialApp
from services.billing_service import BillingService, LangContentDict
class InsertExploreAppPayload(BaseModel):
app_id: str = Field(...)
desc: str | None = None
copyright: str | None = None
privacy_policy: str | None = None
custom_disclaimer: str | None = None
language: str = Field(...)
category: str = Field(...)
position: int = Field(...)
can_trial: bool = Field(default=False)
trial_limit: int = Field(default=0)
@field_validator("language")
@classmethod
def validate_language(cls, value: str) -> str:
return supported_language(value)
class InsertExploreBannerPayload(BaseModel):
category: str = Field(...)
title: str = Field(...)
description: str = Field(...)
img_src: str = Field(..., alias="img-src")
language: str = Field(default="en-US")
link: str = Field(...)
sort: int = Field(...)
@field_validator("language")
@classmethod
def validate_language(cls, value: str) -> str:
return supported_language(value)
model_config = {"populate_by_name": True}
register_schema_models(console_ns, InsertExploreAppPayload, InsertExploreBannerPayload)
def admin_required[**P, R](view: Callable[P, R]) -> Callable[P, R]:
@@ -76,353 +23,3 @@ def admin_required[**P, R](view: Callable[P, R]) -> Callable[P, R]:
return view(*args, **kwargs)
return decorated
@console_ns.route("/admin/insert-explore-apps")
class InsertExploreAppListApi(Resource):
@console_ns.doc("insert_explore_app")
@console_ns.doc(description="Insert or update an app in the explore list")
@console_ns.expect(console_ns.models[InsertExploreAppPayload.__name__])
@console_ns.response(200, "App updated successfully")
@console_ns.response(201, "App inserted successfully")
@console_ns.response(404, "App not found")
@only_edition_cloud
@admin_required
def post(self):
payload = InsertExploreAppPayload.model_validate(console_ns.payload)
app = db.session.execute(select(App).where(App.id == payload.app_id)).scalar_one_or_none()
if not app:
raise NotFound(f"App '{payload.app_id}' is not found")
site = app.site
if not site:
desc = payload.desc or ""
copy_right = payload.copyright or ""
privacy_policy = payload.privacy_policy or ""
custom_disclaimer = payload.custom_disclaimer or ""
else:
desc = site.description or payload.desc or ""
copy_right = site.copyright or payload.copyright or ""
privacy_policy = site.privacy_policy or payload.privacy_policy or ""
custom_disclaimer = site.custom_disclaimer or payload.custom_disclaimer or ""
with session_factory.create_session() as session:
recommended_app = session.execute(
select(RecommendedApp).where(RecommendedApp.app_id == payload.app_id)
).scalar_one_or_none()
if not recommended_app:
recommended_app = RecommendedApp(
app_id=app.id,
description=desc,
copyright=copy_right,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
language=payload.language,
category=payload.category,
position=payload.position,
)
db.session.add(recommended_app)
if payload.can_trial:
trial_app = db.session.execute(
select(TrialApp).where(TrialApp.app_id == payload.app_id)
).scalar_one_or_none()
if not trial_app:
db.session.add(
TrialApp(
app_id=payload.app_id,
tenant_id=app.tenant_id,
trial_limit=payload.trial_limit,
)
)
else:
trial_app.trial_limit = payload.trial_limit
app.is_public = True
db.session.commit()
return {"result": "success"}, 201
else:
recommended_app.description = desc
recommended_app.copyright = copy_right
recommended_app.privacy_policy = privacy_policy
recommended_app.custom_disclaimer = custom_disclaimer
recommended_app.language = payload.language
recommended_app.category = payload.category
recommended_app.position = payload.position
if payload.can_trial:
trial_app = db.session.execute(
select(TrialApp).where(TrialApp.app_id == payload.app_id)
).scalar_one_or_none()
if not trial_app:
db.session.add(
TrialApp(
app_id=payload.app_id,
tenant_id=app.tenant_id,
trial_limit=payload.trial_limit,
)
)
else:
trial_app.trial_limit = payload.trial_limit
app.is_public = True
db.session.commit()
return {"result": "success"}, 200
@console_ns.route("/admin/insert-explore-apps/<uuid:app_id>")
class InsertExploreAppApi(Resource):
@console_ns.doc("delete_explore_app")
@console_ns.doc(description="Remove an app from the explore list")
@console_ns.doc(params={"app_id": "Application ID to remove"})
@console_ns.response(204, "App removed successfully")
@only_edition_cloud
@admin_required
def delete(self, app_id: UUID):
with session_factory.create_session() as session:
recommended_app = session.execute(
select(RecommendedApp).where(RecommendedApp.app_id == str(app_id))
).scalar_one_or_none()
if not recommended_app:
return {"result": "success"}, 204
with session_factory.create_session() as session:
app = session.execute(select(App).where(App.id == recommended_app.app_id)).scalar_one_or_none()
if app:
app.is_public = False
with session_factory.create_session() as session:
installed_apps = (
session.execute(
select(InstalledApp).where(
InstalledApp.app_id == recommended_app.app_id,
InstalledApp.tenant_id != InstalledApp.app_owner_tenant_id,
)
)
.scalars()
.all()
)
for installed_app in installed_apps:
session.delete(installed_app)
trial_app = session.execute(
select(TrialApp).where(TrialApp.app_id == recommended_app.app_id)
).scalar_one_or_none()
if trial_app:
session.delete(trial_app)
db.session.delete(recommended_app)
db.session.commit()
return {"result": "success"}, 204
@console_ns.route("/admin/insert-explore-banner")
class InsertExploreBannerApi(Resource):
@console_ns.doc("insert_explore_banner")
@console_ns.doc(description="Insert an explore banner")
@console_ns.expect(console_ns.models[InsertExploreBannerPayload.__name__])
@console_ns.response(201, "Banner inserted successfully")
@only_edition_cloud
@admin_required
def post(self):
payload = InsertExploreBannerPayload.model_validate(console_ns.payload)
banner = ExporleBanner(
content={
"category": payload.category,
"title": payload.title,
"description": payload.description,
"img-src": payload.img_src,
},
link=payload.link,
sort=payload.sort,
language=payload.language,
)
db.session.add(banner)
db.session.commit()
return {"result": "success"}, 201
@console_ns.route("/admin/delete-explore-banner/<uuid:banner_id>")
class DeleteExploreBannerApi(Resource):
@console_ns.doc("delete_explore_banner")
@console_ns.doc(description="Delete an explore banner")
@console_ns.doc(params={"banner_id": "Banner ID to delete"})
@console_ns.response(204, "Banner deleted successfully")
@only_edition_cloud
@admin_required
def delete(self, banner_id):
banner = db.session.execute(select(ExporleBanner).where(ExporleBanner.id == banner_id)).scalar_one_or_none()
if not banner:
raise NotFound(f"Banner '{banner_id}' is not found")
db.session.delete(banner)
db.session.commit()
return {"result": "success"}, 204
class LangContentPayload(BaseModel):
lang: str = Field(..., description="Language tag: 'zh' | 'en' | 'jp'")
title: str = Field(...)
subtitle: str | None = Field(default=None)
body: str = Field(...)
title_pic_url: str | None = Field(default=None)
class UpsertNotificationPayload(BaseModel):
notification_id: str | None = Field(default=None, description="Omit to create; supply UUID to update")
contents: list[LangContentPayload] = Field(..., min_length=1)
start_time: str | None = Field(default=None, description="RFC3339, e.g. 2026-03-01T00:00:00Z")
end_time: str | None = Field(default=None, description="RFC3339, e.g. 2026-03-20T23:59:59Z")
frequency: str = Field(default="once", description="'once' | 'every_page_load'")
status: str = Field(default="active", description="'active' | 'inactive'")
class BatchAddNotificationAccountsPayload(BaseModel):
notification_id: str = Field(...)
user_email: list[str] = Field(..., description="List of account email addresses")
register_schema_models(console_ns, UpsertNotificationPayload, BatchAddNotificationAccountsPayload)
@console_ns.route("/admin/upsert_notification")
class UpsertNotificationApi(Resource):
@console_ns.doc("upsert_notification")
@console_ns.doc(
description=(
"Create or update an in-product notification. "
"Supply notification_id to update an existing one; omit it to create a new one. "
"Pass at least one language variant in contents (zh / en / jp)."
)
)
@console_ns.expect(console_ns.models[UpsertNotificationPayload.__name__])
@console_ns.response(200, "Notification upserted successfully")
@only_edition_cloud
@admin_required
def post(self):
payload = UpsertNotificationPayload.model_validate(console_ns.payload)
result = BillingService.upsert_notification(
contents=[cast(LangContentDict, c.model_dump()) for c in payload.contents],
frequency=payload.frequency,
status=payload.status,
notification_id=payload.notification_id,
start_time=payload.start_time,
end_time=payload.end_time,
)
return {"result": "success", "notification_id": result.get("notificationId")}, 200
@console_ns.route("/admin/batch_add_notification_accounts")
class BatchAddNotificationAccountsApi(Resource):
@console_ns.doc("batch_add_notification_accounts")
@console_ns.doc(
description=(
"Register target accounts for a notification by email address. "
'JSON body: {"notification_id": "...", "user_email": ["a@example.com", ...]}. '
"File upload: multipart/form-data with a 'file' field (CSV or TXT, one email per line) "
"plus a 'notification_id' field. "
"Emails that do not match any account are silently skipped."
)
)
@console_ns.response(200, "Accounts added successfully")
@only_edition_cloud
@admin_required
def post(self):
from models.account import Account
if "file" in request.files:
notification_id = request.form.get("notification_id", "").strip()
if not notification_id:
raise BadRequest("notification_id is required.")
emails = self._parse_emails_from_file()
else:
payload = BatchAddNotificationAccountsPayload.model_validate(console_ns.payload)
notification_id = payload.notification_id
emails = payload.user_email
if not emails:
raise BadRequest("No valid email addresses provided.")
# Resolve emails → account IDs in chunks to avoid large IN-clause
account_ids: list[str] = []
chunk_size = 500
for i in range(0, len(emails), chunk_size):
chunk = emails[i : i + chunk_size]
rows = db.session.execute(select(Account.id, Account.email).where(Account.email.in_(chunk))).all()
account_ids.extend(str(row.id) for row in rows)
if not account_ids:
raise BadRequest("None of the provided emails matched an existing account.")
# Send to dify-saas in batches of 1000
total_count = 0
batch_size = 1000
for i in range(0, len(account_ids), batch_size):
batch = account_ids[i : i + batch_size]
result = BillingService.batch_add_notification_accounts(
notification_id=notification_id,
account_ids=batch,
)
total_count += result.get("count", 0)
return {
"result": "success",
"emails_provided": len(emails),
"accounts_matched": len(account_ids),
"count": total_count,
}, 200
@staticmethod
def _parse_emails_from_file() -> list[str]:
"""Parse email addresses from an uploaded CSV or TXT file."""
file = request.files["file"]
if not file.filename:
raise BadRequest("Uploaded file has no filename.")
filename_lower = file.filename.lower()
if not filename_lower.endswith((".csv", ".txt")):
raise BadRequest("Invalid file type. Only CSV (.csv) and TXT (.txt) files are allowed.")
try:
content = file.stream.read().decode("utf-8")
except UnicodeDecodeError:
try:
file.stream.seek(0)
content = file.stream.read().decode("gbk")
except UnicodeDecodeError:
raise BadRequest("Unable to decode the file. Please use UTF-8 or GBK encoding.")
emails: list[str] = []
if filename_lower.endswith(".csv"):
reader = csv.reader(io.StringIO(content))
for row in reader:
for cell in row:
cell = cell.strip()
if cell:
emails.append(cell)
else:
for line in content.splitlines():
line = line.strip()
if line:
emails.append(line)
# Deduplicate while preserving order
seen: set[str] = set()
unique_emails: list[str] = []
for email in emails:
if email.lower() not in seen:
seen.add(email.lower())
unique_emails.append(email)
return unique_emails


@@ -11,6 +11,7 @@ from werkzeug.exceptions import Forbidden
from controllers.common.schema import register_schema_models
from extensions.ext_database import db
from fields.base import ResponseModel
from libs.helper import to_timestamp
from libs.login import current_account_with_tenant, login_required
from models.dataset import Dataset
from models.enums import ApiTokenType
@@ -21,12 +22,6 @@ from . import console_ns
from .wraps import account_initialization_required, edit_permission_required, setup_required
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class ApiKeyItem(ResponseModel):
id: str
type: str
@@ -37,7 +32,7 @@
@field_validator("last_used_at", "created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class ApiKeyList(ResponseModel):


@@ -3,7 +3,6 @@ import re
import uuid
from datetime import datetime
from typing import Any, Literal
from uuid import UUID
from flask import request
from flask_restx import Resource
@@ -34,7 +33,7 @@ from core.trigger.constants import TRIGGER_NODE_TYPES
from extensions.ext_database import db
from fields.base import ResponseModel
from graphon.enums import WorkflowExecutionStatus
from libs.helper import build_icon_url
from libs.helper import build_icon_url, to_timestamp
from libs.login import current_account_with_tenant, login_required
from models import App, DatasetPermissionEnum, Workflow
from models.model import IconType
@@ -178,12 +177,6 @@ class AppTracePayload(BaseModel):
type JSONValue = Any
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class Tag(ResponseModel):
id: str
name: str
@@ -200,7 +193,7 @@ class WorkflowPartial(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class ModelConfigPartial(ResponseModel):
@@ -214,7 +207,7 @@
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class ModelConfig(ResponseModel):
@@ -275,7 +268,7 @@ class ModelConfig(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class Site(ResponseModel):
@@ -318,7 +311,7 @@
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class DeletedTool(ResponseModel):
@@ -361,7 +354,7 @@
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AppDetail(ResponseModel):
@@ -391,7 +384,7 @@
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AppDetailWithSite(AppDetail):
@@ -856,10 +849,11 @@ class AppTraceApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self, app_id: UUID):
@get_app_model
def get(self, app_model):
"""Get app trace"""
with session_factory.create_session() as session:
app_trace_config = OpsTraceManager.get_app_tracing_config(str(app_id), session)
app_trace_config = OpsTraceManager.get_app_tracing_config(app_model.id, session)
return app_trace_config
@@ -873,12 +867,13 @@
@login_required
@account_initialization_required
@edit_permission_required
def post(self, app_id: UUID):
@get_app_model
def post(self, app_model):
# add app trace
args = AppTracePayload.model_validate(console_ns.payload)
OpsTraceManager.update_app_tracing_config(
app_id=str(app_id),
app_id=app_model.id,
enabled=args.enabled,
tracing_provider=args.tracing_provider,
)


@@ -16,6 +16,7 @@ from controllers.console.wraps import account_initialization_required, setup_req
from extensions.ext_database import db
from fields._value_type_serializer import serialize_value_type
from fields.base import ResponseModel
from libs.helper import to_timestamp
from libs.login import login_required
from models import ConversationVariable
from models.model import AppMode
@@ -25,12 +26,6 @@ class ConversationVariablesQuery(BaseModel):
conversation_id: str = Field(..., description="Conversation ID to filter variables")
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class ConversationVariableResponse(ResponseModel):
id: str
name: str
@@ -65,7 +60,7 @@ class ConversationVariableResponse(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class PaginatedConversationVariableResponse(ResponseModel):


@@ -13,6 +13,7 @@ from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from extensions.ext_database import db
from fields.base import ResponseModel
from libs.helper import to_timestamp
from libs.login import current_account_with_tenant, login_required
from models.enums import AppMCPServerStatus
from models.model import AppMCPServer
@@ -30,12 +31,6 @@ class MCPServerUpdatePayload(BaseModel):
status: str | None = Field(default=None, description="Server status")
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class AppMCPServerResponse(ResponseModel):
id: str
name: str
@@ -59,7 +54,7 @@
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
register_schema_models(console_ns, MCPServerCreatePayload, MCPServerUpdatePayload, AppMCPServerResponse)


@@ -37,10 +37,9 @@ from fields.conversation_fields import (
JSONValue,
MessageFile,
format_files_contained,
to_timestamp,
)
from graphon.model_runtime.errors.invoke import InvokeError
from libs.helper import uuid_value
from libs.helper import to_timestamp, uuid_value
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.login import current_account_with_tenant, login_required
from models.enums import FeedbackFromSource, FeedbackRating
@ -144,9 +143,7 @@ class MessageDetailResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class MessageInfiniteScrollPaginationResponse(ResponseModel):

View File

@ -1,5 +1,4 @@
from typing import Any
from uuid import UUID
from flask import request
from flask_restx import Resource, fields
@ -9,8 +8,10 @@ from werkzeug.exceptions import BadRequest
from controllers.common.schema import register_schema_models
from controllers.console import console_ns
from controllers.console.app.error import TracingConfigCheckError, TracingConfigIsExist, TracingConfigNotExist
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from libs.login import login_required
from models import App
from services.ops_service import OpsService
@ -43,11 +44,14 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self, app_id: UUID):
args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True))
@get_app_model
def get(self, app_model: App):
args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
try:
trace_config = OpsService.get_tracing_app_config(app_id=str(app_id), tracing_provider=args.tracing_provider)
trace_config = OpsService.get_tracing_app_config(
app_id=app_model.id, tracing_provider=args.tracing_provider
)
if not trace_config:
return {"has_not_configured": True}
return trace_config
@ -65,13 +69,14 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self, app_id: UUID):
@get_app_model
def post(self, app_model: App):
"""Create a new trace app configuration"""
args = TraceConfigPayload.model_validate(console_ns.payload)
try:
result = OpsService.create_tracing_app_config(
app_id=str(app_id), tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
app_id=app_model.id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
)
if not result:
raise TracingConfigIsExist()
@ -90,13 +95,14 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def patch(self, app_id: UUID):
@get_app_model
def patch(self, app_model: App):
"""Update an existing trace app configuration"""
args = TraceConfigPayload.model_validate(console_ns.payload)
try:
result = OpsService.update_tracing_app_config(
app_id=str(app_id), tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
app_id=app_model.id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
)
if not result:
raise TracingConfigNotExist()
@ -113,12 +119,13 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def delete(self, app_id: UUID):
@get_app_model
def delete(self, app_model: App):
"""Delete an existing trace app configuration"""
args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True))
try:
result = OpsService.delete_tracing_app_config(app_id=str(app_id), tracing_provider=args.tracing_provider)
result = OpsService.delete_tracing_app_config(app_id=app_model.id, tracing_provider=args.tracing_provider)
if not result:
raise TracingConfigNotExist()
return {"result": "success"}, 204

View File

@ -16,6 +16,7 @@ from fields.base import ResponseModel
from fields.end_user_fields import SimpleEndUser
from fields.member_fields import SimpleAccount
from graphon.enums import WorkflowExecutionStatus
from libs.helper import to_timestamp
from libs.login import login_required
from models import App
from models.model import AppMode
@ -82,9 +83,7 @@ class WorkflowRunForLogResponse(ResponseModel):
@field_validator("created_at", "finished_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
return to_timestamp(value)
class WorkflowRunForArchivedLogResponse(ResponseModel):
@ -117,9 +116,7 @@ class WorkflowAppLogPartialResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
return to_timestamp(value)
class WorkflowArchivedLogPartialResponse(ResponseModel):
@ -133,9 +130,7 @@ class WorkflowArchivedLogPartialResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
return to_timestamp(value)
class WorkflowAppLogPaginationResponse(ResponseModel):

View File

@ -1,22 +1,16 @@
import logging
from datetime import datetime
from flask_restx import Resource, marshal_with
from pydantic import BaseModel, Field, TypeAdapter
from flask_restx import Resource
from pydantic import BaseModel, Field, TypeAdapter, computed_field, field_validator
from controllers.common.schema import register_schema_models
from controllers.common.schema import register_response_schema_models, register_schema_models
from controllers.console import console_ns
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from fields.base import ResponseModel
from fields.member_fields import AccountWithRole
from fields.workflow_comment_fields import (
workflow_comment_basic_fields,
workflow_comment_create_fields,
workflow_comment_detail_fields,
workflow_comment_reply_create_fields,
workflow_comment_reply_update_fields,
workflow_comment_resolve_fields,
workflow_comment_update_fields,
)
from libs.helper import build_avatar_url, dump_response, to_timestamp
from libs.login import current_user, login_required
from models import App
from services.account_service import TenantService
@ -51,6 +45,138 @@ class WorkflowCommentMentionUsersPayload(BaseModel):
users: list[AccountWithRole]
class WorkflowCommentAccount(ResponseModel):
id: str
name: str
email: str
avatar: str | None = Field(default=None, exclude=True)
@computed_field(return_type=str | None) # type: ignore[prop-decorator]
@property
def avatar_url(self) -> str | None:
return build_avatar_url(self.avatar)
class WorkflowCommentReply(ResponseModel):
id: str
content: str
created_by: str
created_by_account: WorkflowCommentAccount | None = None
created_at: int | None = None
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentMention(ResponseModel):
mentioned_user_id: str
mentioned_user_account: WorkflowCommentAccount | None = None
reply_id: str | None = None
class WorkflowCommentBasic(ResponseModel):
id: str
position_x: float
position_y: float
content: str
created_by: str
created_by_account: WorkflowCommentAccount | None = None
created_at: int | None = None
updated_at: int | None = None
resolved: bool
resolved_at: int | None = None
resolved_by: str | None = None
resolved_by_account: WorkflowCommentAccount | None = None
reply_count: int
mention_count: int
participants: list[WorkflowCommentAccount]
@field_validator("created_at", "updated_at", "resolved_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentBasicList(ResponseModel):
data: list[WorkflowCommentBasic]
class WorkflowCommentDetail(ResponseModel):
id: str
position_x: float
position_y: float
content: str
created_by: str
created_by_account: WorkflowCommentAccount | None = None
created_at: int | None = None
updated_at: int | None = None
resolved: bool
resolved_at: int | None = None
resolved_by: str | None = None
resolved_by_account: WorkflowCommentAccount | None = None
replies: list[WorkflowCommentReply]
mentions: list[WorkflowCommentMention]
@field_validator("created_at", "updated_at", "resolved_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentCreate(ResponseModel):
id: str
created_at: int | None = None
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentUpdate(ResponseModel):
id: str
updated_at: int | None = None
@field_validator("updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentResolve(ResponseModel):
id: str
resolved: bool
resolved_at: int | None = None
resolved_by: str | None = None
@field_validator("resolved_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentReplyCreate(ResponseModel):
id: str
created_at: int | None = None
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
class WorkflowCommentReplyUpdate(ResponseModel):
id: str
updated_at: int | None = None
@field_validator("updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return to_timestamp(value)
register_schema_models(
console_ns,
AccountWithRole,
@ -59,17 +185,19 @@ register_schema_models(
WorkflowCommentUpdatePayload,
WorkflowCommentReplyPayload,
)
workflow_comment_basic_model = console_ns.model("WorkflowCommentBasic", workflow_comment_basic_fields)
workflow_comment_detail_model = console_ns.model("WorkflowCommentDetail", workflow_comment_detail_fields)
workflow_comment_create_model = console_ns.model("WorkflowCommentCreate", workflow_comment_create_fields)
workflow_comment_update_model = console_ns.model("WorkflowCommentUpdate", workflow_comment_update_fields)
workflow_comment_resolve_model = console_ns.model("WorkflowCommentResolve", workflow_comment_resolve_fields)
workflow_comment_reply_create_model = console_ns.model(
"WorkflowCommentReplyCreate", workflow_comment_reply_create_fields
)
workflow_comment_reply_update_model = console_ns.model(
"WorkflowCommentReplyUpdate", workflow_comment_reply_update_fields
register_response_schema_models(
console_ns,
WorkflowCommentAccount,
WorkflowCommentReply,
WorkflowCommentMention,
WorkflowCommentBasic,
WorkflowCommentBasicList,
WorkflowCommentDetail,
WorkflowCommentCreate,
WorkflowCommentUpdate,
WorkflowCommentResolve,
WorkflowCommentReplyCreate,
WorkflowCommentReplyUpdate,
)
@ -80,28 +208,26 @@ class WorkflowCommentListApi(Resource):
@console_ns.doc("list_workflow_comments")
@console_ns.doc(description="Get all comments for a workflow")
@console_ns.doc(params={"app_id": "Application ID"})
@console_ns.response(200, "Comments retrieved successfully", workflow_comment_basic_model)
@console_ns.response(200, "Comments retrieved successfully", console_ns.models[WorkflowCommentBasicList.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_basic_model, envelope="data")
def get(self, app_model: App):
"""Get all comments for a workflow."""
comments = WorkflowCommentService.get_comments(tenant_id=current_user.current_tenant_id, app_id=app_model.id)
return comments
return WorkflowCommentBasicList.model_validate({"data": comments}).model_dump(mode="json")
@console_ns.doc("create_workflow_comment")
@console_ns.doc(description="Create a new workflow comment")
@console_ns.doc(params={"app_id": "Application ID"})
@console_ns.expect(console_ns.models[WorkflowCommentCreatePayload.__name__])
@console_ns.response(201, "Comment created successfully", workflow_comment_create_model)
@console_ns.response(201, "Comment created successfully", console_ns.models[WorkflowCommentCreate.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_create_model)
@edit_permission_required
def post(self, app_model: App):
"""Create a new workflow comment."""
@ -117,7 +243,7 @@ class WorkflowCommentListApi(Resource):
mentioned_user_ids=payload.mentioned_user_ids,
)
return result, 201
return dump_response(WorkflowCommentCreate, result), 201
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>")
@ -127,30 +253,28 @@ class WorkflowCommentDetailApi(Resource):
@console_ns.doc("get_workflow_comment")
@console_ns.doc(description="Get a specific workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.response(200, "Comment retrieved successfully", workflow_comment_detail_model)
@console_ns.response(200, "Comment retrieved successfully", console_ns.models[WorkflowCommentDetail.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_detail_model)
def get(self, app_model: App, comment_id: str):
"""Get a specific workflow comment."""
comment = WorkflowCommentService.get_comment(
tenant_id=current_user.current_tenant_id, app_id=app_model.id, comment_id=comment_id
)
return comment
return dump_response(WorkflowCommentDetail, comment)
@console_ns.doc("update_workflow_comment")
@console_ns.doc(description="Update a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.expect(console_ns.models[WorkflowCommentUpdatePayload.__name__])
@console_ns.response(200, "Comment updated successfully", workflow_comment_update_model)
@console_ns.response(200, "Comment updated successfully", console_ns.models[WorkflowCommentUpdate.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_update_model)
@edit_permission_required
def put(self, app_model: App, comment_id: str):
"""Update a workflow comment."""
@ -167,7 +291,7 @@ class WorkflowCommentDetailApi(Resource):
mentioned_user_ids=payload.mentioned_user_ids,
)
return result
return dump_response(WorkflowCommentUpdate, result)
@console_ns.doc("delete_workflow_comment")
@console_ns.doc(description="Delete a workflow comment")
@ -197,12 +321,11 @@ class WorkflowCommentResolveApi(Resource):
@console_ns.doc("resolve_workflow_comment")
@console_ns.doc(description="Resolve a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.response(200, "Comment resolved successfully", workflow_comment_resolve_model)
@console_ns.response(200, "Comment resolved successfully", console_ns.models[WorkflowCommentResolve.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_resolve_model)
@edit_permission_required
def post(self, app_model: App, comment_id: str):
"""Resolve a workflow comment."""
@ -213,7 +336,7 @@ class WorkflowCommentResolveApi(Resource):
user_id=current_user.id,
)
return comment
return dump_response(WorkflowCommentResolve, comment)
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>/replies")
@ -224,12 +347,11 @@ class WorkflowCommentReplyApi(Resource):
@console_ns.doc(description="Add a reply to a workflow comment")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID"})
@console_ns.expect(console_ns.models[WorkflowCommentReplyPayload.__name__])
@console_ns.response(201, "Reply created successfully", workflow_comment_reply_create_model)
@console_ns.response(201, "Reply created successfully", console_ns.models[WorkflowCommentReplyCreate.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_reply_create_model)
@edit_permission_required
def post(self, app_model: App, comment_id: str):
"""Add a reply to a workflow comment."""
@ -247,7 +369,7 @@ class WorkflowCommentReplyApi(Resource):
mentioned_user_ids=payload.mentioned_user_ids,
)
return result, 201
return dump_response(WorkflowCommentReplyCreate, result), 201
@console_ns.route("/apps/<uuid:app_id>/workflow/comments/<string:comment_id>/replies/<string:reply_id>")
@ -258,12 +380,11 @@ class WorkflowCommentReplyDetailApi(Resource):
@console_ns.doc(description="Update a comment reply")
@console_ns.doc(params={"app_id": "Application ID", "comment_id": "Comment ID", "reply_id": "Reply ID"})
@console_ns.expect(console_ns.models[WorkflowCommentReplyPayload.__name__])
@console_ns.response(200, "Reply updated successfully", workflow_comment_reply_update_model)
@console_ns.response(200, "Reply updated successfully", console_ns.models[WorkflowCommentReplyUpdate.__name__])
@login_required
@setup_required
@account_initialization_required
@get_app_model()
@marshal_with(workflow_comment_reply_update_model)
@edit_permission_required
def put(self, app_model: App, comment_id: str, reply_id: str):
"""Update a comment reply."""
@ -284,7 +405,7 @@ class WorkflowCommentReplyDetailApi(Resource):
mentioned_user_ids=payload.mentioned_user_ids,
)
return reply
return dump_response(WorkflowCommentReplyUpdate, reply)
@console_ns.doc("delete_workflow_comment_reply")
@console_ns.doc(description="Delete a comment reply")

View File

@ -39,6 +39,7 @@ from fields.document_fields import (
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.invoke import InvokeAuthorizationError
from libs.datetime_utils import naive_utc_now
from libs.helper import to_timestamp
from libs.login import current_account_with_tenant, login_required
from models import DatasetProcessRule, Document, DocumentSegment, UploadFile
from models.dataset import DocumentPipelineExecutionLog
@ -71,12 +72,6 @@ from ..wraps import (
logger = logging.getLogger(__name__)
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
def _normalize_enum(value: Any) -> Any:
if isinstance(value, str) or value is None:
return value
@ -101,7 +96,7 @@ class DatasetResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class DocumentMetadataResponse(ResponseModel):
@ -152,7 +147,7 @@ class DocumentResponse(ResponseModel):
@field_validator("created_at", "disabled_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class DocumentWithSegmentsResponse(DocumentResponse):

View File

@ -8,6 +8,7 @@ from pydantic import Field, field_validator
from controllers.common.schema import register_schema_models
from fields.base import ResponseModel
from libs.helper import to_timestamp
from libs.login import login_required
from .. import console_ns
@ -19,12 +20,6 @@ from ..wraps import (
)
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class HitTestingDocument(ResponseModel):
id: str | None = None
data_source_type: str | None = None
@ -61,7 +56,7 @@ class HitTestingSegment(ResponseModel):
@field_validator("disabled_at", "created_at", "indexing_at", "completed_at", "stopped_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class HitTestingChildChunk(ResponseModel):

View File

@ -16,6 +16,7 @@ from extensions.ext_database import db
from fields.base import ResponseModel
from graphon.file import helpers as file_helpers
from libs.datetime_utils import naive_utc_now
from libs.helper import to_timestamp
from libs.login import current_account_with_tenant, login_required
from models import App, InstalledApp, RecommendedApp
from models.model import IconType
@ -105,9 +106,7 @@ class InstalledAppResponse(ResponseModel):
@field_validator("last_used_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
return to_timestamp(value)
class InstalledAppListResponse(ResponseModel):

View File

@ -7,6 +7,7 @@ from pydantic import BaseModel, Field, TypeAdapter, field_validator
from constants import HIDDEN_VALUE
from fields.base import ResponseModel
from libs.helper import to_timestamp
from libs.login import current_account_with_tenant, login_required
from models.api_based_extension import APIBasedExtension
from services.api_based_extension_service import APIBasedExtensionService
@ -40,12 +41,6 @@ def _mask_api_key(api_key: str) -> str:
return api_key[:3] + "******" + api_key[-3:]
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class APIBasedExtensionResponse(ResponseModel):
id: str
name: str
@ -61,7 +56,7 @@ class APIBasedExtensionResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
register_schema_models(console_ns, APIBasedExtensionPayload, CodeBasedExtensionResponse, APIBasedExtensionResponse)

View File

@ -105,7 +105,8 @@ class FilePreviewApi(Resource):
@account_initialization_required
def get(self, file_id):
file_id = str(file_id)
text = FileService(db.engine).get_file_preview(file_id)
_, tenant_id = current_account_with_tenant()
text = FileService(db.engine).get_file_preview(file_id, tenant_id)
return {"content": text}

View File

@ -42,7 +42,7 @@ from fields.base import ResponseModel
from fields.member_fields import Account as AccountResponse
from graphon.file import helpers as file_helpers
from libs.datetime_utils import naive_utc_now
from libs.helper import EmailStr, extract_remote_ip, timezone
from libs.helper import EmailStr, extract_remote_ip, timezone, to_timestamp
from libs.login import current_account_with_tenant, login_required
from models import AccountIntegrate, InvitationCode
from models.account import AccountStatus, InvitationCodeStatus
@ -185,12 +185,6 @@ def _serialize_account(account) -> dict[str, Any]:
return AccountResponse.model_validate(account, from_attributes=True).model_dump(mode="json")
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class AccountIntegrateResponse(ResponseModel):
provider: str
created_at: int | None = None
@ -200,7 +194,7 @@ class AccountIntegrateResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AccountIntegrateListResponse(ResponseModel):
@ -220,7 +214,7 @@ class EducationStatusResponse(ResponseModel):
@field_validator("expire_at", mode="before")
@classmethod
def _normalize_expire_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class EducationAutocompleteResponse(ResponseModel):

View File

@ -70,6 +70,12 @@ register_schema_models(
)
def _is_role_enabled(role: TenantAccountRole | str, tenant_id: str) -> bool:
if role != TenantAccountRole.DATASET_OPERATOR:
return True
return FeatureService.get_features(tenant_id=tenant_id).dataset_operator_enabled
@console_ns.route("/workspaces/current/members")
class MemberListApi(Resource):
"""List all members of current tenant."""
@ -110,6 +116,8 @@ class MemberInviteEmailApi(Resource):
inviter = current_user
if not inviter.current_tenant:
raise ValueError("No current tenant")
if not _is_role_enabled(invitee_role, inviter.current_tenant.id):
return {"code": "invalid-role", "message": "Invalid role"}, 400
# Check workspace permission for member invitations
from libs.workspace_permission import check_workspace_member_invite_permission
@ -208,6 +216,8 @@ class MemberUpdateRoleApi(Resource):
current_user, _ = current_account_with_tenant()
if not current_user.current_tenant:
raise ValueError("No current tenant")
if not _is_role_enabled(new_role, current_user.current_tenant.id):
return {"code": "invalid-role", "message": "Invalid role"}, 400
member = db.session.get(Account, str(member_id))
if not member:
abort(404)
@ -215,11 +225,17 @@ class MemberUpdateRoleApi(Resource):
try:
assert member is not None, "Member not found"
TenantService.update_member_role(current_user.current_tenant, member, new_role, current_user)
except services.errors.account.CannotOperateSelfError as e:
return {"code": "cannot-operate-self", "message": str(e)}, 400
except services.errors.account.NoPermissionError as e:
return {"code": "forbidden", "message": str(e)}, 403
except services.errors.account.MemberNotInTenantError as e:
return {"code": "member-not-found", "message": str(e)}, 404
except services.errors.account.RoleAlreadyAssignedError as e:
return {"code": "role-already-assigned", "message": str(e)}, 400
except Exception as e:
raise ValueError(str(e))
# todo: 403
return {"result": "success"}

View File

@ -29,7 +29,7 @@ from controllers.console.wraps import (
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from fields.base import ResponseModel
from libs.helper import TimestampField
from libs.helper import TimestampField, to_timestamp
from libs.login import current_account_with_tenant, login_required
from models.account import Tenant, TenantCustomConfigDict, TenantStatus
from services.account_service import TenantService
@ -86,9 +86,7 @@ class TenantInfoResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None):
if isinstance(value, datetime):
return int(value.timestamp())
return value
return to_timestamp(value)
register_schema_models(

View File

@ -0,0 +1,120 @@
from flask import Blueprint
from flask_restx import Namespace
from libs.device_flow_security import attach_anti_framing
from libs.external_api import ExternalApi
bp = Blueprint("openapi", __name__, url_prefix="/openapi/v1")
attach_anti_framing(bp)
api = ExternalApi(
bp,
version="1.0",
title="OpenAPI",
description="User-scoped programmatic API (bearer auth)",
)
openapi_ns = Namespace("openapi", description="User-scoped operations", path="/")
# Register response/query models BEFORE importing controller modules so that
# @openapi_ns.response / @openapi_ns.expect decorators can resolve model names.
from controllers.common.schema import register_response_schema_models, register_schema_models
from controllers.openapi._models import (
AccountPayload,
AccountResponse,
AppDescribeInfo,
AppDescribeQuery,
AppDescribeResponse,
AppInfoResponse,
AppListQuery,
AppListResponse,
AppListRow,
AppRunRequest,
DeviceCodeRequest,
DeviceCodeResponse,
DeviceLookupQuery,
DeviceLookupResponse,
DeviceMutateRequest,
DeviceMutateResponse,
DevicePollRequest,
MessageMetadata,
PermittedExternalAppsListQuery,
PermittedExternalAppsListResponse,
RevokeResponse,
SessionListResponse,
SessionRow,
TagItem,
UsageInfo,
WorkflowRunData,
WorkspaceDetailResponse,
WorkspaceListResponse,
WorkspacePayload,
WorkspaceSummaryResponse,
)
register_schema_models(
openapi_ns,
AppDescribeQuery,
AppListQuery,
AppRunRequest,
DeviceCodeRequest,
DevicePollRequest,
DeviceLookupQuery,
DeviceMutateRequest,
PermittedExternalAppsListQuery,
)
register_response_schema_models(
openapi_ns,
TagItem,
UsageInfo,
MessageMetadata,
AppListRow,
AppListResponse,
AppInfoResponse,
AppDescribeInfo,
AppDescribeResponse,
WorkflowRunData,
AccountPayload,
WorkspacePayload,
AccountResponse,
SessionRow,
SessionListResponse,
PermittedExternalAppsListResponse,
RevokeResponse,
WorkspaceSummaryResponse,
WorkspaceListResponse,
WorkspaceDetailResponse,
DeviceCodeResponse,
DeviceLookupResponse,
DeviceMutateResponse,
)
from . import (
account,
app_run,
apps,
apps_permitted_external,
human_input_form,
index,
oauth_device,
oauth_device_sso,
workflow_events,
workspaces,
)
# Request models are imported from _models.py and registered above.
__all__ = [
"account",
"app_run",
"apps",
"apps_permitted_external",
"human_input_form",
"index",
"oauth_device",
"oauth_device_sso",
"workflow_events",
"workspaces",
]
api.add_namespace(openapi_ns)

View File

@ -0,0 +1,66 @@
"""Audit emission for openapi app-run endpoints.
Pattern: logger.info with extra={"audit": True, "event": "app.run.openapi", ...}
matches the existing oauth_device convention. The EE OTel exporter consults
its own allowlist to decide whether to ship the line.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
EVENT_APP_RUN_OPENAPI = "app.run.openapi"
EVENT_OPENAPI_WRONG_SURFACE_DENIED = "openapi.wrong_surface_denied"
def emit_app_run(
*,
app_id: str,
tenant_id: str,
caller_kind: str,
mode: str,
surface: str,
) -> None:
logger.info(
"audit: %s app_id=%s tenant_id=%s caller_kind=%s mode=%s surface=%s",
EVENT_APP_RUN_OPENAPI,
app_id,
tenant_id,
caller_kind,
mode,
surface,
extra={
"audit": True,
"event": EVENT_APP_RUN_OPENAPI,
"app_id": app_id,
"tenant_id": tenant_id,
"caller_kind": caller_kind,
"mode": mode,
"surface": surface,
},
)
def emit_wrong_surface(
*,
subject_type: str | None,
attempted_path: str,
client_id: str | None,
token_id: str | None,
) -> None:
logger.warning(
"audit: %s subject_type=%s attempted_path=%s",
EVENT_OPENAPI_WRONG_SURFACE_DENIED,
subject_type,
attempted_path,
extra={
"audit": True,
"event": EVENT_OPENAPI_WRONG_SURFACE_DENIED,
"subject_type": subject_type,
"attempted_path": attempted_path,
"client_id": client_id,
"token_id": token_id,
},
)
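The exporter side is outside this compare; a minimal sketch of the allowlist consumption the module docstring describes, assuming a stdlib logging.Filter (the EE exporter's actual mechanism may differ):

import logging

SHIP_EVENTS = {"app.run.openapi"}  # hypothetical ship allowlist

class AuditAllowlistFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Ship only records flagged audit=True whose event name is allowlisted.
        if not getattr(record, "audit", False):
            return False
        return getattr(record, "event", None) in SHIP_EVENTS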

View File

@ -0,0 +1,143 @@
"""Server-side JSON Schema derivation from Dify `user_input_form`."""
from __future__ import annotations
from typing import Any, cast
from controllers.service_api.app.error import AppUnavailableError
from models import App
from models.model import AppMode
JSON_SCHEMA_DRAFT = "https://json-schema.org/draft/2020-12/schema"
EMPTY_INPUT_SCHEMA: dict[str, Any] = {
"$schema": JSON_SCHEMA_DRAFT,
"type": "object",
"properties": {},
"required": [],
}
_CHAT_FAMILY = frozenset({AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT})
def _file_object_shape() -> dict[str, Any]:
"""Single-file value shape. Forward-compat placeholder; refine when file-API contract pins."""
return {
"type": "object",
"properties": {
"type": {"type": "string"},
"transfer_method": {"type": "string"},
"url": {"type": "string"},
"upload_file_id": {"type": "string"},
},
"additionalProperties": True,
}
def _row_to_schema(row_type: str, row: dict[str, Any]) -> dict[str, Any] | None:
label = row.get("label") or row.get("variable", "")
base: dict[str, Any] = {"title": label} if label else {}
if row_type in ("text-input", "paragraph"):
out = {"type": "string"} | base
max_length = row.get("max_length")
if isinstance(max_length, int) and max_length > 0:
out["maxLength"] = max_length
return out
if row_type == "select":
return {"type": "string"} | base | {"enum": list(row.get("options") or [])}
if row_type == "number":
return {"type": "number"} | base
if row_type == "file":
return _file_object_shape() | base
if row_type == "file-list":
return {
"type": "array",
"items": _file_object_shape(),
} | base
return None
def _form_to_jsonschema(form: list[dict[str, Any]]) -> tuple[dict[str, Any], list[str]]:
"""Translate a user_input_form row list into (properties, required-list).
Each row is a single-key dict: `{"text-input": {variable, label, required, ...}}`.
Unknown variable types are skipped (forward-compat).
"""
properties: dict[str, Any] = {}
required: list[str] = []
for row in form:
if not isinstance(row, dict) or len(row) != 1:
continue
((row_type, row_body),) = row.items()
if not isinstance(row_body, dict):
continue
variable = row_body.get("variable")
if not variable:
continue
schema = _row_to_schema(row_type, row_body)
if schema is None:
continue
properties[variable] = schema
if row_body.get("required"):
required.append(variable)
return properties, required
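A worked example of the translation, using a hypothetical text-input row:

row = {"text-input": {"variable": "topic", "label": "Topic", "required": True, "max_length": 48}}
props, required = _form_to_jsonschema([row])
# props    == {"topic": {"type": "string", "title": "Topic", "maxLength": 48}}
# required == ["topic"]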
def resolve_app_config(app: App) -> tuple[dict[str, Any], list[dict[str, Any]]]:
"""Resolve `(features_dict, user_input_form)` for parameters / schema derivation.
Raises `AppUnavailableError` on misconfigured apps.
"""
if app.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow = app.workflow
if workflow is None:
raise AppUnavailableError()
return (
workflow.features_dict,
cast(list[dict[str, Any]], workflow.user_input_form(to_old_structure=True)),
)
app_model_config = app.app_model_config
if app_model_config is None:
raise AppUnavailableError()
features_dict = cast(dict[str, Any], app_model_config.to_dict())
return features_dict, cast(list[dict[str, Any]], features_dict.get("user_input_form", []))
def build_input_schema(app: App) -> dict[str, Any]:
"""Derive Draft 2020-12 JSON Schema from `user_input_form` + app mode.
chat / agent-chat / advanced-chat: top-level `query` (required, minLength=1) + `inputs` object.
completion / workflow: `inputs` object only.
Raises `AppUnavailableError` on misconfigured apps.
"""
_, user_input_form = resolve_app_config(app)
inputs_props, inputs_required = _form_to_jsonschema(user_input_form)
properties: dict[str, Any] = {}
required: list[str] = []
if app.mode in _CHAT_FAMILY:
properties["query"] = {"type": "string", "minLength": 1}
required.append("query")
properties["inputs"] = {
"type": "object",
"properties": inputs_props,
"required": inputs_required,
"additionalProperties": False,
}
required.append("inputs")
return {
"$schema": JSON_SCHEMA_DRAFT,
"type": "object",
"properties": properties,
"required": required,
}
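For a chat-family app carrying only the hypothetical topic row above, the derived schema works out to (sketch):

{
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "properties": {
        "query": {"type": "string", "minLength": 1},
        "inputs": {
            "type": "object",
            "properties": {"topic": {"type": "string", "title": "Topic", "maxLength": 48}},
            "required": ["topic"],
            "additionalProperties": False,
        },
    },
    "required": ["query", "inputs"],
}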

View File

@ -0,0 +1,319 @@
"""Shared response substructures for openapi endpoints."""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field, field_validator
from libs.helper import UUIDStrOrEmpty, uuid_value
from models.model import AppMode
# Server-side cap on `limit` query param for /openapi/v1/* list endpoints.
MAX_PAGE_LIMIT = 200
class UsageInfo(BaseModel):
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
class MessageMetadata(BaseModel):
usage: UsageInfo | None = None
retriever_resources: list[dict[str, Any]] = []
class PaginationEnvelope[T](BaseModel):
"""Canonical pagination envelope for `/openapi/v1/*` list endpoints."""
page: int
limit: int
total: int
has_more: bool
data: list[T]
@classmethod
def build(cls, *, page: int, limit: int, total: int, items: list[T]) -> PaginationEnvelope[T]:
return cls(page=page, limit=limit, total=total, has_more=page * limit < total, data=items)
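For instance, under the has_more rule above, page 2 of a 45-row result at limit 20 still reports more (rows here is any hypothetical list of 20 items):

env = PaginationEnvelope.build(page=2, limit=20, total=45, items=rows)
assert env.has_more  # 2 * 20 = 40 < 45
assert env.data == rows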
class TagItem(BaseModel):
name: str
class AppListRow(BaseModel):
id: str
name: str
description: str | None = None
mode: AppMode
tags: list[TagItem] = []
updated_at: str | None = None
created_by_name: str | None = None
workspace_id: str | None = None
workspace_name: str | None = None
class AppListResponse(BaseModel):
page: int
limit: int
total: int
has_more: bool
data: list[AppListRow]
class PermittedExternalAppsListResponse(BaseModel):
page: int
limit: int
total: int
has_more: bool
data: list[AppListRow]
class AppInfoResponse(BaseModel):
id: str
name: str
description: str | None = None
mode: str
author: str | None = None
tags: list[TagItem] = []
class AppDescribeInfo(AppInfoResponse):
updated_at: str | None = None
service_api_enabled: bool
is_agent: bool = False
class AppDescribeResponse(BaseModel):
info: AppDescribeInfo | None = None
parameters: dict[str, Any] | None = None
input_schema: dict[str, Any] | None = None
class ChatMessageResponse(BaseModel):
event: str
task_id: str
id: str
message_id: str
conversation_id: str
mode: str
answer: str
metadata: MessageMetadata = Field(default_factory=MessageMetadata)
created_at: int
class CompletionMessageResponse(BaseModel):
event: str
task_id: str
id: str
message_id: str
mode: str
answer: str
metadata: MessageMetadata = Field(default_factory=MessageMetadata)
created_at: int
class WorkflowRunData(BaseModel):
id: str
workflow_id: str
status: str
outputs: dict[str, Any] = Field(default_factory=dict)
error: str | None = None
elapsed_time: float | None = None
total_tokens: int | None = None
total_steps: int | None = None
created_at: int | None = None
finished_at: int | None = None
class WorkflowRunResponse(BaseModel):
workflow_run_id: str
task_id: str
mode: Literal["workflow"] = "workflow"
data: WorkflowRunData
class AccountPayload(BaseModel):
id: str
email: str
name: str
class WorkspacePayload(BaseModel):
id: str
name: str
role: str
class AccountResponse(BaseModel):
subject_type: str
subject_email: str | None = None
subject_issuer: str | None = None
account: AccountPayload | None = None
workspaces: list[WorkspacePayload] = []
default_workspace_id: str | None = None
class SessionRow(BaseModel):
id: str
prefix: str
client_id: str
device_label: str
created_at: str | None = None
last_used_at: str | None = None
expires_at: str | None = None
class SessionListResponse(BaseModel):
page: int
limit: int
total: int
has_more: bool
data: list[SessionRow]
class RevokeResponse(BaseModel):
status: str
class WorkspaceSummaryResponse(BaseModel):
id: str
name: str
role: str
status: str
current: bool
class WorkspaceListResponse(BaseModel):
workspaces: list[WorkspaceSummaryResponse]
class WorkspaceDetailResponse(BaseModel):
id: str
name: str
role: str
status: str
current: bool
created_at: str | None = None
class DeviceCodeResponse(BaseModel):
device_code: str
user_code: str
verification_uri: str
expires_in: int
interval: int
class DeviceLookupResponse(BaseModel):
valid: bool
expires_in_remaining: int = 0
client_id: str | None = None
class DeviceMutateResponse(BaseModel):
status: str
class AppDescribeQuery(BaseModel):
"""`?fields=` allow-list for GET /apps/<id>/describe.
Empty / omitted → all blocks. Unknown member → ValidationError → 422.
"""
model_config = ConfigDict(extra="forbid")
fields: set[str] | None = None
workspace_id: str | None = None
@field_validator("workspace_id", mode="before")
@classmethod
def _validate_workspace_id(cls, v: object) -> str | None:
if v is None or v == "":
return None
if not isinstance(v, str):
raise ValueError("workspace_id must be a string")
try:
import uuid as _uuid
_uuid.UUID(v)
except ValueError:
raise ValueError("workspace_id must be a valid UUID")
return v
@field_validator("fields", mode="before")
@classmethod
def _parse_fields(cls, v: object) -> set[str] | None:
if v is None or v == "":
return None
if not isinstance(v, str):
raise ValueError("fields must be a comma-separated string")
_ALLOWED_DESCRIBE_FIELDS = frozenset({"info", "parameters", "input_schema"})
members = {m.strip() for m in v.split(",") if m.strip()}
unknown = members - _ALLOWED_DESCRIBE_FIELDS
if unknown:
raise ValueError(f"unknown field(s): {sorted(unknown)}")
return members
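Per the two validators above, parsing behaves as (sketch):

AppDescribeQuery.model_validate({})                              # fields=None -> all blocks
AppDescribeQuery.model_validate({"fields": "info,parameters"})   # fields == {"info", "parameters"}
AppDescribeQuery.model_validate({"fields": "bogus"})             # ValidationError -> 422 at the endpoint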
class AppListQuery(BaseModel):
"""mode is a closed enum."""
workspace_id: str
page: int = Field(1, ge=1)
limit: int = Field(20, ge=1, le=MAX_PAGE_LIMIT)
mode: AppMode | None = None
name: str | None = Field(None, max_length=200)
tag: str | None = Field(None, max_length=100)
class AppRunRequest(BaseModel):
inputs: dict[str, Any]
query: str | None = None
files: list[dict[str, Any]] | None = None
conversation_id: UUIDStrOrEmpty | None = None
auto_generate_name: bool = True
workflow_id: str | None = None
workspace_id: UUIDStrOrEmpty | None = None
@field_validator("conversation_id", mode="before")
@classmethod
def _normalize_conv(cls, value: str | None) -> str | None:
if isinstance(value, str):
value = value.strip()
if not value:
return None
try:
return uuid_value(value)
except ValueError as exc:
raise ValueError("conversation_id must be a valid UUID") from exc
class DeviceCodeRequest(BaseModel):
client_id: str
device_label: str
class DevicePollRequest(BaseModel):
device_code: str
client_id: str
class DeviceLookupQuery(BaseModel):
user_code: str
class DeviceMutateRequest(BaseModel):
user_code: str
class PermittedExternalAppsListQuery(BaseModel):
"""Strict (extra='forbid')."""
model_config = ConfigDict(extra="forbid")
page: int = Field(1, ge=1)
limit: int = Field(20, ge=1, le=MAX_PAGE_LIMIT)
mode: AppMode | None = None
name: str | None = Field(None, max_length=200)

View File

@ -0,0 +1,249 @@
"""User-scoped account endpoints. /account is the bearer-authed
identity read; /account/sessions and /account/sessions/<id> manage
the user's active OAuth tokens.
"""
from __future__ import annotations
from datetime import UTC, datetime
from flask import g, request
from flask_restx import Resource
from sqlalchemy import and_, select, update
from werkzeug.exceptions import BadRequest, NotFound
from controllers.openapi import openapi_ns
from controllers.openapi._models import (
MAX_PAGE_LIMIT,
AccountPayload,
AccountResponse,
PaginationEnvelope,
RevokeResponse,
SessionListResponse,
SessionRow,
WorkspacePayload,
)
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
TOKEN_CACHE_KEY_FMT,
AuthContext,
SubjectType,
validate_bearer,
)
from libs.rate_limit import (
LIMIT_ME_PER_ACCOUNT,
LIMIT_ME_PER_EMAIL,
enforce,
)
from models import Account, OAuthAccessToken, Tenant, TenantAccountJoin
@openapi_ns.route("/account")
class AccountApi(Resource):
@openapi_ns.response(200, "Account info", openapi_ns.models[AccountResponse.__name__])
@validate_bearer(accept=ACCEPT_USER_ANY)
def get(self):
ctx = g.auth_ctx
if ctx.subject_type == SubjectType.EXTERNAL_SSO:
enforce(LIMIT_ME_PER_EMAIL, key=f"subject:{ctx.subject_email}")
else:
enforce(LIMIT_ME_PER_ACCOUNT, key=f"account:{ctx.account_id}")
if ctx.subject_type == SubjectType.EXTERNAL_SSO:
return AccountResponse(
subject_type=ctx.subject_type,
subject_email=ctx.subject_email,
subject_issuer=ctx.subject_issuer,
account=None,
workspaces=[],
default_workspace_id=None,
).model_dump(mode="json")
account = (
db.session.query(Account).filter(Account.id == ctx.account_id).one_or_none() if ctx.account_id else None
)
memberships = _load_memberships(ctx.account_id) if ctx.account_id else []
default_ws_id = _pick_default_workspace(memberships)
return AccountResponse(
subject_type=ctx.subject_type,
subject_email=ctx.subject_email or (account.email if account else None),
account=_account_payload(account) if account else None,
workspaces=[_workspace_payload(m) for m in memberships],
default_workspace_id=default_ws_id,
).model_dump(mode="json")
@openapi_ns.route("/account/sessions/self")
class AccountSessionsSelfApi(Resource):
@openapi_ns.response(200, "Session revoked", openapi_ns.models[RevokeResponse.__name__])
@validate_bearer(accept=ACCEPT_USER_ANY)
def delete(self):
ctx = g.auth_ctx
_require_oauth_subject(ctx)
_revoke_token_by_id(str(ctx.token_id))
return RevokeResponse(status="revoked").model_dump(mode="json"), 200
@openapi_ns.route("/account/sessions")
class AccountSessionsApi(Resource):
@openapi_ns.response(200, "Session list", openapi_ns.models[SessionListResponse.__name__])
@validate_bearer(accept=ACCEPT_USER_ANY)
def get(self):
ctx = g.auth_ctx
now = datetime.now(UTC)
page = int(request.args.get("page", "1"))
limit = min(int(request.args.get("limit", "100")), MAX_PAGE_LIMIT)
all_rows = db.session.execute(
select(
OAuthAccessToken.id,
OAuthAccessToken.prefix,
OAuthAccessToken.client_id,
OAuthAccessToken.device_label,
OAuthAccessToken.created_at,
OAuthAccessToken.last_used_at,
OAuthAccessToken.expires_at,
)
.where(
and_(
*_subject_match(ctx),
OAuthAccessToken.revoked_at.is_(None),
OAuthAccessToken.token_hash.is_not(None),
OAuthAccessToken.expires_at > now,
)
)
.order_by(OAuthAccessToken.created_at.desc())
).all()
total = len(all_rows)
sliced = all_rows[(page - 1) * limit : page * limit]
items = [
SessionRow(
id=str(r.id),
prefix=r.prefix,
client_id=r.client_id,
device_label=r.device_label,
created_at=_iso(r.created_at),
last_used_at=_iso(r.last_used_at),
expires_at=_iso(r.expires_at),
)
for r in sliced
]
return (
PaginationEnvelope.build(page=page, limit=limit, total=total, items=items).model_dump(mode="json"),
200,
)
@openapi_ns.route("/account/sessions/<string:session_id>")
class AccountSessionByIdApi(Resource):
@openapi_ns.response(200, "Session revoked", openapi_ns.models[RevokeResponse.__name__])
@validate_bearer(accept=ACCEPT_USER_ANY)
def delete(self, session_id: str):
ctx = g.auth_ctx
_require_oauth_subject(ctx)
# Subject-match guard. 404 (not 403) on cross-subject so the
# endpoint doesn't leak token IDs that belong to other subjects.
owns = db.session.execute(
select(OAuthAccessToken.id).where(
and_(
OAuthAccessToken.id == session_id,
*_subject_match(ctx),
)
)
).first()
if owns is None:
raise NotFound("session not found")
_revoke_token_by_id(session_id)
return RevokeResponse(status="revoked").model_dump(mode="json"), 200
def _subject_match(ctx: AuthContext) -> tuple:
"""Where-clauses that scope a query to the bearer's subject. Works
for both account (account_id) and external_sso (email + issuer).
"""
if ctx.subject_type == SubjectType.ACCOUNT:
return (OAuthAccessToken.account_id == str(ctx.account_id),)
return (
OAuthAccessToken.subject_email == ctx.subject_email,
OAuthAccessToken.subject_issuer == ctx.subject_issuer,
OAuthAccessToken.account_id.is_(None),
)
def _require_oauth_subject(ctx: AuthContext) -> None:
if not ctx.source.startswith("oauth"):
raise BadRequest(
"this endpoint revokes OAuth bearer tokens; use /openapi/v1/personal-access-tokens/self for PATs"
)
def _revoke_token_by_id(token_id: str) -> None:
# Snapshot the pre-revoke token hash for cache invalidation; the
# UPDATE ... WHERE revoked_at IS NULL makes a double revoke idempotent.
row = (
db.session.query(OAuthAccessToken.token_hash)
.filter(
OAuthAccessToken.id == token_id,
OAuthAccessToken.revoked_at.is_(None),
)
.one_or_none()
)
pre_revoke_hash = row[0] if row else None
stmt = (
update(OAuthAccessToken)
.where(
OAuthAccessToken.id == token_id,
OAuthAccessToken.revoked_at.is_(None),
)
.values(revoked_at=datetime.now(UTC), token_hash=None)
)
db.session.execute(stmt)
db.session.commit()
if pre_revoke_hash:
redis_client.delete(TOKEN_CACHE_KEY_FMT.format(hash=pre_revoke_hash))
def _iso(dt: datetime | None) -> str | None:
if dt is None:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
return dt.isoformat().replace("+00:00", "Z")
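So naive timestamps are assumed UTC and serialized with a Z suffix, e.g.:

_iso(datetime(2026, 5, 15))  # "2026-05-15T00:00:00Z" (naive -> assumed UTC)
_iso(None)                   # None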
def _load_memberships(account_id):
return (
db.session.query(TenantAccountJoin, Tenant)
.join(Tenant, Tenant.id == TenantAccountJoin.tenant_id)
.filter(TenantAccountJoin.account_id == account_id)
.all()
)
def _pick_default_workspace(memberships) -> str | None:
if not memberships:
return None
for join, tenant in memberships:
if getattr(join, "current", False):
return str(tenant.id)
return str(memberships[0][1].id)
def _workspace_payload(row) -> WorkspacePayload:
join, tenant = row
return WorkspacePayload(id=str(tenant.id), name=tenant.name, role=getattr(join, "role", ""))
def _account_payload(account) -> AccountPayload:
return AccountPayload(id=str(account.id), email=account.email, name=account.name)

View File

@ -0,0 +1,165 @@
"""POST /openapi/v1/apps/<app_id>/run — mode-agnostic runner."""
from __future__ import annotations
import logging
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from typing import Any
from flask import request
from flask_restx import Resource
from pydantic import ValidationError
from werkzeug.exceptions import BadRequest, HTTPException, InternalServerError, NotFound, UnprocessableEntity
import services
from controllers.openapi import openapi_ns
from controllers.openapi._audit import emit_app_run
from controllers.openapi._models import AppRunRequest
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
from controllers.service_api.app.error import (
AppUnavailableError,
CompletionRequestError,
ConversationCompletedError,
ProviderModelCurrentlyNotSupportError,
ProviderNotInitializeError,
ProviderQuotaExceededError,
)
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import (
ModelCurrentlyNotSupportError,
ProviderTokenNotInitError,
QuotaExceededError,
)
from extensions.ext_redis import redis_client
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.oauth_bearer import Scope
from models.model import App, AppMode
from services.app_generate_service import AppGenerateService
from services.errors.app import (
IsDraftWorkflowError,
WorkflowIdFormatError,
WorkflowNotFoundError,
)
from services.errors.llm import InvokeRateLimitError
logger = logging.getLogger(__name__)
@contextmanager
def _translate_service_errors() -> Iterator[None]:
try:
yield
except WorkflowNotFoundError as ex:
raise NotFound(str(ex))
except (IsDraftWorkflowError, WorkflowIdFormatError) as ex:
raise BadRequest(str(ex))
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except services.errors.app_model_config.AppModelConfigBrokenError:
logger.exception("App model config broken.")
raise AppUnavailableError()
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except InvokeError as e:
raise CompletionRequestError(e.description)
def _generate(app: App, caller: Any, args: dict[str, Any], streaming: bool):
return AppGenerateService.generate(
app_model=app,
user=caller,
args=args,
invoke_from=InvokeFrom.OPENAPI,
streaming=streaming,
)
def _run_chat(app: App, caller: Any, payload: AppRunRequest):
if not payload.query or not payload.query.strip():
raise UnprocessableEntity("query_required_for_chat")
args = payload.model_dump(exclude_none=True)
with _translate_service_errors():
return _generate(app, caller, args, streaming=True)
def _run_completion(app: App, caller: Any, payload: AppRunRequest):
args = payload.model_dump(exclude_none=True)
args["auto_generate_name"] = False
args.setdefault("query", "")
with _translate_service_errors():
return _generate(app, caller, args, streaming=True)
def _run_workflow(app: App, caller: Any, payload: AppRunRequest):
if payload.query is not None:
raise UnprocessableEntity("query_not_supported_for_workflow")
args = payload.model_dump(exclude={"query", "conversation_id", "auto_generate_name"}, exclude_none=True)
with _translate_service_errors():
return _generate(app, caller, args, streaming=True)
_DISPATCH: dict[AppMode, Callable[[App, Any, AppRunRequest], Any]] = {
AppMode.CHAT: _run_chat,
AppMode.AGENT_CHAT: _run_chat,
AppMode.ADVANCED_CHAT: _run_chat,
AppMode.COMPLETION: _run_completion,
AppMode.WORKFLOW: _run_workflow,
}
@openapi_ns.route("/apps/<string:app_id>/run")
class AppRunApi(Resource):
@openapi_ns.expect(openapi_ns.models[AppRunRequest.__name__])
@openapi_ns.response(200, "Run result (SSE stream)")
@OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
def post(self, app_id: str, app_model: App, caller, caller_kind: str):
body = request.get_json(silent=True) or {}
try:
payload = AppRunRequest.model_validate(body)
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
handler = _DISPATCH.get(app_model.mode)
if handler is None:
raise UnprocessableEntity("mode_not_runnable")
try:
stream_obj = handler(app_model, caller, payload)
except HTTPException:
raise
except Exception:
logger.exception("internal server error.")
raise InternalServerError()
emit_app_run(
app_id=app_model.id,
tenant_id=app_model.tenant_id,
caller_kind=caller_kind,
mode=str(app_model.mode),
surface="apps",
)
return helper.compact_generate_response(stream_obj)
@openapi_ns.route("/apps/<string:app_id>/tasks/<string:task_id>/stop")
class AppRunTaskStopApi(Resource):
@openapi_ns.response(200, "Task stopped")
@OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
def post(self, app_id: str, task_id: str, app_model: App, caller, caller_kind: str):
AppQueueManager.set_stop_flag_no_user_check(task_id)
GraphEngineManager(redis_client).send_stop_command(task_id)
return {"result": "success"}

View File

@ -0,0 +1,280 @@
"""GET /openapi/v1/apps and per-app reads.
Decorator order: `method_decorators` is innermost-first. `validate_bearer`
is last → outermost → sets `g.auth_ctx` before `require_scope` reads it.
"""
from __future__ import annotations
import uuid as _uuid
from typing import Any
import sqlalchemy as sa
from flask import g, request
from flask_restx import Resource
from pydantic import ValidationError
from werkzeug.exceptions import Conflict, NotFound, UnprocessableEntity
from controllers.common.fields import Parameters
from controllers.common.schema import query_params_from_model
from controllers.openapi import openapi_ns
from controllers.openapi._input_schema import EMPTY_INPUT_SCHEMA, build_input_schema, resolve_app_config
from controllers.openapi._models import (
AppDescribeInfo,
AppDescribeQuery,
AppDescribeResponse,
AppListQuery,
AppListResponse,
AppListRow,
TagItem,
)
from controllers.openapi.auth.surface_gate import accept_subjects
from controllers.service_api.app.error import AppUnavailableError
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from extensions.ext_database import db
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
AuthContext,
Scope,
SubjectType,
require_scope,
require_workspace_member,
validate_bearer,
)
from models import App, Tenant
from services.app_service import AppListParams, AppService
from services.openapi.visibility import apply_openapi_gate, is_openapi_visible
from services.tag_service import TagService
_APPS_READ_DECORATORS = [
require_scope(Scope.APPS_READ),
accept_subjects(SubjectType.ACCOUNT),
validate_bearer(accept=ACCEPT_USER_ANY),
]
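Assuming flask-restx's usual method_decorators semantics (each list entry wraps the result of the previous, so the last entry ends up outermost), the list above is equivalent to this explicit stack:

@validate_bearer(accept=ACCEPT_USER_ANY)   # outermost: runs first, sets g.auth_ctx
@accept_subjects(SubjectType.ACCOUNT)
@require_scope(Scope.APPS_READ)            # innermost: reads g.auth_ctx
def get(self): ...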
_ALLOWED_DESCRIBE_FIELDS: frozenset[str] = frozenset({"info", "parameters", "input_schema"})
_EMPTY_PARAMETERS: dict[str, Any] = {
"opening_statement": None,
"suggested_questions": [],
"user_input_form": [],
"file_upload": None,
"system_parameters": {},
}
class AppReadResource(Resource):
"""Base for per-app read endpoints; subclasses call `_load()` for SSO/membership/exists checks."""
method_decorators = _APPS_READ_DECORATORS
def _load(self, app_id: str, workspace_id: str | None = None) -> tuple[App, AuthContext]:
ctx: AuthContext = g.auth_ctx
try:
parsed_uuid = _uuid.UUID(app_id)
is_uuid = True
except ValueError:
parsed_uuid = None
is_uuid = False
if is_uuid:
app = db.session.get(App, str(parsed_uuid)) # normalised dashed form
if not app or app.status != "normal" or not is_openapi_visible(app):
raise NotFound("app not found")
else:
if not workspace_id:
raise UnprocessableEntity("workspace_id is required for name-based lookup")
matches = list(
db.session.execute(
apply_openapi_gate(
sa.select(App).where(
App.name == app_id,
App.tenant_id == workspace_id,
App.status == "normal",
)
)
).scalars()
)
if len(matches) == 0:
raise NotFound("app not found")
if len(matches) > 1:
lines = [f"app name {app_id!r} is ambiguous — re-run with a UUID:\n\n"]
lines.append(f" {'ID':<36} {'MODE':<12} NAME\n")
for m in matches:
lines.append(f" {str(m.id):<36} {str(m.mode.value):<12} {m.name}\n")
raise Conflict("".join(lines))
app = matches[0]
require_workspace_member(ctx, str(app.tenant_id))
return app, ctx
def parameters_payload(app: App) -> dict:
"""Mirrors service_api/app/app.py::AppParameterApi response body."""
features_dict, user_input_form = resolve_app_config(app)
parameters = get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form)
return Parameters.model_validate(parameters).model_dump(mode="json")
@openapi_ns.route("/apps/<string:app_id>/describe")
class AppDescribeApi(AppReadResource):
@openapi_ns.doc(params=query_params_from_model(AppDescribeQuery))
@openapi_ns.response(200, "App description", openapi_ns.models[AppDescribeResponse.__name__])
def get(self, app_id: str):
try:
query = AppDescribeQuery.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
app, _ = self._load(app_id, workspace_id=query.workspace_id)
requested = query.fields
want_info = requested is None or "info" in requested
want_params = requested is None or "parameters" in requested
want_schema = requested is None or "input_schema" in requested
info = (
AppDescribeInfo(
id=str(app.id),
name=app.name,
mode=app.mode,
description=app.description,
tags=[TagItem(name=t.name) for t in app.tags],
author=app.author_name,
updated_at=app.updated_at.isoformat() if app.updated_at else None,
service_api_enabled=bool(app.enable_api),
is_agent=app.mode in ("agent-chat", "advanced-chat"),
)
if want_info
else None
)
parameters: dict[str, Any] | None = None
input_schema: dict[str, Any] | None = None
if want_params:
try:
parameters = parameters_payload(app)
except AppUnavailableError:
parameters = dict(_EMPTY_PARAMETERS)
if want_schema:
try:
input_schema = build_input_schema(app)
except AppUnavailableError:
input_schema = dict(EMPTY_INPUT_SCHEMA)
return (
AppDescribeResponse(
info=info,
parameters=parameters,
input_schema=input_schema,
).model_dump(mode="json", exclude_none=False),
200,
)
@openapi_ns.route("/apps")
class AppListApi(Resource):
method_decorators = _APPS_READ_DECORATORS
@openapi_ns.doc(params=query_params_from_model(AppListQuery))
@openapi_ns.response(200, "App list", openapi_ns.models[AppListResponse.__name__])
def get(self):
ctx: AuthContext = g.auth_ctx
try:
query: AppListQuery = AppListQuery.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
workspace_id = query.workspace_id
require_workspace_member(ctx, workspace_id)
empty = (
AppListResponse(page=query.page, limit=query.limit, total=0, has_more=False, data=[]).model_dump(
mode="json"
),
200,
)
if query.name:
try:
parsed_uuid = _uuid.UUID(query.name)
except ValueError:
parsed_uuid = None
else:
parsed_uuid = None
if parsed_uuid is not None:
            app: App | None = db.session.get(App, str(parsed_uuid))
if not app or app.status != "normal" or str(app.tenant_id) != workspace_id or not is_openapi_visible(app):
return empty
tenant_name = db.session.execute(
sa.select(Tenant.name).where(Tenant.id == workspace_id)
).scalar_one_or_none()
item = AppListRow(
id=str(app.id),
name=app.name,
description=app.description,
mode=app.mode,
tags=[TagItem(name=t.name) for t in app.tags],
updated_at=app.updated_at.isoformat() if app.updated_at else None,
created_by_name=getattr(app, "author_name", None),
workspace_id=str(workspace_id),
workspace_name=tenant_name,
)
env = AppListResponse(page=1, limit=1, total=1, has_more=False, data=[item])
return env.model_dump(mode="json"), 200
tag_ids: list[str] | None = None
if query.tag:
tags = TagService.get_tag_by_tag_name("app", workspace_id, query.tag)
if not tags:
return empty
tag_ids = [tag.id for tag in tags]
params = AppListParams(
page=query.page,
limit=query.limit,
mode=query.mode.value if query.mode else "all",
name=query.name,
tag_ids=tag_ids,
status="normal",
# Visibility gate pushed into the query — pagination.total stays
# consistent across pages because invisible rows never count.
openapi_visible=True,
)
pagination = AppService().get_paginate_apps(ctx.account_id, workspace_id, params)
if pagination is None:
return empty
tenant_name: str | None = None
if pagination.items:
tenant_name = db.session.execute(
sa.select(Tenant.name).where(Tenant.id == workspace_id)
).scalar_one_or_none()
items = [
AppListRow(
id=str(r.id),
name=r.name,
description=r.description,
mode=r.mode,
tags=[TagItem(name=t.name) for t in r.tags],
updated_at=r.updated_at.isoformat() if r.updated_at else None,
created_by_name=getattr(r, "author_name", None),
workspace_id=str(workspace_id),
workspace_name=tenant_name,
)
for r in pagination.items
]
env = AppListResponse(
page=query.page,
limit=query.limit,
total=int(pagination.total),
has_more=query.page * query.limit < int(pagination.total),
data=items,
)
return env.model_dump(mode="json"), 200

View File
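As a usage note for the describe endpoint above: a minimal client sketch, assuming a local deployment, a dfoa_ bearer in DIFY_OPENAPI_TOKEN, and that `fields` accepts a comma-separated list (the flat query parsing suggests so, but that part is an assumption). The host, workspace id, and app name are placeholders, not part of this change:

import os

import requests

BASE = "http://localhost:5001/openapi/v1"
HEADERS = {"Authorization": f"Bearer {os.environ['DIFY_OPENAPI_TOKEN']}"}

# Name-based lookup requires workspace_id; `fields` trims the payload.
resp = requests.get(
    f"{BASE}/apps/my-app/describe",
    params={"workspace_id": "WORKSPACE_UUID", "fields": "info,input_schema"},
    headers=HEADERS,
)
if resp.status_code == 409:
    # Ambiguous name: the body lists id / mode / name rows; retry with the UUID.
    print(resp.text)
else:
    resp.raise_for_status()
    print(resp.json()["info"]["mode"])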

@ -0,0 +1,107 @@
"""GET /openapi/v1/permitted-external-apps — external-subject app discovery (EE only).
`dfoe_` (External SSO) callers reach apps gated by ACL access-mode
(public / sso_verified). License-gated: CE deploys never enable the
EE blueprint chain so this module is unreachable there.
"""
from __future__ import annotations
import sqlalchemy as sa
from flask import request
from flask_restx import Resource
from pydantic import ValidationError
from werkzeug.exceptions import UnprocessableEntity
from controllers.openapi import openapi_ns
from controllers.openapi._models import (
AppListRow,
PermittedExternalAppsListQuery,
PermittedExternalAppsListResponse,
)
from controllers.openapi.auth.surface_gate import accept_subjects
from extensions.ext_database import db
from libs.device_flow_security import enterprise_only
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
Scope,
SubjectType,
require_scope,
validate_bearer,
)
from models import App, Tenant
from services.enterprise.app_permitted_service import list_permitted_apps
from services.openapi.license_gate import license_required
from services.openapi.visibility import apply_openapi_gate
@openapi_ns.route("/permitted-external-apps")
class PermittedExternalAppsListApi(Resource):
method_decorators = [
require_scope(Scope.APPS_READ_PERMITTED_EXTERNAL),
license_required,
accept_subjects(SubjectType.EXTERNAL_SSO),
validate_bearer(accept=ACCEPT_USER_ANY),
enterprise_only,
]
@openapi_ns.response(
200, "Permitted external apps list", openapi_ns.models[PermittedExternalAppsListResponse.__name__]
)
def get(self):
try:
query = PermittedExternalAppsListQuery.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise UnprocessableEntity(exc.json())
page_result = list_permitted_apps(
page=query.page,
limit=query.limit,
mode=query.mode.value if query.mode else None,
name=query.name,
)
if not page_result.app_ids:
env = PermittedExternalAppsListResponse(
page=query.page, limit=query.limit, total=page_result.total, has_more=False, data=[]
)
return env.model_dump(mode="json"), 200
apps_by_id: dict[str, App] = {
str(a.id): a
for a in db.session.execute(apply_openapi_gate(sa.select(App).where(App.id.in_(page_result.app_ids))))
.scalars()
.all()
}
tenant_ids = list({a.tenant_id for a in apps_by_id.values()})
tenants_by_id = {
str(t.id): t for t in db.session.execute(sa.select(Tenant).where(Tenant.id.in_(tenant_ids))).scalars().all()
}
items: list[AppListRow] = []
for app_id in page_result.app_ids:
app = apps_by_id.get(app_id)
if not app or app.status != "normal":
continue
tenant = tenants_by_id.get(str(app.tenant_id))
items.append(
AppListRow(
id=str(app.id),
name=app.name,
description=app.description,
mode=app.mode,
tags=[], # tenant-scoped; not surfaced cross-tenant
updated_at=app.updated_at.isoformat() if app.updated_at else None,
created_by_name=None, # cross-tenant author leak prevention
workspace_id=str(app.tenant_id),
workspace_name=tenant.name if tenant else None,
)
)
env = PermittedExternalAppsListResponse(
page=query.page,
limit=query.limit,
total=page_result.total,
has_more=query.page * query.limit < page_result.total,
data=items,
)
return env.model_dump(mode="json"), 200

View File
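The page/limit/has_more envelope this handler returns supports a plain pagination loop on the client. A sketch, assuming only a base URL and a dfoe_ bearer (both placeholders):

import requests

def iter_permitted_apps(base: str, token: str, limit: int = 20):
    """Yield AppListRow dicts page by page until has_more turns False."""
    page = 1
    while True:
        resp = requests.get(
            f"{base}/openapi/v1/permitted-external-apps",
            params={"page": page, "limit": limit},
            headers={"Authorization": f"Bearer {token}"},
        )
        resp.raise_for_status()
        body = resp.json()
        yield from body["data"]
        if not body["has_more"]:
            return
        page += 1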

@ -0,0 +1,3 @@
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
__all__ = ["OAUTH_BEARER_PIPELINE"]

View File

@ -0,0 +1,46 @@
"""`OAUTH_BEARER_PIPELINE` — the auth scheme for openapi `/run` endpoints.
Endpoints attach via `@OAUTH_BEARER_PIPELINE.guard(scope=…)`. No alternative
paths. Read endpoints (`/apps`, `/info`, `/parameters`, `/describe`) skip
the pipeline and use `validate_bearer + require_scope + require_workspace_member`
inline — they don't need `AppAuthzCheck`/`CallerMount`.
"""
from __future__ import annotations
from controllers.openapi.auth.pipeline import Pipeline
from controllers.openapi.auth.steps import (
AppAuthzCheck,
AppResolver,
BearerCheck,
CallerMount,
ScopeCheck,
SurfaceCheck,
WorkspaceMembershipCheck,
)
from controllers.openapi.auth.strategies import (
AccountMounter,
AclStrategy,
AppAuthzStrategy,
EndUserMounter,
MembershipStrategy,
)
from libs.oauth_bearer import SubjectType
from services.feature_service import FeatureService
def _resolve_app_authz_strategy() -> AppAuthzStrategy:
if FeatureService.get_system_features().webapp_auth.enabled:
return AclStrategy()
return MembershipStrategy()
OAUTH_BEARER_PIPELINE = Pipeline(
BearerCheck(),
SurfaceCheck(accepted=frozenset({SubjectType.ACCOUNT})),
ScopeCheck(),
AppResolver(),
WorkspaceMembershipCheck(),
AppAuthzCheck(_resolve_app_authz_strategy),
CallerMount(AccountMounter(), EndUserMounter()),
)

View File
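To make the single attachment point concrete: a hypothetical endpoint guarded by the pipeline. Only `OAUTH_BEARER_PIPELINE.guard` and the three injected kwargs come from this change; the route path and body are illustrative.

from flask_restx import Resource

from controllers.openapi import openapi_ns
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
from libs.oauth_bearer import Scope
from models.model import App


@openapi_ns.route("/apps/<string:app_id>/example")
class ExampleRunApi(Resource):
    @OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
    def post(self, app_id: str, app_model: App, caller, caller_kind: str):
        # By here all seven steps have passed: bearer, surface, scope,
        # app resolution, workspace membership, app authz, caller mount.
        return {"app_id": str(app_model.id), "caller_kind": caller_kind}, 200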

@ -0,0 +1,46 @@
"""Mutable per-request context for the openapi auth pipeline.
Every field starts None / empty and is filled in by a step. The pipeline
is the only thing that should construct or mutate Context — handlers
read populated values via the decorator's kwargs unpacking.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Literal, Protocol
from flask import Request
from libs.oauth_bearer import Scope, SubjectType
if TYPE_CHECKING:
from models import App, Tenant
@dataclass
class Context:
request: Request
required_scope: Scope
subject_type: SubjectType | None = None
subject_email: str | None = None
subject_issuer: str | None = None
account_id: uuid.UUID | None = None
scopes: frozenset[Scope] = field(default_factory=frozenset)
token_id: uuid.UUID | None = None
token_hash: str | None = None
cached_verified_tenants: dict[str, bool] | None = None
source: str | None = None
expires_at: datetime | None = None
app: App | None = None
tenant: Tenant | None = None
caller: object | None = None
caller_kind: Literal["account", "end_user"] | None = None
class Step(Protocol):
"""One responsibility. Mutate ctx or raise to short-circuit."""
def __call__(self, ctx: Context) -> None: ...

View File

@ -0,0 +1,41 @@
"""Pipeline IS the auth scheme.
`Pipeline.guard(scope=…)` is the only attachment point for endpoints —
that is the design lock-in: forgetting an auth layer is structurally
impossible because there is no "sometimes wrap, sometimes don't" choice.
"""
from __future__ import annotations
from functools import wraps
from flask import request
from controllers.openapi.auth.context import Context, Step
from libs.oauth_bearer import Scope
class Pipeline:
def __init__(self, *steps: Step) -> None:
self._steps = steps
def run(self, ctx: Context) -> None:
for step in self._steps:
step(ctx)
def guard(self, *, scope: Scope):
def decorator(view):
@wraps(view)
def decorated(*args, **kwargs):
ctx = Context(request=request, required_scope=scope)
self.run(ctx)
kwargs.update(
app_model=ctx.app,
caller=ctx.caller,
caller_kind=ctx.caller_kind,
)
return view(*args, **kwargs)
return decorated
return decorator

View File
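A toy run of the class above, showing the two behaviours that matter: steps mutate the shared Context in order, and a raise short-circuits the rest. StampStep and DenyStep are invented for the demo; only Pipeline and Context are from this change.

from werkzeug.exceptions import Unauthorized

from controllers.openapi.auth.context import Context
from controllers.openapi.auth.pipeline import Pipeline
from libs.oauth_bearer import Scope


class StampStep:
    """Populates a field, the way BearerCheck populates identity."""

    def __call__(self, ctx: Context) -> None:
        ctx.source = "demo"


class DenyStep:
    """Raises to short-circuit: later steps never run; Flask maps this to 401."""

    def __call__(self, ctx: Context) -> None:
        raise Unauthorized("demo_denied")


pipeline = Pipeline(StampStep(), DenyStep())
# Inside a request context:
#   ctx = Context(request=request, required_scope=Scope.APPS_RUN)
#   pipeline.run(ctx)  # raises Unauthorized, with ctx.source already set to "demo"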

@ -0,0 +1,172 @@
"""Pipeline steps. Each is one responsibility.
`BearerCheck` is the only step that touches the token registry; downstream
steps see only the populated `Context`. `BearerCheck` also assigns
``g.auth_ctx`` (the same way ``validate_bearer`` does) so the surface gate
+ any handler reading the request-scoped context have a single source of
truth across both auth-attach paths.
"""
from __future__ import annotations
from collections.abc import Callable
from flask import g
from werkzeug.exceptions import BadRequest, Forbidden, NotFound, Unauthorized
from configs import dify_config
from controllers.openapi.auth.context import Context
from controllers.openapi.auth.strategies import AppAuthzStrategy, CallerMounter
from controllers.openapi.auth.surface_gate import check_surface
from extensions.ext_database import db
from libs.oauth_bearer import (
AuthContext,
InvalidBearerError,
Scope,
SubjectType,
_extract_bearer, # type: ignore[attr-defined]
check_workspace_membership,
get_authenticator,
)
from models import App, Tenant, TenantStatus
class BearerCheck:
"""Resolve bearer → populate identity fields. Rate-limit is enforced
inside `BearerAuthenticator.authenticate`, so no separate step here.
Also attaches the resolved `AuthContext` to ``g.auth_ctx`` — same shape
the decorator-level ``validate_bearer`` writes — so the surface gate
+ downstream readers don't see two different identity sources."""
def __call__(self, ctx: Context) -> None:
token = _extract_bearer(ctx.request)
if not token:
raise Unauthorized("bearer required")
try:
authn = get_authenticator().authenticate(token)
except InvalidBearerError as e:
raise Unauthorized(str(e))
ctx.subject_type = authn.subject_type
ctx.subject_email = authn.subject_email
ctx.subject_issuer = authn.subject_issuer
ctx.account_id = authn.account_id
ctx.scopes = frozenset(authn.scopes)
ctx.source = authn.source
ctx.token_id = authn.token_id
ctx.expires_at = authn.expires_at
ctx.token_hash = authn.token_hash
ctx.cached_verified_tenants = dict(authn.verified_tenants)
# Single source of truth for the request-scoped identity. Surface
# gate + handlers read `g.auth_ctx` regardless of whether the route
# ran the decorator path (`validate_bearer`) or the pipeline path.
g.auth_ctx = authn
class ScopeCheck:
"""Verify ctx.scopes (already populated by BearerCheck) covers required."""
def __call__(self, ctx: Context) -> None:
if Scope.FULL in ctx.scopes or ctx.required_scope in ctx.scopes:
return
raise Forbidden("insufficient_scope")
class SurfaceCheck:
"""Reject the request if `g.auth_ctx.subject_type` is not in `accepted`.
Delegates to `surface_gate.check_surface` so the inline decorator and
the pipeline step emit identical audit events. Relies on `BearerCheck`
(above) having set `g.auth_ctx`.
"""
def __init__(self, *, accepted: frozenset[SubjectType]) -> None:
self._accepted = accepted
def __call__(self, ctx: Context) -> None:
check_surface(self._accepted)
class AppResolver:
"""Read app_id from request.view_args, populate ctx.app + ctx.tenant.
Every endpoint using the OAuth bearer pipeline must declare
``<string:app_id>`` in its route — that is the design lock-in (no body /
header coupling).
"""
def __call__(self, ctx: Context) -> None:
app_id = (ctx.request.view_args or {}).get("app_id")
if not app_id:
raise BadRequest("app_id is required in path")
app = db.session.get(App, app_id)
if not app or app.status != "normal":
raise NotFound("app not found")
if not app.enable_api:
raise Forbidden("service_api_disabled")
tenant = db.session.get(Tenant, app.tenant_id)
if tenant is None or tenant.status == TenantStatus.ARCHIVE:
raise Forbidden("workspace unavailable")
ctx.app, ctx.tenant = app, tenant
class WorkspaceMembershipCheck:
"""Layer 0 — workspace membership gate.
CE-only (skipped when ENTERPRISE_ENABLED). Account-subject bearers
(dfoa_) only — SSO subjects skip.
"""
def __call__(self, ctx: Context) -> None:
if dify_config.ENTERPRISE_ENABLED:
return
if ctx.subject_type != SubjectType.ACCOUNT:
return
if ctx.account_id is None or ctx.tenant is None:
raise Unauthorized("account_id or tenant unset — BearerCheck or AppResolver did not run")
if ctx.token_hash is None:
raise Unauthorized("token_hash unset — BearerCheck did not run")
check_workspace_membership(
account_id=ctx.account_id,
tenant_id=ctx.tenant.id,
token_hash=ctx.token_hash,
cached_verdicts=ctx.cached_verified_tenants or {},
)
class AppAuthzCheck:
def __init__(self, resolve_strategy: Callable[[], AppAuthzStrategy]) -> None:
self._resolve = resolve_strategy
def __call__(self, ctx: Context) -> None:
if not self._resolve().authorize(ctx):
raise Forbidden("subject_no_app_access")
class CallerMount:
def __init__(self, *mounters: CallerMounter) -> None:
self._mounters = mounters
def __call__(self, ctx: Context) -> None:
if ctx.subject_type is None:
raise Unauthorized("subject_type unset — BearerCheck did not run")
for m in self._mounters:
if m.applies_to(ctx.subject_type):
m.mount(ctx)
return
raise Unauthorized("no caller mounter for subject type")
__all__ = [
"AppAuthzCheck",
"AppResolver",
"AuthContext",
"BearerCheck",
"CallerMount",
"ScopeCheck",
"SurfaceCheck",
"WorkspaceMembershipCheck",
]

View File
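The "did not run" guards above turn step order into an explicit contract, which shows up as soon as a step is invoked out of order. A sketch, assuming a CE config (ENTERPRISE_ENABLED false) so WorkspaceMembershipCheck does not early-return; the hand-built Context stands in for what BearerCheck and AppResolver would normally populate.

import uuid

from werkzeug.exceptions import Unauthorized

from controllers.openapi.auth.context import Context
from controllers.openapi.auth.steps import WorkspaceMembershipCheck
from libs.oauth_bearer import Scope, SubjectType

ctx = Context(request=None, required_scope=Scope.APPS_RUN)  # type: ignore[arg-type]
ctx.subject_type = SubjectType.ACCOUNT
ctx.account_id = uuid.uuid4()

try:
    WorkspaceMembershipCheck()(ctx)  # ctx.tenant is still None: AppResolver never ran
except Unauthorized as exc:
    print(exc.description)  # "account_id or tenant unset — BearerCheck or AppResolver did not run"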

@ -0,0 +1,184 @@
"""Strategy classes for the openapi auth pipeline.
App authorization (Acl/Membership) and caller mounting (Account/EndUser)
vary along independent axes; each strategy is one class so the pipeline
composition stays a flat list.
"""
from __future__ import annotations
import uuid
from typing import Protocol
from flask import current_app
from flask_login import user_logged_in
from sqlalchemy import select
from controllers.openapi.auth.context import Context
from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from libs.oauth_bearer import SubjectType
from models import Account, TenantAccountJoin
from services.end_user_service import EndUserService
from services.enterprise.enterprise_service import (
EnterpriseService,
WebAppAccessMode,
)
class AppAuthzStrategy(Protocol):
def authorize(self, ctx: Context) -> bool: ...
class AclStrategy:
"""Per-app ACL, evaluated in two stages.
The EE gateway has already enforced tenancy and workspace membership
by the time this strategy runs, so AclStrategy only owns per-app ACL:
1. Subject vs access-mode compatibility (pure rule table). External-SSO
bearers belong to public-facing apps only; account bearers cover the
full set. A mismatch is an immediate deny — no IO.
2. For modes that pair with the subject, decide whether the inner
permission API must run. Only `PRIVATE` (per-app selected-user list)
requires it; the remaining modes are pass-through.
"""
_ALLOWED_MODES_BY_SUBJECT: dict[SubjectType, frozenset[WebAppAccessMode]] = {
SubjectType.ACCOUNT: frozenset(
{
WebAppAccessMode.PUBLIC,
WebAppAccessMode.SSO_VERIFIED,
WebAppAccessMode.PRIVATE_ALL,
WebAppAccessMode.PRIVATE,
}
),
SubjectType.EXTERNAL_SSO: frozenset(
{
WebAppAccessMode.PUBLIC,
WebAppAccessMode.SSO_VERIFIED,
}
),
}
_MODES_REQUIRING_INNER_CHECK: frozenset[WebAppAccessMode] = frozenset({WebAppAccessMode.PRIVATE})
def authorize(self, ctx: Context) -> bool:
if ctx.app is None:
return False
access_mode = self._fetch_access_mode(ctx.app.id)
if access_mode is None:
return False
if not self._subject_allowed_for_mode(ctx.subject_type, access_mode):
return False
if access_mode not in self._MODES_REQUIRING_INNER_CHECK:
return True
return self._inner_permission_check(ctx)
@staticmethod
def _fetch_access_mode(app_id: str) -> WebAppAccessMode | None:
settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=app_id)
if settings is None:
return None
try:
return WebAppAccessMode(settings.access_mode)
except ValueError:
return None
@classmethod
def _subject_allowed_for_mode(cls, subject_type: SubjectType, access_mode: WebAppAccessMode) -> bool:
return access_mode in cls._ALLOWED_MODES_BY_SUBJECT.get(subject_type, frozenset())
def _inner_permission_check(self, ctx: Context) -> bool:
if ctx.app is None:
return False
user_id = self._resolve_user_id(ctx)
if user_id is None:
return False
return EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(
user_id=user_id,
app_id=ctx.app.id,
)
@staticmethod
def _resolve_user_id(ctx: Context) -> str | None:
if ctx.subject_type == SubjectType.ACCOUNT:
return str(ctx.account_id) if ctx.account_id is not None else None
if ctx.subject_email is None:
return None
account = db.session.execute(
select(Account).where(Account.email == ctx.subject_email),
).scalar_one_or_none()
return str(account.id) if account is not None else None
class MembershipStrategy:
"""Tenant-membership fallback.
Used when webapp-auth is disabled (CE deployment). Account-bearing
subjects pass if they have a TenantAccountJoin row; EXTERNAL_SSO is
denied (it requires the webapp-auth surface).
"""
def authorize(self, ctx: Context) -> bool:
if ctx.subject_type == SubjectType.EXTERNAL_SSO:
return False
if ctx.tenant is None:
return False
return _has_tenant_membership(ctx.account_id, ctx.tenant.id)
def _has_tenant_membership(account_id: uuid.UUID | str | None, tenant_id: str) -> bool:
if not account_id:
return False
row = db.session.execute(
select(TenantAccountJoin.id).where(
TenantAccountJoin.tenant_id == tenant_id,
TenantAccountJoin.account_id == account_id,
)
).scalar_one_or_none()
return row is not None
def _login_as(user) -> None:
"""Set Flask-Login request user so downstream services see the caller."""
current_app.login_manager._update_request_context_with_user(user)
user_logged_in.send(current_app._get_current_object(), user=user)
class CallerMounter(Protocol):
def applies_to(self, subject_type: SubjectType) -> bool: ...
def mount(self, ctx: Context) -> None: ...
class AccountMounter:
def applies_to(self, subject_type: SubjectType) -> bool:
return subject_type == SubjectType.ACCOUNT
def mount(self, ctx: Context) -> None:
if ctx.account_id is None:
raise RuntimeError("AccountMounter: account_id unset — BearerCheck did not run")
account = db.session.get(Account, ctx.account_id)
if account is None:
raise RuntimeError("AccountMounter: account row missing for resolved bearer")
account.current_tenant = ctx.tenant
_login_as(account)
ctx.caller, ctx.caller_kind = account, "account"
class EndUserMounter:
def applies_to(self, subject_type: SubjectType) -> bool:
return subject_type == SubjectType.EXTERNAL_SSO
def mount(self, ctx: Context) -> None:
if ctx.tenant is None or ctx.app is None or ctx.subject_email is None:
raise RuntimeError("EndUserMounter: tenant/app/subject_email unset — earlier steps did not run")
end_user = EndUserService.get_or_create_end_user_by_type(
InvokeFrom.OPENAPI,
tenant_id=ctx.tenant.id,
app_id=ctx.app.id,
user_id=ctx.subject_email,
)
_login_as(end_user)
ctx.caller, ctx.caller_kind = end_user, "end_user"

View File
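Stage 1 of AclStrategy is a pure table lookup with no IO, so it can be sanity-checked in isolation. This sketch reaches into a private classmethod purely for illustration:

from controllers.openapi.auth.strategies import AclStrategy
from libs.oauth_bearer import SubjectType
from services.enterprise.enterprise_service import WebAppAccessMode

# External SSO bearers pair only with the public-facing modes...
assert AclStrategy._subject_allowed_for_mode(SubjectType.EXTERNAL_SSO, WebAppAccessMode.PUBLIC)
assert not AclStrategy._subject_allowed_for_mode(SubjectType.EXTERNAL_SSO, WebAppAccessMode.PRIVATE)
# ...while account bearers cover the full set, including per-user PRIVATE.
assert AclStrategy._subject_allowed_for_mode(SubjectType.ACCOUNT, WebAppAccessMode.PRIVATE)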

@ -0,0 +1,89 @@
"""Surface gate.
`@accept_subjects(...)` is the route-level form. `SurfaceCheck` (pipeline
step) is the pipeline-level form. Both delegate to `check_surface` so the
audit emit + canonical-path message are single-sourced.
Subjects come from `libs.oauth_bearer.SubjectType` directly — no parallel
vocabulary. Caller hits the wrong surface → 403 ``wrong_surface`` + audit
``openapi.wrong_surface_denied``.
"""
from __future__ import annotations
from collections.abc import Callable
from functools import wraps
from typing import TypeVar
from flask import g, request
from werkzeug.exceptions import Forbidden
from controllers.openapi._audit import emit_wrong_surface
from libs.oauth_bearer import SubjectType
_CANONICAL_PATH: dict[SubjectType, str] = {
SubjectType.ACCOUNT: "/openapi/v1/apps",
SubjectType.EXTERNAL_SSO: "/openapi/v1/permitted-external-apps",
}
F = TypeVar("F", bound=Callable[..., object])
def check_surface(accepted: frozenset[SubjectType]) -> None:
"""Enforce that ``g.auth_ctx.subject_type`` is in ``accepted``.
Raises ``Forbidden`` with ``wrong_surface`` + canonical-path hint on
miss; emits ``openapi.wrong_surface_denied`` audit. If ``g.auth_ctx``
is missing the bearer layer didn't run — that's a wiring bug, not a
user-driven failure, so surface it as a ``RuntimeError`` instead of
a silent 403.
"""
ctx = getattr(g, "auth_ctx", None)
if ctx is None:
raise RuntimeError(
"check_surface called without g.auth_ctx; stack validate_bearer or BearerCheck above the surface gate"
)
subject = _coerce_subject_type(getattr(ctx, "subject_type", None))
if subject in accepted:
return
canonical = _CANONICAL_PATH.get(subject, "/openapi/v1/") if subject else "/openapi/v1/"
emit_wrong_surface(
subject_type=subject.value if subject else None,
attempted_path=request.path,
client_id=getattr(ctx, "client_id", None),
token_id=_stringify(getattr(ctx, "token_id", None)),
)
raise Forbidden(description=f"wrong_surface (canonical: {canonical})")
def accept_subjects(*accepted: SubjectType) -> Callable[[F], F]:
accepted_set: frozenset[SubjectType] = frozenset(accepted)
def deco(fn: F) -> F:
@wraps(fn)
def wrapper(*args: object, **kwargs: object) -> object:
check_surface(accepted_set)
return fn(*args, **kwargs)
return wrapper # type: ignore[return-value]
return deco
def _coerce_subject_type(raw: object) -> SubjectType | None:
if raw is None:
return None
if isinstance(raw, SubjectType):
return raw
try:
return SubjectType(raw)
except ValueError:
return None
def _stringify(value: object) -> str | None:
if value is None:
return None
return str(value)

View File
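One subtlety worth spelling out for the decorator form: stacking order carries the dependency. The top decorator runs first at request time, so `validate_bearer` must sit above `accept_subjects` for `g.auth_ctx` to exist when `check_surface` reads it; the reverse order trips the RuntimeError wiring check instead of a 403. A sketch on a hypothetical read endpoint:

from flask_restx import Resource

from controllers.openapi import openapi_ns
from controllers.openapi.auth.surface_gate import accept_subjects
from libs.oauth_bearer import ACCEPT_USER_ANY, SubjectType, validate_bearer


@openapi_ns.route("/example")
class ExampleApi(Resource):
    # Top wrapper executes first: bearer validation populates g.auth_ctx,
    # then the surface gate checks the subject type against the accepted set.
    @validate_bearer(accept=ACCEPT_USER_ANY)
    @accept_subjects(SubjectType.ACCOUNT)
    def get(self):
        return {"ok": True}, 200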

@ -0,0 +1,107 @@
"""
OpenAPI bearer-authed human input form endpoints.
GET /apps/<app_id>/form/human_input/<form_token> — fetch paused form definition
POST /apps/<app_id>/form/human_input/<form_token> — submit form response
"""
from __future__ import annotations
import json
import logging
from flask import Response, request
from flask_restx import Resource
from werkzeug.exceptions import BadRequest, NotFound
from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values
from controllers.common.schema import register_schema_models
from controllers.openapi import openapi_ns
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface
from extensions.ext_database import db
from libs.helper import to_timestamp
from libs.oauth_bearer import Scope
from models.model import App
from services.human_input_service import FormNotFoundError, HumanInputService
logger = logging.getLogger(__name__)
register_schema_models(openapi_ns, HumanInputFormSubmitPayload)
def _jsonify_form_definition(form) -> Response:
definition_payload = form.get_definition().model_dump()
payload = {
"form_content": definition_payload["rendered_content"],
"inputs": definition_payload["inputs"],
"resolved_default_values": stringify_form_default_values(definition_payload["default_values"]),
"user_actions": definition_payload["user_actions"],
"expiration_time": to_timestamp(form.expiration_time),
}
return Response(json.dumps(payload, ensure_ascii=False), mimetype="application/json")
def _ensure_form_belongs_to_app(form, app_model: App) -> None:
if form.app_id != app_model.id or form.tenant_id != app_model.tenant_id:
raise NotFound("Form not found")
def _ensure_form_is_allowed_for_openapi(form) -> None:
if not is_recipient_type_allowed_for_surface(form.recipient_type, HumanInputSurface.OPENAPI):
raise NotFound("Form not found")
@openapi_ns.route("/apps/<string:app_id>/form/human_input/<string:form_token>")
class OpenApiWorkflowHumanInputFormApi(Resource):
@openapi_ns.response(200, "Form definition")
@OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
def get(self, app_id: str, form_token: str, app_model: App, caller, caller_kind: str):
service = HumanInputService(db.engine)
form = service.get_form_by_token(form_token)
if form is None:
raise NotFound("Form not found")
_ensure_form_belongs_to_app(form, app_model)
_ensure_form_is_allowed_for_openapi(form)
service.ensure_form_active(form)
return _jsonify_form_definition(form)
@openapi_ns.expect(openapi_ns.models[HumanInputFormSubmitPayload.__name__])
@openapi_ns.response(200, "Form submitted")
@OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
def post(self, app_id: str, form_token: str, app_model: App, caller, caller_kind: str):
payload = HumanInputFormSubmitPayload.model_validate(request.get_json(silent=True) or {})
service = HumanInputService(db.engine)
form = service.get_form_by_token(form_token)
if form is None:
raise NotFound("Form not found")
_ensure_form_belongs_to_app(form, app_model)
_ensure_form_is_allowed_for_openapi(form)
submission_user_id: str | None = None
submission_end_user_id: str | None = None
if caller_kind == "account":
submission_user_id = caller.id
else:
submission_end_user_id = caller.id
if form.recipient_type is None:
logger.warning("Recipient type is None for form, form_token=%s", form_token)
raise BadRequest("Form recipient type is invalid")
try:
service.submit_form_by_token(
recipient_type=form.recipient_type,
form_token=form_token,
selected_action_id=payload.action,
form_data=payload.inputs,
submission_user_id=submission_user_id,
submission_end_user_id=submission_end_user_id,
)
except FormNotFoundError:
raise NotFound("Form not found")
return {}, 200

View File
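Taken together, the GET and POST above form the pause/resume handshake. A client-side sketch; the host, token, ids, and the example input key are placeholders, while `action` and `inputs` are the payload fields the handler validates:

import requests

BASE = "http://localhost:5001/openapi/v1"
HEADERS = {"Authorization": "Bearer dfoa_..."}
FORM_URL = f"{BASE}/apps/APP_ID/form/human_input/FORM_TOKEN"

# 1. Fetch the paused form definition the run emitted.
form = requests.get(FORM_URL, headers=HEADERS)
form.raise_for_status()
print(form.json()["user_actions"])  # pick an action id from here

# 2. Submit the chosen action plus the filled-in inputs; the run resumes.
submit = requests.post(
    FORM_URL,
    headers=HEADERS,
    json={"action": "ACTION_ID", "inputs": {"comment": "approved"}},
)
submit.raise_for_status()  # {} on success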

@ -0,0 +1,9 @@
from flask_restx import Resource
from controllers.openapi import openapi_ns
@openapi_ns.route("/_health")
class HealthApi(Resource):
def get(self):
return {"ok": True}

View File

@ -0,0 +1,404 @@
"""Device-flow endpoints under /openapi/v1/oauth/device/*. Two
sub-groups in one module:
Protocol (RFC 8628, public + rate-limited):
POST /oauth/device/code
POST /oauth/device/token
GET /oauth/device/lookup
Approval (account branch, console-cookie authed):
POST /oauth/device/approve
POST /oauth/device/deny
SSO branch lives in oauth_device_sso.py.
"""
from __future__ import annotations
import logging
from flask import request
from flask_login import login_required
from flask_restx import Resource
from pydantic import BaseModel, ValidationError
from werkzeug.exceptions import BadRequest
from configs import dify_config
from controllers.common.schema import query_params_from_model
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.openapi import openapi_ns
from controllers.openapi._models import (
AccountPayload,
DeviceCodeRequest,
DeviceCodeResponse,
DeviceLookupQuery,
DeviceLookupResponse,
DeviceMutateRequest,
DeviceMutateResponse,
DevicePollRequest,
WorkspacePayload,
)
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.helper import extract_remote_ip
from libs.login import current_account_with_tenant
from libs.oauth_bearer import MINTABLE_PROFILES, SubjectType, bearer_feature_required
from libs.rate_limit import (
LIMIT_APPROVE_CONSOLE,
LIMIT_DEVICE_CODE_PER_IP,
LIMIT_LOOKUP_PUBLIC,
rate_limit,
)
from services.oauth_device_flow import (
ACCOUNT_ISSUER_SENTINEL,
DEFAULT_POLL_INTERVAL_SECONDS,
DEVICE_FLOW_TTL_SECONDS,
DeviceFlowRedis,
DeviceFlowStatus,
InvalidTransitionError,
SlowDownDecision,
StateNotFoundError,
mint_oauth_token,
oauth_ttl_days,
)
from services.openapi.mint_policy import MintPolicyViolation, validate_mint_policy
logger = logging.getLogger(__name__)
# =========================================================================
# Validation helpers
# =========================================================================
def _validate_json[M: BaseModel](model: type[M]) -> M:
body = request.get_json(silent=True) or {}
try:
return model.model_validate(body)
except ValidationError as exc:
raise BadRequest(str(exc))
def _validate_query[M: BaseModel](model: type[M]) -> M:
try:
return model.model_validate(request.args.to_dict(flat=True))
except ValidationError as exc:
raise BadRequest(str(exc))
# =========================================================================
# Protocol endpoints — RFC 8628 (public + per-IP rate limit)
# =========================================================================
@openapi_ns.route("/oauth/device/code")
class OAuthDeviceCodeApi(Resource):
@openapi_ns.expect(openapi_ns.models[DeviceCodeRequest.__name__])
@openapi_ns.response(200, "Device code created", openapi_ns.models[DeviceCodeResponse.__name__])
@rate_limit(LIMIT_DEVICE_CODE_PER_IP)
def post(self):
payload = _validate_json(DeviceCodeRequest)
client_id = payload.client_id
device_label = payload.device_label
if client_id not in dify_config.OPENAPI_KNOWN_CLIENT_IDS:
return {"error": "unsupported_client"}, 400
store = DeviceFlowRedis(redis_client)
ip = extract_remote_ip(request)
device_code, user_code, expires_in = store.start(client_id, device_label, created_ip=ip)
return {
"device_code": device_code,
"user_code": user_code,
"verification_uri": _verification_uri(),
"expires_in": expires_in,
"interval": DEFAULT_POLL_INTERVAL_SECONDS,
}, 200
@openapi_ns.route("/oauth/device/token")
class OAuthDeviceTokenApi(Resource):
"""RFC 8628 poll."""
@openapi_ns.expect(openapi_ns.models[DevicePollRequest.__name__])
def post(self):
payload = _validate_json(DevicePollRequest)
device_code = payload.device_code
store = DeviceFlowRedis(redis_client)
# slow_down beats every other branch — polling-too-fast clients
# see only that response regardless of underlying state.
if store.record_poll(device_code, DEFAULT_POLL_INTERVAL_SECONDS) is SlowDownDecision.SLOW_DOWN:
return {"error": "slow_down"}, 400
state = store.load_by_device_code(device_code)
if state is None:
return {"error": "expired_token"}, 400
if state.status is DeviceFlowStatus.PENDING:
return {"error": "authorization_pending"}, 400
terminal = store.consume_on_poll(device_code)
if terminal is None:
return {"error": "expired_token"}, 400
if terminal.status is DeviceFlowStatus.DENIED:
return {"error": "access_denied"}, 400
poll_payload = terminal.poll_payload or {}
if "token" not in poll_payload:
logger.error("device_flow: approved state missing poll_payload for %s", device_code)
return {"error": "expired_token"}, 400
_audit_cross_ip_if_needed(state)
return poll_payload, 200
@openapi_ns.route("/oauth/device/lookup")
class OAuthDeviceLookupApi(Resource):
"""Read-only — public for pre-validate before login. user_code is
high-entropy + short-TTL; per-IP rate limit blocks enumeration.
"""
@openapi_ns.doc(params=query_params_from_model(DeviceLookupQuery))
@openapi_ns.response(200, "Device lookup result", openapi_ns.models[DeviceLookupResponse.__name__])
@rate_limit(LIMIT_LOOKUP_PUBLIC)
def get(self):
payload = _validate_query(DeviceLookupQuery)
user_code = payload.user_code.strip().upper()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
return {"valid": False, "expires_in_remaining": 0, "client_id": None}, 200
_device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
return {"valid": False, "expires_in_remaining": 0, "client_id": state.client_id}, 200
return {
"valid": True,
"expires_in_remaining": DEVICE_FLOW_TTL_SECONDS,
"client_id": state.client_id,
}, 200
# =========================================================================
# Approval endpoints — account branch (cookie-authed)
# =========================================================================
_APPROVE_GUARD_KEY_FMT = "device_code:{code}:approving"
_APPROVE_GUARD_TTL_SECONDS = 10
@openapi_ns.route("/oauth/device/approve")
class DeviceApproveApi(Resource):
@openapi_ns.expect(openapi_ns.models[DeviceMutateRequest.__name__])
@openapi_ns.response(200, "Approved", openapi_ns.models[DeviceMutateResponse.__name__])
@setup_required
@login_required
@account_initialization_required
@bearer_feature_required
@rate_limit(LIMIT_APPROVE_CONSOLE)
def post(self):
payload = _validate_json(DeviceMutateRequest)
user_code = payload.user_code.strip().upper()
account, tenant = current_account_with_tenant()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
return {"error": "expired_or_unknown"}, 404
device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
return {"error": "already_resolved"}, 409
# SET NX guard — without it, two in-flight approves both pass
# PENDING, both mint, and the second upsert silently rotates the
# first caller into an already-revoked token.
guard_key = _APPROVE_GUARD_KEY_FMT.format(code=device_code)
if not redis_client.set(guard_key, "1", nx=True, ex=_APPROVE_GUARD_TTL_SECONDS):
return {"error": "approve_in_progress"}, 409
try:
profile = MINTABLE_PROFILES[SubjectType.ACCOUNT]
try:
validate_mint_policy(
subject_type=profile.subject_type,
prefix=profile.prefix,
scopes=profile.scopes,
)
except MintPolicyViolation as e:
raise BadRequest(description=str(e)) from None
ttl_days = oauth_ttl_days(tenant_id=tenant)
mint = mint_oauth_token(
db.session,
redis_client,
subject_email=account.email,
subject_issuer=ACCOUNT_ISSUER_SENTINEL,
account_id=str(account.id),
client_id=state.client_id,
device_label=state.device_label,
prefix=profile.prefix,
ttl_days=ttl_days,
)
poll_payload = _build_account_poll_payload(account, tenant, mint)
try:
store.approve(
device_code,
subject_email=account.email,
account_id=str(account.id),
subject_issuer=ACCOUNT_ISSUER_SENTINEL,
minted_token=mint.token,
token_id=str(mint.token_id),
poll_payload=poll_payload,
)
except (StateNotFoundError, InvalidTransitionError):
# Row minted but state vanished — roll forward; the orphan
# token is revocable via auth devices list / Authorized Apps.
logger.exception("device_flow: approve raced on %s", device_code)
return {"error": "state_lost"}, 409
finally:
redis_client.delete(guard_key)
_emit_approve_audit(state, account, tenant, mint)
return {"status": "approved"}, 200
@openapi_ns.route("/oauth/device/deny")
class DeviceDenyApi(Resource):
@openapi_ns.expect(openapi_ns.models[DeviceMutateRequest.__name__])
@openapi_ns.response(200, "Denied", openapi_ns.models[DeviceMutateResponse.__name__])
@setup_required
@login_required
@account_initialization_required
@bearer_feature_required
@rate_limit(LIMIT_APPROVE_CONSOLE)
def post(self):
payload = _validate_json(DeviceMutateRequest)
user_code = payload.user_code.strip().upper()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
return {"error": "expired_or_unknown"}, 404
device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
return {"error": "already_resolved"}, 409
try:
store.deny(device_code)
except (StateNotFoundError, InvalidTransitionError):
logger.exception("device_flow: deny raced on %s", device_code)
return {"error": "state_lost"}, 409
_emit_deny_audit(state)
return {"status": "denied"}, 200
# =========================================================================
# Helpers
# =========================================================================
def _verification_uri() -> str:
base = getattr(dify_config, "CONSOLE_WEB_URL", None)
if base:
return f"{base.rstrip('/')}/device"
return f"{request.host_url.rstrip('/')}/device"
def _audit_cross_ip_if_needed(state) -> None:
poll_ip = extract_remote_ip(request)
if state.created_ip and poll_ip and poll_ip != state.created_ip:
logger.warning(
"audit: oauth.device_code_cross_ip_poll token_id=%s creation_ip=%s poll_ip=%s",
state.token_id,
state.created_ip,
poll_ip,
extra={
"audit": True,
"token_id": state.token_id,
"creation_ip": state.created_ip,
"poll_ip": poll_ip,
},
)
def _build_account_poll_payload(account, tenant, mint) -> dict:
"""Pre-render the poll-response body so the unauthenticated poll
handler doesn't re-query accounts/tenants for authz data.
"""
from models import Tenant, TenantAccountJoin
rows = (
db.session.query(Tenant, TenantAccountJoin)
.join(TenantAccountJoin, TenantAccountJoin.tenant_id == Tenant.id)
.filter(TenantAccountJoin.account_id == account.id)
.all()
)
workspaces = [WorkspacePayload(id=str(t.id), name=t.name, role=getattr(m, "role", "")) for t, m in rows]
# Prefer active session tenant → DB-flagged current join → first membership.
default_ws_id = None
if tenant and any(w.id == str(tenant) for w in workspaces):
default_ws_id = str(tenant)
if default_ws_id is None:
for _t, m in rows:
if getattr(m, "current", False):
default_ws_id = str(m.tenant_id)
break
if default_ws_id is None and workspaces:
default_ws_id = workspaces[0].id
return {
"token": mint.token,
"expires_at": mint.expires_at.isoformat(),
"subject_type": SubjectType.ACCOUNT,
"account": AccountPayload(id=str(account.id), email=account.email, name=account.name).model_dump(mode="json"),
"workspaces": [w.model_dump(mode="json") for w in workspaces],
"default_workspace_id": default_ws_id,
"token_id": str(mint.token_id),
}
def _emit_approve_audit(state, account, tenant, mint) -> None:
logger.warning(
"audit: oauth.device_flow_approved token_id=%s subject=%s client_id=%s device_label=%s rotated=? expires_at=%s",
mint.token_id,
account.email,
state.client_id,
state.device_label,
mint.expires_at,
extra={
"audit": True,
"event": "oauth.device_flow_approved",
"token_id": str(mint.token_id),
"subject_type": SubjectType.ACCOUNT,
"subject_email": account.email,
"account_id": str(account.id),
"tenant_id": tenant,
"client_id": state.client_id,
"device_label": state.device_label,
"scopes": ["full"],
"expires_at": mint.expires_at.isoformat(),
},
)
def _emit_deny_audit(state) -> None:
logger.warning(
"audit: oauth.device_flow_denied client_id=%s device_label=%s",
state.client_id,
state.device_label,
extra={
"audit": True,
"event": "oauth.device_flow_denied",
"client_id": state.client_id,
"device_label": state.device_label,
},
)

View File
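Client-side, the three protocol endpoints compose into the standard RFC 8628 loop. A sketch; the base URL, client_id, and device_label are placeholders, and the error strings are the ones the handlers above return:

import time

import requests

BASE = "http://localhost:5001/openapi/v1"

start = requests.post(
    f"{BASE}/oauth/device/code",
    json={"client_id": "CLIENT_ID", "device_label": "my-laptop"},
).json()
print("visit", start["verification_uri"], "and enter", start["user_code"])

interval = start["interval"]
while True:
    time.sleep(interval)
    poll = requests.post(f"{BASE}/oauth/device/token", json={"device_code": start["device_code"]})
    body = poll.json()
    if poll.ok:
        token = body["token"]  # plus account / workspaces / default_workspace_id
        break
    if body["error"] == "slow_down":
        interval += 5  # RFC 8628 back-off
    elif body["error"] != "authorization_pending":
        raise SystemExit(body["error"])  # expired_token / access_denied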

@ -0,0 +1,369 @@
"""SSO-branch device-flow endpoints under /openapi/v1/oauth/device/*.
EE-only. Browser flow:
GET /oauth/device/sso-initiate → 302 to IdP authorize URL
GET /oauth/device/sso-complete → ACS callback, sets approval-grant cookie
GET /oauth/device/approval-context → SPA reads cookie claims (idempotent)
POST /oauth/device/approve-external → mints dfoe_ token + clears cookie
Function-based (raw @bp.route) rather than Resource classes because the
handlers do redirects + cookie kwargs that don't fit the Resource shape.
"""
from __future__ import annotations
import logging
import secrets
from dataclasses import dataclass
from flask import jsonify, make_response, redirect, request
from sqlalchemy import func, select
from werkzeug.exceptions import (
BadGateway,
BadRequest,
Conflict,
Forbidden,
NotFound,
Unauthorized,
)
from controllers.openapi import bp
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs import jws
from libs.device_flow_security import (
APPROVAL_GRANT_COOKIE_NAME,
ApprovalGrantClaims,
approval_grant_cleared_cookie_kwargs,
approval_grant_cookie_kwargs,
consume_approval_grant_nonce,
consume_sso_assertion_nonce,
enterprise_only,
mint_approval_grant,
verify_approval_grant,
)
from libs.oauth_bearer import MINTABLE_PROFILES, SubjectType
from libs.rate_limit import (
LIMIT_APPROVE_EXT_PER_EMAIL,
LIMIT_SSO_INITIATE_PER_IP,
enforce,
rate_limit,
)
from models import Account
from models.account import AccountStatus
from services.enterprise.enterprise_service import EnterpriseService
from services.oauth_device_flow import (
DeviceFlowRedis,
DeviceFlowStatus,
InvalidTransitionError,
StateNotFoundError,
mint_oauth_token,
oauth_ttl_days,
)
from services.openapi.mint_policy import MintPolicyViolation, validate_mint_policy
logger = logging.getLogger(__name__)
# Matches DEVICE_FLOW_TTL_SECONDS so the signed state can't outlive the
# device_code it references.
STATE_ENVELOPE_TTL_SECONDS = 15 * 60
# Canonical sso-complete path. IdP-side ACS callback URL must point here.
_SSO_COMPLETE_PATH = "/openapi/v1/oauth/device/sso-complete"
@bp.route("/oauth/device/sso-initiate", methods=["GET"])
@enterprise_only
@rate_limit(LIMIT_SSO_INITIATE_PER_IP)
def sso_initiate():
user_code = (request.args.get("user_code") or "").strip().upper()
if not user_code:
raise BadRequest("user_code required")
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
raise BadRequest("invalid_user_code")
_, state = found
if state.status is not DeviceFlowStatus.PENDING:
raise BadRequest("invalid_user_code")
keyset = jws.KeySet.from_shared_secret()
signed_state = jws.sign(
keyset,
payload={
"redirect_url": "",
"app_code": "",
"intent": "device_flow",
"user_code": user_code,
"nonce": secrets.token_urlsafe(16),
"return_to": "",
"idp_callback_url": f"{request.host_url.rstrip('/')}{_SSO_COMPLETE_PATH}",
},
aud=jws.AUD_STATE_ENVELOPE,
ttl_seconds=STATE_ENVELOPE_TTL_SECONDS,
)
try:
reply = EnterpriseService.initiate_device_flow_sso(signed_state)
except Exception as e:
logger.warning("sso-initiate: enterprise call failed: %s", e)
raise BadGateway("sso_initiate_failed") from e
url = (reply or {}).get("url")
if not url:
raise BadGateway("sso_initiate_missing_url")
# Clear stale approval-grant — defends against cross-tab/back-button mixing.
resp = redirect(url, code=302)
resp.set_cookie(**approval_grant_cleared_cookie_kwargs())
return resp
@bp.route("/oauth/device/sso-complete", methods=["GET"])
@enterprise_only
def sso_complete():
blob = request.args.get("sso_assertion")
if not blob:
raise BadRequest("sso_assertion required")
keyset = jws.KeySet.from_shared_secret()
try:
claims = jws.verify(keyset, blob, expected_aud=jws.AUD_EXT_SUBJECT_ASSERTION)
except jws.VerifyError as e:
logger.warning("sso-complete: rejected assertion: %s", e)
raise BadRequest("invalid_sso_assertion") from e
if not consume_sso_assertion_nonce(redis_client, claims.get("nonce", "")):
raise BadRequest("invalid_sso_assertion")
user_code = (claims.get("user_code") or "").strip().upper()
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(user_code)
if found is None:
raise Conflict("user_code_not_pending")
_, state = found
if state.status is not DeviceFlowStatus.PENDING:
raise Conflict("user_code_not_pending")
if _email_belongs_to_dify_account(claims["email"]):
_emit_external_rejection_audit(
state,
_RejectedClaims(subject_email=claims["email"], subject_issuer=claims["issuer"]),
reason="email_belongs_to_dify_account",
)
return redirect("/device?sso_error=email_belongs_to_dify_account", code=302)
iss = request.host_url.rstrip("/")
cookie_value, _ = mint_approval_grant(
keyset=keyset,
iss=iss,
subject_email=claims["email"],
subject_issuer=claims["issuer"],
user_code=user_code,
)
resp = redirect("/device?sso_verified=1", code=302)
resp.set_cookie(**approval_grant_cookie_kwargs(cookie_value))
return resp
@bp.route("/oauth/device/approval-context", methods=["GET"])
@enterprise_only
def approval_context():
token = request.cookies.get(APPROVAL_GRANT_COOKIE_NAME)
if not token:
raise Unauthorized("no_session")
keyset = jws.KeySet.from_shared_secret()
try:
claims = verify_approval_grant(keyset, token)
except jws.VerifyError as e:
logger.warning("approval-context: bad cookie: %s", e)
raise Unauthorized("no_session") from e
return jsonify(
{
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"user_code": claims.user_code,
"csrf_token": claims.csrf_token,
"expires_at": claims.expires_at.isoformat(),
}
), 200
@bp.route("/oauth/device/approve-external", methods=["POST"])
@enterprise_only
def approve_external():
token = request.cookies.get(APPROVAL_GRANT_COOKIE_NAME)
if not token:
raise Unauthorized("invalid_session")
keyset = jws.KeySet.from_shared_secret()
try:
claims: ApprovalGrantClaims = verify_approval_grant(keyset, token)
except jws.VerifyError as e:
logger.warning("approve-external: bad cookie: %s", e)
raise Unauthorized("invalid_session") from e
enforce(LIMIT_APPROVE_EXT_PER_EMAIL, key=f"subject:{claims.subject_email}")
csrf_header = request.headers.get("X-CSRF-Token", "")
if not csrf_header or csrf_header != claims.csrf_token:
raise Forbidden("csrf_mismatch")
data = request.get_json(silent=True) or {}
body_user_code = (data.get("user_code") or "").strip().upper()
if body_user_code != claims.user_code:
raise BadRequest("user_code_mismatch")
store = DeviceFlowRedis(redis_client)
found = store.load_by_user_code(claims.user_code)
if found is None:
raise NotFound("user_code_not_pending")
device_code, state = found
if state.status is not DeviceFlowStatus.PENDING:
raise Conflict("user_code_not_pending")
if _email_belongs_to_dify_account(claims.subject_email):
_emit_external_rejection_audit(state, claims, reason="email_belongs_to_dify_account")
raise Forbidden("email_belongs_to_dify_account")
if not consume_approval_grant_nonce(redis_client, claims.nonce):
raise Unauthorized("session_already_consumed")
profile = MINTABLE_PROFILES[SubjectType.EXTERNAL_SSO]
try:
validate_mint_policy(
subject_type=profile.subject_type,
prefix=profile.prefix,
scopes=profile.scopes,
)
except MintPolicyViolation as e:
raise BadRequest(description=str(e)) from None
ttl_days = oauth_ttl_days(tenant_id=None)
mint = mint_oauth_token(
db.session,
redis_client,
subject_email=claims.subject_email,
subject_issuer=claims.subject_issuer,
account_id=None,
client_id=state.client_id,
device_label=state.device_label,
prefix=profile.prefix,
ttl_days=ttl_days,
)
poll_payload = {
"token": mint.token,
"expires_at": mint.expires_at.isoformat(),
"subject_type": SubjectType.EXTERNAL_SSO,
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"account": None,
"workspaces": [],
"default_workspace_id": None,
"token_id": str(mint.token_id),
}
try:
store.approve(
device_code,
subject_email=claims.subject_email,
account_id=None,
subject_issuer=claims.subject_issuer,
minted_token=mint.token,
token_id=str(mint.token_id),
poll_payload=poll_payload,
)
except (StateNotFoundError, InvalidTransitionError) as e:
logger.exception("approve-external: state transition raced")
raise Conflict("state_lost") from e
_emit_approve_external_audit(state, claims, mint)
resp = make_response(jsonify({"status": "approved"}), 200)
resp.set_cookie(**approval_grant_cleared_cookie_kwargs())
return resp
@dataclass(frozen=True)
class _RejectedClaims:
"""Minimal subject shape consumed by `_emit_external_rejection_audit`.
Mirrors the attributes used from `ApprovalGrantClaims` so callers holding
only a raw JWS claims dict (e.g. `sso_complete`) can emit the same audit
event without reaching for the full dataclass.
"""
subject_email: str
subject_issuer: str
def _email_belongs_to_dify_account(email: str) -> bool:
"""External SSO subjects whose email matches an active Dify Account must
authenticate via the internal Dify login path (which mints dfoa_), not via
the external SSO device flow. Returning True here blocks dfoe_ minting.
Pending/uninitialized/banned/closed accounts do not block: pending and
uninitialized users may complete invitation via SSO; banned and closed
accounts are handled by separate enforcement paths.
"""
if not email:
return False
normalized = email.strip().lower()
if not normalized:
return False
row = db.session.execute(
select(Account.id).where(
func.lower(Account.email) == normalized,
Account.status == AccountStatus.ACTIVE,
),
).scalar_one_or_none()
return row is not None
def _emit_external_rejection_audit(state, claims, *, reason: str) -> None:
logger.warning(
"audit: oauth.device_flow_rejected subject_type=%s subject_email=%s subject_issuer=%s reason=%s",
SubjectType.EXTERNAL_SSO,
claims.subject_email,
claims.subject_issuer,
reason,
extra={
"audit": True,
"event": "oauth.device_flow_rejected",
"subject_type": SubjectType.EXTERNAL_SSO,
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"reason": reason,
"client_id": state.client_id,
"device_label": state.device_label,
},
)
def _emit_approve_external_audit(state, claims, mint) -> None:
logger.warning(
"audit: oauth.device_flow_approved subject_type=%s subject_email=%s subject_issuer=%s token_id=%s",
SubjectType.EXTERNAL_SSO,
claims.subject_email,
claims.subject_issuer,
mint.token_id,
extra={
"audit": True,
"event": "oauth.device_flow_approved",
"subject_type": SubjectType.EXTERNAL_SSO,
"subject_email": claims.subject_email,
"subject_issuer": claims.subject_issuer,
"token_id": str(mint.token_id),
"client_id": state.client_id,
"device_label": state.device_label,
"scopes": ["apps:run"],
"expires_at": mint.expires_at.isoformat(),
},
)

View File
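From the SPA's side, the approve-external call pairs the approval-grant cookie with the CSRF claim read back from approval-context. A sketch; the host is a placeholder and the session is assumed to already hold the cookie set by sso-complete:

import requests

BASE = "http://localhost:5001/openapi/v1"
s = requests.Session()  # carries the approval-grant cookie from sso-complete

ctx = s.get(f"{BASE}/oauth/device/approval-context").json()
resp = s.post(
    f"{BASE}/oauth/device/approve-external",
    headers={"X-CSRF-Token": ctx["csrf_token"]},  # must echo the cookie's CSRF claim
    json={"user_code": ctx["user_code"]},  # must match the cookie's user_code claim
)
resp.raise_for_status()  # {"status": "approved"}; the server clears the cookie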

@ -0,0 +1,119 @@
"""
OpenAPI bearer-authed workflow reconnect event stream endpoint.
GET /apps/<app_id>/tasks/<task_id>/events
— reconnect to the SSE stream for a paused/running workflow run.
`task_id` is treated as `workflow_run_id`.
"""
from __future__ import annotations
import json
from collections.abc import Generator
from flask import Response, request
from flask_restx import Resource
from sqlalchemy.orm import sessionmaker
from werkzeug.exceptions import NotFound, UnprocessableEntity
from controllers.openapi import openapi_ns
from controllers.openapi.auth.composition import OAUTH_BEARER_PIPELINE
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.message_generator import MessageGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.task_entities import StreamEvent
from core.workflow.human_input_policy import HumanInputSurface
from extensions.ext_database import db
from libs.oauth_bearer import Scope
from models.enums import CreatorUserRole
from models.model import App, AppMode
from repositories.factory import DifyAPIRepositoryFactory
from services.workflow_event_snapshot_service import build_workflow_event_stream
@openapi_ns.route("/apps/<string:app_id>/tasks/<string:task_id>/events")
class OpenApiWorkflowEventsApi(Resource):
@openapi_ns.response(200, "SSE event stream")
@OAUTH_BEARER_PIPELINE.guard(scope=Scope.APPS_RUN)
def get(self, app_id: str, task_id: str, app_model: App, caller, caller_kind: str):
app_mode = AppMode.value_of(app_model.mode)
if app_mode not in {AppMode.WORKFLOW, AppMode.ADVANCED_CHAT}:
raise UnprocessableEntity("mode_not_supported_for_event_reconnect")
session_maker = sessionmaker(db.engine)
repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
workflow_run = repo.get_workflow_run_by_id_and_tenant_id(
tenant_id=app_model.tenant_id,
run_id=task_id,
)
if workflow_run is None:
raise NotFound("Workflow run not found")
if workflow_run.app_id != app_model.id:
raise NotFound("Workflow run not found")
if caller_kind == "account":
if workflow_run.created_by_role != CreatorUserRole.ACCOUNT or workflow_run.created_by != caller.id:
raise NotFound("Workflow run not found")
else:
if workflow_run.created_by_role != CreatorUserRole.END_USER or workflow_run.created_by != caller.id:
raise NotFound("Workflow run not found")
workflow_run_entity = workflow_run
if workflow_run_entity.finished_at is not None:
response = WorkflowResponseConverter.workflow_run_result_to_finish_response(
task_id=workflow_run_entity.id,
workflow_run=workflow_run_entity,
creator_user=caller,
)
payload = response.model_dump(mode="json")
payload["event"] = response.event.value
def _generate_finished_events() -> Generator[str, None, None]:
yield f"data: {json.dumps(payload)}\n\n"
event_generator = _generate_finished_events
else:
msg_generator = MessageGenerator()
generator: BaseAppGenerator
if app_mode == AppMode.ADVANCED_CHAT:
generator = AdvancedChatAppGenerator()
else:
generator = WorkflowAppGenerator()
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
continue_on_pause = request.args.get("continue_on_pause", "false").lower() == "true"
terminal_events: list[StreamEvent] | None = [] if continue_on_pause else None
def _generate_stream_events():
if include_state_snapshot:
return generator.convert_to_event_stream(
build_workflow_event_stream(
app_mode=app_mode,
workflow_run=workflow_run_entity,
tenant_id=app_model.tenant_id,
app_id=app_model.id,
session_maker=session_maker,
human_input_surface=HumanInputSurface.OPENAPI,
close_on_pause=not continue_on_pause,
)
)
return generator.convert_to_event_stream(
msg_generator.retrieve_events(
app_mode,
workflow_run_entity.id,
terminal_events=terminal_events,
),
)
event_generator = _generate_stream_events
return Response(
event_generator(),
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)

View File
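A reconnecting client consumes this as a plain SSE GET. A sketch using requests' streaming mode; the ids and token are placeholders, and the two query flags are the ones the handler parses:

import requests

resp = requests.get(
    "http://localhost:5001/openapi/v1/apps/APP_ID/tasks/TASK_ID/events",
    headers={"Authorization": "Bearer dfoa_..."},
    params={"include_state_snapshot": "true", "continue_on_pause": "false"},
    stream=True,
)
resp.raise_for_status()
for line in resp.iter_lines(decode_unicode=True):
    if line and line.startswith("data: "):
        print(line[len("data: "):])  # one JSON event per SSE data frame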

@ -0,0 +1,90 @@
"""User-scoped workspace reads under /openapi/v1/workspaces. Bearer-authed
counterparts to the cookie-authed /console/api/workspaces endpoints.
Account bearers (dfoa_) see every tenant they're a member of. External
SSO bearers (dfoe_) are stopped by the ``accept_subjects`` surface gate
with a 403 ``wrong_surface``; these reads are account-only.
"""
from __future__ import annotations
from itertools import starmap
from flask import g
from flask_restx import Resource
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from controllers.openapi import openapi_ns
from controllers.openapi._models import WorkspaceDetailResponse, WorkspaceListResponse, WorkspaceSummaryResponse
from controllers.openapi.auth.surface_gate import accept_subjects
from extensions.ext_database import db
from libs.oauth_bearer import (
ACCEPT_USER_ANY,
SubjectType,
validate_bearer,
)
from models import Tenant, TenantAccountJoin
@openapi_ns.route("/workspaces")
class WorkspacesApi(Resource):
@openapi_ns.response(200, "Workspace list", openapi_ns.models[WorkspaceListResponse.__name__])
@validate_bearer(accept=ACCEPT_USER_ANY)
@accept_subjects(SubjectType.ACCOUNT)
def get(self):
ctx = g.auth_ctx
rows = db.session.execute(
select(Tenant, TenantAccountJoin)
.join(TenantAccountJoin, TenantAccountJoin.tenant_id == Tenant.id)
.where(TenantAccountJoin.account_id == str(ctx.account_id))
.order_by(Tenant.created_at.asc())
).all()
return WorkspaceListResponse(workspaces=list(starmap(_workspace_summary, rows))).model_dump(mode="json"), 200
@openapi_ns.route("/workspaces/<string:workspace_id>")
class WorkspaceByIdApi(Resource):
@openapi_ns.response(200, "Workspace detail", openapi_ns.models[WorkspaceDetailResponse.__name__])
@validate_bearer(accept=ACCEPT_USER_ANY)
@accept_subjects(SubjectType.ACCOUNT)
def get(self, workspace_id: str):
ctx = g.auth_ctx
row = db.session.execute(
select(Tenant, TenantAccountJoin)
.join(TenantAccountJoin, TenantAccountJoin.tenant_id == Tenant.id)
.where(
Tenant.id == workspace_id,
TenantAccountJoin.account_id == str(ctx.account_id),
)
).first()
# 404 (not 403) on non-member so workspace IDs don't leak across tenants.
if row is None:
raise NotFound("workspace not found")
tenant, membership = row
return _workspace_detail(tenant, membership).model_dump(mode="json"), 200
def _workspace_summary(tenant: Tenant, membership: TenantAccountJoin) -> WorkspaceSummaryResponse:
return WorkspaceSummaryResponse(
id=str(tenant.id),
name=tenant.name,
role=getattr(membership, "role", ""),
status=tenant.status,
current=getattr(membership, "current", False),
)
def _workspace_detail(tenant: Tenant, membership: TenantAccountJoin) -> WorkspaceDetailResponse:
return WorkspaceDetailResponse(
id=str(tenant.id),
name=tenant.name,
role=getattr(membership, "role", ""),
status=tenant.status,
current=getattr(membership, "current", False),
created_at=tenant.created_at.isoformat() if tenant.created_at else None,
)
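
Note: a minimal usage sketch for these two routes; the host and bearer token are placeholders, and the response shapes follow the models used above:

# List workspaces, then fetch the first one by id.
import requests

BASE = "http://localhost:5001/openapi/v1"  # placeholder host
HEADERS = {"Authorization": "Bearer dfoa_..."}  # placeholder account bearer

workspaces = requests.get(f"{BASE}/workspaces", headers=HEADERS).json()["workspaces"]
if workspaces:
    detail = requests.get(f"{BASE}/workspaces/{workspaces[0]['id']}", headers=HEADERS)
    # Non-members receive 404 rather than 403, so ids do not leak across tenants.
    print(detail.status_code, detail.json())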

View File

@ -22,7 +22,7 @@ from fields.conversation_fields import (
SimpleConversation,
)
from graphon.variables.types import SegmentType
from libs.helper import UUIDStrOrEmpty
from libs.helper import UUIDStrOrEmpty, to_timestamp
from models.model import App, AppMode, EndUser
from services.conversation_service import ConversationService
@ -115,9 +115,7 @@ class ConversationVariableResponse(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
return to_timestamp(value)
class ConversationVariableInfiniteScrollPaginationResponse(ResponseModel):

View File

@ -7,18 +7,18 @@ paused human input forms in workflow/chatflow runs.
import json
import logging
from datetime import datetime
from flask import Response
from flask_restx import Resource
from werkzeug.exceptions import BadRequest, NotFound
from controllers.common.human_input import HumanInputFormSubmitPayload
from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values
from controllers.common.schema import register_schema_models
from controllers.service_api import service_api_ns
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
from core.workflow.human_input_policy import HumanInputSurface, is_recipient_type_allowed_for_surface
from extensions.ext_database import db
from libs.helper import to_timestamp
from models.model import App, EndUser
from services.human_input_service import Form, FormNotFoundError, HumanInputService
@ -28,30 +28,14 @@ logger = logging.getLogger(__name__)
register_schema_models(service_api_ns, HumanInputFormSubmitPayload)
def _stringify_default_values(values: dict[str, object]) -> dict[str, str]:
result: dict[str, str] = {}
for key, value in values.items():
if value is None:
result[key] = ""
elif isinstance(value, (dict, list)):
result[key] = json.dumps(value, ensure_ascii=False)
else:
result[key] = str(value)
return result
def _to_timestamp(value: datetime) -> int:
return int(value.timestamp())
def _jsonify_form_definition(form: Form) -> Response:
definition_payload = form.get_definition().model_dump()
payload = {
"form_content": definition_payload["rendered_content"],
"inputs": definition_payload["inputs"],
"resolved_default_values": _stringify_default_values(definition_payload["default_values"]),
"resolved_default_values": stringify_form_default_values(definition_payload["default_values"]),
"user_actions": definition_payload["user_actions"],
"expiration_time": _to_timestamp(form.expiration_time),
"expiration_time": to_timestamp(form.expiration_time),
}
return Response(json.dumps(payload, ensure_ascii=False), mimetype="application/json")

View File

@ -39,6 +39,7 @@ from graphon.enums import WorkflowExecutionStatus
from graphon.graph_engine.manager import GraphEngineManager
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import to_timestamp
from models.model import App, AppMode, EndUser
from models.workflow import WorkflowRun
from repositories.factory import DifyAPIRepositoryFactory
@ -68,12 +69,6 @@ class WorkflowLogQuery(BaseModel):
register_schema_models(service_api_ns, WorkflowRunPayload, WorkflowLogQuery)
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
def _enum_value(value):
return getattr(value, "value", value)
@ -109,7 +104,7 @@ class WorkflowRunResponse(ResponseModel):
@field_validator("created_at", "finished_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class WorkflowRunForLogResponse(ResponseModel):
@ -133,7 +128,7 @@ class WorkflowRunForLogResponse(ResponseModel):
@field_validator("created_at", "finished_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class WorkflowAppLogPartialResponse(ResponseModel):
@ -154,7 +149,7 @@ class WorkflowAppLogPartialResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class WorkflowAppLogPaginationResponse(ResponseModel):
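
Note: the shared `libs.helper.to_timestamp` itself is not part of this diff; judging by the module-local helpers it replaces here and in the other field modules below, its shape is presumably close to this sketch:

# Presumed shape, inferred from the removed local helpers; not the actual source.
from datetime import datetime

def to_timestamp(value: datetime | int | None) -> int | None:
    if isinstance(value, datetime):
        return int(value.timestamp())
    return value  # already an epoch int, or None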

View File

@ -4,7 +4,6 @@ Web App Human Input Form APIs.
import json
import logging
from datetime import datetime
from typing import Any, NotRequired, TypedDict
from flask import Response, request
@ -13,12 +12,12 @@ from sqlalchemy import select
from werkzeug.exceptions import Forbidden
from configs import dify_config
from controllers.common.human_input import HumanInputFormSubmitPayload
from controllers.common.human_input import HumanInputFormSubmitPayload, stringify_form_default_values
from controllers.web import web_ns
from controllers.web.error import NotFoundError, WebFormRateLimitExceededError
from controllers.web.site import serialize_app_site_payload
from extensions.ext_database import db
from libs.helper import RateLimiter, extract_remote_ip
from libs.helper import RateLimiter, extract_remote_ip, to_timestamp
from models.account import TenantStatus
from models.model import App, Site
from services.human_input_service import Form, FormNotFoundError, HumanInputService
@ -38,22 +37,6 @@ _FORM_ACCESS_RATE_LIMITER = RateLimiter(
)
def _stringify_default_values(values: dict[str, object]) -> dict[str, str]:
result: dict[str, str] = {}
for key, value in values.items():
if value is None:
result[key] = ""
elif isinstance(value, (dict, list)):
result[key] = json.dumps(value, ensure_ascii=False)
else:
result[key] = str(value)
return result
def _to_timestamp(value: datetime) -> int:
return int(value.timestamp())
class FormDefinitionPayload(TypedDict):
form_content: Any
inputs: Any
@ -69,9 +52,9 @@ def _jsonify_form_definition(form: Form, site_payload: dict | None = None) -> Re
payload: FormDefinitionPayload = {
"form_content": definition_payload["rendered_content"],
"inputs": definition_payload["inputs"],
"resolved_default_values": _stringify_default_values(definition_payload["default_values"]),
"resolved_default_values": stringify_form_default_values(definition_payload["default_values"]),
"user_actions": definition_payload["user_actions"],
"expiration_time": _to_timestamp(form.expiration_time),
"expiration_time": to_timestamp(form.expiration_time),
}
if site_payload is not None:
payload["site"] = site_payload

View File

@ -16,7 +16,7 @@ from libs.passport import PassportService
from libs.token import extract_webapp_passport
from models.model import App, EndUser, Site
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService, WebAppSettings
from services.enterprise.enterprise_service import EnterpriseService, WebAppAccessMode, WebAppSettings
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
@ -74,7 +74,7 @@ def decode_jwt_token(app_code: str | None = None, user_id: str | None = None) ->
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
if not webapp_settings:
raise NotFound("Web app settings not found.")
app_web_auth_enabled = webapp_settings.access_mode != "public"
app_web_auth_enabled = webapp_settings.access_mode != WebAppAccessMode.PUBLIC
_validate_webapp_token(decoded, app_web_auth_enabled, system_features.webapp_auth.enabled)
_validate_user_accessibility(
@ -88,7 +88,8 @@ def decode_jwt_token(app_code: str | None = None, user_id: str | None = None) ->
raise Unauthorized("Please re-login to access the web app.")
app_id = AppService.get_app_id_by_code(app_code)
app_web_auth_enabled = (
EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=app_id).access_mode != "public"
EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id=app_id).access_mode
!= WebAppAccessMode.PUBLIC
)
if app_web_auth_enabled:
raise WebAppAuthRequiredError()

View File

@ -253,7 +253,20 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
):
"""
Resume a paused advanced chat execution.
``trace_manager`` is transient and excluded from generate-entity serialization,
so resumed executions rebuild it here before persistence layers receive the entity.
"""
if application_generate_entity.trace_manager is None:
application_generate_entity = application_generate_entity.model_copy(
update={
"trace_manager": TraceQueueManager(
app_id=app_model.id,
user_id=user.id if isinstance(user, Account) else user.session_id,
)
}
)
return self._generate(
workflow=workflow,
user=user,

View File

@ -178,7 +178,7 @@ class AgentChatAppConfigManager(BaseAppConfigManager):
if not isinstance(agent_mode, dict):
raise ValueError("agent_mode must be of object type")
# FIXME(-LAN-): Cast needed due to basedpyright limitation with dict type narrowing
# FIXME(-LAN-): Cast needed because static checkers do not narrow this dict value.
agent_mode = cast(dict[str, Any], agent_mode)
if "enabled" not in agent_mode or not agent_mode["enabled"]:

View File

@ -253,7 +253,20 @@ class WorkflowAppGenerator(BaseAppGenerator):
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Resume a paused workflow execution using the persisted runtime state.
``trace_manager`` is transient and excluded from generate-entity serialization,
so resumed executions rebuild it here before persistence layers receive the entity.
"""
if application_generate_entity.trace_manager is None:
application_generate_entity = application_generate_entity.model_copy(
update={
"trace_manager": TraceQueueManager(
app_id=app_model.id,
user_id=user.id if isinstance(user, Account) else user.session_id,
)
}
)
return self._generate(
app_model=app_model,
workflow=workflow,
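
Note: the rebuild here (and in the advanced-chat generator above) works because pydantic drops `exclude=True` fields on serialization, and `model_copy(update=...)` re-attaches them. A toy illustration with stand-in classes, not the real entities:

# Toy round-trip showing a transient field being lost, then rebuilt.
from pydantic import BaseModel, Field

class Tracer:  # stand-in for TraceQueueManager
    def __init__(self, app_id: str):
        self.app_id = app_id

class GenerateEntity(BaseModel):
    model_config = {"arbitrary_types_allowed": True}
    app_id: str
    trace_manager: Tracer | None = Field(default=None, exclude=True)

entity = GenerateEntity(app_id="app-1", trace_manager=Tracer("app-1"))
restored = GenerateEntity.model_validate_json(entity.model_dump_json())
assert restored.trace_manager is None  # excluded field is lost on the round-trip
if restored.trace_manager is None:
    restored = restored.model_copy(update={"trace_manager": Tracer(restored.app_id)})
assert restored.trace_manager is not None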

View File

@ -730,6 +730,8 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
match invoke_from:
case InvokeFrom.SERVICE_API:
created_from = WorkflowAppLogCreatedFrom.SERVICE_API
case InvokeFrom.OPENAPI:
created_from = WorkflowAppLogCreatedFrom.OPENAPI
case InvokeFrom.EXPLORE:
created_from = WorkflowAppLogCreatedFrom.INSTALLED_APP
case InvokeFrom.WEB_APP:

View File

@ -24,6 +24,7 @@ class UserFrom(StrEnum):
class InvokeFrom(StrEnum):
SERVICE_API = "service-api"
OPENAPI = "openapi"
WEB_APP = "web-app"
TRIGGER = "trigger"
EXPLORE = "explore"

View File

@ -171,9 +171,9 @@ class ProviderConfiguration(BaseModel):
current_credential_id = self.custom_configuration.provider.current_credential_id
if current_credential_id:
from core.helper.credential_utils import check_credential_policy_compliance
from core.helper.credential_utils import runtime_check_credential_policy_compliance
check_credential_policy_compliance(
runtime_check_credential_policy_compliance(
credential_id=current_credential_id,
provider=self.provider.provider,
credential_type=PluginCredentialType.MODEL,
@ -182,9 +182,9 @@ class ProviderConfiguration(BaseModel):
# no current credential id, check all available credentials
if self.custom_configuration.provider:
for credential_configuration in self.custom_configuration.provider.available_credentials:
from core.helper.credential_utils import check_credential_policy_compliance
from core.helper.credential_utils import runtime_check_credential_policy_compliance
check_credential_policy_compliance(
runtime_check_credential_policy_compliance(
credential_id=credential_configuration.credential_id,
provider=self.provider.provider,
credential_type=PluginCredentialType.MODEL,

View File

@ -1,9 +1,9 @@
class LLMError(ValueError):
"""Base class for all LLM exceptions."""
description: str | None = None
description: str = ""
def __init__(self, description: str | None = None):
def __init__(self, description: str = ""):
self.description = description

View File

@ -2,6 +2,7 @@
Credential utility functions for checking credential existence and policy compliance.
"""
from configs import dify_config
from core.entities import PluginCredentialType
@ -39,6 +40,16 @@ def is_credential_exists(credential_id: str, credential_type: "PluginCredentialT
return False
def runtime_check_credential_policy_compliance(
credential_id: str, provider: str, credential_type: "PluginCredentialType", check_existence: bool = True
):
if dify_config.ENTERPRISE_DISABLE_RUNTIME_CREDENTIAL_CHECK:
return
check_credential_policy_compliance(
credential_id=credential_id, provider=provider, credential_type=credential_type, check_existence=check_existence
)
def check_credential_policy_compliance(
credential_id: str, provider: str, credential_type: "PluginCredentialType", check_existence: bool = True
) -> None:

View File

@ -435,7 +435,7 @@ class LLMGenerator:
stream=False,
)
# Runtime type check since pyright has issues with the overload
# Runtime type check for overload narrowing.
if not isinstance(result, LLMResult):
raise TypeError("Expected LLMResult when stream=False")
response = result

View File

@ -2,7 +2,7 @@ from collections.abc import Callable
from dataclasses import dataclass
from typing import Annotated, Any, Literal
from pydantic import BaseModel, ConfigDict, Field, FileUrl, RootModel
from pydantic import BaseModel, ConfigDict, Field, FileUrl, RootModel, model_validator
from pydantic.networks import AnyUrl, UrlConstraints
"""
@ -173,7 +173,21 @@ class JSONRPCError(BaseModel):
class JSONRPCMessage(RootModel[JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError]):
pass
@model_validator(mode="before")
@classmethod
def _select_message_type(
cls, value: Any
) -> JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError | Any:
if isinstance(value, dict):
if "result" in value:
return JSONRPCResponse.model_validate(value)
if "error" in value:
return JSONRPCError.model_validate(value)
if "method" in value:
if "id" in value:
return JSONRPCRequest.model_validate(value)
return JSONRPCNotification.model_validate(value)
return value
class EmptyResult(Result):
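
Note: with the before-validator in place, a raw dict routes to the right variant by key presence alone. Illustrative values below assume the usual JSON-RPC field shapes (jsonrpc/id/method/result), with the classes imported from this types module:

# Key-based routing: "result" -> Response, "error" -> Error,
# "method" + "id" -> Request, bare "method" -> Notification.
req = JSONRPCMessage.model_validate({"jsonrpc": "2.0", "id": 1, "method": "ping"})
assert isinstance(req.root, JSONRPCRequest)

note = JSONRPCMessage.model_validate({"jsonrpc": "2.0", "method": "notifications/x"})
assert isinstance(note.root, JSONRPCNotification)

resp = JSONRPCMessage.model_validate({"jsonrpc": "2.0", "id": 1, "result": {}})
assert isinstance(resp.root, JSONRPCResponse)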

View File

@ -391,10 +391,10 @@ class ModelInstance:
# Additional policy compliance check as fallback (in case fetch_next didn't catch it)
try:
from core.helper.credential_utils import check_credential_policy_compliance
from core.helper.credential_utils import runtime_check_credential_policy_compliance
if lb_config.credential_id:
check_credential_policy_compliance(
runtime_check_credential_policy_compliance(
credential_id=lb_config.credential_id,
provider=self.provider,
credential_type=PluginCredentialType.MODEL,
@ -630,10 +630,10 @@ class LBModelManager:
# Check policy compliance for the selected configuration
try:
from core.helper.credential_utils import check_credential_policy_compliance
from core.helper.credential_utils import runtime_check_credential_policy_compliance
if config.credential_id:
check_credential_policy_compliance(
runtime_check_credential_policy_compliance(
credential_id=config.credential_id,
provider=self._provider,
credential_type=PluginCredentialType.MODEL,

View File

@ -161,35 +161,39 @@ class AdvancedPromptTransform(PromptTransform):
prompt_messages: list[PromptMessage] = []
for prompt_item in prompt_template:
raw_prompt = prompt_item.text
if prompt_item.edition_type == "basic" or not prompt_item.edition_type:
if self.with_variable_tmpl:
vp = VariablePool.empty()
for k, v in inputs.items():
if k.startswith("#"):
vp.add(k[1:-1].split("."), v)
raw_prompt = raw_prompt.replace("{{#context#}}", context or "")
prompt = vp.convert_template(raw_prompt).text
else:
parser = PromptTemplateParser(template=raw_prompt, with_variable_tmpl=self.with_variable_tmpl)
prompt_inputs: Mapping[str, str] = {k: inputs[k] for k in parser.variable_keys if k in inputs}
prompt_inputs = self._set_context_variable(
context=context, parser=parser, prompt_inputs=prompt_inputs
)
prompt = parser.format(prompt_inputs)
elif prompt_item.edition_type == "jinja2":
prompt = raw_prompt
prompt_inputs = inputs
prompt = Jinja2Formatter.format(template=prompt, inputs=prompt_inputs)
else:
raise ValueError(f"Invalid edition type: {prompt_item.edition_type}")
if prompt_item.role == PromptMessageRole.USER:
prompt_messages.append(UserPromptMessage(content=prompt))
elif prompt_item.role == PromptMessageRole.SYSTEM and prompt:
prompt_messages.append(SystemPromptMessage(content=prompt))
elif prompt_item.role == PromptMessageRole.ASSISTANT:
prompt_messages.append(AssistantPromptMessage(content=prompt))
edition_type = prompt_item.edition_type or "basic"
match edition_type:
case "basic":
if self.with_variable_tmpl:
vp = VariablePool.empty()
for k, v in inputs.items():
if k.startswith("#"):
vp.add(k[1:-1].split("."), v)
raw_prompt = raw_prompt.replace("{{#context#}}", context or "")
prompt = vp.convert_template(raw_prompt).text
else:
parser = PromptTemplateParser(template=raw_prompt, with_variable_tmpl=self.with_variable_tmpl)
prompt_inputs: Mapping[str, str] = {k: inputs[k] for k in parser.variable_keys if k in inputs}
prompt_inputs = self._set_context_variable(
context=context, parser=parser, prompt_inputs=prompt_inputs
)
prompt = parser.format(prompt_inputs)
case "jinja2":
prompt = raw_prompt
prompt_inputs = inputs
prompt = Jinja2Formatter.format(template=prompt, inputs=prompt_inputs)
case _:
raise ValueError(f"Invalid edition type: {prompt_item.edition_type}")
match prompt_item.role:
case PromptMessageRole.USER:
prompt_messages.append(UserPromptMessage(content=prompt))
case PromptMessageRole.SYSTEM:
if prompt:
prompt_messages.append(SystemPromptMessage(content=prompt))
case PromptMessageRole.ASSISTANT:
prompt_messages.append(AssistantPromptMessage(content=prompt))
case PromptMessageRole.TOOL:
pass
if query and memory_config and memory_config.query_prompt_template:
parser = PromptTemplateParser(

View File

@ -183,34 +183,35 @@ class ExtractProcessor:
return extractor.extract()
elif extract_setting.datasource_type == DatasourceType.WEBSITE:
assert extract_setting.website_info is not None, "website_info is required"
if extract_setting.website_info.provider == "firecrawl":
extractor = FirecrawlWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
elif extract_setting.website_info.provider == "watercrawl":
extractor = WaterCrawlWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
elif extract_setting.website_info.provider == "jinareader":
extractor = JinaReaderWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
else:
raise ValueError(f"Unsupported website provider: {extract_setting.website_info.provider}")
match extract_setting.website_info.provider:
case "firecrawl":
extractor = FirecrawlWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
case "watercrawl":
extractor = WaterCrawlWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
case "jinareader":
extractor = JinaReaderWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
case _:
raise ValueError(f"Unsupported website provider: {extract_setting.website_info.provider}")
else:
raise ValueError(f"Unsupported datasource type: {extract_setting.datasource_type}")

View File

@ -19,7 +19,7 @@ class UnstructuredWordExtractor(BaseExtractor):
def extract(self) -> list[Document]:
from unstructured.__version__ import __version__ as __unstructured_version__
from unstructured.file_utils.filetype import ( # pyright: ignore[reportPrivateImportUsage]
from unstructured.file_utils.filetype import (
FileType,
detect_filetype,
)
@ -27,7 +27,7 @@ class UnstructuredWordExtractor(BaseExtractor):
unstructured_version = tuple(int(x) for x in __unstructured_version__.split("."))
# check the file extension
try:
import magic # noqa: F401 # pyright: ignore[reportUnusedImport]
import magic # noqa: F401
is_doc = detect_filetype(self._file_path) == FileType.DOC
except ImportError:

View File

@ -28,10 +28,10 @@ class FunctionCallMultiDatasetRouter:
SystemPromptMessage(content="You are a helpful AI assistant."),
UserPromptMessage(content=query),
]
result: LLMResult = model_instance.invoke_llm( # pyright: ignore[reportCallIssue, reportArgumentType]
result: LLMResult = model_instance.invoke_llm( # pyrefly: ignore[no-matching-overload]
prompt_messages=prompt_messages,
tools=dataset_tools,
stream=False, # pyright: ignore[reportArgumentType]
stream=False,
model_parameters={"temperature": 0.2, "top_p": 0.3, "max_tokens": 1500},
)
usage = result.usage or LLMUsage.empty_usage()

View File

@ -264,9 +264,9 @@ class ToolManager:
if builtin_provider is None:
raise ToolProviderNotFoundError(f"builtin provider {provider_id} not found")
from core.helper.credential_utils import check_credential_policy_compliance
from core.helper.credential_utils import runtime_check_credential_policy_compliance
check_credential_policy_compliance(
runtime_check_credential_policy_compliance(
credential_id=builtin_provider.id,
provider=provider_id,
credential_type=PluginCredentialType.TOOL,

View File

@ -63,7 +63,7 @@ def _get_surface_form_token(
*,
surface: HumanInputSurface | None,
) -> str | None:
if surface == HumanInputSurface.SERVICE_API:
if surface in {HumanInputSurface.SERVICE_API, HumanInputSurface.OPENAPI}:
for recipient_type, token in recipients:
if recipient_type == RecipientType.STANDALONE_WEB_APP and token:
return token

View File

@ -11,13 +11,15 @@ from models.human_input import RecipientType
class HumanInputSurface(StrEnum):
SERVICE_API = "service_api"
CONSOLE = "console"
OPENAPI = "openapi"
# Service API is intentionally narrower than other surfaces: app-token callers
# SERVICE_API and OPENAPI are intentionally narrower than CONSOLE: token callers
# should only be able to act on end-user web forms, not internal console flows.
_ALLOWED_RECIPIENT_TYPES_BY_SURFACE: dict[HumanInputSurface, frozenset[RecipientType]] = {
HumanInputSurface.SERVICE_API: frozenset({RecipientType.STANDALONE_WEB_APP}),
HumanInputSurface.CONSOLE: frozenset({RecipientType.CONSOLE, RecipientType.BACKSTAGE}),
HumanInputSurface.OPENAPI: frozenset({RecipientType.STANDALONE_WEB_APP}),
}
# A single HITL form can have multiple recipient records; this shared priority
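
Note: the table is consumed via `is_recipient_type_allowed_for_surface` (imported by the service-api controller earlier in this diff); the argument order below is an assumption:

# Expected policy outcomes for the new OPENAPI surface.
from core.workflow.human_input_policy import (
    HumanInputSurface,
    is_recipient_type_allowed_for_surface,
)
from models.human_input import RecipientType

assert is_recipient_type_allowed_for_surface(RecipientType.STANDALONE_WEB_APP, HumanInputSurface.OPENAPI)
assert not is_recipient_type_allowed_for_surface(RecipientType.CONSOLE, HumanInputSurface.OPENAPI)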

View File

@ -45,6 +45,7 @@ SPEC_TARGETS: tuple[SpecTarget, ...] = (
SpecTarget(route="/console/api/swagger.json", filename="console-swagger.json", namespace="console"),
SpecTarget(route="/api/swagger.json", filename="web-swagger.json", namespace="web"),
SpecTarget(route="/v1/swagger.json", filename="service-swagger.json", namespace="service"),
SpecTarget(route="/openapi/v1/swagger.json", filename="openapi-swagger.json", namespace="openapi"),
)
@ -161,6 +162,8 @@ def create_spec_app() -> Flask:
from controllers.console import bp as console_bp
from controllers.console import console_ns
from controllers.openapi import bp as openapi_bp
from controllers.openapi import openapi_ns
from controllers.service_api import bp as service_api_bp
from controllers.service_api import service_api_ns
from controllers.web import bp as web_bp
@ -169,8 +172,9 @@ def create_spec_app() -> Flask:
app.register_blueprint(console_bp)
app.register_blueprint(web_bp)
app.register_blueprint(service_api_bp)
app.register_blueprint(openapi_bp)
for namespace in (console_ns, web_ns, service_api_ns):
for namespace in (console_ns, web_ns, service_api_ns, openapi_ns):
for api in namespace.apis:
_materialize_inline_model_definitions(api)
@ -201,6 +205,13 @@ def _registered_models(namespace: str) -> dict[str, object]:
for api in service_api_ns.apis:
models.update(api.models)
return models
if namespace == "openapi":
from controllers.openapi import openapi_ns
models = dict(openapi_ns.models)
for api in openapi_ns.apis:
models.update(api.models)
return models
raise ValueError(f"unknown Swagger namespace: {namespace}")

View File

@ -11,14 +11,14 @@ from dify_app import DifyApp
def init_app(app: DifyApp):
@app.after_request
def after_request(response): # pyright: ignore[reportUnusedFunction]
def after_request(response):
"""Add Version headers to the response."""
response.headers.add("X-Version", dify_config.project.version)
response.headers.add("X-Env", dify_config.DEPLOY_ENV)
return response
@app.route("/health")
def health(): # pyright: ignore[reportUnusedFunction]
def health():
return Response(
json.dumps({"pid": os.getpid(), "status": "ok", "version": dify_config.project.version}),
status=200,
@ -27,7 +27,7 @@ def init_app(app: DifyApp):
@app.route("/threads")
@admin_required
def threads(): # pyright: ignore[reportUnusedFunction]
def threads():
num_threads = threading.active_count()
threads = threading.enumerate()
@ -53,7 +53,7 @@ def init_app(app: DifyApp):
@app.route("/db-pool-stat")
@admin_required
def pool_stat(): # pyright: ignore[reportUnusedFunction]
def pool_stat():
from extensions.ext_database import db
engine = db.engine

View File

@ -8,6 +8,8 @@ AUTHENTICATED_HEADERS: tuple[str, ...] = (*SERVICE_API_HEADERS, HEADER_NAME_CSRF
FILES_HEADERS: tuple[str, ...] = (*BASE_CORS_HEADERS, HEADER_NAME_CSRF_TOKEN)
EMBED_HEADERS: tuple[str, ...] = ("Content-Type", HEADER_NAME_APP_CODE)
EXPOSED_HEADERS: tuple[str, ...] = ("X-Version", "X-Env", "X-Trace-Id")
OPENAPI_HEADERS: tuple[str, ...] = ("Authorization", "Content-Type", HEADER_NAME_CSRF_TOKEN)
OPENAPI_MAX_AGE_SECONDS: int = 600
def _apply_cors_once(bp, /, **cors_kwargs):
@ -29,6 +31,7 @@ def init_app(app: DifyApp):
from controllers.files import bp as files_bp
from controllers.inner_api import bp as inner_api_bp
from controllers.mcp import bp as mcp_bp
from controllers.openapi import bp as openapi_bp
from controllers.service_api import bp as service_api_bp
from controllers.trigger import bp as trigger_bp
from controllers.web import bp as web_bp
@ -41,6 +44,23 @@ def init_app(app: DifyApp):
)
app.register_blueprint(service_api_bp)
if dify_config.OPENAPI_ENABLED:
# User-scoped programmatic API. The default empty allowlist means
# same-origin only; expand it via OPENAPI_CORS_ALLOW_ORIGINS for
# third-party integrations. supports_credentials is set so that
# cookie-authed approve/deny flows work; cross-origin OPTIONS from a
# non-allowed origin fails the same way it does on the console blueprint.
_apply_cors_once(
openapi_bp,
resources={r"/*": {"origins": dify_config.OPENAPI_CORS_ALLOW_ORIGINS}},
supports_credentials=True,
allow_headers=list(OPENAPI_HEADERS),
methods=["GET", "POST", "PATCH", "DELETE", "OPTIONS"],
expose_headers=list(EXPOSED_HEADERS),
max_age=OPENAPI_MAX_AGE_SECONDS,
)
app.register_blueprint(openapi_bp)
_apply_cors_once(
web_bp,
resources={
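
Note: a preflight request makes this configuration observable. The sketch below uses a placeholder host, and the Origin must appear in OPENAPI_CORS_ALLOW_ORIGINS for the allow headers to be echoed back:

# Exercise the CORS preflight on an openapi route.
import requests

resp = requests.options(
    "http://localhost:5001/openapi/v1/workspaces",
    headers={
        "Origin": "https://example.com",
        "Access-Control-Request-Method": "GET",
        "Access-Control-Request-Headers": "Authorization",
    },
)
print(resp.headers.get("Access-Control-Allow-Origin"))
print(resp.headers.get("Access-Control-Max-Age"))  # "600" when the origin is allowed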

View File

@ -222,6 +222,12 @@ def init_app(app: DifyApp) -> Celery:
"task": "schedule.clean_workflow_runs_task.clean_workflow_runs_task",
"schedule": crontab(minute="0", hour="0"),
}
if dify_config.ENABLE_CLEAN_OAUTH_ACCESS_TOKENS_TASK:
imports.append("schedule.clean_oauth_access_tokens_task")
beat_schedule["clean_oauth_access_tokens_task"] = {
"task": "schedule.clean_oauth_access_tokens_task.clean_oauth_access_tokens_task",
"schedule": crontab(minute="0", hour="5", day_of_month=f"*/{day}"),
}
if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK:
imports.append("schedule.workflow_schedule_task")
beat_schedule["workflow_schedule_task"] = {

View File

@ -33,7 +33,7 @@ def _setup_gevent_compatibility():
return
@event.listens_for(Pool, "reset")
def _safe_reset(dbapi_connection, connection_record, reset_state): # pyright: ignore[reportUnusedFunction]
def _safe_reset(dbapi_connection, connection_record, reset_state):
if reset_state.terminate_only:
return

View File

@ -2,4 +2,4 @@ from dify_app import DifyApp
def init_app(app: DifyApp):
from events import event_handlers # noqa: F401 # pyright: ignore[reportUnusedImport]
from events import event_handlers # noqa: F401

View File

@ -12,7 +12,7 @@ from constants import HEADER_NAME_APP_CODE
from dify_app import DifyApp
from extensions.ext_database import db
from libs.passport import PassportService
from libs.token import extract_access_token, extract_webapp_passport
from libs.token import extract_access_token, extract_console_cookie_token, extract_webapp_passport
from models import Account, Tenant, TenantAccountJoin
from models.model import AppMCPServer, EndUser
from services.account_service import AccountService
@ -84,6 +84,24 @@ def load_user_from_request(request_from_flask_login: Request) -> LoginUser | Non
logged_in_account = AccountService.load_logged_in_account(account_id=user_id)
return logged_in_account
elif request.blueprint == "openapi":
# Account-branch device-flow approval routes (approve / deny /
# approval-context) sit under @login_required and authenticate via
# the console session cookie. Cookie-only on purpose — bearer
# tokens (dfoa_/dfoe_) live on the Authorization header and are
# validated by AppPipeline, not flask-login.
cookie_token = extract_console_cookie_token(request)
if not cookie_token:
return None
try:
decoded = PassportService().verify(cookie_token)
except Exception:
return None
user_id = decoded.get("user_id")
source = decoded.get("token_source")
if source or not user_id:
return None
return AccountService.load_logged_in_account(account_id=user_id)
elif request.blueprint == "web":
app_code = request.headers.get(HEADER_NAME_APP_CODE)
webapp_token = extract_webapp_passport(app_code, request) if app_code else None

View File

@ -67,7 +67,7 @@ class Mail:
case _:
raise ValueError(f"Unsupported mail type {mail_type}")
def send(self, to: str, subject: str, html: str, from_: str | None = None):
def send(self, to: str, subject: str, html: str, from_: str = ""):
if not self._client:
raise ValueError("Mail client is not initialized")

View File

@ -0,0 +1,23 @@
"""Bind the bearer authenticator at startup. Must run after ext_database
and ext_redis (needs both factories).
"""
from __future__ import annotations
from configs import dify_config
from dify_app import DifyApp
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.oauth_bearer import build_and_bind
def is_enabled() -> bool:
return dify_config.ENABLE_OAUTH_BEARER
def init_app(app: DifyApp) -> None:
# scoped_session isn't a context manager; request teardown closes it.
def session_factory():
return db.session
build_and_bind(session_factory=session_factory, redis_client=redis_client)

View File

@ -1,5 +1,9 @@
from typing import Any, cast
import socketio # type: ignore[reportMissingTypeStubs]
from configs import dify_config
sio = socketio.Server(async_mode="gevent", cors_allowed_origins=dify_config.CONSOLE_CORS_ALLOW_ORIGINS)
# FIXME(chariri): cast to Any because app_factory attaches the current app
# as the `app` attribute on this server instance, which the type stubs do not model.
sio = cast(Any, socketio.Server(async_mode="gevent", cors_allowed_origins=dify_config.CONSOLE_CORS_ALLOW_ORIGINS))

View File

@ -198,7 +198,7 @@ class AliyunLogStore:
)
# Append Dify identification to the existing user agent
original_user_agent = self.client._user_agent # pyright: ignore[reportPrivateUsage]
original_user_agent = self.client._user_agent
dify_version = dify_config.project.version
enhanced_user_agent = f"Dify,Dify-{dify_version},{original_user_agent}"
self.client.set_user_agent(enhanced_user_agent)

View File

@ -5,12 +5,7 @@ from datetime import datetime
from pydantic import Field, field_validator
from fields.base import ResponseModel
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
from libs.helper import to_timestamp
class Annotation(ResponseModel):
@ -23,7 +18,7 @@ class Annotation(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AnnotationList(ResponseModel):
@ -50,7 +45,7 @@ class AnnotationHitHistory(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AnnotationHitHistoryList(ResponseModel):

View File

@ -7,6 +7,7 @@ from pydantic import Field, field_validator, model_validator
from fields.base import ResponseModel
from graphon.file import File
from libs.helper import to_timestamp
type JSONValue = Any
@ -47,9 +48,7 @@ class SimpleConversation(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class ConversationInfiniteScrollPagination(ResponseModel):
@ -90,9 +89,7 @@ class ConversationAnnotation(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class ConversationAnnotationHitHistory(ResponseModel):
@ -103,9 +100,7 @@ class ConversationAnnotationHitHistory(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class AgentThought(ResponseModel):
@ -125,9 +120,7 @@ class AgentThought(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
@model_validator(mode="after")
def _fallback_chain_id(self):
@ -169,9 +162,7 @@ class MessageDetail(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class FeedbackStat(ResponseModel):
@ -237,9 +228,7 @@ class Conversation(ResponseModel):
@field_validator("read_at", "created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class ConversationPagination(ResponseModel):
@ -263,9 +252,7 @@ class ConversationMessageDetail(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class ConversationWithSummary(ResponseModel):
@ -291,9 +278,7 @@ class ConversationWithSummary(ResponseModel):
@field_validator("read_at", "created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class ConversationWithSummaryPagination(ResponseModel):
@ -322,15 +307,7 @@ class ConversationDetail(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
def to_timestamp(value: datetime | None) -> int | None:
if value is None:
return None
return int(value.timestamp())
return to_timestamp(value)
def format_files_contained(value: JSONValue) -> JSONValue:

View File

@ -8,7 +8,7 @@ from pydantic import field_validator
from fields.base import ResponseModel
from graphon.variables.types import SegmentType
from libs.helper import TimestampField
from libs.helper import TimestampField, to_timestamp
from ._value_type_serializer import serialize_value_type
@ -37,12 +37,6 @@ conversation_variable_infinite_scroll_pagination_fields = {
}
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class ConversationVariableResponse(ResponseModel):
id: str
name: str
@ -88,7 +82,7 @@ class ConversationVariableResponse(ResponseModel):
@field_validator("created_at", "updated_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class PaginatedConversationVariableResponse(ResponseModel):

View File

@ -5,12 +5,7 @@ from datetime import datetime
from pydantic import field_validator
from fields.base import ResponseModel
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
from libs.helper import to_timestamp
class UploadConfig(ResponseModel):
@ -45,7 +40,7 @@ class FileResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class RemoteFileInfo(ResponseModel):
@ -66,7 +61,7 @@ class FileWithSignedUrl(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
__all__ = [

View File

@ -6,7 +6,7 @@ from flask_restx import fields
from pydantic import computed_field, field_validator
from fields.base import ResponseModel
from graphon.file import helpers as file_helpers
from libs.helper import build_avatar_url, to_timestamp
simple_account_fields = {
"id": fields.String,
@ -15,20 +15,6 @@ simple_account_fields = {
}
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
def _build_avatar_url(avatar: str | None) -> str | None:
if avatar is None:
return None
if avatar.startswith(("http://", "https://")):
return avatar
return file_helpers.get_signed_file_url(avatar)
class SimpleAccount(ResponseModel):
id: str
name: str
@ -41,7 +27,7 @@ class _AccountAvatar(ResponseModel):
@computed_field(return_type=str | None) # type: ignore[prop-decorator]
@property
def avatar_url(self) -> str | None:
return _build_avatar_url(self.avatar)
return build_avatar_url(self.avatar)
class Account(_AccountAvatar):
@ -59,7 +45,7 @@ class Account(_AccountAvatar):
@field_validator("last_login_at", "created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AccountWithRole(_AccountAvatar):
@ -75,7 +61,7 @@ class AccountWithRole(_AccountAvatar):
@field_validator("last_login_at", "last_active_at", "created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class AccountWithRoleList(ResponseModel):

View File

@ -9,6 +9,7 @@ from core.entities.execution_extra_content import ExecutionExtraContentDomainMod
from fields.base import ResponseModel
from fields.conversation_fields import AgentThought, JSONValue, MessageFile
from graphon.file import File
from libs.helper import to_timestamp
type JSONValueType = JSONValue
@ -39,9 +40,7 @@ class RetrieverResource(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class MessageListItem(ResponseModel):
@ -68,9 +67,7 @@ class MessageListItem(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class WebMessageListItem(MessageListItem):
@ -106,9 +103,7 @@ class SavedMessageItem(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_created_at(cls, value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return to_timestamp(value)
return value
return to_timestamp(value)
class SavedMessageInfiniteScrollPagination(ResponseModel):
@ -121,12 +116,6 @@ class SuggestedQuestionsResponse(ResponseModel):
data: list[str]
def to_timestamp(value: datetime | None) -> int | None:
if value is None:
return None
return int(value.timestamp())
def format_files_contained(value: JSONValueType) -> JSONValueType:
if isinstance(value, File):
# Response payloads must preserve legacy file keys like `related_id`/`url`

View File

@ -17,7 +17,7 @@ from fields.workflow_run_fields import (
workflow_run_for_archived_log_fields,
workflow_run_for_log_fields,
)
from libs.helper import TimestampField
from libs.helper import TimestampField, to_timestamp
workflow_app_log_partial_fields = {
"id": fields.String,
@ -96,12 +96,6 @@ def build_workflow_archived_log_pagination_model(api_or_ns: Namespace):
return api_or_ns.model("WorkflowArchivedLogPagination", copied_fields)
def _to_timestamp(value: datetime | int | None) -> int | None:
if isinstance(value, datetime):
return int(value.timestamp())
return value
class WorkflowAppLogPartialResponse(ResponseModel):
id: str
workflow_run: WorkflowRunForLogResponse | None = None
@ -115,7 +109,7 @@ class WorkflowAppLogPartialResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class WorkflowArchivedLogPartialResponse(ResponseModel):
@ -129,7 +123,7 @@ class WorkflowArchivedLogPartialResponse(ResponseModel):
@field_validator("created_at", mode="before")
@classmethod
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
return _to_timestamp(value)
return to_timestamp(value)
class WorkflowAppLogPaginationResponse(ResponseModel):

Some files were not shown because too many files have changed in this diff.