
Snowflake Advanced: Streams, Tasks, Dynamic Tables & Data Pipelines


Advanced Snowflake: Building Production Data Pipelines

Beyond basic warehousing, Snowflake provides a rich ecosystem for building end-to-end data pipelines within the platform.

Why Advanced Snowflake Matters


Key Benefits:

  • Incremental processing: Streams track row-level changes automatically
  • Orchestration: Tasks schedule and chain pipeline steps
  • Continuous ingestion: Snowpipe loads files in near-real-time as they arrive
  • Fewer external tools: keeping pipelines inside the platform reduces operational complexity and cost

Key Insight: Snowflake's native pipeline primitives eliminate the need for external orchestration tools in many scenarios.


Architecture Overview

Snowflake Pipeline Flow

[Figure: Snowflake data pipeline flow. Batch path: Sources (DB, APIs, files) → Stage (S3 / Azure / GCS) → COPY INTO (bulk load) → Transform (dbt / stored procedures) → Serve (BI / ML). CDC path (change tracking): Source Table → Stream (CDC) → MERGE into target, run by a scheduled Task. The stream tracks INSERT/UPDATE/DELETE, the task runs the MERGE on a schedule, and zero-copy cloning supports dev/test.]

Streams: Change Data Capture

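A Stream is a Snowflake object that tracks DML changes (INSERT, UPDATE, DELETE) made to a source table. A stream does not copy the data: it stores an offset into the table's version history, and querying it returns the rows changed since that offset together with change-tracking metadata columns. Downstream consumers can therefore process only the changed rows instead of rescanning the whole table.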
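As a minimal sketch of the offset idea (a toy Python model, not Snowflake's implementation), the classes below keep a version counter on a table and an offset on the stream. A plain read does not move the offset; only a consuming DML statement that commits does. `VersionedTable`, `ToyStream`, and their methods are hypothetical names, and the model returns every logged change rather than the net change per row that a real standard stream returns.

```python
from dataclasses import dataclass, field


@dataclass
class VersionedTable:
    """Toy table: every committed change gets the next version number."""
    version: int = 0
    log: list = field(default_factory=list)  # entries are (version, action, key)

    def apply(self, action: str, key: int) -> None:
        self.version += 1
        self.log.append((self.version, action, key))


@dataclass
class ToyStream:
    """Hypothetical stream model: it stores an offset, never a copy of the rows."""
    table: VersionedTable
    offset: int = 0

    def read(self) -> list:
        # Like SELECT ... FROM a stream: returns changes past the offset, moves nothing.
        return [entry for entry in self.table.log if entry[0] > self.offset]

    def consume(self, commit: bool = True) -> list:
        # Like a DML statement that reads the stream: the offset moves only on commit.
        changes = self.read()
        if commit:
            self.offset = self.table.version
        return changes


orders = VersionedTable()
stream = ToyStream(orders)
orders.apply("INSERT", 1)
orders.apply("INSERT", 2)
print(len(stream.read()))                 # 2: pending changes
print(len(stream.read()))                 # 2: reading does not advance the offset
print(len(stream.consume(commit=False)))  # 2: rolled back, offset unchanged
print(len(stream.consume()))              # 2: committed, offset advances
print(len(stream.read()))                 # 0: nothing new since the offset
```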

Stream Metadata Columns

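Each stream record includes the source table's columns plus three metadata columns:

  • METADATA$ACTION: 'INSERT' or 'DELETE' (the type of change)
  • METADATA$ISUPDATE: TRUE if the row is part of an UPDATE, which a stream records as a DELETE/INSERT pair
  • METADATA$ROW_ID: unique, immutable identifier for the changed row

Two derived counts (computed from METADATA$ACTION, not stored as columns) summarize a batch of changes:

  • Change volume: $\Delta_{\text{rows}} = |\text{INSERT rows}| + |\text{DELETE rows}|$
  • Net change: $\text{Net} = |\text{INSERT rows}| - |\text{DELETE rows}|$
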
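To see how the two derived counts behave, here is a small Python sketch over hypothetical stream output, represented as (METADATA$ACTION, METADATA$ISUPDATE) pairs. One UPDATE appears as a DELETE/INSERT pair, so it adds 2 to the change volume but 0 to the net change. The function name `change_metrics` is illustrative.

```python
from collections import Counter

# Hypothetical stream output: (METADATA$ACTION, METADATA$ISUPDATE) per returned row.
# One UPDATE shows up as a DELETE/INSERT pair, both flagged as updates.
stream_rows = [
    ("INSERT", False),  # new row
    ("INSERT", False),  # another new row
    ("DELETE", True),   # old image of an updated row
    ("INSERT", True),   # new image of the same updated row
    ("DELETE", False),  # deleted row
]


def change_metrics(rows):
    counts = Counter(action for action, _ in rows)
    inserts, deletes = counts["INSERT"], counts["DELETE"]  # Counter returns 0 if absent
    updates = sum(1 for action, is_update in rows if action == "INSERT" and is_update)
    return {
        "change_volume": inserts + deletes,  # Delta_rows = |INSERT| + |DELETE|
        "net_change": inserts - deletes,     # Net = |INSERT| - |DELETE|
        "updates": updates,                  # each update counted once, via its INSERT half
    }


print(change_metrics(stream_rows))
# {'change_volume': 5, 'net_change': 1, 'updates': 1}
```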
-- Create source table
CREATE TABLE raw_orders (
    order_id    INT PRIMARY KEY,  -- informational only: Snowflake does not enforce it
    customer_id INT,
    amount      DECIMAL(12,2),
    status      VARCHAR(20),
    updated_at  TIMESTAMP
);

-- Create a standard stream on the source table (tracks inserts, updates, and deletes)
CREATE STREAM raw_orders_stream
    ON TABLE raw_orders
    APPEND_ONLY = FALSE      -- Track all DML changes
    SHOW_INITIAL_ROWS = FALSE;  -- Don't capture initial state

-- Simulate DML changes
INSERT INTO raw_orders VALUES (1, 101, 250.00, 'pending', CURRENT_TIMESTAMP());
INSERT INTO raw_orders VALUES (2, 102, 180.50, 'pending', CURRENT_TIMESTAMP());
UPDATE raw_orders SET status = 'shipped', amount = 275.00 WHERE order_id = 1;
DELETE FROM raw_orders WHERE order_id = 2;

-- Query stream to see changes
SELECT
    order_id,
    customer_id,
    amount,
    status,
    METADATA$ACTION      AS change_action,
    METADATA$ISUPDATE    AS is_update,
    METADATA$ROW_ID      AS row_id
FROM raw_orders_stream;

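-- Use the stream for an incremental load into a target table
CREATE TABLE IF NOT EXISTS dim_orders_target LIKE raw_orders;

MERGE INTO dim_orders_target AS target
USING (
    SELECT
        order_id,
        customer_id,
        amount,
        status,
        updated_at,
        METADATA$ACTION AS change_action
    FROM raw_orders_stream
    -- Skip the DELETE half of each UPDATE pair; its INSERT half carries the new values
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.change_action = 'DELETE' THEN
    DELETE
WHEN MATCHED THEN
    UPDATE SET
        amount = source.amount,
        status = source.status,
        updated_at = source.updated_at
WHEN NOT MATCHED AND source.change_action = 'INSERT' THEN
    INSERT (order_id, customer_id, amount, status, updated_at)
    VALUES (source.order_id, source.customer_id, source.amount,
            source.status, source.updated_at);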

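-- The MERGE above consumed the stream: it is a DML statement that read the stream,
-- so the offset advanced when it committed. Check the timestamp of the new offset:
SELECT SYSTEM$STREAM_GET_TABLE_TIMESTAMP('raw_orders_stream');

-- Drop a stream only when change tracking is no longer needed;
-- dropping it discards the offset
DROP STREAM IF EXISTS raw_orders_stream;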
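The DML sequence above (insert orders 1 and 2, update order 1, delete order 2) is a good test of how a standard stream reports net changes. The Python sketch below compares the table at the stream offset with the current table: order 1 appears once as a plain INSERT carrying its final values, and order 2, inserted and deleted after the offset, does not appear at all. It then applies the changes to a target the way a consumer should: plain DELETE rows remove the row, INSERT rows (including the new image of an update) upsert it, and the DELETE half of an update pair is skipped. Keying rows by order_id as a stand-in for METADATA$ROW_ID is an assumption of this sketch, and the function names are hypothetical.

```python
def stream_delta(at_offset: dict, current: dict) -> list:
    """Net changes between two snapshots keyed by order_id (stand-in for METADATA$ROW_ID).

    Each row is (key, METADATA$ACTION, METADATA$ISUPDATE, values).
    """
    rows = []
    for key, new in current.items():
        old = at_offset.get(key)
        if old is None:
            rows.append((key, "INSERT", False, new))
        elif old != new:
            rows.append((key, "DELETE", True, old))  # old image of the update
            rows.append((key, "INSERT", True, new))  # new image of the update
    for key, old in at_offset.items():
        if key not in current:
            rows.append((key, "DELETE", False, old))
    return rows


def apply_changes(target: dict, rows: list) -> dict:
    """Upsert INSERT rows, remove plain DELETE rows, skip DELETE halves of updates."""
    result = dict(target)
    for key, action, is_update, values in rows:
        if action == "INSERT":
            result[key] = values
        elif not is_update:
            result.pop(key, None)
    return result


# The stream was created on an empty table, so the snapshot at the offset is empty.
at_offset = {}
# Table after: insert 1, insert 2, update 1 (shipped, 275.00), delete 2.
current = {1: (101, 275.00, "shipped")}

delta = stream_delta(at_offset, current)
print(delta)
# [(1, 'INSERT', False, (101, 275.0, 'shipped'))]
print(apply_changes({}, delta))
# {1: (101, 275.0, 'shipped')}
```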

Tasks: Scheduled Execution

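A Task is a Snowflake object that runs a single SQL statement, a call to a stored procedure, or a Snowflake Scripting block. A task runs either on a schedule (an interval or a CRON expression) or after its predecessor tasks complete, which makes tasks the backbone of scheduled data pipelines within Snowflake.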
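A CRON schedule such as `USING CRON 0 2 * * * UTC` means minute 0 of hour 2, every day, in UTC. A minimal Python sketch of how such a daily schedule resolves to its next run time; it handles only the fixed `minute hour * * *` form, not general cron syntax, and the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next run for a cron of the form 'minute hour * * *' in UTC.

    Expects a timezone-aware datetime.
    """
    now = now.astimezone(timezone.utc)
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has passed (or is this exact instant), so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 5, 1, 3, 15, tzinfo=timezone.utc)
print(next_daily_run(now, hour=2))  # 2024-05-02 02:00:00+00:00
```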

-- Simple task: daily aggregation
CREATE OR REPLACE TASK daily_sales_aggregate
    WAREHOUSE = etl_wh
    SCHEDULE = 'USING CRON 0 2 * * * UTC'  -- 2 AM UTC daily
    COMMENT = 'Aggregate daily sales metrics'
AS
    INSERT INTO mart_daily_sales (date_key, total_revenue, order_count)
    SELECT
        sale_date,
        SUM(amount) AS total_revenue,
        COUNT(*)    AS order_count
    FROM fact_sales
    WHERE sale_date = CURRENT_DATE() - 1
    GROUP BY sale_date;

-- Task dependency chain (DAG)
CREATE OR REPLACE TASK extract_raw_data
    WAREHOUSE = etl_wh
    SCHEDULE = 'USING CRON 0 1 * * * UTC'
AS
    COPY INTO raw_orders
    FROM @my_stage/orders/
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = 'SKIP_FILE';

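CREATE OR REPLACE TASK transform_orders
    WAREHOUSE = etl_wh
    AFTER extract_raw_data  -- Runs after the extract task completes
AS
    INSERT INTO staging_orders (order_id, customer_id, amount, status, updated_at)
    SELECT order_id, customer_id, amount, status, updated_at
    FROM raw_orders
    WHERE updated_at >= DATEADD(day, -1, CURRENT_DATE());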

CREATE OR REPLACE TASK load_mart
    WAREHOUSE = etl_wh
    AFTER transform_orders  -- Runs after the transform task completes
AS
    INSERT INTO mart_orders
    SELECT * FROM staging_orders
    WHERE processed = FALSE;  -- assumes staging_orders has a PROCESSED flag column (not shown)

-- Tasks are created suspended: resume child tasks before the root task
ALTER TASK load_mart RESUME;
ALTER TASK transform_orders RESUME;
ALTER TASK extract_raw_data RESUME;

-- Monitor task execution (this table function covers the last 7 days)
SELECT
    name AS task_name,
    query_id,
    state,
    error_code,
    error_message,
    scheduled_time,
    completed_time,
    DATEDIFF(second, scheduled_time, completed_time) AS duration_sec  -- includes time queued before start
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

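-- Multi-statement task (Snowflake Scripting block) with failure notifications.
-- In SnowSQL or the Classic Console, wrap BEGIN ... END in EXECUTE IMMEDIATE $$ ... $$.
CREATE OR REPLACE TASK pipeline_orchestrator
    WAREHOUSE = etl_wh
    SCHEDULE = 'USING CRON 0 6 * * * UTC'
    ERROR_INTEGRATION = my_notification_integration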
AS
BEGIN
    -- Step 1: Load staging
    COPY INTO staging.raw_events
    FROM @my_stage/events/
    FILE_FORMAT = (TYPE = PARQUET)
    ON_ERROR = 'SKIP_FILE';

    -- Step 2: Transform
    INSERT INTO processed.events
    SELECT
        event_id,
        PARSE_JSON(payload):user_id::STRING AS user_id,
        event_type,
        event_timestamp
    FROM staging.raw_events
    WHERE _load_date = CURRENT_DATE();

    -- Step 3: Aggregate
    MERGE INTO mart.event_metrics AS target
    USING (
        SELECT
            DATE_TRUNC('hour', event_timestamp) AS event_hour,
            event_type,
            COUNT(*) AS event_count,
            COUNT(DISTINCT user_id) AS unique_users
        FROM processed.events
        WHERE event_timestamp >= CURRENT_DATE() - INTERVAL '1 day'
        GROUP BY 1, 2
    ) AS source
    ON target.event_hour = source.event_hour
       AND target.event_type = source.event_type
    WHEN MATCHED THEN UPDATE SET
        event_count = source.event_count,
        unique_users = source.unique_users
    WHEN NOT MATCHED THEN INSERT VALUES (
        source.event_hour, source.event_type,
        source.event_count, source.unique_users
    );
END;
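The AFTER clauses above define a directed acyclic graph of tasks. A short Python sketch using the standard library's `graphlib` derives the run order from those dependencies and, reversed, the order in which to resume tasks (children before the root, as the ALTER TASK ... RESUME statements above do). The dictionary mirrors the three example tasks; it is a model for reasoning about the graph, not something Snowflake reads.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of predecessors named in its AFTER clause.
dependencies = {
    "extract_raw_data": set(),                 # root task: it owns the SCHEDULE
    "transform_orders": {"extract_raw_data"},
    "load_mart": {"transform_orders"},
}

# static_order() yields predecessors before dependents and raises CycleError on a cycle.
run_order = list(TopologicalSorter(dependencies).static_order())
resume_order = list(reversed(run_order))  # children first, root last

print(run_order)     # ['extract_raw_data', 'transform_orders', 'load_mart']
print(resume_order)  # ['load_mart', 'transform_orders', 'extract_raw_data']
```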

Dynamic Tables

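A Dynamic Table materializes the result of a query and refreshes automatically so that its contents stay within a specified target lag (TARGET_LAG) of its base tables. Unlike Snowflake materialized views, which are limited to a single table and cannot use joins, Dynamic Tables support joins, aggregations, window functions, and chained multi-step transformations, with freshness set declaratively through TARGET_LAG rather than an explicit schedule.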
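TARGET_LAG is a freshness target, not a refresh interval: Snowflake schedules refreshes so that the table's contents aim to lag the base tables by no more than the target. A minimal Python sketch that checks a hypothetical refresh log against a target lag, in minutes: just before each refresh completes, readers still see the previous refresh's data, so that is where staleness peaks. The log format and function name are assumptions for illustration.

```python
def max_staleness(refreshes, window_end):
    """Worst-case lag, in minutes, between base-table time and what readers see.

    refreshes: non-empty list of (data_timestamp, completed_at), sorted by completion.
    Just before refresh i+1 completes, readers still see refresh i's data.
    """
    worst = 0
    for (data_ts, _), (_, next_done) in zip(refreshes, refreshes[1:]):
        worst = max(worst, next_done - data_ts)
    last_data_ts = refreshes[-1][0]
    return max(worst, window_end - last_data_ts)


# Hypothetical log: each refresh captures data at minute t and finishes 1 minute later.
refreshes = [(0, 1), (4, 5), (8, 9)]
target_lag = 5
lag = max_staleness(refreshes, window_end=12)
print(lag, lag <= target_lag)  # 5 True
```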

-- Create a dynamic table for near-real-time metrics
CREATE OR REPLACE DYNAMIC TABLE dt_customer_lifetime_value
    TARGET_LAG = '5 minutes'  -- Keep results at most ~5 minutes behind the base tables
    WAREHOUSE = analytics_wh
AS
    SELECT
        c.customer_key,
        c.customer_id,
        c.full_name,
        c.segment,
        COUNT(DISTINCT f.order_key) AS total_orders,
        SUM(f.net_amount)           AS lifetime_revenue,
        AVG(f.net_amount)           AS avg_order_value,
        MIN(f.order_date)           AS first_order_date,
        MAX(f.order_date)           AS last_order_date,
        DATEDIFF(day, MIN(f.order_date), MAX(f.order_date)) AS customer_tenure_days,
        CURRENT_TIMESTAMP()         AS refreshed_at  -- non-deterministic: may force FULL refresh mode
    FROM dim_customer c
    LEFT JOIN fact_orders f ON c.customer_key = f.customer_key
    WHERE c.is_current = TRUE
    GROUP BY c.customer_key, c.customer_id, c.full_name, c.segment;

-- Dynamic table with window functions
CREATE OR REPLACE DYNAMIC TABLE dt_running_totals
    TARGET_LAG = '1 hour'
    WAREHOUSE = analytics_wh
AS
    SELECT
        sale_date,
        daily_revenue,
        SUM(daily_revenue) OVER (ORDER BY sale_date) AS cumulative_revenue,
        AVG(daily_revenue) OVER (
            ORDER BY sale_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS rolling_7day_avg,
        RANK() OVER (ORDER BY daily_revenue DESC) AS revenue_rank
    FROM (
        SELECT
            sale_date,
            SUM(net_amount) AS daily_revenue
        FROM fact_sales
        GROUP BY sale_date
    );

-- Monitor dynamic table refresh
SELECT
    name,
    state,
    refresh_start_time,
    refresh_end_time,
    DATEDIFF(second, refresh_start_time, refresh_end_time) AS refresh_duration_sec,
    statistics  -- JSON with row counts such as numInsertedRows and numDeletedRows
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'dt_customer_lifetime_value'  -- a fully qualified name may be required
))
ORDER BY refresh_start_time DESC;

-- Alter dynamic table settings
ALTER DYNAMIC TABLE dt_customer_lifetime_value
    SET TARGET_LAG = '10 minutes'
    WAREHOUSE = etl_wh;
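The window functions in dt_running_totals can be checked by hand. The Python sketch below reproduces them for one row per date: a running SUM, a 7-row rolling average (ROWS BETWEEN 6 PRECEDING AND CURRENT ROW, so the first six days average over fewer rows), and RANK() by revenue, where ties share a rank and the next rank is skipped. It assumes sale dates are unique, as they are after the inner GROUP BY; the function name is hypothetical.

```python
def running_metrics(daily):
    """Mirror dt_running_totals for (sale_date, daily_revenue) pairs, one per date."""
    daily = sorted(daily)                        # ORDER BY sale_date
    revenues = [rev for _, rev in daily]
    out = []
    cumulative = 0
    for i, (day, rev) in enumerate(daily):
        cumulative += rev                        # SUM(daily_revenue) OVER (ORDER BY sale_date)
        window = revenues[max(0, i - 6): i + 1]  # ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        # RANK() OVER (ORDER BY daily_revenue DESC): 1 + number of strictly higher days,
        # so ties share a rank and the following rank is skipped
        rank = 1 + sum(1 for other in revenues if other > rev)
        out.append((day, rev, cumulative, sum(window) / len(window), rank))
    return out


print(running_metrics([("2024-01-01", 100), ("2024-01-02", 300), ("2024-01-03", 200)]))
# [('2024-01-01', 100, 100, 100.0, 3), ('2024-01-02', 300, 400, 200.0, 1), ('2024-01-03', 200, 600, 200.0, 2)]
```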

Snowpipe: Continuous Ingestion

Snowpipe is Snowflake's continuous data ingestion service that loads data from cloud storage into Snowflake tables in near-real-time (typically within minutes). It uses a serverless compute model with automatic scaling.

Snowpipe Cost Model

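  • Compute: serverless, billed in credits rather than through a warehouse, plus a per-file overhead (historically 0.06 credits per 1,000 files); pricing has changed over time, so check Snowflake's current Service Consumption Table
  • Storage: files in an external stage (S3, Azure, GCS) are billed by the cloud provider; loaded table data is billed at standard Snowflake storage rates
  • Throughput: depends on file size, file count, format, and compression
  • Latency: often about 1-5 minutes from file notification to queryable data, but not guaranteed
  • Optimal File Size: 100-250 MB compressed (balances parallelism against per-file overhead)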
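The file-size guidance comes down to per-file overhead. A small Python sketch, assuming a per-file charge of 0.06 credits per 1,000 files (Snowflake's historical Snowpipe file overhead; current pricing may differ), compares loading 1 TB as 1 MB files versus 200 MB files. The function name and default rate are assumptions, and the sketch ignores the compute spent actually loading the data.

```python
import math


def snowpipe_file_overhead(total_mb: float, file_mb: float,
                           credits_per_1000_files: float = 0.06) -> tuple:
    """Return (file_count, overhead_credits) for loading total_mb split into file_mb files.

    credits_per_1000_files is an assumed rate; verify it against current pricing.
    """
    files = math.ceil(total_mb / file_mb)
    return files, files * credits_per_1000_files / 1000


one_tb_mb = 1_000_000  # 1 TB expressed in decimal megabytes
print(snowpipe_file_overhead(one_tb_mb, 1))    # many tiny files
print(snowpipe_file_overhead(one_tb_mb, 200))  # files in the recommended size range
```

Under these assumptions, 1 MB files mean a million files and about 60 credits of overhead, while 200 MB files mean 5,000 files and about 0.3 credits.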
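-- Create pipe with auto-ingest (for S3, event notifications go to the pipe's SQS channel)
CREATE OR REPLACE PIPE raw_orders_pipe
    AUTO_INGEST = TRUE
    -- INTEGRATION = '<notification_integration>' is required only for GCS and Azure stages
    COMMENT = 'Auto-ingest orders from S3'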
AS
    COPY INTO raw_orders
    FROM @my_s3_stage/orders/
    FILE_FORMAT = (
        TYPE = PARQUET
        COMPRESSION = SNAPPY
        TRIM_SPACE = TRUE
    )
    ON_ERROR = 'SKIP_FILE'
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Get pipe notification channel for S3 event setup
DESC PIPE raw_orders_pipe;

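-- Monitor pipe status (the JSON executionState shows, e.g., RUNNING or PAUSED)
SELECT SYSTEM$PIPE_STATUS('raw_orders_pipe');

-- Pipe metadata
SELECT
    pipe_name,
    is_autoingest_enabled,
    pipe_owner,
    created
FROM INFORMATION_SCHEMA.PIPES
WHERE pipe_name = 'RAW_ORDERS_PIPE';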

-- Check pipe load history (COPY_HISTORY includes Snowpipe loads from the last 14 days)
SELECT
    pipe_name,
    file_name,
    file_size,
    stage_location,
    last_load_time,
    row_count,
    status,
    first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'raw_orders',
    START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
))
WHERE pipe_name = 'RAW_ORDERS_PIPE'
ORDER BY last_load_time DESC;

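-- Queue files staged within the last 7 days that the pipe has not loaded yet
-- (e.g., files that arrived before event notifications were configured)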
ALTER PIPE raw_orders_pipe REFRESH;
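Two load behaviors in this pipe are worth modeling: ON_ERROR = 'SKIP_FILE' drops a whole file if any row fails, and Snowflake's load metadata stops the same file from being loaded twice, which is what makes ALTER PIPE ... REFRESH safe to rerun. A toy Python sketch, not Snowflake's implementation: rows of `None` stand in for parse errors, the function name is hypothetical, and retrying skipped files on the next run is a simplifying assumption.

```python
def copy_into(files: dict, already_loaded: set) -> tuple:
    """Toy COPY INTO with ON_ERROR = 'SKIP_FILE' and load-metadata deduplication.

    files maps file name -> list of rows; a row of None stands for a parse error.
    Returns (rows_loaded, skipped_files) and records loaded names in already_loaded.
    """
    rows_loaded, skipped = [], []
    for name in sorted(files):
        if name in already_loaded:
            continue                  # load metadata: a loaded file is not reloaded
        rows = files[name]
        if any(row is None for row in rows):
            skipped.append(name)      # SKIP_FILE: drop the whole file on any error
            continue
        rows_loaded.extend(rows)
        already_loaded.add(name)
    return rows_loaded, skipped


history = set()
batch = {"a.parquet": [1, 2], "b.parquet": [3, None], "c.parquet": [4]}
print(copy_into(batch, history))  # ([1, 2, 4], ['b.parquet'])
print(copy_into(batch, history))  # ([], ['b.parquet'])
```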

Key Concepts Summary

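| Concept | Type | Refresh Model | Use Case | Cost Model |
|---|---|---|---|---|
| Stream | Metadata (change tracking) | On query; offset advances on consumption | CDC, incremental processing | No compute of its own; may extend table data retention |
| Task | Orchestration | Schedule (interval/CRON) or after a predecessor | Scheduled ETL/ELT | Warehouse credits, or serverless credits |
| Dynamic Table | Materialized | Automatic, driven by TARGET_LAG | Declarative transformations, near-real-time aggregations | Warehouse credits for refreshes |
| Snowpipe | Ingestion | Continuous, event-driven | File-based ingestion | Serverless credits plus per-file overhead |
| External Table | Virtual | Query-time | Direct queries on files in external storage | Warehouse credits per query |
| Materialized View | Pre-computed | Automatic background maintenance | Dashboard acceleration on a single table | Serverless maintenance credits plus storage |
| Secure View | Virtual | Query-time | Hiding view definitions, row-level filtering | Warehouse credits per query |
| Stored Procedure | Code | On call (CALL) | Complex procedural transformations | Warehouse credits during execution |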

Performance Metrics

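Figures are approximate and workload-dependent, not published Snowflake limits; dollar costs vary by edition, region, and contract.

| Feature | Min Latency | Max Throughput | Approx. Cost/GB (USD) | Concurrent Users |
|---|---|---|---|---|
| Snowpipe | 1-5 min | 100 GB/hr | $0.06 | Unlimited |
| Stream + Task | 1 min | 500 GB/hr | $0.04 | Depends on warehouse |
| Dynamic Table | 1 min (minimum TARGET_LAG) | 1 TB/hr | $0.05 | Depends on warehouse |
| COPY INTO | 10 sec | 1 TB/hr | $0.03 | Depends on warehouse |
| External Table | Query-time | 10 GB/hr | $0.01 | Depends on warehouse |
| Materialized View | Automatic (background) | 100 GB/hr | $0.02 | Unlimited |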

10 Best Practices

  1. Use APPEND_ONLY streams when you only need to track inserts: they are more efficient than standard streams
  2. Set short TARGET_LAG values on Dynamic Tables only where fresh data is required; a shorter lag means more frequent refreshes and more compute
  3. Consume streams regularly, within the source table's data retention period, so they do not go stale; check the STALE_AFTER column of SHOW STREAMS
  4. Use serverless Tasks (omit WAREHOUSE) for short, lightweight DML operations to reduce cost
  5. Build task graphs (DAGs) with AFTER dependencies for complex multi-step pipelines
  6. Size Snowpipe files at 100-250 MB compressed; many small files add per-file overhead
  7. Use MATCH_BY_COLUMN_NAME in COPY INTO so loads tolerate schema changes such as reordered or added columns
  8. For bursts of concurrent COPY INTO loads, use a multi-cluster warehouse with a suitable MAX_CLUSTER_COUNT; Snowpipe scales its serverless compute automatically
  9. Configure error notifications via ERROR_INTEGRATION on critical tasks
  10. Version-control task definitions and deploy them with CREATE OR REPLACE scripts
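For best practice 6, producers can batch small files before they reach the stage. A minimal sequential greedy planner in Python, assuming compressed sizes in MB and a 200 MB target with a 250 MB ceiling (both illustrative values within the 100-250 MB guidance). The function name is hypothetical, and Snowflake does not perform this grouping for you.

```python
def plan_batches(file_sizes_mb, target_mb=200, max_mb=250):
    """Greedily group files, in input order, into batches near target_mb.

    A batch is closed once it reaches target_mb, or before a file would push it
    past max_mb; a single file larger than max_mb gets a batch of its own.
    """
    batches, current, current_size = [], [], 0
    for size in file_sizes_mb:
        if current and current_size + size > max_mb:
            batches.append(current)           # adding this file would overshoot the ceiling
            current, current_size = [], 0
        current.append(size)
        current_size += size
        if current_size >= target_mb:
            batches.append(current)           # batch is big enough; start a new one
            current, current_size = [], 0
    if current:
        batches.append(current)               # leftover small batch
    return batches


print(plan_batches([80, 80, 80, 30, 300, 10]))
# [[80, 80, 80], [30], [300], [10]]
```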

  • Streams enable change data capture without external tools: query the metadata columns to detect changes
  • Tasks and Dynamic Tables replace external orchestration for Snowflake-native pipelines
  • Snowpipe provides serverless continuous ingestion with automatic scaling
  • TARGET_LAG on Dynamic Tables balances freshness against compute cost
  • Combining Streams + Tasks + Dynamic Tables creates fully automated incremental pipelines
