test(ssrf_proxy): Add integration test for ssrf proxy

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN-
2025-09-01 11:40:36 +08:00
parent fb36069f1c
commit 42110a8217
8 changed files with 962 additions and 16 deletions

View File

@ -0,0 +1,118 @@
# SSRF Proxy Test Cases
## Overview
The SSRF proxy test suite uses YAML files to define test cases, making them easier to maintain and extend without modifying code. These tests validate the SSRF proxy configuration in `docker/ssrf_proxy/`.
## Location
These tests are located in `api/tests/integration_tests/ssrf_proxy/` because they require the Python environment from the API project.
## Usage
### Basic Testing
From the `api/` directory:
```bash
uv run python tests/integration_tests/ssrf_proxy/test_ssrf_proxy.py
```
Or from the repository root:
```bash
cd api && uv run python tests/integration_tests/ssrf_proxy/test_ssrf_proxy.py
```
### List Available Tests
View all test cases without running them:
```bash
uv run python tests/integration_tests/ssrf_proxy/test_ssrf_proxy.py --list-tests
```
### Use Custom Test File
Run tests from a specific YAML file:
```bash
uv run python tests/integration_tests/ssrf_proxy/test_ssrf_proxy.py --test-file test_cases_extended.yaml
```
### Command Line Options
- `--host HOST`: Proxy host (default: localhost)
- `--port PORT`: Proxy port (default: 3128)
- `--no-container`: Don't start container (assume proxy is already running)
- `--save-results`: Save test results to JSON file
- `--test-file FILE`: Path to YAML file containing test cases
- `--list-tests`: List all test cases without running them
## YAML Test Case Format
Test cases are organized by categories in YAML files:
```yaml
test_categories:
category_key:
name: "Category Display Name"
description: "Category description"
test_cases:
- name: "Test Case Name"
url: "http://example.com"
expected_blocked: false # true if should be blocked, false if allowed
description: "Optional test description"
```
## Available Test Files
1. **test_cases.yaml** - Standard test suite with essential test cases
2. **test_cases_extended.yaml** - Extended test suite with additional edge cases and scenarios
Both files are located in `api/tests/integration_tests/ssrf_proxy/`
## Categories
### Standard Categories
- **Private Networks**: Tests for blocking private IP ranges and loopback addresses
- **Cloud Metadata**: Tests for blocking cloud provider metadata endpoints
- **Public Internet**: Tests for allowing legitimate public internet access
- **Port Restrictions**: Tests for port-based access control
### Extended Categories (in test_cases_extended.yaml)
- **IPv6 Tests**: Tests for IPv6 address handling
- **Special Cases**: Edge cases like decimal/octal/hex IP notation
## Adding New Test Cases
1. Edit the YAML file (or create a new one)
2. Add test cases under appropriate categories
3. Run with `--test-file` option if using a custom file
Example:
```yaml
test_categories:
custom_tests:
name: "Custom Tests"
description: "My custom test cases"
test_cases:
- name: "Custom Test 1"
url: "http://test.example.com"
expected_blocked: false
description: "Testing custom domain"
```
## What Gets Tested
The tests validate the SSRF proxy configuration files in `docker/ssrf_proxy/`:
- `squid.conf.template` - Squid proxy configuration
- `docker-entrypoint.sh` - Container initialization script
- `conf.d/` - Additional configuration files (if present)
## Benefits
- **Maintainability**: Test cases can be updated without code changes
- **Extensibility**: Easy to add new test cases or categories
- **Clarity**: YAML format is human-readable and self-documenting
- **Flexibility**: Multiple test files for different scenarios
- **Fallback**: Code includes default test cases if YAML loading fails
- **Integration**: Properly integrated with the API project's Python environment

View File

@ -0,0 +1 @@
"""SSRF Proxy Integration Tests"""

View File

