Compare commits

..

179 Commits

Author SHA1 Message Date
08caa4fce3 Merge branch 'feat/summary-index' into deploy/dev 2026-01-19 15:35:41 +08:00
5293fbe8ba fix: hit testing chunk detail summary 2026-01-19 15:35:07 +08:00
ed555c5fe7 Merge branch 'feat/summary-index' into deploy/dev 2026-01-19 15:14:28 +08:00
22974ea6b0 fix: preview chunk summary 2026-01-19 15:13:51 +08:00
754b01366a Merge branch 'chore/relocate-datasets-api-form' into deploy/dev 2026-01-19 14:51:03 +08:00
8af626092e chore: relocate datasets api form 2026-01-19 14:50:01 +08:00
49b3bad26b locl 2026-01-19 11:50:26 +08:00
50616c25d4 Merge branch 'feat/storage-50' into deploy/dev 2026-01-19 11:49:16 +08:00
3b4b5b332c feat(billing): enhance usage info with storage threshold display
- Add storageThreshold, storageTooltip, storageTotalDisplay props to UsageInfo
- Implement indeterminate state in ProgressBar for usage below threshold
- Update VectorSpaceInfo to calculate total based on plan type
- Add i18n for storage threshold tooltip (en-US, ja-JP, zh-Hans)
2026-01-19 11:47:35 +08:00
62c3f14570 Merge branch 'main' into feat/summary-index 2026-01-19 10:21:40 +08:00
41c3b1c57c Merge branch 'feat/support-free-try-app' into deploy/dev 2026-01-18 12:58:58 +08:00
994357d8b5 merge 2026-01-17 09:46:38 +08:00
5fb9fe3c94 fix: fix summary index bug. (#31134) 2026-01-16 20:24:57 +08:00
4fb08ae7d2 fix: fix summary index bug. 2026-01-16 20:24:18 +08:00
7481762acb fix: fix summary index bug. (#31125) 2026-01-16 18:56:17 +08:00
fcb2fe55e7 fix: fix summary index bug. 2026-01-16 18:55:10 +08:00
yyh
a0aa8cdb45 Merge remote-tracking branch 'origin/main' into feature/task-quadrant-view 2026-01-16 18:20:29 +08:00
yyh
ae8618877b fix(web): quadrant matrix i18n 2026-01-16 18:17:28 +08:00
yyh
1c55602445 fix(web): add calendar icon and DDL label to deadline badge in task-item 2026-01-16 17:24:11 +08:00
yyh
a3f1220d23 feat(web): add fullscreen expand mode to quadrant-matrix component
- Add expand button in header to open FullScreenModal
- Add numbered circles (1-4) to quadrant headers
- Add expanded prop to show full content without line-clamp
- Reorder grid layout: Q1 top-left, Q2 top-right, Q3 bottom-left, Q4 bottom-right
- Remove axis labels for cleaner design
2026-01-16 17:16:13 +08:00
4d7384731e fix: call get_text_content on LLMResult
Signed-off-by: Stream <Stream_2@qq.com>
2026-01-16 17:08:39 +08:00
yyh
d62e16b9bb fix(web): improve quadrant-matrix layout and text overflow handling
- Simplify axis label layout with horizontal/vertical arrangement
- Add proper text truncation with line-clamp and tooltips
- Fix overflow issues by adding min-w-0 on flex children
- Move scores inline with task name for compact display
- Add task count badge to quadrant headers
- Reduce maxDisplay to 3 for better density
2026-01-16 16:58:57 +08:00
yyh
13f2a43ccc feat(web): add Eisenhower Matrix visualization component for task quadrants
Add a new quadrant-matrix component that renders tasks in a 2x2 grid based
on importance and urgency scores. Integrate with code-block as a new
'quadrant' language type for markdown rendering.
2026-01-16 16:58:56 +08:00
553dd3266b fix: call get_text_content on LLMResult
Signed-off-by: Stream <Stream_2@qq.com>
2026-01-16 16:46:28 +08:00
5b0590d58e Merge branch 'feat/summary-index' into deploy/dev 2026-01-16 10:56:12 +08:00
d97f2df85c Merge branch 'main' into feat/summary-index 2026-01-16 10:55:58 +08:00
d3c09f16a9 merge feat/summary-index 2026-01-16 10:55:18 +08:00
fde8efa4a2 fix: summary index in parent child chunk 2026-01-16 10:49:38 +08:00
5f6d1297b0 fix: fix summary index bug. (#31058) 2026-01-15 18:10:46 +08:00
869e70964f fix: fix summary index bug. 2026-01-15 18:09:48 +08:00
1f313eb15c fix: pipeline run panel summary 2026-01-15 18:03:09 +08:00
f02adc26e5 fix: pipeline run panel summary 2026-01-15 18:02:19 +08:00
73027eab0a fix: fix summary index bug. (#31057) 2026-01-15 17:58:04 +08:00
74245fea8e fix: fix summary index bug. 2026-01-15 17:57:15 +08:00
5bc4bba668 Merge branch 'feat/summary-index' into deploy/dev 2026-01-15 16:09:44 +08:00
1126a2aa95 merge main 2026-01-15 16:08:29 +08:00
2107a3c32c feat: knowledgebase summary index (#31047) 2026-01-15 16:07:17 +08:00
22d0c55363 fix: fix summary index bug. 2026-01-15 15:10:38 +08:00
7c3ce7b1e6 fix: summary index change in create document 2026-01-15 13:48:07 +08:00
f4d20a02aa feat: fix summary index bug. 2026-01-15 11:06:18 +08:00
7eb65b07c8 feat: Make summary index support vision, and make the code more standardized. 2026-01-14 17:52:27 +08:00
830a7fb034 Merge branch 'main' into feat/summary-index 2026-01-14 13:40:15 +08:00
9b7e807690 feat: summary index (#30950) 2026-01-14 11:26:44 +08:00
af86f8de6f Merge branch 'feat/knowledgebase-summaryIndex' into feat/summary-index 2026-01-14 11:25:15 +08:00
ec78676949 Merge branch 'deploy/dev' into feat/summary-index 2026-01-13 21:30:50 +08:00
01a7dbcee8 Merge branch 'main' into feat/summary-index 2026-01-13 16:29:09 +08:00
4fe8d2491e feat: summary index 2026-01-13 16:27:32 +08:00
76da8b4ff3 Merge remote-tracking branch 'origin/deploy/dev' 2026-01-12 17:09:25 +08:00
25bfc1cc3b feat: implement Summary Index feature. 2026-01-12 16:52:21 +08:00
5c2ae922bc merge main 2026-01-12 13:42:17 +08:00
a92df530da mrege main 2026-01-12 13:41:27 +08:00
13eec13a14 feat: summary index 2026-01-12 13:38:18 +08:00
431936beb9 chore: handle callback warning 2026-01-12 11:33:18 +08:00
163540bf4a chore: handle refetch after created 2026-01-12 11:30:03 +08:00
221130b448 chore: remove old i18n 2026-01-12 10:55:02 +08:00
b1eb265fa5 fix: try app not call conversations and sessions 2026-01-09 16:48:03 +08:00
c2a0950660 fix: button ui problem 2026-01-09 15:34:48 +08:00
bfe98009fd chore: fix dataset problems 2026-01-09 14:26:18 +08:00
ea1704d211 fix: try basic detail errors 2026-01-09 14:14:15 +08:00
3ed0937734 merge 2026-01-08 18:27:47 +08:00
1fcf6e4943 Update 2025_12_16_1817-03ea244985ce_add_type_column_not_null_default_tool.py 2025-12-17 11:12:59 +08:00
f4a7efde3d update migration script. 2025-12-16 18:30:12 +08:00
38d4f0fd96 Merge remote-tracking branch 'origin/deploy/dev' 2025-12-16 18:25:54 +08:00
ec4f885dad update migration script. 2025-12-16 18:19:24 +08:00
3781c2a025 [autofix.ci] apply automated fixes 2025-12-16 08:37:32 +00:00
3782f17dc7 Optimize code. 2025-12-16 16:35:15 +08:00
29698aeed2 Merge remote-tracking branch 'origin/deploy/dev' 2025-12-16 16:26:19 +08:00
15ff8efb15 merge alembic head 2025-12-16 16:20:04 +08:00
407e1c8276 [autofix.ci] apply automated fixes 2025-12-16 08:14:05 +00:00
e368825c21 Merge remote-tracking branch 'upstream/main' 2025-12-16 15:50:49 +08:00
8dad6b6a6d Add "type" field to PipelineRecommendedPlugin model; Add query param "type" to recommended-plugins api. 2025-12-16 14:34:59 +08:00
2f54965a72 Add "type" field to PipelineRecommendedPlugin model; Add query param "type" to recommended-plugins api. 2025-12-16 10:43:45 +08:00
a1a3fa0283 Add "type" field to PipelineRecommendedPlugin model; Add query param "type" to recommended-plugins api. 2025-12-15 16:44:32 +08:00
ff7344f3d3 Add "type" field to PipelineRecommendedPlugin model; Add query param "type" to recommended-plugins api. 2025-12-15 16:38:44 +08:00
bcd33be22a Add "type" field to PipelineRecommendedPlugin model; Add query param "type" to recommended-plugins api. 2025-12-15 16:33:06 +08:00
0fb339ca4f fix: saved message 2025-11-18 11:38:12 +08:00
c1871e67aa chore: hide disabed action in try app 2025-11-18 11:28:13 +08:00
f711f9a317 fix: webapp url 2025-11-18 11:22:58 +08:00
9ff3310cb6 chore: handle suggestion readonly 2025-11-18 11:07:01 +08:00
b6bdcc7052 fix: not auther tool in readonly mode 2025-11-18 11:02:46 +08:00
67b0771081 fix: try app not ok in chat 2025-11-17 18:21:43 +08:00
9a07488da9 mrege 2025-11-17 15:42:56 +08:00
ef043c6906 fix: no app not show problem 2025-11-06 14:53:11 +08:00
ab814e3eac fix: inputs overwrite by curr item 2025-10-27 14:08:32 +08:00
a0e1eeb3f1 chore: reset form 2025-10-27 13:57:16 +08:00
b1ebeb67a7 feat: support new chat 2025-10-27 13:50:36 +08:00
082179f70f fix: try chat has not set converstaion 2025-10-27 13:38:41 +08:00
8786ebdbca feat: support use tempalte in create app 2025-10-27 10:58:57 +08:00
b49a4eab62 feat: add app list context 2025-10-24 18:33:54 +08:00
0a7b59f500 feat: add tool requirements to flow 2025-10-24 17:49:29 +08:00
c264d9152f chore: add advanced models 2025-10-24 17:42:38 +08:00
3bf9d898c0 feat: basic app requirements 2025-10-24 17:29:42 +08:00
a7f2849e74 fix: try chatbot ui 2025-10-24 16:22:01 +08:00
0957ece92f fix: the try app always use the curent conversation 2025-10-24 15:57:33 +08:00
949bf38d3c fix: chat setup ui 2025-10-24 15:30:53 +08:00
7bafb7f959 feat: chat info 2025-10-24 14:54:06 +08:00
9735f55ca4 feat: try app alert and i18n 2025-10-24 14:00:24 +08:00
4c1f9b949b feat: alert info and lodash to lodash-es 2025-10-24 11:24:19 +08:00
0af0c94dde fix: preview not full 2025-10-24 10:52:05 +08:00
8e4f0640cc fix: variable readonly in basic app problem 2025-10-24 10:41:18 +08:00
1f513e3b43 chore: remove debug code 2025-10-23 18:26:38 +08:00
aa0841e2a8 chore: 18n 2025-10-23 18:05:34 +08:00
b6a1562357 fix: handle create can not show 2025-10-23 17:54:45 +08:00
bee0797401 feat: create from try app 2025-10-23 17:45:54 +08:00
e085f39c13 chore: description and category 2025-10-23 17:29:32 +08:00
344844d3e0 chore: handle data is large 2025-10-23 16:53:10 +08:00
6e9f82491d chore: reuse the app detail and right meta 2025-10-23 15:51:59 +08:00
372b1c3db8 chore: change detail icon 2025-10-23 15:28:12 +08:00
58d305dbed chore: tab header jp 2025-10-23 15:25:25 +08:00
0360a0416b feat: integration preview page 2025-10-23 15:23:50 +08:00
72282b6e8f feat: try app layout 2025-10-23 14:58:17 +08:00
8391884c4e chore: tab and close btn 2025-10-23 14:45:08 +08:00
b018f2b0a0 feat: can show app detail modal 2025-10-23 14:17:43 +08:00
ab56b4a818 merge main 2025-10-23 11:12:13 +08:00
61ebc756aa feat: workflow preview 2025-10-16 17:38:13 +08:00
4bea38042a feat: text completion form preview 2025-10-16 14:03:30 +08:00
337abc536b fix: update responsive breakpoint and adjust divider visibility in banner component 2025-10-16 13:47:38 +08:00
cc02b78aca feat: different app preview 2025-10-16 11:27:58 +08:00
18f2d24f8e chore: preview input field readonly 2025-10-16 10:42:47 +08:00
0c7b9a462f chore: tools preview readonly 2025-10-16 10:36:36 +08:00
4dd5580854 chore: preview two cols in panel 2025-10-15 18:16:57 +08:00
440bd825d8 feat: can show tools in preview 2025-10-15 17:35:59 +08:00
d2379c38bd chore: handle history panel and completion review crash 2025-10-15 17:35:59 +08:00
cbc55c577b Merge branch 'feat/support-free-try-app' of github.com:langgenius/dify into feat/support-free-try-app 2025-10-15 17:20:20 +08:00
8e962d15d1 feat: improve explore page banner component with enhanced layout and responsive styles 2025-10-15 17:20:00 +08:00
b07c766551 chroe: fix ts problem 2025-10-15 16:00:14 +08:00
9e3dd69277 fix: upload btn not sync right 2025-10-15 15:51:18 +08:00
db9e5665c2 fix: docuemnt and aduio show condition in preview 2025-10-15 15:35:49 +08:00
cad77ce0bf chore: audio config readonly 2025-10-15 15:29:09 +08:00
6f4518ebf7 chore: document readonly 2025-10-15 15:27:18 +08:00
a8f5748dee chore: vision readonly 2025-10-15 15:21:23 +08:00
738d3001be chore: chat input and feature readonly 2025-10-15 15:21:22 +08:00
df4e32aaa0 Merge branch 'feat/support-free-try-app' of github.com:langgenius/dify into feat/support-free-try-app 2025-10-15 14:36:47 +08:00
a25e37a96d feat: implement responsive design and resize handling for explore page banner 2025-10-15 14:36:27 +08:00
f156b46705 chore: user input readonly 2025-10-15 13:48:39 +08:00
3b64e118d0 chore: readonly ui 2025-10-15 11:39:41 +08:00
566cd20849 feat: dataset config support readonly 2025-10-15 11:37:12 +08:00
df76527f29 feat: add pause functionality to explore page banner for improved user interaction 2025-10-15 10:36:09 +08:00
53a80a5dbe feat: enhance explore page banner functionality with state management and animation improvements 2025-10-15 09:55:14 +08:00
1507792a0c Merge branch 'feat/support-free-try-app' of github.com:langgenius/dify into feat/support-free-try-app 2025-10-14 18:54:11 +08:00
00b9bbff75 feat: enhance explore page banner functionality with state management and animation improvements 2025-10-14 18:53:29 +08:00
e1f8b4b387 feat: support show dataset in knowledge 2025-10-14 18:31:42 +08:00
1539d86f7d chore: instruction and vars to readonly 2025-10-14 17:28:49 +08:00
67bb14d3ee chore: update dependencies and improve explore page banner 2025-10-14 15:51:07 +08:00
5653309080 feat: add carousel & new banner of explore page 2025-10-14 15:41:22 +08:00
0f52b34b61 feat: try apps basic app preveiw 2025-10-14 15:38:22 +08:00
75e35857c1 feat: add carousel & new banner of explore page 2025-10-14 14:17:49 +08:00
4f81be70e3 feat: no apps 2025-10-13 18:31:57 +08:00
1d4d627d05 feat: toogle sidebar 2025-10-13 17:36:24 +08:00
2357234f39 chore: sidebar ui 2025-10-13 17:11:51 +08:00
a3f7d8f996 chore: merge main 2025-10-13 16:38:29 +08:00
56f12e70c1 chore: web apps copywritings 2025-10-13 16:18:57 +08:00
b14afda160 chore: app gallary nav 2025-10-13 15:40:13 +08:00
44b4948972 chore: explore card ui and permission 2025-10-13 15:07:25 +08:00
487eac3b91 chore: add banner permission 2025-10-13 11:27:50 +08:00
84b2913cd9 feat: filter title 2025-10-13 11:12:10 +08:00
176d810c8d chore: update category ui 2025-10-13 10:55:49 +08:00
9e66564526 feat: banner placeholder 2025-10-11 15:07:03 +08:00
781a9a56cd feat: explore title change 2025-10-11 14:58:54 +08:00
93be1219eb chore: try app title 2025-10-11 11:00:26 +08:00
3276d6429d chore: handle completion acion 2025-10-11 10:53:24 +08:00
50072a63ae feat: support try agent app 2025-10-11 10:42:55 +08:00
1ab7e1cba8 fix: try chatflow run url problem 2025-10-11 10:11:14 +08:00
b0aef35c63 feat: try chat flow app 2025-10-10 18:24:56 +08:00
ac351b700c chore: some ui 2025-10-10 16:51:49 +08:00
d1e5d30ea9 fix: text generation api url 2025-10-10 16:39:42 +08:00
c73e84d992 feat: can show text completion run result pages 2025-10-10 16:34:10 +08:00
5f0bd5119a chore: temp 2025-09-24 13:39:52 +08:00
8353352bda chore: try app can use web app run 2025-09-22 15:17:11 +08:00
73845cbec5 feat: text generation 2025-09-19 16:32:11 +08:00
c2f94e9e8a feat: api call the try app and support disable feedback 2025-09-19 11:32:30 +08:00
e54efda36f feat: try app page 2025-09-18 14:54:15 +08:00
d4bd19f6d8 fix: api login detect problems 2025-09-17 17:15:23 +08:00
4decbbbf18 chore: remove useless api 2025-09-17 14:34:59 +08:00
b15867f92e chore: feedback api 2025-09-17 14:12:34 +08:00
a5e5fbc6e0 chore: some api change to new 2025-09-17 14:10:56 +08:00
1b1471b6d8 fix: stop response api 2025-09-17 14:07:15 +08:00
5280bffde2 feat: change api to new 2025-09-17 11:17:12 +08:00
db0fc94b39 chore: change api to support try apps 2025-09-16 18:21:23 +08:00
642 changed files with 15767 additions and 35394 deletions

View File

@ -716,13 +716,3 @@ SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
# Sandbox Dify CLI configuration
# Directory containing dify CLI binaries (dify-cli-<os>-<arch>). Defaults to api/bin when unset.
SANDBOX_DIFY_CLI_ROOT=
# CLI API URL for sandbox (dify-sandbox or e2b) to call back to Dify API.
# This URL must be accessible from the sandbox environment.
# For local development: use http://localhost:5001 or http://127.0.0.1:5001
# For Docker deployment: use http://api:5001 (internal Docker network)
# For external sandbox (e.g., e2b): use a publicly accessible URL
CLI_API_URL=http://localhost:5001

View File

@ -71,8 +71,6 @@ def create_app() -> DifyApp:
def initialize_extensions(app: DifyApp):
# Initialize Flask context capture for workflow execution
from context.flask_app_context import init_flask_context
from extensions import (
ext_app_metrics,
ext_blueprints,
@ -102,8 +100,6 @@ def initialize_extensions(app: DifyApp):
ext_warnings,
)
init_flask_context()
extensions = [
ext_timezone,
ext_logging,

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -23,7 +23,7 @@ from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.index_processor.constant.built_in_field import BuiltInField
from core.rag.models.document import Document
from core.tools.utils.system_encryption import encrypt_system_params
from core.tools.utils.system_oauth_encryption import encrypt_system_oauth_params
from events.app_event import app_was_created
from extensions.ext_database import db
from extensions.ext_redis import redis_client
@ -862,27 +862,8 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
@click.command("clean-workflow-runs", help="Clean expired workflow runs and related data for free tenants.")
@click.option(
"--before-days",
"--days",
default=30,
show_default=True,
type=click.IntRange(min=0),
help="Delete workflow runs created before N days ago.",
)
@click.option("--days", default=30, show_default=True, help="Delete workflow runs created before N days ago.")
@click.option("--batch-size", default=200, show_default=True, help="Batch size for selecting workflow runs.")
@click.option(
"--from-days-ago",
default=None,
type=click.IntRange(min=0),
help="Lower bound in days ago (older). Must be paired with --to-days-ago.",
)
@click.option(
"--to-days-ago",
default=None,
type=click.IntRange(min=0),
help="Upper bound in days ago (newer). Must be paired with --from-days-ago.",
)
@click.option(
"--start-from",
type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
@ -901,10 +882,8 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
help="Preview cleanup results without deleting any workflow run data.",
)
def clean_workflow_runs(
before_days: int,
days: int,
batch_size: int,
from_days_ago: int | None,
to_days_ago: int | None,
start_from: datetime.datetime | None,
end_before: datetime.datetime | None,
dry_run: bool,
@ -915,24 +894,11 @@ def clean_workflow_runs(
if (start_from is None) ^ (end_before is None):
raise click.UsageError("--start-from and --end-before must be provided together.")
if (from_days_ago is None) ^ (to_days_ago is None):
raise click.UsageError("--from-days-ago and --to-days-ago must be provided together.")
if from_days_ago is not None and to_days_ago is not None:
if start_from or end_before:
raise click.UsageError("Choose either day offsets or explicit dates, not both.")
if from_days_ago <= to_days_ago:
raise click.UsageError("--from-days-ago must be greater than --to-days-ago.")
now = datetime.datetime.now()
start_from = now - datetime.timedelta(days=from_days_ago)
end_before = now - datetime.timedelta(days=to_days_ago)
before_days = 0
start_time = datetime.datetime.now(datetime.UTC)
click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
WorkflowRunCleanup(
days=before_days,
days=days,
batch_size=batch_size,
start_from=start_from,
end_before=end_before,
@ -1245,7 +1211,7 @@ def remove_orphaned_files_on_storage(force: bool):
click.echo(click.style(f"- Scanning files on storage path {storage_path}", fg="white"))
files = storage.scan(path=storage_path, files=True, directories=False)
all_files_on_storage.extend(files)
except FileNotFoundError:
except FileNotFoundError as e:
click.echo(click.style(f" -> Skipping path {storage_path} as it does not exist.", fg="yellow"))
continue
except Exception as e:
@ -1493,60 +1459,6 @@ def file_usage(
click.echo(click.style(f"Use --offset {offset + limit} to see next page", fg="white"))
@click.command("setup-sandbox-system-config", help="Setup system-level sandbox provider configuration.")
@click.option(
"--provider-type", prompt=True, type=click.Choice(["e2b", "docker", "local"]), help="Sandbox provider type"
)
@click.option("--config", prompt=True, help='Configuration JSON (e.g., {"api_key": "xxx"} for e2b)')
def setup_sandbox_system_config(provider_type: str, config: str):
"""
Setup system-level sandbox provider configuration.
Examples:
flask setup-sandbox-system-config --provider-type e2b --config '{"api_key": "e2b_xxx"}'
flask setup-sandbox-system-config --provider-type docker --config '{"docker_sock": "unix:///var/run/docker.sock"}'
flask setup-sandbox-system-config --provider-type local --config '{}'
"""
from models.sandbox import SandboxProviderSystemConfig
from services.sandbox.sandbox_provider_service import PROVIDER_CONFIG_MODELS
try:
click.echo(click.style(f"Validating config: {config}", fg="yellow"))
config_dict = TypeAdapter(dict[str, Any]).validate_json(config)
click.echo(click.style("Config validated successfully.", fg="green"))
click.echo(click.style(f"Validating config schema for provider type: {provider_type}", fg="yellow"))
model_class = PROVIDER_CONFIG_MODELS.get(provider_type)
if model_class:
model_class.model_validate(config_dict)
click.echo(click.style("Config schema validated successfully.", fg="green"))
click.echo(click.style("Encrypting config...", fg="yellow"))
click.echo(click.style(f"Using SECRET_KEY: `{dify_config.SECRET_KEY}`", fg="yellow"))
encrypted_config = encrypt_system_params(config_dict)
click.echo(click.style("Config encrypted successfully.", fg="green"))
except Exception as e:
click.echo(click.style(f"Error validating/encrypting config: {str(e)}", fg="red"))
return
deleted_count = db.session.query(SandboxProviderSystemConfig).filter_by(provider_type=provider_type).delete()
if deleted_count > 0:
click.echo(
click.style(
f"Deleted {deleted_count} existing system config for provider type: {provider_type}", fg="yellow"
)
)
system_config = SandboxProviderSystemConfig(
provider_type=provider_type,
encrypted_config=encrypted_config,
)
db.session.add(system_config)
db.session.commit()
click.echo(click.style(f"Sandbox system config setup successfully. id: {system_config.id}", fg="green"))
click.echo(click.style(f"Provider type: {provider_type}", fg="green"))
@click.command("setup-system-tool-oauth-client", help="Setup system tool oauth client.")
@click.option("--provider", prompt=True, help="Provider name")
@click.option("--client-params", prompt=True, help="Client Params")
@ -1566,7 +1478,7 @@ def setup_system_tool_oauth_client(provider, client_params):
click.echo(click.style(f"Encrypting client params: {client_params}", fg="yellow"))
click.echo(click.style(f"Using SECRET_KEY: `{dify_config.SECRET_KEY}`", fg="yellow"))
oauth_client_params = encrypt_system_params(client_params_dict)
oauth_client_params = encrypt_system_oauth_params(client_params_dict)
click.echo(click.style("Client params encrypted successfully.", fg="green"))
except Exception as e:
click.echo(click.style(f"Error parsing client params: {str(e)}", fg="red"))
@ -1615,7 +1527,7 @@ def setup_system_trigger_oauth_client(provider, client_params):
click.echo(click.style(f"Encrypting client params: {client_params}", fg="yellow"))
click.echo(click.style(f"Using SECRET_KEY: `{dify_config.SECRET_KEY}`", fg="yellow"))
oauth_client_params = encrypt_system_params(client_params_dict)
oauth_client_params = encrypt_system_oauth_params(client_params_dict)
click.echo(click.style("Client params encrypted successfully.", fg="green"))
except Exception as e:
click.echo(click.style(f"Error parsing client params: {str(e)}", fg="red"))

View File

@ -2,7 +2,6 @@ import logging
from pathlib import Path
from typing import Any
from pydantic import Field
from pydantic.fields import FieldInfo
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, TomlConfigSettingsSource
@ -83,14 +82,6 @@ class DifyConfig(
extra="ignore",
)
SANDBOX_DIFY_CLI_ROOT: str | None = Field(
default=None,
description=(
"Filesystem directory containing dify CLI binaries named dify-cli-<os>-<arch>. "
"Defaults to api/bin when unset."
),
)
# Before adding any config,
# please consider to arrange it in the proper config group of existed or added
# for better readability and maintainability.

View File

@ -244,17 +244,6 @@ class PluginConfig(BaseSettings):
)
class CliApiConfig(BaseSettings):
"""
Configuration for CLI API (for dify-cli to call back from external sandbox environments)
"""
CLI_API_URL: str = Field(
description="CLI API URL for external sandbox (e.g., e2b) to call back.",
default="http://localhost:5001",
)
class MarketplaceConfig(BaseSettings):
"""
Configuration for marketplace
@ -1320,7 +1309,6 @@ class FeatureConfig(
TriggerConfig,
AsyncWorkflowConfig,
PluginConfig,
CliApiConfig,
MarketplaceConfig,
DataSetConfig,
EndpointConfig,

View File

@ -1,74 +0,0 @@
"""
Core Context - Framework-agnostic context management.
This module provides context management that is independent of any specific
web framework. Framework-specific implementations register their context
capture functions at application initialization time.
This ensures the workflow layer remains completely decoupled from Flask
or any other web framework.
"""
import contextvars
from collections.abc import Callable
from core.workflow.context.execution_context import (
ExecutionContext,
IExecutionContext,
NullAppContext,
)
# Global capturer function - set by framework-specific modules
_capturer: Callable[[], IExecutionContext] | None = None
def register_context_capturer(capturer: Callable[[], IExecutionContext]) -> None:
"""
Register a context capture function.
This should be called by framework-specific modules (e.g., Flask)
during application initialization.
Args:
capturer: Function that captures current context and returns IExecutionContext
"""
global _capturer
_capturer = capturer
def capture_current_context() -> IExecutionContext:
"""
Capture current execution context.
This function uses the registered context capturer. If no capturer
is registered, it returns a minimal context with only contextvars
(suitable for non-framework environments like tests or standalone scripts).
Returns:
IExecutionContext with captured context
"""
if _capturer is None:
# No framework registered - return minimal context
return ExecutionContext(
app_context=NullAppContext(),
context_vars=contextvars.copy_context(),
)
return _capturer()
def reset_context_provider() -> None:
"""
Reset the context capturer.
This is primarily useful for testing to ensure a clean state.
"""
global _capturer
_capturer = None
__all__ = [
"capture_current_context",
"register_context_capturer",
"reset_context_provider",
]

View File

@ -1,198 +0,0 @@
"""
Flask App Context - Flask implementation of AppContext interface.
"""
import contextvars
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any, final
from flask import Flask, current_app, g
from context import register_context_capturer
from core.workflow.context.execution_context import (
AppContext,
IExecutionContext,
)
@final
class FlaskAppContext(AppContext):
"""
Flask implementation of AppContext.
This adapts Flask's app context to the AppContext interface.
"""
def __init__(self, flask_app: Flask) -> None:
"""
Initialize Flask app context.
Args:
flask_app: The Flask application instance
"""
self._flask_app = flask_app
def get_config(self, key: str, default: Any = None) -> Any:
"""Get configuration value from Flask app config."""
return self._flask_app.config.get(key, default)
def get_extension(self, name: str) -> Any:
"""Get Flask extension by name."""
return self._flask_app.extensions.get(name)
@contextmanager
def enter(self) -> Generator[None, None, None]:
"""Enter Flask app context."""
with self._flask_app.app_context():
yield
@property
def flask_app(self) -> Flask:
"""Get the underlying Flask app instance."""
return self._flask_app
def capture_flask_context(user: Any = None) -> IExecutionContext:
"""
Capture current Flask execution context.
This function captures the Flask app context and contextvars from the
current environment. It should be called from within a Flask request or
app context.
Args:
user: Optional user object to include in context
Returns:
IExecutionContext with captured Flask context
Raises:
RuntimeError: If called outside Flask context
"""
# Get Flask app instance
flask_app = current_app._get_current_object() # type: ignore
# Save current user if available
saved_user = user
if saved_user is None:
# Check for user in g (flask-login)
if hasattr(g, "_login_user"):
saved_user = g._login_user
# Capture contextvars
context_vars = contextvars.copy_context()
return FlaskExecutionContext(
flask_app=flask_app,
context_vars=context_vars,
user=saved_user,
)
@final
class FlaskExecutionContext:
"""
Flask-specific execution context.
This is a specialized version of ExecutionContext that includes Flask app
context. It provides the same interface as ExecutionContext but with
Flask-specific implementation.
"""
def __init__(
self,
flask_app: Flask,
context_vars: contextvars.Context,
user: Any = None,
) -> None:
"""
Initialize Flask execution context.
Args:
flask_app: Flask application instance
context_vars: Python contextvars
user: Optional user object
"""
self._app_context = FlaskAppContext(flask_app)
self._context_vars = context_vars
self._user = user
self._flask_app = flask_app
@property
def app_context(self) -> FlaskAppContext:
"""Get Flask app context."""
return self._app_context
@property
def context_vars(self) -> contextvars.Context:
"""Get context variables."""
return self._context_vars
@property
def user(self) -> Any:
"""Get user object."""
return self._user
def __enter__(self) -> "FlaskExecutionContext":
"""Enter the Flask execution context."""
# Restore context variables
for var, val in self._context_vars.items():
var.set(val)
# Save current user from g if available
saved_user = None
if hasattr(g, "_login_user"):
saved_user = g._login_user
# Enter Flask app context
self._cm = self._app_context.enter()
self._cm.__enter__()
# Restore user in new app context
if saved_user is not None:
g._login_user = saved_user
return self
def __exit__(self, *args: Any) -> None:
"""Exit the Flask execution context."""
if hasattr(self, "_cm"):
self._cm.__exit__(*args)
@contextmanager
def enter(self) -> Generator[None, None, None]:
"""Enter Flask execution context as context manager."""
# Restore context variables
for var, val in self._context_vars.items():
var.set(val)
# Save current user from g if available
saved_user = None
if hasattr(g, "_login_user"):
saved_user = g._login_user
# Enter Flask app context
with self._flask_app.app_context():
# Restore user in new app context
if saved_user is not None:
g._login_user = saved_user
yield
def init_flask_context() -> None:
"""
Initialize Flask context capture by registering the capturer.
This function should be called during Flask application initialization
to register the Flask-specific context capturer with the core context module.
Example:
app = Flask(__name__)
init_flask_context() # Register Flask context capturer
Note:
This function does not need the app instance as it uses Flask's
`current_app` to get the app when capturing context.
"""
register_context_capturer(capture_flask_context)

View File

@ -1,27 +0,0 @@
from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi
bp = Blueprint("cli_api", __name__, url_prefix="/cli/api")
api = ExternalApi(
bp,
version="1.0",
title="CLI API",
description="APIs for Dify CLI to call back from external sandbox environments (e.g., e2b)",
)
# Create namespace
cli_api_ns = Namespace("cli_api", description="CLI API operations", path="/")
from .plugin import plugin as _plugin
api.add_namespace(cli_api_ns)
__all__ = [
"_plugin",
"api",
"bp",
"cli_api_ns",
]

View File

@ -1,137 +0,0 @@
from flask_restx import Resource
from controllers.cli_api import cli_api_ns
from controllers.cli_api.plugin.wraps import get_user_tenant, plugin_data
from controllers.cli_api.wraps import cli_api_only
from controllers.console.wraps import setup_required
from core.file.helpers import get_signed_file_url_for_plugin
from core.plugin.backwards_invocation.app import PluginAppBackwardsInvocation
from core.plugin.backwards_invocation.base import BaseBackwardsInvocationResponse
from core.plugin.backwards_invocation.model import PluginModelBackwardsInvocation
from core.plugin.backwards_invocation.tool import PluginToolBackwardsInvocation
from core.plugin.entities.request import (
RequestInvokeApp,
RequestInvokeLLM,
RequestInvokeTool,
RequestRequestUploadFile,
)
from core.tools.entities.tool_entities import ToolProviderType
from libs.helper import length_prefixed_response
from models import Account, Tenant
from models.model import EndUser
@cli_api_ns.route("/invoke/llm")
class CliInvokeLLMApi(Resource):
@get_user_tenant
@setup_required
@cli_api_only
@plugin_data(payload_type=RequestInvokeLLM)
def post(self, user_model: Account | EndUser, tenant_model: Tenant, payload: RequestInvokeLLM):
def generator():
response = PluginModelBackwardsInvocation.invoke_llm(user_model.id, tenant_model, payload)
return PluginModelBackwardsInvocation.convert_to_event_stream(response)
return length_prefixed_response(0xF, generator())
@cli_api_ns.route("/invoke/tool")
class CliInvokeToolApi(Resource):
@get_user_tenant
@setup_required
@cli_api_only
@plugin_data(payload_type=RequestInvokeTool)
def post(self, user_model: Account | EndUser, tenant_model: Tenant, payload: RequestInvokeTool):
def generator():
return PluginToolBackwardsInvocation.convert_to_event_stream(
PluginToolBackwardsInvocation.invoke_tool(
tenant_id=tenant_model.id,
user_id=user_model.id,
tool_type=ToolProviderType.value_of(payload.tool_type),
provider=payload.provider,
tool_name=payload.tool,
tool_parameters=payload.tool_parameters,
credential_id=payload.credential_id,
),
)
return length_prefixed_response(0xF, generator())
@cli_api_ns.route("/invoke/app")
class CliInvokeAppApi(Resource):
@get_user_tenant
@setup_required
@cli_api_only
@plugin_data(payload_type=RequestInvokeApp)
def post(self, user_model: Account | EndUser, tenant_model: Tenant, payload: RequestInvokeApp):
response = PluginAppBackwardsInvocation.invoke_app(
app_id=payload.app_id,
user_id=user_model.id,
tenant_id=tenant_model.id,
conversation_id=payload.conversation_id,
query=payload.query,
stream=payload.response_mode == "streaming",
inputs=payload.inputs,
files=payload.files,
)
return length_prefixed_response(0xF, PluginAppBackwardsInvocation.convert_to_event_stream(response))
@cli_api_ns.route("/upload/file/request")
class CliUploadFileRequestApi(Resource):
@get_user_tenant
@setup_required
@cli_api_only
@plugin_data(payload_type=RequestRequestUploadFile)
def post(self, user_model: Account | EndUser, tenant_model: Tenant, payload: RequestRequestUploadFile):
# generate signed url
url = get_signed_file_url_for_plugin(
filename=payload.filename,
mimetype=payload.mimetype,
tenant_id=tenant_model.id,
user_id=user_model.id,
)
return BaseBackwardsInvocationResponse(data={"url": url}).model_dump()
@cli_api_ns.route("/fetch/tools/list")
class CliFetchToolsListApi(Resource):
@get_user_tenant
@setup_required
@cli_api_only
def post(self, user_model: Account | EndUser, tenant_model: Tenant):
from sqlalchemy.orm import Session
from extensions.ext_database import db
from services.tools.api_tools_manage_service import ApiToolManageService
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
from services.tools.mcp_tools_manage_service import MCPToolManageService
from services.tools.workflow_tools_manage_service import WorkflowToolManageService
providers = []
# Get builtin tools
builtin_providers = BuiltinToolManageService.list_builtin_tools(user_model.id, tenant_model.id)
for provider in builtin_providers:
providers.append(provider.to_dict())
# Get API tools
api_providers = ApiToolManageService.list_api_tools(tenant_model.id)
for provider in api_providers:
providers.append(provider.to_dict())
# Get workflow tools
workflow_providers = WorkflowToolManageService.list_tenant_workflow_tools(user_model.id, tenant_model.id)
for provider in workflow_providers:
providers.append(provider.to_dict())
# Get MCP tools
with Session(db.engine) as session:
mcp_service = MCPToolManageService(session)
mcp_providers = mcp_service.list_providers(tenant_id=tenant_model.id, for_list=True)
for provider in mcp_providers:
providers.append(provider.to_dict())
return BaseBackwardsInvocationResponse(data={"providers": providers}).model_dump()

View File

@ -1,145 +0,0 @@
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import current_app, request
from flask_login import user_logged_in
from pydantic import BaseModel
from sqlalchemy.orm import Session
from core.session.cli_api import CliApiSession, CliApiSessionManager
from extensions.ext_database import db
from libs.login import current_user
from models.account import Tenant
from models.model import DefaultEndUserSessionID, EndUser
P = ParamSpec("P")
R = TypeVar("R")
class TenantUserPayload(BaseModel):
tenant_id: str
user_id: str
def get_user(tenant_id: str, user_id: str | None) -> EndUser:
"""
Get current user
NOTE: user_id is not trusted, it could be maliciously set to any value.
As a result, it could only be considered as an end user id.
"""
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
is_anonymous = user_id == DefaultEndUserSessionID.DEFAULT_SESSION_ID
try:
with Session(db.engine) as session:
user_model = None
if is_anonymous:
user_model = (
session.query(EndUser)
.where(
EndUser.session_id == user_id,
EndUser.tenant_id == tenant_id,
)
.first()
)
else:
user_model = (
session.query(EndUser)
.where(
EndUser.id == user_id,
EndUser.tenant_id == tenant_id,
)
.first()
)
if not user_model:
user_model = EndUser(
tenant_id=tenant_id,
type="service_api",
is_anonymous=is_anonymous,
session_id=user_id,
)
session.add(user_model)
session.commit()
session.refresh(user_model)
except Exception:
raise ValueError("user not found")
return user_model
def get_user_tenant(view_func: Callable[P, R]):
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
session_id = request.headers.get("X-Cli-Api-Session-Id")
if session_id:
session: CliApiSession | None = CliApiSessionManager().get(session_id)
if not session:
raise ValueError("session not found")
user_id = session.user_id
tenant_id = session.tenant_id
else:
payload = TenantUserPayload.model_validate(request.get_json(silent=True) or {})
user_id = payload.user_id
tenant_id = payload.tenant_id
if not tenant_id:
raise ValueError("tenant_id is required")
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
try:
tenant_model = (
db.session.query(Tenant)
.where(
Tenant.id == tenant_id,
)
.first()
)
except Exception:
raise ValueError("tenant not found")
if not tenant_model:
raise ValueError("tenant not found")
kwargs["tenant_model"] = tenant_model
user = get_user(tenant_id, user_id)
kwargs["user_model"] = user
current_app.login_manager._update_request_context_with_user(user) # type: ignore
user_logged_in.send(current_app._get_current_object(), user=current_user) # type: ignore
return view_func(*args, **kwargs)
return decorated_view
def plugin_data(view: Callable[P, R] | None = None, *, payload_type: type[BaseModel]):
def decorator(view_func: Callable[P, R]):
def decorated_view(*args: P.args, **kwargs: P.kwargs):
try:
data = request.get_json()
except Exception:
raise ValueError("invalid json")
try:
payload = payload_type.model_validate(data)
except Exception as e:
raise ValueError(f"invalid payload: {str(e)}")
kwargs["payload"] = payload
return view_func(*args, **kwargs)
return decorated_view
if view is None:
return decorator
else:
return decorator(view)

View File

@ -1,54 +0,0 @@
import hashlib
import hmac
import time
from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar
from flask import abort, request
from core.session.cli_api import CliApiSessionManager
P = ParamSpec("P")
R = TypeVar("R")
SIGNATURE_TTL_SECONDS = 300
def _verify_signature(session_secret: str, timestamp: str, body: bytes, signature: str) -> bool:
expected = hmac.new(
session_secret.encode(),
f"{timestamp}.".encode() + body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
def cli_api_only(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
session_id = request.headers.get("X-Cli-Api-Session-Id")
timestamp = request.headers.get("X-Cli-Api-Timestamp")
signature = request.headers.get("X-Cli-Api-Signature")
if not session_id or not timestamp or not signature:
abort(401)
try:
ts = int(timestamp)
if abs(time.time() - ts) > SIGNATURE_TTL_SECONDS:
abort(401)
except ValueError:
abort(401)
session = CliApiSessionManager().get(session_id)
if not session:
abort(401)
body = request.get_data()
if not _verify_signature(session.secret, timestamp, body, signature):
abort(401)
return view(*args, **kwargs)
return decorated

View File

@ -50,7 +50,6 @@ from .app import (
agent,
annotation,
app,
app_asset,
audio,
completion,
conversation,
@ -127,7 +126,6 @@ from .workspace import (
model_providers,
models,
plugin,
sandbox_providers,
tool_providers,
trigger_providers,
workspace,
@ -146,7 +144,6 @@ __all__ = [
"api",
"apikey",
"app",
"app_asset",
"audio",
"billing",
"bp",
@ -194,7 +191,6 @@ __all__ = [
"rag_pipeline_import",
"rag_pipeline_workflow",
"recommended_app",
"sandbox_providers",
"saved_message",
"setup",
"site",

View File

@ -1,3 +1,4 @@
import re
import uuid
from datetime import datetime
from typing import Any, Literal, TypeAlias
@ -67,6 +68,48 @@ class AppListQuery(BaseModel):
raise ValueError("Invalid UUID format in tag_ids.") from exc
# XSS prevention: patterns that could lead to XSS attacks
# Includes: script tags, iframe tags, javascript: protocol, SVG with onload, etc.
_XSS_PATTERNS = [
r"<script[^>]*>.*?</script>", # Script tags
r"<iframe\b[^>]*?(?:/>|>.*?</iframe>)", # Iframe tags (including self-closing)
r"javascript:", # JavaScript protocol
r"<svg[^>]*?\s+onload\s*=[^>]*>", # SVG with onload handler (attribute-aware, flexible whitespace)
r"<.*?on\s*\w+\s*=", # Event handlers like onclick, onerror, etc.
r"<object\b[^>]*(?:\s*/>|>.*?</object\s*>)", # Object tags (opening tag)
r"<embed[^>]*>", # Embed tags (self-closing)
r"<link[^>]*>", # Link tags with javascript
]
def _validate_xss_safe(value: str | None, field_name: str = "Field") -> str | None:
"""
Validate that a string value doesn't contain potential XSS payloads.
Args:
value: The string value to validate
field_name: Name of the field for error messages
Returns:
The original value if safe
Raises:
ValueError: If the value contains XSS patterns
"""
if value is None:
return None
value_lower = value.lower()
for pattern in _XSS_PATTERNS:
if re.search(pattern, value_lower, re.DOTALL | re.IGNORECASE):
raise ValueError(
f"{field_name} contains invalid characters or patterns. "
"HTML tags, JavaScript, and other potentially dangerous content are not allowed."
)
return value
class CreateAppPayload(BaseModel):
name: str = Field(..., min_length=1, description="App name")
description: str | None = Field(default=None, description="App description (max 400 chars)", max_length=400)
@ -75,6 +118,11 @@ class CreateAppPayload(BaseModel):
icon: str | None = Field(default=None, description="Icon")
icon_background: str | None = Field(default=None, description="Icon background color")
@field_validator("name", "description", mode="before")
@classmethod
def validate_xss_safe(cls, value: str | None, info) -> str | None:
return _validate_xss_safe(value, info.field_name)
class UpdateAppPayload(BaseModel):
name: str = Field(..., min_length=1, description="App name")
@ -85,6 +133,11 @@ class UpdateAppPayload(BaseModel):
use_icon_as_answer_icon: bool | None = Field(default=None, description="Use icon as answer icon")
max_active_requests: int | None = Field(default=None, description="Maximum active requests")
@field_validator("name", "description", mode="before")
@classmethod
def validate_xss_safe(cls, value: str | None, info) -> str | None:
return _validate_xss_safe(value, info.field_name)
class CopyAppPayload(BaseModel):
name: str | None = Field(default=None, description="Name for the copied app")
@ -93,6 +146,11 @@ class CopyAppPayload(BaseModel):
icon: str | None = Field(default=None, description="Icon")
icon_background: str | None = Field(default=None, description="Icon background color")
@field_validator("name", "description", mode="before")
@classmethod
def validate_xss_safe(cls, value: str | None, info) -> str | None:
return _validate_xss_safe(value, info.field_name)
class AppExportQuery(BaseModel):
include_secret: bool = Field(default=False, description="Include secrets in export")

View File

@ -1,274 +0,0 @@
from flask import request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator
from controllers.console import console_ns
from controllers.console.app.error import (
AppAssetFileRequiredError,
AppAssetNodeNotFoundError,
AppAssetPathConflictError,
)
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from libs.login import current_account_with_tenant, login_required
from models import App
from models.model import AppMode
from services.app_asset_service import AppAssetService
from services.errors.app_asset import (
AppAssetNodeNotFoundError as ServiceNodeNotFoundError,
)
from services.errors.app_asset import (
AppAssetParentNotFoundError,
)
from services.errors.app_asset import (
AppAssetPathConflictError as ServicePathConflictError,
)
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class CreateFolderPayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
parent_id: str | None = None
class CreateFilePayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
parent_id: str | None = None
@field_validator("name", mode="before")
@classmethod
def strip_name(cls, v: str) -> str:
return v.strip() if isinstance(v, str) else v
@field_validator("parent_id", mode="before")
@classmethod
def empty_to_none(cls, v: str | None) -> str | None:
return v or None
class UpdateFileContentPayload(BaseModel):
content: str
class RenameNodePayload(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
class MoveNodePayload(BaseModel):
parent_id: str | None = None
class ReorderNodePayload(BaseModel):
after_node_id: str | None = Field(default=None, description="Place after this node, None for first position")
def reg(cls: type[BaseModel]) -> None:
console_ns.schema_model(cls.__name__, cls.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0))
reg(CreateFolderPayload)
reg(CreateFilePayload)
reg(UpdateFileContentPayload)
reg(RenameNodePayload)
reg(MoveNodePayload)
reg(ReorderNodePayload)
@console_ns.route("/apps/<string:app_id>/assets/tree")
class AppAssetTreeResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App):
current_user, _ = current_account_with_tenant()
tree = AppAssetService.get_asset_tree(app_model, current_user.id)
return {"children": [view.model_dump() for view in tree.transform()]}
@console_ns.route("/apps/<string:app_id>/assets/folders")
class AppAssetFolderResource(Resource):
@console_ns.expect(console_ns.models[CreateFolderPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
payload = CreateFolderPayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.create_folder(app_model, current_user.id, payload.name, payload.parent_id)
return node.model_dump(), 201
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/files")
class AppAssetFileResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
file = request.files.get("file")
if not file:
raise AppAssetFileRequiredError()
payload = CreateFilePayload.model_validate(request.form.to_dict())
content = file.read()
try:
node = AppAssetService.create_file(app_model, current_user.id, payload.name, content, payload.parent_id)
return node.model_dump(), 201
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/files/<string:node_id>")
class AppAssetFileDetailResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
try:
content = AppAssetService.get_file_content(app_model, current_user.id, node_id)
return {"content": content.decode("utf-8", errors="replace")}
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.expect(console_ns.models[UpdateFileContentPayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def put(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
file = request.files.get("file")
if file:
content = file.read()
else:
payload = UpdateFileContentPayload.model_validate(console_ns.payload or {})
content = payload.content.encode("utf-8")
try:
node = AppAssetService.update_file_content(app_model, current_user.id, node_id, content)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>")
class AppAssetNodeResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def delete(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
try:
AppAssetService.delete_node(app_model, current_user.id, node_id)
return {"result": "success"}, 200
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>/rename")
class AppAssetNodeRenameResource(Resource):
@console_ns.expect(console_ns.models[RenameNodePayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
payload = RenameNodePayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.rename_node(app_model, current_user.id, node_id, payload.name)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>/move")
class AppAssetNodeMoveResource(Resource):
@console_ns.expect(console_ns.models[MoveNodePayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
payload = MoveNodePayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.move_node(app_model, current_user.id, node_id, payload.parent_id)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
except AppAssetParentNotFoundError:
raise AppAssetNodeNotFoundError()
except ServicePathConflictError:
raise AppAssetPathConflictError()
@console_ns.route("/apps/<string:app_id>/assets/nodes/<string:node_id>/reorder")
class AppAssetNodeReorderResource(Resource):
@console_ns.expect(console_ns.models[ReorderNodePayload.__name__])
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
payload = ReorderNodePayload.model_validate(console_ns.payload or {})
try:
node = AppAssetService.reorder_node(app_model, current_user.id, node_id, payload.after_node_id)
return node.model_dump()
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()
@console_ns.route("/apps/<string:app_id>/assets/publish")
class AppAssetPublishResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def post(self, app_model: App):
current_user, _ = current_account_with_tenant()
published = AppAssetService.publish(app_model, current_user.id)
return {
"id": published.id,
"version": published.version,
"asset_tree": published.asset_tree.model_dump(),
}, 201
@console_ns.route("/apps/<string:app_id>/assets/files/<string:node_id>/download-url")
class AppAssetFileDownloadUrlResource(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def get(self, app_model: App, node_id: str):
current_user, _ = current_account_with_tenant()
try:
download_url = AppAssetService.get_file_download_url(app_model, current_user.id, node_id)
return {"download_url": download_url}
except ServiceNodeNotFoundError:
raise AppAssetNodeNotFoundError()

View File

@ -110,24 +110,8 @@ class TracingConfigCheckError(BaseHTTPException):
class InvokeRateLimitError(BaseHTTPException):
"""Raised when the Invoke returns rate limit error."""
error_code = "rate_limit_error"
description = "Rate Limit Error"
code = 429
class AppAssetNodeNotFoundError(BaseHTTPException):
error_code = "app_asset_node_not_found"
description = "App asset node not found."
code = 404
class AppAssetFileRequiredError(BaseHTTPException):
error_code = "app_asset_file_required"
description = "File is required."
code = 400
class AppAssetPathConflictError(BaseHTTPException):
error_code = "app_asset_path_conflict"
description = "Path already exists."
code = 409

View File

@ -202,7 +202,6 @@ message_detail_model = console_ns.model(
"status": fields.String,
"error": fields.String,
"parent_message_id": fields.String,
"generation_detail": fields.Raw,
},
)

View File

@ -69,13 +69,6 @@ class ActivateCheckApi(Resource):
if invitation:
data = invitation.get("data", {})
tenant = invitation.get("tenant", None)
# Check workspace permission
if tenant:
from libs.workspace_permission import check_workspace_member_invite_permission
check_workspace_member_invite_permission(tenant.id)
workspace_name = tenant.name if tenant else None
workspace_id = tenant.id if tenant else None
invitee_email = data.get("email") if data else None

View File

@ -146,6 +146,7 @@ class DatasetUpdatePayload(BaseModel):
embedding_model: str | None = None
embedding_model_provider: str | None = None
retrieval_model: dict[str, Any] | None = None
summary_index_setting: dict[str, Any] | None = None
partial_member_list: list[dict[str, str]] | None = None
external_retrieval_model: dict[str, Any] | None = None
external_knowledge_id: str | None = None

View File

@ -39,9 +39,10 @@ from fields.document_fields import (
from libs.datetime_utils import naive_utc_now
from libs.login import current_account_with_tenant, login_required
from models import DatasetProcessRule, Document, DocumentSegment, UploadFile
from models.dataset import DocumentPipelineExecutionLog
from models.dataset import DocumentPipelineExecutionLog, DocumentSegmentSummary
from services.dataset_service import DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig, ProcessRule, RetrievalModel
from tasks.generate_summary_index_task import generate_summary_index_task
from ..app.error import (
ProviderModelCurrentlyNotSupportError,
@ -104,6 +105,10 @@ class DocumentRenamePayload(BaseModel):
name: str
class GenerateSummaryPayload(BaseModel):
document_list: list[str]
class DocumentDatasetListParam(BaseModel):
page: int = Field(1, title="Page", description="Page number.")
limit: int = Field(20, title="Limit", description="Page size.")
@ -120,6 +125,7 @@ register_schema_models(
RetrievalModel,
DocumentRetryPayload,
DocumentRenamePayload,
GenerateSummaryPayload,
)
@ -306,6 +312,94 @@ class DatasetDocumentListApi(Resource):
paginated_documents = db.paginate(select=query, page=page, per_page=limit, max_per_page=100, error_out=False)
documents = paginated_documents.items
# Check if dataset has summary index enabled
has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
# Filter documents that need summary calculation
documents_need_summary = [doc for doc in documents if doc.need_summary is True]
document_ids_need_summary = [str(doc.id) for doc in documents_need_summary]
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
summary_status_map = {}
if has_summary_index and document_ids_need_summary:
# Get all segments for these documents (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id, DocumentSegment.document_id)
.where(
DocumentSegment.document_id.in_(document_ids_need_summary),
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == current_tenant_id,
)
.all()
)
# Group segments by document_id
document_segments_map = {}
for segment in segments:
doc_id = str(segment.document_id)
if doc_id not in document_segments_map:
document_segments_map[doc_id] = []
document_segments_map[doc_id].append(segment.id)
# Get all summary records for these segments
all_segment_ids = [seg.id for seg in segments]
summaries = {}
if all_segment_ids:
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(all_segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Calculate summary_index_status for each document
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
if not segment_ids:
# No segments, status is "GENERATING" (waiting to generate)
summary_status_map[doc_id] = "GENERATING"
continue
# Count summary statuses for this document's segments
status_counts = {"completed": 0, "generating": 0, "error": 0, "not_started": 0}
for segment_id in segment_ids:
status = summaries.get(segment_id, "not_started")
if status in status_counts:
status_counts[status] += 1
else:
status_counts["not_started"] += 1
total_segments = len(segment_ids)
completed_count = status_counts["completed"]
generating_count = status_counts["generating"]
error_count = status_counts["error"]
# Determine overall status (only three states: GENERATING, COMPLETED, ERROR)
if completed_count == total_segments:
summary_status_map[doc_id] = "COMPLETED"
elif error_count > 0:
# Has errors (even if some are completed or generating)
summary_status_map[doc_id] = "ERROR"
elif generating_count > 0 or status_counts["not_started"] > 0:
# Still generating or not started
summary_status_map[doc_id] = "GENERATING"
else:
# Default to generating
summary_status_map[doc_id] = "GENERATING"
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
document.summary_index_status = summary_status_map.get(str(document.id), "GENERATING")
else:
# Return null if summary index is not enabled or document doesn't need summary
document.summary_index_status = None
if fetch:
for document in documents:
completed_segments = (
@ -791,6 +885,7 @@ class DocumentApi(DocumentResource):
"display_status": document.display_status,
"doc_form": document.doc_form,
"doc_language": document.doc_language,
"need_summary": document.need_summary if document.need_summary is not None else False,
}
else:
dataset_process_rules = DatasetService.get_process_rules(dataset_id)
@ -826,6 +921,7 @@ class DocumentApi(DocumentResource):
"display_status": document.display_status,
"doc_form": document.doc_form,
"doc_language": document.doc_language,
"need_summary": document.need_summary if document.need_summary is not None else False,
}
return response, 200
@ -1193,3 +1289,216 @@ class DocumentPipelineExecutionLogApi(DocumentResource):
"input_data": log.input_data,
"datasource_node_id": log.datasource_node_id,
}, 200
@console_ns.route("/datasets/<uuid:dataset_id>/documents/generate-summary")
class DocumentGenerateSummaryApi(Resource):
@console_ns.doc("generate_summary_for_documents")
@console_ns.doc(description="Generate summary index for documents")
@console_ns.doc(params={"dataset_id": "Dataset ID"})
@console_ns.expect(console_ns.models[GenerateSummaryPayload.__name__])
@console_ns.response(200, "Summary generation started successfully")
@console_ns.response(400, "Invalid request or dataset configuration")
@console_ns.response(403, "Permission denied")
@console_ns.response(404, "Dataset not found")
@setup_required
@login_required
@account_initialization_required
@cloud_edition_billing_rate_limit_check("knowledge")
def post(self, dataset_id):
"""
Generate summary index for specified documents.
This endpoint checks if the dataset configuration supports summary generation
(indexing_technique must be 'high_quality' and summary_index_setting.enable must be true),
then asynchronously generates summary indexes for the provided documents.
"""
current_user, _ = current_account_with_tenant()
dataset_id = str(dataset_id)
# Get dataset
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound("Dataset not found.")
# Check permissions
if not current_user.is_dataset_editor:
raise Forbidden()
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Validate request payload
payload = GenerateSummaryPayload.model_validate(console_ns.payload or {})
document_list = payload.document_list
if not document_list:
raise ValueError("document_list cannot be empty.")
# Check if dataset configuration supports summary generation
if dataset.indexing_technique != "high_quality":
raise ValueError(
f"Summary generation is only available for 'high_quality' indexing technique. "
f"Current indexing technique: {dataset.indexing_technique}"
)
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError("Summary index is not enabled for this dataset. Please enable it in the dataset settings.")
# Verify all documents exist and belong to the dataset
documents = (
db.session.query(Document)
.filter(
Document.id.in_(document_list),
Document.dataset_id == dataset_id,
)
.all()
)
if len(documents) != len(document_list):
found_ids = {doc.id for doc in documents}
missing_ids = set(document_list) - found_ids
raise NotFound(f"Some documents not found: {list(missing_ids)}")
# Dispatch async tasks for each document
for document in documents:
# Skip qa_model documents as they don't generate summaries
if document.doc_form == "qa_model":
logger.info("Skipping summary generation for qa_model document %s", document.id)
continue
# Dispatch async task
generate_summary_index_task(dataset_id, document.id)
logger.info(
"Dispatched summary generation task for document %s in dataset %s",
document.id,
dataset_id,
)
return {"result": "success"}, 200
@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/summary-status")
class DocumentSummaryStatusApi(DocumentResource):
@console_ns.doc("get_document_summary_status")
@console_ns.doc(description="Get summary index generation status for a document")
@console_ns.doc(params={"dataset_id": "Dataset ID", "document_id": "Document ID"})
@console_ns.response(200, "Summary status retrieved successfully")
@console_ns.response(404, "Document not found")
@setup_required
@login_required
@account_initialization_required
def get(self, dataset_id, document_id):
"""
Get summary index generation status for a document.
Returns:
- total_segments: Total number of segments in the document
- summary_status: Dictionary with status counts
- completed: Number of summaries completed
- generating: Number of summaries being generated
- error: Number of summaries with errors
- not_started: Number of segments without summary records
- summaries: List of summary records with status and content preview
"""
current_user, _ = current_account_with_tenant()
dataset_id = str(dataset_id)
document_id = str(document_id)
# Get document
document = self.get_document(dataset_id, document_id)
# Get dataset
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound("Dataset not found.")
# Check permissions
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Get all segments for this document
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.document_id == document_id,
DocumentSegment.dataset_id == dataset_id,
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
)
.all()
)
total_segments = len(segments)
# Get all summary records for these segments
segment_ids = [segment.id for segment in segments]
summaries = []
if segment_ids:
summaries = (
db.session.query(DocumentSegmentSummary)
.filter(
DocumentSegmentSummary.document_id == document_id,
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.enabled == True, # Only return enabled summaries
)
.all()
)
# Create a mapping of chunk_id to summary
summary_map = {summary.chunk_id: summary for summary in summaries}
# Count statuses
status_counts = {
"completed": 0,
"generating": 0,
"error": 0,
"not_started": 0,
}
summary_list = []
for segment in segments:
summary = summary_map.get(segment.id)
if summary:
status = summary.status
status_counts[status] = status_counts.get(status, 0) + 1
summary_list.append(
{
"segment_id": segment.id,
"segment_position": segment.position,
"status": summary.status,
"summary_preview": (
summary.summary_content[:100] + "..."
if summary.summary_content and len(summary.summary_content) > 100
else summary.summary_content
),
"error": summary.error,
"created_at": int(summary.created_at.timestamp()) if summary.created_at else None,
"updated_at": int(summary.updated_at.timestamp()) if summary.updated_at else None,
}
)
else:
status_counts["not_started"] += 1
summary_list.append(
{
"segment_id": segment.id,
"segment_position": segment.position,
"status": "not_started",
"summary_preview": None,
"error": None,
"created_at": None,
"updated_at": None,
}
)
return {
"total_segments": total_segments,
"summary_status": status_counts,
"summaries": summary_list,
}, 200

View File

@ -32,7 +32,7 @@ from extensions.ext_redis import redis_client
from fields.segment_fields import child_chunk_fields, segment_fields
from libs.helper import escape_like_pattern
from libs.login import current_account_with_tenant, login_required
from models.dataset import ChildChunk, DocumentSegment
from models.dataset import ChildChunk, DocumentSegment, DocumentSegmentSummary
from models.model import UploadFile
from services.dataset_service import DatasetService, DocumentService, SegmentService
from services.entities.knowledge_entities.knowledge_entities import ChildChunkUpdateArgs, SegmentUpdateArgs
@ -41,6 +41,23 @@ from services.errors.chunk import ChildChunkIndexingError as ChildChunkIndexingS
from tasks.batch_create_segment_to_index_task import batch_create_segment_to_index_task
def _get_segment_with_summary(segment, dataset_id):
"""Helper function to marshal segment and add summary information."""
segment_dict = marshal(segment, segment_fields)
# Query summary for this segment (only enabled summaries)
summary = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id == segment.id,
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only return enabled summaries
)
.first()
)
segment_dict["summary"] = summary.summary_content if summary else None
return segment_dict
class SegmentListQuery(BaseModel):
limit: int = Field(default=20, ge=1, le=100)
status: list[str] = Field(default_factory=list)
@ -63,6 +80,7 @@ class SegmentUpdatePayload(BaseModel):
keywords: list[str] | None = None
regenerate_child_chunks: bool = False
attachment_ids: list[str] | None = None
summary: str | None = None # Summary content for summary index
class BatchImportPayload(BaseModel):
@ -180,8 +198,32 @@ class DatasetDocumentSegmentListApi(Resource):
segments = db.paginate(select=query, page=page, per_page=limit, max_per_page=100, error_out=False)
# Query summaries for all segments in this page (batch query for efficiency)
segment_ids = [segment.id for segment in segments.items]
summaries = {}
if segment_ids:
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
)
.all()
)
# Only include enabled summaries
summaries = {
summary.chunk_id: summary.summary_content for summary in summary_records if summary.enabled is True
}
# Add summary to each segment
segments_with_summary = []
for segment in segments.items:
segment_dict = marshal(segment, segment_fields)
segment_dict["summary"] = summaries.get(segment.id)
segments_with_summary.append(segment_dict)
response = {
"data": marshal(segments.items, segment_fields),
"data": segments_with_summary,
"limit": limit,
"total": segments.total,
"total_pages": segments.pages,
@ -327,7 +369,7 @@ class DatasetDocumentSegmentAddApi(Resource):
payload_dict = payload.model_dump(exclude_none=True)
SegmentService.segment_create_args_validate(payload_dict, document)
segment = SegmentService.create_segment(payload_dict, document, dataset)
return {"data": marshal(segment, segment_fields), "doc_form": document.doc_form}, 200
return {"data": _get_segment_with_summary(segment, dataset_id), "doc_form": document.doc_form}, 200
@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments/<uuid:segment_id>")
@ -389,10 +431,12 @@ class DatasetDocumentSegmentUpdateApi(Resource):
payload = SegmentUpdatePayload.model_validate(console_ns.payload or {})
payload_dict = payload.model_dump(exclude_none=True)
SegmentService.segment_create_args_validate(payload_dict, document)
# Update segment (summary update with change detection is handled in SegmentService.update_segment)
segment = SegmentService.update_segment(
SegmentUpdateArgs.model_validate(payload.model_dump(exclude_none=True)), segment, document, dataset
)
return {"data": marshal(segment, segment_fields), "doc_form": document.doc_form}, 200
return {"data": _get_segment_with_summary(segment, dataset_id), "doc_form": document.doc_form}, 200
@setup_required
@login_required

View File

@ -1,6 +1,13 @@
from flask_restx import Resource
from flask_restx import Resource, fields
from controllers.common.schema import register_schema_model
from fields.hit_testing_fields import (
child_chunk_fields,
document_fields,
files_fields,
hit_testing_record_fields,
segment_fields,
)
from libs.login import login_required
from .. import console_ns
@ -14,13 +21,45 @@ from ..wraps import (
register_schema_model(console_ns, HitTestingPayload)
def _get_or_create_model(model_name: str, field_def):
"""Get or create a flask_restx model to avoid dict type issues in Swagger."""
existing = console_ns.models.get(model_name)
if existing is None:
existing = console_ns.model(model_name, field_def)
return existing
# Register models for flask_restx to avoid dict type issues in Swagger
document_model = _get_or_create_model("HitTestingDocument", document_fields)
segment_fields_copy = segment_fields.copy()
segment_fields_copy["document"] = fields.Nested(document_model)
segment_model = _get_or_create_model("HitTestingSegment", segment_fields_copy)
child_chunk_model = _get_or_create_model("HitTestingChildChunk", child_chunk_fields)
files_model = _get_or_create_model("HitTestingFile", files_fields)
hit_testing_record_fields_copy = hit_testing_record_fields.copy()
hit_testing_record_fields_copy["segment"] = fields.Nested(segment_model)
hit_testing_record_fields_copy["child_chunks"] = fields.List(fields.Nested(child_chunk_model))
hit_testing_record_fields_copy["files"] = fields.List(fields.Nested(files_model))
hit_testing_record_model = _get_or_create_model("HitTestingRecord", hit_testing_record_fields_copy)
# Response model for hit testing API
hit_testing_response_fields = {
"query": fields.String,
"records": fields.List(fields.Nested(hit_testing_record_model)),
}
hit_testing_response_model = _get_or_create_model("HitTestingResponse", hit_testing_response_fields)
@console_ns.route("/datasets/<uuid:dataset_id>/hit-testing")
class HitTestingApi(Resource, DatasetsHitTestingBase):
@console_ns.doc("test_dataset_retrieval")
@console_ns.doc(description="Test dataset knowledge retrieval")
@console_ns.doc(params={"dataset_id": "Dataset ID"})
@console_ns.expect(console_ns.models[HitTestingPayload.__name__])
@console_ns.response(200, "Hit testing completed successfully")
@console_ns.response(200, "Hit testing completed successfully", model=hit_testing_response_model)
@console_ns.response(404, "Dataset not found")
@console_ns.response(400, "Invalid parameters")
@setup_required

View File

@ -1,65 +0,0 @@
import json
import httpx
import yaml
from flask_restx import Resource, reqparse
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from core.plugin.impl.exc import PluginPermissionDeniedError
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
from models.model import App
from models.workflow import Workflow
from services.app_dsl_service import AppDslService
@console_ns.route("/workspaces/current/dsl/predict")
class DSLPredictApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self):
user, _ = current_account_with_tenant()
if not user.is_admin_or_owner:
raise Forbidden()
parser = (
reqparse.RequestParser()
.add_argument("app_id", type=str, required=True, location="json")
.add_argument("current_node_id", type=str, required=True, location="json")
)
args = parser.parse_args()
app_id: str = args["app_id"]
current_node_id: str = args["current_node_id"]
with Session(db.engine) as session:
app = session.query(App).filter_by(id=app_id).first()
workflow = session.query(Workflow).filter_by(app_id=app_id, version=Workflow.VERSION_DRAFT).first()
if not app:
raise ValueError("App not found")
if not workflow:
raise ValueError("Workflow not found")
try:
i = 0
for node_id, _ in workflow.walk_nodes():
if node_id == current_node_id:
break
i += 1
dsl = yaml.safe_load(AppDslService.export_dsl(app_model=app))
response = httpx.post(
"http://spark-832c:8000/predict",
json={"graph_data": dsl, "source_node_index": i},
)
return {
"nodes": json.loads(response.json()),
}
except PluginPermissionDeniedError as e:
raise ValueError(e.description) from e

View File

@ -107,12 +107,6 @@ class MemberInviteEmailApi(Resource):
inviter = current_user
if not inviter.current_tenant:
raise ValueError("No current tenant")
# Check workspace permission for member invitations
from libs.workspace_permission import check_workspace_member_invite_permission
check_workspace_member_invite_permission(inviter.current_tenant.id)
invitation_results = []
console_web_url = dify_config.CONSOLE_WEB_URL

View File

@ -1,95 +0,0 @@
import logging
from flask_restx import Resource, fields, reqparse
from controllers.console import console_ns
from controllers.console.wraps import account_initialization_required, setup_required
from core.model_runtime.utils.encoders import jsonable_encoder
from libs.login import current_account_with_tenant, login_required
from services.sandbox.sandbox_provider_service import SandboxProviderService
logger = logging.getLogger(__name__)
@console_ns.route("/workspaces/current/sandbox-providers")
class SandboxProviderListApi(Resource):
@console_ns.doc("list_sandbox_providers")
@console_ns.doc(description="Get list of available sandbox providers with configuration status")
@console_ns.response(200, "Success", fields.List(fields.Raw(description="Sandbox provider information")))
@setup_required
@login_required
@account_initialization_required
def get(self):
_, current_tenant_id = current_account_with_tenant()
providers = SandboxProviderService.list_providers(current_tenant_id)
return jsonable_encoder([p.model_dump() for p in providers])
config_parser = reqparse.RequestParser()
config_parser.add_argument("config", type=dict, required=True, location="json")
@console_ns.route("/workspaces/current/sandbox-provider/<string:provider_type>/config")
class SandboxProviderConfigApi(Resource):
@console_ns.doc("save_sandbox_provider_config")
@console_ns.doc(description="Save or update configuration for a sandbox provider")
@console_ns.expect(config_parser)
@console_ns.response(200, "Success")
@setup_required
@login_required
@account_initialization_required
def post(self, provider_type: str):
_, current_tenant_id = current_account_with_tenant()
args = config_parser.parse_args()
try:
result = SandboxProviderService.save_config(
tenant_id=current_tenant_id,
provider_type=provider_type,
config=args["config"],
)
return result
except ValueError as e:
return {"message": str(e)}, 400
@console_ns.doc("delete_sandbox_provider_config")
@console_ns.doc(description="Delete configuration for a sandbox provider")
@console_ns.response(200, "Success")
@setup_required
@login_required
@account_initialization_required
def delete(self, provider_type: str):
_, current_tenant_id = current_account_with_tenant()
try:
result = SandboxProviderService.delete_config(
tenant_id=current_tenant_id,
provider_type=provider_type,
)
return result
except ValueError as e:
return {"message": str(e)}, 400
@console_ns.route("/workspaces/current/sandbox-provider/<string:provider_type>/activate")
class SandboxProviderActivateApi(Resource):
"""Activate a sandbox provider."""
@console_ns.doc("activate_sandbox_provider")
@console_ns.doc(description="Activate a sandbox provider for the current workspace")
@console_ns.response(200, "Success")
@setup_required
@login_required
@account_initialization_required
def post(self, provider_type: str):
"""Activate a sandbox provider."""
_, current_tenant_id = current_account_with_tenant()
try:
result = SandboxProviderService.activate_provider(
tenant_id=current_tenant_id,
provider_type=provider_type,
)
return result
except ValueError as e:
return {"message": str(e)}, 400

View File

@ -20,7 +20,6 @@ from controllers.console.error import AccountNotLinkTenantError
from controllers.console.wraps import (
account_initialization_required,
cloud_edition_billing_resource_check,
only_edition_enterprise,
setup_required,
)
from enums.cloud_plan import CloudPlan
@ -29,7 +28,6 @@ from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.account import Tenant, TenantStatus
from services.account_service import TenantService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.file_service import FileService
from services.workspace_service import WorkspaceService
@ -290,31 +288,3 @@ class WorkspaceInfoApi(Resource):
db.session.commit()
return {"result": "success", "tenant": marshal(WorkspaceService.get_tenant_info(tenant), tenant_fields)}
@console_ns.route("/workspaces/current/permission")
class WorkspacePermissionApi(Resource):
"""Get workspace permissions for the current workspace."""
@setup_required
@login_required
@account_initialization_required
@only_edition_enterprise
def get(self):
"""
Get workspace permission settings.
Returns permission flags that control workspace features like member invitations and owner transfer.
"""
_, current_tenant_id = current_account_with_tenant()
if not current_tenant_id:
raise ValueError("No current tenant")
# Get workspace permissions from enterprise service
permission = EnterpriseService.WorkspacePermissionService.get_permission(current_tenant_id)
return {
"workspace_id": permission.workspace_id,
"allow_member_invite": permission.allow_member_invite,
"allow_owner_transfer": permission.allow_owner_transfer,
}, 200

View File

@ -286,12 +286,13 @@ def enable_change_email(view: Callable[P, R]):
def is_allow_transfer_owner(view: Callable[P, R]):
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs):
from libs.workspace_permission import check_workspace_owner_transfer_permission
_, current_tenant_id = current_account_with_tenant()
# Check both billing/plan level and workspace policy level permissions
check_workspace_owner_transfer_permission(current_tenant_id)
return view(*args, **kwargs)
features = FeatureService.get_features(current_tenant_id)
if features.is_allow_transfer_workspace:
return view(*args, **kwargs)
# otherwise, return 403
abort(403)
return decorated

View File

@ -14,7 +14,7 @@ api = ExternalApi(
files_ns = Namespace("files", description="File operations", path="/")
from . import image_preview, storage_download, tool_files, upload
from . import image_preview, tool_files, upload
api.add_namespace(files_ns)
@ -23,7 +23,6 @@ __all__ = [
"bp",
"files_ns",
"image_preview",
"storage_download",
"tool_files",
"upload",
]

View File

@ -1,56 +0,0 @@
from urllib.parse import quote, unquote
from flask import Response, request
from flask_restx import Resource
from pydantic import BaseModel, Field
from werkzeug.exceptions import Forbidden, NotFound
from controllers.files import files_ns
from extensions.ext_storage import storage
from extensions.storage.file_presign_storage import FilePresignStorage
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
class StorageDownloadQuery(BaseModel):
timestamp: str = Field(..., description="Unix timestamp used in the signature")
nonce: str = Field(..., description="Random string for signature")
sign: str = Field(..., description="HMAC signature")
files_ns.schema_model(
StorageDownloadQuery.__name__,
StorageDownloadQuery.model_json_schema(ref_template=DEFAULT_REF_TEMPLATE_SWAGGER_2_0),
)
@files_ns.route("/storage/<path:filename>/download")
class StorageFileDownloadApi(Resource):
def get(self, filename: str):
filename = unquote(filename)
args = StorageDownloadQuery.model_validate(request.args.to_dict(flat=True))
if not FilePresignStorage.verify_signature(
filename=filename,
timestamp=args.timestamp,
nonce=args.nonce,
sign=args.sign,
):
raise Forbidden("Invalid or expired download link")
try:
generator = storage.load_stream(filename)
except FileNotFoundError:
raise NotFound("File not found")
encoded_filename = quote(filename.split("/")[-1])
return Response(
generator,
mimetype="application/octet-stream",
direct_passthrough=True,
headers={
"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}",
},
)

View File

@ -448,53 +448,3 @@ class PluginFetchAppInfoApi(Resource):
return BaseBackwardsInvocationResponse(
data=PluginAppBackwardsInvocation.fetch_app_info(payload.app_id, tenant_model.id)
).model_dump()
@inner_api_ns.route("/fetch/tools/list")
class PluginFetchToolsListApi(Resource):
@get_user_tenant
@setup_required
@plugin_inner_api_only
@inner_api_ns.doc("plugin_fetch_tools_list")
@inner_api_ns.doc(description="Fetch all available tools through plugin interface")
@inner_api_ns.doc(
responses={
200: "Tools list retrieved successfully",
401: "Unauthorized - invalid API key",
404: "Service not available",
}
)
def post(self, user_model: Account | EndUser, tenant_model: Tenant):
from sqlalchemy.orm import Session
from extensions.ext_database import db
from services.tools.api_tools_manage_service import ApiToolManageService
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
from services.tools.mcp_tools_manage_service import MCPToolManageService
from services.tools.workflow_tools_manage_service import WorkflowToolManageService
providers = []
# Get builtin tools
builtin_providers = BuiltinToolManageService.list_builtin_tools(user_model.id, tenant_model.id)
for provider in builtin_providers:
providers.append(provider.to_dict())
# Get API tools
api_providers = ApiToolManageService.list_api_tools(tenant_model.id)
for provider in api_providers:
providers.append(provider.to_dict())
# Get workflow tools
workflow_providers = WorkflowToolManageService.list_tenant_workflow_tools(user_model.id, tenant_model.id)
for provider in workflow_providers:
providers.append(provider.to_dict())
# Get MCP tools
with Session(db.engine) as session:
mcp_service = MCPToolManageService(session)
mcp_providers = mcp_service.list_providers(tenant_id=tenant_model.id, for_list=True)
for provider in mcp_providers:
providers.append(provider.to_dict())
return BaseBackwardsInvocationResponse(data={"providers": providers}).model_dump()

View File

@ -75,6 +75,7 @@ def get_user_tenant(view_func: Callable[P, R]):
@wraps(view_func)
def decorated_view(*args: P.args, **kwargs: P.kwargs):
payload = TenantUserPayload.model_validate(request.get_json(silent=True) or {})
user_id = payload.user_id
tenant_id = payload.tenant_id

View File

@ -5,15 +5,14 @@ from hashlib import sha1
from hmac import new as hmac_new
from typing import ParamSpec, TypeVar
P = ParamSpec("P")
R = TypeVar("R")
from flask import abort, request
from configs import dify_config
from extensions.ext_database import db
from models.model import EndUser
P = ParamSpec("P")
R = TypeVar("R")
def billing_inner_api_only(view: Callable[P, R]):
@wraps(view)
@ -89,11 +88,11 @@ def plugin_inner_api_only(view: Callable[P, R]):
if not dify_config.PLUGIN_DAEMON_KEY:
abort(404)
# validate using inner api key
# get header 'X-Inner-Api-Key'
inner_api_key = request.headers.get("X-Inner-Api-Key")
if inner_api_key and inner_api_key == dify_config.INNER_API_KEY_FOR_PLUGIN:
return view(*args, **kwargs)
if not inner_api_key or inner_api_key != dify_config.INNER_API_KEY_FOR_PLUGIN:
abort(404)
abort(401)
return view(*args, **kwargs)
return decorated

View File

@ -1,380 +0,0 @@
import logging
from collections.abc import Generator
from copy import deepcopy
from typing import Any
from core.agent.base_agent_runner import BaseAgentRunner
from core.agent.entities import AgentEntity, AgentLog, AgentResult
from core.agent.patterns.strategy_factory import StrategyFactory
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
from core.file import file_manager
from core.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMUsage,
PromptMessage,
PromptMessageContentType,
SystemPromptMessage,
TextPromptMessageContent,
UserPromptMessage,
)
from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
from core.tools.__base.tool import Tool
from core.tools.entities.tool_entities import ToolInvokeMeta
from core.tools.tool_engine import ToolEngine
from models.model import Message
logger = logging.getLogger(__name__)
class AgentAppRunner(BaseAgentRunner):
def _create_tool_invoke_hook(self, message: Message):
"""
Create a tool invoke hook that uses ToolEngine.agent_invoke.
This hook handles file creation and returns proper meta information.
"""
# Get trace manager from app generate entity
trace_manager = self.application_generate_entity.trace_manager
def tool_invoke_hook(
tool: Tool, tool_args: dict[str, Any], tool_name: str
) -> tuple[str, list[str], ToolInvokeMeta]:
"""Hook that uses agent_invoke for proper file and meta handling."""
tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(
tool=tool,
tool_parameters=tool_args,
user_id=self.user_id,
tenant_id=self.tenant_id,
message=message,
invoke_from=self.application_generate_entity.invoke_from,
agent_tool_callback=self.agent_callback,
trace_manager=trace_manager,
app_id=self.application_generate_entity.app_config.app_id,
message_id=message.id,
conversation_id=self.conversation.id,
)
# Publish files and track IDs
for message_file_id in message_files:
self.queue_manager.publish(
QueueMessageFileEvent(message_file_id=message_file_id),
PublishFrom.APPLICATION_MANAGER,
)
self._current_message_file_ids.append(message_file_id)
return tool_invoke_response, message_files, tool_invoke_meta
return tool_invoke_hook
def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:
"""
Run Agent application
"""
self.query = query
app_generate_entity = self.application_generate_entity
app_config = self.app_config
assert app_config is not None, "app_config is required"
assert app_config.agent is not None, "app_config.agent is required"
# convert tools into ModelRuntime Tool format
tool_instances, _ = self._init_prompt_tools()
assert app_config.agent
# Create tool invoke hook for agent_invoke
tool_invoke_hook = self._create_tool_invoke_hook(message)
# Get instruction for ReAct strategy
instruction = self.app_config.prompt_template.simple_prompt_template or ""
# Use factory to create appropriate strategy
strategy = StrategyFactory.create_strategy(
model_features=self.model_features,
model_instance=self.model_instance,
tools=list(tool_instances.values()),
files=list(self.files),
max_iterations=app_config.agent.max_iteration,
context=self.build_execution_context(),
agent_strategy=self.config.strategy,
tool_invoke_hook=tool_invoke_hook,
instruction=instruction,
)
# Initialize state variables
current_agent_thought_id = None
has_published_thought = False
current_tool_name: str | None = None
self._current_message_file_ids: list[str] = []
# organize prompt messages
prompt_messages = self._organize_prompt_messages()
# Run strategy
generator = strategy.run(
prompt_messages=prompt_messages,
model_parameters=app_generate_entity.model_conf.parameters,
stop=app_generate_entity.model_conf.stop,
stream=True,
)
# Consume generator and collect result
result: AgentResult | None = None
try:
while True:
try:
output = next(generator)
except StopIteration as e:
# Generator finished, get the return value
result = e.value
break
if isinstance(output, LLMResultChunk):
# Handle LLM chunk
if current_agent_thought_id and not has_published_thought:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
has_published_thought = True
yield output
elif isinstance(output, AgentLog):
# Handle Agent Log using log_type for type-safe dispatch
if output.status == AgentLog.LogStatus.START:
if output.log_type == AgentLog.LogType.ROUND:
# Start of a new round
message_file_ids: list[str] = []
current_agent_thought_id = self.create_agent_thought(
message_id=message.id,
message="",
tool_name="",
tool_input="",
messages_ids=message_file_ids,
)
has_published_thought = False
elif output.log_type == AgentLog.LogType.TOOL_CALL:
if current_agent_thought_id is None:
continue
# Tool call start - extract data from structured fields
current_tool_name = output.data.get("tool_name", "")
tool_input = output.data.get("tool_args", {})
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=current_tool_name,
tool_input=tool_input,
thought=None,
observation=None,
tool_invoke_meta=None,
answer=None,
messages_ids=[],
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
elif output.status == AgentLog.LogStatus.SUCCESS:
if output.log_type == AgentLog.LogType.THOUGHT:
if current_agent_thought_id is None:
continue
thought_text = output.data.get("thought")
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=None,
tool_input=None,
thought=thought_text,
observation=None,
tool_invoke_meta=None,
answer=None,
messages_ids=[],
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
elif output.log_type == AgentLog.LogType.TOOL_CALL:
if current_agent_thought_id is None:
continue
# Tool call finished
tool_output = output.data.get("output")
# Get meta from strategy output (now properly populated)
tool_meta = output.data.get("meta")
# Wrap tool_meta with tool_name as key (required by agent_service)
if tool_meta and current_tool_name:
tool_meta = {current_tool_name: tool_meta}
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=None,
tool_input=None,
thought=None,
observation=tool_output,
tool_invoke_meta=tool_meta,
answer=None,
messages_ids=self._current_message_file_ids,
)
# Clear message file ids after saving
self._current_message_file_ids = []
current_tool_name = None
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
elif output.log_type == AgentLog.LogType.ROUND:
if current_agent_thought_id is None:
continue
# Round finished - save LLM usage and answer
llm_usage = output.metadata.get(AgentLog.LogMetadata.LLM_USAGE)
llm_result = output.data.get("llm_result")
final_answer = output.data.get("final_answer")
self.save_agent_thought(
agent_thought_id=current_agent_thought_id,
tool_name=None,
tool_input=None,
thought=llm_result,
observation=None,
tool_invoke_meta=None,
answer=final_answer,
messages_ids=[],
llm_usage=llm_usage,
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=current_agent_thought_id),
PublishFrom.APPLICATION_MANAGER,
)
except Exception:
# Re-raise any other exceptions
raise
# Process final result
if isinstance(result, AgentResult):
final_answer = result.text
usage = result.usage or LLMUsage.empty_usage()
# Publish end event
self.queue_manager.publish(
QueueMessageEndEvent(
llm_result=LLMResult(
model=self.model_instance.model,
prompt_messages=prompt_messages,
message=AssistantPromptMessage(content=final_answer),
usage=usage,
system_fingerprint="",
)
),
PublishFrom.APPLICATION_MANAGER,
)
def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Initialize system message
"""
if not prompt_template:
return prompt_messages or []
prompt_messages = prompt_messages or []
if prompt_messages and isinstance(prompt_messages[0], SystemPromptMessage):
prompt_messages[0] = SystemPromptMessage(content=prompt_template)
return prompt_messages
if not prompt_messages:
return [SystemPromptMessage(content=prompt_template)]
prompt_messages.insert(0, SystemPromptMessage(content=prompt_template))
return prompt_messages
def _organize_user_query(self, query: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Organize user query
"""
if self.files:
# get image detail config
image_detail_config = (
self.application_generate_entity.file_upload_config.image_config.detail
if (
self.application_generate_entity.file_upload_config
and self.application_generate_entity.file_upload_config.image_config
)
else None
)
image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
for file in self.files:
prompt_message_contents.append(
file_manager.to_prompt_message_content(
file,
image_detail_config=image_detail_config,
)
)
prompt_message_contents.append(TextPromptMessageContent(data=query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
prompt_messages.append(UserPromptMessage(content=query))
return prompt_messages
def _clear_user_prompt_image_messages(self, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
As for now, gpt supports both fc and vision at the first iteration.
We need to remove the image messages from the prompt messages at the first iteration.
"""
prompt_messages = deepcopy(prompt_messages)
for prompt_message in prompt_messages:
if isinstance(prompt_message, UserPromptMessage):
if isinstance(prompt_message.content, list):
prompt_message.content = "\n".join(
[
content.data
if content.type == PromptMessageContentType.TEXT
else "[image]"
if content.type == PromptMessageContentType.IMAGE
else "[file]"
for content in prompt_message.content
]
)
return prompt_messages
def _organize_prompt_messages(self):
# For ReAct strategy, use the agent prompt template
if self.config.strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT and self.config.prompt:
prompt_template = self.config.prompt.first_prompt
else:
prompt_template = self.app_config.prompt_template.simple_prompt_template or ""
self.history_prompt_messages = self._init_system_message(prompt_template, self.history_prompt_messages)
query_prompt_messages = self._organize_user_query(self.query or "", [])
self.history_prompt_messages = AgentHistoryPromptTransform(
model_config=self.model_config,
prompt_messages=[*query_prompt_messages, *self._current_thoughts],
history_messages=self.history_prompt_messages,
memory=self.memory,
).get_prompt()
prompt_messages = [*self.history_prompt_messages, *query_prompt_messages, *self._current_thoughts]
if len(self._current_thoughts) != 0:
# clear messages after the first iteration
prompt_messages = self._clear_user_prompt_image_messages(prompt_messages)
return prompt_messages

View File

@ -6,7 +6,7 @@ from typing import Union, cast
from sqlalchemy import select
from core.agent.entities import AgentEntity, AgentToolEntity, ExecutionContext
from core.agent.entities import AgentEntity, AgentToolEntity
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfig
from core.app.apps.base_app_queue_manager import AppQueueManager
@ -116,20 +116,9 @@ class BaseAgentRunner(AppRunner):
features = model_schema.features if model_schema and model_schema.features else []
self.stream_tool_call = ModelFeature.STREAM_TOOL_CALL in features
self.files = application_generate_entity.files if ModelFeature.VISION in features else []
self.model_features = features
self.query: str | None = ""
self._current_thoughts: list[PromptMessage] = []
def build_execution_context(self) -> ExecutionContext:
"""Build execution context."""
return ExecutionContext(
user_id=self.user_id,
app_id=self.app_config.app_id,
conversation_id=self.conversation.id,
message_id=self.message.id,
tenant_id=self.tenant_id,
)
def _repack_app_generate_entity(
self, app_generate_entity: AgentChatAppGenerateEntity
) -> AgentChatAppGenerateEntity:

View File

@ -0,0 +1,437 @@
import json
import logging
from abc import ABC, abstractmethod
from collections.abc import Generator, Mapping, Sequence
from typing import Any
from core.agent.base_agent_runner import BaseAgentRunner
from core.agent.entities import AgentScratchpadUnit
from core.agent.output_parser.cot_output_parser import CotAgentOutputParser
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from core.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessage,
PromptMessageTool,
ToolPromptMessage,
UserPromptMessage,
)
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
from core.tools.__base.tool import Tool
from core.tools.entities.tool_entities import ToolInvokeMeta
from core.tools.tool_engine import ToolEngine
from core.workflow.nodes.agent.exc import AgentMaxIterationError
from models.model import Message
logger = logging.getLogger(__name__)
class CotAgentRunner(BaseAgentRunner, ABC):
_is_first_iteration = True
_ignore_observation_providers = ["wenxin"]
_historic_prompt_messages: list[PromptMessage]
_agent_scratchpad: list[AgentScratchpadUnit]
_instruction: str
_query: str
_prompt_messages_tools: Sequence[PromptMessageTool]
def run(
self,
message: Message,
query: str,
inputs: Mapping[str, str],
) -> Generator:
"""
Run Cot agent application
"""
app_generate_entity = self.application_generate_entity
self._repack_app_generate_entity(app_generate_entity)
self._init_react_state(query)
trace_manager = app_generate_entity.trace_manager
# check model mode
if "Observation" not in app_generate_entity.model_conf.stop:
if app_generate_entity.model_conf.provider not in self._ignore_observation_providers:
app_generate_entity.model_conf.stop.append("Observation")
app_config = self.app_config
assert app_config.agent
# init instruction
inputs = inputs or {}
instruction = app_config.prompt_template.simple_prompt_template or ""
self._instruction = self._fill_in_inputs_from_external_data_tools(instruction, inputs)
iteration_step = 1
max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1
# convert tools into ModelRuntime Tool format
tool_instances, prompt_messages_tools = self._init_prompt_tools()
self._prompt_messages_tools = prompt_messages_tools
function_call_state = True
llm_usage: dict[str, LLMUsage | None] = {"usage": None}
final_answer = ""
prompt_messages: list = [] # Initialize prompt_messages
agent_thought_id = "" # Initialize agent_thought_id
def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMUsage):
if not final_llm_usage_dict["usage"]:
final_llm_usage_dict["usage"] = usage
else:
llm_usage = final_llm_usage_dict["usage"]
llm_usage.prompt_tokens += usage.prompt_tokens
llm_usage.completion_tokens += usage.completion_tokens
llm_usage.total_tokens += usage.total_tokens
llm_usage.prompt_price += usage.prompt_price
llm_usage.completion_price += usage.completion_price
llm_usage.total_price += usage.total_price
model_instance = self.model_instance
while function_call_state and iteration_step <= max_iteration_steps:
# continue to run until there is not any tool call
function_call_state = False
if iteration_step == max_iteration_steps:
# the last iteration, remove all tools
self._prompt_messages_tools = []
message_file_ids: list[str] = []
agent_thought_id = self.create_agent_thought(
message_id=message.id, message="", tool_name="", tool_input="", messages_ids=message_file_ids
)
if iteration_step > 1:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
# recalc llm max tokens
prompt_messages = self._organize_prompt_messages()
self.recalc_llm_max_tokens(self.model_config, prompt_messages)
# invoke model
chunks = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters=app_generate_entity.model_conf.parameters,
tools=[],
stop=app_generate_entity.model_conf.stop,
stream=True,
user=self.user_id,
callbacks=[],
)
usage_dict: dict[str, LLMUsage | None] = {}
react_chunks = CotAgentOutputParser.handle_react_stream_output(chunks, usage_dict)
scratchpad = AgentScratchpadUnit(
agent_response="",
thought="",
action_str="",
observation="",
action=None,
)
# publish agent thought if it's first iteration
if iteration_step == 1:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
for chunk in react_chunks:
if isinstance(chunk, AgentScratchpadUnit.Action):
action = chunk
# detect action
assert scratchpad.agent_response is not None
scratchpad.agent_response += json.dumps(chunk.model_dump())
scratchpad.action_str = json.dumps(chunk.model_dump())
scratchpad.action = action
else:
assert scratchpad.agent_response is not None
scratchpad.agent_response += chunk
assert scratchpad.thought is not None
scratchpad.thought += chunk
yield LLMResultChunk(
model=self.model_config.model,
prompt_messages=prompt_messages,
system_fingerprint="",
delta=LLMResultChunkDelta(index=0, message=AssistantPromptMessage(content=chunk), usage=None),
)
assert scratchpad.thought is not None
scratchpad.thought = scratchpad.thought.strip() or "I am thinking about how to help you"
self._agent_scratchpad.append(scratchpad)
# Check if max iteration is reached and model still wants to call tools
if iteration_step == max_iteration_steps and scratchpad.action:
if scratchpad.action.action_name.lower() != "final answer":
raise AgentMaxIterationError(app_config.agent.max_iteration)
# get llm usage
if "usage" in usage_dict:
if usage_dict["usage"] is not None:
increase_usage(llm_usage, usage_dict["usage"])
else:
usage_dict["usage"] = LLMUsage.empty_usage()
self.save_agent_thought(
agent_thought_id=agent_thought_id,
tool_name=(scratchpad.action.action_name if scratchpad.action and not scratchpad.is_final() else ""),
tool_input={scratchpad.action.action_name: scratchpad.action.action_input} if scratchpad.action else {},
tool_invoke_meta={},
thought=scratchpad.thought or "",
observation="",
answer=scratchpad.agent_response or "",
messages_ids=[],
llm_usage=usage_dict["usage"],
)
if not scratchpad.is_final():
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
if not scratchpad.action:
# failed to extract action, return final answer directly
final_answer = ""
else:
if scratchpad.action.action_name.lower() == "final answer":
# action is final answer, return final answer directly
try:
if isinstance(scratchpad.action.action_input, dict):
final_answer = json.dumps(scratchpad.action.action_input, ensure_ascii=False)
elif isinstance(scratchpad.action.action_input, str):
final_answer = scratchpad.action.action_input
else:
final_answer = f"{scratchpad.action.action_input}"
except TypeError:
final_answer = f"{scratchpad.action.action_input}"
else:
function_call_state = True
# action is tool call, invoke tool
tool_invoke_response, tool_invoke_meta = self._handle_invoke_action(
action=scratchpad.action,
tool_instances=tool_instances,
message_file_ids=message_file_ids,
trace_manager=trace_manager,
)
scratchpad.observation = tool_invoke_response
scratchpad.agent_response = tool_invoke_response
self.save_agent_thought(
agent_thought_id=agent_thought_id,
tool_name=scratchpad.action.action_name,
tool_input={scratchpad.action.action_name: scratchpad.action.action_input},
thought=scratchpad.thought or "",
observation={scratchpad.action.action_name: tool_invoke_response},
tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()},
answer=scratchpad.agent_response,
messages_ids=message_file_ids,
llm_usage=usage_dict["usage"],
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
# update prompt tool message
for prompt_tool in self._prompt_messages_tools:
self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool)
iteration_step += 1
yield LLMResultChunk(
model=model_instance.model,
prompt_messages=prompt_messages,
delta=LLMResultChunkDelta(
index=0, message=AssistantPromptMessage(content=final_answer), usage=llm_usage["usage"]
),
system_fingerprint="",
)
# save agent thought
self.save_agent_thought(
agent_thought_id=agent_thought_id,
tool_name="",
tool_input={},
tool_invoke_meta={},
thought=final_answer,
observation={},
answer=final_answer,
messages_ids=[],
)
# publish end event
self.queue_manager.publish(
QueueMessageEndEvent(
llm_result=LLMResult(
model=model_instance.model,
prompt_messages=prompt_messages,
message=AssistantPromptMessage(content=final_answer),
usage=llm_usage["usage"] or LLMUsage.empty_usage(),
system_fingerprint="",
)
),
PublishFrom.APPLICATION_MANAGER,
)
def _handle_invoke_action(
self,
action: AgentScratchpadUnit.Action,
tool_instances: Mapping[str, Tool],
message_file_ids: list[str],
trace_manager: TraceQueueManager | None = None,
) -> tuple[str, ToolInvokeMeta]:
"""
handle invoke action
:param action: action
:param tool_instances: tool instances
:param message_file_ids: message file ids
:param trace_manager: trace manager
:return: observation, meta
"""
# action is tool call, invoke tool
tool_call_name = action.action_name
tool_call_args = action.action_input
tool_instance = tool_instances.get(tool_call_name)
if not tool_instance:
answer = f"there is not a tool named {tool_call_name}"
return answer, ToolInvokeMeta.error_instance(answer)
if isinstance(tool_call_args, str):
try:
tool_call_args = json.loads(tool_call_args)
except json.JSONDecodeError:
pass
# invoke tool
tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(
tool=tool_instance,
tool_parameters=tool_call_args,
user_id=self.user_id,
tenant_id=self.tenant_id,
message=self.message,
invoke_from=self.application_generate_entity.invoke_from,
agent_tool_callback=self.agent_callback,
trace_manager=trace_manager,
)
# publish files
for message_file_id in message_files:
# publish message file
self.queue_manager.publish(
QueueMessageFileEvent(message_file_id=message_file_id), PublishFrom.APPLICATION_MANAGER
)
# add message file ids
message_file_ids.append(message_file_id)
return tool_invoke_response, tool_invoke_meta
def _convert_dict_to_action(self, action: dict) -> AgentScratchpadUnit.Action:
"""
convert dict to action
"""
return AgentScratchpadUnit.Action(action_name=action["action"], action_input=action["action_input"])
def _fill_in_inputs_from_external_data_tools(self, instruction: str, inputs: Mapping[str, Any]) -> str:
"""
fill in inputs from external data tools
"""
for key, value in inputs.items():
try:
instruction = instruction.replace(f"{{{{{key}}}}}", str(value))
except Exception:
continue
return instruction
def _init_react_state(self, query):
"""
init agent scratchpad
"""
self._query = query
self._agent_scratchpad = []
self._historic_prompt_messages = self._organize_historic_prompt_messages()
@abstractmethod
def _organize_prompt_messages(self) -> list[PromptMessage]:
"""
organize prompt messages
"""
def _format_assistant_message(self, agent_scratchpad: list[AgentScratchpadUnit]) -> str:
"""
format assistant message
"""
message = ""
for scratchpad in agent_scratchpad:
if scratchpad.is_final():
message += f"Final Answer: {scratchpad.agent_response}"
else:
message += f"Thought: {scratchpad.thought}\n\n"
if scratchpad.action_str:
message += f"Action: {scratchpad.action_str}\n\n"
if scratchpad.observation:
message += f"Observation: {scratchpad.observation}\n\n"
return message
def _organize_historic_prompt_messages(
self, current_session_messages: list[PromptMessage] | None = None
) -> list[PromptMessage]:
"""
organize historic prompt messages
"""
result: list[PromptMessage] = []
scratchpads: list[AgentScratchpadUnit] = []
current_scratchpad: AgentScratchpadUnit | None = None
for message in self.history_prompt_messages:
if isinstance(message, AssistantPromptMessage):
if not current_scratchpad:
assert isinstance(message.content, str)
current_scratchpad = AgentScratchpadUnit(
agent_response=message.content,
thought=message.content or "I am thinking about how to help you",
action_str="",
action=None,
observation=None,
)
scratchpads.append(current_scratchpad)
if message.tool_calls:
try:
current_scratchpad.action = AgentScratchpadUnit.Action(
action_name=message.tool_calls[0].function.name,
action_input=json.loads(message.tool_calls[0].function.arguments),
)
current_scratchpad.action_str = json.dumps(current_scratchpad.action.to_dict())
except Exception:
logger.exception("Failed to parse tool call from assistant message")
elif isinstance(message, ToolPromptMessage):
if current_scratchpad:
assert isinstance(message.content, str)
current_scratchpad.observation = message.content
else:
raise NotImplementedError("expected str type")
elif isinstance(message, UserPromptMessage):
if scratchpads:
result.append(AssistantPromptMessage(content=self._format_assistant_message(scratchpads)))
scratchpads = []
current_scratchpad = None
result.append(message)
if scratchpads:
result.append(AssistantPromptMessage(content=self._format_assistant_message(scratchpads)))
historic_prompts = AgentHistoryPromptTransform(
model_config=self.model_config,
prompt_messages=current_session_messages or [],
history_messages=result,
memory=self.memory,
).get_prompt()
return historic_prompts

View File

@ -0,0 +1,118 @@
import json
from core.agent.cot_agent_runner import CotAgentRunner
from core.file import file_manager
from core.model_runtime.entities import (
AssistantPromptMessage,
PromptMessage,
SystemPromptMessage,
TextPromptMessageContent,
UserPromptMessage,
)
from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from core.model_runtime.utils.encoders import jsonable_encoder
class CotChatAgentRunner(CotAgentRunner):
def _organize_system_prompt(self) -> SystemPromptMessage:
"""
Organize system prompt
"""
assert self.app_config.agent
assert self.app_config.agent.prompt
prompt_entity = self.app_config.agent.prompt
if not prompt_entity:
raise ValueError("Agent prompt configuration is not set")
first_prompt = prompt_entity.first_prompt
system_prompt = (
first_prompt.replace("{{instruction}}", self._instruction)
.replace("{{tools}}", json.dumps(jsonable_encoder(self._prompt_messages_tools)))
.replace("{{tool_names}}", ", ".join([tool.name for tool in self._prompt_messages_tools]))
)
return SystemPromptMessage(content=system_prompt)
def _organize_user_query(self, query, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Organize user query
"""
if self.files:
# get image detail config
image_detail_config = (
self.application_generate_entity.file_upload_config.image_config.detail
if (
self.application_generate_entity.file_upload_config
and self.application_generate_entity.file_upload_config.image_config
)
else None
)
image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
for file in self.files:
prompt_message_contents.append(
file_manager.to_prompt_message_content(
file,
image_detail_config=image_detail_config,
)
)
prompt_message_contents.append(TextPromptMessageContent(data=query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
prompt_messages.append(UserPromptMessage(content=query))
return prompt_messages
def _organize_prompt_messages(self) -> list[PromptMessage]:
"""
Organize
"""
# organize system prompt
system_message = self._organize_system_prompt()
# organize current assistant messages
agent_scratchpad = self._agent_scratchpad
if not agent_scratchpad:
assistant_messages = []
else:
assistant_message = AssistantPromptMessage(content="")
assistant_message.content = "" # FIXME: type check tell mypy that assistant_message.content is str
for unit in agent_scratchpad:
if unit.is_final():
assert isinstance(assistant_message.content, str)
assistant_message.content += f"Final Answer: {unit.agent_response}"
else:
assert isinstance(assistant_message.content, str)
assistant_message.content += f"Thought: {unit.thought}\n\n"
if unit.action_str:
assistant_message.content += f"Action: {unit.action_str}\n\n"
if unit.observation:
assistant_message.content += f"Observation: {unit.observation}\n\n"
assistant_messages = [assistant_message]
# query messages
query_messages = self._organize_user_query(self._query, [])
if assistant_messages:
# organize historic prompt messages
historic_messages = self._organize_historic_prompt_messages(
[system_message, *query_messages, *assistant_messages, UserPromptMessage(content="continue")]
)
messages = [
system_message,
*historic_messages,
*query_messages,
*assistant_messages,
UserPromptMessage(content="continue"),
]
else:
# organize historic prompt messages
historic_messages = self._organize_historic_prompt_messages([system_message, *query_messages])
messages = [system_message, *historic_messages, *query_messages]
# join all messages
return messages

View File

@ -0,0 +1,87 @@
import json
from core.agent.cot_agent_runner import CotAgentRunner
from core.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessage,
TextPromptMessageContent,
UserPromptMessage,
)
from core.model_runtime.utils.encoders import jsonable_encoder
class CotCompletionAgentRunner(CotAgentRunner):
def _organize_instruction_prompt(self) -> str:
"""
Organize instruction prompt
"""
if self.app_config.agent is None:
raise ValueError("Agent configuration is not set")
prompt_entity = self.app_config.agent.prompt
if prompt_entity is None:
raise ValueError("prompt entity is not set")
first_prompt = prompt_entity.first_prompt
system_prompt = (
first_prompt.replace("{{instruction}}", self._instruction)
.replace("{{tools}}", json.dumps(jsonable_encoder(self._prompt_messages_tools)))
.replace("{{tool_names}}", ", ".join([tool.name for tool in self._prompt_messages_tools]))
)
return system_prompt
def _organize_historic_prompt(self, current_session_messages: list[PromptMessage] | None = None) -> str:
"""
Organize historic prompt
"""
historic_prompt_messages = self._organize_historic_prompt_messages(current_session_messages)
historic_prompt = ""
for message in historic_prompt_messages:
if isinstance(message, UserPromptMessage):
historic_prompt += f"Question: {message.content}\n\n"
elif isinstance(message, AssistantPromptMessage):
if isinstance(message.content, str):
historic_prompt += message.content + "\n\n"
elif isinstance(message.content, list):
for content in message.content:
if not isinstance(content, TextPromptMessageContent):
continue
historic_prompt += content.data
return historic_prompt
def _organize_prompt_messages(self) -> list[PromptMessage]:
"""
Organize prompt messages
"""
# organize system prompt
system_prompt = self._organize_instruction_prompt()
# organize historic prompt messages
historic_prompt = self._organize_historic_prompt()
# organize current assistant messages
agent_scratchpad = self._agent_scratchpad
assistant_prompt = ""
for unit in agent_scratchpad or []:
if unit.is_final():
assistant_prompt += f"Final Answer: {unit.agent_response}"
else:
assistant_prompt += f"Thought: {unit.thought}\n\n"
if unit.action_str:
assistant_prompt += f"Action: {unit.action_str}\n\n"
if unit.observation:
assistant_prompt += f"Observation: {unit.observation}\n\n"
# query messages
query_prompt = f"Question: {self._query}"
# join all messages
prompt = (
system_prompt.replace("{{historic_messages}}", historic_prompt)
.replace("{{agent_scratchpad}}", assistant_prompt)
.replace("{{query}}", query_prompt)
)
return [UserPromptMessage(content=prompt)]

View File

@ -1,5 +1,3 @@
import uuid
from collections.abc import Mapping
from enum import StrEnum
from typing import Any, Union
@ -94,96 +92,3 @@ class AgentInvokeMessage(ToolInvokeMessage):
"""
pass
class ExecutionContext(BaseModel):
"""Execution context containing trace and audit information.
This context carries all the IDs and metadata that are not part of
the core business logic but needed for tracing, auditing, and
correlation purposes.
"""
user_id: str | None = None
app_id: str | None = None
conversation_id: str | None = None
message_id: str | None = None
tenant_id: str | None = None
@classmethod
def create_minimal(cls, user_id: str | None = None) -> "ExecutionContext":
"""Create a minimal context with only essential fields."""
return cls(user_id=user_id)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for passing to legacy code."""
return {
"user_id": self.user_id,
"app_id": self.app_id,
"conversation_id": self.conversation_id,
"message_id": self.message_id,
"tenant_id": self.tenant_id,
}
def with_updates(self, **kwargs) -> "ExecutionContext":
"""Create a new context with updated fields."""
data = self.to_dict()
data.update(kwargs)
return ExecutionContext(
user_id=data.get("user_id"),
app_id=data.get("app_id"),
conversation_id=data.get("conversation_id"),
message_id=data.get("message_id"),
tenant_id=data.get("tenant_id"),
)
class AgentLog(BaseModel):
"""
Agent Log.
"""
class LogType(StrEnum):
"""Type of agent log entry."""
ROUND = "round" # A complete iteration round
THOUGHT = "thought" # LLM thinking/reasoning
TOOL_CALL = "tool_call" # Tool invocation
class LogMetadata(StrEnum):
STARTED_AT = "started_at"
FINISHED_AT = "finished_at"
ELAPSED_TIME = "elapsed_time"
TOTAL_PRICE = "total_price"
TOTAL_TOKENS = "total_tokens"
PROVIDER = "provider"
CURRENCY = "currency"
LLM_USAGE = "llm_usage"
ICON = "icon"
ICON_DARK = "icon_dark"
class LogStatus(StrEnum):
START = "start"
ERROR = "error"
SUCCESS = "success"
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="The id of the log")
label: str = Field(..., description="The label of the log")
log_type: LogType = Field(..., description="The type of the log")
parent_id: str | None = Field(default=None, description="Leave empty for root log")
error: str | None = Field(default=None, description="The error message")
status: LogStatus = Field(..., description="The status of the log")
data: Mapping[str, Any] = Field(..., description="Detailed log data")
metadata: Mapping[LogMetadata, Any] = Field(default={}, description="The metadata of the log")
class AgentResult(BaseModel):
"""
Agent execution result.
"""
text: str = Field(default="", description="The generated text")
files: list[Any] = Field(default_factory=list, description="Files produced during execution")
usage: Any | None = Field(default=None, description="LLM usage statistics")
finish_reason: str | None = Field(default=None, description="Reason for completion")

View File

@ -0,0 +1,468 @@
import json
import logging
from collections.abc import Generator
from copy import deepcopy
from typing import Any, Union
from core.agent.base_agent_runner import BaseAgentRunner
from core.app.apps.base_app_queue_manager import PublishFrom
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
from core.file import file_manager
from core.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
LLMUsage,
PromptMessage,
PromptMessageContentType,
SystemPromptMessage,
TextPromptMessageContent,
ToolPromptMessage,
UserPromptMessage,
)
from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
from core.tools.entities.tool_entities import ToolInvokeMeta
from core.tools.tool_engine import ToolEngine
from core.workflow.nodes.agent.exc import AgentMaxIterationError
from models.model import Message
logger = logging.getLogger(__name__)
class FunctionCallAgentRunner(BaseAgentRunner):
def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:
"""
Run FunctionCall agent application
"""
self.query = query
app_generate_entity = self.application_generate_entity
app_config = self.app_config
assert app_config is not None, "app_config is required"
assert app_config.agent is not None, "app_config.agent is required"
# convert tools into ModelRuntime Tool format
tool_instances, prompt_messages_tools = self._init_prompt_tools()
assert app_config.agent
iteration_step = 1
max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1
# continue to run until there is not any tool call
function_call_state = True
llm_usage: dict[str, LLMUsage | None] = {"usage": None}
final_answer = ""
prompt_messages: list = [] # Initialize prompt_messages
# get tracing instance
trace_manager = app_generate_entity.trace_manager
def increase_usage(final_llm_usage_dict: dict[str, LLMUsage | None], usage: LLMUsage):
if not final_llm_usage_dict["usage"]:
final_llm_usage_dict["usage"] = usage
else:
llm_usage = final_llm_usage_dict["usage"]
llm_usage.prompt_tokens += usage.prompt_tokens
llm_usage.completion_tokens += usage.completion_tokens
llm_usage.total_tokens += usage.total_tokens
llm_usage.prompt_price += usage.prompt_price
llm_usage.completion_price += usage.completion_price
llm_usage.total_price += usage.total_price
model_instance = self.model_instance
while function_call_state and iteration_step <= max_iteration_steps:
function_call_state = False
if iteration_step == max_iteration_steps:
# the last iteration, remove all tools
prompt_messages_tools = []
message_file_ids: list[str] = []
agent_thought_id = self.create_agent_thought(
message_id=message.id, message="", tool_name="", tool_input="", messages_ids=message_file_ids
)
# recalc llm max tokens
prompt_messages = self._organize_prompt_messages()
self.recalc_llm_max_tokens(self.model_config, prompt_messages)
# invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters=app_generate_entity.model_conf.parameters,
tools=prompt_messages_tools,
stop=app_generate_entity.model_conf.stop,
stream=self.stream_tool_call,
user=self.user_id,
callbacks=[],
)
tool_calls: list[tuple[str, str, dict[str, Any]]] = []
# save full response
response = ""
# save tool call names and inputs
tool_call_names = ""
tool_call_inputs = ""
current_llm_usage = None
if isinstance(chunks, Generator):
is_first_chunk = True
for chunk in chunks:
if is_first_chunk:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
is_first_chunk = False
# check if there is any tool call
if self.check_tool_calls(chunk):
function_call_state = True
tool_calls.extend(self.extract_tool_calls(chunk) or [])
tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls])
try:
tool_call_inputs = json.dumps(
{tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False
)
except TypeError:
# fallback: force ASCII to handle non-serializable objects
tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls})
if chunk.delta.message and chunk.delta.message.content:
if isinstance(chunk.delta.message.content, list):
for content in chunk.delta.message.content:
response += content.data
else:
response += str(chunk.delta.message.content)
if chunk.delta.usage:
increase_usage(llm_usage, chunk.delta.usage)
current_llm_usage = chunk.delta.usage
yield chunk
else:
result = chunks
# check if there is any tool call
if self.check_blocking_tool_calls(result):
function_call_state = True
tool_calls.extend(self.extract_blocking_tool_calls(result) or [])
tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls])
try:
tool_call_inputs = json.dumps(
{tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False
)
except TypeError:
# fallback: force ASCII to handle non-serializable objects
tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls})
if result.usage:
increase_usage(llm_usage, result.usage)
current_llm_usage = result.usage
if result.message and result.message.content:
if isinstance(result.message.content, list):
for content in result.message.content:
response += content.data
else:
response += str(result.message.content)
if not result.message.content:
result.message.content = ""
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
yield LLMResultChunk(
model=model_instance.model,
prompt_messages=result.prompt_messages,
system_fingerprint=result.system_fingerprint,
delta=LLMResultChunkDelta(
index=0,
message=result.message,
usage=result.usage,
),
)
assistant_message = AssistantPromptMessage(content=response, tool_calls=[])
if tool_calls:
assistant_message.tool_calls = [
AssistantPromptMessage.ToolCall(
id=tool_call[0],
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(
name=tool_call[1], arguments=json.dumps(tool_call[2], ensure_ascii=False)
),
)
for tool_call in tool_calls
]
self._current_thoughts.append(assistant_message)
# save thought
self.save_agent_thought(
agent_thought_id=agent_thought_id,
tool_name=tool_call_names,
tool_input=tool_call_inputs,
thought=response,
tool_invoke_meta=None,
observation=None,
answer=response,
messages_ids=[],
llm_usage=current_llm_usage,
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
final_answer += response + "\n"
# Check if max iteration is reached and model still wants to call tools
if iteration_step == max_iteration_steps and tool_calls:
raise AgentMaxIterationError(app_config.agent.max_iteration)
# call tools
tool_responses = []
for tool_call_id, tool_call_name, tool_call_args in tool_calls:
tool_instance = tool_instances.get(tool_call_name)
if not tool_instance:
tool_response = {
"tool_call_id": tool_call_id,
"tool_call_name": tool_call_name,
"tool_response": f"there is not a tool named {tool_call_name}",
"meta": ToolInvokeMeta.error_instance(f"there is not a tool named {tool_call_name}").to_dict(),
}
else:
# invoke tool
tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(
tool=tool_instance,
tool_parameters=tool_call_args,
user_id=self.user_id,
tenant_id=self.tenant_id,
message=self.message,
invoke_from=self.application_generate_entity.invoke_from,
agent_tool_callback=self.agent_callback,
trace_manager=trace_manager,
app_id=self.application_generate_entity.app_config.app_id,
message_id=self.message.id,
conversation_id=self.conversation.id,
)
# publish files
for message_file_id in message_files:
# publish message file
self.queue_manager.publish(
QueueMessageFileEvent(message_file_id=message_file_id), PublishFrom.APPLICATION_MANAGER
)
# add message file ids
message_file_ids.append(message_file_id)
tool_response = {
"tool_call_id": tool_call_id,
"tool_call_name": tool_call_name,
"tool_response": tool_invoke_response,
"meta": tool_invoke_meta.to_dict(),
}
tool_responses.append(tool_response)
if tool_response["tool_response"] is not None:
self._current_thoughts.append(
ToolPromptMessage(
content=str(tool_response["tool_response"]),
tool_call_id=tool_call_id,
name=tool_call_name,
)
)
if len(tool_responses) > 0:
# save agent thought
self.save_agent_thought(
agent_thought_id=agent_thought_id,
tool_name="",
tool_input="",
thought="",
tool_invoke_meta={
tool_response["tool_call_name"]: tool_response["meta"] for tool_response in tool_responses
},
observation={
tool_response["tool_call_name"]: tool_response["tool_response"]
for tool_response in tool_responses
},
answer="",
messages_ids=message_file_ids,
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
# update prompt tool
for prompt_tool in prompt_messages_tools:
self.update_prompt_message_tool(tool_instances[prompt_tool.name], prompt_tool)
iteration_step += 1
# publish end event
self.queue_manager.publish(
QueueMessageEndEvent(
llm_result=LLMResult(
model=model_instance.model,
prompt_messages=prompt_messages,
message=AssistantPromptMessage(content=final_answer),
usage=llm_usage["usage"] or LLMUsage.empty_usage(),
system_fingerprint="",
)
),
PublishFrom.APPLICATION_MANAGER,
)
def check_tool_calls(self, llm_result_chunk: LLMResultChunk) -> bool:
"""
Check if there is any tool call in llm result chunk
"""
if llm_result_chunk.delta.message.tool_calls:
return True
return False
def check_blocking_tool_calls(self, llm_result: LLMResult) -> bool:
"""
Check if there is any blocking tool call in llm result
"""
if llm_result.message.tool_calls:
return True
return False
def extract_tool_calls(self, llm_result_chunk: LLMResultChunk) -> list[tuple[str, str, dict[str, Any]]]:
"""
Extract tool calls from llm result chunk
Returns:
List[Tuple[str, str, Dict[str, Any]]]: [(tool_call_id, tool_call_name, tool_call_args)]
"""
tool_calls = []
for prompt_message in llm_result_chunk.delta.message.tool_calls:
args = {}
if prompt_message.function.arguments != "":
args = json.loads(prompt_message.function.arguments)
tool_calls.append(
(
prompt_message.id,
prompt_message.function.name,
args,
)
)
return tool_calls
def extract_blocking_tool_calls(self, llm_result: LLMResult) -> list[tuple[str, str, dict[str, Any]]]:
"""
Extract blocking tool calls from llm result
Returns:
List[Tuple[str, str, Dict[str, Any]]]: [(tool_call_id, tool_call_name, tool_call_args)]
"""
tool_calls = []
for prompt_message in llm_result.message.tool_calls:
args = {}
if prompt_message.function.arguments != "":
args = json.loads(prompt_message.function.arguments)
tool_calls.append(
(
prompt_message.id,
prompt_message.function.name,
args,
)
)
return tool_calls
def _init_system_message(self, prompt_template: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Initialize system message
"""
if not prompt_messages and prompt_template:
return [
SystemPromptMessage(content=prompt_template),
]
if prompt_messages and not isinstance(prompt_messages[0], SystemPromptMessage) and prompt_template:
prompt_messages.insert(0, SystemPromptMessage(content=prompt_template))
return prompt_messages or []
def _organize_user_query(self, query: str, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
Organize user query
"""
if self.files:
# get image detail config
image_detail_config = (
self.application_generate_entity.file_upload_config.image_config.detail
if (
self.application_generate_entity.file_upload_config
and self.application_generate_entity.file_upload_config.image_config
)
else None
)
image_detail_config = image_detail_config or ImagePromptMessageContent.DETAIL.LOW
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
for file in self.files:
prompt_message_contents.append(
file_manager.to_prompt_message_content(
file,
image_detail_config=image_detail_config,
)
)
prompt_message_contents.append(TextPromptMessageContent(data=query))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
prompt_messages.append(UserPromptMessage(content=query))
return prompt_messages
def _clear_user_prompt_image_messages(self, prompt_messages: list[PromptMessage]) -> list[PromptMessage]:
"""
As for now, gpt supports both fc and vision at the first iteration.
We need to remove the image messages from the prompt messages at the first iteration.
"""
prompt_messages = deepcopy(prompt_messages)
for prompt_message in prompt_messages:
if isinstance(prompt_message, UserPromptMessage):
if isinstance(prompt_message.content, list):
prompt_message.content = "\n".join(
[
content.data
if content.type == PromptMessageContentType.TEXT
else "[image]"
if content.type == PromptMessageContentType.IMAGE
else "[file]"
for content in prompt_message.content
]
)
return prompt_messages
def _organize_prompt_messages(self):
prompt_template = self.app_config.prompt_template.simple_prompt_template or ""
self.history_prompt_messages = self._init_system_message(prompt_template, self.history_prompt_messages)
query_prompt_messages = self._organize_user_query(self.query or "", [])
self.history_prompt_messages = AgentHistoryPromptTransform(
model_config=self.model_config,
prompt_messages=[*query_prompt_messages, *self._current_thoughts],
history_messages=self.history_prompt_messages,
memory=self.memory,
).get_prompt()
prompt_messages = [*self.history_prompt_messages, *query_prompt_messages, *self._current_thoughts]
if len(self._current_thoughts) != 0:
# clear messages after the first iteration
prompt_messages = self._clear_user_prompt_image_messages(prompt_messages)
return prompt_messages

View File

@ -1,55 +0,0 @@
# Agent Patterns
A unified agent pattern module that powers both Agent V2 workflow nodes and agent applications. Strategies share a common execution contract while adapting to model capabilities and tool availability.
## Overview
The module applies a strategy pattern around LLM/tool orchestration. `StrategyFactory` auto-selects the best implementation based on model features or an explicit agent strategy, and each strategy streams logs and usage consistently.
## Key Features
- **Dual strategies**
- `FunctionCallStrategy`: uses native LLM function/tool calling when the model exposes `TOOL_CALL`, `MULTI_TOOL_CALL`, or `STREAM_TOOL_CALL`.
- `ReActStrategy`: ReAct (reasoning + acting) flow driven by `CotAgentOutputParser`, used when function calling is unavailable or explicitly requested.
- **Explicit or auto selection**
- `StrategyFactory.create_strategy` prefers an explicit `AgentEntity.Strategy` (FUNCTION_CALLING or CHAIN_OF_THOUGHT).
- Otherwise it falls back to function calling when tool-call features exist, or ReAct when they do not.
- **Unified execution contract**
- `AgentPattern.run` yields streaming `AgentLog` entries and `LLMResultChunk` data, returning an `AgentResult` with text, files, usage, and `finish_reason`.
- Iterations are configurable and hard-capped at 99 rounds; the last round forces a final answer by withholding tools.
- **Tool handling and hooks**
- Tools convert to `PromptMessageTool` objects before invocation.
- Optional `tool_invoke_hook` lets callers override tool execution (e.g., agent apps) while workflow runs use `ToolEngine.generic_invoke`.
- Tool outputs support text, links, JSON, variables, blobs, retriever resources, and file attachments; `target=="self"` files are reloaded into model context, others are returned as outputs.
- **File-aware arguments**
- Tool args accept `[File: <id>]` or `[Files: <id1, id2>]` placeholders that resolve to `File` objects before invocation, enabling models to reference uploaded files safely.
- **ReAct prompt shaping**
- System prompts replace `{{instruction}}`, `{{tools}}`, and `{{tool_names}}` placeholders.
- Adds `Observation` to stop sequences and appends scratchpad text so the model sees prior Thought/Action/Observation history.
- **Observability and accounting**
- Standardized `AgentLog` entries for rounds, model thoughts, and tool calls, including usage aggregation (`LLMUsage`) across streaming and non-streaming paths.
## Architecture
```
agent/patterns/
├── base.py # Shared utilities: logging, usage, tool invocation, file handling
├── function_call.py # Native function-calling loop with tool execution
├── react.py # ReAct loop with CoT parsing and scratchpad wiring
└── strategy_factory.py # Strategy selection by model features or explicit override
```
## Usage
- For auto-selection:
- Call `StrategyFactory.create_strategy(model_features, model_instance, context, tools, files, ...)` and run the returned strategy with prompt messages and model params.
- For explicit behavior:
- Pass `agent_strategy=AgentEntity.Strategy.FUNCTION_CALLING` to force native calls (falls back to ReAct if unsupported), or `CHAIN_OF_THOUGHT` to force ReAct.
- Both strategies stream chunks and logs; collect the generator output until it returns an `AgentResult`.
## Integration Points
- **Model runtime**: delegates to `ModelInstance.invoke_llm` for both streaming and non-streaming calls.
- **Tool system**: defaults to `ToolEngine.generic_invoke`, with `tool_invoke_hook` for custom callers.
- **Files**: flows through `File` objects for tool inputs/outputs and model-context attachments.
- **Execution context**: `ExecutionContext` fields (user/app/conversation/message) propagate to tool invocations and logging.

View File

@ -1,19 +0,0 @@
"""Agent patterns module.
This module provides different strategies for agent execution:
- FunctionCallStrategy: Uses native function/tool calling
- ReActStrategy: Uses ReAct (Reasoning + Acting) approach
- StrategyFactory: Factory for creating strategies based on model features
"""
from .base import AgentPattern
from .function_call import FunctionCallStrategy
from .react import ReActStrategy
from .strategy_factory import StrategyFactory
__all__ = [
"AgentPattern",
"FunctionCallStrategy",
"ReActStrategy",
"StrategyFactory",
]

View File

@ -1,474 +0,0 @@
"""Base class for agent strategies."""
from __future__ import annotations
import json
import re
import time
from abc import ABC, abstractmethod
from collections.abc import Callable, Generator
from typing import TYPE_CHECKING, Any
from core.agent.entities import AgentLog, AgentResult, ExecutionContext
from core.file import File
from core.model_manager import ModelInstance
from core.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
PromptMessage,
PromptMessageTool,
)
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.entities.message_entities import TextPromptMessageContent
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolInvokeMeta
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
# Type alias for tool invoke hook
# Returns: (response_content, message_file_ids, tool_invoke_meta)
ToolInvokeHook = Callable[["Tool", dict[str, Any], str], tuple[str, list[str], ToolInvokeMeta]]
class AgentPattern(ABC):
"""Base class for agent execution strategies."""
def __init__(
self,
model_instance: ModelInstance,
tools: list[Tool],
context: ExecutionContext,
max_iterations: int = 10,
workflow_call_depth: int = 0,
files: list[File] = [],
tool_invoke_hook: ToolInvokeHook | None = None,
):
"""Initialize the agent strategy."""
self.model_instance = model_instance
self.tools = tools
self.context = context
self.max_iterations = min(max_iterations, 99) # Cap at 99 iterations
self.workflow_call_depth = workflow_call_depth
self.files: list[File] = files
self.tool_invoke_hook = tool_invoke_hook
@abstractmethod
def run(
self,
prompt_messages: list[PromptMessage],
model_parameters: dict[str, Any],
stop: list[str] = [],
stream: bool = True,
) -> Generator[LLMResultChunk | AgentLog, None, AgentResult]:
"""Execute the agent strategy."""
pass
def _accumulate_usage(self, total_usage: dict[str, Any], delta_usage: LLMUsage) -> None:
"""Accumulate LLM usage statistics."""
if not total_usage.get("usage"):
# Create a copy to avoid modifying the original
total_usage["usage"] = LLMUsage(
prompt_tokens=delta_usage.prompt_tokens,
prompt_unit_price=delta_usage.prompt_unit_price,
prompt_price_unit=delta_usage.prompt_price_unit,
prompt_price=delta_usage.prompt_price,
completion_tokens=delta_usage.completion_tokens,
completion_unit_price=delta_usage.completion_unit_price,
completion_price_unit=delta_usage.completion_price_unit,
completion_price=delta_usage.completion_price,
total_tokens=delta_usage.total_tokens,
total_price=delta_usage.total_price,
currency=delta_usage.currency,
latency=delta_usage.latency,
)
else:
current: LLMUsage = total_usage["usage"]
current.prompt_tokens += delta_usage.prompt_tokens
current.completion_tokens += delta_usage.completion_tokens
current.total_tokens += delta_usage.total_tokens
current.prompt_price += delta_usage.prompt_price
current.completion_price += delta_usage.completion_price
current.total_price += delta_usage.total_price
def _extract_content(self, content: Any) -> str:
"""Extract text content from message content."""
if isinstance(content, list):
# Content items are PromptMessageContentUnionTypes
text_parts = []
for c in content:
# Check if it's a TextPromptMessageContent (which has data attribute)
if isinstance(c, TextPromptMessageContent):
text_parts.append(c.data)
return "".join(text_parts)
return str(content)
def _has_tool_calls(self, chunk: LLMResultChunk) -> bool:
"""Check if chunk contains tool calls."""
# LLMResultChunk always has delta attribute
return bool(chunk.delta.message and chunk.delta.message.tool_calls)
def _has_tool_calls_result(self, result: LLMResult) -> bool:
"""Check if result contains tool calls (non-streaming)."""
# LLMResult always has message attribute
return bool(result.message and result.message.tool_calls)
def _extract_tool_calls(self, chunk: LLMResultChunk) -> list[tuple[str, str, dict[str, Any]]]:
"""Extract tool calls from streaming chunk."""
tool_calls: list[tuple[str, str, dict[str, Any]]] = []
if chunk.delta.message and chunk.delta.message.tool_calls:
for tool_call in chunk.delta.message.tool_calls:
if tool_call.function:
try:
args = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
except json.JSONDecodeError:
args = {}
tool_calls.append((tool_call.id or "", tool_call.function.name, args))
return tool_calls
def _extract_tool_calls_result(self, result: LLMResult) -> list[tuple[str, str, dict[str, Any]]]:
"""Extract tool calls from non-streaming result."""
tool_calls = []
if result.message and result.message.tool_calls:
for tool_call in result.message.tool_calls:
if tool_call.function:
try:
args = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
except json.JSONDecodeError:
args = {}
tool_calls.append((tool_call.id or "", tool_call.function.name, args))
return tool_calls
def _extract_text_from_message(self, message: PromptMessage) -> str:
"""Extract text content from a prompt message."""
# PromptMessage always has content attribute
content = message.content
if isinstance(content, str):
return content
elif isinstance(content, list):
# Extract text from content list
text_parts = []
for item in content:
if isinstance(item, TextPromptMessageContent):
text_parts.append(item.data)
return " ".join(text_parts)
return ""
def _get_tool_metadata(self, tool_instance: Tool) -> dict[AgentLog.LogMetadata, Any]:
"""Get metadata for a tool including provider and icon info."""
from core.tools.tool_manager import ToolManager
metadata: dict[AgentLog.LogMetadata, Any] = {}
if tool_instance.entity and tool_instance.entity.identity:
identity = tool_instance.entity.identity
if identity.provider:
metadata[AgentLog.LogMetadata.PROVIDER] = identity.provider
# Get icon using ToolManager for proper URL generation
tenant_id = self.context.tenant_id
if tenant_id and identity.provider:
try:
provider_type = tool_instance.tool_provider_type()
icon = ToolManager.get_tool_icon(tenant_id, provider_type, identity.provider)
if isinstance(icon, str):
metadata[AgentLog.LogMetadata.ICON] = icon
elif isinstance(icon, dict):
# Handle icon dict with background/content or light/dark variants
metadata[AgentLog.LogMetadata.ICON] = icon
except Exception:
# Fallback to identity.icon if ToolManager fails
if identity.icon:
metadata[AgentLog.LogMetadata.ICON] = identity.icon
elif identity.icon:
metadata[AgentLog.LogMetadata.ICON] = identity.icon
return metadata
def _create_log(
self,
label: str,
log_type: AgentLog.LogType,
status: AgentLog.LogStatus,
data: dict[str, Any] | None = None,
parent_id: str | None = None,
extra_metadata: dict[AgentLog.LogMetadata, Any] | None = None,
) -> AgentLog:
"""Create a new AgentLog with standard metadata."""
metadata: dict[AgentLog.LogMetadata, Any] = {
AgentLog.LogMetadata.STARTED_AT: time.perf_counter(),
}
if extra_metadata:
metadata.update(extra_metadata)
return AgentLog(
label=label,
log_type=log_type,
status=status,
data=data or {},
parent_id=parent_id,
metadata=metadata,
)
def _finish_log(
self,
log: AgentLog,
data: dict[str, Any] | None = None,
usage: LLMUsage | None = None,
) -> AgentLog:
"""Finish an AgentLog by updating its status and metadata."""
log.status = AgentLog.LogStatus.SUCCESS
if data is not None:
log.data = data
# Calculate elapsed time
started_at = log.metadata.get(AgentLog.LogMetadata.STARTED_AT, time.perf_counter())
finished_at = time.perf_counter()
# Update metadata
log.metadata = {
**log.metadata,
AgentLog.LogMetadata.FINISHED_AT: finished_at,
# Calculate elapsed time in seconds
AgentLog.LogMetadata.ELAPSED_TIME: round(finished_at - started_at, 4),
}
# Add usage information if provided
if usage:
log.metadata.update(
{
AgentLog.LogMetadata.TOTAL_PRICE: usage.total_price,
AgentLog.LogMetadata.CURRENCY: usage.currency,
AgentLog.LogMetadata.TOTAL_TOKENS: usage.total_tokens,
AgentLog.LogMetadata.LLM_USAGE: usage,
}
)
return log
def _replace_file_references(self, tool_args: dict[str, Any]) -> dict[str, Any]:
"""
Replace file references in tool arguments with actual File objects.
Args:
tool_args: Dictionary of tool arguments
Returns:
Updated tool arguments with file references replaced
"""
# Process each argument in the dictionary
processed_args: dict[str, Any] = {}
for key, value in tool_args.items():
processed_args[key] = self._process_file_reference(value)
return processed_args
def _process_file_reference(self, data: Any) -> Any:
"""
Recursively process data to replace file references.
Supports both single file [File: file_id] and multiple files [Files: file_id1, file_id2, ...].
Args:
data: The data to process (can be dict, list, str, or other types)
Returns:
Processed data with file references replaced
"""
single_file_pattern = re.compile(r"^\[File:\s*([^\]]+)\]$")
multiple_files_pattern = re.compile(r"^\[Files:\s*([^\]]+)\]$")
if isinstance(data, dict):
# Process dictionary recursively
return {key: self._process_file_reference(value) for key, value in data.items()}
elif isinstance(data, list):
# Process list recursively
return [self._process_file_reference(item) for item in data]
elif isinstance(data, str):
# Check for single file pattern [File: file_id]
single_match = single_file_pattern.match(data.strip())
if single_match:
file_id = single_match.group(1).strip()
# Find the file in self.files
for file in self.files:
if file.id and str(file.id) == file_id:
return file
# If file not found, return original value
return data
# Check for multiple files pattern [Files: file_id1, file_id2, ...]
multiple_match = multiple_files_pattern.match(data.strip())
if multiple_match:
file_ids_str = multiple_match.group(1).strip()
# Split by comma and strip whitespace
file_ids = [fid.strip() for fid in file_ids_str.split(",")]
# Find all matching files
matched_files: list[File] = []
for file_id in file_ids:
for file in self.files:
if file.id and str(file.id) == file_id:
matched_files.append(file)
break
# Return list of files if any were found, otherwise return original
return matched_files or data
return data
else:
# Return other types as-is
return data
def _create_text_chunk(self, text: str, prompt_messages: list[PromptMessage]) -> LLMResultChunk:
"""Create a text chunk for streaming."""
return LLMResultChunk(
model=self.model_instance.model,
prompt_messages=prompt_messages,
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content=text),
usage=None,
),
system_fingerprint="",
)
def _invoke_tool(
self,
tool_instance: Tool,
tool_args: dict[str, Any],
tool_name: str,
) -> tuple[str, list[File], ToolInvokeMeta | None]:
"""
Invoke a tool and collect its response.
Args:
tool_instance: The tool instance to invoke
tool_args: Tool arguments
tool_name: Name of the tool
Returns:
Tuple of (response_content, tool_files, tool_invoke_meta)
"""
# Process tool_args to replace file references with actual File objects
tool_args = self._replace_file_references(tool_args)
# If a tool invoke hook is set, use it instead of generic_invoke
if self.tool_invoke_hook:
response_content, _, tool_invoke_meta = self.tool_invoke_hook(tool_instance, tool_args, tool_name)
# Note: message_file_ids are stored in DB, we don't convert them to File objects here
# The caller (AgentAppRunner) handles file publishing
return response_content, [], tool_invoke_meta
# Default: use generic_invoke for workflow scenarios
# Import here to avoid circular import
from core.tools.tool_engine import DifyWorkflowCallbackHandler, ToolEngine
tool_response = ToolEngine().generic_invoke(
tool=tool_instance,
tool_parameters=tool_args,
user_id=self.context.user_id or "",
workflow_tool_callback=DifyWorkflowCallbackHandler(),
workflow_call_depth=self.workflow_call_depth,
app_id=self.context.app_id,
conversation_id=self.context.conversation_id,
message_id=self.context.message_id,
)
# Collect response and files
response_content = ""
tool_files: list[File] = []
for response in tool_response:
if response.type == ToolInvokeMessage.MessageType.TEXT:
assert isinstance(response.message, ToolInvokeMessage.TextMessage)
response_content += response.message.text
elif response.type == ToolInvokeMessage.MessageType.LINK:
# Handle link messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
response_content += f"[Link: {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.IMAGE:
# Handle image URL messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
response_content += f"[Image: {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.IMAGE_LINK:
# Handle image link messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
response_content += f"[Image: {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.BINARY_LINK:
# Handle binary file link messages
if isinstance(response.message, ToolInvokeMessage.TextMessage):
filename = response.meta.get("filename", "file") if response.meta else "file"
response_content += f"[File: {filename} - {response.message.text}]"
elif response.type == ToolInvokeMessage.MessageType.JSON:
# Handle JSON messages
if isinstance(response.message, ToolInvokeMessage.JsonMessage):
response_content += json.dumps(response.message.json_object, ensure_ascii=False, indent=2)
elif response.type == ToolInvokeMessage.MessageType.BLOB:
# Handle blob messages - convert to text representation
if isinstance(response.message, ToolInvokeMessage.BlobMessage):
mime_type = (
response.meta.get("mime_type", "application/octet-stream")
if response.meta
else "application/octet-stream"
)
size = len(response.message.blob)
response_content += f"[Binary data: {mime_type}, size: {size} bytes]"
elif response.type == ToolInvokeMessage.MessageType.VARIABLE:
# Handle variable messages
if isinstance(response.message, ToolInvokeMessage.VariableMessage):
var_name = response.message.variable_name
var_value = response.message.variable_value
if isinstance(var_value, str):
response_content += var_value
else:
response_content += f"[Variable {var_name}: {json.dumps(var_value, ensure_ascii=False)}]"
elif response.type == ToolInvokeMessage.MessageType.BLOB_CHUNK:
# Handle blob chunk messages - these are parts of a larger blob
if isinstance(response.message, ToolInvokeMessage.BlobChunkMessage):
response_content += f"[Blob chunk {response.message.sequence}: {len(response.message.blob)} bytes]"
elif response.type == ToolInvokeMessage.MessageType.RETRIEVER_RESOURCES:
# Handle retriever resources messages
if isinstance(response.message, ToolInvokeMessage.RetrieverResourceMessage):
response_content += response.message.context
elif response.type == ToolInvokeMessage.MessageType.FILE:
# Extract file from meta
if response.meta and "file" in response.meta:
file = response.meta["file"]
if isinstance(file, File):
# Check if file is for model or tool output
if response.meta.get("target") == "self":
# File is for model - add to files for next prompt
self.files.append(file)
response_content += f"File '{file.filename}' has been loaded into your context."
else:
# File is tool output
tool_files.append(file)
return response_content, tool_files, None
def _find_tool_by_name(self, tool_name: str) -> Tool | None:
"""Find a tool instance by its name."""
for tool in self.tools:
if tool.entity.identity.name == tool_name:
return tool
return None
def _convert_tools_to_prompt_format(self) -> list[PromptMessageTool]:
"""Convert tools to prompt message format."""
prompt_tools: list[PromptMessageTool] = []
for tool in self.tools:
prompt_tools.append(tool.to_prompt_message_tool())
return prompt_tools
def _update_usage_with_empty(self, llm_usage: dict[str, Any]) -> None:
"""Initialize usage tracking with empty usage if not set."""
if "usage" not in llm_usage or llm_usage["usage"] is None:
llm_usage["usage"] = LLMUsage.empty_usage()

View File

@ -1,299 +0,0 @@
"""Function Call strategy implementation."""
import json
from collections.abc import Generator
from typing import Any, Union
from core.agent.entities import AgentLog, AgentResult
from core.file import File
from core.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
LLMUsage,
PromptMessage,
PromptMessageTool,
ToolPromptMessage,
)
from core.tools.entities.tool_entities import ToolInvokeMeta
from .base import AgentPattern
class FunctionCallStrategy(AgentPattern):
"""Function Call strategy using model's native tool calling capability."""
def run(
self,
prompt_messages: list[PromptMessage],
model_parameters: dict[str, Any],
stop: list[str] = [],
stream: bool = True,
) -> Generator[LLMResultChunk | AgentLog, None, AgentResult]:
"""Execute the function call agent strategy."""
# Convert tools to prompt format
prompt_tools: list[PromptMessageTool] = self._convert_tools_to_prompt_format()
# Initialize tracking
iteration_step: int = 1
max_iterations: int = self.max_iterations + 1
function_call_state: bool = True
total_usage: dict[str, LLMUsage | None] = {"usage": None}
messages: list[PromptMessage] = list(prompt_messages) # Create mutable copy
final_text: str = ""
finish_reason: str | None = None
output_files: list[File] = [] # Track files produced by tools
while function_call_state and iteration_step <= max_iterations:
function_call_state = False
round_log = self._create_log(
label=f"ROUND {iteration_step}",
log_type=AgentLog.LogType.ROUND,
status=AgentLog.LogStatus.START,
data={},
)
yield round_log
# On last iteration, remove tools to force final answer
current_tools: list[PromptMessageTool] = [] if iteration_step == max_iterations else prompt_tools
model_log = self._create_log(
label=f"{self.model_instance.model} Thought",
log_type=AgentLog.LogType.THOUGHT,
status=AgentLog.LogStatus.START,
data={},
parent_id=round_log.id,
extra_metadata={
AgentLog.LogMetadata.PROVIDER: self.model_instance.provider,
},
)
yield model_log
# Track usage for this round only
round_usage: dict[str, LLMUsage | None] = {"usage": None}
# Invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = self.model_instance.invoke_llm(
prompt_messages=messages,
model_parameters=model_parameters,
tools=current_tools,
stop=stop,
stream=stream,
user=self.context.user_id,
callbacks=[],
)
# Process response
tool_calls, response_content, chunk_finish_reason = yield from self._handle_chunks(
chunks, round_usage, model_log
)
messages.append(self._create_assistant_message(response_content, tool_calls))
# Accumulate to total usage
round_usage_value = round_usage.get("usage")
if round_usage_value:
self._accumulate_usage(total_usage, round_usage_value)
# Update final text if no tool calls (this is likely the final answer)
if not tool_calls:
final_text = response_content
# Update finish reason
if chunk_finish_reason:
finish_reason = chunk_finish_reason
# Process tool calls
tool_outputs: dict[str, str] = {}
if tool_calls:
function_call_state = True
# Execute tools
for tool_call_id, tool_name, tool_args in tool_calls:
tool_response, tool_files, _ = yield from self._handle_tool_call(
tool_name, tool_args, tool_call_id, messages, round_log
)
tool_outputs[tool_name] = tool_response
# Track files produced by tools
output_files.extend(tool_files)
yield self._finish_log(
round_log,
data={
"llm_result": response_content,
"tool_calls": [
{"name": tc[1], "args": tc[2], "output": tool_outputs.get(tc[1], "")} for tc in tool_calls
]
if tool_calls
else [],
"final_answer": final_text if not function_call_state else None,
},
usage=round_usage.get("usage"),
)
iteration_step += 1
# Return final result
from core.agent.entities import AgentResult
return AgentResult(
text=final_text,
files=output_files,
usage=total_usage.get("usage") or LLMUsage.empty_usage(),
finish_reason=finish_reason,
)
def _handle_chunks(
self,
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult],
llm_usage: dict[str, LLMUsage | None],
start_log: AgentLog,
) -> Generator[
LLMResultChunk | AgentLog,
None,
tuple[list[tuple[str, str, dict[str, Any]]], str, str | None],
]:
"""Handle LLM response chunks and extract tool calls and content.
Returns a tuple of (tool_calls, response_content, finish_reason).
"""
tool_calls: list[tuple[str, str, dict[str, Any]]] = []
response_content: str = ""
finish_reason: str | None = None
if isinstance(chunks, Generator):
# Streaming response
for chunk in chunks:
# Extract tool calls
if self._has_tool_calls(chunk):
tool_calls.extend(self._extract_tool_calls(chunk))
# Extract content
if chunk.delta.message and chunk.delta.message.content:
response_content += self._extract_content(chunk.delta.message.content)
# Track usage
if chunk.delta.usage:
self._accumulate_usage(llm_usage, chunk.delta.usage)
# Capture finish reason
if chunk.delta.finish_reason:
finish_reason = chunk.delta.finish_reason
yield chunk
else:
# Non-streaming response
result: LLMResult = chunks
if self._has_tool_calls_result(result):
tool_calls.extend(self._extract_tool_calls_result(result))
if result.message and result.message.content:
response_content += self._extract_content(result.message.content)
if result.usage:
self._accumulate_usage(llm_usage, result.usage)
# Convert to streaming format
yield LLMResultChunk(
model=result.model,
prompt_messages=result.prompt_messages,
delta=LLMResultChunkDelta(index=0, message=result.message, usage=result.usage),
)
yield self._finish_log(
start_log,
data={
"result": response_content,
},
usage=llm_usage.get("usage"),
)
return tool_calls, response_content, finish_reason
def _create_assistant_message(
self, content: str, tool_calls: list[tuple[str, str, dict[str, Any]]] | None = None
) -> AssistantPromptMessage:
"""Create assistant message with tool calls."""
if tool_calls is None:
return AssistantPromptMessage(content=content)
return AssistantPromptMessage(
content=content or "",
tool_calls=[
AssistantPromptMessage.ToolCall(
id=tc[0],
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(name=tc[1], arguments=json.dumps(tc[2])),
)
for tc in tool_calls
],
)
def _handle_tool_call(
self,
tool_name: str,
tool_args: dict[str, Any],
tool_call_id: str,
messages: list[PromptMessage],
round_log: AgentLog,
) -> Generator[AgentLog, None, tuple[str, list[File], ToolInvokeMeta | None]]:
"""Handle a single tool call and return response with files and meta."""
# Find tool
tool_instance = self._find_tool_by_name(tool_name)
if not tool_instance:
raise ValueError(f"Tool {tool_name} not found")
# Get tool metadata (provider, icon, etc.)
tool_metadata = self._get_tool_metadata(tool_instance)
# Create tool call log
tool_call_log = self._create_log(
label=f"CALL {tool_name}",
log_type=AgentLog.LogType.TOOL_CALL,
status=AgentLog.LogStatus.START,
data={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_args": tool_args,
},
parent_id=round_log.id,
extra_metadata=tool_metadata,
)
yield tool_call_log
# Invoke tool using base class method with error handling
try:
response_content, tool_files, tool_invoke_meta = self._invoke_tool(tool_instance, tool_args, tool_name)
yield self._finish_log(
tool_call_log,
data={
**tool_call_log.data,
"output": response_content,
"files": len(tool_files),
"meta": tool_invoke_meta.to_dict() if tool_invoke_meta else None,
},
)
final_content = response_content or "Tool executed successfully"
# Add tool response to messages
messages.append(
ToolPromptMessage(
content=final_content,
tool_call_id=tool_call_id,
name=tool_name,
)
)
return response_content, tool_files, tool_invoke_meta
except Exception as e:
# Tool invocation failed, yield error log
error_message = str(e)
tool_call_log.status = AgentLog.LogStatus.ERROR
tool_call_log.error = error_message
tool_call_log.data = {
**tool_call_log.data,
"error": error_message,
}
yield tool_call_log
# Add error message to conversation
error_content = f"Tool execution failed: {error_message}"
messages.append(
ToolPromptMessage(
content=error_content,
tool_call_id=tool_call_id,
name=tool_name,
)
)
return error_content, [], None

View File

@ -1,418 +0,0 @@
"""ReAct strategy implementation."""
from __future__ import annotations
import json
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, Union
from core.agent.entities import AgentLog, AgentResult, AgentScratchpadUnit, ExecutionContext
from core.agent.output_parser.cot_output_parser import CotAgentOutputParser
from core.file import File
from core.model_manager import ModelInstance
from core.model_runtime.entities import (
AssistantPromptMessage,
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
PromptMessage,
SystemPromptMessage,
)
from .base import AgentPattern, ToolInvokeHook
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
class ReActStrategy(AgentPattern):
"""ReAct strategy using reasoning and acting approach."""
def __init__(
self,
model_instance: ModelInstance,
tools: list[Tool],
context: ExecutionContext,
max_iterations: int = 10,
workflow_call_depth: int = 0,
files: list[File] = [],
tool_invoke_hook: ToolInvokeHook | None = None,
instruction: str = "",
):
"""Initialize the ReAct strategy with instruction support."""
super().__init__(
model_instance=model_instance,
tools=tools,
context=context,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
files=files,
tool_invoke_hook=tool_invoke_hook,
)
self.instruction = instruction
def run(
self,
prompt_messages: list[PromptMessage],
model_parameters: dict[str, Any],
stop: list[str] = [],
stream: bool = True,
) -> Generator[LLMResultChunk | AgentLog, None, AgentResult]:
"""Execute the ReAct agent strategy."""
# Initialize tracking
agent_scratchpad: list[AgentScratchpadUnit] = []
iteration_step: int = 1
max_iterations: int = self.max_iterations + 1
react_state: bool = True
total_usage: dict[str, Any] = {"usage": None}
output_files: list[File] = [] # Track files produced by tools
final_text: str = ""
finish_reason: str | None = None
# Add "Observation" to stop sequences
if "Observation" not in stop:
stop = stop.copy()
stop.append("Observation")
while react_state and iteration_step <= max_iterations:
react_state = False
round_log = self._create_log(
label=f"ROUND {iteration_step}",
log_type=AgentLog.LogType.ROUND,
status=AgentLog.LogStatus.START,
data={},
)
yield round_log
# Build prompt with/without tools based on iteration
include_tools = iteration_step < max_iterations
current_messages = self._build_prompt_with_react_format(
prompt_messages, agent_scratchpad, include_tools, self.instruction
)
model_log = self._create_log(
label=f"{self.model_instance.model} Thought",
log_type=AgentLog.LogType.THOUGHT,
status=AgentLog.LogStatus.START,
data={},
parent_id=round_log.id,
extra_metadata={
AgentLog.LogMetadata.PROVIDER: self.model_instance.provider,
},
)
yield model_log
# Track usage for this round only
round_usage: dict[str, Any] = {"usage": None}
# Use current messages directly (files are handled by base class if needed)
messages_to_use = current_messages
# Invoke model
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult] = self.model_instance.invoke_llm(
prompt_messages=messages_to_use,
model_parameters=model_parameters,
stop=stop,
stream=stream,
user=self.context.user_id or "",
callbacks=[],
)
# Process response
scratchpad, chunk_finish_reason = yield from self._handle_chunks(
chunks, round_usage, model_log, current_messages
)
agent_scratchpad.append(scratchpad)
# Accumulate to total usage
round_usage_value = round_usage.get("usage")
if round_usage_value:
self._accumulate_usage(total_usage, round_usage_value)
# Update finish reason
if chunk_finish_reason:
finish_reason = chunk_finish_reason
# Check if we have an action to execute
if scratchpad.action and scratchpad.action.action_name.lower() != "final answer":
react_state = True
# Execute tool
observation, tool_files = yield from self._handle_tool_call(
scratchpad.action, current_messages, round_log
)
scratchpad.observation = observation
# Track files produced by tools
output_files.extend(tool_files)
# Add observation to scratchpad for display
yield self._create_text_chunk(f"\nObservation: {observation}\n", current_messages)
else:
# Extract final answer
if scratchpad.action and scratchpad.action.action_input:
final_answer = scratchpad.action.action_input
if isinstance(final_answer, dict):
final_answer = json.dumps(final_answer, ensure_ascii=False)
final_text = str(final_answer)
elif scratchpad.thought:
# If no action but we have thought, use thought as final answer
final_text = scratchpad.thought
yield self._finish_log(
round_log,
data={
"thought": scratchpad.thought,
"action": scratchpad.action_str if scratchpad.action else None,
"observation": scratchpad.observation or None,
"final_answer": final_text if not react_state else None,
},
usage=round_usage.get("usage"),
)
iteration_step += 1
# Return final result
from core.agent.entities import AgentResult
return AgentResult(
text=final_text, files=output_files, usage=total_usage.get("usage"), finish_reason=finish_reason
)
def _build_prompt_with_react_format(
self,
original_messages: list[PromptMessage],
agent_scratchpad: list[AgentScratchpadUnit],
include_tools: bool = True,
instruction: str = "",
) -> list[PromptMessage]:
"""Build prompt messages with ReAct format."""
# Copy messages to avoid modifying original
messages = list(original_messages)
# Find and update the system prompt that should already exist
system_prompt_found = False
for i, msg in enumerate(messages):
if isinstance(msg, SystemPromptMessage):
system_prompt_found = True
# The system prompt from frontend already has the template, just replace placeholders
# Format tools
tools_str = ""
tool_names = []
if include_tools and self.tools:
# Convert tools to prompt message tools format
prompt_tools = [tool.to_prompt_message_tool() for tool in self.tools]
tool_names = [tool.name for tool in prompt_tools]
# Format tools as JSON for comprehensive information
from core.model_runtime.utils.encoders import jsonable_encoder
tools_str = json.dumps(jsonable_encoder(prompt_tools), indent=2)
tool_names_str = ", ".join(f'"{name}"' for name in tool_names)
else:
tools_str = "No tools available"
tool_names_str = ""
# Replace placeholders in the existing system prompt
updated_content = msg.content
assert isinstance(updated_content, str)
updated_content = updated_content.replace("{{instruction}}", instruction)
updated_content = updated_content.replace("{{tools}}", tools_str)
updated_content = updated_content.replace("{{tool_names}}", tool_names_str)
# Create new SystemPromptMessage with updated content
messages[i] = SystemPromptMessage(content=updated_content)
break
# If no system prompt found, that's unexpected but add scratchpad anyway
if not system_prompt_found:
# This shouldn't happen if frontend is working correctly
pass
# Format agent scratchpad
scratchpad_str = ""
if agent_scratchpad:
scratchpad_parts: list[str] = []
for unit in agent_scratchpad:
if unit.thought:
scratchpad_parts.append(f"Thought: {unit.thought}")
if unit.action_str:
scratchpad_parts.append(f"Action:\n```\n{unit.action_str}\n```")
if unit.observation:
scratchpad_parts.append(f"Observation: {unit.observation}")
scratchpad_str = "\n".join(scratchpad_parts)
# If there's a scratchpad, append it to the last message
if scratchpad_str:
messages.append(AssistantPromptMessage(content=scratchpad_str))
return messages
def _handle_chunks(
self,
chunks: Union[Generator[LLMResultChunk, None, None], LLMResult],
llm_usage: dict[str, Any],
model_log: AgentLog,
current_messages: list[PromptMessage],
) -> Generator[
LLMResultChunk | AgentLog,
None,
tuple[AgentScratchpadUnit, str | None],
]:
"""Handle LLM response chunks and extract action/thought.
Returns a tuple of (scratchpad_unit, finish_reason).
"""
usage_dict: dict[str, Any] = {}
# Convert non-streaming to streaming format if needed
if isinstance(chunks, LLMResult):
# Create a generator from the LLMResult
def result_to_chunks() -> Generator[LLMResultChunk, None, None]:
yield LLMResultChunk(
model=chunks.model,
prompt_messages=chunks.prompt_messages,
delta=LLMResultChunkDelta(
index=0,
message=chunks.message,
usage=chunks.usage,
finish_reason=None, # LLMResult doesn't have finish_reason, only streaming chunks do
),
system_fingerprint=chunks.system_fingerprint or "",
)
streaming_chunks = result_to_chunks()
else:
streaming_chunks = chunks
react_chunks = CotAgentOutputParser.handle_react_stream_output(streaming_chunks, usage_dict)
# Initialize scratchpad unit
scratchpad = AgentScratchpadUnit(
agent_response="",
thought="",
action_str="",
observation="",
action=None,
)
finish_reason: str | None = None
# Process chunks
for chunk in react_chunks:
if isinstance(chunk, AgentScratchpadUnit.Action):
# Action detected
action_str = json.dumps(chunk.model_dump())
scratchpad.agent_response = (scratchpad.agent_response or "") + action_str
scratchpad.action_str = action_str
scratchpad.action = chunk
yield self._create_text_chunk(json.dumps(chunk.model_dump()), current_messages)
else:
# Text chunk
chunk_text = str(chunk)
scratchpad.agent_response = (scratchpad.agent_response or "") + chunk_text
scratchpad.thought = (scratchpad.thought or "") + chunk_text
yield self._create_text_chunk(chunk_text, current_messages)
# Update usage
if usage_dict.get("usage"):
if llm_usage.get("usage"):
self._accumulate_usage(llm_usage, usage_dict["usage"])
else:
llm_usage["usage"] = usage_dict["usage"]
# Clean up thought
scratchpad.thought = (scratchpad.thought or "").strip() or "I am thinking about how to help you"
# Finish model log
yield self._finish_log(
model_log,
data={
"thought": scratchpad.thought,
"action": scratchpad.action_str if scratchpad.action else None,
},
usage=llm_usage.get("usage"),
)
return scratchpad, finish_reason
def _handle_tool_call(
self,
action: AgentScratchpadUnit.Action,
prompt_messages: list[PromptMessage],
round_log: AgentLog,
) -> Generator[AgentLog, None, tuple[str, list[File]]]:
"""Handle tool call and return observation with files."""
tool_name = action.action_name
tool_args: dict[str, Any] | str = action.action_input
# Find tool instance first to get metadata
tool_instance = self._find_tool_by_name(tool_name)
tool_metadata = self._get_tool_metadata(tool_instance) if tool_instance else {}
# Start tool log with tool metadata
tool_log = self._create_log(
label=f"CALL {tool_name}",
log_type=AgentLog.LogType.TOOL_CALL,
status=AgentLog.LogStatus.START,
data={
"tool_name": tool_name,
"tool_args": tool_args,
},
parent_id=round_log.id,
extra_metadata=tool_metadata,
)
yield tool_log
if not tool_instance:
# Finish tool log with error
yield self._finish_log(
tool_log,
data={
**tool_log.data,
"error": f"Tool {tool_name} not found",
},
)
return f"Tool {tool_name} not found", []
# Ensure tool_args is a dict
tool_args_dict: dict[str, Any]
if isinstance(tool_args, str):
try:
tool_args_dict = json.loads(tool_args)
except json.JSONDecodeError:
tool_args_dict = {"input": tool_args}
elif not isinstance(tool_args, dict):
tool_args_dict = {"input": str(tool_args)}
else:
tool_args_dict = tool_args
# Invoke tool using base class method with error handling
try:
response_content, tool_files, tool_invoke_meta = self._invoke_tool(tool_instance, tool_args_dict, tool_name)
# Finish tool log
yield self._finish_log(
tool_log,
data={
**tool_log.data,
"output": response_content,
"files": len(tool_files),
"meta": tool_invoke_meta.to_dict() if tool_invoke_meta else None,
},
)
return response_content or "Tool executed successfully", tool_files
except Exception as e:
# Tool invocation failed, yield error log
error_message = str(e)
tool_log.status = AgentLog.LogStatus.ERROR
tool_log.error = error_message
tool_log.data = {
**tool_log.data,
"error": error_message,
}
yield tool_log
return f"Tool execution failed: {error_message}", []

View File

@ -1,107 +0,0 @@
"""Strategy factory for creating agent strategies."""
from __future__ import annotations
from typing import TYPE_CHECKING
from core.agent.entities import AgentEntity, ExecutionContext
from core.file.models import File
from core.model_manager import ModelInstance
from core.model_runtime.entities.model_entities import ModelFeature
from .base import AgentPattern, ToolInvokeHook
from .function_call import FunctionCallStrategy
from .react import ReActStrategy
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
class StrategyFactory:
"""Factory for creating agent strategies based on model features."""
# Tool calling related features
TOOL_CALL_FEATURES = {ModelFeature.TOOL_CALL, ModelFeature.MULTI_TOOL_CALL, ModelFeature.STREAM_TOOL_CALL}
@staticmethod
def create_strategy(
model_features: list[ModelFeature],
model_instance: ModelInstance,
context: ExecutionContext,
tools: list[Tool],
files: list[File],
max_iterations: int = 10,
workflow_call_depth: int = 0,
agent_strategy: AgentEntity.Strategy | None = None,
tool_invoke_hook: ToolInvokeHook | None = None,
instruction: str = "",
) -> AgentPattern:
"""
Create an appropriate strategy based on model features.
Args:
model_features: List of model features/capabilities
model_instance: Model instance to use
context: Execution context containing trace/audit information
tools: Available tools
files: Available files
max_iterations: Maximum iterations for the strategy
workflow_call_depth: Depth of workflow calls
agent_strategy: Optional explicit strategy override
tool_invoke_hook: Optional hook for custom tool invocation (e.g., agent_invoke)
instruction: Optional instruction for ReAct strategy
Returns:
AgentStrategy instance
"""
# If explicit strategy is provided and it's Function Calling, try to use it if supported
if agent_strategy == AgentEntity.Strategy.FUNCTION_CALLING:
if set(model_features) & StrategyFactory.TOOL_CALL_FEATURES:
return FunctionCallStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
)
# Fallback to ReAct if FC is requested but not supported
# If explicit strategy is Chain of Thought (ReAct)
if agent_strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT:
return ReActStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
instruction=instruction,
)
# Default auto-selection logic
if set(model_features) & StrategyFactory.TOOL_CALL_FEATURES:
# Model supports native function calling
return FunctionCallStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
)
else:
# Use ReAct strategy for models without function calling
return ReActStrategy(
model_instance=model_instance,
context=context,
tools=tools,
files=files,
max_iterations=max_iterations,
workflow_call_depth=workflow_call_depth,
tool_invoke_hook=tool_invoke_hook,
instruction=instruction,
)

View File

@ -24,13 +24,11 @@ from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotAppStreamResponse
from core.app.layers.sandbox_layer import SandboxLayer
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import DifyCoreRepositoryFactory
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
@ -42,7 +40,6 @@ from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from models.workflow_features import WorkflowFeatures
from services.conversation_service import ConversationService
from services.workflow_draft_variable_service import (
DraftVarLoader,
@ -515,23 +512,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
if workflow is None:
raise ValueError("Workflow not found")
graph_engine_layers: tuple = ()
if workflow.get_feature(WorkflowFeatures.SANDBOX).enabled:
if application_generate_entity.workflow_run_id is None:
raise ValueError("workflow_run_id is required when sandbox is enabled")
graph_engine_layers = (
SandboxLayer(
tenant_id=application_generate_entity.app_config.tenant_id,
app_id=application_generate_entity.app_config.app_id,
workflow_version=workflow.version,
sandbox_id=application_generate_entity.workflow_run_id,
sandbox_storage=ArchiveSandboxStorage(
tenant_id=application_generate_entity.app_config.tenant_id,
sandbox_id=application_generate_entity.workflow_run_id,
),
),
)
# Determine system_user_id based on invocation source
is_external_api_call = application_generate_entity.invoke_from in {
InvokeFrom.WEB_APP,
@ -562,7 +542,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app=app,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
graph_engine_layers=graph_engine_layers,
)
try:

View File

@ -4,7 +4,6 @@ import re
import time
from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from threading import Thread
from typing import Any, Union
@ -20,7 +19,6 @@ from core.app.entities.app_invoke_entities import (
InvokeFrom,
)
from core.app.entities.queue_entities import (
ChunkType,
MessageQueueMessage,
QueueAdvancedChatMessageEndEvent,
QueueAgentLogEvent,
@ -72,122 +70,13 @@ from core.workflow.runtime import GraphRuntimeState
from core.workflow.system_variable import SystemVariable
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Account, Conversation, EndUser, LLMGenerationDetail, Message, MessageFile
from models import Account, Conversation, EndUser, Message, MessageFile
from models.enums import CreatorUserRole
from models.workflow import Workflow
logger = logging.getLogger(__name__)
@dataclass
class StreamEventBuffer:
"""
Buffer for recording stream events in order to reconstruct the generation sequence.
Records the exact order of text chunks, thoughts, and tool calls as they stream.
"""
# Accumulated reasoning content (each thought block is a separate element)
reasoning_content: list[str] = field(default_factory=list)
# Current reasoning buffer (accumulates until we see a different event type)
_current_reasoning: str = ""
# Tool calls with their details
tool_calls: list[dict] = field(default_factory=list)
# Tool call ID to index mapping for updating results
_tool_call_id_map: dict[str, int] = field(default_factory=dict)
# Sequence of events in stream order
sequence: list[dict] = field(default_factory=list)
# Current position in answer text
_content_position: int = 0
# Track last event type to detect transitions
_last_event_type: str | None = None
def _flush_current_reasoning(self) -> None:
"""Flush accumulated reasoning to the list and add to sequence."""
if self._current_reasoning.strip():
self.reasoning_content.append(self._current_reasoning.strip())
self.sequence.append({"type": "reasoning", "index": len(self.reasoning_content) - 1})
self._current_reasoning = ""
def record_text_chunk(self, text: str) -> None:
"""Record a text chunk event."""
if not text:
return
# Flush any pending reasoning first
if self._last_event_type == "thought":
self._flush_current_reasoning()
text_len = len(text)
start_pos = self._content_position
# If last event was also content, extend it; otherwise create new
if self.sequence and self.sequence[-1].get("type") == "content":
self.sequence[-1]["end"] = start_pos + text_len
else:
self.sequence.append({"type": "content", "start": start_pos, "end": start_pos + text_len})
self._content_position += text_len
self._last_event_type = "content"
def record_thought_chunk(self, text: str) -> None:
"""Record a thought/reasoning chunk event."""
if not text:
return
# Accumulate thought content
self._current_reasoning += text
self._last_event_type = "thought"
def record_tool_call(self, tool_call_id: str, tool_name: str, tool_arguments: str) -> None:
"""Record a tool call event."""
if not tool_call_id:
return
# Flush any pending reasoning first
if self._last_event_type == "thought":
self._flush_current_reasoning()
# Check if this tool call already exists (we might get multiple chunks)
if tool_call_id in self._tool_call_id_map:
idx = self._tool_call_id_map[tool_call_id]
# Update arguments if provided
if tool_arguments:
self.tool_calls[idx]["arguments"] = tool_arguments
else:
# New tool call
tool_call = {
"id": tool_call_id or "",
"name": tool_name or "",
"arguments": tool_arguments or "",
"result": "",
"elapsed_time": None,
}
self.tool_calls.append(tool_call)
idx = len(self.tool_calls) - 1
self._tool_call_id_map[tool_call_id] = idx
self.sequence.append({"type": "tool_call", "index": idx})
self._last_event_type = "tool_call"
def record_tool_result(self, tool_call_id: str, result: str, tool_elapsed_time: float | None = None) -> None:
"""Record a tool result event (update existing tool call)."""
if not tool_call_id:
return
if tool_call_id in self._tool_call_id_map:
idx = self._tool_call_id_map[tool_call_id]
self.tool_calls[idx]["result"] = result
self.tool_calls[idx]["elapsed_time"] = tool_elapsed_time
def finalize(self) -> None:
"""Finalize the buffer, flushing any pending data."""
if self._last_event_type == "thought":
self._flush_current_reasoning()
def has_data(self) -> bool:
"""Check if there's any meaningful data recorded."""
return bool(self.reasoning_content or self.tool_calls or self.sequence)
class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""
AdvancedChatAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
@ -255,8 +144,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._workflow_run_id: str = ""
self._draft_var_saver_factory = draft_var_saver_factory
self._graph_runtime_state: GraphRuntimeState | None = None
# Stream event buffer for recording generation sequence
self._stream_buffer = StreamEventBuffer()
self._seed_graph_runtime_state_from_queue_manager()
def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
@ -496,7 +383,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
queue_message: Union[WorkflowQueueMessage, MessageQueueMessage] | None = None,
**kwargs,
) -> Generator[StreamResponse, None, None]:
"""Handle text chunk events and record to stream buffer for sequence reconstruction."""
"""Handle text chunk events."""
delta_text = event.text
if delta_text is None:
return
@ -518,52 +405,9 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
if tts_publisher and queue_message:
tts_publisher.publish(queue_message)
tool_call = event.tool_call
tool_result = event.tool_result
tool_payload = tool_call or tool_result
tool_call_id = tool_payload.id if tool_payload and tool_payload.id else ""
tool_name = tool_payload.name if tool_payload and tool_payload.name else ""
tool_arguments = tool_call.arguments if tool_call and tool_call.arguments else ""
tool_files = tool_result.files if tool_result else []
tool_elapsed_time = tool_result.elapsed_time if tool_result else None
tool_icon = tool_payload.icon if tool_payload else None
tool_icon_dark = tool_payload.icon_dark if tool_payload else None
# Record stream event based on chunk type
chunk_type = event.chunk_type or ChunkType.TEXT
match chunk_type:
case ChunkType.TEXT:
self._stream_buffer.record_text_chunk(delta_text)
self._task_state.answer += delta_text
case ChunkType.THOUGHT:
# Reasoning should not be part of final answer text
self._stream_buffer.record_thought_chunk(delta_text)
case ChunkType.TOOL_CALL:
self._stream_buffer.record_tool_call(
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_arguments=tool_arguments,
)
case ChunkType.TOOL_RESULT:
self._stream_buffer.record_tool_result(
tool_call_id=tool_call_id,
result=delta_text,
tool_elapsed_time=tool_elapsed_time,
)
self._task_state.answer += delta_text
case _:
pass
self._task_state.answer += delta_text
yield self._message_cycle_manager.message_to_stream_response(
answer=delta_text,
message_id=self._message_id,
from_variable_selector=event.from_variable_selector,
chunk_type=event.chunk_type.value if event.chunk_type else None,
tool_call_id=tool_call_id or None,
tool_name=tool_name or None,
tool_arguments=tool_arguments or None,
tool_files=tool_files,
tool_elapsed_time=tool_elapsed_time,
tool_icon=tool_icon,
tool_icon_dark=tool_icon_dark,
answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector
)
def _handle_iteration_start_event(
@ -931,7 +775,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
# If there are assistant files, remove markdown image links from answer
answer_text = self._task_state.answer
answer_text = self._strip_think_blocks(answer_text)
if self._recorded_files:
# Remove markdown image links since we're storing files separately
answer_text = re.sub(r"!\[.*?\]\(.*?\)", "", answer_text).strip()
@ -983,54 +826,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
]
session.add_all(message_files)
# Save generation detail (reasoning/tool calls/sequence) from stream buffer
self._save_generation_detail(session=session, message=message)
@staticmethod
def _strip_think_blocks(text: str) -> str:
"""Remove <think>...</think> blocks (including their content) from text."""
if not text or "<think" not in text.lower():
return text
clean_text = re.sub(r"<think[^>]*>.*?</think>", "", text, flags=re.IGNORECASE | re.DOTALL)
clean_text = re.sub(r"\n\s*\n", "\n\n", clean_text).strip()
return clean_text
def _save_generation_detail(self, *, session: Session, message: Message) -> None:
"""
Save LLM generation detail for Chatflow using stream event buffer.
The buffer records the exact order of events as they streamed,
allowing accurate reconstruction of the generation sequence.
"""
# Finalize the stream buffer to flush any pending data
self._stream_buffer.finalize()
# Only save if there's meaningful data
if not self._stream_buffer.has_data():
return
reasoning_content = self._stream_buffer.reasoning_content
tool_calls = self._stream_buffer.tool_calls
sequence = self._stream_buffer.sequence
# Check if generation detail already exists for this message
existing = session.query(LLMGenerationDetail).filter_by(message_id=message.id).first()
if existing:
existing.reasoning_content = json.dumps(reasoning_content) if reasoning_content else None
existing.tool_calls = json.dumps(tool_calls) if tool_calls else None
existing.sequence = json.dumps(sequence) if sequence else None
else:
generation_detail = LLMGenerationDetail(
tenant_id=self._application_generate_entity.app_config.tenant_id,
app_id=self._application_generate_entity.app_config.app_id,
message_id=message.id,
reasoning_content=json.dumps(reasoning_content) if reasoning_content else None,
tool_calls=json.dumps(tool_calls) if tool_calls else None,
sequence=json.dumps(sequence) if sequence else None,
)
session.add(generation_detail)
def _seed_graph_runtime_state_from_queue_manager(self) -> None:
"""Bootstrap the cached runtime state from the queue manager when present."""
candidate = self._base_task_pipeline.queue_manager.graph_runtime_state

View File

@ -3,8 +3,10 @@ from typing import cast
from sqlalchemy import select
from core.agent.agent_app_runner import AgentAppRunner
from core.agent.cot_chat_agent_runner import CotChatAgentRunner
from core.agent.cot_completion_agent_runner import CotCompletionAgentRunner
from core.agent.entities import AgentEntity
from core.agent.fc_agent_runner import FunctionCallAgentRunner
from core.app.apps.agent_chat.app_config_manager import AgentChatAppConfig
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner
@ -12,7 +14,8 @@ from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity
from core.app.entities.queue_entities import QueueAnnotationReplyEvent
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.model_runtime.entities.model_entities import ModelFeature
from core.model_runtime.entities.llm_entities import LLMMode
from core.model_runtime.entities.model_entities import ModelFeature, ModelPropertyKey
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.moderation.base import ModerationError
from extensions.ext_database import db
@ -191,7 +194,22 @@ class AgentChatAppRunner(AppRunner):
raise ValueError("Message not found")
db.session.close()
runner = AgentAppRunner(
runner_cls: type[FunctionCallAgentRunner] | type[CotChatAgentRunner] | type[CotCompletionAgentRunner]
# start agent runner
if agent_entity.strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT:
# check LLM mode
if model_schema.model_properties.get(ModelPropertyKey.MODE) == LLMMode.CHAT:
runner_cls = CotChatAgentRunner
elif model_schema.model_properties.get(ModelPropertyKey.MODE) == LLMMode.COMPLETION:
runner_cls = CotCompletionAgentRunner
else:
raise ValueError(f"Invalid LLM mode: {model_schema.model_properties.get(ModelPropertyKey.MODE)}")
elif agent_entity.strategy == AgentEntity.Strategy.FUNCTION_CALLING:
runner_cls = FunctionCallAgentRunner
else:
raise ValueError(f"Invalid agent strategy: {agent_entity.strategy}")
runner = runner_cls(
tenant_id=app_config.tenant_id,
application_generate_entity=application_generate_entity,
conversation=conversation_result,

View File

@ -671,7 +671,7 @@ class WorkflowResponseConverter:
task_id=task_id,
data=AgentLogStreamResponse.Data(
node_execution_id=event.node_execution_id,
message_id=event.id,
id=event.id,
parent_id=event.parent_id,
label=event.label,
error=event.error,

View File

@ -8,7 +8,7 @@ from typing import Any, Literal, Union, overload
from flask import Flask, current_app
from pydantic import ValidationError
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session, sessionmaker
import contexts
from configs import dify_config
@ -23,13 +23,10 @@ from core.app.apps.workflow.generate_response_converter import WorkflowAppGenera
from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.app.layers.sandbox_layer import SandboxLayer
from core.db.session_factory import session_factory
from core.helper.trace_id_helper import extract_external_trace_id_from_args
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import DifyCoreRepositoryFactory
from core.sandbox.storage.archive_storage import ArchiveSandboxStorage
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
@ -40,7 +37,6 @@ from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from models.workflow_features import WorkflowFeatures
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
SKIP_PREPARE_USER_INPUTS_KEY = "_skip_prepare_user_inputs"
@ -480,7 +476,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
:return:
"""
with preserve_flask_contexts(flask_app, context_vars=context):
with session_factory.create_session() as session:
with Session(db.engine, expire_on_commit=False) as session:
workflow = session.scalar(
select(Workflow).where(
Workflow.tenant_id == application_generate_entity.app_config.tenant_id,
@ -491,21 +487,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
if workflow is None:
raise ValueError("Workflow not found")
if workflow.get_feature(WorkflowFeatures.SANDBOX).enabled:
graph_engine_layers = (
*graph_engine_layers,
SandboxLayer(
tenant_id=application_generate_entity.app_config.tenant_id,
app_id=application_generate_entity.app_config.app_id,
workflow_version=workflow.version,
sandbox_id=application_generate_entity.workflow_execution_id,
sandbox_storage=ArchiveSandboxStorage(
tenant_id=application_generate_entity.app_config.tenant_id,
sandbox_id=application_generate_entity.workflow_execution_id,
),
),
)
# Determine system_user_id based on invocation source
is_external_api_call = application_generate_entity.invoke_from in {
InvokeFrom.WEB_APP,

View File

@ -13,7 +13,6 @@ from core.app.apps.common.workflow_response_converter import WorkflowResponseCon
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import (
AppQueueEvent,
ChunkType,
MessageQueueMessage,
QueueAgentLogEvent,
QueueErrorEvent,
@ -484,33 +483,11 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
if delta_text is None:
return
tool_call = event.tool_call
tool_result = event.tool_result
tool_payload = tool_call or tool_result
tool_call_id = tool_payload.id if tool_payload and tool_payload.id else None
tool_name = tool_payload.name if tool_payload and tool_payload.name else None
tool_arguments = tool_call.arguments if tool_call else None
tool_elapsed_time = tool_result.elapsed_time if tool_result else None
tool_files = tool_result.files if tool_result else []
tool_icon = tool_payload.icon if tool_payload else None
tool_icon_dark = tool_payload.icon_dark if tool_payload else None
# only publish tts message at text chunk streaming
if tts_publisher and queue_message:
tts_publisher.publish(queue_message)
yield self._text_chunk_to_stream_response(
text=delta_text,
from_variable_selector=event.from_variable_selector,
chunk_type=event.chunk_type,
tool_call_id=tool_call_id,
tool_name=tool_name,
tool_arguments=tool_arguments,
tool_files=tool_files,
tool_elapsed_time=tool_elapsed_time,
tool_icon=tool_icon,
tool_icon_dark=tool_icon_dark,
)
yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
"""Handle agent log events."""
@ -673,61 +650,16 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
session.add(workflow_app_log)
def _text_chunk_to_stream_response(
self,
text: str,
from_variable_selector: list[str] | None = None,
chunk_type: ChunkType | None = None,
tool_call_id: str | None = None,
tool_name: str | None = None,
tool_arguments: str | None = None,
tool_files: list[str] | None = None,
tool_error: str | None = None,
tool_elapsed_time: float | None = None,
tool_icon: str | dict | None = None,
tool_icon_dark: str | dict | None = None,
self, text: str, from_variable_selector: list[str] | None = None
) -> TextChunkStreamResponse:
"""
Handle completed event.
:param text: text
:return:
"""
from core.app.entities.task_entities import ChunkType as ResponseChunkType
response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT
data = TextChunkStreamResponse.Data(
text=text,
from_variable_selector=from_variable_selector,
chunk_type=response_chunk_type,
)
if response_chunk_type == ResponseChunkType.TOOL_CALL:
data = data.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
"tool_icon": tool_icon,
"tool_icon_dark": tool_icon_dark,
}
)
elif response_chunk_type == ResponseChunkType.TOOL_RESULT:
data = data.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
"tool_files": tool_files,
"tool_error": tool_error,
"tool_elapsed_time": tool_elapsed_time,
"tool_icon": tool_icon,
"tool_icon_dark": tool_icon_dark,
}
)
response = TextChunkStreamResponse(
task_id=self._application_generate_entity.task_id,
data=data,
data=TextChunkStreamResponse.Data(text=text, from_variable_selector=from_variable_selector),
)
return response

View File

@ -463,20 +463,12 @@ class WorkflowBasedAppRunner:
)
)
elif isinstance(event, NodeRunStreamChunkEvent):
from core.app.entities.queue_entities import ChunkType as QueueChunkType
if event.is_final and not event.chunk:
return
self._publish_event(
QueueTextChunkEvent(
text=event.chunk,
from_variable_selector=list(event.selector),
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
chunk_type=QueueChunkType(event.chunk_type.value),
tool_call=event.tool_call,
tool_result=event.tool_result,
)
)
elif isinstance(event, NodeRunRetrieverResourceEvent):

View File

@ -1,236 +0,0 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Generator
from enum import StrEnum
from pydantic import BaseModel, Field
class AssetNodeType(StrEnum):
FILE = "file"
FOLDER = "folder"
class AppAssetNode(BaseModel):
id: str = Field(description="Unique identifier for the node")
node_type: AssetNodeType = Field(description="Type of node: file or folder")
name: str = Field(description="Name of the file or folder")
parent_id: str | None = Field(default=None, description="Parent folder ID, None for root level")
order: int = Field(default=0, description="Sort order within parent folder, lower values first")
extension: str = Field(default="", description="File extension without dot, empty for folders")
size: int = Field(default=0, description="File size in bytes, 0 for folders")
checksum: str = Field(default="", description="SHA-256 checksum of file content, empty for folders")
@classmethod
def create_folder(cls, node_id: str, name: str, parent_id: str | None = None) -> AppAssetNode:
return cls(id=node_id, node_type=AssetNodeType.FOLDER, name=name, parent_id=parent_id)
@classmethod
def create_file(
cls, node_id: str, name: str, parent_id: str | None = None, size: int = 0, checksum: str = ""
) -> AppAssetNode:
return cls(
id=node_id,
node_type=AssetNodeType.FILE,
name=name,
parent_id=parent_id,
extension=name.rsplit(".", 1)[-1] if "." in name else "",
size=size,
checksum=checksum,
)
class AppAssetNodeView(BaseModel):
id: str = Field(description="Unique identifier for the node")
node_type: str = Field(description="Type of node: 'file' or 'folder'")
name: str = Field(description="Name of the file or folder")
path: str = Field(description="Full path from root, e.g. '/folder/file.txt'")
extension: str = Field(default="", description="File extension without dot")
size: int = Field(default=0, description="File size in bytes")
checksum: str = Field(default="", description="SHA-256 checksum of file content")
children: list[AppAssetNodeView] = Field(default_factory=list, description="Child nodes for folders")
class TreeNodeNotFoundError(Exception):
"""Tree internal: node not found"""
pass
class TreeParentNotFoundError(Exception):
"""Tree internal: parent folder not found"""
pass
class TreePathConflictError(Exception):
"""Tree internal: path already exists"""
pass
class AppAssetFileTree(BaseModel):
"""
File tree structure for app assets using adjacency list pattern.
Design:
- Storage: Flat list with parent_id references (adjacency list)
- Path: Computed dynamically via get_path(), not stored
- Order: Integer field for user-defined sorting within each folder
- API response: transform() builds nested tree with computed paths
Why adjacency list over nested tree or materialized path:
- Simpler CRUD: move/rename only updates one node's parent_id
- No path cascade: renaming parent doesn't require updating all descendants
- JSON-friendly: flat list serializes cleanly to database JSON column
- Trade-off: path lookup is O(depth), acceptable for typical file trees
"""
nodes: list[AppAssetNode] = Field(default_factory=list, description="Flat list of all nodes in the tree")
def get(self, node_id: str) -> AppAssetNode | None:
return next((n for n in self.nodes if n.id == node_id), None)
def get_children(self, parent_id: str | None) -> list[AppAssetNode]:
return [n for n in self.nodes if n.parent_id == parent_id]
def has_child_named(self, parent_id: str | None, name: str) -> bool:
return any(n.name == name and n.parent_id == parent_id for n in self.nodes)
def get_path(self, node_id: str) -> str:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
parts: list[str] = []
current: AppAssetNode | None = node
while current:
parts.append(current.name)
current = self.get(current.parent_id) if current.parent_id else None
return "/" + "/".join(reversed(parts))
def get_descendant_ids(self, node_id: str) -> list[str]:
result: list[str] = []
stack = [node_id]
while stack:
current_id = stack.pop()
for child in self.nodes:
if child.parent_id == current_id:
result.append(child.id)
stack.append(child.id)
return result
def add(self, node: AppAssetNode) -> AppAssetNode:
if self.get(node.id):
raise TreePathConflictError(node.id)
if self.has_child_named(node.parent_id, node.name):
raise TreePathConflictError(node.name)
if node.parent_id:
parent = self.get(node.parent_id)
if not parent or parent.node_type != AssetNodeType.FOLDER:
raise TreeParentNotFoundError(node.parent_id)
siblings = self.get_children(node.parent_id)
node.order = max((s.order for s in siblings), default=-1) + 1
self.nodes.append(node)
return node
def update(self, node_id: str, size: int, checksum: str) -> AppAssetNode:
node = self.get(node_id)
if not node or node.node_type != AssetNodeType.FILE:
raise TreeNodeNotFoundError(node_id)
node.size = size
node.checksum = checksum
return node
def rename(self, node_id: str, new_name: str) -> AppAssetNode:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
if node.name != new_name and self.has_child_named(node.parent_id, new_name):
raise TreePathConflictError(new_name)
node.name = new_name
if node.node_type == AssetNodeType.FILE:
node.extension = new_name.rsplit(".", 1)[-1] if "." in new_name else ""
return node
def move(self, node_id: str, new_parent_id: str | None) -> AppAssetNode:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
if new_parent_id:
parent = self.get(new_parent_id)
if not parent or parent.node_type != AssetNodeType.FOLDER:
raise TreeParentNotFoundError(new_parent_id)
if self.has_child_named(new_parent_id, node.name):
raise TreePathConflictError(node.name)
node.parent_id = new_parent_id
siblings = self.get_children(new_parent_id)
node.order = max((s.order for s in siblings if s.id != node_id), default=-1) + 1
return node
def reorder(self, node_id: str, after_node_id: str | None) -> AppAssetNode:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
siblings = sorted(self.get_children(node.parent_id), key=lambda x: x.order)
siblings = [s for s in siblings if s.id != node_id]
if after_node_id is None:
insert_idx = 0
else:
after_node = self.get(after_node_id)
if not after_node or after_node.parent_id != node.parent_id:
raise TreeNodeNotFoundError(after_node_id)
insert_idx = next((i for i, s in enumerate(siblings) if s.id == after_node_id), -1) + 1
siblings.insert(insert_idx, node)
for idx, sibling in enumerate(siblings):
sibling.order = idx
return node
def remove(self, node_id: str) -> list[str]:
node = self.get(node_id)
if not node:
raise TreeNodeNotFoundError(node_id)
ids_to_remove = [node_id] + self.get_descendant_ids(node_id)
self.nodes = [n for n in self.nodes if n.id not in ids_to_remove]
return ids_to_remove
def walk_files(self) -> Generator[AppAssetNode, None, None]:
return (n for n in self.nodes if n.node_type == AssetNodeType.FILE)
def transform(self) -> list[AppAssetNodeView]:
by_parent: dict[str | None, list[AppAssetNode]] = defaultdict(list)
for n in self.nodes:
by_parent[n.parent_id].append(n)
for children in by_parent.values():
children.sort(key=lambda x: x.order)
paths: dict[str, str] = {}
tree_views: dict[str, AppAssetNodeView] = {}
def build_view(node: AppAssetNode, parent_path: str) -> None:
path = f"{parent_path}/{node.name}"
paths[node.id] = path
child_views: list[AppAssetNodeView] = []
for child in by_parent.get(node.id, []):
build_view(child, path)
child_views.append(tree_views[child.id])
tree_views[node.id] = AppAssetNodeView(
id=node.id,
node_type=node.node_type.value,
name=node.name,
path=path,
extension=node.extension,
size=node.size,
checksum=node.checksum,
children=child_views,
)
for root_node in by_parent.get(None, []):
build_view(root_node, "")
return [tree_views[n.id] for n in by_parent.get(None, [])]

View File

@ -36,9 +36,6 @@ class InvokeFrom(StrEnum):
# this is used for plugin trigger and webhook trigger.
TRIGGER = "trigger"
# AGENT indicates that this invocation is from an agent.
AGENT = "agent"
# EXPLORE indicates that this invocation is from
# the workflow (or chatflow) explore page.
EXPLORE = "explore"

View File

@ -1,70 +0,0 @@
"""
LLM Generation Detail entities.
Defines the structure for storing and transmitting LLM generation details
including reasoning content, tool calls, and their sequence.
"""
from typing import Literal
from pydantic import BaseModel, Field
class ContentSegment(BaseModel):
"""Represents a content segment in the generation sequence."""
type: Literal["content"] = "content"
start: int = Field(..., description="Start position in the text")
end: int = Field(..., description="End position in the text")
class ReasoningSegment(BaseModel):
"""Represents a reasoning segment in the generation sequence."""
type: Literal["reasoning"] = "reasoning"
index: int = Field(..., description="Index into reasoning_content array")
class ToolCallSegment(BaseModel):
"""Represents a tool call segment in the generation sequence."""
type: Literal["tool_call"] = "tool_call"
index: int = Field(..., description="Index into tool_calls array")
SequenceSegment = ContentSegment | ReasoningSegment | ToolCallSegment
class ToolCallDetail(BaseModel):
"""Represents a tool call with its arguments and result."""
id: str = Field(default="", description="Unique identifier for the tool call")
name: str = Field(..., description="Name of the tool")
arguments: str = Field(default="", description="JSON string of tool arguments")
result: str = Field(default="", description="Result from the tool execution")
elapsed_time: float | None = Field(default=None, description="Elapsed time in seconds")
class LLMGenerationDetailData(BaseModel):
"""
Domain model for LLM generation detail.
Contains the structured data for reasoning content, tool calls,
and their display sequence.
"""
reasoning_content: list[str] = Field(default_factory=list, description="List of reasoning segments")
tool_calls: list[ToolCallDetail] = Field(default_factory=list, description="List of tool call details")
sequence: list[SequenceSegment] = Field(default_factory=list, description="Display order of segments")
def is_empty(self) -> bool:
"""Check if there's any meaningful generation detail."""
return not self.reasoning_content and not self.tool_calls
def to_response_dict(self) -> dict:
"""Convert to dictionary for API response."""
return {
"reasoning_content": self.reasoning_content,
"tool_calls": [tc.model_dump() for tc in self.tool_calls],
"sequence": [seg.model_dump() for seg in self.sequence],
}

View File

@ -7,7 +7,7 @@ from pydantic import BaseModel, ConfigDict, Field
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.workflow.entities import AgentNodeStrategyInit, ToolCall, ToolResult
from core.workflow.entities import AgentNodeStrategyInit
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
from core.workflow.nodes import NodeType
@ -177,17 +177,6 @@ class QueueLoopCompletedEvent(AppQueueEvent):
error: str | None = None
class ChunkType(StrEnum):
"""Stream chunk type for LLM-related events."""
TEXT = "text" # Normal text streaming
TOOL_CALL = "tool_call" # Tool call arguments streaming
TOOL_RESULT = "tool_result" # Tool execution result
THOUGHT = "thought" # Agent thinking process (ReAct)
THOUGHT_START = "thought_start" # Agent thought start
THOUGHT_END = "thought_end" # Agent thought end
class QueueTextChunkEvent(AppQueueEvent):
"""
QueueTextChunkEvent entity
@ -202,16 +191,6 @@ class QueueTextChunkEvent(AppQueueEvent):
in_loop_id: str | None = None
"""loop id if node is in loop"""
# Extended fields for Agent/Tool streaming
chunk_type: ChunkType = ChunkType.TEXT
"""type of the chunk"""
# Tool streaming payloads
tool_call: ToolCall | None = None
"""structured tool call info"""
tool_result: ToolResult | None = None
"""structured tool result info"""
class QueueAgentMessageEvent(AppQueueEvent):
"""

View File

@ -113,38 +113,6 @@ class MessageStreamResponse(StreamResponse):
answer: str
from_variable_selector: list[str] | None = None
# Extended fields for Agent/Tool streaming (imported at runtime to avoid circular import)
chunk_type: str | None = None
"""type of the chunk: text, tool_call, tool_result, thought"""
# Tool call fields (when chunk_type == "tool_call")
tool_call_id: str | None = None
"""unique identifier for this tool call"""
tool_name: str | None = None
"""name of the tool being called"""
tool_arguments: str | None = None
"""accumulated tool arguments JSON"""
# Tool result fields (when chunk_type == "tool_result")
tool_files: list[str] | None = None
"""file IDs produced by tool"""
tool_error: str | None = None
"""error message if tool failed"""
tool_elapsed_time: float | None = None
"""elapsed time spent executing the tool"""
tool_icon: str | dict | None = None
"""icon of the tool"""
tool_icon_dark: str | dict | None = None
"""dark theme icon of the tool"""
def model_dump(self, *args, **kwargs) -> dict[str, object]:
kwargs.setdefault("exclude_none", True)
return super().model_dump(*args, **kwargs)
def model_dump_json(self, *args, **kwargs) -> str:
kwargs.setdefault("exclude_none", True)
return super().model_dump_json(*args, **kwargs)
class MessageAudioStreamResponse(StreamResponse):
"""
@ -614,17 +582,6 @@ class LoopNodeCompletedStreamResponse(StreamResponse):
data: Data
class ChunkType(StrEnum):
"""Stream chunk type for LLM-related events."""
TEXT = "text" # Normal text streaming
TOOL_CALL = "tool_call" # Tool call arguments streaming
TOOL_RESULT = "tool_result" # Tool execution result
THOUGHT = "thought" # Agent thinking process (ReAct)
THOUGHT_START = "thought_start" # Agent thought start
THOUGHT_END = "thought_end" # Agent thought end
class TextChunkStreamResponse(StreamResponse):
"""
TextChunkStreamResponse entity
@ -638,36 +595,6 @@ class TextChunkStreamResponse(StreamResponse):
text: str
from_variable_selector: list[str] | None = None
# Extended fields for Agent/Tool streaming
chunk_type: ChunkType = ChunkType.TEXT
"""type of the chunk"""
# Tool call fields (when chunk_type == TOOL_CALL)
tool_call_id: str | None = None
"""unique identifier for this tool call"""
tool_name: str | None = None
"""name of the tool being called"""
tool_arguments: str | None = None
"""accumulated tool arguments JSON"""
# Tool result fields (when chunk_type == TOOL_RESULT)
tool_files: list[str] | None = None
"""file IDs produced by tool"""
tool_error: str | None = None
"""error message if tool failed"""
# Tool elapsed time fields (when chunk_type == TOOL_RESULT)
tool_elapsed_time: float | None = None
"""elapsed time spent executing the tool"""
def model_dump(self, *args, **kwargs) -> dict[str, object]:
kwargs.setdefault("exclude_none", True)
return super().model_dump(*args, **kwargs)
def model_dump_json(self, *args, **kwargs) -> str:
kwargs.setdefault("exclude_none", True)
return super().model_dump_json(*args, **kwargs)
event: StreamEvent = StreamEvent.TEXT_CHUNK
data: Data
@ -816,7 +743,7 @@ class AgentLogStreamResponse(StreamResponse):
"""
node_execution_id: str
message_id: str
id: str
label: str
parent_id: str | None = None
error: str | None = None

View File

@ -1,119 +0,0 @@
import logging
from core.sandbox import AppAssetsInitializer, DifyCliInitializer, SandboxManager
from core.sandbox.storage.sandbox_storage import SandboxStorage
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events.base import GraphEngineEvent
from core.workflow.graph_events.graph import GraphRunPausedEvent
from models.workflow import Workflow
from services.app_asset_service import AppAssetService
from services.sandbox.sandbox_provider_service import SandboxProviderService
logger = logging.getLogger(__name__)
class SandboxInitializationError(Exception):
pass
class SandboxLayer(GraphEngineLayer):
def __init__(
self,
tenant_id: str,
app_id: str,
workflow_version: str,
sandbox_id: str,
sandbox_storage: SandboxStorage,
) -> None:
super().__init__()
self._tenant_id = tenant_id
self._app_id = app_id
self._workflow_version = workflow_version
self._sandbox_id = sandbox_id
self._sandbox_storage = sandbox_storage
@property
def sandbox(self) -> VirtualEnvironment:
sandbox = SandboxManager.get(self._sandbox_id)
if sandbox is None:
raise RuntimeError(f"Sandbox not found or not initialized for sandbox_id={self._sandbox_id}")
return sandbox
def on_graph_start(self) -> None:
try:
is_draft = self._workflow_version == Workflow.VERSION_DRAFT
assets = AppAssetService.get_assets(self._tenant_id, self._app_id, is_draft=is_draft)
if not assets:
raise ValueError(
f"No assets found for tid={self._tenant_id}, app_id={self._app_id}, wf={self._workflow_version}"
)
if is_draft:
logger.info(
"Building draft assets for tenant_id=%s, app_id=%s, workflow_version=%s, assets_id=%s",
self._tenant_id,
self._app_id,
self._workflow_version,
assets.id,
)
AppAssetService.build_assets(self._tenant_id, self._app_id, assets)
logger.info(
"Initializing sandbox for tenant_id=%s, app_id=%s, workflow_version=%s, assets_id=%s",
self._tenant_id,
self._app_id,
self._workflow_version,
assets.id,
)
builder = (
SandboxProviderService.create_sandbox_builder(self._tenant_id)
.initializer(AppAssetsInitializer(self._tenant_id, self._app_id, assets.id))
.initializer(DifyCliInitializer(self._tenant_id, self._app_id, assets.id))
)
sandbox = builder.build()
SandboxManager.register(self._sandbox_id, sandbox)
logger.info(
"Sandbox initialized, workflow_execution_id=%s, sandbox_id=%s, sandbox_arch=%s",
self._sandbox_id,
sandbox.metadata.id,
sandbox.metadata.arch,
)
# Check if sandbox is initialized
if self._sandbox_storage.mount(sandbox):
logger.info("Sandbox files restored, sandbox_id=%s", self._sandbox_id)
except Exception as e:
logger.exception("Failed to initialize sandbox")
raise SandboxInitializationError(f"Failed to initialize sandbox: {e}") from e
def on_event(self, event: GraphEngineEvent) -> None:
# TODO: handle graph run paused event
if not isinstance(event, GraphRunPausedEvent):
return
def on_graph_end(self, error: Exception | None) -> None:
sandbox = SandboxManager.unregister(self._sandbox_id)
if sandbox is None:
logger.debug("No sandbox to release for sandbox_id=%s", self._sandbox_id)
return
sandbox_id = sandbox.metadata.id
logger.info(
"Releasing sandbox, workflow_execution_id=%s, sandbox_id=%s",
self._sandbox_id,
sandbox_id,
)
try:
self._sandbox_storage.unmount(sandbox)
logger.info("Sandbox files persisted, sandbox_id=%s", self._sandbox_id)
except Exception:
logger.exception("Failed to persist sandbox files, sandbox_id=%s", self._sandbox_id)
try:
sandbox.release_environment()
logger.info("Sandbox released, sandbox_id=%s", sandbox_id)
except Exception:
logger.exception("Failed to release sandbox, sandbox_id=%s", sandbox_id)

View File

@ -1,5 +1,4 @@
import logging
import re
import time
from collections.abc import Generator
from threading import Thread
@ -59,7 +58,7 @@ from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from events.message_event import message_was_created
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.model import AppMode, Conversation, LLMGenerationDetail, Message, MessageAgentThought
from models.model import AppMode, Conversation, Message, MessageAgentThought
logger = logging.getLogger(__name__)
@ -69,8 +68,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
EasyUIBasedGenerateTaskPipeline is a class that generate stream output and state management for Application.
"""
_THINK_PATTERN = re.compile(r"<think[^>]*>(.*?)</think>", re.IGNORECASE | re.DOTALL)
_task_state: EasyUITaskState
_application_generate_entity: Union[ChatAppGenerateEntity, CompletionAppGenerateEntity, AgentChatAppGenerateEntity]
@ -412,136 +409,11 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
)
)
# Save LLM generation detail if there's reasoning_content
self._save_generation_detail(session=session, message=message, llm_result=llm_result)
message_was_created.send(
message,
application_generate_entity=self._application_generate_entity,
)
def _save_generation_detail(self, *, session: Session, message: Message, llm_result: LLMResult) -> None:
"""
Save LLM generation detail for Completion/Chat/Agent-Chat applications.
For Agent-Chat, also merges MessageAgentThought records.
"""
import json
reasoning_list: list[str] = []
tool_calls_list: list[dict] = []
sequence: list[dict] = []
answer = message.answer or ""
# Check if this is Agent-Chat mode by looking for agent thoughts
agent_thoughts = (
session.query(MessageAgentThought)
.filter_by(message_id=message.id)
.order_by(MessageAgentThought.position.asc())
.all()
)
if agent_thoughts:
# Agent-Chat mode: merge MessageAgentThought records
content_pos = 0
cleaned_answer_parts: list[str] = []
for thought in agent_thoughts:
# Add thought/reasoning
if thought.thought:
reasoning_text = thought.thought
if "<think" in reasoning_text.lower():
clean_text, extracted_reasoning = self._split_reasoning_from_answer(reasoning_text)
if extracted_reasoning:
reasoning_text = extracted_reasoning
thought.thought = clean_text or extracted_reasoning
reasoning_list.append(reasoning_text)
sequence.append({"type": "reasoning", "index": len(reasoning_list) - 1})
# Add tool calls
if thought.tool:
tool_calls_list.append(
{
"name": thought.tool,
"arguments": thought.tool_input or "",
"result": thought.observation or "",
}
)
sequence.append({"type": "tool_call", "index": len(tool_calls_list) - 1})
# Add answer content if present
if thought.answer:
content_text = thought.answer
if "<think" in content_text.lower():
clean_answer, extracted_reasoning = self._split_reasoning_from_answer(content_text)
if extracted_reasoning:
reasoning_list.append(extracted_reasoning)
sequence.append({"type": "reasoning", "index": len(reasoning_list) - 1})
content_text = clean_answer
thought.answer = clean_answer or content_text
if content_text:
start = content_pos
end = content_pos + len(content_text)
sequence.append({"type": "content", "start": start, "end": end})
content_pos = end
cleaned_answer_parts.append(content_text)
if cleaned_answer_parts:
merged_answer = "".join(cleaned_answer_parts)
message.answer = merged_answer
llm_result.message.content = merged_answer
else:
# Completion/Chat mode: use reasoning_content from llm_result
reasoning_content = llm_result.reasoning_content
if not reasoning_content and answer:
# Extract reasoning from <think> blocks and clean the final answer
clean_answer, reasoning_content = self._split_reasoning_from_answer(answer)
if reasoning_content:
answer = clean_answer
llm_result.message.content = clean_answer
llm_result.reasoning_content = reasoning_content
message.answer = clean_answer
if reasoning_content:
reasoning_list = [reasoning_content]
# Content comes first, then reasoning
if answer:
sequence.append({"type": "content", "start": 0, "end": len(answer)})
sequence.append({"type": "reasoning", "index": 0})
# Only save if there's meaningful generation detail
if not reasoning_list and not tool_calls_list:
return
# Check if generation detail already exists
existing = session.query(LLMGenerationDetail).filter_by(message_id=message.id).first()
if existing:
existing.reasoning_content = json.dumps(reasoning_list) if reasoning_list else None
existing.tool_calls = json.dumps(tool_calls_list) if tool_calls_list else None
existing.sequence = json.dumps(sequence) if sequence else None
else:
generation_detail = LLMGenerationDetail(
tenant_id=self._application_generate_entity.app_config.tenant_id,
app_id=self._application_generate_entity.app_config.app_id,
message_id=message.id,
reasoning_content=json.dumps(reasoning_list) if reasoning_list else None,
tool_calls=json.dumps(tool_calls_list) if tool_calls_list else None,
sequence=json.dumps(sequence) if sequence else None,
)
session.add(generation_detail)
@classmethod
def _split_reasoning_from_answer(cls, text: str) -> tuple[str, str]:
"""
Extract reasoning segments from <think> blocks and return (clean_text, reasoning).
"""
matches = cls._THINK_PATTERN.findall(text)
reasoning_content = "\n".join(match.strip() for match in matches) if matches else ""
clean_text = cls._THINK_PATTERN.sub("", text)
clean_text = re.sub(r"\n\s*\n", "\n\n", clean_text).strip()
return clean_text, reasoning_content or ""
def _handle_stop(self, event: QueueStopEvent):
"""
Handle stop.

View File

@ -232,31 +232,15 @@ class MessageCycleManager:
answer: str,
message_id: str,
from_variable_selector: list[str] | None = None,
chunk_type: str | None = None,
tool_call_id: str | None = None,
tool_name: str | None = None,
tool_arguments: str | None = None,
tool_files: list[str] | None = None,
tool_error: str | None = None,
tool_elapsed_time: float | None = None,
tool_icon: str | dict | None = None,
tool_icon_dark: str | dict | None = None,
event_type: StreamEvent | None = None,
) -> MessageStreamResponse:
"""
Message to stream response.
:param answer: answer
:param message_id: message id
:param from_variable_selector: from variable selector
:param chunk_type: type of the chunk (text, function_call, tool_result, thought)
:param tool_call_id: unique identifier for this tool call
:param tool_name: name of the tool being called
:param tool_arguments: accumulated tool arguments JSON
:param tool_files: file IDs produced by tool
:param tool_error: error message if tool failed
:return:
"""
response = MessageStreamResponse(
return MessageStreamResponse(
task_id=self._application_generate_entity.task_id,
id=message_id,
answer=answer,
@ -264,35 +248,6 @@ class MessageCycleManager:
event=event_type or StreamEvent.MESSAGE,
)
if chunk_type:
response = response.model_copy(update={"chunk_type": chunk_type})
if chunk_type == "tool_call":
response = response.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
"tool_icon": tool_icon,
"tool_icon_dark": tool_icon_dark,
}
)
elif chunk_type == "tool_result":
response = response.model_copy(
update={
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"tool_arguments": tool_arguments,
"tool_files": tool_files,
"tool_error": tool_error,
"tool_elapsed_time": tool_elapsed_time,
"tool_icon": tool_icon,
"tool_icon_dark": tool_icon_dark,
}
)
return response
def message_replace_to_stream_response(self, answer: str, reason: str = "") -> MessageReplaceStreamResponse:
"""
Message replace to stream response.

View File

@ -1,35 +0,0 @@
from .entities import (
AssetItem,
FileAsset,
FileReference,
SkillAsset,
SkillMetadata,
ToolConfiguration,
ToolDefinition,
ToolFieldConfig,
ToolReference,
ToolType,
)
from .packager import AssetPackager, ZipPackager
from .parser import AssetItemParser, AssetParser, FileAssetParser, SkillAssetParser
from .paths import AssetPaths
__all__ = [
"AssetItem",
"AssetItemParser",
"AssetPackager",
"AssetParser",
"AssetPaths",
"FileAsset",
"FileAssetParser",
"FileReference",
"SkillAsset",
"SkillAssetParser",
"SkillMetadata",
"ToolConfiguration",
"ToolDefinition",
"ToolFieldConfig",
"ToolReference",
"ToolType",
"ZipPackager",
]

View File

@ -1,24 +0,0 @@
from .assets import AssetItem, FileAsset
from .skill import (
FileReference,
SkillAsset,
SkillMetadata,
ToolConfiguration,
ToolDefinition,
ToolFieldConfig,
ToolReference,
ToolType,
)
__all__ = [
"AssetItem",
"FileAsset",
"FileReference",
"SkillAsset",
"SkillMetadata",
"ToolConfiguration",
"ToolDefinition",
"ToolFieldConfig",
"ToolReference",
"ToolType",
]

View File

@ -1,22 +0,0 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class AssetItem(ABC):
node_id: str
path: str
file_name: str
extension: str
@abstractmethod
def get_storage_key(self) -> str:
raise NotImplementedError
@dataclass
class FileAsset(AssetItem):
storage_key: str
def get_storage_key(self) -> str:
return self.storage_key

View File

@ -1,68 +0,0 @@
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from .assets import AssetItem
class ToolType(StrEnum):
MCP = "mcp"
BUILTIN = "builtin"
class ToolFieldConfig(BaseModel):
model_config = ConfigDict(extra="forbid")
id: str
value: Any
auto: bool = False
class ToolConfiguration(BaseModel):
model_config = ConfigDict(extra="forbid")
fields: list[ToolFieldConfig] = Field(default_factory=list)
class ToolDefinition(BaseModel):
model_config = ConfigDict(extra="forbid")
type: ToolType
credential_id: str | None = None
configuration: ToolConfiguration = Field(default_factory=ToolConfiguration)
class ToolReference(BaseModel):
model_config = ConfigDict(extra="forbid")
provider: str
tool_name: str
uuid: str
raw: str
class FileReference(BaseModel):
model_config = ConfigDict(extra="forbid")
source: str
uuid: str
raw: str
class SkillMetadata(BaseModel):
model_config = ConfigDict(extra="allow")
tools: dict[str, ToolDefinition] = Field(default_factory=dict)
@dataclass
class SkillAsset(AssetItem):
storage_key: str
metadata: SkillMetadata
tool_references: list[ToolReference] = field(default_factory=list)
file_references: list[FileReference] = field(default_factory=list)
def get_storage_key(self) -> str:
return self.storage_key

View File

@ -1,7 +0,0 @@
from .base import AssetPackager
from .zip_packager import ZipPackager
__all__ = [
"AssetPackager",
"ZipPackager",
]

View File

@ -1,9 +0,0 @@
from abc import ABC, abstractmethod
from core.app_assets.entities import AssetItem
class AssetPackager(ABC):
@abstractmethod
def package(self, assets: list[AssetItem]) -> bytes:
raise NotImplementedError

View File

@ -1,27 +0,0 @@
import io
import zipfile
from typing import TYPE_CHECKING
from core.app_assets.entities import AssetItem
from .base import AssetPackager
if TYPE_CHECKING:
from extensions.ext_storage import Storage
class ZipPackager(AssetPackager):
_storage: "Storage"
def __init__(self, storage: "Storage") -> None:
self._storage = storage
def package(self, assets: list[AssetItem]) -> bytes:
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for asset in assets:
content = self._storage.load_once(asset.get_storage_key())
zf.writestr(asset.path, content)
return zip_buffer.getvalue()

View File

@ -1,10 +0,0 @@
from .asset_parser import AssetParser
from .base import AssetItemParser, FileAssetParser
from .skill_parser import SkillAssetParser
__all__ = [
"AssetItemParser",
"AssetParser",
"FileAssetParser",
"SkillAssetParser",
]

View File

@ -1,36 +0,0 @@
from core.app.entities.app_asset_entities import AppAssetFileTree
from core.app_assets.entities import AssetItem
from core.app_assets.paths import AssetPaths
from .base import AssetItemParser, FileAssetParser
class AssetParser:
def __init__(
self,
tree: AppAssetFileTree,
tenant_id: str,
app_id: str,
) -> None:
self._tree = tree
self._tenant_id = tenant_id
self._app_id = app_id
self._parsers = {}
self._default_parser = FileAssetParser()
def register(self, extension: str, parser: AssetItemParser) -> None:
self._parsers[extension] = parser
def parse(self) -> list[AssetItem]:
assets: list[AssetItem] = []
for node in self._tree.walk_files():
path = self._tree.get_path(node.id).lstrip("/")
storage_key = AssetPaths.draft_file(self._tenant_id, self._app_id, node.id)
extension = node.extension or ""
parser = self._parsers.get(extension, self._default_parser)
asset = parser.parse(node.id, path, node.name, extension, storage_key)
assets.append(asset)
return assets

View File

@ -1,34 +0,0 @@
from abc import ABC, abstractmethod
from core.app_assets.entities import AssetItem, FileAsset
class AssetItemParser(ABC):
@abstractmethod
def parse(
self,
node_id: str,
path: str,
file_name: str,
extension: str,
storage_key: str,
) -> AssetItem:
raise NotImplementedError
class FileAssetParser(AssetItemParser):
def parse(
self,
node_id: str,
path: str,
file_name: str,
extension: str,
storage_key: str,
) -> FileAsset:
return FileAsset(
node_id=node_id,
path=path,
file_name=file_name,
extension=extension,
storage_key=storage_key,
)

View File

@ -1,146 +0,0 @@
import json
import logging
import re
from typing import Any
from core.app_assets.entities import (
FileReference,
SkillAsset,
SkillMetadata,
ToolReference,
)
from core.app_assets.paths import AssetPaths
from extensions.ext_storage import storage
from .base import AssetItemParser
TOOL_REFERENCE_PATTERN = re.compile(r"§\[tool\]\.\[([^\]]+)\]\.\[([^\]]+)\]\.\[([^\]]+)\")
FILE_REFERENCE_PATTERN = re.compile(r"§\[file\]\.\[([^\]]+)\]\.\[([^\]]+)\")
logger = logging.getLogger(__name__)
class SkillAssetParser(AssetItemParser):
def __init__(
self,
tenant_id: str,
app_id: str,
assets_id: str,
) -> None:
self._tenant_id = tenant_id
self._app_id = app_id
self._assets_id = assets_id
def parse(
self,
node_id: str,
path: str,
file_name: str,
extension: str,
storage_key: str,
) -> SkillAsset:
try:
return self._parse_skill_asset(node_id, path, file_name, extension, storage_key)
except Exception:
logger.exception("Failed to parse skill asset %s: %s", node_id)
# handle as plain text
return SkillAsset(
node_id=node_id,
path=path,
file_name=file_name,
extension=extension,
storage_key=storage_key,
metadata=SkillMetadata(),
tool_references=[],
file_references=[],
)
def _parse_skill_asset(
self, node_id: str, path: str, file_name: str, extension: str, storage_key: str
) -> SkillAsset:
try:
data = json.loads(storage.load_once(storage_key))
except (json.JSONDecodeError, UnicodeDecodeError):
# handle as plain text
return SkillAsset(
node_id=node_id,
path=path,
file_name=file_name,
extension=extension,
storage_key=storage_key,
metadata=SkillMetadata(),
tool_references=[],
file_references=[],
)
if not isinstance(data, dict):
raise ValueError(f"Skill document {node_id} must be a JSON object")
data_dict: dict[str, Any] = data
metadata_raw = data_dict.get("metadata", {})
content = data_dict.get("content", "")
if not isinstance(content, str):
raise ValueError(f"Skill document {node_id} 'content' must be a string")
metadata = SkillMetadata.model_validate(metadata_raw)
tool_references: list[ToolReference] = self._parse_tool_references(content)
file_references: list[FileReference] = self._parse_file_references(content)
resolved_content = self._resolve_content(content, tool_references, file_references)
resolved_key = AssetPaths.build_resolved_file(self._tenant_id, self._app_id, self._assets_id, node_id)
storage.save(resolved_key, resolved_content.encode("utf-8"))
return SkillAsset(
node_id=node_id,
path=path,
file_name=file_name,
extension=extension,
storage_key=resolved_key,
metadata=metadata,
tool_references=tool_references,
file_references=file_references,
)
def _resolve_content(
self,
content: str,
tool_references: list[ToolReference],
file_references: list[FileReference],
) -> str:
for ref in tool_references:
replacement = f"{ref.tool_name}"
content = content.replace(ref.raw, replacement)
for ref in file_references:
replacement = f"[file:{ref.uuid}]"
content = content.replace(ref.raw, replacement)
return content
def _parse_tool_references(self, content: str) -> list[ToolReference]:
tool_references: list[ToolReference] = []
for match in TOOL_REFERENCE_PATTERN.finditer(content):
tool_references.append(
ToolReference(
provider=match.group(1),
tool_name=match.group(2),
uuid=match.group(3),
raw=match.group(0),
)
)
return tool_references
def _parse_file_references(self, content: str) -> list[FileReference]:
file_references: list[FileReference] = []
for match in FILE_REFERENCE_PATTERN.finditer(content):
file_references.append(
FileReference(
source=match.group(1),
uuid=match.group(2),
raw=match.group(0),
)
)
return file_references

View File

@ -1,18 +0,0 @@
class AssetPaths:
_BASE = "app_assets"
@staticmethod
def draft_file(tenant_id: str, app_id: str, node_id: str) -> str:
return f"{AssetPaths._BASE}/{tenant_id}/{app_id}/draft/{node_id}"
@staticmethod
def build_zip(tenant_id: str, app_id: str, assets_id: str) -> str:
return f"{AssetPaths._BASE}/{tenant_id}/{app_id}/build/{assets_id}.zip"
@staticmethod
def build_resolved_file(tenant_id: str, app_id: str, assets_id: str, node_id: str) -> str:
return f"{AssetPaths._BASE}/{tenant_id}/{app_id}/build/{assets_id}/resolved/{node_id}"
@staticmethod
def build_tool_manifest(tenant_id: str, app_id: str, assets_id: str) -> str:
return f"{AssetPaths._BASE}/{tenant_id}/{app_id}/build/{assets_id}/tools.json"

View File

@ -5,6 +5,7 @@ from sqlalchemy import select
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.queue_entities import QueueRetrieverResourcesEvent
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.rag.index_processor.constant.index_type import IndexStructureType
from core.rag.models.document import Document
@ -89,8 +90,6 @@ class DatasetIndexToolCallbackHandler:
# TODO(-LAN-): Improve type check
def return_retriever_resource_info(self, resource: Sequence[RetrievalSourceMetadata]):
"""Handle return_retriever_resource_info."""
from core.app.entities.queue_entities import QueueRetrieverResourcesEvent
self._queue_manager.publish(
QueueRetrieverResourcesEvent(retriever_resources=resource), PublishFrom.APPLICATION_MANAGER
)

View File

@ -3,6 +3,7 @@ from pydantic import BaseModel, Field, field_validator
class PreviewDetail(BaseModel):
content: str
summary: str | None = None
child_chunks: list[str] | None = None

View File

@ -311,14 +311,18 @@ class IndexingRunner:
qa_preview_texts: list[QAPreviewDetail] = []
total_segments = 0
# doc_form represents the segmentation method (general, parent-child, QA)
index_type = doc_form
index_processor = IndexProcessorFactory(index_type).init_index_processor()
# one extract_setting is one source document
for extract_setting in extract_settings:
# extract
processing_rule = DatasetProcessRule(
mode=tmp_processing_rule["mode"], rules=json.dumps(tmp_processing_rule["rules"])
)
# Extract document content
text_docs = index_processor.extract(extract_setting, process_rule_mode=tmp_processing_rule["mode"])
# Cleaning and segmentation
documents = index_processor.transform(
text_docs,
current_user=None,
@ -361,6 +365,12 @@ class IndexingRunner:
if doc_form and doc_form == "qa_model":
return IndexingEstimate(total_segments=total_segments * 20, qa_preview=qa_preview_texts, preview=[])
# Generate summary preview
summary_index_setting = tmp_processing_rule.get("summary_index_setting")
if summary_index_setting and summary_index_setting.get("enable") and preview_texts:
preview_texts = index_processor.generate_summary_preview(tenant_id, preview_texts, summary_index_setting)
return IndexingEstimate(total_segments=total_segments, preview=preview_texts)
def _extract(

View File

@ -72,7 +72,7 @@ class LLMGenerator:
prompt_messages=list(prompts), model_parameters={"max_tokens": 500, "temperature": 1}, stream=False
)
answer = response.message.get_text_content()
if answer == "":
if answer is None:
return ""
try:
result_dict = json.loads(answer)
@ -113,9 +113,11 @@ class LLMGenerator:
output_parser = SuggestedQuestionsAfterAnswerOutputParser()
format_instructions = output_parser.get_format_instructions()
prompt_template = PromptTemplateParser(template="{{histories}}\n{{format_instructions}}\nquestions:\n")
prompt_template = PromptTemplateParser(
template="{{histories}}\n{{format_instructions}}\nquestions:\n")
prompt = prompt_template.format({"histories": histories, "format_instructions": format_instructions})
prompt = prompt_template.format(
{"histories": histories, "format_instructions": format_instructions})
try:
model_manager = ModelManager()
@ -141,11 +143,13 @@ class LLMGenerator:
)
text_content = response.message.get_text_content()
questions = output_parser.parse(text_content) if text_content else []
questions = output_parser.parse(
text_content) if text_content else []
except InvokeError:
questions = []
except Exception:
logger.exception("Failed to generate suggested questions after answer")
logger.exception(
"Failed to generate suggested questions after answer")
questions = []
return questions
@ -156,10 +160,12 @@ class LLMGenerator:
error = ""
error_step = ""
rule_config = {"prompt": "", "variables": [], "opening_statement": "", "error": ""}
rule_config = {"prompt": "", "variables": [],
"opening_statement": "", "error": ""}
model_parameters = model_config.get("completion_params", {})
if no_variable:
prompt_template = PromptTemplateParser(WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE)
prompt_template = PromptTemplateParser(
WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE)
prompt_generate = prompt_template.format(
inputs={
@ -190,7 +196,8 @@ class LLMGenerator:
error = str(e)
error_step = "generate rule config"
except Exception as e:
logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
logger.exception(
"Failed to generate rule config, model: %s", model_config.get("name"))
rule_config["error"] = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
@ -245,7 +252,8 @@ class LLMGenerator:
},
remove_template_variables=False,
)
parameter_messages = [UserPromptMessage(content=parameter_generate_prompt)]
parameter_messages = [UserPromptMessage(
content=parameter_generate_prompt)]
# the second step to generate the task_parameter and task_statement
statement_generate_prompt = statement_template.format(
@ -255,13 +263,15 @@ class LLMGenerator:
},
remove_template_variables=False,
)
statement_messages = [UserPromptMessage(content=statement_generate_prompt)]
statement_messages = [UserPromptMessage(
content=statement_generate_prompt)]
try:
parameter_content: LLMResult = model_instance.invoke_llm(
prompt_messages=list(parameter_messages), model_parameters=model_parameters, stream=False
)
rule_config["variables"] = re.findall(r'"\s*([^"]+)\s*"', parameter_content.message.get_text_content())
rule_config["variables"] = re.findall(
r'"\s*([^"]+)\s*"', parameter_content.message.get_text_content())
except InvokeError as e:
error = str(e)
error_step = "generate variables"
@ -270,13 +280,15 @@ class LLMGenerator:
statement_content: LLMResult = model_instance.invoke_llm(
prompt_messages=list(statement_messages), model_parameters=model_parameters, stream=False
)
rule_config["opening_statement"] = statement_content.message.get_text_content()
rule_config["opening_statement"] = statement_content.message.get_text_content(
)
except InvokeError as e:
error = str(e)
error_step = "generate conversation opener"
except Exception as e:
logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
logger.exception(
"Failed to generate rule config, model: %s", model_config.get("name"))
rule_config["error"] = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
@ -286,9 +298,11 @@ class LLMGenerator:
@classmethod
def generate_code(cls, tenant_id: str, instruction: str, model_config: dict, code_language: str = "javascript"):
if code_language == "python":
prompt_template = PromptTemplateParser(PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE)
prompt_template = PromptTemplateParser(
PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE)
else:
prompt_template = PromptTemplateParser(JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE)
prompt_template = PromptTemplateParser(
JAVASCRIPT_CODE_GENERATOR_PROMPT_TEMPLATE)
prompt = prompt_template.format(
inputs={
@ -321,7 +335,8 @@ class LLMGenerator:
return {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
"Failed to invoke LLM model, model: %s, language: %s", model_config.get(
"name"), code_language
)
return {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
@ -335,7 +350,8 @@ class LLMGenerator:
model_type=ModelType.LLM,
)
prompt_messages: list[PromptMessage] = [SystemPromptMessage(content=prompt), UserPromptMessage(content=query)]
prompt_messages: list[PromptMessage] = [SystemPromptMessage(
content=prompt), UserPromptMessage(content=query)]
# Explicitly use the non-streaming overload
result = model_instance.invoke_llm(
@ -381,16 +397,19 @@ class LLMGenerator:
parsed_content = json_repair.loads(raw_content)
if not isinstance(parsed_content, dict | list):
raise ValueError(f"Failed to parse structured output from llm: {raw_content}")
raise ValueError(
f"Failed to parse structured output from llm: {raw_content}")
generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False)
generated_json_schema = json.dumps(
parsed_content, indent=2, ensure_ascii=False)
return {"output": generated_json_schema, "error": ""}
except InvokeError as e:
error = str(e)
return {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
except Exception as e:
logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
logger.exception(
"Failed to invoke LLM model, model: %s", model_config.get("name"))
return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
@staticmethod
@ -398,7 +417,8 @@ class LLMGenerator:
tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None
):
last_run: Message | None = (
db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first()
db.session.query(Message).where(Message.app_id == flow_id).order_by(
Message.created_at.desc()).first()
)
if not last_run:
return LLMGenerator.__instruction_modify_common(
@ -446,7 +466,8 @@ class LLMGenerator:
workflow = workflow_service.get_draft_workflow(app_model=app)
if not workflow:
raise ValueError("Workflow not found for the given app model.")
last_run = workflow_service.get_node_last_run(app_model=app, workflow=workflow, node_id=node_id)
last_run = workflow_service.get_node_last_run(
app_model=app, workflow=workflow, node_id=node_id)
try:
node_type = cast(WorkflowNodeExecutionModel, last_run).node_type
except Exception:
@ -470,7 +491,8 @@ class LLMGenerator:
)
def agent_log_of(node_execution: WorkflowNodeExecutionModel) -> Sequence:
raw_agent_log = node_execution.execution_metadata_dict.get(WorkflowNodeExecutionMetadataKey.AGENT_LOG, [])
raw_agent_log = node_execution.execution_metadata_dict.get(
WorkflowNodeExecutionMetadataKey.AGENT_LOG, [])
if not raw_agent_log:
return []
@ -518,11 +540,14 @@ class LLMGenerator:
ERROR_MESSAGE = "{{#error_message#}}"
injected_instruction = instruction
if LAST_RUN in injected_instruction:
injected_instruction = injected_instruction.replace(LAST_RUN, json.dumps(last_run))
injected_instruction = injected_instruction.replace(
LAST_RUN, json.dumps(last_run))
if CURRENT in injected_instruction:
injected_instruction = injected_instruction.replace(CURRENT, current or "null")
injected_instruction = injected_instruction.replace(
CURRENT, current or "null")
if ERROR_MESSAGE in injected_instruction:
injected_instruction = injected_instruction.replace(ERROR_MESSAGE, error_message or "null")
injected_instruction = injected_instruction.replace(
ERROR_MESSAGE, error_message or "null")
model_instance = ModelManager().get_model_instance(
tenant_id=tenant_id,
model_type=ModelType.LLM,
@ -560,11 +585,13 @@ class LLMGenerator:
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
if first_brace == -1 or last_brace == -1 or last_brace < first_brace:
raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}")
json_str = generated_raw[first_brace : last_brace + 1]
raise ValueError(
f"Could not find a valid JSON object in response: {generated_raw}")
json_str = generated_raw[first_brace: last_brace + 1]
data = json_repair.loads(json_str)
if not isinstance(data, dict):
raise TypeError(f"Expected a JSON object, but got {type(data).__name__}")
raise TypeError(
f"Expected a JSON object, but got {type(data).__name__}")
return data
except InvokeError as e:
error = str(e)

View File

@ -434,3 +434,20 @@ INSTRUCTION_GENERATE_TEMPLATE_PROMPT = """The output of this prompt is not as ex
You should edit the prompt according to the IDEAL OUTPUT."""
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
DEFAULT_GENERATOR_SUMMARY_PROMPT = (
"""Summarize the following content. Extract only the key information and main points. """
"""Remove redundant details.
Requirements:
1. Write a concise summary in plain text
2. Use the same language as the input content
3. Focus on important facts, concepts, and details
4. If images are included, describe their key information
5. Do not use words like "好的", "ok", "I understand", "This text discusses", "The content mentions"
6. Write directly without extra words
Output only the summary text. Start summarizing now:
"""
)

View File

@ -4,7 +4,6 @@ from typing import Any
from core.callback_handler.workflow_tool_callback_handler import DifyWorkflowCallbackHandler
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolProviderType
from core.tools.signature import sign_tool_file
from core.tools.tool_engine import ToolEngine
from core.tools.tool_manager import ToolManager
from core.tools.utils.message_transformer import ToolFileMessageTransformer
@ -42,43 +41,6 @@ class PluginToolBackwardsInvocation(BaseBackwardsInvocation):
response, user_id=user_id, tenant_id=tenant_id
)
return cls._sign_tool_file_urls(response)
return response
except Exception as e:
raise e
# FIXME: this method should be gracefully deprecated
@classmethod
def _sign_tool_file_urls(
cls, messages: Generator[ToolInvokeMessage, None, None]
) -> Generator[ToolInvokeMessage, None, None]:
"""
Sign file URLs in tool invoke messages for external access.
"""
for message in messages:
if message.type in {
ToolInvokeMessage.MessageType.IMAGE_LINK,
ToolInvokeMessage.MessageType.BINARY_LINK,
ToolInvokeMessage.MessageType.FILE,
}:
if isinstance(message.message, ToolInvokeMessage.TextMessage):
url = message.message.text
# Check if it's an unsigned internal path
if url.startswith("/files/tools/"):
parts = url.split("/")[-1]
if "." in parts:
file_id, ext = parts.rsplit(".", 1)
extension = f".{ext}"
else:
file_id = parts
extension = ".bin"
signed_url = sign_tool_file(tool_file_id=file_id, extension=extension)
yield ToolInvokeMessage(
type=message.type,
message=ToolInvokeMessage.TextMessage(text=signed_url),
meta=message.meta,
)
continue
yield message

View File

@ -282,11 +282,3 @@ class TriggerDispatchResponse(BaseModel):
return deserialize_response(binascii.unhexlify(v.encode()))
except Exception as e:
raise ValueError("Failed to deserialize response from hex string") from e
class RequestListTools(BaseModel):
"""
Request to list all available tools
"""
pass

View File

@ -320,17 +320,18 @@ class BasePluginClient:
case PluginInvokeError.__name__:
error_object = json.loads(message)
invoke_error_type = error_object.get("error_type")
args = error_object.get("args")
match invoke_error_type:
case InvokeRateLimitError.__name__:
raise InvokeRateLimitError(description=error_object.get("message"))
raise InvokeRateLimitError(description=args.get("description"))
case InvokeAuthorizationError.__name__:
raise InvokeAuthorizationError(description=error_object.get("message"))
raise InvokeAuthorizationError(description=args.get("description"))
case InvokeBadRequestError.__name__:
raise InvokeBadRequestError(description=error_object.get("message"))
raise InvokeBadRequestError(description=args.get("description"))
case InvokeConnectionError.__name__:
raise InvokeConnectionError(description=error_object.get("message"))
raise InvokeConnectionError(description=args.get("description"))
case InvokeServerUnavailableError.__name__:
raise InvokeServerUnavailableError(description=error_object.get("message"))
raise InvokeServerUnavailableError(description=args.get("description"))
case CredentialsValidateFailedError.__name__:
raise CredentialsValidateFailedError(error_object.get("message"))
case EndpointSetupFailedError.__name__:
@ -338,11 +339,11 @@ class BasePluginClient:
case TriggerProviderCredentialValidationError.__name__:
raise TriggerProviderCredentialValidationError(error_object.get("message"))
case TriggerPluginInvokeError.__name__:
raise TriggerPluginInvokeError(description=error_object.get("message"))
raise TriggerPluginInvokeError(description=error_object.get("description"))
case TriggerInvokeError.__name__:
raise TriggerInvokeError(error_object.get("message"))
case EventIgnoreError.__name__:
raise EventIgnoreError(description=error_object.get("message"))
raise EventIgnoreError(description=error_object.get("description"))
case _:
raise PluginInvokeError(description=message)
case PluginDaemonInternalServerError.__name__:

View File

@ -389,15 +389,14 @@ class RetrievalService:
.all()
}
records = []
include_segment_ids = set()
segment_child_map = {}
valid_dataset_documents = {}
image_doc_ids: list[Any] = []
child_index_node_ids = []
index_node_ids = []
doc_to_document_map = {}
summary_segment_ids = set() # Track segments retrieved via summary
# First pass: collect all document IDs and identify summary documents
for document in documents:
document_id = document.metadata.get("document_id")
if document_id not in dataset_documents:
@ -408,16 +407,24 @@ class RetrievalService:
continue
valid_dataset_documents[document_id] = dataset_document
doc_id = document.metadata.get("doc_id") or ""
doc_to_document_map[doc_id] = document
# Check if this is a summary document
is_summary = document.metadata.get("is_summary", False)
if is_summary:
# For summary documents, find the original chunk via original_chunk_id
original_chunk_id = document.metadata.get("original_chunk_id")
if original_chunk_id:
summary_segment_ids.add(original_chunk_id)
continue # Skip adding to other lists for summary documents
if dataset_document.doc_form == IndexStructureType.PARENT_CHILD_INDEX:
doc_id = document.metadata.get("doc_id") or ""
doc_to_document_map[doc_id] = document
if document.metadata.get("doc_type") == DocType.IMAGE:
image_doc_ids.append(doc_id)
else:
child_index_node_ids.append(doc_id)
else:
doc_id = document.metadata.get("doc_id") or ""
doc_to_document_map[doc_id] = document
if document.metadata.get("doc_type") == DocType.IMAGE:
image_doc_ids.append(doc_id)
else:
@ -433,6 +440,7 @@ class RetrievalService:
attachment_map: dict[str, list[dict[str, Any]]] = {}
child_chunk_map: dict[str, list[ChildChunk]] = {}
doc_segment_map: dict[str, list[str]] = {}
segment_summary_map: dict[str, str] = {} # Map segment_id to summary content
with session_factory.create_session() as session:
attachments = cls.get_segment_attachment_infos(image_doc_ids, session)
@ -447,6 +455,7 @@ class RetrievalService:
doc_segment_map[attachment["segment_id"]].append(attachment["attachment_id"])
else:
doc_segment_map[attachment["segment_id"]] = [attachment["attachment_id"]]
child_chunk_stmt = select(ChildChunk).where(ChildChunk.index_node_id.in_(child_index_node_ids))
child_index_nodes = session.execute(child_chunk_stmt).scalars().all()
@ -470,6 +479,7 @@ class RetrievalService:
index_node_segments = session.execute(document_segment_stmt).scalars().all() # type: ignore
for index_node_segment in index_node_segments:
doc_segment_map[index_node_segment.id] = [index_node_segment.index_node_id]
if segment_ids:
document_segment_stmt = select(DocumentSegment).where(
DocumentSegment.enabled == True,
@ -481,6 +491,42 @@ class RetrievalService:
if index_node_segments:
segments.extend(index_node_segments)
# Handle summary documents: query segments by original_chunk_id
if summary_segment_ids:
summary_segment_ids_list = list(summary_segment_ids)
summary_segment_stmt = select(DocumentSegment).where(
DocumentSegment.enabled == True,
DocumentSegment.status == "completed",
DocumentSegment.id.in_(summary_segment_ids_list),
)
summary_segments = session.execute(summary_segment_stmt).scalars().all() # type: ignore
segments.extend(summary_segments)
# Add summary segment IDs to segment_ids for summary query
for seg in summary_segments:
if seg.id not in segment_ids:
segment_ids.append(seg.id)
# Batch query summaries for segments retrieved via summary (only enabled summaries)
if summary_segment_ids:
from models.dataset import DocumentSegmentSummary
summaries = (
session.query(DocumentSegmentSummary)
.filter(
DocumentSegmentSummary.chunk_id.in_(list(summary_segment_ids)),
DocumentSegmentSummary.status == "completed",
DocumentSegmentSummary.enabled == True, # Only retrieve enabled summaries
)
.all()
)
for summary in summaries:
if summary.summary_content:
segment_summary_map[summary.chunk_id] = summary.summary_content
include_segment_ids = set()
segment_child_map: dict[str, dict[str, Any]] = {}
records: list[dict[str, Any]] = []
for segment in segments:
child_chunks: list[ChildChunk] = child_chunk_map.get(segment.id, [])
attachment_infos: list[dict[str, Any]] = attachment_map.get(segment.id, [])
@ -493,7 +539,7 @@ class RetrievalService:
child_chunk_details = []
max_score = 0.0
for child_chunk in child_chunks:
document = doc_to_document_map[child_chunk.index_node_id]
document = doc_to_document_map.get(child_chunk.index_node_id)
child_chunk_detail = {
"id": child_chunk.id,
"content": child_chunk.content,
@ -503,7 +549,7 @@ class RetrievalService:
child_chunk_details.append(child_chunk_detail)
max_score = max(max_score, document.metadata.get("score", 0.0) if document else 0.0)
for attachment_info in attachment_infos:
file_document = doc_to_document_map[attachment_info["id"]]
file_document = doc_to_document_map.get(attachment_info["id"])
max_score = max(
max_score, file_document.metadata.get("score", 0.0) if file_document else 0.0
)
@ -576,9 +622,16 @@ class RetrievalService:
else None
)
# Extract summary if this segment was retrieved via summary
summary_content = segment_summary_map.get(segment.id)
# Create RetrievalSegments object
retrieval_segment = RetrievalSegments(
segment=segment, child_chunks=child_chunks_list, score=score, files=files
segment=segment,
child_chunks=child_chunks_list,
score=score,
files=files,
summary=summary_content,
)
result.append(retrieval_segment)

View File

@ -20,3 +20,4 @@ class RetrievalSegments(BaseModel):
child_chunks: list[RetrievalChildChunk] | None = None
score: float | None = None
files: list[dict[str, str | int]] | None = None
summary: str | None = None # Summary content if retrieved via summary index

View File

@ -13,6 +13,7 @@ from urllib.parse import unquote, urlparse
import httpx
from configs import dify_config
from core.entities.knowledge_entities import PreviewDetail
from core.helper import ssrf_proxy
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.index_processor.constant.doc_type import DocType
@ -45,6 +46,17 @@ class BaseIndexProcessor(ABC):
def transform(self, documents: list[Document], current_user: Account | None = None, **kwargs) -> list[Document]:
raise NotImplementedError
@abstractmethod
def generate_summary_preview(
self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
) -> list[PreviewDetail]:
"""
For each segment in preview_texts, generate a summary using LLM and attach it to the segment.
The summary can be stored in a new attribute, e.g., summary.
This method should be implemented by subclasses.
"""
raise NotImplementedError
@abstractmethod
def load(
self,

View File

@ -1,9 +1,25 @@
"""Paragraph index processor."""
import logging
import re
import uuid
from collections.abc import Mapping
from typing import Any
logger = logging.getLogger(__name__)
from core.entities.knowledge_entities import PreviewDetail
from core.file import File, FileTransferMethod, FileType, file_manager
from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
from core.model_manager import ModelInstance
from core.model_runtime.entities.message_entities import (
ImagePromptMessageContent,
PromptMessageContentUnionTypes,
TextPromptMessageContent,
UserPromptMessage,
)
from core.model_runtime.entities.model_entities import ModelFeature, ModelType
from core.provider_manager import ProviderManager
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.datasource.retrieval_service import RetrievalService
@ -17,12 +33,16 @@ from core.rag.index_processor.index_processor_base import BaseIndexProcessor
from core.rag.models.document import AttachmentDocument, Document, MultimodalGeneralStructureChunk
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.tools.utils.text_processing_utils import remove_leading_symbols
from extensions.ext_database import db
from factories.file_factory import build_from_mapping
from libs import helper
from models import UploadFile
from models.account import Account
from models.dataset import Dataset, DatasetProcessRule
from models.dataset import Dataset, DatasetProcessRule, DocumentSegment, SegmentAttachmentBinding
from models.dataset import Document as DatasetDocument
from services.account_service import AccountService
from services.entities.knowledge_entities.knowledge_entities import Rule
from services.summary_index_service import SummaryIndexService
class ParagraphIndexProcessor(BaseIndexProcessor):
@ -108,6 +128,29 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
keyword.add_texts(documents)
def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
# Note: Summary indexes are now disabled (not deleted) when segments are disabled.
# This method is called for actual deletion scenarios (e.g., when segment is deleted).
# For disable operations, disable_summaries_for_segments is called directly in the task.
# Only delete summaries if explicitly requested (e.g., when segment is actually deleted)
delete_summaries = kwargs.get("delete_summaries", False)
if delete_summaries:
if node_ids:
# Find segments by index_node_id
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset.id,
DocumentSegment.index_node_id.in_(node_ids),
)
.all()
)
segment_ids = [segment.id for segment in segments]
if segment_ids:
SummaryIndexService.delete_summaries_for_segments(dataset, segment_ids)
else:
# Delete all summaries for the dataset
SummaryIndexService.delete_summaries_for_segments(dataset, None)
if dataset.indexing_technique == "high_quality":
vector = Vector(dataset)
if node_ids:
@ -227,3 +270,263 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
}
else:
raise ValueError("Chunks is not a list")
def generate_summary_preview(
self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
) -> list[PreviewDetail]:
"""
For each segment, concurrently call generate_summary to generate a summary
and write it to the summary attribute of PreviewDetail.
"""
import concurrent.futures
from flask import current_app
# Capture Flask app context for worker threads
flask_app = None
try:
flask_app = current_app._get_current_object() # type: ignore
except RuntimeError:
logger.warning("No Flask application context available, summary generation may fail")
def process(preview: PreviewDetail) -> None:
"""Generate summary for a single preview item."""
try:
if flask_app:
# Ensure Flask app context in worker thread
with flask_app.app_context():
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
else:
# Fallback: try without app context (may fail)
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
except Exception:
logger.exception("Failed to generate summary for preview")
# Don't fail the entire preview if summary generation fails
preview.summary = None
with concurrent.futures.ThreadPoolExecutor() as executor:
list(executor.map(process, preview_texts))
return preview_texts
@staticmethod
def generate_summary(
tenant_id: str,
text: str,
summary_index_setting: dict | None = None,
segment_id: str | None = None,
) -> str:
"""
Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt,
and supports vision models by including images from the segment attachments or text content.
Args:
tenant_id: Tenant ID
text: Text content to summarize
summary_index_setting: Summary index configuration
segment_id: Optional segment ID to fetch attachments from SegmentAttachmentBinding table
"""
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError("summary_index_setting is required and must be enabled to generate summary.")
model_name = summary_index_setting.get("model_name")
model_provider_name = summary_index_setting.get("model_provider_name")
summary_prompt = summary_index_setting.get("summary_prompt")
# Import default summary prompt
if not summary_prompt:
summary_prompt = DEFAULT_GENERATOR_SUMMARY_PROMPT
provider_manager = ProviderManager()
provider_model_bundle = provider_manager.get_provider_model_bundle(
tenant_id, model_provider_name, ModelType.LLM
)
model_instance = ModelInstance(provider_model_bundle, model_name)
# Get model schema to check if vision is supported
model_schema = model_instance.model_type_instance.get_model_schema(model_name, model_instance.credentials)
supports_vision = model_schema and model_schema.features and ModelFeature.VISION in model_schema.features
# Extract images if model supports vision
image_files = []
if supports_vision:
# First, try to get images from SegmentAttachmentBinding (preferred method)
if segment_id:
image_files = ParagraphIndexProcessor._extract_images_from_segment_attachments(tenant_id, segment_id)
# If no images from attachments, fall back to extracting from text
if not image_files:
image_files = ParagraphIndexProcessor._extract_images_from_text(tenant_id, text)
# Build prompt messages
prompt_messages = []
if image_files:
# If we have images, create a UserPromptMessage with both text and images
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
# Add images first
for file in image_files:
try:
file_content = file_manager.to_prompt_message_content(
file, image_detail_config=ImagePromptMessageContent.DETAIL.LOW
)
prompt_message_contents.append(file_content)
except Exception as e:
logger.warning("Failed to convert image file to prompt message content: %s", str(e))
continue
# Add text content
if prompt_message_contents: # Only add text if we successfully added images
prompt_message_contents.append(TextPromptMessageContent(data=f"{summary_prompt}\n{text}"))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
# If image conversion failed, fall back to text-only
prompt = f"{summary_prompt}\n{text}"
prompt_messages.append(UserPromptMessage(content=prompt))
else:
# No images, use simple text prompt
prompt = f"{summary_prompt}\n{text}"
prompt_messages.append(UserPromptMessage(content=prompt))
result = model_instance.invoke_llm(prompt_messages=prompt_messages, model_parameters={}, stream=False)
return getattr(result.message, "content", "")
@staticmethod
def _extract_images_from_text(tenant_id: str, text: str) -> list[File]:
"""
Extract images from markdown text and convert them to File objects.
Args:
tenant_id: Tenant ID
text: Text content that may contain markdown image links
Returns:
List of File objects representing images found in the text
"""
# Extract markdown images using regex pattern
pattern = r"!\[.*?\]\((.*?)\)"
images = re.findall(pattern, text)
if not images:
return []
upload_file_id_list = []
for image in images:
# For data before v0.10.0
pattern = r"/files/([a-f0-9\-]+)/image-preview(?:\?.*?)?"
match = re.search(pattern, image)
if match:
upload_file_id = match.group(1)
upload_file_id_list.append(upload_file_id)
continue
# For data after v0.10.0
pattern = r"/files/([a-f0-9\-]+)/file-preview(?:\?.*?)?"
match = re.search(pattern, image)
if match:
upload_file_id = match.group(1)
upload_file_id_list.append(upload_file_id)
continue
# For tools directory - direct file formats (e.g., .png, .jpg, etc.)
pattern = r"/files/tools/([a-f0-9\-]+)\.([a-zA-Z0-9]+)(?:\?[^\s\)\"\']*)?"
match = re.search(pattern, image)
if match:
# Tool files are handled differently, skip for now
continue
if not upload_file_id_list:
return []
# Get unique IDs for database query
unique_upload_file_ids = list(set(upload_file_id_list))
upload_files = (
db.session.query(UploadFile)
.where(UploadFile.id.in_(unique_upload_file_ids), UploadFile.tenant_id == tenant_id)
.all()
)
# Create File objects from UploadFile records
file_objects = []
for upload_file in upload_files:
# Only process image files
if not upload_file.mime_type or "image" not in upload_file.mime_type:
continue
mapping = {
"upload_file_id": upload_file.id,
"transfer_method": FileTransferMethod.LOCAL_FILE.value,
"type": FileType.IMAGE.value,
}
try:
file_obj = build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
)
file_objects.append(file_obj)
except Exception as e:
logger.warning("Failed to create File object from UploadFile %s: %s", upload_file.id, str(e))
continue
return file_objects
@staticmethod
def _extract_images_from_segment_attachments(tenant_id: str, segment_id: str) -> list[File]:
"""
Extract images from SegmentAttachmentBinding table (preferred method).
This matches how DatasetRetrieval gets segment attachments.
Args:
tenant_id: Tenant ID
segment_id: Segment ID to fetch attachments for
Returns:
List of File objects representing images found in segment attachments
"""
from sqlalchemy import select
# Query attachments from SegmentAttachmentBinding table
attachments_with_bindings = db.session.execute(
select(SegmentAttachmentBinding, UploadFile)
.join(UploadFile, UploadFile.id == SegmentAttachmentBinding.attachment_id)
.where(
SegmentAttachmentBinding.segment_id == segment_id,
SegmentAttachmentBinding.tenant_id == tenant_id,
)
).all()
if not attachments_with_bindings:
return []
file_objects = []
for _, upload_file in attachments_with_bindings:
# Only process image files
if not upload_file.mime_type or "image" not in upload_file.mime_type:
continue
try:
# Create File object directly (similar to DatasetRetrieval)
file_obj = File(
id=upload_file.id,
filename=upload_file.name,
extension="." + upload_file.extension,
mime_type=upload_file.mime_type,
tenant_id=tenant_id,
type=FileType.IMAGE,
transfer_method=FileTransferMethod.LOCAL_FILE,
remote_url=upload_file.source_url,
related_id=upload_file.id,
size=upload_file.size,
storage_key=upload_file.key,
)
file_objects.append(file_obj)
except Exception as e:
logger.warning("Failed to create File object from UploadFile %s: %s", upload_file.id, str(e))
continue
return file_objects

View File

@ -1,11 +1,13 @@
"""Paragraph index processor."""
import json
import logging
import uuid
from collections.abc import Mapping
from typing import Any
from configs import dify_config
from core.entities.knowledge_entities import PreviewDetail
from core.model_manager import ModelInstance
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.retrieval_service import RetrievalService
@ -25,6 +27,9 @@ from models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegm
from models.dataset import Document as DatasetDocument
from services.account_service import AccountService
from services.entities.knowledge_entities.knowledge_entities import ParentMode, Rule
from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)
class ParentChildIndexProcessor(BaseIndexProcessor):
@ -135,6 +140,29 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
# node_ids is segment's node_ids
# Note: Summary indexes are now disabled (not deleted) when segments are disabled.
# This method is called for actual deletion scenarios (e.g., when segment is deleted).
# For disable operations, disable_summaries_for_segments is called directly in the task.
# Only delete summaries if explicitly requested (e.g., when segment is actually deleted)
delete_summaries = kwargs.get("delete_summaries", False)
if delete_summaries:
if node_ids:
# Find segments by index_node_id
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset.id,
DocumentSegment.index_node_id.in_(node_ids),
)
.all()
)
segment_ids = [segment.id for segment in segments]
if segment_ids:
SummaryIndexService.delete_summaries_for_segments(dataset, segment_ids)
else:
# Delete all summaries for the dataset
SummaryIndexService.delete_summaries_for_segments(dataset, None)
if dataset.indexing_technique == "high_quality":
delete_child_chunks = kwargs.get("delete_child_chunks") or False
precomputed_child_node_ids = kwargs.get("precomputed_child_node_ids")
@ -326,3 +354,57 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
"preview": preview,
"total_segments": len(parent_childs.parent_child_chunks),
}
def generate_summary_preview(
self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
) -> list[PreviewDetail]:
"""
For each parent chunk in preview_texts, concurrently call generate_summary to generate a summary
and write it to the summary attribute of PreviewDetail.
Note: For parent-child structure, we only generate summaries for parent chunks.
"""
import concurrent.futures
from flask import current_app
# Capture Flask app context for worker threads
flask_app = None
try:
flask_app = current_app._get_current_object() # type: ignore
except RuntimeError:
logger.warning("No Flask application context available, summary generation may fail")
def process(preview: PreviewDetail) -> None:
"""Generate summary for a single preview item (parent chunk)."""
try:
if flask_app:
# Ensure Flask app context in worker thread
with flask_app.app_context():
# Use ParagraphIndexProcessor's generate_summary method
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=tenant_id,
text=preview.content,
summary_index_setting=summary_index_setting,
)
if summary:
preview.summary = summary
else:
# Fallback: try without app context (may fail)
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=tenant_id,
text=preview.content,
summary_index_setting=summary_index_setting,
)
if summary:
preview.summary = summary
except Exception:
logger.exception("Failed to generate summary for preview")
# Don't fail the entire preview if summary generation fails
preview.summary = None
with concurrent.futures.ThreadPoolExecutor() as executor:
list(executor.map(process, preview_texts))
return preview_texts

