PySpark Serialization and Kryo
Definition: Serialization
Serialization is the process of converting an object's state into a byte stream for storage or network transfer. In Spark, serialization is critical for shuffling data between executors and for caching RDDs/DataFrames.
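Definition: Kryo Serialization
Kryo is a fast, compact binary serialization library. For typical Spark workloads it is significantly faster and more compact than Java serialization, often by as much as 10x. It does not support every Serializable type, and it performs best when the classes being serialized are registered in advance. Spark still serializes task closures with Java serialization regardless of this setting.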
$$S_{\text{kryo}} = \frac{S_{\text{java}}}{C}$$
Here,
- $S_{\text{kryo}}$ = Serialized size using Kryo
- $S_{\text{java}}$ = Serialized size using Java serialization
- $C$ = Typical compression factor (5x to 10x improvement)
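Serialization Time Formula
$$T_{\text{ser}} = N \left(T_{\text{traverse}} + T_{\text{encode}}\right) + T_{\text{buffer}}$$
Here,
- $N$ = Number of objects to serialize
- $T_{\text{traverse}}$ = Time to traverse the object graph
- $T_{\text{encode}}$ = Time to encode each object to bytes
- $T_{\text{buffer}}$ = Buffer allocation and I/O time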
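Shuffle Serialization Impact
$$T_{\text{shuffle}} = T_{\text{ser}} + T_{\text{net}} + T_{\text{deser}}$$
Here,
- $T_{\text{shuffle}}$ = Total shuffle time per partition
- $T_{\text{ser}}$ = Serialization time at map side
- $T_{\text{net}}$ = Network transfer time
- $T_{\text{deser}}$ = Deserialization time at reduce side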
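The size, time, and shuffle relations in this section can be turned into a small back-of-the-envelope calculator. The sketch below is plain Python with no Spark dependency. The function names, the reading of traversal and encoding as per-object costs, and all sample numbers are illustrative assumptions, not measurements.

```python
def serialized_size_kryo(size_java_bytes: float, compression_factor: float) -> float:
    """Estimate Kryo output size from Java output size and a compression factor."""
    if compression_factor <= 0:
        raise ValueError("compression_factor must be positive")
    return size_java_bytes / compression_factor


def serialization_time(n_objects: int, t_traverse: float, t_encode: float,
                       t_buffer: float) -> float:
    """Per-object traversal and encoding cost plus a fixed buffer/I-O cost (seconds)."""
    return n_objects * (t_traverse + t_encode) + t_buffer


def shuffle_time(t_serialize: float, size_bytes: float,
                 bandwidth_bytes_per_s: float, t_deserialize: float) -> float:
    """Map-side serialization + network transfer + reduce-side deserialization."""
    return t_serialize + size_bytes / bandwidth_bytes_per_s + t_deserialize


# Hypothetical partition: 1,000,000 objects, 200 MB when Java-serialized.
size_java = 200e6
size_kryo = serialized_size_kryo(size_java, compression_factor=5)
t_ser = serialization_time(1_000_000, t_traverse=1e-6, t_encode=1e-6, t_buffer=0.5)
total = shuffle_time(t_ser, size_kryo, bandwidth_bytes_per_s=100e6, t_deserialize=t_ser)
print(f"Kryo size: {size_kryo / 1e6:.0f} MB, shuffle time: {total:.1f} s")
```

Varying `compression_factor` shows how a smaller payload shrinks only the network term, while faster encoding shrinks the two serialization terms.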
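Java serialization writes a full class descriptor (class name, field names, and field types) into the stream for every class it encounters. Kryo writes a small integer ID for each registered class (typically one or two bytes) instead of the class name and omits most of that metadata, which reduces both output size and encoding time.
Always register classes with Kryo. In PySpark, pass fully qualified JVM class names through configuration, for example conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2") with your own class names; the registerKryoClasses method exists only on the Scala/Java SparkConf. For unregistered classes Kryo writes the fully qualified class name with each object, which defeats its size advantage. With spark.kryo.registrationRequired set to true, Kryo instead throws an error for any unregistered class.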
Theorem: Kryo Serialization Efficiency
For typical Spark workloads, Kryo is often up to about 10x faster than Java serialization and produces output several times smaller. The gain comes from compact class ID encoding and from writing far less per-class metadata; Kryo still has to traverse the object graph.
- Kryo: often up to 10x faster and several times smaller than Java serialization; works best with class registration
- Java serialization: slower, larger, but handles any Serializable type
- Always set `spark.serializer` to `KryoSerializer` for production workloads
- Register custom classes to avoid full class name serialization overhead
- Kryo buffer: default 64KB initial (`spark.kryoserializer.buffer`), 64MB max (`spark.kryoserializer.buffer.max`); tune for large objects
Java vs Kryo Serialization
Architecture Diagram
Detailed Explanation
1. Why Serialization Matters
Serialization is the process of converting objects into a byte format for storage or transmission. In Spark, serialization is critical for:
Shuffle Operations:
- Data must be serialized before network transfer
- Faster serialization = faster shuffles
- Smaller serialized size = less network I/O
Caching/Persistence:
- Cached DataFrames are serialized
- Serialization affects cache size and access speed
- Memory efficiency depends on serialization format
Data Transfer:
- Driver-executor communication
- Inter-executor communication
- External storage writes
2. Java Serialization
Java serialization is the default in Spark.
Characteristics:
- Universal compatibility
- Verbose output (includes class metadata)
- Slow compared to alternatives
- High memory overhead
When to Use:
- Simple applications
- When compatibility is more important than performance
- When using only built-in Spark classes
Configuration:
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
3. Kryo Serialization
Kryo is a high-performance serialization library for Java.
Characteristics:
- 2-10x smaller output than Java
- 2-10x faster serialization/deserialization
- Requires class registration for best performance
- Not all Java classes supported out of the box
When to Use:
- Performance-critical applications
- Large datasets with many shuffle operations
- When memory efficiency is important
Configuration:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
4. Kryo Registration
Class registration maps classes to integer IDs for compact serialization.
Why Register:
- Without registration, Kryo writes full class name (like Java)
- With registration, Kryo writes integer ID (much smaller)
- Registration enables custom serializers for specific classes
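To make the size argument concrete, here is a toy encoder that writes the same record twice: once prefixed with its full class name, once prefixed with a registered integer ID. This is not Kryo's actual wire format; the byte layout, the helper names, and the class name `com.example.MyClass1` are assumptions chosen only to illustrate the effect.

```python
import struct


def encode_with_class_name(class_name: str, value: int) -> bytes:
    """Toy record: 2-byte length, UTF-8 class name, then a 4-byte int payload."""
    name = class_name.encode("utf-8")
    return struct.pack(">H", len(name)) + name + struct.pack(">i", value)


def encode_with_class_id(class_id: int, value: int) -> bytes:
    """Toy record: 1-byte registered class ID, then a 4-byte int payload."""
    return struct.pack(">B", class_id) + struct.pack(">i", value)


# Hypothetical registry mapping a class name to a small integer ID.
registry = {"com.example.MyClass1": 10}

by_name = encode_with_class_name("com.example.MyClass1", 42)
by_id = encode_with_class_id(registry["com.example.MyClass1"], 42)
print(len(by_name), len(by_id))  # 26 5
```

For small records the class name dominates the payload, which is why registration matters most when a job shuffles many small objects.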
How to Register:
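# Register classes by fully qualified JVM class name (placeholder names)
# PySpark's SparkConf has no registerKryoClasses method, so use the config key
conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2,com.example.MyClass3")
# Or use a KryoRegistrator implemented in Scala/Java
conf.set("spark.kryo.registrator", "com.example.MyRegistrator")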
Built-in Registrations: Spark registers many common classes automatically:
- java.lang.String
- java.util.ArrayList
- scala.collection.immutable.List
- All Spark internal classes
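Because PySpark passes registration through configuration strings, it can help to build those settings in one place. The helper below is a minimal sketch: `kryo_settings` is a hypothetical function name and the `com.example.*` class names are placeholders, while the configuration keys are the ones Spark documents.

```python
from typing import Dict, Iterable, Optional


def kryo_settings(classes: Iterable[str],
                  registrator: Optional[str] = None,
                  registration_required: bool = False) -> Dict[str, str]:
    """Build Spark settings that enable Kryo and register JVM classes by name."""
    settings = {"spark.serializer": "org.apache.spark.serializer.KryoSerializer"}
    class_list = ",".join(classes)
    if class_list:
        settings["spark.kryo.classesToRegister"] = class_list
    if registrator:
        settings["spark.kryo.registrator"] = registrator
    if registration_required:
        settings["spark.kryo.registrationRequired"] = "true"
    return settings


settings = kryo_settings(
    ["com.example.MyClass1", "com.example.MyClass2"],  # placeholder class names
    registration_required=True,
)
for key, value in settings.items():
    print(f"{key}={value}")
```

The resulting pairs can be applied to a PySpark configuration with `SparkConf().setAll(settings.items())`.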
5. Kryo Buffer Configuration
Kryo uses buffers for serialization/deserialization.
| Setting | Default | Description |
|---|---|---|
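| spark.kryoserializer.buffer | 64k | Initial buffer size |
| spark.kryoserializer.buffer.max | 64m | Maximum buffer size; must be less than 2048m |
| spark.rdd.compress | false | Compress serialized RDD partitions (Kryo has no compression switch of its own) |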
When to Adjust:
- Large objects: Increase max buffer
- Many small objects: Decrease initial buffer
- Limited memory: Enable compression
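As a rough aid for the "large objects" case, the sketch below derives a `spark.kryoserializer.buffer.max` value from the largest object you expect to serialize. The function name, the 2x headroom, and the rounding policy are assumptions; the 64m default and the requirement that the value stay below 2048m come from the Spark configuration reference.

```python
import math

MAX_ALLOWED_MIB = 2047  # spark.kryoserializer.buffer.max must be below 2048m


def suggest_buffer_max(largest_object_bytes: int, headroom: float = 2.0,
                       default_mib: int = 64) -> str:
    """Return a buffer.max value (e.g. '200m') that fits the largest object."""
    needed_mib = math.ceil(largest_object_bytes * headroom / (1024 * 1024))
    chosen = max(default_mib, needed_mib)  # never go below Spark's default
    if chosen > MAX_ALLOWED_MIB:
        raise ValueError("object too large for a single Kryo buffer; split it up")
    return f"{chosen}m"


print(suggest_buffer_max(10 * 1024 * 1024))   # 64m
print(suggest_buffer_max(100 * 1024 * 1024))  # 200m
```

The returned string can be passed directly to `conf.set("spark.kryoserializer.buffer.max", ...)`.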
6. Serialization in Shuffle
During shuffle, Spark serializes partition data.
Write Phase:
- Partition data by key
- Serialize each partition using configured serializer
- Write serialized bytes to shuffle files
Read Phase:
- Fetch shuffle files from mappers
- Deserialize bytes using configured serializer
- Merge and process data
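The write and read phases can be mimicked in a few lines of plain Python. This is a toy model, not Spark's shuffle implementation: `pickle` stands in for the configured serializer, dictionaries stand in for shuffle files, and the function names are hypothetical.

```python
import pickle
from collections import defaultdict


def shuffle_write(records, num_partitions):
    """Map side: bucket records by key hash, then serialize each bucket to bytes."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[hash(key) % num_partitions].append((key, value))
    return {pid: pickle.dumps(rows) for pid, rows in buckets.items()}


def shuffle_read(blocks):
    """Reduce side: deserialize fetched blocks and merge values per key."""
    totals = defaultdict(float)
    for block in blocks:
        for key, value in pickle.loads(block):
            totals[key] += value
    return dict(totals)


# Two "map tasks" each produce serialized blocks for two reduce partitions.
map_outputs = [
    shuffle_write([(i % 4, 1.0) for i in range(100)], num_partitions=2),
    shuffle_write([(i % 4, 2.0) for i in range(100)], num_partitions=2),
]
# Reducer 0 fetches its block from every mapper, then deserializes and merges.
reducer0 = shuffle_read([out[0] for out in map_outputs if 0 in out])
print(reducer0)  # {0: 75.0, 2: 75.0}
```

Every record crosses `pickle.dumps` once and `pickle.loads` once, which is why serializer speed and output size sit directly on the shuffle's critical path.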
Performance Impact:
- Serialization is on the critical path
- Faster serialization = faster shuffles
- Smaller output = less network I/O
7. Serialization in Caching
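When caching RDDs, the storage level determines whether partitions are held as deserialized JVM objects or as serialized bytes. The serialized levels below belong to the Scala/Java API; in PySpark, stored objects are always pickled, so choosing a serialized level makes no difference. Cached DataFrames use Spark's own in-memory columnar format rather than the configured serializer.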
| Storage Level | Format | Speed | Memory |
|---|---|---|---|
| MEMORY_ONLY | Deserialized | Fastest | Highest |
| MEMORY_ONLY_SER | Serialized (Kryo or Java) | Slower | Lower |
| MEMORY_AND_DISK | Memory: Deserialized; Disk: Serialized | Mixed | Mixed |
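The speed versus memory trade-off in the table can be observed outside Spark. The sketch below compares the approximate footprint of live Python objects with the size of the same data as serialized bytes. `pickle` is only a stand-in for Kryo or Java serialization, `deep_size` is a hypothetical helper, and the absolute numbers will differ on the JVM.

```python
import pickle
import sys

records = [(i, f"item_{i}", i * 1.0) for i in range(10_000)]


def deep_size(rows):
    """Approximate in-memory footprint: the list, each tuple, and each field."""
    total = sys.getsizeof(rows)
    for row in rows:
        total += sys.getsizeof(row) + sum(sys.getsizeof(field) for field in row)
    return total


deserialized_bytes = deep_size(records)
serialized = pickle.dumps(records, protocol=pickle.HIGHEST_PROTOCOL)
print(f"as objects: {deserialized_bytes} bytes, serialized: {len(serialized)} bytes")

# Reading serialized data means paying the deserialization cost on access.
restored = pickle.loads(serialized)
assert restored == records
```

The serialized form is smaller but must be decoded before use, which mirrors the "Slower / Lower" row for serialized storage levels.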
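8. Custom Serializers
For complex classes, custom serializers can optimize serialization. Kryo serializers run inside the JVM, so they are written in Scala or Java and shipped to the cluster as a JAR; PySpark only references them by name.
Implementing Custom Serializer (Scala sketch against the Kryo 4.x Serializer signature):
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// MyClass is a placeholder for your own JVM class
class MyClass(val id: Int, val name: String)

class MyClassSerializer extends Serializer[MyClass] {
  // Write the fields in a fixed order
  override def write(kryo: Kryo, output: Output, obj: MyClass): Unit = {
    output.writeInt(obj.id)
    output.writeString(obj.name)
  }
  // Read the fields back in the same order
  override def read(kryo: Kryo, input: Input, cls: Class[MyClass]): MyClass =
    new MyClass(input.readInt(), input.readString())
}
Registering Custom Serializer (Scala, inside a KryoRegistrator):
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[MyClass], new MyClassSerializer)
}
Then reference the registrator from PySpark (the JAR must be on the classpath):
conf.set("spark.kryo.registrator", "com.example.MyRegistrator")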
9. Common Serialization Issues
| Issue | Cause | Solution |
|---|---|---|
| IllegalArgumentException: Class is not registered | Class not registered with Kryo while spark.kryo.registrationRequired is true | Register the class |
| NotSerializableException | Class not serializable | Implement Serializable or use Kryo |
| Buffer overflow | Object too large for buffer | Increase spark.kryoserializer.buffer.max |
Issue 1: Class is not registered
# Solution: Register the class by its fully qualified JVM name (placeholder name)
conf.set("spark.kryo.classesToRegister", "com.example.MyClass")
Issue 2: NotSerializableException
// Solution (Scala): implement Serializable on the JVM class, or use the Kryo serializer
class MyClass extends Serializable
Issue 3: Buffer overflow
# Solution: Increase buffer size
conf.set("spark.kryoserializer.buffer.max", "128m")
10. Performance Tuning
Tuning Checklist:
- Enable Kryo serialization
- Register all custom classes
- Enable registration required for debugging
- Tune buffer sizes based on object sizes
- Consider compression for large datasets
- Monitor serialization metrics in Spark UI
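The first few checklist items can be checked mechanically against a settings dictionary. The function below is a minimal sketch; `check_kryo_settings` and its warning texts are hypothetical, while the configuration keys are the ones listed in the Spark configuration reference.

```python
KRYO = "org.apache.spark.serializer.KryoSerializer"


def check_kryo_settings(settings: dict) -> list:
    """Return human-readable warnings for checklist items that are not met."""
    warnings = []
    if settings.get("spark.serializer") != KRYO:
        warnings.append("Kryo is not enabled (spark.serializer).")
    if not (settings.get("spark.kryo.classesToRegister")
            or settings.get("spark.kryo.registrator")):
        warnings.append("No classes are registered with Kryo.")
    if settings.get("spark.kryo.registrationRequired", "false").lower() != "true":
        warnings.append("spark.kryo.registrationRequired is off.")
    if "spark.kryoserializer.buffer.max" not in settings:
        warnings.append("spark.kryoserializer.buffer.max is unset (default 64m).")
    return warnings


example = {
    "spark.serializer": KRYO,
    "spark.kryo.classesToRegister": "com.example.MyClass1",  # placeholder name
}
for warning in check_kryo_settings(example):
    print("WARN:", warning)
```

In a PySpark session the input dictionary can be obtained with `dict(conf.getAll())`.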
Best Practice: Always set `spark.serializer` to `KryoSerializer` for production workloads; it is often up to 10x faster than Java serialization and produces noticeably smaller output.
Key Concepts Table
| Aspect | Java Serialization | Kryo Serialization | Improvement |
|---|---|---|---|
| Output Size | 1024 bytes | 128 bytes | 8x smaller |
| Speed | 100 MB/s | 800 MB/s | 8x faster |
| Memory | 3x object | 1.5x object | 2x less |
| Compatibility | Universal | Most types; registration recommended | - |
| Compression | Configured separately (spark.io.compression.codec) | Configured separately (spark.io.compression.codec) | - |
| Registration | Not needed | Recommended | - |
| Configuration | Default | spark.serializer | - |
Code Examples
Example 1: Basic Kryo Configuration
from pyspark import SparkConf, SparkContext

# Configure Kryo serialization
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("KryoExample")

# Enable Kryo serializer
# Parameter: spark.serializer - serializer class name
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Register classes
# Parameter: spark.kryo.classesToRegister - comma-separated JVM class names
# (PySpark's SparkConf has no registerKryoClasses method)
conf.set(
    "spark.kryo.classesToRegister",
    "org.apache.spark.sql.types.StructType,org.apache.spark.sql.types.StructField",
)

# Create SparkContext
sc = SparkContext(conf=conf)

# Test serialization
data = [(i, f"item_{i}", i * 1.0) for i in range(100000)]
rdd = sc.parallelize(data, 10)

# Python records are pickled; Kryo serializes the JVM-side data during the shuffle
result = rdd.map(lambda x: (x[0] % 10, x[2])).reduceByKey(lambda a, b: a + b)
print(result.collect())
sc.stop()
Example 2: Kryo with Custom Classes
from pyspark import SparkConf, SparkContext
from dataclasses import dataclass
from typing import List

# Custom class
# In a real job, define it in an importable module shipped to executors
# (for example via --py-files) so that workers can unpickle it.
@dataclass
class Person:
    id: int
    name: str
    age: int
    scores: List[float]

# Configure Kryo
conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Python classes such as Person are serialized with pickle and cannot be
# registered with Kryo. Only JVM classes are registered, by name, e.g.
# conf.set("spark.kryo.classesToRegister", "com.example.Person")
# Parameter: spark.kryo.registrationRequired - fail on unregistered JVM classes
# (enable it only once every JVM class used by the job is registered)
# conf.set("spark.kryo.registrationRequired", "true")

# Create SparkContext
sc = SparkContext(conf=conf)

# Create RDD of custom objects
persons = [
    Person(i, f"person_{i}", 20 + i % 50, [float(i) * 0.1])
    for i in range(100000)
]
rdd = sc.parallelize(persons, 10)

# Process; Kryo applies to the JVM side of the shuffle
result = (
    rdd.filter(lambda p: p.age > 30)
    .map(lambda p: (p.age, 1))
    .reduceByKey(lambda a, b: a + b)
)
print(result.collect())
sc.stop()
Example 3: Serialization Benchmark
import time
from pyspark import SparkConf, SparkContext

def benchmark_serializer(serializer_name, num_records=1000000):
    conf = SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName(f"Benchmark-{serializer_name}")
    conf.set("spark.serializer", serializer_name)
    sc = SparkContext(conf=conf)

    # Create test data
    data = [(i, f"item_{i % 1000}", float(i) * 0.1) for i in range(num_records)]
    rdd = sc.parallelize(data, 20)

    # Benchmark serialization (shuffle operation)
    start = time.time()
    result = rdd.map(lambda x: (x[0] % 100, x[2])).reduceByKey(lambda a, b: a + b)
    result.count()  # Trigger execution
    elapsed = time.time() - start

    sc.stop()
    return elapsed

# Benchmark both serializers
java_time = benchmark_serializer("org.apache.spark.serializer.JavaSerializer")
kryo_time = benchmark_serializer("org.apache.spark.serializer.KryoSerializer")
print(f"Java Serialization: {java_time:.2f}s")
print(f"Kryo Serialization: {kryo_time:.2f}s")
print(f"Speedup: {java_time / kryo_time:.1f}x")
Example 4: Kryo with Compression
from pyspark import SparkConf, SparkContext

# Configure Kryo with compression
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("KryoCompression")

# Enable Kryo
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Enable compression
# Kryo has no compression switch of its own; Spark compresses serialized data separately
# Parameter: spark.shuffle.compress - compress map output files (default: true)
conf.set("spark.shuffle.compress", "true")
# Parameter: spark.rdd.compress - compress serialized RDD partitions (default: false)
conf.set("spark.rdd.compress", "true")
# Parameter: spark.io.compression.codec - compression codec (lz4, lzf, snappy, zstd)
conf.set("spark.io.compression.codec", "lz4")

# Buffer configuration
# Parameter: spark.kryoserializer.buffer - initial buffer size
conf.set("spark.kryoserializer.buffer", "64k")
# Parameter: spark.kryoserializer.buffer.max - maximum buffer size
conf.set("spark.kryoserializer.buffer.max", "64m")

sc = SparkContext(conf=conf)

# Test with large data
data = [(i, "x" * 1000) for i in range(100000)]  # Large strings
rdd = sc.parallelize(data, 10)

# Shuffle with compression
result = rdd.map(lambda x: (x[0] % 10, x[1])).groupByKey()
result.count()
sc.stop()
Performance Metrics
| Metric | Java | Kryo | Kryo+Compression | Improvement (Java vs Kryo+Compression) |
|---|---|---|---|---|
| Shuffle Size (1GB) | 1000MB | 150MB | 80MB | 12.5x |
| Shuffle Time | 45s | 12s | 15s | 3x |
| Cache Size (1GB) | 3000MB | 500MB | 250MB | 12x |
| GC Time | 500ms | 150ms | 100ms | 5x |
| Serialization Speed | 100MB/s | 800MB/s | 600MB/s | 6x |
| Deserialization Speed | 80MB/s | 700MB/s | 500MB/s | 6x |
| Memory Overhead | 3x | 1.5x | 1.2x | 2.5x |
| CPU Usage | High | Medium | Low | 2x |
Best Practices
1. Enable Kryo for Performance
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
2. Register Custom Classes
# Fully qualified JVM class names (placeholders)
conf.set("spark.kryo.classesToRegister", "com.example.MyClass1,com.example.MyClass2,com.example.MyClass3")
3. Enable Registration Required for Debugging
conf.set("spark.kryo.registrationRequired", "true")
# Catches unregistered classes early
4. Tune Buffer Sizes
# For large objects
conf.set("spark.kryoserializer.buffer.max", "128m")
# For many small objects
conf.set("spark.kryoserializer.buffer", "32k")
5. Use Compression for Large Datasets
conf.set("spark.rdd.compress", "true")
conf.set("spark.io.compression.codec", "lz4")
6. Monitor Serialization Metrics
# Check Spark UI for:
# - Shuffle Write Size
# - Shuffle Read Time
# - Serialization Time
# - GC Time
7. Test Serialization
# Verify that a Python object can be pickled, which is how PySpark ships it to executors
import pickle

obj = {"id": 1, "name": "example"}  # replace with the object you want to test
try:
    pickle.loads(pickle.dumps(obj))
    print("Object is serializable")
except Exception as e:
    print(f"Serialization error: {e}")
See Also
- 09-udf-optimization.mdx: Serialization overhead in UDF data transfer
- 08-caching-persistence.mdx: Serialization levels for cached data
- 18-gc-tuning.mdx: GC impact from serialized objects
- Kafka Streams (kafka/03): Serialization in Kafka message processing
- Data Engineering Streaming (data-engineering/022): Serialization optimization in streaming pipelines