@ -0,0 +1,129 @@
# SSRF Proxy Test Cases Configuration
# This file defines all test cases for the SSRF proxy
# Each test case validates whether the proxy correctly blocks or allows requests
test_categories:
private_networks:
name: "Private Networks"
description: "Tests for blocking private IP ranges and loopback addresses"
test_cases:
- name: "Loopback (127.0.0.1)"
url: "http://127.0.0.1"
expected_blocked: true
description: "IPv4 loopback address"
- name: "Localhost"
url: "http://localhost"
expected_blocked: true
description: "Localhost hostname"
- name: "Private 10.x.x.x"
url: "http://10.0.0.1"
expected_blocked: true
description: "RFC 1918 private network"
- name: "Private 172.16.x.x"
url: "http://172.16.0.1"
expected_blocked: true
description: "RFC 1918 private network"
- name: "Private 192.168.x.x"
url: "http://192.168.1.1"
expected_blocked: true
description: "RFC 1918 private network"
- name: "Link-local"
url: "http://169.254.1.1"
expected_blocked: true
description: "Link-local address"
- name: "This network"
url: "http://0.0.0.0"
expected_blocked: true
description: "'This' network address"
cloud_metadata:
name: "Cloud Metadata"
description: "Tests for blocking cloud provider metadata endpoints"
test_cases:
- name: "AWS Metadata"
url: "http://169.254.169.254/latest/meta-data/"
expected_blocked: true
description: "AWS EC2 metadata endpoint"
- name: "Azure Metadata"
url: "http://169.254.169.254/metadata/instance"
expected_blocked: true
description: "Azure metadata endpoint"
# Note: metadata.google.internal is not included as it may resolve to public IPs
public_internet:
name: "Public Internet"
description: "Tests for allowing legitimate public internet access"
test_cases:
- name: "Example.com"
url: "http://example.com"
expected_blocked: false
description: "Public website"
- name: "Google HTTPS"
url: "https://www.google.com"
expected_blocked: false
description: "HTTPS public website"
- name: "HTTPBin API"
url: "http://httpbin.org/get"
expected_blocked: false
description: "Public API endpoint"
- name: "GitHub API"
url: "https://api.github.com"
expected_blocked: false
description: "Public API over HTTPS"
port_restrictions:
name: "Port Restrictions"
description: "Tests for port-based access control"
test_cases:
- name: "HTTP Port 80"
url: "http://example.com:80"
expected_blocked: false
description: "Standard HTTP port"
- name: "HTTPS Port 443"
url: "http://example.com:443"
expected_blocked: false
description: "Standard HTTPS port"
- name: "Port 8080"
url: "http://example.com:8080"
expected_blocked: true
description: "Non-standard port"
- name: "Port 3000"
url: "http://example.com:3000"
expected_blocked: true
description: "Development port"
- name: "SSH Port 22"
url: "http://example.com:22"
expected_blocked: true
description: "SSH port"
- name: "MySQL Port 3306"
url: "http://example.com:3306"
expected_blocked: true
description: "Database port"
# Additional test configurations can be added here
# For example:
#
# ipv6_tests:
# name: "IPv6 Tests"
# description: "Tests for IPv6 address handling"
# test_cases:
# - name: "IPv6 Loopback"
# url: "http://[::1]"
# expected_blocked: true
# description: "IPv6 loopback address"

View File

