refactor app generate pipeline

This commit is contained in:
takatost
2024-03-15 21:42:22 +08:00
parent 9b57b4c6c8
commit 62846be275
36 changed files with 2666 additions and 2102 deletions

View File

@ -12,6 +12,7 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
from core.app.apps.base_app_queue_manager import AppQueueManager, GenerateTaskStoppedException, PublishFrom
from core.app.apps.completion.app_config_manager import CompletionAppConfigManager
from core.app.apps.completion.app_runner import CompletionAppRunner
from core.app.apps.completion.generate_response_converter import CompletionAppGenerateResponseConverter
from core.app.apps.message_based_app_generator import MessageBasedAppGenerator
from core.app.apps.message_based_app_queue_manager import MessageBasedAppQueueManager
from core.app.entities.app_invoke_entities import CompletionAppGenerateEntity, InvokeFrom
@ -32,7 +33,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
args: Any,
invoke_from: InvokeFrom,
stream: bool = True) \
-> Union[dict, Generator]:
-> Union[dict, Generator[dict, None, None]]:
"""
Generate App response.
@ -133,14 +134,20 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
worker_thread.start()
# return response or stream generator
return self._handle_response(
response = self._handle_response(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream
)
return CompletionAppGenerateResponseConverter.convert(
response=response,
invoke_from=invoke_from
)
def _generate_worker(self, flask_app: Flask,
application_generate_entity: CompletionAppGenerateEntity,
queue_manager: AppQueueManager,
@ -189,7 +196,7 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
user: Union[Account, EndUser],
invoke_from: InvokeFrom,
stream: bool = True) \
-> Union[dict, Generator]:
-> Union[dict, Generator[dict, None, None]]:
"""
Generate App response.
@ -289,5 +296,6 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
queue_manager=queue_manager,
conversation=conversation,
message=message,
user=user,
stream=stream
)

View File

@ -0,0 +1,104 @@
import json
from collections.abc import Generator
from typing import cast
from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
from core.app.entities.task_entities import (
CompletionAppBlockingResponse,
CompletionAppStreamResponse,
MessageEndStreamResponse,
PingStreamResponse,
)
class CompletionAppGenerateResponseConverter(AppGenerateResponseConverter):
_blocking_response_type = CompletionAppBlockingResponse
@classmethod
def convert_blocking_full_response(cls, blocking_response: CompletionAppBlockingResponse) -> dict:
"""
Convert blocking full response.
:param blocking_response: blocking response
:return:
"""
response = {
'event': 'message',
'task_id': blocking_response.task_id,
'id': blocking_response.data.id,
'message_id': blocking_response.data.message_id,
'mode': blocking_response.data.mode,
'answer': blocking_response.data.answer,
'metadata': blocking_response.data.metadata,
'created_at': blocking_response.data.created_at
}
return response
@classmethod
def convert_blocking_simple_response(cls, blocking_response: CompletionAppBlockingResponse) -> dict:
"""
Convert blocking simple response.
:param blocking_response: blocking response
:return:
"""
response = cls.convert_blocking_full_response(blocking_response)
metadata = response.get('metadata', {})
response['metadata'] = cls._get_simple_metadata(metadata)
return response
@classmethod
def convert_stream_full_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \
-> Generator[str, None, None]:
"""
Convert stream full response.
:param stream_response: stream response
:return:
"""
for chunk in stream_response:
chunk = cast(CompletionAppStreamResponse, chunk)
sub_stream_response = chunk.stream_response
if isinstance(sub_stream_response, PingStreamResponse):
yield 'ping'
continue
response_chunk = {
'event': sub_stream_response.event.value,
'message_id': chunk.message_id,
'created_at': chunk.created_at
}
response_chunk.update(sub_stream_response.to_dict())
yield json.dumps(response_chunk)
@classmethod
def convert_stream_simple_response(cls, stream_response: Generator[CompletionAppStreamResponse, None, None]) \
-> Generator[str, None, None]:
"""
Convert stream simple response.
:param stream_response: stream response
:return:
"""
for chunk in stream_response:
chunk = cast(CompletionAppStreamResponse, chunk)
sub_stream_response = chunk.stream_response
if isinstance(sub_stream_response, PingStreamResponse):
yield 'ping'
continue
response_chunk = {
'event': sub_stream_response.event.value,
'message_id': chunk.message_id,
'created_at': chunk.created_at
}
sub_stream_response_dict = sub_stream_response.to_dict()
if isinstance(sub_stream_response, MessageEndStreamResponse):
metadata = sub_stream_response_dict.get('metadata', {})
sub_stream_response_dict['metadata'] = cls._get_simple_metadata(metadata)
response_chunk.update(sub_stream_response_dict)
yield json.dumps(response_chunk)