Merge branch 'main' into feat/r2

# Conflicts:
#	docker/docker-compose.middleware.yaml
#	web/app/components/workflow-app/components/workflow-main.tsx
#	web/app/components/workflow-app/hooks/index.ts
#	web/app/components/workflow/hooks-store/store.ts
#	web/app/components/workflow/hooks/index.ts
#	web/app/components/workflow/nodes/_base/components/variable/var-reference-picker.tsx
jyong committed 2025-07-02 18:20:05 +08:00
201 changed files with 7572 additions and 5289 deletions


@ -27,6 +27,9 @@ from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaverFactory,
)
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
@ -36,8 +39,10 @@ from libs.flask_utils import preserve_flask_contexts
from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
from services.workflow_draft_variable_service import (
DraftVarLoader,
WorkflowDraftVariableService,
)
logger = logging.getLogger(__name__)
@ -451,6 +456,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
stream=stream,
draft_var_saver_factory=self._get_draft_var_saver_factory(invoke_from),
)
return AdvancedChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
@ -480,8 +486,6 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# get conversation and message
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError("Message not exists")
# chatbot app
runner = AdvancedChatAppRunner(
@ -524,6 +528,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
"""
@ -550,6 +555,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
stream=stream,
draft_var_saver_factory=draft_var_saver_factory,
)
try:


@ -64,6 +64,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecutionStatus, W
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes import NodeType
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
@ -94,6 +95,7 @@ class AdvancedChatAppGenerateTaskPipeline:
dialogue_count: int,
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
) -> None:
self._base_task_pipeline = BasedGenerateTaskPipeline(
application_generate_entity=application_generate_entity,
@ -153,6 +155,7 @@ class AdvancedChatAppGenerateTaskPipeline:
self._conversation_name_generate_thread: Thread | None = None
self._recorded_files: list[Mapping[str, Any]] = []
self._workflow_run_id: str = ""
self._draft_var_saver_factory = draft_var_saver_factory
def process(self) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
"""
@ -371,6 +374,7 @@ class AdvancedChatAppGenerateTaskPipeline:
workflow_node_execution=workflow_node_execution,
)
session.commit()
self._save_output_for_event(event, workflow_node_execution.id)
if node_finish_resp:
yield node_finish_resp
@ -390,6 +394,8 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
if isinstance(event, QueueNodeExceptionEvent):
self._save_output_for_event(event, workflow_node_execution.id)
if node_finish_resp:
yield node_finish_resp
@ -759,3 +765,15 @@ class AdvancedChatAppGenerateTaskPipeline:
if not message:
raise ValueError(f"Message not found: {self._message_id}")
return message
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)


@ -26,7 +26,6 @@ from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, EndUser
from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError
logger = logging.getLogger(__name__)
@ -238,8 +237,6 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
# get conversation and message
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError("Message not exists")
# chatbot app
runner = AgentChatAppRunner()


@ -1,10 +1,20 @@
import json
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union, final
from sqlalchemy.orm import Session
from core.app.app_config.entities import VariableEntityType
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File, FileUploadConfig
from core.workflow.nodes.enums import NodeType
from core.workflow.repositories.draft_variable_repository import (
DraftVariableSaver,
DraftVariableSaverFactory,
NoopDraftVariableSaver,
)
from factories import file_factory
from services.workflow_draft_variable_service import DraftVariableSaver as DraftVariableSaverImpl
if TYPE_CHECKING:
from core.app.app_config.entities import VariableEntity
@ -159,3 +169,38 @@ class BaseAppGenerator:
yield f"event: {message}\n\n"
return gen()
@final
@staticmethod
def _get_draft_var_saver_factory(invoke_from: InvokeFrom) -> DraftVariableSaverFactory:
if invoke_from == InvokeFrom.DEBUGGER:
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
return DraftVariableSaverImpl(
session=session,
app_id=app_id,
node_id=node_id,
node_type=node_type,
node_execution_id=node_execution_id,
enclosing_node_id=enclosing_node_id,
)
else:
def draft_var_saver_factory(
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> DraftVariableSaver:
return NoopDraftVariableSaver()
return draft_var_saver_factory
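For reference, the factory built above is consumed by the generate task pipelines added in this commit (see _save_output_for_event). The lines below are a minimal illustrative sketch of that call pattern, not part of the commit; the identifiers are placeholders and the import path for BaseAppGenerator is assumed.

from sqlalchemy.orm import Session

from core.app.apps.base_app_generator import BaseAppGenerator  # import path assumed
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.nodes.enums import NodeType
from extensions.ext_database import db

# DEBUGGER invocations get the persisting saver; any other source gets the no-op saver.
factory = BaseAppGenerator._get_draft_var_saver_factory(InvokeFrom.DEBUGGER)

with Session(db.engine) as session, session.begin():
    saver = factory(
        session=session,
        app_id="app-id",                   # placeholder identifiers
        node_id="llm-node",
        node_type=NodeType.LLM,
        node_execution_id="node-exec-id",
        enclosing_node_id=None,
    )
    # process_data and outputs are the mappings produced by the node run
    saver.save({"model_name": "gpt-4o"}, {"text": "hello"})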


@ -25,7 +25,6 @@ from factories import file_factory
from models.account import Account
from models.model import App, EndUser
from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError
logger = logging.getLogger(__name__)
@ -224,8 +223,6 @@ class ChatAppGenerator(MessageBasedAppGenerator):
# get conversation and message
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError("Message not exists")
# chatbot app
runner = ChatAppRunner()


@ -45,6 +45,7 @@ from core.app.entities.task_entities import (
from core.file import FILE_MODEL_IDENTITY, File
from core.plugin.impl.datasource import PluginDatasourceManager
from core.tools.tool_manager import ToolManager
from core.variables.segments import ArrayFileSegment, FileSegment, Segment
from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus
from core.workflow.nodes import NodeType
@ -516,7 +517,8 @@ class WorkflowResponseConverter:
# Convert to tuple to match Sequence type
return tuple(flattened_files)
def _fetch_files_from_variable_value(self, value: Union[dict, list]) -> Sequence[Mapping[str, Any]]:
@classmethod
def _fetch_files_from_variable_value(cls, value: Union[dict, list, Segment]) -> Sequence[Mapping[str, Any]]:
"""
Fetch files from variable value
:param value: variable value
@ -525,20 +527,30 @@ class WorkflowResponseConverter:
if not value:
return []
files = []
if isinstance(value, list):
files: list[Mapping[str, Any]] = []
if isinstance(value, FileSegment):
files.append(value.value.to_dict())
elif isinstance(value, ArrayFileSegment):
files.extend([i.to_dict() for i in value.value])
elif isinstance(value, File):
files.append(value.to_dict())
elif isinstance(value, list):
for item in value:
file = self._get_file_var_from_value(item)
file = cls._get_file_var_from_value(item)
if file:
files.append(file)
elif isinstance(value, dict):
file = self._get_file_var_from_value(value)
elif isinstance(
value,
dict,
):
file = cls._get_file_var_from_value(value)
if file:
files.append(file)
return files
def _get_file_var_from_value(self, value: Union[dict, list]) -> Mapping[str, Any] | None:
@classmethod
def _get_file_var_from_value(cls, value: Union[dict, list]) -> Mapping[str, Any] | None:
"""
Get file var from value
:param value: variable value


@ -201,8 +201,6 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
try:
# get message
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError()
# chatbot app
runner = CompletionAppRunner()


@ -29,6 +29,7 @@ from models.enums import CreatorUserRole
from models.model import App, AppMode, AppModelConfig, Conversation, EndUser, Message, MessageFile
from services.errors.app_model_config import AppModelConfigBrokenError
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError
logger = logging.getLogger(__name__)
@ -251,7 +252,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
return introduction or ""
def _get_conversation(self, conversation_id: str):
def _get_conversation(self, conversation_id: str) -> Conversation:
"""
Get conversation by conversation id
:param conversation_id: conversation id
@ -260,11 +261,11 @@ class MessageBasedAppGenerator(BaseAppGenerator):
conversation = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
if not conversation:
raise ConversationNotExistsError()
raise ConversationNotExistsError("Conversation not exists")
return conversation
def _get_message(self, message_id: str) -> Optional[Message]:
def _get_message(self, message_id: str) -> Message:
"""
Get message by message id
:param message_id: message id
@ -272,4 +273,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
"""
message = db.session.query(Message).filter(Message.id == message_id).first()
if message is None:
raise MessageNotExistsError("Message not exists")
return message


@ -25,6 +25,7 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
@ -219,6 +220,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
# new thread with request context and contextvars
context = contextvars.copy_context()
# release database connection, because the following new thread operations may take a long time
db.session.close()
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
@ -233,6 +237,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
worker_thread.start()
draft_var_saver_factory = self._get_draft_var_saver_factory(
invoke_from,
)
# return response or stream generator
response = self._handle_response(
application_generate_entity=application_generate_entity,
@ -241,6 +249,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user=user,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
draft_var_saver_factory=draft_var_saver_factory,
stream=streaming,
)
@ -471,6 +480,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user: Union[Account, EndUser],
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
stream: bool = False,
) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
"""
@ -491,6 +501,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user=user,
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
draft_var_saver_factory=draft_var_saver_factory,
stream=stream,
)


@ -56,6 +56,7 @@ from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.enums import SystemVariableKey
from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
@ -87,6 +88,7 @@ class WorkflowAppGenerateTaskPipeline:
stream: bool,
workflow_execution_repository: WorkflowExecutionRepository,
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
draft_var_saver_factory: DraftVariableSaverFactory,
) -> None:
self._base_task_pipeline = BasedGenerateTaskPipeline(
application_generate_entity=application_generate_entity,
@ -131,6 +133,8 @@ class WorkflowAppGenerateTaskPipeline:
self._application_generate_entity = application_generate_entity
self._workflow_features_dict = workflow.features_dict
self._workflow_run_id = ""
self._invoke_from = queue_manager._invoke_from
self._draft_var_saver_factory = draft_var_saver_factory
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
"""
@ -322,6 +326,8 @@ class WorkflowAppGenerateTaskPipeline:
workflow_node_execution=workflow_node_execution,
)
self._save_output_for_event(event, workflow_node_execution.id)
if node_success_response:
yield node_success_response
elif isinstance(
@ -339,6 +345,8 @@ class WorkflowAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
if isinstance(event, QueueNodeExceptionEvent):
self._save_output_for_event(event, workflow_node_execution.id)
if node_failed_response:
yield node_failed_response
@ -593,3 +601,15 @@ class WorkflowAppGenerateTaskPipeline:
)
return response
def _save_output_for_event(self, event: QueueNodeSucceededEvent | QueueNodeExceptionEvent, node_execution_id: str):
with Session(db.engine) as session, session.begin():
saver = self._draft_var_saver_factory(
session=session,
app_id=self._application_generate_entity.app_config.app_id,
node_id=event.node_id,
node_type=event.node_type,
node_execution_id=node_execution_id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id,
)
saver.save(event.process_data, event.outputs)


@ -1,8 +1,6 @@
from collections.abc import Mapping
from typing import Any, Optional, cast
from sqlalchemy.orm import Session
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner
from core.app.entities.queue_entities import (
@ -35,7 +33,6 @@ from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.event import (
AgentLogEvent,
BaseNodeEvent,
GraphEngineEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
@ -70,9 +67,6 @@ from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.model import App
from models.workflow import Workflow
from services.workflow_draft_variable_service import (
DraftVariableSaver,
)
class WorkflowBasedAppRunner(AppRunner):
@ -400,7 +394,6 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id,
)
)
self._save_draft_var_for_event(event)
elif isinstance(event, NodeRunFailedEvent):
self._publish_event(
@ -464,7 +457,6 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id,
)
)
self._save_draft_var_for_event(event)
elif isinstance(event, NodeInIterationFailedEvent):
self._publish_event(
@ -718,30 +710,3 @@ class WorkflowBasedAppRunner(AppRunner):
def _publish_event(self, event: AppQueueEvent) -> None:
self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER)
def _save_draft_var_for_event(self, event: BaseNodeEvent):
run_result = event.route_node_state.node_run_result
if run_result is None:
return
process_data = run_result.process_data
outputs = run_result.outputs
with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver(
session=session,
app_id=self._get_app_id(),
node_id=event.node_id,
node_type=event.node_type,
# FIXME(QuantumGhost): rely on private state of queue_manager is not ideal.
invoke_from=self.queue_manager._invoke_from,
node_execution_id=event.id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id or None,
)
draft_var_saver.save(process_data=process_data, outputs=outputs)
def _remove_first_element_from_variable_string(key: str) -> str:
"""
Remove the first element from the prefix.
"""
prefix, remaining = key.split(".", maxsplit=1)
return remaining


@ -395,6 +395,7 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
message.provider_response_latency = time.perf_counter() - self._start_at
message.total_price = usage.total_price
message.currency = usage.currency
self._task_state.llm_result.usage.latency = message.provider_response_latency
message.message_metadata = self._task_state.metadata.model_dump_json()
if trace_manager:


@ -15,6 +15,11 @@ class CommonParameterType(StrEnum):
MODEL_SELECTOR = "model-selector"
TOOLS_SELECTOR = "array[tools]"
# Dynamic select parameter
# Used when the available options are not known until authorization is done,
# e.g. selecting a Slack channel from a Slack workspace
DYNAMIC_SELECT = "dynamic-select"
# TOOL_SELECTOR = "tool-selector"


@ -534,7 +534,7 @@ class IndexingRunner:
# chunk nodes by chunk size
indexing_start_at = time.perf_counter()
tokens = 0
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX and dataset.indexing_technique == "economy":
# create keyword index
create_keyword_thread = threading.Thread(
target=self._process_keyword_index,
@ -572,7 +572,7 @@ class IndexingRunner:
for future in futures:
tokens += future.result()
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX and dataset.indexing_technique == "economy":
create_keyword_thread.join()
indexing_end_at = time.perf_counter()


@ -0,0 +1,374 @@
import json
from collections.abc import Generator, Mapping, Sequence
from copy import deepcopy
from enum import StrEnum
from typing import Any, Literal, Optional, cast, overload
import json_repair
from pydantic import TypeAdapter, ValidationError
from core.llm_generator.output_parser.errors import OutputParserError
from core.llm_generator.prompts import STRUCTURED_OUTPUT_PROMPT
from core.model_manager import ModelInstance
from core.model_runtime.callbacks.base_callback import Callback
from core.model_runtime.entities.llm_entities import (
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
LLMResultChunkWithStructuredOutput,
LLMResultWithStructuredOutput,
)
from core.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessage,
PromptMessageTool,
SystemPromptMessage,
)
from core.model_runtime.entities.model_entities import AIModelEntity, ParameterRule
class ResponseFormat(StrEnum):
"""Constants for model response formats"""
JSON_SCHEMA = "json_schema" # model's structured output mode. some model like gemini, gpt-4o, support this mode.
JSON = "JSON" # model's json mode. some model like claude support this mode.
JSON_OBJECT = "json_object" # json mode's another alias. some model like deepseek-chat, qwen use this alias.
class SpecialModelType(StrEnum):
"""Constants for identifying model types"""
GEMINI = "gemini"
OLLAMA = "ollama"
@overload
def invoke_llm_with_structured_output(
provider: str,
model_schema: AIModelEntity,
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
json_schema: Mapping[str, Any],
model_parameters: Optional[Mapping] = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: Optional[list[str]] = None,
stream: Literal[True] = True,
user: Optional[str] = None,
callbacks: Optional[list[Callback]] = None,
) -> Generator[LLMResultChunkWithStructuredOutput, None, None]: ...
@overload
def invoke_llm_with_structured_output(
provider: str,
model_schema: AIModelEntity,
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
json_schema: Mapping[str, Any],
model_parameters: Optional[Mapping] = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: Optional[list[str]] = None,
stream: Literal[False] = False,
user: Optional[str] = None,
callbacks: Optional[list[Callback]] = None,
) -> LLMResultWithStructuredOutput: ...
@overload
def invoke_llm_with_structured_output(
provider: str,
model_schema: AIModelEntity,
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
json_schema: Mapping[str, Any],
model_parameters: Optional[Mapping] = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: Optional[list[str]] = None,
stream: bool = True,
user: Optional[str] = None,
callbacks: Optional[list[Callback]] = None,
) -> LLMResultWithStructuredOutput | Generator[LLMResultChunkWithStructuredOutput, None, None]: ...
def invoke_llm_with_structured_output(
provider: str,
model_schema: AIModelEntity,
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
json_schema: Mapping[str, Any],
model_parameters: Optional[Mapping] = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: Optional[list[str]] = None,
stream: bool = True,
user: Optional[str] = None,
callbacks: Optional[list[Callback]] = None,
) -> LLMResultWithStructuredOutput | Generator[LLMResultChunkWithStructuredOutput, None, None]:
"""
Invoke large language model with structured output
1. Invokes model_instance.invoke_llm with the given json_schema
2. Tries to parse the result as structured output
:param prompt_messages: prompt messages
:param json_schema: json schema
:param model_parameters: model parameters
:param tools: tools for tool calling
:param stop: stop words
:param stream: is stream response
:param user: unique user id
:param callbacks: callbacks
:return: full response or stream response chunk generator result
"""
# handle native json schema
model_parameters_with_json_schema: dict[str, Any] = {
**(model_parameters or {}),
}
if model_schema.support_structure_output:
model_parameters = _handle_native_json_schema(
provider, model_schema, json_schema, model_parameters_with_json_schema, model_schema.parameter_rules
)
else:
# Set appropriate response format based on model capabilities
_set_response_format(model_parameters_with_json_schema, model_schema.parameter_rules)
# handle prompt based schema
prompt_messages = _handle_prompt_based_schema(
prompt_messages=prompt_messages,
structured_output_schema=json_schema,
)
llm_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages),
model_parameters=model_parameters_with_json_schema,
tools=tools,
stop=stop,
stream=stream,
user=user,
callbacks=callbacks,
)
if isinstance(llm_result, LLMResult):
if not isinstance(llm_result.message.content, str):
raise OutputParserError(
f"Failed to parse structured output, LLM result is not a string: {llm_result.message.content}"
)
return LLMResultWithStructuredOutput(
structured_output=_parse_structured_output(llm_result.message.content),
model=llm_result.model,
message=llm_result.message,
usage=llm_result.usage,
system_fingerprint=llm_result.system_fingerprint,
prompt_messages=llm_result.prompt_messages,
)
else:
def generator() -> Generator[LLMResultChunkWithStructuredOutput, None, None]:
result_text: str = ""
prompt_messages: Sequence[PromptMessage] = []
system_fingerprint: Optional[str] = None
for event in llm_result:
if isinstance(event, LLMResultChunk):
if isinstance(event.delta.message.content, str):
result_text += event.delta.message.content
prompt_messages = event.prompt_messages
system_fingerprint = event.system_fingerprint
yield LLMResultChunkWithStructuredOutput(
model=model_schema.model,
prompt_messages=prompt_messages,
system_fingerprint=system_fingerprint,
delta=event.delta,
)
yield LLMResultChunkWithStructuredOutput(
structured_output=_parse_structured_output(result_text),
model=model_schema.model,
prompt_messages=prompt_messages,
system_fingerprint=system_fingerprint,
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content=""),
usage=None,
finish_reason=None,
),
)
return generator()
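A minimal non-streaming usage sketch of the function above, mirroring how PluginModelBackwardsInvocation resolves the model instance elsewhere in this commit; it is illustrative only, and the tenant, provider, and model identifiers are placeholders that assume an already configured workspace.

from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
from core.model_manager import ModelManager
from core.model_runtime.entities.message_entities import SystemPromptMessage, UserPromptMessage
from core.model_runtime.entities.model_entities import ModelType

model_instance = ModelManager().get_model_instance(
    tenant_id="tenant-id",   # placeholder
    provider="openai",       # placeholder
    model_type=ModelType.LLM,
    model="gpt-4o",          # placeholder
)
model_schema = model_instance.model_type_instance.get_model_schema("gpt-4o", model_instance.credentials)
assert model_schema is not None

result = invoke_llm_with_structured_output(
    provider="openai",
    model_schema=model_schema,
    model_instance=model_instance,
    prompt_messages=[
        SystemPromptMessage(content="Extract the person the user describes."),
        UserPromptMessage(content="My name is John Doe and I am 30 years old."),
    ],
    json_schema={
        "type": "object",
        "properties": {"name": {"type": "string"}, "age": {"type": "number"}},
        "required": ["name", "age"],
    },
    stream=False,
)
print(result.structured_output)  # e.g. {"name": "John Doe", "age": 30}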
def _handle_native_json_schema(
provider: str,
model_schema: AIModelEntity,
structured_output_schema: Mapping,
model_parameters: dict,
rules: list[ParameterRule],
) -> dict:
"""
Handle structured output for models with native JSON schema support.
:param model_parameters: Model parameters to update
:param rules: Model parameter rules
:return: Updated model parameters with JSON schema configuration
"""
# Process schema according to model requirements
schema_json = _prepare_schema_for_model(provider, model_schema, structured_output_schema)
# Set JSON schema in parameters
model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False)
# Set appropriate response format if required by the model
for rule in rules:
if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options:
model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value
return model_parameters
def _set_response_format(model_parameters: dict, rules: list) -> None:
"""
Set the appropriate response format parameter based on model rules.
:param model_parameters: Model parameters to update
:param rules: Model parameter rules
"""
for rule in rules:
if rule.name == "response_format":
if ResponseFormat.JSON.value in rule.options:
model_parameters["response_format"] = ResponseFormat.JSON.value
elif ResponseFormat.JSON_OBJECT.value in rule.options:
model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value
def _handle_prompt_based_schema(
prompt_messages: Sequence[PromptMessage], structured_output_schema: Mapping
) -> list[PromptMessage]:
"""
Handle structured output for models without native JSON schema support.
This function modifies the prompt messages to include schema-based output requirements.
Args:
prompt_messages: Original sequence of prompt messages
Returns:
list[PromptMessage]: Updated prompt messages with structured output requirements
"""
# Convert schema to string format
schema_str = json.dumps(structured_output_schema, ensure_ascii=False)
# Find existing system prompt with schema placeholder
system_prompt = next(
(prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)),
None,
)
structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str)
# Prepare system prompt content
system_prompt_content = (
structured_output_prompt + "\n\n" + system_prompt.content
if system_prompt and isinstance(system_prompt.content, str)
else structured_output_prompt
)
system_prompt = SystemPromptMessage(content=system_prompt_content)
# Extract content from the last user message
filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)]
updated_prompt = [system_prompt] + filtered_prompts
return updated_prompt
def _parse_structured_output(result_text: str) -> Mapping[str, Any]:
structured_output: Mapping[str, Any] = {}
parsed: Mapping[str, Any] = {}
try:
parsed = TypeAdapter(Mapping).validate_json(result_text)
if not isinstance(parsed, dict):
raise OutputParserError(f"Failed to parse structured output: {result_text}")
structured_output = parsed
except ValidationError:
# if the result_text is not a valid json, try to repair it
temp_parsed = json_repair.loads(result_text)
if not isinstance(temp_parsed, dict):
# handle reasoning models like deepseek-r1 whose output carries a '<think>\n\n</think>\n' prefix
if isinstance(temp_parsed, list):
temp_parsed = next((item for item in temp_parsed if isinstance(item, dict)), {})
else:
raise OutputParserError(f"Failed to parse structured output: {result_text}")
structured_output = cast(dict, temp_parsed)
return structured_output
def _prepare_schema_for_model(provider: str, model_schema: AIModelEntity, schema: Mapping) -> dict:
"""
Prepare JSON schema based on model requirements.
Different models have different requirements for JSON schema formatting.
This function handles these differences.
:param schema: The original JSON schema
:return: Processed schema compatible with the current model
"""
# Deep copy to avoid modifying the original schema
processed_schema = dict(deepcopy(schema))
# Convert boolean types to string types (common requirement)
convert_boolean_to_string(processed_schema)
# Apply model-specific transformations
if SpecialModelType.GEMINI in model_schema.model:
remove_additional_properties(processed_schema)
return processed_schema
elif SpecialModelType.OLLAMA in provider:
return processed_schema
else:
# Default format with name field
return {"schema": processed_schema, "name": "llm_response"}
def remove_additional_properties(schema: dict) -> None:
"""
Remove additionalProperties fields from JSON schema.
Used for models like Gemini that don't support this property.
:param schema: JSON schema to modify in-place
"""
if not isinstance(schema, dict):
return
# Remove additionalProperties at current level
schema.pop("additionalProperties", None)
# Process nested structures recursively
for value in schema.values():
if isinstance(value, dict):
remove_additional_properties(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
remove_additional_properties(item)
def convert_boolean_to_string(schema: dict) -> None:
"""
Convert boolean type specifications to string in JSON schema.
:param schema: JSON schema to modify in-place
"""
if not isinstance(schema, dict):
return
# Check for boolean type at current level
if schema.get("type") == "boolean":
schema["type"] = "string"
# Process nested dictionaries and lists recursively
for value in schema.values():
if isinstance(value, dict):
convert_boolean_to_string(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
convert_boolean_to_string(item)
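The helpers above mutate the schema in place. A small self-contained sketch of what they do to a typical schema, assuming this module is importable from the api package:

from core.llm_generator.output_parser.structured_output import (
    convert_boolean_to_string,
    remove_additional_properties,
)

schema = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "active": {"type": "boolean"},
        "tags": {"type": "array", "items": {"type": "boolean"}},
    },
}

convert_boolean_to_string(schema)     # boolean types become string, recursively
remove_additional_properties(schema)  # strips additionalProperties for Gemini-style models

# schema now reads:
# {"type": "object",
#  "properties": {"active": {"type": "string"},
#                 "tags": {"type": "array", "items": {"type": "string"}}}}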


@ -291,3 +291,21 @@ Your task is to convert simple user descriptions into properly formatted JSON Sc
Now, generate a JSON Schema based on my description
""" # noqa: E501
STRUCTURED_OUTPUT_PROMPT = """Youre a helpful AI assistant. You could answer questions and output in JSON format.
constraints:
- You must output in JSON format.
- Do not output boolean value, use string type instead.
- Do not output integer or float value, use number type instead.
eg:
Here is the JSON schema:
{"additionalProperties": false, "properties": {"age": {"type": "number"}, "name": {"type": "string"}}, "required": ["name", "age"], "type": "object"}
Here is the user's question:
My name is John Doe and I am 30 years old.
output:
{"name": "John Doe", "age": 30}
Here is the JSON schema:
{{schema}}
""" # noqa: E501


@ -1,7 +1,7 @@
from collections.abc import Sequence
from collections.abc import Mapping, Sequence
from decimal import Decimal
from enum import StrEnum
from typing import Optional
from typing import Any, Optional
from pydantic import BaseModel, Field
@ -101,6 +101,20 @@ class LLMResult(BaseModel):
system_fingerprint: Optional[str] = None
class LLMStructuredOutput(BaseModel):
"""
Model class for llm structured output.
"""
structured_output: Optional[Mapping[str, Any]] = None
class LLMResultWithStructuredOutput(LLMResult, LLMStructuredOutput):
"""
Model class for llm result with structured output.
"""
class LLMResultChunkDelta(BaseModel):
"""
Model class for llm result chunk delta.
@ -123,6 +137,12 @@ class LLMResultChunk(BaseModel):
delta: LLMResultChunkDelta
class LLMResultChunkWithStructuredOutput(LLMResultChunk, LLMStructuredOutput):
"""
Model class for llm result chunk with structured output.
"""
class NumTokensResult(PriceInfo):
"""
Model class for number of tokens result.


@ -83,6 +83,7 @@ class LangFuseDataTrace(BaseTraceInstance):
metadata=metadata,
session_id=trace_info.conversation_id,
tags=["message", "workflow"],
version=trace_info.workflow_run_version,
)
self.add_trace(langfuse_trace_data=trace_data)
workflow_span_data = LangfuseSpan(
@ -108,6 +109,7 @@ class LangFuseDataTrace(BaseTraceInstance):
metadata=metadata,
session_id=trace_info.conversation_id,
tags=["workflow"],
version=trace_info.workflow_run_version,
)
self.add_trace(langfuse_trace_data=trace_data)
@ -172,37 +174,7 @@ class LangFuseDataTrace(BaseTraceInstance):
}
)
# add span
if trace_info.message_id:
span_data = LangfuseSpan(
id=node_execution_id,
name=node_type,
input=inputs,
output=outputs,
trace_id=trace_id,
start_time=created_at,
end_time=finished_at,
metadata=metadata,
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
status_message=trace_info.error or "",
parent_observation_id=trace_info.workflow_run_id,
)
else:
span_data = LangfuseSpan(
id=node_execution_id,
name=node_type,
input=inputs,
output=outputs,
trace_id=trace_id,
start_time=created_at,
end_time=finished_at,
metadata=metadata,
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
status_message=trace_info.error or "",
)
self.add_span(langfuse_span_data=span_data)
# add generation span
if process_data and process_data.get("model_mode") == "chat":
total_token = metadata.get("total_tokens", 0)
prompt_tokens = 0
@ -226,10 +198,10 @@ class LangFuseDataTrace(BaseTraceInstance):
)
node_generation_data = LangfuseGeneration(
name="llm",
id=node_execution_id,
name=node_name,
trace_id=trace_id,
model=process_data.get("model_name"),
parent_observation_id=node_execution_id,
start_time=created_at,
end_time=finished_at,
input=inputs,
@ -237,11 +209,30 @@ class LangFuseDataTrace(BaseTraceInstance):
metadata=metadata,
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
status_message=trace_info.error or "",
parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None,
usage=generation_usage,
)
self.add_generation(langfuse_generation_data=node_generation_data)
# add normal span
else:
span_data = LangfuseSpan(
id=node_execution_id,
name=node_name,
input=inputs,
output=outputs,
trace_id=trace_id,
start_time=created_at,
end_time=finished_at,
metadata=metadata,
level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR),
status_message=trace_info.error or "",
parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None,
)
self.add_span(langfuse_span_data=span_data)
def message_trace(self, trace_info: MessageTraceInfo, **kwargs):
# get message file data
file_list = trace_info.file_list
@ -284,7 +275,7 @@ class LangFuseDataTrace(BaseTraceInstance):
)
self.add_trace(langfuse_trace_data=trace_data)
# start add span
# add generation
generation_usage = GenerationUsage(
input=trace_info.message_tokens,
output=trace_info.answer_tokens,


@ -2,8 +2,15 @@ import tempfile
from binascii import hexlify, unhexlify
from collections.abc import Generator
from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
from core.model_manager import ModelManager
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta
from core.model_runtime.entities.llm_entities import (
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
LLMResultChunkWithStructuredOutput,
LLMResultWithStructuredOutput,
)
from core.model_runtime.entities.message_entities import (
PromptMessage,
SystemPromptMessage,
@ -12,6 +19,7 @@ from core.model_runtime.entities.message_entities import (
from core.plugin.backwards_invocation.base import BaseBackwardsInvocation
from core.plugin.entities.request import (
RequestInvokeLLM,
RequestInvokeLLMWithStructuredOutput,
RequestInvokeModeration,
RequestInvokeRerank,
RequestInvokeSpeech2Text,
@ -81,6 +89,72 @@ class PluginModelBackwardsInvocation(BaseBackwardsInvocation):
return handle_non_streaming(response)
@classmethod
def invoke_llm_with_structured_output(
cls, user_id: str, tenant: Tenant, payload: RequestInvokeLLMWithStructuredOutput
):
"""
invoke llm with structured output
"""
model_instance = ModelManager().get_model_instance(
tenant_id=tenant.id,
provider=payload.provider,
model_type=payload.model_type,
model=payload.model,
)
model_schema = model_instance.model_type_instance.get_model_schema(payload.model, model_instance.credentials)
if not model_schema:
raise ValueError(f"Model schema not found for {payload.model}")
response = invoke_llm_with_structured_output(
provider=payload.provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=payload.prompt_messages,
json_schema=payload.structured_output_schema,
tools=payload.tools,
stop=payload.stop,
stream=True if payload.stream is None else payload.stream,
user=user_id,
model_parameters=payload.completion_params,
)
if isinstance(response, Generator):
def handle() -> Generator[LLMResultChunkWithStructuredOutput, None, None]:
for chunk in response:
if chunk.delta.usage:
llm_utils.deduct_llm_quota(
tenant_id=tenant.id, model_instance=model_instance, usage=chunk.delta.usage
)
chunk.prompt_messages = []
yield chunk
return handle()
else:
if response.usage:
llm_utils.deduct_llm_quota(tenant_id=tenant.id, model_instance=model_instance, usage=response.usage)
def handle_non_streaming(
response: LLMResultWithStructuredOutput,
) -> Generator[LLMResultChunkWithStructuredOutput, None, None]:
yield LLMResultChunkWithStructuredOutput(
model=response.model,
prompt_messages=[],
system_fingerprint=response.system_fingerprint,
structured_output=response.structured_output,
delta=LLMResultChunkDelta(
index=0,
message=response.message,
usage=response.usage,
finish_reason="",
),
)
return handle_non_streaming(response)
@classmethod
def invoke_text_embedding(cls, user_id: str, tenant: Tenant, payload: RequestInvokeTextEmbedding):
"""


@ -10,6 +10,9 @@ from core.tools.entities.common_entities import I18nObject
class PluginParameterOption(BaseModel):
value: str = Field(..., description="The value of the option")
label: I18nObject = Field(..., description="The label of the option")
icon: Optional[str] = Field(
default=None, description="The icon of the option; can be a URL or a base64-encoded image"
)
@field_validator("value", mode="before")
@classmethod
@ -35,6 +38,7 @@ class PluginParameterType(enum.StrEnum):
APP_SELECTOR = CommonParameterType.APP_SELECTOR.value
MODEL_SELECTOR = CommonParameterType.MODEL_SELECTOR.value
TOOLS_SELECTOR = CommonParameterType.TOOLS_SELECTOR.value
DYNAMIC_SELECT = CommonParameterType.DYNAMIC_SELECT.value
# deprecated, should not use.
SYSTEM_FILES = CommonParameterType.SYSTEM_FILES.value


@ -1,4 +1,4 @@
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from datetime import datetime
from enum import StrEnum
from typing import Any, Generic, Optional, TypeVar
@ -10,6 +10,7 @@ from core.datasource.entities.datasource_entities import DatasourceProviderEntit
from core.model_runtime.entities.model_entities import AIModelEntity
from core.model_runtime.entities.provider_entities import ProviderEntity
from core.plugin.entities.base import BasePluginEntity
from core.plugin.entities.parameters import PluginParameterOption
from core.plugin.entities.plugin import PluginDeclaration, PluginEntity
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_entities import ToolProviderEntityWithPlugin
@ -195,3 +196,7 @@ class PluginOAuthCredentialsResponse(BaseModel):
class PluginListResponse(BaseModel):
list: list[PluginEntity]
total: int
class PluginDynamicSelectOptionsResponse(BaseModel):
options: Sequence[PluginParameterOption] = Field(description="The options of the dynamic select.")


@ -82,6 +82,16 @@ class RequestInvokeLLM(BaseRequestInvokeModel):
return v
class RequestInvokeLLMWithStructuredOutput(RequestInvokeLLM):
"""
Request to invoke LLM with structured output
"""
structured_output_schema: dict[str, Any] = Field(
default_factory=dict, description="The schema of the structured output in JSON schema format"
)
class RequestInvokeTextEmbedding(BaseRequestInvokeModel):
"""
Request to invoke text embedding


@ -0,0 +1,45 @@
from collections.abc import Mapping
from typing import Any
from core.plugin.entities.plugin import GenericProviderID
from core.plugin.entities.plugin_daemon import PluginDynamicSelectOptionsResponse
from core.plugin.impl.base import BasePluginClient
class DynamicSelectClient(BasePluginClient):
def fetch_dynamic_select_options(
self,
tenant_id: str,
user_id: str,
plugin_id: str,
provider: str,
action: str,
credentials: Mapping[str, Any],
parameter: str,
) -> PluginDynamicSelectOptionsResponse:
"""
Fetch dynamic select options for a plugin parameter.
"""
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/dynamic_select/fetch_parameter_options",
PluginDynamicSelectOptionsResponse,
data={
"user_id": user_id,
"data": {
"provider": GenericProviderID(provider).provider_name,
"credentials": credentials,
"provider_action": action,
"parameter": parameter,
},
},
headers={
"X-Plugin-ID": plugin_id,
"Content-Type": "application/json",
},
)
for options in response:
return options
raise ValueError(f"Plugin service returned no options for parameter '{parameter}' in provider '{provider}'")


@ -79,6 +79,7 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
if dataset.indexing_technique == "high_quality":
vector = Vector(dataset)
vector.create(documents)
with_keywords = False
if with_keywords:
keywords_list = kwargs.get("keywords_list")
keyword = Keyword(dataset)
@ -94,6 +95,7 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
vector.delete_by_ids(node_ids)
else:
vector.delete()
with_keywords = False
if with_keywords:
keyword = Keyword(dataset)
if node_ids:


@ -1010,6 +1010,9 @@ class DatasetRetrieval:
def _process_metadata_filter_func(
self, sequence: int, condition: str, metadata_name: str, value: Optional[Any], filters: list
):
if value is None:
return
key = f"{metadata_name}_{sequence}"
key_value = f"{metadata_name}_{sequence}_value"
match condition:


@ -4,6 +4,7 @@ from typing import Any, Optional
from core.helper.code_executor.code_executor import CodeExecutor, CodeLanguage
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.errors import ToolInvokeError
class SimpleCode(BuiltinTool):
@ -25,6 +26,8 @@ class SimpleCode(BuiltinTool):
if language not in {CodeLanguage.PYTHON3, CodeLanguage.JAVASCRIPT}:
raise ValueError(f"Only python3 and javascript are supported, not {language}")
result = CodeExecutor.execute_code(language, "", code)
yield self.create_text_message(result)
try:
result = CodeExecutor.execute_code(language, "", code)
yield self.create_text_message(result)
except Exception as e:
raise ToolInvokeError(str(e))


@ -240,6 +240,7 @@ class ToolParameter(PluginParameter):
FILES = PluginParameterType.FILES.value
APP_SELECTOR = PluginParameterType.APP_SELECTOR.value
MODEL_SELECTOR = PluginParameterType.MODEL_SELECTOR.value
DYNAMIC_SELECT = PluginParameterType.DYNAMIC_SELECT.value
# deprecated, should not use.
SYSTEM_FILES = PluginParameterType.SYSTEM_FILES.value


@ -86,6 +86,7 @@ class ProviderConfigEncrypter(BaseModel):
cached_credentials = cache.get()
if cached_credentials:
return cached_credentials
data = self._deep_copy(data)
# get fields need to be decrypted
fields = dict[str, BasicProviderConfig]()


@ -67,11 +67,21 @@ class WorkflowNodeExecution(BaseModel):
but they are not stored in the model.
"""
# Core identification fields
id: str # Unique identifier for this execution record
node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing
# --------- Core identification fields ---------
# Unique identifier for this execution record, used when persisting to storage.
# Value is a UUID string (e.g., '09b3e04c-f9ae-404c-ad82-290b8d7bd382').
id: str
# Optional secondary ID for cross-referencing purposes.
#
# NOTE: For referencing the persisted record, use `id` rather than `node_execution_id`.
# While `node_execution_id` may sometimes be a UUID string, this is not guaranteed.
# In most scenarios, `id` should be used as the primary identifier.
node_execution_id: Optional[str] = None
workflow_id: str # ID of the workflow this node belongs to
workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging)
# --------- Core identification fields ends ---------
# Execution positioning and flow
index: int # Sequence number for ordering in trace visualization


@ -158,7 +158,10 @@ class AgentNode(ToolNode):
# variable_pool.convert_template expects a string template,
# but if passing a dict, convert to JSON string first before rendering
try:
parameter_value = json.dumps(agent_input.value, ensure_ascii=False)
if not isinstance(agent_input.value, str):
parameter_value = json.dumps(agent_input.value, ensure_ascii=False)
else:
parameter_value = str(agent_input.value)
except TypeError:
parameter_value = str(agent_input.value)
segment_group = variable_pool.convert_template(parameter_value)
@ -166,7 +169,8 @@ class AgentNode(ToolNode):
# variable_pool.convert_template returns a string,
# so we need to convert it back to a dictionary
try:
parameter_value = json.loads(parameter_value)
if not isinstance(agent_input.value, str):
parameter_value = json.loads(parameter_value)
except json.JSONDecodeError:
parameter_value = parameter_value
else:


@ -2,7 +2,6 @@ import logging
from collections.abc import Generator
from typing import cast
from core.file import FILE_MODEL_IDENTITY, File
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.graph_engine.entities.event import (
GraphEngineEvent,
@ -201,44 +200,3 @@ class AnswerStreamProcessor(StreamProcessor):
stream_out_answer_node_ids.append(answer_node_id)
return stream_out_answer_node_ids
@classmethod
def _fetch_files_from_variable_value(cls, value: dict | list) -> list[dict]:
"""
Fetch files from variable value
:param value: variable value
:return:
"""
if not value:
return []
files = []
if isinstance(value, list):
for item in value:
file_var = cls._get_file_var_from_value(item)
if file_var:
files.append(file_var)
elif isinstance(value, dict):
file_var = cls._get_file_var_from_value(value)
if file_var:
files.append(file_var)
return files
@classmethod
def _get_file_var_from_value(cls, value: dict | list):
"""
Get file var from value
:param value: variable value
:return:
"""
if not value:
return None
if isinstance(value, dict):
if "dify_model_identity" in value and value["dify_model_identity"] == FILE_MODEL_IDENTITY:
return value
elif isinstance(value, File):
return value.to_dict()
return None


@ -333,7 +333,7 @@ class Executor:
try:
response = getattr(ssrf_proxy, self.method.lower())(**request_args)
except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
raise HttpRequestNodeError(str(e))
raise HttpRequestNodeError(str(e)) from e
# FIXME: fix type ignore, this maybe httpx type issue
return response # type: ignore


@ -490,6 +490,9 @@ class KnowledgeRetrievalNode(LLMNode):
def _process_metadata_filter_func(
self, sequence: int, condition: str, metadata_name: str, value: Optional[Any], filters: list
):
if value is None:
return
key = f"{metadata_name}_{sequence}"
key_value = f"{metadata_name}_{sequence}_value"
match condition:


@ -5,11 +5,11 @@ import logging
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, cast
import json_repair
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.file import FileType, file_manager
from core.helper.code_executor import CodeExecutor, CodeLanguage
from core.llm_generator.output_parser.errors import OutputParserError
from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities import (
@ -18,7 +18,13 @@ from core.model_runtime.entities import (
PromptMessageContentType,
TextPromptMessageContent,
)
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMUsage
from core.model_runtime.entities.llm_entities import (
LLMResult,
LLMResultChunk,
LLMResultChunkWithStructuredOutput,
LLMStructuredOutput,
LLMUsage,
)
from core.model_runtime.entities.message_entities import (
AssistantPromptMessage,
PromptMessageContentUnionTypes,
@ -31,7 +37,6 @@ from core.model_runtime.entities.model_entities import (
ModelFeature,
ModelPropertyKey,
ModelType,
ParameterRule,
)
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.model_runtime.utils.encoders import jsonable_encoder
@ -62,11 +67,6 @@ from core.workflow.nodes.event import (
RunRetrieverResourceEvent,
RunStreamChunkEvent,
)
from core.workflow.utils.structured_output.entities import (
ResponseFormat,
SpecialModelType,
)
from core.workflow.utils.structured_output.prompt import STRUCTURED_OUTPUT_PROMPT
from core.workflow.utils.variable_template_parser import VariableTemplateParser
from . import llm_utils
@ -143,12 +143,6 @@ class LLMNode(BaseNode[LLMNodeData]):
return "1"
def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
def process_structured_output(text: str) -> Optional[dict[str, Any]]:
"""Process structured output if enabled"""
if not self.node_data.structured_output_enabled or not self.node_data.structured_output:
return None
return self._parse_structured_output(text)
node_inputs: Optional[dict[str, Any]] = None
process_data = None
result_text = ""
@ -244,6 +238,8 @@ class LLMNode(BaseNode[LLMNodeData]):
stop=stop,
)
structured_output: LLMStructuredOutput | None = None
for event in generator:
if isinstance(event, RunStreamChunkEvent):
yield event
@ -254,10 +250,12 @@ class LLMNode(BaseNode[LLMNodeData]):
# deduct quota
llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
break
elif isinstance(event, LLMStructuredOutput):
structured_output = event
outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason}
structured_output = process_structured_output(result_text)
if structured_output:
outputs["structured_output"] = structured_output
outputs["structured_output"] = structured_output.structured_output
if self._file_outputs is not None:
outputs["files"] = ArrayFileSegment(value=self._file_outputs)
@ -302,20 +300,40 @@ class LLMNode(BaseNode[LLMNodeData]):
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
stop: Optional[Sequence[str]] = None,
) -> Generator[NodeEvent, None, None]:
invoke_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages),
model_parameters=node_data_model.completion_params,
stop=list(stop or []),
stream=True,
user=self.user_id,
) -> Generator[NodeEvent | LLMStructuredOutput, None, None]:
model_schema = model_instance.model_type_instance.get_model_schema(
node_data_model.name, model_instance.credentials
)
if not model_schema:
raise ValueError(f"Model schema not found for {node_data_model.name}")
if self.node_data.structured_output_enabled:
output_schema = self._fetch_structured_output_schema()
invoke_result = invoke_llm_with_structured_output(
provider=model_instance.provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=output_schema,
model_parameters=node_data_model.completion_params,
stop=list(stop or []),
stream=True,
user=self.user_id,
)
else:
invoke_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages),
model_parameters=node_data_model.completion_params,
stop=list(stop or []),
stream=True,
user=self.user_id,
)
return self._handle_invoke_result(invoke_result=invoke_result)
def _handle_invoke_result(
self, invoke_result: LLMResult | Generator[LLMResultChunk, None, None]
) -> Generator[NodeEvent, None, None]:
self, invoke_result: LLMResult | Generator[LLMResultChunk | LLMStructuredOutput, None, None]
) -> Generator[NodeEvent | LLMStructuredOutput, None, None]:
# For blocking mode
if isinstance(invoke_result, LLMResult):
event = self._handle_blocking_result(invoke_result=invoke_result)
@ -329,23 +347,32 @@ class LLMNode(BaseNode[LLMNodeData]):
usage = LLMUsage.empty_usage()
finish_reason = None
full_text_buffer = io.StringIO()
for result in invoke_result:
contents = result.delta.message.content
for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents):
full_text_buffer.write(text_part)
yield RunStreamChunkEvent(chunk_content=text_part, from_variable_selector=[self.node_id, "text"])
# Consume the invoke result and handle generator exception
try:
for result in invoke_result:
if isinstance(result, LLMResultChunkWithStructuredOutput):
yield result
if isinstance(result, LLMResultChunk):
contents = result.delta.message.content
for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents):
full_text_buffer.write(text_part)
yield RunStreamChunkEvent(
chunk_content=text_part, from_variable_selector=[self.node_id, "text"]
)
# Update the whole metadata
if not model and result.model:
model = result.model
if len(prompt_messages) == 0:
# TODO(QuantumGhost): it seems that this update has no visible effect.
# What's the purpose of the line below?
prompt_messages = list(result.prompt_messages)
if usage.prompt_tokens == 0 and result.delta.usage:
usage = result.delta.usage
if finish_reason is None and result.delta.finish_reason:
finish_reason = result.delta.finish_reason
# Update the whole metadata
if not model and result.model:
model = result.model
if len(prompt_messages) == 0:
# TODO(QuantumGhost): it seems that this update has no visible effect.
# What's the purpose of the line below?
prompt_messages = list(result.prompt_messages)
if usage.prompt_tokens == 0 and result.delta.usage:
usage = result.delta.usage
if finish_reason is None and result.delta.finish_reason:
finish_reason = result.delta.finish_reason
except OutputParserError as e:
raise LLMNodeError(f"Failed to parse structured output: {e}")
yield ModelInvokeCompletedEvent(text=full_text_buffer.getvalue(), usage=usage, finish_reason=finish_reason)
@ -522,12 +549,6 @@ class LLMNode(BaseNode[LLMNodeData]):
if not model_schema:
raise ModelNotExistError(f"Model {node_data_model.name} not exist.")
if self.node_data.structured_output_enabled:
if model_schema.support_structure_output:
completion_params = self._handle_native_json_schema(completion_params, model_schema.parameter_rules)
else:
# Set appropriate response format based on model capabilities
self._set_response_format(completion_params, model_schema.parameter_rules)
model_config_with_cred.parameters = completion_params
# NOTE(-LAN-): This line modify the `self.node_data.model`, which is used in `_invoke_llm()`.
node_data_model.completion_params = completion_params
@ -719,32 +740,8 @@ class LLMNode(BaseNode[LLMNodeData]):
)
if not model_schema:
raise ModelNotExistError(f"Model {model_config.model} not exist.")
if self.node_data.structured_output_enabled:
if not model_schema.support_structure_output:
filtered_prompt_messages = self._handle_prompt_based_schema(
prompt_messages=filtered_prompt_messages,
)
return filtered_prompt_messages, model_config.stop
def _parse_structured_output(self, result_text: str) -> dict[str, Any]:
structured_output: dict[str, Any] = {}
try:
parsed = json.loads(result_text)
if not isinstance(parsed, dict):
raise LLMNodeError(f"Failed to parse structured output: {result_text}")
structured_output = parsed
except json.JSONDecodeError as e:
# if the result_text is not a valid json, try to repair it
parsed = json_repair.loads(result_text)
if not isinstance(parsed, dict):
# handle reasoning models like deepseek-r1 whose output starts with a '<think>\n\n</think>\n' prefix
if isinstance(parsed, list):
parsed = next((item for item in parsed if isinstance(item, dict)), {})
else:
raise LLMNodeError(f"Failed to parse structured output: {result_text}")
structured_output = parsed
return structured_output
@classmethod
def _extract_variable_selector_to_variable_mapping(
cls,
@ -934,104 +931,6 @@ class LLMNode(BaseNode[LLMNodeData]):
self._file_outputs.append(saved_file)
return saved_file
def _handle_native_json_schema(self, model_parameters: dict, rules: list[ParameterRule]) -> dict:
"""
Handle structured output for models with native JSON schema support.
:param model_parameters: Model parameters to update
:param rules: Model parameter rules
:return: Updated model parameters with JSON schema configuration
"""
# Process schema according to model requirements
schema = self._fetch_structured_output_schema()
schema_json = self._prepare_schema_for_model(schema)
# Set JSON schema in parameters
model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False)
# Set appropriate response format if required by the model
for rule in rules:
if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options:
model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value
return model_parameters
def _handle_prompt_based_schema(self, prompt_messages: Sequence[PromptMessage]) -> list[PromptMessage]:
"""
Handle structured output for models without native JSON schema support.
This function modifies the prompt messages to include schema-based output requirements.
Args:
prompt_messages: Original sequence of prompt messages
Returns:
list[PromptMessage]: Updated prompt messages with structured output requirements
"""
# Convert schema to string format
schema_str = json.dumps(self._fetch_structured_output_schema(), ensure_ascii=False)
# Find existing system prompt with schema placeholder
system_prompt = next(
(prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)),
None,
)
structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str)
# Prepare system prompt content
system_prompt_content = (
structured_output_prompt + "\n\n" + system_prompt.content
if system_prompt and isinstance(system_prompt.content, str)
else structured_output_prompt
)
system_prompt = SystemPromptMessage(content=system_prompt_content)
# Extract content from the last user message
filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)]
updated_prompt = [system_prompt] + filtered_prompts
return updated_prompt
def _set_response_format(self, model_parameters: dict, rules: list) -> None:
"""
Set the appropriate response format parameter based on model rules.
:param model_parameters: Model parameters to update
:param rules: Model parameter rules
"""
for rule in rules:
if rule.name == "response_format":
if ResponseFormat.JSON.value in rule.options:
model_parameters["response_format"] = ResponseFormat.JSON.value
elif ResponseFormat.JSON_OBJECT.value in rule.options:
model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value
def _prepare_schema_for_model(self, schema: dict) -> dict:
"""
Prepare JSON schema based on model requirements.
Different models have different requirements for JSON schema formatting.
This function handles these differences.
:param schema: The original JSON schema
:return: Processed schema compatible with the current model
"""
# Copy to avoid modifying the original schema (note: .copy() is a shallow copy)
processed_schema = schema.copy()
# Convert boolean types to string types (common requirement)
convert_boolean_to_string(processed_schema)
# Apply model-specific transformations
if SpecialModelType.GEMINI in self.node_data.model.name:
remove_additional_properties(processed_schema)
return processed_schema
elif SpecialModelType.OLLAMA in self.node_data.model.provider:
return processed_schema
else:
# Default format with name field
return {"schema": processed_schema, "name": "llm_response"}
def _fetch_model_schema(self, provider: str) -> AIModelEntity | None:
"""
Fetch model schema
@ -1243,49 +1142,3 @@ def _handle_completion_template(
)
prompt_messages.append(prompt_message)
return prompt_messages
def remove_additional_properties(schema: dict) -> None:
"""
Remove additionalProperties fields from JSON schema.
Used for models like Gemini that don't support this property.
:param schema: JSON schema to modify in-place
"""
if not isinstance(schema, dict):
return
# Remove additionalProperties at current level
schema.pop("additionalProperties", None)
# Process nested structures recursively
for value in schema.values():
if isinstance(value, dict):
remove_additional_properties(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
remove_additional_properties(item)
def convert_boolean_to_string(schema: dict) -> None:
"""
Convert boolean type specifications to string in JSON schema.
:param schema: JSON schema to modify in-place
"""
if not isinstance(schema, dict):
return
# Check for boolean type at current level
if schema.get("type") == "boolean":
schema["type"] = "string"
# Process nested dictionaries and lists recursively
for value in schema.values():
if isinstance(value, dict):
convert_boolean_to_string(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
convert_boolean_to_string(item)
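For context on the two helpers removed above, here is a small self-contained sketch of what they do to a sample schema; the sample schema is invented for illustration and is not taken from the repository.
import copy

sample_schema = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "name": {"type": "string"},
        "active": {"type": "boolean"},
    },
    "required": ["name", "active"],
}

schema = copy.deepcopy(sample_schema)
convert_boolean_to_string(schema)     # nested "boolean" types become "string"
remove_additional_properties(schema)  # strips additionalProperties for Gemini-style models
# schema is now:
# {
#     "type": "object",
#     "properties": {
#         "name": {"type": "string"},
#         "active": {"type": "string"},
#     },
#     "required": ["name", "active"],
# }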

View File

@ -167,7 +167,9 @@ class ToolNode(BaseNode[ToolNodeData]):
if tool_input.type == "variable":
variable = variable_pool.get(tool_input.value)
if variable is None:
raise ToolParameterError(f"Variable {tool_input.value} does not exist")
if parameter.required:
raise ToolParameterError(f"Variable {tool_input.value} does not exist")
continue
parameter_value = variable.value
elif tool_input.type in {"mixed", "constant"}:
segment_group = variable_pool.convert_template(str(tool_input.value))

View File

@ -0,0 +1,32 @@
import abc
from collections.abc import Mapping
from typing import Any, Protocol
from sqlalchemy.orm import Session
from core.workflow.nodes.enums import NodeType
class DraftVariableSaver(Protocol):
@abc.abstractmethod
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None):
pass
class DraftVariableSaverFactory(Protocol):
@abc.abstractmethod
def __call__(
self,
session: Session,
app_id: str,
node_id: str,
node_type: NodeType,
node_execution_id: str,
enclosing_node_id: str | None = None,
) -> "DraftVariableSaver":
pass
class NoopDraftVariableSaver(DraftVariableSaver):
def save(self, process_data: Mapping[str, Any] | None, outputs: Mapping[str, Any] | None):
pass
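As a usage sketch only: a concrete factory satisfying DraftVariableSaverFactory can be as small as the class below, which always returns the no-op saver. The class name and the choice to fall back to the no-op implementation are assumptions made for illustration, not the wiring actually used by the application.
class AlwaysNoopDraftVariableSaverFactory:
    """Hypothetical factory satisfying the DraftVariableSaverFactory protocol."""

    def __call__(
        self,
        session: Session,
        app_id: str,
        node_id: str,
        node_type: NodeType,
        node_execution_id: str,
        enclosing_node_id: str | None = None,
    ) -> DraftVariableSaver:
        # Ignore all arguments and persist nothing (illustrative behavior only).
        return NoopDraftVariableSaver()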

View File

@ -1,16 +0,0 @@
from enum import StrEnum
class ResponseFormat(StrEnum):
"""Constants for model response formats"""
JSON_SCHEMA = "json_schema" # model's structured output mode; some models, such as gemini and gpt-4o, support this mode.
JSON = "JSON" # model's json mode; some models, such as claude, support this mode.
JSON_OBJECT = "json_object" # another alias for json mode; some models, such as deepseek-chat and qwen, use this alias.
class SpecialModelType(StrEnum):
"""Constants for identifying model types"""
GEMINI = "gemini"
OLLAMA = "ollama"

View File

@ -1,17 +0,0 @@
STRUCTURED_OUTPUT_PROMPT = """Youre a helpful AI assistant. You could answer questions and output in JSON format.
constraints:
- You must output in JSON format.
- Do not output boolean value, use string type instead.
- Do not output integer or float value, use number type instead.
eg:
Here is the JSON schema:
{"additionalProperties": false, "properties": {"age": {"type": "number"}, "name": {"type": "string"}}, "required": ["name", "age"], "type": "object"}
Here is the user's question:
My name is John Doe and I am 30 years old.
output:
{"name": "John Doe", "age": 30}
Here is the JSON schema:
{{schema}}
""" # noqa: E501

View File

@ -7,6 +7,7 @@ def append_variables_recursively(
):
"""
Append variables recursively
:param pool: variable pool to append variables to
:param node_id: node id
:param variable_key_list: variable key list
:param variable_value: variable value

View File

@ -27,6 +27,7 @@ from core.workflow.enums import SystemVariableKey
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_entry import WorkflowEntry
from libs.datetime_utils import naive_utc_now
@dataclass
@ -160,12 +161,13 @@ class WorkflowCycleManager:
exceptions_count: int = 0,
) -> WorkflowExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id)
now = naive_utc_now()
workflow_execution.status = WorkflowExecutionStatus(status.value)
workflow_execution.error_message = error_message
workflow_execution.total_tokens = total_tokens
workflow_execution.total_steps = total_steps
workflow_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_execution.finished_at = now
workflow_execution.exceptions_count = exceptions_count
# Use the instance repository to find running executions for a workflow run
@ -174,7 +176,6 @@ class WorkflowCycleManager:
)
# Update the domain models
now = datetime.now(UTC).replace(tzinfo=None)
for node_execution in running_node_executions:
if node_execution.node_execution_id:
# Update the domain model
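The naive_utc_now helper imported from libs.datetime_utils is not shown in this diff; judging from the expression it replaces, it presumably reduces to something like the sketch below (an assumption, not the repository's actual implementation).
from datetime import UTC, datetime

def naive_utc_now() -> datetime:
    # Current UTC time with tzinfo dropped, matching the
    # datetime.now(UTC).replace(tzinfo=None) expression it replaces.
    return datetime.now(UTC).replace(tzinfo=None)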

View File

@ -300,7 +300,7 @@ class WorkflowEntry:
return node_instance, generator
except Exception as e:
logger.exception(
"error while running node_instance, workflow_id=%s, node_id=%s, type=%s, version=%s",
"error while running node_instance, node_id=%s, type=%s, version=%s",
node_instance.id,
node_instance.node_type,
node_instance.version(),