Compare commits

..

3 Commits

Author SHA1 Message Date
5611d1c9b6 security: address CodeRabbit review feedback on GHSA-779p tests
- test #3: guard the symlink-escape test with a try/except skip so it no
  longer errors on Windows CI where os.symlink needs elevated privileges /
  Developer Mode (mirrors the guard in the sibling test #2).
- test #5: refresh the stale module docstring to describe the actual /view
  gating (view_image closure calling folder_paths.is_dangerous_content_type,
  the normalising check) instead of the bypassable raw set-membership test.
2026-07-02 20:13:36 -07:00
e4eb7f2698 security: address review feedback on GHSA-779p fixes
- Fix Windows CI failure in test_get_annotated_filepath: compare against
  os.path.abspath(...) to match the intentional abspath normalization added
  by the traversal hardening (abspath prepends the drive letter on Windows).
- origin_check: narrow the bare `except:` in is_loopback() to ValueError so
  genuine interrupts aren't swallowed (review nit).
- origin_check: guard .port access in is_cross_origin_forbidden() so a
  malformed/out-of-range port (e.g. Origin: http://127.0.0.1:99999) fails
  closed with a 403 instead of surfacing an uncaught 500 in the middleware.
- server /view: escape backslash/quote in the Content-Disposition filename
  (RFC 6266 quoted-string) so a filename containing a double quote can't
  malform the response header.
2026-07-02 19:58:06 -07:00
ae4fcaaf41 security: fix five vulnerabilities (GHSA-779p-m5rp-r4h4)
- CVE-2026-56670: force download of SVG/XML responses on /view to prevent stored XSS
- CVE-2026-56671: contain /experiment/models/preview reads within the model folder
- CVE-2026-56672: stop inline rendering of uploaded /userdata/{file} content
- CVE-2026-56673: prevent path traversal in get_annotated_filepath (LoadImage /prompt input)
- CVE-2026-56674: reject opaque/null Origin to close the CSRF middleware bypass

Adds regression tests under tests-unit/security_test/ covering all five.
2026-07-02 19:10:30 -07:00
14 changed files with 1028 additions and 62 deletions

View File

@ -306,12 +306,15 @@ async def download_asset_content(request: web.Request) -> web.Response:
404, "FILE_NOT_FOUND", "Underlying file not found on disk."
)
_DANGEROUS_MIME_TYPES = {
"text/html", "text/html-sandboxed", "application/xhtml+xml",
"text/javascript", "text/css",
}
if content_type in _DANGEROUS_MIME_TYPES:
# User-controlled asset content must never render inline in the app origin
# (stored XSS via SVG/HTML/XML). Force dangerous types to download and
# override any requested inline disposition. Centralised through
# folder_paths.is_dangerous_content_type so this can't drift from /view and
# /userdata (the previous inline set here omitted image/svg+xml and missed
# the charset/casing/+xml-dialect bypasses).
if folder_paths.is_dangerous_content_type(content_type):
content_type = "application/octet-stream"
disposition = "attachment"
safe_name = (filename or "").replace("\r", "").replace("\n", "")
encoded = urllib.parse.quote(safe_name)

View File

@ -50,21 +50,45 @@ class ModelFileManager:
@routes.get("/experiment/models/preview/{folder}/{path_index}/{filename:.*}")
async def get_model_preview(request):
folder_name = request.match_info.get("folder", None)
path_index = int(request.match_info.get("path_index", None))
filename = request.match_info.get("filename", None)
if folder_name not in folder_paths.folder_names_and_paths:
return web.Response(status=404)
# The "{filename:.*}" capture also matches the empty string, which
# would resolve to the folder itself; reject it explicitly.
if not filename:
return web.Response(status=400)
try:
path_index = int(request.match_info.get("path_index", None))
except (TypeError, ValueError):
return web.Response(status=400)
folders = folder_paths.folder_names_and_paths[folder_name]
if path_index < 0 or path_index >= len(folders[0]):
return web.Response(status=404)
folder = folders[0][path_index]
full_filename = os.path.join(folder, filename)
full_filename = os.path.normpath(os.path.join(folder, filename))
# Prevent path traversal: the requested file must stay within the
# configured model folder. `filename` is an unrestricted ".*" capture,
# so values like "../../../../etc/passwd" would otherwise escape it.
if not folder_paths.is_within_directory(folder, full_filename):
return web.Response(status=403)
previews = self.get_model_previews(full_filename)
default_preview = previews[0] if len(previews) > 0 else None
if default_preview is None or (isinstance(default_preview, str) and not os.path.isfile(default_preview)):
return web.Response(status=404)
# The preview is selected by a glob inside get_model_previews, so a
# companion file (e.g. "model.preview.png") could itself be a symlink
# resolving outside the model folder. Re-validate the file actually
# opened: is_within_directory realpaths it, catching symlink escape.
if isinstance(default_preview, str) and not folder_paths.is_within_directory(folder, default_preview):
return web.Response(status=403)
try:
with Image.open(default_preview) as img:
img_bytes = BytesIO()

View File

@ -6,6 +6,7 @@ import glob
import shutil
import logging
import tempfile
import mimetypes
from aiohttp import web
from urllib import parse
from comfy.cli_args import args
@ -336,7 +337,20 @@ class UserManager():
if not isinstance(path, str):
return path
return web.FileResponse(path)
# User data files are arbitrary user-supplied content and are never
# meant to render inline. Disable MIME sniffing and force a download
# so uploaded markup/scripts can't execute in the app origin (stored
# XSS). Content-Disposition: attachment is the load-bearing guard;
# the content-type override and nosniff are defence in depth.
content_type = mimetypes.guess_type(path)[0] or 'application/octet-stream'
if folder_paths.is_dangerous_content_type(content_type):
content_type = 'application/octet-stream'
return web.FileResponse(path, headers={
"Content-Type": content_type,
"X-Content-Type-Options": "nosniff",
"Content-Disposition": "attachment",
})
@routes.post("/userdata/{file}")
async def post_userdata(request):

View File

@ -264,6 +264,59 @@ def annotated_filepath(name: str) -> tuple[str, str | None]:
return name, base_dir
# Content types a browser may execute or render inline. File endpoints that
# serve user-controlled content must force these to download (and ideally set
# Content-Disposition: attachment) to avoid stored XSS. Centralised here so the
# /view and /userdata handlers can't drift apart. mimetypes.guess_type may
# return either the text/* or application/* spelling depending on platform, so
# both are listed.
DANGEROUS_CONTENT_TYPES = {
'text/html', 'text/html-sandboxed', 'application/xhtml+xml',
'text/javascript', 'application/javascript', 'application/x-javascript',
'application/ecmascript', 'text/css',
'image/svg+xml', 'application/xml', 'text/xml',
# message/rfc822 (.mht/.mhtml) can carry script in some browsers.
'message/rfc822',
}
def is_dangerous_content_type(content_type: str | None) -> bool:
"""Return True if a browser may execute or render `content_type` inline.
Normalises before matching so the check can't be slipped past with a
charset/boundary parameter (``text/html; charset=utf-8``) or casing
(``TEXT/HTML``). Any XML dialect (``*+xml`` or ``*/xml``) is treated as
dangerous because XML can carry inline script via stylesheet/entity tricks,
which also covers the ``application/{xslt,rss,atom,rdf}+xml`` family without
enumerating each one. Endpoints serving user-controlled content should route
a dangerous type to ``application/octet-stream`` + ``Content-Disposition:
attachment`` + ``X-Content-Type-Options: nosniff``.
"""
if not content_type:
return False
normalized = content_type.split(';', 1)[0].strip().lower()
if normalized in DANGEROUS_CONTENT_TYPES:
return True
return normalized.endswith('+xml') or normalized.endswith('/xml')
def is_within_directory(directory: str, target: str) -> bool:
"""Return True if `target` resolves to a path inside `directory`.
Uses realpath on both operands so that a symlink placed inside `directory`
that points elsewhere cannot escape the containment check at open time.
"""
try:
directory = os.path.realpath(directory)
target = os.path.realpath(target)
return os.path.commonpath((directory, target)) == directory
except ValueError:
# ValueError is raised by realpath() on a path with an embedded null
# byte, and by commonpath() on Windows when the paths are on different
# drives. In either case the target is not safely within the directory.
return False
def get_annotated_filepath(name: str, default_dir: str | None=None) -> str:
name, base_dir = annotated_filepath(name)
@ -273,7 +326,12 @@ def get_annotated_filepath(name: str, default_dir: str | None=None) -> str:
else:
base_dir = get_input_directory() # fallback path
return os.path.join(base_dir, name)
filepath = os.path.abspath(os.path.join(base_dir, name))
# Prevent path traversal: the resolved path must stay within base_dir.
# repr() the name in the message so a crafted value can't inject log lines.
if not is_within_directory(base_dir, filepath):
raise ValueError("Invalid file path: {!r}".format(name))
return filepath
def exists_annotated_filepath(name) -> bool:
@ -282,7 +340,10 @@ def exists_annotated_filepath(name) -> bool:
if base_dir is None:
base_dir = get_input_directory() # fallback path
filepath = os.path.join(base_dir, name)
filepath = os.path.abspath(os.path.join(base_dir, name))
# Treat traversal attempts as non-existent rather than probing the filesystem.
if not is_within_directory(base_dir, filepath):
return False
return os.path.exists(filepath)

View File

@ -23,8 +23,6 @@ import json
import glob
import struct
import ssl
import socket
import ipaddress
from PIL import Image, ImageOps
from PIL.PngImagePlugin import PngInfo
from io import BytesIO
@ -40,6 +38,7 @@ import comfy.utils
import comfy.model_management
from comfy_api import feature_flags
import node_helpers
from utils.origin_check import is_cross_origin_forbidden
from comfyui_version import __version__
from app.frontend_management import FrontendManager, parse_version
from comfy_api.internal import _ComfyNodeInternal
@ -127,32 +126,6 @@ def create_cors_middleware(allowed_origin: str):
return cors_middleware
def is_loopback(host):
if host is None:
return False
try:
if ipaddress.ip_address(host).is_loopback:
return True
else:
return False
except:
pass
loopback = False
for family in (socket.AF_INET, socket.AF_INET6):
try:
r = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM)
for family, _, _, _, sockaddr in r:
if not ipaddress.ip_address(sockaddr[0]).is_loopback:
return loopback
else:
loopback = True
except socket.gaierror:
pass
return loopback
def create_origin_only_middleware():
@web.middleware
async def origin_only_middleware(request: web.Request, handler):
@ -166,23 +139,13 @@ def create_origin_only_middleware():
if 'Host' in request.headers and 'Origin' in request.headers:
host = request.headers['Host']
origin = request.headers['Origin']
host_domain = host.lower()
parsed = urllib.parse.urlparse(origin)
origin_domain = parsed.netloc.lower()
host_domain_parsed = urllib.parse.urlsplit('//' + host_domain)
#limit the check to when the host domain is localhost, this makes it slightly less safe but should still prevent the exploit
loopback = is_loopback(host_domain_parsed.hostname)
if parsed.port is None: #if origin doesn't have a port strip it from the host to handle weird browsers, same for host
host_domain = host_domain_parsed.hostname
if host_domain_parsed.port is None:
origin_domain = parsed.hostname
if loopback and host_domain is not None and origin_domain is not None and len(host_domain) > 0 and len(origin_domain) > 0:
if host_domain != origin_domain:
logging.warning("WARNING: request with non matching host and origin {} != {}, returning 403".format(host_domain, origin_domain))
return web.Response(status=403)
# The host/origin CSRF decision (incl. the Origin: null bypass fix
# for GHSA-779p-m5rp-r4h4 #1) lives in utils.origin_check so it can
# be unit-tested without standing up the full server. See
# tests-unit/security_test/test_ghsa_779p_01_origin_csrf.py.
if is_cross_origin_forbidden(host, origin):
logging.warning("WARNING: request with non matching host and origin, returning 403 (host={!r} origin={!r})".format(host, origin))
return web.Response(status=403)
if request.method == "OPTIONS":
response = web.Response()
@ -616,15 +579,30 @@ class PromptServer():
or 'application/octet-stream'
)
# For security, force certain mimetypes to download instead of display
if content_type in {'text/html', 'text/html-sandboxed', 'application/xhtml+xml', 'text/javascript', 'text/css'}:
content_type = 'application/octet-stream' # Forces download
# For security, force renderable/active types (HTML, JS,
# CSS, SVG, XML — anything that can carry inline <script>
# and execute in the page origin) to download instead of
# displaying inline, preventing stored XSS. The
# attachment disposition is the load-bearing guard: a
# bare filename= hint does not force a download per
# RFC 6266, so we only attach it on the dangerous branch
# to avoid breaking inline display of legitimate images.
# Escape backslash/quote per RFC 6266 quoted-string so a
# filename containing a double quote (which passes the
# ".."/leading-slash filter above) can't break out of the
# header's quoted-string and malform the disposition.
safe_filename = filename.replace("\\", "\\\\").replace('"', '\\"')
disposition = f"filename=\"{safe_filename}\""
if folder_paths.is_dangerous_content_type(content_type):
content_type = 'application/octet-stream'
disposition = f"attachment; filename=\"{safe_filename}\""
return web.FileResponse(
file,
headers={
"Content-Disposition": f"filename=\"{filename}\"",
"Content-Type": content_type
"Content-Disposition": disposition,
"Content-Type": content_type,
"X-Content-Type-Options": "nosniff"
}
)

