Skip to content

JSON Processing

For processing JSON, there are two options: streaming or loading the full document into memory.

The Python ijson library parses JSON as a stream, without loading the whole document into memory.

If JSON is stored as a column in a Parquet file, each row's value can be treated like a record of line-delimited JSON. Memory may not be an issue in that case, since rows are processed one at a time.

pyarrow can be used to read the Parquet file and access its JSON column.

Example: PyArrow + Parquet

import ijson
import pyarrow.parquet as pq

def process_parquet_streaming(s3_key):
    """Stream-parse the JSON column of a Parquet file stored in S3.

    Args:
        s3_key: Key of the Parquet object inside the module-level ``bucket``.

    Each row's ``metadata_column`` is parsed incrementally with ijson so the
    full JSON tree is never materialized; every (prefix, event, value) triple
    is forwarded to ``process_json_element`` along with the row's ``id`` and
    ``timestamp``.
    """
    # NOTE(review): `bucket` and `process_json_element` are assumed to be
    # defined elsewhere in this module — confirm before running standalone.
    table = pq.read_table(f's3://{bucket}/{s3_key}')

    # Iterate batch-by-batch so at most one record batch is resident at once.
    for batch in table.to_batches():
        for row in batch.to_pandas().itertuples():
            json_data = row.metadata_column

            # Skip nulls: pandas surfaces Parquet nulls as None or float NaN.
            if json_data is None or (isinstance(json_data, float) and json_data != json_data):
                continue

            # ijson.parse expects a file-like object (or bytes); a str column
            # value must be encoded first — same convention as
            # parse_nested_json_streaming below.
            if isinstance(json_data, str):
                json_data = json_data.encode('utf-8')

            parser = ijson.parse(io.BytesIO(json_data))
            for prefix, event, value in parser:
                # Process JSON incrementally, one parse event at a time.
                process_json_element(prefix, event, value, row.id, row.timestamp)

Nested JSON

import io
import json
from typing import Any, Dict

import ijson
import pandas as pd

def parse_nested_json_streaming(parquet_path: str, json_column: str) -> None:
    """
    Parse a Parquet file with streaming JSON parsing for a nested JSON column.

    Iterates the file batch-by-batch (bounded memory) and parses each row's
    JSON value incrementally with ijson instead of json.loads, so deeply
    nested structures never have to be fully materialized.

    Args:
        parquet_path: Path to the Parquet file
        json_column: Name of the column containing nested JSON data
    """
    # NOTE(review): relies on the module-level `pq` (pyarrow.parquet) import.
    parquet_file = pq.ParquetFile(parquet_path)

    # iter_batches streams record batches instead of loading the whole table.
    for batch in parquet_file.iter_batches():
        # Convert batch to pandas (for easy access by column name)
        df = batch.to_pandas()

        for index, row in df.iterrows():
            json_str = row[json_column]

            # Skip null/empty values (None, empty string, or pandas NaN).
            if not json_str or pd.isna(json_str):
                continue

            # ijson wants a binary file-like object; encode str values once.
            json_bytes = json_str.encode('utf-8') if isinstance(json_str, str) else json_str
            json_io = io.BytesIO(json_bytes)

            try:
                # Example 1: walk every parse event (path, event type, value).
                parser = ijson.parse(json_io)

                # This is where you'd implement your specific parsing logic
                # Here's an example that just collects paths and values
                for prefix, event, value in parser:
                    print(f"Path: {prefix}, Event: {event}, Value: {value}")

                # Example 2: extract items under a specific dotted prefix.
                json_io.seek(0)  # Reset the buffer for a second parse
                nested_value = ijson.items(json_io, 'some.nested.path')
                for item in nested_value:
                    print(f"Nested value: {item}")

            except Exception as e:
                # Best-effort: report the bad row and keep processing the rest.
                print(f"Error parsing JSON in row {index}: {str(e)}")

# Example usage
if __name__ == "__main__":
    # Run the streaming parser against a sample file; adjust the path and
    # JSON column name for real data.
    parse_nested_json_streaming('data.parquet', 'json_data_column')