
Snowflake Streaming Data Ingestion

Snowflake supports real-time data ingestion through Snowpipe Streaming, enabling low-latency data delivery for time-sensitive analytics and operational workloads.

Figure: Snowpipe Streaming vs Snowpipe. Data sources (IoT, apps, logs) feed a message queue (Kafka/Kinesis), which delivers to target tables either through Snowpipe Streaming (no file staging, direct API, sub-second) or classic Snowpipe (file-based, auto-ingest, micro-batch). Latency comparison: Streaming <1s vs Snowpipe 1-60s.

What is Snowpipe Streaming?

  • Enables sub-second latency ingestion
  • No file staging required
  • Supports exactly-once delivery semantics
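Exactly-once delivery rests on offset tokens. Each Snowpipe Streaming channel records the offset token of the last data it committed. A client that restarts reads that token and resumes after it, instead of resending everything. The Python below is a toy model of that bookkeeping, not the Snowflake Ingest SDK. `ToyChannel` and `insert_rows` are hypothetical names, and offsets are plain integers for simplicity. To keep the sketch short, the channel performs the skip check itself; in the real flow the client decides where to resume.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ToyChannel:
    """Hypothetical model of a streaming channel that tracks its last committed offset."""
    name: str
    committed_offset: Optional[int] = None
    rows: list[dict[str, Any]] = field(default_factory=list)

    def insert_rows(self, batch: list[tuple[int, dict[str, Any]]]) -> int:
        """Write (offset, row) pairs newer than the last commit; return rows written."""
        written = 0
        for offset, row in batch:
            if self.committed_offset is not None and offset <= self.committed_offset:
                continue  # already committed: skipping makes a replayed batch harmless
            self.rows.append(row)
            self.committed_offset = offset  # advance the toy "offset token"
            written += 1
        return written
```

If a producer retry replays a batch, the second `insert_rows` call on the same batch writes nothing. An overlapping batch writes only its new rows.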

Architecture Overview

The streaming ingestion architecture flows through four layers:

  • Data Sources: IoT Sensors, App Events, Logs, Transactions
  • Message Queue: Kafka, Kinesis, Event Hub, Pub/Sub
  • Connector Layer: Snowpipe Streaming, Kafka Connector, REST API, SDK
  • Snowflake: Staging Table, Snowpipe, Target Table, Streams
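A few lines of code can mimic the layer split. Sources only publish, the queue buffers, and the connector decides batch boundaries before committing to the table. This is a single-process toy built on Python's standard `queue` module. An in-memory queue plays Kafka/Kinesis, and a list plays the target table. `run_toy_pipeline` and its `max_batch` parameter are hypothetical.

```python
import queue
from typing import Any


def run_toy_pipeline(events: list[dict[str, Any]], max_batch: int = 2):
    """Single-process stand-in for the four layers: sources -> queue -> connector -> table.

    Returns (table_rows, committed_batch_sizes).
    """
    if max_batch < 1:
        raise ValueError("max_batch must be at least 1")

    message_queue = queue.Queue()
    for event in events:               # data sources -> message queue
        message_queue.put(event)

    table: list[dict[str, Any]] = []
    batch_sizes: list[int] = []
    batch: list[dict[str, Any]] = []
    while not message_queue.empty():   # connector layer drains the queue
        batch.append(message_queue.get())
        if len(batch) == max_batch:
            table.extend(batch)        # commit a full batch to the table
            batch_sizes.append(len(batch))
            batch = []
    if batch:                          # flush the final partial batch
        table.extend(batch)
        batch_sizes.append(len(batch))
    return table, batch_sizes
```

With five events and `max_batch=2`, the connector commits batches of 2, 2 and 1, and row order is preserved.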

Latency Targets

Tier | Latency
--- | ---
Real-time | <1 s
Near-real-time | 1–5 s
Micro-batch | 5–60 s
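When checking a pipeline against these targets, it helps to classify measured latencies the same way every time. This is a minimal sketch built on two assumptions. First, exact boundary values of 5 s and 60 s go to the faster tier. Second, anything over 60 s is labeled `Batch`, a bucket the table itself does not define. `latency_tier` is a hypothetical helper.

```python
def latency_tier(latency_s: float) -> str:
    """Map an observed end-to-end latency in seconds onto the tiers above."""
    if latency_s < 0:
        raise ValueError("latency cannot be negative")
    if latency_s < 1:
        return "Real-time"
    if latency_s <= 5:          # assumption: exactly 5 s still counts as near-real-time
        return "Near-real-time"
    if latency_s <= 60:
        return "Micro-batch"
    return "Batch"              # assumption: outside every tier in the table
```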

Key Benefits

  • Low latency ingestion without file staging
  • Real-time analytics with exactly-once delivery
  • Schema evolution and auto-scaling
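Schema evolution means the target table picks up new columns when incoming records start carrying new fields. The sketch below covers only the detection step: it compares an incoming JSON record with the table's column list. `new_columns` is a hypothetical helper that does not alter any table. It compares names case-insensitively because Snowflake stores unquoted identifiers in upper case.

```python
from typing import Any, Iterable


def new_columns(table_columns: Iterable[str], record: dict[str, Any]) -> list[str]:
    """Return record fields the table lacks, in record order."""
    existing = {c.upper() for c in table_columns}   # unquoted identifiers are upper-case
    return [key for key in record if key.upper() not in existing]
```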

Snowpipe Streaming

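Table and Pipe Setup

-- Target table. Snowpipe Streaming writes rows into it directly through
-- channels (Ingest SDK or Kafka connector); no stage or files are involved.
CREATE TABLE streaming_events (
  event_id VARCHAR(36),
  event_type VARCHAR(50),
  user_id INT,
  event_timestamp TIMESTAMP_NTZ,
  payload VARIANT,
  ingestion_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Classic Snowpipe alternative (file-based). AUTO_INGEST requires an external
-- stage (@streaming_stage) with cloud event notifications configured.
CREATE OR REPLACE PIPE streaming_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO streaming_events
  FROM @streaming_stage
  FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE  -- map JSON keys onto table columns
  ON_ERROR = 'CONTINUE';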
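On the producer side, each JSON object should carry keys that mirror the `streaming_events` columns and values that fit their types. The sketch below is a hypothetical helper pair. `make_event` builds such records, and `to_staged_file` serializes them as one JSON array per file, the form that `STRIP_OUTER_ARRAY = TRUE` splits into rows. `ingestion_time` is left out because it records load time rather than event data. Naive timestamps are assumed to already be UTC.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def make_event(event_type: str, user_id: int, payload: dict[str, Any],
               event_time: Optional[datetime] = None) -> dict[str, Any]:
    """Build one record whose keys mirror the streaming_events columns."""
    if len(event_type) > 50:
        raise ValueError("event_type exceeds VARCHAR(50)")
    ts = event_time or datetime.now(timezone.utc)
    # Assumption: naive datetimes are UTC; aware ones are converted to UTC.
    ts = ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else ts.astimezone(timezone.utc)
    return {
        "event_id": str(uuid.uuid4()),  # canonical UUID text is 36 chars: fits VARCHAR(36)
        "event_type": event_type,
        "user_id": int(user_id),
        # TIMESTAMP_NTZ carries no zone, so send UTC wall-clock time without an offset
        "event_timestamp": ts.strftime("%Y-%m-%d %H:%M:%S.%f"),
        "payload": payload,  # nested JSON lands in the VARIANT column
    }


def to_staged_file(events: list[dict[str, Any]]) -> str:
    """Serialize events as a single JSON array, one element per table row."""
    return json.dumps(events)
```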

Kafka Connector Configuration

{
  "name": "snowflake-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.url.name": "<account_identifier>.snowflakecomputing.com:443",
    "snowflake.user.name": "<connector_user>",
    "snowflake.private.key": "${SNOWFLAKE_PRIVATE_KEY}",
    "snowflake.role.name": "streaming_role",
    "snowflake.database.name": "my_database",
    "snowflake.schema.name": "streaming_schema",
    "topics": "events-topic",
    "snowflake.topic2table.map": "events-topic:streaming_events",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
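The Snowflake Kafka connector maps topics to tables through the `snowflake.topic2table.map` property, a comma-separated list of `topic:table` pairs. A hypothetical pre-flight check like `parse_topic2table` below can catch a malformed entry before the configuration is submitted to Kafka Connect. It handles plain topic names only. Tolerating a trailing comma and rejecting duplicate topics are rules of this sketch, not documented connector behavior.

```python
def parse_topic2table(mapping: str) -> dict[str, str]:
    """Parse 'topic1:table1,topic2:table2' into {topic: table}."""
    result: dict[str, str] = {}
    for entry in mapping.split(","):
        entry = entry.strip()
        if not entry:
            continue  # this sketch tolerates a trailing comma
        topic, sep, table = entry.partition(":")
        topic, table = topic.strip(), table.strip()
        if not sep or not topic or not table or ":" in table:
            raise ValueError(f"malformed topic2table entry: {entry!r}")
        if topic in result:
            raise ValueError(f"topic mapped twice: {topic!r}")  # sketch's own rule
        result[topic] = table
    return result
```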

Performance Tuning

Figures: streaming throughput and end-to-end latency.

Channel Management

-- List the Snowpipe Streaming channels in the schema
SHOW CHANNELS IN SCHEMA my_database.streaming_schema;

-- Inspect each channel's target table and last committed offset token
-- (SHOW output columns are lower-case, so they must be double-quoted)
SELECT
  "name",
  "table_name",
  "offset_token"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));

Batch vs Streaming Comparison

Feature | Batch (Snowpipe) | Streaming | Hybrid
--- | --- | --- | ---
Latency | 1–5 minutes | <1 second | 1–60 seconds
Throughput | Very high | High | Medium-high
Cost | Lower | Higher | Medium
Complexity | Low | High | Medium
Use case | Historical loads | Real-time | Operational

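For streaming workloads, monitor channel health regularly. A channel that stops committing rows delays data in the target table, for example when its client has stalled or the channel was invalidated after an error. Set up alerts that fire when channel lag exceeds your SLA threshold.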
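Channel lag can be expressed as the gap between the newest offset available at the source and the offset a channel last committed. With the Kafka connector, the committed offset token corresponds to a Kafka offset. Below is a minimal sketch under that assumption, with hypothetical inputs: plain dictionaries keyed by channel name. Lag is counted in messages, so turning an SLA in seconds into `max_lag` depends on your traffic rate and is left to the caller.

```python
def lagging_channels(committed: dict[str, int], latest: dict[str, int],
                     max_lag: int) -> dict[str, int]:
    """Return {channel: lag_in_messages} for channels lagging by more than max_lag.

    committed: last committed offset per channel (hypothetical input)
    latest:    newest offset available at the source for each channel
    """
    alerts: dict[str, int] = {}
    for channel, newest in latest.items():
        last_done = committed.get(channel, -1)  # -1: nothing committed yet (offsets start at 0)
        lag = newest - last_done
        if lag > max_lag:
            alerts[channel] = lag
    return alerts
```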

Exactly-Once Delivery

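-- Idempotent upsert pattern: replaying the same staged rows leaves target_table unchanged
MERGE INTO target_table t
USING (
  -- Keep only the newest staged row per event_id. Duplicate source rows would
  -- otherwise fail the MERGE as nondeterministic or insert the same event twice.
  SELECT *
  FROM staging_table
  QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY ingestion_time DESC) = 1
) s
ON t.event_id = s.event_id
-- Update only when the staged row is newer, so stale replays are ignored
WHEN MATCHED AND s.ingestion_time > t.ingestion_time THEN
  UPDATE SET
    t.payload = s.payload,
    t.ingestion_time = s.ingestion_time
WHEN NOT MATCHED THEN
  INSERT (event_id, event_type, user_id, event_timestamp, payload, ingestion_time)
  VALUES (s.event_id, s.event_type, s.user_id, s.event_timestamp, s.payload, s.ingestion_time);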
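The idempotency argument can be checked with an in-memory Python model of the upsert. Here `target` is a hypothetical dictionary keyed by `event_id`, `staging` is a list of row dictionaries, and `ingestion_time` is any comparable value. Staged rows are first reduced to the newest per `event_id`, which keeps a duplicate inside one batch from being inserted twice. A matched row is updated only when the staged copy is newer.

```python
from typing import Any

Row = dict[str, Any]


def idempotent_merge(target: dict[str, Row], staging: list[Row]) -> dict[str, Row]:
    """Return a new target state after upserting staging rows by event_id."""
    # Deduplicate the batch: newest staged row per event_id wins.
    newest: dict[str, Row] = {}
    for row in staging:
        seen = newest.get(row["event_id"])
        if seen is None or row["ingestion_time"] > seen["ingestion_time"]:
            newest[row["event_id"]] = row

    merged = {key: dict(value) for key, value in target.items()}  # don't mutate the input
    for event_id, row in newest.items():
        current = merged.get(event_id)
        if current is None:
            merged[event_id] = dict(row)                 # not matched: insert
        elif row["ingestion_time"] > current["ingestion_time"]:
            current["payload"] = row["payload"]          # matched and newer: update
            current["ingestion_time"] = row["ingestion_time"]
    return merged
```

Applying the same batch twice yields the same state, and an older replayed row never overwrites a newer one. Together these make at-least-once redelivery safe.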

Monitoring Dashboard

-- Real-time ingestion metrics for the last hour, one row per minute
SELECT
  DATE_TRUNC('minute', ingestion_time) as minute,
  COUNT(*) as rows_ingested,
  COUNT(DISTINCT event_type) as event_types,
  -- end-to-end latency: event creation to row ingestion
  AVG(DATEDIFF(millisecond, event_timestamp, ingestion_time)) as avg_latency_ms
FROM streaming_events
WHERE ingestion_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1 DESC;

Key Takeaways

  • Snowpipe Streaming enables sub-second latency ingestion
  • Kafka connector provides production-grade streaming integration
  • Channel monitoring is critical for streaming health
  • Exactly-once semantics require idempotent patterns
  • Use hybrid approaches for balanced latency and throughput