Python WebSockets — Real-Time Communication

WebSockets enable bidirectional, real-time communication between client and server. Unlike HTTP's request-response model, where the server can only answer after the client asks, a WebSocket connection stays open after an initial HTTP upgrade handshake, so either side can send a message at any time. This tutorial covers the WebSocket protocol, building servers with FastAPI, handling concurrent connections, and building a chat application and a notification system.

Learning Objectives

  • Understand WebSocket vs HTTP trade-offs
  • Build WebSocket servers with FastAPI
  • Handle concurrent connections with a connection manager
  • Implement chat and notification systems
  • Add authentication to WebSocket connections
  • Build a complete real-time notification system

WebSocket vs HTTP

Architecture Diagram
HTTP (Request-Response):
Client --► "Give me data" --► Server
Client ◄-- "Here is data" ◄-- Server
(Connection closes)

WebSocket (Persistent):
Client ◄---- Persistent Connection ----► Server
Client --► "Hello" ----------------------► Server
Client ◄-- "Response" ◄------------------ Server
Client ◄-- "Update"  ◄------------------ Server (server can push!)
Client --► "Message" --------------------► Server
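
The persistent connection in the diagram begins as an ordinary HTTP request that is upgraded. As a rough illustration using only the standard library (the helper name `compute_accept_key` is made up for this sketch), the server answers the client's `Sec-WebSocket-Key` header by hashing it together with a fixed GUID defined in RFC 6455:

```python
import base64
import hashlib

# Fixed GUID from RFC 6455, appended to the client's key before hashing
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def compute_accept_key(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((sec_websocket_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Sample key taken from the handshake example in RFC 6455
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Frameworks such as FastAPI perform this step inside `websocket.accept()`, so application code normally never computes it by hand.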

Feature Comparison

Feature         | HTTP                 | WebSocket
Connection      | Request-response     | Persistent, bidirectional
Server push     | Polling/SSE only     | Native
Overhead        | Headers per request  | Minimal after handshake
Use case        | REST APIs, pages     | Chat, gaming, live updates
Protocol        | http:// or https://  | ws:// or wss://
Browser support | Universal            | Universal
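
To make "minimal after handshake" concrete, the sketch below computes how many framing bytes RFC 6455 adds to a single message. The function name `frame_header_size` is an assumption for illustration, and the calculation covers one unfragmented frame only:

```python
def frame_header_size(payload_len: int, masked: bool = False) -> int:
    """Bytes of WebSocket framing added to a payload of the given length (RFC 6455)."""
    if payload_len < 0:
        raise ValueError("payload_len must be non-negative")
    size = 2  # FIN/opcode byte plus mask-bit/7-bit-length byte
    if payload_len > 65535:
        size += 8  # 64-bit extended payload length
    elif payload_len > 125:
        size += 2  # 16-bit extended payload length
    if masked:
        size += 4  # masking key, required on client-to-server frames
    return size

for n in (5, 200, 100_000):
    print(n, frame_header_size(n), frame_header_size(n, masked=True))
# 5 2 6
# 200 4 8
# 100000 10 14
```

Each message therefore costs between 2 and 14 bytes of framing, whereas an HTTP request repeats its full header set every time.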

Use cases: Chat apps, live dashboards, multiplayer games, real-time notifications, live sports scores, collaborative editing.


FastAPI WebSocket Server

Basic Echo Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

Connection Manager

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List, Optional, Set

app = FastAPI()

class ConnectionManager:
    """Manages active WebSocket connections."""

    def __init__(self):
        # Every open socket, plus a per-user index (one user may hold several sockets)
        self.active_connections: List[WebSocket] = []
        self.user_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: Optional[str] = None):
        await websocket.accept()
        self.active_connections.append(websocket)
        if user_id:
            self.user_connections.setdefault(user_id, set()).add(websocket)

    def disconnect(self, websocket: WebSocket, user_id: Optional[str] = None):
        # Guard against a double disconnect: list.remove() raises ValueError otherwise
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        if user_id and user_id in self.user_connections:
            self.user_connections[user_id].discard(websocket)
            if not self.user_connections[user_id]:
                del self.user_connections[user_id]

    async def send_personal(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def send_to_user(self, user_id: str, message: str):
        # Iterate over a copy: the set can change while a send is awaited
        for connection in list(self.user_connections.get(user_id, ())):
            await connection.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy so connects and disconnects during a send are safe
        for connection in list(self.active_connections):
            await connection.send_text(message)

    def get_online_users(self) -> List[str]:
        return list(self.user_connections.keys())

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)
    try:
        await manager.broadcast(f"{client_id} joined the chat")

        while True:
            data = await websocket.receive_text()
            await manager.send_personal(f"You said: {data}", websocket)
            await manager.broadcast(f"{client_id}: {data}")

    except WebSocketDisconnect:
        manager.disconnect(websocket, client_id)
        await manager.broadcast(f"{client_id} left the chat")
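
The `broadcast` method above awaits each send in turn, so one failing socket raises out of the loop before the remaining clients are reached. One possible alternative, sketched here with a hypothetical `safe_broadcast` helper and a `FakeSocket` stand-in so it runs without a server, sends concurrently and reports which connections failed:

```python
import asyncio
from typing import Iterable, List

async def safe_broadcast(connections: Iterable, message: str) -> List:
    """Send to every connection concurrently; return the ones whose send failed."""
    targets = list(connections)  # snapshot, so the caller's collection may change meanwhile
    results = await asyncio.gather(
        *(conn.send_text(message) for conn in targets),
        return_exceptions=True,  # one broken socket should not abort the others
    )
    return [conn for conn, result in zip(targets, results) if isinstance(result, Exception)]

class FakeSocket:
    """Stand-in for a WebSocket, used only to exercise the sketch."""
    def __init__(self, name: str, broken: bool = False):
        self.name, self.broken, self.sent = name, broken, []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError(f"{self.name} is gone")
        self.sent.append(message)

async def demo():
    sockets = [FakeSocket("alice"), FakeSocket("bob", broken=True), FakeSocket("carol")]
    failed = await safe_broadcast(sockets, "hello")
    return [s.name for s in failed], sockets[2].sent

print(asyncio.run(demo()))  # (['bob'], ['hello'])
```

In the manager, the caller would pass `self.active_connections` and then call `disconnect` for each returned socket.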

WebSocket Client

Basic Client

import asyncio
import websockets

async def client():
    async with websockets.connect("ws://localhost:8000/ws/alice") as ws:
        # Send message
        await ws.send("Hello, server!")

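        # Receive the next message. Against the chat endpoint above, this is
        # the "alice joined the chat" broadcast rather than a direct reply.
        response = await ws.recv()
        print(f"Received: {response}")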

asyncio.run(client())

Multi-User Client

import asyncio
import websockets

async def user_client(user_id: str, messages: list):
    async with websockets.connect(f"ws://localhost:8000/ws/{user_id}") as ws:
        for message in messages:
            await ws.send(message)
            response = await ws.recv()
            print(f"{user_id} received: {response}")

async def main():
    await asyncio.gather(
        user_client("alice", ["Hi!", "How are you?"]),
        user_client("bob", ["Hey!", "Good thanks!"]),
    )

asyncio.run(main())

Real-Time Chat Application

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from typing import Dict
from datetime import datetime

app = FastAPI()

class ChatRoom:
    def __init__(self):
        self.connections: Dict[str, WebSocket] = {}
        self.history: list = []

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket
        await self.broadcast({
            "type": "user_joined",
            "user_id": user_id,
            "users": list(self.connections.keys()),
            "timestamp": datetime.now().isoformat(),
        })

    def disconnect(self, user_id: str):
        if user_id in self.connections:
            del self.connections[user_id]

    async def broadcast(self, message: dict):
        self.history.append(message)
        # Iterate over a copy: the dict can change size while a send is awaited
        for connection in list(self.connections.values()):
            await connection.send_json(message)

    async def send_to_user(self, user_id: str, message: dict):
        if user_id in self.connections:
            await self.connections[user_id].send_json(message)

rooms: Dict[str, ChatRoom] = {}

@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(
    websocket: WebSocket,
    room_id: str,
    user_id: str = Query(...)
):
    if room_id not in rooms:
        rooms[room_id] = ChatRoom()

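    room = rooms[room_id]
    # Snapshot the history before connecting: connect() records this user's
    # join event, which would otherwise be delivered twice
    recent = room.history[-50:]  # Last 50 messages
    await room.connect(user_id, websocket)

    try:
        # Send chat history
        for msg in recent:
            await websocket.send_json(msg)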

        while True:
            data = await websocket.receive_json()

            # Skip anything that is not a JSON object with text content
            if not isinstance(data, dict) or not isinstance(data.get("content"), str):
                continue

            if data.get("type") == "message":
                await room.broadcast({
                    "type": "message",
                    "user_id": user_id,
                    "content": data["content"],
                    "timestamp": datetime.now().isoformat(),
                })
            elif data.get("type") == "private" and "target" in data:
                await room.send_to_user(data["target"], {
                    "type": "private",
                    "from": user_id,
                    "content": data["content"],
                    "timestamp": datetime.now().isoformat(),
                })

    except WebSocketDisconnect:
        room.disconnect(user_id)
        await room.broadcast({
            "type": "user_left",
            "user_id": user_id,
            "users": list(room.connections.keys()),
            "timestamp": datetime.now().isoformat(),
        })
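
`ChatRoom.history` above is a plain list that grows for as long as the room exists, although only the last 50 entries are ever replayed. A minimal sketch of a bounded alternative, assuming a hypothetical `BoundedHistory` wrapper around `collections.deque`:

```python
from collections import deque

class BoundedHistory:
    """Keeps only the most recent `limit` messages."""

    def __init__(self, limit: int = 50):
        self._items = deque(maxlen=limit)  # oldest entries are dropped automatically

    def append(self, message: dict) -> None:
        self._items.append(message)

    def recent(self) -> list:
        return list(self._items)

history = BoundedHistory(limit=3)
for i in range(5):
    history.append({"type": "message", "content": f"msg {i}"})
print([m["content"] for m in history.recent()])  # ['msg 2', 'msg 3', 'msg 4']
```

With a limit of 50, `recent()` would take the place of the `room.history[-50:]` slice and memory use per room stays constant.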

Real-Time Notification System

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List
from datetime import datetime

app = FastAPI()

class NotificationManager:
    def __init__(self):
        self.subscriptions: Dict[str, WebSocket] = {}
        self.notification_queue: Dict[str, List[dict]] = {}

    async def subscribe(self, user_id: str, ws: WebSocket):
        await ws.accept()
        self.subscriptions[user_id] = ws

        # Send queued notifications
        if user_id in self.notification_queue:
            for notification in self.notification_queue[user_id]:
                await ws.send_json(notification)
            del self.notification_queue[user_id]

    def unsubscribe(self, user_id: str):
        if user_id in self.subscriptions:
            del self.subscriptions[user_id]

    async def notify(self, user_id: str, notification: dict):
        notification["timestamp"] = datetime.now().isoformat()

        if user_id in self.subscriptions:
            try:
                await self.subscriptions[user_id].send_json(notification)
            except Exception:
                # Send failed (the socket is likely dead): drop it and queue for later
                self.unsubscribe(user_id)
                self._queue_notification(user_id, notification)
        else:
            self._queue_notification(user_id, notification)

    async def broadcast(self, notification: dict):
        notification["timestamp"] = datetime.now().isoformat()
        disconnected = []
        # Iterate over a copy: subscriptions can change while a send is awaited
        for user_id, ws in list(self.subscriptions.items()):
            try:
                await ws.send_json(notification)
            except Exception:
                disconnected.append(user_id)
        for user_id in disconnected:
            self.unsubscribe(user_id)

    def _queue_notification(self, user_id: str, notification: dict):
        if user_id not in self.notification_queue:
            self.notification_queue[user_id] = []
        self.notification_queue[user_id].append(notification)
        # Keep only last 100 queued notifications
        self.notification_queue[user_id] = self.notification_queue[user_id][-100:]

manager = NotificationManager()

@app.websocket("/ws/notifications")
async def notification_endpoint(
    websocket: WebSocket,
    user_id: str
):
    await manager.subscribe(user_id, websocket)
    try:
        while True:
            # Keep connection alive, handle client messages
            data = await websocket.receive_text()
            if data == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        manager.unsubscribe(user_id)

# API to send notifications
@app.post("/api/notify/{user_id}")
async def send_notification(user_id: str, notification: dict):
    await manager.notify(user_id, notification)
    return {"status": "sent"}

@app.post("/api/broadcast")
async def broadcast_notification(notification: dict):
    await manager.broadcast(notification)
    return {"status": "broadcast"}
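
The endpoint above answers "ping" messages, but nothing removes a client that silently stops sending them. One way to detect such clients is to record when each user was last heard from. The sketch below assumes a hypothetical `HeartbeatTracker` class and a 30-second timeout chosen only for the demonstration; the injectable clock makes it runnable without waiting:

```python
import time
from typing import Callable, Dict, List

class HeartbeatTracker:
    """Records when each user was last heard from and reports stale ones."""

    def __init__(self, timeout: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self._clock = clock
        self._last_seen: Dict[str, float] = {}

    def touch(self, user_id: str) -> None:
        """Call on connect and on every received message, including pings."""
        self._last_seen[user_id] = self._clock()

    def forget(self, user_id: str) -> None:
        self._last_seen.pop(user_id, None)

    def stale(self) -> List[str]:
        """Users that have been silent for longer than the timeout."""
        now = self._clock()
        return [uid for uid, seen in self._last_seen.items() if now - seen > self.timeout]

# Demonstration with a fake clock
now = [0.0]
tracker = HeartbeatTracker(timeout=30.0, clock=lambda: now[0])
tracker.touch("alice")
now[0] = 20.0
tracker.touch("bob")
now[0] = 45.0
print(tracker.stale())  # ['alice']
```

A background task could call `stale()` periodically, then close and unsubscribe the users it returns.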

Authentication for WebSockets

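from typing import Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
import jwt  # PyJWT

app = FastAPI()

SECRET_KEY = "your-secret-key"  # Placeholder: load the real key from configuration

async def authenticate_websocket(websocket: WebSocket, token: str) -> Optional[dict]:
    """Authenticate WebSocket connection via JWT token.

    Returns the decoded payload, or None after closing the socket.
    This runs before accept(), so close() rejects the handshake and the
    client may not see the custom close code.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        await websocket.close(code=4001, reason="Token expired")
        return None
    except jwt.InvalidTokenError:
        await websocket.close(code=4002, reason="Invalid token")
        return None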

@app.websocket("/ws/authenticated")
async def authenticated_websocket(
    websocket: WebSocket,
    token: str = Query(...)
):
    user = await authenticate_websocket(websocket, token)
    if not user:
        return

    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_json({
                # .get() avoids a KeyError if the token lacks a user_id claim
                "user": user.get("user_id"),
                "message": data,
            })
    except WebSocketDisconnect:
        pass
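
On the client side, the `token` query parameter read by `Query(...)` has to be appended to the connection URL. A small sketch using the standard library follows; `build_ws_url` and the sample token string are illustrative assumptions, not part of any library:

```python
from urllib.parse import urlencode

def build_ws_url(base_url: str, token: str) -> str:
    """Append the token as a URL-encoded `token` query parameter."""
    separator = "&" if "?" in base_url else "?"
    return f"{base_url}{separator}{urlencode({'token': token})}"

# Made-up token with characters that need escaping
print(build_ws_url("ws://localhost:8000/ws/authenticated", "abc.def+ghi/jkl="))
# ws://localhost:8000/ws/authenticated?token=abc.def%2Bghi%2Fjkl%3D
```

Query strings often end up in server and proxy logs, so short-lived tokens and `wss://` are sensible when authenticating this way.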

Common Mistakes

Mistake                  | Problem                      | Solution
No heartbeat/ping        | Stale connections accumulate | Implement ping/pong
Not handling disconnects | Memory leaks                 | Always use try/except WebSocketDisconnect
No authentication        | Anyone can connect           | Validate tokens on connection
No message validation    | Crashes from bad data        | Validate all incoming messages
Blocking operations      | Blocks event loop            | Use async/await for I/O
No connection limits     | Resource exhaustion          | Limit concurrent connections
Not broadcasting errors  | Users unaware of issues      | Notify on errors
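
As an illustration of the "validate all incoming messages" row, the sketch below checks a raw text frame before any handler touches it. The helper name `parse_message`, the allowed types (taken from the chat example), and the 2000-character limit are assumptions to adapt per application:

```python
import json
from typing import Optional

ALLOWED_TYPES = {"message", "private"}  # the two types handled by the chat endpoint
MAX_CONTENT_LENGTH = 2000               # assumed limit, tune per application

def parse_message(raw: str) -> Optional[dict]:
    """Return a validated message dict, or None if the payload should be rejected."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    msg_type = data.get("type")
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        return None
    content = data.get("content")
    if not isinstance(content, str) or not 0 < len(content) <= MAX_CONTENT_LENGTH:
        return None
    if msg_type == "private" and not isinstance(data.get("target"), str):
        return None
    return data

print(parse_message('{"type": "message", "content": "hi"}'))  # {'type': 'message', 'content': 'hi'}
print(parse_message('{"type": "private", "content": "hi"}'))  # None (no target)
print(parse_message("not json"))                              # None
```

An endpoint would call `receive_text()`, pass the result through `parse_message`, and ignore or report any message for which it returns None.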

Best Practices

  1. Handle disconnections gracefully — always use try/except WebSocketDisconnect
  2. Use connection managers — track active connections efficiently
  3. Broadcast messages to all connected clients when needed
  4. Add authentication for WebSocket connections
  5. Use ping/pong to detect stale connections
  6. Limit message size — prevent abuse
  7. Queue messages for offline users — ensure delivery
  8. Use JSON for structured messages
  9. Add reconnection logic on the client side
  10. Monitor connection count — prevent resource exhaustion
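
For the reconnection item, a common approach is exponential backoff with random jitter. The sketch below is one possible shape under stated assumptions: `backoff_delay` and `run_with_reconnect` are hypothetical names, the 1-second base and 30-second cap are arbitrary, and the client uses the same `websockets` library as the earlier client examples:

```python
import asyncio
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Upper bound, in seconds, of the wait before retry number `attempt` (0-based)."""
    return min(cap, base * (2 ** attempt))

async def run_with_reconnect(url: str, max_attempts: int = 5) -> None:
    """Stay connected to `url`, backing off between consecutive failed attempts."""
    import websockets  # third-party client library, imported lazily
    from websockets.exceptions import ConnectionClosed

    attempt = 0
    while attempt < max_attempts:
        try:
            async with websockets.connect(url) as ws:
                attempt = 0  # connected, so reset the backoff
                async for message in ws:
                    print(f"Received: {message}")
        except (OSError, ConnectionClosed):
            pass  # server unreachable or connection dropped: retry below
        # Sleep a random fraction of the bound so clients do not reconnect in lockstep
        await asyncio.sleep(random.uniform(0, backoff_delay(attempt)))
        attempt += 1

print([backoff_delay(n) for n in range(6)])  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

Calling `asyncio.run(run_with_reconnect("ws://localhost:8000/ws/alice"))` would exercise it against the chat server from earlier; the loop gives up after `max_attempts` consecutive failures.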

Key Takeaways

  1. WebSockets are full-duplex (both directions simultaneously)
  2. Use WebSockets for real-time features (chat, gaming, live data)
  3. Handle disconnections gracefully with try/except
  4. Use connection managers to track active connections
  5. Broadcast messages to all connected clients when needed
  6. Consider authentication for WebSocket connections
  7. Use ping/pong to detect stale connections
  8. Queue messages for offline users
  9. Always validate incoming messages
  10. Monitor connection count to prevent resource exhaustion
