Files
ragflow/api/utils/web_utils.py
Xing Hong fb95136f39 Fix: validate URL scheme and resolved IP before crawling to prevent SSRF (#14090)
### What problem does this PR solve?

The POST /upload_info?url=<url> endpoint accepted a user-supplied URL
and passed it directly to AsyncWebCrawler without any validation. There
were no restrictions on URL scheme, destination hostname, or resolved IP
address. This allowed any authenticated user to instruct the server to
make outbound HTTP requests to internal infrastructure — including RFC
1918 private networks, loopback addresses, and cloud metadata services
such as http://169.254.169.254 — effectively using the server as a proxy
for internal network reconnaissance or credential theft.

This PR adds an SSRF guard (_validate_url_for_crawl) that runs before
any crawl is initiated. It enforces an allowlist of safe schemes
(http/https), resolves the hostname at validation time, and rejects any
URL whose resolved IP falls within a private or reserved network range.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
2026-04-25 14:30:15 +08:00

269 lines
7.9 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import json
import re
import aiosmtplib
from email.mime.text import MIMEText
from email.header import Header
from common import settings
from quart import render_template_string
from api.utils.email_templates import EMAIL_TEMPLATES
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import staleness_of
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
OTP_LENGTH = 4
OTP_TTL_SECONDS = 5 * 60 # valid for 5 minutes
ATTEMPT_LIMIT = 5 # maximum attempts
ATTEMPT_LOCK_SECONDS = 30 * 60 # lock for 30 minutes
RESEND_COOLDOWN_SECONDS = 60 # cooldown for 1 minute
CONTENT_TYPE_MAP = {
# Office
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"doc": "application/msword",
"pdf": "application/pdf",
"csv": "text/csv",
"xls": "application/vnd.ms-excel",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
# Text/code
"txt": "text/plain",
"py": "text/plain",
"js": "text/plain",
"java": "text/plain",
"c": "text/plain",
"cpp": "text/plain",
"h": "text/plain",
"php": "text/plain",
"go": "text/plain",
"ts": "text/plain",
"sh": "text/plain",
"cs": "text/plain",
"kt": "text/plain",
"sql": "text/plain",
# Web
"md": "text/markdown",
"markdown": "text/markdown",
"mdx": "text/markdown",
"htm": "text/html",
"html": "text/html",
"json": "application/json",
# Image formats
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"gif": "image/gif",
"bmp": "image/bmp",
"tiff": "image/tiff",
"tif": "image/tiff",
"webp": "image/webp",
"svg": "image/svg+xml",
"ico": "image/x-icon",
"avif": "image/avif",
"heic": "image/heic",
# PPTX
"ppt": "application/vnd.ms-powerpoint",
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
}
FORCE_ATTACHMENT_EXTENSIONS = {
"htm",
"html",
"shtml",
"xht",
"xhtml",
"xml",
"mhtml",
"svg",
}
FORCE_ATTACHMENT_CONTENT_TYPES = {
"text/html",
"image/svg+xml",
"application/xhtml+xml",
"text/xml",
"application/xml",
"multipart/related",
}
def should_force_attachment(ext: str | None, content_type: str | None = None) -> bool:
normalized_ext = (ext or "").lower().strip(".")
if normalized_ext in FORCE_ATTACHMENT_EXTENSIONS:
return True
normalized_type = (content_type or "").lower()
return normalized_type in FORCE_ATTACHMENT_CONTENT_TYPES
def apply_safe_file_response_headers(response, content_type: str | None, ext: str | None = None):
if content_type:
response.headers.set("Content-Type", content_type)
force_attachment = should_force_attachment(ext, content_type)
if force_attachment:
response.headers.set("X-Content-Type-Options", "nosniff")
response.headers.set("Content-Disposition", "attachment")
return response
def html2pdf(
source: str,
timeout: int = 2,
install_driver: bool = True,
print_options: dict = {},
):
result = __get_pdf_from_html(source, timeout, install_driver, print_options)
return result
def __send_devtools(driver, cmd, params={}):
resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
url = driver.command_executor._url + resource
body = json.dumps({"cmd": cmd, "params": params})
response = driver.command_executor._request("POST", url, body)
if not response:
raise Exception(response.get("value"))
return response.get("value")
def __get_pdf_from_html(path: str, timeout: int, install_driver: bool, print_options: dict):
webdriver_options = Options()
webdriver_prefs = {}
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.experimental_options["prefs"] = webdriver_prefs
webdriver_prefs["profile.default_content_settings"] = {"images": 2}
if install_driver:
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=webdriver_options)
else:
driver = webdriver.Chrome(options=webdriver_options)
driver.get(path)
try:
WebDriverWait(driver, timeout).until(staleness_of(driver.find_element(by=By.TAG_NAME, value="html")))
except TimeoutException:
calculated_print_options = {
"landscape": False,
"displayHeaderFooter": False,
"printBackground": True,
"preferCSSPageSize": True,
}
calculated_print_options.update(print_options)
result = __send_devtools(driver, "Page.printToPDF", calculated_print_options)
driver.quit()
return base64.b64decode(result["data"])
def is_valid_url(url: str) -> bool:
if not re.match(r"(https?)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", url):
return False
from common.ssrf_guard import assert_url_is_safe
try:
assert_url_is_safe(url)
return True
except ValueError:
return False
def safe_json_parse(data: str | dict) -> dict:
if isinstance(data, dict):
return data
try:
return json.loads(data) if data else {}
except (json.JSONDecodeError, TypeError):
return {}
def get_float(req: dict, key: str, default: float | int = 10.0) -> float:
try:
parsed = float(req.get(key, default))
return parsed if parsed > 0 else default
except (TypeError, ValueError):
return default
async def send_email_html(to_email: str, subject: str, template_key: str, **context):
body = await render_template_string(EMAIL_TEMPLATES.get(template_key), **context)
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = Header(subject, "utf-8")
msg["From"] = f"{settings.MAIL_DEFAULT_SENDER[0]} <{settings.MAIL_DEFAULT_SENDER[1]}>"
msg["To"] = to_email
smtp = aiosmtplib.SMTP(
hostname=settings.MAIL_SERVER,
port=settings.MAIL_PORT,
use_tls=True,
timeout=10,
)
await smtp.connect()
await smtp.login(settings.MAIL_USERNAME, settings.MAIL_PASSWORD)
await smtp.send_message(msg)
await smtp.quit()
async def send_invite_email(to_email, invite_url, tenant_id, inviter):
# Reuse the generic HTML sender with 'invite' template
await send_email_html(
to_email=to_email,
subject="RAGFlow Invitation",
template_key="invite",
email=to_email,
invite_url=invite_url,
tenant_id=tenant_id,
inviter=inviter,
)
def otp_keys(email: str):
email = (email or "").strip().lower()
return (
f"otp:{email}",
f"otp_attempts:{email}",
f"otp_last_sent:{email}",
f"otp_lock:{email}",
)
def hash_code(code: str, salt: bytes) -> str:
import hashlib
import hmac
return hmac.new(salt, (code or "").encode("utf-8"), hashlib.sha256).hexdigest()
def captcha_key(email: str) -> str:
return f"captcha:{email}"