πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Data Security & Encryption: Compliance and Protection

Data EngineeringSecurity & Compliance⭐ Premium

Advertisement

Data Security & Encryption: Compliance and Protection

Difficulty: Senior Level | Companies: Stripe, PayPal, Amazon, Google, Microsoft

1. Data Security Layers

Architecture Diagram
Security Stack:
β”œβ”€β”€ Network Security
β”‚   β”œβ”€β”€ VPC, Security Groups, NACLs
β”‚   β”œβ”€β”€ Private Endpoints
β”‚   └── TLS/SSL
β”œβ”€β”€ Access Control
β”‚   β”œβ”€β”€ IAM Roles & Policies
β”‚   β”œβ”€β”€ Column-Level Security
β”‚   └── Row-Level Security
β”œβ”€β”€ Encryption
β”‚   β”œβ”€β”€ At Rest (AES-256)
β”‚   β”œβ”€β”€ In Transit (TLS 1.3)
β”‚   └── Column-Level Encryption
β”œβ”€β”€ Data Masking
β”‚   β”œβ”€β”€ Dynamic Data Masking
β”‚   β”œβ”€β”€ Static Data Masking
β”‚   └── Tokenization
└── Audit & Compliance
    β”œβ”€β”€ Access Logs
    β”œβ”€β”€ Data Lineage
    └── GDPR/CCPA Tools

2. Encryption Strategies

At-Rest Encryption

import hashlib
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64

class EncryptionManager:
    def __init__(self, master_key: bytes):
        self.master_key = master_key
    
    def generate_data_key(self) -> bytes:
        """Generate a unique data encryption key"""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.master_key[:16],
            iterations=480000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
        return key
    
    def encrypt_column(self, plaintext: str, data_key: bytes) -> dict:
        """Encrypt a single column value"""
        f = Fernet(data_key)
        encrypted = f.encrypt(plaintext.encode())
        
        return {
            "ciphertext": encrypted.decode(),
            "key_id": hashlib.sha256(data_key).hexdigest()[:16],
            "algorithm": "AES-256-GCM",
        }
    
    def decrypt_column(self, encrypted_data: dict, data_key: bytes) -> str:
        """Decrypt a single column value"""
        f = Fernet(data_key)
        return f.decrypt(encrypted_data["ciphertext"].encode()).decode()

Column-Level Encryption in Spark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("ColumnEncryption").getOrCreate()

# Register UDF for encryption
@udf(returnType=StringType())
def encrypt_pii(value, key):
    if value is None:
        return None
    encryptor = EncryptionManager(key.encode())
    data_key = encryptor.generate_data_key()
    encrypted = encryptor.encrypt_column(value, data_key)
    return json.dumps(encrypted)

@udf(returnType=StringType())
def decrypt_pii(encrypted_json, key):
    if encrypted_json is None:
        return None
    encryptor = EncryptionManager(key.encode())
    data = json.loads(encrypted_json)
    return encryptor.decrypt_column(data, key.encode())

# Apply column-level encryption
encrypted_df = df.withColumn(
    "email_encrypted",
    encrypt_pii(F.col("email"), F.lit(MASTER_KEY))
)

# Write encrypted data
encrypted_df.write.format("parquet").save("s3://secure-data/users/")

3. Access Control Patterns

Row-Level Security

class RowLevelSecurity:
    def __init__(self):
        self.policies = {}
    
    def add_policy(self, table: str, role: str, filter_condition: str):
        """Add row-level security policy"""
        if table not in self.policies:
            self.policies[table] = []
        self.policies[table].append({
            "role": role,
            "filter": filter_condition
        })
    
    def apply_policy(self, table_name: str, user_role: str, query: str) -> str:
        """Inject filter into query based on user role"""
        policies = self.policies.get(table_name, [])
        
        filters = []
        for policy in policies:
            if policy["role"] == user_role or policy["role"] == "*":
                filters.append(policy["filter"])
        
        if filters:
            where_clause = " AND ".join(filters)
            if "WHERE" in query.upper():
                query = f"{query} AND ({where_clause})"
            else:
                query = f"{query} WHERE {where_clause}"
        
        return query

# Usage
rls = RowLevelSecurity()
rls.add_policy("sales", "region_manager", "region = CURRENT_REGION()")
rls.add_policy("sales", "analyst", "1=1")  # analysts see all
rls.add_policy("hr_data", "employee", "employee_id = CURRENT_USER_ID()")

secured_query = rls.apply_policy("sales", "region_manager", 
    "SELECT * FROM sales WHERE date >= '2024-01-01'")
# Result: SELECT * FROM sales WHERE date >= '2024-01-01' AND (region = CURRENT_REGION())

