• Skip to secondary menu
  • Skip to main content
  • Skip to primary sidebar
  • Home
  • Projects
  • Products
  • Themes
  • Tools
  • Request for Quote

Vengala Vinay

Having 9+ Years of Experience in Software Development

  • Home
  • WordPress
  • PHP
    • Codeigniter
  • Django
  • Magento
  • Selenium
  • Server
Home » Server Monitoring Best Practices: Keeping Your Python App and Elasticsearch Clusters Alive on Google Cloud

Server Monitoring Best Practices: Keeping Your Python App and Elasticsearch Clusters Alive on Google Cloud

Proactive Health Checks for Python Applications on GCE

Maintaining the health of Python applications deployed on Google Compute Engine (GCE) requires a multi-layered approach, starting with robust application-level health checks. These checks should go beyond simple port availability and delve into the application’s internal state. For a typical web application using Flask or Django, this means exposing an endpoint that verifies database connectivity, cache status, and essential service dependencies.

We’ll implement a simple Flask-based health check endpoint. This endpoint will query a mock database and a cache service. In a real-world scenario, you’d replace these with actual database connection checks and cache client pings.

Flask Health Check Endpoint

Create a file named healthcheck.py:

from flask import Flask, jsonify
import redis # Assuming Redis for caching

app = Flask(__name__)

# Mock database connection function
def check_database_connection():
    # In a real app, this would be a DB query or connection pool check
    # For demonstration, we'll just return True
    return True

# Mock cache connection function
def check_cache_connection():
    try:
        r = redis.Redis(host='your-redis-host', port=6379, db=0, socket_timeout=1)
        r.ping()
        return True
    except redis.exceptions.ConnectionError:
        return False

@app.route('/healthz', methods=['GET'])
def health_check():
    db_ok = check_database_connection()
    cache_ok = check_cache_connection()

    if db_ok and cache_ok:
        return jsonify({"status": "ok", "database": "ok", "cache": "ok"}), 200
    else:
        response = {"status": "degraded", "database": "ok" if db_ok else "failed", "cache": "ok" if cache_ok else "failed"}
        return jsonify(response), 503 # Service Unavailable

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

To integrate this with GCE’s load balancing and instance health checks, you’ll configure a health check in the Google Cloud Console or via gcloud. This health check will target the /healthz endpoint on your application’s port (e.g., 5000).

GCE Health Check Configuration (gcloud CLI)

Assuming your application runs on port 5000 and you have an instance group named my-python-app-ig:

gcloud compute health-checks create http my-python-app-health-check \
    --request-path="/healthz" \
    --port=5000 \
    --check-interval=30s \
    --timeout=5s \
    --unhealthy-threshold=3 \
    --healthy-threshold=2

gcloud compute instance-groups managed \
    update-health-check my-python-app-ig \
    --health-check=my-python-app-health-check \
    --zone=us-central1-a # Replace with your zone

This setup ensures that GCE’s load balancer will stop sending traffic to instances that fail the application-level health check, preventing users from hitting unhealthy application instances.

Monitoring Elasticsearch Clusters on Google Cloud

Elasticsearch clusters, especially when managed on GCE or GKE, require dedicated monitoring strategies. Key metrics to track include cluster health (green, yellow, red), node status, JVM heap usage, disk I/O, network traffic, and query latency. Google Cloud’s operations suite (formerly Stackdriver) provides excellent tools for this, but direct integration with Elasticsearch’s APIs is crucial for granular insights.

Leveraging Elasticsearch APIs for Monitoring

The Elasticsearch Cluster Health API (_cluster/health) is fundamental. It provides a snapshot of the cluster’s status, number of nodes, shards, and pending tasks. The Nodes Stats API (_nodes/stats) offers detailed performance metrics for each node, including JVM, filesystem, and network statistics.

# Get cluster health
curl -X GET "http://localhost:9200/_cluster/health?pretty"

# Get nodes stats
curl -X GET "http://localhost:9200/_nodes/stats?pretty"

