Skip to content

Batch Process Parquet with Monitoring

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import json
from datetime import datetime
import io
import psutil
import time
import threading
from collections import deque
import os

Class for system resource monitoring

class ResourceMonitor:
    """Sample this process's CPU and memory usage on a background thread.

    Call ``start_monitoring()`` before the work to be measured and
    ``stop_monitoring()`` afterwards to obtain summary statistics. Only the
    most recent ``MAX_SAMPLES`` readings of each metric are retained.
    """

    # Length of the window over which each CPU reading is measured. The
    # blocking measurement also paces the sampling loop, so this is the
    # effective sampling period.
    SAMPLE_INTERVAL_SECONDS = 0.1
    # Upper bound on retained samples per metric; older readings are dropped.
    MAX_SAMPLES = 1000

    def __init__(self):
        self.cpu_samples = deque(maxlen=self.MAX_SAMPLES)
        self.memory_samples = deque(maxlen=self.MAX_SAMPLES)
        self.monitoring = False
        self.monitor_thread = None
        self.process = psutil.Process(os.getpid())

    def start_monitoring(self):
        """Start sampling on a background thread.

        Samples left over from an earlier run are discarded so that each
        start/stop cycle reports only its own measurements. Calling this
        while a sampler is already running is a no-op.
        """
        already_running = (
            self.monitoring
            and self.monitor_thread is not None
            and self.monitor_thread.is_alive()
        )
        if already_running:
            return

        self.cpu_samples.clear()
        self.memory_samples.clear()
        self.monitoring = True
        # Daemon thread: a missed stop_monitoring() must not keep the
        # interpreter alive at exit.
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources,
            daemon=True,
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop sampling and return summary statistics.

        Returns:
            A dict with ``cpu_percent`` (``avg``/``max``/``min``),
            ``memory_mb`` (``avg_mb``/``max_mb``/``min_mb``) and
            ``samples_collected``; an empty dict when no samples were
            collected.
        """
        self.monitoring = False
        if self.monitor_thread:
            # Wait for the sampler to finish its current reading so the
            # deques are no longer being appended to while we summarise.
            self.monitor_thread.join()

        if not self.cpu_samples or not self.memory_samples:
            return {}

        cpu_stats = {
            'avg': sum(self.cpu_samples) / len(self.cpu_samples),
            'max': max(self.cpu_samples),
            'min': min(self.cpu_samples),
        }

        memory_stats = {
            'avg_mb': sum(self.memory_samples) / len(self.memory_samples),
            'max_mb': max(self.memory_samples),
            'min_mb': min(self.memory_samples),
        }

        return {
            'cpu_percent': cpu_stats,
            'memory_mb': memory_stats,
            'samples_collected': len(self.cpu_samples),
        }

    def _monitor_resources(self):
        """Collect CPU and RSS samples until ``self.monitoring`` is cleared."""
        while self.monitoring:
            # With interval > 0, psutil blocks for that long and reports CPU
            # usage over the window. That wait is the loop's pacing, so no
            # additional sleep is needed (an extra sleep would halve the
            # sampling rate).
            cpu_percent = self.process.cpu_percent(
                interval=self.SAMPLE_INTERVAL_SECONDS
            )
            self.cpu_samples.append(cpu_percent)

            # Resident set size, converted from bytes to MB (1024 * 1024).
            memory_info = self.process.memory_info()
            self.memory_samples.append(memory_info.rss / 1024 / 1024)

def get_system_info():
    """Return a snapshot of the host's CPU and memory capacity.

    Returns:
        A dict with:
            ``cpu_count``: number of physical cores (psutil may report
                ``None`` when it cannot determine this).
            ``cpu_count_logical``: number of logical CPUs.
            ``total_memory_gb``: total physical memory, in units of
                1024**3 bytes.
            ``available_memory_gb``: currently available memory, in units
                of 1024**3 bytes.
    """
    # Query memory once so total and available come from the same snapshot.
    memory = psutil.virtual_memory()
    return {
        # psutil.cpu_count() defaults to logical=True; physical cores must be
        # requested explicitly, otherwise both entries report the same value.
        'cpu_count': psutil.cpu_count(logical=False),
        'cpu_count_logical': psutil.cpu_count(logical=True),
        'total_memory_gb': memory.total / 1024 / 1024 / 1024,
        'available_memory_gb': memory.available / 1024 / 1024 / 1024,
    }

Read the Parquet file in batches and collect system metrics

def batchread_prq(parquet_file, batch_size=2, monitor=None):
    """Read a Parquet file in batches and print each batch's contents.

    Each batch is converted to a pandas DataFrame. Its ``id`` values are
    printed, and the JSON document in every row's ``attrs`` column is parsed
    to print ``user.name`` and ``metadata.source``. Reading in batches keeps
    only one batch in memory at a time.

    Args:
        parquet_file: Path or file object handed to ``pq.ParquetFile``.
        batch_size: Maximum number of rows per batch.
        monitor: Optional monitor (e.g. ``ResourceMonitor``) exposing
            ``start_monitoring()``, ``stop_monitoring()`` and a
            ``monitoring`` flag. When given, CPU and memory usage are
            sampled during the read and a summary is printed afterwards.

    Raises:
        KeyError: If a batch lacks the ``id`` or ``attrs`` column, or a JSON
            document lacks the ``user.name`` / ``metadata.source`` keys.
        json.JSONDecodeError: If an ``attrs`` value is not valid JSON.
    """
    print(f"\nReading Parquet file in batches of {batch_size}:")

    # perf_counter is monotonic, so the elapsed time cannot go negative or
    # jump if the wall clock is adjusted mid-run.
    start_time = time.perf_counter()
    if monitor:
        monitor.start_monitoring()

    try:
        parquet_file_obj = pq.ParquetFile(parquet_file)

        total_rows_processed = 0
        batches = parquet_file_obj.iter_batches(batch_size=batch_size)

        for batch_num, batch in enumerate(batches, start=1):
            df_batch = batch.to_pandas()
            total_rows_processed += len(df_batch)

            print(f"\nBatch {batch_num}:")
            print(f"  Rows: {len(df_batch)}")
            print(f"  IDs: {df_batch['id'].tolist()}")

            # Only the 'attrs' column is needed, so iterate it directly
            # instead of building a Series per row with iterrows().
            for raw_attrs in df_batch['attrs']:
                json_data = json.loads(raw_attrs)
                print(f"  - {json_data['user']['name']}: {json_data['metadata']['source']}")

        processing_time = time.perf_counter() - start_time

        if monitor:
            stats = monitor.stop_monitoring()
            print(f"\n📊 Batch Processing Resource Usage:")
            print(f"  Total rows processed: {total_rows_processed}")
            print(f"  Processing time: {processing_time:.2f} seconds")
            # Guard against a zero elapsed time on coarse clocks.
            if processing_time > 0:
                print(f"  Rows per second: {total_rows_processed/processing_time:.1f}")
            else:
                print("  Rows per second: n/a")
            # stop_monitoring() returns {} when no samples were collected
            # (e.g. the read finished before the first sample was taken).
            if stats:
                print(f"  CPU Usage: avg={stats['cpu_percent']['avg']:.1f}%, "
                      f"max={stats['cpu_percent']['max']:.1f}%")
                print(f"  Memory Usage: avg={stats['memory_mb']['avg_mb']:.1f}MB, "
                      f"max={stats['memory_mb']['max_mb']:.1f}MB")
            else:
                print("  CPU/Memory Usage: no samples collected")

    finally:
        # Make sure the sampler thread is stopped even if reading failed.
        if monitor and monitor.monitoring:
            monitor.stop_monitoring()
# Create a monitor, then print the host's CPU and memory capacity.
# Float values are shown with two decimals; everything else is printed as-is.
monitor = ResourceMonitor()
print("\n📊 System Information:")
sys_info = get_system_info()
for key in sys_info:
    value = sys_info[key]
    if isinstance(value, float):
        print(f"  {key}: {value:.2f}")
    else:
        print(f"  {key}: {value}")
📊 System Information:
  cpu_count: 12
  cpu_count_logical: 12
  total_memory_gb: 32.00
  available_memory_gb: 15.69
# Run the batched reader at several batch sizes. A fresh monitor is created
# for every run so each run's statistics cover only that run.
print("\n\nTesting batch reading with different batch sizes...")
for batch_size in [100, 500, 1000]:
    print(f"\n--- Batch size: {batch_size} ---")
    monitor = ResourceMonitor()
    # The batch reader defined in this file is batchread_prq; calling an
    # undefined name here would raise NameError on the first iteration.
    batchread_prq('sample10k.parquet', batch_size=batch_size, monitor=monitor)