Column-Level Security

class ColumnLevelSecurity:
    def __init__(self):
        self.classifications = {}  # column -> classification
        self.role_permissions = {}  # role -> allowed classifications
    
    def classify_column(self, table: str, column: str, classification: str):
        key = f"{table}.{column}"
        self.classifications[key] = classification
    
    def grant_access(self, role: str, classifications: list):
        self.role_permissions[role] = set(classifications)
    
    def filter_columns(self, table: str, role: str, columns: list) -> list:
        allowed_classifications = self.role_permissions.get(role, set())
        filtered = []
        
        for col in columns:
            key = f"{table}.{col}"
            classification = self.classifications.get(key, "public")
            if classification in allowed_classifications or role == "admin":
                filtered.append(col)
            else:
                filtered.append(f"NULL AS {col}")  # Replace with NULL
        
        return filtered

# Usage
cls = ColumnLevelSecurity()
cls.classify_column("users", "email", "pii")
cls.classify_column("users", "ssn", "highly_sensitive")
cls.classify_column("users", "name", "internal")
cls.grant_access("analyst", ["internal", "public"])
cls.grant_access("hr_team", ["pii", "highly_sensitive", "internal", "public"])

filtered = cls.filter_columns("users", "analyst", ["name", "email", "ssn"])
# Result: ["name", "NULL AS email", "NULL AS ssn"]

4. GDPR & CCPA Compliance

Right to Erasure (Right to be Forgotten)

class GDPRHandler:
    def __init__(self, spark, data_catalog):
        self.spark = spark
        self.catalog = data_catalog
    
    def handle_deletion_request(self, user_id: str) -> dict:
        """Process a GDPR deletion request"""
        affected_datasets = self.catalog.find_datasets_with_entity(user_id)
        results = {"deleted": [], "anonymized": [], "errors": []}
        
        for dataset in affected_datasets:
            try:
                if dataset.deletion_possible:
                    # Hard delete from table
                    self.spark.sql(f"""
                        DELETE FROM {dataset.fully_qualified}
                        WHERE user_id = '{user_id}'
                    """)
                    results["deleted"].append(dataset.name)
                else:
                    # Anonymize instead of delete (for referential integrity)
                    self.spark.sql(f"""
                        UPDATE {dataset.fully_qualified}
                        SET user_id = 'ANONYMOUS',
                            email = NULL,
                            name = 'DELETED_USER'
                        WHERE user_id = '{user_id}'
                    """)
                    results["anonymized"].append(dataset.name)
            except Exception as e:
                results["errors"].append({"dataset": dataset.name, "error": str(e)})
        
        return results
    
    def export_user_data(self, user_id: str) -> dict:
        """Export all data for a user (Right to Portability)"""
        all_data = {}
        
        for dataset in self.catalog.find_datasets_with_entity(user_id):
            data = self.spark.sql(f"""
                SELECT * FROM {dataset.fully_qualified}
                WHERE user_id = '{user_id}'
            """).toPandas().to_dict(orient="records")
            
            all_data[dataset.name] = data
        
        return all_data

5. Key Management

class KeyManager:
    """AWS KMS key management"""
    
    def __init__(self, kms_client):
        self.kms = kms_client
    
    def create_data_key(self, key_id: str) -> dict:
        """Generate data encryption key"""
        response = self.kms.generate_data_key(
            KeyId=key_id,
            KeySpec='AES_256'
        )
        return {
            "plaintext_key": response['Plaintext'],
            "encrypted_key": response['CiphertextBlob'],
        }
    
    def encrypt_key(self, key_id: str, data_key: bytes) -> bytes:
        """Encrypt a data key"""
        response = self.kms.encrypt(
            KeyId=key_id,
            Plaintext=data_key
        )
        return response['CiphertextBlob']
    
    def decrypt_key(self, encrypted_key: bytes) -> bytes:
        """Decrypt a data key"""
        response = self.kms.decrypt(
            CiphertextBlob=encrypted_key
        )
        return response['Plaintext']
    
    def rotate_key(self, key_id: str):
        """Rotate encryption key"""
        self.kms.enable_key_rotation(KeyId=key_id)

ℹ️

Best Practice: Use envelope encryption β€” encrypt data with a data key, then encrypt the data key with a master key. This limits the exposure if a single key is compromised.

Follow-Up Questions

  1. How would you implement end-to-end encryption for a data pipeline?
  2. Design a GDPR-compliant data deletion system for a data lake.
  3. How do you handle encryption key rotation without downtime?
  4. Design a data masking solution for development environments.
  5. How would you audit data access for compliance purposes?

Advertisement