
Databricks Provider Integration with Airflow


[Figure: Architecture diagram of the Airflow-Databricks integration. The Airflow scheduler calls the Databricks REST API (jobs and clusters) through DatabricksHook, which handles API authentication. Jobs run on a cluster (compute layer) on top of a data lake (S3 / ADLS / GCS). Operators shown: SubmitRunOperator (submit Spark jobs), NotebookOperator (run notebooks), RunNowOperator (trigger existing jobs), JobSensor (wait for job completion). Caption: use DatabricksSubmitRun for new jobs; RunNow for existing job triggers.]

Formal Definitions

Definition: Databricks Provider

The Databricks provider (apache-airflow-providers-databricks) is a collection of operators, hooks, triggers, and sensors that integrate Apache Airflow with Databricks. It manages cluster lifecycle, job submission, notebook execution, and data movement operations through the Databricks REST API.

Definition: Databricks Hook

The DatabricksHook encapsulates authentication (token or OAuth), API versioning, retry logic, and HTTP client management for Databricks REST API calls. It provides a unified interface for all Databricks operations.
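
To make the hook's responsibilities concrete, here is a minimal standard-library sketch of two of them: building bearer-token headers and retrying a failed call with backoff. It is not the provider's implementation; the names build_auth_headers and call_with_retries, the choice of ConnectionError as the retryable error, and the exponential backoff schedule are all assumptions made for illustration.

```python
import time
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


def build_auth_headers(token: str) -> Dict[str, str]:
    """Return HTTP headers for bearer-token authentication."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }


def call_with_retries(
    request: Callable[[], T],
    retry_limit: int = 3,
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `request`, making at most `retry_limit` attempts.

    Only ConnectionError is treated as retryable (an assumption of this
    sketch); any other exception propagates immediately.
    """
    if retry_limit < 1:
        raise ValueError("retry_limit must be at least 1")
    for attempt in range(1, retry_limit + 1):
        try:
            return request()
        except ConnectionError:
            if attempt == retry_limit:
                raise  # out of attempts: surface the last error
            # Exponential backoff: the delay doubles after each failure.
            sleep(retry_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

In a DAG you would not write this yourself; the point is only that operators delegate authentication and retry behaviour to the hook, so they can be configured in one place.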

Detailed Explanation

Databricks Integration Overview

The Databricks provider enables Airflow to orchestrate Spark jobs, notebooks, and cluster operations on Databricks. It abstracts the complexity of the Databricks REST API.

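Key Insight: For long-running jobs (>30 min), consider defining the job in Databricks and triggering it with DatabricksRunNowOperator. The job definition, its timeout, and its run history then live in Databricks, and the Airflow task only triggers and monitors the run. If you use DatabricksSubmitRunOperator instead, set timeout_seconds explicitly so that it covers the expected run time.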

Operator Selection Guide

| Operator | Use Case | Creates Cluster | Polling |
|---|---|---|---|
| DatabricksSubmitRunOperator | Submit new job | Yes or existing | Yes |
| DatabricksNotebookOperator | Run notebook | Yes or existing | Yes |
| DatabricksRunNowOperator | Trigger existing job | No | Optional |
| DatabricksJobSensor | Wait for job | No | Yes |
| DatabricksClusterLifeCycleOperator | Start/stop cluster | No | Yes |

Connection Setup

# Airflow connection configuration
# Connection ID: databricks_default
# Connection Type: Databricks
# Host: https://<databricks-instance>.databricks.com  (your full workspace URL)
# Login: leave empty, or set to "token", when authenticating with a PAT
# Password: <personal-access-token>
# Extra (alternative to Password): {"token": "<personal-access-token>"}
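
The same connection can also be supplied without the UI. The sketch below assumes Airflow 2.3 or later, which accepts JSON-serialized connections in AIRFLOW_CONN_<CONN_ID> environment variables; the helper name databricks_conn_env and the placeholder values are hypothetical, and in practice the token would come from a secret store rather than from source code.

```python
import json
import os


def databricks_conn_env(host: str, token: str) -> str:
    """Serialize a Databricks connection in Airflow's JSON connection format."""
    return json.dumps(
        {
            "conn_type": "databricks",
            "host": host,
            "password": token,  # the personal access token
        }
    )


# Placeholder values for illustration only.
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = databricks_conn_env(
    host="https://<databricks-instance>",
    token="<personal-access-token>",
)
```

Airflow resolves the connection ID databricks_default from this variable at runtime, so operators that use databricks_conn_id='databricks_default' need no further changes.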

DatabricksSubmitRunOperator

The primary operator for submitting new Databricks jobs. It creates a new cluster or uses an existing one, submits the job, and monitors until completion.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)

with DAG(
    dag_id='databricks_spark_job',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+)
    catchup=False,
    tags=['databricks', 'spark'],
) as dag:

    submit_spark = DatabricksSubmitRunOperator(
        task_id='submit_spark_job',
        databricks_conn_id='databricks_default',
        new_cluster={
            'spark_version': '13.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 4,
            'spark_conf': {
                'spark.sql.shuffle.partitions': '200',
                'spark.sql.adaptive.enabled': 'true',
            },
            'custom_tags': {
                'team': 'data-engineering',
                'project': 'etl-pipeline',
            },
        },
        spark_jar_task={
            'main_class_name': 'com.company.ETLJob',
            'parameters': [
                '--date', '{{ ds }}',
                '--source', 's3://data-lake/raw/',
                '--target', 's3://data-lake/processed/',
            ],
        },
        libraries=[
            {'jar': 's3://jars/company-etl-1.0.jar'},
            {'pypi': {'package': 'pyspark==3.5.0'}},
        ],
        timeout_seconds=3600,
        polling_period_seconds=30,
        trigger_rule='all_success',
    )

    submit_spark
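
Because a run targets either a new cluster or an existing one, it can help to validate that choice before the operator is built. The following is a small sketch of such a check; cluster_kwargs is a hypothetical helper, not part of the provider.

```python
from typing import Any, Dict, Optional


def cluster_kwargs(
    existing_cluster_id: Optional[str] = None,
    new_cluster: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return exactly one cluster argument for an operator, or raise ValueError."""
    # True when both are given or when neither is given.
    if (existing_cluster_id is None) == (new_cluster is None):
        raise ValueError(
            "Specify exactly one of existing_cluster_id or new_cluster"
        )
    if existing_cluster_id is not None:
        return {"existing_cluster_id": existing_cluster_id}
    return {"new_cluster": new_cluster}
```

The result can be unpacked into the operator call, for example DatabricksSubmitRunOperator(task_id='submit_spark_job', **cluster_kwargs(existing_cluster_id='0123-456789-abcdef'), ...), so a misconfigured DAG fails at parse time rather than when the run is submitted.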

DatabricksNotebookOperator

Execute Databricks notebooks as part of Airflow workflows.

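from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
    DatabricksNotebookOperator,
)

run_notebook = DatabricksNotebookOperator(
    task_id='run_data_processing',
    databricks_conn_id='databricks_default',
    # Absolute workspace path of the notebook. source='WORKSPACE' reads it
    # from the workspace rather than from a remote Git repository.
    notebook_path='/Repos/team/notebooks/data_processing',
    source='WORKSPACE',
    # Parameters passed to the notebook (templated with Jinja).
    notebook_params={
        'date': '{{ ds }}',
        'run_type': 'scheduled',
    },
    new_cluster={
        'spark_version': '13.3.x-scala2.12',
        'node_type_id': 'Standard_DS3_v2',
        'num_workers': 2,
    },
    # Standard Airflow task timeout (30 minutes).
    execution_timeout=timedelta(seconds=1800),
)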

DatabricksRunNowOperator

Trigger an existing Databricks job by ID.

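from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

trigger_existing_job = DatabricksRunNowOperator(
    task_id='trigger_etl_job',
    databricks_conn_id='databricks_default',
    job_id=123456789,
    # Use the parameter field that matches the job's task type
    # (notebook_params, python_params, jar_params, ...).
    notebook_params={
        'date': '{{ ds }}',
        'overwrite': 'true',
    },
    wait_for_termination=True,   # block until the run finishes
    polling_period_seconds=30,   # seconds between status checks
    # Standard Airflow task timeout (1 hour).
    execution_timeout=timedelta(seconds=3600),
)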
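
Under the hood, triggering an existing job amounts to a request against the Jobs API run-now endpoint with the job ID and its parameters. The sketch below only builds such a request body and does not send it; build_run_now_payload and to_param are hypothetical helpers, and rendering booleans as lowercase strings is a convention chosen for this example, not a Databricks requirement.

```python
import json
from typing import Any, Dict, Mapping


def to_param(value: Any) -> str:
    """Render a parameter value as a string (booleans become 'true'/'false')."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_run_now_payload(
    job_id: int, notebook_params: Mapping[str, Any]
) -> Dict[str, Any]:
    """Build a request body for triggering an existing job with parameters."""
    return {
        "job_id": job_id,
        # Notebook parameters are string-to-string, so coerce every value.
        "notebook_params": {
            str(key): to_param(value) for key, value in notebook_params.items()
        },
    }


payload = build_run_now_payload(
    123456789, {"date": "2024-01-01", "overwrite": True}
)
print(json.dumps(payload, indent=2))
```

Coercing values up front avoids surprises when a Python bool or int is passed where the job expects a string.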

Sensor for Job Completion

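# Confirm that your installed provider version exposes this sensor and
# module path before relying on it.
from airflow.providers.databricks.sensors.databricks import (
    DatabricksJobSensor,
)

wait_for_job = DatabricksJobSensor(
    task_id='wait_for_databricks_job',
    databricks_conn_id='databricks_default',
    # run_id is pushed to XCom by the upstream submit task. That task should
    # set wait_for_termination=False, otherwise it already waits for the run.
    run_id='{{ ti.xcom_pull(task_ids="submit_spark_job", key="run_id") }}',
    poke_interval=30,      # seconds between checks
    timeout=3600,          # give up after one hour
    mode='reschedule',     # release the worker slot between checks
)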
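
The logic a job sensor applies on each check can be sketched without Airflow. A Databricks run reports a life_cycle_state and, once terminated, a result_state; the loop below polls an injected get_state callable until the run is terminal. The function names are hypothetical, and a sensor in reschedule mode does not loop in-process like this (it releases its slot between checks), so treat it as an illustration of the state handling only.

```python
import time
from typing import Callable, Dict

# Life-cycle states after which a run will not change any more.
TERMINAL_LIFE_CYCLE_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def is_terminal(state: Dict[str, str]) -> bool:
    """True once the run has stopped, successfully or not."""
    return state.get("life_cycle_state") in TERMINAL_LIFE_CYCLE_STATES


def is_successful(state: Dict[str, str]) -> bool:
    """True only for a terminated run whose result is SUCCESS."""
    return (
        state.get("life_cycle_state") == "TERMINATED"
        and state.get("result_state") == "SUCCESS"
    )


def wait_for_run(
    get_state: Callable[[], Dict[str, str]],
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Poll `get_state` until the run is terminal; raise on failure or timeout."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if is_terminal(state):
            if not is_successful(state):
                raise RuntimeError(f"Run ended unsuccessfully: {state}")
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still not finished after {timeout} seconds")
        sleep(poke_interval)
```

Separating the state checks from the waiting loop keeps the failure rules in one place, whichever component ends up doing the polling.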

Full ETL Pipeline

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
    DatabricksRunNowOperator,
)
from airflow.providers.databricks.sensors.databricks import (
    DatabricksJobSensor,
)
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


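def on_job_failure(context):
    # Failure callback: report the failing task and its exception to Slack.
    task_id = context['task_instance'].task_id
    SlackWebhookOperator(
        task_id='alert',
        slack_webhook_conn_id='slack_webhook',
        message=f'Databricks task {task_id} failed: {context.get("exception")}',
    ).execute(context=context)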


with DAG(
    dag_id='databricks_full_etl',
    start_date=datetime(2024, 1, 1),
    schedule='0 2 * * *',  # daily at 02:00; replaces the deprecated `schedule_interval`
    catchup=False,
    default_args={
        'on_failure_callback': on_job_failure,
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
    },
    tags=['databricks', 'etl'],
) as dag:

    submit_extract = DatabricksSubmitRunOperator(
        task_id='extract_data',
        new_cluster={
            'spark_version': '13.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 2,
        },
        spark_jar_task={
            'main_class_name': 'com.company.ExtractJob',
            'parameters': ['--date', '{{ ds }}'],
        },
        libraries=[{'jar': 's3://jars/extract-1.0.jar'}],
        timeout_seconds=3600,
        # Return right after submission; the sensor that follows does the waiting.
        wait_for_termination=False,
    )

    wait_extract = DatabricksJobSensor(
        task_id='wait_extract',
        run_id='{{ ti.xcom_pull(task_ids="extract_data", key="run_id") }}',
        poke_interval=30,
        mode='reschedule',
    )

    submit_transform = DatabricksSubmitRunOperator(
        task_id='transform_data',
        existing_cluster_id='0123-456789-abcdef',
        spark_jar_task={
            'main_class_name': 'com.company.TransformJob',
            'parameters': ['--date', '{{ ds }}'],
        },
        libraries=[{'jar': 's3://jars/transform-1.0.jar'}],
        timeout_seconds=3600,
        # Return right after submission; the sensor that follows does the waiting.
        wait_for_termination=False,
    )

    wait_transform = DatabricksJobSensor(
        task_id='wait_transform',
        run_id='{{ ti.xcom_pull(task_ids="transform_data", key="run_id") }}',
        poke_interval=30,
        mode='reschedule',
    )

    notify = PythonOperator(
        task_id='notify_completion',
        python_callable=lambda: print('ETL pipeline completed'),
    )

    submit_extract >> wait_extract >> submit_transform >> wait_transform >> notify

Key Concepts Table

| Operator | Purpose | Creates Cluster | Polling |
|---|---|---|---|
| DatabricksSubmitRunOperator | Submit new job | Yes or existing | Yes |
| DatabricksNotebookOperator | Run notebook | Yes or existing | Yes |
| DatabricksRunNowOperator | Trigger existing job | No | Optional |
| DatabricksJobSensor | Wait for job | No | Yes |
| DatabricksClusterLifeCycleOperator | Start/stop/restart cluster | No | Yes |

Configuration Reference

| Parameter | Description | Example |
|---|---|---|
| databricks_conn_id | Airflow connection ID | databricks_default |
| new_cluster | Cluster spec for a new cluster | {'spark_version': '13.3.x', ...} |
| existing_cluster_id | Reuse an existing cluster | 0123-456789-abcdef |
| spark_jar_task | JAR-based job definition | {'main_class_name': '...'} |
| notebook_task | Notebook-based job definition | {'notebook_path': '...'} |
| timeout_seconds | Timeout for the run, in seconds | 3600 |
| polling_period_seconds | Seconds between run-status checks | 30 |
| notebook_params | Parameters for notebook tasks of an existing job (DatabricksRunNowOperator) | {'key': 'value'} |

Best Practices

Cluster Management

  1. Use existing clusters when possible: cluster startup can take 5-10 minutes.
  2. Tag clusters with team/project for cost tracking and management.
  3. Monitor job runs through Databricks job history and Airflow task logs.

Operator Configuration

  1. Set timeout_seconds on all operators to prevent indefinite waits.
  2. Use mode='reschedule' on sensors to free worker slots during waits.
  3. Use libraries for dependency management: wheel files, PyPI packages, or Maven coordinates.

Error Handling

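  1. Handle failed runs (a result_state of FAILED, TIMEDOUT, or CANCELED, or a life_cycle_state of INTERNAL_ERROR) with retries and failure callbacks.
  2. Parameterize jobs dynamically through templated fields such as json and notebook_params, which Airflow renders with Jinja (for example '{{ ds }}').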

Cost Optimization

| Strategy | Impact | Implementation |
|---|---|---|
| Reuse clusters | 5-10 min faster startup | Set existing_cluster_id |
| Auto-terminate idle clusters | Reduce costs | Configure in Databricks |
| Use spot instances | 60-70% cost reduction | Set in cluster config |
| Right-size clusters | Optimize resource usage | Monitor utilization metrics |
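
As an illustration of "set in cluster config", the sketch below builds a new_cluster spec that combines spot instances with autoscaling. It assumes an AWS workspace (aws_attributes is AWS-specific; other clouds use different attribute blocks), and spot_cluster_spec, the worker bounds, and the tag value are hypothetical choices for this example.

```python
from typing import Any, Dict


def spot_cluster_spec(min_workers: int, max_workers: int) -> Dict[str, Any]:
    """Build a new_cluster spec using spot instances and autoscaling (AWS)."""
    if not 1 <= min_workers <= max_workers:
        raise ValueError("require 1 <= min_workers <= max_workers")
    return {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        # Right-sizing: let Databricks scale workers within these bounds.
        "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
        "aws_attributes": {
            # Keep the first node (the driver) on an on-demand instance.
            "first_on_demand": 1,
            # Use spot capacity, falling back to on-demand when unavailable.
            "availability": "SPOT_WITH_FALLBACK",
        },
        # Tags for cost tracking.
        "custom_tags": {"team": "data-engineering"},
    }


spec = spot_cluster_spec(min_workers=2, max_workers=8)
```

The resulting dictionary can be passed as new_cluster to DatabricksSubmitRunOperator. Actual savings depend on spot availability and pricing in your region.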

For long-running Spark jobs (>30 minutes), use DatabricksRunNowOperator with a pre-existing job configured in Databricks. This keeps the job definition and its timeout in Databricks and gives you Databricks-native job monitoring. If you use DatabricksSubmitRunOperator instead, make sure timeout_seconds covers the expected run time.

The Databricks provider requires a Personal Access Token (PAT) or OAuth token stored in Airflow's connection. For managed Airflow (MWAA/Cloud Composer), use the environment's secret backend for token management.

Key Takeaways:

  • The Databricks provider offers operators, sensors, and hooks for full Databricks integration
  • DatabricksSubmitRunOperator submits one-time runs on a new or existing cluster
  • DatabricksRunNowOperator triggers pre-existing Databricks jobs
  • Use DatabricksJobSensor with mode='reschedule' for efficient waiting
  • Configure existing_cluster_id to avoid cluster startup overhead
  • Always set timeout_seconds and handle failure callbacks
