Real-Time Analytics: Processing Data as It Arrives
Real-time analytics processes data continuously as it arrives, enabling sub-second to minute-level insights.
Why Real-Time Analytics Matters
Business Use Cases:
- Fraud detection
- Dynamic pricing
- Recommendation engines
- Operational monitoring
Batch vs Streaming:
- Batch: wait for nightly runs
- Streaming: act on data within seconds of its creation
Key Insight: Instead of waiting for nightly batch runs, streaming architectures act on data within seconds of its creation. Adopting them means redesigning batch-oriented pipelines as streaming-first systems.
Architecture Overview
Stream Processing Fundamentals
Stream processing continuously handles data records as they arrive, applying transformations, aggregations, and computations in real time. Unlike batch processing, which runs over a bounded dataset, it operates on unbounded data streams with low latency.
Streaming Latency Model
- End-to-End Latency: L = L_ingest + L_process + L_serve
- Ingestion Latency: L_ingest = Network_Delay + Broker_Queue_Time
- Processing Latency: L_process = Computation_Time + Window_Wait_Time
- Serving Latency: L_serve = Write_Time + Cache_Invalidation
- Target Latency: L < 1 second (real-time), L < 60 seconds (near-real-time)
- Throughput: T = Messages_Processed / Time_Unit
- Backpressure: when messages arrive faster than they can be processed (arrival rate > T), the buffer grows without bound and backpressure is needed to slow down upstream sources
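As a worked example of the model, the sketch below adds up an illustrative latency budget and computes how fast a buffer grows when arrivals outpace processing. All numbers are made up, the class and function names are hypothetical, rates are assumed constant, and labeling anything at or above 60 seconds as "batch" is an assumption beyond the two targets listed above.

```python
from dataclasses import dataclass


@dataclass
class LatencyBudget:
    """Latency components in milliseconds (illustrative values only)."""
    network_delay: float
    broker_queue_time: float
    computation_time: float
    window_wait_time: float
    write_time: float
    cache_invalidation: float

    def ingest(self) -> float:
        # L_ingest = Network_Delay + Broker_Queue_Time
        return self.network_delay + self.broker_queue_time

    def process(self) -> float:
        # L_process = Computation_Time + Window_Wait_Time
        return self.computation_time + self.window_wait_time

    def serve(self) -> float:
        # L_serve = Write_Time + Cache_Invalidation
        return self.write_time + self.cache_invalidation

    def end_to_end(self) -> float:
        # L = L_ingest + L_process + L_serve
        return self.ingest() + self.process() + self.serve()


def classify(latency_ms: float) -> str:
    """Map an end-to-end latency to the targets above ("batch" label is an assumption)."""
    if latency_ms < 1_000:
        return "real-time"
    if latency_ms < 60_000:
        return "near-real-time"
    return "batch"


def backlog_after(arrival_rate: float, processing_rate: float, seconds: float) -> float:
    """Messages queued after `seconds`, assuming constant rates in msgs/sec.

    The buffer only grows when arrivals outpace processing.
    """
    return max(0.0, (arrival_rate - processing_rate) * seconds)


budget = LatencyBudget(
    network_delay=20, broker_queue_time=30,
    computation_time=150, window_wait_time=500,
    write_time=40, cache_invalidation=10,
)
print(budget.end_to_end(), classify(budget.end_to_end()))
print(backlog_after(arrival_rate=12_000, processing_rate=10_000, seconds=60))
```

With a 2,000 msg/s surplus the backlog grows by 120,000 messages every minute, which is the situation where backpressure (or more processing capacity) is required. Note that the window wait dominates this made-up budget, which is typical when results are only emitted at window boundaries.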
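```python
# Kafka Producer (kafka-python)
import json
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: None if k is None else str(k).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',         # wait for all in-sync replicas to acknowledge each write
    retries=3,
    batch_size=16384,   # max bytes per partition batch
    linger_ms=10        # wait up to 10 ms to fill a batch before sending
)

def publish_event(topic: str, event: dict):
    """Publish an event to a Kafka topic; sends are batched asynchronously."""
    record = dict(event)  # copy so the caller's dict is not mutated
    record['event_id'] = str(uuid.uuid4())
    # UTC event time (ms precision), named to match the event_timestamp field used downstream
    record['event_timestamp'] = datetime.now(timezone.utc).isoformat(timespec='milliseconds')
    # Key by user_id so each user's events land in the same partition, in order
    return producer.send(topic, key=record.get('user_id'), value=record)

# Publish clickstream events
publish_event('clickstream', {
    'user_id': 12345,
    'page': '/products/shoes',
    'action': 'click',
    'session_id': 'abc-123',
    'device': 'mobile',
    'country': 'US'
})

# Flush buffered records once (e.g. at shutdown) rather than after every send,
# which would defeat the batch_size/linger_ms batching configured above
producer.flush()
```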
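```python
# Kafka Consumer with processing (kafka-python)
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'clickstream',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-processor',
    auto_offset_reset='earliest',   # start from the beginning if the group has no committed offset
    enable_auto_commit=False,       # commit manually, only after processing succeeds
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

try:
    for message in consumer:
        event = message.value
        # Process event
        print(f"Processing: {event['action']} on {event['page']}")
        # Commit offset after successful processing (at-least-once: a crash
        # before this line means the event is redelivered and reprocessed)
        consumer.commit()
finally:
    consumer.close()
```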
Windowing Strategies
Windowing divides an unbounded stream into finite segments for processing. Windows define which events belong to each computation unit.
| Window Type | Behavior | Use Case | Latency |
|---|---|---|---|
| Tumbling | Fixed, non-overlapping | Count per minute | Window size |
| Sliding | Fixed, overlapping | Moving averages | Window size |
| Session | Activity-based gaps | User sessions | Session end |
| Global | All data seen so far | Cumulative aggregates | Never (unbounded) |
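The membership rules in the table can be illustrated in plain Python. This is a sketch of the semantics only, not how Spark or Flink implement windowing internally; the helper names are hypothetical, windows are half-open `[start, end)`, windows are aligned to an assumed `origin` (engines typically align to the epoch), and a session is assumed to end at its last event time plus the gap.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Window = Tuple[datetime, datetime]  # half-open interval [start, end)


def tumbling_window(ts: datetime, size: timedelta, origin: datetime) -> Window:
    """Fixed, non-overlapping: each event belongs to exactly one window."""
    start = origin + ((ts - origin) // size) * size
    return (start, start + size)


def sliding_windows(ts: datetime, size: timedelta, slide: timedelta, origin: datetime) -> List[Window]:
    """Fixed, overlapping: each event belongs to size/slide windows."""
    start = origin + ((ts - origin) // slide) * slide  # latest window start at or before ts
    windows = []
    while start + size > ts:  # walk back while the window still contains ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: List[datetime], gap: timedelta) -> List[Window]:
    """Activity-based: a session stays open until no event arrives within `gap`.

    Each session ends at (last event time + gap).
    """
    sessions: List[Window] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            sessions[-1] = (sessions[-1][0], ts + gap)  # within the gap: extend the session
        else:
            sessions.append((ts, ts + gap))  # inactivity gap exceeded: start a new session
    return sessions


origin = datetime(2024, 1, 1)
ts = datetime(2024, 1, 1, 12, 7, 30)
print(tumbling_window(ts, timedelta(minutes=5), origin))
print(len(sliding_windows(ts, timedelta(minutes=10), timedelta(minutes=1), origin)))
clicks = [datetime(2024, 1, 1, 12, m) for m in (0, 10, 50, 55)]
print(session_windows(clicks, timedelta(minutes=30)))
```

The 12:07:30 event lands in one tumbling window (12:05 to 12:10) but in ten 10-minute sliding windows, which is why sliding aggregates cost roughly size/slide times more state than tumbling ones. The clicks split into two sessions because 12:50 arrives more than 30 minutes after 12:10.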
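```python
# Spark Structured Streaming with Windowing
# Requires the Kafka source package (spark-sql-kafka-0-10) and Delta Lake for the sink
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    approx_count_distinct, col, count, from_json, session_window, window,
    max as max_, min as min_,
)
from pyspark.sql.types import (
    LongType, StringType, StructField, StructType, TimestampType,
)

spark = SparkSession.builder \
    .appName("RealTimeAnalytics") \
    .getOrCreate()

# Schema of the JSON clickstream events written by the producer
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", LongType()),
    StructField("page", StringType()),
    StructField("action", StringType()),
    StructField("session_id", StringType()),
    StructField("device", StringType()),
    StructField("country", StringType()),
    StructField("event_timestamp", TimestampType()),
])

# Read stream from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clickstream") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON events
events = raw_stream.select(
    col("key").cast("string").alias("key"),
    from_json(col("value").cast("string"), schema).alias("data"),
    col("timestamp").alias("kafka_timestamp")
).select("data.*", "kafka_timestamp")

# Tumbling Window: count events per page per 5-minute window
# (exact countDistinct is not supported on streaming DataFrames, so use the approximate version)
tumbling_counts = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("page")
    ) \
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users")
    )

# Sliding Window: 10-minute windows advancing every 1 minute (moving event counts per action)
sliding_counts = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "10 minutes", "1 minute"),
        col("action")
    ) \
    .agg(
        count("*").alias("event_count")
    )

# Session Window: group each user's events into sessions closed by a 30-minute inactivity gap
# (session_window needs Spark 3.2+; window() here would create fixed tumbling windows instead)
session_counts = events \
    .withWatermark("event_timestamp", "30 minutes") \
    .groupBy(
        session_window(col("event_timestamp"), "30 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("session_events"),
        min_("event_timestamp").alias("session_start"),
        max_("event_timestamp").alias("session_end")
    )

# Write stream to sink. The Delta sink supports "append" and "complete" modes, not "update";
# in append mode each window is written once, after the watermark passes its end.
query = tumbling_counts.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/tumbling/") \
    .start("s3://analytics/realtime/tumbling_counts/")
```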
Flink Stream Processing
Apache Flink is a distributed stream processing framework designed for stateful computations over unbounded data streams. It provides exactly-once state consistency (end-to-end when combined with replayable sources and transactional sinks), event-time semantics, and fault tolerance via checkpointing.
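```python
# Flink Stream Processing Example (PyFlink Table API)
# Assumes the Kafka SQL connector and the Delta Flink connector jars are on the classpath
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create Flink table environment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Define source table (Kafka); the watermark tolerates 5 seconds of out-of-order events.
# By default the json format parses TIMESTAMP values as 'yyyy-MM-dd HH:mm:ss.SSS'
# (see the 'json.timestamp-format.standard' option for alternatives).
t_env.execute_sql("""
    CREATE TABLE clickstream (
        user_id BIGINT,
        page STRING,
        action STRING,
        event_timestamp TIMESTAMP(3),
        session_id STRING,
        WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'clickstream',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Define sink table (Delta Lake). The Delta connector's identifier is 'delta';
# its documentation also requires Delta tables to be created in a Delta catalog.
t_env.execute_sql("""
    CREATE TABLE page_metrics (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        page STRING,
        view_count BIGINT,
        unique_users BIGINT
    ) WITH (
        'connector' = 'delta',
        'table-path' = 's3://analytics/realtime/page_metrics/'
    )
""")

# Tumbling window aggregation (submitted asynchronously as a long-running job)
t_env.execute_sql("""
    INSERT INTO page_metrics
    SELECT
        window_start,
        window_end,
        page,
        COUNT(*) AS view_count,
        COUNT(DISTINCT user_id) AS unique_users
    FROM TABLE(
        TUMBLE(TABLE clickstream, DESCRIPTOR(event_timestamp), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, page
""")

# Sliding (HOP) window counts, then a moving average over the last 3 windows per action.
# A streaming OVER clause must order by a time attribute, so aggregate in a subquery
# and order by window_time (window_start is a plain timestamp after aggregation).
t_env.execute_sql("""
    SELECT
        window_start,
        window_end,
        action,
        event_count,
        AVG(CAST(event_count AS DOUBLE)) OVER (
            PARTITION BY action
            ORDER BY window_time
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ) AS moving_avg_3window
    FROM (
        SELECT window_start, window_end, window_time, action, COUNT(*) AS event_count
        FROM TABLE(
            HOP(TABLE clickstream, DESCRIPTOR(event_timestamp), INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
        )
        GROUP BY window_start, window_end, window_time, action
    )
""").print()  # streams rows to stdout; blocks indefinitely on an unbounded source
```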
Event Time vs. Processing Time
Event time is the timestamp at which an event actually occurred at the source. Processing time is when the streaming system processes it. Event-time processing assigns each event to the window in which it occurred, so results stay correct even when events arrive late or out of order (within the watermark's tolerance).
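To see why the distinction matters, the sketch below counts the same four made-up events per minute, once by event time and once by processing time. Event `c` is assumed to have been delayed about 70 seconds in transit (for example, a mobile client that was briefly offline); the data and helper name are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

base = datetime(2024, 1, 1, 12, 0)
# Hypothetical events: (event_id, event_time at the source, processing time in the pipeline)
sample_events = [
    ("a", base + timedelta(seconds=10), base + timedelta(seconds=11)),
    ("b", base + timedelta(seconds=50), base + timedelta(seconds=52)),
    ("c", base + timedelta(seconds=55), base + timedelta(minutes=2, seconds=5)),  # delayed
    ("d", base + timedelta(minutes=1, seconds=5), base + timedelta(minutes=1, seconds=6)),
]


def minute_bucket(ts: datetime) -> datetime:
    """Truncate a timestamp to its 1-minute tumbling window start."""
    return ts.replace(second=0, microsecond=0)


by_event_time = Counter(minute_bucket(ev_t) for _, ev_t, _ in sample_events)
by_processing_time = Counter(minute_bucket(proc_t) for _, _, proc_t in sample_events)

# minute, count by event time, count by processing time
for minute in sorted(set(by_event_time) | set(by_processing_time)):
    print(minute.strftime("%H:%M"), by_event_time[minute], by_processing_time[minute])
```

Bucketing by processing time moves `c` from 12:00 into 12:02, undercounting one minute and inventing activity in another; event-time bucketing keeps it in the minute where it happened.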
Watermark Processing
- Watermark: W(t) = max(event_time_seen) - max_lateness
- Late Data: an event whose event_time is already earlier than W(t) when it arrives
- Allowed Lateness: max_lateness, i.e. how far W(t) trails the newest event time seen
- Output Trigger: a window [start, end) is finalized and emitted once W(t) ≥ end
- Late Data Handling: send to a side output, drop, or update previously emitted results
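The rules above can be sketched as a small single-threaded aggregator. This is an illustration under simplifying assumptions, not Spark or Flink code: one partition, tumbling windows aligned to the Unix epoch, late events still counted while their window is open, and events for an already-emitted window routed to a side-output list. The class and method names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)


class TumblingWindowAggregator:
    """Event-time tumbling-window counter driven by a watermark.

    W(t) = max(event_time_seen) - max_lateness. A window [start, end) is
    emitted once W(t) >= end. A late event whose window is still open is
    counted; one whose window was already emitted goes to a side output.
    """

    def __init__(self, size: timedelta, max_lateness: timedelta) -> None:
        self.size = size
        self.max_lateness = max_lateness
        self.max_event_time: Optional[datetime] = None
        self.open_windows: Dict[datetime, int] = defaultdict(int)
        self.emitted: List[Tuple[datetime, int]] = []  # (window_start, count)
        self.late_events: List[datetime] = []          # side output

    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_lateness

    def window_start(self, ts: datetime) -> datetime:
        # Align windows to the Unix epoch
        return EPOCH + ((ts - EPOCH) // self.size) * self.size

    def on_event(self, event_time: datetime) -> None:
        start = self.window_start(event_time)
        wm = self.watermark()
        if wm is not None and start + self.size <= wm:
            self.late_events.append(event_time)  # window already finalized
            return
        self.open_windows[start] += 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        self._fire()

    def _fire(self) -> None:
        # Emit every open window whose end the watermark has reached
        wm = self.watermark()
        for start in sorted(self.open_windows):
            if start + self.size <= wm:
                self.emitted.append((start, self.open_windows.pop(start)))


def at(h: int, m: int, s: int = 0) -> datetime:
    return datetime(2024, 1, 1, h, m, s)


agg = TumblingWindowAggregator(size=timedelta(minutes=5), max_lateness=timedelta(minutes=2))
for ts in [at(12, 1), at(12, 3), at(12, 6), at(12, 3, 30), at(12, 13), at(12, 2)]:
    agg.on_event(ts)

print(agg.emitted)      # windows 12:00 (3 events) and 12:05 (1 event)
print(agg.late_events)  # 12:02 arrived after its window was emitted
```

The 12:03:30 event is late (the watermark is already 12:04) but still counted because the 12:00 window stays open until the watermark reaches 12:05. The final 12:02 event arrives when the watermark is 12:11, so it goes to the side output. A larger `max_lateness` would have captured it at the cost of emitting every window later.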
```python
# Event-time processing with a watermark (Spark Structured Streaming)
# `events` is the parsed clickstream DataFrame from the windowing example
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, window, max as max_

# Define watermark (allow 10 minutes of lateness); rows older than the
# watermark are dropped from the aggregation state
windowed_counts = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("page")
    ) \
    .agg(
        count("*").alias("event_count"),
        max_("event_timestamp").alias("last_event_time")
    )

# Spark has no built-in side output for late data (unlike Flink's DataStream API).
# In "update" mode, a late event that is still within the watermark re-emits its
# window with the new count, so the sink must upsert rather than append.
def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Hypothetical placeholder: replace with an idempotent upsert keyed on (window, page)
    batch_df.show(truncate=False)

# Write with late data handling (updates to previously emitted windows)
query = windowed_counts.writeStream \
    .outputMode("update") \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "s3://checkpoints/event_time/") \
    .start()
```
Key Concepts Summary
| Concept | Description | Latency | Use Case |
|---|---|---|---|
| Stream Processing | Continuous data processing | Sub-second | Real-time analytics |
| Windowing | Finite segments of streams | Window-dependent | Aggregations |
| Watermark | Track event-time progress | Configurable | Late data handling |
| Exactly-Once | Each record affects results exactly once, even after failures | N/A | Financial transactions |
| Backpressure | Flow control for slow consumers | N/A | Prevent overload |
| State Management | Maintain computation state | N/A | Session tracking |
| Event Sourcing | Store all state changes | N/A | Audit, replay |
| CQRS | Separate read/write models | N/A | High-throughput reads |
| Materialized Views | Pre-computed stream aggregates | Seconds | Dashboard acceleration |
| CDC Streams | Capture database changes | Seconds | Data integration |
Performance Metrics
| Metric | Batch | Micro-Batch | True Streaming |
|---|---|---|---|
| Latency | Hours | 1-30 seconds | < 1 second |
| Throughput | Very High | High | Medium-High |
| State Management | N/A | Limited | Full |
| Exactly-Once | Yes | Yes | Yes (Flink) |
| Complexity | Low | Medium | High |
| Cost | Low | Medium | High |
| Use Case | Reporting | Near-real-time | Real-time |
10 Best Practices
- Use event-time processing: processing-time windows give incorrect results for out-of-order events
- Set watermarks appropriately: balance latency against completeness for late data
- Implement exactly-once semantics: use transactional sinks or idempotent writes
- Monitor consumer lag: alert when Kafka consumer lag exceeds a threshold
- Use compacted topics for latest-state scenarios: avoid replaying full history
- Implement backpressure: keep fast producers from overwhelming slow consumers
- Checkpoint regularly: enable fault tolerance with periodic state snapshots
- Separate hot and cold paths: real-time for alerts, batch for comprehensive analytics
- Test with replay: ensure pipelines produce identical results when replaying events
- Use a schema registry: enforce event schemas to prevent breaking changes
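For the consumer-lag practice, here is a minimal sketch assuming kafka-python. The lag arithmetic lives in pure helpers (hypothetical names) so it can be checked without a broker; `fetch_lag` only works against a live cluster, treats a partition with no committed offset as lagging from offset 0, and the alert threshold is whatever your SLO dictates.

```python
from typing import Dict, Optional


def partition_lag(end_offsets: Dict[int, int], committed: Dict[int, Optional[int]]) -> Dict[int, int]:
    """Lag per partition = log-end offset - committed offset.

    A partition with no committed offset counts its whole log as lag
    (assumes the log starts at offset 0).
    """
    return {p: end - (committed.get(p) or 0) for p, end in end_offsets.items()}


def partitions_over_threshold(lag: Dict[int, int], threshold: int) -> Dict[int, int]:
    """Partitions whose lag should trigger an alert."""
    return {p: n for p, n in lag.items() if n > threshold}


def fetch_lag(topic: str, group_id: str, bootstrap_servers: str = "localhost:9092") -> Dict[int, int]:
    """Read end and committed offsets from a live cluster with kafka-python."""
    # Imported here so the pure helpers above work without kafka-python installed
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,          # the consumer group whose progress we inspect
        enable_auto_commit=False,   # never commit from the monitoring client
    )
    try:
        partition_ids = sorted(consumer.partitions_for_topic(topic) or [])
        partitions = [TopicPartition(topic, p) for p in partition_ids]
        end_offsets = consumer.end_offsets(partitions)
        return partition_lag(
            {tp.partition: offset for tp, offset in end_offsets.items()},
            {tp.partition: consumer.committed(tp) for tp in partitions},
        )
    finally:
        consumer.close()


# Illustrative offsets for three partitions (not real measurements)
lag = partition_lag({0: 1500, 1: 900, 2: 50}, {0: 1400, 1: None, 2: 50})
print(lag)                                            # {0: 100, 1: 900, 2: 0}
print(partitions_over_threshold(lag, threshold=500))  # {1: 900}
```

Alerting per partition rather than on total lag catches a single stuck partition (for example, one hot key) that an aggregate number would hide.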
Key Takeaways
- Stream processing enables sub-second analytics on unbounded data streams
- Windowing divides streams into finite segments for aggregation
- Event-time processing with watermarks handles out-of-order events correctly
- Exactly-once processing guarantees are essential for financial and critical workloads
- Real-time and batch analytics complement each other: use both strategically
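Pairing at-least-once delivery (such as the manual-commit consumer earlier, which reprocesses events after a crash) with an idempotent sink is a common way to get effectively-once results. A minimal in-memory sketch, assuming every event carries a unique `event_id` as the producer example sets; the class name is hypothetical, and a real sink would keep the seen IDs in durable storage (for example as a primary key or MERGE condition), ideally updated in the same transaction as the counts.

```python
from typing import Dict, Iterable, Set


class IdempotentCounterSink:
    """Counts page views, ignoring events it has already applied (keyed by event_id).

    Replaying the same events, e.g. after a consumer restarts from an older
    offset, leaves the totals unchanged.
    """

    def __init__(self) -> None:
        self.seen_ids: Set[str] = set()
        self.page_counts: Dict[str, int] = {}

    def apply(self, events: Iterable[dict]) -> int:
        """Apply a batch; returns how many events were new."""
        applied = 0
        for event in events:
            if event["event_id"] in self.seen_ids:
                continue  # duplicate delivery: skip
            self.seen_ids.add(event["event_id"])
            page = event["page"]
            self.page_counts[page] = self.page_counts.get(page, 0) + 1
            applied += 1
        return applied


batch = [
    {"event_id": "e1", "page": "/products/shoes"},
    {"event_id": "e2", "page": "/products/shoes"},
    {"event_id": "e3", "page": "/cart"},
]
sink = IdempotentCounterSink()
print(sink.apply(batch))  # 3
print(sink.apply(batch))  # 0: the replay is a no-op
print(sink.page_counts)   # {'/products/shoes': 2, '/cart': 1}
```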
See Also
- MLOps for Data Engineering: Real-time feature serving for ML models
- Snowflake Advanced: Snowpipe and Streams for near-real-time ingestion
- Delta Lake & Iceberg: Streaming into lakehouse tables
- Data Lakehouse: Spark Structured Streaming
- Performance Optimization: Stream processing performance tuning
- Portfolio Projects: Real-time streaming project specification