Batch-process Parquet files with CPU/memory resource monitoring
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import json
from datetime import datetime
import io
import psutil
import time
import threading
from collections import deque
import os
Helper class for sampling this process's resource usage during processing
class ResourceMonitor:
    """Sample this process's CPU and memory usage on a background thread.

    Samples are kept in bounded deques (last 1000 readings) so a long run
    cannot grow memory without limit.
    """

    def __init__(self):
        # Bounded sample buffers: readings older than the last 1000 are dropped.
        self.cpu_samples = deque(maxlen=1000)
        self.memory_samples = deque(maxlen=1000)
        self.monitoring = False
        self.monitor_thread = None
        # Handle to the current process so readings cover this program only.
        self.process = psutil.Process(os.getpid())

    def start_monitoring(self):
        """Start sampling in a background thread (~5 samples/second)."""
        # Drop readings from any previous run so the next stats summary
        # reflects this run only (previously stale samples were kept).
        self.cpu_samples.clear()
        self.memory_samples.clear()
        self.monitoring = True
        # daemon=True so an abnormal main-thread exit cannot leave the
        # process hanging on a live monitor thread.
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources, daemon=True
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop sampling and return summary statistics.

        Returns:
            dict with 'cpu_percent' (avg/max/min, percent), 'memory_mb'
            (avg_mb/max_mb/min_mb, RSS in MB) and 'samples_collected',
            or an empty dict if no samples were collected.
        """
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        if not self.cpu_samples or not self.memory_samples:
            return {}
        cpu_stats = {
            'avg': sum(self.cpu_samples) / len(self.cpu_samples),
            'max': max(self.cpu_samples),
            'min': min(self.cpu_samples),
        }
        memory_stats = {
            'avg_mb': sum(self.memory_samples) / len(self.memory_samples),
            'max_mb': max(self.memory_samples),
            'min_mb': min(self.memory_samples),
        }
        return {
            'cpu_percent': cpu_stats,
            'memory_mb': memory_stats,
            'samples_collected': len(self.cpu_samples),
        }

    def _monitor_resources(self):
        """Background loop: record CPU%% and RSS until stop is requested."""
        while self.monitoring:
            # cpu_percent(interval=0.1) blocks ~100ms while measuring.
            cpu_percent = self.process.cpu_percent(interval=0.1)
            self.cpu_samples.append(cpu_percent)
            # Resident set size, converted from bytes to MB.
            memory_info = self.process.memory_info()
            self.memory_samples.append(memory_info.rss / 1024 / 1024)
            time.sleep(0.1)  # throttle: sample roughly every 200ms total
def get_system_info():
    """Return a snapshot of host CPU counts and memory sizes (in GB)."""
    vm = psutil.virtual_memory()
    gb = 1024 ** 3  # bytes per gigabyte
    return {
        'cpu_count': psutil.cpu_count(),
        'cpu_count_logical': psutil.cpu_count(logical=True),
        'total_memory_gb': vm.total / gb,
        'available_memory_gb': vm.available / gb,
    }
Read the Parquet file in batches and report system metrics
def batchread_prq(parquet_file, batch_size=2, monitor=None):
    """Read a Parquet file in batches and print per-batch details.

    Iterating batches keeps memory bounded regardless of file size.

    Args:
        parquet_file: path to the Parquet file. Assumed to contain 'id'
            and 'attrs' columns, where 'attrs' holds JSON strings with
            'user.name' and 'metadata.source' keys — TODO confirm schema
            against the file producer.
        batch_size: number of rows per batch.
        monitor: optional ResourceMonitor; when given, CPU/memory usage
            is sampled during the read and summarized afterwards.
    """
    print(f"\nReading Parquet file in batches of {batch_size}:")
    start_time = time.time()
    if monitor:
        monitor.start_monitoring()
    try:
        parquet_file_obj = pq.ParquetFile(parquet_file)
        batch_num = 0
        total_rows_processed = 0
        for batch in parquet_file_obj.iter_batches(batch_size=batch_size):
            batch_num += 1
            df_batch = batch.to_pandas()
            total_rows_processed += len(df_batch)
            print(f"\nBatch {batch_num}:")
            print(f" Rows: {len(df_batch)}")
            print(f" IDs: {df_batch['id'].tolist()}")
            # Decode the JSON payload carried in each row's 'attrs' column.
            for idx, row in df_batch.iterrows():
                json_data = json.loads(row['attrs'])
                print(f" - {json_data['user']['name']}: {json_data['metadata']['source']}")
        processing_time = time.time() - start_time
        if monitor:
            stats = monitor.stop_monitoring()
            print(f"\n📊 Batch Processing Resource Usage:")
            print(f" Total rows processed: {total_rows_processed}")
            print(f" Processing time: {processing_time:.2f} seconds")
            # Guard against division by zero on a sub-timer-resolution run.
            if processing_time > 0:
                print(f" Rows per second: {total_rows_processed/processing_time:.1f}")
            # stop_monitoring() returns {} when no samples were collected
            # (e.g. the read finished before the first ~100ms sample);
            # indexing the empty dict previously raised KeyError here.
            if stats:
                print(f" CPU Usage: avg={stats['cpu_percent']['avg']:.1f}%, "
                      f"max={stats['cpu_percent']['max']:.1f}%")
                print(f" Memory Usage: avg={stats['memory_mb']['avg_mb']:.1f}MB, "
                      f"max={stats['memory_mb']['max_mb']:.1f}MB")
    finally:
        # Ensure the monitor thread is stopped even if reading raised.
        if monitor and monitor.monitoring:
            monitor.stop_monitoring()
# Report host capabilities before running the workload.
monitor = ResourceMonitor()
print("\n📊 System Information:")
sys_info = get_system_info()
for key, value in sys_info.items():
    if isinstance(value, float):
        print(f" {key}: {value:.2f}")
    else:
        print(f" {key}: {value}")
Example output:
📊 System Information:
 cpu_count: 12
 cpu_count_logical: 12
 total_memory_gb: 32.00
 available_memory_gb: 15.69
# Compare throughput and resource usage across several batch sizes.
print("\n\nTesting batch reading with different batch sizes...")
for batch_size in [100, 500, 1000]:
    print(f"\n--- Batch size: {batch_size} ---")
    monitor = ResourceMonitor()
    # Fixed: the reader defined above is named batchread_prq; the previous
    # call to read_parquet_in_batches raised NameError (undefined name).
    batchread_prq('sample10k.parquet', batch_size=batch_size, monitor=monitor)