Cross-Platform Integration: Spark, Airflow & dbt


Architecture Diagram 1: Spark Integration Architecture

Architecture Diagram 2: Airflow Integration Architecture

Architecture Diagram 3: dbt Integration Architecture

Architecture Diagram 4: dbt Model DAG Execution

[Figure: dbt Model DAG Execution Flow. Sources (raw tables) → Staging (stg_ models; clean and standardize; {{ source() }}) → Intermediate (business logic, complex joins) → Marts (final analytical tables; {{ ref() }}). The DAG determines execution order. Selective run: --select model_name. Test after run.]

External Functions call code that runs outside Snowflake (for example a REST API or custom logic behind a cloud API gateway) from SQL queries. The remote service is hosted and managed outside Snowflake, but it is invoked like any other SQL function, which enables data enrichment and integration with third-party systems without leaving the query.

Snowpark Container Services run custom containers (Docker) directly on Snowflake, enabling any language, library, or runtime (ML models, ETL tools, custom applications) within Snowflake's managed infrastructure without external hosting.

Integration Latency Model
$$\text{total\_latency} = \text{network\_rtt} + \text{processing\_time} + \text{serialization}$$
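
As a rough illustration of the latency model, the sketch below sums the three terms for a single external call. The function name and the millisecond values are hypothetical placeholders, not measurements.

```python
def total_latency_ms(network_rtt_ms: float,
                     processing_time_ms: float,
                     serialization_ms: float) -> float:
    """Sum the three components of the integration latency model."""
    return network_rtt_ms + processing_time_ms + serialization_ms


# Hypothetical numbers for one call to an external service
latency = total_latency_ms(network_rtt_ms=40.0,
                           processing_time_ms=120.0,
                           serialization_ms=15.0)
print(f"{latency:.1f} ms")
```

Breaking the total into its terms makes it easier to see which one dominates before tuning anything.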

Data Pipeline Throughput

  • External table refresh: 100-1000 files/batch depending on size
  • Stream processing: Real-time micro-batches (1-5 second latency)
  • Batch ETL: 1-10 GB/s on XL warehouse (format-dependent)
  • Semi-structured: JSON/Avro 2-3× slower than Parquet
  • Optimization: Partition files by date; use predicate pushdown on external tables
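
The throughput figures above can be turned into a back-of-the-envelope load-time estimate. This is only a sketch: the function name, the 500 GB data size, and the chosen throughput and slowdown values are assumptions picked from within the ranges listed, and real throughput depends on file sizes, format, and warehouse load.

```python
def estimate_load_seconds(size_gb: float,
                          throughput_gb_per_s: float,
                          slowdown: float = 1.0) -> float:
    """Estimate batch load time.

    slowdown > 1 models slower formats (for example 2-3 for JSON/Avro
    relative to Parquet).
    """
    if throughput_gb_per_s <= 0 or slowdown <= 0:
        raise ValueError("throughput and slowdown must be positive")
    return size_gb * slowdown / throughput_gb_per_s


# Hypothetical 500 GB load at an assumed 5 GB/s
parquet_seconds = estimate_load_seconds(500, throughput_gb_per_s=5)
json_seconds = estimate_load_seconds(500, throughput_gb_per_s=5, slowdown=2.5)
print(parquet_seconds, json_seconds)
```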

Use Snowpark Container Services for ML model serving and custom runtimes. Use External Functions for lightweight API integrations. Use External Tables with streams for near-real-time data lake ingestion.

  • External Functions: Call REST APIs, custom logic from SQL
  • Snowpark Containers: Run any Docker container on Snowflake infrastructure
  • External Tables: Query cloud storage files directly (Parquet, JSON, ORC)
  • Data sharing: Zero-copy sharing across Snowflake accounts
  • Stream processing: Real-time ingestion via streams + tasks + external functions


Detailed Explanation

Spark Integration

Large-scale data processing using Spark's distributed compute + Snowflake's managed platform.

  • Pushdown predicates: Spark filters translated to Snowflake SQL
  • Partition pruning: leverages micro-partition metadata
  • Data loading β€” COPY command for efficient bulk loading
  • Ideal for complex transformations requiring Spark-specific libraries
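
To make the idea of predicate pushdown concrete, here is a minimal, framework-free sketch of turning a list of filters into a single SQL query so that filtering happens in the database rather than in the client. It is not the Spark connector's implementation; the `Filter` tuple shape and the function names are hypothetical.

```python
from typing import Iterable, Tuple

Filter = Tuple[str, str, object]  # (column, operator, value), a hypothetical shape

ALLOWED_OPS = {"=", "!=", ">", ">=", "<", "<="}


def render_literal(value: object) -> str:
    """Render a Python value as a SQL literal (numbers and strings only)."""
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported literal type: {type(value).__name__}")


def build_pushdown_query(table: str, filters: Iterable[Filter]) -> str:
    """Fold client-side filters into a WHERE clause of one SQL statement."""
    clauses = []
    for column, op, value in filters:
        if op not in ALLOWED_OPS:
            raise ValueError(f"unsupported operator: {op}")
        clauses.append(f"{column} {op} {render_literal(value)}")
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    return f"SELECT * FROM {table}{where}"


query = build_pushdown_query(
    "sales_data", [("amount", ">", 5000), ("region", "=", "EMEA")]
)
print(query)
```

The benefit is that only matching rows leave the warehouse, which is the same reason the connector tries to push filters down.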

Airflow Integration

Orchestrate Snowflake pipelines via operators, hooks, and sensors.

| Component | Purpose |
|---|---|
| SnowflakeOperator | Execute SQL with parameterized queries |
| Transfer operators | S3/Azure/GCS ↔ Snowflake data movement |
| Hooks | Low-level connection management |

dbt Integration

SQL-based transformation framework that runs its models inside Snowflake through the dbt-snowflake adapter.

| Layer | Purpose | Materialization |
|---|---|---|
| Staging | Clean and standardize raw data | View |
| Intermediate | Combine staging models | View or Table |
| Marts | Final analytical tables | Table |

  • Tests: nulls, uniqueness, referential integrity
  • {{ source() }} references raw data; {{ ref() }} references models
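
The way {{ ref() }} calls define execution order can be sketched without dbt itself: extract the referenced model names from each model's SQL and sort the resulting graph topologically. This is an illustration of the idea, not dbt's parser; the model names and SQL strings are hypothetical.

```python
import re
from graphlib import TopologicalSorter

# Matches {{ ref('model_name') }} with single or double quotes
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")

# Hypothetical project: model name -> model SQL
models = {
    "stg_orders": "select * from {{ source('raw', 'orders') }}",
    "stg_customers": "select * from {{ source('raw', 'customers') }}",
    "int_orders_enriched": (
        "select * from {{ ref('stg_orders') }} "
        "join {{ ref('stg_customers') }} using (customer_id)"
    ),
    "fct_orders": "select * from {{ ref('int_orders_enriched') }}",
}


def execution_order(project: dict) -> list:
    """Return model names ordered so every model runs after its refs."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in project.items()}
    return list(TopologicalSorter(graph).static_order())


order = execution_order(models)
print(order)
```

Staging models come first because they only reference sources, and each downstream model appears after everything it refs.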

Integration Pattern Selection

| Use Case | Recommended Tool |
|---|---|
| SQL transformations | dbt |
| Complex ETL with ML | Spark + Snowflake |
| Workflow orchestration | Airflow |
| Real-time streaming | Snowpipe + Streams |
| API integration | External Functions |
| Custom runtimes | Snowpark Container Services |

Key Takeaway: Use Snowpark over Spark when transformations can execute within Snowflake to minimize data movement.

Key Concepts Table

| Integration | Primary Use | Strengths | Limitations |
|---|---|---|---|
| Spark | Large-scale processing | ML libraries, complex transforms | Requires Spark cluster |
| Airflow | Orchestration | Rich operators, scheduling | Steep learning curve |
| dbt | SQL transformations | Version control, testing | SQL-only transformations |

| Component | Purpose | Configuration |
|---|---|---|
| Spark Connector | Data read/write | Snowflake connection options |
| Airflow Operator | Task execution | SQL, parameters, connections |
| dbt Model | Data transformation | SQL, Jinja templating |

| Metric | Spark | Airflow | dbt |
|---|---|---|---|
| Setup Complexity | High | Medium | Low |
| Learning Curve | High | Medium | Low |
| Flexibility | Very High | High | Medium |
| Performance | Very High | N/A | High |

Code Examples

Apache Spark Integration

# Example 1: Spark-Snowflake read with configuration options
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark session with Snowflake connector
# The connector JAR handles all Snowflake communication
spark = SparkSession.builder \
    .appName("SnowflakeIntegration") \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:2.13.0-spark_3.3") \
    .getOrCreate()

# Connection options shared by every read and write below
#   sfURL: Snowflake account URL
#   sfUser/sfPassword: Authentication credentials, read from environment
#                      variables (the variable names are placeholders)
#   sfDatabase/sfSchema/sfWarehouse: Snowflake context
sf_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": os.environ["SNOWFLAKE_USER"],
    "sfPassword": os.environ["SNOWFLAKE_PASSWORD"],
    "sfDatabase": "ANALYTICS_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "ANALYTICS_WH",
}

# Read from Snowflake
#   dbtable: Source table (mutually exclusive with query)
#   query: Custom SQL query (mutually exclusive with dbtable)
df = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "sales_data") \
    .load()

# Transform and write back
# Aliases give the aggregates plain column names that are valid in Snowflake
# mode options: "overwrite" (replace), "append" (add), "ignore" (skip if exists), "error" (fail)
transformed_df = df.filter("amount > 1000") \
    .groupBy("region") \
    .agg(F.sum("amount").alias("total_amount"),
         F.count("quantity").alias("order_count"))

transformed_df.write \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "sales_summary") \
    .mode("overwrite") \
    .save()

# Alternative: Use query instead of dbtable for custom reads
df_custom = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("query", "SELECT * FROM sales WHERE amount > 1000") \
    .load()

# Pushdown predicates (Snowflake executes these filters)
df_filtered = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "sales_data") \
    .load()
df_filtered = df_filtered.filter(df_filtered["amount"] > 5000)  # Pushed to Snowflake

Apache Airflow Integration

# Example 2: Airflow DAG for Snowflake pipeline
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,           # Don't wait for previous run
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 2,                        # Retry twice on failure
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),  # Kill after 2 hours
}

with DAG('snowflake_etl_pipeline',
         default_args=default_args,
         schedule_interval='@daily',     # Run daily at midnight
         catchup=False,                   # Don't backfill missed runs
         max_active_runs=1) as dag:      # One run at a time

    # Extract: Load raw data from staging
    extract_data = SnowflakeOperator(
        task_id='extract_raw_data',
        sql="""
            INSERT INTO raw_sales 
            SELECT * FROM staging_sales 
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default',  # Connection ID from Airflow
        warehouse='ETL_WH',
        database='RAW_DB',
        schema='PUBLIC'
    )

    # Validate: Check row count
    validate_data = SnowflakeOperator(
        task_id='validate_data',
        sql="""
            SELECT COUNT(*) as row_count 
            FROM raw_sales 
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default'
    )

    # Transform: Run dbt or custom SQL
    transform_data = SnowflakeOperator(
        task_id='transform_data',
        sql="transform_query.sql",        # External SQL file
        parameters={'execution_date': '{{ ds }}'},
        snowflake_conn_id='snowflake_default'
    )

    # Define dependencies (task ordering)
    extract_data >> validate_data >> transform_data

# Example 3: Airflow custom operator for Snowflake data quality
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

class SnowflakeDataQualityOperator(BaseOperator):
    """Custom operator to validate Snowflake data quality."""

    # Each comparison returns True when the actual value is acceptable
    COMPARISONS = {
        'equals': lambda actual, expected: actual == expected,
        'greater_than': lambda actual, expected: actual > expected,
        'less_than': lambda actual, expected: actual < expected,
    }

    def __init__(
        self,
        sql,
        expected_result,
        comparison='equals',       # 'equals', 'greater_than', 'less_than'
        snowflake_conn_id='snowflake_default',
        **kwargs
    ):
        super().__init__(**kwargs)
        if comparison not in self.COMPARISONS:
            # Fail at DAG parse time instead of silently passing at run time
            raise ValueError(f"Unknown comparison: {comparison}")
        self.sql = sql
        self.expected_result = expected_result
        self.comparison = comparison
        self.snowflake_conn_id = snowflake_conn_id

    def execute(self, context):
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        result = hook.get_first(self.sql)
        if result is None:
            raise ValueError("Data quality check failed: query returned no rows")
        actual = result[0]

        if not self.COMPARISONS[self.comparison](actual, self.expected_result):
            raise ValueError(
                f"Data quality check failed: expected {actual} "
                f"{self.comparison} {self.expected_result}"
            )

        self.log.info(f"Quality check passed: {actual} {self.comparison} {self.expected_result}")
        return actual

# Usage in DAG
quality_check = SnowflakeDataQualityOperator(
    task_id='check_row_count',
    sql="SELECT COUNT(*) FROM fct_orders WHERE order_date = CURRENT_DATE()",
    expected_result=1000,
    comparison='greater_than',
    snowflake_conn_id='snowflake_default'
)

dbt Integration

-- Example 4: dbt staging model (models/staging/stg_orders.sql)
-- Staging models clean and standardize raw data
-- {{ source() }} macro references the source definition in sources.yml
with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        order_id,                                              -- Keep as-is
        customer_id,                                           -- Keep as-is
        order_date,                                            -- Keep as-is
        amount as order_amount,                                -- Rename for clarity
        status as order_status,                                -- Rename for clarity
        created_at,
        updated_at
    from source
)

select * from renamed

-- Example 5: dbt mart model (models/marts/fct_orders.sql)
-- Mart models create final analytical tables
-- {{ ref() }} macro references other models (builds dependency graph)
with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        customers.customer_segment,
        orders.order_date,
        orders.order_amount,
        orders.order_status,
        DATE_TRUNC('month', orders.order_date) as order_month
    from orders
    left join customers 
        on orders.customer_id = customers.customer_id
)

select * from final

# Example 6: dbt source definition (models/sources.yml)
version: 2

sources:
  - name: raw
    database: RAW_DB
    schema: PUBLIC
    tables:
      - name: orders
        loaded_at_field: updated_at       # Freshness check column
        freshness:
          warn_after: {count: 12, period: hour}   # Warn if > 12 hours stale
          error_after: {count: 24, period: hour}  # Error if > 24 hours stale
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: ref('dim_customers')
                  field: customer_id
      - name: customers
        columns:
          - name: customer_id
            tests:
              - unique
              - not_null
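
The freshness thresholds in the source definition can be read as a simple age check on the newest loaded_at_field value. The sketch below mirrors the 12-hour and 24-hour settings; it is an illustration rather than dbt's own logic, and the function name, the strict ">" boundary handling, and the sample timestamps are assumptions.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(max_loaded_at: datetime,
                     now: datetime,
                     warn_after: timedelta = timedelta(hours=12),
                     error_after: timedelta = timedelta(hours=24)) -> str:
    """Classify a source as 'pass', 'warn', or 'error' by the age of its newest row."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


# Hypothetical check time and row ages
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
for hours_old in (3, 18, 30):
    status = freshness_status(now - timedelta(hours=hours_old), now)
    print(f"{hours_old}h old -> {status}")
```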

-- Example 7: dbt test (tests/not_null_order_id.sql)
-- Singular tests are custom SQL queries that return failing rows
SELECT *
FROM {{ ref('fct_orders') }}
WHERE order_id IS NULL

-- Example 8: dbt generic test (tests/generic/greater_than_zero.sql)
-- Generic tests accept arguments and can be reused
{% test greater_than_zero(model, column_name) %}

select *
from {{ model }}
where {{ column_name }} <= 0

{% endtest %}

-- Usage in schema YAML:
-- tests:
--   - greater_than_zero
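
Both test styles share one rule: the query selects failing rows, and the test passes when it returns none. The sketch below demonstrates that rule with Python's built-in sqlite3 module standing in for the warehouse; the table contents are made up, and the SQL is the rendered form of the two tests above with the Jinja already substituted by hand.

```python
import sqlite3

# Rendered test queries: each one selects the rows that violate the rule
TESTS = {
    "not_null_order_id": "select * from fct_orders where order_id is null",
    "greater_than_zero": "select * from fct_orders where order_amount <= 0",
}


def run_checks() -> dict:
    """Run each test query and return the number of failing rows per test."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("create table fct_orders (order_id integer, order_amount real)")
        # Hypothetical data: one null order_id and one non-positive amount
        conn.executemany(
            "insert into fct_orders values (?, ?)",
            [(1, 25.0), (2, 0.0), (None, 10.0)],
        )
        return {name: len(conn.execute(sql).fetchall()) for name, sql in TESTS.items()}
    finally:
        conn.close()


for name, failures in run_checks().items():
    print(name, "PASS" if failures == 0 else f"FAIL ({failures} failing rows)")
```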

Snowflake-Specific Integration Patterns

-- Example 9: External function for API integration
-- External functions call external APIs from SQL
-- The API integration authorizes the call; the AS clause gives the proxy endpoint URL
CREATE OR REPLACE EXTERNAL FUNCTION call_enrichment_api(data VARIANT)
    RETURNS VARIANT
    API_INTEGRATION = my_api_integration
    AS 'https://api.example.com/enrich';

-- Use external function in queries
SELECT 
    customer_id,
    name,
    call_enrichment_api(OBJECT_CONSTRUCT('email', email)) AS enriched_data
FROM customers;
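
On the remote side, the service behind an external function receives rows in batches as JSON of the form {"data": [[row_number, arg1, ...], ...]} and is expected to return {"data": [[row_number, result], ...]} with the same row numbers. The sketch below shows a handler for that shape only; the enrichment logic, the function names, and the sample emails are hypothetical, and a real deployment would sit behind an API gateway.

```python
import json


def enrich(payload: dict) -> dict:
    """Hypothetical enrichment: derive the email domain from the input object."""
    email = payload.get("email")
    domain = email.split("@")[-1] if email else None
    return {"email": email, "domain": domain}


def handle_request(body: str) -> str:
    """Process one batch and echo each row number back with its result."""
    rows = json.loads(body)["data"]
    results = [[row_number, enrich(payload)] for row_number, payload in rows]
    return json.dumps({"data": results})


# A batch of two rows, as the SQL call with OBJECT_CONSTRUCT('email', email) would send
request_body = json.dumps({
    "data": [
        [0, {"email": "a@example.com"}],
        [1, {"email": "b@example.org"}],
    ]
})
response_body = handle_request(request_body)
print(response_body)
```

Returning the row numbers unchanged is what lets results be matched back to the rows of the calling query.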

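-- Example 10: Snowpipe for continuous data loading
-- Snowpipe auto-ingests files from cloud storage
-- For an S3 stage, AUTO_INGEST relies on S3 event notifications sent to the pipe's queue;
-- the INTEGRATION parameter (a notification integration) applies to Azure and GCS stages
CREATE PIPE orders_pipe
    AUTO_INGEST = TRUE
    AS
    COPY INTO orders_raw
    FROM @orders_stage
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;  -- Map Parquet columns to table columns by name

-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('orders_pipe');

-- Review files loaded into the target table over the last day
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'ORDERS_RAW',
    START_TIME => DATEADD(day, -1, CURRENT_TIMESTAMP())
));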

# Example 11: Python connector for application integration
import os

import snowflake.connector
from snowflake.connector import DictCursor

# Connect to Snowflake
# Credentials come from environment variables (the variable names are placeholders)
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    warehouse='COMPUTE_WH',
    database='ANALYTICS_DB',
    schema='PUBLIC'
)

try:
    # Execute queries (DictCursor returns each row as a dict keyed by column name)
    cursor = conn.cursor(DictCursor)
    cursor.execute("SELECT * FROM sales_data WHERE amount > %s", (1000,))
    for row in cursor:
        print(f"Order: {row['ORDER_ID']}, Amount: {row['AMOUNT']}")

    # Batch insert
    cursor.executemany(
        "INSERT INTO staging_data (id, value) VALUES (%s, %s)",
        [(1, 'a'), (2, 'b'), (3, 'c')]
    )
finally:
    # Always release the session, even if a query fails
    conn.close()

Performance Metrics

| Metric | Spark | Airflow | dbt |
|---|---|---|---|
| Setup Time | 2-4 hours | 1-2 hours | 30 min |
| Learning Curve | 2-4 weeks | 1-2 weeks | 3-5 days |
| Pipeline Speed | Very Fast | Moderate | Fast |
| Maintenance | High | Medium | Low |

Best Practices

  1. Choose the right tool: Use Spark for complex transformations requiring ML libraries, Airflow for orchestration, and dbt for SQL transformations.

  2. Use Snowpark when possible: Prefer Snowpark over Spark for transformations that can execute within Snowflake to minimize data movement.

  3. Implement incremental processing: Use dbt incremental models and date-scoped Airflow runs (for example, filtering on {{ ds }}) to process only new data; use Airflow backfills when historical intervals need reprocessing.

  4. Test thoroughly: Implement data quality tests in dbt and validation checks in Airflow to catch issues early.

  5. Monitor pipelines: Track pipeline execution times, error rates, and resource usage for optimization.

  6. Version control: Use Git for all pipeline code, including dbt models, Airflow DAGs, and Spark applications.

  7. Document dependencies: Maintain clear documentation of data lineage, dependencies, and transformation logic.

  8. Optimize performance: Use appropriate partitioning, clustering, and warehouse sizing for each pipeline component.

  9. Implement error handling: Add retry logic, alerting, and fallback mechanisms for production pipelines.

  10. Regular reviews: Conduct weekly pipeline reviews to identify optimization opportunities and address issues proactively.
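
The retry advice in practice 9 can be sketched as a small wrapper with exponential backoff. This is a generic illustration, not Airflow's retry mechanism (Airflow handles retries through the retries and retry_delay task arguments); the function name, the flaky task, and the injected sleep function are hypothetical.

```python
import time


def run_with_retries(task, retries=2, base_delay_s=1.0, sleep=time.sleep):
    """Call task(); on failure wait base_delay_s * 2**attempt and try again.

    Raises the last exception once `retries` retries have been used up.
    """
    attempt = 0
    while True:
        try:
            return task()
        except Exception as exc:
            if attempt >= retries:
                raise
            delay = base_delay_s * (2 ** attempt)
            print(f"attempt {attempt + 1} failed ({exc}); retrying in {delay}s")
            sleep(delay)
            attempt += 1


# Hypothetical task that fails twice before succeeding
calls = {"count": 0}


def flaky_load():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient connection error")
    return "loaded"


recorded_delays = []
# Recording delays instead of sleeping keeps the demo instant
result = run_with_retries(flaky_load, retries=2, sleep=recorded_delays.append)
print(result, recorded_delays)
```

Passing the sleep function as a parameter makes the backoff schedule easy to verify without waiting.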


Additional Theory: Integration Pattern Selection

Choosing the right integration tool depends on your use case:

| Use Case | Recommended Tool | Why |
|---|---|---|
| SQL transformations | dbt | Version control, testing, documentation |
| Complex ETL with ML | Spark + Snowflake | ML libraries, large-scale processing |
| Workflow orchestration | Airflow | Rich operators, scheduling, dependency management |
| Real-time streaming | Snowpipe + Streams | Continuous ingestion, low-latency CDC |
| API integration | External Functions | Call external services from SQL |
| Custom runtimes | Snowpark Container Services | Run any Docker container on Snowflake |

Decision matrix:

  1. Is it SQL-only? → Use dbt
  2. Does it need ML libraries? → Use Spark or Snowpark
  3. Does it need scheduling/orchestration? → Use Airflow or Snowflake Tasks
  4. Does it need real-time? → Use Streams + Tasks
  5. Does it need external APIs? → Use External Functions
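
The decision matrix can be written down as a small lookup so that a pipeline's requirements map to a list of candidate tools. The function and parameter names are hypothetical, and because the questions are not mutually exclusive the sketch returns every recommendation that applies.

```python
def recommend_tools(sql_only: bool = False,
                    needs_ml: bool = False,
                    needs_orchestration: bool = False,
                    needs_real_time: bool = False,
                    needs_external_apis: bool = False) -> list:
    """Return the tools suggested by each decision-matrix question that applies."""
    rules = [
        (sql_only, "dbt"),
        (needs_ml, "Spark or Snowpark"),
        (needs_orchestration, "Airflow or Snowflake Tasks"),
        (needs_real_time, "Streams + Tasks"),
        (needs_external_apis, "External Functions"),
    ]
    return [tool for applies, tool in rules if applies]


# Hypothetical pipeline: scheduled SQL transformations that call an external API
print(recommend_tools(sql_only=True, needs_orchestration=True, needs_external_apis=True))
```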

Additional Theory: dbt Model Layering

dbt follows a layered architecture for data transformations:

| Layer | Purpose | Materialization | Naming Convention |
|---|---|---|---|
| Staging | Clean and standardize | View | stg_ prefix |
| Intermediate | Business logic | View or Table | int_ prefix |
| Marts | Analytical tables | Table | fct_/dim_ prefix |
| Seeds | Reference data | Table | CSV files in seeds/ |
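
The naming conventions in the table lend themselves to a simple check, for example in a CI script that flags models without a recognized prefix. The sketch below is one way to express it; the function name is hypothetical, and picking "view" as the default for intermediate models is an assumption, since the table allows either view or table.

```python
# (prefix, layer, default materialization) taken from the layering table
LAYER_RULES = [
    ("stg_", "staging", "view"),
    ("int_", "intermediate", "view"),  # assumption: the table allows view or table
    ("fct_", "marts", "table"),
    ("dim_", "marts", "table"),
]


def classify_model(model_name: str) -> tuple:
    """Return (layer, default_materialization) based on the model's prefix."""
    for prefix, layer, materialization in LAYER_RULES:
        if model_name.startswith(prefix):
            return layer, materialization
    raise ValueError(f"{model_name!r} does not follow a known naming convention")


for name in ("stg_orders", "int_orders_enriched", "fct_orders", "dim_customers"):
    print(name, classify_model(name))
```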

Key dbt concepts:

  • {{ source() }}: References raw tables declared in a sources YAML file (data loaded into the warehouse outside dbt)
  • {{ ref() }}: References other dbt models (builds dependency graph)
  • Tests: Validate data quality (nulls, uniqueness, relationships)
  • Documentation: Auto-generated from YAML schema definitions
