πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Topic: Dynamic Tables, Streams & Tasks

Snowflake AdvancedStreaming & CDC⭐ Premium

Advertisement

Snowflake Advanced Β· Interview Prep

Dynamic Tables, Streams & Tasks

Difficulty: Hard Β· Commonly asked at Amazon, Netflix, Meta

Interview Question

"Design a real-time CDC pipeline using Streams and Tasks. How do you handle late-arriving data, schema changes, and ensure exactly-once processing semantics?"

ℹ️

Companies Asking This: Amazon (L6 Data Engineer), Netflix (Staff Data Engineer), Meta (Data Platform Engineer), Google (Senior Data Engineer)


Dynamic Tables

Dynamic Tables are declarative, incrementally-refreshed tables that automatically keep query results up-to-date.

Creating Dynamic Tables

-- Create a dynamic table for real-time aggregation
CREATE OR REPLACE DYNAMIC TABLE daily_sales_summary
    TARGET_LAG = '5 minutes'
    WAREHOUSE = analytics_wh
AS
SELECT 
    sale_date,
    region,
    product_category,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_transaction,
    COUNT(DISTINCT customer_id) AS unique_customers,
    CURRENT_TIMESTAMP() AS last_refreshed
FROM sales
WHERE sale_date >= DATEADD(day, -30, CURRENT_DATE())
GROUP BY 1, 2, 3;

-- Dynamic table with joins
CREATE OR REPLACE DYNAMIC TABLE customer_360
    TARGET_LAG = '10 minutes'
    WAREHOUSE = analytics_wh
