mirror of
https://github.com/langgenius/dify.git
synced 2026-05-03 00:48:04 +08:00
fix: add timeout to queue.get() in DockerDemuxer to prevent indefinite blocking
This commit is contained in:
@ -7,7 +7,7 @@ from enum import IntEnum, StrEnum
|
||||
from functools import lru_cache
|
||||
from io import BytesIO
|
||||
from pathlib import PurePosixPath
|
||||
from queue import Queue
|
||||
from queue import Empty, Queue
|
||||
from typing import Any, cast
|
||||
from uuid import uuid4
|
||||
|
||||
@ -65,9 +65,15 @@ class DockerDemuxer:
|
||||
to thread-safe queues. This avoids race conditions where multiple threads
|
||||
calling _read_next_frame() simultaneously caused frame header/body corruption,
|
||||
resulting in incomplete stdout/stderr output.
|
||||
|
||||
TIMEOUT HANDLING:
|
||||
Queue.get() uses a timeout to prevent indefinite blocking when the socket is
|
||||
closed unexpectedly (e.g., container removed). This allows periodic checks for
|
||||
error conditions and closed state.
|
||||
"""
|
||||
|
||||
_HEADER_SIZE = 8
|
||||
_QUEUE_GET_TIMEOUT = 5.0 # seconds
|
||||
|
||||
def __init__(self, sock: socket.SocketIO):
|
||||
self._sock = sock
|
||||
@ -132,15 +138,31 @@ class DockerDemuxer:
|
||||
return self._read_from_queue(self._stderr_queue)
|
||||
|
||||
def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes:
|
||||
"""
|
||||
Read from queue with timeout to prevent indefinite blocking.
|
||||
|
||||
When the Docker container is removed or the socket is closed unexpectedly,
|
||||
the demux thread may be stuck in socket.read(). Using a timeout allows us
|
||||
to periodically check for errors and closed state instead of blocking forever.
|
||||
"""
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
|
||||
|
||||
chunk = queue.get()
|
||||
if chunk is None:
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
|
||||
raise TransportEOFError("End of demuxed stream")
|
||||
return chunk
|
||||
while True:
|
||||
try:
|
||||
chunk = queue.get(timeout=self._QUEUE_GET_TIMEOUT)
|
||||
if chunk is None:
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
|
||||
raise TransportEOFError("End of demuxed stream")
|
||||
return chunk
|
||||
except Empty:
|
||||
# Timeout - check if we should continue waiting
|
||||
if self._closed:
|
||||
raise TransportEOFError("Demuxer closed")
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
|
||||
# No error, continue waiting
|
||||
|
||||
def close(self) -> None:
|
||||
if not self._closed:
|
||||
|
||||
Reference in New Issue
Block a user