Production Spark Patterns
Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb
Production Configuration Management
Centralized configuration management is critical for maintaining consistency across environments.
Configuration as Code
import uuid
from dataclasses import dataclass
from typing import Dict, Optional

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
@dataclass
class SparkEnvironmentConfig:
    """Centralized, environment-specific Spark configuration.

    Instances are plain data. ``to_spark_config`` renders them into the
    ``spark.*`` key/value pairs that ``SparkSession.builder.config`` accepts;
    every rendered value is a string, matching the declared return type.
    """

    app_name: str
    master: str
    executor_memory: str = "8g"
    executor_cores: int = 4
    # With dynamic allocation this is the upper bound on executors
    # (spark.dynamicAllocation.maxExecutors); without it, it is the fixed
    # executor count (spark.executor.instances).
    num_executors: int = 10
    driver_memory: str = "4g"
    dynamic_allocation: bool = True
    event_logging: bool = True
    # The fields below were added after the original ones so existing
    # positional construction keeps working.
    shuffle_partitions: int = 200
    # NOTE(review): in this URI "logs" is the HDFS authority (NameNode host).
    # If a directory on the default filesystem was intended, this should be
    # "hdfs:///logs/spark-events" — confirm for your cluster.
    event_log_dir: str = "hdfs://logs/spark-events"
    min_executors: int = 2

    def to_spark_config(self) -> Dict[str, str]:
        """Return this configuration as a ``{spark-conf-key: string-value}`` dict."""
        config = {
            "spark.executor.memory": self.executor_memory,
            "spark.executor.cores": str(self.executor_cores),
            "spark.driver.memory": self.driver_memory,
            "spark.sql.shuffle.partitions": str(self.shuffle_partitions),
            "spark.sql.adaptive.enabled": "true",
        }
        if self.dynamic_allocation:
            # NOTE(review): on YARN, dynamic allocation also needs the external
            # shuffle service or spark.dynamicAllocation.shuffleTracking.enabled;
            # neither is set here — confirm the cluster defaults.
            config.update({
                "spark.dynamicAllocation.enabled": "true",
                # Spark refuses to start when minExecutors > maxExecutors, so
                # clamp the floor to the configured ceiling.
                "spark.dynamicAllocation.minExecutors": str(
                    min(self.min_executors, self.num_executors)
                ),
                "spark.dynamicAllocation.maxExecutors": str(self.num_executors),
            })
        else:
            # Without dynamic allocation the executor count must be fixed
            # explicitly, otherwise num_executors would be silently ignored.
            config["spark.executor.instances"] = str(self.num_executors)
        if self.event_logging:
            config.update({
                "spark.eventLog.enabled": "true",
                "spark.eventLog.dir": self.event_log_dir,
            })
        return config
def create_spark_session(env_config: SparkEnvironmentConfig) -> SparkSession:
    """Build a SparkSession from ``env_config`` via ``getOrCreate()``.

    The app name and master are set first. Then every entry of
    ``env_config.to_spark_config()`` is applied as a builder config, in
    dict order.
    """
    session_builder = SparkSession.builder.appName(env_config.app_name).master(
        env_config.master
    )
    settings = env_config.to_spark_config()
    for conf_key in settings:
        session_builder = session_builder.config(conf_key, settings[conf_key])
    return session_builder.getOrCreate()
# Environment-specific configs
# Production: 4-core / 16g executors. dynamic_allocation keeps its default
# (True), so num_executors=50 is rendered as spark.dynamicAllocation.maxExecutors.
production_config = SparkEnvironmentConfig(
    app_name="ETL-Pipeline-Production",
    master="yarn",
    executor_memory="16g",
    executor_cores=4,
    num_executors=50,
)
# Staging: smaller executors (2 cores / 8g) and an upper bound of 5 executors.
# NOTE(review): staging_config is not used in this snippet.
staging_config = SparkEnvironmentConfig(
    app_name="ETL-Pipeline-Staging",
    master="yarn",
    executor_memory="8g",
    executor_cores=2,
    num_executors=5,
)
# Module-level side effect: builds (or, via getOrCreate, reuses) the
# production session as soon as this code runs.
spark = create_spark_session(production_config)
ℹ️
Interview Insight: Production Spark applications require centralized configuration management. Use dataclasses or config files to maintain consistency across environments.
Error Handling and Resilience
from pyspark.sql import DataFrame
from functools import wraps
import logging
import time
logger = logging.getLogger(__name__)


