feat: enhance model event handling with new identity and metrics fields

This commit is contained in:
Novice
2026-03-05 14:08:37 +08:00
parent e26d8a63da
commit 1cb5ee918f
11 changed files with 185 additions and 37 deletions

View File

@ -591,6 +591,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_elapsed_time=tool_elapsed_time,
tool_icon=tool_icon,
tool_icon_dark=tool_icon_dark,
node_id=event.node_id,
model_provider=event.model_provider,
model_name=event.model_name,
model_icon=event.model_icon,
model_icon_dark=event.model_icon_dark,
model_usage=event.model_usage,
model_duration=event.model_duration,
)
def _handle_iteration_start_event(

View File

@ -551,6 +551,13 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_elapsed_time=tool_elapsed_time,
tool_icon=tool_icon,
tool_icon_dark=tool_icon_dark,
node_id=event.node_id,
model_provider=event.model_provider,
model_name=event.model_name,
model_icon=event.model_icon,
model_icon_dark=event.model_icon_dark,
model_usage=event.model_usage,
model_duration=event.model_duration,
)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
@ -748,12 +755,14 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
tool_elapsed_time: float | None = None,
tool_icon: str | dict | None = None,
tool_icon_dark: str | dict | None = None,
node_id: str | None = None,
model_provider: str | None = None,
model_name: str | None = None,
model_icon: str | dict | None = None,
model_icon_dark: str | dict | None = None,
model_usage: dict | None = None,
model_duration: float | None = None,
) -> TextChunkStreamResponse:
"""
Handle completed event.
:param text: text
:return:
"""
from core.app.entities.task_entities import ChunkType as ResponseChunkType
response_chunk_type = ResponseChunkType(chunk_type.value) if chunk_type else ResponseChunkType.TEXT
@ -762,6 +771,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
text=text,
from_variable_selector=from_variable_selector,
chunk_type=response_chunk_type,
node_id=node_id,
)
if response_chunk_type == ResponseChunkType.TOOL_CALL:
@ -787,6 +797,22 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"tool_icon_dark": tool_icon_dark,
}
)
elif response_chunk_type == ResponseChunkType.MODEL_START:
data = data.model_copy(
update={
"model_provider": model_provider,
"model_name": model_name,
"model_icon": model_icon,
"model_icon_dark": model_icon_dark,
}
)
elif response_chunk_type == ResponseChunkType.MODEL_END:
data = data.model_copy(
update={
"model_usage": model_usage,
"model_duration": model_duration,
}
)
response = TextChunkStreamResponse(
task_id=self._application_generate_entity.task_id,

View File

@ -494,6 +494,13 @@ class WorkflowBasedAppRunner:
tool_call=event.tool_call,
tool_result=event.tool_result,
in_parent_node_id=event.in_parent_node_id,
node_id=event.node_id,
model_provider=event.model_provider,
model_name=event.model_name,
model_icon=event.model_icon,
model_icon_dark=event.model_icon_dark,
model_usage=event.model_usage,
model_duration=event.model_duration,
)
)
elif isinstance(event, NodeRunRetrieverResourceEvent):

View File

