mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 01:48:04 +08:00
feat(api): send ping while the connection is idle
To keep the connection alive and avoid being closed.
This commit is contained in:
@ -31,18 +31,21 @@ class MessageGenerator:
|
||||
app_mode: AppMode,
|
||||
workflow_run_id: str,
|
||||
idle_timeout=300,
|
||||
ping_interval: float = 10.0,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
) -> Generator[Mapping | str, None, None]:
|
||||
topic = cls.get_response_topic(app_mode, workflow_run_id)
|
||||
return _topic_msg_generator(topic, idle_timeout, on_subscribe)
|
||||
return _topic_msg_generator(topic, idle_timeout, ping_interval, on_subscribe)
|
||||
|
||||
|
||||
def _topic_msg_generator(
|
||||
topic: Topic,
|
||||
idle_timeout: float,
|
||||
ping_interval: float,
|
||||
on_subscribe: Callable[[], None] | None = None,
|
||||
) -> Generator[Mapping[str, Any], None, None]:
|
||||
last_msg_time = time.time()
|
||||
last_ping_time = last_msg_time
|
||||
with topic.subscribe() as sub:
|
||||
# on_subscribe fires only after the Redis subscription is active.
|
||||
# This is used to gate task start and reduce pub/sub race for the first event.
|
||||
@ -50,17 +53,21 @@ def _topic_msg_generator(
|
||||
on_subscribe()
|
||||
while True:
|
||||
try:
|
||||
msg = sub.receive()
|
||||
msg = sub.receive(timeout=0.1)
|
||||
except SubscriptionClosedError:
|
||||
return
|
||||
if msg is None:
|
||||
current_time = time.time()
|
||||
if current_time - last_msg_time > idle_timeout:
|
||||
return
|
||||
if current_time - last_ping_time >= ping_interval:
|
||||
yield "ping"
|
||||
last_ping_time = current_time
|
||||
# skip the `None` message
|
||||
continue
|
||||
|
||||
last_msg_time = time.time()
|
||||
last_ping_time = last_msg_time
|
||||
event = json.loads(msg)
|
||||
yield event
|
||||
if not isinstance(event, dict):
|
||||
|
||||
@ -162,7 +162,7 @@ class RedisSubscriptionBase(Subscription):
|
||||
self._start_if_needed()
|
||||
return iter(self._message_iterator())
|
||||
|
||||
def receive(self, timeout: float | None = None) -> bytes | None:
|
||||
def receive(self, timeout: float | None = 0.1) -> bytes | None:
|
||||
"""Receive the next message from the subscription."""
|
||||
if self._closed.is_set():
|
||||
raise SubscriptionClosedError(f"The Redis {self._get_subscription_type()} subscription is closed")
|
||||
|
||||
Reference in New Issue
Block a user