High-Throughput Caching Strategies: Scaling MongoDB for Shopify Application APIs
Leveraging Redis for MongoDB Read-Heavy Workloads
Shopify applications, particularly those serving high-traffic product catalogs, order processing, or user-generated content, often run read-heavy workloads against their primary data store, typically MongoDB. MongoDB’s built-in WiredTiger cache is robust, but under extreme read pressure the database can still become the bottleneck. Offloading frequent, predictable reads to a dedicated in-memory cache such as Redis is a standard, highly effective scaling strategy. This post details practical implementation patterns for high-throughput caching of MongoDB data.
Cache Invalidation Strategies: The Core Challenge
The primary challenge in any caching system is maintaining data consistency. For MongoDB, this translates to invalidating cached entries when the underlying document in MongoDB changes. We’ll explore several patterns, from simple TTL-based expiration to more sophisticated event-driven invalidation.
1. Time-To-Live (TTL) Expiration
The simplest approach is to set an expiration time on cached items. This is suitable for data that can tolerate a small degree of staleness or for data that changes infrequently. Redis’s `EXPIRE` command, or the `EX` option of `SET` (which writes the value and its TTL in one atomic step), handles this.
Example: Caching Product Data (PHP/Redis/MongoDB)
Consider a scenario where we need to fetch product details by SKU. We’ll cache the entire product document.
PHP Implementation Snippet
<?php
require 'vendor/autoload.php'; // Assuming Predis or similar Redis client
use Predis\Client as RedisClient;
use MongoDB\Client as MongoClient;
// --- Configuration ---
$redisConfig = ['scheme' => 'tcp', 'host' => 'redis-cache.internal', 'port' => 6379];
$mongoConfig = 'mongodb://mongo-primary.internal:27017';
$mongoDatabase = 'shopify_data';
$mongoCollection = 'products';
$cacheTtlSeconds = 300; // 5 minutes
// --- Initialization ---
$redis = new RedisClient($redisConfig);
$mongo = new MongoClient($mongoConfig);
$productsCollection = $mongo->selectCollection($mongoDatabase, $mongoCollection);
// --- Function to get product by SKU ---
function getProductBySku(string $sku, RedisClient $redis, $productsCollection, int $ttl): ?array {
    $cacheKey = "product:sku:{$sku}";
    // 1. Check cache first (Predis returns null on a miss)
    $cachedProduct = $redis->get($cacheKey);
    if ($cachedProduct !== null) {
        echo "Cache HIT for SKU: {$sku}\n";
        return json_decode($cachedProduct, true);
    }
    echo "Cache MISS for SKU: {$sku}\n";
    // 2. If not in cache, fetch from MongoDB
    $product = $productsCollection->findOne(['sku' => $sku]);
    if ($product !== null) {
        // 3. Store in cache with TTL (SET ... EX writes value and expiry atomically)
        $json = json_encode($product);
        $redis->set($cacheKey, $json, 'EX', $ttl);
        // Decode the same JSON so cache hits and misses return the same array shape
        return json_decode($json, true);
    }
    return null; // Product not found
}
// --- Usage Example ---
$skuToFetch = 'SHP12345';
$productData = getProductBySku($skuToFetch, $redis, $productsCollection, $cacheTtlSeconds);
if ($productData) {
print_r($productData);
} else {
echo "Product with SKU {$skuToFetch} not found.\n";
}
?>
2. Cache-Aside Pattern with Explicit Invalidation
For data that requires stricter consistency, the cache-aside pattern combined with explicit invalidation upon data modification is more appropriate. When a product is updated in MongoDB, we must explicitly remove its corresponding entry from Redis.
Implementation Flow
- Read Operation: Attempt to retrieve data from Redis. If found, return it.
- Cache Miss: If not in Redis, fetch data from MongoDB.
- Populate Cache: Store the fetched data in Redis.
- Write Operation (Update/Delete): After successfully modifying data in MongoDB, immediately delete the corresponding key(s) from Redis.
Example: Updating Product Data (PHP/Redis/MongoDB)
<?php
// ... (Initialization as above) ...
function updateProduct(string $sku, array $updateData, RedisClient $redis, $productsCollection): bool {
    $cacheKey = "product:sku:{$sku}";
    // 1. Update document in MongoDB
    $updateResult = $productsCollection->updateOne(
        ['sku' => $sku],
        ['$set' => $updateData]
    );
    // Check the matched count: an update that writes identical values reports
    // 0 modified documents, yet deleting the cache key is still the safe choice.
    if ($updateResult->getMatchedCount() > 0) {
        echo "Successfully updated product in MongoDB for SKU: {$sku}\n";
        // 2. Invalidate cache in Redis
        $redis->del($cacheKey);
        echo "Invalidated cache for key: {$cacheKey}\n";
        return true;
    }
    echo "No product found for SKU: {$sku}\n";
    return false;
}
// --- Usage Example ---
$skuToUpdate = 'SHP12345';
$newData = ['price' => 199.99, 'last_modified' => new MongoDB\BSON\UTCDateTime()];
updateProduct($skuToUpdate, $newData, $redis, $productsCollection);
// Subsequent read for $skuToUpdate will result in a cache miss and refetch from MongoDB.
?>
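The read and write halves of the flow can also be seen working together in one place. The following is a minimal sketch, not a production implementation: `InMemoryCache` is a hypothetical stand-in for Redis, a plain dict stands in for the MongoDB collection, and `db_reads` exists only to make cache behaviour observable.

```python
import json
import time
from typing import Any, Callable, Dict, Optional


class InMemoryCache:
    """Hypothetical stand-in for Redis supporting get, set-with-TTL and delete."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items = {}  # key -> (value, expiry timestamp)

    def get(self, key: str) -> Optional[str]:
        entry = self._items.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[key]  # expired entries behave like misses
            return None
        return value

    def set(self, key: str, value: str, ex: int) -> None:
        self._items[key] = (value, self._clock() + ex)

    def delete(self, key: str) -> None:
        self._items.pop(key, None)


class ProductRepository:
    """Cache-aside reads plus explicit invalidation on writes."""

    def __init__(self, cache: InMemoryCache, store: Dict[str, Dict[str, Any]], ttl: int = 300) -> None:
        self.cache = cache
        self.store = store  # stand-in for the MongoDB collection, keyed by SKU
        self.ttl = ttl
        self.db_reads = 0   # counts how often the "database" was hit

    def get(self, sku: str) -> Optional[Dict[str, Any]]:
        key = f"product:sku:{sku}"
        cached = self.cache.get(key)
        if cached is not None:
            return json.loads(cached)
        self.db_reads += 1
        product = self.store.get(sku)
        if product is not None:
            self.cache.set(key, json.dumps(product), ex=self.ttl)
        return product

    def update(self, sku: str, changes: Dict[str, Any]) -> bool:
        if sku not in self.store:
            return False
        self.store[sku] = {**self.store[sku], **changes}
        # Invalidate only after the write has succeeded
        self.cache.delete(f"product:sku:{sku}")
        return True


repo = ProductRepository(InMemoryCache(), {"SHP12345": {"sku": "SHP12345", "price": 149.99}})
repo.get("SHP12345")                      # miss: loads from the store and fills the cache
repo.get("SHP12345")                      # hit: served from the cache
repo.update("SHP12345", {"price": 199.99})
print(repo.get("SHP12345"), repo.db_reads)
```

The TTL is kept even with explicit invalidation so that an entry whose invalidation was missed (for example, because the `DEL` failed) still ages out eventually.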
3. Event-Driven Invalidation via Change Streams or Message Queues
For highly distributed systems or when direct application-level invalidation is complex (e.g., multiple services updating the same data), an event-driven approach is superior. MongoDB’s Change Streams or an external message queue (like Kafka or RabbitMQ) can publish events when data changes. A dedicated cache invalidation service then consumes these events and purges Redis entries.
Scenario: MongoDB Change Streams
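A separate process monitors MongoDB’s change stream for the `products` collection. When an update, replace, or delete operation occurs, the listener derives the affected Redis keys from the event and deletes them. In the example below the listener invalidates Redis directly; in a larger deployment it could instead publish a message for a dedicated cache worker to consume. Change streams require MongoDB to run as a replica set or sharded cluster.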
Change Stream Listener (Python Example)
from pymongo import MongoClient
from redis import Redis
import json
import os
# --- Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://mongo-primary.internal:27017/")
MONGO_DB = os.environ.get("MONGO_DB", "shopify_data")
MONGO_COLLECTION = os.environ.get("MONGO_COLLECTION", "products")
REDIS_HOST = os.environ.get("REDIS_HOST", "redis-cache.internal")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
# --- Initialization ---
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[MONGO_DB]
collection = db[MONGO_COLLECTION]
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
# --- Change Stream Logic ---
def listen_for_changes():
    pipeline = [
        {
            '$match': {
                'operationType': {'$in': ['update', 'replace', 'delete']}
            }
        }
    ]
    # 'updateLookup' makes update events carry the current full document;
    # without it, update events contain only the changed fields.
    with collection.watch(pipeline, full_document='updateLookup') as stream:
        print("Listening for MongoDB changes...")
        for change in stream:
            print(f"Change detected: {change['operationType']} on document ID {change['documentKey']['_id']}")
            # Determine cache key(s) to invalidate
            cache_keys_to_invalidate = []
            if change['operationType'] == 'delete':
                # Delete events carry no document body, only documentKey
                # (_id, plus the shard key on sharded collections).
                # The check below only helps if SKU is part of the shard key.
                if 'sku' in change.get('documentKey', {}):
                    cache_keys_to_invalidate.append(f"product:sku:{change['documentKey']['sku']}")
                # Otherwise, key the cache by _id, keep an _id -> SKU mapping,
                # or rely on a different event payload.
            elif change['operationType'] in ['update', 'replace']:
                document_id = change['documentKey']['_id']
                full_document = change.get('fullDocument')
                if full_document and 'sku' in full_document:
                    cache_keys_to_invalidate.append(f"product:sku:{full_document['sku']}")
                # Add other potential cache keys if applicable (e.g., by product_id)
                # cache_keys_to_invalidate.append(f"product:id:{document_id}")
            # Invalidate cache
            if cache_keys_to_invalidate:
                print(f"Invalidating cache for keys: {', '.join(cache_keys_to_invalidate)}")
                redis_client.delete(*cache_keys_to_invalidate)
            else:
                print("No cache keys identified for invalidation.")

if __name__ == "__main__":
    try:
        listen_for_changes()
    except Exception as e:
        print(f"An error occurred: {e}")
        # Implement robust error handling and retry mechanisms
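The key-derivation step is easier to test when it is pulled out of the loop into a pure function. The sketch below does that, and also handles deletes by reading `fullDocumentBeforeChange`. That field is only present under assumptions not stated above: MongoDB 6.0 or later, pre-images enabled on the collection, and a change stream opened with pre-images requested. The function name `cache_keys_for_change` is illustrative, not part of any library.

```python
from typing import Any, Dict, List


def cache_keys_for_change(change: Dict[str, Any]) -> List[str]:
    """Map one change stream event (as a dict) to the Redis keys to delete."""
    operation = change.get("operationType")
    if operation in ("update", "replace"):
        # Present for replace events, and for update events when the
        # stream was opened with the updateLookup option
        document = change.get("fullDocument") or {}
    elif operation == "delete":
        # Assumption: pre-images are enabled, so the deleted document is available
        document = change.get("fullDocumentBeforeChange") or {}
    else:
        document = {}

    keys = []
    sku = document.get("sku")
    if sku:
        keys.append(f"product:sku:{sku}")
    return keys


# Hand-written sample events, shaped like change stream documents
update_event = {
    "operationType": "update",
    "documentKey": {"_id": "abc"},
    "fullDocument": {"_id": "abc", "sku": "SHP12345", "price": 199.99},
}
delete_without_preimage = {"operationType": "delete", "documentKey": {"_id": "abc"}}

print(cache_keys_for_change(update_event))
print(cache_keys_for_change(delete_without_preimage))
```

When a delete event yields no keys, the entry is not invalidated and simply lives until its TTL expires, which is one more reason to keep a TTL on every cached product.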
Scenario: Message Queue (e.g., Kafka)
When a product is updated via an API endpoint, the service performing the update publishes a message to a Kafka topic (e.g., `product-updates`). A separate microservice or worker consumes from this topic and invalidates Redis.
Producer (Example – Node.js/Kafka.js)
// Create the producer once and reuse it; connecting per message adds avoidable latency
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['kafka-broker1:9092', 'kafka-broker2:9092'] });
const producer = kafka.producer();
const topic = 'product-updates';
const producerReady = producer.connect(); // resolves once the connection is established
async function publishProductUpdate(product) {
  await producerReady;
  const message = {
    key: product.sku, // Use SKU as key for partitioning/ordering
    value: JSON.stringify({
      sku: product.sku,
      productId: product._id.toString(), // MongoDB ObjectId
      operation: 'update', // or 'delete'
      timestamp: Date.now()
    })
  };
  await producer.send({ topic, messages: [message] });
  console.log(`Published product update for SKU: ${product.sku}`);
}
// On service shutdown: await producer.disconnect();
// --- Usage within your MongoDB update logic ---
// After successful MongoDB update:
// const updatedProduct = { sku: 'SHP12345', _id: new ObjectId('...'), price: 199.99 };
// publishProductUpdate(updatedProduct);
Consumer (Example – Python/Kafka-Python)
from kafka import KafkaConsumer
from redis import Redis
import json
import os
# --- Configuration ---
KAFKA_BROKERS = os.environ.get("KAFKA_BROKERS", "kafka-broker1:9092,kafka-broker2:9092").split(',')
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "product-updates")
REDIS_HOST = os.environ.get("REDIS_HOST", "redis-cache.internal")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
# --- Initialization ---
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BROKERS,
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='cache-invalidator-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
print(f"Consuming from Kafka topic: {KAFKA_TOPIC}")
# --- Consumption Loop ---
for message in consumer:
    event_data = message.value
    print(f"Received event: {event_data}")
    cache_keys_to_invalidate = []
    if event_data.get('sku'):
        cache_keys_to_invalidate.append(f"product:sku:{event_data['sku']}")
    # Add other potential cache keys based on event_data structure
    # if event_data.get('productId'):
    #     cache_keys_to_invalidate.append(f"product:id:{event_data['productId']}")
    if cache_keys_to_invalidate:
        print(f"Invalidating cache for keys: {', '.join(cache_keys_to_invalidate)}")
        redis_client.delete(*cache_keys_to_invalidate)
    else:
        print("No cache keys identified for invalidation.")
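One fragile spot in a consumer like this is the deserializer: a single malformed payload on the topic raises inside `json.loads` and stops the loop. A tolerant deserializer is one possible mitigation. The sketch below is an assumption about how you might structure it; `safe_json_deserializer` is a hypothetical helper name, and the consumer loop would need to skip messages whose value is `None`.

```python
import json
from typing import Any, Dict, Optional


def safe_json_deserializer(raw: Optional[bytes]) -> Optional[Dict[str, Any]]:
    """Decode a message value, returning None instead of raising on bad input."""
    if raw is None:
        return None
    try:
        event = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Only JSON objects are valid invalidation events in this sketch
    return event if isinstance(event, dict) else None


print(safe_json_deserializer(b'{"sku": "SHP12345", "operation": "update"}'))
print(safe_json_deserializer(b"not json"))
```

It could be passed as `value_deserializer=safe_json_deserializer`, with the loop starting with a guard such as `if message.value is None: continue`. Logging or counting the rejected payloads is advisable so that a misbehaving producer does not go unnoticed.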
Optimizing Redis for High Throughput
Beyond cache invalidation, the performance of Redis itself is critical. For high-throughput scenarios, consider these configurations and practices:
1. Redis Persistence and Memory Management
For caching, persistence (RDB snapshots, AOF) is often disabled or configured minimally to prioritize speed and avoid disk I/O. The trade-off is that the cache starts empty after a restart, so MongoDB absorbs the full read load until the cache warms up again. If a warm cache must survive restarts, keep RDB or AOF enabled with conservative settings, or run replicas so that a failover promotes a node that already holds the data.
`redis.conf` Snippets
# Disable RDB snapshots for pure cache scenarios
save ""
# Optionally enable AOF for durability if needed, but can impact write performance
# appendonly no
# Set a max memory limit and eviction policy
# Example: Evict least recently used keys when memory limit is reached
maxmemory 10gb
maxmemory-policy allkeys-lru
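To make the effect of `allkeys-lru` concrete, here is a toy model of least-recently-used eviction. It is a sketch of the idea only: Redis enforces a byte limit rather than an entry count, and it approximates LRU by sampling keys instead of tracking exact order. The `LruCache` class is hypothetical and exists purely for illustration.

```python
from collections import OrderedDict
from typing import Any, Optional


class LruCache:
    """Toy exact-LRU cache limited by entry count (Redis limits by memory)."""

    def __init__(self, max_entries: int) -> None:
        self.max_entries = max_entries
        self._items = OrderedDict()  # oldest access first, newest last

    def get(self, key: str) -> Optional[Any]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # a read counts as a use
        return self._items[key]

    def set(self, key: str, value: Any) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        while len(self._items) > self.max_entries:
            self._items.popitem(last=False)  # evict the least recently used key


cache = LruCache(max_entries=2)
cache.set("product:sku:A", 1)
cache.set("product:sku:B", 2)
cache.get("product:sku:A")        # A is now more recently used than B
cache.set("product:sku:C", 3)     # over the limit: B is evicted
print(cache.get("product:sku:B"))
```

The practical consequence for a product cache is that frequently read SKUs stay resident while rarely read ones are dropped first, so the memory limit mainly affects the long tail of the catalog.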
2. Network Latency and Connection Pooling
Minimize network hops between your application servers and Redis. Reuse connections rather than establishing a new TCP connection for every request. Client support varies: Jedis and redis-py provide connection pools, while PHP clients such as Predis typically rely on persistent connections, because each PHP request runs in its own short-lived context.
3. Data Serialization Format
JSON is human-readable and widely supported but can be verbose. For maximum performance, consider binary serialization formats like MessagePack or Protocol Buffers if your application ecosystem supports them. This reduces network bandwidth and Redis memory footprint.
Example: MessagePack Serialization (PHP)
<?php
// Assuming igbinary or msgpack extension is installed
// --- Storing ---
$productData = ['sku' => 'SHP12345', 'price' => 199.99];
$serializedData = msgpack_pack($productData); // Or igbinary_serialize($productData);
$redis->set($cacheKey, $serializedData, 'EX', $ttl);
// --- Retrieving ---
$cachedData = $redis->get($cacheKey);
if ($cachedData !== null) { // null means a cache miss
    $productData = msgpack_unpack($cachedData); // Or igbinary_unserialize($cachedData);
    // ... use $productData
}
?>
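Before switching formats, it helps to measure how much space a representative document actually takes in each encoding. The sketch below compares default JSON with compact JSON using only the standard library, and adds MessagePack when the third-party `msgpack` package happens to be installed. The sample document and the function name `serialized_sizes` are made up for illustration.

```python
import json
from typing import Any, Dict


def serialized_sizes(document: Dict[str, Any]) -> Dict[str, int]:
    """Return the encoded size in bytes of one document per format."""
    sizes = {
        "json": len(json.dumps(document).encode("utf-8")),
        # Compact separators drop the spaces after ',' and ':'
        "json_compact": len(json.dumps(document, separators=(",", ":")).encode("utf-8")),
    }
    try:
        import msgpack  # optional third-party package; skipped if missing
        sizes["msgpack"] = len(msgpack.packb(document))
    except ImportError:
        pass
    return sizes


sample = {
    "sku": "SHP12345",
    "price": 199.99,
    "tags": ["sale", "featured"],
    "stock": {"warehouse_a": 12, "warehouse_b": 0},
}
print(serialized_sizes(sample))
```

Running this against real product documents gives a grounded estimate of the memory and bandwidth saving, which can then be weighed against the loss of human-readable cache values.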
4. Redis Cluster for Scalability and High Availability
For very large datasets or extreme throughput requirements, a single Redis instance may not suffice. Redis Cluster provides sharding (distributing keys across multiple nodes) and failover capabilities. Ensure your application client is compatible with Redis Cluster and handles key slot distribution correctly.
Example: Redis Cluster Connection (Python)
import json
# RedisCluster here comes from the redis-py-cluster package;
# redis-py 4.1+ ships its own implementation in redis.cluster.
from rediscluster import RedisCluster
# --- Configuration ---
# List of startup nodes (host:port)
startup_nodes = [
    {"host": "redis-cluster-node1.internal", "port": 7000},
    {"host": "redis-cluster-node2.internal", "port": 7001},
    # ... more nodes
]
# --- Initialization ---
# skip_full_coverage_check avoids querying the cluster-require-full-coverage setting,
# which helps on managed services that disable the CONFIG command
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True, skip_full_coverage_check=True)
# --- Usage (same as single instance for most commands) ---
cache_key = "product:sku:SHP12345"
product_data = {"sku": "SHP12345", "price": 199.99}
ttl_seconds = 300
rc.set(cache_key, json.dumps(product_data), ex=ttl_seconds)
retrieved_data_str = rc.get(cache_key)
if retrieved_data_str:
    retrieved_data = json.loads(retrieved_data_str)
    print(retrieved_data)
rc.delete(cache_key)
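Key slot distribution follows a fixed rule: Redis Cluster hashes each key with CRC16 and takes the result modulo 16384, and if the key contains a non-empty `{...}` hash tag, only the tag is hashed. The following sketch reimplements that rule for illustration, so you can check which keys share a slot; the function names are my own, and a cluster-aware client performs this calculation for you.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16 as used by Redis Cluster (polynomial 0x1021, initial value 0)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Return the cluster hash slot (0..16383) for a key, honouring hash tags."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty tag between the first '{' and the next '}' counts
        if end != -1 and end != start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


# Keys sharing a hash tag land in the same slot; untagged keys usually do not
print(key_slot("product:{SHP12345}:details") == key_slot("product:{SHP12345}:inventory"))
print(key_slot("product:sku:SHP12345"), key_slot("product:sku:SHP67890"))
```

This matters for invalidation code that deletes several keys in one call: multi-key commands in cluster mode generally require all keys to live in the same slot, so related keys for one product are good candidates for a shared hash tag.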
Monitoring and Performance Tuning
Continuous monitoring is essential. Key metrics to track include:
- Redis: `redis-cli info memory`, `redis-cli info stats` (keyspace hits/misses), `redis-cli slowlog get 10`.
- Application: Cache hit ratio, latency of cache operations vs. database operations.
- MongoDB: Query performance, WiredTiger cache usage, network I/O.
If cache hit ratios are low, re-evaluate the data being cached and the TTLs. If Redis latency is high, investigate network issues, slow commands in the slow log, and instance sizing; Redis Cluster helps when a single node is saturated, whereas Sentinel provides failover rather than lower latency. If MongoDB read performance degrades despite caching, ensure the cache invalidation is working correctly and that the remaining MongoDB reads are optimized (indexing, query structure).
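The hit ratio itself can be derived from the `keyspace_hits` and `keyspace_misses` counters in the `INFO stats` output as hits / (hits + misses). Here is a small sketch that parses the text form of that output; the sample values are invented, and the helper names are illustrative.

```python
from typing import Dict, Optional


def parse_info(text: str) -> Dict[str, str]:
    """Parse 'field:value' lines from INFO output, skipping section headers."""
    fields = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        name, value = line.split(":", 1)
        fields[name] = value
    return fields


def hit_ratio(info_text: str) -> Optional[float]:
    """Return hits / (hits + misses), or None if no lookups were recorded."""
    fields = parse_info(info_text)
    hits = int(fields.get("keyspace_hits", 0))
    misses = int(fields.get("keyspace_misses", 0))
    total = hits + misses
    return hits / total if total else None


# Invented sample resembling `redis-cli info stats` output
sample_info = """# Stats
total_commands_processed:1500
keyspace_hits:900
keyspace_misses:100
"""
print(hit_ratio(sample_info))
```

These counters are cumulative since the server started (or since the last `CONFIG RESETSTAT`), so a ratio over a recent window is better computed from the difference between two samples.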