Resolving Memory Leaks in Long-Running Python Celery Worker Daemons Under Peak Event Traffic on OVH
Diagnosing Memory Bloat in Python Celery Workers
When Python Celery workers, especially those running on infrastructure like OVH, show memory consumption that keeps climbing over time, particularly under peak event traffic, it's a strong indicator of a memory leak, or at least of memory the process retains and never returns to the operating system. This isn't a theoretical problem; it's a production-halting issue that demands immediate, precise intervention. The root causes are often subtle, stemming from how Python manages memory, how Celery manages tasks and their states, and how external libraries interact with the process.
The first step in any such scenario is to establish a baseline and then monitor the deviation. For long-running daemons, the smoking gun is a Resident Set Size (RSS) that keeps rising and never plateaus, even after throughput and task mix have stabilized, and that exceeds what the tasks in flight should need. Virtual Memory Size (VMS) is a weaker signal because it includes address space that is reserved but not resident. We need tools that can provide granular insights into the memory usage of the Python process itself.
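The standard library's `tracemalloc` module can make "baseline, then deviation" concrete at the Python level by diffing two allocation snapshots. It is a swapped-in technique, not one of the tools covered below. The sketch below takes a snapshot after one warm-up call, runs the workload repeatedly, and reports the source lines whose retained memory grew. It is a minimal sketch under these assumptions:

- `top_growth` and `leaky_task` are hypothetical names.
- You can reproduce the load by calling a task's function body directly, outside the worker.

`tracemalloc` slows allocation noticeably, so keep it to staging or short diagnostic runs.

```python
import tracemalloc


def top_growth(workload, iterations=50, limit=5):
    """Call `workload` repeatedly and return the allocation sites that grew.

    The baseline snapshot is taken after one warm-up call, so one-off costs
    such as lazy imports and first-use caches are not reported as growth.
    """
    tracemalloc.start(10)  # record up to 10 frames per allocation
    try:
        workload()  # warm-up call
        baseline = tracemalloc.take_snapshot()
        for _ in range(iterations):
            workload()
        current = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Ignore allocations attributed to tracemalloc's own module.
    ignore_self = [tracemalloc.Filter(False, tracemalloc.__file__)]
    diffs = current.filter_traces(ignore_self).compare_to(
        baseline.filter_traces(ignore_self), "lineno"
    )
    # compare_to sorts by absolute size change; keep only sites that grew.
    return [d for d in diffs if d.size_diff > 0][:limit]


# Hypothetical leaky task body: appends to a module-level list forever.
_retained = []


def leaky_task():
    _retained.append(bytearray(10_000))


for diff in top_growth(leaky_task):
    print(diff)  # file:line, size=... (+growth), count=... (+growth)
```

A line that shows up in every run with a `size_diff` roughly proportional to `iterations` is a strong leak candidate.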
Leveraging `memory_profiler` and `objgraph` for Deep Dives
While system-level tools like `top` or `htop` are useful for initial detection, they lack the specificity needed for Python. We need to inspect the Python objects themselves. The `memory_profiler` library is invaluable for pinpointing memory-hungry lines of code within your task functions. For a broader view of object allocation and potential reference cycles, `objgraph` is indispensable.
To start, instrument your Celery tasks with `memory_profiler` by decorating the task functions with `@profile`. The classic route is to launch the script with `python -m memory_profiler your_worker_script.py`, but starting a Celery worker that way is cumbersome. A more practical approach is to profile only critical or suspected tasks and import the decorator explicitly (`from memory_profiler import profile`), which makes it work under a normal `celery worker` command.
```python
import time

from celery import Celery
from memory_profiler import profile

# Assume your Celery app is configured elsewhere
# from .celery_app import app
# For demonstration, a simple Celery app setup
app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
@profile  # innermost decorator, so app.task registers the profiled wrapper
def process_large_data_task(data_id):
    # Simulate fetching and processing data that might grow in memory
    # This is where you'd put your actual task logic
    large_object = []
    for i in range(100000):
        large_object.append(f"item_{i}_{data_id}")
        if i % 10000 == 0:
            time.sleep(0.01)  # Simulate work
    # Potential leak source: if 'large_object' or parts of it
    # are inadvertently stored in a global or long-lived object
    # without proper cleanup.
    result = len(large_object)
    print(f"Processed {data_id}, result: {result}")
    return result

# To run this with memory profiling:
# 1. Save this as a Python file (e.g., tasks.py)
# 2. Ensure you have memory_profiler installed: pip install memory_profiler
# 3. Run a worker: celery -A tasks worker -l info -P solo
#    memory_profiler reads whole-process memory. The solo pool runs one task
#    at a time, so each line's reading isn't mixed with other tasks' allocations,
#    as it would be under a concurrent pool such as eventlet.
# 4. Send a task: from tasks import process_large_data_task; process_large_data_task.delay(123)
# 5. Observe the output for memory usage per line.
#
# For more advanced profiling, you might need to integrate this into your
# worker startup or use a custom signal handler.
```
The output of `@profile` will show memory usage line by line. Look for lines where memory increases significantly and doesn’t decrease. If the leak isn’t tied to a specific line but rather to the accumulation of objects over many task executions, `objgraph` becomes crucial. You can use `objgraph.show_most_common_types()` to see which object types are most prevalent in memory, and `objgraph.show_backrefs()` to trace where they are being referenced.
```python
import gc

import objgraph


# Inside a task or a diagnostic endpoint:
def inspect_memory_leaks():
    gc.collect()  # Force garbage collection to get a cleaner snapshot
    print("Most common types:")
    objgraph.show_most_common_types(limit=20)
    # Example: If you suspect 'MyCustomObject' is leaking
    # my_objects = objgraph.by_type('MyCustomObject')
    # if my_objects:
    #     print(f"Found {len(my_objects)} instances of MyCustomObject.")
    #     # show_backrefs renders a reference graph and needs Graphviz installed.
    #     objgraph.show_backrefs(my_objects[0], max_depth=10)
    # else:
    #     print("No instances of MyCustomObject found.")

# You could expose this function via a simple Flask/Django endpoint
# or call it periodically within a long-running task if safe.
```
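For accumulation across many task runs, `objgraph.show_growth()` is often more telling than `show_most_common_types()`. It prints how much each type's object count has risen since the previous call. The same idea needs only the standard library. The sketch below is a hypothetical stand-in, not objgraph's implementation; `count_types`, `growth`, and `Order` are illustrative names. `gc.get_objects()` only returns objects the garbage collector tracks, such as containers and class instances, so plain strings and numbers are not counted.

```python
import gc
from collections import Counter


def count_types():
    """Count live, GC-tracked objects by type name."""
    gc.collect()  # drop unreachable cycles so only live objects are counted
    return Counter(type(obj).__name__ for obj in gc.get_objects())


def growth(before, after, limit=10):
    """Return (type_name, increase) pairs, largest increase first."""
    diff = after - before  # Counter subtraction keeps only positive counts
    return diff.most_common(limit)


class Order:  # hypothetical type that a task might accidentally retain
    pass


retained_orders = []
before = count_types()
retained_orders.extend(Order() for _ in range(1000))
after = count_types()
print(growth(before, after, limit=3))
```

In a worker, you would take `before` at startup and `after` every few hundred tasks. A type whose count rises on every check is the one to feed to `objgraph.show_backrefs()`.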
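The key is to identify objects that are accumulating and whose references are never cleared. Common culprits include:

- unclosed file handles;
- database connections not returned to their pools;
- large data structures held in global variables or class attributes that are never reset;
- reference cycles.

CPython's cyclic garbage collector does reclaim cycles, but only when a collection runs, so large cyclic structures can linger between collections. Objects that stay reachable from a long-lived root, such as a global, a cache, or a closure, are never collected at all.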
Celery’s Internal State and Memory Management
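Celery itself maintains internal state, especially when using certain result backends or when tasks are configured to store results. If `result_expires` is set to `None` or a very long duration and you're storing results, these accumulate. That growth lands in the result backend rather than in the worker process (for example, in Redis memory), but it competes for the same RAM if the backend runs on the same host. Tasks whose return values nobody reads can skip storage entirely with `@app.task(ignore_result=True)` or the global `task_ignore_result` setting. More subtly, the worker process might hold onto references to task objects, their arguments, or their results longer than expected, especially if there are issues with task acknowledgment or state transitions.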
Consider the `task_track_started` setting. When enabled, each task reports a STARTED state to the result backend, which adds one extra backend write per task. That is usually minor, but in extreme scenarios with millions of tasks it adds up. More importantly, review `worker_max_tasks_per_child` (called `CELERYD_MAX_TASKS_PER_CHILD` before Celery 4.0). It sets how many tasks a pool child process executes before it is replaced with a fresh one, and it applies to the prefork pool only. A common strategy to mitigate memory leaks in long-running processes is to set it to a reasonable, finite number (e.g., 1000 or 5000, depending on task complexity and leak rate). Each replacement acts as a periodic "clean slate", because everything the old child held is returned to the OS when it exits.
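```python
# celeryconfig.py or settings.py
# Celery 4.0+ lowercase names; old uppercase equivalents are
# CELERYD_MAX_TASKS_PER_CHILD and CELERY_TASK_RESULT_EXPIRES (don't mix styles).
worker_max_tasks_per_child = 1000  # replace each pool child after 1000 tasks
result_expires = 3600  # Expire results after 1 hour
```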
On OVH, where you might be managing your own worker processes (e.g., via `systemd` or `supervisor`), make sure your restart policies are robust. If a worker process exceeds a memory threshold that you monitor externally, your process manager should kill and restart it. With `systemd`, that means a `MemoryMax=` limit on the unit combined with `Restart=on-failure` or `Restart=always`. `supervisor` has no built-in memory limit, so it needs an add-on such as `memmon` from the `superlance` package. This is a reactive measure, but it prevents a single leaking worker from consuming all available RAM and crashing the host.
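With the prefork pool, Celery can enforce a memory threshold on its own pool children. The `worker_max_memory_per_child` setting (CLI: `--max-memory-per-child`) replaces a child whose resident memory exceeds the limit, which is given in kilobytes. The task that crosses the limit is allowed to finish first. This is a minimal config sketch, assuming Celery 4.0+ lowercase settings; 512 MiB is an arbitrary example value. It complements the external `systemd`/`supervisor` limit rather than replacing it, since only the external limit covers growth in the main worker process.

```python
# celeryconfig.py: sketch assuming Celery 4.0+ lowercase settings and the
# prefork pool; load it with app.config_from_object("celeryconfig").

# Replace a pool child once its resident memory exceeds this many KiB.
worker_max_memory_per_child = 512 * 1024  # 512 MiB, an example value
```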
External Libraries and Resource Management
Many memory leaks in Python applications are not in the application code itself but in the libraries it uses. Libraries that interact with external resources (databases, network sockets, file systems, C extensions) are prime suspects. For instance, a database connector that doesn’t properly close cursors or release connection pool resources can lead to gradual memory growth.
If your tasks involve heavy I/O or data manipulation using libraries like `pandas`, `numpy`, `requests`, or database ORMs (like SQLAlchemy), scrutinize their resource management. Ensure that any context managers (`with` statements) are used correctly for resources that need explicit closing. For libraries that manage their own pools (e.g., `SQLAlchemy` engine), verify their configuration and ensure they are not holding onto excessive idle connections or cached data.
```python
# Example with SQLAlchemy and potential leak if not managed
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# In a real app, this engine would likely be a singleton or managed globally.
# Replace the placeholder credentials, host, port, and database name.
engine = create_engine("postgresql://user:password@host:port/dbname")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def process_db_task(item_id):
    db = SessionLocal()
    try:
        # Fetch data (the queried model is elided in this example)
        data = db.query(...).filter_by(id=item_id).first()
        if data is None:
            return None
        # Process data...
        # If 'data' or related objects are kept beyond this function's scope
        # (e.g., appended to a global), they can't be garbage collected.
        # Returning a plain value instead of the ORM object avoids that.
        return data.processed_value
    finally:
        # Crucial: closing the session releases its objects and returns the
        # connection to the engine's pool. On SQLAlchemy 1.4+,
        # `with SessionLocal() as db:` closes the session automatically.
        db.close()

# Ensure all database operations are within try...finally (or with) blocks
# that guarantee db.close() is called.
```
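The same close-it-explicitly rule applies to lower-level DB-API connections, with one stdlib gotcha: using a `sqlite3` connection as a context manager only commits or rolls back the transaction. It does not close the connection. The sketch below uses `contextlib.closing` to guarantee the close. `run_query` is a hypothetical helper, and `sqlite3` stands in for whatever driver your tasks use. Context-manager semantics differ between drivers, so check yours.

```python
import sqlite3
from contextlib import closing


def run_query(db_path, sql, params=()):
    """Run one query and return all rows, always closing the connection."""
    # closing() calls conn.close() on exit, even if the query raises.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # transaction scope only: commit on success, rollback on error
            return conn.execute(sql, params).fetchall()


print(run_query(":memory:", "SELECT 1 + 1"))  # [(2,)]
```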
When using libraries with C extensions (e.g., `numpy`, `cryptography`), memory management is more complex. Python's garbage collector tracks only Python objects. Memory that C code allocates for itself is freed only when the library releases it, typically when the owning Python object is deallocated. Leaks can occur if the C extension itself has bugs or if Python objects holding references to C-allocated memory are never deallocated. In such cases, native-level tools such as Valgrind, or the extension's issue tracker, may be necessary.
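Before reaching for native tools, a quick check is whether the growth is in Python objects at all. The sketch below compares RSS growth with the growth `tracemalloc` attributes to Python's allocators over the same run. If RSS rises much faster than the traced figure, the extra memory is likely outside Python's allocators, for example in a C extension's own `malloc` calls or in heap fragmentation. Assumptions:

- Linux only, since it reads `/proc/self/statm`.
- `attribute_growth` is a hypothetical helper.

Treat the numbers as a rough signal, because `tracemalloc`'s bookkeeping and allocator caching also move RSS.

```python
import os
import tracemalloc

PAGE_SIZE = os.sysconf("SC_PAGE_SIZE")


def current_rss_bytes():
    """Current resident set size of this process (Linux-only, via /proc)."""
    with open("/proc/self/statm") as f:
        resident_pages = int(f.read().split()[1])  # 2nd field: resident pages
    return resident_pages * PAGE_SIZE


def attribute_growth(workload, iterations=100):
    """Return RSS growth and Python-traced growth over `iterations` calls."""
    tracemalloc.start()
    try:
        workload()  # warm-up call
        rss_before = current_rss_bytes()
        traced_before, _ = tracemalloc.get_traced_memory()  # (current, peak)
        for _ in range(iterations):
            workload()
        rss_after = current_rss_bytes()
        traced_after, _ = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return {
        "rss_growth": rss_after - rss_before,
        "python_traced_growth": traced_after - traced_before,
    }
```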
Production Monitoring and Alerting Strategy
Proactive monitoring is key. On OVH, you likely have access to host-level metrics, but make sure you're also collecting and visualizing the memory usage (RSS) of the Celery worker processes themselves. Prometheus with `node_exporter` (host memory) and `process-exporter` (per-process memory) can provide these metrics. Set up alerts for when memory usage crosses predefined thresholds (e.g., 80% of available RAM) or when the rate of memory increase exceeds a limit over a period. For the second kind, PromQL's `deriv()` function, which fits a simple linear regression over a range, is a natural fit.
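```
# Example Prometheus query to monitor Celery worker RSS.
# process-exporter reports memory per process group as namedprocess_namegroup_memory_bytes,
# summed over the group's processes; memtype="resident" selects RSS. The groupname
# value depends on your process-exporter config ("celery-worker" is an assumption).
avg_over_time(namedprocess_namegroup_memory_bytes{groupname="celery-worker", memtype="resident"}[5m])
# A worker that exposes its own metrics through a Prometheus client library
# reports process_resident_memory_bytes instead.
```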
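For an ad-hoc check outside Prometheus, the rate-of-increase alert reduces to a least-squares slope, for example over samples exported from your metrics store. This is what catches a leak early, before a fixed threshold trips. `growth_rate_bytes_per_hour` and `looks_like_leak` are hypothetical helpers, and the `(timestamp, bytes)` sample format is an assumption.

```python
def growth_rate_bytes_per_hour(samples):
    """Least-squares slope of memory over time, in bytes per hour.

    `samples` is a sequence of (unix_time_seconds, rss_bytes) pairs.
    """
    n = len(samples)
    if n < 2:
        raise ValueError("need at least two samples")
    mean_t = sum(t for t, _ in samples) / n
    mean_m = sum(m for _, m in samples) / n
    cov = sum((t - mean_t) * (m - mean_m) for t, m in samples)
    var = sum((t - mean_t) ** 2 for t, _ in samples)
    if var == 0:
        raise ValueError("samples must span more than one timestamp")
    return cov / var * 3600  # per-second slope -> per-hour


def looks_like_leak(samples, max_bytes_per_hour):
    """True when memory trends upward faster than the allowed rate."""
    return growth_rate_bytes_per_hour(samples) > max_bytes_per_hour
```

A regression slope is less jumpy than comparing the first and last sample, because a single GC pause or burst of traffic barely moves it.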
When an alert fires, the system should ideally trigger an automated restart of the offending worker process (via `systemd` or `supervisor`). Simultaneously, it should notify the on-call engineering team with sufficient context, including the worker’s PID, current memory usage, and potentially recent task activity logs. This combination of automated recovery and human investigation is crucial for maintaining stability during peak loads.
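The alert context this paragraph mentions, the PID and current memory usage of the offending process, can be gathered from outside the worker. The sketch below reads `/proc` directly, with no third-party dependency. Assumptions:

- Linux only.
- `oversized_processes` is a hypothetical helper.
- Workers can be identified by a substring of their command line.

Delivering the result to your paging system is out of scope here.

```python
import os

PAGE_SIZE = os.sysconf("SC_PAGE_SIZE")


def rss_bytes(pid):
    """Resident set size of `pid` in bytes, from /proc/<pid>/statm."""
    with open(f"/proc/{pid}/statm") as f:
        return int(f.read().split()[1]) * PAGE_SIZE


def command_line(pid):
    """The process's argv joined with spaces ('' for kernel threads)."""
    with open(f"/proc/{pid}/cmdline", "rb") as f:
        return f.read().replace(b"\0", b" ").decode(errors="replace").strip()


def oversized_processes(pattern, limit_bytes):
    """Collect alert context for processes whose command line contains
    `pattern` and whose RSS exceeds `limit_bytes`."""
    found = []
    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue  # skip non-process entries such as /proc/meminfo
        pid = int(entry)
        try:
            cmd = command_line(pid)
            if pattern in cmd:
                rss = rss_bytes(pid)
                if rss > limit_bytes:
                    found.append({"pid": pid, "rss_mb": round(rss / 2**20, 1), "cmdline": cmd})
        except OSError:
            continue  # process exited mid-scan or is not readable
    return found


for proc in oversized_processes("celery", limit_bytes=1536 * 2**20):
    print(proc)  # hand this dict to your alerting integration
```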
Finally, consider implementing a "health check" routine inside the worker itself. It could periodically run basic memory checks (e.g., using `psutil`) and report its status. If memory is excessive, the routine can terminate the process so that it gets replaced. In a prefork pool child, the parent worker does the replacing; in the main worker process, the process manager does. This is a more integrated form of self-preservation.
```python
import logging
import os

import psutil

logger = logging.getLogger(__name__)


def check_memory_health(threshold_mb=1024):
    """Hard-exit this process if its RSS exceeds threshold_mb; else return True."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)  # RSS in MiB
    if rss_mb > threshold_mb:
        logger.critical("Memory usage %.2fMB exceeds threshold %sMB. Self-terminating.",
                        rss_mb, threshold_mb)
        # os._exit skips cleanup (finally blocks, atexit). In a prefork child the
        # parent replaces the dead child and the running task typically fails with
        # WorkerLostError; systemd/supervisor restart only if the main process exits.
        os._exit(1)
    logger.info("Memory usage %.2fMB is within limits.", rss_mb)
    return True

# This function could be called periodically within a long-running task,
# or in a separate thread if your worker framework supports it.
# Be cautious not to add significant overhead with frequent checks.
```
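The "separate thread" option for the health check can be a small daemon thread. The sketch below polls an RSS reader at a fixed interval and fires a callback once the threshold is crossed. The reader and callback are injected so the logic can be tested without `psutil`. `MemoryWatchdog` is a hypothetical class, and the 30-second default interval is an arbitrary choice.

```python
import threading


class MemoryWatchdog(threading.Thread):
    """Poll `read_rss_mb()` every `interval_s` seconds and call
    `on_exceed(rss_mb)` once the reading exceeds `threshold_mb`."""

    def __init__(self, read_rss_mb, on_exceed, threshold_mb, interval_s=30.0):
        # daemon=True: the thread never keeps the interpreter alive at shutdown
        super().__init__(name="memory-watchdog", daemon=True)
        self.read_rss_mb = read_rss_mb
        self.on_exceed = on_exceed
        self.threshold_mb = threshold_mb
        self.interval_s = interval_s
        self._halt = threading.Event()

    def run(self):
        # Event.wait() is an interruptible sleep: it returns True as soon as
        # stop() is called, which ends the loop.
        while not self._halt.wait(self.interval_s):
            rss_mb = self.read_rss_mb()
            if rss_mb > self.threshold_mb:
                self.on_exceed(rss_mb)
                return  # fire once; the callback is expected to end the process

    def stop(self):
        self._halt.set()
```

In a prefork pool, threads started in the parent are not carried into forked children. One option is therefore to start a watchdog in each child from a `worker_process_init` signal handler. There, `read_rss_mb` would be backed by `psutil`, as in `check_memory_health`, and `on_exceed` would log and call `os._exit(1)`.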
Resolving memory leaks in production Celery workers under peak load is a multi-faceted challenge. It requires a systematic approach combining deep code inspection, understanding of Python’s memory model, Celery’s operational characteristics, external library behavior, and robust production monitoring. The strategies outlined above provide a framework for diagnosing and mitigating these critical issues.