For automated monitoring, we can use a Python script to periodically query these APIs and send the data to Google Cloud Monitoring. This script can be run as a cron job on a dedicated monitoring instance or as a sidecar container in a GKE deployment.

Python Script for Elasticsearch Monitoring Data Ingestion

This script fetches cluster health and node stats and sends them as custom metrics to Google Cloud Monitoring.

import google.auth
from google.cloud import monitoring_v3
import requests
import time
import json
import os

# --- Configuration ---
ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "http://localhost:9200")
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
if not PROJECT_ID:
    _, PROJECT_ID = google.auth.default()

# --- Google Cloud Monitoring Client ---
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{PROJECT_ID}"

# --- Helper function to send metrics ---
def send_metric(metric_type, value, resource_type, resource_labels, metric_labels=None):
    if metric_labels is None:
        metric_labels = {}

    series = monitoring_v3.TimeSeries()
    series.metric.type = metric_type
    series.resource.type = resource_type
    series.resource.labels.update(resource_labels)
    series.metric.labels.update(metric_labels)

    point = monitoring_v3.Point()
    point.value.double_value = float(value)
    now = time.time()
    point.interval.end_time.seconds = int(now)
    point.interval.end_time.nanos = int((now - point.interval.end_time.seconds) * 10**9)
    series.points = [point]

    try:
        client.create_time_series(name=project_name, time_series=[series])
        print(f"Sent metric: {metric_type} = {value}")
    except Exception as e:
        print(f"Error sending metric {metric_type}: {e}")