View File

@ -11,6 +11,7 @@ import pandas as pd
from flask import Flask, current_app
from werkzeug.datastructures import FileStorage
from core.entities.knowledge_entities import PreviewDetail
from core.llm_generator.llm_generator import LLMGenerator
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.retrieval_service import RetrievalService
@ -25,9 +26,10 @@ from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.tools.utils.text_processing_utils import remove_leading_symbols
from libs import helper
from models.account import Account
from models.dataset import Dataset
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument
from services.entities.knowledge_entities.knowledge_entities import Rule
from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)
@ -144,6 +146,30 @@ class QAIndexProcessor(BaseIndexProcessor):
vector.create_multimodal(multimodal_documents)
def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
# Note: Summary indexes are now disabled (not deleted) when segments are disabled.
# This method is called for actual deletion scenarios (e.g., when segment is deleted).
# For disable operations, disable_summaries_for_segments is called directly in the task.
# Note: qa_model doesn't generate summaries, but we clean them for completeness
# Only delete summaries if explicitly requested (e.g., when segment is actually deleted)
delete_summaries = kwargs.get("delete_summaries", False)
if delete_summaries:
if node_ids:
# Find segments by index_node_id
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset.id,
DocumentSegment.index_node_id.in_(node_ids),
)
.all()
)
segment_ids = [segment.id for segment in segments]
if segment_ids:
SummaryIndexService.delete_summaries_for_segments(dataset, segment_ids)
else:
# Delete all summaries for the dataset
SummaryIndexService.delete_summaries_for_segments(dataset, None)
vector = Vector(dataset)
if node_ids:
vector.delete_by_ids(node_ids)
@ -212,6 +238,17 @@ class QAIndexProcessor(BaseIndexProcessor):
"total_segments": len(qa_chunks.qa_chunks),
}
def generate_summary_preview(
self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
) -> list[PreviewDetail]:
"""
QA model doesn't generate summaries, so this method returns preview_texts unchanged.
Note: QA model uses question-answer pairs, which don't require summary generation.
"""
# QA model doesn't generate summaries, return as-is
return preview_texts
def _format_qa_document(self, flask_app: Flask, tenant_id: str, document_node, all_qa_documents, document_language):
format_documents = []
if document_node.page_content is None or not document_node.page_content.strip():

