diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index 57ef0c078f..b530fe1ce4 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -44,14 +44,13 @@ from core.app.entities.task_entities import ( ) from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline from core.app.task_pipeline.message_cycle_manager import MessageCycleManager +from core.app.task_pipeline.message_file_utils import prepare_file_dict from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_manager import ModelInstance from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.prompt.utils.prompt_template_parser import PromptTemplateParser -from core.tools.signature import sign_tool_file -from dify_graph.file import helpers as file_helpers from dify_graph.file.enums import FileTransferMethod from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage from dify_graph.model_runtime.entities.message_entities import ( @@ -460,91 +459,40 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): """ self._task_state.metadata.usage = self._task_state.llm_result.usage metadata_dict = self._task_state.metadata.model_dump() + + # Fetch files associated with this message + files = None + with Session(db.engine, expire_on_commit=False) as session: + message_files = session.scalars(select(MessageFile).where(MessageFile.message_id == self._message_id)).all() + + if message_files: + # Fetch all required UploadFile objects in a single query to avoid N+1 problem + upload_file_ids = list( + dict.fromkeys( + mf.upload_file_id + for mf in message_files + if mf.transfer_method == FileTransferMethod.LOCAL_FILE and mf.upload_file_id + ) + ) + upload_files_map = {} + if upload_file_ids: + upload_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(upload_file_ids))).all() + upload_files_map = {uf.id: uf for uf in upload_files} + + files_list = [] + for message_file in message_files: + file_dict = prepare_file_dict(message_file, upload_files_map) + files_list.append(file_dict) + + files = files_list or None + return MessageEndStreamResponse( task_id=self._application_generate_entity.task_id, id=self._message_id, metadata=metadata_dict, + files=files, ) - def _record_files(self): - with Session(db.engine, expire_on_commit=False) as session: - message_files = session.scalars(select(MessageFile).where(MessageFile.message_id == self._message_id)).all() - if not message_files: - return None - - files_list = [] - upload_file_ids = [ - mf.upload_file_id - for mf in message_files - if mf.transfer_method == FileTransferMethod.LOCAL_FILE and mf.upload_file_id - ] - upload_files_map = {} - if upload_file_ids: - upload_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(upload_file_ids))).all() - upload_files_map = {uf.id: uf for uf in upload_files} - - for message_file in message_files: - upload_file = None - if message_file.transfer_method == FileTransferMethod.LOCAL_FILE and message_file.upload_file_id: - upload_file = upload_files_map.get(message_file.upload_file_id) - - url = None - filename = "file" - mime_type = "application/octet-stream" - size = 0 - extension = "" - - if message_file.transfer_method == FileTransferMethod.REMOTE_URL: - url = message_file.url - if message_file.url: - filename = message_file.url.split("/")[-1].split("?")[0] # Remove query params - elif message_file.transfer_method == FileTransferMethod.LOCAL_FILE: - if upload_file: - url = file_helpers.get_signed_file_url(upload_file_id=str(upload_file.id)) - filename = upload_file.name - mime_type = upload_file.mime_type or "application/octet-stream" - size = upload_file.size or 0 - extension = f".{upload_file.extension}" if upload_file.extension else "" - elif message_file.upload_file_id: - # Fallback: generate URL even if upload_file not found - url = file_helpers.get_signed_file_url(upload_file_id=str(message_file.upload_file_id)) - elif message_file.transfer_method == FileTransferMethod.TOOL_FILE and message_file.url: - # For tool files, use URL directly if it's HTTP, otherwise sign it - if message_file.url.startswith("http"): - url = message_file.url - filename = message_file.url.split("/")[-1].split("?")[0] - else: - # Extract tool file id and extension from URL - url_parts = message_file.url.split("/") - if url_parts: - file_part = url_parts[-1].split("?")[0] # Remove query params first - # Use rsplit to correctly handle filenames with multiple dots - if "." in file_part: - tool_file_id, ext = file_part.rsplit(".", 1) - extension = f".{ext}" - else: - tool_file_id = file_part - extension = ".bin" - url = sign_tool_file(tool_file_id=tool_file_id, extension=extension) - filename = file_part - - transfer_method_value = message_file.transfer_method - remote_url = message_file.url if message_file.transfer_method == FileTransferMethod.REMOTE_URL else "" - file_dict = { - "related_id": message_file.id, - "extension": extension, - "filename": filename, - "size": size, - "mime_type": mime_type, - "transfer_method": transfer_method_value, - "type": message_file.type, - "url": url or "", - "upload_file_id": message_file.upload_file_id or message_file.id, - "remote_url": remote_url, - } - files_list.append(file_dict) - return files_list or None - def _agent_message_to_stream_response(self, answer: str, message_id: str) -> AgentMessageStreamResponse: """ Agent message to stream response. diff --git a/api/core/app/task_pipeline/message_file_utils.py b/api/core/app/task_pipeline/message_file_utils.py new file mode 100644 index 0000000000..843e9eea30 --- /dev/null +++ b/api/core/app/task_pipeline/message_file_utils.py @@ -0,0 +1,76 @@ +from core.tools.signature import sign_tool_file +from dify_graph.file import helpers as file_helpers +from dify_graph.file.enums import FileTransferMethod +from models.model import MessageFile, UploadFile + +MAX_TOOL_FILE_EXTENSION_LENGTH = 10 + + +def prepare_file_dict(message_file: MessageFile, upload_files_map: dict[str, UploadFile]) -> dict: + """ + Prepare file dictionary for message end stream response. + + :param message_file: MessageFile instance + :param upload_files_map: Dictionary mapping upload_file_id to UploadFile + :return: Dictionary containing file information + """ + upload_file = None + if message_file.transfer_method == FileTransferMethod.LOCAL_FILE and message_file.upload_file_id: + upload_file = upload_files_map.get(message_file.upload_file_id) + + url = None + filename = "file" + mime_type = "application/octet-stream" + size = 0 + extension = "" + + if message_file.transfer_method == FileTransferMethod.REMOTE_URL: + url = message_file.url + if message_file.url: + filename = message_file.url.split("/")[-1].split("?")[0] + if "." in filename: + extension = "." + filename.rsplit(".", 1)[1] + elif message_file.transfer_method == FileTransferMethod.LOCAL_FILE: + if upload_file: + url = file_helpers.get_signed_file_url(upload_file_id=str(upload_file.id)) + filename = upload_file.name + mime_type = upload_file.mime_type or "application/octet-stream" + size = upload_file.size or 0 + extension = f".{upload_file.extension}" if upload_file.extension else "" + elif message_file.upload_file_id: + url = file_helpers.get_signed_file_url(upload_file_id=str(message_file.upload_file_id)) + elif message_file.transfer_method == FileTransferMethod.TOOL_FILE and message_file.url: + if message_file.url.startswith(("http://", "https://")): + url = message_file.url + filename = message_file.url.split("/")[-1].split("?")[0] + if "." in filename: + extension = "." + filename.rsplit(".", 1)[1] + else: + url_parts = message_file.url.split("/") + if url_parts: + file_part = url_parts[-1].split("?")[0] + if "." in file_part: + tool_file_id, ext = file_part.rsplit(".", 1) + extension = f".{ext}" + if len(extension) > MAX_TOOL_FILE_EXTENSION_LENGTH: + extension = ".bin" + else: + tool_file_id = file_part + extension = ".bin" + url = sign_tool_file(tool_file_id=tool_file_id, extension=extension) + filename = file_part + + transfer_method_value = message_file.transfer_method.value + remote_url = message_file.url if message_file.transfer_method == FileTransferMethod.REMOTE_URL else "" + return { + "related_id": message_file.id, + "extension": extension, + "filename": filename, + "size": size, + "mime_type": mime_type, + "transfer_method": transfer_method_value, + "type": message_file.type, + "url": url or "", + "upload_file_id": message_file.upload_file_id or message_file.id, + "remote_url": remote_url, + } diff --git a/api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_message_end_files.py b/api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_message_end_files.py new file mode 100644 index 0000000000..582990c88a --- /dev/null +++ b/api/tests/unit_tests/core/app/task_pipeline/test_easy_ui_message_end_files.py @@ -0,0 +1,425 @@ +""" +Unit tests for EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response method. + +This test suite ensures that the files array is correctly populated in the message_end +SSE event, which is critical for vision/image chat responses to render correctly. + +Test Coverage: +- Files array populated when MessageFile records exist +- Files array is None when no MessageFile records exist +- Correct signed URL generation for LOCAL_FILE transfer method +- Correct URL handling for REMOTE_URL transfer method +- Correct URL handling for TOOL_FILE transfer method +- Proper file metadata formatting (filename, mime_type, size, extension) +""" + +import uuid +from unittest.mock import MagicMock, Mock, patch + +import pytest +from sqlalchemy.orm import Session + +from core.app.entities.task_entities import MessageEndStreamResponse +from core.app.task_pipeline.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline +from dify_graph.file.enums import FileTransferMethod +from models.model import MessageFile, UploadFile + + +class TestMessageEndStreamResponseFiles: + """Test suite for files array population in message_end SSE event.""" + + @pytest.fixture + def mock_pipeline(self): + """Create a mock EasyUIBasedGenerateTaskPipeline instance.""" + pipeline = Mock(spec=EasyUIBasedGenerateTaskPipeline) + pipeline._message_id = str(uuid.uuid4()) + pipeline._task_state = Mock() + pipeline._task_state.metadata = Mock() + pipeline._task_state.metadata.model_dump = Mock(return_value={"test": "metadata"}) + pipeline._task_state.llm_result = Mock() + pipeline._task_state.llm_result.usage = Mock() + pipeline._application_generate_entity = Mock() + pipeline._application_generate_entity.task_id = str(uuid.uuid4()) + return pipeline + + @pytest.fixture + def mock_message_file_local(self): + """Create a mock MessageFile with LOCAL_FILE transfer method.""" + message_file = Mock(spec=MessageFile) + message_file.id = str(uuid.uuid4()) + message_file.message_id = str(uuid.uuid4()) + message_file.transfer_method = FileTransferMethod.LOCAL_FILE + message_file.upload_file_id = str(uuid.uuid4()) + message_file.url = None + message_file.type = "image" + return message_file + + @pytest.fixture + def mock_message_file_remote(self): + """Create a mock MessageFile with REMOTE_URL transfer method.""" + message_file = Mock(spec=MessageFile) + message_file.id = str(uuid.uuid4()) + message_file.message_id = str(uuid.uuid4()) + message_file.transfer_method = FileTransferMethod.REMOTE_URL + message_file.upload_file_id = None + message_file.url = "https://example.com/image.jpg" + message_file.type = "image" + return message_file + + @pytest.fixture + def mock_message_file_tool(self): + """Create a mock MessageFile with TOOL_FILE transfer method.""" + message_file = Mock(spec=MessageFile) + message_file.id = str(uuid.uuid4()) + message_file.message_id = str(uuid.uuid4()) + message_file.transfer_method = FileTransferMethod.TOOL_FILE + message_file.upload_file_id = None + message_file.url = "tool_file_123.png" + message_file.type = "image" + return message_file + + @pytest.fixture + def mock_upload_file(self, mock_message_file_local): + """Create a mock UploadFile.""" + upload_file = Mock(spec=UploadFile) + upload_file.id = mock_message_file_local.upload_file_id + upload_file.name = "test_image.png" + upload_file.mime_type = "image/png" + upload_file.size = 1024 + upload_file.extension = "png" + return upload_file + + def test_message_end_with_no_files(self, mock_pipeline): + """Test that files array is None when no MessageFile records exist.""" + # Arrange + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + mock_session.scalars.return_value.all.return_value = [] + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is None + assert result.id == mock_pipeline._message_id + assert result.metadata == {"test": "metadata"} + + def test_message_end_with_local_file(self, mock_pipeline, mock_message_file_local, mock_upload_file): + """Test that files array is populated correctly for LOCAL_FILE transfer method.""" + # Arrange + mock_message_file_local.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.file_helpers.get_signed_file_url") as mock_get_url, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + # First query: MessageFile + mock_message_files_result = Mock() + mock_message_files_result.all.return_value = [mock_message_file_local] + + # Second query: UploadFile (batch query to avoid N+1) + mock_upload_files_result = Mock() + mock_upload_files_result.all.return_value = [mock_upload_file] + + # Setup scalars to return different results for different queries + call_count = [0] # Use list to allow modification in nested function + + def scalars_side_effect(query): + call_count[0] += 1 + # First call is for MessageFile, second call is for UploadFile + if call_count[0] == 1: + return mock_message_files_result + else: + return mock_upload_files_result + + mock_session.scalars.side_effect = scalars_side_effect + mock_get_url.return_value = "https://example.com/signed-url?signature=abc123" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert file_dict["related_id"] == mock_message_file_local.id + assert file_dict["filename"] == "test_image.png" + assert file_dict["mime_type"] == "image/png" + assert file_dict["size"] == 1024 + assert file_dict["extension"] == ".png" + assert file_dict["type"] == "image" + assert file_dict["transfer_method"] == FileTransferMethod.LOCAL_FILE.value + assert "https://example.com/signed-url" in file_dict["url"] + assert file_dict["upload_file_id"] == mock_message_file_local.upload_file_id + assert file_dict["remote_url"] == "" + + # Verify database queries + # Should be called twice: once for MessageFile, once for UploadFile + assert mock_session.scalars.call_count == 2 + mock_get_url.assert_called_once_with(upload_file_id=str(mock_upload_file.id)) + + def test_message_end_with_remote_url(self, mock_pipeline, mock_message_file_remote): + """Test that files array is populated correctly for REMOTE_URL transfer method.""" + # Arrange + mock_message_file_remote.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_remote] + mock_session.scalars.return_value = mock_scalars_result + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert file_dict["related_id"] == mock_message_file_remote.id + assert file_dict["filename"] == "image.jpg" + assert file_dict["url"] == "https://example.com/image.jpg" + assert file_dict["extension"] == ".jpg" + assert file_dict["type"] == "image" + assert file_dict["transfer_method"] == FileTransferMethod.REMOTE_URL.value + assert file_dict["remote_url"] == "https://example.com/image.jpg" + assert file_dict["upload_file_id"] == mock_message_file_remote.id + + # Verify only one query for message_files is made + mock_session.scalars.assert_called_once() + + def test_message_end_with_tool_file_http(self, mock_pipeline, mock_message_file_tool): + """Test that files array is populated correctly for TOOL_FILE with HTTP URL.""" + # Arrange + mock_message_file_tool.message_id = mock_pipeline._message_id + mock_message_file_tool.url = "https://example.com/tool_file.png" + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_tool] + mock_session.scalars.return_value = mock_scalars_result + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert file_dict["url"] == "https://example.com/tool_file.png" + assert file_dict["filename"] == "tool_file.png" + assert file_dict["extension"] == ".png" + assert file_dict["transfer_method"] == FileTransferMethod.TOOL_FILE.value + + def test_message_end_with_tool_file_local(self, mock_pipeline, mock_message_file_tool): + """Test that files array is populated correctly for TOOL_FILE with local path.""" + # Arrange + mock_message_file_tool.message_id = mock_pipeline._message_id + mock_message_file_tool.url = "tool_file_123.png" + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.sign_tool_file") as mock_sign_tool, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_tool] + mock_session.scalars.return_value = mock_scalars_result + + mock_sign_tool.return_value = "https://example.com/signed-tool-file.png?signature=xyz" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert "https://example.com/signed-tool-file.png" in file_dict["url"] + assert file_dict["filename"] == "tool_file_123.png" + assert file_dict["extension"] == ".png" + assert file_dict["transfer_method"] == FileTransferMethod.TOOL_FILE.value + + # Verify tool file signing was called + mock_sign_tool.assert_called_once_with(tool_file_id="tool_file_123", extension=".png") + + def test_message_end_with_tool_file_long_extension(self, mock_pipeline, mock_message_file_tool): + """Test that TOOL_FILE extensions longer than MAX_TOOL_FILE_EXTENSION_LENGTH fall back to .bin.""" + mock_message_file_tool.message_id = mock_pipeline._message_id + mock_message_file_tool.url = "tool_file_abc.verylongextension" + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.sign_tool_file") as mock_sign_tool, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + mock_scalars_result = Mock() + mock_scalars_result.all.return_value = [mock_message_file_tool] + mock_session.scalars.return_value = mock_scalars_result + mock_sign_tool.return_value = "https://example.com/signed.bin" + + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + assert result.files is not None + file_dict = result.files[0] + assert file_dict["extension"] == ".bin" + mock_sign_tool.assert_called_once_with(tool_file_id="tool_file_abc", extension=".bin") + + def test_message_end_with_multiple_files( + self, mock_pipeline, mock_message_file_local, mock_message_file_remote, mock_upload_file + ): + """Test that files array contains all MessageFile records when multiple exist.""" + # Arrange + mock_message_file_local.message_id = mock_pipeline._message_id + mock_message_file_remote.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.file_helpers.get_signed_file_url") as mock_get_url, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + # First query: MessageFile + mock_message_files_result = Mock() + mock_message_files_result.all.return_value = [mock_message_file_local, mock_message_file_remote] + + # Second query: UploadFile (batch query to avoid N+1) + mock_upload_files_result = Mock() + mock_upload_files_result.all.return_value = [mock_upload_file] + + # Setup scalars to return different results for different queries + call_count = [0] # Use list to allow modification in nested function + + def scalars_side_effect(query): + call_count[0] += 1 + # First call is for MessageFile, second call is for UploadFile + if call_count[0] == 1: + return mock_message_files_result + else: + return mock_upload_files_result + + mock_session.scalars.side_effect = scalars_side_effect + mock_get_url.return_value = "https://example.com/signed-url?signature=abc123" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 2 + + # Verify both files are present + file_ids = [f["related_id"] for f in result.files] + assert mock_message_file_local.id in file_ids + assert mock_message_file_remote.id in file_ids + + def test_message_end_with_local_file_no_upload_file(self, mock_pipeline, mock_message_file_local): + """Test fallback when UploadFile is not found for LOCAL_FILE.""" + # Arrange + mock_message_file_local.message_id = mock_pipeline._message_id + + with ( + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.db") as mock_db, + patch("core.app.task_pipeline.easy_ui_based_generate_task_pipeline.Session") as mock_session_class, + patch("core.app.task_pipeline.message_file_utils.file_helpers.get_signed_file_url") as mock_get_url, + ): + mock_engine = MagicMock() + mock_db.engine = mock_engine + + mock_session = MagicMock(spec=Session) + mock_session_class.return_value.__enter__.return_value = mock_session + + # Mock database queries + # First query: MessageFile + mock_message_files_result = Mock() + mock_message_files_result.all.return_value = [mock_message_file_local] + + # Second query: UploadFile (batch query) - returns empty list (not found) + mock_upload_files_result = Mock() + mock_upload_files_result.all.return_value = [] # UploadFile not found + + # Setup scalars to return different results for different queries + call_count = [0] # Use list to allow modification in nested function + + def scalars_side_effect(query): + call_count[0] += 1 + # First call is for MessageFile, second call is for UploadFile + if call_count[0] == 1: + return mock_message_files_result + else: + return mock_upload_files_result + + mock_session.scalars.side_effect = scalars_side_effect + mock_get_url.return_value = "https://example.com/fallback-url?signature=def456" + + # Act + result = EasyUIBasedGenerateTaskPipeline._message_end_to_stream_response(mock_pipeline) + + # Assert + assert isinstance(result, MessageEndStreamResponse) + assert result.files is not None + assert len(result.files) == 1 + + file_dict = result.files[0] + assert "https://example.com/fallback-url" in file_dict["url"] + # Verify fallback URL was generated using upload_file_id from message_file + mock_get_url.assert_called_with(upload_file_id=str(mock_message_file_local.upload_file_id))