API Gateway Design: Rate Limiting, Auth, Caching
Difficulty: Senior Level | Companies: Netflix, Stripe, Twilio, AWS, Google Cloud
Interview Question
"Design an API Gateway handling 100,000 RPS with rate limiting, authentication, request/response transformation, and caching. How do you handle hotspots and ensure sub-10ms latency?"
ℹ️ Key Concepts
This question tests your understanding of API management, performance optimization, and scalable gateway patterns.
Complete API Gateway Architecture
Architecture Overview
┌───────────────────────────────────────────────────────────────┐
│                   API GATEWAY ARCHITECTURE                    │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌──────────────────── CLIENT LAYER ─────────────────────┐    │
│  │  Web Apps │ Mobile Apps │ IoT Devices │ Partners      │    │
│  └───────────────────────────┬───────────────────────────┘    │
│                              │                                │
│  ┌───────────────────── EDGE LAYER ──────────────────────┐    │
│  │  CloudFront │ WAF │ Shield │ DDoS Protection          │    │
│  └───────────────────────────┬───────────────────────────┘    │
│                              │                                │
│  ┌──────────────────── GATEWAY LAYER ────────────────────┐    │
│  │                                                       │    │
│  │    ┌─────────────────────────────────────────────┐    │    │
│  │    │                 API Gateway                 │    │    │
│  │    │                                             │    │    │
│  │    │  ┌──────────┐  ┌──────────┐  ┌──────────┐   │    │    │
│  │    │  │   Auth   │  │   Rate   │  │  Cache   │   │    │    │
│  │    │  │  Layer   │  │ Limiter  │  │  Layer   │   │    │    │
│  │    │  └──────────┘  └──────────┘  └──────────┘   │    │    │
│  │    │                                             │    │    │
│  │    │  ┌──────────┐  ┌──────────┐  ┌──────────┐   │    │    │
│  │    │  │ Request  │  │ Response │  │ Logging  │   │    │    │
│  │    │  │Transform │  │Transform │  │ & Audit  │   │    │    │
│  │    │  └──────────┘  └──────────┘  └──────────┘   │    │    │
│  │    │                                             │    │    │
│  │    └─────────────────────────────────────────────┘    │    │
│  │                                                       │    │
│  └───────────────────────────┬───────────────────────────┘    │
│                              │                                │
│  ┌──────────────────── SERVICE LAYER ────────────────────┐    │
│  │  Microservices │ Lambda Functions │ External APIs     │    │
│  └───────────────────────────────────────────────────────┘    │
│                                                               │
└───────────────────────────────────────────────────────────────┘
Mathematical Foundation: Rate Limiting
Token Bucket Algorithm:
- Bucket capacity: C tokens
- Refill rate: R tokens/second
- Time interval: T seconds
- Tokens available: B(t) = min(C, B(t - T) + R × T)
- Request allowed if: B(t) ≥ 1 (each allowed request consumes one token)
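The refill formula above can be sketched as a small in-memory class. This is a minimal illustration, not the gateway's implementation: the class name `TokenBucket`, the explicit `now` argument (used instead of a real clock so the example is deterministic), and the numbers C = 5 and R = 2 are all assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class TokenBucket:
    capacity: float      # C: maximum tokens the bucket can hold
    refill_rate: float   # R: tokens added per second
    tokens: float        # B: tokens currently available
    last_refill: float   # time of the previous update, in seconds

    def allow(self, now: float) -> bool:
        """Refill for the elapsed interval T, then try to spend one token."""
        elapsed = max(0.0, now - self.last_refill)  # T
        self.tokens = min(self.capacity, self.tokens + self.refill_rate * elapsed)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Hypothetical numbers: C = 5 tokens, R = 2 tokens/second, bucket starts full.
bucket = TokenBucket(capacity=5, refill_rate=2, tokens=5, last_refill=0.0)
burst = [bucket.allow(now=0.0) for _ in range(6)]             # six requests at t = 0
after_one_second = [bucket.allow(now=1.0) for _ in range(3)]  # three requests at t = 1
```

With these numbers the first five requests at t = 0 drain the bucket and the sixth is rejected; one second later two tokens have been refilled, so two of the next three requests pass.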
Sliding Window Algorithm:
- Window size: W seconds
- Max requests: M
- Current window count: N(t) = sum(requests in [t-W, t])
- Request allowed if: N(t) < M
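The sliding window rule can be sketched the same way with a log of timestamps. Again this is only an illustration under assumptions: `SlidingWindowLog`, the explicit `now` argument, and the values W = 10 and M = 3 are made up for the example.

```python
from collections import deque


class SlidingWindowLog:
    def __init__(self, window_seconds: float, max_requests: int):
        self.window_seconds = window_seconds  # W
        self.max_requests = max_requests      # M
        self.timestamps = deque()             # accepted requests, oldest first

    def allow(self, now: float) -> bool:
        # Drop timestamps that have fallen out of the window [now - W, now].
        while self.timestamps and self.timestamps[0] < now - self.window_seconds:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_requests:  # N(t) < M
            self.timestamps.append(now)
            return True
        return False


# Hypothetical numbers: at most 3 requests in any 10-second window.
limiter = SlidingWindowLog(window_seconds=10, max_requests=3)
decisions = [limiter.allow(t) for t in (0, 1, 2, 3, 11.5)]
```

The request at t = 3 is rejected because three requests already sit in the window; by t = 11.5 the requests from t = 0 and t = 1 have aged out, so the request is accepted. Memory grows with M per client, which is the usual cost of an exact sliding window compared with a token bucket.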
Rate Limiting for 100K RPS:
- Total capacity: 100,000 requests/second
- Per-user limit: 100 requests/second
- Concurrent users at the per-user limit: 1,000 (1,000 × 100 = 100,000 requests/second)
- Buffer size: 10% of total capacity = 10,000 requests
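The arithmetic behind these figures, written out as a short script. The fleet size `gateway_nodes = 20` is a hypothetical value that does not come from the text; it is there only to show how a per-node share would be derived.

```python
total_capacity_rps = 100_000   # target from the interview question
per_user_limit_rps = 100
concurrent_users = 1_000
buffer_percent = 10

# Worst case: every user sends at the individual limit at the same time.
worst_case_demand_rps = per_user_limit_rps * concurrent_users

# Headroom on top of the nominal capacity.
buffer_requests = total_capacity_rps * buffer_percent // 100

# Hypothetical fleet size, used only to illustrate the per-node share.
gateway_nodes = 20
per_node_rps = (total_capacity_rps + buffer_requests) // gateway_nodes

print(worst_case_demand_rps, buffer_requests, per_node_rps)
```

The per-user limit and the user count multiply out to exactly the total capacity, so the 10% buffer is the only headroom in this sizing.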
API Gateway Implementation
AWS API Gateway Configuration
# API Gateway REST API
# Note: the IAM policy document, Cognito authorizer, Lambda function, ACM
# certificate and production stage referenced below are assumed to be
# defined elsewhere in the configuration.
resource "aws_api_gateway_rest_api" "main" {
  name        = "main-api-gateway"
  description = "Main API Gateway for production services"

  endpoint_configuration {
    types = ["REGIONAL"]
  }

  policy = data.aws_iam_policy_document.api_gateway_policy.json
}

# Resource and methods
resource "aws_api_gateway_resource" "users" {
  rest_api_id = aws_api_gateway_rest_api.main.id
  parent_id   = aws_api_gateway_rest_api.main.root_resource_id
  path_part   = "users"
}
resource "aws_api_gateway_method" "get_users" {
  rest_api_id          = aws_api_gateway_rest_api.main.id
  resource_id          = aws_api_gateway_resource.users.id
  http_method          = "GET"
  authorization        = "COGNITO_USER_POOLS"
  authorizer_id        = aws_api_gateway_authorizer.cognito.id
  request_validator_id = aws_api_gateway_request_validator.query_params.id

  # Usage-plan throttling and quotas apply only to methods that require an API key
  api_key_required = true
}
# Integration with Lambda
resource "aws_api_gateway_integration" "users_lambda" {
  rest_api_id             = aws_api_gateway_rest_api.main.id
  resource_id             = aws_api_gateway_resource.users.id
  http_method             = aws_api_gateway_method.get_users.http_method
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.users.invoke_arn
}

# Request validator
resource "aws_api_gateway_request_validator" "query_params" {
  rest_api_id                 = aws_api_gateway_rest_api.main.id
  name                        = "validate-query-params"
  validate_request_body       = false
  validate_request_parameters = true
}

# API Key and usage plan for rate limiting
resource "aws_api_gateway_api_key" "client" {
  name    = "client-api-key"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "standard" {
  name        = "standard-usage-plan"
  description = "Standard usage plan with rate limiting"

  api_stages {
    api_id = aws_api_gateway_rest_api.main.id
    stage  = aws_api_gateway_stage.production.stage_name
  }

  throttle_settings {
    burst_limit = 2000
    rate_limit  = 1000
  }

  quota_settings {
    limit  = 1000000
    period = "DAY"
  }
}

resource "aws_api_gateway_usage_plan_key" "client" {
  key_id        = aws_api_gateway_api_key.client.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.standard.id
}

# Custom domain
resource "aws_api_gateway_domain_name" "main" {
  domain_name     = "api.example.com"
  certificate_arn = aws_acm_certificate.api.arn

  endpoint_configuration {
    types = ["REGIONAL"]
  }
}

resource "aws_api_gateway_base_path_mapping" "main" {
  domain_name = aws_api_gateway_domain_name.main.domain_name
  api_id      = aws_api_gateway_rest_api.main.id
  stage_name  = aws_api_gateway_stage.production.stage_name
}
Rate Limiting Implementation
# Advanced rate limiting with multiple strategies
import math
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import redis


class RateLimitStrategy(Enum):
    TOKEN_BUCKET = "token_bucket"
    SLIDING_WINDOW = "sliding_window"
    FIXED_WINDOW = "fixed_window"
    LEAKY_BUCKET = "leaky_bucket"


@dataclass
class RateLimitConfig:
    requests_per_second: int
    burst_capacity: int
    strategy: RateLimitStrategy = RateLimitStrategy.TOKEN_BUCKET
    window_size: int = 60  # seconds


class RateLimiter:
    """Multi-strategy rate limiter.

    Assumes a Redis client created without decode_responses, so hash fields
    come back as bytes. The read-modify-write sequences below are not atomic;
    with several gateway instances, move them into a Lua script.
    """

    STATE_TTL_SECONDS = 60  # idle bucket state is dropped after this long

    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def is_allowed(self, client_id: str) -> bool:
        """Check if request is allowed"""
        if self.config.strategy == RateLimitStrategy.TOKEN_BUCKET:
            return self._token_bucket(client_id)
        elif self.config.strategy == RateLimitStrategy.SLIDING_WINDOW:
            return self._sliding_window(client_id)
        elif self.config.strategy == RateLimitStrategy.FIXED_WINDOW:
            return self._fixed_window(client_id)
        elif self.config.strategy == RateLimitStrategy.LEAKY_BUCKET:
            return self._leaky_bucket(client_id)
        return False

    def _token_bucket(self, client_id: str) -> bool:
        """Token bucket algorithm"""
        key = f"rate_limit:token_bucket:{client_id}"
        now = time.time()
        # Get current bucket state
        bucket = self.redis.hgetall(key)
        if not bucket:
            # New client: start with a full bucket
            tokens = float(self.config.burst_capacity)
        else:
            tokens = float(bucket[b'tokens'])
            last_refill = float(bucket[b'last_refill'])
            # Refill tokens for the time elapsed since the last write
            elapsed = now - last_refill
            refill_amount = elapsed * self.config.requests_per_second
            tokens = min(self.config.burst_capacity, tokens + refill_amount)
        if tokens < 1:
            return False
        # Consume a token; refresh the TTL so an active bucket is never reset
        self.redis.hset(key, mapping={'tokens': tokens - 1, 'last_refill': now})
        self.redis.expire(key, self.STATE_TTL_SECONDS)
        return True

    def _sliding_window(self, client_id: str) -> bool:
        """Sliding window algorithm"""
        key = f"rate_limit:sliding_window:{client_id}"
        now = time.time()
        window_start = now - self.config.window_size
        limit = self.config.requests_per_second * self.config.window_size
        # Use Redis sorted set: drop expired entries, then count what is left
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        _, current_count = pipe.execute()
        if current_count >= limit:
            # Rejected requests are not recorded, so they do not prolong the block
            return False
        # Unique member so requests sharing a timestamp are all counted
        self.redis.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        self.redis.expire(key, self.config.window_size)
        return True

    def _fixed_window(self, client_id: str) -> bool:
        """Fixed window algorithm"""
        current_window = int(time.time() / self.config.window_size)
        key = f"rate_limit:fixed_window:{client_id}:{current_window}"
        current_count = self.redis.incr(key)
        if current_count == 1:
            self.redis.expire(key, self.config.window_size)
        return current_count <= self.config.requests_per_second * self.config.window_size

    def _leaky_bucket(self, client_id: str) -> bool:
        """Leaky bucket algorithm (used as a meter: overflow is rejected)"""
        key = f"rate_limit:leaky_bucket:{client_id}"
        now = time.time()
        bucket = self.redis.hgetall(key)
        if not bucket:
            water = 0.0
        else:
            water = float(bucket[b'water'])
            last_leak = float(bucket[b'last_leak'])
            # Leak water for the time elapsed since the last write
            elapsed = now - last_leak
            leaked = elapsed * self.config.requests_per_second
            water = max(0.0, water - leaked)
        if water >= self.config.burst_capacity:
            return False
        self.redis.hset(key, mapping={'water': water + 1, 'last_leak': now})
        self.redis.expire(key, self.STATE_TTL_SECONDS)
        return True

    def get_retry_after(self, client_id: str) -> Optional[int]:
        """Get seconds until next request is allowed"""
        if self.config.strategy == RateLimitStrategy.TOKEN_BUCKET:
            key = f"rate_limit:token_bucket:{client_id}"
            bucket = self.redis.hgetall(key)
            if bucket:
                # Account for tokens refilled since the last write
                elapsed = time.time() - float(bucket[b'last_refill'])
                tokens = float(bucket[b'tokens']) + elapsed * self.config.requests_per_second
                if tokens < 1:
                    # Round up: truncating would report 0 while still limited
                    return math.ceil((1 - tokens) / self.config.requests_per_second)
        return None
⚠️ Rate Limiting
Use distributed rate limiting with Redis for multi-instance deployments. Consider different limits for different API endpoints and user tiers.
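One way to express per-endpoint and per-tier limits is a lookup that resolves the limit before the limiter runs. The sketch below is an assumption-heavy illustration: the tier names, the `/search` override, the numbers, and the helper names `resolve_limit` and `rate_limit_key` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Limit:
    requests_per_second: int
    burst_capacity: int


# Hypothetical defaults per user tier.
TIER_DEFAULTS: Dict[str, Limit] = {
    "free": Limit(requests_per_second=10, burst_capacity=20),
    "pro": Limit(requests_per_second=100, burst_capacity=200),
}

# Hypothetical overrides for expensive endpoints, keyed by (tier, endpoint).
ENDPOINT_OVERRIDES: Dict[Tuple[str, str], Limit] = {
    ("free", "/search"): Limit(requests_per_second=2, burst_capacity=5),
}


def resolve_limit(tier: str, endpoint: str) -> Limit:
    """Most specific rule wins; unknown tiers fall back to the free tier."""
    default = TIER_DEFAULTS.get(tier, TIER_DEFAULTS["free"])
    return ENDPOINT_OVERRIDES.get((tier, endpoint), default)


def rate_limit_key(client_id: str, endpoint: str) -> str:
    """Separate counters per client and endpoint, so one hot route cannot
    exhaust a client's budget for every other route."""
    return f"rate_limit:{client_id}:{endpoint}"
```

For example, `resolve_limit("free", "/search")` returns the stricter override, while `resolve_limit("pro", "/search")` falls through to the pro default.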
Authentication & Authorization
# JWT authentication middleware
import hashlib
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import jwt  # PyJWT


@dataclass
class JWTConfig:
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7


class JWTAuthenticator:
    """JWT authentication handler"""

    def __init__(self, config: JWTConfig):
        self.config = config

    def _create_token(self, data: Dict[str, Any], token_type: str,
                      lifetime: timedelta) -> str:
        """Sign a copy of data with exp, iat and type claims added"""
        now = datetime.now(timezone.utc)
        to_encode = {**data, "exp": now + lifetime, "iat": now, "type": token_type}
        return jwt.encode(
            to_encode,
            self.config.secret_key,
            algorithm=self.config.algorithm
        )

    def create_access_token(self, data: Dict[str, Any]) -> str:
        """Create JWT access token"""
        return self._create_token(
            data, "access",
            timedelta(minutes=self.config.access_token_expire_minutes)
        )

    def create_refresh_token(self, data: Dict[str, Any]) -> str:
        """Create JWT refresh token"""
        return self._create_token(
            data, "refresh",
            timedelta(days=self.config.refresh_token_expire_days)
        )

    def verify_token(self, token: str,
                     expected_type: str = "access") -> Optional[Dict[str, Any]]:
        """Verify JWT token; returns None if invalid, expired or of the wrong type"""
        try:
            payload = jwt.decode(
                token,
                self.config.secret_key,
                algorithms=[self.config.algorithm]
            )
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which is a subclass
            return None
        # Without this check a refresh token would be accepted as an access token
        if payload.get("type") != expected_type:
            return None
        return payload


class APIKeyManager:
    """API key management (in-memory; a real gateway needs a shared store)"""

    def __init__(self):
        # Keyed by the SHA-256 digest so raw keys are never stored
        self.keys: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _digest(api_key: str) -> str:
        return hashlib.sha256(api_key.encode()).hexdigest()

    def generate_api_key(self, client_id: str, permissions: List[str]) -> str:
        """Generate new API key"""
        # Random key from the OS CSPRNG; a hash of client_id and a timestamp is guessable
        api_key = secrets.token_urlsafe(32)
        self.keys[self._digest(api_key)] = {
            'client_id': client_id,
            'permissions': permissions,
            'created_at': datetime.now(timezone.utc),
            'active': True
        }
        return api_key

    def validate_api_key(self, api_key: str) -> Optional[Dict[str, Any]]:
        """Validate API key"""
        key_data = self.keys.get(self._digest(api_key))
        if key_data and key_data['active']:
            return key_data
        return None

    def revoke_api_key(self, api_key: str) -> bool:
        """Revoke API key"""
        key_data = self.keys.get(self._digest(api_key))
        if key_data is not None:
            key_data['active'] = False
            return True
        return False
Request/Response Transformation
# Request and response transformation
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class TransformRule:
    source_field: str
    target_field: str
    transform: Optional[Callable[[Any], Any]] = None
    required: bool = False


class RequestTransformer:
    """Transform API requests. Fields without a rule are dropped."""

    def __init__(self):
        self.rules: Dict[str, List[TransformRule]] = {}

    def add_rule(self, endpoint: str, rule: TransformRule):
        self.rules.setdefault(endpoint, []).append(rule)

    def transform(self, endpoint: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply transformation rules"""
        rules = self.rules.get(endpoint, [])
        transformed = {}
        for rule in rules:
            value = request_data.get(rule.source_field)
            if rule.required and value is None:
                raise ValueError(f"Required field missing: {rule.source_field}")
            if value is not None:
                if rule.transform:
                    value = rule.transform(value)
                transformed[rule.target_field] = value
        return transformed


class ResponseTransformer:
    """Transform API responses. source_field may be a dotted path."""

    def __init__(self):
        self.rules: Dict[str, List[TransformRule]] = {}

    def add_rule(self, endpoint: str, rule: TransformRule):
        self.rules.setdefault(endpoint, []).append(rule)

    def transform(self, endpoint: str, response_data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply transformation rules"""
        rules = self.rules.get(endpoint, [])
        transformed = {}
        for rule in rules:
            value = self._get_nested_value(response_data, rule.source_field)
            if value is not None:
                if rule.transform:
                    value = rule.transform(value)
                transformed[rule.target_field] = value
        return transformed

    def _get_nested_value(self, data: Dict[str, Any], field_path: str) -> Any:
        """Get value from nested dictionary, e.g. 'profile.address.city'"""
        current = data
        for field in field_path.split('.'):
            if isinstance(current, dict):
                current = current.get(field)
            else:
                return None
        return current


# Example transformations
def transform_user_request(data: Dict[str, Any]) -> Dict[str, Any]:
    """Transform user request"""
    transformer = RequestTransformer()
    # Transform camelCase to snake_case
    transformer.add_rule('/users', TransformRule(
        source_field='firstName',
        target_field='first_name',
        required=True
    ))
    transformer.add_rule('/users', TransformRule(
        source_field='lastName',
        target_field='last_name',
        required=True
    ))
    transformer.add_rule('/users', TransformRule(
        source_field='emailAddress',
        target_field='email',
        transform=lambda x: x.lower()
    ))
    return transformer.transform('/users', data)


def transform_user_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Transform user response"""
    transformer = ResponseTransformer()
    # Transform snake_case to camelCase
    transformer.add_rule('/users', TransformRule(
        source_field='user_id',
        target_field='userId'
    ))
    transformer.add_rule('/users', TransformRule(
        source_field='first_name',
        target_field='firstName'
    ))
    transformer.add_rule('/users', TransformRule(
        source_field='created_at',
        target_field='createdAt',
        transform=lambda x: x.isoformat() if hasattr(x, 'isoformat') else x
    ))
    return transformer.transform('/users', data)
Response Caching
# Multi-layer caching strategy
import hashlib
import json
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

import redis


@dataclass
class CacheConfig:
    default_ttl: int = 300  # 5 minutes
    max_ttl: int = 3600  # 1 hour
    key_prefix: str = "api_cache:"


class APICache:
    """Multi-layer API cache: in-process dict (L1) in front of Redis (L2)"""

    LOCAL_TTL_SECONDS = 60  # upper bound on how long L1 may serve an entry

    def __init__(self, redis_client: redis.Redis, config: CacheConfig):
        self.redis = redis_client
        self.config = config
        # L1 is unbounded here; cap its size before using it under real load
        self.local_cache: Dict[str, Dict[str, Any]] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Get cached response"""
        # Check local cache first
        item = self.local_cache.get(key)
        if item is not None:
            if item['expires'] > time.time():
                return item['data']
            del self.local_cache[key]
        # Check Redis
        full_key = f"{self.config.key_prefix}{key}"
        cached = self.redis.get(full_key)
        if cached:
            data = json.loads(cached)
            # Populate local cache
            self.local_cache[key] = {
                'data': data,
                'expires': time.time() + self.LOCAL_TTL_SECONDS
            }
            return data
        return None

    def set(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None):
        """Cache response"""
        if ttl is None:
            ttl = self.config.default_ttl
        # Never cache longer than the configured ceiling
        ttl = min(ttl, self.config.max_ttl)
        # Set in Redis
        full_key = f"{self.config.key_prefix}{key}"
        self.redis.setex(
            full_key,
            ttl,
            json.dumps(data, default=str)
        )
        # Set in local cache
        self.local_cache[key] = {
            'data': data,
            'expires': time.time() + min(ttl, self.LOCAL_TTL_SECONDS)
        }

    def invalidate(self, pattern: str):
        """Invalidate every key that starts with pattern, e.g. 'GET:/users'.

        Only this process's local cache is cleared; other gateway instances
        keep their L1 entries until they expire.
        """
        # Clear local cache (prefix match, same as the Redis scan below)
        keys_to_delete = [
            key for key in self.local_cache
            if key.startswith(pattern)
        ]
        for key in keys_to_delete:
            del self.local_cache[key]
        # Clear Redis cache
        full_pattern = f"{self.config.key_prefix}{pattern}*"
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(
                cursor,
                match=full_pattern,
                count=100
            )
            if keys:
                self.redis.delete(*keys)
            if cursor == 0:
                break

    def generate_cache_key(self, method: str, path: str,
                           params: Optional[Dict[str, Any]] = None) -> str:
        """Generate cache key from request.

        Method and path stay readable so invalidate() can match on them;
        only the query parameters are hashed.
        """
        key = f"{method}:{path}"
        if params:
            sorted_params = sorted(params.items())
            digest = hashlib.md5(json.dumps(sorted_params, default=str).encode()).hexdigest()
            key = f"{key}:{digest}"
        return key


class CacheMiddleware:
    """Cache middleware for API Gateway"""

    def __init__(self, cache: APICache):
        self.cache = cache
        self.cacheable_methods = {'GET', 'HEAD'}
        self.cacheable_status_codes = {200, 203, 204, 206, 300, 301, 404, 410}

    def should_cache(self, method: str, status_code: int, headers: Dict[str, str]) -> bool:
        """Determine if response should be cached"""
        if method not in self.cacheable_methods:
            return False
        if status_code not in self.cacheable_status_codes:
            return False
        # Check cache control headers. The gateway cache is shared, so
        # 'private' responses must not be stored; 'no-cache' is treated
        # conservatively as "do not store" because we never revalidate.
        cache_control = headers.get('Cache-Control', '').lower()
        if any(d in cache_control for d in ('no-store', 'no-cache', 'private')):
            return False
        return True

    def get_cache_ttl(self, headers: Dict[str, str]) -> Optional[int]:
        """Get cache TTL from headers"""
        cache_control = headers.get('Cache-Control', '')
        match = re.search(r'max-age=(\d+)', cache_control)
        if match:
            return int(match.group(1))
        return None
✅ Caching Strategy
Use multi-level caching: L1 (local memory) for hot data, L2 (Redis) for shared data, L3 (CDN) for static content. Implement cache invalidation carefully.
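Because L1 lives in each gateway process, it needs both a TTL and a size bound, otherwise hot keys can grow memory without limit. A minimal sketch of such a cache follows. The class name `BoundedTTLCache`, the explicit `now` argument (instead of a real clock), and the sizes used are assumptions made for illustration.

```python
from collections import OrderedDict
from typing import Any, Optional, Tuple


class BoundedTTLCache:
    """In-process cache with a per-entry TTL and least-recently-used eviction."""

    def __init__(self, max_entries: int, ttl_seconds: float):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        # key -> (expires_at, value), ordered from least to most recently used
        self._items: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    def get(self, key: str, now: float) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if expires_at <= now:
            del self._items[key]         # expired: drop it and report a miss
            return None
        self._items.move_to_end(key)     # mark as most recently used
        return value

    def set(self, key: str, value: Any, now: float) -> None:
        self._items[key] = (now + self.ttl_seconds, value)
        self._items.move_to_end(key)
        while len(self._items) > self.max_entries:
            self._items.popitem(last=False)  # evict the least recently used


# Hypothetical sizes: two entries, 60-second TTL.
l1 = BoundedTTLCache(max_entries=2, ttl_seconds=60)
l1.set("a", 1, now=0)
l1.set("b", 2, now=0)
l1.get("a", now=1)       # touch "a" so "b" becomes the eviction candidate
l1.set("c", 3, now=2)    # cache is full, so "b" is evicted
```

A short L1 TTL keeps staleness bounded even though invalidation in Redis does not reach other processes' local caches.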
Request Validation
# Request validation with JSON Schema
from dataclasses import dataclass
from typing import Any, Dict, List

from jsonschema import Draft7Validator, FormatChecker


@dataclass
class ValidationSchema:
    endpoint: str
    method: str
    schema: Dict[str, Any]


class RequestValidator:
    """Request validation with JSON Schema"""

    def __init__(self):
        self.schemas: Dict[str, ValidationSchema] = {}

    def add_schema(self, endpoint: str, method: str, schema: Dict[str, Any]):
        key = f"{method}:{endpoint}"
        self.schemas[key] = ValidationSchema(
            endpoint=endpoint,
            method=method,
            schema=schema
        )

    def validate(self, endpoint: str, method: str, data: Dict[str, Any]) -> List[str]:
        """Validate request data; returns one message per violation"""
        key = f"{method}:{endpoint}"
        schema = self.schemas.get(key)
        if not schema:
            return []
        # "format" keywords such as "email" are only enforced when a
        # format checker is supplied
        checker = Draft7Validator(schema.schema, format_checker=FormatChecker())
        # iter_errors reports every violation, not just the first one
        return [f"Validation error: {e.message}" for e in checker.iter_errors(data)]


# Example schema
USER_SCHEMA = {
    "type": "object",
    "properties": {
        "firstName": {"type": "string", "minLength": 1, "maxLength": 50},
        "lastName": {"type": "string", "minLength": 1, "maxLength": 50},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 13, "maximum": 120}
    },
    "required": ["firstName", "lastName", "email"],
    "additionalProperties": False
}

# Initialize validator
validator = RequestValidator()
validator.add_schema('/users', 'POST', USER_SCHEMA)

# Validate request (an empty list means the payload is valid)
errors = validator.validate('/users', 'POST', {
    'firstName': 'John',
    'lastName': 'Doe',
    'email': 'john@example.com',
    'age': 30
})
Summary
| Component | Purpose | Implementation |
|---|---|---|
| Rate Limiting | Request throttling | Token bucket, sliding window |
| Authentication | Identity verification | JWT, API keys |
| Authorization | Permission checking | RBAC, ABAC |
| Transformation | Data mapping | Request/response transforms |
| Caching | Response caching | Multi-level caching |
| Validation | Request validation | JSON Schema |
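The Authorization row names RBAC as the permission-checking approach. A minimal sketch of what that check could look like at the gateway is below. The role names, permission strings, route table, and the `is_authorized` helper are all hypothetical and chosen only for illustration.

```python
from typing import Dict, FrozenSet, Iterable, Tuple

# Hypothetical role -> permissions mapping.
ROLE_PERMISSIONS: Dict[str, FrozenSet[str]] = {
    "reader": frozenset({"users:read"}),
    "admin": frozenset({"users:read", "users:write"}),
}

# Hypothetical (method, path) -> required permission mapping.
ROUTE_PERMISSIONS: Dict[Tuple[str, str], str] = {
    ("GET", "/users"): "users:read",
    ("POST", "/users"): "users:write",
}


def is_authorized(roles: Iterable[str], method: str, path: str) -> bool:
    """Allow the call only if one of the caller's roles grants the
    permission the route requires. Unknown routes are denied."""
    required = ROUTE_PERMISSIONS.get((method, path))
    if required is None:
        return False
    granted = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, frozenset())
    return required in granted
```

The roles would typically come from a claim in the verified JWT or from the permissions stored with an API key; denying routes with no declared permission keeps a forgotten table entry from becoming an open endpoint.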