PySpark Advanced Interview Series
Module 01: SparkSession & Configuration Mastery
Interview Question
"At Google, we process petabytes of data daily. Walk us through how you would create and configure a production-grade SparkSession for a data pipeline that ingests 2TB of raw logs, transforms them, and writes to a data lake. What configuration parameters are critical, how does lazy evaluation affect debugging, and what happens when you call an action?" β Google Data Engineering Interview
"At Amazon, we need SparkSessions that are both memory-efficient and fault-tolerant. Explain the lifecycle of a SparkSession, the difference between local and cluster modes, and how you would tune spark.sql.shuffle.partitions for a 500GB dataset with skewed keys." β Amazon Senior Data Engineer Interview
SparkSession: The Gateway to Spark
SparkSession is the unified entry point introduced in Spark 2.0, replacing the separate SparkContext, SQLContext, and HiveContext. Every Spark application must create exactly one SparkSession.
Basic Creation
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("ProductionPipeline") \
.master("yarn") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "4g") \
.getOrCreate()
Why a Single SparkSession Matters
Each SparkSession creates one SparkContext. Creating multiple sessions leads to multiple JVMs, duplicate memory overhead, and resource contention. In cluster mode, this can consume excessive cluster resources and cause job failures.
βΉοΈGoogle Interview Insight
At Google's scale, engineers use a factory pattern to ensure only one SparkSession exists per JVM. The singleton pattern prevents resource leaks:
class SparkSessionFactory:
_instance = None
@classmethod
def get_instance(cls, app_name="default"):
if cls._instance is None:
cls._instance = SparkSession.builder \
.appName(app_name) \
.config("spark.sql.shuffle.partitions", "auto") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
return cls._instance
Configuration Deep Dive
Critical Configuration Parameters
| Parameter | Default | Recommended | Purpose |
|---|---|---|---|
spark.sql.shuffle.partitions | 200 | 100-1000 based on data | Number of partitions after shuffle operations |
spark.executor.memory | 1g | 4-16g | Memory per executor |
spark.executor.cores | 1 | 4-5 | Cores per executor |
spark.driver.memory | 1g | 2-8g | Driver memory |
spark.sql.adaptive.enabled | false | true | Adaptive query execution |
spark.serializer | Java | Kryo | Serialization format |
spark.sql.autoBroadcastJoinThreshold | 10MB | 50-100MB | Broadcast join threshold |
Adaptive Query Execution (AQE) β Spark 3.0+
AQE dynamically optimizes query plans at runtime based on actual data statistics:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
π‘Amazon Pro Tip
Enable AQE in production. It automatically coalesces small partitions after shuffles, handles data skew in joins, and optimizes sort-merge joins to broadcast joins when one side becomes small. This alone can reduce job runtime by 30-50%.
Lazy Evaluation: The Double-Ed Sword
How Lazy Evaluation Works
Spark builds a Directed Acyclic Graph (DAG) of transformations but doesn't execute until an action is called. This enables optimizations like predicate pushdown and column pruning.
# These are just transformations β NO execution happens
df = spark.read.parquet("/data/logs") # Line 1
df = df.filter(df.date == "2024-01-01") # Line 2
df = df.select("user_id", "event_type") # Line 3
df = df.groupBy("user_id").count() # Line 4
# THIS triggers execution
df.show() # Action β Spark plans and executes the entire DAG
The Debugging Challenge
Lazy evaluation means errors don't surface until actions. A typo in column name on line 2 won't fail until line 5:
# This will NOT fail immediately
df = spark.read.parquet("/data/logs")
df = df.filter(df.nonexistent_column > 100) # ERROR NOT RAISED YET
# Fails here
df.collect() # AnalysisException: Column 'nonexistent_column' does not exist
β οΈCommon Pitfall
Always call df.show(5) or df.printSchema() after each transformation during development. This catches schema errors early. In production, rely on integration tests.
Execution Plan Inspection
# View the physical plan
df.explain(True)
# Output shows:
# == Parsed Logical Plan ==
# == Analyzed Logical Plan ==
# == Optimized Logical Plan ==
# == Physical Plan ==
Real-World Scenario: Google-Scale Log Processing
Problem Statement
Process 2TB of daily web server logs, extract user sessions, compute session-level metrics, and write partitioned by date to BigQuery-compatible Parquet.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, window, count, sum as _sum,
first, last, lag, when, lit
)
from pyspark.sql.types import StructType, StructField, StringType, LongType
# Production SparkSession configuration
spark = SparkSession.builder \
.appName("WebLogSessionAnalytics") \
.config("spark.sql.shuffle.partitions", "500") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.executor.memory", "12g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.instances", "100") \
.config("spark.driver.memory", "8g") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.parquet.compression.codec", "snappy") \
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
.config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024)) \
.getOrCreate()
# Define schema for raw logs
log_schema = StructType([
StructField("timestamp", LongType(), False),
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), False),
StructField("url", StringType(), False),
StructField("ip_address", StringType(), False),
StructField("user_agent", StringType(), True)
])
# Read with schema enforcement
raw_logs = spark.read \
.schema(log_schema) \
.parquet("s3a://data-lake/raw/web-logs/2024/01/*")
# Add session window (30-minute inactivity = new session)
from pyspark.sql.functions import session_window
sessionized = raw_logs \
.withColumn("event_time", col("timestamp").cast("timestamp")) \
.withColumn(
"session_window",
session_window(col("event_time"), "30 minutes")
)
# Session-level aggregations
session_metrics = sessionized \
.groupBy("user_id", "session_window") \
.agg(
count("*").alias("page_views"),
_sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
first("url").alias("landing_page"),
last("url").alias("exit_page")
) \
.withColumn(
"conversion_rate",
when(col("page_views") > 0, col("purchases") / col("page_views")).otherwise(0)
)
# Write partitioned output
session_metrics \
.repartition(200, "user_id") \
.write \
.mode("overwrite") \
.partitionBy("session_window.start") \
.parquet("s3a://data-lake/processed/session-metrics/")
spark.stop()
Edge Cases and Production Considerations
1. SparkSession Recovery After Failure
try:
spark = SparkSession.builder \
.appName("ResilientJob") \
.getOrCreate()
except Exception as e:
# Fallback configuration
spark = SparkSession.builder \
.appName("ResilientJob_Fallback") \
.master("local[*]") \
.getOrCreate()
2. Dynamic Configuration at Runtime
# Change config without restarting session
spark.conf.set("spark.sql.shuffle.partitions", "100")
# Read current config
current_partitions = spark.conf.get("spark.sql.shuffle.partitions")
print(f"Current shuffle partitions: {current_partitions}")
3. Memory Configuration Formula
Total Executor Memory = spark.executor.memory
+ spark.executor.memoryOverhead
+ spark.memory.fraction Γ spark.executor.memory
βΉοΈAmazon Interview Gotcha
At Amazon, interviewers expect you to know that spark.executor.memoryOverhead defaults to max(384MB, 0.10 Γ spark.executor.memory) in YARN. For a 16GB executor, overhead is 1.6GB, so total container request is 17.6GB. Not accounting for this causes YARN to kill containers.
Best Practices Checklist
π‘Production Checklist
- Always set
spark.app.namefor monitoring and debugging - Use
spark.sql.adaptive.enabled=truein Spark 3.0+ - Set
spark.sql.shuffle.partitionsbased on data size (rule: 200MB per partition) - Enable Kryo serialization for 2-10x performance improvement
- Use
spark.sql.autoBroadcastJoinThresholdto control broadcast joins - Call
spark.stop()at application end to release resources - Never create multiple SparkSessions in the same application
- Use
spark.sql.sources.partitionOverwriteMode=dynamicfor partition overwrites
Performance Analysis
| Metric | Poor Config | Optimized Config | Improvement |
|---|---|---|---|
| Job Runtime (2TB) | 4.5 hours | 1.8 hours | 60% faster |
| Shuffle Spill | 120GB | 8GB | 93% reduction |
| Memory Usage | OOM errors | Stable | No failures |
| Partition Count | 200 (fixed) | Auto (adaptive) | Balanced |
Additional Concepts
SparkSession vs SparkContext
# SparkSession wraps SparkContext
spark_context = spark.sparkContext
# SparkContext for RDD operations
rdd = spark_context.parallelize([1, 2, 3, 4, 5])
# SparkSession for DataFrame/SQL operations
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
Hive Support
# Enable Hive support for metastore integration
spark = SparkSession.builder \
.enableHiveSupport() \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.getOrCreate()
# Now you can use HiveQL
spark.sql("SHOW DATABASES")
spark.sql("USE my_database")
spark.sql("SELECT * FROM my_table WHERE date = '2024-01-01'")
Iceberg and Delta Lake Integration
# For Delta Lake
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# For Iceberg
spark = SparkSession.builder \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog") \
.getOrCreate()
Summary
SparkSession is the foundation of every PySpark application. Mastering its configuration, understanding lazy evaluation mechanics, and knowing how to tune parameters for your data size separates senior data engineers from juniors. At companies like Google and Amazon, the difference between a well-configured and poorly-configured SparkSession can mean hours of runtime and thousands of dollars in compute costs.