From 76d7a44585609fa47d45062c216cef26fd39e37b Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Wed, 28 Jan 2026 16:31:55 +0800 Subject: [PATCH] chore(api): reformat code --- .../test_parallel_human_input_join_resume.py | 4 +- .../libs/redis_channels/# 广播队列抽象.md | 101 ------------------ .../unit_tests/models/test_app_models.py | 1 + 3 files changed, 2 insertions(+), 104 deletions(-) delete mode 100644 api/tests/unit_tests/libs/redis_channels/# 广播队列抽象.md diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py index ea77690cfb..b52fcf0526 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py @@ -264,6 +264,4 @@ def test_parallel_human_input_join_completes_after_second_resume() -> None: assert isinstance(second_resume_events[0], GraphRunStartedEvent) assert second_resume_events[0].reason is WorkflowStartReason.RESUMPTION assert isinstance(second_resume_events[-1], GraphRunSucceededEvent) - assert any( - isinstance(event, NodeRunSucceededEvent) and event.node_id == "end" for event in second_resume_events - ) + assert any(isinstance(event, NodeRunSucceededEvent) and event.node_id == "end" for event in second_resume_events) diff --git a/api/tests/unit_tests/libs/redis_channels/# 广播队列抽象.md b/api/tests/unit_tests/libs/redis_channels/# 广播队列抽象.md deleted file mode 100644 index a470680725..0000000000 --- a/api/tests/unit_tests/libs/redis_channels/# 广播队列抽象.md +++ /dev/null @@ -1,101 +0,0 @@ -# 广播队列抽象 - -我需要设计一个 BroadcastChannel,实现一个对广播队列的抽象。 - -这个广播队列的语义是这样的:队列与一个 key 绑定,队列的使用分为两个角色,Producer 和 Consumer。API 大概如下 - -```py -producer: Produder -producer.publish(payload) - -consumer: Consumer -for i in consumer.subscribe(): - # do something with payload - # ... -``` - -队列的语义是这样的 - -1. 订阅和发布都以 topic(key) 为 namespace,例如,例如 publisher1 往 topic_a 进行发布,consumer1 - 订阅了 topic_a,那么 consumer1 应该能收到 producer1 的消息;而订阅了 topic_b 的 consumer2 - 无法收到 producer1 的消息,consumer1 也无法收到 producer3 往 topic_c 发送的消息。 -2. At most once delivery,不进行持久化,如果 Publisher1 向 topic_a 发布消息 msg_1 的时候,topic_a 没有 consumer, - 那么消息 msg_1 不会被任何人收到;在这之后,如果 consumer1 订阅了 topic_a,然后 Publisher1 向 topic_a 发送了 msg_2,那么 consumer2 - 应该能收到 msg_2,但依然收不到 msg_1。 -3. 广播语义。如果 consumer1 和 consumer2 同时订阅了 topic_a,然后 producer1 向 topic_a 发布了 msg_1 ,那么 consumer1 和 - consumer 2 都应该能收到 msg_1。 - -大体上来说,就是 redis PubSub 的语义,但去掉了 key pattern subscribe 的部分。 - -关于这个队列,我对它的 API 设计有一些想法,考虑下面两种 API 设计: - -```py -import abc -from typing import Protocol - -# 方案 A,Publisher 和 Subscriber API 分开 - -class Producer(Protocol): - @abc.abstractmethod - def publish(self, topic: str, payload: bytes): - pass - - -class Consumer(Protocol): - @abc.abstractmethod - def subscribe(self, topic: str) -> Iterator[bytes]: - pass - -# 方案 B,Publisher 和 Subscriber API 合并 - -class PubSub(Protocol): - @abc.abstractmethod - def publish(self, topic: str, payload: bytes): - pass - - @abc.abstractmethod - def subscribe(self, topic: str) -> Iterator[bytes]: - pass -``` - -并且对于是否在 publish / subscribe 方法中包含 topic name,抑或是将 topic name 「藏」在接口里面,也需要考虑: - -```py -import abc -from typing import Protocol - -# 将 topic 参数「藏」在 constructor 里面,提供最强的抽象: - - -class Producer(Protocol): - @abc.abstractmethod - def publish(self, payload: bytes): - pass - - -class RedisProducer: - def __init__(self, redis_client, topic: str): - self._topic = topic - self._redis_client = redis_client - - def publish(self, payload: bytes): - pass - -# 像上面的代码一样,topic 参数是接口的一部分 - -class ProducerWithTopic(Protocol): - @abc.abstractmethod - def publish(self, topic: str, payload: bytes): - pass - - -class RedisProducerWithTopic: - def __init__(self, redis_client): - self._topic = topic - self._redis_client = redis_client - - def publish(self, topic:str, payload: bytes): - pass -``` - -请你帮我分析一下我应该如何设计接口。 diff --git a/api/tests/unit_tests/models/test_app_models.py b/api/tests/unit_tests/models/test_app_models.py index 3317e9a311..c6dfd41803 100644 --- a/api/tests/unit_tests/models/test_app_models.py +++ b/api/tests/unit_tests/models/test_app_models.py @@ -1442,6 +1442,7 @@ class TestConversationStatusCount: ] with patch("models.model.db.session.scalars") as mock_scalars: + def mock_scalars_side_effect(query): mock_result = MagicMock() if "messages" in str(query):