Python Authentication — JWT, OAuth & Session Management
Authentication verifies user identity. This tutorial covers JWT tokens (access + refresh), OAuth2 flows, password hashing with bcrypt/passlib, session management, and building a complete authentication system.
Learning Objectives
- Generate and verify JWT tokens with access and refresh flows
- Hash passwords securely with bcrypt
- Implement OAuth2 authentication flows
- Build session-based authentication
- Create a complete auth system with FastAPI
Authentication vs Authorization
Architecture Diagram
Authentication: "Who are you?"
-> Login, verify identity
Authorization: "What can you do?"
-> Permissions, roles, access control
Example:
Alice logs in (authentication)
Alice can view her profile (authorization)
Alice cannot delete other users' profiles (no authorization)
Authentication Methods Comparison
| Method | State | Use Case | Complexity |
|---|---|---|---|
| Session/Cookie | Server-side | Traditional web apps | Low |
| JWT | Stateless | APIs, microservices | Medium |
| OAuth2 | Delegated | Third-party login | High |
| API Key | Stateless | Service-to-service | Low |
Password Hashing with bcrypt
import bcrypt
# NEVER store passwords in plain text!
# BAD: password = "my_secret" (stored as-is)
# GOOD: password = bcrypt.hashpw(...) (hashed)
def hash_password(password: str) -> str:
"""Hash a password for storage."""
salt = bcrypt.gensalt(rounds=12) # 12 rounds = slow but secure
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""Verify a password against its hash."""
return bcrypt.checkpw(
password.encode('utf-8'),
hashed.encode('utf-8')
)
# Usage
password = "my_secret_password"
hashed = hash_password(password)
print(f"Hash: {hashed}") # $2b$12$...
# Verify
print(verify_password("my_secret_password", hashed)) # True
print(verify_password("wrong_password", hashed)) # False
Password Hashing with passlib
# pip install passlib bcrypt
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(password: str, hashed: str) -> bool:
return pwd_context.verify(password, hashed)
# Usage
hashed = hash_password("secret123")
print(verify_password("secret123", hashed)) # True
JWT (JSON Web Tokens)
JWT is a stateless authentication mechanism. The token contains all needed information — no server-side session storage required.
JWT Structure
Architecture Diagram
Header.Payload.Signature
Header: {"alg": "HS256", "typ": "JWT"}
Payload: {"user_id": 123, "role": "admin", "exp": 1705312800, "iat": 1705226400}
Signature: HMAC-SHA256(base64(header) + "." + base64(payload), secret)
Access + Refresh Token Flow
import jwt
from datetime import datetime, timedelta
SECRET_KEY = "your-super-secret-key" # Store in environment!
REFRESH_SECRET = "your-refresh-secret-key"
def create_access_token(user_id: int, role: str = "user", expires_hours: int = 1) -> str:
"""Create short-lived access token."""
payload = {
"user_id": user_id,
"role": role,
"type": "access",
"exp": datetime.utcnow() + timedelta(hours=expires_hours),
"iat": datetime.utcnow(),
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def create_refresh_token(user_id: int, expires_days: int = 7) -> str:
"""Create long-lived refresh token."""
payload = {
"user_id": user_id,
"type": "refresh",
"exp": datetime.utcnow() + timedelta(days=expires_days),
"iat": datetime.utcnow(),
}
return jwt.encode(payload, REFRESH_SECRET, algorithm="HS256")
def verify_token(token: str, token_type: str = "access") -> dict:
"""Verify and decode a token."""
secret = SECRET_KEY if token_type == "access" else REFRESH_SECRET
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
if payload.get("type") != token_type:
raise ValueError("Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise ValueError("Token expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid token")
# Usage
access = create_access_token(user_id=123, role="admin")
refresh = create_refresh_token(user_id=123)
print(f"Access: {access[:50]}...")
print(f"Refresh: {refresh[:50]}...")
# Verify
payload = verify_token(access, "access")
print(f"User ID: {payload['user_id']}")
# Refresh expired access token
new_access = create_access_token(user_id=payload["user_id"])
FastAPI Authentication Example
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
security = HTTPBearer()
class TokenRequest(BaseModel):
username: str
password: str
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
# Mock user database
users_db = {
"alice": {"id": 1, "password_hash": hash_password("secret123"), "role": "admin"},
"bob": {"id": 2, "password_hash": hash_password("pass456"), "role": "user"},
}
@app.post("/auth/login", response_model=TokenResponse)
def login(request: TokenRequest):
user = users_db.get(request.username)
if not user or not verify_password(request.password, user["password_hash"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
access = create_access_token(user["id"], user["role"])
refresh = create_refresh_token(user["id"])
return TokenResponse(
access_token=access,
refresh_token=refresh,
)
@app.post("/auth/refresh", response_model=TokenResponse)
def refresh_token(refresh_token: str):
try:
payload = verify_token(refresh_token, "refresh")
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
user = users_db.get(payload["username"])
access = create_access_token(payload["user_id"], user["role"])
new_refresh = create_refresh_token(payload["user_id"])
return TokenResponse(
access_token=access,
refresh_token=new_refresh,
)
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = verify_token(credentials.credentials, "access")
return payload
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
@app.get("/protected")
def protected_route(user: dict = Depends(get_current_user)):
return {"message": f"Hello, user {user['user_id']}"}
@app.get("/admin")
def admin_route(user: dict = Depends(get_current_user)):
if user["role"] != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
return {"message": "Welcome, admin!"}
OAuth2 Authentication
# pip install httpx
import httpx
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
app = FastAPI()
# GitHub OAuth2 configuration
GITHUB_CLIENT_ID = "your-client-id"
GITHUB_CLIENT_SECRET = "your-client-secret"
GITHUB_AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
GITHUB_TOKEN_URL = "https://github.com/login/oauth/access_token"
GITHUB_USER_URL = "https://api.github.com/user"
@app.get("/auth/github")
def github_login():
"""Redirect to GitHub OAuth."""
return RedirectResponse(
f"{GITHUB_AUTHORIZE_URL}?client_id={GITHUB_CLIENT_ID}&scope=user:email"
)
@app.get("/auth/github/callback")
async def github_callback(code: str):
"""Handle GitHub OAuth callback."""
async with httpx.AsyncClient() as client:
# Exchange code for token
token_response = await client.post(
GITHUB_TOKEN_URL,
data={
"client_id": GITHUB_CLIENT_ID,
"client_secret": GITHUB_CLIENT_SECRET,
"code": code,
},
headers={"Accept": "application/json"},
)
access_token = token_response.json()["access_token"]
# Get user info
user_response = await client.get(
GITHUB_USER_URL,
headers={"Authorization": f"Bearer {access_token}"},
)
user = user_response.json()
return {
"github_id": user["id"],
"username": user["login"],
"email": user.get("email"),
"avatar": user["avatar_url"],
}
Session-Based Authentication
from fastapi import FastAPI, Response, Request
from datetime import datetime, timedelta
import secrets
app = FastAPI()
# In-memory session store (use Redis in production)
sessions = {}
@app.post("/auth/login")
def login(username: str, password: str, response: Response):
# Verify credentials
user = authenticate_user(username, password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# Create session
session_id = secrets.token_urlsafe(32)
sessions[session_id] = {
"user_id": user["id"],
"username": user["username"],
"expires": datetime.now() + timedelta(hours=24),
}
# Set cookie
response.set_cookie(
key="session_id",
value=session_id,
httponly=True, # Not accessible via JavaScript
secure=True, # HTTPS only
samesite="lax", # CSRF protection
max_age=86400, # 24 hours
)
return {"message": "Logged in"}
@app.get("/auth/me")
def get_current_user(request: Request):
session_id = request.cookies.get("session_id")
if not session_id or session_id not in sessions:
raise HTTPException(status_code=401, detail="Not authenticated")
session = sessions[session_id]
if datetime.now() > session["expires"]:
del sessions[session_id]
raise HTTPException(status_code=401, detail="Session expired")
return {"user_id": session["user_id"], "username": session["username"]}
@app.post("/auth/logout")
def logout(request: Request, response: Response):
session_id = request.cookies.get("session_id")
if session_id and session_id in sessions:
del sessions[session_id]
response.delete_cookie("session_id")
return {"message": "Logged out"}
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| Storing passwords in plain text | Catastrophic security breach | Always hash with bcrypt |
| Short JWT expiration without refresh | Users constantly logged out | Use access + refresh token flow |
| Not validating token type | Security bypass | Check type claim in token |
| Hardcoded secrets | Exposed in source code | Use environment variables |
| No rate limiting on login | Brute force attacks | Add rate limiting |
| Not using HTTPS | Tokens intercepted | Always use HTTPS |
| Storing JWT in localStorage | XSS vulnerability | Use httpOnly cookies |
Best Practices
- NEVER store passwords in plain text — always hash with bcrypt (12+ rounds)
- Use access + refresh tokens — short-lived access, long-lived refresh
- Always set token expiration — never issue infinite tokens
- Store secrets in environment variables, not code
- Use HTTPS for all authentication — prevents token interception
- Implement rate limiting — prevent brute force attacks
- Use httpOnly cookies for session-based auth — prevents XSS
- Validate token type — ensure access tokens aren't used as refresh
- Rotate refresh tokens — issue new refresh token on each use
- Log authentication events — monitor for suspicious activity
Key Takeaways
- NEVER store passwords in plain text
- Use bcrypt for password hashing (slow = secure)
- JWT is stateless — good for APIs and microservices
- Always set token expiration
- Store secrets in environment variables, not code
- Use HTTPS for all authentication
- Implement rate limiting to prevent brute force attacks
- Consider using OAuth2 for third-party authentication
- Use access + refresh token flow for better UX
- Log all authentication events for security monitoring