Building Real-Time Pipelines: Kafka + Flink + Redis


Uber & Netflix Interview

Designing end-to-end real-time data systems

Interview Question

"Design a real-time recommendation system for an e-commerce platform that: (1) processes 500K events/second, (2) updates user profiles in real-time, (3) serves recommendations with <100ms latency, (4) handles exactly-once processing, (5) scales dynamically. Include architecture, technology choices, and code examples."

Difficulty: Hard | Frequently asked at Uber, Netflix, Amazon, Meta


Theoretical Foundation

Real-Time Pipeline Architecture

Architecture Diagram
┌────────────────────────────────────────────────────────────────┐
│                Real-Time Pipeline Architecture                 │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  Sources ──▶ Ingestion ──▶ Processing ──▶ Storage ──▶ Serving  │
│                                                                │
│  Web App     Kafka         Flink          Redis       API      │
│  Mobile App  Kinesis       Spark          DynamoDB    gRPC     │
│  IoT Devices Pulsar        Kafka Streams  PostgreSQL  REST     │
│                                                                │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  Latency Requirements:                                  │   │
│  │  - Ingestion: < 10ms                                    │   │
│  │  - Processing: < 100ms                                  │   │
│  │  - Storage: < 50ms                                      │   │
│  │  - Serving: < 100ms                                     │   │
│  │  - Total end-to-end: < 500ms                            │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                │
└────────────────────────────────────────────────────────────────┘
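
The latency budget in the diagram can be sanity-checked with simple arithmetic. The sketch below sums the per-stage ceilings and compares them with the end-to-end target. The names `STAGE_BUDGET_MS` and `check_budget` are illustrative assumptions, not part of any library.

```python
# Per-stage latency ceilings taken from the diagram, in milliseconds.
STAGE_BUDGET_MS = {
    "ingestion": 10,
    "processing": 100,
    "storage": 50,
    "serving": 100,
}
END_TO_END_TARGET_MS = 500


def check_budget(stages: dict, target_ms: int) -> tuple:
    """Return (sum of stage ceilings, headroom left under the target)."""
    total = sum(stages.values())
    return total, target_ms - total


total_ms, headroom_ms = check_budget(STAGE_BUDGET_MS, END_TO_END_TARGET_MS)
print(f"stage total: {total_ms} ms, headroom: {headroom_ms} ms")
```

Whatever headroom remains has to cover network hops, queueing, and retries between stages, so it is worth stating that number explicitly in an interview.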

Technology Stack

Layer      | Technology                            | Use Case
Ingestion  | Kafka, Kinesis, Pulsar                | Event streaming
Processing | Flink, Spark Streaming, Kafka Streams | Real-time transformations
Storage    | Redis, DynamoDB, Cassandra            | Low-latency reads
Serving    | gRPC, REST API, GraphQL               | Real-time predictions

Kafka for Ingestion

Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                     Kafka Architecture                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Producers ──▶ Kafka Cluster ──▶ Consumers                  │
│                                                             │
│  Topics:                                                    │
│  - user_events (100 partitions)                             │
│  - product_views (50 partitions)                            │
│  - purchases (50 partitions)                                │
│  - recommendations (25 partitions)                          │
│                                                             │
│  Configuration for 500K events/sec:                         │
│  - Partitions: 100 on user_events (see list above)          │
│  - Replication factor: 3                                    │
│  - Batch size: 64KB                                         │
│  - Compression: snappy                                      │
│  - ACKs: all                                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘
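
A quick way to justify the partition count is to work out what each partition has to absorb. The sketch below does that arithmetic for the 500K events/sec target. The average event size of 1 KB is an assumption made for illustration (the source does not state one), and `per_partition_load` is a hypothetical helper.

```python
def per_partition_load(events_per_sec: int, partitions: int,
                       avg_event_bytes: int, replication_factor: int) -> dict:
    """Estimate the load each partition and the whole cluster must absorb."""
    ingress_bytes = events_per_sec * avg_event_bytes
    return {
        "events_per_partition_per_sec": events_per_sec / partitions,
        "ingress_mb_per_sec": ingress_bytes / 1_000_000,
        # Every byte is written once per replica.
        "replicated_write_mb_per_sec": ingress_bytes * replication_factor / 1_000_000,
    }


# Assumed inputs: 100 partitions, 1 KB events (assumption), replication factor 3.
load = per_partition_load(500_000, 100, 1_000, 3)
for name, value in load.items():
    print(f"{name}: {value}")
```

Under these assumptions each partition sees about 5,000 events per second before compression, which is the figure to compare against what a single consumer task can process.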

Flink for Processing

Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                     Flink Architecture                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Source ──▶ Transformations ──▶ Sink                        │
│                                                             │
│  Transformations:                                           │
│  - Filter: Remove irrelevant events                         │
│  - Map: Transform event format                              │
│  - KeyBy: Partition by user_id                              │
│  - Window: Aggregate over time windows                      │
│  - Join: Combine multiple streams                           │
│                                                             │
│  State Management:                                          │
│  - RocksDB state backend                                    │
│  - Checkpointing for fault tolerance                        │
│  - Watermarks for event-time processing                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
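
To make the KeyBy, Window, and watermark ideas concrete, here is a simplified pure-Python simulation, not Flink code. It counts events per user in 5-minute tumbling windows, advances a watermark that trails the largest timestamp seen by 5 seconds, and drops events whose window has already fired. The function name `run_windows` and the sample timestamps are assumptions for illustration, and real Flink watermark semantics differ in small details.

```python
from collections import defaultdict

WINDOW_SIZE_S = 300      # 5-minute tumbling windows
MAX_OUT_OF_ORDER_S = 5   # watermark trails the largest seen timestamp by 5 s


def run_windows(events):
    """Count events per (user_id, window_start); return (fired, dropped_late)."""
    open_windows = defaultdict(int)   # (user_id, window_start) -> count
    fired, dropped = {}, []
    watermark = float("-inf")

    for user_id, ts in events:
        window_start = ts - ts % WINDOW_SIZE_S
        if window_start + WINDOW_SIZE_S <= watermark:
            # The window already fired, so this event is late and is dropped.
            dropped.append((user_id, ts))
            continue
        open_windows[(user_id, window_start)] += 1

        # Advance the watermark, then fire every window it has passed.
        watermark = max(watermark, ts - MAX_OUT_OF_ORDER_S)
        ready = [key for key in open_windows if key[1] + WINDOW_SIZE_S <= watermark]
        for key in ready:
            fired[key] = open_windows.pop(key)

    return fired, dropped


# (user_id, event time in seconds); the last event arrives too late.
events = [("u1", 10), ("u1", 290), ("u1", 298), ("u1", 303), ("u1", 306), ("u1", 299)]
fired, dropped = run_windows(events)
print(fired, dropped)
```

The event at second 298 would still be counted if it arrived just after second 303, because the watermark has not yet passed the window end. The event at second 299 arrives after the watermark reaches 301, so it is discarded.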

Redis for Serving

Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                     Redis Architecture                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Data Structures:                                           │
│  - Hashes: User profiles                                    │
│  - Sorted Sets: Recommendation scores                       │
│  - Lists: Recent views                                      │
│  - Sets: Product categories                                 │
│                                                             │
│  Configuration:                                             │
│  - Cluster mode for scalability                             │
│  - Persistence: AOF for durability                          │
│  - Eviction: LRU for memory management                      │
│                                                             │
│  Latency: < 1ms for reads                                   │
│  Throughput: > 1M ops/sec                                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
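
Cluster mode spreads keys over 16384 hash slots, so a user's profile hash, recent-views list, and recommendation sorted set generally live on different nodes unless their keys share a hash tag. The sketch below computes the slot the way the Redis Cluster specification describes (CRC16/XMODEM of the key, or of the text between the first `{` and the next `}`). The helper name `key_slot` and the key layout `user:{42}:...` are illustrative assumptions.

```python
import binascii

HASH_SLOTS = 16384


def key_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for a key, honoring {hash tags}."""
    data = key.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty tag between the first '{' and the next '}' counts.
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is CRC-16/XMODEM,
    # the variant named in the Redis Cluster specification.
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


suffixes = ("profile", "recent_views", "recommendations")
plain = [key_slot(f"user:42:{s}") for s in suffixes]
tagged = [key_slot(f"user:{{42}}:{s}") for s in suffixes]
print("without hash tag:", plain)
print("with hash tag:   ", tagged)
```

With the tag, all three keys for a user map to one slot, which is what multi-key transactions and Lua scripts need in cluster mode. The trade-off is that a very active user concentrates load on a single node.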

Exactly-Once Processing

Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│                    Exactly-Once Pipeline                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Kafka ──▶ Flink ──▶ Redis                                  │
│                                                             │
│  1. Kafka: Idempotent producer + transactions               │
│  2. Flink: Checkpointing + two-phase commit                 │
│  3. Redis: Idempotent writes (SET is idempotent)            │
│                                                             │
│  Flow:                                                      │
│  - Flink reads from Kafka                                   │
│  - Processes events                                         │
│  - Writes to Redis + commits Kafka offset in checkpoint     │
│  - On failure: Replay from last checkpoint                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
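
Replay from the last checkpoint means some events reach the sink twice, so the sink writes have to tolerate duplicates. The sketch below uses an in-memory stand-in for Redis (the `ProfileStore` class is hypothetical, not a real client) to contrast a plain counter increment, which double-counts on replay, with a write that is deduplicated by `event_id`.

```python
class ProfileStore:
    """In-memory stand-in for the Redis writes; not a real client."""

    def __init__(self):
        self.view_counts = {}      # user_id -> count
        self.applied_ids = set()   # event_ids that were already applied

    def apply_increment(self, event):
        """Non-idempotent: every delivery bumps the counter."""
        user = event["user_id"]
        self.view_counts[user] = self.view_counts.get(user, 0) + 1

    def apply_once(self, event):
        """Idempotent: a replayed event_id is ignored."""
        if event["event_id"] in self.applied_ids:
            return False
        self.applied_ids.add(event["event_id"])
        self.apply_increment(event)
        return True


events = [{"user_id": "u1", "event_id": f"e{i}"} for i in range(3)]
# Simulate a crash after the writes but before the checkpoint completed:
# the last two events are delivered again on recovery.
delivered = events + events[1:]

naive, deduped = ProfileStore(), ProfileStore()
for event in delivered:
    naive.apply_increment(event)
    deduped.apply_once(event)

print("naive:", naive.view_counts, "deduped:", deduped.view_counts)
```

This matters for the pipeline here because counter-style commands such as HINCRBY and ZINCRBY behave like `apply_increment`. Only overwrite-style commands such as SET are safe to repeat without a guard.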

Code Implementation

Kafka Producer

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import time
from datetime import datetime

# ============================================================
# KAFKA PRODUCER
# ============================================================

# Create topic
admin_client = KafkaAdminClient(bootstrap_servers='kafka:9092')

topics = [
    NewTopic(
        name='user_events',
        num_partitions=100,
        replication_factor=3,
        config={
            'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
            'compression.type': 'snappy',
            'cleanup.policy': 'delete'
        }
    )
]

admin_client.create_topics(new_topics=topics)

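# Producer with idempotency (needs a kafka-python release that supports the
# idempotent producer; older releases reject enable_idempotence as an unrecognized config)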
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    enable_idempotence=True,
    retries=2147483647,
    max_in_flight_requests_per_connection=5,
    linger_ms=10,
    batch_size=65536,
    compression_type='snappy'
)

def send_user_event(user_id, event_type, metadata):
    """Send user event to Kafka"""
    
    event = {
        'user_id': user_id,
        'event_type': event_type,
        'metadata': metadata,
        'timestamp': datetime.now().isoformat(),
        'event_id': f"{user_id}_{int(time.time() * 1000)}"
    }
    
    # Partition by user_id for ordering guarantee
    producer.send('user_events', key=user_id, value=event)
    
    return event['event_id']

# Example usage
for i in range(1000000):
    user_id = f"user_{i % 10000}"
    event_type = 'product_view'
    metadata = {'product_id': f'product_{i % 1000}', 'category': 'electronics'}
    
    send_user_event(user_id, event_type, metadata)
    
    if i % 10000 == 0:
        producer.flush()
        print(f"Sent {i} events")

producer.flush()

Flink Processing

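// Flink processing pipeline (Java)
// UserEvent, UserProfile, UserEventSchema and UserProfileRedisMapper are
// application classes that are not shown here.
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.util.Collector;

public class UserEventProcessor {
    public static void main(String[] args) throws Exception {
        // Setup Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Enable checkpointing for exactly-once state updates
        env.enableCheckpointing(60000);  // 1 minute
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // Kafka connection settings
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProperties.setProperty("group.id", "flink-processor");
        
        // Read from Kafka and assign event-time watermarks. Without watermarks
        // the event-time windows below never fire. UserEvent.getTimestamp() is
        // assumed to return epoch milliseconds.
        DataStream<UserEvent> events = env
            .addSource(new FlinkKafkaConsumer<>(
                "user_events",
                new UserEventSchema(),
                kafkaProperties
            ))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.getTimestamp()));
        
        // Process events
        DataStream<UserProfile> profiles = events
            .keyBy(UserEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .process(new UserProfileUpdater());
        
        // Write to Redis. RedisSink comes from the Apache Bahir connector;
        // redisConfig is that connector's Jedis configuration, built elsewhere.
        // The sink can repeat writes after a restart, so the mapper should issue
        // idempotent commands.
        profiles.addSink(new RedisSink<>(redisConfig, new UserProfileRedisMapper()));
        
        env.execute("User Event Processor");
    }
}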

// Custom process function
class UserProfileUpdater extends ProcessWindowFunction<UserEvent, UserProfile, String, TimeWindow> {
    @Override
    public void process(String userId, Context context, Iterable<UserEvent> events, Collector<UserProfile> out) {
        // Aggregate events into user profile
        UserProfile profile = new UserProfile(userId);
        
        for (UserEvent event : events) {
            profile.update(event);
        }
        
        out.collect(profile);
    }
}

PyFlink Processing

# ============================================================
# PYFLINK PROCESSING
# ============================================================

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

# Create Flink table environment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Enable checkpointing
t_env.get_config().set("execution.checkpointing.interval", "60s")
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")

# Define Kafka source
t_env.execute_sql("""
    CREATE TABLE user_events (
        event_id STRING,
        user_id STRING,
        event_type STRING,
        metadata STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-processor',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")

# Define Redis sink
# Flink ships no official Redis SQL connector, so the 'redis' connector and the
# option names below depend on the third-party connector that is installed.
t_env.execute_sql("""
    CREATE TABLE user_profiles (
        user_id STRING,
        total_views BIGINT,
        total_purchases BIGINT,
        favorite_category STRING,
        last_active TIMESTAMP(3),
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'redis',
        'redis.mode' = 'cluster',
        'redis.hosts' = 'redis-cluster:7000,redis-cluster:7001,redis-cluster:7002',
        'redis.command' = 'HSET'
    )
""")

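# Process events
# Flink SQL has no MODE() aggregate and no ->> operator. JSON_VALUE (available in
# recent Flink releases) extracts the category, and LAST_VALUE keeps the most
# recent one as a simple stand-in for the favorite category.
t_env.execute_sql("""
    INSERT INTO user_profiles
    SELECT
        user_id,
        COUNT(CASE WHEN event_type = 'product_view' THEN 1 END) as total_views,
        COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) as total_purchases,
        LAST_VALUE(JSON_VALUE(metadata, '$.category')) as favorite_category,
        MAX(event_time) as last_active
    FROM user_events
    GROUP BY user_id
""")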

Redis Serving

# ============================================================
# REDIS SERVING
# ============================================================

import redis
from redis.cluster import ClusterNode
import json
from typing import Dict, List

class RecommendationService:
    def __init__(self):
        # redis-py expects ClusterNode objects, not dicts, as startup nodes
        self.redis = redis.RedisCluster(
            startup_nodes=[
                ClusterNode("redis-cluster", 7000),
                ClusterNode("redis-cluster", 7001),
                ClusterNode("redis-cluster", 7002),
            ],
            decode_responses=True
        )
    
    def get_recommendations(self, user_id: str, num_recommendations: int = 10) -> List[Dict]:
        """Get real-time recommendations for user"""
        
        # Get user profile
        profile = self.redis.hgetall(f"user:{user_id}:profile")
        
        # Get recent views
        recent_views = self.redis.lrange(f"user:{user_id}:recent_views", 0, 9)
        
        # Get recommendations from sorted set
        recommendations = self.redis.zrevrange(
            f"user:{user_id}:recommendations",
            0,
            num_recommendations - 1,
            withscores=True
        )
        
        # Format results
        results = []
        for product_id, score in recommendations:
            product_info = self.redis.hgetall(f"product:{product_id}")
            results.append({
                'product_id': product_id,
                'score': score,
                'name': product_info.get('name'),
                'category': product_info.get('category'),
                'price': float(product_info.get('price', 0))
            })
        
        return results
    
    def update_user_profile(self, user_id: str, event: Dict):
        """Update user profile with new event"""
        
        pipe = self.redis.pipeline()
        
        # Update profile
        pipe.hincrby(f"user:{user_id}:profile", "total_views", 1)
        pipe.hset(f"user:{user_id}:profile", "last_active", event['timestamp'])
        
        # Add to recent views
        pipe.lpush(f"user:{user_id}:recent_views", event['product_id'])
        pipe.ltrim(f"user:{user_id}:recent_views", 0, 99)  # Keep last 100
        
        # Update recommendation scores
        pipe.zincrby(f"user:{user_id}:recommendations", 1, event['product_id'])
        
        # Execute pipeline
        pipe.execute()
    
    def get_user_profile(self, user_id: str) -> Dict:
        """Get user profile"""
        return self.redis.hgetall(f"user:{user_id}:profile")

# Example usage
service = RecommendationService()

# Get recommendations
recommendations = service.get_recommendations("user_123", num_recommendations=5)
print(f"Recommendations: {recommendations}")

# Update profile with new event
event = {
    'product_id': 'product_456',
    'event_type': 'product_view',
    'timestamp': '2024-01-15T10:30:00'
}
service.update_user_profile("user_123", event)

End-to-End Pipeline

# ============================================================
# END-TO-END REAL-TIME PIPELINE
# ============================================================

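from kafka import KafkaProducer, KafkaConsumer
import redis
from redis.cluster import ClusterNode
import json
from datetime import datetime
import time
from typing import Dict, List

class RealTimePipeline:
    def __init__(self, bootstrap_servers='kafka:9092', redis_client=None):
        # Kafka producer; keys are strings, so they need a serializer as well
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            key_serializer=lambda k: k.encode('utf-8'),
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all',
            enable_idempotence=True
        )
        
        # Redis client; an injected client (for example in tests) takes precedence.
        # redis-py expects ClusterNode objects, not dicts, as startup nodes.
        self.redis = redis_client or redis.RedisCluster(
            startup_nodes=[
                ClusterNode("redis-cluster", 7000),
            ],
            decode_responses=True
        )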
    
    def ingest_event(self, user_id: str, event_type: str, metadata: Dict):
        """Ingest event into pipeline"""
        
        event = {
            'user_id': user_id,
            'event_type': event_type,
            'metadata': metadata,
            'timestamp': datetime.now().isoformat(),
            'event_id': f"{user_id}_{int(time.time() * 1000)}"
        }
        
        # Send to Kafka
        self.producer.send('user_events', key=user_id, value=event)
        
        return event['event_id']
    
    def process_event(self, event: Dict):
        """Process event and update recommendations"""
        
        user_id = event['user_id']
        event_type = event['event_type']
        metadata = event['metadata']
        
        pipe = self.redis.pipeline()
        
        # Update user profile
        pipe.hincrby(f"user:{user_id}:profile", "total_views", 1)
        pipe.hset(f"user:{user_id}:profile", "last_active", event['timestamp'])
        
        # Add to recent views
        pipe.lpush(f"user:{user_id}:recent_views", metadata['product_id'])
        pipe.ltrim(f"user:{user_id}:recent_views", 0, 99)
        
        # Update recommendation scores
        pipe.zincrby(f"user:{user_id}:recommendations", 1, metadata['product_id'])
        
        # Get similar products (simplified)
        similar_products = self.get_similar_products(metadata['product_id'])
        for product_id in similar_products:
            pipe.zincrby(f"user:{user_id}:recommendations", 0.5, product_id)
        
        pipe.execute()
    
    def get_similar_products(self, product_id: str) -> List[str]:
        """Get similar products (simplified)"""
        # In production, use collaborative filtering or content-based similarity
        # smembers returns a set; convert it to match the declared return type
        return list(self.redis.smembers(f"product:{product_id}:similar"))
    
    def get_recommendations(self, user_id: str, num_recommendations: int = 10) -> List[Dict]:
        """Get recommendations for user"""
        
        recommendations = self.redis.zrevrange(
            f"user:{user_id}:recommendations",
            0,
            num_recommendations - 1,
            withscores=True
        )
        
        results = []
        for product_id, score in recommendations:
            product_info = self.redis.hgetall(f"product:{product_id}")
            results.append({
                'product_id': product_id,
                'score': score,
                'name': product_info.get('name'),
                'price': float(product_info.get('price', 0))
            })
        
        return results

# Usage
pipeline = RealTimePipeline()

# Ingest events
for i in range(1000000):
    user_id = f"user_{i % 10000}"
    product_id = f"product_{i % 1000}"
    
    pipeline.ingest_event(user_id, 'product_view', {
        'product_id': product_id,
        'category': 'electronics'
    })
    
    if i % 10000 == 0:
        print(f"Ingested {i} events")

# Block until buffered messages are delivered before the script exits
pipeline.producer.flush()

Monitoring

# ============================================================
# MONITORING
# ============================================================

from kafka import KafkaConsumer, TopicPartition
from prometheus_client import start_http_server, Gauge, Counter
import redis
import time

# Prometheus metrics
EVENTS_PROCESSED = Counter('events_processed_total', 'Total events processed')
EVENTS_FAILED = Counter('events_failed_total', 'Total events failed')
PROCESSING_LATENCY = Gauge('processing_latency_seconds', 'Processing latency')
KAFKA_LAG = Gauge('kafka_lag', 'Kafka consumer lag', ['topic', 'partition'])

# Start metrics server
start_http_server(8000)

def monitor_pipeline():
    """Monitor pipeline health"""
    
    # Reuse one consumer. It shares the processing job's group id so that
    # committed() returns that group's offsets; it never subscribes or commits.
    consumer = KafkaConsumer(
        bootstrap_servers='kafka:9092',
        group_id='flink-processor',
        enable_auto_commit=False
    )
    redis_client = redis.Redis(host='redis-cluster', port=7000)
    
    while True:
        # Lag = log end offset minus the group's committed offset, per partition
        partition_ids = consumer.partitions_for_topic('user_events') or set()
        partitions = [TopicPartition('user_events', p) for p in partition_ids]
        end_offsets = consumer.end_offsets(partitions)
        
        for tp in partitions:
            committed = consumer.committed(tp) or 0
            KAFKA_LAG.labels(topic='user_events', partition=tp.partition).set(end_offsets[tp] - committed)
        
        # Check Redis health
        redis_client.ping()
        
        time.sleep(10)

# Start monitoring (blocks forever)
monitor_pipeline()

Production Tip: For 500K events/second, use: (1) 100+ Kafka partitions, (2) Flink with RocksDB state backend, (3) Redis Cluster with 6+ nodes, (4) Horizontal scaling with Kubernetes.


Common Follow-Up Questions

Q1: How do you handle backpressure?

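# Flink backpressure handling
# Flink applies backpressure automatically through its credit-based network
# stack, so there is nothing to switch on. What can be tuned is how the job
# behaves while backpressured, for example unaligned checkpoints so that
# checkpoint barriers are not stuck behind buffered records.
t_env.get_config().set("execution.checkpointing.unaligned", "true")

# Kafka consumer backpressure: bound how much each poll() hands to the application
consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers='kafka:9092',
    max_poll_records=500,
    max_poll_interval_ms=300000
)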

Q2: How do you handle late events?

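# Flink watermarks for late events
# The watermark declared on user_events (event_time - INTERVAL '5' SECOND)
# tolerates events up to 5 seconds out of order. Rows that arrive after the
# watermark has passed the end of their window are dropped from that window.
t_env.execute_sql("""
    SELECT
        user_id,
        TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
        COUNT(*) as event_count
    FROM user_events
    GROUP BY user_id,
        TUMBLE(event_time, INTERVAL '5' MINUTE)
""")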

Q3: How do you monitor real-time pipelines?

# Metrics to monitor
- Kafka lag (consumer lag)
- Processing latency (end-to-end)
- Throughput (events/second)
- Error rate
- Redis memory usage
- Redis hit/miss rate
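
Two of these metrics are easy to compute by hand, which helps when explaining what a dashboard number means. The sketch below derives per-partition consumer lag from offsets and a nearest-rank percentile from latency samples. The helper names and the sample numbers are made up for illustration.

```python
import math


def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag: log end offset minus committed offset."""
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def percentile(samples: list, pct: float) -> float:
    """Nearest-rank percentile of a non-empty list of samples."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


# Hypothetical offsets for two partitions and ten latency samples (ms).
lag = consumer_lag({0: 1_000, 1: 1_200}, {0: 990, 1: 700})
latencies_ms = [12, 15, 11, 240, 14, 13, 16, 12, 18, 17]
p50 = percentile(latencies_ms, 50)
p99 = percentile(latencies_ms, 99)
print(f"lag={lag} p50={p50}ms p99={p99}ms")
```

A single slow sample barely moves the median but dominates the high percentile, which is why latency alerts are usually set on p95 or p99 and not on the average.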

Q4: How do you test real-time pipelines?

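# Integration test with test containers
# Assumes RealTimePipeline accepts bootstrap_servers and redis_client arguments.
import json
from kafka import KafkaConsumer
from testcontainers.kafka import KafkaContainer
from testcontainers.redis import RedisContainer

def test_real_time_pipeline():
    with KafkaContainer() as kafka, RedisContainer() as redis_container:
        # Setup pipeline with test containers
        pipeline = RealTimePipeline(
            bootstrap_servers=kafka.get_bootstrap_server(),
            redis_client=redis_container.get_client(decode_responses=True)
        )
        
        # Ingest test events
        pipeline.ingest_event("test_user", "product_view", {"product_id": "test_product"})
        pipeline.producer.flush()
        
        # Stand in for the Flink job: consume the event and process it
        consumer = KafkaConsumer(
            'user_events',
            bootstrap_servers=kafka.get_bootstrap_server(),
            auto_offset_reset='earliest',
            consumer_timeout_ms=10000,
            value_deserializer=lambda b: json.loads(b.decode('utf-8'))
        )
        for message in consumer:
            pipeline.process_event(message.value)
            break
        consumer.close()
        
        # Verify results
        recommendations = pipeline.get_recommendations("test_user")
        assert len(recommendations) > 0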

Critical Consideration: Real-time pipelines are harder to debug than batch pipelines. Always implement: (1) comprehensive logging, (2) dead letter queues for failed events, (3) idempotent processing, and (4) monitoring/alerting.
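
As a rough illustration of the dead letter queue idea, the sketch below retries a handler a fixed number of times and parks events that keep failing, together with the failure reason, so they can be inspected and replayed later. It is a minimal in-memory sketch: in production the dead letter queue would typically be a separate Kafka topic, and `process_with_dlq` and `extract_product` are hypothetical names.

```python
def process_with_dlq(events, handler, max_attempts=3):
    """Run handler on each event; park events that keep failing in a DLQ."""
    processed, dead_letters = [], []
    for event in events:
        for attempt in range(1, max_attempts + 1):
            try:
                processed.append(handler(event))
                break
            except Exception as exc:  # broad on purpose: any failure is routed
                if attempt == max_attempts:
                    # Keep the payload and the reason so it can be replayed later.
                    dead_letters.append(
                        {"event": event, "error": repr(exc), "attempts": attempt}
                    )
    return processed, dead_letters


def extract_product(event):
    """Hypothetical handler: fails on events without a product_id."""
    return event["metadata"]["product_id"]


events = [
    {"user_id": "u1", "metadata": {"product_id": "p1"}},
    {"user_id": "u2", "metadata": {}},  # malformed event
    {"user_id": "u3", "metadata": {"product_id": "p3"}},
]
processed, dead_letters = process_with_dlq(events, extract_product)
print(processed, dead_letters)
```

The malformed event does not block the two valid ones, and the stored error string gives whoever is on call a starting point for debugging.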


Company-Specific Tips

Uber Interview Tips

  • Discuss real-time location tracking
  • Explain surge pricing calculations
  • Mention ride matching algorithms
  • Talk about fraud detection in real-time

Netflix Interview Tips

  • Focus on content recommendations
  • Explain viewing history processing
  • Mention real-time metrics for A/B testing
  • Talk about personalization pipelines

Amazon Interview Tips

  • Discuss product recommendations
  • Explain dynamic pricing calculations
  • Mention real-time inventory management
  • Talk about fraud detection pipelines

Final Takeaway: Real-time pipelines require careful architecture choices. Use Kafka for ingestion, Flink for processing, and Redis for serving. Always consider: (1) latency requirements, (2) throughput requirements, (3) exactly-once semantics, and (4) fault tolerance.