View File

@ -1,3 +1,5 @@
import contextlib
import json
import time
import uuid
from datetime import datetime
@ -9,6 +11,40 @@ import requests
from helpers import get_asset_filename, trigger_sync_seed_assets
def test_download_svg_forced_to_attachment(http: requests.Session, api_base: str):
"""GHSA-779p-m5rp-r4h4 CISA-5 (sibling route): an uploaded SVG must never be
served inline from GET /api/assets/{id}/content, or an inline <script> runs
in the app origin (stored XSS). Even with disposition=inline requested, a
dangerous content type must be forced to application/octet-stream +
Content-Disposition: attachment + nosniff. Regression guard for the stale
inline blocklist that previously omitted image/svg+xml and ignored the
centralized folder_paths.is_dangerous_content_type check.
"""
svg = b'<svg xmlns="http://www.w3.org/2000/svg"><script>alert(1)</script></svg>'
files = {"file": ("evil.svg", svg, "image/svg+xml")}
form_data = {
"tags": json.dumps(["models", "checkpoints", "unit-tests", "svgxss"]),
"name": "evil.svg",
}
up = http.post(api_base + "/api/assets", files=files, data=form_data, timeout=120)
body = up.json()
assert up.status_code in (200, 201), body
aid = body["id"]
try:
r = http.get(f"{api_base}/api/assets/{aid}/content?disposition=inline", timeout=120)
r.content
assert r.status_code == 200
ct = r.headers.get("Content-Type", "").lower()
cd = r.headers.get("Content-Disposition", "").lower()
assert "svg" not in ct, f"SVG served with a renderable content type: {ct!r}"
assert ct.startswith("application/octet-stream"), f"expected octet-stream, got {ct!r}"
assert "attachment" in cd, f"inline disposition not overridden to attachment: {cd!r}"
assert r.headers.get("X-Content-Type-Options", "").lower() == "nosniff"
finally:
with contextlib.suppress(Exception):
http.delete(f"{api_base}/api/assets/{aid}", timeout=30)
def test_download_attachment_and_inline(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]

