Cross-Platform Integration: Spark, Airflow & dbt
Architecture Diagram 1: Spark Integration Architecture
Architecture Diagram 2: Airflow Integration Architecture
Architecture Diagram 3: dbt Integration Architecture
Architecture Diagram 4: dbt Model DAG Execution
External Functions let SQL queries call code hosted outside Snowflake, typically a REST endpoint reached through a cloud API gateway. The remote service is deployed and operated outside Snowflake, yet it is invoked like any other function in a query, which allows data enrichment and integration with third-party systems without leaving SQL.
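To make the request flow concrete, here is a minimal sketch of the remote side of an external function. It assumes the batch format used for external functions, where each row arrives as `[row_number, argument]` under a `"data"` key and the response returns `[row_number, result]` pairs in the same shape. The `handle_batch` name and the email-domain enrichment are hypothetical and chosen only for illustration.

```python
import json


def handle_batch(request_body: str) -> str:
    """Process one batched request and answer with matching row numbers."""
    rows = json.loads(request_body)["data"]
    results = []
    for row_number, payload in rows:
        email = payload.get("email", "")
        # Hypothetical enrichment: derive the domain from the email address.
        domain = email.split("@")[-1] if "@" in email else None
        results.append([row_number, {"email_domain": domain}])
    return json.dumps({"data": results})


# Simulate a batch of two rows, each carrying one VARIANT argument.
request = json.dumps(
    {"data": [[0, {"email": "a@example.com"}], [1, {"email": "invalid"}]]}
)
response = json.loads(handle_batch(request))
print(response)
```

Echoing the row number back with each result is what lets the caller line results up with input rows, so a real service should preserve it even when a row fails enrichment.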
Snowpark Container Services run custom containers (Docker) directly on Snowflake, enabling any language, library, or runtime (ML models, ETL tools, custom applications) within Snowflake's managed infrastructure without external hosting.
Data Pipeline Throughput
- External table refresh: 100-1000 files/batch depending on size
- Stream processing: Real-time micro-batches (1-5 second latency)
- Batch ETL: 1-10 GB/s on XL warehouse (format-dependent)
- Semi-structured: JSON/Avro 2-3× slower than Parquet
- Optimization: Partition files by date; use predicate pushdown on external tables
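As a rough illustration of how these figures combine, the sketch below estimates load time from data volume, throughput, and a format slowdown factor. The function name and the 500 GB volume are hypothetical, and the inputs are only the ranges quoted above, not measurements.

```python
def estimated_load_seconds(size_gb: float, gb_per_second: float,
                           slowdown: float = 1.0) -> float:
    """Estimate load time; slowdown > 1 models a slower file format."""
    if gb_per_second <= 0 or slowdown <= 0:
        raise ValueError("throughput and slowdown must be positive")
    return size_gb / gb_per_second * slowdown


size_gb = 500  # hypothetical daily volume

# Parquet at the low and high end of the quoted 1-10 GB/s range.
parquet_slowest = estimated_load_seconds(size_gb, 1.0)
parquet_fastest = estimated_load_seconds(size_gb, 10.0)

# JSON at the pessimistic end of the quoted 2-3x slowdown.
json_slowest = estimated_load_seconds(size_gb, 1.0, slowdown=3.0)

print(parquet_fastest, parquet_slowest, json_slowest)
```

The spread between the best and worst case is wide, which is why the format and partitioning choices in the list matter more than the headline throughput number.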
Use Snowpark Container Services for ML model serving and custom runtimes. Use External Functions for lightweight API integrations. Use External Tables with streams for near-real-time data lake ingestion.
- External Functions: Call REST APIs, custom logic from SQL
- Snowpark Containers: Run any Docker container on Snowflake infrastructure
- External Tables: Query cloud storage files directly (Parquet, JSON, ORC)
- Data sharing: Zero-copy sharing across Snowflake accounts
- Stream processing: Real-time ingestion via streams + tasks + external functions
Detailed Explanation
Spark Integration
Large-scale data processing using Spark's distributed compute + Snowflake's managed platform.
- Pushdown predicates: Spark filters translated to Snowflake SQL
- Partition pruning: leverages micro-partition metadata
- Data loading: COPY command for efficient bulk loading
- Ideal for complex transformations requiring Spark-specific libraries
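The pushdown idea can be sketched without a cluster: filters expressed on the client side are rendered into a SQL `WHERE` clause so that the database, not the client, discards non-matching rows. This is a toy illustration of the concept under simplifying assumptions, not the connector's actual implementation; `build_pushdown_query` and the filter tuple format are hypothetical.

```python
from typing import List, Tuple, Union

Filter = Tuple[str, str, Union[int, float, str]]
ALLOWED_OPS = {"=", "!=", ">", ">=", "<", "<="}


def render_literal(value: Union[int, float, str]) -> str:
    """Render a Python value as a SQL literal, escaping single quotes."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_pushdown_query(table: str, filters: List[Filter]) -> str:
    """Combine client-side filters into one query executed by the database."""
    clauses = []
    for column, op, value in filters:
        if op not in ALLOWED_OPS:
            raise ValueError(f"unsupported operator: {op}")
        clauses.append(f"{column} {op} {render_literal(value)}")
    query = f"SELECT * FROM {table}"
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    return query


sql = build_pushdown_query(
    "sales_data", [("amount", ">", 5000), ("region", "=", "EMEA")]
)
print(sql)
```

Filters that cannot be expressed in SQL (for example, a Python UDF) cannot be pushed down this way and have to run in Spark after the data is transferred.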
Airflow Integration
Orchestrate Snowflake pipelines via operators, hooks, and sensors.
| Component | Purpose |
|---|---|
| SnowflakeOperator | Execute SQL with parameterized queries |
| Transfer operators | S3/Azure/GCS → Snowflake data movement |
| Hooks | Low-level connection management |
dbt Integration
SQL-based transformation framework natively integrated with Snowflake.
| Layer | Purpose | Materialization |
|---|---|---|
| Staging | Clean and standardize raw data | View |
| Intermediate | Combine staging models | View or Table |
| Marts | Final analytical tables | Table |
- Tests: nulls, uniqueness, referential integrity
- {{ source() }} references raw data; {{ ref() }} references models
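To show why `ref()` matters, the sketch below extracts `ref()` calls from model SQL and derives a valid build order from them. It is a simplified stand-in for dbt's own parsing, using a regular expression and the standard-library `graphlib` module (Python 3.9+); the model bodies and the `build_order` helper are hypothetical.

```python
import re
from graphlib import TopologicalSorter  # Python 3.9+

# Matches {{ ref('model_name') }} with optional whitespace and either quote style.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def build_order(models: dict) -> list:
    """Return model names ordered so each model follows the models it refs."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


models = {
    "stg_orders": "select * from {{ source('raw', 'orders') }}",
    "dim_customers": "select * from {{ source('raw', 'customers') }}",
    "fct_orders": (
        "select * from {{ ref('stg_orders') }} "
        "left join {{ ref('dim_customers') }} using (customer_id)"
    ),
}

order = build_order(models)
print(order)  # fct_orders always comes after both of its upstream models
```

Only `ref()` calls create edges between models here; `source()` calls point at raw tables outside the project, so they do not affect the build order.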
Integration Pattern Selection
| Use Case | Recommended Tool |
|---|---|
| SQL transformations | dbt |
| Complex ETL with ML | Spark + Snowflake |
| Workflow orchestration | Airflow |
| Real-time streaming | Snowpipe + Streams |
| API integration | External Functions |
| Custom runtimes | Snowpark Container Services |
Key Takeaway: Use Snowpark over Spark when transformations can execute within Snowflake to minimize data movement.
Key Concepts Table
| Integration | Primary Use | Strengths | Limitations |
|---|---|---|---|
| Spark | Large-scale processing | ML libraries, complex transforms | Requires Spark cluster |
| Airflow | Orchestration | Rich operators, scheduling | Steep learning curve |
| dbt | SQL transformations | Version control, testing | SQL-only transformations |
| Component | Purpose | Configuration |
|---|---|---|
| Spark Connector | Data read/write | Snowflake connection options |
| Airflow Operator | Task execution | SQL, parameters, connections |
| dbt Model | Data transformation | SQL, Jinja templating |
| Metric | Spark | Airflow | dbt |
|---|---|---|---|
| Setup Complexity | High | Medium | Low |
| Learning Curve | High | Medium | Low |
| Flexibility | Very High | High | Medium |
| Performance | Very High | N/A | High |
Code Examples
Apache Spark Integration
# Example 1: Spark-Snowflake read with configuration options
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark session with Snowflake connector
# The connector JAR handles all Snowflake communication
spark = SparkSession.builder \
    .appName("SnowflakeIntegration") \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:2.13.0-spark_3.3") \
    .getOrCreate()

# Fully qualified data source name; some environments (for example Databricks)
# also accept the short name "snowflake"
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Connection options shared by every read and write
#   sfURL: Snowflake account URL
#   sfUser/sfPassword: Authentication credentials (placeholders; load real secrets from a vault)
#   sfDatabase/sfSchema/sfWarehouse: Snowflake context
sf_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": "myuser",
    "sfPassword": "mypassword",
    "sfDatabase": "ANALYTICS_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "ANALYTICS_WH",
}

# Read from Snowflake
#   dbtable: Source table (mutually exclusive with query)
#   query: Custom SQL query (mutually exclusive with dbtable)
df = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sf_options) \
    .option("dbtable", "sales_data") \
    .load()

# Transform and write back
# Aliases give the aggregates column names that are valid in Snowflake
# mode options: "overwrite" (replace), "append" (add), "ignore" (skip if exists), "error" (fail)
transformed_df = df.filter("amount > 1000") \
    .groupBy("region") \
    .agg(F.sum("amount").alias("total_amount"),
         F.count("quantity").alias("order_count"))
transformed_df.write \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sf_options) \
    .option("dbtable", "sales_summary") \
    .mode("overwrite") \
    .save()

# Alternative: Use query instead of dbtable for custom reads
df_custom = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sf_options) \
    .option("query", "SELECT * FROM sales WHERE amount > 1000") \
    .load()

# Pushdown predicates (Snowflake executes these filters)
df_filtered = spark.read \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sf_options) \
    .option("dbtable", "sales_data") \
    .load()
df_filtered = df_filtered.filter(df_filtered["amount"] > 5000)  # Pushed to Snowflake
Apache Airflow Integration
# Example 2: Airflow DAG for Snowflake pipeline
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,  # Don't wait for previous run
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 2,  # Retry twice on failure
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),  # Kill after 2 hours
}

with DAG('snowflake_etl_pipeline',
         default_args=default_args,
         schedule_interval='@daily',  # Run daily at midnight
         catchup=False,               # Don't backfill missed runs
         max_active_runs=1) as dag:   # One run at a time

    # Extract: Load raw data from staging
    extract_data = SnowflakeOperator(
        task_id='extract_raw_data',
        sql="""
            INSERT INTO raw_sales
            SELECT * FROM staging_sales
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default',  # Connection ID from Airflow
        warehouse='ETL_WH',
        database='RAW_DB',
        schema='PUBLIC'
    )

    # Validate: Check row count
    validate_data = SnowflakeOperator(
        task_id='validate_data',
        sql="""
            SELECT COUNT(*) as row_count
            FROM raw_sales
            WHERE load_date = '{{ ds }}'
        """,
        snowflake_conn_id='snowflake_default'
    )

    # Transform: Run dbt or custom SQL
    transform_data = SnowflakeOperator(
        task_id='transform_data',
        sql="transform_query.sql",  # External SQL file
        parameters={'execution_date': '{{ ds }}'},
        snowflake_conn_id='snowflake_default'
    )

    # Define dependencies (task ordering)
    extract_data >> validate_data >> transform_data
# Example 3: Airflow custom operator for Snowflake data quality
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


class SnowflakeDataQualityOperator(BaseOperator):
    """Custom operator to validate Snowflake data quality."""

    # Note: the apply_defaults decorator is no longer needed in Airflow 2
    def __init__(
        self,
        sql,
        expected_result,
        comparison='equals',  # 'equals', 'greater_than', 'less_than'
        snowflake_conn_id='snowflake_default',
        *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.expected_result = expected_result
        self.comparison = comparison
        self.snowflake_conn_id = snowflake_conn_id

    def execute(self, context):
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        result = hook.get_first(self.sql)
        if result is None:
            raise ValueError("Data quality check failed: query returned no rows")
        actual = result[0]

        # Evaluate the configured comparison; reject unknown modes instead of passing silently
        if self.comparison == 'equals':
            passed = actual == self.expected_result
        elif self.comparison == 'greater_than':
            passed = actual > self.expected_result
        elif self.comparison == 'less_than':
            passed = actual < self.expected_result
        else:
            raise ValueError(f"Unknown comparison: {self.comparison}")

        if not passed:
            raise ValueError(
                f"Data quality check failed: {actual} is not "
                f"{self.comparison} {self.expected_result}"
            )
        self.log.info(f"Quality check passed: {actual} {self.comparison} {self.expected_result}")
        return actual


# Usage in DAG
quality_check = SnowflakeDataQualityOperator(
    task_id='check_row_count',
    sql="SELECT COUNT(*) FROM fct_orders WHERE order_date = CURRENT_DATE()",
    expected_result=1000,
    comparison='greater_than',
    snowflake_conn_id='snowflake_default'
)
dbt Integration
-- Example 4: dbt staging model (models/staging/stg_orders.sql)
-- Staging models clean and standardize raw data
-- {{ source() }} macro references the source definition in sources.yml
with source as (
    select * from {{ source('raw', 'orders') }}
),
renamed as (
    select
        order_id,                -- Keep as-is
        customer_id,             -- Keep as-is
        order_date,              -- Keep as-is
        amount as order_amount,  -- Rename for clarity
        status as order_status,  -- Rename for clarity
        created_at,
        updated_at
    from source
)
select * from renamed
-- Example 5: dbt mart model (models/marts/fct_orders.sql)
-- Mart models create final analytical tables
-- {{ ref() }} macro references other models (builds dependency graph)
with orders as (
    select * from {{ ref('stg_orders') }}
),
customers as (
    select * from {{ ref('dim_customers') }}
),
final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        customers.customer_segment,
        orders.order_date,
        orders.order_amount,
        orders.order_status,
        DATE_TRUNC('month', orders.order_date) as order_month
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)
select * from final
# Example 6: dbt source definition (models/sources.yml)
version: 2
sources:
  - name: raw
    database: RAW_DB
    schema: PUBLIC
    tables:
      - name: orders
        loaded_at_field: updated_at  # Freshness check column
        freshness:
          warn_after: {count: 12, period: hour}   # Warn if > 12 hours stale
          error_after: {count: 24, period: hour}  # Error if > 24 hours stale
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: ref('dim_customers')
                  field: customer_id
      - name: customers
        columns:
          - name: customer_id
            tests:
              - unique
              - not_null
-- Example 7: dbt test (tests/not_null_order_id.sql)
-- Singular tests are custom SQL queries that return failing rows
SELECT *
FROM {{ ref('fct_orders') }}
WHERE order_id IS NULL
-- Example 8: dbt generic test (tests/generic/greater_than_zero.sql)
-- Generic tests accept arguments and can be reused
{% test greater_than_zero(model, column_name) %}
select *
from {{ model }}
where {{ column_name }} <= 0
{% endtest %}
-- Usage in schema YAML:
--   tests:
--     - greater_than_zero
Snowflake-Specific Integration Patterns
-- Example 9: External function for API integration
-- External functions call external APIs from SQL
-- The URL after AS is the proxy (API gateway) endpoint allowed by the API integration
CREATE OR REPLACE EXTERNAL FUNCTION call_enrichment_api(data VARIANT)
    RETURNS VARIANT
    API_INTEGRATION = my_api_integration
    AS 'https://api.example.com/enrich';
-- Use external function in queries
SELECT
customer_id,
name,
call_enrichment_api(OBJECT_CONSTRUCT('email', email)) AS enriched_data
FROM customers;
-- Example 10: Snowpipe for continuous data loading
-- Snowpipe auto-ingests files from cloud storage
-- S3 stages use the pipe's own notification channel; for Azure or GCS stages,
-- also set INTEGRATION = '<NOTIFICATION_INTEGRATION_NAME>'
CREATE PIPE orders_pipe
    AUTO_INGEST = TRUE
AS
COPY INTO orders_raw
FROM @orders_stage
FILE_FORMAT = (TYPE = 'PARQUET');
-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('orders_pipe');
-- Review files loaded into the target table over the last day
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'orders_raw',
    START_TIME => DATEADD(day, -1, CURRENT_TIMESTAMP())
));
# Example 11: Python connector for application integration
import snowflake.connector
from snowflake.connector import DictCursor
# Connect to Snowflake
conn = snowflake.connector.connect(
    user='your_user',
    password='your_password',
    account='your_account',
    warehouse='COMPUTE_WH',
    database='ANALYTICS_DB',
    schema='PUBLIC'
)
# Execute queries
cursor = conn.cursor(DictCursor)
cursor.execute("SELECT * FROM sales_data WHERE amount > %s", (1000,))
for row in cursor:
    print(f"Order: {row['ORDER_ID']}, Amount: {row['AMOUNT']}")
# Batch insert
cursor.executemany(
    "INSERT INTO staging_data (id, value) VALUES (%s, %s)",
    [(1, 'a'), (2, 'b'), (3, 'c')]
)
conn.close()
Performance Metrics
| Metric | Spark | Airflow | dbt |
|---|---|---|---|
| Setup Time | 2-4 hours | 1-2 hours | 30 min |
| Learning Curve | 2-4 weeks | 1-2 weeks | 3-5 days |
| Pipeline Speed | Very Fast | Moderate | Fast |
| Maintenance | High | Medium | Low |
Best Practices
- Choose the right tool: Use Spark for complex transformations requiring ML libraries, Airflow for orchestration, and dbt for SQL transformations.
- Use Snowpark when possible: Prefer Snowpark over Spark for transformations that can execute within Snowflake to minimize data movement.
- Implement incremental processing: Use dbt incremental models and Airflow backfill capabilities to process only new data.
- Test thoroughly: Implement data quality tests in dbt and validation checks in Airflow to catch issues early.
- Monitor pipelines: Track pipeline execution times, error rates, and resource usage for optimization.
- Version control: Use Git for all pipeline code, including dbt models, Airflow DAGs, and Spark applications.
- Document dependencies: Maintain clear documentation of data lineage, dependencies, and transformation logic.
- Optimize performance: Use appropriate partitioning, clustering, and warehouse sizing for each pipeline component.
- Implement error handling: Add retry logic, alerting, and fallback mechanisms for production pipelines.
- Regular reviews: Conduct weekly pipeline reviews to identify optimization opportunities and address issues proactively.
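The retry advice above can be sketched as a small helper with exponential backoff. This is a generic illustration rather than an Airflow or Snowflake API; `run_with_retries`, `flaky_load`, and the delay values are hypothetical. The `sleep` parameter is injectable so the demo records delays instead of actually waiting.

```python
import time


def run_with_retries(task, retries=2, base_delay=1.0, sleep=time.sleep):
    """Call task(); on failure wait base_delay * 2**attempt and try again."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= retries:
                raise  # Retries exhausted: surface the original error
            sleep(base_delay * (2 ** attempt))
            attempt += 1


# Hypothetical task that fails twice with a transient error, then succeeds.
calls = {"count": 0}


def flaky_load():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "loaded"


delays = []  # Record requested delays instead of sleeping
result = run_with_retries(flaky_load, retries=2, base_delay=1.0, sleep=delays.append)
print(result, delays)
```

In an Airflow DAG the same effect is usually configured declaratively through `retries` and `retry_delay`, as in Example 2, so a helper like this is mainly useful in standalone scripts.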
Additional Theory: Integration Pattern Selection
Choosing the right integration tool depends on your use case:
| Use Case | Recommended Tool | Why |
|---|---|---|
| SQL transformations | dbt | Version control, testing, documentation |
| Complex ETL with ML | Spark + Snowflake | ML libraries, large-scale processing |
| Workflow orchestration | Airflow | Rich operators, scheduling, dependency management |
| Real-time streaming | Snowpipe + Streams | Continuous ingestion, low-latency CDC |
| API integration | External Functions | Call external services from SQL |
| Custom runtimes | Snowpark Container Services | Run any Docker container on Snowflake |
Decision matrix:
- Is it SQL-only? → Use dbt
- Does it need ML libraries? → Use Spark or Snowpark
- Does it need scheduling/orchestration? → Use Airflow or Snowflake Tasks
- Does it need real-time? → Use Streams + Tasks
- Does it need external APIs? → Use External Functions
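The decision matrix can be written down as a small function, which makes it clear that the questions are independent and a single workload may match several tools. The `Workload` class and `recommend_tools` function are hypothetical names used only for this sketch; the recommendations are copied from the list above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Workload:
    """Answers to the decision-matrix questions for one pipeline."""
    sql_only: bool = False
    needs_ml_libraries: bool = False
    needs_orchestration: bool = False
    needs_real_time: bool = False
    needs_external_apis: bool = False


def recommend_tools(workload: Workload) -> List[str]:
    """Return every recommendation whose question is answered 'yes'."""
    rules = [
        (workload.sql_only, "dbt"),
        (workload.needs_ml_libraries, "Spark or Snowpark"),
        (workload.needs_orchestration, "Airflow or Snowflake Tasks"),
        (workload.needs_real_time, "Streams + Tasks"),
        (workload.needs_external_apis, "External Functions"),
    ]
    return [tool for matches, tool in rules if matches]


# A scheduled, SQL-only transformation pipeline.
daily_marts = recommend_tools(Workload(sql_only=True, needs_orchestration=True))
print(daily_marts)
```

An empty result means none of the questions applied, which is a hint to revisit the requirements rather than a recommendation in itself.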
Additional Theory: dbt Model Layering
dbt follows a layered architecture for data transformations:
| Layer | Purpose | Materialization | Naming Convention |
|---|---|---|---|
| Staging | Clean and standardize | View | stg_ prefix |
| Intermediate | Business logic | View or Table | int_ prefix |
| Marts | Analytical tables | Table | fct_/dim_ prefix |
| Seeds | Reference data | Table | CSV files in seeds/ |
Key dbt concepts:
- {{ source() }}: References raw data sources (tables declared in sources.yml)
- {{ ref() }}: References other dbt models (builds dependency graph)
- Tests: Validate data quality (nulls, uniqueness, relationships)
- Documentation: Auto-generated from YAML schema definitions
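One way to keep the naming conventions in the table honest is a small check that maps a model name to its expected layer and materialization. The mapping below restates the table; the `classify_model` helper is a hypothetical convenience, not part of dbt.

```python
from typing import Optional, Tuple

# Prefix -> (layer, default materialization), taken from the layering table.
LAYER_BY_PREFIX = {
    "stg_": ("staging", "view"),
    "int_": ("intermediate", "view or table"),
    "fct_": ("marts", "table"),
    "dim_": ("marts", "table"),
}


def classify_model(name: str) -> Optional[Tuple[str, str]]:
    """Return (layer, materialization) for a model name, or None if unprefixed."""
    for prefix, layer_info in LAYER_BY_PREFIX.items():
        if name.startswith(prefix):
            return layer_info
    return None


for model in ["stg_orders", "fct_orders", "orders"]:
    print(model, classify_model(model))
```

A check like this could run in CI to flag models such as `orders` that do not follow any of the documented prefixes.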
See Also
- 11-Snowpark-Python - Snowpark as alternative to Spark
- 09-Streams-Tasks - Snowflake-native orchestration
- 06-External-Tables-Integration - External table patterns
- PySpark Iceberg - Spark integration patterns
- Delta Lake on Databricks - Delta Lake integration
- Data Warehouse Concepts - Data warehouse design principles