mirror of
https://github.com/langgenius/dify.git
synced 2026-04-19 18:27:27 +08:00
fix: align file factory imports and CI fixtures
This commit is contained in:
@ -2,30 +2,17 @@
|
||||
|
||||
This package normalizes workflow-layer file payloads into graph-layer ``File``
|
||||
values. It keeps tenancy and ownership checks in the application layer and
|
||||
preserves the historical ``factories.file_factory`` import surface for callers.
|
||||
exports the workflow-facing file builders for callers.
|
||||
"""
|
||||
|
||||
from core.helper import ssrf_proxy
|
||||
from dify_graph.file import File, FileTransferMethod, FileType, FileUploadConfig
|
||||
from extensions.ext_database import db
|
||||
|
||||
from .builders import build_from_mapping, build_from_mappings
|
||||
from .message_files import build_from_message_file, build_from_message_files
|
||||
from .remote import _extract_filename, _get_remote_file_info
|
||||
from .storage_keys import StorageKeyLoader
|
||||
|
||||
__all__ = [
|
||||
"File",
|
||||
"FileTransferMethod",
|
||||
"FileType",
|
||||
"FileUploadConfig",
|
||||
"StorageKeyLoader",
|
||||
"_extract_filename",
|
||||
"_get_remote_file_info",
|
||||
"build_from_mapping",
|
||||
"build_from_mappings",
|
||||
"build_from_message_file",
|
||||
"build_from_message_files",
|
||||
"db",
|
||||
"ssrf_proxy",
|
||||
]
|
||||
|
||||
@ -17,7 +17,7 @@ from extensions.ext_database import db
|
||||
from models import ToolFile, UploadFile
|
||||
|
||||
from .common import resolve_mapping_file_id
|
||||
from .remote import _get_remote_file_info
|
||||
from .remote import get_remote_file_info
|
||||
from .validation import is_file_valid_with_config
|
||||
|
||||
|
||||
@ -209,7 +209,7 @@ def _build_from_remote_url(
|
||||
if not url:
|
||||
raise ValueError("Invalid file url")
|
||||
|
||||
mime_type, filename, file_size = _get_remote_file_info(url)
|
||||
mime_type, filename, file_size = get_remote_file_info(url)
|
||||
extension = mimetypes.guess_extension(mime_type) or ("." + filename.split(".")[-1] if "." in filename else ".bin")
|
||||
detected_file_type = standardize_file_type(extension=extension, mime_type=mime_type)
|
||||
file_type = _resolve_file_type(
|
||||
|
||||
@ -1,4 +1,9 @@
|
||||
"""Remote file metadata helpers used by workflow file normalization."""
|
||||
"""Remote file metadata helpers used by workflow file normalization.
|
||||
|
||||
These helpers are part of the ``factories.file_factory`` package surface
|
||||
because both workflow builders and tests rely on the same RFC5987 filename
|
||||
parsing and HEAD-response normalization rules.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@ -14,7 +19,8 @@ from werkzeug.http import parse_options_header
|
||||
from core.helper import ssrf_proxy
|
||||
|
||||
|
||||
def _extract_filename(url_path: str, content_disposition: str | None) -> str | None:
|
||||
def extract_filename(url_path: str, content_disposition: str | None) -> str | None:
|
||||
"""Extract a safe filename from Content-Disposition or the request URL path."""
|
||||
filename: str | None = None
|
||||
if content_disposition:
|
||||
filename_star_match = re.search(r"filename\*=([^;]+)", content_disposition)
|
||||
@ -57,7 +63,8 @@ def _guess_mime_type(filename: str) -> str:
|
||||
return guessed_mime or ""
|
||||
|
||||
|
||||
def _get_remote_file_info(url: str) -> tuple[str, str, int]:
|
||||
def get_remote_file_info(url: str) -> tuple[str, str, int]:
|
||||
"""Resolve remote file metadata with SSRF-safe HEAD probing."""
|
||||
file_size = -1
|
||||
parsed_url = urllib.parse.urlparse(url)
|
||||
url_path = parsed_url.path
|
||||
@ -67,7 +74,7 @@ def _get_remote_file_info(url: str) -> tuple[str, str, int]:
|
||||
resp = ssrf_proxy.head(url, follow_redirects=True)
|
||||
if resp.status_code == httpx.codes.OK:
|
||||
content_disposition = resp.headers.get("Content-Disposition")
|
||||
extracted_filename = _extract_filename(url_path, content_disposition)
|
||||
extracted_filename = extract_filename(url_path, content_disposition)
|
||||
if extracted_filename:
|
||||
filename = extracted_filename
|
||||
mime_type = _guess_mime_type(filename)
|
||||
|
||||
@ -33,6 +33,9 @@ from extensions.ext_database import db
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_SANDBOX_TEST_IMAGE = "langgenius/dify-sandbox:0.2.12"
|
||||
SANDBOX_TEST_IMAGE_ENV = "DIFY_SANDBOX_TEST_IMAGE"
|
||||
|
||||
|
||||
class _CloserProtocol(Protocol):
|
||||
"""_Closer is any type which implement the close() method."""
|
||||
@ -166,7 +169,8 @@ class DifyTestContainers:
|
||||
# Start Dify Sandbox container for code execution environment
|
||||
# Dify Sandbox provides a secure environment for executing user code
|
||||
logger.info("Initializing Dify Sandbox container...")
|
||||
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:latest").with_network(self.network)
|
||||
sandbox_image = os.getenv(SANDBOX_TEST_IMAGE_ENV, DEFAULT_SANDBOX_TEST_IMAGE)
|
||||
self.dify_sandbox = DockerContainer(image=sandbox_image).with_network(self.network)
|
||||
self.dify_sandbox.with_exposed_ports(8194)
|
||||
self.dify_sandbox.env = {
|
||||
"API_KEY": "test_api_key",
|
||||
@ -176,7 +180,12 @@ class DifyTestContainers:
|
||||
sandbox_port = self.dify_sandbox.get_exposed_port(8194)
|
||||
os.environ["CODE_EXECUTION_ENDPOINT"] = f"http://{sandbox_host}:{sandbox_port}"
|
||||
os.environ["CODE_EXECUTION_API_KEY"] = "test_api_key"
|
||||
logger.info("Dify Sandbox container started successfully - Host: %s, Port: %s", sandbox_host, sandbox_port)
|
||||
logger.info(
|
||||
"Dify Sandbox container started successfully - Image: %s Host: %s, Port: %s",
|
||||
sandbox_image,
|
||||
sandbox_host,
|
||||
sandbox_port,
|
||||
)
|
||||
|
||||
# Wait for Dify Sandbox to be ready
|
||||
logger.info("Waiting for Dify Sandbox to be ready to accept connections...")
|
||||
|
||||
@ -139,9 +139,9 @@ def test_webhook_node_file_conversion_to_file_variable():
|
||||
|
||||
node = create_webhook_node(data, variable_pool)
|
||||
|
||||
# Mock the file factory and variable factory
|
||||
# Mock the file reference boundary and variable factory
|
||||
with (
|
||||
patch("factories.file_factory.build_from_mapping") as mock_file_factory,
|
||||
patch.object(node._file_reference_factory, "build_from_mapping") as mock_file_factory,
|
||||
patch("core.workflow.nodes.trigger_webhook.node.build_segment_with_type") as mock_segment_factory,
|
||||
patch("core.workflow.nodes.trigger_webhook.node.FileVariable") as mock_file_variable,
|
||||
):
|
||||
@ -166,7 +166,6 @@ def test_webhook_node_file_conversion_to_file_variable():
|
||||
# Verify file factory was called with correct parameters
|
||||
mock_file_factory.assert_called_once_with(
|
||||
mapping=expected_factory_mapping(file_dict),
|
||||
tenant_id="test-tenant",
|
||||
)
|
||||
|
||||
# Verify segment factory was called to create FileSegment
|
||||
@ -329,7 +328,7 @@ def test_webhook_node_file_conversion_mixed_parameters():
|
||||
node = create_webhook_node(data, variable_pool)
|
||||
|
||||
with (
|
||||
patch("factories.file_factory.build_from_mapping") as mock_file_factory,
|
||||
patch.object(node._file_reference_factory, "build_from_mapping") as mock_file_factory,
|
||||
patch("core.workflow.nodes.trigger_webhook.node.build_segment_with_type") as mock_segment_factory,
|
||||
patch("core.workflow.nodes.trigger_webhook.node.FileVariable") as mock_file_variable,
|
||||
):
|
||||
@ -359,7 +358,6 @@ def test_webhook_node_file_conversion_mixed_parameters():
|
||||
# Verify file conversion was called
|
||||
mock_file_factory.assert_called_once_with(
|
||||
mapping=expected_factory_mapping(file_dict),
|
||||
tenant_id="test-tenant",
|
||||
)
|
||||
|
||||
|
||||
@ -396,7 +394,7 @@ def test_webhook_node_different_file_types():
|
||||
node = create_webhook_node(data, variable_pool)
|
||||
|
||||
with (
|
||||
patch("factories.file_factory.build_from_mapping") as mock_file_factory,
|
||||
patch.object(node._file_reference_factory, "build_from_mapping") as mock_file_factory,
|
||||
patch("core.workflow.nodes.trigger_webhook.node.build_segment_with_type") as mock_segment_factory,
|
||||
patch("core.workflow.nodes.trigger_webhook.node.FileVariable") as mock_file_variable,
|
||||
):
|
||||
|
||||
@ -266,10 +266,10 @@ def test_webhook_node_run_with_file_params():
|
||||
)
|
||||
|
||||
node = create_webhook_node(data, variable_pool)
|
||||
# Mock the file factory to avoid DB-dependent validation on upload_file_id
|
||||
with patch("factories.file_factory.build_from_mapping") as mock_file_factory:
|
||||
# Mock the node's file reference boundary to avoid DB-dependent validation on upload_file_id
|
||||
with patch.object(node._file_reference_factory, "build_from_mapping") as mock_file_factory:
|
||||
|
||||
def _to_file(mapping, tenant_id, config=None, strict_type_validation=False):
|
||||
def _to_file(*, mapping):
|
||||
return File.model_validate(mapping)
|
||||
|
||||
mock_file_factory.side_effect = _to_file
|
||||
@ -314,10 +314,10 @@ def test_webhook_node_run_mixed_parameters():
|
||||
)
|
||||
|
||||
node = create_webhook_node(data, variable_pool)
|
||||
# Mock the file factory to avoid DB-dependent validation on upload_file_id
|
||||
with patch("factories.file_factory.build_from_mapping") as mock_file_factory:
|
||||
# Mock the node's file reference boundary to avoid DB-dependent validation on upload_file_id
|
||||
with patch.object(node._file_reference_factory, "build_from_mapping") as mock_file_factory:
|
||||
|
||||
def _to_file(mapping, tenant_id, config=None, strict_type_validation=False):
|
||||
def _to_file(*, mapping):
|
||||
return File.model_validate(mapping)
|
||||
|
||||
mock_file_factory.side_effect = _to_file
|
||||
|
||||
@ -23,7 +23,8 @@ from dify_graph.variables.variables import StringVariable
|
||||
def _mock_ssrf_head(monkeypatch):
|
||||
"""Avoid any real network requests during tests.
|
||||
|
||||
file_factory._get_remote_file_info() uses ssrf_proxy.head to inspect
|
||||
factories.file_factory.remote.get_remote_file_info() uses ssrf_proxy.head
|
||||
to inspect
|
||||
remote files. We stub it to return a minimal response object with
|
||||
headers so filename/mime/size can be derived deterministically.
|
||||
"""
|
||||
|
||||
@ -7,15 +7,8 @@ from httpx import Response
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
|
||||
from core.app.file_access import DatabaseFileAccessController, FileAccessScope, bind_file_access_scope
|
||||
from core.workflow.file_reference import build_file_reference, resolve_file_record_id
|
||||
from factories.file_factory import (
|
||||
File,
|
||||
FileTransferMethod,
|
||||
FileType,
|
||||
FileUploadConfig,
|
||||
)
|
||||
from factories.file_factory import (
|
||||
build_from_mapping as _build_from_mapping,
|
||||
)
|
||||
from dify_graph.file import File, FileTransferMethod, FileType, FileUploadConfig
|
||||
from factories.file_factory.builders import build_from_mapping as _build_from_mapping
|
||||
from models import ToolFile, UploadFile
|
||||
|
||||
# Test Data
|
||||
@ -56,7 +49,7 @@ def mock_upload_file():
|
||||
mock.source_url = TEST_REMOTE_URL
|
||||
mock.size = 1024
|
||||
mock.key = "test_key"
|
||||
with patch("factories.file_factory.db.session.scalar", return_value=mock, autospec=True) as m:
|
||||
with patch("factories.file_factory.builders.db.session.scalar", return_value=mock, autospec=True) as m:
|
||||
yield m
|
||||
|
||||
|
||||
@ -70,7 +63,7 @@ def mock_tool_file():
|
||||
mock.mimetype = "application/pdf"
|
||||
mock.original_url = "http://example.com/tool.pdf"
|
||||
mock.size = 2048
|
||||
with patch("factories.file_factory.db.session.scalar", return_value=mock, autospec=True):
|
||||
with patch("factories.file_factory.builders.db.session.scalar", return_value=mock, autospec=True):
|
||||
yield mock
|
||||
|
||||
|
||||
@ -86,7 +79,7 @@ def mock_http_head():
|
||||
},
|
||||
)
|
||||
|
||||
with patch("factories.file_factory.ssrf_proxy.head", autospec=True) as mock_head:
|
||||
with patch("factories.file_factory.remote.ssrf_proxy.head", autospec=True) as mock_head:
|
||||
mock_head.return_value = _mock_response("remote_test.jpg", 2048, "image/jpeg")
|
||||
yield mock_head
|
||||
|
||||
@ -234,7 +227,7 @@ def test_build_from_remote_url_without_strict_validation(mock_http_head):
|
||||
|
||||
def test_tool_file_not_found():
|
||||
"""Test ToolFile not found in database."""
|
||||
with patch("factories.file_factory.db.session.scalar", return_value=None, autospec=True):
|
||||
with patch("factories.file_factory.builders.db.session.scalar", return_value=None, autospec=True):
|
||||
mapping = tool_file_mapping()
|
||||
with pytest.raises(ValueError, match=f"ToolFile {TEST_TOOL_FILE_ID} not found"):
|
||||
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
|
||||
@ -242,7 +235,7 @@ def test_tool_file_not_found():
|
||||
|
||||
def test_local_file_not_found():
|
||||
"""Test UploadFile not found in database."""
|
||||
with patch("factories.file_factory.db.session.scalar", return_value=None, autospec=True):
|
||||
with patch("factories.file_factory.builders.db.session.scalar", return_value=None, autospec=True):
|
||||
mapping = local_file_mapping()
|
||||
with pytest.raises(ValueError, match="Invalid upload file"):
|
||||
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
|
||||
@ -314,7 +307,7 @@ def test_tenant_mismatch():
|
||||
mock_file.key = "test_key"
|
||||
|
||||
# Mock the database query to return None (no file found for this tenant)
|
||||
with patch("factories.file_factory.db.session.scalar", return_value=None, autospec=True):
|
||||
with patch("factories.file_factory.builders.db.session.scalar", return_value=None, autospec=True):
|
||||
mapping = local_file_mapping()
|
||||
with pytest.raises(ValueError, match="Invalid upload file"):
|
||||
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
|
||||
@ -353,7 +346,7 @@ def test_build_from_mapping_scopes_tool_file_to_end_user():
|
||||
invoke_from=InvokeFrom.WEB_APP,
|
||||
)
|
||||
|
||||
with patch("factories.file_factory.db.session.scalar", return_value=tool_file, autospec=True) as scalar:
|
||||
with patch("factories.file_factory.builders.db.session.scalar", return_value=tool_file, autospec=True) as scalar:
|
||||
with bind_file_access_scope(scope):
|
||||
build_from_mapping(mapping=tool_file_mapping(), tenant_id=TEST_TENANT_ID)
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ import re
|
||||
|
||||
import pytest
|
||||
|
||||
from factories.file_factory import _extract_filename, _get_remote_file_info
|
||||
from factories.file_factory.remote import extract_filename, get_remote_file_info
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
@ -15,11 +15,11 @@ def _mock_head(monkeypatch: pytest.MonkeyPatch, headers: dict[str, str], status_
|
||||
def _fake_head(url: str, follow_redirects: bool = True):
|
||||
return _FakeResponse(status_code=status_code, headers=headers)
|
||||
|
||||
monkeypatch.setattr("factories.file_factory.ssrf_proxy.head", _fake_head)
|
||||
monkeypatch.setattr("factories.file_factory.remote.ssrf_proxy.head", _fake_head)
|
||||
|
||||
|
||||
class TestGetRemoteFileInfo:
|
||||
"""Tests for _get_remote_file_info focusing on filename extraction rules."""
|
||||
"""Tests for get_remote_file_info focusing on filename extraction rules."""
|
||||
|
||||
def test_inline_no_filename(self, monkeypatch: pytest.MonkeyPatch):
|
||||
_mock_head(
|
||||
@ -30,7 +30,7 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Length": "123",
|
||||
},
|
||||
)
|
||||
mime_type, filename, size = _get_remote_file_info("http://example.com/some/path/file.pdf")
|
||||
mime_type, filename, size = get_remote_file_info("http://example.com/some/path/file.pdf")
|
||||
assert filename == "file.pdf"
|
||||
assert mime_type == "application/pdf"
|
||||
assert size == 123
|
||||
@ -44,7 +44,7 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Length": "456",
|
||||
},
|
||||
)
|
||||
mime_type, filename, size = _get_remote_file_info("http://example.com/downloads/data.bin")
|
||||
mime_type, filename, size = get_remote_file_info("http://example.com/downloads/data.bin")
|
||||
assert filename == "data.bin"
|
||||
assert mime_type == "application/octet-stream"
|
||||
assert size == 456
|
||||
@ -58,7 +58,7 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Length": "789",
|
||||
},
|
||||
)
|
||||
mime_type, filename, size = _get_remote_file_info("http://example.com/ignored")
|
||||
mime_type, filename, size = get_remote_file_info("http://example.com/ignored")
|
||||
assert filename == "file name.jpg"
|
||||
assert mime_type == "image/jpeg"
|
||||
assert size == 789
|
||||
@ -71,7 +71,7 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Type": "image/jpeg",
|
||||
},
|
||||
)
|
||||
mime_type, filename, _ = _get_remote_file_info("http://example.com/ignored")
|
||||
mime_type, filename, _ = get_remote_file_info("http://example.com/ignored")
|
||||
assert filename == "file name.jpg"
|
||||
assert mime_type == "image/jpeg"
|
||||
|
||||
@ -83,7 +83,7 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Type": "image/jpeg",
|
||||
},
|
||||
)
|
||||
mime_type, filename, _ = _get_remote_file_info("http://example.com/ignored")
|
||||
mime_type, filename, _ = get_remote_file_info("http://example.com/ignored")
|
||||
assert filename == "测试文件.jpg"
|
||||
assert mime_type == "image/jpeg"
|
||||
|
||||
@ -96,7 +96,7 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Length": "12",
|
||||
},
|
||||
)
|
||||
mime_type, filename, size = _get_remote_file_info("http://example.com/static/file.txt")
|
||||
mime_type, filename, size = get_remote_file_info("http://example.com/static/file.txt")
|
||||
assert filename == "file.txt"
|
||||
assert mime_type == "text/plain"
|
||||
assert size == 12
|
||||
@ -109,106 +109,106 @@ class TestGetRemoteFileInfo:
|
||||
"Content-Type": "application/octet-stream",
|
||||
},
|
||||
)
|
||||
mime_type, filename, _ = _get_remote_file_info("http://example.com/test/")
|
||||
mime_type, filename, _ = get_remote_file_info("http://example.com/test/")
|
||||
# Should generate a random hex filename with .bin extension
|
||||
assert re.match(r"^[0-9a-f]{32}\.bin$", filename) is not None
|
||||
assert mime_type == "application/octet-stream"
|
||||
|
||||
|
||||
class TestExtractFilename:
|
||||
"""Tests for _extract_filename function focusing on RFC5987 parsing and security."""
|
||||
"""Tests for extract_filename focusing on RFC5987 parsing and security."""
|
||||
|
||||
def test_no_content_disposition_uses_url_basename(self):
|
||||
"""Test that URL basename is used when no Content-Disposition header."""
|
||||
result = _extract_filename("http://example.com/path/file.txt", None)
|
||||
result = extract_filename("http://example.com/path/file.txt", None)
|
||||
assert result == "file.txt"
|
||||
|
||||
def test_no_content_disposition_with_percent_encoded_url(self):
|
||||
"""Test that percent-encoded URL basename is decoded."""
|
||||
result = _extract_filename("http://example.com/path/file%20name.txt", None)
|
||||
result = extract_filename("http://example.com/path/file%20name.txt", None)
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_no_content_disposition_empty_url_path(self):
|
||||
"""Test that empty URL path returns None."""
|
||||
result = _extract_filename("http://example.com/", None)
|
||||
result = extract_filename("http://example.com/", None)
|
||||
assert result is None
|
||||
|
||||
def test_simple_filename_header(self):
|
||||
"""Test basic filename extraction from Content-Disposition."""
|
||||
result = _extract_filename("http://example.com/", 'attachment; filename="test.txt"')
|
||||
result = extract_filename("http://example.com/", 'attachment; filename="test.txt"')
|
||||
assert result == "test.txt"
|
||||
|
||||
def test_quoted_filename_with_spaces(self):
|
||||
"""Test filename with spaces in quotes."""
|
||||
result = _extract_filename("http://example.com/", 'attachment; filename="my file.txt"')
|
||||
result = extract_filename("http://example.com/", 'attachment; filename="my file.txt"')
|
||||
assert result == "my file.txt"
|
||||
|
||||
def test_unquoted_filename(self):
|
||||
"""Test unquoted filename."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename=test.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename=test.txt")
|
||||
assert result == "test.txt"
|
||||
|
||||
def test_percent_encoded_filename(self):
|
||||
"""Test percent-encoded filename."""
|
||||
result = _extract_filename("http://example.com/", 'attachment; filename="file%20name.txt"')
|
||||
result = extract_filename("http://example.com/", 'attachment; filename="file%20name.txt"')
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_rfc5987_filename_star_utf8(self):
|
||||
"""Test RFC5987 filename* with UTF-8 encoding."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=UTF-8''file%20name.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=UTF-8''file%20name.txt")
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_rfc5987_filename_star_chinese(self):
|
||||
"""Test RFC5987 filename* with Chinese characters."""
|
||||
result = _extract_filename(
|
||||
result = extract_filename(
|
||||
"http://example.com/", "attachment; filename*=UTF-8''%E6%B5%8B%E8%AF%95%E6%96%87%E4%BB%B6.txt"
|
||||
)
|
||||
assert result == "测试文件.txt"
|
||||
|
||||
def test_rfc5987_filename_star_with_language(self):
|
||||
"""Test RFC5987 filename* with language tag."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=UTF-8'en'file%20name.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=UTF-8'en'file%20name.txt")
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_rfc5987_filename_star_fallback_charset(self):
|
||||
"""Test RFC5987 filename* with fallback charset."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=''file%20name.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=''file%20name.txt")
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_rfc5987_filename_star_malformed_fallback(self):
|
||||
"""Test RFC5987 filename* with malformed format falls back to simple unquote."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=malformed%20filename.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=malformed%20filename.txt")
|
||||
assert result == "malformed filename.txt"
|
||||
|
||||
def test_filename_star_takes_precedence_over_filename(self):
|
||||
"""Test that filename* takes precedence over filename."""
|
||||
test_string = 'attachment; filename="old.txt"; filename*=UTF-8\'\'new.txt"'
|
||||
result = _extract_filename("http://example.com/", test_string)
|
||||
result = extract_filename("http://example.com/", test_string)
|
||||
assert result == "new.txt"
|
||||
|
||||
def test_path_injection_protection(self):
|
||||
"""Test that path injection attempts are blocked by os.path.basename."""
|
||||
result = _extract_filename("http://example.com/", 'attachment; filename="../../../etc/passwd"')
|
||||
result = extract_filename("http://example.com/", 'attachment; filename="../../../etc/passwd"')
|
||||
assert result == "passwd"
|
||||
|
||||
def test_path_injection_protection_rfc5987(self):
|
||||
"""Test that path injection attempts in RFC5987 are blocked."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=UTF-8''..%2F..%2F..%2Fetc%2Fpasswd")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=UTF-8''..%2F..%2F..%2Fetc%2Fpasswd")
|
||||
assert result == "passwd"
|
||||
|
||||
def test_empty_filename_returns_none(self):
|
||||
"""Test that empty filename returns None."""
|
||||
result = _extract_filename("http://example.com/", 'attachment; filename=""')
|
||||
result = extract_filename("http://example.com/", 'attachment; filename=""')
|
||||
assert result is None
|
||||
|
||||
def test_whitespace_only_filename_returns_none(self):
|
||||
"""Test that whitespace-only filename returns None."""
|
||||
result = _extract_filename("http://example.com/", 'attachment; filename=" "')
|
||||
result = extract_filename("http://example.com/", 'attachment; filename=" "')
|
||||
assert result is None
|
||||
|
||||
def test_complex_rfc5987_encoding(self):
|
||||
"""Test complex RFC5987 encoding with special characters."""
|
||||
result = _extract_filename(
|
||||
result = extract_filename(
|
||||
"http://example.com/",
|
||||
"attachment; filename*=UTF-8''%E4%B8%AD%E6%96%87%E6%96%87%E4%BB%B6%20%28%E5%89%AF%E6%9C%AC%29.pdf",
|
||||
)
|
||||
@ -216,17 +216,17 @@ class TestExtractFilename:
|
||||
|
||||
def test_iso8859_1_encoding(self):
|
||||
"""Test ISO-8859-1 encoding in RFC5987."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=ISO-8859-1''file%20name.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=ISO-8859-1''file%20name.txt")
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_encoding_error_fallback(self):
|
||||
"""Test that encoding errors fall back to safe ASCII filename."""
|
||||
result = _extract_filename("http://example.com/", "attachment; filename*=INVALID-CHARSET''file%20name.txt")
|
||||
result = extract_filename("http://example.com/", "attachment; filename*=INVALID-CHARSET''file%20name.txt")
|
||||
assert result == "file name.txt"
|
||||
|
||||
def test_mixed_quotes_and_encoding(self):
|
||||
"""Test filename with mixed quotes and percent encoding."""
|
||||
result = _extract_filename(
|
||||
result = extract_filename(
|
||||
"http://example.com/", 'attachment; filename="file%20with%20quotes%20%26%20encoding.txt"'
|
||||
)
|
||||
assert result == "file with quotes & encoding.txt"
|
||||
|
||||
Reference in New Issue
Block a user