View File

@ -53,8 +53,11 @@ def test_annotated_filepath():
def test_get_annotated_filepath():
default_dir = "/default/dir"
assert folder_paths.get_annotated_filepath("test.txt", default_dir) == os.path.join(default_dir, "test.txt")
assert folder_paths.get_annotated_filepath("test.txt [output]") == os.path.join(folder_paths.get_output_directory(), "test.txt")
# get_annotated_filepath now normalizes with os.path.abspath (part of the
# GHSA-779p traversal hardening), so compare against the normalized form —
# on Windows abspath also prepends the current drive letter.
assert folder_paths.get_annotated_filepath("test.txt", default_dir) == os.path.abspath(os.path.join(default_dir, "test.txt"))
assert folder_paths.get_annotated_filepath("test.txt [output]") == os.path.abspath(os.path.join(folder_paths.get_output_directory(), "test.txt"))
def test_add_model_folder_path_append(clear_folder_paths):
folder_paths.add_model_folder_path("test_folder", "/default/path", is_default=True)

View File

View File

@ -0,0 +1,114 @@
"""CI unit guard for FIX #1 of GHSA-779p-m5rp-r4h4 — the Origin: null CSRF bypass.
Vuln #1 was a CSRF bypass: the loopback host/origin guard in
``server.create_origin_only_middleware`` skipped the comparison entirely whenever
the origin host parsed to an empty string. An opaque ``Origin: null`` (sent by a
sandboxed iframe or a ``data:``/``file:`` document) parses to exactly that, so an
attacker could forge cross-origin state-mutating requests against a victim's
loopback ComfyUI — the entry primitive for the [CISA-1] CSRF -> stored-XSS ->
client-side RCE chain.
The decision logic was extracted verbatim from the middleware closure into
``utils.origin_check`` precisely so it can be exercised here: ``server.py`` cannot
be imported in a unit test (importing it spins up the full PromptServer/aiohttp
app and its global side effects), which is why finding #1 previously had no
server-free CI coverage and only a live-server POC
(``.security/pocs/test_security_ghsa_779p.py::TestOriginNullCsrf``, skipped
unless a server is running on 127.0.0.1:8188). This file is the fast, hermetic
guard so the Origin: null bypass cannot silently reopen.
Cases use IP literals so the loopback determination does not depend on DNS; the
one name-based case ("localhost") relies only on standard loopback resolution.
"""
from utils.origin_check import is_cross_origin_forbidden, is_loopback
# ---------------------------------------------------------------------------
# The regression: an opaque/empty Origin against a loopback Host MUST be a 403.
# Each of these returned False (allowed) before the fix — that is the bug.
# ---------------------------------------------------------------------------
OPAQUE_ORIGIN_ON_LOOPBACK = [
("127.0.0.1:8188", "null"), # the exact reported bypass
("127.0.0.1:8188", ""), # empty Origin header, same empty-host path
("127.0.0.1", "null"), # host without an explicit port
("[::1]:8188", "null"), # IPv6 loopback host
]
def test_origin_null_is_forbidden():
"""Origin: null against a loopback host must be rejected (the #1 fix)."""
assert is_cross_origin_forbidden("127.0.0.1:8188", "null") is True, (
"Origin: null was treated as allowed — this is exactly the "
"GHSA-779p-m5rp-r4h4 #1 CSRF bypass reopening."
)
def test_all_opaque_origins_on_loopback_forbidden():
for host, origin in OPAQUE_ORIGIN_ON_LOOPBACK:
assert is_cross_origin_forbidden(host, origin) is True, (host, origin)
# ---------------------------------------------------------------------------
# False-positive guards: legitimate same-origin requests must stay allowed, or
# the fix would break the dev server. The port-stripping cases preserve the
# original "handle weird browsers" behaviour (origin without a port matches a
# host with one, and vice versa).
# ---------------------------------------------------------------------------
MATCHING_ORIGINS = [
("127.0.0.1:8188", "http://127.0.0.1:8188"),
("127.0.0.1:8188", "http://127.0.0.1"), # origin has no port -> host port stripped for compare
("127.0.0.1", "http://127.0.0.1"),
("localhost:8188", "http://localhost:8188"),
]
def test_matching_origins_allowed():
for host, origin in MATCHING_ORIGINS:
assert is_cross_origin_forbidden(host, origin) is False, (host, origin)
# ---------------------------------------------------------------------------
# Genuine cross-origin requests against a loopback host must be forbidden.
# ---------------------------------------------------------------------------
MISMATCHED_ORIGINS_ON_LOOPBACK = [
("127.0.0.1:8188", "http://evil.com"),
("127.0.0.1:8188", "https://127.0.0.1:9999"), # same host, different port
("127.0.0.1:8188", "http://localhost.evil.com"),
]
def test_mismatched_origins_on_loopback_forbidden():
for host, origin in MISMATCHED_ORIGINS_ON_LOOPBACK:
assert is_cross_origin_forbidden(host, origin) is True, (host, origin)
# ---------------------------------------------------------------------------
# Scope guard: the check is deliberately limited to loopback hosts. A
# non-loopback Host must NOT trip the guard (even on a mismatch / opaque
# origin) — this preserves the original behaviour and documents that the
# mitigation is localhost-only by design.
# ---------------------------------------------------------------------------
NON_LOOPBACK_HOSTS = [
("203.0.113.5:8188", "http://evil.com"), # public IP literal -> not loopback
("203.0.113.5:8188", "null"),
("10.0.0.5:8188", "null"), # private but not loopback
]
def test_non_loopback_host_not_subject_to_check():
for host, origin in NON_LOOPBACK_HOSTS:
assert is_cross_origin_forbidden(host, origin) is False, (host, origin)
# ---------------------------------------------------------------------------
# is_loopback — the predicate the scoping above depends on.
# ---------------------------------------------------------------------------
def test_is_loopback_true_for_loopback_addresses():
for host in ("127.0.0.1", "::1", "localhost"):
assert is_loopback(host) is True, host
def test_is_loopback_false_for_non_loopback_and_none():
for host in ("203.0.113.5", "10.0.0.5", None):
assert is_loopback(host) is False, host

