refactor webhook service

This commit is contained in:
hjlarry
2025-10-01 12:46:42 +08:00
parent 9114881623
commit 604651873e
3 changed files with 340 additions and 734 deletions

View File

@ -82,15 +82,34 @@ class WebhookService:
return webhook_trigger, workflow, node_config
@classmethod
def extract_and_validate_webhook_data(
    cls, webhook_trigger: WorkflowWebhookTrigger, node_config: Mapping[str, Any]
) -> dict[str, Any]:
    """Extract the incoming webhook request and validate it against the node configuration.

    Args:
        webhook_trigger: Trigger record handed to ``extract_webhook_data``.
        node_config: Node configuration; its ``"data"`` entry (default ``{}``) holds the
            settings used for validation and processing.

    Returns:
        The result of ``_process_and_validate_data`` for the extracted request.

    Raises:
        ValueError: With the reported ``"error"`` text when the HTTP metadata check
            does not come back valid.
    """
    # Pull the raw request data before consulting the node settings.
    payload = cls.extract_webhook_data(webhook_trigger)
    settings = node_config.get("data", {})

    # Method / content-type problems are rejected before any per-parameter work.
    metadata_check = cls._validate_http_metadata(payload, settings)
    if metadata_check["valid"]:
        return cls._process_and_validate_data(payload, settings)
    raise ValueError(metadata_check["error"])
