chore: add trace metadata and streaming icon

This commit is contained in:
Novice
2026-01-06 16:30:33 +08:00
parent dc8a618b6a
commit cef7fd484b
11 changed files with 233 additions and 81 deletions

View File

@ -10,7 +10,7 @@ from core.model_runtime.entities import ImagePromptMessageContent, LLMMode
from core.model_runtime.entities.llm_entities import LLMUsage
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
from core.tools.entities.tool_entities import ToolProviderType
from core.workflow.entities import ToolCallResult
from core.workflow.entities import ToolCall, ToolCallResult
from core.workflow.node_events import AgentLogEvent
from core.workflow.nodes.base import BaseNodeData
from core.workflow.nodes.base.entities import VariableSelector
@ -89,24 +89,44 @@ class ToolMetadata(BaseModel):
extra: dict[str, Any] = Field(default_factory=dict, description="Extra tool configuration like custom description")
class ModelTraceSegment(BaseModel):
    """Output captured from a single model invocation.

    Carries the answer text, the reasoning text, and the tool calls the model
    requested. Every field is optional so a segment can hold any subset.
    """

    text: str | None = Field(default=None, description="Model output text content")
    reasoning: str | None = Field(default=None, description="Reasoning/thought content from model")
    # A fresh list per instance; never shared between segments.
    tool_calls: list[ToolCall] = Field(default_factory=list, description="Tool calls made by the model")
class ToolTraceSegment(BaseModel):
    """Record of one tool invocation: which tool, with what arguments, and its result.

    All fields default to ``None`` so a segment can be built from partial data.
    """

    id: str | None = Field(
        default=None,
        description="Unique identifier for this tool call",
    )
    name: str | None = Field(
        default=None,
        description="Name of the tool being called",
    )
    # Arguments are kept as a JSON string rather than a parsed mapping.
    arguments: str | None = Field(
        default=None,
        description="Accumulated tool arguments JSON",
    )
    output: str | None = Field(
        default=None,
        description="Tool call result",
    )
class LLMTraceSegment(BaseModel):
    """One step in the ordered execution trace of a tool-enabled LLM run.

    A trace is a sequence of alternating model and tool invocations
    (model -> tool -> model -> tool -> ...). Order is preserved for replay.
    Each segment records its execution duration; ``usage`` is only meaningful
    for model segments.
    """

    # Discriminates which payload ``output`` is expected to carry.
    type: Literal["model", "tool"]
    duration: float = Field(..., description="Execution duration in seconds")
    usage: LLMUsage | None = Field(default=None, description="Token usage statistics for this model call")
    # NOTE(review): both payload models consist solely of optional fields, so
    # validating ``output`` from a plain dict relies on pydantic's union
    # matching to pick the right class — confirm if traces are ever re-loaded.
    output: ModelTraceSegment | ToolTraceSegment = Field(..., description="Output of the segment")

    # NOTE(review): ``text`` and ``tool_call`` look superseded by ``output``;
    # the model/tool segments built in the node no longer set them. They are
    # kept optional for backward compatibility — confirm before removing.
    text: str | None = Field(None, description="Text chunk for thought/content")
    tool_call: ToolCallResult | None = Field(
        default=None,
        description="Combined tool call arguments and result for this segment",
    )

    # Common metadata for both model and tool segments
    provider: str | None = Field(default=None, description="Model or tool provider identifier")
    icon: str | None = Field(default=None, description="Icon for the provider")
    icon_dark: str | None = Field(default=None, description="Dark theme icon for the provider")
    error: str | None = Field(default=None, description="Error message if segment failed")
    status: Literal["success", "error"] | None = Field(default=None, description="Tool execution status")
class LLMGenerationData(BaseModel):
@ -233,6 +253,7 @@ class StreamBuffers(BaseModel):
think_parser: ThinkTagStreamParser = Field(default_factory=ThinkTagStreamParser)
pending_thought: list[str] = Field(default_factory=list)
pending_content: list[str] = Field(default_factory=list)
pending_tool_calls: list[ToolCall] = Field(default_factory=list)
current_turn_reasoning: list[str] = Field(default_factory=list)
reasoning_per_turn: list[str] = Field(default_factory=list)
@ -241,6 +262,8 @@ class TraceState(BaseModel):
trace_segments: list[LLMTraceSegment] = Field(default_factory=list)
tool_trace_map: dict[str, LLMTraceSegment] = Field(default_factory=dict)
tool_call_index_map: dict[str, int] = Field(default_factory=dict)
model_segment_start_time: float | None = Field(default=None, description="Start time for current model segment")
pending_usage: LLMUsage | None = Field(default=None, description="Pending usage for current model segment")
class AggregatedResult(BaseModel):

