Compare commits

..

1 Commit

Author SHA1 Message Date
e8668782d6 gemini 3 pro dify workflow-engine test 2025-11-19 16:17:47 +08:00
373 changed files with 28063 additions and 3399 deletions

View File

@ -28,11 +28,6 @@ jobs:
# Format code
uv run ruff format ..
- name: count migration progress
run: |
cd api
./cnt_base.sh
- name: ast-grep
run: |
uvx --from ast-grep-cli sg --pattern 'db.session.query($WHATEVER).filter($HERE)' --rewrite 'db.session.query($WHATEVER).where($HERE)' -l py --update-all
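
For reference, the `ast-grep` rewrite above converts legacy `Query.filter()` calls into SQLAlchemy 2.0-style `.where()` calls. A hedged before/after illustration (the model and imports mirror ones used elsewhere in this diff; the snippet itself is not part of it):

```python
from extensions.ext_database import db   # same import used elsewhere in this diff
from models.dataset import Dataset

tenant_id = "00000000-0000-0000-0000-000000000000"  # placeholder value

# Before: legacy Query.filter() call, matched by the pattern
#   db.session.query($WHATEVER).filter($HERE)
legacy = db.session.query(Dataset).filter(Dataset.tenant_id == tenant_id)

# After: the equivalent .where() call produced by the --rewrite rule
rewritten = db.session.query(Dataset).where(Dataset.tenant_id == tenant_id)
```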

View File

@ -1,10 +1,7 @@
name: Run VDB Tests
on:
push:
branches: [main]
paths:
- 'api/core/rag/*.py'
workflow_call:
concurrency:
group: vdb-tests-${{ github.head_ref || github.run_id }}

View File

@ -0,0 +1,30 @@
# Security Penetration Test Report
**Generated:** 2025-11-16 14:02:56 UTC
## Executive Summary
Conducted a thorough white-box security assessment of the API located in /workspace/api, focusing on authentication, authorization, business logic vulnerabilities, and IDOR in key endpoints such as /installed-apps.
## Methodology
- Full recursive file listing and static code analysis to identify HTTP routes and sensitive endpoint implementations.
- Focused static analysis on endpoints handling sensitive actions, authentication, and role-based authorization.
- Created specialized agents for authentication and business logic vulnerability testing.
- Dynamic testing attempted for IDOR and authorization bypass, limited by local API server unavailability.
- All findings documented with recommended next steps.
## Findings
- Discovered multiple /installed-apps endpoints with solid authentication and multi-layered authorization checks enforcing tenant and role ownership.
- No exploitable access control bypass or privilege escalation vulnerabilities confirmed.
- Dynamic vulnerability testing for IDOR was hampered by connection refusals, preventing full validation.
- Created a high-priority note recommending environment verification and retesting of dynamic attacks once the API server is accessible.
## Recommendations
- Verify and restore access to the local API server to enable full dynamic testing.
- Retry dynamic testing for IDOR and authorization bypass attacks to confirm security.
- Continue layered security reviews focusing on evolving business logic and role enforcement.
- Consider adding automated integration tests validating authorization policies.
## Conclusion
The static analysis phase confirmed robust authentication and authorization controls in key sensitive endpoints; however, dynamic testing limitations prevent final validation. Once dynamic testing is possible, verify no IDOR or broken function-level authorization issues remain. This assessment provides a strong foundation for secure API usage and further iterative validation.
**Severity:** Medium (due to testing environment constraints limiting dynamic verification)
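
To make the last recommendation concrete, an automated authorization test could look roughly like the sketch below; the fixtures, client setup, and exact route prefix are assumptions for illustration, not findings from this assessment:

```python
# Hypothetical pytest sketch for automated authorization checks on /installed-apps.
# The fixtures (client_tenant_a, installed_app_of_tenant_b, anonymous_client) and the
# /console/api prefix are assumptions, not taken from this codebase.


def test_installed_apps_reject_cross_tenant_access(client_tenant_a, installed_app_of_tenant_b):
    # A member of tenant A must not be able to read tenant B's installed app.
    resp = client_tenant_a.get(f"/console/api/installed-apps/{installed_app_of_tenant_b.id}")
    assert resp.status_code in (403, 404)  # forbidden or hidden, but never leaked


def test_installed_apps_require_authentication(anonymous_client):
    # Unauthenticated requests should be rejected outright.
    resp = anonymous_client.get("/console/api/installed-apps")
    assert resp.status_code == 401
```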

View File

@ -159,8 +159,9 @@ SUPABASE_URL=your-server-url
# CORS configuration
WEB_API_CORS_ALLOW_ORIGINS=http://localhost:3000,*
CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
# When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the site's top-level domain (e.g., `example.com`). Leading dots are optional.
COOKIE_DOMAIN=
# Set COOKIE_DOMAIN when the console frontend and API are on different subdomains.
# Provide the registrable domain (e.g. example.com); leading dots are optional.
COOKIE_DOMAIN=localhost:5001
# Vector database configuration
# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`, `vastbase`, `tidb`, `tidb_on_qdrant`, `baidu`, `lindorm`, `huawei_cloud`, `upstash`, `matrixone`.

View File

@ -26,10 +26,6 @@
cp .env.example .env
```
> [!IMPORTANT]
>
> When the frontend and backend run on different subdomains, set COOKIE_DOMAIN to the site's top-level domain (e.g., `example.com`). The frontend and backend must be under the same top-level domain in order to share authentication cookies.
1. Generate a `SECRET_KEY` in the `.env` file.
bash for Linux

11
api/bin/env Normal file
View File

@ -0,0 +1,11 @@
#!/bin/sh
# add binaries to PATH if they aren't added yet
# affix colons on either side of $PATH to simplify matching
case ":${PATH}:" in
*:"$HOME/.local/bin":*)
;;
*)
# Prepending path in case a system-installed binary needs to be overridden
export PATH="$HOME/.local/bin:$PATH"
;;
esac

4
api/bin/env.fish Normal file
View File

@ -0,0 +1,4 @@
if not contains "$HOME/.local/bin" $PATH
# Prepending path in case a system-installed binary needs to be overridden
set -x PATH "$HOME/.local/bin" $PATH
end

BIN
api/bin/uv Executable file

Binary file not shown.

BIN
api/bin/uvx Executable file

Binary file not shown.

View File

@ -1,7 +0,0 @@
#!/bin/bash
set -euxo pipefail
for pattern in "Base" "TypeBase"; do
printf "%s " "$pattern"
grep "($pattern):" -r --include='*.py' --exclude-dir=".venv" --exclude-dir="tests" . | wc -l
done

View File

@ -216,7 +216,6 @@ def setup_required(view: Callable[P, R]):
raise NotInitValidateError()
elif dify_config.EDITION == "SELF_HOSTED" and not db.session.query(DifySetup).first():
raise NotSetupError()
return view(*args, **kwargs)
return decorated

View File

@ -192,6 +192,7 @@ class GraphEngine:
self._dispatcher = Dispatcher(
event_queue=self._event_queue,
event_handler=self._event_handler_registry,
event_collector=self._event_manager,
execution_coordinator=self._execution_coordinator,
event_emitter=self._event_manager,
)

View File

@ -43,6 +43,7 @@ class Dispatcher:
self,
event_queue: queue.Queue[GraphNodeEventBase],
event_handler: "EventHandler",
event_collector: EventManager,
execution_coordinator: ExecutionCoordinator,
event_emitter: EventManager | None = None,
) -> None:
@ -52,11 +53,13 @@ class Dispatcher:
Args:
event_queue: Queue of events from workers
event_handler: Event handler registry for processing events
event_collector: Event manager for collecting unhandled events
execution_coordinator: Coordinator for execution flow
event_emitter: Optional event manager to signal completion
"""
self._event_queue = event_queue
self._event_handler = event_handler
self._event_collector = event_collector
self._execution_coordinator = execution_coordinator
self._event_emitter = event_emitter
@ -83,31 +86,37 @@ class Dispatcher:
def _dispatcher_loop(self) -> None:
"""Main dispatcher loop."""
try:
self._process_commands()
while not self._stop_event.is_set():
if (
self._execution_coordinator.aborted
or self._execution_coordinator.paused
or self._execution_coordinator.execution_complete
):
break
commands_checked = False
should_check_commands = False
should_break = False
self._execution_coordinator.check_scaling()
try:
event = self._event_queue.get(timeout=0.1)
self._event_handler.dispatch(event)
self._event_queue.task_done()
self._process_commands(event)
except queue.Empty:
time.sleep(0.1)
if self._execution_coordinator.is_execution_complete():
should_check_commands = True
should_break = True
else:
# Check for scaling
self._execution_coordinator.check_scaling()
self._process_commands()
while True:
try:
event = self._event_queue.get(block=False)
self._event_handler.dispatch(event)
self._event_queue.task_done()
except queue.Empty:
# Process events
try:
event = self._event_queue.get(timeout=0.1)
# Route to the event handler
self._event_handler.dispatch(event)
should_check_commands = self._should_check_commands(event)
self._event_queue.task_done()
except queue.Empty:
# Process commands even when no new events arrive so abort requests are not missed
should_check_commands = True
time.sleep(0.1)
if should_check_commands and not commands_checked:
self._execution_coordinator.check_commands()
commands_checked = True
if should_break:
if not commands_checked:
self._execution_coordinator.check_commands()
break
except Exception as e:
@ -120,6 +129,6 @@ class Dispatcher:
if self._event_emitter:
self._event_emitter.mark_complete()
def _process_commands(self, event: GraphNodeEventBase | None = None):
if event is None or isinstance(event, self._COMMAND_TRIGGER_EVENTS):
self._execution_coordinator.process_commands()
def _should_check_commands(self, event: GraphNodeEventBase) -> bool:
"""Return True if the event represents a node completion."""
return isinstance(event, self._COMMAND_TRIGGER_EVENTS)

View File

@ -40,7 +40,7 @@ class ExecutionCoordinator:
self._command_processor = command_processor
self._worker_pool = worker_pool
def process_commands(self) -> None:
def check_commands(self) -> None:
"""Process any pending commands."""
self._command_processor.process_commands()
@ -48,16 +48,24 @@ class ExecutionCoordinator:
"""Check and perform worker scaling if needed."""
self._worker_pool.check_and_scale()
@property
def execution_complete(self):
def is_execution_complete(self) -> bool:
"""
Check if execution is complete.
Returns:
True if execution is complete
"""
# Treat paused, aborted, or failed executions as terminal states
if self._graph_execution.is_paused:
return True
if self._graph_execution.aborted or self._graph_execution.has_error:
return True
return self._state_manager.is_execution_complete()
@property
def aborted(self):
return self._graph_execution.aborted or self._graph_execution.has_error
@property
def paused(self) -> bool:
def is_paused(self) -> bool:
"""Expose whether the underlying graph execution is paused."""
return self._graph_execution.is_paused

View File

@ -225,7 +225,7 @@ class Dataset(Base):
ExternalKnowledgeApis.id == external_knowledge_binding.external_knowledge_api_id
)
)
if external_knowledge_api is None or external_knowledge_api.settings is None:
if not external_knowledge_api:
return None
return {
"external_knowledge_id": external_knowledge_binding.external_knowledge_id,
@ -945,20 +945,18 @@ class DatasetQuery(Base):
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=sa.func.current_timestamp())
class DatasetKeywordTable(TypeBase):
class DatasetKeywordTable(Base):
__tablename__ = "dataset_keyword_tables"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="dataset_keyword_table_pkey"),
sa.Index("dataset_keyword_table_dataset_id_idx", "dataset_id"),
)
id: Mapped[str] = mapped_column(
StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False
)
dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False, unique=True)
keyword_table: Mapped[str] = mapped_column(sa.Text, nullable=False)
data_source_type: Mapped[str] = mapped_column(
String(255), nullable=False, server_default=sa.text("'database'::character varying"), default="database"
id = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
dataset_id = mapped_column(StringUUID, nullable=False, unique=True)
keyword_table = mapped_column(sa.Text, nullable=False)
data_source_type = mapped_column(
String(255), nullable=False, server_default=sa.text("'database'::character varying")
)
@property
@ -1056,23 +1054,19 @@ class TidbAuthBinding(Base):
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
class Whitelist(TypeBase):
class Whitelist(Base):
__tablename__ = "whitelists"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="whitelists_pkey"),
sa.Index("whitelists_tenant_idx", "tenant_id"),
)
id: Mapped[str] = mapped_column(
StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"), init=False
)
tenant_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
id = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuid_generate_v4()"))
tenant_id = mapped_column(StringUUID, nullable=True)
category: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
class DatasetPermission(TypeBase):
class DatasetPermission(Base):
__tablename__ = "dataset_permissions"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="dataset_permission_pkey"),
@ -1081,21 +1075,15 @@ class DatasetPermission(TypeBase):
sa.Index("idx_dataset_permissions_tenant_id", "tenant_id"),
)
id: Mapped[str] = mapped_column(
StringUUID, server_default=sa.text("uuid_generate_v4()"), primary_key=True, init=False
)
dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
account_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
has_permission: Mapped[bool] = mapped_column(
sa.Boolean, nullable=False, server_default=sa.text("true"), default=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
)
id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), primary_key=True)
dataset_id = mapped_column(StringUUID, nullable=False)
account_id = mapped_column(StringUUID, nullable=False)
tenant_id = mapped_column(StringUUID, nullable=False)
has_permission: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
class ExternalKnowledgeApis(TypeBase):
class ExternalKnowledgeApis(Base):
__tablename__ = "external_knowledge_apis"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="external_knowledge_apis_pkey"),
@ -1103,20 +1091,16 @@ class ExternalKnowledgeApis(TypeBase):
sa.Index("external_knowledge_apis_name_idx", "name"),
)
id: Mapped[str] = mapped_column(
StringUUID, nullable=False, server_default=sa.text("uuid_generate_v4()"), init=False
)
id = mapped_column(StringUUID, nullable=False, server_default=sa.text("uuid_generate_v4()"))
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str] = mapped_column(String(255), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
settings: Mapped[str | None] = mapped_column(sa.Text, nullable=True)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
)
updated_by: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
tenant_id = mapped_column(StringUUID, nullable=False)
settings = mapped_column(sa.Text, nullable=True)
created_by = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_by = mapped_column(StringUUID, nullable=True)
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def to_dict(self) -> dict[str, Any]:
@ -1194,7 +1178,7 @@ class DatasetAutoDisableLog(Base):
)
class RateLimitLog(TypeBase):
class RateLimitLog(Base):
__tablename__ = "rate_limit_logs"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="rate_limit_log_pkey"),
@ -1202,12 +1186,12 @@ class RateLimitLog(TypeBase):
sa.Index("rate_limit_log_operation_idx", "operation"),
)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"), init=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
tenant_id = mapped_column(StringUUID, nullable=False)
subscription_plan: Mapped[str] = mapped_column(String(255), nullable=False)
operation: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=sa.text("CURRENT_TIMESTAMP(0)"), init=False
DateTime, nullable=False, server_default=sa.text("CURRENT_TIMESTAMP(0)")
)

View File

@ -14,7 +14,7 @@ from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEnt
from core.trigger.entities.entities import Subscription
from core.trigger.utils.endpoint import generate_plugin_trigger_endpoint_url, generate_webhook_trigger_endpoint
from libs.datetime_utils import naive_utc_now
from models.base import Base, TypeBase
from models.base import Base
from models.engine import db
from models.enums import AppTriggerStatus, AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
from models.model import Account
@ -399,7 +399,7 @@ class AppTrigger(Base):
)
class WorkflowSchedulePlan(TypeBase):
class WorkflowSchedulePlan(Base):
"""
Workflow Schedule Configuration
@ -425,7 +425,7 @@ class WorkflowSchedulePlan(TypeBase):
sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
)
id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=sa.text("uuidv7()"), init=False)
id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
node_id: Mapped[str] = mapped_column(String(64), nullable=False)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
@ -436,11 +436,9 @@ class WorkflowSchedulePlan(TypeBase):
# Schedule control
next_run_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def to_dict(self) -> dict[str, Any]:

View File

@ -62,7 +62,7 @@ class ExternalDatasetService:
tenant_id=tenant_id,
created_by=user_id,
updated_by=user_id,
name=str(args.get("name")),
name=args.get("name"),
description=args.get("description", ""),
settings=json.dumps(args.get("settings"), ensure_ascii=False),
)
@ -163,7 +163,7 @@ class ExternalDatasetService:
external_knowledge_api = (
db.session.query(ExternalKnowledgeApis).filter_by(id=external_knowledge_api_id, tenant_id=tenant_id).first()
)
if external_knowledge_api is None or external_knowledge_api.settings is None:
if external_knowledge_api is None:
raise ValueError("api template not found")
settings = json.loads(external_knowledge_api.settings)
for setting in settings:
@ -290,7 +290,7 @@ class ExternalDatasetService:
.filter_by(id=external_knowledge_binding.external_knowledge_api_id)
.first()
)
if external_knowledge_api is None or external_knowledge_api.settings is None:
if not external_knowledge_api:
raise ValueError("external api template not found")
settings = json.loads(external_knowledge_api.settings)

View File

@ -13,13 +13,13 @@ from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY, WorkflowAppGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.layers.timeslice_layer import TimeSliceLayer
from core.app.layers.trigger_post_layer import TriggerPostLayer
from extensions.ext_database import db
from models.account import Account
from models.enums import AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
from models.enums import CreatorUserRole, WorkflowTriggerStatus
from models.model import App, EndUser, Tenant
from models.trigger import WorkflowTriggerLog
from models.workflow import Workflow
@ -81,19 +81,6 @@ def execute_workflow_sandbox(task_data_dict: dict[str, Any]):
)
def _build_generator_args(trigger_data: TriggerData) -> dict[str, Any]:
"""Build args passed into WorkflowAppGenerator.generate for Celery executions."""
args: dict[str, Any] = {
"inputs": dict(trigger_data.inputs),
"files": list(trigger_data.files),
}
if trigger_data.trigger_type == AppTriggerType.TRIGGER_WEBHOOK:
args[SKIP_PREPARE_USER_INPUTS_KEY] = True # Webhooks already provide structured inputs
return args
def _execute_workflow_common(
task_data: WorkflowTaskData,
cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler,
@ -141,7 +128,7 @@ def _execute_workflow_common(
generator = WorkflowAppGenerator()
# Prepare args matching AppGenerateService.generate format
args = _build_generator_args(trigger_data)
args: dict[str, Any] = {"inputs": dict(trigger_data.inputs), "files": list(trigger_data.files)}
# If workflow_id was specified, add it to args
if trigger_data.workflow_id:

View File

@ -9,7 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import Dataset, DatasetMetadataBinding, DocumentSegment
from models.dataset import Dataset, DocumentSegment
from models.model import UploadFile
logger = logging.getLogger(__name__)
@ -37,11 +37,6 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form
if not dataset:
raise Exception("Document has no dataset")
db.session.query(DatasetMetadataBinding).where(
DatasetMetadataBinding.dataset_id == dataset_id,
DatasetMetadataBinding.document_id.in_(document_ids),
).delete(synchronize_session=False)
segments = db.session.scalars(
select(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids))
).all()
@ -76,8 +71,7 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form
except Exception:
logger.exception("Delete file failed when document deleted, file_id: %s", file.id)
db.session.delete(file)
db.session.commit()
db.session.commit()
end_at = time.perf_counter()
logger.info(

View File

@ -1,189 +0,0 @@
"""Tests for dispatcher command checking behavior."""
from __future__ import annotations
import queue
from datetime import datetime
from unittest import mock
from core.workflow.entities.pause_reason import SchedulingPause
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.graph_engine.event_management.event_handlers import EventHandler
from core.workflow.graph_engine.orchestration.dispatcher import Dispatcher
from core.workflow.graph_engine.orchestration.execution_coordinator import ExecutionCoordinator
from core.workflow.graph_events import (
GraphNodeEventBase,
NodeRunPauseRequestedEvent,
NodeRunStartedEvent,
NodeRunSucceededEvent,
)
from core.workflow.node_events import NodeRunResult
def test_dispatcher_should_consume_remains_events_after_pause():
event_queue = queue.Queue()
event_queue.put(
GraphNodeEventBase(
id="test",
node_id="test",
node_type=NodeType.START,
)
)
event_handler = mock.Mock(spec=EventHandler)
execution_coordinator = mock.Mock(spec=ExecutionCoordinator)
execution_coordinator.paused.return_value = True
dispatcher = Dispatcher(
event_queue=event_queue,
event_handler=event_handler,
execution_coordinator=execution_coordinator,
)
dispatcher._dispatcher_loop()
assert event_queue.empty()
class _StubExecutionCoordinator:
"""Stub execution coordinator that tracks command checks."""
def __init__(self) -> None:
self.command_checks = 0
self.scaling_checks = 0
self.execution_complete = False
self.failed = False
self._paused = False
def process_commands(self) -> None:
self.command_checks += 1
def check_scaling(self) -> None:
self.scaling_checks += 1
@property
def paused(self) -> bool:
return self._paused
@property
def aborted(self) -> bool:
return False
def mark_complete(self) -> None:
self.execution_complete = True
def mark_failed(self, error: Exception) -> None: # pragma: no cover - defensive, not triggered in tests
self.failed = True
class _StubEventHandler:
"""Minimal event handler that marks execution complete after handling an event."""
def __init__(self, coordinator: _StubExecutionCoordinator) -> None:
self._coordinator = coordinator
self.events = []
def dispatch(self, event) -> None:
self.events.append(event)
self._coordinator.mark_complete()
def _run_dispatcher_for_event(event) -> int:
"""Run the dispatcher loop for a single event and return command check count."""
event_queue: queue.Queue = queue.Queue()
event_queue.put(event)
coordinator = _StubExecutionCoordinator()
event_handler = _StubEventHandler(coordinator)
dispatcher = Dispatcher(
event_queue=event_queue,
event_handler=event_handler,
execution_coordinator=coordinator,
)
dispatcher._dispatcher_loop()
return coordinator.command_checks
def _make_started_event() -> NodeRunStartedEvent:
return NodeRunStartedEvent(
id="start-event",
node_id="node-1",
node_type=NodeType.CODE,
node_title="Test Node",
start_at=datetime.utcnow(),
)
def _make_succeeded_event() -> NodeRunSucceededEvent:
return NodeRunSucceededEvent(
id="success-event",
node_id="node-1",
node_type=NodeType.CODE,
node_title="Test Node",
start_at=datetime.utcnow(),
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
)
def test_dispatcher_checks_commands_during_idle_and_on_completion() -> None:
"""Dispatcher polls commands when idle and after completion events."""
started_checks = _run_dispatcher_for_event(_make_started_event())
succeeded_checks = _run_dispatcher_for_event(_make_succeeded_event())
assert started_checks == 2
assert succeeded_checks == 3
class _PauseStubEventHandler:
"""Minimal event handler that marks execution complete after handling an event."""
def __init__(self, coordinator: _StubExecutionCoordinator) -> None:
self._coordinator = coordinator
self.events = []
def dispatch(self, event) -> None:
self.events.append(event)
if isinstance(event, NodeRunPauseRequestedEvent):
self._coordinator.mark_complete()
def test_dispatcher_drain_event_queue():
events = [
NodeRunStartedEvent(
id="start-event",
node_id="node-1",
node_type=NodeType.CODE,
node_title="Code",
start_at=datetime.utcnow(),
),
NodeRunPauseRequestedEvent(
id="pause-event",
node_id="node-1",
node_type=NodeType.CODE,
reason=SchedulingPause(message="test pause"),
),
NodeRunSucceededEvent(
id="success-event",
node_id="node-1",
node_type=NodeType.CODE,
start_at=datetime.utcnow(),
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
),
]
event_queue: queue.Queue = queue.Queue()
for e in events:
event_queue.put(e)
coordinator = _StubExecutionCoordinator()
event_handler = _PauseStubEventHandler(coordinator)
dispatcher = Dispatcher(
event_queue=event_queue,
event_handler=event_handler,
execution_coordinator=coordinator,
)
dispatcher._dispatcher_loop()
# ensure all events are drained.
assert event_queue.empty()

View File

@ -3,17 +3,13 @@
import time
from unittest.mock import MagicMock
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.entities.pause_reason import SchedulingPause
from core.workflow.graph import Graph
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType, PauseCommand
from core.workflow.graph_events import GraphRunAbortedEvent, GraphRunPausedEvent, GraphRunStartedEvent
from core.workflow.nodes.start.start_node import StartNode
from core.workflow.runtime import GraphRuntimeState, VariablePool
from models.enums import UserFrom
def test_abort_command():
@ -30,23 +26,11 @@ def test_abort_command():
mock_graph.root_node.id = "start"
# Create mock nodes with required attributes - using shared runtime state
start_node = StartNode(
id="start",
config={"id": "start"},
graph_init_params=GraphInitParams(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
graph_config={},
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
),
graph_runtime_state=shared_runtime_state,
)
start_node.init_node_data({"title": "start", "variables": []})
mock_graph.nodes["start"] = start_node
mock_start_node = MagicMock()
mock_start_node.state = None
mock_start_node.id = "start"
mock_start_node.graph_runtime_state = shared_runtime_state # Use shared instance
mock_graph.nodes["start"] = mock_start_node
# Mock graph methods
mock_graph.get_outgoing_edges = MagicMock(return_value=[])
@ -140,23 +124,11 @@ def test_pause_command():
mock_graph.root_node = MagicMock()
mock_graph.root_node.id = "start"
start_node = StartNode(
id="start",
config={"id": "start"},
graph_init_params=GraphInitParams(
tenant_id="test_tenant",
app_id="test_app",
workflow_id="test_workflow",
graph_config={},
user_id="test_user",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
),
graph_runtime_state=shared_runtime_state,
)
start_node.init_node_data({"title": "start", "variables": []})
mock_graph.nodes["start"] = start_node
mock_start_node = MagicMock()
mock_start_node.state = None
mock_start_node.id = "start"
mock_start_node.graph_runtime_state = shared_runtime_state
mock_graph.nodes["start"] = mock_start_node
mock_graph.get_outgoing_edges = MagicMock(return_value=[])
mock_graph.get_incoming_edges = MagicMock(return_value=[])
@ -181,5 +153,5 @@ def test_pause_command():
assert pause_events[0].reason == SchedulingPause(message="User requested pause")
graph_execution = engine.graph_runtime_state.graph_execution
assert graph_execution.paused
assert graph_execution.is_paused
assert graph_execution.pause_reason == SchedulingPause(message="User requested pause")

View File

@ -0,0 +1,109 @@
"""Tests for dispatcher command checking behavior."""
from __future__ import annotations
import queue
from datetime import datetime
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.graph_engine.event_management.event_manager import EventManager
from core.workflow.graph_engine.orchestration.dispatcher import Dispatcher
from core.workflow.graph_events import NodeRunStartedEvent, NodeRunSucceededEvent
from core.workflow.node_events import NodeRunResult
class _StubExecutionCoordinator:
"""Stub execution coordinator that tracks command checks."""
def __init__(self) -> None:
self.command_checks = 0
self.scaling_checks = 0
self._execution_complete = False
self.mark_complete_called = False
self.failed = False
self._paused = False
def check_commands(self) -> None:
self.command_checks += 1
def check_scaling(self) -> None:
self.scaling_checks += 1
@property
def is_paused(self) -> bool:
return self._paused
def is_execution_complete(self) -> bool:
return self._execution_complete
def mark_complete(self) -> None:
self.mark_complete_called = True
def mark_failed(self, error: Exception) -> None: # pragma: no cover - defensive, not triggered in tests
self.failed = True
def set_execution_complete(self) -> None:
self._execution_complete = True
class _StubEventHandler:
"""Minimal event handler that marks execution complete after handling an event."""
def __init__(self, coordinator: _StubExecutionCoordinator) -> None:
self._coordinator = coordinator
self.events = []
def dispatch(self, event) -> None:
self.events.append(event)
self._coordinator.set_execution_complete()
def _run_dispatcher_for_event(event) -> int:
"""Run the dispatcher loop for a single event and return command check count."""
event_queue: queue.Queue = queue.Queue()
event_queue.put(event)
coordinator = _StubExecutionCoordinator()
event_handler = _StubEventHandler(coordinator)
event_manager = EventManager()
dispatcher = Dispatcher(
event_queue=event_queue,
event_handler=event_handler,
event_collector=event_manager,
execution_coordinator=coordinator,
)
dispatcher._dispatcher_loop()
return coordinator.command_checks
def _make_started_event() -> NodeRunStartedEvent:
return NodeRunStartedEvent(
id="start-event",
node_id="node-1",
node_type=NodeType.CODE,
node_title="Test Node",
start_at=datetime.utcnow(),
)
def _make_succeeded_event() -> NodeRunSucceededEvent:
return NodeRunSucceededEvent(
id="success-event",
node_id="node-1",
node_type=NodeType.CODE,
node_title="Test Node",
start_at=datetime.utcnow(),
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
)
def test_dispatcher_checks_commands_during_idle_and_on_completion() -> None:
"""Dispatcher polls commands when idle and after completion events."""
started_checks = _run_dispatcher_for_event(_make_started_event())
succeeded_checks = _run_dispatcher_for_event(_make_succeeded_event())
assert started_checks == 1
assert succeeded_checks == 2

View File

@ -48,3 +48,15 @@ def test_handle_pause_noop_when_execution_running() -> None:
worker_pool.stop.assert_not_called()
state_manager.clear_executing.assert_not_called()
def test_is_execution_complete_when_paused() -> None:
"""Paused execution should be treated as complete."""
graph_execution = GraphExecution(workflow_id="workflow")
graph_execution.start()
graph_execution.pause("Awaiting input")
coordinator, state_manager, _worker_pool = _build_coordinator(graph_execution)
state_manager.is_execution_complete.return_value = False
assert coordinator.is_execution_complete()

View File

@ -1,37 +0,0 @@
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
from models.enums import AppTriggerType, WorkflowRunTriggeredFrom
from services.workflow.entities import TriggerData, WebhookTriggerData
from tasks import async_workflow_tasks
def test_build_generator_args_sets_skip_flag_for_webhook():
trigger_data = WebhookTriggerData(
app_id="app",
tenant_id="tenant",
workflow_id="workflow",
root_node_id="node",
inputs={"webhook_data": {"body": {"foo": "bar"}}},
)
args = async_workflow_tasks._build_generator_args(trigger_data)
assert args[SKIP_PREPARE_USER_INPUTS_KEY] is True
assert args["inputs"]["webhook_data"]["body"]["foo"] == "bar"
def test_build_generator_args_keeps_validation_for_other_triggers():
trigger_data = TriggerData(
app_id="app",
tenant_id="tenant",
workflow_id="workflow",
root_node_id="node",
inputs={"foo": "bar"},
files=[],
trigger_type=AppTriggerType.TRIGGER_SCHEDULE,
trigger_from=WorkflowRunTriggeredFrom.SCHEDULE,
)
args = async_workflow_tasks._build_generator_args(trigger_data)
assert SKIP_PREPARE_USER_INPUTS_KEY not in args
assert args["inputs"] == {"foo": "bar"}

BIN
api/uv Executable file

Binary file not shown.

5488
api/uv.lock generated

File diff suppressed because it is too large.

View File

@ -0,0 +1,16 @@
# Dify Workflow Engine
A standalone SDK for executing Dify workflows.
## Installation
```bash
pip install -r requirements.txt
```
## Usage
```python
from core.workflow.workflow_entry import WorkflowEntry
# ...
```

View File

@ -0,0 +1,7 @@
class DifyConfig:
WORKFLOW_CALL_MAX_DEPTH = 5
DEBUG = True
WORKFLOW_MAX_EXECUTION_STEPS = 100
WORKFLOW_MAX_EXECUTION_TIME = 600
dify_config = DifyConfig()

View File

@ -0,0 +1,2 @@
class Agent:
pass

View File

@ -0,0 +1,10 @@
from pydantic import BaseModel
class AgentEntity(BaseModel):
pass
class AgentNodeData(BaseModel):
agent_strategy_name: str
class AgentToolEntity(BaseModel):
pass

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
class AgentPluginEntity(BaseModel):
pass
class AgentStrategyParameter(BaseModel):
pass

View File

@ -0,0 +1,2 @@
class GenerateTaskStoppedError(Exception):
pass

View File

@ -0,0 +1,6 @@
from enum import Enum
class InvokeFrom(Enum):
DEBUGGER = "debugger"
SERVICE_API = "service_api"
WEB_APP = "web_app"

View File

@ -0,0 +1,2 @@
class CallbackHandler:
pass

View File

@ -0,0 +1,2 @@
class DifyWorkflowCallbackHandler:
pass

View File

@ -0,0 +1,2 @@
class Connection:
pass

View File

@ -0,0 +1,2 @@
class DatasourceManager:
pass

View File

@ -0,0 +1,4 @@
from pydantic import BaseModel
class DatasourceEntity(BaseModel):
pass

View File

@ -0,0 +1,22 @@
from pydantic import BaseModel
class DatasourceEntity(BaseModel):
pass
class DatasourceType(BaseModel):
pass
class DatasourceMessage(BaseModel):
pass
class DatasourceParameter(BaseModel):
pass
class DatasourceProviderType(BaseModel):
pass
class GetOnlineDocumentPageContentRequest(BaseModel):
pass
class OnlineDriveDownloadFileRequest(BaseModel):
pass

View File

@ -0,0 +1,2 @@
class OnlineDocumentDatasourcePlugin:
pass

View File

@ -0,0 +1,2 @@
class OnlineDriveDatasourcePlugin:
pass

View File

@ -0,0 +1,2 @@
class DatasourceFileMessageTransformer:
pass

View File

@ -0,0 +1,6 @@
from .models import File, FileAttribute, FileTransferMethod, FileType
class FileManager:
pass
file_manager = FileManager()

View File

@ -0,0 +1,13 @@
from enum import StrEnum
class FileType(StrEnum):
IMAGE = "image"
AUDIO = "audio"
VIDEO = "video"
DOCUMENT = "document"
CUSTOM = "custom"
class FileTransferMethod(StrEnum):
REMOTE_URL = "remote_url"
LOCAL_FILE = "local_file"
TOOL_FILE = "tool_file"

View File

@ -0,0 +1,14 @@
from pydantic import BaseModel
class File(BaseModel):
def to_dict(self):
return {}
class FileAttribute(BaseModel):
pass
class FileTransferMethod(BaseModel):
pass
class FileType(BaseModel):
pass

View File

@ -0,0 +1,2 @@
class SSKey:
pass

View File

@ -0,0 +1,5 @@
class CodeExecutor:
pass
class CodeLanguage:
pass

View File

@ -0,0 +1,14 @@
class CodeExecutor:
pass
class CodeLanguage:
PYTHON3 = "python3"
JAVASCRIPT = "javascript"
JSON = "json"
STRING = "string"
NUMBER = "number"
OBJECT = "object"
ARRAY = "array"
class CodeExecutionError(Exception):
pass

View File

@ -0,0 +1,2 @@
class CodeNodeProvider:
pass

View File

@ -0,0 +1,2 @@
class JavascriptCodeProvider:
pass

View File

@ -0,0 +1,2 @@
class Python3CodeProvider:
pass

View File

@ -0,0 +1,2 @@
class SSRFProxy:
pass

View File

@ -0,0 +1,2 @@
class TokenBufferMemory:
pass

View File

@ -0,0 +1,2 @@
class TokenBufferMemory:
pass

View File

@ -0,0 +1,5 @@
class ModelManager:
pass
class ModelInstance:
pass

View File

@ -0,0 +1,2 @@
class ModelPropertyKey:
pass

View File

@ -0,0 +1,12 @@
from pydantic import BaseModel
class LLMResult(BaseModel):
pass
class LLMUsage(BaseModel):
@classmethod
def empty_usage(cls):
return cls()
class LLMUsageMetadata(BaseModel):
pass

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
class ModelType(BaseModel):
pass
class AIModelEntity(BaseModel):
pass

View File

@ -0,0 +1 @@
from .encoders import jsonable_encoder

View File

@ -0,0 +1,2 @@
def jsonable_encoder(obj):
return obj

View File

@ -0,0 +1,2 @@
class PluginManager:
pass

View File

@ -0,0 +1,2 @@
class InvokeCredentials:
pass

View File

@ -0,0 +1,5 @@
class PluginDaemonClientSideError(Exception):
pass
class PluginInvokeError(Exception):
pass

View File

@ -0,0 +1,2 @@
class PluginInstaller:
pass

View File

@ -0,0 +1,2 @@
class PromptTemplate:
pass

View File

@ -0,0 +1,4 @@
from pydantic import BaseModel
class PromptEntity(BaseModel):
pass

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
class AdvancedPromptEntity(BaseModel):
pass
class MemoryConfig(BaseModel):
pass

View File

@ -0,0 +1,2 @@
class ProviderManager:
pass

View File

@ -0,0 +1 @@
# Mock core.rag

View File

@ -0,0 +1,4 @@
from pydantic import BaseModel
class RetrievalResource(BaseModel):
pass

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
class CitationMetadata(BaseModel):
pass
class RetrievalSourceMetadata(BaseModel):
pass

View File

@ -0,0 +1,4 @@
from .entities import index_processor_entities
class IndexProcessorBase:
pass

View File

@ -0,0 +1,2 @@
class IndexProcessorBase:
pass

View File

@ -0,0 +1,2 @@
class IndexProcessorFactory:
pass

View File

@ -0,0 +1 @@
from .retrieval_service import RetrievalService

View File

@ -0,0 +1,4 @@
from pydantic import BaseModel
class DatasetRetrieval(BaseModel):
pass

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
from typing import ClassVar
class RetrievalMethod(BaseModel):
SEMANTIC_SEARCH: ClassVar[str] = "SEMANTIC_SEARCH"
KEYWORD_SEARCH: ClassVar[str] = "KEYWORD_SEARCH"
HYBRID_SEARCH: ClassVar[str] = "HYBRID_SEARCH"

View File

@ -0,0 +1,2 @@
class RetrievalService:
pass

View File

@ -0,0 +1,2 @@
class Tool:
pass

View File

@ -0,0 +1,2 @@
class ToolManager:
pass

View File

@ -0,0 +1,4 @@
from pydantic import BaseModel
class ToolEntity(BaseModel):
pass

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel
class ToolEntity(BaseModel):
pass
class ToolIdentity(BaseModel):
pass
class ToolInvokeMessage(BaseModel):
pass
class ToolParameter(BaseModel):
pass
class ToolProviderType(BaseModel):
pass
class ToolSelector(BaseModel):
pass

View File

@ -0,0 +1,14 @@
class ToolProviderCredentialValidationError(Exception):
pass
class ToolNotFoundError(Exception):
pass
class ToolParameterValidationError(Exception):
pass
class ToolInvokeError(Exception):
pass
class ToolEngineInvokeError(Exception):
pass

View File

@ -0,0 +1,2 @@
class ToolEngine:
pass

View File

@ -0,0 +1,2 @@
class ToolManager:
pass

View File

@ -0,0 +1,2 @@
class ToolUtils:
pass

View File

@ -0,0 +1,5 @@
class ToolMessageTransformer:
pass
class ToolFileMessageTransformer:
pass

View File

@ -0,0 +1,2 @@
class WorkflowAsTool:
pass

View File

@ -0,0 +1,2 @@
class WorkflowTool:
pass

View File

@ -0,0 +1,12 @@
from .variables import Variable, IntegerVariable, ArrayAnyVariable
from .segments import (
Segment,
ArrayFileSegment,
ArrayNumberSegment,
ArrayStringSegment,
NoneSegment,
FileSegment,
ArrayObjectSegment,
SegmentGroup
)
from .types import SegmentType, ArrayValidation

View File

@ -0,0 +1 @@
SELECTORS_LENGTH = 10

View File

@ -0,0 +1,40 @@
from pydantic import BaseModel
class Segment(BaseModel):
pass
class ArrayFileSegment(Segment):
pass
class ArrayNumberSegment(Segment):
pass
class ArrayStringSegment(Segment):
pass
class NoneSegment(Segment):
pass
class FileSegment(Segment):
pass
class ArrayObjectSegment(Segment):
pass
class ArrayBooleanSegment(Segment):
pass
class BooleanSegment(Segment):
pass
class ObjectSegment(Segment):
pass
class ArrayAnySegment(Segment):
pass
class StringSegment(Segment):
pass
class SegmentGroup(Segment):
pass

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel
from typing import ClassVar
class SegmentType(BaseModel):
STRING: ClassVar[str] = "string"
NUMBER: ClassVar[str] = "number"
OBJECT: ClassVar[str] = "object"
ARRAY_STRING: ClassVar[str] = "array[string]"
ARRAY_NUMBER: ClassVar[str] = "array[number]"
ARRAY_OBJECT: ClassVar[str] = "array[object]"
BOOLEAN: ClassVar[str] = "boolean"
ARRAY_BOOLEAN: ClassVar[str] = "array[boolean]"
SECRET: ClassVar[str] = "secret"
FILE: ClassVar[str] = "file"
ARRAY_FILE: ClassVar[str] = "array[file]"
GROUP: ClassVar[str] = "group"
class ArrayValidation(BaseModel):
pass

View File

@ -0,0 +1,16 @@
from pydantic import BaseModel
class Variable(BaseModel):
pass
class IntegerVariable(Variable):
pass
class ArrayAnyVariable(Variable):
pass
class RAGPipelineVariableInput(BaseModel):
pass
class VariableUnion(BaseModel):
pass

View File

@ -0,0 +1,132 @@
# Workflow
## Project Overview
This is the workflow graph engine module of Dify, implementing a queue-based distributed workflow execution system. The engine handles agentic AI workflows with support for parallel execution, node iteration, conditional logic, and external command control.
## Architecture
### Core Components
The graph engine follows a layered architecture with strict dependency rules:
1. **Graph Engine** (`graph_engine/`) - Orchestrates workflow execution
- **Manager** - External control interface for stop/pause/resume commands
- **Worker** - Node execution runtime
- **Command Processing** - Handles control commands (abort, pause, resume)
- **Event Management** - Event propagation and layer notifications
- **Graph Traversal** - Edge processing and skip propagation
- **Response Coordinator** - Path tracking and session management
- **Layers** - Pluggable middleware (debug logging, execution limits)
- **Command Channels** - Communication channels (InMemory, Redis)
1. **Graph** (`graph/`) - Graph structure and runtime state
- **Graph Template** - Workflow definition
- **Edge** - Node connections with conditions
- **Runtime State Protocol** - State management interface
1. **Nodes** (`nodes/`) - Node implementations
- **Base** - Abstract node classes and variable parsing
- **Specific Nodes** - LLM, Agent, Code, HTTP Request, Iteration, Loop, etc.
1. **Events** (`node_events/`) - Event system
- **Base** - Event protocols
- **Node Events** - Node lifecycle events
1. **Entities** (`entities/`) - Domain models
- **Variable Pool** - Variable storage
- **Graph Init Params** - Initialization configuration
## Key Design Patterns
### Command Channel Pattern
External workflow control via Redis or in-memory channels:
```python
# Send stop command to running workflow
channel = RedisChannel(redis_client, f"workflow:{task_id}:commands")
channel.send_command(AbortCommand(reason="User requested"))
```
### Layer System
Extensible middleware for cross-cutting concerns:
```python
engine = GraphEngine(graph)
engine.layer(DebugLoggingLayer(level="INFO"))
engine.layer(ExecutionLimitsLayer(max_nodes=100))
```
### Event-Driven Architecture
All node executions emit events for monitoring and integration:
- `NodeRunStartedEvent` - Node execution begins
- `NodeRunSucceededEvent` - Node completes successfully
- `NodeRunFailedEvent` - Node encounters error
- `GraphRunStartedEvent/GraphRunCompletedEvent` - Workflow lifecycle
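A minimal consumption sketch, assuming `GraphEngine.run()` yields these events as a stream (this document does not state the exact API, so treat the loop below as illustrative):
```python
from core.workflow.graph_events import GraphRunStartedEvent  # other event classes assumed alongside

# `engine` configured as in the Layer System example above
for event in engine.run():
    if isinstance(event, GraphRunStartedEvent):
        print("workflow run started")
    else:
        print(f"observed {type(event).__name__}")
```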
### Variable Pool
Centralized variable storage with namespace isolation:
```python
# Variables scoped by node_id
pool.add(["node1", "output"], value)
result = pool.get(["node1", "output"])
```
## Import Architecture Rules
The codebase enforces strict layering via import-linter:
1. **Workflow Layers** (top to bottom):
- graph_engine → graph_events → graph → nodes → node_events → entities
1. **Graph Engine Internal Layers**:
- orchestration → command_processing → event_management → graph_traversal → domain
1. **Domain Isolation**:
- Domain models cannot import from infrastructure layers
1. **Command Channel Independence**:
- InMemory and Redis channels must remain independent
## Common Tasks
### Adding a New Node Type
1. Create node class in `nodes/<node_type>/`
1. Inherit from `BaseNode` or appropriate base class
1. Implement `_run()` method
1. Register in `nodes/node_mapping.py`
1. Add tests in `tests/unit_tests/core/workflow/nodes/`
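A minimal sketch of the node class itself (the `BaseNode` import path and the `NodeRunResult` fields are assumptions based on the engine's tests, not a confirmed API):
```python
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base import BaseNode  # assumed module path


class EchoNode(BaseNode):
    """Toy node that copies one variable from the pool into its outputs."""

    def _run(self) -> NodeRunResult:
        # Read from the shared variable pool (namespaced by node_id, see Variable Pool below)
        value = self.graph_runtime_state.variable_pool.get(["node1", "output"])
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs={"echo": value},
        )
```
Registration in `nodes/node_mapping.py` and a unit test complete the remaining steps.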
### Implementing a Custom Layer
1. Create class inheriting from `Layer` base
1. Override lifecycle methods: `on_graph_start()`, `on_event()`, `on_graph_end()`
1. Add to engine via `engine.layer()`
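For example, a timing layer might look like this sketch; only the hook names come from the steps above, and the `Layer` import path is an assumption:
```python
import logging
import time

from core.workflow.graph_engine.layers import Layer  # assumed import path


class TimingLayer(Layer):
    """Logs how long a graph run took."""

    def on_graph_start(self) -> None:
        self._started_at = time.monotonic()

    def on_event(self, event) -> None:
        pass  # per-event hooks could record metrics here

    def on_graph_end(self) -> None:
        logging.getLogger(__name__).info(
            "graph run finished in %.2fs", time.monotonic() - self._started_at
        )
```
Attach it with `engine.layer(TimingLayer())`, exactly like the built-in layers shown earlier.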
### Debugging Workflow Execution
Enable debug logging layer:
```python
debug_layer = DebugLoggingLayer(
level="DEBUG",
include_inputs=True,
include_outputs=True
)
```

Some files were not shown because too many files have changed in this diff.