• Skip to secondary menu
  • Skip to main content
  • Skip to primary sidebar
  • Home
  • Projects
  • Products
  • Themes
  • Tools
  • Request for Quote

Vengala Vinay

Having 9+ Years of Experience in Software Development

  • Home
  • WordPress
  • PHP
    • Codeigniter
  • Django
  • Magento
  • Selenium
  • Server
Home » How We Audited a High-Traffic Python Enterprise Stack on Google Cloud and Mitigated Server-Side Request Forgery (SSRF) in webhook parsers

How We Audited a High-Traffic Python Enterprise Stack on Google Cloud and Mitigated Server-Side Request Forgery (SSRF) in webhook parsers

Initial Stack Assessment and Vulnerability Discovery

Our engagement began with a deep dive into a high-traffic Python enterprise application hosted on Google Cloud Platform (GCP). The primary concern was a recent surge in suspicious outbound network activity, hinting at potential Server-Side Request Forgery (SSRF) vulnerabilities. The stack comprised a Django monolith, Celery for background tasks, Redis for caching and message queuing, and a PostgreSQL database, all orchestrated within a Kubernetes cluster managed by Google Kubernetes Engine (GKE).

The initial audit focused on ingress points, particularly webhook parsers and any API endpoints that accepted external URLs or hostnames. We identified several areas where user-supplied data was used to construct network requests without proper validation or sanitization. The most critical of these were the webhook processing modules, which were designed to ingest data from various third-party services.

Deep Dive into Webhook Parsers and SSRF Vectors

The core of the SSRF vulnerability lay in how the application handled incoming webhook payloads. Specifically, certain webhooks were designed to fetch resources from external URLs provided within the payload. A common pattern observed was:

  • Receiving a webhook containing a JSON payload with a field like "resource_url": "http://external.service.com/data".
  • The backend Python code would then use this URL to fetch data, often for processing or storage.

The absence of strict URL validation allowed an attacker to supply internal GCP metadata service endpoints or other sensitive internal network resources. For instance, a malicious payload could look like this:

{
  "event_type": "user_update",
  "user_id": 12345,
  "resource_url": "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"
}

This payload, if processed without checks, would cause the server to attempt to fetch the GCP instance metadata token, exposing credentials that could be used to access other GCP resources. Another common vector involved fetching resources from internal Kubernetes services or even other pods within the same cluster.

Code-Level Analysis and Exploitation Proof-of-Concept

We identified a specific Django view function responsible for processing a particular type of webhook. The relevant snippet of Python code, simplified for illustration, looked something like this:

# views.py (simplified)
import requests
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST

@csrf_exempt
@require_POST
def process_external_resource_webhook(request):
    try:
        data = json.loads(request.body)
        resource_url = data.get('resource_url')

        if not resource_url:
            return JsonResponse({'status': 'error', 'message': 'resource_url is required'}, status=400)

        # Vulnerable part: direct use of user-supplied URL
        response = requests.get(resource_url, timeout=5)
        response.raise_for_status() # Raise an exception for bad status codes

        resource_data = response.json()
        # ... further processing of resource_data ...

        return JsonResponse({'status': 'success', 'message': 'Resource processed'}, status=200)

    except json.JSONDecodeError:
        return JsonResponse({'status': 'error', 'message': 'Invalid JSON payload'}, status=400)
    except requests.exceptions.RequestException as e:
        return JsonResponse({'status': 'error', 'message': f'Failed to fetch resource: {e}'}, status=500)
    except Exception as e:
        return JsonResponse({'status': 'error', 'message': f'An unexpected error occurred: {e}'}, status=500)

The critical flaw here is the direct use of resource_url in requests.get() without any validation against a whitelist of allowed domains or protocols, or any checks to prevent access to internal IP ranges.

Mitigation Strategy: Network Controls and Code Hardening

Our mitigation strategy involved a multi-layered approach, combining network-level controls with stringent code-level validation.

1. Network Policies in GKE

The first line of defense was to leverage GKE’s Network Policies to restrict egress traffic from pods. We implemented a policy that explicitly denied all egress traffic by default and then allowed only specific, necessary outbound connections to known external services. This is crucial for preventing access to internal IPs like 169.254.169.254 or private RFC 1918 ranges.

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: webhook-egress-policy
  namespace: default # Or your application's namespace
