Apache Spark Provider Integration with Airflow
Formal Definitions
SparkSubmitOperator
The SparkSubmitOperator wraps the spark-submit CLI command, providing Airflow-native task execution for PySpark, Spark Scala, and Spark Java applications. It manages master URL construction, package dependencies, and configuration arguments.
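The following is a simplified sketch of how operator arguments can map onto spark-submit flags. The function name build_spark_submit_cmd is hypothetical, it covers only a few parameters, and it is not the provider's SparkSubmitHook code. The flags it emits (--master, --conf, --packages, --py-files, --executor-memory, --name, --deploy-mode) are standard spark-submit options.

```python
from typing import Optional


def build_spark_submit_cmd(
    master: str,
    application: str,
    deploy_mode: Optional[str] = None,
    name: Optional[str] = None,
    conf: Optional[dict[str, str]] = None,
    packages: Optional[str] = None,
    py_files: Optional[str] = None,
    executor_memory: Optional[str] = None,
    application_args: Optional[list[str]] = None,
) -> list[str]:
    """Assemble a spark-submit argv list (simplified, hypothetical illustration)."""
    cmd = ['spark-submit', '--master', master]
    # Each conf entry becomes its own "--conf key=value" pair.
    for key, value in (conf or {}).items():
        cmd += ['--conf', f'{key}={value}']
    if packages:
        cmd += ['--packages', packages]
    if py_files:
        cmd += ['--py-files', py_files]
    if executor_memory:
        cmd += ['--executor-memory', executor_memory]
    if name:
        cmd += ['--name', name]
    if deploy_mode:
        cmd += ['--deploy-mode', deploy_mode]
    # The application path comes last, followed by the application's own arguments.
    cmd.append(application)
    cmd += application_args or []
    return cmd
```

Building an argv list rather than a shell string lets it be passed to subprocess.run without any shell quoting.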
Spark Connection
A Spark connection in Airflow specifies the cluster endpoint (Host, which becomes the Spark master URL), the deploy mode (the deploy-mode key in Extra), and other cluster-specific settings such as the YARN queue or Kubernetes namespace. Because it abstracts this configuration, the same operator code can target YARN, Kubernetes, and standalone deployments.
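The sketch below shows how a single connection record can drive different cluster types. The names resolve_spark_conn and SparkConnSettings are hypothetical. The extra keys (deploy-mode, queue, namespace) follow the connection fields described here, but the real hook's resolution logic handles more cases.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class SparkConnSettings:
    master: str
    deploy_mode: Optional[str]
    queue: Optional[str]
    namespace: Optional[str]


def resolve_spark_conn(host: str, port: Optional[int], extra_json: str) -> SparkConnSettings:
    """Turn a connection's Host/Port/Extra fields into submit settings (hypothetical helper)."""
    extra = json.loads(extra_json) if extra_json else {}
    # A port is appended to the host (e.g. spark://master:7077); "yarn" needs none.
    master = f'{host}:{port}' if port else host
    return SparkConnSettings(
        master=master,
        deploy_mode=extra.get('deploy-mode'),
        queue=extra.get('queue'),
        # A namespace is only meaningful when the master is a Kubernetes API server.
        namespace=extra.get('namespace') if master.startswith('k8s://') else None,
    )
```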
Deploy Mode
Deploy mode determines where the Spark driver runs: client (driver on the Airflow worker) or cluster (driver on the cluster). In cluster mode, the job depends less on the Airflow worker, because the driver no longer runs inside the worker's spark-submit process.
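A small check can make the deploy-mode choice explicit before anything is submitted. The function driver_location is a hypothetical illustration. Beyond the client/cluster split, it encodes one rule taken from the Spark documentation: standalone clusters do not support cluster deploy mode for Python applications.

```python
def driver_location(master: str, deploy_mode: str, application: str) -> str:
    """Return where the Spark driver runs, rejecting an unsupported combination."""
    if deploy_mode not in ('client', 'cluster'):
        raise ValueError(f'deploy_mode must be client or cluster, got {deploy_mode!r}')
    if deploy_mode == 'client':
        # The driver lives inside the spark-submit process on the Airflow worker.
        return 'airflow-worker'
    # Spark's standalone manager does not support cluster mode for Python apps.
    if master.startswith('spark://') and application.endswith('.py'):
        raise ValueError('standalone clusters do not support cluster mode for Python applications')
    return 'cluster'
```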
Detailed Explanation
Spark Integration Overview
The Spark provider enables Airflow to submit and monitor Spark jobs on YARN, Kubernetes, and standalone clusters. SparkSubmitOperator (and SparkJDBCOperator, which builds on it) wraps the spark-submit CLI. SparkSqlOperator wraps the spark-sql CLI.
Key Insight: Use cluster deploy mode for production to decouple the Spark driver from Airflow workers. This prevents worker failures from killing Spark jobs.
Operator Selection Guide
| Operator | Use Case | Best For |
|---|---|---|
| SparkSubmitOperator | Submit any Spark app | PySpark, Scala, Java |
| SparkJDBCOperator | JDBC read/write | Database transfers |
| SparkSqlOperator | Execute SQL queries | Hive/Spark SQL |
| SparkKubernetesOperator (cncf.kubernetes provider) | Submit SparkApplication resources | Spark on Kubernetes via the Spark Operator |
Deploy Mode Comparison
| Mode | Driver Location | Worker Dependency | Best For |
|---|---|---|---|
| Client | Airflow worker | High | Development, debugging |
| Cluster | Spark cluster | Low | Production, long jobs |
Connection Setup
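# Airflow connection for Spark
# Connection ID: spark_default
# Connection Type: Spark
# Host: yarn (or k8s://https://<api>:6443 or spark://<host>:7077)
# Extra: {
#   "deploy-mode": "cluster",
#   "spark-binary": "spark-submit",
#   "namespace": "spark-jobs"
# }
# Notes:
# - "namespace" applies only to Kubernetes masters; use "queue" for a YARN queue.
# - Recent provider versions no longer support the old "spark-home" extra;
#   spark-submit must be on the Airflow worker's PATH instead.
# - SparkApplication manifests (job templates) are not Spark connection extras;
#   they are submitted with SparkKubernetesOperator from the cncf.kubernetes provider.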
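You can also define the same connection as an environment variable instead of through the UI. Airflow 2.3 and later accept a JSON-serialized connection in AIRFLOW_CONN_<CONN_ID>. The helper spark_conn_env_var is a hypothetical convenience, and the queue value 'etl' is made up for illustration.

```python
import json


def spark_conn_env_var(conn_id: str, host: str, extra: dict) -> tuple[str, str]:
    """Return (name, value) for defining an Airflow connection via environment variable."""
    name = f'AIRFLOW_CONN_{conn_id.upper()}'
    value = json.dumps({'conn_type': 'spark', 'host': host, 'extra': extra})
    return name, value


name, value = spark_conn_env_var(
    'spark_default',
    'yarn',
    {'deploy-mode': 'cluster', 'spark-binary': 'spark-submit', 'queue': 'etl'},  # 'etl' is hypothetical
)
print(f"export {name}='{value}'")
```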
Basic SparkSubmitOperator
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='spark_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # replaces the deprecated schedule_interval (Airflow 2.4+)
    catchup=False,
    tags=['spark', 'etl'],
) as dag:
    # Extract: raw -> staging. The executor count is left to dynamic allocation.
    submit_extract = SparkSubmitOperator(
        task_id='spark_extract',
        conn_id='spark_default',
        application='/opt/spark/jobs/extract.py',
        name='extract_job_{{ ds }}',  # templated: one Spark app name per logical date
        application_args=[
            '--date', '{{ ds }}',
            '--source', 's3://data-lake/raw/',
            '--target', 's3://data-lake/staging/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
            'spark.dynamicAllocation.enabled': 'true',
            'spark.dynamicAllocation.minExecutors': '2',
            'spark.dynamicAllocation.maxExecutors': '20',
        },
        # hadoop-aws provides the s3a:// filesystem; plain s3:// URIs rely on EMRFS (Amazon EMR).
        packages='org.apache.hadoop:hadoop-aws:3.3.4',
        py_files='/opt/spark/lib/utils.py',
        driver_memory='4g',
        executor_memory='8g',
        executor_cores=4,
        # num_executors is omitted: with dynamic allocation, min/maxExecutors bound the count.
        verbose=False,
        trigger_rule='all_success',
        on_failure_callback=None,
        # SparkSubmitOperator has no timeout argument; use BaseOperator's execution_timeout.
        execution_timeout=timedelta(hours=1),
    )

    # Transform: staging -> processed, with a fixed executor count.
    submit_transform = SparkSubmitOperator(
        task_id='spark_transform',
        conn_id='spark_default',
        application='/opt/spark/jobs/transform.py',
        name='transform_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            '--input', 's3://data-lake/staging/',
            '--output', 's3://data-lake/processed/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
        },
        driver_memory='4g',
        executor_memory='8g',
        num_executors=10,
    )

    submit_extract >> submit_transform
SparkJDBCOperator
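from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Copy the orders table from PostgreSQL into a metastore (Hive) table.
# The JDBC URL and credentials are read from the Airflow connection named by
# jdbc_conn_id (e.g. Host jdbc:postgresql://db-host, Port 5432, Schema
# warehouse, plus Login/Password), so no secrets appear in the DAG.
# The PostgreSQL driver jar must be available to Spark (e.g. via spark_jars).
jdbc_extract = SparkJDBCOperator(
    task_id='jdbc_extract',
    spark_conn_id='spark_default',
    jdbc_conn_id='postgres_warehouse',  # hypothetical connection ID
    cmd_type='jdbc_to_spark',  # read from the database into Spark
    jdbc_table='orders',
    jdbc_driver='org.postgresql.Driver',
    metastore_table='staging.orders',  # hypothetical target table
    save_mode='overwrite',
    save_format='parquet',
    fetch_size=10000,  # batch_size applies only when writing (cmd_type='spark_to_jdbc')
    # Parallel read: 20 range partitions over order_id. The bounds set the
    # partition stride only; rows outside them are still read.
    num_partitions=20,
    partition_column='order_id',
    lower_bound='1',
    upper_bound='1000000',
    driver_memory='4g',
    executor_memory='8g',
    num_executors=5,
)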
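To see what num_partitions, partition_column, lower_bound and upper_bound do, this sketch approximates how Spark's JDBC source splits the range into one WHERE clause per partition. It is simplified: it assumes non-negative integer bounds and at least two partitions, and it skips Spark's adjustments for very narrow ranges. The function name is hypothetical. The output shows that the bounds only set the stride and do not filter rows.

```python
from typing import Optional


def jdbc_partition_predicates(column: str, lower: int, upper: int, num_partitions: int) -> list[str]:
    """Approximate Spark's range partitioning of a JDBC read into WHERE clauses."""
    if num_partitions < 2:
        raise ValueError('this sketch needs at least two partitions')
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lbound: Optional[str] = f'{column} >= {current}' if i != 0 else None
        current += stride
        ubound: Optional[str] = f'{column} < {current}' if i != num_partitions - 1 else None
        if lbound is None:
            # First partition also picks up NULLs and everything below the first boundary.
            predicates.append(f'{ubound} or {column} is null')
        elif ubound is None:
            # Last partition is open-ended, so rows above upper_bound are still read.
            predicates.append(lbound)
        else:
            predicates.append(f'{lbound} AND {ubound}')
    return predicates


for predicate in jdbc_partition_predicates('order_id', 1, 1000000, 20)[:2]:
    print(predicate)
```

With the operator's settings, each partition covers 50000 order IDs. The first partition also absorbs NULLs, and the last one has no upper limit.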
PySpark Job File
# /opt/spark/jobs/transform.py
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum/max


def parse_args():
    # Receives the operator's application_args, e.g. --date 2024-01-01
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    return parser.parse_args()


def main():
    args = parse_args()
    spark = (
        SparkSession.builder
        .appName(f'transform_{args.date}')
        .getOrCreate()
    )
    input_root = args.input.rstrip('/')
    output_root = args.output.rstrip('/')

    # Read the staged orders for this logical date
    orders = spark.read.parquet(f'{input_root}/orders/{args.date}')

    # Aggregate per customer
    aggregated = orders.groupBy('customer_id').agg(
        F.sum('amount').alias('total_amount'),
        F.count('*').alias('order_count'),
        F.max('order_date').alias('last_order_date'),
    )

    # Overwrite this date's output path so reruns are idempotent
    aggregated.write.mode('overwrite').parquet(f'{output_root}/fct_orders/{args.date}')
    spark.stop()


if __name__ == '__main__':
    main()
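The job reads its inputs from the command line, so you can test the argument handling without Spark. This sketch uses a variant of parse_args that accepts an explicit argv list (parse_job_args is a hypothetical name). It feeds that function the application_args the spark_transform task would render for 2024-01-01. spark-submit forwards everything after the application path to the script unchanged.

```python
import argparse
from typing import Optional


def parse_job_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    # Same options as transform.py; argv=None falls back to sys.argv[1:].
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    return parser.parse_args(argv)


# The rendered application_args of the spark_transform task for ds=2024-01-01.
args = parse_job_args([
    '--date', '2024-01-01',
    '--input', 's3://data-lake/staging/',
    '--output', 's3://data-lake/processed/',
])
output_path = f"{args.output.rstrip('/')}/fct_orders/{args.date}"
print(output_path)  # s3://data-lake/processed/fct_orders/2024-01-01
```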
Deferrable Spark Job Monitoring
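# NOTE: the apache-spark provider's SparkSubmitOperator has no deferrable mode,
# and this example does not rely on a provider-supplied trigger. The trigger
# class and the submit helper below are hypothetical; you must implement them
# yourself (for example by polling the YARN ResourceManager REST API).
from typing import Any

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

from my_plugins.triggers import SparkAppStatusTrigger  # hypothetical custom trigger


class DeferredSparkSubmitOperator(SparkSubmitOperator):
    """
    SparkSubmitOperator that defers after submitting the job.
    The worker slot is released while the triggerer monitors the application.
    """

    def __init__(self, *, poll_interval: int = 60, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.poll_interval = poll_interval

    def submit_and_get_app_id(self, context: Any) -> str:
        """Hypothetical: submit in cluster mode without waiting (e.g. with
        spark.yarn.submit.waitAppCompletion=false) and return the application ID."""
        raise NotImplementedError

    def execute(self, context: Any) -> None:
        application_id = self.submit_and_get_app_id(context)
        # Hand monitoring to the triggerer; execute_complete runs when the trigger fires.
        self.defer(
            trigger=SparkAppStatusTrigger(
                application_id=application_id,
                conn_id=self._conn_id,
                poll_interval=self.poll_interval,
            ),
            method_name='execute_complete',
            timeout=self.execution_timeout,
        )

    def execute_complete(self, context: Any, event: dict[str, Any]) -> None:
        # The trigger's event payload decides success or failure.
        if event['status'] == 'error':
            raise RuntimeError(f"Spark job failed: {event.get('message')}")
        self.log.info('Spark job completed: %s', event.get('application_id'))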
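Whatever trigger the operator defers to, its job is a non-blocking polling loop. The sketch below shows that loop as a plain async generator, the same shape as a trigger's run() method. It uses no Airflow classes, so it runs standalone. fetch_state is an injected coroutine standing in for a real cluster API call. The state names are assumed to be YARN's finalStatus values (UNDEFINED while running).

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

# Assumed terminal states, modeled on YARN's finalStatus values.
TERMINAL_STATES = {'SUCCEEDED': 'success', 'FAILED': 'error', 'KILLED': 'error'}


async def poll_until_done(
    application_id: str,
    fetch_state: Callable[[str], Awaitable[str]],
    poll_interval: float,
) -> AsyncIterator[dict[str, Any]]:
    """Poll an application's state and yield one event once it is terminal."""
    while True:
        state = await fetch_state(application_id)
        if state in TERMINAL_STATES:
            yield {
                'status': TERMINAL_STATES[state],
                'application_id': application_id,
                'message': f'final state {state}',
            }
            return
        # Non-blocking sleep: the event loop can serve other triggers meanwhile.
        await asyncio.sleep(poll_interval)


async def demo() -> list[dict[str, Any]]:
    # Fake status source standing in for a real cluster API call.
    states = iter(['UNDEFINED', 'UNDEFINED', 'SUCCEEDED'])

    async def fake_fetch(app_id: str) -> str:
        return next(states)

    return [event async for event in poll_until_done('application_1_0001', fake_fetch, 0)]


events = asyncio.run(demo())
print(events)
```

The 'status' values match what execute_complete checks, so a failed or killed application surfaces as a task failure.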
Key Concepts Table
| Operator | Purpose | Connection | Best For |
|---|---|---|---|
| SparkSubmitOperator | Submit any Spark app | spark_default | PySpark, Scala, Java |
| SparkJDBCOperator | JDBC read/write | spark_default plus a JDBC connection | Database transfers |
| SparkSqlOperator | Execute SQL queries | spark_sql_default | Hive/Spark SQL |
| SparkKubernetesOperator (cncf.kubernetes provider) | Submit SparkApplication resources | kubernetes_default | Spark on Kubernetes via the Spark Operator |
Configuration Reference
| Parameter | Description | Typical YARN value | Typical Kubernetes value |
|---|---|---|---|
| conn_id | Spark connection (its Host sets the master) | yarn | k8s://https://<api>:6443 |
| deploy_mode | client or cluster | cluster | cluster |
| num_executors | Executor count | Fixed, or bounded by dynamic allocation | Number of executor pods |
| executor_memory | Memory per executor | 4-16g | Sizes the executor pod memory (plus overhead) |
| executor_cores | CPU cores per executor | 2-8 | Sets the executor pod CPU request |
| driver_memory | Driver memory | 2-8g | 2-8g |
| packages | Maven coordinates | Resolved and downloaded at submit time | Resolved and downloaded at submit time |
| py_files | Python dependencies | Local or HDFS | Mounted volumes |
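A sizing note for executor_memory: the container or pod that the cluster allocates is larger than the executor heap. This sketch applies Spark's default overhead rule when spark.executor.memoryOverhead is not set: 10% of executor memory, with a floor of 384 MiB. parse_mem_mib and container_request_mib are hypothetical helpers. YARN may also round the result up to its allocation increment, and some Spark versions use a higher factor for PySpark on Kubernetes.

```python
def parse_mem_mib(value: str) -> int:
    """Convert Spark-style memory strings such as '512m' or '8g' to MiB."""
    units = {'m': 1, 'g': 1024, 't': 1024 * 1024}
    value = value.strip().lower()
    return int(value[:-1]) * units[value[-1]]


def container_request_mib(
    executor_memory: str,
    overhead_factor: float = 0.10,
    min_overhead_mib: int = 384,
) -> int:
    """Executor memory plus Spark's default memory overhead, in MiB."""
    heap = parse_mem_mib(executor_memory)
    overhead = max(min_overhead_mib, int(heap * overhead_factor))
    return heap + overhead


print(container_request_mib('8g'))  # 9011 (8192 heap + 819 overhead)
print(container_request_mib('2g'))  # 2432 (2048 heap + 384 minimum overhead)
```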
Best Practices
Job Configuration
- Use cluster deploy mode for production: client mode ties the driver to the Airflow worker.
- Enable dynamic allocation to optimize resource usage: spark.dynamicAllocation.enabled=true. It also needs either an external shuffle service or spark.dynamicAllocation.shuffleTracking.enabled=true.
- Set num_partitions appropriately for JDBC reads to avoid OOM errors or under-parallelization.
- Use packages for dependency management instead of pre-installing libraries on the cluster.
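The dynamic-allocation bullet maps directly onto conf values. This hypothetical helper builds the string-valued dict that SparkSubmitOperator's conf argument expects and checks that the bounds are consistent. By default it also enables shuffle tracking (Spark 3.0+) for clusters without an external shuffle service. Pass shuffle_tracking=False where that service is configured.

```python
def dynamic_allocation_conf(
    min_executors: int,
    max_executors: int,
    shuffle_tracking: bool = True,
) -> dict[str, str]:
    """Build string-valued Spark conf entries for dynamic allocation (hypothetical helper)."""
    if not 0 <= min_executors <= max_executors:
        raise ValueError('need 0 <= min_executors <= max_executors')
    conf = {
        'spark.dynamicAllocation.enabled': 'true',
        'spark.dynamicAllocation.minExecutors': str(min_executors),
        'spark.dynamicAllocation.maxExecutors': str(max_executors),
    }
    if shuffle_tracking:
        # Enables dynamic allocation without an external shuffle service.
        conf['spark.dynamicAllocation.shuffleTracking.enabled'] = 'true'
    return conf


# Merge with other settings before passing conf=... to SparkSubmitOperator.
conf = {'spark.sql.adaptive.enabled': 'true', **dynamic_allocation_conf(2, 20)}
```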
Monitoring and Performance
- Monitor the Spark UI for job performance: check stages, shuffle, and spill metrics.
- For long-running Spark jobs, consider a custom deferrable operator to free worker slots; the provider's SparkSubmitOperator holds a worker slot for the whole run.
Operational Guidelines
- Set execution_timeout (a timedelta inherited from BaseOperator) on SparkSubmitOperator to prevent indefinite hangs.
- Tag Spark applications with Airflow run IDs for traceability.
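One way to apply the traceability guideline is to derive the Spark application name and a YARN tag from dag_id, task_id and run_id. spark_app_identity is a hypothetical helper, and the naming scheme is only a suggestion. spark.yarn.tags is a standard Spark-on-YARN setting. In a DAG, the same identifiers are available through Jinja, e.g. {{ run_id }} in the templated name argument.

```python
import re


def spark_app_identity(dag_id: str, task_id: str, run_id: str) -> dict[str, str]:
    """Derive a traceable Spark app name and YARN tag from Airflow identifiers."""
    # run_ids such as 'scheduled__2024-01-01T00:00:00+00:00' contain ':' and '+';
    # replace anything outside a conservative safe set with '_'.
    safe_run = re.sub(r'[^A-Za-z0-9_.-]', '_', run_id)
    return {
        'name': f'{dag_id}.{task_id}.{safe_run}',
        # spark.yarn.tags attaches comma-separated tags to the YARN application.
        'spark.yarn.tags': f'airflow,{dag_id},{safe_run}',
    }


ident = spark_app_identity('spark_etl_pipeline', 'spark_extract', 'scheduled__2024-01-01T00:00:00+00:00')
print(ident['name'])  # spark_etl_pipeline.spark_extract.scheduled__2024-01-01T00_00_00_00_00
```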
Spark Configuration Reference
| Configuration | Description | Recommended Value |
|---|---|---|
| spark.sql.shuffle.partitions | Number of shuffle partitions | 200 (default) |
| spark.sql.adaptive.enabled | Adaptive query execution | true |
| spark.dynamicAllocation.enabled | Dynamic executor allocation | true |
| spark.dynamicAllocation.minExecutors | Minimum executors | 2 |
| spark.dynamicAllocation.maxExecutors | Maximum executors | 20 |
For Spark on Kubernetes, configure the Spark connection to use k8s://https://<api-server>:6443 and set spark.kubernetes.authenticate.driver.serviceAccountName in conf. This avoids embedding credentials in the operator.
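Here is a sketch of the conf entries this setup implies, assuming the service account and image names shown (both hypothetical). spark.kubernetes.namespace, spark.kubernetes.authenticate.driver.serviceAccountName and spark.kubernetes.container.image are standard Spark-on-Kubernetes settings. The helper k8s_submit_conf itself is illustrative.

```python
def k8s_submit_conf(master: str, namespace: str, service_account: str, image: str) -> dict[str, str]:
    """Spark-on-Kubernetes conf entries; the driver pod authenticates via a service account."""
    if not master.startswith('k8s://'):
        raise ValueError(f'expected a k8s:// master URL, got {master!r}')
    return {
        'spark.kubernetes.namespace': namespace,
        'spark.kubernetes.authenticate.driver.serviceAccountName': service_account,
        'spark.kubernetes.container.image': image,
    }


conf = k8s_submit_conf(
    master='k8s://https://api.example.com:6443',  # hypothetical API server
    namespace='spark-jobs',
    service_account='spark-driver',  # hypothetical service account
    image='registry.example.com/spark:3.5.1',  # hypothetical image
)
```

No token or kubeconfig appears in the DAG: the driver pod gets its API permissions from the service account's RBAC bindings.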
SparkSubmitOperator builds the spark-submit command from operator arguments and connection settings. In both deploy modes, spark-submit runs as a subprocess on the Airflow worker. In client mode the driver runs inside that process; in cluster mode the cluster manager launches the driver on the cluster. Ensure spark-submit (or the configured spark-binary) is on the Airflow worker's PATH.
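A preflight check can catch a missing binary on the worker before a task fails mid-run. Both helpers are hypothetical. shutil.which searches PATH much as the subprocess launch will.

```python
import shutil
from typing import Optional


def find_spark_binary(binary: str = 'spark-submit') -> Optional[str]:
    """Return the absolute path of the binary, or None if it is not on PATH."""
    return shutil.which(binary)


def preflight(binary: str = 'spark-submit') -> str:
    """Fail fast with a clear message when the binary is missing."""
    path = find_spark_binary(binary)
    if path is None:
        raise RuntimeError(f'{binary} not found on PATH; install Spark on the Airflow worker image')
    return path
```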
Key Takeaways:
- SparkSubmitOperator wraps spark-submit for PySpark, Scala, and Java applications
- SparkJDBCOperator handles database reads/writes with partitioned parallel loading
- Use cluster deploy mode for production to decouple the Spark driver from Airflow workers
- Dynamic allocation and adaptive query execution optimize resource usage
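- A custom deferrable operator built on SparkSubmitOperator can free worker slots during long-running jobs
- Put shared settings in spark-defaults.conf on the host where spark-submit runs (SPARK_CONF_DIR on the Airflow worker), since spark-submit reads it at submit time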
See Also
- Databricks Provider: Databricks-managed Spark clusters
- BigQuery Provider: Google BigQuery integration
- Snowflake Provider: Snowflake data warehouse integration
- Operators and Hooks: operator lifecycle and hook architecture