@classmethod
def extract_webhook_data(cls, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]:
"""Extract and process data from incoming webhook request."""
"""Extract raw data from incoming webhook request without type conversion."""
cls._validate_content_length()
data = {
"method": request.method,
"headers": dict(request.headers),
"query_params": cls._extract_query_params(),
"query_params": dict(request.args),
"body": {},
"files": {},
}
@ -121,41 +140,25 @@ class WebhookService:
return data
# NOTE(review): this span reads like a diff rendered without +/- markers and without
# indentation: the lines of three methods (_extract_query_params, _process_and_validate_data,
# _convert_query_param_value) are interleaved. Untangle against the real file before
# changing any code here. The "(method name)" tags below mark which method a run of
# lines belongs to, judged from the variables each line uses.
@classmethod
# (_extract_query_params) returns {} when the request carries no query string.
def _extract_query_params(cls) -> dict[str, Any]:
"""Extract query parameters preserving multi-value entries."""
if not request.args:
return {}
# (_process_and_validate_data) works on a shallow copy of raw_data; the "query_params"
# and "body" entries of that copy are replaced further down and the copy is returned.
def _process_and_validate_data(cls, raw_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
"""Process and validate webhook data according to node configuration."""
result = raw_data.copy()
# (_extract_query_params) every query value is passed through _convert_query_param_value.
query_params: dict[str, Any] = {}
for key, value in request.args.items():
query_params[key] = cls._convert_query_param_value(value)
# (_process_and_validate_data) Validate and process headers
cls._validate_required_headers(raw_data["headers"], node_data.get("headers", []))
# (_extract_query_params) final statement of that method.
return query_params
# (_process_and_validate_data) Process query parameters with type conversion and validation.
# is_form_data=True selects the string-conversion path of _process_parameters.
result["query_params"] = cls._process_parameters(
raw_data["query_params"], node_data.get("params", []), is_form_data=True
)
# (_convert_query_param_value) maps "true"/"false"/"yes"/"no" (case-insensitive) to booleans;
# the rest of its body continues at "if lower_value in bool_map" below.
@classmethod
def _convert_query_param_value(cls, value: str) -> Any:
"""Convert query parameter strings to numbers or booleans when applicable."""
lower_value = value.lower()
bool_map = {
"true": True,
"false": False,
"yes": True,
"no": False,
}
# (_process_and_validate_data) Process body parameters based on content type;
# the configured content type defaults to "application/json" and is lower-cased.
configured_content_type = node_data.get("content_type", "application/json").lower()
result["body"] = cls._process_body_parameters(
raw_data["body"], node_data.get("body", []), configured_content_type
)
# (_convert_query_param_value) booleans first, then numbers (integral floats become int),
# otherwise the original string is returned unchanged.
if lower_value in bool_map:
return bool_map[lower_value]
if cls._can_convert_to_number(value):
try:
numeric_value = float(value)
if numeric_value.is_integer():
return int(numeric_value)
return numeric_value
except ValueError:
return value
return value
# (_process_and_validate_data) final statement: the copy with processed query_params and body.
return result
@classmethod
def _validate_content_length(cls) -> None:
@ -260,26 +263,156 @@ class WebhookService:
)
# NOTE(review): this span reads like a diff rendered without +/- markers and without
# indentation: the lines of validate_webhook_request and _process_parameters are
# interleaved. Untangle against the real file before changing any code here. The
# "(method name)" tags below mark which method a run of lines belongs to, judged from
# the variables each line uses.
@classmethod
# (validate_webhook_request) reports problems as {"valid": False, "error": ...} dicts
# rather than raising.
def validate_webhook_request(cls, webhook_data: dict[str, Any], node_config: Mapping[str, Any]) -> dict[str, Any]:
"""Validate webhook request against node configuration."""
if node_config is None:
return cls._validation_error("Validation failed: Invalid node configuration")
# (_process_parameters) raises ValueError on a missing required parameter; values of
# configured parameters go through _validate_and_convert_value.
def _process_parameters(
cls, raw_params: dict[str, str], param_configs: list, is_form_data: bool = False
) -> dict[str, Any]:
"""Process parameters with unified validation and type conversion."""
processed = {}
# Keyed by configured name; used below only to tell configured from unconfigured keys.
configured_params = {config.get("name", ""): config for config in param_configs}
# (validate_webhook_request) node settings live under the "data" key.
node_data = node_config.get("data", {})
# (_process_parameters) Process configured parameters
for param_config in param_configs:
name = param_config.get("name", "")
param_type = param_config.get("type", SegmentType.STRING)
required = param_config.get("required", False)
# (validate_webhook_request) Early validation of HTTP method and content-type
validation_result = cls._validate_http_metadata(webhook_data, node_data)
if not validation_result["valid"]:
return validation_result
# (_process_parameters) Check required parameters
if required and name not in raw_params:
raise ValueError(f"Required parameter missing: {name}")
# (validate_webhook_request) Validate headers and query params
validation_result = cls._validate_headers_and_params(webhook_data, node_data)
if not validation_result["valid"]:
return validation_result
# (_process_parameters) convert/validate the value of a configured parameter that is present.
if name in raw_params:
raw_value = raw_params[name]
processed[name] = cls._validate_and_convert_value(name, raw_value, param_type, is_form_data)
# (validate_webhook_request) Validate body based on content type
configured_content_type = node_data.get("content_type", "application/json").lower()
return cls._validate_body_by_content_type(webhook_data, node_data, configured_content_type)
# (_process_parameters) Include unconfigured parameters unchanged
# (strings, per the raw_params annotation).
for name, value in raw_params.items():
if name not in configured_params:
processed[name] = value
return processed
@classmethod
def _process_body_parameters(
    cls, raw_body: dict[str, Any], body_configs: list, content_type: str
) -> dict[str, Any]:
    """Validate and convert the request body according to the configured content type.

    Args:
        raw_body: Extracted body; for text/plain and application/octet-stream the
            payload is read from its ``"raw"`` entry.
        body_configs: Body parameter definitions (``name``, ``type``, ``required``).
        content_type: Content type configured on the node.

    Returns:
        ``raw_body`` itself for text/plain and application/octet-stream; otherwise a new
        dict holding the configured parameters after validation/conversion, followed by
        every unconfigured key copied as-is.

    Raises:
        ValueError: If required raw content or a required parameter is missing, or a
            value is rejected by ``_validate_and_convert_value``.
    """
    if content_type in ["text/plain", "application/octet-stream"]:
        # Unstructured payloads: the only possible check is "is there any content".
        content_required = body_configs and any(entry.get("required", False) for entry in body_configs)
        if content_required and not raw_body.get("raw"):
            raise ValueError(f"Required body content missing for {content_type} request")
        return raw_body

    # Structured payloads (JSON, form-data, ...).
    declared_names = {entry.get("name", "") for entry in body_configs}
    # Form encodings deliver strings that need conversion; other payloads are only type-checked.
    values_are_strings = content_type in ["application/x-www-form-urlencoded", "multipart/form-data"]
    converted = {}

    for entry in body_configs:
        field = entry.get("name", "")
        expected_type = entry.get("type", SegmentType.STRING)
        mandatory = entry.get("required", False)

        # File parameters of multipart requests are not processed here
        # (file validation is handled separately in extract phase).
        if expected_type == SegmentType.FILE and content_type == "multipart/form-data":
            continue

        present = field in raw_body
        if mandatory and not present:
            raise ValueError(f"Required body parameter missing: {field}")
        if present:
            converted[field] = cls._validate_and_convert_value(
                field, raw_body[field], expected_type, values_are_strings
            )

    # Anything the node did not declare is passed through untouched.
    converted.update((key, item) for key, item in raw_body.items() if key not in declared_names)
    return converted
