feat: enhance tool access management with ToolAccessDescription and update ToolAccessPolicy

This commit is contained in:
Harry
2026-03-11 18:45:56 +08:00
parent 8513fa2897
commit 6fe221518e
3 changed files with 100 additions and 46 deletions

View File

@ -4,7 +4,7 @@ import json
import logging
import mimetypes
import os
from io import BytesIO
import shlex
from types import TracebackType
from core.file import File, FileTransferMethod, FileType
@ -52,8 +52,10 @@ class SandboxBashSession:
user_id=self._user_id,
context=CliContext(tool_access=ToolAccessPolicy.from_dependencies(self._tools)),
)
# FIXME(Mairuis): enable all tool using
tools_path = DifyCli.GLOBAL_TOOLS_PATH
if self._tools is not None and not self._tools.is_empty():
tools_path = self._setup_node_tools_directory(self._node_id, self._tools, self._cli_api_session)
else:
tools_path = DifyCli.GLOBAL_TOOLS_PATH
self._bash_tool = SandboxBashTool(
sandbox=self._sandbox.vm,
@ -69,22 +71,28 @@ class SandboxBashSession:
cli_api_session: CliApiSession,
) -> str:
node_tools_path = f"{DifyCli.TOOLS_ROOT}/{node_id}"
vm = self._sandbox.vm
(
pipeline(vm)
.add(["mkdir", "-p", DifyCli.GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir")
.add(["mkdir", "-p", node_tools_path], error_message="Failed to create node tools dir")
.execute(raise_on_error=True)
)
config_json = json.dumps(
DifyCliConfig.create(session=cli_api_session, tenant_id=self._tenant_id, tool_deps=tools).model_dump(
mode="json"
),
ensure_ascii=False,
)
vm.upload_file(f"{node_tools_path}/{DifyCli.CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8")))
config_path = shlex.quote(f"{node_tools_path}/{DifyCli.CONFIG_FILENAME}")
vm = self._sandbox.vm
# Merge mkdir + config write into a single pipeline to reduce round-trips.
(
pipeline(vm)
.add(["mkdir", "-p", DifyCli.GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir")
.add(["mkdir", "-p", node_tools_path], error_message="Failed to create node tools dir")
# Use a quoted heredoc (<<'EOF') so the shell performs no expansion on the
# content — safe regardless of $, `, \, or quotes inside the JSON.
.add(
["sh", "-c", f"cat > {config_path} << '__DIFY_CFG__'\n{config_json}\n__DIFY_CFG__"],
error_message="Failed to write CLI config",
)
.execute(raise_on_error=True)
)
pipeline(vm, cwd=node_tools_path).add(
[DifyCli.PATH, "init"], error_message="Failed to initialize Dify CLI"

View File

@ -7,7 +7,7 @@ from .skill_metadata import (
ToolFieldConfig,
ToolReference,
)
from .tool_access_policy import ToolAccessPolicy, ToolDescription, ToolInvocationRequest
from .tool_access_policy import ToolAccessDescription, ToolAccessPolicy, ToolDescription, ToolInvocationRequest
from .tool_dependencies import ToolDependencies, ToolDependency
__all__ = [
@ -17,6 +17,7 @@ __all__ = [
"SkillDependance",
"SkillDocument",
"SkillMetadata",
"ToolAccessDescription",
"ToolAccessPolicy",
"ToolConfiguration",
"ToolDependencies",

View File

@ -19,6 +19,44 @@ class ToolDescription(BaseModel):
return f"{self.tool_type.value}:{self.provider}:{self.tool_name}"
class ToolAccessDescription(BaseModel):
"""
Per-tool access descriptor that bundles identity with allowed credentials.
Each allowed tool is represented by exactly one ``ToolAccessDescription``.
``allowed_credentials`` captures the set of credential IDs that may be used
when invoking this tool:
* **empty set** the tool requires no special credential; only requests
*without* a ``credential_id`` are accepted.
* **non-empty set** the tool requires an explicit credential; the
request's ``credential_id`` must be a member of this set.
"""
model_config = ConfigDict(frozen=True)
tool_type: ToolProviderType
provider: str
tool_name: str
allowed_credentials: frozenset[str] = Field(default_factory=frozenset)
def tool_id(self) -> str:
return f"{self.tool_type.value}:{self.provider}:{self.tool_name}"
def is_credential_allowed(self, credential_id: str | None) -> bool:
"""Check whether *credential_id* satisfies this tool's credential policy.
* No credentials registered (``allowed_credentials`` is empty) →
only requests *without* a credential are accepted.
* Credentials registered → the supplied ``credential_id`` must be in
the set.
"""
if credential_id is None or credential_id == "":
return True
return credential_id in self.allowed_credentials
class ToolInvocationRequest(BaseModel):
"""A request to invoke a specific tool with optional credential."""
@ -38,63 +76,70 @@ class ToolAccessPolicy(BaseModel):
"""
Determines whether a tool invocation is allowed based on ToolDependencies.
The policy is built exclusively from ``ToolDependencies.references`` each
``ToolReference`` declares both the tool identity *and* the credential that
may be used. ``ToolDependencies.dependencies`` is a de-duplicated identity
list and does not participate in access-control decisions.
Rules:
1. Tool must be declared in dependencies or references.
2. If references exist for the tool, credential_id must match one of them.
3. If no references exist for the tool, credential_id must be None.
1. The tool must appear in at least one reference.
2. If references for the tool carry credential IDs, the request must supply
one of those exact IDs.
3. If no reference for the tool carries a credential ID, the request must
*not* supply one (use default/ambient credentials).
"""
model_config = ConfigDict(frozen=True)
allowed_tools: Mapping[str, ToolDescription] = Field(default_factory=dict)
credentials_by_tool: Mapping[str, set[str]] = Field(default_factory=dict)
access_map: Mapping[str, ToolAccessDescription] = Field(default_factory=dict)
@classmethod
def from_dependencies(cls, deps: ToolDependencies | None) -> "ToolAccessPolicy":
"""Create a ToolAccessPolicy from ToolDependencies."""
"""Build a policy from ``ToolDependencies``.
Only ``deps.references`` are used. Multiple references to the same
tool are merged their credential IDs are unioned into a single
``ToolAccessDescription.allowed_credentials`` set.
"""
if deps is None or deps.is_empty():
return cls()
allowed_tools: dict[str, ToolDescription] = {}
# Accumulate credential sets keyed by tool_id so that multiple
# references to the same tool are merged correctly.
credentials_by_tool: dict[str, set[str]] = {}
first_seen: dict[str, tuple[ToolProviderType, str, str]] = {}
# Process dependencies - tools that can be used without specific credentials
for dep in deps.dependencies:
tool_desc = ToolDescription(tool_type=dep.type, provider=dep.provider, tool_name=dep.tool_name)
tool_id = tool_desc.tool_id()
allowed_tools[tool_id] = tool_desc
# Process references - tools that may require specific credentials
for ref in deps.references:
tool_desc = ToolDescription(tool_type=ref.type, provider=ref.provider, tool_name=ref.tool_name)
tool_id = tool_desc.tool_id()
allowed_tools[tool_id] = tool_desc
# If reference has a credential_id, add it to the allowed credentials for this tool
tool_id = f"{ref.type.value}:{ref.provider}:{ref.tool_name}"
if tool_id not in first_seen:
first_seen[tool_id] = (ref.type, ref.provider, ref.tool_name)
credentials_by_tool[tool_id] = set()
if ref.credential_id is not None:
if tool_id not in credentials_by_tool:
credentials_by_tool[tool_id] = set()
credentials_by_tool[tool_id].add(ref.credential_id)
return cls(allowed_tools=allowed_tools, credentials_by_tool=credentials_by_tool)
access_map: dict[str, ToolAccessDescription] = {}
for tool_id, (tool_type, provider, tool_name) in first_seen.items():
access_map[tool_id] = ToolAccessDescription(
tool_type=tool_type,
provider=provider,
tool_name=tool_name,
allowed_credentials=frozenset(credentials_by_tool[tool_id]),
)
return cls(access_map=access_map)
def is_empty(self) -> bool:
return len(self.allowed_tools) == 0
return len(self.access_map) == 0
def is_allowed(self, request: ToolInvocationRequest) -> bool:
"""Check if the tool invocation request is allowed."""
# If the policy is empty, allow any invocation.
# An empty policy (no references declared) permits any invocation.
if self.is_empty():
return True
tool_id = request.tool_description.tool_id()
if tool_id not in self.allowed_tools:
access_desc = self.access_map.get(tool_id)
if access_desc is None:
return False
# No special credential required, use default credentials only
if request.credential_id is None or request.credential_id == "":
return self.credentials_by_tool.get(tool_id) is None
# Special credential required, check if it is allowed
else:
return request.credential_id in self.credentials_by_tool.get(tool_id, set())
return access_desc.is_credential_allowed(request.credential_id)