@ -0,0 +1,219 @@
# Extended SSRF Proxy Test Cases Configuration
# This file contains additional test cases for comprehensive testing
# Use with: python test_ssrf_proxy.py --test-file test_cases_extended.yaml
test_categories:
# Standard test cases
private_networks:
name: "Private Networks"
description: "Tests for blocking private IP ranges and loopback addresses"
test_cases:
- name: "Loopback (127.0.0.1)"
url: "http://127.0.0.1"
expected_blocked: true
description: "IPv4 loopback address"
- name: "Localhost"
url: "http://localhost"
expected_blocked: true
description: "Localhost hostname"
- name: "Private 10.x.x.x"
url: "http://10.0.0.1"
expected_blocked: true
description: "RFC 1918 private network"
- name: "Private 172.16.x.x"
url: "http://172.16.0.1"
expected_blocked: true
description: "RFC 1918 private network"
- name: "Private 192.168.x.x"
url: "http://192.168.1.1"
expected_blocked: true
description: "RFC 1918 private network"
- name: "Link-local"
url: "http://169.254.1.1"
expected_blocked: true
description: "Link-local address"
- name: "This network"
url: "http://0.0.0.0"
expected_blocked: true
description: "'This' network address"
cloud_metadata:
name: "Cloud Metadata"
description: "Tests for blocking cloud provider metadata endpoints"
test_cases:
- name: "AWS Metadata"
url: "http://169.254.169.254/latest/meta-data/"
expected_blocked: true
description: "AWS EC2 metadata endpoint"
- name: "Azure Metadata"
url: "http://169.254.169.254/metadata/instance"
expected_blocked: true
description: "Azure metadata endpoint"
- name: "DigitalOcean Metadata"
url: "http://169.254.169.254/metadata/v1"
expected_blocked: true
description: "DigitalOcean metadata endpoint"
- name: "Oracle Cloud Metadata"
url: "http://169.254.169.254/opc/v1"
expected_blocked: true
description: "Oracle Cloud metadata endpoint"
public_internet:
name: "Public Internet"
description: "Tests for allowing legitimate public internet access"
test_cases:
- name: "Example.com"
url: "http://example.com"
expected_blocked: false
description: "Public website"
- name: "Google HTTPS"
url: "https://www.google.com"
expected_blocked: false
description: "HTTPS public website"
- name: "HTTPBin API"
url: "http://httpbin.org/get"
expected_blocked: false
description: "Public API endpoint"
- name: "GitHub API"
url: "https://api.github.com"
expected_blocked: false
description: "Public API over HTTPS"
- name: "OpenAI API"
url: "https://api.openai.com"
expected_blocked: false
description: "OpenAI API endpoint"
- name: "Anthropic API"
url: "https://api.anthropic.com"
expected_blocked: false
description: "Anthropic API endpoint"
port_restrictions:
name: "Port Restrictions"
description: "Tests for port-based access control"
test_cases:
- name: "HTTP Port 80"
url: "http://example.com:80"
expected_blocked: false
description: "Standard HTTP port"
- name: "HTTPS Port 443"
url: "http://example.com:443"
expected_blocked: false
description: "Standard HTTPS port"
- name: "Port 8080"
url: "http://example.com:8080"
expected_blocked: true
description: "Alternative HTTP port"
- name: "Port 3000"
url: "http://example.com:3000"
expected_blocked: true
description: "Node.js development port"
- name: "SSH Port 22"
url: "http://example.com:22"
expected_blocked: true
description: "SSH port"
- name: "Telnet Port 23"
url: "http://example.com:23"
expected_blocked: true
description: "Telnet port"
- name: "SMTP Port 25"
url: "http://example.com:25"
expected_blocked: true
description: "SMTP mail port"
- name: "MySQL Port 3306"
url: "http://example.com:3306"
expected_blocked: true
description: "MySQL database port"
- name: "PostgreSQL Port 5432"
url: "http://example.com:5432"
expected_blocked: true
description: "PostgreSQL database port"
- name: "Redis Port 6379"
url: "http://example.com:6379"
expected_blocked: true
description: "Redis port"
- name: "MongoDB Port 27017"
url: "http://example.com:27017"
expected_blocked: true
description: "MongoDB port"
ipv6_tests:
name: "IPv6 Tests"
description: "Tests for IPv6 address handling"
test_cases:
- name: "IPv6 Loopback"
url: "http://[::1]"
expected_blocked: true
description: "IPv6 loopback address"
- name: "IPv6 All zeros"
url: "http://[::]"
expected_blocked: true
description: "IPv6 all zeros address"
- name: "IPv6 Link-local"
url: "http://[fe80::1]"
expected_blocked: true
description: "IPv6 link-local address"
- name: "IPv6 Unique local"
url: "http://[fc00::1]"
expected_blocked: true
description: "IPv6 unique local address"
special_cases:
name: "Special Cases"
description: "Edge cases and special scenarios"
test_cases:
- name: "Decimal IP notation"
url: "http://2130706433"
expected_blocked: true
description: "127.0.0.1 in decimal notation"
- name: "Octal IP notation"
url: "http://0177.0.0.1"
expected_blocked: true
description: "127.0.0.1 with octal notation"
- name: "Hex IP notation"
url: "http://0x7f.0.0.1"
expected_blocked: true
description: "127.0.0.1 with hex notation"
- name: "Mixed notation"
url: "http://0x7f.0.0.0x1"
expected_blocked: true
description: "127.0.0.1 with mixed hex notation"
- name: "Localhost with port"
url: "http://localhost:8080"
expected_blocked: true
description: "Localhost with non-standard port"
- name: "Domain with private IP"
url: "http://192-168-1-1.example.com"
expected_blocked: false
description: "Domain that looks like private IP (should resolve)"

View File

