revert: add tools for output in agent mode

feat: hide output tools and improve JSON formatting for structured output
fix: handle prompt template correctly to extract selectors for step run
fix: emit StreamChunkEvent correctly for sandbox agent
chore: better debug message
fix: incorrect output tool runtime selection
fix: type issues
fix: align parameter list
fix: hide internal builtin providers from tool list
vibe: implement file structured output
fix: refix parameter for tool
fix: crash
refactor: remove union types
fix: type check
Merge branch 'feat/structured-output-with-sandbox' into feat/support-agent-sandbox
fix: provide json as text
fix: get AgentResult correctly
fix: provide correct prompts, tools and terminal predicates
fix: circular import
feat: support structured output in sandbox and tool mode
Stream
2026-02-04 21:13:07 +08:00
parent 25065a4f2f
commit e0082dbf18
41 changed files with 1014 additions and 1358 deletions
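
Net effect of the revert, before the per-file hunks: callers no longer pass a precomputed structured_output_schema into _invoke_llm; they pass structured_output_enabled plus the raw structured_output config, and schema resolution moves back inside _invoke_llm. A minimal runnable sketch of that shape, with toy names — the "schema" key in the config is an assumption for illustration, not taken from this diff:

from collections.abc import Mapping
from typing import Any

def fetch_structured_output_schema(structured_output: Mapping[str, Any]) -> Mapping[str, Any]:
    # Toy stand-in for LLMNode.fetch_structured_output_schema.
    schema = structured_output.get("schema")  # assumed key, illustration only
    if not schema:
        raise ValueError("structured_output_enabled is True but structured_output is not set")
    return schema

def invoke_llm(*, structured_output_enabled: bool,
               structured_output: Mapping[str, Any] | None = None) -> str:
    # Post-revert shape: the schema is resolved here, not by each caller.
    if structured_output_enabled:
        schema = fetch_structured_output_schema(structured_output or {})
        return f"structured invoke, schema={schema}"
    return "plain invoke"

print(invoke_llm(structured_output_enabled=False))
print(invoke_llm(structured_output_enabled=True,
                 structured_output={"schema": {"type": "object"}}))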

View File

@@ -577,12 +577,12 @@ class KnowledgeRetrievalNode(LLMUsageTrackingMixin, Node[KnowledgeRetrievalNodeD
prompt_messages=prompt_messages,
stop=stop,
user_id=self.user_id,
structured_output_schema=None,
structured_output_enabled=self.node_data.structured_output_enabled,
structured_output=None,
file_saver=self._llm_file_saver,
file_outputs=self._file_outputs,
node_id=self._node_id,
node_type=self.node_type,
tenant_id=self.tenant_id,
)
for event in generator:

View File

@@ -7,7 +7,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_valid
from core.agent.entities import AgentLog, AgentResult
from core.file import File
from core.model_runtime.entities import ImagePromptMessageContent, LLMMode
from core.model_runtime.entities.llm_entities import LLMStructuredOutput, LLMUsage
from core.model_runtime.entities.llm_entities import LLMUsage
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
from core.tools.entities.tool_entities import ToolProviderType
from core.workflow.entities import ToolCall, ToolCallResult
@@ -156,9 +156,6 @@ class LLMGenerationData(BaseModel):
finish_reason: str | None = Field(None, description="Finish reason from LLM")
files: list[File] = Field(default_factory=list, description="Generated files")
trace: list[LLMTraceSegment] = Field(default_factory=list, description="Streaming trace in emitted order")
structured_output: LLMStructuredOutput | None = Field(
default=None, description="Structured output from tool-only agent runs"
)
class ThinkTagStreamParser:
@@ -287,7 +284,6 @@ class AggregatedResult(BaseModel):
files: list[File] = Field(default_factory=list)
usage: LLMUsage = Field(default_factory=LLMUsage.empty_usage)
finish_reason: str | None = None
structured_output: LLMStructuredOutput | None = None
class AgentContext(BaseModel):
@@ -387,7 +383,7 @@ class LLMNodeData(BaseNodeData):
Strategy for handling model reasoning output.
separated: Return clean text (without <think> tags) + reasoning_content field.
Recommended for new workflows. Enables safe downstream parsing and
Recommended for new workflows. Enables safe downstream parsing and
workflow variable access: {{#node_id.reasoning_content#}}
tagged : Return original text (with <think> tags) + reasoning_content field.
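
As a side note on the two strategies this docstring names, here is a minimal batch sketch of the "separated" behavior — a toy that assumes well-formed tags; the real ThinkTagStreamParser works incrementally on streamed chunks:

import re

def separate_reasoning(text: str) -> tuple[str, str]:
    # Return (clean_text, reasoning_content) as described above.
    reasoning = "".join(re.findall(r"<think>(.*?)</think>", text, flags=re.S))
    clean = re.sub(r"<think>.*?</think>", "", text, flags=re.S).strip()
    return clean, reasoning

print(separate_reasoning("<think>check the units</think>The answer is 42 km."))
# -> ('The answer is 42 km.', 'check the units')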

View File

@@ -257,8 +257,8 @@ def _build_file_descriptions(files: Sequence[Any]) -> str:
"""
Build a text description of generated files for inclusion in context.
The description includes file_id for context; structured output file paths
are only supported in sandbox mode.
The description includes file_id which can be used by subsequent nodes
to reference the files via structured output.
"""
if not files:
return ""
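
Only the empty-input guard of _build_file_descriptions appears in this hunk; what follows is a hypothetical sketch of the behavior the docstring describes, with an invented output format and a stand-in File type:

from collections.abc import Sequence
from typing import Any

def build_file_descriptions(files: Sequence[Any]) -> str:
    # Expose each file_id so later nodes can reference the file.
    if not files:
        return ""
    lines = ["Generated files:"]
    for file in files:
        lines.append(f"- file_id={getattr(file, 'id', '?')} ({getattr(file, 'filename', '?')})")
    return "\n".join(lines)

class FakeFile:  # minimal stand-in for core.file.File
    def __init__(self, id: str, filename: str):
        self.id, self.filename = id, filename

print(build_file_descriptions([FakeFile("f-123", "report.pdf")]))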

View File

@@ -13,24 +13,13 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from sqlalchemy import select
from core.agent.entities import AgentEntity, AgentLog, AgentResult, AgentToolEntity, ExecutionContext
from core.agent.output_tools import (
FINAL_OUTPUT_TOOL,
FINAL_STRUCTURED_OUTPUT_TOOL,
OUTPUT_TEXT_TOOL,
build_agent_output_tools,
)
from core.agent.patterns import StrategyFactory
from core.app.entities.app_asset_entities import AppAssetFileTree
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.app_assets.constants import AppAssetsAttrs
from core.file import FileTransferMethod, FileType, file_manager
from core.file import File, FileTransferMethod, FileType, file_manager
from core.helper.code_executor import CodeExecutor, CodeLanguage
from core.llm_generator.output_parser.errors import OutputParserError
from core.llm_generator.output_parser.file_ref import (
adapt_schema_for_sandbox_file_paths,
convert_sandbox_file_paths_in_output,
detect_file_path_fields,
)
from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output
from core.memory.base import BaseMemory
from core.model_manager import ModelInstance, ModelManager
@@ -73,7 +62,6 @@ from core.skill.entities.skill_document import SkillDocument
from core.skill.entities.tool_dependencies import ToolDependencies, ToolDependency
from core.skill.skill_compiler import SkillCompiler
from core.tools.__base.tool import Tool
from core.tools.entities.tool_entities import ToolInvokeFrom
from core.tools.signature import sign_upload_file
from core.tools.tool_manager import ToolManager
from core.variables import (
@@ -197,11 +185,12 @@ class LLMNode(Node[LLMNodeData]):
def _run(self) -> Generator:
node_inputs: dict[str, Any] = {}
process_data: dict[str, Any] = {}
usage: LLMUsage = LLMUsage.empty_usage()
finish_reason: str | None = None
reasoning_content: str = "" # Initialize as empty string for consistency
clean_text = ""
usage = LLMUsage.empty_usage()
finish_reason = None
reasoning_content = "" # Initialize as empty string for consistency
clean_text = "" # Initialize clean_text to avoid UnboundLocalError
variable_pool: VariablePool = self.graph_runtime_state.variable_pool
variable_pool = self.graph_runtime_state.variable_pool
try:
# Parse prompt template to separate static messages and context references
@@ -261,9 +250,8 @@
)
query: str | None = None
memory_config = self.node_data.memory
if memory_config:
query = memory_config.query_prompt_template
if self.node_data.memory:
query = self.node_data.memory.query_prompt_template
if not query and (
query_variable := variable_pool.get((SYSTEM_VARIABLE_NODE_ID, SystemVariableKey.QUERY))
):
@@ -305,23 +293,9 @@
sandbox=self.graph_runtime_state.sandbox,
)
structured_output_schema: Mapping[str, Any] | None
structured_output_file_paths: list[str] = []
if self.node_data.structured_output_enabled:
if not self.node_data.structured_output:
raise ValueError("structured_output_enabled is True but structured_output is not set")
raw_schema = LLMNode.fetch_structured_output_schema(structured_output=self.node_data.structured_output)
if self.node_data.computer_use:
structured_output_schema, structured_output_file_paths = adapt_schema_for_sandbox_file_paths(
raw_schema
)
else:
if detect_file_path_fields(raw_schema):
raise LLMNodeError("Structured output file paths are only supported in sandbox mode.")
structured_output_schema = raw_schema
else:
structured_output_schema = None
# Variables for outputs
generation_data: LLMGenerationData | None = None
structured_output: LLMStructuredOutput | None = None
if self.node_data.computer_use:
sandbox = self.graph_runtime_state.sandbox
@@ -335,10 +309,7 @@
stop=stop,
variable_pool=variable_pool,
tool_dependencies=tool_dependencies,
structured_output_schema=structured_output_schema,
structured_output_file_paths=structured_output_file_paths,
)
elif self.tool_call_enabled:
generator = self._invoke_llm_with_tools(
model_instance=model_instance,
@@ -348,7 +319,6 @@
variable_pool=variable_pool,
node_inputs=node_inputs,
process_data=process_data,
structured_output_schema=structured_output_schema,
)
else:
# Use traditional LLM invocation
@@ -358,7 +328,8 @@
prompt_messages=prompt_messages,
stop=stop,
user_id=self.user_id,
structured_output_schema=structured_output_schema,
structured_output_enabled=self._node_data.structured_output_enabled,
structured_output=self._node_data.structured_output,
file_saver=self._llm_file_saver,
file_outputs=self._file_outputs,
node_id=self._node_id,
@@ -384,8 +355,6 @@
reasoning_content = ""
usage = generation_data.usage
finish_reason = generation_data.finish_reason
if generation_data.structured_output:
structured_output = generation_data.structured_output
# Unified process_data building
process_data = {
@@ -409,7 +378,16 @@
if tool.enabled
]
# Build generation field and determine files_to_output first
# Unified outputs building
outputs = {
"text": clean_text,
"reasoning_content": reasoning_content,
"usage": jsonable_encoder(usage),
"finish_reason": finish_reason,
"context": llm_utils.build_context(prompt_messages, clean_text, generation_data),
}
# Build generation field
if generation_data:
# Use generation_data from tool invocation (supports multi-turn)
generation = {
@@ -437,15 +415,6 @@
}
files_to_output = self._file_outputs
# Unified outputs building (files passed to context for subsequent node reference)
outputs = {
"text": clean_text,
"reasoning_content": reasoning_content,
"usage": jsonable_encoder(usage),
"finish_reason": finish_reason,
"context": llm_utils.build_context(prompt_messages, clean_text, generation_data, files=files_to_output),
}
outputs["generation"] = generation
if files_to_output:
outputs["files"] = ArrayFileSegment(value=files_to_output)
@@ -524,7 +493,8 @@
prompt_messages: Sequence[PromptMessage],
stop: Sequence[str] | None = None,
user_id: str,
structured_output_schema: Mapping[str, Any] | None,
structured_output_enabled: bool,
structured_output: Mapping[str, Any] | None = None,
file_saver: LLMFileSaver,
file_outputs: list[File],
node_id: str,
@@ -538,7 +508,10 @@
if not model_schema:
raise ValueError(f"Model schema not found for {node_data_model.name}")
if structured_output_schema:
if structured_output_enabled:
output_schema = LLMNode.fetch_structured_output_schema(
structured_output=structured_output or {},
)
request_start_time = time.perf_counter()
invoke_result = invoke_llm_with_structured_output(
@@ -546,12 +519,12 @@
model_schema=model_schema,
model_instance=model_instance,
prompt_messages=prompt_messages,
json_schema=structured_output_schema,
json_schema=output_schema,
model_parameters=node_data_model.completion_params,
stop=list(stop or []),
user=user_id,
tenant_id=tenant_id,
)
else:
request_start_time = time.perf_counter()
@@ -1288,16 +1261,18 @@
# Insert histories into the prompt
prompt_content = prompt_messages[0].content
# For issue #11247 - Check if prompt content is a string or a list
if isinstance(prompt_content, str):
prompt_content_type = type(prompt_content)
if prompt_content_type == str:
prompt_content = str(prompt_content)
if "#histories#" in prompt_content:
prompt_content = prompt_content.replace("#histories#", memory_text)
else:
prompt_content = memory_text + "\n" + prompt_content
prompt_messages[0].content = prompt_content
elif isinstance(prompt_content, list):
elif prompt_content_type == list:
prompt_content = prompt_content if isinstance(prompt_content, list) else []
for content_item in prompt_content:
if isinstance(content_item, TextPromptMessageContent):
if content_item.type == PromptMessageContentType.TEXT:
if "#histories#" in content_item.data:
content_item.data = content_item.data.replace("#histories#", memory_text)
else:
@@ -1307,12 +1282,13 @@
# Add current query to the prompt message
if sys_query:
if isinstance(prompt_content, str):
if prompt_content_type == str:
prompt_content = str(prompt_messages[0].content).replace("#sys.query#", sys_query)
prompt_messages[0].content = prompt_content
elif isinstance(prompt_content, list):
elif prompt_content_type == list:
prompt_content = prompt_content if isinstance(prompt_content, list) else []
for content_item in prompt_content:
if isinstance(content_item, TextPromptMessageContent):
if content_item.type == PromptMessageContentType.TEXT:
content_item.data = sys_query + "\n" + content_item.data
else:
raise ValueError("Invalid prompt content type")
@@ -1438,11 +1414,9 @@
if isinstance(item, PromptMessageContext):
if len(item.value_selector) >= 2:
prompt_context_selectors.append(item.value_selector)
elif isinstance(item, LLMNodeChatModelMessage):
elif isinstance(item, LLMNodeChatModelMessage) and item.edition_type == "jinja2":
variable_template_parser = VariableTemplateParser(template=item.text)
variable_selectors.extend(variable_template_parser.extract_variable_selectors())
else:
raise InvalidVariableTypeError(f"Invalid prompt template type: {type(prompt_template)}")
elif isinstance(prompt_template, LLMNodeCompletionModelPromptTemplate):
if prompt_template.edition_type != "jinja2":
variable_template_parser = VariableTemplateParser(template=prompt_template.text)
@@ -1478,14 +1452,13 @@
if typed_node_data.prompt_config:
enable_jinja = False
if isinstance(prompt_template, LLMNodeCompletionModelPromptTemplate):
if prompt_template.edition_type == "jinja2":
enable_jinja = True
else:
for prompt in prompt_template:
if prompt.edition_type == "jinja2":
if isinstance(prompt_template, list):
for item in prompt_template:
if isinstance(item, LLMNodeChatModelMessage) and item.edition_type == "jinja2":
enable_jinja = True
break
else:
enable_jinja = True
if enable_jinja:
for variable_selector in typed_node_data.prompt_config.jinja2_variables or []:
@@ -1898,7 +1871,6 @@
variable_pool: VariablePool,
node_inputs: dict[str, Any],
process_data: dict[str, Any],
structured_output_schema: Mapping[str, Any] | None,
) -> Generator[NodeEventBase, None, LLMGenerationData]:
"""Invoke LLM with tools support (from Agent V2).
@@ -1915,16 +1887,12 @@
# Use factory to create appropriate strategy
strategy = StrategyFactory.create_strategy(
tenant_id=self.tenant_id,
invoke_from=self.invoke_from,
tool_invoke_from=ToolInvokeFrom.WORKFLOW,
model_features=model_features,
model_instance=model_instance,
tools=tool_instances,
files=prompt_files,
max_iterations=self._node_data.max_iterations or 10,
context=ExecutionContext(user_id=self.user_id, app_id=self.app_id, tenant_id=self.tenant_id),
structured_output_schema=structured_output_schema,
)
# Run strategy
@@ -1932,6 +1900,7 @@
prompt_messages=list(prompt_messages),
model_parameters=self._node_data.model.completion_params,
stop=list(stop or []),
stream=True,
)
result = yield from self._process_tool_outputs(outputs)
@@ -1945,12 +1914,8 @@
stop: Sequence[str] | None,
variable_pool: VariablePool,
tool_dependencies: ToolDependencies | None,
structured_output_schema: Mapping[str, Any] | None,
structured_output_file_paths: Sequence[str] | None,
) -> Generator[NodeEventBase | LLMStructuredOutput, None, LLMGenerationData]:
) -> Generator[NodeEventBase, None, LLMGenerationData]:
result: LLMGenerationData | None = None
sandbox_output_files: list[File] = []
structured_output_files: list[File] = []
# FIXME(Mairuis): Async processing for bash session.
with SandboxBashSession(sandbox=sandbox, node_id=self.id, tools=tool_dependencies) as session:
@@ -1958,9 +1923,6 @@
model_features = self._get_model_features(model_instance)
strategy = StrategyFactory.create_strategy(
tenant_id=self.tenant_id,
invoke_from=self.invoke_from,
tool_invoke_from=ToolInvokeFrom.WORKFLOW,
model_features=model_features,
model_instance=model_instance,
tools=[session.bash_tool],
@@ -1968,55 +1930,20 @@
max_iterations=self._node_data.max_iterations or 100,
agent_strategy=AgentEntity.Strategy.FUNCTION_CALLING,
context=ExecutionContext(user_id=self.user_id, app_id=self.app_id, tenant_id=self.tenant_id),
structured_output_schema=structured_output_schema,
)
outputs = strategy.run(
prompt_messages=list(prompt_messages),
model_parameters=self._node_data.model.completion_params,
stop=list(stop or []),
stream=True,
)
result = yield from self._process_tool_outputs(outputs)
if result and result.structured_output and structured_output_file_paths:
structured_output_payload = result.structured_output.structured_output or {}
try:
converted_output, structured_output_files = convert_sandbox_file_paths_in_output(
output=structured_output_payload,
file_path_fields=structured_output_file_paths,
file_resolver=session.download_file,
)
except ValueError as exc:
raise LLMNodeError(str(exc)) from exc
result = result.model_copy(
update={"structured_output": LLMStructuredOutput(structured_output=converted_output)}
)
# Collect output files from sandbox before session ends
# Files are saved as ToolFiles with valid tool_file_id for later reference
sandbox_output_files = session.collect_output_files()
if result is None:
raise LLMNodeError("SandboxSession exited unexpectedly")
structured_output = result.structured_output
if structured_output is not None:
yield structured_output
# Merge sandbox output files into result
if sandbox_output_files or structured_output_files:
result = LLMGenerationData(
text=result.text,
reasoning_contents=result.reasoning_contents,
tool_calls=result.tool_calls,
sequence=result.sequence,
usage=result.usage,
finish_reason=result.finish_reason,
files=result.files + sandbox_output_files + structured_output_files,
trace=result.trace,
)
return result
def _get_model_features(self, model_instance: ModelInstance) -> list[ModelFeature]:
@@ -2084,20 +2011,6 @@
logger.warning("Failed to load tool %s: %s", tool, str(e))
continue
structured_output_schema = None
if self._node_data.structured_output_enabled:
structured_output_schema = LLMNode.fetch_structured_output_schema(
structured_output=self._node_data.structured_output or {},
)
tool_instances.extend(
build_agent_output_tools(
tenant_id=self.tenant_id,
invoke_from=self.invoke_from,
tool_invoke_from=ToolInvokeFrom.WORKFLOW,
structured_output_schema=structured_output_schema,
)
)
return tool_instances
def _extract_prompt_files(self, variable_pool: VariablePool) -> list[File]:
@@ -2261,45 +2174,18 @@
# Add tool call to pending list for model segment
buffers.pending_tool_calls.append(ToolCall(id=tool_call_id, name=tool_name, arguments=tool_arguments))
output_tool_names = {OUTPUT_TEXT_TOOL, FINAL_OUTPUT_TOOL, FINAL_STRUCTURED_OUTPUT_TOOL}
if tool_name not in output_tool_names:
yield ToolCallChunkEvent(
selector=[self._node_id, "generation", "tool_calls"],
chunk=tool_arguments,
tool_call=ToolCall(
id=tool_call_id,
name=tool_name,
arguments=tool_arguments,
icon=tool_icon,
icon_dark=tool_icon_dark,
),
is_final=False,
)
if tool_name in output_tool_names:
content = ""
if tool_name in (OUTPUT_TEXT_TOOL, FINAL_OUTPUT_TOOL):
content = payload.tool_args["text"]
elif tool_name == FINAL_STRUCTURED_OUTPUT_TOOL:
raw_content = json.dumps(
payload.tool_args["data"],
ensure_ascii=False,
indent=2
)
content = f"```json\n{raw_content}\n```"
if content:
yield StreamChunkEvent(
selector=[self._node_id, "text"],
chunk=content,
is_final=False,
)
yield StreamChunkEvent(
selector=[self._node_id, "generation", "content"],
chunk=content,
is_final=False,
)
yield ToolCallChunkEvent(
selector=[self._node_id, "generation", "tool_calls"],
chunk=tool_arguments,
tool_call=ToolCall(
id=tool_call_id,
name=tool_name,
arguments=tool_arguments,
icon=tool_icon,
icon_dark=tool_icon_dark,
),
is_final=False,
)
if output.log_type == AgentLog.LogType.TOOL_CALL and output.status != AgentLog.LogStatus.START:
tool_name = payload.tool_name
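
For reference, the JSON-as-text rendering that this commit strips out, isolated as a runnable snippet:

import json

def format_structured_output_chunk(data: dict) -> str:
    # The removed branch streamed structured tool output as pretty-printed
    # JSON inside a fenced block, so it reads as ordinary text downstream.
    raw = json.dumps(data, ensure_ascii=False, indent=2)
    return f"```json\n{raw}\n```"

print(format_structured_output_chunk({"answer": 42}))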
@@ -2543,7 +2429,6 @@
content_position = 0
tool_call_seen_index: dict[str, int] = {}
for trace_segment in trace_state.trace_segments:
# FIXME: These if will never happen
if trace_segment.type == "thought":
sequence.append({"type": "reasoning", "index": reasoning_index})
reasoning_index += 1
@@ -2586,15 +2471,8 @@
key=lambda item: trace_state.tool_call_index_map.get(item.id or "", len(trace_state.tool_call_index_map))
)
text_content: str
if aggregate.text:
text_content = aggregate.text
elif aggregate.structured_output:
text_content = json.dumps(aggregate.structured_output.structured_output)
else:
raise ValueError("Aggregate must have either text or structured output.")
return LLMGenerationData(
text=text_content,
text=aggregate.text,
reasoning_contents=buffers.reasoning_per_turn,
tool_calls=tool_calls_for_generation,
sequence=sequence,
@@ -2602,7 +2480,6 @@
finish_reason=aggregate.finish_reason,
files=aggregate.files,
trace=trace_state.trace_segments,
structured_output=aggregate.structured_output,
)
def _process_tool_outputs(
@@ -2613,33 +2490,22 @@
state = ToolOutputState()
try:
while True:
output = next(outputs)
for output in outputs:
if isinstance(output, AgentLog):
yield from self._handle_agent_log_output(output, state.stream, state.trace, state.agent)
else:
continue
yield from self._handle_llm_chunk_output(output, state.stream, state.trace, state.aggregate)
except StopIteration as exception:
if not isinstance(exception.value, AgentResult):
raise ValueError(f"Unexpected output type: {type(exception.value)}") from exception
state.agent.agent_result = exception.value
agent_result = state.agent.agent_result
if not agent_result:
raise ValueError("No agent result found in tool outputs")
output_payload = agent_result.output
if isinstance(output_payload, dict):
state.aggregate.structured_output = LLMStructuredOutput(structured_output=output_payload)
state.aggregate.text = json.dumps(output_payload)
elif isinstance(output_payload, str):
state.aggregate.text = output_payload
else:
raise ValueError(f"Unexpected output type: {type(output_payload)}")
if isinstance(getattr(exception, "value", None), AgentResult):
state.agent.agent_result = exception.value
state.aggregate.files = state.agent.agent_result.files
if state.agent.agent_result.usage:
state.aggregate.usage = state.agent.agent_result.usage
if state.agent.agent_result.finish_reason:
state.aggregate.finish_reason = state.agent.agent_result.finish_reason
if state.agent.agent_result:
state.aggregate.text = state.agent.agent_result.text or state.aggregate.text
state.aggregate.files = state.agent.agent_result.files
if state.agent.agent_result.usage:
state.aggregate.usage = state.agent.agent_result.usage
if state.agent.agent_result.finish_reason:
state.aggregate.finish_reason = state.agent.agent_result.finish_reason
yield from self._flush_remaining_stream(state.stream, state.trace, state.aggregate)
yield from self._close_streams()
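
The StopIteration handling above leans on a Python detail worth spelling out: a generator's return value rides on StopIteration.value, and it is only observable when the generator is advanced with next() directly — a plain for loop consumes the exception internally. A self-contained illustration:

def agent_outputs():
    yield "chunk-1"
    yield "chunk-2"
    return {"text": "final"}  # becomes StopIteration.value

def consume(outputs):
    try:
        while True:
            print(next(outputs))
    except StopIteration as exc:
        # How the handler above recovers the AgentResult once the strategy
        # generator is exhausted.
        return exc.value

print(consume(agent_outputs()))  # {'text': 'final'}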

View File

@@ -156,7 +156,8 @@ class QuestionClassifierNode(Node[QuestionClassifierNodeData]):
prompt_messages=prompt_messages,
stop=stop,
user_id=self.user_id,
structured_output_schema=None,
structured_output_enabled=False,
structured_output=None,
file_saver=self._llm_file_saver,
file_outputs=self._file_outputs,
node_id=self._node_id,