Vengala Vinay

Disaster Recovery 101: Architecting Auto-Failovers for Elasticsearch and Python Deployments on DigitalOcean

Designing for Resilience: Elasticsearch Auto-Failover with Python Orchestration

Achieving true high availability for critical services like Elasticsearch requires more than just redundant instances; it demands an automated failover strategy. This post details a robust approach to architecting auto-failover for Elasticsearch clusters deployed on DigitalOcean, orchestrated by a Python application that monitors cluster health and initiates recovery actions.

Elasticsearch Cluster Setup and Health Monitoring

A typical Elasticsearch deployment for high availability involves multiple nodes forming a cluster. For this scenario, we’ll assume a setup with at least three master-eligible nodes to ensure quorum. Health monitoring is paramount. Elasticsearch exposes its cluster health via the `_cluster/health` API. We’ll leverage this endpoint to gauge the cluster’s status (green, yellow, red) and the number of unassigned shards, which are strong indicators of node failure or cluster instability.

Consider a basic Elasticsearch cluster configuration using Docker Compose for simplicity. This example runs all three nodes on a single host and assumes they reach each other over a private Docker network, so it is suited to local testing rather than real high availability. For production, run each node on a dedicated DigitalOcean Droplet with a static IP and appropriate firewall rules.

Example Docker Compose for Elasticsearch

version: '3.7'

services:
  elasticsearch1:
    image: elasticsearch:7.17.0
    container_name: es01
    environment:
      # node.name must match an entry in cluster.initial_master_nodes;
      # otherwise it defaults to the container hostname and bootstrap fails.
      - node.name=elasticsearch1
      - discovery.seed_hosts=elasticsearch2:9300,elasticsearch3:9300
      - cluster.initial_master_nodes=elasticsearch1,elasticsearch2,elasticsearch3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - esnet

  elasticsearch2:
    image: elasticsearch:7.17.0
    container_name: es02
    environment:
      - node.name=elasticsearch2
      - discovery.seed_hosts=elasticsearch1:9300,elasticsearch3:9300
      - cluster.initial_master_nodes=elasticsearch1,elasticsearch2,elasticsearch3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata2:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - esnet

  elasticsearch3:
    image: elasticsearch:7.17.0
    container_name: es03
    environment:
      - node.name=elasticsearch3
      - discovery.seed_hosts=elasticsearch1:9300,elasticsearch2:9300
      - cluster.initial_master_nodes=elasticsearch1,elasticsearch2,elasticsearch3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata3:/usr/share/elasticsearch/data
    ports:
      - 9202:9200
    networks:
      - esnet

volumes:
  esdata1:
    driver: local
  esdata2:
    driver: local
  esdata3:
    driver: local

networks:
  esnet:
    driver: bridge

Monitoring Elasticsearch Health with Python

We’ll use Python’s `requests` library to query the Elasticsearch cluster health API. The monitoring script runs periodically and checks the cluster status. A ‘red’ status, or a ‘yellow’ status with any unassigned shards, triggers an alert and potentially an automated recovery process.

import requests
import time
import json

ELASTICSEARCH_URL = "http://localhost:9200" # Adjust if running remotely
HEALTH_ENDPOINT = "_cluster/health"
TIMEOUT_SECONDS = 10
CHECK_INTERVAL_SECONDS = 30

def get_cluster_health():
    try:
        response = requests.get(f"{ELASTICSEARCH_URL}/{HEALTH_ENDPOINT}", timeout=TIMEOUT_SECONDS)
        response.raise_for_status() # Raise an exception for bad status codes
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching cluster health: {e}")
        return None

def is_cluster_unhealthy(health_data):
    if not health_data:
        return True # Assume unhealthy if we can't get data

    status = health_data.get("status")
    unassigned_shards = health_data.get("unassigned_shards", 0)
    initializing_shards = health_data.get("initializing_shards", 0)

    # Define unhealthy conditions
    if status == "red":
        print("Cluster status is RED. Critical issue detected.")
        return True
    if status == "yellow" and unassigned_shards > 0:
        print(f"Cluster status is YELLOW with {unassigned_shards} unassigned shards. Potential issue.")
        return True
    if initializing_shards > 0:
        print(f"Cluster is initializing {initializing_shards} shards. Waiting for stability.")
        return False # Not necessarily unhealthy, but not fully ready

    return False

def monitor_elasticsearch():
    while True:
        health = get_cluster_health()
        if health:
            print(f"Current cluster health: {json.dumps(health, indent=2)}")
            if is_cluster_unhealthy(health):
                print("Unhealthy cluster detected. Initiating failover procedure...")
                # In a real-world scenario, call your failover orchestration function here
                # trigger_failover()
                # For demonstration, we'll just break after detecting an issue
                break
        else:
            print("Could not retrieve cluster health. Assuming failure.")
            # trigger_failover()
            break
        time.sleep(CHECK_INTERVAL_SECONDS)

if __name__ == "__main__":
    monitor_elasticsearch()
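
The loop above reacts to a single bad reading, so a brief network blip could start a recovery. One option is to require several consecutive unhealthy checks first. The following is a minimal sketch under that assumption, not part of the original script; `FailureDebouncer` and its `threshold` parameter are hypothetical names introduced for illustration.

```python
class FailureDebouncer:
    """Signal a failure only after `threshold` consecutive unhealthy checks."""

    def __init__(self, threshold=3):
        if threshold < 1:
            raise ValueError("threshold must be at least 1")
        self.threshold = threshold
        self.consecutive_failures = 0

    def record(self, unhealthy):
        """Record one check result; return True when failover should fire."""
        if unhealthy:
            self.consecutive_failures += 1
        else:
            # Any healthy reading resets the streak.
            self.consecutive_failures = 0
        return self.consecutive_failures >= self.threshold
```

Inside `monitor_elasticsearch`, the result of `is_cluster_unhealthy(health)` could be passed to `record()`, with the failover started only when `record()` returns True.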

Orchestrating Failover with Python and DigitalOcean Droplets

The core of our auto-failover strategy lies in an orchestration layer. This Python application will:

  1. Monitor Elasticsearch health.
  2. If unhealthy, identify the problematic node(s).
  3. Initiate recovery actions on DigitalOcean.

For this example, we’ll assume a scenario where a single Elasticsearch node becomes unresponsive. The Python script will detect this by failing to get a healthy response from the cluster API. The recovery action will involve restarting the affected Elasticsearch Droplet or, more aggressively, destroying and recreating it.
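
Identifying which node has dropped out can be approached by comparing the nodes you expect against the nodes the cluster currently reports through the `_cat/nodes` API. The sketch below assumes a hypothetical inventory (`EXPECTED_NODES`, with made-up private IPs) and assumes the IP each node publishes matches the inventory, which may not hold inside Docker. It uses only the standard library.

```python
import json
from urllib.request import urlopen

# Hypothetical inventory: node name -> private IP (illustrative values only).
EXPECTED_NODES = {"es01": "10.0.0.11", "es02": "10.0.0.12", "es03": "10.0.0.13"}


def fetch_cluster_members(base_url, timeout=10):
    """Return the rows of GET _cat/nodes as a list of {"name", "ip"} dicts."""
    url = f"{base_url}/_cat/nodes?format=json&h=name,ip"
    with urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def missing_nodes(expected, members):
    """Names of expected nodes whose IP is absent from the cluster membership."""
    present_ips = {row["ip"] for row in members}
    return sorted(name for name, ip in expected.items() if ip not in present_ips)
```

For example, `missing_nodes(EXPECTED_NODES, fetch_cluster_members("http://10.0.0.11:9200"))` would list the nodes that have left the cluster, provided the queried node itself is still reachable.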

DigitalOcean API Interaction with Python

We’ll use the `python-digitalocean` library to interact with the DigitalOcean API. Ensure you have a DigitalOcean API token with appropriate read/write permissions. Store this token securely (e.g., as an environment variable).

pip install python-digitalocean requests

import digitalocean
import requests
import time
import json
import os

# --- Configuration ---
DO_API_TOKEN = os.environ.get("DIGITALOCEAN_API_TOKEN")
ELASTICSEARCH_NODES = {
    "es01": {"droplet_id": 12345678, "ip": "192.168.1.10"}, # Replace with actual IDs and IPs
    "es02": {"droplet_id": 87654321, "ip": "192.168.1.11"},
    "es03": {"droplet_id": 11223344, "ip": "192.168.1.12"},
}
ELASTICSEARCH_BASE_URL = "http://{ip}:9200" # Use the primary node's IP for initial check
HEALTH_ENDPOINT = "_cluster/health"
TIMEOUT_SECONDS = 10
CHECK_INTERVAL_SECONDS = 30
RETRY_ATTEMPTS = 3
RETRY_DELAY_SECONDS = 15

# --- DigitalOcean Manager ---
try:
    manager = digitalocean.Manager(token=DO_API_TOKEN)
except Exception as e:
    print(f"Error initializing DigitalOcean manager: {e}")
    exit(1)

def get_droplet(droplet_id):
    try:
        return manager.get_droplet(droplet_id)
    except digitalocean.baseapi.NotFoundError:
        print(f"Droplet with ID {droplet_id} not found.")
        return None
    except Exception as e:
        print(f"Error fetching Droplet {droplet_id}: {e}")
        return None

def restart_droplet(droplet_id):
    droplet = get_droplet(droplet_id)
    if droplet:
        print(f"Attempting to restart Droplet ID: {droplet_id} ({droplet.name})...")
        try:
            droplet.reboot()
            print(f"Restart command sent for Droplet ID: {droplet_id}.")
            return True
        except Exception as e:
            print(f"Error restarting Droplet ID {droplet_id}: {e}")
            return False
    return False

def destroy_droplet(droplet_id):
    droplet = get_droplet(droplet_id)
    if droplet:
        print(f"Attempting to destroy Droplet ID: {droplet_id} ({droplet.name})...")
        try:
            droplet.destroy()
            print(f"Destroy command sent for Droplet ID: {droplet_id}.")
            return True
        except Exception as e:
            print(f"Error destroying Droplet ID {droplet_id}: {e}")
            return False
    return False

# --- Elasticsearch Monitoring and Failover Logic ---
def get_es_health(node_ip):
    url = ELASTICSEARCH_BASE_URL.format(ip=node_ip)
    try:
        response = requests.get(f"{url}/{HEALTH_ENDPOINT}", timeout=TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        # print(f"Error fetching health from {node_ip}: {e}")
        return None

def find_unhealthy_node(current_primary_ip):
    # First, try to get health from the current primary. If it fails, that's our first suspect.
    health_data = get_es_health(current_primary_ip)
    if health_data is None:
        print(f"Primary node {current_primary_ip} is unresponsive. Assuming it's the failure point.")
        return current_primary_ip # Return the IP of the unresponsive primary

    status = health_data.get("status")
    unassigned_shards = health_data.get("unassigned_shards", 0)

    if status == "red" or (status == "yellow" and unassigned_shards > 0):
        print(f"Cluster status is {status} with {unassigned_shards} unassigned shards.")
        # The node that answered is reachable, so rebooting it would make things worse.
        # Probe the other nodes and report the first one that does not respond.
        for name, data in ELASTICSEARCH_NODES.items():
            if data["ip"] != current_primary_ip and get_es_health(data["ip"]) is None:
                print(f"Node {name} ({data['ip']}) is unresponsive.")
                return data["ip"]
        # Every node responds, so the degraded status has another cause
        # (for example, shards still recovering). Rebooting a Droplet would not help.
        print("All nodes respond. No Droplet will be restarted.")
        return None
    return None # Cluster is healthy

def trigger_failover(failed_node_ip):
    print(f"Initiating failover for node at IP: {failed_node_ip}")
    failed_node_name = None
    failed_droplet_id = None

    for name, data in ELASTICSEARCH_NODES.items():
        if data["ip"] == failed_node_ip:
            failed_node_name = name
            failed_droplet_id = data["droplet_id"]
            break

    if not failed_droplet_id:
        print(f"Could not find Droplet ID for failed IP: {failed_node_ip}")
        return

    # --- Recovery Strategy ---
    # Option 1: Restart the Droplet
    print(f"Attempting to restart Droplet {failed_droplet_id} ({failed_node_name})...")
    if restart_droplet(failed_droplet_id):
        print(f"Restart initiated for {failed_node_name}. Waiting for recovery...")
        # Add logic here to wait for the node to come back online and cluster to stabilize
        # This might involve polling the health API again after a delay.
        time.sleep(60) # Simple delay, needs more sophisticated check
        print("Assuming node recovery. Re-monitoring cluster health.")
        return # Exit after attempting restart

    # Option 2: Destroy and Recreate (more aggressive)
    # This requires a mechanism to provision a new Droplet and join the cluster.
    # For this example, we'll just log it as a fallback.
    print(f"Restart failed or not attempted. Consider destroying and recreating Droplet {failed_droplet_id} ({failed_node_name}).")
    # destroy_droplet(failed_droplet_id) # Uncomment with extreme caution and proper provisioning logic

def monitor_and_orchestrate():
    current_primary_ip = ELASTICSEARCH_NODES["es01"]["ip"] # Start with a known primary

    while True:
        failed_node_ip = find_unhealthy_node(current_primary_ip)

        if failed_node_ip:
            print(f"Unhealthy node detected at IP: {failed_node_ip}. Triggering failover.")
            trigger_failover(failed_node_ip)
            # After failover, we might need to re-evaluate the primary node or wait for cluster re-election.
            # For simplicity, we'll just continue monitoring.
            # A more robust system would have a dedicated leader election for the orchestrator itself.
            current_primary_ip = ELASTICSEARCH_NODES["es01"]["ip"] # Reset to initial primary for next check
        else:
            print("Cluster health is OK.")

        time.sleep(CHECK_INTERVAL_SECONDS)

if __name__ == "__main__":
    if not DO_API_TOKEN:
        print("Error: DIGITALOCEAN_API_TOKEN environment variable not set.")
        exit(1)
    print("Starting Elasticsearch monitoring and failover orchestration...")
    monitor_and_orchestrate()
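
The fixed `time.sleep(60)` in `trigger_failover` assumes the node is back after a minute. A polling loop with a deadline is one way to replace that assumption with an actual check. The sketch below is illustrative only: `wait_for_recovery` and its parameters are hypothetical, and the health fetcher is passed in as a callable so the helper stays independent of `requests`.

```python
import time


def wait_for_recovery(fetch_health, acceptable=("green",),
                      max_wait_seconds=300, poll_seconds=10,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll until the cluster reports an acceptable status or the deadline passes.

    fetch_health is any callable that returns the parsed _cluster/health JSON,
    or None when the request fails.
    """
    deadline = clock() + max_wait_seconds
    while True:
        health = fetch_health()
        if health is not None and health.get("status") in acceptable:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)
```

In the script above this could be called as `wait_for_recovery(lambda: get_es_health(failed_node_ip))`, falling through to the more aggressive option when it returns False.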

Deployment and Operational Considerations

Deploying this orchestration script requires careful consideration:

  • Orchestrator Resilience: The Python script itself needs to be highly available. Consider running it on a separate, reliable Droplet or using a container orchestration platform like Kubernetes. If the orchestrator fails, your auto-failover mechanism is compromised.
  • State Management: The script assumes a static mapping of node names to Droplet IDs and IPs. In dynamic environments, you’ll need a more sophisticated way to track your Elasticsearch nodes (e.g., using DigitalOcean tags, a configuration management database (CMDB), or Consul).
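  • Idempotency: Failover actions (like restarting a Droplet) should be idempotent or guarded. As written, the loop sends a new reboot request on every check for as long as the cluster stays unhealthy, so track recovery actions that are already in flight and skip Droplets that are currently restarting.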
  • Network Configuration: Ensure your Elasticsearch nodes can communicate with each other on the discovery ports (default 9300) and that the orchestrator can reach the Elasticsearch API (default 9200) and the DigitalOcean API. Use DigitalOcean’s private networking for inter-Droplet communication.
  • Credentials Management: Never hardcode API tokens. Use environment variables or a secrets management system.
  • Testing: Thoroughly test your failover scenarios. Simulate node failures (e.g., by stopping an Elasticsearch Docker container or rebooting a Droplet) and verify that the orchestration script correctly detects and recovers from them. Test scenarios like network partitions and split-brain conditions.
  • Provisioning New Nodes: The `destroy_droplet` function only removes a Droplet; the script has no counterpart that creates a replacement. A complete solution would involve a mechanism to provision a new Droplet (e.g., using Terraform or `python-digitalocean` to create a new Droplet with the correct configuration) and ensure it joins the Elasticsearch cluster correctly. This is a complex but crucial part of a fully automated recovery.
  • Monitoring Granularity: The health check is basic. For production, consider more granular checks: pinging nodes, checking shard allocation status more deeply, and monitoring resource utilization (CPU, memory, disk I/O) on Elasticsearch nodes.
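
The idempotency point above can be approximated with a per-Droplet cooldown, so that a node that was just restarted is not restarted again while it is still booting. This is a minimal sketch, assuming an in-memory guard is enough (it is lost if the orchestrator restarts); `RecoveryCooldown` and `cooldown_seconds` are hypothetical names.

```python
import time


class RecoveryCooldown:
    """Allow at most one recovery action per Droplet within `cooldown_seconds`."""

    def __init__(self, cooldown_seconds=600, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._last_action = {}  # droplet_id -> time of the last allowed action

    def allow(self, droplet_id):
        """Return True and start the cooldown if no recent action exists."""
        now = self._clock()
        last = self._last_action.get(droplet_id)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_action[droplet_id] = now
        return True
```

A call such as `if guard.allow(failed_droplet_id): restart_droplet(failed_droplet_id)` would then skip repeated reboots of the same Droplet inside the cooldown window.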

Advanced Strategies and Future Enhancements

This architecture provides a foundational auto-failover mechanism. For even greater resilience:

  • Multi-Region Deployments: For disaster recovery against entire data center failures, deploy Elasticsearch clusters across multiple DigitalOcean regions. Implement cross-region replication and a global load balancing solution.
  • Automated Provisioning: Integrate with Infrastructure as Code (IaC) tools like Terraform or Ansible to automate the creation of new Elasticsearch nodes when failures are detected.
  • Advanced Health Checks: Implement checks for specific Elasticsearch metrics (e.g., indexing latency, search performance) beyond just the cluster health status.
  • Leader Election for Orchestrator: If running multiple instances of the Python orchestrator for redundancy, implement a leader election mechanism (e.g., using ZooKeeper, etcd, or a simple database-backed lock) to ensure only one instance is active at a time.
  • Graceful Shutdowns and Node Removal: Implement logic to gracefully remove nodes from the Elasticsearch cluster before destroying them, preventing unnecessary shard rebalancing or data loss.
  • Centralized Logging and Alerting: Forward logs from Elasticsearch nodes and the orchestrator to a centralized logging system (e.g., ELK stack itself, or a managed service) and configure alerts for critical events.
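
For the graceful-removal item above, Elasticsearch can be asked to move shards off a node before its Droplet is destroyed by setting `cluster.routing.allocation.exclude._ip` through the cluster settings API. The sketch below uses only the standard library; the function names and the example IPs are hypothetical, and it assumes an unsecured HTTP endpoint like the one used elsewhere in this post.

```python
import json
from urllib.request import Request, urlopen


def build_exclude_settings(node_ips):
    """Settings body that tells the allocator to move shards off the given IPs."""
    return {
        "persistent": {
            "cluster.routing.allocation.exclude._ip": ",".join(node_ips)
        }
    }


def drain_nodes(base_url, node_ips, timeout=10):
    """Send the exclusion to PUT _cluster/settings and return the parsed reply."""
    request = Request(
        f"{base_url}/_cluster/settings",
        data=json.dumps(build_exclude_settings(node_ips)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urlopen(request, timeout=timeout) as resp:
        return json.load(resp)
```

After calling `drain_nodes("http://10.0.0.11:9200", ["10.0.0.12"])`, the orchestrator would wait until `relocating_shards` in the health response drops to zero before destroying the Droplet, and later clear the setting so a replacement node with the same IP can receive shards.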

By combining robust Elasticsearch configurations with intelligent Python-based orchestration and leveraging the capabilities of DigitalOcean, you can build a highly available and resilient search infrastructure that automatically recovers from common failure scenarios.

Copyright © 2026 · Vinay Vengala