🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Threading & Multiprocessing in Python

Python Interview · Concurrency · ⭐ Premium

Advertisement

Threading & Multiprocessing in Python

Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe

Understanding the GIL

import threading
import multiprocessing
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# GIL Impact Demonstration
def cpu_bound_task(n):
    """Return the sum of squares 0**2 + 1**2 + ... + (n - 1)**2.

    This is pure-Python arithmetic, so the interpreter keeps the GIL for the
    whole computation and threads cannot run it in parallel.
    """
    return sum(k * k for k in range(n))

def io_bound_task(delay=1.0):
    """Simulate an IO-bound operation by blocking for *delay* seconds.

    time.sleep releases the GIL while waiting, so other threads can run in
    the meantime. That is why threads help with IO-bound work.

    Args:
        delay: Seconds to block. Defaults to 1.0.

    Returns:
        The string "IO complete".
    """
    time.sleep(delay)
    return "IO complete"

# Threading - Good for IO-bound tasks
def threading_demo(n=10**7, workers=2):
    """Run ``workers`` copies of cpu_bound_task(n) on a thread pool and time it.

    Threading doesn't help CPU-bound tasks: under the GIL only one thread
    executes Python bytecode at a time, so the tasks effectively run one
    after another.

    Args:
        n: Problem size passed to each cpu_bound_task call.
        workers: Number of threads, and also the number of tasks submitted.

    Returns:
        List of task results, in submission order.
    """
    start = time.perf_counter()  # monotonic clock intended for measuring intervals

    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(cpu_bound_task, [n] * workers))

    elapsed = time.perf_counter() - start
    # Expect roughly the time of running all tasks sequentially: no speedup.
    print(f"Threading: {elapsed:.2f}s")
    return results

# Multiprocessing - Good for CPU-bound tasks
def multiprocessing_demo(n=10**7, workers=2):
    """Run ``workers`` copies of cpu_bound_task(n) in a process pool and time it.

    Each worker is a separate interpreter with its own GIL, so CPU-bound
    tasks can run in parallel on multiple cores.

    On platforms using the "spawn" start method (e.g. Windows and macOS),
    call this only from under an ``if __name__ == "__main__":`` guard so the
    child processes can import this module safely.

    Args:
        n: Problem size passed to each cpu_bound_task call.
        workers: Number of processes, and also the number of tasks submitted.

    Returns:
        List of task results, in submission order.
    """
    start = time.perf_counter()  # monotonic clock intended for measuring intervals

    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(cpu_bound_task, [n] * workers))

    elapsed = time.perf_counter() - start
    # With at least `workers` free cores, close to the time of ONE task
    # (plus process start-up overhead).
    print(f"Multiprocessing: {elapsed:.2f}s")
    return results

ℹ️

Use threading for IO-bound tasks (network calls, file I/O). Use multiprocessing for CPU-bound tasks (data processing, computations).

Thread Synchronization

Lock and RLock

import threading
from contextlib import contextmanager

class SafeCounter:
    """Integer counter whose updates and reads are serialized by one Lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def increment(self):
        """Add one to the count; the read-modify-write happens under the lock."""
        with self._lock:
            self._value = self._value + 1

    def get_value(self):
        """Return the current count, read while holding the lock."""
        with self._lock:
            current = self._value
        return current

class BankAccount:
    """Thread-safe bank account protected by a reentrant lock.

    The lock is an RLock because transfer() holds this account's lock while
    calling withdraw()/deposit(), which acquire the same lock again. A plain
    Lock would deadlock there.
    """

    def __init__(self, balance: float = 0):
        self._balance = balance
        self._lock = threading.RLock()  # Reentrant lock (see class docstring)

    @property
    def balance(self):
        """Current balance, read while holding the lock."""
        with self._lock:
            return self._balance

    def deposit(self, amount: float):
        """Add *amount* and return the new balance.

        Raises:
            ValueError: if *amount* is not strictly positive (including NaN).
        """
        with self._lock:
            # Use `not amount > 0` rather than `amount <= 0`: NaN compares
            # False to everything, so it would pass the latter and poison
            # the balance.
            if not amount > 0:
                raise ValueError("Deposit must be positive")
            self._balance += amount
            return self._balance

    def withdraw(self, amount: float):
        """Subtract *amount* and return the new balance.

        Raises:
            ValueError: if *amount* is not strictly positive (including NaN),
                or if it exceeds the current balance.
        """
        with self._lock:
            if not amount > 0:  # also rejects NaN (see deposit)
                raise ValueError("Withdrawal must be positive")
            if amount > self._balance:
                raise ValueError("Insufficient funds")
            self._balance -= amount
            return self._balance

    def transfer(self, other: 'BankAccount', amount: float):
        """Move *amount* from this account to *other* while holding both locks.

        Locks are always taken in a consistent global order (by id()), so two
        opposite transfers running concurrently cannot deadlock. If the
        withdrawal is invalid, ValueError propagates from withdraw() before
        either balance changes.
        """
        first, second = (self, other) if id(self) < id(other) else (other, self)

        with first._lock:
            with second._lock:
                self.withdraw(amount)
                other.deposit(amount)

⚠️

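Always acquire locks in a consistent order to prevent deadlocks. Use RLock when a thread may need to acquire the same lock multiple times. For example, transfer() holds an account's lock while calling withdraw(), which acquires that lock again; with a plain Lock, that second acquisition would deadlock.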

Condition Variables and Semaphores

import threading
import time
from collections import deque

class BoundedBuffer:
    """Producer-consumer FIFO buffer with a fixed capacity.

    Both condition variables share ONE underlying lock. They guard the same
    deque, and Condition.notify() may only be called while holding that
    condition's lock. put() notifies not_empty while inside not_full, and
    get() does the reverse, so the two must use the same lock.
    """

    def __init__(self, capacity: int):
        if capacity < 1:
            # A zero/negative capacity would make every put() block forever.
            raise ValueError("capacity must be at least 1")
        self.buffer = deque()
        self.capacity = capacity
        lock = threading.Lock()
        self.not_full = threading.Condition(lock)   # waited on by put()
        self.not_empty = threading.Condition(lock)  # waited on by get()

    def put(self, item):
        """Append *item*, blocking while the buffer is full."""
        with self.not_full:
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()

            self.buffer.append(item)
            # Safe: not_empty shares the lock we already hold.
            self.not_empty.notify()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.not_empty:
            while not self.buffer:
                self.not_empty.wait()

            item = self.buffer.popleft()
            # Safe: not_full shares the lock we already hold.
            self.not_full.notify()
            return item

class ConnectionPool:
    """Connection pool whose Semaphore caps the number of concurrent checkouts.

    Idle connections are kept in ``connections`` and reused. A new one is
    created only when no idle connection is available.
    """

    def __init__(self, max_connections: int):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []          # idle connections available for reuse
        self._lock = threading.Lock()  # guards `connections` and `_created`
        self._created = 0              # connections created so far; next id

    def get_connection(self):
        """Get a connection from the pool, blocking while all are checked out.

        Reuses an idle connection when one is available; otherwise creates
        one. If creation raises, the semaphore slot is released before the
        exception propagates, so the pool does not permanently shrink.
        """
        self.semaphore.acquire()
        try:
            with self._lock:
                if self.connections:
                    return self.connections.pop()
                return self._create_connection()
        except BaseException:
            self.semaphore.release()
            raise

    def release_connection(self, conn):
        """Return *conn* to the idle list and free its semaphore slot."""
        with self._lock:
            self.connections.append(conn)
        self.semaphore.release()

    def _create_connection(self):
        """Create a new (simulated) database connection with a unique id.

        Called by get_connection() while ``self._lock`` is held.
        """
        # Use a running counter, not len(self.connections): creation only
        # happens when the idle list is empty, so its length is always 0.
        conn = {"id": self._created, "active": True}
        self._created += 1
        return conn

# Usage
pool = ConnectionPool(max_connections=5)

def worker(worker_id):
    """Borrow a pooled connection, simulate 0.1 s of work, then give it back."""
    borrowed = pool.get_connection()
    try:
        print(f"Worker {worker_id} using connection {borrowed['id']}")
        time.sleep(0.1)
    finally:
        # Return the connection even if the work raised, so its slot is freed.
        pool.release_connection(borrowed)

# Ten workers share five connections, so some must wait for a free one.
threads = [threading.Thread(target=worker, args=(wid,)) for wid in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Multiprocessing Patterns

Process Pool for Parallel Processing

from multiprocessing import Pool, Queue, Process, Manager
from functools import partial
import os

def worker_function(data_chunk, multiplier):
    """Scale every element of *data_chunk* by *multiplier*.

    Returns:
        A tuple ``(pid, scaled)``. *pid* is the id of the process that did
        the work, so callers can see how chunks were distributed. *scaled*
        is a new list.
    """
    worker_pid = os.getpid()
    scaled = [value * multiplier for value in data_chunk]
    return worker_pid, scaled

def parallel_map_example():
    """Double the numbers 0..19 with a 4-process pool, one 5-item chunk per task.

    Prints which process id handled each chunk.
    """
    numbers = list(range(20))
    step = 5
    chunks = [numbers[start:start + step] for start in range(0, len(numbers), step)]

    with Pool(processes=4) as pool:
        double = partial(worker_function, multiplier=2)
        results = pool.map(double, chunks)

    for pid, doubled in results:
        print(f"PID {pid}: {doubled}")

class ProgressTracker:
    """Track per-task progress shared across multiple processes.

    State lives in a ``multiprocessing.Manager`` server process. The
    ``progress`` dict and ``lock`` attributes are proxies to it, so updates
    made in worker processes are visible to the parent.
    """

    def __init__(self):
        self.manager = Manager()
        self.progress = self.manager.dict()
        self.lock = self.manager.Lock()

    def __getstate__(self):
        # Manager objects are not picklable, which would make the tracker
        # impossible to pass to Pool workers. Workers only need the proxies
        # (which are picklable), so drop the manager handle from the state.
        state = self.__dict__.copy()
        state["manager"] = None
        return state

    def update(self, task_id, progress):
        """Record *progress* for *task_id*."""
        with self.lock:
            self.progress[task_id] = progress

    def get_progress(self):
        """Return a plain-dict snapshot of all recorded progress."""
        with self.lock:
            return dict(self.progress)

    def shutdown(self):
        """Stop the Manager server process; a no-op in unpickled copies."""
        if self.manager is not None:
            self.manager.shutdown()

def process_with_progress(task_id, data, tracker):
    """Work through *data* item by item, reporting the completed fraction.

    After each item, calls ``tracker.update(task_id, fraction_done)``.
    Returns a completion message naming the task.
    """
    for done, _item in enumerate(data, start=1):
        time.sleep(0.01)  # stand-in for real per-item work
        fraction_done = done / len(data)
        tracker.update(task_id, fraction_done)
    return f"Task {task_id} complete"

Inter-Process Communication

from multiprocessing import Process, Queue, Pipe
import time

def producer(queue, n):
    """Put *n* work items on *queue*, followed by a ``None`` end-of-stream marker."""
    for idx in range(n):
        payload = {"id": idx, "data": f"item_{idx}"}
        queue.put(payload)
        print(f"Produced: {payload}")
        time.sleep(0.1)  # pace production so interleaving is visible
    queue.put(None)  # Sentinel: tells the consumer to stop

def consumer(queue):
    """Print items taken from *queue* until the ``None`` sentinel arrives."""
    while True:
        received = queue.get()
        if received is None:
            return  # producer signalled end of stream
        print(f"Consumed: {received}")

def pipe_sender(connection, count=5):
    """Send *count* messages on *connection*, then close it to signal EOF."""
    for i in range(count):
        connection.send({"message": f"Hello {i}"})
    connection.close()

def pipe_receiver(connection, other_end=None):
    """Print (and return) messages from *connection* until the peer hangs up.

    recv() raises EOFError only once EVERY handle to the other end of the
    pipe is closed, including copies this process inherited. The caller
    therefore passes *other_end*, and it is closed here first.
    """
    if other_end is not None:
        other_end.close()
    received = []
    while True:
        try:
            msg = connection.recv()
        except EOFError:
            break
        print(f"Received: {msg}")
        received.append(msg)
    connection.close()
    return received

def pipe_communication():
    """Demonstrate message passing between two processes over a Pipe.

    The sender and receiver are module-level functions because Process
    targets must be picklable under the "spawn"/"forkserver" start methods,
    and nested functions are not.
    """
    parent_conn, child_conn = Pipe()

    p1 = Process(target=pipe_sender, args=(parent_conn,))
    p2 = Process(target=pipe_receiver, args=(child_conn, parent_conn))

    p1.start()
    p2.start()
    # The parent must drop its own copies of both ends. Otherwise the
    # receiver never sees EOF and p2.join() blocks forever.
    parent_conn.close()
    child_conn.close()

    p1.join()
    p2.join()

# Shared Memory
from multiprocessing import Array, Value

def increment_counter(counter, n):
    """Add 1 to the shared *counter* n times, holding its lock for each step.

    ``counter.value += 1`` is a read-modify-write, so without the lock
    concurrent processes could lose updates.
    """
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

def update_array(array, index, value):
    """Store *value* at position *index* of the shared *array*."""
    array[index] = value

def shared_memory_example():
    """Demonstrate shared memory (Value and Array) between processes.

    The worker functions are module-level because Process targets must be
    picklable under the "spawn"/"forkserver" start methods.
    """
    counter = Value('i', 0)  # Shared integer
    array = Array('d', [0.0] * 5)  # Shared array of doubles

    processes = [
        Process(target=increment_counter, args=(counter, 1000)),
        Process(target=increment_counter, args=(counter, 1000)),
    ]
    # One writer per slot: each process fills a different index.
    processes += [
        Process(target=update_array, args=(array, i, i * 1.5))
        for i in range(len(array))
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"Final counter: {counter.value}")  # 2000
    print(f"Final array: {list(array)}")  # [0.0, 1.5, 3.0, 4.5, 6.0]

Thread Safety Patterns

Thread-Local Storage

import threading
from typing import Dict, Any

class ThreadContext:
    """Per-thread key/value store for request context.

    Backed by threading.local(): each thread sees its own independent set of
    attributes, so values set in one thread are invisible to the others.
    """

    def __init__(self):
        self._local = threading.local()

    def get(self, key: str, default: Any = None) -> Any:
        """Return this thread's value for *key*, or *default* if it is unset."""
        try:
            return getattr(self._local, key)
        except AttributeError:
            return default

    def set(self, key: str, value: Any):
        """Bind *key* to *value* for the calling thread only."""
        setattr(self._local, key, value)

    def clear(self):
        """Forget every key stored by the calling thread."""
        vars(self._local).clear()

# Global context
context = ThreadContext()

def handle_request(request_id):
    """Simulate handling a request using per-thread context.

    Stores the request id and a derived user name in the thread-local
    ``context``, simulates work, then prints who handled the request. The
    context is always cleared afterwards, even if the work raises, so a
    reused thread (e.g. in a pool) cannot see a previous request's values.
    """
    context.set("request_id", request_id)
    context.set("user", f"user_{request_id}")
    try:
        # Process request
        time.sleep(0.1)

        # Access context
        print(f"Request {context.get('request_id')} handled by {context.get('user')}")
    finally:
        context.clear()

# Multiple threads with separate contexts: each sets and reads only its own
# request_id/user values.
threads = [threading.Thread(target=handle_request, args=(rid,)) for rid in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Producer-Consumer with Queue

from queue import Queue, Empty
from threading import Thread, Event

class ThreadPool:
    """Simple fixed-size thread pool fed from a FIFO task queue.

    Tasks are zero-argument callables. Workers are daemon threads.
    """

    def __init__(self, num_workers: int):
        self.tasks = Queue()
        self.workers = []
        self.shutdown_event = Event()

        for _ in range(num_workers):
            worker = Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self):
        """Run queued tasks until shutdown is requested.

        Polls with a short timeout so the shutdown flag is noticed even when
        the queue stays empty.
        """
        while not self.shutdown_event.is_set():
            try:
                task = self.tasks.get(timeout=0.1)
            except Empty:
                continue
            try:
                task()
            except Exception as e:
                # Keep the worker alive: one failing task must not kill it.
                print(f"Task error: {e}")
            finally:
                # Balance every get() so tasks.join() can complete.
                self.tasks.task_done()

    def submit(self, task):
        """Queue a zero-argument callable for execution."""
        self.tasks.put(task)

    def shutdown(self, wait=True):
        """Shut the thread pool down.

        Args:
            wait: If True, block until every queued task has run and all
                worker threads have exited. If False, only signal the
                workers to stop after their current task; queued tasks no
                worker has picked up yet may be abandoned.
        """
        if wait:
            # Drain the queue BEFORE raising the flag. Workers stop taking
            # tasks once the event is set, so joining afterwards would wait
            # forever on tasks nobody will run.
            self.tasks.join()
        self.shutdown_event.set()
        if wait:
            for worker in self.workers:
                worker.join()

# Usage
def sample_task(task_id):
    """Print which pool thread is running *task_id*, then sleep for 0.1 s."""
    thread_name = threading.current_thread().name
    print(f"Task {task_id} running in {thread_name}")
    time.sleep(0.1)

pool = ThreadPool(num_workers=3)
for i in range(10):
    # The default argument captures the current i; a bare closure would see
    # only the loop's final value.
    pool.submit(lambda task_id=i: sample_task(task_id))
pool.shutdown()

ℹ️

For production systems, prefer concurrent.futures.ThreadPoolExecutor or ProcessPoolExecutor over manual thread management.

Follow-Up Questions

  1. Explain the Global Interpreter Lock (GIL) and its impact on Python concurrency.

  2. When would you use threading vs multiprocessing vs asyncio?

  3. How do you prevent deadlocks in multi-threaded applications?

  4. Explain the difference between Lock and RLock.

  5. How does Python's garbage collector interact with multi-threaded programs?

Advertisement