feat(datasource): change datasource result type to event-stream

This commit is contained in:
Dongyu Li
2025-06-17 13:51:41 +08:00
parent b277acc298
commit c5976f5a09
15 changed files with 281 additions and 267 deletions

View File

@ -5,7 +5,7 @@ from sqlalchemy import select
from sqlalchemy.orm import Session
from core.datasource.entities.datasource_entities import (
DatasourceInvokeMessage,
DatasourceMessage,
DatasourceParameter,
DatasourceProviderType,
GetOnlineDocumentPageContentRequest,
@ -100,8 +100,8 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
match datasource_type:
case DatasourceProviderType.ONLINE_DOCUMENT:
datasource_runtime = cast(OnlineDocumentDatasourcePlugin, datasource_runtime)
online_document_result: Generator[DatasourceInvokeMessage, None, None] = (
datasource_runtime._get_online_document_page_content(
online_document_result: Generator[DatasourceMessage, None, None] = (
datasource_runtime.get_online_document_page_content(
user_id=self.user_id,
datasource_parameters=GetOnlineDocumentPageContentRequest(**parameters),
provider_type=datasource_type,
@ -290,7 +290,7 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
def _transform_message(
self,
messages: Generator[DatasourceInvokeMessage, None, None],
messages: Generator[DatasourceMessage, None, None],
parameters_for_log: dict[str, Any],
datasource_info: dict[str, Any],
) -> Generator:
@ -313,11 +313,11 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
for message in message_stream:
if message.type in {
DatasourceInvokeMessage.MessageType.IMAGE_LINK,
DatasourceInvokeMessage.MessageType.BINARY_LINK,
DatasourceInvokeMessage.MessageType.IMAGE,
DatasourceMessage.MessageType.IMAGE_LINK,
DatasourceMessage.MessageType.BINARY_LINK,
DatasourceMessage.MessageType.IMAGE,
}:
assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
assert isinstance(message.message, DatasourceMessage.TextMessage)
url = message.message.text
if message.meta:
@ -344,9 +344,9 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
tenant_id=self.tenant_id,
)
files.append(file)
elif message.type == DatasourceInvokeMessage.MessageType.BLOB:
elif message.type == DatasourceMessage.MessageType.BLOB:
# get tool file id
assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
assert isinstance(message.message, DatasourceMessage.TextMessage)
assert message.meta
datasource_file_id = message.message.text.split("/")[-1].split(".")[0]
@ -367,14 +367,14 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
tenant_id=self.tenant_id,
)
)
elif message.type == DatasourceInvokeMessage.MessageType.TEXT:
assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
elif message.type == DatasourceMessage.MessageType.TEXT:
assert isinstance(message.message, DatasourceMessage.TextMessage)
text += message.message.text
yield RunStreamChunkEvent(
chunk_content=message.message.text, from_variable_selector=[self.node_id, "text"]
)
elif message.type == DatasourceInvokeMessage.MessageType.JSON:
assert isinstance(message.message, DatasourceInvokeMessage.JsonMessage)
elif message.type == DatasourceMessage.MessageType.JSON:
assert isinstance(message.message, DatasourceMessage.JsonMessage)
if self.node_type == NodeType.AGENT:
msg_metadata = message.message.json_object.pop("execution_metadata", {})
agent_execution_metadata = {
@ -383,13 +383,13 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
}
json.append(message.message.json_object)
elif message.type == DatasourceInvokeMessage.MessageType.LINK:
assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
elif message.type == DatasourceMessage.MessageType.LINK:
assert isinstance(message.message, DatasourceMessage.TextMessage)
stream_text = f"Link: {message.message.text}\n"
text += stream_text
yield RunStreamChunkEvent(chunk_content=stream_text, from_variable_selector=[self.node_id, "text"])
elif message.type == DatasourceInvokeMessage.MessageType.VARIABLE:
assert isinstance(message.message, DatasourceInvokeMessage.VariableMessage)
elif message.type == DatasourceMessage.MessageType.VARIABLE:
assert isinstance(message.message, DatasourceMessage.VariableMessage)
variable_name = message.message.variable_name
variable_value = message.message.variable_value
if message.message.stream:
@ -404,7 +404,7 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
)
else:
variables[variable_name] = variable_value
elif message.type == DatasourceInvokeMessage.MessageType.FILE:
elif message.type == DatasourceMessage.MessageType.FILE:
assert message.meta is not None
files.append(message.meta["file"])