From c0bdd6792f703ceef5efc2e8e0ccccdbda6ed85a Mon Sep 17 00:00:00 2001 From: EvanYao <2869018789@qq.com> Date: Fri, 15 May 2026 21:51:49 +0800 Subject: [PATCH] refactor: convert isinstance chains to match/case in easy_ui_based_generate_task_pipeline.py (#36222) --- .../easy_ui_based_generate_task_pipeline.py | 256 +++++++++--------- 1 file changed, 130 insertions(+), 126 deletions(-) diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py index e2e07ebaff..171d5ab342 100644 --- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py +++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py @@ -140,42 +140,43 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): :return: """ for stream_response in generator: - if isinstance(stream_response, ErrorStreamResponse): - raise stream_response.err - elif isinstance(stream_response, MessageEndStreamResponse): - extras = {"usage": self._task_state.llm_result.usage.model_dump()} - if self._task_state.metadata: - extras["metadata"] = self._task_state.metadata.model_dump() - response: ChatbotAppBlockingResponse | CompletionAppBlockingResponse - if self._conversation_mode == AppMode.COMPLETION: - response = CompletionAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - data=CompletionAppBlockingResponse.Data( - id=self._message_id, - mode=self._conversation_mode, - message_id=self._message_id, - answer=self._task_state.llm_result.message.get_text_content(), - created_at=self._message_created_at, - **extras, - ), - ) - else: - response = ChatbotAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - data=ChatbotAppBlockingResponse.Data( - id=self._message_id, - mode=self._conversation_mode, - conversation_id=self._conversation_id, - message_id=self._message_id, - answer=self._task_state.llm_result.message.get_text_content(), - created_at=self._message_created_at, - **extras, - ), - ) + match stream_response: + case ErrorStreamResponse(): + raise stream_response.err + case MessageEndStreamResponse(): + extras = {"usage": self._task_state.llm_result.usage.model_dump()} + if self._task_state.metadata: + extras["metadata"] = self._task_state.metadata.model_dump() + response: ChatbotAppBlockingResponse | CompletionAppBlockingResponse + if self._conversation_mode == AppMode.COMPLETION: + response = CompletionAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + data=CompletionAppBlockingResponse.Data( + id=self._message_id, + mode=self._conversation_mode, + message_id=self._message_id, + answer=self._task_state.llm_result.message.get_text_content(), + created_at=self._message_created_at, + **extras, + ), + ) + else: + response = ChatbotAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + data=ChatbotAppBlockingResponse.Data( + id=self._message_id, + mode=self._conversation_mode, + conversation_id=self._conversation_id, + message_id=self._message_id, + answer=self._task_state.llm_result.message.get_text_content(), + created_at=self._message_created_at, + **extras, + ), + ) - return response - else: - continue + return response + case _: + continue raise RuntimeError("queue listening stopped unexpectedly.") @@ -265,104 +266,107 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline): publisher.publish(message) event = message.event - if isinstance(event, QueueErrorEvent): - with sessionmaker(bind=db.engine).begin() as session: - err = self.handle_error(event=event, session=session, message_id=self._message_id) - yield self.error_to_stream_response(err) - break - elif isinstance(event, QueueStopEvent | QueueMessageEndEvent): - if isinstance(event, QueueMessageEndEvent): - if event.llm_result: - self._task_state.llm_result = event.llm_result - else: - self._handle_stop(event) + match event: + case QueueErrorEvent(): + with sessionmaker(bind=db.engine).begin() as session: + err = self.handle_error(event=event, session=session, message_id=self._message_id) + yield self.error_to_stream_response(err) + break + case QueueStopEvent() | QueueMessageEndEvent(): + if isinstance(event, QueueMessageEndEvent): + if event.llm_result: + self._task_state.llm_result = event.llm_result + else: + self._handle_stop(event) - # handle output moderation - output_moderation_answer = self.handle_output_moderation_when_task_finished( - self._task_state.llm_result.message.get_text_content() - ) - if output_moderation_answer: - self._task_state.llm_result.message.content = output_moderation_answer - yield self._message_cycle_manager.message_replace_to_stream_response( - answer=output_moderation_answer + # handle output moderation + output_moderation_answer = self.handle_output_moderation_when_task_finished( + self._task_state.llm_result.message.get_text_content() ) - - with sessionmaker(bind=db.engine).begin() as session: - # Save message - self._save_message(session=session, trace_manager=trace_manager) - message_end_resp = self._message_end_to_stream_response() - yield message_end_resp - elif isinstance(event, QueueRetrieverResourcesEvent): - self._message_cycle_manager.handle_retriever_resources(event) - elif isinstance(event, QueueAnnotationReplyEvent): - annotation = self._message_cycle_manager.handle_annotation_reply(event) - if annotation: - self._task_state.llm_result.message.content = annotation.content - elif isinstance(event, QueueAgentThoughtEvent): - agent_thought_response = self._agent_thought_to_stream_response(event) - if agent_thought_response is not None: - yield agent_thought_response - elif isinstance(event, QueueMessageFileEvent): - response = self._message_cycle_manager.message_file_to_stream_response(event) - if response: - yield response - elif isinstance(event, QueueLLMChunkEvent | QueueAgentMessageEvent): - chunk = event.chunk - delta_text = chunk.delta.message.content - if delta_text is None: - continue - if isinstance(chunk.delta.message.content, list): - delta_text = "" - for content in chunk.delta.message.content: - logger.debug( - "The content type %s in LLM chunk delta message content.: %r", type(content), content + if output_moderation_answer: + self._task_state.llm_result.message.content = output_moderation_answer + yield self._message_cycle_manager.message_replace_to_stream_response( + answer=output_moderation_answer ) - if isinstance(content, TextPromptMessageContent): - delta_text += content.data - elif isinstance(content, str): - delta_text += content # failback to str - else: - logger.warning( - "Unsupported content type %s in LLM chunk delta message content.: %r", - type(content), - content, + + with sessionmaker(bind=db.engine).begin() as session: + # Save message + self._save_message(session=session, trace_manager=trace_manager) + message_end_resp = self._message_end_to_stream_response() + yield message_end_resp + case QueueRetrieverResourcesEvent(): + self._message_cycle_manager.handle_retriever_resources(event) + case QueueAnnotationReplyEvent(): + annotation = self._message_cycle_manager.handle_annotation_reply(event) + if annotation: + self._task_state.llm_result.message.content = annotation.content + case QueueAgentThoughtEvent(): + agent_thought_response = self._agent_thought_to_stream_response(event) + if agent_thought_response is not None: + yield agent_thought_response + case QueueMessageFileEvent(): + response = self._message_cycle_manager.message_file_to_stream_response(event) + if response: + yield response + case QueueLLMChunkEvent() | QueueAgentMessageEvent(): + chunk = event.chunk + delta_text = chunk.delta.message.content + if delta_text is None: + continue + if isinstance(chunk.delta.message.content, list): + delta_text = "" + for content in chunk.delta.message.content: + logger.debug( + "The content type %s in LLM chunk delta message content.: %r", type(content), content ) - continue + match content: + case TextPromptMessageContent(): + delta_text += content.data + case str(): + delta_text += content # failback to str + case _: + logger.warning( + "Unsupported content type %s in LLM chunk delta message content.: %r", + type(content), + content, + ) + continue - if not self._task_state.llm_result.prompt_messages: - self._task_state.llm_result.prompt_messages = chunk.prompt_messages + if not self._task_state.llm_result.prompt_messages: + self._task_state.llm_result.prompt_messages = chunk.prompt_messages - # handle output moderation chunk - should_direct_answer = self._handle_output_moderation_chunk(cast(str, delta_text)) - if should_direct_answer: + # handle output moderation chunk + should_direct_answer = self._handle_output_moderation_chunk(cast(str, delta_text)) + if should_direct_answer: + continue + + current_content = cast(str, self._task_state.llm_result.message.content) + current_content += cast(str, delta_text) + self._task_state.llm_result.message.content = current_content + + match event: + case QueueLLMChunkEvent(): + # Determine the event type once, on first LLM chunk, and reuse for subsequent chunks + if not hasattr(self, "_precomputed_event_type") or self._precomputed_event_type is None: + self._precomputed_event_type = self._message_cycle_manager.get_message_event_type( + message_id=self._message_id + ) + yield self._message_cycle_manager.message_to_stream_response( + answer=cast(str, delta_text), + message_id=self._message_id, + event_type=self._precomputed_event_type, + ) + case _: + yield self._agent_message_to_stream_response( + answer=cast(str, delta_text), + message_id=self._message_id, + ) + case QueueMessageReplaceEvent(): + yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text) + case QueuePingEvent(): + yield self.ping_stream_response() + case _: continue - - current_content = cast(str, self._task_state.llm_result.message.content) - current_content += cast(str, delta_text) - self._task_state.llm_result.message.content = current_content - - if isinstance(event, QueueLLMChunkEvent): - # Determine the event type once, on first LLM chunk, and reuse for subsequent chunks - if not hasattr(self, "_precomputed_event_type") or self._precomputed_event_type is None: - self._precomputed_event_type = self._message_cycle_manager.get_message_event_type( - message_id=self._message_id - ) - yield self._message_cycle_manager.message_to_stream_response( - answer=cast(str, delta_text), - message_id=self._message_id, - event_type=self._precomputed_event_type, - ) - else: - yield self._agent_message_to_stream_response( - answer=cast(str, delta_text), - message_id=self._message_id, - ) - elif isinstance(event, QueueMessageReplaceEvent): - yield self._message_cycle_manager.message_replace_to_stream_response(answer=event.text) - elif isinstance(event, QueuePingEvent): - yield self.ping_stream_response() - else: - continue if publisher: publisher.publish(None) if self._conversation_name_generate_thread: