Revert "refactor: replace threading with gevent primitives for cooperative scheduling"

This reverts commit 27781d6b7e.
This commit is contained in:
Harry
2026-01-28 13:51:48 +08:00
parent 27781d6b7e
commit 392cec2f54
5 changed files with 71 additions and 196 deletions

View File

@ -70,7 +70,9 @@ class ContextGeneratePayload(BaseModel):
model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
available_vars: list[AvailableVarPayload] = Field(..., description="Available variables from upstream nodes")
parameter_info: ParameterInfoPayload = Field(..., description="Target parameter metadata from the frontend")
code_context: CodeContextPayload = Field(description="Existing code node context for incremental generation")
code_context: CodeContextPayload = Field(
description="Existing code node context for incremental generation"
)
class SuggestedQuestionsPayload(BaseModel):

View File

@ -1,25 +1,10 @@
"""
Sandbox Builder: Factory for creating and initializing sandboxes.
This module uses gevent.spawn instead of threading.Thread to ensure proper
cooperative scheduling in gevent-based WSGI servers like Gunicorn.
Using native threading.Thread in a gevent environment can cause issues because:
1. Native threads hold the GIL during blocking I/O
2. gevent's monkey-patching doesn't affect code running in native threads
3. Blocking operations in native threads prevent greenlet switching
By using gevent.spawn(), background initialization runs as a greenlet that
cooperatively yields during I/O operations.
"""
from __future__ import annotations
import logging
import threading
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any
import gevent
from flask import current_app
from core.entities.provider_entities import BasicProviderConfig
@ -160,8 +145,7 @@ class SandboxBuilder:
sandbox.mark_failed(exc)
# Background init completes or signals failure via sandbox state.
# Use gevent.spawn instead of threading.Thread for cooperative scheduling
gevent.spawn(initialize)
threading.Thread(target=initialize, daemon=True).start()
return sandbox
@staticmethod

View File

@ -1,24 +1,9 @@
"""
Sandbox: A managed virtual environment instance.
This module uses gevent.event.Event instead of threading.Event to ensure
proper cooperative scheduling in gevent-based WSGI servers like Gunicorn.
Using native threading.Event in a gevent environment can cause issues because:
1. threading.Event.wait() blocks the entire thread, not just the greenlet
2. This prevents other greenlets from running while waiting
3. Can lead to apparent "freezes" when multiple greenlets wait on events
By using gevent.event.Event, wait() calls cooperatively yield to other greenlets.
"""
from __future__ import annotations
import logging
import threading
from typing import TYPE_CHECKING
from gevent.event import Event
from libs.attr_map import AttrMap
if TYPE_CHECKING:
@ -46,8 +31,8 @@ class Sandbox:
self._app_id = app_id
self._assets_id = assets_id
self._attributes = AttrMap()
self._ready_event: Event = Event() # gevent Event for cooperative waiting
self._cancel_event: Event = Event() # gevent Event for cooperative waiting
self._ready_event = threading.Event()
self._cancel_event = threading.Event()
self._init_error: Exception | None = None
@property

View File