def retry(max_retries=3, delay=10, exceptions=(Exception,)):
    """Decorator factory: retry a callable with linearly increasing back-off.

    Args:
        max_retries: Total number of attempts (including the first); must be >= 1.
        delay: Base delay in seconds. After failed attempt ``k`` (1-based) the
            wrapper sleeps ``delay * k`` seconds; it never sleeps after the
            final attempt.
        exceptions: Exception type (or tuple of types) that triggers a retry.
            Anything else propagates immediately without retrying. The default
            keeps the original "retry on any Exception" behavior.

    Raises:
        ValueError: at decoration time if ``max_retries < 1``. Such a value
            would otherwise end in ``raise None``, which surfaces as an
            unrelated TypeError.

    The decorated callable re-raises the last exception once all attempts
    have failed.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    def decorator(func):
        name = getattr(func, "__name__", repr(func))

        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    logger.warning(
                        "%s: attempt %d/%d failed: %s", name, attempt, max_retries, exc
                    )
                    if attempt == max_retries:
                        raise
                    time.sleep(delay * attempt)

        return wrapper

    return decorator
@retry(max_retries=3, delay=30)
def safe_read_parquet(spark, path):
    """Load the Parquet dataset at ``path``, retrying on failure.

    Up to three attempts are made, waiting 30s and then 60s between them.
    Every exception is retried, including non-transient ones such as a
    missing path.
    """
    reader = spark.read
    return reader.parquet(path)
def process_with_checkpoint(spark, input_path, checkpoint_path):
    """Aggregate active rows per category and persist the result as a checkpoint.

    Reads Parquet from ``input_path`` through ``safe_read_parquet`` (retried),
    keeps rows with ``status == "active"``, sums ``amount`` per ``category``,
    and overwrites ``checkpoint_path`` with the result.

    Returns:
        The aggregate read back from ``checkpoint_path``. Reading it back cuts
        the lineage, so later actions on the returned DataFrame do not re-scan
        ``input_path`` and recompute the aggregation.

        If any step fails but ``checkpoint_path`` can still be read, that
        existing checkpoint is returned instead and a warning is logged. It
        comes from an earlier run and may be stale. It may also be incomplete
        if the failure happened while the overwrite was in progress.

    Raises:
        Exception: the original processing error when no checkpoint can be read.
    """
    try:
        df = safe_read_parquet(spark, input_path)
        result = (
            df.filter(F.col("status") == "active")
            .groupBy("category")
            .agg(F.sum("amount"))
        )
        # NOTE(review): Parquet writers before Spark 3.3 reject column names
        # containing "(" / ")", such as the default "sum(amount)". Alias the
        # aggregate on older versions — confirm your Spark version.
        result.write.mode("overwrite").parquet(checkpoint_path)
        return spark.read.parquet(checkpoint_path)
    except Exception as exc:
        logger.exception("Processing failed: %s", exc)
        try:
            fallback = spark.read.parquet(checkpoint_path)
        except Exception:
            # No usable checkpoint: surface the processing error, not the read error.
            raise exc
        logger.warning(
            "Returning existing checkpoint at %s; it may be stale", checkpoint_path
        )
        return fallback
Idempotent Operations
def idempotent_write(df, output_path, mode="overwrite"):
    """Write ``df`` as Parquet to ``output_path`` through a temporary directory.

    The data is first written to a uniquely named sibling directory and then
    renamed into place. A failed write therefore never leaves a half-written
    ``output_path`` behind, and re-running with the same input yields the
    same output.

    This is not atomic. Between deleting an existing ``output_path`` and the
    rename, readers briefly see no data. On object stores such as S3 a
    directory rename is a copy plus delete, so that window grows with the
    data size.

    Args:
        df: DataFrame to write.
        output_path: Destination directory. A trailing "/" is ignored.
        mode: Behavior when ``output_path`` already exists (case-insensitive):
            ``"overwrite"`` (default, also used for ``None``) replaces it.
            ``"error"`` / ``"errorifexists"`` raise ``FileExistsError``.
            ``"ignore"`` leaves it untouched and skips the write.

    Raises:
        ValueError: for any other mode (e.g. ``"append"``, which a directory
            swap cannot implement), before any data is written.
        FileExistsError: if ``mode`` is error/errorifexists and the output exists.
        OSError: if the Hadoop rename reports failure.
    """
    mode = (mode or "overwrite").lower()
    if mode not in ("overwrite", "error", "errorifexists", "ignore"):
        raise ValueError(f"Unsupported mode for idempotent_write: {mode!r}")

    # Without the strip, "dir/" would put the temp dir *inside* the output
    # dir, where deleting the old output would also delete the new data.
    base_path = output_path.rstrip("/")
    # A UUID suffix avoids collisions between concurrent or same-second writers.
    temp_path = f"{base_path}_temp_{uuid.uuid4().hex}"

    jvm = df.sparkSession._jvm
    hadoop_conf = df.sparkSession._jsc.hadoopConfiguration()
    output = jvm.org.apache.hadoop.fs.Path(base_path)
    temp = jvm.org.apache.hadoop.fs.Path(temp_path)
    # Resolve the filesystem from the path itself, so s3a://, gs:// etc. work
    # even when fs.defaultFS points at HDFS.
    fs = output.getFileSystem(hadoop_conf)

    if fs.exists(output):
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"Output path already exists: {output_path}")
        if mode == "ignore":
            return

    try:
        df.write.mode("overwrite").parquet(temp_path)
        if fs.exists(output):
            fs.delete(output, True)
        # Hadoop's rename() signals failure by returning False, not by raising.
        if not fs.rename(temp, output):
            raise OSError(f"Failed to rename {temp_path} to {base_path}")
    except Exception:
        try:
            fs.delete(temp, True)
        except Exception:
            logger.warning(
                "Could not clean up temporary path %s", temp_path, exc_info=True
            )
        raise
# Use for every write operation
# NOTE(review): `df` is not defined in this snippet; presumably an input
# DataFrame created earlier in the pipeline.
# NOTE(review): Parquet writers before Spark 3.3 reject the default aggregate
# column name "sum(value)"; alias it there — confirm your Spark version.
result = df.groupBy("key").agg(F.sum("value"))
idempotent_write(result, "hdfs://output/daily-aggregation")
⚠️
Warning: Always design write operations to be idempotent. This allows safe retries without duplicating data.
Monitoring and Alerting
from datetime import datetime
class SparkJobMonitor:
    """Record per-job duration/outcome metrics and raise SLA alerts.

    ``send_metrics`` and ``send_alert`` are the integration points for a
    monitoring/alerting backend. ``send_alert`` logs a warning by default so
    alerts are not silently dropped before an integration is wired in.
    """

    def __init__(self, spark_context):
        # Kept for integrations that need the SparkContext; not read here.
        self.sc = spark_context
        # job_name -> metrics dict of the most recently completed run.
        self.metrics = {}

    def track_job(self, job_name):
        """Start timing ``job_name`` and return its completion callback.

        Call the returned ``on_complete(success)`` once, after the job's Spark
        actions have finished. It stores the metrics under ``job_name`` and
        forwards them to ``send_metrics``.
        """
        # The monotonic clock gives durations unaffected by wall-clock jumps
        # (NTP adjustments); the wall clock is used only for the timestamps.
        start_clock = time.monotonic()
        start_date = datetime.now()

        def on_complete(success):
            self.metrics[job_name] = {
                "success": success,
                "duration": time.monotonic() - start_clock,
                "start_time": start_date.isoformat(),
                "end_time": datetime.now().isoformat(),
            }
            # Send to monitoring system
            self.send_metrics(job_name, self.metrics[job_name])

        return on_complete

    def send_metrics(self, job_name, metrics):
        """Send metrics to monitoring system"""
        # Integration with Prometheus, Datadog, etc.
        pass

    def check_sla(self, job_name, max_duration_seconds):
        """Return True if ``job_name`` completed successfully within the limit.

        Alerts (via ``send_alert``) and returns False in three cases: the job
        never reported completion, it reported failure, or it ran longer than
        ``max_duration_seconds``.
        """
        job_metrics = self.metrics.get(job_name)
        if job_metrics is None:
            self.send_alert(f"Job {job_name} has no recorded completion; SLA cannot be verified")
            return False
        if not job_metrics["success"]:
            self.send_alert(f"Job {job_name} failed; SLA not met")
            return False
        duration = job_metrics["duration"]
        if duration > max_duration_seconds:
            self.send_alert(f"Job {job_name} exceeded SLA: {duration:.1f}s > {max_duration_seconds}s")
            return False
        return True

    def send_alert(self, message):
        """Send alert to operations team"""
        # Integration with PagerDuty, Slack, etc. Until one is wired in,
        # log the alert instead of discarding it.
        logging.getLogger(__name__).warning("ALERT: %s", message)
# Usage
monitor = SparkJobMonitor(spark.sparkContext)
# Track critical jobs: start the timer before the work, then report the
# outcome once the Spark action has actually run.
on_complete = monitor.track_job("daily_etl")
try:
    result = df.groupBy("key").agg(F.sum("value"))
    result.count()  # the action that actually executes the job
except Exception:
    on_complete(False)
    raise
else:
    on_complete(True)
monitor.check_sla("daily_etl", max_duration_seconds=3600)
Data Quality Framework
class DataQualityChecker:
    """Collect data-quality violations for Spark DataFrames.

    Each ``check_*`` method runs Spark actions on the DataFrame it is given
    and appends a dict describing any violation to ``self.violations``; the
    checks themselves never raise. Call ``report()`` afterwards for a
    pass/fail result.
    """

    def __init__(self, spark):
        self.spark = spark
        # Accumulates across every check run on this instance; never cleared.
        self.violations = []

    def check_not_null(self, df, columns):
        """Record a violation for each of ``columns`` that contains nulls.

        All columns are counted in a single aggregation (one Spark job) rather
        than one filter+count job per column.
        """
        columns = list(columns)
        if not columns:
            return
        null_counts = df.agg(*[
            F.sum(F.col(name).isNull().cast("int")).alias(f"_nulls_{i}")
            for i, name in enumerate(columns)
        ]).first()
        for i, name in enumerate(columns):
            # sum() over zero rows is NULL, meaning no nulls were found.
            null_count = null_counts[i] or 0
            if null_count > 0:
                self.violations.append({
                    "check": "not_null",
                    "column": name,
                    "violations": null_count
                })

    def check_unique(self, df, columns):
        """Record a violation if rows are not unique over ``columns``.

        ``duplicates`` is the number of surplus rows (total minus distinct).
        """
        total_count = df.count()
        distinct_count = df.select(columns).distinct().count()
        if distinct_count < total_count:
            self.violations.append({
                "check": "unique",
                "columns": columns,
                "duplicates": total_count - distinct_count
            })

    def check_range(self, df, column, min_val, max_val):
        """Record rows whose ``column`` lies outside ``[min_val, max_val]``.

        Rows where ``column`` is NULL are not counted (the comparisons yield
        NULL and are filtered out); use ``check_not_null`` for those.
        """
        out_of_range = df.filter(
            (F.col(column) < min_val) | (F.col(column) > max_val)
        ).count()
        if out_of_range > 0:
            self.violations.append({
                "check": "range",
                "column": column,
                "violations": out_of_range
            })

    def check_referential(self, df, fk_column, reference_df, pk_column):
        """Record rows of ``df`` whose ``fk_column`` has no match in ``reference_df[pk_column]``.

        Rows with a NULL foreign key never match and are counted as orphans.
        """
        # Project and rename the referenced key. This lets fk/pk names differ
        # (previously pk_column was ignored) and keeps the join condition
        # unambiguous when df and reference_df share lineage.
        ref_keys = reference_df.select(reference_df[pk_column].alias("__dq_ref_key"))
        orphan_count = df.join(
            ref_keys, df[fk_column] == ref_keys["__dq_ref_key"], "left_anti"
        ).count()
        if orphan_count > 0:
            self.violations.append({
                "check": "referential",
                "fk_column": fk_column,
                "orphan_count": orphan_count
            })

    def report(self):
        """Print every recorded violation; return True only if there were none."""
        if self.violations:
            for v in self.violations:
                print(f"VIOLATION: {v}")
            return False
        return True
# Usage in pipeline
# NOTE(review): `df` is assumed to be defined earlier. Each check below runs
# its own Spark job(s) on it, so consider caching df first.
checker = DataQualityChecker(spark)
checker.check_not_null(df, ["id", "user_id", "amount"])
checker.check_unique(df, ["id"])
checker.check_range(df, "amount", 0, 1000000)
# report() prints every recorded violation and returns False if any exist.
if not checker.report():
    raise ValueError("Data quality checks failed")
ℹ️
Pro Tip: Integrate data quality checks into your pipeline. Fail fast on quality issues rather than processing bad data through downstream systems.
Deployment Strategies
# Blue-Green Deployment
def blue_green_deploy(spark, new_data_path, live_path, staging_path, keep_backup=False):
    """Stage, validate and swap a new dataset into ``live_path``.

    Steps:
    1. Copy the Parquet data at ``new_data_path`` to ``staging_path``.
    2. Validate the *staged* copy (``id`` must be non-null).
    3. Move the current ``live_path`` aside to ``<live_path>_previous``.
    4. Rename ``staging_path`` to ``live_path``.

    Step 3 is required because Hadoop ``rename()`` into an existing directory
    nests the source inside it instead of replacing it. Between steps 3 and 4
    readers briefly see no data, so the swap is fast but not atomic.
    ``staging_path`` and ``live_path`` must be on the same filesystem.

    Args:
        keep_backup: Keep ``<live_path>_previous`` for rollback instead of
            deleting it after a successful swap.

    Raises:
        ValueError: if the staged data fails the quality checks (live is untouched).
        OSError: if a rename fails; the previous live data is restored when possible.
    """
    # Write to staging
    new_data = spark.read.parquet(new_data_path)
    new_data.write.mode("overwrite").parquet(staging_path)

    # Validate what was actually staged, not the source it was copied from.
    staged = spark.read.parquet(staging_path)
    checker = DataQualityChecker(spark)
    checker.check_not_null(staged, ["id"])
    if not checker.report():
        raise ValueError("Staging data failed quality checks")

    hadoop_fs = spark._jvm.org.apache.hadoop.fs
    live = hadoop_fs.Path(live_path)
    staging = hadoop_fs.Path(staging_path)
    backup = hadoop_fs.Path(f"{live_path.rstrip('/')}_previous")
    # Resolve the filesystem from the path so non-default schemes work.
    fs = live.getFileSystem(spark._jsc.hadoopConfiguration())

    had_live = fs.exists(live)
    if had_live:
        if fs.exists(backup):
            fs.delete(backup, True)
        # Hadoop rename() reports failure by returning False, not by raising.
        if not fs.rename(live, backup):
            raise OSError(f"Could not move {live_path} aside for deployment")
    if not fs.rename(staging, live):
        if had_live:
            fs.rename(backup, live)  # roll back to the previous live data
        raise OSError(f"Could not promote {staging_path} to {live_path}")
    if had_live and not keep_backup:
        fs.delete(backup, True)
# Canary Deployment
def canary_deploy(
    spark,
    new_data,
    live_path,
    canary_percentage=10,
    *,
    wait_seconds=300,
    health_check=None,
    seed=None,
):
    """Publish a sample of ``new_data`` as a canary, then optionally promote it.

    Args:
        spark: Active SparkSession; passed through to ``health_check``.
        new_data: DataFrame to deploy.
        live_path: Production output directory. The canary is written to
            ``<live_path>_canary`` (a sibling, even if live_path ends in "/").
        canary_percentage: Percentage of rows (0 < p <= 100) written to the
            canary. This samples rows, not traffic, and the resulting size is
            approximate.
        wait_seconds: How long to let consumers exercise the canary before
            checking it.
        health_check: Optional ``callable(spark, canary_path) -> bool``. If it
            returns True, ``new_data`` is written to ``live_path`` via
            ``idempotent_write``.
        seed: Optional sampling seed for a reproducible canary.

    Returns:
        True if the full data was promoted. False if no ``health_check`` was
        given; in that case the canary is written and nothing is promoted.

    Raises:
        ValueError: if ``canary_percentage`` is outside (0, 100].
        RuntimeError: if ``health_check`` reports a failure (live is untouched).
    """
    if not 0 < canary_percentage <= 100:
        raise ValueError(f"canary_percentage must be in (0, 100], got {canary_percentage}")

    canary_data = new_data.sample(fraction=canary_percentage / 100, seed=seed)
    canary_path = f"{live_path.rstrip('/')}_canary"
    canary_data.write.mode("overwrite").parquet(canary_path)

    # Give consumers time to exercise the canary.
    time.sleep(wait_seconds)

    if health_check is None:
        return False
    if not health_check(spark, canary_path):
        raise RuntimeError(f"Canary at {canary_path} failed health check; not promoting")
    idempotent_write(new_data, live_path)
    return True
Resource Optimization
# Right-size executors based on workload
def optimize_executor_config(
    spark,
    sample_data,
    executor_cores=4,
    rows_per_partition=100_000,
    min_partitions=200,
    min_executors=2,
):
    """Recommend executor and shuffle settings from a row count.

    Heuristic: one shuffle partition per ``rows_per_partition`` rows (at least
    ``min_partitions``), and just enough executors that every partition gets
    its own core in a single wave of tasks (at least ``min_executors``).

    Args:
        spark: Unused; kept for interface compatibility.
        sample_data: DataFrame whose row count drives the estimate. It is
            counted once and is no longer cached (the cache was never
            released). If you pass a true sample, scale
            ``rows_per_partition`` to the sampling ratio.
        executor_cores: Cores per executor to plan for.
        rows_per_partition: Target rows per shuffle partition.
        min_partitions: Lower bound on shuffle partitions.
        min_executors: Lower bound on executors.

    Returns:
        Dict of Spark config keys to string values. ``spark.executor.*``
        entries are launch-time settings (spark-submit / session builder);
        only ``spark.sql.shuffle.partitions`` can be changed on a live session.

    Raises:
        ValueError: if ``executor_cores`` or ``rows_per_partition`` is not positive.
    """
    if executor_cores <= 0 or rows_per_partition <= 0:
        raise ValueError("executor_cores and rows_per_partition must be positive")

    row_count = sample_data.count()
    estimated_partitions = max(min_partitions, row_count // rows_per_partition)
    # Ceiling division. The original multiplied and then divided by
    # executor_cores, which requested one 4-core executor per partition.
    num_executors = max(min_executors, -(-estimated_partitions // executor_cores))
    return {
        "spark.executor.instances": str(num_executors),
        "spark.executor.cores": str(executor_cores),
        "spark.sql.shuffle.partitions": str(estimated_partitions),
    }
# Apply optimizations
optimal_config = optimize_executor_config(spark, input_df)
# Only SQL settings can change on a running session. Executor count and cores
# are fixed at application launch, and in Spark 3.x spark.conf.set() on them
# raises "Cannot modify the value of a Spark config". Pass those to
# spark-submit / SparkSession.builder for the next run instead.
for key, value in optimal_config.items():
    if key.startswith("spark.sql."):
        spark.conf.set(key, value)
    else:
        logger.info("Launch-time setting (apply on next submit): %s=%s", key, value)
ℹ️
Key Takeaway: Production Spark requires comprehensive error handling, idempotent operations, monitoring, data quality validation, and careful deployment strategies. Build these patterns into your codebase from the start.
Follow-Up Questions
- How would you design a disaster recovery strategy for Spark applications?
- Explain approaches to managing Spark configuration across multiple environments.
- How do you handle schema evolution in production pipelines?
- Describe strategies for cost optimization in cloud Spark deployments.
- How would you implement CI/CD for Spark applications?