Apache Hudi Operations in PySpark

COW vs MOR Table Structure

  • Copy-on-Write (COW): reads touch Parquet base files only (e.g., base_file_1.parquet, base_file_2.parquet), so reads are fast. On UPDATE, the entire Parquet file is rewritten, so writes are slow. Best for read-heavy workloads.
  • Merge-on-Read (MOR): reads merge base and log files at query time (e.g., base_file_1.parquet + log_file_1.log). On UPDATE, changes are appended to a log file, so writes are fast; reads are slower until compaction runs. Best for write-heavy workloads.

Detailed Explanation

What is Apache Hudi?

Apache Hudi (Hadoop Upserts Deletes and Incremental processing) is an open-source data lake platform that provides record-level upserts, incremental processing, and table services.

Key Takeaway: Hudi enables efficient update and delete operations at the record level, making it ideal for near-real-time data ingestion.


File Layout Management

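A core Hudi idea is its file layout management: records are organized into file groups, and an update touches only the file groups that contain the affected records (rewriting them under COW, appending to their logs under MOR).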

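| Component | Purpose |
|---|---|
| Base Files | Parquet files storing the actual data |
| Log Files | Append-only files holding updates for MOR tables |
| Metadata Layer | Metadata table tracking file listings and index data (column stats, Bloom filters, record locations) |
| Index | Maps record keys to file groups (e.g., Bloom filter, bucket, or record index) for efficient lookup |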

COW vs MOR Table Types

| Aspect | Copy-on-Write (COW) | Merge-on-Read (MOR) |
|---|---|---|
| Update Mechanism | Rewrites entire base file | Appends to log files |
| Write Performance | Expensive (full rewrite) | Fast (append-only) |
| Read Performance | Fast (no merge overhead) | Slower (merge at read time) |
| Storage Overhead | Higher | Lower |
| Ideal Workload | Read-heavy | Write-heavy, near-real-time |

Timeline Mechanism

Hudi's timeline is the central coordination mechanism tracking all operations.

  • Each commit, compaction, or cleaning creates a timestamped entry
  • Enables incremental processing: downstream consumers read only the records committed after a given instant
  • Avoids full-refresh patterns that reprocess the entire dataset on every run
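As a rough illustration of how the timeline supports incremental processing, the sketch below models instants as (time, action, state) records and returns only the completed data-writing commits after a consumer's last-processed instant. The `Instant` class and `commits_after` helper are hypothetical names for this sketch, not Hudi APIs; instant times are modeled as fixed-width `yyyyMMddHHmmssSSS` strings, which sort chronologically when compared as strings.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Instant:
    time: str    # yyyyMMddHHmmssSSS, e.g. "20240115080000000"
    action: str  # e.g. "commit", "deltacommit", "clean"
    state: str   # "requested", "inflight" or "completed"


# Actions whose completion makes new data visible (assumed subset for this sketch)
DATA_ACTIONS = {"commit", "deltacommit", "replacecommit"}


def commits_after(timeline: List[Instant], last_processed: str) -> List[str]:
    """Completed data-writing instants strictly after last_processed, oldest first."""
    return sorted(
        i.time
        for i in timeline
        if i.state == "completed"
        and i.action in DATA_ACTIONS
        and i.time > last_processed  # string comparison = chronological order here
    )


timeline = [
    Instant("20240115080000000", "commit", "completed"),
    Instant("20240115083000000", "clean", "completed"),
    Instant("20240115090000000", "commit", "completed"),
    Instant("20240115093000000", "commit", "inflight"),
]
print(commits_after(timeline, "20240115080000000"))
```

The clean action is skipped because it writes no new records, and the inflight commit is skipped because its data is not yet visible. A consumer would persist the last instant it processed and pass it in on the next run.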

Compaction Service

Compaction is critical for maintaining MOR table read performance.

Configuration Options:

  1. Inline — Runs during writes
  2. Asynchronous — Background processing
  3. On-demand — Triggered manually

Compaction Selection Factors:

  • Log file count
  • Log file size
  • Time since last compaction
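A minimal sketch of how the three selection factors could combine into a trigger decision. The `FileGroupState` class, `should_compact` function, and its thresholds (`max_log_files`, `max_log_bytes`, `max_seconds`) are hypothetical values chosen for illustration; Hudi's own inline trigger is configured through its `hoodie.compact.inline.*` settings (for example, a maximum number of delta commits) rather than through this function.

```python
from dataclasses import dataclass


@dataclass
class FileGroupState:
    log_file_count: int
    log_bytes: int
    seconds_since_compaction: float


def should_compact(state: FileGroupState,
                   max_log_files: int = 5,
                   max_log_bytes: int = 128 * 1024 * 1024,
                   max_seconds: float = 3600.0) -> bool:
    """Compact when ANY factor crosses its (hypothetical) threshold."""
    return (state.log_file_count >= max_log_files
            or state.log_bytes >= max_log_bytes
            or state.seconds_since_compaction >= max_seconds)


print(should_compact(FileGroupState(2, 10 * 1024 * 1024, 600)))  # False
print(should_compact(FileGroupState(6, 10 * 1024 * 1024, 600)))  # True: too many log files
```

Combining the factors with OR makes compaction react to whichever signal fires first; requiring all of them (AND) would compact less often but let reads degrade longer.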

Mathematical Foundations

Definition: Hudi Timeline

The Hudi timeline $\mathcal{T}$ is a chronologically ordered sequence of actions $\mathcal{T} = [a_1, a_2, \ldots, a_n]$, where each action $a_i = (o_i, s_i, t_i)$ has a (simplified) operation type $o_i \in \{\text{CREATE, UPSERT, DELETE, COMPACT, CLUSTER}\}$, a state $s_i \in \{\text{REQUESTED, INFLIGHT, COMPLETED}\}$, and a timestamp $t_i$. A failed write is not a separate state: it is undone by a rollback action.

Copy-on-Write Merge

For a COW table, an upsert of record set $U$ into file group $F_j$ produces:

$$F_j' = (F_j \setminus \{r \in F_j : r.\text{key} \in U.\text{keys}\}) \cup U$$

The entire file is rewritten, costing $O(|F_j| + |U|)$ I/O.
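The merge formula translates almost directly into code. This sketch treats a file group as a dict from record key to record and applies $F_j' = (F_j \setminus \{r \in F_j : r.\text{key} \in U.\text{keys}\}) \cup U$. The plain dicts and the `cow_upsert` name are illustrative; it assumes $U$ is already deduplicated by key (Hudi uses the precombine field for that) and, for simplicity, lets new keys land in the same file group.

```python
from typing import Any, Dict

Record = Dict[str, Any]


def cow_upsert(file_group: Dict[int, Record], updates: Dict[int, Record]) -> Dict[int, Record]:
    """Return a NEW file group: drop records whose key is in U, then add all of U."""
    kept = {k: r for k, r in file_group.items() if k not in updates}  # F_j minus updated keys
    rewritten = {**kept, **updates}                                    # union with U
    return rewritten  # every record is written again: O(|F_j| + |U|)


f_j = {1: {"name": "Alice"}, 2: {"name": "Bob"}}
u = {1: {"name": "Alice Smith"}, 5: {"name": "Eve"}}
print(cow_upsert(f_j, u))
```

The original `f_j` is left untouched, mirroring how COW writes a new base file version rather than modifying the old one in place.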

MOR Read Consistency Theorem

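A MOR read at time $t$ merges the base file $F$ with the log file $L$, applying only log entries committed at or before $t$:

$$\text{Read}(F, L, t) = \text{merge}(F, \{l \in L : l.t \leq t\})$$

Because the result depends only on entries with $l.t \leq t$, every read at time $t$ sees the same snapshot. Compaction folds $L$ into $F$ without changing that result; it only reduces how many log entries each read must merge, which lowers read latency.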
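The read rule can be sketched with the same dict model: each log entry carries a commit time, a record key, and either a record or a delete marker, and only entries with commit time at or before the read time are applied, in commit order. The tuple layout, the `None`-as-delete convention, and the `mor_read` name are assumptions made for this illustration, not Hudi's log format.

```python
from typing import Any, Dict, List, Optional, Tuple

# (commit_time, record_key, record or None for a delete)
LogEntry = Tuple[str, int, Optional[Dict[str, Any]]]


def mor_read(base: Dict[int, Dict[str, Any]], log: List[LogEntry],
             read_time: str) -> Dict[int, Dict[str, Any]]:
    """Merge the base file with the log entries visible at read_time."""
    view = dict(base)
    for commit_time, key, record in sorted(log, key=lambda e: e[0]):
        if commit_time > read_time:
            break                # later commits are not visible at read_time
        if record is None:
            view.pop(key, None)  # delete marker removes the key
        else:
            view[key] = record   # upsert replaces the base version
    return view


base = {1: {"amount": 100.0}, 2: {"amount": 250.0}}
log = [("20240115090000000", 1, {"amount": 120.0}),
       ("20240115100000000", 2, None)]
print(mor_read(base, log, "20240115093000000"))
```

Compacting at time $t$ amounts to replacing `base` with `mor_read(base, log, t)` and discarding the applied entries: later reads return the same rows with less merge work.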

Compaction Write Amplification

For MOR compaction merging base file $F$ with log files $\{L_1, \ldots, L_k\}$:

$$W_A = \frac{|F| + \sum_{i=1}^{k} |L_i|}{|F|} = 1 + \frac{\sum_{i=1}^{k} |L_i|}{|F|}$$

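Target: $W_A < 2$, which is equivalent to $\sum_{i=1}^{k} |L_i| < |F|$. In other words, compact before the accumulated logs outgrow the base file, to keep compaction overhead bounded.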
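A quick numeric check of the formula, using illustrative sizes (a 120 MB base file and three log files totalling 60 MB): $W_A = 1 + 60/120 = 1.5$, inside the $W_A < 2$ target. The `write_amplification` helper is a hypothetical name for this sketch.

```python
from typing import Sequence


def write_amplification(base_bytes: int, log_bytes: Sequence[int]) -> float:
    """W_A = (|F| + sum |L_i|) / |F| = 1 + sum |L_i| / |F|."""
    if base_bytes <= 0:
        raise ValueError("base file size must be positive")
    return 1 + sum(log_bytes) / base_bytes


MB = 1024 * 1024
w_a = write_amplification(120 * MB, [20 * MB, 30 * MB, 10 * MB])
print(w_a, w_a < 2)  # 1.5 True
```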

File Grouping Efficiency

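Optimal file sizing minimizes the number of file groups $g$ needed to store a total data volume $D$, with every file kept within the size bounds:

$$\min g \quad \text{s.t.} \quad \sum_{j=1}^{g} |F_j| = D, \quad \forall j: S_{\min} \leq |F_j| \leq S_{\max}$$

Because each file holds at most $S_{\max}$, any feasible layout needs $g \geq \lceil D / S_{\max} \rceil$ file groups.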
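Packing small files into as few groups as possible under a size cap of $S_{\max}$ is a bin-packing problem. The sketch below uses the classic first-fit-decreasing heuristic as a stand-in; it is not Hudi's clustering planner, and the `pack_files` name and file sizes are illustrative. It compares the group count with the lower bound $\lceil D / S_{\max} \rceil$ for a total volume $D$.

```python
import math
from typing import List


def pack_files(sizes_mb: List[int], s_max_mb: int) -> List[List[int]]:
    """First-fit decreasing: place each file (largest first) into the first group with room."""
    groups: List[List[int]] = []
    totals: List[int] = []
    for size in sorted(sizes_mb, reverse=True):
        if size > s_max_mb:
            raise ValueError(f"file of {size} MB exceeds S_max")
        for i, total in enumerate(totals):
            if total + size <= s_max_mb:
                groups[i].append(size)
                totals[i] += size
                break
        else:
            groups.append([size])  # no existing group has room: open a new one
            totals.append(size)
    return groups


small_files = [40, 30, 70, 20, 60, 10, 50]  # MB, illustrative
groups = pack_files(small_files, s_max_mb=128)
lower_bound = math.ceil(sum(small_files) / 128)
print(len(groups), lower_bound)  # 3 3
```

This sketch ignores $S_{\min}$: a leftover group smaller than $S_{\min}$ (here the 40 MB one) would remain a small file until more data arrives.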

Key Insight

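Hudi's file group abstraction (a base file plus its log files) enables record-level versioning; under MOR, an update is appended to the file group's log instead of rewriting its base file. This makes MOR ideal for write-heavy workloads where COW's $O(n)$ rewrite cost per updated file group becomes prohibitive.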

Summary

Hudi's timeline provides chronological ordering; COW achieves read-optimized merges at write cost, while MOR optimizes writes by deferring merges to compaction. The write amplification metric $W_A$ guides compaction scheduling to balance read and write performance.

Key Concepts Table

| Concept | Description | When to Use |
|---|---|---|
| COW Table | Copy-on-Write: rewrites files on update | Read-heavy, batch workloads |
| MOR Table | Merge-on-Read: appends logs on update | Write-heavy, near-real-time ingestion |
| Timeline | Chronological list of all actions (commits, compactions, cleans) | Incremental processing, audit trail |
| File Group | Set of base files + associated log files | Parallel processing unit |
| Bloom Index | Bloom filters on record keys, checked per base file | Efficient record lookup for updates |
| Bucket Index | Hash-based file assignment | High-volume upsert workloads |
| Global Index | Index across all partitions | Global uniqueness enforcement |
| Compaction | Merges log files into base files | MOR read performance optimization |
| Clustering | Groups small files into larger ones | File size optimization |
| Cleaner | Removes old file versions | Storage reclamation |
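The Bucket Index row can be illustrated with a small sketch: a record key is hashed and taken modulo a fixed bucket count to pick its file group, so an upsert needs no index read to find where a key lives. Hudi takes the bucket count from `hoodie.bucket.index.num.buckets` and uses its own hash function; `zlib.crc32` here is a deterministic stand-in, and `bucket_for` is a hypothetical helper.

```python
import zlib
from collections import Counter


def bucket_for(record_key: str, num_buckets: int) -> int:
    """Map a record key to a bucket (file group) id; crc32 stands in for Hudi's hash."""
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


NUM_BUCKETS = 4  # illustrative; Hudi reads this from hoodie.bucket.index.num.buckets
keys = [f"TXN{i:03d}" for i in range(1, 1001)]
assignment = {key: bucket_for(key, NUM_BUCKETS) for key in keys}

# Bucket sizes: roughly even when keys hash uniformly
print(sorted(Counter(assignment.values()).items()))
```

Because a key's bucket never changes, routing is cheap; the trade-off is that the bucket count is fixed up front, so skewed keys produce skewed file groups, which matches the uniform-key-distribution advice in Best Practices.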

Code Examples

Setting Up Hudi with PySpark

from pyspark.sql import SparkSession

# Hudi 0.14.1 bundle for Spark 3.4 / Scala 2.12. The Kryo registrar, session
# extension, and HoodieCatalog are Hudi's settings (the extension and catalog
# are needed for Hudi SQL DDL and CALL procedures).
spark = SparkSession.builder \
    .appName("HudiOperations") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()

# Hudi write configuration for a Copy-on-Write table
hudi_options = {
    'hoodie.table.name': 'customers',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'customer_id',        # unique key per record
    'hoodie.datasource.write.precombine.field': 'update_timestamp',  # picks the latest version of a repeated key
    'hoodie.datasource.write.partitionpath.field': 'region',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.upsert.shuffle.parallelism': '200',
    'hoodie.insert.shuffle.parallelism': '200',
}

# Create initial dataset
customers_data = [
    (1, "Alice", "North", "2024-01-15 10:00:00", "alice@email.com"),
    (2, "Bob", "South", "2024-01-15 10:00:00", "bob@email.com"),
    (3, "Charlie", "East", "2024-01-15 10:00:00", "charlie@email.com"),
    (4, "Diana", "West", "2024-01-15 10:00:00", "diana@email.com"),
]
df = spark.createDataFrame(customers_data, [
    "customer_id", "name", "region", "update_timestamp", "email"
])

df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save("/hudi/customers")

# Upsert: update customer 1 and insert new customer 5
updates_data = [
    (1, "Alice Smith", "North", "2024-01-16 10:00:00", "alice.smith@email.com"),
    (5, "Eve", "South", "2024-01-16 10:00:00", "eve@email.com"),
]
updates_df = spark.createDataFrame(updates_data, [
    "customer_id", "name", "region", "update_timestamp", "email"
])

updates_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save("/hudi/customers")
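Because the COW example above and the MOR example that follows differ mainly in a handful of keys, a small helper can assemble the options dict. `hudi_write_options` is a hypothetical convenience for this lesson, not part of Hudi; the keys it emits are the standard `hoodie.*` datasource write options used in this section.

```python
from typing import Dict


def hudi_write_options(table: str, record_key: str, precombine: str,
                       partition: str, table_type: str = "COPY_ON_WRITE",
                       operation: str = "upsert") -> Dict[str, str]:
    """Build the common Hudi datasource write options for one table."""
    if table_type not in ("COPY_ON_WRITE", "MERGE_ON_READ"):
        raise ValueError(f"unknown table type: {table_type}")
    return {
        'hoodie.table.name': table,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_key,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition,
        'hoodie.datasource.write.operation': operation,
    }


cow_opts = hudi_write_options('customers', 'customer_id', 'update_timestamp', 'region')
mor_opts = hudi_write_options('transactions', 'txn_id', 'event_time', 'date',
                              table_type='MERGE_ON_READ')
print(cow_opts['hoodie.datasource.write.table.type'],
      mor_opts['hoodie.datasource.write.table.type'])
```

Usage with the DataFrame writer: `updates_df.write.format("hudi").options(**cow_opts).mode("append").save("/hudi/customers")`. Passing `operation="bulk_insert"` gives the initial-load variant.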

MOR Table with Incremental Queries

# Create MOR table for high-frequency updates
mor_options = {
    'hoodie.table.name': 'transactions',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field': 'txn_id',
    'hoodie.datasource.write.precombine.field': 'event_time',
    'hoodie.datasource.write.partitionpath.field': 'date',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.compact.inline': 'false',  # no inline compaction; schedule it separately
    'hoodie.logfile.max.size': '1073741824',
    'hoodie.logfile.to.parquet.compression.ratio': '0.5',
    'hoodie.parquet.max.file.size': '134217728',
}

# Write transactions
txn_data = [
    ("TXN001", "CUST001", "2024-01-15 08:00:00", 100.00, "2024-01-15"),
    ("TXN002", "CUST002", "2024-01-15 08:30:00", 250.00, "2024-01-15"),
    ("TXN003", "CUST001", "2024-01-15 09:00:00", 75.00, "2024-01-15"),
]
txn_df = spark.createDataFrame(txn_data, [
    "txn_id", "cust_id", "event_time", "amount", "date"
])

txn_df.write.format("hudi") \
    .options(**mor_options) \
    .mode("overwrite") \
    .save("/hudi/transactions")

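# Incremental read: records committed after begin_time, up to end_time. Instant times are
# commit times (yyyyMMddHHmmssSSS), not event_time values, so take them from _hoodie_commit_time.
commits = [row[0] for row in spark.read.format("hudi").load("/hudi/transactions")
           .select("_hoodie_commit_time").distinct()
           .orderBy("_hoodie_commit_time").collect()]
begin_time = "000"       # "000" = every commit on the timeline
end_time = commits[-1]   # latest commit
incremental_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", begin_time) \
    .option("hoodie.datasource.read.end.instanttime", end_time) \
    .load("/hudi/transactions")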

incremental_df.show(truncate=False)
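Incremental-query bounds are commit times on Hudi's timeline, written as `yyyyMMddHHmmssSSS` strings, not event-time values such as `2024-01-15 08:00:00`. To start from a wall-clock boundary instead of a known commit, a helper like the hypothetical `to_hudi_instant` below produces that format; it assumes the datetime is in the same timezone the writer used when generating instant times.

```python
from datetime import datetime


def to_hudi_instant(dt: datetime) -> str:
    """Format a datetime as a Hudi-style instant time: yyyyMMddHHmmssSSS."""
    return dt.strftime("%Y%m%d%H%M%S") + f"{dt.microsecond // 1000:03d}"


begin = to_hudi_instant(datetime(2024, 1, 15, 8, 0, 0))
end = to_hudi_instant(datetime(2024, 1, 15, 9, 0, 0, 250000))
print(begin, end)  # 20240115080000000 20240115090000250
```

The resulting strings can be passed as `hoodie.datasource.read.begin.instanttime` and `hoodie.datasource.read.end.instanttime`.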

Compaction and Clustering

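# The CALL procedures below address tables by catalog name and assume the paths
# written above are registered, e.g.:
#   CREATE DATABASE IF NOT EXISTS hudi
#   CREATE TABLE hudi.transactions USING hudi LOCATION '/hudi/transactions'

# Schedule a compaction plan for the MOR table, then execute pending plans
spark.sql("CALL run_compaction(op => 'schedule', table => 'hudi.transactions')").show(truncate=False)
spark.sql("CALL run_compaction(op => 'run', table => 'hudi.transactions')").show(truncate=False)
# Equivalent SQL: SCHEDULE COMPACTION ON hudi.transactions / RUN COMPACTION ON hudi.transactions

# Asynchronous compaction has no separate CALL procedure: Structured Streaming
# writers run it in the background (hoodie.datasource.compaction.async.enable),
# or a separate job executes the scheduled plans.

# Clustering (COW or MOR): rewrite small files into larger files sorted by customer_id
spark.sql("""
    CALL run_clustering(
        table => 'hudi.customers',
        order => 'customer_id'
    )
""").show(truncate=False)

# Show the timeline (most recent commits)
spark.sql("CALL show_commits(table => 'hudi.transactions', limit => 10)").show(truncate=False)

# Show compaction plans and their state
spark.sql("CALL show_compaction(table => 'hudi.transactions')").show(truncate=False)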

Schema Evolution and Partition Evolution

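# Schema evolution: add nullable columns (existing rows read them as null)
spark.sql("""
    ALTER TABLE hudi.customers ADD COLUMNS (
        phone_number STRING,
        loyalty_tier STRING
    )
""")

# Partition evolution: Hudi does not offer in-place partition evolution, so rewrite
# the data into a new table partitioned by the new column. Assumes a populated
# "country" column, which the sample customers schema does not have.
customers_df = spark.read.format("hudi").load("/hudi/customers")
data_cols = [c for c in customers_df.columns if not c.startswith("_hoodie_")]  # drop Hudi meta columns
by_country_options = {
    **hudi_options,
    'hoodie.table.name': 'customers_by_country',
    'hoodie.datasource.write.partitionpath.field': 'country',
    'hoodie.datasource.write.operation': 'bulk_insert',
}
customers_df.select(*data_cols).write.format("hudi") \
    .options(**by_country_options) \
    .mode("overwrite") \
    .save("/hudi/customers_by_country")

# Show table properties
spark.sql("DESCRIBE EXTENDED hudi.customers").show(truncate=False)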

Performance Metrics

| Metric | COW Table | MOR Table | Optimized COW | Optimized MOR |
|---|---|---|---|---|
| Upsert Latency (1K records) | 12-15 seconds | 2-4 seconds | 8-10 seconds | 1-2 seconds |
| Upsert Throughput | 500-800 records/sec | 2000-5000 records/sec | 800-1200 records/sec | 5000-10000 records/sec |
| Read Latency (point query) | 50-100 ms | 150-300 ms (with merge) | 30-60 ms | 100-200 ms |
| Read Latency (full scan) | 10-20 seconds | 15-30 seconds | 8-15 seconds | 12-25 seconds |
| Storage Overhead | ~10-15% | ~5-8% | ~8-12% | ~4-6% |
| File Count | Lower (compacted) | Higher (logs) | Lowest | Moderate |
| Compaction Cost | N/A | CPU-intensive | N/A | Moderate |
| Concurrent Writers | 2-3x slower | 1.5x slower | 1.5x slower | 1.2x slower |
| Time Travel Latency | < 1 second | < 2 seconds | < 1 second | < 1.5 seconds |
| Incremental Query | Supported | Supported | Faster | Faster |

Best Practices

  1. Choose COW for read-heavy workloads where write latency is acceptable and read performance is critical
  2. Choose MOR for write-heavy workloads where near-real-time ingestion is required and read latency can be traded
  3. Configure hoodie.compaction.target.io to balance compaction aggressiveness with write performance
  4. Use hoodie.datasource.write.operation=bulk_insert for initial loads to avoid unnecessary deduplication overhead
  5. Enable hoodie.cleaner.policy=KEEP_LATEST_COMMITS with appropriate hoodie.cleaner.commits.retained for time travel requirements
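  6. Tune hoodie.logfile.max.size (the size at which a MOR log file rolls over to a new one) together with hoodie.compact.inline.max.delta.commits (how many delta commits accumulate before inline compaction) to control log growth and compaction frequency for MOR tables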
  7. Use bucket indexing (hoodie.index.type=BUCKET) for high-volume upsert workloads with uniform key distribution
  8. Schedule clustering for COW tables to consolidate small files and improve scan performance
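  9. Monitor the timeline with the show_commits and show_compaction procedures to identify compaction lag and write latency issues
  10. Set hoodie.datasource.write.recordkey.field to a column (or columns) that uniquely identifies each record, so upserts can locate existing versions
  11. Size files with hoodie.parquet.max.file.size (target base file size) and hoodie.parquet.block.size (Parquet row-group size); both default to 120 MB
  12. Keep hoodie.metadata.enable=true (the writer default since Hudi 0.11) so file listings come from the metadata table instead of expensive storage listing calls, which matters most for large tables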
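For the cleaner practice (item 5), the history kept by KEEP_LATEST_COMMITS can be estimated from the commit cadence. The helper below is a back-of-the-envelope sketch, not a Hudi API: it assumes commits arrive at a steady interval, so retaining N commits written every M minutes keeps roughly N × M minutes of history queryable.

```python
import math


def commits_to_retain(window_minutes: float, commit_interval_minutes: float) -> int:
    """Commits to keep so that roughly window_minutes of history stays queryable."""
    if commit_interval_minutes <= 0:
        raise ValueError("commit interval must be positive")
    return math.ceil(window_minutes / commit_interval_minutes)


# e.g. one commit every 15 minutes and a 24-hour time-travel requirement
retained = commits_to_retain(24 * 60, 15)
print(retained)  # 96
cleaner_options = {
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': str(retained),
}
```

Rounding up errs on the side of keeping slightly more history; bursty ingestion would need a margin on top of this estimate.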

Mathematical Foundation Summary

Hudi's timeline lets an incremental query read only the commits after a given instant, so its cost scales with the volume of changed data rather than with the size of the table. A Bloom filter index answers a key-membership check with $k$ hash probes and a configurable false-positive rate $p \approx (1 - e^{-kn/m})^k$, where $m$ is the filter size in bits, $n$ the number of keys, and $k$ the number of hash functions. COW tables accept higher write amplification (each update rewrites the affected base files) in exchange for read performance, while MOR tables accept read amplification ($RA = 1 +$ the number of log files merged per read) in exchange for write throughput. Compaction cost grows with the size of the base file plus its accumulated log files ($|F| + \sum_i |L_i|$, the numerator of $W_A$), so schedule it against your read-latency SLA.
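The Bloom filter false-positive formula can be evaluated directly to see how filter size and hash count trade off. This sketch uses illustrative numbers (60,000 keys at 10 bits per key) and the standard optimum $k = (m/n)\ln 2$; the helper names are hypothetical, not Hudi settings.

```python
import math


def bloom_false_positive_rate(m_bits: int, n_keys: int, k_hashes: int) -> float:
    """p = (1 - e^(-k*n/m))^k for a Bloom filter with m bits, n keys, k hash functions."""
    return (1 - math.exp(-k_hashes * n_keys / m_bits)) ** k_hashes


def optimal_k(m_bits: int, n_keys: int) -> int:
    """Hash count minimizing p: k = (m/n) * ln 2, rounded, at least 1."""
    return max(1, round(m_bits / n_keys * math.log(2)))


n = 60_000   # keys in one base file (illustrative)
m = 10 * n   # 10 bits per key
k = optimal_k(m, n)
p = bloom_false_positive_rate(m, n, k)
print(k, round(p, 4))  # 7 0.0082
```

At about 0.8% false positives, roughly 1 in 120 negative lookups would still trigger a check of a base file that does not contain the key; adding bits per key lowers $p$ at the cost of a larger filter.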

See also: Apache Iceberg Integration (21), Delta Lake Operations (22), Change Data Capture (36), Data Lakehouse Architecture (34)
