Cost-Optimized Data Warehouse on Redshift
Performance Tuning + Compression + WLM + Serverless
ℹ️
Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS
Build a production data warehouse on Amazon Redshift optimized for both performance and cost, achieving 60-70% savings over naive configurations.
Project Overview
Problem Statement
Data warehouses are expensive to operate. Without proper optimization, organizations overspend on compute resources while experiencing poor query performance. A well-tuned Redshift cluster can deliver enterprise-grade analytics at a fraction of the cost.
Objectives
- Achieve 60%+ cost reduction vs baseline
- Maintain sub-second query performance for dashboards
- Support 100+ concurrent users
- Implement automated scaling and workload management
- Enable seamless data loading from multiple sources
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Warehouse | Amazon Redshift | Core analytics engine |
| ETL | AWS Glue | Data integration |
| Storage | S3 + Redshift Spectrum | Data lake extension |
| Monitoring | CloudWatch + Redshift Console | Performance monitoring |
| Security | AWS KMS + IAM | Data protection |
Architecture Diagram
Data Source Setup and Schema
Optimized Redshift Schema
-- schemas/optimized_schema.sql
-- Dimension Tables with optimal distribution
CREATE TABLE IF NOT EXISTS dim_customers (
customer_key BIGINT IDENTITY(1,1) DISTKEY,
customer_id VARCHAR(50) ENCODE raw,
email VARCHAR(255) ENCODE lzo,
full_name VARCHAR(201) ENCODE lzo,
company_name VARCHAR(255) ENCODE lzo,
industry VARCHAR(100) ENCODE bytedict,
segment VARCHAR(50) ENCODE bytedict,
city VARCHAR(100) ENCODE lzo,
state VARCHAR(50) ENCODE bytedict,
country VARCHAR(50) ENCODE bytedict,
created_at TIMESTAMP ENCODE lzo,
updated_at TIMESTAMP ENCODE lzo,
is_active BOOLEAN ENCODE runlength,
lifetime_value DECIMAL(12,2) ENCODE az64,
PRIMARY KEY (customer_id)
)
DISTSTYLE KEY
SORTKEY (created_at, customer_id);
-- Product dimension replicated to every node (DISTSTYLE ALL), so no DISTKEY column is declared
CREATE TABLE IF NOT EXISTS dim_products (
product_key BIGINT IDENTITY(1,1),
product_id VARCHAR(50) ENCODE raw,
sku VARCHAR(50) ENCODE lzo,
name VARCHAR(255) ENCODE lzo,
category VARCHAR(100) ENCODE bytedict,
subcategory VARCHAR(100) ENCODE bytedict,
brand VARCHAR(100) ENCODE lzo,
price DECIMAL(10,2) ENCODE az64,
cost DECIMAL(10,2) ENCODE az64,
weight_kg DECIMAL(8,3) ENCODE az64,
is_active BOOLEAN ENCODE runlength,
created_at TIMESTAMP ENCODE lzo,
updated_at TIMESTAMP ENCODE lzo,
PRIMARY KEY (product_id)
)
DISTSTYLE ALL
SORTKEY (category, product_id);
-- Large fact table with key distribution
CREATE TABLE IF NOT EXISTS fct_orders (
order_key BIGINT IDENTITY(1,1) DISTKEY,
order_id VARCHAR(50) ENCODE raw,
customer_key BIGINT ENCODE az64,
order_date DATE ENCODE lzo,
ship_date DATE ENCODE lzo,
status VARCHAR(30) ENCODE bytedict,
subtotal DECIMAL(12,2) ENCODE az64,
tax_amount DECIMAL(10,2) ENCODE az64,
shipping_amount DECIMAL(10,2) ENCODE az64,
discount_amount DECIMAL(10,2) ENCODE az64,
total_amount DECIMAL(12,2) ENCODE az64,
currency VARCHAR(3) ENCODE bytedict,
payment_method VARCHAR(50) ENCODE bytedict,
shipping_address VARCHAR(500) ENCODE lzo,
created_at TIMESTAMP ENCODE lzo,
updated_at TIMESTAMP ENCODE lzo
)
DISTSTYLE KEY
COMPOUND SORTKEY (order_date, status, customer_key);
-- Order items with interleaved sort key
CREATE TABLE IF NOT EXISTS fct_order_items (
order_item_key BIGINT IDENTITY(1,1) DISTKEY,
order_id VARCHAR(50) ENCODE raw,
product_key BIGINT ENCODE az64,
quantity INTEGER ENCODE az64,
unit_price DECIMAL(10,2) ENCODE az64,
discount DECIMAL(5,2) ENCODE az64,
line_total DECIMAL(12,2) ENCODE az64,
created_at TIMESTAMP ENCODE lzo
)
DISTSTYLE KEY
INTERLEAVED SORTKEY (order_id, product_key);
-- Daily aggregated metrics (materialized)
CREATE TABLE IF NOT EXISTS agg_daily_sales (
metric_date DATE ENCODE lzo,
product_key BIGINT ENCODE az64,
customer_key BIGINT ENCODE az64,
category VARCHAR(100) ENCODE bytedict,
total_orders INTEGER ENCODE az64,
total_items INTEGER ENCODE az64,
total_revenue DECIMAL(15,2) ENCODE az64,
avg_order_value DECIMAL(10,2) ENCODE az64,
unique_customers INTEGER ENCODE az64,
created_at TIMESTAMP ENCODE lzo
)
DISTSTYLE KEY
DISTKEY (product_key)
COMPOUND SORTKEY (metric_date, category);
-- Serverless namespaces and workgroups are not created with SQL.
-- Create them through the console, the AWS CLI, or the redshift-serverless API.
-- Workload management and concurrency scaling are not SQL objects either:
-- they are set through the wlm_json_configuration parameter of the
-- cluster parameter group.
Step-by-Step Implementation Guide
Step 1: Redshift Cluster Setup
# redshift/cluster_manager.py
import boto3
import json
import time
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RedshiftClusterManager:
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.redshift = boto3.client('redshift', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.iam = boto3.client('iam', region_name=region)

    def create_cluster(self, config: Dict) -> Dict:
        cluster_config = {
            'ClusterIdentifier': config['cluster_name'],
            'NodeType': config.get('node_type', 'dc2.large'),
            'MasterUsername': config['master_username'],
            'MasterUserPassword': config['master_password'],
            'NumberOfNodes': config.get('num_nodes', 2),
            'Encrypted': True,
            'VpcSecurityGroupIds': config['security_group_ids'],
            'ClusterSubnetGroupName': config['subnet_group'],
            'PubliclyAccessible': False,
            'AutomatedSnapshotRetentionPeriod': config.get('snapshot_retention', 7),
            'ManualSnapshotRetentionPeriod': 35,
            'PreferredMaintenanceWindow': 'Sun:03:00-Sun:04:00',
            'ClusterParameterGroupName': config.get('parameter_group', 'default.redshift-1.0'),
            'IamRoles': config.get('iam_roles', [])
        }
        # boto3 rejects None values, so pass the KMS key only when one is configured
        if config.get('kms_key_id'):
            cluster_config['KmsKeyId'] = config['kms_key_id']
        # RA3 node types use managed storage automatically; no extra parameter is needed
        try:
            response = self.redshift.create_cluster(**cluster_config)
            logger.info(f"Creating cluster: {config['cluster_name']}")
            self._wait_for_cluster(config['cluster_name'], 'available')
            # Audit logging is not a create_cluster parameter; enable it once the cluster is available
            self.redshift.enable_logging(
                ClusterIdentifier=config['cluster_name'],
                BucketName=config['log_bucket'],
                S3KeyPrefix='redshift-logs/'
            )
            return response['Cluster']
        except Exception as e:
            logger.error(f"Failed to create cluster: {e}")
            raise

    def _wait_for_cluster(self, cluster_id: str, target_state: str,
                          timeout: int = 1800):
        start_time = time.time()
        while True:
            response = self.redshift.describe_clusters(ClusterIdentifier=cluster_id)
            state = response['Clusters'][0]['ClusterStatus']
            if state == target_state:
                logger.info(f"Cluster {cluster_id} is now {target_state}")
                return
            elif state in ['deleted', 'deleting']:
                raise Exception(f"Cluster entered unexpected state: {state}")
            elapsed = time.time() - start_time
            if elapsed > timeout:
                raise TimeoutError(f"Cluster not ready within {timeout} seconds")
            logger.info(f"Cluster state: {state} ({elapsed:.0f}s elapsed)")
            time.sleep(30)

    def get_cluster_metrics(self, cluster_id: str) -> Dict:
        response = self.redshift.describe_clusters(ClusterIdentifier=cluster_id)
        cluster = response['Clusters'][0]
        return {
            'cluster_id': cluster['ClusterIdentifier'],
            'node_type': cluster['NodeType'],
            'num_nodes': cluster['NumberOfNodes'],
            'status': cluster['ClusterStatus'],
            'endpoint': cluster.get('Endpoint', {}).get('Address'),
            'port': cluster.get('Endpoint', {}).get('Port', 5439),
            'creation_time': cluster['ClusterCreateTime'].isoformat(),
            'encrypted': cluster['Encrypted']
        }

    def resize_cluster(self, cluster_id: str, new_node_type: str,
                       new_num_nodes: int):
        try:
            self.redshift.modify_cluster(
                ClusterIdentifier=cluster_id,
                ClusterType='multi-node' if new_num_nodes > 1 else 'single-node',
                NodeType=new_node_type,
                NumberOfNodes=new_num_nodes
            )
            logger.info(f"Resizing cluster {cluster_id} to {new_node_type} x{new_num_nodes}")
            self._wait_for_cluster(cluster_id, 'available')
        except Exception as e:
            logger.error(f"Resize failed: {e}")
            raise

    def create_snapshot(self, cluster_id: str, snapshot_id: str) -> Dict:
        response = self.redshift.create_cluster_snapshot(
            SnapshotIdentifier=snapshot_id,
            ClusterIdentifier=cluster_id
        )
        logger.info(f"Created snapshot: {snapshot_id}")
        return response['Snapshot']

    def delete_cluster(self, cluster_id: str, skip_final_snapshot: bool = True):
        params = {
            'ClusterIdentifier': cluster_id,
            'SkipFinalClusterSnapshot': skip_final_snapshot
        }
        # A final snapshot identifier may only be passed when a final snapshot is taken
        if not skip_final_snapshot:
            params['FinalClusterSnapshotIdentifier'] = f"{cluster_id}-final-{int(time.time())}"
        try:
            self.redshift.delete_cluster(**params)
            logger.info(f"Deleting cluster: {cluster_id}")
        except Exception as e:
            logger.error(f"Delete failed: {e}")
            raise

    def setup_iam_role(self, role_name: str, s3_bucket: str) -> str:
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "redshift.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }]
        }
        try:
            role = self.iam.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Description="IAM role for Redshift S3 access"
            )
            self.iam.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
            )
            logger.info(f"Created IAM role: {role_name}")
            return role['Role']['Arn']
        except self.iam.exceptions.EntityAlreadyExistsException:
            role = self.iam.get_role(RoleName=role_name)
            return role['Role']['Arn']
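
    def setup_vpc_endpoint(self, vpc_id: str, subnet_ids: List[str],
                           security_group_ids: List[str]) -> str:
        ec2 = boto3.client('ec2', region_name=self.region)
        # The Redshift API endpoint is an interface endpoint, which attaches to
        # subnets and security groups rather than to route tables
        endpoint = ec2.create_vpc_endpoint(
            VpcEndpointType='Interface',
            VpcId=vpc_id,
            ServiceName=f'com.amazonaws.{self.region}.redshift',
            SubnetIds=subnet_ids,
            SecurityGroupIds=security_group_ids,
            PrivateDnsEnabled=True
        )
        logger.info(f"Created VPC endpoint: {endpoint['VpcEndpoint']['VpcEndpointId']}")
        return endpoint['VpcEndpoint']['VpcEndpointId']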
Step 2: Performance Optimization
-- optimization/analyze_optimize.sql
-- Analyze table statistics
ANALYZE dim_customers;
ANALYZE dim_products;
ANALYZE fct_orders;
ANALYZE fct_order_items;
-- Vacuum to reclaim space and sort
VACUUM SORT ONLY dim_customers;
VACUUM SORT ONLY fct_orders;
VACUUM DELETE ONLY fct_orders;
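-- Tables with an interleaved sort key are re-indexed with VACUUM REINDEX
VACUUM REINDEX fct_order_items;
-- Report suggested column encodings (this does not update statistics)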
ANALYZE COMPRESSION dim_customers;
ANALYZE COMPRESSION fct_orders;
-- Create materialized views for common queries
CREATE MATERIALIZED VIEW mv_daily_sales_summary AS
SELECT
order_date,
COUNT(DISTINCT order_id) AS total_orders,
SUM(total_amount) AS total_revenue,
AVG(total_amount) AS avg_order_value,
COUNT(DISTINCT customer_key) AS unique_customers
FROM fct_orders
WHERE order_date >= DATEADD('month', -3, CURRENT_DATE)
GROUP BY order_date;
-- Redshift populates a materialized view when it is created (there is no WITH NO DATA clause).
-- Refresh it after new data is loaded
REFRESH MATERIALIZED VIEW mv_daily_sales_summary;
-- Redshift has no CREATE STATISTICS command; refresh statistics on filter and join columns with ANALYZE
ANALYZE fct_orders (order_date, customer_key);
ANALYZE fct_order_items (product_key);
-- Workload Management Configuration
-- WLM queues and query monitoring rules are not SQL objects. Define them in the
-- wlm_json_configuration parameter of the cluster parameter group (console, CLI, or API).
-- Intended manual WLM layout (memory allocations must total at most 100%):
--   reporting queue: concurrency 5,  40% memory
--   etl queue:       concurrency 3,  30% memory
--   adhoc queue:     concurrency 10, 30% memory
-- Intended query monitoring rules:
--   rule_long_running: abort queries whose query_execution_time exceeds 300 seconds
--   rule_high_memory:  log queries whose query_execution_time exceeds 60 seconds
-- Sessions are routed to a queue by query group:
SET query_group TO 'reporting';
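Because WLM queues live in the `wlm_json_configuration` parameter rather than in SQL, one way to express the three queues is as a JSON document applied to the parameter group. The following is a minimal sketch, not a definitive configuration: the helper names `build_wlm_config` and `apply_wlm_config` are hypothetical, the ad hoc workload is assumed to use the default queue, and the ETL share is set to 30% here so that the three allocations total 100%.

```python
import json


def build_wlm_config() -> str:
    """Return a manual-WLM JSON string for the wlm_json_configuration parameter."""
    queues = [
        {
            "query_group": ["reporting"],
            "query_concurrency": 5,
            "memory_percent_to_use": 40,
            # Query monitoring rule: abort queries running longer than 300 seconds
            "rules": [{
                "rule_name": "rule_long_running",
                "predicate": [{"metric_name": "query_execution_time",
                               "operator": ">", "value": 300}],
                "action": "abort",
            }],
        },
        {"query_group": ["etl"], "query_concurrency": 3, "memory_percent_to_use": 30},
        # The last queue has no query_group, so it serves as the default (ad hoc) queue
        {"query_concurrency": 10, "memory_percent_to_use": 30},
    ]
    total = sum(q["memory_percent_to_use"] for q in queues)
    if total > 100:
        raise ValueError(f"memory allocations total {total}%, must not exceed 100%")
    return json.dumps(queues)


def apply_wlm_config(parameter_group: str, region: str = "us-east-1") -> None:
    """Write the WLM JSON to a cluster parameter group (requires AWS credentials)."""
    import boto3  # imported lazily so build_wlm_config works without AWS access

    client = boto3.client("redshift", region_name=region)
    client.modify_cluster_parameter_group(
        ParameterGroupName=parameter_group,
        Parameters=[{
            "ParameterName": "wlm_json_configuration",
            "ParameterValue": build_wlm_config(),
        }],
    )
```

Validating the memory total before applying the configuration catches over-allocation early, since the queue percentages are easy to get wrong when queues are added one at a time.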
# optimization/performance_tuner.py
import boto3
import time
from typing import Dict, List, Tuple
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RedshiftPerformanceTuner:
    def __init__(self, cluster_id: str, region: str = 'us-east-1'):
        self.cluster_id = cluster_id
        self.region = region
        self.redshift = boto3.client('redshift', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def analyze_table_skew(self, connection, table_name: str) -> Dict:
        cursor = connection.cursor()
        # svv_table_info holds one row per table, so per-slice row counts are read
        # from stv_tbl_perm instead (visible to superusers). Assumes a DB-API
        # driver that uses %s placeholders.
        skew_query = """
            SELECT slice, SUM(rows) AS row_count
            FROM stv_tbl_perm
            WHERE TRIM(name) = %s
            GROUP BY slice
            ORDER BY row_count DESC;
        """
        cursor.execute(skew_query, (table_name,))
        results = [(r[0], int(r[1])) for r in cursor.fetchall()]
        total_rows = sum(row_count for _, row_count in results)
        expected_per_slice = total_rows / len(results) if results else 0
        skew_metrics = []
        for slice_id, row_count in results:
            skew = abs(row_count - expected_per_slice) / expected_per_slice * 100 if expected_per_slice > 0 else 0
            skew_metrics.append({
                'slice_id': slice_id,
                'row_count': row_count,
                'percentage': row_count * 100.0 / total_rows if total_rows else 0,
                'skew_percent': skew
            })
        return {
            'table_name': table_name,
            'total_rows': total_rows,
            'num_slices': len(results),
            'max_skew': max(m['skew_percent'] for m in skew_metrics) if skew_metrics else 0,
            'slice_details': skew_metrics
        }

    def analyze_query_performance(self, connection, query: str) -> Dict:
        cursor = connection.cursor()
        cursor.execute(f"EXPLAIN {query}")
        explain_plan = cursor.fetchall()
        # Start the timer before executing so the measurement includes query run time
        start_time = time.time()
        cursor.execute(query)
        results = cursor.fetchall()
        execution_time = time.time() - start_time
        return {
            'query': query,
            'execution_time_seconds': execution_time,
            'rows_returned': len(results),
            'explain_plan': explain_plan
        }

    def recommend_distribution_style(self, connection, table_name: str) -> str:
        cursor = connection.cursor()
        # customer_key is the only candidate column evaluated here
        query = f"""
            SELECT
                COUNT(*) AS total_rows,
                COUNT(DISTINCT customer_key) AS distinct_values,
                COUNT(*) / NULLIF(COUNT(DISTINCT customer_key), 0) AS avg_rows_per_value
            FROM {table_name}
            WHERE customer_key IS NOT NULL;
        """
        cursor.execute(query)
        result = cursor.fetchone()
        total_rows = result[0]
        avg_rows_per_value = result[2] or 0
        if total_rows < 100000:
            return 'ALL'
        elif avg_rows_per_value > 100000:
            return 'KEY'
        else:
            return 'EVEN'

    def recommend_sort_key(self, connection, table_name: str,
                           query_patterns: List[str]) -> Dict:
        # Count how often each column appears in a WHERE predicate
        column_usage = {}
        for query in query_patterns:
            columns = self._extract_where_columns(query)
            for col in columns:
                column_usage[col] = column_usage.get(col, 0) + 1
        sorted_columns = sorted(column_usage.items(), key=lambda x: x[1], reverse=True)
        sort_type = 'COMPOUND'
        if len(sorted_columns) >= 2:
            sort_columns = [c[0] for c in sorted_columns[:4]]
        elif len(sorted_columns) == 1:
            sort_columns = [sorted_columns[0][0]]
        else:
            sort_columns = ['created_at']
        return {
            'table_name': table_name,
            'sort_type': sort_type,
            'sort_columns': sort_columns,
            'recommendation': f"{sort_type} SORTKEY ({', '.join(sort_columns)})"
        }

    def _extract_where_columns(self, query: str) -> List[str]:
        import re
        # Heuristic only: word boundaries keep ORDER/GROUP/IN from matching inside
        # identifiers such as order_date, and DOTALL handles multi-line queries
        where_match = re.search(
            r'\bWHERE\s+(.+?)(?:\bORDER\b|\bGROUP\b|\bLIMIT\b|$)',
            query, re.IGNORECASE | re.DOTALL)
        if not where_match:
            return []
        where_clause = where_match.group(1)
        columns = re.findall(
            r'\b(\w+)\s*(?:>=|<=|=|>|<|\bIN\b|\bLIKE\b|\bBETWEEN\b)',
            where_clause, re.IGNORECASE)
        return columns

    @staticmethod
    def _summarize_datapoints(datapoints: List[Dict]) -> Dict:
        # Average across every datapoint; CloudWatch does not return them in order
        if not datapoints:
            return {'avg': 0, 'max': 0}
        return {
            'avg': sum(dp['Average'] for dp in datapoints) / len(datapoints),
            'max': max(dp['Maximum'] for dp in datapoints)
        }

    def get_cluster_utilization(self) -> Dict:
        end_time = time.time()
        start_time = end_time - 3600
        cpu_metrics = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/Redshift',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'ClusterIdentifier', 'Value': self.cluster_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average', 'Maximum']
        )
        storage_metrics = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/Redshift',
            MetricName='PercentageDiskSpaceUsed',
            Dimensions=[{'Name': 'ClusterIdentifier', 'Value': self.cluster_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average', 'Maximum']
        )
        return {
            'cpu': self._summarize_datapoints(cpu_metrics['Datapoints']),
            'storage': self._summarize_datapoints(storage_metrics['Datapoints'])
        }

    def generate_optimization_report(self, connection) -> Dict:
        report = {
            'cluster_id': self.cluster_id,
            'timestamp': time.time(),
            'recommendations': [],
            'utilization': self.get_cluster_utilization()
        }
        cursor = connection.cursor()
        # svv_table_info exposes the table name in a column called "table"
        cursor.execute("""
            SELECT "table" FROM svv_table_info
            WHERE "schema" = 'public'
            ORDER BY size DESC
            LIMIT 10
        """)
        tables = [r[0] for r in cursor.fetchall()]
        for table in tables:
            skew = self.analyze_table_skew(connection, table)
            if skew['max_skew'] > 20:
                report['recommendations'].append({
                    'table': table,
                    'issue': 'high_skew',
                    'severity': 'high',
                    'detail': f"Skew of {skew['max_skew']:.1f}% detected"
                })
        if report['utilization']['cpu']['avg'] < 20:
            report['recommendations'].append({
                'type': 'right_sizing',
                'severity': 'medium',
                'detail': f"CPU utilization is only {report['utilization']['cpu']['avg']:.1f}% avg. Consider downsizing."
            })
        return report
Step 3: Cost Monitoring
# cost/cost_monitor.py
import boto3
from datetime import datetime, timedelta
from typing import Dict, List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RedshiftCostMonitor:
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.ce = boto3.client('ce', region_name='us-east-1')
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def get_monthly_cost(self, cluster_id: str) -> Dict:
        # Cover the last complete calendar month; the End date is exclusive.
        # The filter is by service, so the total includes every Redshift
        # resource in the account, not only cluster_id.
        first_of_this_month = datetime.now().replace(day=1)
        end_date = first_of_this_month.strftime('%Y-%m-%d')
        start_date = (first_of_this_month - timedelta(days=1)).replace(day=1).strftime('%Y-%m-%d')
        response = self.ce.get_cost_and_usage(
            TimePeriod={'Start': start_date, 'End': end_date},
            Granularity='MONTHLY',
            Metrics=['UnblendedCost'],
            Filter={
                'Dimensions': {
                    'Key': 'SERVICE',
                    'Values': ['Amazon Redshift']
                }
            }
        )
        cost_data = response['ResultsByTime'][0]['Total']['UnblendedCost']
        return {
            'period': f"{start_date} to {end_date}",
            'total_cost': float(cost_data['Amount']),
            'currency': cost_data['Unit']
        }

    def estimate_cost_by_node_type(self) -> Dict:
        # Illustrative on-demand rates; check current pricing for your region
        pricing = {
            'dc2.large': {'vcpu': 2, 'memory_gb': 15, 'hourly_cost': 0.25},
            'dc2.8xlarge': {'vcpu': 32, 'memory_gb': 244, 'hourly_cost': 4.80},
            'ra3.xlplus': {'vcpu': 4, 'memory_gb': 32, 'hourly_cost': 1.065},
            'ra3.4xlarge': {'vcpu': 12, 'memory_gb': 96, 'hourly_cost': 3.26},
            'ra3.16xlarge': {'vcpu': 48, 'memory_gb': 384, 'hourly_cost': 13.04}
        }
        estimates = {}
        hours_per_month = 730
        for node_type, info in pricing.items():
            for num_nodes in [2, 4, 8]:
                monthly_cost = info['hourly_cost'] * hours_per_month * num_nodes
                estimates[f"{node_type}x{num_nodes}"] = {
                    'node_type': node_type,
                    'num_nodes': num_nodes,
                    'hourly_cost': info['hourly_cost'] * num_nodes,
                    'monthly_cost': monthly_cost,
                    'vcpu_total': info['vcpu'] * num_nodes,
                    'memory_total_gb': info['memory_gb'] * num_nodes
                }
        return estimates

    def get_utilization_trends(self, cluster_id: str, days: int = 30) -> Dict:
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)
        cpu_trend = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/Redshift',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'ClusterIdentifier', 'Value': cluster_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=86400,
            Statistics=['Average', 'Maximum']
        )
        return {
            'cpu_trend': cpu_trend['Datapoints']
        }

    def calculate_cost_per_query(self, cluster_id: str) -> Dict:
        utilization = self.get_utilization_trends(cluster_id, days=30)
        if utilization['cpu_trend']:
            avg_cpu = sum(dp['Average'] for dp in utilization['cpu_trend']) / len(utilization['cpu_trend'])
        else:
            avg_cpu = 0
        cost = self.get_monthly_cost(cluster_id)
        return {
            'monthly_cost': cost['total_cost'],
            'avg_cpu_utilization': avg_cpu,
            'cost_efficiency': cost['total_cost'] / max(avg_cpu, 1)
        }

    def generate_cost_report(self, cluster_id: str) -> Dict:
        return {
            'cluster_id': cluster_id,
            'current_cost': self.get_monthly_cost(cluster_id),
            'utilization': self.get_utilization_trends(cluster_id),
            'cost_estimates': self.estimate_cost_by_node_type(),
            'cost_per_query': self.calculate_cost_per_query(cluster_id)
        }
Infrastructure Setup (Terraform)
# infrastructure/redshift_warehouse.tf
variable "environment" {
default = "production"
}
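variable "master_password" {
sensitive = true
}
# Inputs referenced by the resources below
variable "region" {
type = string
}
variable "vpc_id" {
type = string
}
variable "private_subnets" {
type = list(string)
}
variable "aws_account_id" {
type = string
}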
resource "aws_redshift_cluster" "analytics" {
cluster_identifier = "analytics-warehouse-${var.environment}"
database_name = "analytics"
master_username = "admin"
master_password = var.master_password
node_type = "ra3.xlplus"
number_of_nodes = 3
cluster_type = "multi-node"
encrypted = true
kms_key_id = aws_kms_key.redshift.arn
vpc_security_group_ids = [aws_security_group.redshift.id]
cluster_subnet_group_name = aws_redshift_subnet_group.analytics.name
cluster_parameter_group_name = aws_redshift_parameter_group.analytics.name
iam_roles = [aws_iam_role.redshift_role.arn]
publicly_accessible = false
skip_final_snapshot = var.environment != "production"
final_snapshot_identifier = var.environment == "production" ? "analytics-final-${formatdate("YYYY-MM-DD-hhmm", timestamp())}" : null
automated_snapshot_retention_period = 7
manual_snapshot_retention_period = 35
preferred_maintenance_window = "sun:03:00-sun:04:00"
availability_zone = "${var.region}a"
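tags = {
Environment = var.environment
Project = "analytics"
}
}
# Audit logging is configured with a separate resource (available in recent AWS provider versions)
resource "aws_redshift_logging" "analytics" {
cluster_identifier = aws_redshift_cluster.analytics.id
log_destination_type = "s3"
bucket_name = aws_s3_bucket.redshift_logs.id
s3_key_prefix = "redshift-logs/"
}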
resource "aws_redshift_parameter_group" "analytics" {
name = "analytics-params-${var.environment}"
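family = "redshift-1.0"
parameter {
name = "enable_user_activity_logging"
value = "true"
}
# statement_timeout is in milliseconds (3600000 ms = 1 hour)
parameter {
name = "statement_timeout"
value = "3600000"
}
# Per-queue timeouts belong in wlm_json_configuration rather than in a standalone parameter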
}
resource "aws_redshift_subnet_group" "analytics" {
name = "analytics-subnet-group"
subnet_ids = var.private_subnets
tags = {
Environment = var.environment
}
}
resource "aws_security_group" "redshift" {
name_prefix = "redshift-"
vpc_id = var.vpc_id
ingress {
from_port = 5439
to_port = 5439
protocol = "tcp"
security_groups = [aws_security_group.data_analysts.id]
description = "Redshift access from analyst security group"
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "redshift-security-group"
}
}
resource "aws_kms_key" "redshift" {
description = "KMS key for Redshift encryption"
deletion_window_in_days = 7
enable_key_rotation = true
}
resource "aws_s3_bucket" "redshift_logs" {
bucket = "redshift-logs-${var.environment}-${var.aws_account_id}"
}
resource "aws_s3_bucket_lifecycle_configuration" "redshift_logs" {
bucket = aws_s3_bucket.redshift_logs.id
rule {
id = "archive-logs"
status = "Enabled"
transition {
days = 30
storage_class = "STANDARD_IA"
}
transition {
days = 90
storage_class = "GLACIER"
}
expiration {
days = 365
}
}
}
resource "aws_iam_role" "redshift_role" {
name = "redshift-analytics-role-${var.environment}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "redshift.amazonaws.com" }
}]
})
}
resource "aws_iam_role_policy" "redshift_s3_access" {
name = "redshift-s3-access"
role = aws_iam_role.redshift_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:GetObject", "s3:ListBucket"]
Resource = [aws_s3_bucket.data_lake.arn, "${aws_s3_bucket.data_lake.arn}/*"]
},
{
Effect = "Allow"
Action = ["redshift:DescribeClusters", "redshift:GetClusterCredentials"]
Resource = "*"
}
]
})
}
output "redshift_endpoint" {
value = aws_redshift_cluster.analytics.endpoint
}
output "redshift_port" {
value = aws_redshift_cluster.analytics.port
}
Testing and Validation
# tests/test_redshift_optimization.py
import pytest
class TestRedshiftOptimization:
    def test_compression_encoding_selection(self):
        column_types = {
            'customer_id': 'VARCHAR',
            'amount': 'DECIMAL',
            'status': 'VARCHAR',
            'created_at': 'TIMESTAMP'
        }
        encoding_recommendations = {}
        for col, dtype in column_types.items():
            if dtype == 'VARCHAR':
                encoding_recommendations[col] = 'lzo'
            elif dtype == 'DECIMAL':
                encoding_recommendations[col] = 'az64'
            elif dtype == 'TIMESTAMP':
                encoding_recommendations[col] = 'lzo'
        assert len(encoding_recommendations) == 4

    def test_distribution_style_recommendation(self):
        # The KEY case needs more than 100000 rows per distinct value (10000000 / 50 = 200000)
        test_cases = [
            {'rows': 50000, 'distinct_values': 50000, 'expected': 'ALL'},
            {'rows': 10000000, 'distinct_values': 50, 'expected': 'KEY'},
            {'rows': 500000, 'distinct_values': 100000, 'expected': 'EVEN'}
        ]
        for tc in test_cases:
            if tc['rows'] < 100000:
                result = 'ALL'
            elif tc['rows'] / tc['distinct_values'] > 100000:
                result = 'KEY'
            else:
                result = 'EVEN'
            assert result == tc['expected']

    def test_sort_key_type_selection(self):
        where_columns_frequency = {
            'order_date': 100,
            'status': 50,
            'customer_id': 30,
            'region': 10
        }
        sorted_cols = sorted(where_columns_frequency.items(), key=lambda x: x[1], reverse=True)
        top_cols = [c[0] for c in sorted_cols[:2]]
        assert len(top_cols) == 2
        assert top_cols[0] == 'order_date'

    def test_cost_estimation(self):
        pricing = {
            'ra3.xlplus': 1.065,
            'ra3.4xlarge': 3.26,
            'ra3.16xlarge': 13.04
        }
        hours_per_month = 730
        num_nodes = 3
        for node_type, hourly_cost in pricing.items():
            monthly = hourly_cost * hours_per_month * num_nodes
            assert monthly > 0

    def test_materialized_view_creation(self):
        # Redshift materialized views are populated at creation and have no WITH NO DATA clause
        mv_query = """
            CREATE MATERIALIZED VIEW mv_daily_sales_summary AS
            SELECT order_date, COUNT(*) as order_count, SUM(total_amount) as revenue
            FROM fct_orders
            GROUP BY order_date;
        """
        assert 'MATERIALIZED VIEW' in mv_query
        assert 'WITH NO DATA' not in mv_query

    def test_wlm_configuration(self):
        # Memory allocations across queues must total 100
        queues = [
            {'name': 'reporting', 'concurrency': 5, 'memory': 40},
            {'name': 'etl', 'concurrency': 3, 'memory': 30},
            {'name': 'adhoc', 'concurrency': 10, 'memory': 30}
        ]
        total_memory = sum(q['memory'] for q in queues)
        assert total_memory == 100

    def test_vacuum_strategy(self):
        tables = [
            {'name': 'dim_customers', 'strategy': 'SORT ONLY'},
            {'name': 'fct_orders', 'strategy': 'DELETE ONLY'},
            {'name': 'fct_order_items', 'strategy': 'FULL'}
        ]
        for table in tables:
            assert table['strategy'] in ['SORT ONLY', 'DELETE ONLY', 'FULL']

    def test_analyze_statistics(self):
        tables = ['dim_customers', 'dim_products', 'fct_orders', 'fct_order_items']
        for table in tables:
            analyze_query = f"ANALYZE {table};"
            assert analyze_query.startswith('ANALYZE')

    def test_cluster_resize_recommendation(self):
        utilization = {
            'cpu_avg': 15.0,
            'cpu_max': 45.0,
            'storage_used_percent': 20.0
        }
        if utilization['cpu_avg'] < 20 and utilization['storage_used_percent'] < 30:
            recommendation = "downsize"
        elif utilization['cpu_avg'] > 70:
            recommendation = "upsize"
        else:
            recommendation = "maintain"
        assert recommendation == "downsize"

    def test_data_loading_optimization(self):
        load_strategies = {
            'bulk': 'COPY command with manifest',
            'incremental': 'INSERT with sort key awareness',
            'streaming': 'Kinesis Redshift integration'
        }
        for strategy, description in load_strategies.items():
            assert isinstance(description, str)
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| Redshift ra3.xlplus | 3 nodes, 24/7 (730 h at $1.065/node-hour) | $2,332 |
| S3 Storage | 2TB data lake | $46 |
| AWS Glue | 200 hours ETL | $70 |
| Data Transfer | Cross-AZ + internet | $150 |
| KMS | Key management | $3 |
| CloudWatch | Monitoring | $50 |
| Total | | $2,651 |
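The compute line of the breakdown is simple arithmetic: hourly rate times hours per month times node count. The sketch below works that through with the 730-hour month and the $1.065 ra3.xlplus rate used in the cost monitor code, and adds a helper for expressing a savings target as a percentage. The function names are hypothetical and the rates are illustrative, not current list prices.

```python
HOURS_PER_MONTH = 730  # same month length the cost monitor assumes


def monthly_compute_cost(hourly_rate: float, num_nodes: int) -> float:
    """On-demand compute cost for a cluster that runs all month."""
    return round(hourly_rate * HOURS_PER_MONTH * num_nodes, 2)


def savings_percent(baseline: float, optimized: float) -> float:
    """Percentage saved by the optimized configuration relative to the baseline."""
    if baseline <= 0:
        raise ValueError("baseline cost must be positive")
    return round((baseline - optimized) / baseline * 100, 1)


if __name__ == "__main__":
    ra3_cluster = monthly_compute_cost(1.065, 3)
    print(f"ra3.xlplus x3: ${ra3_cluster:,.2f} per month")
    # Hypothetical baseline, used only to show how the savings target is computed
    print(f"Savings vs a $6,000 baseline: {savings_percent(6000, ra3_cluster)}%")
```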
Cost Optimization Strategies
💡
Tip: Reduce Redshift costs by 50-70%:
- RA3 with managed storage: compute and storage are billed separately, so storage growth does not force extra nodes
- Concurrency Scaling: Auto-scale for burst workloads
- Reserved nodes: a 1-year commitment can save around 40%
- Spectrum: Query S3 directly instead of loading
- Serverless: pay for compute only while queries run, which suits variable workloads
Performance vs Cost Comparison
| Configuration | Monthly Cost | Query Performance | Cost per Query |
|---|---|---|---|
| dc2.large x2 | | | $0.011 |
| ra3.xlplus x3 | | | $0.004 |
| ra3.4xlarge x2 | | | $0.003 |
| Serverless | Variable | Best | $0.002 |
Interview Talking Points
ℹ️
Best Practice: Focus on these Redshift optimization concepts in interviews:
- Why do distribution keys matter?
  - Minimize data shuffling across nodes
  - Co-locate joined tables
  - Even data distribution prevents skew
- Why do sort keys improve performance?
  - Prune blocks during query execution
  - Compound keys for sequential access
  - Interleaved keys for multi-column queries
- Why does compression encoding save cost?
  - Often reduces storage by 60-80%
  - Improves I/O performance
  - Less data scanned = lower cost
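The block-pruning point can be made concrete with a toy model of zone maps, the per-block minimum and maximum values that let a scan skip blocks whose range cannot match a predicate. This is a simplified sketch with hypothetical function names and a made-up block size of 100 values; it models the idea only and is not how Redshift stores data internally.

```python
import random
from typing import List, Tuple


def build_zone_map(values: List[int], block_size: int) -> List[Tuple[int, int]]:
    """Split values into fixed-size blocks and record each block's (min, max)."""
    blocks = [values[i:i + block_size] for i in range(0, len(values), block_size)]
    return [(min(block), max(block)) for block in blocks]


def blocks_to_scan(zone_map: List[Tuple[int, int]], low: int, high: int) -> int:
    """Count blocks whose [min, max] range overlaps the predicate range [low, high]."""
    return sum(1 for block_min, block_max in zone_map
               if block_max >= low and block_min <= high)


# Same data stored in sort-key order and in arrival (shuffled) order
sorted_values = list(range(10_000))
unsorted_values = sorted_values[:]
random.Random(42).shuffle(unsorted_values)

# Predicate: value BETWEEN 2000 AND 2099
sorted_scan = blocks_to_scan(build_zone_map(sorted_values, 100), 2_000, 2_099)
unsorted_scan = blocks_to_scan(build_zone_map(unsorted_values, 100), 2_000, 2_099)

if __name__ == "__main__":
    print(f"sorted: {sorted_scan} block(s) scanned, unsorted: {unsorted_scan} block(s) scanned")
```

When the data is sorted on the filtered column, matching values sit in a narrow run of blocks, so most blocks are skipped; in arrival order nearly every block's range overlaps the predicate.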
Common Interview Questions
Q: "How do you handle data skew in Redshift?"
skew_solutions = {
    "Distribution Style": "Use DISTSTYLE ALL for small dimension tables and EVEN when no column is a good join key",
    "Key Selection": "Choose a high-cardinality, evenly distributed column as distkey",
    "Vacuum": "Reclaim space and re-sort rows after deletes",
    "Analyze": "Update statistics for query optimizer"
}
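The link between key cardinality and skew can be shown with a small simulation that hashes distribution-key values onto slices and compares the fullest slice with the emptiest. This is an illustrative sketch: the function names are hypothetical, MD5 stands in for Redshift's internal hash function, and the slice count of 8 is an arbitrary assumption.

```python
import hashlib
from collections import Counter
from typing import Iterable


def slice_row_counts(keys: Iterable[str], num_slices: int) -> Counter:
    """Assign each row to a slice by hashing its distribution-key value."""
    counts = Counter({slice_id: 0 for slice_id in range(num_slices)})
    for key in keys:
        digest = hashlib.md5(key.encode()).hexdigest()
        counts[int(digest, 16) % num_slices] += 1
    return counts


def skew_ratio(counts: Counter) -> float:
    """Rows on the fullest slice divided by rows on the emptiest; 1.0 is perfectly even."""
    smallest = min(counts.values())
    return float("inf") if smallest == 0 else max(counts.values()) / smallest


# High-cardinality key: one distinct value per row
high_cardinality = [f"order-{i}" for i in range(50_000)]
# Low-cardinality key: only three distinct values across all rows
low_cardinality = [f"status-{i % 3}" for i in range(50_000)]

if __name__ == "__main__":
    print("high-cardinality skew ratio:", round(skew_ratio(slice_row_counts(high_cardinality, 8)), 3))
    print("low-cardinality skew ratio:", skew_ratio(slice_row_counts(low_cardinality, 8)))
```

With only three distinct values, at most three of the eight slices receive rows, so the rest sit idle while the loaded slices do all the work.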
Q: "How do you optimize for concurrent queries?"
concurrency_optimization = {
    "WLM Queues": "Separate ETL, reporting, and ad-hoc queues",
    "Concurrency Scaling": "Auto-scale for burst workloads",
    "Materialized Views": "Pre-compute common aggregations",
    "Query Monitoring": "Terminate long-running queries"
}
Deployment Checklist
- Set up VPC and security groups
- Create Redshift cluster with encryption
- Configure IAM roles for S3 access
- Design optimal table schemas
- Set up WLM queues for workload management
- Load initial data with COPY command
- Run ANALYZE and VACUUM
- Create materialized views
- Configure monitoring and alerting
- Test query performance
- Document optimization recommendations
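For the initial load step in the checklist, the COPY statement can be assembled from a few parameters. This is a minimal sketch that assumes gzip-compressed CSV files with a header row listed in a manifest; the function name, bucket path, and role ARN are hypothetical placeholders.

```python
def build_copy_statement(table: str, s3_path: str, iam_role_arn: str,
                         manifest: bool = True) -> str:
    """Build a Redshift COPY statement for gzip-compressed CSV files in S3."""
    if not table.replace("_", "").isalnum():
        # Guard against injecting SQL through the table name
        raise ValueError(f"unexpected table name: {table!r}")
    parts = [
        f"COPY {table}",
        f"FROM '{s3_path}'",
        f"IAM_ROLE '{iam_role_arn}'",
        "FORMAT AS CSV",
        "IGNOREHEADER 1",
        "GZIP",
    ]
    if manifest:
        # With MANIFEST, s3_path points at a manifest file listing the data files
        parts.append("MANIFEST")
    return "\n".join(parts) + ";"


if __name__ == "__main__":
    # Placeholder bucket and role ARN for illustration only
    print(build_copy_statement(
        "fct_orders",
        "s3://example-data-lake/orders/load.manifest",
        "arn:aws:iam::123456789012:role/example-redshift-role",
    ))
```

Loading through a manifest makes the set of input files explicit, which helps avoid loading partial or duplicate files when a prefix contains more than the intended batch.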
⚠️
Warning: Always test schema changes in a dev cluster before production. Changing a distribution key (with ALTER TABLE ... ALTER DISTKEY or a table rebuild) redistributes the entire table and can be slow and disruptive on large tables.
This project demonstrates cloud data warehousing optimization skills and is highly relevant for data engineering interviews at companies with large-scale analytics requirements.