πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Security Best Practices in Python

Python InterviewSecurity⭐ Premium

Advertisement

Security Best Practices in Python

Difficulty: Medium-Hard | Companies: Google, Meta, Amazon, Netflix, Stripe

Authentication and Authorization

import hashlib
import hmac
import secrets
import jwt
from datetime import datetime, timedelta
from typing import Optional, Dict
from functools import wraps
from passlib.context import CryptContext
import bcrypt

# Password Hashing
class PasswordManager:
    """Secure password hashing and verification."""
    
    def __init__(self):
        self.pwd_context = CryptContext(
            schemes=["bcrypt"],
            deprecated="auto",
            bcrypt__rounds=12
        )
    
    def hash_password(self, password: str) -> str:
        """Hash password with bcrypt."""
        return self.pwd_context.hash(password)
    
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash."""
        return self.pwd_context.verify(plain_password, hashed_password)
    
    def needs_update(self, hashed_password: str) -> bool:
        """Check if password hash needs updating."""
        return self.pwd_context.needs_update(hashed_password)

# JWT Token Management
class JWTManager:
    """JWT token creation and verification."""
    
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=15)
        self.refresh_token_expire = timedelta(days=7)
    
    def create_access_token(self, data: Dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create access token."""
        to_encode = data.copy()
        expire = datetime.utcnow() + (expires_delta or self.access_token_expire)
        to_encode.update({"exp": expire, "type": "access"})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
    
    def create_refresh_token(self, data: Dict) -> str:
        """Create refresh token."""
        to_encode = data.copy()
        expire = datetime.utcnow() + self.refresh_token_expire
        to_encode.update({"exp": expire, "type": "refresh"})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
    
    def verify_token(self, token: str) -> Optional[Dict]:
        """Verify and decode token."""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload
        except jwt.ExpiredSignatureError:
            raise ValueError("Token expired")
        except jwt.InvalidTokenError:
            raise ValueError("Invalid token")

# API Key Authentication
class APIKeyManager:
    """API key generation and validation."""
    
    def __init__(self):
        self.keys = {}
    
    def generate_key(self, user_id: str, scopes: list = None) -> str:
        """Generate new API key."""
        api_key = f"sk_{secrets.token_hex(32)}"
        
        self.keys[api_key] = {
            "user_id": user_id,
            "scopes": scopes or ["read"],
            "created_at": datetime.now(),
            "last_used": None,
            "is_active": True
        }
        
        return api_key
    
    def validate_key(self, api_key: str) -> Optional[Dict]:
        """Validate API key."""
        if api_key not in self.keys:
            return None
        
        key_info = self.keys[api_key]
        
        if not key_info["is_active"]:
            return None
        
        key_info["last_used"] = datetime.now()
        return key_info
    
    def revoke_key(self, api_key: str) -> bool:
        """Revoke API key."""
        if api_key in self.keys:
            self.keys[api_key]["is_active"] = False
            return True
        return False

# Role-Based Access Control
class RBACManager:
    """Role-based access control."""
    
    def __init__(self):
        self.roles = {
            "admin": ["read", "write", "delete", "manage_users"],
            "editor": ["read", "write"],
            "viewer": ["read"]
        }
        self.user_roles = {}
    
    def assign_role(self, user_id: str, role: str):
        """Assign role to user."""
        if role not in self.roles:
            raise ValueError(f"Invalid role: {role}")
        self.user_roles[user_id] = role
    
    def check_permission(self, user_id: str, permission: str) -> bool:
        """Check if user has permission."""
        if user_id not in self.user_roles:
            return False
        
        role = self.user_roles[user_id]
        return permission in self.roles.get(role, [])

