πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

AWS Streaming Analytics Interview Questions

AWS Data EngineeringReal-Time Streaming & Event Processing Interview Preparation⭐ Premium

Advertisement

⚑ AWS Streaming Analytics Interview

Comprehensive interview preparation for real-time streaming, event processing, and streaming analytics on AWS.

Module: AWS Data Engineering β€’ Topic 52 of 65 β€’ Premium Content

Streaming Architecture Overview

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    AWS Streaming Architecture                       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                     β”‚
β”‚  Producers             Stream Processing        Consumers          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”‚
β”‚  β”‚ IoT      │────────▢│ Kinesis Data     │────▢│ Lambda   β”‚       β”‚
β”‚  β”‚ Devices  β”‚         β”‚ Streams          β”‚     β”‚          β”‚       β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”‚
β”‚  β”‚ Apps     │────────▢│ MSK (Kafka)      │────▢│ Flink    β”‚       β”‚
β”‚  β”‚ Servers  β”‚         β”‚                  β”‚     β”‚ Analyticsβ”‚       β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”‚
β”‚  β”‚ Databases│────────▢│ EventBridge      │────▢│ S3/Lake  β”‚       β”‚
β”‚  β”‚ (CDC)    β”‚         β”‚                  β”‚     β”‚          β”‚       β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚
β”‚                                                                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Q1: Compare Kinesis Data Streams vs MSK (Managed Kafka). When would you choose each?

Answer:

FeatureKinesis Data StreamsMSK (Kafka)
ProtocolKinesis APIKafka API
Retention24h - 365dConfigurable
ThroughputPer shard (1MB/s)Per broker
Cost ModelPer shard-hourPer broker-hour
EcosystemAWS-nativeOpen-source

Choose Kinesis when:

  • AWS-native integration needed
  • Simple producer/consumer patterns
  • Quick setup required
  • Cost is primary concern

Choose MSK when:

  • Existing Kafka expertise
  • Complex stream processing
  • Need Kafka Connect/Streams
  • Multi-cloud portability
Architecture Diagram
Decision Flow:
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Do you have existing Kafka code?        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                 β”‚
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”
        β–Ό                 β–Ό
       Yes                No
        β”‚                 β”‚
        β–Ό                 β–Ό
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚   MSK     β”‚   β”‚ Need Kafka        β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚ Connect/Streams?  β”‚
                  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β”‚
                 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                 β–Ό                     β–Ό
                Yes                    No
                 β”‚                     β”‚
                 β–Ό                     β–Ό
           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
           β”‚   MSK     β”‚        β”‚  Kinesis  β”‚
           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Q2: Design a real-time fraud detection system using AWS streaming services.

Answer:

Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Real-Time Fraud Detection Architecture             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Transaction Sources                                           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚ POS Systems β”‚    β”‚ Mobile Apps β”‚    β”‚ E-commerce  β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚         β”‚                  β”‚                  β”‚                 β”‚
β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”‚                            β–Ό                                    β”‚
β”‚                   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                               β”‚
β”‚                   β”‚   Kinesis   β”‚                               β”‚
β”‚                   β”‚   Streams   β”‚                               β”‚
β”‚                   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                               β”‚
β”‚                          β”‚                                      β”‚
β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                         β”‚
β”‚              β–Ό           β–Ό           β–Ό                         β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”‚
β”‚     β”‚ Kinesis     β”‚ β”‚ Lambda  β”‚ β”‚ Kinesis     β”‚              β”‚
β”‚     β”‚ Analytics   β”‚ β”‚ (Rules) β”‚ β”‚ Data Firehoseβ”‚              β”‚
β”‚     β”‚ (Flink)     β”‚ β”‚         β”‚ β”‚             β”‚              β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜              β”‚
β”‚            β”‚              β”‚             β”‚                      β”‚
β”‚            β–Ό              β–Ό             β–Ό                      β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”‚
β”‚     β”‚ Fraud       β”‚ β”‚ DynamoDBβ”‚ β”‚ S3 Data     β”‚              β”‚
β”‚     β”‚ Scores      β”‚ β”‚ State   β”‚ β”‚ Lake        β”‚              β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β”‚
β”‚            β”‚                                                   β”‚
β”‚            β–Ό                                                   β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                           β”‚
β”‚     β”‚ Alert       β”‚                                           β”‚
β”‚     β”‚ System      β”‚                                           β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Kinesis Analytics Flink SQL:

-- Detect suspicious patterns
SELECT 
    customer_id,
    COUNT(*) as transaction_count,
    SUM(amount) as total_amount,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start
FROM transactions
WHERE amount > 1000
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '5' MINUTE)
HAVING COUNT(*) > 3 OR SUM(amount) > 10000;

Lambda Fraud Scoring:

import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('customer_profiles')
    
    for record in event['Records']:
        transaction = json.loads(record['kinesis']['data'])
        
        # Get customer history
        response = table.get_item(Key={'customer_id': transaction['customer_id']})
        profile = response.get('Item', {})
        
        # Calculate fraud score
        fraud_score = calculate_fraud_score(transaction, profile)
        
        if fraud_score > 0.8:
            send_alert(transaction, fraud_score)

⚠️

Key Interview Point: Always discuss state management in streaming. For fraud detection, you need to maintain customer profiles and transaction history across windows.

Q3: How do you handle backpressure in Kinesis Data Streams?

Answer:

Backpressure Causes:

  • Consumer processing slower than producer
  • Network bandwidth limitations
  • Downstream system throttling

Mitigation Strategies:

1. Enhanced Fan-Out:

# Use enhanced fan-out for dedicated throughput
kinesis = boto3.client('kinesis')

kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:123456789:stream/my-stream',
    ConsumerName='my-consumer'
)

2. Shard Splitting:

# Split hot shards
kinesis.update_shard_count(
    StreamName='my-stream',
    TargetShardCount=16,
    ScalingType='UNIFORM_SCALING'
)

3. Batch Processing Optimization:

# Optimize batch size and window
consumer_config = {
    'max_batch_size': 10000,
    'batch_window_seconds': 5,
    'parallelism': 8
}

4. Circuit Breaker Pattern:

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = 'CLOSED'
    
    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.reset_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                self.last_failure = time.time()
            raise

Q4: Explain exactly-once processing semantics in AWS streaming services.

Answer:

Exactly-Once Challenges:

  • Network failures causing duplicate delivery
  • Consumer crashes mid-processing
  • Reprocessing during recovery

AWS Solutions:

1. Kinesis + DynamoDB Transactions:

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.client('dynamodb')

def process_with_exactly_once(record):
    sequence_number = record['kinesis']['sequenceNumber']
    
    # Check if already processed
    response = dynamodb.get_item(
        TableName='processed_records',
        Key={'sequence_number': {'S': sequence_number}}
    )
    
    if 'Item' in response:
        return  # Already processed
    
    # Process and mark as processed in transaction
    dynamodb.transact_write_items(
        TransactItems=[
            {
                'Put': {
                    'TableName': 'processed_records',
                    'Item': {
                        'sequence_number': {'S': sequence_number},
                        'processed_at': {'S': datetime.now().isoformat()}
                    }
                }
            },
            {
                'Put': {
                    'TableName': 'results',
                    'Item': {
                        'result_id': {'S': record['result_id']},
                        'data': {'S': json.dumps(record['data'])}
                    }
                }
            }
        ]
    )

2. MSK + Kafka Transactions:

// Kafka producer with transactions
Properties props = new Properties();
props.put("transactional.id", "my-transactional-id");
props.put("enable.idempotence", "true");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

producer.beginTransaction();
try {
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", 
            record.key(), record.value()));
    }
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

3. Idempotent Writes to S3:

# Use content hash for idempotent writes
import hashlib

def write_idempotent(data, bucket, key):
    content_hash = hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()
    s3_key = f"{key}/{content_hash}.json"
    
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket=bucket,
        Key=s3_key,
        Body=json.dumps(data)
    )

Q5: Design a real-time analytics dashboard using AWS streaming services.

Answer:

Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Real-Time Analytics Dashboard Architecture         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Data Sources                                                  β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚ Clickstream β”‚    β”‚ API Logs    β”‚    β”‚ IoT Sensors β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚         β”‚                  β”‚                  β”‚                 β”‚
β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚
β”‚                            β–Ό                                    β”‚
β”‚                   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                               β”‚
β”‚                   β”‚   Kinesis   β”‚                               β”‚
β”‚                   β”‚ Data Firehoseβ”‚                               β”‚
β”‚                   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                               β”‚
β”‚                          β”‚                                      β”‚
β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                         β”‚
β”‚              β–Ό                       β–Ό                         β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚     β”‚   S3 (Raw)  β”‚         β”‚ Kinesis     β”‚                   β”‚
β”‚     β”‚             β”‚         β”‚ Analytics   β”‚                   β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚            β”‚                       β”‚                            β”‚
β”‚            β–Ό                       β–Ό                            β”‚
β”‚     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚     β”‚   Athena    β”‚         β”‚ Lambda      β”‚                   β”‚
β”‚     β”‚ (Ad-hoc)    β”‚         β”‚ (Real-time) β”‚                   β”‚
β”‚     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚            β”‚                       β”‚                            β”‚
β”‚            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                            β”‚
β”‚                        β–Ό                                        β”‚
β”‚                 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                β”‚
β”‚                 β”‚ QuickSight  β”‚                                β”‚
β”‚                 β”‚ Dashboard   β”‚                                β”‚
β”‚                 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Real-Time Aggregation with Lambda:

import boto3
from datetime import datetime

cloudwatch = boto3.client('cloudwatch')

def lambda_handler(event, context):
    # Aggregate metrics in real-time
    metrics = {}
    
    for record in event['Records']:
        data = json.loads(record['kinesis']['data'])
        metric_name = data['metric_name']
        
        if metric_name not in metrics:
            metrics[metric_name] = {'count': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf')}
        
        metrics[metric_name]['count'] += 1
        metrics[metric_name]['sum'] += data['value']
        metrics[metric_name]['min'] = min(metrics[metric_name]['min'], data['value'])
        metrics[metric_name]['max'] = max(metrics[metric_name]['max'], data['value'])
    
    # Publish to CloudWatch
    for metric_name, values in metrics.items():
        cloudwatch.put_metric_data(
            Namespace='RealTimeAnalytics',
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Dimensions': [
                        {'Name': 'TimeWindow', 'Value': '1-minute'}
                    ],
                    'StatisticValues': {
                        'Sum': values['sum'],
                        'Minimum': values['min'],
                        'Maximum': values['max'],
                        'SampleCount': values['count']
                    },
                    'Unit': 'Count'
                }
            ]
        )

Q6: How do you implement schema evolution in Kinesis Data Streams?

Answer:

Schema Registry Integration:

# Using Glue Schema Registry with Kinesis
import boto3
from aws_kinesis_agg import record_handler

# Register schema
glue = boto3.client('glue')

schema_definition = {
    'type': 'record',
    'name': 'Transaction',
    'fields': [
        {'name': 'transaction_id', 'type': 'string'},
        {'name': 'amount', 'type': 'double'},
        {'name': 'timestamp', 'type': 'long'}
    ]
}

glue.register_schema_version(
    SchemaId={
        'RegistryName': 'my-registry',
        'SchemaName': 'transaction-schema'
    },
    SchemaVersionNumber={
        'LatestVersion': True
    }
)

Consumer Schema Handling:

class SchemaEvolutionHandler:
    def __init__(self):
        self.schema_versions = {}
    
    def process_record(self, record):
        schema_version = record['metadata']['schema_version']
        
        # Get or cache schema
        if schema_version not in self.schema_versions:
            self.schema_versions[schema_version] = self.fetch_schema(schema_version)
        
        schema = self.schema_versions[schema_version]
        
        # Handle missing fields with defaults
        for field in schema['fields']:
            if field['name'] not in record['data']:
                record['data'][field['name']] = self.get_default_value(field['type'])
        
        return record

Schema Evolution Strategies:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Schema Evolution Strategies                        β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Forward Compatibility                                          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Producer adds new field β†’ Consumer ignores unknown      β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  Backward Compatibility                                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Producer removes field β†’ Consumer uses default value    β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  Full Compatibility                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Both directions work with default values                β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Q7: Explain windowing strategies in Kinesis Analytics.

Answer:

Window Types:

1. Tumbling Windows (Fixed):

-- 5-minute tumbling windows
SELECT 
    product_id,
    COUNT(*) as sales_count,
    SUM(amount) as total_sales,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start
FROM sales_stream
GROUP BY product_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

2. Sliding Windows:

-- 10-minute window sliding every 1 minute
SELECT 
    customer_id,
    AVG(amount) as avg_amount,
    SLIDESTART(event_time, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) as window_start
FROM transactions
GROUP BY customer_id, SLIDE(event_time, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE);