View File

@ -29,7 +29,6 @@ from models import (
Account,
CreatorUserRole,
EndUser,
LLMGenerationDetail,
WorkflowNodeExecutionModel,
WorkflowNodeExecutionTriggeredFrom,
)
@ -458,113 +457,6 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
session.merge(db_model)
session.flush()
# Save LLMGenerationDetail for LLM nodes with successful execution
if (
domain_model.node_type == NodeType.LLM
and domain_model.status == WorkflowNodeExecutionStatus.SUCCEEDED
and domain_model.outputs is not None
):
self._save_llm_generation_detail(session, domain_model)
def _save_llm_generation_detail(self, session, execution: WorkflowNodeExecution) -> None:
"""
Save LLM generation detail for LLM nodes.
Extracts reasoning_content, tool_calls, and sequence from outputs and metadata.
"""
outputs = execution.outputs or {}
metadata = execution.metadata or {}
reasoning_list = self._extract_reasoning(outputs)
tool_calls_list = self._extract_tool_calls(metadata.get(WorkflowNodeExecutionMetadataKey.AGENT_LOG))
if not reasoning_list and not tool_calls_list:
return
sequence = self._build_generation_sequence(outputs.get("text", ""), reasoning_list, tool_calls_list)
self._upsert_generation_detail(session, execution, reasoning_list, tool_calls_list, sequence)
def _extract_reasoning(self, outputs: Mapping[str, Any]) -> list[str]:
"""Extract reasoning_content as a clean list of non-empty strings."""
reasoning_content = outputs.get("reasoning_content")
if isinstance(reasoning_content, str):
trimmed = reasoning_content.strip()
return [trimmed] if trimmed else []
if isinstance(reasoning_content, list):
return [item.strip() for item in reasoning_content if isinstance(item, str) and item.strip()]
return []
def _extract_tool_calls(self, agent_log: Any) -> list[dict[str, str]]:
"""Extract tool call records from agent logs."""
if not agent_log or not isinstance(agent_log, list):
return []
tool_calls: list[dict[str, str]] = []
for log in agent_log:
log_data = log.data if hasattr(log, "data") else (log.get("data", {}) if isinstance(log, dict) else {})
tool_name = log_data.get("tool_name")
if tool_name and str(tool_name).strip():
tool_calls.append(
{
"id": log_data.get("tool_call_id", ""),
"name": tool_name,
"arguments": json.dumps(log_data.get("tool_args", {})),
"result": str(log_data.get("output", "")),
}
)
return tool_calls
def _build_generation_sequence(
self, text: str, reasoning_list: list[str], tool_calls_list: list[dict[str, str]]
) -> list[dict[str, Any]]:
"""Build a simple content/reasoning/tool_call sequence."""
sequence: list[dict[str, Any]] = []
if text:
sequence.append({"type": "content", "start": 0, "end": len(text)})
for index in range(len(reasoning_list)):
sequence.append({"type": "reasoning", "index": index})
for index in range(len(tool_calls_list)):
sequence.append({"type": "tool_call", "index": index})
return sequence
def _upsert_generation_detail(
self,
session,
execution: WorkflowNodeExecution,
reasoning_list: list[str],
tool_calls_list: list[dict[str, str]],
sequence: list[dict[str, Any]],
) -> None:
"""Insert or update LLMGenerationDetail with serialized fields."""
existing = (
session.query(LLMGenerationDetail)
.filter_by(
workflow_run_id=execution.workflow_execution_id,
node_id=execution.node_id,
)
.first()
)
reasoning_json = json.dumps(reasoning_list) if reasoning_list else None
tool_calls_json = json.dumps(tool_calls_list) if tool_calls_list else None
sequence_json = json.dumps(sequence) if sequence else None
if existing:
existing.reasoning_content = reasoning_json
existing.tool_calls = tool_calls_json
existing.sequence = sequence_json
return
generation_detail = LLMGenerationDetail(
tenant_id=self._tenant_id,
app_id=self._app_id,
workflow_run_id=execution.workflow_execution_id,
node_id=execution.node_id,
reasoning_content=reasoning_json,
tool_calls=tool_calls_json,
sequence=sequence_json,
)
session.add(generation_detail)
def get_db_models_by_workflow_run(
self,
workflow_run_id: str,

View File

@ -1,53 +0,0 @@
from .bash.dify_cli import (
DifyCliBinary,
DifyCliConfig,
DifyCliEnvConfig,
DifyCliLocator,
DifyCliToolConfig,
)
from .constants import (
APP_ASSETS_PATH,
APP_ASSETS_ZIP_PATH,
DIFY_CLI_CONFIG_FILENAME,
DIFY_CLI_GLOBAL_TOOLS_PATH,
DIFY_CLI_PATH,
DIFY_CLI_PATH_PATTERN,
DIFY_CLI_ROOT,
DIFY_CLI_TOOLS_ROOT,
)
from .initializer import AppAssetsInitializer, DifyCliInitializer, SandboxInitializer
from .manager import SandboxManager
from .session import SandboxSession
from .storage import ArchiveSandboxStorage, SandboxStorage
from .utils.debug import sandbox_debug
from .utils.encryption import create_sandbox_config_encrypter, masked_config
from .vm import SandboxBuilder, SandboxType, VMConfig
__all__ = [
"APP_ASSETS_PATH",
"APP_ASSETS_ZIP_PATH",
"DIFY_CLI_CONFIG_FILENAME",
"DIFY_CLI_GLOBAL_TOOLS_PATH",
"DIFY_CLI_PATH",
"DIFY_CLI_PATH_PATTERN",
"DIFY_CLI_ROOT",
"DIFY_CLI_TOOLS_ROOT",
"AppAssetsInitializer",
"ArchiveSandboxStorage",
"DifyCliBinary",
"DifyCliConfig",
"DifyCliEnvConfig",
"DifyCliInitializer",
"DifyCliLocator",
"DifyCliToolConfig",
"SandboxBuilder",
"SandboxInitializer",
"SandboxManager",
"SandboxSession",
"SandboxStorage",
"SandboxType",
"VMConfig",
"create_sandbox_config_encrypter",
"masked_config",
"sandbox_debug",
]

View File

@ -1,15 +0,0 @@
from .dify_cli import (
DifyCliBinary,
DifyCliConfig,
DifyCliEnvConfig,
DifyCliLocator,
DifyCliToolConfig,
)
__all__ = [
"DifyCliBinary",
"DifyCliConfig",
"DifyCliEnvConfig",
"DifyCliLocator",
"DifyCliToolConfig",
]

View File

@ -1,98 +0,0 @@
from collections.abc import Generator
from typing import Any
from core.tools.__base.tool import Tool
from core.tools.__base.tool_runtime import ToolRuntime
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_entities import (
ToolDescription,
ToolEntity,
ToolIdentity,
ToolInvokeMessage,
ToolParameter,
ToolProviderType,
)
from core.virtual_environment.__base.helpers import submit_command, with_connection
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from ..utils.debug import sandbox_debug
COMMAND_TIMEOUT_SECONDS = 60
class SandboxBashTool(Tool):
def __init__(self, sandbox: VirtualEnvironment, tenant_id: str, tools_path: str) -> None:
self._sandbox = sandbox
self._tools_path = tools_path
entity = ToolEntity(
identity=ToolIdentity(
author="Dify",
name="bash",
label=I18nObject(en_US="Bash", zh_Hans="Bash"),
provider="sandbox",
),
parameters=[
ToolParameter.get_simple_instance(
name="command",
llm_description="The bash command to execute in current working directory",
typ=ToolParameter.ToolParameterType.STRING,
required=True,
),
],
description=ToolDescription(
human=I18nObject(
en_US="Execute bash commands in current working directory",
),
llm="Execute bash commands in current working directory. "
"Use this tool to run shell commands, scripts, or interact with the system. "
"The command will be executed in the current working directory.",
),
)
runtime = ToolRuntime(tenant_id=tenant_id)
super().__init__(entity=entity, runtime=runtime)
def tool_provider_type(self) -> ToolProviderType:
return ToolProviderType.BUILT_IN
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: str | None = None,
app_id: str | None = None,
message_id: str | None = None,
) -> Generator[ToolInvokeMessage, None, None]:
command = tool_parameters.get("command", "")
if not command:
yield self.create_text_message("Error: No command provided")
return
try:
with with_connection(self._sandbox) as conn:
cmd_list = ["bash", "-c", command]
env_vars = {"PATH": f"{self._tools_path}:/usr/local/bin:/usr/bin:/bin"}
sandbox_debug("bash_tool", "cmd_list", cmd_list)
future = submit_command(self._sandbox, conn, cmd_list, environments=env_vars)
timeout = COMMAND_TIMEOUT_SECONDS if COMMAND_TIMEOUT_SECONDS > 0 else None
result = future.result(timeout=timeout)
stdout = result.stdout.decode("utf-8", errors="replace") if result.stdout else ""
stderr = result.stderr.decode("utf-8", errors="replace") if result.stderr else ""
exit_code = result.exit_code
output_parts: list[str] = []
if stdout:
output_parts.append(f"\n{stdout}")
if stderr:
output_parts.append(f"\n{stderr}")
output_parts.append(f"\nCommand exited with code {exit_code}")
yield self.create_text_message("\n".join(output_parts))
except TimeoutError:
yield self.create_text_message(f"Error: Command timed out after {COMMAND_TIMEOUT_SECONDS}s")
except Exception as e:
yield self.create_text_message(f"Error: {e!s}")

