Apache Spark: Unified Analytics Engine for Big Data
Apache Spark is a unified analytics engine for large-scale data processing, providing APIs in Python (PySpark), Scala, Java, and R.
Why Spark Dominates Big Data Processing
Key Innovation:
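- Resilient Distributed Dataset (RDD): immutable, partitioned, parallel collection
- DataFrame/Dataset API with Catalyst query optimizer
- SQL-like optimizations for DataFrame and SQL queries in every supported language (the bodies of Python UDFs stay opaque to the optimizer)
Spark vs MapReduce Advantages:
- In-memory computation: keeps intermediate results in memory instead of writing them to HDFS between jobs (shuffle files still go to local disk)
- DAG execution engine: plans the whole job as a graph of stages and pipelines narrow transformations within each stage
- Catalyst optimizer: rewrites queries for performance
- Unified API: batch, streaming, SQL, ML, and graph processing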
Performance Comparison:
| Feature | Apache Spark | MapReduce |
|---|---|---|
| Processing Model | In-memory | Disk-based (HDFS) |
| Speed | Up to 10-100x faster (workload-dependent) | Baseline |
| Ease of Use | High (Python, Scala, SQL) | Low (primarily Java) |
| Iterative Processing | Excellent (cache) | Poor (re-reads from disk every iteration) |
| Real-Time | Yes (Structured Streaming) | No |
| SQL Support | Spark SQL | Hive (separate project) |
| ML Library | MLlib | Mahout |
| Fault Tolerance | Lineage-based recomputation | Task re-execution plus HDFS replication |
Key Insight: For iterative and interactive workloads, Spark can be 10-100x faster than MapReduce because it keeps intermediate data in memory and optimizes shuffles; for single-pass batch jobs the gap is usually smaller.
Spark Application Architecture
An RDD is an immutable, partitioned collection of elements that can be operated on in parallel. RDDs are the fundamental abstraction in Spark, providing: (1) Partitioning: data is split across cluster nodes; (2) Transformations: lazy operations that build a lineage graph (DAG); (3) Actions: operations that trigger computation and return results to the driver or write them to storage. RDDs are fault-tolerant via lineage: if a partition is lost, Spark recomputes it from the parent RDD using the recorded transformations.
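To make the transformation/action split and lineage-based recovery concrete, here is a toy, single-process model. `ToyRDD`, `compute_partition`, and the flat list used as a lineage are hypothetical illustrations, not Spark's API; real RDDs are distributed and track dependencies between parent and child RDDs rather than a single list of steps.

```python
from typing import Callable, List, Optional, Tuple

Step = Tuple[str, Callable]


class ToyRDD:
    """Toy model of an RDD (not Spark's API): source partitions plus an
    ordered lineage of recorded transformations."""

    def __init__(self, partitions: List[list], lineage: Optional[List[Step]] = None):
        self.partitions = partitions   # source data, one list per partition
        self.lineage = lineage or []   # transformations recorded so far

    # Transformations are lazy: they only return a new ToyRDD with a longer lineage.
    def map(self, f: Callable) -> "ToyRDD":
        return ToyRDD(self.partitions, self.lineage + [("map", f)])

    def filter(self, pred: Callable) -> "ToyRDD":
        return ToyRDD(self.partitions, self.lineage + [("filter", pred)])

    def compute_partition(self, i: int) -> list:
        """(Re)compute partition i by replaying the lineage over its source data.
        The same replay is how a lost partition would be rebuilt."""
        rows = list(self.partitions[i])
        for kind, f in self.lineage:
            if kind == "map":
                rows = [f(x) for x in rows]
            else:
                rows = [x for x in rows if f(x)]
        return rows

    # Actions trigger computation of every partition.
    def collect(self) -> list:
        return [x for i in range(len(self.partitions)) for x in self.compute_partition(i)]


if __name__ == "__main__":
    rdd = ToyRDD([[1, 2, 3], [4, 5, 6]])
    evens = rdd.map(lambda x: x * x).filter(lambda x: x % 2 == 0)  # nothing computed yet
    print(evens.collect())             # [4, 16, 36]
    print(evens.compute_partition(1))  # [16, 36]  (rebuilds only partition 1)
```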
A DataFrame is a distributed collection of rows organized into named columns, conceptually equivalent to a table in a relational database or a DataFrame in pandas. DataFrames are built on top of RDDs but include schema information and are processed by the Catalyst optimizer. The DataFrame API provides lazy evaluation, predicate pushdown, and columnar storage optimizations. DataFrames are the recommended abstraction for most Spark workloads.
Catalyst is Spark's extensible query optimizer that converts logical query plans into optimized physical execution plans. Catalyst operates in four phases: (1) Analysis: resolve references using the catalog; (2) Logical Optimization: apply rule-based optimizations (predicate pushdown, constant folding, column pruning); (3) Physical Planning: generate one or more physical plans and select the best one using a cost model; (4) Code Generation: use Tungsten's whole-stage code generation to produce optimized JVM bytecode.
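Catalyst's logical optimization is a set of rules that pattern-match and rewrite trees, run in batches until the tree stops changing. The sketch below imitates that style for constant folding on a toy expression tree; the classes and functions (`Lit`, `Col`, `transform_up`, `optimize`) are hypothetical Python stand-ins for Catalyst's Scala `TreeNode` rules, not its implementation.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass(frozen=True)
class Lit:
    value: int


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


@dataclass(frozen=True)
class Mul:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Add, Mul]
Rule = Callable[[Expr], Expr]


def transform_up(e: Expr, rule: Rule) -> Expr:
    """Apply `rule` to the children first, then to the node itself (bottom-up)."""
    if isinstance(e, (Add, Mul)):
        e = type(e)(transform_up(e.left, rule), transform_up(e.right, rule))
    return rule(e)


def constant_folding(e: Expr) -> Expr:
    """Replace an operator whose operands are both literals with one literal."""
    if isinstance(e, Add) and isinstance(e.left, Lit) and isinstance(e.right, Lit):
        return Lit(e.left.value + e.right.value)
    if isinstance(e, Mul) and isinstance(e.left, Lit) and isinstance(e.right, Lit):
        return Lit(e.left.value * e.right.value)
    return e


def simplify_multiply_by_one(e: Expr) -> Expr:
    """Rewrite x * 1 as x."""
    if isinstance(e, Mul) and e.right == Lit(1):
        return e.left
    return e


def optimize(e: Expr, rules: List[Rule]) -> Expr:
    """Run every rule over the tree repeatedly until it reaches a fixed point."""
    while True:
        new = e
        for rule in rules:
            new = transform_up(new, rule)
        if new == e:
            return new
        e = new


if __name__ == "__main__":
    expr = Mul(Col("amount"), Add(Lit(2), Lit(-1)))  # amount * (2 + -1)
    print(optimize(expr, [constant_folding, simplify_multiply_by_one]))  # Col(name='amount')
```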
A shuffle is the process of redistributing data across partitions, typically triggered by operations like groupBy(), join(), repartition(), and distinct(). Shuffles are usually the most expensive operation in Spark because they require data exchange across the network, disk I/O for shuffle files, and serialization/deserialization. Shuffle performance is critical: in poorly designed pipelines, shuffles can dominate total execution time.
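The core of a shuffle is hash partitioning: every map task writes each record to a bucket chosen by hashing its key, and every reduce task then fetches its bucket from all map tasks. The single-process sketch below assumes CRC32 as the hash function purely for determinism (Spark SQL uses Murmur3); `partition_for`, `shuffle`, and `group_sum` are hypothetical helpers.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int]


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic hash partitioner (CRC32 here; Spark SQL uses Murmur3)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(map_outputs: List[List[Record]], num_partitions: int) -> List[List[Record]]:
    """Simulate the exchange between map tasks and reduce partitions."""
    # Map side: each map task splits its output into one bucket per reduce
    # partition (in Spark these buckets end up in shuffle files on local disk).
    buckets: List[Dict[int, List[Record]]] = []
    for records in map_outputs:
        local: Dict[int, List[Record]] = defaultdict(list)
        for key, value in records:
            local[partition_for(key, num_partitions)].append((key, value))
        buckets.append(local)
    # Reduce side: partition r fetches bucket r from every map task (network I/O).
    return [[rec for local in buckets for rec in local.get(r, [])]
            for r in range(num_partitions)]


def group_sum(partition: List[Record]) -> Dict[str, int]:
    """After the shuffle all rows for a key are local, so no further exchange is needed."""
    totals: Dict[str, int] = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)


if __name__ == "__main__":
    map_outputs = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)], [("b", 5)]]
    for r, part in enumerate(shuffle(map_outputs, num_partitions=4)):
        print(r, group_sum(part))
```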
Spark Partition Count
A common sizing rule for the number of partitions in a Spark job is: P = max(ceil(E / S), C), where E is the total data size, S is the target partition size (typically 128MB-256MB), and C is the total number of cores across all executors. For example, with 100GB of data, a 256MB target partition size, and fewer than 400 cores: P = 102,400MB / 256MB = 400 partitions.
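The partition-count rule translates directly into a helper. `optimal_partitions` is a hypothetical name, and rounding E / S up is an assumption so that partitions stay at or below the target size on average.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def optimal_partitions(data_bytes: int, target_partition_bytes: int, total_cores: int) -> int:
    """P = max(ceil(E / S), C).

    ceil(E / S) keeps the average partition at or below the target size; taking
    the max with C guarantees at least one task per core, so no core sits idle.
    """
    by_size = math.ceil(data_bytes / target_partition_bytes)
    return max(by_size, total_cores)


if __name__ == "__main__":
    print(optimal_partitions(100 * GB, 256 * MB, total_cores=64))  # 400
    print(optimal_partitions(10 * GB, 256 * MB, total_cores=64))   # 64 (cores dominate)
```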
Spark Shuffle Cost
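Shuffle cost ≈ Data_shuffled * (Serialization_cost + Network_cost + Deserialization_cost + Disk_cost). For a shuffle (sort-merge) join between two DataFrames of sizes A and B, neither of which is already partitioned by the join key, both sides are repartitioned: Shuffle_volume ≈ A + B. A broadcast join instead ships the smaller side to each of N executors (network volume ≈ min(A, B) * N) and leaves the larger side in place. Because every term scales with Data_shuffled, filtering early to reduce shuffle volume is often the single most impactful optimization.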
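Under a simplified model (assumptions: neither side is already partitioned by the join key, sizes are post-filter bytes, and the small side fits in executor memory), a shuffle join moves about A + B bytes while a broadcast join moves about min(A, B) × N, where N is the number of executors. The hypothetical helpers below compare the two; Spark's planner itself decides from size estimates and spark.sql.autoBroadcastJoinThreshold, not from this calculation.

```python
from dataclasses import dataclass

MB = 1024 ** 2
GB = 1024 ** 3


@dataclass
class JoinCost:
    strategy: str
    network_bytes: int


def shuffle_join_cost(a_bytes: int, b_bytes: int) -> JoinCost:
    """Shuffle (sort-merge) join: both inputs are repartitioned by the join key."""
    return JoinCost("shuffle", a_bytes + b_bytes)


def broadcast_join_cost(a_bytes: int, b_bytes: int, num_executors: int) -> JoinCost:
    """Broadcast join: the smaller input is copied to every executor; the larger stays put."""
    return JoinCost("broadcast", min(a_bytes, b_bytes) * num_executors)


def cheaper_join(a_bytes: int, b_bytes: int, num_executors: int) -> JoinCost:
    """Pick the strategy that moves fewer bytes under this simplified model."""
    candidates = [shuffle_join_cost(a_bytes, b_bytes),
                  broadcast_join_cost(a_bytes, b_bytes, num_executors)]
    return min(candidates, key=lambda c: c.network_bytes)


if __name__ == "__main__":
    print(cheaper_join(100 * GB, 50 * MB, num_executors=20).strategy)  # broadcast
    print(cheaper_join(100 * GB, 20 * GB, num_executors=20).strategy)  # shuffle
    # Filtering the large side down to 10% before the join shrinks the shuffle
    print(shuffle_join_cost(10 * GB, 20 * GB).network_bytes / GB)      # 30.0
```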
Spark recovers lost RDD partitions by replaying the lineage (DAG of transformations) from a checkpoint or the source data. In a simple model where each of the L transformations in a lineage costs T_per_transform, recovery time is T_recovery ≈ L * T_per_transform. To bound recovery time, checkpoint long lineages to reliable storage (HDFS, S3) every N transformations (a common rule of thumb is N = 10-20). The checkpoint truncates the lineage, so only the transformations after the last checkpoint need to be replayed.
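The recovery-time model and the effect of periodic checkpointing can be checked with a small calculation. Both helpers are hypothetical and assume a uniform cost per transformation and a free checkpoint read; in PySpark the truncation itself would come from calling checkpoint() (after sc.setCheckpointDir) every N iterations.

```python
from typing import List


def recovery_time(lineage_length: int, t_per_transform: float, checkpoint_every: int = 0) -> float:
    """T_recovery = (steps to replay) * T_per_transform for a partition lost at the end.

    Without checkpoints every step is replayed from the source; with a checkpoint
    every N steps only the steps after the last checkpoint are replayed.
    """
    if checkpoint_every > 0:
        steps = lineage_length % checkpoint_every
    else:
        steps = lineage_length
    return steps * t_per_transform


def lineage_depths(iterations: int, checkpoint_every: int = 0) -> List[int]:
    """Lineage depth after each iteration of a loop that appends one transformation
    per iteration and checkpoints every `checkpoint_every` iterations."""
    depths, depth = [], 0
    for i in range(1, iterations + 1):
        depth += 1                                       # one more transformation
        if checkpoint_every > 0 and i % checkpoint_every == 0:
            depth = 0                                    # checkpoint truncates lineage
        depths.append(depth)
    return depths


if __name__ == "__main__":
    print(recovery_time(100, 2.0))                          # 200.0
    print(recovery_time(100, 2.0, checkpoint_every=15))     # 20.0
    print(max(lineage_depths(1000, checkpoint_every=15)))   # 14
```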
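Catalyst's predicate pushdown optimization moves filter conditions as close to the data source as possible, reducing the volume of data processed in subsequent stages. Formally: for an inner join Q = σ_pred(R ⋈ T) where pred references only columns of R, Catalyst rewrites Q as σ_pred(R) ⋈ T, and for file sources it also passes pred to the scan so the reader can skip data that cannot match. For columnar sources (Parquet, ORC), the reader uses min/max statistics per Parquet row group or ORC stripe to skip whole blocks; avoiding unneeded columns is the separate column-pruning rule. If the predicate keeps a fraction s of the rows (its selectivity), downstream data volume shrinks by a factor of 1 / s.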
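For Parquet and ORC, pushdown pays off because each row group or stripe carries min/max statistics, so a reader can skip blocks that provably contain no matching rows. The toy model below (hypothetical `RowGroup`, `write_row_groups`, and `scan_greater_than`, integer values only) shows the skipping logic; real readers also handle nulls, dictionaries, and many more predicate types.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class RowGroup:
    """A chunk of rows plus the min/max statistics written alongside it."""
    values: List[int]
    min_value: int
    max_value: int


def write_row_groups(values: List[int], rows_per_group: int) -> List[RowGroup]:
    """Split values into row groups and record per-group statistics at write time."""
    groups = []
    for start in range(0, len(values), rows_per_group):
        chunk = values[start:start + rows_per_group]
        groups.append(RowGroup(chunk, min(chunk), max(chunk)))
    return groups


def scan_greater_than(groups: List[RowGroup], threshold: int) -> Tuple[List[int], int]:
    """Evaluate `value > threshold` with the predicate pushed into the scan.

    Row groups whose max is <= threshold cannot contain a match, so their
    values are never read. Returns (matching values, number of groups read).
    """
    matches: List[int] = []
    groups_read = 0
    for group in groups:
        if group.max_value <= threshold:
            continue  # statistics prove no row qualifies: skip the I/O entirely
        groups_read += 1
        matches.extend(v for v in group.values if v > threshold)  # residual filter
    return matches, groups_read


if __name__ == "__main__":
    groups = write_row_groups(list(range(1000)), rows_per_group=100)  # 10 sorted groups
    matches, read = scan_greater_than(groups, threshold=849)
    print(len(matches), read)  # 150 2
```

Only 2 of the 10 row groups are read here. Skipping works best when the data is sorted or clustered on the filter column; randomly ordered values give every group a wide min/max range, so nothing can be skipped.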
Key Concepts
| Concept | Description | API |
|---|---|---|
| RDD | Immutable distributed collection | sc.parallelize(data), rdd.map(f) |
| DataFrame | Distributed table with schema | spark.read.parquet(path) |
| Dataset | Type-safe DataFrame (Scala/Java only) | ds.map(f) |
| Transformation | Lazy operation building DAG | .map(), .filter(), .join(), .groupBy() |
| Action | Triggers computation | .count(), .collect(), .write.save(), .show() |
| Partition | Unit of parallelism | df.repartition(n), df.coalesce(n) |
| Shuffle | Data redistribution across partitions | Triggered by groupBy, join, repartition |
| Broadcast | Send small DataFrame to all executors | F.broadcast(small_df) |
| Accumulator | Shared variable that tasks can only add to; read on the driver | sc.accumulator(0) |
| Broadcast Variable | Read-only shared variable | sc.broadcast(variable) |
| Cache/Persist | Store DataFrame in memory/disk | .cache(), .persist(StorageLevel.MEMORY_AND_DISK) |
| Checkpoint | Save the data to reliable storage and truncate its lineage | sc.setCheckpointDir(dir), .checkpoint() |
| SparkSession | Entry point for Spark operations | SparkSession.builder.appName("app").getOrCreate() |
| Catalog | Metadata store for tables, databases | spark.catalog.listDatabases() |
| UDF | User-defined function | @udf(returnType=StringType()) |
| Pandas UDF | Vectorized UDF using Pandas | @pandas_udf(IntegerType()) |
| Schema | StructType defining column types | StructType([StructField("col", StringType())]) |
- Cache frequently accessed DataFrames: Use .cache() or .persist() for DataFrames reused across multiple actions. Monitor storage via the Spark UI.
- Broadcast small DataFrames for joins: Use F.broadcast(small_df) to avoid shuffling the large side of a join. Threshold: spark.sql.autoBroadcastJoinThreshold (default 10MB).
- Filter early: Push filters before joins and aggregations to reduce data volume before expensive operations.
- Partition appropriately: Repartition by join/group keys to ensure co-partitioning. Use repartition(N, "key") for shuffle-based repartitioning.
- Avoid UDFs when possible: Use built-in Spark SQL functions (hundreds are available) instead of Python UDFs, which add serialization overhead and are opaque to Catalyst.
- Use Pandas UDFs when custom logic is required: @pandas_udf enables vectorized execution with Arrow, often 3-100x faster than row-at-a-time UDFs depending on the workload.
- Tune shuffle partitions: Set spark.sql.shuffle.partitions (default 200) based on data volume. Target 128MB-256MB per partition after shuffle.
- Monitor the Spark UI: Identify stages with high shuffle read/write, data skew, and task stragglers.
- Use AQE (Adaptive Query Execution): Enable spark.sql.adaptive.enabled=true (the default since Spark 3.2) for runtime optimization of shuffle partitions and join strategies.
- Checkpoint long lineages: For iterative algorithms (ML), checkpoint RDDs to break lineage and prevent stack overflow.
Production Code
Optimized DataFrame ETL Pipeline
```python
import logging

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
from pyspark.sql.window import Window

logger = logging.getLogger(__name__)


def create_spark_session(app_name: str = "ETL-Pipeline") -> SparkSession:
    """Create a production-optimized SparkSession."""
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.adaptive.enabled", "true")  # default since Spark 3.2
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .config("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))  # 10MB
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.parquet.compression.codec", "snappy")
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .config("spark.dynamicAllocation.enabled", "true")
        # Dynamic allocation needs shuffle tracking or an external shuffle service
        .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", "2")
        .config("spark.dynamicAllocation.maxExecutors", "50")
        # Arrow-based toPandas() / createDataFrame(pandas_df) conversions
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .getOrCreate()
    )


def run_optimized_etl(spark: SparkSession, input_path: str, output_path: str) -> None:
    """Run an optimized ETL pipeline with best practices."""
    # Read source data with an explicit schema (no inference, enforced types)
    schema = StructType([
        StructField("transaction_id", StringType(), False),
        StructField("customer_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("amount", DoubleType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("event_date", StringType(), False),
    ])

    # Filter early, before caching and before any shuffle, so Catalyst can push
    # the predicates into the Parquet scan and less data is cached and moved
    filtered_df = (
        spark.read
        .schema(schema)
        .parquet(input_path)
        .filter(F.col("amount").isNotNull())
        .filter(F.col("amount") > 0)
        .filter(F.col("event_date") >= "2024-01-01")  # ISO dates compare correctly as strings
        .repartition("customer_id")  # partition by the key of the first window below
    )

    # Cache because the DataFrame feeds two actions: count() and the final write
    filtered_df.cache()
    logger.info("Filtered record count: %d", filtered_df.count())

    # Broadcast join with a small dimension table (avoids shuffling the large side)
    customer_df = (
        spark.read
        .parquet("s3://data-lake/dimensions/customers")
        .select("customer_id", "name", "tier", "segment")
    )
    enriched_df = filtered_df.join(F.broadcast(customer_df), on="customer_id", how="left")

    # The broadcast join keeps the customer_id partitioning, so the first window
    # should not need another shuffle; partitioning by segment does shuffle
    customer_window = Window.partitionBy("customer_id").orderBy("event_date")
    segment_window = Window.partitionBy("segment").orderBy(F.desc("amount"))
    result_df = (
        enriched_df
        # Default frame is RANGE ... CURRENT ROW: rows sharing an event_date get the same total
        .withColumn("running_total", F.sum("amount").over(customer_window))
        .withColumn("rank_in_segment", F.row_number().over(segment_window))
    )

    # With dynamic overwrite mode, only the event_date partitions present in
    # result_df are replaced; other existing partitions are kept
    (
        result_df.write
        .mode("overwrite")
        .partitionBy("event_date")
        .parquet(output_path)
    )

    filtered_df.unpersist()
    logger.info("ETL completed. Output: %s", output_path)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    spark = create_spark_session("production-etl")
    try:
        run_optimized_etl(spark, "s3://raw/transactions", "s3://curated/transactions_enriched")
    finally:
        spark.stop()
```
Custom UDF and Pandas UDF Comparison
```python
import time

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("udf-benchmark").getOrCreate()


# Row-at-a-time Python UDF (slow: every row is serialized to a Python worker)
@udf(returnType=StringType())
def categorize_amount_slow(amount: float) -> str:
    """Categorize transaction amount (row-at-a-time UDF)."""
    if amount is None:
        return "unknown"
    elif amount < 10:
        return "micro"
    elif amount < 100:
        return "small"
    elif amount < 1000:
        return "medium"
    else:
        return "large"


# Vectorized Pandas UDF (fast: Arrow batches plus vectorized NumPy logic;
# calling Series.apply here would fall back to a per-row Python loop)
@pandas_udf(StringType())
def categorize_amount_fast(amounts: pd.Series) -> pd.Series:
    """Categorize transaction amount (vectorized Pandas UDF)."""
    # Conditions are checked in order; NaN fails every comparison, so isna() comes first
    conditions = [amounts.isna(), amounts < 10, amounts < 100, amounts < 1000]
    choices = ["unknown", "micro", "small", "medium"]
    return pd.Series(np.select(conditions, choices, default="large"), index=amounts.index)


# Grouped map: a plain pandas function applied to each customer's rows
zscore_schema = "transaction_id string, customer_id string, amount double, z_score double"


def compute_z_score(pdf: pd.DataFrame) -> pd.DataFrame:
    """Compute Z-score within each customer group (NaN for single-row groups)."""
    pdf = pdf[["transaction_id", "customer_id", "amount"]].copy()  # columns must match schema
    pdf["z_score"] = (pdf["amount"] - pdf["amount"].mean()) / pdf["amount"].std()
    return pdf


df = spark.read.parquet("s3://raw/transactions")

# Slow: Row-at-a-time UDF
result_slow = df.withColumn("category", categorize_amount_slow(col("amount")))
# Fast: Vectorized Pandas UDF
result_fast = df.withColumn("category", categorize_amount_fast(col("amount")))
# Grouped transformation (applyInPandas replaces the deprecated GROUPED_MAP pandas_udf)
result_zscore = df.groupBy("customer_id").applyInPandas(compute_z_score, schema=zscore_schema)


def timed_write(result_df, path: str) -> float:
    """Wall-clock seconds for a full write, the action that triggers execution."""
    start = time.perf_counter()
    result_df.write.mode("overwrite").parquet(path)
    return time.perf_counter() - start


# Performance comparison (also inspect the Spark UI). The first job pays
# warm-up costs, so repeat each run for a fair comparison.
slow_time = timed_write(result_slow, "/tmp/output_slow")
fast_time = timed_write(result_fast, "/tmp/output_fast")
print(f"Row UDF time: {slow_time:.2f}s")
print(f"Pandas UDF time: {fast_time:.2f}s")
print(f"Speedup: {slow_time / fast_time:.1f}x")
```
Data Skew: When one partition has significantly more data than others (e.g., a popular product ID), tasks processing that partition become stragglers. Mitigate with: (1) Salting: add a random prefix to skewed keys on the large side so they spread across partitions, and replicate the matching rows on the other side once per salt value so every salted key still finds its match; (2) AQE skew join: automatically detects skewed partitions at runtime and splits them into smaller tasks; (3) Broadcast join: eliminates the shuffle entirely when the other table is small.
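Salting can be simulated without a cluster: hashing an unsalted hot key sends all of its rows to one partition, while a random salt spreads them out, and exploding the other side over every salt value keeps the join complete. The helpers are hypothetical, CRC32 stands in for Spark's hash function, and the `"{salt}_{key}"` format is an arbitrary choice.

```python
import random
import zlib
from collections import Counter
from typing import Iterable, List


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic hash partitioner (CRC32 stands in for Spark's hash function)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def salt_key(key: str, num_salts: int, rng: random.Random) -> str:
    """Large side: prefix a random salt so one hot key becomes up to num_salts keys."""
    return f"{rng.randrange(num_salts)}_{key}"


def explode_salts(key: str, num_salts: int) -> List[str]:
    """Small side: replicate the row once per salt so every salted key still matches."""
    return [f"{salt}_{key}" for salt in range(num_salts)]


def partition_loads(keys: Iterable[str], num_partitions: int) -> Counter:
    """Rows per partition after hash partitioning by key."""
    return Counter(partition_for(k, num_partitions) for k in keys)


if __name__ == "__main__":
    rng = random.Random(42)
    large_keys = ["hot_product"] * 10_000 + [f"product_{i}" for i in range(100)]
    before = partition_loads(large_keys, num_partitions=8)
    after = partition_loads([salt_key(k, 16, rng) for k in large_keys], num_partitions=8)
    print("largest partition before salting:", max(before.values()))
    print("largest partition after salting: ", max(after.values()))
```

The price of salting is the replicated small side (16 copies of each row here), so the salt count should be only as large as the skew requires.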
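Spark Memory Model: Each executor's JVM heap contains (1) Reserved memory (300MB) and, from the remaining usable memory, (2) User memory (40% by default) for user data structures and internal metadata, and (3) a unified pool (60% by default) shared by Execution memory (shuffles, joins, sorts, aggregations) and Storage memory (cached data). Within the unified pool, spark.memory.storageFraction (default 0.5) sets the share of storage that execution cannot evict; beyond that, execution and storage borrow from each other, so with defaults each starts at about 30% of usable memory. Configure with spark.memory.fraction and spark.memory.storageFraction.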
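Applying the default fractions to a concrete heap makes the regions easier to reason about. The helper below is a hypothetical sketch of the unified memory manager's arithmetic (300MB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5); in practice Spark starts from the JVM's reported maximum heap, which is slightly smaller than spark.executor.memory, and the execution/storage boundary moves at runtime.

```python
RESERVED_MB = 300  # fixed reserved system memory


def executor_memory_regions(heap_mb: float, memory_fraction: float = 0.6,
                            storage_fraction: float = 0.5) -> dict:
    """Split an executor heap into Spark's memory regions (in MB).

    storage/execution are the initial halves of the unified pool; at runtime
    each side can borrow from the other.
    """
    if heap_mb <= RESERVED_MB:
        raise ValueError("heap must be larger than the 300MB reserved memory")
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction       # shared by execution and storage
    storage = unified * storage_fraction     # storage share protected from eviction
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,            # user data structures, internal metadata
        "unified": unified,
        "storage": storage,
        "execution": unified - storage,
    }


if __name__ == "__main__":
    regions = executor_memory_regions(4096)  # a 4g executor heap
    print({name: round(mb, 1) for name, mb in regions.items()})
    # {'reserved': 300, 'user': 1518.4, 'unified': 2277.6, 'storage': 1138.8, 'execution': 1138.8}
```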
- RDDs are the low-level abstraction; DataFrames are the high-level, optimized abstraction. Prefer DataFrames for most workloads.
- Catalyst optimizer rewrites queries through analysis, logical optimization, physical planning, and code generation.
- Shuffles are the most expensive operation. Reduce shuffle volume by filtering early, broadcasting small tables, and co-partitioning data.
- Use Pandas UDFs (vectorized) instead of Python UDFs (row-at-a-time); speedups of 3-100x are possible, depending on the workload.
- Enable AQE (spark.sql.adaptive.enabled=true, the default since Spark 3.2) for automatic shuffle partition optimization and skew handling.
- Cache DataFrames reused across multiple actions. Unpersist when no longer needed to free executor memory.
- Monitor the Spark UI for shuffle skew, task stragglers, and GC overhead. Target < 10% GC time.
Best Practices
- Prefer the DataFrame/Dataset API over RDDs: the Catalyst optimizer provides significant performance improvements with minimal developer effort.
- Broadcast small DataFrames (< 10MB) for joins to avoid expensive shuffles on the large side.
- Filter early: push predicates as close to the source as possible to minimize data processed in subsequent stages.
- Use Pandas UDFs instead of Python UDFs when custom logic is required. Vectorized execution with Arrow can provide a 3-100x speedup, depending on the workload.
- Enable AQE (spark.sql.adaptive.enabled=true, the default since Spark 3.2) for runtime optimization of shuffle partitions, join strategies, and skew handling.
- Cache frequently reused DataFrames with .cache() or .persist(). Monitor cache usage in the Spark UI.
- Set spark.sql.shuffle.partitions based on data volume. Target 128MB-256MB per partition after shuffle.
- Checkpoint long lineages (every 10-20 transformations) to prevent stack overflow and reduce recovery time.
- Use dynamic partition overwrite (spark.sql.sources.partitionOverwriteMode=dynamic) to avoid deleting unrelated partitions.
- Monitor executor GC time in the Spark UI. If > 10%, increase executor memory or reduce cache usage.
Spark Configuration Quick Reference
| Configuration | Default | Recommended | Impact |
|---|---|---|---|
| spark.sql.shuffle.partitions | 200 | 50-500 | More partitions = more parallelism, but more per-task overhead |
| spark.sql.autoBroadcastJoinThreshold | 10MB | 10-50MB | Larger = more broadcast joins (and more memory per executor) |
| spark.sql.adaptive.enabled | true (Spark 3.2+); false earlier | true | Runtime optimization |
| spark.executor.memory | 1g | 4-16g | Cache and shuffle memory |
| spark.executor.cores | 1 on YARN; all available worker cores in standalone mode | 4-8 | Concurrent tasks per executor |
| spark.driver.memory | 1g | 2-8g | Driver-side operations |
| spark.sql.execution.arrow.pyspark.enabled | false | true | Arrow-based toPandas() and createDataFrame(pandas_df) |
| spark.dynamicAllocation.enabled | false | true | Auto-scale executors |
See Also
- 022 - Spark Structured Streaming - Streaming extensions of Spark
- 021 - Apache Spark: RDDs, DataFrames, and the Catalyst Optimizer - This lesson
- 016 - ETL vs ELT - Spark in ETL pipelines
- 023 - Batch vs Streaming - Spark batch vs streaming
- 026 - Data Pipeline Testing - Testing Spark transformations