Implement a ratelimit for Web App Form endpoints (vibe-kanban 033e0d0d)

Prevent adversaries from brute-frocing the form token.
This commit is contained in:
QuantumGhost
2026-01-27 07:48:00 +08:00
parent b59713b980
commit 9bbe63c1d8
4 changed files with 120 additions and 2 deletions

View File

@ -14,6 +14,7 @@ from werkzeug.exceptions import Forbidden
import controllers.web.human_input_form as human_input_module
import controllers.web.site as site_module
from controllers.web.error import WebFormRateLimitExceededError
from models.human_input import RecipientType
from services.human_input_service import FormExpiredError
@ -84,6 +85,10 @@ def test_get_form_includes_site(monkeypatch: pytest.MonkeyPatch, app: Flask):
return _FakeDefinition()
form = _FakeForm(expiration_time)
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = False
monkeypatch.setattr(human_input_module, "_FORM_ACCESS_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
tenant = SimpleNamespace(
id="tenant-1",
@ -172,6 +177,8 @@ def test_get_form_includes_site(monkeypatch: pytest.MonkeyPatch, app: Flask):
},
}
service_mock.get_form_by_token.assert_called_once_with("token-1")
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_called_once_with("203.0.113.10")
def test_get_form_allows_backstage_token(monkeypatch: pytest.MonkeyPatch, app: Flask):
@ -200,6 +207,10 @@ def test_get_form_allows_backstage_token(monkeypatch: pytest.MonkeyPatch, app: F
return _FakeDefinition()
form = _FakeForm(expiration_time)
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = False
monkeypatch.setattr(human_input_module, "_FORM_ACCESS_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
tenant = SimpleNamespace(
id="tenant-1",
status=TenantStatus.NORMAL,
@ -285,6 +296,8 @@ def test_get_form_allows_backstage_token(monkeypatch: pytest.MonkeyPatch, app: F
},
}
service_mock.get_form_by_token.assert_called_once_with("token-1")
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_called_once_with("203.0.113.10")
def test_get_form_raises_forbidden_when_site_missing(monkeypatch: pytest.MonkeyPatch, app: Flask):
@ -313,6 +326,10 @@ def test_get_form_raises_forbidden_when_site_missing(monkeypatch: pytest.MonkeyP
return _FakeDefinition()
form = _FakeForm(expiration_time)
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = False
monkeypatch.setattr(human_input_module, "_FORM_ACCESS_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
tenant = SimpleNamespace(status=TenantStatus.NORMAL)
app_model = SimpleNamespace(id="app-1", tenant_id="tenant-1", tenant=tenant)
workflow_run = SimpleNamespace(app_id="app-1")
@ -327,6 +344,8 @@ def test_get_form_raises_forbidden_when_site_missing(monkeypatch: pytest.MonkeyP
with app.test_request_context("/api/form/human_input/token-1", method="GET"):
with pytest.raises(Forbidden):
HumanInputFormApi().get("token-1")
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_called_once_with("203.0.113.10")
def test_submit_form_accepts_backstage_token(monkeypatch: pytest.MonkeyPatch, app: Flask):
@ -336,6 +355,10 @@ def test_submit_form_accepts_backstage_token(monkeypatch: pytest.MonkeyPatch, ap
recipient_type = RecipientType.BACKSTAGE
form = _FakeForm()
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = False
monkeypatch.setattr(human_input_module, "_FORM_SUBMIT_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
service_mock = MagicMock()
service_mock.get_form_by_token.return_value = form
monkeypatch.setattr(human_input_module, "HumanInputService", lambda engine: service_mock)
@ -357,6 +380,56 @@ def test_submit_form_accepts_backstage_token(monkeypatch: pytest.MonkeyPatch, ap
form_data={"content": "ok"},
submission_end_user_id=None,
)
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_called_once_with("203.0.113.10")
def test_submit_form_rate_limited(monkeypatch: pytest.MonkeyPatch, app: Flask):
"""POST rejects submissions when rate limit is exceeded."""
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = True
monkeypatch.setattr(human_input_module, "_FORM_SUBMIT_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
service_mock = MagicMock()
service_mock.get_form_by_token.return_value = None
monkeypatch.setattr(human_input_module, "HumanInputService", lambda engine: service_mock)
monkeypatch.setattr(human_input_module, "db", _FakeDB(_FakeSession({})))
with app.test_request_context(
"/api/form/human_input/token-1",
method="POST",
json={"inputs": {"content": "ok"}, "action": "approve"},
):
with pytest.raises(WebFormRateLimitExceededError):
HumanInputFormApi().post("token-1")
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_not_called()
service_mock.get_form_by_token.assert_not_called()
def test_get_form_rate_limited(monkeypatch: pytest.MonkeyPatch, app: Flask):
"""GET rejects requests when rate limit is exceeded."""
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = True
monkeypatch.setattr(human_input_module, "_FORM_ACCESS_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
service_mock = MagicMock()
service_mock.get_form_by_token.return_value = None
monkeypatch.setattr(human_input_module, "HumanInputService", lambda engine: service_mock)
monkeypatch.setattr(human_input_module, "db", _FakeDB(_FakeSession({})))
with app.test_request_context("/api/form/human_input/token-1", method="GET"):
with pytest.raises(WebFormRateLimitExceededError):
HumanInputFormApi().get("token-1")
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_not_called()
service_mock.get_form_by_token.assert_not_called()
def test_get_form_raises_expired(monkeypatch: pytest.MonkeyPatch, app: Flask):
@ -364,6 +437,10 @@ def test_get_form_raises_expired(monkeypatch: pytest.MonkeyPatch, app: Flask):
pass
form = _FakeForm()
limiter_mock = MagicMock()
limiter_mock.is_rate_limited.return_value = False
monkeypatch.setattr(human_input_module, "_FORM_ACCESS_RATE_LIMITER", limiter_mock)
monkeypatch.setattr(human_input_module, "extract_remote_ip", lambda req: "203.0.113.10")
service_mock = MagicMock()
service_mock.get_form_by_token.return_value = form
service_mock.ensure_form_active.side_effect = FormExpiredError("form-id")
@ -375,3 +452,5 @@ def test_get_form_raises_expired(monkeypatch: pytest.MonkeyPatch, app: Flask):
HumanInputFormApi().get("token-1")
service_mock.ensure_form_active.assert_called_once_with(form)
limiter_mock.is_rate_limited.assert_called_once_with("203.0.113.10")
limiter_mock.increment_rate_limit.assert_called_once_with("203.0.113.10")