Compare commits


5 Commits

Author SHA1 Message Date
91f9ab7ad3 opt(api): optimize update contention on the providers table (#24520) 2025-08-27 20:52:03 +08:00
337ad75114 refactor: remove document download functionality and related translations 2025-08-22 21:31:11 +08:00
9704319e10 fix: use automatic transaction commit in provider update handler
The provider update handler was missing proper transaction handling,
causing quota deductions to be lost. This fix uses session.begin()
context manager for automatic commit/rollback, ensuring provider
quota updates are properly persisted.

Fixes #24356
2025-08-22 21:24:55 +08:00
7f7156b325 refactor: improve session management in ToolManager 2025-08-15 13:09:54 +08:00
715a7fc19f fix: update session management in BuiltinToolManageService to use no_autoflush 2025-08-15 11:24:58 +08:00
434 changed files with 3507 additions and 18278 deletions
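Commit 9704319e10 above relies on SQLAlchemy's `session.begin()` context manager. A minimal runnable sketch of that pattern, using an illustrative table rather than Dify's actual schema:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE providers (name TEXT PRIMARY KEY, quota_used INTEGER)"))
    conn.execute(text("INSERT INTO providers VALUES ('demo', 0)"))

# session.begin() turns the block into one atomic unit of work: it commits when
# the block exits cleanly and rolls back if an exception propagates, so a quota
# deduction cannot be lost to a forgotten explicit commit.
with Session(engine) as session:
    with session.begin():
        session.execute(text("UPDATE providers SET quota_used = quota_used + 1 WHERE name = 'demo'"))
```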

View File

@@ -5,7 +5,7 @@ cd web && pnpm install
pipx install uv
echo 'alias start-api="cd /workspaces/dify/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion"' >> ~/.bashrc
echo 'alias start-web="cd /workspaces/dify/web && pnpm dev"' >> ~/.bashrc
echo 'alias start-web-prod="cd /workspaces/dify/web && pnpm build && pnpm start"' >> ~/.bashrc
echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d"' >> ~/.bashrc

.env.example (Normal file, 1197 changed lines)

File diff suppressed because it is too large

View File

@@ -8,7 +8,7 @@ inputs:
uv-version:
description: UV version to set up
required: true
default: '0.8.9'
default: '~=0.7.11'
uv-lockfile:
description: Path to the UV lockfile to restore cache from
required: true
@@ -26,7 +26,7 @@ runs:
python-version: ${{ inputs.python-version }}
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v5
with:
version: ${{ inputs.uv-version }}
python-version: ${{ inputs.python-version }}
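The `'~=0.7.11'` default above is a PEP 440 compatible-release specifier, equivalent to `>=0.7.11, <0.8.0`; a quick check with the `packaging` library:

```python
from packaging.specifiers import SpecifierSet
from packaging.version import Version

spec = SpecifierSet("~=0.7.11")      # compatible release: >=0.7.11, <0.8.0
assert Version("0.7.20") in spec     # later 0.7.x patch releases are accepted
assert Version("0.8.0") not in spec  # the next minor series is excluded
```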

View File

@@ -23,9 +23,6 @@ jobs:
uv run ruff check --fix-only .
# Format code
uv run ruff format .
- name: ast-grep
run: |
uvx --from ast-grep-cli sg --pattern 'db.session.query($WHATEVER).filter($HERE)' --rewrite 'db.session.query($WHATEVER).where($HERE)' -l py --update-all
- uses: autofix-ci/action@635ffb0c9798bd160680f18fd73371e355b85f27
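The ast-grep step above mechanically rewrites legacy `Query.filter(...)` calls into SQLAlchemy 2.0-style `Query.where(...)`; the two are interchangeable, since `Query.where()` has been an alias for `Query.filter()` since SQLAlchemy 1.4. A self-contained sketch with a hypothetical `User` model:

```python
from sqlalchemy import Integer, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # pattern matched by the rule: db.session.query($WHATEVER).filter($HERE)
    legacy = session.query(User).filter(User.id == 1).first()
    # rewritten form:            db.session.query($WHATEVER).where($HERE)
    modern = session.query(User).where(User.id == 1).first()
    assert legacy == modern  # both None on an empty table; behavior is identical
```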

View File

@@ -82,7 +82,7 @@ jobs:
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
package_json_file: web/package.json
version: 10
run_install: false
- name: Setup NodeJS
@@ -95,12 +95,10 @@ jobs:
- name: Web dependencies
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm install --frozen-lockfile
- name: Web style check
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm run lint
docker-compose-template:

View File

@@ -46,7 +46,7 @@ jobs:
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
package_json_file: web/package.json
version: 10
run_install: false
- name: Set up Node.js
@@ -59,12 +59,10 @@ jobs:
- name: Install dependencies
if: env.FILES_CHANGED == 'true'
working-directory: ./web
run: pnpm install --frozen-lockfile
- name: Generate i18n translations
if: env.FILES_CHANGED == 'true'
working-directory: ./web
run: pnpm run auto-gen-i18n ${{ env.FILE_ARGS }}
- name: Create Pull Request

View File

@@ -35,7 +35,7 @@ jobs:
if: steps.changed-files.outputs.any_changed == 'true'
uses: pnpm/action-setup@v4
with:
package_json_file: web/package.json
version: 10
run_install: false
- name: Setup Node.js
@@ -48,10 +48,8 @@ jobs:
- name: Install dependencies
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm install --frozen-lockfile
- name: Run tests
if: steps.changed-files.outputs.any_changed == 'true'
working-directory: ./web
run: pnpm test

.gitignore (vendored, 2 changed lines)
View File

@@ -197,8 +197,6 @@ sdks/python-client/dify_client.egg-info
!.vscode/README.md
pyrightconfig.json
api/.vscode
# vscode Code History Extension
.history
.idea/

View File

@@ -1,83 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Dify is an open-source platform for developing LLM applications with an intuitive interface combining agentic AI workflows, RAG pipelines, agent capabilities, and model management.
The codebase consists of:
- **Backend API** (`/api`): Python Flask application with Domain-Driven Design architecture
- **Frontend Web** (`/web`): Next.js 15 application with TypeScript and React 19
- **Docker deployment** (`/docker`): Containerized deployment configurations
## Development Commands
### Backend (API)
All Python commands must be prefixed with `uv run --project api`:
```bash
# Start development servers
./dev/start-api # Start API server
./dev/start-worker # Start Celery worker
# Run tests
uv run --project api pytest # Run all tests
uv run --project api pytest tests/unit_tests/ # Unit tests only
uv run --project api pytest tests/integration_tests/ # Integration tests
# Code quality
./dev/reformat # Run all formatters and linters
uv run --project api ruff check --fix ./ # Fix linting issues
uv run --project api ruff format ./ # Format code
uv run --project api mypy . # Type checking
```
### Frontend (Web)
```bash
cd web
pnpm lint # Run ESLint
pnpm eslint-fix # Fix ESLint issues
pnpm test # Run Jest tests
```
## Testing Guidelines
### Backend Testing
- Use `pytest` for all backend tests
- Write tests first (TDD approach)
- Test structure: Arrange-Act-Assert
## Code Style Requirements
### Python
- Use type hints for all functions and class attributes
- No `Any` types unless absolutely necessary
- Implement special methods (`__repr__`, `__str__`) appropriately
### TypeScript/JavaScript
- Strict TypeScript configuration
- ESLint with Prettier integration
- Avoid `any` type
## Important Notes
- **Environment Variables**: Always use UV for Python commands: `uv run --project api <command>`
- **Comments**: Only write meaningful comments that explain "why", not "what"
- **File Creation**: Always prefer editing existing files over creating new ones
- **Documentation**: Don't create documentation files unless explicitly requested
- **Code Quality**: Always run `./dev/reformat` before committing backend changes
## Common Development Tasks
### Adding a New API Endpoint
1. Create controller in `/api/controllers/`
2. Add service logic in `/api/services/`
3. Update routes in controller's `__init__.py`
4. Write tests in `/api/tests/`
## Project-Specific Conventions
- All async tasks use Celery with Redis as broker

View File

@@ -225,8 +225,7 @@ Deploy Dify to AWS with [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Using Alibaba Cloud Computing Nest

View File

@@ -208,8 +208,7 @@ docker compose up -d
##### AWS
- [AWS CDK بواسطة @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK بواسطة @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK بواسطة @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### استخدام Alibaba Cloud للنشر
[بسرعة نشر Dify إلى سحابة علي بابا مع عش الحوسبة السحابية علي بابا](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)

View File

@@ -225,8 +225,7 @@ GitHub-এ ডিফাইকে স্টার দিয়ে রাখুন
##### AWS
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud ব্যবহার করে ডিপ্লয়

View File

@@ -223,8 +223,7 @@ docker compose up -d
使用 [CDK](https://aws.amazon.com/cdk/) 将 Dify 部署到 AWS
##### AWS
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### 使用 阿里云计算巢 部署

View File

@@ -220,8 +220,7 @@ Stellen Sie Dify mit nur einem Klick mithilfe von [terraform](https://www.terraf
Bereitstellung von Dify auf AWS mit [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -220,8 +220,7 @@ Despliega Dify en una plataforma en la nube con un solo clic utilizando [terrafo
Despliegue Dify en AWS usando [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK por @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK por @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK por @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -218,8 +218,7 @@ Déployez Dify sur une plateforme cloud en un clic en utilisant [terraform](http
Déployez Dify sur AWS en utilisant [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK par @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK par @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK par @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -219,8 +219,7 @@ docker compose up -d
[CDK](https://aws.amazon.com/cdk/) を使用して、DifyをAWSにデプロイします
##### AWS
- [@KevinZhaoによるAWS CDK (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [@tmokmssによるAWS CDK (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [@KevinZhaoによるAWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)

View File

@@ -218,8 +218,7 @@ wa'logh nIqHom neH ghun deployment toy'wI' [terraform](https://www.terraform.io/
wa'logh nIqHom neH ghun deployment toy'wI' [CDK](https://aws.amazon.com/cdk/) lo'laH.
##### AWS
- [AWS CDK qachlot @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK qachlot @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK qachlot @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -212,8 +212,7 @@ Dify를 Kubernetes에 배포하고 프리미엄 스케일링 설정을 구성했
[CDK](https://aws.amazon.com/cdk/)를 사용하여 AWS에 Dify 배포
##### AWS
- [KevinZhao의 AWS CDK (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [tmokmss의 AWS CDK (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [KevinZhao의 AWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -217,8 +217,7 @@ Implante o Dify na Plataforma Cloud com um único clique usando [terraform](http
Implante o Dify na AWS usando [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK por @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK por @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK por @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -218,8 +218,7 @@ namestite Dify v Cloud Platform z enim klikom z uporabo [terraform](https://www.
Uvedite Dify v AWS z uporabo [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK by @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -211,8 +211,7 @@ Dify'ı bulut platformuna tek tıklamayla dağıtın [terraform](https://www.ter
[CDK](https://aws.amazon.com/cdk/) kullanarak Dify'ı AWS'ye dağıtın
##### AWS
- [AWS CDK tarafından @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK tarafından @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK tarafından @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -223,8 +223,7 @@ Dify 的所有功能都提供相應的 API,因此您可以輕鬆地將 Dify
### AWS
- [由 @KevinZhao 提供的 AWS CDK (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [由 @tmokmss 提供的 AWS CDK (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [由 @KevinZhao 提供的 AWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### 使用 阿里云计算巢進行部署

View File

@@ -213,8 +213,7 @@ Triển khai Dify lên nền tảng đám mây với một cú nhấp chuột b
Triển khai Dify trên AWS bằng [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK bởi @KevinZhao (EKS based)](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
- [AWS CDK bởi @tmokmss (ECS based)](https://github.com/aws-samples/dify-self-hosted-on-aws)
- [AWS CDK bởi @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud

View File

@@ -42,15 +42,6 @@ REDIS_PORT=6379
REDIS_USERNAME=
REDIS_PASSWORD=difyai123456
REDIS_USE_SSL=false
# SSL configuration for Redis (when REDIS_USE_SSL=true)
REDIS_SSL_CERT_REQS=CERT_NONE
# Options: CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
REDIS_SSL_CA_CERTS=
# Path to CA certificate file for SSL verification
REDIS_SSL_CERTFILE=
# Path to client certificate file for SSL authentication
REDIS_SSL_KEYFILE=
# Path to client private key file for SSL authentication
REDIS_DB=0
# redis Sentinel configuration.
@@ -478,13 +469,6 @@ API_WORKFLOW_NODE_EXECUTION_REPOSITORY=repositories.sqlalchemy_api_workflow_node
# API workflow run repository implementation
API_WORKFLOW_RUN_REPOSITORY=repositories.sqlalchemy_api_workflow_run_repository.DifyAPISQLAlchemyWorkflowRunRepository
# Workflow log cleanup configuration
# Enable automatic cleanup of workflow run logs to manage database size
WORKFLOW_LOG_CLEANUP_ENABLED=true
# Number of days to retain workflow run logs (default: 30 days)
WORKFLOW_LOG_RETENTION_DAYS=30
# Batch size for workflow log cleanup operations (default: 100)
WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
# App configuration
APP_MAX_EXECUTION_TIME=1200

View File

@@ -4,7 +4,7 @@ FROM python:3.12-slim-bookworm AS base
WORKDIR /app/api
# Install uv
ENV UV_VERSION=0.8.9
ENV UV_VERSION=0.7.11
RUN pip install --no-cache-dir uv==${UV_VERSION}

View File

@@ -74,7 +74,7 @@
10. If you need to handle and debug async tasks (e.g. dataset importing and document indexing), please start the worker service.
```bash
uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage
uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin
```
In addition, if you want to debug the celery scheduled tasks, you can use the following command in another terminal:

View File

@@ -51,7 +51,6 @@ def initialize_extensions(app: DifyApp):
ext_login,
ext_mail,
ext_migrate,
ext_orjson,
ext_otel,
ext_proxy_fix,
ext_redis,
@@ -68,7 +67,6 @@ def initialize_extensions(app: DifyApp):
ext_logging,
ext_warnings,
ext_import_modules,
ext_orjson,
ext_set_secretkey,
ext_compress,
ext_code_based_extension,

View File

@@ -36,7 +36,6 @@ from services.account_service import AccountService, RegisterService, TenantServ
from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs
from services.plugin.data_migration import PluginDataMigration
from services.plugin.plugin_migration import PluginMigration
from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
@click.command("reset-password", help="Reset the account password.")
@@ -1203,138 +1202,3 @@ def setup_system_tool_oauth_client(provider, client_params):
db.session.add(oauth_client)
db.session.commit()
click.echo(click.style(f"OAuth client params setup successfully. id: {oauth_client.id}", fg="green"))
def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]:
"""
Find draft variables that reference non-existent apps.
Args:
batch_size: Maximum number of orphaned app IDs to return
Returns:
List of app IDs that have draft variables but don't exist in the apps table
"""
query = """
SELECT DISTINCT wdv.app_id
FROM workflow_draft_variables AS wdv
WHERE NOT EXISTS(
SELECT 1 FROM apps WHERE apps.id = wdv.app_id
)
LIMIT :batch_size
"""
with db.engine.connect() as conn:
result = conn.execute(sa.text(query), {"batch_size": batch_size})
return [row[0] for row in result]
def _count_orphaned_draft_variables() -> dict[str, Any]:
"""
Count orphaned draft variables by app.
Returns:
Dictionary with statistics about orphaned variables
"""
query = """
SELECT
wdv.app_id,
COUNT(*) as variable_count
FROM workflow_draft_variables AS wdv
WHERE NOT EXISTS(
SELECT 1 FROM apps WHERE apps.id = wdv.app_id
)
GROUP BY wdv.app_id
ORDER BY variable_count DESC
"""
with db.engine.connect() as conn:
result = conn.execute(sa.text(query))
orphaned_by_app = {row[0]: row[1] for row in result}
total_orphaned = sum(orphaned_by_app.values())
app_count = len(orphaned_by_app)
return {
"total_orphaned_variables": total_orphaned,
"orphaned_app_count": app_count,
"orphaned_by_app": orphaned_by_app,
}
@click.command()
@click.option("--dry-run", is_flag=True, help="Show what would be deleted without actually deleting")
@click.option("--batch-size", default=1000, help="Number of records to process per batch (default 1000)")
@click.option("--max-apps", default=None, type=int, help="Maximum number of apps to process (default: no limit)")
@click.option("-f", "--force", is_flag=True, help="Skip user confirmation and force the command to execute.")
def cleanup_orphaned_draft_variables(
dry_run: bool,
batch_size: int,
max_apps: int | None,
force: bool = False,
):
"""
Clean up orphaned draft variables from the database.
This script finds and removes draft variables that belong to apps
that no longer exist in the database.
"""
logger = logging.getLogger(__name__)
# Get statistics
stats = _count_orphaned_draft_variables()
logger.info("Found %s orphaned draft variables", stats["total_orphaned_variables"])
logger.info("Across %s non-existent apps", stats["orphaned_app_count"])
if stats["total_orphaned_variables"] == 0:
logger.info("No orphaned draft variables found. Exiting.")
return
if dry_run:
logger.info("DRY RUN: Would delete the following:")
for app_id, count in sorted(stats["orphaned_by_app"].items(), key=lambda x: x[1], reverse=True)[
:10
]: # Show top 10
logger.info(" App %s: %s variables", app_id, count)
if len(stats["orphaned_by_app"]) > 10:
logger.info(" ... and %s more apps", len(stats["orphaned_by_app"]) - 10)
return
# Confirm deletion
if not force:
click.confirm(
f"Are you sure you want to delete {stats['total_orphaned_variables']} "
f"orphaned draft variables from {stats['orphaned_app_count']} apps?",
abort=True,
)
total_deleted = 0
processed_apps = 0
while True:
if max_apps and processed_apps >= max_apps:
logger.info("Reached maximum app limit (%s). Stopping.", max_apps)
break
orphaned_app_ids = _find_orphaned_draft_variables(batch_size=10)
if not orphaned_app_ids:
logger.info("No more orphaned draft variables found.")
break
for app_id in orphaned_app_ids:
if max_apps and processed_apps >= max_apps:
break
try:
deleted_count = delete_draft_variables_batch(app_id, batch_size)
total_deleted += deleted_count
processed_apps += 1
logger.info("Deleted %s variables for app %s", deleted_count, app_id)
except Exception:
logger.exception("Error processing app %s", app_id)
continue
logger.info("Cleanup completed. Total deleted: %s variables across %s apps", total_deleted, processed_apps)

View File

@@ -552,18 +552,12 @@ class RepositoryConfig(BaseSettings):
"""
CORE_WORKFLOW_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowExecution. Options: "
"'core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository' (default), "
"'core.repositories.celery_workflow_execution_repository.CeleryWorkflowExecutionRepository'",
description="Repository implementation for WorkflowExecution. Specify as a module path",
default="core.repositories.sqlalchemy_workflow_execution_repository.SQLAlchemyWorkflowExecutionRepository",
)
CORE_WORKFLOW_NODE_EXECUTION_REPOSITORY: str = Field(
description="Repository implementation for WorkflowNodeExecution. Options: "
"'core.repositories.sqlalchemy_workflow_node_execution_repository."
"SQLAlchemyWorkflowNodeExecutionRepository' (default), "
"'core.repositories.celery_workflow_node_execution_repository."
"CeleryWorkflowNodeExecutionRepository'",
description="Repository implementation for WorkflowNodeExecution. Specify as a module path",
default="core.repositories.sqlalchemy_workflow_node_execution_repository.SQLAlchemyWorkflowNodeExecutionRepository",
)
@@ -968,14 +962,6 @@ class AccountConfig(BaseSettings):
)
class WorkflowLogConfig(BaseSettings):
WORKFLOW_LOG_CLEANUP_ENABLED: bool = Field(default=True, description="Enable workflow run log cleanup")
WORKFLOW_LOG_RETENTION_DAYS: int = Field(default=30, description="Retention days for workflow run logs")
WORKFLOW_LOG_CLEANUP_BATCH_SIZE: int = Field(
default=100, description="Batch size for workflow run log cleanup operations"
)
class FeatureConfig(
# place the configs in alphabet order
AppExecutionConfig,
@@ -1011,6 +997,5 @@ class FeatureConfig(
HostedServiceConfig,
CeleryBeatConfig,
CeleryScheduleTasksConfig,
WorkflowLogConfig,
):
pass
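Both repository settings above take a dotted `module.path.ClassName` string. A sketch of how such a value can be resolved at runtime; the loader below is an assumption for illustration, not necessarily how Dify imports it:

```python
import importlib

def resolve(dotted_path: str) -> type:
    """Split 'package.module.ClassName' and import the named attribute."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)

# any importable dotted path works the same way:
assert resolve("collections.abc.Sequence").__name__ == "Sequence"
```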

View File

@@ -39,26 +39,6 @@ class RedisConfig(BaseSettings):
default=False,
)
REDIS_SSL_CERT_REQS: str = Field(
description="SSL certificate requirements (CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED)",
default="CERT_NONE",
)
REDIS_SSL_CA_CERTS: Optional[str] = Field(
description="Path to the CA certificate file for SSL verification",
default=None,
)
REDIS_SSL_CERTFILE: Optional[str] = Field(
description="Path to the client certificate file for SSL authentication",
default=None,
)
REDIS_SSL_KEYFILE: Optional[str] = Field(
description="Path to the client private key file for SSL authentication",
default=None,
)
REDIS_USE_SENTINEL: Optional[bool] = Field(
description="Enable Redis Sentinel mode for high availability",
default=False,
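A hedged sketch of how the `REDIS_SSL_*` fields above could map onto redis-py's client arguments; the exact wiring inside Dify is an assumption here:

```python
import ssl

import redis

# values as they would come from the RedisConfig fields above
client = redis.Redis(
    host="localhost",
    port=6379,
    ssl=True,                     # REDIS_USE_SSL
    ssl_cert_reqs=ssl.CERT_NONE,  # REDIS_SSL_CERT_REQS=CERT_NONE
    ssl_ca_certs=None,            # REDIS_SSL_CA_CERTS
    ssl_certfile=None,            # REDIS_SSL_CERTFILE
    ssl_keyfile=None,             # REDIS_SSL_KEYFILE
)
# constructing the client does not open a connection; issuing commands does
```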

View File

@@ -1,7 +1,5 @@
from werkzeug.exceptions import HTTPException
from libs.exception import BaseHTTPException
class FilenameNotExistsError(HTTPException):
code = 400
@@ -11,27 +9,3 @@ class FilenameNotExistsError(HTTPException):
class RemoteFileUploadError(HTTPException):
code = 400
description = "Error uploading remote file."
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400

View File

@@ -1,4 +1,3 @@
import contextlib
import mimetypes
import os
import platform
@@ -66,8 +65,10 @@ def guess_file_info_from_response(response: httpx.Response):
# Use python-magic to guess MIME type if still unknown or generic
if mimetype == "application/octet-stream" and magic is not None:
with contextlib.suppress(magic.MagicException):
try:
mimetype = magic.from_buffer(response.content[:1024], mime=True)
except magic.MagicException:
pass
extension = os.path.splitext(filename)[1]
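The two spellings in this hunk are equivalent: `contextlib.suppress(E)` is shorthand for `try: ... except E: pass`. A self-contained demonstration with a stand-in for `magic.MagicException`:

```python
import contextlib

class MagicException(Exception):  # stand-in for magic.MagicException
    pass

def from_buffer(buf: bytes, mime: bool = True) -> str:
    raise MagicException("demo failure")

mimetype = "application/octet-stream"

with contextlib.suppress(MagicException):  # the contextlib form
    mimetype = from_buffer(b"\x00" * 16)

try:  # the equivalent try/except form
    mimetype = from_buffer(b"\x00" * 16)
except MagicException:
    pass

print(mimetype)  # still "application/octet-stream"; both forms swallow the error
```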

View File

@@ -1,12 +1,11 @@
from typing import Literal
from flask import request
from flask_login import current_user
from flask_restful import Resource, marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden
from controllers.common.errors import NoFileUploadedError, TooManyFilesError
from controllers.console import api
from controllers.console.app.error import NoFileUploadedError
from controllers.console.datasets.error import TooManyFilesError
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
@@ -26,7 +25,7 @@ class AnnotationReplyActionApi(Resource):
@login_required
@account_initialization_required
@cloud_edition_billing_resource_check("annotation")
def post(self, app_id, action: Literal["enable", "disable"]):
def post(self, app_id, action):
if not current_user.is_editor:
raise Forbidden()
@@ -40,6 +39,8 @@ class AnnotationReplyActionApi(Resource):
result = AppAnnotationService.enable_app_annotation(args, app_id)
elif action == "disable":
result = AppAnnotationService.disable_app_annotation(app_id)
else:
raise ValueError("Unsupported annotation reply action")
return result, 200

View File

@@ -1,7 +1,6 @@
import logging
import flask_login
from flask import request
from flask_restful import Resource, reqparse
from werkzeug.exceptions import InternalServerError, NotFound
@@ -25,7 +24,6 @@ from core.errors.error import (
ProviderTokenNotInitError,
QuotaExceededError,
)
from core.helper.trace_id_helper import get_external_trace_id
from core.model_runtime.errors.invoke import InvokeError
from libs import helper
from libs.helper import uuid_value
@@ -117,10 +115,6 @@ class ChatMessageApi(Resource):
streaming = args["response_mode"] != "blocking"
args["auto_generate_name"] = False
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
account = flask_login.current_user
try:

View File

@@ -79,6 +79,18 @@ class ProviderNotSupportSpeechToTextError(BaseHTTPException):
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class DraftWorkflowNotExist(BaseHTTPException):
error_code = "draft_workflow_not_exist"
description = "Draft workflow need to be initialized."

View File

@@ -1,5 +1,3 @@
from collections.abc import Sequence
from flask_login import current_user
from flask_restful import Resource, reqparse
@@ -12,8 +10,6 @@ from controllers.console.app.error import (
)
from controllers.console.wraps import account_initialization_required, setup_required
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
from core.llm_generator.llm_generator import LLMGenerator
from core.model_runtime.errors.invoke import InvokeError
from libs.login import login_required
@@ -111,121 +107,6 @@ class RuleStructuredOutputGenerateApi(Resource):
return structured_output
class InstructionGenerateApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("flow_id", type=str, required=True, default="", location="json")
parser.add_argument("node_id", type=str, required=False, default="", location="json")
parser.add_argument("current", type=str, required=False, default="", location="json")
parser.add_argument("language", type=str, required=False, default="javascript", location="json")
parser.add_argument("instruction", type=str, required=True, nullable=False, location="json")
parser.add_argument("model_config", type=dict, required=True, nullable=False, location="json")
parser.add_argument("ideal_output", type=str, required=False, default="", location="json")
args = parser.parse_args()
code_template = (
Python3CodeProvider.get_default_code()
if args["language"] == "python"
else (JavascriptCodeProvider.get_default_code())
if args["language"] == "javascript"
else ""
)
try:
# Generate from nothing for a workflow node
if (args["current"] == code_template or args["current"] == "") and args["node_id"] != "":
from models import App, db
from services.workflow_service import WorkflowService
app = db.session.query(App).where(App.id == args["flow_id"]).first()
if not app:
return {"error": f"app {args['flow_id']} not found"}, 400
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
return {"error": f"workflow {args['flow_id']} not found"}, 400
nodes: Sequence = workflow.graph_dict["nodes"]
node = [node for node in nodes if node["id"] == args["node_id"]]
if len(node) == 0:
return {"error": f"node {args['node_id']} not found"}, 400
node_type = node[0]["data"]["type"]
match node_type:
case "llm":
return LLMGenerator.generate_rule_config(
current_user.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "agent":
return LLMGenerator.generate_rule_config(
current_user.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=True,
)
case "code":
return LLMGenerator.generate_code(
tenant_id=current_user.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["language"],
)
case _:
return {"error": f"invalid node type: {node_type}"}
if args["node_id"] == "" and args["current"] != "": # For legacy app without a workflow
return LLMGenerator.instruction_modify_legacy(
tenant_id=current_user.current_tenant_id,
flow_id=args["flow_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
if args["node_id"] != "" and args["current"] != "": # For workflow node
return LLMGenerator.instruction_modify_workflow(
tenant_id=current_user.current_tenant_id,
flow_id=args["flow_id"],
node_id=args["node_id"],
current=args["current"],
instruction=args["instruction"],
model_config=args["model_config"],
ideal_output=args["ideal_output"],
)
return {"error": "incompatible parameters"}, 400
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
raise ProviderQuotaExceededError()
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
except InvokeError as e:
raise CompletionRequestError(e.description)
class InstructionGenerationTemplateApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self) -> dict:
parser = reqparse.RequestParser()
parser.add_argument("type", type=str, required=True, default=False, location="json")
args = parser.parse_args()
match args["type"]:
case "prompt":
from core.llm_generator.prompts import INSTRUCTION_GENERATE_TEMPLATE_PROMPT
return {"data": INSTRUCTION_GENERATE_TEMPLATE_PROMPT}
case "code":
from core.llm_generator.prompts import INSTRUCTION_GENERATE_TEMPLATE_CODE
return {"data": INSTRUCTION_GENERATE_TEMPLATE_CODE}
case _:
raise ValueError(f"Invalid type: {args['type']}")
api.add_resource(RuleGenerateApi, "/rule-generate")
api.add_resource(RuleCodeGenerateApi, "/rule-code-generate")
api.add_resource(RuleStructuredOutputGenerateApi, "/rule-structured-output-generate")
api.add_resource(InstructionGenerateApi, "/instruction-generate")
api.add_resource(InstructionGenerationTemplateApi, "/instruction-generate/template")

View File

@@ -27,7 +27,7 @@ from fields.conversation_fields import annotation_fields, message_detail_fields
from libs.helper import uuid_value
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.login import login_required
from models.model import AppMode, Conversation, Message, MessageAnnotation, MessageFeedback
from models.model import AppMode, Conversation, Message, MessageAnnotation
from services.annotation_service import AppAnnotationService
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
@@ -124,33 +124,16 @@ class MessageFeedbackApi(Resource):
parser.add_argument("rating", type=str, choices=["like", "dislike", None], location="json")
args = parser.parse_args()
message_id = str(args["message_id"])
message = db.session.query(Message).filter(Message.id == message_id, Message.app_id == app_model.id).first()
if not message:
raise NotFound("Message Not Exists.")
feedback = message.admin_feedback
if not args["rating"] and feedback:
db.session.delete(feedback)
elif args["rating"] and feedback:
feedback.rating = args["rating"]
elif not args["rating"] and not feedback:
raise ValueError("rating cannot be None when feedback not exists")
else:
feedback = MessageFeedback(
app_id=app_model.id,
conversation_id=message.conversation_id,
message_id=message.id,
rating=args["rating"],
from_source="admin",
from_account_id=current_user.id,
try:
MessageService.create_feedback(
app_model=app_model,
message_id=str(args["message_id"]),
user=current_user,
rating=args.get("rating"),
content=None,
)
db.session.add(feedback)
db.session.commit()
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
return {"result": "success"}

View File

@@ -23,7 +23,6 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import File
from core.helper.trace_id_helper import get_external_trace_id
from extensions.ext_database import db
from factories import file_factory, variable_factory
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
@@ -186,10 +185,6 @@ class AdvancedChatDraftWorkflowRunApi(Resource):
args = parser.parse_args()
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
try:
response = AppGenerateService.generate(
app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.DEBUGGER, streaming=True
@@ -378,10 +373,6 @@ class DraftWorkflowRunApi(Resource):
parser.add_argument("files", type=list, required=False, location="json")
args = parser.parse_args()
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
try:
response = AppGenerateService.generate(
app_model=app_model,

View File

@@ -163,11 +163,11 @@ class WorkflowVariableCollectionApi(Resource):
draft_var_srv = WorkflowDraftVariableService(
session=session,
)
workflow_vars = draft_var_srv.list_variables_without_values(
app_id=app_model.id,
page=args.page,
limit=args.limit,
)
workflow_vars = draft_var_srv.list_variables_without_values(
app_id=app_model.id,
page=args.page,
limit=args.limit,
)
return workflow_vars

View File

@@ -1,6 +1,6 @@
import logging
from argparse import ArgumentTypeError
from typing import Literal, cast
from typing import cast
from flask import request
from flask_login import current_user
@@ -758,7 +758,7 @@ class DocumentProcessingApi(DocumentResource):
@login_required
@account_initialization_required
@cloud_edition_billing_rate_limit_check("knowledge")
def patch(self, dataset_id, document_id, action: Literal["pause", "resume"]):
def patch(self, dataset_id, document_id, action):
dataset_id = str(dataset_id)
document_id = str(document_id)
document = self.get_document(dataset_id, document_id)
@@ -784,6 +784,8 @@ class DocumentProcessingApi(DocumentResource):
document.paused_at = None
document.is_paused = False
db.session.commit()
else:
raise InvalidActionError()
return {"result": "success"}, 200
@@ -838,7 +840,7 @@ class DocumentStatusApi(DocumentResource):
@account_initialization_required
@cloud_edition_billing_resource_check("vector_space")
@cloud_edition_billing_rate_limit_check("knowledge")
def patch(self, dataset_id, action: Literal["enable", "disable", "archive", "un_archive"]):
def patch(self, dataset_id, action):
dataset_id = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id)
if dataset is None:

View File

@@ -1,6 +1,30 @@
from libs.exception import BaseHTTPException
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class DatasetNotInitializedError(BaseHTTPException):
error_code = "dataset_not_initialized"
description = "The dataset is still being initialized or indexing. Please wait a moment."

View File

@@ -1,5 +1,3 @@
from typing import Literal
from flask_login import current_user
from flask_restful import Resource, marshal_with, reqparse
from werkzeug.exceptions import NotFound
@@ -102,7 +100,7 @@ class DatasetMetadataBuiltInFieldActionApi(Resource):
@login_required
@account_initialization_required
@enterprise_license_required
def post(self, dataset_id, action: Literal["enable", "disable"]):
def post(self, dataset_id, action):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
if dataset is None:

View File

@@ -39,7 +39,7 @@ class UploadFileApi(Resource):
data_source_info = document.data_source_info_dict
if data_source_info and "upload_file_id" in data_source_info:
file_id = data_source_info["upload_file_id"]
upload_file = db.session.query(UploadFile).where(UploadFile.id == file_id).first()
upload_file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first()
if not upload_file:
raise NotFound("UploadFile not found.")
else:

View File

@@ -76,6 +76,30 @@ class EmailSendIpLimitError(BaseHTTPException):
code = 429
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class UnauthorizedAndForceLogout(BaseHTTPException):
error_code = "unauthorized_and_force_logout"
description = "Unauthorized and force logout."

View File

@@ -8,13 +8,7 @@ from werkzeug.exceptions import Forbidden
import services
from configs import dify_config
from constants import DOCUMENT_EXTENSIONS
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.common.errors import FilenameNotExistsError
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
@@ -24,6 +18,13 @@ from fields.file_fields import file_fields, upload_config_fields
from libs.login import login_required
from services.file_service import FileService
from .error import (
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
PREVIEW_WORDS_LIMIT = 3000

View File

@@ -7,17 +7,18 @@ from flask_restful import Resource, marshal_with, reqparse
import services
from controllers.common import helpers
from controllers.common.errors import (
FileTooLargeError,
RemoteFileUploadError,
UnsupportedFileTypeError,
)
from controllers.common.errors import RemoteFileUploadError
from core.file import helpers as file_helpers
from core.helper import ssrf_proxy
from fields.file_fields import file_fields_with_signed_url, remote_file_info_fields
from models.account import Account
from services.file_service import FileService
from .error import (
FileTooLargeError,
UnsupportedFileTypeError,
)
class RemoteFileInfoApi(Resource):
@marshal_with(remote_file_info_fields)

View File

@@ -32,7 +32,7 @@ class VersionApi(Resource):
return result
try:
response = requests.get(check_update_url, {"current_version": args.get("current_version")}, timeout=(3, 10))
response = requests.get(check_update_url, {"current_version": args.get("current_version")})
except Exception as error:
logging.warning("Check update version error: %s.", str(error))
result["version"] = args.get("current_version")
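The `timeout=(3, 10)` in this hunk is requests' (connect timeout, read timeout) pair in seconds; without any timeout the version check can block indefinitely on a slow endpoint. For illustration, with a hypothetical URL:

```python
import requests

response = requests.get(
    "https://updates.example.com/check",  # hypothetical endpoint
    {"current_version": "1.0.0"},         # sent as query parameters
    timeout=(3, 10),  # at most 3s to connect, 10s per read of the response
)
```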

View File

@@ -862,10 +862,6 @@ class ToolProviderMCPApi(Resource):
parser.add_argument("icon_type", type=str, required=True, nullable=False, location="json")
parser.add_argument("icon_background", type=str, required=False, nullable=True, location="json", default="")
parser.add_argument("server_identifier", type=str, required=True, nullable=False, location="json")
parser.add_argument("timeout", type=float, required=False, nullable=False, location="json", default=30)
parser.add_argument(
"sse_read_timeout", type=float, required=False, nullable=False, location="json", default=300
)
args = parser.parse_args()
user = current_user
if not is_valid_url(args["server_url"]):
@@ -880,8 +876,6 @@ class ToolProviderMCPApi(Resource):
icon_background=args["icon_background"],
user_id=user.id,
server_identifier=args["server_identifier"],
timeout=args["timeout"],
sse_read_timeout=args["sse_read_timeout"],
)
)
@@ -897,8 +891,6 @@ class ToolProviderMCPApi(Resource):
parser.add_argument("icon_background", type=str, required=False, nullable=True, location="json")
parser.add_argument("provider_id", type=str, required=True, nullable=False, location="json")
parser.add_argument("server_identifier", type=str, required=True, nullable=False, location="json")
parser.add_argument("timeout", type=float, required=False, nullable=True, location="json")
parser.add_argument("sse_read_timeout", type=float, required=False, nullable=True, location="json")
args = parser.parse_args()
if not is_valid_url(args["server_url"]):
if "[__HIDDEN__]" in args["server_url"]:
@@ -914,8 +906,6 @@ class ToolProviderMCPApi(Resource):
icon_type=args["icon_type"],
icon_background=args["icon_background"],
server_identifier=args["server_identifier"],
timeout=args.get("timeout"),
sse_read_timeout=args.get("sse_read_timeout"),
)
return {"result": "success"}

View File

@@ -7,15 +7,15 @@ from sqlalchemy import select
from werkzeug.exceptions import Unauthorized
import services
from controllers.common.errors import (
FilenameNotExistsError,
from controllers.common.errors import FilenameNotExistsError
from controllers.console import api
from controllers.console.admin import admin_required
from controllers.console.datasets.error import (
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.console import api
from controllers.console.admin import admin_required
from controllers.console.error import AccountNotLinkTenantError
from controllers.console.wraps import (
account_initialization_required,

View File

@@ -0,0 +1,7 @@
from libs.exception import BaseHTTPException
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415

View File

@@ -5,8 +5,8 @@ from flask_restful import Resource, reqparse
from werkzeug.exceptions import NotFound
import services
from controllers.common.errors import UnsupportedFileTypeError
from controllers.files import api
from controllers.files.error import UnsupportedFileTypeError
from services.account_service import TenantService
from services.file_service import FileService

View File

@@ -4,8 +4,8 @@ from flask import Response
from flask_restful import Resource, reqparse
from werkzeug.exceptions import Forbidden, NotFound
from controllers.common.errors import UnsupportedFileTypeError
from controllers.files import api
from controllers.files.error import UnsupportedFileTypeError
from core.tools.signature import verify_tool_file_signature
from core.tools.tool_file_manager import ToolFileManager
from models import db as global_db

View File

@@ -5,13 +5,11 @@ from flask_restful import Resource, marshal_with
from werkzeug.exceptions import Forbidden
import services
from controllers.common.errors import (
FileTooLargeError,
UnsupportedFileTypeError,
)
from controllers.console.wraps import setup_required
from controllers.files import api
from controllers.files.error import UnsupportedFileTypeError
from controllers.inner_api.plugin.wraps import get_user
from controllers.service_api.app.error import FileTooLargeError
from core.file.helpers import verify_plugin_file_signature
from core.tools.tool_file_manager import ToolFileManager
from fields.file_fields import file_fields

View File

@@ -1,5 +1,3 @@
from typing import Literal
from flask import request
from flask_restful import Resource, marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden
@@ -17,7 +15,7 @@ from services.annotation_service import AppAnnotationService
class AnnotationReplyActionApi(Resource):
@validate_app_token
def post(self, app_model: App, action: Literal["enable", "disable"]):
def post(self, app_model: App, action):
parser = reqparse.RequestParser()
parser.add_argument("score_threshold", required=True, type=float, location="json")
parser.add_argument("embedding_provider_name", required=True, type=str, location="json")
@@ -27,6 +25,8 @@ class AnnotationReplyActionApi(Resource):
result = AppAnnotationService.enable_app_annotation(args, app_model.id)
elif action == "disable":
result = AppAnnotationService.disable_app_annotation(app_model.id)
else:
raise ValueError("Unsupported annotation reply action")
return result, 200

View File

@@ -1,3 +1,5 @@
import json
from flask_restful import Resource, marshal_with, reqparse
from flask_restful.inputs import int_range
from sqlalchemy.orm import Session
@@ -134,15 +136,12 @@ class ConversationVariableDetailApi(Resource):
variable_id = str(variable_id)
parser = reqparse.RequestParser()
# using lambda is for passing the already-typed value without modification
# if no lambda, it will be converted to string
# the string cannot be converted using json.loads
parser.add_argument("value", required=True, location="json", type=lambda x: x)
parser.add_argument("value", required=True, location="json")
args = parser.parse_args()
try:
return ConversationService.update_conversation_variable(
app_model, conversation_id, variable_id, end_user, args["value"]
app_model, conversation_id, variable_id, end_user, json.loads(args["value"])
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
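The comment in this hunk points at flask-restful's default coercion: without a `type` callable, reqparse converts the parsed JSON value to a string, while `type=lambda x: x` passes it through unchanged. A small sketch:

```python
from flask import Flask
from flask_restful import reqparse

app = Flask(__name__)

parser = reqparse.RequestParser()
# identity callable: keep the parsed JSON value (list, dict, int, ...) as-is
parser.add_argument("value", required=True, location="json", type=lambda x: x)

with app.test_request_context(json={"value": [1, 2, 3]}):
    args = parser.parse_args()
    assert args["value"] == [1, 2, 3]  # still a list, not the string "[1, 2, 3]"
```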

View File

@@ -85,6 +85,30 @@ class ProviderNotSupportSpeechToTextError(BaseHTTPException):
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class FileNotFoundError(BaseHTTPException):
error_code = "file_not_found"
description = "The requested file was not found."

View File

@@ -2,14 +2,14 @@ from flask import request
from flask_restful import Resource, marshal_with
import services
from controllers.common.errors import (
FilenameNotExistsError,
from controllers.common.errors import FilenameNotExistsError
from controllers.service_api import api
from controllers.service_api.app.error import (
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.service_api import api
from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
from fields.file_fields import file_fields
from models.model import App, EndUser

View File

@@ -1,5 +1,3 @@
from typing import Literal
from flask import request
from flask_restful import marshal, marshal_with, reqparse
from werkzeug.exceptions import Forbidden, NotFound
@@ -360,14 +358,14 @@ class DatasetApi(DatasetApiResource):
class DocumentStatusApi(DatasetApiResource):
"""Resource for batch document status operations."""
def patch(self, tenant_id, dataset_id, action: Literal["enable", "disable", "archive", "un_archive"]):
def patch(self, tenant_id, dataset_id, action):
"""
Batch update document status.
Args:
tenant_id: tenant id
dataset_id: dataset id
action: action to perform (Literal["enable", "disable", "archive", "un_archive"])
action: action to perform (enable, disable, archive, un_archive)
Returns:
dict: A dictionary with a key 'result' and a value 'success'

View File

@ -6,15 +6,15 @@ from sqlalchemy import desc, select
from werkzeug.exceptions import Forbidden, NotFound
import services
from controllers.common.errors import (
FilenameNotExistsError,
from controllers.common.errors import FilenameNotExistsError
from controllers.service_api import api
from controllers.service_api.app.error import (
FileTooLargeError,
NoFileUploadedError,
ProviderNotInitializeError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.service_api import api
from controllers.service_api.app.error import ProviderNotInitializeError
from controllers.service_api.dataset.error import (
ArchivedDocumentImmutableError,
DocumentIndexingError,

View File

@@ -1,6 +1,30 @@
from libs.exception import BaseHTTPException
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class DatasetNotInitializedError(BaseHTTPException):
error_code = "dataset_not_initialized"
description = "The dataset is still being initialized or indexing. Please wait a moment."

View File

@@ -1,5 +1,3 @@
from typing import Literal
from flask_login import current_user # type: ignore
from flask_restful import marshal, reqparse
from werkzeug.exceptions import NotFound
@@ -79,7 +77,7 @@ class DatasetMetadataBuiltInFieldServiceApi(DatasetApiResource):
class DatasetMetadataBuiltInFieldActionServiceApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, action: Literal["enable", "disable"]):
def post(self, tenant_id, dataset_id, action):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
if dataset is None:

View File

@@ -97,6 +97,30 @@ class ProviderNotSupportSpeechToTextError(BaseHTTPException):
code = 400
class NoFileUploadedError(BaseHTTPException):
error_code = "no_file_uploaded"
description = "Please upload your file."
code = 400
class TooManyFilesError(BaseHTTPException):
error_code = "too_many_files"
description = "Only one file is allowed."
code = 400
class FileTooLargeError(BaseHTTPException):
error_code = "file_too_large"
description = "File size exceeded. {message}"
code = 413
class UnsupportedFileTypeError(BaseHTTPException):
error_code = "unsupported_file_type"
description = "File type not allowed."
code = 415
class WebAppAuthRequiredError(BaseHTTPException):
error_code = "web_sso_auth_required"
description = "Web app authentication required."

View File

@@ -2,13 +2,8 @@ from flask import request
from flask_restful import marshal_with
import services
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
NoFileUploadedError,
TooManyFilesError,
UnsupportedFileTypeError,
)
from controllers.common.errors import FilenameNotExistsError
from controllers.web.error import FileTooLargeError, NoFileUploadedError, TooManyFilesError, UnsupportedFileTypeError
from controllers.web.wraps import WebApiResource
from fields.file_fields import file_fields
from services.file_service import FileService

View File

@@ -5,17 +5,15 @@ from flask_restful import marshal_with, reqparse
import services
from controllers.common import helpers
from controllers.common.errors import (
FileTooLargeError,
RemoteFileUploadError,
UnsupportedFileTypeError,
)
from controllers.common.errors import RemoteFileUploadError
from controllers.web.wraps import WebApiResource
from core.file import helpers as file_helpers
from core.helper import ssrf_proxy
from fields.file_fields import file_fields_with_signed_url, remote_file_info_fields
from services.file_service import FileService
from .error import FileTooLargeError, UnsupportedFileTypeError
class RemoteFileInfoApi(WebApiResource):
@marshal_with(remote_file_info_fields)

View File

@@ -74,7 +74,6 @@ from core.workflow.system_variable import SystemVariable
from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Conversation, EndUser, Message, MessageFile
from models.account import Account
from models.enums import CreatorUserRole
@@ -569,7 +568,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
yield workflow_finish_resp
self._base_task_pipeline.queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
def _handle_workflow_partial_success_event(
self,
@@ -601,7 +600,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
yield workflow_finish_resp
self._base_task_pipeline.queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
self._base_task_pipeline._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
def _handle_workflow_failed_event(
self,
@@ -846,7 +845,7 @@ class AdvancedChatAppGenerateTaskPipeline:
# Initialize graph runtime state
graph_runtime_state: Optional[GraphRuntimeState] = None
for queue_message in self._base_task_pipeline.queue_manager.listen():
for queue_message in self._base_task_pipeline._queue_manager.listen():
event = queue_message.event
match event:
@@ -897,7 +896,6 @@ class AdvancedChatAppGenerateTaskPipeline:
def _save_message(self, *, session: Session, graph_runtime_state: Optional[GraphRuntimeState] = None) -> None:
message = self._get_message(session=session)
message.answer = self._task_state.answer
message.updated_at = naive_utc_now()
message.provider_response_latency = time.perf_counter() - self._base_task_pipeline._start_at
message.message_metadata = self._task_state.metadata.model_dump_json()
message_files = [
@@ -961,11 +959,11 @@ class AdvancedChatAppGenerateTaskPipeline:
if self._base_task_pipeline._output_moderation_handler:
if self._base_task_pipeline._output_moderation_handler.should_direct_output():
self._task_state.answer = self._base_task_pipeline._output_moderation_handler.get_final_output()
self._base_task_pipeline.queue_manager.publish(
self._base_task_pipeline._queue_manager.publish(
QueueTextChunkEvent(text=self._task_state.answer), PublishFrom.TASK_PIPELINE
)
self._base_task_pipeline.queue_manager.publish(
self._base_task_pipeline._queue_manager.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
)
return True

View File

@@ -140,9 +140,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
trace_manager = TraceQueueManager(app_id=app_model.id)
# init application generate entity
application_generate_entity = ChatAppGenerateEntity(

View File

@ -124,9 +124,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
)
# get tracing instance
trace_manager = TraceQueueManager(
app_id=app_model.id, user_id=user.id if isinstance(user, Account) else user.session_id
)
trace_manager = TraceQueueManager(app_model.id)
# init application generate entity
application_generate_entity = CompletionAppGenerateEntity(

View File

@ -6,6 +6,7 @@ from core.app.entities.queue_entities import (
MessageQueueMessage,
QueueAdvancedChatMessageEndEvent,
QueueErrorEvent,
QueueMessage,
QueueMessageEndEvent,
QueueStopEvent,
)
@ -21,6 +22,15 @@ class MessageBasedAppQueueManager(AppQueueManager):
self._app_mode = app_mode
self._message_id = str(message_id)
def construct_queue_message(self, event: AppQueueEvent) -> QueueMessage:
return MessageQueueMessage(
task_id=self._task_id,
message_id=self._message_id,
conversation_id=self._conversation_id,
app_mode=self._app_mode,
event=event,
)
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
"""
Publish event to queue

View File

@ -711,7 +711,7 @@ class WorkflowAppGenerateTaskPipeline:
# Initialize graph runtime state
graph_runtime_state = None
for queue_message in self._base_task_pipeline.queue_manager.listen():
for queue_message in self._base_task_pipeline._queue_manager.listen():
event = queue_message.event
match event:

View File

@ -37,7 +37,7 @@ class BasedGenerateTaskPipeline:
stream: bool,
) -> None:
self._application_generate_entity = application_generate_entity
self.queue_manager = queue_manager
self._queue_manager = queue_manager
self._start_at = time.perf_counter()
self._output_moderation_handler = self._init_output_moderation()
self._stream = stream
@ -113,7 +113,7 @@ class BasedGenerateTaskPipeline:
tenant_id=app_config.tenant_id,
app_id=app_config.app_id,
rule=ModerationRule(type=sensitive_word_avoidance.type, config=sensitive_word_avoidance.config),
queue_manager=self.queue_manager,
queue_manager=self._queue_manager,
)
return None

View File
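The hunks above toggle between a public queue_manager attribute and the private _queue_manager spelling that callers such as AdvancedChatAppGenerateTaskPipeline then reach into directly. A minimal sketch of one way to keep the attribute private while still giving collaborators read access; the class name here is hypothetical, not Dify's:

import time
from typing import Any


class TaskPipelineSketch:
    """Hypothetical pipeline: private attribute, read-only property."""

    def __init__(self, queue_manager: Any) -> None:
        self._queue_manager = queue_manager  # private by convention
        self._start_at = time.perf_counter()

    @property
    def queue_manager(self) -> Any:
        # Collaborating pipelines can call pipeline.queue_manager.publish(...)
        # without depending on the private name.
        return self._queue_manager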

@ -57,7 +57,6 @@ from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.model import AppMode, Conversation, Message, MessageAgentThought
logger = logging.getLogger(__name__)
@ -258,7 +257,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
Process stream response.
:return:
"""
for message in self.queue_manager.listen():
for message in self._queue_manager.listen():
if publisher:
publisher.publish(message)
event = message.event
@ -390,7 +389,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
if llm_result.message.content
else ""
)
message.updated_at = naive_utc_now()
message.answer_tokens = usage.completion_tokens
message.answer_unit_price = usage.completion_unit_price
message.answer_price_unit = usage.completion_price_unit
@ -501,7 +499,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
if self._output_moderation_handler.should_direct_output():
# stop subscribe new token when output moderation should direct output
self._task_state.llm_result.message.content = self._output_moderation_handler.get_final_output()
self.queue_manager.publish(
self._queue_manager.publish(
QueueLLMChunkEvent(
chunk=LLMResultChunk(
model=self._task_state.llm_result.model,
@ -515,7 +513,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
PublishFrom.TASK_PIPELINE,
)
self.queue_manager.publish(
self._queue_manager.publish(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.OUTPUT_MODERATION), PublishFrom.TASK_PIPELINE
)
return True

View File

@ -181,7 +181,7 @@ class MessageCycleManager:
:param message_id: message id
:return:
"""
message_file = db.session.query(MessageFile).where(MessageFile.id == message_id).first()
message_file = db.session.query(MessageFile).filter(MessageFile.id == message_id).first()
event_type = StreamEvent.MESSAGE_FILE if message_file else StreamEvent.MESSAGE
return MessageStreamResponse(

View File
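The hunk above only swaps .where() for .filter(); in SQLAlchemy 1.4+/2.x the two are synonyms on both Query and Select, so the generated SQL is identical. A self-contained check (the MessageFile model below is a stand-in for illustration, not Dify's real model):

from sqlalchemy import String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class MessageFile(Base):  # stand-in model for illustration only
    __tablename__ = "message_files"
    id: Mapped[str] = mapped_column(String, primary_key=True)


# Both spellings compile to the same SQL.
assert str(select(MessageFile).where(MessageFile.id == "x")) == str(
    select(MessageFile).filter(MessageFile.id == "x")
)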

@ -5,7 +5,7 @@ from base64 import b64encode
from collections.abc import Mapping
from typing import Any
from core.variables.utils import dumps_with_segments
from core.variables.utils import SegmentJSONEncoder
class TemplateTransformer(ABC):
@ -93,7 +93,7 @@ class TemplateTransformer(ABC):
@classmethod
def serialize_inputs(cls, inputs: Mapping[str, Any]) -> str:
inputs_json_str = dumps_with_segments(inputs, ensure_ascii=False).encode()
inputs_json_str = json.dumps(inputs, ensure_ascii=False, cls=SegmentJSONEncoder).encode()
input_base64_encoded = b64encode(inputs_json_str).decode("utf-8")
return input_base64_encoded

View File
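For context on serialize_inputs: the inputs mapping is JSON-serialized and base64-encoded before being handed to the sandboxed template runner. A round-trip sketch using plain json.dumps in place of the segment-aware encoders named in the hunk (dumps_with_segments / SegmentJSONEncoder are not shown in this diff):

import json
from base64 import b64decode, b64encode
from collections.abc import Mapping
from typing import Any


def serialize_inputs(inputs: Mapping[str, Any]) -> str:
    # json.dumps stands in for the segment-aware encoder used by Dify.
    raw = json.dumps(inputs, ensure_ascii=False).encode()
    return b64encode(raw).decode("utf-8")


def deserialize_inputs(encoded: str) -> Any:
    return json.loads(b64decode(encoded))


assert deserialize_inputs(serialize_inputs({"query": "héllo"})) == {"query": "héllo"}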

@ -16,33 +16,15 @@ def get_external_trace_id(request: Any) -> Optional[str]:
"""
Retrieve the trace_id from the request.
Priority:
1. header ('X-Trace-Id')
2. parameters
3. JSON body
4. Current OpenTelemetry context (if enabled)
5. OpenTelemetry traceparent header (if present and valid)
Returns None if no valid trace_id is provided.
Priority: header ('X-Trace-Id'), then parameters, then JSON body. Returns None if not provided or invalid.
"""
trace_id = request.headers.get("X-Trace-Id")
if not trace_id:
trace_id = request.args.get("trace_id")
if not trace_id and getattr(request, "is_json", False):
json_data = getattr(request, "json", None)
if json_data:
trace_id = json_data.get("trace_id")
if not trace_id:
trace_id = get_trace_id_from_otel_context()
if not trace_id:
traceparent = request.headers.get("traceparent")
if traceparent:
trace_id = parse_traceparent_header(traceparent)
if isinstance(trace_id, str) and is_valid_trace_id(trace_id):
return trace_id
return None
@ -58,49 +40,3 @@ def extract_external_trace_id_from_args(args: Mapping[str, Any]) -> dict:
if trace_id:
return {"external_trace_id": trace_id}
return {}
def get_trace_id_from_otel_context() -> Optional[str]:
"""
Retrieve the current trace ID from the active OpenTelemetry trace context.
Returns None if:
1. OpenTelemetry SDK is not installed or enabled.
2. There is no active span or trace context.
"""
try:
from opentelemetry.trace import SpanContext, get_current_span
from opentelemetry.trace.span import INVALID_TRACE_ID
span = get_current_span()
if not span:
return None
span_context: SpanContext = span.get_span_context()
if not span_context or span_context.trace_id == INVALID_TRACE_ID:
return None
trace_id_hex = f"{span_context.trace_id:032x}"
return trace_id_hex
except Exception:
return None
def parse_traceparent_header(traceparent: str) -> Optional[str]:
"""
Parse the `traceparent` header to extract the trace_id.
Expected format:
'version-trace_id-span_id-flags'
Reference:
W3C Trace Context Specification: https://www.w3.org/TR/trace-context/
"""
try:
parts = traceparent.split("-")
if len(parts) == 4 and len(parts[1]) == 32:
return parts[1]
except Exception:
pass
return None

View File
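A minimal, framework-free sketch of the lookup order the simplified docstring describes (header, then query args, then JSON body). The validation regex is an assumption; Dify's is_valid_trace_id is not shown in this diff:

import re
from collections.abc import Mapping
from typing import Any, Optional

# Assumed validator; the real is_valid_trace_id may differ.
_TRACE_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")


def get_external_trace_id_sketch(
    headers: Mapping[str, str],
    args: Mapping[str, str],
    json_body: Optional[Mapping[str, Any]],
) -> Optional[str]:
    trace_id = headers.get("X-Trace-Id") or args.get("trace_id")
    if not trace_id and json_body:
        trace_id = json_body.get("trace_id")
    if isinstance(trace_id, str) and _TRACE_ID_RE.match(trace_id):
        return trace_id
    return None


assert get_external_trace_id_sketch({"X-Trace-Id": "abc-123"}, {}, None) == "abc-123"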

@ -1,7 +1,6 @@
import json
import logging
import re
from collections.abc import Sequence
from typing import Optional, cast
import json_repair
@ -12,8 +11,6 @@ from core.llm_generator.prompts import (
CONVERSATION_TITLE_PROMPT,
GENERATOR_QA_PROMPT,
JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE,
LLM_MODIFY_CODE_SYSTEM,
LLM_MODIFY_PROMPT_SYSTEM,
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE,
SYSTEM_STRUCTURED_OUTPUT_GENERATE,
WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE,
@ -27,9 +24,6 @@ from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.ops.utils import measure_time
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.event import AgentLogEvent
from models import App, Message, WorkflowNodeExecutionModel, db
class LLMGenerator:
@ -394,181 +388,3 @@ class LLMGenerator:
except Exception as e:
logging.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
@staticmethod
def instruction_modify_legacy(
tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None
) -> dict:
app: App | None = db.session.query(App).where(App.id == flow_id).first()
last_run: Message | None = (
db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first()
)
if not last_run:
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=None,
current=current,
error_message="",
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
)
last_run_dict = {
"query": last_run.query,
"answer": last_run.answer,
"error": last_run.error,
}
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=last_run_dict,
current=current,
error_message=str(last_run.error),
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
)
@staticmethod
def instruction_modify_workflow(
tenant_id: str,
flow_id: str,
node_id: str,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None,
) -> dict:
from services.workflow_service import WorkflowService
app: App | None = db.session.query(App).where(App.id == flow_id).first()
if not app:
raise ValueError("App not found.")
workflow = WorkflowService().get_draft_workflow(app_model=app)
if not workflow:
raise ValueError("Workflow not found for the given app model.")
last_run = WorkflowService().get_node_last_run(app_model=app, workflow=workflow, node_id=node_id)
try:
node_type = cast(WorkflowNodeExecutionModel, last_run).node_type
except Exception:
try:
node_type = [it for it in workflow.graph_dict["graph"]["nodes"] if it["id"] == node_id][0]["data"][
"type"
]
except Exception:
node_type = "llm"
if not last_run: # Node is not executed yet
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=None,
current=current,
error_message="",
instruction=instruction,
node_type=node_type,
ideal_output=ideal_output,
)
def agent_log_of(node_execution: WorkflowNodeExecutionModel) -> Sequence:
raw_agent_log = node_execution.execution_metadata_dict.get(WorkflowNodeExecutionMetadataKey.AGENT_LOG)
if not raw_agent_log:
return []
parsed: Sequence[AgentLogEvent] = json.loads(raw_agent_log)
def dict_of_event(event: AgentLogEvent) -> dict:
return {
"status": event.status,
"error": event.error,
"data": event.data,
}
return [dict_of_event(event) for event in parsed]
last_run_dict = {
"inputs": last_run.inputs_dict,
"status": last_run.status,
"error": last_run.error,
"agent_log": agent_log_of(last_run),
}
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=last_run_dict,
current=current,
error_message=last_run.error,
instruction=instruction,
node_type=last_run.node_type,
ideal_output=ideal_output,
)
@staticmethod
def __instruction_modify_common(
tenant_id: str,
model_config: dict,
last_run: dict | None,
current: str | None,
error_message: str | None,
instruction: str,
node_type: str,
ideal_output: str | None,
) -> dict:
LAST_RUN = "{{#last_run#}}"
CURRENT = "{{#current#}}"
ERROR_MESSAGE = "{{#error_message#}}"
injected_instruction = instruction
if LAST_RUN in injected_instruction:
injected_instruction = injected_instruction.replace(LAST_RUN, json.dumps(last_run))
if CURRENT in injected_instruction:
injected_instruction = injected_instruction.replace(CURRENT, current or "null")
if ERROR_MESSAGE in injected_instruction:
injected_instruction = injected_instruction.replace(ERROR_MESSAGE, error_message or "null")
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
provider=model_config.get("provider", ""),
model=model_config.get("name", ""),
)
match node_type:
case "llm", "agent":
system_prompt = LLM_MODIFY_PROMPT_SYSTEM
case "code":
system_prompt = LLM_MODIFY_CODE_SYSTEM
case _:
system_prompt = LLM_MODIFY_PROMPT_SYSTEM
prompt_messages = [
SystemPromptMessage(content=system_prompt),
UserPromptMessage(
content=json.dumps(
{
"current": current,
"last_run": last_run,
"instruction": injected_instruction,
"ideal_output": ideal_output,
}
)
),
]
model_parameters = {"temperature": 0.4}
try:
response = cast(
LLMResult,
model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
),
)
generated_raw = cast(str, response.message.content)
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
return {**json.loads(generated_raw[first_brace : last_brace + 1])}
except InvokeError as e:
error = str(e)
return {"error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logging.exception("Failed to invoke LLM model, model: " + json.dumps(model_config.get("name")), exc_info=e)
return {"error": f"An unexpected error occurred: {str(e)}"}

View File
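The tail of __instruction_modify_common relies on brace-slicing to pull a JSON object out of free-form model output. Isolated as a helper (the name is hypothetical), the technique looks like this:

import json


def extract_json_object(raw: str) -> dict:
    # Take everything between the first "{" and the last "}" and parse it,
    # tolerating chatter the model may emit around the JSON body.
    first, last = raw.find("{"), raw.rfind("}")
    if first == -1 or last <= first:
        raise ValueError("no JSON object found in model output")
    return json.loads(raw[first : last + 1])


print(extract_json_object('Sure! {"modified": "...", "message": "done"} Hope that helps.'))

json.loads still raises if the braces enclose invalid JSON, which the original code surfaces through its generic exception handler.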

@ -309,116 +309,3 @@ eg:
Here is the JSON schema:
{{schema}}
""" # noqa: E501
LLM_MODIFY_PROMPT_SYSTEM = """
Both your input and output should be in JSON format.
! Below is the schema for input content !
{
"type": "object",
"description": "The user is trying to process some content with a prompt, but the output is not as expected. They hope to achieve their goal by modifying the prompt.",
"properties": {
"current": {
"type": "string",
"description": "The prompt before modification, where placeholders {{}} will be replaced with actual values for the large language model. The content in the placeholders should not be changed."
},
"last_run": {
"type": "object",
"description": "The output result from the large language model after receiving the prompt.",
},
"instruction": {
"type": "string",
"description": "User's instruction to edit the current prompt"
},
"ideal_output": {
"type": "string",
"description": "The ideal output that the user expects from the large language model after modifying the prompt. You should compare the last output with the ideal output and make changes to the prompt to achieve the goal."
}
}
}
! Above is the schema for input content !
! Below is the schema for output content !
{
"type": "object",
"description": "Your feedback to the user after they provide modification suggestions.",
"properties": {
"modified": {
"type": "string",
"description": "Your modified prompt. You should change the original prompt as little as possible to achieve the goal. Keep the language of prompt if not asked to change"
},
"message": {
"type": "string",
"description": "Your feedback to the user, in the user's language, explaining what you did and your thought process in text, providing sufficient emotional value to the user."
}
},
"required": [
"modified",
"message"
]
}
! Above is the schema for output content !
Your output must strictly follow the schema format, do not output any content outside of the JSON body.
""" # noqa: E501
LLM_MODIFY_CODE_SYSTEM = """
Both your input and output should be in JSON format.
! Below is the schema for input content !
{
"type": "object",
"description": "The user is trying to process some data with a code snippet, but the result is not as expected. They hope to achieve their goal by modifying the code.",
"properties": {
"current": {
"type": "string",
"description": "The code before modification."
},
"last_run": {
"type": "object",
"description": "The result of the code.",
},
"message": {
"type": "string",
"description": "User's instruction to edit the current code"
}
}
}
! Above is the schema for input content !
! Below is the schema for output content !
{
"type": "object",
"description": "Your feedback to the user after they provide modification suggestions.",
"properties": {
"modified": {
"type": "string",
"description": "Your modified code. You should change the original code as little as possible to achieve the goal. Keep the programming language of code if not asked to change"
},
"message": {
"type": "string",
"description": "Your feedback to the user, in the user's language, explaining what you did and your thought process in text, providing sufficient emotional value to the user."
}
},
"required": [
"modified",
"message"
]
}
! Above is the schema for output content !
When you are modifying the code, you should remember:
- Do not use print; it does not work in the Dify sandbox.
- Do not try dangerous calls like deleting files. It's PROHIBITED.
- Do not use any library that is not built into Python.
- Get inputs from the parameters of the function and have explicit type annotations.
- Write proper imports at the top of the code.
- Use return statement to return the result.
- You should return a `dict`. If you need to return a `result: str`, you should `return {"result": result}`.
Your output must strictly follow the schema format, do not output any content outside of the JSON body.
""" # noqa: E501
INSTRUCTION_GENERATE_TEMPLATE_PROMPT = """The output of this prompt is not as expected: {{#last_run#}}.
You should edit the prompt according to the IDEAL OUTPUT."""
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""

View File

@ -10,6 +10,8 @@ from core.mcp.types import (
from models.tools import MCPToolProvider
from services.tools.mcp_tools_manage_service import MCPToolManageService
LATEST_PROTOCOL_VERSION = "1.0"
class OAuthClientProvider:
mcp_provider: MCPToolProvider

View File

@ -7,7 +7,6 @@ from typing import Any, TypeAlias, final
from urllib.parse import urljoin, urlparse
import httpx
from httpx_sse import EventSource, ServerSentEvent
from sseclient import SSEClient
from core.mcp import types
@ -38,6 +37,11 @@ WriteQueue: TypeAlias = queue.Queue[SessionMessage | Exception | None]
StatusQueue: TypeAlias = queue.Queue[_StatusReady | _StatusError]
def remove_request_params(url: str) -> str:
"""Remove request parameters from URL, keeping only the path."""
return urljoin(url, urlparse(url).path)
class SSETransport:
"""SSE client transport implementation."""
@ -110,7 +114,7 @@ class SSETransport:
logger.exception("Error parsing server message")
read_queue.put(exc)
def _handle_sse_event(self, sse: ServerSentEvent, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
def _handle_sse_event(self, sse, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
"""Handle a single SSE event.
Args:
@ -126,7 +130,7 @@ class SSETransport:
case _:
logger.warning("Unknown SSE event: %s", sse.event)
def sse_reader(self, event_source: EventSource, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
def sse_reader(self, event_source, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
"""Read and process SSE events.
Args:
@ -221,7 +225,7 @@ class SSETransport:
self,
executor: ThreadPoolExecutor,
client: httpx.Client,
event_source: EventSource,
event_source,
) -> tuple[ReadQueue, WriteQueue]:
"""Establish connection and start worker threads.
@ -323,7 +327,7 @@ def send_message(http_client: httpx.Client, endpoint_url: str, session_message:
)
response.raise_for_status()
logger.debug("Client message sent successfully: %s", response.status_code)
except Exception:
except Exception as exc:
logger.exception("Error sending message")
raise

View File

@ -55,10 +55,14 @@ DEFAULT_QUEUE_READ_TIMEOUT = 3
class StreamableHTTPError(Exception):
"""Base exception for StreamableHTTP transport errors."""
pass
class ResumptionError(StreamableHTTPError):
"""Raised when resumption request is invalid."""
pass
@dataclass
class RequestContext:
@ -70,7 +74,7 @@ class RequestContext:
session_message: SessionMessage
metadata: ClientMessageMetadata | None
server_to_client_queue: ServerToClientQueue # Renamed for clarity
sse_read_timeout: float
sse_read_timeout: timedelta
class StreamableHTTPTransport:
@ -80,8 +84,8 @@ class StreamableHTTPTransport:
self,
url: str,
headers: dict[str, Any] | None = None,
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
timeout: timedelta = timedelta(seconds=30),
sse_read_timeout: timedelta = timedelta(seconds=60 * 5),
) -> None:
"""Initialize the StreamableHTTP transport.
@ -93,10 +97,8 @@ class StreamableHTTPTransport:
"""
self.url = url
self.headers = headers or {}
self.timeout = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout
self.sse_read_timeout = (
sse_read_timeout.total_seconds() if isinstance(sse_read_timeout, timedelta) else sse_read_timeout
)
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
self.session_id: str | None = None
self.request_headers = {
ACCEPT: f"{JSON}, {SSE}",
@ -184,7 +186,7 @@ class StreamableHTTPTransport:
with ssrf_proxy_sse_connect(
self.url,
headers=headers,
timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout),
timeout=httpx.Timeout(self.timeout.seconds, read=self.sse_read_timeout.seconds),
client=client,
method="GET",
) as event_source:
@ -213,7 +215,7 @@ class StreamableHTTPTransport:
with ssrf_proxy_sse_connect(
self.url,
headers=headers,
timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout),
timeout=httpx.Timeout(self.timeout.seconds, read=ctx.sse_read_timeout.seconds),
client=ctx.client,
method="GET",
) as event_source:
@ -400,8 +402,8 @@ class StreamableHTTPTransport:
def streamablehttp_client(
url: str,
headers: dict[str, Any] | None = None,
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
timeout: timedelta = timedelta(seconds=30),
sse_read_timeout: timedelta = timedelta(seconds=60 * 5),
terminate_on_close: bool = True,
) -> Generator[
tuple[
@ -434,7 +436,7 @@ def streamablehttp_client(
try:
with create_ssrf_proxy_mcp_http_client(
headers=transport.request_headers,
timeout=httpx.Timeout(transport.timeout, read=transport.sse_read_timeout),
timeout=httpx.Timeout(transport.timeout.seconds, read=transport.sse_read_timeout.seconds),
) as client:
# Define callbacks that need access to thread pool
def start_get_stream() -> None:

View File
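One behavioral detail worth noting in this hunk: the timedelta-only variant reads .seconds, which is only the seconds component (0-86399), while the float-or-timedelta variant normalizes with .total_seconds(). A sketch of the normalization, assuming httpx is available:

from datetime import timedelta

import httpx


def to_seconds(value: float | timedelta) -> float:
    # .total_seconds() is the safe conversion; .seconds would silently drop
    # whole days (timedelta(days=1).seconds == 0).
    return value.total_seconds() if isinstance(value, timedelta) else float(value)


timeout = httpx.Timeout(to_seconds(30), read=to_seconds(timedelta(minutes=5)))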

@ -23,18 +23,12 @@ class MCPClient:
authed: bool = True,
authorization_code: Optional[str] = None,
for_list: bool = False,
headers: Optional[dict[str, str]] = None,
timeout: Optional[float] = None,
sse_read_timeout: Optional[float] = None,
):
# Initialize info
self.provider_id = provider_id
self.tenant_id = tenant_id
self.client_type = "streamable"
self.server_url = server_url
self.headers = headers or {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
# Authentication info
self.authed = authed
@ -49,7 +43,7 @@ class MCPClient:
self._session: Optional[ClientSession] = None
self._streams_context: Optional[AbstractContextManager[Any]] = None
self._session_context: Optional[ClientSession] = None
self._exit_stack = ExitStack()
self.exit_stack = ExitStack()
# Whether the client has been initialized
self._initialized = False
@ -96,26 +90,21 @@ class MCPClient:
headers = (
{"Authorization": f"{self.token.token_type.capitalize()} {self.token.access_token}"}
if self.authed and self.token
else self.headers
)
self._streams_context = client_factory(
url=self.server_url,
headers=headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
else {}
)
self._streams_context = client_factory(url=self.server_url, headers=headers)
if not self._streams_context:
raise MCPConnectionError("Failed to create connection context")
# Use exit_stack to manage context managers properly
if method_name == "mcp":
read_stream, write_stream, _ = self._exit_stack.enter_context(self._streams_context)
read_stream, write_stream, _ = self.exit_stack.enter_context(self._streams_context)
streams = (read_stream, write_stream)
else: # sse_client
streams = self._exit_stack.enter_context(self._streams_context)
streams = self.exit_stack.enter_context(self._streams_context)
self._session_context = ClientSession(*streams)
self._session = self._exit_stack.enter_context(self._session_context)
self._session = self.exit_stack.enter_context(self._session_context)
session = cast(ClientSession, self._session)
session.initialize()
return
@ -131,6 +120,9 @@ class MCPClient:
if first_try:
return self.connect_server(client_factory, method_name, first_try=False)
except MCPConnectionError:
raise
def list_tools(self) -> list[Tool]:
"""Connect to an MCP server running with SSE transport"""
# List available tools to verify connection
@ -150,7 +142,7 @@ class MCPClient:
"""Clean up resources"""
try:
# ExitStack will handle proper cleanup of all managed context managers
self._exit_stack.close()
self.exit_stack.close()
except Exception as e:
logging.exception("Error during cleanup")
raise ValueError(f"Error during cleanup: {e}")

View File
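Both sides of this hunk lean on contextlib.ExitStack to manage the transport and session contexts imperatively; only the attribute's visibility changes. A self-contained illustration of why ExitStack fits here: contexts are entered one by one and torn down in reverse order with a single close(), as cleanup() does above:

from contextlib import ExitStack, contextmanager


@contextmanager
def resource(name: str):
    print(f"open {name}")
    try:
        yield name
    finally:
        print(f"close {name}")


stack = ExitStack()
transport = stack.enter_context(resource("transport"))
session = stack.enter_context(resource("session"))
stack.close()  # prints "close session" then "close transport"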

@ -16,14 +16,13 @@ from extensions.ext_database import db
from models.model import App, AppMCPServer, AppMode, EndUser
from services.app_generate_service import AppGenerateService
"""
Applies to the MCP HTTP streamable server with stateless HTTP.
"""
logger = logging.getLogger(__name__)
class MCPServerStreamableHTTPRequestHandler:
"""
Applies to the MCP HTTP streamable server with stateless HTTP.
"""
def __init__(
self, app: App, request: types.ClientRequest | types.ClientNotification, user_input_form: list[VariableEntity]
):

View File

@ -2,6 +2,7 @@ import logging
import queue
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
from contextlib import ExitStack
from datetime import timedelta
from types import TracebackType
from typing import Any, Generic, Self, TypeVar
@ -169,6 +170,7 @@ class BaseSession(
self._receive_notification_type = receive_notification_type
self._session_read_timeout_seconds = read_timeout_seconds
self._in_flight = {}
self._exit_stack = ExitStack()
# Initialize executor and future to None for proper cleanup checks
self._executor: ThreadPoolExecutor | None = None
self._receiver_future: Future | None = None
@ -375,7 +377,7 @@ class BaseSession(
self._handle_incoming(RuntimeError(f"Server Error: {message}"))
except queue.Empty:
continue
except Exception:
except Exception as e:
logging.exception("Error in message processing loop")
raise
@ -387,12 +389,14 @@ class BaseSession(
If the request is responded to within this method, it will not be
forwarded on to the message stream.
"""
pass
def _received_notification(self, notification: ReceiveNotificationT) -> None:
"""
Can be overridden by subclasses to handle a notification without needing
to listen on the message stream.
"""
pass
def send_progress_notification(
self, progress_token: str | int, progress: float, total: float | None = None
@ -401,9 +405,11 @@ class BaseSession(
Sends a progress notification for a request that is currently being
processed.
"""
pass
def _handle_incoming(
self,
req: RequestResponder[ReceiveRequestT, SendResultT] | ReceiveNotificationT | Exception,
) -> None:
"""A generic handler for incoming messages. Overwritten by subclasses."""
pass

View File

@ -1,4 +1,3 @@
import queue
from datetime import timedelta
from typing import Any, Protocol
@ -86,8 +85,8 @@ class ClientSession(
):
def __init__(
self,
read_stream: queue.Queue,
write_stream: queue.Queue,
read_stream,
write_stream,
read_timeout_seconds: timedelta | None = None,
sampling_callback: SamplingFnT | None = None,
list_roots_callback: ListRootsFnT | None = None,

View File

@ -1,10 +1,6 @@
import json
from collections.abc import Generator
from contextlib import AbstractContextManager
import httpx
import httpx_sse
from httpx_sse import connect_sse
from configs import dify_config
from core.mcp.types import ErrorData, JSONRPCError
@ -59,42 +55,20 @@ def create_ssrf_proxy_mcp_http_client(
)
def ssrf_proxy_sse_connect(url: str, **kwargs) -> AbstractContextManager[httpx_sse.EventSource]:
def ssrf_proxy_sse_connect(url, **kwargs):
"""Connect to SSE endpoint with SSRF proxy protection.
This function creates an SSE connection using the configured proxy settings
to prevent SSRF attacks when connecting to external endpoints. It returns
a context manager that yields an EventSource object for SSE streaming.
The function handles HTTP client creation and cleanup automatically, but
also accepts a pre-configured client via kwargs.
to prevent SSRF attacks when connecting to external endpoints.
Args:
url (str): The SSE endpoint URL to connect to
**kwargs: Additional arguments passed to the SSE connection, including:
- client (httpx.Client, optional): Pre-configured HTTP client.
If not provided, one will be created with SSRF protection.
- method (str, optional): HTTP method to use, defaults to "GET"
- headers (dict, optional): HTTP headers to include in the request
- timeout (httpx.Timeout, optional): Timeout configuration for the connection
url: The SSE endpoint URL
**kwargs: Additional arguments passed to the SSE connection
Returns:
AbstractContextManager[httpx_sse.EventSource]: A context manager that yields an EventSource
object for SSE streaming. The EventSource provides access to server-sent events.
Example:
```python
with ssrf_proxy_sse_connect(url, headers=headers) as event_source:
for sse in event_source.iter_sse():
print(sse.event, sse.data)
```
Note:
If a client is not provided in kwargs, one will be automatically created
with SSRF protection based on the application's configuration. If an
exception occurs during connection, any automatically created client
will be cleaned up automatically.
EventSource object for SSE streaming
"""
from httpx_sse import connect_sse
# Extract client if provided, otherwise create one
client = kwargs.pop("client", None)
@ -127,9 +101,7 @@ def ssrf_proxy_sse_connect(url: str, **kwargs) -> AbstractContextManager[httpx_s
raise
def create_mcp_error_response(
request_id: int | str | None, code: int, message: str, data=None
) -> Generator[bytes, None, None]:
def create_mcp_error_response(request_id: int | str | None, code: int, message: str, data=None):
"""Create MCP error response"""
error_data = ErrorData(code=code, message=message, data=data)
json_response = JSONRPCError(

View File

@ -99,13 +99,13 @@ class TokenBufferMemory:
prompt_messages.append(UserPromptMessage(content=message.query))
else:
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
prompt_message_contents.append(TextPromptMessageContent(data=message.query))
for file in file_objs:
prompt_message = file_manager.to_prompt_message_content(
file,
image_detail_config=detail,
)
prompt_message_contents.append(prompt_message)
prompt_message_contents.append(TextPromptMessageContent(data=message.query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))

View File

@ -257,6 +257,11 @@ class ModelProviderFactory:
# scan all providers
plugin_model_provider_entities = self.get_plugin_model_providers()
# convert provider_configs to dict
provider_credentials_dict = {}
for provider_config in provider_configs:
provider_credentials_dict[provider_config.provider] = provider_config.credentials
# traverse all model_provider_extensions
providers = []
for plugin_model_provider_entity in plugin_model_provider_entities:

View File
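The credentials dict built in this hunk can also be written as a comprehension with identical behavior. ProviderConfig below is a stand-in for the real config type:

from dataclasses import dataclass, field


@dataclass
class ProviderConfig:  # stand-in for illustration only
    provider: str
    credentials: dict = field(default_factory=dict)


provider_configs = [ProviderConfig("openai", {"api_key": "sk-..."})]

provider_credentials_dict = {pc.provider: pc.credentials for pc in provider_configs}
assert provider_credentials_dict["openai"]["api_key"] == "sk-..."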

@ -68,7 +68,7 @@ class CommonValidator:
if credential_form_schema.max_length:
if len(value) > credential_form_schema.max_length:
raise ValueError(
f"Variable {credential_form_schema.variable} length should not be"
f"Variable {credential_form_schema.variable} length should not"
f" greater than {credential_form_schema.max_length}"
)

View File

@ -151,9 +151,12 @@ def jsonable_encoder(
return format(obj, "f")
if isinstance(obj, dict):
encoded_dict = {}
allowed_keys = set(obj.keys())
for key, value in obj.items():
if (not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa"))) and (
value is not None or not exclude_none
if (
(not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa")))
and (value is not None or not exclude_none)
and key in allowed_keys
):
encoded_key = jsonable_encoder(
key,

View File
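Stripped of recursion, the dict branch applies three filters: skip SQLAlchemy-internal keys (prefix "_sa"), optionally skip None values, and, in the right-hand variant, keep only keys in allowed_keys — a guard that is a no-op for an unmodified dict, since it is seeded from the same dict being iterated. A flat sketch of the filtering logic:

from typing import Any


def filter_dict(obj: dict, *, exclude_none: bool = True, sqlalchemy_safe: bool = True) -> dict[Any, Any]:
    out: dict[Any, Any] = {}
    for key, value in obj.items():
        if sqlalchemy_safe and isinstance(key, str) and key.startswith("_sa"):
            continue  # skip SQLAlchemy instance-state keys
        if value is None and exclude_none:
            continue
        out[key] = value
    return out


assert filter_dict({"_sa_instance_state": 1, "name": "dify", "gone": None}) == {"name": "dify"}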

@ -0,0 +1,10 @@
import pydantic
from pydantic import BaseModel
def dump_model(model: BaseModel) -> dict:
if hasattr(pydantic, "model_dump"):
# FIXME mypy error, try to fix it instead of using type: ignore
return pydantic.model_dump(model) # type: ignore
else:
return model.model_dump()

View File
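One caveat on this new helper, offered as an observation rather than a fix: hasattr(pydantic, "model_dump") probes the pydantic module, which to my knowledge exposes no such attribute in either v1 or v2, so the else branch always runs. The conventional v1/v2 shim probes the model instance:

from pydantic import BaseModel


class Example(BaseModel):
    name: str


def dump_model_compat(model: BaseModel) -> dict:
    # Pydantic v2 instances have model_dump(); v1 instances have dict().
    if hasattr(model, "model_dump"):
        return model.model_dump()
    return model.dict()  # pydantic v1 fallback


assert dump_model_compat(Example(name="dify")) == {"name": "dify"}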

@ -4,15 +4,15 @@ from collections.abc import Sequence
from typing import Optional
from urllib.parse import urljoin
from opentelemetry.trace import Link, Status, StatusCode
from opentelemetry.trace import Status, StatusCode
from sqlalchemy.orm import Session, sessionmaker
from core.ops.aliyun_trace.data_exporter.traceclient import (
TraceClient,
convert_datetime_to_nanoseconds,
convert_string_to_id,
convert_to_span_id,
convert_to_trace_id,
create_link,
generate_span_id,
)
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
@ -103,11 +103,10 @@ class AliyunDataTrace(BaseTraceInstance):
def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = convert_to_trace_id(trace_info.workflow_run_id)
links = []
if trace_info.trace_id:
links.append(create_link(trace_id_str=trace_info.trace_id))
trace_id = convert_string_to_id(trace_info.trace_id)
workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
self.add_workflow_span(trace_id, workflow_span_id, trace_info, links)
self.add_workflow_span(trace_id, workflow_span_id, trace_info)
workflow_node_executions = self.get_workflow_node_executions(trace_info)
for node_execution in workflow_node_executions:
@ -133,9 +132,8 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
links.append(create_link(trace_id_str=trace_info.trace_id))
trace_id = convert_string_to_id(trace_info.trace_id)
message_span_id = convert_to_span_id(message_id, "message")
message_span = SpanData(
@ -154,7 +152,6 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: str(trace_info.outputs),
},
status=status,
links=links,
)
self.trace_client.add_span(message_span)
@ -195,9 +192,8 @@ class AliyunDataTrace(BaseTraceInstance):
message_id = trace_info.message_id
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
links.append(create_link(trace_id_str=trace_info.trace_id))
trace_id = convert_string_to_id(trace_info.trace_id)
documents_data = extract_retrieval_documents(trace_info.documents)
dataset_retrieval_span = SpanData(
@ -215,7 +211,6 @@ class AliyunDataTrace(BaseTraceInstance):
INPUT_VALUE: str(trace_info.inputs),
OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
},
links=links,
)
self.trace_client.add_span(dataset_retrieval_span)
@ -229,9 +224,8 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
links.append(create_link(trace_id_str=trace_info.trace_id))
trace_id = convert_string_to_id(trace_info.trace_id)
tool_span = SpanData(
trace_id=trace_id,
@ -250,7 +244,6 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: str(trace_info.tool_outputs),
},
status=status,
links=links,
)
self.trace_client.add_span(tool_span)
@ -420,9 +413,7 @@ class AliyunDataTrace(BaseTraceInstance):
status=self.get_workflow_node_status(node_execution),
)
def add_workflow_span(
self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, links: Sequence[Link]
):
def add_workflow_span(self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo):
message_span_id = None
if trace_info.message_id:
message_span_id = convert_to_span_id(trace_info.message_id, "message")
@ -447,7 +438,6 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
},
status=status,
links=links,
)
self.trace_client.add_span(message_span)
@ -466,7 +456,6 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
},
status=status,
links=links,
)
self.trace_client.add_span(workflow_span)
@ -477,9 +466,8 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
links = []
if trace_info.trace_id:
links.append(create_link(trace_id_str=trace_info.trace_id))
trace_id = convert_string_to_id(trace_info.trace_id)
suggested_question_span = SpanData(
trace_id=trace_id,
@ -499,7 +487,6 @@ class AliyunDataTrace(BaseTraceInstance):
OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
},
status=status,
links=links,
)
self.trace_client.add_span(suggested_question_span)

View File

@ -16,7 +16,6 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import Link, SpanContext, TraceFlags
from configs import dify_config
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
@ -167,16 +166,6 @@ class SpanBuilder:
return span
def create_link(trace_id_str: str) -> Link:
placeholder_span_id = 0x0000000000000000
trace_id = int(trace_id_str, 16)
span_context = SpanContext(
trace_id=trace_id, span_id=placeholder_span_id, is_remote=False, trace_flags=TraceFlags(TraceFlags.SAMPLED)
)
return Link(span_context)
def generate_span_id() -> int:
span_id = random.getrandbits(64)
while span_id == INVALID_SPAN_ID:

View File

@ -523,7 +523,7 @@ class ProviderManager:
# Init trial provider records if not exists
if ProviderQuotaType.TRIAL not in provider_quota_to_provider_record_dict:
try:
# FIXME ignore the type error, only TrialHostingQuota has limit need to change the logic
# FIXME ignore the type error; only TrialHostingQuota has a limit, need to change the logic
new_provider_record = Provider(
tenant_id=tenant_id,
# TODO: Use provider name with prefix after the data migration.

View File

@ -1,7 +1,7 @@
import json
from collections import defaultdict
from typing import Any, Optional
import orjson
from pydantic import BaseModel
from configs import dify_config
@ -134,13 +134,13 @@ class Jieba(BaseKeyword):
dataset_keyword_table = self.dataset.dataset_keyword_table
keyword_data_source_type = dataset_keyword_table.data_source_type
if keyword_data_source_type == "database":
dataset_keyword_table.keyword_table = dumps_with_sets(keyword_table_dict)
dataset_keyword_table.keyword_table = json.dumps(keyword_table_dict, cls=SetEncoder)
db.session.commit()
else:
file_key = "keyword_files/" + self.dataset.tenant_id + "/" + self.dataset.id + ".txt"
if storage.exists(file_key):
storage.delete(file_key)
storage.save(file_key, dumps_with_sets(keyword_table_dict).encode("utf-8"))
storage.save(file_key, json.dumps(keyword_table_dict, cls=SetEncoder).encode("utf-8"))
def _get_dataset_keyword_table(self) -> Optional[dict]:
dataset_keyword_table = self.dataset.dataset_keyword_table
@ -156,11 +156,12 @@ class Jieba(BaseKeyword):
data_source_type=keyword_data_source_type,
)
if keyword_data_source_type == "database":
dataset_keyword_table.keyword_table = dumps_with_sets(
dataset_keyword_table.keyword_table = json.dumps(
{
"__type__": "keyword_table",
"__data__": {"index_id": self.dataset.id, "summary": None, "table": {}},
}
},
cls=SetEncoder,
)
db.session.add(dataset_keyword_table)
db.session.commit()
@ -251,13 +252,8 @@ class Jieba(BaseKeyword):
self._save_dataset_keyword_table(keyword_table)
def set_orjson_default(obj: Any) -> Any:
"""Default function for orjson serialization of set types"""
if isinstance(obj, set):
return list(obj)
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def dumps_with_sets(obj: Any) -> str:
"""JSON dumps with set support using orjson"""
return orjson.dumps(obj, default=set_orjson_default).decode("utf-8")
class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return super().default(obj)

View File
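The keyword table maps keywords to sets of segment ids, and sets are not JSON-serializable, hence the two encoder strategies in this hunk. Both reduce sets to lists; a runnable sketch of the stdlib path:

import json
from typing import Any


class SetEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if isinstance(obj, set):
            return sorted(obj)  # sorted only to keep this demo deterministic
        return super().default(obj)


table = {"__type__": "keyword_table", "table": {"dify": {"doc-2", "doc-1"}}}
print(json.dumps(table, cls=SetEncoder))
# {"__type__": "keyword_table", "table": {"dify": ["doc-1", "doc-2"]}}

The orjson variant on the other side of the hunk achieves the same with a default= callback and is typically faster.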

@ -4,8 +4,8 @@ import math
from typing import Any
from pydantic import BaseModel, model_validator
from pyobvector import VECTOR, FtsIndexParam, FtsParser, ObVecClient, l2_distance # type: ignore
from sqlalchemy import JSON, Column, String
from pyobvector import VECTOR, ObVecClient # type: ignore
from sqlalchemy import JSON, Column, String, func
from sqlalchemy.dialects.mysql import LONGTEXT
from configs import dify_config
@ -119,21 +119,14 @@ class OceanBaseVector(BaseVector):
)
try:
if self._hybrid_search_enabled:
self._client.create_fts_idx_with_fts_index_param(
table_name=self._collection_name,
fts_idx_param=FtsIndexParam(
index_name="fulltext_index_for_col_text",
field_names=["text"],
parser_type=FtsParser.IK,
),
)
self._client.perform_raw_text_sql(f"""ALTER TABLE {self._collection_name}
ADD FULLTEXT INDEX fulltext_index_for_col_text (text) WITH PARSER ik""")
except Exception as e:
raise Exception(
"Failed to add fulltext index to the target table, your OceanBase version must be 4.3.5.1 or above "
+ "to support fulltext index and vector index in the same table",
e,
)
self._client.refresh_metadata([self._collection_name])
redis_client.set(collection_exist_cache_key, 1, ex=3600)
def _check_hybrid_search_support(self) -> bool:
@ -259,7 +252,7 @@ class OceanBaseVector(BaseVector):
vec_column_name="vector",
vec_data=query_vector,
topk=topk,
distance_func=l2_distance,
distance_func=func.l2_distance,
output_column_names=["text", "metadata"],
with_dist=True,
where_clause=_where_clause,

View File

@ -109,19 +109,8 @@ class OracleVector(BaseVector):
)
def _get_connection(self) -> Connection:
if self.config.is_autonomous:
connection = oracledb.connect(
user=self.config.user,
password=self.config.password,
dsn=self.config.dsn,
config_dir=self.config.config_dir,
wallet_location=self.config.wallet_location,
wallet_password=self.config.wallet_password,
)
return connection
else:
connection = oracledb.connect(user=self.config.user, password=self.config.password, dsn=self.config.dsn)
return connection
connection = oracledb.connect(user=self.config.user, password=self.config.password, dsn=self.config.dsn)
return connection
def _create_connection_pool(self, config: OracleVectorConfig):
pool_params = {

View File

@ -331,12 +331,6 @@ class QdrantVector(BaseVector):
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
from qdrant_client.http import models
score_threshold = float(kwargs.get("score_threshold") or 0.0)
if score_threshold >= 1:
# return empty list because some versions of qdrant may response with 400 bad request
# and at the same time, the score_threshold with value 1 may be valid for other vector stores
return []
filter = models.Filter(
must=[
models.FieldCondition(
@ -361,7 +355,7 @@ class QdrantVector(BaseVector):
limit=kwargs.get("top_k", 4),
with_payload=True,
with_vectors=True,
score_threshold=score_threshold,
score_threshold=float(kwargs.get("score_threshold") or 0.0),
)
docs = []
for result in results:
@ -369,6 +363,7 @@ class QdrantVector(BaseVector):
continue
metadata = result.payload.get(Field.METADATA_KEY.value) or {}
# duplicate check score threshold
score_threshold = float(kwargs.get("score_threshold") or 0.0)
if result.score > score_threshold:
metadata["score"] = result.score
doc = Document(

View File
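The functional differences in this hunk are where score_threshold is computed and the removed >= 1 guard (some Qdrant versions reject a threshold of 1.0 with HTTP 400, so the caller short-circuits with an empty result). The per-result check itself is simple; a sketch with a stand-in hit type:

from dataclasses import dataclass


@dataclass
class Hit:  # stand-in for a qdrant_client scored point
    score: float
    payload: dict


def filter_hits(hits: list[Hit], score_threshold: float | None) -> list[Hit]:
    threshold = float(score_threshold or 0.0)  # computed once, not per hit
    return [h for h in hits if h.score > threshold]


assert [h.score for h in filter_hits([Hit(0.9, {}), Hit(0.3, {})], 0.5)] == [0.9]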

@ -5,14 +5,10 @@ This package contains concrete implementations of the repository interfaces
defined in the core.workflow.repository package.
"""
from core.repositories.celery_workflow_execution_repository import CeleryWorkflowExecutionRepository
from core.repositories.celery_workflow_node_execution_repository import CeleryWorkflowNodeExecutionRepository
from core.repositories.factory import DifyCoreRepositoryFactory, RepositoryImportError
from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
__all__ = [
"CeleryWorkflowExecutionRepository",
"CeleryWorkflowNodeExecutionRepository",
"DifyCoreRepositoryFactory",
"RepositoryImportError",
"SQLAlchemyWorkflowNodeExecutionRepository",

View File

@ -1,126 +0,0 @@
"""
Celery-based implementation of the WorkflowExecutionRepository.
This implementation uses Celery tasks for asynchronous storage operations,
providing improved performance by offloading database operations to background workers.
"""
import logging
from typing import Optional, Union
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from libs.helper import extract_tenant_id
from models import Account, CreatorUserRole, EndUser
from models.enums import WorkflowRunTriggeredFrom
from tasks.workflow_execution_tasks import (
save_workflow_execution_task,
)
logger = logging.getLogger(__name__)
class CeleryWorkflowExecutionRepository(WorkflowExecutionRepository):
"""
Celery-based implementation of the WorkflowExecutionRepository interface.
This implementation provides asynchronous storage capabilities by using Celery tasks
to handle database operations in background workers. This improves performance by
reducing the blocking time for workflow execution storage operations.
Key features:
- Asynchronous save operations using Celery tasks
- Support for multi-tenancy through tenant/app filtering
- Automatic retry and error handling through Celery
"""
_session_factory: sessionmaker
_tenant_id: str
_app_id: Optional[str]
_triggered_from: Optional[WorkflowRunTriggeredFrom]
_creator_user_id: str
_creator_user_role: CreatorUserRole
def __init__(
self,
session_factory: sessionmaker | Engine,
user: Union[Account, EndUser],
app_id: Optional[str],
triggered_from: Optional[WorkflowRunTriggeredFrom],
):
"""
Initialize the repository with Celery task configuration and context information.
Args:
session_factory: SQLAlchemy sessionmaker or engine for fallback operations
user: Account or EndUser object containing tenant_id, user ID, and role information
app_id: App ID for filtering by application (can be None)
triggered_from: Source of the execution trigger (DEBUGGING or APP_RUN)
"""
# Store session factory for fallback operations
if isinstance(session_factory, Engine):
self._session_factory = sessionmaker(bind=session_factory, expire_on_commit=False)
elif isinstance(session_factory, sessionmaker):
self._session_factory = session_factory
else:
raise ValueError(
f"Invalid session_factory type {type(session_factory).__name__}; expected sessionmaker or Engine"
)
# Extract tenant_id from user
tenant_id = extract_tenant_id(user)
if not tenant_id:
raise ValueError("User must have a tenant_id or current_tenant_id")
self._tenant_id = tenant_id # type: ignore[assignment] # We've already checked tenant_id is not None
# Store app context
self._app_id = app_id
# Extract user context
self._triggered_from = triggered_from
self._creator_user_id = user.id
# Determine user role based on user type
self._creator_user_role = CreatorUserRole.ACCOUNT if isinstance(user, Account) else CreatorUserRole.END_USER
logger.info(
"Initialized CeleryWorkflowExecutionRepository for tenant %s, app %s, triggered_from %s",
self._tenant_id,
self._app_id,
self._triggered_from,
)
def save(self, execution: WorkflowExecution) -> None:
"""
Save or update a WorkflowExecution instance asynchronously using Celery.
This method queues the save operation as a Celery task and returns immediately,
providing improved performance for high-throughput scenarios.
Args:
execution: The WorkflowExecution instance to save or update
"""
try:
# Serialize execution for Celery task
execution_data = execution.model_dump()
# Queue the save operation as a Celery task (fire and forget)
save_workflow_execution_task.delay(
execution_data=execution_data,
tenant_id=self._tenant_id,
app_id=self._app_id or "",
triggered_from=self._triggered_from.value if self._triggered_from else "",
creator_user_id=self._creator_user_id,
creator_user_role=self._creator_user_role.value,
)
logger.debug("Queued async save for workflow execution: %s", execution.id_)
except Exception as e:
logger.exception("Failed to queue save operation for execution %s", execution.id_)
# In case of Celery failure, we could implement a fallback to synchronous save
# For now, we'll re-raise the exception
raise
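The pattern this deleted repository implements is a standard Celery fire-and-forget: serialize the domain object, enqueue, return. A minimal sketch under assumed names (the broker URL, task name, and persistence body are placeholders, not Dify's actual wiring):

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker


@app.task(name="sketch.save_workflow_execution")
def save_workflow_execution_sketch(execution_data: dict, tenant_id: str) -> None:
    ...  # a background worker would persist the row here


def save(execution_data: dict, tenant_id: str) -> None:
    # .delay() enqueues and returns immediately, so the request worker never
    # blocks on the database write; retries are delegated to Celery.
    save_workflow_execution_sketch.delay(execution_data=execution_data, tenant_id=tenant_id)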

Some files were not shown because too many files have changed in this diff.