PySpark Advanced Interview Series
Module 14: File Formats - Choosing the Right Storage
Interview Question
"At Uber, we store petabytes of ride data. Walk us through the trade-offs between Parquet, ORC, Avro, and Delta Lake. When would you choose each, and how do compression codecs affect query performance?" - Uber Data Engineer Interview
"At Google, we optimize storage costs while maintaining query performance. Explain how columnar storage works, the impact of file sizing on performance, and how you would optimize a Parquet dataset for both analytics and point lookups." - Google Senior Data Engineer Interview
File Format Comparison
| Format | Storage | Compression | Schema Evolution | Columnar | Row-based | Best For |
|---|---|---|---|---|---|---|
| Parquet | Columnar | Excellent | Yes | Yes | No | Analytics, Hadoop |
| ORC | Columnar | Excellent | Yes | Yes | No | Hive, ACID transactions |
| Avro | Row-based | Good | Yes | No | Yes | Streaming, write-heavy, schema evolution |
| Delta Lake | Parquet+Log | Excellent | Yes | Yes | No | ACID, time travel |
| JSON | Row-based | Poor | Yes | No | Yes | APIs, semi-structured |
| CSV | Row-based | Poor | No | No | Yes | Simple exports |
Parquet Deep Dive
Columnar Storage
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FileFormatInterview").getOrCreate()
# Create sample data
data = [(i, f"user_{i}", i * 100, "2024-01-01") for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "name", "value", "date"])
# Write as Parquet
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3a://bucket/data-parquet/")
# Write as CSV for comparison
df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("s3a://bucket/data-csv/")
# Verify both copies hold the same rows. count() returns a row count, not bytes;
# compare on-disk size by listing the two prefixes in the object store.
parquet_rows = spark.read.parquet("s3a://bucket/data-parquet/").count()
csv_rows = spark.read.csv("s3a://bucket/data-csv/", header=True).count()
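To make the columnar idea concrete without a cluster, here is a minimal pure-Python sketch. It is not how Parquet encodes data; the three-column table and the fixed 8-byte encoding are assumptions chosen only to show why reading one column from a columnar layout touches fewer bytes than reading it from a row layout.

```python
import struct

# Hypothetical three-column table: (id, value, flag), each stored as an 8-byte integer
rows = [(i, i * 100, i % 2) for i in range(1000)]

# Row layout: each record is contiguous, so reading one column scans every byte
row_layout = b"".join(struct.pack("<qqq", *r) for r in rows)

# Columnar layout: each column is contiguous and can be read on its own
column_layout = {
    name: b"".join(struct.pack("<q", r[idx]) for r in rows)
    for idx, name in enumerate(["id", "value", "flag"])
}


def bytes_scanned_row_layout():
    """Bytes touched to read any single column from the row layout."""
    return len(row_layout)


def bytes_scanned_column_layout(column):
    """Bytes touched to read one column from the columnar layout."""
    return len(column_layout[column])


def sum_column(column):
    """Decode a single column and sum it, without touching the others."""
    data = column_layout[column]
    return sum(struct.unpack(f"<{len(data) // 8}q", data))


print(bytes_scanned_row_layout())            # 24000
print(bytes_scanned_column_layout("value"))  # 8000
```

With three equally sized columns the columnar scan reads one third of the bytes. On wide tables the gap grows with the number of columns the query does not need.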
Parquet Configuration
from pyspark.sql.functions import col

# Write with tuned settings (parentheses allow inline comments inside the chain)
(df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB row group
    .option("parquet.page.size", 1 * 1024 * 1024)  # 1MB page
    .option("parquet.enable.dictionary", "true")
    .option("parquet.dictionary.page.size", 1 * 1024 * 1024)
    .parquet("s3a://bucket/data-optimized/"))
# Read with predicate pushdown
result = spark.read.parquet("s3a://bucket/data-optimized/") \
    .filter(col("id") > 500000) \
    .select("id", "name")
# Reads only the relevant row groups (predicate pushdown) and only the id and name columns (column pruning)
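The row-group skipping behind predicate pushdown can be sketched in plain Python. This is an illustration under simplifying assumptions, not Parquet's reader: the dictionaries standing in for row groups and the function names are hypothetical. The point is that a filter like `id > 500000` can discard whole row groups by looking only at their min/max statistics.

```python
def build_row_groups(values, rows_per_group):
    """Split values into row groups and record min/max statistics per group."""
    groups = []
    for start in range(0, len(values), rows_per_group):
        chunk = values[start:start + rows_per_group]
        groups.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return groups


def scan_greater_than(groups, threshold):
    """Return matching rows and the number of row groups actually read."""
    matched, groups_read = [], 0
    for group in groups:
        if group["max"] <= threshold:
            continue  # statistics prove no row can match, so skip the group
        groups_read += 1
        matched.extend(v for v in group["rows"] if v > threshold)
    return matched, groups_read


ids = list(range(1_000_000))
groups = build_row_groups(ids, rows_per_group=100_000)
matched, groups_read = scan_greater_than(groups, 500_000)
print(len(matched), groups_read, len(groups))  # 499999 5 10
```

Skipping works this well only because the ids are sorted, so each group covers a narrow range. On randomly ordered data every group's min/max range would span the threshold and nothing would be skipped, which is why sorting or clustering on the filter column matters.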
Parquet Compression Codecs
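| Codec | Compression Ratio | Speed | Splittable (in Parquet) | Use Case |
|---|---|---|---|---|
| Snappy | Good | Fast | Yes | Default, balanced |
| Gzip | Better | Slow | Yes | Archival, small files |
| LZ4 | Good | Fastest | Yes | Speed-critical |
| Zstd | Best | Medium | Yes | Best compression |
| None | None | Fastest | Yes | Already compressed |
Note: Parquet compresses each column page separately inside a row group, so Parquet files stay splittable with every codec. Gzip is non-splittable only for plain text files such as CSV or JSON.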
# Write with different codecs
df.write.option("compression", "snappy").parquet("s3a://bucket/snappy/")
df.write.option("compression", "gzip").parquet("s3a://bucket/gzip/")
df.write.option("compression", "zstd").parquet("s3a://bucket/zstd/")
df.write.option("compression", "lz4").parquet("s3a://bucket/lz4/")
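The ratio-versus-speed trade-off can be felt locally with a small benchmark. Snappy, LZ4, and Zstd are not in the Python standard library, so this sketch uses `zlib`, `bz2`, and `lzma` as stand-ins. The numbers say nothing about Parquet codecs specifically; they only illustrate that stronger compression usually costs more CPU time. The sample payload is made up to resemble the CSV rows used earlier.

```python
import bz2
import lzma
import time
import zlib


def compare_codecs(payload):
    """Compress payload with several stdlib codecs and report ratio and time."""
    codecs = {
        "zlib-1": (lambda b: zlib.compress(b, 1), zlib.decompress),
        "zlib-9": (lambda b: zlib.compress(b, 9), zlib.decompress),
        "bz2": (bz2.compress, bz2.decompress),
        "lzma": (lzma.compress, lzma.decompress),
    }
    results = {}
    for name, (compress, decompress) in codecs.items():
        start = time.perf_counter()
        packed = compress(payload)
        elapsed = time.perf_counter() - start
        # Round-trip check: a codec comparison is meaningless if data is lost
        if decompress(packed) != payload:
            raise RuntimeError(f"{name} failed the round-trip check")
        results[name] = {"ratio": len(payload) / len(packed), "seconds": elapsed}
    return results


# Made-up sample rows, similar in shape to the DataFrame used in this module
payload = "\n".join(
    f"{i},user_{i},{i * 100},2024-01-01" for i in range(50_000)
).encode()

results = compare_codecs(payload)
for name, stats in results.items():
    print(f"{name}: ratio={stats['ratio']:.1f}x, compress={stats['seconds'] * 1000:.1f}ms")
```

For a real decision, run the same kind of comparison on a representative sample of your own data with the actual Parquet codecs, since ratios depend heavily on the data.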
ORC Format
ORC Features
# ORC is optimized for Hive workloads
# Parentheses allow inline comments inside the chained call
(df.write
    .mode("overwrite")
    .option("orc.compress", "zlib")
    .option("orc.stripe.size", "67108864")  # 64MB stripe
    .option("orc.row.index.stride", "10000")
    .orc("s3a://bucket/data-orc/"))
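# ORC with ACID support (Hive 3.0+)
# Needs a Hive-enabled session. Spark cannot natively read or write Hive ACID tables,
# so treat this DDL as a definition for Hive itself (or a Hive connector).
spark.sql("""
CREATE TABLE orc_table (
    id INT,
    name STRING,
    value INT
)
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true')
""")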
ORC vs Parquet
| Feature | ORC | Parquet |
|---|---|---|
| ACID Support | Native (Hive 3.0+) | Via Delta Lake |
| Index | Min/max statistics per stripe and row group, optional Bloom filters | Min/max statistics per row group and page, optional Bloom filters |
| Compression | Zlib is the Hive default (smaller files) | Snappy is the Spark default (faster) |
| Ecosystem | Hive-native | Broader support |
| Performance | Better for Hive | Better for Spark |
Avro Format
Row-Based Storage
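# Avro is row-based, which suits write-heavy workloads
# Requires the external spark-avro module (org.apache.spark:spark-avro) on the classpath
df.write \
    .format("avro") \
    .mode("overwrite") \
    .save("s3a://bucket/data-avro/")
# Read Avro
avro_df = spark.read.format("avro").load("s3a://bucket/data-avro/")
# Schema evolution with Avro
# Avro stores the writer schema with the data, so readers can resolve it against a newer schema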
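How a reader reconciles old data with a newer schema can be sketched in a few lines. This is a simplified model of Avro's schema resolution rules, not the Avro library: records are plain dictionaries, and `resolve_record` and the field layout are hypothetical names for illustration. Fields the reader does not know are dropped, and fields missing from old data are filled from the reader's default.

```python
def resolve_record(record, reader_fields):
    """Project a record written with an older schema onto the reader's schema."""
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            # Field was added after this record was written: use the default
            resolved[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    # Writer fields that the reader does not declare are silently ignored
    return resolved


# Record written before the "city" field existed, with a since-removed field
old_record = {"id": 1, "name": "user_1", "legacy_flag": True}

reader_fields = [
    {"name": "id"},
    {"name": "name"},
    {"name": "city", "default": "unknown"},
]

print(resolve_record(old_record, reader_fields))
# {'id': 1, 'name': 'user_1', 'city': 'unknown'}
```

The practical rule that falls out of this: give every newly added field a default, otherwise older records can no longer be read with the new schema.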
Avro Use Cases
# 1. Streaming (Kafka) with schema evolution
# 2. Write-heavy workloads
# 3. Cross-platform compatibility
# 4. When you need row-level access
Delta Lake
ACID Transactions
# Delta Lake = Parquet + Transaction Log
# Provides ACID transactions, schema enforcement, time travel
# Write Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://bucket/data-delta/")
# ACID merge (upsert)
# updates_df is assumed to be a DataFrame of new or changed rows with the same schema as df
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3a://bucket/data-delta/")
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
# Time travel
# Read previous version
previous_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("s3a://bucket/data-delta/")
# View history
delta_table.history().show()
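The "Parquet + Transaction Log" idea can be modeled in plain Python. This is a deliberately simplified sketch: the real Delta log is a directory of JSON commit files with metadata, protocol actions, and checkpoints, while here each commit is just a list of add/remove actions and the file names are made up. It shows why time travel is cheap: a version is whatever set of files results from replaying the log up to that commit.

```python
def snapshot_at(log, version):
    """Replay commits 0..version and return the set of live data files."""
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for commit in log[: version + 1]:
        for action in commit:
            if action["op"] == "add":
                live.add(action["path"])
            elif action["op"] == "remove":
                live.discard(action["path"])
    return live


# Hypothetical commit history; file names are illustrative only
log = [
    # Version 0: initial overwrite
    [{"op": "add", "path": "part-000.parquet"},
     {"op": "add", "path": "part-001.parquet"}],
    # Version 1: append
    [{"op": "add", "path": "part-002.parquet"}],
    # Version 2: merge rewrote one file
    [{"op": "remove", "path": "part-000.parquet"},
     {"op": "add", "path": "part-003.parquet"}],
]

print(sorted(snapshot_at(log, 0)))  # ['part-000.parquet', 'part-001.parquet']
print(sorted(snapshot_at(log, 2)))  # ['part-001.parquet', 'part-002.parquet', 'part-003.parquet']
```

Note that `part-000.parquet` still has to exist on storage for version 0 to be readable, which is why VACUUM limits how far back time travel can go.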
Delta Lake Features
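# 1. Schema evolution
# Add columns with ALTER TABLE (DeltaTable itself does not expose an addColumns method);
# appending with .option("mergeSchema", "true") is the other common route
spark.sql("""
ALTER TABLE delta.`s3a://bucket/data-delta/`
ADD COLUMNS (new_col STRING)
""")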
# 2. Data skipping (Z-ordering)
spark.sql("""
OPTIMIZE delta.`s3a://bucket/data-delta/`
ZORDER BY (id, date)
""")
# 3. VACUUM (clean old versions)
spark.sql("""
VACUUM delta.`s3a://bucket/data-delta/` RETAIN 168 HOURS
""")
# 4. Change data feed
# Requires the table property delta.enableChangeDataFeed = true
changes_df = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("s3a://bucket/data-delta/")
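The Z-ordering used by OPTIMIZE above rests on a space-filling curve. The sketch below shows the textbook version, bit interleaving (a Morton code) over two integer columns. Delta Lake's actual implementation differs in detail, so take this as the general idea only: rows that are close in both columns get nearby sort keys, which keeps per-file min/max ranges narrow for both columns at once.

```python
def interleave_bits(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into a single Z-order key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)       # x fills the even bit positions
        key |= ((y >> i) & 1) << (2 * i + 1)   # y fills the odd bit positions
    return key


# Sort a small 4x4 grid of (x, y) points along the Z-order curve
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))

print(ordered[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
print(ordered[4:8])  # [(2, 0), (3, 0), (2, 1), (3, 1)]
```

Each run of four consecutive points covers a compact 2x2 square, so a file holding those rows has a tight min/max range on both columns. A plain sort by `x` would give tight ranges on `x` but the full range on `y`.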
Real-World Scenario: Uber Storage Optimization
Problem Statement
Optimize a 10TB ride dataset for multiple access patterns: analytics queries, point lookups, and streaming ingestion. Choose the right format and configuration.
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, month, year
spark = SparkSession.builder \
    .appName("UberStorageOptimization") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
# Read raw ride data
raw_rides = spark.read.json("s3a://uber-data/rides-raw/")
# === FORMAT SELECTION STRATEGY ===
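# 1. Analytics workload (historical queries)
# Use Parquet with Snappy compression
# year and month are derived from pickup_time (assumed to be an ISO-8601 timestamp)
# so that they exist as partition columns
analytics_rides = raw_rides \
    .select("ride_id", "driver_id", "rider_id", "pickup_time",
            "dropoff_time", "fare_amount", "status", "city") \
    .withColumn("year", year(col("pickup_time"))) \
    .withColumn("month", month(col("pickup_time")))
analytics_rides.write \
    .mode("overwrite") \
    .partitionBy("city", "year", "month") \
    .option("compression", "snappy") \
    .parquet("s3a://uber-storage/rides-analytics/")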
# 2. Real-time ingestion (Kafka source)
# Use Delta Lake for ACID and schema evolution
# Shown as a batch append for brevity; a Kafka pipeline would use readStream/writeStream
realtime_rides = raw_rides \
    .withColumn("ingestion_time", current_timestamp())
realtime_rides.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("city") \
    .save("s3a://uber-storage/rides-realtime/")
# 3. Point lookups (API serving)
# Z-ordering is a Delta Lake feature, so this copy is written as Delta rather than plain Parquet
point_lookup_rides = raw_rides \
    .select("ride_id", "rider_id", "driver_id", "status", "fare_amount")
point_lookup_rides.write \
    .format("delta") \
    .mode("overwrite") \
    .option("compression", "zstd") \
    .save("s3a://uber-storage/rides-pointlookup/")
# Optimize for point lookups
spark.sql("""
OPTIMIZE delta.`s3a://uber-storage/rides-pointlookup/`
ZORDER BY (ride_id)
""")
# === COMPRESSION COMPARISON ===
# Test different codecs
codecs = ["snappy", "gzip", "zstd", "lz4"]
for codec in codecs:
    start = time.time()
    analytics_rides.write \
        .mode("overwrite") \
        .option("compression", codec) \
        .parquet(f"s3a://uber-storage/rides-{codec}/")
    write_time = time.time() - start
    # Measure read performance
    start = time.time()
    read_df = spark.read.parquet(f"s3a://uber-storage/rides-{codec}/")
    read_df.filter(col("city") == "NYC").count()
    read_time = time.time() - start
    print(f"{codec}: Write={write_time:.2f}s, Read={read_time:.2f}s")
# === FILE SIZE OPTIMIZATION ===
# Bad: Too many small files
analytics_rides.repartition(10000) \
    .write.mode("overwrite") \
    .parquet("s3a://uber-storage/rides-small-files/")
# Good: Optimal file size (128-256MB)
# Assumes ~200 bytes per row; measure the real average for your data
estimated_bytes = analytics_rides.count() * 200
num_files = int(estimated_bytes / (1024 * 1024 * 256)) or 1  # ~256MB per file, never 0 partitions
analytics_rides.repartition(num_files) \
    .write.mode("overwrite") \
    .parquet("s3a://uber-storage/rides-optimal/")
# Alternative: coalesce reduces the partition count without a full shuffle,
# but file sizes then follow the existing partition sizes
analytics_rides.coalesce(100) \
    .write.mode("overwrite") \
    .parquet("s3a://uber-storage/rides-coalesced/")
spark.stop()
File Sizing Best Practices
# Optimal file size: 128MB-256MB (compressed)
# Too small: many small files -> metadata and scheduling overhead
# Too large: fewer files -> less parallelism
# Calculate optimal partition count
data_size_bytes = 10 * 1024 * 1024 * 1024 * 1024  # 10TB
target_file_size_bytes = 256 * 1024 * 1024  # 256MB
optimal_files = data_size_bytes // target_file_size_bytes
print(f"Optimal files: {optimal_files}")  # 40960 files
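The same arithmetic can be wrapped in a small helper that rounds up and never returns zero, which is convenient when the result feeds `repartition`. The function name and the 256MB default are assumptions for illustration, not part of any Spark API.

```python
def target_file_count(data_size_bytes, target_file_size_bytes=256 * 1024**2):
    """Number of files needed so that each is at most the target size."""
    if data_size_bytes < 0 or target_file_size_bytes <= 0:
        raise ValueError("sizes must be non-negative and the target must be positive")
    # Integer ceiling division; always write at least one file
    files = (data_size_bytes + target_file_size_bytes - 1) // target_file_size_bytes
    return files if files > 0 else 1


print(target_file_count(10 * 1024**4))   # 40960 (10TB at 256MB per file)
print(target_file_count(300 * 1024**2))  # 2 (300MB does not fit in one 256MB file)
print(target_file_count(1))              # 1 (tiny datasets still need one file)
```

Remember that the input should be the compressed on-disk size, which is usually only known after a trial write of a sample.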
# Compact small files
spark.sql("""
OPTIMIZE delta.`s3a://bucket/data/`
ZORDER BY (key_column)
""")
Edge Cases
1. Schema Evolution
# Parquet schema evolution
from pyspark.sql.functions import lit
df.write.mode("overwrite").parquet("s3a://bucket/data/")
# Add new column
df_new = df.withColumn("new_col", lit("value"))
df_new.write.mode("append").parquet("s3a://bucket/data/")
# The append succeeds, but a plain read may pick one file's schema and drop new_col
# Enable schema merge on read (for Parquet, mergeSchema is a read option)
merged_df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3a://bucket/data/")
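What a schema merge does conceptually can be shown with dictionaries. This is a simplified model, not Spark's implementation: schemas are name-to-type mappings, and the function names are hypothetical. The merged schema is the union of all file schemas, and rows from older files get nulls for columns they never had.

```python
def merge_schemas(schemas):
    """Union the fields of several file schemas, rejecting type conflicts."""
    merged = {}
    for schema in schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                raise TypeError(
                    f"column {name!r} has conflicting types: {merged[name]} vs {dtype}"
                )
            merged.setdefault(name, dtype)
    return merged


def read_with_schema(rows, merged):
    """Project rows onto the merged schema, filling missing columns with None."""
    return [{name: row.get(name) for name in merged} for row in rows]


old_schema = {"id": "long", "name": "string"}
new_schema = {"id": "long", "name": "string", "new_col": "string"}
merged = merge_schemas([old_schema, new_schema])

rows = [
    {"id": 1, "name": "user_1"},                      # from an old file
    {"id": 2, "name": "user_2", "new_col": "value"},  # from a new file
]
print(read_with_schema(rows, merged))
```

Merging requires reading the footer of every file, which is why it is off by default and worth enabling only when schemas are known to differ.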
2. Corrupt Files
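# Handle corrupt files gracefully
# Parquet: skip unreadable files instead of failing the job
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
df = spark.read.parquet("s3a://bucket/data/")
# PERMISSIVE mode and _corrupt_record apply to text sources (JSON/CSV), not Parquet
# (the JSON path below is an example)
json_df = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("s3a://bucket/data-json/")
# Filter out corrupt records (the column exists only if it is in the schema)
clean_df = json_df.filter(col("_corrupt_record").isNull())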
3. Predicate Pushdown
# Parquet: min/max statistics enable predicate pushdown
# ORC: min/max statistics plus optional Bloom filters help point lookups
# Delta: Data skipping with Z-ordering
# Verify pushdown in explain plan
df.filter(col("date") == "2024-01-01").explain(True)
# Look for PushedFilters (data columns) or PartitionFilters (partition columns) in the plan
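Min/max statistics are of little help for point lookups on high-cardinality keys such as ride IDs, which is where Bloom filters come in. Below is a minimal Bloom filter sketch in pure Python. The sizes, the SHA-256 based hashing, and the "stripe" dictionaries are assumptions for illustration and do not match ORC's on-disk format. The key property is the same, though: the filter can say "definitely not here" and never produces a false negative.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, item):
        # Derive several bit positions by salting one hash function
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


def stripes_to_read(stripes, key):
    """Return the names of stripes whose filter cannot rule out the key."""
    return [s["name"] for s in stripes if s["filter"].might_contain(key)]


# Build one filter per hypothetical stripe of 50 ride IDs
stripes = []
for n in range(4):
    bloom = BloomFilter()
    for i in range(n * 50, (n + 1) * 50):
        bloom.add(f"ride_{i}")
    stripes.append({"name": f"stripe_{n}", "filter": bloom})

print(stripes_to_read(stripes, "ride_120"))  # always includes 'stripe_2'
```

Other stripes may occasionally appear in the result because of false positives. That costs an unnecessary read but never a wrong answer.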
Performance Comparison
| Format | Read Speed | Write Speed | Compression | Column Pruning | Predicate Pushdown |
|---|---|---|---|---|---|
| Parquet | Fast | Medium | Excellent | Yes | Yes (min/max) |
| ORC | Fast | Medium | Excellent | Yes | Yes (min/max, bloom) |
| Avro | Medium | Fast | Good | No | No |
| Delta | Fast | Medium | Excellent | Yes | Yes (Z-order) |
| CSV | Slow | Fast | Poor | No | No |
| JSON | Slow | Slow | Poor | No | No |
Production Recommendation
For most Spark workloads, Parquet with Snappy compression is the default choice. Use Delta Lake when you need ACID transactions, schema evolution, or time travel. Use Avro for streaming workloads with schema evolution.
Summary
Choosing the right file format impacts query performance, storage costs, and operational complexity. Parquet is the standard for columnar analytics; Delta Lake adds ACID and time travel; Avro is best for streaming and write-heavy workloads. Understanding compression trade-offs and file sizing is critical for optimizing petabyte-scale storage at Uber and Google.