Implementing automated compliance reporting for custom subscription log ledgers using native TCP streams
Leveraging TCP Streams for Automated Subscription Log Compliance Reporting
This document details the implementation of an automated compliance reporting system for custom subscription log ledgers within a WordPress environment. The core of this solution relies on directly streaming log data via native TCP sockets to a dedicated reporting service, bypassing traditional file-based logging and offering near real-time compliance auditing capabilities. This approach is particularly suited for scenarios requiring granular tracking of subscription lifecycle events (creation, renewal, cancellation, status changes) for regulatory or internal audit purposes.
System Architecture Overview
The system comprises two primary components:
- WordPress Plugin (Logger): This component intercepts subscription-related events within WordPress (e.g., using WooCommerce hooks or custom subscription management logic). It formats these events into a structured log entry and transmits them over a TCP stream to the reporting endpoint.
- External Reporting Service (Listener): A separate application (can be a simple script or a robust service) listening on a designated TCP port. It receives log entries, validates them, and stores them in a compliance-grade ledger (e.g., a secure database or immutable log store).
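This architecture stores compliance records outside the WordPress database and filesystem, so the ledger remains intact even if the WordPress site is compromised, restored from a backup, or taken offline. Capture still depends on WordPress: an event is transmitted only while the plugin is running and the listener is reachable, which is why the error-handling measures described later matter. TCP provides a connection-oriented channel with ordered, acknowledged delivery at the transport level, although a successful write does not by itself confirm that the listener stored the entry.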
WordPress Plugin: Subscription Event Logging
We’ll develop a WordPress plugin that hooks into relevant subscription events. For demonstration purposes, we’ll assume a hypothetical subscription management system or hooks within WooCommerce Subscriptions. The plugin will format log entries as JSON for easy parsing by the reporting service.
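To make the wire format concrete, the sketch below (written in Python to match the listener shown later) builds newline-terminated JSON entries and parses them back. The helper names encode_entry and decode_stream and the ID values are illustrative assumptions; the field names follow the plugin's status-change event.

```python
import json
from datetime import datetime, timezone


def encode_entry(entry: dict) -> bytes:
    """Serialize one log entry as a single newline-terminated JSON line."""
    # json.dumps escapes newlines inside string values, so a raw "\n" byte
    # only ever marks the end of an entry.
    return (json.dumps(entry, separators=(",", ":")) + "\n").encode("utf-8")


def decode_stream(data: bytes) -> list:
    """Split a received byte stream into entries, one per non-empty line."""
    return [json.loads(line) for line in data.split(b"\n") if line.strip()]


example = {
    "event_type": "subscription_status_change",
    "subscription_id": 1234,  # hypothetical ID
    "old_status": "active",
    "new_status": "on-hold",
    "user_id": 42,  # hypothetical ID
    "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
}

# Two entries sent back to back arrive as one byte stream with two lines.
wire = encode_entry(example) + encode_entry({**example, "new_status": "cancelled"})
entries = decode_stream(wire)
```

Because every entry occupies exactly one line, the receiver can split on newlines without knowing entry lengths in advance.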
Plugin Structure and Core Logic
Create a new plugin directory, e.g., wp-content/plugins/subscription-compliance-logger. Inside, create a main PHP file, e.g., subscription-compliance-logger.php.
subscription-compliance-logger.php
This file will contain the plugin header, configuration constants, and the core logging function.
Plugin Header and Configuration
<?php
/**
* Plugin Name: Subscription Compliance Logger
* Description: Logs subscription events to a remote TCP endpoint for compliance reporting.
* Version: 1.0.0
* Author: Antigravity
*/
// Exit if accessed directly.
if ( ! defined( 'ABSPATH' ) ) {
exit;
}
// --- Configuration Constants ---
// Define the remote logging server and port.
// IMPORTANT: Ensure this is a stable, accessible endpoint.
define( 'SCL_LOG_HOST', '127.0.0.1' ); // Or your dedicated logging server IP/hostname
define( 'SCL_LOG_PORT', 50001 ); // A non-privileged, dedicated port
// Define a timeout for the TCP connection and write operations.
define( 'SCL_LOG_TIMEOUT', 5 ); // Seconds
// Define the log entry format. JSON is recommended.
define( 'SCL_LOG_FORMAT', 'json' );
// --- Core Logging Function ---
/**
* Sends a log entry to the configured TCP endpoint.
*
* @param array $log_data The data to log.
* @return bool True on success, false on failure.
*/
function scl_send_log_to_tcp( $log_data ) {
    if ( ! defined( 'SCL_LOG_HOST' ) || ! defined( 'SCL_LOG_PORT' ) ) {
        error_log( 'Subscription Compliance Logger: Logging host/port not defined.' );
        return false;
    }
    // Add common metadata to the log entry.
    $log_data['timestamp']   = gmdate( 'Y-m-d\TH:i:s\Z' );
    $log_data['server_name'] = $_SERVER['SERVER_NAME'] ?? php_uname( 'n' ); // Web server name, or the machine hostname when unavailable (e.g., WP-CLI)
    // Format the log entry.
    $formatted_log = '';
    switch ( SCL_LOG_FORMAT ) {
        case 'json':
            $encoded = wp_json_encode( $log_data );
            if ( false === $encoded ) {
                // Encoding can fail (e.g., invalid UTF-8); do not send an empty line.
                error_log( 'Subscription Compliance Logger: Failed to JSON-encode log entry.' );
                return false;
            }
            $formatted_log = $encoded . "\n"; // Append newline for easier line-based parsing
            break;
        // Add other formats if needed (e.g., plain text, CSV)
        default:
            error_log( 'Subscription Compliance Logger: Unsupported log format: ' . SCL_LOG_FORMAT );
            return false;
    }
    // Attempt to establish a TCP connection.
    $socket = @fsockopen( SCL_LOG_HOST, SCL_LOG_PORT, $errno, $errstr, SCL_LOG_TIMEOUT );
    if ( ! $socket ) {
        // Log the error locally if the remote endpoint is unreachable.
        // In a production system, consider a more robust error handling strategy
        // (e.g., local queueing, retry mechanisms, or alerting).
        error_log( sprintf(
            'Subscription Compliance Logger: Failed to connect to %s:%d. Error: %s (%d)',
            SCL_LOG_HOST,
            SCL_LOG_PORT,
            $errstr,
            $errno
        ) );
        return false;
    }
    // Use a blocking stream with a timeout so a stalled listener cannot hang the request indefinitely.
    // Blocking mode keeps this example simple; consider non-blocking writes for high throughput.
    stream_set_blocking( $socket, true );
    stream_set_timeout( $socket, SCL_LOG_TIMEOUT );
    // Send the log data.
    $bytes_written = fwrite( $socket, $formatted_log );
    // Close the connection.
    fclose( $socket );
    if ( $bytes_written === false || $bytes_written < strlen( $formatted_log ) ) {
        error_log( sprintf(
            'Subscription Compliance Logger: Failed to write full log entry to %s:%d. Bytes written: %d/%d',
            SCL_LOG_HOST,
            SCL_LOG_PORT,
            $bytes_written,
            strlen( $formatted_log )
        ) );
        return false;
    }
    return true;
}
// --- Hook into Subscription Events ---
// This section needs to be adapted based on your specific subscription system.
// Example for WooCommerce Subscriptions:
// Hook into the 'woocommerce_subscription_status_updated' action, which passes the
// subscription together with the new and old status. Verify the hook name and
// arguments against your installed WooCommerce Subscriptions version.
add_action( 'woocommerce_subscription_status_updated', function( $subscription, $new_status, $old_status ) {
    // Collect product IDs from the subscription's line items.
    $product_ids = array();
    foreach ( $subscription->get_items() as $item ) {
        $product_ids[] = $item->get_product_id();
    }
    $log_entry = array(
        'event_type'      => 'subscription_status_change',
        'subscription_id' => $subscription->get_id(),
        'old_status'      => $old_status, // Status before the change, as passed by the hook
        'new_status'      => $new_status, // Status after the change
        'user_id'         => $subscription->get_user_id(),
        'order_id'        => $subscription->get_parent_id(), // Parent (original) order ID
        'product_ids'     => $product_ids,
    );
    scl_send_log_to_tcp( $log_entry );
}, 10, 3 );
// Example for subscription creation (e.g., after order completion for a subscription product)
// This hook might vary significantly. For WooCommerce, consider 'woocommerce_checkout_order_processed'.
add_action( 'woocommerce_checkout_order_processed', function( $order_id ) {
    $order = wc_get_order( $order_id );
    if ( ! $order ) {
        return;
    }
    // Check whether the order contains subscription products.
    $has_subscription   = false;
    $subscription_ids   = array();
    $subscription_types = array( 'subscription', 'variable-subscription', 'subscription_variation' );
    foreach ( $order->get_items() as $item_id => $item ) {
        // Order items do not expose a product type directly; load the product first.
        $product = $item->get_product();
        if ( $product && $product->is_type( $subscription_types ) ) {
            $has_subscription = true;
            break;
        }
    }
    // The loop above only detects subscription products. It is a placeholder:
    // a more robust solution would find the WC_Subscription object(s) linked to this order.
    // Example: $subscriptions = wcs_get_subscriptions_for_order( $order_id );
    // foreach ( $subscriptions as $subscription ) { $subscription_ids[] = $subscription->get_id(); }
    if ( $has_subscription ) {
        $log_entry = array(
            'event_type'  => 'subscription_created',
            'order_id'    => $order_id,
            'user_id'     => $order->get_user_id(),
            'customer_ip' => $order->get_customer_ip_address(),
            // 'subscription_ids' => $subscription_ids, // Add if you can reliably fetch them
        );
        scl_send_log_to_tcp( $log_entry );
    }
}, 10, 1 );
// Add more hooks for renewal, cancellation, payment failures, etc.
// For cancellations, you might hook into 'woocommerce_subscription_status_cancelled'
// (the status-specific action; verify it against your WooCommerce Subscriptions version).
add_action( 'woocommerce_subscription_status_cancelled', function( $subscription ) {
    $log_entry = array(
        'event_type'      => 'subscription_cancelled',
        'subscription_id' => $subscription->get_id(),
        'user_id'         => $subscription->get_user_id(),
        'order_id'        => $subscription->get_parent_id(), // Parent (original) order ID
    );
    scl_send_log_to_tcp( $log_entry );
}, 10, 1 );
// For renewals, you might hook into 'woocommerce_subscription_renewal_payment_complete'.
add_action( 'woocommerce_subscription_renewal_payment_complete', function( $subscription, $last_order ) {
    $log_entry = array(
        'event_type'       => 'subscription_renewal_successful',
        'subscription_id'  => $subscription->get_id(),
        'user_id'          => $subscription->get_user_id(),
        'order_id'         => $subscription->get_parent_id(), // The original (parent) order ID
        'renewal_order_id' => is_object( $last_order ) ? $last_order->get_id() : null, // The renewal order that was just paid
    );
    scl_send_log_to_tcp( $log_entry );
}, 10, 2 );
?>
Error Handling and Resilience
The provided scl_send_log_to_tcp function includes basic error handling for connection failures and write errors. In a production environment, consider these enhancements:
- Local Queuing: If the TCP connection fails, store the log entry in a dedicated database table. Transients are less suitable for compliance data because they can expire or be evicted from an object cache. A background process (e.g., a WP-Cron job or a separate PHP script) can then attempt to resend queued logs.
- Retry Mechanism: Implement exponential backoff for retrying failed connections.
- Alerting: Integrate with an alerting system (e.g., Slack, email) to notify administrators of persistent logging failures.
- Buffering: For high-volume sites, consider buffering log entries locally and sending them in batches to reduce the overhead of establishing TCP connections for every event.
- Security: If the logging endpoint is external, encrypt the TCP stream with TLS (for example, by connecting through PHP's tls:// stream transport with fsockopen or stream_socket_client) and add an authentication mechanism.
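The retry and local-queuing ideas above can be sketched as follows. The example is in Python to match the listener; the plugin would need equivalent logic in PHP. The function names, the delay schedule, and the queue file path are assumptions made for illustration, not part of the plugin.

```python
import socket
import time


def backoff_delays(base=0.5, factor=2.0, retries=3):
    """Return the wait before each retry: base, base*factor, base*factor**2, ..."""
    return [base * factor ** attempt for attempt in range(retries)]


def send_with_retry(line, host, port, queue_path, timeout=5.0, sleep=time.sleep):
    """Deliver one newline-terminated entry, retrying with exponential backoff.

    Returns True once the entry is sent. If every attempt fails, the entry is
    appended to queue_path for a later resend and False is returned.
    """
    for delay in [0.0] + backoff_delays():
        if delay:
            sleep(delay)  # wait before a retry; the first attempt is immediate
        try:
            with socket.create_connection((host, port), timeout=timeout) as conn:
                conn.sendall(line)
            return True
        except OSError:
            continue  # refused, timed out, or reset: try the next attempt
    # All attempts failed: keep the entry locally instead of dropping it.
    with open(queue_path, "ab") as queue:
        queue.write(line)
    return False
```

Inside a web request the total wait should stay small, since every retry delays the page that triggered the event. A background job that drains the queue file is usually a better place for long backoff schedules.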
External Reporting Service: TCP Listener
This service will listen on the configured TCP port, receive the JSON log entries, and process them. Python is an excellent choice for this due to its robust networking libraries.
Python TCP Listener Script
Create a Python script, e.g., compliance_listener.py.
compliance_listener.py
import socket
import json
import sys
import os
import datetime
import logging
from logging.handlers import RotatingFileHandler
# --- Configuration ---
LISTEN_HOST = '127.0.0.1' # Listen on localhost, or '0.0.0.0' to listen on all interfaces
LISTEN_PORT = 50001 # Must match SCL_LOG_PORT in WordPress plugin
LOG_FILE_PATH = '/var/log/subscription_compliance.log' # Ensure this directory is writable by the script user
MAX_LOG_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
MAX_LOG_FILES = 5
# --- Logging Setup ---
# Setup basic logging for the listener itself
listener_logger = logging.getLogger('ComplianceListener')
listener_logger.setLevel(logging.INFO)
# Use RotatingFileHandler for the listener's own logs
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = RotatingFileHandler(
    LOG_FILE_PATH,
    maxBytes=MAX_LOG_FILE_SIZE,
    backupCount=MAX_LOG_FILES
)
file_handler.setFormatter(log_formatter)
listener_logger.addHandler(file_handler)
# Also log to console for immediate feedback during development/debugging
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(log_formatter)
listener_logger.addHandler(console_handler)
# --- Compliance Ledger Storage ---
# This is a placeholder. In production, use a secure, auditable storage.
# Examples: PostgreSQL, MySQL, dedicated log management systems (ELK, Splunk), or immutable ledger technologies.
COMPLIANCE_LEDGER_FILE = '/var/log/subscription_ledger.jsonl' # JSON Lines format
def initialize_ledger():
    """Ensures the ledger file exists."""
    if not os.path.exists(COMPLIANCE_LEDGER_FILE):
        try:
            with open(COMPLIANCE_LEDGER_FILE, 'w') as f:
                pass  # Create empty file
            listener_logger.info(f"Initialized compliance ledger file: {COMPLIANCE_LEDGER_FILE}")
        except IOError as e:
            listener_logger.error(f"Failed to initialize ledger file {COMPLIANCE_LEDGER_FILE}: {e}")
            sys.exit(1)
def store_log_entry(log_data):
    """Stores a single log entry to the compliance ledger.
    Using JSON Lines format for easy append and parsing.
    """
    try:
        with open(COMPLIANCE_LEDGER_FILE, 'a') as f:
            json.dump(log_data, f)
            f.write('\n')  # Each entry on a new line
        return True
    except IOError as e:
        listener_logger.error(f"Failed to write to ledger file {COMPLIANCE_LEDGER_FILE}: {e}")
        return False
    except Exception as e:
        listener_logger.error(f"An unexpected error occurred during ledger storage: {e}")
        return False
def process_log_entry(log_json_string):
    """Parses, validates, and stores a single log entry."""
    try:
        log_entry = json.loads(log_json_string)
        if not isinstance(log_entry, dict):
            listener_logger.error(f"Discarding log entry that is not a JSON object: {log_json_string.strip()}")
            return False
        # --- Data Validation and Enrichment (Crucial for Compliance) ---
        validation_errors = []
        # Basic validation: check for essential fields
        required_fields = ['event_type', 'timestamp']
        if not all(log_entry.get(field) for field in required_fields):
            listener_logger.warning(f"Received log entry missing required fields: {log_json_string.strip()}")
            # For compliance, it's often better to store incomplete data with a flag
            # than to discard it, so the entry is kept and marked below.
            validation_errors.append('Missing required fields')
        # Timestamp validation (optional but recommended); skipped when the field is absent
        timestamp = log_entry.get('timestamp')
        if timestamp:
            try:
                datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
            except (TypeError, ValueError):
                listener_logger.warning(f"Received log entry with invalid timestamp format: {timestamp}")
                validation_errors.append('Invalid timestamp format')
        if validation_errors:
            log_entry['validation_error'] = '; '.join(validation_errors)
        # Add server-side timestamp for when the log was received
        log_entry['received_at'] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        # --- Store the validated/enriched log entry ---
        if store_log_entry(log_entry):
            listener_logger.debug(f"Successfully logged: {log_entry.get('event_type', 'N/A')} for ID {log_entry.get('subscription_id', log_entry.get('order_id', 'N/A'))}")
            return True
        else:
            listener_logger.error(f"Failed to store log entry: {log_entry}")
            return False
    except json.JSONDecodeError:
        listener_logger.error(f"Failed to decode JSON log entry: {log_json_string.strip()}")
        return False
    except Exception as e:
        listener_logger.error(f"An unexpected error occurred processing log entry: {e} - Data: {log_json_string.strip()}")
        return False
def start_listener():
    """Starts the TCP listener."""
    try:
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Allow address reuse
        server_socket.bind((LISTEN_HOST, LISTEN_PORT))
        server_socket.listen(5)  # Max backlog of connections
        listener_logger.info(f"TCP Compliance Listener started on {LISTEN_HOST}:{LISTEN_PORT}")
        while True:
            client_socket, addr = server_socket.accept()
            listener_logger.info(f"Connection accepted from {addr}")
            # Handle one client at a time, expecting line-delimited JSON.
            # Buffer raw bytes so that a multi-byte UTF-8 character split across
            # two recv() calls is not corrupted by decoding each chunk separately.
            buffer = b""
            try:
                while True:
                    # Receive data in chunks. Adjust chunk size as needed.
                    chunk = client_socket.recv(4096)
                    if not chunk:
                        break  # Connection closed by client
                    buffer += chunk
                    # Process complete lines (log entries ending with newline)
                    while b'\n' in buffer:
                        line, buffer = buffer.split(b'\n', 1)
                        if line.strip():  # Skip empty lines
                            process_log_entry(line.decode('utf-8', errors='replace'))
                # Process any remaining data in the buffer after connection closes
                if buffer.strip():
                    process_log_entry(buffer.decode('utf-8', errors='replace'))
            except socket.error as e:
                listener_logger.error(f"Socket error with client {addr}: {e}")
            except Exception as e:
                listener_logger.error(f"Error handling client {addr}: {e}")
            finally:
                client_socket.close()
                listener_logger.info(f"Connection closed from {addr}")
    except socket.error as e:
        listener_logger.critical(f"Failed to start listener on {LISTEN_HOST}:{LISTEN_PORT}: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        listener_logger.info("Listener shutting down.")
        sys.exit(0)
    finally:
        if 'server_socket' in locals() and server_socket:
            server_socket.close()
if __name__ == "__main__":
    initialize_ledger()
    start_listener()
Running the Listener
To run the Python listener, ensure you have Python 3 installed. You can execute it from your terminal:
python3 compliance_listener.py
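Once the listener is running, a quick way to confirm it accepts entries is to send one synthetic event from a second terminal. The following is a minimal sketch: send_test_event and the listener_smoke_test event type are hypothetical names chosen for this example, and the default host and port mirror the configuration shown earlier.

```python
import json
import socket
from datetime import datetime, timezone


def send_test_event(host: str = "127.0.0.1", port: int = 50001, timeout: float = 5.0) -> int:
    """Send one synthetic entry to the listener and return the bytes sent."""
    entry = {
        "event_type": "listener_smoke_test",  # hypothetical event type
        "subscription_id": 0,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    payload = (json.dumps(entry) + "\n").encode("utf-8")
    # Open a connection, send the single line, and close, as the plugin does.
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(payload)
    return len(payload)
```

Calling send_test_event() from a Python shell should add one line to the ledger file with a received_at field appended by the listener. A ConnectionRefusedError indicates the listener is not running or is bound to a different address.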
For production deployment, consider running this script as a system service using tools like systemd on Linux to ensure it restarts automatically if it crashes or the server reboots.
systemd Service File Example
Create a file named /etc/systemd/system/compliance-listener.service:
[Unit]
Description=Subscription Compliance TCP Listener
After=network.target

[Service]
# Run as www-data or, preferably, a dedicated user and group for the listener.
# This user needs write access to the log and ledger paths configured in the script.
# Comments must sit on their own lines: systemd does not support trailing comments.
User=www-data
Group=www-data
WorkingDirectory=/path/to/your/python/script/directory
ExecStart=/usr/bin/python3 /path/to/your/python/script/directory/compliance_listener.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=compliance-listener

[Install]
WantedBy=multi-user.target
Then, enable and start the service:
sudo systemctl daemon-reload
sudo systemctl enable compliance-listener.service
sudo systemctl start compliance-listener.service
sudo systemctl status compliance-listener.service
Compliance Ledger Management and Auditing
The Python script stores logs in JSON Lines format. This is human-readable and easily parsable by other tools. For robust compliance, consider:
- Database Integration: Instead of a file, the Python script can connect to a database (PostgreSQL, MySQL) and insert log entries. This provides better querying capabilities and data integrity.
- Immutable Storage: For high-assurance compliance, explore solutions that offer append-only, tamper-evident storage.
- Data Retention Policies: Implement automated processes to archive or delete logs according to your organization’s data retention policies.
- Access Control: Ensure that the ledger data is protected by strict access controls.
- Regular Audits: Periodically audit the log data to verify its completeness and accuracy. The Python script’s logging of validation errors can be a starting point for such audits.
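One way to approach the tamper-evident storage and audit points above is a hash chain, where each ledger entry records the hash of the entry before it. The sketch below is an illustration under stated assumptions, not part of the listener: chain_entry, verify_chain, the prev_hash and entry_hash field names, and the all-zero genesis value are all hypothetical choices.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed "previous hash" for the first entry


def chain_entry(entry: dict, prev_hash: str) -> dict:
    """Return a copy of entry carrying prev_hash and its own entry_hash."""
    body = {**entry, "prev_hash": prev_hash}
    # Sorted keys and fixed separators give a stable byte representation.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return {**body, "entry_hash": digest}


def verify_chain(entries: list) -> int:
    """Return the index of the first broken entry, or -1 if the chain is intact."""
    prev_hash = GENESIS
    for index, stored in enumerate(entries):
        payload = {k: v for k, v in stored.items()
                   if k not in ("prev_hash", "entry_hash")}
        # Recompute from the payload and the expected previous hash; any edit,
        # removal, or reordering makes the result differ from what is stored.
        if chain_entry(payload, prev_hash) != stored:
            return index
        prev_hash = stored["entry_hash"]
    return -1
```

An audit job could read the JSON Lines ledger, pass the parsed entries to verify_chain, and report the returned index. A hash chain only makes tampering detectable; truncation of the newest entries still goes unnoticed unless the latest hash is also recorded somewhere outside the ledger.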
Security Considerations
Direct TCP communication, especially over untrusted networks, requires careful security considerations:
- Network Segmentation: Ensure the WordPress server and the logging listener are on a secure, isolated network segment.
- Firewall Rules: Configure firewalls to allow traffic only from the WordPress server to the listener’s port and vice-versa if needed.
- TLS/SSL Encryption: For sensitive data or communication over public networks, encrypt the TCP stream. This can be achieved with the ssl module in Python for the listener and PHP's tls:// stream transport (used with fsockopen or stream_socket_client) for the plugin. This adds complexity but is essential for secure data transfer.
- Authentication: Implement a simple shared secret or token-based authentication mechanism between the plugin and the listener to verify the source of the logs.
- Listener Security: The machine running the listener should be hardened, with minimal unnecessary services and regular security updates.
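The shared-secret authentication mentioned above could take the form of an HMAC signature attached to every entry. The following is a minimal sketch under assumptions: the envelope layout with payload and signature fields and the function names sign_line and verify_line are invented for this example. On the plugin side, PHP's hash_hmac function with the sha256 algorithm would produce a matching signature.

```python
import hashlib
import hmac
import json


def sign_line(entry: dict, secret: bytes) -> str:
    """Wrap an entry in an envelope holding its JSON text and an HMAC-SHA256."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":"))
    signature = hmac.new(secret, payload.encode("utf-8"), hashlib.sha256).hexdigest()
    return json.dumps({"payload": payload, "signature": signature})


def verify_line(line: str, secret: bytes):
    """Return the decoded entry if the signature is valid, otherwise None."""
    try:
        envelope = json.loads(line)
        payload = envelope["payload"]
        signature = envelope["signature"]
    except (ValueError, KeyError, TypeError):
        return None
    if not isinstance(payload, str) or not isinstance(signature, str):
        return None
    expected = hmac.new(secret, payload.encode("utf-8"), hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of a forged signature were correct.
    if not hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8")):
        return None
    return json.loads(payload)
```

The signature is computed over the exact payload string, so the listener never has to reproduce the sender's JSON formatting. This scheme proves that the sender holds the secret but does not stop an attacker from replaying a captured line; a per-entry nonce or a check on timestamp freshness would be needed for that.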
Conclusion
Implementing automated compliance reporting via native TCP streams offers a powerful, low-latency solution for tracking critical subscription events. By carefully designing the WordPress plugin and the external listener, and by prioritizing security and robust error handling, you can build a reliable system that meets stringent auditing requirements.