feat: add stream response

Novice
2026-01-13 14:12:57 +08:00
parent 47790b49d4
commit 969c96b070
25 changed files with 371 additions and 134 deletions


@@ -12,11 +12,14 @@ from sqlalchemy.orm import Session
from core.agent.entities import AgentToolEntity
from core.agent.plugin_entities import AgentStrategyParameter
from core.file import File, FileTransferMethod
from core.memory.base import BaseMemory
from core.memory.node_token_buffer_memory import NodeTokenBufferMemory
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata
from core.model_runtime.entities.model_entities import AIModelEntity, ModelType
from core.model_runtime.utils.encoders import jsonable_encoder
from core.prompt.entities.advanced_prompt_entities import MemoryMode
from core.provider_manager import ProviderManager
from core.tools.entities.tool_entities import (
    ToolIdentity,
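
The new imports wire in a shared `BaseMemory` abstraction that both memory flavors implement, so `_fetch_memory` can return either one. `core/memory/base.py` itself is not part of this diff; a minimal sketch of what such a base class plausibly looks like follows — the method name mirrors `TokenBufferMemory`'s public API, and the exact signature is an assumption:

```python
# Sketch only: the real core/memory/base.py is not shown in this diff.
# Method name mirrors TokenBufferMemory's public API; the signature here
# is an assumption.
from abc import ABC, abstractmethod
from typing import Any, Sequence


class BaseMemory(ABC):
    """Common contract for conversation- and node-scoped memory buffers."""

    @abstractmethod
    def get_history_prompt_messages(
        self, *, max_token_limit: int = 2000, message_limit: int | None = None
    ) -> Sequence[Any]:
        """Return buffered history as prompt messages, trimmed to a token budget."""
        raise NotImplementedError
```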
@@ -136,6 +139,9 @@ class AgentNode(Node[AgentNodeData]):
            )
            return

        # Fetch memory for node memory saving
        memory = self._fetch_memory_for_save()
        try:
            yield from self._transform_message(
                messages=message_stream,
@@ -149,6 +155,7 @@ class AgentNode(Node[AgentNodeData]):
                node_type=self.node_type,
                node_id=self._node_id,
                node_execution_id=self.id,
                memory=memory,
            )
        except PluginDaemonClientSideError as e:
            transform_error = AgentMessageTransformError(
@@ -395,8 +402,20 @@ class AgentNode(Node[AgentNodeData]):
            icon = None
        return icon

    def _fetch_memory(self, model_instance: ModelInstance) -> TokenBufferMemory | None:
        # get conversation id
    def _fetch_memory(self, model_instance: ModelInstance) -> BaseMemory | None:
        """
        Fetch memory based on configuration mode.

        Returns TokenBufferMemory for conversation mode (default),
        or NodeTokenBufferMemory for node mode (Chatflow only).
        """
        node_data = self.node_data
        memory_config = node_data.memory
        if not memory_config:
            return None

        # get conversation id (required for both modes in Chatflow)
        conversation_id_variable = self.graph_runtime_state.variable_pool.get(
            ["sys", SystemVariableKey.CONVERSATION_ID]
        )
@@ -404,16 +423,26 @@ class AgentNode(Node[AgentNodeData]):
            return None
        conversation_id = conversation_id_variable.value

        with Session(db.engine, expire_on_commit=False) as session:
            stmt = select(Conversation).where(Conversation.app_id == self.app_id, Conversation.id == conversation_id)
            conversation = session.scalar(stmt)
            if not conversation:
                return None
        memory = TokenBufferMemory(conversation=conversation, model_instance=model_instance)
        return memory
        # Return appropriate memory type based on mode
        if memory_config.mode == MemoryMode.NODE:
            # Node-level memory (Chatflow only)
            return NodeTokenBufferMemory(
                app_id=self.app_id,
                conversation_id=conversation_id,
                node_id=self._node_id,
                tenant_id=self.tenant_id,
                model_instance=model_instance,
            )
        else:
            # Conversation-level memory (default)
            with Session(db.engine, expire_on_commit=False) as session:
                stmt = select(Conversation).where(
                    Conversation.app_id == self.app_id, Conversation.id == conversation_id
                )
                conversation = session.scalar(stmt)
                if not conversation:
                    return None
                return TokenBufferMemory(conversation=conversation, model_instance=model_instance)

    def _fetch_model(self, value: dict[str, Any]) -> tuple[ModelInstance, AIModelEntity | None]:
        provider_manager = ProviderManager()
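
The rewritten `_fetch_memory` is now a two-way dispatch: node mode keys memory by `(app_id, conversation_id, node_id)`, while the default path still loads the `Conversation` row and wraps it in `TokenBufferMemory`. A self-contained sketch of the keying difference — stub types only, and the `MemoryMode` values are inferred from the branching here rather than copied from `advanced_prompt_entities`:

```python
# Stub illustration of the dispatch above; enum values are assumptions
# based on how the code branches, not copied from the real MemoryMode.
from enum import Enum


class MemoryMode(str, Enum):
    CONVERSATION = "conversation"  # default: one buffer per conversation
    NODE = "node"  # Chatflow only: one buffer per (conversation, node) pair


def memory_key(mode: MemoryMode, app_id: str, conversation_id: str, node_id: str) -> str:
    if mode == MemoryMode.NODE:
        # Node memory is isolated per node, so two agent nodes in the same
        # conversation keep separate histories.
        return f"{app_id}:{conversation_id}:{node_id}"
    return f"{app_id}:{conversation_id}"


assert memory_key(MemoryMode.NODE, "a1", "c1", "n7") == "a1:c1:n7"
assert memory_key(MemoryMode.CONVERSATION, "a1", "c1", "n7") == "a1:c1"
```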
@@ -457,6 +486,47 @@ class AgentNode(Node[AgentNodeData]):
        else:
            return [tool for tool in tools if tool.get("type") != ToolProviderType.MCP]

    def _fetch_memory_for_save(self) -> BaseMemory | None:
        """
        Fetch the memory instance used for saving node memory.

        This is a simplified variant that doesn't take a model_instance parameter.
        """
        from core.model_manager import ModelManager
        from core.model_runtime.entities.model_entities import ModelType

        node_data = self.node_data
        if not node_data.memory:
            return None

        # Get conversation_id
        conversation_id_var = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID])
        if not isinstance(conversation_id_var, StringSegment):
            return None
        conversation_id = conversation_id_var.value

        # Return the appropriate memory type based on mode
        if node_data.memory.mode == MemoryMode.NODE:
            # Node memory needs a model_instance for token counting;
            # fall back to the workspace's default LLM for this purpose.
            try:
                model_instance = ModelManager().get_default_model_instance(
                    tenant_id=self.tenant_id,
                    model_type=ModelType.LLM,
                )
            except Exception:
                return None
            return NodeTokenBufferMemory(
                app_id=self.app_id,
                conversation_id=conversation_id,
                node_id=self._node_id,
                tenant_id=self.tenant_id,
                model_instance=model_instance,
            )
        else:
            # Conversation-level memory doesn't need saving here
            return None

    def _transform_message(
        self,
        messages: Generator[ToolInvokeMessage, None, None],
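
`_fetch_memory_for_save` needs a model even though it only writes history, because a token-buffer memory trims itself against the model's tokenizer. Roughly the idea, with a stand-in `count_tokens` (the real counting goes through `model_instance`):

```python
# Illustrative only: shows why token counting is involved even on the save
# path. count_tokens stands in for the model-backed counter on model_instance.
from typing import Callable


def trim_to_budget(turns: list[str], count_tokens: Callable[[str], int], max_tokens: int = 2000) -> list[str]:
    kept: list[str] = []
    used = 0
    # Walk newest-first so the most recent turns survive trimming.
    for turn in reversed(turns):
        cost = count_tokens(turn)
        if used + cost > max_tokens:
            break
        kept.append(turn)
        used += cost
    return list(reversed(kept))


# Naive whitespace tokenizer, for demonstration; drops the oldest turn:
history = ["hi", "hello there", "please summarize everything we discussed so far"]
print(trim_to_budget(history, lambda s: len(s.split()), max_tokens=9))
```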
@@ -467,6 +537,7 @@ class AgentNode(Node[AgentNodeData]):
        node_type: NodeType,
        node_id: str,
        node_execution_id: str,
        memory: BaseMemory | None = None,
    ) -> Generator[NodeEventBase, None, None]:
        """
        Convert ToolInvokeMessages into tuple[plain_text, files]
@@ -711,6 +782,21 @@ class AgentNode(Node[AgentNodeData]):
                is_final=True,
            )

            # Save to node memory if in node memory mode
            from core.workflow.nodes.llm import llm_utils

            # Get user query from sys.query
            user_query_var = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.QUERY])
            user_query = user_query_var.text if user_query_var else ""

            llm_utils.save_node_memory(
                memory=memory,
                variable_pool=self.graph_runtime_state.variable_pool,
                user_query=user_query,
                assistant_response=text,
                assistant_files=files,
            )

            yield StreamCompletedEvent(
                node_run_result=NodeRunResult(
                    status=WorkflowNodeExecutionStatus.SUCCEEDED,
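
The actual write goes through `llm_utils.save_node_memory`, whose body is not part of this diff. Presumably it no-ops unless the node is in node-memory mode, along these lines — the guard and the `save_turn` method name are guesses:

```python
# Hedged sketch; the real helper lives in core/workflow/nodes/llm/llm_utils.py
# and is not shown here. memory.save_turn(...) is a placeholder name.
from collections.abc import Sequence
from typing import Any


def save_node_memory(
    memory: Any,
    variable_pool: Any,  # accepted to match the call site above; may be unused
    user_query: str,
    assistant_response: str,
    assistant_files: Sequence[Any] = (),
) -> None:
    # Conversation-level memory is persisted with the message record itself,
    # so only node-scoped memory has anything to do here.
    if memory is None or not hasattr(memory, "node_id"):
        return
    memory.save_turn(
        user_query=user_query,
        assistant_response=assistant_response,
        files=list(assistant_files),
    )
```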