View File

@ -99,10 +99,12 @@ from .entities import (
LLMNodeData,
LLMTraceSegment,
ModelConfig,
ModelTraceSegment,
StreamBuffers,
ThinkTagStreamParser,
ToolLogPayload,
ToolOutputState,
ToolTraceSegment,
TraceState,
)
from .exc import (
@ -1678,17 +1680,71 @@ class LLMNode(Node[LLMNodeData]):
"elapsed_time": tool_call.elapsed_time,
}
# Flush buffered thought text into the trace as a single "thought" segment.
# Does nothing when ``buffers.pending_thought`` is empty; otherwise the chunks
# are joined into one string, appended to ``trace_state.trace_segments``, and
# the buffer is emptied so the same text is not flushed twice.
# NOTE(review): ``type="thought"`` is not among the literals accepted by the
# ``LLMTraceSegment`` definition that also declares ``duration``/``output`` as
# required — this helper presumably belongs to the superseded trace format;
# confirm whether it should still exist.
def _flush_thought_segment(self, buffers: StreamBuffers, trace_state: TraceState) -> None:
if not buffers.pending_thought:
return
trace_state.trace_segments.append(LLMTraceSegment(type="thought", text="".join(buffers.pending_thought)))
buffers.pending_thought.clear()
def _generate_model_provider_icon_url(self, provider: str, dark: bool = False) -> str | None:
    """Build the console API URL for a model provider's small icon.

    Args:
        provider: Model provider identifier, used as a path segment.
        dark: When true, target the dark-theme icon (``icon_small_dark``)
            instead of the default one (``icon_small``).

    Returns:
        The icon URL as a string, or ``None`` if the URL could not be built.
    """
    # Imports are kept function-local, as in the original implementation.
    from yarl import URL

    from configs import dify_config

    icon_type = "icon_small_dark" if dark else "icon_small"
    try:
        return str(
            # Fall back to a root-relative URL when no console API URL is configured.
            URL(dify_config.CONSOLE_API_URL or "/")
            / "console"
            / "api"
            / "workspaces"
            / "current"
            / "model-providers"
            / provider
            / icon_type
            / "en_US"
        )
    except Exception:
        # Deliberately best-effort: the icon is decorative trace metadata, so a
        # malformed base URL or provider name must not fail the node run.
        return None
def _flush_model_segment(
    self,
    buffers: StreamBuffers,
    trace_state: TraceState,
    error: str | None = None,
) -> None:
    """Flush pending thought/content/tool-call buffers into one model trace segment.

    Does nothing when all three pending buffers are empty. Otherwise appends a
    single ``type="model"`` segment to ``trace_state.trace_segments`` and
    resets the per-segment state (pending buffers, start time, pending usage)
    so the next model turn starts clean.

    Args:
        buffers: Stream buffers holding the not-yet-flushed thought text,
            content text, and tool calls of the current model turn.
        trace_state: Trace accumulator; supplies the segment start time and
            the pending usage, and receives the new segment.
        error: Optional error message. When truthy the segment is recorded
            with ``status="error"``, otherwise with ``status="success"``.
    """
    if not (buffers.pending_thought or buffers.pending_content or buffers.pending_tool_calls):
        return

    # Compare against None explicitly: the reference point of perf_counter()
    # is undefined, so a falsy start time must not be read as "not started".
    started_at = trace_state.model_segment_start_time
    duration = time.perf_counter() - started_at if started_at is not None else 0.0

    provider = self._node_data.model.provider

    trace_state.trace_segments.append(
        LLMTraceSegment(
            type="model",
            duration=duration,
            # Usage is whatever was stashed on the trace state for this turn.
            usage=trace_state.pending_usage,
            output=ModelTraceSegment(
                text="".join(buffers.pending_content) if buffers.pending_content else None,
                reasoning="".join(buffers.pending_thought) if buffers.pending_thought else None,
                # Copy the list: the buffer is cleared right below.
                tool_calls=list(buffers.pending_tool_calls),
            ),
            provider=provider,
            icon=self._generate_model_provider_icon_url(provider),
            icon_dark=self._generate_model_provider_icon_url(provider, dark=True),
            error=error,
            status="error" if error else "success",
        )
    )

    # Reset per-segment state for the next model turn.
    buffers.pending_thought.clear()
    buffers.pending_content.clear()
    buffers.pending_tool_calls.clear()
    trace_state.model_segment_start_time = None
    trace_state.pending_usage = None
