• Skip to secondary menu
  • Skip to main content
  • Skip to primary sidebar
  • Home
  • Projects
  • Products
  • Themes
  • Tools
  • Request for Quote

Vengala Vinay

Having 12+ Years of Experience in Software Development

  • Home
  • WordPress
  • PHP
    • Codeigniter
  • Django
  • Magento
  • Selenium
  • Server
Home » Fixing Memory leaks in long-running Python Celery worker daemons in Legacy Python Codebases Without Breaking API Contracts

Fixing Memory leaks in long-running Python Celery worker daemons in Legacy Python Codebases Without Breaking API Contracts

Diagnosing Memory Bloat in Long-Running Python Processes

Memory leaks in long-running Python applications, particularly those acting as background workers like Celery daemons, are insidious. They manifest as a gradual increase in memory consumption over time, eventually leading to `OutOfMemoryError` exceptions, process termination, or severe performance degradation. The challenge in legacy codebases is often the lack of explicit instrumentation and the difficulty in pinpointing the exact source without introducing significant architectural changes or breaking existing API contracts.

The first step is accurate diagnosis. We need to observe the memory footprint of the worker process over an extended period. Tools like psutil and memory_profiler are invaluable here. For a Celery worker, we can attach these tools to the running process.

Leveraging `psutil` for Real-time Memory Monitoring

psutil provides a cross-platform interface for retrieving information on running processes and system utilization. We can write a simple script to periodically poll the memory usage of our Celery worker process.

First, identify the Process ID (PID) of your Celery worker. You can typically find this in your supervisor configuration or by using pgrep -f 'celery worker'.

Then, execute a monitoring script like this:

import psutil
import time
import os
import sys

def monitor_memory(pid, interval=60):
    try:
        process = psutil.Process(pid)
        print(f"Monitoring PID: {pid} (Command: {process.name()})")
        print("Timestamp,RSS (MB),VMS (MB),USS (MB)")
        while True:
            try:
                mem_info = process.memory_info()
                # RSS: Resident Set Size (physical memory used)
                # VMS: Virtual Memory Size
                # USS: Unique Set Size (memory unique to this process, not shared) - requires Linux
                rss_mb = mem_info.rss / (1024 * 1024)
                vms_mb = mem_info.vms / (1024 * 1024)
                uss_mb = getattr(mem_info, 'uss', 0) / (1024 * 1024) # USS is Linux-specific

                timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                print(f"{timestamp},{rss_mb:.2f},{vms_mb:.2f},{uss_mb:.2f}")
                time.sleep(interval)
            except psutil.NoSuchProcess:
                print(f"Process with PID {pid} not found. Exiting.")
                sys.exit(1)
            except Exception as e:
                print(f"An error occurred during monitoring: {e}")
                time.sleep(interval) # Wait before retrying

    except psutil.NoSuchProcess:
        print(f"Error: Process with PID {pid} not found.")
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        sys.exit(1)

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python monitor_memory.py ")
        sys.exit(1)
    
    try:
        target_pid = int(sys.argv[1])
        monitor_memory(target_pid)
    except ValueError:
        print("Error: PID must be an integer.")
        sys.exit(1)

Run this script, redirecting output to a file, and let it run for several days or weeks. Analyze the resulting CSV for a consistent upward trend in RSS (Resident Set Size) or USS (Unique Set Size) that doesn’t correlate with increased workload. A stable USS is a good indicator that the memory growth is due to shared libraries or OS caching, while a growing USS points to objects held by the Python process itself.

Pinpointing Leaks with `memory_profiler`

Once a leak is suspected, memory_profiler can help identify specific lines of code responsible. This tool works by decorating functions and profiling their memory usage line by line. For Celery tasks, we can decorate the task functions themselves.

Install it: pip install memory_profiler.

Modify your Celery task file (e.g., tasks.py):

from celery import Celery
from memory_profiler import profile
import time
import random

