Snowflake Advanced Task Patterns
Advanced task patterns in Snowflake enable complex workflow orchestration with dependencies, error handling, conditional logic, and dynamic SQL execution.
What are Advanced Tasks?
- Build complex DAGs with AFTER dependencies
- Use EXCEPTION blocks (Snowflake Scripting's equivalent of TRY/CATCH) for resilient error handling
- Execute dynamic SQL with EXECUTE IMMEDIATE
Architecture Overview
The advanced task workflow architecture follows this flow:
- Entry Task (CRON: 0 6 * * *) → IF condition
- Task A: Process Data (when condition = TRUE) or Task B: Handle Error (when condition = FALSE)
- Join β Task C: Validate
- Fork β Task D1: Notify and Task D2: Notify
Task History & Monitoring
- Execution history and error tracking
- Performance metrics (duration, credits)
- Dependency visualization (DAG)
Scheduling Patterns
- CRON expressions
- Event-driven triggers
- Dependency chains
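As a rough illustration of an event-style trigger, a scheduled task can carry a WHEN condition that checks a stream, so a run is skipped when there is nothing new to process. This is a minimal sketch under assumptions: the stream `staging_stream`, the columns `id` and `payload`, and the task name `stream_driven_task` are hypothetical and do not come from the examples in this article.

```sql
-- Hypothetical stream that tracks changes on staging_table.
CREATE OR REPLACE STREAM staging_stream ON TABLE staging_table;

-- The task is evaluated every 5 minutes but only runs its body
-- when the stream reports unconsumed change records.
CREATE OR REPLACE TASK stream_driven_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('staging_stream')
AS
  -- Hypothetical columns; a stream also exposes METADATA$ columns,
  -- so the column list is spelled out instead of using SELECT *.
  INSERT INTO target_table (id, payload)
  SELECT id, payload
  FROM staging_stream
  WHERE METADATA$ACTION = 'INSERT';
```

Reading the stream inside a DML statement advances its offset, so each change record should be picked up by one run only.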
Error Handling
- EXCEPTION blocks (Snowflake Scripting's equivalent of TRY/CATCH)
- Automatic retries
- Alert on failure
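Retries and failure alerts are configured as task parameters rather than written into the task body. The sketch below assumes a hypothetical root task `nightly_root_task` and a hypothetical notification integration `my_error_integration`; the numeric values are placeholders, not recommendations.

```sql
-- A root task has to be suspended before its settings can be changed.
ALTER TASK nightly_root_task SUSPEND;

ALTER TASK nightly_root_task SET
  TASK_AUTO_RETRY_ATTEMPTS = 2           -- retry a failed graph run up to 2 times
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3    -- suspend after 3 consecutive failed runs
  ERROR_INTEGRATION = my_error_integration;  -- hypothetical notification integration

ALTER TASK nightly_root_task RESUME;
```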
Dynamic Operations
- Dynamic SQL execution
- Parameterized queries
- Conditional logic
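For parameterized queries, one option is to keep values out of the SQL string and bind them with the USING clause of EXECUTE IMMEDIATE. The following is a sketch only: the 30-day retention window and the purge statement are hypothetical, and `task_log` is assumed to have the `timestamp` column used in the logging examples in this article.

```sql
DECLARE
  -- Hypothetical cutoff: rows older than 30 days are purged.
  cutoff TIMESTAMP_LTZ DEFAULT DATEADD(day, -30, CURRENT_TIMESTAMP());
  -- The ? placeholder is filled from the USING list at execution time.
  stmt VARCHAR DEFAULT 'DELETE FROM task_log WHERE timestamp < ?';
BEGIN
  EXECUTE IMMEDIATE :stmt USING (cutoff);
END;
```

Bind variables cover values only. Object names such as table names still have to be concatenated into the statement, so they should come from a trusted source.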
Task Dependencies
DAG Workflow
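-- Create task DAG. A root task needs a schedule (or a manual EXECUTE TASK) to start a run;
-- the 06:00 UTC schedule is an assumption based on the architecture overview above.
CREATE OR REPLACE TASK task_1 WAREHOUSE = wh SCHEDULE = 'USING CRON 0 6 * * * UTC' AS INSERT INTO t1 SELECT 1;
CREATE OR REPLACE TASK task_2 WAREHOUSE = wh AFTER task_1 AS INSERT INTO t2 SELECT 2;
CREATE OR REPLACE TASK task_3 WAREHOUSE = wh AFTER task_1 AS INSERT INTO t3 SELECT 3;
-- Multiple predecessors are comma-separated; task_4 runs after both have completed
CREATE OR REPLACE TASK task_4 WAREHOUSE = wh AFTER task_2, task_3 AS INSERT INTO t4 SELECT 4;
-- Resume the child tasks first, then the root task
ALTER TASK task_4 RESUME;
ALTER TASK task_3 RESUME;
ALTER TASK task_2 RESUME;
ALTER TASK task_1 RESUME;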
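Resuming every task by hand gets verbose as a DAG grows. As an alternative sketch, SYSTEM$TASK_DEPENDENTS_ENABLE resumes a root task together with all of its dependents, and EXECUTE TASK triggers a one-off run without waiting for the schedule. `task_1` is the root task from the example above; in practice the name may need to be fully qualified with database and schema.

```sql
-- Resume task_1 and every task that depends on it, recursively.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_1');

-- Start a single run of the whole graph now (useful for testing).
EXECUTE TASK task_1;
```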
Conditional Branching
CREATE OR REPLACE TASK conditional_task
  WAREHOUSE = compute_wh
  SCHEDULE = '60 MINUTE'
AS
BEGIN
  -- Check condition
  LET record_count INTEGER := (SELECT COUNT(*) FROM staging_table);
  -- Variables take no colon prefix in scripting expressions, only inside SQL statements
  IF (record_count > 0) THEN
    -- Process data
    INSERT INTO target_table
    SELECT * FROM staging_table;
    -- Send success notification
    CALL SYSTEM$SEND_EMAIL(
      'my_integration',
      'admin@company.com',
      'Data Loaded',
      'Successfully loaded ' || :record_count || ' records'
    );
  ELSE
    -- Log no data
    INSERT INTO task_log (task_name, status, message, timestamp)
    VALUES ('conditional_task', 'SKIPPED', 'No records to process', CURRENT_TIMESTAMP());
  END IF;
END;
Dynamic SQL Tasks
CREATE OR REPLACE TASK dynamic_etl_task
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
BEGIN
  -- Get list of tables to process
  LET tables ARRAY := (SELECT ARRAY_AGG(table_name) FROM metadata_table WHERE active = TRUE);
  FOR i IN 0 TO ARRAY_SIZE(tables) - 1 DO
    -- Array elements are VARIANT values; cast to a string before concatenating
    LET tbl_name STRING := GET(tables, i)::STRING;
    -- Dynamic SQL (table names come from metadata_table, so that table must be trusted)
    LET sql_stmt STRING := 'INSERT INTO target.' || tbl_name ||
                           ' SELECT * FROM source.' || tbl_name ||
                           ' WHERE processed = FALSE';
    EXECUTE IMMEDIATE :sql_stmt;
    -- Log execution
    INSERT INTO task_log (task_name, table_name, status, timestamp)
    VALUES ('dynamic_etl', :tbl_name, 'SUCCESS', CURRENT_TIMESTAMP());
  END FOR;
END;
Error Handling Patterns
CREATE OR REPLACE TASK resilient_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
AS
BEGIN
  -- Protected block: the statements that may fail
  BEGIN
    INSERT INTO target_table
    SELECT * FROM source_table WHERE processed = FALSE;
    -- Update source as processed
    UPDATE source_table SET processed = TRUE WHERE processed = FALSE;
  EXCEPTION
    WHEN OTHER THEN
      -- Log error (SQLCODE and SQLERRM take a colon prefix inside SQL statements)
      INSERT INTO error_log (task_name, error_code, error_message, timestamp)
      VALUES ('resilient_task', :SQLCODE, :SQLERRM, CURRENT_TIMESTAMP());
      -- Send alert
      CALL SYSTEM$SEND_EMAIL(
        'alert_integration',
        'ops@company.com',
        'Task Failed: resilient_task',
        'Error: ' || :SQLERRM
      );
  END;
END;
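One caveat with a handler like the one above: once the exception is handled, the block ends normally, so the run is expected to show up as successful in task history. If the failure should still be visible to monitoring, retries, and downstream tasks, the handler can log the error and then re-raise it. The sketch below is a variant under that assumption; the task name `strict_task` is hypothetical.

```sql
CREATE OR REPLACE TASK strict_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
AS
BEGIN
  INSERT INTO target_table
  SELECT * FROM source_table WHERE processed = FALSE;
EXCEPTION
  WHEN OTHER THEN
    -- Record the error first...
    INSERT INTO error_log (task_name, error_code, error_message, timestamp)
    VALUES ('strict_task', :SQLCODE, :SQLERRM, CURRENT_TIMESTAMP());
    -- ...then re-raise it so the task run itself is marked as failed.
    RAISE;
END;
```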
Task Monitoring
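-- Task execution history (INFORMATION_SCHEMA.TASK_HISTORY covers the last 7 days)
SELECT
  name,
  state,
  scheduled_time,
  query_start_time,
  completed_time,
  error_code,
  error_message,
  -- Measure from the actual start so queue time is not counted as run time
  DATEDIFF(second, query_start_time, completed_time) AS duration_seconds
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP()),
  TASK_NAME => 'resilient_task'
))
ORDER BY scheduled_time DESC;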
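-- Task dependency analysis
-- Task metadata comes from SHOW TASKS; RESULT_SCAN makes its output queryable.
-- SHOW output columns are lowercase, so they must be double-quoted.
SHOW TASKS;
SELECT
  "name" AS task_name,
  "state",
  "schedule",
  "predecessors",
  "allow_overlapping_execution",
  "error_integration"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
ORDER BY "name";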
Use task dependencies to build complex workflows. Within a DAG, Snowflake runs tasks that share a predecessor in parallel, and when a task fails, the tasks downstream of it do not run for that execution. Monitor task history to track errors and to find slow steps worth optimizing.
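To inspect the dependency structure of a single DAG rather than every task in the schema, the TASK_DEPENDENTS table function can be queried from the root task. This is a sketch that assumes `task_1` from the DAG example is the root and that the session's current database and schema contain it.

```sql
-- List task_1 and all tasks downstream of it, with their predecessors.
SELECT
  name,
  state,
  schedule,
  predecessors
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
  TASK_NAME => 'task_1',
  RECURSIVE => TRUE
))
ORDER BY name;
```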
Advanced Patterns
| Pattern | Implementation | Use Case |
|---|---|---|
| DAG | Multiple AFTER dependencies | Complex ETL |
| Fan-out | Multiple tasks after one | Parallel processing |
| Fan-in | Multiple tasks before one | Aggregation |
| Conditional | IF/ELSE in a Snowflake Scripting block | Decision trees |
| Dynamic | EXECUTE IMMEDIATE | Schema-aware |
- Task dependencies enable complex workflow orchestration
- Conditional branching allows decision-based execution
- Dynamic SQL supports schema-aware processing
- EXCEPTION blocks provide resilient, TRY/CATCH-style error handling
- Task history enables monitoring and debugging