View File

@ -0,0 +1,192 @@
"""CI unit tests for FIX #2 of GHSA-779p-m5rp-r4h4.
Path traversal / hardening in app/model_manager.py get_model_preview
(route /experiment/models/preview/{folder}/{path_index}/{filename:.*}).
Reference: https://github.com/Comfy-Org/ComfyUI/security/advisories/GHSA-779p-m5rp-r4h4
"""
import pytest
import yarl
from io import BytesIO
from PIL import Image
from aiohttp import web
from unittest.mock import patch
from app.model_manager import ModelFileManager
pytestmark = (
pytest.mark.asyncio
) # This applies the asyncio mark to all test functions in the module
@pytest.fixture
def model_manager():
return ModelFileManager()
@pytest.fixture
def app(model_manager):
app = web.Application()
routes = web.RouteTableDef()
model_manager.add_routes(routes)
app.add_routes(routes)
return app
async def test_legit_preview_returns_200(aiohttp_client, app, tmp_path):
"""Sanity: a real preview PNG inside the model folder is served as webp 200."""
img = Image.new('RGB', (16, 16), color=(255, 0, 128))
img.save(tmp_path / "test_model.png", format='PNG')
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(tmp_path)], None)
}):
client = await aiohttp_client(app)
response = await client.get('/experiment/models/preview/test_folder/0/test_model.png')
assert response.status == 200
assert response.content_type == 'image/webp'
img_bytes = BytesIO(await response.read())
served = Image.open(img_bytes)
assert served.format
assert served.format.lower() == 'webp'
served.close()
async def test_non_integer_path_index_returns_400(aiohttp_client, app, tmp_path):
"""A non-integer path_index segment must be rejected with 400."""
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(tmp_path)], None)
}):
client = await aiohttp_client(app)
response = await client.get('/experiment/models/preview/test_folder/abc/test_model.png')
assert response.status == 400
async def test_out_of_range_path_index_returns_404(aiohttp_client, app, tmp_path):
"""A path_index beyond the configured folder list must return 404."""
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(tmp_path)], None)
}):
client = await aiohttp_client(app)
response = await client.get('/experiment/models/preview/test_folder/99/test_model.png')
assert response.status == 404
async def test_empty_filename_returns_400(aiohttp_client, app, tmp_path):
"""The "{filename:.*}" capture also matches the empty string (trailing
slash). It would resolve to the folder itself and must be rejected with 400."""
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(tmp_path)], None)
}):
client = await aiohttp_client(app)
response = await client.get('/experiment/models/preview/test_folder/0/')
assert response.status == 400
async def test_path_traversal_in_filename_returns_403(aiohttp_client, app, tmp_path):
"""Path traversal in {filename} must be rejected with 403 and must NOT read
a file outside the configured model directory.
GOTCHA: aiohttp/yarl collapses literal ``../`` dot-segments out of the URL
path before it reaches the handler, which would make this test vacuously
pass (the request would hit a different/non-existent route). We percent-encode
the dots and slashes (``%2e%2e%2f``) and send the URL with
``yarl.URL(..., encoded=True)`` so the bytes survive client-side normalization
untouched; aiohttp's router then percent-decodes them into ``match_info``,
delivering the literal ``../`` traversal to the handler's ``{filename:.*}``
capture.
Without the fix the handler computes
``os.path.normpath(os.path.join(folder, "../../../../etc/hosts"))``, which
escapes ``tmp_path`` and would be passed straight to get_model_previews ->
Image.open, serving bytes from outside the model dir (200/served bytes). The
is_within_directory() containment check is the load-bearing fix that turns
that escape into a 403.
"""
# Sanity-anchor: a legit preview exists inside tmp_path, so a 200 path is
# genuinely reachable — proving the 403 below is the containment check
# firing, not an unrelated 404.
img = Image.new('RGB', (16, 16), color=(255, 0, 128))
img.save(tmp_path / "test_model.png", format='PNG')
# Percent-encoded "../../../../etc/hosts" so yarl does not collapse the
# dot-segments before the request leaves the client.
encoded_traversal = '%2e%2e%2f' * 4 + 'etc%2fhosts'
raw_path = '/experiment/models/preview/test_folder/0/' + encoded_traversal
url = yarl.URL(raw_path, encoded=True)
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(tmp_path)], None)
}):
client = await aiohttp_client(app)
response = await client.get(url)
# Confirm the traversal actually reached the handler intact: a 200 here
# would mean either normalization stripped the ``../`` (vacuous pass) or
# the containment check failed open and served outside-dir bytes.
assert response.status == 403, (
f"expected 403 from is_within_directory() containment check, "
f"got {response.status}; traversal may have been normalized away "
f"or the fix failed open"
)
body = await response.read()
assert body == b"", "403 response must not carry any file bytes"
async def test_symlink_companion_preview_returns_403(aiohttp_client, app, tmp_path):
"""A companion preview file is selected by a glob inside get_model_previews
and then opened. If that companion is a symlink whose path is in-dir but
whose target escapes the model folder, it must be rejected with 403 — not
served. The requested path itself stays in-dir (so the first containment
check passes); the load-bearing fix is the SECOND is_within_directory check
on the file actually opened.
"""
model_dir = tmp_path / "models"
model_dir.mkdir()
secret_dir = tmp_path / "secret"
secret_dir.mkdir()
# A real image OUTSIDE the model dir — valid, so without the fix Image.open
# would succeed and its bytes would be served (200).
secret = secret_dir / "secret.png"
Image.new('RGB', (8, 8), color=(0, 0, 0)).save(secret, format='PNG')
# Companion preview, in-dir by name but a symlink escaping the model dir.
# (No real model file is needed — get_model_previews globs companions by
# basename, and omitting a .safetensors avoids the metadata-header read.)
companion = model_dir / "model.preview.png"
try:
companion.symlink_to(secret)
except (OSError, NotImplementedError):
pytest.skip("symlinks not supported on this platform/filesystem")
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(model_dir)], None)
}):
client = await aiohttp_client(app)
response = await client.get('/experiment/models/preview/test_folder/0/model.safetensors')
assert response.status == 403, (
f"expected 403 — the globbed companion preview is a symlink resolving "
f"outside the model dir and must not be served; got {response.status}"
)
assert await response.read() == b""
async def test_null_byte_in_filename_no_500(aiohttp_client, app, tmp_path):
"""A NUL byte in the filename must yield a clean client rejection, not a 500
from an uncaught ValueError in is_within_directory's realpath() call."""
raw_path = '/experiment/models/preview/test_folder/0/' + 'a%00b'
url = yarl.URL(raw_path, encoded=True)
with patch('folder_paths.folder_names_and_paths', {
'test_folder': ([str(tmp_path)], None)
}):
client = await aiohttp_client(app)
response = await client.get(url)
assert response.status != 500, (
f"NUL byte produced a 500 (uncaught ValueError); expected a clean "
f"4xx rejection, got {response.status}"
)
assert 400 <= response.status < 500