@ -1,26 +1,9 @@
"""
CommandFuture: Async command execution with gevent compatibility.
This module uses gevent primitives (greenlets, events) instead of native threads
to ensure proper cooperative scheduling in gevent-based WSGI servers like Gunicorn.
Using native threading.Thread or concurrent.futures.ThreadPoolExecutor in a gevent
environment can cause deadlocks because:
1. Native threads hold the GIL during blocking I/O
2. gevent's monkey-patching doesn't affect code running in native threads
3. Blocking operations in native threads prevent greenlet switching
By using gevent.spawn() and gevent.event.Event, all I/O operations become
cooperative, allowing proper greenlet scheduling even during blocking reads.
"""
import contextlib
import logging
import threading
import time
from collections.abc import Callable
from typing import Any
import gevent
from gevent.event import Event
from concurrent.futures import ThreadPoolExecutor
from core.virtual_environment.__base.entities import CommandResult, CommandStatus
from core.virtual_environment.__base.exec import NotSupportedOperationError
@ -40,12 +23,7 @@ class CommandCancelledError(Exception):
class CommandFuture:
"""
Lightweight future for command execution using gevent greenlets.
This implementation uses gevent primitives instead of native threads to ensure
proper cooperative scheduling in gevent-based WSGI servers. All blocking I/O
operations are performed in greenlets, allowing gevent to switch between them.
Lightweight future for command execution.
Mirrors concurrent.futures.Future API with 4 essential methods:
result(), done(), cancel(), cancelled().
"""
@ -66,20 +44,17 @@ class CommandFuture:
self._poll_status = poll_status
self._poll_interval = poll_interval
self._done_event: Event = Event() # gevent Event for cooperative waiting
self._done_event = threading.Event()
self._lock = threading.Lock()
self._result: CommandResult | None = None
self._exception: BaseException | None = None
self._cancelled: bool = False
self._started: bool = False
self._execute_greenlet: Any = None
self._cancelled = False
self._started = False
def result(self, timeout: float | None = None) -> CommandResult:
"""
Block until command completes and return result.
Uses gevent.event.Event.wait() for cooperative waiting, allowing other
greenlets to run while this one waits.
Args:
timeout: Maximum seconds to wait. None means wait forever.
@ -89,7 +64,6 @@ class CommandFuture:
"""
self._ensure_started()
# gevent Event.wait() returns True if set, False on timeout
if not self._done_event.wait(timeout):
raise CommandTimeoutError(f"Command timed out after {timeout}s")
@ -108,81 +82,65 @@ class CommandFuture:
def cancel(self) -> bool:
"""
Attempt to cancel command by closing transports and killing greenlets.
Attempt to cancel command by closing transports.
Returns True if cancelled, False if already completed.
"""
if self._done_event.is_set():
return False
self._cancelled = True
self._close_transports()
# Kill the execute greenlet if it's still running
if self._execute_greenlet is not None:
self._execute_greenlet.kill(block=False)
self._done_event.set()
return True
with self._lock:
if self._done_event.is_set():
return False
self._cancelled = True
self._close_transports()
self._done_event.set()
return True
def cancelled(self) -> bool:
return self._cancelled
def _ensure_started(self) -> None:
if not self._started:
self._started = True
# Use gevent.spawn instead of threading.Thread for cooperative scheduling
self._execute_greenlet = gevent.spawn(self._execute)
with self._lock:
if not self._started:
self._started = True
thread = threading.Thread(target=self._execute, daemon=True)
thread.start()
def _execute(self) -> None:
"""
Execute command and collect output using gevent greenlets.
Spawns separate greenlets for stdout/stderr draining to allow concurrent
reading while polling for command completion.
"""
stdout_buf = bytearray()
stderr_buf = bytearray()
is_combined_stream = self._stdout_transport is self._stderr_transport
stdout_greenlet: Any = None
stderr_greenlet: Any = None
try:
# Spawn greenlets for draining transports
stdout_greenlet = gevent.spawn(self._drain_transport, self._stdout_transport, stdout_buf)
if not is_combined_stream:
stderr_greenlet = gevent.spawn(self._drain_transport, self._stderr_transport, stderr_buf)
with ThreadPoolExecutor(max_workers=2) as executor:
stdout_future = executor.submit(self._drain_transport, self._stdout_transport, stdout_buf)
stderr_future = None
if not is_combined_stream:
stderr_future = executor.submit(self._drain_transport, self._stderr_transport, stderr_buf)
exit_code = self._wait_for_completion()
exit_code = self._wait_for_completion()
# Wait for drain greenlets to complete
stdout_greenlet.join()
if stderr_greenlet is not None:
stderr_greenlet.join()
stdout_future.result()
if stderr_future:
stderr_future.result()
if not self._cancelled:
self._result = CommandResult(
stdout=bytes(stdout_buf),
stderr=b"" if is_combined_stream else bytes(stderr_buf),
exit_code=exit_code,
pid=self._pid,
)
self._done_event.set()
with self._lock:
if not self._cancelled:
self._result = CommandResult(
stdout=bytes(stdout_buf),
stderr=b"" if is_combined_stream else bytes(stderr_buf),
exit_code=exit_code,
pid=self._pid,
)
self._done_event.set()
except Exception as e:
logger.exception("Command execution failed for pid %s", self._pid)
if not self._cancelled:
self._exception = e
self._done_event.set()
with self._lock:
if not self._cancelled:
self._exception = e
self._done_event.set()
finally:
# Kill any remaining greenlets
if stdout_greenlet is not None:
stdout_greenlet.kill(block=False)
if stderr_greenlet is not None:
stderr_greenlet.kill(block=False)
self._close_transports()
def _wait_for_completion(self) -> int | None:
"""
Poll for command completion using gevent.sleep for cooperative yielding.
"""
while not self._cancelled:
try:
status = self._poll_status()
@ -192,18 +150,11 @@ class CommandFuture:
if status.status == CommandStatus.Status.COMPLETED:
return status.exit_code
# Use gevent.sleep for cooperative scheduling
gevent.sleep(self._poll_interval)
time.sleep(self._poll_interval)
return None
def _drain_transport(self, transport: TransportReadCloser, buffer: bytearray) -> None:
"""
Drain all data from a transport into a buffer.
This runs in a greenlet, so blocking reads will yield to other greenlets
thanks to gevent's monkey-patching of socket operations.
"""
try:
while True:
buffer.extend(transport.read(4096))

