Snowflake Advanced · Interview Prep
Dynamic Tables, Streams & Tasks
Difficulty: Hard · Commonly asked at Amazon, Netflix, Meta
Interview Question
"Design a real-time CDC pipeline using Streams and Tasks. How do you handle late-arriving data, schema changes, and ensure exactly-once processing semantics?"
Companies Asking This: Amazon (L6 Data Engineer), Netflix (Staff Data Engineer), Meta (Data Platform Engineer), Google (Senior Data Engineer)
Dynamic Tables
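Dynamic Tables are declarative tables defined by a query. Snowflake refreshes them automatically (incrementally when the query allows it) so that their results stay within the configured TARGET_LAG of the base tables.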
Creating Dynamic Tables
-- Create a dynamic table for real-time aggregation
CREATE OR REPLACE DYNAMIC TABLE daily_sales_summary
TARGET_LAG = '5 minutes'
WAREHOUSE = analytics_wh
AS
SELECT
sale_date,
region,
product_category,
COUNT(*) AS transaction_count,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_transaction,
COUNT(DISTINCT customer_id) AS unique_customers,
CURRENT_TIMESTAMP() AS last_refreshed -- non-deterministic: can prevent incremental refresh
FROM sales
WHERE sale_date >= DATEADD(day, -30, CURRENT_DATE())
GROUP BY 1, 2, 3;
-- Dynamic table with joins
CREATE OR REPLACE DYNAMIC TABLE customer_360
TARGET_LAG = '10 minutes'
WAREHOUSE = analytics_wh
AS
SELECT
c.customer_id,
c.customer_name,
c.segment,
c.registration_date,
COUNT(o.order_id) AS total_orders,
SUM(o.amount) AS lifetime_value,
MAX(o.order_date) AS last_order_date,
DATEDIFF(day, MAX(o.order_date), CURRENT_DATE()) AS days_since_last_order
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY 1, 2, 3, 4;
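The idea behind incremental refresh is that a grouped aggregate can be maintained from row deltas instead of a full rescan. The Python sketch below is a conceptual model only, not Snowflake's internal algorithm; the class name `IncrementalSum` is hypothetical. It shows why SUM, COUNT and AVG are easy to maintain while COUNT(DISTINCT ...) needs more state.

```python
from collections import defaultdict


class IncrementalSum:
    """Conceptual model: keep COUNT(*) and SUM(amount) per group by applying deltas."""

    def __init__(self):
        self.count = defaultdict(int)    # group key -> row count
        self.total = defaultdict(float)  # group key -> SUM(amount)

    def apply(self, action, key, amount):
        """Apply one change row: 'INSERT' adds to the group, 'DELETE' subtracts."""
        sign = 1 if action == "INSERT" else -1
        self.count[key] += sign
        self.total[key] += sign * amount
        if self.count[key] == 0:  # last row of the group removed: drop the group
            del self.count[key]
            del self.total[key]

    def avg(self, key):
        # AVG is derivable from SUM and COUNT, so it is maintainable too.
        # COUNT(DISTINCT ...) is not: it needs per-value counts, not two counters.
        return self.total[key] / self.count[key]
```

For example, inserting amounts 10.0 and 30.0 into one group and then deleting the 10.0 row leaves a count of 1 and a total of 30.0 without rereading the other rows.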
-- Check dynamic table refresh history (a table function, not a view)
SELECT
name,
state,
state_message,
refresh_action, -- e.g. INCREMENTAL, FULL, NO_DATA
refresh_start_time,
refresh_end_time
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
WHERE name = 'DAILY_SALES_SUMMARY'
ORDER BY refresh_start_time DESC;
-- Monitor dynamic table lag (reported in seconds; compare numbers, not the TARGET_LAG string)
SELECT
name,
target_lag_sec,
mean_lag_sec,
maximum_lag_sec,
CASE
WHEN maximum_lag_sec > target_lag_sec * 2 THEN 'CRITICAL'
WHEN maximum_lag_sec > target_lag_sec THEN 'WARNING'
ELSE 'HEALTHY'
END AS lag_status
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLES())
WHERE name = 'DAILY_SALES_SUMMARY';
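Lag thresholds only work when both sides are numbers in the same unit. The Python sketch below mirrors the CRITICAL/WARNING/HEALTHY thresholds used above; `parse_target_lag` is a hypothetical helper that assumes TARGET_LAG strings of the form '<n> <unit>', such as '5 minutes'.

```python
def parse_target_lag(text: str) -> int:
    """Convert a '<n> <unit>' lag string (e.g. '5 minutes') to seconds (assumed format)."""
    n, unit = text.split()
    unit = unit.lower().rstrip("s")  # 'minutes' -> 'minute'
    factor = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}[unit]
    return int(n) * factor


def lag_status(lag_sec: float, target_lag_sec: float) -> str:
    """More than 2x target -> CRITICAL, more than 1x target -> WARNING, else HEALTHY."""
    if lag_sec > target_lag_sec * 2:
        return "CRITICAL"
    if lag_sec > target_lag_sec:
        return "WARNING"
    return "HEALTHY"
```

With a '5 minutes' target (300 seconds), a 400-second lag is a WARNING and anything above 600 seconds is CRITICAL.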
Streams (Change Data Capture)
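Streams provide change data capture (CDC) inside Snowflake: a stream stores an offset on its source table and returns the DML changes (INSERT, UPDATE, DELETE) made since that offset, with METADATA$ACTION, METADATA$ISUPDATE and METADATA$ROW_ID columns describing each change.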
Basic Stream Setup
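-- Create a standard (delta) stream on a source table
-- (INSERT_ONLY applies only to external tables, and OFFSET is not a stream parameter)
CREATE OR REPLACE STREAM orders_stream
ON TABLE orders
APPEND_ONLY = FALSE -- default: track inserts, updates and deletes
SHOW_INITIAL_ROWS = FALSE; -- default: start empty, capture only new changes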
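A stream does not copy data; it only remembers an offset. Reading it with SELECT leaves the offset alone, and the offset moves forward only when a DML statement that reads the stream commits. The toy Python model below (hypothetical `TableModel`/`StreamModel` classes, and it ignores net-change computation) captures just that offset behavior.

```python
from dataclasses import dataclass, field


@dataclass
class TableModel:
    """Stand-in for a table's versioned change history (append-only list)."""
    changes: list = field(default_factory=list)


@dataclass
class StreamModel:
    """Conceptual stream: only an offset into the table's history."""
    table: TableModel
    offset: int = 0

    def read(self):
        # SELECT from a stream returns pending changes but does NOT move the offset
        return self.table.changes[self.offset:]

    def consume(self, commit: bool):
        # DML that reads the stream moves the offset only if its transaction commits
        batch = self.read()
        if commit:
            self.offset = len(self.table.changes)
        return batch
```

A rolled-back consumer leaves the same changes pending, which is what makes retries safe.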
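-- Check stream status (there is no INFORMATION_SCHEMA.STREAMS view; use SHOW STREAMS)
SHOW STREAMS LIKE 'ORDERS_STREAM';
SELECT "name", "table_name", "mode", "stale", "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
-- TRUE if the stream has changes waiting to be consumed
SELECT SYSTEM$STREAM_HAS_DATA('ORDERS_STREAM') AS has_pending_changes;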
-- Query stream for changes (a plain SELECT does not advance the stream offset)
SELECT
METADATA$ACTION AS action,
METADATA$ISUPDATE AS is_update,
METADATA$ROW_ID AS row_id,
order_id,
customer_id,
order_date,
total_amount,
status,
CURRENT_TIMESTAMP() AS captured_at
FROM orders_stream;
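A standard stream returns the net change per row since the offset, and an UPDATE surfaces as two rows: a DELETE of the old image and an INSERT of the new one, both with METADATA$ISUPDATE = TRUE. The hypothetical helper below sketches that mapping in Python, assuming that when both images exist the row was updated.

```python
def stream_rows(before, after):
    """Net change rows for one source row, as (METADATA$ACTION, METADATA$ISUPDATE, row).

    before: the row as of the stream offset (None if it did not exist)
    after:  the row now (None if it was deleted)
    """
    if before is None and after is None:
        return []  # inserted and deleted within the interval: no net change
    if before is None:
        return [("INSERT", False, after)]
    if after is None:
        return [("DELETE", False, before)]
    # Updated row: old image out, new image in, both flagged as an update
    return [("DELETE", True, before), ("INSERT", True, after)]
```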
-- Process stream and merge into target table.
-- An UPDATE appears as a DELETE + INSERT pair (METADATA$ISUPDATE = TRUE), so counting
-- every INSERT row would count updated orders twice. Apply signed deltas instead:
-- +1/+amount for INSERT rows, -1/-amount for DELETE rows.
MERGE INTO orders_daily_summary t
USING (
SELECT
DATE(order_date) AS sale_date,
SUM(IFF(METADATA$ACTION = 'INSERT', 1, -1)) AS order_delta,
SUM(IFF(METADATA$ACTION = 'INSERT', total_amount, -total_amount)) AS revenue_delta
FROM orders_stream
GROUP BY 1
) s
ON t.sale_date = s.sale_date
WHEN MATCHED THEN
UPDATE SET
order_count = t.order_count + s.order_delta,
total_revenue = t.total_revenue + s.revenue_delta,
last_updated = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (sale_date, order_count, total_revenue, last_updated)
VALUES (s.sale_date, s.order_delta, s.revenue_delta, CURRENT_TIMESTAMP());
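Because updates arrive as DELETE/INSERT pairs, a summary that counts only INSERT rows overcounts updated orders and ignores deletes. One way to handle all cases is to apply a signed delta for every stream row: the pair for an update cancels in the count and nets to (new amount - old amount) in revenue. The Python sketch below models that choice; `daily_deltas` is a hypothetical name.

```python
from collections import defaultdict


def daily_deltas(rows):
    """Fold (action, is_update, order) stream rows into {order_date: [count_delta, revenue_delta]}."""
    deltas = defaultdict(lambda: [0, 0.0])
    for action, _is_update, order in rows:
        sign = 1 if action == "INSERT" else -1
        d = deltas[order["order_date"]]
        d[0] += sign                          # update pairs cancel: -1 + 1 = 0
        d[1] += sign * order["total_amount"]  # update pairs net to new - old amount
    return dict(deltas)
```

A side benefit of signed deltas: an update that moves an order to a different date decrements one day and increments the other.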
Real-World Scenario: Amazon
Question: "Design a CDC pipeline that captures changes from a PostgreSQL database, loads them into Snowflake, and maintains a slowly changing dimension (SCD Type 2) table."
Solution: End-to-End CDC Pipeline
-- 1. Create raw table for CDC events
CREATE TABLE raw_cdc_events (
event_id VARCHAR(100),
table_name VARCHAR(100),
operation VARCHAR(10), -- Source operation: INSERT, UPDATE, DELETE (not METADATA$ACTION)
before_data VARIANT,
after_data VARIANT,
lsn VARCHAR(100), -- Log sequence number
event_timestamp TIMESTAMP_NTZ,
_ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
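Since lsn is stored as VARCHAR, sorting it as text can put events in the wrong order. Assuming the values are PostgreSQL LSNs in their usual text form (two hex numbers, high and low 32 bits, separated by a slash, e.g. '16/B374D848'), the Python sketch below converts them to integers before comparing and drops re-delivered events by event_id. `parse_pg_lsn` and `latest_per_key` are hypothetical helpers.

```python
def parse_pg_lsn(lsn: str) -> int:
    """'16/B374D848' -> 64-bit integer (high 32 bits / low 32 bits, both hex)."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def latest_per_key(events):
    """Keep the newest event per customer_id by numeric LSN, skipping duplicate event_ids."""
    seen_ids = set()
    latest = {}
    for e in events:
        if e["event_id"] in seen_ids:
            continue  # re-delivered event: already seen once
        seen_ids.add(e["event_id"])
        key = e["customer_id"]
        if key not in latest or parse_pg_lsn(e["lsn"]) > parse_pg_lsn(latest[key]["lsn"]):
            latest[key] = e
    return latest
```

As text, '9/A0' sorts after '10/0', even though 0x9 < 0x10; the integer form orders them correctly.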
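-- 2. Create stream for CDC processing. raw_cdc_events is a landing table that only
-- receives inserts, so an append-only stream is sufficient and cheaper to read.
CREATE OR REPLACE STREAM cdc_stream
ON TABLE raw_cdc_events
APPEND_ONLY = TRUE
SHOW_INITIAL_ROWS = FALSE;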
-- 3. Create SCD Type 2 target table
CREATE TABLE customers_scd (
customer_id VARCHAR(100),
customer_name VARCHAR(200),
email VARCHAR(200),
segment VARCHAR(50),
effective_start_date TIMESTAMP_NTZ,
effective_end_date TIMESTAMP_NTZ DEFAULT TO_TIMESTAMP_NTZ('9999-12-31'),
is_current BOOLEAN DEFAULT TRUE,
_ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
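-- 4. SCD Type 2 procedure
-- Fixes vs. a naive version: the stream is consumed once inside one transaction
-- (the first committed DML that reads a stream advances its offset, so later
-- statements would see an empty stream), source operations come from the
-- payload's operation column (METADATA$ACTION only describes DML on
-- raw_cdc_events), and late-arriving events are ignored.
CREATE OR REPLACE PROCEDURE process_scd_type2()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
closed_count NUMBER DEFAULT 0;
inserted_count NUMBER DEFAULT 0;
BEGIN
-- Batch staging table, created before the transaction because DDL auto-commits
CREATE OR REPLACE TEMPORARY TABLE cdc_customer_batch (
operation VARCHAR(10),
customer_id VARCHAR(100),
customer_name VARCHAR(200),
email VARCHAR(200),
segment VARCHAR(50),
event_timestamp TIMESTAMP_NTZ
);
BEGIN TRANSACTION;
-- Consume the stream once; the offset advances only if COMMIT succeeds
INSERT INTO cdc_customer_batch
SELECT
operation,
COALESCE(after_data:customer_id, before_data:customer_id)::VARCHAR,
after_data:customer_name::VARCHAR,
after_data:email::VARCHAR,
after_data:segment::VARCHAR,
event_timestamp
FROM cdc_stream
WHERE table_name = 'customers' -- assumed value identifying the source table
-- Latest event per customer in this batch (intermediate versions are collapsed)
QUALIFY ROW_NUMBER() OVER (
PARTITION BY COALESCE(after_data:customer_id, before_data:customer_id)::VARCHAR
ORDER BY event_timestamp DESC) = 1;
-- Close the open version when attributes changed or the source row was deleted.
-- Events older than the open version (late arrivals) are skipped.
UPDATE customers_scd t
SET effective_end_date = s.event_timestamp,
is_current = FALSE
FROM cdc_customer_batch s
WHERE t.customer_id = s.customer_id
AND t.is_current = TRUE
AND s.event_timestamp > t.effective_start_date
AND (s.operation = 'DELETE'
OR t.customer_name IS DISTINCT FROM s.customer_name
OR t.email IS DISTINCT FROM s.email
OR t.segment IS DISTINCT FROM s.segment);
closed_count := SQLROWCOUNT;
-- Open a new current version for inserts/updates that have no open version
-- (new customers, or customers whose version was just closed)
INSERT INTO customers_scd (customer_id, customer_name, email, segment, effective_start_date)
SELECT s.customer_id, s.customer_name, s.email, s.segment, s.event_timestamp
FROM cdc_customer_batch s
WHERE s.operation IN ('INSERT', 'UPDATE')
AND NOT EXISTS (
SELECT 1 FROM customers_scd t
WHERE t.customer_id = s.customer_id AND t.is_current = TRUE
);
inserted_count := SQLROWCOUNT;
COMMIT;
RETURN 'SCD Type 2 processing complete. Versions closed: ' || closed_count
|| ', versions inserted: ' || inserted_count;
EXCEPTION
WHEN OTHER THEN
ROLLBACK; -- offset not advanced: the same batch is retried on the next run
RAISE;
END;
$$;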
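The SCD Type 2 rules can be checked in isolation: close the open version when a tracked attribute changes or the source row is deleted, open a new version for inserts and updates, and ignore an event older than the open version (one simple policy for late-arriving data). The Python sketch below models those rules with hypothetical names (`Version`, `apply_event`, `TRACKED`); it does not handle a late event for a customer that has no open version.

```python
from dataclasses import dataclass
from datetime import datetime

HIGH_DATE = datetime(9999, 12, 31)
TRACKED = ("customer_name", "email", "segment")


@dataclass
class Version:
    customer_id: str
    attrs: dict
    start: datetime
    end: datetime = HIGH_DATE
    is_current: bool = True


def apply_event(history, op, customer_id, attrs, ts):
    """Apply one deduplicated CDC event to an SCD Type 2 history list, in place."""
    current = next((v for v in history if v.customer_id == customer_id and v.is_current), None)
    if current is not None:
        if ts <= current.start:
            return  # late/out-of-order event: older than the open version, ignore it
        if op == "DELETE" or any(current.attrs.get(k) != attrs.get(k) for k in TRACKED):
            current.end, current.is_current = ts, False  # close the open version
            current = None
    if current is None and op in ("INSERT", "UPDATE"):
        history.append(Version(customer_id, dict(attrs), ts))  # open a new version
```

Replaying an event whose attributes match the open version is a no-op, which keeps reprocessing idempotent.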
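-- 5. Schedule CDC processing; the WHEN condition skips runs while the stream is empty
CREATE OR REPLACE TASK cdc_processing_task
WAREHOUSE = etl_wh
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('CDC_STREAM')
AS
CALL process_scd_type2();
-- Tasks are created suspended; resume to start the schedule
ALTER TASK cdc_processing_task RESUME;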
-- 6. Monitor CDC task runs (TASK_HISTORY is a table function; the column is NAME)
SELECT
name,
state,
scheduled_time,
query_start_time,
completed_time,
error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'CDC_PROCESSING_TASK'))
ORDER BY scheduled_time DESC;
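"Exactly-once" in this design really means effectively-once: target writes and the stream offset commit together, so a failed run changes nothing and the retry sees the same batch, and writes keyed by event_id make a replayed event harmless. The toy Python model below (hypothetical `Pipeline` class, not a Snowflake API) demonstrates both properties.

```python
class Pipeline:
    """Toy model: target writes and the offset are published together at commit."""

    def __init__(self, events):
        self.events = events  # stands in for the stream's source change log
        self.offset = 0       # stands in for the stream offset
        self.target = {}      # event_id -> payload (committed results)

    def run(self, fail_after=None):
        batch = self.events[self.offset:]
        staged = dict(self.target)  # uncommitted working copy of the target
        for i, e in enumerate(batch):
            if fail_after is not None and i == fail_after:
                raise RuntimeError("simulated failure")  # nothing is committed
            staged[e["event_id"]] = e["payload"]  # keyed write: replays are idempotent
        # COMMIT: publish target changes and advance the offset atomically
        self.target = staged
        self.offset += len(batch)
        return len(batch)
```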
Real-World Scenario: Netflix
Question: "How do you handle schema changes in a CDC pipeline? The source table adds a new column."
Schema Evolution in CDC
-- 1. Source table adds a new column
-- ALTER TABLE source.customers ADD COLUMN phone VARCHAR(20);
-- 2. No DDL change is needed on the raw table: raw_cdc_events already stores
-- before_data/after_data as VARIANT, so the new "phone" key simply appears
-- inside the JSON payload of later events.
-- 3. cdc_stream needs no change either: new keys arrive inside after_data.
-- 4. Create a view that extracts known columns and tolerates new ones
CREATE OR REPLACE VIEW customers_extracted AS
SELECT
COALESCE(after_data:customer_id, before_data:customer_id)::VARCHAR AS customer_id,
after_data:customer_name::VARCHAR AS customer_name,
after_data:email::VARCHAR AS email,
after_data:segment::VARCHAR AS segment,
after_data:phone::VARCHAR AS phone, -- New column: NULL for events sent before it existed
after_data:address::VARCHAR AS address, -- Possible future column: NULL until the source sends it
operation AS cdc_action, -- source operation (INSERT/UPDATE/DELETE)
event_timestamp
FROM raw_cdc_events
WHERE table_name = 'customers'; -- assumed value identifying the source table
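-- 5. Monitor for schema changes: one row per distinct set of payload keys
SELECT
ARRAY_TO_STRING(ARRAY_SORT(OBJECT_KEYS(after_data)), ',') AS columns_captured,
COUNT(*) AS event_count,
MIN(event_timestamp) AS first_seen,
MAX(event_timestamp) AS last_seen
FROM raw_cdc_events
WHERE after_data IS NOT NULL -- DELETE events carry no after image
GROUP BY 1
ORDER BY first_seen DESC;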
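The same drift check can run outside Snowflake, for example in a connector or an alerting job: compare the keys in each after_data payload with the columns the view already extracts. The Python sketch below assumes payloads are JSON strings (None for deletes); `KNOWN_COLUMNS` and `schema_drift` are hypothetical names.

```python
import json

# Columns the downstream view already extracts (hypothetical, kept in sync by hand)
KNOWN_COLUMNS = {"customer_id", "customer_name", "email", "segment"}


def schema_drift(payloads):
    """Map each unexpected after_data key to the number of events that carry it."""
    drift = {}
    for raw in payloads:
        after = json.loads(raw) if raw else None
        if not after:
            continue  # DELETE events have no after image
        for key in after.keys() - KNOWN_COLUMNS:
            drift[key] = drift.get(key, 0) + 1
    return drift
```

A non-empty result is a signal to extend the view before consumers need the new column.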
Tasks & Scheduling
-- Create a task with a cron schedule and a multi-statement body
CREATE OR REPLACE TASK daily_etl_task
WAREHOUSE = etl_wh
SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
ALLOW_OVERLAPPING_EXECUTION = FALSE -- the default; stated for clarity
ERROR_INTEGRATION = snowflake_alerts -- a notification integration (identifier, not a string)
AS
EXECUTE IMMEDIATE
$$
BEGIN
-- Step 1: Process CDC
CALL process_cdc_events();
-- Step 2: Force a refresh (the dynamic table also refreshes on its own TARGET_LAG)
ALTER DYNAMIC TABLE daily_sales_summary REFRESH;
-- Step 3: Run data quality checks
CALL run_data_quality_checks('orders');
END;
$$;
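The time zone in the cron string matters: 02:00 in America/New_York is 07:00 UTC in winter and 06:00 UTC in summer. The Python sketch below computes the next run of a daily "0 2 * * *" schedule in UTC using the standard zoneinfo module; `next_daily_run_utc` is a hypothetical helper, not Snowflake's scheduler, and it ignores edge cases on DST transition days.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

NY = ZoneInfo("America/New_York")


def next_daily_run_utc(now_utc: datetime, hour: int = 2, tz: ZoneInfo = NY) -> datetime:
    """Next local <hour>:00 in tz after now_utc, returned in UTC (DST edge days ignored)."""
    local_now = now_utc.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)  # same wall-clock time tomorrow
    return candidate.astimezone(timezone.utc)
```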
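-- Task dependencies: runs after daily_etl_task completes successfully
-- (suspend the root task before adding child tasks to its graph)
CREATE OR REPLACE TASK post_cdc_processing
WAREHOUSE = analytics_wh
AFTER daily_etl_task
AS
CALL update_analytics_views();
-- Resume the root task and all of its dependents in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('daily_etl_task');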
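A task graph is a directed acyclic graph: each child runs only after its predecessors. The Python sketch below uses the standard `graphlib.TopologicalSorter` to derive one valid run order for the two tasks above; the dictionary layout (task -> set of tasks it runs AFTER) is an illustrative assumption.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it runs AFTER (illustrative layout)
task_graph = {
    "daily_etl_task": set(),
    "post_cdc_processing": {"daily_etl_task"},
}


def run_order(graph):
    """Return one valid execution order: every task appears after its predecessors."""
    return list(TopologicalSorter(graph).static_order())
```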
-- Monitor task execution over the last day
SELECT
name,
query_id,
query_text,
error_code,
error_message,
scheduled_time,
completed_time,
state
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START => DATEADD(day, -1, CURRENT_TIMESTAMP())))
WHERE name IN ('DAILY_ETL_TASK', 'POST_CDC_PROCESSING')
ORDER BY scheduled_time DESC;
Best Practices
| Pattern | Recommendation |
|---|---|
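| CDC frequency | Use a 1-minute schedule for near-real-time, 5 minutes for cost savings; add WHEN SYSTEM$STREAM_HAS_DATA(...) to skip idle runs |
| SCD Type 2 | Use Streams + MERGE (or UPDATE + INSERT) inside one transaction for incremental processing |
| Schema evolution | Store raw data as VARIANT, extract in views |
| Error handling | Use EXCEPTION blocks (Snowflake Scripting) or try/catch (JavaScript procedures); roll back and alert on failures |
| Monitoring | Set up alerts for task failures and lag exceeding thresholds |
| Deduplication | Dedupe on event_id and order by LSN; consume each stream inside one transaction so its offset advances only on commit |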
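Common Pitfalls:
- Stream staleness: a stream becomes stale when its offset falls outside the source table's data retention period. If retention is shorter than 14 days, Snowflake extends it for unconsumed streams up to MAX_DATA_EXTENSION_TIME_IN_DAYS (default 14). Only committed DML that reads the stream advances the offset; a plain SELECT does not.
- Stream consumed too early: the first committed DML statement that reads a stream advances its offset, so wrap multi-statement consumers in BEGIN TRANSACTION ... COMMIT
- Task overlap: keep ALLOW_OVERLAPPING_EXECUTION = FALSE (the default) for sequential jobs
- Missing error handling: always handle errors in stored procedures and roll back open transactions
- Ignoring lag: monitor dynamic table lag and task execution times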
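A rough staleness deadline follows from the retention rule: the effective window is the table's retention period, extended for unconsumed streams up to MAX_DATA_EXTENSION_TIME_IN_DAYS when retention is shorter. The Python sketch below is an approximation under that assumption (`stale_after` is a hypothetical helper); the authoritative value is the stale_after column from SHOW STREAMS.

```python
from datetime import datetime, timedelta


def stale_after(last_consumed: datetime, retention_days: int, max_extension_days: int = 14) -> datetime:
    """Approximate staleness deadline: retention, extended up to max_extension_days if shorter."""
    return last_consumed + timedelta(days=max(retention_days, max_extension_days))
```

With the common 1-day retention, an unconsumed stream has roughly 14 days before it goes stale; with 30-day retention the retention period itself governs.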