Performance Optimization: Making Data Systems Fast
Performance optimization systematically identifies and eliminates bottlenecks in data pipelines and queries to reduce latency, increase throughput, and minimize resource consumption.
Why Performance Optimization Matters
Cost Impact:
- Under time-based billing, a query that runs for 10 minutes costs roughly 10x as much as one that runs for 1 minute on the same compute
- The difference compounds when the same query runs many times a day
User Impact:
- Slow queries waste user time
- Poor performance leads to frustration
Key Insight: At scale, performance optimization directly impacts cost and user satisfaction.
Architecture Overview
The Performance Optimization Framework has five layers:
- Layer 1, Query Optimization: partition pruning (10-100x speedup), indexing (B-tree, bitmap, Bloom filter), materialized views
- Layer 2, Pipeline Optimization: parallelism (Airflow TaskGroups), incremental processing, batching with bulk operations
- Layer 3, Infrastructure: right-sizing to match the workload, auto-scaling with elastic compute, spot instances (70% savings)
- Layer 4, Storage: columnar formats, compression (Snappy, Zstd), target file size of 128-256 MB
- Layer 5, Monitoring: query profiles, metrics, alerting on SLA violations
Query Optimization
Query optimization rewrites and executes SQL queries to minimize resource consumption while maintaining correctness. It involves logical optimization (predicate pushdown, projection pruning) and physical optimization (join ordering, access paths).
Query Cost Model
- Data Scanned: D_scan = Σ (table_size × filter_selectivity)
- JOIN Cost: C_join = D_left × D_right / max(D_left, D_right) (hash join)
- Sort Cost: C_sort = N × log(N) × row_size
- Aggregation Cost: C_agg = N × (group_cardinality × agg_cost)
- Total Query Cost: C_total = D_scan + C_join + C_sort + C_agg
- Optimization Leverage: Reducing D_scan by 90% (for example, through partitioning) gives up to a 10x speedup when scanning dominates C_total
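The scan and join terms above can be checked with a few lines of arithmetic. The following is a minimal sketch, not a real optimizer cost model: the table sizes and selectivities are made-up illustration values, and the function names are hypothetical.

```python
def scan_cost(tables):
    """D_scan: sum of table_size * filter_selectivity over the scanned tables."""
    return sum(size_gb * selectivity for size_gb, selectivity in tables)


def join_cost(d_left, d_right):
    """C_join as written in the list above; algebraically this is min(d_left, d_right)."""
    return d_left * d_right / max(d_left, d_right)


# Hypothetical 1 TB fact table: full scan vs. pruning down to 10% of the data.
full_scan_gb = scan_cost([(1000.0, 1.0)])
pruned_scan_gb = scan_cost([(1000.0, 0.1)])
scan_reduction = full_scan_gb / pruned_scan_gb

# Hypothetical join inputs after filtering, in GB.
example_join = join_cost(100.0, 20.0)

print(f"full scan: {full_scan_gb:.0f} GB, pruned scan: {pruned_scan_gb:.0f} GB")
print(f"scan reduction: {scan_reduction:.1f}x, join cost term: {example_join:.1f}")
```

The 10x figure applies to the scan term only; the overall query speeds up by that much only if the join, sort, and aggregation terms are small by comparison.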
-- BAD: Full table scan
SELECT *
FROM fact_orders
WHERE customer_id = 12345;
-- GOOD: Partition pruning + selective columns
SELECT order_id, order_date, total_amount
FROM fact_orders
WHERE order_date >= '2025-01-01'
AND order_date < '2025-02-01'
AND customer_id = 12345;
-- Analyze query performance
EXPLAIN ANALYZE
SELECT
c.customer_name,
SUM(f.net_amount) AS total_spent
FROM fact_orders f
JOIN dim_customer c ON f.customer_key = c.customer_key
WHERE f.order_date BETWEEN '2025-01-01' AND '2025-12-31'
GROUP BY c.customer_name
ORDER BY total_spent DESC
LIMIT 10;
-- Snowflake: Query profile
SELECT *
FROM TABLE(GET_QUERY_OPERATOR_STATS('query_id_here'));
-- Redshift: Analyze query plan
EXPLAIN
SELECT * FROM fact_orders
WHERE order_date = '2025-01-15' AND customer_id = 12345;
-- Materialized View for repeated aggregations
CREATE MATERIALIZED VIEW mv_daily_revenue AS
SELECT
order_date,
SUM(net_amount) AS daily_revenue,
COUNT(*) AS order_count
FROM fact_orders
GROUP BY order_date;
-- Refresh materialized view (PostgreSQL/Redshift syntax; Snowflake maintains materialized views automatically)
REFRESH MATERIALIZED VIEW mv_daily_revenue;
-- Partition-specific query
SELECT * FROM fact_orders
WHERE order_date = '2025-01-15' -- Prunes to single partition
AND customer_id = 12345; -- Uses index within partition
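To see how an access path changes a plan without a warehouse at hand, the sketch below uses Python's built-in sqlite3 module. SQLite has no partitions, so an index on order_date stands in for pruning here; the table contents and the index name idx_orders_date are assumptions made for illustration, and the exact plan text varies by SQLite version.

```python
import sqlite3


def plan_details(conn, sql, params=()):
    """Return the 'detail' column of SQLite's EXPLAIN QUERY PLAN output."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return [row[3] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fact_orders ("
    "order_id INTEGER PRIMARY KEY, order_date TEXT, "
    "customer_id INTEGER, total_amount REAL)"
)
# One order per customer per month: 12 months x 100 customers.
conn.executemany(
    "INSERT INTO fact_orders (order_date, customer_id, total_amount) "
    "VALUES (?, ?, ?)",
    [(f"2025-{month:02d}-15", cust, 10.0)
     for month in range(1, 13) for cust in range(100)],
)

query = (
    "SELECT order_id, order_date, total_amount FROM fact_orders "
    "WHERE order_date >= ? AND order_date < ? AND customer_id = ?"
)
params = ("2025-01-01", "2025-02-01", 42)

plan_before = plan_details(conn, query, params)  # no index: whole table is scanned
conn.execute("CREATE INDEX idx_orders_date ON fact_orders (order_date)")
plan_after = plan_details(conn, query, params)   # index restricts the date range

rows = conn.execute(query, params).fetchall()
print("before:", plan_before)
print("after: ", plan_after)
print("matching rows:", len(rows))
```

The same habit carries over to warehouses: read the plan before and after a change rather than assuming the filter is being used.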
Pipeline Optimization
Pipeline parallelism executes independent pipeline stages simultaneously to reduce end-to-end latency. The critical path (longest sequential dependency chain) determines minimum pipeline duration.
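As a rough illustration of the critical path, the sketch below computes the earliest finish time of each stage in a small dependency graph. The stage names and durations are hypothetical; a real scheduler also has to account for worker limits and queueing.

```python
from functools import lru_cache

# Hypothetical stage durations (minutes) and upstream dependencies.
durations = {"extract_a": 10, "extract_b": 25, "transform": 15, "load": 5}
depends_on = {
    "extract_a": [],
    "extract_b": [],
    "transform": ["extract_a", "extract_b"],
    "load": ["transform"],
}


@lru_cache(maxsize=None)
def earliest_finish(stage: str) -> int:
    """Finish time of a stage if every stage starts as soon as its inputs are ready."""
    start = max((earliest_finish(dep) for dep in depends_on[stage]), default=0)
    return start + durations[stage]


# The critical path length is the latest finish time across all stages.
critical_path_minutes = max(earliest_finish(stage) for stage in durations)
serial_minutes = sum(durations.values())

print(f"serial: {serial_minutes} min, critical path: {critical_path_minutes} min")
```

Here the two extracts overlap, so the pipeline cannot finish faster than extract_b, transform, and load run back to back, no matter how many workers are added.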
Pipeline Parallelism
- Serial Execution: T_serial = Σ T_stage_i
- Parallel Execution: T_parallel = max(T_stage_i) over the stages that run concurrently + T_sequential, the total time of the stages that must still run one after another
- Speedup: S = T_serial / T_parallel
- Efficiency: E = S / N_processors
- Amdahl's Law: Speedup = 1 / ((1-P) + P/N) where P = parallelizable fraction
- Bottleneck: Stage with max(T_stage_i) limits overall throughput
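The speedup and efficiency formulas above are easy to tabulate. This is a small sketch of Amdahl's Law with an assumed parallelizable fraction of 0.9; the function names are illustrative only.

```python
def amdahl_speedup(parallel_fraction: float, n_workers: int) -> float:
    """Speedup = 1 / ((1 - P) + P / N)."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    return 1.0 / ((1.0 - parallel_fraction) + parallel_fraction / n_workers)


def efficiency(speedup: float, n_workers: int) -> float:
    """E = S / N: how much of each added worker turns into speedup."""
    return speedup / n_workers


# Assumed: 90% of the pipeline can run in parallel.
P = 0.9
for n in (2, 4, 8, 16, 64):
    s = amdahl_speedup(P, n)
    print(f"N={n:>2}  speedup={s:5.2f}x  efficiency={efficiency(s, n):.2f}")
```

With P = 0.9 the speedup can never exceed 1 / (1 - P) = 10x, which is why shortening the sequential portion often pays off more than adding workers.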
# Parallel Pipeline Execution with Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def extract_data(source_name: str, **context):
    """Extract data from a source."""
    print(f"Extracting data from {source_name}")
    # Simulate extraction; the return value is pushed to XCom
    return {"source": source_name, "rows": 10000}

def transform_data(upstream_task_id: str, transform_name: str, **context):
    """Apply a transformation to the output of one extract task."""
    # Pull the upstream task's return value from XCom
    data = context['ti'].xcom_pull(task_ids=upstream_task_id)
    print(f"Applying {transform_name} to {data['source']}")
    return {**data, "transformed": True}

def load_data(upstream_task_ids: list, target: str, **context):
    """Load transformed data into target."""
    for data in context['ti'].xcom_pull(task_ids=upstream_task_ids):
        print(f"Loading {data['source']} into {target}")

# Tasks inside a TaskGroup get the group id as a prefix
TRANSFORM_TASK_IDS = [
    'transform.transform_orders',
    'transform.transform_payments',
    'transform.transform_events',
]

with DAG(
    dag_id='parallel_pipeline',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # deprecated since Airflow 2.4 in favor of `schedule`
    start_date=datetime(2025, 1, 1),
) as dag:
    # Independent extractions (run in parallel)
    with TaskGroup("extract") as extract_group:
        extract_shopify = PythonOperator(
            task_id='extract_shopify',
            python_callable=extract_data,
            op_kwargs={'source_name': 'shopify'},
        )
        extract_stripe = PythonOperator(
            task_id='extract_stripe',
            python_callable=extract_data,
            op_kwargs={'source_name': 'stripe'},
        )
        extract_segment = PythonOperator(
            task_id='extract_segment',
            python_callable=extract_data,
            op_kwargs={'source_name': 'segment'},
        )

    # Transformations (independent of each other, so they run in parallel)
    with TaskGroup("transform") as transform_group:
        transform_orders = PythonOperator(
            task_id='transform_orders',
            python_callable=transform_data,
            op_kwargs={'upstream_task_id': 'extract.extract_shopify',
                       'transform_name': 'orders'},
        )
        transform_payments = PythonOperator(
            task_id='transform_payments',
            python_callable=transform_data,
            op_kwargs={'upstream_task_id': 'extract.extract_stripe',
                       'transform_name': 'payments'},
        )
        transform_events = PythonOperator(
            task_id='transform_events',
            python_callable=transform_data,
            op_kwargs={'upstream_task_id': 'extract.extract_segment',
                       'transform_name': 'events'},
        )

    # Load (parallel)
    with TaskGroup("load") as load_group:
        load_dim = PythonOperator(
            task_id='load_dimensions',
            python_callable=load_data,
            op_kwargs={'upstream_task_ids': TRANSFORM_TASK_IDS,
                       'target': 'dim_tables'},
        )
        load_fact = PythonOperator(
            task_id='load_facts',
            python_callable=load_data,
            op_kwargs={'upstream_task_ids': TRANSFORM_TASK_IDS,
                       'target': 'fact_tables'},
        )

    # Dependencies: a list cannot be chained to another list with >>,
    # so chain the TaskGroups instead
    extract_group >> transform_group >> load_group
Caching Strategies
Caching stores frequently accessed data in fast storage (memory, SSD) to avoid repeated expensive computations or I/O operations. Cache hit ratio determines effectiveness.
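The effect of the hit ratio can be put into numbers with a weighted average of hit and miss latency. The figures below (5 ms on a hit, 1000 ms on a miss) are assumptions chosen for illustration, not measurements.

```python
def expected_latency_ms(hit_ratio: float, hit_ms: float, miss_ms: float) -> float:
    """Average latency when a fraction hit_ratio of requests is served from cache."""
    if not 0.0 <= hit_ratio <= 1.0:
        raise ValueError("hit_ratio must be between 0 and 1")
    return hit_ratio * hit_ms + (1.0 - hit_ratio) * miss_ms


# Assumed latencies: 5 ms for a cache hit, 1000 ms for a miss that runs the query.
for ratio in (0.0, 0.6, 0.9, 0.99):
    avg = expected_latency_ms(ratio, hit_ms=5.0, miss_ms=1000.0)
    print(f"hit ratio {ratio:.2f} -> average latency {avg:.1f} ms")
```

Because misses dominate the average, moving the hit ratio from 0.9 to 0.99 helps far more than shaving the hit latency itself.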
| Cache Type | Location | Latency | Use Case | Invalidation |
|---|---|---|---|---|
| Result Cache | Warehouse | < 1ms | Repeat queries | TTL-based |
| Materialized View | Database | 1-10ms | Dashboard queries | Scheduled refresh |
| BI Engine | In-memory | < 100ms | Interactive analytics | Auto-refresh |
| Application Cache | Redis/Memcached | 1-5ms | API responses | Event-driven |
| CDN Cache | Edge | 10-50ms | Static content | TTL-based |
| Query Cache | Warehouse | < 1ms | Identical queries | 24-hour TTL |
-- Snowflake: Result caching (automatic)
-- Identical queries within 24 hours return cached results
SELECT * FROM fact_orders WHERE order_date = '2025-01-15';
-- Materialized View for caching aggregations
CREATE MATERIALIZED VIEW mv_customer_metrics AS
SELECT
customer_key,
COUNT(*) AS order_count,
SUM(net_amount) AS lifetime_value,
AVG(net_amount) AS avg_order_value
FROM fact_orders
GROUP BY customer_key;
-- BigQuery: BI Engine for in-memory caching
-- Reserve BI Engine capacity via Console
-- Redshift: Result caching
-- Automatic for identical queries
# Application-level caching with Redis
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_query(query_hash: str):
    """Get cached query result, or None on a cache miss."""
    cached = redis_client.get(f"query:{query_hash}")
    if cached is not None:
        return json.loads(cached)
    return None

def cache_query_result(query_hash: str, result: list, ttl: int = 3600):
    """Cache query result with a TTL in seconds."""
    redis_client.setex(
        f"query:{query_hash}",
        ttl,
        json.dumps(result)
    )
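The two Redis helpers are usually combined into a cache-aside lookup: check the cache, run the query on a miss, then store the result. The sketch below shows that flow with a small in-memory class standing in for the Redis client so it runs without a server; InMemoryTTLCache, query_hash, fetch_with_cache, and fake_run_query are hypothetical names introduced for illustration.

```python
import hashlib
import json
import time


class InMemoryTTLCache:
    """Stand-in for a Redis client (get/setex only) so the sketch runs locally."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # expired entries count as misses
            return None
        return value

    def setex(self, key, ttl_seconds, value):
        self._store[key] = (value, time.monotonic() + ttl_seconds)


def query_hash(sql: str) -> str:
    """Stable cache key derived from the query text."""
    return hashlib.sha256(sql.strip().encode("utf-8")).hexdigest()


def fetch_with_cache(cache, sql, run_query, ttl=3600):
    """Cache-aside lookup: returns (result, was_cache_hit)."""
    key = f"query:{query_hash(sql)}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached), True
    result = run_query(sql)
    cache.setex(key, ttl, json.dumps(result))
    return result, False


calls = []


def fake_run_query(sql):
    """Pretend warehouse call; records how often it is actually invoked."""
    calls.append(sql)
    return [{"order_date": "2025-01-15", "daily_revenue": 1250.0}]


cache = InMemoryTTLCache()
sql = "SELECT * FROM mv_daily_revenue WHERE order_date = '2025-01-15'"
first, first_hit = fetch_with_cache(cache, sql, fake_run_query)
second, second_hit = fetch_with_cache(cache, sql, fake_run_query)
print(first_hit, second_hit, len(calls))
```

Hashing the raw query text means two queries that differ only in formatting get separate cache entries; that is a deliberate simplification here.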
Performance Profiling Tools
Performance profiling systematically measures and analyzes system behavior to identify bottlenecks. It involves query plan analysis, resource utilization monitoring, and trace analysis to pinpoint optimization opportunities.
| Tool | Purpose | Platform | Key Feature |
|---|---|---|---|
| EXPLAIN ANALYZE | Query plan analysis | SQL databases | Execution plan visualization |
| Snowflake Query Profile | Warehouse performance | Snowflake | Operator-level metrics |
| BigQuery Execution Details | Slot utilization | BigQuery | Slot usage over time |
| Spark UI | Distributed job analysis | Spark | DAG visualization, task metrics |
| Airflow DAG Audit | Pipeline timing | Airflow | Task duration, success rate |
| Datadog | Infrastructure monitoring | Multi-cloud | Metrics, traces, logs |
| dbt Artifacts | Model build times | dbt | Model-level performance |
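-- Snowflake: Query profile analysis
-- Note: ACCOUNT_USAGE views can lag behind real time, so very recent queries may be missing
SELECT
query_id,
query_text,
execution_status,
ROUND(bytes_scanned / 1024 / 1024 / 1024, 2) AS gb_scanned,
partitions_scanned,
partitions_total,
ROUND(partitions_scanned * 100.0 / NULLIF(partitions_total, 0), 2) AS scan_pct,
compilation_time AS compilation_time_ms,
execution_time AS execution_time_ms,
total_elapsed_time AS total_elapsed_time_ms,
ROUND(credits_used_cloud_services, 4) AS cloud_services_credits,
percentage_scanned_from_cache -- warehouse cache usage, not the result cache
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
ORDER BY execution_time DESC
LIMIT 10;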
-- BigQuery: Slot utilization analysis
SELECT
job_id,
query,
TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) AS duration_ms,
SAFE_DIVIDE(total_slot_ms, TIMESTAMP_DIFF(end_time, start_time, MILLISECOND)) AS avg_slots,
total_bytes_processed / 1024 / 1024 / 1024 AS data_gb,
statement_type
FROM `region-us`.INFORMATION_SCHEMA.JOBS
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY total_slot_ms DESC
LIMIT 10;
-- Spark: Inspect the plan for shuffles (Exchange operators); spill metrics appear in the Spark UI
EXPLAIN EXTENDED
SELECT
customer_id,
SUM(amount) AS total_spent
FROM fact_orders
GROUP BY customer_id
ORDER BY total_spent DESC;
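Outside the warehouse, the same "measure first" approach applies to pipeline code. The sketch below times named stages with a context manager and reports the slowest one; the stage names and the sleep calls are placeholders for real work.

```python
import time
from contextlib import contextmanager

timings = {}


@contextmanager
def timed(stage: str):
    """Accumulate wall-clock seconds spent inside the with-block under `stage`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[stage] = timings.get(stage, 0.0) + time.perf_counter() - start


# Placeholder stages; time.sleep stands in for real extract/transform/load work.
with timed("extract"):
    time.sleep(0.005)
with timed("transform"):
    time.sleep(0.05)
with timed("load"):
    time.sleep(0.005)

bottleneck = max(timings, key=timings.get)
for stage, seconds in sorted(timings.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{stage:<10} {seconds * 1000:7.1f} ms")
print("bottleneck:", bottleneck)
```

Wall-clock timing like this only says where time goes, not why; the query plans and engine-level tools in the table above are the next step once the slow stage is known.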
Key Concepts Summary
| Optimization | Layer | Impact | Complexity | Cost |
|---|---|---|---|---|
| Partition Pruning | Query | 10-100x | Low | Minimal |
| Indexing | Query | 10-1000x | Medium | +10-30% storage |
| Materialized Views | Query | 2-10x | Low | +storage |
| Parallel Execution | Pipeline | 2-10x | Medium | +compute |
| Incremental Processing | Pipeline | 10-100x | Medium | Minimal |
| Result Caching | Query | 2-100x | Low | Minimal |
| Columnar Storage | Storage | 2-10x | Low | -50% storage |
| Compression | Storage | 2-5x | Low | +CPU |
| Right-sizing | Infrastructure | 2-5x | Low | -30-60% cost |
| Auto-scaling | Infrastructure | Variable | Medium | Variable |
Performance Metrics
| Metric | Unoptimized | Optimized | Improvement |
|---|---|---|---|
| Query Latency (1TB) | 300 sec | 30 sec | 10x |
| Pipeline Duration | 4 hours | 45 min | 5x |
| Cost per Query | (not given) | 0.63 | 90% |
| Storage Used | 10 TB | 3 TB | 70% |
| Concurrent Users | 10 | 100 | 10x |
| Cache Hit Rate | 0% | 60% | +60% |
| Data Freshness | 24 hours | 1 hour | 24x |
10 Best Practices
- Profile queries before optimizing: identify the actual bottleneck before fixing
- Use EXPLAIN plans: understand how the query engine executes your queries
- Partition by date: the single most impactful optimization for time-series data
- Materialize repeated aggregations: dashboards should query materialized views
- Implement incremental processing: process only new/changed data, not full rebuilds
- Cache expensive computations: use result caching, materialized views, or Redis
- Parallelize independent stages: use Airflow TaskGroups for parallel execution
- Compress data aggressively: columnar formats (Parquet) with Snappy compression
- Right-size compute resources: monitor utilization and adjust weekly
- Set performance budgets: define latency targets and alert on violations
Key Takeaways
- Profile first, optimize second: identify the actual bottleneck before applying fixes
- Partition pruning is the single most impactful query optimization (10-100x speedup)
- Incremental processing reduces pipeline duration from hours to minutes
- Caching (materialized views, result cache, BI Engine) provides instant query acceleration
- Parallelism and auto-scaling enable elastic performance for variable workloads
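Incremental processing usually comes down to tracking a high-water mark and reading only rows changed since then. The following is a minimal sketch under that assumption; the updated_at field, the sample rows, and the function name are hypothetical.

```python
from datetime import datetime


def incremental_batch(rows, watermark):
    """Return rows changed after `watermark` plus the watermark for the next run."""
    new_rows = [row for row in rows if row["updated_at"] > watermark]
    new_watermark = max((row["updated_at"] for row in new_rows), default=watermark)
    return new_rows, new_watermark


# Hypothetical source table with a last-modified timestamp per row.
source_rows = [
    {"order_id": 1, "updated_at": datetime(2025, 1, 14, 9, 0)},
    {"order_id": 2, "updated_at": datetime(2025, 1, 15, 8, 30)},
    {"order_id": 3, "updated_at": datetime(2025, 1, 15, 17, 45)},
]

last_run = datetime(2025, 1, 15, 0, 0)
batch, next_watermark = incremental_batch(source_rows, last_run)
print([row["order_id"] for row in batch], next_watermark)

# Running again with the new watermark finds nothing left to process.
empty_batch, unchanged_watermark = incremental_batch(source_rows, next_watermark)
print(empty_batch, unchanged_watermark)
```

A strict greater-than comparison can miss rows that share the watermark timestamp but arrive later, so production pipelines often reprocess a small overlap window and deduplicate.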
See Also
- Partitioning & Indexing: Range, hash, and bloom filter strategies
- Cost Optimization: Right-sizing and auto-suspend for cost savings
- Snowflake Fundamentals: Micro-partition pruning and clustering keys
- BigQuery Fundamentals: BI Engine and materialized views
- dbt Advanced: Model build optimization and ephemeral materialization
- Interview Prep: Performance optimization interview questions