3. Session Windows:

-- Session windows with 30-minute gap
SELECT 
    user_id,
    COUNT(*) as actions,
    SESSIONSTART(event_time, INTERVAL '30' MINUTE) as session_start
FROM user_events
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);

4. Custom Windows with Lambda:

# Custom windowing logic
class CustomWindow:
    def __init__(self, window_size, slide_interval):
        self.window_size = window_size
        self.slide_interval = slide_interval
        self.windows = {}
    
    def add_event(self, event):
        window_key = self.get_window_key(event['timestamp'])
        
        if window_key not in self.windows:
            self.windows[window_key] = []
        
        self.windows[window_key].append(event)
        
        # Trigger processing for complete windows
        if self.is_window_complete(window_key):
            self.process_window(window_key, self.windows[window_key])

Windowing Comparison:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Windowing Strategies Comparison                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Tumbling Windows:                                             β”‚
β”‚  |----W1----|----W2----|----W3----|                            β”‚
β”‚  (Non-overlapping, fixed size)                                 β”‚
β”‚                                                                 β”‚
β”‚  Sliding Windows:                                              β”‚
β”‚  |----W1----|                                                   β”‚
β”‚     |----W2----|                                                β”‚
β”‚        |----W3----|                                             β”‚
β”‚  (Overlapping, slide interval < window size)                   β”‚
β”‚                                                                 β”‚
β”‚  Session Windows:                                              β”‚
β”‚  |--W1--|     |--W2---------|     |--W3--|                     β”‚
β”‚  (Dynamic size based on activity)                              β”‚
β”‚                                                                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Q8: How do you implement dead letter queues for streaming failures?

Answer:

DLQ Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              DLQ Pattern for Streaming                          β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                               β”‚
β”‚  β”‚   Kinesis   β”‚                                               β”‚
β”‚  β”‚   Stream    β”‚                                               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                               β”‚
β”‚         β”‚                                                       β”‚
β”‚         β–Ό                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚  Consumer   │────▢│  Processing │────▢│  Success    β”‚      β”‚
β”‚  β”‚  Lambda     β”‚     β”‚  Function   β”‚     β”‚  Output     β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                             β”‚                                   β”‚
β”‚                        Failureβ”‚                                 β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  DLQ        β”‚                           β”‚
β”‚                      β”‚  (SQS)      β”‚                           β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β”‚                             β”‚                                   β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  Alert &    β”‚                           β”‚
β”‚                      β”‚  Retry      β”‚                           β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Implementation:

import boto3
import json

sqs = boto3.client('sqs')
dlq_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-dlq'

def lambda_handler(event, context):
    for record in event['Records']:
        try:
            process_record(record)
        except Exception as e:
            # Send to DLQ with error details
            sqs.send_message(
                QueueUrl=dlq_url,
                MessageBody=json.dumps({
                    'record': record,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat(),
                    'retry_count': 0
                }),
                MessageAttributes={
                    'ErrorType': {
                        'DataType': 'String',
                        'StringValue': type(e).__name__
                    }
                }
            )

DLQ Processing Lambda:

def process_dlq(event, context):
    for record in event['Records']:
        message = json.loads(record['body'])
        
        # Check retry count
        if message['retry_count'] < 3:
            # Retry processing
            try:
                process_record(message['record'])
            except Exception as e:
                # Update retry count and send back to DLQ
                message['retry_count'] += 1
                message['error'] = str(e)
                sqs.send_message(
                    QueueUrl=dlq_url,
                    MessageBody=json.dumps(message)
                )
        else:
            # Max retries exceeded, alert
            send_alert(message)

Q9: Design a real-time ETL pipeline using Kinesis Data Firehose.

Answer:

Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Real-Time ETL with Kinesis Data Firehose           β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                               β”‚
β”‚  β”‚   Data      β”‚                                               β”‚
β”‚  β”‚   Sources   β”‚                                               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                               β”‚
β”‚         β”‚                                                       β”‚
β”‚         β–Ό                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚   Kinesis   │────▢│  Firehose   │────▢│  Lambda     β”‚      β”‚
β”‚  β”‚ Data Firehoseβ”‚    β”‚  Stream     β”‚     β”‚  Transform  β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                                                  β”‚              β”‚
β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚                    β”‚             β”‚               β”‚       β”‚     β”‚
β”‚                    β–Ό             β–Ό               β–Ό       β–Ό     β”‚
β”‚             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚             β”‚   S3    β”‚   β”‚ Redshiftβ”‚   β”‚ OpenSearchβ”‚β”‚Splunk β”‚β”‚
β”‚             β”‚ (Data   β”‚   β”‚ (DW)    β”‚   β”‚ (Search)  β”‚β”‚       β”‚β”‚
β”‚             β”‚  Lake)  β”‚   β”‚         β”‚   β”‚           β”‚β”‚       β”‚β”‚
β”‚             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Firehose Configuration:

firehose = boto3.client('firehose')

# Create delivery stream with transformation
firehose.create_delivery_stream(
    DeliveryStreamName='real-time-etl-stream',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::role/firehose-role',
        'BucketARN': 'arn:aws:s3:::data-lake-bucket',
        'Prefix': 'raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        'ErrorOutputPrefix': 'errors/',
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [
                {
                    'Type': 'Lambda',
                    'Parameters': [
                        {
                            'ParameterName': 'LambdaArn',
                            'ParameterValue': 'arn:aws:lambda:us-east-1:123456789:function:transform-function'
                        },
                        {
                            'ParameterName': 'BufferSizeInMBs',
                            'ParameterValue': '3'
                        },
                        {
                            'ParameterName': 'BufferIntervalInSeconds',
                            'ParameterValue': '60'
                        }
                    ]
                }
            ]
        }
    }
)

Lambda Transformation:

import base64
import json

def lambda_handler(event, context):
    output = []
    
    for record in event['records']:
        # Decode input
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)
        
        # Transform
        transformed = {
            'event_id': data['id'],
            'event_type': data['type'].upper(),
            'timestamp': data['ts'],
            'processed_at': datetime.now().isoformat(),
            'amount': float(data['amount'])
        }
        
        # Encode output
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(transformed).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    
    return {'records': output}

Q10: How do you implement real-time data enrichment in streaming pipelines?

Answer:

Enrichment Patterns:

1. Lambda-Based Enrichment:

import boto3

dynamodb = boto3.resource('dynamodb')
enrichment_table = dynamodb.Table('customer_profiles')

