
Apache Iceberg Integration in PySpark

Iceberg Metadata Layers

Figure: Iceberg metadata layer architecture. Catalog (Hive/HDFS) → Metadata Files (snapshots) → Manifest Lists → Parquet Data Files.

Detailed Explanation

What is Apache Iceberg?

Apache Iceberg is an open table format designed for huge analytic datasets. It was developed at Netflix to solve the fundamental limitations of Hive tables.

Key Takeaway: Iceberg brings SQL table reliability to big data while enabling concurrent access from Spark, Trino, Flink, and Hive.


Metadata Layer Architecture

Iceberg's core innovation lies in its metadata layer, which provides a complete snapshot of the table at any point in time.

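| Layer | Purpose |
| --- | --- |
| Catalog | Stores the pointer to the table's current metadata file |
| Metadata File | Contains table-level settings (schema, partition specs, properties) and the list of snapshots |
| Manifest List | One per snapshot; lists the manifest files that make up that snapshot |
| Manifest Files | List data files and their partition values |
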
  • Each commit creates a new immutable snapshot
  • The layered approach enables efficient query planning without full table scans
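
The layering can be pictured with a small pure-Python model. This is only an illustrative sketch, not Iceberg's actual classes: `DataFile`, `Manifest`, `Snapshot`, `Catalog`, and `plan_files` are hypothetical names, and real manifests are stored as separate files with per-column statistics.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class DataFile:
    path: str
    region: str  # partition value recorded in the manifest


@dataclass(frozen=True)
class Manifest:
    files: Tuple[DataFile, ...]

    @property
    def regions(self) -> frozenset:
        # Partition summary, used to skip the whole manifest during planning
        return frozenset(f.region for f in self.files)


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifest_list: Tuple[Manifest, ...]


@dataclass
class Catalog:
    snapshots: Dict[int, Snapshot] = field(default_factory=dict)
    current_id: int = 0  # hypothetical convention: 0 means "no snapshot yet"

    def commit(self, snapshot: Snapshot) -> None:
        # A commit registers a new immutable snapshot and moves the pointer
        self.snapshots[snapshot.snapshot_id] = snapshot
        self.current_id = snapshot.snapshot_id


def plan_files(catalog: Catalog, region: str) -> List[str]:
    """Walk catalog -> snapshot -> manifest list -> manifests -> data files."""
    snapshot = catalog.snapshots[catalog.current_id]
    paths: List[str] = []
    for manifest in snapshot.manifest_list:
        if region not in manifest.regions:
            continue  # manifest skipped without reading its file entries
        paths.extend(f.path for f in manifest.files if f.region == region)
    return paths


m1 = Manifest((DataFile("a.parquet", "North"), DataFile("b.parquet", "North")))
m2 = Manifest((DataFile("c.parquet", "South"),))
catalog = Catalog()
catalog.commit(Snapshot(1, (m1,)))
catalog.commit(Snapshot(2, (m1, m2)))  # the second snapshot reuses manifest m1
north_files = plan_files(catalog, "North")
```

Planning for `"North"` only touches the first manifest, which is the point of the bullet about avoiding full table scans.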

Time Travel Queries

Iceberg implements time travel through snapshot IDs or timestamps.

How it works:

  1. User queries with TIMESTAMP AS OF or VERSION AS OF
  2. Engine resolves the latest snapshot committed at or before the requested time (or the snapshot with the requested ID)
  3. Only data files referenced by that snapshot are read

Important: A time travel read returns the table exactly as it was at that snapshot. Rows deleted or overwritten by later snapshots are still visible, because the older snapshot still references the data files that contain them.


Partition Evolution

Unlike Hive, where changing partitions requires rewriting the entire table, Iceberg supports partition evolution without data rewrite.

  • Add new partition specs without affecting existing data
  • Old data remains in its original partition spec
  • New data uses the new spec
  • Engine automatically handles both specs during query execution
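
A minimal sketch of how a planner could handle two specs at once, assuming each data file records the ID of the spec it was written under. The `SPECS` mapping (day partitioning replaced by month partitioning), `DataFile`, and `plan_for_date` are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from datetime import date
from typing import Callable, Dict, List


@dataclass(frozen=True)
class DataFile:
    path: str
    spec_id: int    # partition spec the file was written under
    partition: str  # partition value under that spec


# Hypothetical specs: spec 0 partitions by day, spec 1 by month.
SPECS: Dict[int, Callable[[date], str]] = {
    0: lambda d: d.isoformat(),        # e.g. "2024-01-15"
    1: lambda d: d.strftime("%Y-%m"),  # e.g. "2024-02"
}


def plan_for_date(files: List[DataFile], wanted: date) -> List[str]:
    """Keep files whose partition value matches `wanted` under their own spec."""
    return [f.path for f in files if SPECS[f.spec_id](wanted) == f.partition]


files = [
    DataFile("old-1.parquet", 0, "2024-01-15"),  # written before the spec change
    DataFile("old-2.parquet", 0, "2024-01-16"),
    DataFile("new-1.parquet", 1, "2024-02"),     # written after the spec change
]
jan_15 = plan_for_date(files, date(2024, 1, 15))
feb_03 = plan_for_date(files, date(2024, 2, 3))
```

The old files are never rewritten; the same date predicate is simply evaluated against whichever spec each file belongs to.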

Hidden Partitioning

Iceberg's hidden partitioning eliminates user error in query filtering.

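  • Users filter on the source column (for example transaction_date) and never need to reference a derived partition column
  • Engine automatically translates predicates into partition filters
  • Uses the table's partition spec transform functions
  • Avoids the Hive-style mistake of filtering on the source column while the table is partitioned on a derived column, which silently causes a full scan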
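
The predicate translation can be sketched for a day transform on a DATE column. This is an assumption-laden illustration: `days` mirrors the idea of the day transform (whole days since the Unix epoch), and `to_partition_predicate` is a hypothetical helper, not an Iceberg API.

```python
from datetime import date
from typing import Callable, List

EPOCH = date(1970, 1, 1)


def days(d: date) -> int:
    """Day transform: whole days since the Unix epoch."""
    return (d - EPOCH).days


def to_partition_predicate(op: str, value: date) -> Callable[[int], bool]:
    """Translate `transaction_date <op> value` into a test on days(transaction_date)."""
    bound = days(value)
    if op == ">=":
        return lambda p: p >= bound
    if op == "<=":
        return lambda p: p <= bound
    if op == "=":
        return lambda p: p == bound
    raise ValueError(f"unsupported operator: {op}")


# Partition values for 2024-01-15 through 2024-01-21
partitions: List[int] = [days(date(2024, 1, d)) for d in range(15, 22)]

# The user writes: WHERE transaction_date >= '2024-01-18'
keep = to_partition_predicate(">=", date(2024, 1, 18))
pruned = [p for p in partitions if keep(p)]
```

The user's filter mentions only `transaction_date`; the partition values are derived and compared behind the scenes.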

Mathematical Foundations

Definition: Iceberg Snapshot

An Iceberg snapshot $S_i$ is an immutable, point-in-time representation of table state, defined as a tuple $S_i = (M_i, F_i, t_i)$ where $M_i$ is the set of manifest files, $F_i$ is the set of data files referenced by those manifests, and $t_i$ is the commit timestamp. Snapshots form a directed acyclic graph (DAG) where each node points to its parent snapshot.

Time Travel Resolution

Given a query timestamp $t_q$, the resolved snapshot is:

$$S^* = \arg\min_{S_i : t_i \leq t_q} |t_q - t_i|$$

This finds the latest snapshot committed no later than the requested time.
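
As a sketch of this resolution rule, assuming the snapshot log is available as a list of (commit timestamp, snapshot ID) pairs sorted by time. `resolve_snapshot` is a hypothetical helper; it returns `None` when no snapshot qualifies, whereas an engine would more likely raise an error.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


def resolve_snapshot(log: List[Tuple[int, int]], t_q: int) -> Optional[int]:
    """Return the ID of the latest snapshot with commit time <= t_q.

    `log` holds (commit_ts_ms, snapshot_id) pairs sorted by commit time.
    """
    timestamps = [ts for ts, _ in log]
    idx = bisect_right(timestamps, t_q)  # number of snapshots with t_i <= t_q
    return log[idx - 1][1] if idx > 0 else None


snapshot_log = [(1000, 11), (2000, 22), (3000, 33)]
between = resolve_snapshot(snapshot_log, 2500)
exact = resolve_snapshot(snapshot_log, 2000)
too_early = resolve_snapshot(snapshot_log, 999)
```

Because the constraint is that the commit time must not exceed the query time, minimizing the distance to the query time is the same as picking the largest qualifying commit time, which is what the binary search does.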

Snapshot Isolation Theorem

Under Iceberg's snapshot isolation model, any read operation $R(S_i)$ sees a consistent view of the table as of snapshot $S_i$, regardless of concurrent write operations. This is guaranteed because:

$$\forall\ \text{concurrent writes } W_j:\; S_j \neq S_i \implies R(S_i) \cap W_j = \emptyset$$
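
A toy model of this property, assuming each snapshot is simply an immutable set of data file paths. The `Table` class and its methods are hypothetical and omit everything a real commit protocol needs, such as conflict detection and retries.

```python
from typing import Dict, FrozenSet, Iterable


class Table:
    """Each commit produces a new immutable file set; old ones never change."""

    def __init__(self) -> None:
        self._snapshots: Dict[int, FrozenSet[str]] = {0: frozenset()}
        self.current = 0

    def commit(self, added: Iterable[str] = (), removed: Iterable[str] = ()) -> int:
        base = self._snapshots[self.current]
        new_id = self.current + 1
        self._snapshots[new_id] = (base | frozenset(added)) - frozenset(removed)
        self.current = new_id
        return new_id

    def read(self, snapshot_id: int) -> FrozenSet[str]:
        return self._snapshots[snapshot_id]


table = Table()
s1 = table.commit(added=["a.parquet", "b.parquet"])
reader_view = table.read(s1)  # a reader pins snapshot s1

# A writer commits while the reader is still working against s1
s2 = table.commit(added=["c.parquet"], removed=["a.parquet"])
```

The reader's view still contains `a.parquet` after the second commit, and only readers that start from `s2` see the change.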

Snapshot Retention Cost

Storage cost for $n$ retained snapshots with average data file size $\bar{f}$:

$$C_{\text{retention}} = n \times |M| \times \bar{m} + \sum_{i=1}^{n} |F_i \setminus F_{i-1}| \times \bar{f}$$

where $|M|$ is the number of manifest files per snapshot, $\bar{m}$ is the average manifest size, and $F_0 = \emptyset$.
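
A worked instance of the retention cost formula, treating each snapshot as a set of file paths and taking the file set before the first snapshot to be empty. The function name and all sizes below are made-up illustration values, not measurements.

```python
from typing import List, Set


def retention_cost(
    snapshots: List[Set[str]],
    manifests_per_snapshot: int,
    avg_manifest_bytes: int,
    avg_file_bytes: int,
) -> int:
    """Metadata cost for n snapshots plus the cost of files new in each snapshot."""
    n = len(snapshots)
    metadata_bytes = n * manifests_per_snapshot * avg_manifest_bytes

    new_file_count = 0
    previous: Set[str] = set()  # the table starts empty
    for files in snapshots:
        new_file_count += len(files - previous)  # files added by this snapshot
        previous = files
    return metadata_bytes + new_file_count * avg_file_bytes


snapshots = [
    {"a", "b"},       # 2 new files
    {"a", "b", "c"},  # 1 new file
    {"b", "c", "d"},  # 1 new file ("a" was removed, "d" added)
]
cost = retention_cost(
    snapshots,
    manifests_per_snapshot=2,
    avg_manifest_bytes=8_000,
    avg_file_bytes=128_000_000,
)
```

Here the metadata term is 3 × 2 × 8,000 = 48,000 bytes and the data term is 4 × 128,000,000 bytes, so retained data files dominate the total.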

Partition Pruning Effectiveness

With partition pruning, scanned data reduces from total table size $T$ to:

$$T_{\text{pruned}} = T \times \prod_{k=1}^{d} \frac{|\text{matching partitions}_k|}{|\text{all partitions}_k|}$$

where $d$ is the number of partition dimensions.
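
The pruning formula can be evaluated directly. The sketch below assumes, as the formula implicitly does, that data is spread evenly across partitions and that the dimensions are independent; `pruned_size` and the numbers are hypothetical.

```python
from math import prod
from typing import List, Tuple


def pruned_size(total_bytes: float, dims: List[Tuple[int, int]]) -> float:
    """dims holds (matching partitions, all partitions) per partition dimension."""
    return total_bytes * prod(matching / total for matching, total in dims)


# 1 TB table partitioned by day (365 partitions) and region (4 partitions);
# the query selects 7 days in a single region.
one_tb = 1_000_000_000_000
scanned = pruned_size(one_tb, [(7, 365), (1, 4)])
fraction = scanned / one_tb
```

With these inputs the scanned fraction is 7/1460, a little under half a percent of the table.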

Key Insight

Iceberg's hidden partitioning eliminates the user-error of filtering on non-partitioned columns. The engine automatically translates predicates into partition filters using the partition spec's transform functions.

Summary

Iceberg resolves time travel from metadata alone, without scanning data files; snapshot isolation keeps readers unaffected by concurrent writers; and partition pruning reduces scanned data multiplicatively across partition dimensions. These properties make Iceberg suitable for concurrent analytical workloads at petabyte scale.

Key Concepts Table

| Concept | Description | Performance Impact |
| --- | --- | --- |
| Snapshot | Immutable point-in-time view of the table | Enables time travel with zero copy overhead |
| Manifest File | Lists data files with partition values and column stats | Enables partition pruning and file-level filtering |
| Partition Spec | Defines how data is partitioned (can evolve over time) | Supports partition evolution without rewrite |
| Catalog | Stores the pointer to the current table metadata | Atomic commits via an atomic swap of that pointer |
| Snapshot ID | Unique identifier for each table commit | Enables precise time travel by snapshot ID |
| Sequence Number | Monotonically increasing ID per snapshot | Resolves ordering in concurrent writes |
| Equality Delete | Marks rows as deleted by column values | Soft delete without rewriting data files |
| Position Delete | Marks rows as deleted by file position | Enables row-level updates in merge operations |

Code Examples

Setting Up Iceberg with Spark

from pyspark.sql import SparkSession

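# Initialize Spark with Iceberg support.
# The SQL extensions are needed for CALL procedures and
# ALTER TABLE ... ADD/DROP PARTITION FIELD used later in this lesson.
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "hdfs://cluster/iceberg-warehouse") \
    .getOrCreate()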

# Create an Iceberg table with partitioning
spark.sql("""
    CREATE TABLE iceberg.sales (
        transaction_id STRING,
        customer_id STRING,
        product_id STRING,
        quantity INT,
        price DECIMAL(10, 2),
        transaction_date DATE,
        region STRING
    )
    USING iceberg
    PARTITIONED BY (days(transaction_date), region)
    TBLPROPERTIES (
        'format-version' = '2',
        'write.parquet.compression-codec' = 'zstd',
        'write.distribution-mode' = 'hash'
    )
""")

# Insert sample data
SALES_COLUMNS = [
    "transaction_id", "customer_id", "product_id",
    "quantity", "price", "transaction_date", "region"
]

def to_sales_df(rows):
    # Cast explicitly so the DataFrame schema matches the table definition
    # (INT quantity, DECIMAL price, DATE transaction_date) instead of the
    # inferred bigint/double/string types
    return spark.createDataFrame(rows, SALES_COLUMNS).selectExpr(
        "transaction_id", "customer_id", "product_id",
        "CAST(quantity AS INT) AS quantity",
        "CAST(price AS DECIMAL(10, 2)) AS price",
        "CAST(transaction_date AS DATE) AS transaction_date",
        "region"
    )

sales_data = [
    ("TXN001", "CUST001", "PROD001", 5, 29.99, "2024-01-15", "North"),
    ("TXN002", "CUST002", "PROD002", 3, 49.99, "2024-01-16", "South"),
    ("TXN003", "CUST001", "PROD003", 10, 19.99, "2024-01-17", "East"),
    ("TXN004", "CUST003", "PROD001", 2, 29.99, "2024-01-18", "West"),
    ("TXN005", "CUST004", "PROD004", 7, 39.99, "2024-01-19", "North"),
]

df = to_sales_df(sales_data)
df.writeTo("iceberg.sales").append()

# Second batch - creates new snapshot
sales_data_2 = [
    ("TXN006", "CUST005", "PROD001", 4, 29.99, "2024-01-20", "South"),
    ("TXN007", "CUST006", "PROD002", 1, 49.99, "2024-01-21", "East"),
]

df2 = to_sales_df(sales_data_2)
df2.writeTo("iceberg.sales").append()

Time Travel Queries

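# Query the current table state
df_current = spark.sql("""
    SELECT * FROM iceberg.sales
    WHERE transaction_date >= '2024-01-18'
""")

# Query by timestamp. TIMESTAMP AS OF refers to commit time, not to
# transaction_date, so take the timestamp from the table history
# (here: the time of the first commit).
first_commit_ts = spark.sql("""
    SELECT CAST(made_current_at AS STRING) AS ts
    FROM iceberg.sales.history
    ORDER BY made_current_at
""").first()["ts"]

df_historical = spark.sql(f"""
    SELECT * FROM iceberg.sales
    TIMESTAMP AS OF '{first_commit_ts}'
""")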

# Query by snapshot ID (order the history so index 0 is the oldest snapshot)
snapshots = spark.sql("""
    SELECT * FROM iceberg.sales.history ORDER BY made_current_at
""").collect()
first_snapshot_id = snapshots[0]['snapshot_id']

df_at_snapshot = spark.sql(f"""
    SELECT * FROM iceberg.sales VERSION AS OF {first_snapshot_id}
""")

# Compare data between snapshots (f-string so the snapshot ID is substituted)
df_diff = spark.sql(f"""
    SELECT t1.transaction_id, t1.quantity as old_qty, t2.quantity as new_qty
    FROM (
        SELECT * FROM iceberg.sales VERSION AS OF {first_snapshot_id}
    ) t1
    INNER JOIN iceberg.sales t2
    ON t1.transaction_id = t2.transaction_id
    WHERE t1.quantity != t2.quantity
""")

Schema Evolution and Partition Evolution

# Add a new column (schema evolution)
spark.sql("""
    ALTER TABLE iceberg.sales ADD COLUMNS (
        discount DECIMAL(5, 2) AFTER price,
        payment_method STRING AFTER region
    )
""")

# Drop a column
spark.sql("ALTER TABLE iceberg.sales DROP COLUMN payment_method")

# Rename a column
spark.sql("ALTER TABLE iceberg.sales RENAME COLUMN discount TO discount_pct")

# Partition evolution - change partitioning without rewriting data
spark.sql("""
    ALTER TABLE iceberg.sales DROP PARTITION FIELD days(transaction_date)
""")
spark.sql("""
    ALTER TABLE iceberg.sales ADD PARTITION FIELD bucket(16, customer_id)
""")

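# New data will use the new partition spec. Each row now has eight values
# because of the discount_pct column added after price.
new_data = [
    ("TXN008", "CUST007", "PROD005", 6, 59.99, None, "2024-01-22", "North"),
]
# An explicit schema is required because discount_pct is None and cannot be inferred
df_new = spark.createDataFrame(
    new_data,
    schema="transaction_id STRING, customer_id STRING, product_id STRING, "
           "quantity INT, price DOUBLE, discount_pct DOUBLE, "
           "transaction_date STRING, region STRING"
).selectExpr(
    "transaction_id", "customer_id", "product_id", "quantity",
    "CAST(price AS DECIMAL(10, 2)) AS price",
    "CAST(discount_pct AS DECIMAL(5, 2)) AS discount_pct",
    "CAST(transaction_date AS DATE) AS transaction_date",
    "region"
)
df_new.writeTo("iceberg.sales").append()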

Snapshot Management and Cleanup

# View all snapshots
spark.sql("SELECT * FROM iceberg.sales.snapshots").show(truncate=False)

# View snapshot history with operation details
spark.sql("""
    SELECT
        committed_at,
        snapshot_id,
        operation,
        summary
    FROM iceberg.sales.snapshots
    ORDER BY committed_at
""").show(truncate=False)

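# Expire snapshots older than the retention cutoff (the table default applies
# when older_than is omitted), always keeping at least the last 3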
spark.sql("""
    CALL iceberg.system.expire_snapshots(
        table => 'iceberg.sales',
        retain_last => 3
    )
""")

# Manually compact data files ('rewrite-all' forces every file to be rewritten, not only small ones)
spark.sql("""
    CALL iceberg.system.rewrite_data_files(
        table => 'iceberg.sales',
        options => map(
            'min-input-files', '5',
            'max-concurrent-file-group-rewrites', '9',
            'rewrite-all', 'true'
        )
    )
""")

# Rewrite manifests for better pruning
spark.sql("""
    CALL iceberg.system.rewrite_manifests(
        table => 'iceberg.sales'
    )
""")

Performance Metrics

| Metric | Hive Tables | Iceberg Tables | Improvement |
| --- | --- | --- | --- |
| Time Travel Query Latency | Full table scan required | Snapshot pointer + manifest scan | 10-50x faster |
| Schema Evolution | Requires table rebuild | Metadata-only operation | Instant (milliseconds) |
| Partition Evolution | Requires INSERT OVERWRITE | Metadata-only operation | No data rewrite needed |
| Concurrent Write Conflicts | High (partition-level locking) | Low (snapshot-level conflicts) | 5-10x fewer conflicts |
| Small File Compaction | Manual process (INSERT OVERWRITE) | Built-in rewrite_data_files | Automated compaction |
| Data Skipping (Partition Pruning) | Column-level only | Partition + column statistics | 2-3x better pruning |
| Metadata Overhead | Minimal | ~1KB per manifest file | Negligible for most workloads |
| Storage Efficiency | No built-in dedup | Equality/position deletes | Better update semantics |

Best Practices

  1. Z-order (or sort) data on frequently filtered columns beyond the partition keys so that the per-file column statistics kept in manifests can skip more data
  2. Enable format-version=2 to unlock row-level deletes (position and equality delete files), which merge-on-read UPDATE, DELETE, and MERGE operations rely on
  3. Schedule regular compaction using rewrite_data_files to merge small files and improve read performance
  4. Set write.distribution-mode=hash for write-heavy tables to ensure even data distribution across partitions
  5. Use rewrite_manifests periodically to consolidate small manifests and improve pruning efficiency
  6. Never hardcode snapshot IDs in production queries; use timestamp-based time travel for reproducibility
  7. Configure write.target-file-size-bytes to control the target size of the data files that writes and compaction produce
  8. Leverage partition evolution when query patterns change, rather than rewriting the entire table
  9. Monitor snapshot count and set history.expire.max-snapshot-age-ms to prevent unbounded growth
  10. Query the partitions and files metadata tables (for example SELECT * FROM iceberg.sales.partitions) to inspect partition metadata and validate that partition pruning is working correctly
  11. Implement a data retention policy with automated expire_snapshots calls to balance storage costs and time travel requirements
  12. Always validate schema before writes using spark.sql("DESCRIBE EXTENDED table") to catch column type mismatches early
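
Several of these practices can be combined into one scheduled job. The sketch below only builds the SQL text, so it can be inspected without a cluster. The helper names, the 7-day retention window, and the choice of Z-order columns are assumptions for illustration; the procedure and property names are the ones documented for Iceberg's Spark integration, and running them requires a session with the Iceberg SQL extensions enabled.

```python
from datetime import datetime, timedelta
from typing import List, Sequence


def maintenance_statements(
    catalog: str,
    table: str,
    now: datetime,
    keep_days: int = 7,        # assumed retention window
    retain_last: int = 3,
    zorder_cols: Sequence[str] = ("customer_id", "product_id"),  # assumed columns
) -> List[str]:
    """Build maintenance SQL: retention property, compaction, manifests, expiry."""
    cutoff = (now - timedelta(days=keep_days)).strftime("%Y-%m-%d %H:%M:%S")
    max_age_ms = keep_days * 24 * 60 * 60 * 1000
    cols = ", ".join(zorder_cols)
    return [
        f"ALTER TABLE {table} SET TBLPROPERTIES "
        f"('history.expire.max-snapshot-age-ms' = '{max_age_ms}')",
        f"CALL {catalog}.system.rewrite_data_files(table => '{table}', "
        f"strategy => 'sort', sort_order => 'zorder({cols})')",
        f"CALL {catalog}.system.rewrite_manifests(table => '{table}')",
        f"CALL {catalog}.system.expire_snapshots(table => '{table}', "
        f"older_than => TIMESTAMP '{cutoff}', retain_last => {retain_last})",
    ]


def run_maintenance(spark, statements: List[str]) -> None:
    # `spark` is an existing SparkSession configured for Iceberg
    for statement in statements:
        spark.sql(statement)


statements = maintenance_statements("iceberg", "iceberg.sales", datetime(2024, 2, 1))
```

Expiry runs last so that files made obsolete by compaction in the same run only become eligible for removal once their snapshots age past the cutoff.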

Mathematical Foundation Summary

Iceberg's metadata architecture is a tree: snapshots point to manifest lists, which point to manifests, which point to data files. This indirection lets the planner prune manifests and data files using partition summaries and column-level statistics instead of listing directories. The formula Effective Files Scanned = Total Files × (1 - Pruning Ratio) shows the effect: with a pruning ratio of 0.80 to 0.95, which proper partitioning and Z-ordering can reach for selective queries, predicate pushdown reduces I/O by 80-95%. Time travel relies on snapshot isolation with append-only metadata, so reads are never blocked by writes, and ACID guarantees follow from atomic commits along the snapshot lineage chain.

See also: Delta Lake Operations (22), Apache Hudi Operations (23), Data Lakehouse Architecture (34), Change Data Capture (36)
