mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 01:18:05 +08:00
fix: add timeout to queue.get() in QueueTransportReadCloser to prevent indefinite blocking
This commit is contained in:
@ -1,5 +1,3 @@
|
||||
from queue import Queue
|
||||
|
||||
from core.virtual_environment.channel.exec import TransportEOFError
|
||||
from core.virtual_environment.channel.transport import TransportReadCloser
|
||||
|
||||
@ -22,11 +20,15 @@ class QueueTransportReadCloser(TransportReadCloser):
|
||||
q_transport.close()
|
||||
"""
|
||||
|
||||
_QUEUE_GET_TIMEOUT = 5.0
|
||||
|
||||
class WriteHandler:
|
||||
"""
|
||||
A write handler that writes data to a queue.
|
||||
"""
|
||||
|
||||
from queue import Queue
|
||||
|
||||
def __init__(self, queue: Queue[bytes | None]) -> None:
|
||||
self.queue = queue
|
||||
|
||||
@ -39,6 +41,8 @@ class QueueTransportReadCloser(TransportReadCloser):
|
||||
"""
|
||||
Initialize the QueueTransportReadCloser with write function.
|
||||
"""
|
||||
from queue import Queue
|
||||
|
||||
self.q = Queue[bytes | None]()
|
||||
self._read_buffer = bytearray()
|
||||
self._closed = False
|
||||
@ -66,6 +70,8 @@ class QueueTransportReadCloser(TransportReadCloser):
|
||||
|
||||
NEVER USE IT IN A MULTI-THREADED CONTEXT WITHOUT PROPER SYNCHRONIZATION.
|
||||
"""
|
||||
from queue import Empty
|
||||
|
||||
if n <= 0:
|
||||
return b""
|
||||
|
||||
@ -79,7 +85,12 @@ class QueueTransportReadCloser(TransportReadCloser):
|
||||
round = 0
|
||||
|
||||
while len(to_return) < n and not self._closed and (self.q.qsize() > 0 or round == 0):
|
||||
chunk = self.q.get()
|
||||
try:
|
||||
chunk = self.q.get(timeout=self._QUEUE_GET_TIMEOUT)
|
||||
except Empty:
|
||||
if self._closed:
|
||||
raise TransportEOFError("Transport is closed")
|
||||
continue
|
||||
if chunk is None:
|
||||
self._closed = True
|
||||
if len(to_return) == 0:
|
||||
|
||||
Reference in New Issue
Block a user