πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Real-time Streaming Pipelines on AWS

AWS Data EngineeringKinesis to Lambda to S3 to Athena Pipeline⭐ Premium

Advertisement

⚑ Real-time Streaming Pipelines

Master Kinesis-Lambda-S3-Athena real-time streaming architecture and patterns.

Module: AWS Data Engineering β€’ Topic 17 of 65 β€’ Premium Content

Streaming Pipeline Architecture

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚         REAL-TIME STREAMING PIPELINE: Kinesis β†’ Lambda β†’ S3 β†’ Athena        β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  DATA SOURCES (Producers)                                           β”‚    β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”‚    β”‚
β”‚  β”‚  β”‚ IoT      β”‚  β”‚ Web Apps β”‚  β”‚ Mobile   β”‚  β”‚ Logs     β”‚          β”‚    β”‚
β”‚  β”‚  β”‚ Sensors  β”‚  β”‚ Events   β”‚  β”‚ Events   β”‚  β”‚ Streams  β”‚          β”‚    β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜          β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚          β–Ό              β–Ό              β–Ό              β–Ό                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  KINESIS DATA STREAMS (Ingestion)                                   β”‚    β”‚
β”‚  β”‚                                                                     β”‚    β”‚
β”‚  β”‚  Shard 0        Shard 1        Shard 2        Shard 3              β”‚    β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”‚    β”‚
β”‚  β”‚  β”‚ 1MB/s in β”‚   β”‚ 1MB/s in β”‚   β”‚ 1MB/s in β”‚   β”‚ 1MB/s in β”‚       β”‚    β”‚
β”‚  β”‚  β”‚ 2MB/s outβ”‚   β”‚ 2MB/s outβ”‚   β”‚ 2MB/s outβ”‚   β”‚ 2MB/s outβ”‚       β”‚    β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚    β”‚
β”‚  β”‚                                                                     β”‚    β”‚
β”‚  β”‚  Retention: 24 hours (extendable to 365)                            β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                β”‚                                           β”‚
β”‚                                β–Ό                                           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  LAMBDA (Processing)                                                 β”‚    β”‚
β”‚  β”‚                                                                     β”‚    β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚    β”‚
β”‚  β”‚  β”‚  Event Source Mapping                                         β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Batch Size: 100-1000 records                               β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Batch Window: 60 seconds                                   β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Parallelization Factor: 10                                 β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Maximum Batching: Enabled                                  β”‚  β”‚    β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚    β”‚
β”‚  β”‚                                                                     β”‚    β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚    β”‚
β”‚  β”‚  β”‚  Processing Logic                                             β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Parse and validate records                                 β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Enrich with reference data                                 β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Aggregate (windowed)                                       β”‚  β”‚    β”‚
β”‚  β”‚  β”‚  β€’ Filter invalid records                                     β”‚  β”‚    β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                β”‚                                           β”‚
β”‚                                β–Ό                                           β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  S3 (Storage)                                                        β”‚    β”‚
β”‚  β”‚                                                                     β”‚    β”‚
β”‚  β”‚  s3://realtime-data/                                                β”‚    β”‚
β”‚  β”‚  β”œβ”€β”€ raw/                (Ingested records)                         β”‚    β”‚
β”‚  β”‚  β”‚   └── {date}/{hour}/  (Hourly partitions)                       β”‚    β”‚
β”‚  β”‚  β”œβ”€β”€ processed/          (Transformed)                              β”‚    β”‚
β”‚  β”‚  β”‚   └── {date}/{hour}/  (Hourly partitions)                       β”‚    β”‚
β”‚  β”‚  └── aggregated/         (Windowed aggregates)                      β”‚    β”‚
β”‚  β”‚      └── {date}/{hour}/  (Hourly aggregates)                       β”‚    β”‚
β”‚  β”‚                                                                     β”‚    β”‚
β”‚  β”‚  Format: Parquet (compressed)                                       β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β”‚                                β”‚                                           β”‚
β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                         β”‚
β”‚              β–Ό                 β–Ό                 β–Ό                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚
β”‚  β”‚  Athena         β”‚  β”‚  QuickSight     β”‚  β”‚  Lambda         β”‚            β”‚
β”‚  β”‚  (Ad-hoc)       β”‚  β”‚  (Dashboards)   β”‚  β”‚  (Alerts)       β”‚            β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β”‚
β”‚                                                                             β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  MONITORING                                                         β”‚    β”‚
β”‚  β”‚  CloudWatch Metrics: IteratorAge, GetRecords.IteratorAgeMillisecondsβ”‚    β”‚
β”‚  β”‚  CloudWatch Alarms: Throttles, Errors, IteratorAge > threshold     β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Lambda Processing Code

import json
import base64
import boto3
from datetime import datetime

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """Process Kinesis records and write to S3."""
    
    processed_records = []
    
    for record in event['Records']:
        # Decode Kinesis data
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)
        
        # Process and transform
        processed = process_record(data)
        processed_records.append(processed)
    
    # Write batch to S3
    if processed_records:
        write_to_s3(processed_records)
    
    return {'processed': len(processed_records)}

def process_record(data):
    """Transform and enrich record."""
    return {
        'event_id': data.get('event_id'),
        'user_id': data.get('user_id'),
        'event_type': data.get('event_type'),
        'timestamp': datetime.now().isoformat(),
        'processed_at': datetime.now().isoformat()
    }

def write_to_s3(records):
    """Write batch of records to S3 as Parquet."""
    import pandas as pd
    from io import BytesIO
    
    df = pd.DataFrame(records)
    
    # Partition by date and hour
    now = datetime.now()
    key = f"processed/year={now.year}/month={now.month:02d}/day={now.day:02d}/hour={now.hour:02d}/{now.strftime('%Y%m%d%H%M%S')}.parquet"
    
    buffer = BytesIO()
    df.to_parquet(buffer, index=False, compression='snappy')
    
    s3.put_object(
        Bucket='realtime-data-lake',
        Key=key,
        Body=buffer.getvalue()
    )

Interview Q&A

Q1: What is IteratorAge in Kinesis?

Answer: IteratorAge measures the lag between the latest record in the stream and the last record processed by the consumer. High iterator age indicates the consumer is falling behind.

Q2: How do you handle backpressure in streaming?

Answer: Use batch window configuration, increase parallelization factor, optimize Lambda execution time, or use Kinesis Enhanced Fan-Out.

Q3: What is the difference between at-least-once and exactly-once delivery?

Answer: At-least-once may process duplicates; exactly-once processes each record once. Use idempotent processing or DynamoDB deduplication for exactly-once semantics.

Summary

  • Architecture: Kinesis β†’ Lambda β†’ S3 β†’ Athena/QuickSight
  • Key Metrics: IteratorAge, Throttles, Errors
  • Best Practices: Idempotent processing, partition by time, use Parquet
  • Cost Optimization: Right-size batch size, use provisioned concurrency for critical paths

Advertisement