Cloud Security Patterns
Difficulty: Senior Level | Companies: AWS, Google, Microsoft, Netflix, Uber
Zero Trust Architecture
Never trust, always verify. Every request must be authenticated and authorized, regardless of network location.
ℹ️ Zero Trust assumes breach. Verify explicitly, use least privilege access, and assume compromise for all resources.
Security Architecture
```
┌─────────────────────────────────────────────────┐
│ Zero Trust Boundary                             │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐  │
│  │ Identity  │   │  Device   │   │  Network  │  │
│  │ Provider  │   │   Trust   │   │  Micro-   │  │
│  │  (IdP)    │   │  Engine   │   │  Segment  │  │
│  └─────┬─────┘   └─────┬─────┘   └─────┬─────┘  │
│        └───────────────┼───────────────┘        │
│                        ▼                        │
│                ┌─────────────────┐              │
│                │ Policy Engine   │              │
│                │ (OPA / Cedar)   │              │
│                └───────┬─────────┘              │
│        ┌───────────────┼───────────────┐        │
│        ▼               ▼               ▼        │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐  │
│  │ Service A │──▶│ Service B │──▶│ Service C │  │
│  │  (mTLS)   │   │  (mTLS)   │   │  (mTLS)   │  │
│  └───────────┘   └───────────┘   └───────────┘  │
└─────────────────────────────────────────────────┘
```
Pattern 1: Identity-Aware Proxy
Authenticate at the edge, pass identity to services.
```typescript
// Identity-Aware Proxy implementation
import * as jose from 'jose';

export interface IdentityContext {
  userId: string;
  email: string;
  roles: string[];
  permissions: string[];
  tenantId: string;
}

export class AuthenticationError extends Error {}

export class IdentityAwareProxy {
  private jwksClient: jose.JWTVerifyGetKey;

  constructor(jwksUri: string) {
    // Fetches and caches the IdP's public signing keys; unknown key ids trigger a re-fetch
    this.jwksClient = jose.createRemoteJWKSet(new URL(jwksUri));
  }

  async authenticate(req: Request): Promise<IdentityContext> {
    const header = req.headers.get('authorization') ?? '';
    const token = header.startsWith('Bearer ') ? header.slice('Bearer '.length) : '';
    if (!token) {
      throw new AuthenticationError('Missing token');
    }
    // Verifies signature, expiry, issuer and audience; any failure throws
    const { payload } = await jose.jwtVerify(token, this.jwksClient, {
      issuer: 'https://auth.example.com',
      audience: 'api://gateway',
    });
    return {
      userId: payload.sub!,
      email: payload.email as string,
      roles: (payload.roles as string[]) ?? [],
      permissions: (payload.permissions as string[]) ?? [],
      tenantId: payload.tenant_id as string,
    };
  }

  async authorize(
    identity: IdentityContext,
    resource: string,
    action: string,
  ): Promise<boolean> {
    // Check if user has the required permission: exact, resource-wide wildcard, or global wildcard
    const requiredPermission = `${resource}:${action}`;
    return identity.permissions.includes(requiredPermission) ||
      identity.permissions.includes(`${resource}:*`) ||
      identity.permissions.includes('*:*');
  }
}
```
Pattern 2: Encryption at Rest and in Transit
Encrypt all data everywhere.
```python
# Encryption patterns for different services
import base64

import boto3


class EncryptionManager:
    def __init__(self):
        self.kms = boto3.client('kms')

    def encrypt_sensitive_field(self, plaintext: str, key_id: str) -> str:
        """Encrypt sensitive data with AWS KMS; returns base64 so the blob is safe to store as text.

        A direct KMS Encrypt call accepts at most 4 KB of plaintext; larger payloads
        need a data key (envelope encryption).
        """
        response = self.kms.encrypt(
            KeyId=key_id,
            Plaintext=plaintext.encode(),
            EncryptionAlgorithm='SYMMETRIC_DEFAULT',
        )
        return base64.b64encode(response['CiphertextBlob']).decode('ascii')

    def decrypt_sensitive_field(self, ciphertext: str) -> str:
        """Decrypt sensitive data (the key id is embedded in the KMS ciphertext blob)."""
        response = self.kms.decrypt(
            CiphertextBlob=base64.b64decode(ciphertext),
        )
        return response['Plaintext'].decode()

    def get_database_encryption_config(self):
        """RDS instance parameters for encryption at rest and authenticated TLS connections."""
        return {
            'StorageEncrypted': True,
            'KmsKeyId': 'alias/rds-encryption-key',
            # Legacy EC2-Classic DB security groups are not used; access is via VPC security groups
            'DBSecurityGroups': [],
            'EnableIAMDatabaseAuthentication': True,
            # CA that signs the server certificate presented on TLS connections
            'CACertificateIdentifier': 'rds-ca-rsa2048-g1',
        }

    def get_s3_encryption_config(self):
        """Configure S3 bucket encryption."""
        return {
            'ServerSideEncryptionConfiguration': {
                'Rules': [
                    {
                        'ApplyServerSideEncryptionByDefault': {
                            'SSEAlgorithm': 'aws:kms',
                            'KMSMasterKeyID': 'alias/s3-encryption-key',
                        },
                        'BucketKeyEnabled': True,
                    }
                ]
            },
            'PublicAccessBlockConfiguration': {
                'BlockPublicAcls': True,
                'BlockPublicPolicy': True,
                'IgnorePublicAcls': True,
                'RestrictPublicBuckets': True,
            },
        }
```
ℹ️ Enable bucket keys for S3 KMS encryption to reduce KMS costs by up to 99% for high-throughput buckets.
Pattern 3: Least Privilege IAM
Grant minimum required permissions.
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowSpecificS3Access",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::app-data-${aws:PrincipalTag/tenant}/*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-server-side-encryption": "aws:kms"
        }
      }
    },
    {
      "Sid": "AllowDynamoDBAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Query"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789:table/*",
      "Condition": {
        "ForAllValues:StringEquals": {
          "dynamodb:LeadingKeys": [
            "${aws:PrincipalTag/tenant}"
          ]
        }
      }
    },
    {
      "Sid": "DenyResourceTagging",
      "Effect": "Deny",
      "Action": [
        "ec2:CreateTags",
        "ec2:DeleteTags"
      ],
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "aws:RequestTag/Environment": [
            "dev",
            "staging",
            "prod"
          ]
        }
      }
    }
  ]
}
```
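To make the tenant-isolation mechanism of the policy above concrete, here is an added example (not from the original page) that evaluates the two Allow statements the way IAM does: `${aws:PrincipalTag/tenant}` is expanded from the caller's session tags, the S3 statement requires the KMS encryption header, and the DynamoDB statement only permits partition keys equal to the tenant. The policy subset, the tenant names and the request contexts are constructed for the demonstration.

```python
import fnmatch

# Constructed subset of the page's policy: the two Allow statements scoped by the tenant tag.
STATEMENTS = [
    {"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"],
     "Resource": "arn:aws:s3:::app-data-${aws:PrincipalTag/tenant}/*",
     "Condition": {"StringEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}}},
    {"Effect": "Allow", "Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query"],
     "Resource": "arn:aws:dynamodb:us-east-1:123456789:table/*",
     "Condition": {"ForAllValues:StringEquals": {"dynamodb:LeadingKeys": ["${aws:PrincipalTag/tenant}"]}}},
]


def substitute(value, tags: dict):
    """Expand ${aws:PrincipalTag/<key>} policy variables from the caller's session tags."""
    if isinstance(value, list):
        return [substitute(v, tags) for v in value]
    for key, tag_value in tags.items():
        value = value.replace("${aws:PrincipalTag/%s}" % key, tag_value)
    return value


def condition_holds(condition: dict, ctx: dict, tags: dict) -> bool:
    """Evaluate the two operators the policy uses; unknown operators fail closed."""
    for operator, pairs in condition.items():
        for ctx_key, allowed in pairs.items():
            allowed = substitute(allowed, tags)
            allowed = allowed if isinstance(allowed, list) else [allowed]
            actual = ctx.get(ctx_key)
            if operator == "StringEquals" and actual not in allowed:
                return False  # an absent request key makes StringEquals false
            # ForAllValues: every value sent must be in the allowed set; an absent key matches
            if operator == "ForAllValues:StringEquals" and not all(v in allowed for v in (actual or [])):
                return False
            if operator not in ("StringEquals", "ForAllValues:StringEquals"):
                return False
    return True


def is_allowed(action: str, resource: str, ctx: dict, tags: dict) -> bool:
    """Simplified IAM evaluation: implicit deny, explicit Deny overrides any Allow."""
    verdict = False
    for st in STATEMENTS:
        if (any(fnmatch.fnmatchcase(action, a) for a in st["Action"])
                and fnmatch.fnmatchcase(resource, substitute(st["Resource"], tags))
                and condition_holds(st.get("Condition", {}), ctx, tags)):
            if st["Effect"] == "Deny":
                return False
            verdict = True
    return verdict
```

One caveat the evaluator reproduces faithfully: the `s3:x-amz-server-side-encryption` condition key is only present on PutObject-style requests, so the policy as written never satisfies its own condition for `s3:GetObject`; a separate statement without that condition is needed for reads.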
Pattern 4: Security Scanning Pipeline
Automate security checks in CI/CD.
```yaml
# Security scanning pipeline
name: Security Scan

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  sast:
    name: Static Application Security Testing
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run Semgrep
        uses: returntocorp/semgrep-action@v1
        with:
          config: p/owasp-top-ten p/cwe-top-25
      # CodeQL requires an init step (which takes the languages) before analyze can run
      - name: Initialize CodeQL
        uses: github/codeql-action/init@v2
        with:
          languages: javascript,python
      - name: Run CodeQL
        uses: github/codeql-action/analyze@v2

  dependency-scan:
    name: Dependency Vulnerability Scan
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run Snyk
        uses: snyk/actions/node@master
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --all-projects
      # Trivy scans a local image, so it has to exist before the scan step
      - name: Build image
        run: docker build -t myapp:${{ github.sha }} .
      - name: Run Trivy (container scan)
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: myapp:${{ github.sha }}
          format: 'sarif'
          output: 'trivy-results.sarif'
          severity: 'CRITICAL,HIGH'

  infrastructure-scan:
    name: Infrastructure Security Scan
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run Checkov
        uses: bridgecrewio/checkov-action@master
        with:
          directory: terraform/
          framework: terraform
          soft_fail: false
          output_format: cli,sarif
          output_file_path: console,checkov-results.sarif
      - name: Run tfsec
        uses: aquasecurity/tfsec-action@master
        with:
          working_directory: terraform/
          soft_fail: true
```
⚠️ Security scanning should not block deployments for all findings. Use severity thresholds: CRITICAL/HIGH block, MEDIUM/LOW alert.
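The severity gate described in the warning, as an added example that is not part of the original page: it reads a SARIF 2.1.0 log of the kind Trivy and Checkov write above, buckets each finding by the rule's `security-severity` CVSS score (falling back to the SARIF `level` when a rule carries no score), and fails the job only for CRITICAL/HIGH. The CVSS thresholds and the sample SARIF document are assumptions constructed for the demonstration, not real scanner output.

```python
import json

# Severity policy from the warning above: CRITICAL/HIGH block the pipeline, MEDIUM/LOW only alert.
BLOCKING_SEVERITIES = {"CRITICAL", "HIGH"}


def cvss_to_severity(score: float) -> str:
    """Bucket a 'security-severity' CVSS score (assumed CVSS v3 qualitative ranges)."""
    if score >= 9.0:
        return "CRITICAL"
    if score >= 7.0:
        return "HIGH"
    if score >= 4.0:
        return "MEDIUM"
    return "LOW"


def triage_sarif(sarif: dict) -> dict:
    """Split SARIF 2.1.0 results into {'block': [...], 'alert': [...]} by severity."""
    blocked, alerts = [], []
    for run in sarif.get("runs", []):
        rules = {r["id"]: r for r in run.get("tool", {}).get("driver", {}).get("rules", [])}
        for res in run.get("results", []):
            props = rules.get(res.get("ruleId"), {}).get("properties", {})
            if "security-severity" in props:
                sev = cvss_to_severity(float(props["security-severity"]))
            else:
                # No score on the rule: fall back to the SARIF level (error -> HIGH, warning -> MEDIUM)
                sev = {"error": "HIGH", "warning": "MEDIUM"}.get(res.get("level"), "LOW")
            entry = f"{sev} {res.get('ruleId')}: {res.get('message', {}).get('text', '')}"
            (blocked if sev in BLOCKING_SEVERITIES else alerts).append(entry)
    return {"block": blocked, "alert": alerts}


def should_block(sarif: dict) -> bool:
    """Gate decision: fail the job only when a blocking-severity finding exists."""
    return bool(triage_sarif(sarif)["block"])


# Constructed sample in the shape of trivy-results.sarif / checkov-results.sarif (not real output).
SAMPLE_SARIF = json.loads("""{"version": "2.1.0", "runs": [{
  "tool": {"driver": {"name": "example-scanner", "rules": [
    {"id": "VULN-EXAMPLE-1", "properties": {"security-severity": "9.8"}},
    {"id": "VULN-EXAMPLE-2", "properties": {"security-severity": "5.3"}},
    {"id": "IAC-EXAMPLE-1"}, {"id": "IAC-EXAMPLE-2"}]}},
  "results": [
    {"ruleId": "VULN-EXAMPLE-1", "level": "error", "message": {"text": "RCE in base image package"}},
    {"ruleId": "VULN-EXAMPLE-2", "level": "warning", "message": {"text": "Information disclosure"}},
    {"ruleId": "IAC-EXAMPLE-1", "level": "error", "message": {"text": "S3 bucket without encryption"}},
    {"ruleId": "IAC-EXAMPLE-2", "level": "warning", "message": {"text": "Missing resource tags"}}]}]}""")

if __name__ == "__main__":
    print("BLOCK" if should_block(SAMPLE_SARIF) else "PASS", triage_sarif(SAMPLE_SARIF))
```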
Pattern 5: Secrets Management
Securely manage secrets across services.
```python
# AWS Secrets Manager integration
import json

import boto3


class SecretsManager:
    def __init__(self):
        self.client = boto3.client('secretsmanager')
        # Process-local cache kept as a plain dict so rotate_secret() can evict one entry.
        # (functools.lru_cache on the method cannot be invalidated per key and would keep
        # serving the pre-rotation value.)
        self.cache = {}

    def get_secret(self, secret_id: str) -> dict:
        """Get secret with caching."""
        if secret_id in self.cache:
            return self.cache[secret_id]
        response = self.client.get_secret_value(SecretId=secret_id)
        secret = json.loads(response['SecretString'])
        self.cache[secret_id] = secret
        return secret

    def get_database_credentials(self, secret_id: str) -> dict:
        """Get database credentials in the shape a DB driver expects; TLS is required."""
        secret = self.get_secret(secret_id)
        return {
            'host': secret['host'],
            'port': secret['port'],
            'database': secret['dbname'],
            'user': secret['username'],
            'password': secret['password'],
            'sslmode': 'require',
        }

    def rotate_secret(self, secret_id: str):
        """Trigger secret rotation."""
        self.client.rotate_secret(SecretId=secret_id)
        # Evict the cached value so the next access fetches the rotated secret.
        # Rotation runs asynchronously: a read immediately afterwards may still return
        # the previous AWSCURRENT value until the rotation function finishes.
        self.cache.pop(secret_id, None)
```
Security Checklist
- Zero Trust - Verify every request
- Encryption - At rest and in transit for all data
- Least Privilege - Minimum required permissions
- Secrets Management - Never hardcode credentials
- Security Scanning - Automated in CI/CD
- Audit Logging - CloudTrail for all API calls
- Incident Response - Documented and practiced
Follow-Up Questions
- How do you implement just-in-time access for privileged operations?
- What strategies would you use to detect and respond to cloud account compromises?
- How do you maintain compliance (SOC2, HIPAA) in a rapidly changing cloud environment?