Snowflake Advanced Task Patterns

Advanced task patterns in Snowflake enable complex workflow orchestration with dependencies, error handling, conditional logic, and dynamic SQL execution.

[Figure: Advanced Task DAG: Error Handling Flow. Entry Task (CRON schedule), IF condition (Check Data), TRUE branch to Task A (Process Data), FALSE branch to Task B (Handle Error), Task C (Validate), then D1 (Notify) and D2 (Log). Caption: TRY/CATCH pattern with conditional branching and parallel notifications.]

What are Advanced Tasks?

  • Build complex DAGs with AFTER dependencies
  • Use EXCEPTION blocks (the Snowflake Scripting equivalent of TRY/CATCH) for resilient error handling
  • Execute dynamic SQL with EXECUTE IMMEDIATE

Architecture Overview

The advanced task workflow architecture follows this flow:

  1. Entry Task (CRON: 0 6 * * *) → IF condition
  2. Task A: Process Data (when condition = TRUE) or Task B: Handle Error (when condition = FALSE)
  3. Join → Task C: Validate
  4. Fork → Task D1: Notify and Task D2: Log
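
The flow above could be laid out as a task graph roughly as follows. This is a minimal sketch, not the lesson's own implementation: every task name, the 'HAS_DATA' / 'NO_DATA' marker strings, and the UTC time zone are assumptions made for illustration. Both branch tasks always run, and each checks the value that the entry task passed along before doing any work, which keeps the join at Task C simple.

```sql
-- Entry task: runs daily at 06:00 (UTC assumed) and records which branch should do work.
CREATE OR REPLACE TASK entry_task
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON 0 6 * * * UTC'
AS
BEGIN
  LET record_count INTEGER := (SELECT COUNT(*) FROM staging_table);
  IF (record_count > 0) THEN
    CALL SYSTEM$SET_RETURN_VALUE('HAS_DATA');   -- hypothetical marker value
  ELSE
    CALL SYSTEM$SET_RETURN_VALUE('NO_DATA');    -- hypothetical marker value
  END IF;
END;

-- Task A: does its work only on the TRUE branch.
CREATE OR REPLACE TASK task_a_process
  WAREHOUSE = compute_wh
  AFTER entry_task
AS
BEGIN
  LET branch VARCHAR := SYSTEM$GET_PREDECESSOR_RETURN_VALUE();
  IF (branch = 'HAS_DATA') THEN
    INSERT INTO target_table SELECT * FROM staging_table;
  END IF;
END;

-- Task B: does its work only on the FALSE branch.
CREATE OR REPLACE TASK task_b_handle
  WAREHOUSE = compute_wh
  AFTER entry_task
AS
BEGIN
  LET branch VARCHAR := SYSTEM$GET_PREDECESSOR_RETURN_VALUE();
  IF (branch = 'NO_DATA') THEN
    INSERT INTO task_log (task_name, status, message, timestamp)
    VALUES ('task_b_handle', 'SKIPPED', 'No records to process', CURRENT_TIMESTAMP());
  END IF;
END;

-- Task C: join point, runs after both branches.
CREATE OR REPLACE TASK task_c_validate
  WAREHOUSE = compute_wh
  AFTER task_a_process, task_b_handle
AS
  INSERT INTO task_log (task_name, status, message, timestamp)
  VALUES ('task_c_validate', 'DONE', 'Validation step reached', CURRENT_TIMESTAMP());

-- D1 and D2: fan out from Task C and run in parallel.
CREATE OR REPLACE TASK task_d1_notify
  WAREHOUSE = compute_wh
  AFTER task_c_validate
AS
  CALL SYSTEM$SEND_EMAIL('my_integration', 'admin@company.com',
                         'Pipeline finished', 'The daily task graph completed.');

CREATE OR REPLACE TASK task_d2_log
  WAREHOUSE = compute_wh
  AFTER task_c_validate
AS
  INSERT INTO task_log (task_name, status, message, timestamp)
  VALUES ('task_d2_log', 'SUCCESS', 'Run logged', CURRENT_TIMESTAMP());
```

Newly created tasks are suspended, so the graph would still need to be resumed before the schedule takes effect.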

Task History & Monitoring

  • Execution history and error tracking
  • Performance metrics (duration, credits)
  • Dependency visualization (DAG)

Scheduling Patterns

  • CRON expressions
  • Event-driven triggers
  • Dependency chains
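
As an illustration of an event-driven trigger, a task can be paired with a stream so that it only does work when new rows have arrived. The sketch below assumes hypothetical objects `raw_orders`, `orders_clean`, and `orders_stream`, with `order_id` and `amount` columns.

```sql
-- Track changes on the source table (table and stream names are hypothetical).
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders;

-- The task is checked every 5 minutes, but its body runs only when the stream has data.
CREATE OR REPLACE TASK process_new_orders
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
  INSERT INTO orders_clean (order_id, amount)
  SELECT order_id, amount
  FROM orders_stream
  WHERE METADATA$ACTION = 'INSERT';
```

Reading the stream inside a DML statement advances its offset, so each batch of changes is consumed once.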

Error Handling

  • EXCEPTION blocks (TRY/CATCH style)
  • Automatic retries
  • Alert on failure
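
Retries and failure alerts can be configured on the task itself rather than in its body. A minimal sketch, assuming a hypothetical root task `nightly_load_task` and an existing notification integration named `my_error_integration`; the numeric values are arbitrary examples.

```sql
-- Suspend the root task before changing its settings.
ALTER TASK nightly_load_task SUSPEND;

-- Retry a failed graph run up to 2 times.
ALTER TASK nightly_load_task SET TASK_AUTO_RETRY_ATTEMPTS = 2;

-- Suspend the task automatically after 5 consecutive failed runs.
ALTER TASK nightly_load_task SET SUSPEND_TASK_AFTER_NUM_FAILURES = 5;

-- Send failure notifications through a notification integration (assumed to exist).
ALTER TASK nightly_load_task SET ERROR_INTEGRATION = my_error_integration;

ALTER TASK nightly_load_task RESUME;
```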

Dynamic Operations

  • Dynamic SQL execution
  • Parameterized queries
  • Conditional logic
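
For parameterized queries, EXECUTE IMMEDIATE accepts bind variables through a USING clause, which avoids concatenating values into the statement text. The sketch below is an anonymous block with a hypothetical `orders` table and threshold value.

```sql
EXECUTE IMMEDIATE $$
DECLARE
  min_amount NUMBER DEFAULT 100;   -- hypothetical threshold
BEGIN
  -- The ? placeholder is filled from the USING list at run time.
  EXECUTE IMMEDIATE 'DELETE FROM orders WHERE amount < ?' USING (min_amount);
END;
$$;
```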

Task Dependencies

DAG Workflow

-- Create task DAG: the root task needs a schedule, child tasks use AFTER
CREATE OR REPLACE TASK task_1 WAREHOUSE = wh SCHEDULE = '60 MINUTE' AS INSERT INTO t1 SELECT 1;
CREATE OR REPLACE TASK task_2 WAREHOUSE = wh AFTER task_1 AS INSERT INTO t2 SELECT 2;
CREATE OR REPLACE TASK task_3 WAREHOUSE = wh AFTER task_1 AS INSERT INTO t3 SELECT 3;
-- Multiple predecessors are listed with commas, not AND
CREATE OR REPLACE TASK task_4 WAREHOUSE = wh AFTER task_2, task_3 AS INSERT INTO t4 SELECT 4;

-- Resume child tasks first, then the root task last
ALTER TASK task_4 RESUME;
ALTER TASK task_3 RESUME;
ALTER TASK task_2 RESUME;
ALTER TASK task_1 RESUME;
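
As an alternative to resuming each task by hand, the whole graph can be enabled from its root, and a run can be started manually for testing. A short sketch using the `task_1` root from the example above:

```sql
-- Resume task_1 and every task that depends on it, in one call.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_1');

-- Trigger one run of the graph immediately instead of waiting for the schedule.
EXECUTE TASK task_1;
```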

Conditional Branching

CREATE OR REPLACE TASK conditional_task
  WAREHOUSE = compute_wh
  SCHEDULE = '60 MINUTE'
AS
BEGIN
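  -- Count the staged rows; the result decides which branch runs
  LET record_count INTEGER := (SELECT COUNT(*) FROM staging_table);

  -- No colon prefix is needed in a scripting expression, only inside SQL statements
  IF (record_count > 0) THEN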
    -- Process data
    INSERT INTO target_table
    SELECT * FROM staging_table;
    
    -- Send success notification
    CALL SYSTEM$SEND_EMAIL(
      'my_integration',
      'admin@company.com',
      'Data Loaded',
      'Successfully loaded ' || :record_count || ' records'
    );
  ELSE
    -- Log no data
    INSERT INTO task_log (task_name, status, message, timestamp)
    VALUES ('conditional_task', 'SKIPPED', 'No records to process', CURRENT_TIMESTAMP());
  END IF;
END;

Dynamic SQL Tasks

CREATE OR REPLACE TASK dynamic_etl_task
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
BEGIN
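  -- Collect the names of the active tables into an ARRAY
  LET tables ARRAY := (SELECT ARRAY_AGG(table_name) FROM metadata_table WHERE active = TRUE);

  FOR i IN 0 TO ARRAY_SIZE(tables) - 1 DO
    -- Array elements are VARIANT values, so cast to VARCHAR before concatenating
    LET table_name VARCHAR := GET(tables, i)::VARCHAR;

    -- Build the statement text; names come from metadata_table, so keep that table trusted
    LET stmt VARCHAR := 'INSERT INTO target.' || table_name ||
                        ' SELECT * FROM source.' || table_name ||
                        ' WHERE processed = FALSE';

    EXECUTE IMMEDIATE :stmt;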
    
    -- Log execution
    INSERT INTO task_log (task_name, table_name, status, timestamp)
    VALUES ('dynamic_etl', :table_name, 'SUCCESS', CURRENT_TIMESTAMP());
  END FOR;
END;
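
When only the object name varies, IDENTIFIER() with a bind variable is an alternative to building the statement as a string. A minimal sketch as an anonymous block; the two qualified table names are hypothetical values standing in for entries read from `metadata_table`.

```sql
EXECUTE IMMEDIATE $$
DECLARE
  target_name VARCHAR DEFAULT 'target.orders';   -- hypothetical
  source_name VARCHAR DEFAULT 'source.orders';   -- hypothetical
BEGIN
  -- IDENTIFIER() resolves the bound string as an object name, not as SQL text.
  INSERT INTO IDENTIFIER(:target_name)
  SELECT * FROM IDENTIFIER(:source_name)
  WHERE processed = FALSE;
END;
$$;
```

Because the variable is treated strictly as a name, a malformed value fails to resolve instead of being executed as part of the statement.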

Error Handling Patterns

CREATE OR REPLACE TASK resilient_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
AS
BEGIN
  -- Try block
  BEGIN
    INSERT INTO target_table
    SELECT * FROM source_table WHERE processed = FALSE;
    
    -- Update source as processed
    UPDATE source_table SET processed = TRUE WHERE processed = FALSE;
  EXCEPTION
    WHEN OTHER THEN
      -- Log error (SQLCODE and SQLERRM need the colon prefix inside SQL statements)
      INSERT INTO error_log (task_name, error_code, error_message, timestamp)
      VALUES ('resilient_task', :SQLCODE, :SQLERRM, CURRENT_TIMESTAMP());

      -- Send alert; the error is handled here, so the task run itself still completes
      CALL SYSTEM$SEND_EMAIL(
        'alert_integration',
        'ops@company.com',
        'Task Failed: resilient_task',
        'Error: ' || :SQLERRM
      );
  END;
END;
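
One variation on this pattern wraps the INSERT and UPDATE in an explicit transaction, so a failure in the second statement does not leave rows inserted but still unmarked, and re-raises the error so the run is recorded as failed. This is a sketch under assumptions: the task name `atomic_load_task` is hypothetical, and it reuses the tables from the example above.

```sql
CREATE OR REPLACE TASK atomic_load_task
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
AS
BEGIN
  BEGIN TRANSACTION;

  INSERT INTO target_table
  SELECT * FROM source_table WHERE processed = FALSE;

  UPDATE source_table SET processed = TRUE WHERE processed = FALSE;

  COMMIT;
EXCEPTION
  WHEN OTHER THEN
    -- Capture the error details before running any further statements.
    LET err_code NUMBER := SQLCODE;
    LET err_msg VARCHAR := SQLERRM;

    -- Undo the partial load, then log the error outside the failed transaction.
    ROLLBACK;
    INSERT INTO error_log (task_name, error_code, error_message, timestamp)
    VALUES ('atomic_load_task', :err_code, :err_msg, CURRENT_TIMESTAMP());

    -- Re-raise so the failure shows up in task history and stops downstream tasks.
    RAISE;
END;
```

Whether to swallow or re-raise the exception is a design choice: swallowing keeps the graph moving, while re-raising makes failures visible to monitoring and retry settings.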

Task Monitoring

-- Task execution history for the last 7 days
SELECT
  name,
  state,
  scheduled_time,
  query_start_time,
  completed_time,
  error_code,
  error_message,
  -- Run time only; measuring from scheduled_time would also include queue time
  DATEDIFF(second, query_start_time, completed_time) AS duration_seconds
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP()),
  TASK_NAME => 'resilient_task'
))
ORDER BY scheduled_time DESC;

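-- Task dependency analysis: list a root task and all tasks that depend on it
-- (SHOW TASKS additionally reports allow_overlapping_execution and error_integration)
SELECT
  t.name AS task_name,
  t.state,
  t.schedule,
  t.predecessors
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
  TASK_NAME => 'task_1',
  RECURSIVE => TRUE
)) t
ORDER BY t.name;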
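
For error tracking specifically, the same history function can be restricted to failed runs. A small sketch; the 24-hour window and the row limit are arbitrary choices.

```sql
-- Failed task runs across the current database in the last 24 hours
SELECT
  name,
  state,
  scheduled_time,
  error_code,
  error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD(hour, -24, CURRENT_TIMESTAMP()),
  ERROR_ONLY => TRUE,
  RESULT_LIMIT => 100
))
ORDER BY scheduled_time DESC;
```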

Use task dependencies to build complex workflows. Tasks that share a predecessor run in parallel, and a child task starts only after its predecessors finish successfully, so a failure stops the downstream branch. Query task history regularly to spot slow runs and track errors.

Advanced Patterns

Pattern      Implementation               Use Case
DAG          Multiple AFTER dependencies  Complex ETL
Fan-out      Multiple tasks after one     Parallel processing
Fan-in       Multiple tasks before one    Aggregation
Conditional  IF/ELSE in stored proc       Decision trees
Dynamic      EXECUTE IMMEDIATE            Schema-aware

  • Task dependencies enable complex workflow orchestration
  • Conditional branching allows decision-based execution
  • Dynamic SQL supports schema-aware processing
  • EXCEPTION blocks (TRY/CATCH style) provide resilient error handling
  • Task history enables monitoring and debugging