
Real-Time Analytics: Streaming Data Infrastructure

Module 4: Advanced DE & Career / Advanced Data Engineering


Real-Time Analytics: Processing Data as It Arrives

Real-time analytics processes data continuously as it arrives, enabling sub-second to minute-level insights.

Why Real-Time Analytics Matters


Business Use Cases:

  • Fraud detection
  • Dynamic pricing
  • Recommendation engines
  • Operational monitoring

Batch vs Streaming:

  • Batch: insights wait for the next (often nightly) run
  • Streaming: act on data within seconds of its creation

Key Insight: Moving from batch-oriented to streaming-first architectures shrinks the delay between an event and the action it triggers from hours to seconds.


Architecture Overview

Real-Time Analytics Pipeline (architecture diagram):

  • Sources: IoT sensors, app events, CDC streams, API calls (100M events/day)
  • Ingestion: Apache Kafka with Schema Registry; partitioned topics, 3-day retention, 100K msgs/sec
  • Processing: Flink / Spark; windowing, exactly-once, state management, < 1 s latency
  • Storage: Delta Lake on S3; partitioned by date, ACID transactions, time travel, 500 TB total
  • Serving: dashboards, feature store, alerts, API (real-time + batch)
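The volume figures in the diagram can be sanity-checked with quick arithmetic. The sketch below takes events/day, capacity, and retention from the diagram, but the average event size (1 KB) and Kafka replication factor (3) are assumptions that do not appear in the diagram, so substitute your own values.

```python
SECONDS_PER_DAY = 86_400


def pipeline_sizing(events_per_day: int, peak_capacity_per_sec: int,
                    retention_days: int, avg_event_bytes: int,
                    replication_factor: int) -> dict:
    """Back-of-envelope sizing for the ingestion layer (inputs are assumptions)."""
    avg_rate = events_per_day / SECONDS_PER_DAY
    retained_events = events_per_day * retention_days
    # Raw bytes kept on the brokers, counting every replica
    retained_bytes = retained_events * avg_event_bytes * replication_factor
    return {
        "avg_events_per_sec": avg_rate,
        "headroom_factor": peak_capacity_per_sec / avg_rate,
        "retained_events": retained_events,
        "broker_storage_gb": retained_bytes / 1e9,
    }


sizing = pipeline_sizing(
    events_per_day=100_000_000,     # from the diagram
    peak_capacity_per_sec=100_000,  # from the diagram
    retention_days=3,               # from the diagram
    avg_event_bytes=1_000,          # assumption
    replication_factor=3,           # assumption
)
for name, value in sizing.items():
    print(f"{name}: {value:,.1f}")
```

At roughly 1,157 events/s on average, the 100K msgs/sec capacity leaves about 86x headroom for bursts; with the assumed sizes, 3-day retention needs about 900 GB of broker disk.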

Stream Processing Fundamentals

Stream processing continuously processes data records as they arrive, applying transformations, aggregations, and computations in real time. Unlike batch processing, it operates on unbounded data streams with low latency.

Streaming Latency Model

  • End-to-End Latency: L = L_ingest + L_process + L_serve
  • Ingestion Latency: L_ingest = Network_Delay + Broker_Queue_Time
  • Processing Latency: L_process = Computation_Time + Window_Wait_Time
  • Serving Latency: L_serve = Write_Time + Cache_Invalidation
  • Target Latency: L < 1 second (real-time), L < 60 seconds (near-real-time)
  • Throughput: T = Messages_Processed / Time_Unit
  • Backpressure: when events arrive faster than they are processed (arrival rate > processing throughput), buffers grow and backpressure is needed to slow producers or shed load
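The latency model and the backpressure condition can be expressed directly in code. This is a minimal sketch: the component timings and rates are illustrative values, not measurements, and the backlog model assumes constant arrival and processing rates.

```python
from dataclasses import dataclass


@dataclass
class LatencyBudget:
    """Component latencies in seconds, following L = L_ingest + L_process + L_serve."""
    network_delay: float
    broker_queue_time: float
    computation_time: float
    window_wait_time: float
    write_time: float
    cache_invalidation: float

    def ingest(self) -> float:
        return self.network_delay + self.broker_queue_time

    def process(self) -> float:
        return self.computation_time + self.window_wait_time

    def serve(self) -> float:
        return self.write_time + self.cache_invalidation

    def total(self) -> float:
        return self.ingest() + self.process() + self.serve()

    def tier(self) -> str:
        """Classify using the targets above: < 1 s real-time, < 60 s near-real-time."""
        total = self.total()
        if total < 1:
            return "real-time"
        if total < 60:
            return "near-real-time"
        return "batch"


def backlog(arrival_rate: float, processing_rate: float, seconds: int) -> list:
    """Buffered messages after each second when both rates are constant."""
    queued, history = 0.0, []
    for _ in range(seconds):
        queued = max(0.0, queued + arrival_rate - processing_rate)
        history.append(queued)
    return history


budget = LatencyBudget(0.02, 0.05, 0.1, 0.0, 0.05, 0.03)
print(round(budget.total(), 3), budget.tier())  # 0.25 real-time
print(backlog(arrival_rate=1200, processing_rate=1000, seconds=5))
```

When the processing rate stays below the arrival rate, the buffer grows by a fixed amount every second (200 messages here) and never drains, which is the condition that calls for backpressure. Note also that a 5-minute tumbling window alone contributes up to 300 s of window wait, which moves a pipeline out of the real-time tier.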
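# Kafka Producer (kafka-python)
from kafka import KafkaProducer
import json
import uuid
from datetime import datetime, timezone

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',        # wait for all in-sync replicas to acknowledge
    retries=3,
    batch_size=16384,  # max bytes per partition batch
    linger_ms=10       # wait up to 10 ms to fill a batch
)

def publish_event(topic: str, key: str, event: dict):
    """Publish an event to a Kafka topic, keyed for partitioning."""
    event['event_id'] = str(uuid.uuid4())
    event['event_timestamp'] = datetime.now(timezone.utc).isoformat()
    # send() is asynchronous and batches records per batch_size/linger_ms,
    # so calling flush() after every event would defeat batching
    producer.send(topic, key=key, value=event)

# Publish clickstream events, keyed by user so each user's events stay ordered
publish_event('clickstream', key='12345', event={
    'user_id': 12345,
    'page': '/products/shoes',
    'action': 'click',
    'session_id': 'abc-123',
    'device': 'mobile',
    'country': 'US'
})

# Deliver any buffered records before the process exits
producer.flush()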

# Kafka Consumer with processing
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'clickstream',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

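for message in consumer:
    event = message.value
    # Process event
    print(f"Processing: {event['action']} on {event['page']}")

    # Commit only after processing succeeds (at-least-once delivery: a crash
    # before the commit means the message is read again). Committing per
    # message is simple but slow; production consumers usually commit in batches.
    consumer.commit()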
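Committing after processing gives at-least-once delivery: messages processed but not yet committed before a crash are delivered again. A common way to make that harmless is to deduplicate on the event_id the producer attaches. The sketch below is a minimal in-memory illustration; the IdempotentProcessor class is hypothetical, and a production version would keep the seen-ID set in a durable store with a TTL.

```python
class IdempotentProcessor:
    """Hypothetical processor that skips events whose event_id was already applied."""

    def __init__(self):
        self.seen_ids = set()        # in-memory only for this sketch
        self.clicks_per_page = {}

    def handle(self, event: dict) -> bool:
        """Apply the event once; return False if it is a duplicate."""
        event_id = event["event_id"]
        if event_id in self.seen_ids:
            return False
        page = event["page"]
        self.clicks_per_page[page] = self.clicks_per_page.get(page, 0) + 1
        self.seen_ids.add(event_id)
        return True


processor = IdempotentProcessor()
batch = [
    {"event_id": "e1", "page": "/products/shoes"},
    {"event_id": "e2", "page": "/products/shoes"},
]
for event in batch + batch:  # the second pass simulates redelivery after a crash
    processor.handle(event)
print(processor.clicks_per_page)  # {'/products/shoes': 2}
```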

Windowing Strategies

Figure: Windowing types. Tumbling window: fixed, non-overlapping 5-minute windows (W1-W4 covering 0-20 min), e.g. window(event_time, '5 min'). Sliding window: 10-minute windows sliding every minute, e.g. window(e, '10 min', '1 min'). Session window: bursts of activity separated by gaps, closed after a 30-minute inactivity timeout, e.g. session_window(e, '30 min').

Windowing divides an unbounded stream into finite segments for processing. Windows define which events belong to each computation unit.

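| Window Type | Behavior | Use Case | Result Latency |
|---|---|---|---|
| Tumbling | Fixed, non-overlapping | Count per minute | Window size |
| Sliding | Fixed, overlapping | Moving averages | Slide interval (a window closes every slide) |
| Session | Activity-based gaps | User sessions | Session end (last event + gap) |
| Global | All data seen so far | Cumulative aggregates | Never closes; needs a trigger to emit |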
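Tumbling and sliding windows are aligned to a fixed origin, so the windows an event belongs to can be computed from its timestamp alone. This sketch assumes integer epoch-second timestamps, half-open [start, end) windows, and alignment to zero (Spark's window() behaves this way when no start offset is given); the function names are illustrative.

```python
def tumbling_window(ts: int, size: int) -> tuple:
    """Return the single [start, end) tumbling window containing ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> list:
    """Return every [start, end) sliding window containing ts, oldest first."""
    start = ts - (ts % slide)        # latest window start that is <= ts
    windows = []
    while start > ts - size:         # the window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event_ts = 4050  # 01:07:30 expressed in seconds
print(tumbling_window(event_ts, 5 * 60))            # (3900, 4200)
print(len(sliding_windows(event_ts, 10 * 60, 60)))  # 10
```

A 10-minute window sliding every minute places each event in size / slide = 10 windows, which is why sliding windows multiply state and output volume compared with tumbling windows.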
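# Spark Structured Streaming with Windowing
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    approx_count_distinct, avg, col, count, from_json, session_window, window,
    max as spark_max, min as spark_min,
)
from pyspark.sql.types import (
    DoubleType, LongType, StringType, StructField, StructType, TimestampType,
)

spark = SparkSession.builder \
    .appName("RealTimeAnalytics") \
    .getOrCreate()

# Read stream from Kafka (needs the spark-sql-kafka connector on the classpath)
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clickstream") \
    .option("startingOffsets", "earliest") \
    .load()

# Schema of the JSON payload; session_duration is assumed to be sent by producers
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("page", StringType()),
    StructField("action", StringType()),
    StructField("session_id", StringType()),
    StructField("device", StringType()),
    StructField("country", StringType()),
    StructField("session_duration", DoubleType()),
    StructField("event_timestamp", TimestampType()),
])

# Parse JSON events
events = raw_stream.select(
    col("key").cast("string").alias("key"),
    from_json(col("value").cast("string"), schema).alias("data"),
    col("timestamp").alias("kafka_timestamp")
).select("data.*", "kafka_timestamp")

# Tumbling Window: Count events per 5-minute window
tumbling_counts = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("page")
    ) \
    .agg(
        count("*").alias("event_count"),
        # Exact distinct counts are not supported in streaming aggregations
        approx_count_distinct("user_id").alias("unique_users")
    )

# Sliding Window: Average session duration over a 10-minute window, sliding every 1 minute
sliding_avg = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "10 minutes", "1 minute"),
        col("action")
    ) \
    .agg(
        avg("session_duration").alias("avg_session_duration"),
        count("*").alias("event_count")
    )

# Session Window: Group events by user session (30-min inactivity gap; Spark 3.2+)
session_counts = events \
    .withWatermark("event_timestamp", "30 minutes") \
    .groupBy(
        session_window(col("event_timestamp"), "30 minutes"),  # gap duration
        col("user_id")
    ) \
    .agg(
        count("*").alias("session_events"),
        spark_min("event_timestamp").alias("session_start"),
        spark_max("event_timestamp").alias("session_end")
    )

# Write stream to sink. The Delta sink accepts append or complete mode (not
# update); append emits each window once the watermark passes its end.
query = tumbling_counts.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/tumbling/") \
    .start("s3://analytics/realtime/tumbling_counts/")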
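Unlike tumbling and sliding windows, session windows are not aligned to the clock: a session stays open while events keep arriving within the gap and closes once the gap passes with no activity. The following is a minimal batch-style sketch of that rule for one user's timestamps (in seconds). It illustrates gap-based sessionization rather than Spark's implementation, and it assumes a session's end is reported as last event + gap.

```python
def sessionize(timestamps: list, gap: int) -> list:
    """Group timestamps into (start, end) sessions; end = last event + gap.

    Each event opens a tentative window [ts, ts + gap); overlapping
    windows merge, so a new session starts when ts >= current end.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session's gap: extend the session
            sessions[-1][1] = ts + gap
        else:
            sessions.append([ts, ts + gap])
    return [tuple(s) for s in sessions]


print(sessionize([0, 600, 1500, 5000, 5100], gap=1800))  # [(0, 3300), (5000, 6900)]
```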

Flink Stream Processing

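Apache Flink is a distributed stream processing framework designed for stateful computations over unbounded data streams. It provides event-time semantics, fault tolerance through periodic checkpoints of operator state, and exactly-once state consistency; end-to-end exactly-once delivery additionally requires a transactional or idempotent sink.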
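Checkpoint-based fault tolerance works by snapshotting operator state together with the source offset that state reflects; after a failure, the job restores both and replays from the saved offset, so each record affects the state exactly once. The sketch below models that idea in plain Python with a hypothetical CheckpointedCounter class. It is not PyFlink's API, only the mechanism in miniature.

```python
import copy


class CheckpointedCounter:
    """Hypothetical operator: counts page views and checkpoints state + offset together."""

    def __init__(self):
        self.offset = 0              # next position to read in the log
        self.counts = {}
        self._checkpoint = (0, {})

    def process(self, log: list, upto: int) -> None:
        while self.offset < upto:
            page = log[self.offset]
            self.counts[page] = self.counts.get(page, 0) + 1
            self.offset += 1

    def checkpoint(self) -> None:
        # State and offset are captured as one snapshot, never separately
        self._checkpoint = (self.offset, copy.deepcopy(self.counts))

    def restore(self) -> None:
        offset, counts = self._checkpoint
        self.offset, self.counts = offset, copy.deepcopy(counts)


log = ["/home", "/shoes", "/home", "/cart", "/home"]
job = CheckpointedCounter()
job.process(log, upto=2)
job.checkpoint()                  # snapshot after 2 records
job.process(log, upto=4)          # progress lost in the crash below
job.restore()                     # crash: roll back to the last checkpoint
job.process(log, upto=len(log))   # replay from offset 2
print(job.counts)  # {'/home': 3, '/shoes': 1, '/cart': 1}
```

If the offset were not restored together with the state, the replay would count /home and /cart a second time; that coupling is what the exactly-once state guarantee rests on.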

# Flink Stream Processing Example (PyFlink)
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create Flink table environment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Define source table (Kafka)
t_env.execute_sql("""
    CREATE TABLE clickstream (
        user_id BIGINT,
        page STRING,
        action STRING,
        event_timestamp TIMESTAMP(3),
        session_id STRING,
        WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'clickstream',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Define sink table (Delta Lake). Assumes the Delta Lake Flink connector,
# which registers as 'delta' and expects its tables in a Delta catalog.
t_env.execute_sql("""
    CREATE TABLE page_metrics (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        page STRING,
        view_count BIGINT,
        unique_users BIGINT
    ) WITH (
        'connector' = 'delta',
        'table-path' = 's3://analytics/realtime/page_metrics/'
    )
""")

# Tumbling window aggregation
t_env.execute_sql("""
    INSERT INTO page_metrics
    SELECT
        window_start,
        window_end,
        page,
        COUNT(*) AS view_count,
        COUNT(DISTINCT user_id) AS unique_users
    FROM TABLE(
        TUMBLE(TABLE clickstream, DESCRIPTOR(event_timestamp), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, page
""")

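# Sliding (HOP) windows, then a 3-window moving average per action.
# A streaming OVER clause must order by a time attribute, so window_time is
# carried out of the window aggregation; counts are cast to DOUBLE because
# AVG over a BIGINT column returns a BIGINT.
moving_avg = t_env.execute_sql("""
    SELECT
        window_start,
        window_end,
        action,
        event_count,
        AVG(CAST(event_count AS DOUBLE)) OVER (
            PARTITION BY action
            ORDER BY win_time
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ) AS moving_avg_3window
    FROM (
        SELECT
            window_start,
            window_end,
            window_time AS win_time,
            action,
            COUNT(*) AS event_count
        FROM TABLE(
            HOP(TABLE clickstream, DESCRIPTOR(event_timestamp), INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
        )
        GROUP BY window_start, window_end, window_time, action
    )
""")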

Event Time vs. Processing Time

Event time is the timestamp when an event actually occurred at the source. Processing time is when the streaming system processes the event. Processing by event time keeps results correct even when events arrive out of order, provided they arrive within the allowed lateness.
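A small example makes the difference concrete. Each hypothetical event below carries the time it occurred and the time it reached the processor, and one event is delayed by 75 seconds (say, by a flaky mobile connection). Bucketing the same events into 1-minute tumbling windows by each clock gives different counts.

```python
from collections import Counter

# (event_time, processing_time) in seconds; the third event is delayed 75 s
events = [(10, 12), (50, 51), (55, 130), (70, 72), (100, 101)]


def count_per_minute(times):
    """Count timestamps per 1-minute tumbling window, keyed by window start."""
    return dict(sorted(Counter(t - t % 60 for t in times).items()))


by_event_time = count_per_minute(e for e, _ in events)
by_processing_time = count_per_minute(p for _, p in events)
print(by_event_time)       # {0: 3, 60: 2}
print(by_processing_time)  # {0: 2, 60: 2, 120: 1}
```

The event-time counts describe when users actually acted; the processing-time counts shift the delayed event into a later minute, which is exactly the distortion event-time processing avoids.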

Watermark Processing

  • Watermark: W(t) = max(event_time_seen) - max_lateness
  • Late Data: an event whose event_time is already below W(t) when it arrives
  • Allowed Lateness: max_lateness, the delay the watermark tolerates before events count as late
  • Output Trigger: a window's result is emitted once window_end ≤ W(t)
  • Late Data Handling: send to a side output, drop, or update previously emitted results
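The watermark rules can be simulated in a few lines. This sketch assumes integer-second timestamps, 60-second tumbling windows, and max_lateness = 30 s (all illustrative choices). It advances W(t) = max(event_time_seen) - max_lateness, emits a window once window_end ≤ W(t), and treats an event as late when its window has already been emitted, collecting it in a list in the spirit of a side output.

```python
from collections import defaultdict


def run_watermark(events, window_size=60, max_lateness=30):
    """Process event times in arrival order; return (emitted, late)."""
    max_seen = None
    open_windows = defaultdict(int)   # window_start -> event count
    emitted, late = [], []
    emitted_starts = set()
    for ts in events:
        start = ts - ts % window_size
        if start in emitted_starts:
            late.append(ts)           # its window was already finalized
            continue
        open_windows[start] += 1
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - max_lateness
        # Finalize every open window whose end is at or below the watermark
        for s in sorted(open_windows):
            if s + window_size <= watermark:
                emitted.append((s, open_windows.pop(s)))
                emitted_starts.add(s)
    return emitted, late


print(run_watermark([5, 20, 65, 50, 100, 40, 130]))  # ([(0, 3)], [40])
```

The out-of-order event at 50 is still counted because the watermark (35 at that point) has not closed [0, 60); the event at 40 arrives after the watermark reached 70 and closed that window, so it is late. A larger max_lateness would accept it at the cost of emitting results later.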
# Event-time processing with watermark
from pyspark.sql.functions import *

# Define watermark (allow 10 minutes of lateness)
events_with_watermark = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("page")
    ) \
    .agg(
        count("*").alias("event_count"),
        max("event_timestamp").alias("last_event_time")
    )

# Handle late data: Spark Structured Streaming has no built-in side output.
# Rows arriving later than the watermark threshold may be dropped from
# stateful aggregations; foreachBatch gives full control over each write.
def process_batch(batch_df, batch_id):
    """Hypothetical handler. In update mode each micro-batch holds only the
    windows whose counts changed, so a real sink should upsert (e.g., MERGE)
    on (window, page) instead of appending."""
    batch_df.show(truncate=False)

# Write with a custom per-batch handler
query = events_with_watermark.writeStream \
    .outputMode("update") \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "s3://checkpoints/event_time/") \
    .start()

Key Concepts Summary

| Concept | Description | Latency | Use Case |
|---|---|---|---|
| Stream Processing | Continuous processing of unbounded data | Sub-second | Real-time analytics |
| Windowing | Finite segments of a stream | Window-dependent | Aggregations |
| Watermark | Tracks event-time progress | Configurable | Late data handling |
| Exactly-Once | Each record affects results exactly once | N/A | Financial transactions |
| Backpressure | Flow control so slow consumers are not overwhelmed | N/A | Preventing overload |
| State Management | Maintains computation state across events | N/A | Session tracking |
| Event Sourcing | Stores every state change as an event | N/A | Audit, replay |
| CQRS | Separate read and write models | N/A | High-throughput reads |
| Materialized Views | Precomputed stream aggregates | Seconds | Dashboard acceleration |
| CDC Streams | Captures database changes as events | Seconds | Data integration |

Performance Metrics

| Metric | Batch | Micro-Batch | True Streaming |
|---|---|---|---|
| Latency | Hours | 1-30 seconds | < 1 second |
| Throughput | Very High | High | Medium-High |
| State Management | N/A | Limited | Full |
| Exactly-Once | Yes | Yes | Yes (Flink) |
| Complexity | Low | Medium | High |
| Cost | Low | Medium | High |
| Use Case | Reporting | Near-real-time | Real-time |

10 Best Practices

  1. Use event-time processing: processing time gives incorrect results for out-of-order events
  2. Set watermarks appropriately: balance latency against completeness for late data
  3. Implement exactly-once semantics: use transactional sinks or idempotent writes
  4. Monitor consumer lag: alert when Kafka consumer lag exceeds a threshold
  5. Use compacted topics for latest-state scenarios: avoid replaying the full history
  6. Implement backpressure: keep fast producers from overwhelming slow consumers
  7. Checkpoint regularly: enable fault tolerance with periodic state snapshots
  8. Separate hot and cold paths: real-time for alerts, batch for comprehensive analytics
  9. Test with replay: ensure pipelines produce identical results when events are replayed
  10. Use a schema registry: enforce event schemas to prevent breaking changes
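Consumer lag (practice 4) is, per partition, the log-end offset minus the consumer group's committed offset. The pure functions below compute it from two dictionaries so the logic can be unit-tested; in kafka-python those inputs could come from KafkaConsumer.end_offsets() and KafkaConsumer.committed(). The alert threshold is an assumed value, and treating a missing commit as offset 0 is a simplification (the log may start at a later offset).

```python
def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag; a partition with no committed offset counts from 0."""
    return {p: end - (committed.get(p) or 0) for p, end in end_offsets.items()}


def lag_alerts(lag: dict, threshold: int) -> list:
    """Partitions whose lag exceeds the threshold, sorted for stable output."""
    return sorted(p for p, value in lag.items() if value > threshold)


end = {0: 10_500, 1: 9_800, 2: 12_000}
committed = {0: 10_400, 1: 9_800, 2: 2_000}
lag = consumer_lag(end, committed)
print(lag)                               # {0: 100, 1: 0, 2: 10000}
print(lag_alerts(lag, threshold=5_000))  # [2]
```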

  • Stream processing enables sub-second analytics on unbounded data streams
  • Windowing divides streams into finite segments for aggregation
  • Event-time processing with watermarks handles out-of-order events correctly
  • Exactly-once processing guarantees are essential for financial and critical workloads
  • Real-time and batch analytics complement each other: use both strategically
