How to construct high-throughput import engines for large vendor commission records sets using custom XML/JSON parsers
Designing the Core Import Logic
When dealing with large vendor commission datasets, often delivered in custom XML or JSON formats, a standard WordPress importer will quickly buckle under the load. We need a robust, high-throughput engine. This involves a multi-stage process: parsing, validation, transformation, and finally, database insertion. The key is to avoid loading the entire dataset into memory at once. We’ll leverage PHP’s built-in XML and JSON parsing capabilities, specifically focusing on stream-based parsing for efficiency.
Stream-Based XML Parsing with `XMLReader`
For XML, `XMLReader` is the go-to class. It allows us to iterate through an XML document node by node without loading the entire DOM. This is crucial for large files. We’ll define a structure for our commission records within the XML, and then write a PHP class to process it.
Consider an XML structure like this:
<?xml version="1.0" encoding="UTF-8"?>
<commissions>
<record id="1001">
<vendor_id>VNDR789</vendor_id>
<product_sku>SKU-ABC-123</product_sku>
<sale_date>2023-10-26</sale_date>
<sale_amount>150.75</sale_amount>
<commission_rate>0.10</commission_rate>
<status>completed</status>
</record>
<record id="1002">
<vendor_id>VNDR789</vendor_id>
<product_sku>SKU-XYZ-456</product_sku>
<sale_date>2023-10-26</sale_date>
<sale_amount>220.50</sale_amount>
<commission_rate>0.12</commission_rate>
<status>pending</status>
</record>
<!-- ... more records ... -->
</commissions>
Our PHP parser will look something like this:
class XMLCommissionParser {
private $filePath;
private $db; // WordPress $wpdb instance
public function __construct(string $filePath, $wpdb) {
$this->filePath = $filePath;
$this->db = $wpdb;
}
public function processRecords() {
$xmlReader = new XMLReader();
if (!$xmlReader->open($this->filePath)) {
throw new Exception("Failed to open XML file: " . $this->filePath);
}
$batchSize = 100; // Process in batches for database inserts
$recordsToInsert = [];
$recordCount = 0;
while ($xmlReader->read()) {
if ($xmlReader->nodeType == XMLReader::ELEMENT && $xmlReader->name == 'record') {
// Read the entire record element as a string
$recordXml = $xmlReader->readOuterXml();
// Load this single record into a DOMDocument for easier parsing
$dom = new DOMDocument();
if ($dom->loadXML($recordXml)) {
$recordData = $this->extractRecordData($dom);
if ($recordData) {
$recordsToInsert[] = $recordData;
$recordCount++;
if (count($recordsToInsert) >= $batchSize) {
$this->insertBatch($recordsToInsert);
$recordsToInsert = []; // Clear the batch
}
}
}
}
}
// Insert any remaining records
if (!empty($recordsToInsert)) {
$this->insertBatch($recordsToInsert);
}
$xmlReader->close();
return $recordCount;
}
private function extractRecordData(DOMDocument $dom): ?array {
$recordElement = $dom->documentElement;
if (!$recordElement) {
return null;
}
$data = [];
$data['external_id'] = $recordElement->getAttribute('id'); // Assuming 'id' is the external record ID
foreach ($recordElement->childNodes as $node) {
if ($node->nodeType == XML_ELEMENT_NODE) {
switch ($node->nodeName) {
case 'vendor_id':
$data['vendor_id'] = $node->textContent;
break;
case 'product_sku':
$data['product_sku'] = $node->textContent;
break;
case 'sale_date':
$data['sale_date'] = $node->textContent;
break;
case 'sale_amount':
$data['sale_amount'] = (float) $node->textContent;
break;
case 'commission_rate':
$data['commission_rate'] = (float) $node->textContent;
break;
case 'status':
$data['status'] = $node->textContent;
break;
}
}
}
// Basic validation
if (empty($data['external_id']) || empty($data['vendor_id']) || empty($data['sale_date'])) {
// Log error or skip record
error_log("Skipping record due to missing essential fields: " . print_r($data, true));
return null;
}
// Further data sanitization and type casting can be done here
$data['sale_date'] = date('Y-m-d H:i:s', strtotime($data['sale_date'])); // Ensure proper datetime format
return $data;
}
private function insertBatch(array $records) {
if (empty($records)) {
return;
}
$tableName = $this->db->prefix . 'commission_records'; // Assuming a custom table
$sql = "INSERT INTO {$tableName} (external_id, vendor_id, product_sku, sale_date, sale_amount, commission_rate, status, import_timestamp) VALUES ";
$values = [];
$now = current_time('mysql');
foreach ($records as $record) {
$values[] = $this->db->prepare(
"(%s, %s, %s, %s, %f, %f, %s, %s)",
$record['external_id'],
$record['vendor_id'],
$record['product_sku'] ?? '', // Handle optional fields
$record['sale_date'],
$record['sale_amount'],
$record['commission_rate'],
$record['status'],
$now
);
}
$sql .= implode(', ', $values);
// Use $this->db->query() for non-SELECT queries
$result = $this->db->query($sql);
if ($result === false) {
error_log("Database error during batch insert: " . $this->db->last_error);
// Implement retry logic or error handling
}
}
}
// Example Usage within a WordPress context (e.g., a custom AJAX handler or WP-CLI command)
/*
global $wpdb;
$xml_file_path = '/path/to/your/commission_data.xml';
$parser = new XMLCommissionParser($xml_file_path, $wpdb);
try {
$processed_count = $parser->processRecords();
echo "Successfully processed {$processed_count} records.";
} catch (Exception $e) {
echo "Error: " . $e->getMessage();
}
*/
Stream-Based JSON Parsing with `JSON_BIG_INT_AS_STRING` and Iterators
For JSON, the situation is slightly more complex as PHP’s `json_decode` typically loads the entire structure into memory. However, for very large JSON files, we can simulate stream processing by reading the file in chunks and using `JSON_BIG_INT_AS_STRING` to prevent precision loss with large numbers. A more robust approach involves custom iterators or libraries that support streaming JSON parsing.
Let’s assume a JSON structure like this:
[
{
"record_id": "REC987",
"vendor": "VNDR789",
"product": "SKU-ABC-123",
"transaction_date": "2023-10-26T10:30:00Z",
"amount": 150.75,
"rate": 0.10,
"status": "completed"
},
{
"record_id": "REC988",
"vendor": "VNDR789",
"product": "SKU-XYZ-456",
"transaction_date": "2023-10-26T11:00:00Z",
"amount": 220.50,
"rate": 0.12,
"status": "pending"
}
// ... more records
]
A basic, albeit memory-intensive for truly massive files, approach using `json_decode` with `JSON_BIG_INT_AS_STRING` and iterating over the decoded array:
class JSONCommissionParser {
private $filePath;
private $db; // WordPress $wpdb instance
public function __construct(string $filePath, $wpdb) {
$this->filePath = $filePath;
$this->db = $wpdb;
}
public function processRecords() {
$jsonString = file_get_contents($this->filePath);
if ($jsonString === false) {
throw new Exception("Failed to read JSON file: " . $this->filePath);
}
// Use JSON_BIG_INT_AS_STRING to handle potentially large IDs or amounts without precision loss
$data = json_decode($jsonString, true, 512, JSON_BIG_INT_AS_STRING);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new Exception("JSON decode error: " . json_last_error_msg());
}
if (!is_array($data)) {
throw new Exception("JSON data is not an array.");
}
$batchSize = 100;
$recordsToInsert = [];
$recordCount = 0;
foreach ($data as $record) {
$processedRecord = $this->transformRecord($record);
if ($processedRecord) {
$recordsToInsert[] = $processedRecord;
$recordCount++;
if (count($recordsToInsert) >= $batchSize) {
$this->insertBatch($recordsToInsert);
$recordsToInsert = [];
}
}
}
if (!empty($recordsToInsert)) {
$this->insertBatch($recordsToInsert);
}
return $recordCount;
}
private function transformRecord(array $rawRecord): ?array {
// Basic validation and transformation
$transformed = [];
$transformed['external_id'] = $rawRecord['record_id'] ?? null;
$transformed['vendor_id'] = $rawRecord['vendor'] ?? null;
$transformed['product_sku'] = $rawRecord['product'] ?? null;
$transformed['sale_date'] = $rawRecord['transaction_date'] ?? null;
$transformed['sale_amount'] = isset($rawRecord['amount']) ? (float) $rawRecord['amount'] : null;
$transformed['commission_rate'] = isset($rawRecord['rate']) ? (float) $rawRecord['rate'] : null;
$transformed['status'] = $rawRecord['status'] ?? null;
// Essential field validation
if (empty($transformed['external_id']) || empty($transformed['vendor_id']) || empty($transformed['sale_date'])) {
error_log("Skipping JSON record due to missing essential fields: " . print_r($rawRecord, true));
return null;
}
// Date formatting
try {
$date = new DateTime($transformed['sale_date']);
$transformed['sale_date'] = $date->format('Y-m-d H:i:s');
} catch (Exception $e) {
error_log("Invalid date format in JSON record: " . $transformed['sale_date']);
return null;
}
return $transformed;
}
private function insertBatch(array $records) {
if (empty($records)) {
return;
}
$tableName = $this->db->prefix . 'commission_records';
$sql = "INSERT INTO {$tableName} (external_id, vendor_id, product_sku, sale_date, sale_amount, commission_rate, status, import_timestamp) VALUES ";
$values = [];
$now = current_time('mysql');
foreach ($records as $record) {
$values[] = $this->db->prepare(
"(%s, %s, %s, %s, %f, %f, %s, %s)",
$record['external_id'],
$record['vendor_id'],
$record['product_sku'] ?? '',
$record['sale_date'],
$record['sale_amount'],
$record['commission_rate'],
$record['status'],
$now
);
}
$sql .= implode(', ', $values);
$result = $this->db->query($sql);
if ($result === false) {
error_log("Database error during batch insert: " . $this->db->last_error);
}
}
}
// Example Usage
/*
global $wpdb;
$json_file_path = '/path/to/your/commission_data.json';
$parser = new JSONCommissionParser($json_file_path, $wpdb);
try {
$processed_count = $parser->processRecords();
echo "Successfully processed {$processed_count} records.";
} catch (Exception $e) {
echo "Error: " . $e->getMessage();
}
*/
Advanced JSON Streaming with `JsonMachine`
For truly massive JSON files that `file_get_contents` would choke on, a true streaming parser is necessary. The `JsonMachine` library (available via Composer) is an excellent choice. It provides an iterator interface that reads the JSON file incrementally.
First, install it:
composer require mac_il/json-machine
Then, integrate it into a parser class:
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;
class StreamingJSONCommissionParser {
private $filePath;
private $db; // WordPress $wpdb instance
public function __construct(string $filePath, $wpdb) {
$this->filePath = $filePath;
$this->db = $wpdb;
}
public function processRecords() {
if (!file_exists($this->filePath)) {
throw new Exception("File not found: " . $this->filePath);
}
// Assuming the JSON is an array of objects at the top level
// Use ExtJsonDecoder for better performance and options like JSON_BIG_INT_AS_STRING
$items = Items::fromFile($this->filePath, ['decoder' => new ExtJsonDecoder(JSON_BIG_INT_AS_STRING)]);
$batchSize = 100;
$recordsToInsert = [];
$recordCount = 0;
foreach ($items as $record) {
// $record is now a PHP associative array representing one JSON object
$processedRecord = $this->transformRecord($record);
if ($processedRecord) {
$recordsToInsert[] = $processedRecord;
$recordCount++;
if (count($recordsToInsert) >= $batchSize) {
$this->insertBatch($recordsToInsert);
$recordsToInsert = [];
}
}
}
if (!empty($recordsToInsert)) {
$this->insertBatch($recordsToInsert);
}
return $recordCount;
}
// transformRecord and insertBatch methods would be identical to the JSONCommissionParser class above
// for brevity, they are omitted here but should be included.
// Ensure the transformRecord method handles potential variations in JSON keys.
private function transformRecord(array $rawRecord): ?array {
// Basic validation and transformation
$transformed = [];
$transformed['external_id'] = $rawRecord['record_id'] ?? null;
$transformed['vendor_id'] = $rawRecord['vendor'] ?? null;
$transformed['product_sku'] = $rawRecord['product'] ?? null;
$transformed['sale_date'] = $rawRecord['transaction_date'] ?? null;
$transformed['sale_amount'] = isset($rawRecord['amount']) ? (float) $rawRecord['amount'] : null;
$transformed['commission_rate'] = isset($rawRecord['rate']) ? (float) $rawRecord['rate'] : null;
$transformed['status'] = $rawRecord['status'] ?? null;
// Essential field validation
if (empty($transformed['external_id']) || empty($transformed['vendor_id']) || empty($transformed['sale_date'])) {
error_log("Skipping JSON record due to missing essential fields: " . print_r($rawRecord, true));
return null;
}
// Date formatting
try {
$date = new DateTime($transformed['sale_date']);
$transformed['sale_date'] = $date->format('Y-m-d H:i:s');
} catch (Exception $e) {
error_log("Invalid date format in JSON record: " . $transformed['sale_date']);
return null;
}
return $transformed;
}
private function insertBatch(array $records) {
if (empty($records)) {
return;
}
$tableName = $this->db->prefix . 'commission_records';
$sql = "INSERT INTO {$tableName} (external_id, vendor_id, product_sku, sale_date, sale_amount, commission_rate, status, import_timestamp) VALUES ";
$values = [];
$now = current_time('mysql');
foreach ($records as $record) {
$values[] = $this->db->prepare(
"(%s, %s, %s, %s, %f, %f, %s, %s)",
$record['external_id'],
$record['vendor_id'],
$record['product_sku'] ?? '',
$record['sale_date'],
$record['sale_amount'],
$record['commission_rate'],
$record['status'],
$now
);
}
$sql .= implode(', ', $values);
$result = $this->db->query($sql);
if ($result === false) {
error_log("Database error during batch insert: " . $this->db->last_error);
}
}
}
// Example Usage
/*
global $wpdb;
$json_file_path = '/path/to/your/large_commission_data.json';
$parser = new StreamingJSONCommissionParser($json_file_path, $wpdb);
try {
$processed_count = $parser->processRecords();
echo "Successfully processed {$processed_count} records.";
} catch (Exception $e) {
echo "Error: " . $e->getMessage();
}
*/
Database Schema and Performance Considerations
For storing commission records, a dedicated custom table is highly recommended over `wp_posts` or `wp_postmeta`. This allows for optimized indexing and querying.
CREATE TABLE wp_commission_records (
id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
external_id VARCHAR(100) NOT NULL UNIQUE, -- Assuming external ID is unique
vendor_id VARCHAR(100) NOT NULL,
product_sku VARCHAR(100) NULL,
sale_date DATETIME NOT NULL,
sale_amount DECIMAL(12, 2) NOT NULL,
commission_rate DECIMAL(5, 4) NOT NULL, -- e.g., 0.1000 for 10%
status VARCHAR(50) NOT NULL DEFAULT 'pending',
import_timestamp DATETIME NOT NULL,
PRIMARY KEY (id),
INDEX idx_vendor_date (vendor_id, sale_date), -- Useful for vendor-specific reports
INDEX idx_sale_date (sale_date) -- For time-based analysis
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Key considerations:
- `external_id` as UNIQUE: Crucial for preventing duplicate imports if the same file is processed multiple times.
- `DECIMAL` types: Use `DECIMAL` for monetary values and rates to avoid floating-point inaccuracies. Adjust precision and scale as needed.
- Indexes: Strategically add indexes for columns frequently used in `WHERE` clauses or `ORDER BY` clauses for reporting and analysis.
- `ENGINE=InnoDB`: Ensures transactional integrity.
- Batch Inserts: As demonstrated, inserting records in batches significantly reduces database overhead compared to individual inserts. Tune `$batchSize` based on your server’s capabilities and typical record size.
- Error Handling and Logging: Implement robust logging for skipped records, validation errors, and database failures. This is critical for debugging and auditing.
- Transaction Management: For critical imports, wrap batch inserts within database transactions (`$wpdb->query(‘START TRANSACTION’);`, `$wpdb->query(‘COMMIT’);`, `$wpdb->query(‘ROLLBACK’);`) to ensure atomicity. If one batch fails, the entire import can be rolled back.
Integration into WordPress
These parsers can be integrated into WordPress in several ways:
- WP-CLI Commands: Create custom WP-CLI commands for command-line execution, ideal for scheduled imports or large files that might time out via a web request.
- AJAX Handlers: For user-initiated imports via the WordPress admin, use AJAX to process files in chunks, providing progress feedback to the user. This requires careful management of execution time limits and potential AJAX timeouts.
- Background Processing Libraries: For truly robust, asynchronous imports, consider libraries like `Action Scheduler` or `WP Cron` (with careful management of execution time) to offload the parsing to background processes.
When using AJAX, you’ll need to break down the import into smaller steps, perhaps processing one file or a portion of a file per AJAX request, and storing the state (e.g., current file position, records processed) in the WordPress options or transient API.