View File

@ -0,0 +1,165 @@
"""Security tests for GHSA-779p-m5rp-r4h4 — FIX #3.
Path traversal in folder_paths.get_annotated_filepath / exists_annotated_filepath,
plus the shared is_within_directory() containment helper.
These are pure-function tests (no running server). The input/output/temp
directories are pointed at tmp_path via the folder_paths setters, so a crafted
name containing `../`, an absolute path, or a symlink that escapes the base
directory must be rejected.
Reference: https://github.com/Comfy-Org/ComfyUI/security/advisories/GHSA-779p-m5rp-r4h4
"""
import os
import pytest
import folder_paths
from comfy.options import enable_args_parsing
enable_args_parsing()
@pytest.fixture
def sandbox(tmp_path):
"""Point folder_paths' input/output/temp dirs at a real temp sandbox.
Yields the realpath'd base, input, output and temp directories. The original
directory values are restored afterward so tests stay isolated.
"""
base = os.path.realpath(str(tmp_path))
input_dir = os.path.join(base, "input")
output_dir = os.path.join(base, "output")
temp_dir = os.path.join(base, "temp")
for d in (input_dir, output_dir, temp_dir):
os.makedirs(d, exist_ok=True)
orig_input = folder_paths.get_input_directory()
orig_output = folder_paths.get_output_directory()
orig_temp = folder_paths.get_temp_directory()
folder_paths.set_input_directory(input_dir)
folder_paths.set_output_directory(output_dir)
folder_paths.set_temp_directory(temp_dir)
yield {
"base": base,
"input": input_dir,
"output": output_dir,
"temp": temp_dir,
}
folder_paths.set_input_directory(orig_input)
folder_paths.set_output_directory(orig_output)
folder_paths.set_temp_directory(orig_temp)
# ---------------------------------------------------------------------------
# is_within_directory() — the shared containment helper
# ---------------------------------------------------------------------------
def test_is_within_directory_legit_child(sandbox):
base = sandbox["input"]
child = os.path.join(base, "sub", "image.png")
assert folder_paths.is_within_directory(base, child) is True
def test_is_within_directory_dotdot_escape(sandbox):
base = sandbox["input"]
escape = os.path.join(base, "..", "..", "etc", "passwd")
assert folder_paths.is_within_directory(base, escape) is False
def test_is_within_directory_symlink_escape(sandbox):
"""A symlink created INSIDE base that points OUTSIDE base must not pass.
This is the key new hardening: is_within_directory realpath()s both operands,
so a symlink planted in the base directory can't be used to read files
elsewhere. We create a real on-disk symlink and a real secret target to
verify the check actually resolves the link.
"""
base = sandbox["input"]
# A directory living outside the base, holding a secret file.
outside = os.path.join(sandbox["base"], "outside_secret_dir")
os.makedirs(outside, exist_ok=True)
secret = os.path.join(outside, "secret.txt")
with open(secret, "w") as f:
f.write("top secret")
# Plant a symlink inside base that points at the outside directory.
# symlink creation can require elevated privileges / Developer Mode on
# Windows, so skip cleanly where it isn't available (same guard as the
# sibling test in test_ghsa_779p_02_preview_traversal.py).
link = os.path.join(base, "escape_link")
try:
os.symlink(outside, link)
except (OSError, NotImplementedError):
pytest.skip("symlinks not supported on this platform/filesystem")
# Accessing the secret "through" the in-base symlink must be rejected.
target_via_link = os.path.join(link, "secret.txt")
assert folder_paths.is_within_directory(base, target_via_link) is False
# ---------------------------------------------------------------------------
# get_annotated_filepath()
# ---------------------------------------------------------------------------
def test_get_annotated_filepath_legit_name(sandbox):
result = folder_paths.get_annotated_filepath("image.png")
assert result == os.path.join(sandbox["input"], "image.png")
assert folder_paths.is_within_directory(sandbox["input"], result)
def test_get_annotated_filepath_input_annotation(sandbox):
result = folder_paths.get_annotated_filepath("image.png [input]")
assert result == os.path.join(sandbox["input"], "image.png")
def test_get_annotated_filepath_output_annotation(sandbox):
result = folder_paths.get_annotated_filepath("image.png [output]")
assert result == os.path.join(sandbox["output"], "image.png")
def test_get_annotated_filepath_temp_annotation(sandbox):
result = folder_paths.get_annotated_filepath("image.png [temp]")
assert result == os.path.join(sandbox["temp"], "image.png")
def test_get_annotated_filepath_dotdot_raises(sandbox):
with pytest.raises(ValueError):
folder_paths.get_annotated_filepath("../etc/passwd")
def test_get_annotated_filepath_dotdot_with_annotation_raises(sandbox):
with pytest.raises(ValueError):
folder_paths.get_annotated_filepath("../../etc/passwd [output]")
def test_get_annotated_filepath_absolute_escape_raises(sandbox):
with pytest.raises(ValueError):
folder_paths.get_annotated_filepath("/etc/passwd")
# ---------------------------------------------------------------------------
# exists_annotated_filepath()
# ---------------------------------------------------------------------------
def test_exists_annotated_filepath_existing_legit_file(sandbox):
real = os.path.join(sandbox["input"], "real.png")
with open(real, "w") as f:
f.write("data")
assert folder_paths.exists_annotated_filepath("real.png") is True
def test_exists_annotated_filepath_traversal_returns_false(sandbox):
"""A traversal name must return False without raising and without probing
outside the base directory (must never reach os.path.exists for the escape).
"""
# /etc/passwd exists on POSIX; the function must still report False because
# the resolved path escapes the input directory.
assert folder_paths.exists_annotated_filepath("../../../../../../etc/passwd") is False
def test_exists_annotated_filepath_absolute_returns_false(sandbox):
assert folder_paths.exists_annotated_filepath("/etc/passwd") is False

