Compare commits

..

1 Commits

Author SHA1 Message Date
82b1c5bc12 feat: add service api of HITL
Co-authored-by: hjlarry <hjlarry@163.com>
Co-authored-by: QuantumGhost <QuantumGhost@users.noreply.github.com>
2026-04-21 09:29:16 +08:00
1012 changed files with 23594 additions and 31619 deletions

View File

@ -367,7 +367,7 @@ For each extraction:
┌────────────────────────────────────────┐
│ 1. Extract code │
│ 2. Run: pnpm lint:fix │
│ 3. Run: pnpm type-check
│ 3. Run: pnpm type-check:tsgo
│ 4. Run: pnpm test │
│ 5. Test functionality manually │
│ 6. PASS? → Next extraction │

View File

@ -127,7 +127,7 @@ For the current file being tested:
- [ ] Run full directory test: `pnpm test path/to/directory/`
- [ ] Check coverage report: `pnpm test:coverage`
- [ ] Run `pnpm lint:fix` on all test files
- [ ] Run `pnpm type-check`
- [ ] Run `pnpm type-check:tsgo`
## Common Issues to Watch

3
.github/CODEOWNERS vendored
View File

@ -6,9 +6,6 @@
* @crazywoola @laipz8200 @Yeuoly
# ESLint suppression file is maintained by autofix.ci pruning.
/eslint-suppressions.json
# CODEOWNERS file
/.github/CODEOWNERS @laipz8200 @crazywoola

View File

@ -4,7 +4,7 @@ runs:
using: composite
steps:
- name: Setup Vite+
uses: voidzero-dev/setup-vp@4f5aa3e38c781f1b01e78fb9255527cee8a6efa6 # v1.8.0
uses: voidzero-dev/setup-vp@20553a7a7429c429a74894104a2835d7fed28a72 # v1.3.0
with:
node-version-file: .nvmrc
cache: true

1
.github/labeler.yml vendored
View File

@ -6,4 +6,5 @@ web:
- 'package.json'
- 'pnpm-lock.yaml'
- 'pnpm-workspace.yaml'
- '.npmrc'
- '.nvmrc'

View File

@ -16,7 +16,7 @@ concurrency:
jobs:
api-unit:
name: API Unit Tests
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
env:
COVERAGE_FILE: coverage-unit
defaults:
@ -62,7 +62,7 @@ jobs:
api-integration:
name: API Integration Tests
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
env:
COVERAGE_FILE: coverage-integration
STORAGE_TYPE: opendal
@ -137,7 +137,7 @@ jobs:
api-coverage:
name: API Coverage
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
needs:
- api-unit
- api-integration

View File

@ -13,7 +13,7 @@ permissions:
jobs:
autofix:
if: github.repository == 'langgenius/dify'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Complete merge group check
if: github.event_name == 'merge_group'
@ -43,6 +43,7 @@ jobs:
package.json
pnpm-lock.yaml
pnpm-workspace.yaml
.npmrc
.nvmrc
- name: Check api inputs
if: github.event_name != 'merge_group'

View File

@ -26,9 +26,6 @@ jobs:
build:
runs-on: ${{ matrix.runs_on }}
if: github.repository == 'langgenius/dify'
permissions:
contents: read
id-token: write
strategy:
matrix:
include:
@ -38,28 +35,28 @@ jobs:
build_context: "{{defaultContext}}:api"
file: "Dockerfile"
platform: linux/amd64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-latest
- service_name: "build-api-arm64"
image_name_env: "DIFY_API_IMAGE_NAME"
artifact_context: "api"
build_context: "{{defaultContext}}:api"
file: "Dockerfile"
platform: linux/arm64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-24.04-arm
- service_name: "build-web-amd64"
image_name_env: "DIFY_WEB_IMAGE_NAME"
artifact_context: "web"
build_context: "{{defaultContext}}"
file: "web/Dockerfile"
platform: linux/amd64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-latest
- service_name: "build-web-arm64"
image_name_env: "DIFY_WEB_IMAGE_NAME"
artifact_context: "web"
build_context: "{{defaultContext}}"
file: "web/Dockerfile"
platform: linux/arm64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-24.04-arm
steps:
- name: Prepare
@ -73,8 +70,8 @@ jobs:
username: ${{ env.DOCKERHUB_USER }}
password: ${{ env.DOCKERHUB_TOKEN }}
- name: Set up Depot CLI
uses: depot/setup-action@15c09a5f77a0840ad4bce955686522a257853461 # v1.7.1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # v4.0.0
- name: Extract metadata for Docker
id: meta
@ -84,15 +81,16 @@ jobs:
- name: Build Docker image
id: build
uses: depot/build-push-action@5f3b3c2e5a00f0093de47f657aeaefcedff27d18 # v1.17.0
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # v7.1.0
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
context: ${{ matrix.build_context }}
file: ${{ matrix.file }}
platforms: ${{ matrix.platform }}
build-args: COMMIT_SHA=${{ fromJSON(steps.meta.outputs.json).labels['org.opencontainers.image.revision'] }}
labels: ${{ steps.meta.outputs.labels }}
outputs: type=image,name=${{ env[matrix.image_name_env] }},push-by-digest=true,name-canonical=true,push=true
cache-from: type=gha,scope=${{ matrix.service_name }}
cache-to: type=gha,mode=max,scope=${{ matrix.service_name }}
- name: Export digest
env:
@ -110,33 +108,9 @@ jobs:
if-no-files-found: error
retention-days: 1
fork-build-validate:
if: github.repository != 'langgenius/dify'
runs-on: ubuntu-24.04
strategy:
matrix:
include:
- service_name: "validate-api-amd64"
build_context: "{{defaultContext}}:api"
file: "Dockerfile"
- service_name: "validate-web-amd64"
build_context: "{{defaultContext}}"
file: "web/Dockerfile"
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # v4.0.0
- name: Validate Docker image
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # v7.1.0
with:
push: false
context: ${{ matrix.build_context }}
file: ${{ matrix.file }}
platforms: linux/amd64
create-manifest:
needs: build
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
if: github.repository == 'langgenius/dify'
strategy:
matrix:

View File

@ -9,7 +9,7 @@ concurrency:
jobs:
db-migration-test-postgres:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Checkout code
@ -59,7 +59,7 @@ jobs:
run: uv run --directory api flask upgrade-db
db-migration-test-mysql:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Checkout code
@ -110,28 +110,6 @@ jobs:
sed -i 's/DB_PORT=5432/DB_PORT=3306/' .env
sed -i 's/DB_USERNAME=postgres/DB_USERNAME=root/' .env
# hoverkraft-tech/compose-action@v2.6.0 only waits for `docker compose up -d`
# to return (container processes started); it does not wait on healthcheck
# status. mysql:8.0's first-time init takes 15-30s, so without an explicit
# wait the migration runs while InnoDB is still initialising and gets
# killed with "Lost connection during query". Poll a real SELECT until it
# succeeds.
- name: Wait for MySQL to accept queries
run: |
set +e
for i in $(seq 1 60); do
if docker run --rm --network host mysql:8.0 \
mysql -h 127.0.0.1 -P 3306 -uroot -pdifyai123456 \
-e 'SELECT 1' >/dev/null 2>&1; then
echo "MySQL ready after ${i}s"
exit 0
fi
sleep 1
done
echo "MySQL not ready after 60s; dumping container logs:"
docker compose -f docker/docker-compose.middleware.yaml --profile mysql logs --tail=200 db_mysql
exit 1
- name: Run DB Migration
env:
DEBUG: true

View File

@ -13,7 +13,7 @@ on:
jobs:
deploy:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'deploy/agent-dev'

View File

@ -10,7 +10,7 @@ on:
jobs:
deploy:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'deploy/dev'

View File

@ -13,7 +13,7 @@ on:
jobs:
deploy:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'deploy/enterprise'

View File

@ -10,7 +10,7 @@ on:
jobs:
deploy:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'build/feat/hitl'

View File

@ -14,59 +14,28 @@ concurrency:
jobs:
build-docker:
if: github.event.pull_request.head.repo.full_name == github.repository
runs-on: ${{ matrix.runs_on }}
permissions:
contents: read
id-token: write
strategy:
matrix:
include:
- service_name: "api-amd64"
platform: linux/amd64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-latest
context: "{{defaultContext}}:api"
file: "Dockerfile"
- service_name: "api-arm64"
platform: linux/arm64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-24.04-arm
context: "{{defaultContext}}:api"
file: "Dockerfile"
- service_name: "web-amd64"
platform: linux/amd64
runs_on: depot-ubuntu-24.04-4
runs_on: ubuntu-latest
context: "{{defaultContext}}"
file: "web/Dockerfile"
- service_name: "web-arm64"
platform: linux/arm64
runs_on: depot-ubuntu-24.04-4
context: "{{defaultContext}}"
file: "web/Dockerfile"
steps:
- name: Set up Depot CLI
uses: depot/setup-action@15c09a5f77a0840ad4bce955686522a257853461 # v1.7.1
- name: Build Docker Image
uses: depot/build-push-action@5f3b3c2e5a00f0093de47f657aeaefcedff27d18 # v1.17.0
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
push: false
context: ${{ matrix.context }}
file: ${{ matrix.file }}
platforms: ${{ matrix.platform }}
build-docker-fork:
if: github.event.pull_request.head.repo.full_name != github.repository
runs-on: ubuntu-24.04
permissions:
contents: read
strategy:
matrix:
include:
- service_name: "api-amd64"
context: "{{defaultContext}}:api"
file: "Dockerfile"
- service_name: "web-amd64"
runs_on: ubuntu-24.04-arm
context: "{{defaultContext}}"
file: "web/Dockerfile"
steps:
@ -79,4 +48,6 @@ jobs:
push: false
context: ${{ matrix.context }}
file: ${{ matrix.file }}
platforms: linux/amd64
platforms: ${{ matrix.platform }}
cache-from: type=gha
cache-to: type=gha,mode=max

View File

@ -7,7 +7,7 @@ jobs:
permissions:
contents: read
pull-requests: write
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@634933edcd8ababfe52f92936142cc22ac488b1b # v6.0.1
with:

View File

@ -23,7 +23,7 @@ concurrency:
jobs:
pre_job:
name: Skip Duplicate Checks
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
outputs:
should_skip: ${{ steps.skip_check.outputs.should_skip || 'false' }}
steps:
@ -39,7 +39,7 @@ jobs:
name: Check Changed Files
needs: pre_job
if: needs.pre_job.outputs.should_skip != 'true'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
outputs:
api-changed: ${{ steps.changes.outputs.api }}
e2e-changed: ${{ steps.changes.outputs.e2e }}
@ -69,6 +69,7 @@ jobs:
- 'package.json'
- 'pnpm-lock.yaml'
- 'pnpm-workspace.yaml'
- '.npmrc'
- '.nvmrc'
- '.github/workflows/web-tests.yml'
- '.github/actions/setup-web/**'
@ -82,6 +83,7 @@ jobs:
- 'package.json'
- 'pnpm-lock.yaml'
- 'pnpm-workspace.yaml'
- '.npmrc'
- '.nvmrc'
- 'docker/docker-compose.middleware.yaml'
- 'docker/middleware.env.example'
@ -139,7 +141,7 @@ jobs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.api-changed != 'true'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Report skipped API tests
run: echo "No API-related changes detected; skipping API tests."
@ -152,7 +154,7 @@ jobs:
- check-changes
- api-tests-run
- api-tests-skip
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Finalize API Tests status
env:
@ -199,7 +201,7 @@ jobs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.web-changed != 'true'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Report skipped web tests
run: echo "No web-related changes detected; skipping web tests."
@ -212,7 +214,7 @@ jobs:
- check-changes
- web-tests-run
- web-tests-skip
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Finalize Web Tests status
env:
@ -258,7 +260,7 @@ jobs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.e2e-changed != 'true'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Report skipped web full-stack e2e
run: echo "No E2E-related changes detected; skipping web full-stack E2E."
@ -271,7 +273,7 @@ jobs:
- check-changes
- web-e2e-run
- web-e2e-skip
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Finalize Web Full-Stack E2E status
env:
@ -323,7 +325,7 @@ jobs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.vdb-changed != 'true'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Report skipped VDB tests
run: echo "No VDB-related changes detected; skipping VDB tests."
@ -336,7 +338,7 @@ jobs:
- check-changes
- vdb-tests-run
- vdb-tests-skip
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Finalize VDB Tests status
env:
@ -382,7 +384,7 @@ jobs:
- pre_job
- check-changes
if: needs.pre_job.outputs.should_skip != 'true' && needs.check-changes.outputs.migration-changed != 'true'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Report skipped DB migration tests
run: echo "No migration-related changes detected; skipping DB migration tests."
@ -395,7 +397,7 @@ jobs:
- check-changes
- db-migration-test-run
- db-migration-test-skip
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Finalize DB Migration Test status
env:

View File

@ -12,7 +12,7 @@ permissions: {}
jobs:
comment:
name: Comment PR with pyrefly diff
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
permissions:
actions: read
contents: read

View File

@ -10,7 +10,7 @@ permissions:
jobs:
pyrefly-diff:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
permissions:
contents: read
issues: write

View File

@ -12,7 +12,7 @@ permissions: {}
jobs:
comment:
name: Comment PR with type coverage
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
permissions:
actions: read
contents: read

View File

@ -10,7 +10,7 @@ permissions:
jobs:
pyrefly-type-coverage:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
permissions:
contents: read
issues: write

View File

@ -16,7 +16,7 @@ jobs:
name: Validate PR title
permissions:
pull-requests: read
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Complete merge group check
if: github.event_name == 'merge_group'

View File

@ -12,7 +12,7 @@ on:
jobs:
stale:
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write

View File

@ -15,7 +15,7 @@ permissions:
jobs:
python-style:
name: Python Style
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Checkout code
@ -57,7 +57,7 @@ jobs:
web-style:
name: Web Style
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./web
@ -83,6 +83,7 @@ jobs:
package.json
pnpm-lock.yaml
pnpm-workspace.yaml
.npmrc
.nvmrc
.github/workflows/style.yml
.github/actions/setup-web/**
@ -130,7 +131,7 @@ jobs:
superlinter:
name: SuperLinter
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Checkout code

View File

@ -9,6 +9,7 @@ on:
- package.json
- pnpm-lock.yaml
- pnpm-workspace.yaml
- .npmrc
concurrency:
group: sdk-tests-${{ github.head_ref || github.run_id }}
@ -17,7 +18,7 @@ concurrency:
jobs:
build:
name: unit test for Node.js SDK
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
defaults:
run:

View File

@ -35,7 +35,7 @@ concurrency:
jobs:
translate:
if: github.repository == 'langgenius/dify'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
@ -158,7 +158,7 @@ jobs:
- name: Run Claude Code for Translation Sync
if: steps.context.outputs.CHANGED_FILES != ''
uses: anthropics/claude-code-action@ef50f123a3a9be95b60040d042717517407c7256 # v1.0.110
uses: anthropics/claude-code-action@38ec876110f9fbf8b950c79f534430740c3ac009 # v1.0.101
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -16,7 +16,7 @@ concurrency:
jobs:
trigger:
if: github.repository == 'langgenius/dify'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
timeout-minutes: 5
steps:

View File

@ -16,7 +16,7 @@ jobs:
test:
name: Full VDB Tests
if: github.repository == 'langgenius/dify'
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
strategy:
matrix:
python-version:

View File

@ -13,7 +13,7 @@ concurrency:
jobs:
test:
name: VDB Smoke Tests
runs-on: depot-ubuntu-24.04
runs-on: ubuntu-latest
strategy:
matrix:
python-version:

View File

@ -13,7 +13,7 @@ concurrency:
jobs:
test:
name: Web Full-Stack E2E
runs-on: depot-ubuntu-24.04-4
runs-on: ubuntu-latest
defaults:
run:
shell: bash

View File

@ -16,7 +16,7 @@ concurrency:
jobs:
test:
name: Web Tests (${{ matrix.shardIndex }}/${{ matrix.shardTotal }})
runs-on: depot-ubuntu-24.04-4
runs-on: ubuntu-latest
env:
VITEST_COVERAGE_SCOPE: app-components
strategy:
@ -54,7 +54,7 @@ jobs:
name: Merge Test Reports
if: ${{ !cancelled() }}
needs: [test]
runs-on: depot-ubuntu-24.04-4
runs-on: ubuntu-latest
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
defaults:
@ -92,7 +92,7 @@ jobs:
dify-ui-test:
name: dify-ui Tests
runs-on: depot-ubuntu-24.04-4
runs-on: ubuntu-latest
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
defaults:

4
.gitignore vendored
View File

@ -237,10 +237,6 @@ scripts/stress-test/reports/
.playwright-mcp/
.serena/
# vitest browser mode attachments (failure screenshots, traces, etc.)
.vitest-attachments/
**/__screenshots__/
# settings
*.local.json
*.local.md

1
.npmrc Normal file
View File

@ -0,0 +1 @@
save-exact=true

View File

