Compare commits

117 Commits

Author SHA1 Message Date
66d67185a0 fix: array number validation in conversation variable 2025-08-19 14:53:30 +08:00
5f0b52c017 Fix number input in tool configure form of agent node tool item (#24154) 2025-08-19 14:26:09 +08:00
c2606f9062 fix: correct behaviour of code fix (#24152)
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-19 14:18:49 +08:00
70da81d0e5 try ast-grep (#24149) 2025-08-19 13:41:52 +08:00
75199442c1 feat: Implements periodic deletion of workflow run logs that exceed t… (#23881)
Co-authored-by: shiyun.li973792 <shiyun.li@seres.cn>
Co-authored-by: 1wangshu <suewangswu@gmail.com>
Co-authored-by: Blackoutta <hyytez@gmail.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-19 09:47:34 +08:00
60cc82aff1 feat: add testcontainers based tests for feature service (#24026) 2025-08-19 09:32:47 +08:00
ebd2c8236d an example of suppress (#24136) 2025-08-19 00:21:26 +08:00
a2537ba4fd chore: translate i18n files (#24131)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-18 23:46:23 +08:00
a3a041ef6f feat: add delete avatar functionality with confirmation modal (#24127)
Co-authored-by: crazywoola <427733928@qq.com>
2025-08-18 21:35:20 +08:00
c0702aacac Use typing.Literal to replace str places (#24099)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-18 21:34:13 +08:00
670d479e32 Bump pyobvector to 0.2.15 (#24120) 2025-08-18 17:36:27 +08:00
ae7de7d36b fix: treat default template of code as empty (#24106)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-18 16:52:27 +08:00
ef5decc98a Chore: remove some dead code in experience-enhance-group (#24110)
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
2025-08-18 16:51:43 +08:00
4445460eca fix: validate checklist before publishing workflow (#24104) 2025-08-18 16:46:22 +08:00
8288b1dcab Revert "fix pg_vector extension requires SUPERUSER, but not availabl… (#24108) 2025-08-18 16:46:15 +08:00
16d1289a0a fix pg_vector extension requires SUPERUSER, but not available on Huawei Cloud RDS (#24093)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-18 16:29:36 +08:00
ba775a1c90 chore: translate i18n files (#24102)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-18 16:18:45 +08:00
b0e58f9da7 Feature/improve goto anything commands (#24091) 2025-08-18 16:07:54 +08:00
531e784a92 feat: no longer enable auto upgrade when marketplace is disabled (#24… (#24101) 2025-08-18 15:57:33 +08:00
5e8fe30035 fix(ui): Optimize UI component styles and layouts (#24090) (#24092) 2025-08-18 15:56:10 +08:00
f5033c5a0e fix: no current code caused code generation show error (#24086) 2025-08-18 14:18:08 +08:00
26d7654851 chore: translate i18n files (#24081)
Co-authored-by: Stream29 <36751053+Stream29@users.noreply.github.com>
2025-08-18 12:45:17 +08:00
790a6ec203 fix: return empty list instead of raising exception for qdrant search when score_threshold is 1 (#24032) 2025-08-18 12:44:05 +08:00
de9c5f10b3 feat: enchance prompt and code (#23633)
Co-authored-by: stream <stream@dify.ai>
Co-authored-by: Stream <1542763342@qq.com>
Co-authored-by: Stream <Stream_2@qq.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-18 12:29:12 +08:00
a7fe0e3f87 fix(oauth): redis compatibility (#23959) 2025-08-18 11:14:08 +08:00
218e247fd2 refactor: improve loading animation and debug panel styles (#24075) 2025-08-18 11:12:47 +08:00
6b51530e21 fix: update first_id logic to use the oldest answer item in chat messages (#23992)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-18 11:04:18 +08:00
9755564a05 Fix sticky table header transparency with backdrop-filter blur in dark mode (#23999) 2025-08-18 11:03:15 +08:00
052d0e059e feat: add Redis SSL/TLS certificate authentication support (#23624) 2025-08-18 10:59:07 +08:00
fa4d3bba86 feat: add CLAUDE.md for LLM-assisted development guidance (#23946) 2025-08-18 10:13:44 +08:00
f8fc9f8c79 feat: add select input support to the conversation opener (#24043) 2025-08-18 10:01:29 +08:00
80f0594f4b feat: add testcontainers based tests for model loadbalancing service (#24066) 2025-08-18 09:54:22 +08:00
97b24f48d5 feat: add testcontainers based tests for metadata service (#24048) 2025-08-18 09:43:20 +08:00
b475a6b257 chore: synchronize translations (#24044) 2025-08-18 09:29:52 +08:00
ff52a54fef Restore useLabelStore mistakenly removed in commit 403e2d58 (#24052)
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-18 09:22:59 +08:00
c69634085d Revert "Fix: Correct file variable handling for custom tools in workflow (#24061) 2025-08-17 23:14:37 +08:00
0a9af45194 no used function for message_queue. (#24027)
Signed-off-by: zhanluxianshen <zhanluxianshen@163.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-17 09:33:22 +08:00
d92ddc4dd4 chore(i18n): correct japanese translation (#24041) 2025-08-17 09:32:57 +08:00
32fa817eaa Update mypy.ini (#24014)
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-16 19:29:19 +08:00
af10b3c5fa Fix: add 'api_key' alias for backward compatibility (#24022)
Signed-off-by: Yongtao Huang <yongtaoh@gmail.com>
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
2025-08-16 19:28:31 +08:00
8b601a983c Fix missing user_id in trace_manager (#24024) 2025-08-16 11:08:30 +08:00
4b9812ce6a fix: move database service call inside session context in workflow draft variable API (#23996) 2025-08-15 18:23:42 +08:00
c9e18346ce Chore: remove empty files and unused code (#23990)
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
2025-08-15 16:28:41 +08:00
462ba8f416 chore: add configurable stdio buffer sizes for plugins in compose file (#23980) 2025-08-15 15:25:11 +08:00
658157e9a1 chore: improved type annotations in MCP-related codes (#23984) 2025-08-15 15:19:30 +08:00
4031a46572 doc: add deployment pattern using Amazon ECS and CDK (#23985) 2025-08-15 15:18:53 +08:00
821fe26b56 fix comparison with callable (#23978) 2025-08-15 15:03:00 +08:00
352776ba77 update: GitHub star fallback count to current value (#23957) 2025-08-15 11:25:50 +08:00
f560116fb2 fix: 504 Gateway Time-out error on /console/api/version endpoint (#23961) 2025-08-15 11:25:25 +08:00
e7a5268fdd Fix hover button contrast in dark mode for app and dataset cards (#23955) 2025-08-15 10:38:03 +08:00
aa71173dbb Feat: External_trace_id compatible with OpenTelemetry (#23918)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-15 09:13:41 +08:00
11fdcb18c6 feature: add test for tool engine serialization (#23951) 2025-08-15 09:12:29 +08:00
62c34c4bc2 refactor: unify pnpm version management with packageManager field (#23943) 2025-08-15 09:01:18 +08:00
01f2333381 chore: remove redundant .env.example from root directory (#23948) 2025-08-15 08:59:49 +08:00
8d47213529 fix(workflow/if-else): keep conditions in sync on variable rename (#23611)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: crazywoola <427733928@qq.com>
2025-08-14 19:55:18 +08:00
f40e2cf98a Fix: remove redundant allowed_keys check in jsonable_encoder (#23931)
Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-14 19:52:07 +08:00
05e071bf2f fix: resolve user profile dropdown cache sync issue across layouts (#23937) 2025-08-14 19:51:28 +08:00
e340fccafb feat: integrate flask-orjson for improved JSON serialization performance (#23935) 2025-08-14 19:50:59 +08:00
4a2e6af9b5 Fixes #23921 (#23924)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-14 15:54:25 +08:00
acba135de1 Revert "feat: support to upload files for visual model call when running LLM node for debugging in a single step" (#23922) 2025-08-14 15:52:19 +08:00
2476511368 Fix: replace deprecated String.prototype.substr with slice (#23915)
Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
2025-08-14 13:47:23 +08:00
c39dfad7b6 fix: mime_type could be None (#23880)
Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-14 13:40:06 +08:00
9e29309ffd fix: ensure custom headers are ignored when using bearer or basic authorization (#23584)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-14 10:18:12 +08:00
e0f0813b7c Add Test Containers Based Tests for App Generation Services (#23899) 2025-08-14 10:16:41 +08:00
caf50ea01e fix: resolve text clipping issues in overview chart components (#23907) 2025-08-14 10:16:20 +08:00
d4756ba659 Fix multipart/form-data boundary issue in HTTP Call node (#23903)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-08-14 10:01:27 +08:00
cc4d82f932 fix(web): resolve TypeScript errors in app overview components (#23901) 2025-08-14 09:22:27 +08:00
7286b4ad06 fix(api): resolve "Message not exists" error in admin feedback creation (#23232)
Fix regression introduced in PR #22580 where admin users encountered 
"Message not exists" errors when creating feedback on messages created 
by other users.

The issue was caused by `MessageService.create_feedback()` incorrectly 
filtering messages by the current user's ID, preventing admins from 
accessing messages created by end users. 

Reverts: #22580
2025-08-13 23:57:25 +08:00
02194db0c6 Fix: narrow beforeRequest hook type to avoid boolean in array (#23860)
Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
2025-08-13 23:28:13 +08:00
b6bd145130 Remove redundant acceptedKeys check (#23891)
Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
2025-08-13 23:24:23 +08:00
5dcbc9861b fix: add dark mode support for embedded modal option icons (#23893) 2025-08-13 23:20:33 +08:00
3ac4e122eb Update use-document-title.ts to fix favicon.ico path (#23872) 2025-08-13 08:08:32 -07:00
e336a8666a fix translation (#23873) 2025-08-13 17:17:16 +08:00
a36fdf6a7d chore(cmdk): Resolve default option selection issue in GotoAnything component (#23878) (#23813) 2025-08-13 17:14:21 +08:00
bf2f03f911 Restructure the File errors in controller (#23801)
Co-authored-by: Yongtao Huang <99629139+hyongtao-db@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-13 17:06:07 +08:00
5aaa47e25d fix: messages updated_at (#23869) 2025-08-13 15:34:52 +08:00
f884886ef4 style: update dark and light theme colors and add new color variables (#23865) 2025-08-13 14:50:41 +08:00
4da6ec787e feat: support to upload files for visual model call when running LLM node for debugging in a single step (#23521)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-13 14:07:13 +08:00
2da00bb4ec Fix misleading Studio button in account header (#23842) 2025-08-13 13:56:06 +08:00
e0fe0e1a3e fix: goto-anything command filter should only match shortcut (#23862) 2025-08-13 13:55:25 +08:00
e11a334c9b fix: SimpleSelect chevron icon state sync and add notClearable to monitoring selector (#23858) 2025-08-13 13:55:10 +08:00
21e1b825fe fix: optimize dataset cleanup task (#23828)
Signed-off-by: kenwoodjw <blackxin55+@gmail.com>
2025-08-13 11:22:03 +08:00
1cf7c3430a Add more comprehensive Test Containers Based Tests for advanced prompt service (#23850) 2025-08-13 11:21:32 +08:00
ad2bc7f8ac fix: update modal component to use relative positioning (#23855) 2025-08-13 11:20:40 +08:00
e600070a61 feat(api): auto-delete WorkflowDraftVariable when app is deleted (#23737)
This commit introduces a background task that automatically deletes `WorkflowDraftVariable` records when
their associated workflow apps are deleted.

Additionally, it adds a new cleanup script
`cleanup-orphaned-draft-variables` to remove existing orphaned draft variables from the database.
2025-08-13 11:13:08 +08:00
854c1aa37d fix: goto-anything highlighting consistency improvements (#23843) 2025-08-13 10:15:45 +08:00
74ab057f56 refactor: improve Redis wrapper type hints and fix None value handling (#23845) 2025-08-13 09:46:02 +08:00
ccc6d5975f chore: rename misleading 'chore.yaml' issue template to 'refactor.yml' (#23847) 2025-08-13 09:28:42 +08:00
b3399642c5 feat: Add an asynchronous repository to improve workflow performance (#20050)
Co-authored-by: liangxin <liangxin@shein.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: liangxin <xinlmain@gmail.com>
2025-08-13 02:28:06 +08:00
6e6389c930 chore: translate i18n files (#23841)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-12 23:56:50 +08:00
973a390298 Feature/run cmd (#23822) 2025-08-12 23:47:50 +08:00
a77dfb69b0 chore: update uv to 0.8.9 (#23833) 2025-08-12 23:41:39 +08:00
9769318875 Fix missing import in app.ts (#23831) 2025-08-12 23:27:31 +08:00
c7f36d1a5a chore: goto anything mouse keyboard interaction (#23805) 2025-08-12 18:17:19 +08:00
cb46726fa4 Add Test Containers Based Tests for File Service (#23771) 2025-08-12 18:16:07 +08:00
7820e31a92 fix: add missing translation keys for goto anything command selector (#23815) 2025-08-12 18:14:57 +08:00
de0dae9d9b Fix node search (#23795) 2025-08-12 14:48:35 +08:00
a09935d9b9 chore: restore @mdx-js dependencies in package.json and pnpm-lock.yaml (#23792) 2025-08-12 14:45:56 +08:00
a62371940f fix: remove misleading clear buttons and improve SimpleSelect UX (#23791) 2025-08-12 13:29:51 +08:00
02f7677d92 chore: translate i18n files (#23789)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-08-12 13:29:33 +08:00
1ffe190557 Feat/improved mcp timeout configs (#23605)
Co-authored-by: crazywoola <427733928@qq.com>
2025-08-12 13:14:00 +08:00
d3eff9b1a3 fix: prevent X button flying to screen corners in dataset settings modal (#23788) 2025-08-12 12:03:04 +08:00
66232792a2 fix: add MAX_TREE_DEPTH in env.service.web (#23785)
Co-authored-by: crazywoola <427733928@qq.com>
2025-08-12 12:01:51 +08:00
dc2aaae414 feat: add highPriority option to Modal for goto-anything layering (#23783) 2025-08-12 10:37:11 +08:00
c0bb2ec851 feat: If combining text and files, place the text prompt after the fi… (#23779)
Co-authored-by: 刘江波 <jiangbo721@163.com>
2025-08-12 10:36:55 +08:00
7566d90dfe fix issue #23758 (#23764)
Co-authored-by: root <root@thinkpad-pc.localdomain>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-08-12 10:26:13 +08:00
b38f195a0d test: add comprehensive test suite for rate limiting module (#23765) 2025-08-12 10:05:30 +08:00
a6c5b7414d Fix: expose MAX_TREE_DEPTH in env (#23743)
Co-authored-by: crazywoola <427733928@qq.com>
2025-08-12 09:56:31 +08:00
cdee5aab3a fix: update integration tests to use 2-element variable selectors (#23766) 2025-08-12 09:33:09 +08:00
4240e2dd29 fix(api): fix flaky tests by generating unique variable names (#23768) 2025-08-12 09:31:15 +08:00
e298eee822 feat: add select-none class to tag filter components to prevent text selection (#23774) 2025-08-12 09:23:59 +08:00
bc1cfd4373 hotfix: fix translation (#23757) 2025-08-11 22:36:39 +08:00
2944a4fd43 feat: add filtering support for @ command selector in goto-anything (#23763) 2025-08-11 22:21:37 +08:00
a44ca29717 Chore: remove unused var in ModelProviderFactory (#23690) 2025-08-11 22:04:11 +08:00
332e8e68ee refactor: Change _queue_manager to public attribute queue_manager in task pipelines (#23747) 2025-08-11 18:18:07 +08:00
577062b93a refactor: simplify variable pool key structure and improve type safety (#23732)
Signed-off-by: -LAN- <laipz8200@outlook.com>
2025-08-11 18:10:04 +08:00
223c1a8089 test(api): fix flaky tests in TestWorkflowDraftVariableService (#23749)
Fix flaky test
`TestWorkflowDraftVariableService.test_list_variables_without_values_success`
caused by low entropy in test data generation that led to
duplicate values violating unique constraints.

Also improve data generation in other tests within
`TestWorkflowDraftVariableService` to reduce the likelihood of
generating duplicate test data.
2025-08-11 17:39:58 +08:00
430 changed files with 18194 additions and 3462 deletions

View File

@ -5,7 +5,7 @@ cd web && pnpm install
pipx install uv
echo 'alias start-api="cd /workspaces/dify/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage"' >> ~/.bashrc
echo 'alias start-web="cd /workspaces/dify/web && pnpm dev"' >> ~/.bashrc
echo 'alias start-web-prod="cd /workspaces/dify/web && pnpm build && pnpm start"' >> ~/.bashrc
echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d"' >> ~/.bashrc

File diff suppressed because it is too large.

View File

@ -8,7 +8,7 @@ inputs:
uv-version:
description: UV version to set up
required: true
default: '~=0.7.11'
default: '0.8.9'
uv-lockfile:
description: Path to the UV lockfile to restore cache from
required: true
@ -26,7 +26,7 @@ runs:
python-version: ${{ inputs.python-version }}
- name: Install uv
uses: astral-sh/setup-uv@v5
uses: astral-sh/setup-uv@v6
with:
version: ${{ inputs.uv-version }}
python-version: ${{ inputs.python-version }}

View File

@ -23,6 +23,9 @@ jobs:
uv run ruff check --fix-only .
# Format code
uv run ruff format .
- name: ast-grep
run: |
uvx --from ast-grep-cli sg --pattern 'db.session.query($WHATEVER).filter($HERE)' --rewrite 'db.session.query($WHATEVER).where($HERE)' -l py --update-all
- uses: autofix-ci/action@635ffb0c9798bd160680f18fd73371e355b85f27
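The ast-grep step above mechanically rewrites legacy SQLAlchemy `Query.filter()` calls into the equivalent `Query.where()` form across the API code (the `UploadFile` query further down in this diff shows the same change). As an illustration of the pattern/rewrite pair — model and variable names here are hypothetical:

```python
# Before the ast-grep rewrite: matches db.session.query($WHATEVER).filter($HERE)
doc = db.session.query(Document).filter(Document.id == document_id).first()

# After the rewrite: the behaviour-equivalent where() form
doc = db.session.query(Document).where(Document.id == document_id).first()
```

In SQLAlchemy 1.4+ `Query.where()` is an alias for `Query.filter()`, so the rewrite is purely stylistic.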

View File

@ -82,7 +82,7 @@ jobs:
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 10
package_json_file: web/package.json
run_install: false
- name: Setup NodeJS
@ -95,10 +95,12 @@ jobs:
- name: Web dependencies
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm install --frozen-lockfile
- name: Web style check
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm run lint
docker-compose-template:

View File

@ -46,7 +46,7 @@ jobs:
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 10
package_json_file: web/package.json
run_install: false
- name: Set up Node.js
@ -59,10 +59,12 @@ jobs:
- name: Install dependencies
if: env.FILES_CHANGED == 'true'
working-directory: ./web
run: pnpm install --frozen-lockfile
- name: Generate i18n translations
if: env.FILES_CHANGED == 'true'
working-directory: ./web
run: pnpm run auto-gen-i18n ${{ env.FILE_ARGS }}
- name: Create Pull Request

View File

@ -35,7 +35,7 @@ jobs:
if: steps.changed-files.outputs.any_changed == 'true'
uses: pnpm/action-setup@v4
with:
version: 10
package_json_file: web/package.json
run_install: false
- name: Setup Node.js
@ -48,8 +48,10 @@ jobs:
- name: Install dependencies
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm install --frozen-lockfile
- name: Run tests
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm test

.gitignore (vendored) · 2 lines changed
View File

@ -197,6 +197,8 @@ sdks/python-client/dify_client.egg-info
!.vscode/README.md
pyrightconfig.json
api/.vscode
# vscode Code History Extension
.history
.idea/

CLAUDE.md (new file) · 83 lines added
View File

@ -0,0 +1,83 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Dify is an open-source platform for developing LLM applications with an intuitive interface combining agentic AI workflows, RAG pipelines, agent capabilities, and model management.
The codebase consists of:
- **Backend API** (`/api`): Python Flask application with Domain-Driven Design architecture
- **Frontend Web** (`/web`): Next.js 15 application with TypeScript and React 19
- **Docker deployment** (`/docker`): Containerized deployment configurations
## Development Commands
### Backend (API)
All Python commands must be prefixed with `uv run --project api`:
```bash
# Start development servers
./dev/start-api # Start API server
./dev/start-worker # Start Celery worker
# Run tests
uv run --project api pytest # Run all tests
uv run --project api pytest tests/unit_tests/ # Unit tests only
uv run --project api pytest tests/integration_tests/ # Integration tests
# Code quality
./dev/reformat # Run all formatters and linters
uv run --project api ruff check --fix ./ # Fix linting issues
uv run --project api ruff format ./ # Format code
uv run --project api mypy . # Type checking
```
### Frontend (Web)
```bash
cd web
pnpm lint # Run ESLint
pnpm eslint-fix # Fix ESLint issues
pnpm test # Run Jest tests
```
## Testing Guidelines
### Backend Testing
- Use `pytest` for all backend tests
- Write tests first (TDD approach)
- Test structure: Arrange-Act-Assert
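As a minimal sketch of the Arrange-Act-Assert structure (the function and test names here are hypothetical, not taken from the repository):

```python
# Illustrative pytest test following Arrange-Act-Assert
def add(a: int, b: int) -> int:
    return a + b


def test_add_returns_sum() -> None:
    # Arrange: prepare the inputs
    a, b = 2, 3
    # Act: call the unit under test
    result = add(a, b)
    # Assert: verify the observable outcome
    assert result == 5
```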
## Code Style Requirements
### Python
- Use type hints for all functions and class attributes
- No `Any` types unless absolutely necessary
- Implement special methods (`__repr__`, `__str__`) appropriately
### TypeScript/JavaScript
- Strict TypeScript configuration
- ESLint with Prettier integration
- Avoid `any` type
## Important Notes
- **Environment Variables**: Always use UV for Python commands: `uv run --project api <command>`
- **Comments**: Only write meaningful comments that explain "why", not "what"
- **File Creation**: Always prefer editing existing files over creating new ones
- **Documentation**: Don't create documentation files unless explicitly requested
- **Code Quality**: Always run `./dev/reformat` before committing backend changes
## Common Development Tasks
### Adding a New API Endpoint
1. Create controller in `/api/controllers/`
2. Add service logic in `/api/services/`
3. Update routes in controller's `__init__.py`
4. Write tests in `/api/tests/`
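A minimal sketch of steps 1–3, following the flask_restful conventions visible elsewhere in this diff (the resource, route, and service here are hypothetical):

```python
# api/controllers/console/example.py — hypothetical controller
from flask_restful import Resource, reqparse

from controllers.console import api
from services.example_service import ExampleService  # hypothetical service module


class ExampleApi(Resource):
    def post(self):
        parser = reqparse.RequestParser()
        parser.add_argument("name", type=str, required=True, location="json")
        args = parser.parse_args()
        # Step 2: delegate business logic to the service layer
        return ExampleService.create(name=args["name"]), 201


# Step 3: register the route (normally done in the controller package's __init__.py)
api.add_resource(ExampleApi, "/examples")
```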
## Project-Specific Conventions
- All async tasks use Celery with Redis as broker

View File

@ -225,7 +225,8 @@ Deploy Dify to AWS with [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Using Alibaba Cloud Computing Nest

View File

@ -208,7 +208,8 @@ docker compose up -d
##### AWS
- [AWS CDK بواسطة @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK بواسطة @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK بواسطة @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### استخدام Alibaba Cloud للنشر
[بسرعة نشر Dify إلى سحابة علي بابا مع عش الحوسبة السحابية علي بابا](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)

View File

@ -225,7 +225,8 @@ GitHub-এ ডিফাইকে স্টার দিয়ে রাখুন
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud ব্যবহার করে ডিপ্লয়

View File

@ -223,7 +223,8 @@ docker compose up -d
使用 [CDK](https://aws.amazon.com/cdk/) 将 Dify 部署到 AWS
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### 使用 阿里云计算巢 部署

View File

@ -220,7 +220,8 @@ Stellen Sie Dify mit nur einem Klick mithilfe von [terraform](https://www.terraf
Bereitstellung von Dify auf AWS mit [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -220,7 +220,8 @@ Despliega Dify en una plataforma en la nube con un solo clic utilizando [terrafo
Despliegue Dify en AWS usando [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK por @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK por @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK por @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -218,7 +218,8 @@ Déployez Dify sur une plateforme cloud en un clic en utilisant [terraform](http
Déployez Dify sur AWS en utilisant [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK par @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK par @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK par @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -219,7 +219,8 @@ docker compose up -d
[CDK](https://aws.amazon.com/cdk/) を使用して、DifyをAWSにデプロイします
##### AWS
- [@KevinZhaoによるAWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [@KevinZhaoによるAWS CDK (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [@tmokmssによるAWS CDK (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)

View File

@ -218,7 +218,8 @@ wa'logh nIqHom neH ghun deployment toy'wI' [terraform](https://www.terraform.io/
wa'logh nIqHom neH ghun deployment toy'wI' [CDK](https://aws.amazon.com/cdk/) lo'laH.
##### AWS
- [AWS CDK qachlot @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK qachlot @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK qachlot @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -212,7 +212,8 @@ Dify를 Kubernetes에 배포하고 프리미엄 스케일링 설정을 구성했
[CDK](https://aws.amazon.com/cdk/)를 사용하여 AWS에 Dify 배포
##### AWS
- [KevinZhao의 AWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [KevinZhao의 AWS CDK (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [tmokmss의 AWS CDK (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -217,7 +217,8 @@ Implante o Dify na Plataforma Cloud com um único clique usando [terraform](http
Implante o Dify na AWS usando [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK por @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK por @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK por @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -218,7 +218,8 @@ namestite Dify v Cloud Platform z enim klikom z uporabo [terraform](https://www.
Uvedite Dify v AWS z uporabo [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -211,7 +211,8 @@ Dify'ı bulut platformuna tek tıklamayla dağıtın [terraform](https://www.ter
[CDK](https://aws.amazon.com/cdk/) kullanarak Dify'ı AWS'ye dağıtın
##### AWS
- [AWS CDK tarafından @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK tarafından @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK tarafından @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -223,7 +223,8 @@ Dify 的所有功能都提供相應的 API,因此您可以輕鬆地將 Dify
### AWS
- [由 @KevinZhao 提供的 AWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [由 @KevinZhao 提供的 AWS CDK (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [由 @tmokmss 提供的 AWS CDK (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### 使用 阿里云计算巢進行部署

View File

@ -213,7 +213,8 @@ Triển khai Dify lên nền tảng đám mây với một cú nhấp chuột b
Triển khai Dify trên AWS bằng [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK bởi @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK bởi @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK bởi @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
#### Alibaba Cloud

View File

@ -42,6 +42,15 @@ REDIS_PORT=6379
REDIS_USERNAME=
REDIS_PASSWORD=difyai123456
REDIS_USE_SSL=false
# SSL configuration for Redis (when REDIS_USE_SSL=true)
REDIS_SSL_CERT_REQS=CERT_NONE
# Options: CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
REDIS_SSL_CA_CERTS=
# Path to CA certificate file for SSL verification
REDIS_SSL_CERTFILE=
# Path to client certificate file for SSL authentication
REDIS_SSL_KEYFILE=
# Path to client private key file for SSL authentication
REDIS_DB=0
# redis Sentinel configuration.
@ -469,6 +478,13 @@ API_WORKFLOW_NODE_EXECUTION_REPOSITORY=repositories.sqlalchemy_api_workflow_node
# API workflow run repository implementation
API_WORKFLOW_RUN_REPOSITORY=repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository
# Workflow log cleanup configuration
# Enable automatic cleanup of workflow run logs to manage database size
WORKFLOW_LOG_CLEANUP_ENABLED=true
# Number of days to retain workflow run logs (default: 30 days)
WORKFLOW_LOG_RETENTION_DAYS=30
# Batch size for workflow log cleanup operations (default: 100)
WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
# App configuration
APP_MAX_EXECUTION_TIME=1200
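
The new `REDIS_SSL_*` variables above map onto the TLS options of the redis-py client. A minimal sketch of how such values are typically passed to redis-py (illustrative only; the actual wiring lives in Dify's Redis extension, which is not part of this excerpt):

```python
import ssl

import redis

# Values mirror the environment variables introduced above
client = redis.Redis(
    host="localhost",
    port=6379,
    password="difyai123456",
    ssl=True,                     # REDIS_USE_SSL
    ssl_cert_reqs=ssl.CERT_NONE,  # REDIS_SSL_CERT_REQS
    ssl_ca_certs=None,            # REDIS_SSL_CA_CERTS
    ssl_certfile=None,            # REDIS_SSL_CERTFILE
    ssl_keyfile=None,             # REDIS_SSL_KEYFILE
)
client.ping()
```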

View File

@ -4,7 +4,7 @@ FROM python:3.12-slim-bookworm AS base
WORKDIR /app/api
# Install uv
ENV UV_VERSION=0.7.11
ENV UV_VERSION=0.8.9
RUN pip install --no-cache-dir uv==${UV_VERSION}

View File

@ -74,7 +74,7 @@
10. If you need to handle and debug the async tasks (e.g. dataset importing and documents indexing), please start the worker service.
```bash
uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin
uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage
```
Addition, if you want to debug the celery scheduled tasks, you can use the following command in another terminal:

View File

@ -51,6 +51,7 @@ def initialize_extensions(app: DifyApp):
ext_login,
ext_mail,
ext_migrate,
ext_orjson,
ext_otel,
ext_proxy_fix,
ext_redis,
@ -67,6 +68,7 @@ def initialize_extensions(app: DifyApp):
ext_logging,
ext_warnings,
ext_import_modules,
ext_orjson,
ext_set_secretkey,
ext_compress,
ext_code_based_extension,
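
The two hunks above register a new `ext_orjson` extension. Assuming it uses the flask-orjson package referenced in commit e340fccafb, the extension presumably swaps Flask's default JSON provider for an orjson-backed one; a rough sketch of that idea (not the actual extension code):

```python
# Sketch of an orjson-backed JSON provider extension (illustrative only)
from flask import Flask
from flask_orjson import OrjsonProvider


def init_app(app: Flask) -> None:
    # Replace Flask's default JSON provider with the faster orjson implementation
    app.json = OrjsonProvider(app)
```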

View File

@ -36,6 +36,7 @@ from services.account_service import AccountService, RegisterService, TenantServ
from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs
from services.plugin.data_migration import PluginDataMigration
from services.plugin.plugin_migration import PluginMigration
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
@click.command("reset-password", help="Reset the account password.")
@ -1202,3 +1203,138 @@ def setup_system_tool_oauth_client(provider, client_params):
db.session.add(oauth_client)
db.session.commit()
click.echo(click.style(f"OAuth client params setup successfully. id: {oauth_client.id}", fg="green"))
def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]:
"""
Find draft variables that reference non-existent apps.
Args:
batch_size: Maximum number of orphaned app IDs to return
Returns:
List of app IDs that have draft variables but don't exist in the apps table
"""
query = """
SELECT DISTINCT wdv.app_id
FROM workflow_draft_variables AS wdv
WHERE NOT EXISTS(
SELECT 1 FROM apps WHERE apps.id = wdv.app_id
)
LIMIT :batch_size
"""
with db.engine.connect() as conn:
result = conn.execute(sa.text(query), {"batch_size": batch_size})
return [row[0] for row in result]
def _count_orphaned_draft_variables() -> dict[str, Any]:
"""
Count orphaned draft variables by app.
Returns:
Dictionary with statistics about orphaned variables
"""
query = """
SELECT
wdv.app_id,
COUNT(*) as variable_count
FROM workflow_draft_variables AS wdv
WHERE NOT EXISTS(
SELECT 1 FROM apps WHERE apps.id = wdv.app_id
)
GROUP BY wdv.app_id
ORDER BY variable_count DESC
"""
with db.engine.connect() as conn:
result = conn.execute(sa.text(query))
orphaned_by_app = {row[0]: row[1] for row in result}
total_orphaned = sum(orphaned_by_app.values())
app_count = len(orphaned_by_app)
return {
"total_orphaned_variables": total_orphaned,
"orphaned_app_count": app_count,
"orphaned_by_app": orphaned_by_app,
}
@click.command()
@click.option("--dry-run", is_flag=True, help="Show what would be deleted without actually deleting")
@click.option("--batch-size", default=1000, help="Number of records to process per batch (default 1000)")
@click.option("--max-apps", default=None, type=int, help="Maximum number of apps to process (default: no limit)")
@click.option("-f", "--force", is_flag=True, help="Skip user confirmation and force the command to execute.")
def cleanup_orphaned_draft_variables(
dry_run: bool,
batch_size: int,
max_apps: int | None,
force: bool = False,
):
"""
Clean up orphaned draft variables from the database.
This script finds and removes draft variables that belong to apps
that no longer exist in the database.
"""
logger = logging.getLogger(__name__)
# Get statistics
stats = _count_orphaned_draft_variables()
logger.info("Found %s orphaned draft variables", stats["total_orphaned_variables"])
logger.info("Across %s non-existent apps", stats["orphaned_app_count"])
if stats["total_orphaned_variables"] == 0:
logger.info("No orphaned draft variables found. Exiting.")
return
if dry_run:
logger.info("DRY RUN: Would delete the following:")
for app_id, count in sorted(stats["orphaned_by_app"].items(), key=lambda x: x[1], reverse=True)[
:10
]: # Show top 10
logger.info(" App %s: %s variables", app_id, count)
if len(stats["orphaned_by_app"]) > 10:
logger.info(" ... and %s more apps", len(stats["orphaned_by_app"]) - 10)
return
# Confirm deletion
if not force:
click.confirm(
f"Are you sure you want to delete {stats['total_orphaned_variables']} "
f"orphaned draft variables from {stats['orphaned_app_count']} apps?",
abort=True,
)
total_deleted = 0
processed_apps = 0
while True:
if max_apps and processed_apps >= max_apps:
logger.info("Reached maximum app limit (%s). Stopping.", max_apps)
break
orphaned_app_ids = _find_orphaned_draft_variables(batch_size=10)
if not orphaned_app_ids:
logger.info("No more orphaned draft variables found.")
break
for app_id in orphaned_app_ids:
if max_apps and processed_apps >= max_apps:
break
try:
deleted_count = delete_draft_variables_batch(app_id, batch_size)
total_deleted += deleted_count
processed_apps += 1
logger.info("Deleted %s variables for app %s", deleted_count, app_id)
except Exception:
logger.exception("Error processing app %s", app_id)
continue
logger.info("Cleanup completed. Total deleted: %s variables across %s apps", total_deleted, processed_apps)

View File

@ -552,12 +552,18 @@ class RepositoryConfig(BaseSettings):
"""
CORE_WORKFLOW_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowExecution. Specify as a module path",
description="Repository implementation for WorkflowExecution. Options: "
"'core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository' (default), "
"'core.repositories.celery_workflow_execution_repository.CeleryWorkflowExecutionRepository'",
default="core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository",
)
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowNodeExecution. Specify as a module path",
description="Repository implementation for WorkflowNodeExecution. Options: "
"'core.repositories.sqlalchemy_workflow_node_execution_repository."
"SQLAlchemyWorkflowNodeExecutionRepository' (default), "
"'core.repositories.celery_workflow_node_execution_repository."
"CeleryWorkflowNodeExecutionRepository'",
default="core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository",
)
@ -962,6 +968,14 @@ class AccountConfig(BaseSettings):
)
class WorkflowLogConfig(BaseSettings):
WORKFLOW_LOG_CLEANUP_ENABLED: bool = Field(default=True, description="Enable workflow run log cleanup")
WORKFLOW_LOG_RETENTION_DAYS: int = Field(default=30, description="Retention days for workflow run logs")
WORKFLOW_LOG_CLEANUP_BATCH_SIZE: int = Field(
default=100, description="Batch size for workflow run log cleanup operations"
)
class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
@ -997,5 +1011,6 @@ class FeatureConfig(
HostedServiceConfig,
CeleryBeatConfig,
CeleryScheduleTasksConfig,
WorkflowLogConfig,
):
pass
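
The `WorkflowLogConfig` fields above back the periodic log cleanup added in commit 75199442c1. A small sketch of the retention cutoff such a task computes (illustrative only; the actual Celery task is not part of this excerpt):

```python
from datetime import datetime, timedelta, timezone

# Values mirror the WorkflowLogConfig defaults above
WORKFLOW_LOG_RETENTION_DAYS = 30
WORKFLOW_LOG_CLEANUP_BATCH_SIZE = 100

# Runs older than the cutoff are deleted in batches of
# WORKFLOW_LOG_CLEANUP_BATCH_SIZE rows per iteration.
cutoff = datetime.now(timezone.utc) - timedelta(days=WORKFLOW_LOG_RETENTION_DAYS)
print(f"delete workflow run logs created before {cutoff:%Y-%m-%d}")
```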

View File

@ -39,6 +39,26 @@ class RedisConfig(BaseSettings):
default=False,
)
REDIS_SSL_CERT_REQS: str = Field(
description="SSL certificate requirements (CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED)",
default="CERT_NONE",
)
REDIS_SSL_CA_CERTS: Optional[str] = Field(
description="Path to the CA certificate file for SSL verification",
default=None,
)
REDIS_SSL_CERTFILE: Optional[str] = Field(
description="Path to the client certificate file for SSL authentication",
default=None,
)
REDIS_SSL_KEYFILE: Optional[str] = Field(
description="Path to the client private key file for SSL authentication",
default=None,
)
REDIS_USE_SENTINEL: Optional[bool] = Field(
description="Enable Redis Sentinel mode for high availability",
default=False,

View File

@ -1,5 +1,7 @@
from werkzeug.exceptions import HTTPException
from libs.exception import BaseHTTPException
class FilenameNotExistsError(HTTPException):
code = 400
@ -9,3 +11,27 @@ class FilenameNotExistsError(HTTPException):
class RemoteFileUploadError(HTTPException):
code = 400
description = "Error uploading remote file."
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400

View File

@ -1,3 +1,4 @@
import contextlib
import mimetypes
import os
import platform
@ -65,10 +66,8 @@ def guess_file_info_from_response(response: httpx.Response):
# Use python-magic to guess MIME type if still unknown or generic
if mimetype == "application/octet-stream" and magic is not None:
try:
with contextlib.suppress(magic.MagicException):
mimetype = magic.from_buffer(response.content[:1024], mime=True)
except magic.MagicException:
pass
extension = os.path.splitext(filename)[1]

View File

@ -1,11 +1,12 @@
from typing import Literal
from flask import request
from flask_login import current_user
from flask_restful import Resource, marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden
from controllers.common.errors import NoFileUploadedError, TooManyFilesError
from controllers.console import api
from controllers.console.app.error import NoFileUploadedError
from controllers.console.datasets.error import TooManyFilesError
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
@ -25,7 +26,7 @@ class AnnotationReplyActionApi(Resource):
@login_required
@account_initialization_required
@cloud_edition_billing_resource_check("annotation")
def post(self, app_id, action):
def post(self, app_id, action: Literal["enable", "disable"]):
if not current_user.is_editor:
raise Forbidden()
@ -39,8 +40,6 @@ class AnnotationReplyActionApi(Resource):
result = AppAnnotationService.enable_app_annotation(args, app_id)
elif action == "disable":
result = AppAnnotationService.disable_app_annotation(app_id)
else:
raise ValueError("Unsupported annotation reply action")
return result, 200

View File

@ -1,6 +1,7 @@
import logging
import flask_login
from flask import request
from flask_restful import Resource, reqparse
from werkzeug.exceptions import InternalServerError, NotFound
@ -24,6 +25,7 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
@ -115,6 +117,10 @@ class ChatMessageApi(Resource):
streaming = args["response_mode"] != "blocking"
args["auto_generate_name"] = False
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
account = flask_login.current_user
try:

View File

@ -79,18 +79,6 @@ class ProviderNotSupportSpeechToTextError(BaseHTTPException):
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class DraftWorkflowNotExist(BaseHTTPException):
error_code = "draft_workflow_not_exist"
description = "Draft workflow need to be initialized."

View File

@ -1,3 +1,5 @@
from collections.abc import Sequence
from flask_login import current_user
from flask_restful import Resource, reqparse
@ -10,6 +12,8 @@ from controllers.console.app.error import (
)
from controllers.console.wraps import account_initialization_required, setup_required
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
from core.llm_generator.llm_generator import LLMGenerator
from core.model_runtime.errors.invoke import InvokeError
from libs.login import login_required
@ -107,6 +111,121 @@ class RuleStructuredOutputGenerateApi(Resource):
return structured_output
class InstructionGenerateApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("flow_id", type=str, required=True, default="", location="json")
parser.add_argument("node_id", type=str, required=False, default="", location="json")
parser.add_argument("current", type=str, required=False, default="", location="json")
parser.add_argument("language", type=str, required=False, default="javascript", location="json")
parser.add_argument("instruction", type=str, required=True, nullable=False, location="json")
parser.add_argument("model_config", type=dict, required=True, nullable=False, location="json")
parser.add_argument("ideal_output", type=str, required=False, default="", location="json")
args = parser.parse_args()
code_template = (
Python3CodeProvider.get_default_code()
if args["language"] == "python"
else (JavascriptCodeProvider.get_default_code())
if args["language"] == "javascript"
else ""
)
try:
# Generate from nothing for a workflow node
if (args["current"] == code_template or args["current"] == "") and args["node_id"] != "":
from models import App, db
from services.workflow_service import WorkflowService
app = db.session.query(App).where(App.id == args["flow_id"]).first()
if not app:
return {"error": f"app {args['flow_id']} not found"}, 400
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
return {"error": f"workflow {args['flow_id']} not found"}, 400
nodes: Sequence = workflow.graph_dict["nodes"]
node = [node for node in nodes if node["id"] == args["node_id"]]
if len(node) == 0:
return {"error": f"node {args['node_id']} not found"}, 400
node_type = node[0]["data"]["type"]
match node_type:
case "llm":
return LLMGenerator.generate_rule_config(
current_user.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "agent":
return LLMGenerator.generate_rule_config(
current_user.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "code":
return LLMGenerator.generate_code(
tenant_id=current_user.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["language"],
)
case _:
return {"error": f"invalid node type: {node_type}"}
if args["node_id"] == "" and args["current"] != "": # For legacy app without a workflow
return LLMGenerator.instruction_modify_legacy(
tenant_id=current_user.current_tenant_id,
flow_id=args["flow_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
if args["node_id"] != "" and args["current"] != "": # For workflow node
return LLMGenerator.instruction_modify_workflow(
tenant_id=current_user.current_tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
return {"error": "incompatible parameters"}, 400
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
class InstructionGenerationTemplateApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self) -> dict:
parser = reqparse.RequestParser()
parser.add_argument("type", type=str, required=True, default=False, location="json")
args = parser.parse_args()
match args["type"]:
case "prompt":
from core.llm_generator.prompts import INSTRUCTION_GENERATE_TEMPLATE_PROMPT
return {"data": INSTRUCTION_GENERATE_TEMPLATE_PROMPT}
case "code":
from core.llm_generator.prompts import INSTRUCTION_GENERATE_TEMPLATE_CODE
return {"data": INSTRUCTION_GENERATE_TEMPLATE_CODE}
case _:
raise ValueError(f"Invalid type: {args['type']}")
api.add_resource(RuleGenerateApi, "/rule-generate")
api.add_resource(RuleCodeGenerateApi, "/rule-code-generate")
api.add_resource(RuleStructuredOutputGenerateApi, "/rule-structured-output-generate")
api.add_resource(InstructionGenerateApi, "/instruction-generate")
api.add_resource(InstructionGenerationTemplateApi, "/instruction-generate/template")
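
Based on the `reqparse` arguments above, a request body for the new instruction-generate endpoint carries fields along these lines (values are placeholders, and the shape of `model_config` is assumed):

```python
payload = {
    "flow_id": "<app id>",
    "node_id": "<workflow node id, or empty for legacy apps>",
    "current": "",              # existing prompt or code, if any
    "language": "javascript",   # or "python" for code nodes
    "instruction": "Summarize the user's question in one sentence.",
    "model_config": {"provider": "openai", "name": "gpt-4o"},  # shape assumed
    "ideal_output": "",
}
```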

View File

@ -27,7 +27,7 @@ from fields.conversation_fields import annotation_fields, message_detail_fields
from libs.helper import uuid_value
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.login import login_required
from models.model import AppMode, Conversation, Message, MessageAnnotation
from models.model import AppMode, Conversation, Message, MessageAnnotation, MessageFeedback
from services.annotation_service import AppAnnotationService
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
@ -124,17 +124,34 @@ class MessageFeedbackApi(Resource):
parser.add_argument("rating", type=str, choices=["like", "dislike", None], location="json")
args = parser.parse_args()
try:
MessageService.create_feedback(
app_model=app_model,
message_id=str(args["message_id"]),
user=current_user,
rating=args.get("rating"),
content=None,
)
except MessageNotExistsError:
message_id = str(args["message_id"])
message = db.session.query(Message).filter(Message.id == message_id, Message.app_id == app_model.id).first()
if not message:
raise NotFound("Message Not Exists.")
feedback = message.admin_feedback
if not args["rating"] and feedback:
db.session.delete(feedback)
elif args["rating"] and feedback:
feedback.rating = args["rating"]
elif not args["rating"] and not feedback:
raise ValueError("rating cannot be None when feedback not exists")
else:
feedback = MessageFeedback(
app_id=app_model.id,
conversation_id=message.conversation_id,
message_id=message.id,
rating=args["rating"],
from_source="admin",
from_account_id=current_user.id,
)
db.session.add(feedback)
db.session.commit()
return {"result": "success"}

View File

@ -23,6 +23,7 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import File
from core.helper.trace_id_helper import get_external_trace_id
from extensions.ext_database import db
from factories import file_factory, variable_factory
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
@ -185,6 +186,10 @@ class AdvancedChatDraftWorkflowRunApi(Resource):
args = parser.parse_args()
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
try:
response = AppGenerateService.generate(
app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.DEBUGGER, streaming=True
@ -373,6 +378,10 @@ class DraftWorkflowRunApi(Resource):
parser.add_argument("files", type=list, required=False, location="json")
args = parser.parse_args()
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
try:
response = AppGenerateService.generate(
app_model=app_model,

View File

@ -163,11 +163,11 @@ class WorkflowVariableCollectionApi(Resource):
draft_var_srv = WorkflowDraftVariableService(
session=session,
)
workflow_vars = draft_var_srv.list_variables_without_values(
app_id=app_model.id,
page=args.page,
limit=args.limit,
)
workflow_vars = draft_var_srv.list_variables_without_values(
app_id=app_model.id,
page=args.page,
limit=args.limit,
)
return workflow_vars

View File

@ -1,6 +1,6 @@
import logging
from argparse import ArgumentTypeError
from typing import cast
from typing import Literal, cast
from flask import request
from flask_login import current_user
@ -758,7 +758,7 @@ class DocumentProcessingApi(DocumentResource):
@login_required
@account_initialization_required
@cloud_edition_billing_rate_limit_check("knowledge")
def patch(self, dataset_id, document_id, action):
def patch(self, dataset_id, document_id, action: Literal["pause", "resume"]):
dataset_id = str(dataset_id)
document_id = str(document_id)
document = self.get_document(dataset_id, document_id)
@ -784,8 +784,6 @@ class DocumentProcessingApi(DocumentResource):
document.paused_at = None
document.is_paused = False
db.session.commit()
else:
raise InvalidActionError()
return {"result": "success"}, 200
@ -840,7 +838,7 @@ class DocumentStatusApi(DocumentResource):
@account_initialization_required
@cloud_edition_billing_resource_check("vector_space")
@cloud_edition_billing_rate_limit_check("knowledge")
def patch(self, dataset_id, action):
def patch(self, dataset_id, action: Literal["enable", "disable", "archive", "un_archive"]):
dataset_id = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id)
if dataset is None:

View File

@ -1,30 +1,6 @@
from libs.exception import BaseHTTPException
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class DatasetNotInitializedError(BaseHTTPException):
error_code = "dataset_not_initialized"
description = "The dataset is still being initialized or indexing. Please wait a moment."

View File

@ -1,3 +1,5 @@
from typing import Literal
from flask_login import current_user
from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import NotFound
@ -100,7 +102,7 @@ class DatasetMetadataBuiltInFieldActionApi(Resource):
@login_required
@account_initialization_required
@enterprise_license_required
def post(self, dataset_id, action):
def post(self, dataset_id, action: Literal["enable", "disable"]):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
if dataset is None:

View File

@ -39,7 +39,7 @@ class UploadFileApi(Resource):
data_source_info = document.data_source_info_dict
if data_source_info and "upload_file_id" in data_source_info:
file_id = data_source_info["upload_file_id"]
upload_file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first()
upload_file = db.session.query(UploadFile).where(UploadFile.id == file_id).first()
if not upload_file:
raise NotFound("UploadFile not found.")
else:

View File

@ -76,30 +76,6 @@ class EmailSendIpLimitError(BaseHTTPException):
code = 429
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class UnauthorizedAndForceLogout(BaseHTTPException):
error_code = "unauthorized_and_force_logout"
description = "Unauthorized and force logout."

View File

@ -8,7 +8,13 @@ from werkzeug.exceptions import Forbidden
import services
from configs import dify_config
from constants import DOCUMENT_EXTENSIONS
from controllers.common.errors import FilenameNotExistsError
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
@ -18,13 +24,6 @@ from fields.file_fields import file_fields, upload_config_fields
from libs.login import login_required
from services.file_service import FileService
from .error import (
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
PREVIEW_WORDS_LIMIT = 3000

View File

@ -7,18 +7,17 @@ from flask_restful import Resource, marshal_with, reqparse
import services
from controllers.common import helpers
from controllers.common.errors import RemoteFileUploadError
from controllers.common.errors import (
FileTooLargeError,
RemoteFileUploadError,
UnsupportedFileTypeError,
)
from core.file import helpers as file_helpers
from core.helper import ssrf_proxy
from fields.file_fields import file_fields_with_signed_url, remote_file_info_fields
from models.account import Account
from services.file_service import FileService
from .error import (
FileTooLargeError,
UnsupportedFileTypeError,
)
class RemoteFileInfoApi(Resource):
@marshal_with(remote_file_info_fields)

View File

@ -32,7 +32,7 @@ class VersionApi(Resource):
return result
try:
response = requests.get(check_update_url, {"current_version": args.get("current_version")})
response = requests.get(check_update_url, {"current_version": args.get("current_version")}, timeout=(3, 10))
except Exception as error:
logging.warning("Check update version error: %s.", str(error))
result["version"] = args.get("current_version")
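For context, a brief illustrative sketch (URL and parameters assumed) of what the added tuple means in requests: the first element is the connect timeout and the second the read timeout, both in seconds.

```python
import requests

# (connect_timeout, read_timeout) in seconds, mirroring the timeout added above.
response = requests.get(
    "https://updates.example.com/version",  # placeholder URL, for illustration only
    params={"current_version": "1.0.0"},
    timeout=(3, 10),
)
```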

View File

@ -862,6 +862,10 @@ class ToolProviderMCPApi(Resource):
parser.add_argument("icon_type", type=str, required=True, nullable=False, location="json")
parser.add_argument("icon_background", type=str, required=False, nullable=True, location="json", default="")
parser.add_argument("server_identifier", type=str, required=True, nullable=False, location="json")
parser.add_argument("timeout", type=float, required=False, nullable=False, location="json", default=30)
parser.add_argument(
"sse_read_timeout", type=float, required=False, nullable=False, location="json", default=300
)
args = parser.parse_args()
user = current_user
if not is_valid_url(args["server_url"]):
@ -876,6 +880,8 @@ class ToolProviderMCPApi(Resource):
icon_background=args["icon_background"],
user_id=user.id,
server_identifier=args["server_identifier"],
timeout=args["timeout"],
sse_read_timeout=args["sse_read_timeout"],
)
)
@ -891,6 +897,8 @@ class ToolProviderMCPApi(Resource):
parser.add_argument("icon_background", type=str, required=False, nullable=True, location="json")
parser.add_argument("provider_id", type=str, required=True, nullable=False, location="json")
parser.add_argument("server_identifier", type=str, required=True, nullable=False, location="json")
parser.add_argument("timeout", type=float, required=False, nullable=True, location="json")
parser.add_argument("sse_read_timeout", type=float, required=False, nullable=True, location="json")
args = parser.parse_args()
if not is_valid_url(args["server_url"]):
if "[__HIDDEN__]" in args["server_url"]:
@ -906,6 +914,8 @@ class ToolProviderMCPApi(Resource):
icon_type=args["icon_type"],
icon_background=args["icon_background"],
server_identifier=args["server_identifier"],
timeout=args.get("timeout"),
sse_read_timeout=args.get("sse_read_timeout"),
)
return {"result": "success"}

View File

@ -7,15 +7,15 @@ from sqlalchemy import select
from werkzeug.exceptions import Unauthorized
import services
from controllers.common.errors import FilenameNotExistsError
from controllers.console import api
from controllers.console.admin import admin_required
from controllers.console.datasets.error import (
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.console import api
from controllers.console.admin import admin_required
from controllers.console.error import AccountNotLinkTenantError
from controllers.console.wraps import (
account_initialization_required,

View File

@ -1,7 +0,0 @@
from libs.exception import BaseHTTPException
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415

View File

@ -5,8 +5,8 @@ from flask_restful import Resource, reqparse
from werkzeug.exceptions import NotFound
import services
from controllers.common.errors import UnsupportedFileTypeError
from controllers.files import api
from controllers.files.error import UnsupportedFileTypeError
from services.account_service import TenantService
from services.file_service import FileService

View File

@ -4,8 +4,8 @@ from flask import Response
from flask_restful import Resource, reqparse
from werkzeug.exceptions import Forbidden, NotFound
from controllers.common.errors import UnsupportedFileTypeError
from controllers.files import api
from controllers.files.error import UnsupportedFileTypeError
from core.tools.signature import verify_tool_file_signature
from core.tools.tool_file_manager import ToolFileManager
from models import db as global_db

View File

@ -5,11 +5,13 @@ from flask_restful import Resource, marshal_with
from werkzeug.exceptions import Forbidden
import services
from controllers.common.errors import (
FileTooLargeError,
UnsupportedFileTypeError,
)
from controllers.console.wraps import setup_required
from controllers.files import api
from controllers.files.error import UnsupportedFileTypeError
from controllers.inner_api.plugin.wraps import get_user
from controllers.service_api.app.error import FileTooLargeError
from core.file.helpers import verify_plugin_file_signature
from core.tools.tool_file_manager import ToolFileManager
from fields.file_fields import file_fields

View File

@ -1,3 +1,5 @@
from typing import Literal
from flask import request
from flask_restful import Resource, marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden
@ -15,7 +17,7 @@ from services.annotation_service import AppAnnotationService
class AnnotationReplyActionApi(Resource):
@validate_app_token
def post(self, app_model: App, action):
def post(self, app_model: App, action: Literal["enable", "disable"]):
parser = reqparse.RequestParser()
parser.add_argument("score_threshold", required=True, type=float, location="json")
parser.add_argument("embedding_provider_name", required=True, type=str, location="json")
@ -25,8 +27,6 @@ class AnnotationReplyActionApi(Resource):
result = AppAnnotationService.enable_app_annotation(args, app_model.id)
elif action == "disable":
result = AppAnnotationService.disable_app_annotation(app_model.id)
else:
raise ValueError("Unsupported annotation reply action")
return result, 200

View File

@ -1,5 +1,3 @@
import json
from flask_restful import Resource, marshal_with, reqparse
from flask_restful.inputs import int_range
from sqlalchemy.orm import Session
@ -136,12 +134,15 @@ class ConversationVariableDetailApi(Resource):
variable_id = str(variable_id)
parser = reqparse.RequestParser()
parser.add_argument("value", required=True, location="json")
# The identity lambda passes the already-typed JSON value through unchanged;
# without it, reqparse coerces the value to a string, which json.loads can
# no longer parse back.
parser.add_argument("value", required=True, location="json", type=lambda x: x)
args = parser.parse_args()
try:
return ConversationService.update_conversation_variable(
app_model, conversation_id, variable_id, end_user, json.loads(args["value"])
app_model, conversation_id, variable_id, end_user, args["value"]
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
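For illustration, a minimal sketch (resource name assumed) showing how the identity `type` callable keeps an already-typed JSON value intact in a flask_restful handler:

```python
from flask_restful import Resource, reqparse

class ConversationVariableExample(Resource):  # hypothetical resource, for illustration only
    def put(self):
        parser = reqparse.RequestParser()
        # The identity callable hands the parsed JSON value (list, dict, number, ...)
        # through unchanged instead of coercing it to a string.
        parser.add_argument("value", required=True, location="json", type=lambda x: x)
        args = parser.parse_args()
        return {"value": args["value"]}  # e.g. [1, 2, 3] stays a list
```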

View File

@ -85,30 +85,6 @@ class ProviderNotSupportSpeechToTextError(BaseHTTPException):
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class FileNotFoundError(BaseHTTPException):
error_code = "file_not_found"
description = "The requested file was not found."

View File

@ -2,14 +2,14 @@ from flask import request
from flask_restful import Resource, marshal_with
import services
from controllers.common.errors import FilenameNotExistsError
from controllers.service_api import api
from controllers.service_api.app.error import (
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.service_api import api
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
from fields.file_fields import file_fields
from models.model import App, EndUser

View File

@ -1,3 +1,5 @@
from typing import Literal
from flask import request
from flask_restful import marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden, NotFound
@ -358,14 +360,14 @@ class DatasetApi(DatasetApiResource):
class DocumentStatusApi(DatasetApiResource):
"""Resource for batch document status operations."""
def patch(self, tenant_id, dataset_id, action):
def patch(self, tenant_id, dataset_id, action: Literal["enable", "disable", "archive", "un_archive"]):
"""
Batch update document status.
Args:
tenant_id: tenant id
dataset_id: dataset id
action: action to perform (enable, disable, archive, un_archive)
action: action to perform (Literal["enable", "disable", "archive", "un_archive"])
Returns:
dict: A dictionary with a key 'result' and a value 'success'

View File

@ -6,15 +6,15 @@ from sqlalchemy import desc, select
from werkzeug.exceptions import Forbidden, NotFound
import services
from controllers.common.errors import FilenameNotExistsError
from controllers.service_api import api
from controllers.service_api.app.error import (
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
ProviderNotInitializeError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.service_api import api
from controllers.service_api.app.error import ProviderNotInitializeError
from controllers.service_api.dataset.error import (
ArchivedDocumentImmutableError,
DocumentIndexingError,

View File

@ -1,30 +1,6 @@
from libs.exception import BaseHTTPException
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class DatasetNotInitializedError(BaseHTTPException):
error_code = "dataset_not_initialized"
description = "The dataset is still being initialized or indexing. Please wait a moment."

View File

@ -1,3 +1,5 @@
from typing import Literal
from flask_login import current_user # type: ignore
from flask_restful import marshal, reqparse
from werkzeug.exceptions import NotFound
@ -77,7 +79,7 @@ class DatasetMetadataBuiltInFieldServiceApi(DatasetApiResource):
class DatasetMetadataBuiltInFieldActionServiceApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, action):
def post(self, tenant_id, dataset_id, action: Literal["enable", "disable"]):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
if dataset is None:

View File

@ -97,30 +97,6 @@ class ProviderNotSupportSpeechToTextError(BaseHTTPException):
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class WebAppAuthRequiredError(BaseHTTPException):
error_code = "web_sso_auth_required"
description = "Web app authentication required."

View File

@ -2,8 +2,13 @@ from flask import request
from flask_restful import marshal_with
import services
from controllers.common.errors import FilenameNotExistsError
from controllers.web.error import FileTooLargeError, NoFileUploadedError, TooManyFilesError, UnsupportedFileTypeError
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.web.wraps import WebApiResource
from fields.file_fields import file_fields
from services.file_service import FileService

View File

@ -5,15 +5,17 @@ from flask_restful import marshal_with, reqparse
import services
from controllers.common import helpers
from controllers.common.errors import RemoteFileUploadError
from controllers.common.errors import (
FileTooLargeError,
RemoteFileUploadError,
UnsupportedFileTypeError,
)
from controllers.web.wraps import WebApiResource
from core.file import helpers as file_helpers
from core.helper import ssrf_proxy
from fields.file_fields import file_fields_with_signed_url, remote_file_info_fields
from services.file_service import FileService
from .error import FileTooLargeError, UnsupportedFileTypeError
class RemoteFileInfoApi(WebApiResource):
@marshal_with(remote_file_info_fields)

View File

@ -74,6 +74,7 @@ from core.workflow.system_variable import SystemVariable
from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Conversation, EndUser, Message, MessageFile
from models.account import Account
from models.enums import CreatorUserRole
@ -568,7 +569,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
self._base_task_pipeline.queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
def _handle_workflow_partial_success_event(
self,
@ -600,7 +601,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
self._base_task_pipeline.queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
def _handle_workflow_failed_event(
self,
@ -845,7 +846,7 @@ class AdvancedChatAppGenerateTaskPipeline:
# Initialize graph runtime state
graph_runtime_state: Optional[GraphRuntimeState] = None
for queue_message in self._base_task_pipeline._queue_manager.listen():
for queue_message in self._base_task_pipeline.queue_manager.listen():
event = queue_message.event
match event:
@ -896,6 +897,7 @@ class AdvancedChatAppGenerateTaskPipeline:
def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None:
message = self._get_message(session=session)
message.answer = self._task_state.answer
message.updated_at = naive_utc_now()
message.provider_response_latency = time.perf_counter() - self._base_task_pipeline._start_at
message.message_metadata = self._task_state.metadata.model_dump_json()
message_files = [
@ -959,11 +961,11 @@ class AdvancedChatAppGenerateTaskPipeline:
if self._base_task_pipeline._output_moderation_handler:
if self._base_task_pipeline._output_moderation_handler.should_direct_output():
self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output()
self._base_task_pipeline._queue_manager.publish(
self._base_task_pipeline.queue_manager.publish(
QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE
)
self._base_task_pipeline._queue_manager.publish(
self._base_task_pipeline.queue_manager.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
)
return True

View File

@ -140,7 +140,9 @@ class ChatAppGenerator(MessageBasedAppGenerator):
)
# get tracing instance
trace_manager = TraceQueueManager(app_id=app_model.id)
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = ChatAppGenerateEntity(

View File

@ -124,7 +124,9 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
)
# get tracing instance
trace_manager = TraceQueueManager(app_model.id)
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(

View File

@ -6,7 +6,6 @@ from core.app.entities.queue_entities import (
MessageQueueMessage,
QueueAdvancedChatMessageEndEvent,
QueueErrorEvent,
QueueMessage,
QueueMessageEndEvent,
QueueStopEvent,
)
@ -22,15 +21,6 @@ class MessageBasedAppQueueManager(AppQueueManager):
self._app_mode = app_mode
self._message_id = str(message_id)
def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage:
return MessageQueueMessage(
task_id=self._task_id,
message_id=self._message_id,
conversation_id=self._conversation_id,
app_mode=self._app_mode,
event=event,
)
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
"""
Publish event to queue

View File

@ -711,7 +711,7 @@ class WorkflowAppGenerateTaskPipeline:
# Initialize graph runtime state
graph_runtime_state = None
for queue_message in self._base_task_pipeline._queue_manager.listen():
for queue_message in self._base_task_pipeline.queue_manager.listen():
event = queue_message.event
match event:

View File

@ -37,7 +37,7 @@ class BasedGenerateTaskPipeline:
stream: bool,
) -> None:
self._application_generate_entity = application_generate_entity
self._queue_manager = queue_manager
self.queue_manager = queue_manager
self._start_at = time.perf_counter()
self._output_moderation_handler = self._init_output_moderation()
self._stream = stream
@ -113,7 +113,7 @@ class BasedGenerateTaskPipeline:
tenant_id=app_config.tenant_id,
app_id=app_config.app_id,
rule=ModerationRule(type=sensitive_word_avoidance.type, config=sensitive_word_avoidance.config),
queue_manager=self._queue_manager,
queue_manager=self.queue_manager,
)
return None

View File

@ -57,6 +57,7 @@ from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.model import AppMode, Conversation, Message, MessageAgentThought
logger = logging.getLogger(__name__)
@ -257,7 +258,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
Process stream response.
:return:
"""
for message in self._queue_manager.listen():
for message in self.queue_manager.listen():
if publisher:
publisher.publish(message)
event = message.event
@ -389,6 +390,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
if llm_result.message.content
else ""
)
message.updated_at = naive_utc_now()
message.answer_tokens = usage.completion_tokens
message.answer_unit_price = usage.completion_unit_price
message.answer_price_unit = usage.completion_price_unit
@ -499,7 +501,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
if self._output_moderation_handler.should_direct_output():
# stop subscribe new token when output moderation should direct output
self._task_state.llm_result.message.content = self._output_moderation_handler.get_final_output()
self._queue_manager.publish(
self.queue_manager.publish(
QueueLLMChunkEvent(
chunk=LLMResultChunk(
model=self._task_state.llm_result.model,
@ -513,7 +515,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
PublishFrom.TASK_PIPELINE,
)
self._queue_manager.publish(
self.queue_manager.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
)
return True

View File

@ -181,7 +181,7 @@ class MessageCycleManager:
:param message_id: message id
:return:
"""
message_file = db.session.query(MessageFile).filter(MessageFile.id == message_id).first()
message_file = db.session.query(MessageFile).where(MessageFile.id == message_id).first()
event_type = StreamEvent.MESSAGE_FILE if message_file else StreamEvent.MESSAGE
return MessageStreamResponse(

View File

@ -5,7 +5,7 @@ from base64 import b64encode
from collections.abc import Mapping
from typing import Any
from core.variables.utils import SegmentJSONEncoder
from core.variables.utils import dumps_with_segments
class TemplateTransformer(ABC):
@ -93,7 +93,7 @@ class TemplateTransformer(ABC):
@classmethod
def serialize_inputs(cls, inputs: Mapping[str, Any]) -> str:
inputs_json_str = json.dumps(inputs, ensure_ascii=False, cls=SegmentJSONEncoder).encode()
inputs_json_str = dumps_with_segments(inputs, ensure_ascii=False).encode()
input_base64_encoded = b64encode(inputs_json_str).decode("utf-8")
return input_base64_encoded

View File

@ -16,15 +16,33 @@ def get_external_trace_id(request: Any) -> Optional[str]:
"""
Retrieve the trace_id from the request.
Priority: header ('X-Trace-Id'), then parameters, then JSON body. Returns None if not provided or invalid.
Priority:
1. header ('X-Trace-Id')
2. parameters
3. JSON body
4. Current OpenTelemetry context (if enabled)
5. OpenTelemetry traceparent header (if present and valid)
Returns None if no valid trace_id is provided.
"""
trace_id = request.headers.get("X-Trace-Id")
if not trace_id:
trace_id = request.args.get("trace_id")
if not trace_id and getattr(request, "is_json", False):
json_data = getattr(request, "json", None)
if json_data:
trace_id = json_data.get("trace_id")
if not trace_id:
trace_id = get_trace_id_from_otel_context()
if not trace_id:
traceparent = request.headers.get("traceparent")
if traceparent:
trace_id = parse_traceparent_header(traceparent)
if isinstance(trace_id, str) and is_valid_trace_id(trace_id):
return trace_id
return None
@ -40,3 +58,49 @@ def extract_external_trace_id_from_args(args: Mapping[str, Any]) -> dict:
if trace_id:
return {"external_trace_id": trace_id}
return {}
def get_trace_id_from_otel_context() -> Optional[str]:
"""
Retrieve the current trace ID from the active OpenTelemetry trace context.
Returns None if:
1. OpenTelemetry SDK is not installed or enabled.
2. There is no active span or trace context.
"""
try:
from opentelemetry.trace import SpanContext, get_current_span
from opentelemetry.trace.span import INVALID_TRACE_ID
span = get_current_span()
if not span:
return None
span_context: SpanContext = span.get_span_context()
if not span_context or span_context.trace_id == INVALID_TRACE_ID:
return None
trace_id_hex = f"{span_context.trace_id:032x}"
return trace_id_hex
except Exception:
return None
def parse_traceparent_header(traceparent: str) -> Optional[str]:
"""
Parse the `traceparent` header to extract the trace_id.
Expected format:
'version-trace_id-span_id-flags'
Reference:
W3C Trace Context Specification: https://www.w3.org/TR/trace-context/
"""
try:
parts = traceparent.split("-")
if len(parts) == 4 and len(parts[1]) == 32:
return parts[1]
except Exception:
pass
return None
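As a quick sanity check of the fallback order described above, a hedged example using the sample `traceparent` value from the W3C Trace Context specification:

```python
# Illustrative values only; the valid header comes from the W3C Trace Context examples.
traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
assert parse_traceparent_header(traceparent) == "4bf92f3577b34da6a3ce929d0e0e4736"
assert parse_traceparent_header("malformed-header") is None
```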

View File

@ -1,6 +1,7 @@
import json
import logging
import re
from collections.abc import Sequence
from typing import Optional, cast
import json_repair
@ -11,6 +12,8 @@ from core.llm_generator.prompts import (
CONVERSATION_TITLE_PROMPT,
GENERATOR_QA_PROMPT,
JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE,
LLM_MODIFY_CODE_SYSTEM,
LLM_MODIFY_PROMPT_SYSTEM,
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE,
SYSTEM_STRUCTURED_OUTPUT_GENERATE,
WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE,
@ -24,6 +27,9 @@ from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.ops.utils import measure_time
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.event import AgentLogEvent
from models import App, Message, WorkflowNodeExecutionModel, db
class LLMGenerator:
@ -388,3 +394,181 @@ class LLMGenerator:
except Exception as e:
logging.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
@staticmethod
def instruction_modify_legacy(
tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None
) -> dict:
app: App | None = db.session.query(App).where(App.id == flow_id).first()
last_run: Message | None = (
db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first()
)
if not last_run:
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=None,
current=current,
error_message="",
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
)
last_run_dict = {
"query": last_run.query,
"answer": last_run.answer,
"error": last_run.error,
}
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=last_run_dict,
current=current,
error_message=str(last_run.error),
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
)
@staticmethod
def instruction_modify_workflow(
tenant_id: str,
flow_id: str,
node_id: str,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None,
) -> dict:
from services.workflow_service import WorkflowService
app: App | None = db.session.query(App).where(App.id == flow_id).first()
if not app:
raise ValueError("App not found.")
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
raise ValueError("Workflow not found for the given app model.")
last_run = WorkflowService().get_node_last_run(app_model=app, workflow=workflow, node_id=node_id)
try:
node_type = cast(WorkflowNodeExecutionModel, last_run).node_type
except Exception:
try:
node_type = [it for it in workflow.graph_dict["graph"]["nodes"] if it["id"] == node_id][0]["data"][
"type"
]
except Exception:
node_type = "llm"
if not last_run: # Node is not executed yet
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=None,
current=current,
error_message="",
instruction=instruction,
node_type=node_type,
ideal_output=ideal_output,
)
def agent_log_of(node_execution: WorkflowNodeExecutionModel) -> Sequence:
raw_agent_log = node_execution.execution_metadata_dict.get(WorkflowNodeExecutionMetadataKey.AGENT_LOG)
if not raw_agent_log:
return []
parsed: Sequence[AgentLogEvent] = json.loads(raw_agent_log)
def dict_of_event(event: AgentLogEvent) -> dict:
return {
"status": event.status,
"error": event.error,
"data": event.data,
}
return [dict_of_event(event) for event in parsed]
last_run_dict = {
"inputs": last_run.inputs_dict,
"status": last_run.status,
"error": last_run.error,
"agent_log": agent_log_of(last_run),
}
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=last_run_dict,
current=current,
error_message=last_run.error,
instruction=instruction,
node_type=last_run.node_type,
ideal_output=ideal_output,
)
@staticmethod
def __instruction_modify_common(
tenant_id: str,
model_config: dict,
last_run: dict | None,
current: str | None,
error_message: str | None,
instruction: str,
node_type: str,
ideal_output: str | None,
) -> dict:
LAST_RUN = "{{#last_run#}}"
CURRENT = "{{#current#}}"
ERROR_MESSAGE = "{{#error_message#}}"
injected_instruction = instruction
if LAST_RUN in injected_instruction:
injected_instruction = injected_instruction.replace(LAST_RUN, json.dumps(last_run))
if CURRENT in injected_instruction:
injected_instruction = injected_instruction.replace(CURRENT, current or "null")
if ERROR_MESSAGE in injected_instruction:
injected_instruction = injected_instruction.replace(ERROR_MESSAGE, error_message or "null")
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
match node_type:
case "llm" | "agent":
system_prompt = LLM_MODIFY_PROMPT_SYSTEM
case "code":
system_prompt = LLM_MODIFY_CODE_SYSTEM
case _:
system_prompt = LLM_MODIFY_PROMPT_SYSTEM
prompt_messages = [
SystemPromptMessage(content=system_prompt),
UserPromptMessage(
content=json.dumps(
{
"current": current,
"last_run": last_run,
"instruction": injected_instruction,
"ideal_output": ideal_output,
}
)
),
]
model_parameters = {"temperature": 0.4}
try:
response = cast(
LLMResult,
model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
),
)
generated_raw = cast(str, response.message.content)
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
return {**json.loads(generated_raw[first_brace : last_brace + 1])}
except InvokeError as e:
error = str(e)
return {"error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logging.exception("Failed to invoke LLM model, model: " + json.dumps(model_config.get("name")), exc_info=e)
return {"error": f"An unexpected error occurred: {str(e)}"}

View File

@ -309,3 +309,116 @@ eg:
Here is the JSON schema:
{{schema}}
""" # noqa: E501
LLM_MODIFY_PROMPT_SYSTEM = """
Both your input and output should be in JSON format.
! Below is the schema for input content !
{
"type": "object",
"description": "The user is trying to process some content with a prompt, but the output is not as expected. They hope to achieve their goal by modifying the prompt.",
"properties": {
"current": {
"type": "string",
"description": "The prompt before modification, where placeholders {{}} will be replaced with actual values for the large language model. The content in the placeholders should not be changed."
},
"last_run": {
"type": "object",
"description": "The output result from the large language model after receiving the prompt.",
},
"instruction": {
"type": "string",
"description": "User's instruction to edit the current prompt"
},
"ideal_output": {
"type": "string",
"description": "The ideal output that the user expects from the large language model after modifying the prompt. You should compare the last output with the ideal output and make changes to the prompt to achieve the goal."
}
}
}
! Above is the schema for input content !
! Below is the schema for output content !
{
"type": "object",
"description": "Your feedback to the user after they provide modification suggestions.",
"properties": {
"modified": {
"type": "string",
"description": "Your modified prompt. You should change the original prompt as little as possible to achieve the goal. Keep the language of prompt if not asked to change"
},
"message": {
"type": "string",
"description": "Your feedback to the user, in the user's language, explaining what you did and your thought process in text, providing sufficient emotional value to the user."
}
},
"required": [
"modified",
"message"
]
}
! Above is the schema for output content !
Your output must strictly follow the schema format; do not output any content outside of the JSON body.
""" # noqa: E501
LLM_MODIFY_CODE_SYSTEM = """
Both your input and output should be in JSON format.
! Below is the schema for input content !
{
"type": "object",
"description": "The user is trying to process some data with a code snippet, but the result is not as expected. They hope to achieve their goal by modifying the code.",
"properties": {
"current": {
"type": "string",
"description": "The code before modification."
},
"last_run": {
"type": "object",
"description": "The result of the code.",
},
"message": {
"type": "string",
"description": "User's instruction to edit the current code"
}
}
}
! Above is the schema for input content !
! Below is the schema for output content !
{
"type": "object",
"description": "Your feedback to the user after they provide modification suggestions.",
"properties": {
"modified": {
"type": "string",
"description": "Your modified code. You should change the original code as little as possible to achieve the goal. Keep the programming language of code if not asked to change"
},
"message": {
"type": "string",
"description": "Your feedback to the user, in the user's language, explaining what you did and your thought process in text, providing sufficient emotional value to the user."
}
},
"required": [
"modified",
"message"
]
}
! Above is the schema for output content !
When you are modifying the code, you should remember:
- Do not use print; it does not work in the dify sandbox.
- Do not attempt dangerous calls such as deleting files. This is PROHIBITED.
- Do not use any library that is not built into Python.
- Get inputs from the parameters of the function and use explicit type annotations.
- Write proper imports at the top of the code.
- Use a return statement to return the result.
- You should return a `dict`. If you need to return a `result: str`, you should `return {"result": result}`.
Your output must strictly follow the schema format; do not output any content outside of the JSON body.
""" # noqa: E501
INSTRUCTION_GENERATE_TEMPLATE_PROMPT = """The output of this prompt is not as expected: {{#last_run#}}.
You should edit the prompt according to the IDEAL OUTPUT."""
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
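For illustration, a minimal sketch (function name and fields assumed) of a code snippet that satisfies the sandbox rules listed in LLM_MODIFY_CODE_SYSTEM above:

```python
import json

def main(items: list, prefix: str) -> dict:
    # Typed parameters as inputs, built-in libraries only, no print, no file access.
    labeled = [f"{prefix}{json.dumps(item)}" for item in items]
    # Return a dict rather than a bare value.
    return {"result": labeled}
```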

View File

@ -10,8 +10,6 @@ from core.mcp.types import (
from models.tools import MCPToolProvider
from services.tools.mcp_tools_manage_service import MCPToolManageService
LATEST_PROTOCOL_VERSION = "1.0"
class OAuthClientProvider:
mcp_provider: MCPToolProvider

View File

@ -7,6 +7,7 @@ from typing import Any, TypeAlias, final
from urllib.parse import urljoin, urlparse
import httpx
from httpx_sse import EventSource, ServerSentEvent
from sseclient import SSEClient
from core.mcp import types
@ -37,11 +38,6 @@ WriteQueue: TypeAlias = queue.Queue[SessionMessage | Exception | None]
StatusQueue: TypeAlias = queue.Queue[_StatusReady | _StatusError]
def remove_request_params(url: str) -> str:
"""Remove request parameters from URL, keeping only the path."""
return urljoin(url, urlparse(url).path)
class SSETransport:
"""SSE client transport implementation."""
@ -114,7 +110,7 @@ class SSETransport:
logger.exception("Error parsing server message")
read_queue.put(exc)
def _handle_sse_event(self, sse, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
def _handle_sse_event(self, sse: ServerSentEvent, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
"""Handle a single SSE event.
Args:
@ -130,7 +126,7 @@ class SSETransport:
case _:
logger.warning("Unknown SSE event: %s", sse.event)
def sse_reader(self, event_source, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
def sse_reader(self, event_source: EventSource, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
"""Read and process SSE events.
Args:
@ -225,7 +221,7 @@ class SSETransport:
self,
executor: ThreadPoolExecutor,
client: httpx.Client,
event_source,
event_source: EventSource,
) -> tuple[ReadQueue, WriteQueue]:
"""Establish connection and start worker threads.
@ -327,7 +323,7 @@ def send_message(http_client: httpx.Client, endpoint_url: str, session_message:
)
response.raise_for_status()
logger.debug("Client message sent successfully: %s", response.status_code)
except Exception as exc:
except Exception:
logger.exception("Error sending message")
raise

View File

@ -55,14 +55,10 @@ DEFAULT_QUEUE_READ_TIMEOUT = 3
class StreamableHTTPError(Exception):
"""Base exception for StreamableHTTP transport errors."""
pass
class ResumptionError(StreamableHTTPError):
"""Raised when resumption request is invalid."""
pass
@dataclass
class RequestContext:
@ -74,7 +70,7 @@ class RequestContext:
session_message: SessionMessage
metadata: ClientMessageMetadata | None
server_to_client_queue: ServerToClientQueue # Renamed for clarity
sse_read_timeout: timedelta
sse_read_timeout: float
class StreamableHTTPTransport:
@ -84,8 +80,8 @@ class StreamableHTTPTransport:
self,
url: str,
headers: dict[str, Any] | None = None,
timeout: timedelta = timedelta(seconds=30),
sse_read_timeout: timedelta = timedelta(seconds=60 * 5),
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
) -> None:
"""Initialize the StreamableHTTP transport.
@ -97,8 +93,10 @@ class StreamableHTTPTransport:
"""
self.url = url
self.headers = headers or {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
self.timeout = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout
self.sse_read_timeout = (
sse_read_timeout.total_seconds() if isinstance(sse_read_timeout, timedelta) else sse_read_timeout
)
self.session_id: str | None = None
self.request_headers = {
ACCEPT: f"{JSON}, {SSE}",
@ -186,7 +184,7 @@ class StreamableHTTPTransport:
with ssrf_proxy_sse_connect(
self.url,
headers=headers,
timeout=httpx.Timeout(self.timeout.seconds, read=self.sse_read_timeout.seconds),
timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout),
client=client,
method="GET",
) as event_source:
@ -215,7 +213,7 @@ class StreamableHTTPTransport:
with ssrf_proxy_sse_connect(
self.url,
headers=headers,
timeout=httpx.Timeout(self.timeout.seconds, read=ctx.sse_read_timeout.seconds),
timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout),
client=ctx.client,
method="GET",
) as event_source:
@ -402,8 +400,8 @@ class StreamableHTTPTransport:
def streamablehttp_client(
url: str,
headers: dict[str, Any] | None = None,
timeout: timedelta = timedelta(seconds=30),
sse_read_timeout: timedelta = timedelta(seconds=60 * 5),
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
terminate_on_close: bool = True,
) -> Generator[
tuple[
@ -436,7 +434,7 @@ def streamablehttp_client(
try:
with create_ssrf_proxy_mcp_http_client(
headers=transport.request_headers,
timeout=httpx.Timeout(transport.timeout.seconds, read=transport.sse_read_timeout.seconds),
timeout=httpx.Timeout(transport.timeout, read=transport.sse_read_timeout),
) as client:
# Define callbacks that need access to thread pool
def start_get_stream() -> None:

View File

@ -23,12 +23,18 @@ class MCPClient:
authed: bool = True,
authorization_code: Optional[str] = None,
for_list: bool = False,
headers: Optional[dict[str, str]] = None,
timeout: Optional[float] = None,
sse_read_timeout: Optional[float] = None,
):
# Initialize info
self.provider_id = provider_id
self.tenant_id = tenant_id
self.client_type = "streamable"
self.server_url = server_url
self.headers = headers or {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
# Authentication info
self.authed = authed
@ -43,7 +49,7 @@ class MCPClient:
self._session: Optional[ClientSession] = None
self._streams_context: Optional[AbstractContextManager[Any]] = None
self._session_context: Optional[ClientSession] = None
self.exit_stack = ExitStack()
self._exit_stack = ExitStack()
# Whether the client has been initialized
self._initialized = False
@ -90,21 +96,26 @@ class MCPClient:
headers = (
{"Authorization": f"{self.token.token_type.capitalize()} {self.token.access_token}"}
if self.authed and self.token
else {}
else self.headers
)
self._streams_context = client_factory(
url=self.server_url,
headers=headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
)
self._streams_context = client_factory(url=self.server_url, headers=headers)
if not self._streams_context:
raise MCPConnectionError("Failed to create connection context")
# Use exit_stack to manage context managers properly
if method_name == "mcp":
read_stream, write_stream, _ = self.exit_stack.enter_context(self._streams_context)
read_stream, write_stream, _ = self._exit_stack.enter_context(self._streams_context)
streams = (read_stream, write_stream)
else: # sse_client
streams = self.exit_stack.enter_context(self._streams_context)
streams = self._exit_stack.enter_context(self._streams_context)
self._session_context = ClientSession(*streams)
self._session = self.exit_stack.enter_context(self._session_context)
self._session = self._exit_stack.enter_context(self._session_context)
session = cast(ClientSession, self._session)
session.initialize()
return
@ -120,9 +131,6 @@ class MCPClient:
if first_try:
return self.connect_server(client_factory, method_name, first_try=False)
except MCPConnectionError:
raise
def list_tools(self) -> list[Tool]:
"""Connect to an MCP server running with SSE transport"""
# List available tools to verify connection
@ -142,7 +150,7 @@ class MCPClient:
"""Clean up resources"""
try:
# ExitStack will handle proper cleanup of all managed context managers
self.exit_stack.close()
self._exit_stack.close()
except Exception as e:
logging.exception("Error during cleanup")
raise ValueError(f"Error during cleanup: {e}")

View File

@ -16,13 +16,14 @@ from extensions.ext_database import db
from models.model import App, AppMCPServer, AppMode, EndUser
from services.app_generate_service import AppGenerateService
"""
Apply to MCP HTTP streamable server with stateless http
"""
logger = logging.getLogger(__name__)
class MCPServerStreamableHTTPRequestHandler:
"""
Apply to MCP HTTP streamable server with stateless http
"""
def __init__(
self, app: App, request: types.ClientRequest | types.ClientNotification, user_input_form: list[VariableEntity]
):

View File

@ -2,7 +2,6 @@ import logging
import queue
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
from contextlib import ExitStack
from datetime import timedelta
from types import TracebackType
from typing import Any, Generic, Self, TypeVar
@ -170,7 +169,6 @@ class BaseSession(
self._receive_notification_type = receive_notification_type
self._session_read_timeout_seconds = read_timeout_seconds
self._in_flight = {}
self._exit_stack = ExitStack()
# Initialize executor and future to None for proper cleanup checks
self._executor: ThreadPoolExecutor | None = None
self._receiver_future: Future | None = None
@ -377,7 +375,7 @@ class BaseSession(
self._handle_incoming(RuntimeError(f"Server Error: {message}"))
except queue.Empty:
continue
except Exception as e:
except Exception:
logging.exception("Error in message processing loop")
raise
@ -389,14 +387,12 @@ class BaseSession(
If the request is responded to within this method, it will not be
forwarded on to the message stream.
"""
pass
def _received_notification(self, notification: ReceiveNotificationT) -> None:
"""
Can be overridden by subclasses to handle a notification without needing
to listen on the message stream.
"""
pass
def send_progress_notification(
self, progress_token: str | int, progress: float, total: float | None = None
@ -405,11 +401,9 @@ class BaseSession(
Sends a progress notification for a request that is currently being
processed.
"""
pass
def _handle_incoming(
self,
req: RequestResponder[ReceiveRequestT, SendResultT] | ReceiveNotificationT | Exception,
) -> None:
"""A generic handler for incoming messages. Overwritten by subclasses."""
pass

View File

@ -1,3 +1,4 @@
import queue
from datetime import timedelta
from typing import Any, Protocol
@ -85,8 +86,8 @@ class ClientSession(
):
def __init__(
self,
read_stream,
write_stream,
read_stream: queue.Queue,
write_stream: queue.Queue,
read_timeout_seconds: timedelta | None = None,
sampling_callback: SamplingFnT | None = None,
list_roots_callback: ListRootsFnT | None = None,

View File

@ -1,6 +1,10 @@
import json
from collections.abc import Generator
from contextlib import AbstractContextManager
import httpx
import httpx_sse
from httpx_sse import connect_sse
from configs import dify_config
from core.mcp.types import ErrorData, JSONRPCError
@ -55,20 +59,42 @@ def create_ssrf_proxy_mcp_http_client(
)
def ssrf_proxy_sse_connect(url, **kwargs):
def ssrf_proxy_sse_connect(url: str, **kwargs) -> AbstractContextManager[httpx_sse.EventSource]:
"""Connect to SSE endpoint with SSRF proxy protection.
This function creates an SSE connection using the configured proxy settings
to prevent SSRF attacks when connecting to external endpoints.
to prevent SSRF attacks when connecting to external endpoints. It returns
a context manager that yields an EventSource object for SSE streaming.
The function handles HTTP client creation and cleanup automatically, but
also accepts a pre-configured client via kwargs.
Args:
url: The SSE endpoint URL
**kwargs: Additional arguments passed to the SSE connection
url (str): The SSE endpoint URL to connect to
**kwargs: Additional arguments passed to the SSE connection, including:
- client (httpx.Client, optional): Pre-configured HTTP client.
If not provided, one will be created with SSRF protection.
- method (str, optional): HTTP method to use, defaults to "GET"
- headers (dict, optional): HTTP headers to include in the request
- timeout (httpx.Timeout, optional): Timeout configuration for the connection
Returns:
EventSource object for SSE streaming
AbstractContextManager[httpx_sse.EventSource]: A context manager that yields an EventSource
object for SSE streaming. The EventSource provides access to server-sent events.
Example:
```python
with ssrf_proxy_sse_connect(url, headers=headers) as event_source:
for sse in event_source.iter_sse():
print(sse.event, sse.data)
```
Note:
If a client is not provided in kwargs, one will be automatically created
with SSRF protection based on the application's configuration. If an
exception occurs during connection, any automatically created client
will be cleaned up automatically.
"""
from httpx_sse import connect_sse
# Extract client if provided, otherwise create one
client = kwargs.pop("client", None)
@ -101,7 +127,9 @@ def ssrf_proxy_sse_connect(url, **kwargs):
raise
def create_mcp_error_response(request_id: int | str | None, code: int, message: str, data=None):
def create_mcp_error_response(
request_id: int | str | None, code: int, message: str, data=None
) -> Generator[bytes, None, None]:
"""Create MCP error response"""
error_data = ErrorData(code=code, message=message, data=data)
json_response = JSONRPCError(

View File

@ -99,13 +99,13 @@ class TokenBufferMemory:
prompt_messages.append(UserPromptMessage(content=message.query))
else:
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
prompt_message_contents.append(TextPromptMessageContent(data=message.query))
for file in file_objs:
prompt_message = file_manager.to_prompt_message_content(
file,
image_detail_config=detail,
)
prompt_message_contents.append(prompt_message)
prompt_message_contents.append(TextPromptMessageContent(data=message.query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))

View File

@ -257,11 +257,6 @@ class ModelProviderFactory:
# scan all providers
plugin_model_provider_entities = self.get_plugin_model_providers()
# convert provider_configs to dict
provider_credentials_dict = {}
for provider_config in provider_configs:
provider_credentials_dict[provider_config.provider] = provider_config.credentials
# traverse all model_provider_extensions
providers = []
for plugin_model_provider_entity in plugin_model_provider_entities:

View File

@ -68,7 +68,7 @@ class CommonValidator:
if credential_form_schema.max_length:
if len(value) > credential_form_schema.max_length:
raise ValueError(
f"Variable {credential_form_schema.variable} length should not"
f"Variable {credential_form_schema.variable} length should not be"
f" greater than {credential_form_schema.max_length}"
)

View File

@ -151,12 +151,9 @@ def jsonable_encoder(
return format(obj, "f")
if isinstance(obj, dict):
encoded_dict = {}
allowed_keys = set(obj.keys())
for key, value in obj.items():
if (
(not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa")))
and (value is not None or not exclude_none)
and key in allowed_keys
if (not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa"))) and (
value is not None or not exclude_none
):
encoded_key = jsonable_encoder(
key,

View File

@ -1,10 +0,0 @@
import pydantic
from pydantic import BaseModel
def dump_model(model: BaseModel) -> dict:
if hasattr(pydantic, "model_dump"):
# FIXME mypy error, try to fix it instead of using type: ignore
return pydantic.model_dump(model) # type: ignore
else:
return model.model_dump()

View File

@ -4,15 +4,15 @@ from collections.abc import Sequence
from typing import Optional
from urllib.parse import urljoin
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace import Link, Status, StatusCode
from sqlalchemy.orm import Session, sessionmaker
from core.ops.aliyun_trace.data_exporter.traceclient import (
TraceClient,
convert_datetime_to_nanoseconds,
convert_string_to_id,
convert_to_span_id,
convert_to_trace_id,
create_link,
generate_span_id,
)
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
@ -103,10 +103,11 @@ class AliyunDataTrace(BaseTraceInstance):
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = convert_to_trace_id(trace_info.workflow_run_id)
links = []
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
links.append(create_link(trace_id_str=trace_info.trace_id))
workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
self.add_workflow_span(trace_id, workflow_span_id, trace_info)
self.add_workflow_span(trace_id, workflow_span_id, trace_info, links)
workflow_node_executions = self.get_workflow_node_executions(trace_info)
for node_execution in workflow_node_executions:
@ -132,8 +133,9 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
links.append(create_link(trace_id_str=trace_info.trace_id))
message_span_id = convert_to_span_id(message_id, "message")
message_span = SpanData(
@ -152,6 +154,7 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: str(trace_info.outputs),
},
status=status,
links=links,
)
self.trace_client.add_span(message_span)
@ -192,8 +195,9 @@ class AliyunDataTrace(BaseTraceInstance):
message_id = trace_info.message_id
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
links.append(create_link(trace_id_str=trace_info.trace_id))
documents_data = extract_retrieval_documents(trace_info.documents)
dataset_retrieval_span = SpanData(
@ -211,6 +215,7 @@ class AliyunDataTrace(BaseTraceInstance):
INPUT_VALUE: str(trace_info.inputs),
OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
},
links=links,
)
self.trace_client.add_span(dataset_retrieval_span)
@ -224,8 +229,9 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
links.append(create_link(trace_id_str=trace_info.trace_id))
tool_span = SpanData(
trace_id=trace_id,
@ -244,6 +250,7 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: str(trace_info.tool_outputs),
},
status=status,
links=links,
)
self.trace_client.add_span(tool_span)
@ -413,7 +420,9 @@ class AliyunDataTrace(BaseTraceInstance):
status=self.get_workflow_node_status(node_execution),
)
def add_workflow_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo):
def add_workflow_span(
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, links: Sequence[Link]
):
message_span_id = None
if trace_info.message_id:
message_span_id = convert_to_span_id(trace_info.message_id, "message")
@ -438,6 +447,7 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
},
status=status,
links=links,
)
self.trace_client.add_span(message_span)
@ -456,6 +466,7 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
},
status=status,
links=links,
)
self.trace_client.add_span(workflow_span)
@ -466,8 +477,9 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
links.append(create_link(trace_id_str=trace_info.trace_id))
suggested_question_span = SpanData(
trace_id=trace_id,
@ -487,6 +499,7 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
},
status=status,
links=links,
)
self.trace_client.add_span(suggested_question_span)

View File

@ -16,6 +16,7 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import Link, SpanContext, TraceFlags
from configs import dify_config
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
@ -166,6 +167,16 @@ class SpanBuilder:
return span
def create_link(trace_id_str: str) -> Link:
placeholder_span_id = 0x0000000000000000
trace_id = int(trace_id_str, 16)
span_context = SpanContext(
trace_id=trace_id, span_id=placeholder_span_id, is_remote=False, trace_flags=TraceFlags(TraceFlags.SAMPLED)
)
return Link(span_context)
def generate_span_id() -> int:
span_id = random.getrandbits(64)
while span_id == INVALID_SPAN_ID:

View File

@ -523,7 +523,7 @@ class ProviderManager:
# Init trial provider records if not exists
if ProviderQuotaType.TRIAL not in provider_quota_to_provider_record_dict:
try:
# FIXME ignore the type errork, onyl TrialHostingQuota has limit need to change the logic
# FIXME ignore the type error, only TrialHostingQuota has limit need to change the logic
new_provider_record = Provider(
tenant_id=tenant_id,
# TODO: Use provider name with prefix after the data migration.

View File

@ -1,7 +1,7 @@
import json
from collections import defaultdict
from typing import Any, Optional
import orjson
from pydantic import BaseModel
from configs import dify_config
@ -134,13 +134,13 @@ class Jieba(BaseKeyword):
dataset_keyword_table = self.dataset.dataset_keyword_table
keyword_data_source_type = dataset_keyword_table.data_source_type
if keyword_data_source_type == "database":
dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
dataset_keyword_table.keyword_table = dumps_with_sets(keyword_table_dict)
db.session.commit()
else:
file_key = "keyword_files/" + self.dataset.tenant_id + "/" + self.dataset.id + ".txt"
if storage.exists(file_key):
storage.delete(file_key)
storage.save(file_key, json.dumps(keyword_table_dict, cls=SetEncoder).encode("utf-8"))
storage.save(file_key, dumps_with_sets(keyword_table_dict).encode("utf-8"))
def _get_dataset_keyword_table(self) -> Optional[dict]:
dataset_keyword_table = self.dataset.dataset_keyword_table
@ -156,12 +156,11 @@ class Jieba(BaseKeyword):
data_source_type=keyword_data_source_type,
)
if keyword_data_source_type == "database":
dataset_keyword_table.keyword_table = json.dumps(
dataset_keyword_table.keyword_table = dumps_with_sets(
{
"__type__": "keyword_table",
"__data__": {"index_id": self.dataset.id, "summary": None, "table": {}},
},
cls=SetEncoder,
}
)
db.session.add(dataset_keyword_table)
db.session.commit()
@ -252,8 +251,13 @@ class Jieba(BaseKeyword):
self._save_dataset_keyword_table(keyword_table)
class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return super().default(obj)
def set_orjson_default(obj: Any) -> Any:
"""Default function for orjson serialization of set types"""
if isinstance(obj, set):
return list(obj)
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def dumps_with_sets(obj: Any) -> str:
"""JSON dumps with set support using orjson"""
return orjson.dumps(obj, default=set_orjson_default).decode("utf-8")
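A small usage sketch (values assumed) of the orjson-based helpers above; sets are serialized as JSON arrays:

```python
keyword_table = {"__type__": "keyword_table", "__data__": {"table": {"dify": {"doc-1", "doc-2"}}}}
serialized = dumps_with_sets(keyword_table)
# The inner set becomes a JSON array, e.g. ... "dify": ["doc-1", "doc-2"] ...
```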

View File

@ -4,8 +4,8 @@ import math
from typing import Any
from pydantic import BaseModel, model_validator
from pyobvector import VECTOR, ObVecClient # type: ignore
from sqlalchemy import JSON, Column, String, func
from pyobvector import VECTOR, FtsIndexParam, FtsParser, ObVecClient, l2_distance # type: ignore
from sqlalchemy import JSON, Column, String
from sqlalchemy.dialects.mysql import LONGTEXT
from configs import dify_config
@ -119,14 +119,21 @@ class OceanBaseVector(BaseVector):
)
try:
if self._hybrid_search_enabled:
self._client.perform_raw_text_sql(f"""ALTER TABLE {self._collection_name}
ADD FULLTEXT INDEX fulltext_index_for_col_text (text) WITH PARSER ik""")
self._client.create_fts_idx_with_fts_index_param(
table_name=self._collection_name,
fts_idx_param=FtsIndexParam(
index_name="fulltext_index_for_col_text",
field_names=["text"],
parser_type=FtsParser.IK,
),
)
except Exception as e:
raise Exception(
"Failed to add fulltext index to the target table, your OceanBase version must be 4.3.5.1 or above "
+ "to support fulltext index and vector index in the same table",
e,
)
self._client.refresh_metadata([self._collection_name])
redis_client.set(collection_exist_cache_key, 1, ex=3600)
def _check_hybrid_search_support(self) -> bool:
@@ -252,7 +259,7 @@ class OceanBaseVector(BaseVector):
vec_column_name="vector",
vec_data=query_vector,
topk=topk,
distance_func=func.l2_distance,
distance_func=l2_distance,
output_column_names=["text", "metadata"],
with_dist=True,
where_clause=_where_clause,

View File

@@ -109,8 +109,19 @@ class OracleVector(BaseVector):
)
def _get_connection(self) -> Connection:
connection = oracledb.connect(user=self.config.user, password=self.config.password, dsn=self.config.dsn)
return connection
if self.config.is_autonomous:
connection = oracledb.connect(
user=self.config.user,
password=self.config.password,
dsn=self.config.dsn,
config_dir=self.config.config_dir,
wallet_location=self.config.wallet_location,
wallet_password=self.config.wallet_password,
)
return connection
else:
connection = oracledb.connect(user=self.config.user, password=self.config.password, dsn=self.config.dsn)
return connection
def _create_connection_pool(self, config: OracleVectorConfig):
pool_params = {

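As a side note, the same wallet settings should carry over to the pooled path whose hunk is truncated above; a hedged sketch assuming python-oracledb, with illustrative paths and credentials:

import oracledb

pool = oracledb.create_pool(
    user="vector_app",
    password="***",
    dsn="adb_alias",                 # alias from the wallet's tnsnames.ora (illustrative)
    config_dir="/path/to/wallet",    # directory holding tnsnames.ora / sqlnet.ora
    wallet_location="/path/to/wallet",
    wallet_password="***",
    min=1,
    max=4,
)
with pool.acquire() as conn, conn.cursor() as cur:
    cur.execute("SELECT 1 FROM dual")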
View File

@@ -331,6 +331,12 @@ class QdrantVector(BaseVector):
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
from qdrant_client.http import models
score_threshold = float(kwargs.get("score_threshold") or 0.0)
if score_threshold >= 1:
# Return an empty list because some versions of Qdrant may respond with 400 Bad Request,
# while a score_threshold of 1 may still be valid for other vector stores.
return []
filter = models.Filter(
must=[
models.FieldCondition(
@@ -355,7 +361,7 @@ class QdrantVector(BaseVector):
limit=kwargs.get("top_k", 4),
with_payload=True,
with_vectors=True,
score_threshold=float(kwargs.get("score_threshold") or 0.0),
score_threshold=score_threshold,
)
docs = []
for result in results:
@@ -363,7 +369,6 @@ class QdrantVector(BaseVector):
continue
metadata = result.payload.get(Field.METADATA_KEY.value) or {}
# duplicate check score threshold
score_threshold = float(kwargs.get("score_threshold") or 0.0)
if result.score > score_threshold:
metadata["score"] = result.score
doc = Document(

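A hypothetical usage sketch of the new guard (the store instance and embedding below are stand-ins, not names from this diff):

# A threshold of 1.0 or above now short-circuits before any request is sent,
# so Qdrant never has the chance to answer with 400 Bad Request.
docs = qdrant_vector.search_by_vector(query_embedding, top_k=4, score_threshold=1.0)
assert docs == []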
View File

@@ -5,10 +5,14 @@ This package contains concrete implementations of the repository interfaces
defined in the core.workflow.repository package.
"""
from core.repositories.celery_workflow_execution_repository import CeleryWorkflowExecutionRepository
from core.repositories.celery_workflow_node_execution_repository import CeleryWorkflowNodeExecutionRepository
from core.repositories.factory import DifyCoreRepositoryFactory, RepositoryImportError
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
__all__ = [
"CeleryWorkflowExecutionRepository",
"CeleryWorkflowNodeExecutionRepository",
"DifyCoreRepositoryFactory",
"RepositoryImportError",
"SQLAlchemyWorkflowNodeExecutionRepository",

View File

@@ -0,0 +1,126 @@
"""
Celery-based implementation of the WorkflowExecutionRepository.
This implementation uses Celery tasks for asynchronous storage operations,
providing improved performance by offloading database operations to background workers.
"""
import logging
from typing import Optional, Union
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from libs.helper import extract_tenant_id
from models import Account, CreatorUserRole, EndUser
from models.enums import WorkflowRunTriggeredFrom
from tasks.workflow_execution_tasks import (
save_workflow_execution_task,
)
logger = logging.getLogger(__name__)
class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository):
"""
Celery-based implementation of the WorkflowExecutionRepository interface.
This implementation provides asynchronous storage capabilities by using Celery tasks
to handle database operations in background workers. This improves performance by
reducing the blocking time for workflow execution storage operations.
Key features:
- Asynchronous save operations using Celery tasks
- Support for multi-tenancy through tenant/app filtering
- Automatic retry and error handling through Celery
"""
_session_factory: sessionmaker
_tenant_id: str
_app_id: Optional[str]
_triggered_from: Optional[WorkflowRunTriggeredFrom]
_creator_user_id: str
_creator_user_role: CreatorUserRole
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
app_id: Optional[str],
triggered_from: Optional[WorkflowRunTriggeredFrom],
):
"""
Initialize the repository with Celery task configuration and context information.
Args:
session_factory: SQLAlchemy sessionmaker or engine for fallback operations
user: Account or EndUser object containing tenant_id, user ID, and role information
app_id: App ID for filtering by application (can be None)
triggered_from: Source of the execution trigger (DEBUGGING or APP_RUN)
"""
# Store session factory for fallback operations
if isinstance(session_factory, Engine):
self._session_factory = sessionmaker(bind=session_factory, expire_on_commit=False)
elif isinstance(session_factory, sessionmaker):
self._session_factory = session_factory
else:
raise ValueError(
f"Invalid session_factory type {type(session_factory).__name__}; expected sessionmaker or Engine"
)
# Extract tenant_id from user
tenant_id = extract_tenant_id(user)
if not tenant_id:
raise ValueError("User must have a tenant_id or current_tenant_id")
self._tenant_id = tenant_id # type: ignore[assignment] # We've already checked tenant_id is not None
# Store app context
self._app_id = app_id
# Extract user context
self._triggered_from = triggered_from
self._creator_user_id = user.id
# Determine user role based on user type
self._creator_user_role = CreatorUserRole.ACCOUNT if isinstance(user, Account) else CreatorUserRole.END_USER
logger.info(
"Initialized CeleryWorkflowExecutionRepository for tenant %s, app %s, triggered_from %s",
self._tenant_id,
self._app_id,
self._triggered_from,
)
def save(self, execution: WorkflowExecution) -> None:
"""
Save or update a WorkflowExecution instance asynchronously using Celery.
This method queues the save operation as a Celery task and returns immediately,
providing improved performance for high-throughput scenarios.
Args:
execution: The WorkflowExecution instance to save or update
"""
try:
# Serialize execution for Celery task
execution_data = execution.model_dump()
# Queue the save operation as a Celery task (fire and forget)
save_workflow_execution_task.delay(
execution_data=execution_data,
tenant_id=self._tenant_id,
app_id=self._app_id or "",
triggered_from=self._triggered_from.value if self._triggered_from else "",
creator_user_id=self._creator_user_id,
creator_user_role=self._creator_user_role.value,
)
logger.debug("Queued async save for workflow execution: %s", execution.id_)
except Exception as e:
logger.exception("Failed to queue save operation for execution %s", execution.id_)
# In case of Celery failure, we could implement a fallback to synchronous save
# For now, we'll re-raise the exception
raise
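A hedged sketch of how this repository might be wired up; engine, account, and execution are illustrative placeholders rather than objects defined in this diff:

from models.enums import WorkflowRunTriggeredFrom

repo = CeleryWorkflowExecutionRepository(
    session_factory=engine,   # a sessionmaker or an Engine are both accepted
    user=account,             # Account or EndUser; tenant_id is derived from it
    app_id="app-1234",
    triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
)
repo.save(execution)  # returns immediately; persistence happens in a Celery worker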

Some files were not shown because too many files have changed in this diff.