spec:
  podSelector:
    matchLabels:
      app: webhook-processor # Label applied to your webhook processing pods
  policyTypes:
  - Egress
  egress:
  - to:
    - ipBlock:
        cidr: 0.0.0.0/0 # Default deny for all IPs
        except:
        - 10.0.0.0/8    # Deny private IPv4 ranges
        - 172.16.0.0/12
        - 192.168.0.0/16
        - 169.254.0.0/16 # Deny link-local addresses (including metadata service)
        - 192.0.0.0/24   # Deny documentation IPs
        - 198.18.0.0/15  # Deny benchmark testing IPs
    ports:
    - protocol: TCP
      port: 443 # Allow HTTPS egress
    - protocol: TCP
      port: 80  # Allow HTTP egress
  - to:
    - ipBlock:
        cidr: <IP_OF_EXTERNAL_SERVICE_1>/32 # Explicitly allow known external service 1
    ports:
    - protocol: TCP
      port: 443
  - to:
    - ipBlock:
        cidr: <IP_OF_EXTERNAL_SERVICE_2>/32 # Explicitly allow known external service 2
    ports:
    - protocol: TCP
      port: 443
  # Add more allowed external IPs as needed

This policy ensures that pods labeled app: webhook-processor can only initiate outbound TCP connections to specific external IP addresses on ports 80 and 443. All other egress traffic, including to internal IPs and the metadata service, is blocked.

2. Code-Level URL Validation and Sanitization

While network policies are effective, they are not a substitute for robust application-level validation. We refactored the Python code to include strict URL parsing and validation:

# views.py (mitigated version)
import requests
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from urllib.parse import urlparse
import ipaddress

# Define a whitelist of allowed domains
ALLOWED_DOMAINS = {
    "external.service.com",
    "another.trusted.api.net",
}

# Define a set of disallowed IP address ranges (internal, private, link-local)
DISALLOWED_IP_RANGES = {
    "0.0.0.0/0", # Default deny, will be handled by explicit allow
    "10.0.0.0/8",
    "172.16.0.0/12",
    "192.168.0.0/16",
    "169.254.0.0/16",
    "192.0.0.0/24",
    "198.18.0.0/15",
}

def is_internal_ip(ip_address):
    """Checks if an IP address falls within any disallowed internal/private range."""
    try:
        ip_obj = ipaddress.ip_address(ip_address)
        for ip_range_str in DISALLOWED_IP_RANGES:
            network = ipaddress.ip_network(ip_range_str, strict=False)
            if ip_obj in network:
                return True
        return False
    except ValueError:
        # Handle cases where the IP address is invalid
        return True # Treat invalid IPs as disallowed

@csrf_exempt
@require_POST
def process_external_resource_webhook(request):
    try:
        data = json.loads(request.body)
        resource_url = data.get('resource_url')

        if not resource_url:
            return JsonResponse({'status': 'error', 'message': 'resource_url is required'}, status=400)

        try:
            parsed_url = urlparse(resource_url)
            hostname = parsed_url.hostname
            scheme = parsed_url.scheme

            # 1. Validate scheme
            if scheme not in ('http', 'https'):
                return JsonResponse({'status': 'error', 'message': 'Unsupported URL scheme'}, status=400)

            # 2. Validate hostname against whitelist
            if hostname not in ALLOWED_DOMAINS:
                # If not in whitelist, check if it resolves to a disallowed IP
                try:
                    # Resolve hostname to IP address (this can be a separate security concern,
                    # but for simplicity, we'll do a basic check here. In production, consider
                    # DNS rebinding protection or a dedicated IP resolution service.)
                    # Note: This requires the 'dnspython' library or similar.
                    # For this example, we'll assume a simplified check or rely on network policies.
                    # A more robust solution would involve DNS resolution and IP checking.
                    # For demonstration, we'll focus on direct IP checks if hostname is an IP.
                    if not hostname: # If hostname is empty (e.g., IP address directly)
                        if is_internal_ip(parsed_url.netloc): # netloc might contain port
                             return JsonResponse({'status': 'error', 'message': 'Access to internal IP disallowed'}, status=400)
                    elif is_internal_ip(hostname): # If hostname itself is an IP and it's internal
                         return JsonResponse({'status': 'error', 'message': 'Access to internal IP disallowed'}, status=400)
                    else:
                        # If hostname is not an IP and not in whitelist, it's disallowed.
                        return JsonResponse({'status': 'error', 'message': 'Access to disallowed domain'}, status=400)

                except Exception as dns_error:
                    # Handle DNS resolution errors if implemented
                    return JsonResponse({'status': 'error', 'message': f'Hostname validation failed: {dns_error}'}, status=400)

            # 3. If all checks pass, proceed with the request
            response = requests.get(resource_url, timeout=5)
            response.raise_for_status()

            resource_data = response.json()
            # ... further processing of resource_data ...

            return JsonResponse({'status': 'success', 'message': 'Resource processed'}, status=200)

        except ValueError as ve: # For urlparse errors
            return JsonResponse({'status': 'error', 'message': f'Invalid URL format: {ve}'}, status=400)
        except requests.exceptions.RequestException as e:
            return JsonResponse({'status': 'error', 'message': f'Failed to fetch resource: {e}'}, status=500)
        except Exception as e:
            return JsonResponse({'status': 'error', 'message': f'An unexpected error occurred: {e}'}, status=500)

    except json.JSONDecodeError:
        return JsonResponse({'status': 'error', 'message': 'Invalid JSON payload'}, status=400)

