Parse Parquet
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import json
from datetime import datetime
import io
Store JSON as String
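The examples in this section assume a CSV with three columns: id, create_dt, and attrs, where attrs holds a JSON document serialized as a string. A minimal sketch that writes such a file (the filename sample_data.csv and the timestamp are assumptions; only the first record is written here, mirroring the row parsed in the output further down, while the run behind these outputs used five rows):

# Hypothetical setup: write a one-row sample CSV in the assumed layout.
# Row 1 mirrors the record shown in the parsed output below; the real
# input behind these outputs had five such rows.
sample_attrs = json.dumps({
    'user': {
        'name': 'John Doe',
        'age': 32,
        'address': {
            'street': '123 Main St',
            'city': 'New York',
            'state': 'NY',
            'coordinates': {'lat': 40.7128, 'lng': -74.006}
        },
        'preferences': {
            'notifications': {'email': True, 'sms': False, 'push': True},
            'theme': 'dark'
        }
    },
    'metadata': {
        'source': 'web',
        'version': '2.1.0',
        'tags': ['premium', 'verified', 'active']
    }
})

pd.DataFrame({
    'id': [1],
    'create_dt': [datetime(2024, 1, 15, 10, 30, 0)],  # assumed timestamp
    'attrs': [sample_attrs]
}).to_csv('sample_data.csv', index=False)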
def csv_to_parquet_with_schema(csv_file, parquet_file):
    """
    Convert a CSV to Parquet with an explicit schema definition.
    The JSON column is treated as a plain string field.
    """
    df = pd.read_csv(csv_file)

    # Convert create_dt to a timestamp
    df['create_dt'] = pd.to_datetime(df['create_dt'])

    # Define the schema
    schema = pa.schema([
        ('id', pa.int64()),
        ('create_dt', pa.timestamp('us')),
        ('attrs', pa.string())  # JSON stored as string
    ])

    # Create a PyArrow table
    table = pa.Table.from_pandas(df, schema=schema)

    # Write to Parquet with Snappy compression; the Parquet format
    # version can also be set here
    pq.write_table(
        table,
        parquet_file,
        compression='snappy',
        use_dictionary=True
    )

    print(f"Parquet file created: {parquet_file}")

    # Print file metadata
    parquet_file_obj = pq.ParquetFile(parquet_file)
    print("\nParquet file metadata:")
    print(f"  Number of rows: {parquet_file_obj.metadata.num_rows}")
    print(f"  Number of columns: {parquet_file_obj.metadata.num_columns}")
    print(f"  Schema: {parquet_file_obj.schema}")
    print(f"  Compression: snappy")
Parquet file created: output_with_json_string.parquet

Parquet file metadata:
  Number of rows: 5
  Number of columns: 3
  Schema: <pyarrow._parquet.ParquetSchema object at 0x10e35d580>
required group field_id=-1 schema {
  optional int64 field_id=-1 id;
  optional int64 field_id=-1 create_dt (Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false));
  optional binary field_id=-1 attrs (String);
}

  Compression: snappy
def read_and_parse_parquet(parquet_file):
    """
    Read a Parquet file and demonstrate parsing the JSON column
    """
    print(f"\nReading Parquet file: {parquet_file}")

    # Read the entire file
    table = pq.read_table(parquet_file)
    df = table.to_pandas()

    print(f"\nDataFrame shape: {df.shape}")
    print(f"\nDataFrame dtypes:\n{df.dtypes}")

    # Parse the JSON column for the first row as an example
    print("\nParsing JSON from first row:")
    first_row_json = json.loads(df.iloc[0]['attrs'])
    print(json.dumps(first_row_json, indent=2))

    # Example: extract specific nested values
    print("\nExtracting nested values from all rows:")
    for idx, row in df.iterrows():
        json_data = json.loads(row['attrs'])
        user_name = json_data['user']['name']
        city = json_data['user']['address']['city']
        lat = json_data['user']['address']['coordinates']['lat']
        email_pref = json_data['user']['preferences']['notifications']['email']
        tags = json_data['metadata']['tags']
        print(f"ID {row['id']}: {user_name} from {city} (lat: {lat}), "
              f"email notifications: {email_pref}, tags: {tags}")
Reading Parquet file: output_with_json_string.parquet

DataFrame shape: (5, 3)

DataFrame dtypes:
id                    int64
create_dt    datetime64[us]
attrs                object
dtype: object
Parsing JSON from first row:
{
  "user": {
    "name": "John Doe",
    "age": 32,
    "address": {
      "street": "123 Main St",
      "city": "New York",
      "state": "NY",
      "coordinates": {
        "lat": 40.7128,
        "lng": -74.006
      }
    },
    "preferences": {
      "notifications": {
        "email": true,
        "sms": false,
        "push": true
      },
      "theme": "dark"
    }
  },
  "metadata": {
    "source": "web",
    "version": "2.1.0",
    "tags": [
      "premium",
      "verified",
      "active"
    ]
  }
}
Extracting nested values from all rows:
ID 1: John Doe from New York (lat: 40.7128), email notifications: True, tags: ['premium', 'verified', 'active']
ID 2: Jane Smith from Los Angeles (lat: 34.0522), email notifications: False, tags: ['basic', 'new_user']
ID 3: Bob Johnson from Chicago (lat: 41.8781), email notifications: True, tags: ['enterprise', 'verified', 'beta_tester']
ID 4: Alice Brown from Houston (lat: 29.7604), email notifications: True, tags: ['premium', 'long_term_user', 'vip']
ID 5: Charlie Wilson from Phoenix (lat: 33.4484), email notifications: False, tags: ['basic']
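Row-by-row json.loads works, but if the nested attributes are needed as ordinary DataFrame columns, pandas can flatten the parsed dicts in one pass. A minimal sketch, assuming the attrs structure shown above (the dotted column names follow pandas' json_normalize convention):

# Read the file back and flatten the JSON column into dotted columns
# such as user.address.city and user.preferences.notifications.email.
df = pq.read_table('output_with_json_string.parquet').to_pandas()
flat = pd.json_normalize(df['attrs'].apply(json.loads).tolist())
flat.insert(0, 'id', df['id'])
print(flat[['id', 'user.name', 'user.address.city']])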
def read_parquet_in_batches(parquet_file, batch_size=2):
    """
    Demonstrate reading a Parquet file in batches for memory efficiency
    """
    print(f"\nReading Parquet file in batches of {batch_size}:")

    parquet_file_obj = pq.ParquetFile(parquet_file)

    batch_num = 0
    for batch in parquet_file_obj.iter_batches(batch_size=batch_size):
        batch_num += 1
        df_batch = batch.to_pandas()

        print(f"\nBatch {batch_num}:")
        print(f"  Rows: {len(df_batch)}")
        print(f"  IDs: {df_batch['id'].tolist()}")

        # Process the JSON column in this batch
        for idx, row in df_batch.iterrows():
            json_data = json.loads(row['attrs'])
            print(f"  - {json_data['user']['name']}: {json_data['metadata']['source']}")
Reading Parquet file in batches of 2:

Batch 1:
  Rows: 2
  IDs: [1, 2]
  - John Doe: web
  - Jane Smith: mobile

Batch 2:
  Rows: 2
  IDs: [3, 4]
  - Bob Johnson: api
  - Alice Brown: web

Batch 3:
  Rows: 1
  IDs: [5]
  - Charlie Wilson: mobile
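One payoff of keeping JSON as a string column: Parquet's columnar layout means a reader that doesn't need attrs never has to decompress it. A minimal sketch using pyarrow's column projection:

# Project only the columns a query needs; the attrs column is
# skipped entirely at read time.
slim = pq.read_table('output_with_json_string.parquet', columns=['id', 'create_dt'])
print(slim.to_pandas())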