Cloud Security Architecture: Zero Trust, IAM, Encryption
Difficulty: Senior Level | Companies: AWS, Google, Microsoft, CrowdStrike, Palo Alto
Interview Question
"Design a zero-trust security architecture for a multi-cloud environment. How do you handle identity, access control, and data encryption?"
Key Concepts
This question tests your understanding of zero trust principles, identity management, and cloud security best practices.
Complete Security Architecture
Architecture Overview
```text
+-------------------------------------------------------------+
|               ZERO TRUST SECURITY ARCHITECTURE              |
+-------------------------------------------------------------+
|  IDENTITY LAYER                                             |
|    Identity Provider | MFA | SSO | Federation               |
|                            |                                |
|  ACCESS CONTROL LAYER                                       |
|    Policy Decision Point   (OPA/Cedar)                      |
|    Policy Enforcement Point (Service Mesh/API Gateway)      |
|                            |                                |
|  DATA PROTECTION LAYER                                      |
|    Encryption | Key Management | Data Classification        |
|                            |                                |
|  NETWORK SECURITY LAYER                                     |
|    VPC | Security Groups | WAF | Shield | DDoS              |
+-------------------------------------------------------------+
```
Mathematical Foundation: Security Metrics
Risk Assessment:
- Asset value: V = $1,000,000
- Threat probability: P = 0.01 (1% annually)
- Vulnerability probability: Vu = 0.1 (10%)
- Annualized risk = V × P × Vu = 1,000,000 × 0.01 × 0.1 = $1,000
Zero Trust Score:
- Identity verification: I = 0.25
- Device trust: D = 0.25
- Network security: N = 0.25
- Data protection: P = 0.25
- Zero Trust Score = I + D + N + P (equal weights; a fully implemented posture scores 1.0)
Encryption Strength:
- AES-256: 2^256 possible keys
- Brute-force time: T = 2^256 / (operations per second)
- For 10^18 ops/sec: T = 10^60 years
AWS IAM Configuration
```hcl
# IAM policies for zero trust
data "aws_iam_policy_document" "deny_all" {
  statement {
    sid       = "DenyAll"
    effect    = "Deny"
    actions   = ["*"]
    resources = ["*"]
  }
}

data "aws_iam_policy_document" "allow_specific" {
  statement {
    sid    = "AllowS3Read"
    effect = "Allow"
    actions = [
      "s3:GetObject",
      "s3:ListBucket"
    ]
    resources = [
      aws_s3_bucket.data.arn,
      "${aws_s3_bucket.data.arn}/*"
    ]
  }

  statement {
    sid    = "AllowDynamoDBRead"
    effect = "Allow"
    actions = [
      "dynamodb:GetItem",
      "dynamodb:Query"
    ]
    resources = [
      aws_dynamodb_table.data.arn
    ]
  }
}

# Current account id, used to pin the trust policy below
data "aws_caller_identity" "current" {}

# IAM role for service
resource "aws_iam_role" "service_role" {
  name = "order-service-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
        # Confused-deputy guard: only ECS tasks in this account may assume the role
        Condition = {
          StringEquals = {
            "aws:SourceAccount" = data.aws_caller_identity.current.account_id
          }
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "service_policy" {
  name = "order-service-policy"
  role = aws_iam_role.service_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowDynamoDB"
        Effect = "Allow"
        Action = [
          "dynamodb:GetItem",
          "dynamodb:PutItem",
          "dynamodb:UpdateItem",
          "dynamodb:Query"
        ]
        Resource = aws_dynamodb_table.data.arn
        # Row-level isolation: the caller may only touch partition keys prefixed
        # with its own user_id session tag. The operator name contains a colon,
        # so it must be quoted; "$${" escapes the IAM policy variable so
        # Terraform does not try to interpolate it.
        Condition = {
          "ForAllValues:StringEquals" = {
            "dynamodb:LeadingKeys" = [
              "ORDER#$${aws:PrincipalTag/user_id}"
            ]
          }
        }
      },
      {
        Sid      = "AllowS3Read"
        Effect   = "Allow"
        Action   = ["s3:GetObject"]
        Resource = "${aws_s3_bucket.data.arn}/*"
      },
      {
        # Kept separate from reads: the s3:x-amz-server-side-encryption key only
        # exists on uploads, and StringEquals on a missing key evaluates false,
        # so a single combined statement would silently block GetObject.
        Sid      = "AllowS3Write"
        Effect   = "Allow"
        Action   = ["s3:PutObject"]
        Resource = "${aws_s3_bucket.data.arn}/*"
        Condition = {
          StringEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "AllowKMS"
        Effect = "Allow"
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = aws_kms_key.data.arn
      }
    ]
  })
}

# IAM policy with conditions
data "aws_iam_policy_document" "conditional_access" {
  statement {
    sid    = "ConditionalAccess"
    effect = "Allow"
    actions = [
      "s3:GetObject"
    ]
    resources = [
      "${aws_s3_bucket.data.arn}/*"
    ]
    condition {
      test     = "StringEquals"
      variable = "aws:PrincipalTag/department"
      values   = ["engineering"]
    }
    condition {
      test     = "DateGreaterThan"
      variable = "aws:CurrentTime"
      values   = ["2024-01-01T00:00:00Z"]
    }
    condition {
      test     = "IpAddress"
      variable = "aws:SourceIp"
      values   = ["10.0.0.0/8"]
    }
  }
}
```
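The `dynamodb:LeadingKeys` condition in the service policy is where the per-user isolation actually lives, so it pays to be precise about how `ForAllValues:StringEquals` decides. The Python model below is an added example, not the page's Terraform or any AWS code; it assumes a constructed request context built from the caller's `aws:PrincipalTag/*` keys and reproduces the operator's documented behaviour, including the empty-set caveat and the `Null` condition that closes it.

```python
import re
from typing import Dict, Iterable, List

# Allowed leading-key pattern copied from the AllowDynamoDB statement above
ALLOWED_LEADING_KEYS = ["ORDER#${aws:PrincipalTag/user_id}"]

_POLICY_VAR = re.compile(r"\$\{([A-Za-z0-9:/_-]+)\}")


def resolve_policy_vars(value: str, context: Dict[str, str]) -> str:
    """Substitute ${key} policy variables from the request context.
    A variable whose key is absent from the context can never match in IAM,
    so it is replaced with a sentinel that no real partition key equals."""
    return _POLICY_VAR.sub(lambda m: context.get(m.group(1), "\x00<missing>"), value)


def for_all_values_string_equals(request_values: Iterable[str],
                                 allowed: Iterable[str],
                                 context: Dict[str, str]) -> bool:
    """Model IAM's ForAllValues:StringEquals: every value the request carries
    must equal one of the allowed values. Documented caveat: a request with
    no values for the key evaluates to True (all() over an empty set)."""
    allowed_set = {resolve_policy_vars(a, context) for a in allowed}
    return all(v in allowed_set for v in request_values)


def request_context(principal_tags: Dict[str, str]) -> Dict[str, str]:
    """Constructed context: session/role tags become aws:PrincipalTag/<name> keys."""
    return {f"aws:PrincipalTag/{k}": v for k, v in principal_tags.items()}


def dynamodb_allowed(principal_tags: Dict[str, str], leading_keys: List[str]) -> bool:
    """The AllowDynamoDB condition exactly as written on the page."""
    return for_all_values_string_equals(
        leading_keys, ALLOWED_LEADING_KEYS, request_context(principal_tags))


def dynamodb_allowed_strict(principal_tags: Dict[str, str], leading_keys: List[str]) -> bool:
    """Same condition plus "Null": {"dynamodb:LeadingKeys": "false"}, which
    requires the key to be present and so closes the empty-set case."""
    if not leading_keys:
        return False
    return dynamodb_allowed(principal_tags, leading_keys)
```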
Zero Trust Implementation
```python
# Zero trust authorization
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
import hmac
import ipaddress
import time


class TrustLevel(Enum):
    UNTRUSTED = 0
    BASIC = 1
    VERIFIED = 2
    TRUSTED = 3
    HIGHLY_TRUSTED = 4


@dataclass
class Identity:
    user_id: str
    device_id: str
    ip_address: str
    trust_level: TrustLevel
    mfa_verified: bool
    last_verification: float


class ZeroTrustEngine:
    """Zero trust authorization engine: every request is checked, every check must pass"""

    def __init__(self):
        self.policies: List[Dict[str, Any]] = []

    def evaluate_access(self, identity: Identity, resource: str,
                        action: str) -> bool:
        """Evaluate an access request (fail closed)"""
        # Check minimum trust level
        if identity.trust_level.value < self._get_required_trust_level(resource, action):
            return False
        # Check MFA requirement
        if self._requires_mfa(resource, action) and not identity.mfa_verified:
            return False
        # Check time-based access
        if not self._check_time_based_access(resource):
            return False
        # Check location-based access
        if not self._check_location_based_access(identity.ip_address, resource):
            return False
        # Check device health
        if not self._check_device_health(identity.device_id):
            return False
        # Evaluate policies
        return self._evaluate_policies(identity, resource, action)

    def _get_required_trust_level(self, resource: str, action: str) -> int:
        """Get required trust level for resource/action"""
        if resource.startswith("sensitive/"):
            return TrustLevel.HIGHLY_TRUSTED.value
        elif action in ["delete", "update"]:
            return TrustLevel.VERIFIED.value
        else:
            return TrustLevel.BASIC.value

    def _requires_mfa(self, resource: str, action: str) -> bool:
        """Check if MFA is required"""
        return resource.startswith("sensitive/") or action in ["delete", "update"]

    def _check_time_based_access(self, resource: str) -> bool:
        """Check time-based access restrictions"""
        current_hour = time.localtime().tm_hour
        # Restrict sensitive resources to business hours
        if resource.startswith("sensitive/"):
            return 9 <= current_hour <= 17
        return True

    def _check_location_based_access(self, ip_address: str, resource: str) -> bool:
        """Check location-based access restrictions"""
        # In production, use GeoIP to determine location
        # Here the source address must fall inside an allowed range;
        # an address that does not parse fails closed
        allowed_ranges = ["10.0.0.0/8", "192.168.0.0/16"]
        try:
            addr = ipaddress.ip_address(ip_address)
        except ValueError:
            return False
        return any(addr in ipaddress.ip_network(cidr) for cidr in allowed_ranges)

    def _check_device_health(self, device_id: str) -> bool:
        """Check device health status"""
        # In production, check device compliance (MDM/EDR posture)
        return True

    def _evaluate_policies(self, identity: Identity, resource: str,
                           action: str) -> bool:
        """Evaluate OPA/Cedar policies"""
        # In production, use OPA or Cedar for policy evaluation
        return True


class IdentityProvider:
    """Identity provider integration"""

    def __init__(self):
        self.users: Dict[str, Dict[str, Any]] = {}

    def authenticate(self, username: str, password: str) -> Optional[Identity]:
        """Authenticate user"""
        # In production, use OAuth2/OIDC
        user = self.users.get(username)
        if not user:
            return None
        # Verify password with a constant-time comparison. Plain SHA-256 is a
        # placeholder: production stores passwords with a slow, salted KDF
        # (bcrypt, scrypt or Argon2)
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        if not hmac.compare_digest(password_hash, user['password_hash']):
            return None
        return Identity(
            user_id=user['user_id'],
            device_id=user.get('device_id', 'unknown'),
            ip_address=user.get('ip_address', 'unknown'),
            trust_level=TrustLevel.BASIC,
            mfa_verified=False,
            last_verification=time.time()
        )

    def verify_mfa(self, user_id: str, mfa_code: str) -> bool:
        """Verify MFA code"""
        # In production, use TOTP/SMS
        return True
```
Zero Trust Principles
Never trust, always verify. Implement least privilege access, micro-segmentation, and continuous verification for all users and devices.
Encryption Implementation
```python
# Encryption at rest and in transit
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
import base64
import os
from typing import Optional, Tuple


class EncryptionManager:
    """Symmetric encryption with a password-derived key"""

    def generate_key(self, password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
        """Derive a Fernet key from a password; the salt must be stored
        alongside the ciphertext so the same key can be re-derived"""
        if salt is None:
            salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt

    def encrypt_data(self, data: str, key: bytes) -> bytes:
        """Encrypt data with Fernet (AES-128-CBC + HMAC-SHA256, authenticated)"""
        f = Fernet(key)
        return f.encrypt(data.encode())

    def decrypt_data(self, encrypted_data: bytes, key: bytes) -> str:
        """Decrypt data with Fernet; raises InvalidToken if the MAC check fails"""
        f = Fernet(key)
        return f.decrypt(encrypted_data).decode()


class RSAEncryption:
    """RSA encryption for asymmetric operations"""

    def __init__(self):
        self.private_key = None
        self.public_key = None

    def generate_key_pair(self, key_size: int = 2048):
        """Generate RSA key pair"""
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size
        )
        self.public_key = self.private_key.public_key()

    def encrypt(self, plaintext: str) -> bytes:
        """Encrypt with public key (OAEP padding)"""
        ciphertext = self.public_key.encrypt(
            plaintext.encode(),
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return ciphertext

    def decrypt(self, ciphertext: bytes) -> str:
        """Decrypt with private key"""
        plaintext = self.private_key.decrypt(
            ciphertext,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        return plaintext.decode()

    def export_public_key(self) -> str:
        """Export public key as PEM"""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

    def export_private_key(self, password: Optional[str] = None) -> str:
        """Export private key as PEM, encrypted when a password is given"""
        encryption = (
            serialization.BestAvailableEncryption(password.encode())
            if password
            else serialization.NoEncryption()
        )
        return self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=encryption
        ).decode()
```
WAF Configuration
```hcl
# AWS WAF configuration
resource "aws_wafv2_web_acl" "main" {
  name        = "main-waf"
  description = "WAF rules for main application"
  scope       = "REGIONAL"

  default_action {
    allow {}
  }

  # A rate-based rule is a plain rule, so it takes "action", not "override_action"
  rule {
    name     = "rate-limit"
    priority = 1

    action {
      block {}
    }

    statement {
      rate_based_statement {
        limit              = 2000
        aggregate_key_type = "IP"
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "RateLimitMetric"
      sampled_requests_enabled   = true
    }
  }

  # Managed rule groups use override_action; "none" keeps each rule's own action
  rule {
    name     = "aws-managed-common"
    priority = 2

    override_action {
      none {}
    }

    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesCommonRuleSet"
        vendor_name = "AWS"
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "CommonRuleSetMetric"
      sampled_requests_enabled   = true
    }
  }

  rule {
    name     = "sql-injection"
    priority = 3

    override_action {
      none {}
    }

    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesSQLiRuleSet"
        vendor_name = "AWS"
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "SQLInjectionMetric"
      sampled_requests_enabled   = true
    }
  }

  # Note: AWS ships its XSS signatures inside AWSManagedRulesCommonRuleSet (rule 2);
  # there is no separately vended XSS rule group, so this rule will not apply as written.
  rule {
    name     = "xss-protection"
    priority = 4

    override_action {
      none {}
    }

    statement {
      managed_rule_group_statement {
        name        = "AWSManagedRulesXSSRuleSet"
        vendor_name = "AWS"
      }
    }

    visibility_config {
      cloudwatch_metrics_enabled = true
      metric_name                = "XSSMetric"
      sampled_requests_enabled   = true
    }
  }

  visibility_config {
    cloudwatch_metrics_enabled = true
    metric_name                = "WAFAclMetric"
    sampled_requests_enabled   = true
  }
}

# Associate WAF with ALB
resource "aws_wafv2_web_acl_association" "main" {
  resource_arn = aws_lb.main.arn
  web_acl_arn  = aws_wafv2_web_acl.main.arn
}
```
Security Monitoring
```python
# Security monitoring and alerting
import boto3
import json
from typing import Dict, Any, List
from datetime import datetime, timedelta, timezone


class SecurityMonitor:
    """Security monitoring and alerting"""

    # AWS/IAM PolicyChanges and AWS/CloudTrail FailedLoginAttempts are not built-in
    # metrics; they are assumed to be custom metrics published by CloudWatch Logs
    # metric filters over the CloudTrail log group.
    ALERT_TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789012:security-alerts'

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.guardduty = boto3.client('guardduty')
        self.sns = boto3.client('sns')
        # GuardDuty finding calls are scoped to a detector (one per account and region)
        self.detector_id = self.guardduty.list_detectors()['DetectorIds'][0]

    def _metric_sum_last_hour(self, namespace: str, metric_name: str) -> float:
        """Sum a metric over the trailing hour"""
        now = datetime.now(timezone.utc)
        response = self.cloudwatch.get_metric_statistics(
            Namespace=namespace,
            MetricName=metric_name,
            Dimensions=[],
            StartTime=now - timedelta(hours=1),
            EndTime=now,
            Period=300,
            Statistics=['Sum']
        )
        return sum(point['Sum'] for point in response['Datapoints'])

    def monitor_iam_changes(self):
        """Monitor IAM changes"""
        total_changes = self._metric_sum_last_hour('AWS/IAM', 'PolicyChanges')
        if total_changes > 10:
            self._send_alert('IAM Changes Detected',
                             f'{total_changes} IAM changes in the last hour')

    def monitor_failed_logins(self):
        """Monitor failed login attempts"""
        total_failures = self._metric_sum_last_hour('AWS/CloudTrail', 'FailedLoginAttempts')
        if total_failures > 100:
            self._send_alert('Brute Force Attack Detected',
                             f'{total_failures} failed login attempts')

    def monitor_guardduty_findings(self):
        """Monitor GuardDuty findings with severity 7.0 or higher (High)"""
        response = self.guardduty.list_findings(
            DetectorId=self.detector_id,
            FindingCriteria={
                'Criterion': {
                    'severity': {'GreaterThanOrEqual': 7}
                }
            }
        )
        finding_ids = response['FindingIds']
        if not finding_ids:
            return
        # One batched call instead of one GetFindings per id
        findings = self.guardduty.get_findings(
            DetectorId=self.detector_id,
            FindingIds=finding_ids
        )['Findings']
        for finding in findings:
            self._send_alert('GuardDuty Finding',
                             f"High severity finding: {finding['Title']}")

    def _send_alert(self, title: str, message: str):
        """Send security alert"""
        self.sns.publish(
            TopicArn=self.ALERT_TOPIC_ARN,
            Message=json.dumps({
                'title': title,
                'message': message,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'severity': 'high'
            }),
            Subject=title
        )
```
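The monitor above relies on metrics that something upstream has to produce from CloudTrail. The following is an added example, not the page's code: it applies the same two detections (more than 10 IAM changes, more than 100 failed logins in the trailing hour) directly to CloudTrail event records, using the real `eventSource`, `eventName` and `responseElements.ConsoleLogin` fields, and goes one step beyond the page by counting failures per source IP. The event sample and the fixed clock `DEMO_NOW` are constructed for the demonstration.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Iterable, List

IAM_CHANGE_THRESHOLD, FAILED_LOGIN_THRESHOLD = 10, 100  # same thresholds as SecurityMonitor
WINDOW = timedelta(hours=1)
IAM_MUTATING_PREFIXES = ("Put", "Attach", "Detach", "Create", "Delete", "Update")
DEMO_NOW = datetime(2024, 1, 1, 12, 0, 0)  # fixed clock for the constructed sample
FMT = "%Y-%m-%dT%H:%M:%SZ"  # CloudTrail eventTime format

def is_iam_policy_change(event: Dict) -> bool:
    """Mutating IAM API calls such as PutRolePolicy or AttachUserPolicy."""
    return (event.get("eventSource") == "iam.amazonaws.com"
            and event.get("eventName", "").startswith(IAM_MUTATING_PREFIXES))

def is_failed_console_login(event: Dict) -> bool:
    """CloudTrail records a failed console login as responseElements.ConsoleLogin=Failure."""
    return (event.get("eventSource") == "signin.amazonaws.com"
            and event.get("eventName") == "ConsoleLogin"
            and (event.get("responseElements") or {}).get("ConsoleLogin") == "Failure")

def analyze(events: Iterable[Dict], now: datetime) -> List[str]:
    """Alerts for the hour ending at `now`; failed logins are counted per source IP."""
    window_start = now - WINDOW
    iam_changes, failures_by_ip = 0, Counter()
    for ev in events:
        if datetime.strptime(ev["eventTime"], FMT) < window_start:
            continue  # older than the evaluation window
        if is_iam_policy_change(ev):
            iam_changes += 1
        elif is_failed_console_login(ev):
            failures_by_ip[ev.get("sourceIPAddress", "unknown")] += 1
    alerts = []
    if iam_changes > IAM_CHANGE_THRESHOLD:
        alerts.append(f"IAM Changes Detected: {iam_changes} IAM changes in the last hour")
    for ip, n in sorted(failures_by_ip.items()):
        if n > FAILED_LOGIN_THRESHOLD:
            alerts.append(f"Brute Force Attack Detected: {n} failed logins from {ip}")
    return alerts

def constructed_events(now: datetime = DEMO_NOW) -> List[Dict]:
    """Constructed sample trail (not real data): 12 recent policy changes, one stale
    change outside the window, 150 failures from one documentation IP, 3 from another."""
    recent, stale = (now - timedelta(minutes=5)).strftime(FMT), (now - timedelta(hours=2)).strftime(FMT)
    iam = {"eventSource": "iam.amazonaws.com", "eventName": "PutRolePolicy", "eventTime": recent}
    fail = {"eventSource": "signin.amazonaws.com", "eventName": "ConsoleLogin",
            "responseElements": {"ConsoleLogin": "Failure"}, "eventTime": recent}
    return ([iam] * 12 + [dict(iam, eventTime=stale)]
            + [dict(fail, sourceIPAddress="198.51.100.7")] * 150
            + [dict(fail, sourceIPAddress="203.0.113.9")] * 3)
```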
Security Best Practices
Implement defense in depth, encrypt all sensitive data, monitor for threats, and regularly audit access. Use automated tools for security scanning.
Summary
| Component | Purpose | Implementation |
|---|---|---|
| IAM | Identity management | AWS IAM, Azure AD |
| Zero Trust | Access control | OPA/Cedar policies |
| Encryption | Data protection | AES-256, RSA |
| WAF | Web protection | AWS WAF, Cloudflare |
| Monitoring | Threat detection | GuardDuty, CloudWatch |