Real-Time E-Commerce Analytics Pipeline (Kafka + Spark + Redis)

Kafka + Spark Structured Streaming + Redis + PostgreSQL

Project Difficulty: Advanced | Duration: 3-4 weeks | Cloud: AWS/GCP

This project implements a complete real-time analytics pipeline that processes 100K+ events/second for an e-commerce platform, enabling near-instant business insights and fraud detection.

Project Overview

Problem Statement

E-commerce platforms generate massive volumes of clickstream data, transactions, and inventory updates every second. Traditional batch processing creates hours of latency, preventing real-time decision-making for dynamic pricing, fraud detection, and personalized recommendations.

Objectives

  1. Ingest 100K+ events/second from multiple sources
  2. Process and enrich data in real-time with sub-second latency
  3. Detect fraudulent transactions within 500ms
  4. Power real-time dashboards for business metrics
  5. Maintain exactly-once processing semantics
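
The throughput objective can be turned into rough capacity numbers before any infrastructure is chosen. The sketch below is a back-of-envelope calculation only: the 100K events/second figure comes from the objectives, while the average event size and the per-partition rate are assumptions picked for illustration and should be replaced with measured values.

```python
import math

EVENTS_PER_SECOND = 100_000           # target from the objectives above
AVG_EVENT_BYTES = 1_000               # assumption: ~1 KB per serialized event
PARTITION_EVENTS_PER_SECOND = 10_000  # assumption: sustainable rate per partition


def ingest_megabytes_per_second(events_per_second: int, avg_event_bytes: int) -> float:
    """Raw ingest bandwidth in MB/s (1 MB = 1,000,000 bytes)."""
    return events_per_second * avg_event_bytes / 1_000_000


def min_partitions(events_per_second: int, per_partition_rate: int) -> int:
    """Smallest partition count that keeps every partition under its rate."""
    return math.ceil(events_per_second / per_partition_rate)


def events_per_micro_batch(events_per_second: int, trigger_seconds: int) -> int:
    """Events that accumulate between two micro-batch triggers."""
    return events_per_second * trigger_seconds


print(ingest_megabytes_per_second(EVENTS_PER_SECOND, AVG_EVENT_BYTES))   # 100.0
print(min_partitions(EVENTS_PER_SECOND, PARTITION_EVENTS_PER_SECOND))    # 10
print(events_per_micro_batch(EVENTS_PER_SECOND, 30))                     # 3000000
```

The last number matters for stream processing: with a 30-second trigger, each micro-batch has to absorb about three million events to keep up with the target rate.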

Tech Stack

Component | Technology | Purpose
Message Broker | Apache Kafka | Event ingestion and buffering
Stream Processing | Apache Spark Structured Streaming | Real-time transformations
Cache Layer | Redis Cluster | Low-latency lookups and aggregations
Database | PostgreSQL + TimescaleDB | Persistent storage and time-series
Infrastructure | Terraform + Kubernetes | IaC and orchestration
Monitoring | Prometheus + Grafana | Pipeline observability

Architecture Diagram

[Architecture diagram]

Data Sources: Web App (clickstream), Mobile App (events), Payment Gateway (transactions), Inventory (stock)
Apache Kafka Cluster: clicks topic, transactions topic, inventory topic, alerts topic
Spark Structured Streaming: Enrichment (joins, UDFs), Fraud Detection (ML), Aggregation (window ops)
Sink Layers: Redis Cache (hot), PostgreSQL (warm), S3 Data Lake (cold), Grafana Dashboards

Data Source Setup and Schema

Kafka Topic Schemas

# schemas/events.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from enum import Enum

class EventType(Enum):
    PAGE_VIEW = "page_view"
    ADD_TO_CART = "add_to_cart"
    PURCHASE = "purchase"
    PRODUCT_SEARCH = "product_search"
    USER_LOGIN = "user_login"

@dataclass
class ClickstreamEvent:
    event_id: str
    user_id: str
    session_id: str
    event_type: EventType
    product_id: Optional[str]
    timestamp: datetime
    page_url: str
    referrer: str
    device_type: str
    ip_address: str
    geo_location: dict
    metadata: dict

@dataclass
class TransactionEvent:
    transaction_id: str
    user_id: str
    order_id: str
    amount: float
    currency: str
    payment_method: str
    items: list
    timestamp: datetime
    shipping_address: dict
    billing_address: dict

@dataclass
class InventoryEvent:
    product_id: str
    warehouse_id: str
    quantity_change: int
    current_quantity: int
    operation: str  # 'restock', 'sale', 'adjustment'
    timestamp: datetime
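
These dataclasses hold `Enum` and `datetime` values, which `json.dumps` cannot encode on its own. The sketch below shows one way to turn such an event into a JSON payload. `MiniClickEvent` is a hypothetical, trimmed-down stand-in for `ClickstreamEvent`, and `to_json` is an illustrative helper, not part of the project code.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class EventType(Enum):
    PAGE_VIEW = "page_view"
    PURCHASE = "purchase"


@dataclass
class MiniClickEvent:  # hypothetical, trimmed-down ClickstreamEvent
    event_id: str
    user_id: str
    event_type: EventType
    product_id: Optional[str]
    timestamp: datetime


def to_json(event) -> str:
    """Serialize a dataclass event, converting enums and datetimes."""
    def encode(value):
        if isinstance(value, Enum):
            return value.value          # store the enum's string value
        if isinstance(value, datetime):
            return value.isoformat()    # ISO 8601 string
        raise TypeError(f"Cannot serialize {type(value).__name__}")
    return json.dumps(asdict(event), default=encode)


event = MiniClickEvent(
    event_id="e1",
    user_id="u1",
    event_type=EventType.PAGE_VIEW,
    product_id=None,
    timestamp=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
)
payload = to_json(event)
decoded = json.loads(payload)
print(decoded["event_type"], decoded["timestamp"])
```

Using timezone-aware timestamps keeps the ISO string unambiguous when it is later cast to a timestamp downstream.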

PostgreSQL Schema

-- migrations/001_create_tables.sql
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "timescaledb";

-- Users dimension table
CREATE TABLE dim_users (
    user_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    email VARCHAR(255) UNIQUE NOT NULL,
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    is_premium BOOLEAN DEFAULT FALSE,
    lifetime_value DECIMAL(12, 2) DEFAULT 0.00
);

-- Products dimension table
CREATE TABLE dim_products (
    product_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    sku VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    category VARCHAR(100),
    subcategory VARCHAR(100),
    price DECIMAL(10, 2) NOT NULL,
    cost DECIMAL(10, 2),
    stock_quantity INTEGER DEFAULT 0,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Real-time transactions (hypertable for time-series)
-- TimescaleDB requires every unique constraint on a hypertable to include
-- the partitioning column, so the primary key is (transaction_id, processed_at).
CREATE TABLE rt_transactions (
    transaction_id UUID NOT NULL DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES dim_users(user_id),
    order_id VARCHAR(50) NOT NULL,
    amount DECIMAL(12, 2) NOT NULL,
    currency VARCHAR(3) DEFAULT 'USD',
    status VARCHAR(20) NOT NULL,
    payment_method VARCHAR(50),
    fraud_score FLOAT,
    processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    PRIMARY KEY (transaction_id, processed_at)
);

SELECT create_hypertable('rt_transactions', 'processed_at');

-- Clickstream events (hypertable)
CREATE TABLE rt_clickstream (
    event_id UUID NOT NULL DEFAULT uuid_generate_v4(),
    user_id UUID,
    session_id VARCHAR(100),
    event_type VARCHAR(50) NOT NULL,
    product_id UUID,
    page_url TEXT,
    device_type VARCHAR(50),
    ip_address INET,
    geo_country VARCHAR(2),
    geo_city VARCHAR(100),
    event_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    processed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    PRIMARY KEY (event_id, event_timestamp)
);

SELECT create_hypertable('rt_clickstream', 'event_timestamp');

-- Real-time aggregations (continuous aggregates)
-- A continuous aggregate buckets on the time column of a single hypertable,
-- so clickstream and transaction metrics are kept in separate views.
-- COUNT(DISTINCT ...) here assumes TimescaleDB 2.7 or later.
CREATE MATERIALIZED VIEW mv_realtime_metrics
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', event_timestamp) AS minute_bucket,
    user_id,
    COUNT(*) AS event_count,
    COUNT(DISTINCT session_id) AS unique_sessions
FROM rt_clickstream
GROUP BY minute_bucket, user_id;

-- Per-user transaction totals (the view name is illustrative)
CREATE MATERIALIZED VIEW mv_realtime_revenue
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', processed_at) AS minute_bucket,
    user_id,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_amount
FROM rt_transactions
GROUP BY minute_bucket, user_id;

-- Fraud detection results
-- transaction_id is stored without a foreign key because rt_transactions
-- has no unique constraint on transaction_id alone.
CREATE TABLE fraud_alerts (
    alert_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    transaction_id UUID,
    user_id UUID REFERENCES dim_users(user_id),
    risk_score FLOAT NOT NULL,
    risk_factors JSONB,
    action_taken VARCHAR(50),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_fraud_alerts_score ON fraud_alerts(risk_score DESC);
CREATE INDEX idx_fraud_alerts_user ON fraud_alerts(user_id, created_at DESC);
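
To make the per-minute rollup in `mv_realtime_metrics` concrete, the following sketch reproduces the same grouping in plain Python: floor each timestamp to its one-minute bucket, then count events and distinct sessions per (bucket, user). It only approximates what the database does; the `time_bucket` function here is a simplified stand-in that counts buckets from the Unix epoch, and the sample rows are made up.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def time_bucket(width: timedelta, ts: datetime) -> datetime:
    """Floor ts to the start of its bucket (buckets counted from the Unix epoch)."""
    return EPOCH + ((ts - EPOCH) // width) * width


def minute_metrics(rows):
    """rows: iterable of (event_timestamp, user_id, session_id)."""
    counts = defaultdict(int)
    sessions = defaultdict(set)
    for event_ts, user_id, session_id in rows:
        key = (time_bucket(timedelta(minutes=1), event_ts), user_id)
        counts[key] += 1
        sessions[key].add(session_id)
    return {
        key: {"event_count": counts[key], "unique_sessions": len(sessions[key])}
        for key in counts
    }


def at(minute: int, second: int) -> datetime:
    return datetime(2024, 1, 1, 12, minute, second, tzinfo=timezone.utc)


rows = [
    (at(0, 5), "u1", "s1"),
    (at(0, 59), "u1", "s1"),
    (at(0, 30), "u1", "s2"),
    (at(1, 0), "u1", "s2"),
    (at(0, 10), "u2", "s3"),
]
metrics = minute_metrics(rows)
print(metrics[(at(0, 0), "u1")])  # {'event_count': 3, 'unique_sessions': 2}
```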

Step-by-Step Implementation Guide

Step 1: Kafka Producer Setup

# producers/clickstream_producer.py
import json
import uuid
import random
from datetime import datetime
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ClickstreamProducer:
    def __init__(self, config: dict):
        # Target topic; 'clickstream_topic' is the same config key the Spark job reads
        self.topic = config['clickstream_topic']
        self.conf = {
            'bootstrap.servers': config['kafka_brokers'],
            'client.id': 'clickstream-producer',
            'acks': 'all',
            'retries': 5,
            'retry.backoff.ms': 100,
            'enable.idempotence': True,
            'compression.type': 'snappy',
            'batch.size': 32768,
            'linger.ms': 10,
        }
        self.producer = Producer(self.conf)
        
        # Schema Registry setup
        schema_registry_conf = {
            'url': config['schema_registry_url']
        }
        self.schema_registry = SchemaRegistryClient(schema_registry_conf)
        
        # Define Avro schema
        self.clickstream_schema = """
        {
            "type": "record",
            "name": "ClickstreamEvent",
            "namespace": "com.ecommerce.events",
            "fields": [
                {"name": "event_id", "type": "string"},
                {"name": "user_id", "type": "string"},
                {"name": "session_id", "type": "string"},
                {"name": "event_type", "type": {"type": "enum", "name": "EventType", "symbols": ["PAGE_VIEW", "ADD_TO_CART", "PURCHASE", "SEARCH"]}},
                {"name": "product_id", "type": ["null", "string"], "default": null},
                {"name": "timestamp", "type": "string"},
                {"name": "page_url", "type": "string"},
                {"name": "device_type", "type": "string"},
                {"name": "ip_address", "type": "string"},
                {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}}
            ]
        }
        """
        
        self.serializer = AvroSerializer(
            self.schema_registry,
            self.clickstream_schema,
            conf={'auto.register.schemas': True}
        )
    
    def delivery_callback(self, err, msg):
        if err:
            logger.error(f"Message delivery failed: {err}")
        else:
            logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}]")
    
    def produce_event(self, event_data: dict):
        """Produce a single clickstream event to Kafka."""
        try:
            # Keying by user_id keeps each user's events in one partition
            key = event_data['user_id'].encode('utf-8')
            # The serializer expects a SerializationContext, not a plain dict
            value = self.serializer(
                event_data,
                SerializationContext(self.topic, MessageField.VALUE)
            )
            
            self.producer.produce(
                topic=self.topic,
                key=key,
                value=value,
                callback=self.delivery_callback
            )
            self.producer.poll(0)
            
        except Exception as e:
            logger.error(f"Failed to produce event: {e}")
            raise
    
    def generate_synthetic_events(self, count: int = 1000):
        """Generate synthetic clickstream data for testing."""
        pages = ['/home', '/products', '/cart', '/checkout', '/search']
        devices = ['desktop', 'mobile', 'tablet']
        event_types = ['PAGE_VIEW', 'PAGE_VIEW', 'PAGE_VIEW', 'ADD_TO_CART', 'SEARCH']
        
        for _ in range(count):
            event = {
                'event_id': str(uuid.uuid4()),
                'user_id': f"user_{random.randint(1, 10000)}",
                'session_id': f"session_{uuid.uuid4().hex[:12]}",
                'event_type': random.choice(event_types),
                'product_id': f"prod_{random.randint(1, 5000)}" if random.random() > 0.3 else None,
                'timestamp': datetime.utcnow().isoformat(),
                'page_url': random.choice(pages),
                'device_type': random.choice(devices),
                'ip_address': f"{random.randint(1, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}",
                'metadata': {
                    'browser': random.choice(['Chrome', 'Firefox', 'Safari']),
                    'os': random.choice(['Windows', 'macOS', 'iOS', 'Android']),
                    'page_load_time': str(random.uniform(0.1, 5.0))
                }
            }
            self.produce_event(event)
        
        self.producer.flush()
        logger.info(f"Produced {count} synthetic events")
    
    def close(self):
        self.producer.flush()
        logger.info("Producer closed")
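
The producer keys every message by `user_id`. With a key-hashing partitioner, that keeps all events of one user in the same partition, which preserves their relative order. The sketch below illustrates the idea with a CRC32 hash; it is a simplified stand-in, since the exact hash function depends on the Kafka client and its partitioner setting, and the partition count of 6 is an assumption.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 6  # assumption for illustration


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition with a stable hash (CRC32 here)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [
    ("user_1", "PAGE_VIEW"),
    ("user_2", "PAGE_VIEW"),
    ("user_1", "ADD_TO_CART"),
    ("user_1", "PURCHASE"),
]

by_partition = defaultdict(list)
for user_id, event_type in events:
    by_partition[partition_for(user_id)].append((user_id, event_type))

# All of user_1's events sit in one partition, in the order they were produced
user_1_events = [
    event_type
    for uid, event_type in by_partition[partition_for("user_1")]
    if uid == "user_1"
]
print(user_1_events)  # ['PAGE_VIEW', 'ADD_TO_CART', 'PURCHASE']
```

A side effect of this choice is that very active users can create hot partitions, so it is worth watching the per-partition message rate.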

Step 2: Spark Structured Streaming Processing
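
The rule-based score that `detect_fraud_signals` computes inside a Spark UDF in the listing below can be tried out first as a plain Python function, which makes the thresholds easier to reason about and unit-test. This sketch mirrors those rules under two stated differences: it drops the unused `user_id` argument and rounds the result to two decimals to hide floating-point noise. The function names are illustrative.

```python
from typing import Optional


def fraud_score(amount: Optional[float], payment_method: Optional[str],
                ip_address: Optional[str]) -> float:
    """Additive rule-based score in [0, 1]; higher means more suspicious."""
    score = 0.0
    if amount is not None and amount > 1000:
        score += 0.3
    if amount is not None and amount > 5000:
        score += 0.2
    if payment_method in ("prepaid_card", "gift_card"):
        score += 0.2
    if ip_address and ip_address.startswith("10."):
        score += 0.1
    return round(min(score, 1.0), 2)  # rounding is an addition for readability


def risk_level(score: float) -> str:
    """Same thresholds as the Spark listing: > 0.7 HIGH, > 0.4 MEDIUM."""
    if score > 0.7:
        return "HIGH"
    if score > 0.4:
        return "MEDIUM"
    return "LOW"


cases = [
    (6000.0, "gift_card", "10.0.0.5"),
    (6000.0, "gift_card", "8.8.8.8"),
    (1500.0, "credit_card", None),
]
for amount, method, ip in cases:
    score = fraud_score(amount, method, ip)
    print(amount, method, ip, score, risk_level(score))
```

With these weights the highest reachable score is 0.8, so a transaction is only rated HIGH when all four rules fire at once.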

# streaming/spark_processor.py
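import json
import logging
from datetime import datetime

import redis
from pyspark.sql import SparkSession
# Import only what is used: pulling in pyspark's sum/max/min would shadow the
# Python built-ins, and the fraud-score UDF relies on the built-in min().
from pyspark.sql.functions import (
    col, window, count, countDistinct,
    current_timestamp, when, expr,
    from_json, udf
)
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType,
    TimestampType, MapType, ArrayType
)

logger = logging.getLogger(__name__)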

class EcommerceStreamProcessor:
    def __init__(self, spark_config: dict):
        # Rate limiting in Structured Streaming is set per source through
        # maxOffsetsPerTrigger (see read_kafka_stream). The
        # spark.streaming.backpressure.* and spark.streaming.kafka.* settings
        # only affect the legacy DStream API, so they are omitted here.
        # The spark-sql-kafka package version should match the Spark version in use.
        self.spark = SparkSession.builder \
            .appName("EcommerceRealTimeAnalytics") \
            .config("spark.sql.shuffle.partitions", "200") \
            .config("spark.sql.streaming.checkpointLocation", "/checkpoints/ecommerce") \
            .config("spark.jars.packages", 
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,"
                    "org.postgresql:postgresql:42.6.0") \
            .getOrCreate()
        
        self.spark.sparkContext.setLogLevel("WARN")
        
        # Define schemas
        self.clickstream_schema = StructType([
            StructField("event_id", StringType(), False),
            StructField("user_id", StringType(), False),
            StructField("session_id", StringType(), False),
            StructField("event_type", StringType(), False),
            StructField("product_id", StringType(), True),
            StructField("timestamp", StringType(), False),
            StructField("page_url", StringType(), False),
            StructField("device_type", StringType(), False),
            StructField("ip_address", StringType(), False),
            StructField("metadata", MapType(StringType(), StringType()), True)
        ])
        
        self.transaction_schema = StructType([
            StructField("transaction_id", StringType(), False),
            StructField("user_id", StringType(), False),
            StructField("order_id", StringType(), False),
            StructField("amount", FloatType(), False),
            StructField("currency", StringType(), False),
            StructField("payment_method", StringType(), False),
            StructField("items", ArrayType(StringType()), False),
            StructField("timestamp", StringType(), False),
            StructField("shipping_address", MapType(StringType(), StringType()), True),
            # Assumption: the transaction payload carries the client IP, which
            # detect_fraud_signals reads; the column is null when it is absent.
            StructField("ip_address", StringType(), True)
        ])
    
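    def read_kafka_stream(self, topic: str, bootstrap_servers: str,
                          max_offsets_per_trigger: int = 100000):
        """Read streaming data from a Kafka topic.

        maxOffsetsPerTrigger caps the records read per micro-batch. With the
        default of 100,000 and a 30-second trigger that is roughly 3,300
        events/second, so raise it to sustain the 100K events/second target.
        failOnDataLoss=false lets the query continue when offsets have expired,
        at the cost of silently skipping those records.
        """
        return self.spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .option("maxOffsetsPerTrigger", max_offsets_per_trigger) \
            .load()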
    
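    def process_clickstream(self, kafka_df, schema):
        """Process raw clickstream events from Kafka.

        Assumes JSON-encoded message values. Values written with the Avro
        serializer from Step 1 use the Confluent wire format and would have to
        be decoded with from_avro (after stripping the 5-byte header) instead.
        """
        # Parse JSON and extract fields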
        parsed_df = kafka_df \
            .select(
                col("key").cast("string").alias("kafka_key"),
                from_json(col("value").cast("string"), schema).alias("data"),
                col("timestamp").alias("kafka_timestamp"),
                col("partition"),
                col("offset")
            ) \
            .select("kafka_key", "data.*", "kafka_timestamp", "partition", "offset") \
            .withColumn("event_timestamp", col("timestamp").cast(TimestampType())) \
            .withColumn("processed_at", current_timestamp())
        
        # Add derived columns
        enriched_df = parsed_df \
            .withColumn("hour", expr("hour(event_timestamp)")) \
            .withColumn("day_of_week", expr("dayofweek(event_timestamp)")) \
            .withColumn("is_business_hour", 
                       when((col("hour") >= 9) & (col("hour") <= 17), True)
                       .otherwise(False))
        
        return enriched_df
    
    def calculate_realtime_metrics(self, clickstream_df):
        """Calculate real-time business metrics using windowed aggregations."""
        # 5-minute tumbling window metrics
        windowed_metrics = clickstream_df \
            .withWatermark("event_timestamp", "2 minutes") \
            .groupBy(
                window("event_timestamp", "5 minutes"),
                "event_type"
            ) \
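            .agg(
                count("*").alias("event_count"),
                # Exact distinct counts are not supported on streaming
                # DataFrames, so use the approximate variant instead.
                expr("approx_count_distinct(user_id)").alias("unique_users"),
                expr("approx_count_distinct(session_id)").alias("unique_sessions")
            ) \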
            .select(
                col("window.start").alias("window_start"),
                col("window.end").alias("window_end"),
                col("event_type"),
                col("event_count"),
                col("unique_users"),
                col("unique_sessions")
            )
        
        return windowed_metrics
    
    def detect_fraud_signals(self, transaction_df):
        """Real-time fraud detection using rule-based scoring (no ML model is applied here)."""
        # Define fraud detection UDF
        @udf(returnType=FloatType())
        def calculate_fraud_score(amount, payment_method, user_id, ip_address):
            score = 0.0
            
            # High amount transactions
            if amount and float(amount) > 1000:
                score += 0.3
            if amount and float(amount) > 5000:
                score += 0.2
            
            # Suspicious payment methods
            if payment_method in ['prepaid_card', 'gift_card']:
                score += 0.2
            
            # IP-based heuristics (simplified)
            if ip_address and ip_address.startswith('10.'):
                score += 0.1
            
            return min(score, 1.0)
        
        # Apply fraud scoring
        fraud_scores_df = transaction_df \
            .withColumn(
                "fraud_score",
                calculate_fraud_score(
                    col("amount"),
                    col("payment_method"),
                    col("user_id"),
                    col("ip_address")
                )
            ) \
            .withColumn(
                "risk_level",
                when(col("fraud_score") > 0.7, "HIGH")
                .when(col("fraud_score") > 0.4, "MEDIUM")
                .otherwise("LOW")
            ) \
            .withColumn(
                "requires_review",
                when(col("fraud_score") > 0.5, True).otherwise(False)
            )
        
        return fraud_scores_df
    
    def write_to_redis(self, df, batch_id):
        """Write micro-batch results to Redis for low-latency access."""
        redis_client = redis.Redis(
            host='redis-cluster.internal',
            port=6379,
            decode_responses=True,
            ssl=True
        )
        
        try:
            # Collect results (for small batches)
            results = df.collect()
            
            pipe = redis_client.pipeline()
            
            for row in results:
                key = f"metrics:{row['window_start']}:{row['event_type']}"
                value = {
                    'event_count': row['event_count'],
                    'unique_users': row['unique_users'],
                    'unique_sessions': row['unique_sessions'],
                    'window_start': str(row['window_start']),
                    'window_end': str(row['window_end']),
                    'updated_at': datetime.utcnow().isoformat()
                }
                
                # Set with 1 hour TTL
                pipe.setex(key, 3600, json.dumps(value))
                
                # Track the latest count per event type. In update mode a window
                # is re-emitted with its running total, so incrementing a counter
                # by these values (HINCRBY) would double count.
                pipe.hset("realtime:counters", f"events:{row['event_type']}", row['event_count'])
            
            pipe.execute()
            logger.info(f"Batch {batch_id}: Wrote {len(results)} records to Redis")
            
        except Exception as e:
            logger.error(f"Redis write failed for batch {batch_id}: {e}")
            raise
        finally:
            redis_client.close()
    
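    def write_to_postgresql(self, df, batch_id, table_name):
        """Append micro-batch results to PostgreSQL (plain inserts, no upsert)."""
        import os  # local import keeps this method self-contained

        jdbc_url = "jdbc:postgresql://postgres-cluster.internal:5432/ecommerce"
        connection_properties = {
            "user": "pipeline_user",
            # Read the password from the environment; a literal "${DB_PASSWORD}"
            # string is not expanded by Python.
            "password": os.environ["DB_PASSWORD"],
            "driver": "org.postgresql.Driver",
            "batchsize": "5000",
            # PostgreSQL JDBC batching option (rewriteBatchedStatements is the MySQL name)
            "reWriteBatchedInserts": "true",
            # Let the server cast string values to UUID / INET columns
            "stringtype": "unspecified"
        }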
        
        try:
            df.write \
                .mode("append") \
                .jdbc(jdbc_url, table_name, connection_properties)
            
            logger.info(f"Batch {batch_id}: Wrote to PostgreSQL table {table_name}")
            
        except Exception as e:
            logger.error(f"PostgreSQL write failed for batch {batch_id}: {e}")
            raise
    
    def start_streaming_pipeline(self, config: dict):
        """Main streaming pipeline orchestration."""
        # Read clickstream stream
        clickstream_raw = self.read_kafka_stream(
            topic=config['clickstream_topic'],
            bootstrap_servers=config['kafka_brokers']
        )
        
        # Process clickstream
        clickstream_processed = self.process_clickstream(
            clickstream_raw, 
            self.clickstream_schema
        )
        
        # Calculate real-time metrics
        realtime_metrics = self.calculate_realtime_metrics(clickstream_processed)
        
        # Start metrics query with Redis sink
        metrics_query = realtime_metrics.writeStream \
            .outputMode("update") \
            .foreachBatch(self.write_to_redis) \
            .option("checkpointLocation", "/checkpoints/metrics") \
            .trigger(processingTime="30 seconds") \
            .start()
        
        # Write detailed events to PostgreSQL
        events_query = clickstream_processed.writeStream \
            .outputMode("append") \
            .foreachBatch(lambda df, batch_id: self.write_to_postgresql(
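                # Columns must exist in rt_clickstream; the metadata map has no
                # column there and a map type cannot be written over JDBC as-is.
                df.select("event_id", "user_id", "session_id", "event_type",
                         "product_id", "event_timestamp", "page_url", 
                         "device_type", "ip_address"),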
                batch_id,
                "rt_clickstream"
            )) \
            .option("checkpointLocation", "/checkpoints/events") \
            .trigger(processingTime="1 minute") \
            .start()
        
        # Read transaction stream
        transaction_raw = self.read_kafka_stream(
            topic=config['transaction_topic'],
            bootstrap_servers=config['kafka_brokers']
        )
        
        # Process transactions with fraud detection
        transactions_processed = transaction_raw \
            .select(from_json(col("value").cast("string"), self.transaction_schema).alias("data")) \
            .select("data.*") \
            .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
        
        fraud_scored = self.detect_fraud_signals(transactions_processed)
        
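        # Write fraud alerts. Note: with a 1-minute trigger, alerts can lag by up
        # to a minute, well short of the 500 ms detection objective.
        fraud_query = fraud_scored \
            .filter(col("requires_review")) \
            .writeStream \
            .outputMode("append") \
            .foreachBatch(lambda df, batch_id: self.write_to_postgresql(
                # Map onto the fraud_alerts columns (fraud_score -> risk_score)
                df.select("transaction_id", "user_id",
                         col("fraud_score").alias("risk_score")),
                batch_id,
                "fraud_alerts"
            )) \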
            .option("checkpointLocation", "/checkpoints/fraud") \
            .trigger(processingTime="1 minute") \
            .start()
        
        logger.info("All streaming queries started successfully")
        
        # Wait for termination
        self.spark.streams.awaitAnyTermination()
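
The interaction between the 5-minute tumbling window and the 2-minute watermark in `calculate_realtime_metrics` is easier to see without Spark. The sketch below is a simplified, single-threaded simulation: it tracks the watermark per event rather than per micro-batch, so it illustrates the concept and is not a model of Spark's exact behaviour. The sample events are made up.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)           # tumbling window size from the listing
WATERMARK_DELAY = timedelta(minutes=2)  # watermark delay from the listing
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime) -> datetime:
    """Start of the 5-minute tumbling window that contains ts."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def aggregate(events):
    """Count events per (window start, event type).

    An event is dropped when its window ended at or before the watermark,
    where watermark = max event time seen so far - WATERMARK_DELAY.
    """
    counts, dropped = {}, []
    max_event_time = None
    for ts, event_type in events:
        start = window_start(ts)
        if (max_event_time is not None
                and start + WINDOW <= max_event_time - WATERMARK_DELAY):
            dropped.append((ts, event_type))
        else:
            key = (start, event_type)
            counts[key] = counts.get(key, 0) + 1
        if max_event_time is None or ts > max_event_time:
            max_event_time = ts
    return counts, dropped


def at(minute: int) -> datetime:
    return datetime(2024, 1, 1, 12, minute, tzinfo=timezone.utc)


events = [
    (at(1), "PAGE_VIEW"),
    (at(4), "PAGE_VIEW"),
    (at(8), "PURCHASE"),   # watermark advances to 12:06
    (at(3), "PAGE_VIEW"),  # window 12:00-12:05 ended before 12:06: dropped
    (at(6), "PAGE_VIEW"),  # late, but window 12:05-12:10 is still open: kept
]
counts, dropped = aggregate(events)
print(counts)
print(dropped)
```

A longer watermark delay tolerates more out-of-order data but keeps window state in memory for longer, so the 2-minute value is a trade-off worth revisiting against the observed event lateness.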

Step 3: Redis Cache Layer

# cache/redis_manager.py
import redis
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import logging

class RedisCacheManager:
    def __init__(self, config: dict):
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config.get('redis_port', 6379),
            db=config.get('redis_db', 0),
            decode_responses=True,
            ssl=config.get('ssl', True),
            socket_connect_timeout=5,
            retry_on_timeout=True
        )
        self.ttl_config = {
            'realtime_metrics': 300,      # 5 minutes
            'user_sessions': 1800,         # 30 minutes
            'product_views': 3600,         # 1 hour
            'fraud_alerts': 86400,         # 24 hours
            'daily_metrics': 86400 * 7     # 7 days
        }
    
    def cache_realtime_metrics(self, metrics: Dict[str, Any], window_key: str):
        """Cache real-time metrics with automatic expiration."""
        key = f"rt:metrics:{window_key}"
        pipeline = self.redis_client.pipeline()
        
        # Store main metrics
        pipeline.hset(key, mapping=metrics)
        pipeline.expire(key, self.ttl_config['realtime_metrics'])
        
        # Update global counters
        pipeline.hincrby("rt:counters:global", "total_events", metrics.get('event_count', 0))
        pipeline.hincrby("rt:counters:global", "total_users", metrics.get('unique_users', 0))
        
        # Add to time-series for trend analysis
        ts_key = f"rt:ts:{metrics.get('event_type', 'unknown')}"
        pipeline.zadd(ts_key, {json.dumps(metrics): datetime.utcnow().timestamp()})
        pipeline.expire(ts_key, self.ttl_config['realtime_metrics'])
        
        pipeline.execute()
        return True
    
    def get_realtime_dashboard_data(self) -> Dict[str, Any]:
        """Aggregate real-time data for dashboard display."""
        pipeline = self.redis_client.pipeline()
        
        # Get global counters
        pipeline.hgetall("rt:counters:global")
        
        # Get recent metrics (last 5 windows)
        pipeline.zrevrange("rt:ts:page_view", 0, 4, withscores=True)
        pipeline.zrevrange("rt:ts:purchase", 0, 4, withscores=True)
        pipeline.zrevrange("rt:ts:add_to_cart", 0, 4, withscores=True)
        
        results = pipeline.execute()
        
        return {
            'global_counters': results[0] or {},
            'recent_page_views': [json.loads(r[0]) for r in (results[1] or [])],
            'recent_purchases': [json.loads(r[0]) for r in (results[2] or [])],
            'recent_cart_adds': [json.loads(r[0]) for r in (results[3] or [])],
            'last_updated': datetime.utcnow().isoformat()
        }
    
    def track_user_session(self, user_id: str, session_data: Dict[str, Any]):
        """Track active user sessions."""
        key = f"session:{user_id}:{session_data.get('session_id')}"
        pipeline = self.redis_client.pipeline()
        
        pipeline.hset(key, mapping=session_data)
        pipeline.expire(key, self.ttl_config['user_sessions'])
        
        # Add to active users set
        pipeline.sadd("active_users", user_id)
        pipeline.expire("active_users", 300)
        
        pipeline.execute()
    
    def get_product_analytics(self, product_id: str) -> Dict[str, Any]:
        """Get aggregated product analytics."""
        key = f"product:analytics:{product_id}"
        
        cached = self.redis_client.get(key)
        if cached:
            return json.loads(cached)
        
        # Calculate from real-time data
        analytics = {
            'product_id': product_id,
            'views_1h': self._count_events(product_id, 'page_view', 3600),
            'cart_adds_1h': self._count_events(product_id, 'add_to_cart', 3600),
            'purchases_1h': self._count_events(product_id, 'purchase', 3600),
            'conversion_rate': 0.0,
            'last_updated': datetime.utcnow().isoformat()
        }
        
        # Calculate conversion rate
        if analytics['views_1h'] > 0:
            analytics['conversion_rate'] = (
                analytics['purchases_1h'] / analytics['views_1h']
            ) * 100
        
        # Cache the result
        self.redis_client.setex(
            key,
            self.ttl_config['product_views'],
            json.dumps(analytics)
        )
        
        return analytics
    
    def _count_events(self, product_id: str, event_type: str, 
                      window_seconds: int) -> int:
        """Count events for a product within time window."""
        cutoff = datetime.utcnow() - timedelta(seconds=window_seconds)
        key = f"events:{event_type}:{product_id}"
        
        # Use sorted set for time-windowed counting
        return self.redis_client.zcount(
            key,
            cutoff.timestamp(),
            datetime.utcnow().timestamp()
        )
    
    def publish_alert(self, alert_type: str, alert_data: Dict[str, Any]):
        """Publish real-time alerts via Redis Pub/Sub."""
        channel = f"alerts:{alert_type}"
        message = json.dumps({
            **alert_data,
            'published_at': datetime.utcnow().isoformat()
        })
        
        self.redis_client.publish(channel, message)
        
        # Also store in recent alerts list
        self.redis_client.lpush(
            f"recent_alerts:{alert_type}",
            message
        )
        self.redis_client.ltrim(f"recent_alerts:{alert_type}", 0, 99)
        self.redis_client.expire(f"recent_alerts:{alert_type}", 86400)
    
    def health_check(self) -> bool:
        """Check Redis cluster health."""
        try:
            return self.redis_client.ping()
        except redis.RedisError:
            return False
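
`_count_events` reads from sorted sets keyed `events:{event_type}:{product_id}`, with the event timestamp as the score, but the listing does not show where those sets are written or trimmed. The sketch below models the pattern in memory so that it runs without a Redis server. `EventTimeline` is a hypothetical stand-in; in Redis the three methods would correspond to ZADD, ZCOUNT and ZREMRANGEBYSCORE, and each member would need to be unique (for example the event ID), since a sorted set stores a member only once.

```python
import bisect


class EventTimeline:
    """In-memory stand-in for one sorted set of event timestamps."""

    def __init__(self):
        self._scores = []  # timestamps, kept sorted

    def add(self, ts: float) -> None:
        """Record an event (ZADD with the timestamp as score)."""
        bisect.insort(self._scores, ts)

    def count(self, lo: float, hi: float) -> int:
        """Events with lo <= ts <= hi (ZCOUNT, inclusive bounds)."""
        return (bisect.bisect_right(self._scores, hi)
                - bisect.bisect_left(self._scores, lo))

    def trim_before(self, cutoff: float) -> int:
        """Drop events older than cutoff (ZREMRANGEBYSCORE); returns how many."""
        idx = bisect.bisect_left(self._scores, cutoff)
        del self._scores[:idx]
        return idx


now = 10_000.0  # fixed "current time" in seconds, for a repeatable example
views = EventTimeline()
for ts in (now - 7200, now - 3000, now - 10, now):
    views.add(ts)

views_1h = views.count(now - 3600, now)
removed = views.trim_before(now - 3600)
print(views_1h, removed)  # 3 1
```

Trimming on every write, or on a schedule, keeps the sets from growing without bound; without it, the one-hour counts stay correct but memory use does not.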

Infrastructure Setup (Terraform)
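
The VPC in the configuration below uses `10.0.0.0/16` with three private and three public `/24` subnets. A layout like this can be sanity-checked before running `terraform plan` with Python's standard `ipaddress` module. The sketch below is one such check; `check_layout` is an illustrative helper, not part of the project.

```python
import ipaddress
from itertools import combinations

VPC_CIDR = "10.0.0.0/16"
PRIVATE_SUBNETS = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
PUBLIC_SUBNETS = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]


def check_layout(vpc_cidr, subnets):
    """Return a list of problems: subnets outside the VPC or overlapping each other."""
    vpc = ipaddress.ip_network(vpc_cidr)
    nets = [ipaddress.ip_network(s) for s in subnets]
    problems = []
    for net in nets:
        if not net.subnet_of(vpc):
            problems.append(f"{net} is outside {vpc}")
    for a, b in combinations(nets, 2):
        if a.overlaps(b):
            problems.append(f"{a} overlaps {b}")
    return problems


problems = check_layout(VPC_CIDR, PRIVATE_SUBNETS + PUBLIC_SUBNETS)
print(problems)  # [] for the layout used in this project

# A deliberately broken layout, to show what a failure looks like
bad = check_layout(VPC_CIDR, PRIVATE_SUBNETS + ["10.0.1.128/25", "192.168.0.0/24"])
print(bad)
```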

# infrastructure/main.tf
terraform {
  required_version = ">= 1.5.0"
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    helm = {
      source  = "hashicorp/helm"
      version = "~> 2.11"
    }
  }
  
  backend "s3" {
    bucket         = "ecommerce-terraform-state"
    key            = "streaming-pipeline/terraform.tfstate"
    region         = "us-east-1"
    dynamodb_table = "terraform-locks"
    encrypt        = true
  }
}

provider "aws" {
  region = var.aws_region
  
  default_tags {
    tags = {
      Project     = "ecommerce-pipeline"
      Environment = var.environment
      ManagedBy   = "terraform"
    }
  }
}

# Variables
variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "us-east-1"
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "cluster_name" {
  description = "EKS cluster name"
  type        = string
  default     = "ecommerce-pipeline"
}

# VPC Configuration
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "5.0.0"

  name = "${var.cluster_name}-vpc"
  cidr = "10.0.0.0/16"

  azs             = ["${var.aws_region}a", "${var.aws_region}b", "${var.aws_region}c"]
  private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  public_subnets  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]

  enable_nat_gateway   = true
  single_nat_gateway   = var.environment != "production"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    "kubernetes.io/cluster/${var.cluster_name}" = "owned"
  }

  public_subnet_tags = {
    "kubernetes.io/cluster/${var.cluster_name}" = "shared"
    "kubernetes.io/role/elb"                     = 1
  }

  private_subnet_tags = {
    "kubernetes.io/cluster/${var.cluster_name}" = "shared"
    "kubernetes.io/role/internal-elb"            = 1
  }
}

# EKS Cluster
module "eks" {
  source  = "terraform-aws-modules/eks/aws"
  version = "19.0.0"

  cluster_name    = var.cluster_name
  cluster_version = "1.28"

  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.private_subnets

  cluster_endpoint_public_access  = true
  cluster_endpoint_private_access = true

  # Managed node groups
  eks_managed_node_groups = {
    # Kafka brokers
    kafka = {
      name           = "kafka-nodes"
      instance_types = ["m5.2xlarge"]
      
      min_size     = 3
      max_size     = 9
      desired_size = 3

      labels = {
        role = "kafka"
      }

      taints = [{
        key    = "dedicated"
        value  = "kafka"
        effect = "NO_SCHEDULE"
      }]
    }

    # Spark workers
    spark = {
      name           = "spark-nodes"
      instance_types = ["r5.2xlarge"]
      
      min_size     = 2
      max_size     = 20
      desired_size = 4

      labels = {
        role = "spark"
      }

      taints = [{
        key    = "dedicated"
        value  = "spark"
        effect = "NO_SCHEDULE"
      }]
    }

    # General workloads
    general = {
      name           = "general-nodes"
      instance_types = ["m5.xlarge"]
      
      min_size     = 2
      max_size     = 10
      desired_size = 3

      labels = {
        role = "general"
      }
    }
  }

  # Cluster addons
  cluster_addons = {
    coredns = {
      most_recent = true
    }
    kube-proxy = {
      most_recent = true
    }
    vpc-cni = {
      most_recent = true
    }
    aws-ebs-csi-driver = {
      most_recent              = true
      service_account_role_arn = module.ebs_csi_irsa.iam_role_arn
    }
  }
}

# IAM Role for EBS CSI Driver
module "ebs_csi_irsa" {
  source  = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks"
  version = "5.30.0"

  role_name             = "${var.cluster_name}-ebs-csi-controller"
  attach_ebs_csi_policy = true

  oidc_providers = {
    main = {
      provider_arn               = module.eks.oidc_provider_arn
      namespace_service_accounts = ["kube-system:ebs-csi-controller-sa"]
    }
  }
}

# MSK (Managed Kafka)
resource "aws_msk_cluster" "kafka" {
  cluster_name           = "${var.cluster_name}-kafka"
  kafka_version          = "3.5.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = "kafka.m5.2xlarge"
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.kafka.id]

    storage_info {
      ebs_storage_info {
        volume_size = 500
      }
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.kafka.arn
    revision = aws_msk_configuration.kafka.latest_revision
  }

  encryption_info {
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
    encryption_at_rest_kms_key_arn = aws_kms_key.kafka.arn
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.kafka.name
      }
    }
  }
}

resource "aws_msk_configuration" "kafka" {
  kafka_versions = ["3.5.1"]
  name           = "${var.cluster_name}-kafka-config"

  # aws_msk_configuration takes broker settings as a properties-format string
  server_properties = <<-PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.hours = 168
    log.retention.bytes = 1073741824
    num.partitions = 6
    default.replication.factor = 3
    min.insync.replicas = 2
    compression.type = snappy
  PROPERTIES
}

# Security Group for Kafka
resource "aws_security_group" "kafka" {
  name_prefix = "${var.cluster_name}-kafka-"
  vpc_id      = module.vpc.vpc_id

  # Brokers accept TLS clients only (client_broker = "TLS"), so only the
  # TLS listener port is opened
  ingress {
    from_port   = 9094
    to_port     = 9094
    protocol    = "tcp"
    cidr_blocks = [module.vpc.vpc_cidr_block]
    description = "Kafka broker TLS"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "${var.cluster_name}-kafka-sg"
  }
}

# KMS Key for encryption
resource "aws_kms_key" "kafka" {
  description             = "KMS key for MSK cluster encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true
}

# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "kafka" {
  name              = "/aws/msk/${var.cluster_name}"
  retention_in_days = 30
}

# ElastiCache Redis
resource "aws_elasticache_replication_group" "redis" {
  replication_group_id       = "${var.cluster_name}-redis"
  description                = "Redis cluster for real-time caching"
  
  node_type            = "cache.r6g.large"
  num_cache_clusters   = 3
  
  engine               = "redis"
  engine_version       = "7.0"
  port                 = 6379
  
  subnet_group_name    = aws_elasticache_subnet_group.redis.name
  security_group_ids   = [aws_security_group.redis.id]
  
  at_rest_encryption_enabled = true
  transit_encryption_enabled = true
  
  automatic_failover_enabled = true
  multi_az_enabled          = true
  
  snapshot_retention_limit = 7
  snapshot_window         = "03:00-04:00"
  
  maintenance_window = "sun:04:00-sun:05:00"
  
  tags = {
    Name = "${var.cluster_name}-redis"
  }
}

resource "aws_elasticache_subnet_group" "redis" {
  name       = "${var.cluster_name}-redis-subnet"
  subnet_ids = module.vpc.private_subnets
}

resource "aws_security_group" "redis" {
  name_prefix = "${var.cluster_name}-redis-"
  vpc_id      = module.vpc.vpc_id

  ingress {
    from_port       = 6379
    to_port         = 6379
    protocol        = "tcp"
    cidr_blocks     = [module.vpc.vpc_cidr_block]
    description     = "Redis"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# RDS PostgreSQL
resource "aws_db_instance" "postgres" {
  identifier = "${var.cluster_name}-postgres"
  
  engine               = "postgres"
  engine_version       = "15.4"
  instance_class       = "db.r6g.xlarge"
  
  allocated_storage     = 100
  max_allocated_storage = 500
  
  db_name  = "ecommerce"
  username = "ecommerce_admin" # "admin" is a reserved word on RDS for PostgreSQL
  password = var.db_password
  
  vpc_security_group_ids = [aws_security_group.postgres.id]
  db_subnet_group_name   = aws_db_subnet_group.postgres.name
  
  backup_retention_period = 7
  backup_window          = "03:00-04:00"
  maintenance_window     = "Mon:04:00-Mon:05:00"
  
  storage_encrypted = true
  kms_key_id       = aws_kms_key.rds.arn
  
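  performance_insights_enabled = true
  monitoring_interval          = 60
  # Enhanced Monitoring (interval > 0) needs an IAM role; this role is assumed
  # to be defined elsewhere in the configuration
  monitoring_role_arn = aws_iam_role.rds_monitoring.arn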
  
  deletion_protection = var.environment == "production"
  
  tags = {
    Name = "${var.cluster_name}-postgres"
  }
}

resource "aws_db_subnet_group" "postgres" {
  name       = "${var.cluster_name}-postgres-subnet"
  subnet_ids = module.vpc.private_subnets
}

resource "aws_security_group" "postgres" {
  name_prefix = "${var.cluster_name}-postgres-"
  vpc_id      = module.vpc.vpc_id

  ingress {
    from_port       = 5432
    to_port         = 5432
    protocol        = "tcp"
    cidr_blocks     = [module.vpc.vpc_cidr_block]
    description     = "PostgreSQL"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_kms_key" "rds" {
  description             = "KMS key for RDS encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true
}

# S3 Bucket for data lake
resource "aws_s3_bucket" "data_lake" {
  bucket = "${var.cluster_name}-data-lake-${var.aws_region}"
}

resource "aws_s3_bucket_versioning" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id
  
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "aws:kms"
      kms_master_key_id = aws_kms_key.s3.arn
    }
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id

  rule {
    id     = "transition-to-ia"
    status = "Enabled"
    
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    
    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }
  }
}

resource "aws_kms_key" "s3" {
  description             = "KMS key for S3 encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true
}

# Helm Chart for Spark Operator
resource "helm_release" "spark_operator" {
  name       = "spark-operator"
  repository = "https://kubeflow.github.io/spark-operator"
  chart      = "spark-operator"
  version    = "1.1.15"
  namespace  = "spark"

  create_namespace = true

  values = [
    templatefile("${path.module}/values/spark-operator.yaml", {
      aws_region = var.aws_region
    })
  ]

  depends_on = [module.eks]
}

# Outputs
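output "kafka_bootstrap_servers" {
  description = "Kafka bootstrap servers (TLS listeners)"
  # The cluster sets client_broker = "TLS", so the plaintext
  # bootstrap_brokers attribute is empty; use the TLS variant
  value = aws_msk_cluster.kafka.bootstrap_brokers_tls
}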

output "redis_endpoint" {
  description = "Redis endpoint"
  value       = aws_elasticache_replication_group.redis.primary_endpoint_address
}

output "postgres_endpoint" {
  description = "PostgreSQL endpoint"
  value       = aws_db_instance.postgres.endpoint
}

output "eks_cluster_name" {
  description = "EKS cluster name"
  value       = module.eks.cluster_name
}
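
Downstream services need these outputs as connection settings. The following is a minimal sketch of reading them in Python, assuming the JSON shape printed by `terraform output -json` (each output name maps to an object with a `value` field). The `load_pipeline_endpoints` helper and the sample hostnames are hypothetical and not part of the project code.

```python
import json


def load_pipeline_endpoints(terraform_output_json: str) -> dict:
    """Turn `terraform output -json` text into connection settings.

    Hypothetical helper: the key names mirror the output blocks above.
    """
    outputs = json.loads(terraform_output_json)
    values = {name: body["value"] for name, body in outputs.items()}

    # RDS reports its endpoint as "host:port".
    pg_host, _, pg_port = values["postgres_endpoint"].rpartition(":")

    return {
        # MSK returns a comma-separated list of "host:port" pairs.
        "kafka_bootstrap_servers": values["kafka_bootstrap_servers"].split(","),
        "redis_host": values["redis_endpoint"],
        "postgres_host": pg_host,
        "postgres_port": int(pg_port),
        "eks_cluster_name": values["eks_cluster_name"],
    }


# Placeholder values for illustration only, not real endpoints.
sample = json.dumps({
    "kafka_bootstrap_servers": {"value": "b-1.example:9094,b-2.example:9094"},
    "redis_endpoint": {"value": "redis.example"},
    "postgres_endpoint": {"value": "pg.example:5432"},
    "eks_cluster_name": {"value": "demo-cluster"},
})
endpoints = load_pipeline_endpoints(sample)
print(endpoints["kafka_bootstrap_servers"])
```

In practice the JSON text would come from running `terraform output -json` in the infrastructure directory, for example through `subprocess.run`.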

Testing and Validation

# tests/test_streaming_pipeline.py
import pytest
import json
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, MagicMock
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

class TestStreamingPipeline:
    @pytest.fixture(scope="session")
    def spark(self):
        """Create a local Spark session for testing."""
        return SparkSession.builder \
            .appName("TestEcommercePipeline") \
            .master("local[2]") \
            .config("spark.sql.shuffle.partitions", "2") \
            .config("spark.ui.enabled", "false") \
            .getOrCreate()
    
    @pytest.fixture
    def sample_clickstream(self):
        """Generate sample clickstream data."""
        return [
            {
                "event_id": "evt_001",
                "user_id": "user_123",
                "session_id": "sess_456",
                "event_type": "PAGE_VIEW",
                "product_id": "prod_789",
                "timestamp": datetime.utcnow().isoformat(),
                "page_url": "/products/123",
                "device_type": "mobile",
                "ip_address": "192.168.1.1",
                "metadata": {"browser": "Chrome"}
            },
            {
                "event_id": "evt_002",
                "user_id": "user_123",
                "session_id": "sess_456",
                "event_type": "ADD_TO_CART",
                "product_id": "prod_789",
                "timestamp": datetime.utcnow().isoformat(),
                "page_url": "/products/123",
                "device_type": "mobile",
                "ip_address": "192.168.1.1",
                "metadata": {"browser": "Chrome"}
            }
        ]
    
    @pytest.fixture
    def sample_transaction(self):
        """Generate sample transaction data."""
        return {
            "transaction_id": "txn_001",
            "user_id": "user_123",
            "order_id": "ord_001",
            "amount": 149.99,
            "currency": "USD",
            "payment_method": "credit_card",
            "items": ["prod_789"],
            "timestamp": datetime.utcnow().isoformat(),
            "shipping_address": {
                "street": "123 Main St",
                "city": "New York",
                "state": "NY",
                "zip": "10001"
            }
        }
    
    def test_clickstream_processing(self, spark, sample_clickstream):
        """Test clickstream event processing."""
        from streaming.spark_processor import EcommerceStreamProcessor
        
        processor = EcommerceStreamProcessor({})
        processor.spark = spark
        
        # Create DataFrame from sample data
        df = spark.createDataFrame(sample_clickstream)
        
        # Verify schema
        assert "event_id" in df.columns
        assert "user_id" in df.columns
        assert "event_type" in df.columns
        
        # Verify data
        assert df.count() == 2
        assert df.filter("event_type = 'PAGE_VIEW'").count() == 1
        assert df.filter("event_type = 'ADD_TO_CART'").count() == 1
    
    def test_fraud_detection(self, spark, sample_transaction):
        """Test fraud detection logic."""
        from streaming.spark_processor import EcommerceStreamProcessor
        
        processor = EcommerceStreamProcessor({})
        processor.spark = spark
        
        # Test high-value transaction
        high_value_txn = sample_transaction.copy()
        high_value_txn['amount'] = 10000.00
        
        df = spark.createDataFrame([high_value_txn])
        fraud_scored = processor.detect_fraud_signals(df)
        
        # High value should trigger fraud score
        result = fraud_scored.collect()[0]
        assert result['fraud_score'] > 0.3
    
    def test_windowed_aggregation(self, spark, sample_clickstream):
        """Test windowed aggregation logic."""
        from streaming.spark_processor import EcommerceStreamProcessor
        
        processor = EcommerceStreamProcessor({})
        processor.spark = spark
        
        # Add more data points for aggregation
        events = []
        for i in range(100):
            event = sample_clickstream[0].copy()
            event['event_id'] = f"evt_{i:03d}"
            event['timestamp'] = (
                datetime.utcnow() - timedelta(minutes=i)
            ).isoformat()
            events.append(event)
        
        df = spark.createDataFrame(events)
        
        # Test aggregation
        windowed = processor.calculate_realtime_metrics(df)
        
        # Verify aggregation produced results
        assert windowed.count() > 0
    
    @patch('redis.Redis')
    def test_redis_cache_write(self, mock_redis):
        """Test Redis cache operations."""
        from cache.redis_manager import RedisCacheManager
        
        mock_client = Mock()
        mock_redis.return_value = mock_client
        mock_client.pipeline.return_value.execute.return_value = [True, True]
        
        config = {
            'redis_host': 'localhost',
            'redis_port': 6379,
            'ssl': False
        }
        
        cache_manager = RedisCacheManager(config)
        
        metrics = {
            'event_count': 150,
            'unique_users': 45,
            'unique_sessions': 52,
            'event_type': 'page_view'
        }
        
        result = cache_manager.cache_realtime_metrics(metrics, "2024-01-15T10:00:00")
        
        assert result
        mock_client.pipeline.assert_called()
    
    def test_data_quality_checks(self, spark, sample_clickstream):
        """Test data quality validation."""
        df = spark.createDataFrame(sample_clickstream)
        
        # Check for null values in critical fields
        assert df.filter("event_id IS NULL").count() == 0
        assert df.filter("user_id IS NULL").count() == 0
        assert df.filter("timestamp IS NULL").count() == 0
        
        # Check data types
        assert dict(df.dtypes)['event_id'] == 'string'
        assert dict(df.dtypes)['user_id'] == 'string'
        
        # Check for valid event types
        valid_types = ['PAGE_VIEW', 'ADD_TO_CART', 'PURCHASE', 'SEARCH']
        invalid = df.filter(~df.event_type.isin(valid_types)).count()
        assert invalid == 0
    
    def test_end_to_end_latency(self, spark, sample_clickstream):
        """Test processing latency for a small in-memory batch."""
        import time
        from pyspark.sql import functions as F

        # Warm up Spark so job start-up cost is not counted in the measurement
        spark.createDataFrame(sample_clickstream).count()

        start_time = time.perf_counter()

        # Simulate processing
        df = spark.createDataFrame(sample_clickstream)

        # Perform typical transformations
        processed = df.withColumn("processed_at", F.current_timestamp())

        count = processed.count()

        latency_ms = (time.perf_counter() - start_time) * 1000

        # Local-mode Spark adds scheduling overhead, so the budget is generous;
        # the 1000 ms threshold is an assumption to tune for your environment
        assert latency_ms < 1000
        assert count == len(sample_clickstream)

Cost Analysis

Monthly Cost Breakdown (Production)

| Component | Specification | Monthly Cost |
| --- | --- | --- |
| EKS Cluster | 3 node groups, 15 instances | $2,400 |
| MSK (Kafka) | 3 brokers (kafka.m5.2xlarge) | $1,800 |
| ElastiCache Redis | 3 nodes (cache.r6g.large) | $900 |
| RDS PostgreSQL | db.r6g.xlarge, Multi-AZ | $700 |
| S3 Storage | 5TB with lifecycle policies | $115 |
| Data Transfer | Cross-AZ and internet | $200 |
| CloudWatch | Logs, metrics, dashboards | $150 |
| Total | | $6,265 |
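
As a quick check on the breakdown, the line items can be summed and ranked by their share of the bill. This is a small sketch that uses only the figures from the table above; the variable names are illustrative.

```python
# Monthly line items in USD, copied from the cost table.
monthly_costs_usd = {
    "EKS Cluster": 2400,
    "MSK (Kafka)": 1800,
    "ElastiCache Redis": 900,
    "RDS PostgreSQL": 700,
    "S3 Storage": 115,
    "Data Transfer": 200,
    "CloudWatch": 150,
}

total = sum(monthly_costs_usd.values())
print(f"Total: ${total:,}/month")

# Share of the bill per component, largest first.
ranked = sorted(monthly_costs_usd.items(), key=lambda item: item[1], reverse=True)
for name, cost in ranked:
    print(f"{name:<18} ${cost:>5,}  {cost / total:6.1%}")

# Compute and Kafka together dominate the bill.
compute_and_kafka_share = (
    monthly_costs_usd["EKS Cluster"] + monthly_costs_usd["MSK (Kafka)"]
) / total
```

EKS and MSK together account for roughly two thirds of the total, so they are the natural first targets for optimization.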

Cost Optimization Strategies

💡

Tip: Implement these strategies to reduce costs by 30-40%:

  1. Reserved Instances: A 1-year commitment can save up to roughly 40% on the EC2 nodes behind EKS
  2. Spot Instances: Use for Spark workers (up to 70% savings), since they tolerate interruption
  3. Right-sizing: Monitor and adjust instance types monthly
  4. S3 Lifecycle: Auto-transition to cheaper storage classes
  5. Auto-scaling: Scale down during off-peak hours
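
To see how far a single strategy goes toward the 30-40% target, the sketch below applies the 40% Reserved Instance figure to the EKS line item. It assumes, as a simplification, that the entire $2,400 EKS line is EC2 node cost eligible for the discount; the other strategies are not modeled.

```python
total_usd = 6265  # monthly total from the cost breakdown
eks_usd = 2400    # EKS line item, treated here as all EC2 node cost (assumption)

# Assumption: the 40% reserved-instance discount applies to the whole EKS line.
reserved_savings = eks_usd * 0.40
share_of_total = reserved_savings / total_usd
print(f"Reserved Instances alone: ${reserved_savings:,.0f}/month "
      f"({share_of_total:.1%} of the total)")

# Monthly savings needed to reach the 30-40% target range.
target_low = total_usd * 0.30
target_high = total_usd * 0.40
print(f"30-40% target: ${target_low:,.0f} to ${target_high:,.0f}/month")
```

Under these assumptions Reserved Instances cover about 15% of the bill, so reaching the target range depends on combining them with the other strategies in the list.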

ROI Analysis

| Metric | Before | After | Improvement |
| --- | --- | --- | --- |
| Data Latency | 4 hours | < 1 minute | 240x faster |
| Dashboard Updates | Daily | Real-time | Continuous |
| Fraud Detection | Batch (24h) | Real-time (500ms) | 172,800x faster |
| Manual Reporting | 20 hrs/week | 0 hrs/week | 100% reduction |
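
The speedup figures follow directly from the before and after latencies. A short sketch of the arithmetic, where `speedup` is an illustrative helper:

```python
from datetime import timedelta


def speedup(before: timedelta, after: timedelta) -> float:
    """Ratio of the old latency to the new one."""
    return before / after


data_latency = speedup(timedelta(hours=4), timedelta(minutes=1))
fraud_detection = speedup(timedelta(hours=24), timedelta(milliseconds=500))

print(f"Data latency:    {data_latency:,.0f}x")
print(f"Fraud detection: {fraud_detection:,.0f}x")
```

Because the new data latency is stated as under one minute, 240x is a lower bound for that row.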

Interview Talking Points

Architecture Decisions

ℹ️

Best Practice: When discussing this project in interviews, focus on these key decisions:

  1. Why Kafka over Kinesis?

    • Better ecosystem and tooling
    • Multi-datacenter replication
    • Exactly-once semantics with transactions
    • Schema Registry for data governance
  2. Why Spark Structured Streaming over Flink?

    • Unified batch and stream processing
    • Better integration with data lake formats
    • Mature MLlib for on-stream ML inference
    • SQL interface for business users
  3. Why Redis for caching?

    • Sub-millisecond latency for dashboards
    • Sorted sets and streams for time-series data
    • Pub/Sub for real-time alerting
    • Cluster mode for horizontal scaling

Key Technical Concepts

# Demonstrate understanding of these concepts:
concepts = {
    "Exactly-Once Semantics": "Replayable Kafka offsets + Spark checkpointing + idempotent sinks",
    "Watermarking": "Handling late-arriving data with 2-minute watermark",
    "Backpressure": "Rate limiting (e.g. maxOffsetsPerTrigger) prevents overwhelming sinks",
    "Micro-batching": "30-second intervals balance latency vs throughput",
    "Data Skew": "Salted keys for even distribution in aggregations",
    "Fault Tolerance": "Checkpointing + WAL for recovery guarantees"
}
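
Watermarking is the concept interviewers most often ask candidates to walk through. The sketch below models it in plain Python under a simplifying assumption: the watermark is the maximum event time seen so far minus the 2-minute delay, and it advances after every event. Spark advances the watermark once per micro-batch and applies it through `withWatermark`, so real behavior is coarser. `split_late_events` is a hypothetical helper.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=2)  # the 2-minute watermark named above


def split_late_events(events):
    """Separate accepted events from those that arrive behind the watermark.

    Simplified model: the watermark is (max event time seen so far) minus the
    delay, advanced after every event rather than once per micro-batch.
    """
    max_event_time = None
    accepted, dropped = [], []
    for event_id, event_time in events:  # events are in arrival order
        if max_event_time is not None and event_time < max_event_time - WATERMARK_DELAY:
            dropped.append(event_id)
        else:
            accepted.append(event_id)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return accepted, dropped


t0 = datetime(2024, 1, 15, 10, 0, 0)
arrivals = [
    ("evt_001", t0),
    ("evt_002", t0 + timedelta(minutes=5)),  # moves the watermark to 10:03
    ("evt_003", t0 + timedelta(minutes=4)),  # 1 minute behind the max: accepted
    ("evt_004", t0 + timedelta(minutes=1)),  # 4 minutes behind the max: dropped
]
accepted, dropped = split_late_events(arrivals)
print("accepted:", accepted)
print("dropped: ", dropped)
```

A longer delay keeps more late data at the cost of holding aggregation state longer, which is the trade-off behind the 2-minute choice.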

Performance Metrics to Highlight

  • Throughput: 100K+ events/second sustained
  • Latency: P99 < 500ms end-to-end
  • Availability: 99.95% uptime SLA
  • Data Loss: Zero with exactly-once guarantees
  • Recovery Time: < 5 minutes from checkpoint
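
Two of these targets are easy to turn into concrete numbers: the downtime budget implied by the availability SLA, and a P99 check against measured latencies. This is a sketch with synthetic latency samples; `percentile_nearest_rank` is an illustrative helper that uses the nearest-rank definition of a percentile.

```python
import math

# Availability: downtime allowed by a 99.95% SLA over a 30-day month.
minutes_per_month = 30 * 24 * 60
allowed_downtime_min = minutes_per_month * (1 - 0.9995)
print(f"Downtime budget: {allowed_downtime_min:.1f} minutes per 30-day month")


def percentile_nearest_rank(samples, pct):
    """Nearest-rank percentile: smallest sample with at least pct% of values at or below it."""
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


# Synthetic latencies in milliseconds, for illustration only.
latencies_ms = [120] * 95 + [300] * 4 + [900]
p99 = percentile_nearest_rank(latencies_ms, 99)
print(f"P99 = {p99} ms, within the 500 ms target: {p99 < 500}")
```

The single 900 ms outlier does not break the P99 target here because it falls in the top 1% of samples, which is why percentile targets are usually quoted alongside a maximum or P99.9.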

Common Follow-up Questions

  1. "How do you handle schema evolution?"

    • Use Avro with Schema Registry
    • Backward/forward compatibility modes
    • Schema versioning and compatibility checks
  2. "How do you monitor pipeline health?"

    • Custom metrics with Prometheus
    • Grafana dashboards for real-time visibility
    • Alerting on lag, throughput, and error rates
    • Dead letter queues for failed events
  3. "How do you test streaming pipelines?"

    • Unit tests with local Spark
    • Integration tests with Testcontainers
    • Load testing with synthetic data
    • Chaos engineering for resilience
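
The dead letter queue answer is easier to defend with a concrete routing rule. The sketch below validates raw messages and sends failures to a dead-letter list instead of dropping them. The required fields and event types are taken from the data quality test earlier in this project; `route_event` and the in-memory lists are hypothetical stand-ins for what would be Kafka topics in the real pipeline.

```python
import json

REQUIRED_FIELDS = ("event_id", "user_id", "timestamp")
VALID_EVENT_TYPES = {"PAGE_VIEW", "ADD_TO_CART", "PURCHASE", "SEARCH"}


def route_event(raw: str):
    """Return ("ok", event) for valid input or ("dead_letter", record) otherwise."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        return "dead_letter", {"raw": raw, "error": f"invalid JSON: {exc.msg}"}

    if not isinstance(event, dict):
        return "dead_letter", {"raw": raw, "error": "payload is not a JSON object"}

    missing = [field for field in REQUIRED_FIELDS if event.get(field) is None]
    if missing:
        return "dead_letter", {"raw": raw, "error": f"missing fields: {missing}"}
    if event.get("event_type") not in VALID_EVENT_TYPES:
        return "dead_letter", {"raw": raw, "error": "unknown event_type"}
    return "ok", event


messages = [
    '{"event_id": "evt_001", "user_id": "user_123", '
    '"timestamp": "2024-01-15T10:00:00", "event_type": "PAGE_VIEW"}',
    '{"event_id": "evt_002", "timestamp": "2024-01-15T10:00:01", "event_type": "PURCHASE"}',
    "not json",
]
processed, dead_letters = [], []
for message in messages:
    status, payload = route_event(message)
    (processed if status == "ok" else dead_letters).append(payload)

print(f"processed={len(processed)} dead_letters={len(dead_letters)}")
```

Keeping the raw payload and the error reason together makes failed events replayable once the producer or schema is fixed.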

Deployment Checklist

  • Provision infrastructure with Terraform
  • Configure Kafka topics and retention policies
  • Deploy Spark applications to Kubernetes
  • Set up Redis cluster and replication
  • Initialize PostgreSQL with TimescaleDB
  • Configure monitoring and alerting
  • Run load tests and validate performance
  • Set up CI/CD pipeline for deployments
  • Document operational runbooks
  • Train operations team on monitoring

⚠️

Warning: Ensure all security configurations are applied before production deployment, including encryption at rest/in transit, VPC peering, and IAM roles.


This project demonstrates production-grade real-time data engineering skills and is suitable for senior data engineering interviews at top tech companies.
