Snowflake ETL Pipeline Patterns

Effective ETL pipelines in Snowflake combine data ingestion, transformation, and loading with built-in reliability, scalability, and performance.

ETL Pipeline Architecture: Extract, Transform, Load

Data Sources (DB, Files, APIs) -> Staging Area (Raw Data Store) -> Transform (Business Logic) -> Load Process (MERGE/INSERT) -> Target DW (Analytics)

Orchestration is handled by Tasks and change data capture (CDC) by Streams. Pipeline = Extract + Transform + Load + Orchestration.

ELT vs ETL Comparison

ELT (Extract, Load, Transform): load raw data first, then transform in-database. Uses Snowflake compute for transformations. Best for cloud data warehouses; this is the Snowflake-recommended pattern.

ETL (Extract, Transform, Load): transform before loading into the target. Uses external compute for transformations. Best for legacy systems, or when pre-load validation is needed.

ETL Pipeline Components

1. Extraction Patterns

-- Create external stage for file ingestion
CREATE STAGE my_s3_stage
  URL = 's3://my-bucket/data/'
  STORAGE_INTEGRATION = s3_integration;

-- List files in stage
LIST @my_s3_stage;

-- Copy from external stage
COPY INTO raw_data
  FROM @my_s3_stage
  FILE_FORMAT = (TYPE = PARQUET)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Create pipe for continuous ingestion
CREATE PIPE raw_data_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO raw_data
  FROM @my_s3_stage
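  FILE_FORMAT = (TYPE = PARQUET)
  -- Same column matching as the manual COPY above, so Parquet fields map to table columns
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;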
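
A pipe created with AUTO_INGEST = TRUE only loads files once the bucket's event notifications reach it (the queue to target is listed in the notification_channel column of SHOW PIPES). The following is a minimal sketch of how the pipe above could be checked after creation; the one-hour window and the upper-case table name 'RAW_DATA' are illustrative assumptions.

```sql
-- Show the pipe's execution state and pending file count (returned as JSON)
SELECT SYSTEM$PIPE_STATUS('raw_data_pipe');

-- Queue files that were staged before the pipe existed
-- (REFRESH only considers files staged within the last 7 days)
ALTER PIPE raw_data_pipe REFRESH;

-- Review recent load results for the target table
SELECT
  file_name,
  last_load_time,
  row_count,
  status,
  first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'RAW_DATA',
  START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
ORDER BY last_load_time DESC;
```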

2. Transformation Patterns

-- Create transformation procedure
CREATE OR REPLACE PROCEDURE transform_customer_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
  -- Full refresh: INSERT OVERWRITE truncates and reloads in one atomic statement,
  -- so a failed load cannot leave the target empty (unlike TRUNCATE followed by INSERT)
  INSERT OVERWRITE INTO silver.customers (
    customer_id,
    customer_name,
    email,
    created_date,
    updated_date
  )
  SELECT
    RAW:customer_id::INTEGER,
    INITCAP(RAW:first_name::STRING || ' ' || RAW:last_name::STRING),
    LOWER(RAW:email::STRING),
    RAW:created_date::DATE,
    CURRENT_TIMESTAMP()
  FROM bronze.raw_customers
  -- Cast before the NULL check: a JSON null inside a VARIANT is not a SQL NULL
  WHERE RAW:customer_id::INTEGER IS NOT NULL;

  RETURN 'SUCCESS: Customer data transformed';
END;
$$;

3. Loading Patterns

-- MERGE pattern for upserts
MERGE INTO silver.orders AS target
USING (
  SELECT
    order_id,
    customer_id,
    order_date,
    amount,
    status
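  FROM bronze.raw_orders
  -- Keep one row per order_id; duplicate source keys make MERGE fail as nondeterministic
  QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_date DESC) = 1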
) AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.order_date > target.order_date THEN
  UPDATE SET
    customer_id = source.customer_id,
    order_date = source.order_date,
    amount = source.amount,
    status = source.status,
    updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, order_date, amount, status, created_at)
  VALUES (source.order_id, source.customer_id, source.order_date, source.amount, source.status, CURRENT_TIMESTAMP());

-- INSERT OVERWRITE pattern
INSERT OVERWRITE INTO silver.daily_summary
SELECT
  order_date,
  COUNT(*) as order_count,
  SUM(amount) as total_revenue
FROM bronze.raw_orders
GROUP BY order_date;

Pipeline Orchestration

Using Tasks

-- Create task hierarchy
CREATE OR REPLACE TASK extract_task
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
  CALL extract_source_data();

CREATE OR REPLACE TASK transform_task
  WAREHOUSE = 'COMPUTE_WH'
  AFTER extract_task
AS
  CALL transform_customer_data();

CREATE OR REPLACE TASK load_task
  WAREHOUSE = 'COMPUTE_WH'
  AFTER transform_task
AS
  CALL load_to_target();

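-- Tasks are created suspended: resume the root task and all of its dependents
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('extract_task');

-- Trigger an immediate run of the pipeline instead of waiting for the schedule
EXECUTE TASK extract_task;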

-- Monitor task execution
SELECT
  name AS task_name,  -- TASK_HISTORY exposes the task name in the NAME column
  query_id,
  state,
  scheduled_time,
  completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
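
Once the task graph defined above is running, changing any task in it generally requires suspending the root task first. The sketch below shows one way that maintenance cycle might look; TRANSFORM_WH is a hypothetical warehouse name used only for illustration.

```sql
-- Suspend the root task before changing any task in the graph
ALTER TASK extract_task SUSPEND;

-- Example change: run the transform step on a different warehouse
-- (TRANSFORM_WH is a hypothetical name, not part of this lesson)
ALTER TASK transform_task SET WAREHOUSE = TRANSFORM_WH;

-- Resume the root task and all of its dependents in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('extract_task');
```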

Using Streams and Tasks

-- Create stream for change data capture
CREATE STREAM customer_changes
  ON TABLE bronze.raw_customers
  APPEND_ONLY = FALSE
  SHOW_INITIAL_ROWS = TRUE;

-- Create task to process changes
CREATE OR REPLACE TASK process_customer_changes
  WAREHOUSE = 'COMPUTE_WH'
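  SCHEDULE = 'USING CRON * * * * * America/New_York'
  -- Skip the run, and the warehouse resume, when the stream has no new changes
  WHEN SYSTEM$STREAM_HAS_DATA('customer_changes')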
AS
  -- Column names match silver.customers as populated by transform_customer_data()
  MERGE INTO silver.customers AS target
  USING (
    SELECT
      RAW:customer_id::INTEGER AS customer_id,
      INITCAP(RAW:first_name::STRING || ' ' || RAW:last_name::STRING) AS customer_name,
      LOWER(RAW:email::STRING) AS email,
      RAW:created_date::DATE AS created_date,
      METADATA$ISUPDATE AS is_update
    FROM customer_changes
    -- An update appears in the stream as a DELETE row plus an INSERT row;
    -- keep only the INSERT side (new rows and the new image of updated rows)
    WHERE METADATA$ACTION = 'INSERT'
  ) AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED AND source.is_update THEN
    UPDATE SET
      customer_name = source.customer_name,
      email = source.email,
      updated_date = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED THEN
    INSERT (customer_id, customer_name, email, created_date, updated_date)
    VALUES (source.customer_id, source.customer_name, source.email, source.created_date, CURRENT_TIMESTAMP());
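
Before resuming the task, it can help to look at what the stream currently holds. This is a small sketch using the customer_changes stream created above; the LIMIT of 10 is arbitrary. A plain SELECT does not consume the stream: the offset only advances when the stream is read by a DML statement that commits, such as the MERGE in the task.

```sql
-- Returns TRUE when the stream holds unconsumed change records
SELECT SYSTEM$STREAM_HAS_DATA('customer_changes');

-- Peek at pending changes without advancing the stream offset
SELECT
  RAW:customer_id::INTEGER AS customer_id,
  METADATA$ACTION,
  METADATA$ISUPDATE,
  METADATA$ROW_ID
FROM customer_changes
LIMIT 10;
```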

Error Handling

-- Create error handling procedure
CREATE OR REPLACE PROCEDURE execute_with_error_handling(
  procedure_name STRING
)
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
  call_stmt STRING;
  error_message STRING;
BEGIN
  -- Build the CALL statement first; procedure_name must come from a trusted source
  call_stmt := 'CALL ' || procedure_name || '()';

  BEGIN TRANSACTION;
    EXECUTE IMMEDIATE :call_stmt;
  COMMIT;

  RETURN 'SUCCESS: ' || procedure_name;
EXCEPTION
  WHEN OTHER THEN
    ROLLBACK;
    error_message := SQLERRM;
    -- Variables need a colon prefix when used inside a SQL statement
    INSERT INTO pipeline_errors (
      procedure_name,
      error_message,
      error_timestamp
    ) VALUES (
      :procedure_name,
      :error_message,
      CURRENT_TIMESTAMP()
    );
    RETURN 'ERROR: ' || error_message;
END;
$$;

-- Monitor pipeline errors
SELECT
  procedure_name,
  error_message,
  error_timestamp
FROM pipeline_errors
WHERE error_timestamp >= DATEADD('day', -1, CURRENT_TIMESTAMP())
ORDER BY error_timestamp DESC;

Always implement error handling and logging in ETL pipelines. Use transactions to ensure atomicity and rollback capabilities for data integrity.
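
The error handling procedure above writes to a pipeline_errors table that this lesson never defines. A minimal sketch of what that table might look like follows; the column names are taken from the INSERT in the procedure, while the data types are assumptions.

```sql
-- Hypothetical log table; column names follow the INSERT in the procedure above,
-- data types are assumed
CREATE TABLE IF NOT EXISTS pipeline_errors (
  procedure_name  STRING,
  error_message   STRING,
  error_timestamp TIMESTAMP_LTZ
);

-- Run one pipeline step through the wrapper; the return value starts with
-- 'SUCCESS: ' or 'ERROR: '
CALL execute_with_error_handling('transform_customer_data');
```

Because the handler rolls back before it logs, the error row is written outside the failed transaction and is not lost with it.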

Performance Optimization

-- Use staging tables for bulk operations
CREATE TEMPORARY TABLE temp_stage AS
SELECT * FROM source_table;

-- Bulk insert from staging
INSERT INTO target_table
SELECT * FROM temp_stage;

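-- Scale out for concurrent pipeline queries (multi-cluster warehouses need
-- Enterprise Edition or higher; this does not speed up a single query)
ALTER WAREHOUSE compute_wh SET
  MAX_CLUSTER_COUNT = 4
  SCALING_POLICY = 'ECONOMY';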

-- Monitor pipeline performance
SELECT
  query_id,
  query_text,
  execution_time AS execution_time_ms,  -- EXECUTION_TIME is reported in milliseconds
  bytes_scanned,
  rows_produced
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
  END_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
WHERE query_text ILIKE '%INSERT%' OR query_text ILIKE '%MERGE%'
ORDER BY execution_time DESC;

Pipeline Monitoring

-- Create pipeline monitoring view
CREATE OR REPLACE VIEW pipeline_monitoring AS
SELECT
  name AS task_name,  -- TASK_HISTORY exposes the task name in the NAME column
  state,
  scheduled_time,
  completed_time,
  TIMESTAMPDIFF('second', scheduled_time, completed_time) as duration_seconds,
  query_id,
  error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;

-- Monitor data freshness
SELECT
  'silver.customers' AS table_name,  -- literal label; the table has no table_name column
  MAX(updated_date) AS last_updated,
  TIMESTAMPDIFF('hour', MAX(updated_date), CURRENT_TIMESTAMP()) AS hours_since_update
FROM silver.customers;

ETL Best Practices

Pattern | Implementation | Benefit
Incremental Processing | Streams + Tasks | Efficient updates
Idempotent Operations | MERGE statements | Safe retries
Error Handling | EXCEPTION blocks in Snowflake Scripting | Reliable execution
Logging | Audit tables | Troubleshooting
Monitoring | Task history views | Pipeline visibility
Optimization | Warehouse scaling | Performance

Summary

Key Takeaways

ETL pipelines combine extraction, transformation, and loading with built-in reliability.

Snowflake Tasks enable scheduled and dependent execution for orchestration.

Streams provide change data capture by recording the row-level inserts, updates, and deletes made to a table since the stream was last consumed.

Error handling with transactions ensures pipeline reliability and data integrity.


Pipeline Implementation Checklist

  1. Extract data using external stages and COPY INTO
  2. Transform using stored procedures with error handling
  3. Load using MERGE for upserts or INSERT OVERWRITE
  4. Orchestrate with Tasks and dependencies
  5. Monitor using Task History views
  6. Implement logging for audit and troubleshooting
