Cloud Data Lakehouse

Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber

Lakehouse Architecture

The data lakehouse combines the low-cost object storage of a data lake with the query performance and ACID transactions of a data warehouse.

A lakehouse removes the need to run separate data lake and data warehouse systems: one architecture serves both analytics and machine learning workloads.
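
The ACID guarantee comes from the table format rather than from the object store itself. As a rough illustration, the sketch below models a transaction log in plain Python: each commit is a numbered entry of add/remove file actions, a writer may only commit if the table is still at the version it read, and readers rebuild the table by replaying the log. The names `ToyTableLog`, `CommitConflict`, `commit`, and `snapshot` are hypothetical, and real table formats add checkpoints, metadata actions, and conflict resolution that are left out here.

```python
class CommitConflict(Exception):
    """Raised when another writer already committed the expected version."""


class ToyTableLog:
    """Simplified, in-memory stand-in for a table format's transaction log."""

    def __init__(self):
        self._commits = []  # list index == version number

    @property
    def latest_version(self):
        return len(self._commits) - 1  # -1 means the table is empty

    def commit(self, read_version, add=(), remove=()):
        # Optimistic concurrency: succeed only if the table has not moved
        # past the version this writer based its changes on.
        if read_version != self.latest_version:
            raise CommitConflict(
                f"read version {read_version}, table is at {self.latest_version}"
            )
        self._commits.append({"add": list(add), "remove": list(remove)})
        return self.latest_version

    def snapshot(self, version=None):
        """Replay the log to get the set of live data files at a version."""
        if version is None:
            version = self.latest_version
        files = set()
        for entry in self._commits[: version + 1]:
            files.difference_update(entry["remove"])
            files.update(entry["add"])
        return files


log = ToyTableLog()
log.commit(-1, add=["part-0.parquet"])
log.commit(0, add=["part-1.parquet"], remove=["part-0.parquet"])
print(sorted(log.snapshot()))   # files visible at the latest version
print(sorted(log.snapshot(0)))  # files visible at version 0
```

Because a commit either appends one complete entry or fails, readers never observe a half-written change, and older versions stay readable for as long as their data files are retained.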

Lakehouse Architecture Diagram

┌────────────────────────────────────────────────────────────┐
│                     Consumption Layer                      │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐        │
│  │  BI Tools  │    │  ML Tools  │    │ SQL Query  │        │
│  │  Tableau   │    │  Jupyter   │    │    Work    │        │
│  └─────┬──────┘    └─────┬──────┘    └─────┬──────┘        │
│        │                 │                 │               │
│        └─────────────────┼─────────────────┘               │
│                          │                                 │
│  ┌───────────────────────▼──────────────────────────────┐  │
│  │                  Query Engine Layer                  │  │
│  │  ┌────────────┐  ┌────────────┐  ┌────────────┐      │  │
│  │  │   Spark    │  │   Trino    │  │   Athena   │      │  │
│  │  └────────────┘  └────────────┘  └────────────┘      │  │
│  └───────────────────────┬──────────────────────────────┘  │
│                          │                                 │
│  ┌───────────────────────▼──────────────────────────────┐  │
│  │                  Table Format Layer                  │  │
│  │  ┌────────────┐  ┌────────────┐  ┌────────────┐      │  │
│  │  │   Delta    │  │  Iceberg   │  │    Hudi    │      │  │
│  │  │    Lake    │  │            │  │            │      │  │
│  │  └────────────┘  └────────────┘  └────────────┘      │  │
│  └───────────────────────┬──────────────────────────────┘  │
│                          │                                 │
│  ┌───────────────────────▼──────────────────────────────┐  │
│  │             Storage Layer (S3/GCS/ADLS)              │  │
│  │  ┌────────────┐  ┌────────────┐  ┌────────────┐      │  │
│  │  │  Raw Zone  │  │  Curated   │  │  Serving   │      │  │
│  │  │  (Bronze)  │  │  (Silver)  │  │   (Gold)   │      │  │
│  │  └────────────┘  └────────────┘  └────────────┘      │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘

Pattern 1: Medallion Architecture (Bronze/Silver/Gold)

Organize data in progressive quality tiers.

# Delta Lake medallion architecture
import uuid

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg, col, count, current_date, current_timestamp,
    input_file_name, lit, sum as spark_sum, to_date,
)
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Bronze Layer - Raw ingestion (append-only)
def ingest_to_bronze(source_path: str, bronze_path: str):
    """Ingest raw data to bronze layer."""
    raw_df = spark.read.format("json").load(source_path)

    # Add ingestion metadata. One batch ID is generated per run, so every
    # row written by this call shares it.
    batch_id = str(uuid.uuid4())
    bronze_df = raw_df \
        .withColumn("_ingestion_timestamp", current_timestamp()) \
        .withColumn("_ingestion_date", current_date()) \
        .withColumn("_source_file", input_file_name()) \
        .withColumn("_batch_id", lit(batch_id))

    bronze_df.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingestion_date") \
        .save(bronze_path)

# Silver Layer - Cleaned and validated
def transform_to_silver(bronze_path: str, silver_path: str):
    """Clean and validate orders for the silver layer.

    silver_path is the location of the silver orders table
    (build_gold below reads it from <silver root>/orders).
    """
    bronze_df = spark.read.format("delta").load(bronze_path)

    # dropDuplicates keeps one arbitrary row per order_id
    silver_df = bronze_df \
        .filter(col("order_id").isNotNull()) \
        .dropDuplicates(["order_id"]) \
        .withColumn("total", col("total").cast("decimal(10,2)")) \
        .withColumn("order_date", to_date(col("created_at"))) \
        .filter(col("total") > 0)

    # First run: the target table does not exist yet, so create it
    if not DeltaTable.isDeltaTable(spark, silver_path):
        silver_df.write.format("delta").save(silver_path)
        return

    # Merge (upsert) on the business key so reprocessing is idempotent
    silver_table = DeltaTable.forPath(spark, silver_path)
    silver_table.alias("target").merge(
        silver_df.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

# Gold Layer - Business-level aggregates
def build_gold(silver_path: str, gold_path: str):
    """Create business aggregates for gold layer."""
    orders_df = spark.read.format("delta").load(f"{silver_path}/orders")
    customers_df = spark.read.format("delta").load(f"{silver_path}/customers")

    # Daily revenue by region
    daily_revenue = orders_df \
        .join(customers_df, "customer_id") \
        .groupBy("order_date", "region") \
        .agg(
            count("order_id").alias("order_count"),
            spark_sum("total").alias("revenue"),
            avg("total").alias("avg_order_value"),
        )

    # Full rebuild: overwrite replaces the whole table on every run
    daily_revenue.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("order_date") \
        .save(f"{gold_path}/daily_revenue")

Bronze stores raw data as-is. Silver cleans and validates. Gold contains business aggregates ready for consumption.
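
To make the three tiers concrete without a Spark cluster, here is a small plain-Python walk-through that applies the same silver rules (non-null `order_id`, one row per `order_id`, positive `total`) and a gold-style aggregate to a handful of records. The sample rows and the helper names `to_silver` and `to_gold` are made up for illustration; this is a sketch of the idea, not a replacement for the Delta Lake pipeline.

```python
from collections import defaultdict
from decimal import Decimal

# Bronze: raw records kept exactly as received (made-up sample data)
bronze = [
    {"order_id": "o1", "total": "19.99", "region": "EU"},
    {"order_id": "o1", "total": "19.99", "region": "EU"},  # duplicate
    {"order_id": None, "total": "5.00", "region": "US"},   # missing key
    {"order_id": "o2", "total": "-3.00", "region": "US"},  # invalid total
    {"order_id": "o3", "total": "40.01", "region": "EU"},
]


def to_silver(rows):
    """Drop rows without a key, de-duplicate on order_id, keep positive totals."""
    seen, cleaned = set(), []
    for row in rows:
        if row["order_id"] is None or row["order_id"] in seen:
            continue
        total = Decimal(row["total"])
        if total <= 0:
            continue
        seen.add(row["order_id"])
        cleaned.append({**row, "total": total})
    return cleaned


def to_gold(rows):
    """Aggregate order count and revenue per region."""
    summary = defaultdict(lambda: {"order_count": 0, "revenue": Decimal("0")})
    for row in rows:
        summary[row["region"]]["order_count"] += 1
        summary[row["region"]]["revenue"] += row["total"]
    return dict(summary)


silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)
```

Bronze still holds all five records, so a bug in the silver rules can be fixed and the downstream tiers rebuilt without re-ingesting from the source.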

Pattern 2: Delta Lake Time Travel

Query historical data versions for auditing and debugging.

# Time travel queries with Delta Lake
# Query specific version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load(silver_path)

# Query by timestamp
df_specific = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load(silver_path)

# View table history
delta_table = DeltaTable.forPath(spark, silver_path)
history = delta_table.history()
history.show()

# Restore to previous version
delta_table.restoreToVersion(5)

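# Configure retention for time travel. These are table properties,
# not Spark session settings, so set them on the table itself.
spark.sql(f"""
    ALTER TABLE delta.`{silver_path}` SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 30 days',
        'delta.deletedFileRetentionDuration' = 'interval 7 days'
    )
""")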
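
A timestamp-based read has to be mapped to a concrete table version before any data is loaded. The sketch below shows one way that lookup can work, assuming the rule is "latest commit at or before the requested time". The `history` list and the `resolve_version` helper are hypothetical; a real engine may apply extra rules, for example rejecting a timestamp later than the last commit.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical commit history: (version, commit time), sorted by version
history = [
    (0, datetime(2024, 1, 10, 9, 0)),
    (1, datetime(2024, 1, 14, 18, 30)),
    (2, datetime(2024, 1, 16, 8, 15)),
]


def resolve_version(history, as_of):
    """Return the latest version committed at or before `as_of`."""
    commit_times = [ts for _, ts in history]
    idx = bisect_right(commit_times, as_of)
    if idx == 0:
        raise ValueError(f"no version exists at or before {as_of}")
    return history[idx - 1][0]


print(resolve_version(history, datetime(2024, 1, 15)))
```

With this sample history, a read as of 2024-01-15 resolves to version 1, the last commit before that date. The lookup only helps while the retention settings still keep that version's log entries and data files.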

Pattern 3: Schema Evolution

Handle schema changes without breaking pipelines.

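# Schema evolution strategies
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

# Assumed to be defined elsewhere: `path` (Delta table location), `new_df`
# (an incoming batch that may carry extra columns), `incoming_data` (raw rows)

# Strategy 1: Add a column by rewriting the table (simple, but rewrites every file)
spark.read.format("delta").load(path) \
    .withColumn("new_field", lit("default_value")) \
    .write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)

# Strategy 2: Merge schema on write. mergeSchema is a write option: columns that
# exist only in new_df are added to the table, and older rows read them as null
new_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(path)

# Strategy 3: Schema enforcement with opt-in evolution. Delta rejects mismatched
# appends by default, so enable mergeSchema only for sources you trust
allow_evolution = False  # hypothetical flag, e.g. configured per source
writer = new_df.write.format("delta").mode("append")
if allow_evolution:
    writer = writer.option("mergeSchema", "true")
writer.save(path)

# Define explicit schema for strict control
# (total uses the same decimal(10,2) type as the silver layer cast)
schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("total", DecimalType(10, 2), True),
    StructField("status", StringType(), True),
])

# Validate incoming data against schema
validated_df = spark.createDataFrame(incoming_data, schema)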
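
The effect of merging schemas can be pictured with plain dictionaries. The following sketch is a simplified model, not Delta Lake's actual implementation: new columns are appended to the table schema, a type conflict is rejected, and rows written before the change read the new column as `None`. The helpers `merge_schemas` and `read_row` are hypothetical, and type widening rules are deliberately ignored.

```python
def merge_schemas(table_schema, incoming_schema):
    """Return table_schema extended with any new columns from incoming_schema.

    Schemas are plain dicts of column name -> type name. A column present in
    both with different types is treated as an error rather than coerced.
    """
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name not in merged:
            merged[name] = dtype  # new column: existing rows will read as None
        elif merged[name] != dtype:
            raise TypeError(
                f"column {name!r}: table has {merged[name]}, incoming has {dtype}"
            )
    return merged


def read_row(row, schema):
    """Project a stored row onto the current schema, filling gaps with None."""
    return {name: row.get(name) for name in schema}


table_schema = {"order_id": "string", "total": "decimal(10,2)"}
incoming_schema = {"order_id": "string", "total": "decimal(10,2)", "status": "string"}

evolved = merge_schemas(table_schema, incoming_schema)
old_row = {"order_id": "o1", "total": "19.99"}
print(read_row(old_row, evolved))
```

Additive changes like this are the safe case. Renaming a column or changing its type is where pipelines usually break, which is why the conflict path raises instead of guessing.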

Pattern 4: Data Quality Framework

Implement quality checks across all layers.

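# Great Expectations integration with Delta Lake
# Note: SparkDFDataset belongs to the legacy Dataset API, so this example
# needs an older great_expectations release that still ships it.
from great_expectations.dataset import SparkDFDataset


class QualityError(Exception):
    """Raised when a layer fails its data quality checks."""
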

class LakehouseQualityFramework:
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def validate_bronze(self, df):
        """Validate raw data ingestion."""
        ge_df = SparkDFDataset(df)
        
        expectations = [
            ge_df.expect_column_values_to_not_be_null("order_id"),
            ge_df.expect_column_values_to_be_unique("order_id"),
            ge_df.expect_table_row_count_to_be_between(min_value=1),
        ]
        
        return self._run_expectations(expectations, "bronze")
    
    def validate_silver(self, df):
        """Validate cleaned data."""
        ge_df = SparkDFDataset(df)
        
        expectations = [
            ge_df.expect_column_values_to_not_be_null("order_id"),
            ge_df.expect_column_values_to_not_be_null("customer_id"),
            ge_df.expect_column_values_to_be_between("total", min_value=0.01, max_value=1000000),
            ge_df.expect_column_values_to_be_in_set("status", ["pending", "confirmed", "shipped"]),
            ge_df.expect_compound_columns_to_be_unique(["order_id", "order_date"]),
        ]
        
        return self._run_expectations(expectations, "silver")
    
    def validate_gold(self, df):
        """Validate business aggregates."""
        ge_df = SparkDFDataset(df)
        
        expectations = [
            ge_df.expect_column_values_to_not_be_null("order_date"),
            ge_df.expect_column_values_to_be_between("revenue", min_value=0),
            ge_df.expect_column_values_to_be_between("order_count", min_value=0),
        ]
        
        return self._run_expectations(expectations, "gold")
    
    def _run_expectations(self, expectations, layer):
        # Each expect_* call has already run and returned a result object,
        # so this method only collects the results and reports failures.
        results = list(expectations)

        failures = [r for r in results if not r.success]
        if failures:
            failed_types = [r.expectation_config.expectation_type for r in failures]
            raise QualityError(f"{layer} quality check failed: {failed_types}")

        return results
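
The same gate pattern can be sketched without any library, which may help when reading the framework above: each check returns a small result object, and a layer is published only if no check failed. Everything here (`CheckResult`, `expect_not_null`, `expect_between`, `failed_checks`, and the sample rows) is a hypothetical stand-in for the Great Expectations calls, not their API.

```python
from dataclasses import dataclass


@dataclass
class CheckResult:
    name: str
    success: bool
    failed_rows: int


def expect_not_null(rows, column):
    failed = sum(1 for row in rows if row.get(column) is None)
    return CheckResult(f"not_null({column})", failed == 0, failed)


def expect_between(rows, column, min_value, max_value=float("inf")):
    # Null values are skipped here; pair this check with expect_not_null.
    failed = sum(
        1 for row in rows
        if row.get(column) is not None
        and not (min_value <= row[column] <= max_value)
    )
    return CheckResult(f"between({column})", failed == 0, failed)


def failed_checks(results):
    """Names of the checks that did not pass; empty means the layer may publish."""
    return [r.name for r in results if not r.success]


rows = [
    {"order_id": "o1", "total": 19.99},
    {"order_id": None, "total": 5.00},
    {"order_id": "o3", "total": -1.00},
]
results = [
    expect_not_null(rows, "order_id"),
    expect_between(rows, "total", min_value=0.01, max_value=1_000_000),
]
print(failed_checks(results))
```

Keeping the failing row count on each result makes it possible to choose between failing the run outright and quarantining the bad rows.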

Pattern 5: Cost-Optimized Storage

Optimize storage costs across tiers.

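# Storage optimization strategies
# Assumes the `spark` session from Pattern 1 and a boto3 S3 client
from delta.tables import DeltaTable

class StorageOptimizer: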
    def __init__(self, s3_client):
        self.s3 = s3_client
    
    def optimize_table_storage(self, table_path: str):
        """Optimize Delta table storage."""
        # Z-ORDER for query optimization
        delta_table = DeltaTable.forPath(spark, table_path)
        delta_table.optimize().executeZOrderBy("customer_id", "order_date")
        
        # Vacuum old files
        delta_table.vacuum(retentionHours=168)  # 7 days
    
    def configure_compression(self, codec: str = "zstd"):
        """Set the compression codec used for Parquet and ORC writes."""
        spark.conf.set("spark.sql.parquet.compression.codec", codec)
        spark.conf.set("spark.sql.orc.compression.codec", codec)
    
    def implement_partitioning(self, df, partition_cols: list, path: str):
        """Report partition column cardinality, then write partitioned data."""
        # Analyze data distribution (each count triggers a Spark job)
        for column in partition_cols:
            distinct_count = df.select(column).distinct().count()
            print(f"{column}: {distinct_count} distinct values")

        # Apply partitioning
        df.write \
            .format("delta") \
            .partitionBy(*partition_cols) \
            .save(path)
    
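    def lifecycle_policy(self, bucket: str, prefix: str):
        """S3 lifecycle for cost optimization.

        Caveats: objects moved to GLACIER must be restored before Spark can
        read them, so apply this only to prefixes that are rarely queried.
        The noncurrent-version rule only has an effect on versioned buckets,
        and this call replaces any lifecycle configuration already on the bucket.
        """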
        lifecycle_config = {
            'Rules': [
                {
                    'ID': 'transition-to-ia',
                    'Filter': {'Prefix': f'{prefix}/raw/'},
                    'Status': 'Enabled',
                    'Transitions': [
                        {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                        {'Days': 90, 'StorageClass': 'GLACIER'},
                    ],
                },
                {
                    'ID': 'delete-old-versions',
                    'Filter': {'Prefix': f'{prefix}/'},
                    'Status': 'Enabled',
                    'NoncurrentVersionTransitions': [
                        {'NoncurrentDays': 30, 'StorageClass': 'STANDARD_IA'},
                    ],
                    'NoncurrentVersionExpiration': {'NoncurrentDays': 365},
                },
            ]
        }
        
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=bucket,
            LifecycleConfiguration=lifecycle_config,
        )
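
The distinct-count analysis in the partitioning step is only useful if something acts on it. One possible rule, sketched here in plain Python, is to add candidate columns in order while the number of distinct value combinations stays under a cap. The helper `choose_partition_columns`, the sample rows, and the default cap of 1000 are assumptions for illustration, not recommended values.

```python
def choose_partition_columns(rows, candidates, max_partitions=1000):
    """Pick candidate columns whose combined distinct count stays under a cap."""
    chosen = []
    for column in candidates:
        trial = chosen + [column]
        combos = {tuple(row[c] for c in trial) for row in rows}
        if len(combos) <= max_partitions:
            chosen = trial
    return chosen


# Made-up sample rows
rows = [
    {"order_date": "2024-01-01", "region": "EU", "customer_id": "c1"},
    {"order_date": "2024-01-01", "region": "US", "customer_id": "c2"},
    {"order_date": "2024-01-02", "region": "EU", "customer_id": "c3"},
    {"order_date": "2024-01-02", "region": "US", "customer_id": "c4"},
    {"order_date": "2024-01-02", "region": "US", "customer_id": "c5"},
]

columns = choose_partition_columns(
    rows, ["order_date", "region", "customer_id"], max_partitions=4
)
print(columns)
```

With these rows and a cap of 4, `order_date` and `region` are kept and `customer_id` is rejected. A high-cardinality column like that is usually a better fit for Z-ordering than for partitioning, which matches how `optimize_table_storage` treats it.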

Lakehouse Technology Comparison

Feature            Delta Lake   Apache Iceberg   Apache Hudi
-----------------  -----------  ---------------  -----------
ACID Transactions  Yes          Yes              Yes
Time Travel        Yes          Yes              Yes
Schema Evolution   Yes          Yes              Limited
Upserts            Yes          Yes              Yes
AWS Integration    Good         Good             Good

Follow-Up Questions

  1. How do you handle real-time streaming into a data lakehouse architecture?
  2. What strategies would you use to migrate from a data warehouse to a lakehouse?
  3. How do you implement data governance and access control in a lakehouse?
