πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Performance Optimization in Python

Python InterviewOptimization⭐ Premium

Advertisement

Performance Optimization in Python

Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe

Profiling and Benchmarking

import cProfile
import time
import functools
from typing import Callable, Any
from collections import defaultdict
import tracemalloc

# Simple timing decorator
def timing_decorator(func: Callable) -> Callable:
    """Measure execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__}: {end - start:.6f}s")
        return result
    return wrapper

# Memory profiling decorator
def memory_profile(func: Callable) -> Callable:
    """Measure memory usage."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        
        result = func(*args, **kwargs)
        
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        
        print(f"{func.__name__} memory: {current / 1024 / 1024:.2f} MB (peak: {peak / 1024 / 1024:.2f} MB)")
        return result
    return wrapper

# Advanced profiler
class PerformanceProfiler:
    """Comprehensive performance profiler."""
    
    def __init__(self):
        self.metrics = defaultdict(list)
    
    def profile(self, func: Callable) -> Callable:
        """Profile function execution."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Time profiling
            start_time = time.perf_counter()
            
            # Memory profiling
            tracemalloc.start()
            
            result = func(*args, **kwargs)
            
            # Collect metrics
            end_time = time.perf_counter()
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            
            self.metrics[func.__name__].append({
                'time': end_time - start_time,
                'memory_current': current,
                'memory_peak': peak,
                'args': str(args)[:100],
                'kwargs': str(kwargs)[:100]
            })
            
            return result
        return wrapper
    
    def get_stats(self, func_name: str = None) -> dict:
        """Get profiling statistics."""
        if func_name:
            return self._calculate_stats(self.metrics[func_name])
        
        return {name: self._calculate_stats(metrics) 
                for name, metrics in self.metrics.items()}
    
    def _calculate_stats(self, metrics: list) -> dict:
        """Calculate statistics for a function."""
        times = [m['time'] for m in metrics]
        memories = [m['memory_current'] for m in metrics]
        
        return {
            'calls': len(metrics),
            'total_time': sum(times),
            'avg_time': sum(times) / len(times) if times else 0,
            'min_time': min(times) if times else 0,
            'max_time': max(times) if times else 0,
            'avg_memory': sum(memories) / len(memories) if memories else 0,
            'peak_memory': max(m['memory_peak'] for m in metrics) if metrics else 0
        }
    
    def print_report(self):
        """Print profiling report."""
        print("\n=== Performance Report ===")
        for name, stats in self.get_stats().items():
            print(f"\n{name}:")
            print(f"  Calls: {stats['calls']}")
            print(f"  Time: {stats['total_time']:.6f}s (avg: {stats['avg_time']:.6f}s)")
            print(f"  Memory: {stats['avg_memory'] / 1024 / 1024:.2f} MB (peak: {stats['peak_memory'] / 1024 / 1024:.2f} MB)")

# Usage
profiler = PerformanceProfiler()

@profiler.profile
def fibonacci_recursive(n: int) -> int:
    """Recursive fibonacci - slow for large n."""
    if n <= 1:
        return n
    return fibonacci_recursive(n - 1) + fibonacci_recursive(n - 2)

@profiler.profile
def fibonacci_memoized(n: int, memo: dict = None) -> int:
    """Memoized fibonacci - much faster."""
    if memo is None:
        memo = {}
    
    if n in memo:
        return memo[n]
    
    if n <= 1:
        return n
    
    memo[n] = fibonacci_memoized(n - 1, memo) + fibonacci_memoized(n - 2, memo)
    return memo[n]

# Test
fibonacci_recursive(30)
fibonacci_memoized(30)
profiler.print_report()

ℹ️

Always profile before optimizing. Focus on the hot paths that consume most time or memory.

Algorithmic Optimization

from typing import List, Dict
from collections import defaultdict, Counter
import bisect

# Bad: O(n^2) solution
def two_sum_slow(nums: List[int], target: int) -> List[int]:
    """Brute force two sum - O(n^2)."""
    for i in range(len(nums)):
        for j in range(i + 1, len(nums)):
            if nums[i] + nums[j] == target:
                return [i, j]
    return []

# Good: O(n) solution
def two_sum_fast(nums: List[int], target: int) -> List[int]:
    """Hash map two sum - O(n)."""
    num_map = {}
    for i, num in enumerate(nums):
        complement = target - num
        if complement in num_map:
            return [num_map[complement], i]
        num_map[num] = i
    return []

# Bad: O(n^2) for sorted array search
def search_pair_slow(sorted_nums: List[int], target: int) -> List[int]:
    """Brute force pair search in sorted array."""
    for i in range(len(sorted_nums)):
        for j in range(i + 1, len(sorted_nums)):
            if sorted_nums[i] + sorted_nums[j] == target:
                return [i, j]
    return []

# Good: O(n) with two pointers
def search_pair_fast(sorted_nums: List[int], target: int) -> List[int]:
    """Two pointer pair search in sorted array."""
    left, right = 0, len(sorted_nums) - 1
    
    while left < right:
        current_sum = sorted_nums[left] + sorted_nums[right]
        if current_sum == target:
            return [left, right]
        elif current_sum < target:
            left += 1
        else:
            right -= 1
    
    return []

# Subarray optimization
def max_subarray_brute(nums: List[int]) -> int:
    """Brute force maximum subarray - O(n^2)."""
    max_sum = float('-inf')
    
    for i in range(len(nums)):
        current_sum = 0
        for j in range(i, len(nums)):
            current_sum += nums[j]
            max_sum = max(max_sum, current_sum)
    
    return max_sum

def max_subarray_kadane(nums: List[int]) -> int:
    """Kadane's algorithm - O(n)."""
    max_sum = current_sum = nums[0]
    
    for num in nums[1:]:
        current_sum = max(num, current_sum + num)
        max_sum = max(max_sum, current_sum)
    
    return max_sum

