Files
ragflow/agent/sandbox/executor_manager/services/security.py
Sp1kyss e6cb9faace fix: close two security analyzer bypass paths in sandbox executor (#14690)
## Summary

Two bypass vectors in the sandbox code security analyzer allowed
malicious code to pass the safety check undetected and reach the Docker
executor.

### 1. JavaScript: template-literal bypass of `require()` block

The `SecureJavaScriptAnalyzer` regex patterns used `['"]` to match
module names, covering only single and double quotes. An attacker could
use ES6 template literals to bypass all three `require` checks:

```javascript
const cp = require(`child_process`);
async function main() {
  return cp.execSync('cat /etc/passwd').toString();
}
```

The same bypass applied to `fs` and `worker_threads`.

**Fix:** Updated all three `require` patterns from `['"]` to ``['"`]`` to
also match backtick template literals.

### 2. Python: `builtins` not blocked + attribute-call blind spot in `visit_Call`

`visit_Call` only checked `ast.Name` nodes, so attribute-style calls
like `module.func()` were invisible to the analyzer. Additionally,
`builtins` was absent from `DANGEROUS_IMPORTS`. Combined, this allowed:

```python
import builtins
def main():
    builtins.exec('import os; os.system("id")')
```

Neither the import nor the exec call triggered any flag.

**Fix:** Added `builtins` to `DANGEROUS_IMPORTS` and added an
`ast.Attribute` branch to `visit_Call` so that `module.dangerous_func()`
style calls are caught alongside bare `dangerous_func()` calls.

## Tests

Added four regression tests covering each new bypass vector:
- `test_javascript_child_process_template_literal_is_rejected`
- `test_javascript_fs_template_literal_is_rejected`
- `test_python_builtins_import_is_rejected`
- `test_python_attribute_eval_call_is_rejected`

---------

Co-authored-by: bounty-hunter <bounty@hunter.local>
2026-05-11 11:46:27 +08:00

208 lines
9.1 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ast
import re
from typing import List, Tuple
from core.logger import logger
from models.enums import SupportLanguage
class SecurePythonAnalyzer(ast.NodeVisitor):
    """
    An AST-based analyzer for detecting unsafe Python code patterns.

    Call ``visit(tree)`` on a parsed module; every finding is appended to
    ``unsafe_items`` as a ``(description, line number)`` tuple. The analyzer
    is a deny-list: it reports the patterns listed below and nothing else.
    """

    # Module names that must not be imported. An entry may be dotted
    # ("http.client"); every dotted prefix of an imported name is tested
    # against this set (see _is_dangerous_module).
    DANGEROUS_IMPORTS = {"os", "subprocess", "sys", "shutil", "socket", "ctypes", "pickle", "threading", "multiprocessing", "asyncio", "http.client", "ftplib", "telnetlib", "builtins"}

    # Function names that must not be called, defined or assigned to.
    # NOTE(review): the dotted entries ("shutil.rmtree", "pickle.load", ...)
    # can never equal an ``ast.Name.id`` or ``ast.Attribute.attr`` (both are
    # single identifiers), so the checks in this class do not match them
    # directly; the owning modules are reported through DANGEROUS_IMPORTS.
    DANGEROUS_CALLS = {
        "eval",
        "exec",
        "open",
        "__import__",
        "compile",
        "input",
        "system",
        "popen",
        "remove",
        "rename",
        "rmdir",
        "chdir",
        "chmod",
        "chown",
        "getattr",
        "setattr",
        "globals",
        "locals",
        "shutil.rmtree",
        "subprocess.call",
        "subprocess.Popen",
        "ctypes",
        "pickle.load",
        "pickle.loads",
        "pickle.dump",
        "pickle.dumps",
    }

    def __init__(self):
        # Findings in visiting order: (description, line number).
        self.unsafe_items: List[Tuple[str, int]] = []

    def _is_dangerous_module(self, dotted_name: str) -> bool:
        """Return True if any dotted prefix of *dotted_name* is deny-listed.

        Testing every prefix (not just the first component) is what lets a
        dotted entry such as "http.client" match ``import http.client`` while
        a single-component entry such as "os" still matches ``import os.path``.
        """
        parts = dotted_name.split(".")
        return any(".".join(parts[:end]) in self.DANGEROUS_IMPORTS for end in range(1, len(parts) + 1))

    def _is_dangerous_name_call(self, node) -> bool:
        """Return True if *node* is a call to a bare, deny-listed function name."""
        return isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_CALLS

    def visit_Import(self, node: ast.Import):
        """Check for dangerous imports."""
        for alias in node.names:
            if self._is_dangerous_module(alias.name):
                self.unsafe_items.append((f"Import: {alias.name}", node.lineno))
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        """Check for dangerous imports from specific modules."""
        if node.module:
            # ``from http import client`` names the deny-listed module only
            # once the imported name is joined onto the package, so test
            # both the package itself and each "package.name" combination.
            candidates = [node.module]
            candidates.extend(f"{node.module}.{alias.name}" for alias in node.names)
            if any(self._is_dangerous_module(candidate) for candidate in candidates):
                self.unsafe_items.append((f"From Import: {node.module}", node.lineno))
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call):
        """Check for dangerous function calls."""
        if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_CALLS:
            self.unsafe_items.append((f"Call: {node.func.id}", node.lineno))
        elif isinstance(node.func, ast.Attribute) and node.func.attr in self.DANGEROUS_CALLS:
            # Surface the attribute-style match in the analyzer log so that
            # incident response can grep for it just like the other unsafe-item
            # findings; the bare append is invisible to operators.
            logger.warning(
                "[SafeCheck] Attribute-style dangerous call detected: %s (line %s)",
                node.func.attr,
                node.lineno,
            )
            self.unsafe_items.append((f"Call: {node.func.attr}", node.lineno))
        self.generic_visit(node)

    def visit_Attribute(self, node: ast.Attribute):
        """Check for dangerous attribute access."""
        if isinstance(node.value, ast.Name) and node.value.id in self.DANGEROUS_IMPORTS:
            self.unsafe_items.append((f"Attribute Access: {node.value.id}.{node.attr}", node.lineno))
        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp):
        """Check for possible unsafe operations like concatenating strings with commands."""
        # This could be useful to detect `eval("os." + "system")`
        if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
            # Only literal operations involving text can assemble a hidden
            # name; purely numeric arithmetic such as ``60 * 60`` is harmless
            # and must not be reported as string concatenation.
            if isinstance(node.left.value, (str, bytes)) or isinstance(node.right.value, (str, bytes)):
                self.unsafe_items.append(("Possible unsafe string concatenation", node.lineno))
        self.generic_visit(node)

    def visit_FunctionDef(self, node: ast.FunctionDef):
        """Check for dangerous function definitions (e.g., user-defined eval)."""
        if node.name in self.DANGEROUS_CALLS:
            self.unsafe_items.append((f"Function Definition: {node.name}", node.lineno))
        self.generic_visit(node)

    def visit_Assign(self, node: ast.Assign):
        """Check for assignments to variables that might lead to dangerous operations."""
        for target in node.targets:
            if isinstance(target, ast.Name) and target.id in self.DANGEROUS_CALLS:
                self.unsafe_items.append((f"Assignment to dangerous variable: {target.id}", node.lineno))
        self.generic_visit(node)

    def visit_Lambda(self, node: ast.Lambda):
        """Check for lambda functions with dangerous operations."""
        if self._is_dangerous_name_call(node.body):
            self.unsafe_items.append(("Lambda with dangerous function call", node.lineno))
        self.generic_visit(node)

    def visit_ListComp(self, node: ast.ListComp):
        """Check for list comprehensions with dangerous operations."""
        if self._is_dangerous_name_call(node.elt):
            self.unsafe_items.append(("List comprehension with dangerous function call", node.lineno))
        # generic_visit walks both the element and the generators, so the
        # generators need no separate pass (a second pass would report each
        # finding inside them twice).
        self.generic_visit(node)

    def visit_DictComp(self, node: ast.DictComp):
        """Check for dictionary comprehensions with dangerous operations."""
        # Check for dangerous calls in both the key and value expressions of the dictionary comprehension
        if self._is_dangerous_name_call(node.key):
            self.unsafe_items.append(("Dict comprehension with dangerous function call in key", node.lineno))
        if self._is_dangerous_name_call(node.value):
            self.unsafe_items.append(("Dict comprehension with dangerous function call in value", node.lineno))
        # Visit other sub-nodes (e.g., the generators in the comprehension)
        self.generic_visit(node)

    def visit_SetComp(self, node: ast.SetComp):
        """Check for set comprehensions with dangerous operations."""
        if self._is_dangerous_name_call(node.elt):
            self.unsafe_items.append(("Set comprehension with dangerous function call", node.lineno))
        # As in visit_ListComp: generic_visit already covers the generators.
        self.generic_visit(node)

    def visit_Yield(self, node: ast.Yield):
        """Check for yield statements that could be used to produce unsafe values."""
        if self._is_dangerous_name_call(node.value):
            self.unsafe_items.append(("Yield with dangerous function call", node.lineno))
        self.generic_visit(node)
class SecureJavaScriptAnalyzer:
    """
    Regex-based scanner for unsafe JavaScript (Node.js) code patterns.

    The source text is matched against ``DANGEROUS_PATTERNS``; it is not
    parsed, so a pattern also matches inside comments and string literals.
    """

    # (compiled pattern, description) pairs, scanned in this order.
    #
    # Module specifiers accept any of the three JavaScript quote styles
    # (single, double, backtick), an optional "node:" scheme prefix and an
    # optional subpath, so `require("node:fs")` and `require('fs/promises')`
    # are reported just like `require('fs')`.
    #
    # The "Import" patterns cover static `import ... from "<module>"`, bare
    # `import "<module>"` and dynamic `import("<module>")`: they match when
    # the first quoted string after the `import` keyword (within the same
    # statement, i.e. before any `;`) names a blocked module.
    DANGEROUS_PATTERNS = [
        (re.compile(r"""require\s*\(\s*['"`](?:node:)?child_process(?:/[^'"`]*)?['"`]\s*\)"""), "Require: child_process"),
        (re.compile(r"""require\s*\(\s*['"`](?:node:)?fs(?:/[^'"`]*)?['"`]\s*\)"""), "Require: fs"),
        (re.compile(r"""require\s*\(\s*['"`](?:node:)?worker_threads(?:/[^'"`]*)?['"`]\s*\)"""), "Require: worker_threads"),
        (re.compile(r"""\bimport\b[^'"`;]*['"`](?:node:)?child_process(?:/[^'"`]*)?['"`]"""), "Import: child_process"),
        (re.compile(r"""\bimport\b[^'"`;]*['"`](?:node:)?fs(?:/[^'"`]*)?['"`]"""), "Import: fs"),
        (re.compile(r"""\bimport\b[^'"`;]*['"`](?:node:)?worker_threads(?:/[^'"`]*)?['"`]"""), "Import: worker_threads"),
        (re.compile(r"""\beval\s*\("""), "Call: eval"),
        (re.compile(r"""\bFunction\s*\("""), "Call: Function"),
        (re.compile(r"""\bprocess\s*\.\s*binding\s*\("""), "Call: process.binding"),
    ]

    @classmethod
    def analyze(cls, code: str) -> List[Tuple[str, int]]:
        """
        Scan JavaScript source text for deny-listed patterns.

        :param code: The JavaScript source code to scan.
        :return: One (description, line number) tuple per match, grouped by
            pattern in ``DANGEROUS_PATTERNS`` order; line numbers are 1-based.
        """
        issues: List[Tuple[str, int]] = []
        for pattern, description in cls.DANGEROUS_PATTERNS:
            for match in pattern.finditer(code):
                # The 1-based line of a match is the number of newlines
                # that precede its first character, plus one.
                lineno = code.count("\n", 0, match.start()) + 1
                issues.append((description, lineno))
        return issues
def analyze_code_security(code: str, language: SupportLanguage) -> Tuple[bool, List[Tuple[str, int]]]:
    """
    Run the language-specific security analyzer over a piece of source code.

    The check fails closed: code that cannot be analyzed (a Python analysis
    error, or a language without an analyzer) is reported as unsafe with a
    single issue whose line number is -1.

    :param code: The source code to analyze.
    :param language: The programming language of the code.
    :return: (is_safe: bool, issues: List of (description, line number))
    """
    if language == SupportLanguage.PYTHON:
        try:
            visitor = SecurePythonAnalyzer()
            visitor.visit(ast.parse(code))
        except Exception as e:
            # Any failure while parsing or walking the tree counts as unsafe.
            logger.error(f"[SafeCheck] Python parsing failed: {str(e)}")
            return False, [(f"Parsing Error: {str(e)}", -1)]
        else:
            findings = visitor.unsafe_items
            return not findings, findings

    if language == SupportLanguage.NODEJS:
        findings = SecureJavaScriptAnalyzer.analyze(code)
        return not findings, findings

    # No analyzer exists for this language, so the code cannot be cleared.
    logger.warning(f"[SafeCheck] Unsupported language for security analysis: {language}")
    return False, [(f"Unsupported language for security analysis: {language}", -1)]