View File

@ -1,134 +0,0 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field
from core.model_runtime.utils.encoders import jsonable_encoder
from core.session.cli_api import CliApiSession
from core.tools.entities.tool_entities import ToolParameter, ToolProviderType
from core.virtual_environment.__base.entities import Arch, OperatingSystem
from ..constants import DIFY_CLI_PATH_PATTERN
if TYPE_CHECKING:
from core.tools.__base.tool import Tool
class DifyCliBinary(BaseModel):
operating_system: OperatingSystem = Field(alias="os")
arch: Arch
path: Path
model_config = {
"populate_by_name": True,
"arbitrary_types_allowed": True,
}
class DifyCliLocator:
def __init__(self, root: str | Path | None = None) -> None:
from configs import dify_config
if root is not None:
self._root = Path(root)
elif dify_config.SANDBOX_DIFY_CLI_ROOT:
self._root = Path(dify_config.SANDBOX_DIFY_CLI_ROOT)
else:
api_root = Path(__file__).resolve().parents[3]
self._root = api_root / "bin"
def resolve(self, operating_system: OperatingSystem, arch: Arch) -> DifyCliBinary:
filename = DIFY_CLI_PATH_PATTERN.format(os=operating_system.value, arch=arch.value)
candidate = self._root / filename
if not candidate.is_file():
raise FileNotFoundError(
f"dify CLI binary not found: {candidate}. Configure SANDBOX_DIFY_CLI_ROOT or ensure the file exists."
)
return DifyCliBinary(os=operating_system, arch=arch, path=candidate)
class DifyCliEnvConfig(BaseModel):
files_url: str
cli_api_url: str
cli_api_session_id: str
cli_api_secret: str
class DifyCliToolConfig(BaseModel):
provider_type: str
identity: dict[str, Any]
description: dict[str, Any]
parameters: list[dict[str, Any]]
@classmethod
def transform_provider_type(cls, tool_provider_type: ToolProviderType) -> str:
provider_type = tool_provider_type
match tool_provider_type:
case ToolProviderType.BUILT_IN | ToolProviderType.PLUGIN:
provider_type = "builtin"
case ToolProviderType.MCP | ToolProviderType.WORKFLOW | ToolProviderType.API:
provider_type = provider_type
case _:
raise ValueError(f"Invalid tool provider type: {tool_provider_type}")
return provider_type
@classmethod
def create_from_tool(cls, tool: Tool) -> DifyCliToolConfig:
return cls(
provider_type=cls.transform_provider_type(tool.tool_provider_type()),
identity=to_json(tool.entity.identity),
description=to_json(tool.entity.description),
parameters=[cls.transform_parameter(parameter) for parameter in tool.entity.parameters],
)
@classmethod
def transform_parameter(cls, parameter: ToolParameter) -> dict[str, Any]:
transformed_parameter = to_json(parameter)
transformed_parameter.pop("input_schema", None)
transformed_parameter.pop("form", None)
match parameter.type:
case (
ToolParameter.ToolParameterType.SYSTEM_FILES
| ToolParameter.ToolParameterType.FILE
| ToolParameter.ToolParameterType.FILES
):
return transformed_parameter
case _:
return transformed_parameter
class DifyCliConfig(BaseModel):
env: DifyCliEnvConfig
tools: list[DifyCliToolConfig]
@classmethod
def create(cls, session: CliApiSession, tools: list[Tool]) -> DifyCliConfig:
from configs import dify_config
cli_api_url = dify_config.CLI_API_URL
return cls(
env=DifyCliEnvConfig(
files_url=dify_config.FILES_URL,
cli_api_url=cli_api_url,
cli_api_session_id=session.id,
cli_api_secret=session.secret,
),
tools=[DifyCliToolConfig.create_from_tool(tool) for tool in tools],
)
def to_json(obj: Any) -> dict[str, Any]:
return jsonable_encoder(obj, exclude_unset=True, exclude_defaults=True, exclude_none=True)
__all__ = [
"DifyCliBinary",
"DifyCliConfig",
"DifyCliEnvConfig",
"DifyCliLocator",
"DifyCliToolConfig",
]

