Dataproc Architecture
Cloud Dataproc is a fully managed Spark and Hadoop service. It runs on Compute Engine and provides a fast, easy, and cost-efficient way to run Apache Spark, Apache Hadoop, and other data processing frameworks.
Architecture Overview
Cluster Creation and Configuration
Standard Cluster
# Create a Dataproc cluster with standard configuration
gcloud dataproc clusters create my-spark-cluster \
--region=us-central1 \
--zone=us-central1-a \
--master-machine-type=n1-standard-4 \
--master-boot-disk-size=100GB \
--num-workers=4 \
--worker-machine-type=n1-standard-4 \
--worker-boot-disk-size=200GB \
--image-version=2.1-debian11 \
--initialization-actions=gs://my-scripts/init.sh \
--properties=yarn:yarn.nodemanager.resource.memory-mb=12288,\
spark:spark.dynamicAllocation.enabled=true
# Check cluster status
gcloud dataproc clusters describe my-spark-cluster --region=us-central1
Cluster with Preemptible Workers
# Create a cluster with preemptible secondary workers for cost savings
gcloud dataproc clusters create cost-optimized-cluster \
--region=us-central1 \
--zone=us-central1-a \
--master-machine-type=n1-standard-4 \
--num-workers=4 \
--worker-machine-type=n1-standard-4 \
--num-secondary-workers=4 \
--secondary-worker-type=preemptible \
--secondary-worker-boot-disk-size=100GB \
--image-version=2.1-debian11 \
--max-idle=10m \
--properties=yarn:yarn.nodemanager.resource.memory-mb=12288
Spark on Dataproc
Submitting Spark Jobs
# Submit a Spark job to Dataproc
gcloud dataproc jobs submit spark \
--cluster=my-spark-cluster \
--region=us-central1 \
--jars=gs://my-bucket/libs/my-app.jar \
--class=com.example.SparkETL \
--properties=spark.dynamicAllocation.enabled=true,\
spark.sql.shuffle.partitions=200 \
-- gs://my-data-lake/input/ gs://my-data-lake/output/
# Submit a PySpark job
gcloud dataproc jobs submit pyspark \
--cluster=my-spark-cluster \
--region=us-central1 \
gs://my-bucket/scripts/etl_job.py \
-- --input gs://my-data-lake/input/ --output gs://my-data-lake/output/
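Everything after the bare `--` in the PySpark submit command is passed to the script as ordinary command-line arguments. The source does not show the contents of etl_job.py, so the following is only a minimal sketch of how such a script might read the --input and --output values; the function name parse_args is hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the arguments that follow `--` in the gcloud submit command."""
    parser = argparse.ArgumentParser(description="Hypothetical ETL job entry point")
    parser.add_argument("--input", required=True, help="Input location, e.g. a gs:// prefix")
    parser.add_argument("--output", required=True, help="Output location, e.g. a gs:// prefix")
    # argv=None makes argparse fall back to sys.argv[1:], which is what a submitted job sees
    return parser.parse_args(argv)


# Example with the same values used in the submit command above
args = parse_args(["--input", "gs://my-data-lake/input/",
                   "--output", "gs://my-data-lake/output/"])
print(f"Reading from {args.input}, writing to {args.output}")
```

Inside the real job, calling parse_args() with no argument would pick up the values supplied on the gcloud command line.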
Spark Configuration
from pyspark.sql import SparkSession
# Import only the functions used below; note that this `sum` shadows Python's built-in sum
from pyspark.sql.functions import avg, col, count, sum, to_date
# Create Spark session with optimized configuration
spark = SparkSession.builder \
.appName("Data Engineering Pipeline") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "2") \
.config("spark.dynamicAllocation.maxExecutors", "20") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.getOrCreate()
# Read from GCS
df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("gs://my-data-lake/raw/sales/")
# Transform data
transformed_df = df \
.withColumn("date", to_date(col("date"), "yyyy-MM-dd")) \
.withColumn("revenue", col("quantity") * col("price")) \
.filter(col("revenue") > 0) \
.groupBy("date", "product_category") \
.agg(
sum("revenue").alias("total_revenue"),
count("*").alias("transaction_count"),
avg("revenue").alias("avg_revenue")
)
# Write to BigQuery
transformed_df.write \
.format("bigquery") \
.option("table", "project.dataset.sales_summary") \
.option("temporaryGcsBucket", "my-temp-bucket") \
.mode("append") \
.save()
spark.stop()
Best Practice: Use Spark dynamic allocation together with adaptive query execution (AQE). Dynamic allocation adds and removes executors as the workload changes, and AQE re-optimizes query plans at runtime, for example by coalescing shuffle partitions and splitting skewed joins. AQE is on by default from Spark 3.2; on older versions set spark.sql.adaptive.enabled=true.
Autoscaling
Dataproc provides autoscaling that automatically adjusts cluster size based on workload demands.
# Register the policy file (shown under "Autoscaling Policy") with the ID "autoscale-policy"
gcloud dataproc autoscaling-policies import autoscale-policy \
--source=autoscale-policy.yaml \
--region=us-central1
# Create a cluster that uses the registered policy
gcloud dataproc clusters create autoscale-cluster \
--region=us-central1 \
--zone=us-central1-a \
--master-machine-type=n1-standard-4 \
--num-workers=2 \
--worker-machine-type=n1-standard-4 \
--image-version=2.1-debian11 \
--max-idle=10m \
--autoscaling-policy=autoscale-policy
Autoscaling Policy
# autoscale-policy.yaml
workerConfig:
  minInstances: 2
  maxInstances: 20
basicAlgorithm:
  cooldownPeriod: 120s
  yarnConfig:
    # Fraction of pending YARN memory to add capacity for (valid range 0.0-1.0)
    scaleUpFactor: 1.0
    # Fraction of available YARN memory to remove capacity for (valid range 0.0-1.0)
    scaleDownFactor: 0.5
    # Required field; 1h is an illustrative value
    gracefulDecommissionTimeout: 1h
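To make the scale factors more concrete, here is a rough sketch of how a YARN-memory-based autoscaler could turn pending and available memory into a worker-count change. It is a simplified model, not Dataproc's actual implementation: the real autoscaler averages metrics over the cooldown period and decommissions nodes gracefully. The function names are hypothetical, and the 12288 MB per worker is taken from the yarn.nodemanager.resource.memory-mb value used earlier.

```python
import math


def recommended_worker_delta(pending_mb, available_mb, worker_mb,
                             scale_up_factor, scale_down_factor):
    """Return a positive number of workers to add, or a negative number to remove."""
    if pending_mb > 0:
        # Add enough workers to cover the chosen fraction of pending memory
        return math.ceil(scale_up_factor * pending_mb / worker_mb)
    # Otherwise remove workers covering the chosen fraction of idle memory
    return -math.floor(scale_down_factor * available_mb / worker_mb)


def clamp_workers(current, delta, min_workers, max_workers):
    """Keep the resulting cluster size within the policy limits."""
    return max(min_workers, min(max_workers, current + delta))


# 24 GB pending with 12 GB per worker and scaleUpFactor 1.0
delta_up = recommended_worker_delta(24576, 0, 12288, 1.0, 0.5)
# 48 GB idle with scaleDownFactor 0.5
delta_down = recommended_worker_delta(0, 49152, 12288, 1.0, 0.5)
print(delta_up, delta_down)
print(clamp_workers(19, 5, 2, 20))
```

A scale-up factor of 1.0 asks for enough workers to clear all pending memory in one step, while a scale-down factor of 0.5 gives back only half of the idle capacity, which makes the cluster quicker to grow than to shrink.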
Initialization Actions
Initialization actions run on cluster startup to customize the environment.
# Create initialization action
cat > init.sh << 'EOF'
#!/bin/bash
set -e
# Install Python packages
pip install pandas==2.0.0 sqlalchemy psycopg2-binary
# Install Jupyter extensions
jupyter contrib nbextension install --user
# Configure Spark (append with >> so the defaults Dataproc already wrote are kept)
cat >> /etc/spark/conf/spark-defaults.conf << 'SPARK_CONF'
spark.dynamicAllocation.enabled=true
spark.sql.adaptive.enabled=true
spark.eventLog.enabled=true
spark.eventLog.dir=gs://my-bucket/spark-logs/
SPARK_CONF
# Download JAR dependencies
gsutil cp gs://my-bucket/libs/postgres-42.6.0.jar /usr/lib/spark/jars/
gsutil cp gs://my-bucket/libs/mysql-connector-j-8.0.33.jar /usr/lib/spark/jars/
EOF
# Upload init action
gsutil cp init.sh gs://my-scripts/init.sh
# Create cluster with init action
gcloud dataproc clusters create custom-cluster \
--region=us-central1 \
--initialization-actions=gs://my-scripts/init.sh \
--initialization-action-timeout=10m
Always monitor your BigQuery costs using INFORMATION_SCHEMA. Set up budget alerts at 50%, 80%, and 100% thresholds.
Cost Optimization
# Cost analysis for Dataproc
cost_analysis = {
"standard_4_workers": {
"master": "n1-standard-4 ($0.19/hr)",
"workers": "4x n1-standard-4 ($0.76/hr)",
"total_hourly": "$0.95/hr",
"daily_24h": "$22.80/day",
"monthly_730h": "$693.50/month"
},
"preemptible_4_workers": {
"master": "n1-standard-4 ($0.19/hr)",
"workers": "4x preemptible ($0.152/hr)",
"total_hourly": "$0.342/hr",
"savings": "64% vs on-demand"
},
"autoscale_2_to_10": {
"min": "2 workers ($0.57/hr)",
"max": "10 workers ($2.09/hr)",
"avg": "~4 workers ($0.95/hr)",
"savings": "30-50% vs fixed cluster"
}
}
# Cost optimization strategies
strategies = {
"preemptible_workers": "Up to 91% discount for fault-tolerant batch",
"autoscaling": "Scale down when idle, up during peak",
"max_idle": "Auto-delete clusters after idle period",
"startup_scaling": "Start with minimal workers, scale as needed",
"hdfs_to_gcs": "Use GCS instead of HDFS for data persistence",
"right_sizing": "Use n1-standard instead of high-mem for most jobs"
}
Cost Tip: Preemptible secondary workers are discounted by up to 91% compared with on-demand VMs. Use --max-idle=10m to auto-delete clusters after 10 minutes of idle time. For persistent clusters, attach an autoscaling policy with --autoscaling-policy and set appropriate minimum and maximum worker counts so the cluster scales with demand.
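The hourly figures in the cost analysis can be reproduced with a few lines of arithmetic. The sketch below assumes the rates quoted in this section ($0.19/hr for an n1-standard-4, and $0.038/hr per preemptible worker, derived from the $0.152/hr quoted for four of them). It covers VM cost only and ignores disks and any per-vCPU service fee, so treat the output as illustrative and check current pricing. The function name is hypothetical.

```python
HOURS_PER_MONTH = 730  # same monthly basis as the cost analysis above


def cluster_hourly_cost(master_rate, worker_rate, num_workers,
                        secondary_rate=0.0, num_secondary=0):
    """VM-only hourly cost: one master, primary workers, optional secondary workers."""
    return master_rate + worker_rate * num_workers + secondary_rate * num_secondary


# Assumed rates, taken from the figures quoted in this section
on_demand = cluster_hourly_cost(0.19, 0.19, 4)
preemptible = cluster_hourly_cost(0.19, 0.19, 0, secondary_rate=0.038, num_secondary=4)
savings = 1 - preemptible / on_demand

print(f"on-demand:   ${on_demand:.2f}/hr, ${on_demand * HOURS_PER_MONTH:.2f}/month")
print(f"preemptible: ${preemptible:.3f}/hr")
print(f"savings:     {savings:.0%}")
```

Swapping in different rates or worker counts gives a quick way to compare a fixed cluster against an autoscaled one before committing to a configuration.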
Common Interview Questions
Q1: When would you use Dataproc vs. Dataflow?
Answer: Dataproc is for existing Spark/Hadoop workloads, complex ML pipelines, or when you need cluster-level control. Dataflow is for new development, unified batch/streaming, or when you want serverless operation. Dataproc provides more control over cluster configuration; Dataflow eliminates cluster management.
Q2: How do you optimize Spark jobs on Dataproc?
Answer: 1) Enable dynamic allocation to auto-scale executors, 2) Use adaptive query execution for runtime optimization, 3) Tune shuffle partitions based on data size, 4) Use broadcast joins for small tables, 5) Partition data by frequently filtered columns, 6) Use columnar formats (Parquet/ORC) for I/O efficiency.
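Point 3 can be turned into a quick estimate. The sketch below uses a common rule of thumb of roughly 128 MiB of shuffle data per partition; that target and the function name are assumptions for illustration, not a Dataproc or Spark recommendation. With adaptive query execution enabled, Spark can still coalesce partitions at runtime, so the result is only a starting value for spark.sql.shuffle.partitions.

```python
import math

MIB = 1024 ** 2
GIB = 1024 ** 3


def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes=128 * MIB,
                               min_partitions=1):
    """Estimate a partition count so each partition holds about target_partition_bytes."""
    return max(min_partitions, math.ceil(shuffle_bytes / target_partition_bytes))


# About 25 GiB of shuffle data at 128 MiB per partition
print(suggest_shuffle_partitions(25 * GIB))
```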
Q3: What is the difference between preemptible and on-demand workers?
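Answer: On-demand (standard) workers run until you remove them and are billed at standard rates. Preemptible and Spot VMs can be reclaimed by Compute Engine at any time in exchange for a discount of up to 91%; preemptible VMs are also stopped after at most 24 hours. On Dataproc only secondary workers can be preemptible or Spot; the master and primary workers always run on standard VMs. Use preemptible workers for fault-tolerant batch workloads.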
Q4: How do you handle cluster failures in Dataproc?
Answer: 1) Use HDFS replication (factor 3) for data durability, 2) Enable YARN node labeling for application placement, 3) Implement retry logic in Spark jobs, 4) Use persistent history server for job recovery, 5) Configure automatic restart policies for critical applications.
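The retry logic in point 3 could look something like the following. This is a generic sketch of retrying with exponential backoff around a driver-side step (for example a write to an external system); it is not a Dataproc or Spark API, and the names run_with_retries and flaky_step are hypothetical. The sleep function is injectable so the example runs instantly.

```python
import time


def run_with_retries(action, max_attempts=3, base_delay_s=1.0, sleep=time.sleep):
    """Call action(); on failure wait base_delay_s * 2**(attempt - 1) and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            sleep(base_delay_s * 2 ** (attempt - 1))


# Demonstration with a step that fails twice before succeeding
calls = {"count": 0}
delays = []


def flaky_step():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "done"


result = run_with_retries(flaky_step, sleep=delays.append)
print(result, delays)
```

In a real job it is usually worth catching only the exception types known to be transient, so that genuine bugs fail fast instead of being retried.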
Q5: How does Dataproc integrate with GCS?
Answer: Dataproc provides a GCS connector that lets Hadoop-compatible applications read and write gs:// paths as if they were HDFS. The connector is pre-installed and pre-configured on Dataproc clusters; you only need to set fs.gs.impl and fs.AbstractFileSystem.gs.impl yourself when running the connector on a non-Dataproc Hadoop or Spark installation. Use GCS for data persistence, since HDFS data is lost when the cluster is deleted.