How We Audited a High-Traffic Python Enterprise Stack on OVH and Mitigated Server-Side Request Forgery (SSRF) in Webhook Parsers
Auditing a High-Traffic Python Stack on OVH: A Deep Dive into SSRF Mitigation
This post details a recent security audit of a high-traffic Python enterprise application hosted on OVH infrastructure. The primary focus was identifying and mitigating Server-Side Request Forgery (SSRF) vulnerabilities, particularly within webhook parsing mechanisms. We’ll cover the diagnostic process, specific code vulnerabilities, and the implemented remediation strategies, providing actionable insights for CTOs and VPs of Engineering managing similar stacks.
Environment and Initial Assessment
The target application is a complex microservices architecture built primarily in Python (Flask and Django) with a PostgreSQL backend, running on a fleet of OVH dedicated servers. Traffic volume averages several thousand requests per second, with significant bursts during peak hours. The application integrates with numerous third-party services via webhooks, which are a common vector for SSRF attacks.
Our initial assessment involved:
- Review of network topology and firewall rules (OVH’s native firewall and server-level `iptables`).
- Analysis of ingress/egress traffic patterns using server logs and network monitoring tools.
- Code review of webhook processing endpoints and any external resource fetching logic.
- Penetration testing focused on manipulating webhook payloads to trigger SSRF.
Identifying SSRF Vulnerabilities in Webhook Parsers
Webhooks are designed to be initiated by external services, sending data to our application. If the application then uses a URL provided within that webhook payload to fetch additional resources without proper validation, an attacker can trick the server into making requests to arbitrary internal or external resources. This is the core of SSRF.
A common pattern we observed was a webhook handler that received a URL and then used it to fetch data for processing. For example, a hypothetical webhook might look like this JSON payload:
{
    "event_type": "user_update",
    "payload": {
        "user_id": "12345",
        "avatar_url": "https://external-service.com/avatars/user12345.jpg",
        "metadata_source": "https://internal-metadata-service.local/users/12345/details"
    }
}
The vulnerable Python code might then attempt to fetch the avatar or metadata like this:
import requests
import logging

def process_webhook(request_data):
    user_id = request_data.get('payload', {}).get('user_id')
    avatar_url = request_data.get('payload', {}).get('avatar_url')
    metadata_source = request_data.get('payload', {}).get('metadata_source')

    if avatar_url:
        try:
            # Vulnerable: Directly using user-supplied URL
            response = requests.get(avatar_url, timeout=5)
            if response.status_code == 200:
                # Process avatar data
                logging.info(f"Successfully fetched avatar for user {user_id}")
            else:
                logging.warning(f"Failed to fetch avatar for user {user_id}: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching avatar for user {user_id}: {e}")

    if metadata_source:
        try:
            # Another vulnerable point
            response = requests.get(metadata_source, timeout=5)
            if response.status_code == 200:
                # Process metadata
                logging.info(f"Successfully fetched metadata for user {user_id}")
            else:
                logging.warning(f"Failed to fetch metadata for user {user_id}: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching metadata for user {user_id}: {e}")

    # ... rest of processing
An attacker could craft a payload in which `metadata_source` points to an internal service, such as a database or another internal API. On cloud platforms the classic target is the instance metadata endpoint, `http://169.254.169.254/latest/meta-data/`. That endpoint is not directly applicable to OVH’s bare-metal servers and VMs, but the principle of targeting internal IPs holds: for instance, `http://localhost:5432` could be used to probe the PostgreSQL database.
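To make the targets above concrete, here is a small sketch that classifies the host a URL actually points at, using only the standard library. The function name `classify_target` and its return labels are assumptions made for illustration. It also shows why the hostname must be extracted by a URL parser rather than by string matching: in `http://external-service.com@10.0.0.5/`, the part before `@` is userinfo and the real host is `10.0.0.5`.

```python
import ipaddress
from urllib.parse import urlsplit


def classify_target(url):
    """Label the host a URL points at: loopback, link-local, private,
    public, hostname (needs DNS resolution to judge), or invalid."""
    host = urlsplit(url).hostname
    if host is None:
        return "invalid"
    if host == "localhost" or host.endswith(".localhost"):
        return "loopback"
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: it cannot be judged without resolving it.
        return "hostname"
    if addr.is_loopback:
        return "loopback"
    if addr.is_link_local:
        return "link-local"
    if addr.is_private:
        return "private"
    return "public"
```

A result of `hostname` is not a verdict of safety: a name an attacker controls can resolve to any internal address, so names still need an allowlist or a resolution check.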
Exploitation Scenarios and Impact
The impact of a successful SSRF attack can range from information disclosure to remote code execution, depending on the internal services accessible from the compromised server. Common exploitation scenarios include:
- Information Disclosure: Accessing internal configuration files, credentials, or sensitive data exposed by internal services.
- Port Scanning: Probing internal network ports to discover running services and their vulnerabilities.
- Bypassing Firewalls: Making requests to internal services that are not directly exposed to the internet but are accessible from the web server.
- Attacking Other Internal Services: Using the compromised server as a pivot point to attack other systems within the internal network.
Mitigation Strategy: Defense in Depth
Mitigating SSRF requires a multi-layered approach, focusing on validating all user-supplied input that is used in network requests. We implemented the following strategies:
1. Strict URL Validation and Whitelisting
The most effective defense is to only allow requests to known, trusted domains or IP addresses. This can be achieved by maintaining an explicit whitelist of allowed domains/IPs for webhook data fetching.
import requests
import logging
from urllib.parse import urlparse

ALLOWED_DOMAINS = {
    "external-service.com",
    "cdn.example.com"
}
# In a real-world scenario, this list would be dynamically managed
# and potentially loaded from a secure configuration store.

def is_url_allowed(url):
    try:
        parsed_url = urlparse(url)
    except ValueError as e:
        logging.error(f"Error parsing URL {url}: {e}")
        return False
    # Check if scheme is http or https
    if parsed_url.scheme not in ('http', 'https'):
        return False
    # Compare the hostname only: netloc also carries any userinfo and port,
    # so a URL such as "https://external-service.com:443/" would never match
    return parsed_url.hostname in ALLOWED_DOMAINS

def process_webhook_secure(request_data):
    user_id = request_data.get('payload', {}).get('user_id')
    avatar_url = request_data.get('payload', {}).get('avatar_url')
    metadata_source = request_data.get('payload', {}).get('metadata_source')

    if avatar_url and is_url_allowed(avatar_url):
        try:
            # Redirects are disabled so an allowed host cannot bounce the
            # request to a destination that was never validated
            response = requests.get(avatar_url, timeout=5, allow_redirects=False)
            if response.status_code == 200:
                logging.info(f"Successfully fetched avatar for user {user_id}")
            else:
                logging.warning(f"Failed to fetch avatar for user {user_id}: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching avatar for user {user_id}: {e}")
    elif avatar_url:
        logging.warning(f"Blocked unauthorized URL for avatar fetch: {avatar_url} for user {user_id}")

    if metadata_source and is_url_allowed(metadata_source):
        try:
            response = requests.get(metadata_source, timeout=5, allow_redirects=False)
            if response.status_code == 200:
                logging.info(f"Successfully fetched metadata for user {user_id}")
            else:
                logging.warning(f"Failed to fetch metadata for user {user_id}: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching metadata for user {user_id}: {e}")
    elif metadata_source:
        logging.warning(f"Blocked unauthorized URL for metadata fetch: {metadata_source} for user {user_id}")

    # ... rest of processing
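This approach is robust but requires careful management of the whitelist: any new trusted external service must be added to it. Note also that `requests` follows redirects by default, so an allowed host that redirects elsewhere can still lead the server to an unlisted destination unless redirects are disabled or each hop is re-validated.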
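A domain whitelist trusts whatever the listed names resolve to. As an additional check, the resolved addresses can be inspected before fetching. The sketch below is an assumption about how such a check could look, not part of the audited code: the helper names are hypothetical, and it relies on `socket.getaddrinfo` and the `is_global` property of the standard `ipaddress` module.

```python
import ipaddress
import socket


def resolved_addresses(hostname):
    """Return the set of IP address strings the hostname resolves to."""
    infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    return {info[4][0] for info in infos}


def resolves_only_to_public(hostname):
    """True only if the name resolves and every address is globally routable."""
    try:
        addresses = resolved_addresses(hostname)
    except socket.gaierror:
        return False
    if not addresses:
        return False
    # Strip any IPv6 zone suffix (e.g. "%eth0") before parsing.
    return all(
        ipaddress.ip_address(addr.split("%")[0]).is_global
        for addr in addresses
    )
```

This narrows the gap but does not close it: the HTTP client performs its own lookup afterwards, so a name that changes its answer between the check and the fetch (DNS rebinding) can still slip through. Connecting to the address that was validated, or enforcing the restriction at the network layer, addresses that case.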
2. Network-Level Restrictions (OVH Firewall & iptables)
While application-level validation is primary, network-level controls provide an additional layer of defense. We configured OVH’s firewall and server-level `iptables` to restrict outbound connections from the web servers.
OVH Firewall Configuration (Example):
# Within OVH Control Panel -> Network -> IPs -> Firewall

# Create rules to ALLOW outbound traffic ONLY to specific IPs/ports
# For example, allow outbound to CDN IPs on port 443 (HTTPS)
# Rule: ALLOW TCP OUT from [Your Server IP] to [CDN IP Address] on port 443
# Rule: ALLOW TCP OUT from [Your Server IP] to [External Service IP] on port 443

# Explicitly DENY outbound traffic to private IP ranges
# Rule: DENY TCP OUT from [Your Server IP] to 10.0.0.0/8 on any port
# Rule: DENY TCP OUT from [Your Server IP] to 172.16.0.0/12 on any port
# Rule: DENY TCP OUT from [Your Server IP] to 192.168.0.0/16 on any port
# Rule: DENY TCP OUT from [Your Server IP] to 127.0.0.1/32 on any port
# Rule: DENY TCP OUT from [Your Server IP] to [Internal Service IPs] on any port
Server-Level `iptables` Configuration (Example):
# Ensure you have a backup of your current iptables rules before applying changes.
# sudo iptables-save > /etc/iptables/rules.v4.bak

# Flush existing rules (use with extreme caution in production)
# sudo iptables -F
# sudo iptables -X
# sudo iptables -t nat -F
# sudo iptables -t nat -X
# sudo iptables -t mangle -F
# sudo iptables -t mangle -X

# Rules are evaluated in order and the first match wins.
# The ACCEPT rules are added first and the default policy is switched to DROP
# last, so an active SSH session is not cut off halfway through.

# Allow loopback traffic
sudo iptables -A OUTPUT -o lo -j ACCEPT

# Allow established connections
sudo iptables -A OUTPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT

# Allow outbound connections to specific trusted external IPs/ports
# Example: Allow access to a CDN IP on port 443
# sudo iptables -A OUTPUT -d <CDN_IP_ADDRESS> -p tcp --dport 443 -j ACCEPT
# Example: Allow access to an external API IP on port 443
# sudo iptables -A OUTPUT -d <EXTERNAL_API_IP> -p tcp --dport 443 -j ACCEPT

# Explicitly block access to private IP ranges and localhost
sudo iptables -A OUTPUT -d 10.0.0.0/8 -j DROP
sudo iptables -A OUTPUT -d 172.16.0.0/12 -j DROP
sudo iptables -A OUTPUT -d 192.168.0.0/16 -j DROP
# Note: traffic to 127.0.0.1 leaves via lo and is already accepted by the
# loopback rule above, so this rule only catches what that rule did not match.
sudo iptables -A OUTPUT -d 127.0.0.1 -j DROP

# Block access to common internal service ports if not explicitly allowed
# Example: Block access to PostgreSQL port on localhost
# (inserted at position 1 so it is evaluated before the loopback ACCEPT)
# sudo iptables -I OUTPUT 1 -d 127.0.0.1 -p tcp --dport 5432 -j DROP

# Allow DNS resolution (UDP port 53) to specific trusted DNS servers
# sudo iptables -A OUTPUT -p udp --dport 53 -d <TRUSTED_DNS_SERVER_IP> -j ACCEPT

# Set the default policy to DROP for outbound traffic
sudo iptables -P OUTPUT DROP

# Save the rules (requires iptables-persistent package or similar)
# sudo netfilter-persistent save
It’s crucial to identify all legitimate outbound destinations and explicitly allow them. Any traffic not matching an ACCEPT rule is dropped by the default policy. This requires thorough network mapping and an understanding of all inter-service communication, including DNS and any database or cache the application reaches over the network.
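Once the rules are in place, it helps to verify from the server itself that they behave as intended. The following is a minimal sketch under stated assumptions: `can_connect` and `check_egress` are hypothetical helper names, and the list of expectations is something you would fill in from your own network map.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False


def check_egress(expectations):
    """Compare actual reachability against expectations.

    expectations: iterable of (host, port, should_be_reachable).
    Returns a list of (host, port, expected, actual) for every mismatch.
    """
    failures = []
    for host, port, expected in expectations:
        actual = can_connect(host, port)
        if actual != expected:
            failures.append((host, port, expected, actual))
    return failures
```

Run as the application's service user, a call such as `check_egress([("10.0.0.1", 5432, False)])` should return an empty list when the rules are correct; the address here is a placeholder. A dropped packet shows up as a timeout rather than a refusal, so keep the timeout short when checking many destinations.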
3. Using a Dedicated Proxy or Gateway
For more complex scenarios or when direct IP whitelisting is impractical, introducing a dedicated outbound proxy or API gateway can centralize and enforce outbound access policies. All outbound requests from the application would be routed through this proxy, which then performs validation before forwarding the request.
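On the application side, routing through such a proxy can be as simple as configuring the HTTP client once. The sketch below uses the standard library so it stays self-contained; the proxy address `http://egress-proxy.internal:3128` and the function name `build_proxied_opener` are assumptions made for illustration. The `requests` library offers the equivalent through its `proxies` argument.

```python
import urllib.request

# Hypothetical proxy address; replace with the real egress proxy.
EGRESS_PROXY = "http://egress-proxy.internal:3128"


def build_proxied_opener(proxy_url=EGRESS_PROXY):
    """Build an opener that sends both HTTP and HTTPS requests via the proxy."""
    proxy_handler = urllib.request.ProxyHandler(
        {"http": proxy_url, "https": proxy_url}
    )
    # Passing our own ProxyHandler replaces the default one, which would
    # otherwise read proxy settings from environment variables.
    return urllib.request.build_opener(proxy_handler)
```

Application code would then call `build_proxied_opener().open(url, timeout=5)` instead of fetching directly. For the proxy to be an enforcement point rather than a convention, the firewall should allow outbound traffic from the application servers only to the proxy.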
4. Input Sanitization and Schema Validation
Beyond URL validation, ensure that all data within the webhook payload is strictly validated against an expected schema. This prevents malformed payloads from being processed in unexpected ways, which could indirectly lead to security issues.
import logging

from marshmallow import Schema, fields, ValidationError

class WebhookPayloadSchema(Schema):
    user_id = fields.Str(required=True)
    avatar_url = fields.URL(required=False, allow_none=True)
    metadata_source = fields.URL(required=False, allow_none=True)

def process_webhook_with_schema(request_data):
    schema = WebhookPayloadSchema()
    try:
        # Load and validate data against the schema
        validated_data = schema.load(request_data.get('payload', {}))
    except ValidationError as err:
        logging.error(f"Invalid webhook payload: {err.messages}")
        return {"status": "error", "message": "Invalid payload"}

    user_id = validated_data.get('user_id')
    avatar_url = validated_data.get('avatar_url')
    metadata_source = validated_data.get('metadata_source')

    # Proceed with processing using validated_data, applying URL validation as before
    # (is_url_allowed is the whitelist check defined in the earlier example)
    if avatar_url and is_url_allowed(avatar_url):
        # ... fetch avatar
        pass
    elif avatar_url:
        logging.warning(f"Blocked unauthorized URL for avatar fetch: {avatar_url} for user {user_id}")

    if metadata_source and is_url_allowed(metadata_source):
        # ... fetch metadata
        pass
    elif metadata_source:
        logging.warning(f"Blocked unauthorized URL for metadata fetch: {metadata_source} for user {user_id}")

    return {"status": "success"}
Monitoring and Alerting
Effective monitoring is critical for detecting and responding to attempted SSRF attacks. We implemented the following:
- Log Analysis: Centralized logging of all outbound connection attempts, especially those originating from webhook processing logic. Alerts are triggered for any connection attempts to internal IP ranges or disallowed external IPs.
- Network Traffic Monitoring: Real-time monitoring of outbound network traffic for anomalies, such as unexpected destinations or protocols.
- Intrusion Detection Systems (IDS): Deploying IDS signatures that can detect common SSRF patterns.
Alerts are configured to notify the security operations team immediately upon detection of suspicious outbound activity, allowing for rapid investigation and response.
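As an illustration of the log-analysis step, the sketch below scans application log lines for the "Blocked unauthorized URL" warnings emitted by the handlers shown earlier and reports users who trigger them repeatedly. The threshold value and the function name `users_over_threshold` are assumptions; in practice this logic would more likely live in the central logging platform's alert rules than in a script.

```python
import re
from collections import Counter

# Matches the warning format used by the webhook handlers in this post.
BLOCKED_PATTERN = re.compile(
    r"Blocked unauthorized URL for (?P<kind>\w+) fetch: "
    r"(?P<url>\S+) for user (?P<user>\S+)"
)

# Hypothetical threshold: blocked attempts per user before alerting.
ALERT_THRESHOLD = 3


def users_over_threshold(log_lines, threshold=ALERT_THRESHOLD):
    """Return {user_id: count} for users with at least `threshold` blocked fetches."""
    counts = Counter()
    for line in log_lines:
        match = BLOCKED_PATTERN.search(line)
        if match:
            counts[match.group("user")] += 1
    return {user: n for user, n in counts.items() if n >= threshold}
```

Counting per user is only one possible grouping; grouping by source IP of the webhook sender or by destination host may surface probing that is spread across many user IDs.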
Conclusion
Auditing and securing a high-traffic enterprise application against SSRF requires a comprehensive strategy that combines secure coding practices, robust input validation, strict network segmentation, and vigilant monitoring. By implementing a layered defense, including strict URL whitelisting, network-level restrictions via OVH firewall and `iptables`, and thorough input sanitization, we significantly reduced the attack surface and mitigated the risk of SSRF vulnerabilities within our Python stack. Continuous review and adaptation of these security measures are paramount in the ever-evolving threat landscape.