🐄 Apache Hudi Operations in PySpark
[Figure: COW vs MOR Table Structure (architecture diagram)]
Detailed Explanation
What is Apache Hudi?
Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source data lake platform that provides record-level upserts and deletes, incremental processing, and table services such as compaction, clustering, and cleaning.
Key Takeaway: Hudi enables efficient update and delete operations at the record level, making it ideal for near-real-time data ingestion.
File Layout Management
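Hudi's central design idea is its file layout management: an index maps each record key to a file group, so an update touches only the file groups that contain affected records instead of rewriting a whole partition or table.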
| Component | Purpose |
|---|---|
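| Base Files | Columnar files (Parquet by default) storing the table data |
| Log Files | Append-only files holding updates and deletes for MOR tables |
| Metadata Table | Internal Hudi table tracking file listings and index data (e.g. column stats, Bloom filters) |
| Index | Maps record keys to file groups (e.g. Bloom, bucket, or record-level index) |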
COW vs MOR Table Types
| Aspect | Copy-on-Write (COW) | Merge-on-Read (MOR) |
|---|---|---|
| Update Mechanism | Rewrites entire base file | Appends to log files |
| Write Performance | Expensive (full rewrite) | Fast (append-only) |
| Read Performance | Fast (no merge overhead) | Slower (merge at read time) |
| Storage Overhead | Higher | Lower |
| Ideal Workload | Read-heavy | Write-heavy, near-real-time |
Timeline Mechanism
Hudi's timeline is the central coordination mechanism tracking all operations.
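- Each action (commit, delta commit, compaction, clean, rollback) is recorded as an instant with a timestamp and a state (requested, inflight, completed)
- Enables incremental processing: downstream consumers read only the records changed since the last instant they processed
- This avoids full-refresh patterns that reprocess the entire dataset on every run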
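The way a consumer uses the timeline for incremental processing can be sketched in pure Python. This is a simplified model, not Hudi's API: the `Instant` class, the `commits_after` helper, and the instant times are hypothetical, and a real timeline carries more action types and metadata. It shows the core rule: remember the last instant you processed, then pick up only newer *completed* write instants.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Instant:
    """Simplified (hypothetical) timeline entry: timestamp, action, state."""
    timestamp: str  # fixed-width instant time, e.g. "20240115080000000"
    action: str     # e.g. "commit", "deltacommit", "clean"
    state: str      # "requested", "inflight" or "completed"


def commits_after(timeline, begin_exclusive):
    """Return completed write instants strictly after begin_exclusive, oldest first."""
    write_actions = {"commit", "deltacommit"}
    return sorted(
        (i for i in timeline
         if i.state == "completed"
         and i.action in write_actions
         and i.timestamp > begin_exclusive),  # fixed-width strings compare chronologically
        key=lambda i: i.timestamp,
    )


timeline = [
    Instant("20240115080000000", "commit", "completed"),
    Instant("20240115083000000", "deltacommit", "completed"),
    Instant("20240115090000000", "clean", "completed"),       # not a data write
    Instant("20240115093000000", "deltacommit", "inflight"),  # not yet visible
]
new = commits_after(timeline, "20240115080000000")
print([i.timestamp for i in new])  # ['20240115083000000']
```

Inflight instants are skipped, so a consumer never reads a partially written commit; it will see that delta commit on a later run once it completes.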
Compaction Service
Compaction is critical for maintaining MOR table read performance.
Configuration Options:
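- Inline: runs synchronously as part of a write once the trigger condition is met
- Asynchronous: runs in the background, concurrently with ingestion
- On-demand: scheduled and executed manually, for example from a separate job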
Compaction Selection Factors:
- Log file count
- Log file size
- Time since last compaction
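The selection factors above can be combined in a small decision model. This is a simplified sketch, not Hudi's implementation: `should_schedule` mimics an "either threshold" trigger in the spirit of Hudi's `hoodie.compact.inline.max.delta.commits` and `hoodie.compact.inline.max.delta.seconds` settings (whose defaults are, to our knowledge, 5 and 3600), while `select_file_groups`, `FileGroupStats`, and the I/O estimate are hypothetical. File groups with the most log files and log bytes are compacted first, within an I/O budget.

```python
from dataclasses import dataclass, field


@dataclass
class FileGroupStats:
    """Hypothetical per-file-group statistics used for compaction planning."""
    file_id: str
    base_size_mb: float
    log_sizes_mb: list = field(default_factory=list)


def should_schedule(delta_commits_since_last, seconds_since_last,
                    max_delta_commits=5, max_delta_seconds=3600):
    """Trigger when either the commit-count or the elapsed-time threshold is reached."""
    return (delta_commits_since_last >= max_delta_commits
            or seconds_since_last >= max_delta_seconds)


def estimated_io_mb(group):
    """Rough I/O estimate (assumption): read base + logs, then write a similar-sized base."""
    return 2 * (group.base_size_mb + sum(group.log_sizes_mb))


def select_file_groups(groups, target_io_mb):
    """Greedily pick file groups by log count, then log size, within an I/O budget."""
    candidates = [g for g in groups if g.log_sizes_mb]  # nothing to compact without logs
    candidates.sort(key=lambda g: (len(g.log_sizes_mb), sum(g.log_sizes_mb)), reverse=True)
    chosen, spent = [], 0.0
    for g in candidates:
        cost = estimated_io_mb(g)
        if spent + cost <= target_io_mb:
            chosen.append(g)
            spent += cost
    return chosen


groups = [
    FileGroupStats("fg-1", 100.0, [30.0, 20.0]),
    FileGroupStats("fg-2", 100.0, [5.0]),
    FileGroupStats("fg-3", 100.0, []),
]
print(should_schedule(5, 10))                                    # True
print([g.file_id for g in select_file_groups(groups, 350.0)])    # ['fg-1']
```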
Mathematical Foundations
Definition: Hudi Timeline
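The Hudi timeline is a chronologically ordered set of actions $\mathcal{T} = \{a_1, a_2, \dots, a_n\}$, where each action $a_i = (o_i, s_i, t_i)$ has an operation type $o_i \in \{\text{commit}, \text{deltacommit}, \text{compaction}, \text{clean}, \text{rollback}, \dots\}$, state $s_i \in \{\text{requested}, \text{inflight}, \text{completed}\}$, and timestamp $t_i$, with $t_1 < t_2 < \dots < t_n$.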
Copy-on-Write Merge
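For a COW table, an upsert of record set $U$ into file group $F$ with base file $B$ produces a new base file

$$B' = \{\, r \in B : k(r) \notin K_U \,\} \cup U,$$

where $k(r)$ is the record key and $K_U$ is the set of keys in $U$. The entire file is rewritten, costing $O(|B| + |U|)$ I/O even when $|U| \ll |B|$.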
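A COW upsert on a single file group can be sketched in pure Python. This assumes latest-write-wins merging: the incoming batch is first deduplicated on the precombine (ordering) field, and the surviving records replace stored ones. Hudi's actual merge behaviour depends on the configured payload or record merger class; the helpers `precombine` and `cow_upsert` and the sample rows are hypothetical. The key point is that every surviving record is written into the new file, not just the changed ones.

```python
def precombine(updates, key, ordering):
    """Deduplicate an incoming batch, keeping the largest ordering value per key."""
    best = {}
    for r in updates:
        k = r[key]
        if k not in best or r[ordering] > best[k][ordering]:
            best[k] = r
    return best


def cow_upsert(base, updates, key="customer_id", ordering="update_timestamp"):
    """Build the new base file: unchanged base records plus the precombined updates.

    Assumes latest-write-wins semantics; the whole file group is rewritten.
    """
    incoming = precombine(updates, key, ordering)
    kept = [r for r in base if r[key] not in incoming]
    return sorted(kept + list(incoming.values()), key=lambda r: r[key])


base = [
    {"customer_id": 1, "name": "Alice", "update_timestamp": "2024-01-15 10:00:00"},
    {"customer_id": 2, "name": "Bob", "update_timestamp": "2024-01-15 10:00:00"},
]
updates = [
    {"customer_id": 1, "name": "Alice S.", "update_timestamp": "2024-01-16 09:00:00"},
    {"customer_id": 1, "name": "Alice Smith", "update_timestamp": "2024-01-16 10:00:00"},
    {"customer_id": 5, "name": "Eve", "update_timestamp": "2024-01-16 10:00:00"},
]
new_base = cow_upsert(base, updates)
print([(r["customer_id"], r["name"]) for r in new_base])
# [(1, 'Alice Smith'), (2, 'Bob'), (5, 'Eve')]
```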
MOR Read Consistency Theorem
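A MOR snapshot read at time $t$ merges the base file $B$ (written at $t_B$) with the log files $L_1, \dots, L_k$ whose commit times satisfy $t_B < t_{L_j} \le t$:

$$R(t) = \operatorname{merge}(B, L_1, \dots, L_k).$$

A snapshot query therefore reflects every completed delta commit up to $t$, while a read-optimized query (base files only) sees those updates only after compaction merges $L_1, \dots, L_k$ into a new base file $B'$. Compaction also resets the read-time merge cost to a scan of $B'$; between compactions, that cost grows with $k$.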
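The snapshot merge can be modeled in a few lines of pure Python. This is a hypothetical sketch (`mor_snapshot` is not a Hudi function): log blocks are `(instant_time, records)` pairs applied oldest first, only blocks committed at or before the query instant are read, later instants win for the same key, and deletes are ignored for brevity. The returned count of log blocks read is the read amplification that compaction removes.

```python
def mor_snapshot(base, log_blocks, as_of, key="txn_id"):
    """Merge a base file with the log blocks committed at or before `as_of`.

    log_blocks: list of (instant_time, records); later instants overwrite earlier ones.
    Returns (merged records sorted by key, number of log blocks read).
    """
    merged = {r[key]: r for r in base}
    blocks_read = 0
    for instant, records in sorted(log_blocks, key=lambda b: b[0]):
        if instant > as_of:  # committed after the query instant: not visible
            continue
        blocks_read += 1
        for r in records:
            merged[r[key]] = r
    return sorted(merged.values(), key=lambda r: r[key]), blocks_read


base = [{"txn_id": "TXN001", "amount": 100.0}, {"txn_id": "TXN002", "amount": 250.0}]
logs = [
    ("20240115100000000", [{"txn_id": "TXN003", "amount": 75.0}]),
    ("20240115090000000", [{"txn_id": "TXN001", "amount": 110.0}]),
]
rows, read = mor_snapshot(base, logs, as_of="20240115093000000")
print(rows, read)
# [{'txn_id': 'TXN001', 'amount': 110.0}, {'txn_id': 'TXN002', 'amount': 250.0}] 1
```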
Compaction Write Amplification
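For MOR compaction merging base file $B$ with log files $L_1, \dots, L_k$ into a new base file $B'$, the write amplification relative to the log data ingested is

$$WA = \frac{|B'|}{\sum_{j=1}^{k} |L_j|}, \qquad |B| \lesssim |B'| \lesssim |B| + \sum_{j=1}^{k} |L_j|,$$

where the lower bound holds for pure updates and the upper bound for pure inserts (ignoring encoding differences between log and base files).

Target: keep $WA$ low by letting enough log data accumulate before compacting, so that compaction overhead stays bounded; the trade-off is a higher merge cost for snapshot reads in the meantime.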
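A quick calculation shows why compacting too eagerly is expensive. The function name and the sizes below are illustrative assumptions; for an update-heavy file group the new base file stays about the same size as the old one.

```python
def compaction_write_amplification(new_base_mb, log_mb):
    """WA = |B'| / sum(|L_j|): bytes written by compaction per byte of log data merged."""
    total_log = sum(log_mb)
    if total_log <= 0:
        raise ValueError("need some log data to compact")
    return new_base_mb / total_log


# Update-heavy file group: the 120 MB base stays ~120 MB after compaction (assumption).
print(compaction_write_amplification(120.0, [6.0]))              # 20.0
print(compaction_write_amplification(120.0, [20.0, 20.0, 20.0])) # 2.0
```

Waiting until 60 MB of logs have accumulated cuts write amplification tenfold compared with compacting after 6 MB, at the price of merging three log files on every snapshot read until then.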
File Grouping Efficiency
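Optimal file sizing minimizes the total number of file groups. For total data size $D$ and maximum base file size $S_{\max}$ (e.g. hoodie.parquet.max.file.size):

$$N_{\text{fg}} \ge \left\lceil \frac{D}{S_{\max}} \right\rceil,$$

with equality when every base file is filled to the target size.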
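The bound is plain ceiling division. The sketch below uses integer arithmetic to avoid floating-point rounding; the 10 GiB table size is an illustrative assumption, and 134217728 bytes (128 MiB) matches the hoodie.parquet.max.file.size value used in the MOR example later in this document.

```python
GiB = 1024 ** 3
MiB = 1024 ** 2


def min_file_groups(total_bytes, max_file_bytes):
    """Lower bound on file groups: ceil(total_bytes / max_file_bytes)."""
    if max_file_bytes <= 0:
        raise ValueError("max_file_bytes must be positive")
    return (total_bytes + max_file_bytes - 1) // max_file_bytes


print(min_file_groups(10 * GiB, 134_217_728))      # 80
print(min_file_groups(10 * GiB + 1, 134_217_728))  # 81
print(min_file_groups(10 * GiB, 32 * MiB))         # 320: small files quadruple the count
```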
Key Insight
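Hudi's file group abstraction versions data per file slice (a base file plus its log files), so MOR tables can apply record-level updates by appending to log files instead of rewriting base files. This makes MOR well suited to write-heavy workloads where COW's rewrite cost becomes prohibitive.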
Summary
Hudi's timeline provides chronological ordering, COW achieves read-optimized merges at write cost, MOR optimizes writes with deferred compaction. The write amplification metric guides compaction scheduling to balance read/write performance.
Key Concepts Table
| Concept | Description | When to Use |
|---|---|---|
| COW Table | Copy-on-Write: rewrites files on update | Read-heavy, batch workloads |
| MOR Table | Merge-on-Read: appends logs on update | Write-heavy, near-real-time ingestion |
| Timeline | Chronological log of all actions (commits, compactions, cleans) | Incremental processing, audit trail |
| File Group | Set of base files + associated log files | Parallel processing unit |
| Bloom Index | Per-file Bloom filters on record keys | Pruning candidate files during update lookups |
| Bucket Index | Hash-based file assignment | High-volume upsert workloads |
| Global Index | Index across all partitions | Global uniqueness enforcement |
| Compaction | Merges log files into base files | MOR read performance optimization |
| Clustering | Groups small files into larger ones | File size optimization |
| Cleaner | Removes old file versions | Storage reclamation |
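The idea behind a bucket index is that the file group is computed from the key rather than looked up. A minimal sketch, assuming a fixed bucket count: `bucket_for` is hypothetical and uses `zlib.crc32` as a stand-in, since Hudi's bucket index applies its own hashing to the configured key fields.

```python
import zlib
from collections import Counter


def bucket_for(record_key: str, num_buckets: int) -> int:
    """Map a record key to a bucket (file group) id with a stable hash.

    crc32 is a stand-in; Hudi uses its own hash of the record key fields.
    """
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


keys = [f"TXN{i:06d}" for i in range(10_000)]
counts = Counter(bucket_for(k, 8) for k in keys)
print(sorted(counts.items()))  # records per bucket
```

Because the mapping depends on the bucket count, changing that count would move most keys to different buckets, which is why it is normally chosen when the table is created.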
Code Examples
Setting Up Hudi with PySpark
from pyspark.sql import SparkSession
# Hudi 0.14.1 bundle built for Spark 3.4 / Scala 2.12; match it to your Spark version
spark = SparkSession.builder \
    .appName("HudiOperations") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()
# Define Hudi write configuration for a COW table
hudi_options = {
    'hoodie.table.name': 'customers',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'customer_id',        # unique record key
    'hoodie.datasource.write.precombine.field': 'update_timestamp',  # picks the latest version of duplicate keys
    'hoodie.datasource.write.partitionpath.field': 'region',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.upsert.shuffle.parallelism': '200',
    'hoodie.insert.shuffle.parallelism': '200',
}
# Create initial dataset
customers_data = [
(1, "Alice", "North", "2024-01-15 10:00:00", "alice@email.com"),
(2, "Bob", "South", "2024-01-15 10:00:00", "bob@email.com"),
(3, "Charlie", "East", "2024-01-15 10:00:00", "charlie@email.com"),
(4, "Diana", "West", "2024-01-15 10:00:00", "diana@email.com"),
]
df = spark.createDataFrame(customers_data, [
"customer_id", "name", "region", "update_timestamp", "email"
])
df.write.format("hudi") \
.options(**hudi_options) \
.mode("overwrite") \
.save("/hudi/customers")
# Upsert: update customer 1 and insert new customer 5
updates_data = [
(1, "Alice Smith", "North", "2024-01-16 10:00:00", "alice.smith@email.com"),
(5, "Eve", "South", "2024-01-16 10:00:00", "eve@email.com"),
]
updates_df = spark.createDataFrame(updates_data, [
"customer_id", "name", "region", "update_timestamp", "email"
])
updates_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("/hudi/customers")
MOR Table with Incremental Queries
# Create MOR table for high-frequency updates
mor_options = {
'hoodie.table.name': 'transactions',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field': 'txn_id',
'hoodie.datasource.write.precombine.field': 'event_time',
'hoodie.datasource.write.partitionpath.field': 'date',
'hoodie.datasource.write.operation': 'upsert',
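'hoodie.compact.inline': 'false',  # compaction is scheduled and run separately
'hoodie.logfile.max.size': '1073741824',  # roll over to a new log file after 1 GiB
'hoodie.logfile.to.parquet.compression.ratio': '0.5',  # expected size ratio when log records are compacted into Parquet
'hoodie.parquet.max.file.size': '134217728',  # 128 MiB target base file size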
}
# Write transactions
txn_data = [
("TXN001", "CUST001", "2024-01-15 08:00:00", 100.00, "2024-01-15"),
("TXN002", "CUST002", "2024-01-15 08:30:00", 250.00, "2024-01-15"),
("TXN003", "CUST001", "2024-01-15 09:00:00", 75.00, "2024-01-15"),
]
txn_df = spark.createDataFrame(txn_data, [
"txn_id", "cust_id", "event_time", "amount", "date"
])
txn_df.write.format("hudi") \
.options(**mor_options) \
.mode("overwrite") \
.save("/hudi/transactions")
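# Incremental read: begin/end instants are Hudi commit times (yyyyMMddHHmmssSSS),
# not event timestamps, so look them up from the _hoodie_commit_time meta column
commits = [row["_hoodie_commit_time"] for row in spark.read.format("hudi")
    .load("/hudi/transactions")
    .select("_hoodie_commit_time").distinct()
    .orderBy("_hoodie_commit_time").collect()]
# "000" means "from the beginning"; the begin instant is exclusive
incremental_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "000") \
    .option("hoodie.datasource.read.end.instanttime", commits[-1]) \
    .load("/hudi/transactions")
incremental_df.show(truncate=False)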
Compaction and Clustering
# Register the path-based tables in the catalog so SQL procedures can address them
spark.sql("CREATE DATABASE IF NOT EXISTS hudi")
spark.sql("CREATE TABLE IF NOT EXISTS hudi.transactions USING hudi LOCATION '/hudi/transactions'")
spark.sql("CREATE TABLE IF NOT EXISTS hudi.customers USING hudi LOCATION '/hudi/customers'")
# Schedule a compaction plan for the MOR table, then execute it
spark.sql("CALL run_compaction(op => 'schedule', table => 'hudi.transactions')")
spark.sql("CALL run_compaction(op => 'run', table => 'hudi.transactions')")
# Async compaction is enabled on the writer rather than through a procedure, e.g.
# 'hoodie.datasource.compaction.async.enable': 'true' for Structured Streaming writes
# Clustering for COW tables: rewrite small files into larger ones sorted by customer_id
spark.sql("CALL run_clustering(table => 'hudi.customers', order => 'customer_id')")
# Show timeline
spark.sql("CALL show_commits(table => 'hudi.transactions', limit => 10)").show(truncate=False)
# Show compaction plans
spark.sql("CALL show_compaction(table => 'hudi.transactions')").show(truncate=False)
Schema Evolution and Partition Evolution
# Schema evolution: adding nullable columns is backward compatible (existing rows read as null)
spark.sql("""
ALTER TABLE hudi.customers ADD COLUMNS (
phone_number STRING,
loyalty_tier STRING
)
""")
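# Partition evolution: Hudi does not change a table's partition column in place,
# so rewrite the data into a new table partitioned by the new column.
# Assumes a 'country' column has already been added and populated for every record.
meta_cols = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
             "_hoodie_partition_path", "_hoodie_file_name"]
snapshot_df = spark.read.format("hudi").load("/hudi/customers").drop(*meta_cols)
repartitioned_options = {**hudi_options,
                         'hoodie.table.name': 'customers_by_country',
                         'hoodie.datasource.write.partitionpath.field': 'country',
                         'hoodie.datasource.write.operation': 'bulk_insert'}
snapshot_df.write.format("hudi") \
    .options(**repartitioned_options) \
    .mode("overwrite") \
    .save("/hudi/customers_by_country")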
# Show table properties
spark.sql("DESCRIBE EXTENDED hudi.customers").show(truncate=False)
Performance Metrics
| Metric | COW Table | MOR Table | Optimized COW | Optimized MOR |
|---|---|---|---|---|
| Upsert Latency (1K records) | 12-15 seconds | 2-4 seconds | 8-10 seconds | 1-2 seconds |
| Upsert Throughput | 500-800 records/sec | 2000-5000 records/sec | 800-1200 records/sec | 5000-10000 records/sec |
| Read Latency (point query) | 50-100 ms | 150-300 ms (with merge) | 30-60 ms | 100-200 ms |
| Read Latency (full scan) | 10-20 seconds | 15-30 seconds | 8-15 seconds | 12-25 seconds |
| Storage Overhead | ~10-15% | ~5-8% | ~8-12% | ~4-6% |
| File Count | Lower (compacted) | Higher (logs) | Lowest | Moderate |
| Compaction Cost | N/A | CPU-intensive | N/A | Moderate |
| Concurrent Writers | 2-3x slower | 1.5x slower | 1.5x slower | 1.2x slower |
| Time Travel Latency | < 1 second | < 2 seconds | < 1 second | < 1.5 seconds |
| Incremental Query | Supported | Supported | Faster | Faster |
Best Practices
- Choose COW for read-heavy workloads where write latency is acceptable and read performance is critical
- Choose MOR for write-heavy workloads where near-real-time ingestion is required and read latency can be traded
- Configure `hoodie.compaction.target.io` (in MB) to bound how much I/O each compaction run spends, balancing compaction aggressiveness against write performance
- Use `hoodie.datasource.write.operation=bulk_insert` for initial loads to avoid unnecessary index lookup and deduplication overhead
- Set `hoodie.cleaner.policy=KEEP_LATEST_COMMITS` with an appropriate `hoodie.cleaner.commits.retained` to cover your time travel and incremental-read window
- Tune `hoodie.logfile.max.size` to control when MOR log files roll over; compaction frequency itself is governed by the compaction trigger settings
- Use bucket indexing (`hoodie.index.type=BUCKET`) for high-volume upsert workloads with uniform key distribution
- Schedule clustering for COW tables to consolidate small files and improve scan performance
- Monitor the timeline with `CALL show_commits(...)` to identify compaction lag and write latency issues
- Choose a stable, unique column for `hoodie.datasource.write.recordkey.field` so upserts can locate existing records
- Configure `hoodie.parquet.max.file.size` (and `hoodie.parquet.block.size`, the Parquet row-group size) to match your target file size; Hudi's default for both is 120 MB
- Keep `hoodie.metadata.enable=true` on large datasets so file listings are served from the metadata table instead of expensive storage listing calls
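Several of these recommendations can be collected into one reusable options dictionary. This is a sketch: the helper `mor_upsert_options` and its default bucket count and retention values are hypothetical choices, and the dictionary is meant to be passed to a DataFrame writer the same way `hudi_options` is in the examples above.

```python
def mor_upsert_options(table_name, record_key, ordering_field, partition_field,
                       num_buckets=16, commits_retained=10):
    """Hypothetical helper: MOR upsert options combining several best practices above."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,      # stable, unique key
        "hoodie.datasource.write.precombine.field": ordering_field,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.index.type": "BUCKET",                              # hash-based file assignment
        "hoodie.bucket.index.num.buckets": str(num_buckets),
        "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
        "hoodie.cleaner.commits.retained": str(commits_retained),   # time travel window
        "hoodie.metadata.enable": "true",
        "hoodie.parquet.max.file.size": str(128 * 1024 * 1024),     # 128 MiB base files
    }


opts = mor_upsert_options("transactions", "txn_id", "event_time", "date")
print(opts["hoodie.index.type"], opts["hoodie.parquet.max.file.size"])  # BUCKET 134217728
```

Usage would follow the earlier pattern, e.g. `txn_df.write.format("hudi").options(**opts).mode("append").save("/hudi/transactions")`. All values are strings because Hudi reads writer options as string key-value pairs.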
Mathematical Foundation Summary
Hudi's timeline-based architecture makes the cost of incremental processing proportional to the data changed since the last consumed instant rather than to the table size. The Bloom index prunes candidate files with per-file Bloom filters: a membership check costs O(k) hash evaluations, with false positive rate p ≈ (1 - e^(-kn/m))^k, where m is the filter size in bits, n is the number of records, and k is the number of hash functions. COW tables trade write amplification (each update rewrites the affected base files in full) for read performance, while MOR tables trade read amplification (RA = 1 + number of log files to merge) for write throughput. Compaction cost per file group grows with the bytes read and rewritten, O(|base file| + Σ|log files|), and compaction should be scheduled to meet the read-latency SLA.
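The Bloom filter formula can be evaluated directly to size a filter. Hudi exposes related settings such as `hoodie.index.bloom.num_entries` and `hoodie.index.bloom.fpp`; the helpers and the numbers below are illustrative assumptions, not Hudi defaults. The optimal number of hash functions for given m and n is k = (m/n) ln 2.

```python
import math


def bloom_fpp(m_bits: int, n_items: int, k_hashes: int) -> float:
    """Approximate false-positive rate p = (1 - e^(-k n / m))^k."""
    return (1.0 - math.exp(-k_hashes * n_items / m_bits)) ** k_hashes


def optimal_k(m_bits: int, n_items: int) -> int:
    """Hash count minimising p for fixed m and n: k = (m / n) ln 2, rounded."""
    return max(1, round(m_bits / n_items * math.log(2)))


m, n = 600_000, 60_000  # 10 bits per record (illustrative)
k = optimal_k(m, n)
print(k, round(bloom_fpp(m, n, k), 5))  # 7 0.00819
```

At 10 bits per key, roughly 0.8% of lookups for keys that are absent from a file would still report a possible match, so the writer must read that file to confirm; adding bits per key lowers this rate.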
See Also
- Iceberg Integration (21): Apache Iceberg table format
- Delta Lake (22): Delta Lake ACID transactions
- Merge Upsert: Merge and upsert operations
- CDC Patterns (36): Change data capture patterns
- Data Lakehouse Architecture (34)