Snowpipe: Automated Data Ingestion & Streaming
Architecture Diagram 1: Snowpipe Ingestion Pipeline
Architecture Diagram 2: File Format Processing Pipeline
Architecture Diagram 3: Snowpipe End-to-End Flow
Architecture Diagram 4: Auto-Ingest Configuration Flow
Snowpipe is Snowflake's serverless, continuous data ingestion service that automatically loads files from cloud storage into tables. It works in micro-batches (typically 1-3 minutes of latency) and handles file parsing, validation, optional transformation in the COPY statement, compression, and micro-partition creation without manual orchestration.
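Idempotent ingestion means a file is loaded only once, so retries and duplicate event notifications do not create duplicate rows. Each pipe keeps load metadata: the path and name of every file it has loaded, retained for 14 days and visible through COPY_HISTORY. The pipe skips any file it has already loaded, even if a file with the same name is later modified. This makes retries safe and keeps data consistent when events are delivered more than once.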
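The deduplication rule can be shown with a toy model in Python. `ToyPipe` and `on_event` are hypothetical names, not a Snowflake API. The sketch treats the file path as the only dedup key and leaves out the 14-day retention window.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPipe:
    """Hypothetical model of a pipe's load metadata (not a Snowflake API)."""
    loaded_paths: set = field(default_factory=set)
    table_rows: list = field(default_factory=list)

    def on_event(self, path: str, rows: list) -> bool:
        """Handle one new-file notification; return True if the file was loaded."""
        # The dedup key is the file path: a path seen before is skipped,
        # so retries and duplicate notifications add no rows.
        if path in self.loaded_paths:
            return False
        self.loaded_paths.add(path)
        self.table_rows.extend(rows)
        return True


pipe = ToyPipe()
events = [
    ("data/sales_20240101.csv", [1, 2, 3]),
    ("data/sales_20240101.csv", [1, 2, 3]),  # duplicate notification for the same file
    ("data/sales_20240102.csv", [4, 5]),
]
results = [pipe.on_event(path, rows) for path, rows in events]
print(results)          # [True, False, True]
print(pipe.table_rows)  # [1, 2, 3, 4, 5]
```

The second notification for `sales_20240101.csv` is ignored, so the table ends up with each file's rows exactly once.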
File Size Optimization
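- Optimal file size: 100-250 MB compressed
- Too small (< 50 MB): High fixed overhead per file (and per micro-partition written) relative to the data each file carries
- Too large (> 500 MB): Timeout risk and reduced parallelism within a file
- Parallelism = min(file_count, warehouse_nodes × concurrent_copies) for warehouse-based COPY. Snowpipe sizes its serverless compute automatically, but fewer files still means fewer loads that can run in parallel
- Throughput ≈ parallelism × per_file_throughput_MB/s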
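The Python sketch below runs both formulas with illustrative numbers. The node count, concurrent copies and 25 MB/s per-file rate are assumptions, not Snowflake figures. The model also assumes per-file throughput is the same for large and small files.

```python
def estimate_parallelism(file_count: int, warehouse_nodes: int, concurrent_copies: int) -> int:
    # Parallelism = min(file_count, warehouse_nodes × concurrent_copies).
    # In this model each file is one unit of parallel work.
    return min(file_count, warehouse_nodes * concurrent_copies)


def estimate_throughput_mb_s(parallelism: int, per_file_mb_s: float) -> float:
    # Throughput ≈ parallelism × per_file_throughput_MB/s
    return parallelism * per_file_mb_s


# Hypothetical capacity: 2 nodes × 8 concurrent copies = 16 slots,
# and an assumed 25 MB/s per file. About 4 GB of data, two file layouts:
few_large = estimate_parallelism(file_count=4, warehouse_nodes=2, concurrent_copies=8)    # 4 × 1 GB
many_right = estimate_parallelism(file_count=20, warehouse_nodes=2, concurrent_copies=8)  # 20 × 200 MB
print(few_large, estimate_throughput_mb_s(few_large, 25.0))    # 4 100.0
print(many_right, estimate_throughput_mb_s(many_right, 25.0))  # 16 400.0
```

In this model, the same batch loads about four times faster as 20 right-sized files than as 4 large ones, because the file count caps parallelism.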
Snowpipe charges per file processed (serverless pricing). Use Parquet for analytics workloads (built-in compression, schema info, predicate pushdown). CSV is simpler but lacks these benefits. JSON supports semi-structured data with parsing overhead.
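The text above ties Snowpipe cost to files processed, so file count matters as much as bytes. The Python arithmetic below uses a made-up volume of 50 GB/day to show how file size drives the daily file count. It does not model Snowflake's actual prices.

```python
import math


def files_for_volume(total_mb: float, file_size_mb: float) -> int:
    """Number of files needed to carry total_mb when each file is file_size_mb."""
    return math.ceil(total_mb / file_size_mb)


# Hypothetical daily volume: 50 GB of compressed data.
daily_mb = 50 * 1024
for size_mb in (1, 10, 200):
    print(f"{size_mb:>4} MB files -> {files_for_volume(daily_mb, size_mb):>6} files/day")
```

At 1 MB per file, the same data arrives as 51,200 files a day. At 200 MB per file, it arrives as 256.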
1. Create Storage Integration and Stage: Reference the IAM role (on AWS) or equivalent cloud identity that Snowflake uses to read the bucket, then create an external stage that uses the integration
2. Create Pipe: Specify ingestion logic (COPY INTO statement, file format, target table)
3. Configure Event Notifications: Route new-file events from S3, Azure, or GCS to Snowpipe (on AWS, target the SQS queue shown in the pipe's notification_channel)
4. Test Ingestion: Upload sample files and verify data loads correctly
5. Monitor: Check SYSTEM$PIPE_STATUS for pending files and COPY_HISTORY for load results
6. Alert: Set up alerts for pipe lag (pending files > 100) and error rates (> 1%)
- Serverless: No warehouse to manage; Snowflake provides and scales the compute
- Latency: Typically 1-3 minutes end-to-end (from file upload to queryable data)
- Idempotent: Files the pipe has already loaded are skipped automatically
- File sizing: Target 100-250 MB compressed for optimal parallelism
- Monitoring: Use SYSTEM$PIPE_STATUS, COPY_HISTORY, and VALIDATE_PIPE_LOAD
Detailed Explanation
What is Snowpipe?
Snowpipe is Snowflake's serverless, continuous data ingestion service that automates loading from cloud storage. Bulk COPY runs when you execute it on a warehouse you manage. Snowpipe instead loads new files continuously in micro-batches, with typically 1-3 minutes of latency.
Auto-Ingest Architecture
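- Leverages cloud-native event notifications (Amazon S3 event notifications via SQS, Azure Event Grid, Google Cloud Storage notifications via Pub/Sub)
- Event-driven rather than polling: lower latency, reduced API costs, and no repeated listing of the bucket
- Three required components:
  - Storage integration and external stage: an IAM role (or equivalent) that grants Snowflake access, plus a stage that points at the storage path
  - Pipe: COPY INTO logic, file format, target table
  - Event notifications: route new-file events to Snowpipe (on Azure and GCS, through a notification integration named in the pipe's INTEGRATION parameter)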
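On AWS, the event-notification component means pointing the bucket's S3 event notifications at the SQS queue that Snowflake creates for an auto-ingest pipe. The queue's ARN appears in the notification_channel column of SHOW PIPES. The Python sketch below only builds the notification configuration, in the shape that boto3's `put_bucket_notification_configuration` expects. The queue ARN, prefix and suffix are placeholders.

```python
import json


def s3_notification_config(queue_arn: str, prefix: str, suffix: str = "") -> dict:
    """Build an S3 NotificationConfiguration that sends ObjectCreated events
    for keys under `prefix` (and optionally ending in `suffix`) to an SQS queue."""
    rules = [{"Name": "prefix", "Value": prefix}]
    if suffix:
        rules.append({"Name": "suffix", "Value": suffix})
    return {
        "QueueConfigurations": [
            {
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": rules}},
            }
        ]
    }


# Placeholder ARN: use the value from the pipe's notification_channel column.
config = s3_notification_config(
    "arn:aws:sqs:us-east-1:000000000000:sf-snowpipe-EXAMPLE",
    prefix="data/",
    suffix=".csv",
)
print(json.dumps(config, indent=2))

# Applying it needs boto3 and AWS credentials (not run here):
# boto3.client("s3").put_bucket_notification_configuration(
#     Bucket="data-lake-sales", NotificationConfiguration=config)
```

`put_bucket_notification_configuration` replaces the bucket's entire notification configuration, so merge this entry with any existing rules before you apply it. A `.csv` suffix filter also skips compressed `.csv.gz` files.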
File Format Optimization
| Format | Compression | Schema Support | Best For |
|---|---|---|---|
| CSV | External | None | Simple flat data |
| JSON | External | Full nesting | Semi-structured |
| Parquet | Built-in (Snappy/ZSTD) | Full schema | Analytics workloads |
| ORC | Built-in (Zlib/ZSTD) | Full schema | Hive ecosystem |
| Avro | Built-in (Deflate) | Full schema | Schema evolution |
- Optimal file size: 100-250 MB compressed
- Too small (< 50 MB): high per-file overhead relative to the data loaded
- Too large (> 500 MB): timeout risk, reduced parallelism
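When upstream systems produce oversized files, one option is to split them before upload. The Python helper below is a hypothetical planning function that picks a part count so each part lands in the 100-250 MB band. It assumes each part's compressed size is proportional to the share of records it holds; real compression ratios vary.

```python
import math


def plan_split(file_mb: float, target_mb: float = 200, max_mb: float = 250) -> tuple:
    """Return (part_count, part_mb) for cutting one file into equal-size parts.

    Files at or below max_mb stay whole. Larger files are cut into
    ceil(file_mb / target_mb) parts, so no part exceeds target_mb.
    """
    if file_mb <= max_mb:
        return 1, file_mb
    parts = math.ceil(file_mb / target_mb)
    return parts, file_mb / parts


# prints: 180 (1, 180) / 260 (2, 130.0) / 1200 (6, 200.0)
for size_mb in (180, 260, 1200):
    print(size_mb, plan_split(size_mb))
```

With the default values, any file above 250 MB is cut into parts of more than 125 MB and at most 200 MB. Files under 50 MB need the opposite treatment (batching several into one), which this sketch does not cover.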
Error Handling and Idempotency
- Automatic retry with exponential backoff for transient failures
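- Persistent errors: recorded per file in COPY_HISTORY (ERROR_COUNT, FIRST_ERROR_MESSAGE), inspectable row by row with the VALIDATE_PIPE_LOAD table function, and optionally pushed to a cloud messaging service through the pipe's ERROR_INTEGRATION
- Idempotent: submitting the same file again does not load its rows twice
- Duplicate files are excluded automatically: each pipe keeps load metadata (the path and name of every loaded file, retained for 14 days), visible through COPY_HISTORY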
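Snowpipe retries transient failures itself, so auto-ingest needs none of the code below. If your own code submits files (for example through the Snowpipe REST API), you can apply the same exponential-backoff technique on the client side. The Python sketch is generic: `retry`, `backoff_delays` and `flaky_submit` are hypothetical names. The base delay, the cap and the set of exceptions treated as transient are all assumptions.

```python
import time


def backoff_delays(retries: int, base: float = 1.0, cap: float = 60.0) -> list:
    """Wait before each retry: base, 2*base, 4*base, ..., never more than cap."""
    return [min(cap, base * 2 ** i) for i in range(retries)]


def retry(op, attempts=5, base=1.0, cap=60.0, transient=(TimeoutError,), sleep=time.sleep):
    """Call op(); on a transient error, wait and try again.

    Non-transient errors propagate immediately; the last transient error is
    re-raised once all attempts are used up.
    """
    delays = backoff_delays(attempts - 1, base, cap)
    for attempt in range(attempts):
        try:
            return op()
        except transient:
            if attempt == attempts - 1:
                raise
            sleep(delays[attempt])


calls = {"n": 0}


def flaky_submit():
    # Stand-in for a hypothetical file-submission call that fails twice.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")
    return "loaded"


slept = []
result = retry(flaky_submit, sleep=slept.append)
print(result, slept)  # loaded [1.0, 2.0]
```

The sketch takes `sleep` as a parameter so the waits can be recorded instead of actually pausing, which keeps the example fast to run.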
Monitoring and Alerting
- COPY_HISTORY: completed load operations per file (rows, bytes, errors)
- SYSTEM$PIPE_STATUS: current pipe health (execution state, pending files, last error)
- LOAD_HISTORY: historical bulk COPY loads; it does not include Snowpipe loads, so use COPY_HISTORY for pipes
- Alerts: pipe lag (> 100 files), error rates (> 1%), latency SLAs
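Snowflake exposes pipe health through the SYSTEM$PIPE_STATUS function. It returns a JSON string with keys such as executionState and pendingFileCount. The Python sketch below checks that string against the 100-pending-file threshold. The payload values are made up, and `pipe_health` is a hypothetical helper.

```python
import json


def pipe_health(status_json: str, max_pending: int = 100) -> list:
    """Return a list of problems found in a SYSTEM$PIPE_STATUS JSON string."""
    status = json.loads(status_json)
    state = status.get("executionState", "UNKNOWN")
    pending = int(status.get("pendingFileCount", 0))
    problems = []
    if state != "RUNNING":
        problems.append(f"pipe state is {state}")
    if pending > max_pending:
        problems.append(f"{pending} pending files (> {max_pending})")
    return problems


# Illustrative payload: real key names, made-up values. With the Snowflake
# Python connector you could fetch the real string with
# cur.execute("SELECT SYSTEM$PIPE_STATUS('sales_auto_ingest')").fetchone()[0]
raw = json.dumps({
    "executionState": "RUNNING",
    "pendingFileCount": 142,
    "lastIngestedTimestamp": "2024-01-15T10:32:05.123Z",
    "lastIngestedFilePath": "data/sales_20240115.csv",
})
print(pipe_health(raw))  # ['142 pending files (> 100)']
```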
Key Takeaway: Use Parquet for analytics workloads, target 100-250 MB files, and leverage Snowpipe's idempotency for safe retries.
Key Concepts Table
| Component | Purpose | Configuration | Cost Model |
|---|---|---|---|
| Storage Integration | Cloud access permissions | IAM roles, external stages | Free |
| Pipe | Ingestion logic definition | SQL COPY statement | Free (compute charged separately) |
| Auto-Ingest | Event-driven loading | Cloud event notifications | Serverless (per file processed) |
| COPY_HISTORY | Load operation tracking | Automatic | Free |
| VALIDATE_PIPE_LOAD | Error inspection | Table function, queried on demand | Free |
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Ingestion Latency | < 3 min | 3-5 min | > 5 min |
| File Size | 100-250 MB | 50-100 MB | < 50 MB or > 500 MB |
| Error Rate | < 0.1% | 0.1-1% | > 1% |
| Pipe Lag | < 10 files | 10-100 files | > 100 files |
| Daily Throughput | Per SLA | 80-100% of SLA | > 100% of SLA |
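You can turn these thresholds into a small classifier, for example to color a dashboard. This Python sketch assumes each range in the table includes its endpoints, so 3 minutes of latency counts as Warning. It leaves out two rows. File Size is two-sided and leaves 250-500 MB unassigned. Daily Throughput depends on your SLA.

```python
# (warning_from, critical_above) per metric, copied from the table above.
THRESHOLDS = {
    "ingestion_latency_min": (3, 5),
    "error_rate_pct": (0.1, 1.0),
    "pipe_lag_files": (10, 100),
}


def classify(metric: str, value: float) -> str:
    """Map a measured value to 'target', 'warning', or 'critical'."""
    warn_from, critical_above = THRESHOLDS[metric]
    if value < warn_from:
        return "target"
    if value <= critical_above:
        return "warning"
    return "critical"


print(classify("ingestion_latency_min", 2.5))  # target
print(classify("pipe_lag_files", 100))         # warning
print(classify("error_rate_pct", 1.5))         # critical
```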
Code Examples
-- Example 1: Create storage integration for S3
CREATE STORAGE INTEGRATION s3_sales_integration
TYPE = EXTERNAL_STAGE
ENABLED = TRUE
STORAGE_PROVIDER = S3
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/SnowflakeAccessRole'
STORAGE_ALLOWED_LOCATIONS = ('s3://data-lake-sales/data/', 's3://data-lake-sales/archive/');
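-- Example 2: Create an external stage and a pipe with auto-ingest
-- A pipe reads from a stage, not from the storage integration directly.
CREATE STAGE sales_stage
  URL = 's3://data-lake-sales/data/'
  STORAGE_INTEGRATION = s3_sales_integration;
-- On AWS, auto-ingest uses an SQS queue that Snowflake creates for the pipe
-- (see notification_channel in SHOW PIPES); no INTEGRATION parameter is needed.
CREATE PIPE sales_auto_ingest
  AUTO_INGEST = TRUE
  ERROR_INTEGRATION = s3_error_integration  -- notification integration for load errors
  COMMENT = 'Auto-ingest sales data from S3'
AS
COPY INTO sales_raw
FROM @sales_stage
FILE_FORMAT = (
  TYPE = CSV
  SKIP_HEADER = 1
  NULL_IF = ('', 'NULL', 'null')
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  TRIM_SPACE = TRUE
  ERROR_ON_COLUMN_COUNT_MISMATCH = TRUE
)
ON_ERROR = 'CONTINUE';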
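-- Example 3: Create pipe for JSON data
-- Assumes a hypothetical external stage events_stage (created with
-- CREATE STAGE ... STORAGE_INTEGRATION = <integration>) that points at the
-- events/ prefix, and an events_raw table with a single VARIANT column.
CREATE PIPE json_events_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO events_raw
FROM @events_stage
FILE_FORMAT = (
  TYPE = JSON
  STRIP_OUTER_ARRAY = TRUE
  IGNORE_UTF8_ERRORS = TRUE
)
ON_ERROR = 'SKIP_FILE';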
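-- Example 4: Monitor pipe status
-- SYSTEM$PIPE_STATUS returns the pipe's current status as a JSON string.
SELECT SYSTEM$PIPE_STATUS('sales_auto_ingest');
-- Check pending files and the most recent load
SELECT
  status:executionState::STRING        AS execution_state,
  status:pendingFileCount::NUMBER      AS pending_file_count,
  status:lastIngestedFilePath::STRING  AS last_loaded_file,
  status:lastIngestedTimestamp::STRING AS last_loaded_time,
  status:error::STRING                 AS last_error
FROM (SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('sales_auto_ingest')) AS status);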
-- Example 5: Query copy history
SELECT
  pipe_name,
  file_name,
  file_size,
  row_count,
  error_count,
  first_error_message,
  status,
  last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'sales_raw',
  START_TIME => DATEADD(day, -7, CURRENT_TIMESTAMP())
))
ORDER BY last_load_time DESC;
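The document does not define how the 1% error-rate threshold is computed. This sketch assumes one reasonable definition: rejected rows divided by parsed rows. It uses the ROW_PARSED, ERROR_COUNT and STATUS columns that COPY_HISTORY returns. The Python helper and the sample rows are hypothetical.

```python
def load_error_summary(rows: list) -> dict:
    """Summarize COPY_HISTORY-shaped rows (FILE_NAME, ROW_PARSED, ERROR_COUNT, STATUS)."""
    parsed = sum(r["ROW_PARSED"] for r in rows)
    errors = sum(r["ERROR_COUNT"] for r in rows)
    # Assumed definition: rejected rows as a percentage of parsed rows.
    rate_pct = 100.0 * errors / parsed if parsed else 0.0
    not_clean = [r["FILE_NAME"] for r in rows if r["STATUS"] != "Loaded"]
    return {"error_rate_pct": rate_pct, "files": len(rows), "files_with_errors": not_clean}


# Made-up sample rows for illustration.
rows = [
    {"FILE_NAME": "data/sales_20240101.csv", "ROW_PARSED": 10000, "ERROR_COUNT": 0, "STATUS": "Loaded"},
    {"FILE_NAME": "data/sales_20240102.csv", "ROW_PARSED": 10000, "ERROR_COUNT": 150, "STATUS": "Partially loaded"},
    {"FILE_NAME": "data/sales_20240103.csv", "ROW_PARSED": 20000, "ERROR_COUNT": 0, "STATUS": "Loaded"},
]
summary = load_error_summary(rows)
print(summary)
```

Here 150 rejected rows out of 40,000 parsed give 0.375%, which falls in the Warning band of the thresholds table.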
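-- Example 6: Analyze load errors
-- VALIDATE_PIPE_LOAD returns the errors the pipe hit for files it
-- processed in the given time window.
SELECT
  file,
  code AS error_code,
  error AS error_message,
  line,
  column_name,
  row_number
FROM TABLE(INFORMATION_SCHEMA.VALIDATE_PIPE_LOAD(
  PIPE_NAME => 'sales_auto_ingest',
  START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
))
LIMIT 100;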
-- Example 7: Create file format for semi-structured data
CREATE OR REPLACE FILE FORMAT json_format
TYPE = JSON
STRIP_OUTER_ARRAY = FALSE
STRIP_NULL_VALUES = FALSE
IGNORE_UTF8_ERRORS = FALSE
COMMENT = 'Standard JSON format for event data';
CREATE OR REPLACE FILE FORMAT parquet_format
TYPE = PARQUET
TRIM_SPACE = TRUE
REPLACE_INVALID_CHARACTERS = FALSE  -- Parquet uses this option rather than IGNORE_UTF8_ERRORS
COMMENT = 'Parquet format for analytics data';
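-- Example 8: Advanced pipe configuration (Azure)
-- On Azure, INTEGRATION names the notification integration (Event Grid queue),
-- typed in uppercase. The stage azure_analytics_stage is a hypothetical
-- external stage that points at the container.
CREATE PIPE analytics_ingest
  AUTO_INGEST = TRUE
  INTEGRATION = 'AZURE_ANALYTICS_INTEGRATION'
  ERROR_INTEGRATION = azure_error_queue
  COMMENT = 'Ingest sales data with pattern matching'
AS
COPY INTO analytics.sales_fact
FROM @azure_analytics_stage/raw_data/
PATTERN = '.*sales_[0-9]{8}\\.csv$'  -- PATTERN is a COPY option, not a pipe parameter
FILE_FORMAT = (
  TYPE = CSV
  SKIP_HEADER = 1
  NULL_IF = ('NA', 'N/A', '')
  FIELD_DELIMITER = '|'
  DATE_FORMAT = 'YYYY-MM-DD'
  TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF3'
)
ON_ERROR = 'SKIP_FILE';
-- VALIDATION_MODE is not supported in a pipe's COPY statement; use the
-- INFORMATION_SCHEMA.VALIDATE_PIPE_LOAD table function to inspect errors instead.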
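A quick way to check a PATTERN before deploying a pipe is to test the regex against sample paths. For Snowpipe, Snowflake applies the pattern to whatever remains of the path after the stage definition's own path is trimmed. This Python sketch assumes the pattern must fully match that relative path. Python's `re` and Snowflake's regex engine agree on the constructs used here, but not on every feature.

```python
import re

# The pipe's PATTERN after SQL string unescaping: '\\.' in the SQL literal
# reaches the regex engine as '\.', which is r"\." in Python.
PATTERN = re.compile(r".*sales_[0-9]{8}\.csv$")


def would_load(relative_path: str) -> bool:
    # Assumption: the pattern must match the whole path relative to the stage.
    return PATTERN.fullmatch(relative_path) is not None


candidates = [
    "raw_data/sales_20240115.csv",
    "sales_20240115.csv",
    "raw_data/sales_2024-01-15.csv",
    "raw_data/sales_20240115.csv.gz",
    "raw_data/returns_20240115.csv",
]
print([would_load(p) for p in candidates])  # [True, True, False, False, False]
```

Note the `.csv.gz` case. If files arrive gzip-compressed, you would need a pattern along the lines of `'.*sales_[0-9]{8}\\.csv(\\.gz)?$'`.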
-- Example 9: Create alert for pipe lag
-- alert_channel is an email notification integration.
CREATE OR REPLACE ALERT pipe_lag_alert
  WAREHOUSE = monitoring_wh
  SCHEDULE = 'USING CRON 0 */4 * * * America/New_York'
  IF (EXISTS (
    SELECT 1
    FROM (SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('sales_auto_ingest')) AS status)
    WHERE status:pendingFileCount::NUMBER > 100
  ))
  THEN
    CALL SYSTEM$SEND_EMAIL(
      'alert_channel',
      'data-team@company.com',
      'Snowpipe Lag Alert',
      'The sales_auto_ingest pipe has more than 100 pending files.'
    );
-- New alerts are created suspended; resume the alert to start its schedule.
ALTER ALERT pipe_lag_alert RESUME;
Performance Metrics
| Metric | Value | Description |
|---|---|---|
| Typical Latency | 1-3 minutes | End-to-end from file upload to queryable |
| Max File Size | 5 GB | Maximum supported file size |
| Recommended Size | 100-250 MB | Optimal for parallel processing |
| Max Pipes per Account | 100 | Default limit (can be increased) |
| Max Integration per Pipe | 1 | One integration per pipe |
| Daily Quota | Unlimited | No limit on daily ingestion |
Best Practices
- Optimize file sizes: Target 100-250 MB compressed files for optimal parallelism and processing efficiency.
- Use Parquet for analytics: Prefer Parquet over CSV for analytics workloads due to built-in compression, schema information, and predicate pushdown.
- Implement monitoring: Set up alerts for pipe lag, error rates, and ingestion latency to catch issues proactively.
- Handle errors gracefully: Use ON_ERROR = 'CONTINUE' for non-critical data and 'SKIP_FILE' (the Snowpipe default) for malformed files. Review COPY_HISTORY and VALIDATE_PIPE_LOAD output regularly.
- Use pattern matching: Set a PATTERN in the pipe's COPY statement to filter specific file types and avoid accidental ingestion of unrelated files.
- Implement idempotent loads: Leverage Snowpipe's automatic deduplication to safely handle retries and duplicate events. Because deduplication is keyed on the file path, write corrected data under a new file name instead of overwriting a file that was already loaded.
- Separate concerns: Create dedicated pipes for different data sources, file types, or target tables to simplify monitoring and error isolation.
- Monitor pipe health: Regularly check SYSTEM$PIPE_STATUS for pending file counts and error conditions. Address issues before they impact SLAs.
- Test with small files: Validate file format configurations and error handling with sample files before production deployment.
- Plan for scale: Design ingestion architecture to handle peak loads by using multiple pipes and optimizing cloud storage configurations.
See Also
- Semi-Structured Data: JSON/XML processing and VARIANT storage
- Snowflake Architecture: Micro-partition creation details
- Stored Procedures: Automated ingestion procedures
- Optimization Techniques: Materialized views for ingested data
- PySpark Iceberg Tables: Iceberg table ingestion patterns
- Data Warehouse Concepts: Data warehouse design principles