Advanced Snowflake: Building Production Data Pipelines
Beyond basic warehousing, Snowflake provides a rich ecosystem for building end-to-end data pipelines within the platform.
Why Advanced Snowflake Matters
Key Benefits:
- Incremental processing: Streams track row-level changes automatically
- Orchestration: Tasks schedule and chain pipeline steps
- Real-time ingestion: Snowpipe handles continuous data loading
- No external tools: fewer moving parts reduce operational complexity and cost
Key Insight: Snowflake's native pipeline primitives eliminate the need for external orchestration tools in many scenarios.
Architecture Overview
Snowflake Pipeline Flow
Streams: Change Data Capture
A Stream is a Snowflake object that records DML changes (INSERT, UPDATE, DELETE) made to a source table. Streams enable incremental data processing by tracking the row-level change metadata, allowing downstream consumers to process only changed rows.
Stream Metadata Columns
Each stream record includes three metadata columns:
- METADATA$ACTION: 'INSERT' or 'DELETE' (the type of change)
- METADATA$ISUPDATE: TRUE if the row was part of an UPDATE operation; an UPDATE appears as a DELETE row plus an INSERT row that both carry this flag
- METADATA$ROW_ID: Unique, immutable identifier for the changed row
Two derived quantities help when sizing downstream work:
- Change Volume: Δ_rows = |INSERT_rows| + |DELETE_rows|
- Net Change: Net = |INSERT_rows| - |DELETE_rows|
-- Create source table
CREATE TABLE raw_orders (
order_id INT PRIMARY KEY,
customer_id INT,
amount DECIMAL(12,2),
status VARCHAR(20),
updated_at TIMESTAMP
);
-- Create a standard stream on the source table
CREATE STREAM raw_orders_stream
ON TABLE raw_orders
APPEND_ONLY = FALSE -- Standard stream: track inserts, updates, and deletes
SHOW_INITIAL_ROWS = FALSE; -- Don't return rows that existed before the stream was created
-- Simulate DML changes
INSERT INTO raw_orders VALUES (1, 101, 250.00, 'pending', CURRENT_TIMESTAMP());
INSERT INTO raw_orders VALUES (2, 102, 180.50, 'pending', CURRENT_TIMESTAMP());
UPDATE raw_orders SET status = 'shipped', amount = 275.00 WHERE order_id = 1;
DELETE FROM raw_orders WHERE order_id = 2;
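-- Query stream to see the net changes since its offset
-- (here: one INSERT row for order 1 carrying the updated values;
-- order 2 was inserted and then deleted, so it does not appear)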
SELECT
order_id,
customer_id,
amount,
status,
METADATA$ACTION AS change_action,
METADATA$ISUPDATE AS is_update,
METADATA$ROW_ID AS row_id
FROM raw_orders_stream;
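Before relying on a stream, it can help to check whether it holds unconsumed changes and whether it is at risk of going stale. The following is a minimal sketch, assuming the `raw_orders_stream` object created above still exists; it uses `SYSTEM$STREAM_HAS_DATA` and the `stale` / `stale_after` columns reported by `SHOW STREAMS`.

```sql
-- TRUE when the stream holds change rows that have not been consumed yet
SELECT SYSTEM$STREAM_HAS_DATA('raw_orders_stream') AS has_unconsumed_changes;

-- SHOW STREAMS reports staleness; its output columns are lower-case,
-- so they need double quotes when read back through RESULT_SCAN
SHOW STREAMS LIKE 'raw_orders_stream';

SELECT "name", "table_name", "mode", "stale", "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
```

A stream that is never consumed can become stale once its offset falls outside the source table's retention window, so `stale_after` is a reasonable value to alert on.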
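-- Use stream for incremental load into target table
-- (upserts only: rows with METADATA$ACTION = 'DELETE' are filtered out, so deletes are not propagated)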
MERGE INTO dim_orders_target AS target
USING (
SELECT
order_id,
customer_id,
amount,
status,
updated_at
FROM raw_orders_stream
WHERE METADATA$ACTION = 'INSERT'
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN
UPDATE SET
amount = source.amount,
status = source.status,
updated_at = source.updated_at
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, amount, status, updated_at)
VALUES (source.order_id, source.customer_id, source.amount,
source.status, source.updated_at);
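-- Inspect the stream's current offset timestamp. This call does not consume the stream;
-- the offset advances only when a committed DML statement reads the stream (such as the MERGE above)
SELECT SYSTEM$STREAM_GET_TABLE_TIMESTAMP('raw_orders_stream');
-- Drop the stream only when the pipeline is retired; a stream in active use is kept between loads
DROP STREAM IF EXISTS raw_orders_stream;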
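For insert-only sources, an append-only stream is often sufficient. The sketch below assumes a hypothetical target table `staging_orders_new` with the same columns as `raw_orders`; the stream name is also illustrative. It shows how a stream is actually consumed: reading it inside a DML statement advances the offset when that statement commits.

```sql
-- Append-only stream: records inserts only, ignoring updates and deletes
CREATE OR REPLACE STREAM raw_orders_inserts_stream
  ON TABLE raw_orders
  APPEND_ONLY = TRUE;

-- Reading the stream in DML consumes it: after this statement commits,
-- the stream offset moves forward and the same rows are not returned again
INSERT INTO staging_orders_new (order_id, customer_id, amount, status, updated_at)
SELECT order_id, customer_id, amount, status, updated_at
FROM raw_orders_inserts_stream;

-- Expected to return 0 immediately after the load, unless new rows arrived in between
SELECT COUNT(*) AS remaining_changes
FROM raw_orders_inserts_stream;
```

A plain `SELECT` against the stream, by contrast, leaves the offset where it was, which is why it is safe for inspection.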
Tasks: Scheduled Execution
A Task is a Snowflake object that runs a single SQL statement, a stored procedure call, or a Snowflake Scripting block, either on a defined schedule or after another task completes. Tasks form the backbone of scheduled data pipelines within Snowflake.
-- Simple task: daily aggregation
CREATE OR REPLACE TASK daily_sales_aggregate
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 2 * * * UTC' -- 2 AM UTC daily
COMMENT = 'Aggregate daily sales metrics'
AS
INSERT INTO mart_daily_sales (date_key, total_revenue, order_count)
SELECT
sale_date,
SUM(amount) AS total_revenue,
COUNT(*) AS order_count
FROM fact_sales
WHERE sale_date = CURRENT_DATE() - 1
GROUP BY sale_date;
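The same aggregation could also run as a serverless task, where Snowflake manages the compute instead of a named warehouse. This is a sketch only: the task name is hypothetical, and it assumes the owning role has the EXECUTE MANAGED TASK privilege.

```sql
-- Serverless variant: omit WAREHOUSE and give Snowflake a starting size hint instead
CREATE OR REPLACE TASK daily_sales_aggregate_serverless
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
  SCHEDULE = 'USING CRON 0 2 * * * UTC'
  COMMENT = 'Aggregate daily sales metrics (serverless)'
AS
  INSERT INTO mart_daily_sales (date_key, total_revenue, order_count)
  SELECT
    sale_date,
    SUM(amount) AS total_revenue,
    COUNT(*) AS order_count
  FROM fact_sales
  WHERE sale_date = CURRENT_DATE() - 1
  GROUP BY sale_date;

-- Tasks are created suspended, so resume before expecting scheduled runs
ALTER TASK daily_sales_aggregate_serverless RESUME;
```

The initial size is only a hint for the first runs; Snowflake adjusts the compute based on run history.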
-- Task dependency chain (DAG)
CREATE OR REPLACE TASK extract_raw_data
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 1 * * * UTC'
AS
COPY INTO raw_orders
FROM @my_stage/orders/
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'SKIP_FILE';
CREATE OR REPLACE TASK transform_orders
WAREHOUSE = 'etl_wh'
AFTER extract_raw_data -- Depends on extract task
AS
INSERT INTO staging_orders
SELECT * FROM raw_orders
WHERE order_date >= CURRENT_DATE() - 1;
CREATE OR REPLACE TASK load_mart
WAREHOUSE = 'etl_wh'
AFTER transform_orders -- Depends on transform task
AS
INSERT INTO mart_orders
SELECT * FROM staging_orders
WHERE processed = FALSE;
-- Enable task chain: resume child tasks first, then the root task
ALTER TASK load_mart RESUME;
ALTER TASK transform_orders RESUME;
ALTER TASK extract_raw_data RESUME;
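As an alternative to resuming each task by hand, the whole graph can be enabled from its root, and a run can be triggered manually for testing. A brief sketch, assuming the three tasks above exist in the current schema:

```sql
-- Resume the root task and every task that depends on it, in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('extract_raw_data');

-- Trigger one run of the graph immediately, without waiting for the cron schedule
EXECUTE TASK extract_raw_data;

-- Confirm the state ("started" or "suspended") of each task in the chain
SHOW TASKS LIKE '%orders%';
```

`EXECUTE TASK` on the root starts the child tasks too once their predecessors finish, which makes it convenient for smoke-testing a new graph.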
-- Monitor task execution
SELECT
name AS task_name,
query_id,
state,
error_code,
error_message,
scheduled_time,
completed_time,
DATEDIFF(second, scheduled_time, completed_time) AS duration_sec
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
-- Multi-action task with error handling
CREATE OR REPLACE TASK pipeline_orchestrator
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 6 * * * UTC'
ERROR_INTEGRATION = 'my_notification_integration'
AS
BEGIN
-- Step 1: Load staging
COPY INTO staging.raw_events
FROM @my_stage/events/
FILE_FORMAT = (TYPE = PARQUET)
ON_ERROR = 'SKIP_FILE';
-- Step 2: Transform
INSERT INTO processed.events
SELECT
event_id,
PARSE_JSON(payload):user_id::STRING AS user_id,
event_type,
event_timestamp
FROM staging.raw_events
WHERE _load_date = CURRENT_DATE();
-- Step 3: Aggregate
MERGE INTO mart.event_metrics AS target
USING (
SELECT
DATE_TRUNC('hour', event_timestamp) AS event_hour,
event_type,
COUNT(*) AS event_count,
COUNT(DISTINCT user_id) AS unique_users
FROM processed.events
WHERE event_timestamp >= CURRENT_DATE() - INTERVAL '1 day'
GROUP BY 1, 2
) AS source
ON target.event_hour = source.event_hour
AND target.event_type = source.event_type
WHEN MATCHED THEN UPDATE SET
event_count = source.event_count,
unique_users = source.unique_users
WHEN NOT MATCHED THEN INSERT VALUES (
source.event_hour, source.event_type,
source.event_count, source.unique_users
);
END;
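Tasks and Streams are commonly combined so that a task runs only when there is something to process. The sketch below assumes a stream named `raw_orders_stream` on `raw_orders` and an existing `dim_orders_target` table; the task name is hypothetical. It also assumes `order_id` values are never reused after a delete, since a key that appears twice in the stream would make the MERGE nondeterministic.

```sql
-- Runs every 5 minutes, but is skipped when the stream has no unconsumed changes
CREATE OR REPLACE TASK merge_orders_from_stream
  WAREHOUSE = etl_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('raw_orders_stream')
AS
  MERGE INTO dim_orders_target AS target
  USING (
    SELECT
      order_id, customer_id, amount, status, updated_at,
      METADATA$ACTION AS change_action
    FROM raw_orders_stream
    -- Drop the DELETE half of each UPDATE pair; keep inserts and true deletes
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
  ) AS source
  ON target.order_id = source.order_id
  WHEN MATCHED AND source.change_action = 'DELETE' THEN
    DELETE
  WHEN MATCHED THEN
    UPDATE SET
      amount = source.amount,
      status = source.status,
      updated_at = source.updated_at
  WHEN NOT MATCHED AND source.change_action = 'INSERT' THEN
    INSERT (order_id, customer_id, amount, status, updated_at)
    VALUES (source.order_id, source.customer_id, source.amount,
            source.status, source.updated_at);

ALTER TASK merge_orders_from_stream RESUME;
```

Because the MERGE reads the stream inside a DML statement, each successful run advances the stream offset, so the next run sees only newer changes.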
Dynamic Tables
A Dynamic Table is a Snowflake object that materializes the results of a query and refreshes them automatically so that the data stays within a declared target lag of its sources. Unlike materialized views, which are limited to a single base table, Dynamic Tables support joins, multi-table aggregations, and complex transformations, with explicit control over freshness.
-- Create a dynamic table for real-time metrics
CREATE OR REPLACE DYNAMIC TABLE dt_customer_lifetime_value
TARGET_LAG = '5 minutes' -- Keep results no more than about 5 minutes behind the base tables
WAREHOUSE = 'analytics_wh'
AS
SELECT
c.customer_key,
c.customer_id,
c.full_name,
c.segment,
COUNT(DISTINCT f.order_key) AS total_orders,
SUM(f.net_amount) AS lifetime_revenue,
AVG(f.net_amount) AS avg_order_value,
MIN(f.order_date) AS first_order_date,
MAX(f.order_date) AS last_order_date,
DATEDIFF(day, MIN(f.order_date), MAX(f.order_date)) AS customer_tenure_days,
CURRENT_TIMESTAMP() AS refreshed_at
FROM dim_customer c
LEFT JOIN fact_orders f ON c.customer_key = f.customer_key
WHERE c.is_current = TRUE
GROUP BY c.customer_key, c.customer_id, c.full_name, c.segment;
-- Dynamic table with window functions
CREATE OR REPLACE DYNAMIC TABLE dt_running_totals
TARGET_LAG = '1 hour'
WAREHOUSE = 'analytics_wh'
AS
SELECT
sale_date,
daily_revenue,
SUM(daily_revenue) OVER (ORDER BY sale_date) AS cumulative_revenue,
AVG(daily_revenue) OVER (
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS rolling_7day_avg,
RANK() OVER (ORDER BY daily_revenue DESC) AS revenue_rank
FROM (
SELECT
sale_date,
SUM(net_amount) AS daily_revenue
FROM fact_sales
GROUP BY sale_date
);
-- Monitor dynamic table refresh
SELECT
name,
state,
refresh_start_time,
refresh_end_time,
DATEDIFF(second, refresh_start_time, refresh_end_time) AS refresh_duration_sec,
statistics -- Object holding row and partition counts for the refresh
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
NAME => 'dt_customer_lifetime_value'
))
ORDER BY refresh_start_time DESC;
-- Alter dynamic table settings
ALTER DYNAMIC TABLE dt_customer_lifetime_value
SET TARGET_LAG = '10 minutes'
WAREHOUSE = 'etl_wh';
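Dynamic Tables can also be chained, with intermediate tables inheriting their freshness requirement from whatever reads them. The following sketch assumes the `fact_sales` table used above; both dynamic table names are hypothetical.

```sql
-- Intermediate table: refreshes only as often as its consumers require
CREATE OR REPLACE DYNAMIC TABLE dt_daily_revenue
  TARGET_LAG = DOWNSTREAM
  WAREHOUSE = analytics_wh
AS
  SELECT
    sale_date,
    SUM(net_amount) AS daily_revenue
  FROM fact_sales
  GROUP BY sale_date;

-- Final table: its 1-hour lag drives the refresh cadence of dt_daily_revenue
CREATE OR REPLACE DYNAMIC TABLE dt_revenue_running_total
  TARGET_LAG = '1 hour'
  WAREHOUSE = analytics_wh
AS
  SELECT
    sale_date,
    daily_revenue,
    SUM(daily_revenue) OVER (ORDER BY sale_date) AS cumulative_revenue
  FROM dt_daily_revenue;

-- Force an immediate refresh, for example right after a backfill
ALTER DYNAMIC TABLE dt_revenue_running_total REFRESH;
```

Setting `TARGET_LAG = DOWNSTREAM` on intermediate tables avoids paying for refreshes that no consumer needs.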
Snowpipe: Continuous Ingestion
Snowpipe is Snowflake's continuous data ingestion service that loads data from cloud storage into Snowflake tables in near-real-time (typically within minutes). It uses a serverless compute model with automatic scaling.
Snowpipe Cost Model
- Compute: $0.06/TB of files processed (serverless, billed per file)
- Storage: $0.04/TB/month for staged files
- Throughput: Typically 10-100 GB/hour depending on file size and compression
- Latency: 1-5 minutes from file availability to queryable data
- Optimal File Size: 100-250 MB compressed (balances parallelism and overhead)
-- Create pipe with auto-ingest from an S3 stage
-- (INTEGRATION = '<notification_integration>' applies to Azure and Google Cloud Storage stages, so it is omitted here)
CREATE OR REPLACE PIPE raw_orders_pipe
AUTO_INGEST = TRUE
COMMENT = 'Auto-ingest orders from S3'
AS
COPY INTO raw_orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (
TYPE = PARQUET
COMPRESSION = SNAPPY
TRIM_SPACE = TRUE
-- ERROR_ON_COLUMN_COUNT_MISMATCH is a CSV-only option and is not valid for Parquet
)
ON_ERROR = 'SKIP_FILE'
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Get pipe notification channel for S3 event setup
DESC PIPE raw_orders_pipe;
-- Inspect pipe metadata
SELECT
pipe_name,
pipe_owner,
is_autoingest_enabled,
notification_channel_name,
created
FROM INFORMATION_SCHEMA.PIPES
WHERE pipe_name = 'RAW_ORDERS_PIPE';
-- Check load history for files ingested through the pipe
SELECT
pipe_name,
file_name,
file_size,
stage_location,
last_load_time,
row_count,
status
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'RAW_ORDERS',
START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
))
WHERE pipe_name = 'RAW_ORDERS_PIPE'
ORDER BY last_load_time DESC;
-- Refresh pipe if files were added manually
ALTER PIPE raw_orders_pipe REFRESH;
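To see whether a pipe is running and how many files are waiting, `SYSTEM$PIPE_STATUS` returns a JSON document that can be parsed in SQL. A short sketch, assuming the `raw_orders_pipe` pipe defined above:

```sql
-- Raw JSON status for the pipe
SELECT SYSTEM$PIPE_STATUS('raw_orders_pipe');

-- Pull out the two fields that matter most for monitoring
SELECT
  status:executionState::STRING AS execution_state,
  status:pendingFileCount::NUMBER AS pending_file_count
FROM (
  SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('raw_orders_pipe')) AS status
);

-- Pause and resume ingestion, for example around a schema change
ALTER PIPE raw_orders_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE raw_orders_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
```

A steadily growing `pending_file_count` is usually the first sign that files are arriving faster than they are being loaded.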
Key Concepts Summary
| Concept | Type | Refresh Model | Use Case | Cost Model |
|---|---|---|---|---|
| Stream | Metadata | Real-time (on query) | CDC, incremental processing | Free metadata tracking |
| Task | Orchestration | Cron/Event-driven | Scheduled ETL/ELT | Per-credit compute |
| Dynamic Table | Materialized | TARGET_LAG schedule | Real-time aggregations | Per-credit compute |
| Snowpipe | Ingestion | Continuous auto | File-based ingestion | $0.06/TB processed |
| External Table | Virtual | Query-time | Direct query on files | Per-query compute |
| Materialized View | Pre-computed | Automatic (background maintenance) | Dashboard acceleration | Per-credit refresh |
| Secure View | Virtual | Query-time | Row-level security | Per-query compute |
| Stored Procedure | Code | On-call | Complex transformations | Per-credit execution |
Performance Metrics
| Feature | Min Latency | Max Throughput | Cost/GB | Concurrent Users |
|---|---|---|---|---|
| Snowpipe | 1-5 min | 100 GB/hr | $0.06 | Unlimited |
| Stream+Task | 1 min | 500 GB/hr | $0.04 | Depends on WH |
| Dynamic Table | 5 min | 1 TB/hr | $0.05 | Depends on WH |
| COPY INTO | 10 sec | 1 TB/hr | $0.03 | Depends on WH |
| External Table | Query-time | 10 GB/hr | $0.01 | Depends on WH |
| Materialized View | Manual | 100 GB/hr | $0.02 | Unlimited |
10 Best Practices
- Use APPEND_ONLY streams when you only need INSERT tracking; this reduces metadata overhead
- Set a short TARGET_LAG on Dynamic Tables only when fresh data is required; a longer lag lowers refresh cost
- Consume streams regularly so their offsets advance; a stream left unconsumed can go stale once its offset falls outside the source table's retention window
- Use serverless Tasks (no warehouse) for lightweight DML operations to reduce cost
- Implement task DAGs with AFTER dependencies for complex multi-step pipelines
- Optimize Snowpipe file sizes to 100-250 MB compressed; avoid many very small files
- Use MATCH_BY_COLUMN_NAME in COPY INTO to handle schema evolution gracefully
- Set MAX_CLUSTER_COUNT on multi-cluster warehouses that run bulk COPY INTO or Task workloads for burst scenarios; Snowpipe itself is serverless and scales automatically
- Implement error notifications via ERROR_INTEGRATION on critical tasks
- Version control task definitions: use CREATE OR REPLACE and deployment scripts
Key Takeaways
- Streams enable change data capture without external tools: query the metadata columns for change detection
- Tasks and Dynamic Tables can replace external orchestration for Snowflake-native pipelines
- Snowpipe provides serverless continuous ingestion with automatic scaling
- TARGET_LAG on Dynamic Tables balances freshness against compute cost
- Combining Streams + Tasks + Dynamic Tables creates fully automated incremental pipelines
See Also
- Snowflake Fundamentals: Virtual warehouses, micro-partitions, and Time Travel
- dbt Advanced: Custom macros, CI/CD, and multi-project patterns
- Real-Time Analytics: Stream processing with Kafka and Flink
- Data Contracts: Formal producer-consumer agreements for data quality
- CI/CD for Data Pipelines: Automated testing and deployment
- Cost Optimization: Right-sizing warehouses and reducing compute costs