Data Lakehouse Architecture



The data lakehouse combines the flexibility of data lakes with the reliability of data warehouses. Learn the core technologies and patterns that make this possible.

Why Lakehouse?

Traditional architectures forced a choice: data lakes for cheap storage and flexibility, or warehouses for ACID transactions and SQL. The lakehouse eliminates this tradeoff.

Figure: Data Lakehouse Architecture (layers, top to bottom)
  SQL Analytics / BI / ML: Dashboards, Reports, Model Training
  Metadata & Governance Layer: Catalog, Lineage, Access Control
  Table Format (Delta / Iceberg / Hudi): ACID Transactions, Time Travel, Schema Evolution
  Storage (S3 / GCS / ADLS / HDFS): Parquet, ORC, Avro Files
  Engines: Spark, Presto, Trino

Delta Lake Deep Dive

Delta Lake adds ACID transactions, time travel, and schema enforcement to Parquet files.

from delta import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create a Delta table with schema enforcement
data = [
    (1, "Alice", 28, 75000.0),
    (2, "Bob", 35, 92000.0),
    (3, "Charlie", 42, 110000.0)
]
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)

df.write.format("delta").mode("overwrite").save("/delta/employees")

# Schema evolution - adding a column
new_data = [
    (4, "Diana", 31, 88000.0, "Engineering"),
    (5, "Eve", 29, 95000.0, "Data Science")
]
new_columns = ["id", "name", "age", "salary", "department"]
new_df = spark.createDataFrame(new_data, new_columns)

new_df.write.format("delta").mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta/employees")

# Time travel - query historical versions
historical_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/delta/employees")

# ACID update operation
delta_table = DeltaTable.forPath(spark, "/delta/employees")
delta_table.update(
    condition="name = 'Bob'",
    set={"salary": "salary * 1.10"}
)

# Delta Lake optimizations
delta_table.optimize().executeCompaction()
delta_table.vacuum(168)  # Retain 7 days of history
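
Time travel and vacuum are easier to reason about with a toy model of a transaction log. The sketch below is not Delta Lake's implementation; it only assumes that each commit records which data files were added and removed, and that reading a version means replaying commits up to that version. The file names and the `files_as_of` helper are hypothetical.

```python
def files_as_of(log, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit in log[: version + 1]:
        live -= set(commit.get("remove", []))
        live |= set(commit.get("add", []))
    return live


# Hypothetical commit history for a small table
log = [
    {"add": ["part-000.parquet"]},                                  # version 0: initial write
    {"add": ["part-001.parquet"]},                                  # version 1: append
    {"add": ["part-002.parquet"], "remove": ["part-000.parquet"]},  # version 2: update rewrites a file
]

latest = files_as_of(log, len(log) - 1)
original = files_as_of(log, 0)

# Files referenced only by old versions are what a vacuum may eventually delete
vacuum_candidates = {f for c in log for f in c.get("add", [])} - latest

print(sorted(original))
print(sorted(latest))
print(sorted(vacuum_candidates))
```

In this model, version 0 stays readable only while `part-000.parquet` is still on disk, which is why the vacuum retention window also limits how far back time travel can go.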

Apache Iceberg

Iceberg provides hidden partitioning, schema evolution without rewriting data, and snapshot isolation.

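# Iceberg with PySpark
from pyspark.sql import SparkSession

# Register a catalog named "local"; the SQL extensions enable CALL procedures
spark = SparkSession.builder \
    .appName("IcebergExample") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local",
            "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "warehouse") \
    .getOrCreate()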

# Create an Iceberg table
spark.sql("""
    CREATE TABLE local.db.users (
        id BIGINT,
        name STRING,
        email STRING,
        created_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(created_at))
""")

# Schema evolution in Iceberg - add, rename, reorder columns
spark.sql("ALTER TABLE local.db.users ADD COLUMN phone STRING")
spark.sql("ALTER TABLE local.db.users RENAME COLUMN name TO full_name")
spark.sql("ALTER TABLE local.db.users ALTER COLUMN phone AFTER full_name")

# Hidden partitioning - Iceberg derives the day partition from created_at
spark.sql("""
    INSERT INTO local.db.users (id, full_name, phone, email, created_at) VALUES
    (1, 'Alice', NULL, 'alice@example.com', TIMESTAMP '2024-01-15 00:00:00'),
    (2, 'Bob', NULL, 'bob@example.com', TIMESTAMP '2024-03-22 00:00:00')
""")

# Query with partition pruning (automatic)
spark.sql("""
    SELECT * FROM local.db.users 
    WHERE created_at BETWEEN '2024-01-01' AND '2024-12-31'
""")

# Iceberg table maintenance
spark.sql("CALL local.system.rewrite_data_files(table => 'db.users')")
spark.sql("CALL local.system.expire_snapshots(table => 'db.users', retain_last => 10)")
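
Hidden partitioning can be pictured without Spark. The sketch below is a simplified model, not Iceberg code: it assumes a day transform that maps a timestamp to whole days since 1970-01-01 UTC, and shows how a range filter on the raw timestamp turns into a filter on partition values. The `days_transform` and `prune` helpers and the sample timestamps are hypothetical.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days_transform(ts):
    """Whole days since 1970-01-01 UTC, modelling the idea behind days(created_at)."""
    return (ts - EPOCH).days


def prune(partitions, start, end):
    """Keep only partition values that can contain rows in [start, end]."""
    lo, hi = days_transform(start), days_transform(end)
    return sorted(p for p in partitions if lo <= p <= hi)


# Hypothetical created_at values
rows = [
    datetime(2023, 12, 31, 23, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 15, 8, 30, tzinfo=timezone.utc),
    datetime(2024, 3, 22, 17, 45, tzinfo=timezone.utc),
]

# The writer derives partition values; users never supply them
partitions = {days_transform(ts) for ts in rows}

# A filter on created_at becomes a filter on day partitions
kept = prune(
    partitions,
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 12, 31, tzinfo=timezone.utc),
)
print(kept)
```

The query author filters on `created_at` only. Mapping that filter onto partition values is the table format's job, so queries do not need to change if the partitioning scheme changes later.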

Apache Hudi

Hudi excels at incremental processing and upsert-heavy workloads.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Hudi write settings go on the writer as options, not on the Spark session
hudi_options = {
    "hoodie.table.name": "user_profiles",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "updated_at",
}

# Illustrative rows; updated_at is the precombine field used to pick the latest record
df = spark.createDataFrame(
    [(1, "Alice", "2024-01-15 00:00:00"), (2, "Bob", "2024-03-22 00:00:00")],
    ["id", "name", "updated_at"],
)

# Write initial data
df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") \
    .mode("overwrite") \
    .save("/hudi/user_profiles")

# Incremental read - only new/changed records
hudi_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20240101000000") \
    .load("/hudi/user_profiles")

# Merge on read for better write performance
# (table type is fixed at creation, so this uses a separate, illustrative path)
df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.compact.inline", "true") \
    .option("hoodie.compact.inline.max.delta.commits", "5") \
    .mode("append") \
    .save("/hudi/user_profiles_mor")

# Global index for upserts across partitions
df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.index.type", "GLOBAL_BLOOM") \
    .option("hoodie.bloom.index.filter.type", "DYNAMIC_V0") \
    .mode("append") \
    .save("/hudi/user_profiles")
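
To see what the record key and precombine field do during an upsert, here is a rough pure-Python model. It is a sketch under simplifying assumptions, not Hudi's merge logic, which depends on the configured payload or merger class. It assumes the record with the larger precombine value wins and that an incremental read returns records from commits after the given instant. The `upsert` and `incremental_read` helpers, the `_commit_time` field, and the sample records are hypothetical.

```python
def upsert(table, batch, commit_time, key="id", precombine="updated_at"):
    """Merge batch into table (a dict keyed by record key); newest precombine value wins."""
    for record in batch:
        current = table.get(record[key])
        if current is None or record[precombine] >= current[precombine]:
            table[record[key]] = {**record, "_commit_time": commit_time}
    return table


def incremental_read(table, begin_time):
    """Return records written by commits strictly after begin_time."""
    return [r for r in table.values() if r["_commit_time"] > begin_time]


table = {}

# First commit: two new records
upsert(table, [
    {"id": 1, "name": "Alice", "updated_at": "2024-01-15"},
    {"id": 2, "name": "Bob", "updated_at": "2024-03-22"},
], commit_time="20240101000000")

# Second commit: a newer version of id 2 and a stale version of id 1
upsert(table, [
    {"id": 2, "name": "Robert", "updated_at": "2024-04-01"},
    {"id": 1, "name": "Old Alice", "updated_at": "2023-12-01"},
], commit_time="20240401000000")

changed = incremental_read(table, "20240101000000")
print(table[1]["name"], table[2]["name"])
print([r["id"] for r in changed])
```

The stale record for id 1 is dropped because its `updated_at` is older than the stored one, and the incremental read returns only the record touched by the second commit.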

Comparing Table Formats

Feature             | Delta Lake | Iceberg    | Hudi
ACID Transactions   | Yes        | Yes        | Yes
Time Travel         | Yes        | Yes        | Yes
Schema Evolution    | Yes        | Yes (best) | Yes
Hidden Partitioning | No         | Yes        | No
Upsert Performance  | Good       | Good       | Best
Streaming Support   | Good       | Good       | Best
Community           | Databricks | Apache     | Uber/Apache

Best Practices

  1. Start small: Pick one table format and master it before expanding
  2. Partition wisely: Low-cardinality columns work best for partitioning
  3. Compact regularly: Small files kill read performance
  4. Set retention policies: Use vacuum/expire snapshots to manage storage
  5. Monitor file counts: Track small file accumulation as an operational metric
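
The compaction and file-count advice above can be sketched as a small planning function. This is an illustrative first-fit bin-packing heuristic, not the algorithm any of the three table formats uses. The 128 MB target, the 32 MB small-file threshold, and the `plan_compaction` name are assumptions chosen for the example.

```python
def plan_compaction(file_sizes_mb, target_mb=128, small_threshold_mb=32):
    """Group small files into bins of at most target_mb; larger files are left alone."""
    small = sorted((s for s in file_sizes_mb if s < small_threshold_mb), reverse=True)
    bins = []
    for size in small:
        # First fit: place the file in the first bin that still has room
        for b in bins:
            if sum(b) + size <= target_mb:
                b.append(size)
                break
        else:
            bins.append([size])
    # A bin holding a single file gains nothing from being rewritten
    return [b for b in bins if len(b) > 1]


# Hypothetical file sizes in MB for one partition
sizes = [200, 5, 10, 20, 30, 31, 31, 31, 150]
plan = plan_compaction(sizes)

small_file_count = sum(1 for s in sizes if s < 32)  # a metric worth tracking over time
print(plan)
print(small_file_count)
```

Each inner list is one group of small files that would be rewritten into a single larger file. The count of small files is the kind of operational metric the last item in the list refers to.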