View File

@ -1,16 +0,0 @@
from typing import Final
# Dify CLI (absolute path - hidden in /tmp, not in sandbox workdir)
DIFY_CLI_ROOT: Final[str] = "/tmp/.dify"
DIFY_CLI_PATH: Final[str] = "/tmp/.dify/bin/dify"
DIFY_CLI_PATH_PATTERN: Final[str] = "dify-cli-{os}-{arch}"
DIFY_CLI_CONFIG_FILENAME: Final[str] = ".dify_cli.json"
DIFY_CLI_TOOLS_ROOT: Final[str] = "/tmp/.dify/tools"
DIFY_CLI_GLOBAL_TOOLS_PATH: Final[str] = "/tmp/.dify/tools/global"
# App Assets (relative path - stays in sandbox workdir)
APP_ASSETS_PATH: Final[str] = "assets"
APP_ASSETS_ZIP_PATH: Final[str] = "/tmp/assets.zip"

View File

@ -1,3 +0,0 @@
from .providers import SandboxProviderApiEntity
__all__ = ["SandboxProviderApiEntity"]

View File

@ -1,21 +0,0 @@
from collections.abc import Mapping
from typing import Any
from pydantic import BaseModel, Field
class SandboxProviderApiEntity(BaseModel):
provider_type: str = Field(..., description="Provider type identifier")
is_system_configured: bool = Field(default=False)
is_tenant_configured: bool = Field(default=False)
is_active: bool = Field(default=False)
config: Mapping[str, Any] = Field(default_factory=dict)
config_schema: list[dict[str, Any]] = Field(default_factory=list)
class SandboxProviderEntity(BaseModel):
id: str = Field(..., description="Provider identifier")
provider_type: str = Field(..., description="Provider type identifier")
is_active: bool = Field(default=False)
config: Mapping[str, Any] = Field(default_factory=dict)
config_schema: list[dict[str, Any]] = Field(default_factory=list)

