
Apache Spark Provider Integration with Airflow


[Figure: Spark Integration Architecture. Airflow Scheduler → SparkHook (manages submit) → spark-submit (CLI invocation) → YARN (resource manager) or K8s / Standalone (cluster options). Operators shown: SubmitOperator (spark-submit CLI), JDBCOperator (JDBC read/write), SQLOperator (Hive/Spark SQL), ConnectSensor (monitor job status).]

Formal Definitions

SparkSubmitOperator

The SparkSubmitOperator wraps the spark-submit CLI command, providing Airflow-native task execution for PySpark, Spark Scala, and Spark Java applications. It manages master URL construction, package dependencies, and configuration arguments.

Spark Connection

A Spark connection in Airflow specifies the cluster endpoint (Host, optionally with Port), the deploy mode (the deploy-mode key in the connection's Extra field), and cluster-specific options such as a YARN queue or a Kubernetes namespace. It abstracts cluster-specific configuration, allowing operators to be portable across YARN, Kubernetes, and standalone deployments.

Deploy Mode

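Deploy mode determines where the Spark driver runs: client (driver on the Airflow worker) or cluster (driver on the cluster). In cluster mode, the overhead the job places on the Airflow worker, $T_{\text{overhead}}$, is reduced because the driver no longer runs inside the worker's process; the worker only hosts the spark-submit client that tracks the application.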

Detailed Explanation

Spark Integration Overview

The Spark provider enables Airflow to submit and monitor Spark jobs across YARN, Kubernetes, and standalone clusters. SparkSubmitOperator and SparkJDBCOperator run through the spark-submit CLI, and SparkSqlOperator runs through the spark-sql CLI.

Key Insight: Use cluster deploy mode for production to decouple the Spark driver from Airflow workers. This prevents worker failures from killing Spark jobs.

Operator Selection Guide

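| Operator | Use Case | Best For |
|---|---|---|
| SparkSubmitOperator | Submit any Spark application | PySpark, Scala, Java |
| SparkJDBCOperator | JDBC read/write | Database transfers |
| SparkSqlOperator | Execute SQL queries via spark-sql | Hive/Spark SQL |
| SparkKubernetesOperator (cncf.kubernetes provider) | Submit a SparkApplication resource to the Kubernetes Spark operator | Spark on Kubernetes |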

Deploy Mode Comparison

| Mode | Driver Location | Worker Dependency | Best For |
|---|---|---|---|
| Client | Airflow worker | High | Development, debugging |
| Cluster | Spark cluster | Low | Production, long jobs |

Connection Setup

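# Airflow connection for Spark
# Connection ID: spark_default
# Connection Type: Spark
# Host: yarn (or k8s://https://<api>:6443 or spark://<host>:7077)
# Extra: {
#   "deploy-mode": "cluster",
#   "spark-binary": "spark-submit",
#   "namespace": "spark-jobs"
# }
# "namespace" applies to Kubernetes; YARN deployments can set "queue" instead.
# spark-submit must be on the worker's PATH: recent provider versions reject a
# "spark-home" extra and accept only spark-submit, spark2-submit, or
# spark3-submit as "spark-binary".
# SparkApplication manifests (apiVersion sparkoperator.k8s.io/v1beta2) are not
# part of this connection; they are passed to SparkKubernetesOperator instead.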
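Besides the UI, Airflow 2.3+ can read a connection from an AIRFLOW_CONN_<CONN_ID> environment variable that holds JSON. The helper below is a hypothetical, stdlib-only sketch that serializes the settings above in that format; the function name and the example values are assumptions, not part of the provider.

```python
import json
import shlex
from typing import Optional


def spark_connection_json(
    host: str,
    deploy_mode: str = "cluster",
    spark_binary: str = "spark-submit",
    port: Optional[int] = None,
    queue: Optional[str] = None,
    namespace: Optional[str] = None,
) -> str:
    """Serialize a Spark connection in Airflow's JSON connection format (sketch)."""
    extra = {"deploy-mode": deploy_mode, "spark-binary": spark_binary}
    if queue:
        extra["queue"] = queue  # YARN queue
    if namespace:
        extra["namespace"] = namespace  # Kubernetes namespace
    conn = {"conn_type": "spark", "host": host, "extra": extra}
    if port is not None:
        conn["port"] = port  # the hook combines host and port into the master URL
    return json.dumps(conn)


if __name__ == "__main__":
    # Print a shell line to add to the scheduler and worker environments.
    value = spark_connection_json("yarn")
    print("export AIRFLOW_CONN_SPARK_DEFAULT=" + shlex.quote(value))
```

The variable has to be present in every Airflow component that runs Spark tasks; setting it only in a local shell does not register the connection elsewhere.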

Basic SparkSubmitOperator

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='spark_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # replaces schedule_interval, deprecated since Airflow 2.4
    catchup=False,
    tags=['spark', 'etl'],
) as dag:

    submit_extract = SparkSubmitOperator(
        task_id='spark_extract',
        conn_id='spark_default',
        application='/opt/spark/jobs/extract.py',
        name='extract_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            # hadoop-aws provides the S3A connector, so paths use the s3a:// scheme
            '--source', 's3a://data-lake/raw/',
            '--target', 's3a://data-lake/staging/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
            # Dynamic allocation also needs spark.shuffle.service.enabled=true
            # or spark.dynamicAllocation.shuffleTracking.enabled=true
            'spark.dynamicAllocation.enabled': 'true',
            'spark.dynamicAllocation.minExecutors': '2',
            'spark.dynamicAllocation.maxExecutors': '20',
        },
        # Keep the hadoop-aws version in line with the Hadoop version bundled with Spark
        packages='org.apache.hadoop:hadoop-aws:3.3.4',
        py_files='/opt/spark/lib/utils.py',
        driver_memory='4g',
        executor_memory='8g',
        executor_cores=4,
        num_executors=10,  # with dynamic allocation, used as the initial executor count
        verbose=False,
        trigger_rule='all_success',
        on_failure_callback=None,
        # BaseOperator argument: fail the task if it runs longer than one hour
        execution_timeout=timedelta(hours=1),
    )

    submit_transform = SparkSubmitOperator(
        task_id='spark_transform',
        conn_id='spark_default',
        application='/opt/spark/jobs/transform.py',
        name='transform_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            '--input', 's3a://data-lake/staging/',
            '--output', 's3a://data-lake/processed/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
        },
        packages='org.apache.hadoop:hadoop-aws:3.3.4',
        driver_memory='4g',
        executor_memory='8g',
        num_executors=10,
        execution_timeout=timedelta(hours=1),
    )

    submit_extract >> submit_transform

SparkJDBCOperator

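from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# jdbc_conn_id refers to an Airflow connection holding host, port, schema (database),
# login, and password, plus Extra {"conn_prefix": "jdbc:postgresql://"}. The operator
# builds the JDBC URL and credentials from it, so none of them appear in the DAG.
jdbc_extract = SparkJDBCOperator(
    task_id='jdbc_extract',
    spark_conn_id='spark_default',
    jdbc_conn_id='postgres_warehouse',              # hypothetical connection ID
    cmd_type='jdbc_to_spark',                       # read from the database into Spark
    jdbc_table='orders',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='/opt/spark/jars/postgresql.jar',    # hypothetical path to the driver JAR
    metastore_table='staging.orders',               # hypothetical target table (saveAsTable)
    save_mode='overwrite',
    save_format='parquet',
    fetch_size=10000,
    batch_size=5000,                                # used only by spark_to_jdbc writes
    num_partitions=20,
    partition_column='order_id',
    lower_bound='1',
    upper_bound='1000000',
    driver_memory='4g',
    executor_memory='8g',
    num_executors=5,
)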
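The partition_column, lower_bound, upper_bound, and num_partitions settings map onto Spark's JDBC partitioning options. Per the Spark documentation, the bounds only decide the partition stride and do not filter rows, so order_id values outside 1..1000000 still load, into the first or last partition. The sketch below is a simplified Python model of how Spark's JDBCRelation.columnPartition derives one WHERE clause per partition; it assumes an integer column and non-negative bounds, and the function name is hypothetical.

```python
from typing import List, Optional


def jdbc_partition_predicates(
    column: str, lower_bound: int, upper_bound: int, num_partitions: int
) -> List[Optional[str]]:
    """Simplified model of Spark's JDBC column partitioning (integer column)."""
    if lower_bound > upper_bound:
        raise ValueError("lower_bound must not exceed upper_bound")
    if num_partitions <= 1 or lower_bound == upper_bound:
        return [None]  # single partition, no WHERE clause
    # Spark lowers the partition count when the range is narrower than it.
    if upper_bound - lower_bound < num_partitions:
        num_partitions = upper_bound - lower_bound
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates: List[Optional[str]] = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)  # last partition is open-ended above
        elif lower is None:
            predicates.append(f"{upper} or {column} is null")  # first also takes NULLs
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


if __name__ == "__main__":
    preds = jdbc_partition_predicates("order_id", 1, 1_000_000, 20)
    print(len(preds), preds[0], preds[1], preds[-1], sep="\n")
```

With the settings above, each of the 20 partitions covers a stride of 50,000 ids, so a skewed id distribution produces skewed tasks; picking a column with roughly uniform values keeps partitions balanced.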

PySpark Job File

# /opt/spark/jobs/transform.py
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced import avoids shadowing built-in sum/max


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    return parser.parse_args()


def main():
    args = parse_args()
    input_root = args.input.rstrip('/')
    output_root = args.output.rstrip('/')

    spark = (
        SparkSession.builder
        .appName(f'transform_{args.date}')
        .getOrCreate()
    )

    try:
        # Read the staged orders for this run date
        orders = spark.read.parquet(f'{input_root}/orders/{args.date}')

        # Aggregate per customer
        aggregated = orders.groupBy('customer_id').agg(
            F.sum('amount').alias('total_amount'),
            F.count('*').alias('order_count'),
            F.max('order_date').alias('last_order_date'),
        )

        # Overwrite only this date's output so reruns are idempotent
        aggregated.write.mode('overwrite').parquet(f'{output_root}/fct_orders/{args.date}')
    finally:
        spark.stop()


if __name__ == '__main__':
    main()

Deferrable Spark Job Monitoring

SparkSubmitOperator holds a worker slot for as long as its spark-submit process runs. One way to get deferrable monitoring is the Apache Livy provider's LivyOperator (apache-airflow-providers-apache-livy plus a running Livy server), which submits a Spark batch and, with deferrable=True, hands status polling to the triggerer:

from datetime import timedelta

from airflow.providers.apache.livy.operators.livy import LivyOperator

transform_via_livy = LivyOperator(
    task_id='spark_transform_livy',
    livy_conn_id='livy_default',
    file='/opt/spark/jobs/transform.py',  # must be readable by the Livy server / cluster
    name='transform_job_{{ ds }}',
    args=[
        '--date', '{{ ds }}',
        '--input', 's3a://data-lake/staging/',
        '--output', 's3a://data-lake/processed/',
    ],
    conf={'spark.sql.adaptive.enabled': 'true'},
    driver_memory='4g',
    executor_memory='8g',
    num_executors=10,
    # Seconds between batch-state checks; a positive value makes the task wait for completion
    polling_interval=60,
    # Submit, then defer to LivyTrigger so the worker slot is released while the job runs
    deferrable=True,
    execution_timeout=timedelta(hours=2),
)
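Deferral frees the slot because waiting moves to the triggerer, which runs many lightweight async polling loops on one event loop instead of keeping one worker process blocked per job. The plain-asyncio sketch below illustrates such a polling loop; it is not Airflow's BaseTrigger API, the state names follow YARN's application states, and the status source is a hypothetical stand-in for a real API call.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# YARN application states that mean the application has stopped running.
TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED"}


async def poll_until_terminal(
    fetch_state: Callable[[], Awaitable[str]], poll_interval: float
) -> Dict[str, str]:
    """Poll an async status source until the job reaches a terminal state."""
    while True:
        state = await fetch_state()
        if state in TERMINAL_STATES:
            # Simplification: a real check would also read YARN's finalStatus,
            # because an application can be FINISHED with finalStatus FAILED.
            status = "success" if state == "FINISHED" else "error"
            return {"status": status, "state": state}
        await asyncio.sleep(poll_interval)  # yields to other loops instead of blocking


async def demo(states: List[str]) -> Dict[str, str]:
    state_iter = iter(states)

    async def fake_fetch() -> str:
        # Hypothetical stand-in for e.g. a ResourceManager REST request
        return next(state_iter)

    return await poll_until_terminal(fake_fetch, poll_interval=0.01)


if __name__ == "__main__":
    print(asyncio.run(demo(["ACCEPTED", "RUNNING", "FINISHED"])))
```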

Key Concepts Table

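| Operator | Purpose | Connection | Best For |
|---|---|---|---|
| SparkSubmitOperator | Submit any Spark application | conn_id (default spark_default) | PySpark, Scala, Java |
| SparkJDBCOperator | JDBC read/write | spark_conn_id plus jdbc_conn_id | Database transfers |
| SparkSqlOperator | Execute SQL queries | conn_id (default spark_sql_default) | Hive/Spark SQL |
| SparkKubernetesOperator (cncf.kubernetes provider) | Submit SparkApplication resources | kubernetes_conn_id (default kubernetes_default) | Spark on Kubernetes |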

Configuration Reference

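| Parameter | Description | YARN | Kubernetes |
|---|---|---|---|
| conn_id | Spark connection (its host sets the master) | yarn | k8s://https://<api>:6443 |
| deploy_mode | client or cluster | cluster | cluster |
| num_executors | Executor count | Fixed, or the initial count with dynamic allocation | Number of executor pods |
| executor_memory | RAM per executor | 4-16g | Sets executor pod memory (plus overhead) |
| executor_cores | CPU per executor | 2-8 | Sets executor pod CPU request |
| driver_memory | Driver RAM | 2-8g | 2-8g |
| packages | Maven coordinates | Downloaded on submit | Downloaded on submit |
| py_files | Python dependencies | Local or HDFS | In the image (local://) or a remote URI |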

Best Practices

Job Configuration

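  1. Use cluster deploy mode for production: client mode ties the driver to the Airflow worker.
  2. Enable dynamic allocation to optimize resource usage: spark.dynamicAllocation.enabled=true.
  3. For JDBC reads, set num_partitions together with partition_column, lower_bound, and upper_bound: too few partitions under-parallelize the read, and a single large partition can exhaust executor memory.
  4. Use packages for dependency management instead of pre-installing libraries on the cluster; the coordinates are resolved from Maven at submit time.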

Monitoring and Performance

  1. Monitor the Spark UI for job performance: check stages, shuffle, and spill metrics.
  2. Use deferrable operators for long-running Spark jobs to free worker slots.

Operational Guidelines

  1. Set execution_timeout on SparkSubmitOperator tasks to prevent indefinite hangs.
  2. Tag Spark applications with Airflow run IDs for traceability.

Spark Configuration Reference

| Configuration | Description | Recommended Value |
|---|---|---|
| spark.sql.shuffle.partitions | Number of shuffle partitions | 200 (default) |
| spark.sql.adaptive.enabled | Adaptive query execution | true (default since Spark 3.2) |
| spark.dynamicAllocation.enabled | Dynamic executor allocation | true |
| spark.dynamicAllocation.minExecutors | Minimum executors | 2 |
| spark.dynamicAllocation.maxExecutors | Maximum executors | 20 |
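When spark.dynamicAllocation.enabled is true, Spark starts with the largest of minExecutors, initialExecutors (which defaults to minExecutors), and --num-executors, then scales between the minimum and maximum. The sketch below models that rule so you can check how these settings combine with an operator's num_executors; the function name is hypothetical, and treating an initial count above the maximum as an error is a simplification.

```python
from typing import Dict, Optional

INT_MAX = 2**31 - 1  # Spark's default for spark.dynamicAllocation.maxExecutors


def initial_executors(conf: Dict[str, str], num_executors: Optional[int] = None) -> int:
    """Initial executor count under dynamic allocation (simplified model)."""
    min_e = int(conf.get("spark.dynamicAllocation.minExecutors", "0"))
    init_e = int(conf.get("spark.dynamicAllocation.initialExecutors", str(min_e)))
    max_e = int(conf.get("spark.dynamicAllocation.maxExecutors", str(INT_MAX)))
    # --num-executors (spark.executor.instances) wins when it is the largest value.
    initial = max(min_e, init_e, num_executors or 0)
    if initial > max_e:
        raise ValueError(f"initial executors {initial} exceed maxExecutors {max_e}")
    return initial


if __name__ == "__main__":
    table_conf = {
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "20",
    }
    print(initial_executors(table_conf, num_executors=10))
```

With the table's values, num_executors=10 means the job starts with 10 executors and may later shrink to 2 or grow to 20.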

For Spark on Kubernetes, configure the Spark connection to use k8s://https://<api-server>:6443 and set spark.kubernetes.authenticate.driver.serviceAccountName in conf. This avoids embedding credentials in the operator.
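A minimal sketch of the Kubernetes-specific conf entries, assuming a service account named spark-driver that already has RBAC permissions to create and watch pods in the job namespace, and a hypothetical image name. The namespace itself can come from the connection's namespace extra.

```python
from typing import Dict


def k8s_spark_conf(service_account: str, image: str) -> Dict[str, str]:
    """Spark-on-Kubernetes settings to merge into SparkSubmitOperator's conf (sketch)."""
    return {
        # The driver pod authenticates to the API server with this service account
        # when it creates and watches executor pods, so no credentials sit in the DAG.
        "spark.kubernetes.authenticate.driver.serviceAccountName": service_account,
        # Container image used for the driver and executor pods.
        "spark.kubernetes.container.image": image,
    }


# Hypothetical values; merge with the job's other Spark settings.
job_conf = {
    "spark.sql.adaptive.enabled": "true",
    **k8s_spark_conf("spark-driver", "registry.example.com/spark-jobs:latest"),
}
```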

SparkSubmitOperator builds the spark-submit command from operator arguments and connection settings, then runs it as a subprocess on the Airflow worker. In client mode the driver runs inside that process; in cluster mode spark-submit hands the driver to the cluster manager and the hook tracks the application's status. Ensure spark-submit is on the Airflow worker's PATH.
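As a rough illustration of that translation, the function below assembles a spark-submit argument list from operator-style settings. It is a simplified sketch, not SparkSubmitHook's actual code: the real hook supports many more options (keytab, principal, Kubernetes settings, environment variables) and may order flags differently. The function name and signature are hypothetical; the flags themselves are standard spark-submit options.

```python
import shlex
from typing import Dict, List, Optional, Sequence


def build_spark_submit_cmd(
    master: str,
    application: str,
    deploy_mode: Optional[str] = None,
    name: Optional[str] = None,
    conf: Optional[Dict[str, str]] = None,
    packages: Optional[str] = None,
    py_files: Optional[str] = None,
    num_executors: Optional[int] = None,
    executor_memory: Optional[str] = None,
    driver_memory: Optional[str] = None,
    application_args: Sequence[str] = (),
    spark_binary: str = "spark-submit",
) -> List[str]:
    """Assemble a spark-submit argument list (hypothetical, simplified helper)."""
    cmd = [spark_binary, "--master", master]
    # Each conf entry becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    optional_flags = [
        ("--packages", packages),
        ("--py-files", py_files),
        ("--num-executors", None if num_executors is None else str(num_executors)),
        ("--executor-memory", executor_memory),
        ("--driver-memory", driver_memory),
        ("--name", name),
        ("--deploy-mode", deploy_mode),
    ]
    for flag, value in optional_flags:
        if value:
            cmd += [flag, value]
    # The application goes after all options; everything after it is passed to the app.
    cmd.append(application)
    cmd.extend(application_args)
    return cmd


if __name__ == "__main__":
    cmd = build_spark_submit_cmd(
        master="yarn",
        deploy_mode="cluster",
        application="/opt/spark/jobs/extract.py",
        name="extract_job_2024-01-01",
        conf={"spark.sql.adaptive.enabled": "true"},
        num_executors=10,
        application_args=["--date", "2024-01-01"],
    )
    # subprocess.run(cmd) would launch it; here we only print the shell form.
    print(shlex.join(cmd))
```

Passing a list rather than a single shell string avoids quoting problems when templated values such as dates or paths contain spaces.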

Key Takeaways:

  • SparkSubmitOperator wraps spark-submit for PySpark, Scala, and Java applications
  • SparkJDBCOperator handles database reads/writes with partitioned parallel loading
  • Use cluster deploy mode for production to decouple the Spark driver from Airflow workers
  • Dynamic allocation and adaptive query execution optimize resource usage
  • Deferrable operators free worker slots during long-running Spark jobs by moving status polling to the triggerer
  • Configure spark-defaults.conf on the cluster for shared settings across jobs
