feat: implement file structured output

This commit is contained in:
Stream
2026-02-05 00:10:51 +08:00
parent 10fb482351
commit 15c0011897
7 changed files with 492 additions and 680 deletions

View File

@ -1,188 +1,203 @@
"""
File reference detection and conversion for structured output.
File path detection and conversion for structured output.
This module provides utilities to:
1. Detect file reference fields in JSON Schema (format: "dify-file-ref")
2. Convert file ID strings to File objects after LLM returns
1. Detect sandbox file path fields in JSON Schema (format: "file-path")
2. Adapt schemas to add file-path descriptions before model invocation
3. Convert sandbox file path strings into File objects via a resolver
"""
import uuid
from collections.abc import Mapping
from typing import Any
from collections.abc import Callable, Mapping, Sequence
from typing import Any, cast
from core.file import File
from core.variables.segments import ArrayFileSegment, FileSegment
from factories.file_factory import build_from_mapping
FILE_REF_FORMAT = "dify-file-ref"
FILE_PATH_FORMAT = "file-path"
FILE_PATH_DESCRIPTION_SUFFIX = "this field contains a file path from the Dify sandbox"
def is_file_ref_property(schema: dict) -> bool:
    """Return True when *schema* describes a Dify file-reference string field."""
    if schema.get("type") != "string":
        return False
    return schema.get("format") == FILE_REF_FORMAT
def is_file_path_property(schema: Mapping[str, Any]) -> bool:
    """Return True when *schema* declares a sandbox file-path string field.

    The format value is matched case-insensitively, with underscores
    treated as hyphens, so "FILE_PATH" and "file-path" both qualify.
    """
    if schema.get("type") != "string":
        return False
    fmt = schema.get("format")
    return isinstance(fmt, str) and fmt.lower().replace("_", "-") == FILE_PATH_FORMAT
