
Vengala Vinay

Having 12+ Years of Experience in Software Development


Step-by-Step Guide: Offloading high-frequency affiliate click-tracking metadata writes to a Redis KV store

Problem Statement: The Click Log Bottleneck

Enterprise-scale affiliate marketing platforms often generate an overwhelming volume of click-tracking events. Each click represents a potential conversion, a revenue opportunity, and a data point for optimization. However, the sheer velocity of these events—potentially millions per hour—can saturate traditional relational database write operations. Performing synchronous writes to a MySQL or PostgreSQL table for every single click event introduces significant latency, leading to dropped clicks, increased server load, and a degraded user experience for affiliates and advertisers alike. This bottleneck directly impacts the platform’s ability to accurately track and attribute revenue.

Solution: Asynchronous Offloading to Redis

To address this, we can implement an asynchronous, high-throughput logging mechanism by offloading the initial metadata capture of click events to an in-memory data store like Redis. Redis, with its sub-millisecond latency for key-value operations, is ideally suited for this task. The strategy involves a lightweight web service (e.g., a PHP or Python microservice) that receives the click event, extracts essential metadata, and immediately pushes this data into a Redis sorted set or a simple key-value pair with a timestamp. A separate, background worker process then periodically flushes these batched Redis entries to a persistent, long-term storage solution (like a data warehouse or a relational database) for analytics and reporting.

Architecture Overview

The proposed architecture consists of three primary components:

  • Click Ingestion Service: A stateless web service (e.g., Nginx + PHP-FPM or a Python Flask/FastAPI app) responsible for receiving HTTP GET/POST requests containing click metadata. Its sole job is to perform minimal validation and push the data to Redis.
  • Redis Cluster: A horizontally scalable Redis cluster acting as a high-speed buffer. We’ll use sorted sets (ZSETs) to store click events, with the timestamp as the score, allowing for easy retrieval and ordering.
  • Batching Worker: A background daemon (e.g., a Python script using Celery or a simple cron job) that periodically reads data from Redis, aggregates it, and writes it to the primary data store.

Implementation: Click Ingestion Service (PHP Example)

This PHP script acts as the entry point for click events. It’s designed to be extremely fast, minimizing processing time before handing off to Redis.

<?php
// config.php
define('REDIS_HOST', 'redis-master.internal');
define('REDIS_PORT', 6379);
define('REDIS_DB', 0);
define('REDIS_KEY_PREFIX', 'click_log:');
define('MAX_CLICK_METADATA_SIZE', 1024); // Limit metadata size to prevent abuse

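// This script uses the phpredis extension (ext-redis), which provides the Redis and
// RedisException classes used below, so no Composer autoloader is required.
// If you prefer Predis, load vendor/autoload.php here and adapt the client calls and exception types.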

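// Basic input validation: IDs are assumed to be short alphanumeric tokens, so anything else
// is rejected. Adjust the pattern to your own ID format. HTML escaping is deliberately not
// applied here; it belongs at output time, not when storing raw tracking data.
function sanitize_input($data) {
    if (!is_string($data)) {
        return null;
    }
    $data = trim($data);
    return preg_match('/^[A-Za-z0-9_-]{1,64}$/', $data) ? $data : null;
}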

// Main click processing logic
header('Content-Type: application/json');
$response = ['status' => 'error', 'message' => ''];

try {
    // Assume click data comes via GET parameters for simplicity,
    // but POST is generally preferred for larger payloads.
    $affiliate_id = isset($_GET['aid']) ? sanitize_input($_GET['aid']) : null;
    $campaign_id = isset($_GET['cid']) ? sanitize_input($_GET['cid']) : null;
    $creative_id = isset($_GET['crid']) ? sanitize_input($_GET['crid']) : null;
    $user_ip = $_SERVER['REMOTE_ADDR'] ?? 'unknown'; // Capture IP directly
    $user_agent = $_SERVER['HTTP_USER_AGENT'] ?? 'unknown';
    $timestamp_ms = (int) round(microtime(true) * 1000); // Integer milliseconds since the Unix epoch

    // Minimal validation
    if (!$affiliate_id || !$campaign_id) {
        throw new InvalidArgumentException("Missing required parameters: affiliate_id (aid) and campaign_id (cid).");
    }

    // Construct metadata payload
    $metadata = [
        'affiliate_id' => $affiliate_id,
        'campaign_id' => $campaign_id,
        'creative_id' => $creative_id,
        'ip' => $user_ip,
        'ua' => $user_agent,
        'ts_ms' => $timestamp_ms,
        // Add any other essential, low-cardinality metadata here
    ];

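    // Limit the total size of metadata to prevent excessive memory usage in Redis.
    // JSON_INVALID_UTF8_SUBSTITUTE keeps a malformed User-Agent from making json_encode() fail.
    $metadata_json = json_encode($metadata, JSON_INVALID_UTF8_SUBSTITUTE);
    if ($metadata_json === false || strlen($metadata_json) > MAX_CLICK_METADATA_SIZE) {
        throw new RuntimeException("Metadata payload could not be encoded or exceeds maximum allowed size.");
    }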

    // Connect to Redis
    $redis = new Redis();
    $redis->connect(REDIS_HOST, REDIS_PORT);
    $redis->select(REDIS_DB);

    // Use a sorted set: key is 'click_log:YYYY-MM-DD', score is timestamp_ms, member is JSON payload.
    // This allows for easy time-based retrieval. Members are unique, so two clicks that encode to
    // identical JSON are stored once; add a per-click ID to the payload if every click must be kept.
    $date_key = REDIS_KEY_PREFIX . date('Y-m-d');
    $redis->zAdd($date_key, $timestamp_ms, $metadata_json);

    // Set an expiration on the Redis key to prevent indefinite growth if the worker fails
    // Adjust TTL based on your batching interval + buffer
    $redis->expire($date_key, 3600 * 24 * 2); // 2 days TTL

    $response['status'] = 'success';
    $response['message'] = 'Click logged successfully.';

} catch (InvalidArgumentException $e) {
    $response['message'] = $e->getMessage();
    http_response_code(400); // Bad Request
} catch (RedisException $e) {
    $response['message'] = "Redis connection or operation failed: " . $e->getMessage();
    // Log this error for monitoring
    error_log("Redis Error: " . $e->getMessage());
    http_response_code(503); // Service Unavailable
} catch (RuntimeException $e) {
    $response['message'] = $e->getMessage();
    http_response_code(413); // Payload Too Large
} catch (Exception $e) {
    $response['message'] = "An unexpected error occurred: " . $e->getMessage();
    error_log("Unexpected Error: " . $e->getMessage());
    http_response_code(500); // Internal Server Error
}

echo json_encode($response);
?>
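
The architecture overview also mentions a Python ingestion service as an alternative. The following is a minimal sketch of the same build-and-validate step in Python, not a drop-in replacement for the PHP script. The function name `build_click_entry` is hypothetical, and the sketch assumes the daily key is derived in UTC so that every ingestion host and the worker agree on the date.

```python
import json
import time
from datetime import datetime, timezone

REDIS_KEY_PREFIX = "click_log:"
MAX_CLICK_METADATA_SIZE = 1024  # bytes, same cap as the PHP example


def build_click_entry(params, remote_addr, user_agent, now=None):
    """Return (redis_key, score_ms, member_json) for one click.

    Raises ValueError when a required parameter is missing or the
    encoded payload is larger than MAX_CLICK_METADATA_SIZE.
    """
    affiliate_id = (params.get("aid") or "").strip()
    campaign_id = (params.get("cid") or "").strip()
    if not affiliate_id or not campaign_id:
        raise ValueError("Missing required parameters: aid and cid.")

    now = time.time() if now is None else now
    ts_ms = int(round(now * 1000))
    metadata = {
        "affiliate_id": affiliate_id,
        "campaign_id": campaign_id,
        "creative_id": (params.get("crid") or "").strip() or None,
        "ip": remote_addr or "unknown",
        "ua": user_agent or "unknown",
        "ts_ms": ts_ms,
    }
    member = json.dumps(metadata, separators=(",", ":"))
    if len(member.encode("utf-8")) > MAX_CLICK_METADATA_SIZE:
        raise ValueError("Metadata payload exceeds maximum allowed size.")

    # Assumption: the daily key uses the UTC date of the click.
    day = datetime.fromtimestamp(now, tz=timezone.utc).strftime("%Y-%m-%d")
    return REDIS_KEY_PREFIX + day, ts_ms, member
```

With `redis-py`, the returned triple would be written with `redis_conn.zadd(key, {member: score})`.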

Nginx Configuration Snippet for PHP-FPM:

server {
    listen 80;
    server_name click.yourdomain.com;
    root /var/www/html/click_tracker; # Directory containing index.php and vendor

    location / {
        index index.php;
        try_files $uri $uri/ /index.php?$query_string;
    }

    location ~ \.php$ {
        include snippets/fastcgi-php.conf;
        fastcgi_pass unix:/var/run/php/php7.4-fpm.sock; # Adjust PHP version as needed
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        include fastcgi_params;
    }

    # Optional: Add rate limiting to prevent abuse
    # limit_req zone=click_rate_limit burst=100 nodelay;
}

# Example rate limiting zone (in nginx.conf)
# limit_req_zone $binary_remote_addr zone=click_rate_limit:10m rate=100r/s;

Implementation: Batching Worker (Python Example)

This Python script runs as a long-lived loop (or, with small changes, on a schedule via cron or a task queue such as Celery) to pull data from Redis and persist it. It groups clicks into batches so the database sees a few large writes instead of many small ones.

import redis
import json
import time
import datetime
import logging
# Assume you have a database connector, e.g., psycopg2 for PostgreSQL or mysql.connector
# import psycopg2

# --- Configuration ---
REDIS_HOST = 'redis-master.internal'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_KEY_PREFIX = 'click_log:'
BATCH_SIZE = 5000  # Number of clicks to process per batch
FLUSH_INTERVAL_SECONDS = 60 # How often to run the flush process
MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 5

# Database connection details (replace with your actual DB config)
DB_CONFIG = {
    'dbname': 'analytics_db',
    'user': 'analytics_user',
    'password': 'your_db_password',
    'host': 'db.internal',
    'port': 5432,
}

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# --- Redis Connection ---
def get_redis_connection():
    try:
        r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
        r.ping()
        logging.info("Successfully connected to Redis.")
        return r
    except redis.exceptions.ConnectionError as e:
        logging.error(f"Failed to connect to Redis: {e}")
        return None

# --- Database Connection ---
def get_db_connection():
    try:
        # Replace with your actual DB connection logic
        # conn = psycopg2.connect(**DB_CONFIG)
        # logging.info("Successfully connected to the database.")
        # return conn
        logging.warning("Database connection placeholder. Implement actual DB connection.")
        return None # Placeholder
    except Exception as e:
        logging.error(f"Failed to connect to database: {e}")
        return None

# --- Data Processing ---
def process_click_batch(redis_conn, db_conn):
    # Drain yesterday's key as well as today's, so clicks written just before
    # midnight are not stranded. The date here must be computed in the same
    # timezone as date('Y-m-d') in the PHP ingestion service.
    today = datetime.date.today()
    redis_keys = [
        f"{REDIS_KEY_PREFIX}{(today - datetime.timedelta(days=n)).strftime('%Y-%m-%d')}"
        for n in (1, 0)
    ]

    total_clicks_processed = 0

    for redis_key in redis_keys:
        try:
            while True:
                # Read at most BATCH_SIZE of the oldest members so memory use stays bounded.
                click_data_json = redis_conn.zrange(redis_key, 0, BATCH_SIZE - 1)
                if not click_data_json:
                    logging.info(f"No click data left in Redis key: {redis_key}")
                    break

                logging.info(f"Fetched {len(click_data_json)} clicks from Redis key {redis_key}.")

                # Parse JSON and prepare for batch insert
                parsed_clicks = []
                for item_json in click_data_json:
                    try:
                        click_data = json.loads(item_json)
                        # Convert the millisecond timestamp to a timezone-aware UTC datetime
                        click_data['event_timestamp'] = datetime.datetime.fromtimestamp(
                            click_data['ts_ms'] / 1000.0, tz=datetime.timezone.utc)
                        parsed_clicks.append(click_data)
                    except json.JSONDecodeError:
                        logging.warning(f"Failed to decode JSON: {item_json}")
                    except Exception as e:
                        logging.warning(f"Error processing click data: {item_json} - {e}")

                # --- Batch Database Insertion ---
                # This is a critical section. Implement robust batch insertion.
                # For PostgreSQL with psycopg2, use executemany or COPY FROM.
                # For MySQL, use multi-value INSERT statements or LOAD DATA INFILE.
                logging.info(f"Preparing to insert {len(parsed_clicks)} parsed clicks into the database.")
                # db_cursor.executemany("INSERT INTO affiliate_clicks (affiliate_id, campaign_id, creative_id, ip_address, user_agent, event_time) VALUES (%s, %s, %s, %s, %s, %s)",
                #                       [(c['affiliate_id'], c['campaign_id'], c.get('creative_id'), c['ip'], c['ua'], c['event_timestamp']) for c in parsed_clicks])
                # db_conn.commit()

                # IMPORTANT: Only remove data after the DB write has succeeded. Until the
                # insert above is implemented, this step discards clicks without persisting them.
                # Remove exactly the members that were read. Deleting the whole key would also
                # drop clicks that arrived while this batch was being written. Sorted-set members
                # are unique, so ZREM by value is precise. Malformed entries were logged above
                # and are removed too, so they cannot block the queue.
                redis_conn.zrem(redis_key, *click_data_json)

                total_clicks_processed += len(parsed_clicks)
                logging.info(f"Processed {len(parsed_clicks)} clicks and removed {len(click_data_json)} entries from {redis_key}.")

        except redis.exceptions.RedisError as e:
            logging.error(f"Redis error during processing key {redis_key}: {e}")
            # Implement retry logic here if needed
        except Exception as e: # Catch-all for DB errors or other issues
            logging.error(f"Error during batch processing or DB insertion for key {redis_key}: {e}")
            # Implement retry logic and potentially move data to a dead-letter queue

    return total_clicks_processed

def main():
    redis_conn = get_redis_connection()
    if not redis_conn:
        logging.critical("Exiting due to Redis connection failure.")
        return

    # db_conn = get_db_connection()
    # if not db_conn:
    #     logging.critical("Exiting due to database connection failure.")
    #     return

    logging.info("Starting click log batch processing worker.")
    
    while True:
        start_time = time.time()
        
        # Process clicks for the current day
        processed_count = process_click_batch(redis_conn, None) # Pass db_conn when implemented
        
        # Add logic here to process older keys if necessary (e.g., if worker was down)
        # This would involve iterating through previous days' keys.
        
        elapsed_time = time.time() - start_time
        logging.info(f"Batch processing cycle completed. Processed {processed_count} clicks in {elapsed_time:.2f} seconds.")
        
        # Wait for the next interval
        sleep_duration = max(0, FLUSH_INTERVAL_SECONDS - elapsed_time)
        time.sleep(sleep_duration)

if __name__ == "__main__":
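    # main() loops forever, so run it under a process supervisor (e.g., systemd or supervisord).
    # Do not schedule this entry point from cron: every run would start another endless loop.
    # For a cron-driven setup, call process_click_batch() once per invocation and exit, e.g.:
    # */5 * * * * /usr/bin/python3 /path/to/your/run_once.py >> /var/log/click_worker.log 2>&1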
    # Or using Celery:
    # @app.task
    # def flush_clicks_task():
    #     redis_conn = get_redis_connection()
    #     # db_conn = get_db_connection()
    #     if redis_conn:
    #         process_click_batch(redis_conn, None) # Pass db_conn when implemented
    main()
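
The worker above leaves the database write as a placeholder. As a rough sketch of what a transactional batch insert could look like, the example below uses Python's built-in `sqlite3` module as a stand-in for the real database; it is not the PostgreSQL or MySQL code the worker would use in production. The helper names `insert_click_batch` and `open_demo_db` are hypothetical, and the event time is stored as the raw `ts_ms` integer to keep the example simple.

```python
import sqlite3

INSERT_SQL = (
    "INSERT INTO affiliate_clicks "
    "(affiliate_id, campaign_id, creative_id, ip_address, user_agent, event_time) "
    "VALUES (?, ?, ?, ?, ?, ?)"
)


def insert_click_batch(db_conn, parsed_clicks):
    """Insert all clicks in one transaction; return the number of rows written."""
    rows = [
        (c["affiliate_id"], c["campaign_id"], c.get("creative_id"),
         c["ip"], c["ua"], c["ts_ms"])
        for c in parsed_clicks
    ]
    # The connection context manager commits on success and rolls back on
    # error, so a failed batch leaves no partial rows behind.
    with db_conn:
        db_conn.executemany(INSERT_SQL, rows)
    return len(rows)


def open_demo_db():
    """Create an in-memory table with the columns used by the worker's INSERT."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE affiliate_clicks ("
        "affiliate_id TEXT NOT NULL, campaign_id TEXT NOT NULL, creative_id TEXT, "
        "ip_address TEXT, user_agent TEXT, event_time INTEGER NOT NULL)"
    )
    return conn
```

The point of the all-or-nothing transaction is that the worker can safely leave a batch in Redis and retry it when the insert raises. With psycopg2 the placeholder style is `%s` rather than `?`.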

Redis Data Structure Choice: Sorted Sets (ZSETs)

We are using Redis Sorted Sets (ZSETs) for storing the click metadata. The structure is:

  • Key: `click_log:YYYY-MM-DD` (e.g., `click_log:2023-10-27`). This partitions data by day, making it easier to manage and potentially archive.
  • Score: The millisecond timestamp (`timestamp_ms`) of the click event. This allows Redis to automatically order the elements, which is useful for processing clicks in chronological order.
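  • Member: The JSON string representing the click event’s metadata. Members of a sorted set are unique, so two clicks that serialize to exactly the same JSON (same IDs, IP, user agent, and millisecond) are stored only once.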

Advantages of ZSETs here:

  • Ordered Data: Naturally keeps clicks sorted by time.
  • Efficient Retrieval: `ZRANGEBYSCORE` or `ZREVRANGEBYSCORE` (or, on Redis 6.2 and later, `ZRANGE` with the `BYSCORE` option) can retrieve clicks within a time range efficiently.
  • Atomic Operations: `ZADD` is atomic.
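  • Memory Efficiency: Storing many clicks under one key avoids per-key overhead, such as a separate key name and expiry entry for every click. The saving is modest for very large sets, because beyond a configurable size Redis stores a sorted set as a skip list plus a hash table rather than in a compact encoding.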
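
Because the score is a millisecond timestamp, a time-range query reduces to computing two score bounds. The sketch below shows one way to do that; `window_bounds_ms` and `clicks_in_window` are hypothetical helper names, and the window is assumed to be half-open (the end instant is excluded).

```python
from datetime import datetime, timedelta, timezone


def window_bounds_ms(start, end):
    """Convert a [start, end) datetime window to inclusive millisecond score bounds."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("use timezone-aware datetimes")
    if end <= start:
        raise ValueError("end must be after start")
    min_score = int(start.timestamp() * 1000)
    max_score = int(end.timestamp() * 1000) - 1  # exclude the end instant
    return min_score, max_score


def clicks_in_window(redis_conn, key, start, end):
    """Fetch (member, score) pairs for clicks inside the window using redis-py."""
    min_score, max_score = window_bounds_ms(start, end)
    return redis_conn.zrangebyscore(key, min_score, max_score, withscores=True)
```

For example, passing a start of midnight UTC and an end one minute later returns only the clicks recorded in that first minute of the day.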

Considerations:

  • Memory Usage: If a single day’s clicks are exceptionally high (billions), the ZSET for that day could consume significant memory. Redis’s TTL mechanism on the key (`EXPIRE`) is crucial to prevent indefinite memory growth. The batching worker must reliably process and remove data before the TTL expires.
  • Data Loss Risk: If the batching worker fails and cannot process data before the TTL expires, data for that period will be lost. Implement robust monitoring and potentially a mechanism to re-process older data.
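
One further consideration is member uniqueness: a sorted set stores each distinct member once, so two clicks with identical JSON would collapse into a single entry. A possible mitigation, sketched below under the assumption that a random per-click identifier is acceptable in the payload, is to add a `click_id` field before serializing. The function name `make_unique_member` and the field name are hypothetical.

```python
import json
import uuid


def make_unique_member(metadata):
    """Serialize click metadata with a random click_id so no two members are equal."""
    payload = dict(metadata)  # copy so the caller's dict is left untouched
    payload["click_id"] = uuid.uuid4().hex
    return json.dumps(payload, separators=(",", ":"), sort_keys=True)
```

The same identifier could also serve as an idempotency key in the target database, for instance through a unique constraint, so that a retried batch does not insert a click twice.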

Scaling and High Availability

Redis Scaling:

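  • Sharding: For extremely high write volumes, deploy Redis Cluster for automatic sharding across multiple nodes. The client library (phpredis or Predis on the PHP side, `redis-py` in the worker) needs to be cluster-aware. Keep in mind that Redis Cluster shards by key: a single daily key such as `click_log:2023-10-27` lives on one node, so write load only spreads out if clicks are distributed over several keys (for example, by adding a shard suffix).
  • Replication: Use Redis Sentinel (for a single primary with replicas) or Redis Cluster for high availability. The ingestion service must write to a primary node. Because Redis replication is asynchronous and the batching worker also removes the entries it has processed, the worker should read from and write to the primary as well; replicas are better reserved for failover and ad hoc inspection.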
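
Redis Cluster assigns each key to a hash slot, so a single daily key is served by one node no matter how many nodes the cluster has. One way to spread the write load, sketched here as an assumption rather than as part of the design above, is to split each day into a fixed number of shard keys chosen by hashing the affiliate ID. `NUM_SHARDS` and both helper names are hypothetical.

```python
import zlib

REDIS_KEY_PREFIX = "click_log:"
NUM_SHARDS = 16  # hypothetical; size this to the cluster and the write volume


def sharded_click_key(date_str, affiliate_id, num_shards=NUM_SHARDS):
    """Pick a stable shard key for a click, e.g. 'click_log:2023-10-27:5'."""
    shard = zlib.crc32(affiliate_id.encode("utf-8")) % num_shards
    return f"{REDIS_KEY_PREFIX}{date_str}:{shard}"


def all_shard_keys(date_str, num_shards=NUM_SHARDS):
    """List every shard key the batching worker has to drain for one day."""
    return [f"{REDIS_KEY_PREFIX}{date_str}:{n}" for n in range(num_shards)]
```

The ingestion service would write to `sharded_click_key(...)`, and the worker would iterate over `all_shard_keys(...)`. The trade-off is that a time-range query now has to merge results from several keys.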

Ingestion Service Scaling:

  • Deploy multiple instances of the PHP/Python ingestion service behind a load balancer (e.g., HAProxy, AWS ELB). Ensure the load balancer distributes traffic evenly.
  • The service is stateless, making horizontal scaling straightforward.

Batching Worker Scaling:

  • Run multiple instances of the worker script. If using a task queue like Celery, scale the number of worker processes.
  • Ensure that multiple workers do not attempt to process the same Redis key concurrently. This can be achieved by having workers claim keys (e.g., using Redis `SET` with the `NX` option and an expiry, or the older `SETNX`, as a distributed lock) or by processing distinct keys (e.g., workers assigned specific days or date ranges). For the daily key approach, ensure only one worker processes the current day’s key at a time.
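
Claiming a key before processing it can be sketched with a simple single-instance lock. The example below uses `redis-py`'s `set(..., nx=True, ex=...)` to acquire the lock and a small Lua script to release it only if the caller still owns it. The `lock:` key prefix, the default TTL, and the function names are assumptions; `redis-py` also ships its own lock helper, which may be a better fit in practice.

```python
import uuid

# Delete the lock only when it still holds this worker's token.
RELEASE_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
end
return 0
"""


def acquire_key_lock(redis_conn, redis_key, ttl_seconds=120):
    """Try to claim redis_key for this worker; return a token, or None if it is taken."""
    token = uuid.uuid4().hex
    # SET with NX and EX is atomic: it succeeds only if no lock exists, and the
    # expiry frees the lock automatically if the worker crashes.
    if redis_conn.set(f"lock:{redis_key}", token, nx=True, ex=ttl_seconds):
        return token
    return None


def release_key_lock(redis_conn, redis_key, token):
    """Release the lock if this worker still owns it; return True on success."""
    return redis_conn.eval(RELEASE_SCRIPT, 1, f"lock:{redis_key}", token) == 1
```

A worker would call `acquire_key_lock` before `process_click_batch`, skip the key when it gets `None`, and call `release_key_lock` in a `finally` block. The TTL should comfortably exceed the longest expected batch run.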

Monitoring and Alerting

Crucial metrics to monitor include:

  • Redis Latency: PING time, ZADD latency.
  • Redis Memory Usage: Track overall memory and per-key usage.
  • Ingestion Service Throughput: Requests per second (RPS) handled by the ingestion service.
  • Ingestion Service Error Rate: Monitor 4xx and 5xx HTTP responses.
  • Batching Worker Lag: How far behind the worker is from processing the latest data. Check the age of the data being processed.
  • Database Write Performance: Monitor insert latency and throughput in the target database.
  • Redis TTL Expirations: Alert if keys are expiring before being processed.

Implement alerts for high latency, high error rates, low throughput on the ingestion service, and significant processing lag on the worker.
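
Worker lag can be estimated directly from the buffer, since the lowest score in a day's sorted set is the timestamp of the oldest unprocessed click. The following is a minimal sketch of such a check; the function names and the alert threshold (three flush intervals) are assumptions to be tuned per deployment.

```python
import time


def oldest_entry_age_seconds(redis_conn, redis_key, now_ms=None):
    """Return the age of the oldest buffered click in seconds, or 0.0 if the key is empty."""
    # With withscores=True, redis-py returns a list of (member, score) tuples.
    oldest = redis_conn.zrange(redis_key, 0, 0, withscores=True)
    if not oldest:
        return 0.0
    _member, score_ms = oldest[0]
    now_ms = time.time() * 1000 if now_ms is None else now_ms
    return max(0.0, (now_ms - score_ms) / 1000.0)


def lag_alert(age_seconds, flush_interval_seconds=60, tolerance=3):
    """Flag lag once the oldest click is older than a few flush intervals."""
    return age_seconds > flush_interval_seconds * tolerance
```

Exporting the age as a gauge metric and alerting on it well before the key's TTL gives time to restart or scale the worker before any data expires.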

Conclusion

By offloading high-frequency click tracking writes to Redis, we decouple the immediate ingestion of events from the slower, persistent storage process. This architecture significantly improves the performance and scalability of the click tracking system, allowing it to handle massive volumes of traffic without becoming a bottleneck. The use of asynchronous processing and a robust in-memory buffer like Redis is a proven pattern for handling high-velocity data streams in enterprise applications.


A little about the Author

Having 12+ Years of Experience in Software Development, Vinay is a principal software architect, senior systems engineer, and elite technical consultant. He specializes in bespoke PHP/WordPress development, high-performance Magento 2 & Shopify architectures, custom plugin/theme development from scratch, and legacy code modernization (including VB6, VB.NET, PyQt, and Crystal Reports). Known for solving complex database bottlenecks, speed optimization (Core Web Vitals), and advanced security code auditing, Vinay engineers production-ready systems designed to scale under heavy concurrent load conditions.



Copyright © 2026 · Vinay Vengala