@ -188,9 +188,11 @@ class ChunkType(StrEnum):
TEXT = "text" # Normal text streaming
TOOL_CALL = "tool_call" # Tool call arguments streaming
TOOL_RESULT = "tool_result" # Tool execution result
THOUGHT = "thought" # Agent thinking process (ReAct)
THOUGHT_START = "thought_start" # Agent thought start
THOUGHT_END = "thought_end" # Agent thought end
THOUGHT = "thought" # Model thinking process
THOUGHT_START = "thought_start" # Model thought start
THOUGHT_END = "thought_end" # Model thought end
MODEL_START = "model_start" # Model turn started with identity info
MODEL_END = "model_end" # Model turn completed with metrics
class QueueTextChunkEvent(AppQueueEvent):
@ -208,6 +210,8 @@ class QueueTextChunkEvent(AppQueueEvent):
"""loop id if node is in loop"""
in_parent_node_id: str | None = None
"""parent node id if this is an extractor node event"""
node_id: str | None = None
"""workflow node id that produced this chunk"""
# Extended fields for Agent/Tool streaming
chunk_type: ChunkType = ChunkType.TEXT
@ -219,6 +223,15 @@ class QueueTextChunkEvent(AppQueueEvent):
tool_result: ToolResult | None = None
"""structured tool result info"""
# Model identity (when chunk_type == MODEL_START)
model_provider: str | None = None
model_name: str | None = None
model_icon: str | dict | None = None
model_icon_dark: str | dict | None = None
# Model metrics (when chunk_type == MODEL_END)
model_usage: dict | None = None
model_duration: float | None = None
class QueueAgentMessageEvent(AppQueueEvent):
"""

View File

@ -118,10 +118,12 @@ class MessageStreamResponse(StreamResponse):
id: str
answer: str
from_variable_selector: list[str] | None = None
node_id: str | None = None
"""workflow node id that produced this chunk"""
# Extended fields for Agent/Tool streaming (imported at runtime to avoid circular import)
chunk_type: str | None = None
"""type of the chunk: text, tool_call, tool_result, thought"""
"""type of the chunk: text, tool_call, tool_result, thought, model_start, model_end"""
# Tool call fields (when chunk_type == "tool_call")
tool_call_id: str | None = None
@ -143,6 +145,15 @@ class MessageStreamResponse(StreamResponse):
tool_icon_dark: str | dict | None = None
"""dark theme icon of the tool"""
# Model identity fields (when chunk_type == "model_start")
model_provider: str | None = None
model_name: str | None = None
model_icon: str | dict | None = None
model_icon_dark: str | dict | None = None
# Model metrics fields (when chunk_type == "model_end")
model_usage: dict | None = None
model_duration: float | None = None
def model_dump(self, *args, **kwargs) -> dict[str, object]:
    """Serialize this response, dropping None-valued fields by default.

    Callers may still pass exclude_none=False explicitly to keep them.
    """
    if "exclude_none" not in kwargs:
        kwargs["exclude_none"] = True
    return super().model_dump(*args, **kwargs)
@ -716,6 +727,8 @@ class ChunkType(StrEnum):
THOUGHT = "thought" # Agent thinking process (ReAct)
THOUGHT_START = "thought_start" # Agent thought start
THOUGHT_END = "thought_end" # Agent thought end
MODEL_START = "model_start" # Model turn started with identity info
MODEL_END = "model_end" # Model turn completed with metrics
class TextChunkStreamResponse(StreamResponse):
@ -730,6 +743,8 @@ class TextChunkStreamResponse(StreamResponse):
text: str
from_variable_selector: list[str] | None = None
node_id: str | None = None
"""workflow node id that produced this chunk"""
# Extended fields for Agent/Tool streaming
chunk_type: ChunkType = ChunkType.TEXT
@ -753,6 +768,15 @@ class TextChunkStreamResponse(StreamResponse):
tool_elapsed_time: float | None = None
"""elapsed time spent executing the tool"""
# Model identity fields (when chunk_type == MODEL_START)
model_provider: str | None = None
model_name: str | None = None
model_icon: str | dict | None = None
model_icon_dark: str | dict | None = None
# Model metrics fields (when chunk_type == MODEL_END)
model_usage: dict | None = None
model_duration: float | None = None
def model_dump(self, *args, **kwargs) -> dict[str, object]:
    """Serialize this response; None-valued optional fields are omitted
    unless the caller explicitly passes exclude_none=False."""
    if "exclude_none" not in kwargs:
        kwargs["exclude_none"] = True
    return super().model_dump(*args, **kwargs)

View File

@ -255,26 +255,21 @@ class MessageCycleManager:
tool_icon: str | dict | None = None,
tool_icon_dark: str | dict | None = None,
event_type: StreamEvent | None = None,
node_id: str | None = None,
model_provider: str | None = None,
model_name: str | None = None,
model_icon: str | dict | None = None,
model_icon_dark: str | dict | None = None,
model_usage: dict | None = None,
model_duration: float | None = None,
) -> MessageStreamResponse:
"""
Message to stream response.
:param answer: answer
:param message_id: message id
:param from_variable_selector: from variable selector
:param chunk_type: type of the chunk (text, function_call, tool_result, thought)
:param tool_call_id: unique identifier for this tool call
:param tool_name: name of the tool being called
:param tool_arguments: accumulated tool arguments JSON
:param tool_files: file IDs produced by tool
:param tool_error: error message if tool failed
:return:
"""
response = MessageStreamResponse(
task_id=self._application_generate_entity.task_id,
id=message_id,
answer=answer,
from_variable_selector=from_variable_selector,
event=event_type or StreamEvent.MESSAGE,
node_id=node_id,
)
if chunk_type:
@ -303,6 +298,22 @@ class MessageCycleManager:
"tool_icon_dark": tool_icon_dark,
}
)
elif chunk_type == "model_start":
response = response.model_copy(
update={
"model_provider": model_provider,
"model_name": model_name,
"model_icon": model_icon,
"model_icon_dark": model_icon_dark,
}
)
elif chunk_type == "model_end":
response = response.model_copy(
update={
"model_usage": model_usage,
"model_duration": model_duration,
}
)
return response

View File

@ -28,6 +28,7 @@ class ToolResult(BaseModel):
elapsed_time: float | None = Field(default=None, description="Elapsed seconds spent executing the tool")
icon: str | dict[str, Any] | None = Field(default=None, description="Icon of the tool")
icon_dark: str | dict[str, Any] | None = Field(default=None, description="Dark theme icon of the tool")
provider: str | None = Field(default=None, description="Tool provider identifier")
class ToolCallResult(BaseModel):

View File

@ -31,6 +31,8 @@ class ChunkType(StrEnum):
THOUGHT = "thought" # Agent thinking process (ReAct)
THOUGHT_START = "thought_start" # Agent thought start
THOUGHT_END = "thought_end" # Agent thought end
MODEL_START = "model_start" # Model turn started with identity info
MODEL_END = "model_end" # Model turn completed with metrics
class NodeRunStreamChunkEvent(GraphNodeEventBase):
@ -56,6 +58,15 @@ class NodeRunStreamChunkEvent(GraphNodeEventBase):
description="structured payload for tool_result chunks",
)
# Model identity fields (when chunk_type == MODEL_START)
model_provider: str | None = Field(default=None, description="model provider identifier")
model_name: str | None = Field(default=None, description="model name")
model_icon: str | dict | None = Field(default=None, description="model provider icon")
model_icon_dark: str | dict | None = Field(default=None, description="model provider dark icon")
# Model metrics fields (when chunk_type == MODEL_END)
model_usage: dict | None = Field(default=None, description="per-turn token usage as dict")
model_duration: float | None = Field(default=None, description="per-turn duration in seconds")
class NodeRunRetrieverResourceEvent(GraphNodeEventBase):
retriever_resources: Sequence[RetrievalSourceMetadata] = Field(..., description="retriever resources")

View File

@ -43,6 +43,8 @@ class ChunkType(StrEnum):
THOUGHT = "thought" # Agent thinking process (ReAct)
THOUGHT_START = "thought_start" # Agent thought start
THOUGHT_END = "thought_end" # Agent thought end
MODEL_START = "model_start" # Model turn started with identity info
MODEL_END = "model_end" # Model turn completed with metrics
class StreamChunkEvent(NodeEventBase):
@ -56,6 +58,14 @@ class StreamChunkEvent(NodeEventBase):
chunk_type: ChunkType = Field(default=ChunkType.TEXT, description="type of the chunk")
tool_call: ToolCall | None = Field(default=None, description="structured payload for tool_call chunks")
tool_result: ToolResult | None = Field(default=None, description="structured payload for tool_result chunks")
# Model identity fields (when chunk_type == MODEL_START)
model_provider: str | None = Field(default=None, description="model provider identifier")
model_name: str | None = Field(default=None, description="model name")
model_icon: str | dict | None = Field(default=None, description="model provider icon")
model_icon_dark: str | dict | None = Field(default=None, description="model provider dark icon")
# Model metrics fields (when chunk_type == MODEL_END)
model_usage: LLMUsage | None = Field(default=None, description="per-turn token usage")
model_duration: float | None = Field(default=None, description="per-turn duration in seconds")
class ToolCallChunkEvent(StreamChunkEvent):

View File

@ -274,6 +274,7 @@ class TraceState(BaseModel):
tool_trace_map: dict[str, LLMTraceSegment] = Field(default_factory=dict)
tool_call_index_map: dict[str, int] = Field(default_factory=dict)
model_segment_start_time: float | None = Field(default=None, description="Start time for current model segment")
model_start_emitted: bool = Field(default=False, description="Whether model_start has been emitted for this turn")
pending_usage: LLMUsage | None = Field(default=None, description="Pending usage for current model segment")

View File

@ -105,7 +105,7 @@ from core.workflow.node_events import (
ToolCallChunkEvent,
ToolResultChunkEvent,
)
from core.workflow.node_events.node import ThoughtEndChunkEvent, ThoughtStartChunkEvent
from core.workflow.node_events.node import ChunkType, ThoughtEndChunkEvent, ThoughtStartChunkEvent
from core.workflow.nodes.base.entities import VariableSelector
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser
@ -2277,23 +2277,42 @@ class LLMNode(Node[LLMNodeData]):
except Exception:
return None
def _emit_model_start(self, trace_state: TraceState) -> Generator[NodeEventBase, None, None]:
    """Yield a single MODEL_START chunk carrying the model's identity.

    Idempotent per model turn: guarded by trace_state.model_start_emitted,
    which the segment flush resets when the turn completes. Also anchors
    the turn's start time if no output has been observed yet.
    """
    if trace_state.model_start_emitted:
        return

    trace_state.model_start_emitted = True
    # Anchor the segment start so MODEL_END can report a duration even when
    # this is the first event of the turn.
    if trace_state.model_segment_start_time is None:
        trace_state.model_segment_start_time = time.perf_counter()

    model_config = self._node_data.model
    icon_light = self._generate_model_provider_icon_url(model_config.provider)
    icon_dark = self._generate_model_provider_icon_url(model_config.provider, dark=True)
    yield StreamChunkEvent(
        selector=[self._node_id, "generation", "model_start"],
        chunk="",
        chunk_type=ChunkType.MODEL_START,
        is_final=False,
        model_provider=model_config.provider,
        model_name=model_config.name,
        model_icon=icon_light,
        model_icon_dark=icon_dark,
    )
def _flush_model_segment(
self,
buffers: StreamBuffers,
trace_state: TraceState,
error: str | None = None,
) -> None:
"""Flush pending thought/content buffers into a single model trace segment."""
) -> Generator[NodeEventBase, None, None]:
"""Flush pending thought/content buffers into a single model trace segment
and yield a MODEL_END chunk event with usage/duration metrics."""
if not buffers.pending_thought and not buffers.pending_content and not buffers.pending_tool_calls:
return
now = time.perf_counter()
duration = now - trace_state.model_segment_start_time if trace_state.model_segment_start_time else 0.0
# Use pending_usage from trace_state (captured from THOUGHT log)
usage = trace_state.pending_usage
# Generate model provider icon URL
provider = self._node_data.model.provider
model_name = self._node_data.model.name
model_icon = self._generate_model_provider_icon_url(provider)
@ -2317,10 +2336,21 @@ class LLMNode(Node[LLMNodeData]):
status="error" if error else "success",
)
)
yield StreamChunkEvent(
selector=[self._node_id, "generation", "model_end"],
chunk="",
chunk_type=ChunkType.MODEL_END,
is_final=False,
model_usage=usage,
model_duration=duration,
)
buffers.pending_thought.clear()
buffers.pending_content.clear()
buffers.pending_tool_calls.clear()
trace_state.model_segment_start_time = None
trace_state.model_start_emitted = False
trace_state.pending_usage = None
def _handle_agent_log_output(
@ -2356,18 +2386,18 @@ class LLMNode(Node[LLMNodeData]):
trace_state.pending_usage = llm_usage
if output.log_type == AgentLog.LogType.TOOL_CALL and output.status == AgentLog.LogStatus.START:
yield from self._emit_model_start(trace_state)
tool_name = payload.tool_name
tool_call_id = payload.tool_call_id
tool_arguments = json.dumps(payload.tool_args or {})
# Get icon from metadata (available at START)
tool_icon = output.metadata.get(AgentLog.LogMetadata.ICON) if output.metadata else None
tool_icon_dark = output.metadata.get(AgentLog.LogMetadata.ICON_DARK) if output.metadata else None
if tool_call_id and tool_call_id not in trace_state.tool_call_index_map:
trace_state.tool_call_index_map[tool_call_id] = len(trace_state.tool_call_index_map)
# Add tool call to pending list for model segment
buffers.pending_tool_calls.append(ToolCall(id=tool_call_id, name=tool_name, arguments=tool_arguments))
yield ToolCallChunkEvent(
@ -2395,7 +2425,7 @@ class LLMNode(Node[LLMNodeData]):
trace_state.tool_call_index_map[tool_call_id] = len(trace_state.tool_call_index_map)
# Flush model segment before tool result processing
self._flush_model_segment(buffers, trace_state)
yield from self._flush_model_segment(buffers, trace_state)
if output.status == AgentLog.LogStatus.ERROR:
tool_error = output.error or payload.tool_error
@ -2450,6 +2480,7 @@ class LLMNode(Node[LLMNodeData]):
elapsed_time=elapsed_time,
icon=tool_icon,
icon_dark=tool_icon_dark,
provider=tool_provider,
),
is_final=False,
)
@ -2474,9 +2505,7 @@ class LLMNode(Node[LLMNodeData]):
if not segment and kind not in {"thought_start", "thought_end"}:
continue
# Start tracking model segment time on first output
if trace_state.model_segment_start_time is None:
trace_state.model_segment_start_time = time.perf_counter()
yield from self._emit_model_start(trace_state)
if kind == "thought_start":
yield ThoughtStartChunkEvent(
@ -2525,9 +2554,7 @@ class LLMNode(Node[LLMNodeData]):
if not segment and kind not in {"thought_start", "thought_end"}:
continue
# Start tracking model segment time on first output
if trace_state.model_segment_start_time is None:
trace_state.model_segment_start_time = time.perf_counter()
yield from self._emit_model_start(trace_state)
if kind == "thought_start":
yield ThoughtStartChunkEvent(
@ -2572,7 +2599,7 @@ class LLMNode(Node[LLMNodeData]):
trace_state.pending_usage = aggregate.usage
# Flush final model segment
self._flush_model_segment(buffers, trace_state)
yield from self._flush_model_segment(buffers, trace_state)
def _close_streams(self) -> Generator[NodeEventBase, None, None]:
yield StreamChunkEvent(
@ -2612,6 +2639,16 @@ class LLMNode(Node[LLMNodeData]):
),
is_final=True,
)
yield StreamChunkEvent(
selector=[self._node_id, "generation", "model_start"],
chunk="",
is_final=True,
)
yield StreamChunkEvent(
selector=[self._node_id, "generation", "model_end"],
chunk="",
is_final=True,
)
def _build_generation_data(
self,