Data Lakehouse Architecture
The data lakehouse combines the flexibility of data lakes with the reliability of data warehouses. Learn the core technologies and patterns that make this possible.
Why Lakehouse?
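Traditional architectures forced a choice: data lakes for cheap storage and flexibility, or warehouses for ACID transactions and SQL. The lakehouse removes this tradeoff by layering an open table format (such as Delta Lake, Apache Iceberg, or Apache Hudi) over files in low-cost object storage, which gives lake data warehouse-style transactions and SQL access.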
Delta Lake Deep Dive
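Delta Lake adds ACID transactions, time travel, and schema enforcement to Parquet files by recording every change in an ordered transaction log (the `_delta_log` directory) stored alongside the data files.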
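from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
builder = SparkSession.builder \
.appName("DeltaLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# configure_spark_with_delta_pip adds the Delta JARs when delta-spark was installed with pip
spark = configure_spark_with_delta_pip(builder).getOrCreate()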
# Create a Delta table with schema enforcement
data = [
(1, "Alice", 28, 75000.0),
(2, "Bob", 35, 92000.0),
(3, "Charlie", 42, 110000.0)
]
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").save("/delta/employees")
# Schema evolution - adding a column
new_data = [
(4, "Diana", 31, 88000.0, "Engineering"),
(5, "Eve", 29, 95000.0, "Data Science")
]
new_columns = ["id", "name", "age", "salary", "department"]
new_df = spark.createDataFrame(new_data, new_columns)
new_df.write.format("delta").mode("append") \
.option("mergeSchema", "true") \
.save("/delta/employees")
# Time travel - query historical versions
historical_df = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/delta/employees")
# ACID update operation
delta_table = DeltaTable.forPath(spark, "/delta/employees")
delta_table.update(
condition="name = 'Bob'",
set={"salary": "salary * 1.10"}
)
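# Delta Lake maintenance
delta_table.optimize().executeCompaction() # Bin-pack small files into larger ones
# Delete data files no longer referenced by the table and older than 168 hours (7 days);
# time travel to versions that still need those files stops working afterward
delta_table.vacuum(168)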
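The ACID and time-travel behavior above rests on the transaction log: each commit is a numbered JSON file in `_delta_log` that lists `add` and `remove` file actions, a commit becomes visible all at once when its log file is written, and a reader reconstructs any version by replaying commits up to it. The sketch below models only that replay step in plain Python. The in-memory `commits` list and the file names are hypothetical, and real Delta logs also carry metadata, protocol, and checkpoint entries that this ignores.

```python
from typing import Dict, List, Set

# Hypothetical in-memory stand-in for _delta_log: commits[i] holds the actions of version i.
# Real Delta commits are JSON files with more action types (metaData, protocol, commitInfo, ...).
Action = Dict[str, str]


def snapshot_at(commits: List[List[Action]], version: int) -> Set[str]:
    """Return the set of live data files as of `version` by replaying the log in order."""
    if version < 0 or version >= len(commits):
        raise ValueError(f"version {version} does not exist")
    live: Set[str] = set()
    for actions in commits[: version + 1]:
        for action in actions:
            if "add" in action:
                live.add(action["add"])
            elif "remove" in action:
                live.discard(action["remove"])
    return live


commits: List[List[Action]] = [
    # v0: initial overwrite writes one file
    [{"add": "part-000.parquet"}],
    # v1: append with mergeSchema adds a second file
    [{"add": "part-001.parquet"}],
    # v2: UPDATE rewrites the file holding Bob's row: remove the old file, add a new one
    [{"remove": "part-000.parquet"}, {"add": "part-002.parquet"}],
]

print(sorted(snapshot_at(commits, 0)))  # ['part-000.parquet']
print(sorted(snapshot_at(commits, 2)))  # ['part-001.parquet', 'part-002.parquet']
```

Because removed files stay on disk until `vacuum` deletes them, a `versionAsOf` read only works while the files its snapshot lists still exist.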
Apache Iceberg
Iceberg provides hidden partitioning, schema evolution without rewriting data, and snapshot isolation.
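# Iceberg with PySpark (requires the iceberg-spark-runtime JAR matching your Spark version)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("IcebergExample") \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type", "hadoop") \
.config("spark.sql.catalog.local.warehouse", "warehouse") \
.getOrCreate()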
# Create an Iceberg table
spark.sql("""
CREATE TABLE local.db.users (
id BIGINT,
name STRING,
email STRING,
created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(created_at))
""")
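# Schema evolution in Iceberg - add, rename, reorder columns (metadata-only, no data rewrite)
spark.sql("ALTER TABLE local.db.users ADD COLUMN phone STRING")
spark.sql("ALTER TABLE local.db.users RENAME COLUMN name TO full_name")
spark.sql("ALTER TABLE local.db.users ALTER COLUMN phone AFTER email")
# Iceberg can relax a column to nullable, but cannot make an existing nullable column NOT NULL
# Hidden partitioning - Iceberg derives the day partition from created_at automatically
# Column order is now: id, full_name, email, phone, created_at
spark.sql("""
INSERT INTO local.db.users VALUES
(1, 'Alice', 'alice@example.com', NULL, TIMESTAMP '2024-01-15 00:00:00'),
(2, 'Bob', 'bob@example.com', NULL, TIMESTAMP '2024-03-22 00:00:00')
""")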
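# Query with partition pruning (automatic): the filter on created_at maps to day partitions
spark.sql("""
SELECT * FROM local.db.users
WHERE created_at >= TIMESTAMP '2024-01-01 00:00:00'
AND created_at < TIMESTAMP '2025-01-01 00:00:00'
""").show()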
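# Iceberg table maintenance (CALL procedures require the Iceberg SQL extensions)
spark.sql("CALL local.system.rewrite_data_files(table => 'db.users')")
# Expire snapshots older than the default older_than (5 days), always keeping the last 10
spark.sql("CALL local.system.expire_snapshots(table => 'db.users', retain_last => 10)")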
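Hidden partitioning works because Iceberg stores the partition transform (here `days(created_at)`) in table metadata and derives partition values from the column itself; in the Iceberg spec, the day transform maps a timestamp to the number of days since 1970-01-01. A filter on `created_at` can therefore be turned into a range of partition values without the query ever naming a partition column. The sketch below models that translation and the resulting pruning in plain Python, assuming naive datetimes are already UTC; the rows and file names are hypothetical, and real Iceberg additionally prunes individual files using column min/max statistics kept in manifests.

```python
from datetime import date, datetime, timedelta
from typing import Dict, List

EPOCH = date(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Days since 1970-01-01; naive datetimes are assumed to already be in UTC."""
    return (ts.date() - EPOCH).days


def prune(partitions: Dict[int, List[str]], lo: datetime, hi_exclusive: datetime) -> List[str]:
    """Return files in day partitions that can overlap the range [lo, hi_exclusive)."""
    first = day_transform(lo)
    # The last instant that can match is one microsecond before hi_exclusive
    last = day_transform(hi_exclusive - timedelta(microseconds=1))
    selected: List[str] = []
    for day, files in sorted(partitions.items()):
        if first <= day <= last:
            selected.extend(files)
    return selected


# Hypothetical rows: (id, created_at)
rows = [
    (1, datetime(2024, 1, 15, 9, 30)),
    (2, datetime(2024, 3, 22, 18, 0)),
    (3, datetime(2023, 12, 31, 23, 59)),
]

# On write, each row lands in the partition derived from its created_at value
partitions: Dict[int, List[str]] = {}
for row_id, created_at in rows:
    partitions.setdefault(day_transform(created_at), []).append(f"data-{row_id}.parquet")

# A filter on created_at becomes a range of day partitions; data-3 is skipped
print(prune(partitions, datetime(2024, 1, 1), datetime(2025, 1, 1)))
# ['data-1.parquet', 'data-2.parquet']
```

Because the transform lives in metadata, the partition spec can later change (say, from days to hours) without rewriting old data; old and new files are simply planned with their own specs.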
Apache Hudi
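Hudi was built for incremental processing and upsert-heavy workloads: every record carries a key, an index maps keys to the file groups that hold them, and readers can request only the records committed after a given instant.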
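from pyspark.sql import SparkSession
# Requires the hudi-spark bundle JAR matching your Spark version
spark = SparkSession.builder \
.appName("HudiExample") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
# Hudi write settings are passed as DataFrame writer options, not session configs
hudi_options = {
"hoodie.table.name": "user_profiles",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.precombine.field": "updated_at",
"hoodie.datasource.write.partitionpath.field": "country",
}
# Sample data; updated_at (epoch seconds) decides which version of a record wins
df = spark.createDataFrame(
[(1, "Alice", "US", 1705276800), (2, "Bob", "DE", 1711065600)],
["id", "name", "country", "updated_at"])
# Write initial data
df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") \
.mode("overwrite") \
.save("/hudi/user_profiles")
# Incremental read - only records committed after the given instant time
hudi_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240101000000") \
.load("/hudi/user_profiles")
# Merge on read for better write performance; the table type is fixed when a table
# is created, so this writes a separate table
df.write.format("hudi") \
.options(**{**hudi_options, "hoodie.table.name": "user_profiles_mor"}) \
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
.option("hoodie.compact.inline", "true") \
.option("hoodie.compact.inline.max.delta.commits", "5") \
.mode("append") \
.save("/hudi/user_profiles_mor")
# Global index for upserts across partitions (a key is unique across all partitions)
df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.index.type", "GLOBAL_BLOOM") \
.option("hoodie.bloom.index.filter.type", "DYNAMIC_V0") \
.mode("append") \
.save("/hudi/user_profiles")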
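Hudi's upsert semantics hinge on two of the write options above: the record key identifies a row, and the precombine field decides which version wins when several incoming records share a key. The sketch below models that merge in plain Python under a simple latest-wins policy (keep the largest precombine value in the batch, then overwrite the stored row). The table contents are hypothetical, and real Hudi also consults its index to find which file group holds each key; its payload classes can additionally compare against the stored value, which this sketch does not do.

```python
from typing import Any, Dict, Iterable

Record = Dict[str, Any]


def precombine(batch: Iterable[Record], key: str, ordering: str) -> Dict[Any, Record]:
    """Collapse records sharing a key, keeping the largest ordering value (ties keep the first seen)."""
    winners: Dict[Any, Record] = {}
    for rec in batch:
        current = winners.get(rec[key])
        if current is None or rec[ordering] > current[ordering]:
            winners[rec[key]] = rec
    return winners


def upsert(table: Dict[Any, Record], batch: Iterable[Record], key: str, ordering: str) -> Dict[Any, Record]:
    """Return a new table: existing keys are overwritten, new keys are inserted."""
    merged = dict(table)
    merged.update(precombine(batch, key, ordering))
    return merged


# Hypothetical current table state, indexed by record key
table = {
    1: {"id": 1, "name": "Alice", "updated_at": 100},
    2: {"id": 2, "name": "Bob", "updated_at": 100},
}
batch = [
    {"id": 2, "name": "Bobby", "updated_at": 200},
    {"id": 2, "name": "Robert", "updated_at": 150},  # older duplicate: loses precombine
    {"id": 3, "name": "Charlie", "updated_at": 120},  # new key: inserted
]

result = upsert(table, batch, key="id", ordering="updated_at")
print(result[2]["name"])  # Bobby
print(sorted(result))     # [1, 2, 3]
```

With MERGE_ON_READ, Hudi defers this merge: updates are appended to log files and combined with base files at read time or during compaction.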
Comparing Table Formats
| Feature | Delta Lake | Iceberg | Hudi |
|---|---|---|---|
| ACID Transactions | Yes | Yes | Yes |
| Time Travel | Yes | Yes | Yes |
| Schema Evolution | Yes | Yes (best) | Yes |
| Hidden Partitioning | No | Yes | No |
| Upsert Performance | Good | Good | Best |
| Streaming Support | Good | Good | Best |
| Origin / governance | Created at Databricks; Linux Foundation project | Created at Netflix; Apache project | Created at Uber; Apache project |
Best Practices
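- Start small: pick one table format and master it before expanding
- Partition wisely: partition on low-cardinality columns that queries commonly filter on (such as a date); high-cardinality partition columns scatter data into many tiny files
- Compact regularly: many small files slow reads through extra file opens and metadata overhead
- Set retention policies: use VACUUM (Delta), expire_snapshots (Iceberg), or Hudi's cleaner to bound storage, remembering that retention also limits how far back time travel can reach
- Monitor file counts: track small-file accumulation as an operational metric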
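Compaction (`optimize().executeCompaction()` in Delta, `rewrite_data_files` in Iceberg) comes down to grouping small files into rewrite tasks that approach a target output size, and both projects describe their default compaction as bin-packing. The sketch below plans such groups with first-fit-decreasing bin-packing over file sizes. The 128 MB target, the 32 MB "small file" threshold, and the file listing are hypothetical assumptions, and neither engine is claimed to use exactly this algorithm.

```python
from typing import Dict, List, Tuple

MB = 1024 * 1024


def plan_compaction(files: Dict[str, int], target: int = 128 * MB, small: int = 32 * MB) -> List[List[str]]:
    """Group files smaller than `small` into bins whose total size stays <= target.

    First-fit decreasing: place each file (largest first) into the first bin with room.
    Bins holding a single file are dropped, since rewriting one file alone gains nothing.
    """
    candidates = sorted(((size, name) for name, size in files.items() if size < small), reverse=True)
    bins: List[Tuple[int, List[str]]] = []  # (bytes used, file names)
    for size, name in candidates:
        for i, (used, names) in enumerate(bins):
            if used + size <= target:
                bins[i] = (used + size, names + [name])
                break
        else:
            bins.append((size, [name]))
    return [names for _, names in bins if len(names) > 1]


# Hypothetical file listing: five 30 MB files and one large file that is left alone
files = {
    "a.parquet": 30 * MB, "b.parquet": 30 * MB, "c.parquet": 30 * MB,
    "d.parquet": 30 * MB, "e.parquet": 30 * MB, "f.parquet": 200 * MB,
}
print(plan_compaction(files))
# [['e.parquet', 'd.parquet', 'c.parquet', 'b.parquet']]
```

Counting how many files fall under the `small` threshold per table is also a cheap way to implement the file-count metric from the last practice above.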