def detect_file_ref_fields(schema: Mapping[str, Any], path: str = "") -> list[str]:
"""
Recursively detect file reference fields in schema.
Args:
schema: JSON Schema to analyze
path: Current path in the schema (used for recursion)
Returns:
List of JSON paths containing file refs, e.g., ["image_id", "files[*]"]
"""
file_ref_paths: list[str] = []
def detect_file_path_fields(schema: Mapping[str, Any], path: str = "") -> list[str]:
"""Recursively detect file path fields in a JSON schema."""
file_path_fields: list[str] = []
schema_type = schema.get("type")
if schema_type == "object":
for prop_name, prop_schema in schema.get("properties", {}).items():
current_path = f"{path}.{prop_name}" if path else prop_name
properties = schema.get("properties")
if isinstance(properties, Mapping):
properties_mapping = cast(Mapping[str, Any], properties)
for prop_name, prop_schema in properties_mapping.items():
if not isinstance(prop_schema, Mapping):
continue
prop_schema_mapping = cast(Mapping[str, Any], prop_schema)
current_path = f"{path}.{prop_name}" if path else prop_name
if is_file_ref_property(prop_schema):
file_ref_paths.append(current_path)
elif isinstance(prop_schema, dict):
file_ref_paths.extend(detect_file_ref_fields(prop_schema, current_path))
if is_file_path_property(prop_schema_mapping):
file_path_fields.append(current_path)
else:
file_path_fields.extend(detect_file_path_fields(prop_schema_mapping, current_path))
elif schema_type == "array":
items_schema = schema.get("items", {})
items_schema = schema.get("items")
if not isinstance(items_schema, Mapping):
return file_path_fields
items_schema_mapping = cast(Mapping[str, Any], items_schema)
array_path = f"{path}[*]" if path else "[*]"
if is_file_ref_property(items_schema):
file_ref_paths.append(array_path)
elif isinstance(items_schema, dict):
file_ref_paths.extend(detect_file_ref_fields(items_schema, array_path))
return file_ref_paths
def convert_file_refs_in_output(
output: Mapping[str, Any],
json_schema: Mapping[str, Any],
tenant_id: str,
) -> dict[str, Any]:
"""
Convert file ID strings to File objects based on schema.
Args:
output: The structured_output from LLM result
json_schema: The original JSON schema (to detect file ref fields)
tenant_id: Tenant ID for file lookup
Returns:
Output with file references converted to File objects
"""
file_ref_paths = detect_file_ref_fields(json_schema)
if not file_ref_paths:
return dict(output)
result = _deep_copy_dict(output)
for path in file_ref_paths:
_convert_path_in_place(result, path.split("."), tenant_id)
return result
def _deep_copy_dict(obj: Mapping[str, Any]) -> dict[str, Any]:
"""Deep copy a mapping to a mutable dict."""
result: dict[str, Any] = {}
for key, value in obj.items():
if isinstance(value, Mapping):
result[key] = _deep_copy_dict(value)
elif isinstance(value, list):
result[key] = [_deep_copy_dict(item) if isinstance(item, Mapping) else item for item in value]
if is_file_path_property(items_schema_mapping):
file_path_fields.append(array_path)
else:
result[key] = value
return result
file_path_fields.extend(detect_file_path_fields(items_schema_mapping, array_path))
return file_path_fields
def _convert_path_in_place(obj: dict, path_parts: list[str], tenant_id: str) -> None:
"""Convert file refs at the given path in place, wrapping in Segment types."""
def adapt_schema_for_sandbox_file_paths(schema: Mapping[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Copy *schema*, normalize every sandbox file-path field in the copy,
    and return the copy together with the JSON paths of those fields.

    Raises:
        ValueError: if the schema root is not a JSON object.
    """
    copied = _deep_copy_value(schema)
    if not isinstance(copied, dict):
        raise ValueError("structured_output_schema must be a JSON object")
    normalized = cast(dict[str, Any], copied)
    collected_paths: list[str] = []
    _adapt_schema_in_place(normalized, path="", file_path_fields=collected_paths)
    return normalized, collected_paths
def convert_sandbox_file_paths_in_output(
    output: Mapping[str, Any],
    file_path_fields: Sequence[str],
    file_resolver: Callable[[str], File],
) -> tuple[dict[str, Any], list[File]]:
    """Replace sandbox file-path strings in *output* with File objects.

    Every path listed in *file_path_fields* is resolved through
    *file_resolver*; returns the converted copy of the output plus the
    list of all files that were resolved along the way.

    Raises:
        ValueError: if the output root is not a JSON object.
    """
    if not file_path_fields:
        return dict(output), []

    copied = _deep_copy_value(output)
    if not isinstance(copied, dict):
        raise ValueError("Structured output must be a JSON object")
    converted = cast(dict[str, Any], copied)

    resolved: list[File] = []
    for field_path in file_path_fields:
        _convert_path_in_place(converted, field_path.split("."), file_resolver, resolved)
    return converted, resolved
def _adapt_schema_in_place(schema: dict[str, Any], path: str, file_path_fields: list[str]) -> None:
schema_type = schema.get("type")
if schema_type == "object":
properties = schema.get("properties")
if isinstance(properties, Mapping):
properties_mapping = cast(Mapping[str, Any], properties)
for prop_name, prop_schema in properties_mapping.items():
if not isinstance(prop_schema, dict):
continue
prop_schema_dict = cast(dict[str, Any], prop_schema)
current_path = f"{path}.{prop_name}" if path else prop_name
if is_file_path_property(prop_schema_dict):
_normalize_file_path_schema(prop_schema_dict)
file_path_fields.append(current_path)
else:
_adapt_schema_in_place(prop_schema_dict, current_path, file_path_fields)
elif schema_type == "array":
items_schema = schema.get("items")
if not isinstance(items_schema, dict):
return
items_schema_dict = cast(dict[str, Any], items_schema)
array_path = f"{path}[*]" if path else "[*]"
if is_file_path_property(items_schema_dict):
_normalize_file_path_schema(items_schema_dict)
file_path_fields.append(array_path)
else:
_adapt_schema_in_place(items_schema_dict, array_path, file_path_fields)
def _normalize_file_path_schema(schema: dict[str, Any]) -> None:
    """Rewrite *schema* in place as a canonical sandbox file-path field.

    Forces ``type`` to "string" and ``format`` to FILE_PATH_FORMAT, and
    appends FILE_PATH_DESCRIPTION_SUFFIX to the description (at most once)
    so the model is told the field carries a sandbox file path.
    """
    schema["type"] = "string"
    schema["format"] = FILE_PATH_FORMAT
    description = schema.get("description", "")
    # User-supplied schemas may carry a non-string description (per JSON
    # Schema it should be a string, but input is not validated here); treat
    # anything else as absent instead of raising TypeError on the substring
    # check below.
    if not isinstance(description, str):
        description = ""
    if description:
        if FILE_PATH_DESCRIPTION_SUFFIX not in description:
            schema["description"] = f"{description}\n{FILE_PATH_DESCRIPTION_SUFFIX}"
    else:
        schema["description"] = FILE_PATH_DESCRIPTION_SUFFIX
def _deep_copy_value(value: Any) -> Any:
if isinstance(value, Mapping):
mapping = cast(Mapping[str, Any], value)
return {key: _deep_copy_value(item) for key, item in mapping.items()}
if isinstance(value, list):
list_value = cast(list[Any], value)
return [_deep_copy_value(item) for item in list_value]
return value
def _convert_path_in_place(
obj: dict[str, Any],
path_parts: list[str],
file_resolver: Callable[[str], File],
files: list[File],
) -> None:
if not path_parts:
return
current = path_parts[0]
remaining = path_parts[1:]
# Handle array notation like "files[*]"
if current.endswith("[*]"):
key = current[:-3] if current != "[*]" else None
target = obj.get(key) if key else obj
key = current[:-3] if current != "[*]" else ""
target_value = obj.get(key) if key else obj
if isinstance(target, list):
if isinstance(target_value, list):
target_list = cast(list[Any], target_value)
if remaining:
# Nested array with remaining path - recurse into each item
for item in target:
for item in target_list:
if isinstance(item, dict):
_convert_path_in_place(item, remaining, tenant_id)
item_dict = cast(dict[str, Any], item)
_convert_path_in_place(item_dict, remaining, file_resolver, files)
else:
# Array of file IDs - convert all and wrap in ArrayFileSegment
files: list[File] = []
for item in target:
file = _convert_file_id(item, tenant_id)
if file is not None:
files.append(file)
# Replace the array with ArrayFileSegment
resolved_files: list[File] = []
for item in target_list:
if not isinstance(item, str):
raise ValueError("File path must be a string")
file = file_resolver(item)
files.append(file)
resolved_files.append(file)
if key:
obj[key] = ArrayFileSegment(value=files)
obj[key] = ArrayFileSegment(value=resolved_files)
return
if not remaining:
# Leaf node - convert the value and wrap in FileSegment
if current in obj:
file = _convert_file_id(obj[current], tenant_id)
if file is not None:
obj[current] = FileSegment(value=file)
else:
obj[current] = None
else:
# Recurse into nested object
if current in obj and isinstance(obj[current], dict):
_convert_path_in_place(obj[current], remaining, tenant_id)
if current not in obj:
return
value = obj[current]
if value is None:
obj[current] = None
return
if not isinstance(value, str):
raise ValueError("File path must be a string")
file = file_resolver(value)
files.append(file)
obj[current] = FileSegment(value=file)
return
def _convert_file_id(file_id: Any, tenant_id: str) -> File | None:
    """Resolve a file-ID string to a File object, or None if not found.

    Sources are tried in order: ToolFile (files generated by tools or
    workflows) first, then UploadFile (files uploaded by users).
    Non-string or non-UUID inputs yield None without any lookup.
    """
    if not isinstance(file_id, str):
        return None
    try:
        uuid.UUID(file_id)
    except ValueError:
        # Not a UUID, so it cannot name a stored file.
        return None
    # Candidate lookup mappings, in priority order.
    candidate_mappings = (
        {"transfer_method": "tool_file", "tool_file_id": file_id},
        {"transfer_method": "local_file", "upload_file_id": file_id},
    )
    for mapping in candidate_mappings:
        try:
            return build_from_mapping(mapping=mapping, tenant_id=tenant_id)
        except ValueError:
            continue
    # Not found in any source.
    return None
if current in obj and isinstance(obj[current], dict):
_convert_path_in_place(obj[current], remaining, file_resolver, files)

View File

@ -8,7 +8,7 @@ import json_repair
from pydantic import BaseModel, TypeAdapter, ValidationError
from core.llm_generator.output_parser.errors import OutputParserError
from core.llm_generator.output_parser.file_ref import convert_file_refs_in_output
from core.llm_generator.output_parser.file_ref import detect_file_path_fields
from core.llm_generator.prompts import STRUCTURED_OUTPUT_PROMPT
from core.model_manager import ModelInstance
from core.model_runtime.callbacks.base_callback import Callback
@ -55,12 +55,12 @@ def invoke_llm_with_structured_output(
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
json_schema: Mapping[str, Any],
model_parameters: Mapping | None = None,
model_parameters: Mapping[str, Any] | None = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: list[str] | None = None,
user: str | None = None,
callbacks: list[Callback] | None = None,
tenant_id: str | None = None,
allow_file_path: bool = False,
) -> LLMResultWithStructuredOutput:
"""
Invoke large language model with structured output.
@ -78,14 +78,13 @@ def invoke_llm_with_structured_output(
:param stop: stop words
:param user: unique user id
:param callbacks: callbacks
:param tenant_id: tenant ID for file reference conversion. When provided and
json_schema contains file reference fields (format: "dify-file-ref"),
file IDs in the output will be automatically converted to File objects.
:return: full response or stream response chunk generator result
:param allow_file_path: allow schema fields formatted as file-path
:return: response with structured output
"""
model_parameters_with_json_schema: dict[str, Any] = {
**(model_parameters or {}),
}
model_parameters_with_json_schema: dict[str, Any] = dict(model_parameters or {})
if detect_file_path_fields(json_schema) and not allow_file_path:
raise OutputParserError("Structured output file paths are only supported in sandbox mode.")
# Determine structured output strategy
@ -122,14 +121,6 @@ def invoke_llm_with_structured_output(
# Fill missing fields with default values
structured_output = fill_defaults_from_schema(structured_output, json_schema)
# Convert file references if tenant_id is provided
if tenant_id is not None:
structured_output = convert_file_refs_in_output(
output=structured_output,
json_schema=json_schema,
tenant_id=tenant_id,
)
return LLMResultWithStructuredOutput(
structured_output=structured_output,
model=llm_result.model,
@ -147,12 +138,11 @@ def invoke_llm_with_pydantic_model(
model_instance: ModelInstance,
prompt_messages: Sequence[PromptMessage],
output_model: type[T],
model_parameters: Mapping | None = None,
model_parameters: Mapping[str, Any] | None = None,
tools: Sequence[PromptMessageTool] | None = None,
stop: list[str] | None = None,
user: str | None = None,
callbacks: list[Callback] | None = None,
tenant_id: str | None = None,
) -> T:
"""
Invoke large language model with a Pydantic output model.
@ -160,11 +150,8 @@ def invoke_llm_with_pydantic_model(
This helper generates a JSON schema from the Pydantic model, invokes the
structured-output LLM path, and validates the result.
The stream parameter controls the underlying LLM invocation mode:
- stream=True (default): Uses streaming LLM call, consumes the generator internally
- stream=False: Uses non-streaming LLM call
In both cases, the function returns the validated Pydantic model directly.
The helper performs a non-streaming invocation and returns the validated
Pydantic model directly.
"""
json_schema = _schema_from_pydantic(output_model)
@ -179,7 +166,6 @@ def invoke_llm_with_pydantic_model(
stop=stop,
user=user,
callbacks=callbacks,
tenant_id=tenant_id,
)
structured_output = result.structured_output
@ -189,6 +175,11 @@ def invoke_llm_with_pydantic_model(
return _validate_structured_output(output_model, structured_output)
def parse_structured_output_text(*, result_text: str, json_schema: Mapping[str, Any]) -> dict[str, Any]:
structured_output = _parse_structured_output(result_text)
return fill_defaults_from_schema(structured_output, json_schema)
def _schema_from_pydantic(output_model: type[BaseModel]) -> dict[str, Any]:
return output_model.model_json_schema()
@ -322,8 +313,8 @@ def fill_defaults_from_schema(
def _handle_native_json_schema(
provider: str,
model_schema: AIModelEntity,
structured_output_schema: Mapping,
model_parameters: dict,
structured_output_schema: Mapping[str, Any],
model_parameters: dict[str, Any],
rules: list[ParameterRule],
):
"""
@ -347,7 +338,7 @@ def _handle_native_json_schema(
return model_parameters
def _set_response_format(model_parameters: dict, rules: list):
def _set_response_format(model_parameters: dict[str, Any], rules: list[ParameterRule]):
"""
Set the appropriate response format parameter based on model rules.
@ -363,7 +354,7 @@ def _set_response_format(model_parameters: dict, rules: list):
def _handle_prompt_based_schema(
prompt_messages: Sequence[PromptMessage], structured_output_schema: Mapping
prompt_messages: Sequence[PromptMessage], structured_output_schema: Mapping[str, Any]
) -> list[PromptMessage]:
"""
Handle structured output for models without native JSON schema support.