Key improvements in the code:

  • Scheme Validation: Ensures only http and https schemes are allowed.
  • Hostname Whitelisting: A strict ALLOWED_DOMAINS set ensures that requests are only made to explicitly permitted external services.
  • IP Address Validation: The is_internal_ip function uses the ipaddress module to check if a resolved IP address falls into any known private, link-local, or reserved ranges. This is crucial if the input URL is an IP address directly or if DNS resolution is compromised.
  • Error Handling: More granular error handling for URL parsing and network requests.

Note on DNS Resolution: The example includes a placeholder for DNS resolution. In a production environment, directly resolving hostnames within the application can introduce its own security risks (e.g., DNS rebinding). A more robust solution might involve:

  • Using a dedicated, secure DNS resolver that performs checks.
  • Pre-resolving allowed IPs and whitelisting those instead of hostnames.
  • Implementing DNS rebinding protection mechanisms.

Post-Mitigation Verification and Monitoring

After implementing the network policies and code changes, we performed extensive re-testing. This included attempting to exploit the previously identified SSRF vectors with various internal IP addresses, hostnames resolving to internal IPs, and malformed URLs. All attempts to access internal resources or disallowed external domains were blocked, returning appropriate error codes.

Continuous monitoring was established:

  • GKE Network Policy Logs: Configured GKE to log denied egress traffic from the webhook pods. These logs were ingested into Cloud Logging for real-time alerting.
  • Application Logs: Enhanced application logging to capture any URL validation failures or network request errors, with alerts set up for high volumes of such events.
  • VPC Flow Logs: Enabled VPC Flow Logs for the GKE subnet to monitor all network traffic, allowing for detection of any unexpected outbound connections.

This comprehensive approach, combining infrastructure-level security with application-level hardening, effectively mitigated the SSRF risk in the webhook parsers and significantly improved the overall security posture of the enterprise Python stack on GCP.

Primary Sidebar

A little about the Author

Having 9+ Years of Experience in Software Development.
Expertised in Php Development, WordPress Custom Theme Development (From scratch using underscores or Genesis Framework or using any blank theme or Premium Theme), Custom Plugin Development. Hands on Experience on 3rd Party Php Extension like Chilkat, nSoftware.

Recent Posts

  • Step-by-Step: Diagnosing thread pools deadlock during concurrent ActiveRecord transaction processing on Linode Servers
  • Securing Your E-commerce APIs: Preventing SQL Injection (SQLi) in customized checkout queries in WooCommerce Implementations
  • Disaster Recovery 101: Architecting Auto-Failovers for MySQL and Ruby Deployments on Linode
  • High-Throughput Caching Strategies: Scaling MySQL for Perl Application APIs
  • Disaster Recovery 101: Architecting Auto-Failovers for DynamoDB and Laravel Deployments on DigitalOcean

Copyright © 2026 · Vinay Vengala