AS
SELECT 
    c.customer_id,
    c.customer_name,
    c.segment,
    c.registration_date,
    COUNT(o.order_id) AS total_orders,
    SUM(o.amount) AS lifetime_value,
    MAX(o.order_date) AS last_order_date,
    DATEDIFF(day, MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY 1, 2, 3, 4;

-- Check dynamic table refresh status
SELECT 
    name,
    refresh_start_time,
    refresh_end_time,
    state,
    bytes_refreshed,
    data_metric_query_frequency
FROM information_schema.dynamic_table_refresh_history
WHERE name = 'DAILY_SALES_SUMMARY'
ORDER BY refresh_start_time DESC;

-- Monitor dynamic table lag
SELECT 
    name,
    CURRENT_LAG,
    TARGET_LAG,
    CASE 
        WHEN CURRENT_LAG > TARGET_LAG * 2 THEN 'CRITICAL'
        WHEN CURRENT_LAG > TARGET_LAG THEN 'WARNING'
        ELSE 'HEALTHY'
    END AS lag_status
FROM information_schema.dynamic_tables
WHERE name = 'DAILY_SALES_SUMMARY';

Streams (Change Data Capture)

Streams capture DML changes (INSERT, UPDATE, DELETE) on tables, providing a changelog for downstream processing.

Basic Stream Setup

-- Create a stream on a source table
CREATE OR REPLACE STREAM orders_stream
    ON TABLE orders
    SHOW_INITIAL_ROWS = FALSE
    APPEND_ONLY = FALSE
    INSERT_ONLY = FALSE
    OFFSET = NULL;

-- Check stream status
SELECT 
    stream_name,
    table_name,
    create_time,
    last_refreshed_time,
    is_stale,
    MODE
FROM information_schema.streams
WHERE stream_name = 'ORDERS_STREAM';

-- Query stream for changes
SELECT 
    METADATA$ACTION AS action,
    METADATA$ISUPDATE AS is_update,
    METADATA$ROW_ID AS row_id,
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    CURRENT_TIMESTAMP() AS captured_at
FROM orders_stream;

-- Process stream and merge into target table
MERGE INTO orders_daily_summary t
USING (
    SELECT 
        DATE(order_date) AS sale_date,
        COUNT(*) AS new_orders,
        SUM(total_amount) AS new_revenue
    FROM orders_stream
    WHERE METADATA$ACTION = 'INSERT'
    GROUP BY 1
) s
ON t.sale_date = s.sale_date
WHEN MATCHED THEN
    UPDATE SET 
        order_count = t.order_count + s.new_orders,
        total_revenue = t.total_revenue + s.new_revenue,
        last_updated = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
    INSERT (sale_date, order_count, total_revenue, last_updated)
    VALUES (s.sale_date, s.new_orders, s.new_revenue, CURRENT_TIMESTAMP());

Real-World Scenario: Amazon

Question: "Design a CDC pipeline that captures changes from a PostgreSQL database, loads them into Snowflake, and maintains a slowly changing dimension (SCD Type 2) table."

Solution: End-to-End CDC Pipeline

-- 1. Create raw table for CDC events
CREATE TABLE raw_cdc_events (
    event_id VARCHAR(100),
    table_name VARCHAR(100),
    operation VARCHAR(10),  -- INSERT, UPDATE, DELETE
    before_data VARIANT,
    after_data VARIANT,
    lsn VARCHAR(100),  -- Log sequence number
    event_timestamp TIMESTAMP_NTZ,
    _ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- 2. Create stream for CDC processing
CREATE OR REPLACE STREAM cdc_stream
    ON TABLE raw_cdc_events
    SHOW_INITIAL_ROWS = FALSE;

-- 3. Create SCD Type 2 target table
CREATE TABLE customers_scd (
    customer_id VARCHAR(100),
    customer_name VARCHAR(200),
    email VARCHAR(200),
    segment VARCHAR(50),
    effective_start_date TIMESTAMP_NTZ,
    effective_end_date TIMESTAMP_NTZ DEFAULT TO_TIMESTAMP_NTZ('9999-12-31'),
    is_current BOOLEAN DEFAULT TRUE,
    _ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- 4. SCD Type 2 merge procedure
CREATE OR REPLACE PROCEDURE process_scd_type2()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    processed_count NUMBER DEFAULT 0;
BEGIN
    -- Process INSERT and UPDATE operations
    MERGE INTO customers_scd t
    USING (
        SELECT 
            after_data:customer_id::VARCHAR AS customer_id,
            after_data:customer_name::VARCHAR AS customer_name,
            after_data:email::VARCHAR AS email,
            after_data:segment::VARCHAR AS segment,
            event_timestamp
        FROM cdc_stream
        WHERE METADATA$ACTION = 'INSERT'
           OR (METADATA$ACTION = 'UPDATE' AND METADATA$ISUPDATE = TRUE)
    ) s
    ON t.customer_id = s.customer_id AND t.is_current = TRUE
    WHEN MATCHED AND (
        t.customer_name != s.customer_name OR
        t.email != s.email OR
        t.segment != s.segment
    ) THEN
        UPDATE SET 
            effective_end_date = s.event_timestamp,
            is_current = FALSE
    WHEN NOT MATCHED THEN
        INSERT (customer_id, customer_name, email, segment, effective_start_date)
        VALUES (s.customer_id, s.customer_name, s.email, s.segment, s.event_timestamp);

    -- Insert new current versions for updated records
    INSERT INTO customers_scd (customer_id, customer_name, email, segment, effective_start_date)
    SELECT 
        after_data:customer_id::VARCHAR,
        after_data:customer_name::VARCHAR,
        after_data:email::VARCHAR,
        after_data:segment::VARCHAR,
        event_timestamp
    FROM cdc_stream
    WHERE METADATA$ACTION = 'INSERT'
       OR (METADATA$ACTION = 'UPDATE' AND METADATA$ISUPDATE = TRUE)
    AND after_data:customer_id::VARCHAR IN (
        SELECT customer_id FROM customers_scd WHERE is_current = FALSE
    );

    -- Process DELETE operations
    UPDATE customers_scd
    SET is_current = FALSE,
        effective_end_date = CURRENT_TIMESTAMP()
    WHERE customer_id IN (
        SELECT before_data:customer_id::VARCHAR
        FROM cdc_stream
        WHERE METADATA$ACTION = 'DELETE'
    )
    AND is_current = TRUE;

    processed_count := SQLROWCOUNT;
    
    RETURN 'SCD Type 2 processing complete. Rows affected: ' || processed_count;
END;
$$;

-- 5. Schedule CDC processing
CREATE OR REPLACE TASK cdc_processing_task
    WAREHOUSE = etl_wh
    SCHEDULE = '1 MINUTE'
AS
    CALL process_scd_type2();

-- 6. Monitor CDC lag
SELECT 
    task_name,
    state,
    last_committed_on,
    scheduled_time
FROM information_schema.task_history
WHERE task_name = 'CDC_PROCESSING_TASK'
ORDER BY last_committed_on DESC;

Real-World Scenario: Netflix

Question: "How do you handle schema changes in a CDC pipeline? The source table adds a new column."

Schema Evolution in CDC

-- 1. Source table adds a new column
-- ALTER TABLE source.customers ADD COLUMN phone VARCHAR(20);

-- 2. Raw table stores all data as VARIANT
CREATE TABLE raw_cdc_events_v2 (
    event_id VARCHAR(100),
    table_name VARCHAR(100),
    operation VARCHAR(10),
    before_data VARIANT,
    after_data VARIANT,
    lsn VARCHAR(100),
    event_timestamp TIMESTAMP_NTZ,
    _ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- 3. Stream automatically captures new columns in VARIANT
-- 4. Create view to extract known columns + handle new ones
CREATE OR REPLACE VIEW customers_extracted AS
SELECT 
    after_data:customer_id::VARCHAR AS customer_id,
    after_data:customer_name::VARCHAR AS customer_name,
    after_data:email::VARCHAR AS email,
    after_data:segment::VARCHAR AS segment,
    after_data:phone::VARCHAR AS phone,  -- New column
    after_data:address::VARCHAR AS address,  -- Potential future column
    METADATA$ACTION AS cdc_action,
    event_timestamp
FROM cdc_stream;

-- 5. Monitor for schema changes
SELECT 
    OBJECT_KEYS(after_data) AS columns_captured,
    COUNT(*) AS event_count,
    MIN(event_timestamp) AS first_seen,
    MAX(event_timestamp) AS last_seen
FROM raw_cdc_events
GROUP BY 1
ORDER BY 3 DESC;

Tasks & Scheduling

-- Create a task with complex scheduling
CREATE OR REPLACE TASK daily_etl_task
    WAREHOUSE = etl_wh
    SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
    ALLOW_OVERLAPPING_EXECUTION = FALSE
    ERROR_INTEGRATION = 'SNOWFLAKE_ALERTS'
AS
BEGIN
    -- Step 1: Process CDC
    CALL process_cdc_events();
    
    -- Step 2: Update aggregations
    REFRESH DYNAMIC TABLE daily_sales_summary;
    
    -- Step 3: Run data quality checks
    CALL run_data_quality_checks('orders');
END;

-- Task dependencies
CREATE OR REPLACE TASK post_cdc_processing
    WAREHOUSE = analytics_wh
    AFTER daily_etl_task
AS
    CALL update_analytics_views();

-- Monitor task execution
SELECT 
    task_name,
    query_id,
    query_text,
    error_code,
    error_message,
    scheduled_time,
    completed_time,
    state
FROM information_schema.task_history
WHERE task_name IN ('DAILY_ETL_TASK', 'POST_CDC_PROCESSING')
  AND scheduled_time >= DATEADD(day, -1, CURRENT_TIMESTAMP())
ORDER BY scheduled_time DESC;

Best Practices

PatternRecommendation
CDC frequencyUse 1-minute schedule for near-real-time, 5-minute for cost savings
SCD Type 2Use Streams + MERGE for incremental processing
Schema evolutionStore raw data as VARIANT, extract in views
Error handlingUse TRY/CATCH in stored procedures, alert on failures
MonitoringSet up alerts for task failures and lag exceeding thresholds
DeduplicationTrack LSN/offset to prevent duplicate processing

⚠️

Common Pitfalls:

  1. Stream staleness β€” Streams become stale if not queried within 14 days
  2. Task overlap β€” Use ALLOW_OVERLAPPING_EXECUTION = FALSE for sequential jobs
  3. Missing error handling β€” Always handle errors in stored procedures
  4. Ignoring lag β€” Monitor dynamic table lag and task execution times

Advertisement