# --- Main monitoring loop ---
def monitor_elasticsearch():
    try:
        # 1. Get Cluster Health
        health_response = requests.get(f"{ELASTICSEARCH_HOST}/_cluster/health?pretty")
        health_response.raise_for_status() # Raise an exception for bad status codes
        health_data = health_response.json()

        cluster_name = health_data.get("cluster_name", "unknown")
        status = health_data.get("status", "red") # Default to red if status is missing
        num_nodes = health_data.get("number_of_nodes", 0)
        num_data_nodes = health_data.get("number_of_data_nodes", 0)
        active_shards = health_data.get("active_shards", 0)
        relocating_shards = health_data.get("relocating_shards", 0)
        initializing_shards = health_data.get("initializing_shards", 0)
        unassigned_shards = health_data.get("unassigned_shards", 0)

        # Map status to numeric values for GCM
        status_map = {"green": 2, "yellow": 1, "red": 0}
        status_numeric = status_map.get(status, 0)

        resource_labels_cluster = {
            "project_id": PROJECT_ID,
            "cluster_name": cluster_name,
            "location": "global" # Or a specific region if applicable
        }

        send_metric("custom.googleapis.com/elasticsearch/cluster/health/status", status_numeric, "generic_task", resource_labels_cluster, {"status_text": status})
        send_metric("custom.googleapis.com/elasticsearch/cluster/nodes/count", num_nodes, "generic_task", resource_labels_cluster)
        send_metric("custom.googleapis.com/elasticsearch/cluster/data_nodes/count", num_data_nodes, "generic_task", resource_labels_cluster)
        send_metric("custom.googleapis.com/elasticsearch/cluster/shards/active", active_shards, "generic_task", resource_labels_cluster)
        send_metric("custom.googleapis.com/elasticsearch/cluster/shards/relocating", relocating_shards, "generic_task", resource_labels_cluster)
        send_metric("custom.googleapis.com/elasticsearch/cluster/shards/initializing", initializing_shards, "generic_task", resource_labels_cluster)
        send_metric("custom.googleapis.com/elasticsearch/cluster/shards/unassigned", unassigned_shards, "generic_task", resource_labels_cluster)

        # 2. Get Nodes Stats (simplified for one node, extend for multiple)
        nodes_stats_response = requests.get(f"{ELASTICSEARCH_HOST}/_nodes/stats?pretty")
        nodes_stats_response.raise_for_status()
        nodes_stats_data = nodes_stats_response.json()

        for node_id, node_data in nodes_stats_data.get("nodes", {}).items():
            node_name = node_data.get("name", node_id)
            host = node_data.get("host", "unknown")

            # JVM Heap
            jvm_heap_used_percent = node_data.get("jvm", {}).get("mem", {}).get("heap_used_percent", 0)
            jvm_heap_max_bytes = node_data.get("jvm", {}).get("mem", {}).get("heap_max_in_bytes", 0)

            # Filesystem
            fs_data = node_data.get("fs", {}).get("data", [{}])[0] # Assuming single data path for simplicity
            fs_available_bytes = fs_data.get("available_in_bytes", 0)
            fs_total_bytes = fs_data.get("total_in_bytes", 0)
            fs_used_percent = (fs_total_bytes - fs_available_bytes) / fs_total_bytes * 100 if fs_total_bytes else 0

            # Network
            network_total_bytes_sent = node_data.get("network", {}).get("total", {}).get("bytes_sent", 0)
            network_total_bytes_received = node_data.get("network", {}).get("total", {}).get("bytes_received", 0)

            resource_labels_node = {
                "project_id": PROJECT_ID,
                "cluster_name": cluster_name,
                "node_name": node_name,
                "node_id": node_id,
                "host": host,
                "location": "global" # Or a specific region
            }

            send_metric("custom.googleapis.com/elasticsearch/node/jvm/heap/used_percent", jvm_heap_used_percent, "generic_task", resource_labels_node)
            send_metric("custom.googleapis.com/elasticsearch/node/jvm/heap/max_bytes", jvm_heap_max_bytes, "generic_task", resource_labels_node)
            send_metric("custom.googleapis.com/elasticsearch/node/filesystem/used_percent", fs_used_percent, "generic_task", resource_labels_node)
            send_metric("custom.googleapis.com/elasticsearch/node/filesystem/total_bytes", fs_total_bytes, "generic_task", resource_labels_node)
            send_metric("custom.googleapis.com/elasticsearch/node/network/bytes_sent", network_total_bytes_sent, "generic_task", resource_labels_node)
            send_metric("custom.googleapis.com/elasticsearch/node/network/bytes_received", network_total_bytes_received, "generic_task", resource_labels_node)

    except requests.exceptions.RequestException as e:
        print(f"Error connecting to Elasticsearch: {e}")
        # Optionally send an alert metric here
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == "__main__":
    # Run the monitoring loop every 60 seconds
    while True:
        monitor_elasticsearch()
        time.sleep(60)

To run this script:

  • Install necessary libraries: pip install google-cloud-monitoring requests redis
  • Set environment variables: export ELASTICSEARCH_HOST="http://your-es-endpoint:9200" and ensure your environment is authenticated with Google Cloud (e.g., via `gcloud auth application-default login` or service account keys).
  • Run the script: python your_monitor_script.py

You can then create dashboards and alerting policies in Google Cloud Monitoring based on these custom metrics. For instance, an alert can be configured to trigger if the cluster health status remains “red” or “yellow” for more than 5 minutes, or if any node’s disk usage exceeds 90%.

Integrating with Google Cloud Operations Suite

While custom scripts provide granular control, leveraging Google Cloud’s managed services for logging and monitoring is essential for a holistic view. The Ops Agent (formerly Stackdriver agent) can collect system-level metrics and logs from your GCE instances. For Elasticsearch, consider deploying the official Elastic Cloud on Kubernetes (ECK) operator if you’re on GKE, which simplifies management and integrates well with Prometheus-based monitoring, which can then be scraped by the Ops Agent.

Ops Agent Configuration for Elasticsearch Logs

If your Elasticsearch logs are written to files (e.g., /var/log/elasticsearch/cluster_name.log), you can configure the Ops Agent to collect them. Edit the agent configuration file (typically /etc/google-cloud-ops-agent/config.yaml):