View File

@ -0,0 +1,147 @@
"""
CI unit tests for FIX #4 of GHSA-779p-m5rp-r4h4.
Stored-XSS hardening on GET /userdata/{file} in app/user_manager.py.
User data files are arbitrary user-supplied content and must never render
inline in the app origin. The getuserdata handler:
- forces Content-Type to application/octet-stream for any type in
folder_paths.DANGEROUS_CONTENT_TYPES (text/html, image/svg+xml,
text/javascript, ...),
- sets X-Content-Type-Options: nosniff,
- sets Content-Disposition: attachment.
These tests pre-create files in tmp_path and GET them back, asserting the
secure response headers. They mirror the aiohttp_client pattern in
tests-unit/prompt_server_test/user_manager_test.py.
"""
import pytest
import os
from aiohttp import web
from app.user_manager import UserManager
pytestmark = (
pytest.mark.asyncio
) # This applies the asyncio mark to all test functions in the module
@pytest.fixture
def user_manager(tmp_path):
um = UserManager()
um.get_request_user_filepath = lambda req, file, **kwargs: os.path.join(
tmp_path, file
) if file else tmp_path
return um
@pytest.fixture
def app(user_manager):
app = web.Application()
routes = web.RouteTableDef()
user_manager.add_routes(routes)
app.add_routes(routes)
return app
async def test_html_served_as_octet_stream(aiohttp_client, app, tmp_path):
(tmp_path / "evil.html").write_text(
"<script>console.log('xss-marker-ghsa-779p')</script>"
)
client = await aiohttp_client(app)
resp = await client.get("/userdata/evil.html")
assert resp.status == 200
ct = resp.headers.get("Content-Type", "")
# The load-bearing assertion: a .html file must NOT be served as text/html.
assert "text/html" not in ct.lower(), (
f"Content-Type {ct!r} would let a browser render/execute the file (stored XSS)."
)
assert ct == "application/octet-stream"
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
assert "attachment" in resp.headers.get("Content-Disposition", "")
async def test_svg_served_as_octet_stream(aiohttp_client, app, tmp_path):
(tmp_path / "evil.svg").write_text(
'<?xml version="1.0"?>'
'<svg xmlns="http://www.w3.org/2000/svg">'
'<script>console.log("xss-marker-ghsa-779p")</script>'
"</svg>"
)
client = await aiohttp_client(app)
resp = await client.get("/userdata/evil.svg")
assert resp.status == 200
ct = resp.headers.get("Content-Type", "")
# SVG can carry inline <script>; it must not be served as image/svg+xml.
assert "svg" not in ct.lower(), (
f"Content-Type {ct!r} would let a browser render the SVG and execute embedded scripts."
)
assert ct == "application/octet-stream"
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
assert "attachment" in resp.headers.get("Content-Disposition", "")
async def test_js_served_as_octet_stream(aiohttp_client, app, tmp_path):
(tmp_path / "evil.js").write_text("alert('xss-marker-ghsa-779p')")
client = await aiohttp_client(app)
resp = await client.get("/userdata/evil.js")
assert resp.status == 200
ct = resp.headers.get("Content-Type", "").lower()
# Must not be served as any executable JavaScript content type.
assert "javascript" not in ct, (
f"Content-Type {ct!r} is an executable JS type."
)
assert "ecmascript" not in ct, (
f"Content-Type {ct!r} is an executable JS type."
)
assert ct == "application/octet-stream"
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
assert "attachment" in resp.headers.get("Content-Disposition", "")
async def test_xml_dialect_served_as_octet_stream(aiohttp_client, app, tmp_path):
"""An XML dialect outside the original blocklist (.xslt -> application/xslt+xml)
must still be forced to download. This pins the normalised *+xml family rule
in folder_paths.is_dangerous_content_type(); a plain set-membership test would
have served this inline."""
(tmp_path / "evil.xslt").write_text(
'<?xml version="1.0"?>'
'<xsl:stylesheet version="1.0" '
'xmlns:xsl="http://www.w3.org/1999/XSL/Transform">'
"<!-- xss-marker-ghsa-779p -->"
"</xsl:stylesheet>"
)
client = await aiohttp_client(app)
resp = await client.get("/userdata/evil.xslt")
assert resp.status == 200
ct = resp.headers.get("Content-Type", "")
assert ct == "application/octet-stream", (
f"Content-Type {ct!r}: an *+xml dialect must be forced to octet-stream "
f"(it can carry inline script via stylesheet/entity tricks)."
)
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
assert "attachment" in resp.headers.get("Content-Disposition", "")
async def test_benign_txt_still_served(aiohttp_client, app, tmp_path):
(tmp_path / "note.txt").write_text("just a harmless note")
client = await aiohttp_client(app)
resp = await client.get("/userdata/note.txt")
assert resp.status == 200
assert await resp.text() == "just a harmless note"
ct = resp.headers.get("Content-Type", "")
# text/plain is not in the dangerous set, so it is acceptable here. The
# defence-in-depth headers must still be present regardless.
assert "text/plain" in ct.lower()
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
assert "attachment" in resp.headers.get("Content-Disposition", "")

