Streams & Tasks: Change Tracking & Scheduled Processing
Architecture Diagram 1: Streams Change Tracking Architecture
Architecture Diagram 2: Tasks Scheduling Architecture
Architecture Diagram 3: Task Graphs & Dependencies
Architecture Diagram 4: Stream Change Log Internals
A stream provides change data capture (CDC) by tracking INSERT, UPDATE, and DELETE operations on a source table. Streams maintain an append-only change log with metadata (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID), enabling consumers to process only the data changed since the last consumption.
A task is a scheduling unit that executes SQL statements, stored procedures, or scripts on a defined schedule (CRON expression or interval). Tasks support dependencies (AFTER clause) for workflow orchestration, error handling, and automatic retry.
Task Scheduling Model
- CRON syntax: minute hour day-of-month month day-of-week
- Examples: 0 2 * * * (daily at 2 AM), */5 * * * * (every 5 minutes), 0 9-17 * * 1-5 (hourly, 9 AM to 5 PM on weekdays)
- Overlap control: ALLOW_OVERLAPPING_EXECUTION=FALSE prevents concurrent runs
- Dependency resolution: Child task runs only after ALL parent tasks succeed
- Graph execution: DAG of tasks with parallel branches and synchronization points
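The scheduling model above can be sketched as a single task definition. This is a minimal illustration, not a recommended configuration: the task name hourly_refresh_task, the warehouse etl_wh, and the table refresh_log are hypothetical.

```sql
-- Hypothetical task: all object names here are illustrative assumptions
CREATE OR REPLACE TASK hourly_refresh_task
  WAREHOUSE = etl_wh
  -- Top of every hour from 9 AM to 5 PM, Monday to Friday, Eastern Time
  SCHEDULE = 'USING CRON 0 9-17 * * 1-5 America/New_York'
  -- Do not start a new run while the previous run is still executing
  ALLOW_OVERLAPPING_EXECUTION = FALSE
AS
  INSERT INTO refresh_log (refreshed_at)
  SELECT CURRENT_TIMESTAMP();

-- Tasks are created in a suspended state; resume to activate the schedule
ALTER TASK hourly_refresh_task RESUME;
```

The time zone is part of the CRON string, so the schedule follows local daylight saving changes rather than a fixed UTC offset.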
Streams come in three types: STANDARD (all DML), APPEND-ONLY (inserts only, on standard tables), and INSERT-ONLY (inserts only, on external tables). Choose STANDARD for complete CDC, APPEND-ONLY for immutable data where updates and deletes can be ignored, and INSERT-ONLY for external tables.
- Streams: Append-only CDC with metadata for INSERT/UPDATE/DELETE tracking
- Tasks: CRON-based scheduling with dependency graphs (DAGs)
- Idempotent consumption: Same changes processed multiple times produce same result
- Task states: SCHEDULED -> EXECUTING -> SUCCEEDED/FAILED/BLOCKED
- Error propagation: Parent failure blocks all child tasks in the graph
Detailed Explanation
What are Streams?
Streams provide change data capture (CDC) by tracking INSERT, UPDATE, and DELETE operations on a source table via an append-only change log.
Stream Metadata
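| Column | Description |
|---|---|
| METADATA$ACTION | DML operation recorded for the row: INSERT or DELETE |
| METADATA$ISUPDATE | TRUE if the row is part of an UPDATE, which a stream records as a DELETE and INSERT pair |
| METADATA$ROW_ID | Unique, immutable row identifier used to track changes to a row over time |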
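To make the metadata columns concrete, the query below labels each stream row by combining METADATA$ACTION and METADATA$ISUPDATE. It is a sketch that assumes a standard stream named sales_stream whose source table has an id column, as in the examples later in this document. In a standard stream an UPDATE shows up as a DELETE row plus an INSERT row, both flagged with METADATA$ISUPDATE = TRUE.

```sql
-- Assumes a standard stream sales_stream with an id column (illustrative)
SELECT
  id,
  CASE
    WHEN METADATA$ACTION = 'INSERT' AND NOT METADATA$ISUPDATE THEN 'new row'
    WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE     THEN 'update (new values)'
    WHEN METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE     THEN 'update (old values)'
    ELSE 'deleted row'
  END AS change_kind,
  METADATA$ROW_ID AS row_id
FROM sales_stream
ORDER BY row_id, METADATA$ACTION;
```

Because this is a plain SELECT, running it does not consume the stream.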
Stream Types
- STANDARD: tracks all DML (INSERT, UPDATE, DELETE)
- APPEND-ONLY: tracks only inserts on standard tables (lower overhead)
- INSERT-ONLY: tracks only inserts on external tables (lightweight)
Stream Consumption Patterns
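- A stream query returns the net changed rows since the current offset, plus the metadata columns
- An UPDATE appears as a pair of rows: a DELETE with the old values and an INSERT with the new values, both with METADATA$ISUPDATE = TRUE
- The offset advances only when the stream is consumed by a DML statement in a transaction that commits; a plain SELECT leaves it unchanged
- Idempotent: design consumers so that processing the same changes more than once produces the same result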
Tasks: Scheduled Processing
- CRON-based or interval-based scheduling
- Execution states: SCHEDULED -> EXECUTING -> SUCCEEDED/FAILED/BLOCKED
- ALLOW_OVERLAPPING_EXECUTION controls concurrent runs
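Streams and tasks are commonly combined by giving a task a WHEN condition so that it only does work when the stream has data. The sketch below assumes the sales_stream stream used elsewhere in this document; the task name process_sales_stream_task, the warehouse etl_wh, and the target table sales_changes_log are hypothetical.

```sql
-- Hypothetical task and target table; sales_stream is assumed to exist
CREATE OR REPLACE TASK process_sales_stream_task
  WAREHOUSE = etl_wh
  SCHEDULE = '5 MINUTE'                        -- interval-based schedule
  WHEN SYSTEM$STREAM_HAS_DATA('sales_stream')  -- skip the run when nothing changed
AS
  -- This INSERT reads the stream in DML, so a successful run consumes it
  INSERT INTO sales_changes_log (id, change_action, is_update, logged_at)
  SELECT id, METADATA$ACTION, METADATA$ISUPDATE, CURRENT_TIMESTAMP()
  FROM sales_stream;

ALTER TASK process_sales_stream_task RESUME;
```

When the condition is false the run is skipped, which avoids spending warehouse time on empty batches.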
Task Graphs and Dependencies
- Define dependencies via the AFTER clause to form a directed acyclic graph (DAG)
- Child tasks run only after ALL parent tasks succeed
- Supports parallel execution and synchronization points
- Error propagation: parent failure blocks all children
Error Handling and Monitoring
- ERROR_INTEGRATION for failure notifications
- TASK_HISTORY table function for execution details
- SUSPEND_TASK_AFTER_NUM_FAILURES for auto-suspend
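As a brief sketch of how these settings are used together, the statements below tighten the auto-suspend threshold on a task and then list its recent failed runs. The task name etl_task matches a later example in this document; the threshold of 3 and the one-day window are arbitrary illustrative values.

```sql
-- Suspend before changing task properties, then resume
ALTER TASK etl_task SUSPEND;
ALTER TASK etl_task SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;
ALTER TASK etl_task RESUME;

-- List failed runs from the last day
SELECT name, state, scheduled_time, error_code, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(day, -1, CURRENT_TIMESTAMP()),
  ERROR_ONLY => TRUE
))
ORDER BY scheduled_time DESC;
```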
Key Takeaway: Monitor stream offsets regularly. Use STANDARD for complete CDC, APPEND-ONLY for immutable data in standard tables, and INSERT-ONLY for external tables.
Key Concepts Table
| Stream Type | Tracks | Metadata | Use Case |
|---|---|---|---|
| STANDARD | INSERT, UPDATE, DELETE | Full change tracking | Complete CDC |
| APPEND-ONLY | INSERT only | Insert metadata | Immutable data |
| INSERT-ONLY | INSERT only (external tables) | Insert metadata | External table ingestion |
| Task State | Description | Next State |
|---|---|---|
| SCHEDULED | Waiting for execution time | EXECUTING |
| EXECUTING | Currently running | SUCCEEDED/FAILED |
| SUCCEEDED | Completed successfully | SCHEDULED |
| FAILED | Encountered error | SCHEDULED/BLOCKED |
| BLOCKED | Waiting for dependency | SCHEDULED |
| Dependency Type | Behavior | Use Case |
|---|---|---|
| AFTER (single) | Wait for one parent | Sequential processing |
| AFTER (multiple) | Wait for all parents | Synchronization point |
| Root task | No parents | Entry point for workflow |
| Stream Parameter | Default | Description |
|---|---|---|
| SHOW_INITIAL_ROWS | FALSE | Include existing rows as INSERT events on first query |
| APPEND_ONLY | FALSE | Track only INSERT operations |
| INSERT_ONLY | FALSE | Track only inserted rows; used for streams on external tables |
| COMMENT | NULL | Descriptive text for the stream object |
| Task Parameter | Default | Description |
|---|---|---|
| WAREHOUSE | None | Virtual warehouse for task execution; omit it to run the task as a serverless task |
| SCHEDULE | None | CRON expression or interval for execution timing; set on standalone and root tasks, not on child tasks |
| ALLOW_OVERLAPPING_EXECUTION | FALSE | Permit concurrent task runs |
| ERROR_INTEGRATION | NULL | Notification service for failure alerts |
| USER_TASK_TIMEOUT_MS | 3600000 | Maximum execution time (ms) before timeout |
| SUSPEND_TASK_AFTER_NUM_FAILURES | 10 | Auto-suspend after N consecutive failures |
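| MAX_CONCURRENCY_LEVEL | 8 | Warehouse parameter (set on the warehouse, not the task) that limits concurrent statements and so affects parallel branches of a task graph |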
Code Examples
Streams: Core Operations
-- Example 1: Create standard stream
-- SHOW_INITIAL_ROWS=TRUE captures existing data as INSERT events
-- This is useful for backfilling downstream tables
CREATE OR REPLACE STREAM sales_stream
ON TABLE sales_transactions
SHOW_INITIAL_ROWS = TRUE -- Include current rows as initial INSERT events
COMMENT = 'CDC stream for sales transactions';
-- Example 2: Create append-only stream
-- APPEND_ONLY=TRUE reduces overhead by tracking only inserts
-- Cannot detect UPDATE or DELETE operations
CREATE OR REPLACE STREAM immutable_log_stream
ON TABLE audit_log
APPEND_ONLY = TRUE -- Only track INSERT operations
COMMENT = 'Append-only stream for audit logs';
-- Example 3: Query stream with change metadata
-- METADATA$ACTION: The recorded DML operation (INSERT or DELETE)
-- METADATA$ISUPDATE: TRUE if the row is half of an UPDATE (a DELETE + INSERT pair)
-- METADATA$ROW_ID: Unique, immutable identifier of the changed row
SELECT
*,
METADATA$ACTION as change_action,
METADATA$ISUPDATE as is_update,
METADATA$ROW_ID as row_id
FROM sales_stream;
-- Example 4: Process stream with merge
-- MERGE handles inserts, updates, and deletes in a single atomic operation
-- A stream records an UPDATE as a DELETE + INSERT pair with METADATA$ISUPDATE = TRUE,
-- so the DELETE half is filtered out to keep one source row per target row
MERGE INTO sales_target t
USING (
    SELECT *
    FROM sales_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
) s
ON t.id = s.id
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
    DELETE
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' AND s.METADATA$ISUPDATE = TRUE THEN
    UPDATE SET
        t.product = s.product,
        t.amount = s.amount,
        t.updated_at = s.updated_at
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (id, product, amount, updated_at)
    VALUES (s.id, s.product, s.amount, s.updated_at);
-- Example 5: Stream offset management
-- Check whether the stream has unconsumed changes (returns TRUE or FALSE, not a count)
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_data;
-- Check stream offset details
SELECT
'sales_stream' AS stream_name,
SYSTEM$STREAM_GET_TABLE_TIMESTAMP('sales_stream') AS table_timestamp,
SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_pending_changes;
-- Example 6: Create insert-only stream
-- INSERT_ONLY applies to streams on external tables; for standard tables
-- that only receive inserts (e.g., logs, events), use APPEND_ONLY instead
-- Assumes event_log is defined as an external table
CREATE OR REPLACE STREAM event_log_stream
    ON EXTERNAL TABLE event_log
    INSERT_ONLY = TRUE
    COMMENT = 'Lightweight stream for append-only event logs';
-- Example 7: Stream on external tables for data lake CDC
-- Streams on external tables (Parquet, JSON, etc.) must be insert-only
-- and are declared with ON EXTERNAL TABLE
CREATE OR REPLACE STREAM external_data_stream
    ON EXTERNAL TABLE external_orders
    INSERT_ONLY = TRUE
    COMMENT = 'CDC stream for external table data lake ingestion';
Tasks: Scheduling and Workflows
-- Example 8: Create basic task
-- CRON syntax: minute hour day-of-month month day-of-week
-- '0 3 * * *' means daily at 3:00 AM
-- America/New_York timezone ensures correct local execution
CREATE OR REPLACE TASK daily_cleanup_task
WAREHOUSE = 'maintenance_wh' -- Dedicated warehouse for task
SCHEDULE = 'USING CRON 0 3 * * * America/New_York' -- Daily at 3 AM EST
COMMENT = 'Daily cleanup of old data'
AS
DELETE FROM audit_log
WHERE created_at < DATEADD(day, -90, CURRENT_TIMESTAMP());
-- Example 9: Create task with error handling
-- ERROR_INTEGRATION specifies notification service for failures
-- ALLOW_OVERLAPPING_EXECUTION=FALSE prevents concurrent runs
-- BEGIN...EXCEPTION block provides structured error handling
CREATE OR REPLACE TASK etl_task
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
ERROR_INTEGRATION = 's3_error_notifications' -- Notify on failure
ALLOW_OVERLAPPING_EXECUTION = FALSE -- No concurrent executions
USER_TASK_TIMEOUT_MS = 7200000 -- 2-hour timeout
SUSPEND_TASK_AFTER_NUM_FAILURES = 5 -- Auto-suspend after 5 failures
AS
BEGIN
-- Log start
INSERT INTO task_log (task_name, start_time, status)
VALUES ('etl_task', CURRENT_TIMESTAMP(), 'RUNNING');
-- Execute ETL
INSERT INTO target_table
SELECT * FROM source_table
WHERE date = CURRENT_DATE();
-- Log success
INSERT INTO task_log (task_name, start_time, status, end_time)
VALUES ('etl_task', CURRENT_TIMESTAMP(), 'SUCCESS', CURRENT_TIMESTAMP());
EXCEPTION
    WHEN OTHER THEN
        -- Log failure with error details
        -- (Snowflake Scripting variables need a colon prefix inside SQL statements)
        INSERT INTO task_log (task_name, start_time, status, error_message)
        VALUES ('etl_task', CURRENT_TIMESTAMP(), 'FAILED', :SQLERRM);
        RAISE; -- Re-raise to mark task as FAILED
END;
-- Example 10: Interval-based scheduling (alternative to CRON)
-- Runs every 60 minutes using interval syntax ('<n> MINUTE')
CREATE OR REPLACE TASK frequent_sync_task
    WAREHOUSE = 'sync_wh'
    SCHEDULE = '60 MINUTE' -- Every 60 minutes
AS
INSERT INTO staging_data
SELECT * FROM source_table
WHERE updated_at > DATEADD(minute, -60, CURRENT_TIMESTAMP());
Task Graphs: Complex Workflows
-- Example 11: Create task graph (DAG)
-- Root task runs on schedule; child tasks run after parent succeeds
-- Parallel branches execute independently; final_task waits for all
CREATE OR REPLACE TASK root_task
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
INSERT INTO raw_sales SELECT * FROM stage_sales;
CREATE OR REPLACE TASK child_task_1
WAREHOUSE = 'etl_wh'
AFTER root_task -- Runs after root_task completes successfully
AS
INSERT INTO clean_sales SELECT * FROM raw_sales WHERE amount > 0;
CREATE OR REPLACE TASK child_task_2
WAREHOUSE = 'etl_wh'
AFTER root_task -- Parallel branch: also runs after root_task
AS
INSERT INTO raw_inventory SELECT * FROM stage_inventory;
CREATE OR REPLACE TASK final_task
WAREHOUSE = 'etl_wh'
AFTER child_task_1, child_task_2 -- Synchronization point: waits for BOTH
AS
INSERT INTO analytics_sales
SELECT s.*, i.stock
FROM clean_sales s
JOIN raw_inventory i ON s.product_id = i.product_id;
-- Example 12: Resume/suspend task graph
-- Resume every child task before the root task
ALTER TASK final_task RESUME;
ALTER TASK child_task_1 RESUME;
ALTER TASK child_task_2 RESUME;
ALTER TASK root_task RESUME;
-- Suspend the root task first to stop new graph runs, then the children
ALTER TASK root_task SUSPEND;
ALTER TASK child_task_1 SUSPEND;
ALTER TASK child_task_2 SUSPEND;
ALTER TASK final_task SUSPEND;
-- Example 13: Alter task schedule
-- Suspend the task, change the schedule, then resume it
ALTER TASK daily_cleanup_task SUSPEND;
ALTER TASK daily_cleanup_task SET SCHEDULE = 'USING CRON 0 4 * * * America/New_York';
ALTER TASK daily_cleanup_task RESUME;
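Resuming every task in a large graph by hand is error-prone. As an alternative sketch, Snowflake provides the SYSTEM$TASK_DEPENDENTS_ENABLE function, which recursively resumes the tasks that depend on a given root task. The example assumes the root_task graph from Example 11; checking the resulting states with SHOW TASKS afterwards is a precaution rather than a required step.

```sql
-- Recursively resume the tasks in the graph rooted at root_task
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('root_task');

-- Verify the state column (started or suspended) for each task in the graph;
-- resume any task that is still suspended with ALTER TASK ... RESUME
SHOW TASKS LIKE '%task%';
```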
Monitoring and Observability
-- Example 14: Monitor task execution history
-- TASK_HISTORY is a table function that returns one row per task run
SELECT
    name AS task_name,
    query_id,
    state, -- e.g., SCHEDULED, EXECUTING, SUCCEEDED, FAILED
    scheduled_time, -- When task was scheduled to run
    query_start_time AS started_time, -- When task actually started executing
    completed_time, -- When task finished
    DATEDIFF(second, query_start_time, completed_time) AS duration_seconds,
    error_code,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
-- Example 15: Monitor task graph dependencies
-- TASK_DEPENDENTS is a table function that lists a root task and its children;
-- the predecessors column names each task's parent tasks
-- and state shows whether the task is started or suspended
SELECT
    name AS task_name,
    state AS task_state,
    predecessors AS parent_tasks,
    schedule,
    warehouse
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
    TASK_NAME => 'root_task',
    RECURSIVE => TRUE
))
ORDER BY name;
-- Example 16: Stream consumption monitoring
-- List streams and their staleness to catch streams that are not being consumed
-- SHOW STREAMS output is read back through RESULT_SCAN (column names are lowercase)
SHOW STREAMS;
SELECT
    "name" AS stream_name,
    "table_name",
    "type",
    "stale",
    "stale_after",
    "created_on",
    "owner"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY "created_on" DESC;
-- Example 17: Stream consumption pattern with error handling
CREATE OR REPLACE PROCEDURE process_stream_changes()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    change_count INTEGER;
BEGIN
    -- Count available changes (a plain SELECT does not advance the offset)
    SELECT COUNT(*) INTO :change_count FROM sales_stream;
    IF (change_count = 0) THEN
        RETURN 'No changes to process';
    END IF;
    -- One explicit transaction: every statement sees the same stream contents,
    -- and the offset advances only when the transaction commits
    BEGIN TRANSACTION;
    -- New rows: INSERT actions that are not part of an update
    INSERT INTO sales_archive (id, product, amount, updated_at)
    SELECT id, product, amount, updated_at
    FROM sales_stream
    WHERE METADATA$ACTION = 'INSERT'
      AND METADATA$ISUPDATE = FALSE;
    -- Updates: the INSERT half of an update pair carries the new values
    UPDATE sales_archive
    SET
        product = s.product,
        amount = s.amount,
        updated_at = s.updated_at
    FROM sales_stream s
    WHERE sales_archive.id = s.id
      AND s.METADATA$ACTION = 'INSERT'
      AND s.METADATA$ISUPDATE = TRUE;
    -- Deletes: DELETE actions that are not part of an update
    DELETE FROM sales_archive
    USING sales_stream s
    WHERE sales_archive.id = s.id
      AND s.METADATA$ACTION = 'DELETE'
      AND s.METADATA$ISUPDATE = FALSE;
    COMMIT;
    RETURN 'Processed ' || change_count || ' changes';
EXCEPTION
    WHEN OTHER THEN
        -- Undo partial work so the stream offset stays where it was
        ROLLBACK;
        RAISE;
END;
$$;
-- Example 18: Task graph with conditional execution
CREATE OR REPLACE TASK conditional_etl_task
    WAREHOUSE = 'etl_wh'
    SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
BEGIN
    -- Count today's source rows into a scripting variable
    LET row_count INTEGER := 0;
    SELECT COUNT(*) INTO :row_count
    FROM source_table
    WHERE load_date = CURRENT_DATE();
    -- Only process if source data exists
    IF (row_count > 0) THEN
        INSERT INTO target_table
        SELECT * FROM source_table
        WHERE load_date = CURRENT_DATE();
    END IF;
END;
-- Example 19: Multi-cluster warehouse for task execution
-- Ensures sufficient concurrency for parallel task graphs
CREATE OR REPLACE TASK high_volume_etl
WAREHOUSE = 'etl_wh_scaled'
SCHEDULE = 'USING CRON 0 1 * * * America/New_York'
AS
INSERT INTO aggregated_data
SELECT region, SUM(amount)
FROM raw_sales
GROUP BY region;
-- Configure warehouse for auto-scaling
ALTER WAREHOUSE etl_wh_scaled
SET
MIN_CLUSTER_COUNT = 1,
MAX_CLUSTER_COUNT = 5,
SCALING_POLICY = 'ECONOMY';
-- Example 20: Drop stream
-- A stream with unconsumed data can still be dropped; its offset is lost
-- DROP STREAM takes no CASCADE option
DROP STREAM IF EXISTS old_stream;
Performance Metrics
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Stream Consumption Latency | < 5 min | 5-15 min | > 15 min |
| Task Execution Time | < 30 min | 30-60 min | > 60 min |
| Task Failure Rate | < 1% | 1-5% | > 5% |
| Stream Offset Lag | < 100 changes | 100-1000 | > 1000 |
| Task Graph Completion | < 2 hours | 2-4 hours | > 4 hours |
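Two of the metrics in this table, task failure rate and task execution time, can be estimated from the TASK_HISTORY table function. The query below is a sketch under the assumption that a seven-day window and the function's maximum RESULT_LIMIT of 10000 rows are adequate for the account; thresholds from the table would be applied to its output.

```sql
-- Per-task failure rate and longest run over the last 7 days
SELECT
  name AS task_name,
  COUNT(*) AS total_runs,
  COUNT_IF(state = 'FAILED') AS failed_runs,
  ROUND(100 * COUNT_IF(state = 'FAILED') / COUNT(*), 2) AS failure_rate_pct,
  MAX(DATEDIFF(minute, query_start_time, completed_time)) AS max_duration_min
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP()),
  RESULT_LIMIT => 10000
))
WHERE state IN ('SUCCEEDED', 'FAILED')  -- ignore runs that are still scheduled or executing
GROUP BY name
ORDER BY failure_rate_pct DESC;
```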
Best Practices
- Monitor stream offsets: Regularly check stream consumption so that streams do not fall far behind or become stale. Set up alerts for excessive offset lag.
- Use appropriate stream types: Choose STANDARD for complete CDC, APPEND-ONLY for immutable data in standard tables, and INSERT-ONLY for external tables.
- Implement idempotent processing: Design stream consumers to handle duplicate processing safely. Use MERGE operations for upserts.
- Dedicate warehouses for tasks: Create separate warehouses for task execution to isolate scheduled workloads from interactive queries.
- Use task graphs for workflows: Replace complex stored procedures with task graphs for better visibility, error handling, and maintainability.
- Set appropriate schedules: Use CRON expressions for complex schedules. Consider time zones and business hours when scheduling.
- Implement error handling: Configure ERROR_INTEGRATION for failure notifications. Log errors to tables for debugging.
- Monitor task history: Regularly review task execution history to identify patterns, bottlenecks, and failures.
- Use ALLOW_OVERLAPPING_EXECUTION carefully: Set to FALSE for tasks with state dependencies. Set to TRUE only for independent tasks.
- Clean up old streams: Remove streams that are no longer needed to reduce metadata overhead.
Additional Theory: Stream Offset Management
Understanding stream offsets is critical for reliable CDC implementations. A stream offset marks a position in the source table's change history, and it advances only when a DML statement that reads the stream commits. If the consuming transaction fails or rolls back, the offset stays where it was and the same changes are returned by the next query. Because a failed batch is replayed in full, consumers should be idempotent so that reprocessing does not corrupt the target.
Offset lifecycle:
- Changes occur on source table (INSERT/UPDATE/DELETE)
- Stream records changes in append-only log with metadata
- Consumer reads the stream in a DML statement and processes the changes
- When that transaction commits, the stream offset advances to the current table version
- Subsequent queries return only new changes
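The lifecycle above can be observed directly. The following sketch assumes the sales_stream stream from the earlier examples and a hypothetical target table sales_changes_log; it shows that reading the stream leaves the offset alone, while committing a DML statement that reads it moves the offset forward.

```sql
-- 1. A plain SELECT leaves the offset unchanged; repeat it and the rows are still there
SELECT COUNT(*) AS pending_changes FROM sales_stream;

-- 2. Consume the stream inside an explicit transaction
BEGIN TRANSACTION;

INSERT INTO sales_changes_log (id, change_action, is_update, logged_at)  -- hypothetical table
SELECT id, METADATA$ACTION, METADATA$ISUPDATE, CURRENT_TIMESTAMP()
FROM sales_stream;

-- The offset advances at COMMIT; a ROLLBACK here would leave it unchanged
COMMIT;

-- 3. With no new changes on the source table, the stream now reports no data
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream') AS has_data;
```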
Key consideration: Stream offsets are stored as metadata. If a stream is dropped, all unconsumed changes are lost. Always ensure consumers have processed changes before dropping streams.
Additional Theory: Task Graph Error Propagation
Task graphs implement a fault-tolerant execution model:
- Success: Child tasks execute normally
- Failure: All downstream children are automatically BLOCKED
- Recovery: Resuming a failed task re-evaluates child dependencies
- Concurrency: Independent branches execute in parallel on separate warehouse resources
This model prevents cascading failures and ensures data consistency across complex ETL workflows.
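For the recovery step, one option is to re-run a failed graph from the point of failure instead of from the root. The sketch below assumes the root_task graph from Example 11 and that the cause of the failure has already been fixed.

```sql
-- Inspect the most recent failed runs to find the failing task and its error
SELECT name, state, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  ERROR_ONLY => TRUE,
  RESULT_LIMIT => 10
))
ORDER BY scheduled_time DESC;

-- Retry the last failed run of the graph, starting at the task that failed;
-- this is issued against the root task of the graph
EXECUTE TASK root_task RETRY LAST;
```

Tasks that already succeeded in the failed run are not executed again, which matters when earlier steps are expensive or not idempotent.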
See Also
- 04-Time-Travel-and-Cloning - Time travel concepts referenced by streams
- 03-Warehouses-and-Auto-Suspend - Warehouse configuration for task execution
- 14-Monitoring-Queries - Monitoring task execution and stream consumption
- PySpark Iceberg - Iceberg CDC patterns
- Delta Lake on Databricks - Delta Lake Change Data Feed
- Data Warehouse Concepts - Data warehouse design principles