
Airflow Secrets Management

Enterprise-Grade Credential Management

Amazon | Microsoft | Difficulty: Advanced

Interview Question

ℹ️Interview Context

Company: Amazon / Microsoft
Role: Senior Data Engineer / Security Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "How do you manage secrets and credentials in a production Airflow deployment? Compare different secrets backends and explain when to use each. How do you handle secret rotation?"


Detailed Theory

Secrets Management Fundamentals

# secrets_fundamentals.py
"""
Airflow Secrets Backends:

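1. Metadata DB (default):
   - Stores connections in the Airflow metadata database
   - Simple setup
   - Good for small deployments
   - Limitation: passwords are encrypted only if a Fernet key is
     configured, and there is no rotation or audit trail

2. Environment Variables:
   - Use the AIRFLOW_CONN_{CONN_ID} format (conn_id upper-cased)
   - Good for Docker/K8s
   - No DB setup required
   - Limitation: no versioning, rotation, or audit; changes need a restart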

3. HashiCorp Vault:
   - Enterprise secret management
   - Dynamic secrets
   - Audit logging
   - Good for: Large enterprises

4. AWS Secrets Manager:
   - Managed AWS service
   - Automatic rotation
   - IAM integration
   - Good for: AWS deployments

5. GCP Secret Manager:
   - Managed GCP service
   - Version management
   - IAM integration
   - Good for: GCP deployments

6. Azure Key Vault:
   - Managed Azure service
   - HSM backing
   - RBAC integration
   - Good for: Azure deployments
"""

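Whichever backend you configure, Airflow does not use it in isolation: it checks the configured backend first, then environment variables, then the metadata DB, and returns the first match. The sketch below models that search order with plain dictionaries. `DictBackend`, `get_conn_value`, and `resolve_connection` are illustrative stand-ins, not Airflow's classes.

```python
from typing import Dict, List, Optional


class DictBackend:
    """Illustrative stand-in for a secrets backend: a name plus a dict of URIs."""

    def __init__(self, name: str, connections: Dict[str, str]):
        self.name = name
        self.connections = connections

    def get_conn_value(self, conn_id: str) -> Optional[str]:
        return self.connections.get(conn_id)


def resolve_connection(conn_id: str, search_path: List[DictBackend]) -> str:
    """Walk the backends in priority order and return the first hit."""
    for backend in search_path:
        value = backend.get_conn_value(conn_id)
        if value is not None:
            return value
    raise KeyError(f"Connection {conn_id!r} not found in any backend")


# Priority order: configured backend, then env vars, then metadata DB
search_path = [
    DictBackend("vault", {"warehouse": "postgresql://svc:from-vault@dw:5432/dw"}),
    DictBackend("env", {"warehouse": "postgresql://svc:from-env@dw:5432/dw"}),
    DictBackend("metastore", {"legacy_db": "mysql://app:pw@legacy:3306/app"}),
]

print(resolve_connection("warehouse", search_path))  # the Vault value wins
print(resolve_connection("legacy_db", search_path))  # falls through to the DB
```

One consequence of this order: once the same conn_id exists in the configured backend, an old copy in the metadata DB is silently shadowed. That is convenient during a migration but confusing when debugging.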
1. Metadata DB Backend

# metadata_db_backend.py
"""
Metadata DB is the default secrets backend.
Credentials are stored in the connection table.
"""

# Configuration
METADATA_DB_CONFIG = """
[secrets]
# The metadata DB needs no configuration. With `backend` left empty,
# Airflow looks up environment variables first, then the metadata DB
# (airflow.secrets.metastore.MetastoreBackend).
backend =
"""

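# Programmatic connection management
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.utils.session import create_session


def create_connection():
    """Create a connection in the metadata DB if it does not exist yet"""
    with create_session() as session:  # commits on successful exit
        exists = (
            session.query(Connection)
            .filter(Connection.conn_id == 'my_database')
            .first()
        )
        if exists:
            return  # conn_id is unique; inserting it twice would fail

        session.add(
            Connection(
                conn_id='my_database',
                conn_type='postgres',
                host='localhost',
                login='user',
                password='password',  # placeholder for local development only
                schema='my_database',
                port=5432,
            )
        )


def get_connection():
    """Resolve a connection through the configured search path.

    BaseHook.get_connection checks the secrets backend (if any), then
    environment variables, then the metadata DB.
    """
    return BaseHook.get_connection(conn_id='my_database')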

⚠️Security

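Without a Fernet key, the metadata DB stores connection passwords and extras in plain text. With one, they are encrypted, but anyone holding both the database and the key can still read them, and there is no built-in rotation or audit trail. Use this backend only for development/testing. For production, use a dedicated secrets backend.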

2. Environment Variables Backend

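# environment_variables_backend.py
"""
Environment Variables Backend:
- Uses the AIRFLOW_CONN_{CONN_ID} format (AIRFLOW_VAR_{KEY} for Variables)
- Good for Docker/Kubernetes deployments
- No external service required
"""

# Configuration
ENV_CONFIG = """
[secrets]
# Nothing to configure: environment variables are always checked
# (airflow.secrets.environment_variables.EnvironmentVariablesBackend),
# after a custom backend if one is set and before the metadata DB.
backend =
"""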

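# Environment variable format: values are connection URIs, so
# percent-encode special characters in logins and passwords
ENV_EXAMPLES = {
    # PostgreSQL connection
    'AIRFLOW_CONN_MY_DATABASE': 'postgresql://user:password@localhost:5432/mydb',

    # HTTP connection: the URI path sets the scheme HttpHook uses (https)
    'AIRFLOW_CONN_MY_API': 'http://api.example.com:443/https',

    # AWS connection: prefer 'aws://' with no keys so boto3 falls back to
    # the instance/pod role; static keys must be percent-encoded
    'AIRFLOW_CONN_AWS_DEFAULT': 'aws://key:secret@',

    # Google Cloud connection (Application Default Credentials)
    'AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT': 'google-cloud-platform://',
}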

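# Docker example
DOCKER_EXAMPLE = """
# Dockerfile: keep credentials out of ENV, because values baked into an
# image are visible through `docker history` and `docker inspect`.
FROM apache/airflow:latest

# Non-secret connections are fine to bake in
ENV AIRFLOW_CONN_MY_API='http://api.example.com:443/https'

# Inject credentials when the container starts instead, e.g.:
#   docker run --env-file ./airflow-secrets.env <image>
"""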

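# Kubernetes example
K8S_EXAMPLE = """
# Secret holding the connection URI. Kubernetes stores Secrets
# base64-encoded, not encrypted, unless etcd encryption at rest is enabled.
apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
type: Opaque
stringData:
  AIRFLOW_CONN_MY_DATABASE: "postgresql://user:password@postgres:5432/airflow"
---
# Excerpt of the scheduler/worker pod spec: expose every key in the
# Secret as an environment variable
containers:
  - name: airflow-worker
    image: apache/airflow:latest
    envFrom:
      - secretRef:
          name: airflow-secrets
"""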

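Because these values are URIs, any reserved character in a login or password (`@`, `/`, `:`, `#`, ...) must be percent-encoded, or the URI is split in the wrong place. A small stdlib helper makes the encoding explicit. `build_conn_uri` and `conn_env_var` are hypothetical names; when Airflow is installed, `Connection(...).get_uri()` can produce the URI for you.

```python
from typing import Optional
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(
    conn_type: str,
    host: str,
    login: Optional[str] = None,
    password: Optional[str] = None,
    port: Optional[int] = None,
    schema: Optional[str] = None,
) -> str:
    """Build a connection URI, percent-encoding every user-supplied part."""
    auth = ""
    if login is not None:
        auth = quote(login, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    netloc = auth + host + (f":{port}" if port else "")
    path = "/" + quote(schema, safe="") if schema else ""
    return f"{conn_type}://{netloc}{path}"


def conn_env_var(conn_id: str) -> str:
    """Environment variable name Airflow looks up for a conn_id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


uri = build_conn_uri("postgresql", "db.internal", "etl_user", "p@ss/w:rd#1", 5432, "analytics")
print(f"{conn_env_var('my_database')}={uri}")

# Round trip: decoding the password gives back the original value
print(unquote(urlsplit(uri).password))  # p@ss/w:rd#1
```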
3. HashiCorp Vault Backend

# hashicorp_vault_backend.py
"""
HashiCorp Vault Backend:
- Enterprise secret management
- Dynamic secrets with TTL
- Audit logging
- Fine-grained access control
"""

# Configuration: backend_kwargs is a single line of JSON
VAULT_CONFIG = """
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "airflow/connections", "variables_path": "airflow/variables", "config_path": "airflow/config", "mount_point": "secret", "kv_engine_version": 2, "url": "https://vault.example.com:8200", "auth_type": "approle", "role_id": "<role-id>"}

# airflow.cfg has no [vault] section and does not render templates such as
# {{ env('VAULT_TOKEN') }}. Keep credentials like secret_id out of the file,
# e.g. by setting the whole JSON through AIRFLOW__SECRETS__BACKEND_KWARGS
# at deploy time.
"""

# Custom Vault hook (the hashicorp provider also ships
# airflow.providers.hashicorp.hooks.vault.VaultHook)
from typing import Any, Dict, Optional

import hvac
from airflow.hooks.base import BaseHook

class CustomVaultHook(BaseHook):
    """Custom hook for HashiCorp Vault"""
    
    def __init__(
        self,
        vault_url: str,
        vault_token: Optional[str] = None,
        role_id: Optional[str] = None,
        secret_id: Optional[str] = None,
    ):
        super().__init__()
        self.vault_url = vault_url
        self.vault_token = vault_token
        self.role_id = role_id
        self.secret_id = secret_id
        self._client = None
    
    @property
    def client(self) -> hvac.Client:
        """Return an authenticated Vault client (created lazily)"""
        if self._client is None:
            client = hvac.Client(url=self.vault_url, token=self.vault_token)
            
            if not self.vault_token and self.role_id and self.secret_id:
                # AppRole login; hvac stores the returned token on the client
                client.auth.approle.login(
                    role_id=self.role_id,
                    secret_id=self.secret_id
                )
            
            if not client.is_authenticated():
                raise ValueError("Vault authentication failed: provide a token or AppRole credentials")
            self._client = client
        
        return self._client
    
    def get_secret(
        self,
        path: str,
        mount_point: str = 'secret',
        version: int = None
    ) -> Dict[str, Any]:
        """Get secret from Vault"""
        try:
            secret = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=mount_point,
                version=version
            )
            return secret['data']['data']
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise
    
    def create_secret(
        self,
        path: str,
        data: Dict[str, Any],
        mount_point: str = 'secret'
    ) -> None:
        """Create secret in Vault"""
        try:
            self.client.secrets.kv.v2.create_or_update_secret(
                path=path,
                mount_point=mount_point,
                secret=data
            )
        except Exception as e:
            self.log.error(f"Failed to create secret: {e}")
            raise
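The double `['data']['data']` in `get_secret` comes from the KV version 2 response shape: the outer `data` wraps both the stored key/value pairs and version metadata. The sketch below shows that shape with a hand-written, abbreviated sample response (the values are made up) and the matching HTTP API path, assuming a KV v2 engine mounted at `secret`. `kv2_api_path` and `extract_kv2_payload` are hypothetical helpers.

```python
from typing import Any, Dict


def kv2_api_path(mount_point: str, path: str) -> str:
    """HTTP path for reading a KV v2 secret: /v1/<mount>/data/<path>."""
    return f"/v1/{mount_point.strip('/')}/data/{path.strip('/')}"


def extract_kv2_payload(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return the stored key/value pairs, ignoring version metadata."""
    return response["data"]["data"]


# Abbreviated, hand-written example of a KV v2 read response
sample_response = {
    "lease_duration": 0,
    "data": {
        "data": {"conn_uri": "postgresql://svc:s3cret@db:5432/dw"},
        "metadata": {"version": 3, "destroyed": False, "deletion_time": ""},
    },
}

print(kv2_api_path("secret", "airflow/connections/my_db"))
# /v1/secret/data/airflow/connections/my_db
print(extract_kv2_payload(sample_response)["conn_uri"])
print(sample_response["data"]["metadata"]["version"])  # 3
```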

4. AWS Secrets Manager Backend

# aws_secrets_manager_backend.py
"""
AWS Secrets Manager Backend:
- Managed AWS service
- Automatic rotation
- IAM integration
- Cross-account access
"""

# Configuration: backend_kwargs is a single line of JSON.
# conn_id "my_database" is read from the secret "airflow/connections/my_database".
AWS_CONFIG = """
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "config_prefix": "airflow/config", "region_name": "us-east-1", "profile_name": "default"}
"""

# Custom AWS Secrets Manager hook
from airflow.hooks.base import BaseHook
import boto3
from typing import Any, Dict

class CustomSecretsManagerHook(BaseHook):
    """Custom hook for AWS Secrets Manager"""
    
    def __init__(
        self,
        region_name: str = 'us-east-1',
        profile: str = None,
    ):
        super().__init__()
        self.region_name = region_name
        self.profile = profile
        self._client = None
    
    @property
    def client(self):
        """Get Secrets Manager client"""
        if self._client is None:
            session = boto3.Session(
                region_name=self.region_name,
                profile_name=self.profile
            )
            self._client = session.client('secretsmanager')
        
        return self._client
    
    def get_secret(
        self,
        secret_name: str,
        version_id: str = None,
        version_stage: str = None
    ) -> Dict[str, Any]:
        """Get secret from Secrets Manager"""
        try:
            kwargs = {'SecretId': secret_name}
            if version_id:
                kwargs['VersionId'] = version_id
            if version_stage:
                kwargs['VersionStage'] = version_stage
            
            response = self.client.get_secret_value(**kwargs)
            
            # Parse secret
            import json
            secret_string = response['SecretString']
            return json.loads(secret_string)
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise
    
    def create_secret(
        self,
        name: str,
        secret: Dict[str, Any],
        description: str = '',
        kms_key_id: str = None,
    ) -> str:
        """Create secret in Secrets Manager"""
        try:
            import json
            
            kwargs = {
                'Name': name,
                'SecretString': json.dumps(secret),
                'Description': description,
            }
            if kms_key_id:
                kwargs['KmsKeyId'] = kms_key_id
            
            response = self.client.create_secret(**kwargs)
            return response['ARN']
        except Exception as e:
            self.log.error(f"Failed to create secret: {e}")
            raise
    
    def rotate_secret(
        self,
        secret_name: str,
        rotation_lambda_arn: str,
        rotation_rules: Dict[str, Any] = None
    ) -> None:
        """Configure rotation; Secrets Manager also rotates immediately
        unless RotateImmediately=False is passed."""
        try:
            kwargs = {
                'SecretId': secret_name,
                'RotationLambdaARN': rotation_lambda_arn,
            }
            if rotation_rules:
                kwargs['RotationRules'] = rotation_rules
            
            self.client.rotate_secret(**kwargs)
        except Exception as e:
            self.log.error(f"Failed to enable rotation: {e}")
            raise
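`get_secret` above assumes every secret is a JSON object, but a connection secret can also be stored as a single connection URI. A hypothetical normalizer can accept either shape and return one dict of connection fields. The field names (`conn_type`, `login`, ...) follow Airflow's `Connection` attributes; the function itself is an illustration, not part of the Amazon provider.

```python
import json
from typing import Any, Dict
from urllib.parse import unquote, urlsplit


def normalize_connection_secret(secret_string: str) -> Dict[str, Any]:
    """Turn either a JSON object or a connection URI into connection fields."""
    stripped = secret_string.strip()
    if stripped.startswith("{"):
        return json.loads(stripped)

    parts = urlsplit(stripped)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
        "schema": unquote(parts.path.lstrip("/")) or None,
    }


from_json = normalize_connection_secret(
    '{"conn_type": "postgres", "host": "db", "login": "svc", "password": "pw", "port": 5432}'
)
from_uri = normalize_connection_secret("postgres://svc:p%40ss@db:5432/analytics")
print(from_json["host"], from_uri["password"])  # db p@ss
```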

ℹ️Pro Tip

Use AWS Secrets Manager with automatic rotation for database credentials. This ensures credentials are rotated regularly without manual intervention.
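The Vault and AWS backends above both take their options as JSON in `backend_kwargs`, and a malformed value is easy to miss in review. The sketch below approximates the parse with the standard library's `configparser` (Airflow's own parser builds on it, but this is not Airflow's code) so a `[secrets]` block can be checked in CI. `load_backend_kwargs` is a hypothetical helper.

```python
import configparser
import json
from typing import Any, Dict


def load_backend_kwargs(cfg_text: str) -> Dict[str, Any]:
    """Parse a [secrets] section and decode backend_kwargs as JSON."""
    parser = configparser.ConfigParser(interpolation=None)
    try:
        parser.read_string(cfg_text)
        raw = parser.get("secrets", "backend_kwargs", fallback="{}")
        kwargs = json.loads(raw)
    except (configparser.Error, json.JSONDecodeError) as exc:
        raise ValueError(f"Invalid [secrets] configuration: {exc}") from exc
    if not isinstance(kwargs, dict):
        raise ValueError("backend_kwargs must be a JSON object")
    return kwargs


GOOD = """
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "mount_point": "airflow"}
"""

# A closing brace at column 0 is not a continuation line, so configparser
# rejects it before JSON decoding even starts.
BAD = """
[secrets]
backend_kwargs = {
    "connections_path": "connections"
}
"""

print(load_backend_kwargs(GOOD)["mount_point"])  # airflow
try:
    load_backend_kwargs(BAD)
except ValueError as exc:
    print("rejected:", exc)
```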

5. GCP Secret Manager Backend

# gcp_secret_manager_backend.py
"""
GCP Secret Manager Backend:
- Managed GCP service
- Version management
- IAM integration
- Automatic replication
"""

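# Configuration: backend_kwargs is a single line of JSON.
# With the default "-" separator, conn_id "my_database" is read from the
# secret "airflow-connections-my_database".
GCP_CONFIG = """
[secrets]
backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables", "config_prefix": "airflow-config", "project_id": "my-project"}
"""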

# Custom GCP Secret Manager hook
from airflow.hooks.base import BaseHook
from google.cloud import secretmanager
from typing import Any, Dict

class CustomSecretManagerHook(BaseHook):
    """Custom hook for GCP Secret Manager"""
    
    def __init__(
        self,
        project_id: str,
        credentials_path: str = None,
    ):
        super().__init__()
        self.project_id = project_id
        self.credentials_path = credentials_path
        self._client = None
    
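    @property
    def client(self):
        """Get Secret Manager client"""
        if self._client is None:
            if self.credentials_path:
                # Scope the key file to this client instead of mutating
                # GOOGLE_APPLICATION_CREDENTIALS for the whole process
                self._client = secretmanager.SecretManagerServiceClient.from_service_account_file(
                    self.credentials_path
                )
            else:
                # Application Default Credentials
                self._client = secretmanager.SecretManagerServiceClient()
        
        return self._client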
    
    def get_secret(
        self,
        secret_id: str,
        version_id: str = 'latest'
    ) -> str:
        """Get secret from Secret Manager"""
        try:
            name = f"projects/{self.project_id}/secrets/{secret_id}/versions/{version_id}"
            response = self.client.access_secret_version(request={"name": name})
            return response.payload.data.decode('UTF-8')
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise
    
    def create_secret(
        self,
        secret_id: str,
        secret_value: str,
        labels: Dict[str, str] = None
    ) -> str:
        """Create secret in Secret Manager"""
        try:
            parent = f"projects/{self.project_id}"
            
            request = {
                "parent": parent,
                "secret_id": secret_id,
                "secret": {
                    "replication": {"automatic": {}},
                    "labels": labels or {},
                }
            }
            
            response = self.client.create_secret(request=request)
            
            # Add secret version
            version_request = {
                "parent": response.name,
                "payload": {"data": secret_value.encode('UTF-8')}
            }
            self.client.add_secret_version(request=version_request)
            
            return response.name
        except Exception as e:
            self.log.error(f"Failed to create secret: {e}")
            raise
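GCP secret IDs may contain only letters, digits, underscores, and hyphens (up to 255 characters), and the backend builds each ID from a prefix, a separator, and the `conn_id`. A hypothetical helper can build and validate those IDs, plus the version resource name that `access_secret_version` expects, before anything reaches the API. `build_secret_id` and `version_name` are illustrative names.

```python
import re

# Secret Manager IDs: letters, digits, underscores, hyphens; 1-255 chars
SECRET_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,255}")


def build_secret_id(prefix: str, conn_id: str, sep: str = "-") -> str:
    """Join prefix and conn_id, then validate the result."""
    secret_id = f"{prefix}{sep}{conn_id}"
    if not SECRET_ID_PATTERN.fullmatch(secret_id):
        raise ValueError(f"Invalid Secret Manager ID: {secret_id!r}")
    return secret_id


def version_name(project_id: str, secret_id: str, version: str = "latest") -> str:
    """Resource name passed to access_secret_version."""
    return f"projects/{project_id}/secrets/{secret_id}/versions/{version}"


secret_id = build_secret_id("airflow-connections", "my_database")
print(version_name("my-project", secret_id))
# projects/my-project/secrets/airflow-connections-my_database/versions/latest
```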

6. Secret Rotation

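# secret_rotation.py
"""
Secret Rotation Patterns:

1. Manual Rotation:
   - Update secrets in backend
   - Restart Airflow components only if they read credentials from
     environment variables; backend lookups happen when a task runs
   - Verify connections

2. Automatic Rotation:
   - Use rotation Lambda/functions
   - Update secrets automatically
   - Little or no downtime, as long as the old credentials stay valid
     until running tasks finish (e.g. alternating-users rotation)

3. Rotation with Downtime:
   - Update secrets during maintenance
   - Graceful shutdown
   - Restart with new secrets
"""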

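# Rotation Lambda for AWS Secrets Manager (single-user strategy).
# Stored as a raw triple-single-quoted string so the docstrings inside
# do not terminate it. For RDS, ManageMasterUserPassword lets RDS rotate
# the master password itself, without a custom function.
ROTATION_LAMBDA = r'''
import json

import boto3
import psycopg2  # not in the Lambda runtime: ship it in the package or a layer

secrets_client = boto3.client("secretsmanager")
rds_client = boto3.client("rds")

DB_INSTANCE_ID = "my-db"  # RDS instance identifier


def lambda_handler(event, context):
    """Dispatch one of the four rotation steps Secrets Manager invokes."""
    secret_arn = event["SecretId"]
    token = event["ClientRequestToken"]
    step = event["Step"]

    if step == "createSecret":
        create_secret(secret_arn, token)
    elif step == "setSecret":
        set_secret(secret_arn, token)
    elif step == "testSecret":
        test_secret(secret_arn, token)
    elif step == "finishSecret":
        finish_secret(secret_arn, token)
    else:
        raise ValueError(f"Invalid step: {step}")


def get_secret_dict(secret_arn, stage, version_id=None):
    kwargs = {"SecretId": secret_arn, "VersionStage": stage}
    if version_id:
        kwargs["VersionId"] = version_id
    response = secrets_client.get_secret_value(**kwargs)
    return json.loads(response["SecretString"])


def create_secret(secret_arn, token):
    """Store a new password under AWSPENDING (idempotent on retries)."""
    current = get_secret_dict(secret_arn, "AWSCURRENT")
    try:
        get_secret_dict(secret_arn, "AWSPENDING", token)
        return  # already created by a previous attempt
    except secrets_client.exceptions.ResourceNotFoundException:
        pass

    # Keep host, username, etc.; replace only the password.
    # RDS rejects /, @, " and ' in master passwords.
    current["password"] = secrets_client.get_random_password(
        PasswordLength=32, ExcludeCharacters="/@\"'"
    )["RandomPassword"]
    secrets_client.put_secret_value(
        SecretId=secret_arn,
        ClientRequestToken=token,
        SecretString=json.dumps(current),
        VersionStages=["AWSPENDING"],
    )


def set_secret(secret_arn, token):
    """Apply the pending password to the database."""
    pending = get_secret_dict(secret_arn, "AWSPENDING", token)
    # The master username cannot be changed; only the password is set.
    # RDS applies the change asynchronously, so testSecret may fail
    # until it has taken effect.
    rds_client.modify_db_instance(
        DBInstanceIdentifier=DB_INSTANCE_ID,
        MasterUserPassword=pending["password"],
        ApplyImmediately=True,
    )


def test_secret(secret_arn, token):
    """Log in with the pending credentials."""
    pending = get_secret_dict(secret_arn, "AWSPENDING", token)
    conn = psycopg2.connect(
        host=pending["host"],
        port=pending.get("port", 5432),
        dbname=pending.get("dbname", "postgres"),
        user=pending["username"],
        password=pending["password"],
        connect_timeout=5,
    )
    conn.close()


def finish_secret(secret_arn, token):
    """Move AWSCURRENT from the old version to the new one."""
    metadata = secrets_client.describe_secret(SecretId=secret_arn)
    for version_id, stages in metadata["VersionIdsToStages"].items():
        if "AWSCURRENT" in stages:
            if version_id == token:
                return  # already finished
            secrets_client.update_secret_version_stage(
                SecretId=secret_arn,
                VersionStage="AWSCURRENT",
                MoveToVersionId=token,
                RemoveFromVersionId=version_id,
            )
            return
'''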

# Rotation configuration
ROTATION_CONFIG = """
# AWS Secrets Manager rotation
aws secretsmanager rotate-secret \
    --secret-id airflow/connections/my-database \
    --rotation-lambda-arn arn:aws:lambda:us-east-1:123456789012:function:rotate-password \
    --rotation-rules '{"AutomaticallyAfterDays": 30}'
"""

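The four rotation steps exist to move staging labels safely: a new version is created as `AWSPENDING`, applied and tested, and only then promoted to `AWSCURRENT`, with the old version becoming `AWSPREVIOUS`. The in-memory sketch below simulates only that label bookkeeping, with no AWS calls, to show why readers of `AWSCURRENT` never see an untested password. `FakeSecret` and `rotate` are hypothetical, and the model is simplified.

```python
import uuid
from typing import Dict, Optional, Set


class FakeSecret:
    """In-memory model of a secret's versions and staging labels."""

    def __init__(self, initial_value: str):
        first = str(uuid.uuid4())
        self.values: Dict[str, str] = {first: initial_value}
        self.labels: Dict[str, Set[str]] = {first: {"AWSCURRENT"}}

    def version_with(self, label: str) -> Optional[str]:
        for version_id, stages in self.labels.items():
            if label in stages:
                return version_id
        return None

    def get(self, label: str = "AWSCURRENT") -> str:
        return self.values[self.version_with(label)]

    def move_label(self, label: str, to_version: str) -> None:
        # A staging label is attached to at most one version at a time
        for stages in self.labels.values():
            stages.discard(label)
        self.labels[to_version].add(label)


def rotate(secret: FakeSecret, new_value: str, test_ok: bool) -> bool:
    token = str(uuid.uuid4())
    # createSecret: stage the new value as AWSPENDING
    secret.values[token] = new_value
    secret.labels[token] = set()
    secret.move_label("AWSPENDING", token)
    # setSecret + testSecret would run here; simulate the outcome
    if not test_ok:
        return False  # AWSCURRENT still points at the old, working value
    # finishSecret: promote the new version, demote the old one
    old = secret.version_with("AWSCURRENT")
    secret.move_label("AWSCURRENT", token)
    secret.move_label("AWSPREVIOUS", old)
    secret.labels[token].discard("AWSPENDING")
    return True


secret = FakeSecret("password-v1")
print(rotate(secret, "password-v2", test_ok=False), secret.get())  # False password-v1
print(rotate(secret, "password-v3", test_ok=True), secret.get())   # True password-v3
print(secret.get("AWSPREVIOUS"))                                   # password-v1
```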
⚠️Important

Always test secret rotation in a staging environment before implementing in production. Ensure your applications can handle credential changes gracefully.
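One way to handle credential changes gracefully is to treat an authentication failure as a sign that cached credentials may be stale: drop them, fetch again, and retry once. The sketch below shows that pattern with hypothetical `fetch_credentials` and `connect` callables and a hypothetical `AuthError`; in a real hook these would wrap your secrets backend and database driver.

```python
from typing import Callable, Dict, Optional


class AuthError(Exception):
    """Hypothetical error raised when the database rejects credentials."""


class RefreshingCredentials:
    """Cache credentials, refetching once if they are rejected."""

    def __init__(self, fetch_credentials: Callable[[], Dict[str, str]]):
        self._fetch = fetch_credentials
        self._cached: Optional[Dict[str, str]] = None

    def run(self, connect: Callable[[Dict[str, str]], str]) -> str:
        if self._cached is None:
            self._cached = self._fetch()
        try:
            return connect(self._cached)
        except AuthError:
            # Credentials were probably rotated: refetch and retry once.
            # A second failure propagates to the caller.
            self._cached = self._fetch()
            return connect(self._cached)


# Simulated backend and database that both start on v1
backend = {"password": "v1"}
db_accepts = {"password": "v1"}
creds = RefreshingCredentials(lambda: dict(backend))


def connect(c: Dict[str, str]) -> str:
    if c["password"] != db_accepts["password"]:
        raise AuthError("password rejected")
    return "connected as " + c["password"]


print(creds.run(connect))  # connected as v1 (fetched and cached)
backend["password"] = db_accepts["password"] = "v2"  # rotation happens
print(creds.run(connect))  # connected as v2 after one refetch
```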


Real-World Scenarios

Scenario 1: Amazon's Multi-Account Setup

# amazon_multi_account.py
"""
Amazon-style multi-account secrets:
- Cross-account access
- Centralized secrets management
- Audit logging
"""

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

import boto3
from airflow.hooks.base import BaseHook

class MultiAccountSecretsHook(BaseHook):
    """Hook for multi-account AWS setup"""
    
    # Role assumed in every target account
    role_name = 'CrossAccountSecretsRole'
    
    def __init__(
        self,
        central_account_id: str,
        target_accounts: List[str],
    ):
        super().__init__()
        self.central_account_id = central_account_id
        self.target_accounts = set(target_accounts)
        # account_id -> (client, expiration of its temporary credentials)
        self._clients: Dict[str, Tuple[Any, datetime]] = {}
    
    def get_client(self, account_id: str):
        """Get a client for an allowed account, re-assuming the role
        shortly before the temporary credentials expire"""
        if account_id not in self.target_accounts:
            raise ValueError(f"Account {account_id} is not a configured target")
        
        cached = self._clients.get(account_id)
        now = datetime.now(timezone.utc)
        if cached and cached[1] - now > timedelta(minutes=5):
            return cached[0]
        
        # Assume role in target account (credentials last 1 hour by default)
        response = boto3.client('sts').assume_role(
            RoleArn=f"arn:aws:iam::{account_id}:role/{self.role_name}",
            RoleSessionName='AirflowSecretsAccess'
        )
        credentials = response['Credentials']
        
        # Create client with temporary credentials
        session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        client = session.client('secretsmanager')
        # boto3 returns Expiration as a timezone-aware datetime
        self._clients[account_id] = (client, credentials['Expiration'])
        return client
    
    def get_secret(
        self,
        account_id: str,
        secret_name: str
    ) -> Dict[str, Any]:
        """Get secret from specific account"""
        client = self.get_client(account_id)
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])

# Usage
hook = MultiAccountSecretsHook(
    central_account_id='123456789012',
    target_accounts=['234567890123', '345678901234']
)

# Get secrets from different accounts
dev_secret = hook.get_secret('234567890123', 'airflow/dev/database')
prod_secret = hook.get_secret('345678901234', 'airflow/prod/database')

Scenario 2: Microsoft's Azure Key Vault

# microsoft_key_vault.py
"""
Microsoft-style Azure Key Vault integration:
- Managed identity authentication
- Key Vault access policies
- Secret versioning
"""

from typing import Optional

from airflow.hooks.base import BaseHook
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.keyvault.secrets import SecretClient

class AzureKeyVaultHook(BaseHook):
    """Hook for Azure Key Vault"""
    
    def __init__(
        self,
        vault_url: str,
        credential_type: str = 'managed_identity',
    ):
        super().__init__()
        self.vault_url = vault_url
        self.credential_type = credential_type
        self._client: Optional[SecretClient] = None
    
    @property
    def client(self) -> SecretClient:
        """Get Key Vault client"""
        if self._client is None:
            if self.credential_type == 'managed_identity':
                # Only the managed identity of the VM, pod, or App Service
                credential = ManagedIdentityCredential()
            else:
                # Tries several sources in turn: environment variables,
                # managed identity, developer logins such as the Azure CLI
                credential = DefaultAzureCredential()
            
            self._client = SecretClient(
                vault_url=self.vault_url,
                credential=credential
            )
        
        return self._client
    
    def get_secret(
        self,
        secret_name: str,
        version: str = None
    ) -> str:
        """Get secret from Key Vault"""
        try:
            secret = self.client.get_secret(
                name=secret_name,
                version=version
            )
            return secret.value
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise
    
    def set_secret(
        self,
        secret_name: str,
        secret_value: str,
        content_type: str = 'application/json'
    ) -> None:
        """Set secret in Key Vault"""
        try:
            self.client.set_secret(
                name=secret_name,
                value=secret_value,
                content_type=content_type
            )
        except Exception as e:
            self.log.error(f"Failed to set secret: {e}")
            raise

# Usage
hook = AzureKeyVaultHook(
    vault_url='https://myvault.vault.azure.net/',
    credential_type='managed_identity'
)

# Get secrets
db_password = hook.get_secret('database-password')
api_key = hook.get_secret('api-key')
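Key Vault secret names allow only letters, digits, and hyphens (1-127 characters), which is why the examples above use names like `database-password`. Airflow-style identifiers with underscores, such as `my_database`, need mapping. The helper below is a hypothetical sketch; the `airflow-connections` prefix is an assumed naming convention.

```python
import re

# Key Vault secret names: 1-127 characters, letters, digits and hyphens only
KEY_VAULT_NAME = re.compile(r"[0-9A-Za-z-]{1,127}")


def to_key_vault_name(identifier: str, prefix: str = "airflow-connections") -> str:
    """Map an Airflow-style identifier to a valid Key Vault secret name."""
    name = f"{prefix}-{identifier}".replace("_", "-")
    if not KEY_VAULT_NAME.fullmatch(name):
        raise ValueError(f"Cannot map {identifier!r} to a Key Vault secret name")
    return name


print(to_key_vault_name("my_database"))  # airflow-connections-my-database
```

The mapping is lossy: `my_db` and `my-db` land on the same secret name, so pick one convention for identifiers and stick to it.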

Edge Cases

⚠️Common Pitfalls

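  1. Secret Caching: Cached values go stale after rotation. Keep the TTL short and invalidate the cache when a credential is rejected.

  2. Network Timeouts: Set client timeouts and retry transient failures with backoff instead of letting tasks hang.

  3. Permission Issues: Grant Airflow read access only to the prefixes it uses (e.g. airflow/connections/*), and check those policies first when lookups fail.

  4. Secret Rotation: Running tasks keep the credentials they already fetched, so handle authentication failures by fetching again.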

# edge_cases.py
import time

from airflow.hooks.base import BaseHook

class CachedSecretsHook(BaseHook):
    """Hook with secret caching"""
    
    def __init__(self, cache_ttl: int = 300):
        super().__init__()
        self.cache_ttl = cache_ttl
        self._cache = {}
        self._cache_times = {}
    
    def get_secret(self, secret_name: str) -> str:
        """Get secret with caching"""
        now = time.monotonic()  # unaffected by system clock changes
        
        # Check cache
        if (
            secret_name in self._cache and
            now - self._cache_times[secret_name] < self.cache_ttl
        ):
            return self._cache[secret_name]
        
        # Fetch from backend
        secret = self._fetch_secret(secret_name)
        
        # Update cache
        self._cache[secret_name] = secret
        self._cache_times[secret_name] = now
        
        return secret
    
    def _fetch_secret(self, secret_name: str) -> str:
        """Fetch secret from backend (implement per backend)"""
        raise NotImplementedError("Subclasses must fetch from a concrete backend")
    
    def invalidate_cache(self, secret_name: str = None):
        """Invalidate cache"""
        if secret_name:
            self._cache.pop(secret_name, None)
            self._cache_times.pop(secret_name, None)
        else:
            self._cache.clear()
            self._cache_times.clear()
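For the network-timeout pitfall, a common approach is to retry transient failures with exponential backoff and jitter, then give up after a few attempts so a task fails visibly instead of hanging. The helper below is a generic stdlib sketch; which exceptions count as transient (here `TimeoutError` and `ConnectionError`) is an assumption to adapt to your backend's client library.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_backoff(
    func: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with capped exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter spreads retries from many workers over time
            sleep(random.uniform(0, delay))
    raise RuntimeError("unreachable")


# Usage: a flaky fetch that times out twice before succeeding
calls = {"n": 0}


def flaky_fetch() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("secrets backend timed out")
    return "s3cret"


print(with_backoff(flaky_fetch, sleep=lambda _: None), calls["n"])  # s3cret 3
```

Injecting `sleep` keeps the helper testable without real waiting; in production the default `time.sleep` applies.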



Best Practices

# best_practices.py
"""
Secrets Management Best Practices:

1. Backend Selection:
   - Use dedicated secrets backend for production
   - Choose based on cloud provider
   - Consider multi-cloud requirements

2. Security:
   - Never hardcode credentials
   - Use least-privilege access
   - Enable audit logging
   - Rotate secrets regularly

3. Performance:
   - Implement secret caching
   - Set appropriate cache TTL
   - Monitor secrets access

4. Reliability:
   - Handle network failures
   - Implement fallback mechanisms
   - Test secret rotation

5. Compliance:
   - Meet regulatory requirements
   - Document secret access
   - Regular security audits
"""

ℹ️Amazon Interview Tip

For Amazon interviews, expect AWS Secrets Manager with automatic rotation to be the reference design. When discussing secrets management, emphasize security, automatic rotation, and audit logging, and be ready to explain multi-account setups that use cross-account IAM roles.


Summary

Secrets management is critical for production security. Key takeaways:

  1. Metadata DB for development only
  2. Environment Variables for containers
  3. HashiCorp Vault for multi-cloud
  4. AWS/GCP/Azure for cloud-native
  5. Automatic rotation for security

For Amazon and Microsoft interviews, focus on:

  • Backend selection criteria
  • Security best practices
  • Automatic rotation
  • Multi-account/multi-cloud patterns
  • Compliance requirements

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.
