Data Lakehouse Architecture with PySpark
Lakehouse Stack & Medallion Pattern
Architecture Diagram: Lakehouse Stack
Architecture Diagram: Medallion Pattern (Bronze → Silver → Gold)
Architecture Diagram: ACID Transaction Flow on Data Lake
Detailed Explanation
The Data Lakehouse represents a paradigm shift in enterprise data architecture, merging the flexibility and cost-efficiency of data lakes with the transactional guarantees and performance of data warehouses. This convergence eliminates the traditional "two-system" problem where organizations maintained a data lake and a data warehouse side by side, each with its own copy of data, governance model, and query engine.
What is Data Lakehouse Architecture?
At its foundation, the Lakehouse relies on an open table format (Delta Lake, Apache Iceberg, or Apache Hudi) that provides ACID transactions, schema enforcement, time travel, and metadata management directly on top of cheap object storage (S3, ADLS, GCS).
Key Components:
- Transaction Log (`_delta_log/` for Delta Lake): Single source of truth, recording every mutation as an immutable, ordered JSON entry
- Optimistic Concurrency Control: Multiple writers can operate simultaneously, with conflicts detected and resolved at commit time through a compare-and-swap mechanism on read-set versions
- Open Table Formats: Delta Lake, Apache Iceberg, or Apache Hudi
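The compare-and-swap commit described above can be sketched in plain Python. This is a toy model, not Delta Lake's implementation: the class `ToyTransactionLog`, the exception `CommitConflict`, and the file names are hypothetical, and a real table format also checks whether two commits actually touch overlapping data before rejecting one.

```python
import json


class CommitConflict(Exception):
    """Raised when another writer has already claimed the target version."""


class ToyTransactionLog:
    """In-memory stand-in for a _delta_log/ directory (illustrative only)."""

    def __init__(self):
        self._entries = {}  # version number -> serialized JSON commit

    def latest_version(self):
        return max(self._entries, default=-1)

    def commit(self, read_version, actions):
        """Write version read_version + 1 only if nobody else has."""
        target = read_version + 1
        if target in self._entries:  # compare-and-swap failed
            raise CommitConflict(f"version {target} already exists")
        self._entries[target] = json.dumps(actions)
        return target

    def actions_at(self, version):
        return json.loads(self._entries[version])


log = ToyTransactionLog()

# Two writers read the same table version before committing.
writer_a_read = log.latest_version()
writer_b_read = log.latest_version()

v_a = log.commit(writer_a_read, [{"add": "part-0001.parquet"}])

try:
    log.commit(writer_b_read, [{"add": "part-0002.parquet"}])
    b_conflicted = False
except CommitConflict:
    b_conflicted = True
    # Writer B re-reads the log and retries on top of writer A's commit.
    v_b = log.commit(log.latest_version(), [{"add": "part-0002.parquet"}])
```

The losing writer never overwrites the winner's entry; it retries against the new latest version, which is what keeps the log immutable and ordered.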
What is the Medallion Architecture?
The Medallion Architecture organizes data into three distinct tiers (Bronze, Silver, Gold), each with specific quality guarantees and transformation requirements.
| Layer | Description | Quality Guarantees |
|---|---|---|
| Bronze | Ingests raw, unvalidated data from all sources with minimal transformation, preserving the original format for auditability and replay capability | Accepts any valid data regardless of quality |
| Silver | Applies cleansing, deduplication, standardization, and schema enforcement, producing a "single source of truth" that conforms to enterprise data models | Enforces NOT NULL constraints, data type validation, referential integrity checks, and temporal correctness |
| Gold | Contains business-level aggregates, dimensional models, and materialized views optimized for consumption by BI tools, ML pipelines, and applications | Guarantees that aggregations are idempotent, dimensions are conformed, and business rules are applied consistently across all domains |
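The tier responsibilities in the table can be illustrated without Spark. The sketch below runs on a list of dicts; the sample records and the helper names `to_silver` and `to_gold` are assumptions made for illustration, and the rules are far simpler than a production pipeline would apply.

```python
from collections import defaultdict

# Bronze: raw records kept exactly as received (hypothetical sample data).
bronze = [
    {"transaction_id": "t1", "customer_id": " c1 ", "amount": "10.50", "region": "US"},
    {"transaction_id": "t1", "customer_id": " c1 ", "amount": "10.50", "region": "US"},
    {"transaction_id": "t2", "customer_id": "c2", "amount": None, "region": "EU"},
    {"transaction_id": "t3", "customer_id": "c3", "amount": "4.50", "region": "US"},
]


def to_silver(rows):
    """Deduplicate on transaction_id, enforce NOT NULL on amount, standardize."""
    seen, out = set(), []
    for row in rows:
        if row["amount"] is None or row["transaction_id"] in seen:
            continue
        seen.add(row["transaction_id"])
        out.append({
            "transaction_id": row["transaction_id"],
            "customer_id": row["customer_id"].strip().upper(),
            "amount": float(row["amount"]),
            "region": row["region"],
        })
    return out


def to_gold(rows):
    """Aggregate revenue per region."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["region"]] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
```

Bronze keeps the duplicate and the null-amount record for replay; only Silver and Gold reflect the quality rules.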
What is Time Travel?
Time travel (also called data versioning) is a critical capability enabled by the transaction log. Every commit creates a snapshot that can be queried as of a specific timestamp or version number.
Use Cases:
- Regulatory audit trails: Reproduce any historical state
- Debugging data quality issues: Compare current vs. previous state
- Point-in-time joins: Join two tables as of the same logical moment
- Rollback capabilities: Revert to any previous consistent state
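How a snapshot is derived from the log can be sketched by replaying commits up to the requested version. The commit layout and file names below are hypothetical; real formats store richer actions and use checkpoints to avoid replaying the whole log.

```python
# Each commit lists the data files it adds and removes (hypothetical file names).
commits = [
    {"version": 0, "add": ["a.parquet", "b.parquet"], "remove": []},
    {"version": 1, "add": ["c.parquet"], "remove": []},
    {"version": 2, "add": ["d.parquet"], "remove": ["a.parquet"]},
]


def snapshot_files(commits, as_of_version):
    """Replay commits 0..as_of_version and return the live file set."""
    live = set()
    for commit in sorted(commits, key=lambda c: c["version"]):
        if commit["version"] > as_of_version:
            break
        live.update(commit["add"])
        live.difference_update(commit["remove"])
    return live


v1_files = snapshot_files(commits, as_of_version=1)
v2_files = snapshot_files(commits, as_of_version=2)

# Files that version 1 still needs but version 2 no longer references.
removed_since_v1 = v1_files - v2_files
```

Querying version 1 only works while `a.parquet` still exists on storage, which is why retention settings bound how far back time travel can reach.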
What is Schema Evolution?
Schema evolution and enforcement work together to maintain data quality while accommodating changing business requirements.
| Approach | Description |
|---|---|
| Schema Enforcement | Validates that incoming data matches the existing schema and rejects incompatible writes |
| Schema Evolution | Allows controlled modifications (adding columns, changing data types, renaming fields) through explicit ALTER commands or write-mode configurations (mergeSchema option) |
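The difference between the two approaches can be sketched as a single write-time check. The function `validate_write` and its `merge_schema` flag are hypothetical stand-ins for the behavior described in the table; schemas are simplified to dicts of column name to type name, and any type mismatch is rejected even though real formats permit some safe widenings.

```python
def validate_write(table_schema, batch_schema, merge_schema=False):
    """Return the schema after the write, or raise if the write is incompatible.

    Schemas are plain dicts of column name -> type name (a simplification).
    """
    result = dict(table_schema)
    for column, dtype in batch_schema.items():
        if column not in table_schema:
            if not merge_schema:
                raise ValueError(f"unexpected column: {column}")
            result[column] = dtype  # evolution: append the new column
        elif table_schema[column] != dtype:
            raise ValueError(
                f"type mismatch for {column}: {table_schema[column]} vs {dtype}"
            )
    return result


table = {"transaction_id": "string", "amount": "decimal(18,2)"}
batch = {"transaction_id": "string", "amount": "decimal(18,2)", "channel": "string"}

# Enforcement: the extra column is rejected by default.
try:
    validate_write(table, batch)
    rejected = False
except ValueError:
    rejected = True

# Evolution: the same write succeeds when schema merging is requested.
evolved = validate_write(table, batch, merge_schema=True)
```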
What are Advanced Optimization Features?
The table format layer provides advanced optimization features:
| Feature | Description | Benefit |
|---|---|---|
| Data Skipping | Uses file-level statistics (min/max values stored in Parquet footers and the transaction log) to skip files that cannot match a filter | Fewer files scanned, improved query performance |
| Z-ORDER Multi-dimensional Clustering | Co-locates rows with similar values across several columns in the same files | Better data locality and efficient point queries on correlated columns |
| Compaction (OPTIMIZE command) | Merges small files into larger ones | Better read performance |
| Liquid Clustering | Adaptive, automatic data layout optimization | Layout tuning without manual intervention |
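Data skipping from the table above reduces to an interval-overlap test on per-file statistics. In this sketch the statistics, file names, and the helper `files_to_scan` are hypothetical; ISO-formatted date strings are used so that plain string comparison matches date order.

```python
# Per-file min/max statistics for one column (hypothetical values).
file_stats = [
    {"path": "part-0.parquet", "min_date": "2026-01-01", "max_date": "2026-01-10"},
    {"path": "part-1.parquet", "min_date": "2026-01-11", "max_date": "2026-01-20"},
    {"path": "part-2.parquet", "min_date": "2026-01-21", "max_date": "2026-01-31"},
]


def files_to_scan(stats, lower, upper):
    """Keep only files whose [min, max] range overlaps [lower, upper]."""
    return [
        f["path"]
        for f in stats
        if f["max_date"] >= lower and f["min_date"] <= upper
    ]


# Filter: transaction_date BETWEEN '2026-01-12' AND '2026-01-15'
selected = files_to_scan(file_stats, "2026-01-12", "2026-01-15")
skipped_fraction = 1 - len(selected) / len(file_stats)
```

Skipping is only effective when each file covers a narrow value range, which is the layout that clustering and Z-ORDER aim to produce.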
Key Takeaway: The Data Lakehouse eliminates the need for separate OLAP systems by providing ACID guarantees, schema enforcement, and time travel directly on cheap object storage through open table formats.
Mathematical Foundations
Definition: Lakehouse Architecture
A lakehouse combines data lake storage (cheap, scalable object storage) with data warehouse semantics (ACID, schema enforcement, governance) in a single system.
Medallion Pattern Flow
Data flows through three tiers: Bronze → Silver → Gold.
Each tier applies transformations to the output of the tier before it.
ACID on Data Lake Theorem
The transaction log ensures atomicity and isolation for concurrent operations on lake storage.
Consistency is maintained because each operation is a commit that atomically adds/removes file references.
Storage Cost Model
Total storage cost for a lakehouse is the sum of the costs of its tiers, each following a tiered pricing model.
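One way to write the cost model down, offered as a sketch only: the symbols $V_t$ (data volume stored in tier $t$) and $p_t$ (unit storage price applied to that tier) are assumptions introduced here, not notation recovered from the original.

```latex
C_{\text{total}} = \sum_{t \in \{\text{Bronze},\, \text{Silver},\, \text{Gold}\}} V_t \, p_t
```

Under this form, moving rarely read Bronze data to a cheaper storage class lowers $p_{\text{Bronze}}$ without affecting the other terms.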
Data Freshness
End-to-end freshness from ingestion to the Gold tier is the sum of the pipeline stage latencies.
Target: within the business SLA.
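Treating freshness as a sum of stage latencies, the check against a target is simple arithmetic. The stage names, the latency values, and the one-hour target below are all hypothetical placeholders.

```python
from datetime import timedelta

# Hypothetical per-stage latencies for one pipeline run.
stage_latency = {
    "ingest_to_bronze": timedelta(minutes=5),
    "bronze_to_silver": timedelta(minutes=10),
    "silver_to_gold": timedelta(minutes=15),
}

# End-to-end freshness is the sum of the stage latencies.
end_to_end = sum(stage_latency.values(), timedelta())

sla = timedelta(hours=1)  # assumed business target
meets_sla = end_to_end <= sla

# The slowest stage is the first candidate for tuning.
slowest_stage = max(stage_latency, key=stage_latency.get)
```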
Key Insight
The lakehouse pattern eliminates the "copy tax" of traditional architectures where data must be moved between systems. Delta Lake/Iceberg/Hudi provide the transaction layer that makes ACID possible on object storage.
Summary
The lakehouse unifies lake storage with warehouse semantics through transaction logs. The medallion pattern organizes data by quality tiers. Storage costs follow tiered pricing models, and freshness is the sum of pipeline stage latencies. ACID guarantees eliminate the need for separate OLAP systems.
Key Concepts Table
| Concept | Delta Lake | Apache Iceberg | Apache Hudi | Description |
|---|---|---|---|---|
| ACID Transactions | Yes (MVCC) | Yes (snapshot isolation) | Yes (copy-on-write / merge-on-read) | Transactional semantics on object storage |
| Schema Evolution | Yes (mergeSchema) | Yes (evolve schema) | Yes (schema evolution) | Safe schema changes without rewriting data |
| Time Travel | Yes (VERSION AS OF) | Yes (snapshots) | Yes (timeline) | Query historical data states |
| Partition Evolution | No (changing partition columns requires rewriting the table) | Yes (hidden partitions) | No (changing partition columns requires rewriting the table) | Change partitioning without rewriting |
| Conflict Resolution | Optimistic concurrency | Optimistic concurrency | Optimistic concurrency | Handle concurrent writers gracefully |
| Open Format | Yes (Parquet + JSON log) | Yes (Parquet + metadata) | Yes (Parquet + metadata) | No vendor lock-in |
| Streaming Support | Yes (Structured Streaming) | Yes (Spark/Flink) | Yes (Spark/Flink) | Real-time ingestion and processing |
| DML Operations | INSERT, UPDATE, DELETE, MERGE | INSERT, UPDATE, DELETE, MERGE | INSERT, UPDATE, DELETE, UPSERT | Full SQL-compatible data manipulation |
| Caching | Platform-dependent (e.g. Databricks disk cache) | No (relies on storage) | No (relies on storage) | Local caching for performance |
| Statistics | Column-level stats | Partition- and column-level stats (in manifests) | Column-level stats | Data skipping for query optimization |
Code Examples
Example 1: Setting Up a Delta Lake Table with Medallion Pattern
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import configure_spark_with_delta_pip

# Initialize Spark with Delta Lake
builder = (
    SparkSession.builder
    .appName("Lakehouse-Medallion")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .config("spark.sql.adaptive.enabled", "true")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# === BRONZE LAYER: Raw Ingestion ===
# Ingest raw transaction data with full audit metadata
raw_transactions = (
    spark.read
    .format("json")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .load("/mnt/raw/transactions/*.json")
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source_system", lit("erp_system"))
    .withColumn("_batch_id", lit("batch_20260101"))
)
# Write to Bronze with append-only semantics
(
    raw_transactions
    .write
    .format("delta")
    .mode("append")
    .partitionBy("_source_system")
    .save("/mnt/lakehouse/bronze/transactions")
)
# === SILVER LAYER: Cleansed & Conformed ===
# Read Bronze, apply cleansing rules, write to Silver
from delta.tables import DeltaTable  # merge() is exposed on DeltaTable, not on DataFrame

bronze_df = spark.read.format("delta").load("/mnt/lakehouse/bronze/transactions")
silver_transactions = (
    bronze_df
    .filter(col("_corrupt_record").isNull())  # Remove corrupt records
    .drop("_corrupt_record", "_ingestion_timestamp")
    .withColumn("amount", col("amount").cast(DecimalType(18, 2)))
    .withColumn("transaction_date", to_date(col("timestamp")))
    .withColumn("customer_id", upper(trim(col("customer_id"))))
    .dropDuplicates(["transaction_id"])
    .withColumn("_quality_score",
        when(col("amount") > 0, lit(1.0))
        .when(col("amount") < 0, lit(0.8))
        .otherwise(lit(0.5))
    )
)
# Merge into Silver (upsert for idempotency); the Silver table must already exist
(
    DeltaTable.forPath(spark, "/mnt/lakehouse/silver/transactions")
    .alias("target")
    .merge(
        silver_transactions.alias("source"),
        "target.transaction_id = source.transaction_id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
# === GOLD LAYER: Business Aggregates ===
# Compute daily revenue metrics per region
gold_daily_revenue = (
    spark.read.format("delta").load("/mnt/lakehouse/silver/transactions")
    .groupBy(
        col("transaction_date"),
        col("region"),
        col("product_category")
    )
    .agg(
        count("*").alias("transaction_count"),
        sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_transaction_value"),
        approx_count_distinct("customer_id").alias("unique_customers"),
        stddev("amount").alias("revenue_stddev")
    )
    .withColumn("revenue_per_customer", col("total_revenue") / col("unique_customers"))
)
(
    gold_daily_revenue
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("transaction_date")
    .option("overwriteSchema", "true")
    .save("/mnt/lakehouse/gold/daily_revenue")
)
Example 2: Advanced ACID Operations and Time Travel
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# Load existing Delta table
transactions_table = DeltaTable.forPath(spark, "/mnt/lakehouse/silver/transactions")
# === MERGE (UPSERT) with Complex Logic ===
new_data = spark.read.format("delta").load("/mnt/staging/transactions_updates")
(
    transactions_table.alias("target")
    .merge(
        new_data.alias("source"),
        "target.transaction_id = source.transaction_id"
    )
    .whenMatchedUpdate(
        condition="source.update_type = 'CORRECTION'",
        set={
            "amount": col("source.amount"),
            "description": col("source.description"),
            "_last_modified": current_timestamp()
        }
    )
    .whenMatchedDelete(
        condition="source.update_type = 'REVERSAL'"
    )
    .whenNotMatchedInsert(
        values={
            "transaction_id": col("source.transaction_id"),
            "customer_id": col("source.customer_id"),
            "amount": col("source.amount"),
            "product_category": col("source.product_category"),
            "region": col("source.region"),
            "_created_at": current_timestamp(),
            "_last_modified": current_timestamp()
        }
    )
    .execute()
)
# === TIME TRAVEL: Query Historical States ===
# Query as of a specific timestamp
historical_df = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2026-01-01")
    .load("/mnt/lakehouse/silver/transactions")
)
# Query as of a specific version
version_df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 42)
    .load("/mnt/lakehouse/silver/transactions")
)
# Compare two versions to detect changes
current_df = spark.read.format("delta").load("/mnt/lakehouse/silver/transactions")
changes_df = (
    current_df.alias("current")
    .join(
        version_df.alias("old"),
        # Join on an explicit condition so each side's transaction_id stays
        # addressable; joining on the column name merges the two keys into one.
        col("current.transaction_id") == col("old.transaction_id"),
        "full_outer"
    )
    .withColumn("change_type",
        when(col("old.transaction_id").isNull(), lit("INSERTED"))
        .when(col("current.transaction_id").isNull(), lit("DELETED"))
        .when(col("current.amount") != col("old.amount"), lit("UPDATED"))
        .otherwise(lit("UNCHANGED"))
    )
    .filter(col("change_type") != "UNCHANGED")
)
# === VACUUM: Remove Old Files ===
# Default retention is 7 days. Delta's safety check rejects shorter intervals
# unless spark.databricks.delta.retentionDurationCheck.enabled is set to "false",
# which risks deleting files that active readers or time travel still need.
transactions_table.vacuum(retentionHours=168)  # 7 days, so the check stays enabled
# === OPTIMIZE: Compact Small Files ===
(
    transactions_table.optimize()
    .executeCompaction()
)
# Z-ORDER for multi-dimensional clustering
(
    transactions_table.optimize()
    .executeZOrderBy("customer_id", "product_category")
)
Example 3: Apache Iceberg Integration with PySpark
from pyspark.sql import SparkSession

# Configure Spark for Iceberg
spark = (
    SparkSession.builder
    .appName("Iceberg-Lakehouse")
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse", "/mnt/lakehouse/iceberg")
    # The Iceberg extensions class is IcebergSparkSessionExtensions
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)
# Create Iceberg table with advanced features
spark.sql("""
    CREATE TABLE IF NOT EXISTS my_catalog.sales (
        transaction_id STRING,
        customer_id STRING,
        amount DECIMAL(18, 2),
        product_category STRING,
        region STRING,
        transaction_date DATE,
        _created_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (transaction_date)
    TBLPROPERTIES (
        'format-version' = '2',
        'write.format.default' = 'parquet',
        'write.parquet.compression-codec' = 'zstd',
        'write.target-file-size-bytes' = '134217728',
        'commit.manifest-merge.enabled' = 'true'
    )
""")
# Schema evolution: add a new column
spark.sql("""
    ALTER TABLE my_catalog.sales
    ADD COLUMN payment_method STRING AFTER region
""")
# Partition evolution (no data rewrite required)
spark.sql("""
    ALTER TABLE my_catalog.sales
    DROP PARTITION FIELD transaction_date
""")
spark.sql("""
    ALTER TABLE my_catalog.sales
    ADD PARTITION FIELD bucket(16, customer_id)
""")
# Snapshot-based time travel
spark.sql("""
    SELECT * FROM my_catalog.sales
    TIMESTAMP AS OF '2026-01-01 00:00:00'
    WHERE region = 'US'
""")
# Expire old snapshots
spark.sql("""
    CALL my_catalog.system.expire_snapshots(
        table => 'sales',
        older_than => TIMESTAMP '2025-12-01 00:00:00',
        retain_last => 5
    )
""")
Performance Metrics
| Metric | Before Optimization | After Optimization | Improvement |
|---|---|---|---|
| Average File Size | 1 MB (small files) | 128 MB (optimized) | 128x fewer files |
| Query Latency (P50) | 45 seconds | 2.3 seconds | 95% reduction |
| Query Latency (P99) | 320 seconds | 8.7 seconds | 97% reduction |
| Storage Cost (per TB) | $23/month | $12/month | 48% reduction |
| Write Throughput | 50 MB/s | 450 MB/s | 9x improvement |
| Concurrent Writers | 2 (frequent conflicts) | 12 (rare conflicts) | 6x scaling |
| Time Travel Query | N/A | 3.1 seconds | New capability |
| Compaction Duration | N/A | 8.5 min/100GB | New capability |
| Data Skipped (%) | 0% | 85% (Z-ORDER) | New capability |
| Snapshot Retention | Unlimited | Configurable | Space savings |
Best Practices
- Always use the Medallion Architecture: Maintain clear separation between Bronze (raw), Silver (cleansed), and Gold (aggregated) layers with explicit quality gates between them.
- Enable Adaptive Query Execution (AQE): Set `spark.sql.adaptive.enabled=true` to allow Spark to dynamically optimize shuffle partitions, join strategies, and skew handling at runtime.
- Configure file compaction proactively: Run `OPTIMIZE` regularly on Silver/Gold tables to merge small files; target file sizes of 128 to 512 MB for optimal read performance.
- Apply Z-ORDER clustering on frequently filtered columns: Use `OPTIMIZE ZORDER BY` on columns commonly used in WHERE clauses, JOINs, and GROUP BY operations for data skipping.
- Set appropriate vacuum retention: Configure vacuum retention to match your compliance requirements; use `VACUUM` to reclaim storage from old files while maintaining the ability to time-travel.
- Use MERGE for idempotent writes: Always use `MERGE` (upsert) instead of `INSERT` for Silver-layer writes to ensure idempotency and prevent duplicate records on job retries.
- Monitor file statistics: Use `DESCRIBE DETAIL` and `DESCRIBE HISTORY` to monitor table health, file counts, and commit frequency; set alerts for excessive small file counts.
- Partition wisely: Partition by low-cardinality, high-filtration columns (date, region); avoid partitioning by high-cardinality columns (customer_id, transaction_id) which create excessive small files.
- Leverage schema evolution carefully: Use the `mergeSchema` option sparingly and always validate schema changes against downstream consumers before applying; maintain a schema registry for governance.
- Implement data quality gates: Use tools like Great Expectations, Deequ, or custom validation in the Silver layer to ensure data meets quality standards before promotion to Gold.
- Version your data models: Tag Gold-layer tables with version numbers and maintain backward compatibility; use schema evolution to add columns rather than modifying existing ones.
- Optimize for your workload: For read-heavy workloads, prioritize compaction and Z-ORDER; for write-heavy workloads, use larger file target sizes and consider liquid clustering for automatic optimization.
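The compaction practice in the list above can be pictured as a bin-packing plan over file sizes. This is a simplified sketch, not how `OPTIMIZE` is implemented: the helper `plan_compaction`, the greedy strategy, and the sample file sizes are assumptions, and the 128 MB target is the lower end of the recommended range.

```python
TARGET_BYTES = 128 * 1024 * 1024  # 128 MB target file size


def plan_compaction(file_sizes, target=TARGET_BYTES):
    """Greedily group small files so each group is at most `target` bytes.

    Files already at or above the target are left alone.
    """
    groups, current, current_size = [], [], 0
    for size in sorted(s for s in file_sizes if s < target):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


MB = 1024 * 1024
# Hypothetical table: 300 files of 1 MB plus one file that is already large.
small_files = [1 * MB] * 300 + [200 * MB]
plan = plan_compaction(small_files)
files_before = len(small_files)
files_after = len(plan) + 1  # one output file per group, plus the untouched large file
```

In this example 301 files collapse to 4, which is the kind of file-count reduction that lowers per-file open and metadata overhead on reads.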
Mathematical Foundation Summary
The Medallion architecture follows Bronze (raw) → Silver (validated) → Gold (aggregated), where each layer applies progressively stricter quality guarantees. Data freshness follows freshness = current_time - max(event_time) and should meet business SLAs. The ACID cost model shows write_cost = O(n × log(n)) for sorted formats versus O(n) for append-only. Z-ORDER achieves data skipping efficiency of skip_ratio ≈ 1 - (1/2^k), where k is the Z-ORDER column count. The lakehouse cost advantage: TCO_lakehouse = TCO_warehouse × 0.2-0.4, based on storage and compute separation.
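The idea behind Z-ORDER can be illustrated with a Morton code, which interleaves the bits of two column values so that rows close in both dimensions sort near each other. This is a conceptual sketch with a hypothetical helper `z_value` and made-up points; table formats apply the idea to ranked or range-bucketed column values rather than raw integers.

```python
def z_value(x, y, bits=8):
    """Interleave the low `bits` bits of x and y into one Morton code."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x bits occupy even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y bits occupy odd positions
    return z


points = [(0, 0), (3, 3), (0, 1), (1, 0), (1, 1), (2, 2)]
ordered = sorted(points, key=lambda p: z_value(*p))
```

Sorting by the interleaved value keeps the four points with coordinates in {0, 1} adjacent, so a file written from that run has tight min/max ranges on both columns at once.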
See also: Iceberg Integration (21), Delta Lake Operations (22), Hudi Operations (23), SCD (35), CDC (36)
See Also
- Iceberg Integration β Apache Iceberg table format
- Delta Lake β Delta Lake ACID transactions
- Hudi Operations β Apache Hudi for incremental processing
- Data Lake Architecture β Data lake design patterns