vibe: implement file structured output

This commit is contained in:
Stream
2026-02-01 02:47:28 +08:00
parent b6465327c1
commit b66db183c9
8 changed files with 554 additions and 794 deletions

View File

@ -1,4 +1,6 @@
from collections.abc import Mapping
from decimal import Decimal
from typing import Any, NotRequired, TypedDict
from unittest.mock import MagicMock, patch
import pytest
@ -6,16 +8,13 @@ from pydantic import BaseModel, ConfigDict
from core.llm_generator.output_parser.errors import OutputParserError
from core.llm_generator.output_parser.structured_output import (
_get_default_value_for_type,
get_default_value_for_type,
fill_defaults_from_schema,
invoke_llm_with_pydantic_model,
invoke_llm_with_structured_output,
)
from core.model_runtime.entities.llm_entities import (
LLMResult,
LLMResultChunk,
LLMResultChunkDelta,
LLMResultChunkWithStructuredOutput,
LLMResultWithStructuredOutput,
LLMUsage,
)
@ -25,7 +24,30 @@ from core.model_runtime.entities.message_entities import (
TextPromptMessageContent,
UserPromptMessage,
)
from core.model_runtime.entities.model_entities import AIModelEntity, ModelType
from core.model_runtime.entities.common_entities import I18nObject
from core.model_runtime.entities.model_entities import (
AIModelEntity,
ModelType,
ParameterRule,
ParameterType,
)
class StructuredOutputTestCase(TypedDict):
name: str
provider: str
model_name: str
support_structure_output: bool
stream: bool
json_schema: Mapping[str, Any]
expected_llm_response: LLMResult
expected_result_type: type[LLMResultWithStructuredOutput] | None
should_raise: bool
expected_error: NotRequired[type[OutputParserError]]
parameter_rules: NotRequired[list[ParameterRule]]
SchemaData = dict[str, Any]
def create_mock_usage(prompt_tokens: int = 10, completion_tokens: int = 5) -> LLMUsage:
@ -71,7 +93,7 @@ def get_model_instance() -> MagicMock:
def test_structured_output_parser():
"""Test cases for invoke_llm_with_structured_output function"""
testcases = [
testcases: list[StructuredOutputTestCase] = [
# Test case 1: Model with native structured output support, non-streaming
{
"name": "native_structured_output_non_streaming",
@ -88,39 +110,6 @@ def test_structured_output_parser():
"expected_result_type": LLMResultWithStructuredOutput,
"should_raise": False,
},
# Test case 2: Model with native structured output support, streaming
{
"name": "native_structured_output_streaming",
"provider": "openai",
"model_name": "gpt-4o",
"support_structure_output": True,
"stream": True,
"json_schema": {"type": "object", "properties": {"name": {"type": "string"}}},
"expected_llm_response": [
LLMResultChunk(
model="gpt-4o",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content='{"name":'),
usage=create_mock_usage(prompt_tokens=10, completion_tokens=2),
),
),
LLMResultChunk(
model="gpt-4o",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content=' "test"}'),
usage=create_mock_usage(prompt_tokens=10, completion_tokens=3),
),
),
],
"expected_result_type": "generator",
"should_raise": False,
},
# Test case 3: Model without native structured output support, non-streaming
{
"name": "prompt_based_structured_output_non_streaming",
@ -137,78 +126,24 @@ def test_structured_output_parser():
"expected_result_type": LLMResultWithStructuredOutput,
"should_raise": False,
},
# Test case 4: Model without native structured output support, streaming
{
"name": "prompt_based_structured_output_streaming",
"provider": "anthropic",
"model_name": "claude-3-sonnet",
"support_structure_output": False,
"stream": True,
"json_schema": {"type": "object", "properties": {"answer": {"type": "string"}}},
"expected_llm_response": [
LLMResultChunk(
model="claude-3-sonnet",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content='{"answer": "test'),
usage=create_mock_usage(prompt_tokens=15, completion_tokens=3),
),
),
LLMResultChunk(
model="claude-3-sonnet",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content=' response"}'),
usage=create_mock_usage(prompt_tokens=15, completion_tokens=5),
),
),
],
"expected_result_type": "generator",
"should_raise": False,
},
# Test case 5: Streaming with list content
{
"name": "streaming_with_list_content",
"name": "non_streaming_with_list_content",
"provider": "openai",
"model_name": "gpt-4o",
"support_structure_output": True,
"stream": True,
"stream": False,
"json_schema": {"type": "object", "properties": {"data": {"type": "string"}}},
"expected_llm_response": [
LLMResultChunk(
model="gpt-4o",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(
content=[
TextPromptMessageContent(data='{"data":'),
]
),
usage=create_mock_usage(prompt_tokens=10, completion_tokens=2),
),
"expected_llm_response": LLMResult(
model="gpt-4o",
message=AssistantPromptMessage(
content=[
TextPromptMessageContent(data='{"data":'),
TextPromptMessageContent(data=' "value"}'),
]
),
LLMResultChunk(
model="gpt-4o",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(
content=[
TextPromptMessageContent(data=' "value"}'),
]
),
usage=create_mock_usage(prompt_tokens=10, completion_tokens=3),
),
),
],
"expected_result_type": "generator",
usage=create_mock_usage(prompt_tokens=10, completion_tokens=5),
),
"expected_result_type": LLMResultWithStructuredOutput,
"should_raise": False,
},
# Test case 6: Error case - non-string LLM response content (non-streaming)
@ -253,7 +188,13 @@ def test_structured_output_parser():
"stream": False,
"json_schema": {"type": "object", "properties": {"result": {"type": "string"}}},
"parameter_rules": [
MagicMock(name="response_format", options=["json_schema"], required=False),
ParameterRule(
name="response_format",
label=I18nObject(en_US="response_format"),
type=ParameterType.STRING,
required=False,
options=["json_schema"],
),
],
"expected_llm_response": LLMResult(
model="gpt-4o",
@ -272,7 +213,13 @@ def test_structured_output_parser():
"stream": False,
"json_schema": {"type": "object", "properties": {"output": {"type": "string"}}},
"parameter_rules": [
MagicMock(name="response_format", options=["JSON"], required=False),
ParameterRule(
name="response_format",
label=I18nObject(en_US="response_format"),
type=ParameterType.STRING,
required=False,
options=["JSON"],
),
],
"expected_llm_response": LLMResult(
model="claude-3-sonnet",
@ -285,89 +232,72 @@ def test_structured_output_parser():
]
for case in testcases:
# Setup model entity
model_schema = get_model_entity(case["provider"], case["model_name"], case["support_structure_output"])
provider = case["provider"]
model_name = case["model_name"]
support_structure_output = case["support_structure_output"]
json_schema = case["json_schema"]
stream = case["stream"]
# Add parameter rules if specified
if "parameter_rules" in case:
model_schema.parameter_rules = case["parameter_rules"]
model_schema = get_model_entity(provider, model_name, support_structure_output)
parameter_rules = case.get("parameter_rules")
if parameter_rules is not None:
model_schema.parameter_rules = parameter_rules
# Setup model instance
model_instance = get_model_instance()
model_instance.invoke_llm.return_value = case["expected_llm_response"]
# Setup prompt messages
prompt_messages = [
SystemPromptMessage(content="You are a helpful assistant."),
UserPromptMessage(content="Generate a response according to the schema."),
]
if case["should_raise"]:
# Test error cases
with pytest.raises(case["expected_error"]): # noqa: PT012
if case["stream"]:
expected_error = case.get("expected_error", OutputParserError)
with pytest.raises(expected_error): # noqa: PT012
if stream:
result_generator = invoke_llm_with_structured_output(
provider=case["provider"],
provider=provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=case["json_schema"],
json_schema=json_schema,
)
# Consume the generator to trigger the error
list(result_generator)
else:
invoke_llm_with_structured_output(
provider=case["provider"],
provider=provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=case["json_schema"],
json_schema=json_schema,
)
else:
# Test successful cases
with patch("core.llm_generator.output_parser.structured_output.json_repair.loads") as mock_json_repair:
# Configure json_repair mock for cases that need it
if case["name"] == "json_repair_scenario":
mock_json_repair.return_value = {"name": "test"}
result = invoke_llm_with_structured_output(
provider=case["provider"],
provider=provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=case["json_schema"],
json_schema=json_schema,
model_parameters={"temperature": 0.7, "max_tokens": 100},
user="test_user",
)
if case["expected_result_type"] == "generator":
# Test streaming results
assert hasattr(result, "__iter__")
chunks = list(result)
assert len(chunks) > 0
expected_result_type = case["expected_result_type"]
assert expected_result_type is not None
assert isinstance(result, expected_result_type)
assert result.model == model_name
assert result.structured_output is not None
assert isinstance(result.structured_output, dict)
# Verify all chunks are LLMResultChunkWithStructuredOutput
for chunk in chunks[:-1]: # All except last
assert isinstance(chunk, LLMResultChunkWithStructuredOutput)
assert chunk.model == case["model_name"]
# Last chunk should have structured output
last_chunk = chunks[-1]
assert isinstance(last_chunk, LLMResultChunkWithStructuredOutput)
assert last_chunk.structured_output is not None
assert isinstance(last_chunk.structured_output, dict)
else:
# Test non-streaming results
assert isinstance(result, case["expected_result_type"])
assert result.model == case["model_name"]
assert result.structured_output is not None
assert isinstance(result.structured_output, dict)
# Verify model_instance.invoke_llm was called with correct parameters
model_instance.invoke_llm.assert_called_once()
call_args = model_instance.invoke_llm.call_args
assert call_args.kwargs["stream"] == case["stream"]
assert call_args.kwargs["stream"] == stream
assert call_args.kwargs["user"] == "test_user"
assert "temperature" in call_args.kwargs["model_parameters"]
assert "max_tokens" in call_args.kwargs["model_parameters"]
@ -376,45 +306,32 @@ def test_structured_output_parser():
def test_parse_structured_output_edge_cases():
"""Test edge cases for structured output parsing"""
# Test case with list that contains dict (reasoning model scenario)
testcase_list_with_dict = {
"name": "list_with_dict_parsing",
"provider": "deepseek",
"model_name": "deepseek-r1",
"support_structure_output": False,
"stream": False,
"json_schema": {"type": "object", "properties": {"thought": {"type": "string"}}},
"expected_llm_response": LLMResult(
model="deepseek-r1",
message=AssistantPromptMessage(content='[{"thought": "reasoning process"}, "other content"]'),
usage=create_mock_usage(prompt_tokens=10, completion_tokens=5),
),
"expected_result_type": LLMResultWithStructuredOutput,
"should_raise": False,
}
# Setup for list parsing test
model_schema = get_model_entity(
testcase_list_with_dict["provider"],
testcase_list_with_dict["model_name"],
testcase_list_with_dict["support_structure_output"],
provider = "deepseek"
model_name = "deepseek-r1"
support_structure_output = False
json_schema: SchemaData = {"type": "object", "properties": {"thought": {"type": "string"}}}
expected_llm_response = LLMResult(
model="deepseek-r1",
message=AssistantPromptMessage(content='[{"thought": "reasoning process"}, "other content"]'),
usage=create_mock_usage(prompt_tokens=10, completion_tokens=5),
)
model_schema = get_model_entity(provider, model_name, support_structure_output)
model_instance = get_model_instance()
model_instance.invoke_llm.return_value = testcase_list_with_dict["expected_llm_response"]
model_instance.invoke_llm.return_value = expected_llm_response
prompt_messages = [UserPromptMessage(content="Test reasoning")]
with patch("core.llm_generator.output_parser.structured_output.json_repair.loads") as mock_json_repair:
# Mock json_repair to return a list with dict
mock_json_repair.return_value = [{"thought": "reasoning process"}, "other content"]
result = invoke_llm_with_structured_output(
provider=testcase_list_with_dict["provider"],
provider=provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=testcase_list_with_dict["json_schema"],
json_schema=json_schema,
)
assert isinstance(result, LLMResultWithStructuredOutput)
@ -424,18 +341,16 @@ def test_parse_structured_output_edge_cases():
def test_model_specific_schema_preparation():
"""Test schema preparation for different model types"""
# Test Gemini model
gemini_case = {
"provider": "google",
"model_name": "gemini-pro",
"support_structure_output": True,
"stream": False,
"json_schema": {"type": "object", "properties": {"result": {"type": "boolean"}}, "additionalProperties": False},
provider = "google"
model_name = "gemini-pro"
support_structure_output = True
json_schema: SchemaData = {
"type": "object",
"properties": {"result": {"type": "boolean"}},
"additionalProperties": False,
}
model_schema = get_model_entity(
gemini_case["provider"], gemini_case["model_name"], gemini_case["support_structure_output"]
)
model_schema = get_model_entity(provider, model_name, support_structure_output)
model_instance = get_model_instance()
model_instance.invoke_llm.return_value = LLMResult(
@ -447,11 +362,11 @@ def test_model_specific_schema_preparation():
prompt_messages = [UserPromptMessage(content="Test")]
result = invoke_llm_with_structured_output(
provider=gemini_case["provider"],
provider=provider,
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=gemini_case["json_schema"],
json_schema=json_schema,
)
assert isinstance(result, LLMResultWithStructuredOutput)
@ -493,40 +408,26 @@ def test_structured_output_with_pydantic_model_non_streaming():
assert result.name == "test"
def test_structured_output_with_pydantic_model_streaming():
def test_structured_output_with_pydantic_model_list_content():
model_schema = get_model_entity("openai", "gpt-4o", support_structure_output=True)
model_instance = get_model_instance()
def mock_streaming_response():
yield LLMResultChunk(
model="gpt-4o",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content='{"name":'),
usage=create_mock_usage(prompt_tokens=8, completion_tokens=2),
),
)
yield LLMResultChunk(
model="gpt-4o",
prompt_messages=[UserPromptMessage(content="test")],
system_fingerprint="test",
delta=LLMResultChunkDelta(
index=0,
message=AssistantPromptMessage(content=' "test"}'),
usage=create_mock_usage(prompt_tokens=8, completion_tokens=4),
),
)
model_instance.invoke_llm.return_value = mock_streaming_response()
model_instance.invoke_llm.return_value = LLMResult(
model="gpt-4o",
message=AssistantPromptMessage(
content=[
TextPromptMessageContent(data='{"name":'),
TextPromptMessageContent(data=' "test"}'),
]
),
usage=create_mock_usage(prompt_tokens=8, completion_tokens=4),
)
result = invoke_llm_with_pydantic_model(
provider="openai",
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=[UserPromptMessage(content="Return a JSON object with name.")],
output_model=ExampleOutput
output_model=ExampleOutput,
)
assert isinstance(result, ExampleOutput)
@ -548,51 +449,51 @@ def test_structured_output_with_pydantic_model_validation_error():
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=[UserPromptMessage(content="test")],
output_model=ExampleOutput
output_model=ExampleOutput,
)
class TestGetDefaultValueForType:
"""Test cases for _get_default_value_for_type function"""
"""Test cases for get_default_value_for_type function"""
def test_string_type(self):
assert _get_default_value_for_type("string") == ""
assert get_default_value_for_type("string") == ""
def test_object_type(self):
assert _get_default_value_for_type("object") == {}
assert get_default_value_for_type("object") == {}
def test_array_type(self):
assert _get_default_value_for_type("array") == []
assert get_default_value_for_type("array") == []
def test_number_type(self):
assert _get_default_value_for_type("number") == 0
assert get_default_value_for_type("number") == 0
def test_integer_type(self):
assert _get_default_value_for_type("integer") == 0
assert get_default_value_for_type("integer") == 0
def test_boolean_type(self):
assert _get_default_value_for_type("boolean") is False
assert get_default_value_for_type("boolean") is False
def test_null_type(self):
assert _get_default_value_for_type("null") is None
assert get_default_value_for_type("null") is None
def test_none_type(self):
assert _get_default_value_for_type(None) is None
assert get_default_value_for_type(None) is None
def test_unknown_type(self):
assert _get_default_value_for_type("unknown") is None
assert get_default_value_for_type("unknown") is None
def test_union_type_string_null(self):
# ["string", "null"] should return "" (first non-null type)
assert _get_default_value_for_type(["string", "null"]) == ""
assert get_default_value_for_type(["string", "null"]) == ""
def test_union_type_null_first(self):
# ["null", "integer"] should return 0 (first non-null type)
assert _get_default_value_for_type(["null", "integer"]) == 0
assert get_default_value_for_type(["null", "integer"]) == 0
def test_union_type_only_null(self):
# ["null"] should return None
assert _get_default_value_for_type(["null"]) is None
assert get_default_value_for_type(["null"]) is None
class TestFillDefaultsFromSchema:
@ -600,7 +501,7 @@ class TestFillDefaultsFromSchema:
def test_simple_required_fields(self):
"""Test filling simple required fields"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"name": {"type": "string"},
@ -609,7 +510,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["name", "age"],
}
output = {"name": "Alice"}
output: SchemaData = {"name": "Alice"}
result = fill_defaults_from_schema(output, schema)
@ -619,7 +520,7 @@ class TestFillDefaultsFromSchema:
def test_non_required_fields_not_filled(self):
"""Test that non-required fields are not filled"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"required_field": {"type": "string"},
@ -627,7 +528,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["required_field"],
}
output = {}
output: SchemaData = {}
result = fill_defaults_from_schema(output, schema)
@ -636,7 +537,7 @@ class TestFillDefaultsFromSchema:
def test_nested_object_required_fields(self):
"""Test filling nested object required fields"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"user": {
@ -659,7 +560,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["user"],
}
output = {
output: SchemaData = {
"user": {
"name": "Alice",
"address": {
@ -684,7 +585,7 @@ class TestFillDefaultsFromSchema:
def test_missing_nested_object_created(self):
"""Test that missing required nested objects are created"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"metadata": {
@ -698,7 +599,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["metadata"],
}
output = {}
output: SchemaData = {}
result = fill_defaults_from_schema(output, schema)
@ -710,7 +611,7 @@ class TestFillDefaultsFromSchema:
def test_all_types_default_values(self):
"""Test default values for all types"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"str_field": {"type": "string"},
@ -722,7 +623,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["str_field", "int_field", "num_field", "bool_field", "arr_field", "obj_field"],
}
output = {}
output: SchemaData = {}
result = fill_defaults_from_schema(output, schema)
@ -737,7 +638,7 @@ class TestFillDefaultsFromSchema:
def test_existing_values_preserved(self):
"""Test that existing values are not overwritten"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"name": {"type": "string"},
@ -745,7 +646,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["name", "count"],
}
output = {"name": "Bob", "count": 42}
output: SchemaData = {"name": "Bob", "count": 42}
result = fill_defaults_from_schema(output, schema)
@ -753,7 +654,7 @@ class TestFillDefaultsFromSchema:
def test_complex_nested_structure(self):
"""Test complex nested structure with multiple levels"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"user": {
@ -789,7 +690,7 @@ class TestFillDefaultsFromSchema:
},
"required": ["user", "tags", "metadata", "is_active"],
}
output = {
output: SchemaData = {
"user": {
"name": "Alice",
"age": 25,
@ -829,8 +730,8 @@ class TestFillDefaultsFromSchema:
def test_empty_schema(self):
"""Test with empty schema"""
schema = {}
output = {"any": "value"}
schema: SchemaData = {}
output: SchemaData = {"any": "value"}
result = fill_defaults_from_schema(output, schema)
@ -838,14 +739,14 @@ class TestFillDefaultsFromSchema:
def test_schema_without_required(self):
"""Test schema without required field"""
schema = {
schema: SchemaData = {
"type": "object",
"properties": {
"optional1": {"type": "string"},
"optional2": {"type": "integer"},
},
}
output = {}
output: SchemaData = {}
result = fill_defaults_from_schema(output, schema)