# String concatenation optimization
def build_string_slow(parts: List[str]) -> str:
    """Slow string building - O(n^2) due to immutability."""
    result = ""
    for part in parts:
        result += part  # Creates new string each time
    return result

def build_string_fast(parts: List[str]) -> str:
    """Fast string building - O(n) with join."""
    return "".join(parts)  # Single allocation

# List operations optimization
def find_duplicates_slow(nums: List[int]) -> List[int]:
    """Find duplicates - O(n^2)."""
    duplicates = []
    for i in range(len(nums)):
        for j in range(i + 1, len(nums)):
            if nums[i] == nums[j] and nums[i] not in duplicates:
                duplicates.append(nums[i])
    return duplicates

def find_duplicates_fast(nums: List[int]) -> List[int]:
    """Find duplicates - O(n) with hash set."""
    seen = set()
    duplicates = set()
    
    for num in nums:
        if num in seen:
            duplicates.add(num)
        else:
            seen.add(num)
    
    return list(duplicates)

Caching Strategies

from functools import lru_cache, wraps
from typing import Callable, Any, Optional
import time
from collections import OrderedDict
import hashlib
import json

# Simple LRU Cache
class LRUCache:
    """LRU Cache with TTL support."""
    
    def __init__(self, maxsize: int = 128, ttl: float = 300):
        self.maxsize = maxsize
        self.ttl = ttl
        self.cache = OrderedDict()
        self.timestamps = {}
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache."""
        if key in self.cache:
            # Check TTL
            if time.time() - self.timestamps[key] < self.ttl:
                # Move to end (most recently used)
                self.cache.move_to_end(key)
                return self.cache[key]
            else:
                # Expired
                del self.cache[key]
                del self.timestamps[key]
        
        return None
    
    def set(self, key: str, value: Any):
        """Set value in cache."""
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.maxsize:
            # Remove oldest
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
            del self.timestamps[oldest_key]
        
        self.cache[key] = value
        self.timestamps[key] = time.time()
    
    def invalidate(self, key: str):
        """Remove key from cache."""
        if key in self.cache:
            del self.cache[key]
            del self.timestamps[key]
    
    def clear(self):
        """Clear all cache."""
        self.cache.clear()
        self.timestamps.clear()

# Function-level caching
def cache_with_ttl(ttl: float = 300):
    """Decorator for caching function results with TTL."""
    cache = {}
    
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key
            key = hashlib.md5(
                json.dumps((args, kwargs), sort_keys=True, default=str).encode()
            ).hexdigest()
            
            # Check cache
            if key in cache:
                cached_time, cached_result = cache[key]
                if time.time() - cached_time < ttl:
                    return cached_result
            
            # Execute and cache
            result = func(*args, **kwargs)
            cache[key] = (time.time(), result)
            
            return result
        
        wrapper.cache = cache
        wrapper.cache_clear = lambda: cache.clear()
        
        return wrapper
    return decorator

# Usage
@cache_with_ttl(ttl=60)
def expensive_computation(n: int) -> int:
    """Expensive computation with caching."""
    time.sleep(0.1)  # Simulate work
    return n ** 2

# Built-in LRU cache
@lru_cache(maxsize=128)
def fibonacci_cached(n: int) -> int:
    """Fibonacci with built-in LRU cache."""
    if n <= 1:
        return n
    return fibonacci_cached(n - 1) + fibonacci_cached(n - 2)

# Cache statistics
print(fibonacci_cached.cache_info())

⚠️

Cache invalidation is one of the hardest problems in computer science. Always consider when cached data becomes stale.

Memory Optimization

import sys
from typing import List, Any
import array

class MemoryOptimizer:
    """Memory optimization techniques."""
    
    @staticmethod
    def compare_data_structures():
        """Compare memory usage of different structures."""
        n = 100000
        
        # List vs array vs generator
        list_data = [i for i in range(n)]
        array_data = array.array('i', range(n))
        generator_data = (i for i in range(n))
        
        print(f"List: {sys.getsizeof(list_data)} bytes")
        print(f"Array: {sys.getsizeof(array_data)} bytes")
        print(f"Generator: {sys.getsizeof(generator_data)} bytes")
        
        # Dict vs slots
        class RegularClass:
            def __init__(self, x, y, z):
                self.x = x
                self.y = y
                self.z = z
        
        class SlotClass:
            __slots__ = ['x', 'y', 'z']
            
            def __init__(self, x, y, z):
                self.x = x
                self.y = y
                self.z = z
        
        regular = RegularClass(1, 2, 3)
        slotted = SlotClass(1, 2, 3)
        
        print(f"Regular class: {sys.getsizeof(regular) + sys.getsizeof(regular.__dict__)} bytes")
        print(f"Slotted class: {sys.getsizeof(slotted)} bytes")

class OptimizedList:
    """Memory-efficient list using array."""
    
    def __init__(self, item_type: str = 'i'):
        self.data = array.array(item_type)
    
    def append(self, item):
        self.data.append(item)
    
    def __getitem__(self, index):
        return self.data[index]
    
    def __len__(self):
        return len(self.data)
    
    def __iter__(self):
        return iter(self.data)

class ObjectPool:
    """Object pool for reducing allocation overhead."""
    
    def __init__(self, factory, max_size: int = 100):
        self.factory = factory
        self.max_size = max_size
        self.pool = []
    
    def acquire(self):
        """Get object from pool."""
        if self.pool:
            return self.pool.pop()
        return self.factory()
    
    def release(self, obj):
        """Return object to pool."""
        if len(self.pool) < self.max_size:
            self.pool.append(obj)
    
    def __enter__(self):
        return self.acquire()
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release(self)
        return False

# Memory-efficient data processing
def process_large_file_memory_efficient(filename: str):
    """Process large file without loading entire content."""
    with open(filename, 'r') as f:
        for line in f:  # Generator - one line at a time
            yield line.strip()

def chunked_processing(data, chunk_size: int = 1000):
    """Process data in chunks."""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

# Usage
MemoryOptimizer.compare_data_structures()

# Object pool example
pool = ObjectPool(lambda: {"data": [], "metadata": {}}, max_size=10)

with pool as obj:
    obj["data"].append(1)
    print(obj)

Concurrency Optimization

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import List
import time

# IO-bound: Use threading or asyncio
async def fetch_data_async(urls: List[str]) -> List[dict]:
    """Async fetching for IO-bound tasks."""
    async def fetch_one(url):
        await asyncio.sleep(0.1)  # Simulate network delay
        return {"url": url, "data": "response"}
    
    tasks = [fetch_one(url) for url in urls]
    return await asyncio.gather(*tasks)

def fetch_data_threaded(urls: List[str]) -> List[dict]:
    """Thread-based fetching for IO-bound tasks."""
    def fetch_one(url):
        time.sleep(0.1)  # Simulate network delay
        return {"url": url, "data": "response"}
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(fetch_one, urls))

# CPU-bound: Use multiprocessing
def cpu_intensive_task(n: int) -> int:
    """CPU-intensive computation."""
    return sum(i * i for i in range(n))

def parallel_cpu_tasks(numbers: List[int]) -> List[int]:
    """Parallel processing for CPU-bound tasks."""
    with ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(cpu_intensive_task, numbers))

# Hybrid approach
class AdaptiveExecutor:
    """Automatically choose between threading and processing."""
    
    def __init__(self, io_workers: int = 10, cpu_workers: int = 4):
        self.io_executor = ThreadPoolExecutor(max_workers=io_workers)
        self.cpu_executor = ProcessPoolExecutor(max_workers=cpu_workers)
    
    def execute_io(self, func, *args, **kwargs):
        """Execute IO-bound task."""
        return self.io_executor.submit(func, *args, **kwargs)
    
    def execute_cpu(self, func, *args, **kwargs):
        """Execute CPU-bound task."""
        return self.cpu_executor.submit(func, *args, **kwargs)
    
    def shutdown(self):
        """Shutdown both executors."""
        self.io_executor.shutdown()
        self.cpu_executor.shutdown()

# Benchmarking
def benchmark(func, *args, iterations: int = 100, **kwargs):
    """Benchmark function execution."""
    times = []
    
    for _ in range(iterations):
        start = time.perf_counter()
        func(*args, **kwargs)
        end = time.perf_counter()
        times.append(end - start)
    
    return {
        'avg': sum(times) / len(times),
        'min': min(times),
        'max': max(times),
        'total': sum(times)
    }

# Example benchmarks
def sequential_sum(n: int) -> int:
    return sum(range(n))

def parallel_sum(n: int) -> int:
    chunk_size = n // 4
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(sum, range(i, i + chunk_size)) 
                  for i in range(0, n, chunk_size)]
        return sum(f.result() for f in futures)

# Compare
seq_stats = benchmark(sequential_sum, 1000000, iterations=10)
par_stats = benchmark(parallel_sum, 1000000, iterations=10)

print(f"Sequential: {seq_stats['avg']:.6f}s")
print(f"Parallel: {par_stats['avg']:.6f}s")

ℹ️

Profile first, then optimize. The fastest code is the code that doesn't run.

Follow-Up Questions

  1. Explain the difference between threading and multiprocessing.

  2. When would you use asyncio over threading?

  3. How do you optimize Python code for memory efficiency?

  4. What are the common performance pitfalls in Python?

  5. Explain the concept of lazy evaluation and its benefits.

Advertisement