View File

@ -0,0 +1,138 @@
"""CI unit guard for FIX #5 of GHSA-779p-m5rp-r4h4 — the /view forced-download set.
Vuln #5 was stored XSS via SVG upload: the /view endpoint's Content-Type
blocklist covered text/html, text/javascript, etc. but was missing
image/svg+xml, so an uploaded SVG carrying an inline <script> was served as
image/svg+xml and executed in the page origin when rendered.
The /view forced-download decision lives in the view_image closure registered by
server.PromptServer.add_routes (server.py ~line 596), which calls
`folder_paths.is_dangerous_content_type(content_type)` — a normalising check that
strips charset/boundary parameters and casing and folds in the whole */xml and
*+xml dialect family — rather than a bypassable raw
`content_type in folder_paths.DANGEROUS_CONTENT_TYPES` membership test. On a match
it rewrites the response to application/octet-stream with a
Content-Disposition: attachment header. server.py cannot be imported in a unit
test (importing it spins up the full PromptServer/aiohttp app and its global side
effects), so these tests pin the underlying dangerous-content data
(folder_paths.DANGEROUS_CONTENT_TYPES) and the normalising is_dangerous_content_type()
helper that the closure actually calls.
The end-to-end /view assertion (upload an SVG, GET /view, confirm the response
is not served as image/svg+xml) lives in the live POC at
.security/pocs/test_security_ghsa_779p.py::TestViewSvgContentType, which
requires a running server. This file is the fast, server-free CI guard on the
set contents so the blocklist can't silently regress.
"""
import folder_paths
# Active/renderable content types that must be forced to download. Each of these
# can carry an inline <script> (or otherwise execute) in the page origin if a
# browser renders it. image/svg+xml is the original missing item that caused
# vuln #5.
DANGEROUS = [
'image/svg+xml',
'application/xml',
'text/xml',
'text/html',
'text/html-sandboxed',
'application/xhtml+xml',
'text/javascript',
'application/javascript',
'application/x-javascript',
'application/ecmascript',
'text/css',
]
# Benign image types that browsers display inline and that must keep rendering;
# forcing these to download would break legitimate previews.
BENIGN_INLINE_IMAGES = [
'image/png',
'image/jpeg',
'image/webp',
'image/gif',
]
def test_dangerous_content_types_is_a_set():
assert isinstance(folder_paths.DANGEROUS_CONTENT_TYPES, set)
def test_svg_is_in_the_blocklist():
"""The specific item whose absence caused vuln #5."""
assert 'image/svg+xml' in folder_paths.DANGEROUS_CONTENT_TYPES, (
"image/svg+xml missing from DANGEROUS_CONTENT_TYPES — this is exactly "
"the regression that reopens GHSA-779p-m5rp-r4h4 vuln #5 (stored XSS "
"via SVG upload on /view)."
)
def test_all_dangerous_types_present():
missing = [ct for ct in DANGEROUS if ct not in folder_paths.DANGEROUS_CONTENT_TYPES]
assert not missing, (
f"DANGEROUS_CONTENT_TYPES is missing required active/renderable types: "
f"{missing}. The /view closure only forces a download for content types "
f"in this set; anything missing here is served inline and can execute."
)
def test_benign_inline_image_types_absent():
leaked = [ct for ct in BENIGN_INLINE_IMAGES if ct in folder_paths.DANGEROUS_CONTENT_TYPES]
assert not leaked, (
f"Benign inline-displayable image types found in DANGEROUS_CONTENT_TYPES: "
f"{leaked}. Forcing these to download would break legitimate image "
f"previews in /view — they must keep rendering inline."
)
# ---------------------------------------------------------------------------
# is_dangerous_content_type() — the normalising check the /view and /userdata
# handlers now call instead of a raw `in DANGEROUS_CONTENT_TYPES` membership
# test. An exact-string membership test was bypassable with a charset parameter
# or odd casing, and missed the wider XML dialect family; these tests pin the
# normalisation so that bypass can't reopen.
# ---------------------------------------------------------------------------
def test_function_matches_plain_dangerous_types():
for ct in DANGEROUS:
assert folder_paths.is_dangerous_content_type(ct) is True, ct
def test_function_strips_parameters_and_casing():
"""A charset/boundary parameter or casing must not slip a type past the check.
This is the bypass surfaced by review: the /view blake3 branch can serve an
attacker-controlled, unvalidated asset mime_type like 'text/html; charset=utf-8',
which an exact-string set test missed.
"""
for ct in (
'text/html; charset=utf-8',
'TEXT/HTML',
'Text/HTML; charset=UTF-8',
'image/svg+xml; charset=utf-8',
' text/html ',
):
assert folder_paths.is_dangerous_content_type(ct) is True, ct
def test_function_covers_xml_dialect_family():
"""Any *+xml / */xml dialect is dangerous without enumerating each one."""
for ct in (
'application/xslt+xml',
'application/rss+xml',
'application/atom+xml',
'application/rdf+xml',
'application/mathml+xml',
'message/rfc822',
):
assert folder_paths.is_dangerous_content_type(ct) is True, ct
def test_function_allows_benign_and_empty():
for ct in BENIGN_INLINE_IMAGES + ['application/octet-stream', 'text/plain']:
assert folder_paths.is_dangerous_content_type(ct) is False, ct
# None / empty (mimetypes.guess_type miss) must not be treated as dangerous.
assert folder_paths.is_dangerous_content_type(None) is False
assert folder_paths.is_dangerous_content_type('') is False

