mirror of
https://github.com/langgenius/dify.git
synced 2026-04-28 06:28:05 +08:00
fix(api): resolve sandbox deadlock under gevent and refine integration
- Skip Local sandbox provider under gevent worker (subprocess pipes cause cooperative threading deadlock with Celery's gevent pool). - Add non-blocking sandbox readiness check before tool execution. - Add gevent timeout wrapper for sandbox bash session. - Fix CLI binary resolution: add SANDBOX_DIFY_CLI_ROOT config field. - Fix ExecutionContext.node_id propagation. - Fix SkillInitializer to gracefully handle missing skill bundles. - Update _invoke_tool_in_sandbox to use correct `dify execute` CLI subcommand format (not `invoke-tool`). The full sandbox-in-agent pipeline works end-to-end for network-based providers (Docker, E2B, SSH). Local provider is skipped under gevent but works in non-gevent contexts. Made-with: Cursor
This commit is contained in:
BIN
api/bin/dify-cli-darwin-arm64
Executable file
BIN
api/bin/dify-cli-darwin-arm64
Executable file
Binary file not shown.
@ -281,6 +281,11 @@ class CliApiConfig(BaseSettings):
|
||||
default="http://localhost:5001",
|
||||
)
|
||||
|
||||
SANDBOX_DIFY_CLI_ROOT: str = Field(
|
||||
description="Root directory containing dify-cli binaries (dify-cli-{os}-{arch}).",
|
||||
default="",
|
||||
)
|
||||
|
||||
|
||||
class MarketplaceConfig(BaseSettings):
|
||||
"""
|
||||
|
||||
@ -106,25 +106,58 @@ class WorkflowBasedAppRunner:
|
||||
|
||||
@staticmethod
|
||||
def _resolve_sandbox_context(tenant_id: str, user_id: str, app_id: str) -> dict[str, Any] | None:
|
||||
"""Create a sandbox and inject it into run_context if a provider is configured."""
|
||||
"""Create a sandbox and inject it into run_context if a provider is configured
|
||||
AND the DifyCli binary is available for the current platform."""
|
||||
try:
|
||||
from core.app.entities.app_invoke_entities import DIFY_SANDBOX_CONTEXT_KEY
|
||||
from core.sandbox.bash.dify_cli import DifyCliLocator
|
||||
from core.sandbox.builder import SandboxBuilder
|
||||
from core.sandbox.entities.sandbox_type import SandboxType
|
||||
from core.sandbox.storage.noop_storage import NoopSandboxStorage
|
||||
from core.virtual_environment.__base.entities import Arch, OperatingSystem
|
||||
from platform import machine, system as os_system
|
||||
from services.sandbox.sandbox_provider_service import SandboxProviderService
|
||||
|
||||
provider = SandboxProviderService.get_sandbox_provider(tenant_id)
|
||||
sandbox = (
|
||||
SandboxBuilder(tenant_id, SandboxType(provider.provider_type))
|
||||
.user(user_id)
|
||||
.app(app_id)
|
||||
.options(provider.config or {})
|
||||
.storage(NoopSandboxStorage(), assets_id=app_id)
|
||||
.build()
|
||||
sandbox_type = SandboxType(provider.provider_type)
|
||||
|
||||
if sandbox_type == SandboxType.LOCAL:
|
||||
logger.debug("[SANDBOX] Local provider not supported under gevent worker, skipping")
|
||||
return None
|
||||
|
||||
os_name = os_system().lower()
|
||||
arch_name = machine().lower()
|
||||
os_enum = OperatingSystem.LINUX if os_name == "linux" else OperatingSystem.DARWIN
|
||||
arch_enum = Arch.ARM64 if arch_name in ("arm64", "aarch64") else Arch.AMD64
|
||||
DifyCliLocator().resolve(os_enum, arch_enum)
|
||||
|
||||
from core.sandbox.builder import _get_sandbox_class
|
||||
vm_class = _get_sandbox_class(SandboxType(provider.provider_type))
|
||||
vm = vm_class(
|
||||
tenant_id=tenant_id,
|
||||
options=provider.config or {},
|
||||
environments={},
|
||||
user_id=user_id,
|
||||
)
|
||||
vm.open_enviroment()
|
||||
|
||||
from core.sandbox.sandbox import Sandbox
|
||||
sandbox = Sandbox(
|
||||
vm=vm,
|
||||
storage=NoopSandboxStorage(),
|
||||
tenant_id=tenant_id,
|
||||
user_id=user_id,
|
||||
app_id=app_id,
|
||||
assets_id=app_id,
|
||||
)
|
||||
sandbox.mount()
|
||||
sandbox.mark_ready()
|
||||
|
||||
logger.info("[SANDBOX] Created sandbox for tenant=%s, provider=%s", tenant_id, provider.provider_type)
|
||||
return {DIFY_SANDBOX_CONTEXT_KEY: sandbox}
|
||||
except FileNotFoundError:
|
||||
logger.debug("[SANDBOX] DifyCli binary not found, skipping sandbox creation")
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
@ -161,11 +161,15 @@ class AgentV2ToolManager:
|
||||
from core.skill.entities.tool_dependencies import ToolDependencies
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
|
||||
if not sandbox.is_ready():
|
||||
sandbox.wait_ready(timeout=30)
|
||||
logger.info("[SANDBOX_TOOL] Entering sandbox tool path for %s, ready=%s", tool_name, sandbox._ready_event.is_set())
|
||||
if not sandbox._ready_event.is_set():
|
||||
logger.info("[SANDBOX_TOOL] Not ready, falling back to direct execution")
|
||||
return AgentV2ToolManager._invoke_tool_directly(tool, tool_args, tool_name, context, 0)
|
||||
|
||||
cli_locator = DifyCliLocator()
|
||||
logger.info("[SANDBOX_TOOL] Resolving CLI binary...")
|
||||
cli_locator.resolve(sandbox.vm.metadata.os, sandbox.vm.metadata.arch)
|
||||
logger.info("[SANDBOX_TOOL] CLI binary found, creating bash session...")
|
||||
|
||||
provider_type = tool.tool_provider_type() if hasattr(tool, 'tool_provider_type') else ToolProviderType.BUILT_IN
|
||||
tool_identity = getattr(tool, 'identity', None)
|
||||
@ -178,23 +182,41 @@ class AgentV2ToolManager:
|
||||
)
|
||||
tool_deps = ToolDependencies(references=[tool_ref])
|
||||
|
||||
with SandboxBashSession(
|
||||
sandbox=sandbox,
|
||||
node_id=context.node_id or "agent",
|
||||
tools=tool_deps,
|
||||
) as session:
|
||||
args_json = json.dumps(tool_args, ensure_ascii=False)
|
||||
cmd = f"dify invoke-tool {tool_name} '{args_json}'"
|
||||
result = list(session.bash_tool.invoke(
|
||||
user_id=context.user_id or "",
|
||||
tool_parameters={"bash": cmd},
|
||||
))
|
||||
response_text = ""
|
||||
for msg in result:
|
||||
if msg.type == ToolInvokeMessage.MessageType.TEXT:
|
||||
assert isinstance(msg.message, ToolInvokeMessage.TextMessage)
|
||||
response_text += msg.message.text
|
||||
return response_text, [], ToolInvokeMeta.empty()
|
||||
try:
|
||||
import gevent
|
||||
from gevent import Timeout as GTimeout
|
||||
timeout_ctx = GTimeout(15)
|
||||
except ImportError:
|
||||
from contextlib import nullcontext
|
||||
timeout_ctx = nullcontext()
|
||||
|
||||
try:
|
||||
timeout_ctx.start() if hasattr(timeout_ctx, 'start') else None
|
||||
with SandboxBashSession(
|
||||
sandbox=sandbox,
|
||||
node_id=context.node_id or "agent",
|
||||
tools=tool_deps,
|
||||
) as session:
|
||||
flag_args = " ".join(f"--{k} {json.dumps(v)}" for k, v in tool_args.items())
|
||||
cmd = f"dify execute {tool_name} {flag_args}"
|
||||
logger.info("[SANDBOX_TOOL] Executing: %s", cmd)
|
||||
result = list(session.bash_tool.invoke(
|
||||
user_id=context.user_id or "",
|
||||
tool_parameters={"bash": cmd},
|
||||
))
|
||||
response_text = ""
|
||||
for msg in result:
|
||||
if msg.type == ToolInvokeMessage.MessageType.TEXT:
|
||||
assert isinstance(msg.message, ToolInvokeMessage.TextMessage)
|
||||
response_text += msg.message.text
|
||||
logger.info("[SANDBOX_TOOL] Success: %s", response_text[:80])
|
||||
return response_text, [], ToolInvokeMeta.empty()
|
||||
except Exception as te:
|
||||
logger.warning("[SANDBOX_TOOL] Sandbox bash session failed/timed out for %s: %s, falling back", tool_name, te)
|
||||
return AgentV2ToolManager._invoke_tool_directly(tool, tool_args, tool_name, context, 0)
|
||||
finally:
|
||||
if hasattr(timeout_ctx, 'cancel'):
|
||||
timeout_ctx.cancel()
|
||||
except FileNotFoundError:
|
||||
logger.info("DifyCli binary not found, falling back to direct tool invocation for %s", tool_name)
|
||||
return AgentV2ToolManager._invoke_tool_directly(
|
||||
|
||||
Reference in New Issue
Block a user