
AWS Batch Analytics Interview Questions


Comprehensive interview preparation for batch processing, ETL pipelines, and analytics on AWS.

Module: AWS Data Engineering • Topic 51 of 65 • Premium Content

Batch Processing Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                    AWS Batch Analytics Architecture                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐       │
│  │   S3     │───▶│   EMR    │───▶│  Glue    │───▶│ Redshift │       │
│  │  Data    │    │  Spark   │    │  ETL     │    │  DW      │       │
│  │  Lake    │    │  Cluster │    │  Jobs    │    │          │       │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘       │
│       │               │               │               │             │
│       ▼               ▼               ▼               ▼             │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐       │
│  │  Athena  │    │ DynamoDB │    │  SQS     │    │QuickSight│       │
│  │  Query   │    │ Metadata │    │  Queue   │    │  Viz     │       │
│  └──────────┘    └──────────┘    └──────────┘    └──────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Q1: How do you design a fault-tolerant batch processing pipeline on AWS?

Answer:

A fault-tolerant batch pipeline requires multiple layers of protection:

1. Checkpointing Strategy:

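# Spark checkpointing on EMR: persist an intermediate DataFrame to S3 and truncate its lineage,
# so lost partitions are recomputed from the checkpoint rather than from the original source
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df = df.checkpoint()  # eager by default

# Glue job bookmarks (the job must run with --job-bookmark-option job-bookmark-enable)
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="mydb",
    table_name="mytable",
    transformation_ctx="datasource0"  # key under which this source's bookmark state is stored
)
job.commit()  # `job` is the awsglue Job; committing at the end persists the bookmark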

2. Dead Letter Queue Pattern:

Source → SQS Main Queue → Processing → Success → S3
                              ↓
                         Failure → DLQ → Alert → Manual Review
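A minimal in-memory sketch of the dead-letter pattern, assuming Python lists stand in for the SQS main queue and the DLQ. With real SQS, the move to the DLQ is done by the source queue's redrive policy once a message's receive count exceeds `maxReceiveCount`; `MAX_RECEIVES`, `drain`, and `parse_amount` are hypothetical names used only for illustration.

```python
from collections import deque

MAX_RECEIVES = 3  # hypothetical stand-in for the redrive policy's maxReceiveCount

def drain(messages, handler):
    """Process messages; park any that fail MAX_RECEIVES times in a dead-letter list."""
    main_queue = deque((msg, 0) for msg in messages)  # (message, receive count)
    succeeded, dead_letter = [], []
    while main_queue:
        msg, receives = main_queue.popleft()
        receives += 1
        try:
            succeeded.append(handler(msg))
        except Exception as exc:
            if receives >= MAX_RECEIVES:
                dead_letter.append((msg, repr(exc)))  # kept for alerting and manual review
            else:
                main_queue.append((msg, receives))    # retried on a later receive
    return succeeded, dead_letter

def parse_amount(msg):
    return float(msg["amount"])  # raises ValueError for malformed records

ok, dlq = drain([{"amount": "10.5"}, {"amount": "oops"}, {"amount": "2"}], parse_amount)
print(ok)        # [10.5, 2.0]
print(len(dlq))  # 1
```

The malformed record is retried twice and then parked, while the healthy records still complete, which is the point of the pattern: one bad input never blocks the batch.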

3. Retry Configuration:

# Step Functions retry logic
{
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
  "Retry": [
    {
      "ErrorEquals": ["States.TaskFailed"],
      "IntervalSeconds": 60,
      "MaxAttempts": 3,
      "BackoffRate": 2.0
    }
  ]
}
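With this Retry block, Step Functions waits `IntervalSeconds` before the first retry and multiplies the wait by `BackoffRate` for each later one, for at most `MaxAttempts` retries. The helper below is only an illustration (not part of any AWS SDK) and ignores the optional `MaxDelaySeconds` cap and jitter settings:

```python
def retry_delays(interval_seconds: float, max_attempts: int, backoff_rate: float) -> list:
    """Wait before each retry of a Step Functions Retrier (no MaxDelaySeconds cap, no jitter)."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]

delays = retry_delays(60, 3, 2.0)
print(delays)       # [60.0, 120.0, 240.0]
print(sum(delays))  # 420.0 seconds of waiting in the worst case
```

So a persistently failing EMR step is attempted four times in total (the original try plus three retries) over roughly seven minutes of back-off before the state fails.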

ℹ️

Key Interview Point: Always mention idempotency when discussing fault tolerance. Each batch job should produce the same result even if executed multiple times.

Q2: Compare EMR vs Glue for batch processing. When would you choose each?

Answer:

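Feature        | EMR                               | Glue
---------------|-----------------------------------|------------------------------------------------------
Control        | Full cluster control              | Serverless, managed
Cost Model     | EC2 instance-hours + EMR charge   | DPU-hours
Customization  | Custom JARs, bootstrap actions    | Spark scripts (PySpark/Scala), limited runtime control
Use Case       | Complex, long-running             | Standard ETL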

Choose EMR when:

  • Running custom Spark/Flink applications
  • Need specific instance types (GPU, memory-optimized)
  • Long-running clusters (hours/days)
  • Require fine-grained tuning

Choose Glue when:

  • Standard ETL transformations
  • Want serverless execution with auto-scaling
  • Want built-in crawlers and Data Catalog integration
  • Short-running jobs (minutes)
Decision Tree:
                    ┌──────────────────┐
                    │ Need custom JAR? │
                    └────────┬─────────┘
                             │
              ┌──────────────┴──────────────┐
              ▼                             ▼
             Yes                            No
              │                             │
              ▼                             ▼
         ┌─────────┐               ┌──────────────────┐
         │  EMR    │               │ Need real-time?  │
         └─────────┘               └────────┬─────────┘
                                            │
                             ┌──────────────┴──────────────┐
                             ▼                             ▼
                            Yes                            No
                             │                             │
                             ▼                             ▼
                        ┌─────────┐                   ┌─────────┐
                        │Kinesis/ │                   │  Glue   │
                        │ MSK     │                   └─────────┘
                        └─────────┘

Q3: How do you optimize Spark jobs on EMR for better performance?

Answer:

1. Instance Selection:

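# Graviton (m6g) instances usually offer better price/performance than comparable x86 types
aws emr create-cluster \
  --release-label emr-6.15.0 \
  --applications Name=Spark \
  --use-default-roles \
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m6g.xlarge \
    InstanceGroupType=CORE,InstanceCount=4,InstanceType=m6g.2xlarge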

2. Spark Configuration Tuning:

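# Starting-point Spark settings for batch jobs (spark-defaults.conf format; tune per workload)
# 200 is Spark's default; raise or lower it based on shuffle volume
spark.sql.shuffle.partitions=200
# Adaptive Query Execution coalesces small shuffle partitions at runtime
spark.sql.adaptive.enabled=true
spark.executor.memory=8g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=50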
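One way to sanity-check `spark.executor.memory` and `spark.executor.cores` against a node is to compute how many executors YARN can place on it. This sketch assumes Spark's default memory overhead of max(384 MiB, 10% of executor memory); the 24 GiB of YARN memory per node is an assumed figure, since EMR sets `yarn.nodemanager.resource.memory-mb` per instance type and you should read the real value from your cluster.

```python
import math

def executors_per_node(yarn_vcores: int, yarn_mem_gib: float,
                       executor_cores: int, executor_mem_gib: float) -> int:
    """How many executors fit on one node when both cores and memory are limits."""
    # Spark's default overhead: max(384 MiB, 10% of spark.executor.memory)
    overhead_gib = max(384 / 1024, 0.10 * executor_mem_gib)
    by_cores = yarn_vcores // executor_cores
    by_memory = math.floor(yarn_mem_gib / (executor_mem_gib + overhead_gib))
    return min(by_cores, by_memory)

# Assumed node: 8 vCPUs and 24 GiB of YARN memory (check yarn.nodemanager.resource.memory-mb)
per_node = executors_per_node(yarn_vcores=8, yarn_mem_gib=24, executor_cores=4, executor_mem_gib=8)
print(per_node, per_node * 4)  # 2 8  -> 2 executors per node, 8 across 4 core nodes
```

If the memory limit comes out lower than the core limit, vCPUs sit idle; shrinking `spark.executor.memory` slightly or choosing a memory-optimized instance family usually rebalances the two.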

3. Data Format Optimization:

# Write Parquet with Snappy compression (also Spark's default Parquet codec)
df.write.option("compression", "snappy").partitionBy("date").parquet("s3://bucket/output/")

# Aim for output files of roughly 128 MB - 1 GB each

4. Caching Strategy:

from pyspark import StorageLevel
# Cache DataFrames reused by several actions; call df.unpersist() when finished
df.cache()  # or df.persist(StorageLevel.MEMORY_AND_DISK)

⚠️

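Common Mistake: Over-partitioning data. Partitioning on too many or too fine-grained keys produces many small files, which adds S3 request, listing, and task-scheduling overhead. Aim for files of roughly 128 MB to 1 GB.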

Q4: Explain the Glue Job Bookmark mechanism and its benefits.

Answer:

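Glue job bookmarks persist state about data that earlier runs processed, so each run reads only new data:

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)  # loads this job's bookmark state

# Bookmarks are disabled by default; run the job with
# --job-bookmark-option job-bookmark-enable.
# transformation_ctx is the key under which this source's bookmark state is stored.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="mydb",
    table_name="mytable",
    transformation_ctx="datasource0"
)

# With bookmarks enabled, datasource0 holds only data not processed by earlier runs
apply_mapping = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("id", "string", "id", "string")],  # placeholder column mapping
    transformation_ctx="apply_mapping"
)

sink = glueContext.write_dynamic_frame.from_catalog(
    frame=apply_mapping,
    database="outputdb",
    table_name="outputtable",
    transformation_ctx="sink0"
)

job.commit()  # saves the new bookmark state; a run that fails before this does not advance it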

Bookmark Lifecycle:

Job Start → Read Bookmark → Filter New Data → Process → Write → Update Bookmark
                  ↑                                                    │
                  └────────────────────────────────────────────────────┘

Benefits:

  • Avoids reprocessing data that earlier successful runs already handled
  • Reduces runtime for incremental loads
  • No need for custom watermark tracking
  • Automatic state management
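The bookmark lifecycle can be modeled in a few lines of plain Python: remember which input objects were processed, read only the new ones, and save the updated state only after the whole run succeeds. This is a conceptual sketch with an in-memory dict as the state store; `run_job` and `file_name` are hypothetical helpers, and Glue's real bookmark state is kept by the service per `transformation_ctx` in its own internal format.

```python
def run_job(all_objects, bookmark_store, ctx, process):
    """Process only objects not yet recorded under `ctx`; save state only if every object succeeds."""
    seen = set(bookmark_store.get(ctx, []))
    new_objects = [key for key in all_objects if key not in seen]   # "Filter New Data"
    results = [process(key) for key in new_objects]                  # an exception skips the update
    bookmark_store[ctx] = sorted(seen | set(new_objects))            # "Update Bookmark"
    return results

def file_name(key):
    return key.rsplit("/", 1)[-1]

store = {}
print(run_job(["s3://raw/a.json", "s3://raw/b.json"], store, "datasource0", file_name))
# ['a.json', 'b.json']
print(run_job(["s3://raw/a.json", "s3://raw/b.json", "s3://raw/c.json"], store, "datasource0", file_name))
# ['c.json']
```

Because the state is written only at the end, a failed run leaves the bookmark where it was and the next run picks up the same unprocessed objects, mirroring why `job.commit()` sits at the end of a Glue script.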

Q5: How do you handle schema evolution in batch pipelines?

Answer:

1. Glue Schema Evolution:

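# Let the Glue job update the Data Catalog table schema (and partitions) as it writes
sink = glueContext.write_dynamic_frame.from_catalog(
    frame=apply_mapping,
    database="outputdb",
    table_name="outputtable",
    additional_options={
        "enableUpdateCatalog": True,
        # "UPDATE_IN_DATABASE" updates the schema; "LOG" leaves the table schema unchanged
        "updateBehavior": "UPDATE_IN_DATABASE"
    }
)
# Catalog updates are supported only for S3 targets and a limited set of formats (see the Glue docs)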

2. Spark Schema Evolution:

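# Read with mergeSchema to reconcile Parquet files written with different (compatible) schemas
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3://bucket/data/")

# With plain Parquet, mode("overwrite") replaces the existing files, so the merged
# schema simply becomes the output schema.
# (The "overwriteSchema" option applies to Delta Lake tables, not plain Parquet.)
df.write \
    .mode("overwrite") \
    .parquet("s3://bucket/output/")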

3. Schema Registry with Glue:

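# Register a new schema version in the AWS Glue Schema Registry
# (a separate feature from the Glue Data Catalog; the schema must already exist)
import json
import boto3

glue = boto3.client('glue')

# Example Avro schema (record and field names are placeholders)
avro_schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "status", "type": ["null", "string"], "default": None}
    ]
}

response = glue.register_schema_version(
    SchemaId={'RegistryName': 'my-registry', 'SchemaName': 'my-schema'},
    SchemaDefinition=json.dumps(avro_schema)
)
# The registry checks the new version against the schema's compatibility mode
print(response['VersionNumber'], response['Status'])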
Schema Evolution Flow:
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Source   │────▶│ Schema   │────▶│ Target   │
│ Schema   │     │ Registry │     │ Schema   │
└──────────┘     └──────────┘     └──────────┘
      │               │                │
      ▼               ▼                ▼
   Compare ──▶ Compatible? ──▶ Auto Merge
                   │
              Incompatible
                   │
                   ▼
              Alert & Stop
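The Compare / Compatible? step can be sketched as a check over simple `{column: (type, nullable)}` dictionaries. The policy here is an assumption for illustration, not Glue's or Avro's rule set: new nullable columns and the widenings int→bigint and float→double merge automatically; removed columns, other type changes, and new required columns stop the pipeline.

```python
# Allowed type widenings (assumed policy for this sketch)
WIDENINGS = {("int", "bigint"), ("float", "double")}

def check_compatible(old: dict, new: dict) -> list:
    """Return a list of problems; an empty list means the new schema can be auto-merged."""
    problems = []
    for name, (old_type, _) in old.items():
        if name not in new:
            problems.append(f"column removed: {name}")
        elif new[name][0] != old_type and (old_type, new[name][0]) not in WIDENINGS:
            problems.append(f"type change {old_type} -> {new[name][0]} on {name}")
    for name, (_, nullable) in new.items():
        if name not in old and not nullable:
            problems.append(f"new required column: {name}")
    return problems

def merge(old: dict, new: dict) -> dict:
    """Auto Merge: keep existing columns, take widened types, append new columns."""
    issues = check_compatible(old, new)
    if issues:
        raise ValueError("; ".join(issues))  # "Alert & Stop"
    return {**old, **new}

old = {"order_id": ("bigint", False), "qty": ("int", True)}
new = {"order_id": ("bigint", False), "qty": ("bigint", True), "coupon": ("string", True)}
print(merge(old, new))
# {'order_id': ('bigint', False), 'qty': ('bigint', True), 'coupon': ('string', True)}
```

Returning a list of problems rather than a boolean gives the alert step something concrete to report.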

Q6: Design a data quality framework for batch processing on AWS.

Answer:

Multi-Layer Quality Framework:

┌────────────────────────────────────────────────────────────┐
│                  Data Quality Framework                    │
├────────────────────────────────────────────────────────────┤
│  Layer 1: Schema Validation                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ • Column existence  • Data types  • Null checks     │   │
│  └─────────────────────────────────────────────────────┘   │
│  Layer 2: Statistical Validation                           │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ • Range checks  • Distribution  • Outlier detection │   │
│  └─────────────────────────────────────────────────────┘   │
│  Layer 3: Business Rules                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ • Referential integrity  • Cross-column logic       │   │
│  └─────────────────────────────────────────────────────┘   │
│  Layer 4: Completeness                                     │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ • Record counts  • Freshness  • Coverage            │   │
│  └─────────────────────────────────────────────────────┘   │
└────────────────────────────────────────────────────────────┘

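Implementation in PySpark (runs inside a Glue or EMR job):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def validate_data(df: DataFrame, rules: dict) -> dict:
    """Run schema, null, and range checks; return {check_name: passed}.

    Example rules (types are pyspark.sql.types instances):
        {"schema": {"order_id": LongType()},
         "not_null": ["order_id"],
         "range": {"amount": (0, 100000)}}
    """
    results = {}

    # Schema validation: the column must exist and have the expected Spark DataType
    for col_name, expected_type in rules.get('schema', {}).items():
        results[f'schema_{col_name}'] = (
            col_name in df.columns
            and df.schema[col_name].dataType == expected_type
        )

    # Null checks (each check runs one Spark job; combine them with agg() on large tables)
    for col_name in rules.get('not_null', []):
        null_count = df.filter(col(col_name).isNull()).count()
        results[f'null_{col_name}'] = null_count == 0

    # Range checks: any row outside [min_val, max_val] fails the rule
    for col_name, (min_val, max_val) in rules.get('range', {}).items():
        out_of_range = df.filter(
            (col(col_name) < min_val) | (col(col_name) > max_val)
        ).count()
        results[f'range_{col_name}'] = out_of_range == 0

    return results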
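The validation function above covers schema, null, and range rules. Layer 4 (completeness) usually compares a run against expectations instead, such as the previous run's record count and a freshness target. A minimal sketch, assuming a 20% volume tolerance and a 24-hour freshness window (both illustrative thresholds, not AWS defaults):

```python
from datetime import datetime, timedelta, timezone

def check_completeness(record_count: int, previous_count: int, latest_event: datetime,
                       now: datetime, tolerance: float = 0.20,
                       max_age: timedelta = timedelta(hours=24)) -> dict:
    """Layer 4 checks: non-empty output, volume versus the previous run, and freshness."""
    if previous_count:
        change = abs(record_count - previous_count) / previous_count
    else:
        change = 0.0 if record_count == 0 else float("inf")
    return {
        "non_empty": record_count > 0,
        "volume_within_tolerance": change <= tolerance,
        "fresh": now - latest_event <= max_age,
    }

now = datetime(2024, 1, 16, 6, 0, tzinfo=timezone.utc)
result = check_completeness(
    record_count=950_000,
    previous_count=1_000_000,
    latest_event=datetime(2024, 1, 15, 23, 30, tzinfo=timezone.utc),
    now=now,
)
print(result)  # {'non_empty': True, 'volume_within_tolerance': True, 'fresh': True}
```

A 5% drop passes the assumed 20% tolerance; a sudden halving of volume or a stale latest event would flip the corresponding check to False.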

Q7: How do you implement idempotency in AWS batch jobs?

Answer:

Idempotency Patterns:

1. Deterministic Output Paths:

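# Overwrite only the partitions present in df, so a re-run replaces the same day's data
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write.mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .parquet("s3://bucket/output/")  # writes s3://bucket/output/year=.../month=.../day=.../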
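Why partition overwrite is idempotent while append is not can be shown with a toy in-memory table keyed by partition (a dict standing in for S3 prefixes; `write_partitions` is a hypothetical helper, not a Spark API):

```python
def write_partitions(table: dict, rows: list, mode: str) -> None:
    """Write rows into table[(year, month, day)] using 'append' or dynamic 'overwrite'."""
    batches = {}
    for row in rows:
        key = (row["year"], row["month"], row["day"])
        batches.setdefault(key, []).append(row)
    for key, batch in batches.items():
        if mode == "overwrite":
            table[key] = list(batch)                 # replace only partitions present in rows
        else:
            table.setdefault(key, []).extend(batch)  # append: every re-run adds duplicates

rows = [{"year": 2024, "month": 1, "day": 15, "id": 1},
        {"year": 2024, "month": 1, "day": 15, "id": 2}]

overwritten, appended = {}, {}
for _ in range(2):  # simulate a job that is re-run after a failure
    write_partitions(overwritten, rows, "overwrite")
    write_partitions(appended, rows, "append")

print(len(overwritten[(2024, 1, 15)]), len(appended[(2024, 1, 15)]))  # 2 4
```

Running the overwrite path any number of times leaves the same two rows, which is exactly the "same result even if executed multiple times" property.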

2. Upsert Pattern with DynamoDB:

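import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processed_records')

def upsert_record(record):
    """Write the record only if it is new or newer than the stored version."""
    try:
        table.put_item(
            Item=record,
            ConditionExpression='attribute_not_exists(record_id) OR updated_at < :ts',
            ExpressionAttributeValues={':ts': record['updated_at']}
        )
    except ClientError as e:
        # A replayed or stale record fails the condition; treat that as a no-op
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise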
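The condition expression is what makes replays harmless: a write succeeds only for a new `record_id` or a strictly newer `updated_at`. The same rule in plain Python, with a dict standing in for the DynamoDB table (an illustration of the logic, not the DynamoDB API; `conditional_put` is a hypothetical name):

```python
def conditional_put(table: dict, record: dict) -> bool:
    """Mimic attribute_not_exists(record_id) OR updated_at < :ts; return True if written."""
    current = table.get(record["record_id"])
    if current is None or current["updated_at"] < record["updated_at"]:
        table[record["record_id"]] = record
        return True
    return False  # the real put_item raises ConditionalCheckFailedException here

table = {}
v1 = {"record_id": "r1", "updated_at": "2024-01-15T10:00:00Z", "amount": 10}
v2 = {"record_id": "r1", "updated_at": "2024-01-15T12:00:00Z", "amount": 12}
print([conditional_put(table, r) for r in (v1, v2, v1, v2)])  # [True, True, False, False]
print(table["r1"]["amount"])  # 12
```

ISO-8601 timestamps in one fixed format compare correctly as strings, which is why the comparison works on `updated_at` without parsing.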

3. Glue Bookmark Reset:

# Reset bookmark for reprocessing
aws glue reset-job-bookmark --job-name my-job

4. S3 Versioning for Rollback:

import boto3

# Enable versioning so overwritten or deleted objects can be restored
s3 = boto3.client('s3')
s3.put_bucket_versioning(
    Bucket='my-bucket',
    VersioningConfiguration={'Status': 'Enabled'}
)

⚠️

Critical: Always design batch jobs to be re-runnable. If a job fails midway, re-running should not create duplicates or corrupt data.

Q8: Explain the cost optimization strategies for EMR clusters.

Answer:

1. Instance Fleet with Spot:

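# Keep the master and part of CORE On-Demand: Spot in CORE risks HDFS data loss on
# interruption, so many teams put most Spot capacity in a TASK fleet instead
aws emr create-cluster \
  --release-label emr-6.15.0 \
  --applications Name=Spark \
  --use-default-roles \
  --instance-fleets '[
  {
    "InstanceFleetType": "MASTER",
    "TargetOnDemandCapacity": 1,
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
  },
  {
    "InstanceFleetType": "CORE",
    "TargetOnDemandCapacity": 2,
    "TargetSpotCapacity": 8,
    "InstanceTypeConfigs": [
      {"InstanceType": "m5.xlarge", "WeightedCapacity": 1},
      {"InstanceType": "m5.2xlarge", "WeightedCapacity": 2}
    ]
  }
]'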

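2. EMR Managed Scaling:

# EMR resizes the cluster within these limits based on workload metrics.
# MaximumOnDemandCapacityUnits caps On-Demand units; capacity above it comes from Spot.
aws emr put-managed-scaling-policy \
  --cluster-id j-XXXXXXXXXXXX \
  --managed-scaling-policy '{
    "ComputeLimits": {
      "UnitType": "InstanceFleetUnits",
      "MinimumCapacityUnits": 2,
      "MaximumCapacityUnits": 20,
      "MaximumOnDemandCapacityUnits": 2
    }
  }'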

3. S3 Storage Classes:

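import boto3
s3 = boto3.client('s3')

# Tagging alone moves nothing; it lets a lifecycle rule target the object
s3.put_object_tagging(Bucket='my-bucket', Key='processed/data.parquet',
                      Tagging={'TagSet': [{'Key': 'tier', 'Value': 'archive'}]})

# Lifecycle rule: transition tagged objects to Glacier 30 days after creation
s3.put_bucket_lifecycle_configuration(Bucket='my-bucket', LifecycleConfiguration={'Rules': [{
    'ID': 'archive-processed',
    'Filter': {'Tag': {'Key': 'tier', 'Value': 'archive'}},
    'Status': 'Enabled',
    'Transitions': [{'Days': 30, 'StorageClass': 'GLACIER'}],
}]})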

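Cost Comparison (illustrative us-east-1 Linux EC2 rates; Spot prices fluctuate, and EMR adds its own per-instance charge on top):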

┌─────────────────┬────────────┬────────────┬─────────────┐
│ Instance Type   │ On-Demand  │ 1yr RI     │ Spot        │
├─────────────────┼────────────┼────────────┼─────────────┤
│ m5.xlarge       │ $0.192/hr  │ $0.121/hr  │ $0.058/hr   │
│ m5.2xlarge      │ $0.384/hr  │ $0.242/hr  │ $0.116/hr   │
│ r5.xlarge       │ $0.252/hr  │ $0.159/hr  │ $0.076/hr   │
└─────────────────┴────────────┴────────────┴─────────────┘
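Using the table's illustrative m5.xlarge rates, the hourly EC2 cost of the fleet from step 1 (one On-Demand master, 2 On-Demand core units, 8 Spot core units) can be compared with an all-On-Demand cluster. This assumes every Spot unit lands on m5.xlarge and ignores the EMR charge:

```python
# Illustrative hourly rates from the table above (USD, m5.xlarge)
ON_DEMAND = 0.192
SPOT = 0.058

def hourly_cost(on_demand_units: int, spot_units: int) -> float:
    """EC2-only hourly cost; excludes the EMR per-instance charge."""
    return on_demand_units * ON_DEMAND + spot_units * SPOT

mixed = hourly_cost(on_demand_units=1 + 2, spot_units=8)  # master + 2 core On-Demand, 8 Spot
all_on_demand = hourly_cost(on_demand_units=11, spot_units=0)
print(f"${mixed:.3f}/hr vs ${all_on_demand:.3f}/hr, {1 - mixed / all_on_demand:.0%} saved")
# $1.040/hr vs $2.112/hr, 51% saved
```

Roughly halving the EC2 bill this way depends on Spot capacity being available; the On-Demand core units are what keep the job alive when it is not.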

Q9: How do you monitor and alert on batch job failures?

Answer:

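EventBridge Rule for Glue Job Failures:

import json
import boto3

events = boto3.client('events')

# Glue emits "Glue Job State Change" events; match failed and timed-out runs
events.put_rule(
    Name='GlueJobFailure',
    EventPattern=json.dumps({
        'source': ['aws.glue'],
        'detail-type': ['Glue Job State Change'],
        'detail': {'state': ['FAILED', 'TIMEOUT']}
    })
)

# Send matching events to the alerts SNS topic
# (the topic policy must allow events.amazonaws.com to publish)
events.put_targets(
    Rule='GlueJobFailure',
    Targets=[{'Id': 'sns-alerts', 'Arn': 'arn:aws:sns:us-east-1:123456789:alerts'}]
)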

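Step Functions Error Handling (clusterId and stepArgs are assumed fields of the execution input):

{
  "Type": "Parallel",
  "Branches": [
    {
      "StartAt": "ProcessData",
      "States": {
        "ProcessData": {
          "Type": "Task",
          "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
          "Parameters": {
            "ClusterId.$": "$.clusterId",
            "Step": {
              "Name": "ProcessData",
              "ActionOnFailure": "CONTINUE",
              "HadoopJarStep": {"Jar": "command-runner.jar", "Args.$": "$.stepArgs"}
            }
          },
          "Catch": [
            {
              "ErrorEquals": ["States.ALL"],
              "Next": "NotifyFailure",
              "ResultPath": "$.error"
            }
          ],
          "End": true
        },
        "NotifyFailure": {
          "Type": "Task",
          "Resource": "arn:aws:states:::sns:publish",
          "Parameters": {
            "TopicArn": "arn:aws:sns:us-east-1:123456789:alerts",
            "Message.$": "States.JsonToString($.error)"
          },
          "Next": "JobFailed"
        },
        "JobFailed": {
          "Type": "Fail",
          "Error": "BatchStepFailed"
        }
      }
    }
  ],
  "End": true
}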

Dashboard Setup:

CloudWatch Dashboard: Batch Monitoring
┌───────────────┬─────────────┬──────────────┬────────────┐
│  Jobs Running │ Jobs Failed │ Avg Duration │ Cost Today │
│      12       │      1      │    45 min    │   $234     │
├───────────────┴─────────────┴──────────────┴────────────┤
│  [Chart: Job Duration Trend]                            │
│  [Chart: Error Rate by Job Type]                        │
│  [Chart: Cost by Cluster]                               │
└─────────────────────────────────────────────────────────┘

Q10: Describe the architecture for a petabyte-scale data warehouse on AWS.

Answer:

Architecture Components:

┌─────────────────────────────────────────────────────────────────┐
│              Petabyte-Scale Data Warehouse Architecture         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Ingestion Layer         Processing Layer    Storage            │
│  ┌─────────────┐         ┌─────────────┐     ┌─────────────┐    │
│  │ Kinesis     │────────▶│ EMR Spark   │────▶│ S3 Data Lake│    │
│  │ Streams     │         │             │     │ (Parquet)   │    │
│  └─────────────┘         └─────────────┘     └──────┬──────┘    │
│                                                     │           │
│  ┌─────────────┐         ┌─────────────┐            │           │
│  │ DMS         │────────▶│ Glue ETL    │────────────┤           │
│  │ (CDC)       │         │             │            │           │
│  └─────────────┘         └─────────────┘            │           │
│                                                     ▼           │
│  ┌─────────────┐         ┌─────────────┐     ┌─────────────┐    │
│  │ SFTP/FTP    │────────▶│ Lambda      │────▶│ Redshift    │    │
│  │             │         │ Parser      │     │ Spectrum    │    │
│  └─────────────┘         └─────────────┘     └──────┬──────┘    │
│                                                     │           │
│                                                ┌────▼─────┐     │
│                                                │QuickSight│     │
│                                                │ Athena   │     │
│                                                └──────────┘     │
└─────────────────────────────────────────────────────────────────┘

Redshift Configuration for Petabyte Scale:

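-- DISTKEY co-locates rows that join on customer_id; the sort key speeds date-range scans
CREATE TABLE fact_orders (
    order_id BIGINT NOT NULL,
    customer_id BIGINT NOT NULL,
    order_date TIMESTAMP NOT NULL,
    amount DECIMAL(10,2),
    status VARCHAR(20)
)
DISTSTYLE KEY
DISTKEY(customer_id)
COMPOUND SORTKEY(order_date, customer_id);

-- Redshift runs automatic VACUUM DELETE and sorting in the background;
-- a manual sort-only vacuum can still help right after a large load
VACUUM SORT ONLY fact_orders;

-- Report suggested column encodings (advisory only; the table is not changed)
ANALYZE COMPRESSION fact_orders;

-- Refresh planner statistics after large loads
ANALYZE fact_orders;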

Data Distribution Strategy:

┌─────────────────────────────────────────────────────────┐
│              Redshift Distribution Styles               │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  KEY Distribution        EVEN Distribution              │
│  ┌───┬───┬───┐           ┌───┬───┬───┐                  │
│  │ A │ B │ C │           │ 1 │ 2 │ 3 │                  │
│  │ A │ B │ C │           │ 4 │ 5 │ 6 │                  │
│  │ A │ B │ C │           │ 7 │ 8 │ 9 │                  │
│  └───┴───┴───┘           └───┴───┴───┘                  │
│  (Same key, same slice)  (Even spread)                  │
│                                                         │
│  ALL Distribution                                       │
│  ┌───┬───┬───┐                                          │
│  │ X │ X │ X │  (Small tables, copied to every node)    │
│  │ X │ X │ X │                                          │
│  └───┴───┴───┘                                          │
└─────────────────────────────────────────────────────────┘
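A toy model of the three styles makes the diagram concrete: KEY hashes the distribution key to pick a slice, EVEN deals rows out round-robin, and ALL keeps a full copy in every location. The slice count and `zlib.crc32` as the hash are assumptions for illustration; Redshift uses its own internal hash, and ALL actually stores one copy per node rather than per slice.

```python
import zlib
from collections import defaultdict
from typing import Optional

SLICES = 3  # assumed number of slices in this toy cluster

def distribute(rows: list, style: str, key: Optional[str] = None) -> dict:
    """Assign rows to slices the way KEY, EVEN, or ALL distribution would (conceptually)."""
    slices = defaultdict(list)
    for i, row in enumerate(rows):
        if style == "KEY":
            # crc32 stands in for Redshift's hash: the same key always maps to the same slice
            slices[zlib.crc32(str(row[key]).encode()) % SLICES].append(row)
        elif style == "EVEN":
            slices[i % SLICES].append(row)  # round-robin
        elif style == "ALL":
            for s in range(SLICES):         # full copy in every location
                slices[s].append(row)
        else:
            raise ValueError(f"unknown distribution style: {style}")
    return dict(slices)

orders = [{"order_id": n, "customer_id": c} for n, c in enumerate("AABBAC")]
for slice_id, rows in sorted(distribute(orders, "KEY", key="customer_id").items()):
    print(slice_id, sorted({r["customer_id"] for r in rows}))
print({s: len(r) for s, r in sorted(distribute(orders, "EVEN").items())})  # {0: 2, 1: 2, 2: 2}
```

KEY keeps each customer's orders together (good for joins on customer_id) at the risk of skew when one key dominates; EVEN balances storage but forces data movement for joins; ALL trades storage for join locality, which only pays off on small dimension tables.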

Q11: How do you implement data partitioning strategies for batch analytics?

Answer:

Partitioning Principles:

# Aim for output files of roughly 128 MB - 1 GB each.
# Example: ~1 TB of daily data at ~1 GB per file -> about 1,024 files per day
total_bytes = 1024 ** 4                       # 1 TiB of daily data
target_file_bytes = 1024 ** 3                 # 1 GiB target file size
num_files = total_bytes // target_file_bytes  # 1024 files (not 1024 Hive partitions)

# Partition the directory layout by date and region
df.write \
    .partitionBy("year", "month", "day", "region") \
    .parquet("s3://bucket/data/")

Partition Pruning:

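-- Effective: every date partition key is constrained, so only one day's partitions are read
SELECT * FROM events 
WHERE year = 2024 AND month = 1 AND day = 15;

-- Weaker: still pruned, but reads January partitions for every year and day
SELECT * FROM events 
WHERE month = 1;

-- Ineffective: no predicate on a partition column, so every partition is scanned
SELECT * FROM events
WHERE event_time >= TIMESTAMP '2024-01-15 00:00:00';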
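Partition pruning works on the Hive-style `key=value` directory names alone, before any data file is opened. The sketch below applies that idea to a list of made-up S3 prefixes; `parse_partition` and `prune` are hypothetical helpers, and real engines such as Athena or Spark prune using partition metadata from the Glue Data Catalog or the file listing.

```python
def parse_partition(prefix: str) -> dict:
    """Turn 'events/year=2024/month=1/day=15/' into {'year': '2024', 'month': '1', 'day': '15'}."""
    return dict(part.split("=", 1) for part in prefix.strip("/").split("/") if "=" in part)

def prune(prefixes: list, **predicates) -> list:
    """Keep only prefixes whose partition values match every equality predicate."""
    return [p for p in prefixes
            if all(parse_partition(p).get(k) == str(v) for k, v in predicates.items())]

prefixes = [f"events/year={y}/month={m}/day={d}/"
            for y in (2023, 2024) for m in (1, 2) for d in (14, 15)]

print(len(prune(prefixes, year=2024, month=1, day=15)))  # 1 of 8 prefixes read
print(len(prune(prefixes, month=1)))                     # 4 of 8: January of every year
print(len(prune(prefixes)))                              # 8 of 8: no partition predicate
```

The fewer partition keys a query constrains, the more prefixes survive the filter, and the more S3 data the engine has to scan.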

Partition Strategy Decision Tree:

┌─────────────────────────────────────────────────────────┐
│           Partitioning Decision Tree                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Query Pattern Analysis                                 │
│  ┌─────────────────────────────────────────────────┐    │
│  │ What columns are most frequently filtered?      │    │
│  └──────────────────────┬──────────────────────────┘    │
│                         │                               │
│         ┌───────────────┼───────────────┐               │
│         ▼               ▼               ▼               │
│    Time-based    Category-based      Hybrid             │
│    (date/hour)    (region/type)   (date+region)         │
│         │               │               │               │
│         ▼               ▼               ▼               │
│ High cardinality  Low cardinality   Balanced            │
│     (hourly)      (10-100 values) (daily+region)        │
└─────────────────────────────────────────────────────────┘

Q12: Explain the ETL vs ELT paradigm in AWS data pipelines.

Answer:

ETL (Extract, Transform, Load):

Source ──▶ Extract ──▶ Transform ──▶ Load ──▶ Target
                         │
                    (in processing
                     engine: Glue/EMR)

ELT (Extract, Load, Transform):

Source ──▶ Extract ──▶ Load ──▶ Transform ──▶ Target
                         │         │
                    (raw data    (in target
                     to S3)      warehouse)

When to Use Each:

Scenario                  | ETL | ELT
--------------------------|-----|-----
Small datasets (<100GB)   |  ✓  |
Large datasets (>1TB)     |     |  ✓
Complex transformations   |  ✓  |
Simple transformations    |     |  ✓
Need raw data retention   |     |  ✓
Cost-sensitive            |  ✓  |

AWS Implementation:

# ETL with Glue: transform before loading into Redshift
source = glueContext.create_dynamic_frame.from_catalog(...)
transformed = ApplyMapping.apply(frame=source, ...)
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=transformed,
    catalog_connection="redshift",
    connection_options={"dbtable": "fact_table", "database": "dw"},
    redshift_tmp_dir="s3://bucket/temp/"  # Glue stages data here and loads it with COPY
)

# ELT with Redshift Spectrum: land raw data in S3, transform inside Redshift
# 1. Load raw data to S3
raw_df.write.parquet("s3://bucket/raw/")

# 2. Transform in Redshift (spectrum_schema is an external schema over s3://bucket/raw/)
transform_sql = """
CREATE TABLE fact_table AS
SELECT * FROM spectrum_schema.raw_data
WHERE is_valid = true;
"""

Q13: How do you handle late-arriving data in batch processing?

Answer:

Late Data Strategies:

1. Watermark-Based Processing:

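from pyspark.sql.functions import window

# Spark Structured Streaming: accept events up to 1 hour late;
# later events for an already-finalized window can be dropped
streaming_df \
    .withWatermark("event_time", "1 hour") \
    .groupBy(
        window("event_time", "1 hour"),
        "category"
    ) \
    .count()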
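The watermark rule itself is simple: track the maximum event time seen so far and treat an event as too late once it is older than that maximum minus the allowed delay. A plain-Python sketch of just that rule, with the simplification (noted in the docstring) that the watermark advances after every event, whereas Spark advances it between micro-batches and keeps window state:

```python
from datetime import datetime, timedelta

def apply_watermark(events, delay=timedelta(hours=1)):
    """Split (event_time, payload) pairs, in arrival order, into accepted and too-late lists.

    Simplification: the watermark advances after every event, whereas Spark
    advances it between micro-batches.
    """
    max_seen = None
    accepted, dropped = [], []
    for event_time, payload in events:
        if max_seen is not None and event_time < max_seen - delay:
            dropped.append(payload)  # older than (max event time - delay)
        else:
            accepted.append(payload)
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return accepted, dropped

def at(hour, minute):
    return datetime(2024, 1, 15, hour, minute)

arrivals = [(at(10, 0), "a"), (at(11, 30), "b"), (at(10, 45), "c"), (at(10, 15), "d")]
print(apply_watermark(arrivals))  # (['a', 'b', 'c'], ['d'])
```

Event `c` is 45 minutes behind the newest event and still lands; `d` is 75 minutes behind and falls past the 1-hour watermark, which is the kind of record a batch backfill has to pick up.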

2. Lambda Architecture:

┌─────────────────────────────────────────────────────────┐
│                  Lambda Architecture                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Real-time Layer       Batch Layer                      │
│  ┌─────────────┐       ┌─────────────┐                  │
│  │  Kinesis    │       │  S3 + EMR   │                  │
│  │  Streams    │       │  Daily Job  │                  │
│  └──────┬──────┘       └──────┬──────┘                  │
│         │                     │                         │
│         ▼                     ▼                         │
│  ┌─────────────┐       ┌─────────────┐                  │
│  │  Real-time  │       │  Batch      │                  │
│  │  Views      │       │  Views      │                  │
│  └──────┬──────┘       └──────┬──────┘                  │
│         │                     │                         │
│         └──────────┬──────────┘                         │
│                    ▼                                    │
│             ┌─────────────┐                             │
│             │   Serving   │                             │
│             │   Layer     │                             │
│             └─────────────┘                             │
└─────────────────────────────────────────────────────────┘

3. Backfill Mechanism:

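from datetime import date, timedelta

# Process late data by re-running specific partitions
# (process_partition is the job-specific step, not shown; it must overwrite its partition)
def process_date_range(start_date: date, end_date: date):
    for offset in range((end_date - start_date).days + 1):
        day = start_date + timedelta(days=offset)
        process_partition(day)  # reprocess the partition for `day`

# Schedule backfill via Step Functions (Map state; input such as
# {"partitions": ["2024-01-14", "2024-01-15"]})
{
  "Type": "Map",
  "ItemsPath": "$.partitions",
  "ItemProcessor": {
    "ProcessorConfig": {"Mode": "INLINE"},
    "StartAt": "ProcessPartition",
    "States": {
      "ProcessPartition": {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
          "FunctionName": "processLambda",
          "Payload.$": "$"
        },
        "End": true
      }
    }
  },
  "End": true
}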

Q14: Design a metadata-driven batch processing framework.

Answer:

Metadata Store Design:

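-- Logical schema of the job config store (the code below keeps it in a DynamoDB table,
-- where only job_id is declared as the key and the other attributes are schemaless)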
CREATE TABLE job_config (
    job_id VARCHAR(50) PRIMARY KEY,
    source_type VARCHAR(20),
    source_path VARCHAR(500),
    target_path VARCHAR(500),
    transform_type VARCHAR(50),
    schedule VARCHAR(20),
    enabled BOOLEAN,
    last_run TIMESTAMP,
    next_run TIMESTAMP
);

Dynamic Job Execution:

import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')
config_table = dynamodb.Table('job_config')

def lambda_handler(event, context):
    # Scan for enabled jobs, following pagination (each scan page returns at most 1 MB)
    scan_kwargs = {'FilterExpression': Attr('enabled').eq(True)}
    while True:
        response = config_table.scan(**scan_kwargs)
        for job in response['Items']:
            execute_job(job)
        if 'LastEvaluatedKey' not in response:
            break
        scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

def execute_job(job_config):
    # Dynamic ETL based on configuration; run_*_job are job-specific functions
    runners = {
        'aggregation': run_aggregation_job,
        'join': run_join_job,
        'filter': run_filter_job,
    }
    runner = runners.get(job_config['transform_type'])
    if runner is None:
        raise ValueError(f"Unknown transform_type: {job_config['transform_type']}")
    runner(job_config)
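The dispatcher runs every enabled job on each invocation. The `next_run` column in the schema suggests a further filter: run only the jobs that are due. The sketch below assumes `next_run` is stored as an ISO-8601 string with a UTC offset, because DynamoDB has no native timestamp type. It also assumes items use the attribute names from the `job_config` schema.

```python
from datetime import datetime, timezone

NEVER_RUN = datetime.min.replace(tzinfo=timezone.utc)


def next_run_of(job):
    """Parse next_run (ISO-8601 with UTC offset); jobs never scheduled count as due."""
    value = job.get("next_run")
    return datetime.fromisoformat(value) if value else NEVER_RUN


def select_due_jobs(jobs, now=None):
    """Return enabled jobs whose next_run is at or before `now`, most overdue first."""
    now = now or datetime.now(timezone.utc)
    due = [job for job in jobs if job.get("enabled") and next_run_of(job) <= now]
    return sorted(due, key=next_run_of)


if __name__ == "__main__":
    jobs = [
        {"job_id": "daily_sales", "enabled": True, "next_run": "2024-01-01T02:00:00+00:00"},
        {"job_id": "weekly_rollup", "enabled": True, "next_run": "2024-01-07T02:00:00+00:00"},
        {"job_id": "legacy_export", "enabled": False, "next_run": "2023-12-01T00:00:00+00:00"},
    ]
    now = datetime(2024, 1, 2, tzinfo=timezone.utc)
    print([j["job_id"] for j in select_due_jobs(jobs, now)])  # → ['daily_sales']
```

After a job finishes, the dispatcher would write back `last_run` and a new `next_run`. How `next_run` is derived from `schedule` depends on the schedule format you choose.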

Orchestration with Step Functions:

Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│              Metadata-Driven Orchestration              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────┐                                        │
│  │ DynamoDB    │                                        │
│  │ Config      │                                        │
│  └──────┬──────┘                                        │
│         │                                               │
│         ▼                                               │
│  ┌─────────────┐                                        │
│  │ Lambda      │                                        │
│  │ Dispatcher  │                                        │
│  └──────┬──────┘                                        │
│         │                                               │
│     ┌───┴────┬────────┬────────┐                        │
│     ▼        ▼        ▼        ▼                        │
│  ┌─────┐  ┌─────┐  ┌─────┐  ┌─────┐                     │
│  │Job A│  │Job B│  │Job C│  │Job D│                     │
│  └──┬──┘  └──┬──┘  └──┬──┘  └──┬──┘                     │
│     │        │        │        │                        │
│     └────────┴─────┬──┴────────┘                        │
│                    │                                    │
│                    ▼                                    │
│             ┌─────────────┐                             │
│             │ Status      │                             │
│             │ Update      │                             │
│             └─────────────┘                             │
└─────────────────────────────────────────────────────────┘

Q15: How do you implement data lineage tracking in batch pipelines?

Answer:

Lineage Tracking Components:

import boto3
from datetime import datetime, timezone

# Custom lineage tracker: records inputs, outputs and transformations for one job run
class LineageTracker:
    def __init__(self, job_name):
        self.job_name = job_name
        self.lineage = {
            'job': job_name,
            'inputs': [],
            'outputs': [],
            'transformations': [],
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

    def add_input(self, source, record_count):
        self.lineage['inputs'].append({'source': source, 'record_count': record_count})

    def add_output(self, target, record_count):
        self.lineage['outputs'].append({'target': target, 'record_count': record_count})

    def add_transformation(self, description):
        self.lineage['transformations'].append(description)

    def save_lineage(self):
        # Store in DynamoDB; assumes the data_lineage table is keyed on job (partition key)
        # and timestamp (sort key), so each run becomes a separate item
        table = boto3.resource('dynamodb').Table('data_lineage')
        table.put_item(Item=self.lineage)
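Once lineage records accumulate, the usual question is which sources feed a given dataset. The sketch below answers it by walking the records as a graph. It assumes each record has the `inputs`/`outputs` shape produced by `LineageTracker` and uses only the `source` and `target` names. The dataset URIs in the example are illustrative.

```python
from collections import deque


def upstream_sources(lineage_records, dataset):
    """Return every dataset that transitively feeds `dataset`."""
    # Map each output target to the set of inputs that produced it
    parents = {}
    for record in lineage_records:
        inputs = {i["source"] for i in record["inputs"]}
        for output in record["outputs"]:
            parents.setdefault(output["target"], set()).update(inputs)

    # Breadth-first walk from the dataset back towards raw sources (seen guards against cycles)
    seen, queue = set(), deque([dataset])
    while queue:
        for parent in parents.get(queue.popleft(), ()):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen


if __name__ == "__main__":
    records = [
        {"job": "clean_orders", "inputs": [{"source": "s3://raw/orders"}],
         "outputs": [{"target": "s3://curated/orders"}]},
        {"job": "daily_sales", "inputs": [{"source": "s3://curated/orders"},
                                          {"source": "s3://curated/customers"}],
         "outputs": [{"target": "redshift://dw/fact_sales"}]},
    ]
    print(sorted(upstream_sources(records, "redshift://dw/fact_sales")))
    # → ['s3://curated/customers', 's3://curated/orders', 's3://raw/orders']
```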

AWS Native Lineage:

Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│                  AWS Lineage Tracking                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Glue Data Catalog ──────▶ Table/partition metadata     │
│         │                                               │
│         ├──▶ Table version (schema) history             │
│         ├──▶ Job run history                            │
│         └──▶ Transformation details (job scripts)       │
│                                                         │
│  CloudTrail ─────────────▶ API call audit trail         │
│         │                                               │
│         └──▶ S3, EMR, Glue operations                   │
│                                                         │
│  CloudWatch ─────────────▶ Job metrics and logs         │
│         │                                               │
│         └──▶ Execution flow                             │
└─────────────────────────────────────────────────────────┘

ℹ️

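Interview Tip: For managed lineage, mention Amazon DataZone, which captures OpenLineage-compatible lineage from sources such as AWS Glue and Amazon Redshift. AWS Lake Formation governs access (including column-level permissions) rather than tracking lineage.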

Q16: Explain the concept of data skew and how to handle it in Spark.

Answer:

Data Skew Detection:

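# Detect skew by checking how rows are distributed across key values
df.groupBy("key").count().orderBy("count", ascending=False).show()

# If a few keys hold far more rows than the rest, joins and aggregations on that key are skewed:
# all rows for a key land in one partition, so one task runs much longer than the others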
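One common way to turn "far more rows" into a number is to compare the heaviest key with the median key. This mirrors the median-based rule that AQE applies to partition sizes. The pure-Python sketch below works on the `(key, count)` pairs that the `groupBy` query returns. The default factor of 5 is an illustrative threshold.

```python
from statistics import median


def skew_ratio(key_counts):
    """Heaviest key's row count divided by the median key's row count (1.0 = perfectly even)."""
    counts = [count for _, count in key_counts]
    if not counts:
        return 0.0
    return max(counts) / median(counts)


def skewed_keys(key_counts, factor=5.0):
    """Keys whose row count exceeds `factor` times the median key's row count."""
    counts = [count for _, count in key_counts]
    if not counts:
        return []
    threshold = factor * median(counts)
    return [key for key, count in key_counts if count > threshold]


if __name__ == "__main__":
    # (key, count) pairs, e.g. from df.groupBy("key").count().collect()
    counts = [("US", 9_000), ("DE", 400), ("FR", 350), ("JP", 300), ("BR", 250)]
    print(round(skew_ratio(counts), 1))  # → 25.7
    print(skewed_keys(counts))           # → ['US']
```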

Handling Skew:

1. Salting Technique:

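# Salting a skewed join key: spread each hot key over SALT_BUCKETS sub-keys
from pyspark.sql.functions import array, col, concat, explode, lit, rand

SALT_BUCKETS = 10

def with_salted_key(df):
    return df.withColumn("salted_key", concat(col("key").cast("string"), lit("_"), col("salt").cast("string")))

# Large (skewed) side: each row gets one random salt in [0, SALT_BUCKETS)
salted_large = with_salted_key(large_df.withColumn("salt", (rand() * SALT_BUCKETS).cast("int")))

# Other side: replicate each row once per salt value so every salted key still finds its match
salted_small = with_salted_key(small_df.withColumn("salt", explode(array(*[lit(i) for i in range(SALT_BUCKETS)]))))

# Join on the salted key; the hot key's rows are now split across SALT_BUCKETS tasks
result = salted_large.join(salted_small.drop("key", "salt"), "salted_key").drop("salted_key", "salt")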
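You can show why salting helps without a cluster. Hash partitioning sends every row with the same key to the same partition, while a salted key is spread across several partitions. The pure-Python simulation below assumes `zlib.crc32` as a stand-in for Spark's partitioning hash (Spark actually uses Murmur3), with 8 partitions and 10 salt values.

```python
import random
import zlib
from collections import Counter


def partition_of(key: str, num_partitions: int) -> int:
    """Stand-in for hash partitioning: CRC32 of the key modulo the partition count."""
    return zlib.crc32(key.encode()) % num_partitions


def partition_loads(keys, num_partitions):
    """Count rows per partition when rows are hash-partitioned by key."""
    return Counter(partition_of(k, num_partitions) for k in keys)


def salt_keys(keys, buckets, seed=0):
    """Append a random salt in [0, buckets) to each key, like the salted_key column."""
    rng = random.Random(seed)
    return [f"{k}_{rng.randrange(buckets)}" for k in keys]


if __name__ == "__main__":
    # 1,000 rows for one hot key plus 100 rows spread over ten ordinary keys
    keys = ["hot"] * 1000 + [f"k{i % 10}" for i in range(100)]
    before = partition_loads(keys, 8)
    after = partition_loads(salt_keys(keys, 10), 8)
    print("largest partition before salting:", max(before.values()))
    print("largest partition after salting: ", max(after.values()))
```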

2. Broadcast Join for Small Tables:

from pyspark.sql.functions import broadcast

# Broadcast small table to avoid shuffle
result = large_df.join(broadcast(small_df), "key")

3. AQE (Adaptive Query Execution):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Q17: How do you implement a data warehouse slowly changing dimension (SCD) on AWS?

Answer:

SCD Type 2 Implementation:

-- Redshift SCD Type 2
CREATE TABLE dim_customer_scd (
    customer_sk BIGINT IDENTITY(1,1),  -- Surrogate key
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(200),
    effective_date TIMESTAMP,
    end_date TIMESTAMP,
    is_current BOOLEAN
);

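-- SCD Type 2 load. A plain MERGE on customer_id can either expire a matched row or
-- insert its new version, not both, so run two statements in one transaction.
-- SYSDATE is the transaction start time, so end_date and effective_date line up.
BEGIN;

-- 1. Expire current rows whose tracked attributes changed
--    (<> ignores NULL-to-value changes; wrap nullable columns in COALESCE if needed)
UPDATE dim_customer_scd
SET is_current = FALSE,
    end_date = SYSDATE
FROM staging_customer s
WHERE dim_customer_scd.customer_id = s.customer_id
  AND dim_customer_scd.is_current = TRUE
  AND (dim_customer_scd.name <> s.name OR dim_customer_scd.email <> s.email);

-- 2. Insert a current version for changed customers (expired above) and new customers
INSERT INTO dim_customer_scd (customer_id, name, email, effective_date, end_date, is_current)
SELECT s.customer_id, s.name, s.email, SYSDATE, NULL, TRUE
FROM staging_customer s
LEFT JOIN dim_customer_scd t
  ON t.customer_id = s.customer_id AND t.is_current = TRUE
WHERE t.customer_id IS NULL;

COMMIT;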
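SCD Type 2 rules are easiest to check on a tiny in-memory example. The two rules are: expire the current row when a tracked attribute changes, then insert a new current version for changed and brand-new keys. The pure-Python sketch below applies them. The `DimRow` type and its field names mirror `dim_customer_scd` but are illustrative, and the surrogate key is omitted.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class DimRow:
    customer_id: int
    name: str
    email: str
    effective_date: datetime
    end_date: Optional[datetime]
    is_current: bool


def apply_scd2(dim: List[DimRow], staging: List[dict], now: datetime) -> List[DimRow]:
    staged = {s["customer_id"]: s for s in staging}
    result = []
    # Step 1: expire current rows whose tracked attributes changed
    for row in dim:
        s = staged.get(row.customer_id)
        if row.is_current and s and (s["name"], s["email"]) != (row.name, row.email):
            result.append(replace(row, end_date=now, is_current=False))
        else:
            result.append(row)
    # Step 2: insert a current version for every staged key that no longer has one
    current_ids = {r.customer_id for r in result if r.is_current}
    for cid, s in staged.items():
        if cid not in current_ids:
            result.append(DimRow(cid, s["name"], s["email"], now, None, True))
    return result


if __name__ == "__main__":
    t0, t1 = datetime(2024, 1, 1), datetime(2024, 2, 1)
    dim = [DimRow(1, "Ann", "ann@old.example", t0, None, True),
           DimRow(2, "Bob", "bob@example.com", t0, None, True)]
    staging = [{"customer_id": 1, "name": "Ann", "email": "ann@new.example"},
               {"customer_id": 2, "name": "Bob", "email": "bob@example.com"},
               {"customer_id": 3, "name": "Cy", "email": "cy@example.com"}]
    for row in apply_scd2(dim, staging, t1):
        print(row.customer_id, row.email, row.is_current)
```

Customer 1 ends up with two rows: the expired old email and a current new one. Customer 2 is untouched, and customer 3 is inserted as current.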

Glue SCD Implementation:

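# Note: ResolveChoice resolves ambiguous column types; it does not implement SCD logic.
# One option in Glue: Spark SQL MERGE on an Apache Iceberg table. Assumes a job with Iceberg
# enabled (--datalake-formats iceberg), the Data Catalog configured as the "glue_catalog"
# Spark catalog, and an Iceberg dim_customer_scd table without the Redshift surrogate key.
spark = glueContext.spark_session
staging_dyf.toDF().createOrReplaceTempView("staging")  # staging_dyf: incoming DynamicFrame

spark.sql("""
MERGE INTO glue_catalog.dw.dim_customer_scd t
USING (
    SELECT s.customer_id AS merge_key, s.* FROM staging s
    UNION ALL
    -- Changed rows again with a NULL key, so they fall through to INSERT as the new version
    SELECT NULL AS merge_key, s.* FROM staging s
    JOIN glue_catalog.dw.dim_customer_scd d
      ON s.customer_id = d.customer_id AND d.is_current = true
    WHERE d.name <> s.name OR d.email <> s.email
) u
ON t.customer_id = u.merge_key AND t.is_current = true
WHEN MATCHED AND (t.name <> u.name OR t.email <> u.email) THEN
    UPDATE SET is_current = false, end_date = current_timestamp()
WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, effective_date, end_date, is_current)
    VALUES (u.customer_id, u.name, u.email, current_timestamp(), NULL, true)
""")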

Q18: Describe an architecture for near-real-time (micro-batch) aggregation on AWS.

Answer:

Micro-Batch Aggregation Architecture:

Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│         Near-Real-Time Micro-Batch Aggregation          │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌───────────────┐                                      │
│  │ Event Sources │                                      │
│  │ (IoT/Apps)    │                                      │
│  └───────┬───────┘                                      │
│          │                                              │
│          ▼                                              │
│  ┌───────────────┐     ┌───────────────┐                │
│  │ Kinesis Data  │────▶│ Lambda        │                │
│  │ Streams       │     │ Aggregation   │                │
│  └───────────────┘     └───────┬───────┘                │
│                                │                        │
│                ┌───────────────┼───────────────┐        │
│                ▼               ▼               ▼        │
│           ┌─────────┐     ┌─────────┐     ┌─────────┐   │
│           │ 1-min   │     │ 5-min   │     │ 1-hour  │   │
│           │ Agg     │     │ Agg     │     │ Agg     │   │
│           └────┬────┘     └────┬────┘     └────┬────┘   │
│                │               │               │        │
│                ▼               ▼               ▼        │
│           ┌─────────────────────────────────────────┐   │
│           │        S3 (Partitioned by time)         │   │
│           └────────────────────┬────────────────────┘   │
│                                │                        │
│                                ▼                        │
│           ┌─────────────────────────────────────────┐   │
│           │            Athena / Redshift            │   │
│           └─────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

Lambda Aggregation Code:

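import base64
import json
import uuid
from datetime import datetime, timezone

import boto3

s3 = boto3.client('s3')
BUCKET = 'analytics-aggregates'  # hypothetical bucket name

def lambda_handler(event, context):
    # Aggregate the records delivered in this micro-batch (Kinesis Data Streams event source)
    aggregated = {}

    for record in event['Records']:
        # Kinesis payloads arrive base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        metrics = aggregated.setdefault(data['category'], {'count': 0, 'sum': 0})
        metrics['count'] += 1
        metrics['sum'] += data['amount']

    # Write aggregated results, partitioned by processing time
    now = datetime.now(timezone.utc)
    for category, metrics in aggregated.items():
        write_to_s3(category, metrics, now)

def write_to_s3(category, metrics, ts):
    # Unique object name per write so concurrent shard invocations never overwrite each other
    key = f"aggregates/dt={ts:%Y-%m-%d}/hour={ts:%H}/{category}-{uuid.uuid4().hex}.json"
    s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps({'category': category, **metrics}))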
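The diagram fans the stream out into 1-minute, 5-minute and 1-hour rollups. The pure-Python sketch below shows that tumbling-window bucketing. It assumes each event carries an epoch-seconds `ts` field, which is a hypothetical name; the Lambda above only uses `category` and `amount`.

```python
from collections import defaultdict

WINDOWS = {"1-min": 60, "5-min": 300, "1-hour": 3600}


def window_start(ts: int, size_s: int) -> int:
    """Floor an epoch timestamp to the start of its tumbling window."""
    return ts - ts % size_s


def rollup(events):
    """Aggregate count and sum per (window size, window start, category)."""
    out = defaultdict(lambda: {"count": 0, "sum": 0})
    for e in events:
        for name, size in WINDOWS.items():
            bucket = out[(name, window_start(e["ts"], size), e["category"])]
            bucket["count"] += 1
            bucket["sum"] += e["amount"]
    return dict(out)


if __name__ == "__main__":
    base = 1_699_999_200  # an exact multiple of 3600, so all three windows start here
    events = [
        {"ts": base + 10, "category": "books", "amount": 12},
        {"ts": base + 50, "category": "books", "amount": 8},
        {"ts": base + 70, "category": "books", "amount": 5},
    ]
    for key, metrics in sorted(rollup(events).items()):
        print(key, metrics)
```

Each window's results would be written under its own time-partitioned S3 prefix. Late events for an already-written window need an upsert or a later compaction pass.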

Q19: How do you optimize SQL queries for large-scale batch analytics?

Answer:

Query Optimization Techniques:

1. Materialized Views:

-- Create materialized view for frequent aggregations
CREATE MATERIALIZED VIEW mv_daily_sales AS
SELECT 
    date_trunc('day', order_date) as sale_date,
    region,
    product_category,
    SUM(amount) as total_sales,
    COUNT(*) as order_count
FROM fact_orders
GROUP BY 1, 2, 3;

-- Refresh periodically
REFRESH MATERIALIZED VIEW mv_daily_sales;

2. Sort Key Optimization:

-- Compound sort key for multi-column filters
CREATE TABLE events (
    event_id BIGINT,
    event_date TIMESTAMP,
    user_id BIGINT,
    event_type VARCHAR(50)
)
COMPOUND SORTKEY(event_date, user_id);

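-- Interleaved sort key for filters on any of the columns independently
-- (higher maintenance cost: needs VACUUM REINDEX; compound keys suit most workloads)
CREATE TABLE logs (
    log_id BIGINT,
    log_ts TIMESTAMP,  -- avoids using the reserved word TIMESTAMP as a column name
    user_id BIGINT,
    action VARCHAR(100)
)
INTERLEAVED SORTKEY(log_ts, user_id, action);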
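Sort keys pay off because Redshift keeps min/max values for each storage block (zone maps) and skips blocks whose range cannot match the filter. The pure-Python simulation below shows the effect. It assumes a toy block size of 4 rows instead of Redshift's 1 MB blocks.

```python
import random


def zone_maps(values, block_size):
    """Split a column into blocks and record each block's (min, max)."""
    blocks = [values[i:i + block_size] for i in range(0, len(values), block_size)]
    return [(min(b), max(b)) for b in blocks]


def blocks_scanned(maps, target):
    """Count blocks whose [min, max] range could contain the target value."""
    return sum(1 for lo, hi in maps if lo <= target <= hi)


if __name__ == "__main__":
    days = [d for d in range(1, 31) for _ in range(4)]  # 30 days x 4 rows = 120 rows
    unsorted = days[:]
    random.Random(7).shuffle(unsorted)
    for label, column in (("sorted", days), ("unsorted", unsorted)):
        maps = zone_maps(column, block_size=4)
        print(label, blocks_scanned(maps, target=15), "of", len(maps), "blocks scanned")
```

On the sorted column a filter on day 15 touches a single block. On the shuffled column most blocks span day 15, so most must be read.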

3. Query Execution Plan:

-- Analyze query plan
EXPLAIN 
SELECT * FROM events 
WHERE event_date = '2024-01-01' 
AND user_id = 12345;

Performance Comparison:

Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│      Query Performance Optimization (illustrative)      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Without Optimization         With Optimization         │
│  ┌─────────────────────┐      ┌─────────────────────┐   │
│  │ Full table scan     │      │ Partition pruning   │   │
│  │ 2.5 hours           │      │ 15 minutes          │   │
│  │ $150 cost           │      │ $15 cost            │   │
│  └─────────────────────┘      └─────────────────────┘   │
│                                                         │
│  Techniques Applied:                                    │
│  ✓ Sort keys aligned with query patterns                │
│  ✓ Materialized views for aggregations                  │
│  ✓ Partition pruning                                    │
│  ✓ Compression encoding                                 │
└─────────────────────────────────────────────────────────┘

Q20: Explain how to implement data quality SLAs in batch processing.

Answer:

SLA Framework:

from datetime import datetime, timedelta

class SLAMonitor:
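    def __init__(self, job_name, sla_config):
        # sla_config example: {'deadline': datetime(2024, 1, 1, 6, 0),
        #                      'max_freshness': timedelta(hours=2), 'min_completeness': 0.99}
        self.job_name = job_name
        self.sla_config = sla_config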
    
    def check_sla(self, job_metrics):
        violations = []
        
        # Check completion time
        if job_metrics['end_time'] > self.sla_config['deadline']:
            violations.append({
                'type': 'COMPLETION_TIME',
                'expected': self.sla_config['deadline'],
                'actual': job_metrics['end_time']
            })
        
        # Check data freshness
        freshness = datetime.now() - job_metrics['last_record_time']
        if freshness > self.sla_config['max_freshness']:
            violations.append({
                'type': 'DATA_FRESHNESS',
                'expected': self.sla_config['max_freshness'],
                'actual': freshness
            })
        
        # Check completeness (guard against a zero expected count)
        expected = job_metrics['expected']
        completeness = job_metrics['processed'] / expected if expected else 1.0
        if completeness < self.sla_config['min_completeness']:
            violations.append({
                'type': 'COMPLETENESS',
                'expected': self.sla_config['min_completeness'],
                'actual': completeness
            })
        
        return violations

CloudWatch SLA Metrics:

import boto3

cloudwatch = boto3.client('cloudwatch')

# Publish SLA metrics
cloudwatch.put_metric_data(
    Namespace='BatchAnalytics/SLA',
    MetricData=[
        {
            'MetricName': 'SLAViolations',
            'Dimensions': [
                {'Name': 'JobName', 'Value': 'daily_etl'},
                {'Name': 'ViolationType', 'Value': 'completion_time'}
            ],
            'Value': 1,
            'Unit': 'Count'
        }
    ]
)
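The metric payload can be generated from whatever `check_sla` returns instead of hard-coding a single violation. The sketch below does that. It assumes each violation dict carries a `type` key, as in `SLAMonitor`, and reuses the `SLAViolations` metric name and dimensions shown above.

```python
from collections import Counter


def build_sla_metric_data(job_name, violations):
    """Turn a list of SLA violations into CloudWatch MetricData entries (one per type)."""
    counts = Counter(v["type"].lower() for v in violations)
    return [
        {
            "MetricName": "SLAViolations",
            "Dimensions": [
                {"Name": "JobName", "Value": job_name},
                {"Name": "ViolationType", "Value": violation_type},
            ],
            "Value": count,
            "Unit": "Count",
        }
        for violation_type, count in sorted(counts.items())
    ]


if __name__ == "__main__":
    violations = [{"type": "COMPLETION_TIME"}, {"type": "COMPLETENESS"}]
    for entry in build_sla_metric_data("daily_etl", violations):
        print(entry["Dimensions"][1]["Value"], entry["Value"])
```

Pass the list as `MetricData` to `cloudwatch.put_metric_data(Namespace='BatchAnalytics/SLA', MetricData=...)`. An empty violation list yields no entries, so publish explicit zeros if an alarm must tell "no violations" apart from "no data".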

Q21: How do you implement data replication for disaster recovery in batch systems?

Answer:

Cross-Region Replication Architecture:

Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                 Disaster Recovery Architecture                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Primary Region (us-east-1)     DR Region (us-west-2)           │
│  ┌─────────────────────┐        ┌─────────────────────┐         │
│  │ S3 Bucket           │───────▶│ S3 Replica Bucket   │         │
│  │ (Versioning Enabled)│  CRR   │ (Versioning Enabled)│         │
│  └─────────────────────┘        └─────────────────────┘         │
│                                                                 │
│  ┌─────────────────────┐        ┌─────────────────────┐         │
│  │ Redshift Cluster    │───────▶│ Redshift Snapshot   │         │
│  │                     │ Cross- │ Copy                │         │
│  └─────────────────────┘ Region └─────────────────────┘         │
│                                                                 │
│  ┌─────────────────────┐        ┌─────────────────────┐         │
│  │ DynamoDB Table      │───────▶│ DynamoDB Replica    │         │
│  │                     │ Global │                     │         │
│  └─────────────────────┘ Tables └─────────────────────┘         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

S3 Cross-Region Replication:

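# Enable CRR via boto3 (versioning must already be enabled on both buckets).
# Only objects written after this is enabled replicate; use S3 Batch Replication for existing ones.
import boto3

s3 = boto3.client('s3')

s3.put_bucket_replication(
    Bucket='primary-bucket',
    ReplicationConfiguration={
        # The role ARN includes the account ID (placeholder shown)
        'Role': 'arn:aws:iam::123456789012:role/s3-replication-role',
        'Rules': [
            {
                'ID': 'replicate-all',
                'Priority': 1,
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},  # empty prefix = all objects
                'DeleteMarkerReplication': {'Status': 'Disabled'},
                'Destination': {
                    'Bucket': 'arn:aws:s3:::dr-bucket',
                    'StorageClass': 'STANDARD_IA'
                }
            }
        ]
    }
)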

Automated Failover:

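# Route 53 failover: PRIMARY answers while its health check passes, SECONDARY otherwise
import boto3

route53 = boto3.client('route53')

def failover_record(set_id, role, target, health_check_id=None):
    record = {
        'Name': 'analytics.example.com',
        'Type': 'CNAME',
        'SetIdentifier': set_id,
        'Failover': role,  # 'PRIMARY' or 'SECONDARY'
        'TTL': 60,
        'ResourceRecords': [{'Value': target}],
    }
    if health_check_id:
        record['HealthCheckId'] = health_check_id
    return {'Action': 'UPSERT', 'ResourceRecordSet': record}

route53.change_resource_record_sets(
    HostedZoneId='Z1234567890',
    ChangeBatch={'Changes': [
        failover_record('primary', 'PRIMARY',
                        'primary-cluster.xxx.us-east-1.redshift.amazonaws.com',
                        health_check_id='<primary-health-check-id>'),
        failover_record('secondary', 'SECONDARY',
                        'dr-cluster.xxx.us-west-2.redshift.amazonaws.com'),
    ]}
)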

Q22: Design a multi-tenant batch processing system on AWS.

Answer:

Multi-Tenant Architecture:

Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                  Multi-Tenant Batch Processing                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Tenant A          Tenant B          Tenant C                   │
│  ┌─────────┐       ┌─────────┐       ┌─────────┐                │
│  │ Config  │       │ Config  │       │ Config  │                │
│  │ Table   │       │ Table   │       │ Table   │                │
│  └────┬────┘       └────┬────┘       └────┬────┘                │
│       │                 │                 │                     │
│       ▼                 ▼                 ▼                     │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                  Job Dispatcher (Lambda)                  │  │
│  └────┬─────────────────┬─────────────────┬──────────────────┘  │
│       │                 │                 │                     │
│       ▼                 ▼                 ▼                     │
│  ┌─────────┐       ┌─────────┐       ┌─────────┐                │
│  │ EMR     │       │ EMR     │       │ EMR     │                │
│  │ Cluster │       │ Cluster │       │ Cluster │                │
│  │ (VPC A) │       │ (VPC B) │       │ (VPC C) │                │
│  └────┬────┘       └────┬────┘       └────┬────┘                │
│       │                 │                 │                     │
│       ▼                 ▼                 ▼                     │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │       S3 (Partitioned by tenant_id/year/month/day/)       │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Tenant Isolation:

# IAM policy for tenant isolation
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::data-lake/${aws:PrincipalTag/tenant_id}/*"
        }
    ]
}
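The policy only isolates tenants if every job writes under the matching prefix. The sketch below is a path builder that enforces the `tenant_id/year/month/day/` layout from the diagram. It assumes tenant IDs are limited to lowercase letters, digits and hyphens, an illustrative rule that stops a crafted ID from escaping its prefix.

```python
import re
from datetime import date

TENANT_ID = re.compile(r"[a-z0-9][a-z0-9-]{0,62}")


def tenant_prefix(bucket: str, tenant_id: str, day: date) -> str:
    """Build the S3 prefix for one tenant's daily partition."""
    # Reject IDs such as "../other" or "a/b" that would leave the tenant's prefix
    if not TENANT_ID.fullmatch(tenant_id):
        raise ValueError(f"invalid tenant_id: {tenant_id!r}")
    return f"s3://{bucket}/{tenant_id}/{day:%Y/%m/%d}/"


if __name__ == "__main__":
    print(tenant_prefix("data-lake", "tenant-a", date(2024, 3, 5)))
    # → s3://data-lake/tenant-a/2024/03/05/
```

The tenant ID used here should be the same value that is set as the `tenant_id` principal tag. Then a job that builds a wrong path is still denied by IAM.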

Q23: How do you implement data versioning and rollback in batch pipelines?

Answer:

Data Versioning Strategy:

import json
import boto3

# S3-based versioning: each version is written once to its own prefix, and a small
# pointer object tells readers which version is current
class DataVersionManager:
    def __init__(self, bucket, base_path):
        self.bucket = bucket
        self.base_path = base_path
        self.s3 = boto3.client('s3')

    def create_version(self, dataset_name, version_info):
        version_id = f"v{version_info['major']}.{version_info['minor']}"
        version_path = f"{self.base_path}/{dataset_name}/{version_id}/"

        # Write data to the versioned path (write_data is job-specific, e.g. a Spark write)
        self.write_data(version_path, version_info['data'])

        # Publish the version only after the write has succeeded
        self.update_pointer(dataset_name, version_id)

        return version_id

    def rollback(self, dataset_name, target_version):
        # Versions are never overwritten, so rolling back only repoints readers; no copy needed
        self.update_pointer(dataset_name, target_version)

    def update_pointer(self, dataset_name, version_id):
        # Readers resolve the current version by reading this pointer object first
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.base_path}/{dataset_name}/_CURRENT",
            Body=json.dumps({'version': version_id}),
        )
Glue Versioning:

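import boto3

glue = boto3.client('glue')

# Get an earlier table definition from the Data Catalog's version history
response = glue.get_table_version(
    DatabaseName='mydb',
    TableName='mytable',
    VersionId='4'
)
old_table = response['TableVersion']['Table']

# Glue has no "restore version" call: re-apply the old definition with update_table.
# TableInput accepts only a subset of the fields that get_table_version returns.
# This restores metadata (schema, location, parameters), not the underlying data.
table_input_keys = {
    'Name', 'Description', 'Owner', 'LastAccessTime', 'LastAnalyzedTime', 'Retention',
    'StorageDescriptor', 'PartitionKeys', 'ViewOriginalText', 'ViewExpandedText',
    'TableType', 'Parameters', 'TargetTable',
}
table_input = {k: v for k, v in old_table.items() if k in table_input_keys}

glue.update_table(DatabaseName='mydb', TableInput=table_input)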

Q24: Explain the testing strategies for batch data pipelines.

Answer:

Testing Pyramid:

Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│                     Testing Pyramid                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│                    ╱╲                                   │
│                   ╱  ╲           E2E Tests              │
│                  ╱ E2E╲          (Step Functions)       │
│                 ╱──────╲                                │
│                ╱        ╲                               │
│               ╱          ╲       Integration Tests      │
│              ╱ Integration╲      (Glue + EMR)           │
│             ╱──────────────╲                            │
│            ╱                ╲    Unit Tests             │
│           ╱    Unit Tests    ╲   (PySpark functions)    │
│          ╱────────────────────╲                         │
└─────────────────────────────────────────────────────────┘

Unit Testing with PySpark:

import unittest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class TestTransformations(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .master("local[*]") \
            .appName("Unit Tests") \
            .getOrCreate()
    
    def test_data_quality_checks(self):
        # Create test data
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), True)
        ])
        
        test_data = [(1, "valid"), (2, None), (3, "")]
        df = self.spark.createDataFrame(test_data, schema)
        
        # Apply transformation
        result = df.filter(df.name.isNotNull() & (df.name != ""))
        
        # Assert
        self.assertEqual(result.count(), 1)
    
    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

if __name__ == '__main__':
    unittest.main()

Integration Testing:

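import time
import boto3

glue = boto3.client('glue')

def run_glue_job(job_name, timeout_s=1800, poll_s=30):
    # Start the job and poll until it reaches a terminal state
    run_id = glue.start_job_run(JobName=job_name)['JobRunId']
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        state = glue.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['JobRunState']
        if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED'):
            return state
        time.sleep(poll_s)
    raise TimeoutError(f"{job_name} run {run_id} did not finish within {timeout_s}s")

def test_etl_pipeline():
    # Setup (create_test_input, upload_to_s3 and read_from_s3 are test-suite helpers)
    input_data = create_test_input()
    upload_to_s3(input_data, 's3://test-bucket/input/')
    # Run the Glue job and require a successful run
    assert run_glue_job('test-etl-job') == 'SUCCEEDED'
    # Verify output
    output = read_from_s3('s3://test-bucket/output/')
    assert output.count() > 0
    assert 'transformed_column' in output.columns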

Q25: How do you implement observability in batch processing systems?

Answer:

Observability Stack:

Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                   Observability Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Metrics Layer         Logging Layer         Tracing Layer      │
│  ┌─────────────┐       ┌─────────────┐       ┌─────────────┐    │
│  │ CloudWatch  │       │ CloudWatch  │       │ X-Ray       │    │
│  │ Metrics     │       │ Logs        │       │ Tracing     │    │
│  └──────┬──────┘       └──────┬──────┘       └──────┬──────┘    │
│         │                     │                     │           │
│         ▼                     ▼                     ▼           │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   Centralized Dashboard                   │  │
│  │    ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │  │
│  │    │ Job Health  │   │ Data Quality│   │ Cost        │    │  │
│  │    │ Status      │   │ Metrics     │   │ Tracking    │    │  │
│  │    └─────────────┘   └─────────────┘   └─────────────┘    │  │
│  └──────┬─────────────────────┬─────────────────────┬────────┘  │
│         │                     │                     │           │
│         ▼                     ▼                     ▼           │
│  ┌─────────────┐       ┌─────────────┐       ┌─────────────┐    │
│  │ Alerts      │       │ Anomaly     │       │ Root Cause  │    │
│  │ (SNS)       │       │ Detection   │       │ Analysis    │    │
│  └─────────────┘       └─────────────┘       └─────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Custom Metrics:

import boto3
from datetime import datetime

class BatchMetrics:
    def __init__(self, namespace):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace
    
    def record_job_metrics(self, job_name, duration, records_processed, success):
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    'MetricName': 'JobDuration',
                    'Dimensions': [
                        {'Name': 'JobName', 'Value': job_name}
                    ],
                    'Value': duration,
                    'Unit': 'Seconds'
                },
                {
                    'MetricName': 'RecordsProcessed',
                    'Dimensions': [
                        {'Name': 'JobName', 'Value': job_name}
                    ],
                    'Value': records_processed,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'JobSuccess',
                    'Dimensions': [
                        {'Name': 'JobName', 'Value': job_name}
                    ],
                    'Value': 1 if success else 0,
                    'Unit': 'Count'
                }
            ]
        )
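`record_job_metrics` needs a duration, a record count and a success flag. A context manager can capture all three around the job body, so a failed run is still reported. The sketch below assumes a `publish` callback that takes the same parameters as `BatchMetrics.record_job_metrics`. In production that would be the bound method; here a list collector stands in.

```python
import time
from contextlib import contextmanager


@contextmanager
def track_job(job_name, publish):
    """Time a job body and publish duration, record count and success, even on failure."""
    stats = {"records": 0}
    start = time.monotonic()
    success = False
    try:
        yield stats  # the job body updates stats["records"]
        success = True
    finally:
        publish(job_name, time.monotonic() - start, stats["records"], success)


if __name__ == "__main__":
    published = []

    def collect(*args):
        published.append(args)

    with track_job("daily_etl", collect) as stats:
        stats["records"] = 1200

    try:
        with track_job("broken_etl", collect):
            raise RuntimeError("source missing")
    except RuntimeError:
        pass

    for name, duration, records, ok in published:
        print(name, records, ok)
```

In a real job you would write `with track_job('daily_etl', metrics.record_job_metrics) as stats:`. The exception still propagates, so the orchestrator sees the failure after the metric is sent.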

Structured Logging:

import json
from datetime import datetime

class StructuredLogger:
    def log_job_event(self, event_type, job_name, details):
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'job_name': job_name,
            'details': details,
            'environment': 'production'
        }
        
        # CloudWatch Logs Insights compatible format
        print(json.dumps(log_entry))

ℹ️

Key Interview Point: Always discuss the three pillars of observability: metrics, logs, and traces. Each provides different insights into batch pipeline behavior.

Summary

Mastering AWS batch analytics requires understanding:

  • Service Selection: EMR vs Glue vs Redshift based on use case
  • Performance Optimization: Partitioning, caching, and resource tuning
  • Fault Tolerance: Checkpointing, retries, and idempotency
  • Cost Management: Spot instances, reserved capacity, and right-sizing
  • Data Quality: Validation frameworks and SLA monitoring
  • Observability: Metrics, logging, and tracing across the pipeline

These concepts form the foundation for designing scalable, reliable, and cost-effective batch processing systems on AWS.
