Terraform Script
- Creates an S3 bucket (for Parquet uploads) with a randomized, globally-unique name
- Sets up an S3 Event Notification to trigger the Lambda
- Creates an SQS queue (as a Dead Letter Queue)
- Creates a Lambda function (that processes Parquet files)
Project layout:
- terraform/main.tf
- terraform/variables.tf
- terraform/lambda/lambda_function.py
variables.tf
# AWS region for all resources; override via -var="region=..." or a tfvars file.
variable "region" {
  description = "AWS region in which to deploy the S3 bucket, SQS DLQ, and Lambda."
  type        = string
  default     = "us-east-1"
}
main.tf
# AWS provider; region comes from variables.tf (defaults to us-east-1).
provider "aws" {
region = var.region
}
# Ingest bucket for Parquet uploads. A random hex suffix keeps the
# bucket name globally unique across deployments.
resource "aws_s3_bucket" "parquet_bucket" {
bucket = "parquet-ingest-${random_id.suffix.hex}"
}
# 4 random bytes -> 8 hex characters appended to the bucket name above.
resource "random_id" "suffix" {
byte_length = 4
}
# Dead-letter queue: receives events the Lambda failed to process
# after its async-invocation retries are exhausted (see dead_letter_config
# on the function resource).
resource "aws_sqs_queue" "dlq" {
name = "parquet-processing-dlq"
}
# Execution role the Lambda service assumes when running the function.
# Only the Lambda service principal may assume it.
resource "aws_iam_role" "lambda_exec_role" {
name = "lambda_parquet_exec_role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com"
}
}]
})
}
# Least-privilege execution policy for the Parquet-processing Lambda:
# read access to the ingest bucket, CloudWatch Logs write access, and
# permission to deliver failed events to the DLQ.
resource "aws_iam_policy" "lambda_policy" {
  name = "lambda_parquet_policy"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        # s3:ListBucket applies to the bucket ARN; s3:GetObject to objects ("/*").
        Effect = "Allow",
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        Resource = [
          aws_s3_bucket.parquet_bucket.arn,
          "${aws_s3_bucket.parquet_bucket.arn}/*"
        ]
      },
      {
        # Narrowed from "logs:*": the Lambda runtime only needs to create its
        # log group/stream and write log events.
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Resource = "*"
      },
      {
        # Required for dead_letter_config to deliver failed async
        # invocations to the SQS DLQ.
        Effect = "Allow",
        Action = "sqs:SendMessage",
        Resource = aws_sqs_queue.dlq.arn
      }
    ]
  })
}
# Bind the execution policy to the Lambda's role.
resource "aws_iam_role_policy_attachment" "attach_policy" {
role = aws_iam_role.lambda_exec_role.name
policy_arn = aws_iam_policy.lambda_policy.arn
}
# Zip the lambda/ source directory; output_base64sha256 is consumed by the
# function's source_code_hash so code changes trigger a redeploy.
data "archive_file" "lambda_zip" {
type = "zip"
source_dir = "${path.module}/lambda"
output_path = "${path.module}/lambda.zip"
}
# Parquet-processing Lambda. Failed async invocations (S3-triggered events)
# are routed to the SQS DLQ after retries are exhausted.
# NOTE(review): lambda_function.py imports pyarrow, which is not bundled in
# the python3.11 managed runtime — the zip built from lambda/ must vendor it,
# or a Lambda layer must be attached; confirm before deploying.
resource "aws_lambda_function" "parquet_processor" {
function_name = "parquetProcessor"
filename = data.archive_file.lambda_zip.output_path
# Forces an update whenever the zipped source changes.
source_code_hash = data.archive_file.lambda_zip.output_base64sha256
role = aws_iam_role.lambda_exec_role.arn
handler = "lambda_function.lambda_handler"
runtime = "python3.11"
timeout = 30
dead_letter_config {
target_arn = aws_sqs_queue.dlq.arn
}
}
# Invoke the Lambda for every object created in the ingest bucket.
# NOTE(review): no filter_suffix is set, so non-Parquet uploads also trigger
# the function — consider filter_suffix = ".parquet" if that is unintended.
resource "aws_s3_bucket_notification" "bucket_notify" {
bucket = aws_s3_bucket.parquet_bucket.id
lambda_function {
lambda_function_arn = aws_lambda_function.parquet_processor.arn
events = ["s3:ObjectCreated:*"]
}
# S3 validates the invoke permission when the notification is created,
# so the permission resource must exist first.
depends_on = [aws_lambda_permission.allow_s3]
}
# Allow the S3 service to invoke the function, scoped to this bucket's ARN
# so other buckets cannot trigger it.
resource "aws_lambda_permission" "allow_s3" {
statement_id = "AllowExecutionFromS3"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.parquet_processor.function_name
principal = "s3.amazonaws.com"
source_arn = aws_s3_bucket.parquet_bucket.arn
}
import io
from urllib.parse import unquote_plus

import boto3
import pyarrow.parquet as pq
def lambda_handler(event, context):
    """Process S3 ObjectCreated events: read each new object as a Parquet table.

    Args:
        event: S3 event notification payload (dict with a "Records" list).
        context: Lambda context object (unused).

    Raises:
        Exception: re-raised on any processing error so the async invocation
            fails; after retries are exhausted the event is routed to the
            configured SQS DLQ.
    """
    s3 = boto3.client('s3')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # S3 event notifications URL-encode object keys (spaces arrive as '+',
        # special characters percent-encoded). Decode before GetObject,
        # otherwise such keys fail with NoSuchKey.
        key = unquote_plus(record['s3']['object']['key'])
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            # NOTE(review): reads the whole object into memory; large Parquet
            # files may exceed the function's memory limit — confirm sizes.
            table = pq.read_table(source=io.BytesIO(obj['Body'].read()))
            print(f"Processed file: {key}")
        except Exception as e:
            print(f"ERROR processing {key}: {e}")
            raise  # bare raise preserves the traceback; failure triggers retry/DLQ