Streaming Data Pipelines
Real-time data processing enables live features, instant recommendations, and immediate fraud detection. This lesson covers the core technologies and patterns for building streaming pipelines.
Streaming Architecture Overview
Apache Kafka Fundamentals
Kafka is the backbone of most streaming architectures. Master producers, consumers, and partitioning.
from kafka import KafkaProducer, KafkaConsumer
# kafka-python's admin client class is KafkaAdminClient; `AdminClient` is the
# confluent-kafka name and does not exist in kafka.admin (ImportError).
from kafka.admin import KafkaAdminClient, NewTopic
import json

# Producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Guard against key=None (None has no .encode) and non-str ids such as ints.
    key_serializer=lambda k: None if k is None else str(k).encode('utf-8'),
    acks='all',  # wait for all in-sync replicas to acknowledge each record
    retries=3,
    batch_size=16384,
    linger_ms=10,
)

# Send events keyed by user_id: records with the same key are routed to the
# same partition by the default partitioner.
futures = [
    producer.send(topic='user-events', key=event['user_id'], value=event)
    for event in events
]
producer.flush()
# send() is asynchronous; broker errors only surface through the returned
# futures, so check them instead of dropping failures silently.
for future in futures:
    future.get(timeout=10)
# Consumer with group management. Auto-commit is off: the offset is committed
# only after process_event() returns, so an event may be processed again after
# a crash but is not skipped (at-least-once).
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='ml-pipeline',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
try:
    for message in consumer:
        event = message.value
        process_event(event)
        consumer.commit()
finally:
    # Release the connection and group membership even if processing raises,
    # instead of waiting for the broker's session timeout.
    consumer.close()
Kafka Streams for Real-Time Processing
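Kafka Streams provides exactly-once semantics and windowed aggregations without a separate processing cluster, but it is a Java/Scala library with no official Python API. The Python example below therefore hand-rolls a windowed aggregator on top of a plain Kafka consumer. Its state lives in process memory, and it offers no exactly-once guarantee.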
from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
from datetime import datetime, timedelta
import json
class WindowedAggregator:
    """Sliding (hopping) event-time windows with count/sum/mean per key.

    Windows are ``window_size_seconds`` long and a new one starts every
    ``slide_seconds``, aligned to the Unix epoch. Each event is added to every
    window that contains its timestamp. With the defaults (60 s windows
    sliding by 10 s), one event lands in 6 windows.

    Timestamps may be ``datetime`` objects or numbers of seconds since the
    epoch. Naive datetimes are interpreted as local time, exactly as
    ``datetime.timestamp()`` does.

    An event older than an already-reported window starts a fresh, partial
    entry for that window, which will be reported again.
    """

    def __init__(self, window_size_seconds=60, slide_seconds=10):
        if window_size_seconds <= 0 or slide_seconds <= 0:
            raise ValueError("window_size_seconds and slide_seconds must be positive")
        self.window_size = timedelta(seconds=window_size_seconds)
        self.slide = timedelta(seconds=slide_seconds)
        # (key, window index) -> list of values. Window i covers
        # [i * slide, i * slide + window_size) in epoch seconds.
        self.windows = defaultdict(list)

    @staticmethod
    def _to_epoch(ts):
        """Convert a datetime or a number of epoch seconds to float seconds."""
        if isinstance(ts, datetime):
            return ts.timestamp()
        return float(ts)

    def add_event(self, key, timestamp, value):
        """Add *value* under *key* to every window that contains *timestamp*."""
        t = self._to_epoch(timestamp)
        size = self.window_size.total_seconds()
        slide = self.slide.total_seconds()
        # Integer indices (rather than float start times) keep the dict keys
        # exact even for fractional slides.
        index = int(t // slide)  # latest window starting at or before t
        while index * slide > t - size:  # window `index` still contains t
            self.windows[(key, index)].append(value)
            index -= 1

    def get_window_results(self, current_time):
        """Remove and summarise windows that have closed by *current_time*.

        A window is closed once ``window_start + window_size <= current_time``.
        At most one window per key is returned per call: the oldest closed
        one. Later closed windows stay buffered rather than being overwritten
        in the result, so call again until it returns ``{}`` to drain them.

        Returns:
            dict mapping key -> {'count', 'sum', 'mean', 'window_start'}, where
            'window_start' is an ISO-8601 timestamp in UTC.
        """
        from datetime import timezone

        now = self._to_epoch(current_time)
        size = self.window_size.total_seconds()
        slide = self.slide.total_seconds()
        results = {}
        # Iterate a sorted snapshot of the keys. This makes popping entries
        # safe, and each key sees its oldest window first.
        for window_key in sorted(self.windows, key=lambda wk: wk[1]):
            key, index = window_key
            start = index * slide
            if key in results or start + size > now:
                continue
            values = self.windows.pop(window_key)
            total = sum(values)
            results[key] = {
                'count': len(values),
                'sum': total,
                'mean': total / len(values),
                'window_start': datetime.fromtimestamp(start, tz=timezone.utc).isoformat(),
            }
        return results
# Usage
aggregator = WindowedAggregator(window_size_seconds=300)
consumer = KafkaConsumer(
    'raw-metrics',
    bootstrap_servers=['localhost:9092'],
    # Without a deserializer msg.value is raw bytes, and event['metric_name']
    # below would raise TypeError.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
for msg in consumer:
    event = msg.value
    aggregator.add_event(
        key=event['metric_name'],
        timestamp=event['timestamp'],
        value=event['value'],
    )
    # Report windows that have closed as of the current wall-clock time.
    results = aggregator.get_window_results(datetime.now())
    if results:
        publish_aggregates(results)
Apache Flink for Complex Event Processing
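Flink excels at stateful processing, exactly-once guarantees, and complex event patterns (via its CEP library or SQL `MATCH_RECOGNIZE`). The examples below use Flink SQL for event-time windowed aggregation.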
# Flink SQL for streaming analytics
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
tenv = StreamTableEnvironment.create(env)

# Create a source table from Kafka.
# - The WATERMARK clause makes event_time the event-time attribute; watermarks
#   trail the largest event_time seen by 5 seconds, so events up to 5 s out of
#   order still land in their windows.
# - properties.group.id is required for Kafka sources on older Flink releases.
#   It is set to a group distinct from the Python consumers' 'ml-pipeline' so
#   offset commits do not collide.
tenv.execute_sql("""
CREATE TABLE user_events (
user_id STRING,
event_type STRING,
product_id STRING,
amount DECIMAL(10,2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-user-events',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
)
""")
# Tumbling-window aggregation: per-user purchase count and total amount over
# fixed, non-overlapping 1-minute windows of event time.
purchase_totals = tenv.execute_sql("""
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM user_events
WHERE event_type = 'purchase'
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
""")
# execute_sql() submits the streaming query and returns a TableResult. Read its
# rows with purchase_totals.print() or by iterating purchase_totals.collect();
# both block indefinitely on an unbounded source.
# Session windows for activity tracking: SESSION groups each user's events into
# sessions separated by gaps of at least 30 minutes without events, and this
# query reports each session's bounds and how many actions it contained.
session_activity_sql = """
SELECT
user_id,
SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
COUNT(*) AS actions_in_session
FROM user_events
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE)
"""
tenv.execute_sql(session_activity_sql)
Real-Time Feature Computation
import json
from kafka import KafkaConsumer
from redis import Redis
import numpy as np
from collections import deque
class RealTimeFeatureEngine:
    """Maintain per-user streaming features in Redis.

    Keys written for each event:
      count:{user_id}:1h:{action}  -- per-action counter
      window:{user_id}:amounts      -- most recent amounts, newest first
      engagement:{user_id}          -- weighted engagement score
    """

    # Points per action type (scaled by 0.1 when added); unknown actions add 0.
    ACTION_WEIGHTS = {'view': 1, 'click': 2, 'add_to_cart': 3, 'purchase': 5}
    # Number of most recent amounts kept per user.
    AMOUNTS_WINDOW = 100
    # Key TTLs in seconds. Every write refreshes the TTL, so a key expires only
    # after this long with no new activity. The "1h" counter is therefore a
    # count since the last hour-long idle gap, not a strict trailing-hour count.
    COUNT_TTL_SECONDS = 3600
    AMOUNTS_TTL_SECONDS = 86400
    ENGAGEMENT_TTL_SECONDS = 86400

    def __init__(self):
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.consumer = KafkaConsumer(
            'user-actions',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )

    def compute_features(self, user_id, event):
        """Update the user's Redis features for *event* and return a snapshot.

        Every write and the read of recent amounts go through one pipeline,
        executed in order in a single round trip.

        Args:
            user_id: id embedded in the Redis key names.
            event: dict with 'action' and 'timestamp'. 'amount' is optional;
                a missing or None amount is recorded as 0.

        Returns:
            dict with 'user_id', 'action_count' (counter value after this
            event), 'recent_amounts' (floats, newest first), 'engagement_score'
            (float), and 'timestamp' copied from the event.
        """
        action = event['action']
        count_key = f"count:{user_id}:1h:{action}"
        amounts_key = f"window:{user_id}:amounts"
        engagement_key = f"engagement:{user_id}"
        weight = self.ACTION_WEIGHTS.get(action, 0)

        pipe = self.redis.pipeline()
        pipe.incr(count_key)
        pipe.expire(count_key, self.COUNT_TTL_SECONDS)
        pipe.lpush(amounts_key, event.get('amount') or 0)
        pipe.ltrim(amounts_key, 0, self.AMOUNTS_WINDOW - 1)
        # Give the list a TTL too, so idle users' lists don't accumulate forever.
        pipe.expire(amounts_key, self.AMOUNTS_TTL_SECONDS)
        pipe.lrange(amounts_key, 0, -1)
        pipe.incrbyfloat(engagement_key, weight * 0.1)
        pipe.expire(engagement_key, self.ENGAGEMENT_TTL_SECONDS)
        # One result per queued command, in order. The engagement score is the
        # incrbyfloat reply, not the trailing expire() boolean.
        (action_count, _, _, _, _, raw_amounts, engagement, _) = pipe.execute()

        return {
            'user_id': user_id,
            'action_count': action_count,
            'recent_amounts': self._parse_amounts(raw_amounts),
            'engagement_score': float(engagement or 0),
            'timestamp': event['timestamp'],
        }

    def _get_amounts(self, user_id):
        """Fetch the user's stored recent amounts as floats ([0.0] if none)."""
        raw = self.redis.lrange(f"window:{user_id}:amounts", 0, -1)
        return self._parse_amounts(raw)

    @staticmethod
    def _parse_amounts(raw):
        """Convert raw Redis list items to floats; an empty list becomes [0.0]."""
        return [float(x) for x in raw] if raw else [0.0]
Event-Driven Architecture Patterns
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Callable
import asyncio
class EventType(Enum):
    """Closed set of domain event names; EventBus keys its handler lists by these.

    Values are dotted "<entity>.<action>" strings.
    """
    USER_CREATED = "user.created"
    ORDER_PLACED = "order.placed"
    PAYMENT_RECEIVED = "payment.received"
    INVENTORY_UPDATED = "inventory.updated"
@dataclass
class Event:
    """A domain event passed to the handlers subscribed for its type.

    Attributes:
        event_type: the EventType used by EventBus.publish to select handlers.
        payload: event-specific data; its keys depend on event_type (the
            handlers in this lesson read 'user_id', 'items', 'order_id' and
            'amount').
        correlation_id: empty string unless set. Presumably links related
            events; nothing in this lesson reads it, so confirm with callers.
        metadata: free-form extra data; each instance gets its own new dict.
    """
    event_type: EventType
    payload: dict
    correlation_id: str = ""
    # default_factory gives every Event a separate dict (no shared mutable default).
    metadata: dict = field(default_factory=dict)
class EventBus:
    """In-process publish/subscribe dispatcher for async event handlers."""

    def __init__(self):
        # Event type -> handlers, in subscription order.
        self.handlers: dict[EventType, List[Callable]] = {}

    def subscribe(self, event_type: EventType, handler: Callable):
        """Register *handler*, an async callable taking an Event, for *event_type*."""
        self.handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event):
        """Run every handler subscribed to ``event.event_type`` concurrently.

        All handlers are awaited to completion even when some fail. One broken
        subscriber therefore neither makes publish() return early nor leaves
        the other handlers running unobserved. Publishing a type with no
        subscribers is a no-op.

        Raises:
            Exception: the first exception raised by a handler, in subscription
                order, re-raised after every handler has finished. Exceptions
                from later failing handlers are not re-raised.
        """
        handlers = self.handlers.get(event.event_type, [])
        outcomes = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                raise outcome
# Domain event handlers
async def on_order_placed(event: Event):
    """React to a placed order: confirm it, adjust stock, then book revenue.

    The three steps run one after another, so an exception in one step stops
    the later ones.
    """
    placed_order = event.payload
    customer_id = placed_order['user_id']
    await send_confirmation_email(customer_id)
    ordered_items = placed_order['items']
    await update_inventory(ordered_items)
    await trigger_revenue_calculation(placed_order)
async def on_payment_received(event: Event):
    """React to a received payment: mark the order paid, then award loyalty points."""
    details = event.payload
    paid_order_id = details['order_id']
    await update_order_status(paid_order_id, 'paid')
    await credit_loyalty_points(details['user_id'], details['amount'])
# Wire up the event bus: route each event type to its handler.
bus = EventBus()
for subscription in (
    (EventType.ORDER_PLACED, on_order_placed),
    (EventType.PAYMENT_RECEIVED, on_payment_received),
):
    bus.subscribe(*subscription)
del subscription  # keep the module namespace as before
Key Takeaways
- Kafka is the de facto standard for event ingestion; key records so related events land in the same partition, and scale consumption by adding partitions and consumers
- Flink excels at stateful processing with exactly-once semantics
- Real-time features require careful windowing and state management
- Event-driven architectures decouple producers from consumers for resilience