@ -0,0 +1,423 @@
#!/usr/bin/env python3
"""
SSRF Proxy Test Suite
This script tests the SSRF proxy configuration to ensure it blocks
private networks while allowing public internet access.
"""
import argparse
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from enum import Enum
from typing import final
import yaml
# Color codes for terminal output
class Colors:
RED: str = "\033[0;31m"
GREEN: str = "\033[0;32m"
YELLOW: str = "\033[1;33m"
BLUE: str = "\033[0;34m"
NC: str = "\033[0m" # No Color
class TestResult(Enum):
PASSED = "passed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class TestCase:
name: str
url: str
expected_blocked: bool
category: str
description: str = ""
@final
class SSRFProxyTester:
def __init__(self, proxy_host: str = "localhost", proxy_port: int = 3128, test_file: str | None = None):
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.proxy_url = f"http://{proxy_host}:{proxy_port}"
self.container_name = "ssrf-proxy-test"
self.image = "ubuntu/squid:latest"
self.results: list[dict[str, object]] = []
self.test_file = test_file or "test_cases.yaml"
def start_proxy_container(self) -> bool:
"""Start the SSRF proxy container"""
print(f"{Colors.YELLOW}Starting SSRF proxy container...{Colors.NC}")
# Stop and remove existing container if exists
_ = subprocess.run(["docker", "stop", self.container_name], capture_output=True, text=True)
_ = subprocess.run(["docker", "rm", self.container_name], capture_output=True, text=True)
# Get directories for mounting config files
script_dir = os.path.dirname(os.path.abspath(__file__))
# Docker config files are in docker/ssrf_proxy relative to project root
project_root = os.path.abspath(os.path.join(script_dir, "..", "..", "..", ".."))
docker_config_dir = os.path.join(project_root, "docker", "ssrf_proxy")
# Start container
cmd = [
"docker",
"run",
"-d",
"--name",
self.container_name,
"-p",
f"{self.proxy_port}:{self.proxy_port}",
"-p",
"8194:8194",
"-v",
f"{docker_config_dir}/squid.conf.template:/etc/squid/squid.conf.template:ro",
"-v",
f"{docker_config_dir}/docker-entrypoint.sh:/docker-entrypoint-mount.sh:ro",
"-e",
f"HTTP_PORT={self.proxy_port}",
"-e",
"COREDUMP_DIR=/var/spool/squid",
"-e",
"REVERSE_PROXY_PORT=8194",
"-e",
"SANDBOX_HOST=sandbox",
"-e",
"SANDBOX_PORT=8194",
"--entrypoint",
"sh",
self.image,
"-c",
"cp /docker-entrypoint-mount.sh /docker-entrypoint.sh && sed -i 's/\\r$//' /docker-entrypoint.sh && chmod +x /docker-entrypoint.sh && /docker-entrypoint.sh", # noqa: E501
]
# Add conf.d mount if directory exists
conf_d_path = f"{docker_config_dir}/conf.d"
if os.path.exists(conf_d_path) and os.listdir(conf_d_path):
cmd.insert(-3, "-v")
cmd.insert(-3, f"{conf_d_path}:/etc/squid/conf.d:ro")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"{Colors.RED}Failed to start container: {result.stderr}{Colors.NC}")
return False
# Wait for proxy to start
print(f"{Colors.YELLOW}Waiting for proxy to start...{Colors.NC}")
time.sleep(5)
# Check if container is running
result = subprocess.run(
["docker", "ps", "--filter", f"name={self.container_name}"],
capture_output=True,
text=True,
)
if self.container_name not in result.stdout:
print(f"{Colors.RED}Container failed to start!{Colors.NC}")
logs = subprocess.run(["docker", "logs", self.container_name], capture_output=True, text=True)
print(logs.stdout)
return False
print(f"{Colors.GREEN}Proxy started successfully!{Colors.NC}\n")
return True
def stop_proxy_container(self):
"""Stop and remove the proxy container"""
_ = subprocess.run(["docker", "stop", self.container_name], capture_output=True, text=True)
_ = subprocess.run(["docker", "rm", self.container_name], capture_output=True, text=True)
def test_url(self, test_case: TestCase) -> TestResult:
"""Test a single URL through the proxy"""
# Configure proxy for urllib
proxy_handler = urllib.request.ProxyHandler({"http": self.proxy_url, "https": self.proxy_url})
opener = urllib.request.build_opener(proxy_handler)
try:
# Make request through proxy
request = urllib.request.Request(test_case.url)
with opener.open(request, timeout=5):
# If we got a response, the request was allowed
is_blocked = False
except urllib.error.HTTPError as e:
# HTTP errors like 403 from proxy mean blocked
if e.code in [403, 407]:
is_blocked = True
else:
# Other HTTP errors mean the request went through
is_blocked = False
except (urllib.error.URLError, OSError, TimeoutError):
# Connection errors mean blocked by proxy
is_blocked = True
except Exception as e:
# Unexpected error
print(f"{Colors.YELLOW}Warning: Unexpected error testing {test_case.url}: {e}{Colors.NC}")
return TestResult.SKIPPED
# Check if result matches expectation
if is_blocked == test_case.expected_blocked:
return TestResult.PASSED
else:
return TestResult.FAILED
def run_test(self, test_case: TestCase):
"""Run a single test and record result"""
result = self.test_url(test_case)
# Print result
if result == TestResult.PASSED:
symbol = f"{Colors.GREEN}{Colors.NC}"
elif result == TestResult.FAILED:
symbol = f"{Colors.RED}{Colors.NC}"
else:
symbol = f"{Colors.YELLOW}{Colors.NC}"
status = "blocked" if test_case.expected_blocked else "allowed"
print(f" {symbol} {test_case.name} (should be {status})")
# Record result
self.results.append(
{
"name": test_case.name,
"category": test_case.category,
"url": test_case.url,
"expected_blocked": test_case.expected_blocked,
"result": result.value,
"description": test_case.description,
}
)
def run_all_tests(self):
"""Run all test cases"""
test_cases = self.get_test_cases()
print("=" * 50)
print(" SSRF Proxy Test Suite")
print("=" * 50)
# Group tests by category
categories: dict[str, list[TestCase]] = {}
for test in test_cases:
if test.category not in categories:
categories[test.category] = []
categories[test.category].append(test)
# Run tests by category
for category, tests in categories.items():
print(f"\n{Colors.YELLOW}{category}:{Colors.NC}")
for test in tests:
self.run_test(test)
def load_test_cases_from_yaml(self, yaml_file: str = "test_cases.yaml") -> list[TestCase]:
"""Load test cases from YAML configuration file"""
try:
# Try to load from YAML file
yaml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), yaml_file)
with open(yaml_path) as f:
config = yaml.safe_load(f) # pyright: ignore[reportAny]
test_cases: list[TestCase] = []
# Parse test categories and cases from YAML
test_categories = config.get("test_categories", {}) # pyright: ignore[reportAny]
for category_key, category_data in test_categories.items(): # pyright: ignore[reportAny]
category_name: str = str(category_data.get("name", category_key)) # pyright: ignore[reportAny]
test_cases_list = category_data.get("test_cases", []) # pyright: ignore[reportAny]
for test_data in test_cases_list: # pyright: ignore[reportAny]
test_case = TestCase(
name=str(test_data["name"]), # pyright: ignore[reportAny]
url=str(test_data["url"]), # pyright: ignore[reportAny]
expected_blocked=bool(test_data["expected_blocked"]), # pyright: ignore[reportAny]
category=category_name,
description=str(test_data.get("description", "")), # pyright: ignore[reportAny]
)
test_cases.append(test_case)
if test_cases:
print(f"{Colors.BLUE}Loaded {len(test_cases)} test cases from {yaml_file}{Colors.NC}")
return test_cases
else:
print(f"{Colors.YELLOW}No test cases found in {yaml_file}, using defaults{Colors.NC}")
return self.get_default_test_cases()
except FileNotFoundError:
print(f"{Colors.YELLOW}Test case file {yaml_file} not found, using defaults{Colors.NC}")
return self.get_default_test_cases()
except yaml.YAMLError as e:
print(f"{Colors.YELLOW}Error parsing {yaml_file}: {e}, using defaults{Colors.NC}")
return self.get_default_test_cases()
except Exception as e:
print(f"{Colors.YELLOW}Unexpected error loading {yaml_file}: {e}, using defaults{Colors.NC}")
return self.get_default_test_cases()
def get_default_test_cases(self) -> list[TestCase]:
"""Fallback test cases if YAML loading fails"""
return [
# Essential test cases as fallback
TestCase("Loopback", "http://127.0.0.1", True, "Private Networks", "IPv4 loopback"),
TestCase("Private Network", "http://192.168.1.1", True, "Private Networks", "RFC 1918"),
TestCase("AWS Metadata", "http://169.254.169.254", True, "Cloud Metadata", "AWS metadata"),
TestCase("Public Site", "http://example.com", False, "Public Internet", "Public website"),
TestCase("Port 8080", "http://example.com:8080", True, "Port Restrictions", "Non-standard port"),
]
def get_test_cases(self) -> list[TestCase]:
"""Get all test cases from YAML or defaults"""
return self.load_test_cases_from_yaml(self.test_file)
def print_summary(self):
"""Print test results summary"""
passed = sum(1 for r in self.results if r["result"] == "passed")
failed = sum(1 for r in self.results if r["result"] == "failed")
skipped = sum(1 for r in self.results if r["result"] == "skipped")
print("\n" + "=" * 50)
print(" Test Summary")
print("=" * 50)
print(f"Tests Passed: {Colors.GREEN}{passed}{Colors.NC}")
print(f"Tests Failed: {Colors.RED}{failed}{Colors.NC}")
if skipped > 0:
print(f"Tests Skipped: {Colors.YELLOW}{skipped}{Colors.NC}")
if failed == 0:
print(f"\n{Colors.GREEN}✓ All tests passed! SSRF proxy is configured correctly.{Colors.NC}")
else:
print(f"\n{Colors.RED}✗ Some tests failed. Please review the configuration.{Colors.NC}")
print("\nFailed tests:")
for r in self.results:
if r["result"] == "failed":
status = "should be blocked" if r["expected_blocked"] else "should be allowed"
print(f" - {r['name']} ({status}): {r['url']}")
return failed == 0
def save_results(self, filename: str = "test_results.json"):
"""Save test results to JSON file"""
with open(filename, "w") as f:
json.dump(
{
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"proxy_url": self.proxy_url,
"results": self.results,
},
f,
indent=2,
)
print(f"\nResults saved to {filename}")
def main():
@dataclass
class Args:
host: str = "localhost"
port: int = 3128
no_container: bool = False
save_results: bool = False
test_file: str | None = None
list_tests: bool = False
def parse_args() -> Args:
parser = argparse.ArgumentParser(description="Test SSRF Proxy Configuration")
_ = parser.add_argument("--host", type=str, default="localhost", help="Proxy host (default: localhost)")
_ = parser.add_argument("--port", type=int, default=3128, help="Proxy port (default: 3128)")
_ = parser.add_argument(
"--no-container",
action="store_true",
help="Don't start container (assume proxy is already running)",
)
_ = parser.add_argument("--save-results", action="store_true", help="Save test results to JSON file")
_ = parser.add_argument(
"--test-file", type=str, help="Path to YAML file containing test cases (default: test_cases.yaml)"
)
_ = parser.add_argument("--list-tests", action="store_true", help="List all test cases without running them")
# Parse arguments - argparse.Namespace has Any-typed attributes
# This is a known limitation of argparse in Python's type system
namespace = parser.parse_args()
# Convert namespace attributes to properly typed values
# argparse guarantees these attributes exist with the correct types
# based on our argument definitions, but the type system cannot verify this
return Args(
host=str(namespace.host), # pyright: ignore[reportAny]
port=int(namespace.port), # pyright: ignore[reportAny]
no_container=bool(namespace.no_container), # pyright: ignore[reportAny]
save_results=bool(namespace.save_results), # pyright: ignore[reportAny]
test_file=namespace.test_file if namespace.test_file else None, # pyright: ignore[reportAny]
list_tests=bool(namespace.list_tests), # pyright: ignore[reportAny]
)
args = parse_args()
tester = SSRFProxyTester(args.host, args.port, args.test_file)
# If --list-tests flag is set, just list the tests and exit
if args.list_tests:
test_cases = tester.get_test_cases()
print("\n" + "=" * 50)
print(" Available Test Cases")
print("=" * 50)
# Group by category for display
categories: dict[str, list[TestCase]] = {}
for test in test_cases:
if test.category not in categories:
categories[test.category] = []
categories[test.category].append(test)
for category, tests in categories.items():
print(f"\n{Colors.YELLOW}{category}:{Colors.NC}")
for test in tests:
blocked_status = "BLOCK" if test.expected_blocked else "ALLOW"
color = Colors.RED if test.expected_blocked else Colors.GREEN
print(f" {color}[{blocked_status}]{Colors.NC} {test.name}")
if test.description:
print(f" {test.description}")
print(f" URL: {test.url}")
print(f"\nTotal: {len(test_cases)} test cases")
sys.exit(0)
try:
# Start container unless --no-container flag is set
if not args.no_container:
if not tester.start_proxy_container():
sys.exit(1)
# Run tests
tester.run_all_tests()
# Print summary
success = tester.print_summary()
# Save results if requested
if args.save_results:
tester.save_results()
# Exit with appropriate code
sys.exit(0 if success else 1)
finally:
# Cleanup
if not args.no_container:
print(f"\n{Colors.YELLOW}Cleaning up...{Colors.NC}")
tester.stop_proxy_container()
if __name__ == "__main__":
main()