Code Auditing Guidelines: Detecting and Fixing Server-Side Request Forgery (SSRF) in Webhook Parsers in Your Python Monolith
Understanding SSRF in Webhook Parsers
Server-Side Request Forgery (SSRF) is a critical vulnerability that allows an attacker to induce the server-side application to make HTTP requests to an arbitrary domain of the attacker’s choosing. In the context of webhook parsers within a Python monolith, this often arises when user-supplied data is used to construct URLs for outgoing requests without proper validation. Attackers can exploit this to scan internal networks, access sensitive internal services, or even interact with cloud metadata endpoints.
Consider a common scenario where a webhook handler needs to fetch additional data from a third-party service based on an identifier provided in the incoming webhook payload. If the URL for this fetch operation is constructed directly from user input, it becomes a prime target for SSRF.
Identifying SSRF Vulnerabilities in Python Code
The primary indicator of an SSRF vulnerability in Python code is the use of user-controlled input to construct URLs for network requests. This typically involves libraries like requests, urllib.request, or even lower-level socket operations. We’ll focus on the requests library as it’s prevalent.
Look for patterns where a variable derived from incoming request data (e.g., JSON payload, query parameters, headers) is directly concatenated or formatted into a URL string that is then passed to a request function.
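As a starting point for that search, the pattern can be approximated with a small static scan. The sketch below is illustrative only: the function name find_dynamic_request_urls is made up for this example, and it only recognizes calls written literally as requests.<method>(...). It will miss aliased imports, Session objects, and other HTTP clients, so treat its output as a list of places to review by hand rather than a verdict.

```python
import ast

# Method names on the requests module that take a URL.
HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options", "request"}


def find_dynamic_request_urls(source, filename="<string>"):
    """Return sorted (line, method) pairs for requests.<method>() calls
    whose URL argument is not a plain string literal."""
    findings = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        if not (
            isinstance(func, ast.Attribute)
            and isinstance(func.value, ast.Name)
            and func.value.id == "requests"
            and func.attr in HTTP_METHODS
        ):
            continue
        # requests.request(method, url, ...) takes the URL second; the others take it first.
        position = 1 if func.attr == "request" else 0
        url_arg = node.args[position] if len(node.args) > position else None
        if url_arg is None:
            url_arg = next((kw.value for kw in node.keywords if kw.arg == "url"), None)
        if url_arg is None:
            continue
        is_literal = isinstance(url_arg, ast.Constant) and isinstance(url_arg.value, str)
        if not is_literal:
            findings.append((node.lineno, func.attr))
    return sorted(findings)


SAMPLE = (
    "import requests\n"
    "requests.get('https://api.example.com/health')\n"
    "requests.get(resource_url, timeout=5)\n"
    "requests.post(url=f'https://{host}/hook')\n"
    "requests.request('GET', target)\n"
)

if __name__ == "__main__":
    for line, method in find_dynamic_request_urls(SAMPLE):
        print(f"line {line}: requests.{method}() called with a non-literal URL")
```

A flagged call is not necessarily vulnerable. The next step is still to trace where the URL value comes from.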
Example Vulnerable Code Snippet
Imagine a Flask application endpoint that processes incoming webhooks. The webhook payload might contain a resource_url field that the application is supposed to fetch and process.
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)


@app.route('/webhook/process', methods=['POST'])
def process_webhook():
    data = request.get_json()
    resource_url = data.get('resource_url')
    if not resource_url:
        return jsonify({"error": "resource_url is required"}), 400
    try:
        # Vulnerable: resource_url is directly used without validation
        response = requests.get(resource_url, timeout=5)
        response.raise_for_status()  # Raise an exception for bad status codes
        processed_data = process_content(response.text)
        return jsonify({"status": "success", "data": processed_data}), 200
    except requests.exceptions.RequestException as e:
        return jsonify({"error": f"Failed to fetch resource: {e}"}), 500
    except Exception as e:
        return jsonify({"error": f"An unexpected error occurred: {e}"}), 500


def process_content(content):
    # Placeholder for actual content processing logic
    return f"Processed: {content[:50]}..."


if __name__ == '__main__':
    app.run(debug=True)
In this snippet, the resource_url is taken directly from the JSON payload and passed to requests.get(). An attacker could provide a URL like http://169.254.169.254/latest/meta-data/ (AWS EC2 metadata endpoint) or http://localhost:8080/admin to probe internal services.
Mitigation Strategies: Validation and Sanitization
The core principle for preventing SSRF is to strictly validate and sanitize any user-supplied input that influences outgoing network requests. This involves:
- Allowlisting: The most secure approach is to maintain a strict allowlist of domains or IP addresses that the application is permitted to connect to. Any URL not matching this list should be rejected.
- Denylisting (Less Secure): While less robust, a denylist can block known malicious IPs or internal IP ranges. This is prone to bypasses.
- URL Parsing and Validation: Carefully parse the URL and check its components (scheme, hostname, port) against expected values.
- Disabling Redirects: If not strictly necessary, disable automatic redirects in HTTP clients, as they can be used to chain requests to unintended destinations.
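To illustrate why string-based denylists are prone to bypasses, the sketch below compares a prefix check with parsing through the standard ipaddress module. The function names are invented for this example. The alternate spellings shown ("2130706433", "0x7f.0.0.1", "127.1") are accepted as 127.0.0.1 by some resolvers and HTTP stacks, although this is platform-dependent, so anything that is not a canonical IP literal should be resolved and the resolved addresses checked.

```python
import ipaddress


def naive_is_internal(host):
    """String-prefix denylist, shown only to illustrate its blind spots."""
    return host.startswith(("127.", "10.", "192.168."))


def classify_host(host):
    """Classify a host string as 'internal', 'public', or 'not-an-ip-literal'."""
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Hostnames and non-canonical IP spellings end up here; they still
        # need DNS resolution plus a check of every resolved address.
        return "not-an-ip-literal"
    return "public" if ip.is_global else "internal"


if __name__ == "__main__":
    for host in ["127.0.0.1", "169.254.169.254", "::1", "8.8.8.8",
                 "2130706433", "0x7f.0.0.1", "127.1"]:
        print(f"{host:>16}  naive={naive_is_internal(host)!s:<5}  parsed={classify_host(host)}")
```

The prefix check misses the link-local metadata address and the IPv6 loopback entirely, while the parsed check catches both and makes the "needs resolution" case explicit instead of silently passing it through.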
Implementing a Robust Allowlist Strategy
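An allowlist is the preferred method. It can be implemented by defining a set of trusted hostnames and comparing the parsed hostname against it with an exact match. Avoid relying on reverse DNS lookups for this purpose: the PTR record for an attacker-controlled IP address is set by the attacker, so it cannot establish trust.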
Refactored Vulnerable Code with Allowlist
Let’s refactor the previous example to include an allowlist. We’ll define a set of allowed domains and use Python’s urllib.parse to inspect the URL.
from flask import Flask, request, jsonify
import requests
from urllib.parse import urlparse

app = Flask(__name__)

# Define your trusted domains
ALLOWED_DOMAINS = {
    "api.example.com",
    "cdn.example.com",
    "external-service.net",
}

# Optional: Define internal IP ranges to block if not using a strict allowlist
# BLOCKED_IPS = ["127.0.0.1", "10.0.0.0/8", "192.168.0.0/16", "172.16.0.0/12"]
def is_internal_ip(ip_address):
    # Basic string check for common internal IPv4 ranges.
    # It does not cover IPv6 or alternate IP spellings; for a more robust
    # solution, use the standard-library 'ipaddress' module.
    if ip_address.startswith("127."):
        return True
    if ip_address.startswith("10."):
        return True
    if ip_address.startswith("192.168."):
        return True
    if ip_address.startswith("169.254."):
        return True  # Link-local, includes the cloud metadata endpoint
    if ip_address.startswith("172."):
        try:
            parts = ip_address.split('.')
            if len(parts) == 4:
                octet2 = int(parts[1])
                if 16 <= octet2 <= 31:
                    return True
        except ValueError:
            pass  # Not a valid IP format
    return False
def is_allowed_url(url):
    try:
        parsed_url = urlparse(url)
        hostname = parsed_url.hostname
        if not hostname:
            return False  # No hostname found

        # 1. Ensure the scheme is HTTP/HTTPS. This must run before the
        #    allowlist check, otherwise an allowed host passes with any scheme.
        if parsed_url.scheme not in ('http', 'https'):
            return False

        # 2. Check against ALLOWED_DOMAINS (exact match; urlparse lowercases the hostname)
        if hostname in ALLOWED_DOMAINS:
            return True

        # 3. (Optional) If you relax the strict allowlist, block internal IPs here.
        # Checking whether the hostname *itself* looks like an internal IP is cheap:
        # Example: if is_internal_ip(hostname): return False
        # True IP-based blocking requires resolving the hostname and checking the result.

        # For this example, we strictly rely on ALLOWED_DOMAINS: anything else is rejected.
        return False
    except Exception:
        # Handle potential parsing errors by rejecting the URL
        return False
@app.route('/webhook/process_secure', methods=['POST'])
def process_webhook_secure():
    data = request.get_json()
    resource_url = data.get('resource_url')
    if not resource_url:
        return jsonify({"error": "resource_url is required"}), 400
    if not is_allowed_url(resource_url):
        return jsonify({"error": "Invalid or disallowed resource URL"}), 400
    try:
        # Secure: resource_url is validated
        response = requests.get(resource_url, timeout=5, allow_redirects=False)  # Disable redirects
        response.raise_for_status()
        processed_data = process_content(response.text)
        return jsonify({"status": "success", "data": processed_data}), 200
    except requests.exceptions.RequestException:
        # Log details server-side; echoing upstream errors back helps attackers probe.
        app.logger.warning("Failed to fetch resource", exc_info=True)
        return jsonify({"error": "Failed to fetch resource"}), 500
    except Exception:
        app.logger.exception("Unexpected error while processing webhook")
        return jsonify({"error": "An unexpected error occurred"}), 500


def process_content(content):
    # Placeholder for actual content processing logic
    return f"Processed: {content[:50]}..."


if __name__ == '__main__':
    # Development only: never run with debug=True in production
    app.run(debug=True)
In this improved version:
- We define a clear ALLOWED_DOMAINS set.
- The is_allowed_url function parses the URL and checks if its hostname is in the allowed set. It also verifies the scheme is http or https.
- requests.get is called with allow_redirects=False to prevent chained attacks via redirects.
- The function returns an error if the URL is not allowed, preventing the request from being made.
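The reason for comparing the parsed hostname, rather than searching the raw URL string, is easiest to see with a few hostile inputs. The following sketch is self-contained. The evil.test host and the single-entry allowlist are placeholders chosen for illustration.

```python
from urllib.parse import urlparse

ALLOWED_DOMAINS = {"api.example.com"}  # placeholder allowlist for this example


def substring_check(url):
    """Flawed: looks for the trusted name anywhere in the raw URL."""
    return "api.example.com" in url


def hostname_check(url):
    """Exact match on the parsed hostname, restricted to http/https."""
    parsed = urlparse(url)
    return parsed.scheme in ("http", "https") and parsed.hostname in ALLOWED_DOMAINS


TRICKY_URLS = [
    "https://api.example.com@evil.test/",               # trusted name used as userinfo
    "https://api.example.com.evil.test/",               # trusted name as a subdomain label
    "https://evil.test/?next=https://api.example.com",  # trusted name in the query string
]

if __name__ == "__main__":
    for url in TRICKY_URLS + ["https://API.example.com/data"]:
        print(f"substring={substring_check(url)!s:<5} hostname={hostname_check(url)!s:<5} {url}")
```

All three hostile URLs satisfy the substring test, while the hostname comparison rejects them and still accepts the legitimate URL regardless of letter case, because urlparse lowercases the hostname.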
Advanced Validation: IP Address Resolution and Blocking
For more stringent security, especially if your application might receive IP addresses directly or if hostnames could resolve to internal IPs, you’ll need to perform IP address resolution and check against internal IP ranges. This adds complexity and potential latency.
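Python’s socket module can resolve hostnames, but a check-then-fetch design is exposed to DNS rebinding: the name can resolve to a public address during validation and to an internal one when the HTTP client resolves it again. For the range checks themselves, the ipaddress module is more reliable than string-prefix comparisons.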
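import socket
import ipaddress

# ... (previous code) ...

# Define internal IP networks using ipaddress module
INTERNAL_NETWORKS = [
    ipaddress.ip_network('127.0.0.0/8'),     # IPv4 loopback
    ipaddress.ip_network('10.0.0.0/8'),      # RFC 1918
    ipaddress.ip_network('192.168.0.0/16'),  # RFC 1918
    ipaddress.ip_network('172.16.0.0/12'),   # RFC 1918
    ipaddress.ip_network('169.254.0.0/16'),  # Link-local, includes cloud metadata endpoints
    ipaddress.ip_network('0.0.0.0/8'),       # "This network", often routed to localhost
    ipaddress.ip_network('::1/128'),         # IPv6 loopback
    ipaddress.ip_network('fc00::/7'),        # IPv6 unique local addresses
    ipaddress.ip_network('fe80::/10'),       # IPv6 link-local
    ipaddress.ip_network('::ffff:0:0/96'),   # IPv4-mapped IPv6 (can wrap internal IPv4)
    # Add other internal networks as needed for your environment
]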
def is_url_pointing_to_internal_ip(url):
    """Return True if the URL's host is, or resolves to, an internal address.

    Fails closed: hosts that cannot be parsed or resolved are treated as internal.
    """
    try:
        parsed_url = urlparse(url)
        hostname = parsed_url.hostname
        if not hostname:
            return True  # No hostname: treat as unsafe

        # If the hostname is already an IP address, check it directly
        try:
            ip_addr = ipaddress.ip_address(hostname)
            return any(ip_addr in network for network in INTERNAL_NETWORKS)
        except ValueError:
            # Not an IP address, proceed to DNS resolution
            pass

        # Resolve hostname to IP addresses (IPv4 and IPv6).
        # Be cautious: DNS resolution can be slow and is susceptible to DNS rebinding.
        # Consider using a timeout for DNS lookups if possible, or a dedicated DNS resolver.
        addr_infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
        for res in addr_infos:
            ip_str = res[4][0]  # Get the IP address string
            try:
                ip_addr = ipaddress.ip_address(ip_str)
            except ValueError:
                return True  # Unparseable address: treat as unsafe
            if any(ip_addr in network for network in INTERNAL_NETWORKS):
                return True
        return False  # All resolved IPs are external
    except socket.gaierror:
        # Hostname resolution failed: fail closed rather than allow the request
        return True
    except Exception:
        # Other errors during parsing or IP address handling: fail closed
        return True
def is_allowed_url_advanced(url):
    try:
        parsed_url = urlparse(url)
        hostname = parsed_url.hostname
        if not hostname:
            return False

        # 1. Ensure scheme is HTTP/HTTPS before any host can be allowed
        if parsed_url.scheme not in ('http', 'https'):
            return False

        # 2. Check against ALLOWED_DOMAINS
        if hostname in ALLOWED_DOMAINS:
            return True

        # 3. Check if the hostname resolves to an internal IP
        if is_url_pointing_to_internal_ip(url):
            return False  # Explicitly block internal IPs if not in ALLOWED_DOMAINS

        # Note: hosts outside ALLOWED_DOMAINS are accepted here as long as they are
        # not internal, so this branch behaves as a denylist, not a strict allowlist.
        return True
    except Exception:
        return False
# Update your route to use is_allowed_url_advanced
@app.route('/webhook/process_secure_advanced', methods=['POST'])
def process_webhook_secure_advanced():
    data = request.get_json()
    resource_url = data.get('resource_url')
    if not resource_url:
        return jsonify({"error": "resource_url is required"}), 400
    if not is_allowed_url_advanced(resource_url):
        return jsonify({"error": "Invalid or disallowed resource URL"}), 400
    try:
        response = requests.get(resource_url, timeout=5, allow_redirects=False)
        response.raise_for_status()
        processed_data = process_content(response.text)
        return jsonify({"status": "success", "data": processed_data}), 200
    except requests.exceptions.RequestException:
        # Log details server-side; echoing upstream errors back helps attackers probe.
        app.logger.warning("Failed to fetch resource", exc_info=True)
        return jsonify({"error": "Failed to fetch resource"}), 500
    except Exception:
        app.logger.exception("Unexpected error while processing webhook")
        return jsonify({"error": "An unexpected error occurred"}), 500
This advanced validation:
- Uses the ipaddress module for accurate internal IP range checking.
- Attempts to resolve hostnames to IP addresses and checks if any resolved IP falls within internal networks.
- Handles cases where the provided URL might already be an IP address.
Caveats: DNS resolution can be a bottleneck and a security concern (DNS rebinding). For critical applications, consider using a dedicated, secure DNS resolver or a proxy that handles these checks.
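One way to narrow the rebinding window is to resolve the hostname once, validate the result, and then connect to that exact IP address while sending the original name in the Host header. The sketch below shows the idea for plain HTTP only. The function names are hypothetical, and HTTPS is deliberately out of scope because pinning there also requires handling SNI and certificate verification against the original hostname.

```python
import ipaddress
import socket
from urllib.parse import urlparse, urlunparse


def resolve_public_ips(hostname, port):
    """Resolve once and return the addresses; raise if any is non-public."""
    infos = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
    ips = []
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        if not ip.is_global:
            raise ValueError(f"{hostname} resolves to non-public address {ip}")
        ips.append(str(ip))
    if not ips:
        raise ValueError(f"{hostname} did not resolve to any address")
    return ips


def pin_http_url(url):
    """Return (pinned_url, headers) so the client connects to a validated IP.

    Assumption: plain http only; this sketch rejects every other scheme.
    """
    parsed = urlparse(url)
    if parsed.scheme != "http" or not parsed.hostname:
        raise ValueError("this sketch only handles plain http URLs")
    port = parsed.port or 80
    ip = resolve_public_ips(parsed.hostname, port)[0]
    host_part = f"[{ip}]" if ":" in ip else ip  # IPv6 literals need brackets
    pinned = parsed._replace(netloc=f"{host_part}:{port}")
    return urlunparse(pinned), {"Host": parsed.hostname}
```

With requests, the result could then be used as requests.get(pinned_url, headers=headers, timeout=5, allow_redirects=False), so the address that was validated is the address that is contacted.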
Code Auditing Checklist for SSRF
- Identify all outgoing HTTP requests: Search your codebase for usage of libraries like requests, urllib, httpx, etc.
- Trace data flow: For each outgoing request, determine if any part of the URL, headers, or body originates from user-controlled input (webhooks, API requests, file uploads, etc.).
- Check for validation: Verify if user-supplied URL components are strictly validated against an allowlist of trusted domains/IPs.
- Look for denylists: If denylists are used, assess their completeness and the risk of bypasses (e.g., using different IP representations like octal or hex).
- Examine redirect handling: Ensure that HTTP client redirects are disabled or carefully managed if they are essential.
- Test with malicious payloads: Craft test cases that attempt to access internal services (e.g., http://localhost:port, http://127.0.0.1:port), cloud metadata endpoints (e.g., http://169.254.169.254/), or use IP address variations.
- Review DNS resolution logic: If your application performs DNS lookups based on user input, ensure it’s protected against DNS rebinding attacks.
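For the redirect-handling item, when redirects cannot simply be disabled, one option is to follow them manually and re-validate every hop. The sketch below assumes a hypothetical helper name and takes the validator and the HTTP call as parameters so that it stays independent of any particular client. With requests, send could be lambda u: requests.get(u, timeout=5, allow_redirects=False).

```python
from urllib.parse import urljoin

REDIRECT_CODES = {301, 302, 303, 307, 308}


def fetch_with_validated_redirects(url, is_allowed, send, max_hops=3):
    """Follow redirects manually, validating each URL before it is requested.

    is_allowed(url) -> bool is the URL validator (for example an allowlist check).
    send(url) must return an object with .status_code and .headers and must
    NOT follow redirects itself.
    """
    for _ in range(max_hops + 1):
        if not is_allowed(url):
            raise ValueError(f"disallowed URL in redirect chain: {url}")
        response = send(url)
        if response.status_code not in REDIRECT_CODES:
            return response
        location = response.headers.get("Location")
        if not location:
            return response
        # Location may be relative, so resolve it against the current URL.
        url = urljoin(url, location)
    raise ValueError("too many redirects")
```

Because the validator runs before every request, a trusted host that redirects to an internal address is stopped at the hop where the destination changes, and the hop limit bounds redirect loops.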
Conclusion
Server-Side Request Forgery in webhook parsers is a serious threat that can be mitigated through diligent code auditing and robust input validation. Prioritizing an allowlist-based approach for all external requests originating from user-controlled data is paramount. Regularly reviewing your codebase for these patterns and implementing security best practices will significantly reduce your application’s attack surface.