🧊 Apache Iceberg Integration in PySpark
Iceberg Metadata Layers
Architecture Diagram
Detailed Explanation
What is Apache Iceberg?
Apache Iceberg is an open table format designed for huge analytic datasets. It was developed at Netflix to solve the fundamental limitations of Hive tables.
Key Takeaway: Iceberg brings SQL table reliability to big data while enabling concurrent access from Spark, Trino, Flink, and Hive.
Metadata Layer Architecture
Iceberg's core innovation lies in its metadata layer, which provides a complete snapshot of the table at any point in time.
| Layer | Purpose |
|---|---|
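| Catalog | Stores the pointer to the table's current metadata file |
| Metadata File | Contains the schema, partition specs, table properties, and the list of snapshots |
| Manifest List | One per snapshot; lists the manifest files that make up that snapshot |
| Manifest Files | List data files with their partition values and column statistics |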
- Each commit creates a new immutable snapshot
- The layered approach enables efficient query planning without full table scans
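The layering can be pictured with a toy model. The sketch below is not Iceberg's implementation; the class names (`Catalog`, `Snapshot`, `Manifest`, `DataFile`) and the `plan_files` helper are hypothetical and only illustrate how a planner can walk from the catalog pointer down to data files while reading metadata only.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class DataFile:
    path: str
    partition: dict  # partition values, e.g. {"region": "North"}


@dataclass(frozen=True)
class Manifest:
    data_files: tuple  # DataFile entries tracked by this manifest


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifests: tuple  # stands in for the snapshot's list of manifests


@dataclass
class Catalog:
    snapshots: dict = field(default_factory=dict)
    current_snapshot_id: Optional[int] = None

    def commit(self, snapshot: Snapshot) -> None:
        # A commit registers a new immutable snapshot and moves the pointer.
        self.snapshots[snapshot.snapshot_id] = snapshot
        self.current_snapshot_id = snapshot.snapshot_id


def plan_files(catalog: Catalog, region: str) -> list:
    """Walk catalog -> snapshot -> manifests -> data files, keeping matches."""
    snapshot = catalog.snapshots[catalog.current_snapshot_id]
    return [
        f.path
        for manifest in snapshot.manifests
        for f in manifest.data_files
        if f.partition["region"] == region
    ]


m1 = Manifest((DataFile("a.parquet", {"region": "North"}),
               DataFile("b.parquet", {"region": "South"})))
m2 = Manifest((DataFile("c.parquet", {"region": "North"}),))

catalog = Catalog()
catalog.commit(Snapshot(1, (m1,)))      # first commit
catalog.commit(Snapshot(2, (m1, m2)))   # second commit reuses m1 unchanged

print(plan_files(catalog, "North"))
```

Snapshot 2 reuses manifest `m1` as-is, which is why a new commit does not need to copy existing metadata or data.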
Time Travel Queries
Iceberg implements time travel through snapshot IDs or timestamps.
How it works:
- User queries with `TIMESTAMP AS OF` or `VERSION AS OF`
- Engine resolves the latest snapshot committed at or before the requested time (or the exact snapshot for a given ID)
- Only data files referenced by that snapshot are read
Important: The query reads exactly the data files that made up the table at that moment, so rows deleted by later snapshots are still returned.
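Timestamp resolution can be sketched in a few lines. This is a minimal illustration, assuming a snapshot log sorted by commit time; the `snapshot_log` values and the `resolve_as_of` helper are made up for the example and are not an Iceberg API.

```python
from bisect import bisect_right

# Hypothetical snapshot log: (commit time in epoch millis, snapshot id), sorted by time.
snapshot_log = [(1_000, 101), (2_000, 102), (3_000, 103)]


def resolve_as_of(log, query_ts_ms):
    """Return the id of the latest snapshot committed at or before query_ts_ms."""
    commit_times = [ts for ts, _ in log]
    idx = bisect_right(commit_times, query_ts_ms)
    if idx == 0:
        raise ValueError("no snapshot exists at or before the requested time")
    return log[idx - 1][1]


print(resolve_as_of(snapshot_log, 2_500))
```

A timestamp that falls between two commits resolves to the earlier one, and a timestamp before the first commit is an error rather than an empty result.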
Partition Evolution
Unlike Hive, where changing partitions requires rewriting the entire table, Iceberg supports partition evolution without data rewrite.
- Add new partition specs without affecting existing data
- Old data remains in its original partition spec
- New data uses the new spec
- Engine automatically handles both specs during query execution
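The points above can be illustrated with a toy planner. The sketch assumes two hypothetical specs (day-based, then month-based) and files that record which spec wrote them; `SPECS`, `FILES`, and `files_for_date` are illustrative names, not Iceberg internals.

```python
from datetime import date

# Two hypothetical partition specs: spec 0 partitions by day, spec 1 by month.
SPECS = {
    0: lambda d: d.isoformat(),        # day transform   -> "2024-01-15"
    1: lambda d: d.strftime("%Y-%m"),  # month transform -> "2024-01"
}

# Each data file remembers the spec it was written with and its partition value.
FILES = [
    {"path": "old-1.parquet", "spec_id": 0, "partition": "2024-01-15"},
    {"path": "old-2.parquet", "spec_id": 0, "partition": "2024-01-16"},
    {"path": "new-1.parquet", "spec_id": 1, "partition": "2024-01"},
    {"path": "new-2.parquet", "spec_id": 1, "partition": "2024-02"},
]


def files_for_date(files, target):
    """Keep files whose partition value could contain rows for `target`."""
    return [
        f["path"]
        for f in files
        if SPECS[f["spec_id"]](target) == f["partition"]
    ]


matches = files_for_date(FILES, date(2024, 1, 15))
print(matches)
```

The same filter is evaluated once per spec, so old day-partitioned files and new month-partitioned files are both pruned correctly without rewriting either.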
Hidden Partitioning
Iceberg's hidden partitioning eliminates user error in query filtering.
- Users never need to specify partition columns in queries
- Engine automatically translates predicates into partition filters
- Uses the table's partition spec transform functions
- Eliminates the Hive anti-pattern where a filter on the source column (for example, a timestamp) misses the derived partition column and triggers a full scan
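As a rough sketch of predicate translation, the snippet below turns a filter on a timestamp column into a filter on day partitions. It assumes a day-granularity transform expressed as days since 1970-01-01; `days_transform` and `prune` are hypothetical helpers written for this example.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def days_transform(ts: datetime) -> int:
    """Days since 1970-01-01, mirroring the idea behind a days(...) transform."""
    return (ts.date() - EPOCH).days


def prune(partition_days, lower_ts):
    """Translate `event_ts >= lower_ts` into a filter on day partitions."""
    lower_day = days_transform(lower_ts)
    return [d for d in partition_days if d >= lower_day]


# Partitions for 2024-01-15 through 2024-01-19.
partition_days = [days_transform(datetime(2024, 1, d)) for d in range(15, 20)]

# The user filters on the timestamp only; the partition filter is derived.
kept = prune(partition_days, datetime(2024, 1, 18, 9, 30))
print(len(kept), "of", len(partition_days), "partitions scanned")
```

The user never mentions a partition column; the transform alone decides which partitions can contain matching rows.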
Mathematical Foundations
Definition: Iceberg Snapshot
An Iceberg snapshot is an immutable, point-in-time representation of table state, defined as a tuple $S = (M, D, t)$, where $M$ is the set of manifest files, $D$ is the set of data files referenced by those manifests, and $t$ is the commit timestamp. Snapshots form a directed acyclic graph (DAG) where each node points to its parent snapshot.
Time Travel Resolution
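Given a query timestamp $t_q$, the resolved snapshot is:
$$S^{*}(t_q) = \arg\max_{S_i \,:\, t_i \le t_q} t_i$$
where $t_i$ is the commit timestamp of snapshot $S_i$. This finds the latest snapshot committed no later than the requested time.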
Snapshot Isolation Theorem
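Under Iceberg's snapshot isolation model, any read operation sees a consistent view of the table as of snapshot $S_i$, regardless of concurrent write operations. This is guaranteed because each commit creates a new immutable snapshot rather than modifying an existing one, and the catalog switches the current-snapshot pointer atomically, so a reader that has resolved $S_i$ never observes a partially written commit.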
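One way to see why readers stay consistent is a toy table in which snapshots are immutable and a commit is a single pointer move. The `ToyTable` class below is a hypothetical single-process illustration, not how Iceberg or any catalog is implemented.

```python
class ToyTable:
    """Toy model: snapshots are immutable tuples, commits move one pointer."""

    def __init__(self):
        self._snapshots = {0: ()}  # snapshot id -> immutable tuple of rows
        self.current_id = 0

    def append(self, rows):
        # Build a brand-new snapshot; never mutate an existing one.
        new_id = self.current_id + 1
        self._snapshots[new_id] = self._snapshots[self.current_id] + tuple(rows)
        self.current_id = new_id  # one pointer assignment publishes the commit

    def reader(self):
        pinned = self.current_id  # the reader pins a snapshot when it starts
        return lambda: self._snapshots[pinned]


table = ToyTable()
table.append(["a"])
read_old = table.reader()   # pinned to snapshot 1
table.append(["b"])         # a later write creates snapshot 2
read_new = table.reader()   # pinned to snapshot 2

print(read_old(), read_new())
```

The first reader keeps seeing its pinned snapshot after the second append, because nothing it references was modified.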
Snapshot Retention Cost
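Storage cost for $n$ retained snapshots with average data file size $\bar{s}_d$:
$$C_{\text{storage}} = \left| \bigcup_{i=1}^{n} D_i \right| \cdot \bar{s}_d + m \cdot \bar{s}_m$$
where $D_i$ is the set of data files referenced by snapshot $S_i$ (files shared between snapshots are stored once), $m$ is the number of manifest files, and $\bar{s}_m$ is average manifest size.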
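A small worked example makes the cost estimate concrete. It assumes that data files shared by several retained snapshots are stored once; the helper name, file names, and sizes are invented for illustration.

```python
def retained_storage_bytes(snapshot_files, avg_data_file_bytes,
                           manifest_count, avg_manifest_bytes):
    """Estimate storage held by retained snapshots.

    snapshot_files maps snapshot id -> set of data file paths it references.
    Files shared by several snapshots are counted once (set union).
    """
    distinct_files = set().union(*snapshot_files.values())
    return (len(distinct_files) * avg_data_file_bytes
            + manifest_count * avg_manifest_bytes)


# Three retained snapshots that share most of their files (hypothetical).
snapshot_files = {
    1: {"a", "b"},
    2: {"a", "b", "c"},
    3: {"b", "c", "d"},
}

cost = retained_storage_bytes(
    snapshot_files,
    avg_data_file_bytes=128 * 1024 * 1024,  # 128 MiB per data file
    manifest_count=3,
    avg_manifest_bytes=8 * 1024,            # 8 KiB per manifest
)
print(cost)  # 4 distinct files + 3 manifests -> 536895488 bytes
```

Retaining three snapshots here costs four data files, not eight, which is why snapshot retention is usually much cheaper than keeping full copies.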
Partition Pruning Effectiveness
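With partition pruning, scanned data reduces from total table size $T$ to:
$$T_{\text{scanned}} = T \cdot \prod_{j=1}^{k} \sigma_j$$
where $k$ is the number of partition dimensions and $\sigma_j \in (0, 1]$ is the fraction of partitions in dimension $j$ that match the query filter (assuming the dimensions are independent).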
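The multiplicative effect is easy to check numerically. The sketch assumes that each partition dimension prunes independently and that data is evenly spread; the table size and selectivities are made-up inputs.

```python
from math import prod


def scanned_size(total_size, selectivities):
    """Size left to scan after pruning on each partition dimension.

    selectivities holds, per partition dimension, the fraction of partitions
    that match the query filter. Assumes the dimensions are independent.
    """
    return total_size * prod(selectivities)


# Hypothetical 1,200 GB table: the filter keeps half of the date partitions
# and one of four regions.
remaining_gb = scanned_size(1_200, [0.5, 0.25])
print(remaining_gb)  # 150.0
```

Two moderately selective dimensions already cut the scan to one eighth of the table, and a filter that matches every partition in a dimension (selectivity 1.0) contributes no reduction.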
Key Insight
Iceberg's hidden partitioning removes a common user error: filtering on a source column without also filtering on its derived partition column. The engine automatically translates predicates into partition filters using the partition spec's transform functions.
Summary
Iceberg resolves time travel from snapshot metadata alone, without scanning data files; snapshot isolation eliminates read-write conflicts; and partition pruning achieves multiplicative data reduction across partition dimensions. These properties make Iceberg suitable for concurrent analytical workloads at petabyte scale.
Key Concepts Table
| Concept | Description | Performance Impact |
|---|---|---|
| Snapshot | Immutable point-in-time view of the table | Enables time travel with zero copy overhead |
| Manifest File | Lists data files with partition values and column stats | Enables partition pruning and file-level filtering |
| Partition Spec | Defines how data is partitioned (can evolve over time) | Supports partition evolution without rewrite |
| Catalog | Stores current snapshot pointer and table metadata | Atomic commits via catalog-level locking |
| Snapshot ID | Unique identifier for each table commit | Enables precise time travel by snapshot ID |
| Sequence Number | Monotonically increasing ID per snapshot | Resolves ordering in concurrent writes |
| Equality Delete | Marks rows as deleted by column values | Soft delete without rewriting data files |
| Position Delete | Marks rows as deleted by file position | Enables row-level updates in merge operations |
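The difference between the two delete types in the table can be sketched as a read-time filter. This is a simplified illustration: it ignores sequence numbers, which a real reader would use to decide which delete files apply to which data files, and `apply_deletes` is a hypothetical helper.

```python
def apply_deletes(data_file, rows, position_deletes, equality_deletes):
    """Return the live rows of one data file at read time.

    position_deletes: set of (file_path, row_position) pairs.
    equality_deletes: list of dicts; a row that matches every key/value
    pair of any one dict is treated as deleted.
    """
    live = []
    for pos, row in enumerate(rows):
        if (data_file, pos) in position_deletes:
            continue  # deleted by file position
        if any(all(row.get(k) == v for k, v in d.items())
               for d in equality_deletes):
            continue  # deleted by column values
        live.append(row)
    return live


rows = [
    {"transaction_id": "TXN001", "region": "North"},
    {"transaction_id": "TXN002", "region": "South"},
    {"transaction_id": "TXN003", "region": "East"},
]

live_rows = apply_deletes(
    "f1.parquet",
    rows,
    position_deletes={("f1.parquet", 0)},   # drop the first row of f1
    equality_deletes=[{"region": "East"}],  # drop every row with region = East
)
print(live_rows)
```

In both cases the data file itself is untouched; the deletes are applied as a filter when the file is read.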
Code Examples
Setting Up Iceberg with Spark
from pyspark.sql import SparkSession

# Initialize Spark with Iceberg support. The SQL extensions enable the CALL
# procedures and ALTER TABLE ... ADD/DROP PARTITION FIELD statements used later.
spark = (
    SparkSession.builder
    .appName("IcebergIntegration")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hadoop")
    .config("spark.sql.catalog.iceberg.warehouse", "hdfs://cluster/iceberg-warehouse")
    .getOrCreate()
)
# Create an Iceberg table with partitioning
spark.sql("""
CREATE TABLE iceberg.sales (
transaction_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
price DECIMAL(10, 2),
transaction_date DATE,
region STRING
)
USING iceberg
PARTITIONED BY (days(transaction_date), region)
TBLPROPERTIES (
'format-version' = '2',
'write.parquet.compression-codec' = 'zstd',
'write.distribution-mode' = 'hash'
)
""")
# Insert sample data. Prices and dates use Decimal/date values with an explicit
# schema so they match the table's DECIMAL(10, 2) and DATE columns.
from datetime import date
from decimal import Decimal

sales_schema = (
    "transaction_id STRING, customer_id STRING, product_id STRING, "
    "quantity INT, price DECIMAL(10, 2), transaction_date DATE, region STRING"
)
sales_data = [
    ("TXN001", "CUST001", "PROD001", 5, Decimal("29.99"), date(2024, 1, 15), "North"),
    ("TXN002", "CUST002", "PROD002", 3, Decimal("49.99"), date(2024, 1, 16), "South"),
    ("TXN003", "CUST001", "PROD003", 10, Decimal("19.99"), date(2024, 1, 17), "East"),
    ("TXN004", "CUST003", "PROD001", 2, Decimal("29.99"), date(2024, 1, 18), "West"),
    ("TXN005", "CUST004", "PROD004", 7, Decimal("39.99"), date(2024, 1, 19), "North"),
]
df = spark.createDataFrame(sales_data, schema=sales_schema)
df.writeTo("iceberg.sales").append()
# Second batch - creates a new snapshot
sales_data_2 = [
    ("TXN006", "CUST005", "PROD001", 4, Decimal("29.99"), date(2024, 1, 20), "South"),
    ("TXN007", "CUST006", "PROD002", 1, Decimal("49.99"), date(2024, 1, 21), "East"),
]
df2 = spark.createDataFrame(sales_data_2, schema=sales_schema)
df2.writeTo("iceberg.sales").append()
Time Travel Queries
# Query the current table state (an ordinary filter on a data column)
df_current = spark.sql("""
    SELECT * FROM iceberg.sales
    WHERE transaction_date >= '2024-01-18'
""")
# Look up snapshots in commit order
snapshots = spark.sql("""
    SELECT made_current_at, snapshot_id
    FROM iceberg.sales.history
    ORDER BY made_current_at
""").collect()
first_snapshot_id = snapshots[0]["snapshot_id"]
first_commit_ts = snapshots[0]["made_current_at"]
# Query by timestamp. The timestamp refers to commit time, not transaction_date,
# and must not be earlier than the table's first commit.
df_historical = spark.sql(f"""
    SELECT * FROM iceberg.sales
    TIMESTAMP AS OF '{first_commit_ts}'
""")
# Query by snapshot ID
df_at_snapshot = spark.sql(f"""
    SELECT * FROM iceberg.sales VERSION AS OF {first_snapshot_id}
""")
# Compare data between snapshots (f-string so the snapshot ID is interpolated)
df_diff = spark.sql(f"""
    SELECT t1.transaction_id, t1.quantity AS old_qty, t2.quantity AS new_qty
    FROM (
        SELECT * FROM iceberg.sales VERSION AS OF {first_snapshot_id}
    ) t1
    INNER JOIN iceberg.sales t2
        ON t1.transaction_id = t2.transaction_id
    WHERE t1.quantity != t2.quantity
""")
Schema Evolution and Partition Evolution
# Add a new column (schema evolution)
spark.sql("""
ALTER TABLE iceberg.sales ADD COLUMNS (
discount DECIMAL(5, 2) AFTER price,
payment_method STRING AFTER region
)
""")
# Drop a column
spark.sql("ALTER TABLE iceberg.sales DROP COLUMN payment_method")
# Rename a column
spark.sql("ALTER TABLE iceberg.sales RENAME COLUMN discount TO discount_pct")
# Partition evolution - change partitioning without rewriting data
spark.sql("""
ALTER TABLE iceberg.sales DROP PARTITION FIELD days(transaction_date)
""")
spark.sql("""
ALTER TABLE iceberg.sales ADD PARTITION FIELD bucket(16, customer_id)
""")
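# New data will use the new partition spec. Each row must match the evolved
# schema, which now has discount_pct between price and transaction_date.
from datetime import date
from decimal import Decimal

new_data = [
    ("TXN008", "CUST007", "PROD005", 6, Decimal("59.99"), None, date(2024, 1, 22), "North"),
]
df_new = spark.createDataFrame(new_data, schema=(
    "transaction_id STRING, customer_id STRING, product_id STRING, quantity INT, "
    "price DECIMAL(10, 2), discount_pct DECIMAL(5, 2), transaction_date DATE, region STRING"
))
df_new.writeTo("iceberg.sales").append()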
Snapshot Management and Cleanup
# View all snapshots
spark.sql("SELECT * FROM iceberg.sales.snapshots").show(truncate=False)
# View snapshot history with operation details
spark.sql("""
SELECT
committed_at,
snapshot_id,
operation,
summary
FROM iceberg.sales.snapshots
ORDER BY committed_at
""").show(truncate=False)
# Expire old snapshots, always keeping at least the last 3. Only snapshots older
# than older_than (default: 5 days ago) are removed.
spark.sql("""
CALL iceberg.system.expire_snapshots(
table => 'iceberg.sales',
retain_last => 3
)
""")
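# Manually compact small files. Note: 'rewrite-all' forces every file to be
# rewritten and overrides the other selection options such as 'min-input-files'.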
spark.sql("""
CALL iceberg.system.rewrite_data_files(
table => 'iceberg.sales',
options => map(
'min-input-files', '5',
'max-concurrent-file-group-rewrites', '9',
'rewrite-all', 'true'
)
)
""")
# Rewrite manifests for better pruning
spark.sql("""
CALL iceberg.system.rewrite_manifests(
table => 'iceberg.sales'
)
""")
Performance Metrics
| Metric | Hive Tables | Iceberg Tables | Improvement |
|---|---|---|---|
| Time Travel Query Latency | Full table scan required | Snapshot pointer + manifest scan | 10-50x faster |
| Schema Evolution | Requires table rebuild | Metadata-only operation | Instant (milliseconds) |
| Partition Evolution | Requires INSERT OVERWRITE | Metadata-only operation | No data rewrite needed |
| Concurrent Write Conflicts | High (partition-level locking) | Low (snapshot-level conflicts) | 5-10x fewer conflicts |
| Small File Compaction | Manual process (INSERT OVERWRITE) | Built-in rewrite_data_files | Built-in procedure (still needs scheduling) |
| Data Skipping (Partition Pruning) | Column-level only | Partition + column statistics | 2-3x better pruning |
| Metadata Overhead | Minimal | ~1KB per manifest file | Negligible for most workloads |
| Storage Efficiency | No built-in dedup | Equality/position deletes | Better update semantics |
Best Practices
- Sort or Z-order data on frequently filtered columns beyond the partition keys so that manifest column statistics can skip more files
- Enable `format-version=2` to unlock position and equality delete files, which merge-on-read row-level updates and MERGE operations rely on
- Schedule regular compaction using `rewrite_data_files` to merge small files and improve read performance
- Set `write.distribution-mode=hash` for write-heavy tables to ensure even data distribution across partitions
- Use `rewrite_manifests` periodically to consolidate small manifests and improve pruning efficiency
- Never hardcode snapshot IDs in production queries; use timestamp-based time travel for reproducibility
- Configure `write.target-file-size-bytes` to control the target size of written data files
- Leverage partition evolution when query patterns change, rather than rewriting the entire table
- Monitor snapshot count and set `history.expire.max-snapshot-age-ms` to prevent unbounded growth
- Query the `partitions` metadata table (for example, `SELECT * FROM iceberg.sales.partitions`) to inspect partition metadata and validate that partition pruning is working correctly
- Implement a data retention policy with automated `expire_snapshots` calls to balance storage costs and time travel requirements
- Always validate the schema before writes using `spark.sql("DESCRIBE EXTENDED table")` to catch column type mismatches early
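A retention policy is easier to reason about when the selection rule is written out. The sketch below assumes the rule is "expire a snapshot only if it is older than the cutoff and not among the most recent N", over a simple linear history; `snapshots_to_expire` is a hypothetical planning helper, not part of Iceberg.

```python
def snapshots_to_expire(snapshots, older_than_ms, retain_last=1):
    """Pick snapshot ids to expire.

    snapshots: list of (committed_at_ms, snapshot_id) in any order.
    A snapshot is expired only if it is older than the cutoff AND it is not
    among the `retain_last` most recent snapshots.
    """
    ordered = sorted(snapshots)
    protected = set()
    if retain_last > 0:
        protected = {sid for _, sid in ordered[-retain_last:]}
    return [
        sid for ts, sid in ordered
        if ts < older_than_ms and sid not in protected
    ]


history = [(100, 1), (200, 2), (300, 3), (400, 4)]

print(snapshots_to_expire(history, older_than_ms=350, retain_last=2))
print(snapshots_to_expire(history, older_than_ms=350, retain_last=1))
```

Raising `retain_last` protects recent snapshots even when they are older than the cutoff, which is the lever for trading storage cost against how far back time travel can reach.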
Mathematical Foundation Summary
Iceberg's metadata architecture uses a tree structure where snapshots point to manifest lists, which point to manifests, which point to data files. This indirection enables O(log n) metadata operations through column-level statistics pruning. The formula Effective Files Scanned = Total Files × (1 - Pruning Ratio) demonstrates that with proper partitioning and Z-ordering, Iceberg can reduce I/O by 80-95% through predicate pushdown. Time travel uses snapshot isolation with append-only metadata, ensuring reads are never blocked by writes while maintaining ACID guarantees through the snapshot lineage chain.
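The pruning formula can be tried on a small example. The sketch assumes each file carries min/max statistics for a column, as manifests do; the file list, the `quantity` ranges, and `prune_by_stats` are invented for illustration.

```python
def prune_by_stats(files, column, lower, upper):
    """Keep files whose [min, max] range for `column` overlaps [lower, upper]."""
    kept = []
    for f in files:
        lo, hi = f["stats"][column]
        if hi >= lower and lo <= upper:
            kept.append(f["path"])
    return kept


# Hypothetical per-file min/max statistics for the quantity column.
files = [
    {"path": "f1.parquet", "stats": {"quantity": (1, 5)}},
    {"path": "f2.parquet", "stats": {"quantity": (6, 10)}},
    {"path": "f3.parquet", "stats": {"quantity": (11, 20)}},
    {"path": "f4.parquet", "stats": {"quantity": (21, 40)}},
]

# Filter: quantity BETWEEN 7 AND 12
kept = prune_by_stats(files, "quantity", 7, 12)
pruning_ratio = 1 - len(kept) / len(files)
effective_files = len(files) * (1 - pruning_ratio)

print(kept, pruning_ratio, effective_files)
```

Here two of four files survive, so the pruning ratio is 0.5 and the effective files scanned is 2. Sorting or Z-ordering narrows each file's min/max range, which is what pushes the ratio higher in practice.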
See also: Delta Lake Operations (22), Apache Hudi Operations (23), Data Lakehouse Architecture (34), Change Data Capture (36)
See Also
- Delta Lake — Delta Lake ACID transactions and time travel
- Hudi Operations — Apache Hudi for incremental processing
- Data Lakehouse — Lakehouse architecture patterns
- Data Lake Architecture — Data lake design and implementation