Cloud Data Lakehouse
Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber
Lakehouse Architecture
The data lakehouse combines the low-cost storage of data lakes with the performance and ACID transactions of data warehouses.
Note: A lakehouse removes the need to run separate data lake and data warehouse systems. One architecture serves both analytics and machine learning workloads.
Lakehouse Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                      Consumption Layer                      │
│       ┌───────────┐    ┌───────────┐    ┌───────────┐       │
│       │ BI Tools  │    │ ML Tools  │    │ SQL Query │       │
│       │ Tableau   │    │ Jupyter   │    │ Work      │       │
│       └─────┬─────┘    └─────┬─────┘    └─────┬─────┘       │
│             │                │                │             │
│             └────────────────┼────────────────┘             │
│                              │                              │
│   ┌──────────────────────────┼──────────────────────────┐   │
│   │                 Query Engine Layer                  │   │
│   │   ┌───────────┐    ┌───────────┐    ┌───────────┐   │   │
│   │   │ Spark     │    │ Trino     │    │ Athena    │   │   │
│   │   └───────────┘    └───────────┘    └───────────┘   │   │
│   └──────────────────────────┬──────────────────────────┘   │
│                              │                              │
│   ┌──────────────────────────┼──────────────────────────┐   │
│   │                 Table Format Layer                  │   │
│   │   ┌───────────┐    ┌───────────┐    ┌───────────┐   │   │
│   │   │ Delta     │    │ Iceberg   │    │ Hudi      │   │   │
│   │   │ Lake      │    │           │    │           │   │   │
│   │   └───────────┘    └───────────┘    └───────────┘   │   │
│   └──────────────────────────┬──────────────────────────┘   │
│                              │                              │
│   ┌──────────────────────────┼──────────────────────────┐   │
│   │             Storage Layer (S3/GCS/ADLS)             │   │
│   │   ┌───────────┐    ┌───────────┐    ┌───────────┐   │   │
│   │   │ Raw Zone  │    │ Curated   │    │ Serving   │   │   │
│   │   │ (Bronze)  │    │ (Silver)  │    │ (Gold)    │   │   │
│   │   └───────────┘    └───────────┘    └───────────┘   │   │
│   └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
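The table format layer is what lets plain object storage behave like a transactional table. As a rough illustration only, the sketch below models a table as an ordered log of commits, each listing the data files it adds and removes. It is a toy in-memory model, not how Delta Lake, Iceberg, or Hudi are implemented; `ToyTableLog`, `CommitConflict`, and the file names are hypothetical.

```python
from dataclasses import dataclass, field


class CommitConflict(Exception):
    """Raised when another writer committed after this writer read the table."""


@dataclass
class ToyTableLog:
    """Toy commit log: each commit records the data files it adds and removes."""
    commits: list = field(default_factory=list)

    @property
    def version(self) -> int:
        # Version of the latest commit, or -1 for an empty table.
        return len(self.commits) - 1

    def commit(self, read_version: int, add=(), remove=()) -> int:
        # Optimistic concurrency: succeed only if nobody committed
        # since this writer read the table.
        if read_version != self.version:
            raise CommitConflict(
                f"table is at v{self.version}, writer read v{read_version}"
            )
        self.commits.append({"add": set(add), "remove": set(remove)})
        return self.version

    def live_files(self, as_of=None) -> set:
        # Replay commits up to `as_of` to find the files in that snapshot.
        last = self.version if as_of is None else as_of
        files = set()
        for entry in self.commits[: last + 1]:
            files -= entry["remove"]
            files |= entry["add"]
        return files


log = ToyTableLog()
v0 = log.commit(read_version=-1, add=["part-000.parquet"])
v1 = log.commit(read_version=v0, add=["part-001.parquet"])
# A compaction swaps two small files for one larger file in a single commit.
v2 = log.commit(
    read_version=v1,
    add=["part-002.parquet"],
    remove=["part-000.parquet", "part-001.parquet"],
)
```

Readers see either the files before the compaction commit or the files after it, never a mix, and replaying the log to an older version gives an older snapshot. Real table formats apply finer-grained conflict rules than the all-or-nothing check used here.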
Pattern 1: Medallion Architecture (Bronze/Silver/Gold)
Organize data in progressive quality tiers.
# Delta Lake medallion architecture
import uuid

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg, col, count, current_date, current_timestamp,
    input_file_name, lit, to_date,
)
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the built-in sum

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Bronze Layer - Raw ingestion (append-only)
def ingest_to_bronze(source_path: str, bronze_path: str):
    """Ingest raw data to bronze layer."""
    raw_df = spark.read.format("json").load(source_path)
    # Add ingestion metadata. One ID is generated per run so every row in a batch shares it.
    batch_id = str(uuid.uuid4())
    bronze_df = raw_df \
        .withColumn("_ingestion_timestamp", current_timestamp()) \
        .withColumn("_ingestion_date", current_date()) \
        .withColumn("_source_file", input_file_name()) \
        .withColumn("_batch_id", lit(batch_id))
    # _ingestion_date must exist as a column before it can be used for partitioning
    bronze_df.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingestion_date") \
        .save(bronze_path)

# Silver Layer - Cleaned and validated
def transform_to_silver(bronze_path: str, silver_path: str):
    """Clean and validate data for silver layer."""
    bronze_df = spark.read.format("delta").load(bronze_path)
    # Validate first, then deduplicate, so the merge source has at most one row per order_id.
    # dropDuplicates keeps an arbitrary row for each key.
    silver_df = bronze_df \
        .filter(col("order_id").isNotNull()) \
        .withColumn("total", col("total").cast("decimal(10,2)")) \
        .withColumn("order_date", to_date(col("created_at"))) \
        .filter(col("total") > 0) \
        .dropDuplicates(["order_id"])
    # First run: the silver table does not exist yet, so create it
    if not DeltaTable.isDeltaTable(spark, silver_path):
        silver_df.write.format("delta").save(silver_path)
        return
    # Later runs: merge (upsert) on the business key
    silver_table = DeltaTable.forPath(spark, silver_path)
    silver_table.alias("target").merge(
        silver_df.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# Gold Layer - Business-level aggregates
def build_gold(silver_path: str, gold_path: str):
    """Create business aggregates for gold layer."""
    # Here silver_path is the root of the silver zone, with one Delta table per entity
    orders_df = spark.read.format("delta").load(f"{silver_path}/orders")
    customers_df = spark.read.format("delta").load(f"{silver_path}/customers")
    # Daily revenue by region (region comes from the customers table)
    daily_revenue = orders_df \
        .join(customers_df, "customer_id") \
        .groupBy("order_date", "region") \
        .agg(
            count("order_id").alias("order_count"),
            spark_sum("total").alias("revenue"),
            avg("total").alias("avg_order_value"),
        )
    # Full rebuild on every run: overwrite replaces the previous aggregate
    daily_revenue.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("order_date") \
        .save(f"{gold_path}/daily_revenue")
Note: Bronze stores raw data as-is. Silver cleans and validates. Gold contains business aggregates ready for consumption.
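To see what each tier does to the data without a Spark cluster, here is a minimal pure-Python sketch of the same flow on a handful of records. The sample rows and the `to_silver` and `to_gold` helpers are made up for illustration, and `region` is carried on the order itself instead of being joined from a customers table.

```python
from collections import defaultdict
from decimal import Decimal

# Bronze: records kept exactly as received, including bad and duplicate rows.
raw_events = [
    {"order_id": "o1", "region": "EU", "total": "19.99", "created_at": "2024-01-15T10:00:00"},
    {"order_id": "o1", "region": "EU", "total": "19.99", "created_at": "2024-01-15T10:00:00"},
    {"order_id": None, "region": "US", "total": "5.00", "created_at": "2024-01-15T11:00:00"},
    {"order_id": "o2", "region": "US", "total": "-3.00", "created_at": "2024-01-15T12:00:00"},
    {"order_id": "o3", "region": "EU", "total": "30.01", "created_at": "2024-01-16T08:30:00"},
    {"order_id": "o4", "region": "EU", "total": "10.01", "created_at": "2024-01-15T16:45:00"},
]


def to_silver(bronze):
    """Drop rows with no key or a non-positive total, cast types, deduplicate."""
    seen, silver = set(), []
    for row in bronze:
        order_id = row["order_id"]
        if order_id is None or order_id in seen:
            continue
        total = Decimal(row["total"])
        if total <= 0:
            continue
        seen.add(order_id)
        silver.append({
            "order_id": order_id,
            "region": row["region"],
            "total": total,
            "order_date": row["created_at"][:10],  # date part of the ISO timestamp
        })
    return silver


def to_gold(silver):
    """Aggregate order count and revenue per (order_date, region)."""
    agg = defaultdict(lambda: {"order_count": 0, "revenue": Decimal("0")})
    for row in silver:
        key = (row["order_date"], row["region"])
        agg[key]["order_count"] += 1
        agg[key]["revenue"] += row["total"]
    return dict(agg)


silver = to_silver(raw_events)
gold = to_gold(silver)
```

Six bronze rows become three silver rows (the duplicate, the row with no key, and the negative total are dropped), and gold holds one aggregate row per date and region.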
Pattern 2: Delta Lake Time Travel
Query historical data versions for auditing and debugging.
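# Time travel queries with Delta Lake
# (spark and silver_path are the session and table location from the medallion example)
# Query specific version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load(silver_path)
# Query by timestamp (a date-only string resolves to 00:00:00 of that day)
df_specific = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load(silver_path)
# View table history
delta_table = DeltaTable.forPath(spark, silver_path)
history = delta_table.history()
history.show()
# Restore to previous version
delta_table.restoreToVersion(5)
# Configure retention for time travel.
# These are Delta table properties, so they are set on the table, not on the Spark session.
spark.sql(f"""
    ALTER TABLE delta.`{silver_path}` SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 30 days',
        'delta.deletedFileRetentionDuration' = 'interval 7 days'
    )
""")
# Time travel needs both the log entry and the data files of a version.
# With these values, a version whose files were vacuumed after 7 days can no longer be read,
# even though its log entry is kept for 30 days.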
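How a `timestampAsOf` value maps to a table version can be sketched without Spark. The snippet below assumes the rule is "the latest commit at or before the requested timestamp" and uses made-up commit times; `version_as_of` is a hypothetical helper, not part of the Delta Lake API.

```python
from bisect import bisect_right
from datetime import datetime
from typing import Sequence


def version_as_of(commit_times: Sequence[datetime], ts: datetime) -> int:
    """Return the index of the latest commit made at or before `ts`.

    `commit_times` must be sorted ascending; index i is the commit time of version i.
    """
    idx = bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return idx


# Hypothetical commit history: versions 0, 1, and 2.
commit_times = [
    datetime(2024, 1, 10, 9, 0),
    datetime(2024, 1, 14, 18, 30),
    datetime(2024, 1, 15, 7, 45),
]

# "2024-01-15" means midnight, so the commit made later that morning is excluded.
midnight_version = version_as_of(commit_times, datetime(2024, 1, 15))
end_of_day_version = version_as_of(commit_times, datetime(2024, 1, 15, 23, 59))
```

The practical point is that a date-only timestamp selects the state of the table at the start of that day. To include everything committed during the day, pass an end-of-day timestamp or an explicit version number.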
Pattern 3: Schema Evolution
Handle schema changes without breaking pipelines.
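# Schema evolution strategies
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, DecimalType
# path is the Delta table location and new_df is the incoming batch (both assumed to be defined)
# Strategy 1: Add a column by rewriting the table with overwriteSchema.
# This rewrites every data file; ALTER TABLE ... ADD COLUMNS changes only metadata
# when NULL is acceptable for existing rows.
spark.read.format("delta").load(path) \
    .withColumn("new_field", lit("default_value")) \
    .write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)
# Strategy 2: Merge schema on append.
# mergeSchema is a write option; columns that exist only in new_df are added to the table schema.
# Append the incoming batch, not the table itself, or every existing row is duplicated.
new_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(path)
# Strategy 3: Schema enforcement with opt-in evolution.
# Without mergeSchema, Delta rejects an append whose columns do not match the table.
allow_evolution = False  # decide per pipeline
writer = new_df.write.format("delta").mode("append")
if allow_evolution:
    writer = writer.option("mergeSchema", "true")
writer.save(path)
# Define explicit schema for strict control
schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("total", DecimalType(10, 2), True),  # matches the decimal(10,2) cast in the silver layer
    StructField("status", StringType(), True),
])
# Validate incoming data against schema.
# createDataFrame checks Python values against the declared types by default,
# so total values must be decimal.Decimal instances.
validated_df = spark.createDataFrame(incoming_data, schema)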
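The difference between enforcement and evolution comes down to what happens when an incoming batch has columns or types the table does not. A minimal sketch of that decision, with schemas modeled as plain name-to-type dictionaries: `merge_schemas` is a hypothetical helper, and it treats every type change as a conflict, which is stricter than some engines.

```python
def merge_schemas(table: dict, incoming: dict, allow_new_columns: bool) -> dict:
    """Return the table schema after a write, or raise if the write must be rejected."""
    merged = dict(table)
    for name, dtype in incoming.items():
        if name not in table:
            if not allow_new_columns:
                # Enforcement: unknown columns are refused.
                raise ValueError(f"unexpected column {name!r}")
            # Evolution: new columns are appended to the schema.
            merged[name] = dtype
        elif table[name] != dtype:
            # A type change is never merged silently in this sketch.
            raise TypeError(
                f"column {name!r}: table has {table[name]}, write has {dtype}"
            )
    return merged


table_schema = {"order_id": "string", "total": "decimal(10,2)"}
batch_schema = {"order_id": "string", "total": "decimal(10,2)", "coupon_code": "string"}

evolved = merge_schemas(table_schema, batch_schema, allow_new_columns=True)
```

Columns missing from the incoming batch are left in place, which mirrors the usual behavior of writing NULL for them. Only additions and type changes need a decision.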
Pattern 4: Data Quality Framework
Implement quality checks across all layers.
# Great Expectations integration with Delta Lake
# SparkDFDataset belongs to the legacy Great Expectations dataset API;
# recent releases no longer ship it, so this code needs an older version.
from great_expectations.dataset import SparkDFDataset

class QualityError(Exception):
    """Raised when a layer fails its quality checks."""

class LakehouseQualityFramework:
    def __init__(self, spark_session):
        self.spark = spark_session

    def validate_bronze(self, df):
        """Validate raw data ingestion."""
        ge_df = SparkDFDataset(df)
        expectations = [
            ge_df.expect_column_values_to_not_be_null("order_id"),
            # Fails if the source replays records; drop this check if bronze may hold duplicates
            ge_df.expect_column_values_to_be_unique("order_id"),
            ge_df.expect_table_row_count_to_be_between(min_value=1),
        ]
        return self._run_expectations(expectations, "bronze")

    def validate_silver(self, df):
        """Validate cleaned data."""
        ge_df = SparkDFDataset(df)
        expectations = [
            ge_df.expect_column_values_to_not_be_null("order_id"),
            ge_df.expect_column_values_to_not_be_null("customer_id"),
            ge_df.expect_column_values_to_be_between("total", min_value=0.01, max_value=1000000),
            ge_df.expect_column_values_to_be_in_set("status", ["pending", "confirmed", "shipped"]),
            ge_df.expect_compound_columns_to_be_unique(["order_id", "order_date"]),
        ]
        return self._run_expectations(expectations, "silver")

    def validate_gold(self, df):
        """Validate business aggregates."""
        ge_df = SparkDFDataset(df)
        expectations = [
            ge_df.expect_column_values_to_not_be_null("order_date"),
            ge_df.expect_column_values_to_be_between("revenue", min_value=0),
            ge_df.expect_column_values_to_be_between("order_count", min_value=0),
        ]
        return self._run_expectations(expectations, "gold")

    def _run_expectations(self, expectations, layer):
        # Each expect_* call above has already run and returned a validation result
        results = list(expectations)
        failures = [r for r in results if not r.success]
        if failures:
            failed = [r.expectation_config.expectation_type for r in failures]
            raise QualityError(f"{layer} quality check failed: {failed}")
        return results
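The collect-results-then-raise pattern above does not depend on a particular validation library. A minimal sketch in plain Python over a list of dictionaries follows; `CheckResult`, `not_null`, `between`, and `run_checks` are hypothetical names for illustration, not Great Expectations APIs.

```python
from dataclasses import dataclass


@dataclass
class CheckResult:
    name: str
    success: bool
    failed_rows: int


def not_null(column):
    """Build a check that fails for rows where `column` is missing or None."""
    def check(rows):
        bad = sum(1 for r in rows if r.get(column) is None)
        return CheckResult(f"not_null({column})", bad == 0, bad)
    return check


def between(column, min_value, max_value):
    """Build a check that fails for rows outside [min_value, max_value]."""
    def check(rows):
        bad = sum(
            1 for r in rows
            if r.get(column) is None or not (min_value <= r[column] <= max_value)
        )
        return CheckResult(f"between({column})", bad == 0, bad)
    return check


def run_checks(rows, checks, layer):
    """Run every check, then fail the layer if any check did not pass."""
    results = [check(rows) for check in checks]
    failures = [r.name for r in results if not r.success]
    if failures:
        raise ValueError(f"{layer} quality check failed: {failures}")
    return results


silver_checks = [not_null("order_id"), between("total", 0.01, 1_000_000)]
good_rows = [{"order_id": "o1", "total": 19.99}, {"order_id": "o2", "total": 5.0}]
bad_rows = [{"order_id": None, "total": 19.99}, {"order_id": "o3", "total": 0.0}]

passed = run_checks(good_rows, silver_checks, "silver")
```

Running all checks before raising reports every failing rule in one pass, so a single pipeline run surfaces all the problems in a batch.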
Pattern 5: Cost-Optimized Storage
Optimize storage costs across tiers.
# Storage optimization strategies
from delta.tables import DeltaTable

class StorageOptimizer:
    def __init__(self, spark_session, s3_client):
        self.spark = spark_session
        self.s3 = s3_client

    def optimize_table_storage(self, table_path: str):
        """Optimize Delta table storage."""
        # Z-ORDER for query optimization (Z-order columns must not be partition columns)
        delta_table = DeltaTable.forPath(self.spark, table_path)
        delta_table.optimize().executeZOrderBy("customer_id", "order_date")
        # Vacuum old files; versions that depend on them can no longer be read
        delta_table.vacuum(retentionHours=168)  # 7 days

    def configure_compression(self, codec: str = "zstd"):
        """Configure the compression codec for newly written files."""
        # Delta tables store data as Parquet; the ORC setting only affects ORC outputs
        self.spark.conf.set("spark.sql.parquet.compression.codec", codec)
        self.spark.conf.set("spark.sql.orc.compression.codec", codec)

    def implement_partitioning(self, df, partition_cols: list, path: str):
        """Report partition column cardinality, then write the partitioned table."""
        # Analyze data distribution; very high cardinality leads to many small files
        for column in partition_cols:
            distinct_count = df.select(column).distinct().count()
            print(f"{column}: {distinct_count} distinct values")
        # Apply partitioning
        df.write \
            .format("delta") \
            .partitionBy(*partition_cols) \
            .save(path)

    def lifecycle_policy(self, bucket: str, prefix: str):
        """S3 lifecycle for cost optimization."""
        lifecycle_config = {
            'Rules': [
                {
                    # Objects moved to GLACIER must be restored before Spark can read them,
                    # so this rule targets only the raw landing prefix.
                    'ID': 'transition-to-ia',
                    'Filter': {'Prefix': f'{prefix}/raw/'},
                    'Status': 'Enabled',
                    'Transitions': [
                        {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                        {'Days': 90, 'StorageClass': 'GLACIER'},
                    ],
                },
                {
                    # Noncurrent-version rules take effect only on versioned buckets
                    'ID': 'delete-old-versions',
                    'Filter': {'Prefix': f'{prefix}/'},
                    'Status': 'Enabled',
                    'NoncurrentVersionTransitions': [
                        {'NoncurrentDays': 30, 'StorageClass': 'STANDARD_IA'},
                    ],
                    'NoncurrentVersionExpiration': {'NoncurrentDays': 365},
                },
            ]
        }
        # This call replaces the bucket's entire existing lifecycle configuration
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=bucket,
            LifecycleConfiguration=lifecycle_config,
        )
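Printing distinct counts only helps if there is a rule for interpreting them. One possible heuristic, sketched below, estimates the average partition size for each candidate column and flags columns that would produce partitions below a minimum size. The 1 GiB default, the even-distribution assumption, the sample figures, and the `evaluate_partition_columns` helper are all assumptions for illustration; tune the threshold to your engine and workload.

```python
def evaluate_partition_columns(
    table_bytes: int,
    distinct_counts: dict,
    min_partition_bytes: int = 1024**3,  # assumed threshold: 1 GiB per partition
) -> dict:
    """Estimate average partition size per column, assuming evenly distributed data."""
    verdicts = {}
    for column, distinct in distinct_counts.items():
        avg_bytes = table_bytes / max(distinct, 1)
        verdicts[column] = {
            "avg_partition_bytes": avg_bytes,
            "ok": avg_bytes >= min_partition_bytes,
        }
    return verdicts


# Hypothetical 2 TiB orders table with two candidate partition columns.
verdicts = evaluate_partition_columns(
    table_bytes=2 * 1024**4,
    distinct_counts={"order_date": 730, "customer_id": 5_000_000},
)

for column, verdict in verdicts.items():
    size_mib = verdict["avg_partition_bytes"] / 1024**2
    label = "ok" if verdict["ok"] else "too fine-grained"
    print(f"{column}: ~{size_mib:,.1f} MiB per partition ({label})")
```

In this example `order_date` yields partitions of a few GiB each, while `customer_id` would split the table into millions of sub-megabyte partitions. A column like that is usually a better fit for Z-ordering than for partitioning.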
Lakehouse Technology Comparison
| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| ACID Transactions | Yes | Yes | Yes |
| Time Travel | Yes | Yes | Yes |
| Schema Evolution | Yes | Yes | Limited |
| Upserts | Yes | Yes | Yes |
| AWS Integration | Good | Good | Good |
Follow-Up Questions
- How do you handle real-time streaming into a data lakehouse architecture?
- What strategies would you use to migrate from a data warehouse to a lakehouse?
- How do you implement data governance and access control in a lakehouse?