Snowflake Streaming Data Ingestion
Snowflake supports real-time data ingestion through Snowpipe Streaming, enabling low-latency data delivery for time-sensitive analytics and operational workloads.
What is Snowpipe Streaming?
- Enables sub-second latency ingestion
- No file staging required
- Supports exactly-once delivery semantics
Architecture Overview
The streaming ingestion architecture flows through four layers:
- Data Sources: IoT Sensors, App Events, Logs, Transactions
- Message Queue: Kafka, Kinesis, Event Hub, Pub/Sub
- Connector Layer: Snowpipe Streaming, Kafka Connector, REST API, SDK
- Snowflake: Staging Table, Snowpipe, Target Table, Streams
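The Streams entry in the Snowflake layer refers to table streams, which record row changes so downstream jobs can process only newly arrived data. A minimal sketch, assuming a stream on the streaming_events table defined in the setup section; the stream name streaming_events_stream is hypothetical:

```sql
-- Track inserts into the landing table for downstream processing
-- (APPEND_ONLY = TRUE records inserts only, which suits an ingestion table)
CREATE OR REPLACE STREAM streaming_events_stream
ON TABLE streaming_events
APPEND_ONLY = TRUE;

-- Peek at rows not yet consumed; METADATA$ACTION shows the change type
SELECT event_id, event_type, METADATA$ACTION
FROM streaming_events_stream;
```

A plain SELECT does not advance the stream's offset; it moves forward only when the stream is read inside a committed DML statement such as INSERT ... SELECT or MERGE.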
Latency Targets
| Tier | Latency |
|---|---|
| Real-time | <1s |
| Near-real-time | 1-5s |
| Micro-batch | 5-60s |
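To see how actual traffic maps onto these tiers, each row can be bucketed by the gap between its event time and its arrival time. A sketch, assuming event_timestamp is written by the producer and ingestion_time is populated at load time (the column default in the setup section), both in the same time zone; the tier boundaries mirror the table above:

```sql
-- Bucket the last hour of rows into the latency tiers above
SELECT
CASE
WHEN lag_ms < 1000 THEN 'Real-time'
WHEN lag_ms <= 5000 THEN 'Near-real-time'
WHEN lag_ms <= 60000 THEN 'Micro-batch'
ELSE 'Slower than micro-batch'
END AS latency_tier,
COUNT(*) AS row_count
FROM (
-- Per-row delay from producer event time to Snowflake arrival time
SELECT DATEDIFF(millisecond, event_timestamp, ingestion_time) AS lag_ms
FROM streaming_events
WHERE ingestion_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
)
GROUP BY latency_tier
ORDER BY row_count DESC;
```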
Key Benefits
- Low latency ingestion without file staging
- Real-time analytics with exactly-once delivery
- Schema evolution and auto-scaling
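Schema evolution is enabled per table. A minimal sketch, assuming the streaming_events table from the setup section and a loading role that is allowed to alter it:

```sql
-- Let supported loaders add columns when incoming records carry new top-level fields
ALTER TABLE streaming_events SET ENABLE_SCHEMA_EVOLUTION = TRUE;

-- After new fields arrive, inspect the evolved column list
DESCRIBE TABLE streaming_events;
```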
Snowpipe Streaming
Table and Pipe Setup
-- Create table for streaming
CREATE TABLE streaming_events (
event_id VARCHAR(36),
event_type VARCHAR(50),
user_id INT,
event_timestamp TIMESTAMP_NTZ,
payload VARIANT,
ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
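-- Classic file-based Snowpipe, for comparison; AUTO_INGEST requires an external stage with cloud event notifications
-- JSON loads into multiple columns need a transformation; assumes JSON keys match the column names
CREATE OR REPLACE PIPE streaming_pipe
AUTO_INGEST = TRUE
AS
COPY INTO streaming_events (event_id, event_type, user_id, event_timestamp, payload)
FROM (
SELECT $1:event_id::VARCHAR, $1:event_type::VARCHAR, $1:user_id::INT,
$1:event_timestamp::TIMESTAMP_NTZ, $1:payload
FROM @streaming_stage
)
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
ON_ERROR = 'CONTINUE';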
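Because this pipe loads staged files rather than streaming rows, its health is checked through pipe status and copy history rather than through channels. A sketch using the pipe and table names from the example above:

```sql
-- Is the pipe running, and how many files are pending?
SELECT SYSTEM$PIPE_STATUS('streaming_pipe');

-- Per-file results for the last hour, including rows skipped by ON_ERROR = 'CONTINUE'
SELECT file_name, status, row_count, row_parsed, error_count, first_error_message, last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'streaming_events',
START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
))
ORDER BY last_load_time DESC;
```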
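Kafka Connector Configuration
Values in angle brackets are placeholders for your account. With snowflake.enable.schematization set to true, top-level JSON fields are mapped to matching table columns; without it, the connector writes each record into RECORD_CONTENT and RECORD_METADATA columns.
{
"name": "snowflake-connector",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.url.name": "<account_identifier>.snowflakecomputing.com:443",
"snowflake.user.name": "<user_name>",
"snowflake.private.key": "${SNOWFLAKE_PRIVATE_KEY}",
"snowflake.role.name": "streaming_role",
"snowflake.database.name": "my_database",
"snowflake.schema.name": "streaming_schema",
"topics": "events-topic",
"snowflake.topic2table.map": "events-topic:streaming_events",
"snowflake.enable.schematization": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}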
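The connector authenticates with key-pair (JWT) credentials as a Snowflake user that holds the role named in the configuration. A starting-point sketch of that setup; the user name kafka_connector_user and the key placeholder are hypothetical, and the privilege list is an assumption that may need extending (for example, when schematization alters the table):

```sql
-- Role used by the connector (name taken from the configuration)
CREATE ROLE IF NOT EXISTS streaming_role;
GRANT USAGE ON DATABASE my_database TO ROLE streaming_role;
GRANT USAGE ON SCHEMA my_database.streaming_schema TO ROLE streaming_role;
GRANT INSERT, SELECT ON TABLE my_database.streaming_schema.streaming_events TO ROLE streaming_role;

-- Hypothetical connector user: register its public key and make the role its default
ALTER USER kafka_connector_user SET RSA_PUBLIC_KEY = '<public_key>';
GRANT ROLE streaming_role TO USER kafka_connector_user;
ALTER USER kafka_connector_user SET DEFAULT_ROLE = streaming_role;
```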
Performance Tuning
[Charts: Streaming Throughput; End-to-End Latency]
Channel Management
-- List the Snowpipe Streaming channels in the schema
SHOW CHANNELS IN SCHEMA my_database.streaming_schema;
-- Monitor channel progress: a committed offset_token that stops advancing signals a stalled channel
SELECT
"name",
"table_name",
"offset_token",
"created_on"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
Batch vs Streaming Comparison
| Feature | Batch (Snowpipe) | Streaming | Hybrid |
|---|---|---|---|
| Latency | 1-5 minutes | <1 second | 1-60 seconds |
| Throughput | Very High | High | Medium-High |
| Cost | Lower | Higher | Medium |
| Complexity | Low | High | Medium |
| Use Case | Historical loads | Real-time | Operational |
For streaming workloads, monitor channel health regularly. Unhealthy channels can cause data delays. Set up alerts for channel lag exceeding your SLA thresholds.
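One way to act on this is a scheduled Snowflake alert. The sketch below uses table freshness (time since the newest row arrived) as a proxy for channel lag; the warehouse my_wh, the notification integration my_email_integration, the recipient address, and the 5-minute threshold are all assumptions to replace with your own values and SLA:

```sql
CREATE OR REPLACE ALERT streaming_lag_alert
WAREHOUSE = my_wh
SCHEDULE = '5 MINUTE'
IF (EXISTS (
-- Fires when the table is empty or its newest row is older than 5 minutes
SELECT 1
FROM (SELECT MAX(ingestion_time) AS last_ingest FROM streaming_events)
WHERE last_ingest IS NULL
OR last_ingest < DATEADD(minute, -5, CURRENT_TIMESTAMP())
))
THEN
CALL SYSTEM$SEND_EMAIL(
'my_email_integration',
'oncall@example.com',
'Streaming ingestion lag',
'No rows have reached streaming_events in the last 5 minutes.'
);

-- Alerts are created suspended; resume to start the schedule
ALTER ALERT streaming_lag_alert RESUME;
```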
Exactly-Once Delivery
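-- Idempotent upsert pattern: replaying the same events leaves target_table unchanged
MERGE INTO target_table t
USING (
-- Keep only the latest copy of each event; duplicate source keys would make the MERGE nondeterministic
SELECT *
FROM staging_table
QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingestion_time DESC) = 1
) s
ON t.event_id = s.event_id
WHEN MATCHED AND s.ingestion_time > t.ingestion_time THEN
UPDATE SET
t.payload = s.payload,
t.ingestion_time = s.ingestion_time
WHEN NOT MATCHED THEN
INSERT (event_id, event_type, user_id, event_timestamp, payload, ingestion_time)
VALUES (s.event_id, s.event_type, s.user_id, s.event_timestamp, s.payload, s.ingestion_time);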
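To confirm the pattern is holding, a periodic duplicate check on the target table should return no rows. A minimal sketch against the target_table used above:

```sql
-- Any row returned means an event_id landed more than once
SELECT event_id, COUNT(*) AS copies
FROM target_table
GROUP BY event_id
HAVING COUNT(*) > 1
ORDER BY copies DESC
LIMIT 100;
```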
Monitoring Dashboard
-- Real-time ingestion metrics
SELECT
DATE_TRUNC('minute', ingestion_time) as minute,
COUNT(*) as rows_ingested,
COUNT(DISTINCT event_type) as event_types,
AVG(DATEDIFF(millisecond, event_timestamp, ingestion_time)) as avg_latency_ms -- event time to arrival time
FROM streaming_events
WHERE ingestion_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1 DESC;
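Averages can hide tail latency. A variant of the dashboard query that reports percentiles, assuming latency is measured from the producer's event_timestamp to ingestion_time:

```sql
-- Per-minute median, p95, and worst-case latency over the last hour
SELECT
DATE_TRUNC('minute', ingestion_time) AS minute,
APPROX_PERCENTILE(DATEDIFF(millisecond, event_timestamp, ingestion_time), 0.5) AS p50_latency_ms,
APPROX_PERCENTILE(DATEDIFF(millisecond, event_timestamp, ingestion_time), 0.95) AS p95_latency_ms,
MAX(DATEDIFF(millisecond, event_timestamp, ingestion_time)) AS max_latency_ms
FROM streaming_events
WHERE ingestion_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1 DESC;
```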
Key Takeaways
- Snowpipe Streaming enables sub-second latency ingestion
- Kafka connector provides production-grade streaming integration
- Channel monitoring is critical for streaming health
- Exactly-once semantics require idempotent patterns
- Use hybrid approaches for balanced latency and throughput