Step-by-Step: Diagnosing Memory leaks in long-running Python Celery worker daemons on AWS Servers
Identifying the Problem: Gradual Memory Increase in Celery Workers
A common symptom of memory leaks in long-running Python processes, particularly Celery workers, is a consistent, albeit slow, increase in memory consumption over time. This isn’t a sudden spike but a gradual creep that eventually leads to the process being OOM-killed by the operating system or the cloud provider’s infrastructure. On AWS, this often manifests as EC2 instances becoming unresponsive or Elastic Beanstalk worker environments restarting unexpectedly.
The challenge with diagnosing these leaks is that they often require observing the system over an extended period, correlating memory usage with specific task executions, and pinpointing the exact code paths responsible for holding onto memory. Standard heap dumps might not always be sufficient if the leak is related to external resources or complex object graphs that are not garbage collected as expected.
Initial Monitoring and Data Collection
Before diving into code, establish a baseline and collect data. On your AWS instances (e.g., EC2, ECS tasks), use system monitoring tools to track the memory usage of your Celery worker processes. CloudWatch Agent is a good starting point for EC2, or you can leverage container-specific metrics for ECS.
A simple shell script can periodically poll the memory usage of your Celery worker PIDs. We’ll assume your Celery workers are managed by `systemd` or a similar process manager, and you can identify their PIDs.
Script for Periodic Memory Polling
This script will log the Resident Set Size (RSS) of a given process ID (PID) to a file. Run this script as a cron job or a background process.
#!/bin/bash
# Configuration
TARGET_PID="$1"
OUTPUT_FILE="/var/log/celery_worker_memory.log"
TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")
if [ -z "$TARGET_PID" ]; then
echo "Usage: $0 "
exit 1
fi
if ! ps -p $TARGET_PID > /dev/null; then
echo "$TIMESTAMP - PID $TARGET_PID not found. Exiting." >> $OUTPUT_FILE
exit 1
fi
# Get RSS in KB
RSS_KB=$(ps -p $TARGET_PID -o rss= | tr -d ' ')
echo "$TIMESTAMP - PID: $TARGET_PID, RSS (KB): $RSS_KB" >> $OUTPUT_FILE
To use this, first find your Celery worker PID. If using `systemd`, you can often find it with:
pgrep -f "celery worker -A your_app_name"
Then, run the polling script:
# Assuming PID is 12345 ./monitor_memory.sh 12345 >> /var/log/celery_worker_memory.log 2>&1
Schedule this script to run every minute using cron:
# crontab -e * * * * * /path/to/your/monitor_memory.sh>> /var/log/celery_worker_memory.log 2>&1
After a day or two, analyze /var/log/celery_worker_memory.log. Look for a consistent upward trend in the RSS values. If you see this, you have a leak.
Profiling Memory Usage with objgraph
Once a leak is confirmed, the next step is to identify what objects are accumulating. The objgraph library is invaluable for this. It allows you to visualize object references and count instances of specific types.
Integrating objgraph into Celery Tasks
The most effective way to use objgraph for long-running processes is to trigger snapshots at specific points, ideally when you suspect memory might be accumulating. You can add conditional logic to your Celery tasks or use a separate monitoring task.
First, install objgraph:
pip install objgraph
Now, let’s modify a Celery task to take an object graph snapshot under certain conditions. For instance, if a task processes a large number of items, we might want to snapshot after processing a batch.
import objgraph
import time
import os
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
# Global flag to enable/disable profiling
ENABLE_PROFILING = os.environ.get('ENABLE_CELERY_PROFILING', 'false').lower() == 'true'
PROFILE_SNAPSHOT_DIR = "/tmp/celery_profiles"
if ENABLE_PROFILING and not os.path.exists(PROFILE_PROFILING_DIR):
os.makedirs(PROFILE_PROFILING_DIR)
@app.task
def process_large_dataset(data_ids):
processed_count = 0
total_items = len(data_ids)
snapshot_interval = 100 # Take snapshot every 100 items processed
for item_id in data_ids:
# Simulate processing
time.sleep(0.01)
processed_count += 1
if ENABLE_PROFILING and processed_count % snapshot_interval == 0:
snapshot_filename = os.path.join(
PROFILE_SNAPSHOT_DIR,
f"snapshot_task_{self.request.id}_batch_{processed_count}.obj"
)
print(f"Taking objgraph snapshot: {snapshot_filename}")
objgraph.show_growth(
filename=snapshot_filename,
# Limit to top N objects to keep file size manageable
limit=20
)
# Optional: Clean up old snapshots if disk space is a concern
# objgraph.cleanup_snapshots(PROFILE_SNAPSHOT_DIR, max_age_days=1)
print(f"Finished processing {total_items} items.")
return f"Processed {total_items} items."
@app.task
def trigger_full_heap_snapshot():
if ENABLE_PROFILING:
snapshot_filename = os.path.join(
PROFILE_SNAPSHOT_DIR,
f"full_heap_snapshot_task_{self.request.id}_{int(time.time())}.obj"
)
print(f"Taking full heap snapshot: {snapshot_filename}")
objgraph.show_most_common_types(
filename=snapshot_filename,
limit=50 # Capture more types for a full heap
)
return f"Full heap snapshot saved to {snapshot_filename}"
return "Profiling not enabled."
if __name__ == '__main__':
# Example of how to run this locally for testing
# Ensure Redis is running on localhost:6379
# Run worker: celery -A your_module_name worker -l info -P eventlet -c 10
# To enable profiling, set environment variable:
# ENABLE_CELERY_PROFILING=true celery -A your_module_name worker -l info -P eventlet -c 10
pass
To enable profiling, you would start your Celery worker with the environment variable set:
ENABLE_CELERY_PROFILING=true celery -A your_app_name worker -l info -P eventlet -c 10
When the process_large_dataset task runs and hits the snapshot interval, or when trigger_full_heap_snapshot is called, objgraph will save files to /tmp/celery_profiles. These files contain information about object growth or the most common types.
Analyzing objgraph Output
The objgraph.show_growth() output is particularly useful. It shows which object types have increased the most in count between snapshots. Look for types that are not expected to grow indefinitely.
Example output snippet from an objgraph.show_growth() file:
... dict: 1500000 +100000 list: 800000 +50000 str: 1200000 +75000 MyCustomObject: 50000 +45000 <-- Suspicious! <module 'some_library' from '/path/to/some_library/__init__.py'>: 10 +10 ...
In this example, MyCustomObject is growing rapidly. You would then investigate the code that creates and manages instances of MyCustomObject.
You can also use objgraph.by_type('MyCustomObject') to get a list of all instances and inspect their references.
import objgraph
# Assuming you have a running process where MyCustomObject instances might be leaking
# This would typically be run in an interactive Python session connected to the process
# or by adding this code to a diagnostic task.
leaking_objects = objgraph.by_type('MyCustomObject')
print(f"Found {len(leaking_objects)} instances of MyCustomObject.")
if leaking_objects:
# Show references for the first leaking object
objgraph.show_refs([leaking_objects[0]], filename='my_custom_object_refs.png', max_depth=5)
# Show who is referencing the first leaking object
objgraph.show_backrefs([leaking_objects[0]], filename='my_custom_object_backrefs.png', max_depth=5)
Generating these reference graphs (.png files) can be very insightful. You’ll need Graphviz installed on the machine where you run this analysis (or where the snapshots were generated if you’re analyzing remotely).
Common Pitfalls and Solutions
- Unclosed Resources: File handles, network sockets, database connections that are opened but never closed can lead to resource leaks that manifest as memory growth. Ensure all resources are properly managed using context managers (
withstatements) or explicit close calls. - Circular References with `__del__` Methods: While Python’s garbage collector is good, complex circular references, especially those involving objects with custom
__del__methods, can sometimes be problematic. - Caching Without Limits: In-memory caches that grow indefinitely without a size limit or eviction policy are a prime suspect. Ensure your caches have mechanisms to prune old or less-used items.
- Global Variables and Long-Lived Objects: Objects stored in global variables or long-lived data structures (like class variables in frequently instantiated classes) that are no longer needed but not explicitly cleared can cause leaks.
- Third-Party Libraries: Sometimes, the leak might be within a third-party library.
objgraphcan help identify if the growing objects are from such libraries. If so, check for library updates or report the issue. - Celery Result Backend: If you’re storing task results and not cleaning them up, the result backend (e.g., Redis, database) can grow. Ensure you have a strategy for result expiration.
- Task State Accumulation: Certain frameworks or custom logic might inadvertently store task state or intermediate results in a way that persists beyond the task’s execution.
Example: Fixing a Cache Leak
Suppose objgraph reveals a large number of MyCacheEntry objects. You might find code like this:
# Potentially leaky cache implementation
_my_app_cache = {}
def get_from_cache(key):
if key in _my_app_cache:
return _my_app_cache[key]
# ... fetch data ...
data = fetch_expensive_data(key)
_my_app_cache[key] = data # No limit, no expiration
return data
The fix involves adding a size limit and/or an expiration mechanism. Using Python’s `functools.lru_cache` or a dedicated caching library like `cachetools` is often a better approach.
from functools import lru_cache
import time
# Using LRU cache with a max size and a TTL (simulated)
# Note: lru_cache itself doesn't have TTL, you'd need a wrapper or another library for that.
# For demonstration, let's use a simple dictionary with manual cleanup.
class TimedLRUCache:
def __init__(self, max_size=1000, ttl_seconds=300):
self._cache = {}
self._max_size = max_size
self._ttl = ttl_seconds
self._order = [] # To maintain LRU order
def _cleanup_old_entries(self):
current_time = time.time()
# Remove expired entries
keys_to_remove = [key for key, (value, timestamp) in self._cache.items() if current_time - timestamp > self._ttl]
for key in keys_to_remove:
del self._cache[key]
self._order.remove(key)
# Remove least recently used if cache exceeds max_size
while len(self._cache) > self._max_size:
lru_key = self._order.pop(0)
if lru_key in self._cache: # Ensure it wasn't already removed by TTL
del self._cache[lru_key]
def get(self, key):
self._cleanup_old_entries()
if key in self._cache:
# Move accessed item to end of order (most recently used)
self._order.remove(key)
self._order.append(key)
return self._cache[key][0] # Return value, not (value, timestamp)
return None
def set(self, key, value):
self._cleanup_old_entries()
if key in self._cache:
# Update existing item, move to end of order
self._order.remove(key)
self._order.append(key)
elif len(self._cache) >= self._max_size:
# If cache is full, remove LRU item before adding new one
lru_key = self._order.pop(0)
if lru_key in self._cache:
del self._cache[lru_key]
self._cache[key] = (value, time.time())
self._order.append(key)
# Usage within a Celery task:
# Assuming `my_cache` is a global instance or passed appropriately
my_cache = TimedLRUCache(max_size=5000, ttl_seconds=600) # Cache up to 5000 items, expires after 10 mins
@app.task
def process_and_cache_data(item_id):
cached_data = my_cache.get(item_id)
if cached_data is None:
print(f"Cache miss for {item_id}. Fetching data...")
data = fetch_expensive_data(item_id) # Assume this is a slow operation
my_cache.set(item_id, data)
return f"Fetched and cached data for {item_id}"
else:
print(f"Cache hit for {item_id}.")
return f"Used cached data for {item_id}"
# In a real scenario, you'd need to ensure `my_cache` is accessible
# and potentially shared across worker processes if using multiprocessing pool,
# or managed per-worker if using eventlet/gevent.
# For simplicity, a global instance is shown here, which works for single-process workers.
Advanced Techniques: memory_profiler and Tracing
For more granular analysis, the memory_profiler library can be integrated. It allows you to profile memory usage line-by-line within a function.
Using memory_profiler
Install it:
pip install memory_profiler psutil
Decorate your functions with @profile. You can then run the script using mprof run and analyze the results with mprof plot or mprof report.
# profile_this_task.py
from memory_profiler import profile
import time
@profile
def my_memory_intensive_task(n):
a = [i * i for i in range(n)] # Allocates memory
time.sleep(1)
b = [i * i * i for i in range(n)] # Allocates more memory
del a # Explicitly delete to free memory
time.sleep(1)
return b
if __name__ == '__main__':
my_memory_intensive_task(100000)
To run this:
mprof run profile_this_task.py mprof report # or mprof plot
Integrating mprof directly into a running Celery worker can be tricky due to its execution model. It’s often easier to use mprof to profile a standalone script that mimics the problematic task’s behavior, or to attach mprof to a specific worker process if your deployment allows.
AWS Specific Considerations
When running on AWS, remember:
- Instance Sizing: Ensure your EC2 instances or ECS tasks have sufficient RAM. While this post focuses on leaks, sometimes the issue is simply under-provisioning.
- Auto Scaling: If your workers are part of an auto-scaling group, a memory leak can cause new instances to be launched frequently, leading to unexpected costs and performance issues.
- Containerization (ECS/EKS): In containerized environments, monitor container memory limits and usage. Leaks can cause containers to be OOM-killed. Ensure your container definitions have appropriate memory limits set.
- CloudWatch Alarms: Set up CloudWatch alarms for high memory utilization (e.g., `MemAvailable` metric from CloudWatch Agent) on your EC2 instances or container metrics to proactively alert you to potential issues.
Conclusion
Diagnosing memory leaks in long-running Python applications like Celery workers requires a systematic approach. Start with system-level monitoring to confirm the leak, then use tools like objgraph to pinpoint the accumulating object types. Finally, dive into the specific code paths responsible, employing techniques like context managers, cache limits, and careful resource management. By combining these strategies with AWS-specific monitoring, you can effectively combat memory leaks and ensure the stability of your distributed task processing systems.