refactor(api): move workflow knowledge nodes and trigger nodes (#33445)

Author: -LAN- (committed by GitHub)
Date: 2026-03-15 15:24:59 +08:00
Parent: 1b6e695520
Commit: fb41b215c8
232 changed files with 1575 additions and 1421 deletions

View File

@@ -96,7 +96,6 @@ ignore_imports =
    dify_graph.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler
    dify_graph.nodes.tool.tool_node -> core.tools.tool_engine
    dify_graph.nodes.tool.tool_node -> core.tools.tool_manager
-   dify_graph.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities
    dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
    dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.simple_prompt_transform
    dify_graph.nodes.parameter_extractor.parameter_extractor_node -> dify_graph.model_runtime.model_providers.__base.large_language_model
@@ -116,7 +115,6 @@ ignore_imports =
    dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.utils.prompt_message_util
    dify_graph.nodes.question_classifier.entities -> core.prompt.entities.advanced_prompt_entities
    dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.utils.prompt_message_util
-   dify_graph.nodes.knowledge_index.entities -> core.rag.retrieval.retrieval_methods
    dify_graph.nodes.llm.node -> models.dataset
    dify_graph.nodes.llm.file_saver -> core.tools.signature
    dify_graph.nodes.llm.file_saver -> core.tools.tool_file_manager

View File

@@ -25,7 +25,8 @@ from controllers.console.wraps import (
)
from core.ops.ops_trace_manager import OpsTraceManager
from core.rag.retrieval.retrieval_methods import RetrievalMethod
-from dify_graph.enums import NodeType, WorkflowExecutionStatus
+from core.trigger.constants import TRIGGER_NODE_TYPES
+from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.file import helpers as file_helpers
from extensions.ext_database import db
from libs.login import current_account_with_tenant, login_required
@@ -508,11 +509,7 @@ class AppListApi(Resource):
            .scalars()
            .all()
        )
-       trigger_node_types = {
-           NodeType.TRIGGER_WEBHOOK,
-           NodeType.TRIGGER_SCHEDULE,
-           NodeType.TRIGGER_PLUGIN,
-       }
+       trigger_node_types = TRIGGER_NODE_TYPES
        for workflow in draft_workflows:
            node_id = None
            try:

View File

@@ -22,6 +22,7 @@ from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
from core.app.entities.app_invoke_entities import InvokeFrom
from core.helper.trace_id_helper import get_external_trace_id
from core.plugin.impl.exc import PluginInvokeError
+from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from core.trigger.debug.event_selectors import (
    TriggerDebugEvent,
    TriggerDebugEventPoller,
@@ -1209,7 +1210,7 @@ class DraftWorkflowTriggerNodeApi(Resource):
        node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config)
        event: TriggerDebugEvent | None = None
        # for schedule trigger, when run single node, just execute directly
-       if node_type == NodeType.TRIGGER_SCHEDULE:
+       if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
            event = TriggerDebugEvent(
                workflow_args={},
                node_id=node_id,

View File

@@ -69,7 +69,7 @@ from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
-from dify_graph.nodes import NodeType
+from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.repositories.draft_variable_repository import DraftVariableSaverFactory
from dify_graph.runtime import GraphRuntimeState
from dify_graph.system_variable import SystemVariable
@@ -357,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
    ) -> Generator[StreamResponse, None, None]:
        """Handle node succeeded events."""
        # Record files if it's an answer node or end node
-       if event.node_type in [NodeType.ANSWER, NodeType.END, NodeType.LLM]:
+       if event.node_type in [BuiltinNodeTypes.ANSWER, BuiltinNodeTypes.END, BuiltinNodeTypes.LLM]:
            self._recorded_files.extend(
                self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
            )

View File

@@ -48,12 +48,13 @@ from core.app.entities.task_entities import (
from core.plugin.impl.datasource import PluginDatasourceManager
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.tool_manager import ToolManager
+from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.trigger_manager import TriggerManager
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
from dify_graph.enums import (
-   NodeType,
+   BuiltinNodeTypes,
    SystemVariableKey,
    WorkflowExecutionStatus,
    WorkflowNodeExecutionMetadataKey,
@@ -442,7 +443,7 @@ class WorkflowResponseConverter:
        event: QueueNodeStartedEvent,
        task_id: str,
    ) -> NodeStartStreamResponse | None:
-       if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
+       if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
            return None
        run_id = self._ensure_workflow_run_id()
        snapshot = self._store_snapshot(event)
@@ -464,13 +465,13 @@ class WorkflowResponseConverter:
        )
        try:
-           if event.node_type == NodeType.TOOL:
+           if event.node_type == BuiltinNodeTypes.TOOL:
                response.data.extras["icon"] = ToolManager.get_tool_icon(
                    tenant_id=self._application_generate_entity.app_config.tenant_id,
                    provider_type=ToolProviderType(event.provider_type),
                    provider_id=event.provider_id,
                )
-           elif event.node_type == NodeType.DATASOURCE:
+           elif event.node_type == BuiltinNodeTypes.DATASOURCE:
                manager = PluginDatasourceManager()
                provider_entity = manager.fetch_datasource_provider(
                    self._application_generate_entity.app_config.tenant_id,
@@ -479,7 +480,7 @@ class WorkflowResponseConverter:
                response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url(
                    self._application_generate_entity.app_config.tenant_id
                )
-           elif event.node_type == NodeType.TRIGGER_PLUGIN:
+           elif event.node_type == TRIGGER_PLUGIN_NODE_TYPE:
                response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon(
                    self._application_generate_entity.app_config.tenant_id,
                    event.provider_id,
@@ -496,7 +497,7 @@ class WorkflowResponseConverter:
        event: QueueNodeSucceededEvent | QueueNodeFailedEvent | QueueNodeExceptionEvent,
        task_id: str,
    ) -> NodeFinishStreamResponse | None:
-       if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
+       if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
            return None
        run_id = self._ensure_workflow_run_id()
        snapshot = self._pop_snapshot(event.node_execution_id)
@@ -554,7 +555,7 @@ class WorkflowResponseConverter:
        event: QueueNodeRetryEvent,
        task_id: str,
    ) -> NodeRetryStreamResponse | None:
-       if event.node_type in {NodeType.ITERATION, NodeType.LOOP}:
+       if event.node_type in {BuiltinNodeTypes.ITERATION, BuiltinNodeTypes.LOOP}:
            return None
        run_id = self._ensure_workflow_run_id()
@@ -612,7 +613,7 @@ class WorkflowResponseConverter:
            data=IterationNodeStartStreamResponse.Data(
                id=event.node_id,
                node_id=event.node_id,
-               node_type=event.node_type.value,
+               node_type=event.node_type,
                title=event.node_title,
                created_at=int(time.time()),
                extras={},
@@ -635,7 +636,7 @@ class WorkflowResponseConverter:
            data=IterationNodeNextStreamResponse.Data(
                id=event.node_id,
                node_id=event.node_id,
-               node_type=event.node_type.value,
+               node_type=event.node_type,
                title=event.node_title,
                index=event.index,
                created_at=int(time.time()),
@@ -662,7 +663,7 @@ class WorkflowResponseConverter:
            data=IterationNodeCompletedStreamResponse.Data(
                id=event.node_id,
                node_id=event.node_id,
-               node_type=event.node_type.value,
+               node_type=event.node_type,
                title=event.node_title,
                outputs=new_outputs,
                outputs_truncated=outputs_truncated,
@@ -692,7 +693,7 @@ class WorkflowResponseConverter:
            data=LoopNodeStartStreamResponse.Data(
                id=event.node_id,
                node_id=event.node_id,
-               node_type=event.node_type.value,
+               node_type=event.node_type,
                title=event.node_title,
                created_at=int(time.time()),
                extras={},
@@ -715,7 +716,7 @@ class WorkflowResponseConverter:
            data=LoopNodeNextStreamResponse.Data(
                id=event.node_id,
                node_id=event.node_id,
-               node_type=event.node_type.value,
+               node_type=event.node_type,
                title=event.node_title,
                index=event.index,
                # The `pre_loop_output` field is not utilized by the frontend.
@@ -744,7 +745,7 @@ class WorkflowResponseConverter:
            data=LoopNodeCompletedStreamResponse.Data(
                id=event.node_id,
                node_id=event.node_id,
-               node_type=event.node_type.value,
+               node_type=event.node_type,
                title=event.node_title,
                outputs=new_outputs,
                outputs_truncated=outputs_truncated,

View File

@@ -12,7 +12,7 @@ from core.app.entities.app_invoke_entities import (
    build_dify_run_context,
)
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
-from core.workflow.node_factory import DifyNodeFactory
+from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities.graph_init_params import GraphInitParams
from dify_graph.enums import WorkflowType
@@ -274,6 +274,8 @@ class PipelineRunner(WorkflowBasedAppRunner):
            graph_init_params=graph_init_params,
            graph_runtime_state=graph_runtime_state,
        )
+       if start_node_id is None:
+           start_node_id = get_default_root_node_id(graph_config)
        graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=start_node_id)
        if not graph:

View File

@@ -32,8 +32,8 @@ from core.app.entities.queue_entities import (
    QueueWorkflowStartedEvent,
    QueueWorkflowSucceededEvent,
)
-from core.workflow.node_factory import DifyNodeFactory
-from core.workflow.node_resolution import resolve_workflow_node_class
+from core.rag.entities.citation_metadata import RetrievalSourceMetadata
+from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class
from core.workflow.workflow_entry import WorkflowEntry
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
@@ -140,6 +140,9 @@ class WorkflowBasedAppRunner:
            graph_runtime_state=graph_runtime_state,
        )
+       if root_node_id is None:
+           root_node_id = get_default_root_node_id(graph_config)
        # init graph
        graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id)
@@ -505,7 +508,9 @@ class WorkflowBasedAppRunner:
        elif isinstance(event, NodeRunRetrieverResourceEvent):
            self._publish_event(
                QueueRetrieverResourcesEvent(
-                   retriever_resources=event.retriever_resources,
+                   retriever_resources=[
+                       RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
+                   ],
                    in_iteration_id=event.in_iteration_id,
                    in_loop_id=event.in_loop_id,
                )
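
The hunk above now wraps each retriever resource in RetrievalSourceMetadata.model_validate before it is queued. A minimal sketch (not part of this commit) of what that call does; "Doc" is a hypothetical stand-in model, since RetrievalSourceMetadata's real fields live in core.rag.entities.citation_metadata:

from pydantic import BaseModel

class Doc(BaseModel):
    document_id: str
    score: float = 0.0

# model_validate turns loose dict-shaped resources into typed, validated models,
# so downstream queue consumers never see untyped dicts.
raw_resources = [{"document_id": "d1", "score": 0.42}, {"document_id": "d2"}]
typed = [Doc.model_validate(resource) for resource in raw_resources]
assert all(isinstance(resource, Doc) for resource in typed)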

View File

@@ -9,9 +9,8 @@ from core.app.entities.agent_strategy import AgentStrategyInfo
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from dify_graph.entities.pause_reason import PauseReason
from dify_graph.entities.workflow_start_reason import WorkflowStartReason
-from dify_graph.enums import WorkflowNodeExecutionMetadataKey
+from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
-from dify_graph.nodes import NodeType

class QueueEvent(StrEnum):

View File

@@ -2,7 +2,7 @@ import logging
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID
from dify_graph.conversation_variable_updater import ConversationVariableUpdater
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphEngineEvent, NodeRunSucceededEvent
from dify_graph.nodes.variable_assigner.common import helpers as common_helpers
@@ -22,7 +22,7 @@ class ConversationVariablePersistenceLayer(GraphEngineLayer):
    def on_event(self, event: GraphEngineEvent) -> None:
        if not isinstance(event, NodeRunSucceededEvent):
            return
-       if event.node_type != NodeType.VARIABLE_ASSIGNER:
+       if event.node_type != BuiltinNodeTypes.VARIABLE_ASSIGNER:
            return
        if self.graph_runtime_state is None:
            return

View File

@@ -12,7 +12,7 @@ from typing_extensions import override
from core.app.llm import deduct_llm_quota, ensure_llm_quota_available
from core.errors.error import QuotaExceededError
from core.model_manager import ModelInstance
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes
from dify_graph.graph_engine.entities.commands import AbortCommand, CommandType
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase
@@ -113,11 +113,11 @@ class LLMQuotaLayer(GraphEngineLayer):
    def _extract_model_instance(node: Node) -> ModelInstance | None:
        try:
            match node.node_type:
-               case NodeType.LLM:
+               case BuiltinNodeTypes.LLM:
                    return cast("LLMNode", node).model_instance
-               case NodeType.PARAMETER_EXTRACTOR:
+               case BuiltinNodeTypes.PARAMETER_EXTRACTOR:
                    return cast("ParameterExtractorNode", node).model_instance
-               case NodeType.QUESTION_CLASSIFIER:
+               case BuiltinNodeTypes.QUESTION_CLASSIFIER:
                    return cast("QuestionClassifierNode", node).model_instance
                case _:
                    return None

View File

@@ -16,7 +16,7 @@ from opentelemetry.trace import Span, SpanKind, Tracer, get_tracer, set_span_in_
from typing_extensions import override

from configs import dify_config
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphNodeEventBase
from dify_graph.nodes.base.node import Node
@@ -74,16 +74,13 @@ class ObservabilityLayer(GraphEngineLayer):
    def _build_parser_registry(self) -> None:
        """Initialize parser registry for node types."""
        self._parsers = {
-           NodeType.TOOL: ToolNodeOTelParser(),
-           NodeType.LLM: LLMNodeOTelParser(),
-           NodeType.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
+           BuiltinNodeTypes.TOOL: ToolNodeOTelParser(),
+           BuiltinNodeTypes.LLM: LLMNodeOTelParser(),
+           BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: RetrievalNodeOTelParser(),
        }

    def _get_parser(self, node: Node) -> NodeOTelParser:
-       node_type = getattr(node, "node_type", None)
-       if isinstance(node_type, NodeType):
-           return self._parsers.get(node_type, self._default_parser)
-       return self._default_parser
+       return self._parsers.get(node.node_type, self._default_parser)

    @override
    def on_graph_start(self) -> None:

View File

@@ -24,12 +24,12 @@ from core.datasource.utils.message_transformer import DatasourceFileMessageTrans
from core.datasource.website_crawl.website_crawl_provider import WebsiteCrawlDatasourcePluginProviderController
from core.db.session_factory import session_factory
from core.plugin.impl.datasource import PluginDatasourceManager
+from core.workflow.nodes.datasource.entities import DatasourceParameter, OnlineDriveDownloadFileParam
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import WorkflowNodeExecutionMetadataKey
from dify_graph.file import File
from dify_graph.file.enums import FileTransferMethod, FileType
from dify_graph.node_events import NodeRunResult, StreamChunkEvent, StreamCompletedEvent
-from dify_graph.repositories.datasource_manager_protocol import DatasourceParameter, OnlineDriveDownloadFileParam
from factories import file_factory
from models.model import UploadFile
from models.tools import ToolFile

View File

@@ -58,7 +58,7 @@ from core.ops.entities.trace_entity import (
)
from core.repositories import DifyCoreRepositoryFactory
from dify_graph.entities import WorkflowNodeExecution
-from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
+from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import WorkflowNodeExecutionTriggeredFrom
@@ -302,11 +302,11 @@ class AliyunDataTrace(BaseTraceInstance):
        self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata
    ):
        try:
-           if node_execution.node_type == NodeType.LLM:
+           if node_execution.node_type == BuiltinNodeTypes.LLM:
                node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata)
-           elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
+           elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
                node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata)
-           elif node_execution.node_type == NodeType.TOOL:
+           elif node_execution.node_type == BuiltinNodeTypes.TOOL:
                node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata)
            else:
                node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata)

View File

@@ -155,8 +155,8 @@ def wrap_span_metadata(metadata, **kwargs):
    return metadata

-# Mapping from NodeType string values to OpenInference span kinds.
-# NodeType values not listed here default to CHAIN.
+# Mapping from built-in node type strings to OpenInference span kinds.
+# Node types not listed here default to CHAIN.
_NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
    "llm": OpenInferenceSpanKindValues.LLM,
    "knowledge-retrieval": OpenInferenceSpanKindValues.RETRIEVER,
@@ -168,7 +168,7 @@ _NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = {
def _get_node_span_kind(node_type: str) -> OpenInferenceSpanKindValues:
    """Return the OpenInference span kind for a given workflow node type.

-   Covers every ``NodeType`` enum value. Nodes that do not have a
+   Covers every built-in node type string. Nodes that do not have a
    specialised span kind (e.g. ``start``, ``end``, ``if-else``,
    ``code``, ``loop``, ``iteration``, etc.) are mapped to ``CHAIN``.
    """

View File

@@ -28,7 +28,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
)
from core.ops.utils import filter_none_values
from core.repositories import DifyCoreRepositoryFactory
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes
from extensions.ext_database import db
from models import EndUser, WorkflowNodeExecutionTriggeredFrom
from models.enums import MessageStatus
@@ -141,7 +141,7 @@ class LangFuseDataTrace(BaseTraceInstance):
        node_name = node_execution.title
        node_type = node_execution.node_type
        status = node_execution.status
-       if node_type == NodeType.LLM:
+       if node_type == BuiltinNodeTypes.LLM:
            inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
        else:
            inputs = node_execution.inputs or {}

View File

@@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
)
from core.ops.utils import filter_none_values, generate_dotted_order
from core.repositories import DifyCoreRepositoryFactory
-from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
+from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -163,7 +163,7 @@ class LangSmithDataTrace(BaseTraceInstance):
        node_name = node_execution.title
        node_type = node_execution.node_type
        status = node_execution.status
-       if node_type == NodeType.LLM:
+       if node_type == BuiltinNodeTypes.LLM:
            inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
        else:
            inputs = node_execution.inputs or {}
@@ -197,7 +197,7 @@ class LangSmithDataTrace(BaseTraceInstance):
                    "ls_model_name": process_data.get("model_name", ""),
                }
            )
-       elif node_type == NodeType.KNOWLEDGE_RETRIEVAL:
+       elif node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
            run_type = LangSmithRunType.retriever
        else:
            run_type = LangSmithRunType.tool

View File

@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
    TraceTaskName,
    WorkflowTraceInfo,
)
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes
from extensions.ext_database import db
from models import EndUser
from models.workflow import WorkflowNodeExecutionModel
@@ -145,10 +145,10 @@ class MLflowDataTrace(BaseTraceInstance):
            "app_name": node.title,
        }

-       if node.node_type in (NodeType.LLM, NodeType.QUESTION_CLASSIFIER):
+       if node.node_type in (BuiltinNodeTypes.LLM, BuiltinNodeTypes.QUESTION_CLASSIFIER):
            inputs, llm_attributes = self._parse_llm_inputs_and_attributes(node)
            attributes.update(llm_attributes)
-       elif node.node_type == NodeType.HTTP_REQUEST:
+       elif node.node_type == BuiltinNodeTypes.HTTP_REQUEST:
            inputs = node.process_data  # contains request URL
            if not inputs:
@@ -180,9 +180,9 @@ class MLflowDataTrace(BaseTraceInstance):
        # End node span
        finished_at = node.created_at + timedelta(seconds=node.elapsed_time)
        outputs = json.loads(node.outputs) if node.outputs else {}
-       if node.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
+       if node.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
            outputs = self._parse_knowledge_retrieval_outputs(outputs)
-       elif node.node_type == NodeType.LLM:
+       elif node.node_type == BuiltinNodeTypes.LLM:
            outputs = outputs.get("text", outputs)
        node_span.end(
            outputs=outputs,
@@ -471,13 +471,13 @@ class MLflowDataTrace(BaseTraceInstance):
    def _get_node_span_type(self, node_type: str) -> str:
        """Map Dify node types to MLflow span types"""
        node_type_mapping = {
-           NodeType.LLM: SpanType.LLM,
-           NodeType.QUESTION_CLASSIFIER: SpanType.LLM,
-           NodeType.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
-           NodeType.TOOL: SpanType.TOOL,
-           NodeType.CODE: SpanType.TOOL,
-           NodeType.HTTP_REQUEST: SpanType.TOOL,
-           NodeType.AGENT: SpanType.AGENT,
+           BuiltinNodeTypes.LLM: SpanType.LLM,
+           BuiltinNodeTypes.QUESTION_CLASSIFIER: SpanType.LLM,
+           BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL: SpanType.RETRIEVER,
+           BuiltinNodeTypes.TOOL: SpanType.TOOL,
+           BuiltinNodeTypes.CODE: SpanType.TOOL,
+           BuiltinNodeTypes.HTTP_REQUEST: SpanType.TOOL,
+           BuiltinNodeTypes.AGENT: SpanType.AGENT,
        }
        return node_type_mapping.get(node_type, "CHAIN")  # type: ignore[arg-type,call-overload]

View File

@@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import (
    WorkflowTraceInfo,
)
from core.repositories import DifyCoreRepositoryFactory
-from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
+from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -187,7 +187,7 @@ class OpikDataTrace(BaseTraceInstance):
        node_name = node_execution.title
        node_type = node_execution.node_type
        status = node_execution.status
-       if node_type == NodeType.LLM:
+       if node_type == BuiltinNodeTypes.LLM:
            inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
        else:
            inputs = node_execution.inputs or {}

View File

@@ -27,7 +27,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from dify_graph.entities.workflow_node_execution import (
    WorkflowNodeExecution,
)
-from dify_graph.nodes import NodeType
+from dify_graph.nodes import BuiltinNodeTypes
from extensions.ext_database import db
from models import Account, App, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
@@ -179,7 +179,7 @@ class TencentDataTrace(BaseTraceInstance):
                if node_span:
                    self.trace_client.add_span(node_span)

-               if node_execution.node_type == NodeType.LLM:
+               if node_execution.node_type == BuiltinNodeTypes.LLM:
                    self._record_llm_metrics(node_execution)
            except Exception:
                logger.exception("[Tencent APM] Failed to process node execution: %s", node_execution.id)
@@ -192,15 +192,15 @@ class TencentDataTrace(BaseTraceInstance):
    ) -> SpanData | None:
        """Build span for different node types"""
        try:
-           if node_execution.node_type == NodeType.LLM:
+           if node_execution.node_type == BuiltinNodeTypes.LLM:
                return TencentSpanBuilder.build_workflow_llm_span(
                    trace_id, workflow_span_id, trace_info, node_execution
                )
-           elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL:
+           elif node_execution.node_type == BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL:
                return TencentSpanBuilder.build_workflow_retrieval_span(
                    trace_id, workflow_span_id, trace_info, node_execution
                )
-           elif node_execution.node_type == NodeType.TOOL:
+           elif node_execution.node_type == BuiltinNodeTypes.TOOL:
                return TencentSpanBuilder.build_workflow_tool_span(
                    trace_id, workflow_span_id, trace_info, node_execution
                )

View File

@@ -31,7 +31,7 @@ from core.ops.entities.trace_entity import (
)
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
from core.repositories import DifyCoreRepositoryFactory
-from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey
+from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey
from extensions.ext_database import db
from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
@@ -175,7 +175,7 @@ class WeaveDataTrace(BaseTraceInstance):
        node_name = node_execution.title
        node_type = node_execution.node_type
        status = node_execution.status
-       if node_type == NodeType.LLM:
+       if node_type == BuiltinNodeTypes.LLM:
            inputs = node_execution.process_data.get("prompts", {}) if node_execution.process_data else {}
        else:
            inputs = node_execution.inputs or {}

View File

@@ -1,5 +1,5 @@
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.parameter_extractor.entities import (
    ModelConfig as ParameterExtractorModelConfig,
)
@@ -52,7 +52,7 @@ class PluginNodeBackwardsInvocation(BaseBackwardsInvocation):
            instruction=instruction,  # instruct with variables are not supported
        )
        node_data_dict = node_data.model_dump()
-       node_data_dict["type"] = NodeType.PARAMETER_EXTRACTOR
+       node_data_dict["type"] = BuiltinNodeTypes.PARAMETER_EXTRACTOR
        execution = workflow_service.run_free_workflow_node(
            node_data_dict,
            tenant_id=tenant_id,

View File

@@ -9,8 +9,8 @@ from flask import current_app
from sqlalchemy import delete, func, select

from core.db.session_factory import session_factory
-from dify_graph.nodes.knowledge_index.exc import KnowledgeIndexNodeError
-from dify_graph.repositories.index_processor_protocol import Preview, PreviewItem, QaPreview
+from core.workflow.nodes.knowledge_index.exc import KnowledgeIndexNodeError
+from core.workflow.nodes.knowledge_index.protocols import Preview, PreviewItem, QaPreview
from models.dataset import Dataset, Document, DocumentSegment

from .index_processor_factory import IndexProcessorFactory

View File

@@ -56,18 +56,18 @@ from core.rag.retrieval.template_prompts import (
)
from core.tools.signature import sign_upload_file
from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
-from dify_graph.file import File, FileTransferMethod, FileType
-from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
-from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
-from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
-from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
-from dify_graph.nodes.knowledge_retrieval import exc
-from dify_graph.repositories.rag_retrieval_protocol import (
+from core.workflow.nodes.knowledge_retrieval import exc
+from core.workflow.nodes.knowledge_retrieval.retrieval import (
    KnowledgeRetrievalRequest,
    Source,
    SourceChildChunk,
    SourceMetadata,
)
+from dify_graph.file import File, FileTransferMethod, FileType
+from dify_graph.model_runtime.entities.llm_entities import LLMMode, LLMResult, LLMUsage
+from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageRole, PromptMessageTool
+from dify_graph.model_runtime.entities.model_entities import ModelFeature, ModelType
+from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.json_in_md_parser import parse_and_check_json_markdown

View File

@@ -18,7 +18,7 @@ from tenacity import before_sleep_log, retry, retry_if_exception, stop_after_att
from configs import dify_config
from dify_graph.entities import WorkflowNodeExecution
-from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
+from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.repositories.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from dify_graph.workflow_type_encoder import WorkflowRuntimeTypeConverter
@@ -146,7 +146,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
            index=db_model.index,
            predecessor_node_id=db_model.predecessor_node_id,
            node_id=db_model.node_id,
-           node_type=NodeType(db_model.node_type),
+           node_type=db_model.node_type,
            title=db_model.title,
            inputs=inputs,
            process_data=process_data,

View File

@@ -3,7 +3,7 @@ from typing import Any
from core.tools.entities.tool_entities import WorkflowToolParameterConfiguration
from core.tools.errors import WorkflowToolHumanInputNotSupportedError
-from dify_graph.enums import NodeType
+from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.base.entities import OutputVariableEntity
from dify_graph.variables.input_entities import VariableEntity
@@ -51,7 +51,7 @@ class WorkflowToolConfigurationUtils:
    def ensure_no_human_input_nodes(cls, graph: Mapping[str, Any]) -> None:
        nodes = graph.get("nodes", [])
        for node in nodes:
-           if node.get("data", {}).get("type") == NodeType.HUMAN_INPUT:
+           if node.get("data", {}).get("type") == BuiltinNodeTypes.HUMAN_INPUT:
                raise WorkflowToolHumanInputNotSupportedError()

    @classmethod

View File

@@ -0,0 +1,18 @@
+from typing import Final
+
+TRIGGER_WEBHOOK_NODE_TYPE: Final[str] = "trigger-webhook"
+TRIGGER_SCHEDULE_NODE_TYPE: Final[str] = "trigger-schedule"
+TRIGGER_PLUGIN_NODE_TYPE: Final[str] = "trigger-plugin"
+
+TRIGGER_INFO_METADATA_KEY: Final[str] = "trigger_info"
+
+TRIGGER_NODE_TYPES: Final[frozenset[str]] = frozenset(
+    {
+        TRIGGER_WEBHOOK_NODE_TYPE,
+        TRIGGER_SCHEDULE_NODE_TYPE,
+        TRIGGER_PLUGIN_NODE_TYPE,
+    }
+)
+
+def is_trigger_node_type(node_type: str) -> bool:
+    return node_type in TRIGGER_NODE_TYPES
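
This new module replaces the NodeType.TRIGGER_* enum members used by the controllers above with plain string constants. A minimal usage sketch (not part of this commit); the helper and the graph literal are illustrative, and the node-config shape follows the dicts used elsewhere in this diff:

from core.trigger.constants import TRIGGER_NODE_TYPES, is_trigger_node_type

def find_trigger_node_ids(graph: dict) -> list[str]:
    """Collect the ids of nodes whose type is one of the trigger node types."""
    node_ids: list[str] = []
    for node in graph.get("nodes", []):
        node_type = node.get("data", {}).get("type", "")
        if is_trigger_node_type(node_type):
            node_ids.append(node["id"])
    return node_ids

graph = {"nodes": [{"id": "n1", "data": {"type": "trigger-webhook"}}, {"id": "n2", "data": {"type": "llm"}}]}
assert find_trigger_node_ids(graph) == ["n1"]
assert "trigger-schedule" in TRIGGER_NODE_TYPES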

View File

@@ -11,6 +11,11 @@ from typing import Any
from pydantic import BaseModel

from core.plugin.entities.request import TriggerInvokeEventResponse
+from core.trigger.constants import (
+    TRIGGER_PLUGIN_NODE_TYPE,
+    TRIGGER_SCHEDULE_NODE_TYPE,
+    TRIGGER_WEBHOOK_NODE_TYPE,
+)
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import (
    PluginTriggerDebugEvent,
@@ -19,10 +24,9 @@ from core.trigger.debug.events import (
    build_plugin_pool_key,
    build_webhook_pool_key,
)
+from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
+from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig
from dify_graph.entities.graph_config import NodeConfigDict
-from dify_graph.enums import NodeType
-from dify_graph.nodes.trigger_plugin.entities import TriggerEventNodeData
-from dify_graph.nodes.trigger_schedule.entities import ScheduleConfig
from extensions.ext_redis import redis_client
from libs.datetime_utils import ensure_naive_utc, naive_utc_now
from libs.schedule_utils import calculate_next_run_at
@@ -206,21 +210,19 @@ def create_event_poller(
    if not node_config:
        raise ValueError("Node data not found for node %s", node_id)
    node_type = draft_workflow.get_node_type_from_node_config(node_config)
-   match node_type:
-       case NodeType.TRIGGER_PLUGIN:
-           return PluginTriggerDebugEventPoller(
-               tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
-           )
-       case NodeType.TRIGGER_WEBHOOK:
-           return WebhookTriggerDebugEventPoller(
-               tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
-           )
-       case NodeType.TRIGGER_SCHEDULE:
-           return ScheduleTriggerDebugEventPoller(
-               tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
-           )
-       case _:
-           raise ValueError("unable to create event poller for node type %s", node_type)
+   if node_type == TRIGGER_PLUGIN_NODE_TYPE:
+       return PluginTriggerDebugEventPoller(
+           tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
+       )
+   if node_type == TRIGGER_WEBHOOK_NODE_TYPE:
+       return WebhookTriggerDebugEventPoller(
+           tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
+       )
+   if node_type == TRIGGER_SCHEDULE_NODE_TYPE:
+       return ScheduleTriggerDebugEventPoller(
+           tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id
+       )
+   raise ValueError("unable to create event poller for node type %s", node_type)

def select_trigger_debug_events(

View File

@@ -1,4 +1 @@
-from .node_factory import DifyNodeFactory
-from .workflow_entry import WorkflowEntry
-
-__all__ = ["DifyNodeFactory", "WorkflowEntry"]
+"""Core workflow package."""
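
With the package __init__ trimmed, callers import the registry helpers from core.workflow.node_factory directly; those helpers are introduced in the next file of this diff. A minimal sketch (not part of this commit) of how a caller might resolve a node class; the "llm" type string and the "latest" version key are assumptions based on the node-type strings and LATEST_VERSION constant seen elsewhere in this diff:

from core.workflow.node_factory import register_nodes, resolve_workflow_node_class

register_nodes()  # idempotent: lru_cache ensures node modules are imported once
llm_node_class = resolve_workflow_node_class(node_type="llm", node_version="latest")
print(llm_node_class.__name__)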

View File

@ -1,4 +1,7 @@
from collections.abc import Callable, Mapping import importlib
import pkgutil
from collections.abc import Callable, Iterator, Mapping, MutableMapping
from functools import lru_cache
from typing import TYPE_CHECKING, Any, TypeAlias, cast, final from typing import TYPE_CHECKING, Any, TypeAlias, cast, final
from sqlalchemy import select from sqlalchemy import select
@ -8,7 +11,6 @@ from typing_extensions import override
from configs import dify_config from configs import dify_config
from core.app.entities.app_invoke_entities import DifyRunContext from core.app.entities.app_invoke_entities import DifyRunContext
from core.app.llm.model_access import build_dify_model_access from core.app.llm.model_access import build_dify_model_access
from core.datasource.datasource_manager import DatasourceManager
from core.helper.code_executor.code_executor import ( from core.helper.code_executor.code_executor import (
CodeExecutionError, CodeExecutionError,
CodeExecutor, CodeExecutor,
@ -17,12 +19,9 @@ from core.helper.ssrf_proxy import ssrf_proxy
from core.memory.token_buffer_memory import TokenBufferMemory from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance from core.model_manager import ModelInstance
from core.prompt.entities.advanced_prompt_entities import MemoryConfig from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from core.rag.index_processor.index_processor import IndexProcessor
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from core.rag.summary_index.summary_index import SummaryIndex
from core.repositories.human_input_repository import HumanInputFormRepositoryImpl from core.repositories.human_input_repository import HumanInputFormRepositoryImpl
from core.tools.tool_file_manager import ToolFileManager from core.tools.tool_file_manager import ToolFileManager
from core.workflow.node_resolution import resolve_workflow_node_class from core.trigger.constants import TRIGGER_NODE_TYPES
from core.workflow.nodes.agent.message_transformer import AgentMessageTransformer from core.workflow.nodes.agent.message_transformer import AgentMessageTransformer
from core.workflow.nodes.agent.plugin_strategy_adapter import ( from core.workflow.nodes.agent.plugin_strategy_adapter import (
PluginAgentStrategyPresentationProvider, PluginAgentStrategyPresentationProvider,
@ -32,7 +31,7 @@ from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY
from dify_graph.enums import NodeType, SystemVariableKey from dify_graph.enums import BuiltinNodeTypes, NodeType, SystemVariableKey
from dify_graph.file.file_manager import file_manager from dify_graph.file.file_manager import file_manager
from dify_graph.graph.graph import NodeFactory from dify_graph.graph.graph import NodeFactory
from dify_graph.model_runtime.entities.model_entities import ModelType from dify_graph.model_runtime.entities.model_entities import ModelType
@ -59,6 +58,135 @@ if TYPE_CHECKING:
from dify_graph.entities import GraphInitParams from dify_graph.entities import GraphInitParams
from dify_graph.runtime import GraphRuntimeState from dify_graph.runtime import GraphRuntimeState
LATEST_VERSION = "latest"
_START_NODE_TYPES: frozenset[NodeType] = frozenset(
(BuiltinNodeTypes.START, BuiltinNodeTypes.DATASOURCE, *TRIGGER_NODE_TYPES)
)
def _import_node_package(package_name: str, *, excluded_modules: frozenset[str] = frozenset()) -> None:
package = importlib.import_module(package_name)
for _, module_name, _ in pkgutil.walk_packages(package.__path__, package.__name__ + "."):
if module_name in excluded_modules:
continue
importlib.import_module(module_name)
@lru_cache(maxsize=1)
def register_nodes() -> None:
"""Import production node modules so they self-register with ``Node``."""
_import_node_package("dify_graph.nodes")
_import_node_package("core.workflow.nodes")
def get_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]:
"""Return a read-only snapshot of the current production node registry.
The workflow layer owns node bootstrap because it must compose built-in
`dify_graph.nodes.*` implementations with workflow-local nodes under
`core.workflow.nodes.*`. Keeping this import side effect here avoids
reintroducing registry bootstrapping into lower-level graph primitives.
"""
register_nodes()
return Node.get_node_type_classes_mapping()
def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
node_mapping = get_node_type_classes_mapping().get(node_type)
if not node_mapping:
raise ValueError(f"No class mapping found for node type: {node_type}")
latest_node_class = node_mapping.get(LATEST_VERSION)
matched_node_class = node_mapping.get(node_version)
node_class = matched_node_class or latest_node_class
if not node_class:
raise ValueError(f"No latest version class found for node type: {node_type}")
return node_class
def is_start_node_type(node_type: NodeType) -> bool:
"""Return True when the node type can serve as a workflow entry point."""
return node_type in _START_NODE_TYPES
def get_default_root_node_id(graph_config: Mapping[str, Any]) -> str:
"""Resolve the default entry node for a persisted top-level workflow graph.
This workflow-layer helper depends on start-node semantics defined by
`is_start_node_type`, so it intentionally lives next to the node registry
instead of in the raw `dify_graph.entities.graph_config` schema module.
"""
nodes = graph_config.get("nodes")
if not isinstance(nodes, list):
raise ValueError("nodes in workflow graph must be a list")
for node in nodes:
if not isinstance(node, Mapping):
continue
if node.get("type") == "custom-note":
continue
node_id = node.get("id")
data = node.get("data")
if not isinstance(node_id, str) or not isinstance(data, Mapping):
continue
node_type = data.get("type")
if isinstance(node_type, str) and is_start_node_type(node_type):
return node_id
raise ValueError("Unable to determine default root node ID from workflow graph")
class _LazyNodeTypeClassesMapping(MutableMapping[NodeType, Mapping[str, type[Node]]]):
    """Mutable dict-like view over the current node registry."""

    def __init__(self) -> None:
        self._cached_snapshot: dict[NodeType, Mapping[str, type[Node]]] = {}
        self._cached_version = -1
        self._deleted: set[NodeType] = set()
        self._overrides: dict[NodeType, Mapping[str, type[Node]]] = {}

    def _snapshot(self) -> dict[NodeType, Mapping[str, type[Node]]]:
        current_version = Node.get_registry_version()
        if self._cached_version != current_version:
            self._cached_snapshot = dict(get_node_type_classes_mapping())
            self._cached_version = current_version
        if not self._deleted and not self._overrides:
            return self._cached_snapshot
        snapshot = {key: value for key, value in self._cached_snapshot.items() if key not in self._deleted}
        snapshot.update(self._overrides)
        return snapshot

    def __getitem__(self, key: NodeType) -> Mapping[str, type[Node]]:
        return self._snapshot()[key]

    def __setitem__(self, key: NodeType, value: Mapping[str, type[Node]]) -> None:
        self._deleted.discard(key)
        self._overrides[key] = value

    def __delitem__(self, key: NodeType) -> None:
        if key in self._overrides:
            del self._overrides[key]
            return
        if key in self._cached_snapshot:
            self._deleted.add(key)
            return
        raise KeyError(key)

    def __iter__(self) -> Iterator[NodeType]:
        return iter(self._snapshot())

    def __len__(self) -> int:
        return len(self._snapshot())


# Keep the canonical node-class mapping in the workflow layer that also bootstraps
# legacy `core.workflow.nodes.*` registrations.
NODE_TYPE_CLASSES_MAPPING: MutableMapping[NodeType, Mapping[str, type[Node]]] = _LazyNodeTypeClassesMapping()
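# --- Example (not part of this change): a sketch of how a test could shadow one node type
# --- through this view without mutating the underlying registry. The stub class is
# --- hypothetical; "code" matches BuiltinNodeTypes.CODE.
def _swap_in_stub_code_node(stub_cls: type[Node]) -> None:
    NODE_TYPE_CLASSES_MAPPING["code"] = {LATEST_VERSION: stub_cls}  # overrides the registry entry

def _restore_code_node() -> None:
    del NODE_TYPE_CLASSES_MAPPING["code"]  # falls back to the registry snapshot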
LLMCompatibleNodeData: TypeAlias = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData LLMCompatibleNodeData: TypeAlias = LLMNodeData | QuestionClassifierNodeData | ParameterExtractorNodeData
@ -130,7 +258,6 @@ class DifyNodeFactory(NodeFactory):
self._http_request_http_client = ssrf_proxy self._http_request_http_client = ssrf_proxy
self._http_request_tool_file_manager_factory = ToolFileManager self._http_request_tool_file_manager_factory = ToolFileManager
self._http_request_file_manager = file_manager self._http_request_file_manager = file_manager
self._rag_retrieval = DatasetRetrieval()
self._document_extractor_unstructured_api_config = UnstructuredApiConfig( self._document_extractor_unstructured_api_config = UnstructuredApiConfig(
api_url=dify_config.UNSTRUCTURED_API_URL, api_url=dify_config.UNSTRUCTURED_API_URL,
api_key=dify_config.UNSTRUCTURED_API_KEY or "", api_key=dify_config.UNSTRUCTURED_API_KEY or "",
@ -177,56 +304,46 @@ class DifyNodeFactory(NodeFactory):
node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version)) node_class = self._resolve_node_class(node_type=node_data.type, node_version=str(node_data.version))
node_type = node_data.type node_type = node_data.type
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = { node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
NodeType.CODE: lambda: { BuiltinNodeTypes.CODE: lambda: {
"code_executor": self._code_executor, "code_executor": self._code_executor,
"code_limits": self._code_limits, "code_limits": self._code_limits,
}, },
NodeType.TEMPLATE_TRANSFORM: lambda: { BuiltinNodeTypes.TEMPLATE_TRANSFORM: lambda: {
"template_renderer": self._template_renderer, "template_renderer": self._template_renderer,
"max_output_length": self._template_transform_max_output_length, "max_output_length": self._template_transform_max_output_length,
}, },
NodeType.HTTP_REQUEST: lambda: { BuiltinNodeTypes.HTTP_REQUEST: lambda: {
"http_request_config": self._http_request_config, "http_request_config": self._http_request_config,
"http_client": self._http_request_http_client, "http_client": self._http_request_http_client,
"tool_file_manager_factory": self._http_request_tool_file_manager_factory, "tool_file_manager_factory": self._http_request_tool_file_manager_factory,
"file_manager": self._http_request_file_manager, "file_manager": self._http_request_file_manager,
}, },
NodeType.HUMAN_INPUT: lambda: { BuiltinNodeTypes.HUMAN_INPUT: lambda: {
"form_repository": HumanInputFormRepositoryImpl(tenant_id=self._dify_context.tenant_id), "form_repository": HumanInputFormRepositoryImpl(tenant_id=self._dify_context.tenant_id),
}, },
NodeType.KNOWLEDGE_INDEX: lambda: { BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
"index_processor": IndexProcessor(),
"summary_index_service": SummaryIndex(),
},
NodeType.LLM: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class, node_class=node_class,
node_data=node_data, node_data=node_data,
include_http_client=True, include_http_client=True,
), ),
NodeType.DATASOURCE: lambda: { BuiltinNodeTypes.DOCUMENT_EXTRACTOR: lambda: {
"datasource_manager": DatasourceManager,
},
NodeType.KNOWLEDGE_RETRIEVAL: lambda: {
"rag_retrieval": self._rag_retrieval,
},
NodeType.DOCUMENT_EXTRACTOR: lambda: {
"unstructured_api_config": self._document_extractor_unstructured_api_config, "unstructured_api_config": self._document_extractor_unstructured_api_config,
"http_client": self._http_request_http_client, "http_client": self._http_request_http_client,
}, },
NodeType.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs( BuiltinNodeTypes.QUESTION_CLASSIFIER: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class, node_class=node_class,
node_data=node_data, node_data=node_data,
include_http_client=True, include_http_client=True,
), ),
NodeType.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs( BuiltinNodeTypes.PARAMETER_EXTRACTOR: lambda: self._build_llm_compatible_node_init_kwargs(
node_class=node_class, node_class=node_class,
node_data=node_data, node_data=node_data,
include_http_client=False, include_http_client=False,
), ),
NodeType.TOOL: lambda: { BuiltinNodeTypes.TOOL: lambda: {
"tool_file_manager_factory": self._http_request_tool_file_manager_factory(), "tool_file_manager_factory": self._http_request_tool_file_manager_factory(),
}, },
NodeType.AGENT: lambda: { BuiltinNodeTypes.AGENT: lambda: {
"strategy_resolver": self._agent_strategy_resolver, "strategy_resolver": self._agent_strategy_resolver,
"presentation_provider": self._agent_strategy_presentation_provider, "presentation_provider": self._agent_strategy_presentation_provider,
"runtime_support": self._agent_runtime_support, "runtime_support": self._agent_runtime_support,

View File

@ -1,42 +0,0 @@
from __future__ import annotations

from collections.abc import Mapping
from importlib import import_module

from dify_graph.enums import NodeType
from dify_graph.nodes.base.node import Node
from dify_graph.nodes.node_mapping import LATEST_VERSION, get_node_type_classes_mapping

_WORKFLOW_NODE_MODULES = ("core.workflow.nodes.agent",)
_workflow_nodes_registered = False


def ensure_workflow_nodes_registered() -> None:
    """Import workflow-local node modules so they can register with `Node.__init_subclass__`."""
    global _workflow_nodes_registered
    if _workflow_nodes_registered:
        return
    for module_name in _WORKFLOW_NODE_MODULES:
        import_module(module_name)
    _workflow_nodes_registered = True


def get_workflow_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]:
    ensure_workflow_nodes_registered()
    return get_node_type_classes_mapping()


def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
    node_mapping = get_workflow_node_type_classes_mapping().get(node_type)
    if not node_mapping:
        raise ValueError(f"No class mapping found for node type: {node_type}")
    latest_node_class = node_mapping.get(LATEST_VERSION)
    matched_node_class = node_mapping.get(node_version)
    node_class = matched_node_class or latest_node_class
    if not node_class:
        raise ValueError(f"No latest version class found for node type: {node_type}")
    return node_class

View File

@ -0,0 +1 @@
"""Workflow node implementations that remain under the legacy core.workflow namespace."""

View File

@ -4,7 +4,7 @@ from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType, SystemVariableKey, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, SystemVariableKey, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent from dify_graph.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
@ -24,7 +24,7 @@ if TYPE_CHECKING:
class AgentNode(Node[AgentNodeData]): class AgentNode(Node[AgentNodeData]):
node_type = NodeType.AGENT node_type = BuiltinNodeTypes.AGENT
_strategy_resolver: AgentStrategyResolver _strategy_resolver: AgentStrategyResolver
_presentation_provider: AgentStrategyPresentationProvider _presentation_provider: AgentStrategyPresentationProvider

View File

@ -6,11 +6,11 @@ from pydantic import BaseModel
from core.prompt.entities.advanced_prompt_entities import MemoryConfig from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from core.tools.entities.tool_entities import ToolSelector from core.tools.entities.tool_entities import ToolSelector
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
class AgentNodeData(BaseNodeData): class AgentNodeData(BaseNodeData):
type: NodeType = NodeType.AGENT type: NodeType = BuiltinNodeTypes.AGENT
agent_strategy_provider_name: str agent_strategy_provider_name: str
agent_strategy_name: str agent_strategy_name: str
agent_strategy_label: str agent_strategy_label: str

View File

@ -8,7 +8,7 @@ from sqlalchemy.orm import Session
from core.tools.entities.tool_entities import ToolInvokeMessage from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.utils.message_transformer import ToolFileMessageTransformer from core.tools.utils.message_transformer import ToolFileMessageTransformer
from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.file import File, FileTransferMethod from dify_graph.file import File, FileTransferMethod
from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
from dify_graph.model_runtime.utils.encoders import jsonable_encoder from dify_graph.model_runtime.utils.encoders import jsonable_encoder
@ -123,7 +123,7 @@ class AgentMessageTransformer:
) )
elif message.type == ToolInvokeMessage.MessageType.JSON: elif message.type == ToolInvokeMessage.MessageType.JSON:
assert isinstance(message.message, ToolInvokeMessage.JsonMessage) assert isinstance(message.message, ToolInvokeMessage.JsonMessage)
if node_type == NodeType.AGENT: if node_type == BuiltinNodeTypes.AGENT:
if isinstance(message.message.json_object, dict): if isinstance(message.message.json_object, dict):
msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {}) msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {})
llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata)) llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata))

View File

@ -0,0 +1 @@
"""Datasource workflow node package."""

View File

@ -1,22 +1,17 @@
from collections.abc import Generator, Mapping, Sequence from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from core.datasource.datasource_manager import DatasourceManager
from core.datasource.entities.datasource_entities import DatasourceProviderType from core.datasource.entities.datasource_entities import DatasourceProviderType
from core.plugin.impl.exc import PluginDaemonClientSideError from core.plugin.impl.exc import PluginDaemonClientSideError
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, SystemVariableKey, WorkflowNodeExecutionMetadataKey
from dify_graph.node_events import NodeRunResult, StreamCompletedEvent from dify_graph.node_events import NodeRunResult, StreamCompletedEvent
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
from dify_graph.repositories.datasource_manager_protocol import (
DatasourceManagerProtocol,
DatasourceParameter,
OnlineDriveDownloadFileParam,
)
from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from .entities import DatasourceNodeData, DatasourceParameter, OnlineDriveDownloadFileParam
from .entities import DatasourceNodeData
from .exc import DatasourceNodeError from .exc import DatasourceNodeError
if TYPE_CHECKING: if TYPE_CHECKING:
@ -29,7 +24,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
Datasource Node Datasource Node
""" """
node_type = NodeType.DATASOURCE node_type = BuiltinNodeTypes.DATASOURCE
execution_type = NodeExecutionType.ROOT execution_type = NodeExecutionType.ROOT
def __init__( def __init__(
@ -38,7 +33,6 @@ class DatasourceNode(Node[DatasourceNodeData]):
config: NodeConfigDict, config: NodeConfigDict,
graph_init_params: "GraphInitParams", graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState", graph_runtime_state: "GraphRuntimeState",
datasource_manager: DatasourceManagerProtocol,
): ):
super().__init__( super().__init__(
id=id, id=id,
@ -46,7 +40,7 @@ class DatasourceNode(Node[DatasourceNodeData]):
graph_init_params=graph_init_params, graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state, graph_runtime_state=graph_runtime_state,
) )
self.datasource_manager = datasource_manager self.datasource_manager = DatasourceManager
def populate_start_event(self, event) -> None: def populate_start_event(self, event) -> None:
event.provider_id = f"{self.node_data.plugin_id}/{self.node_data.provider_name}" event.provider_id = f"{self.node_data.plugin_id}/{self.node_data.provider_name}"

View File

@ -4,7 +4,7 @@ from pydantic import BaseModel, field_validator
from pydantic_core.core_schema import ValidationInfo from pydantic_core.core_schema import ValidationInfo
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
class DatasourceEntity(BaseModel): class DatasourceEntity(BaseModel):
@ -17,7 +17,7 @@ class DatasourceEntity(BaseModel):
class DatasourceNodeData(BaseNodeData, DatasourceEntity): class DatasourceNodeData(BaseNodeData, DatasourceEntity):
type: NodeType = NodeType.DATASOURCE type: NodeType = BuiltinNodeTypes.DATASOURCE
class DatasourceInput(BaseModel): class DatasourceInput(BaseModel):
# TODO: check this type # TODO: check this type
@ -42,3 +42,14 @@ class DatasourceNodeData(BaseNodeData, DatasourceEntity):
return typ return typ
datasource_parameters: dict[str, DatasourceInput] | None = None datasource_parameters: dict[str, DatasourceInput] | None = None
class DatasourceParameter(BaseModel):
workspace_id: str
page_id: str
type: str
class OnlineDriveDownloadFileParam(BaseModel):
id: str
bucket: str

View File

@ -1,25 +1,10 @@
from collections.abc import Generator from collections.abc import Generator
from typing import Any, Protocol from typing import Any, Protocol
from pydantic import BaseModel
from dify_graph.file import File from dify_graph.file import File
from dify_graph.node_events import StreamChunkEvent, StreamCompletedEvent from dify_graph.node_events import StreamChunkEvent, StreamCompletedEvent
from .entities import DatasourceParameter, OnlineDriveDownloadFileParam
class DatasourceParameter(BaseModel):
workspace_id: str
page_id: str
type: str
class OnlineDriveDownloadFileParam(BaseModel):
id: str
bucket: str
class DatasourceFinal(BaseModel):
data: dict[str, Any] | None = None
class DatasourceManagerProtocol(Protocol): class DatasourceManagerProtocol(Protocol):

View File

@ -0,0 +1,5 @@
"""Knowledge index workflow node package."""
KNOWLEDGE_INDEX_NODE_TYPE = "knowledge-index"
__all__ = ["KNOWLEDGE_INDEX_NODE_TYPE"]

View File

@ -3,6 +3,7 @@ from typing import Literal, Union
from pydantic import BaseModel from pydantic import BaseModel
from core.rag.retrieval.retrieval_methods import RetrievalMethod from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import NodeType
@ -156,7 +157,7 @@ class KnowledgeIndexNodeData(BaseNodeData):
Knowledge index Node Data. Knowledge index Node Data.
""" """
type: NodeType = NodeType.KNOWLEDGE_INDEX type: NodeType = KNOWLEDGE_INDEX_NODE_TYPE
chunk_structure: str chunk_structure: str
index_chunk_variable_selector: list[str] index_chunk_variable_selector: list[str]
indexing_technique: str | None = None indexing_technique: str | None = None

View File

@ -2,14 +2,15 @@ import logging
from collections.abc import Mapping from collections.abc import Mapping
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from core.rag.index_processor.index_processor import IndexProcessor
from core.rag.summary_index.summary_index import SummaryIndex
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType, SystemVariableKey from dify_graph.enums import NodeExecutionType, SystemVariableKey
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.template import Template from dify_graph.nodes.base.template import Template
from dify_graph.repositories.index_processor_protocol import IndexProcessorProtocol
from dify_graph.repositories.summary_index_service_protocol import SummaryIndexServiceProtocol
from .entities import KnowledgeIndexNodeData from .entities import KnowledgeIndexNodeData
from .exc import ( from .exc import (
@ -25,7 +26,7 @@ _INVOKE_FROM_DEBUGGER = "debugger"
class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
node_type = NodeType.KNOWLEDGE_INDEX node_type = KNOWLEDGE_INDEX_NODE_TYPE
execution_type = NodeExecutionType.RESPONSE execution_type = NodeExecutionType.RESPONSE
def __init__( def __init__(
@ -34,12 +35,10 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
config: NodeConfigDict, config: NodeConfigDict,
graph_init_params: "GraphInitParams", graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState", graph_runtime_state: "GraphRuntimeState",
index_processor: IndexProcessorProtocol,
summary_index_service: SummaryIndexServiceProtocol,
) -> None: ) -> None:
super().__init__(id, config, graph_init_params, graph_runtime_state) super().__init__(id, config, graph_init_params, graph_runtime_state)
self.index_processor = index_processor self.index_processor = IndexProcessor()
self.summary_index_service = summary_index_service self.summary_index_service = SummaryIndex()
def _run(self) -> NodeRunResult: # type: ignore def _run(self) -> NodeRunResult: # type: ignore
node_data = self.node_data node_data = self.node_data

View File

@ -5,21 +5,21 @@ from pydantic import BaseModel, Field
class PreviewItem(BaseModel): class PreviewItem(BaseModel):
content: str | None = Field(None) content: str | None = Field(default=None)
child_chunks: list[str] | None = Field(None) child_chunks: list[str] | None = Field(default=None)
summary: str | None = Field(None) summary: str | None = Field(default=None)
class QaPreview(BaseModel): class QaPreview(BaseModel):
answer: str | None = Field(None) answer: str | None = Field(default=None)
question: str | None = Field(None) question: str | None = Field(default=None)
class Preview(BaseModel): class Preview(BaseModel):
chunk_structure: str chunk_structure: str
parent_mode: str | None = Field(None) parent_mode: str | None = Field(default=None)
preview: list[PreviewItem] = Field([]) preview: list[PreviewItem] = Field(default_factory=list)
qa_preview: list[QaPreview] = Field([]) qa_preview: list[QaPreview] = Field(default_factory=list)
total_segments: int total_segments: int
@ -39,3 +39,9 @@ class IndexProcessorProtocol(Protocol):
def get_preview_output( def get_preview_output(
self, chunks: Any, dataset_id: str, document_id: str, chunk_structure: str, summary_index_setting: dict | None self, chunks: Any, dataset_id: str, document_id: str, chunk_structure: str, summary_index_setting: dict | None
) -> Preview: ... ) -> Preview: ...
class SummaryIndexServiceProtocol(Protocol):
def generate_and_vectorize_summary(
self, dataset_id: str, document_id: str, is_preview: bool, summary_index_setting: dict | None = None
) -> None: ...

View File

@ -0,0 +1 @@
"""Knowledge retrieval workflow node package."""

View File

@ -4,7 +4,7 @@ from typing import Literal
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.llm.entities import ModelConfig, VisionConfig from dify_graph.nodes.llm.entities import ModelConfig, VisionConfig
@ -114,7 +114,7 @@ class KnowledgeRetrievalNodeData(BaseNodeData):
Knowledge retrieval Node Data. Knowledge retrieval Node Data.
""" """
type: NodeType = NodeType.KNOWLEDGE_RETRIEVAL type: NodeType = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
query_variable_selector: list[str] | None | str = None query_variable_selector: list[str] | None | str = None
query_attachment_selector: list[str] | None | str = None query_attachment_selector: list[str] | None | str = None
dataset_ids: list[str] dataset_ids: list[str]

View File

@ -1,12 +1,19 @@
"""Knowledge retrieval workflow node implementation.
This node now lives under ``core.workflow.nodes`` and is discovered directly by
the workflow node registry.
"""
import logging import logging
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
from core.app.app_config.entities import DatasetRetrieveConfigEntity from core.app.app_config.entities import DatasetRetrieveConfigEntity
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from dify_graph.entities import GraphInitParams from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import ( from dify_graph.enums import (
NodeType, BuiltinNodeTypes,
WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
) )
@ -15,7 +22,6 @@ from dify_graph.model_runtime.utils.encoders import jsonable_encoder
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base import LLMUsageTrackingMixin from dify_graph.nodes.base import LLMUsageTrackingMixin
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.repositories.rag_retrieval_protocol import KnowledgeRetrievalRequest, RAGRetrievalProtocol, Source
from dify_graph.variables import ( from dify_graph.variables import (
ArrayFileSegment, ArrayFileSegment,
FileSegment, FileSegment,
@ -32,6 +38,7 @@ from .exc import (
KnowledgeRetrievalNodeError, KnowledgeRetrievalNodeError,
RateLimitExceededError, RateLimitExceededError,
) )
from .retrieval import KnowledgeRetrievalRequest, Source
if TYPE_CHECKING: if TYPE_CHECKING:
from dify_graph.file.models import File from dify_graph.file.models import File
@ -41,7 +48,7 @@ logger = logging.getLogger(__name__)
class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]): class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeData]):
node_type = NodeType.KNOWLEDGE_RETRIEVAL node_type = BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL
# Instance attributes specific to LLMNode. # Instance attributes specific to LLMNode.
# Output variable for file # Output variable for file
@ -53,7 +60,6 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
config: NodeConfigDict, config: NodeConfigDict,
graph_init_params: "GraphInitParams", graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState", graph_runtime_state: "GraphRuntimeState",
rag_retrieval: RAGRetrievalProtocol,
): ):
super().__init__( super().__init__(
id=id, id=id,
@ -63,7 +69,7 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
) )
# LLM file outputs, used for MultiModal outputs. # LLM file outputs, used for MultiModal outputs.
self._file_outputs = [] self._file_outputs = []
self._rag_retrieval = rag_retrieval self._rag_retrieval = DatasetRetrieval()
@classmethod @classmethod
def version(cls): def version(cls):

View File

@ -3,9 +3,10 @@ from typing import Any, Literal, Protocol
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from dify_graph.model_runtime.entities import LLMUsage from dify_graph.model_runtime.entities import LLMUsage
from dify_graph.nodes.knowledge_retrieval.entities import MetadataFilteringCondition
from dify_graph.nodes.llm.entities import ModelConfig from dify_graph.nodes.llm.entities import ModelConfig
from .entities import MetadataFilteringCondition
class SourceChildChunk(BaseModel): class SourceChildChunk(BaseModel):
id: str = Field(default="", description="Child chunk ID") id: str = Field(default="", description="Child chunk ID")
@ -28,7 +29,7 @@ class SourceMetadata(BaseModel):
segment_id: str | None = Field(default=None, description="Segment unique identifier") segment_id: str | None = Field(default=None, description="Segment unique identifier")
retriever_from: str = Field(default="workflow", description="Retriever source context") retriever_from: str = Field(default="workflow", description="Retriever source context")
score: float = Field(default=0.0, description="Retrieval relevance score") score: float = Field(default=0.0, description="Retrieval relevance score")
child_chunks: list[SourceChildChunk] = Field(default=[], description="List of child chunks") child_chunks: list[SourceChildChunk] = Field(default_factory=list, description="List of child chunks")
segment_hit_count: int | None = Field(default=0, description="Number of times segment was retrieved") segment_hit_count: int | None = Field(default=0, description="Number of times segment was retrieved")
segment_word_count: int | None = Field(default=0, description="Word count of the segment") segment_word_count: int | None = Field(default=0, description="Word count of the segment")
segment_position: int | None = Field(default=0, description="Position of segment in document") segment_position: int | None = Field(default=0, description="Position of segment in document")
@ -81,28 +82,7 @@ class KnowledgeRetrievalRequest(BaseModel):
class RAGRetrievalProtocol(Protocol): class RAGRetrievalProtocol(Protocol):
"""Protocol for RAG-based knowledge retrieval implementations.
Implementations of this protocol handle knowledge retrieval from datasets
including rate limiting, dataset filtering, and document retrieval.
"""
@property @property
def llm_usage(self) -> LLMUsage: def llm_usage(self) -> LLMUsage: ...
"""Return accumulated LLM usage for retrieval operations."""
...
def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]: def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]: ...
"""Retrieve knowledge from datasets based on the provided request.
Args:
request: Knowledge retrieval request with search parameters
Returns:
List of sources matching the search criteria
Raises:
RateLimitExceededError: If rate limit is exceeded
ModelNotExistError: If specified model doesn't exist
"""
...
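# --- Example (not part of this change): a minimal stand-in satisfying the slimmed-down
# --- RAGRetrievalProtocol above, e.g. for unit tests. LLMUsage.empty_usage() is assumed
# --- to exist as used elsewhere in the codebase; the empty result list is illustrative.
class _FakeRAGRetrieval:
    def __init__(self) -> None:
        self._usage = LLMUsage.empty_usage()

    @property
    def llm_usage(self) -> LLMUsage:
        return self._usage

    def knowledge_retrieval(self, request: KnowledgeRetrievalRequest) -> list[Source]:
        return []  # no documents retrieved in this stub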

View File

@ -3,16 +3,18 @@ from typing import Any, Literal, Union
from pydantic import BaseModel, Field, ValidationInfo, field_validator from pydantic import BaseModel, Field, ValidationInfo, field_validator
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.entities.entities import EventParameter from core.trigger.entities.entities import EventParameter
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import NodeType
from dify_graph.nodes.trigger_plugin.exc import TriggerEventParameterError
from .exc import TriggerEventParameterError
class TriggerEventNodeData(BaseNodeData): class TriggerEventNodeData(BaseNodeData):
"""Plugin trigger node data""" """Plugin trigger node data"""
type: NodeType = NodeType.TRIGGER_PLUGIN type: NodeType = TRIGGER_PLUGIN_NODE_TYPE
class TriggerEventInput(BaseModel): class TriggerEventInput(BaseModel):
value: Union[Any, list[str]] value: Union[Any, list[str]]

View File

@ -1,8 +1,10 @@
from collections.abc import Mapping from collections.abc import Mapping
from typing import Any, cast
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType from dify_graph.enums import NodeExecutionType, WorkflowNodeExecutionMetadataKey
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
@ -10,7 +12,7 @@ from .entities import TriggerEventNodeData
class TriggerEventNode(Node[TriggerEventNodeData]): class TriggerEventNode(Node[TriggerEventNodeData]):
node_type = NodeType.TRIGGER_PLUGIN node_type = TRIGGER_PLUGIN_NODE_TYPE
execution_type = NodeExecutionType.ROOT execution_type = NodeExecutionType.ROOT
@classmethod @classmethod
@ -44,8 +46,8 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
""" """
# Get trigger data passed when workflow was triggered # Get trigger data passed when workflow was triggered
metadata = { metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: { cast(WorkflowNodeExecutionMetadataKey, TRIGGER_INFO_METADATA_KEY): {
"provider_id": self.node_data.provider_id, "provider_id": self.node_data.provider_id,
"event_name": self.node_data.event_name, "event_name": self.node_data.event_name,
"plugin_unique_identifier": self.node_data.plugin_unique_identifier, "plugin_unique_identifier": self.node_data.plugin_unique_identifier,

View File

@ -0,0 +1,3 @@
from .trigger_schedule_node import TriggerScheduleNode
__all__ = ["TriggerScheduleNode"]

View File

@ -2,6 +2,7 @@ from typing import Literal, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import NodeType
@ -11,7 +12,7 @@ class TriggerScheduleNodeData(BaseNodeData):
Trigger Schedule Node Data Trigger Schedule Node Data
""" """
type: NodeType = NodeType.TRIGGER_SCHEDULE type: NodeType = TRIGGER_SCHEDULE_NODE_TYPE
mode: str = Field(default="visual", description="Schedule mode: visual or cron") mode: str = Field(default="visual", description="Schedule mode: visual or cron")
frequency: str | None = Field(default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly") frequency: str | None = Field(default=None, description="Frequency for visual mode: hourly, daily, weekly, monthly")
cron_expression: str | None = Field(default=None, description="Cron expression for cron mode") cron_expression: str | None = Field(default=None, description="Cron expression for cron mode")

View File

@ -1,15 +1,17 @@
from collections.abc import Mapping from collections.abc import Mapping
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType from dify_graph.enums import NodeExecutionType
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.trigger_schedule.entities import TriggerScheduleNodeData
from .entities import TriggerScheduleNodeData
class TriggerScheduleNode(Node[TriggerScheduleNodeData]): class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
node_type = NodeType.TRIGGER_SCHEDULE node_type = TRIGGER_SCHEDULE_NODE_TYPE
execution_type = NodeExecutionType.ROOT execution_type = NodeExecutionType.ROOT
@classmethod @classmethod
@ -19,7 +21,7 @@ class TriggerScheduleNode(Node[TriggerScheduleNodeData]):
@classmethod @classmethod
def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]: def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]:
return { return {
"type": "trigger-schedule", "type": TRIGGER_SCHEDULE_NODE_TYPE,
"config": { "config": {
"mode": "visual", "mode": "visual",
"frequency": "daily", "frequency": "daily",

View File

@ -3,6 +3,7 @@ from enum import StrEnum
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import NodeType
from dify_graph.variables.types import SegmentType from dify_graph.variables.types import SegmentType
@ -93,7 +94,7 @@ class WebhookData(BaseNodeData):
class SyncMode(StrEnum): class SyncMode(StrEnum):
SYNC = "async" # only support SYNC = "async" # only support
type: NodeType = NodeType.TRIGGER_WEBHOOK type: NodeType = TRIGGER_WEBHOOK_NODE_TYPE
method: Method = Method.GET method: Method = Method.GET
content_type: ContentType = Field(default=ContentType.JSON) content_type: ContentType = Field(default=ContentType.JSON)
headers: Sequence[WebhookParameter] = Field(default_factory=list) headers: Sequence[WebhookParameter] = Field(default_factory=list)

View File

@ -2,9 +2,10 @@ import logging
from collections.abc import Mapping from collections.abc import Mapping
from typing import Any from typing import Any
from core.trigger.constants import TRIGGER_WEBHOOK_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType from dify_graph.enums import NodeExecutionType
from dify_graph.file import FileTransferMethod from dify_graph.file import FileTransferMethod
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
class TriggerWebhookNode(Node[WebhookData]): class TriggerWebhookNode(Node[WebhookData]):
node_type = NodeType.TRIGGER_WEBHOOK node_type = TRIGGER_WEBHOOK_NODE_TYPE
execution_type = NodeExecutionType.ROOT execution_type = NodeExecutionType.ROOT
@classmethod @classmethod

View File

@ -8,8 +8,7 @@ from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
from core.app.workflow.layers.llm_quota import LLMQuotaLayer from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory from core.workflow.node_factory import DifyNodeFactory, resolve_workflow_node_class
from core.workflow.node_resolution import resolve_workflow_node_class
from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID
from dify_graph.entities import GraphInitParams from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter from dify_graph.entities.graph_config import NodeConfigDictAdapter
@ -22,7 +21,7 @@ from dify_graph.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLay
from dify_graph.graph_engine.layers.base import GraphEngineLayer from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_engine.protocols.command_channel import CommandChannel from dify_graph.graph_engine.protocols.command_channel import CommandChannel
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
from dify_graph.nodes import NodeType from dify_graph.nodes import BuiltinNodeTypes
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable from dify_graph.system_variable import SystemVariable
@ -253,7 +252,7 @@ class WorkflowEntry:
variable_mapping=variable_mapping, variable_mapping=variable_mapping,
user_inputs=user_inputs, user_inputs=user_inputs,
) )
if node_type != NodeType.DATASOURCE: if node_type != BuiltinNodeTypes.DATASOURCE:
cls.mapping_user_inputs_to_variable_pool( cls.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping, variable_mapping=variable_mapping,
user_inputs=user_inputs, user_inputs=user_inputs,
@ -303,7 +302,7 @@ class WorkflowEntry:
"height": node_height, "height": node_height,
"type": "custom", "type": "custom",
"data": { "data": {
"type": NodeType.START, "type": BuiltinNodeTypes.START,
"title": "Start", "title": "Start",
"desc": "Start", "desc": "Start",
}, },
@ -339,8 +338,8 @@ class WorkflowEntry:
# Create a minimal graph for single node execution # Create a minimal graph for single node execution
graph_dict = cls._create_single_node_graph(node_id, node_data) graph_dict = cls._create_single_node_graph(node_id, node_data)
node_type = NodeType(node_data.get("type", "")) node_type = node_data.get("type", "")
if node_type not in {NodeType.PARAMETER_EXTRACTOR, NodeType.QUESTION_CLASSIFIER}: if node_type not in {BuiltinNodeTypes.PARAMETER_EXTRACTOR, BuiltinNodeTypes.QUESTION_CLASSIFIER}:
raise ValueError(f"Node type {node_type} not supported") raise ValueError(f"Node type {node_type} not supported")
node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1") node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1")

View File

@ -113,7 +113,7 @@ The codebase enforces strict layering via import-linter:
1. Create node class in `nodes/<node_type>/` 1. Create node class in `nodes/<node_type>/`
1. Inherit from `BaseNode` or appropriate base class 1. Inherit from `BaseNode` or appropriate base class
1. Implement `_run()` method 1. Implement `_run()` method
1. Register in `nodes/node_mapping.py` 1. Ensure the node module is importable under `nodes/<node_type>/`
1. Add tests in `tests/unit_tests/dify_graph/nodes/` 1. Add tests in `tests/unit_tests/dify_graph/nodes/`
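A minimal sketch of those steps, following the node classes shown elsewhere in this change (class-level `node_type`, `_run()` returning a `NodeRunResult`); the "echo" node type and its data fields are hypothetical:

    from dify_graph.entities.base_node_data import BaseNodeData
    from dify_graph.enums import WorkflowNodeExecutionStatus
    from dify_graph.node_events import NodeRunResult
    from dify_graph.nodes.base.node import Node

    class EchoNodeData(BaseNodeData):
        type: str = "echo"
        message: str = ""

    class EchoNode(Node[EchoNodeData]):
        node_type = "echo"  # self-registers via Node.__init_subclass__ once the module is imported

        def _run(self) -> NodeRunResult:
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.SUCCEEDED,
                outputs={"message": self.node_data.message},
            )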
### Implementing a Custom Layer ### Implementing a Custom Layer

View File

@ -121,6 +121,8 @@ class DefaultValue(BaseModel):
class BaseNodeData(ABC, BaseModel): class BaseNodeData(ABC, BaseModel):
# Raw graph payloads are first validated through `NodeConfigDictAdapter`, where # Raw graph payloads are first validated through `NodeConfigDictAdapter`, where
# `node["data"]` is typed as `BaseNodeData` before the concrete node class is known. # `node["data"]` is typed as `BaseNodeData` before the concrete node class is known.
# `type` therefore accepts downstream string node kinds; unknown node implementations
# are rejected later when the node factory resolves the node registry.
# At that boundary, node-specific fields are still "extra" relative to this shared DTO, # At that boundary, node-specific fields are still "extra" relative to this shared DTO,
# and persisted templates/workflows also carry undeclared compatibility keys such as # and persisted templates/workflows also carry undeclared compatibility keys such as
# `selected`, `params`, `paramSchemas`, and `datasource_label`. Keep extras permissive # `selected`, `params`, `paramSchemas`, and `datasource_label`. Keep extras permissive

View File

@ -48,7 +48,7 @@ class WorkflowNodeExecution(BaseModel):
index: int # Sequence number for ordering in trace visualization index: int # Sequence number for ordering in trace visualization
predecessor_node_id: str | None = None # ID of the node that executed before this one predecessor_node_id: str | None = None # ID of the node that executed before this one
node_id: str # ID of the node being executed node_id: str # ID of the node being executed
node_type: NodeType # Type of node (e.g., start, llm, knowledge) node_type: NodeType # Type of node (e.g., start, llm, downstream response node)
title: str # Display title of the node title: str # Display title of the node
# Execution data # Execution data

View File

@ -1,4 +1,5 @@
from enum import StrEnum from enum import StrEnum
from typing import ClassVar, TypeAlias
class NodeState(StrEnum): class NodeState(StrEnum):
@ -33,56 +34,71 @@ class SystemVariableKey(StrEnum):
INVOKE_FROM = "invoke_from" INVOKE_FROM = "invoke_from"
class NodeType(StrEnum): NodeType: TypeAlias = str
START = "start"
END = "end"
ANSWER = "answer"
LLM = "llm"
KNOWLEDGE_RETRIEVAL = "knowledge-retrieval"
KNOWLEDGE_INDEX = "knowledge-index"
IF_ELSE = "if-else"
CODE = "code"
TEMPLATE_TRANSFORM = "template-transform"
QUESTION_CLASSIFIER = "question-classifier"
HTTP_REQUEST = "http-request"
TOOL = "tool"
DATASOURCE = "datasource"
VARIABLE_AGGREGATOR = "variable-aggregator"
LEGACY_VARIABLE_AGGREGATOR = "variable-assigner" # TODO: Merge this into VARIABLE_AGGREGATOR in the database.
LOOP = "loop"
LOOP_START = "loop-start"
LOOP_END = "loop-end"
ITERATION = "iteration"
ITERATION_START = "iteration-start" # Fake start node for iteration.
PARAMETER_EXTRACTOR = "parameter-extractor"
VARIABLE_ASSIGNER = "assigner"
DOCUMENT_EXTRACTOR = "document-extractor"
LIST_OPERATOR = "list-operator"
AGENT = "agent"
TRIGGER_WEBHOOK = "trigger-webhook"
TRIGGER_SCHEDULE = "trigger-schedule"
TRIGGER_PLUGIN = "trigger-plugin"
HUMAN_INPUT = "human-input"
@property
def is_trigger_node(self) -> bool:
"""Check if this node type is a trigger node."""
return self in [
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
]
@property
def is_start_node(self) -> bool:
"""Check if this node type can serve as a workflow entry point."""
return self in [
NodeType.START,
NodeType.DATASOURCE,
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
]
class BuiltinNodeTypes:
"""Built-in node type string constants.
`node_type` values are plain strings throughout the graph runtime. This namespace
only exposes the built-in values shipped by `dify_graph`; downstream packages can
use additional strings without extending this class.
"""
START: ClassVar[NodeType] = "start"
END: ClassVar[NodeType] = "end"
ANSWER: ClassVar[NodeType] = "answer"
LLM: ClassVar[NodeType] = "llm"
KNOWLEDGE_RETRIEVAL: ClassVar[NodeType] = "knowledge-retrieval"
IF_ELSE: ClassVar[NodeType] = "if-else"
CODE: ClassVar[NodeType] = "code"
TEMPLATE_TRANSFORM: ClassVar[NodeType] = "template-transform"
QUESTION_CLASSIFIER: ClassVar[NodeType] = "question-classifier"
HTTP_REQUEST: ClassVar[NodeType] = "http-request"
TOOL: ClassVar[NodeType] = "tool"
DATASOURCE: ClassVar[NodeType] = "datasource"
VARIABLE_AGGREGATOR: ClassVar[NodeType] = "variable-aggregator"
LEGACY_VARIABLE_AGGREGATOR: ClassVar[NodeType] = "variable-assigner"
LOOP: ClassVar[NodeType] = "loop"
LOOP_START: ClassVar[NodeType] = "loop-start"
LOOP_END: ClassVar[NodeType] = "loop-end"
ITERATION: ClassVar[NodeType] = "iteration"
ITERATION_START: ClassVar[NodeType] = "iteration-start"
PARAMETER_EXTRACTOR: ClassVar[NodeType] = "parameter-extractor"
VARIABLE_ASSIGNER: ClassVar[NodeType] = "assigner"
DOCUMENT_EXTRACTOR: ClassVar[NodeType] = "document-extractor"
LIST_OPERATOR: ClassVar[NodeType] = "list-operator"
AGENT: ClassVar[NodeType] = "agent"
HUMAN_INPUT: ClassVar[NodeType] = "human-input"
BUILT_IN_NODE_TYPES: tuple[NodeType, ...] = (
BuiltinNodeTypes.START,
BuiltinNodeTypes.END,
BuiltinNodeTypes.ANSWER,
BuiltinNodeTypes.LLM,
BuiltinNodeTypes.KNOWLEDGE_RETRIEVAL,
BuiltinNodeTypes.IF_ELSE,
BuiltinNodeTypes.CODE,
BuiltinNodeTypes.TEMPLATE_TRANSFORM,
BuiltinNodeTypes.QUESTION_CLASSIFIER,
BuiltinNodeTypes.HTTP_REQUEST,
BuiltinNodeTypes.TOOL,
BuiltinNodeTypes.DATASOURCE,
BuiltinNodeTypes.VARIABLE_AGGREGATOR,
BuiltinNodeTypes.LEGACY_VARIABLE_AGGREGATOR,
BuiltinNodeTypes.LOOP,
BuiltinNodeTypes.LOOP_START,
BuiltinNodeTypes.LOOP_END,
BuiltinNodeTypes.ITERATION,
BuiltinNodeTypes.ITERATION_START,
BuiltinNodeTypes.PARAMETER_EXTRACTOR,
BuiltinNodeTypes.VARIABLE_ASSIGNER,
BuiltinNodeTypes.DOCUMENT_EXTRACTOR,
BuiltinNodeTypes.LIST_OPERATOR,
BuiltinNodeTypes.AGENT,
BuiltinNodeTypes.HUMAN_INPUT,
)
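# --- Example (not part of this change): with NodeType now a plain string alias, the removed
# --- is_trigger_node property becomes a membership test against the downstream constant set
# --- (core.trigger.constants.TRIGGER_NODE_TYPES, imported elsewhere in this change). Shown
# --- as a commented sketch because importing core.trigger here would invert the layering;
# --- the node_type value below is illustrative.
# node_type: NodeType = "trigger-webhook"
# node_type in TRIGGER_NODE_TYPES    # True for webhook/schedule/plugin triggers
# node_type in BUILT_IN_NODE_TYPES   # False: trigger nodes now live outside dify_graph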
class NodeExecutionType(StrEnum): class NodeExecutionType(StrEnum):
@ -236,7 +252,6 @@ class WorkflowNodeExecutionMetadataKey(StrEnum):
CURRENCY = "currency" CURRENCY = "currency"
TOOL_INFO = "tool_info" TOOL_INFO = "tool_info"
AGENT_LOG = "agent_log" AGENT_LOG = "agent_log"
TRIGGER_INFO = "trigger_info"
ITERATION_ID = "iteration_id" ITERATION_ID = "iteration_id"
ITERATION_INDEX = "iteration_index" ITERATION_INDEX = "iteration_index"
LOOP_ID = "loop_id" LOOP_ID = "loop_id"

View File

@ -83,50 +83,6 @@ class Graph:
return node_configs_map return node_configs_map
    @classmethod
    def _find_root_node_id(
        cls,
        node_configs_map: Mapping[str, NodeConfigDict],
        edge_configs: Sequence[Mapping[str, object]],
        root_node_id: str | None = None,
    ) -> str:
        """
        Find the root node ID if not specified.

        :param node_configs_map: mapping of node ID to node config
        :param edge_configs: list of edge configurations
        :param root_node_id: explicitly specified root node ID
        :return: determined root node ID
        """
        if root_node_id:
            if root_node_id not in node_configs_map:
                raise ValueError(f"Root node id {root_node_id} not found in the graph")
            return root_node_id

        # Find nodes with no incoming edges
        nodes_with_incoming: set[str] = set()
        for edge_config in edge_configs:
            target = edge_config.get("target")
            if isinstance(target, str):
                nodes_with_incoming.add(target)

        root_candidates = [nid for nid in node_configs_map if nid not in nodes_with_incoming]

        # Prefer START node if available
        start_node_id = None
        for nid in root_candidates:
            node_data = node_configs_map[nid]["data"]
            if node_data.type.is_start_node:
                start_node_id = nid
                break

        root_node_id = start_node_id or (root_candidates[0] if root_candidates else None)
        if not root_node_id:
            raise ValueError("Unable to determine root node ID")

        return root_node_id
@classmethod @classmethod
def _build_edges( def _build_edges(
cls, edge_configs: list[dict[str, object]] cls, edge_configs: list[dict[str, object]]
@ -301,15 +257,15 @@ class Graph:
*, *,
graph_config: Mapping[str, object], graph_config: Mapping[str, object],
node_factory: NodeFactory, node_factory: NodeFactory,
root_node_id: str | None = None, root_node_id: str,
skip_validation: bool = False, skip_validation: bool = False,
) -> Graph: ) -> Graph:
""" """
Initialize graph Initialize a graph with an explicit execution entry point.
:param graph_config: graph config containing nodes and edges :param graph_config: graph config containing nodes and edges
:param node_factory: factory for creating node instances from config data :param node_factory: factory for creating node instances from config data
:param root_node_id: root node id :param root_node_id: active root node id
:return: graph instance :return: graph instance
""" """
# Parse configs # Parse configs
@ -327,8 +283,8 @@ class Graph:
# Parse node configurations # Parse node configurations
node_configs_map = cls._parse_node_configs(node_configs) node_configs_map = cls._parse_node_configs(node_configs)
# Find root node if root_node_id not in node_configs_map:
root_node_id = cls._find_root_node_id(node_configs_map, edge_configs, root_node_id) raise ValueError(f"Root node id {root_node_id} not found in the graph")
# Build edges # Build edges
edges, in_edges, out_edges = cls._build_edges(edge_configs) edges, in_edges, out_edges = cls._build_edges(edge_configs)
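# --- Example (not part of this change): a caller-side sketch of the new contract. Entry-node
# --- resolution now happens in the workflow layer (get_default_root_node_id) and the result is
# --- passed explicitly; the classmethod shown above is assumed to be Graph.init, and the
# --- graph_config / node_factory variables are illustrative.
def _build_workflow_graph(graph_config, node_factory):
    root_node_id = get_default_root_node_id(graph_config)
    return Graph.init(
        graph_config=graph_config,
        node_factory=node_factory,
        root_node_id=root_node_id,
    )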

View File

@ -4,7 +4,7 @@ from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol from typing import TYPE_CHECKING, Protocol
from dify_graph.enums import NodeExecutionType, NodeType from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, NodeType
if TYPE_CHECKING: if TYPE_CHECKING:
from .graph import Graph from .graph import Graph
@ -71,7 +71,7 @@ class _RootNodeValidator:
"""Validates root node invariants.""" """Validates root node invariants."""
invalid_root_code: str = "INVALID_ROOT" invalid_root_code: str = "INVALID_ROOT"
container_entry_types: tuple[NodeType, ...] = (NodeType.ITERATION_START, NodeType.LOOP_START) container_entry_types: tuple[NodeType, ...] = (BuiltinNodeTypes.ITERATION_START, BuiltinNodeTypes.LOOP_START)
def validate(self, graph: Graph) -> Sequence[GraphValidationIssue]: def validate(self, graph: Graph) -> Sequence[GraphValidationIssue]:
root_node = graph.root_node root_node = graph.root_node
@ -86,7 +86,7 @@ class _RootNodeValidator:
) )
return issues return issues
node_type = getattr(root_node, "node_type", None) node_type = root_node.node_type
if root_node.execution_type != NodeExecutionType.ROOT and node_type not in self.container_entry_types: if root_node.execution_type != NodeExecutionType.ROOT and node_type not in self.container_entry_types:
issues.append( issues.append(
GraphValidationIssue( GraphValidationIssue(
@ -114,45 +114,9 @@ class GraphValidator:
raise GraphValidationError(issues) raise GraphValidationError(issues)
@dataclass(frozen=True, slots=True)
class _TriggerStartExclusivityValidator:
    """Ensures trigger nodes do not coexist with UserInput (start) nodes."""

    conflict_code: str = "TRIGGER_START_NODE_CONFLICT"

    def validate(self, graph: Graph) -> Sequence[GraphValidationIssue]:
        start_node_id: str | None = None
        trigger_node_ids: list[str] = []

        for node in graph.nodes.values():
            node_type = getattr(node, "node_type", None)
            if not isinstance(node_type, NodeType):
                continue
            if node_type == NodeType.START:
                start_node_id = node.id
            elif node_type.is_trigger_node:
                trigger_node_ids.append(node.id)

        if start_node_id and trigger_node_ids:
            trigger_list = ", ".join(trigger_node_ids)
            return [
                GraphValidationIssue(
                    code=self.conflict_code,
                    message=(
                        f"UserInput (start) node '{start_node_id}' cannot coexist with trigger nodes: {trigger_list}."
                    ),
                    node_id=start_node_id,
                )
            ]

        return []
_DEFAULT_RULES: tuple[GraphValidationRule, ...] = ( _DEFAULT_RULES: tuple[GraphValidationRule, ...] = (
_EdgeEndpointValidator(), _EdgeEndpointValidator(),
_RootNodeValidator(), _RootNodeValidator(),
_TriggerStartExclusivityValidator(),
) )

View File

@ -6,5 +6,6 @@ of responses based on upstream node outputs and constants.
""" """
from .coordinator import ResponseStreamCoordinator from .coordinator import ResponseStreamCoordinator
from .session import RESPONSE_SESSION_NODE_TYPES
__all__ = ["ResponseStreamCoordinator"] __all__ = ["RESPONSE_SESSION_NODE_TYPES", "ResponseStreamCoordinator"]

View File

@ -3,19 +3,34 @@ Internal response session management for response coordinator.
This module contains the private ResponseSession class used internally This module contains the private ResponseSession class used internally
by ResponseStreamCoordinator to manage streaming sessions. by ResponseStreamCoordinator to manage streaming sessions.
`RESPONSE_SESSION_NODE_TYPES` is intentionally mutable so downstream applications
can opt additional response-capable node types into session creation without
patching the coordinator.
""" """
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Protocol, cast
from dify_graph.nodes.answer.answer_node import AnswerNode from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base.template import Template from dify_graph.nodes.base.template import Template
from dify_graph.nodes.end.end_node import EndNode
from dify_graph.nodes.knowledge_index import KnowledgeIndexNode
from dify_graph.runtime.graph_runtime_state import NodeProtocol from dify_graph.runtime.graph_runtime_state import NodeProtocol
class _ResponseSessionNodeProtocol(NodeProtocol, Protocol):
"""Structural contract required from nodes that can open a response session."""
def get_streaming_template(self) -> Template: ...
RESPONSE_SESSION_NODE_TYPES: list[NodeType] = [
BuiltinNodeTypes.ANSWER,
BuiltinNodeTypes.END,
]
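# --- Example (not part of this change): how a downstream application opts an extra
# --- response-capable node type back into session creation. This registration happens in the
# --- application layer, not in this module (importing core.workflow here would invert the
# --- layering), so it is shown as a commented sketch; the exact registration site is not
# --- visible in this diff. KNOWLEDGE_INDEX_NODE_TYPE is the constant added under
# --- core.workflow.nodes.knowledge_index in this change.
# from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
#
# if KNOWLEDGE_INDEX_NODE_TYPE not in RESPONSE_SESSION_NODE_TYPES:
#     RESPONSE_SESSION_NODE_TYPES.append(KNOWLEDGE_INDEX_NODE_TYPE)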
@dataclass @dataclass
class ResponseSession: class ResponseSession:
""" """
@ -33,10 +48,9 @@ class ResponseSession:
""" """
Create a ResponseSession from a response-capable node. Create a ResponseSession from a response-capable node.
The parameter is typed as `NodeProtocol` because the graph is exposed behind a protocol at the runtime layer, The parameter is typed as `NodeProtocol` because the graph is exposed behind a protocol at the runtime layer.
but at runtime this must be an `AnswerNode`, `EndNode`, or `KnowledgeIndexNode` that provides: At runtime this must be a node whose `node_type` is listed in `RESPONSE_SESSION_NODE_TYPES`
- `id: str` and which implements `get_streaming_template()`.
- `get_streaming_template() -> Template`
Args: Args:
node: Node from the materialized workflow graph. node: Node from the materialized workflow graph.
@ -47,11 +61,22 @@ class ResponseSession:
Raises: Raises:
TypeError: If node is not a supported response node type. TypeError: If node is not a supported response node type.
""" """
if not isinstance(node, AnswerNode | EndNode | KnowledgeIndexNode): if node.node_type not in RESPONSE_SESSION_NODE_TYPES:
raise TypeError("ResponseSession.from_node only supports AnswerNode, EndNode, or KnowledgeIndexNode") supported_node_types = ", ".join(RESPONSE_SESSION_NODE_TYPES)
raise TypeError(
"ResponseSession.from_node only supports node types in "
f"RESPONSE_SESSION_NODE_TYPES: {supported_node_types}"
)
response_node = cast(_ResponseSessionNodeProtocol, node)
try:
template = response_node.get_streaming_template()
except AttributeError as exc:
raise TypeError("ResponseSession.from_node requires get_streaming_template() on response nodes") from exc
return cls( return cls(
node_id=node.id, node_id=node.id,
template=node.get_streaming_template(), template=template,
) )
def is_complete(self) -> bool: def is_complete(self) -> bool:

View File

@ -1,9 +1,9 @@
from collections.abc import Sequence from collections.abc import Mapping, Sequence
from datetime import datetime from datetime import datetime
from typing import Any
from pydantic import Field from pydantic import Field
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from dify_graph.entities.pause_reason import PauseReason from dify_graph.entities.pause_reason import PauseReason
from dify_graph.file import File from dify_graph.file import File
from dify_graph.model_runtime.entities.llm_entities import LLMUsage from dify_graph.model_runtime.entities.llm_entities import LLMUsage
@ -13,7 +13,7 @@ from .base import NodeEventBase
class RunRetrieverResourceEvent(NodeEventBase): class RunRetrieverResourceEvent(NodeEventBase):
retriever_resources: Sequence[RetrievalSourceMetadata] = Field(..., description="retriever resources") retriever_resources: Sequence[Mapping[str, Any]] = Field(..., description="retriever resources")
context: str = Field(..., description="context") context: str = Field(..., description="context")
context_files: list[File] | None = Field(default=None, description="context files") context_files: list[File] | None = Field(default=None, description="context files")

View File

@ -1,3 +1,3 @@
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes
__all__ = ["NodeType"] __all__ = ["BuiltinNodeTypes"]

View File

@ -1,7 +1,7 @@
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
from typing import Any from typing import Any
from dify_graph.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.answer.entities import AnswerNodeData from dify_graph.nodes.answer.entities import AnswerNodeData
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
@ -11,7 +11,7 @@ from dify_graph.variables import ArrayFileSegment, FileSegment, Segment
class AnswerNode(Node[AnswerNodeData]): class AnswerNode(Node[AnswerNodeData]):
node_type = NodeType.ANSWER node_type = BuiltinNodeTypes.ANSWER
execution_type = NodeExecutionType.RESPONSE execution_type = NodeExecutionType.RESPONSE
@classmethod @classmethod

View File

@ -4,7 +4,7 @@ from enum import StrEnum, auto
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
class AnswerNodeData(BaseNodeData): class AnswerNodeData(BaseNodeData):
@ -12,7 +12,7 @@ class AnswerNodeData(BaseNodeData):
Answer Node Data. Answer Node Data.
""" """
type: NodeType = NodeType.ANSWER type: NodeType = BuiltinNodeTypes.ANSWER
answer: str = Field(..., description="answer template string") answer: str = Field(..., description="answer template string")

View File

@ -1,9 +1,7 @@
from __future__ import annotations from __future__ import annotations
import importlib
import logging import logging
import operator import operator
import pkgutil
from abc import abstractmethod from abc import abstractmethod
from collections.abc import Generator, Mapping, Sequence from collections.abc import Generator, Mapping, Sequence
from functools import singledispatchmethod from functools import singledispatchmethod
@ -161,7 +159,7 @@ class Node(Generic[NodeDataT]):
Example: Example:
class CodeNode(Node[CodeNodeData]): # CodeNodeData is auto-extracted class CodeNode(Node[CodeNodeData]): # CodeNodeData is auto-extracted
node_type = NodeType.CODE node_type = BuiltinNodeTypes.CODE
# No need to implement _get_title, _get_error_strategy, etc. # No need to implement _get_title, _get_error_strategy, etc.
""" """
super().__init_subclass__(**kwargs) super().__init_subclass__(**kwargs)
@ -179,7 +177,8 @@ class Node(Generic[NodeDataT]):
# Skip base class itself # Skip base class itself
if cls is Node: if cls is Node:
return return
# Only register production node implementations defined under dify_graph.nodes.* # Only register production node implementations defined under the
# canonical workflow namespaces.
# This prevents test helper subclasses from polluting the global registry and # This prevents test helper subclasses from polluting the global registry and
# accidentally overriding real node types (e.g., a test Answer node). # accidentally overriding real node types (e.g., a test Answer node).
module_name = getattr(cls, "__module__", "") module_name = getattr(cls, "__module__", "")
@ -187,7 +186,7 @@ class Node(Generic[NodeDataT]):
node_type = cls.node_type node_type = cls.node_type
version = cls.version() version = cls.version()
bucket = Node._registry.setdefault(node_type, {}) bucket = Node._registry.setdefault(node_type, {})
if module_name.startswith("dify_graph.nodes."): if module_name.startswith(("dify_graph.nodes.", "core.workflow.nodes.")):
# Production node definitions take precedence and may override # Production node definitions take precedence and may override
bucket[version] = cls # type: ignore[index] bucket[version] = cls # type: ignore[index]
else: else:
@ -203,6 +202,7 @@ class Node(Generic[NodeDataT]):
else: else:
latest_key = max(version_keys) if version_keys else version latest_key = max(version_keys) if version_keys else version
bucket["latest"] = bucket[latest_key] bucket["latest"] = bucket[latest_key]
Node._registry_version += 1
@classmethod @classmethod
def _extract_node_data_type_from_generic(cls) -> type[BaseNodeData] | None: def _extract_node_data_type_from_generic(cls) -> type[BaseNodeData] | None:
@ -237,6 +237,11 @@ class Node(Generic[NodeDataT]):
# Global registry populated via __init_subclass__ # Global registry populated via __init_subclass__
_registry: ClassVar[dict[NodeType, dict[str, type[Node]]]] = {} _registry: ClassVar[dict[NodeType, dict[str, type[Node]]]] = {}
_registry_version: ClassVar[int] = 0
@classmethod
def get_registry_version(cls) -> int:
return cls._registry_version
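One plausible use of the registry version counter, sketched under the assumption that callers cache the class mapping and refresh it when new subclasses register; the caching helper itself is hypothetical:
# Hypothetical caller-side cache: recompute the read-only mapping only when a
# new Node subclass has registered since the previous call.
_cached_mapping = None
_cached_version = -1

def cached_node_mapping():
    global _cached_mapping, _cached_version
    current = Node.get_registry_version()
    if _cached_mapping is None or current != _cached_version:
        _cached_mapping = Node.get_node_type_classes_mapping()
        _cached_version = current
    return _cached_mapping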
def __init__( def __init__(
self, self,
@ -269,6 +274,10 @@ class Node(Generic[NodeDataT]):
"""Validate shared graph node payloads against the subclass-declared NodeData model.""" """Validate shared graph node payloads against the subclass-declared NodeData model."""
return cast(NodeDataT, cls._node_data_type.model_validate(node_data, from_attributes=True)) return cast(NodeDataT, cls._node_data_type.model_validate(node_data, from_attributes=True))
def init_node_data(self, data: BaseNodeData | Mapping[str, Any]) -> None:
"""Hydrate `_node_data` for legacy callers that bypass `__init__`."""
self._node_data = self.validate_node_data(cast(BaseNodeData, data))
def post_init(self) -> None: def post_init(self) -> None:
"""Optional hook for subclasses requiring extra initialization.""" """Optional hook for subclasses requiring extra initialization."""
return return
@ -489,29 +498,19 @@ class Node(Generic[NodeDataT]):
def version(cls) -> str: def version(cls) -> str:
"""`node_version` returns the version of current node type.""" """`node_version` returns the version of current node type."""
# NOTE(QuantumGhost): Node versions must remain unique per `NodeType` so # NOTE(QuantumGhost): Node versions must remain unique per `NodeType` so
# `Node.get_node_type_classes_mapping()` can resolve numeric versions and `latest`. # registry lookups can resolve numeric versions and `latest`.
raise NotImplementedError("subclasses of BaseNode must implement `version` method.") raise NotImplementedError("subclasses of BaseNode must implement `version` method.")
@classmethod @classmethod
def get_node_type_classes_mapping(cls) -> Mapping[NodeType, Mapping[str, type[Node]]]: def get_node_type_classes_mapping(cls) -> Mapping[NodeType, Mapping[str, type[Node]]]:
"""Return mapping of NodeType -> {version -> Node subclass} using __init_subclass__ registry. """Return a read-only view of the currently registered node classes.
Import all modules under dify_graph.nodes so subclasses register themselves on import. This accessor intentionally performs no imports. The embedding layer that
Callers that rely on workflow-local nodes defined outside `dify_graph.nodes` must import owns bootstrap (for example `core.workflow.node_factory`) must import any
those modules before invoking this method so they can register through `__init_subclass__`. extension node packages before calling it so their subclasses register via
We then return a readonly view of the registry to avoid accidental mutation. `__init_subclass__`.
""" """
# Import all node modules to ensure they are loaded (thus registered) return {node_type: MappingProxyType(version_map) for node_type, version_map in cls._registry.items()}
import dify_graph.nodes as _nodes_pkg
for _, _modname, _ in pkgutil.walk_packages(_nodes_pkg.__path__, _nodes_pkg.__name__ + "."):
# Avoid importing modules that depend on the registry to prevent circular imports.
if _modname == "dify_graph.nodes.node_mapping":
continue
importlib.import_module(_modname)
# Return a readonly view so callers can't mutate the registry by accident
return {nt: MappingProxyType(ver_map) for nt, ver_map in cls._registry.items()}
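A minimal sketch of the bootstrap responsibility this docstring assigns to the embedding layer, assuming its extension node modules live under a package that can be walked with `pkgutil`; the `core.workflow.nodes` package name is inferred from the registry filter earlier in this diff:
# Hypothetical bootstrap in the embedding layer (e.g. core.workflow.node_factory):
# import every extension node module so each subclass registers itself via
# __init_subclass__, then read the registry.
import importlib
import pkgutil

from dify_graph.enums import BuiltinNodeTypes
from dify_graph.nodes.base.node import Node

import core.workflow.nodes as ext_nodes_pkg  # assumed extension node package

for _finder, module_name, _is_pkg in pkgutil.walk_packages(ext_nodes_pkg.__path__, ext_nodes_pkg.__name__ + "."):
    importlib.import_module(module_name)

mapping = Node.get_node_type_classes_mapping()
answer_cls = mapping.get(BuiltinNodeTypes.ANSWER, {}).get("latest")  # resolve the latest Answer implementation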
@property @property
def retry(self) -> bool: def retry(self) -> bool:
@ -786,11 +785,16 @@ class Node(Generic[NodeDataT]):
@_dispatch.register @_dispatch.register
def _(self, event: RunRetrieverResourceEvent) -> NodeRunRetrieverResourceEvent: def _(self, event: RunRetrieverResourceEvent) -> NodeRunRetrieverResourceEvent:
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
retriever_resources = [
RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
]
return NodeRunRetrieverResourceEvent( return NodeRunRetrieverResourceEvent(
id=self.execution_id, id=self.execution_id,
node_id=self._node_id, node_id=self._node_id,
node_type=self.node_type, node_type=self.node_type,
retriever_resources=event.retriever_resources, retriever_resources=retriever_resources,
context=event.context, context=event.context,
node_version=self.version(), node_version=self.version(),
) )
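For illustration, a hedged sketch of the boundary conversion this dispatcher now performs: the graph-layer event carries plain mappings, and the workflow layer re-validates them into `RetrievalSourceMetadata`. The field names below mirror the metadata dict built elsewhere in this diff, but which fields the model actually requires is an assumption:
# Standalone illustration (values are made up); mirrors the model_validate call above.
from core.rag.entities.citation_metadata import RetrievalSourceMetadata

raw_resources = [
    {
        "dataset_id": "ds-1",
        "document_id": "doc-1",
        "segment_id": "seg-1",
        "score": 0.42,
        "content": "retrieved chunk text",
    }
]
validated = [RetrievalSourceMetadata.model_validate(resource) for resource in raw_resources]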
View File
@ -4,7 +4,7 @@ from textwrap import dedent
from typing import TYPE_CHECKING, Any, Protocol, cast from typing import TYPE_CHECKING, Any, Protocol, cast
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.code.entities import CodeLanguage, CodeNodeData from dify_graph.nodes.code.entities import CodeLanguage, CodeNodeData
@ -72,7 +72,7 @@ _DEFAULT_CODE_BY_LANGUAGE: Mapping[CodeLanguage, str] = {
class CodeNode(Node[CodeNodeData]): class CodeNode(Node[CodeNodeData]):
node_type = NodeType.CODE node_type = BuiltinNodeTypes.CODE
_limits: CodeNodeLimits _limits: CodeNodeLimits
def __init__( def __init__(
View File
@ -4,7 +4,7 @@ from typing import Annotated, Literal
from pydantic import AfterValidator, BaseModel from pydantic import AfterValidator, BaseModel
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base.entities import VariableSelector from dify_graph.nodes.base.entities import VariableSelector
from dify_graph.variables.types import SegmentType from dify_graph.variables.types import SegmentType
@ -40,7 +40,7 @@ class CodeNodeData(BaseNodeData):
Code Node Data. Code Node Data.
""" """
type: NodeType = NodeType.CODE type: NodeType = BuiltinNodeTypes.CODE
class Output(BaseModel): class Output(BaseModel):
type: Annotated[SegmentType, AfterValidator(_validate_type)] type: Annotated[SegmentType, AfterValidator(_validate_type)]
View File
@ -1,3 +0,0 @@
from .datasource_node import DatasourceNode
__all__ = ["DatasourceNode"]
View File
@ -2,11 +2,11 @@ from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
class DocumentExtractorNodeData(BaseNodeData): class DocumentExtractorNodeData(BaseNodeData):
type: NodeType = NodeType.DOCUMENT_EXTRACTOR type: NodeType = BuiltinNodeTypes.DOCUMENT_EXTRACTOR
variable_selector: Sequence[str] variable_selector: Sequence[str]
View File
@ -22,7 +22,7 @@ from docx.table import Table
from docx.text.paragraph import Paragraph from docx.text.paragraph import Paragraph
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from dify_graph.file import File, FileTransferMethod, file_manager from dify_graph.file import File, FileTransferMethod, file_manager
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
@ -46,7 +46,7 @@ class DocumentExtractorNode(Node[DocumentExtractorNodeData]):
Supports plain text, PDF, and DOC/DOCX files. Supports plain text, PDF, and DOC/DOCX files.
""" """
node_type = NodeType.DOCUMENT_EXTRACTOR node_type = BuiltinNodeTypes.DOCUMENT_EXTRACTOR
@classmethod @classmethod
def version(cls) -> str: def version(cls) -> str:
View File
@ -1,4 +1,4 @@
from dify_graph.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.base.template import Template from dify_graph.nodes.base.template import Template
@ -6,7 +6,7 @@ from dify_graph.nodes.end.entities import EndNodeData
class EndNode(Node[EndNodeData]): class EndNode(Node[EndNodeData]):
node_type = NodeType.END node_type = BuiltinNodeTypes.END
execution_type = NodeExecutionType.RESPONSE execution_type = NodeExecutionType.RESPONSE
@classmethod @classmethod
View File
@ -1,7 +1,7 @@
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base.entities import OutputVariableEntity from dify_graph.nodes.base.entities import OutputVariableEntity
@ -10,7 +10,7 @@ class EndNodeData(BaseNodeData):
END Node Data. END Node Data.
""" """
type: NodeType = NodeType.END type: NodeType = BuiltinNodeTypes.END
outputs: list[OutputVariableEntity] outputs: list[OutputVariableEntity]
View File
@ -9,7 +9,7 @@ import httpx
from pydantic import BaseModel, Field, ValidationInfo, field_validator from pydantic import BaseModel, Field, ValidationInfo, field_validator
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
HTTP_REQUEST_CONFIG_FILTER_KEY = "http_request_config" HTTP_REQUEST_CONFIG_FILTER_KEY = "http_request_config"
@ -90,7 +90,7 @@ class HttpRequestNodeData(BaseNodeData):
Code Node Data. Code Node Data.
""" """
type: NodeType = NodeType.HTTP_REQUEST type: NodeType = BuiltinNodeTypes.HTTP_REQUEST
method: Literal[ method: Literal[
"get", "get",
"post", "post",
View File
@ -4,7 +4,7 @@ from collections.abc import Callable, Mapping, Sequence
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from dify_graph.file import File, FileTransferMethod from dify_graph.file import File, FileTransferMethod
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base import variable_template_parser from dify_graph.nodes.base import variable_template_parser
@ -33,7 +33,7 @@ if TYPE_CHECKING:
class HttpRequestNode(Node[HttpRequestNodeData]): class HttpRequestNode(Node[HttpRequestNodeData]):
node_type = NodeType.HTTP_REQUEST node_type = BuiltinNodeTypes.HTTP_REQUEST
def __init__( def __init__(
self, self,
View File
@ -11,7 +11,7 @@ from typing import Annotated, Any, ClassVar, Literal, Self
from pydantic import BaseModel, Field, field_validator, model_validator from pydantic import BaseModel, Field, field_validator, model_validator
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser
from dify_graph.runtime import VariablePool from dify_graph.runtime import VariablePool
from dify_graph.variables.consts import SELECTORS_LENGTH from dify_graph.variables.consts import SELECTORS_LENGTH
@ -215,7 +215,7 @@ class UserAction(BaseModel):
class HumanInputNodeData(BaseNodeData): class HumanInputNodeData(BaseNodeData):
"""Human Input node data.""" """Human Input node data."""
type: NodeType = NodeType.HUMAN_INPUT type: NodeType = BuiltinNodeTypes.HUMAN_INPUT
delivery_methods: list[DeliveryChannelConfig] = Field(default_factory=list) delivery_methods: list[DeliveryChannelConfig] = Field(default_factory=list)
form_content: str = "" form_content: str = ""
inputs: list[FormInput] = Field(default_factory=list) inputs: list[FormInput] = Field(default_factory=list)
View File
@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.entities.pause_reason import HumanInputRequired from dify_graph.entities.pause_reason import HumanInputRequired
from dify_graph.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, WorkflowNodeExecutionStatus
from dify_graph.node_events import ( from dify_graph.node_events import (
HumanInputFormFilledEvent, HumanInputFormFilledEvent,
HumanInputFormTimeoutEvent, HumanInputFormTimeoutEvent,
@ -40,7 +40,7 @@ logger = logging.getLogger(__name__)
class HumanInputNode(Node[HumanInputNodeData]): class HumanInputNode(Node[HumanInputNodeData]):
node_type = NodeType.HUMAN_INPUT node_type = BuiltinNodeTypes.HUMAN_INPUT
execution_type = NodeExecutionType.BRANCH execution_type = NodeExecutionType.BRANCH
_BRANCH_SELECTION_KEYS: tuple[str, ...] = ( _BRANCH_SELECTION_KEYS: tuple[str, ...] = (
View File
@ -3,7 +3,7 @@ from typing import Literal
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.utils.condition.entities import Condition from dify_graph.utils.condition.entities import Condition
@ -12,7 +12,7 @@ class IfElseNodeData(BaseNodeData):
If Else Node Data. If Else Node Data.
""" """
type: NodeType = NodeType.IF_ELSE type: NodeType = BuiltinNodeTypes.IF_ELSE
class Case(BaseModel): class Case(BaseModel):
""" """
View File
@ -3,7 +3,7 @@ from typing import Any, Literal
from typing_extensions import deprecated from typing_extensions import deprecated
from dify_graph.enums import NodeExecutionType, NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.if_else.entities import IfElseNodeData from dify_graph.nodes.if_else.entities import IfElseNodeData
@ -13,7 +13,7 @@ from dify_graph.utils.condition.processor import ConditionProcessor
class IfElseNode(Node[IfElseNodeData]): class IfElseNode(Node[IfElseNodeData]):
node_type = NodeType.IF_ELSE node_type = BuiltinNodeTypes.IF_ELSE
execution_type = NodeExecutionType.BRANCH execution_type = NodeExecutionType.BRANCH
@classmethod @classmethod
View File
@ -4,7 +4,7 @@ from typing import Any
from pydantic import Field from pydantic import Field
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base import BaseIterationNodeData, BaseIterationState from dify_graph.nodes.base import BaseIterationNodeData, BaseIterationState
@ -19,7 +19,7 @@ class IterationNodeData(BaseIterationNodeData):
Iteration Node Data. Iteration Node Data.
""" """
type: NodeType = NodeType.ITERATION type: NodeType = BuiltinNodeTypes.ITERATION
parent_loop_id: str | None = None # redundant field, not used currently parent_loop_id: str | None = None # redundant field, not used currently
iterator_selector: list[str] # variable selector iterator_selector: list[str] # variable selector
output_selector: list[str] # output selector output_selector: list[str] # output selector
@ -34,7 +34,7 @@ class IterationStartNodeData(BaseNodeData):
Iteration Start Node Data. Iteration Start Node Data.
""" """
type: NodeType = NodeType.ITERATION_START type: NodeType = BuiltinNodeTypes.ITERATION_START
class IterationState(BaseIterationState): class IterationState(BaseIterationState):
View File
@ -9,8 +9,8 @@ from typing_extensions import TypeIs
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID
from dify_graph.entities.graph_config import NodeConfigDictAdapter from dify_graph.entities.graph_config import NodeConfigDictAdapter
from dify_graph.enums import ( from dify_graph.enums import (
BuiltinNodeTypes,
NodeExecutionType, NodeExecutionType,
NodeType,
WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
) )
@ -62,7 +62,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
Iteration Node. Iteration Node.
""" """
node_type = NodeType.ITERATION node_type = BuiltinNodeTypes.ITERATION
execution_type = NodeExecutionType.CONTAINER execution_type = NodeExecutionType.CONTAINER
@classmethod @classmethod
@ -485,12 +485,9 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
# variable selector to variable mapping # variable selector to variable mapping
try: try:
# Get node class
from dify_graph.nodes.node_mapping import get_node_type_classes_mapping
typed_sub_node_config = NodeConfigDictAdapter.validate_python(sub_node_config) typed_sub_node_config = NodeConfigDictAdapter.validate_python(sub_node_config)
node_type = typed_sub_node_config["data"].type node_type = typed_sub_node_config["data"].type
node_mapping = get_node_type_classes_mapping() node_mapping = Node.get_node_type_classes_mapping()
if node_type not in node_mapping: if node_type not in node_mapping:
continue continue
node_version = str(typed_sub_node_config["data"].version) node_version = str(typed_sub_node_config["data"].version)
@ -563,7 +560,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
raise IterationIndexNotFoundError(f"iteration {self._node_id} current index not found") raise IterationIndexNotFoundError(f"iteration {self._node_id} current index not found")
current_index = index_variable.value current_index = index_variable.value
for event in rst: for event in rst:
if isinstance(event, GraphNodeEventBase) and event.node_type == NodeType.ITERATION_START: if isinstance(event, GraphNodeEventBase) and event.node_type == BuiltinNodeTypes.ITERATION_START:
continue continue
if isinstance(event, GraphNodeEventBase): if isinstance(event, GraphNodeEventBase):
View File
@ -1,4 +1,4 @@
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.iteration.entities import IterationStartNodeData from dify_graph.nodes.iteration.entities import IterationStartNodeData
@ -9,7 +9,7 @@ class IterationStartNode(Node[IterationStartNodeData]):
Iteration Start Node. Iteration Start Node.
""" """
node_type = NodeType.ITERATION_START node_type = BuiltinNodeTypes.ITERATION_START
@classmethod @classmethod
def version(cls) -> str: def version(cls) -> str:
View File
@ -1,3 +0,0 @@
from .knowledge_index_node import KnowledgeIndexNode
__all__ = ["KnowledgeIndexNode"]
View File
@ -1,3 +0,0 @@
from .knowledge_retrieval_node import KnowledgeRetrievalNode
__all__ = ["KnowledgeRetrievalNode"]
View File
@ -4,7 +4,7 @@ from enum import StrEnum
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
class FilterOperator(StrEnum): class FilterOperator(StrEnum):
@ -63,7 +63,7 @@ class ExtractConfig(BaseModel):
class ListOperatorNodeData(BaseNodeData): class ListOperatorNodeData(BaseNodeData):
type: NodeType = NodeType.LIST_OPERATOR type: NodeType = BuiltinNodeTypes.LIST_OPERATOR
variable: Sequence[str] = Field(default_factory=list) variable: Sequence[str] = Field(default_factory=list)
filter_by: FilterBy filter_by: FilterBy
order_by: OrderByConfig order_by: OrderByConfig
View File
@ -1,7 +1,7 @@
from collections.abc import Callable, Sequence from collections.abc import Callable, Sequence
from typing import Any, TypeAlias, TypeVar from typing import Any, TypeAlias, TypeVar
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from dify_graph.file import File from dify_graph.file import File
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
@ -35,7 +35,7 @@ def _negation(filter_: Callable[[_T], bool]) -> Callable[[_T], bool]:
class ListOperatorNode(Node[ListOperatorNodeData]): class ListOperatorNode(Node[ListOperatorNodeData]):
node_type = NodeType.LIST_OPERATOR node_type = BuiltinNodeTypes.LIST_OPERATOR
@classmethod @classmethod
def version(cls) -> str: def version(cls) -> str:
View File
@ -5,7 +5,7 @@ from pydantic import BaseModel, Field, field_validator
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.model_runtime.entities import ImagePromptMessageContent, LLMMode from dify_graph.model_runtime.entities import ImagePromptMessageContent, LLMMode
from dify_graph.nodes.base.entities import VariableSelector from dify_graph.nodes.base.entities import VariableSelector
@ -60,7 +60,7 @@ class LLMNodeCompletionModelPromptTemplate(CompletionModelPromptTemplate):
class LLMNodeData(BaseNodeData): class LLMNodeData(BaseNodeData):
type: NodeType = NodeType.LLM type: NodeType = BuiltinNodeTypes.LLM
model: ModelConfig model: ModelConfig
prompt_template: Sequence[LLMNodeChatModelMessage] | LLMNodeCompletionModelPromptTemplate prompt_template: Sequence[LLMNodeChatModelMessage] | LLMNodeCompletionModelPromptTemplate
prompt_config: PromptConfig = Field(default_factory=PromptConfig) prompt_config: PromptConfig = Field(default_factory=PromptConfig)
View File
@ -17,12 +17,12 @@ from core.llm_generator.output_parser.structured_output import invoke_llm_with_s
from core.model_manager import ModelInstance from core.model_manager import ModelInstance
from core.prompt.entities.advanced_prompt_entities import CompletionModelPromptTemplate, MemoryConfig from core.prompt.entities.advanced_prompt_entities import CompletionModelPromptTemplate, MemoryConfig
from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.rag.entities.citation_metadata import RetrievalSourceMetadata
from core.tools.signature import sign_upload_file from core.tools.signature import sign_upload_file
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities import GraphInitParams from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_config import NodeConfigDict
from dify_graph.enums import ( from dify_graph.enums import (
BuiltinNodeTypes,
NodeType, NodeType,
SystemVariableKey, SystemVariableKey,
WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionMetadataKey,
@ -104,7 +104,7 @@ logger = logging.getLogger(__name__)
class LLMNode(Node[LLMNodeData]): class LLMNode(Node[LLMNodeData]):
node_type = NodeType.LLM node_type = BuiltinNodeTypes.LLM
# Compiled regex for extracting <think> blocks (with compatibility for attributes) # Compiled regex for extracting <think> blocks (with compatibility for attributes)
_THINK_PATTERN = re.compile(r"<think[^>]*>(.*?)</think>", re.IGNORECASE | re.DOTALL) _THINK_PATTERN = re.compile(r"<think[^>]*>(.*?)</think>", re.IGNORECASE | re.DOTALL)
@ -677,7 +677,7 @@ class LLMNode(Node[LLMNodeData]):
) )
elif isinstance(context_value_variable, ArraySegment): elif isinstance(context_value_variable, ArraySegment):
context_str = "" context_str = ""
original_retriever_resource: list[RetrievalSourceMetadata] = [] original_retriever_resource: list[dict[str, Any]] = []
context_files: list[File] = [] context_files: list[File] = []
for item in context_value_variable.value: for item in context_value_variable.value:
if isinstance(item, str): if isinstance(item, str):
@ -693,11 +693,14 @@ class LLMNode(Node[LLMNodeData]):
retriever_resource = self._convert_to_original_retriever_resource(item) retriever_resource = self._convert_to_original_retriever_resource(item)
if retriever_resource: if retriever_resource:
original_retriever_resource.append(retriever_resource) original_retriever_resource.append(retriever_resource)
segment_id = retriever_resource.get("segment_id")
if not segment_id:
continue
attachments_with_bindings = db.session.execute( attachments_with_bindings = db.session.execute(
select(SegmentAttachmentBinding, UploadFile) select(SegmentAttachmentBinding, UploadFile)
.join(UploadFile, UploadFile.id == SegmentAttachmentBinding.attachment_id) .join(UploadFile, UploadFile.id == SegmentAttachmentBinding.attachment_id)
.where( .where(
SegmentAttachmentBinding.segment_id == retriever_resource.segment_id, SegmentAttachmentBinding.segment_id == segment_id,
) )
).all() ).all()
if attachments_with_bindings: if attachments_with_bindings:
@ -723,7 +726,7 @@ class LLMNode(Node[LLMNodeData]):
context_files=context_files, context_files=context_files,
) )
def _convert_to_original_retriever_resource(self, context_dict: dict) -> RetrievalSourceMetadata | None: def _convert_to_original_retriever_resource(self, context_dict: dict) -> dict[str, Any] | None:
if ( if (
"metadata" in context_dict "metadata" in context_dict
and "_source" in context_dict["metadata"] and "_source" in context_dict["metadata"]
@ -731,28 +734,26 @@ class LLMNode(Node[LLMNodeData]):
): ):
metadata = context_dict.get("metadata", {}) metadata = context_dict.get("metadata", {})
source = RetrievalSourceMetadata( return {
position=metadata.get("position"), "position": metadata.get("position"),
dataset_id=metadata.get("dataset_id"), "dataset_id": metadata.get("dataset_id"),
dataset_name=metadata.get("dataset_name"), "dataset_name": metadata.get("dataset_name"),
document_id=metadata.get("document_id"), "document_id": metadata.get("document_id"),
document_name=metadata.get("document_name"), "document_name": metadata.get("document_name"),
data_source_type=metadata.get("data_source_type"), "data_source_type": metadata.get("data_source_type"),
segment_id=metadata.get("segment_id"), "segment_id": metadata.get("segment_id"),
retriever_from=metadata.get("retriever_from"), "retriever_from": metadata.get("retriever_from"),
score=metadata.get("score"), "score": metadata.get("score"),
hit_count=metadata.get("segment_hit_count"), "hit_count": metadata.get("segment_hit_count"),
word_count=metadata.get("segment_word_count"), "word_count": metadata.get("segment_word_count"),
segment_position=metadata.get("segment_position"), "segment_position": metadata.get("segment_position"),
index_node_hash=metadata.get("segment_index_node_hash"), "index_node_hash": metadata.get("segment_index_node_hash"),
content=context_dict.get("content"), "content": context_dict.get("content"),
page=metadata.get("page"), "page": metadata.get("page"),
doc_metadata=metadata.get("doc_metadata"), "doc_metadata": metadata.get("doc_metadata"),
files=context_dict.get("files"), "files": context_dict.get("files"),
summary=context_dict.get("summary"), "summary": context_dict.get("summary"),
) }
return source
return None return None
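To make the expected input shape concrete, a hedged example of a context item that would pass the `metadata` guard above and be flattened into the plain dict; the `"_source": "knowledge"` value and the `llm_node` instance are assumptions, since the guard's final condition is truncated in this hunk:
# Illustrative context item (values are made up).
context_item = {
    "content": "retrieved chunk text",
    "metadata": {
        "_source": "knowledge",   # assumed value checked by the truncated condition
        "position": 1,
        "dataset_id": "ds-1",
        "document_id": "doc-1",
        "segment_id": "seg-1",
        "score": 0.42,
    },
}
resource = llm_node._convert_to_original_retriever_resource(context_item)  # returns a plain dict or None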
View File
@ -4,7 +4,7 @@ from typing import Annotated, Any, Literal
from pydantic import AfterValidator, BaseModel, Field, field_validator from pydantic import AfterValidator, BaseModel, Field, field_validator
from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.base_node_data import BaseNodeData
from dify_graph.enums import NodeType from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base import BaseLoopNodeData, BaseLoopState from dify_graph.nodes.base import BaseLoopNodeData, BaseLoopState
from dify_graph.utils.condition.entities import Condition from dify_graph.utils.condition.entities import Condition
from dify_graph.variables.types import SegmentType from dify_graph.variables.types import SegmentType
@ -41,7 +41,7 @@ class LoopVariableData(BaseModel):
class LoopNodeData(BaseLoopNodeData): class LoopNodeData(BaseLoopNodeData):
type: NodeType = NodeType.LOOP type: NodeType = BuiltinNodeTypes.LOOP
loop_count: int # Maximum number of loops loop_count: int # Maximum number of loops
break_conditions: list[Condition] # Conditions to break the loop break_conditions: list[Condition] # Conditions to break the loop
logical_operator: Literal["and", "or"] logical_operator: Literal["and", "or"]
@ -61,7 +61,7 @@ class LoopStartNodeData(BaseNodeData):
Loop Start Node Data. Loop Start Node Data.
""" """
type: NodeType = NodeType.LOOP_START type: NodeType = BuiltinNodeTypes.LOOP_START
class LoopEndNodeData(BaseNodeData): class LoopEndNodeData(BaseNodeData):
@ -69,7 +69,7 @@ class LoopEndNodeData(BaseNodeData):
Loop End Node Data. Loop End Node Data.
""" """
type: NodeType = NodeType.LOOP_END type: NodeType = BuiltinNodeTypes.LOOP_END
class LoopState(BaseLoopState): class LoopState(BaseLoopState):
View File
@ -1,4 +1,4 @@
from dify_graph.enums import NodeType, WorkflowNodeExecutionStatus from dify_graph.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
from dify_graph.nodes.loop.entities import LoopEndNodeData from dify_graph.nodes.loop.entities import LoopEndNodeData
@ -9,7 +9,7 @@ class LoopEndNode(Node[LoopEndNodeData]):
Loop End Node. Loop End Node.
""" """
node_type = NodeType.LOOP_END node_type = BuiltinNodeTypes.LOOP_END
@classmethod @classmethod
def version(cls) -> str: def version(cls) -> str:
Some files were not shown because too many files have changed in this diff.