Data Lakehouse with Apache Iceberg
ACID Transactions + Time Travel + Schema Evolution on S3
Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS
Build a production data lakehouse that combines data lake flexibility with warehouse reliability using Apache Iceberg, with support for time travel queries and schema evolution.
Project Overview
Problem Statement
Traditional data lakes suffer from data quality issues, lack of ACID transactions, and poor query performance. Organizations need a solution that combines the cost-effectiveness of data lakes with the reliability of data warehouses.
Objectives
- Implement ACID transactions on S3 data lake
- Enable time travel and data versioning
- Support schema evolution without data rewriting
- Keep interactive query latency low on large (up to petabyte-scale) tables through partition pruning and file compaction
- Maintain data governance and audit trails
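The time travel objective rests on one idea: every Iceberg commit produces a new immutable snapshot, and a time travel read resolves to the latest snapshot committed at or before the requested time. The toy model below illustrates that lookup in plain Python. `ToyTable`, `Snapshot`, and the epoch-millisecond timestamps are hypothetical names chosen for illustration, not Iceberg APIs, and real snapshots point at manifest files rather than holding copies of rows.

```python
from bisect import bisect_right
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: int        # commit time in epoch milliseconds
    rows: Dict[str, dict]    # full table state at this snapshot (toy model only)


class ToyTable:
    """Append-only snapshot log: each commit adds a new snapshot."""

    def __init__(self) -> None:
        self.snapshots: List[Snapshot] = []

    def commit(self, committed_at: int, upserts: Dict[str, dict]) -> Snapshot:
        # Copy the previous state so earlier snapshots are never mutated.
        base = dict(self.snapshots[-1].rows) if self.snapshots else {}
        base.update(upserts)
        snap = Snapshot(len(self.snapshots) + 1, committed_at, base)
        self.snapshots.append(snap)
        return snap

    def as_of(self, timestamp_ms: int) -> Optional[Snapshot]:
        # Latest snapshot committed at or before the requested time.
        # Assumes commits arrive in increasing time order.
        times = [s.committed_at for s in self.snapshots]
        idx = bisect_right(times, timestamp_ms)
        return self.snapshots[idx - 1] if idx else None


table = ToyTable()
table.commit(1_000, {"CUST-1": {"email": "a@example.com"}})
table.commit(2_000, {"CUST-1": {"email": "b@example.com"},
                     "CUST-2": {"email": "c@example.com"}})

old = table.as_of(1_500)
print(old.snapshot_id, old.rows["CUST-1"]["email"])  # 1 a@example.com
```

A read "as of" 1,500 ms still sees the first email address because the second commit did not rewrite the first snapshot, which is the property the audit trail objective depends on.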
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Table Format | Apache Iceberg | ACID, time travel, schema evolution |
| Storage | AWS S3 | Scalable, durable object storage |
| Compute | AWS Glue + EMR | Serverless and cluster processing |
| Query Engine | Athena + Spark SQL | Interactive and batch queries |
| Catalog | AWS Glue Data Catalog | Metadata management |
| Orchestration | AWS Step Functions | Pipeline orchestration |
Architecture Diagram
Data Source Setup and Schema
Iceberg Table Schemas
# schemas/iceberg_tables.py
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType,
FloatType, TimestampType, BooleanType, ArrayType,
MapType, DecimalType
)
class IcebergSchemaManager:
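def __init__(self, spark: SparkSession, catalog_name: str = "glue_catalog"):
# The default must match the Spark catalog configured on the cluster (spark.sql.catalog.glue_catalog)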
self.spark = spark
self.catalog = catalog_name
self.database = "ecommerce_lakehouse"
def create_database(self):
"""Create the database if it doesn't exist."""
self.spark.sql(f"""
CREATE DATABASE IF NOT EXISTS {self.catalog}.{self.database}
COMMENT 'E-commerce data lakehouse database'
LOCATION 's3://ecommerce-lakehouse/data/{self.database}/'
""")
def create_raw_customers(self):
"""Create raw customers table (Bronze layer)."""
schema = StructType([
StructField("customer_id", StringType(), False),
StructField("email", StringType(), False),
StructField("first_name", StringType(), True),
StructField("last_name", StringType(), True),
StructField("phone", StringType(), True),
StructField("address", MapType(StringType(), StringType()), True),
StructField("created_at", StringType(), False),
StructField("updated_at", StringType(), False),
StructField("source_system", StringType(), False),
StructField("_ingestion_timestamp", TimestampType(), False)
])
self.spark.sql(f"""
CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.raw_customers (
customer_id STRING,
email STRING,
first_name STRING,
last_name STRING,
phone STRING,
address MAP<STRING, STRING>,
created_at STRING,
updated_at STRING,
source_system STRING,
_ingestion_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(_ingestion_timestamp))
LOCATION 's3://ecommerce-lakehouse/raw/customers/'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
'write.target-file-size-bytes' = '134217728',
'history.expire.max-snapshot-age-ms' = '604800000',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '100'
)
""")
def create_raw_products(self):
"""Create raw products table (Bronze layer)."""
self.spark.sql(f"""
CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.raw_products (
product_id STRING,
sku STRING,
name STRING,
description STRING,
category STRING,
subcategory STRING,
price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INT,
attributes MAP<STRING, STRING>,
created_at TIMESTAMP,
updated_at TIMESTAMP,
_ingestion_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (category, days(_ingestion_timestamp))
LOCATION 's3://ecommerce-lakehouse/raw/products/'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd'
)
""")
def create_raw_orders(self):
"""Create raw orders table (Bronze layer)."""
self.spark.sql(f"""
CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.raw_orders (
order_id STRING,
customer_id STRING,
order_status STRING,
order_date TIMESTAMP,
required_date TIMESTAMP,
shipped_date TIMESTAMP,
items ARRAY<STRUCT<
product_id: STRING,
quantity: INT,
unit_price: DECIMAL(10, 2),
discount: DECIMAL(5, 2)
>>,
shipping_address MAP<STRING, STRING>,
total_amount DECIMAL(12, 2),
_ingestion_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(order_date), order_status)
LOCATION 's3://ecommerce-lakehouse/raw/orders/'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
'write.distribution-mode' = 'hash'
)
""")
def create_curated_customers(self):
"""Create curated customers table (Silver layer)."""
self.spark.sql(f"""
CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.curated_customers (
customer_id STRING,
email STRING,
full_name STRING,
phone STRING,
city STRING,
state STRING,
country STRING,
customer_segment STRING,
lifetime_value DECIMAL(12, 2),
first_order_date TIMESTAMP,
last_order_date TIMESTAMP,
total_orders INT,
is_active BOOLEAN,
_valid_from TIMESTAMP,
_valid_to TIMESTAMP,
_is_current BOOLEAN
)
USING iceberg
PARTITIONED BY (customer_segment, country)
LOCATION 's3://ecommerce-lakehouse/curated/customers/'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'write.upsert.enabled' = 'true'
)
""")
def create_curated_orders(self):
"""Create curated orders table (Silver layer)."""
self.spark.sql(f"""
CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.curated_orders (
order_id STRING,
customer_id STRING,
customer_name STRING,
customer_email STRING,
order_status STRING,
order_date TIMESTAMP,
shipped_date TIMESTAMP,
delivery_date TIMESTAMP,
item_count INT,
total_amount DECIMAL(12, 2),
discount_amount DECIMAL(12, 2),
tax_amount DECIMAL(12, 2),
shipping_cost DECIMAL(10, 2),
net_amount DECIMAL(12, 2),
shipping_city STRING,
shipping_state STRING,
shipping_country STRING,
payment_method STRING,
_processing_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(order_date), shipping_country)
LOCATION 's3://ecommerce-lakehouse/curated/orders/'
""")
def create_aggregated_daily_metrics(self):
"""Create daily metrics aggregation (Gold layer)."""
self.spark.sql(f"""
CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.agg_daily_metrics (
metric_date DATE,
metric_type STRING,
dimension_key STRING,
dimension_value STRING,
metric_value DECIMAL(15, 2),
metric_count BIGINT,
_computed_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (metric_type, metric_date)
LOCATION 's3://ecommerce-lakehouse/aggregated/daily_metrics/'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'write.distribution-mode' = 'range'
)
""")
def create_all_tables(self):
"""Create all tables in the lakehouse."""
self.create_database()
self.create_raw_customers()
self.create_raw_products()
self.create_raw_orders()
self.create_curated_customers()
self.create_curated_orders()
self.create_aggregated_daily_metrics()
print("All Iceberg tables created successfully!")
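The table definitions above rely on a few values that are easier to review once decoded: `days(...)` partitions by the number of whole days since the Unix epoch, `write.target-file-size-bytes` is 128 MiB, and `history.expire.max-snapshot-age-ms` is 7 days. The sketch below reproduces that arithmetic in plain Python. The helper names (`days_transform`, `mib`, `days_to_ms`) are hypothetical, and treating naive datetimes as UTC is an assumption made for the example.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days_transform(ts: datetime) -> int:
    """Whole days between the Unix epoch and ts, the idea behind days(col)."""
    if ts.tzinfo is None:
        # Assumption for this sketch: naive timestamps are interpreted as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return (ts - EPOCH).days


def mib(n: int) -> int:
    """Convert mebibytes to bytes."""
    return n * 1024 * 1024


def days_to_ms(n: int) -> int:
    """Convert days to milliseconds."""
    return n * 24 * 60 * 60 * 1000


# Two events on the same calendar day land in the same partition value.
morning = datetime(2024, 1, 1, 8, 30)
evening = datetime(2024, 1, 1, 21, 15)
print(days_transform(morning), days_transform(evening))  # 19723 19723

print(mib(128))       # 134217728, the write.target-file-size-bytes value
print(days_to_ms(7))  # 604800000, the history.expire.max-snapshot-age-ms value
```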
Sample Data Generation
# data/generate_sample_data.py
import random
import uuid
from datetime import datetime, timedelta
from decimal import Decimal
import json
class SampleDataGenerator:
def __init__(self):
self.first_names = [
"James", "Mary", "Robert", "Patricia", "John", "Jennifer",
"Michael", "Linda", "David", "Elizabeth", "William", "Barbara",
"Richard", "Susan", "Joseph", "Jessica", "Thomas", "Sarah"
]
self.last_names = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia",
"Miller", "Davis", "Rodriguez", "Martinez", "Hernandez", "Lopez"
]
self.categories = [
"Electronics", "Clothing", "Home & Garden", "Sports",
"Books", "Toys", "Health", "Automotive"
]
self.subcategories = {
"Electronics": ["Phones", "Laptops", "Tablets", "Audio"],
"Clothing": ["Men", "Women", "Kids", "Accessories"],
"Home & Garden": ["Furniture", "Decor", "Kitchen", "Outdoor"],
"Sports": ["Fitness", "Outdoor", "Team Sports", "Water Sports"]
}
self.cities = [
("New York", "NY"), ("Los Angeles", "CA"), ("Chicago", "IL"),
("Houston", "TX"), ("Phoenix", "AZ"), ("Philadelphia", "PA")
]
def generate_customers(self, count: int = 1000) -> list:
"""Generate sample customer records."""
customers = []
for i in range(count):
first_name = random.choice(self.first_names)
last_name = random.choice(self.last_names)
city, state = random.choice(self.cities)
customer = {
"customer_id": f"CUST-{uuid.uuid4().hex[:8].upper()}",
"email": f"{first_name.lower()}.{last_name.lower()}{i}@email.com",
"first_name": first_name,
"last_name": last_name,
"phone": f"+1-{random.randint(200, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}",
"address": {
"street": f"{random.randint(100, 9999)} Main Street",
"city": city,
"state": state,
"zip": f"{random.randint(10000, 99999)}",
"country": "USA"
},
"created_at": (datetime.now() - timedelta(days=random.randint(1, 365))).isoformat(),
"updated_at": datetime.now().isoformat(),
"source_system": random.choice(["web", "mobile", "store"]),
"_ingestion_timestamp": datetime.now()
}
customers.append(customer)
return customers
def generate_products(self, count: int = 500) -> list:
"""Generate sample product records."""
products = []
for i in range(count):
category = random.choice(self.categories)
subcategory = random.choice(self.subcategories.get(category, ["General"]))
base_price = random.uniform(9.99, 999.99)
cost = base_price * random.uniform(0.3, 0.7)
product = {
"product_id": f"PROD-{uuid.uuid4().hex[:8].upper()}",
"sku": f"SKU-{category[:3].upper()}-{i:04d}",
"name": f"{category} {subcategory} Product {i}",
"description": f"High quality {category.lower()} product",
"category": category,
"subcategory": subcategory,
"price": Decimal(str(round(base_price, 2))),
"cost": Decimal(str(round(cost, 2))),
"stock_quantity": random.randint(0, 1000),
"attributes": {
"weight": f"{random.uniform(0.1, 50.0):.2f} kg",
"color": random.choice(["Black", "White", "Red", "Blue"]),
"warranty": f"{random.choice([12, 24, 36])} months"
},
"created_at": datetime.now() - timedelta(days=random.randint(1, 365)),
"updated_at": datetime.now(),
"_ingestion_timestamp": datetime.now()
}
products.append(product)
return products
def generate_orders(self, customers: list, products: list, count: int = 5000) -> list:
"""Generate sample order records."""
orders = []
statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]
for i in range(count):
customer = random.choice(customers)
num_items = random.randint(1, 5)
items = []
total = Decimal('0.00')
for _ in range(num_items):
product = random.choice(products)
quantity = random.randint(1, 3)
unit_price = product["price"]
discount = Decimal(str(round(random.uniform(0, 0.2), 2)))
item_total = unit_price * quantity * (1 - discount)
total += item_total
items.append({
"product_id": product["product_id"],
"quantity": quantity,
"unit_price": unit_price,
"discount": discount
})
order_date = datetime.now() - timedelta(
days=random.randint(0, 90),
hours=random.randint(0, 23),
minutes=random.randint(0, 59)
)
status = random.choice(statuses)
shipped_date = None
if status in ["shipped", "delivered"]:
shipped_date = order_date + timedelta(days=random.randint(1, 5))
order = {
"order_id": f"ORD-{uuid.uuid4().hex[:8].upper()}",
"customer_id": customer["customer_id"],
"order_status": status,
"order_date": order_date,
"required_date": order_date + timedelta(days=random.randint(3, 10)),
"shipped_date": shipped_date,
"items": items,
"shipping_address": customer["address"],
"total_amount": total,
"_ingestion_timestamp": datetime.now()
}
orders.append(order)
return orders
def generate_all_data(self):
"""Generate complete sample dataset."""
print("Generating customers...")
customers = self.generate_customers(1000)
print("Generating products...")
products = self.generate_products(500)
print("Generating orders...")
orders = self.generate_orders(customers, products, 5000)
return {
"customers": customers,
"products": products,
"orders": orders
}
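The generator imports `json` but never uses it, and the records it returns contain `Decimal` and `datetime` values that `json.dumps` rejects by default. If the sample data is meant to be staged as JSON Lines before ingestion, a small serializer hook is one way to handle that. This is a sketch under that assumption; `to_json_safe` and `to_json_lines` are hypothetical helpers, and encoding decimals as strings is a choice made here to avoid float rounding.

```python
import json
from datetime import datetime
from decimal import Decimal
from typing import Iterable


def to_json_safe(value):
    """Fallback used by json.dumps for types it cannot encode natively."""
    if isinstance(value, Decimal):
        return str(value)          # keep exact digits, no float rounding
    if isinstance(value, datetime):
        return value.isoformat()   # ISO 8601 text, e.g. 2024-01-01T12:00:00
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def to_json_lines(records: Iterable[dict]) -> str:
    """Serialize records as newline-delimited JSON, one record per line."""
    return "\n".join(
        json.dumps(record, default=to_json_safe, sort_keys=True)
        for record in records
    )


sample = [
    {"product_id": "PROD-0001", "price": Decimal("9.99"),
     "created_at": datetime(2024, 1, 1, 12, 0, 0)},
    {"product_id": "PROD-0002", "price": Decimal("120.00"),
     "created_at": datetime(2024, 1, 2, 9, 30, 0)},
]
lines = to_json_lines(sample)
print(lines)
```

Each output line can be parsed back with `json.loads`; prices come back as strings and need an explicit `Decimal(...)` conversion on read.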
Step-by-Step Implementation Guide
Step 1: EMR Cluster Setup with Iceberg
# cluster/emr_setup.py
import boto3
import json
import time
class EMRIcebergSetup:
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.emr_client = boto3.client('emr', region_name=region)
self.s3_client = boto3.client('s3', region_name=region)
def create_iceberg_cluster(self, config: dict) -> str:
"""Create EMR cluster with Iceberg support."""
cluster_name = config.get('cluster_name', 'iceberg-cluster')
response = self.emr_client.run_job_flow(
Name=cluster_name,
ReleaseLabel='emr-6.15.0',
Applications=[
{'Name': 'Spark'},
{'Name': 'JupyterEnterpriseGateway'},
{'Name': 'Livy'}
],
Configurations=[
{
'Classification': 'spark-defaults',
'Properties': {
'spark.sql.catalog.glue_catalog': 'org.apache.iceberg.spark.SparkCatalog',
'spark.sql.catalog.glue_catalog.catalog-impl': 'org.apache.iceberg.aws.glue.GlueCatalog',
'spark.sql.catalog.glue_catalog.io-impl': 'org.apache.iceberg.aws.s3.S3FileIO',
'spark.sql.catalog.glue_catalog.warehouse': config['warehouse_path'],
'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
'spark.sql.catalog.glue_catalog.spark.sql.events.enabled': 'true',
'spark.hadoop.hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.HiveMetastoreClientFactory',
'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
'spark.sql.sources.partitionOverwriteMode': 'dynamic'
}
},
{
'Classification': 'spark-env',
'Properties': {},
'Configurations': [
{
'Classification': 'export',
'Properties': {
'SPARK_DRIVER_MEMORY': '4g',
'SPARK_EXECUTOR_MEMORY': '8g'
}
}
]
},
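{
'Classification': 'hive-site',
'Properties': {
'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.HiveMetastoreClientFactory'
}
},
{
# Loads the Iceberg libraries bundled with EMR (available from release 6.5.0)
'Classification': 'iceberg-defaults',
'Properties': {
'iceberg.enabled': 'true'
}
}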
],
Instances={
'MasterInstanceType': 'm5.xlarge',
'SlaveInstanceType': 'r5.2xlarge',
'InstanceCount': config.get('instance_count', 3),
'Ec2KeyName': config.get('key_pair_name'),
'Ec2SubnetId': config.get('subnet_id'),
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': True
},
JobFlowRole='EMR_EC2_DefaultRole',
ServiceRole='EMR_DefaultRole',
VisibleToAllUsers=True,
LogUri=f"s3://{config['log_bucket']}/emr-logs/",
Tags=[
{'Key': 'Environment', 'Value': config.get('environment', 'production')},
{'Key': 'Project', 'Value': 'iceberg-lakehouse'}
]
)
cluster_id = response['JobFlowId']
print(f"Created EMR cluster: {cluster_id}")
# Wait for cluster to be ready
self._wait_for_cluster(cluster_id)
return cluster_id
def _wait_for_cluster(self, cluster_id: str, timeout: int = 1800):
"""Wait for EMR cluster to reach WAITING state."""
start_time = time.time()
while True:
response = self.emr_client.describe_cluster(ClusterId=cluster_id)
state = response['Cluster']['Status']['State']
if state == 'WAITING':
print(f"Cluster {cluster_id} is ready")
return
elif state in ['TERMINATED', 'TERMINATED_WITH_ERRORS']:
raise Exception(f"Cluster failed to start: {state}")
elapsed = time.time() - start_time
if elapsed > timeout:
raise TimeoutError(f"Cluster not ready within {timeout} seconds")
print(f"Cluster state: {state} ({elapsed:.0f}s elapsed)")
time.sleep(30)
def add_iceberg_step(self, cluster_id: str, script_path: str, args: list = None):
"""Add a Spark step to run Iceberg operations."""
step = {
'Name': 'Iceberg Processing Step',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
'--conf', 'spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog',
'--conf', 'spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog',
'--conf', 'spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO',
script_path
] + (args or [])
}
}
response = self.emr_client.add_job_flow_steps(
JobFlowId=cluster_id,
Steps=[step]
)
return response['StepIds'][0]
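The cluster configuration and the Spark step above repeat the same catalog settings under the `glue_catalog` name. One way to keep them in sync is to generate both from a single function, sketched below. `iceberg_catalog_conf` and `to_spark_submit_args` are hypothetical helpers, not part of boto3 or Iceberg; the property keys and class names are the ones already used in this guide, and the warehouse path is an example value.

```python
from typing import Dict, List


def iceberg_catalog_conf(catalog: str, warehouse: str) -> Dict[str, str]:
    """Spark properties for an Iceberg catalog backed by the Glue Data Catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.warehouse": warehouse,
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


def to_spark_submit_args(conf: Dict[str, str]) -> List[str]:
    """Flatten properties into repeated '--conf key=value' arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Example value only; substitute the real warehouse location.
conf = iceberg_catalog_conf("glue_catalog", "s3://ecommerce-lakehouse/")
submit_args = to_spark_submit_args(conf)
print(submit_args[:2])
```

The dictionary can be passed as the `Properties` of the `spark-defaults` classification, and the flattened list can be spliced into the step's `Args` between `spark-submit` and the script path.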
Step 2: Iceberg Operations Library
# iceberg/operations.py
from pyspark.sql import SparkSession
# Import the functions module under a namespace so that Spark's sum, max, and min
# do not shadow the Python built-ins of the same name
from pyspark.sql import functions as F
from datetime import datetime, timedelta
from typing import Optional, List, Dict
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IcebergOperations:
def __init__(self, spark: SparkSession, catalog: str = "glue_catalog",
database: str = "ecommerce_lakehouse"):
self.spark = spark
self.catalog = catalog
self.database = database
self.table_prefix = f"{catalog}.{database}"
def list_tables(self) -> List[str]:
"""List all tables in the database."""
tables_df = self.spark.sql(f"SHOW TABLES IN {self.table_prefix}")
return [row['tableName'] for row in tables_df.collect()]
def get_table_info(self, table_name: str) -> Dict:
"""Get detailed information about a table."""
full_table = f"{self.table_prefix}.{table_name}"
# Get table properties
props_df = self.spark.sql(f"DESCRIBE TABLE EXTENDED {full_table}")
properties = {row['col_name']: row['data_type'] for row in props_df.collect()
if row['col_name'] and not row['col_name'].startswith('#')}
# Get snapshot history
history_df = self.spark.sql(f"""
SELECT * FROM {full_table}.snapshots
ORDER BY committed_at DESC
LIMIT 10
""")
return {
'properties': properties,
'recent_snapshots': history_df.collect()
}
def merge_upsert(self, source_df, target_table: str,
join_keys: List[str], stage: str = "curated"):
"""Perform merge/upsert operation on Iceberg table."""
full_table = f"{self.table_prefix}.{stage}_{target_table}"
# Create temp view for source data
source_df.createOrReplaceTempView("source_updates")
merge_sql = f"""
MERGE INTO {full_table} AS target
USING source_updates AS source
ON {' AND '.join([f'target.{k} = source.{k}' for k in join_keys])}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
logger.info(f"Executing merge into {full_table}")
self.spark.sql(merge_sql)
logger.info(f"Merge completed for {full_table}")
def expire_old_snapshots(self, table_name: str, days_to_keep: int = 30):
"""Expire old snapshots to manage storage."""
full_table = f"{self.table_prefix}.{table_name}"
cutoff = (datetime.now() - timedelta(days=days_to_keep)).strftime("%Y-%m-%d %H:%M:%S")
# The procedure selects the snapshots itself; retain_last keeps the 10 most
# recent snapshots even when they are older than the cutoff
result = self.spark.sql(f"""
CALL {self.catalog}.system.expire_snapshots(
table => '{self.database}.{table_name}',
older_than => TIMESTAMP '{cutoff}',
retain_last => 10
)
""").collect()
# Log the counts returned by the procedure instead of a pre-computed estimate
summary = result[0].asDict() if result else {}
logger.info(f"Expired snapshots for {full_table}: {summary}")
def rewrite_data_files(self, table_name: str,
target_file_size_mb: int = 128):
"""Rewrite small files to optimize read performance."""
full_table = f"{self.table_prefix}.{table_name}"
logger.info(f"Starting data file rewrite for {full_table}")
self.spark.sql(f"""
CALL {self.catalog}.system.rewrite_data_files(
table => '{self.database}.{table_name}',
options => map(
'target-file-size-bytes', '{target_file_size_mb * 1024 * 1024}',
'min-input-files', '5'
)
)
""")
logger.info(f"Completed data file rewrite for {full_table}")
def rewrite_manifests(self, table_name: str):
"""Rewrite manifests to optimize metadata."""
full_table = f"{self.table_prefix}.{table_name}"
self.spark.sql(f"""
CALL {self.catalog}.system.rewrite_manifests(
table => '{self.database}.{table_name}'
)
""")
logger.info(f"Manifests rewritten for {full_table}")
def time_travel_query(self, table_name: str,
timestamp: Optional[str] = None,
snapshot_id: Optional[int] = None):
"""Query table at a specific point in time."""
full_table = f"{self.table_prefix}.{table_name}"
if timestamp is not None:
query = f"SELECT * FROM {full_table} TIMESTAMP AS OF '{timestamp}'"
elif snapshot_id is not None:
query = f"SELECT * FROM {full_table} VERSION AS OF {snapshot_id}"
else:
raise ValueError("Must specify timestamp or snapshot_id")
return self.spark.sql(query)
def compare_snapshots(self, table_name: str,
snapshot_id_1: int, snapshot_id_2: int,
key_column: str = "id"):
"""Compare data between two snapshots, matching rows on key_column.
Pass the table's business key (for example customer_id or order_id);
the default of "id" only fits tables that have such a column.
"""
full_table = f"{self.table_prefix}.{table_name}"
# Get data at each snapshot
df1 = self.spark.sql(f"""
SELECT * FROM {full_table} VERSION AS OF {snapshot_id_1}
""")
df2 = self.spark.sql(f"""
SELECT * FROM {full_table} VERSION AS OF {snapshot_id_2}
""")
# A left anti join keeps the rows whose key has no match in the other snapshot
added = df2.join(df1.select(key_column), on=key_column, how="left_anti")
removed = df1.join(df2.select(key_column), on=key_column, how="left_anti")
return {
'added': added,
'removed': removed,
'snapshot_1_count': df1.count(),
'snapshot_2_count': df2.count()
}
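The upsert in `merge_upsert` comes down to building one `MERGE INTO` statement from a list of join keys. Pulling the string construction into a pure function makes it testable without a Spark session. The sketch below assumes the source and target share column names, which is what the `UPDATE SET *` and `INSERT *` shorthands require; `build_merge_sql` is a hypothetical helper, not part of the library above.

```python
from typing import List


def build_merge_sql(target_table: str, source_view: str,
                    join_keys: List[str]) -> str:
    """Build a MERGE INTO statement that upserts source rows into the target."""
    if not join_keys:
        raise ValueError("At least one join key is required")
    # Match rows on every key column.
    on_clause = " AND ".join(f"target.{k} = source.{k}" for k in join_keys)
    return (
        f"MERGE INTO {target_table} AS target\n"
        f"USING {source_view} AS source\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


sql = build_merge_sql(
    "glue_catalog.ecommerce_lakehouse.curated_customers",
    "source_updates",
    ["customer_id"],
)
print(sql)
```

Table and view names are interpolated directly into the statement, so they should come from trusted configuration rather than user input.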
Step 3: Glue ETL Jobs
# etl/glue_etl_job.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
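import logging
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import *
# Logger used by the main ETL block below
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)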
# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'database', 'table', 's3_output_path'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
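# Configure Iceberg catalog.
# The Glue job also needs the --datalake-formats iceberg job parameter so that the
# Iceberg libraries are loaded. Static settings such as spark.sql.extensions cannot be
# changed on a running session and have to be supplied through --conf at job start.
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")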
def read_from_glue_catalog(database: str, table_name: str):
"""Read data from Glue Data Catalog."""
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
database=database,
table_name=table_name,
transformation_ctx="source_data"
)
return dynamic_frame
def transform_customers(dynamic_frame):
"""Transform and clean customer data."""
df = dynamic_frame.toDF()
# Clean and standardize
transformed_df = df \
.withColumn("email", F.lower(F.trim(col("email")))) \
.withColumn("full_name", F.concat_ws(" ", col("first_name"), col("last_name"))) \
.withColumn("phone", F.regexp_replace(col("phone"), r"[^0-9+]", "")) \
.withColumn("city", col("address")["city"]) \
.withColumn("state", col("address")["state"]) \
.withColumn("country", col("address")["country"]) \
.withColumn("_processing_timestamp", F.current_timestamp()) \
.drop("address")
return transformed_df
def transform_orders(dynamic_frame):
"""Transform and enrich order data."""
df = dynamic_frame.toDF()
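# Compute per-order metrics from the items array without exploding it.
# The accumulator is cast to DECIMAL(12, 2) so its type matches the lambda result;
# a bare 0.0 literal has a narrower decimal type and fails type checking.
transformed_df = df \
.withColumn("item_count", F.size(col("items"))) \
.withColumn("subtotal", F.expr("""
aggregate(items, CAST(0 AS DECIMAL(12, 2)), (acc, item) ->
CAST(acc + (item.quantity * item.unit_price * (1 - item.discount)) AS DECIMAL(12, 2)))
""")) \
.withColumn("discount_amount", F.expr("""
aggregate(items, CAST(0 AS DECIMAL(12, 2)), (acc, item) ->
CAST(acc + (item.quantity * item.unit_price * item.discount) AS DECIMAL(12, 2)))
""")) \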
.withColumn("tax_amount", col("subtotal") * 0.08) \
.withColumn("shipping_cost",
F.when(col("subtotal") > 100, 0).otherwise(9.99)
) \
.withColumn("net_amount",
col("subtotal") + col("tax_amount") + col("shipping_cost")
) \
.withColumn("shipping_city", col("shipping_address")["city"]) \
.withColumn("shipping_state", col("shipping_address")["state"]) \
.withColumn("shipping_country", col("shipping_address")["country"]) \
.withColumn("payment_method", F.lit("credit_card")) \
.withColumn("_processing_timestamp", F.current_timestamp())
return transformed_df
def calculate_daily_metrics(spark):
"""Calculate daily business metrics from curated tables."""
# Read curated tables through the catalog. Path-based loads only resolve
# Hadoop (path) tables, not tables registered in the Glue Data Catalog.
customers_df = spark.table("glue_catalog.ecommerce_lakehouse.curated_customers")
orders_df = spark.table("glue_catalog.ecommerce_lakehouse.curated_orders")
# Calculate daily revenue metrics
daily_revenue = orders_df \
.withColumn("metric_date", F.to_date(col("order_date"))) \
.groupBy("metric_date") \
.agg(
F.sum("net_amount").alias("metric_value"),
F.count("order_id").alias("metric_count")
) \
.withColumn("metric_type", F.lit("daily_revenue")) \
.withColumn("dimension_key", F.lit("all")) \
.withColumn("dimension_value", F.lit("all")) \
.withColumn("_computed_at", F.current_timestamp())
# Calculate customer segment metrics
segment_metrics = orders_df \
.join(customers_df, "customer_id") \
.withColumn("metric_date", F.to_date(col("order_date"))) \
.groupBy("metric_date", col("customer_segment")) \
.agg(
F.sum("net_amount").alias("metric_value"),
F.count("order_id").alias("metric_count")
) \
.withColumn("metric_type", F.lit("segment_revenue")) \
.withColumn("dimension_key", F.lit("customer_segment")) \
.withColumn("dimension_value", col("customer_segment")) \
.withColumn("_computed_at", F.current_timestamp())
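# Calculate category metrics. curated_orders has no category column, so revenue
# per category is derived from raw order items joined to raw_products.
items_df = spark.table("glue_catalog.ecommerce_lakehouse.raw_orders") \
.select(
"order_id",
F.to_date("order_date").alias("metric_date"),
F.explode("items").alias("item")
)
products_df = spark.table("glue_catalog.ecommerce_lakehouse.raw_products") \
.select("product_id", "category")
category_metrics = items_df \
.join(products_df, F.col("item.product_id") == products_df["product_id"]) \
.groupBy("metric_date", "category") \
.agg(
F.sum(
F.col("item.quantity") * F.col("item.unit_price") * (1 - F.col("item.discount"))
).alias("metric_value"),
F.countDistinct("order_id").alias("metric_count")
) \
.withColumn("metric_type", F.lit("category_revenue")) \
.withColumn("dimension_key", F.lit("category")) \
.withColumn("dimension_value", F.col("category")) \
.withColumn("_computed_at", F.current_timestamp())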
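# Union all metrics, then project to the agg_daily_metrics columns so the helper
# grouping columns (customer_segment, category) are not written to the table
all_metrics = daily_revenue \
.unionByName(segment_metrics, allowMissingColumns=True) \
.unionByName(category_metrics, allowMissingColumns=True) \
.select(
"metric_date", "metric_type", "dimension_key", "dimension_value",
"metric_value", "metric_count", "_computed_at"
)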
return all_metrics
# Main ETL execution
try:
logger.info(f"Starting ETL job: {args['JOB_NAME']}")
# Write through the catalog; path-based save() does not resolve Iceberg tables
# registered in the Glue Data Catalog. Each append assumes the DataFrame has been
# projected to the columns of its target table.
target_prefix = f"glue_catalog.{args['database']}"
if args['table'] == 'customers':
# Process customers
source_df = read_from_glue_catalog(args['database'], 'raw_customers')
transformed_df = transform_customers(source_df)
# Write to Iceberg
transformed_df.writeTo(f"{target_prefix}.curated_customers").append()
elif args['table'] == 'orders':
# Process orders
source_df = read_from_glue_catalog(args['database'], 'raw_orders')
transformed_df = transform_orders(source_df)
# Write to Iceberg
transformed_df.writeTo(f"{target_prefix}.curated_orders").append()
elif args['table'] == 'metrics':
# Calculate and write metrics
metrics_df = calculate_daily_metrics(spark)
metrics_df.writeTo(f"{target_prefix}.agg_daily_metrics").append()
logger.info(f"ETL job completed: {args['JOB_NAME']}")
except Exception as e:
logger.error(f"ETL job failed: {str(e)}")
raise
# Commit job
job.commit()
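The order enrichment in `transform_orders` applies a fixed chain of arithmetic: a discounted subtotal, 8% tax on that subtotal, free shipping above 100 and a flat 9.99 otherwise, and a net amount that sums the three. The worked example below reproduces that chain with `Decimal` so expected values can be checked by hand before running the Glue job. `order_amounts` is a hypothetical helper, and rounding each result half-up to two places is an assumption that the Spark job may not match exactly.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, List

TAX_RATE = Decimal("0.08")
FREE_SHIPPING_THRESHOLD = Decimal("100")
FLAT_SHIPPING = Decimal("9.99")
CENT = Decimal("0.01")


def order_amounts(items: List[Dict]) -> Dict[str, Decimal]:
    """Derive subtotal, discount, tax, shipping, and net amount for one order."""
    subtotal = Decimal("0")
    discount_amount = Decimal("0")
    for item in items:
        gross = item["unit_price"] * item["quantity"]
        subtotal += gross * (Decimal("1") - item["discount"])
        discount_amount += gross * item["discount"]
    tax = subtotal * TAX_RATE
    # Shipping is free only when the subtotal is strictly above the threshold.
    shipping = Decimal("0") if subtotal > FREE_SHIPPING_THRESHOLD else FLAT_SHIPPING
    net = subtotal + tax + shipping
    amounts = {
        "subtotal": subtotal,
        "discount_amount": discount_amount,
        "tax_amount": tax,
        "shipping_cost": shipping,
        "net_amount": net,
    }
    # Assumption: round each amount half-up to two decimal places.
    return {k: v.quantize(CENT, rounding=ROUND_HALF_UP) for k, v in amounts.items()}


large_order = order_amounts([
    {"quantity": 2, "unit_price": Decimal("50.00"), "discount": Decimal("0.10")},
    {"quantity": 1, "unit_price": Decimal("20.00"), "discount": Decimal("0.00")},
])
small_order = order_amounts([
    {"quantity": 1, "unit_price": Decimal("50.00"), "discount": Decimal("0.00")},
])
print(large_order["net_amount"], small_order["net_amount"])  # 118.80 63.99
```

The large order has a subtotal of 110.00, so it ships free and nets 110.00 + 8.80; the small order pays 4.00 tax plus 9.99 shipping on a 50.00 subtotal.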
Infrastructure Setup (Terraform)
# infrastructure/s3_lakehouse.tf
resource "aws_s3_bucket" "lakehouse" {
bucket = "${var.project_name}-lakehouse-${var.environment}"
tags = {
Name = "${var.project_name}-lakehouse"
Environment = var.environment
ManagedBy = "terraform"
}
}
resource "aws_s3_bucket_versioning" "lakehouse" {
bucket = aws_s3_bucket.lakehouse.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "lakehouse" {
bucket = aws_s3_bucket.lakehouse.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.lakehouse.arn
}
bucket_key_enabled = true
}
}
resource "aws_s3_bucket_lifecycle_configuration" "lakehouse" {
bucket = aws_s3_bucket.lakehouse.id
# Raw zone lifecycle
rule {
id = "raw-data-lifecycle"
status = "Enabled"
filter {
prefix = "raw/"
}
transition {
days = 90
storage_class = "STANDARD_IA"
}
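# Caution: objects archived to GLACIER cannot be read by query engines until they
# are restored, so old raw partitions stop being queryable after this transition
transition {
days = 365
storage_class = "GLACIER"
}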
noncurrent_version_expiration {
noncurrent_days = 90
}
}
# Curated zone lifecycle
rule {
id = "curated-data-lifecycle"
status = "Enabled"
filter {
prefix = "curated/"
}
transition {
days = 180
storage_class = "STANDARD_IA"
}
noncurrent_version_expiration {
noncurrent_days = 30
}
}
# Temporary files cleanup
rule {
id = "cleanup-temp"
status = "Enabled"
filter {
prefix = "_temporary/"
}
expiration {
days = 1
}
}
}
resource "aws_s3_bucket_policy" "lakehouse" {
bucket = aws_s3_bucket.lakehouse.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "EnforceSSLOnly"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.lakehouse.arn,
"${aws_s3_bucket.lakehouse.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
},
{
Sid = "DenyOutdatedTLS"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.lakehouse.arn,
"${aws_s3_bucket.lakehouse.arn}/*"
]
Condition = {
NumericLessThan = {
"s3:TlsVersion" = "1.2"
}
}
}
]
})
}
resource "aws_kms_key" "lakehouse" {
description = "KMS key for lakehouse encryption"
deletion_window_in_days = 7
enable_key_rotation = true
tags = {
Name = "${var.project_name}-lakehouse-key"
}
}
resource "aws_kms_alias" "lakehouse" {
name = "alias/${var.project_name}-lakehouse"
target_key_id = aws_kms_key.lakehouse.key_id
}
# Glue Catalog Database
resource "aws_glue_catalog_database" "lakehouse" {
name = "${var.project_name}_lakehouse_${var.environment}"
create_table_default_permissions {
permissions = ["ALL"]
principals {
data_lake_principal_identifier = "IAM_ALLOWED_PRINCIPALS"
}
}
tags = {
Environment = var.environment
}
}
# Glue Crawler for Iceberg tables
resource "aws_glue_crawler" "lakehouse_crawler" {
database_name = aws_glue_catalog_database.lakehouse.name
name = "${var.project_name}-iceberg-crawler"
role = aws_iam_role.glue_crawler.arn
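# Iceberg tables need an iceberg_target; a plain s3_target would catalog the
# underlying Parquet files as separate non-Iceberg tables
iceberg_target {
paths = [
"s3://${aws_s3_bucket.lakehouse.bucket}/raw/",
"s3://${aws_s3_bucket.lakehouse.bucket}/curated/",
"s3://${aws_s3_bucket.lakehouse.bucket}/aggregated/"
]
maximum_traversal_depth = 3
}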
schema_change_policy {
delete_behavior = "LOG"
update_behavior = "UPDATE_IN_DATABASE"
}
configuration = jsonencode({
Version = 1.0
Grouping = {
TableGroupingPolicy = "CombineCompatibleSchemas"
}
})
}
# IAM Role for Glue Crawler
resource "aws_iam_role" "glue_crawler" {
name = "${var.project_name}-glue-crawler-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "glue.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "glue_crawler" {
name = "${var.project_name}-glue-crawler-policy"
role = aws_iam_role.glue_crawler.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
]
Resource = [
aws_s3_bucket.lakehouse.arn,
"${aws_s3_bucket.lakehouse.arn}/*"
]
},
{
Effect = "Allow"
Action = [
"glue:*"
]
Resource = "*"
},
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = [
"kms:Decrypt",
"kms:GenerateDataKey"
]
Resource = aws_kms_key.lakehouse.arn
}
]
})
}
# Athena Workgroup for Iceberg queries
resource "aws_athena_workgroup" "iceberg_queries" {
name = "${var.project_name}-iceberg-workgroup"
configuration {
enforce_workgroup_configuration = true
publish_cloudwatch_metrics_enabled = true
result_configuration {
output_location = "s3://${aws_s3_bucket.lakehouse.bucket}/athena-results/"
encryption_configuration {
encryption_option = "SSE_KMS"
kms_key_arn = aws_kms_key.lakehouse.arn
}
}
engine_version {
selected_engine_version = "Athena engine version 3"
}
}
tags = {
Environment = var.environment
}
}
# Outputs
output "lakehouse_bucket" {
  description = "S3 bucket for lakehouse data"
  value       = aws_s3_bucket.lakehouse.bucket
}
output "glue_database" {
  description = "Glue catalog database name"
  value       = aws_glue_catalog_database.lakehouse.name
}
output "athena_workgroup" {
  description = "Athena workgroup name"
  value       = aws_athena_workgroup.iceberg_queries.name
}
Testing and Validation
# tests/test_iceberg_operations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
class TestIcebergOperations:
    @pytest.fixture(scope="session")
    def spark(self):
        """Create a local Spark session with Iceberg support.
        Requires the iceberg-spark-runtime jar matching your Spark and Scala versions on the classpath.
        """
        return (
            SparkSession.builder
            .appName("TestIcebergLakehouse")
            .master("local[*]")
            # Hadoop-type catalog named "local", backed by a directory on the local filesystem
            .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.local.type", "hadoop")
            .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
            # SQL extensions enable MERGE INTO, CALL procedures, and partition DDL
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .getOrCreate()
        )
    @pytest.fixture
    def sample_data(self):
        """Generate sample test data."""
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("category", StringType(), False),
            StructField("value", IntegerType(), True)
        ])
        data = [
            (1, "Product A", "Electronics", 100),
            (2, "Product B", "Clothing", 50),
            (3, "Product C", "Electronics", 200),
            (4, "Product D", "Home", 75),
            (5, "Product E", "Clothing", 30)
        ]
        return schema, data
    def test_create_table(self, spark, sample_data):
        """Test Iceberg table creation."""
        schema, data = sample_data
        table = "local.default.test_table"
        df = spark.createDataFrame(data, schema)
        # Create (or replace) the table through the "local" Iceberg catalog from the fixture
        df.writeTo(table).createOrReplace()
        # Verify table was created
        read_df = spark.table(table)
        assert read_df.count() == 5
        assert set(read_df.columns) == {"id", "name", "category", "value"}
    def test_upsert_operations(self, spark, sample_data):
        """Test merge/upsert operations."""
        schema, data = sample_data
        table = "local.default.test_upsert"
        df = spark.createDataFrame(data, schema)
        # Create initial table
        df.writeTo(table).createOrReplace()
        # Create update data
        update_data = [
            (1, "Product A Updated", "Electronics", 150),  # Update
            (6, "Product F", "Sports", 90)  # Insert
        ]
        update_df = spark.createDataFrame(update_data, schema)
        # Perform upsert with MERGE INTO; a plain append would leave two rows for id 1
        update_df.createOrReplaceTempView("upsert_updates")
        spark.sql(f"""
            MERGE INTO {table} AS t
            USING upsert_updates AS s
            ON t.id = s.id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)
        # Verify results
        result_df = spark.table(table)
        assert result_df.count() == 6  # 5 original + 1 new
        # Verify update was applied
        product_a = result_df.filter("id = 1").collect()[0]
        assert product_a["name"] == "Product A Updated"
        assert product_a["value"] == 150
    def test_time_travel(self, spark, sample_data):
        """Test time travel query capabilities."""
        schema, data = sample_data
        table = "local.default.test_time_travel"
        df = spark.createDataFrame(data, schema)
        # Create initial version
        df.writeTo(table).createOrReplace()
        # Record the initial count and the snapshot that produced it
        v1_count = spark.table(table).count()
        v1_snapshot_id = spark.sql(
            f"SELECT snapshot_id FROM {table}.snapshots ORDER BY committed_at DESC LIMIT 1"
        ).collect()[0]["snapshot_id"]
        # Add more data
        new_data = [(6, "Product F", "Sports", 90)]
        new_df = spark.createDataFrame(new_data, schema)
        new_df.writeTo(table).append()
        # Verify new count
        v2_count = spark.table(table).count()
        assert v2_count == v1_count + 1
        # The snapshots metadata table should list both commits
        snapshots = spark.sql(f"SELECT * FROM {table}.snapshots").collect()
        assert len(snapshots) >= 2
        # Query historical version using snapshot ID
        v1_df = spark.read.format("iceberg").option("snapshot-id", v1_snapshot_id).load(table)
        assert v1_df.count() == v1_count
    def test_schema_evolution(self, spark):
        """Test schema evolution capabilities."""
        table = "local.default.test_schema_evolution"
        # Create initial table
        schema1 = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False)
        ])
        data1 = [(1, "Product A"), (2, "Product B")]
        df1 = spark.createDataFrame(data1, schema1)
        df1.writeTo(table).createOrReplace()
        # Add new column as a metadata-only change; existing data files are not rewritten.
        # Appending a wider DataFrame without this step is rejected by default.
        spark.sql(f"ALTER TABLE {table} ADD COLUMNS (category STRING)")
        schema2 = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("category", StringType(), True)
        ])
        data2 = [(3, "Product C", "Electronics")]
        df2 = spark.createDataFrame(data2, schema2)
        df2.writeTo(table).append()
        # Read with schema evolution
        result_df = spark.table(table)
        assert "category" in result_df.columns
        assert result_df.count() == 3
        # Verify original rows have NULL for new column
        original = result_df.filter("id IN (1, 2)").collect()
        for row in original:
            assert row["category"] is None
    def test_partition_evolution(self, spark):
        """Test partition evolution capabilities."""
        from pyspark.sql.functions import col
        table = "local.default.test_partition_evolution"
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("date", StringType(), False),
            StructField("value", IntegerType(), True)
        ])
        data = [
            (1, "2024-01-01", 100),
            (2, "2024-01-02", 150)
        ]
        df = spark.createDataFrame(data, schema)
        # Create a table partitioned by the date column
        df.writeTo(table).partitionedBy(col("date")).createOrReplace()
        # Verify the initial write
        result_df = spark.table(table)
        assert result_df.count() == 2
        # Evolve the partition spec: existing files keep the old layout, new writes use the new one
        spark.sql(f"ALTER TABLE {table} ADD PARTITION FIELD bucket(4, id)")
        # Write a row under the evolved spec
        new_data = [(3, "2024-01-03", 200)]
        new_df = spark.createDataFrame(new_data, schema)
        new_df.writeTo(table).append()
        # Verify rows written under both specs are readable together
        final_df = spark.table(table)
        assert final_df.count() == 3
    def test_snapshot_management(self, spark, sample_data):
        """Test snapshot expiration and management."""
        schema, data = sample_data
        table = "local.default.test_snapshots"
        df = spark.createDataFrame(data, schema)
        # Create multiple snapshots: one from the initial create, four from appends
        df.writeTo(table).createOrReplace()
        for _ in range(4):
            df.writeTo(table).append()
        # Count snapshots
        history_df = spark.sql(f"SELECT * FROM {table}.snapshots")
        initial_snapshots = history_df.count()
        assert initial_snapshots >= 5
        # Expire old snapshots, keeping the three most recent
        spark.sql("""
            CALL local.system.expire_snapshots(
                table => 'default.test_snapshots',
                older_than => TIMESTAMP '2099-01-01 00:00:00',
                retain_last => 3
            )
        """)
        # Verify snapshots were expired
        history_df_after = spark.sql(f"SELECT * FROM {table}.snapshots")
        assert history_df_after.count() <= 3
    def test_data_compaction(self, spark, sample_data):
        """Test data file compaction for performance."""
        schema, data = sample_data
        table = "local.default.test_compaction"
        # Create table with many small files: ten single-row commits.
        # sample_data has only 5 rows, so cycle through it instead of slicing past the end.
        for i in range(10):
            small_df = spark.createDataFrame([data[i % len(data)]], schema)
            if i == 0:
                small_df.writeTo(table).createOrReplace()
            else:
                small_df.writeTo(table).append()
        # Count files before compaction
        files_before = spark.sql(f"""
            SELECT COUNT(*) as file_count
            FROM {table}.files
        """).collect()[0]["file_count"]
        # Perform compaction
        spark.sql("""
            CALL local.system.rewrite_data_files(
                table => 'default.test_compaction',
                options => map('target-file-size-bytes', '134217728')
            )
        """)
        # Count files after compaction
        files_after = spark.sql(f"""
            SELECT COUNT(*) as file_count
            FROM {table}.files
        """).collect()[0]["file_count"]
        assert files_after < files_before
    def test_query_performance(self, spark):
        """Smoke-test read and aggregation latency on a small local dataset."""
        import time
        from pyspark.sql.functions import col
        table = "local.default.test_performance"
        # Create a larger dataset
        data = [(i, f"Product {i}", "Category", i * 10)
                for i in range(10000)]
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("category", StringType(), False),
            StructField("value", IntegerType(), True)
        ])
        df = spark.createDataFrame(data, schema)
        # Write the table partitioned by category
        df.writeTo(table).partitionedBy(col("category")).createOrReplace()
        # Test read performance (includes the time to populate the cache)
        start_time = time.time()
        result = spark.table(table)
        result.cache()
        count = result.count()
        read_time = time.time() - start_time
        assert count == 10000
        # Thresholds are loose and machine-dependent; treat them as a sanity check, not a benchmark
        assert read_time < 5.0
        # Test aggregation performance on the cached data
        start_time = time.time()
        agg_result = result.groupBy("category").agg({"value": "sum"})
        agg_result.collect()
        agg_time = time.time() - start_time
        assert agg_time < 2.0
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| S3 Storage | 10TB raw + 5TB curated | $230 |
| EMR Cluster | 3x r5.2xlarge (intermittent) | $1,200 |
| Glue ETL | 100 hours processing | $35 |
| Athena Queries | 100TB scanned monthly (at $5/TB) | $500 |
| Glue Catalog | Metadata storage | $1 |
| KMS Keys | Encryption | $3 |
| Data Transfer | Cross-region | $50 |
| Total | | $2,019 |
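As a quick check on the table, the line items can be summed and ranked by share of the bill. This is a minimal sketch that only reuses the dollar figures listed above; the dictionary and variable names are illustrative.

```python
# Line items copied from the cost table above (USD per month).
monthly_costs = {
    "S3 Storage": 230,
    "EMR Cluster": 1200,
    "Glue ETL": 35,
    "Athena Queries": 500,
    "Glue Catalog": 1,
    "KMS Keys": 3,
    "Data Transfer": 50,
}

total = sum(monthly_costs.values())

# Share of the monthly bill per component, largest first.
shares = sorted(
    ((name, cost / total) for name, cost in monthly_costs.items()),
    key=lambda item: item[1],
    reverse=True,
)

print(f"Total: ${total:,}")
for name, share in shares:
    print(f"{name:<15} {share:6.1%}")
```

EMR and Athena together account for most of the total, which is why the optimization strategies focus on compute time and data scanned.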
Cost Optimization Strategies
💡
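Tip: Iceberg's features can enable significant cost savings. The figures below are illustrative; actual results depend on data layout and query patterns:
- Partition Pruning: up to 70% reduction in data scanned when queries filter on partition columns
- File Compaction: around 50% improvement in query performance once small files are merged
- Snapshot Expiration: around 30% reduction in storage costs by removing files no longer referenced
- Columnar Format: up to 90% compression vs raw data (Parquet with compression)
- Predicate Pushdown: around 60% less data processing by skipping files via column statistics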
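To make the partition pruning figure concrete, the sketch below estimates an Athena bill before and after pruning. The $5 per TB price and the 100 TB monthly scan volume are assumptions for illustration (check current pricing for your region), and both function names are hypothetical helpers, not part of any AWS SDK.

```python
def athena_scan_cost(tb_scanned: float, price_per_tb: float = 5.0) -> float:
    """Estimated Athena cost in USD. price_per_tb is an assumed list price."""
    return tb_scanned * price_per_tb


def cost_after_pruning(tb_scanned: float, pruned_fraction: float,
                       price_per_tb: float = 5.0) -> float:
    """Cost when a fraction of the data is skipped thanks to partition pruning."""
    if not 0.0 <= pruned_fraction <= 1.0:
        raise ValueError("pruned_fraction must be between 0 and 1")
    return athena_scan_cost(tb_scanned * (1.0 - pruned_fraction), price_per_tb)


# Hypothetical workload: 100 TB scanned per month, 70% of it prunable.
before = athena_scan_cost(100)
after = cost_after_pruning(100, 0.7)
print(f"Before pruning: ${before:.2f}, after pruning: ${after:.2f}")
```

Because Athena bills by data scanned, any reduction in scan volume translates directly into a proportional reduction in query cost.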
Performance Comparison
| Operation | Traditional Lake | Iceberg Lakehouse | Improvement |
|---|---|---|---|
| Daily ETL | 2 hours | 45 minutes | 2.7x faster |
| Ad-hoc Query | 15 minutes | 2 minutes | 7.5x faster |
| Data Refresh | 4 hours | 30 minutes | 8x faster |
| Storage Cost | $500/TB | $150/TB | 70% cheaper |
Interview Talking Points
Architecture Decisions
ℹ️
Best Practice: Highlight these Iceberg advantages in interviews:
- ACID Transactions: Optimistic concurrency with atomic metadata commits and snapshot isolation (no row-level locks)
- Time Travel: Historical queries, data auditing, rollback
- Schema Evolution: Add columns without rewriting data
- Partition Evolution: Change partitioning without data migration
- Hidden Partitioning: Queries filter on regular columns and Iceberg derives partition values through transforms, so users don't need to know partition columns
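The snapshot model behind the transaction and time travel points can be sketched with a toy in-memory table. This is not Iceberg's API: `ToyTable`, `Snapshot`, and `CommitConflict` are hypothetical names, and the model covers appends only. It shows two ideas: every commit produces a new immutable snapshot that stays readable, and a writer whose base snapshot is stale is rejected and must retry.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    files: Tuple[str, ...]  # data files visible in this snapshot


class CommitConflict(Exception):
    """Raised when another writer committed after our base snapshot."""


@dataclass
class ToyTable:
    snapshots: List[Snapshot] = field(default_factory=list)

    @property
    def current_id(self) -> Optional[int]:
        return self.snapshots[-1].snapshot_id if self.snapshots else None

    def files_at(self, snapshot_id: Optional[int] = None) -> Tuple[str, ...]:
        """Time travel: list the files visible at a snapshot (default: current)."""
        target = self.current_id if snapshot_id is None else snapshot_id
        if target is None:
            return ()  # empty table, no snapshots yet
        for snap in self.snapshots:
            if snap.snapshot_id == target:
                return snap.files
        raise KeyError(f"unknown snapshot {target}")

    def commit_append(self, base_id: Optional[int], new_files: List[str]) -> int:
        """Optimistic commit: succeeds only if base_id is still the current snapshot."""
        if base_id != self.current_id:
            raise CommitConflict(f"base {base_id} is stale, current is {self.current_id}")
        snap = Snapshot(
            snapshot_id=len(self.snapshots) + 1,
            parent_id=base_id,
            files=self.files_at(base_id) + tuple(new_files),
        )
        self.snapshots.append(snap)
        return snap.snapshot_id


table = ToyTable()
s1 = table.commit_append(None, ["a.parquet"])
s2 = table.commit_append(s1, ["b.parquet"])
print(table.files_at(s1))  # files as of the first snapshot
print(table.files_at())    # files in the current snapshot
```

A writer that still holds `s1` as its base would now get `CommitConflict` and would need to re-read the current snapshot before retrying, which is the essence of optimistic concurrency.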
Common Interview Questions
Q: "How does Iceberg differ from Delta Lake or Hudi?"
comparison = {
    "Iceberg": {
        "Partitioning": "Hidden partitioning, partition evolution",
        "Catalog": "Pluggable (Hive, Glue, REST)",
        "Time Travel": "Snapshot-based (by snapshot ID or timestamp)",
        "Schema Evolution": "Add, drop, rename, reorder columns with no data rewrite"
    },
    "Delta Lake": {
        "Partitioning": "Explicit partitioning",
        "Catalog": "Hive Metastore, Glue, Unity Catalog",
        "Time Travel": "Version-based (by version number or timestamp)",
        "Schema Evolution": "Add columns via mergeSchema; rename/drop need column mapping"
    },
    "Hudi": {
        "Partitioning": "Explicit partitioning",
        "Catalog": "Syncs to Hive Metastore and Glue",
        "Time Travel": "Commit-timeline based, plus incremental queries",
        "Schema Evolution": "Partial support"
    }
}
Q: "How do you handle small file problems?"
solutions = {
    "Write Distribution": "Set write.distribution-mode (hash or range) so each task writes fewer, larger files",
    "File Compaction": "Scheduled rewrite_data_files calls",
    "Target File Size": "Configure write.target-file-size-bytes",
    "Sorted Compaction": "Run rewrite_data_files with strategy => 'sort' to cluster data while compacting"
}
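The idea behind bin-pack compaction can be illustrated without Spark. The sketch below is a simplified first-fit-decreasing planner, not Iceberg's actual implementation; `plan_compaction` and its parameters are hypothetical names, and sizes are in MB for readability.

```python
from typing import List


def plan_compaction(file_sizes: List[int], target_size: int,
                    min_input_files: int = 2) -> List[List[int]]:
    """Group files smaller than target_size into bins of at most target_size.

    Returns only groups worth rewriting (at least min_input_files inputs).
    """
    # Files already at or above the target are left alone.
    small = sorted((s for s in file_sizes if s < target_size), reverse=True)
    bins: List[List[int]] = []
    for size in small:
        for group in bins:
            if sum(group) + size <= target_size:
                group.append(size)  # first bin with enough room
                break
        else:
            bins.append([size])  # no bin had room, start a new one
    return [group for group in bins if len(group) >= min_input_files]


sizes_mb = [10, 20, 30, 200, 40, 60, 5]
plan = plan_compaction(sizes_mb, target_size=128)
print(plan)  # → [[60, 40, 20, 5], [30, 10]]
```

In this example six small files collapse into two rewritten files while the 200 MB file is untouched, so the table goes from seven data files to three.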
Q: "How do you ensure data quality?"
quality_strategies = {
    "Schema Validation": "Rely on Iceberg schema enforcement (writes with mismatched columns or types are rejected)",
    "Data Contracts": "Declare required (NOT NULL) columns; validate other rules in the pipeline",
    "Testing": "Great Expectations integration",
    "Monitoring": "Track file counts, row counts, data drift"
}
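Pipeline-level checks of the kind listed above can start very small. The following sketch validates a batch of rows, represented as plain dictionaries, before it is written; `check_batch` and its parameters are hypothetical, and a production setup would more likely run equivalent checks on a Spark DataFrame or through a framework such as Great Expectations.

```python
from typing import Any, Dict, Iterable, List


def check_batch(rows: Iterable[Dict[str, Any]], required_columns: List[str],
                expected_min_rows: int) -> List[str]:
    """Return human-readable violations; an empty list means the batch passes."""
    rows = list(rows)
    problems: List[str] = []
    # Row-count check guards against truncated or empty loads.
    if len(rows) < expected_min_rows:
        problems.append(f"row count {len(rows)} below minimum {expected_min_rows}")
    # Null check for columns the data contract marks as required.
    for col in required_columns:
        nulls = sum(1 for row in rows if row.get(col) is None)
        if nulls:
            problems.append(f"column '{col}' has {nulls} null value(s)")
    return problems


batch = [{"id": 1, "name": "Product A"}, {"id": 2, "name": None}]
for problem in check_batch(batch, required_columns=["id", "name"], expected_min_rows=2):
    print(problem)
```

Returning a list of violations rather than raising on the first one makes it easier to log every problem in a batch and decide afterwards whether to fail the job.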
Deployment Checklist
- Configure S3 bucket with lifecycle policies
- Set up Glue Catalog and crawlers
- Deploy EMR cluster with Iceberg configuration
- Create Iceberg tables with proper partitioning
- Implement ETL jobs with Glue or EMR
- Set up Athena workgroup for queries
- Configure monitoring and alerting
- Test time travel and snapshot management
- Implement data quality checks
- Document operational procedures
⚠️
Warning: Always test schema changes in non-production environments first. Iceberg supports schema evolution, but proper planning prevents data quality issues.
This project demonstrates modern data lakehouse architecture skills and is highly relevant for data engineering interviews at companies like Netflix, Uber, and LinkedIn.