Mirror of https://github.com/langgenius/dify.git (synced 2026-01-28 15:56:00 +08:00)

Compare commits: 1 commit, copilot/re ... chore/remo

| Author | SHA1 | Date |
|---|---|---|
|  | e8668782d6 |  |

5  .github/workflows/autofix.yml  (vendored)
@@ -28,11 +28,6 @@ jobs:
          # Format code
          uv run ruff format ..

      - name: count migration progress
        run: |
          cd api
          ./cnt_base.sh

      - name: ast-grep
        run: |
          uvx --from ast-grep-cli sg --pattern 'db.session.query($WHATEVER).filter($HERE)' --rewrite 'db.session.query($WHATEVER).where($HERE)' -l py --update-all

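The ast-grep invocation in this hunk is a purely mechanical rewrite. The sketch below illustrates the before/after shape of the code it touches; the `Account` model and the filter condition are hypothetical placeholders, not taken from the diff.

```python
# Before the rewrite: legacy SQLAlchemy Query.filter() spelling.
account = db.session.query(Account).filter(Account.id == account_id).first()

# After `sg --rewrite`: the call is switched to .where(), the 2.0-style
# synonym accepted by Query; the arguments are left untouched.
account = db.session.query(Account).where(Account.id == account_id).first()
```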
5  .github/workflows/vdb-tests.yml  (vendored)

@@ -1,10 +1,7 @@
name: Run VDB Tests

on:
  push:
    branches: [main]
    paths:
      - 'api/core/rag/*.py'
  workflow_call:

concurrency:
  group: vdb-tests-${{ github.head_ref || github.run_id }}
30  agent_runs/invisible-worm-177/penetration_test_report.md  (new file)

@@ -0,0 +1,30 @@
# Security Penetration Test Report

**Generated:** 2025-11-16 14:02:56 UTC

Executive Summary:
Conducted a white-box security assessment of the API located in /workspace/api, focusing on authentication, authorization, business-logic vulnerabilities, and IDOR in key endpoints such as /installed-apps.

Methodology:
- Full recursive file listing and static code analysis to identify HTTP routes and sensitive endpoint implementations.
- Focused static analysis of endpoints handling sensitive actions, authentication, and role-based authorization.
- Created specialized agents for authentication and business-logic vulnerability testing.
- Attempted dynamic testing for IDOR and authorization bypass; this was limited by the local API server being unavailable.
- Documented all findings with recommended next steps.

Findings:
- The /installed-apps endpoints have solid authentication and multi-layered authorization checks enforcing tenant and role ownership.
- No exploitable access-control bypass or privilege-escalation vulnerabilities were confirmed.
- Dynamic IDOR testing was hampered by connection refusals, preventing full validation.
- A high-priority note recommends verifying the environment and retesting dynamic attacks once the API server is accessible.

Recommendations:
- Verify and restore access to the local API server to enable full dynamic testing.
- Retry dynamic IDOR and authorization-bypass attacks to confirm security.
- Continue layered security reviews focusing on evolving business logic and role enforcement.
- Consider adding automated integration tests that validate authorization policies.

Conclusion:
Static analysis confirmed robust authentication and authorization controls in the key sensitive endpoints; however, dynamic testing limitations prevented final validation. Once dynamic testing is possible, verify that no IDOR or broken function-level authorization issues remain. This assessment provides a strong foundation for secure API usage and further iterative validation.

Severity: Medium (testing environment constraints limited dynamic verification)

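The last recommendation is the most actionable one. Purely as an illustration of what such a check could look like (the report ships no code), here is a minimal pytest-style sketch; the `client` fixture, the token helpers, and the exact `/installed-apps` response shape are assumptions, not part of this diff.

```python
# Hypothetical integration test: a member of tenant A must not be able to
# see installed apps that belong to tenant B.
def test_installed_apps_scoped_to_own_tenant(client, tenant_a_token, tenant_b_app_id):
    resp = client.get(
        "/console/api/installed-apps",
        headers={"Authorization": f"Bearer {tenant_a_token}"},
    )
    assert resp.status_code == 200
    returned_ids = {item["id"] for item in resp.json["installed_apps"]}
    # The app installed in tenant B must never leak into tenant A's listing.
    assert tenant_b_app_id not in returned_ids
```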
@@ -159,8 +159,9 @@ SUPABASE_URL=your-server-url
# CORS configuration
WEB_API_CORS_ALLOW_ORIGINS=http://localhost:3000,*
CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
# When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the site’s top-level domain (e.g., `example.com`). Leading dots are optional.
COOKIE_DOMAIN=
# Set COOKIE_DOMAIN when the console frontend and API are on different subdomains.
# Provide the registrable domain (e.g. example.com); leading dots are optional.
COOKIE_DOMAIN=localhost:5001

# Vector database configuration
# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`, `vastbase`, `tidb`, `tidb_on_qdrant`, `baidu`, `lindorm`, `huawei_cloud`, `upstash`, `matrixone`.

@@ -26,10 +26,6 @@
cp .env.example .env
```

> [!IMPORTANT]
>
> When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the site’s top-level domain (e.g., `example.com`). The frontend and backend must be under the same top-level domain in order to share authentication cookies.

1. Generate a `SECRET_KEY` in the `.env` file.

   bash for Linux

11  api/bin/env  (new file)

@@ -0,0 +1,11 @@
#!/bin/sh
# add binaries to PATH if they aren't added yet
# affix colons on either side of $PATH to simplify matching
case ":${PATH}:" in
    *:"$HOME/.local/bin":*)
        ;;
    *)
        # Prepending path in case a system-installed binary needs to be overridden
        export PATH="$HOME/.local/bin:$PATH"
        ;;
esac
4  api/bin/env.fish  (new file)

@@ -0,0 +1,4 @@
if not contains "$HOME/.local/bin" $PATH
    # Prepending path in case a system-installed binary needs to be overridden
    set -x PATH "$HOME/.local/bin" $PATH
end

BIN  api/bin/uv  (executable file, binary file not shown)
BIN  api/bin/uvx  (executable file, binary file not shown)
@@ -1,7 +0,0 @@
#!/bin/bash
set -euxo pipefail

for pattern in "Base" "TypeBase"; do
    printf "%s " "$pattern"
    grep "($pattern):" -r --include='*.py' --exclude-dir=".venv" --exclude-dir="tests" . | wc -l
done
@@ -216,7 +216,6 @@ def setup_required(view: Callable[P, R]):
raise NotInitValidateError()
elif dify_config.EDITION == "SELF_HOSTED" and not db.session.query(DifySetup).first():
raise NotSetupError()

return view(*args, **kwargs)

return decorated

@@ -192,6 +192,7 @@ class GraphEngine:
self._dispatcher = Dispatcher(
    event_queue=self._event_queue,
    event_handler=self._event_handler_registry,
    event_collector=self._event_manager,
    execution_coordinator=self._execution_coordinator,
    event_emitter=self._event_manager,
)

@@ -43,6 +43,7 @@ class Dispatcher:
self,
event_queue: queue.Queue[GraphNodeEventBase],
event_handler: "EventHandler",
event_collector: EventManager,
execution_coordinator: ExecutionCoordinator,
event_emitter: EventManager | None = None,
) -> None:
@@ -52,11 +53,13 @@ class Dispatcher:
Args:
event_queue: Queue of events from workers
event_handler: Event handler registry for processing events
event_collector: Event manager for collecting unhandled events
execution_coordinator: Coordinator for execution flow
event_emitter: Optional event manager to signal completion
"""
self._event_queue = event_queue
self._event_handler = event_handler
self._event_collector = event_collector
self._execution_coordinator = execution_coordinator
self._event_emitter = event_emitter

@@ -83,31 +86,37 @@ class Dispatcher:
def _dispatcher_loop(self) -> None:
"""Main dispatcher loop."""
try:
self._process_commands()
while not self._stop_event.is_set():
if (
self._execution_coordinator.aborted
or self._execution_coordinator.paused
or self._execution_coordinator.execution_complete
):
break
commands_checked = False
should_check_commands = False
should_break = False

self._execution_coordinator.check_scaling()
try:
event = self._event_queue.get(timeout=0.1)
self._event_handler.dispatch(event)
self._event_queue.task_done()
self._process_commands(event)
except queue.Empty:
time.sleep(0.1)
if self._execution_coordinator.is_execution_complete():
should_check_commands = True
should_break = True
else:
# Check for scaling
self._execution_coordinator.check_scaling()

self._process_commands()
while True:
try:
event = self._event_queue.get(block=False)
self._event_handler.dispatch(event)
self._event_queue.task_done()
except queue.Empty:
# Process events
try:
event = self._event_queue.get(timeout=0.1)
# Route to the event handler
self._event_handler.dispatch(event)
should_check_commands = self._should_check_commands(event)
self._event_queue.task_done()
except queue.Empty:
# Process commands even when no new events arrive so abort requests are not missed
should_check_commands = True
time.sleep(0.1)

if should_check_commands and not commands_checked:
self._execution_coordinator.check_commands()
commands_checked = True

if should_break:
if not commands_checked:
self._execution_coordinator.check_commands()
break

except Exception as e:
@@ -120,6 +129,6 @@ class Dispatcher:
if self._event_emitter:
self._event_emitter.mark_complete()

def _process_commands(self, event: GraphNodeEventBase | None = None):
if event is None or isinstance(event, self._COMMAND_TRIGGER_EVENTS):
self._execution_coordinator.process_commands()
def _should_check_commands(self, event: GraphNodeEventBase) -> bool:
"""Return True if the event represents a node completion."""
return isinstance(event, self._COMMAND_TRIGGER_EVENTS)

@@ -40,7 +40,7 @@ class ExecutionCoordinator:
self._command_processor = command_processor
self._worker_pool = worker_pool

def process_commands(self) -> None:
def check_commands(self) -> None:
"""Process any pending commands."""
self._command_processor.process_commands()

@@ -48,16 +48,24 @@ class ExecutionCoordinator:
"""Check and perform worker scaling if needed."""
self._worker_pool.check_and_scale()

@property
def execution_complete(self):
def is_execution_complete(self) -> bool:
"""
Check if execution is complete.

Returns:
True if execution is complete
"""
# Treat paused, aborted, or failed executions as terminal states
if self._graph_execution.is_paused:
return True

if self._graph_execution.aborted or self._graph_execution.has_error:
return True

return self._state_manager.is_execution_complete()

@property
def aborted(self):
return self._graph_execution.aborted or self._graph_execution.has_error

@property
def paused(self) -> bool:
def is_paused(self) -> bool:
"""Expose whether the underlying graph execution is paused."""
return self._graph_execution.is_paused

@ -225,7 +225,7 @@ class Dataset(Base):
|
||||
ExternalKnowledgeApis.id == external_knowledge_binding.external_knowledge_api_id
|
||||
)
|
||||
)
|
||||
if external_knowledge_api is None or external_knowledge_api.settings is None:
|
||||
if not external_knowledge_api:
|
||||
return None
|
||||
return {
|
||||
"external_knowledge_id": external_knowledge_binding.external_knowledge_id,
|
||||
@ -945,20 +945,18 @@ class DatasetQuery(Base):
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=sa.func.current_timestamp())
|
||||
|
||||
|
||||
class DatasetKeywordTable(TypeBase):
|
||||
class DatasetKeywordTable(Base):
|
||||
__tablename__ = "dataset_keyword_tables"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="dataset_keyword_table_pkey"),
|
||||
sa.Index("dataset_keyword_table_dataset_id_idx", "dataset_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(
|
||||
StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False
|
||||
)
|
||||
dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False, unique=True)
|
||||
keyword_table: Mapped[str] = mapped_column(sa.Text, nullable=False)
|
||||
data_source_type: Mapped[str] = mapped_column(
|
||||
String(255), nullable=False, server_default=sa.text("'database'::character varying"), default="database"
|
||||
id = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
|
||||
dataset_id = mapped_column(StringUUID, nullable=False, unique=True)
|
||||
keyword_table = mapped_column(sa.Text, nullable=False)
|
||||
data_source_type = mapped_column(
|
||||
String(255), nullable=False, server_default=sa.text("'database'::character varying")
|
||||
)
|
||||
|
||||
@property
|
||||
@ -1056,23 +1054,19 @@ class TidbAuthBinding(Base):
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
|
||||
|
||||
class Whitelist(TypeBase):
|
||||
class Whitelist(Base):
|
||||
__tablename__ = "whitelists"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="whitelists_pkey"),
|
||||
sa.Index("whitelists_tenant_idx", "tenant_id"),
|
||||
)
|
||||
id: Mapped[str] = mapped_column(
|
||||
StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False
|
||||
)
|
||||
tenant_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
id = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
|
||||
tenant_id = mapped_column(StringUUID, nullable=True)
|
||||
category: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
|
||||
|
||||
class DatasetPermission(TypeBase):
|
||||
class DatasetPermission(Base):
|
||||
__tablename__ = "dataset_permissions"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="dataset_permission_pkey"),
|
||||
@ -1081,21 +1075,15 @@ class DatasetPermission(TypeBase):
|
||||
sa.Index("idx_dataset_permissions_tenant_id", "tenant_id"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(
|
||||
StringUUID, server_default=sa.text("uuid_generate_v4()"), primary_key=True, init=False
|
||||
)
|
||||
dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
account_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
has_permission: Mapped[bool] = mapped_column(
|
||||
sa.Boolean, nullable=False, server_default=sa.text("true"), default=True
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
)
|
||||
id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), primary_key=True)
|
||||
dataset_id = mapped_column(StringUUID, nullable=False)
|
||||
account_id = mapped_column(StringUUID, nullable=False)
|
||||
tenant_id = mapped_column(StringUUID, nullable=False)
|
||||
has_permission: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
|
||||
|
||||
class ExternalKnowledgeApis(TypeBase):
|
||||
class ExternalKnowledgeApis(Base):
|
||||
__tablename__ = "external_knowledge_apis"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="external_knowledge_apis_pkey"),
|
||||
@ -1103,20 +1091,16 @@ class ExternalKnowledgeApis(TypeBase):
|
||||
sa.Index("external_knowledge_apis_name_idx", "name"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(
|
||||
StringUUID, nullable=False, server_default=sa.text("uuid_generate_v4()"), init=False
|
||||
)
|
||||
id = mapped_column(StringUUID, nullable=False, server_default=sa.text("uuid_generate_v4()"))
|
||||
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
description: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
settings: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
|
||||
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
)
|
||||
updated_by: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
tenant_id = mapped_column(StringUUID, nullable=False)
|
||||
settings = mapped_column(sa.Text, nullable=True)
|
||||
created_by = mapped_column(StringUUID, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_by = mapped_column(StringUUID, nullable=True)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
@ -1194,7 +1178,7 @@ class DatasetAutoDisableLog(Base):
|
||||
)
|
||||
|
||||
|
||||
class RateLimitLog(TypeBase):
|
||||
class RateLimitLog(Base):
|
||||
__tablename__ = "rate_limit_logs"
|
||||
__table_args__ = (
|
||||
sa.PrimaryKeyConstraint("id", name="rate_limit_log_pkey"),
|
||||
@ -1202,12 +1186,12 @@ class RateLimitLog(TypeBase):
|
||||
sa.Index("rate_limit_log_operation_idx", "operation"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), init=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
|
||||
tenant_id = mapped_column(StringUUID, nullable=False)
|
||||
subscription_plan: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
operation: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=sa.text("CURRENT_TIMESTAMP(0)"), init=False
|
||||
DateTime, nullable=False, server_default=sa.text("CURRENT_TIMESTAMP(0)")
|
||||
)
|
||||
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@ from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEnt
|
||||
from core.trigger.entities.entities import Subscription
|
||||
from core.trigger.utils.endpoint import generate_plugin_trigger_endpoint_url, generate_webhook_trigger_endpoint
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.base import Base, TypeBase
|
||||
from models.base import Base
|
||||
from models.engine import db
|
||||
from models.enums import AppTriggerStatus, AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
|
||||
from models.model import Account
|
||||
@ -399,7 +399,7 @@ class AppTrigger(Base):
|
||||
)
|
||||
|
||||
|
||||
class WorkflowSchedulePlan(TypeBase):
|
||||
class WorkflowSchedulePlan(Base):
|
||||
"""
|
||||
Workflow Schedule Configuration
|
||||
|
||||
@ -425,7 +425,7 @@ class WorkflowSchedulePlan(TypeBase):
|
||||
sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuidv7()"), init=False)
|
||||
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
@ -436,11 +436,9 @@ class WorkflowSchedulePlan(TypeBase):
|
||||
|
||||
# Schedule control
|
||||
next_run_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
|
||||
@ -62,7 +62,7 @@ class ExternalDatasetService:
|
||||
tenant_id=tenant_id,
|
||||
created_by=user_id,
|
||||
updated_by=user_id,
|
||||
name=str(args.get("name")),
|
||||
name=args.get("name"),
|
||||
description=args.get("description", ""),
|
||||
settings=json.dumps(args.get("settings"), ensure_ascii=False),
|
||||
)
|
||||
@ -163,7 +163,7 @@ class ExternalDatasetService:
|
||||
external_knowledge_api = (
|
||||
db.session.query(ExternalKnowledgeApis).filter_by(id=external_knowledge_api_id, tenant_id=tenant_id).first()
|
||||
)
|
||||
if external_knowledge_api is None or external_knowledge_api.settings is None:
|
||||
if external_knowledge_api is None:
|
||||
raise ValueError("api template not found")
|
||||
settings = json.loads(external_knowledge_api.settings)
|
||||
for setting in settings:
|
||||
@ -290,7 +290,7 @@ class ExternalDatasetService:
|
||||
.filter_by(id=external_knowledge_binding.external_knowledge_api_id)
|
||||
.first()
|
||||
)
|
||||
if external_knowledge_api is None or external_knowledge_api.settings is None:
|
||||
if not external_knowledge_api:
|
||||
raise ValueError("external api template not found")
|
||||
|
||||
settings = json.loads(external_knowledge_api.settings)
|
||||
|
||||
@ -13,13 +13,13 @@ from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from configs import dify_config
|
||||
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY, WorkflowAppGenerator
|
||||
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.app.layers.timeslice_layer import TimeSliceLayer
|
||||
from core.app.layers.trigger_post_layer import TriggerPostLayer
|
||||
from extensions.ext_database import db
|
||||
from models.account import Account
|
||||
from models.enums import AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
|
||||
from models.enums import CreatorUserRole, WorkflowTriggerStatus
|
||||
from models.model import App, EndUser, Tenant
|
||||
from models.trigger import WorkflowTriggerLog
|
||||
from models.workflow import Workflow
|
||||
@ -81,19 +81,6 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]):
|
||||
)
|
||||
|
||||
|
||||
def _build_generator_args(trigger_data: TriggerData) -> dict[str, Any]:
|
||||
"""Build args passed into WorkflowAppGenerator.generate for Celery executions."""
|
||||
args: dict[str, Any] = {
|
||||
"inputs": dict(trigger_data.inputs),
|
||||
"files": list(trigger_data.files),
|
||||
}
|
||||
|
||||
if trigger_data.trigger_type == AppTriggerType.TRIGGER_WEBHOOK:
|
||||
args[SKIP_PREPARE_USER_INPUTS_KEY] = True # Webhooks already provide structured inputs
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def _execute_workflow_common(
|
||||
task_data: WorkflowTaskData,
|
||||
cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler,
|
||||
@ -141,7 +128,7 @@ def _execute_workflow_common(
|
||||
generator = WorkflowAppGenerator()
|
||||
|
||||
# Prepare args matching AppGenerateService.generate format
|
||||
args = _build_generator_args(trigger_data)
|
||||
args: dict[str, Any] = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)}
|
||||
|
||||
# If workflow_id was specified, add it to args
|
||||
if trigger_data.workflow_id:
|
||||
|
||||
@ -9,7 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto
|
||||
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_storage import storage
|
||||
from models.dataset import Dataset, DatasetMetadataBinding, DocumentSegment
|
||||
from models.dataset import Dataset, DocumentSegment
|
||||
from models.model import UploadFile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -37,11 +37,6 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form
|
||||
if not dataset:
|
||||
raise Exception("Document has no dataset")
|
||||
|
||||
db.session.query(DatasetMetadataBinding).where(
|
||||
DatasetMetadataBinding.dataset_id == dataset_id,
|
||||
DatasetMetadataBinding.document_id.in_(document_ids),
|
||||
).delete(synchronize_session=False)
|
||||
|
||||
segments = db.session.scalars(
|
||||
select(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids))
|
||||
).all()
|
||||
@ -76,8 +71,7 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form
|
||||
except Exception:
|
||||
logger.exception("Delete file failed when document deleted, file_id: %s", file.id)
|
||||
db.session.delete(file)
|
||||
|
||||
db.session.commit()
|
||||
db.session.commit()
|
||||
|
||||
end_at = time.perf_counter()
|
||||
logger.info(
|
||||
|
||||
@ -1,189 +0,0 @@
|
||||
"""Tests for dispatcher command checking behavior."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
from core.workflow.entities.pause_reason import SchedulingPause
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
|
||||
from core.workflow.graph_engine.event_management.event_handlers import EventHandler
|
||||
from core.workflow.graph_engine.orchestration.dispatcher import Dispatcher
|
||||
from core.workflow.graph_engine.orchestration.execution_coordinator import ExecutionCoordinator
|
||||
from core.workflow.graph_events import (
|
||||
GraphNodeEventBase,
|
||||
NodeRunPauseRequestedEvent,
|
||||
NodeRunStartedEvent,
|
||||
NodeRunSucceededEvent,
|
||||
)
|
||||
from core.workflow.node_events import NodeRunResult
|
||||
|
||||
|
||||
def test_dispatcher_should_consume_remains_events_after_pause():
|
||||
event_queue = queue.Queue()
|
||||
event_queue.put(
|
||||
GraphNodeEventBase(
|
||||
id="test",
|
||||
node_id="test",
|
||||
node_type=NodeType.START,
|
||||
)
|
||||
)
|
||||
event_handler = mock.Mock(spec=EventHandler)
|
||||
execution_coordinator = mock.Mock(spec=ExecutionCoordinator)
|
||||
execution_coordinator.paused.return_value = True
|
||||
dispatcher = Dispatcher(
|
||||
event_queue=event_queue,
|
||||
event_handler=event_handler,
|
||||
execution_coordinator=execution_coordinator,
|
||||
)
|
||||
dispatcher._dispatcher_loop()
|
||||
assert event_queue.empty()
|
||||
|
||||
|
||||
class _StubExecutionCoordinator:
|
||||
"""Stub execution coordinator that tracks command checks."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.command_checks = 0
|
||||
self.scaling_checks = 0
|
||||
self.execution_complete = False
|
||||
self.failed = False
|
||||
self._paused = False
|
||||
|
||||
def process_commands(self) -> None:
|
||||
self.command_checks += 1
|
||||
|
||||
def check_scaling(self) -> None:
|
||||
self.scaling_checks += 1
|
||||
|
||||
@property
|
||||
def paused(self) -> bool:
|
||||
return self._paused
|
||||
|
||||
@property
|
||||
def aborted(self) -> bool:
|
||||
return False
|
||||
|
||||
def mark_complete(self) -> None:
|
||||
self.execution_complete = True
|
||||
|
||||
def mark_failed(self, error: Exception) -> None: # pragma: no cover - defensive, not triggered in tests
|
||||
self.failed = True
|
||||
|
||||
|
||||
class _StubEventHandler:
|
||||
"""Minimal event handler that marks execution complete after handling an event."""
|
||||
|
||||
def __init__(self, coordinator: _StubExecutionCoordinator) -> None:
|
||||
self._coordinator = coordinator
|
||||
self.events = []
|
||||
|
||||
def dispatch(self, event) -> None:
|
||||
self.events.append(event)
|
||||
self._coordinator.mark_complete()
|
||||
|
||||
|
||||
def _run_dispatcher_for_event(event) -> int:
|
||||
"""Run the dispatcher loop for a single event and return command check count."""
|
||||
event_queue: queue.Queue = queue.Queue()
|
||||
event_queue.put(event)
|
||||
|
||||
coordinator = _StubExecutionCoordinator()
|
||||
event_handler = _StubEventHandler(coordinator)
|
||||
|
||||
dispatcher = Dispatcher(
|
||||
event_queue=event_queue,
|
||||
event_handler=event_handler,
|
||||
execution_coordinator=coordinator,
|
||||
)
|
||||
|
||||
dispatcher._dispatcher_loop()
|
||||
|
||||
return coordinator.command_checks
|
||||
|
||||
|
||||
def _make_started_event() -> NodeRunStartedEvent:
|
||||
return NodeRunStartedEvent(
|
||||
id="start-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
node_title="Test Node",
|
||||
start_at=datetime.utcnow(),
|
||||
)
|
||||
|
||||
|
||||
def _make_succeeded_event() -> NodeRunSucceededEvent:
|
||||
return NodeRunSucceededEvent(
|
||||
id="success-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
node_title="Test Node",
|
||||
start_at=datetime.utcnow(),
|
||||
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
|
||||
)
|
||||
|
||||
|
||||
def test_dispatcher_checks_commands_during_idle_and_on_completion() -> None:
|
||||
"""Dispatcher polls commands when idle and after completion events."""
|
||||
started_checks = _run_dispatcher_for_event(_make_started_event())
|
||||
succeeded_checks = _run_dispatcher_for_event(_make_succeeded_event())
|
||||
|
||||
assert started_checks == 2
|
||||
assert succeeded_checks == 3
|
||||
|
||||
|
||||
class _PauseStubEventHandler:
|
||||
"""Minimal event handler that marks execution complete after handling an event."""
|
||||
|
||||
def __init__(self, coordinator: _StubExecutionCoordinator) -> None:
|
||||
self._coordinator = coordinator
|
||||
self.events = []
|
||||
|
||||
def dispatch(self, event) -> None:
|
||||
self.events.append(event)
|
||||
if isinstance(event, NodeRunPauseRequestedEvent):
|
||||
self._coordinator.mark_complete()
|
||||
|
||||
|
||||
def test_dispatcher_drain_event_queue():
|
||||
events = [
|
||||
NodeRunStartedEvent(
|
||||
id="start-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
node_title="Code",
|
||||
start_at=datetime.utcnow(),
|
||||
),
|
||||
NodeRunPauseRequestedEvent(
|
||||
id="pause-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
reason=SchedulingPause(message="test pause"),
|
||||
),
|
||||
NodeRunSucceededEvent(
|
||||
id="success-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
start_at=datetime.utcnow(),
|
||||
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
|
||||
),
|
||||
]
|
||||
|
||||
event_queue: queue.Queue = queue.Queue()
|
||||
for e in events:
|
||||
event_queue.put(e)
|
||||
|
||||
coordinator = _StubExecutionCoordinator()
|
||||
event_handler = _PauseStubEventHandler(coordinator)
|
||||
|
||||
dispatcher = Dispatcher(
|
||||
event_queue=event_queue,
|
||||
event_handler=event_handler,
|
||||
execution_coordinator=coordinator,
|
||||
)
|
||||
|
||||
dispatcher._dispatcher_loop()
|
||||
|
||||
# ensure all events are drained.
|
||||
assert event_queue.empty()
|
||||
@ -3,17 +3,13 @@
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.workflow.entities.graph_init_params import GraphInitParams
|
||||
from core.workflow.entities.pause_reason import SchedulingPause
|
||||
from core.workflow.graph import Graph
|
||||
from core.workflow.graph_engine import GraphEngine
|
||||
from core.workflow.graph_engine.command_channels import InMemoryChannel
|
||||
from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType, PauseCommand
|
||||
from core.workflow.graph_events import GraphRunAbortedEvent, GraphRunPausedEvent, GraphRunStartedEvent
|
||||
from core.workflow.nodes.start.start_node import StartNode
|
||||
from core.workflow.runtime import GraphRuntimeState, VariablePool
|
||||
from models.enums import UserFrom
|
||||
|
||||
|
||||
def test_abort_command():
|
||||
@ -30,23 +26,11 @@ def test_abort_command():
|
||||
mock_graph.root_node.id = "start"
|
||||
|
||||
# Create mock nodes with required attributes - using shared runtime state
|
||||
start_node = StartNode(
|
||||
id="start",
|
||||
config={"id": "start"},
|
||||
graph_init_params=GraphInitParams(
|
||||
tenant_id="test_tenant",
|
||||
app_id="test_app",
|
||||
workflow_id="test_workflow",
|
||||
graph_config={},
|
||||
user_id="test_user",
|
||||
user_from=UserFrom.ACCOUNT,
|
||||
invoke_from=InvokeFrom.DEBUGGER,
|
||||
call_depth=0,
|
||||
),
|
||||
graph_runtime_state=shared_runtime_state,
|
||||
)
|
||||
start_node.init_node_data({"title": "start", "variables": []})
|
||||
mock_graph.nodes["start"] = start_node
|
||||
mock_start_node = MagicMock()
|
||||
mock_start_node.state = None
|
||||
mock_start_node.id = "start"
|
||||
mock_start_node.graph_runtime_state = shared_runtime_state # Use shared instance
|
||||
mock_graph.nodes["start"] = mock_start_node
|
||||
|
||||
# Mock graph methods
|
||||
mock_graph.get_outgoing_edges = MagicMock(return_value=[])
|
||||
@ -140,23 +124,11 @@ def test_pause_command():
|
||||
mock_graph.root_node = MagicMock()
|
||||
mock_graph.root_node.id = "start"
|
||||
|
||||
start_node = StartNode(
|
||||
id="start",
|
||||
config={"id": "start"},
|
||||
graph_init_params=GraphInitParams(
|
||||
tenant_id="test_tenant",
|
||||
app_id="test_app",
|
||||
workflow_id="test_workflow",
|
||||
graph_config={},
|
||||
user_id="test_user",
|
||||
user_from=UserFrom.ACCOUNT,
|
||||
invoke_from=InvokeFrom.DEBUGGER,
|
||||
call_depth=0,
|
||||
),
|
||||
graph_runtime_state=shared_runtime_state,
|
||||
)
|
||||
start_node.init_node_data({"title": "start", "variables": []})
|
||||
mock_graph.nodes["start"] = start_node
|
||||
mock_start_node = MagicMock()
|
||||
mock_start_node.state = None
|
||||
mock_start_node.id = "start"
|
||||
mock_start_node.graph_runtime_state = shared_runtime_state
|
||||
mock_graph.nodes["start"] = mock_start_node
|
||||
|
||||
mock_graph.get_outgoing_edges = MagicMock(return_value=[])
|
||||
mock_graph.get_incoming_edges = MagicMock(return_value=[])
|
||||
@ -181,5 +153,5 @@ def test_pause_command():
|
||||
assert pause_events[0].reason == SchedulingPause(message="User requested pause")
|
||||
|
||||
graph_execution = engine.graph_runtime_state.graph_execution
|
||||
assert graph_execution.paused
|
||||
assert graph_execution.is_paused
|
||||
assert graph_execution.pause_reason == SchedulingPause(message="User requested pause")
|
||||
|
||||
@ -0,0 +1,109 @@
|
||||
"""Tests for dispatcher command checking behavior."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
from datetime import datetime
|
||||
|
||||
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
|
||||
from core.workflow.graph_engine.event_management.event_manager import EventManager
|
||||
from core.workflow.graph_engine.orchestration.dispatcher import Dispatcher
|
||||
from core.workflow.graph_events import NodeRunStartedEvent, NodeRunSucceededEvent
|
||||
from core.workflow.node_events import NodeRunResult
|
||||
|
||||
|
||||
class _StubExecutionCoordinator:
|
||||
"""Stub execution coordinator that tracks command checks."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.command_checks = 0
|
||||
self.scaling_checks = 0
|
||||
self._execution_complete = False
|
||||
self.mark_complete_called = False
|
||||
self.failed = False
|
||||
self._paused = False
|
||||
|
||||
def check_commands(self) -> None:
|
||||
self.command_checks += 1
|
||||
|
||||
def check_scaling(self) -> None:
|
||||
self.scaling_checks += 1
|
||||
|
||||
@property
|
||||
def is_paused(self) -> bool:
|
||||
return self._paused
|
||||
|
||||
def is_execution_complete(self) -> bool:
|
||||
return self._execution_complete
|
||||
|
||||
def mark_complete(self) -> None:
|
||||
self.mark_complete_called = True
|
||||
|
||||
def mark_failed(self, error: Exception) -> None: # pragma: no cover - defensive, not triggered in tests
|
||||
self.failed = True
|
||||
|
||||
def set_execution_complete(self) -> None:
|
||||
self._execution_complete = True
|
||||
|
||||
|
||||
class _StubEventHandler:
|
||||
"""Minimal event handler that marks execution complete after handling an event."""
|
||||
|
||||
def __init__(self, coordinator: _StubExecutionCoordinator) -> None:
|
||||
self._coordinator = coordinator
|
||||
self.events = []
|
||||
|
||||
def dispatch(self, event) -> None:
|
||||
self.events.append(event)
|
||||
self._coordinator.set_execution_complete()
|
||||
|
||||
|
||||
def _run_dispatcher_for_event(event) -> int:
|
||||
"""Run the dispatcher loop for a single event and return command check count."""
|
||||
event_queue: queue.Queue = queue.Queue()
|
||||
event_queue.put(event)
|
||||
|
||||
coordinator = _StubExecutionCoordinator()
|
||||
event_handler = _StubEventHandler(coordinator)
|
||||
event_manager = EventManager()
|
||||
|
||||
dispatcher = Dispatcher(
|
||||
event_queue=event_queue,
|
||||
event_handler=event_handler,
|
||||
event_collector=event_manager,
|
||||
execution_coordinator=coordinator,
|
||||
)
|
||||
|
||||
dispatcher._dispatcher_loop()
|
||||
|
||||
return coordinator.command_checks
|
||||
|
||||
|
||||
def _make_started_event() -> NodeRunStartedEvent:
|
||||
return NodeRunStartedEvent(
|
||||
id="start-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
node_title="Test Node",
|
||||
start_at=datetime.utcnow(),
|
||||
)
|
||||
|
||||
|
||||
def _make_succeeded_event() -> NodeRunSucceededEvent:
|
||||
return NodeRunSucceededEvent(
|
||||
id="success-event",
|
||||
node_id="node-1",
|
||||
node_type=NodeType.CODE,
|
||||
node_title="Test Node",
|
||||
start_at=datetime.utcnow(),
|
||||
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
|
||||
)
|
||||
|
||||
|
||||
def test_dispatcher_checks_commands_during_idle_and_on_completion() -> None:
|
||||
"""Dispatcher polls commands when idle and after completion events."""
|
||||
started_checks = _run_dispatcher_for_event(_make_started_event())
|
||||
succeeded_checks = _run_dispatcher_for_event(_make_succeeded_event())
|
||||
|
||||
assert started_checks == 1
|
||||
assert succeeded_checks == 2
|
||||
@ -48,3 +48,15 @@ def test_handle_pause_noop_when_execution_running() -> None:
|
||||
|
||||
worker_pool.stop.assert_not_called()
|
||||
state_manager.clear_executing.assert_not_called()
|
||||
|
||||
|
||||
def test_is_execution_complete_when_paused() -> None:
|
||||
"""Paused execution should be treated as complete."""
|
||||
graph_execution = GraphExecution(workflow_id="workflow")
|
||||
graph_execution.start()
|
||||
graph_execution.pause("Awaiting input")
|
||||
|
||||
coordinator, state_manager, _worker_pool = _build_coordinator(graph_execution)
|
||||
state_manager.is_execution_complete.return_value = False
|
||||
|
||||
assert coordinator.is_execution_complete()
|
||||
|
||||
@ -1,37 +0,0 @@
|
||||
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
|
||||
from models.enums import AppTriggerType, WorkflowRunTriggeredFrom
|
||||
from services.workflow.entities import TriggerData, WebhookTriggerData
|
||||
from tasks import async_workflow_tasks
|
||||
|
||||
|
||||
def test_build_generator_args_sets_skip_flag_for_webhook():
|
||||
trigger_data = WebhookTriggerData(
|
||||
app_id="app",
|
||||
tenant_id="tenant",
|
||||
workflow_id="workflow",
|
||||
root_node_id="node",
|
||||
inputs={"webhook_data": {"body": {"foo": "bar"}}},
|
||||
)
|
||||
|
||||
args = async_workflow_tasks._build_generator_args(trigger_data)
|
||||
|
||||
assert args[SKIP_PREPARE_USER_INPUTS_KEY] is True
|
||||
assert args["inputs"]["webhook_data"]["body"]["foo"] == "bar"
|
||||
|
||||
|
||||
def test_build_generator_args_keeps_validation_for_other_triggers():
|
||||
trigger_data = TriggerData(
|
||||
app_id="app",
|
||||
tenant_id="tenant",
|
||||
workflow_id="workflow",
|
||||
root_node_id="node",
|
||||
inputs={"foo": "bar"},
|
||||
files=[],
|
||||
trigger_type=AppTriggerType.TRIGGER_SCHEDULE,
|
||||
trigger_from=WorkflowRunTriggeredFrom.SCHEDULE,
|
||||
)
|
||||
|
||||
args = async_workflow_tasks._build_generator_args(trigger_data)
|
||||
|
||||
assert SKIP_PREPARE_USER_INPUTS_KEY not in args
|
||||
assert args["inputs"] == {"foo": "bar"}
|
||||
5488  api/uv.lock  (generated; file diff suppressed because it is too large)
16  dify-workflow-engine/README.md  (new file)

@@ -0,0 +1,16 @@
# Dify Workflow Engine

A standalone SDK for executing Dify workflows.

## Installation

```bash
pip install -r requirements.txt
```

## Usage

```python
from core.workflow.workflow_entry import WorkflowEntry
# ...
```

7
dify-workflow-engine/configs.py
Normal file
7
dify-workflow-engine/configs.py
Normal file
@ -0,0 +1,7 @@
|
||||
class DifyConfig:
|
||||
WORKFLOW_CALL_MAX_DEPTH = 5
|
||||
DEBUG = True
|
||||
WORKFLOW_MAX_EXECUTION_STEPS = 100
|
||||
WORKFLOW_MAX_EXECUTION_TIME = 600
|
||||
|
||||
dify_config = DifyConfig()
|
||||
2
dify-workflow-engine/core/agent/__init__.py
Normal file
2
dify-workflow-engine/core/agent/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class Agent:
|
||||
pass
|
||||
10
dify-workflow-engine/core/agent/entities/__init__.py
Normal file
10
dify-workflow-engine/core/agent/entities/__init__.py
Normal file
@ -0,0 +1,10 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class AgentEntity(BaseModel):
|
||||
pass
|
||||
|
||||
class AgentNodeData(BaseModel):
|
||||
agent_strategy_name: str
|
||||
|
||||
class AgentToolEntity(BaseModel):
|
||||
pass
|
||||
7
dify-workflow-engine/core/agent/plugin_entities.py
Normal file
7
dify-workflow-engine/core/agent/plugin_entities.py
Normal file
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class AgentPluginEntity(BaseModel):
|
||||
pass
|
||||
|
||||
class AgentStrategyParameter(BaseModel):
|
||||
pass
|
||||
0
dify-workflow-engine/core/app/__init__.py
Normal file
0
dify-workflow-engine/core/app/__init__.py
Normal file
0
dify-workflow-engine/core/app/apps/__init__.py
Normal file
0
dify-workflow-engine/core/app/apps/__init__.py
Normal file
2
dify-workflow-engine/core/app/apps/exc.py
Normal file
2
dify-workflow-engine/core/app/apps/exc.py
Normal file
@ -0,0 +1,2 @@
|
||||
class GenerateTaskStoppedError(Exception):
|
||||
pass
|
||||
0
dify-workflow-engine/core/app/entities/__init__.py
Normal file
0
dify-workflow-engine/core/app/entities/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
from enum import Enum
|
||||
|
||||
class InvokeFrom(Enum):
|
||||
DEBUGGER = "debugger"
|
||||
SERVICE_API = "service_api"
|
||||
WEB_APP = "web_app"
|
||||
2
dify-workflow-engine/core/callback_handler/__init__.py
Normal file
2
dify-workflow-engine/core/callback_handler/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class CallbackHandler:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class DifyWorkflowCallbackHandler:
|
||||
pass
|
||||
2
dify-workflow-engine/core/datasource/__init__.py
Normal file
2
dify-workflow-engine/core/datasource/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class Connection:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class DatasourceManager:
|
||||
pass
|
||||
@ -0,0 +1,4 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class DatasourceEntity(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,22 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class DatasourceEntity(BaseModel):
|
||||
pass
|
||||
|
||||
class DatasourceType(BaseModel):
|
||||
pass
|
||||
|
||||
class DatasourceMessage(BaseModel):
|
||||
pass
|
||||
|
||||
class DatasourceParameter(BaseModel):
|
||||
pass
|
||||
|
||||
class DatasourceProviderType(BaseModel):
|
||||
pass
|
||||
|
||||
class GetOnlineDocumentPageContentRequest(BaseModel):
|
||||
pass
|
||||
|
||||
class OnlineDriveDownloadFileRequest(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class OnlineDocumentDatasourcePlugin:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class OnlineDriveDatasourcePlugin:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class DatasourceFileMessageTransformer:
|
||||
pass
|
||||
6
dify-workflow-engine/core/file/__init__.py
Normal file
6
dify-workflow-engine/core/file/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
from .models import File, FileAttribute, FileTransferMethod, FileType
|
||||
|
||||
class FileManager:
|
||||
pass
|
||||
|
||||
file_manager = FileManager()
|
||||
13
dify-workflow-engine/core/file/enums.py
Normal file
13
dify-workflow-engine/core/file/enums.py
Normal file
@ -0,0 +1,13 @@
|
||||
from enum import StrEnum
|
||||
|
||||
class FileType(StrEnum):
|
||||
IMAGE = "image"
|
||||
AUDIO = "audio"
|
||||
VIDEO = "video"
|
||||
DOCUMENT = "document"
|
||||
CUSTOM = "custom"
|
||||
|
||||
class FileTransferMethod(StrEnum):
|
||||
REMOTE_URL = "remote_url"
|
||||
LOCAL_FILE = "local_file"
|
||||
TOOL_FILE = "tool_file"
|
||||
14
dify-workflow-engine/core/file/models.py
Normal file
14
dify-workflow-engine/core/file/models.py
Normal file
@ -0,0 +1,14 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class File(BaseModel):
|
||||
def to_dict(self):
|
||||
return {}
|
||||
|
||||
class FileAttribute(BaseModel):
|
||||
pass
|
||||
|
||||
class FileTransferMethod(BaseModel):
|
||||
pass
|
||||
|
||||
class FileType(BaseModel):
|
||||
pass
|
||||
2
dify-workflow-engine/core/helper/__init__.py
Normal file
2
dify-workflow-engine/core/helper/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class SSKey:
|
||||
pass
|
||||
@ -0,0 +1,5 @@
|
||||
class CodeExecutor:
|
||||
pass
|
||||
|
||||
class CodeLanguage:
|
||||
pass
|
||||
@ -0,0 +1,14 @@
|
||||
class CodeExecutor:
|
||||
pass
|
||||
|
||||
class CodeLanguage:
|
||||
PYTHON3 = "python3"
|
||||
JAVASCRIPT = "javascript"
|
||||
JSON = "json"
|
||||
STRING = "string"
|
||||
NUMBER = "number"
|
||||
OBJECT = "object"
|
||||
ARRAY = "array"
|
||||
|
||||
class CodeExecutionError(Exception):
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class CodeNodeProvider:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class JavascriptCodeProvider:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class Python3CodeProvider:
|
||||
pass
|
||||
2
dify-workflow-engine/core/helper/ssrf_proxy.py
Normal file
2
dify-workflow-engine/core/helper/ssrf_proxy.py
Normal file
@ -0,0 +1,2 @@
|
||||
class SSRFProxy:
|
||||
pass
|
||||
2
dify-workflow-engine/core/memory/__init__.py
Normal file
2
dify-workflow-engine/core/memory/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class TokenBufferMemory:
|
||||
pass
|
||||
2
dify-workflow-engine/core/memory/token_buffer_memory.py
Normal file
2
dify-workflow-engine/core/memory/token_buffer_memory.py
Normal file
@ -0,0 +1,2 @@
|
||||
class TokenBufferMemory:
|
||||
pass
|
||||
5
dify-workflow-engine/core/model_manager/__init__.py
Normal file
5
dify-workflow-engine/core/model_manager/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
class ModelManager:
|
||||
pass
|
||||
|
||||
class ModelInstance:
|
||||
pass
|
||||
0
dify-workflow-engine/core/model_runtime/__init__.py
Normal file
0
dify-workflow-engine/core/model_runtime/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class ModelPropertyKey:
|
||||
pass
|
||||
@ -0,0 +1,12 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class LLMResult(BaseModel):
|
||||
pass
|
||||
|
||||
class LLMUsage(BaseModel):
|
||||
@classmethod
|
||||
def empty_usage(cls):
|
||||
return cls()
|
||||
|
||||
class LLMUsageMetadata(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class ModelType(BaseModel):
|
||||
pass
|
||||
|
||||
class AIModelEntity(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1 @@
|
||||
from .encoders import jsonable_encoder
|
||||
@ -0,0 +1,2 @@
|
||||
def jsonable_encoder(obj):
|
||||
return obj
|
||||
2
dify-workflow-engine/core/plugin/__init__.py
Normal file
2
dify-workflow-engine/core/plugin/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class PluginManager:
|
||||
pass
|
||||
2
dify-workflow-engine/core/plugin/entities/request.py
Normal file
2
dify-workflow-engine/core/plugin/entities/request.py
Normal file
@ -0,0 +1,2 @@
|
||||
class InvokeCredentials:
|
||||
pass
|
||||
5
dify-workflow-engine/core/plugin/impl/exc.py
Normal file
5
dify-workflow-engine/core/plugin/impl/exc.py
Normal file
@ -0,0 +1,5 @@
|
||||
class PluginDaemonClientSideError(Exception):
|
||||
pass
|
||||
|
||||
class PluginInvokeError(Exception):
|
||||
pass
|
||||
2
dify-workflow-engine/core/plugin/impl/plugin.py
Normal file
2
dify-workflow-engine/core/plugin/impl/plugin.py
Normal file
@ -0,0 +1,2 @@
|
||||
class PluginInstaller:
|
||||
pass
|
||||
2
dify-workflow-engine/core/prompt/__init__.py
Normal file
2
dify-workflow-engine/core/prompt/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class PromptTemplate:
|
||||
pass
|
||||
4
dify-workflow-engine/core/prompt/entities/__init__.py
Normal file
4
dify-workflow-engine/core/prompt/entities/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class PromptEntity(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class AdvancedPromptEntity(BaseModel):
|
||||
pass
|
||||
|
||||
class MemoryConfig(BaseModel):
|
||||
pass
|
||||
2
dify-workflow-engine/core/provider_manager/__init__.py
Normal file
2
dify-workflow-engine/core/provider_manager/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class ProviderManager:
|
||||
pass
|
||||
1
dify-workflow-engine/core/rag/__init__.py
Normal file
1
dify-workflow-engine/core/rag/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# Mock core.rag
|
||||
4
dify-workflow-engine/core/rag/entities/__init__.py
Normal file
4
dify-workflow-engine/core/rag/entities/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class RetrievalResource(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class CitationMetadata(BaseModel):
|
||||
pass
|
||||
|
||||
class RetrievalSourceMetadata(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,4 @@
|
||||
from .entities import index_processor_entities
|
||||
|
||||
class IndexProcessorBase:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class IndexProcessorBase:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class IndexProcessorFactory:
|
||||
pass
|
||||
1
dify-workflow-engine/core/rag/retrieval/__init__.py
Normal file
1
dify-workflow-engine/core/rag/retrieval/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .retrieval_service import RetrievalService
|
||||
@ -0,0 +1,4 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class DatasetRetrieval(BaseModel):
|
||||
pass
|
||||
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import ClassVar
|
||||
|
||||
class RetrievalMethod(BaseModel):
|
||||
SEMANTIC_SEARCH: ClassVar[str] = "SEMANTIC_SEARCH"
|
||||
KEYWORD_SEARCH: ClassVar[str] = "KEYWORD_SEARCH"
|
||||
HYBRID_SEARCH: ClassVar[str] = "HYBRID_SEARCH"
|
||||
@ -0,0 +1,2 @@
|
||||
class RetrievalService:
|
||||
pass
|
||||
2
dify-workflow-engine/core/tools/__base/tool.py
Normal file
2
dify-workflow-engine/core/tools/__base/tool.py
Normal file
@ -0,0 +1,2 @@
|
||||
class Tool:
|
||||
pass
|
||||
2
dify-workflow-engine/core/tools/__init__.py
Normal file
2
dify-workflow-engine/core/tools/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class ToolManager:
|
||||
pass
|
||||
4
dify-workflow-engine/core/tools/entities/__init__.py
Normal file
4
dify-workflow-engine/core/tools/entities/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class ToolEntity(BaseModel):
|
||||
pass
|
||||
19
dify-workflow-engine/core/tools/entities/tool_entities.py
Normal file
19
dify-workflow-engine/core/tools/entities/tool_entities.py
Normal file
@ -0,0 +1,19 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class ToolEntity(BaseModel):
|
||||
pass
|
||||
|
||||
class ToolIdentity(BaseModel):
|
||||
pass
|
||||
|
||||
class ToolInvokeMessage(BaseModel):
|
||||
pass
|
||||
|
||||
class ToolParameter(BaseModel):
|
||||
pass
|
||||
|
||||
class ToolProviderType(BaseModel):
|
||||
pass
|
||||
|
||||
class ToolSelector(BaseModel):
|
||||
pass
|
||||
14
dify-workflow-engine/core/tools/errors.py
Normal file
14
dify-workflow-engine/core/tools/errors.py
Normal file
@ -0,0 +1,14 @@
|
||||
class ToolProviderCredentialValidationError(Exception):
|
||||
pass
|
||||
|
||||
class ToolNotFoundError(Exception):
|
||||
pass
|
||||
|
||||
class ToolParameterValidationError(Exception):
|
||||
pass
|
||||
|
||||
class ToolInvokeError(Exception):
|
||||
pass
|
||||
|
||||
class ToolEngineInvokeError(Exception):
|
||||
pass
|
||||
2
dify-workflow-engine/core/tools/tool_engine.py
Normal file
2
dify-workflow-engine/core/tools/tool_engine.py
Normal file
@ -0,0 +1,2 @@
|
||||
class ToolEngine:
|
||||
pass
|
||||
2
dify-workflow-engine/core/tools/tool_manager.py
Normal file
2
dify-workflow-engine/core/tools/tool_manager.py
Normal file
@ -0,0 +1,2 @@
|
||||
class ToolManager:
|
||||
pass
|
||||
2
dify-workflow-engine/core/tools/utils/__init__.py
Normal file
2
dify-workflow-engine/core/tools/utils/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
class ToolUtils:
|
||||
pass
|
||||
@ -0,0 +1,5 @@
|
||||
class ToolMessageTransformer:
|
||||
pass
|
||||
|
||||
class ToolFileMessageTransformer:
|
||||
pass
|
||||
@ -0,0 +1,2 @@
|
||||
class WorkflowAsTool:
|
||||
pass
|
||||
2
dify-workflow-engine/core/tools/workflow_as_tool/tool.py
Normal file
2
dify-workflow-engine/core/tools/workflow_as_tool/tool.py
Normal file
@ -0,0 +1,2 @@
|
||||
class WorkflowTool:
|
||||
pass
|
||||
12
dify-workflow-engine/core/variables/__init__.py
Normal file
12
dify-workflow-engine/core/variables/__init__.py
Normal file
@ -0,0 +1,12 @@
|
||||
from .variables import Variable, IntegerVariable, ArrayAnyVariable
|
||||
from .segments import (
|
||||
Segment,
|
||||
ArrayFileSegment,
|
||||
ArrayNumberSegment,
|
||||
ArrayStringSegment,
|
||||
NoneSegment,
|
||||
FileSegment,
|
||||
ArrayObjectSegment,
|
||||
SegmentGroup
|
||||
)
|
||||
from .types import SegmentType, ArrayValidation
|
||||
1
dify-workflow-engine/core/variables/consts.py
Normal file
1
dify-workflow-engine/core/variables/consts.py
Normal file
@ -0,0 +1 @@
|
||||
SELECTORS_LENGTH = 10
|
||||
40
dify-workflow-engine/core/variables/segments.py
Normal file
40
dify-workflow-engine/core/variables/segments.py
Normal file
@ -0,0 +1,40 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class Segment(BaseModel):
|
||||
pass
|
||||
|
||||
class ArrayFileSegment(Segment):
|
||||
pass
|
||||
|
||||
class ArrayNumberSegment(Segment):
|
||||
pass
|
||||
|
||||
class ArrayStringSegment(Segment):
|
||||
pass
|
||||
|
||||
class NoneSegment(Segment):
|
||||
pass
|
||||
|
||||
class FileSegment(Segment):
|
||||
pass
|
||||
|
||||
class ArrayObjectSegment(Segment):
|
||||
pass
|
||||
|
||||
class ArrayBooleanSegment(Segment):
|
||||
pass
|
||||
|
||||
class BooleanSegment(Segment):
|
||||
pass
|
||||
|
||||
class ObjectSegment(Segment):
|
||||
pass
|
||||
|
||||
class ArrayAnySegment(Segment):
|
||||
pass
|
||||
|
||||
class StringSegment(Segment):
|
||||
pass
|
||||
|
||||
class SegmentGroup(Segment):
|
||||
pass
|
||||
19
dify-workflow-engine/core/variables/types.py
Normal file
19
dify-workflow-engine/core/variables/types.py
Normal file
@ -0,0 +1,19 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import ClassVar
|
||||
|
||||
class SegmentType(BaseModel):
|
||||
STRING: ClassVar[str] = "string"
|
||||
NUMBER: ClassVar[str] = "number"
|
||||
OBJECT: ClassVar[str] = "object"
|
||||
ARRAY_STRING: ClassVar[str] = "array[string]"
|
||||
ARRAY_NUMBER: ClassVar[str] = "array[number]"
|
||||
ARRAY_OBJECT: ClassVar[str] = "array[object]"
|
||||
BOOLEAN: ClassVar[str] = "boolean"
|
||||
ARRAY_BOOLEAN: ClassVar[str] = "array[boolean]"
|
||||
SECRET: ClassVar[str] = "secret"
|
||||
FILE: ClassVar[str] = "file"
|
||||
ARRAY_FILE: ClassVar[str] = "array[file]"
|
||||
GROUP: ClassVar[str] = "group"
|
||||
|
||||
class ArrayValidation(BaseModel):
|
||||
pass
|
||||
16
dify-workflow-engine/core/variables/variables.py
Normal file
16
dify-workflow-engine/core/variables/variables.py
Normal file
@ -0,0 +1,16 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
class Variable(BaseModel):
|
||||
pass
|
||||
|
||||
class IntegerVariable(Variable):
|
||||
pass
|
||||
|
||||
class ArrayAnyVariable(Variable):
|
||||
pass
|
||||
|
||||
class RAGPipelineVariableInput(BaseModel):
|
||||
pass
|
||||
|
||||
class VariableUnion(BaseModel):
|
||||
pass
|
||||
132  dify-workflow-engine/core/workflow/README.md  (new file)

@@ -0,0 +1,132 @@
# Workflow

## Project Overview

This is the workflow graph engine module of Dify, implementing a queue-based distributed workflow execution system. The engine handles agentic AI workflows with support for parallel execution, node iteration, conditional logic, and external command control.

## Architecture

### Core Components

The graph engine follows a layered architecture with strict dependency rules:

1. **Graph Engine** (`graph_engine/`) - Orchestrates workflow execution

   - **Manager** - External control interface for stop/pause/resume commands
   - **Worker** - Node execution runtime
   - **Command Processing** - Handles control commands (abort, pause, resume)
   - **Event Management** - Event propagation and layer notifications
   - **Graph Traversal** - Edge processing and skip propagation
   - **Response Coordinator** - Path tracking and session management
   - **Layers** - Pluggable middleware (debug logging, execution limits)
   - **Command Channels** - Communication channels (InMemory, Redis)

1. **Graph** (`graph/`) - Graph structure and runtime state

   - **Graph Template** - Workflow definition
   - **Edge** - Node connections with conditions
   - **Runtime State Protocol** - State management interface

1. **Nodes** (`nodes/`) - Node implementations

   - **Base** - Abstract node classes and variable parsing
   - **Specific Nodes** - LLM, Agent, Code, HTTP Request, Iteration, Loop, etc.

1. **Events** (`node_events/`) - Event system

   - **Base** - Event protocols
   - **Node Events** - Node lifecycle events

1. **Entities** (`entities/`) - Domain models

   - **Variable Pool** - Variable storage
   - **Graph Init Params** - Initialization configuration

## Key Design Patterns

### Command Channel Pattern

External workflow control via Redis or in-memory channels:

```python
# Send stop command to running workflow
channel = RedisChannel(redis_client, f"workflow:{task_id}:commands")
channel.send_command(AbortCommand(reason="User requested"))
```

### Layer System

Extensible middleware for cross-cutting concerns:

```python
engine = GraphEngine(graph)
engine.layer(DebugLoggingLayer(level="INFO"))
engine.layer(ExecutionLimitsLayer(max_nodes=100))
```

### Event-Driven Architecture

All node executions emit events for monitoring and integration:

- `NodeRunStartedEvent` - Node execution begins
- `NodeRunSucceededEvent` - Node completes successfully
- `NodeRunFailedEvent` - Node encounters an error
- `GraphRunStartedEvent` / `GraphRunCompletedEvent` - Workflow lifecycle

### Variable Pool

Centralized variable storage with namespace isolation:

```python
# Variables scoped by node_id
pool.add(["node1", "output"], value)
result = pool.get(["node1", "output"])
```

## Import Architecture Rules

The codebase enforces strict layering via import-linter:

1. **Workflow Layers** (top to bottom):

   - graph_engine → graph_events → graph → nodes → node_events → entities

1. **Graph Engine Internal Layers**:

   - orchestration → command_processing → event_management → graph_traversal → domain

1. **Domain Isolation**:

   - Domain models cannot import from infrastructure layers

1. **Command Channel Independence**:

   - InMemory and Redis channels must remain independent

## Common Tasks

### Adding a New Node Type

1. Create the node class in `nodes/<node_type>/`
1. Inherit from `BaseNode` or an appropriate base class (see the sketch after this list)
1. Implement the `_run()` method
1. Register it in `nodes/node_mapping.py`
1. Add tests in `tests/unit_tests/core/workflow/nodes/`

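As a rough illustration of those steps only, a minimal node could be shaped as follows. The `BaseNode` import path, the `outputs` field, and the variable-pool access are assumptions for the sketch, not the engine's exact API.

```python
# Hypothetical minimal node; module paths and NodeRunResult fields marked
# below are assumptions, not the verified engine interface.
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base import BaseNode  # assumed module path


class EchoNode(BaseNode):
    """Toy node that copies one variable from the pool into its outputs."""

    def _run(self) -> NodeRunResult:
        # Variable pool access mirrors the "Variable Pool" example above.
        value = self.graph_runtime_state.variable_pool.get([self.id, "input"])
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs={"echo": value},  # assumed field name
        )
```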
### Implementing a Custom Layer

1. Create a class inheriting from the `Layer` base
1. Override the lifecycle methods: `on_graph_start()`, `on_event()`, `on_graph_end()` (a minimal sketch follows this list)
1. Add it to the engine via `engine.layer()`

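A minimal sketch of such a layer is below; the lifecycle method names come from the list above, while the `Layer` import path and the `on_graph_end` signature are assumptions.

```python
# Hypothetical layer for illustration; import path and exact signatures
# are assumptions based on the steps listed above.
from core.workflow.graph_engine.layers import Layer  # assumed module path


class EventCountingLayer(Layer):
    """Counts how many events the engine emits during one run."""

    def on_graph_start(self) -> None:
        self.count = 0

    def on_event(self, event) -> None:
        self.count += 1

    def on_graph_end(self, error=None) -> None:  # signature assumed
        print(f"graph finished after {self.count} events")
```

It would be attached with `engine.layer(EventCountingLayer())`, exactly like the built-in layers shown earlier.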
### Debugging Workflow Execution

Enable the debug logging layer:

```python
debug_layer = DebugLoggingLayer(
    level="DEBUG",
    include_inputs=True,
    include_outputs=True
)
```
Some files were not shown because too many files have changed in this diff.