fix stream bugs

This commit is contained in:
takatost
2024-03-08 16:44:42 +08:00
parent 90bcb241cc
commit 2ffb63ff0c
7 changed files with 13 additions and 8 deletions

View File

@ -54,7 +54,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
inputs = args['inputs'] inputs = args['inputs']
extras = { extras = {
"auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else True "auto_generate_conversation_name": args['auto_generate_name'] if 'auto_generate_name' in args else False
} }
# get conversation # get conversation

View File

@ -346,7 +346,7 @@ class AdvancedChatAppGenerateTaskPipeline:
yield self._yield_response(response) yield self._yield_response(response)
elif isinstance(event, QueueTextChunkEvent): elif isinstance(event, QueueTextChunkEvent):
delta_text = event.chunk_text delta_text = event.text
if delta_text is None: if delta_text is None:
continue continue

View File

@ -76,7 +76,7 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
streamable_node_ids = [] streamable_node_ids = []
end_node_ids = [] end_node_ids = []
for node_config in graph.get('nodes'): for node_config in graph.get('nodes'):
if node_config.get('type') == NodeType.END.value: if node_config.get('data', {}).get('type') == NodeType.END.value:
end_node_ids.append(node_config.get('id')) end_node_ids.append(node_config.get('id'))
for edge_config in graph.get('edges'): for edge_config in graph.get('edges'):

View File

@ -15,6 +15,7 @@ from core.app.entities.queue_entities import (
QueueMessageEndEvent, QueueMessageEndEvent,
QueuePingEvent, QueuePingEvent,
QueueStopEvent, QueueStopEvent,
QueueWorkflowFinishedEvent,
) )
from extensions.ext_redis import redis_client from extensions.ext_redis import redis_client
@ -36,7 +37,8 @@ class AppQueueManager:
self._invoke_from = invoke_from self._invoke_from = invoke_from
user_prefix = 'account' if self._invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user' user_prefix = 'account' if self._invoke_from in [InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER] else 'end-user'
redis_client.setex(AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}") redis_client.setex(AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800,
f"{user_prefix}-{self._user_id}")
q = queue.Queue() q = queue.Queue()
@ -106,7 +108,10 @@ class AppQueueManager:
self._q.put(message) self._q.put(message)
if isinstance(event, QueueStopEvent | QueueErrorEvent | QueueMessageEndEvent): if isinstance(event, QueueStopEvent
| QueueErrorEvent
| QueueMessageEndEvent
| QueueWorkflowFinishedEvent):
self.stop_listen() self.stop_listen()
if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped(): if pub_from == PublishFrom.APPLICATION_MANAGER and self._is_stopped():

View File

@ -248,7 +248,7 @@ class WorkflowAppGenerateTaskPipeline:
yield self._yield_response(workflow_run_response) yield self._yield_response(workflow_run_response)
elif isinstance(event, QueueTextChunkEvent): elif isinstance(event, QueueTextChunkEvent):
delta_text = event.chunk_text delta_text = event.text
if delta_text is None: if delta_text is None:
continue continue

View File

@ -76,7 +76,7 @@ class WorkflowEventTriggerCallback(BaseWorkflowCallback):
streamable_node_ids = [] streamable_node_ids = []
end_node_ids = [] end_node_ids = []
for node_config in graph.get('nodes'): for node_config in graph.get('nodes'):
if node_config.get('type') == NodeType.END.value: if node_config.get('data', {}).get('type') == NodeType.END.value:
if node_config.get('data', {}).get('outputs', {}).get('type', '') == 'plain-text': if node_config.get('data', {}).get('outputs', {}).get('type', '') == 'plain-text':
end_node_ids.append(node_config.get('id')) end_node_ids.append(node_config.get('id'))

View File

@ -48,7 +48,7 @@ class QueueTextChunkEvent(AppQueueEvent):
QueueTextChunkEvent entity QueueTextChunkEvent entity
""" """
event = QueueEvent.TEXT_CHUNK event = QueueEvent.TEXT_CHUNK
chunk_text: str text: str
class QueueAgentMessageEvent(AppQueueEvent): class QueueAgentMessageEvent(AppQueueEvent):