
Snowflake Real-Time Data Ingestion


Snowflake provides several mechanisms for real-time data ingestion. Depending on the mechanism, new data becomes available for analytics and operations within seconds to minutes of its arrival.

Figure: Real-Time Ingestion Architecture. Producers (apps, IoT, logs) send data through Kafka event streams, Snowpipe auto-ingest, or the Snowpipe Streaming API. Streams track changes through METADATA$ columns, and Tasks process them for real-time analytics dashboards. Latency: Snowpipe Streaming < 1 s, Snowpipe < 1 min, batch > 5 min.

Figure: Latency Tiers for Real-Time Ingestion. Sub-second (< 1 s): Snowpipe Streaming API, for real-time dashboards. Near real-time (1-60 s): Snowpipe auto-ingest and file-based micro-batches, for operational analytics. Batch (> 5 min): COPY INTO and scheduled bulk loads, for historical backfill.

Snowpipe for Continuous Ingestion

Setting Up Snowpipe

-- Create file format for streaming
CREATE OR REPLACE FILE FORMAT json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE;

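-- Create an external stage. AUTO_INGEST relies on cloud storage event notifications,
-- so it works with external stages (S3, GCS, Azure), not internal stages.
-- The bucket URL and storage integration name are placeholders.
CREATE OR REPLACE STAGE streaming_stage
  URL = 's3://my-bucket/events/'
  STORAGE_INTEGRATION = my_s3_integration
  FILE_FORMAT = (FORMAT_NAME = 'json_format');

-- Assumed target table: one VARIANT column holds each JSON record
CREATE TABLE IF NOT EXISTS raw_events (
  payload VARIANT
);

-- Create Snowpipe
CREATE OR REPLACE PIPE my_pipe
  AUTO_INGEST = TRUE
  COMMENT = 'Real-time data ingestion pipe'
AS
  COPY INTO raw_events
  FROM @streaming_stage
  FILE_FORMAT = (FORMAT_NAME = 'json_format')
  ON_ERROR = CONTINUE;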

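-- Get the SQS queue ARN for S3 event notifications (notification_channel column)
SHOW PIPES LIKE 'my_pipe';

-- Check pipe status: returns JSON including executionState and pendingFileCount
SELECT SYSTEM$PIPE_STATUS('my_pipe');

-- Credits and data volume used by the pipe over the last 24 hours
SELECT
  pipe_name,
  start_time,
  end_time,
  credits_used,
  bytes_inserted,
  files_inserted
FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
  DATE_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
  PIPE_NAME => 'MY_PIPE'
));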
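The toy model below (plain Python, not Snowflake code) illustrates two behaviors of the pipe defined above. STRIP_OUTER_ARRAY = TRUE turns a file holding one top-level JSON array into one row per element, and Snowpipe's per-pipe load metadata (retained for 14 days) prevents a file from being loaded twice when its notification is repeated. The class PipeModel, its method, and the simplification that files are identified by path alone are hypothetical, chosen for illustration.

```python
import json
from dataclasses import dataclass, field


@dataclass
class PipeModel:
    """Toy model of an auto-ingest pipe; NOT Snowflake's implementation."""
    loaded_paths: set = field(default_factory=set)  # stands in for the pipe's load metadata
    table: list = field(default_factory=list)       # stands in for raw_events

    def on_notification(self, path: str, contents: str) -> int:
        """Handle one 'file arrived' event and return the number of rows loaded."""
        if path in self.loaded_paths:
            return 0  # file already loaded: a repeated notification is a no-op
        doc = json.loads(contents)
        # STRIP_OUTER_ARRAY = TRUE: a top-level array becomes one row per element
        rows = doc if isinstance(doc, list) else [doc]
        self.table.extend(rows)
        self.loaded_paths.add(path)
        return len(rows)


pipe = PipeModel()
print(pipe.on_notification("events/a.json", '[{"id": 1}, {"id": 2}]'))  # 2
print(pipe.on_notification("events/a.json", '[{"id": 1}, {"id": 2}]'))  # 0 (duplicate)
print(pipe.on_notification("events/b.json", '{"id": 3}'))               # 1
```

This deduplication is what makes redelivered cloud notifications safe: the second event for events/a.json adds nothing to the table.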

Snowpipe Performance Tuning

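-- Per-file load results for the pipe's target table over the last hour
SELECT
  pipe_name,
  file_name,
  file_size,
  row_count,
  status,
  first_error_message,
  pipe_received_time,
  last_load_time,
  DATEDIFF('second', pipe_received_time, last_load_time) AS ingest_latency_s
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'RAW_EVENTS',
  START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
WHERE pipe_name = 'MY_PIPE';
-- Files still waiting to load are reported as pendingFileCount by SYSTEM$PIPE_STATUS

-- Snowpipe has few tunable settings; most gains come from staging files of roughly
-- 100-250 MB (compressed). REFRESH queues files staged in the last 7 days that
-- event notifications missed:
ALTER PIPE my_pipe REFRESH;

-- Pause or resume a pipe, e.g. during maintenance
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = FALSE;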
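INFORMATION_SCHEMA.COPY_HISTORY reports, per file, when the pipe received the notification (PIPE_RECEIVED_TIME) and when the load finished (LAST_LOAD_TIME); the gap between the two is the ingestion latency worth tracking. A minimal Python sketch that summarizes such rows, assuming they have already been fetched as (received, loaded) datetime pairs. The function names and the nearest-rank percentile method are choices made here, not something Snowflake prescribes.

```python
import math
from datetime import datetime, timedelta


def latency_seconds(rows):
    """rows: iterable of (pipe_received_time, last_load_time) datetime pairs.
    Returns the per-file ingestion latencies in seconds, sorted ascending."""
    return sorted((loaded - received).total_seconds() for received, loaded in rows)


def percentile(sorted_values, pct):
    """Nearest-rank percentile of a non-empty, ascending list."""
    rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 12, 0, 0)
    # Five hypothetical files that took 20 s to 95 s from notification to load
    rows = [(t0, t0 + timedelta(seconds=s)) for s in (20, 35, 41, 58, 95)]
    lat = latency_seconds(rows)
    print("p50:", percentile(lat, 50))  # 41.0
    print("p95:", percentile(lat, 95))  # 95.0
```

Watching a high percentile rather than the average makes occasional slow files (often oversized or undersized ones) visible.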

Apache Kafka Integration

Kafka Connector Setup

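# The Snowflake Kafka connector runs in Kafka Connect, not inside Snowflake, and is
# configured with connector properties rather than SQL. Values in <> and the
# database, schema, user, and role names are placeholders.
# Broker TLS, consumer group, and offset-reset behavior are Kafka Connect
# worker/consumer settings, not Snowflake settings.
name=snowflake_events_sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=events_topic
snowflake.topic2table.map=events_topic:kafka_events
snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
snowflake.user.name=kafka_connector_user
snowflake.private.key=<private_key>
snowflake.role.name=kafka_connector_role
snowflake.database.name=my_db
snowflake.schema.name=my_schema
# Write rows through Snowpipe Streaming instead of staging files
snowflake.ingestion.method=SNOWPIPE_STREAMING
# Map top-level JSON fields to table columns
snowflake.enable.schematization=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Flush buffered records at least every 10 seconds
buffer.flush.time=10

-- Create target table; schema evolution lets the connector add columns for new JSON fields
CREATE TABLE kafka_events (
  event_id STRING,
  event_type STRING,
  event_time TIMESTAMP_NTZ,
  payload VARIANT
)
ENABLE_SCHEMA_EVOLUTION = TRUE;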
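The Snowflake Kafka connector does not write each record individually: it buffers records and flushes when a record-count, byte-size, or time threshold is reached (the buffer.count.records, buffer.size.bytes, and buffer.flush.time properties). The class below is a toy Python model of that rule, useful for reasoning about the latency/throughput trade-off; it is not the connector's code, and the class name and thresholds are hypothetical.

```python
import time


class FlushBuffer:
    """Toy model of threshold-based flushing; not the Snowflake connector's implementation."""

    def __init__(self, max_records, max_bytes, max_age_s, clock=time.monotonic):
        self.max_records = max_records  # analogous to buffer.count.records
        self.max_bytes = max_bytes      # analogous to buffer.size.bytes
        self.max_age_s = max_age_s      # analogous to buffer.flush.time
        self.clock = clock
        self.records, self.size, self.opened_at = [], 0, None
        self.flushed_batches = []

    def add(self, record: bytes):
        if not self.records:
            self.opened_at = self.clock()  # age is measured from the first buffered record
        self.records.append(record)
        self.size += len(record)
        if self._should_flush():
            self.flush()

    def tick(self):
        """Called periodically so that a quiet topic still flushes on time."""
        if self.records and self._should_flush():
            self.flush()

    def _should_flush(self):
        return (len(self.records) >= self.max_records
                or self.size >= self.max_bytes
                or self.clock() - self.opened_at >= self.max_age_s)

    def flush(self):
        if self.records:
            self.flushed_batches.append(self.records)
        self.records, self.size, self.opened_at = [], 0, None
```

Raising the thresholds produces fewer, larger writes (more throughput); lowering buffer.flush.time bounds how long a record on a quiet topic can wait (less latency).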

Kafka Data Processing

-- The connector only inserts into Kafka tables, so an append-only stream is enough (and cheaper)
CREATE STREAM kafka_event_stream
  ON TABLE kafka_events
  APPEND_ONLY = TRUE;

-- Process Kafka events every minute; WHEN skips runs while the stream is empty.
-- Both statements read the stream inside one explicit transaction, so they see the
-- same rows; the stream offset advances only when the transaction commits.
CREATE OR REPLACE TASK process_kafka_events
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON * * * * * America/New_York'
  WHEN SYSTEM$STREAM_HAS_DATA('kafka_event_stream')
AS
  EXECUTE IMMEDIATE $$
  BEGIN
    BEGIN TRANSACTION;

    -- Process new events
    INSERT INTO processed_events (
      event_id,
      event_type,
      event_time,
      processed_at
    )
    SELECT
      event_id,
      event_type,
      event_time,
      CURRENT_TIMESTAMP()
    FROM kafka_event_stream
    WHERE METADATA$ACTION = 'INSERT';

    -- Update aggregations
    MERGE INTO hourly_metrics AS target
    USING (
      SELECT
        DATE_TRUNC('hour', event_time) AS event_hour,
        event_type,
        COUNT(*) AS event_count
      FROM kafka_event_stream
      WHERE METADATA$ACTION = 'INSERT'
      GROUP BY 1, 2
    ) AS source
    ON target.event_hour = source.event_hour
      AND target.event_type = source.event_type
    WHEN MATCHED THEN
      UPDATE SET event_count = target.event_count + source.event_count
    WHEN NOT MATCHED THEN
      INSERT (event_hour, event_type, event_count)
      VALUES (source.event_hour, source.event_type, source.event_count);

    COMMIT;
  END;
  $$;

-- Tasks are created suspended
ALTER TASK process_kafka_events RESUME;
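The MERGE into hourly_metrics is an incremental aggregation: each run folds only the newly arrived stream rows into running totals. The plain-Python equivalent below makes the invariant visible. Each batch must be folded in exactly once, which is why it matters that a stream's offset advances only when the statements reading it commit. The function names and the (event_type, event_time) input shape are assumptions for illustration.

```python
from collections import Counter
from datetime import datetime


def hour_bucket(ts: datetime) -> datetime:
    """Python equivalent of DATE_TRUNC('hour', ts)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def merge_hourly(metrics: dict, new_events) -> dict:
    """Fold one batch of (event_type, event_time) pairs into running hourly counts.

    metrics maps (event_hour, event_type) -> event_count, like hourly_metrics.
    Existing keys are incremented (WHEN MATCHED ... UPDATE) and new keys are
    added (WHEN NOT MATCHED ... INSERT).
    """
    # Equivalent of the GROUP BY 1, 2 subquery over the stream
    batch = Counter((hour_bucket(t), etype) for etype, t in new_events)
    for key, count in batch.items():
        metrics[key] = metrics.get(key, 0) + count
    return metrics
```

Folding the same batch twice would double its counts, so replaying stream rows is not harmless for this kind of aggregate.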

Streams for Real-Time Tracking

Creating Streams

-- Create stream on source table
CREATE STREAM order_changes
  ON TABLE orders
  APPEND_ONLY = FALSE
  SHOW_INITIAL_ROWS = FALSE;

-- Streams always track every column of the source table. To track only
-- inserted rows, create an append-only stream instead:
CREATE STREAM customer_inserts
  ON TABLE customers
  APPEND_ONLY = TRUE;

-- Query stream data (a plain SELECT does not consume the stream or advance its offset)
SELECT
  *,
  METADATA$ACTION as change_action,
  METADATA$ISUPDATE as is_update,
  METADATA$ROW_ID as row_id
FROM order_changes
WHERE METADATA$ACTION = 'INSERT';
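A standard stream returns the net change between its offset and the table's current state. An UPDATE to a row therefore appears as two rows, a DELETE carrying the old values and an INSERT carrying the new ones, both with METADATA$ISUPDATE = TRUE; METADATA$ACTION is never 'UPDATE'. The Python class below is a toy model of those semantics. It diffs snapshots keyed by a row id, whereas Snowflake reads change-tracking metadata, and the class name is hypothetical.

```python
import copy


class StandardStreamModel:
    """Toy model of a standard (delta) stream; not Snowflake's implementation."""

    def __init__(self, table: dict):
        self.table = table                  # row_id -> row dict (the live source table)
        self.offset = copy.deepcopy(table)  # table state as of the stream's offset

    def changes(self):
        """Net changes since the offset, like SELECT * FROM <stream>; the offset does not move."""
        out = []
        for rid in sorted(self.offset.keys() | self.table.keys()):
            old, new = self.offset.get(rid), self.table.get(rid)
            if old == new:
                continue  # unchanged, or inserted and deleted again since the offset
            is_update = old is not None and new is not None
            if old is not None:
                out.append({**old, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": is_update})
            if new is not None:
                out.append({**new, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": is_update})
        return out

    def consume(self):
        """A committed DML statement that reads the stream advances the offset."""
        rows = self.changes()
        self.offset = copy.deepcopy(self.table)
        return rows
```

Note that a row inserted and then deleted between two reads never shows up at all: the stream reports net changes, not every intermediate operation.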

Processing Stream Data

-- Process stream with task. A single MERGE consumes the stream once, so every change
-- is applied in one statement. In a stream, an UPDATE appears as a DELETE + INSERT pair
-- with METADATA$ISUPDATE = TRUE (METADATA$ACTION is never 'UPDATE'), so the DELETE half
-- of each pair is filtered out; each order_id then matches at most one source row.
CREATE OR REPLACE TASK sync_orders
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON * * * * * America/New_York'
  WHEN SYSTEM$STREAM_HAS_DATA('order_changes')
AS
  MERGE INTO orders_archive AS target
  USING (
    SELECT
      order_id,
      customer_id,
      order_date,
      amount,
      METADATA$ACTION AS action
    FROM order_changes
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
  ) AS source
  ON target.order_id = source.order_id
  -- New values of updated rows
  WHEN MATCHED AND source.action = 'INSERT' THEN
    UPDATE SET
      customer_id = source.customer_id,
      order_date = source.order_date,
      amount = source.amount,
      updated_at = CURRENT_TIMESTAMP()
  -- Real deletes become soft deletes
  WHEN MATCHED AND source.action = 'DELETE' THEN
    UPDATE SET deleted_at = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED AND source.action = 'INSERT' THEN
    INSERT (order_id, customer_id, order_date, amount, created_at)
    VALUES (source.order_id, source.customer_id, source.order_date, source.amount, CURRENT_TIMESTAMP());

-- Tasks are created suspended
ALTER TASK sync_orders RESUME;
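The same soft-delete logic in plain Python, as a sketch that assumes stream rows arrive as dicts carrying METADATA$ACTION and METADATA$ISUPDATE keys and that the archive is keyed by order_id (the function name is hypothetical). It shows why the DELETE half of an update pair must be skipped: treating every DELETE row as a real delete would stamp deleted_at on rows that were only updated.

```python
def apply_changes(archive: dict, changes, now):
    """Apply stream rows to an archive keyed by order_id, using soft deletes.

    The DELETE half of each update pair is skipped, so every order_id yields
    at most one action, which mirrors what keeps the MERGE deterministic.
    """
    for row in changes:
        action, is_update = row["METADATA$ACTION"], row["METADATA$ISUPDATE"]
        if action == "DELETE" and is_update:
            continue  # old image of an updated row; the INSERT half carries the new values
        data = {k: v for k, v in row.items() if not k.startswith("METADATA$")}
        key = data["order_id"]
        if action == "INSERT":
            if key in archive:
                archive[key].update(data, updated_at=now)  # WHEN MATCHED ... UPDATE
            else:
                archive[key] = {**data, "created_at": now}  # WHEN NOT MATCHED ... INSERT
        elif action == "DELETE" and key in archive:
            archive[key]["deleted_at"] = now  # soft delete: keep the row, stamp it
    return archive
```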

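Streams do not copy data. A stream stores an offset into the source table's version history, and the change rows are computed from change-tracking metadata when the stream is queried, which keeps streams cheap to maintain. The offset advances only when a DML statement that reads the stream commits. If a stream is not consumed within the table's data retention period (extended for unconsumed streams up to MAX_DATA_EXTENSION_TIME_IN_DAYS, 14 days by default), it becomes stale and must be recreated.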
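A small Python sketch of the staleness rule, useful for alerting before a rarely consumed stream goes stale. It assumes the documented behavior that Snowflake extends a shorter retention period for unconsumed streams up to MAX_DATA_EXTENSION_TIME_IN_DAYS (14 days by default), so the effective window is the larger of the two settings. The function names and the two-day warning margin are hypothetical; the STALE_AFTER column of SHOW STREAMS remains the authoritative value.

```python
from datetime import datetime, timedelta


def stale_after(offset_time: datetime, retention_days: int,
                max_extension_days: int = 14) -> datetime:
    """Approximate moment an unconsumed stream becomes stale."""
    return offset_time + timedelta(days=max(retention_days, max_extension_days))


def needs_attention(now: datetime, offset_time: datetime, retention_days: int,
                    max_extension_days: int = 14,
                    margin: timedelta = timedelta(days=2)) -> bool:
    """True once the stream is within `margin` of going stale (margin is an arbitrary choice)."""
    return now >= stale_after(offset_time, retention_days, max_extension_days) - margin
```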

Real-Time Analytics

Real-Time Dashboards

-- Create real-time metrics view
CREATE OR REPLACE SECURE VIEW realtime_dashboard AS
SELECT
  DATE_TRUNC('minute', event_time) as event_minute,
  event_type,
  COUNT(*) as event_count,
  COUNT(DISTINCT user_id) as unique_users
FROM events
WHERE event_time >= DATEADD('hour', -1, CURRENT_TIMESTAMP())
GROUP BY 1, 2;

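-- Create real-time aggregation task. Each run counts events whose event_time falls in
-- the preceding minute, so an event that arrives after its minute has been counted
-- (ingestion delay) is never included in realtime_metrics.
CREATE OR REPLACE TASK realtime_aggregation
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON * * * * * America/New_York'
AS
  INSERT INTO realtime_metrics (
    metric_time,
    metric_name,
    metric_value
  )
  SELECT
    CURRENT_TIMESTAMP() as metric_time,
    'events_per_minute' as metric_name,
    COUNT(*) as metric_value
  FROM events
  WHERE event_time >= DATEADD('minute', -1, CURRENT_TIMESTAMP());

-- Tasks are created suspended
ALTER TASK realtime_aggregation RESUME;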
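The realtime_dashboard view recomputes per-minute counts from the events table every time it is queried, so late-arriving rows are picked up on the next read, whereas the aggregation task counts each minute only once. A pure-Python model of the view's logic, assuming events are dicts with event_time, event_type, and user_id keys (a hypothetical shape chosen to match the view's columns):

```python
from collections import defaultdict
from datetime import datetime, timedelta


def dashboard(events, now: datetime):
    """Return {(event_minute, event_type): (event_count, unique_users)} for the last hour."""
    cutoff = now - timedelta(hours=1)
    counts = defaultdict(int)
    users = defaultdict(set)
    for e in events:
        if e["event_time"] < cutoff:
            continue  # WHERE event_time >= DATEADD('hour', -1, CURRENT_TIMESTAMP())
        minute = e["event_time"].replace(second=0, microsecond=0)  # DATE_TRUNC('minute', ...)
        key = (minute, e["event_type"])
        counts[key] += 1                # COUNT(*)
        users[key].add(e["user_id"])    # COUNT(DISTINCT user_id)
    return {k: (counts[k], len(users[k])) for k in counts}
```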

Window-Based Aggregations

-- Create sliding window aggregation
CREATE OR REPLACE VIEW sliding_window_metrics AS
SELECT
  event_time,
  event_type,
  COUNT(*) OVER (
    PARTITION BY event_type
    ORDER BY event_time
    RANGE BETWEEN INTERVAL '5 minutes' PRECEDING AND CURRENT ROW
  ) as events_last_5min,
  AVG(amount) OVER (
    PARTITION BY event_type
    ORDER BY event_time
    RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
  ) as avg_amount_last_hour
FROM events;

-- Create tumbling window aggregation
SELECT
  DATE_TRUNC('hour', event_time) as window_start,
  DATEADD('hour', 1, DATE_TRUNC('hour', event_time)) as window_end,
  event_type,
  COUNT(*) as event_count,
  SUM(amount) as total_amount
FROM events
GROUP BY 1, 2, 3;
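To see how the two window styles differ, here is a plain-Python sketch of both (function names hypothetical; events are assumed to be (event_type, event_time) pairs). A RANGE frame is value-based, so the frame "5 minutes PRECEDING AND CURRENT ROW" includes every row of the same event_type whose time lies in [t - 5 min, t], including rows with exactly the same timestamp (peers). Tumbling windows instead assign each event to exactly one fixed, non-overlapping bucket.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict
from datetime import datetime, timedelta


def sliding_counts(events, width=timedelta(minutes=5)):
    """Per event: COUNT(*) OVER (PARTITION BY event_type ORDER BY event_time
    RANGE BETWEEN <width> PRECEDING AND CURRENT ROW)."""
    by_type = defaultdict(list)
    for etype, t in events:
        by_type[etype].append(t)
    for times in by_type.values():
        times.sort()
    out = []
    for etype, t in events:
        times = by_type[etype]
        lo = bisect_left(times, t - width)  # first row with time >= t - width
        hi = bisect_right(times, t)         # one past the last peer of the current row
        out.append(hi - lo)
    return out


def tumbling_counts(events, width=timedelta(hours=1)):
    """Count events per (window_start, window_end, event_type) for windows [start, start + width)."""
    epoch = datetime(1970, 1, 1)
    counts = defaultdict(int)
    for etype, t in events:
        start = epoch + ((t - epoch) // width) * width  # equals DATE_TRUNC('hour') for 1-hour windows
        counts[(start, start + width, etype)] += 1
    return dict(counts)
```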

Performance Optimization

-- Monitor real-time pipeline performance (EXECUTION_TIME is in milliseconds)
SELECT
  query_id,
  query_text,
  execution_time,
  rows_produced,
  bytes_scanned
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(
  END_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
  RESULT_LIMIT => 1000
))
WHERE query_text ILIKE '%stream%' OR query_text ILIKE '%task%'
ORDER BY execution_time DESC;

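-- Size a multi-cluster warehouse for streaming (multi-cluster requires Enterprise Edition).
-- STANDARD starts extra clusters as soon as queries queue; ECONOMY favors cost over latency.
ALTER WAREHOUSE streaming_wh SET
  MIN_CLUSTER_COUNT = 1,
  MAX_CLUSTER_COUNT = 4,
  SCALING_POLICY = 'STANDARD',
  AUTO_SUSPEND = 60;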

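-- Monitor stream lag. SHOW STREAMS reports the STALE and STALE_AFTER columns per stream
SHOW STREAMS LIKE 'order_changes';

-- Backlog: change rows waiting in the stream (a plain SELECT does not advance the offset)
SELECT COUNT(*) AS pending_changes FROM order_changes;

-- Task runs over the last hour, most recent first
SELECT
  name,
  state,
  scheduled_time,
  completed_time,
  error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;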
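INFORMATION_SCHEMA.TASK_HISTORY returns one row per task run with NAME, STATE, and COMPLETED_TIME, which is enough to flag a pipeline that has stopped keeping up. A minimal Python sketch over rows already fetched as (name, state, completed_time) tuples; the function name, the five-minute threshold, and which states count as healthy are assumptions (for tasks guarded by a WHEN condition you may want to treat skipped runs as healthy too).

```python
from datetime import datetime, timedelta


def unhealthy_tasks(history, now, max_gap=timedelta(minutes=5), ok_states=("SUCCEEDED",)):
    """Return task names whose last healthy run is older than max_gap.

    A task with no healthy run in the fetched window is also reported.
    """
    last_ok = {}
    names = set()
    for name, state, completed in history:
        names.add(name)
        if state in ok_states and completed is not None:
            last_ok[name] = max(completed, last_ok.get(name, completed))
    return sorted(n for n in names if n not in last_ok or now - last_ok[n] > max_gap)
```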

Real-Time Ingestion Best Practices

Pattern            | Use case             | Latency  | Throughput
Snowpipe           | File-based ingestion | < 1 min  | High
Kafka Connector    | Event streaming      | < 5 sec  | Very High
Streams + Tasks    | CDC processing       | < 1 min  | Medium
External Functions | API integration      | < 10 sec | Low

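For the lowest latency, use Snowpipe Streaming, either through its client SDK or through the Kafka connector with snowflake.ingestion.method=SNOWPIPE_STREAMING; it writes rows directly into tables without staging files. External functions call remote services from SQL and are not an ingestion mechanism. For most real-time analytics use cases, Snowpipe and Streams provide sufficient performance.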

Summary

Key Takeaways

Snowpipe provides continuous, automated file-based ingestion, typically loading new files within about a minute of their notification.

Kafka integration enables event-driven data streaming at very high throughput.

Streams track changes at the micro-partition level for efficient CDC processing.

Window functions enable real-time analytics without materializing intermediate results.


Implementation Steps

  1. Set up Snowpipe for automated file ingestion
  2. Configure Kafka connector for event streaming
  3. Create Streams on source tables for CDC
  4. Build Tasks to process stream data
  5. Implement window functions for real-time aggregations
  6. Monitor stream lag and pipeline performance