View File

@ -1,9 +0,0 @@
from .app_assets_initializer import AppAssetsInitializer
from .base import SandboxInitializer
from .dify_cli_initializer import DifyCliInitializer
__all__ = [
"AppAssetsInitializer",
"DifyCliInitializer",
"SandboxInitializer",
]

View File

@ -1,55 +0,0 @@
import logging
from io import BytesIO
from core.app_assets.paths import AssetPaths
from core.virtual_environment.__base.helpers import execute, with_connection
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from extensions.ext_storage import storage
from ..constants import APP_ASSETS_PATH, APP_ASSETS_ZIP_PATH
from .base import SandboxInitializer
logger = logging.getLogger(__name__)
class AppAssetsInitializer(SandboxInitializer):
def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
self._tenant_id = tenant_id
self._app_id = app_id
self._assets_id = assets_id
def initialize(self, env: VirtualEnvironment) -> None:
zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id)
try:
zip_data = storage.load_once(zip_key)
except Exception:
logger.warning(
"Failed to load assets zip for app_id=%s, key=%s",
self._app_id,
zip_key,
exc_info=True,
)
return
env.upload_file(APP_ASSETS_ZIP_PATH, BytesIO(zip_data))
with with_connection(env) as conn:
execute(
env,
["unzip", "-o", APP_ASSETS_ZIP_PATH, "-d", APP_ASSETS_PATH],
connection=conn,
timeout=60,
error_message="Failed to unzip assets",
)
execute(
env,
["rm", "-f", APP_ASSETS_ZIP_PATH],
connection=conn,
error_message="Failed to cleanup temp zip file",
)
logger.info(
"App assets initialized for app_id=%s, published_id=%s",
self._app_id,
self._assets_id,
)

Some files were not shown because too many files have changed in this diff Show More