Vengala Vinay

Having 12+ Years of Experience in Software Development

How to Debug and Fix Memory leaks in long-running Python Celery worker daemons in Modern Python Applications

Identifying Memory Bloat in Celery Workers

Long-running processes, especially those handling asynchronous tasks like Celery workers, are prime candidates for memory leaks. Unlike a short-lived script, a worker that accumulates memory without releasing it will eventually exhaust the available RAM, leading to `MemoryError` exceptions, termination by the kernel’s OOM killer, and cascading failures in your application. The first step is to reliably detect this bloat.

We’ll primarily use system-level tools and Python’s built-in capabilities for initial diagnosis. For a running Celery worker, you can inspect its current memory usage using standard Linux utilities. The most common is top or htop. Look for the RES (Resident Set Size) column for your worker processes. A steadily increasing RES value over hours or days, even when idle, is a strong indicator of a leak.

To get a more programmatic view, we can leverage Python’s psutil library. This allows us to query process information directly from within Python, which is invaluable for automated monitoring or targeted debugging.

Leveraging `psutil` for Real-time Memory Monitoring

Install psutil if you haven’t already:

pip install psutil

Now, let’s craft a small script that can be run periodically (e.g., via a cron job or a separate monitoring service) to log the memory usage of your Celery worker PIDs. You’ll need to know the PIDs of your workers. Workers are commonly managed with supervisor or systemd, and Celery can write its PID to a file when it is started with the --pidfile option.

The script below assumes you have a file named celery_worker.pid containing the PID of a single worker, or a file with multiple PIDs (one per line).

Note: Pass the actual path to your PID file (for example /path/to/your/celery_worker.pid) as the script’s only argument.

The script outputs the PID, RSS (Resident Set Size in MB), and VMS (Virtual Memory Size in MB) for each process listed in the PID file. Monitor this output over time. A consistent upward trend in RSS for your worker PIDs is the smoking gun.

import psutil
import os
import time
import sys

def get_memory_usage(pid_file_path):
    if not os.path.exists(pid_file_path):
        print(f"PID file not found: {pid_file_path}", file=sys.stderr)
        return

    pids = []
    try:
        with open(pid_file_path, 'r') as f:
            for line in f:
                try:
                    pid = int(line.strip())
                    if psutil.pid_exists(pid):
                        pids.append(pid)
                    else:
                        print(f"PID {pid} from {pid_file_path} does not exist.", file=sys.stderr)
                except ValueError:
                    print(f"Invalid PID format in {pid_file_path}: {line.strip()}", file=sys.stderr)
    except IOError as e:
        print(f"Error reading PID file {pid_file_path}: {e}", file=sys.stderr)
        return

    if not pids:
        print(f"No active PIDs found in {pid_file_path}.", file=sys.stderr)
        return

    print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    for pid in pids:
        try:
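            process = psutil.Process(pid)
            # With Celery's default prefork pool, tasks run in child processes
            # of the PID in the file, so report those as well
            for proc in [process] + process.children(recursive=True):
                mem_info = proc.memory_info()
                rss_mb = mem_info.rss / (1024 * 1024)
                vms_mb = mem_info.vms / (1024 * 1024)
                print(f"PID: {proc.pid}, RSS: {rss_mb:.2f} MB, VMS: {vms_mb:.2f} MB")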
        except psutil.NoSuchProcess:
            print(f"PID {pid} disappeared during check.", file=sys.stderr)
        except Exception as e:
            print(f"Error checking PID {pid}: {e}", file=sys.stderr)

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python monitor_celery_memory.py <pid_file>", file=sys.stderr)
        sys.exit(1)
    
    pid_file = sys.argv[1]
    get_memory_usage(pid_file)
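
One way to make "a consistent upward trend" concrete is to fit a straight line through the logged readings and look at its slope. The following is a minimal sketch, not part of the script above: the function name `rss_growth_mb_per_hour` and the sample format (pairs of Unix timestamp and RSS in MB) are assumptions made for illustration.

```python
def rss_growth_mb_per_hour(samples):
    """Least-squares slope of RSS over time, in MB per hour.

    `samples` is a list of (unix_timestamp_seconds, rss_mb) pairs.
    """
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    n = len(samples)
    mean_t = sum(t for t, _ in samples) / n
    mean_r = sum(r for _, r in samples) / n
    cov = sum((t - mean_t) * (r - mean_r) for t, r in samples)
    var = sum((t - mean_t) ** 2 for t, _ in samples)
    if var == 0:
        raise ValueError("all samples share the same timestamp")
    return (cov / var) * 3600


# Hypothetical readings: one every 5 minutes, growing by 2 MB each time
samples = [(i * 300, 200.0 + 2.0 * i) for i in range(12)]
print(f"{rss_growth_mb_per_hour(samples):.1f} MB/hour")
```

A slope that stays clearly positive across several hours of readings, including idle periods, is worth investigating. A slope that hovers around zero after warm-up usually is not.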

Profiling Memory Usage with `objgraph`

Once a leak is suspected, the next step is to understand what is consuming the memory. CPython frees most objects through reference counting and relies on its cyclic garbage collector for reference cycles, so leaks usually come from objects that are no longer needed but are still referenced, for example from a global, a cache, or a long-lived object. The objgraph library is an excellent tool for counting and visualizing these object references.
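
The difference between a collectable cycle and a lingering reference can be seen in a few lines of standard-library code. This is an illustrative sketch that assumes CPython; `Payload` and `registry` are hypothetical names standing in for a task result and a long-lived module-level container.

```python
import gc
import weakref


class Payload:
    """Stand-in for a large object produced by a task."""


registry = []  # a long-lived, module-level container

# Case 1: a reference cycle with no outside references
a, b = Payload(), Payload()
a.other, b.other = b, a
cycle_probe = weakref.ref(a)
del a, b
gc.collect()
cycle_freed = cycle_probe() is None
print("cycle freed:", cycle_freed)

# Case 2: an object that is still reachable from a global
kept = Payload()
registry.append(kept)
kept_probe = weakref.ref(kept)
del kept
gc.collect()
freed_while_registered = kept_probe() is None
print("freed while registered:", freed_while_registered)

# Dropping the last reference is what releases the object
registry.clear()
freed_after_clear = kept_probe() is None
print("freed after clear:", freed_after_clear)
```

The collector reclaims the cycle on its own, while the object in `registry` survives every collection until the container lets go of it. That second case is the pattern to look for in a worker.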

Install objgraph:

pip install objgraph

To use objgraph effectively, you need to integrate it into your Celery worker code. A common strategy is to add a special task that, when called, triggers a memory snapshot and analysis. This allows you to compare memory states before and after a suspected problematic operation or after the worker has been running for a while.

Here’s an example of how you might add a debugging task to your Celery app. This task will take a snapshot, and if a previous snapshot exists, it will compare them to identify growing object counts.

from celery import Celery
import objgraph
import gc
import os
import pickle
import sys   # used for error output below
import time  # used by process_large_data below

# Assume your Celery app is configured elsewhere, e.g., in tasks.py
# from your_project.celery_app import app

# For demonstration, let's create a dummy Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

# Define a path to store the snapshot file
SNAPSHOT_FILE = '/tmp/celery_worker_snapshot.pkl'

@app.task
def debug_memory_snapshot():
    """
    Takes a memory snapshot and compares it with a previous one if available.
    Identifies objects whose count has significantly increased.
    """
    print("Taking memory snapshot...")
    gc.collect() # Force garbage collection before snapshot

    # Get current object counts per type name, e.g. {'dict': 1234, 'list': 567}
    current_counts = objgraph.typestats()

    if os.path.exists(SNAPSHOT_FILE):
        try:
            with open(SNAPSHOT_FILE, 'rb') as f:
                previous_counts = pickle.load(f)
            
            print("Comparing with previous snapshot...")
            # Compare counts: keep only the types whose instance count has grown.
            # objgraph.show_growth() is a good starting point, but we can customize
            leaks = {
                name: count - previous_counts.get(name, 0)
                for name, count in current_counts.items()
                if count > previous_counts.get(name, 0)
            }
            
            # Sort by count difference (descending) and show top N
            sorted_leaks = sorted(leaks.items(), key=lambda item: item[1], reverse=True)
            
            print("\n--- Memory Growth Report ---")
            print("Top 10 objects with increased counts:")
            for obj_type, count_diff in sorted_leaks[:10]:
                print(f"- {obj_type}: +{count_diff}")
            print("--------------------------\n")

            # Optionally, show what is holding on to the most grown object type
            if sorted_leaks:
                top_leaking_type = sorted_leaks[0][0]
                print(f"Showing back-references for top growing type: {top_leaking_type}")
                # Get a few objects of this type
                leaking_objects = [obj for obj in gc.get_objects() if type(obj).__name__ == top_leaking_type]
                if leaking_objects:
                    # Back-references show who keeps the object alive.
                    # Writing a PNG requires Graphviz to be installed.
                    objgraph.show_backrefs([leaking_objects[0]], filename='/tmp/top_leak_refs.png', max_depth=5)
                    print("Generated /tmp/top_leak_refs.png")
                else:
                    print(f"Could not find any live objects of type {top_leaking_type} to show references.")

        except Exception as e:
            print(f"Error comparing snapshots: {e}", file=sys.stderr)
    
    # Save current snapshot for next time
    try:
        with open(SNAPSHOT_FILE, 'wb') as f:
            pickle.dump(current_counts, f)
        print(f"Current snapshot saved to {SNAPSHOT_FILE}")
    except Exception as e:
        print(f"Error saving snapshot: {e}", file=sys.stderr)

    return "Memory snapshot taken and analyzed."

# Example of a task that might cause memory issues (for testing)
@app.task
def process_large_data(data_size_mb):
    """A dummy task that allocates memory."""
    print(f"Allocating {data_size_mb} MB of data...")
    # Allocate a large list of bytes
    try:
        data = [b'\x00' * (1024 * 1024) for _ in range(data_size_mb)]
        print(f"Allocated {len(data)} chunks.")
        # 'data' is a local variable, so it is released when the task returns.
        # A real leak needs a reference that outlives the task, such as a global
        # variable or a long-lived object that keeps pointing at the data.
        time.sleep(2) # Simulate work
        return f"Processed {data_size_mb} MB."
    except MemoryError:
        return f"Failed to allocate {data_size_mb} MB."

# To run this:
# 1. Start a Redis server.
# 2. Start a Celery worker with a single process, so both snapshots come from
#    the same interpreter: celery -A your_module worker -l info -P solo
# 3. In a Python interpreter or another script:
#    import time
#    from your_module import debug_memory_snapshot, process_large_data
#    debug_memory_snapshot.delay() # Take initial snapshot
#    process_large_data.delay(500) # Run a task that uses memory
#    time.sleep(5) # Wait for task to finish
#    debug_memory_snapshot.delay() # Take snapshot again and compare
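
If installing objgraph is not an option, the standard library’s tracemalloc module offers a similar before/after comparison, grouped by the source line that made the allocation rather than by object type. This is a swapped-in technique, not the objgraph approach above, and the workload here is a made-up stand-in for a suspected task.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Stand-in for the suspected workload: keep a list of small strings alive
retained = [str(i) * 10 for i in range(20000)]

after = tracemalloc.take_snapshot()
# Differences are sorted with the largest changes first
top_stats = after.compare_to(before, "lineno")

for stat in top_stats[:3]:
    print(stat)

tracemalloc.stop()
```

Each printed entry names a file and line together with the size and count difference between the two snapshots. In a worker, the two snapshots would be taken some time apart inside the same process.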

Common Culprits and How to Fix Them

Memory leaks in Python applications, especially in long-running workers, often stem from a few common patterns:

1. Unbounded Caches and Global Collections

Storing results, fetched data, or intermediate computations in global variables or unbounded dictionaries/lists within your worker can lead to memory growth. If these collections are never cleared or pruned, they will grow indefinitely.

Example of a leak:

# In your tasks.py or a module imported by tasks.py
import time

# !!! LEAKING CODE !!!
# This list will grow indefinitely with each task execution
_unbounded_cache = []

@app.task
def process_and_cache(item):
    # Simulate processing
    result = f"Processed: {item}"
    time.sleep(0.1)
    
    # Add to the global cache without any limit or removal strategy
    _unbounded_cache.append(result) 
    
    print(f"Item '{item}' cached. Cache size: {len(_unbounded_cache)}")
    return result

Fix: Implement a bounded cache (e.g., using collections.deque with a `maxlen`, or a custom LRU cache) or ensure that data is explicitly removed when no longer needed. If the cache is meant to be per-worker-instance, it should not be a global module variable but perhaps an instance variable of a class that manages worker state, and that state should be reset periodically or when memory pressure is detected.

from collections import deque
import time

# !!! FIXED CODE !!!
# Use a deque with a maximum length
MAX_CACHE_SIZE = 1000
_bounded_cache = deque(maxlen=MAX_CACHE_SIZE)

@app.task
def process_and_cache_fixed(item):
    result = f"Processed: {item}"
    time.sleep(0.1)
    
    # Adding to a full deque automatically removes the oldest item
    _bounded_cache.append(result) 
    
    print(f"Item '{item}' cached. Cache size: {len(_bounded_cache)}")
    return result

# Alternative: explicitly clear a module-level cache from time to time.
# You could add a separate task for this or integrate it into another task.
def clear_global_cache():
    print(f"Clearing cache. Current size: {len(_bounded_cache)}")
    # clear() empties the container in place, so no `global` statement is needed
    # and the cached objects are released as soon as their references are dropped
    _bounded_cache.clear()
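
A deque evicts strictly by insertion order. When entries are looked up by key and recently used entries should survive, the custom LRU cache mentioned above can be sketched with `collections.OrderedDict`. The class name `LRUCache` and its methods are hypothetical, chosen for illustration.

```python
from collections import OrderedDict


class LRUCache:
    """A small least-recently-used cache with a fixed capacity."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self._items = OrderedDict()

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)  # evict the least recently used

    def __len__(self):
        return len(self._items)


cache = LRUCache(capacity=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes the most recently used entry
cache.put("c", 3)  # the cache is full, so "b" is evicted
print(sorted(cache._items))
```

For caching the results of a pure function, `functools.lru_cache` with an explicit `maxsize` gives the same bound with less code.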

2. Lingering References in Asynchronous Operations

When using libraries that perform I/O or background operations (like network requests, database queries, or even certain Celery features like `chain` or `group` that might hold references to intermediate results), it’s possible for references to objects to be held longer than expected, preventing them from being garbage collected.

Example scenario: A task that fetches a large dataset, processes it, and then passes it to another task in a chain. If the intermediate result object is not properly dereferenced, it might persist in memory longer than necessary.

Fix: Be mindful of object lifetimes. Explicitly set variables to None when they are no longer needed, especially if they hold large objects. Use context managers (with statements) where appropriate, as they often help in ensuring resources are released. For complex asynchronous workflows, consider using tools like asyncio with proper `await` and `async with` patterns, or ensure that your Celery task functions return minimal data and avoid holding onto large objects across task boundaries unless explicitly intended.
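
The effect of dropping a reference early and returning only a small result can be sketched without Celery. The example assumes CPython, where an object is freed as soon as its last reference disappears. `Dataset` and `summarize_and_notify` are hypothetical names, and the "slow follow-up step" stands in for something like a network call.

```python
import weakref


class Dataset:
    """Stand-in for a large intermediate result."""

    def __init__(self, rows):
        self.rows = rows


events = []


def summarize_and_notify(n_rows):
    dataset = Dataset(list(range(n_rows)))
    # Record the moment the dataset is actually freed
    weakref.finalize(dataset, events.append, "dataset freed")

    summary = {"rows": len(dataset.rows), "total": sum(dataset.rows)}

    # The large object is no longer needed, so drop the only reference now
    dataset = None

    events.append("slow follow-up step")
    return summary  # small and cheap to serialize for the next task


result = summarize_and_notify(1000)
print(events)
print(result)
```

The dataset is released before the slow step starts instead of living until the function returns, and the next task in a chain receives only the compact summary.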

3. Third-Party Libraries

Sometimes, the leak isn’t in your code but in a third-party library you’re using. This is particularly common with libraries that manage their own internal caches or stateful connections.

Fix: If you suspect a third-party library, try to isolate its usage. Create a minimal test case that only uses that library’s functionality. If the leak persists, check the library’s issue tracker. You might need to update the library, configure it differently (e.g., disable caching), or find an alternative. Using objgraph to inspect objects related to the library’s modules can help pinpoint the source.
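
A rough first check on whether growth is coming from a particular library is to group live objects by the package that defines their type. The sketch below uses only the standard library, and the helper name `live_objects_by_package` is an assumption. It sees only objects tracked by the garbage collector (containers and most class instances), so treat the numbers as a hint rather than a full accounting.

```python
import gc
from collections import Counter


def live_objects_by_package(top=10):
    """Count gc-tracked objects, grouped by the top-level package of their type."""
    counts = Counter()
    for obj in gc.get_objects():
        module = getattr(type(obj), "__module__", None)
        if not isinstance(module, str) or not module:
            module = "unknown"
        counts[module.split(".")[0]] += 1
    return counts.most_common(top)


for package, count in live_objects_by_package():
    print(f"{package}: {count}")
```

Calling this before and after a batch of tasks and comparing the counts for the suspected package gives a quick signal on where to point objgraph next.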

Strategies for Production Environments

Debugging memory leaks in production requires a delicate balance between gathering information and not impacting performance or stability. Here are some production-ready strategies:

1. Periodic Restarts

The simplest, albeit brute-force, solution is to periodically restart your Celery workers. This doesn’t fix the leak but effectively manages its symptoms by resetting the worker’s memory footprint. Determine a restart interval based on your observed memory growth rate and available resources. Process managers like supervisor or systemd can automatically restart workers that crash (e.g., after the OOM killer terminates them), while scheduled restarts are typically driven by a separate cron job or timer.

; Example supervisor configuration for a Celery worker
[program:celery_worker]
command=/path/to/your/venv/bin/celery -A your_project worker --loglevel=info --pidfile=/var/run/celery/worker.pid
autostart=true
autorestart=true
stopwaitsecs=600 ; Allow graceful shutdown
startsecs=10
redirect_stderr=true
stdout_logfile=/var/log/celery/worker.log

; Supervisor has no pidfile option for programs, so the PID file is written by
; Celery itself through --pidfile in the command above.
; Supervisor also has no built-in schedule. Add a cron job to restart this program periodically,
; for example one that runs `supervisorctl restart celery_worker` every 24 hours.
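
Celery can also recycle its own pool processes, which avoids restarting the whole worker. The fragment below is a sketch of a configuration module. `worker_max_tasks_per_child` and `worker_max_memory_per_child` are Celery settings that apply to the prefork pool; the module name `celeryconfig` and the numeric limits are assumptions to adjust for your workload.

```python
# celeryconfig.py (hypothetical module name), loaded with
# app.config_from_object("celeryconfig")

# Replace a pool child process after it has executed this many tasks
worker_max_tasks_per_child = 200

# Replace a pool child process once its resident memory exceeds this limit.
# The value is in kilobytes, so 400_000 is roughly 400 MB.
worker_max_memory_per_child = 400_000
```

The memory limit is checked after a task completes, so a single task can still exceed it while running. The same limits are available on the command line as --max-tasks-per-child and --max-memory-per-child.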

2. Health Checks and Automated Restarts

Implement a health check endpoint or a monitoring script that checks the worker’s memory usage (using psutil as shown earlier). If memory exceeds a predefined threshold (e.g., 80% of available RAM, or a specific MB value that indicates significant growth), trigger an alert and/or an automated restart of the affected worker process. This is more sophisticated than a fixed schedule as it reacts to actual memory pressure.

Example monitoring script snippet:

import psutil
import os
import sys
import time
import subprocess

# Configuration
PID_FILE = '/path/to/your/celery_worker.pid'
MEMORY_THRESHOLD_MB = 1500 # Restart if RSS exceeds 1500 MB
CHECK_INTERVAL_SECONDS = 300 # Check every 5 minutes
SUPERVISOR_PROGRAM_NAME = 'celery_worker' # Name of the program in supervisorctl

def check_and_manage_memory(pid_file, threshold_mb, program_name):
    if not os.path.exists(pid_file):
        print(f"PID file not found: {pid_file}. Cannot check memory.", file=sys.stderr)
        return

    try:
        with open(pid_file, 'r') as f:
            pid = int(f.readline().strip())
    except (IOError, ValueError) as e:
        print(f"Error reading PID from {pid_file}: {e}", file=sys.stderr)
        return

    if not psutil.pid_exists(pid):
        print(f"Worker process (PID: {pid}) not found. It might have already restarted or crashed.", file=sys.stderr)
        return

    try:
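        process = psutil.Process(pid)
        # Tasks run in the pool's child processes, so add their RSS to the main
        # process. Shared pages are counted once per process, so this overestimates.
        procs = [process] + process.children(recursive=True)
        rss_mb = sum(p.memory_info().rss for p in procs) / (1024 * 1024)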

        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] PID: {pid}, RSS: {rss_mb:.2f} MB")

        if rss_mb > threshold_mb:
            print(f"WARNING: Memory usage ({rss_mb:.2f} MB) exceeds threshold ({threshold_mb} MB). Triggering restart for {program_name}.", file=sys.stderr)
            try:
                # Use supervisorctl to restart the process
                subprocess.run(['supervisorctl', 'restart', program_name], check=True)
                print(f"Successfully sent restart command for {program_name}.")
            except FileNotFoundError:
                print("Error: supervisorctl command not found. Is Supervisor installed and in PATH?", file=sys.stderr)
            except subprocess.CalledProcessError as e:
                print(f"Error executing supervisorctl restart: {e}", file=sys.stderr)
            # Exit after triggering restart to avoid further checks until it's back up
            sys.exit(0) 

    except psutil.NoSuchProcess:
        print(f"Process {pid} disappeared during check.", file=sys.stderr)
    except Exception as e:
        print(f"Error checking process {pid}: {e}", file=sys.stderr)

if __name__ == "__main__":
    # This script should be run periodically by cron or a similar scheduler
    # Example cron entry: */5 * * * * /usr/bin/python /path/to/monitor_script.py
    check_and_manage_memory(PID_FILE, MEMORY_THRESHOLD_MB, SUPERVISOR_PROGRAM_NAME)
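
A single reading above the threshold may just be one large task in flight. One possible refinement is to restart only after several consecutive readings exceed the threshold. The sketch below is an illustration, not part of the script above: `BreachTracker` is a hypothetical helper, and it keeps its state in memory, so it fits a long-running monitoring loop. A cron-driven script would need to persist the recent readings between runs.

```python
from collections import deque


class BreachTracker:
    """Signal a restart only after `required` consecutive readings above the threshold."""

    def __init__(self, threshold_mb, required=3):
        self.threshold_mb = threshold_mb
        self.recent = deque(maxlen=required)

    def observe(self, rss_mb):
        """Record one reading and return True if a restart is warranted."""
        self.recent.append(rss_mb > self.threshold_mb)
        return len(self.recent) == self.recent.maxlen and all(self.recent)


tracker = BreachTracker(threshold_mb=1500, required=3)
readings = [1400, 1600, 1450, 1550, 1580, 1610]  # made-up RSS values in MB
decisions = [tracker.observe(r) for r in readings]
print(decisions)
```

Only the last reading triggers a restart here, because it is the first point at which three readings in a row are above 1500 MB.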

3. Targeted Debugging in Staging

Avoid running heavy profiling tools like objgraph directly in production unless absolutely necessary and carefully managed. Instead, try to reproduce the suspected leak scenario in a staging environment that closely mirrors production. Once reproduced, use objgraph, memory_profiler, or other debugging tools to pinpoint the issue. In staging you can also step through task code with Celery’s remote debugger, celery.contrib.rdb; plain pdb or ipdb needs an interactive terminal, which a daemonized worker does not have.

Using memory_profiler:

pip install memory_profiler
pip install psutil # Already installed for previous examples

Decorate the functions you want to inspect with @profile. Each call then prints a line-by-line memory usage report.

# In your tasks.py
from celery import Celery
import time
from memory_profiler import profile # Import the decorator

app = Celery('tasks', broker='redis://localhost:6379/0')

@profile # Decorate the task
def process_large_data_with_profiling(data_size_mb):
    print(f"Allocating {data_size_mb} MB of data...")
    data = [b'\x00' * (1024 * 1024) for _ in range(data_size_mb)]
    print(f"Allocated {len(data)} chunks.")
    time.sleep(2)
    return f"Processed {data_size_mb} MB."

# To run this directly (no broker or worker required):
#    python -c "from your_module import process_large_data_with_profiling as f; f(500)"
# Because @profile is imported explicitly, the line-by-line report is printed
# to stdout each time the function returns.
# To profile inside a worker, also register the function with @app.task
# (placed above @profile) and read the report from the worker's log output.

The output will show memory usage at each line, helping you identify which lines are allocating significant amounts of memory. Remember to remove the @profile decorator in production code.

Conclusion

Debugging memory leaks in long-running Celery workers is a systematic process. Start with monitoring to confirm the leak, then use profiling tools like objgraph and memory_profiler to identify the source. Address common culprits like unbounded caches and lingering references. In production, employ strategies like periodic restarts and automated health checks to manage the symptoms while you work on permanent fixes in a staging environment. Consistent vigilance and a methodical approach are key to maintaining the stability of your distributed task processing system.


A little about the Author

Having 12+ Years of Experience in Software Development, Vinay is a principal software architect, senior systems engineer, and elite technical consultant. He specializes in bespoke PHP/WordPress development, high-performance Magento 2 & Shopify architectures, custom plugin/theme development from scratch, and legacy code modernization (including VB6, VB.NET, PyQt, and Crystal Reports). Known for solving complex database bottlenecks, speed optimization (Core Web Vitals), and advanced security code auditing, Vinay engineers production-ready systems designed to scale under heavy concurrent load conditions.



Copyright © 2026 · Vinay Vengala