91
utils/origin_check.py Normal file
View File

@ -0,0 +1,91 @@
"""Host/Origin CSRF check for the loopback dev server.
Extracted verbatim from ``server.create_origin_only_middleware`` so the decision
logic is importable and unit-testable without standing up the full
PromptServer/aiohttp app (importing ``server`` pulls in ``nodes``/``execution``/
torch and has global side effects). The wiring lives in ``server.py``; the
regression guard for GHSA-779p-m5rp-r4h4 finding #1 (CSRF bypass via
``Origin: null``) lives in
``tests-unit/security_test/test_ghsa_779p_01_origin_csrf.py``.
Only ``urllib.parse``/``ipaddress``/``socket`` (stdlib) are imported here, so the
module stays cheap to import from a unit test.
"""
import ipaddress
import socket
import urllib.parse
def is_loopback(host):
if host is None:
return False
try:
if ipaddress.ip_address(host).is_loopback:
return True
else:
return False
except ValueError:
# Not an IP literal (ip_address raises ValueError); fall through to DNS
# resolution below. Narrowed from a bare except so genuine interrupts
# (KeyboardInterrupt/SystemExit) aren't swallowed.
pass
loopback = False
for family in (socket.AF_INET, socket.AF_INET6):
try:
r = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM)
for family, _, _, _, sockaddr in r:
if not ipaddress.ip_address(sockaddr[0]).is_loopback:
return loopback
else:
loopback = True
except socket.gaierror:
pass
return loopback
def is_cross_origin_forbidden(host, origin):
"""Return True if a request with these ``Host``/``Origin`` headers must be rejected (403).
This prevents the case where a random website can queue Comfy workflows by
making a POST to 127.0.0.1, which browsers don't prevent. In that case the
Host and Origin hostnames won't match. The check is intentionally limited to
when the Host resolves to a loopback address; for non-loopback hosts it
returns False (it is a localhost-CSRF mitigation, not a general same-origin
enforcer).
GHSA-779p-m5rp-r4h4 #1 fix: an opaque origin (e.g. ``"null"`` sent by a
sandboxed iframe or a ``data:``/``file:`` document) parses to an empty/None
host. Previously such requests skipped the comparison entirely, which let an
attacker bypass the host/origin CSRF check with ``Origin: null``. A missing
or empty origin host is now treated as a mismatch and rejected.
"""
host_domain = host.lower()
parsed = urllib.parse.urlparse(origin)
origin_domain = parsed.netloc.lower()
host_domain_parsed = urllib.parse.urlsplit('//' + host_domain)
# A non-numeric or out-of-range port (e.g. Origin: http://127.0.0.1:99999)
# makes urllib raise ValueError on .port access. Treat a malformed port as a
# rejected request rather than letting it surface as an uncaught 500 in the
# middleware — it fails closed, consistent with the CSRF stance.
try:
origin_port = parsed.port
host_port = host_domain_parsed.port
except ValueError:
return True
loopback = is_loopback(host_domain_parsed.hostname)
if origin_port is None: # if origin doesn't have a port strip it from the host to handle weird browsers, same for host
host_domain = host_domain_parsed.hostname
if host_port is None:
origin_domain = parsed.hostname
if loopback and host_domain is not None and len(host_domain) > 0:
if origin_domain is None or len(origin_domain) == 0 or host_domain != origin_domain:
return True
return False