Step-by-Step: Diagnosing Memory Leaks in Long-Running Python Celery Worker Daemons on Google Cloud Servers
Identifying the Problem: Gradual Memory Increase in Celery Workers
Long-running processes, especially those handling asynchronous tasks like Celery workers, are prime candidates for memory leaks. On Google Cloud Platform (GCP) Compute Engine instances, these leaks can manifest as a steady, predictable increase in memory consumption over time, eventually triggering the Linux out-of-memory (OOM) killer or degrading performance. The first step in diagnosing this is to establish a baseline and observe the trend.
We’ll leverage standard Linux tools and Python-specific introspection to pinpoint the source. Assume your Celery workers are managed by a process supervisor like `systemd` or `supervisor` and are running on a Debian/Ubuntu-based GCP instance.
Phase 1: System-Level Monitoring and Baseline Establishment
Before diving into Python code, let’s confirm the memory usage at the OS level. We’ll use `htop` for interactive monitoring and `pidstat` for historical data collection.
1. Interactive Monitoring with `htop`
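SSH into your GCP instance and run `htop`. Identify your Celery worker processes and note their PIDs (process IDs). With Celery’s default prefork pool, each worker is a parent process plus several child processes that actually execute tasks, so watch the children as well as the parent (F5 toggles htop’s tree view). Observe the RES (resident set size) and VIRT (virtual memory size) columns. A RES value that keeps growing for a specific PID over hours or days is a strong indicator of a leak.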
2. Historical Data Collection with `pidstat`
For more systematic tracking, `pidstat` is invaluable. It’s part of the `sysstat` package. If not installed, run:
sudo apt update && sudo apt install -y sysstat
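To monitor a specific Celery worker PID (let’s assume PID 12345) every 60 seconds for an hour (60 samples), use the command below. The -p option also accepts a comma-separated list of PIDs if you want to track several pool children in one log.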
pidstat -r -p 12345 60 60 > celery_worker_memory.log
The -r flag enables memory reporting. The output is plain text in whitespace-separated columns (not CSV), followed by an Average: summary. In celery_worker_memory.log, look for a steady upward trend in the RSS (resident set size, in kilobytes) and %MEM columns.
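Eyeballing an hour of samples works, but a fitted slope makes "steady upward trend" concrete and comparable before and after a fix. The sketch below parses the log produced by the pidstat command above and fits a least-squares line to each PID’s RSS samples. It assumes the usual `pidstat -r` layout (a header row containing PID and RSS, one row per sample, an Average: block at the end); column order can differ between sysstat versions, which is why positions are read from the header. The function names are hypothetical.

```python
import sys
from collections import defaultdict
from typing import Dict, List


def parse_pidstat_rss(log_text: str) -> Dict[int, List[int]]:
    """Return {pid: [rss_kb, ...]} from `pidstat -r` text output.

    Column positions come from the header row and are counted from the end
    of the line, because the timestamp is one token in 24-hour locales and
    two tokens ("10:01:01 AM") in 12-hour locales.
    """
    samples: Dict[int, List[int]] = defaultdict(list)
    pid_offset = rss_offset = None
    for line in log_text.splitlines():
        tokens = line.split()
        if "PID" in tokens and "RSS" in tokens:
            # Header row: remember column positions relative to the last column
            pid_offset = len(tokens) - tokens.index("PID")
            rss_offset = len(tokens) - tokens.index("RSS")
            continue
        if rss_offset is None or not tokens or tokens[0].startswith("Average"):
            continue  # preamble, blank line, or the summary block
        if len(tokens) < max(pid_offset, rss_offset):
            continue
        try:
            pid = int(tokens[-pid_offset])
            rss_kb = int(tokens[-rss_offset])
        except ValueError:
            continue  # not a data row
        samples[pid].append(rss_kb)
    return dict(samples)


def rss_growth_kb_per_hour(samples: List[int], interval_s: float) -> float:
    """Least-squares slope of RSS (KB) against time, in KB per hour."""
    n = len(samples)
    if n < 2:
        raise ValueError("need at least two samples")
    hours = [i * interval_s / 3600 for i in range(n)]
    mean_x = sum(hours) / n
    mean_y = sum(samples) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(hours, samples))
    var = sum((x - mean_x) ** 2 for x in hours)
    return cov / var


if __name__ == "__main__" and len(sys.argv) == 3:
    # Usage: python pidstat_trend.py celery_worker_memory.log 60
    with open(sys.argv[1]) as log_file:
        per_pid = parse_pidstat_rss(log_file.read())
    for pid, rss in sorted(per_pid.items()):
        if len(rss) >= 2:
            slope = rss_growth_kb_per_hour(rss, float(sys.argv[2]))
            print(f"PID {pid}: {rss[0]} -> {rss[-1]} KB, trend {slope:+.0f} KB/hour")
```

A slope that stays clearly positive across several runs of the same workload is the signal to move on to Phase 2; a single noisy hour is not.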
Phase 2: Python-Specific Memory Profiling
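Once system-level monitoring confirms a memory issue, investigate within the Python application itself. CPython frees most objects through reference counting, backed by a cyclic garbage collector for reference cycles, but neither can free an object that is still reachable, and neither manages memory allocated outside Python. Leaks typically come from: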
- Unreleased references to objects (e.g., in global lists, caches, or closures).
- External libraries with their own memory management issues.
- C extensions that don’t properly manage memory.
1. Using `objgraph` for Object Graph Analysis
objgraph counts live Python objects by type and can draw object reference graphs (the drawing functions require Graphviz). It helps identify objects that are unexpectedly still referenced.
First, install it:
pip install objgraph
objgraph inspects the interpreter it runs in, so it has to execute inside the Celery worker process itself; you cannot point it at a running daemon from the outside. A common approach is to add temporary debugging code to your worker or tasks that dumps object counts at intervals or when the process receives a specific signal.
Consider adding a signal handler to a module your worker imports, such as the module that defines your Celery app. With the default prefork pool, tasks run in child processes, so the handler below is registered through Celery’s worker_process_init signal, which fires in each child as it starts. It uses SIGUSR2 because Celery’s worker already handles SIGUSR1 (it dumps the tracebacks of active threads); Celery only uses SIGUSR2 for its optional remote debugger.
import gc
import signal

import objgraph
from celery.signals import worker_process_init

def dump_memory_objects(signum, frame):
    # Collect unreachable cycles first so only live objects are counted
    gc.collect()
    print("--- Dumping object counts ---")
    # Print the 20 most common object types alive in this process
    objgraph.show_most_common_types(limit=20)
    print("--- Growth since the previous dump ---")
    # Print the types whose instance counts increased since the last call
    objgraph.show_growth(limit=20)
    # To see what keeps a suspect class alive (MyLeakyClass is a placeholder
    # for your own class name; drawing requires Graphviz):
    # objgraph.show_backrefs(objgraph.by_type("MyLeakyClass")[:3], filename="leaky_backrefs.png")
    print("--- Done dumping ---")

# Register the handler in every prefork pool child, where tasks actually run
@worker_process_init.connect
def install_memory_dump_handler(**kwargs):
    signal.signal(signal.SIGUSR2, dump_memory_objects)
After deploying this change and restarting your Celery workers, find the PIDs of the pool child processes (for example with `ps --ppid <CELERY_MAIN_PID>`) and send the signal to one of them. objgraph prints to stdout, which Celery redirects into the worker log by default.
kill -USR2 <CELERY_CHILD_PID>
Examine the output. Look for object types whose counts are increasing significantly over time. If you suspect a specific class (e.g., a custom cache object, a data structure holding task results), you can use objgraph.by_type('YourClassName') to inspect them directly.
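objgraph’s show_growth() reports exactly this per-type delta. If you cannot install objgraph on a production worker, a rough standard-library approximation is to count live objects by type name with gc.get_objects() and diff successive counts. The sketch below is exactly that (TypeGrowthTracker is a hypothetical helper, not part of any library). As with objgraph, only GC-tracked objects (containers and class instances) are counted, so plain ints and strings never appear.

```python
import gc
from collections import Counter
from typing import List, Optional, Tuple


def count_live_types() -> Counter:
    """Count GC-tracked objects per type name after a full collection."""
    gc.collect()
    return Counter(type(obj).__name__ for obj in gc.get_objects())


class TypeGrowthTracker:
    """Reports which types gained instances since the previous call."""

    def __init__(self) -> None:
        self._previous = count_live_types()

    def growth(self, limit: Optional[int] = 10) -> List[Tuple[str, int, int]]:
        """Return (type_name, current_count, increase) rows, biggest first."""
        current = count_live_types()
        rows = [
            (name, count, count - self._previous[name])
            for name, count in current.items()
            if count > self._previous[name]
        ]
        self._previous = current
        rows.sort(key=lambda row: row[2], reverse=True)
        return rows[:limit]
```

In a worker you would create one tracker per pool child at startup and log tracker.growth() from a signal handler; a type that shows up with a positive delta on every dump is the one to chase with objgraph.by_type() or gc.get_referrers().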
2. Using `tracemalloc` for Allocation Tracing
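Python 3.4+ includes the built-in `tracemalloc` module, which records the source file and line responsible for each memory block that Python allocates. Comparing two snapshots therefore shows which lines account for the growth. Tracing adds CPU and memory overhead, so enable it only while investigating.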
Similar to `objgraph`, you’ll need to integrate `tracemalloc` into your worker’s code. You can start it at the beginning of your worker’s execution and periodically take snapshots.
import gc
import time
import tracemalloc

# Start tracing Python memory allocations
tracemalloc.start()

# Collect garbage first so the baseline holds only live objects
gc.collect()
snapshot1 = tracemalloc.take_snapshot()
print("Initial snapshot taken.")

# Let the worker run for a while. For demonstration we just sleep; in a real
# worker, normal task processing happens during this interval.
# time.sleep(300)  # 5 minutes

gc.collect()
snapshot2 = tracemalloc.take_snapshot()
print("Second snapshot taken.")

# Compare the snapshots, grouped by the source line that allocated the memory
top_stats = snapshot2.compare_to(snapshot1, "lineno")
print("[ Top 10 differences ]")
for stat in top_stats[:10]:
    print(stat)

# Current and peak size of traced memory, in bytes
current, peak = tracemalloc.get_traced_memory()
print(f"Traced memory: current={current / 1024:.1f} KiB, peak={peak / 1024:.1f} KiB")

# Save a snapshot for later analysis; reload it with
# tracemalloc.Snapshot.load("snapshot2.tracemalloc")
# snapshot2.dump("snapshot2.tracemalloc")

# For a long-running worker you would usually keep tracing on and take
# snapshots periodically; stop tracing once you are done.
# tracemalloc.stop()
Integrate this code into your Celery worker’s startup or within a task that you suspect might be causing the leak. You can trigger snapshot comparisons periodically or via a signal handler (similar to the `objgraph` example). The output lists the source lines whose allocations grew the most between snapshots. Lines in your application code, or in libraries you call, that keep growing across several comparisons are the prime suspects.
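For periodic comparisons it helps to keep the previous snapshot around and report only what grew since then. The sketch below wraps that in a hypothetical SnapshotDiffer class; the noise filters are taken from the filtering example in the tracemalloc documentation, and everything else (class name, defaults) is an assumption.

```python
import gc
import tracemalloc
from typing import List

# Drop allocations made by the import machinery or with no known source
_NOISE_FILTERS = [
    tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
    tracemalloc.Filter(False, "<unknown>"),
]


class SnapshotDiffer:
    """Remembers the previous snapshot and reports growth since then."""

    def __init__(self, nframe: int = 1) -> None:
        if not tracemalloc.is_tracing():
            tracemalloc.start(nframe)
        self._previous = self._take()

    @staticmethod
    def _take() -> tracemalloc.Snapshot:
        gc.collect()  # free unreachable cycles before measuring
        return tracemalloc.take_snapshot().filter_traces(_NOISE_FILTERS)

    def top_growth(self, limit: int = 10) -> List[tracemalloc.StatisticDiff]:
        """Source lines whose allocated size grew since the last call."""
        current = self._take()
        stats = current.compare_to(self._previous, "lineno")
        self._previous = current
        # compare_to sorts by absolute size difference, largest first
        return [stat for stat in stats if stat.size_diff > 0][:limit]
```

In a worker you might create one SnapshotDiffer per pool child and log each entry of differ.top_growth() from a signal handler or every few hundred tasks. Each StatisticDiff prints as file:line followed by size and count deltas, so a line that reappears in every report is a strong lead.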
3. Using `memory_profiler`
The `memory_profiler` library provides decorators to profile memory usage line-by-line for specific functions.
pip install memory_profiler
Decorate the functions you suspect are leaking memory:
from memory_profiler import profile

@profile
def process_large_data(data):
    # The list grows with the input; memory_profiler reports the memory
    # increment for each line, so the growth shows up on the append line.
    intermediate_results = []
    for item in data:
        intermediate_results.append(item * 2)
    return intermediate_results

# memory_profiler is easiest to use on a task function run in isolation,
# outside the Celery worker daemon. For a standalone test of the task logic:
if __name__ == "__main__":
    processed_data = process_large_data(list(range(1_000_000)))
    print(f"Processed data length: {len(processed_data)}")
Running the script containing the decorated function with python -m memory_profiler your_script.py will output detailed memory usage per line. For Celery workers, this is best used for debugging specific tasks in isolation rather than profiling the entire daemon.
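If you still want a coarse per-call number from inside the running worker, one option is to read the process’s resident set size from /proc before and after each call. The sketch below is Linux-only, uses hypothetical names (current_rss_kb, log_rss_delta), and is not part of memory_profiler. Treat single deltas with caution: CPython’s object allocator and glibc’s malloc usually keep freed memory for reuse instead of returning it to the OS, so RSS often stays flat or jumps in steps. The trend over many calls is what matters.

```python
import functools
import logging
import os
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])
logger = logging.getLogger(__name__)
_PAGE_SIZE = os.sysconf("SC_PAGE_SIZE")


def current_rss_kb() -> int:
    """Resident set size of this process in KiB (Linux only: reads /proc)."""
    with open("/proc/self/statm") as statm:
        # The second field of statm is the number of resident pages
        resident_pages = int(statm.read().split()[1])
    return resident_pages * _PAGE_SIZE // 1024


def log_rss_delta(func: F) -> F:
    """Log the change in process RSS across each call of func."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        before = current_rss_kb()
        try:
            return func(*args, **kwargs)
        finally:
            after = current_rss_kb()
            logger.info(
                "%s: RSS %d KiB -> %d KiB (%+d KiB)",
                func.__qualname__, before, after, after - before,
            )

    return wrapper  # type: ignore[return-value]
```

On a Celery task, the decorator would sit directly above the function and below @app.task, so Celery registers the wrapped function; functools.wraps preserves the function’s name and module, which Celery uses to derive the task name.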
Phase 3: Analyzing and Fixing the Leak
Once `objgraph` or `tracemalloc` points to specific objects or code locations:
- Global Variables and Caches: Check for unbounded growth in global lists, dictionaries, or custom caches. Implement size limits, eviction policies (LRU), or explicit cleanup mechanisms.
- Closures and Long-Lived References: Be mindful of functions defined within other functions that might hold references to large objects longer than intended.
- External Libraries: If the leak appears to be within a third-party library, check its issue tracker. You might need to update the library, find an alternative, or implement workarounds.
- Database Connections/Resources: Ensure that database connections, file handles, or other external resources are properly closed and released, especially within loops or long-running tasks.
- Task Result Storage: If your tasks store large intermediate results or final outputs in memory before returning or saving, this can be a source. Consider streaming or processing data in chunks.
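For the first item, one way to put a ceiling on a module-level cache is a small LRU wrapper around collections.OrderedDict. This is a sketch with a hypothetical class name (BoundedLRUCache); when the cache only memoizes a pure function, functools.lru_cache(maxsize=...) from the standard library does the same job. It is not thread-safe, which is fine for prefork children but would need a lock under Celery’s threads pool.

```python
from collections import OrderedDict
from typing import Generic, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class BoundedLRUCache(Generic[K, V]):
    """Dict-like cache that evicts the least recently used entry when full."""

    def __init__(self, max_entries: int) -> None:
        if max_entries <= 0:
            raise ValueError("max_entries must be positive")
        self._max_entries = max_entries
        self._data: "OrderedDict[K, V]" = OrderedDict()

    def get(self, key: K) -> Optional[V]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: K, value: V) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self._max_entries:
            self._data.popitem(last=False)  # evict the least recently used

    def __len__(self) -> int:
        return len(self._data)
```

The cap turns "grows forever" into "grows to max_entries and stays there", which is exactly the flat line you want to see in Phase 4.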
Example Fix: Clearing a Growing List
Suppose `objgraph` reveals a large number of `MyTaskResult` objects are being held, and `tracemalloc` points to a list named `results_buffer` within a task handler:
# Original leaky code: the module-level buffer outlives every task
# results_buffer = []
#
# def process_items(items):
#     for item in items:
#         result = MyTaskResult(item)
#         results_buffer.append(result)  # reference kept indefinitely
#     return results_buffer

# Fixed code: results live only as long as the caller keeps them
class MyTaskResult:
    """Placeholder for your own result type."""
    def __init__(self, item):
        self.item = item

def process_items_fixed(items):
    local_results = []
    for item in items:
        local_results.append(MyTaskResult(item))
    # If a cross-task cache really is needed, bound it with an eviction
    # policy (for example LRU) instead of appending forever.
    return local_results  # no global reference survives the task
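To keep a fix like this from regressing, a test can call the function many times, drop the return values, and check that no extra instances stay alive. A minimal sketch built on gc.get_objects(); live_instances and instances_retained are hypothetical helpers, and MyTaskResult plus the two process_items_* functions are redefined here as placeholders so the block runs on its own.

```python
import gc
from typing import Callable, List


def live_instances(cls: type) -> int:
    """Number of live instances of exactly cls, after a full collection."""
    gc.collect()
    return sum(1 for obj in gc.get_objects() if type(obj) is cls)


def instances_retained(cls: type, func: Callable[[], object], runs: int = 100) -> int:
    """Call func `runs` times, discarding each result, and return how many
    extra cls instances are still alive afterwards."""
    before = live_instances(cls)
    for _ in range(runs):
        func()  # the return value is dropped immediately
    return live_instances(cls) - before


class MyTaskResult:
    """Placeholder for the result type in the example above."""

    def __init__(self, item: int) -> None:
        self.item = item


results_buffer: List[MyTaskResult] = []


def process_items_leaky() -> List[MyTaskResult]:
    for item in range(3):
        results_buffer.append(MyTaskResult(item))
    return results_buffer


def process_items_fixed() -> List[MyTaskResult]:
    return [MyTaskResult(item) for item in range(3)]


print(instances_retained(MyTaskResult, process_items_fixed))  # 0
print(instances_retained(MyTaskResult, process_items_leaky, runs=10))  # 30
```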
Phase 4: Production Deployment and Verification
After implementing a fix, redeploy your Celery workers. Monitor their memory usage closely using the same `pidstat` or `htop` methods established in Phase 1. The memory consumption should now stabilize or show a much slower, acceptable rate of increase.
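Consider implementing automated alerting based on memory thresholds (e.g., using GCP’s Cloud Monitoring or Prometheus/Grafana) to notify you proactively if memory usage starts climbing unexpectedly again. On Compute Engine, in-guest memory metrics are only collected once the Ops Agent is installed on the VM.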
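Until proper alerting is wired up, a stopgap is a small script, run from cron or a systemd timer, that flags pool children whose RSS crosses a threshold. This is a minimal Linux-only sketch that reads /proc; the function names, and the idea of passing the Celery main process PID so its prefork children are checked, are assumptions. Sending the result anywhere (email, chat, a metrics endpoint) is left out.

```python
import os
from typing import Dict, List


def child_pids(parent_pid: int) -> List[int]:
    """PIDs whose parent is parent_pid, found by scanning /proc (Linux only)."""
    children = []
    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue
        try:
            with open(f"/proc/{entry}/stat") as stat_file:
                stat = stat_file.read()
        except OSError:
            continue  # the process exited while we were scanning
        # The command name is wrapped in parentheses and may contain spaces,
        # so split after the last ")": the remaining fields start with state, ppid
        fields = stat.rsplit(")", 1)[1].split()
        if int(fields[1]) == parent_pid:
            children.append(int(entry))
    return sorted(children)


def rss_kb(pid: int) -> int:
    """VmRSS of pid in kB, or 0 if the kernel does not report one."""
    with open(f"/proc/{pid}/status") as status_file:
        for line in status_file:
            if line.startswith("VmRSS:"):
                return int(line.split()[1])
    return 0


def children_over_limit(parent_pid: int, limit_kb: int) -> Dict[int, int]:
    """Map each child PID whose RSS exceeds limit_kb to its RSS in kB."""
    over = {}
    for pid in child_pids(parent_pid):
        try:
            usage = rss_kb(pid)
        except OSError:
            continue  # the child exited between the scan and the read
        if usage > limit_kb:
            over[pid] = usage
    return over
```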