• Skip to secondary menu
  • Skip to main content
  • Skip to primary sidebar
  • Home
  • Projects
  • Products
  • Themes
  • Tools
  • Request for Quote

Vengala Vinay

Having 9+ Years of Experience in Software Development

  • Home
  • WordPress
  • PHP
    • Codeigniter
  • Django
  • Magento
  • Selenium
  • Server
Home » Disaster Recovery 101: Architecting Auto-Failovers for Elasticsearch and Python Deployments on Google Cloud

Disaster Recovery 101: Architecting Auto-Failovers for Elasticsearch and Python Deployments on Google Cloud

Designing for Resilience: Elasticsearch and Python Auto-Failover on GCP

This document outlines a robust, automated failover architecture for a critical Elasticsearch cluster and its associated Python-based microservices, deployed on Google Cloud Platform (GCP). The objective is to minimize downtime and data loss during infrastructure failures, ensuring high availability for end-user facing applications.

Elasticsearch Cluster High Availability and Failover Strategy

A multi-zone Elasticsearch cluster is the foundation of our resilience. We leverage Elasticsearch’s built-in master election and shard replication mechanisms, augmented by GCP’s regional capabilities and external health checks.

GCP Infrastructure Setup

We deploy Elasticsearch nodes across multiple zones within a single GCP region. This provides resilience against single-zone failures. For disaster recovery across regions, a separate, smaller cluster in a different region is maintained, with cross-cluster replication (CCR) configured.

Elasticsearch Configuration for Resilience

Key `elasticsearch.yml` settings to ensure high availability:

cluster.name: "my-production-cluster"
node.name: ${HOSTNAME}
network.host: 0.0.0.0
discovery.seed_hosts:
  - "es-node-1.gcp.internal:9300"
  - "es-node-2.gcp.internal:9300"
  - "es-node-3.gcp.internal:9300"
cluster.initial_master_nodes:
  - "node-1"
  - "node-2"
  - "node-3"
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true

# Shard allocation settings for multi-zone resilience
cluster.routing.allocation.enable: all
cluster.routing.allocation.awareness.attributes: zone
indices.recovery.max_bytes_per_sec: 100mb
indices.thread_pool.write.size: 100
indices.thread_pool.write.queue_size: 1000

# Master node quorum for stability
discovery.zen.minimum_master_nodes: 2 # For a 3-node master eligible set

Note: The `discovery.seed_hosts` and `cluster.initial_master_nodes` should reflect your actual node names or IP addresses. For production, use stable internal DNS names or GCP’s internal load balancer IPs. `discovery.zen.minimum_master_nodes` should be `(N/2) + 1` where N is the number of master-eligible nodes.

Automated Failover Mechanism for Elasticsearch

We employ a multi-layered approach for detecting and reacting to Elasticsearch cluster failures:

  • GCP Load Balancer Health Checks: A Network Load Balancer (NLB) or HTTP(S) Load Balancer pointing to the Elasticsearch HTTP API (port 9200) is configured with aggressive health checks. These checks query the /_cluster/health endpoint. If a node fails these checks, the load balancer stops sending traffic to it.
  • External Monitoring Service (e.g., Prometheus/Alertmanager): A dedicated monitoring stack scrapes Elasticsearch metrics, including cluster status, node health, and shard allocation. Alerts are configured for critical conditions like unassigned shards, master node unavailability, or high latency.
  • Automated Recovery Script: Upon receiving critical alerts (e.g., via Alertmanager webhook), a Python script is triggered. This script can perform actions like restarting unhealthy nodes, re-allocating shards, or initiating a failover to a secondary cluster if the primary is irrecoverably down.

Example: Elasticsearch Health Check Script (Python)

This script can be run by a scheduler (like cron or Kubernetes CronJob) or triggered by an alerting system.

import requests
import json
import time
import os

# Configuration
ES_HOSTS = os.environ.get("ES_HOSTS", "http://localhost:9200").split(',')
HEALTH_CHECK_URL = "/_cluster/health"
TIMEOUT_SECONDS = 10
MAX_RETRIES = 3
RETRY_DELAY_SECONDS = 5