# Assume broker and backend are configured
app = Celery('my_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

# Example of a potentially leaky task
@app.task
@profile
def process_data_leaky(item_id):
    # Simulate some data processing that might hold onto objects
    data_cache = []
    for i in range(1000):
        # In a real scenario, this might be loading data from DB,
        # performing complex computations, or deserializing large objects
        # that are not properly released.
        processed_item = {"id": item_id, "value": random.random() * i, "timestamp": time.time()}
        data_cache.append(processed_item)
        # Simulate work
        time.sleep(0.001)
    
    # If data_cache is not cleared or its contents are referenced elsewhere
    # and not garbage collected, it can lead to a leak.
    # For demonstration, we'll let it go out of scope, but in complex apps,
    # references can persist.
    
    # Simulate a task that might be called repeatedly and accumulate state
    # For a true leak, the 'data_cache' or its contents would need to be
    # held onto across task invocations, which is harder in stateless Celery tasks
    # unless using shared memory, global variables, or external caches improperly.
    
    # Let's simulate a more direct leak for demonstration:
    # If this function were part of a class instance that persisted,
    # and 'self.persistent_data' was never cleared.
    # For a standalone task, the leak is more likely in libraries or
    # how data is passed/returned and held by the caller.
    
    # A common pattern for leaks in long-running processes is holding onto
    # large objects in global variables or class instances that are not reset.
    # For a stateless Celery task, this is less common unless the task
    # itself is part of a larger, stateful application context that is reused.
    
    # Let's simulate a leak by appending to a global list (bad practice!)
    global global_data_store
    if 'global_data_store' not in globals():
        global_data_store = []
    
    for _ in range(500): # Append a moderate amount of data
        global_data_store.append({"id": item_id, "data": os.urandom(1024)}) # 1KB of random data per entry
        
    print(f"Processed item {item_id}. Global store size: {len(global_data_store)} entries.")
    
    return f"Processed {item_id}"

# A non-leaky task for comparison
@app.task
@profile
def process_data_clean(item_id):
    # Simulate some data processing
    processed_item = {"id": item_id, "value": random.random(), "timestamp": time.time()}
    # This object goes out of scope immediately after the function returns
    time.sleep(0.01)
    return f"Cleanly processed {item_id}"

# To run this, you would typically start a worker like:
# celery -A tasks worker -l info -P eventlet --concurrency=1
# And then call the tasks:
# from tasks import process_data_leaky, process_data_clean
# process_data_leaky.delay(1)
# process_data_clean.delay(2)
#
# To see the output, you need to run the worker with:
# celery -A tasks worker -l info -P eventlet --concurrency=1 --without-gossip --without-mingle --without-heartbeat
# And then run the tasks. The memory_profiler output will be printed to the worker's stdout.
# For more detailed analysis, redirect worker output to a file.

When you run a Celery worker with these decorated tasks, memory_profiler will output line-by-line memory usage to the worker’s standard output. Look for functions where the memory usage increases significantly and doesn’t return to its baseline after the function completes. Pay close attention to loops and object allocations.

Strategies for Fixing Leaks Without Breaking API Contracts

The core challenge is modifying the behavior of a long-running process without altering its external interface (the tasks it accepts and the results it returns). This often means internal refactoring and careful management of object lifecycles.

1. Explicitly Dereferencing and Garbage Collection

In Python, memory is managed by reference counting and a cyclic garbage collector. Sometimes, objects are held longer than necessary due to lingering references. Explicitly setting variables to None and calling gc.collect() can help, though it’s often a band-aid rather than a cure for fundamental design flaws.

import gc

# Inside a task, after processing large data structures:
def process_and_clean_task(data):
    large_object = perform_heavy_computation(data)
    
    # ... use large_object ...
    
    # Explicitly break references
    result = process_result(large_object)
    large_object = None # Dereference
    
    # Force garbage collection if necessary, but use sparingly
    # This can impact performance if called too frequently.
    gc.collect() 
    
    return result

This is most effective when dealing with temporary, large objects within a single task execution. If the leak is due to state persisting across task invocations (e.g., in global variables or class instances used by the worker), this approach is insufficient.

2. Managing External Resources and Caches

Leaks often occur when external resources (database connections, file handles, network sockets) or in-memory caches are not properly closed or invalidated. In legacy code, these might be managed implicitly or through global singletons.

Example: Database Connections

import threading

# Potentially problematic: Global connection pool not managed per-request
# or not having a mechanism to close idle connections.
# global_db_connection = None 

# Better approach: Use context managers and ensure connections are closed.
# If using a library like SQLAlchemy, its session management is key.

def get_db_connection():
    # Simulate getting a connection that needs proper closing
    conn = create_db_connection() 
    return conn

def process_with_db(data):
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT ... WHERE id = %s", (data['id'],))
        results = cursor.fetchall()
        # ... process results ...
        return results
    finally:
        if conn:
            conn.close() # Ensure connection is closed
            # If using a connection pool, return it to the pool instead of closing.
            # pool.release(conn) 

# In a Celery task:
@app.task
def my_db_task(data_id):
    # Ensure connection is managed within the task scope
    db_data = process_with_db({"id": data_id})
    return db_data

For caches, ensure they have eviction policies (e.g., LRU – Least Recently Used) or size limits. If using a library like functools.lru_cache, ensure the cache size is appropriate for the worker’s memory constraints.

from functools import lru_cache

@lru_cache(maxsize=128) # Limit cache size to prevent unbounded growth
def fetch_external_resource(resource_id):
    # Simulate fetching and processing a resource
    time.sleep(0.1) # Simulate I/O
    return {"id": resource_id, "data": "some_data"}

@app.task
def task_using_cached_resource(resource_id):
    data = fetch_external_resource(resource_id)
    # ... use data ...
    return data

3. Refactoring State Management

The most common source of leaks in long-running processes that are *supposed* to be stateless (like many Celery tasks) is the accidental introduction or persistence of state. This can happen via:

  • Global variables that accumulate data over time.
  • Class instances that are reused across tasks and hold onto data.
  • Closures that capture large objects from their enclosing scope.

Refactoring Global State:

# Problematic global state
# global_processing_cache = []

# @app.task
# def process_item(item):
#     global_processing_cache.append(item) # Accumulates indefinitely
#     # ... process ...

# Refactored approach: Pass state explicitly or use task-local storage if absolutely necessary.
# For stateless tasks, avoid global mutable state. If state is truly needed across tasks,
# consider externalizing it to Redis, a database, or using Celery's built-in features
# like task chains or workflows that manage state explicitly.

# If a global cache is unavoidable and must be managed:
MAX_CACHE_SIZE = 1000
processing_cache = []

@app.task
def process_item_managed_cache(item):
    global processing_cache
    
    # Add item
    processing_cache.append(item)
    
    # Trim cache if it exceeds size
    if len(processing_cache) > MAX_CACHE_SIZE:
        processing_cache = processing_cache[len(processing_cache) - MAX_CACHE_SIZE:] # Keep the most recent items
        
    # ... process item ...
    
    return "Processed"

# Alternatively, use a thread-safe queue or a dedicated caching library.

Refactoring Class-Based Workers/Tasks:

If your Celery setup involves class instances that persist (e.g., a worker class that holds state), ensure that any large data structures within those instances are cleared or reset between task executions. This often means implementing a reset() method or ensuring the instance is re-initialized appropriately.

class StatefulProcessor:
    def __init__(self):
        self.large_data_buffer = []
        self.initialized = False

    def initialize(self):
        # Load initial data, etc.
        self.large_data_buffer = [...] # Load some initial data
        self.initialized = True

    def process(self, item):
        if not self.initialized:
            self.initialize()
        
        # Add item to buffer
        self.large_data_buffer.append(item)
        
        # Simulate processing
        result = self._perform_work(item)
        
        # Crucially, decide if the buffer should persist or be cleared.
        # If it should be cleared after each task:
        # self.large_data_buffer = [] # Clear buffer
        
        # If it's a cache that needs trimming:
        if len(self.large_data_buffer) > 500:
            self.large_data_buffer = self.large_data_buffer[1:] # Remove oldest

        return result

    def _perform_work(self, item):
        # ... actual work ...
        pass

# In Celery setup, if this class instance is shared across workers/threads:
# Ensure `process` method correctly manages `large_data_buffer` lifecycle.
# If the instance is created per-worker, the leak might be in how `initialize`
# or `_perform_work` manages memory.

4. Using `objgraph` for Deep Inspection

When standard profiling isn’t enough, objgraph can visualize object references, helping to understand why objects are not being garbage collected. It can show you which objects are holding references to others.

Install: pip install objgraph.

You can use objgraph interactively within a Python debugger (like pdb or ipdb) attached to your worker process, or by adding snippets to your code.

import objgraph
import gc

# ... inside a task or during debugging ...

# Force garbage collection to ensure all cycles are found
gc.collect()

# Get a count of all objects of a specific type
print(f"Count of list objects: {objgraph.count('list')}")

# Find objects that are keeping a specific object alive
# Assume 'leaky_object' is an object you suspect is causing the leak
# You might get this object via a reference from a known large object.
# For example, if you suspect a list is growing:
# all_lists = objgraph.by_type('list')
# if all_lists:
#     leaky_list = all_lists[-1] # Assume the last one is the largest/newest
#     print("Showing references to the suspected leaky list:")
#     objgraph.show_backrefs([leaky_list], max_depth=5, filename='backrefs.png')
#     objgraph.show_refs([leaky_list], max_depth=5, filename='refs.png')

# A more targeted approach: find objects that have grown significantly in count
# You'd typically run this multiple times and compare counts.
# For example, if you suspect a custom class 'MyDataRecord' is leaking:
# objgraph.show_most_common_types(limit=20) 
# This will show you the types of objects that are most numerous. If 'MyDataRecord'
# is high on the list and its count is increasing over time, it's a prime suspect.

# To use objgraph effectively, you often need to:
# 1. Take a snapshot of object counts/references.
# 2. Perform some operations (e.g., run a task that might leak).
# 3. Take another snapshot.
# 4. Compare the snapshots to find types whose counts have increased.
# 5. Use show_backrefs on instances of those types to find what's holding them.

Visualizing the reference graph can reveal circular dependencies or unexpected references held by long-lived objects, which are common causes of leaks in Python.

Integration with Celery Worker Lifecycle

For long-running processes, it’s often more practical to implement a periodic “restart” or “reset” mechanism rather than trying to fix every single potential leak in legacy code. This is a pragmatic approach when refactoring is too risky or time-consuming.

Periodic Worker Restarts

Configure your process supervisor (e.g., supervisor, systemd) to restart the Celery worker process after a certain number of tasks have been processed or after a fixed time interval. This effectively “cleans up” the memory.

; Example supervisor configuration snippet
[program:celery_worker]
command=/path/to/your/venv/bin/celery -A your_app worker --loglevel=info
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/celery_worker.log
stderr_logfile=/var/log/celery_worker.err

; Add a mechanism to restart based on time or task count
; This is often handled by external scripts or cron jobs that monitor
; the worker's memory and send a HUP signal or restart command.
; For example, a script could:
; 1. Get worker PID.
; 2. Check memory usage via psutil.
; 3. If usage > threshold, send SIGTERM or SIGQUIT to worker.
; 4. Supervisor will then restart it.

Alternatively, you can implement a task that signals the worker to shut down gracefully after completing a certain number of tasks. This requires careful coordination with your supervisor.

import os
import signal
from celery import signals

MAX_TASKS_PER_WORKER = 10000
task_counter = 0

@app.task
def process_task(data):
    global task_counter
    task_counter += 1
    
    # ... actual task logic ...
    
    if task_counter >= MAX_TASKS_PER_WORKER:
        print(f"Max tasks ({MAX_TASKS_PER_WORKER}) reached. Signaling worker shutdown.")
        # Send SIGTERM to the worker process itself.
        # This requires knowing the worker's PID. A common pattern is to
        # store the PID in a file when the worker starts.
        try:
            worker_pid_file = "/var/run/celery/worker.pid" # Example path
            with open(worker_pid_file, 'r') as f:
                worker_pid = int(f.read().strip())
            os.kill(worker_pid, signal.SIGTERM)
        except Exception as e:
            print(f"Could not signal worker shutdown: {e}")
            
    return "Task completed"

# You would need to ensure the worker PID is written to the file
# when the worker starts. This can be done via Celery's signals or
# by modifying the worker startup script.
# Example using signals (add to your tasks.py or similar):
# @signals.worker_init.connect
# def on_worker_init(**kwargs):
#     pid = os.getpid()
#     worker_pid_file = "/var/run/celery/worker.pid"
#     with open(worker_pid_file, 'w') as f:
#         f.write(str(pid))
#     print(f"Worker started with PID {pid}. PID file created: {worker_pid_file}")

# @signals.worker_term.connect
# def on_worker_term(**kwargs):
#     worker_pid_file = "/var/run/celery/worker.pid"
#     if os.path.exists(worker_pid_file):
#         os.remove(worker_pid_file)
#     print("Worker terminating. PID file removed.")

This strategy doesn’t fix the leak but contains its impact, ensuring the worker’s memory usage remains bounded. It’s a crucial technique for maintaining stability in legacy systems where deep refactoring is not immediately feasible.

Primary Sidebar

A little about the Author

Having 12+ Years of Experience in Software Development, Vinay is a principal software architect, senior systems engineer, and elite technical consultant. He specializes in bespoke PHP/WordPress development, high-performance Magento 2 & Shopify architectures, custom plugin/theme development from scratch, and legacy code modernization (including VB6, VB.NET, PyQt, and Crystal Reports). Known for solving complex database bottlenecks, speed optimization (Core Web Vitals), and advanced security code auditing, Vinay engineers production-ready systems designed to scale under heavy concurrent load conditions.



Chat on WhatsApp

Recent Posts

  • Top 100 Developer Tooling and Productivity SaaS Ideas to Launch in 2026 to Boost Organic Search Growth by 200%
  • Top 100 Developer-Centric Code Snippet Managers and Customization Plugins to Double User Engagement and Session Duration
  • Top 5 API Monetization Frameworks and Gateway Strategies for Developers to Minimize Server Costs and Load Overhead
  • Top 50 Automated PDF & Document Generation Tool Ideas for Developers to Minimize Server Costs and Load Overhead
  • Top 50 Premium Newsletter and Subscription Business Models for Devs for High-Traffic Technical Portals

Categories

  • apache (1)
  • Business & Monetization (386)
  • Centos (4)
  • Comparisons & Decision Making (55)
  • Debian (2)
  • Debugging & Troubleshooting (563)
  • DevOps (7)
  • DevOps & Cloud Scaling (949)
  • Django (1)
  • Migration & Architecture (167)
  • MySQL (1)
  • Performance & Optimization (753)
  • PHP (5)
  • Plugins & Themes (223)
  • Security & Compliance (539)
  • SEO & Growth (483)
  • Server (23)
  • Ubuntu (9)
  • WordPress (22)
  • WordPress Plugin Development (7)
  • WordPress Theme Development (301)

Recent Posts

  • Top 100 Developer Tooling and Productivity SaaS Ideas to Launch in 2026 to Boost Organic Search Growth by 200%
  • Top 100 Developer-Centric Code Snippet Managers and Customization Plugins to Double User Engagement and Session Duration
  • Top 5 API Monetization Frameworks and Gateway Strategies for Developers to Minimize Server Costs and Load Overhead
  • Top 50 Automated PDF & Document Generation Tool Ideas for Developers to Minimize Server Costs and Load Overhead
  • Top 50 Premium Newsletter and Subscription Business Models for Devs for High-Traffic Technical Portals
  • Top 100 SEO and Schema Markup Plugins for Headless Decoupled Sites for Independent Web Developers and Indie Hackers

Top Categories

  • DevOps & Cloud Scaling (949)
  • Performance & Optimization (753)
  • Debugging & Troubleshooting (563)
  • Security & Compliance (539)
  • SEO & Growth (483)
  • Business & Monetization (386)

Our Products

  • School Management & Student Administration System
  • Integrated Hospital & Clinic Management System
  • Real Estate Directory & Agent Portal
  • Restaurant POS & Table Booking System
  • Retail Inventory POS & Billing System
  • Pharmacy Inventory & Clinic Billing System

Our Services

  • Vibe Engineering & AI Code Auditing Services
  • Prompt Engineering & "Vibe Coding" Workflow Consulting
  • AI-Augmented "Vibe Coding" & Rapid MVP Development
  • Figma to Shopify Liquid Theme Customization
  • Figma to WooCommerce Frontend Development
  • Figma to Magento 2 Theme Development

Copyright © 2026 · Vinay Vengala