Skip to content

Parsing Parquet files that contain a JSON column

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import json
from datetime import datetime
import io

Approach: store the JSON column as a plain string field in the Parquet schema

def csv_to_parquet_with_schema(csv_file, parquet_file, compression='snappy'):
    """
    Convert a CSV file to Parquet with an explicit schema.

    The JSON column ('attrs') is stored as a plain string field;
    callers parse it with json.loads after reading it back.

    Parameters
    ----------
    csv_file : str
        Path to the input CSV (expected columns: id, create_dt, attrs).
    parquet_file : str
        Path of the Parquet file to create.
    compression : str, optional
        Parquet compression codec passed to pq.write_table
        (default 'snappy', matching the previous hard-coded value).
    """
    df = pd.read_csv(csv_file)

    # Parse the timestamp column up front so the schema cast below succeeds
    df['create_dt'] = pd.to_datetime(df['create_dt'])

    # Explicit schema: JSON is kept as a string, not a nested Arrow type
    schema = pa.schema([
        ('id', pa.int64()),
        ('create_dt', pa.timestamp('us')),
        ('attrs', pa.string())  # JSON stored as string
    ])

    # preserve_index=False keeps the pandas index out of the table so it
    # cannot conflict with the three-column schema above
    table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

    # Write to Parquet with compression. Parquet version can also be set
    pq.write_table(
        table,
        parquet_file,
        compression=compression,
        use_dictionary=True
    )

    print(f"Parquet file created: {parquet_file}")

    # Report file-level metadata so the result can be sanity-checked
    parquet_file_obj = pq.ParquetFile(parquet_file)
    print("\nParquet file metadata:")
    print(f"  Number of rows: {parquet_file_obj.metadata.num_rows}")
    print(f"  Number of columns: {parquet_file_obj.metadata.num_columns}")
    print(f"  Schema: {parquet_file_obj.schema}")
    print(f"  Compression: {compression}")
csv_to_parquet_with_schema('sample_data.csv', 'output_with_json_string.parquet')
Parquet file created: output_with_json_string.parquet

Parquet file metadata:
  Number of rows: 5
  Number of columns: 3
  Schema: <pyarrow._parquet.ParquetSchema object at 0x10e35d580>
required group field_id=-1 schema {
  optional int64 field_id=-1 id;
  optional int64 field_id=-1 create_dt (Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false));
  optional binary field_id=-1 attrs (String);
}

  Compression: snappy
def read_and_parse_parquet(parquet_file):
    """
    Load a Parquet file and demonstrate parsing its JSON string column.
    """
    print(f"\nReading Parquet file: {parquet_file}")

    # Materialize the whole file into a pandas DataFrame
    df = pq.read_table(parquet_file).to_pandas()

    print(f"\nDataFrame shape: {df.shape}")
    print(f"\nDataFrame dtypes:\n{df.dtypes}")

    # Decode the JSON payload of the very first record as an example
    print("\nParsing JSON from first row:")
    sample = json.loads(df.iloc[0]['attrs'])
    print(json.dumps(sample, indent=2))

    # Walk every row and pull a handful of nested fields out of the JSON
    print("\nExtracting nested values from all rows:")
    for _, record in df.iterrows():
        payload = json.loads(record['attrs'])
        user = payload['user']
        address = user['address']
        print(f"ID {record['id']}: {user['name']} from {address['city']} "
              f"(lat: {address['coordinates']['lat']}), "
              f"email notifications: {user['preferences']['notifications']['email']}, "
              f"tags: {payload['metadata']['tags']}")
read_and_parse_parquet('output_with_json_string.parquet')
Reading Parquet file: output_with_json_string.parquet

DataFrame shape: (5, 3)

DataFrame dtypes:
id                    int64
create_dt    datetime64[us]
attrs                object
dtype: object

Parsing JSON from first row:
{
  "user": {
    "name": "John Doe",
    "age": 32,
    "address": {
      "street": "123 Main St",
      "city": "New York",
      "state": "NY",
      "coordinates": {
        "lat": 40.7128,
        "lng": -74.006
      }
    },
    "preferences": {
      "notifications": {
        "email": true,
        "sms": false,
        "push": true
      },
      "theme": "dark"
    }
  },
  "metadata": {
    "source": "web",
    "version": "2.1.0",
    "tags": [
      "premium",
      "verified",
      "active"
    ]
  }
}

Extracting nested values from all rows:
ID 1: John Doe from New York (lat: 40.7128), email notifications: True, tags: ['premium', 'verified', 'active']
ID 2: Jane Smith from Los Angeles (lat: 34.0522), email notifications: False, tags: ['basic', 'new_user']
ID 3: Bob Johnson from Chicago (lat: 41.8781), email notifications: True, tags: ['enterprise', 'verified', 'beta_tester']
ID 4: Alice Brown from Houston (lat: 29.7604), email notifications: True, tags: ['premium', 'long_term_user', 'vip']
ID 5: Charlie Wilson from Phoenix (lat: 33.4484), email notifications: False, tags: ['basic']
def read_parquet_in_batches(parquet_file, batch_size=2):
    """
    Demonstrate streaming a Parquet file in fixed-size record batches
    instead of loading everything into memory at once.
    """
    print(f"\nReading Parquet file in batches of {batch_size}:")

    reader = pq.ParquetFile(parquet_file)

    # iter_batches yields RecordBatch objects of at most batch_size rows
    for batch_num, batch in enumerate(reader.iter_batches(batch_size=batch_size), start=1):
        df_batch = batch.to_pandas()
        print(f"\nBatch {batch_num}:")
        print(f"  Rows: {len(df_batch)}")
        print(f"  IDs: {df_batch['id'].tolist()}")

        # Parse the JSON column for every row in this batch
        for _, row in df_batch.iterrows():
            attrs = json.loads(row['attrs'])
            print(f"  - {attrs['user']['name']}: {attrs['metadata']['source']}")
read_parquet_in_batches('output_with_json_string.parquet')
Reading Parquet file in batches of 2:

Batch 1:
  Rows: 2
  IDs: [1, 2]
  - John Doe: web
  - Jane Smith: mobile

Batch 2:
  Rows: 2
  IDs: [3, 4]
  - Bob Johnson: api
  - Alice Brown: web

Batch 3:
  Rows: 1
  IDs: [5]
  - Charlie Wilson: mobile