def require_permission(permission: str):
    """Decorator to require specific permission."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Get current user from context
            current_user = kwargs.get('current_user') or args[0] if args else None
            
            if not current_user:
                raise ValueError("No authenticated user")
            
            rbac = RBACManager()
            if not rbac.check_permission(current_user['id'], permission):
                raise PermissionError(f"Missing permission: {permission}")
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

ℹ️

Always use bcrypt or Argon2 for password hashing. Never store passwords in plain text or use weak hashing algorithms like MD5.

Input Validation and Sanitization

import re
from typing import Any, Optional
from pydantic import BaseModel, Field, validator, EmailStr
from html import escape
import bleach

class SecureInputValidator:
    """Input validation and sanitization."""
    
    @staticmethod
    def validate_email(email: str) -> bool:
        """Validate email format."""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
    
    @staticmethod
    def sanitize_string(input_str: str, max_length: int = 1000) -> str:
        """Sanitize string input."""
        # Remove control characters
        sanitized = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', input_str)
        
        # Trim and limit length
        sanitized = sanitized.strip()[:max_length]
        
        return sanitized
    
    @staticmethod
    def sanitize_html(html_content: str) -> str:
        """Sanitize HTML content."""
        # Allowed tags and attributes
        allowed_tags = ['p', 'b', 'i', 'u', 'em', 'strong', 'a', 'ul', 'ol', 'li']
        allowed_attrs = {'a': ['href', 'title']}
        
        return bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attrs)
    
    @staticmethod
    def validate_sql_input(input_str: str) -> bool:
        """Basic SQL injection detection."""
        sql_patterns = [
            r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|ALTER)\b)",
            r"(--|;|'|\"|\bOR\b\s+\b1\b\s*=\s*\b1\b)",
            r"(\bSLEEP\b\s*\()",
            r"(\bBENCHMARK\b\s*\()",
        ]
        
        for pattern in sql_patterns:
            if re.search(pattern, input_str, re.IGNORECASE):
                return False
        
        return True
    
    @staticmethod
    def validate_file_upload(filename: str, allowed_extensions: list) -> bool:
        """Validate file upload."""
        if not filename:
            return False
        
        # Check extension
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        return ext in allowed_extensions

# Pydantic Models with Validation
class SecureUserInput(BaseModel):
    """User input with security validation."""
    
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    bio: Optional[str] = Field(None, max_length=500)
    
    @validator('username')
    def validate_username(cls, v):
        """Validate username format."""
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Username can only contain letters, numbers, and underscores')
        return v
    
    @validator('bio')
    def validate_bio(cls, v):
        """Sanitize bio content."""
        if v:
            # Remove HTML tags
            v = re.sub(r'<[^>]+>', '', v)
            # Limit length
            v = v[:500]
        return v

class SecureFileUpload(BaseModel):
    """File upload with security validation."""
    
    filename: str
    content_type: str
    size: int
    
    @validator('filename')
    def validate_filename(cls, v):
        """Validate filename."""
        # Check for path traversal
        if '..' in v or '/' in v or '\\' in v:
            raise ValueError('Invalid filename')
        
        # Check extension
        allowed = ['jpg', 'jpeg', 'png', 'gif', 'pdf', 'txt']
        ext = v.rsplit('.', 1)[-1].lower() if '.' in v else ''
        if ext not in allowed:
            raise ValueError(f'File type not allowed: {ext}')
        
        return v
    
    @validator('size')
    def validate_size(cls, v):
        """Validate file size."""
        max_size = 10 * 1024 * 1024  # 10MB
        if v > max_size:
            raise ValueError(f'File too large: {v} bytes (max: {max_size})')
        return v

Encryption and Cryptography

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization
import base64
import os

class EncryptionManager:
    """Data encryption and decryption."""
    
    def __init__(self, key: bytes = None):
        if key is None:
            key = Fernet.generate_key()
        self.cipher = Fernet(key)
    
    def encrypt(self, data: str) -> str:
        """Encrypt string data."""
        return self.cipher.encrypt(data.encode()).decode()
    
    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt string data."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
    
    @staticmethod
    def derive_key_from_password(password: str, salt: bytes = None) -> tuple:
        """Derive encryption key from password."""
        if salt is None:
            salt = os.urandom(16)
        
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key, salt

class RSAEncryption:
    """RSA encryption for asymmetric operations."""
    
    def __init__(self):
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        self.public_key = self.private_key.public_key()
    
    def encrypt(self, data: bytes) -> bytes:
        """Encrypt data with public key."""
        return self.public_key.encrypt(
            data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    
    def decrypt(self, encrypted_data: bytes) -> bytes:
        """Decrypt data with private key."""
        return self.private_key.decrypt(
            encrypted_data,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    
    def export_public_key(self) -> str:
        """Export public key."""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

# Digital Signatures
class DigitalSignature:
    """Digital signature creation and verification."""
    
    def __init__(self):
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        self.public_key = self.private_key.public_key()
    
    def sign(self, data: bytes) -> bytes:
        """Create digital signature."""
        return self.private_key.sign(
            data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    
    def verify(self, data: bytes, signature: bytes) -> bool:
        """Verify digital signature."""
        try:
            self.public_key.verify(
                signature,
                data,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except Exception:
            return False

⚠️

Never implement your own cryptography. Use well-established libraries like cryptography or pyca.

Secure Configuration Management

import os
from typing import Optional
from dataclasses import dataclass
from pathlib import Path
import json

@dataclass
class SecurityConfig:
    """Security configuration."""
    
    # JWT Settings
    jwt_secret_key: str
    jwt_algorithm: str = "HS256"
    jwt_access_token_expire_minutes: int = 15
    jwt_refresh_token_expire_days: int = 7
    
    # Password Settings
    password_min_length: int = 8
    password_require_uppercase: bool = True
    password_require_numbers: bool = True
    password_require_special: bool = True
    
    # Rate Limiting
    rate_limit_requests: int = 100
    rate_limit_window_seconds: int = 60
    
    # CORS Settings
    cors_origins: list = None
    cors_methods: list = None
    
    # File Upload Settings
    max_upload_size_mb: int = 10
    allowed_file_types: list = None
    
    @classmethod
    def from_env(cls) -> 'SecurityConfig':
        """Load configuration from environment variables."""
        return cls(
            jwt_secret_key=os.getenv('JWT_SECRET_KEY', secrets.token_hex(32)),
            cors_origins=os.getenv('CORS_ORIGINS', '*').split(','),
            allowed_file_types=os.getenv('ALLOWED_FILE_TYPES', 'jpg,png,pdf').split(',')
        )

class SecretsManager:
    """Manage application secrets."""
    
    def __init__(self):
        self.secrets = {}
    
    def set_secret(self, name: str, value: str):
        """Set a secret."""
        # In production, use a secure vault like HashiCorp Vault
        self.secrets[name] = value
    
    def get_secret(self, name: str) -> Optional[str]:
        """Get a secret."""
        return self.secrets.get(name)
    
    def load_from_file(self, filepath: str):
        """Load secrets from file."""
        with open(filepath, 'r') as f:
            self.secrets = json.load(f)
    
    def save_to_file(self, filepath: str):
        """Save secrets to file."""
        with open(filepath, 'w') as f:
            json.dump(self.secrets, f, indent=2)

# Security Headers Middleware
class SecurityHeadersMiddleware:
    """Add security headers to responses."""
    
    def __init__(self, app):
        self.app = app
    
    def __call__(self, environ, start_response):
        def custom_start_response(status, headers, exc_info=None):
            # Add security headers
            headers.extend([
                ('X-Content-Type-Options', 'nosniff'),
                ('X-Frame-Options', 'DENY'),
                ('X-XSS-Protection', '1; mode=block'),
                ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
                ('Content-Security-Policy', "default-src 'self'"),
                ('Referrer-Policy', 'strict-origin-when-cross-origin'),
            ])
            return start_response(status, headers, exc_info)
        
        return self.app(environ, custom_start_response)

Security Testing

import pytest
from unittest.mock import Mock

class SecurityTestSuite:
    """Security testing utilities."""
    
    @staticmethod
    def test_sql_injection():
        """Test SQL injection prevention."""
        validator = SecureInputValidator()
        
        # Malicious inputs
        malicious_inputs = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "admin'--",
            "1; SELECT * FROM users",
        ]
        
        for malicious_input in malicious_inputs:
            assert not validator.validate_sql_input(malicious_input)
    
    @staticmethod
    def test_xss_prevention():
        """Test XSS prevention."""
        validator = SecureInputValidator()
        
        xss_payloads = [
            "<script>alert('xss')</script>",
            "<img src=x onerror=alert('xss')>",
            "javascript:alert('xss')",
        ]
        
        for payload in xss_payloads:
            sanitized = validator.sanitize_html(payload)
            assert "<script>" not in sanitized
            assert "javascript:" not in sanitized
    
    @staticmethod
    def test_path_traversal():
        """Test path traversal prevention."""
        validator = SecureInputValidator()
        
        malicious_filenames = [
            "../../../etc/passwd",
            "..\\..\\windows\\system32",
            "test/../../../secret",
        ]
        
        for filename in malicious_filenames:
            assert not validator.validate_file_upload(filename, ['txt', 'pdf'])

# Security headers check
def test_security_headers(client):
    """Test that security headers are present."""
    response = client.get('/')
    
    assert response.headers.get('X-Content-Type-Options') == 'nosniff'
    assert response.headers.get('X-Frame-Options') == 'DENY'
    assert 'Strict-Transport-Security' in response.headers

ℹ️

Security is a continuous process. Regularly update dependencies, scan for vulnerabilities, and conduct security audits.

Follow-Up Questions

  1. Explain the difference between authentication and authorization.

  2. How do you protect against common web vulnerabilities (OWASP Top 10)?

  3. What are the best practices for storing secrets in production?

  4. How do you implement rate limiting to prevent abuse?

  5. Explain the principle of least privilege.

Advertisement