mirror of
https://github.com/langgenius/dify.git
synced 2026-05-04 01:18:05 +08:00
refactor: replace threading with gevent primitives for cooperative scheduling
Updated multiple modules to utilize gevent for concurrency, ensuring compatibility with gevent-based WSGI servers. This includes replacing threading.Thread and threading.Event with gevent.spawn and gevent.event.Event, respectively, to prevent blocking and improve performance during I/O operations. - Refactored SandboxBuilder, Sandbox, CommandFuture, and DockerDemuxer to use gevent. - Added detailed docstrings explaining the changes and benefits of using gevent primitives. This change enhances the responsiveness and efficiency of the application in a gevent environment.
This commit is contained in:
@ -70,9 +70,7 @@ class ContextGeneratePayload(BaseModel):
|
||||
model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
|
||||
available_vars: list[AvailableVarPayload] = Field(..., description="Available variables from upstream nodes")
|
||||
parameter_info: ParameterInfoPayload = Field(..., description="Target parameter metadata from the frontend")
|
||||
code_context: CodeContextPayload = Field(
|
||||
description="Existing code node context for incremental generation"
|
||||
)
|
||||
code_context: CodeContextPayload = Field(description="Existing code node context for incremental generation")
|
||||
|
||||
|
||||
class SuggestedQuestionsPayload(BaseModel):
|
||||
|
||||
@ -1,10 +1,25 @@
|
||||
"""
|
||||
Sandbox Builder: Factory for creating and initializing sandboxes.
|
||||
|
||||
This module uses gevent.spawn instead of threading.Thread to ensure proper
|
||||
cooperative scheduling in gevent-based WSGI servers like Gunicorn.
|
||||
|
||||
Using native threading.Thread in a gevent environment can cause issues because:
|
||||
1. Native threads hold the GIL during blocking I/O
|
||||
2. gevent's monkey-patching doesn't affect code running in native threads
|
||||
3. Blocking operations in native threads prevent greenlet switching
|
||||
|
||||
By using gevent.spawn(), background initialization runs as a greenlet that
|
||||
cooperatively yields during I/O operations.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import gevent
|
||||
from flask import current_app
|
||||
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
@ -145,7 +160,8 @@ class SandboxBuilder:
|
||||
sandbox.mark_failed(exc)
|
||||
|
||||
# Background init completes or signals failure via sandbox state.
|
||||
threading.Thread(target=initialize, daemon=True).start()
|
||||
# Use gevent.spawn instead of threading.Thread for cooperative scheduling
|
||||
gevent.spawn(initialize)
|
||||
return sandbox
|
||||
|
||||
@staticmethod
|
||||
|
||||
@ -1,9 +1,24 @@
|
||||
"""
|
||||
Sandbox: A managed virtual environment instance.
|
||||
|
||||
This module uses gevent.event.Event instead of threading.Event to ensure
|
||||
proper cooperative scheduling in gevent-based WSGI servers like Gunicorn.
|
||||
|
||||
Using native threading.Event in a gevent environment can cause issues because:
|
||||
1. threading.Event.wait() blocks the entire thread, not just the greenlet
|
||||
2. This prevents other greenlets from running while waiting
|
||||
3. Can lead to apparent "freezes" when multiple greenlets wait on events
|
||||
|
||||
By using gevent.event.Event, wait() calls cooperatively yield to other greenlets.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from gevent.event import Event
|
||||
|
||||
from libs.attr_map import AttrMap
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -31,8 +46,8 @@ class Sandbox:
|
||||
self._app_id = app_id
|
||||
self._assets_id = assets_id
|
||||
self._attributes = AttrMap()
|
||||
self._ready_event = threading.Event()
|
||||
self._cancel_event = threading.Event()
|
||||
self._ready_event: Event = Event() # gevent Event for cooperative waiting
|
||||
self._cancel_event: Event = Event() # gevent Event for cooperative waiting
|
||||
self._init_error: Exception | None = None
|
||||
|
||||
@property
|
||||
|
||||
@ -1,9 +1,26 @@
|
||||
"""
|
||||
CommandFuture: Async command execution with gevent compatibility.
|
||||
|
||||
This module uses gevent primitives (greenlets, events) instead of native threads
|
||||
to ensure proper cooperative scheduling in gevent-based WSGI servers like Gunicorn.
|
||||
|
||||
Using native threading.Thread or concurrent.futures.ThreadPoolExecutor in a gevent
|
||||
environment can cause deadlocks because:
|
||||
1. Native threads hold the GIL during blocking I/O
|
||||
2. gevent's monkey-patching doesn't affect code running in native threads
|
||||
3. Blocking operations in native threads prevent greenlet switching
|
||||
|
||||
By using gevent.spawn() and gevent.event.Event, all I/O operations become
|
||||
cooperative, allowing proper greenlet scheduling even during blocking reads.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any
|
||||
|
||||
import gevent
|
||||
from gevent.event import Event
|
||||
|
||||
from core.virtual_environment.__base.entities import CommandResult, CommandStatus
|
||||
from core.virtual_environment.__base.exec import NotSupportedOperationError
|
||||
@ -23,7 +40,12 @@ class CommandCancelledError(Exception):
|
||||
|
||||
class CommandFuture:
|
||||
"""
|
||||
Lightweight future for command execution.
|
||||
Lightweight future for command execution using gevent greenlets.
|
||||
|
||||
This implementation uses gevent primitives instead of native threads to ensure
|
||||
proper cooperative scheduling in gevent-based WSGI servers. All blocking I/O
|
||||
operations are performed in greenlets, allowing gevent to switch between them.
|
||||
|
||||
Mirrors concurrent.futures.Future API with 4 essential methods:
|
||||
result(), done(), cancel(), cancelled().
|
||||
"""
|
||||
@ -44,17 +66,20 @@ class CommandFuture:
|
||||
self._poll_status = poll_status
|
||||
self._poll_interval = poll_interval
|
||||
|
||||
self._done_event = threading.Event()
|
||||
self._lock = threading.Lock()
|
||||
self._done_event: Event = Event() # gevent Event for cooperative waiting
|
||||
self._result: CommandResult | None = None
|
||||
self._exception: BaseException | None = None
|
||||
self._cancelled = False
|
||||
self._started = False
|
||||
self._cancelled: bool = False
|
||||
self._started: bool = False
|
||||
self._execute_greenlet: Any = None
|
||||
|
||||
def result(self, timeout: float | None = None) -> CommandResult:
|
||||
"""
|
||||
Block until command completes and return result.
|
||||
|
||||
Uses gevent.event.Event.wait() for cooperative waiting, allowing other
|
||||
greenlets to run while this one waits.
|
||||
|
||||
Args:
|
||||
timeout: Maximum seconds to wait. None means wait forever.
|
||||
|
||||
@ -64,6 +89,7 @@ class CommandFuture:
|
||||
"""
|
||||
self._ensure_started()
|
||||
|
||||
# gevent Event.wait() returns True if set, False on timeout
|
||||
if not self._done_event.wait(timeout):
|
||||
raise CommandTimeoutError(f"Command timed out after {timeout}s")
|
||||
|
||||
@ -82,65 +108,81 @@ class CommandFuture:
|
||||
|
||||
def cancel(self) -> bool:
|
||||
"""
|
||||
Attempt to cancel command by closing transports.
|
||||
Attempt to cancel command by closing transports and killing greenlets.
|
||||
Returns True if cancelled, False if already completed.
|
||||
"""
|
||||
with self._lock:
|
||||
if self._done_event.is_set():
|
||||
return False
|
||||
self._cancelled = True
|
||||
self._close_transports()
|
||||
self._done_event.set()
|
||||
return True
|
||||
if self._done_event.is_set():
|
||||
return False
|
||||
self._cancelled = True
|
||||
self._close_transports()
|
||||
# Kill the execute greenlet if it's still running
|
||||
if self._execute_greenlet is not None:
|
||||
self._execute_greenlet.kill(block=False)
|
||||
self._done_event.set()
|
||||
return True
|
||||
|
||||
def cancelled(self) -> bool:
|
||||
return self._cancelled
|
||||
|
||||
def _ensure_started(self) -> None:
|
||||
with self._lock:
|
||||
if not self._started:
|
||||
self._started = True
|
||||
thread = threading.Thread(target=self._execute, daemon=True)
|
||||
thread.start()
|
||||
if not self._started:
|
||||
self._started = True
|
||||
# Use gevent.spawn instead of threading.Thread for cooperative scheduling
|
||||
self._execute_greenlet = gevent.spawn(self._execute)
|
||||
|
||||
def _execute(self) -> None:
|
||||
"""
|
||||
Execute command and collect output using gevent greenlets.
|
||||
|
||||
Spawns separate greenlets for stdout/stderr draining to allow concurrent
|
||||
reading while polling for command completion.
|
||||
"""
|
||||
stdout_buf = bytearray()
|
||||
stderr_buf = bytearray()
|
||||
is_combined_stream = self._stdout_transport is self._stderr_transport
|
||||
|
||||
stdout_greenlet: Any = None
|
||||
stderr_greenlet: Any = None
|
||||
|
||||
try:
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
stdout_future = executor.submit(self._drain_transport, self._stdout_transport, stdout_buf)
|
||||
stderr_future = None
|
||||
if not is_combined_stream:
|
||||
stderr_future = executor.submit(self._drain_transport, self._stderr_transport, stderr_buf)
|
||||
# Spawn greenlets for draining transports
|
||||
stdout_greenlet = gevent.spawn(self._drain_transport, self._stdout_transport, stdout_buf)
|
||||
if not is_combined_stream:
|
||||
stderr_greenlet = gevent.spawn(self._drain_transport, self._stderr_transport, stderr_buf)
|
||||
|
||||
exit_code = self._wait_for_completion()
|
||||
exit_code = self._wait_for_completion()
|
||||
|
||||
stdout_future.result()
|
||||
if stderr_future:
|
||||
stderr_future.result()
|
||||
# Wait for drain greenlets to complete
|
||||
stdout_greenlet.join()
|
||||
if stderr_greenlet is not None:
|
||||
stderr_greenlet.join()
|
||||
|
||||
with self._lock:
|
||||
if not self._cancelled:
|
||||
self._result = CommandResult(
|
||||
stdout=bytes(stdout_buf),
|
||||
stderr=b"" if is_combined_stream else bytes(stderr_buf),
|
||||
exit_code=exit_code,
|
||||
pid=self._pid,
|
||||
)
|
||||
self._done_event.set()
|
||||
if not self._cancelled:
|
||||
self._result = CommandResult(
|
||||
stdout=bytes(stdout_buf),
|
||||
stderr=b"" if is_combined_stream else bytes(stderr_buf),
|
||||
exit_code=exit_code,
|
||||
pid=self._pid,
|
||||
)
|
||||
self._done_event.set()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Command execution failed for pid %s", self._pid)
|
||||
with self._lock:
|
||||
if not self._cancelled:
|
||||
self._exception = e
|
||||
self._done_event.set()
|
||||
if not self._cancelled:
|
||||
self._exception = e
|
||||
self._done_event.set()
|
||||
finally:
|
||||
# Kill any remaining greenlets
|
||||
if stdout_greenlet is not None:
|
||||
stdout_greenlet.kill(block=False)
|
||||
if stderr_greenlet is not None:
|
||||
stderr_greenlet.kill(block=False)
|
||||
self._close_transports()
|
||||
|
||||
def _wait_for_completion(self) -> int | None:
|
||||
"""
|
||||
Poll for command completion using gevent.sleep for cooperative yielding.
|
||||
"""
|
||||
while not self._cancelled:
|
||||
try:
|
||||
status = self._poll_status()
|
||||
@ -150,11 +192,18 @@ class CommandFuture:
|
||||
if status.status == CommandStatus.Status.COMPLETED:
|
||||
return status.exit_code
|
||||
|
||||
time.sleep(self._poll_interval)
|
||||
# Use gevent.sleep for cooperative scheduling
|
||||
gevent.sleep(self._poll_interval)
|
||||
|
||||
return None
|
||||
|
||||
def _drain_transport(self, transport: TransportReadCloser, buffer: bytearray) -> None:
|
||||
"""
|
||||
Drain all data from a transport into a buffer.
|
||||
|
||||
This runs in a greenlet, so blocking reads will yield to other greenlets
|
||||
thanks to gevent's monkey-patching of socket operations.
|
||||
"""
|
||||
try:
|
||||
while True:
|
||||
buffer.extend(transport.read(4096))
|
||||
|
||||
@ -1,18 +1,29 @@
|
||||
"""
|
||||
Docker Daemon Virtual Environment Provider.
|
||||
|
||||
This module implements a VirtualEnvironment using Docker containers. It uses gevent
|
||||
primitives for concurrency to ensure compatibility with gevent-based WSGI servers.
|
||||
|
||||
IMPORTANT: This module uses gevent.queue.Queue and gevent.spawn instead of standard
|
||||
library threading primitives. This is critical for proper cooperative scheduling
|
||||
in gevent environments like Gunicorn with gevent workers.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import socket
|
||||
import tarfile
|
||||
import threading
|
||||
from collections.abc import Mapping, Sequence
|
||||
from enum import IntEnum, StrEnum
|
||||
from functools import lru_cache
|
||||
from io import BytesIO
|
||||
from pathlib import PurePosixPath
|
||||
from queue import Queue
|
||||
from typing import Any, cast
|
||||
from uuid import uuid4
|
||||
|
||||
import docker.errors
|
||||
import gevent
|
||||
from docker.models.containers import Container
|
||||
from gevent.queue import Queue
|
||||
|
||||
import docker
|
||||
from configs import dify_config
|
||||
@ -60,14 +71,20 @@ class DockerDemuxer:
|
||||
- Bytes 1-3: reserved (zeros)
|
||||
- Bytes 4-7: payload size (big-endian uint32)
|
||||
|
||||
THREAD SAFETY:
|
||||
A single background thread reads frames from the socket and dispatches payloads
|
||||
to thread-safe queues. This avoids race conditions where multiple threads
|
||||
calling _read_next_frame() simultaneously caused frame header/body corruption,
|
||||
resulting in incomplete stdout/stderr output.
|
||||
GEVENT COMPATIBILITY:
|
||||
This class uses gevent.spawn() and gevent.queue.Queue instead of native threading
|
||||
to ensure proper cooperative scheduling in gevent-based WSGI servers. Native threads
|
||||
can cause deadlocks in gevent because:
|
||||
1. Native threads hold the GIL during blocking I/O
|
||||
2. gevent's monkey-patching doesn't affect code running in native threads
|
||||
3. Blocking Queue.get() in native threads prevents greenlet switching
|
||||
|
||||
By using gevent primitives, all I/O operations become cooperative, allowing
|
||||
proper greenlet scheduling even during blocking socket reads.
|
||||
"""
|
||||
|
||||
_HEADER_SIZE = 8
|
||||
_QUEUE_GET_TIMEOUT = 5.0 # Timeout for queue.get() to allow checking for errors
|
||||
|
||||
def __init__(self, sock: socket.SocketIO):
|
||||
self._sock = sock
|
||||
@ -76,14 +93,16 @@ class DockerDemuxer:
|
||||
self._closed = False
|
||||
self._error: BaseException | None = None
|
||||
|
||||
self._demux_thread = threading.Thread(
|
||||
target=self._demux_loop,
|
||||
daemon=True,
|
||||
name="docker-demuxer",
|
||||
)
|
||||
self._demux_thread.start()
|
||||
# Use gevent.spawn instead of threading.Thread for cooperative scheduling
|
||||
self._demux_greenlet = gevent.spawn(self._demux_loop)
|
||||
|
||||
def _demux_loop(self) -> None:
|
||||
"""
|
||||
Read frames from socket and dispatch to appropriate queues.
|
||||
|
||||
Runs in a greenlet, so socket reads will yield to other greenlets
|
||||
thanks to gevent's monkey-patching.
|
||||
"""
|
||||
try:
|
||||
while not self._closed:
|
||||
header = self._read_exact(self._HEADER_SIZE)
|
||||
@ -112,6 +131,11 @@ class DockerDemuxer:
|
||||
self._stderr_queue.put(None)
|
||||
|
||||
def _read_exact(self, size: int) -> bytes:
|
||||
"""
|
||||
Read exactly `size` bytes from socket.
|
||||
|
||||
Socket reads are cooperative in gevent environment due to monkey-patching.
|
||||
"""
|
||||
data = bytearray()
|
||||
remaining = size
|
||||
while remaining > 0:
|
||||
@ -132,19 +156,42 @@ class DockerDemuxer:
|
||||
return self._read_from_queue(self._stderr_queue)
|
||||
|
||||
def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes:
|
||||
"""
|
||||
Read from queue with timeout to allow periodic error checking.
|
||||
|
||||
Uses gevent.queue.Queue which cooperatively yields during blocking get().
|
||||
The timeout ensures we can detect errors and closed state even if no data arrives.
|
||||
"""
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
|
||||
|
||||
chunk = queue.get()
|
||||
if chunk is None:
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
|
||||
raise TransportEOFError("End of demuxed stream")
|
||||
return chunk
|
||||
while True:
|
||||
try:
|
||||
# Use timeout to periodically check for errors/closed state
|
||||
chunk = queue.get(timeout=self._QUEUE_GET_TIMEOUT)
|
||||
if chunk is None:
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
|
||||
raise TransportEOFError("End of demuxed stream")
|
||||
return chunk
|
||||
except gevent.queue.Empty:
|
||||
# Timeout - check if we should continue waiting
|
||||
if self._closed:
|
||||
raise TransportEOFError("Demuxer closed")
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
|
||||
# No error, continue waiting
|
||||
continue
|
||||
|
||||
def close(self) -> None:
|
||||
"""
|
||||
Close the demuxer and kill the background greenlet.
|
||||
"""
|
||||
if not self._closed:
|
||||
self._closed = True
|
||||
# Kill the demux greenlet to stop any blocking reads
|
||||
if self._demux_greenlet is not None:
|
||||
self._demux_greenlet.kill(block=False)
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
|
||||
Reference in New Issue
Block a user