def lambda_handler(event, context):
    enriched_records = []
    
    for record in event['Records']:
        data = json.loads(record['kinesis']['data'])
        
        # Enrich with customer data
        response = enrichment_table.get_item(
            Key={'customer_id': data['customer_id']}
        )
        
        customer = response.get('Item', {})
        
        enriched_record = {
            **data,
            'customer_name': customer.get('name', 'Unknown'),
            'customer_tier': customer.get('tier', 'Standard'),
            'enriched_at': datetime.now().isoformat()
        }
        
        enriched_records.append(enriched_record)
    
    return {'records': enriched_records}

2. DynamoDB Streams Enrichment:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Real-Time Enrichment Architecture                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚   Kinesis   │────▢│  Lambda     │────▢│  Enriched   β”‚      β”‚
β”‚  β”‚   Stream    β”‚     β”‚  Enrichment β”‚     β”‚  Stream     β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                             β”‚                                   β”‚
β”‚                        Queryβ”‚                                   β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  DynamoDB   β”‚                           β”‚
β”‚                      β”‚  (Lookup)   β”‚                           β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β”‚                             β”‚                                   β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  DynamoDB   β”‚                           β”‚
β”‚                      β”‚  Streams    β”‚                           β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β”‚                             β”‚                                   β”‚
β”‚                        Updateβ”‚                                  β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  Cache      β”‚                           β”‚
β”‚                      β”‚  (ElastiCache)                          β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

3. Cache-Enhanced Enrichment:

import redis
import json

redis_client = redis.Redis(host='my-redis-cluster.xxx.cache.amazonaws.com', port=6379)

def lambda_handler(event, context):
    for record in event['Records']:
        data = json.loads(record['kinesis']['data'])
        
        # Check cache first
        cache_key = f"customer:{data['customer_id']}"
        cached = redis_client.get(cache_key)
        
        if cached:
            customer = json.loads(cached)
        else:
            # Fetch from DynamoDB
            customer = fetch_from_dynamodb(data['customer_id'])
            # Cache for 5 minutes
            redis_client.setex(cache_key, 300, json.dumps(customer))
        
        # Enrich and continue processing
        enriched = {**data, **customer}
        process_enriched(enriched)

Q11: Explain backfill strategies for streaming data.

Answer:

Backfill Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Backfill Strategy                                   β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Historical Data        Backfill Process      Current Data     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚   S3        │──────▢│   EMR/Glue  │────▢│   S3        β”‚    β”‚
β”‚  β”‚  (Archive)  β”‚       β”‚  (Reprocess)β”‚     β”‚  (Updated)  β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                                                 β”‚
β”‚                            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                     β”‚
β”‚                            β”‚   Kinesis   β”‚                     β”‚
β”‚  Current Stream ──────────▢│   Streams   β”‚                     β”‚
β”‚                            β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                     β”‚
β”‚                                   β”‚                            β”‚
β”‚                                   β–Ό                            β”‚
β”‚                            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                     β”‚
β”‚                            β”‚  Unified    β”‚                     β”‚
β”‚                            β”‚  View       β”‚                     β”‚
β”‚                            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Implementation:

def backfill_stream(backfill_start, backfill_end):
    """Backfill historical data into current stream"""
    
    # Read historical data from S3
    historical_data = read_from_s3(
        bucket='archive-bucket',
        start_date=backfill_start,
        end_date=backfill_end
    )
    
    # Transform to match current format
    transformed = transform_to_current_format(historical_data)
    
    # Send to Kinesis stream
    kinesis = boto3.client('kinesis')
    
    for batch in chunk(transformed, 500):
        kinesis.put_records(
            StreamName='current-stream',
            Records=[
                {
                    'Data': json.dumps(record).encode('utf-8'),
                    'PartitionKey': record['partition_key']
                }
                for record in batch
            ]
        )

Idempotent Backfill:

def idempotent_backfill(record):
    """Ensure backfill doesn't create duplicates"""
    
    # Check if record already exists
    response = dynamodb.get_item(
        TableName='processed_records',
        Key={'record_id': record['id']}
    )
    
    if 'Item' in response:
        return  # Skip, already processed
    
    # Process and mark as processed
    process_record(record)
    
    dynamodb.put_item(
        TableName='processed_records',
        Item={
            'record_id': record['id'],
            'processed_at': datetime.now().isoformat(),
            'source': 'backfill'
        }
    )

Q12: How do you implement real-time anomaly detection in streaming data?

Answer:

Anomaly Detection Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Real-Time Anomaly Detection                        β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                               β”‚
β”‚  β”‚   Kinesis   β”‚                                               β”‚
β”‚  β”‚   Stream    β”‚                                               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                               β”‚
β”‚         β”‚                                                       β”‚
β”‚         β–Ό                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚  Kinesis    │────▢│  Anomaly    │────▢│  Alert      β”‚      β”‚
β”‚  β”‚  Analytics  β”‚     β”‚  Detection  β”‚     β”‚  System     β”‚      β”‚
β”‚  β”‚  (Flink)    β”‚     β”‚  (ML Model) β”‚     β”‚             β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                             β”‚                                   β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  S3         β”‚                           β”‚
β”‚                      β”‚  (Anomalies)β”‚                           β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Statistical Anomaly Detection:

class AnomalyDetector:
    def __init__(self, window_size=100, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.window = []
    
    def add_value(self, value):
        self.window.append(value)
        if len(self.window) > self.window_size:
            self.window.pop(0)
    
    def is_anomaly(self, value):
        if len(self.window) < 10:
            return False
        
        mean = sum(self.window) / len(self.window)
        std = (sum((x - mean) ** 2 for x in self.window) / len(self.window)) ** 0.5
        
        if std == 0:
            return False
        
        z_score = (value - mean) / std
        return abs(z_score) > self.threshold

Kinesis Analytics Anomaly Detection:

-- Statistical anomaly detection
WITH stats AS (
    SELECT 
        metric_name,
        AVG(value) as avg_value,
        STDDEV(value) as std_value,
        COUNT(*) as sample_count
    FROM metrics_stream
    GROUP BY metric_name
)
SELECT 
    m.metric_name,
    m.value,
    s.avg_value,
    s.std_value,
    CASE 
        WHEN ABS(m.value - s.avg_value) > 3 * s.std_value THEN 'ANOMALY'
        ELSE 'NORMAL'
    END as anomaly_flag
FROM metrics_stream m
JOIN stats s ON m.metric_name = s.metric_name;

ℹ️

Interview Tip: Discuss both statistical methods (z-score, moving average) and ML-based approaches (Isolation Forest, Autoencoders) for anomaly detection.

Q13: Design a multi-region streaming architecture for global applications.

Answer:

Global Streaming Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Multi-Region Streaming Architecture                        β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                         β”‚
β”‚  Region 1 (us-east-1)        Region 2 (eu-west-1)                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚  β”‚   Kinesis Stream    │◀──▢│   Kinesis Stream    β”‚                   β”‚
β”‚  β”‚   (Primary)         β”‚    β”‚   (Replica)         β”‚                   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚             β”‚                          β”‚                               β”‚
β”‚             β–Ό                          β–Ό                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚  β”‚   Processing        β”‚    β”‚   Processing        β”‚                   β”‚
β”‚  β”‚   (Lambda/Flink)    β”‚    β”‚   (Lambda/Flink)    β”‚                   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚             β”‚                          β”‚                               β”‚
β”‚             β–Ό                          β–Ό                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                   β”‚
β”‚  β”‚   S3 (Regional)     β”‚    β”‚   S3 (Regional)     β”‚                   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                   β”‚
β”‚             β”‚                          β”‚                               β”‚
β”‚             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                               β”‚
β”‚                        β–Ό                                               β”‚
β”‚             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                   β”‚
β”‚             β”‚   Global Redshift   β”‚                                   β”‚
β”‚             β”‚   (Replicated)      β”‚                                   β”‚
β”‚             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Cross-Region Replication:

# Kinesis Mirror Maker for cross-region
def replicate_streams(source_region, dest_region, stream_name):
    source_kinesis = boto3.client('kinesis', region_name=source_region)
    dest_kinesis = boto3.client('kinesis', region_name=dest_region)
    
    # Get shard iterator
    response = source_kinesis.describe_stream(StreamName=stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    
    iterator = source_kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']
    
    while True:
        records = source_kinesis.get_records(ShardIterator=iterator, Limit=100)
        
        # Replicate to destination
        dest_kinesis.put_records(
            StreamName=stream_name,
            Records=[
                {
                    'Data': record['Data'],
                    'PartitionKey': record['PartitionKey']
                }
                for record in records['Records']
            ]
        )
        
        iterator = records['NextShardIterator']
        if not iterator:
            break

Q14: How do you implement exactly-once semantics with Kinesis and Lambda?

Answer:

Exactly-Once Pattern:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Exactly-Once with Kinesis + Lambda                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚   Kinesis   │────▢│   Lambda    │────▢│  DynamoDB   β”‚      β”‚
β”‚  β”‚   Stream    β”‚     β”‚  (Idempotent)β”‚    β”‚  (State)    β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                             β”‚                    β”‚              β”‚
β”‚                        Checkβ”‚               Writeβ”‚              β”‚
β”‚                             β–Ό                    β–Ό              β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚                      β”‚  DynamoDB   β”‚     β”‚  S3         β”‚      β”‚
β”‚                      β”‚  (Sequence  β”‚     β”‚  (Output)   β”‚      β”‚
β”‚                      β”‚   Check)    β”‚     β”‚             β”‚      β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Implementation:

import boto3
import json

dynamodb = boto3.resource('dynamodb')
state_table = dynamodb.Table('processing_state')

def lambda_handler(event, context):
    for record in event['Records']:
        sequence_number = record['kinesis']['sequenceNumber']
        shard_id = record['kinesis']['partitionKey']
        
        # Check if already processed
        response = state_table.get_item(
            Key={
                'shard_id': shard_id,
                'sequence_number': sequence_number
            }
        )
        
        if 'Item' in response:
            print(f"Record {sequence_number} already processed, skipping")
            continue
        
        # Process record
        data = json.loads(record['kinesis']['data'])
        result = process_data(data)
        
        # Write result and state atomically
        state_table.transact_write_items(
            TransactItems=[
                {
                    'Put': {
                        'TableName': 'processing_state',
                        'Item': {
                            'shard_id': shard_id,
                            'sequence_number': sequence_number,
                            'processed_at': datetime.now().isoformat()
                        }
                    }
                },
                {
                    'Put': {
                        'TableName': 'results',
                        'Item': {
                            'result_id': data['id'],
                            'result': result,
                            'processed_at': datetime.now().isoformat()
                        }
                    }
                }
            ]
        )

Q15: Explain stream processing with exactly-once guarantees using MSK.

Answer:

Kafka Exactly-Once Pattern:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Kafka Exactly-Once Architecture                     β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                               β”‚
β”‚  β”‚   Producer  β”‚                                               β”‚
β”‚  β”‚  (Idempotent)β”‚                                              β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                               β”‚
β”‚         β”‚                                                       β”‚
β”‚         β–Ό                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚  Kafka      │────▢│  Consumer   │────▢│  Kafka      β”‚      β”‚
β”‚  β”‚  (Input)    β”‚     β”‚  (Transaction)β”‚   β”‚  (Output)   β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                             β”‚                                   β”‚
β”‚                        Commitβ”‚                                  β”‚
β”‚                             β–Ό                                   β”‚
β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                           β”‚
β”‚                      β”‚  Consumer   β”‚                           β”‚
β”‚                      β”‚  Offsets    β”‚                           β”‚
β”‚                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Spring Kafka Transactional Producer:

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

@Service
public class TransactionalProcessor {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Transactional
    public void processAndProduce(List<ConsumerRecord<String, String>> records) {
        kafkaTemplate.executeInTransaction(template -> {
            for (ConsumerRecord<String, String> record : records) {
                // Process record
                String result = processRecord(record);
                
                // Send to output topic
                template.send("output-topic", record.key(), result);
            }
            return null;
        });
    }
}

Q16: How do you monitor and troubleshoot streaming pipelines?

Answer:

Monitoring Dashboard:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Streaming Pipeline Monitoring                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Key Metrics                                             β”‚   β”‚
β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚
β”‚  β”‚ β”‚ Incoming    β”‚ Outgoing    β”‚ Iterator    β”‚ Processingβ”‚ β”‚   β”‚
β”‚  β”‚ β”‚ Records/s   β”‚ Records/s   β”‚ Age (hours) β”‚ Latency   β”‚ β”‚   β”‚
β”‚  β”‚ β”‚   1,250     β”‚   1,200     β”‚    0.5      β”‚   150ms   β”‚ β”‚   β”‚
β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Shard-Level Metrics                                    β”‚   β”‚
β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚
β”‚  β”‚ β”‚ Shard 0 β”‚ Shard 1 β”‚ Shard 2 β”‚ Shard 3 β”‚ Shard 4    β”‚ β”‚   β”‚
β”‚  β”‚ β”‚ 250/s   β”‚ 280/s   β”‚ 220/s   β”‚ 240/s   β”‚ 260/s      β”‚ β”‚   β”‚
β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Error Analysis                                          β”‚   β”‚
β”‚  β”‚ β€’ Lambda Errors: 12 (0.1%)                            β”‚   β”‚
β”‚  β”‚ β€’ DLQ Messages: 5                                      β”‚   β”‚
β”‚  β”‚ β€’ Throttling Events: 23                                β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

CloudWatch Alarms:

cloudwatch = boto3.client('cloudwatch')

# Iterator age alarm
cloudwatch.put_metric_alarm(
    AlarmName='KinesisIteratorAge',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Namespace='AWS/Kinesis',
    Statistic='Maximum',
    Period=300,
    EvaluationPeriods=2,
    Threshold=3600000,  # 1 hour in milliseconds
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789:alerts'],
    Dimensions=[
        {'Name': 'StreamName', 'Value': 'my-stream'}
    ]
)

# Throttling alarm
cloudwatch.put_metric_alarm(
    AlarmName='KinesisThrottling',
    MetricName='ReadProvisionedThroughputExceeded',
    Namespace='AWS/Kinesis',
    Statistic='Sum',
    Period=300,
    EvaluationPeriods=1,
    Threshold=10,
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789:alerts'],
    Dimensions=[
        {'Name': 'StreamName', 'Value': 'my-stream'}
    ]
)

Q17: Design a stream processing application with state management.

Answer:

State Management Patterns:

1. External State Store (DynamoDB):

class StatefulProcessor:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.state_table = self.dynamodb.Table('processing_state')
    
    def process_with_state(self, record):
        # Get current state
        response = self.state_table.get_item(
            Key={'key': record['key']}
        )
        
        current_state = response.get('Item', {'count': 0, 'sum': 0})
        
        # Update state
        new_state = {
            'count': current_state['count'] + 1,
            'sum': current_state['sum'] + record['value'],
            'last_updated': datetime.now().isoformat()
        }
        
        # Save state
        self.state_table.put_item(Item={
            'key': record['key'],
            **new_state
        })
        
        return new_state

2. Windowed State with Kinesis Analytics:

-- Maintain running aggregates
SELECT 
    customer_id,
    COUNT(*) OVER w as running_count,
    SUM(amount) OVER w as running_sum,
    AVG(amount) OVER w as running_avg
FROM transactions
WINDOW w AS (
    PARTITION BY customer_id 
    ORDER BY event_time 
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
);

3. State Backend Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              State Management Architecture                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                               β”‚
β”‚  β”‚   Stream    β”‚                                               β”‚
β”‚  β”‚   Processor β”‚                                               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                               β”‚
β”‚         β”‚                                                       β”‚
β”‚         β–Ό                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                   State Backend Options                  β”‚   β”‚
β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚   β”‚
β”‚  β”‚ β”‚  DynamoDB   β”‚ ElastiCache β”‚    S3       β”‚  Local    β”‚ β”‚   β”‚
β”‚  β”‚ β”‚  (Durable)  β”‚  (Fast)     β”‚ (Bulk)      β”‚ (Testing) β”‚ β”‚   β”‚
β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Q18: How do you implement event sourcing with AWS streaming services?

Answer:

Event Sourcing Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Event Sourcing with AWS                             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Commands          Event Store           Projections            β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚  API    │─────▢│  Kinesis    │─────▢│  Read Model β”‚        β”‚
β”‚  β”‚ Gateway β”‚      β”‚  Streams    β”‚      β”‚  (DynamoDB) β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚                          β”‚                                      β”‚
β”‚                          β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”             β”‚
β”‚                          β”œβ”€β”€β”€β”€β”€β”€β”€β”€β–Άβ”‚  S3 (Event  β”‚             β”‚
β”‚                          β”‚         β”‚  Archive)   β”‚             β”‚
β”‚                          β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜             β”‚
β”‚                          β”‚                                      β”‚
β”‚                          β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”             β”‚
β”‚                          └────────▢│  Analytics  β”‚             β”‚
β”‚                                    β”‚  (Athena)   β”‚             β”‚
β”‚                                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Event Store Implementation:

class EventStore:
    def __init__(self, stream_name):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def append(self, aggregate_id, events):
        for event in events:
            self.kinesis.put_record(
                StreamName=self.stream_name,
                Data=json.dumps({
                    'aggregate_id': aggregate_id,
                    'event_type': event['type'],
                    'payload': event['payload'],
                    'timestamp': datetime.now().isoformat(),
                    'version': event['version']
                }).encode('utf-8'),
                PartitionKey=aggregate_id
            )
    
    def get_events(self, aggregate_id, from_version=0):
        # Read from stream for this aggregate
        shard_iterator = self.get_shard_iterator(aggregate_id)
        
        events = []
        while True:
            records = self.kinesis.get_records(ShardIterator=shard_iterator)
            
            for record in records['Records']:
                event = json.loads(record['Data'])
                if event['aggregate_id'] == aggregate_id:
                    if event['version'] > from_version:
                        events.append(event)
            
            shard_iterator = records.get('NextShardIterator')
            if not shard_iterator:
                break
        
        return events

Q19: Explain stream processing patterns for IoT data on AWS.

Answer:

IoT Streaming Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              IoT Data Streaming Architecture                    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  IoT Devices         Ingestion          Processing             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚ Sensors     │───▢│ IoT Core     │───▢│ Kinesis     β”‚        β”‚
β”‚  β”‚ (MQTT)      β”‚    β”‚              β”‚    β”‚ Streams     β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚                                               β”‚                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”           β”‚                 β”‚
β”‚  β”‚ Gateways    │───▢│ IoT Analytics│───────────                 β”‚
β”‚  β”‚             β”‚    β”‚              β”‚           β”‚                 β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜           β”‚                 β”‚
β”‚                                               β–Ό                 β”‚
β”‚                                        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”‚
β”‚                                        β”‚  Lambda     β”‚         β”‚
β”‚                                        β”‚  (Process)  β”‚         β”‚
β”‚                                        β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜         β”‚
β”‚                                               β”‚                 β”‚
β”‚                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚                    β–Ό                          β–Ό          β–Ό     β”‚
β”‚             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚             β”‚   S3        β”‚    β”‚ TimeStream  β”‚ β”‚ DynamoDB   β”‚β”‚
β”‚             β”‚  (Data Lake)β”‚    β”‚ (Metrics)   β”‚ β”‚ (State)    β”‚β”‚
β”‚             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

IoT Core Rules:

-- IoT Rule to route to Kinesis
SELECT 
    timestamp() as event_time,
    topic(2) as device_id,
    temperature,
    humidity,
    pressure
FROM 'sensors/+/data'
WHERE temperature > 50

Lambda IoT Processor:

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Process IoT data
        processed = {
            'device_id': payload['device_id'],
            'timestamp': payload['event_time'],
            'temperature': payload['temperature'],
            'alert': payload['temperature'] > 60,
            'processed_at': datetime.now().isoformat()
        }
        
        # Store in TimeStream for metrics
        store_in_timestream(processed)
        
        # Check for alerts
        if processed['alert']:
            send_alert(processed)

Q20: How do you implement data deduplication in streaming pipelines?

Answer:

Deduplication Strategies:

1. Sequence Number Tracking:

class Deduplicator:
    def __init__(self):
        self.processed_sequences = set()
        self.redis = redis.Redis()
    
    def is_duplicate(self, sequence_key, sequence_number):
        # Check memory cache
        if sequence_key in self.processed_sequences:
            return True
        
        # Check Redis for distributed deduplication
        if self.redis.sismember('processed_sequences', sequence_key):
            return True
        
        # Mark as processed
        self.processed_sequences.add(sequence_key)
        self.redis.sadd('processed_sequences', sequence_key)
        
        return False

2. Content Hash Deduplication:

import hashlib

class ContentDeduplicator:
    def __init__(self):
        self.seen_hashes = set()
    
    def is_duplicate(self, record):
        # Create content hash
        content = json.dumps(record, sort_keys=True)
        content_hash = hashlib.md5(content.encode()).hexdigest()
        
        if content_hash in self.seen_hashes:
            return True
        
        self.seen_hashes.add(content_hash)
        return False

3. Window-Based Deduplication:

class WindowDeduplicator:
    def __init__(self, window_size=300):  # 5 minutes
        self.window_size = window_size
        self.window = {}
    
    def is_duplicate(self, key, timestamp):
        # Clean old entries
        cutoff = time.time() - self.window_size
        self.window = {k: v for k, v in self.window.items() if v > cutoff}
        
        # Check for duplicate
        if key in self.window:
            return True
        
        self.window[key] = timestamp
        return False

Q21: Design a real-time feature store for machine learning.

Answer:

Feature Store Architecture:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Real-Time Feature Store                            β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Data Sources          Feature Engineering    Feature Store    β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Clickstream │─────▢│  Kinesis    │──────▢│ DynamoDB    β”‚  β”‚
β”‚  β”‚             β”‚      β”‚  Analytics  β”‚       β”‚ (Online)    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Transactions│─────▢│  Lambda     │──────▢│ S3          β”‚  β”‚
β”‚  β”‚             β”‚      β”‚  Features   β”‚       β”‚ (Offline)   β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚                                                                 β”‚
β”‚                            β”‚                    β”‚              β”‚
β”‚                            β–Ό                    β–Ό              β”‚
β”‚                     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚                     β”‚  SageMaker  │◀──────│  Glue       β”‚     β”‚
β”‚                     β”‚  (Training) β”‚       β”‚  Catalog    β”‚     β”‚
β”‚                     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Feature Engineering Lambda:

def lambda_handler(event, context):
    features = []
    
    for record in event['Records']:
        data = json.loads(record['kinesis']['data'])
        
        # Compute real-time features
        feature_vector = {
            'user_id': data['user_id'],
            'timestamp': data['timestamp'],
            'features': {
                'transaction_count_1h': get_transaction_count(data['user_id'], '1h'),
                'avg_amount_24h': get_avg_amount(data['user_id'], '24h'),
                'unique_products_7d': get_unique_products(data['user_id'], '7d'),
                'last_purchase_time': get_last_purchase(data['user_id'])
            }
        }
        
        features.append(feature_vector)
    
    # Store in DynamoDB for online serving
    store_online_features(features)
    
    # Store in S3 for offline training
    store_offline_features(features)

Q22: How do you handle late data and out-of-order events in streaming?

Answer:

Handling Strategies:

1. Watermark-Based Processing:

-- Kinesis Analytics watermark
SELECT 
    customer_id,
    COUNT(*) as event_count,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start
FROM events
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

2. Lambda Architecture for Late Data:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Lambda Architecture for Late Data                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Real-Time Path (Speed Layer)                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚   Stream    │───▢│  Real-Time  │───▢│  Real-Time  β”‚        β”‚
β”‚  β”‚   Events    β”‚    β”‚  Processing β”‚    β”‚  View       β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚                                                                 β”‚
β”‚  Batch Path (Batch Layer)                                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
β”‚  β”‚   All       │───▢│  Batch      │───▢│  Batch      β”‚        β”‚
β”‚  β”‚   Events    β”‚    β”‚  Processing β”‚    β”‚  View       β”‚        β”‚
β”‚  β”‚   (S3)      β”‚    β”‚  (EMR)      β”‚    β”‚             β”‚        β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚
β”‚                                                                 β”‚
β”‚  Serving Layer                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚              Unified View (Redshift/Athena)              β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

3. Out-of-Order Buffer:

class OutOfOrderBuffer:
    def __init__(self, max_delay_seconds=300):
        self.max_delay = max_delay_seconds
        self.buffer = {}
    
    def add_event(self, event):
        event_time = event['timestamp']
        current_time = time.time()
        
        # Check if too late
        if current_time - event_time > self.max_delay:
            # Send to late data handler
            self.handle_late_event(event)
            return
        
        # Add to buffer
        window_key = self.get_window_key(event_time)
        if window_key not in self.buffer:
            self.buffer[window_key] = []
        
        self.buffer[window_key].append(event)
        
        # Process complete windows
        self.process_complete_windows()

Q23: Explain exactly-once semantics with Kinesis and Step Functions.

Answer:

Step Functions Orchestration:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Exactly-Once with Step Functions                   β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                               β”‚
β”‚  β”‚   Start     β”‚                                               β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                                               β”‚
β”‚         β”‚                                                       β”‚
β”‚         β–Ό                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚  β”‚  Get Records│────▢│  Process    │────▢│  Write      β”‚      β”‚
β”‚  β”‚  (Kinesis)  β”‚     β”‚  Records   β”‚     β”‚  Results    β”‚      β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜      β”‚
β”‚                                                  β”‚              β”‚
β”‚                                            Successβ”‚             β”‚
β”‚                                                  β–Ό              β”‚
β”‚                                           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
β”‚                                           β”‚  Update     β”‚      β”‚
β”‚                                           β”‚  Checkpoint β”‚      β”‚
β”‚                                           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Step Functions Definition:

{
  "StartAt": "GetKinesisRecords",
  "States": {
    "GetKinesisRecords": {
      "Type": "Task",
      "Resource": "arn:aws:states:::kinesis:getRecords",
      "Parameters": {
        "ShardIterator.$": "$.shard_iterator",
        "Limit": 100
      },
      "Next": "ProcessRecords"
    },
    "ProcessRecords": {
      "Type": "Map",
      "ItemsPath": "$.Records",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "ProcessSingleRecord",
        "States": {
          "ProcessSingleRecord": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:process-record",
            "Retry": [
              {
                "ErrorEquals": ["States.TaskFailed"],
                "MaxAttempts": 3,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "Next": "Checkpoint"
    },
    "Checkpoint": {
      "Type": "Task",
      "Resource": "arn:aws:dynamodb:putItem",
      "Parameters": {
        "TableName": "checkpoints",
        "Item": {
          "stream_name": {"S": "my-stream"},
          "shard_id": {"S.$": "$.shard_id"},
          "sequence_number": {"S.$": "$.Records[-1].sequenceNumber"}
        }
      },
      "End": true
    }
  }
}

Q24: How do you implement stream processing with schema validation?

Answer:

Schema Validation Layer:

from jsonschema import validate, ValidationError

# Define schema
transaction_schema = {
    "type": "object",
    "properties": {
        "transaction_id": {"type": "string"},
        "amount": {"type": "number", "minimum": 0},
        "currency": {"type": "string", "pattern": "^[A-Z]{3}$"},
        "timestamp": {"type": "integer"}
    },
    "required": ["transaction_id", "amount", "currency", "timestamp"]
}

class SchemaValidator:
    def __init__(self, schema):
        self.schema = schema
    
    def validate(self, record):
        try:
            validate(instance=record, schema=self.schema)
            return True, None
        except ValidationError as e:
            return False, str(e)

# Usage in Lambda
def lambda_handler(event, context):
    validator = SchemaValidator(transaction_schema)
    
    valid_records = []
    invalid_records = []
    
    for record in event['Records']:
        data = json.loads(record['kinesis']['data'])
        is_valid, error = validator.validate(data)
        
        if is_valid:
            valid_records.append(data)
        else:
            invalid_records.append({
                'record': data,
                'error': error,
                'timestamp': datetime.now().isoformat()
            })
    
    # Process valid records
    process_valid_records(valid_records)
    
    # Send invalid records to DLQ
    if invalid_records:
        send_to_dlq(invalid_records)

Glue Schema Registry:

# Use Glue Schema Registry for schema validation
from aws_kinesis_agg.kinesis_record import KinesisRecord

def process_with_schema_validation(record):
    # Get schema from registry
    schema = get_schema_from_registry(record['schema_version'])
    
    # Validate
    if not validate_against_schema(record['data'], schema):
        raise SchemaValidationError("Record doesn't match schema")
    
    # Process
    return process_record(record)

Q25: Design a cost-optimized streaming architecture.

Answer:

Cost Optimization Strategies:

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              Cost-Optimized Streaming Architecture              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  Tier 1: Hot Path (Real-Time)                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ Kinesis Data Streams (shard optimization)             β”‚   β”‚
β”‚  β”‚ β€’ Lambda (right-sized memory)                           β”‚   β”‚
β”‚  β”‚ β€’ DynamoDB (on-demand for variable workloads)           β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  Tier 2: Warm Path (Near Real-Time)                           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ Kinesis Data Firehose (batching)                      β”‚   β”‚
β”‚  β”‚ β€’ S3 (Standard β†’ IA after 30 days)                     β”‚   β”‚
β”‚  β”‚ β€’ Athena (pay-per-query)                                β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β”‚                                                                 β”‚
β”‚  Tier 3: Cold Path (Batch)                                    β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ β€’ S3 Glacier (archival)                                 β”‚   β”‚
β”‚  β”‚ β€’ EMR Spot instances                                    β”‚   β”‚
β”‚  β”‚ β€’ Redshift Reserved instances                           β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Cost Calculation Example:

# Estimate streaming costs
def estimate_monthly_cost(
    records_per_second,
    average_record_size_kb,
    retention_hours
):
    # Kinesis costs
    shard_count = max(1, int((records_per_second * average_record_size_kb * 1024) / (1024 * 1024)))
    kinesis_cost = shard_count * 0.015 * 730  # hours in month
    
    # Lambda costs
    monthly_requests = records_per_second * 86400 * 30
    lambda_cost = (monthly_requests / 1000000) * 0.20
    
    # DynamoDB costs (if using for state)
    write_units = records_per_second * 86400 * 30
    dynamodb_cost = (write_units / 1000000) * 1.25
    
    return {
        'kinesis': kinesis_cost,
        'lambda': lambda_cost,
        'dynamodb': dynamodb_cost,
        'total': kinesis_cost + lambda_cost + dynamodb_cost
    }

Shard Optimization:

# Optimize shard count based on throughput
def optimize_shards(records_per_second, avg_record_size_bytes):
    # Calculate required throughput
    required_throughput_mb = (records_per_second * avg_record_size_bytes) / (1024 * 1024)
    
    # Each shard handles 1MB/s write, 2MB/s read
    write_shards = int(required_throughput_mb) + 1
    
    # Consider read throughput
    read_shards = int(required_throughput_mb / 2) + 1
    
    return max(write_shards, read_shards)

ℹ️

Key Interview Point: Always discuss cost optimization in streaming. Mention shard splitting/merging for Kinesis, reserved capacity for DynamoDB, and S3 lifecycle policies for data tiering.

Summary

Mastering AWS streaming analytics requires understanding:

  • Service Selection: Kinesis vs MSK vs Firehose based on use case
  • Processing Patterns: Exactly-once, windowing, state management
  • Architecture Design: Multi-region, cost optimization, monitoring
  • Data Quality: Schema validation, deduplication, late data handling
  • Real-Time ML: Feature stores, anomaly detection, online inference

These concepts form the foundation for building scalable, reliable, and cost-effective real-time data processing systems on AWS.

Advertisement