View File

@ -1,29 +1,18 @@
"""
Docker Daemon Virtual Environment Provider.
This module implements a VirtualEnvironment using Docker containers. It uses gevent
primitives for concurrency to ensure compatibility with gevent-based WSGI servers.
IMPORTANT: This module uses gevent.queue.Queue and gevent.spawn instead of standard
library threading primitives. This is critical for proper cooperative scheduling
in gevent environments like Gunicorn with gevent workers.
"""
import logging
import socket
import tarfile
import threading
from collections.abc import Mapping, Sequence
from enum import IntEnum, StrEnum
from functools import lru_cache
from io import BytesIO
from pathlib import PurePosixPath
from queue import Queue
from typing import Any, cast
from uuid import uuid4
import docker.errors
import gevent
from docker.models.containers import Container
from gevent.queue import Queue
import docker
from configs import dify_config
@ -71,20 +60,14 @@ class DockerDemuxer:
- Bytes 1-3: reserved (zeros)
- Bytes 4-7: payload size (big-endian uint32)
GEVENT COMPATIBILITY:
This class uses gevent.spawn() and gevent.queue.Queue instead of native threading
to ensure proper cooperative scheduling in gevent-based WSGI servers. Native threads
can cause deadlocks in gevent because:
1. Native threads hold the GIL during blocking I/O
2. gevent's monkey-patching doesn't affect code running in native threads
3. Blocking Queue.get() in native threads prevents greenlet switching
By using gevent primitives, all I/O operations become cooperative, allowing
proper greenlet scheduling even during blocking socket reads.
THREAD SAFETY:
A single background thread reads frames from the socket and dispatches payloads
to thread-safe queues. This avoids race conditions where multiple threads
calling _read_next_frame() simultaneously caused frame header/body corruption,
resulting in incomplete stdout/stderr output.
"""
_HEADER_SIZE = 8
_QUEUE_GET_TIMEOUT = 5.0 # Timeout for queue.get() to allow checking for errors
def __init__(self, sock: socket.SocketIO):
self._sock = sock
@ -93,16 +76,14 @@ class DockerDemuxer:
self._closed = False
self._error: BaseException | None = None
# Use gevent.spawn instead of threading.Thread for cooperative scheduling
self._demux_greenlet = gevent.spawn(self._demux_loop)
self._demux_thread = threading.Thread(
target=self._demux_loop,
daemon=True,
name="docker-demuxer",
)
self._demux_thread.start()
def _demux_loop(self) -> None:
"""
Read frames from socket and dispatch to appropriate queues.
Runs in a greenlet, so socket reads will yield to other greenlets
thanks to gevent's monkey-patching.
"""
try:
while not self._closed:
header = self._read_exact(self._HEADER_SIZE)
@ -131,11 +112,6 @@ class DockerDemuxer:
self._stderr_queue.put(None)
def _read_exact(self, size: int) -> bytes:
"""
Read exactly `size` bytes from socket.
Socket reads are cooperative in gevent environment due to monkey-patching.
"""
data = bytearray()
remaining = size
while remaining > 0:
@ -156,42 +132,19 @@ class DockerDemuxer:
return self._read_from_queue(self._stderr_queue)
def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes:
"""
Read from queue with timeout to allow periodic error checking.
Uses gevent.queue.Queue which cooperatively yields during blocking get().
The timeout ensures we can detect errors and closed state even if no data arrives.
"""
if self._error:
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
while True:
try:
# Use timeout to periodically check for errors/closed state
chunk = queue.get(timeout=self._QUEUE_GET_TIMEOUT)
if chunk is None:
if self._error:
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
raise TransportEOFError("End of demuxed stream")
return chunk
except gevent.queue.Empty:
# Timeout - check if we should continue waiting
if self._closed:
raise TransportEOFError("Demuxer closed")
if self._error:
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
# No error, continue waiting
continue
chunk = queue.get()
if chunk is None:
if self._error:
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
raise TransportEOFError("End of demuxed stream")
return chunk
def close(self) -> None:
"""
Close the demuxer and kill the background greenlet.
"""
if not self._closed:
self._closed = True
# Kill the demux greenlet to stop any blocking reads
if self._demux_greenlet is not None:
self._demux_greenlet.kill(block=False)
try:
self._sock.close()
except Exception: