mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-07-03 11:17:06 +08:00
Compare commits
3 Commits
master
...
security/g
| Author | SHA1 | Date | |
|---|---|---|---|
| 5611d1c9b6 | |||
| e4eb7f2698 | |||
| ae4fcaaf41 |
@ -306,12 +306,15 @@ async def download_asset_content(request: web.Request) -> web.Response:
|
||||
404, "FILE_NOT_FOUND", "Underlying file not found on disk."
|
||||
)
|
||||
|
||||
_DANGEROUS_MIME_TYPES = {
|
||||
"text/html", "text/html-sandboxed", "application/xhtml+xml",
|
||||
"text/javascript", "text/css",
|
||||
}
|
||||
if content_type in _DANGEROUS_MIME_TYPES:
|
||||
# User-controlled asset content must never render inline in the app origin
|
||||
# (stored XSS via SVG/HTML/XML). Force dangerous types to download and
|
||||
# override any requested inline disposition. Centralised through
|
||||
# folder_paths.is_dangerous_content_type so this can't drift from /view and
|
||||
# /userdata (the previous inline set here omitted image/svg+xml and missed
|
||||
# the charset/casing/+xml-dialect bypasses).
|
||||
if folder_paths.is_dangerous_content_type(content_type):
|
||||
content_type = "application/octet-stream"
|
||||
disposition = "attachment"
|
||||
|
||||
safe_name = (filename or "").replace("\r", "").replace("\n", "")
|
||||
encoded = urllib.parse.quote(safe_name)
|
||||
|
||||
@ -50,21 +50,45 @@ class ModelFileManager:
|
||||
@routes.get("/experiment/models/preview/{folder}/{path_index}/{filename:.*}")
|
||||
async def get_model_preview(request):
|
||||
folder_name = request.match_info.get("folder", None)
|
||||
path_index = int(request.match_info.get("path_index", None))
|
||||
filename = request.match_info.get("filename", None)
|
||||
|
||||
if folder_name not in folder_paths.folder_names_and_paths:
|
||||
return web.Response(status=404)
|
||||
|
||||
# The "{filename:.*}" capture also matches the empty string, which
|
||||
# would resolve to the folder itself; reject it explicitly.
|
||||
if not filename:
|
||||
return web.Response(status=400)
|
||||
|
||||
try:
|
||||
path_index = int(request.match_info.get("path_index", None))
|
||||
except (TypeError, ValueError):
|
||||
return web.Response(status=400)
|
||||
|
||||
folders = folder_paths.folder_names_and_paths[folder_name]
|
||||
if path_index < 0 or path_index >= len(folders[0]):
|
||||
return web.Response(status=404)
|
||||
folder = folders[0][path_index]
|
||||
full_filename = os.path.join(folder, filename)
|
||||
full_filename = os.path.normpath(os.path.join(folder, filename))
|
||||
|
||||
# Prevent path traversal: the requested file must stay within the
|
||||
# configured model folder. `filename` is an unrestricted ".*" capture,
|
||||
# so values like "../../../../etc/passwd" would otherwise escape it.
|
||||
if not folder_paths.is_within_directory(folder, full_filename):
|
||||
return web.Response(status=403)
|
||||
|
||||
previews = self.get_model_previews(full_filename)
|
||||
default_preview = previews[0] if len(previews) > 0 else None
|
||||
if default_preview is None or (isinstance(default_preview, str) and not os.path.isfile(default_preview)):
|
||||
return web.Response(status=404)
|
||||
|
||||
# The preview is selected by a glob inside get_model_previews, so a
|
||||
# companion file (e.g. "model.preview.png") could itself be a symlink
|
||||
# resolving outside the model folder. Re-validate the file actually
|
||||
# opened: is_within_directory realpaths it, catching symlink escape.
|
||||
if isinstance(default_preview, str) and not folder_paths.is_within_directory(folder, default_preview):
|
||||
return web.Response(status=403)
|
||||
|
||||
try:
|
||||
with Image.open(default_preview) as img:
|
||||
img_bytes = BytesIO()
|
||||
|
||||
@ -6,6 +6,7 @@ import glob
|
||||
import shutil
|
||||
import logging
|
||||
import tempfile
|
||||
import mimetypes
|
||||
from aiohttp import web
|
||||
from urllib import parse
|
||||
from comfy.cli_args import args
|
||||
@ -336,7 +337,20 @@ class UserManager():
|
||||
if not isinstance(path, str):
|
||||
return path
|
||||
|
||||
return web.FileResponse(path)
|
||||
# User data files are arbitrary user-supplied content and are never
|
||||
# meant to render inline. Disable MIME sniffing and force a download
|
||||
# so uploaded markup/scripts can't execute in the app origin (stored
|
||||
# XSS). Content-Disposition: attachment is the load-bearing guard;
|
||||
# the content-type override and nosniff are defence in depth.
|
||||
content_type = mimetypes.guess_type(path)[0] or 'application/octet-stream'
|
||||
if folder_paths.is_dangerous_content_type(content_type):
|
||||
content_type = 'application/octet-stream'
|
||||
|
||||
return web.FileResponse(path, headers={
|
||||
"Content-Type": content_type,
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Content-Disposition": "attachment",
|
||||
})
|
||||
|
||||
@routes.post("/userdata/{file}")
|
||||
async def post_userdata(request):
|
||||
|
||||
@ -264,6 +264,59 @@ def annotated_filepath(name: str) -> tuple[str, str | None]:
|
||||
return name, base_dir
|
||||
|
||||
|
||||
# Content types a browser may execute or render inline. File endpoints that
|
||||
# serve user-controlled content must force these to download (and ideally set
|
||||
# Content-Disposition: attachment) to avoid stored XSS. Centralised here so the
|
||||
# /view and /userdata handlers can't drift apart. mimetypes.guess_type may
|
||||
# return either the text/* or application/* spelling depending on platform, so
|
||||
# both are listed.
|
||||
DANGEROUS_CONTENT_TYPES = {
|
||||
'text/html', 'text/html-sandboxed', 'application/xhtml+xml',
|
||||
'text/javascript', 'application/javascript', 'application/x-javascript',
|
||||
'application/ecmascript', 'text/css',
|
||||
'image/svg+xml', 'application/xml', 'text/xml',
|
||||
# message/rfc822 (.mht/.mhtml) can carry script in some browsers.
|
||||
'message/rfc822',
|
||||
}
|
||||
|
||||
|
||||
def is_dangerous_content_type(content_type: str | None) -> bool:
|
||||
"""Return True if a browser may execute or render `content_type` inline.
|
||||
|
||||
Normalises before matching so the check can't be slipped past with a
|
||||
charset/boundary parameter (``text/html; charset=utf-8``) or casing
|
||||
(``TEXT/HTML``). Any XML dialect (``*+xml`` or ``*/xml``) is treated as
|
||||
dangerous because XML can carry inline script via stylesheet/entity tricks,
|
||||
which also covers the ``application/{xslt,rss,atom,rdf}+xml`` family without
|
||||
enumerating each one. Endpoints serving user-controlled content should route
|
||||
a dangerous type to ``application/octet-stream`` + ``Content-Disposition:
|
||||
attachment`` + ``X-Content-Type-Options: nosniff``.
|
||||
"""
|
||||
if not content_type:
|
||||
return False
|
||||
normalized = content_type.split(';', 1)[0].strip().lower()
|
||||
if normalized in DANGEROUS_CONTENT_TYPES:
|
||||
return True
|
||||
return normalized.endswith('+xml') or normalized.endswith('/xml')
|
||||
|
||||
|
||||
def is_within_directory(directory: str, target: str) -> bool:
|
||||
"""Return True if `target` resolves to a path inside `directory`.
|
||||
|
||||
Uses realpath on both operands so that a symlink placed inside `directory`
|
||||
that points elsewhere cannot escape the containment check at open time.
|
||||
"""
|
||||
try:
|
||||
directory = os.path.realpath(directory)
|
||||
target = os.path.realpath(target)
|
||||
return os.path.commonpath((directory, target)) == directory
|
||||
except ValueError:
|
||||
# ValueError is raised by realpath() on a path with an embedded null
|
||||
# byte, and by commonpath() on Windows when the paths are on different
|
||||
# drives. In either case the target is not safely within the directory.
|
||||
return False
|
||||
|
||||
|
||||
def get_annotated_filepath(name: str, default_dir: str | None=None) -> str:
|
||||
name, base_dir = annotated_filepath(name)
|
||||
|
||||
@ -273,7 +326,12 @@ def get_annotated_filepath(name: str, default_dir: str | None=None) -> str:
|
||||
else:
|
||||
base_dir = get_input_directory() # fallback path
|
||||
|
||||
return os.path.join(base_dir, name)
|
||||
filepath = os.path.abspath(os.path.join(base_dir, name))
|
||||
# Prevent path traversal: the resolved path must stay within base_dir.
|
||||
# repr() the name in the message so a crafted value can't inject log lines.
|
||||
if not is_within_directory(base_dir, filepath):
|
||||
raise ValueError("Invalid file path: {!r}".format(name))
|
||||
return filepath
|
||||
|
||||
|
||||
def exists_annotated_filepath(name) -> bool:
|
||||
@ -282,7 +340,10 @@ def exists_annotated_filepath(name) -> bool:
|
||||
if base_dir is None:
|
||||
base_dir = get_input_directory() # fallback path
|
||||
|
||||
filepath = os.path.join(base_dir, name)
|
||||
filepath = os.path.abspath(os.path.join(base_dir, name))
|
||||
# Treat traversal attempts as non-existent rather than probing the filesystem.
|
||||
if not is_within_directory(base_dir, filepath):
|
||||
return False
|
||||
return os.path.exists(filepath)
|
||||
|
||||
|
||||
|
||||
78
server.py
78
server.py
@ -23,8 +23,6 @@ import json
|
||||
import glob
|
||||
import struct
|
||||
import ssl
|
||||
import socket
|
||||
import ipaddress
|
||||
from PIL import Image, ImageOps
|
||||
from PIL.PngImagePlugin import PngInfo
|
||||
from io import BytesIO
|
||||
@ -40,6 +38,7 @@ import comfy.utils
|
||||
import comfy.model_management
|
||||
from comfy_api import feature_flags
|
||||
import node_helpers
|
||||
from utils.origin_check import is_cross_origin_forbidden
|
||||
from comfyui_version import __version__
|
||||
from app.frontend_management import FrontendManager, parse_version
|
||||
from comfy_api.internal import _ComfyNodeInternal
|
||||
@ -127,32 +126,6 @@ def create_cors_middleware(allowed_origin: str):
|
||||
|
||||
return cors_middleware
|
||||
|
||||
def is_loopback(host):
|
||||
if host is None:
|
||||
return False
|
||||
try:
|
||||
if ipaddress.ip_address(host).is_loopback:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except:
|
||||
pass
|
||||
|
||||
loopback = False
|
||||
for family in (socket.AF_INET, socket.AF_INET6):
|
||||
try:
|
||||
r = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM)
|
||||
for family, _, _, _, sockaddr in r:
|
||||
if not ipaddress.ip_address(sockaddr[0]).is_loopback:
|
||||
return loopback
|
||||
else:
|
||||
loopback = True
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
return loopback
|
||||
|
||||
|
||||
def create_origin_only_middleware():
|
||||
@web.middleware
|
||||
async def origin_only_middleware(request: web.Request, handler):
|
||||
@ -166,23 +139,13 @@ def create_origin_only_middleware():
|
||||
if 'Host' in request.headers and 'Origin' in request.headers:
|
||||
host = request.headers['Host']
|
||||
origin = request.headers['Origin']
|
||||
host_domain = host.lower()
|
||||
parsed = urllib.parse.urlparse(origin)
|
||||
origin_domain = parsed.netloc.lower()
|
||||
host_domain_parsed = urllib.parse.urlsplit('//' + host_domain)
|
||||
|
||||
#limit the check to when the host domain is localhost, this makes it slightly less safe but should still prevent the exploit
|
||||
loopback = is_loopback(host_domain_parsed.hostname)
|
||||
|
||||
if parsed.port is None: #if origin doesn't have a port strip it from the host to handle weird browsers, same for host
|
||||
host_domain = host_domain_parsed.hostname
|
||||
if host_domain_parsed.port is None:
|
||||
origin_domain = parsed.hostname
|
||||
|
||||
if loopback and host_domain is not None and origin_domain is not None and len(host_domain) > 0 and len(origin_domain) > 0:
|
||||
if host_domain != origin_domain:
|
||||
logging.warning("WARNING: request with non matching host and origin {} != {}, returning 403".format(host_domain, origin_domain))
|
||||
return web.Response(status=403)
|
||||
# The host/origin CSRF decision (incl. the Origin: null bypass fix
|
||||
# for GHSA-779p-m5rp-r4h4 #1) lives in utils.origin_check so it can
|
||||
# be unit-tested without standing up the full server. See
|
||||
# tests-unit/security_test/test_ghsa_779p_01_origin_csrf.py.
|
||||
if is_cross_origin_forbidden(host, origin):
|
||||
logging.warning("WARNING: request with non matching host and origin, returning 403 (host={!r} origin={!r})".format(host, origin))
|
||||
return web.Response(status=403)
|
||||
|
||||
if request.method == "OPTIONS":
|
||||
response = web.Response()
|
||||
@ -616,15 +579,30 @@ class PromptServer():
|
||||
or 'application/octet-stream'
|
||||
)
|
||||
|
||||
# For security, force certain mimetypes to download instead of display
|
||||
if content_type in {'text/html', 'text/html-sandboxed', 'application/xhtml+xml', 'text/javascript', 'text/css'}:
|
||||
content_type = 'application/octet-stream' # Forces download
|
||||
# For security, force renderable/active types (HTML, JS,
|
||||
# CSS, SVG, XML — anything that can carry inline <script>
|
||||
# and execute in the page origin) to download instead of
|
||||
# displaying inline, preventing stored XSS. The
|
||||
# attachment disposition is the load-bearing guard: a
|
||||
# bare filename= hint does not force a download per
|
||||
# RFC 6266, so we only attach it on the dangerous branch
|
||||
# to avoid breaking inline display of legitimate images.
|
||||
# Escape backslash/quote per RFC 6266 quoted-string so a
|
||||
# filename containing a double quote (which passes the
|
||||
# ".."/leading-slash filter above) can't break out of the
|
||||
# header's quoted-string and malform the disposition.
|
||||
safe_filename = filename.replace("\\", "\\\\").replace('"', '\\"')
|
||||
disposition = f"filename=\"{safe_filename}\""
|
||||
if folder_paths.is_dangerous_content_type(content_type):
|
||||
content_type = 'application/octet-stream'
|
||||
disposition = f"attachment; filename=\"{safe_filename}\""
|
||||
|
||||
return web.FileResponse(
|
||||
file,
|
||||
headers={
|
||||
"Content-Disposition": f"filename=\"{filename}\"",
|
||||
"Content-Type": content_type
|
||||
"Content-Disposition": disposition,
|
||||
"Content-Type": content_type,
|
||||
"X-Content-Type-Options": "nosniff"
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import contextlib
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
@ -9,6 +11,40 @@ import requests
|
||||
from helpers import get_asset_filename, trigger_sync_seed_assets
|
||||
|
||||
|
||||
def test_download_svg_forced_to_attachment(http: requests.Session, api_base: str):
|
||||
"""GHSA-779p-m5rp-r4h4 CISA-5 (sibling route): an uploaded SVG must never be
|
||||
served inline from GET /api/assets/{id}/content, or an inline <script> runs
|
||||
in the app origin (stored XSS). Even with disposition=inline requested, a
|
||||
dangerous content type must be forced to application/octet-stream +
|
||||
Content-Disposition: attachment + nosniff. Regression guard for the stale
|
||||
inline blocklist that previously omitted image/svg+xml and ignored the
|
||||
centralized folder_paths.is_dangerous_content_type check.
|
||||
"""
|
||||
svg = b'<svg xmlns="http://www.w3.org/2000/svg"><script>alert(1)</script></svg>'
|
||||
files = {"file": ("evil.svg", svg, "image/svg+xml")}
|
||||
form_data = {
|
||||
"tags": json.dumps(["models", "checkpoints", "unit-tests", "svgxss"]),
|
||||
"name": "evil.svg",
|
||||
}
|
||||
up = http.post(api_base + "/api/assets", files=files, data=form_data, timeout=120)
|
||||
body = up.json()
|
||||
assert up.status_code in (200, 201), body
|
||||
aid = body["id"]
|
||||
try:
|
||||
r = http.get(f"{api_base}/api/assets/{aid}/content?disposition=inline", timeout=120)
|
||||
r.content
|
||||
assert r.status_code == 200
|
||||
ct = r.headers.get("Content-Type", "").lower()
|
||||
cd = r.headers.get("Content-Disposition", "").lower()
|
||||
assert "svg" not in ct, f"SVG served with a renderable content type: {ct!r}"
|
||||
assert ct.startswith("application/octet-stream"), f"expected octet-stream, got {ct!r}"
|
||||
assert "attachment" in cd, f"inline disposition not overridden to attachment: {cd!r}"
|
||||
assert r.headers.get("X-Content-Type-Options", "").lower() == "nosniff"
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
http.delete(f"{api_base}/api/assets/{aid}", timeout=30)
|
||||
|
||||
|
||||
def test_download_attachment_and_inline(http: requests.Session, api_base: str, seeded_asset: dict):
|
||||
aid = seeded_asset["id"]
|
||||
|
||||
|
||||
@ -53,8 +53,11 @@ def test_annotated_filepath():
|
||||
|
||||
def test_get_annotated_filepath():
|
||||
default_dir = "/default/dir"
|
||||
assert folder_paths.get_annotated_filepath("test.txt", default_dir) == os.path.join(default_dir, "test.txt")
|
||||
assert folder_paths.get_annotated_filepath("test.txt [output]") == os.path.join(folder_paths.get_output_directory(), "test.txt")
|
||||
# get_annotated_filepath now normalizes with os.path.abspath (part of the
|
||||
# GHSA-779p traversal hardening), so compare against the normalized form —
|
||||
# on Windows abspath also prepends the current drive letter.
|
||||
assert folder_paths.get_annotated_filepath("test.txt", default_dir) == os.path.abspath(os.path.join(default_dir, "test.txt"))
|
||||
assert folder_paths.get_annotated_filepath("test.txt [output]") == os.path.abspath(os.path.join(folder_paths.get_output_directory(), "test.txt"))
|
||||
|
||||
def test_add_model_folder_path_append(clear_folder_paths):
|
||||
folder_paths.add_model_folder_path("test_folder", "/default/path", is_default=True)
|
||||
|
||||
0
tests-unit/security_test/__init__.py
Normal file
0
tests-unit/security_test/__init__.py
Normal file
114
tests-unit/security_test/test_ghsa_779p_01_origin_csrf.py
Normal file
114
tests-unit/security_test/test_ghsa_779p_01_origin_csrf.py
Normal file
@ -0,0 +1,114 @@
|
||||
"""CI unit guard for FIX #1 of GHSA-779p-m5rp-r4h4 — the Origin: null CSRF bypass.
|
||||
|
||||
Vuln #1 was a CSRF bypass: the loopback host/origin guard in
|
||||
``server.create_origin_only_middleware`` skipped the comparison entirely whenever
|
||||
the origin host parsed to an empty string. An opaque ``Origin: null`` (sent by a
|
||||
sandboxed iframe or a ``data:``/``file:`` document) parses to exactly that, so an
|
||||
attacker could forge cross-origin state-mutating requests against a victim's
|
||||
loopback ComfyUI — the entry primitive for the [CISA-1] CSRF -> stored-XSS ->
|
||||
client-side RCE chain.
|
||||
|
||||
The decision logic was extracted verbatim from the middleware closure into
|
||||
``utils.origin_check`` precisely so it can be exercised here: ``server.py`` cannot
|
||||
be imported in a unit test (importing it spins up the full PromptServer/aiohttp
|
||||
app and its global side effects), which is why finding #1 previously had no
|
||||
server-free CI coverage and only a live-server POC
|
||||
(``.security/pocs/test_security_ghsa_779p.py::TestOriginNullCsrf``, skipped
|
||||
unless a server is running on 127.0.0.1:8188). This file is the fast, hermetic
|
||||
guard so the Origin: null bypass cannot silently reopen.
|
||||
|
||||
Cases use IP literals so the loopback determination does not depend on DNS; the
|
||||
one name-based case ("localhost") relies only on standard loopback resolution.
|
||||
"""
|
||||
|
||||
from utils.origin_check import is_cross_origin_forbidden, is_loopback
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# The regression: an opaque/empty Origin against a loopback Host MUST be a 403.
|
||||
# Each of these returned False (allowed) before the fix — that is the bug.
|
||||
# ---------------------------------------------------------------------------
|
||||
OPAQUE_ORIGIN_ON_LOOPBACK = [
|
||||
("127.0.0.1:8188", "null"), # the exact reported bypass
|
||||
("127.0.0.1:8188", ""), # empty Origin header, same empty-host path
|
||||
("127.0.0.1", "null"), # host without an explicit port
|
||||
("[::1]:8188", "null"), # IPv6 loopback host
|
||||
]
|
||||
|
||||
|
||||
def test_origin_null_is_forbidden():
|
||||
"""Origin: null against a loopback host must be rejected (the #1 fix)."""
|
||||
assert is_cross_origin_forbidden("127.0.0.1:8188", "null") is True, (
|
||||
"Origin: null was treated as allowed — this is exactly the "
|
||||
"GHSA-779p-m5rp-r4h4 #1 CSRF bypass reopening."
|
||||
)
|
||||
|
||||
|
||||
def test_all_opaque_origins_on_loopback_forbidden():
|
||||
for host, origin in OPAQUE_ORIGIN_ON_LOOPBACK:
|
||||
assert is_cross_origin_forbidden(host, origin) is True, (host, origin)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# False-positive guards: legitimate same-origin requests must stay allowed, or
|
||||
# the fix would break the dev server. The port-stripping cases preserve the
|
||||
# original "handle weird browsers" behaviour (origin without a port matches a
|
||||
# host with one, and vice versa).
|
||||
# ---------------------------------------------------------------------------
|
||||
MATCHING_ORIGINS = [
|
||||
("127.0.0.1:8188", "http://127.0.0.1:8188"),
|
||||
("127.0.0.1:8188", "http://127.0.0.1"), # origin has no port -> host port stripped for compare
|
||||
("127.0.0.1", "http://127.0.0.1"),
|
||||
("localhost:8188", "http://localhost:8188"),
|
||||
]
|
||||
|
||||
|
||||
def test_matching_origins_allowed():
|
||||
for host, origin in MATCHING_ORIGINS:
|
||||
assert is_cross_origin_forbidden(host, origin) is False, (host, origin)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Genuine cross-origin requests against a loopback host must be forbidden.
|
||||
# ---------------------------------------------------------------------------
|
||||
MISMATCHED_ORIGINS_ON_LOOPBACK = [
|
||||
("127.0.0.1:8188", "http://evil.com"),
|
||||
("127.0.0.1:8188", "https://127.0.0.1:9999"), # same host, different port
|
||||
("127.0.0.1:8188", "http://localhost.evil.com"),
|
||||
]
|
||||
|
||||
|
||||
def test_mismatched_origins_on_loopback_forbidden():
|
||||
for host, origin in MISMATCHED_ORIGINS_ON_LOOPBACK:
|
||||
assert is_cross_origin_forbidden(host, origin) is True, (host, origin)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scope guard: the check is deliberately limited to loopback hosts. A
|
||||
# non-loopback Host must NOT trip the guard (even on a mismatch / opaque
|
||||
# origin) — this preserves the original behaviour and documents that the
|
||||
# mitigation is localhost-only by design.
|
||||
# ---------------------------------------------------------------------------
|
||||
NON_LOOPBACK_HOSTS = [
|
||||
("203.0.113.5:8188", "http://evil.com"), # public IP literal -> not loopback
|
||||
("203.0.113.5:8188", "null"),
|
||||
("10.0.0.5:8188", "null"), # private but not loopback
|
||||
]
|
||||
|
||||
|
||||
def test_non_loopback_host_not_subject_to_check():
|
||||
for host, origin in NON_LOOPBACK_HOSTS:
|
||||
assert is_cross_origin_forbidden(host, origin) is False, (host, origin)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_loopback — the predicate the scoping above depends on.
|
||||
# ---------------------------------------------------------------------------
|
||||
def test_is_loopback_true_for_loopback_addresses():
|
||||
for host in ("127.0.0.1", "::1", "localhost"):
|
||||
assert is_loopback(host) is True, host
|
||||
|
||||
|
||||
def test_is_loopback_false_for_non_loopback_and_none():
|
||||
for host in ("203.0.113.5", "10.0.0.5", None):
|
||||
assert is_loopback(host) is False, host
|
||||
192
tests-unit/security_test/test_ghsa_779p_02_preview_traversal.py
Normal file
192
tests-unit/security_test/test_ghsa_779p_02_preview_traversal.py
Normal file
@ -0,0 +1,192 @@
|
||||
"""CI unit tests for FIX #2 of GHSA-779p-m5rp-r4h4.
|
||||
|
||||
Path traversal / hardening in app/model_manager.py get_model_preview
|
||||
(route /experiment/models/preview/{folder}/{path_index}/{filename:.*}).
|
||||
|
||||
Reference: https://github.com/Comfy-Org/ComfyUI/security/advisories/GHSA-779p-m5rp-r4h4
|
||||
"""
|
||||
import pytest
|
||||
import yarl
|
||||
from io import BytesIO
|
||||
from PIL import Image
|
||||
from aiohttp import web
|
||||
from unittest.mock import patch
|
||||
from app.model_manager import ModelFileManager
|
||||
|
||||
pytestmark = (
|
||||
pytest.mark.asyncio
|
||||
) # This applies the asyncio mark to all test functions in the module
|
||||
|
||||
@pytest.fixture
|
||||
def model_manager():
|
||||
return ModelFileManager()
|
||||
|
||||
@pytest.fixture
|
||||
def app(model_manager):
|
||||
app = web.Application()
|
||||
routes = web.RouteTableDef()
|
||||
model_manager.add_routes(routes)
|
||||
app.add_routes(routes)
|
||||
return app
|
||||
|
||||
|
||||
async def test_legit_preview_returns_200(aiohttp_client, app, tmp_path):
|
||||
"""Sanity: a real preview PNG inside the model folder is served as webp 200."""
|
||||
img = Image.new('RGB', (16, 16), color=(255, 0, 128))
|
||||
img.save(tmp_path / "test_model.png", format='PNG')
|
||||
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(tmp_path)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get('/experiment/models/preview/test_folder/0/test_model.png')
|
||||
|
||||
assert response.status == 200
|
||||
assert response.content_type == 'image/webp'
|
||||
|
||||
img_bytes = BytesIO(await response.read())
|
||||
served = Image.open(img_bytes)
|
||||
assert served.format
|
||||
assert served.format.lower() == 'webp'
|
||||
served.close()
|
||||
|
||||
|
||||
async def test_non_integer_path_index_returns_400(aiohttp_client, app, tmp_path):
|
||||
"""A non-integer path_index segment must be rejected with 400."""
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(tmp_path)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get('/experiment/models/preview/test_folder/abc/test_model.png')
|
||||
|
||||
assert response.status == 400
|
||||
|
||||
|
||||
async def test_out_of_range_path_index_returns_404(aiohttp_client, app, tmp_path):
|
||||
"""A path_index beyond the configured folder list must return 404."""
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(tmp_path)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get('/experiment/models/preview/test_folder/99/test_model.png')
|
||||
|
||||
assert response.status == 404
|
||||
|
||||
|
||||
async def test_empty_filename_returns_400(aiohttp_client, app, tmp_path):
|
||||
"""The "{filename:.*}" capture also matches the empty string (trailing
|
||||
slash). It would resolve to the folder itself and must be rejected with 400."""
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(tmp_path)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get('/experiment/models/preview/test_folder/0/')
|
||||
|
||||
assert response.status == 400
|
||||
|
||||
|
||||
async def test_path_traversal_in_filename_returns_403(aiohttp_client, app, tmp_path):
|
||||
"""Path traversal in {filename} must be rejected with 403 and must NOT read
|
||||
a file outside the configured model directory.
|
||||
|
||||
GOTCHA: aiohttp/yarl collapses literal ``../`` dot-segments out of the URL
|
||||
path before it reaches the handler, which would make this test vacuously
|
||||
pass (the request would hit a different/non-existent route). We percent-encode
|
||||
the dots and slashes (``%2e%2e%2f``) and send the URL with
|
||||
``yarl.URL(..., encoded=True)`` so the bytes survive client-side normalization
|
||||
untouched; aiohttp's router then percent-decodes them into ``match_info``,
|
||||
delivering the literal ``../`` traversal to the handler's ``{filename:.*}``
|
||||
capture.
|
||||
|
||||
Without the fix the handler computes
|
||||
``os.path.normpath(os.path.join(folder, "../../../../etc/hosts"))``, which
|
||||
escapes ``tmp_path`` and would be passed straight to get_model_previews ->
|
||||
Image.open, serving bytes from outside the model dir (200/served bytes). The
|
||||
is_within_directory() containment check is the load-bearing fix that turns
|
||||
that escape into a 403.
|
||||
"""
|
||||
# Sanity-anchor: a legit preview exists inside tmp_path, so a 200 path is
|
||||
# genuinely reachable — proving the 403 below is the containment check
|
||||
# firing, not an unrelated 404.
|
||||
img = Image.new('RGB', (16, 16), color=(255, 0, 128))
|
||||
img.save(tmp_path / "test_model.png", format='PNG')
|
||||
|
||||
# Percent-encoded "../../../../etc/hosts" so yarl does not collapse the
|
||||
# dot-segments before the request leaves the client.
|
||||
encoded_traversal = '%2e%2e%2f' * 4 + 'etc%2fhosts'
|
||||
raw_path = '/experiment/models/preview/test_folder/0/' + encoded_traversal
|
||||
url = yarl.URL(raw_path, encoded=True)
|
||||
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(tmp_path)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get(url)
|
||||
|
||||
# Confirm the traversal actually reached the handler intact: a 200 here
|
||||
# would mean either normalization stripped the ``../`` (vacuous pass) or
|
||||
# the containment check failed open and served outside-dir bytes.
|
||||
assert response.status == 403, (
|
||||
f"expected 403 from is_within_directory() containment check, "
|
||||
f"got {response.status}; traversal may have been normalized away "
|
||||
f"or the fix failed open"
|
||||
)
|
||||
body = await response.read()
|
||||
assert body == b"", "403 response must not carry any file bytes"
|
||||
|
||||
|
||||
async def test_symlink_companion_preview_returns_403(aiohttp_client, app, tmp_path):
|
||||
"""A companion preview file is selected by a glob inside get_model_previews
|
||||
and then opened. If that companion is a symlink whose path is in-dir but
|
||||
whose target escapes the model folder, it must be rejected with 403 — not
|
||||
served. The requested path itself stays in-dir (so the first containment
|
||||
check passes); the load-bearing fix is the SECOND is_within_directory check
|
||||
on the file actually opened.
|
||||
"""
|
||||
model_dir = tmp_path / "models"
|
||||
model_dir.mkdir()
|
||||
secret_dir = tmp_path / "secret"
|
||||
secret_dir.mkdir()
|
||||
# A real image OUTSIDE the model dir — valid, so without the fix Image.open
|
||||
# would succeed and its bytes would be served (200).
|
||||
secret = secret_dir / "secret.png"
|
||||
Image.new('RGB', (8, 8), color=(0, 0, 0)).save(secret, format='PNG')
|
||||
# Companion preview, in-dir by name but a symlink escaping the model dir.
|
||||
# (No real model file is needed — get_model_previews globs companions by
|
||||
# basename, and omitting a .safetensors avoids the metadata-header read.)
|
||||
companion = model_dir / "model.preview.png"
|
||||
try:
|
||||
companion.symlink_to(secret)
|
||||
except (OSError, NotImplementedError):
|
||||
pytest.skip("symlinks not supported on this platform/filesystem")
|
||||
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(model_dir)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get('/experiment/models/preview/test_folder/0/model.safetensors')
|
||||
|
||||
assert response.status == 403, (
|
||||
f"expected 403 — the globbed companion preview is a symlink resolving "
|
||||
f"outside the model dir and must not be served; got {response.status}"
|
||||
)
|
||||
assert await response.read() == b""
|
||||
|
||||
|
||||
async def test_null_byte_in_filename_no_500(aiohttp_client, app, tmp_path):
|
||||
"""A NUL byte in the filename must yield a clean client rejection, not a 500
|
||||
from an uncaught ValueError in is_within_directory's realpath() call."""
|
||||
raw_path = '/experiment/models/preview/test_folder/0/' + 'a%00b'
|
||||
url = yarl.URL(raw_path, encoded=True)
|
||||
|
||||
with patch('folder_paths.folder_names_and_paths', {
|
||||
'test_folder': ([str(tmp_path)], None)
|
||||
}):
|
||||
client = await aiohttp_client(app)
|
||||
response = await client.get(url)
|
||||
|
||||
assert response.status != 500, (
|
||||
f"NUL byte produced a 500 (uncaught ValueError); expected a clean "
|
||||
f"4xx rejection, got {response.status}"
|
||||
)
|
||||
assert 400 <= response.status < 500
|
||||
@ -0,0 +1,165 @@
|
||||
"""Security tests for GHSA-779p-m5rp-r4h4 — FIX #3.
|
||||
|
||||
Path traversal in folder_paths.get_annotated_filepath / exists_annotated_filepath,
|
||||
plus the shared is_within_directory() containment helper.
|
||||
|
||||
These are pure-function tests (no running server). The input/output/temp
|
||||
directories are pointed at tmp_path via the folder_paths setters, so a crafted
|
||||
name containing `../`, an absolute path, or a symlink that escapes the base
|
||||
directory must be rejected.
|
||||
|
||||
Reference: https://github.com/Comfy-Org/ComfyUI/security/advisories/GHSA-779p-m5rp-r4h4
|
||||
"""
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
import folder_paths
|
||||
from comfy.options import enable_args_parsing
|
||||
enable_args_parsing()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sandbox(tmp_path):
|
||||
"""Point folder_paths' input/output/temp dirs at a real temp sandbox.
|
||||
|
||||
Yields the realpath'd base, input, output and temp directories. The original
|
||||
directory values are restored afterward so tests stay isolated.
|
||||
"""
|
||||
base = os.path.realpath(str(tmp_path))
|
||||
input_dir = os.path.join(base, "input")
|
||||
output_dir = os.path.join(base, "output")
|
||||
temp_dir = os.path.join(base, "temp")
|
||||
for d in (input_dir, output_dir, temp_dir):
|
||||
os.makedirs(d, exist_ok=True)
|
||||
|
||||
orig_input = folder_paths.get_input_directory()
|
||||
orig_output = folder_paths.get_output_directory()
|
||||
orig_temp = folder_paths.get_temp_directory()
|
||||
|
||||
folder_paths.set_input_directory(input_dir)
|
||||
folder_paths.set_output_directory(output_dir)
|
||||
folder_paths.set_temp_directory(temp_dir)
|
||||
|
||||
yield {
|
||||
"base": base,
|
||||
"input": input_dir,
|
||||
"output": output_dir,
|
||||
"temp": temp_dir,
|
||||
}
|
||||
|
||||
folder_paths.set_input_directory(orig_input)
|
||||
folder_paths.set_output_directory(orig_output)
|
||||
folder_paths.set_temp_directory(orig_temp)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_within_directory() — the shared containment helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_is_within_directory_legit_child(sandbox):
|
||||
base = sandbox["input"]
|
||||
child = os.path.join(base, "sub", "image.png")
|
||||
assert folder_paths.is_within_directory(base, child) is True
|
||||
|
||||
|
||||
def test_is_within_directory_dotdot_escape(sandbox):
|
||||
base = sandbox["input"]
|
||||
escape = os.path.join(base, "..", "..", "etc", "passwd")
|
||||
assert folder_paths.is_within_directory(base, escape) is False
|
||||
|
||||
|
||||
def test_is_within_directory_symlink_escape(sandbox):
|
||||
"""A symlink created INSIDE base that points OUTSIDE base must not pass.
|
||||
|
||||
This is the key new hardening: is_within_directory realpath()s both operands,
|
||||
so a symlink planted in the base directory can't be used to read files
|
||||
elsewhere. We create a real on-disk symlink and a real secret target to
|
||||
verify the check actually resolves the link.
|
||||
"""
|
||||
base = sandbox["input"]
|
||||
|
||||
# A directory living outside the base, holding a secret file.
|
||||
outside = os.path.join(sandbox["base"], "outside_secret_dir")
|
||||
os.makedirs(outside, exist_ok=True)
|
||||
secret = os.path.join(outside, "secret.txt")
|
||||
with open(secret, "w") as f:
|
||||
f.write("top secret")
|
||||
|
||||
# Plant a symlink inside base that points at the outside directory.
|
||||
# symlink creation can require elevated privileges / Developer Mode on
|
||||
# Windows, so skip cleanly where it isn't available (same guard as the
|
||||
# sibling test in test_ghsa_779p_02_preview_traversal.py).
|
||||
link = os.path.join(base, "escape_link")
|
||||
try:
|
||||
os.symlink(outside, link)
|
||||
except (OSError, NotImplementedError):
|
||||
pytest.skip("symlinks not supported on this platform/filesystem")
|
||||
|
||||
# Accessing the secret "through" the in-base symlink must be rejected.
|
||||
target_via_link = os.path.join(link, "secret.txt")
|
||||
assert folder_paths.is_within_directory(base, target_via_link) is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_annotated_filepath()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_get_annotated_filepath_legit_name(sandbox):
|
||||
result = folder_paths.get_annotated_filepath("image.png")
|
||||
assert result == os.path.join(sandbox["input"], "image.png")
|
||||
assert folder_paths.is_within_directory(sandbox["input"], result)
|
||||
|
||||
|
||||
def test_get_annotated_filepath_input_annotation(sandbox):
|
||||
result = folder_paths.get_annotated_filepath("image.png [input]")
|
||||
assert result == os.path.join(sandbox["input"], "image.png")
|
||||
|
||||
|
||||
def test_get_annotated_filepath_output_annotation(sandbox):
|
||||
result = folder_paths.get_annotated_filepath("image.png [output]")
|
||||
assert result == os.path.join(sandbox["output"], "image.png")
|
||||
|
||||
|
||||
def test_get_annotated_filepath_temp_annotation(sandbox):
|
||||
result = folder_paths.get_annotated_filepath("image.png [temp]")
|
||||
assert result == os.path.join(sandbox["temp"], "image.png")
|
||||
|
||||
|
||||
def test_get_annotated_filepath_dotdot_raises(sandbox):
|
||||
with pytest.raises(ValueError):
|
||||
folder_paths.get_annotated_filepath("../etc/passwd")
|
||||
|
||||
|
||||
def test_get_annotated_filepath_dotdot_with_annotation_raises(sandbox):
|
||||
with pytest.raises(ValueError):
|
||||
folder_paths.get_annotated_filepath("../../etc/passwd [output]")
|
||||
|
||||
|
||||
def test_get_annotated_filepath_absolute_escape_raises(sandbox):
|
||||
with pytest.raises(ValueError):
|
||||
folder_paths.get_annotated_filepath("/etc/passwd")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# exists_annotated_filepath()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_exists_annotated_filepath_existing_legit_file(sandbox):
|
||||
real = os.path.join(sandbox["input"], "real.png")
|
||||
with open(real, "w") as f:
|
||||
f.write("data")
|
||||
assert folder_paths.exists_annotated_filepath("real.png") is True
|
||||
|
||||
|
||||
def test_exists_annotated_filepath_traversal_returns_false(sandbox):
|
||||
"""A traversal name must return False without raising and without probing
|
||||
outside the base directory (must never reach os.path.exists for the escape).
|
||||
"""
|
||||
# /etc/passwd exists on POSIX; the function must still report False because
|
||||
# the resolved path escapes the input directory.
|
||||
assert folder_paths.exists_annotated_filepath("../../../../../../etc/passwd") is False
|
||||
|
||||
|
||||
def test_exists_annotated_filepath_absolute_returns_false(sandbox):
|
||||
assert folder_paths.exists_annotated_filepath("/etc/passwd") is False
|
||||
147
tests-unit/security_test/test_ghsa_779p_04_userdata_xss.py
Normal file
147
tests-unit/security_test/test_ghsa_779p_04_userdata_xss.py
Normal file
@ -0,0 +1,147 @@
|
||||
"""
|
||||
CI unit tests for FIX #4 of GHSA-779p-m5rp-r4h4.
|
||||
|
||||
Stored-XSS hardening on GET /userdata/{file} in app/user_manager.py.
|
||||
|
||||
User data files are arbitrary user-supplied content and must never render
|
||||
inline in the app origin. The getuserdata handler:
|
||||
- forces Content-Type to application/octet-stream for any type in
|
||||
folder_paths.DANGEROUS_CONTENT_TYPES (text/html, image/svg+xml,
|
||||
text/javascript, ...),
|
||||
- sets X-Content-Type-Options: nosniff,
|
||||
- sets Content-Disposition: attachment.
|
||||
|
||||
These tests pre-create files in tmp_path and GET them back, asserting the
|
||||
secure response headers. They mirror the aiohttp_client pattern in
|
||||
tests-unit/prompt_server_test/user_manager_test.py.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import os
|
||||
from aiohttp import web
|
||||
from app.user_manager import UserManager
|
||||
|
||||
pytestmark = (
|
||||
pytest.mark.asyncio
|
||||
) # This applies the asyncio mark to all test functions in the module
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user_manager(tmp_path):
|
||||
um = UserManager()
|
||||
um.get_request_user_filepath = lambda req, file, **kwargs: os.path.join(
|
||||
tmp_path, file
|
||||
) if file else tmp_path
|
||||
return um
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(user_manager):
|
||||
app = web.Application()
|
||||
routes = web.RouteTableDef()
|
||||
user_manager.add_routes(routes)
|
||||
app.add_routes(routes)
|
||||
return app
|
||||
|
||||
|
||||
async def test_html_served_as_octet_stream(aiohttp_client, app, tmp_path):
|
||||
(tmp_path / "evil.html").write_text(
|
||||
"<script>console.log('xss-marker-ghsa-779p')</script>"
|
||||
)
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata/evil.html")
|
||||
|
||||
assert resp.status == 200
|
||||
ct = resp.headers.get("Content-Type", "")
|
||||
# The load-bearing assertion: a .html file must NOT be served as text/html.
|
||||
assert "text/html" not in ct.lower(), (
|
||||
f"Content-Type {ct!r} would let a browser render/execute the file (stored XSS)."
|
||||
)
|
||||
assert ct == "application/octet-stream"
|
||||
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
assert "attachment" in resp.headers.get("Content-Disposition", "")
|
||||
|
||||
|
||||
async def test_svg_served_as_octet_stream(aiohttp_client, app, tmp_path):
|
||||
(tmp_path / "evil.svg").write_text(
|
||||
'<?xml version="1.0"?>'
|
||||
'<svg xmlns="http://www.w3.org/2000/svg">'
|
||||
'<script>console.log("xss-marker-ghsa-779p")</script>'
|
||||
"</svg>"
|
||||
)
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata/evil.svg")
|
||||
|
||||
assert resp.status == 200
|
||||
ct = resp.headers.get("Content-Type", "")
|
||||
# SVG can carry inline <script>; it must not be served as image/svg+xml.
|
||||
assert "svg" not in ct.lower(), (
|
||||
f"Content-Type {ct!r} would let a browser render the SVG and execute embedded scripts."
|
||||
)
|
||||
assert ct == "application/octet-stream"
|
||||
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
assert "attachment" in resp.headers.get("Content-Disposition", "")
|
||||
|
||||
|
||||
async def test_js_served_as_octet_stream(aiohttp_client, app, tmp_path):
|
||||
(tmp_path / "evil.js").write_text("alert('xss-marker-ghsa-779p')")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata/evil.js")
|
||||
|
||||
assert resp.status == 200
|
||||
ct = resp.headers.get("Content-Type", "").lower()
|
||||
# Must not be served as any executable JavaScript content type.
|
||||
assert "javascript" not in ct, (
|
||||
f"Content-Type {ct!r} is an executable JS type."
|
||||
)
|
||||
assert "ecmascript" not in ct, (
|
||||
f"Content-Type {ct!r} is an executable JS type."
|
||||
)
|
||||
assert ct == "application/octet-stream"
|
||||
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
assert "attachment" in resp.headers.get("Content-Disposition", "")
|
||||
|
||||
|
||||
async def test_xml_dialect_served_as_octet_stream(aiohttp_client, app, tmp_path):
|
||||
"""An XML dialect outside the original blocklist (.xslt -> application/xslt+xml)
|
||||
must still be forced to download. This pins the normalised *+xml family rule
|
||||
in folder_paths.is_dangerous_content_type(); a plain set-membership test would
|
||||
have served this inline."""
|
||||
(tmp_path / "evil.xslt").write_text(
|
||||
'<?xml version="1.0"?>'
|
||||
'<xsl:stylesheet version="1.0" '
|
||||
'xmlns:xsl="http://www.w3.org/1999/XSL/Transform">'
|
||||
"<!-- xss-marker-ghsa-779p -->"
|
||||
"</xsl:stylesheet>"
|
||||
)
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata/evil.xslt")
|
||||
|
||||
assert resp.status == 200
|
||||
ct = resp.headers.get("Content-Type", "")
|
||||
assert ct == "application/octet-stream", (
|
||||
f"Content-Type {ct!r}: an *+xml dialect must be forced to octet-stream "
|
||||
f"(it can carry inline script via stylesheet/entity tricks)."
|
||||
)
|
||||
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
assert "attachment" in resp.headers.get("Content-Disposition", "")
|
||||
|
||||
|
||||
async def test_benign_txt_still_served(aiohttp_client, app, tmp_path):
|
||||
(tmp_path / "note.txt").write_text("just a harmless note")
|
||||
|
||||
client = await aiohttp_client(app)
|
||||
resp = await client.get("/userdata/note.txt")
|
||||
|
||||
assert resp.status == 200
|
||||
assert await resp.text() == "just a harmless note"
|
||||
ct = resp.headers.get("Content-Type", "")
|
||||
# text/plain is not in the dangerous set, so it is acceptable here. The
|
||||
# defence-in-depth headers must still be present regardless.
|
||||
assert "text/plain" in ct.lower()
|
||||
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
assert "attachment" in resp.headers.get("Content-Disposition", "")
|
||||
@ -0,0 +1,138 @@
|
||||
"""CI unit guard for FIX #5 of GHSA-779p-m5rp-r4h4 — the /view forced-download set.
|
||||
|
||||
Vuln #5 was stored XSS via SVG upload: the /view endpoint's Content-Type
|
||||
blocklist covered text/html, text/javascript, etc. but was missing
|
||||
image/svg+xml, so an uploaded SVG carrying an inline <script> was served as
|
||||
image/svg+xml and executed in the page origin when rendered.
|
||||
|
||||
The /view forced-download decision lives in the view_image closure registered by
|
||||
server.PromptServer.add_routes (server.py ~line 596), which calls
|
||||
`folder_paths.is_dangerous_content_type(content_type)` — a normalising check that
|
||||
strips charset/boundary parameters and casing and folds in the whole */xml and
|
||||
*+xml dialect family — rather than a bypassable raw
|
||||
`content_type in folder_paths.DANGEROUS_CONTENT_TYPES` membership test. On a match
|
||||
it rewrites the response to application/octet-stream with a
|
||||
Content-Disposition: attachment header. server.py cannot be imported in a unit
|
||||
test (importing it spins up the full PromptServer/aiohttp app and its global side
|
||||
effects), so these tests pin the underlying dangerous-content data
|
||||
(folder_paths.DANGEROUS_CONTENT_TYPES) and the normalising is_dangerous_content_type()
|
||||
helper that the closure actually calls.
|
||||
|
||||
The end-to-end /view assertion (upload an SVG, GET /view, confirm the response
|
||||
is not served as image/svg+xml) lives in the live POC at
|
||||
.security/pocs/test_security_ghsa_779p.py::TestViewSvgContentType, which
|
||||
requires a running server. This file is the fast, server-free CI guard on the
|
||||
set contents so the blocklist can't silently regress.
|
||||
"""
|
||||
|
||||
import folder_paths
|
||||
|
||||
|
||||
# Active/renderable content types that must be forced to download. Each of these
|
||||
# can carry an inline <script> (or otherwise execute) in the page origin if a
|
||||
# browser renders it. image/svg+xml is the original missing item that caused
|
||||
# vuln #5.
|
||||
DANGEROUS = [
|
||||
'image/svg+xml',
|
||||
'application/xml',
|
||||
'text/xml',
|
||||
'text/html',
|
||||
'text/html-sandboxed',
|
||||
'application/xhtml+xml',
|
||||
'text/javascript',
|
||||
'application/javascript',
|
||||
'application/x-javascript',
|
||||
'application/ecmascript',
|
||||
'text/css',
|
||||
]
|
||||
|
||||
# Benign image types that browsers display inline and that must keep rendering;
|
||||
# forcing these to download would break legitimate previews.
|
||||
BENIGN_INLINE_IMAGES = [
|
||||
'image/png',
|
||||
'image/jpeg',
|
||||
'image/webp',
|
||||
'image/gif',
|
||||
]
|
||||
|
||||
|
||||
def test_dangerous_content_types_is_a_set():
|
||||
assert isinstance(folder_paths.DANGEROUS_CONTENT_TYPES, set)
|
||||
|
||||
|
||||
def test_svg_is_in_the_blocklist():
|
||||
"""The specific item whose absence caused vuln #5."""
|
||||
assert 'image/svg+xml' in folder_paths.DANGEROUS_CONTENT_TYPES, (
|
||||
"image/svg+xml missing from DANGEROUS_CONTENT_TYPES — this is exactly "
|
||||
"the regression that reopens GHSA-779p-m5rp-r4h4 vuln #5 (stored XSS "
|
||||
"via SVG upload on /view)."
|
||||
)
|
||||
|
||||
|
||||
def test_all_dangerous_types_present():
|
||||
missing = [ct for ct in DANGEROUS if ct not in folder_paths.DANGEROUS_CONTENT_TYPES]
|
||||
assert not missing, (
|
||||
f"DANGEROUS_CONTENT_TYPES is missing required active/renderable types: "
|
||||
f"{missing}. The /view closure only forces a download for content types "
|
||||
f"in this set; anything missing here is served inline and can execute."
|
||||
)
|
||||
|
||||
|
||||
def test_benign_inline_image_types_absent():
|
||||
leaked = [ct for ct in BENIGN_INLINE_IMAGES if ct in folder_paths.DANGEROUS_CONTENT_TYPES]
|
||||
assert not leaked, (
|
||||
f"Benign inline-displayable image types found in DANGEROUS_CONTENT_TYPES: "
|
||||
f"{leaked}. Forcing these to download would break legitimate image "
|
||||
f"previews in /view — they must keep rendering inline."
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_dangerous_content_type() — the normalising check the /view and /userdata
|
||||
# handlers now call instead of a raw `in DANGEROUS_CONTENT_TYPES` membership
|
||||
# test. An exact-string membership test was bypassable with a charset parameter
|
||||
# or odd casing, and missed the wider XML dialect family; these tests pin the
|
||||
# normalisation so that bypass can't reopen.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_function_matches_plain_dangerous_types():
|
||||
for ct in DANGEROUS:
|
||||
assert folder_paths.is_dangerous_content_type(ct) is True, ct
|
||||
|
||||
|
||||
def test_function_strips_parameters_and_casing():
|
||||
"""A charset/boundary parameter or casing must not slip a type past the check.
|
||||
|
||||
This is the bypass surfaced by review: the /view blake3 branch can serve an
|
||||
attacker-controlled, unvalidated asset mime_type like 'text/html; charset=utf-8',
|
||||
which an exact-string set test missed.
|
||||
"""
|
||||
for ct in (
|
||||
'text/html; charset=utf-8',
|
||||
'TEXT/HTML',
|
||||
'Text/HTML; charset=UTF-8',
|
||||
'image/svg+xml; charset=utf-8',
|
||||
' text/html ',
|
||||
):
|
||||
assert folder_paths.is_dangerous_content_type(ct) is True, ct
|
||||
|
||||
|
||||
def test_function_covers_xml_dialect_family():
|
||||
"""Any *+xml / */xml dialect is dangerous without enumerating each one."""
|
||||
for ct in (
|
||||
'application/xslt+xml',
|
||||
'application/rss+xml',
|
||||
'application/atom+xml',
|
||||
'application/rdf+xml',
|
||||
'application/mathml+xml',
|
||||
'message/rfc822',
|
||||
):
|
||||
assert folder_paths.is_dangerous_content_type(ct) is True, ct
|
||||
|
||||
|
||||
def test_function_allows_benign_and_empty():
|
||||
for ct in BENIGN_INLINE_IMAGES + ['application/octet-stream', 'text/plain']:
|
||||
assert folder_paths.is_dangerous_content_type(ct) is False, ct
|
||||
# None / empty (mimetypes.guess_type miss) must not be treated as dangerous.
|
||||
assert folder_paths.is_dangerous_content_type(None) is False
|
||||
assert folder_paths.is_dangerous_content_type('') is False
|
||||
91
utils/origin_check.py
Normal file
91
utils/origin_check.py
Normal file
@ -0,0 +1,91 @@
|
||||
"""Host/Origin CSRF check for the loopback dev server.
|
||||
|
||||
Extracted verbatim from ``server.create_origin_only_middleware`` so the decision
|
||||
logic is importable and unit-testable without standing up the full
|
||||
PromptServer/aiohttp app (importing ``server`` pulls in ``nodes``/``execution``/
|
||||
torch and has global side effects). The wiring lives in ``server.py``; the
|
||||
regression guard for GHSA-779p-m5rp-r4h4 finding #1 (CSRF bypass via
|
||||
``Origin: null``) lives in
|
||||
``tests-unit/security_test/test_ghsa_779p_01_origin_csrf.py``.
|
||||
|
||||
Only ``urllib.parse``/``ipaddress``/``socket`` (stdlib) are imported here, so the
|
||||
module stays cheap to import from a unit test.
|
||||
"""
|
||||
|
||||
import ipaddress
|
||||
import socket
|
||||
import urllib.parse
|
||||
|
||||
|
||||
def is_loopback(host):
|
||||
if host is None:
|
||||
return False
|
||||
try:
|
||||
if ipaddress.ip_address(host).is_loopback:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except ValueError:
|
||||
# Not an IP literal (ip_address raises ValueError); fall through to DNS
|
||||
# resolution below. Narrowed from a bare except so genuine interrupts
|
||||
# (KeyboardInterrupt/SystemExit) aren't swallowed.
|
||||
pass
|
||||
|
||||
loopback = False
|
||||
for family in (socket.AF_INET, socket.AF_INET6):
|
||||
try:
|
||||
r = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM)
|
||||
for family, _, _, _, sockaddr in r:
|
||||
if not ipaddress.ip_address(sockaddr[0]).is_loopback:
|
||||
return loopback
|
||||
else:
|
||||
loopback = True
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
return loopback
|
||||
|
||||
|
||||
def is_cross_origin_forbidden(host, origin):
|
||||
"""Return True if a request with these ``Host``/``Origin`` headers must be rejected (403).
|
||||
|
||||
This prevents the case where a random website can queue Comfy workflows by
|
||||
making a POST to 127.0.0.1, which browsers don't prevent. In that case the
|
||||
Host and Origin hostnames won't match. The check is intentionally limited to
|
||||
when the Host resolves to a loopback address; for non-loopback hosts it
|
||||
returns False (it is a localhost-CSRF mitigation, not a general same-origin
|
||||
enforcer).
|
||||
|
||||
GHSA-779p-m5rp-r4h4 #1 fix: an opaque origin (e.g. ``"null"`` sent by a
|
||||
sandboxed iframe or a ``data:``/``file:`` document) parses to an empty/None
|
||||
host. Previously such requests skipped the comparison entirely, which let an
|
||||
attacker bypass the host/origin CSRF check with ``Origin: null``. A missing
|
||||
or empty origin host is now treated as a mismatch and rejected.
|
||||
"""
|
||||
host_domain = host.lower()
|
||||
parsed = urllib.parse.urlparse(origin)
|
||||
origin_domain = parsed.netloc.lower()
|
||||
host_domain_parsed = urllib.parse.urlsplit('//' + host_domain)
|
||||
|
||||
# A non-numeric or out-of-range port (e.g. Origin: http://127.0.0.1:99999)
|
||||
# makes urllib raise ValueError on .port access. Treat a malformed port as a
|
||||
# rejected request rather than letting it surface as an uncaught 500 in the
|
||||
# middleware — it fails closed, consistent with the CSRF stance.
|
||||
try:
|
||||
origin_port = parsed.port
|
||||
host_port = host_domain_parsed.port
|
||||
except ValueError:
|
||||
return True
|
||||
|
||||
loopback = is_loopback(host_domain_parsed.hostname)
|
||||
|
||||
if origin_port is None: # if origin doesn't have a port strip it from the host to handle weird browsers, same for host
|
||||
host_domain = host_domain_parsed.hostname
|
||||
if host_port is None:
|
||||
origin_domain = parsed.hostname
|
||||
|
||||
if loopback and host_domain is not None and len(host_domain) > 0:
|
||||
if origin_domain is None or len(origin_domain) == 0 or host_domain != origin_domain:
|
||||
return True
|
||||
|
||||
return False
|
||||
Reference in New Issue
Block a user