Cloud Dataproc for Spark/Hadoop Workloads

🟒 Free Lesson

Advertisement

Cloud Dataproc for Spark/Hadoop Workloads

[Figure: Dataproc Cluster Architecture. A master node (HDFS NameNode) coordinates worker nodes 1 through N (DataNode + Executor). Supporting elements: External Services (Hive/HBase/Presto), GCS Integration (Cloud Storage Connector), Auto-scaling (Dynamic Worker Management), Preemptible VMs (Cost Optimization), Initialization Actions (Custom Setup Scripts).]

Dataproc Architecture

Cloud Dataproc is a managed service for running Apache Spark, Apache Hadoop, and related data processing frameworks on Google Cloud (GCP).

Core Components

Master Node:

  • Runs YARN ResourceManager
  • HDFS NameNode
  • Spark History Server
  • Jupyter notebook server (optional)

Worker Nodes:

  • Run YARN NodeManager
  • HDFS DataNode
  • Spark Executor containers
  • Handle data processing tasks

External Services:

  • Hive Metastore
  • HBase (optional)
  • Presto (optional)
  • Kafka (optional)

Cluster Creation

Basic Cluster Creation

# Create a basic cluster
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=3 \
    --image-version=2.1-debian

# Create a cluster with autoscaling
# (worker limits come from the autoscaling policy, not from cluster flags)
gcloud dataproc clusters create my-autoscaling-cluster \
    --region=us-central1 \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --autoscaling-policy=my-autoscaling-policy

Advanced Cluster Configuration

# Create cluster with initialization actions
# (cluster properties are passed as one comma-separated list of prefix:key=value pairs)
gcloud dataproc clusters create my-advanced-cluster \
    --region=us-central1 \
    --master-machine-type=n1-standard-8 \
    --worker-machine-type=n1-standard-8 \
    --num-workers=5 \
    --image-version=2.1-debian \
    --initialization-actions=gs://my-bucket/init-script.sh \
    --properties='spark:spark.dynamicAllocation.enabled=true,yarn:yarn.nodemanager.resource.memory-mb=12288,spark:spark.executor.memory=8g,spark:spark.executor.cores=4'
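
The --properties flag takes prefix:key=value pairs, where the prefix names the configuration file the setting belongs to (spark, yarn, and so on). When the settings live in code, a small helper can assemble the flag value. The following is a minimal sketch; build_properties_flag is a hypothetical name used for illustration and is not part of any Google library.

```python
def build_properties_flag(properties):
    """Join {'prefix:key': value} pairs into one --properties flag.

    Hypothetical helper for illustration; it only formats a string.
    """
    pairs = []
    for key, value in properties.items():
        if ":" not in key:
            raise ValueError(f"missing file prefix in property: {key!r}")
        # Booleans are written lowercase, as in the gcloud examples
        if isinstance(value, bool):
            value = str(value).lower()
        pairs.append(f"{key}={value}")
    return "--properties=" + ",".join(pairs)


flag = build_properties_flag({
    "spark:spark.dynamicAllocation.enabled": True,
    "yarn:yarn.nodemanager.resource.memory-mb": 12288,
    "spark:spark.executor.memory": "8g",
    "spark:spark.executor.cores": 4,
})
print(flag)
```

Rejecting keys without a prefix catches the most common mistake early, before the value reaches gcloud.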

Python Client Library

from google.cloud import dataproc_v1 as dataproc

def create_cluster(project_id, region, cluster_name):
    """Create a Dataproc cluster and wait until it is ready."""
    # Dataproc is a regional service, so the client must target the regional endpoint
    client = dataproc.ClusterControllerClient(
        client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'}
    )
    
    cluster = {
        'project_id': project_id,
        'cluster_name': cluster_name,
        'config': {
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-4',
                'disk_config': {
                    'boot_disk_type': 'pd-ssd',
                    'boot_disk_size_gb': 100
                }
            },
            'worker_config': {
                'num_instances': 3,
                'machine_type_uri': 'n1-standard-4',
                'disk_config': {
                    'boot_disk_type': 'pd-standard',
                    'boot_disk_size_gb': 100
                }
            },
            'software_config': {
                'image_version': '2.1-debian',
                'properties': {
                    'spark:spark.dynamicAllocation.enabled': 'true',
                    'spark:spark.shuffle.service.enabled': 'true'
                }
            },
            'initialization_actions': [
                {
                    'executable_file': 'gs://my-bucket/init-script.sh'
                }
            ]
        }
    }
    
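    # create_cluster returns a long-running operation
    operation = client.create_cluster(
        request={'project_id': project_id, 'region': region, 'cluster': cluster}
    )
    
    # Block until creation finishes and return the resulting cluster
    return operation.result()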

# Create cluster
create_cluster('my-project', 'us-central1', 'my-cluster')

Image Versions

Dataproc uses image versions that bundle specific versions of Spark, Hadoop, and other components.

Supported Image Versions

Image Version    Spark    Hadoop    Hive     Python
2.1-debian       3.3.2    3.3.6     3.1.3    3.10
2.0-debian       3.2.3    3.3.2     3.1.2    3.9
1.5-debian       2.4.8    3.2.3     2.3.9    3.7
1.4-debian       2.3.4    2.10.1    2.3.7    2.7

Using Image Versions

# Specify image version
gcloud dataproc clusters create my-cluster \
    --image-version=2.1-debian \
    --region=us-central1

# Use custom image
gcloud dataproc clusters create my-cluster \
    --image-uri=projects/my-project/global/images/my-custom-image \
    --region=us-central1

Preemptible VMs

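Preemptible VMs are lower-cost instances that run for at most 24 hours and that Compute Engine can reclaim at any time when it needs the capacity. On Dataproc they run as secondary workers, which process data but do not store HDFS blocks.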
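
To see why mixing worker types reduces cost, the sketch below compares an all-standard worker pool with a mixed one. The prices are hypothetical placeholders chosen only for illustration, not actual GCP rates, and the calculation ignores the Dataproc service fee and disk costs.

```python
def hourly_cluster_cost(num_standard, num_preemptible,
                        standard_price, preemptible_price):
    """Return the hourly VM cost of a mixed worker pool."""
    return num_standard * standard_price + num_preemptible * preemptible_price


# Hypothetical prices in USD per VM-hour, for illustration only
STANDARD_PRICE = 0.20
PREEMPTIBLE_PRICE = 0.04

all_standard = hourly_cluster_cost(15, 0, STANDARD_PRICE, PREEMPTIBLE_PRICE)
mixed = hourly_cluster_cost(10, 5, STANDARD_PRICE, PREEMPTIBLE_PRICE)
savings_pct = 100 * (all_standard - mixed) / all_standard
print(f"all standard: ${all_standard:.2f}/h, mixed: ${mixed:.2f}/h, "
      f"savings: {savings_pct:.1f}%")
```

Substitute current prices for your region and machine type before drawing conclusions from the numbers.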

Configuring Preemptible Workers

# Create cluster with preemptible secondary workers
# (secondary workers are preemptible by default)
gcloud dataproc clusters create my-cluster \
    --num-workers=10 \
    --num-secondary-workers=5 \
    --worker-machine-type=n1-standard-4 \
    --secondary-worker-boot-disk-size=100 \
    --region=us-central1

Preemptible VM Configuration

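# Configure preemptible workers
cluster = {
    'config': {
        # Primary workers are always standard VMs
        'worker_config': {
            'num_instances': 10,
            'machine_type_uri': 'n1-standard-4',
            'disk_config': {
                'boot_disk_size_gb': 100,
                'num_local_ssds': 0
            }
        },
        # Preemptible capacity is configured on the secondary worker group
        'secondary_worker_config': {
            'num_instances': 5,
            'preemptibility': 'PREEMPTIBLE',
            'disk_config': {
                'boot_disk_size_gb': 100
            }
        },
        'autoscaling_config': {
            'policy_uri': 'projects/my-project/regions/us-central1/autoscalingPolicies/my-policy'
        }
    }
}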

⚠️ Cost Alert

Always monitor your BigQuery costs using INFORMATION_SCHEMA. Set up budget alerts at 50%, 80%, and 100% thresholds.

Cost Optimization

# Monitor preemptible VM usage
from google.cloud import monitoring_v3
import time

def get_preemptible_usage(project_id, cluster_name):
    """Get preemptible VM usage metrics."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"
    
    # TimeInterval takes timestamps, not floats; query the last hour
    now = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': now},
            'start_time': {'seconds': now - 3600},
        }
    )
    
    # Confirm this metric type and label against the Cloud Monitoring
    # metrics list for your project before relying on the query
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': f'resource.label.cluster_name = "{cluster_name}" AND metric.type = "compute.googleapis.com/instance/preempted"',
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )
    
    for result in results:
        # Points are returned newest first
        print(f'Preempted instances: {result.points[0].value.int64_value}')

Initialization Actions

Initialization actions are scripts that run on cluster nodes during cluster setup.

Creating Initialization Actions

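#!/bin/bash
# init-script.sh - Install additional packages on cluster nodes
set -euo pipefail  # Stop on the first error so a broken setup fails cluster creation

# Update package list (initialization actions run as root, so sudo is not needed)
apt-get update -y

# Install Python packages
pip install pandas numpy scikit-learn

# Make the Avro package available to Spark jobs
# (launching spark-shell here would start an interactive session and stall setup)
echo 'spark.jars.packages=org.apache.spark:spark-avro_2.12:3.3.2' >> /etc/spark/conf/spark-defaults.conf

# Configure Spark
echo 'spark.executor.memory=8g' >> /etc/spark/conf/spark-defaults.conf
echo 'spark.executor.cores=4' >> /etc/spark/conf/spark-defaults.conf
echo 'spark.dynamicAllocation.enabled=true' >> /etc/spark/conf/spark-defaults.conf

# Mount additional storage (assumes /dev/sdb exists and is already formatted)
mkdir -p /mnt/scratch
mount -o discard,defaults /dev/sdb /mnt/scratch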

Using Initialization Actions

# Upload initialization action to GCS
gsutil cp init-script.sh gs://my-bucket/init-script.sh

# Use initialization action when creating cluster
gcloud dataproc clusters create my-cluster \
    --initialization-actions=gs://my-bucket/init-script.sh \
    --region=us-central1

# Multiple initialization actions
gcloud dataproc clusters create my-cluster \
    --initialization-actions=gs://my-bucket/script1.sh,gs://my-bucket/script2.sh \
    --region=us-central1

Initialization Action Best Practices

# Best practices for initialization actions
best_practices = {
    'idempotency': 'Scripts should be idempotent and handle reruns',
    'error_handling': 'Implement proper error handling and logging',
    'timeout': 'Keep initialization actions under the 10-minute default timeout',
    'testing': 'Test initialization actions before production use',
    'logging': 'Use consistent logging for debugging',
    'dependencies': 'Check for dependencies before installing'
}

# Example initialization action with error handling
script = '''#!/bin/bash
set -e  # Exit on error

echo "Starting initialization..."

# Check if package is already installed
if ! dpkg -l | grep -q "package-name"; then
    sudo apt-get update
    sudo apt-get install -y package-name
fi

# Verify installation
if command -v package-name &> /dev/null; then
    echo "Package installed successfully"
else
    echo "Failed to install package"
    exit 1
fi
'''
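
The idempotency practice is the one most often missed: a script that appends to a config file adds a duplicate line every time it reruns. The sketch below shows the check-before-write pattern in Python, using a temporary file as a stand-in for spark-defaults.conf. The helper name ensure_line is hypothetical.

```python
import os
import tempfile


def ensure_line(path, line):
    """Append line to the file at path unless it is already present.

    Returns True if the file was changed, False otherwise.
    """
    existing = []
    if os.path.exists(path):
        with open(path) as f:
            existing = [row.rstrip("\n") for row in f]
    if line in existing:
        return False
    with open(path, "a") as f:
        f.write(line + "\n")
    return True


# Demonstrate on a temporary file standing in for spark-defaults.conf
with tempfile.TemporaryDirectory() as tmp:
    conf = os.path.join(tmp, "spark-defaults.conf")
    first = ensure_line(conf, "spark.executor.memory=8g")
    second = ensure_line(conf, "spark.executor.memory=8g")  # simulated rerun
    with open(conf) as f:
        contents = f.read()

print(first, second, contents.count("spark.executor.memory"))
```

The same idea applies in a shell script: test for the line with grep -q before appending it.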

Spark on Dataproc

Running Spark Jobs

# Submit Spark job
gcloud dataproc jobs submit spark \
    --cluster=my-cluster \
    --region=us-central1 \
    --class=org.apache.spark.examples.SparkPi \
    --jars=gs://spark-samples/spark-pi.jar

# Submit PySpark job
gcloud dataproc jobs submit pyspark \
    --cluster=my-cluster \
    --region=us-central1 \
    gs://my-bucket/my-spark-job.py

# Submit with dependencies
gcloud dataproc jobs submit pyspark \
    --cluster=my-cluster \
    --region=us-central1 \
    --py-files=gs://my-bucket/dependencies.zip \
    --properties=spark.jars.packages=org.apache.spark:spark-avro_2.12:3.3.2 \
    gs://my-bucket/my-job.py

Spark Configuration

# Spark job configuration
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("My Dataproc Job") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()

# Read data from GCS
df = spark.read.parquet("gs://my-bucket/data/*.parquet")

# Process data
result = df.groupBy("category").agg({"amount": "sum"})

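# Write to BigQuery (the connector stages data in a temporary GCS bucket;
# append behavior is set through the DataFrame save mode)
result.write \
    .format("bigquery") \
    .option("table", "my-project:analytics.summary") \
    .option("temporaryGcsBucket", "my-bucket") \
    .mode("append") \
    .save()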
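
Executor memory and cores only help if the resulting YARN containers fit on the worker nodes. The sketch below estimates how many executors fit per node, using the 8g and 4-core executor settings together with the 12288 MB NodeManager memory and n1-standard-8 workers from the advanced cluster example. It assumes Spark's default memory overhead (the larger of 384 MB or 10% of executor memory) and ignores YARN's rounding to its allocation increment, so treat the result as an approximation.

```python
MIN_OVERHEAD_MB = 384    # Spark's default minimum executor memory overhead
OVERHEAD_FACTOR = 0.10   # Spark's default overhead fraction for executors


def executors_per_node(node_memory_mb, node_vcores,
                       executor_memory_mb, executor_cores):
    """Estimate how many executors fit on one YARN NodeManager."""
    overhead_mb = max(MIN_OVERHEAD_MB, int(executor_memory_mb * OVERHEAD_FACTOR))
    container_mb = executor_memory_mb + overhead_mb
    by_memory = node_memory_mb // container_mb
    by_cores = node_vcores // executor_cores
    # The tighter of the two limits decides how many executors fit
    return min(by_memory, by_cores), container_mb


count, container_mb = executors_per_node(
    node_memory_mb=12288,        # yarn.nodemanager.resource.memory-mb
    node_vcores=8,               # n1-standard-8 has 8 vCPUs
    executor_memory_mb=8 * 1024, # spark.executor.memory=8g
    executor_cores=4,            # spark.executor.cores=4
)
print(f"container size: {container_mb} MB, executors per node: {count}")
```

With these values memory is the limiting factor: one executor fits per node and roughly 3 GB of YARN memory stays unused, which suggests either smaller executors or a larger NodeManager memory setting.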

Spark SQL

-- Spark SQL on Dataproc
CREATE TEMPORARY VIEW sales
USING parquet
OPTIONS (
  path "gs://my-bucket/sales/*.parquet"
);

-- Analyze data
SELECT
  category,
  SUM(amount) as total_sales,
  COUNT(*) as transaction_count
FROM sales
WHERE sale_date >= '2024-01-01'
GROUP BY category
ORDER BY total_sales DESC;

Auto-scaling

Auto-scaling adds and removes worker nodes based on YARN memory metrics (pending versus available memory), so cluster size tracks the workload.

Creating Autoscaling Policies

# Create autoscaling policy
gcloud dataproc autoscaling-policies import my-policy \
    --source=policy.yaml \
    --region=us-central1

Autoscaling Policy Configuration

# policy.yaml
workerConfig:
  minInstances: 2
  maxInstances: 10
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 5
basicAlgorithm:
  cooldownPeriod: 300s
  yarnConfig:
    scaleUpFactor: 1.0
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 3600s
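
The scale factors control how aggressively the cluster reacts: scaleUpFactor is the fraction of pending YARN memory to cover with new workers, and scaleDownFactor is the fraction of available memory to release. The following is a simplified sketch of that idea, not the service's actual algorithm. It leaves out averaging over the cooldown period, the minimum-change thresholds, and the split between primary and secondary workers, and the rounding choices and function names are assumptions.

```python
import math


def recommend_worker_delta(pending_mb, available_mb, worker_memory_mb,
                           scale_up_factor, scale_down_factor):
    """Return the suggested change in worker count (positive adds workers)."""
    if pending_mb > 0:
        # Add enough workers to absorb a fraction of the pending memory
        return math.ceil(scale_up_factor * pending_mb / worker_memory_mb)
    if available_mb > 0:
        # Remove workers covering a fraction of the idle memory
        return -math.floor(scale_down_factor * available_mb / worker_memory_mb)
    return 0


def clamp(count, min_instances, max_instances):
    """Keep the worker count inside the policy bounds."""
    return max(min_instances, min(max_instances, count))


current = 4
delta = recommend_worker_delta(
    pending_mb=30000, available_mb=0, worker_memory_mb=12288,
    scale_up_factor=1.0, scale_down_factor=1.0,
)
target = clamp(current + delta, min_instances=2, max_instances=10)
print(f"delta: {delta:+d}, target workers: {target}")
```

A factor of 1.0 tries to clear all pending memory in one step; lower values scale more gradually at the cost of slower reaction.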

Auto-scaling Monitoring

# Monitor autoscaling
from google.cloud import dataproc_v1 as dataproc

def get_cluster_status(project_id, region, cluster_name):
    """Get cluster status including autoscaling."""
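    # Use the regional endpoint that matches the cluster's region
    client = dataproc.ClusterControllerClient(
        client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'}
    )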
    
    cluster = client.get_cluster(
        request={
            'project_id': project_id,
            'region': region,
            'cluster_name': cluster_name
        }
    )
    
    print(f'Cluster status: {cluster.status.state}')
    print(f'Master nodes: {cluster.config.master_config.num_instances}')
    print(f'Worker nodes: {cluster.config.worker_config.num_instances}')
    print(f'Secondary workers: {cluster.config.secondary_worker_config.num_instances}')
    
    if cluster.config.autoscaling_config:
        print(f'Autoscaling policy: {cluster.config.autoscaling_config.policy_uri}')

GCS Integration

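Dataproc clusters come with the Cloud Storage connector preinstalled, so Spark and Hadoop jobs can read and write gs:// paths directly instead of relying on cluster-local HDFS.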

Using GCS as HDFS

# Read from GCS using Spark
df = spark.read.parquet("gs://my-bucket/data/*.parquet")

# Write to GCS
df.write.parquet("gs://my-bucket/output/")

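# On Dataproc the Cloud Storage connector is preconfigured, so these settings
# are only needed when running Spark outside Dataproc
spark.conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("fs.gs.auth.service.account.json.keyfile", "/path/to/key.json")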

GCS Connector Configuration

# Configure GCS connector
# (cluster properties need a file prefix; core: writes the setting to core-site.xml)
gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --properties='core:fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem'

Monitoring and Debugging

Cluster Monitoring

# Monitor cluster health
from google.cloud import monitoring_v3
import time

def monitor_cluster(project_id, cluster_name):
    """Monitor Dataproc cluster metrics."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"
    
    # Query instance CPU utilization for the last hour
    # (TimeInterval takes timestamps, not floats)
    now = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': now},
            'start_time': {'seconds': now - 3600},
        }
    )
    
    # Confirm that the cluster_name label applies to this metric's
    # resource type in your project before relying on the filter
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': f'resource.label.cluster_name = "{cluster_name}" AND metric.type = "compute.googleapis.com/instance/cpu/utilization"',
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )
    
    for result in results:
        # Points are returned newest first
        print(f'CPU utilization: {result.points[0].value.double_value}')

Debugging Common Issues

# Inspect cluster configuration and status
gcloud dataproc clusters describe my-cluster --region=us-central1

# Open an SSH tunnel to the Spark History Server (then browse http://localhost:18080)
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -L 18080:localhost:18080

# Open an SSH tunnel to the YARN ResourceManager UI (http://localhost:8088)
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -L 8088:localhost:8088

# Open an SSH tunnel to the HDFS NameNode UI (http://localhost:9870)
gcloud compute ssh my-cluster-m --zone=us-central1-a -- -L 9870:localhost:9870
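
The three tunnel commands differ only in the port number, so they are easy to generate. The sketch below builds the same gcloud commands from a port table; the names UI_PORTS and tunnel_command are hypothetical, and the master node name assumes the default cluster-name-m convention used above.

```python
import shlex

# Web UI ports used in the commands above
UI_PORTS = {
    "spark-history": 18080,
    "yarn": 8088,
    "hdfs-namenode": 9870,
}


def tunnel_command(cluster_name, zone, ui):
    """Build the gcloud SSH port-forwarding command for one web UI."""
    port = UI_PORTS[ui]
    return [
        "gcloud", "compute", "ssh", f"{cluster_name}-m",
        f"--zone={zone}",
        "--", "-L", f"{port}:localhost:{port}",
    ]


for ui in UI_PORTS:
    print(shlex.join(tunnel_command("my-cluster", "us-central1-a", ui)))
```

Returning an argument list rather than a string lets the result be passed straight to subprocess.run without shell quoting issues.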

Best Practices

  1. Use appropriate image versions - Match versions to your workload requirements
  2. Implement auto-scaling - Optimize costs with dynamic scaling
  3. Use preemptible VMs - Reduce costs for fault-tolerant workloads
  4. Leverage GCS integration - Use GCS as primary storage for flexibility
  5. Monitor cluster health - Track metrics and set up alerts
  6. Optimize Spark configuration - Tune memory, cores, and parallelism
  7. Use initialization actions - Customize clusters for your specific needs
⭐

Premium Content

Cloud Dataproc for Spark/Hadoop Workloads

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert GCP Data Engineering Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement