mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 09:28:04 +08:00
Merge remote-tracking branch 'origin/feat/r2' into feat/r2
@@ -1,4 +1,4 @@
 SYSTEM_VARIABLE_NODE_ID = "sys"
 ENVIRONMENT_VARIABLE_NODE_ID = "env"
 CONVERSATION_VARIABLE_NODE_ID = "conversation"
-PIPELINE_VARIABLE_NODE_ID = "pipeline"
+PIPELINE_VARIABLE_NODE_ID = "pipeline"
@@ -1,13 +0,0 @@
-from typing import Optional
-
-from pydantic import BaseModel
-
-from core.workflow.graph_engine.entities.graph import GraphParallel
-
-
-class NextGraphNode(BaseModel):
-    node_id: str
-    """next node id"""
-
-    parallel: Optional[GraphParallel] = None
-    """parallel"""
@@ -7,8 +7,8 @@ from core.agent.plugin_entities import AgentStrategyParameter
 from core.memory.token_buffer_memory import TokenBufferMemory
 from core.model_manager import ModelInstance, ModelManager
 from core.model_runtime.entities.model_entities import AIModelEntity, ModelType
-from core.plugin.manager.exc import PluginDaemonClientSideError
-from core.plugin.manager.plugin import PluginInstallationManager
+from core.plugin.impl.exc import PluginDaemonClientSideError
+from core.plugin.impl.plugin import PluginInstaller
 from core.provider_manager import ProviderManager
 from core.tools.entities.tool_entities import ToolParameter, ToolProviderType
 from core.tools.tool_manager import ToolManager
@@ -16,7 +16,7 @@ from core.variables.segments import StringSegment
 from core.workflow.entities.node_entities import NodeRunResult
 from core.workflow.entities.variable_pool import VariablePool
 from core.workflow.enums import SystemVariableKey
-from core.workflow.nodes.agent.entities import AgentNodeData, ParamsAutoGenerated
+from core.workflow.nodes.agent.entities import AgentNodeData, AgentOldVersionModelFeatures, ParamsAutoGenerated
 from core.workflow.nodes.base.entities import BaseNodeData
 from core.workflow.nodes.enums import NodeType
 from core.workflow.nodes.event.event import RunCompletedEvent
@@ -251,7 +251,12 @@ class AgentNode(ToolNode):
                     prompt_message.model_dump(mode="json") for prompt_message in prompt_messages
                 ]
                 value["history_prompt_messages"] = history_prompt_messages
-                value["entity"] = model_schema.model_dump(mode="json") if model_schema else None
+                if model_schema:
+                    # remove structured output feature to support old version agent plugin
+                    model_schema = self._remove_unsupported_model_features_for_old_version(model_schema)
+                    value["entity"] = model_schema.model_dump(mode="json")
+                else:
+                    value["entity"] = None
             result[parameter_name] = value

         return result
@@ -292,7 +297,7 @@ class AgentNode(ToolNode):
         Get agent strategy icon
         :return:
         """
-        manager = PluginInstallationManager()
+        manager = PluginInstaller()
         plugins = manager.list_plugins(self.tenant_id)
         try:
             current_plugin = next(
@@ -348,3 +353,10 @@ class AgentNode(ToolNode):
         )
         model_schema = model_type_instance.get_model_schema(model_name, model_credentials)
         return model_instance, model_schema
+
+    def _remove_unsupported_model_features_for_old_version(self, model_schema: AIModelEntity) -> AIModelEntity:
+        if model_schema.features:
+            for feature in model_schema.features:
+                if feature.value not in AgentOldVersionModelFeatures:
+                    model_schema.features.remove(feature)
+        return model_schema
@@ -24,3 +24,18 @@ class AgentNodeData(BaseNodeData):
 class ParamsAutoGenerated(Enum):
     CLOSE = 0
     OPEN = 1
+
+
+class AgentOldVersionModelFeatures(Enum):
+    """
+    Enum class for old SDK version llm feature.
+    """
+
+    TOOL_CALL = "tool-call"
+    MULTI_TOOL_CALL = "multi-tool-call"
+    AGENT_THOUGHT = "agent-thought"
+    VISION = "vision"
+    STREAM_TOOL_CALL = "stream-tool-call"
+    DOCUMENT = "document"
+    VIDEO = "video"
+    AUDIO = "audio"
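Note: the new _remove_unsupported_model_features_for_old_version (previous hunk) calls list.remove while iterating the same list, which in Python can skip the element that follows each removal. A hedged sketch of an equivalent filter that works on a snapshot instead; the ModelFeature stand-in and function name are illustrative, not the project's code:

    from enum import Enum

    class ModelFeature(Enum):  # stand-in for the runtime's feature enum
        VISION = "vision"
        STRUCTURED_OUTPUT = "structured-output"

    class AgentOldVersionModelFeatures(Enum):
        TOOL_CALL = "tool-call"
        VISION = "vision"

    def keep_old_sdk_features(features: list[ModelFeature]) -> list[ModelFeature]:
        # Building a new list avoids mutating `features` mid-iteration
        allowed = {m.value for m in AgentOldVersionModelFeatures}
        return [f for f in features if f.value in allowed]

    assert keep_old_sdk_features([ModelFeature.VISION, ModelFeature.STRUCTURED_OUTPUT]) == [ModelFeature.VISION]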
@@ -155,9 +155,28 @@ class AnswerStreamProcessor(StreamProcessor):
         for answer_node_id, route_position in self.route_position.items():
             if answer_node_id not in self.rest_node_ids:
                 continue
-            # exclude current node id
+            # Remove current node id from answer dependencies to support stream output if it is a success branch
             answer_dependencies = self.generate_routes.answer_dependencies
-            if event.node_id in answer_dependencies[answer_node_id]:
+            edge_mapping = self.graph.edge_mapping.get(event.node_id)
+            success_edge = (
+                next(
+                    (
+                        edge
+                        for edge in edge_mapping
+                        if edge.run_condition
+                        and edge.run_condition.type == "branch_identify"
+                        and edge.run_condition.branch_identify == "success-branch"
+                    ),
+                    None,
+                )
+                if edge_mapping
+                else None
+            )
+            if (
+                event.node_id in answer_dependencies[answer_node_id]
+                and success_edge
+                and success_edge.target_node_id == answer_node_id
+            ):
                 answer_dependencies[answer_node_id].remove(event.node_id)
             answer_dependencies_ids = answer_dependencies.get(answer_node_id, [])
             # all depends on answer node id not in rest node ids
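Note: with this change, event.node_id is removed from an answer node's dependencies only when the finished node has an outgoing "success-branch" edge pointing at that answer node, so streaming can begin on the success path. A hedged, self-contained sketch of the next(...) edge lookup used above; the dataclasses are stand-ins, not the project's graph entities:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class RunCondition:
        type: str
        branch_identify: Optional[str] = None

    @dataclass
    class Edge:
        target_node_id: str
        run_condition: Optional[RunCondition] = None

    def find_success_edge(edges: Optional[list[Edge]]) -> Optional[Edge]:
        # Returns the first success-branch edge, or None when the node
        # has no outgoing edges or no matching run condition.
        if not edges:
            return None
        return next(
            (
                e
                for e in edges
                if e.run_condition
                and e.run_condition.type == "branch_identify"
                and e.run_condition.branch_identify == "success-branch"
            ),
            None,
        )

    edges = [Edge("answer", RunCondition("branch_identify", "success-branch"))]
    assert find_success_edge(edges).target_node_id == "answer"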
@@ -90,6 +90,7 @@ class HttpRequestNodeData(BaseNodeData):
     params: str
     body: Optional[HttpRequestNodeBody] = None
     timeout: Optional[HttpRequestNodeTimeout] = None
+    ssl_verify: Optional[bool] = dify_config.HTTP_REQUEST_NODE_SSL_VERIFY


 class Response:
@@ -1,3 +1,4 @@
+import base64
 import json
 from collections.abc import Mapping
 from copy import deepcopy
@@ -87,6 +88,7 @@ class Executor:
         self.method = node_data.method
         self.auth = node_data.authorization
         self.timeout = timeout
+        self.ssl_verify = node_data.ssl_verify
         self.params = []
         self.headers = {}
         self.content = None
@@ -259,7 +261,9 @@ class Executor:
             if self.auth.config.type == "bearer":
                 headers[authorization.config.header] = f"Bearer {authorization.config.api_key}"
             elif self.auth.config.type == "basic":
-                headers[authorization.config.header] = f"Basic {authorization.config.api_key}"
+                credentials = authorization.config.api_key
+                encoded_credentials = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
+                headers[authorization.config.header] = f"Basic {encoded_credentials}"
             elif self.auth.config.type == "custom":
                 headers[authorization.config.header] = authorization.config.api_key or ""

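Note: this is the RFC 7617 fix; the Basic scheme carries base64("user-id:password"), not the raw credential string the old line sent. A minimal sketch:

    import base64

    def basic_auth_header(credentials: str) -> str:
        # base64-encode "user:password" per RFC 7617
        token = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
        return f"Basic {token}"

    assert basic_auth_header("alice:secret") == "Basic YWxpY2U6c2VjcmV0"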
@@ -313,6 +317,7 @@ class Executor:
             "headers": headers,
             "params": self.params,
             "timeout": (self.timeout.connect, self.timeout.read, self.timeout.write),
+            "ssl_verify": self.ssl_verify,
             "follow_redirects": True,
             "max_retries": self.max_retries,
         }
@@ -51,6 +51,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]):
                     "max_read_timeout": dify_config.HTTP_REQUEST_MAX_READ_TIMEOUT,
                     "max_write_timeout": dify_config.HTTP_REQUEST_MAX_WRITE_TIMEOUT,
                 },
+                "ssl_verify": dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
             },
             "retry_config": {
                 "max_retries": dify_config.SSRF_DEFAULT_MAX_RETRIES,
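Note: ssl_verify now flows from dify_config.HTTP_REQUEST_NODE_SSL_VERIFY through HttpRequestNodeData into the request kwargs that Executor assembles. A hedged sketch of how such a flag typically reaches an httpx client; Dify actually sends requests through its ssrf_proxy helpers, so the real call site differs:

    import httpx

    def fetch(url: str, ssl_verify: bool = True) -> int:
        # verify=False disables TLS certificate checks for this client
        with httpx.Client(verify=ssl_verify) as client:
            return client.get(url).status_code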
@@ -259,6 +259,7 @@ class KnowledgeRetrievalNode(LLMNode):
                     "_source": "knowledge",
                     "dataset_id": item.metadata.get("dataset_id"),
                     "dataset_name": item.metadata.get("dataset_name"),
+                    "document_id": item.metadata.get("document_id") or item.metadata.get("title"),
                     "document_name": item.metadata.get("title"),
                     "data_source_type": "external",
                     "retriever_from": "workflow",
@@ -348,7 +349,9 @@ class KnowledgeRetrievalNode(LLMNode):
                     )
                 )
             metadata_condition = MetadataCondition(
-                logical_operator=node_data.metadata_filtering_conditions.logical_operator,  # type: ignore
+                logical_operator=node_data.metadata_filtering_conditions.logical_operator
+                if node_data.metadata_filtering_conditions
+                else "or",  # type: ignore
                 conditions=conditions,
             )
         elif node_data.metadata_filtering_mode == "manual":
@@ -379,7 +382,10 @@ class KnowledgeRetrievalNode(LLMNode):
         else:
             raise ValueError("Invalid metadata filtering mode")
         if filters:
-            if node_data.metadata_filtering_conditions.logical_operator == "and":  # type: ignore
+            if (
+                node_data.metadata_filtering_conditions
+                and node_data.metadata_filtering_conditions.logical_operator == "and"
+            ):  # type: ignore
                 document_query = document_query.filter(and_(*filters))
             else:
                 document_query = document_query.filter(or_(*filters))
@@ -596,7 +602,6 @@ class KnowledgeRetrievalNode(LLMNode):
     def _get_prompt_template(self, node_data: KnowledgeRetrievalNodeData, metadata_fields: list, query: str):
         model_mode = ModelMode.value_of(node_data.metadata_model_config.mode)  # type: ignore
         input_text = query
-        memory_str = ""

         prompt_messages: list[LLMNodeChatModelMessage] = []
         if model_mode == ModelMode.CHAT:
@@ -149,7 +149,10 @@ class ListOperatorNode(BaseNode[ListOperatorNodeData]):
     def _extract_slice(
         self, variable: Union[ArrayFileSegment, ArrayNumberSegment, ArrayStringSegment]
     ) -> Union[ArrayFileSegment, ArrayNumberSegment, ArrayStringSegment]:
-        value = int(self.graph_runtime_state.variable_pool.convert_template(self.node_data.extract_by.serial).text) - 1
+        value = int(self.graph_runtime_state.variable_pool.convert_template(self.node_data.extract_by.serial).text)
+        if value < 1:
+            raise ValueError(f"Invalid serial index: must be >= 1, got {value}")
+        value -= 1
         if len(variable.value) > int(value):
             result = variable.value[value]
         else:
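Note: before this fix, a serial of 0 became index -1, which Python resolves to the last element instead of failing; the new guard rejects anything below 1 before the 1-based to 0-based conversion. A minimal sketch of the corrected behaviour:

    def extract_one_based(items: list, serial: int):
        if serial < 1:
            raise ValueError(f"Invalid serial index: must be >= 1, got {serial}")
        index = serial - 1
        return items[index] if index < len(items) else None

    assert extract_one_based(["a", "b", "c"], 1) == "a"
    assert ["a", "b", "c"][0 - 1] == "c"  # the old silent off-by-one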
@@ -65,6 +65,8 @@ class LLMNodeData(BaseNodeData):
     memory: Optional[MemoryConfig] = None
     context: ContextConfig
     vision: VisionConfig = Field(default_factory=VisionConfig)
+    structured_output: dict | None = None
+    structured_output_enabled: bool = False

     @field_validator("prompt_config", mode="before")
     @classmethod
@@ -4,6 +4,8 @@ from collections.abc import Generator, Mapping, Sequence
 from datetime import UTC, datetime
 from typing import TYPE_CHECKING, Any, Optional, cast

+import json_repair
+
 from configs import dify_config
 from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
 from core.entities.model_entities import ModelStatus
@@ -22,14 +24,21 @@ from core.model_runtime.entities import (
 from core.model_runtime.entities.llm_entities import LLMResult, LLMUsage
 from core.model_runtime.entities.message_entities import (
     AssistantPromptMessage,
-    PromptMessageContent,
+    PromptMessageContentUnionTypes,
     PromptMessageRole,
     SystemPromptMessage,
     UserPromptMessage,
 )
-from core.model_runtime.entities.model_entities import ModelFeature, ModelPropertyKey, ModelType
+from core.model_runtime.entities.model_entities import (
+    AIModelEntity,
+    ModelFeature,
+    ModelPropertyKey,
+    ModelType,
+    ParameterRule,
+)
 from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
 from core.model_runtime.utils.encoders import jsonable_encoder
+from core.model_runtime.utils.helper import convert_llm_result_chunk_to_str
 from core.plugin.entities.plugin import ModelProviderID
 from core.prompt.entities.advanced_prompt_entities import CompletionModelPromptTemplate, MemoryConfig
 from core.prompt.utils.prompt_message_util import PromptMessageUtil
@@ -57,6 +66,12 @@ from core.workflow.nodes.event import (
     RunRetrieverResourceEvent,
     RunStreamChunkEvent,
 )
+from core.workflow.utils.structured_output.entities import (
+    ResponseFormat,
+    SpecialModelType,
+    SupportStructuredOutputStatus,
+)
+from core.workflow.utils.structured_output.prompt import STRUCTURED_OUTPUT_PROMPT
 from core.workflow.utils.variable_template_parser import VariableTemplateParser
 from extensions.ext_database import db
 from models.model import Conversation
@@ -92,6 +107,12 @@ class LLMNode(BaseNode[LLMNodeData]):
     _node_type = NodeType.LLM

     def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
+        def process_structured_output(text: str) -> Optional[dict[str, Any] | list[Any]]:
+            """Process structured output if enabled"""
+            if not self.node_data.structured_output_enabled or not self.node_data.structured_output:
+                return None
+            return self._parse_structured_output(text)
+
         node_inputs: Optional[dict[str, Any]] = None
         process_data = None
         result_text = ""
@@ -130,7 +151,6 @@ class LLMNode(BaseNode[LLMNodeData]):
             if isinstance(event, RunRetrieverResourceEvent):
                 context = event.context
                 yield event
-
             if context:
                 node_inputs["#context#"] = context

@@ -192,7 +212,9 @@ class LLMNode(BaseNode[LLMNodeData]):
                     self.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage)
                     break
             outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason}
-
+            structured_output = process_structured_output(result_text)
+            if structured_output:
+                outputs["structured_output"] = structured_output
             yield RunCompletedEvent(
                 run_result=NodeRunResult(
                     status=WorkflowNodeExecutionStatus.SUCCEEDED,
@@ -248,18 +270,7 @@ class LLMNode(BaseNode[LLMNodeData]):

     def _handle_invoke_result(self, invoke_result: LLMResult | Generator) -> Generator[NodeEvent, None, None]:
         if isinstance(invoke_result, LLMResult):
-            content = invoke_result.message.content
-            if content is None:
-                message_text = ""
-            elif isinstance(content, str):
-                message_text = content
-            elif isinstance(content, list):
-                # Assuming the list contains PromptMessageContent objects with a "data" attribute
-                message_text = "".join(
-                    item.data if hasattr(item, "data") and isinstance(item.data, str) else str(item) for item in content
-                )
-            else:
-                message_text = str(content)
+            message_text = convert_llm_result_chunk_to_str(invoke_result.message.content)

             yield ModelInvokeCompletedEvent(
                 text=message_text,
@@ -274,7 +285,7 @@ class LLMNode(BaseNode[LLMNodeData]):
         usage = None
         finish_reason = None
         for result in invoke_result:
-            text = result.delta.message.content
+            text = convert_llm_result_chunk_to_str(result.delta.message.content)
            full_text += text

             yield RunStreamChunkEvent(chunk_content=text, from_variable_selector=[self.node_id, "text"])
@@ -513,7 +524,12 @@ class LLMNode(BaseNode[LLMNodeData]):

         if not model_schema:
             raise ModelNotExistError(f"Model {model_name} not exist.")
-
+        support_structured_output = self._check_model_structured_output_support()
+        if support_structured_output == SupportStructuredOutputStatus.SUPPORTED:
+            completion_params = self._handle_native_json_schema(completion_params, model_schema.parameter_rules)
+        elif support_structured_output == SupportStructuredOutputStatus.UNSUPPORTED:
+            # Set appropriate response format based on model capabilities
+            self._set_response_format(completion_params, model_schema.parameter_rules)
         return model_instance, ModelConfigWithCredentialsEntity(
             provider=provider_name,
             model=model_name,
@@ -568,8 +584,7 @@ class LLMNode(BaseNode[LLMNodeData]):
         variable_pool: VariablePool,
         jinja2_variables: Sequence[VariableSelector],
     ) -> tuple[Sequence[PromptMessage], Optional[Sequence[str]]]:
-        # FIXME: fix the type error cause prompt_messages is type quick a few times
-        prompt_messages: list[Any] = []
+        prompt_messages: list[PromptMessage] = []

         if isinstance(prompt_template, list):
             # For chat model
@@ -631,12 +646,14 @@ class LLMNode(BaseNode[LLMNodeData]):
             # For issue #11247 - Check if prompt content is a string or a list
             prompt_content_type = type(prompt_content)
             if prompt_content_type == str:
+                prompt_content = str(prompt_content)
                 if "#histories#" in prompt_content:
                     prompt_content = prompt_content.replace("#histories#", memory_text)
                 else:
                     prompt_content = memory_text + "\n" + prompt_content
                 prompt_messages[0].content = prompt_content
             elif prompt_content_type == list:
+                prompt_content = prompt_content if isinstance(prompt_content, list) else []
                 for content_item in prompt_content:
                     if content_item.type == PromptMessageContentType.TEXT:
                         if "#histories#" in content_item.data:
@@ -649,9 +666,10 @@ class LLMNode(BaseNode[LLMNodeData]):
         # Add current query to the prompt message
         if sys_query:
             if prompt_content_type == str:
-                prompt_content = prompt_messages[0].content.replace("#sys.query#", sys_query)
+                prompt_content = str(prompt_messages[0].content).replace("#sys.query#", sys_query)
                 prompt_messages[0].content = prompt_content
             elif prompt_content_type == list:
+                prompt_content = prompt_content if isinstance(prompt_content, list) else []
                 for content_item in prompt_content:
                     if content_item.type == PromptMessageContentType.TEXT:
                         content_item.data = sys_query + "\n" + content_item.data
@@ -681,7 +699,7 @@ class LLMNode(BaseNode[LLMNodeData]):
         filtered_prompt_messages = []
         for prompt_message in prompt_messages:
             if isinstance(prompt_message.content, list):
-                prompt_message_content = []
+                prompt_message_content: list[PromptMessageContentUnionTypes] = []
                 for content_item in prompt_message.content:
                     # Skip content if features are not defined
                     if not model_config.model_schema.features:
@@ -724,10 +742,29 @@ class LLMNode(BaseNode[LLMNodeData]):
                 "No prompt found in the LLM configuration. "
                 "Please ensure a prompt is properly configured before proceeding."
             )
-
+        support_structured_output = self._check_model_structured_output_support()
+        if support_structured_output == SupportStructuredOutputStatus.UNSUPPORTED:
+            filtered_prompt_messages = self._handle_prompt_based_schema(
+                prompt_messages=filtered_prompt_messages,
+            )
         stop = model_config.stop
         return filtered_prompt_messages, stop

+    def _parse_structured_output(self, result_text: str) -> dict[str, Any] | list[Any]:
+        structured_output: dict[str, Any] | list[Any] = {}
+        try:
+            parsed = json.loads(result_text)
+            if not isinstance(parsed, (dict | list)):
+                raise LLMNodeError(f"Failed to parse structured output: {result_text}")
+            structured_output = parsed
+        except json.JSONDecodeError as e:
+            # if the result_text is not a valid json, try to repair it
+            parsed = json_repair.loads(result_text)
+            if not isinstance(parsed, (dict | list)):
+                raise LLMNodeError(f"Failed to parse structured output: {result_text}")
+            structured_output = parsed
+        return structured_output
+
     @classmethod
     def deduct_llm_quota(cls, tenant_id: str, model_instance: ModelInstance, usage: LLMUsage) -> None:
         provider_model_bundle = model_instance.provider_model_bundle
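Note: _parse_structured_output tries strict json.loads first and falls back to the third-party json_repair package (imported earlier in this diff) only on JSONDecodeError. A hedged sketch of the same parse-then-repair pattern:

    import json
    import json_repair

    def parse_structured(text: str):
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # best-effort repair of truncated or slightly malformed output
            return json_repair.loads(text)

    print(parse_structured('{"name": "John", "age": 30'))  # missing brace gets repaired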
@@ -926,8 +963,170 @@ class LLMNode(BaseNode[LLMNodeData]):

         return prompt_messages

+    def _handle_native_json_schema(self, model_parameters: dict, rules: list[ParameterRule]) -> dict:
+        """
+        Handle structured output for models with native JSON schema support.
+
+        :param model_parameters: Model parameters to update
+        :param rules: Model parameter rules
+        :return: Updated model parameters with JSON schema configuration
+        """
+        # Process schema according to model requirements
+        schema = self._fetch_structured_output_schema()
+        schema_json = self._prepare_schema_for_model(schema)
+
+        # Set JSON schema in parameters
+        model_parameters["json_schema"] = json.dumps(schema_json, ensure_ascii=False)
+
+        # Set appropriate response format if required by the model
+        for rule in rules:
+            if rule.name == "response_format" and ResponseFormat.JSON_SCHEMA.value in rule.options:
+                model_parameters["response_format"] = ResponseFormat.JSON_SCHEMA.value
+
+        return model_parameters
+
+    def _handle_prompt_based_schema(self, prompt_messages: Sequence[PromptMessage]) -> list[PromptMessage]:
+        """
+        Handle structured output for models without native JSON schema support.
+        This function modifies the prompt messages to include schema-based output requirements.
+
+        Args:
+            prompt_messages: Original sequence of prompt messages
+
+        Returns:
+            list[PromptMessage]: Updated prompt messages with structured output requirements
+        """
+        # Convert schema to string format
+        schema_str = json.dumps(self._fetch_structured_output_schema(), ensure_ascii=False)
+
+        # Find existing system prompt with schema placeholder
+        system_prompt = next(
+            (prompt for prompt in prompt_messages if isinstance(prompt, SystemPromptMessage)),
+            None,
+        )
+        structured_output_prompt = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", schema_str)
+        # Prepare system prompt content
+        system_prompt_content = (
+            structured_output_prompt + "\n\n" + system_prompt.content
+            if system_prompt and isinstance(system_prompt.content, str)
+            else structured_output_prompt
+        )
+        system_prompt = SystemPromptMessage(content=system_prompt_content)
+
+        # Extract content from the last user message
+
+        filtered_prompts = [prompt for prompt in prompt_messages if not isinstance(prompt, SystemPromptMessage)]
+        updated_prompt = [system_prompt] + filtered_prompts
+
+        return updated_prompt
+
+    def _set_response_format(self, model_parameters: dict, rules: list) -> None:
+        """
+        Set the appropriate response format parameter based on model rules.
+
+        :param model_parameters: Model parameters to update
+        :param rules: Model parameter rules
+        """
+        for rule in rules:
+            if rule.name == "response_format":
+                if ResponseFormat.JSON.value in rule.options:
+                    model_parameters["response_format"] = ResponseFormat.JSON.value
+                elif ResponseFormat.JSON_OBJECT.value in rule.options:
+                    model_parameters["response_format"] = ResponseFormat.JSON_OBJECT.value
+
+    def _prepare_schema_for_model(self, schema: dict) -> dict:
+        """
+        Prepare JSON schema based on model requirements.
+
+        Different models have different requirements for JSON schema formatting.
+        This function handles these differences.
+
+        :param schema: The original JSON schema
+        :return: Processed schema compatible with the current model
+        """
+
+        # Deep copy to avoid modifying the original schema
+        processed_schema = schema.copy()
+
+        # Convert boolean types to string types (common requirement)
+        convert_boolean_to_string(processed_schema)
+
+        # Apply model-specific transformations
+        if SpecialModelType.GEMINI in self.node_data.model.name:
+            remove_additional_properties(processed_schema)
+            return processed_schema
+        elif SpecialModelType.OLLAMA in self.node_data.model.provider:
+            return processed_schema
+        else:
+            # Default format with name field
+            return {"schema": processed_schema, "name": "llm_response"}
+
+    def _fetch_model_schema(self, provider: str) -> AIModelEntity | None:
+        """
+        Fetch model schema
+        """
+        model_name = self.node_data.model.name
+        model_manager = ModelManager()
+        model_instance = model_manager.get_model_instance(
+            tenant_id=self.tenant_id, model_type=ModelType.LLM, provider=provider, model=model_name
+        )
+        model_type_instance = model_instance.model_type_instance
+        model_type_instance = cast(LargeLanguageModel, model_type_instance)
+        model_credentials = model_instance.credentials
+        model_schema = model_type_instance.get_model_schema(model_name, model_credentials)
+        return model_schema
+
+    def _fetch_structured_output_schema(self) -> dict[str, Any]:
+        """
+        Fetch the structured output schema from the node data.
+
+        Returns:
+            dict[str, Any]: The structured output schema
+        """
+        if not self.node_data.structured_output:
+            raise LLMNodeError("Please provide a valid structured output schema")
+        structured_output_schema = json.dumps(self.node_data.structured_output.get("schema", {}), ensure_ascii=False)
+        if not structured_output_schema:
+            raise LLMNodeError("Please provide a valid structured output schema")
+
+        try:
+            schema = json.loads(structured_output_schema)
+            if not isinstance(schema, dict):
+                raise LLMNodeError("structured_output_schema must be a JSON object")
+            return schema
+        except json.JSONDecodeError:
+            raise LLMNodeError("structured_output_schema is not valid JSON format")
+
+    def _check_model_structured_output_support(self) -> SupportStructuredOutputStatus:
+        """
+        Check if the current model supports structured output.
+
+        Returns:
+            SupportStructuredOutput: The support status of structured output
+        """
+        # Early return if structured output is disabled
+        if (
+            not isinstance(self.node_data, LLMNodeData)
+            or not self.node_data.structured_output_enabled
+            or not self.node_data.structured_output
+        ):
+            return SupportStructuredOutputStatus.DISABLED
+        # Get model schema and check if it exists
+        model_schema = self._fetch_model_schema(self.node_data.model.provider)
+        if not model_schema:
+            return SupportStructuredOutputStatus.DISABLED
+
+        # Check if model supports structured output feature
+        return (
+            SupportStructuredOutputStatus.SUPPORTED
+            if bool(model_schema.features and ModelFeature.STRUCTURED_OUTPUT in model_schema.features)
+            else SupportStructuredOutputStatus.UNSUPPORTED
+        )
+

-def _combine_message_content_with_role(*, contents: Sequence[PromptMessageContent], role: PromptMessageRole):
+def _combine_message_content_with_role(
+    *, contents: Optional[str | list[PromptMessageContentUnionTypes]] = None, role: PromptMessageRole
+):
     match role:
         case PromptMessageRole.USER:
             return UserPromptMessage(content=contents)
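Note: _prepare_schema_for_model tests SpecialModelType.GEMINI in self.node_data.model.name. This works because StrEnum members (see the new entities.py below) are real str instances, so `in` performs a plain substring match against the model name:

    from enum import StrEnum

    class SpecialModelType(StrEnum):
        GEMINI = "gemini"
        OLLAMA = "ollama"

    # StrEnum members behave as strings, so this is substring matching
    assert SpecialModelType.GEMINI in "gemini-1.5-pro"
    assert SpecialModelType.GEMINI not in "gpt-4o"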
@@ -1064,3 +1263,49 @@ def _handle_completion_template(
         )
         prompt_messages.append(prompt_message)
     return prompt_messages
+
+
+def remove_additional_properties(schema: dict) -> None:
+    """
+    Remove additionalProperties fields from JSON schema.
+    Used for models like Gemini that don't support this property.
+
+    :param schema: JSON schema to modify in-place
+    """
+    if not isinstance(schema, dict):
+        return
+
+    # Remove additionalProperties at current level
+    schema.pop("additionalProperties", None)
+
+    # Process nested structures recursively
+    for value in schema.values():
+        if isinstance(value, dict):
+            remove_additional_properties(value)
+        elif isinstance(value, list):
+            for item in value:
+                if isinstance(item, dict):
+                    remove_additional_properties(item)
+
+
+def convert_boolean_to_string(schema: dict) -> None:
+    """
+    Convert boolean type specifications to string in JSON schema.
+
+    :param schema: JSON schema to modify in-place
+    """
+    if not isinstance(schema, dict):
+        return
+
+    # Check for boolean type at current level
+    if schema.get("type") == "boolean":
+        schema["type"] = "string"
+
+    # Process nested dictionaries and lists recursively
+    for value in schema.values():
+        if isinstance(value, dict):
+            convert_boolean_to_string(value)
+        elif isinstance(value, list):
+            for item in value:
+                if isinstance(item, dict):
+                    convert_boolean_to_string(item)
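A hedged usage sketch of the two new module-level helpers, assuming remove_additional_properties and convert_boolean_to_string from the hunk above are in scope:

    schema = {
        "type": "object",
        "additionalProperties": False,
        "properties": {"active": {"type": "boolean"}},
    }
    convert_boolean_to_string(schema)     # "boolean" -> "string", recursively
    remove_additional_properties(schema)  # stripped for Gemini-style models
    assert schema == {"type": "object", "properties": {"active": {"type": "string"}}}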
@@ -1,4 +1,5 @@
 import json
+import logging
 import uuid
 from collections.abc import Mapping, Sequence
 from typing import Any, Optional, cast
@@ -58,6 +59,30 @@ from .prompts import (
     FUNCTION_CALLING_EXTRACTOR_USER_TEMPLATE,
 )

+logger = logging.getLogger(__name__)
+
+
+def extract_json(text):
+    """
+    From a given JSON started from '{' or '[' extract the complete JSON object.
+    """
+    stack = []
+    for i, c in enumerate(text):
+        if c in {"{", "["}:
+            stack.append(c)
+        elif c in {"}", "]"}:
+            # check if stack is empty
+            if not stack:
+                return text[:i]
+            # check if the last element in stack is matching
+            if (c == "}" and stack[-1] == "{") or (c == "]" and stack[-1] == "["):
+                stack.pop()
+                if not stack:
+                    return text[: i + 1]
+            else:
+                return text[:i]
+    return None
+
+
 class ParameterExtractorNode(LLMNode):
     """
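Note: extract_json is now module-level (previously it was re-defined inside the JSON-response extraction method) so that _extract_json_from_tool_call further down can reuse it. A hedged usage sketch, assuming the function above is in scope; it returns the shortest balanced JSON prefix, dropping trailing model chatter:

    assert extract_json('{"a": [1, 2]} and some trailing text') == '{"a": [1, 2]}'
    assert extract_json('[1, {"b": 2}] extra') == '[1, {"b": 2}]'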
@@ -161,6 +186,8 @@ class ParameterExtractorNode(LLMNode):
                 "usage": None,
                 "function": {} if not prompt_message_tools else jsonable_encoder(prompt_message_tools[0]),
                 "tool_call": None,
+                "model_provider": model_config.provider,
+                "model_name": model_config.model,
             }

             try:
@@ -594,27 +621,6 @@ class ParameterExtractorNode(LLMNode):
         Extract complete json response.
         """

-        def extract_json(text):
-            """
-            From a given JSON started from '{' or '[' extract the complete JSON object.
-            """
-            stack = []
-            for i, c in enumerate(text):
-                if c in {"{", "["}:
-                    stack.append(c)
-                elif c in {"}", "]"}:
-                    # check if stack is empty
-                    if not stack:
-                        return text[:i]
-                    # check if the last element in stack is matching
-                    if (c == "}" and stack[-1] == "{") or (c == "]" and stack[-1] == "["):
-                        stack.pop()
-                        if not stack:
-                            return text[: i + 1]
-                    else:
-                        return text[:i]
-            return None
-
         # extract json from the text
         for idx in range(len(result)):
             if result[idx] == "{" or result[idx] == "[":
@@ -624,6 +630,7 @@ class ParameterExtractorNode(LLMNode):
                     return cast(dict, json.loads(json_str))
                 except Exception:
                     pass
+        logger.info(f"extra error: {result}")
         return None

     def _extract_json_from_tool_call(self, tool_call: AssistantPromptMessage.ToolCall) -> Optional[dict]:
@@ -633,7 +640,18 @@ class ParameterExtractorNode(LLMNode):
         if not tool_call or not tool_call.function.arguments:
             return None

-        return cast(dict, json.loads(tool_call.function.arguments))
+        result = tool_call.function.arguments
+        # extract json from the arguments
+        for idx in range(len(result)):
+            if result[idx] == "{" or result[idx] == "[":
+                json_str = extract_json(result[idx:])
+                if json_str:
+                    try:
+                        return cast(dict, json.loads(json_str))
+                    except Exception:
+                        pass
+        logger.info(f"extra error: {result}")
+        return None

     def _generate_default_result(self, data: ParameterExtractorNodeData) -> dict:
         """
@@ -130,6 +130,8 @@ class QuestionClassifierNode(LLMNode):
                 ),
                 "usage": jsonable_encoder(usage),
                 "finish_reason": finish_reason,
+                "model_provider": model_config.provider,
+                "model_name": model_config.model,
             }
             outputs = {"class_name": category_name, "class_id": category_id}

@@ -1,21 +1,21 @@
 QUESTION_CLASSIFIER_SYSTEM_PROMPT = """
-### Job Description',
-You are a text classification engine that analyzes text data and assigns categories based on user input or automatically determined categories.
-### Task
-Your task is to assign one categories ONLY to the input text and only one category may be assigned returned in the output. Additionally, you need to extract the key words from the text that are related to the classification.
-### Format
-The input text is in the variable input_text. Categories are specified as a category list with two filed category_id and category_name in the variable categories. Classification instructions may be included to improve the classification accuracy.
-### Constraint
-DO NOT include anything other than the JSON array in your response.
-### Memory
-Here are the chat histories between human and assistant, inside <histories></histories> XML tags.
-<histories>
-{histories}
-</histories>
+### Job Description',
+You are a text classification engine that analyzes text data and assigns categories based on user input or automatically determined categories.
+### Task
+Your task is to assign one categories ONLY to the input text and only one category may be assigned returned in the output. Additionally, you need to extract the key words from the text that are related to the classification.
+### Format
+The input text is in the variable input_text. Categories are specified as a category list with two filed category_id and category_name in the variable categories. Classification instructions may be included to improve the classification accuracy.
+### Constraint
+DO NOT include anything other than the JSON array in your response.
+### Memory
+Here are the chat histories between human and assistant, inside <histories></histories> XML tags.
+<histories>
+{histories}
+</histories>
 """  # noqa: E501

 QUESTION_CLASSIFIER_USER_PROMPT_1 = """
-{ "input_text": ["I recently had a great experience with your company. The service was prompt and the staff was very friendly."],
+{"input_text": ["I recently had a great experience with your company. The service was prompt and the staff was very friendly."],
 "categories": [{"category_id":"f5660049-284f-41a7-b301-fd24176a711c","category_name":"Customer Service"},{"category_id":"8d007d06-f2c9-4be5-8ff6-cd4381c13c60","category_name":"Satisfaction"},{"category_id":"5fbbbb18-9843-466d-9b8e-b9bfbb9482c8","category_name":"Sales"},{"category_id":"23623c75-7184-4a2e-8226-466c2e4631e4","category_name":"Product"}],
 "classification_instructions": ["classify the text based on the feedback provided by customer"]}
 """  # noqa: E501
@@ -43,9 +43,9 @@ QUESTION_CLASSIFIER_ASSISTANT_PROMPT_2 = """
 """

 QUESTION_CLASSIFIER_USER_PROMPT_3 = """
-'{{"input_text": ["{input_text}"],',
-'"categories": {categories}, ',
-'"classification_instructions": ["{classification_instructions}"]}}'
+{{"input_text": ["{input_text}"],
+"categories": {categories},
+"classification_instructions": ["{classification_instructions}"]}}
 """

 QUESTION_CLASSIFIER_COMPLETION_PROMPT = """
@@ -6,8 +6,8 @@ from sqlalchemy.orm import Session

 from core.callback_handler.workflow_tool_callback_handler import DifyWorkflowCallbackHandler
 from core.file import File, FileTransferMethod
-from core.plugin.manager.exc import PluginDaemonClientSideError
-from core.plugin.manager.plugin import PluginInstallationManager
+from core.plugin.impl.exc import PluginDaemonClientSideError
+from core.plugin.impl.plugin import PluginInstaller
 from core.tools.entities.tool_entities import ToolInvokeMessage, ToolParameter
 from core.tools.errors import ToolInvokeError
 from core.tools.tool_engine import ToolEngine
@@ -73,7 +73,7 @@ class ToolNode(BaseNode[ToolNodeData]):
                         metadata={NodeRunMetadataKey.TOOL_INFO: tool_info},
                         error=f"Failed to get tool runtime: {str(e)}",
                         error_type=type(e).__name__,
-                )
+                    )
                 )
             )
             return
@@ -307,7 +307,7 @@ class ToolNode(BaseNode[ToolNodeData]):
             icon = tool_info.get("icon", "")
             dict_metadata = dict(message.message.metadata)
             if dict_metadata.get("provider"):
-                manager = PluginInstallationManager()
+                manager = PluginInstaller()
                 plugins = manager.list_plugins(self.tenant_id)
                 try:
                     current_plugin = next(
api/core/workflow/utils/structured_output/entities.py (new file, 24 lines)
@@ -0,0 +1,24 @@
+from enum import StrEnum
+
+
+class ResponseFormat(StrEnum):
+    """Constants for model response formats"""
+
+    JSON_SCHEMA = "json_schema"  # model's structured output mode. some model like gemini, gpt-4o, support this mode.
+    JSON = "JSON"  # model's json mode. some model like claude support this mode.
+    JSON_OBJECT = "json_object"  # json mode's another alias. some model like deepseek-chat, qwen use this alias.
+
+
+class SpecialModelType(StrEnum):
+    """Constants for identifying model types"""
+
+    GEMINI = "gemini"
+    OLLAMA = "ollama"
+
+
+class SupportStructuredOutputStatus(StrEnum):
+    """Constants for structured output support status"""
+
+    SUPPORTED = "supported"
+    UNSUPPORTED = "unsupported"
+    DISABLED = "disabled"
api/core/workflow/utils/structured_output/prompt.py (new file, 17 lines)
@@ -0,0 +1,17 @@
+STRUCTURED_OUTPUT_PROMPT = """You’re a helpful AI assistant. You could answer questions and output in JSON format.
+constraints:
+    - You must output in JSON format.
+    - Do not output boolean value, use string type instead.
+    - Do not output integer or float value, use number type instead.
+eg:
+    Here is the JSON schema:
+    {"additionalProperties": false, "properties": {"age": {"type": "number"}, "name": {"type": "string"}}, "required": ["name", "age"], "type": "object"}
+
+    Here is the user's question:
+    My name is John Doe and I am 30 years old.
+
+    output:
+    {"name": "John Doe", "age": 30}
+Here is the JSON schema:
+{{schema}}
+"""  # noqa: E501
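Note: the {{schema}} placeholder is filled by plain string replacement in _handle_prompt_based_schema (shown earlier), not by a template engine. A minimal sketch, assuming STRUCTURED_OUTPUT_PROMPT is imported from the new module:

    import json

    schema = {"type": "object", "properties": {"name": {"type": "string"}}}
    filled = STRUCTURED_OUTPUT_PROMPT.replace("{{schema}}", json.dumps(schema, ensure_ascii=False))
    assert '"name"' in filled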