BigQuery Provider Integration with Airflow

Architecture Diagram: BigQuery Integration Architecture. Components: Airflow Scheduler, BigQueryHook (connection mgmt), BigQuery API (REST / gRPC), BigQuery (data warehouse), GCS (staging area). Operators: InsertJobOperator (execute SQL/load), GCSToBigQuery (load from GCS), CreateDataset (manage datasets), TableExistenceSensor (wait for table). Caption: use partitioning + clustering for cost optimization in BigQuery queries.

Formal Definitions

BigQuery Provider

The BigQuery integration ships in the Google provider package (apache-airflow-providers-google), which supplies operators, hooks, triggers, and sensors for Google BigQuery. It wraps the BigQuery Python client library, handles authentication through Google Cloud connections, and exposes BigQuery operations as Airflow tasks.

Partition Strategy

BigQuery partitioning divides a large table into segments called partitions. Partition strategies include time-unit column partitioning ($P \in \{P_{\text{HOUR}}, P_{\text{DAY}}, P_{\text{MONTH}}, P_{\text{YEAR}}\}$), ingestion-time partitioning, and integer-range partitioning. For a table of $N$ rows spread evenly over $P_{\text{partitions}}$ partitions, a query whose filter selects a single partition scans $O(N/P_{\text{partitions}})$ rows instead of $O(N)$.
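Each time-unit partition is identified by a partition ID derived from the timestamp. The sketch below maps a timestamp to its partition ID. It assumes the documented formats: YYYYMMDDHH for HOUR, YYYYMMDD for DAY, YYYYMM for MONTH, and YYYY for YEAR. `partition_id` is a hypothetical helper and is not part of the provider.

```python
from datetime import datetime

# Partition ID format for each time-unit granularity.
PARTITION_ID_FORMATS = {
    'HOUR': '%Y%m%d%H',
    'DAY': '%Y%m%d',
    'MONTH': '%Y%m',
    'YEAR': '%Y',
}


def partition_id(granularity: str, ts: datetime) -> str:
    """Return the ID of the time-unit partition that `ts` falls into."""
    if granularity not in PARTITION_ID_FORMATS:
        raise ValueError(f'unsupported granularity: {granularity}')
    return ts.strftime(PARTITION_ID_FORMATS[granularity])


ts = datetime(2024, 3, 15, 13, 45)
for granularity in ('HOUR', 'DAY', 'MONTH', 'YEAR'):
    print(granularity, partition_id(granularity, ts))
# HOUR 2024031513
# DAY 20240315
# MONTH 202403
# YEAR 2024
```

To address a single partition, append the same ID after `$` in the table name (for example `orders$20240315`).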

Detailed Explanation

BigQuery Integration Overview

The BigQuery provider lets Airflow run queries, load data, and manage datasets and tables in Google BigQuery.

Key Insight: Pruning only applies when a query filters on the partition column. With P evenly sized partitions, a filter that selects one partition cuts the scan from O(N) to O(N/P). Always include the partition column in WHERE clauses.
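A toy model makes the O(N) versus O(N/P) difference concrete. This is plain Python, not BigQuery code. The table is a dictionary of daily partitions, `ROW_BYTES` is an assumed fixed row size, and `bytes_scanned` is a hypothetical helper that reads only the partitions matching the filter.

```python
ROW_BYTES = 100  # assumed fixed row size, for illustration only


def bytes_scanned(table: dict, partition_filter=None) -> int:
    """Bytes a query reads from `table` ({partition key: rows}).

    Without a filter every partition is read (full scan); with one, only
    partitions whose key satisfies the filter are read (pruning).
    """
    if partition_filter is None:
        selected = list(table.values())
    else:
        selected = [rows for key, rows in table.items() if partition_filter(key)]
    return sum(len(rows) for rows in selected) * ROW_BYTES


# N = 30,000 rows spread evenly over P = 30 daily partitions.
table = {f'2024-01-{day:02d}': [None] * 1000 for day in range(1, 31)}

full = bytes_scanned(table)
pruned = bytes_scanned(table, lambda key: key == '2024-01-15')
print(full, pruned, full // pruned)  # 3000000 100000 30
```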

Operator Selection Guide

Operator | Use Case | Key Parameters
BigQueryInsertJobOperator | Execute any BQ job | configuration, location
GCSToBigQueryOperator | Load GCS files to BQ | bucket, source_objects
BigQueryToBigQueryOperator | Copy between tables | source_project_dataset_tables, destination_project_dataset_table
BigQueryCreateEmptyDatasetOperator | Create dataset | dataset_id, project_id
BigQueryTableExistenceSensor | Wait for table | project_id, dataset_id, table_id

Partitioning Strategies

Strategy | Best For | Query Pruning | Cost Impact
DAY | High-cardinality time series | Excellent | Lowest
MONTH | Medium-cardinality aggregates | Good | Low
YEAR | Low-cardinality historical | Moderate | Medium
HOUR | Real-time analytics | Excellent | Lowest
INTEGER_RANGE | Non-temporal ranges | Good | Low
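An INTEGER_RANGE table is defined by a start (inclusive), an end (exclusive), and an interval. The sketch below shows how a value gets bucketed. It assumes BigQuery's documented rules: NULLs land in `__NULL__`, and values outside [start, end) land in `__UNPARTITIONED__`. `integer_range_partition` is a hypothetical helper, and identifying a partition by its lower bound is this sketch's own convention.

```python
def integer_range_partition(value, start: int, end: int, interval: int):
    """Return the partition a value lands in under integer-range partitioning.

    Ranges are [start, start + interval), [start + interval, ...), up to `end`
    (exclusive); a partition is identified here by its lower bound.
    """
    if value is None:
        return '__NULL__'
    if value < start or value >= end:
        return '__UNPARTITIONED__'
    return start + (value - start) // interval * interval


# Corresponds to a rangePartitioning spec of
# {'field': 'customer_id', 'range': {'start': '0', 'end': '100', 'interval': '10'}}
for v in (None, -5, 0, 37, 99, 100):
    print(v, integer_range_partition(v, start=0, end=100, interval=10))
```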

Connection Setup

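# Airflow connection for BigQuery
# Connection ID: google_cloud_default
# Connection Type: Google Cloud
# Project ID: my-gcp-project
# Keyfile Path: path to the service account key file
#   (alternatively, Keyfile JSON: the key file's JSON contents)
# Extra: {"scope": "https://www.googleapis.com/auth/bigquery"}
# If no key is configured, the hook falls back to Application Default Credentials.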
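You can also supply the same connection through an environment variable instead of the UI. The sketch below builds the `AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT` value. It assumes Airflow 2.3+, which accepts JSON-serialized connections in `AIRFLOW_CONN_*` variables. It also assumes a Google provider version that reads unprefixed extras (`project`, `key_path`, `scope`); older provider versions expect keys prefixed with `extra__google_cloud_platform__`. The helper `gcp_connection_env` and the key path are hypothetical.

```python
import json


def gcp_connection_env(conn_id: str, project: str, key_path: str, scope: str) -> tuple:
    """Return an (env var name, JSON value) pair defining a Google Cloud connection."""
    name = f'AIRFLOW_CONN_{conn_id.upper()}'
    value = json.dumps({
        'conn_type': 'google_cloud_platform',
        'extra': {
            'project': project,
            'key_path': key_path,  # path to the service account key file
            'scope': scope,
        },
    })
    return name, value


name, value = gcp_connection_env(
    'google_cloud_default',
    project='my-gcp-project',
    key_path='/opt/airflow/keys/sa.json',  # hypothetical path
    scope='https://www.googleapis.com/auth/bigquery',
)
print(f"export {name}='{value}'")
```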

Query Execution

from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

with DAG(
    dag_id='bigquery_etl',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # `schedule` (Airflow 2.4+) replaces the deprecated schedule_interval
    catchup=False,
    tags=['bigquery', 'etl'],
) as dag:

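    # Create the fact table that transform_orders writes to, partitioned by
    # day and clustered on the column most queries filter by.
    create_table = BigQueryInsertJobOperator(
        task_id='create_partitioned_table',
        configuration={
            'query': {
                'query': """
                    CREATE TABLE IF NOT EXISTS `project.dataset.fct_orders`
                    (
                        order_id INT64,
                        customer_id INT64,
                        total_amount NUMERIC,
                        order_count INT64,
                        first_order_date DATE
                    )
                    PARTITION BY first_order_date
                    CLUSTER BY customer_id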
                """,
                'useLegacySql': False,
            }
        },
        location='US',
    )

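    # Load every Parquet file under the prefix into the staging table,
    # replacing its previous contents (WRITE_TRUNCATE).
    load_data = BigQueryInsertJobOperator(
        task_id='load_from_gcs',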
        configuration={
            'load': {
                'sourceUris': ['gs://data-lake/raw/orders/*.parquet'],
                'destinationTable': {
                    'projectId': 'project',
                    'datasetId': 'dataset',
                    'tableId': 'stg_orders',
                },
                'sourceFormat': 'PARQUET',
                'writeDisposition': 'WRITE_TRUNCATE',
                'timePartitioning': {
                    'type': 'DAY',
                    'field': 'order_date',
                },
                'clustering': {
                    'fields': ['customer_id'],
                },
            }
        },
        location='US',
    )

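    # Aggregate the day's staged rows into the fact table. INSERT appends,
    # so re-running the same logical date duplicates rows; a MERGE, or a query
    # job that overwrites only that day's partition, keeps reruns idempotent.
    transform = BigQueryInsertJobOperator(
        task_id='transform_orders',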
        configuration={
            'query': {
                'query': """
                    INSERT INTO `project.dataset.fct_orders`
                    SELECT
                        order_id,
                        customer_id,
                        SUM(amount) as total_amount,
                        COUNT(*) as order_count,
                        MIN(order_date) as first_order_date
                    FROM `project.dataset.stg_orders`
                    WHERE order_date = DATE('{{ ds }}')
                    GROUP BY order_id, customer_id
                """,
                'useLegacySql': False,
            }
        },
        location='US',
    )

    create_table >> load_data >> transform
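Because transform_orders uses INSERT, a retry or manual re-run for the same date appends duplicate rows. One common alternative is to overwrite only that day's partition. A query job whose destination is the partition decorator `fct_orders$YYYYMMDD`, with `WRITE_TRUNCATE`, replaces just that partition. The sketch below builds such a configuration. It assumes `fct_orders` already exists, partitioned by day on `first_order_date`, so every row the SELECT returns belongs to the target partition. `overwrite_partition_config` is a hypothetical helper.

```python
# Same SELECT as transform_orders, without the INSERT INTO clause: a query
# job cannot combine a DML statement with a destination table.
SELECT_SQL = """
    SELECT
        order_id,
        customer_id,
        SUM(amount) AS total_amount,
        COUNT(*) AS order_count,
        MIN(order_date) AS first_order_date
    FROM `project.dataset.stg_orders`
    WHERE order_date = DATE('{{ ds }}')
    GROUP BY order_id, customer_id
"""


def overwrite_partition_config(sql: str, project: str, dataset: str,
                               table: str, partition_id: str) -> dict:
    """Query job configuration that replaces a single partition of `table`."""
    return {
        'query': {
            'query': sql,
            'useLegacySql': False,
            # table$YYYYMMDD targets one daily partition; WRITE_TRUNCATE
            # replaces that partition only, leaving the others untouched.
            'destinationTable': {
                'projectId': project,
                'datasetId': dataset,
                'tableId': f'{table}${partition_id}',
            },
            'writeDisposition': 'WRITE_TRUNCATE',
        }
    }


# '{{ ds_nodash }}' (e.g. 20240101) is rendered by Airflow at run time,
# because BigQueryInsertJobOperator templates its `configuration` argument.
config = overwrite_partition_config(
    SELECT_SQL, 'project', 'dataset', 'fct_orders', '{{ ds_nodash }}'
)
print(config['query']['destinationTable']['tableId'])  # fct_orders${{ ds_nodash }}
```

The resulting dictionary would replace the INSERT-based configuration in `transform_orders`, for example `BigQueryInsertJobOperator(task_id='transform_orders', configuration=config, location='US')`.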

GCS to BigQuery Loading

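from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

# Load CSV files that have a header row, appending to the customers table
# and creating it with the explicit schema below if it does not exist yet.
load_csv = GCSToBigQueryOperator(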
    task_id='load_csv_to_bq',
    bucket='data-lake',
    source_objects=['raw/customers/*.csv'],
    destination_project_dataset_table='project.dataset.customers',
    source_format='CSV',
    skip_leading_rows=1,
    field_delimiter=',',
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
    location='US',
)
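GCSToBigQueryOperator ultimately submits a BigQuery load job. If you need a load option that the transfer operator does not expose, you can pass an equivalent load configuration to BigQueryInsertJobOperator instead. The sketch below builds such a configuration from the same inputs. The helper `gcs_csv_load_config` is hypothetical, and the dictionary keys follow the BigQuery REST load-job fields. It is not claimed to match exactly what the operator builds internally.

```python
def gcs_csv_load_config(bucket: str, source_objects: list, project: str,
                        dataset: str, table: str, schema_fields: list,
                        skip_leading_rows: int = 1, field_delimiter: str = ',',
                        write_disposition: str = 'WRITE_APPEND',
                        create_disposition: str = 'CREATE_IF_NEEDED') -> dict:
    """Build a BigQuery load-job configuration for CSV files in GCS."""
    return {
        'load': {
            # Each object path becomes a full gs:// URI; wildcards are kept as-is.
            'sourceUris': [f'gs://{bucket}/{obj}' for obj in source_objects],
            'destinationTable': {
                'projectId': project,
                'datasetId': dataset,
                'tableId': table,
            },
            'sourceFormat': 'CSV',
            'skipLeadingRows': skip_leading_rows,
            'fieldDelimiter': field_delimiter,
            'writeDisposition': write_disposition,
            'createDisposition': create_disposition,
            # An explicit schema replaces auto-detection.
            'schema': {'fields': schema_fields},
        }
    }


config = gcs_csv_load_config(
    bucket='data-lake',
    source_objects=['raw/customers/*.csv'],
    project='project',
    dataset='dataset',
    table='customers',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
)
# Pass it as BigQueryInsertJobOperator(configuration=config, location='US', ...).
print(config['load']['sourceUris'])  # ['gs://data-lake/raw/customers/*.csv']
```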

Dataset Management

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryUpdateDatasetOperator,
)

# Create the dataset; dataset_reference is the dataset resource body.
create_dataset = BigQueryCreateEmptyDatasetOperator(
    task_id='create_dataset',
    dataset_id='analytics',
    project_id='my-project',
    dataset_reference={
        'description': 'Analytics dataset',
        'location': 'US',
    },
)

# Update only the properties listed in `fields`; others stay unchanged.
update_dataset = BigQueryUpdateDatasetOperator(
    task_id='update_dataset',
    dataset_id='analytics',
    project_id='my-project',
    dataset_resource={
        'description': 'Updated analytics dataset',
    },
    fields=['description'],
)

# delete_contents=True also drops the tables inside; without it, deleting
# a non-empty dataset fails.
delete_dataset = BigQueryDeleteDatasetOperator(
    task_id='delete_staging',
    dataset_id='staging',
    project_id='my-project',
    delete_contents=True,
)

Sensors

from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor,
)

# Check every 30 s for up to an hour. mode='reschedule' releases the worker
# slot between checks instead of holding it while waiting.
wait_for_table = BigQueryTableExistenceSensor(
    task_id='wait_for_table',
    project_id='my-project',
    dataset_id='dataset',
    table_id='orders',
    poke_interval=30,
    timeout=3600,
    mode='reschedule',
)

Partitioned Query with Parameters

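from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

# @run_date is a named query parameter bound as a DATE, so no values are
# string-formatted into the SQL text.
SQL = """
    SELECT * FROM `project.dataset.orders`
    WHERE order_date = @run_date
      AND status = 'completed'
"""


def generate_partition_dates(start_date: str, end_date: str) -> list:
    """Return each date (YYYY-MM-DD) from start_date to end_date, inclusive.

    This runs when the DAG file is parsed, so it needs real dates: Jinja
    templates such as '{{ ds }}' have not been rendered at that point.
    """
    dates = []
    current = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')
    while current <= end:
        dates.append(current.strftime('%Y-%m-%d'))
        current += timedelta(days=1)
    return dates


def partition_query_config(run_date: str) -> dict:
    """Query job configuration that binds run_date to the @run_date parameter."""
    return {
        'query': {
            'query': SQL,
            'useLegacySql': False,
            'parameterMode': 'NAMED',
            'queryParameters': [{
                'name': 'run_date',
                'parameterType': {'type': 'DATE'},
                'parameterValue': {'value': run_date},
            }],
        }
    }


# Manually triggered DAG that processes a fixed range, one task per partition.
# For a daily schedule, a single task with partition_query_config('{{ ds }}')
# is enough, because BigQueryInsertJobOperator renders Jinja in `configuration`.
with DAG(
    dag_id='partitioned_processing',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # Airflow 2.4+ (older releases: schedule_interval=None)
    catchup=False,
) as dag:

    prev_task = None
    for date_str in generate_partition_dates('2024-01-01', '2024-01-07'):
        task = BigQueryInsertJobOperator(
            task_id=f'process_{date_str}',
            configuration=partition_query_config(date_str),
            location='US',
        )
        # Chain the tasks so partitions are processed one after another.
        if prev_task:
            prev_task >> task
        prev_task = task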

Key Concepts Table

Operator | Purpose | Key Parameters
BigQueryInsertJobOperator | Execute any BQ job | configuration, location
GCSToBigQueryOperator | Load GCS files to BQ | bucket, source_objects, destination_project_dataset_table
BigQueryToBigQueryOperator | Copy between tables | source_project_dataset_tables, destination_project_dataset_table
BigQueryCreateEmptyDatasetOperator | Create dataset | dataset_id, project_id
BigQueryDeleteDatasetOperator | Delete dataset | dataset_id, delete_contents
BigQueryTableExistenceSensor | Wait for table | project_id, dataset_id, table_id

Partitioning Strategies

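Strategy | Partition Key | Best For | Query Pruning
DAY | DATE, TIMESTAMP, or DATETIME column | High-cardinality time series | Excellent
MONTH | DATE, TIMESTAMP, or DATETIME column | Medium-cardinality aggregates | Good
YEAR | DATE, TIMESTAMP, or DATETIME column | Low-cardinality historical | Moderate
HOUR | TIMESTAMP or DATETIME column | Real-time analytics | Excellent
INGESTION_TIME | Load time (_PARTITIONTIME pseudo-column) | Sources without a usable date column | Moderate
INTEGER_RANGE | INT64 column | Non-temporal ranges | Good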

Best Practices

Query Optimization

  1. Always specify location to match your dataset's geographic location.
  2. Use partitioned tables for time-series data to reduce query costs and improve performance.
  3. Leverage clustering on frequently filtered columns for additional query optimization.

Data Loading

  1. Use WRITE_TRUNCATE for full table refreshes, WRITE_APPEND for incremental loads.
  2. Specify schema_fields explicitly for CSV/JSON loads; schema auto-detection infers column types by sampling the data and can guess wrong.
  3. Use create_disposition='CREATE_IF_NEEDED' for idempotent table creation.
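Write dispositions matter most when tasks are retried. The toy in-memory model below covers three of them: WRITE_TRUNCATE, WRITE_APPEND, and WRITE_EMPTY (which fails if the table already holds data). It is a simulation for illustration, not BigQuery code, and `apply_load` is a hypothetical name.

```python
def apply_load(table_rows: list, new_rows: list, disposition: str) -> list:
    """Return the table contents after a load with the given write disposition."""
    if disposition == 'WRITE_TRUNCATE':
        return list(new_rows)  # existing data is replaced
    if disposition == 'WRITE_APPEND':
        return table_rows + list(new_rows)  # new rows are added
    if disposition == 'WRITE_EMPTY':
        if table_rows:
            raise RuntimeError('table is not empty')
        return list(new_rows)
    raise ValueError(f'unknown disposition: {disposition}')


batch = ['row-1', 'row-2']
# Simulate a task retry: the same batch is loaded twice.
truncated = apply_load(apply_load([], batch, 'WRITE_TRUNCATE'), batch, 'WRITE_TRUNCATE')
appended = apply_load(apply_load([], batch, 'WRITE_APPEND'), batch, 'WRITE_APPEND')
print(len(truncated), len(appended))  # 2 4
```

The retried TRUNCATE load leaves the table unchanged, while the retried APPEND load duplicates the batch.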

Cost Management

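  1. Set job priority: INTERACTIVE for ad-hoc queries, BATCH for scheduled ETL. BATCH jobs wait in a queue until resources are free; under on-demand pricing they cost the same per byte as INTERACTIVE jobs.
  2. Estimate a query's cost before running it with a dry run (configuration.dryRun reports the bytes the query would process without executing it), and track actual spend through GCP billing exports.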
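Both settings live in the job configuration. The sketch below assumes the BigQuery REST JobConfiguration layout, where `priority` sits under `query` and `dryRun` sits at the top level. The helper `query_job_config` is hypothetical.

```python
def query_job_config(sql: str, priority: str = 'BATCH', dry_run: bool = False) -> dict:
    """Build a query job configuration in the BigQuery REST JobConfiguration shape.

    `priority` belongs inside the `query` block; `dryRun` sits at the top level.
    A dry run validates the query and reports the bytes it would process
    without running it or incurring query charges.
    """
    if priority not in ('INTERACTIVE', 'BATCH'):
        raise ValueError(f'priority must be INTERACTIVE or BATCH, got {priority!r}')
    return {
        'query': {
            'query': sql,
            'useLegacySql': False,
            'priority': priority,
        },
        'dryRun': dry_run,
    }


# Scheduled ETL: queue as a BATCH job.
etl_config = query_job_config("SELECT COUNT(*) FROM `project.dataset.orders`")
# Cost check before running: same query, dry run only.
estimate_config = query_job_config(
    "SELECT COUNT(*) FROM `project.dataset.orders`", dry_run=True
)
print(etl_config['query']['priority'], estimate_config['dryRun'])  # BATCH True
```

Either dictionary has the shape BigQueryInsertJobOperator's `configuration` argument takes. How the dry-run byte estimate is surfaced depends on the client and provider version you submit it with.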

Cost Optimization Strategies

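Strategy | Typical Savings (workload-dependent) | Implementation
Partitioning | 10-100x when filters skip most partitions | Add PARTITION BY clause
Clustering | 2-10x on selective filters of clustered columns | Add CLUSTER BY clause
Batch priority | No per-byte discount under on-demand pricing; jobs queue until capacity is free | Set 'priority': 'BATCH' in the query job configuration
Materialized views | 10-100x for frequently repeated queries | Pre-compute frequent queries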

Partition pruning reduces the amount of data scanned, but only when the WHERE clause filters on the partition column itself. For example, WHERE order_date = DATE('{{ ds }}') scans only that day's partition instead of the entire table. A filter expression that mixes the partition column with other columns can prevent pruning.

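Under on-demand pricing, BigQuery bills by the bytes a query processes. Partitioned and clustered tables therefore cut cost in proportion to the data their filters let BigQuery skip: a query that reads one of 365 evenly sized daily partitions processes roughly 1/365 of the table. Combine them with materialized views for frequently run reports.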
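The back-of-the-envelope sketch below shows why pruning matters for on-demand billing. The price is an assumption passed in by the caller: 6.25 USD per TiB is only an example, so check the current BigQuery price list for your region. The table size is hypothetical, and `on_demand_cost` ignores billing minimums, rounding, and free-tier allowances.

```python
TIB = 1024 ** 4  # on-demand query pricing is quoted per tebibyte


def on_demand_cost(bytes_processed: int, usd_per_tib: float) -> float:
    """Rough on-demand query cost: bytes processed times the price per TiB."""
    return bytes_processed / TIB * usd_per_tib


# Hypothetical 2 TiB table with 365 evenly sized daily partitions.
table_bytes = 2 * TIB
price = 6.25  # assumed USD per TiB; check the current price for your region

full_scan = on_demand_cost(table_bytes, price)
one_day = on_demand_cost(table_bytes // 365, price)
print(f'full scan: ${full_scan:.2f}, one partition: ${one_day:.4f}')
# full scan: $12.50, one partition: $0.0342
```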

Key Takeaways:

  • BigQueryInsertJobOperator is the general-purpose operator for any BigQuery job (query, load, copy, extract)
  • GCSToBigQueryOperator handles file loading from GCS to BigQuery
  • Partitioning reduces the scan from O(N) to O(N/P) for a query that filters to one of P evenly sized partitions
  • Clustering provides additional optimization for filtered columns
  • Always specify location to match dataset geography
  • Use sensors with mode='reschedule' for efficient table existence checks
