Server Monitoring Best Practices: Keeping Your Python App and Elasticsearch Clusters Alive on Google Cloud
Proactive Health Checks for Python Applications on GCE
Maintaining the health of Python applications deployed on Google Compute Engine (GCE) requires a multi-layered approach, starting with robust application-level health checks. These checks should go beyond simple port availability and delve into the application’s internal state. For a typical web application using Flask or Django, this means exposing an endpoint that verifies database connectivity, cache status, and essential service dependencies.
We’ll implement a simple Flask-based health check endpoint. This endpoint will query a mock database and a cache service. In a real-world scenario, you’d replace these with actual database connection checks and cache client pings.
Flask Health Check Endpoint
Create a file named healthcheck.py:
from flask import Flask, jsonify
import redis # Assuming Redis for caching
app = Flask(__name__)
# Mock database connection function
def check_database_connection():
# In a real app, this would be a DB query or connection pool check
# For demonstration, we'll just return True
return True
# Mock cache connection function
def check_cache_connection():
try:
r = redis.Redis(host='your-redis-host', port=6379, db=0, socket_timeout=1)
r.ping()
return True
except redis.exceptions.ConnectionError:
return False
@app.route('/healthz', methods=['GET'])
def health_check():
db_ok = check_database_connection()
cache_ok = check_cache_connection()
if db_ok and cache_ok:
return jsonify({"status": "ok", "database": "ok", "cache": "ok"}), 200
else:
response = {"status": "degraded", "database": "ok" if db_ok else "failed", "cache": "ok" if cache_ok else "failed"}
return jsonify(response), 503 # Service Unavailable
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
To integrate this with GCE’s load balancing and instance health checks, you’ll configure a health check in the Google Cloud Console or via gcloud. This health check will target the /healthz endpoint on your application’s port (e.g., 5000).
GCE Health Check Configuration (gcloud CLI)
Assuming your application runs on port 5000 and you have an instance group named my-python-app-ig:
gcloud compute health-checks create http my-python-app-health-check \
--request-path="/healthz" \
--port=5000 \
--check-interval=30s \
--timeout=5s \
--unhealthy-threshold=3 \
--healthy-threshold=2
gcloud compute instance-groups managed \
update-health-check my-python-app-ig \
--health-check=my-python-app-health-check \
--zone=us-central1-a # Replace with your zone
This setup ensures that GCE’s load balancer will stop sending traffic to instances that fail the application-level health check, preventing users from hitting unhealthy application instances.
Monitoring Elasticsearch Clusters on Google Cloud
Elasticsearch clusters, especially when managed on GCE or GKE, require dedicated monitoring strategies. Key metrics to track include cluster health (green, yellow, red), node status, JVM heap usage, disk I/O, network traffic, and query latency. Google Cloud’s operations suite (formerly Stackdriver) provides excellent tools for this, but direct integration with Elasticsearch’s APIs is crucial for granular insights.
Leveraging Elasticsearch APIs for Monitoring
The Elasticsearch Cluster Health API (_cluster/health) is fundamental. It provides a snapshot of the cluster’s status, number of nodes, shards, and pending tasks. The Nodes Stats API (_nodes/stats) offers detailed performance metrics for each node, including JVM, filesystem, and network statistics.
# Get cluster health curl -X GET "http://localhost:9200/_cluster/health?pretty" # Get nodes stats curl -X GET "http://localhost:9200/_nodes/stats?pretty"
For automated monitoring, we can use a Python script to periodically query these APIs and send the data to Google Cloud Monitoring. This script can be run as a cron job on a dedicated monitoring instance or as a sidecar container in a GKE deployment.
Python Script for Elasticsearch Monitoring Data Ingestion
This script fetches cluster health and node stats and sends them as custom metrics to Google Cloud Monitoring.
import google.auth
from google.cloud import monitoring_v3
import requests
import time
import json
import os
# --- Configuration ---
ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "http://localhost:9200")
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
if not PROJECT_ID:
_, PROJECT_ID = google.auth.default()
# --- Google Cloud Monitoring Client ---
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{PROJECT_ID}"
# --- Helper function to send metrics ---
def send_metric(metric_type, value, resource_type, resource_labels, metric_labels=None):
if metric_labels is None:
metric_labels = {}
series = monitoring_v3.TimeSeries()
series.metric.type = metric_type
series.resource.type = resource_type
series.resource.labels.update(resource_labels)
series.metric.labels.update(metric_labels)
point = monitoring_v3.Point()
point.value.double_value = float(value)
now = time.time()
point.interval.end_time.seconds = int(now)
point.interval.end_time.nanos = int((now - point.interval.end_time.seconds) * 10**9)
series.points = [point]
try:
client.create_time_series(name=project_name, time_series=[series])
print(f"Sent metric: {metric_type} = {value}")
except Exception as e:
print(f"Error sending metric {metric_type}: {e}")
# --- Main monitoring loop ---
def monitor_elasticsearch():
try:
# 1. Get Cluster Health
health_response = requests.get(f"{ELASTICSEARCH_HOST}/_cluster/health?pretty")
health_response.raise_for_status() # Raise an exception for bad status codes
health_data = health_response.json()
cluster_name = health_data.get("cluster_name", "unknown")
status = health_data.get("status", "red") # Default to red if status is missing
num_nodes = health_data.get("number_of_nodes", 0)
num_data_nodes = health_data.get("number_of_data_nodes", 0)
active_shards = health_data.get("active_shards", 0)
relocating_shards = health_data.get("relocating_shards", 0)
initializing_shards = health_data.get("initializing_shards", 0)
unassigned_shards = health_data.get("unassigned_shards", 0)
# Map status to numeric values for GCM
status_map = {"green": 2, "yellow": 1, "red": 0}
status_numeric = status_map.get(status, 0)
resource_labels_cluster = {
"project_id": PROJECT_ID,
"cluster_name": cluster_name,
"location": "global" # Or a specific region if applicable
}
send_metric("custom.googleapis.com/elasticsearch/cluster/health/status", status_numeric, "generic_task", resource_labels_cluster, {"status_text": status})
send_metric("custom.googleapis.com/elasticsearch/cluster/nodes/count", num_nodes, "generic_task", resource_labels_cluster)
send_metric("custom.googleapis.com/elasticsearch/cluster/data_nodes/count", num_data_nodes, "generic_task", resource_labels_cluster)
send_metric("custom.googleapis.com/elasticsearch/cluster/shards/active", active_shards, "generic_task", resource_labels_cluster)
send_metric("custom.googleapis.com/elasticsearch/cluster/shards/relocating", relocating_shards, "generic_task", resource_labels_cluster)
send_metric("custom.googleapis.com/elasticsearch/cluster/shards/initializing", initializing_shards, "generic_task", resource_labels_cluster)
send_metric("custom.googleapis.com/elasticsearch/cluster/shards/unassigned", unassigned_shards, "generic_task", resource_labels_cluster)
# 2. Get Nodes Stats (simplified for one node, extend for multiple)
nodes_stats_response = requests.get(f"{ELASTICSEARCH_HOST}/_nodes/stats?pretty")
nodes_stats_response.raise_for_status()
nodes_stats_data = nodes_stats_response.json()
for node_id, node_data in nodes_stats_data.get("nodes", {}).items():
node_name = node_data.get("name", node_id)
host = node_data.get("host", "unknown")
# JVM Heap
jvm_heap_used_percent = node_data.get("jvm", {}).get("mem", {}).get("heap_used_percent", 0)
jvm_heap_max_bytes = node_data.get("jvm", {}).get("mem", {}).get("heap_max_in_bytes", 0)
# Filesystem
fs_data = node_data.get("fs", {}).get("data", [{}])[0] # Assuming single data path for simplicity
fs_available_bytes = fs_data.get("available_in_bytes", 0)
fs_total_bytes = fs_data.get("total_in_bytes", 0)
fs_used_percent = (fs_total_bytes - fs_available_bytes) / fs_total_bytes * 100 if fs_total_bytes else 0
# Network
network_total_bytes_sent = node_data.get("network", {}).get("total", {}).get("bytes_sent", 0)
network_total_bytes_received = node_data.get("network", {}).get("total", {}).get("bytes_received", 0)
resource_labels_node = {
"project_id": PROJECT_ID,
"cluster_name": cluster_name,
"node_name": node_name,
"node_id": node_id,
"host": host,
"location": "global" # Or a specific region
}
send_metric("custom.googleapis.com/elasticsearch/node/jvm/heap/used_percent", jvm_heap_used_percent, "generic_task", resource_labels_node)
send_metric("custom.googleapis.com/elasticsearch/node/jvm/heap/max_bytes", jvm_heap_max_bytes, "generic_task", resource_labels_node)
send_metric("custom.googleapis.com/elasticsearch/node/filesystem/used_percent", fs_used_percent, "generic_task", resource_labels_node)
send_metric("custom.googleapis.com/elasticsearch/node/filesystem/total_bytes", fs_total_bytes, "generic_task", resource_labels_node)
send_metric("custom.googleapis.com/elasticsearch/node/network/bytes_sent", network_total_bytes_sent, "generic_task", resource_labels_node)
send_metric("custom.googleapis.com/elasticsearch/node/network/bytes_received", network_total_bytes_received, "generic_task", resource_labels_node)
except requests.exceptions.RequestException as e:
print(f"Error connecting to Elasticsearch: {e}")
# Optionally send an alert metric here
except Exception as e:
print(f"An unexpected error occurred: {e}")
if __name__ == "__main__":
# Run the monitoring loop every 60 seconds
while True:
monitor_elasticsearch()
time.sleep(60)
To run this script:
- Install necessary libraries:
pip install google-cloud-monitoring requests redis - Set environment variables:
export ELASTICSEARCH_HOST="http://your-es-endpoint:9200"and ensure your environment is authenticated with Google Cloud (e.g., via `gcloud auth application-default login` or service account keys). - Run the script:
python your_monitor_script.py
You can then create dashboards and alerting policies in Google Cloud Monitoring based on these custom metrics. For instance, an alert can be configured to trigger if the cluster health status remains “red” or “yellow” for more than 5 minutes, or if any node’s disk usage exceeds 90%.
Integrating with Google Cloud Operations Suite
While custom scripts provide granular control, leveraging Google Cloud’s managed services for logging and monitoring is essential for a holistic view. The Ops Agent (formerly Stackdriver agent) can collect system-level metrics and logs from your GCE instances. For Elasticsearch, consider deploying the official Elastic Cloud on Kubernetes (ECK) operator if you’re on GKE, which simplifies management and integrates well with Prometheus-based monitoring, which can then be scraped by the Ops Agent.
Ops Agent Configuration for Elasticsearch Logs
If your Elasticsearch logs are written to files (e.g., /var/log/elasticsearch/cluster_name.log), you can configure the Ops Agent to collect them. Edit the agent configuration file (typically /etc/google-cloud-ops-agent/config.yaml):
logging:
receivers:
elasticsearch_logs:
type: files
include_paths:
- /var/log/elasticsearch/*.log # Adjust path as needed
record_log_line: true
processors:
- type: parser_json
field: 'message' # If your logs are JSON formatted
- type: add_severity
# Map log levels to severity
log_level_map:
DEBUG: DEBUG
INFO: INFO
WARN: WARNING
ERROR: ERROR
FATAL: CRITICAL
# Example for structured logs
# processors:
# - type: json_payload
# field: message
# key_prefix: elasticsearch_log
# Example for unstructured logs
# processors:
# - type: regex_parser
# regex: '^(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)\s+(?P<level>\w+)\s+\[(?P<thread>.*?)\]\s+(?P<logger>.*?)\s+-\s+(?P<message>.*)$'
# timestamp_key: timestamp
# timestamp_format: '%Y-%m-%dT%H:%M:%S.%fZ'
# severity_key: level
# severity_map:
# DEBUG: DEBUG
# INFO: INFO
# WARN: WARNING
# ERROR: ERROR
# FATAL: CRITICAL
# Example for adding cluster name as a label
# processors:
# - type: replace_keys
# keys:
# message: elasticsearch_message
# - type: add_fields
# fields:
# elasticsearch_cluster_name: "my-es-cluster" # Replace with actual cluster name
# Example for routing to a specific log name in Cloud Logging
# log_name: "projects/YOUR_PROJECT_ID/logs/elasticsearch.log"
logs:
- name: my-elasticsearch-logs
receivers: [elasticsearch_logs]
processors: [parser_json, add_severity] # Use appropriate processors
# log_name: "projects/YOUR_PROJECT_ID/logs/elasticsearch.log" # Optional: specify log name
After updating the configuration, restart the Ops Agent: sudo systemctl restart google-cloud-ops-agent. This will stream your Elasticsearch logs to Cloud Logging, where you can analyze them, create metrics from log entries, and set up alerts.
Alerting Strategies and Best Practices
Effective alerting is crucial for proactive incident response. Combine different types of alerts:
- Availability Alerts: Triggered when application health checks fail consistently or when Elasticsearch cluster health drops to “yellow” or “red”.
- Performance Alerts: Based on custom metrics (e.g., high JVM heap usage, low disk space on Elasticsearch nodes, increased request latency for Python apps).
- Error Rate Alerts: Monitor application error logs or specific HTTP status codes (e.g., 5xx errors) from load balancers or application logs.
- Resource Saturation Alerts: High CPU, memory, or disk I/O on GCE instances hosting Python apps or Elasticsearch nodes.
When setting up alerts in Google Cloud Monitoring, ensure you define clear notification channels (e.g., PagerDuty, Slack, email) and use appropriate thresholds. Avoid alert fatigue by tuning thresholds and using composite alerts that require multiple conditions to be met before firing.