@classmethod
def _validate_and_convert_value(cls, param_name: str, value: Any, param_type: str, is_form_data: bool) -> Any:
    """Validate one parameter value, converting it first when it came from form data.

    Args:
        param_name: Name of the parameter; used in error messages.
        value: The raw value taken from the request.
        param_type: Declared type of the parameter.
        is_form_data: ``True`` when the value is a form/query string that must be
            converted (``_convert_form_value``); ``False`` when it is only checked
            against the declared type (``_validate_json_value``).

    Returns:
        The converted value (form data) or the original value (validated only).

    Raises:
        ValueError: For any failure raised by the helper; the message is prefixed with
            the parameter name and the original exception is attached as ``__cause__``.
    """
    try:
        if is_form_data:
            # Form data comes as strings and needs conversion
            return cls._convert_form_value(param_name, value, param_type)
        # JSON data should already be in correct types, just validate
        return cls._validate_json_value(param_name, value, param_type)
    except Exception as e:
        # Every failure (including e.g. AttributeError from a non-string form value) is
        # reported uniformly as ValueError; chain it so the root cause stays in the traceback.
        raise ValueError(f"Parameter '{param_name}' validation failed: {str(e)}") from e
@classmethod
def _convert_form_value(cls, param_name: str, value: str, param_type: str) -> Any:
    """Convert a form/query string to the declared parameter type.

    Args:
        param_name: Name of the parameter; used in error messages.
        value: The raw string value.
        param_type: Declared type; string, number and boolean are supported.

    Returns:
        The string unchanged, an ``int``/``float`` for numbers (integral values become
        ``int``), or a ``bool`` for booleans.

    Raises:
        ValueError: If the value cannot be converted or the type is not supported for
            form data.
    """
    if param_type == SegmentType.STRING:
        return value

    if param_type == SegmentType.NUMBER:
        if not cls._can_convert_to_number(value):
            raise ValueError(f"Cannot convert '{value}' to number")
        if isinstance(value, str):
            try:
                # Parse integer literals directly: a round trip through float silently
                # corrupts integers beyond 2**53 (e.g. 64-bit identifiers).
                return int(value)
            except ValueError:
                pass  # not a plain integer literal ("1.5", "1e3", ...); use float parsing
        numeric_value = float(value)
        return int(numeric_value) if numeric_value.is_integer() else numeric_value

    if param_type == SegmentType.BOOLEAN:
        lower_value = value.lower()
        bool_map = {"true": True, "false": False, "1": True, "0": False, "yes": True, "no": False}
        if lower_value not in bool_map:
            raise ValueError(f"Cannot convert '{value}' to boolean")
        return bool_map[lower_value]

    raise ValueError(f"Unsupported type '{param_type}' for form data parameter '{param_name}'")
@classmethod
def _validate_json_value(cls, param_name: str, value: Any, param_type: str) -> Any:
    """Check that an already-decoded JSON value matches the declared parameter type.

    Args:
        param_name: Name of the parameter; used in the warning log.
        value: The decoded value to check.
        param_type: Declared type; it is passed to ``SegmentType(...)`` for lookup.

    Returns:
        ``value`` unchanged when it matches, or when the type has no validator here
        (a warning is logged in that case).

    Raises:
        ValueError: If the value does not match the declared type.
        NOTE(review): ``SegmentType(param_type)`` presumably raises for a string that is
        not a member value, so such types fail rather than reach the warning - confirm.
    """

    def is_number(v: Any) -> bool:
        # bool is a subclass of int in Python, but JSON true/false are not numbers.
        return isinstance(v, (int, float)) and not isinstance(v, bool)

    type_validators = {
        SegmentType.STRING: (lambda v: isinstance(v, str), "string"),
        SegmentType.NUMBER: (is_number, "number"),
        SegmentType.BOOLEAN: (lambda v: isinstance(v, bool), "boolean"),
        SegmentType.OBJECT: (lambda v: isinstance(v, dict), "object"),
        SegmentType.ARRAY_STRING: (
            lambda v: isinstance(v, list) and all(isinstance(item, str) for item in v),
            "array of strings",
        ),
        SegmentType.ARRAY_NUMBER: (
            lambda v: isinstance(v, list) and all(is_number(item) for item in v),
            "array of numbers",
        ),
        SegmentType.ARRAY_BOOLEAN: (
            lambda v: isinstance(v, list) and all(isinstance(item, bool) for item in v),
            "array of booleans",
        ),
        SegmentType.ARRAY_OBJECT: (
            lambda v: isinstance(v, list) and all(isinstance(item, dict) for item in v),
            "array of objects",
        ),
    }

    validator_info = type_validators.get(SegmentType(param_type))
    if not validator_info:
        # A known SegmentType without a JSON validator is let through, but made visible.
        logger.warning("Unknown parameter type: %s for parameter %s", param_type, param_name)
        return value

    validator, expected_type = validator_info
    if not validator(value):
        actual_type = type(value).__name__
        raise ValueError(f"Expected {expected_type}, got {actual_type}")
    return value