logging:
  receivers:
    elasticsearch_logs:
      type: files
      include_paths:
        - /var/log/elasticsearch/*.log # Adjust path as needed
      record_log_line: true
  processors:
    - type: parser_json
      field: 'message' # If your logs are JSON formatted
    - type: add_severity
      # Map log levels to severity
      log_level_map:
        DEBUG: DEBUG
        INFO: INFO
        WARN: WARNING
        ERROR: ERROR
        FATAL: CRITICAL
  
  # Example for structured logs
  # processors:
  #   - type: json_payload
  #     field: message
  #     key_prefix: elasticsearch_log

  # Example for unstructured logs
  # processors:
  #   - type: regex_parser
  #     regex: '^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)\s+(?P<level>\w+)\s+\[(?P<thread>.*?)\]\s+(?P<logger>.*?)\s+-\s+(?P<message>.*)$'
  #     timestamp_key: timestamp
  #     timestamp_format: '%Y-%m-%dT%H:%M:%S.%fZ'
  #     severity_key: level
  #     severity_map:
  #       DEBUG: DEBUG
  #       INFO: INFO
  #       WARN: WARNING
  #       ERROR: ERROR
  #       FATAL: CRITICAL

  # Example for adding cluster name as a label
  # processors:
  #   - type: replace_keys
  #     keys:
  #       message: elasticsearch_message
  #   - type: add_fields
  #     fields:
  #       elasticsearch_cluster_name: "my-es-cluster" # Replace with actual cluster name

  # Example for routing to a specific log name in Cloud Logging
  # log_name: "projects/YOUR_PROJECT_ID/logs/elasticsearch.log"

logs:
  - name: my-elasticsearch-logs
    receivers: [elasticsearch_logs]
    processors: [parser_json, add_severity] # Use appropriate processors
    # log_name: "projects/YOUR_PROJECT_ID/logs/elasticsearch.log" # Optional: specify log name

After updating the configuration, restart the Ops Agent: sudo systemctl restart google-cloud-ops-agent. This will stream your Elasticsearch logs to Cloud Logging, where you can analyze them, create metrics from log entries, and set up alerts.

Alerting Strategies and Best Practices

Effective alerting is crucial for proactive incident response. Combine different types of alerts:

  • Availability Alerts: Triggered when application health checks fail consistently or when Elasticsearch cluster health drops to “yellow” or “red”.
  • Performance Alerts: Based on custom metrics (e.g., high JVM heap usage, low disk space on Elasticsearch nodes, increased request latency for Python apps).
  • Error Rate Alerts: Monitor application error logs or specific HTTP status codes (e.g., 5xx errors) from load balancers or application logs.
  • Resource Saturation Alerts: High CPU, memory, or disk I/O on GCE instances hosting Python apps or Elasticsearch nodes.

When setting up alerts in Google Cloud Monitoring, ensure you define clear notification channels (e.g., PagerDuty, Slack, email) and use appropriate thresholds. Avoid alert fatigue by tuning thresholds and using composite alerts that require multiple conditions to be met before firing.

Primary Sidebar

A little about the Author

Having 9+ Years of Experience in Software Development.
Expertised in Php Development, WordPress Custom Theme Development (From scratch using underscores or Genesis Framework or using any blank theme or Premium Theme), Custom Plugin Development. Hands on Experience on 3rd Party Php Extension like Chilkat, nSoftware.

Recent Posts

  • Step-by-Step: Diagnosing thread pools deadlock during concurrent ActiveRecord transaction processing on Linode Servers
  • Securing Your E-commerce APIs: Preventing SQL Injection (SQLi) in customized checkout queries in WooCommerce Implementations
  • Disaster Recovery 101: Architecting Auto-Failovers for MySQL and Ruby Deployments on Linode
  • High-Throughput Caching Strategies: Scaling MySQL for Perl Application APIs
  • Disaster Recovery 101: Architecting Auto-Failovers for DynamoDB and Laravel Deployments on DigitalOcean

Copyright © 2026 · Vinay Vengala