def check_es_health(host):
    """Checks the health of a single Elasticsearch node."""
    url = f"{host}{HEALTH_CHECK_URL}"
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=TIMEOUT_SECONDS)
            response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
            health_data = response.json()
            # Basic health check: cluster status should be green or yellow
            if health_data.get("status") in ["green", "yellow"]:
                print(f"INFO: Elasticsearch cluster health at {host} is {health_data.get('status')}")
                return True, health_data
            else:
                print(f"WARNING: Elasticsearch cluster status at {host} is {health_data.get('status')}")
                return False, health_data
        except requests.exceptions.RequestException as e:
            print(f"ERROR: Failed to connect to {host} on attempt {attempt + 1}/{MAX_RETRIES}: {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_DELAY_SECONDS)
    return False, None

def main():
    """Orchestrates health checks across multiple Elasticsearch nodes."""
    all_healthy = False
    cluster_status = None

    for host in ES_HOSTS:
        healthy, health_data = check_es_health(host.strip())
        if healthy:
            all_healthy = True
            cluster_status = health_data.get("status")
            # If we find at least one healthy node and the cluster is green/yellow, we can consider it operational
            if cluster_status in ["green", "yellow"]:
                print("Elasticsearch cluster is operational.")
                return # Exit early if operational

    if not all_healthy:
        print("CRITICAL: All Elasticsearch nodes are unreachable or unhealthy.")
        # Trigger automated recovery/failover actions here
        # e.g., send alert to PagerDuty, trigger a GCP instance restart, or initiate cross-cluster failover
        trigger_failover_procedure()
    elif cluster_status not in ["green", "yellow"]:
        print(f"CRITICAL: Elasticsearch cluster status is {cluster_status}. Initiating recovery procedures.")
        # Trigger recovery procedures for non-green/yellow states
        trigger_recovery_procedure()
    else:
        print("Elasticsearch cluster is operational.")

def trigger_failover_procedure():
    """Placeholder for failover logic (e.g., to a secondary region)."""
    print("ACTION: Initiating full cluster failover to secondary region...")
    # Implement logic to switch DNS, activate read-only secondary, etc.
    pass

def trigger_recovery_procedure():
    """Placeholder for recovery logic (e.g., re-allocating shards, restarting nodes)."""
    print("ACTION: Initiating cluster recovery procedures (e.g., shard re-allocation, node restart)...")
    # Implement logic to address specific cluster issues
    pass

if __name__ == "__main__":
    main()

Cross-Cluster Replication (CCR) for DR

For disaster recovery beyond a single region, Elasticsearch's Cross-Cluster Replication (CCR) is essential. This asynchronously replicates indices from a primary cluster in one region to a secondary cluster in another.

Configuring CCR

On the leader (primary) cluster:

PUT _ccr/masterless/my-remote-cluster
{
  "remote_cluster": "my-secondary-region-cluster",
  "skip_unavailable_nodes": false
}

On the follower (secondary) cluster, configure the remote cluster connection:

PUT _cluster/settings
{
  "persistent": {
    "cluster": {
      "remote": {
        "my-primary-region-cluster": {
          "seeds": [
            "es-node-1.primary.gcp.internal:9300",
            "es-node-2.primary.gcp.internal:9300"
          ]
        }
      }
    }
  }
}

Then, on the leader cluster, configure replication for specific indices:

PUT my-index/_ccr/follow
{
  "remote_cluster": "my-secondary-region-cluster",
  "leader_index": "my-index"
}

The failover script can then be extended to promote the follower cluster to a leader if the primary region becomes unavailable. This typically involves stopping replication and making the follower cluster writable.

Python Microservice Resilience and Failover

Python microservices interacting with Elasticsearch need to be designed for failure. This involves robust error handling, connection pooling, and leveraging GCP's managed services for resilience.

GCP Deployment Strategy

Deploy Python services using Google Kubernetes Engine (GKE) or Cloud Run. Both offer auto-scaling, self-healing, and multi-zone deployment capabilities.

Python Client Configuration and Error Handling

The Python Elasticsearch client should be configured with:

  • Connection Pooling: Maintain persistent connections to reduce overhead.
  • Timeouts: Set reasonable timeouts for requests to prevent hanging.
  • Retries: Implement exponential backoff for transient network issues or temporary Elasticsearch unavailability.
  • Circuit Breakers: Prevent cascading failures by stopping requests to an unhealthy service.

Example: Robust Elasticsearch Interaction in Python

from elasticsearch import Elasticsearch, exceptions
from elasticsearch.helpers import bulk
import time
import os

# Configuration
ES_NODES = os.environ.get("ES_NODES", "http://localhost:9200").split(',')
ES_CLOUD_ID = os.environ.get("ES_CLOUD_ID") # For Elastic Cloud
ES_API_KEY = os.environ.get("ES_API_KEY") # For Elastic Cloud

# --- Connection Setup ---
es_client = None
if ES_CLOUD_ID and ES_API_KEY:
    try:
        es_client = Elasticsearch(
            cloud_id=ES_CLOUD_ID,
            api_key=ES_API_KEY,
            request_timeout=30, # Global timeout for requests
            max_retries=3,
            retry_on_timeout=True,
            sniff_on_start=True, # Discover nodes on startup
            sniff_before_requests=True # Re-discover nodes before each request if needed
        )
        print("Connected to Elasticsearch via Cloud ID and API Key.")
    except Exception as e:
        print(f"ERROR: Failed to connect to Elasticsearch via Cloud ID: {e}")
        # Fallback or error handling
elif ES_NODES:
    try:
        es_client = Elasticsearch(
            ES_NODES,
            request_timeout=30,
            max_retries=3,
            retry_on_timeout=True,
            sniff_on_start=True,
            sniff_before_requests=True
        )
        print(f"Connected to Elasticsearch nodes: {ES_NODES}")
    except Exception as e:
        print(f"ERROR: Failed to connect to Elasticsearch nodes {ES_NODES}: {e}")
        # Fallback or error handling
else:
    print("ERROR: No Elasticsearch connection details provided (ES_NODES or ES_CLOUD_ID/ES_API_KEY).")
    # Handle critical error: cannot connect to ES

# --- Data Ingestion Example with Resilience ---
def ingest_data(index_name, documents):
    """
    Ingests documents into Elasticsearch with retry logic.
    documents: An iterable of dictionaries, each representing a document.
    """
    if not es_client or not es_client.ping():
        print("ERROR: Elasticsearch client is not available or not connected. Cannot ingest data.")
        return False

    try:
        success_count, errors = bulk(
            es_client,
            documents,
            index=index_name,
            chunk_size=500,
            request_timeout=60, # Timeout for bulk operation
            raise_on_error=False, # Don't raise on individual document errors
            raise_on_exception=True # Raise on connection/transport errors
        )
        if errors:
            print(f"WARNING: Some documents failed to ingest: {errors}")
            # Implement specific error handling for failed documents
        print(f"Successfully ingested {success_count} documents.")
        return True
    except exceptions.ConnectionError as ce:
        print(f"ERROR: Elasticsearch Connection Error during bulk ingest: {ce}")
        # This might trigger a circuit breaker or retry mechanism in a larger app
        return False
    except exceptions.TransportError as te:
        print(f"ERROR: Elasticsearch Transport Error during bulk ingest: {te}")
        return False
    except Exception as e:
        print(f"ERROR: An unexpected error occurred during bulk ingest: {e}")
        return False

# --- Data Search Example with Resilience ---
def search_data(index_name, query):
    """
    Searches Elasticsearch with retry logic.
    query: Elasticsearch query dictionary.
    """
    if not es_client or not es_client.ping():
        print("ERROR: Elasticsearch client is not available or not connected. Cannot search data.")
        return None

    retries = 3
    delay = 5
    for i in range(retries):
        try:
            response = es_client.search(
                index=index_name,
                body=query,
                request_timeout=20 # Timeout for search request
            )
            return response['hits']['hits']
        except exceptions.ConnectionError as ce:
            print(f"ERROR: Elasticsearch Connection Error during search (Attempt {i+1}/{retries}): {ce}")
            time.sleep(delay * (i+1))
        except exceptions.TransportError as te:
            print(f"ERROR: Elasticsearch Transport Error during search (Attempt {i+1}/{retries}): {te}")
            time.sleep(delay * (i+1))
        except Exception as e:
            print(f"ERROR: An unexpected error occurred during search (Attempt {i+1}/{retries}): {e}")
            time.sleep(delay * (i+1))
    print("ERROR: Max retries reached for search operation. Elasticsearch is likely unavailable.")
    return None

# Example Usage:
if __name__ == "__main__":
    # Simulate documents to ingest
    sample_docs = [
        {"_id": "doc1", "title": "First Document", "content": "This is the content of the first document."},
        {"_id": "doc2", "title": "Second Document", "content": "Content for the second document."},
    ]

    if ingest_data("my-test-index", sample_docs):
        print("Data ingestion successful.")

    # Simulate a search query
    search_query = {
        "query": {
            "match": {
                "content": "second document"
            }
        }
    }

    results = search_data("my-test-index", search_query)
    if results is not None:
        print(f"Search results: {results}")
    else:
        print("Search operation failed.")

GCP Managed Services for Resilience

Leverage GCP's managed services to offload operational burden and enhance resilience:

  • Google Kubernetes Engine (GKE): Deploy Python services as Deployments with multiple replicas. GKE's node auto-repair and auto-upgrade, combined with Pod anti-affinity rules, ensure that replicas are spread across nodes and zones, and unhealthy Pods are automatically restarted.
  • Cloud Run: For stateless services, Cloud Run provides automatic scaling and high availability without managing infrastructure. It handles traffic distribution and instance restarts transparently.
  • Cloud SQL/Spanner: If your Python services rely on relational databases, use managed options like Cloud SQL (with High Availability configuration) or Cloud Spanner for regional or multi-regional resilience.
  • Cloud Load Balancing: Use GCP's Global or Regional Load Balancers to distribute traffic to your Python service instances. Health checks configured on the load balancer will automatically remove unhealthy instances from rotation.

Automated Failover for Python Services

Automated failover for Python services typically involves:

  • GKE Deployment Replicas: If a node hosting Python service Pods fails, GKE will reschedule those Pods onto healthy nodes.
  • Application-Level Failover: For critical dependencies (like Elasticsearch), implement retry logic with circuit breakers. If a dependency is consistently unavailable, the Python service might enter a degraded mode, return cached data, or queue requests for later processing.
  • Service Discovery: Use Kubernetes Services or GCP's Service Directory to abstract the network endpoints of your Python services. This allows clients to connect to healthy instances without needing to know their individual IP addresses.
  • External Load Balancer Health Checks: As mentioned, GCP Load Balancers will stop sending traffic to unhealthy instances of your Python services.

Orchestrating Failover with GCP Tools

GCP provides several tools that can be integrated into your failover strategy:

  • Cloud Monitoring & Alerting: Set up custom metrics and alerts for both Elasticsearch and Python services. Alerts can trigger Cloud Functions or Cloud Run jobs to perform automated recovery actions.
  • Cloud Functions/Cloud Run Jobs: These serverless compute options are ideal for running your automated recovery scripts. They can be triggered by Pub/Sub messages from Cloud Monitoring alerts.
  • Cloud DNS: For regional failover, Cloud DNS can be updated (via API calls from your recovery scripts) to point traffic to a secondary region's load balancer.
  • Managed Instance Groups (MIGs): While GKE is preferred for containerized workloads, MIGs can be used for VM-based deployments. MIGs support auto-healing and auto-scaling, and can be configured for multi-zone deployments.

Example: Triggering Recovery via Cloud Functions

A Cloud Function can be triggered by an Alerting policy in Cloud Monitoring. The alert could indicate that the Elasticsearch cluster health is not 'green' for an extended period.

# main.py (Cloud Function)
import functions_framework
import google.auth
import google.auth.transport.requests
import google.cloud.storage
import google.cloud.pubsub_v1
import json
import os

# Replace with your actual Elasticsearch recovery/failover logic
def trigger_es_recovery_actions():
    print("Executing Elasticsearch recovery actions...")
    # Example: Call an internal API, restart specific VMs, or initiate CCR promotion
    # For demonstration, we'll just print.
    print("Simulating Elasticsearch recovery: Re-allocating shards and checking master nodes.")
    # In a real scenario, you might use the Elasticsearch Python client here
    # or trigger a more complex workflow.
    return True

@functions_framework.cloud_event
def handle_alert(cloud_event):
    """
    Triggered by a Cloud Monitoring alert via Pub/Sub.
    """
    print(f"Received Cloud Event: {cloud_event.data}")

    try:
        alert_payload = json.loads(cloud_event.data.decode('utf-8'))
        alert_name = alert_payload.get("alertDisplayName", "Unknown Alert")
        resource_type = alert_payload.get("resourceType", "Unknown Resource")
        resource_id = alert_payload.get("resourceId", "Unknown ID")
        metric_type = alert_payload.get("metricType", "Unknown Metric")
        condition = alert_payload.get("condition", "Unknown Condition")

        print(f"Alert Name: {alert_name}")
        print(f"Resource: {resource_type}/{resource_id}")
        print(f"Metric: {metric_type}")
        print(f"Condition: {condition}")

        # --- Custom Logic for Elasticsearch Failover ---
        # This is a simplified example. You'd need more sophisticated logic
        # to determine the exact failure and appropriate action.
        if "elasticsearch" in alert_name.lower() and "unhealthy" in condition.lower():
            print("Detected Elasticsearch unhealthy alert. Initiating recovery.")
            success = trigger_es_recovery_actions()
            if success:
                print("Elasticsearch recovery actions initiated successfully.")
            else:
                print("Failed to initiate Elasticsearch recovery actions.")
                # Consider escalating or retrying

        # Add logic for Python service failures if needed

        return "Alert processed successfully."

    except Exception as e:
        print(f"Error processing alert: {e}")
        # Log the error and potentially send a notification to an ops channel
        return f"Error processing alert: {e}", 500

# Example of how to deploy this function:
# gcloud functions deploy handle_alert \
#   --runtime python39 \
#   --trigger-topic YOUR_ALERTING_TOPIC_NAME \
#   --entry-point handle_alert \
#   --region YOUR_GCP_REGION \
#   --service-account YOUR_SERVICE_ACCOUNT_EMAIL \
#   --allow-unauthenticated # Or configure authentication as needed

This setup ensures that critical infrastructure failures are detected and that automated recovery procedures are initiated, minimizing manual intervention and reducing Mean Time To Recovery (MTTR).

Primary Sidebar

A little about the Author

Having 9+ Years of Experience in Software Development.
Expertised in Php Development, WordPress Custom Theme Development (From scratch using underscores or Genesis Framework or using any blank theme or Premium Theme), Custom Plugin Development. Hands on Experience on 3rd Party Php Extension like Chilkat, nSoftware.

Recent Posts

  • Disaster Recovery 101: Architecting Auto-Failovers for Redis and PHP Deployments on OVH
  • How We Audited a High-Traffic WooCommerce Enterprise Stack on Google Cloud and Mitigated Race conditions during high-concurrency payment processing
  • Disaster Recovery 101: Architecting Auto-Failovers for Elasticsearch and Magento 2 Deployments on DigitalOcean
  • An Auditor’s Checklist for Securing WordPress Backends on OVH
  • Step-by-Step: Diagnosing Perl script high CPU throttling due to unoptimized regular expressions on AWS Servers

Copyright © 2026 · Vinay Vengala