def _handle_agent_log_output(
self, output: AgentLog, buffers: StreamBuffers, trace_state: TraceState, agent_context: AgentContext
@ -1716,30 +1772,26 @@ class LLMNode(Node[LLMNodeData]):
else:
agent_context.agent_logs.append(agent_log_event)
# Handle THOUGHT log completion - capture usage for model segment
if output.log_type == AgentLog.LogType.THOUGHT and output.status == AgentLog.LogStatus.SUCCESS:
llm_usage = output.metadata.get(AgentLog.LogMetadata.LLM_USAGE) if output.metadata else None
if llm_usage:
trace_state.pending_usage = llm_usage
if output.log_type == AgentLog.LogType.TOOL_CALL and output.status == AgentLog.LogStatus.START:
tool_name = payload.tool_name
tool_call_id = payload.tool_call_id
tool_arguments = json.dumps(payload.tool_args) if payload.tool_args else ""
# Get icon from metadata (available at START)
tool_icon = output.metadata.get(AgentLog.LogMetadata.ICON) if output.metadata else None
tool_icon_dark = output.metadata.get(AgentLog.LogMetadata.ICON_DARK) if output.metadata else None
if tool_call_id and tool_call_id not in trace_state.tool_call_index_map:
trace_state.tool_call_index_map[tool_call_id] = len(trace_state.tool_call_index_map)
self._flush_thought_segment(buffers, trace_state)
self._flush_content_segment(buffers, trace_state)
tool_call_segment = LLMTraceSegment(
type="tool_call",
text=None,
tool_call=ToolCallResult(
id=tool_call_id,
name=tool_name,
arguments=tool_arguments,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
),
)
trace_state.trace_segments.append(tool_call_segment)
if tool_call_id:
trace_state.tool_trace_map[tool_call_id] = tool_call_segment
# Add tool call to pending list for model segment
buffers.pending_tool_calls.append(ToolCall(id=tool_call_id, name=tool_name, arguments=tool_arguments))
yield ToolCallChunkEvent(
selector=[self._node_id, "generation", "tool_calls"],
@ -1748,6 +1800,8 @@ class LLMNode(Node[LLMNodeData]):
id=tool_call_id,
name=tool_name,
arguments=tool_arguments,
icon=tool_icon,
icon_dark=tool_icon_dark,
),
is_final=False,
)
@ -1758,12 +1812,13 @@ class LLMNode(Node[LLMNodeData]):
tool_call_id = payload.tool_call_id
tool_files = payload.files if isinstance(payload.files, list) else []
tool_error = payload.tool_error
tool_arguments = json.dumps(payload.tool_args) if payload.tool_args else ""
if tool_call_id and tool_call_id not in trace_state.tool_call_index_map:
trace_state.tool_call_index_map[tool_call_id] = len(trace_state.tool_call_index_map)
self._flush_thought_segment(buffers, trace_state)
self._flush_content_segment(buffers, trace_state)
# Flush model segment before tool result processing
self._flush_model_segment(buffers, trace_state)
if output.status == AgentLog.LogStatus.ERROR:
tool_error = output.error or payload.tool_error
@ -1775,47 +1830,48 @@ class LLMNode(Node[LLMNodeData]):
if meta_error:
tool_error = meta_error
existing_tool_segment = trace_state.tool_trace_map.get(tool_call_id)
tool_call_segment = existing_tool_segment or LLMTraceSegment(
type="tool_call",
text=None,
tool_call=ToolCallResult(
elapsed_time = output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None
tool_provider = output.metadata.get(AgentLog.LogMetadata.PROVIDER) if output.metadata else None
tool_icon = output.metadata.get(AgentLog.LogMetadata.ICON) if output.metadata else None
tool_icon_dark = output.metadata.get(AgentLog.LogMetadata.ICON_DARK) if output.metadata else None
result_str = str(tool_output) if tool_output is not None else None
tool_status: Literal["success", "error"] = "error" if tool_error else "success"
tool_call_segment = LLMTraceSegment(
type="tool",
duration=elapsed_time or 0.0,
usage=None,
output=ToolTraceSegment(
id=tool_call_id,
name=tool_name,
arguments=None,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
arguments=tool_arguments,
output=result_str,
),
provider=tool_provider,
icon=tool_icon,
icon_dark=tool_icon_dark,
error=str(tool_error) if tool_error else None,
status=tool_status,
)
if existing_tool_segment is None:
trace_state.trace_segments.append(tool_call_segment)
if tool_call_id:
trace_state.tool_trace_map[tool_call_id] = tool_call_segment
trace_state.trace_segments.append(tool_call_segment)
if tool_call_id:
trace_state.tool_trace_map[tool_call_id] = tool_call_segment
if tool_call_segment.tool_call is None:
tool_call_segment.tool_call = ToolCallResult(
id=tool_call_id,
name=tool_name,
arguments=None,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
)
tool_call_segment.tool_call.output = (
str(tool_output) if tool_output is not None else str(tool_error) if tool_error is not None else None
)
tool_call_segment.tool_call.files = []
tool_call_segment.tool_call.status = ToolResultStatus.ERROR if tool_error else ToolResultStatus.SUCCESS
result_output = str(tool_output) if tool_output is not None else str(tool_error) if tool_error else None
# Start new model segment tracking
trace_state.model_segment_start_time = time.perf_counter()
yield ToolResultChunkEvent(
selector=[self._node_id, "generation", "tool_results"],
chunk=result_output or "",
chunk=result_str or "",
tool_result=ToolResult(
id=tool_call_id,
name=tool_name,
output=result_output,
output=result_str,
files=tool_files,
status=ToolResultStatus.ERROR if tool_error else ToolResultStatus.SUCCESS,
elapsed_time=output.metadata.get(AgentLog.LogMetadata.ELAPSED_TIME) if output.metadata else None,
elapsed_time=elapsed_time,
icon=tool_icon,
icon_dark=tool_icon_dark,
),
is_final=False,
)
@ -1840,15 +1896,17 @@ class LLMNode(Node[LLMNodeData]):
if not segment and kind not in {"thought_start", "thought_end"}:
continue
# Start tracking model segment time on first output
if trace_state.model_segment_start_time is None:
trace_state.model_segment_start_time = time.perf_counter()
if kind == "thought_start":
self._flush_content_segment(buffers, trace_state)
yield ThoughtStartChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk="",
is_final=False,
)
elif kind == "thought":
self._flush_content_segment(buffers, trace_state)
buffers.current_turn_reasoning.append(segment)
buffers.pending_thought.append(segment)
yield ThoughtChunkEvent(
@ -1857,14 +1915,12 @@ class LLMNode(Node[LLMNodeData]):
is_final=False,
)
elif kind == "thought_end":
self._flush_thought_segment(buffers, trace_state)
yield ThoughtEndChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk="",
is_final=False,
)
else:
self._flush_thought_segment(buffers, trace_state)
aggregate.text += segment
buffers.pending_content.append(segment)
yield StreamChunkEvent(
@ -1890,15 +1946,18 @@ class LLMNode(Node[LLMNodeData]):
for kind, segment in buffers.think_parser.flush():
if not segment and kind not in {"thought_start", "thought_end"}:
continue
# Start tracking model segment time on first output
if trace_state.model_segment_start_time is None:
trace_state.model_segment_start_time = time.perf_counter()
if kind == "thought_start":
self._flush_content_segment(buffers, trace_state)
yield ThoughtStartChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk="",
is_final=False,
)
elif kind == "thought":
self._flush_content_segment(buffers, trace_state)
buffers.current_turn_reasoning.append(segment)
buffers.pending_thought.append(segment)
yield ThoughtChunkEvent(
@ -1907,14 +1966,12 @@ class LLMNode(Node[LLMNodeData]):
is_final=False,
)
elif kind == "thought_end":
self._flush_thought_segment(buffers, trace_state)
yield ThoughtEndChunkEvent(
selector=[self._node_id, "generation", "thought"],
chunk="",
is_final=False,
)
else:
self._flush_thought_segment(buffers, trace_state)
aggregate.text += segment
buffers.pending_content.append(segment)
yield StreamChunkEvent(
@ -1931,8 +1988,13 @@ class LLMNode(Node[LLMNodeData]):
if buffers.current_turn_reasoning:
buffers.reasoning_per_turn.append("".join(buffers.current_turn_reasoning))
self._flush_thought_segment(buffers, trace_state)
self._flush_content_segment(buffers, trace_state)
# For final flush, use aggregate.usage if pending_usage is not set
# (e.g., for simple LLM calls without tool invocations)
if trace_state.pending_usage is None:
trace_state.pending_usage = aggregate.usage
# Flush final model segment
self._flush_model_segment(buffers, trace_state)
def _close_streams(self) -> Generator[NodeEventBase, None, None]:
yield StreamChunkEvent(