Interview Question
ℹ️Interview Context
Company: Amazon / Microsoft
Role: Senior Data Engineer / Security Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "How do you manage secrets and credentials in a production Airflow deployment? Compare different secrets backends and explain when to use each. How do you handle secret rotation?"
Detailed Theory
Secrets Management Fundamentals
# secrets_fundamentals.py
"""
Airflow Secrets Backends:
1. Metadata DB (default):
- Stores connections in database
- Simple setup
- Good for small deployments
- Limitation: Passwords are encrypted only if a Fernet key is configured, and anyone with the key and DB access can read them
2. Environment Variables:
- Use AIRFLOW_CONN_<conn_id> format
- Good for Docker/K8s
- No DB setup required
- Limitation: Values are readable by anything that can inspect the process environment, and changes require a restart
3. HashiCorp Vault:
- Enterprise secret management
- Dynamic secrets
- Audit logging
- Good for: Large enterprises
4. AWS Secrets Manager:
- Managed AWS service
- Automatic rotation
- IAM integration
- Good for: AWS deployments
5. GCP Secret Manager:
- Managed GCP service
- Version management
- IAM integration
- Good for: GCP deployments
6. Azure Key Vault:
- Managed Azure service
- HSM backing
- RBAC integration
- Good for: Azure deployments
"""
1. Metadata DB Backend
# metadata_db_backend.py
"""
The metadata DB is Airflow's built-in connection store.
Credentials are stored in the connection table.
"""
# Configuration
METADATA_DB_CONFIG = """
[secrets]
# No backend setting is needed for the metadata DB. Airflow always searches,
# in order: the configured backend (if any), environment variables, then the
# metadata DB.
backend =

[core]
# Encrypts the password and extra fields at rest. Generate a key with
# cryptography.fernet.Fernet.generate_key().
fernet_key = <your-fernet-key>
"""
# Programmatic connection management
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.utils.session import create_session


def create_connection():
    """Create connection in metadata DB"""
    # create_session() commits on a clean exit and rolls back on error,
    # so no explicit session.commit() is needed.
    with create_session() as session:
        conn = Connection(
            conn_id='my_database',
            conn_type='postgres',
            host='localhost',
            login='user',
            password='password',
            schema='my_database',
            port=5432,
        )
        session.add(conn)


def get_connection():
    """Get connection (searches all configured secrets backends in order)"""
    conn = BaseHook.get_connection(conn_id='my_database')
    return conn
⚠️Security
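The metadata DB encrypts connection passwords and extras with Fernet only when a fernet_key is configured; without one they are stored in plain text. Even with encryption, anyone who holds the key and can read the database can recover them. Use this for development/testing. For production, use a dedicated secrets backend.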
2. Environment Variables Backend
# environment_variables_backend.py
"""
Environment Variables Backend:
- Uses AIRFLOW_CONN_<CONN_ID> format (conn_id in upper case)
- Good for Docker/Kubernetes deployments
- No external service required
"""
# Configuration
ENV_CONFIG = """
# No [secrets] backend setting is needed. Airflow always checks environment
# variables: after any configured backend, before the metadata DB.
"""
# Environment variable format
ENV_EXAMPLES = {
    # PostgreSQL connection
    'AIRFLOW_CONN_MY_DATABASE': 'postgresql://user:password@localhost:5432/mydb',
    # HTTP connection
    'AIRFLOW_CONN_MY_API': 'https://api.example.com:443',
    # AWS connection
    'AIRFLOW_CONN_AWS_DEFAULT': 'aws://key:secret@',
    # Google Cloud connection
    'AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT': 'google-cloud-platform://',
}
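# Docker example
DOCKER_EXAMPLE = """
# Dockerfile
# Pin a specific image tag instead of :latest for reproducible builds.
FROM apache/airflow:latest
# Caution: ENV values are baked into the image layers and show up in
# `docker history`. Use placeholder values here and inject real credentials
# at runtime (docker run --env-file, Compose, or a Kubernetes Secret).
ENV AIRFLOW_CONN_MY_DATABASE='postgresql://user:password@postgres:5432/airflow'
ENV AIRFLOW_CONN_MY_API='https://api.example.com:443'
"""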
# Kubernetes example
K8S_EXAMPLE = """
# Kubernetes Secret
apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
type: Opaque
stringData:
  AIRFLOW_CONN_MY_DATABASE: "postgresql://user:password@postgres:5432/airflow"
"""
3. HashiCorp Vault Backend
# hashicorp_vault_backend.py
"""
HashiCorp Vault Backend:
- Enterprise secret management
- Dynamic secrets with TTL
- Audit logging
- Fine-grained access control
"""
# Configuration
VAULT_CONFIG = """
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
# backend_kwargs is a single JSON string. The Vault URL and auth settings go
# here too: airflow.cfg has no [vault] section and no env() templating.
backend_kwargs = {"connections_path": "airflow/connections", "variables_path": "airflow/variables", "config_path": "airflow/config", "mount_point": "secret", "kv_engine_version": 2, "url": "https://vault.example.com:8200", "auth_type": "approle", "role_id": "<role-id>", "secret_id": "<secret-id>"}
"""
# To keep role_id/secret_id out of airflow.cfg, set the whole option through
# the AIRFLOW__SECRETS__BACKEND_KWARGS environment variable instead.
# Custom Vault hook
from typing import Any, Dict, Optional

import hvac
from airflow.hooks.base import BaseHook


class CustomVaultHook(BaseHook):
    """Custom hook for HashiCorp Vault"""

    def __init__(
        self,
        vault_url: str,
        vault_token: Optional[str] = None,
        role_id: Optional[str] = None,
        secret_id: Optional[str] = None,
    ):
        super().__init__()
        self.vault_url = vault_url
        self.vault_token = vault_token
        self.role_id = role_id
        self.secret_id = secret_id
        self._client = None

    @property
    def client(self):
        """Get Vault client (token auth if a token is given, else AppRole)"""
        if self._client is None:
            self._client = hvac.Client(url=self.vault_url)
            if self.vault_token:
                self._client.token = self.vault_token
            elif self.role_id and self.secret_id:
                self._client.auth.approle.login(
                    role_id=self.role_id,
                    secret_id=self.secret_id
                )
        return self._client

    def get_secret(
        self,
        path: str,
        mount_point: str = 'secret',
        version: Optional[int] = None
    ) -> Dict[str, Any]:
        """Get secret from Vault (KV v2); version=None reads the latest"""
        try:
            secret = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=mount_point,
                version=version
            )
            # KV v2 wraps the payload: response['data']['data']
            return secret['data']['data']
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise

    def create_secret(
        self,
        path: str,
        data: Dict[str, Any],
        mount_point: str = 'secret'
    ) -> None:
        """Create secret in Vault"""
        try:
            self.client.secrets.kv.v2.create_or_update_secret(
                path=path,
                mount_point=mount_point,
                secret=data
            )
        except Exception as e:
            self.log.error(f"Failed to create secret: {e}")
            raise
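A common source of "permission denied" errors with the Vault backend is the KV v2 path layout: the HTTP API inserts a `data/` segment after the mount point, so Vault policies must be written against that path rather than the logical one. A minimal sketch of the mapping, assuming the mount point and base path from the configuration above (the `kv_api_path` helper is hypothetical):

```python
def kv_api_path(mount_point: str, base_path: str, secret_id: str,
                kv_engine_version: int = 2) -> str:
    """Return the Vault HTTP API path that a secret read resolves to."""
    base = base_path.strip("/")
    if kv_engine_version == 2:
        # KV v2 stores secret payloads under <mount>/data/<path>
        return f"/v1/{mount_point}/data/{base}/{secret_id}"
    # KV v1 has no extra segment
    return f"/v1/{mount_point}/{base}/{secret_id}"


print(kv_api_path("secret", "airflow/connections", "my_database"))
print(kv_api_path("secret", "airflow/connections", "my_database", kv_engine_version=1))
```

With KV v2, a policy granting read on `secret/airflow/connections/*` would therefore not match; it needs `secret/data/airflow/connections/*`.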
4. AWS Secrets Manager Backend
# aws_secrets_manager_backend.py
"""
AWS Secrets Manager Backend:
- Managed AWS service
- Automatic rotation
- IAM integration
- Cross-account access
"""
# Configuration
AWS_CONFIG = """
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs is a single JSON string. Note config_prefix (there is no
# secrets_prefix option) and profile_name (the boto3 session argument).
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "config_prefix": "airflow/config", "region_name": "us-east-1", "profile_name": "default"}
"""
# Custom AWS Secrets Manager hook
import json
from typing import Any, Dict, Optional

import boto3
from airflow.hooks.base import BaseHook


class CustomSecretsManagerHook(BaseHook):
    """Custom hook for AWS Secrets Manager"""

    def __init__(
        self,
        region_name: str = 'us-east-1',
        profile: Optional[str] = None,
    ):
        super().__init__()
        self.region_name = region_name
        self.profile = profile
        self._client = None

    @property
    def client(self):
        """Get Secrets Manager client"""
        if self._client is None:
            session = boto3.Session(
                region_name=self.region_name,
                profile_name=self.profile
            )
            self._client = session.client('secretsmanager')
        return self._client

    def get_secret(
        self,
        secret_name: str,
        version_id: Optional[str] = None,
        version_stage: Optional[str] = None
    ) -> Dict[str, Any]:
        """Get secret from Secrets Manager (expects a JSON SecretString)"""
        try:
            kwargs = {'SecretId': secret_name}
            if version_id:
                kwargs['VersionId'] = version_id
            if version_stage:
                kwargs['VersionStage'] = version_stage
            response = self.client.get_secret_value(**kwargs)
            # Parse secret
            secret_string = response['SecretString']
            return json.loads(secret_string)
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise

    def create_secret(
        self,
        name: str,
        secret: Dict[str, Any],
        description: str = '',
        kms_key_id: Optional[str] = None,
    ) -> str:
        """Create secret in Secrets Manager and return its ARN"""
        try:
            kwargs = {
                'Name': name,
                'SecretString': json.dumps(secret),
                'Description': description,
            }
            if kms_key_id:
                kwargs['KmsKeyId'] = kms_key_id
            response = self.client.create_secret(**kwargs)
            return response['ARN']
        except Exception as e:
            self.log.error(f"Failed to create secret: {e}")
            raise

    def rotate_secret(
        self,
        secret_name: str,
        rotation_lambda_arn: str,
        rotation_rules: Optional[Dict[str, Any]] = None
    ) -> None:
        """Enable automatic rotation"""
        try:
            kwargs = {
                'SecretId': secret_name,
                'RotationLambdaARN': rotation_lambda_arn,
            }
            if rotation_rules:
                kwargs['RotationRules'] = rotation_rules
            self.client.rotate_secret(**kwargs)
        except Exception as e:
            self.log.error(f"Failed to enable rotation: {e}")
            raise
ℹ️Pro Tip
Use AWS Secrets Manager with automatic rotation for database credentials. This ensures credentials are rotated regularly without manual intervention.
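Rotation Lambdas typically write the secret as a JSON object, while hand-created connection secrets are often stored as a URI, so code that reads them benefits from accepting both shapes. The sketch below is one way to normalize the two; it is not the Amazon provider's own parsing logic, and the `parse_connection_secret` helper and its output keys are assumptions chosen for illustration.

```python
import json
from typing import Any, Dict
from urllib.parse import unquote, urlsplit


def parse_connection_secret(secret_string: str) -> Dict[str, Any]:
    """Accept a connection stored either as a JSON object or as a URI."""
    text = secret_string.strip()
    if text.startswith("{"):
        return json.loads(text)
    parts = urlsplit(text)
    return {
        "conn_type": parts.scheme,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname,
        "port": parts.port,
        "schema": parts.path.lstrip("/") or None,
    }


as_json = parse_connection_secret('{"conn_type": "postgres", "host": "db", "login": "admin"}')
as_uri = parse_connection_secret("postgres://admin:p%40ss@db.example.com:5432/mydb")
print(as_json["host"], as_uri["host"], as_uri["password"])
```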
5. GCP Secret Manager Backend
# gcp_secret_manager_backend.py
"""
GCP Secret Manager Backend:
- Managed GCP service
- Version management
- IAM integration
- Automatic replication
"""
# Configuration
GCP_CONFIG = """
[secrets]
backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
# backend_kwargs is a single JSON string. Note config_prefix: there is no
# secrets_prefix option.
backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables", "config_prefix": "airflow-config", "project_id": "my-project"}
"""
# Custom GCP Secret Manager hook
import os
from typing import Dict, Optional

from airflow.hooks.base import BaseHook
from google.cloud import secretmanager


class CustomSecretManagerHook(BaseHook):
    """Custom hook for GCP Secret Manager"""

    def __init__(
        self,
        project_id: str,
        credentials_path: Optional[str] = None,
    ):
        super().__init__()
        self.project_id = project_id
        self.credentials_path = credentials_path
        self._client = None

    @property
    def client(self):
        """Get Secret Manager client"""
        if self._client is None:
            if self.credentials_path:
                # Note: this changes the credentials for the whole process
                os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.credentials_path
            self._client = secretmanager.SecretManagerServiceClient()
        return self._client

    def get_secret(
        self,
        secret_id: str,
        version_id: str = 'latest'
    ) -> str:
        """Get secret from Secret Manager"""
        try:
            name = f"projects/{self.project_id}/secrets/{secret_id}/versions/{version_id}"
            response = self.client.access_secret_version(request={"name": name})
            return response.payload.data.decode('UTF-8')
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise

    def create_secret(
        self,
        secret_id: str,
        secret_value: str,
        labels: Optional[Dict[str, str]] = None
    ) -> str:
        """Create secret in Secret Manager and add its first version"""
        try:
            parent = f"projects/{self.project_id}"
            request = {
                "parent": parent,
                "secret_id": secret_id,
                "secret": {
                    "replication": {"automatic": {}},
                    "labels": labels or {},
                }
            }
            response = self.client.create_secret(request=request)
            # Add secret version (the secret itself holds no payload)
            version_request = {
                "parent": response.name,
                "payload": {"data": secret_value.encode('UTF-8')}
            }
            self.client.add_secret_version(request=version_request)
            return response.name
        except Exception as e:
            self.log.error(f"Failed to create secret: {e}")
            raise
6. Secret Rotation
# secret_rotation.py
"""
Secret Rotation Patterns:
1. Manual Rotation:
- Update secrets in backend
- Restart Airflow workers
- Verify connections
2. Automatic Rotation:
- Use rotation Lambda/functions
- Update secrets automatically
- Zero downtime
3. Rotation with Downtime:
- Update secrets during maintenance
- Graceful shutdown
- Restart with new secrets
"""
# Rotation Lambda for AWS Secrets Manager
# The outer string uses ''' so the inner docstrings do not terminate it.
ROTATION_LAMBDA = '''
import json

import boto3


def lambda_handler(event, context):
    """Rotate RDS password"""
    secret_arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    service_client = boto3.client('secretsmanager')

    if step == 'createSecret':
        # Generate new secret
        new_password = generate_password(service_client)
        # Store new secret
        service_client.put_secret_value(
            SecretId=secret_arn,
            ClientRequestToken=token,
            SecretString=json.dumps({
                'username': 'admin',
                'password': new_password
            }),
            VersionStages=['AWSPENDING']
        )
    elif step == 'setSecret':
        # Set secret in RDS
        rds_client = boto3.client('rds')
        secret = json.loads(
            service_client.get_secret_value(
                SecretId=secret_arn,
                VersionStage='AWSPENDING'
            )['SecretString']
        )
        # modify_db_instance can change the master password but not the
        # master username. The change is applied asynchronously, so
        # testSecret may need to wait or retry before the new password works.
        rds_client.modify_db_instance(
            DBInstanceIdentifier='my-db',
            MasterUserPassword=secret['password'],
            ApplyImmediately=True
        )
    elif step == 'testSecret':
        # Test connection with new secret
        secret = json.loads(
            service_client.get_secret_value(
                SecretId=secret_arn,
                VersionStage='AWSPENDING'
            )['SecretString']
        )
        # Test database connection
        # (psycopg2 must be bundled with the Lambda deployment package)
        import psycopg2
        conn = psycopg2.connect(
            host='my-db.cluster.us-east-1.rds.amazonaws.com',
            database='mydb',
            user=secret['username'],
            password=secret['password']
        )
        conn.close()
    elif step == 'finishSecret':
        # Mark new secret as active. The rotation event carries no previous
        # version ID, so look up the version currently labeled AWSCURRENT.
        versions = service_client.describe_secret(
            SecretId=secret_arn
        )['VersionIdsToStages']
        current_version = next(
            version_id for version_id, stages in versions.items()
            if 'AWSCURRENT' in stages
        )
        if current_version != token:
            service_client.update_secret_version_stage(
                SecretId=secret_arn,
                VersionStage='AWSCURRENT',
                MoveToVersionId=token,
                RemoveFromVersionId=current_version
            )


def generate_password(service_client, length=32):
    """Generate random password"""
    # Use the Secrets Manager generator (cryptographically secure) rather
    # than the random module, and exclude characters RDS rejects in
    # master passwords.
    response = service_client.get_random_password(
        PasswordLength=length,
        ExcludeCharacters='/@" '
    )
    return response['RandomPassword']
'''
# Rotation configuration
# Raw string, so the shell line-continuation backslashes are kept verbatim.
ROTATION_CONFIG = r"""
# AWS Secrets Manager rotation
aws secretsmanager rotate-secret \
    --secret-id airflow/connections/my-database \
    --rotation-lambda-arn arn:aws:lambda:us-east-1:123456789012:function:rotate-password \
    --rotation-rules '{"AutomaticallyAfterDays": 30}'
"""
⚠️Important
Always test secret rotation in a staging environment before implementing in production. Ensure your applications can handle credential changes gracefully.
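One way to handle credential changes gracefully is to treat an authentication failure as a signal that the cached secret may be stale: drop it, fetch the current value, and retry once. The following is a minimal sketch of that pattern under stated assumptions. `AuthError` is a hypothetical exception standing in for whatever your database driver raises, and the in-memory `backend` and `cache` dictionaries simulate a secrets store just after rotation.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class AuthError(Exception):
    """Hypothetical stand-in for a driver's authentication failure."""


def run_with_fresh_credentials(
    fetch_secret: Callable[[], str],
    invalidate_cache: Callable[[], None],
    action: Callable[[str], T],
) -> T:
    """Run action with the cached secret; on auth failure, refetch and retry once."""
    try:
        return action(fetch_secret())
    except AuthError:
        invalidate_cache()
        # A second failure propagates: the credential itself is wrong, not stale.
        return action(fetch_secret())


# Simulated state right after a rotation: the cache still holds the old value.
backend = {"password": "new-password"}
cache = {"password": "old-password"}


def fetch_secret() -> str:
    if "password" not in cache:
        cache["password"] = backend["password"]
    return cache["password"]


def invalidate_cache() -> None:
    cache.clear()


def connect(password: str) -> str:
    if password != backend["password"]:
        raise AuthError("authentication failed")
    return "connected"


result = run_with_fresh_credentials(fetch_secret, invalidate_cache, connect)
print(result)
```

Retrying only once keeps a genuinely bad credential from turning into a retry loop that could lock the account.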
Real-World Scenarios
Scenario 1: Amazon's Multi-Account Setup
# amazon_multi_account.py
"""
Amazon-style multi-account secrets:
- Cross-account access
- Centralized secrets management
- Audit logging
"""
import json
from typing import Any, Dict

import boto3
from airflow.hooks.base import BaseHook


class MultiAccountSecretsHook(BaseHook):
    """Hook for multi-account AWS setup"""

    def __init__(
        self,
        central_account_id: str,
        target_accounts: list,
    ):
        super().__init__()
        self.central_account_id = central_account_id
        self.target_accounts = target_accounts
        self._clients = {}

    def get_client(self, account_id: str):
        """Get client for specific account"""
        if account_id not in self._clients:
            # Assume role in target account
            sts_client = boto3.client('sts')
            role_arn = f"arn:aws:iam::{account_id}:role/CrossAccountSecretsRole"
            response = sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName='AirflowSecretsAccess'
            )
            credentials = response['Credentials']
            # Create client with temporary credentials
            session = boto3.Session(
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
            self._clients[account_id] = session.client('secretsmanager')
        return self._clients[account_id]

    def get_secret(
        self,
        account_id: str,
        secret_name: str
    ) -> Dict[str, Any]:
        """Get secret from specific account"""
        client = self.get_client(account_id)
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])


# Usage
hook = MultiAccountSecretsHook(
    central_account_id='123456789012',
    target_accounts=['234567890123', '345678901234']
)
# Get secrets from different accounts
dev_secret = hook.get_secret('234567890123', 'airflow/dev/database')
prod_secret = hook.get_secret('345678901234', 'airflow/prod/database')
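The hook above caches one client per account indefinitely, but credentials returned by `assume_role` are temporary (the response includes an `Expiration` timestamp), so a long-lived process would eventually hold expired clients. A minimal sketch of an expiry-aware cache follows. The `ExpiringClientCache` class and the fake factory are hypothetical; in real use the factory would call `assume_role` and return the client together with `response['Credentials']['Expiration']`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Tuple

# factory(account_id) -> (client, expiration as a timezone-aware datetime)
Factory = Callable[[str], Tuple[Any, datetime]]


class ExpiringClientCache:
    """Cache per-account clients and rebuild them shortly before they expire."""

    def __init__(
        self,
        factory: Factory,
        refresh_margin: timedelta = timedelta(minutes=5),
        now: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    ):
        self._factory = factory
        self._margin = refresh_margin
        self._now = now
        self._entries: Dict[str, Tuple[Any, datetime]] = {}

    def get(self, account_id: str) -> Any:
        entry = self._entries.get(account_id)
        # Refresh when missing, or when inside the safety margin before expiry.
        if entry is None or self._now() >= entry[1] - self._margin:
            entry = self._factory(account_id)
            self._entries[account_id] = entry
        return entry[0]


# Demonstration with a fake clock and a fake factory (no AWS calls).
clock = [datetime(2024, 1, 1, tzinfo=timezone.utc)]
created = []


def fake_factory(account_id: str) -> Tuple[str, datetime]:
    created.append(account_id)
    return f"client-{len(created)}", clock[0] + timedelta(hours=1)


client_cache = ExpiringClientCache(fake_factory, now=lambda: clock[0])
first = client_cache.get("234567890123")
clock[0] += timedelta(minutes=56)  # now inside the 5-minute refresh margin
second = client_cache.get("234567890123")
print(first, second)
```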
Scenario 2: Microsoft's Azure Key Vault
# microsoft_key_vault.py
"""
Microsoft-style Azure Key Vault integration:
- Managed identity authentication
- Key Vault access policies
- Secret versioning
"""
from typing import Optional

from airflow.hooks.base import BaseHook
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.keyvault.secrets import SecretClient


class AzureKeyVaultHook(BaseHook):
    """Hook for Azure Key Vault"""

    def __init__(
        self,
        vault_url: str,
        credential_type: str = 'managed_identity',
    ):
        super().__init__()
        self.vault_url = vault_url
        self.credential_type = credential_type
        self._client = None

    @property
    def client(self):
        """Get Key Vault client"""
        if self._client is None:
            if self.credential_type == 'managed_identity':
                # Use only the identity assigned to the VM/pod
                credential = ManagedIdentityCredential()
            else:
                # Fall back to the default chain (environment, CLI login, ...)
                credential = DefaultAzureCredential()
            self._client = SecretClient(
                vault_url=self.vault_url,
                credential=credential
            )
        return self._client

    def get_secret(
        self,
        secret_name: str,
        version: Optional[str] = None
    ) -> str:
        """Get secret from Key Vault; version=None reads the latest"""
        try:
            secret = self.client.get_secret(
                name=secret_name,
                version=version
            )
            return secret.value
        except Exception as e:
            self.log.error(f"Failed to get secret: {e}")
            raise

    def set_secret(
        self,
        secret_name: str,
        secret_value: str,
        content_type: str = 'application/json'
    ) -> None:
        """Set secret in Key Vault (creates a new version if it exists)"""
        try:
            self.client.set_secret(
                name=secret_name,
                value=secret_value,
                content_type=content_type
            )
        except Exception as e:
            self.log.error(f"Failed to set secret: {e}")
            raise


# Usage
hook = AzureKeyVaultHook(
    vault_url='https://myvault.vault.azure.net/',
    credential_type='managed_identity'
)
# Get secrets
db_password = hook.get_secret('database-password')
api_key = hook.get_secret('api-key')
Edge Cases
⚠️Common Pitfalls
- Secret Caching: Secrets may be cached. Implement cache invalidation.
- Network Timeouts: Handle network failures when accessing secrets.
- Permission Issues: Ensure proper IAM policies for secrets access.
- Secret Rotation: Handle credential changes gracefully.
# edge_cases.py
import time
from typing import Optional

from airflow.hooks.base import BaseHook


class CachedSecretsHook(BaseHook):
    """Hook with secret caching"""

    def __init__(self, cache_ttl: int = 300):
        super().__init__()
        self.cache_ttl = cache_ttl  # seconds
        self._cache = {}
        self._cache_times = {}

    def get_secret(self, secret_name: str) -> str:
        """Get secret with caching"""
        now = time.time()
        # Check cache
        if (
            secret_name in self._cache and
            now - self._cache_times[secret_name] < self.cache_ttl
        ):
            return self._cache[secret_name]
        # Fetch from backend
        secret = self._fetch_secret(secret_name)
        # Update cache
        self._cache[secret_name] = secret
        self._cache_times[secret_name] = now
        return secret

    def _fetch_secret(self, secret_name: str) -> str:
        """Fetch secret from backend"""
        # Implementation depends on backend; subclasses must override.
        # Raising here prevents None from being cached as a secret.
        raise NotImplementedError

    def invalidate_cache(self, secret_name: Optional[str] = None):
        """Invalidate one cached secret, or the whole cache if no name is given"""
        if secret_name:
            self._cache.pop(secret_name, None)
            self._cache_times.pop(secret_name, None)
        else:
            self._cache.clear()
            self._cache_times.clear()
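For the network-timeout pitfall listed above, a common approach is to retry transient failures with exponential backoff and give up after a bounded number of attempts. The following is a minimal sketch, assuming that `ConnectionError` and `TimeoutError` are the transient failures worth retrying; the `fetch_with_backoff` helper and the flaky fetch function are hypothetical.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def fetch_with_backoff(
    fetch: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), retrying transient errors with delays of base_delay * 2**attempt."""
    for attempt in range(retries + 1):
        try:
            return fetch()
        except retry_on:
            if attempt == retries:
                raise  # out of attempts: surface the original error
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("unreachable")  # the loop always returns or raises


# Demonstration: a fetch that fails twice, then succeeds. Delays are recorded
# instead of slept so the example runs instantly.
attempts = {"count": 0}
delays: List[float] = []


def flaky_fetch() -> str:
    attempts["count"] += 1
    if attempts["count"] <= 2:
        raise TimeoutError("secrets backend timed out")
    return "s3cr3t"


value = fetch_with_backoff(flaky_fetch, sleep=delays.append)
print(value, delays)
```

Errors outside `retry_on`, such as permission failures, are deliberately not retried, since repeating the call would not change the outcome.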
Best Practices
# best_practices.py
"""
Secrets Management Best Practices:
1. Backend Selection:
- Use dedicated secrets backend for production
- Choose based on cloud provider
- Consider multi-cloud requirements
2. Security:
- Never hardcode credentials
- Use least-privilege access
- Enable audit logging
- Rotate secrets regularly
3. Performance:
- Implement secret caching
- Set appropriate cache TTL
- Monitor secrets access
4. Reliability:
- Handle network failures
- Implement fallback mechanisms
- Test secret rotation
5. Compliance:
- Meet regulatory requirements
- Document secret access
- Regular security audits
"""
ℹ️Amazon Interview Tip
For Amazon interviews, expect AWS Secrets Manager with automatic rotation to be the reference point. When discussing secrets management, emphasize security, automatic rotation, and audit logging, and be ready to explain how multi-account setups are handled with cross-account IAM roles.
Summary
Secrets management is critical for production security. Key takeaways:
- Metadata DB for development only
- Environment Variables for containers
- HashiCorp Vault for multi-cloud
- AWS/GCP/Azure for cloud-native
- Automatic rotation for security
For Amazon and Microsoft interviews, focus on:
- Backend selection criteria
- Security best practices
- Automatic rotation
- Multi-account/multi-cloud patterns
- Compliance requirements
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.