
Vengala Vinay

Having 9+ Years of Experience in Software Development

Step-by-Step: Diagnosing Memory Leaks in Long-Running Python Celery Worker Daemons on Google Cloud Servers

Identifying the Problem: Gradual Memory Increase in Celery Workers

Long-running processes, especially those handling asynchronous tasks like Celery workers, are prime candidates for memory leaks. On Google Cloud Platform (GCP) Compute Engine instances, these leaks can manifest as a steady, predictable increase in memory consumption over time, eventually leading to OOM (Out Of Memory) killer intervention or performance degradation. The first step in diagnosing this is to establish a baseline and observe the trend.

We’ll leverage standard Linux tools and Python-specific introspection to pinpoint the source. Assume your Celery workers are managed by a process supervisor like `systemd` or `supervisor` and are running on a Debian/Ubuntu-based GCP instance.

Phase 1: System-Level Monitoring and Baseline Establishment

Before diving into Python code, let’s confirm the memory usage at the OS level. We’ll use `htop` for interactive monitoring and `pidstat` for historical data collection.

1. Interactive Monitoring with `htop`

SSH into your GCP instance and run htop. Identify your Celery worker processes. Note their PIDs (Process IDs). Observe the ‘RES’ (Resident Set Size) and ‘VIRT’ (Virtual Memory Size) columns. A consistently growing ‘RES’ value for a specific worker PID over hours or days is a strong indicator of a leak.
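The 'RES' figure that htop shows can also be read programmatically, which is handy when you want to log it from a script. The following is a minimal sketch, assuming a Linux host where `/proc/<pid>/status` exposes a `VmRSS` line; the function names are illustrative and not part of any library.

```python
import re


def parse_rss_kb(status_text):
    """Return VmRSS in kB from the text of /proc/<pid>/status, or None if absent."""
    match = re.search(r"^VmRSS:\s+(\d+)\s+kB", status_text, flags=re.MULTILINE)
    return int(match.group(1)) if match else None


def read_rss_kb(pid):
    """Read the current resident set size of a process (Linux only)."""
    with open(f"/proc/{pid}/status") as fh:
        return parse_rss_kb(fh.read())
```

Calling `read_rss_kb(12345)` for a worker PID at intervals gives the same trend you would watch in htop, in a form that can be written to a log.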

2. Historical Data Collection with `pidstat`

For more systematic tracking, `pidstat` is invaluable. It’s part of the `sysstat` package. If not installed, run:

sudo apt update
sudo apt install sysstat -y

To monitor a specific Celery worker PID (let’s assume PID 12345) every 60 seconds for an hour, you can use:

pidstat -r -p 12345 60 60 > celery_worker_memory.log

The -r flag reports page faults and memory utilization. The output is plain text in whitespace-separated columns, not CSV. Review celery_worker_memory.log and look for a steady upward trend in the RSS (Resident Set Size, in kB) and %MEM columns.
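Reading the log by eye works, but a few lines of Python can turn it into a single growth figure. This is a rough sketch that assumes the usual `pidstat -r` layout, with a header row containing `RSS` and one data row per interval; the exact columns can vary between sysstat versions and locales, and the sample log below is synthetic, for illustration only.

```python
def parse_pidstat_rss(log_text):
    """Extract RSS samples (kB) from `pidstat -r` output."""
    rss_index = None  # position of the RSS column, counted from the end of the row
    samples = []
    for line in log_text.splitlines():
        fields = line.split()
        if not fields or fields[0] == "Average:":
            continue  # skip blank lines and the trailing summary row
        if "RSS" in fields:
            rss_index = fields.index("RSS") - len(fields)
            continue
        if rss_index is not None and len(fields) >= -rss_index:
            try:
                samples.append(int(fields[rss_index]))
            except ValueError:
                pass  # not a data row
    return samples


def average_growth_kb(samples):
    """Average RSS change per sampling interval, in kB."""
    if len(samples) < 2:
        return 0.0
    return (samples[-1] - samples[0]) / (len(samples) - 1)


# Synthetic sample, not real measurements.
SAMPLE_LOG = """Linux 5.10.0 (worker-1)  01/15/2026  _x86_64_  (2 CPU)

10:00:00 AM   UID       PID  minflt/s  majflt/s     VSZ     RSS   %MEM  Command
10:01:00 AM  1000     12345     12.00      0.00  512000   80000   2.10  celery
10:02:00 AM  1000     12345     15.00      0.00  512000   81000   2.12  celery
10:03:00 AM  1000     12345     11.00      0.00  513000   82000   2.15  celery
Average:     1000     12345     12.67      0.00  512333   81000   2.12  celery
"""

samples = parse_pidstat_rss(SAMPLE_LOG)
print(samples, average_growth_kb(samples))
```

A growth figure that stays positive across several hour-long captures is a stronger signal than any single reading.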

Phase 2: Python-Specific Memory Profiling

Once system-level monitoring confirms a memory issue, we need to investigate within the Python application itself. CPython frees most objects through reference counting and uses a cyclic garbage collector for reference cycles, but a process can still leak memory due to:

  • Unreleased references to objects (e.g., in global lists, caches, or closures).
  • External libraries with their own memory management issues.
  • C extensions that don’t properly manage memory.

1. Using `objgraph` for Object Graph Analysis

objgraph is a fantastic tool for visualizing Python object reference graphs. It helps identify objects that are unexpectedly still referenced.

First, install it:

pip install objgraph

To use it, you’ll typically need to attach to the running Celery worker process. This can be tricky with long-running daemons. A common approach is to add temporary debugging code to your worker or tasks that dumps object counts at intervals or upon receiving a specific signal.

Consider adding a signal handler to your Celery worker’s main script or a shared module:

import gc  # only needed for broader searches over gc.get_objects()
import signal

import objgraph

def dump_memory_objects(signum, frame):
    print("--- Dumping object counts ---")
    # Print counts for the 20 most common object types
    objgraph.show_most_common_types(limit=20)
    print("--- Dumping references to suspect objects ---")
    # objgraph reports counts and references, not object sizes, so a
    # targeted approach works best: pick a class whose count keeps growing
    # and render what is holding its instances alive.
    # Example (writing a .png requires Graphviz to be installed):
    # objgraph.show_backrefs(objgraph.by_type('MyLeakyClass')[:3], max_depth=3, filename="leaky_class_refs.png")
    print("--- Done dumping ---")

# Register the signal handler (e.g., for SIGUSR1).
# Caution: Celery documents SIGUSR1 as its own "dump traceback for all
# active threads" signal, so depending on registration order this handler
# may replace Celery's or be replaced by it. Verify on your setup.
signal.signal(signal.SIGUSR1, dump_memory_objects)

# Make sure this code is imported and executed by the worker process.
# For example, if your worker starts from a main.py:
#
# if __name__ == "__main__":
#     # ... your Celery app setup ...
#     # Register the signal handler here or in an imported module
#     signal.signal(signal.SIGUSR1, dump_memory_objects)
#     # ... start worker ...

After deploying this change and restarting your Celery workers, you can send the signal to a specific worker PID:

kill -USR1 <CELERY_WORKER_PID>

Examine the output. A single dump says little on its own, so send the signal several times, minutes or hours apart, and look for object types whose counts keep increasing between dumps. If you suspect a specific class (e.g., a custom cache object, a data structure holding task results), you can use objgraph.by_type('YourClassName') to get the live instances and inspect them directly.
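Comparing two dumps by hand gets tedious. objgraph offers `show_growth()` for this; the sketch below shows the same idea using only the standard library, in case you want to see what such a comparison involves or cannot install extra packages. The helper names are illustrative, and only objects tracked by the garbage collector are counted.

```python
import gc
from collections import Counter


def type_counts():
    """Count live gc-tracked objects by type name."""
    return Counter(type(obj).__name__ for obj in gc.get_objects())


def growth(before, after, limit=10):
    """Return (type_name, delta) pairs for types whose counts grew, largest first."""
    grown = [
        (name, after[name] - before.get(name, 0))
        for name in after
        if after[name] > before.get(name, 0)
    ]
    return sorted(grown, key=lambda item: item[1], reverse=True)[:limit]
```

Take one `type_counts()` reading per signal, keep the previous one in a module-level variable, and print `growth(previous, current)` to see which types are accumulating.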

2. Using `tracemalloc` for Allocation Tracing

Python 3.4+ includes the built-in `tracemalloc` module, which is excellent for tracking memory allocations and identifying where memory is being allocated.

Similar to `objgraph`, you’ll need to integrate `tracemalloc` into your worker’s code. You can start it at the beginning of your worker’s execution and periodically take snapshots.

import gc
import tracemalloc

# Start tracing memory allocations
# (allocations made before this call are not tracked)
tracemalloc.start()

# Optional: collect unreachable reference cycles for a cleaner baseline
gc.collect()

# Take an initial snapshot
snapshot1 = tracemalloc.take_snapshot()
print("Initial snapshot taken.")

# Let the worker run for a period before taking the next snapshot.
# In a real worker, this is the normal operation loop. In a standalone
# test you could sleep instead:
# import time; time.sleep(300) # Sleep for 5 minutes

# Take a second snapshot after some time
snapshot2 = tracemalloc.take_snapshot()
print("Second snapshot taken.")

# Compare the snapshots, grouped by source line
top_stats = snapshot2.compare_to(snapshot1, 'lineno')

print("[ Top 10 differences ]")
for stat in top_stats[:10]:
    print(stat)

# For a quick overall figure, tracemalloc.get_traced_memory() returns the
# current and peak size of traced memory.

# To save a snapshot for later analysis, use Snapshot.dump() and load it
# again with tracemalloc.Snapshot.load("snapshot.tracemalloc"):
# tracemalloc.take_snapshot().dump("snapshot.tracemalloc")

# Remember to stop tracing if needed, though for a long-running worker,
# you might keep it running and periodically take snapshots.
# tracemalloc.stop()

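Integrate this code into your Celery worker’s startup or within a task that you suspect might be causing the leak. You can trigger snapshot comparisons periodically or via a signal handler (similar to the `objgraph` example). The output will show the lines of code where memory usage has increased the most between snapshots. Focus on lines within your application code or the third-party libraries it calls. Keep in mind that `tracemalloc` only sees allocations made through Python’s memory allocators, so memory that a C extension obtains directly from the system allocator will not appear, and that tracing adds some runtime and memory overhead.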
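For periodic or signal-driven comparisons, it helps to keep the previous snapshot around and diff against it each time. The class below is one possible sketch using only `tracemalloc`; the name `SnapshotDiffer` is illustrative, and how you call `report()` (timer, signal handler, or a dedicated debug task) is left as an assumption about your setup.

```python
import tracemalloc


class SnapshotDiffer:
    """Keep the previous tracemalloc snapshot and report growth since then."""

    def __init__(self, nframes=1):
        if not tracemalloc.is_tracing():
            tracemalloc.start(nframes)
        self._previous = tracemalloc.take_snapshot()

    def report(self, limit=10, key_type="lineno"):
        """Return the largest positive differences since the last call."""
        current = tracemalloc.take_snapshot()
        stats = current.compare_to(self._previous, key_type)
        self._previous = current  # next report is relative to this point
        # compare_to sorts by absolute difference; keep only growth
        return [stat for stat in stats if stat.size_diff > 0][:limit]
```

Each entry returned by `report()` is a `tracemalloc.StatisticDiff`, so printing it gives the file, line number, and size change in the same format as the example above.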

3. Using `memory_profiler`

The `memory_profiler` library provides decorators to profile memory usage line-by-line for specific functions.

pip install memory_profiler

Decorate the functions you suspect are leaking memory:

from memory_profiler import profile

@profile
def process_large_data(data):
    # ... code that might be leaking ...
    intermediate_results = []
    for item in data:
        # The list grows with the input. It only becomes a leak if a
        # reference to it outlives the call (e.g., stored in a global).
        intermediate_results.append(item * 2)
    return intermediate_results

# For standalone testing (not for the actual worker daemon), call the
# function directly so memory_profiler can report on it:
if __name__ == "__main__":
    processed_data = process_large_data(list(range(100_000)))
    print(f"Processed data length: {len(processed_data)}")

# In a Celery task, the decorated function would be called from the task body:
# from celery import Celery
# app = Celery('tasks', broker='redis://localhost:6379/0')
#
# @app.task
# def my_leaky_task(input_data):
#     return process_large_data(input_data)
#
# Profiling a running worker daemon this way is not ideal.
# A better approach for daemons is to use tracemalloc or objgraph.

Running the script containing the decorated function with python -m memory_profiler your_script.py will output detailed memory usage per line. For Celery workers, this is best used for debugging specific tasks in isolation rather than profiling the entire daemon.
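When debugging a task in isolation, a complementary check is to call the task body many times in a plain Python process and see whether memory is retained across calls. The sketch below is not part of `memory_profiler`; it is a standard-library approximation built on `tracemalloc`, and `measure_growth` is a hypothetical helper name.

```python
import gc
import tracemalloc


def measure_growth(func, *args, iterations=50):
    """Return bytes of traced memory retained across repeated calls to func."""
    tracemalloc.start()
    try:
        func(*args)  # warm-up call so one-time caches are not counted
        gc.collect()
        baseline, _peak = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            func(*args)
        gc.collect()
        current, _peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return current - baseline
```

A result that scales with `iterations` suggests the function keeps references between calls, while a result near zero suggests the growth comes from somewhere else in the worker.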

Phase 3: Analyzing and Fixing the Leak

Once `objgraph` or `tracemalloc` points to specific objects or code locations:

  • Global Variables and Caches: Check for unbounded growth in global lists, dictionaries, or custom caches. Implement size limits, eviction policies (LRU), or explicit cleanup mechanisms.
  • Closures and Long-Lived References: Be mindful of functions defined within other functions that might hold references to large objects longer than intended.
  • External Libraries: If the leak appears to be within a third-party library, check its issue tracker. You might need to update the library, find an alternative, or implement workarounds.
  • Database Connections/Resources: Ensure that database connections, file handles, or other external resources are properly closed and released, especially within loops or long-running tasks.
  • Task Result Storage: If your tasks store large intermediate results or final outputs in memory before returning or saving, this can be a source. Consider streaming or processing data in chunks.
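As an illustration of the first item, here is a minimal sketch of a size-limited cache with LRU eviction built on `collections.OrderedDict`. The class name and default limit are illustrative; for memoizing a pure function, `functools.lru_cache(maxsize=...)` from the standard library may already be enough.

```python
from collections import OrderedDict


class BoundedCache:
    """Dictionary-like cache that evicts the least recently used entry."""

    def __init__(self, max_entries=1024):
        if max_entries < 1:
            raise ValueError("max_entries must be at least 1")
        self._max_entries = max_entries
        self._data = OrderedDict()

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self._max_entries:
            self._data.popitem(last=False)  # drop the oldest entry

    def __len__(self):
        return len(self._data)
```

Because the number of entries can never exceed `max_entries`, the cache's memory use is bounded by the size of the values you store rather than by how long the worker has been running.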

Example Fix: Clearing a Growing List

Suppose `objgraph` reveals a large number of `MyTaskResult` objects are being held, and `tracemalloc` points to a list named `results_buffer` within a task handler:

# Original potentially leaky code
# results_buffer = []
# def process_items(items):
#     for item in items:
#         result = MyTaskResult(item)
#         results_buffer.append(result) # Reference kept indefinitely
#     return results_buffer

# Fixed code: build the results in a local list so nothing outlives the call
def process_items_fixed(items):
    local_results = []
    for item in items:
        result = MyTaskResult(item)
        local_results.append(result)
    return local_results # Return local results instead of relying on global

# If results_buffer is genuinely needed as a shared cache, keep it but
# implement a size limit or eviction policy, or call results_buffer.clear()
# once the buffered results have been consumed. Note that reassigning
# results_buffer = [] inside a function only rebinds a local name unless
# the name is declared global.

Phase 4: Production Deployment and Verification

After implementing a fix, redeploy your Celery workers. Monitor their memory usage closely using the same `pidstat` or `htop` methods established in Phase 1. The memory consumption should now stabilize or show a much slower, acceptable rate of increase.

Consider implementing automated alerting based on memory thresholds (e.g., using GCP’s Cloud Monitoring or Prometheus/Grafana) to notify you proactively if memory usage starts climbing unexpectedly again.
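Whichever monitoring system you use, a threshold alert usually fires only when the condition holds for several consecutive samples, so that a brief spike does not page anyone. The function below is a plain-Python sketch of that rule, not an API of Cloud Monitoring or Prometheus; the name and parameters are assumptions for illustration.

```python
def sustained_breach(rss_samples_kb, limit_kb, min_consecutive=3):
    """Return True if the most recent samples all exceed the limit."""
    if min_consecutive < 1:
        raise ValueError("min_consecutive must be at least 1")
    recent = rss_samples_kb[-min_consecutive:]
    # Require a full window so a short history cannot trigger an alert
    return len(recent) == min_consecutive and all(
        value > limit_kb for value in recent
    )
```

Fed with the RSS readings collected in Phase 1, this gives a simple yes or no answer that a cron job or health check could act on.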

Copyright © 2026 · Vinay Vengala