@classmethod
def _validate_required_headers(cls, headers: dict[str, Any], header_configs: list) -> None:
    """Ensure every header marked as required is present, ignoring case.

    Args:
        headers: Request headers keyed by header name.
        header_configs: Header definitions; only entries whose ``required`` flag is
            truthy are checked, by their ``name``.

    Raises:
        ValueError: For the first required header that is absent.
    """
    # Compare lower-cased names on both sides so the check is case-insensitive.
    received = {key.lower() for key in headers}
    for spec in header_configs:
        if not spec.get("required", False):
            continue
        wanted = spec.get("name", "")
        if wanted.lower() in received:
            continue
        raise ValueError(f"Required header missing: {wanted}")
@classmethod
def _validate_http_metadata(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
@ -310,259 +443,11 @@ class WebhookService:
# Extract the main content type (ignore parameters like boundary)
return content_type.split(";")[0].strip()
@classmethod
def _validate_headers_and_params(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
    """Check required headers and the presence/type of configured query parameters.

    Args:
        webhook_data: Extracted request data; ``"headers"`` must be present,
            ``"query_params"`` defaults to ``{}``.
        node_data: Node settings with optional ``"headers"`` and ``"params"`` lists.

    Returns:
        ``{"valid": True}`` when everything passes, otherwise the first validation
        error dict encountered.
    """
    # Header names are compared case-insensitively.
    received_headers = {key.lower() for key in webhook_data["headers"]}
    for spec in node_data.get("headers", []):
        if not spec.get("required", False):
            continue
        wanted = spec.get("name", "")
        if wanted.lower() not in received_headers:
            return cls._validation_error(f"Required header missing: {wanted}")

    received_params = webhook_data.get("query_params", {})
    for spec in node_data.get("params", []):
        key = spec.get("name", "")
        declared_type = spec.get("type", SegmentType.STRING)
        mandatory = spec.get("required", False)

        if key not in received_params:
            if mandatory:
                return cls._validation_error(f"Required query parameter missing: {key}")
            continue
        # Strings need no further check; other types must be convertible.
        if declared_type == SegmentType.STRING:
            continue
        outcome = cls._validate_form_parameter_type(key, received_params[key], declared_type)
        if not outcome["valid"]:
            return outcome

    return {"valid": True}
@classmethod
def _validate_body_by_content_type(
    cls, webhook_data: dict[str, Any], node_data: dict[str, Any], content_type: str
) -> dict[str, Any]:
    """Dispatch body validation to the validator registered for ``content_type``.

    Args:
        webhook_data: Extracted request data.
        node_data: Node settings forwarded to the selected validator.
        content_type: Configured content type used as the dispatch key.

    Returns:
        Whatever the selected validator returns.

    Raises:
        ValueError: If no validator exists for ``content_type``.
    """
    handler_names = {
        "text/plain": "_validate_text_plain_body",
        "application/octet-stream": "_validate_octet_stream_body",
        "application/json": "_validate_json_body",
        "application/x-www-form-urlencoded": "_validate_form_urlencoded_body",
        "multipart/form-data": "_validate_multipart_body",
    }

    handler_name = handler_names.get(content_type)
    if handler_name is None:
        raise ValueError(f"Unsupported Content-Type for validation: {content_type}")

    handler = getattr(cls, handler_name)
    return handler(webhook_data, node_data)
@classmethod
def _validate_text_plain_body(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
    """Check that a text/plain request carries content when the node requires a body.

    Args:
        webhook_data: Extracted request data; the text is read from ``body["raw"]``.
        node_data: Node settings; the body is required when any ``"body"`` entry has a
            truthy ``required`` flag.

    Returns:
        ``{"valid": True}``, or a validation error dict when required content is empty
        or not a ``str``.
    """
    specs = node_data.get("body", [])
    if not (specs and any(spec.get("required", False) for spec in specs)):
        return {"valid": True}

    content = webhook_data.get("body", {}).get("raw", "")
    if content and isinstance(content, str):
        return {"valid": True}
    return cls._validation_error("Required body content missing for text/plain request")
@classmethod
def _validate_octet_stream_body(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
    """Check that an application/octet-stream request carries bytes when a body is required.

    Args:
        webhook_data: Extracted request data; the payload is read from ``body["raw"]``.
        node_data: Node settings; the body is required when any ``"body"`` entry has a
            truthy ``required`` flag.

    Returns:
        ``{"valid": True}``, or a validation error dict when required content is empty
        or not ``bytes``.
    """
    specs = node_data.get("body", [])
    if not (specs and any(spec.get("required", False) for spec in specs)):
        return {"valid": True}

    content = webhook_data.get("body", {}).get("raw", "")
    if content and isinstance(content, bytes):
        return {"valid": True}
    return cls._validation_error("Required body content missing for application/octet-stream request")
@classmethod
def _validate_json_body(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
    """Check presence and JSON type of every configured body parameter.

    Args:
        webhook_data: Extracted request data; parameters are read from ``"body"``.
        node_data: Node settings with the ``"body"`` parameter definitions.

    Returns:
        ``{"valid": True}``, or the first validation error dict encountered.
    """
    specs = node_data.get("body", [])
    received = webhook_data.get("body", {})

    for spec in specs:
        key = spec.get("name", "")
        declared_type = spec.get("type", SegmentType.STRING)
        mandatory = spec.get("required", False)

        if key not in received:
            if mandatory:
                return cls._validation_error(f"Required body parameter missing: {key}")
            continue
        # Present values are always type-checked, strings included.
        outcome = cls._validate_json_parameter_type(key, received[key], declared_type)
        if not outcome["valid"]:
            return outcome

    return {"valid": True}
@classmethod
def _validate_form_urlencoded_body(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
    """Check presence and convertibility of configured form-urlencoded body parameters.

    Args:
        webhook_data: Extracted request data; parameters are read from ``"body"``.
        node_data: Node settings with the ``"body"`` parameter definitions.

    Returns:
        ``{"valid": True}``, or the first validation error dict encountered.
    """
    specs = node_data.get("body", [])
    received = webhook_data.get("body", {})

    for spec in specs:
        key = spec.get("name", "")
        declared_type = spec.get("type", SegmentType.STRING)
        mandatory = spec.get("required", False)

        if key not in received:
            if mandatory:
                return cls._validation_error(f"Required body parameter missing: {key}")
            continue
        # Form values arrive as strings, so a declared string needs no check.
        if declared_type == SegmentType.STRING:
            continue
        outcome = cls._validate_form_parameter_type(key, received[key], declared_type)
        if not outcome["valid"]:
            return outcome

    return {"valid": True}
@classmethod
def _validate_multipart_body(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
    """Check configured multipart/form-data parameters, including uploaded files.

    Args:
        webhook_data: Extracted request data; fields are read from ``"body"`` and
            uploads from ``"files"``.
        node_data: Node settings with the ``"body"`` parameter definitions.

    Returns:
        ``{"valid": True}``, or the first validation error dict encountered.
    """
    specs = node_data.get("body", [])
    received = webhook_data.get("body", {})

    for spec in specs:
        key = spec.get("name", "")
        declared_type = spec.get("type", SegmentType.STRING)
        mandatory = spec.get("required", False)

        if declared_type == SegmentType.FILE:
            # File parameters live in the "files" mapping, not in the form fields.
            uploaded = webhook_data.get("files", {}).get(key)
            if mandatory and not uploaded:
                return cls._validation_error(f"Required file parameter missing: {key}")
            continue

        if key not in received:
            if mandatory:
                return cls._validation_error(f"Required body parameter missing: {key}")
            continue
        # Form values arrive as strings, so a declared string needs no check.
        if declared_type == SegmentType.STRING:
            continue
        outcome = cls._validate_form_parameter_type(key, received[key], declared_type)
        if not outcome["valid"]:
            return outcome

    return {"valid": True}
@classmethod
def _validation_error(cls, error_message: str) -> dict[str, Any]:
    """Build the failure result used by the validators.

    Args:
        error_message: Human-readable reason for the failure.

    Returns:
        A dict with ``"valid"`` set to ``False`` and ``"error"`` set to the message.
    """
    failure: dict[str, Any] = {"valid": False}
    failure["error"] = error_message
    return failure
@classmethod
def _validate_json_parameter_type(cls, param_name: str, param_value: Any, param_type: str) -> dict[str, Any]:
    """Check a decoded JSON value against its declared type and report the result as a dict.

    Args:
        param_name: Name of the parameter; used in messages and logs.
        param_value: The decoded value to check.
        param_type: Declared type; it is passed to ``SegmentType(...)`` for lookup.

    Returns:
        ``{"valid": True}`` on a match (or when the type has no validator, after a
        warning), otherwise ``{"valid": False, "error": ...}``. Any exception raised
        while checking is logged and reported as a generic failure dict.
    """
    try:

        def instance_of(*types):
            return lambda v: isinstance(v, types)

        def list_of(*types):
            return lambda v: isinstance(v, list) and all(isinstance(item, types) for item in v)

        checks = {
            SegmentType.STRING: (instance_of(str), "string"),
            SegmentType.NUMBER: (instance_of(int, float), "number"),
            SegmentType.BOOLEAN: (instance_of(bool), "boolean"),
            SegmentType.OBJECT: (instance_of(dict), "object"),
            SegmentType.ARRAY_STRING: (list_of(str), "array of strings"),
            SegmentType.ARRAY_NUMBER: (list_of(int, float), "array of numbers"),
            SegmentType.ARRAY_BOOLEAN: (list_of(bool), "array of booleans"),
            SegmentType.ARRAY_OBJECT: (list_of(dict), "array of objects"),
        }

        entry = checks.get(SegmentType(param_type))
        if not entry:
            logger.warning("Unknown parameter type: %s for parameter %s", param_type, param_name)
            return {"valid": True}

        accepts, expected_type = entry
        if accepts(param_value):
            return {"valid": True}

        # Build the message: array types distinguish "not a list at all" from
        # "a list with wrong items"; scalar types pick the article by expected type.
        actual_type = type(param_value).__name__
        if param_type.startswith("array"):
            if isinstance(param_value, list):
                error_msg = f"Parameter '{param_name}' must be an {expected_type}"
            else:
                error_msg = f"Parameter '{param_name}' must be an array, got {actual_type}"
        elif expected_type in ["string", "number", "boolean"]:
            error_msg = f"Parameter '{param_name}' must be a {expected_type}, got {actual_type}"
        else:
            error_msg = f"Parameter '{param_name}' must be an {expected_type}, got {actual_type}"
        return {"valid": False, "error": error_msg}

    except Exception:
        logger.exception("Type validation error for parameter %s", param_name)
        return {"valid": False, "error": f"Type validation failed for parameter '{param_name}'"}
@classmethod
def _validate_form_parameter_type(cls, param_name: str, param_value: str, param_type: str) -> dict[str, Any]:
    """Check that a form string can be interpreted as its declared type.

    Form values are always strings; this only verifies that they could be converted.

    Args:
        param_name: Name of the parameter; used in messages and logs.
        param_value: The raw form string.
        param_type: Declared type; string, number and boolean are supported.

    Returns:
        ``{"valid": True}`` when acceptable, otherwise ``{"valid": False, "error": ...}``.
        Any exception raised while checking is logged and reported as a generic
        failure dict.
    """
    try:
        kind = SegmentType(param_type)

        if kind == SegmentType.STRING:
            # String is always valid
            acceptable, expected_format = True, None
        elif kind == SegmentType.NUMBER:
            acceptable, expected_format = cls._can_convert_to_number(param_value), "a valid number"
        elif kind == SegmentType.BOOLEAN:
            acceptable = param_value.lower() in ["true", "false", "1", "0", "yes", "no"]
            expected_format = "a boolean value"
        else:
            # Unsupported type for form data
            return {
                "valid": False,
                "error": f"Parameter '{param_name}' type '{param_type}' is not supported for form data.",
            }

        if acceptable:
            return {"valid": True}
        return {
            "valid": False,
            "error": f"Parameter '{param_name}' must be {expected_format}, got '{param_value}'",
        }

    except Exception:
        logger.exception("Form type validation error for parameter %s", param_name)
        return {"valid": False, "error": f"Form type validation failed for parameter '{param_name}'"}
@classmethod
def _can_convert_to_number(cls, value: str) -> bool:
"""Check if a string can be converted to a number."""