@ -30,7 +30,7 @@ The codebase is split into:
## Language Style
- **Python**: Keep type hints on functions and attributes, and implement relevant special methods (e.g., `__repr__`, `__str__`). Prefer `TypedDict` over `dict` or `Mapping` for type safety and better code documentation.
- **TypeScript**: Use the strict config, rely on ESLint (`pnpm lint:fix` preferred) plus `pnpm type-check`, and avoid `any` types.
- **TypeScript**: Use the strict config, rely on ESLint (`pnpm lint:fix` preferred) plus `pnpm type-check:tsgo`, and avoid `any` types.
## General Practices

View File

@ -139,6 +139,19 @@ Star Dify on GitHub and be instantly notified of new releases.
If you need to customize the configuration, please refer to the comments in our [.env.example](docker/.env.example) file and update the corresponding values in your `.env` file. Additionally, you might need to make adjustments to the `docker-compose.yaml` file itself, such as changing image versions, port mappings, or volume mounts, based on your specific deployment environment and requirements. After making any changes, please re-run `docker compose up -d`. You can find the full list of available environment variables [here](https://docs.dify.ai/getting-started/install-self-hosted/environments).
#### Customizing Suggested Questions
You can now customize the "Suggested Questions After Answer" feature to better fit your use case. For example, to generate longer, more technical questions:
```bash
# In your .env file
SUGGESTED_QUESTIONS_PROMPT='Please help me predict the five most likely technical follow-up questions a developer would ask. Focus on implementation details, best practices, and architecture considerations. Keep each question between 40-60 characters. Output must be JSON array: ["question1","question2","question3","question4","question5"]'
SUGGESTED_QUESTIONS_MAX_TOKENS=512
SUGGESTED_QUESTIONS_TEMPERATURE=0.3
```
See the [Suggested Questions Configuration Guide](docs/suggested-questions-configuration.md) for detailed examples and usage instructions.
### Metrics Monitoring with Grafana
Import the dashboard to Grafana, using Dify's PostgreSQL database as data source, to monitor metrics in granularity of apps, tenants, messages, and more.
@ -147,7 +160,7 @@ Import the dashboard to Grafana, using Dify's PostgreSQL database as data source
### Deployment with Kubernetes
If you'd like to configure a highly available setup, there are community-contributed [Helm Charts](https://helm.sh/) and YAML files which allow Dify to be deployed on Kubernetes.
If you'd like to configure a highly-available setup, there are community-contributed [Helm Charts](https://helm.sh/) and YAML files which allow Dify to be deployed on Kubernetes.
- [Helm Chart by @LeoQuote](https://github.com/douban/charts/tree/master/charts/dify)
- [Helm Chart by @BorisPolonsky](https://github.com/BorisPolonsky/dify-helm)

View File

@ -659,11 +659,6 @@ INNER_API_KEY_FOR_PLUGIN=QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y
MARKETPLACE_ENABLED=true
MARKETPLACE_API_URL=https://marketplace.dify.ai
# Creators Platform configuration
CREATORS_PLATFORM_FEATURES_ENABLED=true
CREATORS_PLATFORM_API_URL=https://creators.dify.ai
CREATORS_PLATFORM_OAUTH_CLIENT_ID=
# Endpoint configuration
ENDPOINT_URL_TEMPLATE=http://localhost:5002/e/{hook_id}
@ -714,6 +709,22 @@ SWAGGER_UI_PATH=/swagger-ui.html
# Set to false to export dataset IDs as plain text for easier cross-environment import
DSL_EXPORT_ENCRYPT_DATASET_ID=true
# Suggested Questions After Answer Configuration
# These environment variables allow customization of the suggested questions feature
#
# Custom prompt for generating suggested questions (optional)
# If not set, uses the default prompt that generates 3 questions under 20 characters each
# Example: "Please help me predict the five most likely technical follow-up questions a developer would ask. Focus on implementation details, best practices, and architecture considerations. Keep each question between 40-60 characters. Output must be JSON array: [\"question1\",\"question2\",\"question3\",\"question4\",\"question5\"]"
# SUGGESTED_QUESTIONS_PROMPT=
# Maximum number of tokens for suggested questions generation (default: 256)
# Adjust this value for longer questions or more questions
# SUGGESTED_QUESTIONS_MAX_TOKENS=256
# Temperature for suggested questions generation (default: 0.0)
# Higher values (0.5-1.0) produce more creative questions, lower values (0.0-0.3) produce more focused questions
# SUGGESTED_QUESTIONS_TEMPERATURE=0
# Tenant isolated task queue configuration
TENANT_ISOLATED_TASK_CONCURRENCY=1

View File

@ -101,11 +101,3 @@ The scripts resolve paths relative to their location, so you can run them from a
uv run ruff format ./ # Format code
uv run basedpyright . # Type checking
```
## Generate TS stub
```
uv run dev/generate_swagger_specs.py --output-dir openapi
```
use https://jsontotable.org/openapi-to-typescript to convert to typescript

View File

@ -113,18 +113,8 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No
# Validates name encoding for non-Latin characters.
name = name.strip().encode("utf-8").decode("utf-8") if name else None
# Generate a random password that satisfies the password policy.
# The iteration limit guards against infinite loops caused by unexpected bugs in valid_password.
for _ in range(100):
new_password = secrets.token_urlsafe(16)
try:
valid_password(new_password)
break
except Exception:
continue
else:
click.echo(click.style("Failed to generate a valid password. Please try again.", fg="red"))
return
# generate random password
new_password = secrets.token_urlsafe(16)
# register account
account = RegisterService.register(

View File

@ -11,7 +11,7 @@ from configs import dify_config
from core.helper import encrypter
from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.impl.plugin import PluginInstaller
from core.tools.utils.system_encryption import encrypt_system_params
from core.tools.utils.system_oauth_encryption import encrypt_system_oauth_params
from extensions.ext_database import db
from models import Tenant
from models.oauth import DatasourceOauthParamConfig, DatasourceProvider
@ -44,7 +44,7 @@ def setup_system_tool_oauth_client(provider, client_params):
click.echo(click.style(f"Encrypting client params: {client_params}", fg="yellow"))
click.echo(click.style(f"Using SECRET_KEY: `{dify_config.SECRET_KEY}`", fg="yellow"))
oauth_client_params = encrypt_system_params(client_params_dict)
oauth_client_params = encrypt_system_oauth_params(client_params_dict)
click.echo(click.style("Client params encrypted successfully.", fg="green"))
except Exception as e:
click.echo(click.style(f"Error parsing client params: {str(e)}", fg="red"))
@ -94,7 +94,7 @@ def setup_system_trigger_oauth_client(provider, client_params):
click.echo(click.style(f"Encrypting client params: {client_params}", fg="yellow"))
click.echo(click.style(f"Using SECRET_KEY: `{dify_config.SECRET_KEY}`", fg="yellow"))
oauth_client_params = encrypt_system_params(client_params_dict)
oauth_client_params = encrypt_system_oauth_params(client_params_dict)
click.echo(click.style("Client params encrypted successfully.", fg="green"))
except Exception as e:
click.echo(click.style(f"Error parsing client params: {str(e)}", fg="red"))

View File

@ -287,27 +287,6 @@ class MarketplaceConfig(BaseSettings):
)
class CreatorsPlatformConfig(BaseSettings):
"""
Configuration for Creators Platform integration
"""
CREATORS_PLATFORM_FEATURES_ENABLED: bool = Field(
description="Enable or disable Creators Platform features",
default=True,
)
CREATORS_PLATFORM_API_URL: HttpUrl = Field(
description="Creators Platform API URL",
default=HttpUrl("https://creators.dify.ai"),
)
CREATORS_PLATFORM_OAUTH_CLIENT_ID: str = Field(
description="OAuth client ID for Creators Platform integration",
default="",
)
class EndpointConfig(BaseSettings):
"""
Configuration for various application endpoints and URLs
@ -1400,7 +1379,6 @@ class FeatureConfig(
AuthConfig, # Changed from OAuthConfig to AuthConfig
BillingConfig,
CodeExecutionSandboxConfig,
CreatorsPlatformConfig,
TriggerConfig,
AsyncWorkflowConfig,
PluginConfig,

View File

@ -1,6 +0,0 @@
from pydantic import BaseModel, JsonValue
class HumanInputFormSubmitPayload(BaseModel):
inputs: dict[str, JsonValue]
action: str

View File

@ -692,32 +692,6 @@ class AppExportApi(Resource):
return payload.model_dump(mode="json")
@console_ns.route("/apps/<uuid:app_id>/publish-to-creators-platform")
class AppPublishToCreatorsPlatformApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=None)
@edit_permission_required
def post(self, app_model):
"""Publish app to Creators Platform"""
from configs import dify_config
from core.helper.creators import get_redirect_url, upload_dsl
if not dify_config.CREATORS_PLATFORM_FEATURES_ENABLED:
return {"error": "Creators Platform features are not enabled"}, 403
current_user, _ = current_account_with_tenant()
dsl_content = AppDslService.export_dsl(app_model=app_model, include_secret=False)
dsl_bytes = dsl_content.encode("utf-8")
claim_code = upload_dsl(dsl_bytes)
redirect_url = get_redirect_url(str(current_user.id), claim_code)
return {"redirect_url": redirect_url}
@console_ns.route("/apps/<uuid:app_id>/name")
class AppNameApi(Resource):
@console_ns.doc("check_app_name")

View File

@ -38,48 +38,6 @@ class HitTestingPayload(BaseModel):
class DatasetsHitTestingBase:
@staticmethod
def _normalize_hit_testing_query(query: Any) -> str:
"""Return the user-visible query string from legacy and current response shapes."""
if isinstance(query, str):
return query
if isinstance(query, dict):
content = query.get("content")
if isinstance(content, str):
return content
raise ValueError("Invalid hit testing query response")
@staticmethod
def _normalize_hit_testing_records(records: Any) -> list[dict[str, Any]]:
"""Coerce nullable collection fields into lists before response validation."""
if not isinstance(records, list):
return []
normalized_records: list[dict[str, Any]] = []
for record in records:
if not isinstance(record, dict):
continue
normalized_record = dict(record)
segment = normalized_record.get("segment")
if isinstance(segment, dict):
normalized_segment = dict(segment)
if normalized_segment.get("keywords") is None:
normalized_segment["keywords"] = []
normalized_record["segment"] = normalized_segment
if normalized_record.get("child_chunks") is None:
normalized_record["child_chunks"] = []
if normalized_record.get("files") is None:
normalized_record["files"] = []
normalized_records.append(normalized_record)
return normalized_records
@staticmethod
def get_and_validate_dataset(dataset_id: str):
assert isinstance(current_user, Account)
@ -117,12 +75,7 @@ class DatasetsHitTestingBase:
attachment_ids=args.get("attachment_ids"),
limit=10,
)
return {
"query": DatasetsHitTestingBase._normalize_hit_testing_query(response.get("query")),
"records": DatasetsHitTestingBase._normalize_hit_testing_records(
marshal(response.get("records", []), hit_testing_record_fields)
),
}
return {"query": response["query"], "records": marshal(response["records"], hit_testing_record_fields)}
except services.errors.index.IndexNotInitializedError:
raise DatasetNotInitializedError()
except ProviderTokenNotInitError as ex:

View File

@ -8,10 +8,10 @@ from collections.abc import Generator
from flask import Response, jsonify, request
from flask_restx import Resource
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from controllers.common.human_input import HumanInputFormSubmitPayload
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
@ -34,6 +34,11 @@ from services.workflow_event_snapshot_service import build_workflow_event_stream
logger = logging.getLogger(__name__)
class HumanInputFormSubmitPayload(BaseModel):
inputs: dict
action: str
def _jsonify_form_definition(form: Form) -> Response:
payload = form.get_definition().model_dump()
payload["expiration_time"] = int(form.expiration_time.timestamp())

View File

@ -37,11 +37,6 @@ class TagBindingRemovePayload(BaseModel):
type: TagType = Field(description="Tag type")
class TagBindingItemDeletePayload(BaseModel):
target_id: str = Field(description="Target ID to unbind tag from")
type: TagType = Field(description="Tag type")
class TagListQueryParam(BaseModel):
type: Literal["knowledge", "app", ""] = Field("", description="Tag type filter")
keyword: str | None = Field(None, description="Search keyword")
@ -75,7 +70,6 @@ register_schema_models(
TagBasePayload,
TagBindingPayload,
TagBindingRemovePayload,
TagBindingItemDeletePayload,
TagListQueryParam,
TagResponse,
)
@ -158,107 +152,41 @@ class TagUpdateDeleteApi(Resource):
return "", 204
def _require_tag_binding_edit_permission() -> None:
"""
Ensure the current account can edit tag bindings.
Tag binding operations are allowed for users who can edit resources (app/dataset) within the current tenant.
"""
current_user, _ = current_account_with_tenant()
# The role of the current user in the ta table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
def _create_tag_bindings() -> tuple[dict[str, str], int]:
_require_tag_binding_edit_permission()
payload = TagBindingPayload.model_validate(console_ns.payload or {})
TagService.save_tag_binding(
TagBindingCreatePayload(
tag_ids=payload.tag_ids,
target_id=payload.target_id,
type=payload.type,
)
)
return {"result": "success"}, 200
def _remove_tag_binding() -> tuple[dict[str, str], int]:
_require_tag_binding_edit_permission()
payload = TagBindingRemovePayload.model_validate(console_ns.payload or {})
TagService.delete_tag_binding(
TagBindingDeletePayload(
tag_id=payload.tag_id,
target_id=payload.target_id,
type=payload.type,
)
)
return {"result": "success"}, 200
@console_ns.route("/tag-bindings")
class TagBindingCollectionApi(Resource):
"""Canonical collection resource for tag binding creation."""
@console_ns.doc("create_tag_binding")
@console_ns.route("/tag-bindings/create")
class TagBindingCreateApi(Resource):
@console_ns.expect(console_ns.models[TagBindingPayload.__name__])
@setup_required
@login_required
@account_initialization_required
def post(self):
return _create_tag_bindings()
current_user, _ = current_account_with_tenant()
# The role of the current user in the ta table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
@console_ns.route("/tag-bindings/<uuid:id>")
class TagBindingItemApi(Resource):
"""Canonical item resource for tag binding deletion."""
@console_ns.doc("delete_tag_binding")
@console_ns.doc(params={"id": "Tag ID"})
@console_ns.expect(console_ns.models[TagBindingItemDeletePayload.__name__])
@setup_required
@login_required
@account_initialization_required
def delete(self, id):
_require_tag_binding_edit_permission()
payload = TagBindingItemDeletePayload.model_validate(console_ns.payload or {})
TagService.delete_tag_binding(
TagBindingDeletePayload(
tag_id=str(id),
target_id=payload.target_id,
type=payload.type,
)
payload = TagBindingPayload.model_validate(console_ns.payload or {})
TagService.save_tag_binding(
TagBindingCreatePayload(tag_ids=payload.tag_ids, target_id=payload.target_id, type=payload.type)
)
return {"result": "success"}, 200
@console_ns.route("/tag-bindings/create")
class DeprecatedTagBindingCreateApi(Resource):
"""Deprecated verb-based alias for tag binding creation."""
@console_ns.doc("create_tag_binding_deprecated")
@console_ns.doc(deprecated=True)
@console_ns.doc(description="Deprecated legacy alias. Use POST /tag-bindings instead.")
@console_ns.expect(console_ns.models[TagBindingPayload.__name__])
@setup_required
@login_required
@account_initialization_required
def post(self):
return _create_tag_bindings()
@console_ns.route("/tag-bindings/remove")
class DeprecatedTagBindingRemoveApi(Resource):
"""Deprecated verb-based alias for tag binding deletion."""
@console_ns.doc("delete_tag_binding_deprecated")
@console_ns.doc(deprecated=True)
@console_ns.doc(description="Deprecated legacy alias. Use DELETE /tag-bindings/{id} instead.")
class TagBindingDeleteApi(Resource):
@console_ns.expect(console_ns.models[TagBindingRemovePayload.__name__])
@setup_required
@login_required
@account_initialization_required
def post(self):
return _remove_tag_binding()
current_user, _ = current_account_with_tenant()
# The role of the current user in the ta table must be admin, owner, editor, or dataset_operator
if not (current_user.has_edit_permission or current_user.is_dataset_editor):
raise Forbidden()
payload = TagBindingRemovePayload.model_validate(console_ns.payload or {})
TagService.delete_tag_binding(
TagBindingDeletePayload(tag_id=payload.tag_id, target_id=payload.target_id, type=payload.type)
)
return {"result": "success"}, 200

View File

@ -1,11 +1,3 @@
"""Console workspace endpoint controllers.
This module exposes workspace-scoped plugin endpoint management APIs. The
canonical write routes follow resource-oriented paths, while the historical
verb-based aliases stay available as deprecated resources so OpenAPI metadata
marks only the legacy paths as deprecated.
"""
from typing import Any
from flask import request
@ -33,12 +25,7 @@ class EndpointIdPayload(BaseModel):
endpoint_id: str
class EndpointUpdatePayload(BaseModel):
settings: dict[str, Any]
name: str = Field(min_length=1)
class LegacyEndpointUpdatePayload(EndpointIdPayload):
class EndpointUpdatePayload(EndpointIdPayload):
settings: dict[str, Any]
name: str = Field(min_length=1)
@ -89,7 +76,6 @@ register_schema_models(
EndpointCreatePayload,
EndpointIdPayload,
EndpointUpdatePayload,
LegacyEndpointUpdatePayload,
EndpointListQuery,
EndpointListForPluginQuery,
EndpointCreateResponse,
@ -102,60 +88,8 @@ register_schema_models(
)
def _create_endpoint() -> dict[str, bool]:
"""Create a plugin endpoint for the current workspace."""
user, tenant_id = current_account_with_tenant()
args = EndpointCreatePayload.model_validate(console_ns.payload)
try:
return {
"success": EndpointService.create_endpoint(
tenant_id=tenant_id,
user_id=user.id,
plugin_unique_identifier=args.plugin_unique_identifier,
name=args.name,
settings=args.settings,
)
}
except PluginPermissionDeniedError as e:
raise ValueError(e.description) from e
def _update_endpoint(endpoint_id: str) -> dict[str, bool]:
"""Update a plugin endpoint identified by the canonical path parameter."""
user, tenant_id = current_account_with_tenant()
args = EndpointUpdatePayload.model_validate(console_ns.payload)
return {
"success": EndpointService.update_endpoint(
tenant_id=tenant_id,
user_id=user.id,
endpoint_id=endpoint_id,
name=args.name,
settings=args.settings,
)
}
def _delete_endpoint(endpoint_id: str) -> dict[str, bool]:
"""Delete a plugin endpoint identified by the canonical path parameter."""
user, tenant_id = current_account_with_tenant()
return {
"success": EndpointService.delete_endpoint(
tenant_id=tenant_id,
user_id=user.id,
endpoint_id=endpoint_id,
)
}
@console_ns.route("/workspaces/current/endpoints")
class EndpointCollectionApi(Resource):
"""Canonical collection resource for endpoint creation."""
@console_ns.route("/workspaces/current/endpoints/create")
class EndpointCreateApi(Resource):
@console_ns.doc("create_endpoint")
@console_ns.doc(description="Create a new plugin endpoint")
@console_ns.expect(console_ns.models[EndpointCreatePayload.__name__])
@ -170,33 +104,22 @@ class EndpointCollectionApi(Resource):
@is_admin_or_owner_required
@account_initialization_required
def post(self):
return _create_endpoint()
user, tenant_id = current_account_with_tenant()
args = EndpointCreatePayload.model_validate(console_ns.payload)
@console_ns.route("/workspaces/current/endpoints/create")
class DeprecatedEndpointCreateApi(Resource):
"""Deprecated verb-based alias for endpoint creation."""
@console_ns.doc("create_endpoint_deprecated")
@console_ns.doc(deprecated=True)
@console_ns.doc(
description=(
"Deprecated legacy alias for creating a plugin endpoint. Use POST /workspaces/current/endpoints instead."
)
)
@console_ns.expect(console_ns.models[EndpointCreatePayload.__name__])
@console_ns.response(
200,
"Endpoint created successfully",
console_ns.models[EndpointCreateResponse.__name__],
)
@console_ns.response(403, "Admin privileges required")
@setup_required
@login_required
@is_admin_or_owner_required
@account_initialization_required
def post(self):
return _create_endpoint()
try:
return {
"success": EndpointService.create_endpoint(
tenant_id=tenant_id,
user_id=user.id,
plugin_unique_identifier=args.plugin_unique_identifier,
name=args.name,
settings=args.settings,
)
}
except PluginPermissionDeniedError as e:
raise ValueError(e.description) from e
@console_ns.route("/workspaces/current/endpoints/list")
@ -267,56 +190,10 @@ class EndpointListForSinglePluginApi(Resource):
)
@console_ns.route("/workspaces/current/endpoints/<string:id>")
class EndpointItemApi(Resource):
"""Canonical item resource for endpoint updates and deletion."""
@console_ns.route("/workspaces/current/endpoints/delete")
class EndpointDeleteApi(Resource):
@console_ns.doc("delete_endpoint")
@console_ns.doc(description="Delete a plugin endpoint")
@console_ns.doc(params={"id": {"description": "Endpoint ID", "type": "string", "required": True}})
@console_ns.response(
200,
"Endpoint deleted successfully",
console_ns.models[EndpointDeleteResponse.__name__],
)
@console_ns.response(403, "Admin privileges required")
@setup_required
@login_required
@is_admin_or_owner_required
@account_initialization_required
def delete(self, id: str):
return _delete_endpoint(endpoint_id=id)
@console_ns.doc("update_endpoint")
@console_ns.doc(description="Update a plugin endpoint")
@console_ns.expect(console_ns.models[EndpointUpdatePayload.__name__])
@console_ns.doc(params={"id": {"description": "Endpoint ID", "type": "string", "required": True}})
@console_ns.response(
200,
"Endpoint updated successfully",
console_ns.models[EndpointUpdateResponse.__name__],
)
@console_ns.response(403, "Admin privileges required")
@setup_required
@login_required
@is_admin_or_owner_required
@account_initialization_required
def patch(self, id: str):
return _update_endpoint(endpoint_id=id)
@console_ns.route("/workspaces/current/endpoints/delete")
class DeprecatedEndpointDeleteApi(Resource):
"""Deprecated verb-based alias for endpoint deletion."""
@console_ns.doc("delete_endpoint_deprecated")
@console_ns.doc(deprecated=True)
@console_ns.doc(
description=(
"Deprecated legacy alias for deleting a plugin endpoint. "
"Use DELETE /workspaces/current/endpoints/{id} instead."
)
)
@console_ns.expect(console_ns.models[EndpointIdPayload.__name__])
@console_ns.response(
200,
@ -329,23 +206,22 @@ class DeprecatedEndpointDeleteApi(Resource):
@is_admin_or_owner_required
@account_initialization_required
def post(self):
user, tenant_id = current_account_with_tenant()
args = EndpointIdPayload.model_validate(console_ns.payload)
return _delete_endpoint(endpoint_id=args.endpoint_id)
return {
"success": EndpointService.delete_endpoint(
tenant_id=tenant_id, user_id=user.id, endpoint_id=args.endpoint_id
)
}
@console_ns.route("/workspaces/current/endpoints/update")
class DeprecatedEndpointUpdateApi(Resource):
"""Deprecated verb-based alias for endpoint updates."""
@console_ns.doc("update_endpoint_deprecated")
@console_ns.doc(deprecated=True)
@console_ns.doc(
description=(
"Deprecated legacy alias for updating a plugin endpoint. "
"Use PATCH /workspaces/current/endpoints/{id} instead."
)
)
@console_ns.expect(console_ns.models[LegacyEndpointUpdatePayload.__name__])
class EndpointUpdateApi(Resource):
@console_ns.doc("update_endpoint")
@console_ns.doc(description="Update a plugin endpoint")
@console_ns.expect(console_ns.models[EndpointUpdatePayload.__name__])
@console_ns.response(
200,
"Endpoint updated successfully",
@ -357,8 +233,19 @@ class DeprecatedEndpointUpdateApi(Resource):
@is_admin_or_owner_required
@account_initialization_required
def post(self):
args = LegacyEndpointUpdatePayload.model_validate(console_ns.payload)
return _update_endpoint(endpoint_id=args.endpoint_id)
user, tenant_id = current_account_with_tenant()
args = EndpointUpdatePayload.model_validate(console_ns.payload)
return {
"success": EndpointService.update_endpoint(
tenant_id=tenant_id,
user_id=user.id,
endpoint_id=args.endpoint_id,
name=args.name,
settings=args.settings,
)
}
@console_ns.route("/workspaces/current/endpoints/enable")

View File

@ -8,12 +8,13 @@ paused human input forms in workflow/chatflow runs.
import json
import logging
from datetime import datetime
from typing import Any
from flask import Response
from flask_restx import Resource
from werkzeug.exceptions import BadRequest, NotFound
from pydantic import BaseModel
from werkzeug.exceptions import InternalServerError, NotFound
from controllers.common.human_input import HumanInputFormSubmitPayload
from controllers.common.schema import register_schema_models
from controllers.service_api import service_api_ns
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
@ -25,6 +26,11 @@ from services.human_input_service import Form, FormNotFoundError, HumanInputServ
logger = logging.getLogger(__name__)
class HumanInputFormSubmitPayload(BaseModel):
inputs: dict[str, Any]
action: str
register_schema_models(service_api_ns, HumanInputFormSubmitPayload)
@ -121,7 +127,7 @@ class WorkflowHumanInputFormApi(Resource):
recipient_type = form.recipient_type
if recipient_type is None:
logger.warning("Recipient type is None for form, form_id=%s", form.id)
raise BadRequest("Form recipient type is invalid")
raise InternalServerError("Form recipient type is invalid")
try:
service.submit_form_by_token(

View File

@ -18,8 +18,6 @@ from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
from core.app.apps.message_generator import MessageGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.task_entities import StreamEvent
from core.workflow.human_input_policy import HumanInputSurface
from extensions.ext_database import db
from models.enums import CreatorUserRole
from models.model import App, AppMode, EndUser
@ -37,14 +35,8 @@ class WorkflowEventsApi(Resource):
params={
"task_id": "Workflow run ID",
"user": "End user identifier (query param)",
"include_state_snapshot": (
"Whether to replay from persisted state snapshot, "
'specify `"true"` to include a status snapshot of executed nodes'
),
"continue_on_pause": (
"Whether to keep the stream open across workflow_paused events,"
'specify `"true"` to keep the stream open for `workflow_paused` events.'
),
"include_state_snapshot": "Whether to replay from persisted state snapshot",
"continue_on_pause": "Whether to keep the stream open across workflow_paused events",
}
)
@service_api_ns.doc(
@ -107,7 +99,7 @@ class WorkflowEventsApi(Resource):
include_state_snapshot = request.args.get("include_state_snapshot", "false").lower() == "true"
continue_on_pause = request.args.get("continue_on_pause", "false").lower() == "true"
terminal_events: list[StreamEvent] | None = [] if continue_on_pause else None
terminal_events = ["workflow_finished"] if continue_on_pause else None
def _generate_stream_events():
if include_state_snapshot:
@ -118,7 +110,6 @@ class WorkflowEventsApi(Resource):
tenant_id=app_model.tenant_id,
app_id=app_model.id,
session_maker=session_maker,
human_input_surface=HumanInputSurface.SERVICE_API,
close_on_pause=not continue_on_pause,
)
)

View File

@ -1,12 +1,4 @@
"""Service API endpoints for dataset document management.
The canonical Service API paths use hyphenated route segments. Legacy underscore
aliases remain registered for backward compatibility, but they must stay marked
deprecated in generated API docs so clients migrate toward the canonical paths.
"""
import json
from collections.abc import Mapping
from contextlib import ExitStack
from typing import Self
from uuid import UUID
@ -125,137 +117,12 @@ register_schema_models(
)
def _create_document_by_text(tenant_id: str, dataset_id: UUID) -> tuple[Mapping[str, object], int]:
"""Create a document from text for both canonical and legacy routes."""
payload = DocumentTextCreatePayload.model_validate(service_api_ns.payload or {})
args = payload.model_dump(exclude_none=True)
dataset_id_str = str(dataset_id)
tenant_id_str = str(tenant_id)
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id_str, Dataset.id == dataset_id_str).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")
if not dataset.indexing_technique and not args["indexing_technique"]:
raise ValueError("indexing_technique is required.")
embedding_model_provider = payload.embedding_model_provider
embedding_model = payload.embedding_model
if embedding_model_provider and embedding_model:
DatasetService.check_embedding_model_setting(tenant_id_str, embedding_model_provider, embedding_model)
retrieval_model = payload.retrieval_model
if (
retrieval_model
and retrieval_model.reranking_model
and retrieval_model.reranking_model.reranking_provider_name
and retrieval_model.reranking_model.reranking_model_name
):
DatasetService.check_reranking_model_setting(
tenant_id_str,
retrieval_model.reranking_model.reranking_provider_name,
retrieval_model.reranking_model.reranking_model_name,
)
if not current_user:
raise ValueError("current_user is required")
upload_file = FileService(db.engine).upload_text(
text=payload.text, text_name=payload.name, user_id=current_user.id, tenant_id=tenant_id_str
)
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
args["data_source"] = data_source
knowledge_config = KnowledgeConfig.model_validate(args)
DocumentService.document_create_args_validate(knowledge_config)
if not current_user:
raise ValueError("current_user is required")
try:
documents, batch = DocumentService.save_document_with_dataset_id(
dataset=dataset,
knowledge_config=knowledge_config,
account=current_user,
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
created_from="api",
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
document = documents[0]
documents_and_batch_fields = {"document": marshal(document, document_fields), "batch": batch}
return documents_and_batch_fields, 200
def _update_document_by_text(tenant_id: str, dataset_id: UUID, document_id: UUID) -> tuple[Mapping[str, object], int]:
"""Update a document from text for both canonical and legacy routes."""
payload = DocumentTextUpdate.model_validate(service_api_ns.payload or {})
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == str(dataset_id)).limit(1)
)
args = payload.model_dump(exclude_none=True)
if not dataset:
raise ValueError("Dataset does not exist.")
retrieval_model = payload.retrieval_model
if (
retrieval_model
and retrieval_model.reranking_model
and retrieval_model.reranking_model.reranking_provider_name
and retrieval_model.reranking_model.reranking_model_name
):
DatasetService.check_reranking_model_setting(
tenant_id,
retrieval_model.reranking_model.reranking_provider_name,
retrieval_model.reranking_model.reranking_model_name,
)
# indexing_technique is already set in dataset since this is an update
args["indexing_technique"] = dataset.indexing_technique
if args.get("text"):
text = args.get("text")
name = args.get("name")
if not current_user:
raise ValueError("current_user is required")
upload_file = FileService(db.engine).upload_text(
text=str(text), text_name=str(name), user_id=current_user.id, tenant_id=tenant_id
)
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
args["data_source"] = data_source
args["original_document_id"] = str(document_id)
knowledge_config = KnowledgeConfig.model_validate(args)
DocumentService.document_create_args_validate(knowledge_config)
try:
documents, batch = DocumentService.save_document_with_dataset_id(
dataset=dataset,
knowledge_config=knowledge_config,
account=current_user,
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
created_from="api",
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
document = documents[0]
documents_and_batch_fields = {"document": marshal(document, document_fields), "batch": batch}
return documents_and_batch_fields, 200
@service_api_ns.route("/datasets/<uuid:dataset_id>/document/create-by-text")
@service_api_ns.route(
"/datasets/<uuid:dataset_id>/document/create_by_text",
"/datasets/<uuid:dataset_id>/document/create-by-text",
)
class DocumentAddByTextApi(DatasetApiResource):
"""Resource for the canonical text document creation route."""
"""Resource for documents."""
@service_api_ns.expect(service_api_ns.models[DocumentTextCreatePayload.__name__])
@service_api_ns.doc("create_document_by_text")
@ -271,43 +138,81 @@ class DocumentAddByTextApi(DatasetApiResource):
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_resource_check("documents", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id: str, dataset_id: UUID):
def post(self, tenant_id, dataset_id):
"""Create document by text."""
return _create_document_by_text(tenant_id=tenant_id, dataset_id=dataset_id)
payload = DocumentTextCreatePayload.model_validate(service_api_ns.payload or {})
args = payload.model_dump(exclude_none=True)
@service_api_ns.route("/datasets/<uuid:dataset_id>/document/create_by_text")
class DeprecatedDocumentAddByTextApi(DatasetApiResource):
"""Deprecated resource alias for text document creation."""
@service_api_ns.expect(service_api_ns.models[DocumentTextCreatePayload.__name__])
@service_api_ns.doc("create_document_by_text_deprecated")
@service_api_ns.doc(deprecated=True)
@service_api_ns.doc(
description=(
"Deprecated legacy alias for creating a new document by providing text content. "
"Use /datasets/{dataset_id}/document/create-by-text instead."
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
)
@service_api_ns.doc(params={"dataset_id": "Dataset ID"})
@service_api_ns.doc(
responses={
200: "Document created successfully",
401: "Unauthorized - invalid API token",
400: "Bad request - invalid parameters",
if not dataset:
raise ValueError("Dataset does not exist.")
if not dataset.indexing_technique and not args["indexing_technique"]:
raise ValueError("indexing_technique is required.")
embedding_model_provider = payload.embedding_model_provider
embedding_model = payload.embedding_model
if embedding_model_provider and embedding_model:
DatasetService.check_embedding_model_setting(tenant_id, embedding_model_provider, embedding_model)
retrieval_model = payload.retrieval_model
if (
retrieval_model
and retrieval_model.reranking_model
and retrieval_model.reranking_model.reranking_provider_name
and retrieval_model.reranking_model.reranking_model_name
):
DatasetService.check_reranking_model_setting(
tenant_id,
retrieval_model.reranking_model.reranking_provider_name,
retrieval_model.reranking_model.reranking_model_name,
)
if not current_user:
raise ValueError("current_user is required")
upload_file = FileService(db.engine).upload_text(
text=payload.text, text_name=payload.name, user_id=current_user.id, tenant_id=tenant_id
)
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
)
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_resource_check("documents", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id: str, dataset_id: UUID):
"""Create document by text through the deprecated underscore alias."""
return _create_document_by_text(tenant_id=tenant_id, dataset_id=dataset_id)
args["data_source"] = data_source
knowledge_config = KnowledgeConfig.model_validate(args)
# validate args
DocumentService.document_create_args_validate(knowledge_config)
if not current_user:
raise ValueError("current_user is required")
try:
documents, batch = DocumentService.save_document_with_dataset_id(
dataset=dataset,
knowledge_config=knowledge_config,
account=current_user,
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
created_from="api",
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
document = documents[0]
documents_and_batch_fields = {"document": marshal(document, document_fields), "batch": batch}
return documents_and_batch_fields, 200
@service_api_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/update-by-text")
@service_api_ns.route(
"/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/update_by_text",
"/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/update-by-text",
)
class DocumentUpdateByTextApi(DatasetApiResource):
"""Resource for the canonical text document update route."""
"""Resource for update documents."""
@service_api_ns.expect(service_api_ns.models[DocumentTextUpdate.__name__])
@service_api_ns.doc("update_document_by_text")
@ -324,35 +229,62 @@ class DocumentUpdateByTextApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id: str, dataset_id: UUID, document_id: UUID):
"""Update document by text."""
return _update_document_by_text(tenant_id=tenant_id, dataset_id=dataset_id, document_id=document_id)
@service_api_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/update_by_text")
class DeprecatedDocumentUpdateByTextApi(DatasetApiResource):
"""Deprecated resource alias for text document updates."""
@service_api_ns.expect(service_api_ns.models[DocumentTextUpdate.__name__])
@service_api_ns.doc("update_document_by_text_deprecated")
@service_api_ns.doc(deprecated=True)
@service_api_ns.doc(
description=(
"Deprecated legacy alias for updating an existing document by providing text content. "
"Use /datasets/{dataset_id}/documents/{document_id}/update-by-text instead."
payload = DocumentTextUpdate.model_validate(service_api_ns.payload or {})
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == str(dataset_id)).limit(1)
)
)
@service_api_ns.doc(params={"dataset_id": "Dataset ID", "document_id": "Document ID"})
@service_api_ns.doc(
responses={
200: "Document updated successfully",
401: "Unauthorized - invalid API token",
404: "Document not found",
}
)
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id: str, dataset_id: UUID, document_id: UUID):
"""Update document by text through the deprecated underscore alias."""
return _update_document_by_text(tenant_id=tenant_id, dataset_id=dataset_id, document_id=document_id)
args = payload.model_dump(exclude_none=True)
if not dataset:
raise ValueError("Dataset does not exist.")
retrieval_model = payload.retrieval_model
if (
retrieval_model
and retrieval_model.reranking_model
and retrieval_model.reranking_model.reranking_provider_name
and retrieval_model.reranking_model.reranking_model_name
):
DatasetService.check_reranking_model_setting(
tenant_id,
retrieval_model.reranking_model.reranking_provider_name,
retrieval_model.reranking_model.reranking_model_name,
)
# indexing_technique is already set in dataset since this is an update
args["indexing_technique"] = dataset.indexing_technique
if args.get("text"):
text = args.get("text")
name = args.get("name")
if not current_user:
raise ValueError("current_user is required")
upload_file = FileService(db.engine).upload_text(
text=str(text), text_name=str(name), user_id=current_user.id, tenant_id=tenant_id
)
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
args["data_source"] = data_source
# validate args
args["original_document_id"] = str(document_id)
knowledge_config = KnowledgeConfig.model_validate(args)
DocumentService.document_create_args_validate(knowledge_config)
try:
documents, batch = DocumentService.save_document_with_dataset_id(
dataset=dataset,
knowledge_config=knowledge_config,
account=current_user,
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
created_from="api",
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
document = documents[0]
documents_and_batch_fields = {"document": marshal(document, document_fields), "batch": batch}
return documents_and_batch_fields, 200
@service_api_ns.route(
@ -468,98 +400,15 @@ class DocumentAddByFileApi(DatasetApiResource):
return documents_and_batch_fields, 200
def _update_document_by_file(tenant_id: str, dataset_id: UUID, document_id: UUID) -> tuple[Mapping[str, object], int]:
"""Update a document from an uploaded file for canonical and deprecated routes."""
dataset_id_str = str(dataset_id)
tenant_id_str = str(tenant_id)
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id_str, Dataset.id == dataset_id_str).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")
if dataset.provider == "external":
raise ValueError("External datasets are not supported.")
args: dict[str, object] = {}
if "data" in request.form:
args = json.loads(request.form["data"])
if "doc_form" not in args:
args["doc_form"] = dataset.chunk_structure or "text_model"
if "doc_language" not in args:
args["doc_language"] = "English"
# indexing_technique is already set in dataset since this is an update
args["indexing_technique"] = dataset.indexing_technique
if "file" in request.files:
# save file info
file = request.files["file"]
if len(request.files) > 1:
raise TooManyFilesError()
if not file.filename:
raise FilenameNotExistsError
if not current_user:
raise ValueError("current_user is required")
try:
upload_file = FileService(db.engine).upload_file(
filename=file.filename,
content=file.read(),
mimetype=file.mimetype,
user=current_user,
source="datasets",
)
except services.errors.file.FileTooLargeError as file_too_large_error:
raise FileTooLargeError(file_too_large_error.description)
except services.errors.file.UnsupportedFileTypeError:
raise UnsupportedFileTypeError()
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
args["data_source"] = data_source
# validate args
args["original_document_id"] = str(document_id)
knowledge_config = KnowledgeConfig.model_validate(args)
DocumentService.document_create_args_validate(knowledge_config)
try:
documents, _ = DocumentService.save_document_with_dataset_id(
dataset=dataset,
knowledge_config=knowledge_config,
account=dataset.created_by_account,
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
created_from="api",
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
document = documents[0]
documents_and_batch_fields = {"document": marshal(document, document_fields), "batch": document.batch}
return documents_and_batch_fields, 200
@service_api_ns.route(
"/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/update_by_file",
"/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/update-by-file",
)
class DeprecatedDocumentUpdateByFileApi(DatasetApiResource):
"""Deprecated resource aliases for file document updates."""
class DocumentUpdateByFileApi(DatasetApiResource):
"""Resource for update documents."""
@service_api_ns.doc("update_document_by_file_deprecated")
@service_api_ns.doc(deprecated=True)
@service_api_ns.doc(
description=(
"Deprecated legacy alias for updating an existing document by uploading a file. "
"Use PATCH /datasets/{dataset_id}/documents/{document_id} instead."
)
)
@service_api_ns.doc("update_document_by_file")
@service_api_ns.doc(description="Update an existing document by uploading a file")
@service_api_ns.doc(params={"dataset_id": "Dataset ID", "document_id": "Document ID"})
@service_api_ns.doc(
responses={
@ -570,9 +419,82 @@ class DeprecatedDocumentUpdateByFileApi(DatasetApiResource):
)
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id: str, dataset_id: UUID, document_id: UUID):
"""Update document by file through the deprecated file-update aliases."""
return _update_document_by_file(tenant_id=tenant_id, dataset_id=dataset_id, document_id=document_id)
def post(self, tenant_id, dataset_id, document_id):
"""Update document by upload file."""
dataset = db.session.scalar(
select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id).limit(1)
)
if not dataset:
raise ValueError("Dataset does not exist.")
if dataset.provider == "external":
raise ValueError("External datasets are not supported.")
args = {}
if "data" in request.form:
args = json.loads(request.form["data"])
if "doc_form" not in args:
args["doc_form"] = dataset.chunk_structure or "text_model"
if "doc_language" not in args:
args["doc_language"] = "English"
# get dataset info
dataset_id = str(dataset_id)
tenant_id = str(tenant_id)
# indexing_technique is already set in dataset since this is an update
args["indexing_technique"] = dataset.indexing_technique
if "file" in request.files:
# save file info
file = request.files["file"]
if len(request.files) > 1:
raise TooManyFilesError()
if not file.filename:
raise FilenameNotExistsError
if not current_user:
raise ValueError("current_user is required")
try:
upload_file = FileService(db.engine).upload_file(
filename=file.filename,
content=file.read(),
mimetype=file.mimetype,
user=current_user,
source="datasets",
)
except services.errors.file.FileTooLargeError as file_too_large_error:
raise FileTooLargeError(file_too_large_error.description)
except services.errors.file.UnsupportedFileTypeError:
raise UnsupportedFileTypeError()
data_source = {
"type": "upload_file",
"info_list": {"data_source_type": "upload_file", "file_info_list": {"file_ids": [upload_file.id]}},
}
args["data_source"] = data_source
# validate args
args["original_document_id"] = str(document_id)
knowledge_config = KnowledgeConfig.model_validate(args)
DocumentService.document_create_args_validate(knowledge_config)
try:
documents, _ = DocumentService.save_document_with_dataset_id(
dataset=dataset,
knowledge_config=knowledge_config,
account=dataset.created_by_account,
dataset_process_rule=dataset.latest_process_rule if "process_rule" not in args else None,
created_from="api",
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
document = documents[0]
documents_and_batch_fields = {"document": marshal(document, document_fields), "batch": document.batch}
return documents_and_batch_fields, 200
@service_api_ns.route("/datasets/<uuid:dataset_id>/documents")
@ -886,22 +808,6 @@ class DocumentApi(DatasetApiResource):
return response
@service_api_ns.doc("update_document_by_file")
@service_api_ns.doc(description="Update an existing document by uploading a file")
@service_api_ns.doc(params={"dataset_id": "Dataset ID", "document_id": "Document ID"})
@service_api_ns.doc(
responses={
200: "Document updated successfully",
401: "Unauthorized - invalid API token",
404: "Document not found",
}
)
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def patch(self, tenant_id: str, dataset_id: UUID, document_id: UUID):
"""Update document by file on the canonical document resource."""
return _update_document_by_file(tenant_id=tenant_id, dataset_id=dataset_id, document_id=document_id)
@service_api_ns.doc("delete_document")
@service_api_ns.doc(description="Delete a document")
@service_api_ns.doc(params={"dataset_id": "Dataset ID", "document_id": "Document ID"})

View File

@ -9,11 +9,11 @@ from typing import Any, NotRequired, TypedDict
from flask import Response, request
from flask_restx import Resource
from pydantic import BaseModel
from sqlalchemy import select
from werkzeug.exceptions import Forbidden
from configs import dify_config
from controllers.common.human_input import HumanInputFormSubmitPayload
from controllers.web import web_ns
from controllers.web.error import NotFoundError, WebFormRateLimitExceededError
from controllers.web.site import serialize_app_site_payload
@ -26,6 +26,11 @@ from services.human_input_service import Form, FormNotFoundError, HumanInputServ
logger = logging.getLogger(__name__)
class HumanInputFormSubmitPayload(BaseModel):
inputs: dict
action: str
_FORM_SUBMIT_RATE_LIMITER = RateLimiter(
prefix="web_form_submit_rate_limit",
max_attempts=dify_config.WEB_FORM_SUBMIT_RATE_LIMIT_MAX_ATTEMPTS,

View File

@ -1,7 +1,5 @@
from typing import Any
CUSTOM_FOLLOW_UP_PROMPT_MAX_LENGTH = 1000
class SuggestedQuestionsAfterAnswerConfigManager:
@classmethod
@ -22,11 +20,7 @@ class SuggestedQuestionsAfterAnswerConfigManager:
@classmethod
def validate_and_set_defaults(cls, config: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
"""
Validate and set defaults for suggested questions feature.
Optional fields:
- prompt: custom instruction prompt.
- model: provider/model configuration for suggested question generation.
Validate and set defaults for suggested questions feature
:param config: app model config args
"""
@ -45,27 +39,4 @@ class SuggestedQuestionsAfterAnswerConfigManager:
if not isinstance(config["suggested_questions_after_answer"]["enabled"], bool):
raise ValueError("enabled in suggested_questions_after_answer must be of boolean type")
prompt = config["suggested_questions_after_answer"].get("prompt")
if prompt is not None and not isinstance(prompt, str):
raise ValueError("prompt in suggested_questions_after_answer must be of string type")
if isinstance(prompt, str) and len(prompt) > CUSTOM_FOLLOW_UP_PROMPT_MAX_LENGTH:
raise ValueError(
f"prompt in suggested_questions_after_answer must be less than or equal to "
f"{CUSTOM_FOLLOW_UP_PROMPT_MAX_LENGTH} characters"
)
if "model" in config["suggested_questions_after_answer"]:
model_config = config["suggested_questions_after_answer"]["model"]
if not isinstance(model_config, dict):
raise ValueError("model in suggested_questions_after_answer must be of object type")
if "provider" not in model_config or not isinstance(model_config["provider"], str):
raise ValueError("provider in suggested_questions_after_answer.model must be of string type")
if "name" not in model_config or not isinstance(model_config["name"], str):
raise ValueError("name in suggested_questions_after_answer.model must be of string type")
if "completion_params" in model_config and not isinstance(model_config["completion_params"], dict):
raise ValueError("completion_params in suggested_questions_after_answer.model must be of object type")
return config, ["suggested_questions_after_answer"]

View File

@ -35,8 +35,8 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.app.entities.task_entities import (
AdvancedChatPausedBlockingResponse,
ChatbotAppBlockingResponse,
ChatbotAppPausedBlockingResponse,
ChatbotAppStreamResponse,
)
from core.app.layers.pause_state_persist_layer import PauseStateLayerConfig, PauseStatePersistenceLayer
@ -660,9 +660,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
) -> (
ChatbotAppBlockingResponse
| AdvancedChatPausedBlockingResponse
| Generator[ChatbotAppStreamResponse, None, None]
ChatbotAppBlockingResponse | ChatbotAppPausedBlockingResponse | Generator[ChatbotAppStreamResponse, None, None]
):
"""
Handle response.

View File

@ -175,7 +175,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# Create a variable pool.
# init variable pool
variable_pool = VariablePool.from_bootstrap()
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(

View File

@ -3,35 +3,34 @@ from typing import Any, cast
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
from core.app.entities.task_entities import (
AdvancedChatPausedBlockingResponse,
AppStreamResponse,
ChatbotAppBlockingResponse,
ChatbotAppPausedBlockingResponse,
ChatbotAppStreamResponse,
ErrorStreamResponse,
MessageEndStreamResponse,
NodeFinishStreamResponse,
NodeStartStreamResponse,
PingStreamResponse,
StreamEvent,
)
class AdvancedChatAppGenerateResponseConverter(
AppGenerateResponseConverter[ChatbotAppBlockingResponse | AdvancedChatPausedBlockingResponse]
AppGenerateResponseConverter[ChatbotAppBlockingResponse | ChatbotAppPausedBlockingResponse]
):
@classmethod
def convert_blocking_full_response(
cls, blocking_response: ChatbotAppBlockingResponse | AdvancedChatPausedBlockingResponse
cls, blocking_response: ChatbotAppBlockingResponse | ChatbotAppPausedBlockingResponse
) -> dict[str, Any]:
"""
Convert blocking full response.
:param blocking_response: blocking response
:return:
"""
if isinstance(blocking_response, AdvancedChatPausedBlockingResponse):
if isinstance(blocking_response, ChatbotAppPausedBlockingResponse):
paused_data = blocking_response.data.model_dump(mode="json")
return {
"event": StreamEvent.WORKFLOW_PAUSED.value,
"event": "workflow_paused",
"task_id": blocking_response.task_id,
"id": blocking_response.data.id,
"message_id": blocking_response.data.message_id,
@ -45,7 +44,7 @@ class AdvancedChatAppGenerateResponseConverter(
}
response = {
"event": StreamEvent.MESSAGE.value,
"event": "message",
"task_id": blocking_response.task_id,
"id": blocking_response.data.id,
"message_id": blocking_response.data.message_id,
@ -60,7 +59,7 @@ class AdvancedChatAppGenerateResponseConverter(
@classmethod
def convert_blocking_simple_response(
cls, blocking_response: ChatbotAppBlockingResponse | AdvancedChatPausedBlockingResponse
cls, blocking_response: ChatbotAppBlockingResponse | ChatbotAppPausedBlockingResponse
) -> dict[str, Any]:
"""
Convert blocking simple response.

View File

@ -53,11 +53,10 @@ from core.app.entities.queue_entities import (
WorkflowQueueMessage,
)
from core.app.entities.task_entities import (
AdvancedChatPausedBlockingResponse,
ChatbotAppBlockingResponse,
ChatbotAppPausedBlockingResponse,
ChatbotAppStreamResponse,
ErrorStreamResponse,
HumanInputRequiredPauseReasonPayload,
HumanInputRequiredResponse,
MessageAudioEndStreamResponse,
MessageAudioStreamResponse,
@ -75,7 +74,7 @@ from core.repositories.human_input_repository import HumanInputFormRepositoryImp
from core.workflow.file_reference import resolve_file_record_id
from core.workflow.system_variables import build_system_variables
from extensions.ext_database import db
from graphon.entities.pause_reason import HumanInputRequired
from graphon.entities.pause_reason import HumanInputRequired, PauseReasonType
from graphon.enums import WorkflowExecutionStatus
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.model_runtime.utils.encoders import jsonable_encoder
@ -218,7 +217,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self,
) -> Union[
ChatbotAppBlockingResponse,
AdvancedChatPausedBlockingResponse,
ChatbotAppPausedBlockingResponse,
Generator[ChatbotAppStreamResponse, None, None],
]:
"""
@ -238,7 +237,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
def _to_blocking_response(
self, generator: Generator[StreamResponse, None, None]
) -> Union[ChatbotAppBlockingResponse, AdvancedChatPausedBlockingResponse]:
) -> Union[ChatbotAppBlockingResponse, ChatbotAppPausedBlockingResponse]:
"""
Process blocking response.
:return:
@ -250,9 +249,9 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
elif isinstance(stream_response, HumanInputRequiredResponse):
human_input_responses.append(stream_response)
elif isinstance(stream_response, WorkflowPauseStreamResponse):
return AdvancedChatPausedBlockingResponse(
return ChatbotAppPausedBlockingResponse(
task_id=stream_response.task_id,
data=AdvancedChatPausedBlockingResponse.Data(
data=ChatbotAppPausedBlockingResponse.Data(
id=self._message_id,
mode=self._conversation_mode,
conversation_id=self._conversation_id,
@ -296,17 +295,18 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
def _build_paused_blocking_response_from_human_input(
self, human_input_responses: list[HumanInputRequiredResponse]
) -> AdvancedChatPausedBlockingResponse:
) -> ChatbotAppPausedBlockingResponse:
runtime_state = self._resolve_graph_runtime_state()
paused_nodes = list(dict.fromkeys(response.data.node_id for response in human_input_responses))
reasons = [
HumanInputRequiredPauseReasonPayload.from_response_data(response.data).model_dump(mode="json")
for response in human_input_responses
]
reasons = []
for response in human_input_responses:
reason = response.data.model_dump(mode="json")
reason["type"] = PauseReasonType.HUMAN_INPUT_REQUIRED
reasons.append(reason)
return AdvancedChatPausedBlockingResponse(
return ChatbotAppPausedBlockingResponse(
task_id=self._application_generate_entity.task_id,
data=AdvancedChatPausedBlockingResponse.Data(
data=ChatbotAppPausedBlockingResponse.Data(
id=self._message_id,
mode=self._conversation_mode,
conversation_id=self._conversation_id,

View File

@ -1,8 +1,6 @@
from collections.abc import Generator
from typing import Any, cast
from pydantic import JsonValue
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
from core.app.entities.task_entities import (
AppStreamResponse,
@ -70,7 +68,7 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter[Chatbot
yield "ping"
continue
response_chunk: dict[str, JsonValue] = {
response_chunk = {
"event": sub_stream_response.event.value,
"conversation_id": chunk.conversation_id,
"message_id": chunk.message_id,
@ -101,7 +99,7 @@ class AgentChatAppGenerateResponseConverter(AppGenerateResponseConverter[Chatbot
yield "ping"
continue
response_chunk: dict[str, JsonValue] = {
response_chunk = {
"event": sub_stream_response.event.value,
"conversation_id": chunk.conversation_id,
"message_id": chunk.message_id,

View File

@ -3,8 +3,6 @@ from abc import ABC, abstractmethod
from collections.abc import Generator, Mapping
from typing import Any, Union, cast
from pydantic import JsonValue
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.task_entities import AppBlockingResponse, AppStreamResponse
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
@ -110,13 +108,13 @@ class AppGenerateResponseConverter[TBlockingResponse: AppBlockingResponse](ABC):
return metadata
@classmethod
def _error_to_stream_response(cls, e: Exception) -> dict[str, JsonValue]:
def _error_to_stream_response(cls, e: Exception) -> dict[str, Any]:
"""
Error to stream response.
:param e: exception
:return:
"""
error_responses: dict[type[Exception], dict[str, JsonValue]] = {
error_responses: dict[type[Exception], dict[str, Any]] = {
ValueError: {"code": "invalid_param", "status": 400},
ProviderTokenNotInitError: {"code": "provider_not_initialize", "status": 400},
QuotaExceededError: {
@ -130,7 +128,7 @@ class AppGenerateResponseConverter[TBlockingResponse: AppBlockingResponse](ABC):
}
# Determine the response based on the type of exception
data: dict[str, JsonValue] | None = None
data: dict[str, Any] | None = None
for k, v in error_responses.items():
if isinstance(e, k):
data = v

View File

@ -1,8 +1,6 @@
from collections.abc import Generator
from typing import Any, cast
from pydantic import JsonValue
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
from core.app.entities.task_entities import (
AppStreamResponse,
@ -70,7 +68,7 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter[ChatbotAppBl
yield "ping"
continue
response_chunk: dict[str, JsonValue] = {
response_chunk = {
"event": sub_stream_response.event.value,
"conversation_id": chunk.conversation_id,
"message_id": chunk.message_id,
@ -101,7 +99,7 @@ class ChatAppGenerateResponseConverter(AppGenerateResponseConverter[ChatbotAppBl
yield "ping"
continue
response_chunk: dict[str, JsonValue] = {
response_chunk = {
"event": sub_stream_response.event.value,
"conversation_id": chunk.conversation_id,
"message_id": chunk.message_id,

View File

@ -0,0 +1,17 @@
from collections.abc import Mapping
from typing import Any
from graphon.entities.pause_reason import PauseReason
def pause_reason_to_public_dict(reason: PauseReason | Mapping[str, Any]) -> dict[str, Any]:
if isinstance(reason, Mapping):
data = dict(reason)
else:
data = dict(reason.model_dump(mode="json"))
discriminator = data.pop("TYPE", None)
if discriminator is not None:
data["type"] = discriminator
return data

View File

@ -9,7 +9,9 @@ from typing import Any, NewType, TypedDict, Union
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.apps.common.pause_reason_serializer import pause_reason_to_public_dict
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom, WorkflowAppGenerateEntity
from core.workflow.human_input_policy import enrich_human_input_pause_reasons
from core.app.entities.queue_entities import (
QueueAgentLogEvent,
QueueHumanInputFormFilledEvent,
@ -52,7 +54,6 @@ from core.tools.tool_manager import ToolManager
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.trigger_manager import TriggerManager
from core.workflow.human_input_forms import load_form_tokens_by_form_id
from core.workflow.human_input_policy import HumanInputSurface, enrich_human_input_pause_reasons
from core.workflow.system_variables import SystemVariableKey, system_variables_to_mapping
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
@ -318,7 +319,7 @@ class WorkflowResponseConverter:
encoded_outputs = self._encode_outputs(event.outputs) or {}
if self._application_generate_entity.invoke_from == InvokeFrom.SERVICE_API:
encoded_outputs = {}
pause_reasons = [reason.model_dump(mode="json") for reason in event.reasons]
pause_reasons = [pause_reason_to_public_dict(reason) for reason in event.reasons]
human_input_form_ids = [reason.form_id for reason in event.reasons if isinstance(reason, HumanInputRequired)]
expiration_times_by_form_id: dict[str, datetime] = {}
display_in_ui_by_form_id: dict[str, bool] = {}
@ -337,15 +338,7 @@ class WorkflowResponseConverter:
except (TypeError, json.JSONDecodeError):
definition_payload = {}
display_in_ui_by_form_id[str(form_id)] = bool(definition_payload.get("display_in_ui"))
form_token_by_form_id = load_form_tokens_by_form_id(
human_input_form_ids,
session=session,
surface=(
HumanInputSurface.SERVICE_API
if self._application_generate_entity.invoke_from == InvokeFrom.SERVICE_API
else None
),
)
form_token_by_form_id = load_form_tokens_by_form_id(human_input_form_ids, session=session)
# Reconnect paths must preserve the same pause-reason contract as live streams;
# otherwise clients see schema drift after resume.

View File

@ -1,8 +1,6 @@
from collections.abc import Generator
from typing import Any, cast
from pydantic import JsonValue
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
from core.app.entities.task_entities import (
AppStreamResponse,
@ -22,7 +20,7 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter[Comple
:param blocking_response: blocking response
:return:
"""
response: dict[str, Any] = {
response = {
"event": "message",
"task_id": blocking_response.task_id,
"id": blocking_response.data.id,
@ -69,7 +67,7 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter[Comple
yield "ping"
continue
response_chunk: dict[str, JsonValue] = {
response_chunk = {
"event": sub_stream_response.event.value,
"message_id": chunk.message_id,
"created_at": chunk.created_at,
@ -99,7 +97,7 @@ class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter[Comple
yield "ping"
continue
response_chunk: dict[str, JsonValue] = {
response_chunk = {
"event": sub_stream_response.event.value,
"message_id": chunk.message_id,
"created_at": chunk.created_at,

View File

@ -144,7 +144,7 @@ class PipelineRunner(WorkflowBasedAppRunner):
)
)
variable_pool = VariablePool.from_bootstrap()
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(

View File

@ -59,7 +59,7 @@ def stream_topic_events(
def _normalize_terminal_events(terminal_events: Iterable[str | StreamEvent] | None) -> set[str]:
if terminal_events is None:
if not terminal_events:
return {StreamEvent.WORKFLOW_FINISHED.value, StreamEvent.WORKFLOW_PAUSED.value}
values: set[str] = set()
for item in terminal_events:

View File

@ -106,7 +106,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
)
variable_pool = VariablePool.from_bootstrap()
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(

View File

@ -63,7 +63,7 @@ class WorkflowAppGenerateResponseConverter(
if isinstance(sub_stream_response, ErrorStreamResponse):
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(data)
response_chunk.update(cast(dict[str, object], data))
else:
response_chunk.update(sub_stream_response.model_dump(mode="json"))
yield response_chunk
@ -92,9 +92,9 @@ class WorkflowAppGenerateResponseConverter(
if isinstance(sub_stream_response, ErrorStreamResponse):
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(data)
response_chunk.update(cast(dict[str, object], data))
elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse):
response_chunk.update(sub_stream_response.to_ignore_detail_dict())
response_chunk.update(cast(dict[str, object], sub_stream_response.to_ignore_detail_dict()))
else:
response_chunk.update(sub_stream_response.model_dump(mode="json"))
yield response_chunk

View File

@ -42,7 +42,6 @@ from core.app.entities.queue_entities import (
)
from core.app.entities.task_entities import (
ErrorStreamResponse,
HumanInputRequiredPauseReasonPayload,
HumanInputRequiredResponse,
MessageAudioEndStreamResponse,
MessageAudioStreamResponse,
@ -200,11 +199,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
) -> WorkflowAppPausedBlockingResponse:
runtime_state = self._resolve_graph_runtime_state()
paused_nodes = list(dict.fromkeys(response.data.node_id for response in human_input_responses))
created_at = int(runtime_state.start_at)
reasons = [
HumanInputRequiredPauseReasonPayload.from_response_data(response.data).model_dump(mode="json")
for response in human_input_responses
]
# Graph runtime `start_at` is a perf-counter value, not an epoch timestamp, so
# fallback API payloads need a wall-clock source for `created_at`.
created_at = int(time.time())
reasons = []
for response in human_input_responses:
reason = response.data.model_dump(mode="json")
reason["type"] = "human_input_required"
reasons.append(reason)
return WorkflowAppPausedBlockingResponse(
task_id=self._application_generate_entity.task_id,

View File

@ -188,7 +188,7 @@ class WorkflowBasedAppRunner:
ValueError: If neither single_iteration_run nor single_loop_run is specified
"""
# Create initial runtime state with variable pool containing environment variables
variable_pool = VariablePool.from_bootstrap()
variable_pool = VariablePool()
add_variables_to_pool(
variable_pool,
build_bootstrap_variables(

View File

@ -1,13 +1,12 @@
from collections.abc import Mapping, Sequence
from enum import StrEnum
from typing import Any, Literal
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, JsonValue
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities import RetrievalSourceMetadata
from graphon.entities import WorkflowStartReason
from graphon.entities.pause_reason import PauseReasonType
from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMUsage
from graphon.nodes.human_input.entities import FormInput, UserAction
@ -296,40 +295,6 @@ class HumanInputRequiredResponse(StreamResponse):
data: Data
class HumanInputRequiredPauseReasonPayload(BaseModel):
"""
Public pause-reason payload used by blocking responses when only
``human_input_required`` events are available.
"""
TYPE: Literal[PauseReasonType.HUMAN_INPUT_REQUIRED] = PauseReasonType.HUMAN_INPUT_REQUIRED
form_id: str
node_id: str
node_title: str
form_content: str
inputs: Sequence[FormInput] = Field(default_factory=list)
actions: Sequence[UserAction] = Field(default_factory=list)
display_in_ui: bool = False
form_token: str | None = None
resolved_default_values: Mapping[str, Any] = Field(default_factory=dict)
expiration_time: int
@classmethod
def from_response_data(cls, data: HumanInputRequiredResponse.Data) -> "HumanInputRequiredPauseReasonPayload":
return cls(
form_id=data.form_id,
node_id=data.node_id,
node_title=data.node_title,
form_content=data.form_content,
inputs=data.inputs,
actions=data.actions,
display_in_ui=data.display_in_ui,
form_token=data.form_token,
resolved_default_values=data.resolved_default_values,
expiration_time=data.expiration_time,
)
class HumanInputFormFilledResponse(StreamResponse):
class Data(BaseModel):
"""
@ -390,7 +355,7 @@ class NodeStartStreamResponse(StreamResponse):
workflow_run_id: str
data: Data
def to_ignore_detail_dict(self) -> dict[str, JsonValue]:
def to_ignore_detail_dict(self):
return {
"event": self.event.value,
"task_id": self.task_id,
@ -447,7 +412,7 @@ class NodeFinishStreamResponse(StreamResponse):
workflow_run_id: str
data: Data
def to_ignore_detail_dict(self) -> dict[str, JsonValue]:
def to_ignore_detail_dict(self):
return {
"event": self.event.value,
"task_id": self.task_id,
@ -809,7 +774,7 @@ class ChatbotAppBlockingResponse(AppBlockingResponse):
data: Data
class AdvancedChatPausedBlockingResponse(AppBlockingResponse):
class ChatbotAppPausedBlockingResponse(AppBlockingResponse):
"""
ChatbotAppPausedBlockingResponse entity
"""
@ -828,7 +793,7 @@ class AdvancedChatPausedBlockingResponse(AppBlockingResponse):
metadata: Mapping[str, object] = Field(default_factory=dict)
created_at: int
paused_nodes: Sequence[str] = Field(default_factory=list)
reasons: Sequence[Mapping[str, Any]] = Field(default_factory=list[Mapping[str, Any]])
reasons: Sequence[Mapping[str, Any]] = Field(default_factory=list)
status: WorkflowExecutionStatus
elapsed_time: float
total_tokens: int

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Generator # Changed from Iterator
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
@ -32,7 +32,7 @@ def get_current_file_access_scope() -> FileAccessScope | None:
@contextmanager
def bind_file_access_scope(scope: FileAccessScope) -> Generator[None, None, None]: # Changed from Iterator[None]
def bind_file_access_scope(scope: FileAccessScope) -> Iterator[None]:
token = _current_file_access_scope.set(scope)
try:
yield

View File

@ -1,6 +1,5 @@
from __future__ import annotations
from copy import deepcopy
from typing import Any
from core.app.entities.app_invoke_entities import DifyRunContext, ModelConfigWithCredentialsEntity
@ -15,21 +14,8 @@ from graphon.nodes.llm.protocols import CredentialsProvider
class DifyCredentialsProvider:
"""Resolves and returns LLM credentials for a given provider and model.
Fetched credentials are stored in :attr:`credentials_cache` and reused for
subsequent ``fetch`` calls for the same ``(provider_name, model_name)``.
Because of that cache, a single instance can return stale credentials after
the tenant or provider configuration changes (e.g. API key rotation).
Do **not** keep one instance for the lifetime of a process or across
unrelated invocations. Create a new provider per request, workflow run, or
other bounded scope where up-to-date credentials matter.
"""
tenant_id: str
provider_manager: ProviderManager
credentials_cache: dict[tuple[str, str], dict[str, Any]]
def __init__(
self,
@ -44,12 +30,8 @@ class DifyCredentialsProvider:
user_id=run_context.user_id,
)
self.provider_manager = provider_manager
self.credentials_cache = {}
def fetch(self, provider_name: str, model_name: str) -> dict[str, Any]:
if (provider_name, model_name) in self.credentials_cache:
return deepcopy(self.credentials_cache[(provider_name, model_name)])
provider_configurations = self.provider_manager.get_configurations(self.tenant_id)
provider_configuration = provider_configurations.get(provider_name)
if not provider_configuration:
@ -64,7 +46,6 @@ class DifyCredentialsProvider:
if credentials is None:
raise ProviderTokenNotInitError(f"Model {model_name} credentials is not initialized.")
self.credentials_cache[(provider_name, model_name)] = deepcopy(credentials)
return credentials
@ -84,8 +65,7 @@ class DifyModelFactory:
provider_manager=create_plugin_provider_manager(
tenant_id=run_context.tenant_id,
user_id=run_context.user_id,
),
enable_credentials_cache=True,
)
)
self.model_manager = model_manager
@ -104,7 +84,7 @@ def build_dify_model_access(run_context: DifyRunContext) -> tuple[CredentialsPro
tenant_id=run_context.tenant_id,
user_id=run_context.user_id,
)
model_manager = ModelManager(provider_manager=provider_manager, enable_credentials_cache=True)
model_manager = ModelManager(provider_manager=provider_manager)
return (
DifyCredentialsProvider(run_context=run_context, provider_manager=provider_manager),

View File

@ -16,7 +16,6 @@ from graphon.graph_engine.entities.commands import AbortCommand, CommandType
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import GraphEngineEvent, GraphNodeEventBase, NodeRunSucceededEvent
from graphon.nodes.base.node import Node
from graphon.nodes.llm.runtime_protocols import PreparedLLMProtocol
if TYPE_CHECKING:
from graphon.nodes.llm.node import LLMNode
@ -117,8 +116,7 @@ class LLMQuotaLayer(GraphEngineLayer):
case BuiltinNodeTypes.PARAMETER_EXTRACTOR:
model_instance = cast("ParameterExtractorNode", node).model_instance
case BuiltinNodeTypes.QUESTION_CLASSIFIER:
typed_node: QuestionClassifierNode = cast("QuestionClassifierNode", node)
model_instance = cast(PreparedLLMProtocol, typed_node._model_instance)
model_instance = cast("QuestionClassifierNode", node).model_instance
case _:
return None
except AttributeError:

View File

@ -24,7 +24,6 @@ from core.entities.provider_entities import (
from core.helper import encrypter
from core.helper.model_provider_cache import ProviderCredentialsCache, ProviderCredentialsCacheType
from core.plugin.impl.model_runtime_factory import create_plugin_model_provider_factory
from graphon.model_runtime import ModelRuntime
from graphon.model_runtime.entities.model_entities import AIModelEntity, FetchFrom, ModelType
from graphon.model_runtime.entities.provider_entities import (
ConfigurateMethod,
@ -34,6 +33,7 @@ from graphon.model_runtime.entities.provider_entities import (
)
from graphon.model_runtime.model_providers.base.ai_model import AIModel
from graphon.model_runtime.model_providers.model_provider_factory import ModelProviderFactory
from graphon.model_runtime.runtime import ModelRuntime
from libs.datetime_utils import naive_utc_now
from models.engine import db
from models.enums import CredentialSourceType
@ -109,7 +109,7 @@ class ProviderConfiguration(BaseModel):
def get_model_provider_factory(self) -> ModelProviderFactory:
"""Return a provider factory that preserves any request-bound runtime."""
if self._bound_model_runtime is not None:
return ModelProviderFactory(runtime=self._bound_model_runtime)
return ModelProviderFactory(model_runtime=self._bound_model_runtime)
return create_plugin_model_provider_factory(tenant_id=self.tenant_id)
def get_current_credentials(self, model_type: ModelType, model: str) -> dict[str, Any] | None:
@ -1392,12 +1392,10 @@ class ProviderConfiguration(BaseModel):
:param model_type: model type
:return:
"""
from core.plugin.impl.model_runtime_factory import create_model_type_instance
model_provider_factory = self.get_model_provider_factory()
return create_model_type_instance(
factory=model_provider_factory, provider=self.provider.provider, model_type=model_type
)
# Get model instance of LLM
return model_provider_factory.get_model_type_instance(provider=self.provider.provider, model_type=model_type)
def get_model_schema(
self, model_type: ModelType, model: str, credentials: dict[str, Any] | None

View File

@ -1,41 +0,0 @@
"""
Helper module for Creators Platform integration.
Provides functionality to upload DSL files to the Creators Platform
and generate redirect URLs with OAuth authorization codes.
"""
import logging
from urllib.parse import urlencode
import httpx
from yarl import URL
from configs import dify_config
logger = logging.getLogger(__name__)
creators_platform_api_url = URL(str(dify_config.CREATORS_PLATFORM_API_URL))
def upload_dsl(dsl_file_bytes: bytes, filename: str = "template.yaml") -> str:
url = str(creators_platform_api_url / "api/v1/templates/anonymous-upload")
response = httpx.post(url, files={"file": (filename, dsl_file_bytes)}, timeout=30)
response.raise_for_status()
data = response.json()
claim_code = data.get("data", {}).get("claim_code")
if not claim_code:
raise ValueError("Creators Platform did not return a valid claim_code")
return claim_code
def get_redirect_url(user_account_id: str, claim_code: str) -> str:
base_url = str(dify_config.CREATORS_PLATFORM_API_URL).rstrip("/")
params: dict[str, str] = {"dsl_claim_code": claim_code}
client_id = str(dify_config.CREATORS_PLATFORM_OAUTH_CLIENT_ID or "")
if client_id:
from services.oauth_server import OAuthServerService
oauth_code = OAuthServerService.sign_oauth_authorization_code(client_id, user_account_id)
params["oauth_code"] = oauth_code
return f"{base_url}?{urlencode(params)}"

View File

@ -4,7 +4,7 @@ from typing import cast
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.entities import DEFAULT_PLUGIN_ID
from core.plugin.impl.model_runtime_factory import create_model_type_instance, create_plugin_model_provider_factory
from core.plugin.impl.model_runtime_factory import create_plugin_model_provider_factory
from extensions.ext_hosting_provider import hosting_configuration
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.errors.invoke import InvokeBadRequestError
@ -44,8 +44,8 @@ def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEnt
model_provider_factory = create_plugin_model_provider_factory(tenant_id=tenant_id)
# Get model instance of LLM
model_type_instance = create_model_type_instance(
factory=model_provider_factory, provider=openai_provider_name, model_type=ModelType.MODERATION
model_type_instance = model_provider_factory.get_model_type_instance(
provider=openai_provider_name, model_type=ModelType.MODERATION
)
model_type_instance = cast(ModerationModel, model_type_instance)
moderation_result = model_type_instance.invoke(

View File

@ -2,7 +2,7 @@ import json
import logging
import re
from collections.abc import Sequence
from typing import Any, NotRequired, Protocol, TypedDict, cast
from typing import Any, Protocol, TypedDict, cast
import json_repair
from sqlalchemy import select
@ -18,6 +18,8 @@ from core.llm_generator.prompts import (
LLM_MODIFY_CODE_SYSTEM,
LLM_MODIFY_PROMPT_SYSTEM,
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE,
SUGGESTED_QUESTIONS_MAX_TOKENS,
SUGGESTED_QUESTIONS_TEMPERATURE,
SYSTEM_STRUCTURED_OUTPUT_GENERATE,
WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE,
)
@ -39,36 +41,6 @@ from models.workflow import Workflow
logger = logging.getLogger(__name__)
class SuggestedQuestionsModelConfig(TypedDict):
provider: str
name: str
completion_params: NotRequired[dict[str, object]]
def _normalize_completion_params(completion_params: dict[str, object]) -> tuple[dict[str, object], list[str]]:
"""
Normalize raw completion params into invocation parameters and stop sequences.
This mirrors the app-model access path by separating ``stop`` from provider
parameters before invocation, then drops non-positive token limits because
some plugin-backed models reject ``0`` after mapping ``max_tokens`` to their
provider-specific output-token field.
"""
normalized_parameters = dict(completion_params)
stop_value = normalized_parameters.pop("stop", [])
if isinstance(stop_value, list) and all(isinstance(item, str) for item in stop_value):
stop = stop_value
else:
stop = []
for token_limit_key in ("max_tokens", "max_output_tokens"):
token_limit = normalized_parameters.get(token_limit_key)
if isinstance(token_limit, int | float) and token_limit <= 0:
normalized_parameters.pop(token_limit_key, None)
return normalized_parameters, stop
class WorkflowServiceInterface(Protocol):
def get_draft_workflow(self, app_model: App, workflow_id: str | None = None) -> Workflow | None:
pass
@ -151,15 +123,8 @@ class LLMGenerator:
return name
@classmethod
def generate_suggested_questions_after_answer(
cls,
tenant_id: str,
histories: str,
*,
instruction_prompt: str | None = None,
model_config: object | None = None,
) -> Sequence[str]:
output_parser = SuggestedQuestionsAfterAnswerOutputParser(instruction_prompt=instruction_prompt)
def generate_suggested_questions_after_answer(cls, tenant_id: str, histories: str) -> Sequence[str]:
output_parser = SuggestedQuestionsAfterAnswerOutputParser()
format_instructions = output_parser.get_format_instructions()
prompt_template = PromptTemplateParser(template="{{histories}}\n{{format_instructions}}\nquestions:\n")
@ -168,36 +133,10 @@ class LLMGenerator:
try:
model_manager = ModelManager.for_tenant(tenant_id=tenant_id)
configured_model = cast(dict[str, object], model_config) if isinstance(model_config, dict) else {}
provider = configured_model.get("provider")
model_name = configured_model.get("name")
use_configured_model = False
if isinstance(provider, str) and provider and isinstance(model_name, str) and model_name:
try:
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=provider,
model=model_name,
)
use_configured_model = True
except Exception:
logger.warning(
"Failed to use configured suggested-questions model %s/%s, fallback to default model",
provider,
model_name,
exc_info=True,
)
model_instance = model_manager.get_default_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
)
else:
model_instance = model_manager.get_default_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
)
model_instance = model_manager.get_default_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
)
except InvokeAuthorizationError:
return []
@ -206,29 +145,19 @@ class LLMGenerator:
questions: Sequence[str] = []
try:
configured_completion_params = configured_model.get("completion_params")
if use_configured_model and isinstance(configured_completion_params, dict):
model_parameters, stop = _normalize_completion_params(configured_completion_params)
elif use_configured_model:
model_parameters = {}
stop = []
else:
# Default-model generation keeps the built-in suggested-questions tuning.
model_parameters = {
"max_tokens": 2560,
"temperature": 0.0,
}
stop = []
response: LLMResult = model_instance.invoke_llm(
prompt_messages=list(prompt_messages),
model_parameters=model_parameters,
stop=stop,
model_parameters={
"max_tokens": SUGGESTED_QUESTIONS_MAX_TOKENS,
"temperature": SUGGESTED_QUESTIONS_TEMPERATURE,
},
stream=False,
)
text_content = response.message.get_text_content()
questions = output_parser.parse(text_content) if text_content else []
except InvokeError:
questions = []
except Exception:
logger.exception("Failed to generate suggested questions after answer")
questions = []

View File

@ -3,28 +3,17 @@ import logging
import re
from collections.abc import Sequence
from core.llm_generator.prompts import DEFAULT_SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT
from core.llm_generator.prompts import SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT
logger = logging.getLogger(__name__)
class SuggestedQuestionsAfterAnswerOutputParser:
def __init__(self, instruction_prompt: str | None = None) -> None:
self._instruction_prompt = self._build_instruction_prompt(instruction_prompt)
@staticmethod
def _build_instruction_prompt(instruction_prompt: str | None) -> str:
if not instruction_prompt or not instruction_prompt.strip():
return DEFAULT_SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT
return f'{instruction_prompt}\nYou must output a JSON array like ["question1", "question2", "question3"].'
def get_format_instructions(self) -> str:
return self._instruction_prompt
return SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT
def parse(self, text: str) -> Sequence[str]:
stripped_text = text.strip()
action_match = re.search(r"\[.*?\]", stripped_text, re.DOTALL)
action_match = re.search(r"\[.*?\]", text.strip(), re.DOTALL)
questions: list[str] = []
if action_match is not None:
try:
@ -34,6 +23,4 @@ class SuggestedQuestionsAfterAnswerOutputParser:
else:
if isinstance(json_obj, list):
questions = [question for question in json_obj if isinstance(question, str)]
elif stripped_text:
logger.warning("Failed to find suggested questions payload array in text: %r", stripped_text[:200])
return questions

View File

@ -1,4 +1,5 @@
# Written by YORKI MINAKO🤡, Edited by Xiaoyi, Edited by yasu-oh
import os
CONVERSATION_TITLE_PROMPT = """You are asked to generate a concise chat title by decomposing the users input into two parts: “Intention” and “Subject”.
@ -95,8 +96,8 @@ JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE = (
)
# Default prompt and model parameters for suggested questions.
DEFAULT_SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT = (
# Default prompt for suggested questions (can be overridden by environment variable)
_DEFAULT_SUGGESTED_QUESTIONS_AFTER_ANSWER_PROMPT = (
"Please help me predict the three most likely questions that human would ask, "
"and keep each question under 20 characters.\n"
"MAKE SURE your output is the SAME language as the Assistant's latest response. "
@ -104,6 +105,15 @@ DEFAULT_SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT = (
'["question1","question2","question3"]\n'
)
# Environment variable override for suggested questions prompt
SUGGESTED_QUESTIONS_AFTER_ANSWER_INSTRUCTION_PROMPT = os.getenv(
"SUGGESTED_QUESTIONS_PROMPT", _DEFAULT_SUGGESTED_QUESTIONS_AFTER_ANSWER_PROMPT
)
# Configurable LLM parameters for suggested questions (can be overridden by environment variables)
SUGGESTED_QUESTIONS_MAX_TOKENS = int(os.getenv("SUGGESTED_QUESTIONS_MAX_TOKENS", "256"))
SUGGESTED_QUESTIONS_TEMPERATURE = float(os.getenv("SUGGESTED_QUESTIONS_TEMPERATURE", "0"))
GENERATOR_QA_PROMPT = (
"<Task> The user will send a long text. Generate a Question and Answer pairs only using the knowledge"
" in the long text. Please think step by step."

View File

@ -1,6 +1,5 @@
import logging
from collections.abc import Callable, Generator, Iterable, Mapping, Sequence
from copy import deepcopy
from typing import IO, Any, Literal, Optional, ParamSpec, TypeVar, Union, cast, overload
from configs import dify_config
@ -37,13 +36,11 @@ class ModelInstance:
Model instance class.
"""
def __init__(self, provider_model_bundle: ProviderModelBundle, model: str, credentials: dict | None = None) -> None:
def __init__(self, provider_model_bundle: ProviderModelBundle, model: str):
self.provider_model_bundle = provider_model_bundle
self.model_name = model
self.provider = provider_model_bundle.configuration.provider.provider
if credentials is None:
credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
self.credentials = credentials
self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
# Runtime LLM invocation fields.
self.parameters: Mapping[str, Any] = {}
self.stop: Sequence[str] = ()
@ -437,30 +434,8 @@ class ModelInstance:
class ModelManager:
"""Resolves :class:`ModelInstance` objects for a tenant and provider.
When ``enable_credentials_cache`` is ``True``, resolved credentials for each
``(tenant_id, provider, model_type, model)`` are stored in
``_credentials_cache`` and reused. That can return **stale** credentials after
API keys or provider settings change, so a manager constructed with
``enable_credentials_cache=True`` should not be kept for the lifetime of a
process or shared across unrelated work. Prefer a new manager per request,
workflow run, or similar bounded scope.
The default is ``enable_credentials_cache=False``; in that mode the internal
credential cache is not populated, and each ``get_model_instance`` call
loads credentials from the current provider configuration.
"""
def __init__(
self,
provider_manager: ProviderManager,
*,
enable_credentials_cache: bool = False,
) -> None:
def __init__(self, provider_manager: ProviderManager):
self._provider_manager = provider_manager
self._credentials_cache: dict[tuple[str, str, str, str], Any] = {}
self._enable_credentials_cache = enable_credentials_cache
@classmethod
def for_tenant(cls, tenant_id: str, user_id: str | None = None) -> "ModelManager":
@ -488,19 +463,8 @@ class ModelManager:
tenant_id=tenant_id, provider=provider, model_type=model_type
)
cred_cache_key = (tenant_id, provider, model_type.value, model)
if cred_cache_key in self._credentials_cache:
return ModelInstance(
provider_model_bundle,
model,
deepcopy(self._credentials_cache[cred_cache_key]),
)
ret = ModelInstance(provider_model_bundle, model)
if self._enable_credentials_cache:
self._credentials_cache[cred_cache_key] = deepcopy(ret.credentials)
return ret
model_instance = ModelInstance(provider_model_bundle, model)
return model_instance
def get_default_provider_model_name(self, tenant_id: str, model_type: ModelType) -> tuple[str | None, str | None]:
"""

View File

@ -4,7 +4,7 @@ import hashlib
import logging
from collections.abc import Generator, Iterable, Sequence
from threading import Lock
from typing import IO, Any, Literal, Union, overload
from typing import IO, Any, Union
from pydantic import ValidationError
from redis import RedisError
@ -14,18 +14,13 @@ from core.plugin.entities.plugin_daemon import PluginModelProviderEntity
from core.plugin.impl.asset import PluginAssetManager
from core.plugin.impl.model import PluginModelClient
from extensions.ext_redis import redis_client
from graphon.model_runtime import ModelRuntime
from graphon.model_runtime.entities.llm_entities import (
LLMResult,
LLMResultChunk,
LLMResultChunkWithStructuredOutput,
LLMResultWithStructuredOutput,
)
from graphon.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from graphon.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
from graphon.model_runtime.entities.model_entities import AIModelEntity, ModelType
from graphon.model_runtime.entities.provider_entities import ProviderEntity
from graphon.model_runtime.entities.rerank_entities import MultimodalRerankInput, RerankResult
from graphon.model_runtime.entities.text_embedding_entities import EmbeddingInputType, EmbeddingResult
from graphon.model_runtime.runtime import ModelRuntime
from models.provider_ids import ModelProviderID
logger = logging.getLogger(__name__)
@ -200,34 +195,6 @@ class PluginModelRuntime(ModelRuntime):
return schema
@overload
def invoke_llm(
self,
*,
provider: str,
model: str,
credentials: dict[str, Any],
model_parameters: dict[str, Any],
prompt_messages: Sequence[PromptMessage],
tools: list[PromptMessageTool] | None,
stop: Sequence[str] | None,
stream: Literal[False],
) -> LLMResult: ...
@overload
def invoke_llm(
self,
*,
provider: str,
model: str,
credentials: dict[str, Any],
model_parameters: dict[str, Any],
prompt_messages: Sequence[PromptMessage],
tools: list[PromptMessageTool] | None,
stop: Sequence[str] | None,
stream: Literal[True],
) -> Generator[LLMResultChunk, None, None]: ...
def invoke_llm(
self,
*,
@ -255,50 +222,6 @@ class PluginModelRuntime(ModelRuntime):
stream=stream,
)
@overload
def invoke_llm_with_structured_output(
self,
*,
provider: str,
model: str,
credentials: dict[str, Any],
json_schema: dict[str, Any],
model_parameters: dict[str, Any],
prompt_messages: Sequence[PromptMessage],
stop: Sequence[str] | None,
stream: Literal[False],
) -> LLMResultWithStructuredOutput: ...
@overload
def invoke_llm_with_structured_output(
self,
*,
provider: str,
model: str,
credentials: dict[str, Any],
json_schema: dict[str, Any],
model_parameters: dict[str, Any],
prompt_messages: Sequence[PromptMessage],
stop: Sequence[str] | None,
stream: Literal[True],
) -> Generator[LLMResultChunkWithStructuredOutput, None, None]: ...
def invoke_llm_with_structured_output(
self,
*,
provider: str,
model: str,
credentials: dict[str, Any],
json_schema: dict[str, Any],
model_parameters: dict[str, Any],
prompt_messages: Sequence[PromptMessage],
stop: Sequence[str] | None,
stream: bool,
) -> LLMResultWithStructuredOutput | Generator[LLMResultChunkWithStructuredOutput, None, None]:
# TODO: added to pass type check.
# it is a new method from upstream that is not invoked at all.
raise NotImplementedError
def get_llm_num_tokens(
self,
*,

View File

@ -3,14 +3,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from core.plugin.impl.model import PluginModelClient
from graphon.model_runtime.entities.model_entities import ModelType
from graphon.model_runtime.model_providers.base.ai_model import AIModel
from graphon.model_runtime.model_providers.base.large_language_model import LargeLanguageModel
from graphon.model_runtime.model_providers.base.moderation_model import ModerationModel
from graphon.model_runtime.model_providers.base.rerank_model import RerankModel
from graphon.model_runtime.model_providers.base.speech2text_model import Speech2TextModel
from graphon.model_runtime.model_providers.base.text_embedding_model import TextEmbeddingModel
from graphon.model_runtime.model_providers.base.tts_model import TTSModel
from graphon.model_runtime.model_providers.model_provider_factory import ModelProviderFactory
if TYPE_CHECKING:
@ -18,15 +10,6 @@ if TYPE_CHECKING:
from core.plugin.impl.model_runtime import PluginModelRuntime
from core.provider_manager import ProviderManager
_MODEL_TYPE_CLASS_MAP: dict[ModelType, type[AIModel]] = {
ModelType.LLM: LargeLanguageModel,
ModelType.TEXT_EMBEDDING: TextEmbeddingModel,
ModelType.RERANK: RerankModel,
ModelType.SPEECH2TEXT: Speech2TextModel,
ModelType.MODERATION: ModerationModel,
ModelType.TTS: TTSModel,
}
class PluginModelAssembly:
"""Compose request-scoped model views on top of a single plugin runtime."""
@ -55,7 +38,7 @@ class PluginModelAssembly:
@property
def model_provider_factory(self) -> ModelProviderFactory:
if self._model_provider_factory is None:
self._model_provider_factory = ModelProviderFactory(runtime=self.model_runtime)
self._model_provider_factory = ModelProviderFactory(model_runtime=self.model_runtime)
return self._model_provider_factory
@property
@ -104,30 +87,3 @@ def create_plugin_provider_manager(*, tenant_id: str, user_id: str | None = None
def create_plugin_model_manager(*, tenant_id: str, user_id: str | None = None) -> ModelManager:
"""Create a tenant-bound model manager for service flows."""
return create_plugin_model_assembly(tenant_id=tenant_id, user_id=user_id).model_manager
def create_model_type_instance(
factory: ModelProviderFactory,
provider: str,
model_type: ModelType,
) -> AIModel:
"""Instantiate the AIModel subclass for *model_type* backed by *factory*'s runtime.
This replaces ``ModelProviderFactory.get_model_type_instance`` which was
removed in graphon 0.3.0. The mapping from ModelType to concrete AIModel
subclass is maintained here so that callers do not need to know the
subclass constructors.
:param factory: factory whose ``runtime`` and provider resolution are used.
:param provider: provider identifier (canonical or short name).
:param model_type: the model type to instantiate.
:returns: an AIModel subclass instance wired to the factory's runtime.
:raises ValueError: if *model_type* is not supported.
"""
model_class = _MODEL_TYPE_CLASS_MAP.get(model_type)
if model_class is None:
msg = f"Unsupported model type: {model_type}"
raise ValueError(msg)
provider_entity = factory.get_model_provider(provider)
return model_class(provider_schema=provider_entity, model_runtime=factory.runtime)

View File

@ -151,12 +151,6 @@ def deserialize_response(raw_data: bytes) -> Response:
response = Response(response=body, status=status_code)
# Replace Flask's default headers (e.g. Content-Type, Content-Length) with the
# parsed ones so we faithfully reproduce the original response. Use Headers.add
# rather than dict-style assignment so that repeated headers such as Set-Cookie
# (and any other multi-valued header per RFC 9110) are preserved instead of
# being overwritten.
response.headers.clear()
for line in lines[1:]:
if not line:
continue
@ -164,6 +158,6 @@ def deserialize_response(raw_data: bytes) -> Response:
if ":" not in line_str:
continue
name, value = line_str.split(":", 1)
response.headers.add(name, value.strip())
response.headers[name] = value.strip()
return response

View File

@ -56,7 +56,7 @@ from models.provider_ids import ModelProviderID
from services.feature_service import FeatureService
if TYPE_CHECKING:
from graphon.model_runtime import ModelRuntime
from graphon.model_runtime.runtime import ModelRuntime
_credentials_adapter: TypeAdapter[dict[str, Any]] = TypeAdapter(dict[str, Any])
@ -70,32 +70,12 @@ class ProviderManager:
Request-bound managers may carry caller identity in that runtime, and the
resulting ``ProviderConfiguration`` objects must reuse it for downstream
model-type and schema lookups.
Configuration assembly is cached per manager instance so call chains that
share one request-scoped manager can reuse the same provider graph instead
of rebuilding it for every lookup. Call ``clear_configurations_cache()``
when a long-lived manager needs to observe writes performed within the same
instance scope.
"""
decoding_rsa_key: Any | None
decoding_cipher_rsa: Any | None
_model_runtime: ModelRuntime
_configurations_cache: dict[str, ProviderConfigurations]
def __init__(self, model_runtime: ModelRuntime):
self.decoding_rsa_key = None
self.decoding_cipher_rsa = None
self._model_runtime = model_runtime
self._configurations_cache = {}
def clear_configurations_cache(self, tenant_id: str | None = None) -> None:
"""Drop assembled provider configurations cached on this manager instance."""
if tenant_id is None:
self._configurations_cache.clear()
return
self._configurations_cache.pop(tenant_id, None)
def get_configurations(self, tenant_id: str) -> ProviderConfigurations:
"""
@ -134,10 +114,6 @@ class ProviderManager:
:param tenant_id:
:return:
"""
cached_configurations = self._configurations_cache.get(tenant_id)
if cached_configurations is not None:
return cached_configurations
# Get all provider records of the workspace
provider_name_to_provider_records_dict = self._get_all_providers(tenant_id)
@ -165,7 +141,7 @@ class ProviderManager:
)
# Get all provider entities
model_provider_factory = ModelProviderFactory(runtime=self._model_runtime)
model_provider_factory = ModelProviderFactory(model_runtime=self._model_runtime)
provider_entities = model_provider_factory.get_providers()
# Get All preferred provider types of the workspace
@ -297,8 +273,6 @@ class ProviderManager:
provider_configurations[str(provider_id_entity)] = provider_configuration
self._configurations_cache[tenant_id] = provider_configurations
# Return the encapsulated object
return provider_configurations
@ -362,7 +336,7 @@ class ProviderManager:
if not default_model:
return None
model_provider_factory = ModelProviderFactory(runtime=self._model_runtime)
model_provider_factory = ModelProviderFactory(model_runtime=self._model_runtime)
provider_schema = model_provider_factory.get_provider_schema(provider=default_model.provider_name)
return DefaultModelEntity(

View File

@ -139,10 +139,8 @@ class Jieba(BaseKeyword):
"__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table},
}
dataset_keyword_table = self.dataset.dataset_keyword_table
keyword_data_source_type = dataset_keyword_table.data_source_type if dataset_keyword_table else "file"
keyword_data_source_type = dataset_keyword_table.data_source_type
if keyword_data_source_type == "database":
if dataset_keyword_table is None:
return
dataset_keyword_table.keyword_table = dumps_with_sets(keyword_table_dict)
db.session.commit()
else:
@ -156,8 +154,7 @@ class Jieba(BaseKeyword):
if dataset_keyword_table:
keyword_table_dict = dataset_keyword_table.keyword_table_dict
if keyword_table_dict:
data: Any = keyword_table_dict["__data__"]
return dict(data["table"])
return dict(keyword_table_dict["__data__"]["table"])
else:
keyword_data_source_type = dify_config.KEYWORD_DATA_SOURCE_TYPE
dataset_keyword_table = DatasetKeywordTable(

View File

@ -1,5 +1,4 @@
import re
from collections.abc import Callable
from operator import itemgetter
from typing import cast
@ -81,14 +80,12 @@ class JiebaKeywordTableHandler:
def extract_tags(self, sentence: str, top_k: int | None = 20, **kwargs):
# Basic frequency-based keyword extraction as a fallback when TF-IDF is unavailable.
top_k = cast(int | None, kwargs.pop("topK", top_k))
if top_k is None:
top_k = 20
top_k = kwargs.pop("topK", top_k)
cut = getattr(jieba, "cut", None)
if self._lcut:
tokens = self._lcut(sentence)
elif callable(cut):
tokens = list(cast(Callable[[str], list[str]], cut)(sentence))
tokens = list(cut(sentence))
else:
tokens = re.findall(r"\w+", sentence)
@ -109,9 +106,9 @@ class JiebaKeywordTableHandler:
"""Extract keywords with JIEBA tfidf."""
keywords = self._tfidf.extract_tags(
sentence=text,
topK=max_keywords_per_chunk or 10,
topK=max_keywords_per_chunk,
)
# jieba.analyse.extract_tags returns an untyped list when withFlag is False by default.
# jieba.analyse.extract_tags returns list[Any] when withFlag is False by default.
keywords = cast(list[str], keywords)
return set(self._expand_tokens_with_subtokens(set(keywords)))

View File

@ -158,7 +158,7 @@ class RetrievalService:
)
if futures:
for _ in concurrent.futures.as_completed(futures, timeout=3600):
for future in concurrent.futures.as_completed(futures, timeout=3600):
if exceptions:
for f in futures:
f.cancel()
@ -551,7 +551,6 @@ class RetrievalService:
child_index_nodes = session.execute(child_chunk_stmt).scalars().all()
for i in child_index_nodes:
assert i.index_node_id
segment_ids.append(i.segment_id)
if i.segment_id in child_chunk_map:
child_chunk_map[i.segment_id].append(i)

View File

@ -39,58 +39,6 @@ class AbstractVectorFactory(ABC):
return index_struct_dict
class _LazyEmbeddings(Embeddings):
"""Lazy proxy that defers materializing the real embedding model.
Constructing the real embeddings (via ``ModelManager.get_model_instance``)
transitively calls ``FeatureService.get_features`` → ``BillingService``
HTTP GETs (see ``provider_manager.py``). Cleanup paths
(``delete_by_ids`` / ``delete`` / ``text_exists``) do not need embeddings
at all, so deferring this until an ``embed_*`` method is actually invoked
keeps cleanup tasks resilient to transient billing-API failures and avoids
leaving stranded ``document_segments`` / ``child_chunks`` whenever billing
hiccups.
Existing callers that perform create / search operations are unaffected:
the first ``embed_*`` call materializes the underlying model and the
behavior is identical from that point on.
"""
def __init__(self, dataset: Dataset):
self._dataset = dataset
self._real: Embeddings | None = None
def _ensure(self) -> Embeddings:
if self._real is None:
model_manager = ModelManager.for_tenant(tenant_id=self._dataset.tenant_id)
embedding_model = model_manager.get_model_instance(
tenant_id=self._dataset.tenant_id,
provider=self._dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=self._dataset.embedding_model,
)
self._real = CacheEmbedding(embedding_model)
return self._real
def embed_documents(self, texts: list[str]) -> list[list[float]]:
return self._ensure().embed_documents(texts)
def embed_multimodal_documents(self, multimodel_documents: list[dict[str, Any]]) -> list[list[float]]:
return self._ensure().embed_multimodal_documents(multimodel_documents)
def embed_query(self, text: str) -> list[float]:
return self._ensure().embed_query(text)
def embed_multimodal_query(self, multimodel_document: dict[str, Any]) -> list[float]:
return self._ensure().embed_multimodal_query(multimodel_document)
async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
return await self._ensure().aembed_documents(texts)
async def aembed_query(self, text: str) -> list[float]:
return await self._ensure().aembed_query(text)
class Vector:
def __init__(self, dataset: Dataset, attributes: list | None = None):
if attributes is None:
@ -112,11 +60,7 @@ class Vector:
"original_chunk_id",
]
self._dataset = dataset
# Use a lazy proxy so cleanup paths (delete_by_ids / delete / text_exists)
# never transitively trigger billing API calls during ``Vector(dataset)``
# construction. The real embedding model is materialized only when an
# ``embed_*`` method is actually invoked (i.e. create / search paths).
self._embeddings: Embeddings = _LazyEmbeddings(dataset)
self._embeddings = self._get_embeddings()
self._attributes = attributes
self._vector_processor = self._init_vector()

View File

@ -11,7 +11,6 @@ from core.rag.models.document import AttachmentDocument, Document
from extensions.ext_database import db
from graphon.model_runtime.entities.model_entities import ModelType
from models.dataset import ChildChunk, Dataset, DocumentSegment, SegmentAttachmentBinding
from models.enums import SegmentType
class DatasetDocumentStore:
@ -128,7 +127,6 @@ class DatasetDocumentStore:
if save_child:
if doc.children:
for position, child in enumerate(doc.children, start=1):
assert self._document_id
child_segment = ChildChunk(
tenant_id=self._dataset.tenant_id,
dataset_id=self._dataset.id,
@ -139,7 +137,7 @@ class DatasetDocumentStore:
index_node_hash=child.metadata.get("doc_hash"),
content=child.page_content,
word_count=len(child.page_content),
type=SegmentType.AUTOMATIC,
type="automatic",
created_by=self._user_id,
)
db.session.add(child_segment)
@ -165,7 +163,6 @@ class DatasetDocumentStore:
)
# add new child chunks
for position, child in enumerate(doc.children, start=1):
assert self._document_id
child_segment = ChildChunk(
tenant_id=self._dataset.tenant_id,
dataset_id=self._dataset.id,
@ -176,7 +173,7 @@ class DatasetDocumentStore:
index_node_hash=child.metadata.get("doc_hash"),
content=child.page_content,
word_count=len(child.page_content),
type=SegmentType.AUTOMATIC,
type="automatic",
created_by=self._user_id,
)
db.session.add(child_segment)

View File

@ -94,7 +94,6 @@ class ExtractProcessor:
cls, extract_setting: ExtractSetting, is_automatic: bool = False, file_path: str | None = None
) -> list[Document]:
if extract_setting.datasource_type == DatasourceType.FILE:
upload_file = extract_setting.upload_file
with tempfile.TemporaryDirectory() as temp_dir:
upload_file = extract_setting.upload_file
if not file_path:
@ -105,7 +104,6 @@ class ExtractProcessor:
storage.download(upload_file.key, file_path)
input_file = Path(file_path)
file_extension = input_file.suffix.lower()
assert upload_file is not None, "upload_file is required"
etl_type = dify_config.ETL_TYPE
extractor: BaseExtractor | None = None
if etl_type == "Unstructured":

View File

@ -28,10 +28,10 @@ class FunctionCallMultiDatasetRouter:
SystemPromptMessage(content="You are a helpful AI assistant."),
UserPromptMessage(content=query),
]
result: LLMResult = model_instance.invoke_llm( # pyright: ignore[reportCallIssue, reportArgumentType]
result: LLMResult = model_instance.invoke_llm(
prompt_messages=prompt_messages,
tools=dataset_tools,
stream=False, # pyright: ignore[reportArgumentType]
stream=False,
model_parameters={"temperature": 0.2, "top_p": 0.3, "max_tokens": 1500},
)
usage = result.usage or LLMUsage.empty_usage()

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import codecs
import re
from collections.abc import Set as AbstractSet
from collections.abc import Collection
from typing import Any, Literal
from core.model_manager import ModelInstance
@ -21,8 +21,8 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
def from_encoder[T: EnhanceRecursiveCharacterTextSplitter](
cls: type[T],
embedding_model_instance: ModelInstance | None,
allowed_special: Literal["all"] | AbstractSet[str] = frozenset(),
disallowed_special: Literal["all"] | AbstractSet[str] = "all",
allowed_special: Literal["all"] | set[str] = set(),
disallowed_special: Literal["all"] | Collection[str] = "all",
**kwargs: Any,
) -> T:
def _token_encoder(texts: list[str]) -> list[int]:
@ -40,7 +40,6 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
return [len(text) for text in texts]
_ = _token_encoder # kept for future token-length wiring
return cls(length_function=_character_encoder, **kwargs)

View File

@ -4,8 +4,7 @@ import copy
import logging
import re
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sequence
from collections.abc import Set as AbstractSet
from collections.abc import Callable, Collection, Iterable, Sequence, Set
from dataclasses import dataclass
from typing import Any, Literal
@ -188,8 +187,8 @@ class TokenTextSplitter(TextSplitter):
self,
encoding_name: str = "gpt2",
model_name: str | None = None,
allowed_special: Literal["all"] | AbstractSet[str] = frozenset(),
disallowed_special: Literal["all"] | AbstractSet[str] = "all",
allowed_special: Literal["all"] | Set[str] = set(),
disallowed_special: Literal["all"] | Collection[str] = "all",
**kwargs: Any,
):
"""Create a new TextSplitter."""
@ -208,8 +207,8 @@ class TokenTextSplitter(TextSplitter):
else:
enc = tiktoken.get_encoding(encoding_name)
self._tokenizer = enc
self._allowed_special: Literal["all"] | AbstractSet[str] = allowed_special
self._disallowed_special: Literal["all"] | AbstractSet[str] = disallowed_special
self._allowed_special = allowed_special
self._disallowed_special = disallowed_special
def split_text(self, text: str) -> list[str]:
def _encode(_text: str) -> list[int]:

View File

@ -14,23 +14,23 @@ from configs import dify_config
logger = logging.getLogger(__name__)
class EncryptionError(Exception):
"""Encryption/decryption specific error"""
class OAuthEncryptionError(Exception):
"""OAuth encryption/decryption specific error"""
pass
class SystemEncrypter:
class SystemOAuthEncrypter:
"""
A simple parameters encrypter using AES-CBC encryption.
A simple OAuth parameters encrypter using AES-CBC encryption.
This class provides methods to encrypt and decrypt parameters
This class provides methods to encrypt and decrypt OAuth parameters
using AES-CBC mode with a key derived from the application's SECRET_KEY.
"""
def __init__(self, secret_key: str | None = None):
"""
Initialize the encrypter.
Initialize the OAuth encrypter.
Args:
secret_key: Optional secret key. If not provided, uses dify_config.SECRET_KEY
@ -43,19 +43,19 @@ class SystemEncrypter:
# Generate a fixed 256-bit key using SHA-256
self.key = hashlib.sha256(secret_key.encode()).digest()
def encrypt_params(self, params: Mapping[str, Any]) -> str:
def encrypt_oauth_params(self, oauth_params: Mapping[str, Any]) -> str:
"""
Encrypt parameters.
Encrypt OAuth parameters.
Args:
params: Parameters dictionary, e.g., {"client_id": "xxx", "client_secret": "xxx"}
oauth_params: OAuth parameters dictionary, e.g., {"client_id": "xxx", "client_secret": "xxx"}
Returns:
Base64-encoded encrypted string
Raises:
EncryptionError: If encryption fails
ValueError: If params is invalid
OAuthEncryptionError: If encryption fails
ValueError: If oauth_params is invalid
"""
try:
@ -66,7 +66,7 @@ class SystemEncrypter:
cipher = AES.new(self.key, AES.MODE_CBC, iv)
# Encrypt data
padded_data = pad(TypeAdapter(dict).dump_json(dict(params)), AES.block_size)
padded_data = pad(TypeAdapter(dict).dump_json(dict(oauth_params)), AES.block_size)
encrypted_data = cipher.encrypt(padded_data)
# Combine IV and encrypted data
@ -76,20 +76,20 @@ class SystemEncrypter:
return base64.b64encode(combined).decode()
except Exception as e:
raise EncryptionError(f"Encryption failed: {str(e)}") from e
raise OAuthEncryptionError(f"Encryption failed: {str(e)}") from e
def decrypt_params(self, encrypted_data: str) -> Mapping[str, Any]:
def decrypt_oauth_params(self, encrypted_data: str) -> Mapping[str, Any]:
"""
Decrypt parameters.
Decrypt OAuth parameters.
Args:
encrypted_data: Base64-encoded encrypted string
Returns:
Decrypted parameters dictionary
Decrypted OAuth parameters dictionary
Raises:
EncryptionError: If decryption fails
OAuthEncryptionError: If decryption fails
ValueError: If encrypted_data is invalid
"""
if not isinstance(encrypted_data, str):
@ -118,70 +118,70 @@ class SystemEncrypter:
unpadded_data = unpad(decrypted_data, AES.block_size)
# Parse JSON
params: Mapping[str, Any] = TypeAdapter(Mapping[str, Any]).validate_json(unpadded_data)
oauth_params: Mapping[str, Any] = TypeAdapter(Mapping[str, Any]).validate_json(unpadded_data)
if not isinstance(params, dict):
if not isinstance(oauth_params, dict):
raise ValueError("Decrypted data is not a valid dictionary")
return params
return oauth_params
except Exception as e:
raise EncryptionError(f"Decryption failed: {str(e)}") from e
raise OAuthEncryptionError(f"Decryption failed: {str(e)}") from e
# Factory function for creating encrypter instances
def create_system_encrypter(secret_key: str | None = None) -> SystemEncrypter:
def create_system_oauth_encrypter(secret_key: str | None = None) -> SystemOAuthEncrypter:
"""
Create an encrypter instance.
Create an OAuth encrypter instance.
Args:
secret_key: Optional secret key. If not provided, uses dify_config.SECRET_KEY
Returns:
SystemEncrypter instance
SystemOAuthEncrypter instance
"""
return SystemEncrypter(secret_key=secret_key)
return SystemOAuthEncrypter(secret_key=secret_key)
# Global encrypter instance (for backward compatibility)
_encrypter: SystemEncrypter | None = None
_oauth_encrypter: SystemOAuthEncrypter | None = None
def get_system_encrypter() -> SystemEncrypter:
def get_system_oauth_encrypter() -> SystemOAuthEncrypter:
"""
Get the global encrypter instance.
Get the global OAuth encrypter instance.
Returns:
SystemEncrypter instance
SystemOAuthEncrypter instance
"""
global _encrypter
if _encrypter is None:
_encrypter = SystemEncrypter()
return _encrypter
global _oauth_encrypter
if _oauth_encrypter is None:
_oauth_encrypter = SystemOAuthEncrypter()
return _oauth_encrypter
# Convenience functions for backward compatibility
def encrypt_system_params(params: Mapping[str, Any]) -> str:
def encrypt_system_oauth_params(oauth_params: Mapping[str, Any]) -> str:
"""
Encrypt parameters using the global encrypter.
Encrypt OAuth parameters using the global encrypter.
Args:
params: Parameters dictionary
oauth_params: OAuth parameters dictionary
Returns:
Base64-encoded encrypted string
"""
return get_system_encrypter().encrypt_params(params)
return get_system_oauth_encrypter().encrypt_oauth_params(oauth_params)
def decrypt_system_params(encrypted_data: str) -> Mapping[str, Any]:
def decrypt_system_oauth_params(encrypted_data: str) -> Mapping[str, Any]:
"""
Decrypt parameters using the global encrypter.
Decrypt OAuth parameters using the global encrypter.
Args:
encrypted_data: Base64-encoded encrypted string
Returns:
Decrypted parameters dictionary
Decrypted OAuth parameters dictionary
"""
return get_system_encrypter().decrypt_params(encrypted_data)
return get_system_oauth_encrypter().decrypt_oauth_params(encrypted_data)

View File

@ -105,7 +105,7 @@ class Article:
def extract_using_readabilipy(html: str):
json_article: dict[str, Any] = simple_json_from_html_string(html, use_readability=False)
json_article: dict[str, Any] = simple_json_from_html_string(html, use_readability=True)
article = Article(
title=json_article.get("title") or "",
author=json_article.get("byline") or "",

View File

@ -12,7 +12,7 @@ from collections.abc import Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.human_input_policy import HumanInputSurface, get_preferred_form_token
from core.workflow.human_input_policy import get_preferred_form_token
from extensions.ext_database import db
from models.human_input import HumanInputFormRecipient, RecipientType
@ -21,7 +21,6 @@ def load_form_tokens_by_form_id(
form_ids: Sequence[str],
*,
session: Session | None = None,
surface: HumanInputSurface | None = None,
) -> dict[str, str]:
"""Load the preferred access token for each human input form."""
unique_form_ids = list(dict.fromkeys(form_ids))
@ -29,43 +28,23 @@ def load_form_tokens_by_form_id(
return {}
if session is not None:
return _load_form_tokens_by_form_id(session, unique_form_ids, surface=surface)
return _load_form_tokens_by_form_id(session, unique_form_ids)
with Session(bind=db.engine, expire_on_commit=False) as new_session:
return _load_form_tokens_by_form_id(new_session, unique_form_ids, surface=surface)
return _load_form_tokens_by_form_id(new_session, unique_form_ids)
def _load_form_tokens_by_form_id(
session: Session,
form_ids: Sequence[str],
*,
surface: HumanInputSurface | None = None,
) -> dict[str, str]:
def _load_form_tokens_by_form_id(session: Session, form_ids: Sequence[str]) -> dict[str, str]:
recipients_by_form_id: dict[str, list[tuple[RecipientType, str]]] = {}
stmt = select(HumanInputFormRecipient).where(HumanInputFormRecipient.form_id.in_(form_ids))
for recipient in session.scalars(stmt):
if not recipient.access_token:
continue
recipients_by_form_id.setdefault(recipient.form_id, []).append(
(recipient.recipient_type, recipient.access_token)
)
recipients_by_form_id.setdefault(recipient.form_id, []).append((recipient.recipient_type, recipient.access_token))
tokens_by_form_id: dict[str, str] = {}
for form_id, recipients in recipients_by_form_id.items():
token = _get_surface_form_token(recipients, surface=surface)
token = get_preferred_form_token(recipients)
if token is not None:
tokens_by_form_id[form_id] = token
return tokens_by_form_id
def _get_surface_form_token(
recipients: Sequence[tuple[RecipientType, str]],
*,
surface: HumanInputSurface | None,
) -> str | None:
if surface == HumanInputSurface.SERVICE_API:
for recipient_type, token in recipients:
if recipient_type == RecipientType.STANDALONE_WEB_APP and token:
return token
return get_preferred_form_token(recipients)

View File

@ -4,7 +4,6 @@ from collections.abc import Mapping, Sequence
from enum import StrEnum
from typing import Any
from graphon.entities.pause_reason import PauseReasonType
from models.human_input import RecipientType
@ -62,7 +61,7 @@ def enrich_human_input_pause_reasons(
enriched: list[dict[str, Any]] = []
for reason in reasons:
updated = dict(reason)
if updated.get("TYPE") == PauseReasonType.HUMAN_INPUT_REQUIRED:
if updated.get("type") == "human_input_required":
form_id = updated.get("form_id")
if isinstance(form_id, str):
updated["form_token"] = form_tokens_by_form_id.get(form_id)

View File

@ -365,8 +365,7 @@ class DifyNodeFactory(NodeFactory):
(including pydantic ValidationError, which subclasses ValueError),
if node type is unknown, or if no implementation exists for the resolved version
"""
adapted_node_config = adapt_node_config_for_graph(node_config)
typed_node_config = NodeConfigDictAdapter.validate_python(adapted_node_config)
typed_node_config = NodeConfigDictAdapter.validate_python(adapt_node_config_for_graph(node_config))
node_id = typed_node_config["id"]
node_data = typed_node_config["data"]
node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version))
@ -374,11 +373,6 @@ class DifyNodeFactory(NodeFactory):
# Re-validate using the resolved node class so workflow-local node schemas
# stay explicit and constructors receive the concrete typed payload.
resolved_node_data = self._validate_resolved_node_data(node_class, node_data)
config_for_node_init: BaseNodeData | dict[str, Any]
if isinstance(resolved_node_data, BaseNodeData):
config_for_node_init = resolved_node_data.model_dump(mode="python", by_alias=True)
else:
config_for_node_init = resolved_node_data
node_type = node_data.type
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
BuiltinNodeTypes.CODE: lambda: {
@ -448,7 +442,7 @@ class DifyNodeFactory(NodeFactory):
node_init_kwargs = node_init_kwargs_factories.get(node_type, lambda: {})()
return node_class(
node_id=node_id,
data=config_for_node_init,
config=resolved_node_data,
graph_init_params=self.graph_init_params,
graph_runtime_state=self.graph_runtime_state,
**node_init_kwargs,
@ -480,7 +474,10 @@ class DifyNodeFactory(NodeFactory):
include_retriever_attachment_loader: bool,
include_jinja2_template_renderer: bool,
) -> dict[str, object]:
validated_node_data = cast(LLMCompatibleNodeData, node_data)
validated_node_data = cast(
LLMCompatibleNodeData,
self._validate_resolved_node_data(node_class=node_class, node_data=node_data),
)
model_instance = self._build_model_instance_for_llm_node(validated_node_data)
node_init_kwargs: dict[str, object] = {
"credentials_provider": self._llm_credentials_provider,

View File

@ -35,7 +35,7 @@ class AgentNode(Node[AgentNodeData]):
def __init__(
self,
node_id: str,
data: AgentNodeData,
config: AgentNodeData,
*,
graph_init_params: GraphInitParams,
graph_runtime_state: GraphRuntimeState,
@ -46,7 +46,7 @@ class AgentNode(Node[AgentNodeData]):
) -> None:
super().__init__(
node_id=node_id,
data=data,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

View File

@ -36,14 +36,14 @@ class DatasourceNode(Node[DatasourceNodeData]):
def __init__(
self,
node_id: str,
data: DatasourceNodeData,
config: DatasourceNodeData,
*,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
) -> None:
super().__init__(
node_id=node_id,
data=data,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

View File

@ -32,14 +32,14 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
def __init__(
self,
node_id: str,
data: KnowledgeIndexNodeData,
config: KnowledgeIndexNodeData,
*,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
) -> None:
super().__init__(
node_id=node_id,
data=data,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

View File

@ -71,14 +71,14 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
def __init__(
self,
node_id: str,
data: KnowledgeRetrievalNodeData,
config: KnowledgeRetrievalNodeData,
*,
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
) -> None:
super().__init__(
node_id=node_id,
data=data,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from collections import defaultdict
from collections.abc import Mapping, Sequence
from enum import StrEnum
from typing import Any, Protocol
from typing import Any, Protocol, cast
from uuid import uuid4
from graphon.enums import BuiltinNodeTypes
@ -82,10 +82,13 @@ def build_system_variables(values: Mapping[str, Any] | None = None, /, **kwargs:
normalized = _normalize_system_variable_values(values, **kwargs)
return [
segment_to_variable(
segment=build_segment(value),
selector=system_variable_selector(key),
name=key,
cast(
Variable,
segment_to_variable(
segment=build_segment(value),
selector=system_variable_selector(key),
name=key,
),
)
for key, value in normalized.items()
]
@ -127,10 +130,13 @@ def build_bootstrap_variables(
for node_id, value in rag_pipeline_variables_map.items():
variables.append(
segment_to_variable(
segment=build_segment(value),
selector=(RAG_PIPELINE_VARIABLE_NODE_ID, node_id),
name=node_id,
cast(
Variable,
segment_to_variable(
segment=build_segment(value),
selector=(RAG_PIPELINE_VARIABLE_NODE_ID, node_id),
name=node_id,
),
)
)

Some files were not shown because too many files have changed in this diff Show More