Skip to content

Terraform: S3-to-Lambda Parquet processing pipeline

  • Creates an S3 bucket (for Parquet uploads). Name shown in output
  • Sets up an S3 Event Notification to trigger the Lambda
  • Creates an SQS queue (as a Dead Letter Queue)
  • Creates a Lambda function (that processes Parquet files)

terraform/main.tf terraform/variables.tf terraform/lambda/lambda_function.py

variables.tf
# AWS region for all resources. Override via -var="region=..." or TF_VAR_region.
variable "region" {
  description = "AWS region to deploy all resources into."
  type        = string
  default     = "us-east-1"
}

main.tf
# Configure the AWS provider; region comes from var.region (defaults to us-east-1).
provider "aws" {
  region = var.region
}

# S3 bucket that receives Parquet uploads. S3 bucket names are globally
# unique, so a random hex suffix is appended to avoid collisions.
resource "aws_s3_bucket" "parquet_bucket" {
  bucket = "parquet-ingest-${random_id.suffix.hex}"
}

# Random suffix (4 bytes -> 8 hex chars) used to make the bucket name unique.
resource "random_id" "suffix" {
  byte_length = 4
}

# Surface the generated bucket name after apply, as promised in the docs above
# ("Name shown in output") — previously no output existed.
output "bucket_name" {
  description = "Name of the S3 bucket for Parquet uploads."
  value       = aws_s3_bucket.parquet_bucket.bucket
}

# Dead-letter queue for failed Lambda invocations. Retention is raised to the
# 14-day maximum so failed events are not silently purged after the 4-day
# SQS default before anyone can inspect them.
resource "aws_sqs_queue" "dlq" {
  name                      = "parquet-processing-dlq"
  message_retention_seconds = 1209600 # 14 days (SQS maximum)
}

# Execution role the Lambda function assumes at runtime.
resource "aws_iam_role" "lambda_exec_role" {
  name = "lambda_parquet_exec_role"

  # Trust policy: only the Lambda service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Action = "sts:AssumeRole",
      Effect = "Allow",
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# Permissions for the Parquet-processing Lambda: read from the ingest bucket,
# write CloudWatch logs, and deliver failed events to the DLQ.
resource "aws_iam_policy" "lambda_policy" {
  name = "lambda_parquet_policy"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      # Read access: ListBucket applies to the bucket ARN, GetObject to the
      # object ARNs ("<bucket-arn>/*") — both resources are required.
      {
        Effect = "Allow",
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        Resource = [
          aws_s3_bucket.parquet_bucket.arn,
          "${aws_s3_bucket.parquet_bucket.arn}/*"
        ]
      },
      # CloudWatch Logs: scoped to the three actions the Lambda runtime needs,
      # replacing the over-broad "logs:*" on all resources.
      {
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Resource = "arn:aws:logs:*:*:*"
      },
      # Lambda needs SendMessage on the DLQ to deliver failed async events.
      {
        Effect = "Allow",
        Action = "sqs:SendMessage",
        Resource = aws_sqs_queue.dlq.arn
      }
    ]
  })
}

# Attach the Lambda permissions policy to the execution role.
resource "aws_iam_role_policy_attachment" "attach_policy" {
  role       = aws_iam_role.lambda_exec_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# Zip the lambda/ source directory at plan time for deployment.
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "${path.module}/lambda"
  output_path = "${path.module}/lambda.zip"
}

# Lambda function that processes uploaded Parquet files.
# NOTE(review): lambda_function.py imports pyarrow, which is NOT bundled in
# the managed python3.11 runtime — a Lambda layer (e.g. AWS SDK for pandas)
# or vendored dependencies in the zip are required; confirm packaging.
# NOTE(review): the default 128 MB memory_size may be too small for pyarrow
# on non-trivial files — verify and raise if needed.
resource "aws_lambda_function" "parquet_processor" {
  function_name = "parquetProcessor"
  filename      = data.archive_file.lambda_zip.output_path
  # Forces redeployment whenever the zipped source changes.
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  role          = aws_iam_role.lambda_exec_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.11"
  timeout       = 30

  # S3-triggered invocations are asynchronous; after retries fail, the event
  # is delivered to this SQS queue.
  dead_letter_config {
    target_arn = aws_sqs_queue.dlq.arn
  }
}

# Invoke the Lambda whenever a Parquet object is created in the bucket.
# filter_suffix restricts the trigger to .parquet keys; previously every
# uploaded object (including non-Parquet files) invoked the function, which
# would then fail in pq.read_table and land in the DLQ.
resource "aws_s3_bucket_notification" "bucket_notify" {
  bucket = aws_s3_bucket.parquet_bucket.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.parquet_processor.arn
    events              = ["s3:ObjectCreated:*"]
    filter_suffix       = ".parquet"
  }

  # S3 rejects the notification config unless the invoke permission exists first.
  depends_on = [aws_lambda_permission.allow_s3]
}

# Resource-based policy allowing S3 (from this specific bucket only) to
# invoke the Lambda function.
resource "aws_lambda_permission" "allow_s3" {
  statement_id  = "AllowExecutionFromS3"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.parquet_processor.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.parquet_bucket.arn
}
Step 3: lambda/lambda_function.py

import io
from urllib.parse import unquote_plus

import boto3
import pyarrow.parquet as pq

def lambda_handler(event, context):
    """Validate each Parquet object referenced in an S3 event notification.

    Downloads every object named in ``event['Records']``, parses it with
    pyarrow to confirm it is valid Parquet, and logs the result. Any failure
    is re-raised so the asynchronous invocation errors and the event is
    retried and ultimately delivered to the dead-letter queue.
    """
    s3 = boto3.client('s3')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # S3 event notification keys are URL-encoded (spaces arrive as '+');
        # decode before GetObject, or keys containing spaces/special chars 404.
        key = unquote_plus(record['s3']['object']['key'])
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            table = pq.read_table(source=io.BytesIO(obj['Body'].read()))
            print(f"Processed file: {key}")
        except Exception as e:
            print(f"ERROR processing {key}: {e}")
            raise  # bare raise preserves the traceback; failure triggers the DLQ
Deploy with `terraform init` followed by `terraform apply`.