Streams & Tasks: Change Tracking & Scheduled Processing

Architecture Diagram 1: Streams Change Tracking Architecture

Architecture Diagram 2: Tasks Scheduling Architecture

Architecture Diagram 3: Task Graphs & Dependencies

Architecture Diagram 4: Stream Change Log Internals

  • INSERT: new row added (ACTION=INSERT)
  • UPDATE: recorded as DELETE + INSERT (ISUPDATE=TRUE)
  • DELETE: row removed (ACTION=DELETE)
  • MERGE: mixed operations, all tracked
  • ADVANCE: offset moves forward only

A stream provides change data capture (CDC) by tracking INSERT, UPDATE, and DELETE operations on a source table. Streams expose an append-only change log with metadata columns (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID), so consumers can process only the data that changed since the last consumption.

A task is a scheduling unit that executes SQL statements, stored procedures, or scripts on a defined schedule (CRON expression or interval). Tasks support dependencies (AFTER clause) for workflow orchestration, error handling, and automatic retry.

Stream Consumption Latency
$$\text{end\_to\_end\_latency} = \text{change\_detection} + \text{stream\_query} + \text{target\_processing}$$
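
As a worked illustration of the latency sum, consider purely hypothetical numbers (assumptions for this sketch, not measurements): a consuming task that runs every 5 minutes, so a change can wait up to 300 s before it is picked up, a stream query that takes 5 s, and target processing that takes 20 s.

```latex
\begin{align*}
\text{end\_to\_end\_latency}
  &= \text{change\_detection} + \text{stream\_query} + \text{target\_processing} \\
  &= 300\,\text{s} + 5\,\text{s} + 20\,\text{s} \\
  &= 325\,\text{s} \approx 5.4\,\text{min}
\end{align*}
```

Under these assumptions the schedule interval dominates the total, so shortening the interval would reduce latency far more than tuning the query.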

Task Scheduling Model

  • CRON syntax: minute hour day-of-month month day-of-week
  • Examples: 0 2 * * * (daily 2 AM), */5 * * * * (every 5 min), 0 9-17 * * 1-5 (hourly 9–5 weekdays)
  • Overlap control: ALLOW_OVERLAPPING_EXECUTION=FALSE prevents concurrent runs
  • Dependency resolution: Child task runs only after ALL parent tasks succeed
  • Graph execution: DAG of tasks with parallel branches and synchronization points

Streams come in three types: STANDARD (all DML), APPEND-ONLY (inserts only, on standard tables), and INSERT-ONLY (inserts only, on external tables). Choose STANDARD for complete CDC, APPEND-ONLY when only new rows matter (immutable or log-style data), and INSERT-ONLY when the source is an external table.

  • Streams: Append-only CDC with metadata for INSERT/UPDATE/DELETE tracking
  • Tasks: CRON-based scheduling with dependency graphs (DAGs)
  • Repeatable consumption: Reading a stream does not advance its offset, so repeated reads return the same changes until a DML statement consumes them
  • Task states: SCHEDULED -> EXECUTING -> SUCCEEDED/FAILED/BLOCKED
  • Error propagation: Parent failure blocks all child tasks in the graph


Detailed Explanation

What are Streams?

Streams provide change data capture (CDC) by tracking INSERT, UPDATE, and DELETE operations on a source table via an append-only change log.


Stream Metadata

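Column | Description
METADATA$ACTION | DML operation recorded for the row: INSERT or DELETE
METADATA$ISUPDATE | TRUE if the row is part of an UPDATE (recorded as a DELETE + INSERT pair)
METADATA$ROW_ID | Unique, immutable ID of the source row, usable to track changes to that row over time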

Stream Types

  • STANDARD: tracks all DML (INSERT, UPDATE, DELETE)
  • APPEND-ONLY: tracks only inserts (lower overhead)
  • INSERT-ONLY: tracks only inserted rows on external tables (lightweight)

Stream Consumption Patterns

  • Stream returns current state of changed data + metadata
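  • UPDATE operations appear as a DELETE row (old values) and an INSERT row (new values), both with METADATA$ISUPDATE = TRUE
  • Offset advances automatically when a DML statement that reads the stream commits; a plain SELECT does not advance it
  • Repeatable reads: until the offset advances, every query against the stream returns the same set of changes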
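
The consumption rules above can be sketched as follows. This is a minimal sketch, assuming the sales_stream stream used in the examples of this lesson; the landing table sales_changes_raw and its column types are hypothetical and only serve the illustration.

```sql
-- Hypothetical landing table for raw change rows (an assumption for this sketch)
CREATE TABLE IF NOT EXISTS sales_changes_raw (
    id            NUMBER,
    product       VARCHAR,
    amount        NUMBER(12, 2),
    updated_at    TIMESTAMP_NTZ,
    change_action VARCHAR,
    is_update     BOOLEAN
);

-- A plain SELECT reads the pending changes but does NOT advance the offset;
-- running it again returns the same rows
SELECT COUNT(*) AS pending_rows FROM sales_stream;

-- Using the stream as the source of a DML statement consumes it:
-- the offset advances when the transaction commits
BEGIN TRANSACTION;

INSERT INTO sales_changes_raw (id, product, amount, updated_at, change_action, is_update)
SELECT id, product, amount, updated_at, METADATA$ACTION, METADATA$ISUPDATE
FROM sales_stream;

COMMIT;

-- Typically FALSE at this point, until new changes land on the source table
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_data;
```

If the transaction is rolled back instead of committed, the offset stays where it was and the same change rows remain available.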

Tasks: Scheduled Processing

  • CRON-based or interval-based scheduling
  • Execution states: SCHEDULED -> EXECUTING -> SUCCEEDED/FAILED/BLOCKED
  • ALLOW_OVERLAPPING_EXECUTION controls concurrent runs
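
A common way to combine tasks with streams is a task that runs on an interval but skips the run when its stream has nothing to process. The following is a minimal sketch, assuming the etl_wh warehouse and sales_stream stream from this lesson's examples; the task name and the sales_changes_raw target table are hypothetical.

```sql
-- Hypothetical task: wakes up every 5 minutes, runs only if the stream has data
CREATE OR REPLACE TASK consume_sales_stream_task
    WAREHOUSE = etl_wh
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('sales_stream')
AS
    -- sales_changes_raw is an assumed landing table for raw change rows
    INSERT INTO sales_changes_raw (id, product, amount, updated_at, change_action, is_update)
    SELECT id, product, amount, updated_at, METADATA$ACTION, METADATA$ISUPDATE
    FROM sales_stream;

-- Tasks are created in a suspended state and must be resumed to start running
ALTER TASK consume_sales_stream_task RESUME;
```

When the WHEN condition evaluates to FALSE the run is skipped, which avoids resuming the warehouse for empty batches.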

Task Graphs and Dependencies

  • Define dependencies via the AFTER clause to form a directed acyclic graph (DAG)
  • Child tasks run only after ALL parent tasks succeed
  • Supports parallel execution and synchronization points
  • Error propagation: parent failure blocks all children
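
For a graph defined this way, resuming each task by hand can be replaced by a single call. A minimal sketch, assuming a root task named root_task as in the examples of this lesson:

```sql
-- Recursively resume root_task and every task that depends on it
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('root_task');

-- Trigger one run of the whole graph now, without waiting for the schedule
EXECUTE TASK root_task;
```

EXECUTE TASK on the root is convenient for testing a new graph end to end before relying on its schedule.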

Error Handling and Monitoring

  • ERROR_INTEGRATION for failure notifications
  • TASK_HISTORY table function for execution details
  • SUSPEND_TASK_AFTER_NUM_FAILURES for auto-suspend
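
The settings above might be applied and checked as sketched below. This assumes a root task named root_task as in this lesson's examples; the threshold and retry values are arbitrary choices for illustration.

```sql
-- Suspend the root task before changing its settings
ALTER TASK root_task SUSPEND;

-- Auto-suspend after 3 consecutive failed runs (illustrative value)
ALTER TASK root_task SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;

-- Automatically retry a failed graph run up to 2 times (illustrative value)
ALTER TASK root_task SET TASK_AUTO_RETRY_ATTEMPTS = 2;

ALTER TASK root_task RESUME;

-- List only failed runs from the last 24 hours
SELECT name, state, error_code, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(hour, -24, CURRENT_TIMESTAMP()),
    ERROR_ONLY => TRUE
))
ORDER BY scheduled_time DESC;
```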

Key Takeaway: Monitor stream offsets regularly. Use STANDARD for complete CDC, APPEND-ONLY for immutable data, and INSERT-ONLY for streams on external tables.

Key Concepts Table

Stream Type | Tracks | Metadata | Use Case
STANDARD | INSERT, UPDATE, DELETE | Full change tracking | Complete CDC
APPEND-ONLY | INSERT only | Insert metadata | Immutable data
INSERT-ONLY | INSERT only (external tables) | Minimal overhead | Append-only files in a data lake

Task State | Description | Next State
SCHEDULED | Waiting for execution time | EXECUTING
EXECUTING | Currently running | SUCCEEDED/FAILED
SUCCEEDED | Completed successfully | SCHEDULED
FAILED | Encountered error | SCHEDULED/BLOCKED
BLOCKED | Waiting for dependency | SCHEDULED

Dependency Type | Behavior | Use Case
AFTER (single) | Wait for one parent | Sequential processing
AFTER (multiple) | Wait for all parents | Synchronization point
Root task | No parents | Entry point for workflow

Stream Parameter | Default | Description
SHOW_INITIAL_ROWS | FALSE | Include existing rows as INSERT events on first consumption
APPEND_ONLY | FALSE | Track only INSERT operations
INSERT_ONLY | FALSE | Track only inserted rows; applies to streams on external tables
COMMENT | NULL | Descriptive text for the stream object

Task Parameter | Default | Description
WAREHOUSE | Required (unless the task is serverless) | Virtual warehouse for task execution
SCHEDULE | Required for root and standalone tasks | CRON expression or interval for execution timing; child tasks use AFTER instead
ALLOW_OVERLAPPING_EXECUTION | FALSE | Permit concurrent runs of the task graph
ERROR_INTEGRATION | NULL | Notification integration for failure alerts
USER_TASK_TIMEOUT_MS | 3600000 | Maximum execution time (ms) before timeout
SUSPEND_TASK_AFTER_NUM_FAILURES | 10 | Auto-suspend after N consecutive failures
MAX_CONCURRENCY_LEVEL (warehouse parameter) | 8 | Max concurrent statements per warehouse cluster; set on the task's warehouse, not on the task

Code Examples

Streams: Core Operations

-- Example 1: Create standard stream
-- SHOW_INITIAL_ROWS=TRUE captures existing data as INSERT events
-- This is useful for backfilling downstream tables
CREATE OR REPLACE STREAM sales_stream
ON TABLE sales_transactions
SHOW_INITIAL_ROWS = TRUE    -- Include current rows as initial INSERT events
COMMENT = 'CDC stream for sales transactions';

-- Example 2: Create append-only stream
-- APPEND_ONLY=TRUE reduces overhead by tracking only inserts
-- Cannot detect UPDATE or DELETE operations
CREATE OR REPLACE STREAM immutable_log_stream
ON TABLE audit_log
APPEND_ONLY = TRUE           -- Only track INSERT operations
COMMENT = 'Append-only stream for audit logs';

-- Example 3: Query stream with change metadata
-- METADATA$ACTION: The DML operation recorded (INSERT or DELETE)
-- METADATA$ISUPDATE: TRUE if the row is part of an UPDATE (a DELETE + INSERT pair)
-- METADATA$ROW_ID: Unique, immutable ID of the source row
SELECT 
    *,
    METADATA$ACTION as change_action,
    METADATA$ISUPDATE as is_update,
    METADATA$ROW_ID as row_id
FROM sales_stream;

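-- Example 4: Process stream with merge
-- MERGE handles INSERT, UPDATE, and DELETE in a single atomic operation
-- A stream records an UPDATE as a DELETE + INSERT pair (METADATA$ISUPDATE = TRUE),
-- so the DELETE half is filtered out to leave one source row per id
MERGE INTO sales_target t
USING (
    SELECT *
    FROM sales_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
) s
ON t.id = s.id
-- Row was deleted from the source
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
    DELETE
-- Row already exists in the target: apply the new values
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    UPDATE SET 
        t.product = s.product,
        t.amount = s.amount,
        t.updated_at = s.updated_at
-- Row is new to the target
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (id, product, amount, updated_at)
    VALUES (s.id, s.product, s.amount, s.updated_at);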

-- Example 5: Stream offset management
-- Check whether the stream has unconsumed changes (returns TRUE or FALSE)
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_data;

-- Check stream offset details
SELECT 
    'sales_stream' AS stream_name,
    SYSTEM$STREAM_GET_TABLE_TIMESTAMP('sales_stream') AS table_timestamp,
    SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_pending_changes;

-- Example 6: Create insert-only stream
-- INSERT_ONLY is supported for streams on external tables, not standard tables
-- This example assumes event_log is an external table (e.g., log or event files in a stage)
CREATE OR REPLACE STREAM event_log_stream
ON EXTERNAL TABLE event_log
INSERT_ONLY = TRUE
COMMENT = 'Lightweight stream for append-only event logs';

-- Example 7: Stream on external tables for data lake CDC
-- Streams can track newly added rows on external tables (Parquet, JSON, etc.)
-- External tables require ON EXTERNAL TABLE and INSERT_ONLY = TRUE
CREATE OR REPLACE STREAM external_data_stream
ON EXTERNAL TABLE external_orders
INSERT_ONLY = TRUE
COMMENT = 'CDC stream for external table data lake ingestion';

Tasks: Scheduling and Workflows

-- Example 8: Create basic task
-- CRON syntax: minute hour day-of-month month day-of-week
-- '0 3 * * *' means daily at 3:00 AM
-- America/New_York timezone ensures correct local execution
CREATE OR REPLACE TASK daily_cleanup_task
    WAREHOUSE = 'maintenance_wh'                              -- Dedicated warehouse for task
    SCHEDULE = 'USING CRON 0 3 * * * America/New_York'       -- Daily at 3 AM EST
    COMMENT = 'Daily cleanup of old data'
AS
    DELETE FROM audit_log 
    WHERE created_at < DATEADD(day, -90, CURRENT_TIMESTAMP());

-- Example 9: Create task with error handling
-- ERROR_INTEGRATION specifies notification service for failures
-- ALLOW_OVERLAPPING_EXECUTION=FALSE prevents concurrent runs
-- BEGIN...EXCEPTION block provides structured error handling
CREATE OR REPLACE TASK etl_task
    WAREHOUSE = 'etl_wh'
    SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
    ERROR_INTEGRATION = 's3_error_notifications'              -- Notify on failure
    ALLOW_OVERLAPPING_EXECUTION = FALSE                       -- No concurrent executions
    USER_TASK_TIMEOUT_MS = 7200000                            -- 2-hour timeout
    SUSPEND_TASK_AFTER_NUM_FAILURES = 5                       -- Auto-suspend after 5 failures
AS
BEGIN
    -- Log start
    INSERT INTO task_log (task_name, start_time, status)
    VALUES ('etl_task', CURRENT_TIMESTAMP(), 'RUNNING');
    
    -- Execute ETL
    INSERT INTO target_table 
    SELECT * FROM source_table 
    WHERE date = CURRENT_DATE();
    
    -- Log success
    INSERT INTO task_log (task_name, start_time, status, end_time)
    VALUES ('etl_task', CURRENT_TIMESTAMP(), 'SUCCESS', CURRENT_TIMESTAMP());
    
    EXCEPTION
        WHEN OTHER THEN
            -- Log failure with error details
            INSERT INTO task_log (task_name, start_time, status, error_message)
            VALUES ('etl_task', CURRENT_TIMESTAMP(), 'FAILED', :SQLERRM);  -- Colon prefix binds the variable inside a SQL statement
            RAISE;  -- Re-raise to mark task as FAILED
END;

-- Example 10: Interval-based scheduling (alternative to CRON)
-- Runs every 60 minutes using the '<n> MINUTE' interval syntax
CREATE OR REPLACE TASK frequent_sync_task
    WAREHOUSE = 'sync_wh'
    SCHEDULE = '60 MINUTE'           -- Every 60 minutes
AS
    INSERT INTO staging_data 
    SELECT * FROM source_table 
    WHERE updated_at > DATEADD(minute, -60, CURRENT_TIMESTAMP());

Task Graphs: Complex Workflows

-- Example 11: Create task graph (DAG)
-- Root task runs on schedule; child tasks run after parent succeeds
-- Parallel branches execute independently; final_task waits for all

CREATE OR REPLACE TASK root_task
    WAREHOUSE = 'etl_wh'
    SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
    INSERT INTO raw_sales SELECT * FROM stage_sales;

CREATE OR REPLACE TASK child_task_1
    WAREHOUSE = 'etl_wh'
    AFTER root_task                  -- Runs after root_task completes successfully
AS
    INSERT INTO clean_sales SELECT * FROM raw_sales WHERE amount > 0;

CREATE OR REPLACE TASK child_task_2
    WAREHOUSE = 'etl_wh'
    AFTER root_task                  -- Parallel branch: also runs after root_task
AS
    INSERT INTO raw_inventory SELECT * FROM stage_inventory;

CREATE OR REPLACE TASK final_task
    WAREHOUSE = 'etl_wh'
    AFTER child_task_1, child_task_2  -- Synchronization point: waits for BOTH
AS
    INSERT INTO analytics_sales 
    SELECT s.*, i.stock 
    FROM clean_sales s 
    JOIN raw_inventory i ON s.product_id = i.product_id;

-- Example 12: Resume/suspend task graph
-- Must resume child tasks before the root task
ALTER TASK final_task RESUME;
ALTER TASK child_task_1 RESUME;
ALTER TASK child_task_2 RESUME;
ALTER TASK root_task RESUME;

-- Suspend in reverse order (root first, so no new graph runs start)
ALTER TASK root_task SUSPEND;
ALTER TASK child_task_1 SUSPEND;
ALTER TASK child_task_2 SUSPEND;
ALTER TASK final_task SUSPEND;

-- Example 13: Alter task schedule
-- Suspend the task, change the schedule, then resume
ALTER TASK daily_cleanup_task SUSPEND;
ALTER TASK daily_cleanup_task SET SCHEDULE = 'USING CRON 0 4 * * * America/New_York';
ALTER TASK daily_cleanup_task RESUME;

Monitoring and Observability

-- Example 14: Monitor task execution history
-- TASK_HISTORY is a table function that returns execution details per task run
SELECT 
    name AS task_name,
    query_id,
    state,                                          -- e.g. SCHEDULED, EXECUTING, SUCCEEDED, FAILED, CANCELLED, SKIPPED
    scheduled_time,                                 -- When task was scheduled to run
    query_start_time,                               -- When task actually started executing
    completed_time,                                 -- When task finished
    DATEDIFF(second, query_start_time, completed_time) AS duration_seconds,
    error_code,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Example 15: Monitor task graph dependencies
-- TASK_DEPENDENTS lists a root task and its children;
-- PREDECESSORS shows the parent tasks each task waits for
SELECT 
    name AS task_name,
    state AS task_state,             -- started or suspended
    predecessors AS parent_tasks,
    schedule
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
    TASK_NAME => 'root_task',
    RECURSIVE => TRUE
))
ORDER BY name;

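-- Example 16: Stream consumption monitoring
-- Track staleness: a stream that is not consumed before STALE_AFTER becomes unusable
SHOW STREAMS;

SELECT 
    "name" AS stream_name,
    "table_name",
    "type",
    "stale",
    "stale_after",
    "created_on",
    "owner"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY "stale_after";

-- Check one stream for pending changes (the function takes the stream name as a string)
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_pending_data;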

-- Example 17: Stream consumption pattern with error handling
-- All DML statements run in one explicit transaction, so they see the same
-- stream contents and the offset advances once, at COMMIT
CREATE OR REPLACE PROCEDURE process_stream_changes()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    change_count INTEGER DEFAULT 0;
BEGIN
    BEGIN TRANSACTION;

    -- Count available change rows (a SELECT does not consume the stream)
    SELECT COUNT(*) INTO :change_count FROM sales_stream;

    -- Process new rows (pure inserts, not the INSERT half of an update)
    INSERT INTO sales_archive (id, product, amount, updated_at)
    SELECT id, product, amount, updated_at
    FROM sales_stream
    WHERE METADATA$ACTION = 'INSERT'
      AND METADATA$ISUPDATE = FALSE;

    -- Process updates: new values arrive as INSERT rows with ISUPDATE = TRUE
    UPDATE sales_archive t
    SET 
        product = s.product,
        amount = s.amount,
        updated_at = s.updated_at
    FROM sales_stream s
    WHERE t.id = s.id 
      AND s.METADATA$ACTION = 'INSERT'
      AND s.METADATA$ISUPDATE = TRUE;

    -- Process deletes (skip the DELETE half of an update)
    DELETE FROM sales_archive
    USING sales_stream s
    WHERE sales_archive.id = s.id 
      AND s.METADATA$ACTION = 'DELETE'
      AND s.METADATA$ISUPDATE = FALSE;

    COMMIT;  -- Stream offset advances here

    IF (change_count > 0) THEN
        RETURN 'Processed ' || change_count || ' change rows';
    ELSE
        RETURN 'No changes to process';
    END IF;
EXCEPTION
    WHEN OTHER THEN
        ROLLBACK;  -- Offset stays put; the same changes are returned on the next run
        RAISE;
END;
$$;

-- Example 18: Task with conditional execution
CREATE OR REPLACE TASK conditional_etl_task
    WAREHOUSE = 'etl_wh'
    SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
BEGIN
    -- Count today's source rows into a local variable
    LET row_count INTEGER := 0;
    SELECT COUNT(*) INTO :row_count
    FROM source_table
    WHERE load_date = CURRENT_DATE();

    -- Only process if source data exists
    IF (row_count > 0) THEN
        INSERT INTO target_table 
        SELECT * FROM source_table 
        WHERE load_date = CURRENT_DATE();
    END IF;
END;

-- Example 19: Multi-cluster warehouse for task execution
-- Ensures sufficient concurrency for parallel task graphs
CREATE OR REPLACE TASK high_volume_etl
    WAREHOUSE = 'etl_wh_scaled'
    SCHEDULE = 'USING CRON 0 1 * * * America/New_York'
AS
    INSERT INTO aggregated_data 
    SELECT region, SUM(amount) 
    FROM raw_sales 
    GROUP BY region;

-- Configure warehouse for auto-scaling
ALTER WAREHOUSE etl_wh_scaled SET
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 5
    SCALING_POLICY = 'ECONOMY';

-- Example 20: Drop a stream that is no longer needed
-- Dropping a stream discards its offset, so any unconsumed changes are lost
DROP STREAM IF EXISTS old_stream;

Performance Metrics

MetricTargetWarningCritical
Stream Consumption Latency< 5 min5-15 min> 15 min
Task Execution Time< 30 min30-60 min> 60 min
Task Failure Rate< 1%1-5%> 5%
Stream Offset Lag< 100 changes100-1000> 1000
Task Graph Completion< 2 hours2-4 hours> 4 hours

Best Practices

  1. Monitor stream offsets: Regularly check that streams are being consumed. An unconsumed stream keeps accumulating changes and eventually becomes stale. Set up alerts for excessive offset lag.

  2. Use appropriate stream types: Choose STANDARD for complete CDC, APPEND-ONLY for immutable data, and INSERT-ONLY for streams on external tables.

  3. Implement idempotent processing: Design stream consumers to handle duplicate processing safely. Use MERGE operations for upserts.

  4. Dedicate warehouses for tasks: Create separate warehouses for task execution to isolate scheduled workloads from interactive queries.

  5. Use task graphs for workflows: Replace complex stored procedures with task graphs for better visibility, error handling, and maintainability.

  6. Set appropriate schedules: Use CRON expressions for complex schedules. Consider time zones and business hours when scheduling.

  7. Implement error handling: Configure ERROR_INTEGRATION for failure notifications. Log errors to tables for debugging.

  8. Monitor task history: Regularly review task execution history to identify patterns, bottlenecks, and failures.

  9. Use ALLOW_OVERLAPPING_EXECUTION carefully: Set to FALSE for tasks with state dependencies. Set to TRUE for independent tasks.

  10. Clean up old streams: Remove streams that are no longer needed to reduce metadata overhead and improve performance.


Additional Theory: Stream Offset Management

Understanding stream offsets is critical for reliable CDC implementations. A stream offset tracks the position in the change log. It advances only when a DML statement that reads the stream commits; a plain SELECT leaves it unchanged. If a consumer fails and its transaction rolls back, the offset does not move and the same changes are returned on the next read. Because the offset and the consumer's writes commit together, a retry neither loses changes nor applies them twice.

Offset lifecycle:

  1. Changes occur on source table (INSERT/UPDATE/DELETE)
  2. Stream records changes in append-only log with metadata
  3. Consumer reads the stream inside a DML statement and processes changes
  4. When that statement's transaction commits, the stream offset advances to the next unread position
  5. Subsequent queries return only new changes

Key consideration: Stream offsets are stored as metadata. If a stream is dropped, all unconsumed changes are lost. Always ensure consumers have processed changes before dropping streams.
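
One way to guard against losing a position is to create a second stream at the same offset before touching the first, and to check staleness before dropping anything. A minimal sketch, assuming the sales_stream stream on sales_transactions from this lesson's examples; the name sales_stream_backup is hypothetical.

```sql
-- Create a second stream positioned at the same offset as an existing stream
-- (sales_stream_backup is a hypothetical name for this sketch)
CREATE STREAM sales_stream_backup
    ON TABLE sales_transactions
    AT (STREAM => 'sales_stream');

-- Inspect both streams; the "stale" and "stale_after" columns show
-- whether a stream can still be read and until when
SHOW STREAMS LIKE 'sales_stream%';
```

Each stream keeps its own offset, so giving every consumer its own stream lets them progress independently.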

Additional Theory: Task Graph Error Propagation

Task graphs implement a fault-tolerant execution model:

  • Success: Child tasks execute normally
  • Failure: All downstream children are automatically BLOCKED
  • Recovery: Resuming a failed task re-evaluates child dependencies
  • Concurrency: Independent branches execute in parallel on separate warehouse resources

This model prevents cascading failures and ensures data consistency across complex ETL workflows.
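
Two commands are commonly used around graph failures: a finalizer task that runs after every graph run, and a retry of the last failed run. The following is a sketch under stated assumptions: root_task, etl_wh, and the task_log table come from this lesson's examples, while the task name graph_finalizer and the status value are hypothetical.

```sql
-- Snippet A: add a finalizer task to the graph
-- The root task must be suspended while the graph is modified
ALTER TASK root_task SUSPEND;

-- graph_finalizer (hypothetical name) runs after the graph run ends,
-- whether the other tasks succeeded or failed
CREATE OR REPLACE TASK graph_finalizer
    WAREHOUSE = etl_wh
    FINALIZE = root_task
AS
    INSERT INTO task_log (task_name, start_time, status)
    VALUES ('graph_finalizer', CURRENT_TIMESTAMP(), 'GRAPH_RUN_FINISHED');

ALTER TASK graph_finalizer RESUME;
ALTER TASK root_task RESUME;

-- Snippet B: after a failed run, re-run the graph starting from the task
-- that failed (only possible if the graph was not modified since that run)
EXECUTE TASK root_task RETRY LAST;
```

RETRY LAST skips the tasks that already succeeded in the failed run, which avoids repeating work that completed successfully.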
