feat: implement invoke app args

This commit is contained in:
Yeuoly
2024-08-29 20:50:36 +08:00
parent 41ed2e0cc2
commit 12ea085e22
5 changed files with 99 additions and 82 deletions

View File

@ -6,15 +6,17 @@ from pydantic import BaseModel
class BaseBackwardsInvocation:
@classmethod
def convert_to_event_stream(cls, response: Generator[BaseModel | dict, None, None] | BaseModel | dict):
def convert_to_event_stream(cls, response: Generator[BaseModel | dict | str, None, None] | BaseModel | dict):
if isinstance(response, Generator):
for chunk in response:
if isinstance(chunk, BaseModel):
yield chunk.model_dump_json().encode() + b'\n\n'
if isinstance(chunk, str):
yield f"event: {chunk}\n\n".encode()
else:
yield json.dumps(chunk).encode() + b'\n\n'
else:
if isinstance(response, BaseModel):
yield response.model_dump_json().encode() + b'\n\n'
else:
yield json.dumps(response).encode() + b'\n\n'
yield json.dumps(response).encode() + b'\n\n'