
Airflow CI/CD Pipeline

Testing, Deployment, and Automation


Interview Question

ℹ️ Interview Context

Company: Microsoft / Meta
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "Design a CI/CD pipeline for Airflow DAGs. How do you test, deploy, and monitor DAG changes? What tools and practices would you use?"


Detailed Theory

CI/CD Fundamentals

# cicd_fundamentals.py
"""
Airflow CI/CD Pipeline:

1. Source Control:
   - Git for version control
   - Branching strategy
   - Code review process

2. Testing:
   - Unit tests
   - Integration tests
   - DAG validation

3. Build:
   - Docker image build
   - DAG packaging
   - Dependency management

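4. Deployment:
   - Git-sync or a baked Docker image to deliver DAG files
   - Helm charts to roll out Airflow and its image
   - Airflow REST API to pause, unpause, and smoke-test DAGs after delivery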

5. Monitoring:
   - Deployment status
   - Rollback triggers
   - Health checks
"""

1. GitHub Actions Pipeline

# github_actions.py
"""
GitHub Actions CI/CD for Airflow:

Automated testing and deployment pipeline.
"""

# .github/workflows/airflow-ci.yml
# (raw string so the shell line-continuation backslashes survive)
GITHUB_ACTIONS_CONFIG = r"""
name: Airflow CI/CD

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install -r requirements-dev.txt

    - name: Lint with flake8
      run: |
        flake8 dags/ --max-line-length=120

    - name: Type check with mypy
      run: |
        mypy dags/ --ignore-missing-imports

    - name: Run unit tests
      run: |
        pytest tests/unit/ -v --cov=dags/ --cov-report=xml

    - name: Run DAG validation
      run: |
        python scripts/validate_dags.py

    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'

    steps:
    - uses: actions/checkout@v3

    - name: Log in to registry
      run: |
        echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.example.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin

    - name: Build and push image
      run: |
        # Push the commit-SHA tag: it is the tag the deploy jobs reference.
        docker build -t registry.example.com/my-airflow:${{ github.sha }} .
        docker push registry.example.com/my-airflow:${{ github.sha }}

  # Both deploy jobs assume cluster credentials (a kubeconfig) are already
  # configured, e.g. from a repository or environment secret.
  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    environment: staging

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Deploy to staging
      run: |
        helm repo add apache-airflow https://airflow.apache.org
        helm upgrade --install my-airflow apache-airflow/airflow \
          --namespace airflow-staging \
          --set images.airflow.repository=registry.example.com/my-airflow \
          --set images.airflow.tag=${{ github.sha }} \
          --wait --timeout 300s

    - name: Run integration tests
      run: |
        pip install -r requirements-dev.txt
        python scripts/integration_tests.py --env staging

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    # Add required reviewers to the "production" environment for a manual gate.
    environment: production

    steps:
    - name: Deploy to production
      run: |
        helm repo add apache-airflow https://airflow.apache.org
        helm upgrade --install my-airflow apache-airflow/airflow \
          --namespace airflow-production \
          --set images.airflow.repository=registry.example.com/my-airflow \
          --set images.airflow.tag=${{ github.sha }} \
          --wait --timeout 300s
"""

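ℹ️ Pro Tip

GitHub Actions is a convenient default when the DAG repository already lives on GitHub: workflows run on the same pushes and pull requests used for code review, secrets can be scoped to a repository or an environment, and environments with required reviewers can gate production deploys.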

2. DAG Validation

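# dag_validation.py
"""
DAG Validation Script:

Validates DAGs before deployment. Each file must parse, define at least one
DAG (DAG(...), models.DAG(...), `with DAG(...)`, or an @dag-decorated
function), and import without errors. For parity with the scheduler, loading
the folder with airflow.models.DagBag and asserting that `import_errors` is
empty is a common alternative.
"""

import ast
import hashlib
import importlib.util
import sys
from pathlib import Path
from typing import Optional, Union


def _call_name(func: ast.AST) -> Optional[str]:
    """Name of the called object: 'DAG' for both DAG(...) and models.DAG(...)."""
    if isinstance(func, ast.Name):
        return func.id
    if isinstance(func, ast.Attribute):
        return func.attr
    return None


def _defines_dag(tree: ast.AST) -> bool:
    """True if the module contains a DAG(...) call or an @dag decorator."""
    for node in ast.walk(tree):
        # DAG(...), models.DAG(...) and `with DAG(...) as dag:` are all Call nodes
        if isinstance(node, ast.Call) and _call_name(node.func) == "DAG":
            return True
        # TaskFlow style: @dag or @dag(...) on a function
        if isinstance(node, ast.FunctionDef):
            for deco in node.decorator_list:
                target = deco.func if isinstance(deco, ast.Call) else deco
                if _call_name(target) == "dag":
                    return True
    return False


def validate_dag(dag_path: Union[str, Path]) -> bool:
    """Validate a single DAG file"""
    path = Path(dag_path)
    try:
        # Parse AST (catches syntax errors without executing anything)
        tree = ast.parse(path.read_text(), filename=str(path))
        if not _defines_dag(tree):
            print(f"ERROR: No DAG definition found in {path}")
            return False

        # Import under a unique module name so files do not overwrite each
        # other in sys.modules; import-time errors surface here
        digest = hashlib.sha256(str(path.resolve()).encode()).hexdigest()[:12]
        module_name = f"dag_check_{path.stem}_{digest}"
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)

        print(f"SUCCESS: {path} validated")
        return True

    except Exception as e:
        print(f"ERROR: {path} - {e}")
        return False


def validate_all_dags(dag_folder: str) -> bool:
    """Validate all DAGs in folder, including subfolders such as dags/team_a/"""
    # Airflow puts the DAGs folder on sys.path, so mirror that for shared imports
    sys.path.insert(0, str(Path(dag_folder).resolve()))
    dag_files = sorted(
        p for p in Path(dag_folder).rglob("*.py")
        if not p.name.startswith("__")
    )
    # Validate every file first so all failures are reported in one run
    results = [validate_dag(p) for p in dag_files]
    return all(results)


if __name__ == "__main__":
    dag_folder = sys.argv[1] if len(sys.argv) > 1 else "dags/"
    success = validate_all_dags(dag_folder)
    sys.exit(0 if success else 1)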
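Importing a file is not the only property that matters: a DAG's task dependencies must be acyclic, and Airflow rejects a cycle when it loads the file (it raises `AirflowDagCycleException`, which `DagBag` records as an import error). The sketch below illustrates what such a check does, using Kahn's algorithm on a plain dictionary of `task_id -> downstream task_ids` rather than real Airflow objects; it is not Airflow's own implementation.

```python
from collections import deque
from typing import Dict, List, Optional, Set


def topological_order(downstream: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return a valid execution order, or None if the dependencies contain a cycle.

    `downstream` maps each task_id to the tasks that depend on it, so
    `extract >> transform` is written as {"extract": {"transform"}}.
    """
    # Collect every task, including ones that only appear as downstream targets.
    tasks = set(downstream) | {t for targets in downstream.values() for t in targets}
    indegree = {t: 0 for t in tasks}
    for targets in downstream.values():
        for t in targets:
            indegree[t] += 1

    # Kahn's algorithm: repeatedly emit tasks with no unmet upstream dependencies.
    ready = deque(sorted(t for t in tasks if indegree[t] == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for t in sorted(downstream.get(task, ())):
            indegree[t] -= 1
            if indegree[t] == 0:
                ready.append(t)

    # Any task never emitted sits on (or behind) a cycle.
    return order if len(order) == len(tasks) else None
```

For example, `{"extract": {"transform"}, "transform": {"load"}}` yields `["extract", "transform", "load"]`, while `{"a": {"b"}, "b": {"a"}}` yields `None`.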

3. Git-Sync Deployment

# git_sync.py
"""
Git-Sync Deployment:

Automatically sync DAGs from Git repository.
"""

# Git-Sync sidecar configuration (illustrative; the official Helm chart
# generates an equivalent sidecar from the values further below)
GIT_SYNC_CONFIG = """
# Kubernetes Pod with Git-Sync
apiVersion: v1
kind: Pod
metadata:
  name: airflow-dags-sync
spec:
  containers:
  - name: git-sync
    image: registry.k8s.io/git-sync/git-sync:v4.0.0
    args:
    - --repo=https://github.com/example/airflow-dags.git
    - --ref=main
    - --root=/dags
    - --link=current   # /dags/current -> the latest synced revision
    - --period=60s     # keep syncing; --one-time would exit after one pull
    - --depth=1
    volumeMounts:
    - name: dags
      mountPath: /dags

  - name: airflow
    image: apache/airflow:2.7.0
    env:
    # The repo keeps its DAGs in a dags/ subfolder
    - name: AIRFLOW__CORE__DAGS_FOLDER
      value: /opt/airflow/dags/current/dags
    volumeMounts:
    - name: dags
      mountPath: /opt/airflow/dags

  volumes:
  - name: dags
    emptyDir: {}
"""

# Official Airflow Helm chart values for Git-Sync. airflow.cfg has no
# git_sync_* options in Airflow 2 (the old [kubernetes] git-sync settings
# were removed in 2.0), so the chart configures the sidecar instead.
AIRFLOW_GIT_SYNC_CONFIG = """
dags:
  gitSync:
    enabled: true
    repo: https://github.com/example/airflow-dags.git
    branch: main
    subPath: dags
    depth: 1
    period: 60s
"""

4. Airflow REST API Deployment

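# rest_api_deployment.py
"""
Airflow REST API Deployment:

The stable REST API (/api/v1) can pause, unpause, trigger, and inspect DAGs,
but it has no endpoint for uploading DAG files. Files reach the scheduler
through the DAGs folder (Git-Sync, a baked image, or a shared volume); the
API is used to control and verify DAGs after delivery.
"""

import shutil
import time
from pathlib import Path
from typing import Any, Dict, Optional

import requests


class AirflowDeployer:
    """Deliver DAG files to a shared DAGs folder and manage them via the REST API.

    Assumptions: basic auth is enabled for the API (e.g. [api] auth_backends
    includes airflow.api.auth.backend.basic_auth), and dags_folder is a volume
    shared with the scheduler.
    """

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
        dags_folder: str = "/opt/airflow/dags",
    ):
        self.base_url = base_url.rstrip("/")
        self.dags_folder = Path(dags_folder)
        self.session = requests.Session()
        self.session.auth = (username, password)

    def upload_dag(
        self,
        dag_path: str,
        dag_id: str,
        timeout: int = 600,
    ) -> bool:
        """Copy a DAG file into the DAGs folder and wait until the API lists it.

        New files are discovered every [scheduler] dag_dir_list_interval
        (300 s by default). If dag_id already exists, this only confirms it is
        still registered, not that the new version has been parsed.
        """
        try:
            shutil.copy(dag_path, self.dags_folder / f"{dag_id}.py")
        except OSError as e:
            print(f"Upload failed: {e}")
            return False

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.get_dag_status(dag_id):
                return True
            time.sleep(10)
        print(f"Timed out waiting for {dag_id} to be parsed")
        return False

    def pause_dag(self, dag_id: str) -> bool:
        """Pause DAG"""
        response = self.session.patch(
            f"{self.base_url}/api/v1/dags/{dag_id}",
            json={'is_paused': True},
        )
        return response.status_code == 200

    def unpause_dag(self, dag_id: str) -> bool:
        """Unpause DAG"""
        response = self.session.patch(
            f"{self.base_url}/api/v1/dags/{dag_id}",
            json={'is_paused': False},
        )
        return response.status_code == 200

    def trigger_dag(
        self,
        dag_id: str,
        conf: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """Trigger a DAG run and return its dag_run_id (None on failure)"""
        payload: Dict[str, Any] = {}
        if conf:
            payload['conf'] = conf

        response = self.session.post(
            f"{self.base_url}/api/v1/dags/{dag_id}/dagRuns",
            json=payload,
        )

        if response.status_code == 200:
            return response.json()['dag_run_id']
        return None

    def get_dag_status(self, dag_id: str) -> Dict[str, Any]:
        """Get DAG details ({} if the DAG is unknown or the call fails)"""
        response = self.session.get(
            f"{self.base_url}/api/v1/dags/{dag_id}",
        )

        if response.status_code == 200:
            return response.json()
        return {}

# Usage (demo credentials only; load real ones from a secret store)
deployer = AirflowDeployer(
    base_url='http://airflow-webserver:8080',
    username='admin',
    password='admin',
)

# Deliver the DAG file and wait for the scheduler to parse it
if deployer.upload_dag('dags/my_dag.py', 'my_dag'):
    # New DAGs start paused by default ([core] dags_are_paused_at_creation),
    # and runs of a paused DAG stay queued, so unpause before triggering
    deployer.unpause_dag('my_dag')
    run_id = deployer.trigger_dag('my_dag', {'param': 'value'})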

⚠️Important

Always test DAG deployments in staging before production. Use deployment strategies like blue-green or canary deployments for safety.

5. Deployment Strategies

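# deployment_strategies.py
"""
Deployment Strategies (Airflow schedules runs rather than serving request
traffic, so "switching traffic" means changing where a DAG is unpaused):

1. Blue-Green Deployment:
   - Maintain two identical Airflow environments (blue = live, green = candidate)
   - Switch by unpausing the DAG in green and pausing it in blue
   - Easy rollback: pause green; blue was never changed

2. Canary Deployment:
   - Deploy to a small scope first (one DAG, or one limited run)
   - Monitor for issues
   - Widen the rollout only after the canary succeeds

3. Rolling Deployment:
   - Deploy incrementally
   - Update one instance (scheduler, webserver, or worker pod) at a time
   - Zero downtime for the Airflow services
"""

import time
from typing import Any, Dict
from urllib.parse import quote


def run_and_wait(
    deployer: AirflowDeployer,
    dag_id: str,
    conf: Dict[str, Any],
    timeout: int = 1800,
    poll_interval: int = 30,
) -> bool:
    """Trigger one run and return True only if it finishes in state 'success'.

    The DAG must be unpaused (runs of a paused DAG stay queued) and is assumed
    to read the conf flag to limit its work, e.g. process a small sample.
    """
    run_id = deployer.trigger_dag(dag_id, conf)
    if run_id is None:
        return False
    url = f"{deployer.base_url}/api/v1/dags/{dag_id}/dagRuns/{quote(run_id, safe='')}"
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = deployer.session.get(url)
        state = response.json().get("state") if response.status_code == 200 else None
        if state in ("success", "failed"):
            return state == "success"
        time.sleep(poll_interval)
    return False  # timed out: treat as a failure


# Blue-Green deployment script
def blue_green_deploy(
    blue: AirflowDeployer,
    green: AirflowDeployer,
    dag_path: str,
    dag_id: str,
) -> bool:
    """Blue-green deployment: validate in green, then make green the live copy"""
    # Deploy to green environment and enable the DAG there for the smoke test
    if not green.upload_dag(dag_path, dag_id):
        return False
    green.unpause_dag(dag_id)

    # Run smoke tests (the 'smoke_test' conf flag is a hypothetical convention)
    if run_and_wait(green, dag_id, {"smoke_test": True}):
        # Switch: stop scheduling in blue. Both sides may schedule briefly,
        # so keep tasks idempotent.
        blue.pause_dag(dag_id)
        print(f"Deployed {dag_id} successfully")
        return True

    # Rollback: disable green; blue kept running throughout
    green.pause_dag(dag_id)
    print(f"Deployment failed, rolled back {dag_id}")
    return False


# Canary deployment script
def canary_deploy(
    deployer: AirflowDeployer,
    dag_path: str,
    dag_id: str,
) -> bool:
    """Canary deployment: enable the new version, watch one limited run,
    and pause the DAG again if that run fails"""
    if not deployer.upload_dag(dag_path, dag_id):
        return False
    deployer.unpause_dag(dag_id)

    # Monitor a single run triggered with the 'canary' conf flag
    if run_and_wait(deployer, dag_id, {"canary": True}):
        print(f"Canary passed; {dag_id} stays enabled")
        return True

    deployer.pause_dag(dag_id)
    print(f"Canary failed; paused {dag_id}")
    return False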
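The rolling strategy needs no Airflow-specific code: Kubernetes already rolls Deployment pods this way (tuned with `maxUnavailable` and `maxSurge`). To make the control flow concrete, here is a minimal sketch that updates instances in batches and halts on the first failure, so instances not yet touched keep running the old version. `rolling_update` and its `update` callback are hypothetical; in practice the callback would restart one scheduler or worker pod and wait for it to report healthy.

```python
from typing import Callable, List, Sequence


def rolling_update(
    instances: Sequence[str],
    update: Callable[[str], bool],
    batch_size: int = 1,
) -> List[str]:
    """Update instances batch by batch; stop after the first batch with a failure.

    Returns the instances that were updated successfully.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    updated: List[str] = []
    for start in range(0, len(instances), batch_size):
        batch = instances[start:start + batch_size]
        results = [update(name) for name in batch]
        updated.extend(name for name, ok in zip(batch, results) if ok)
        if not all(results):
            break  # halt the rollout; later instances stay on the old version
    return updated
```

A smaller `batch_size` keeps more capacity on the known-good version at any moment, at the cost of a slower rollout.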

Real-World Scenarios

Scenario 1: Microsoft-Style CI/CD Pipeline (Azure DevOps)

# microsoft_cicd.py
"""
Microsoft-style CI/CD pipeline:
- Azure DevOps integration
- Multi-stage deployment
- Automated testing
"""

# Azure DevOps YAML (raw string so the shell line continuations survive)
AZURE_DEVOPS_CONFIG = r"""
trigger:
  branches:
    include:
    - main
    - develop

pool:
  vmImage: 'ubuntu-latest'

stages:
- stage: Test
  jobs:
  - job: Test
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.9'

    - script: |
        pip install -r requirements.txt
        pip install -r requirements-dev.txt
      displayName: 'Install dependencies'

    - script: |
        flake8 dags/ --max-line-length=120
        mypy dags/ --ignore-missing-imports
      displayName: 'Lint and type check'

    - script: |
        pytest tests/unit/ -v --cov=dags/
      displayName: 'Run unit tests'

- stage: Build
  dependsOn: Test
  # Build and deploy only from main; develop runs the tests only
  condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
  jobs:
  - job: Build
    steps:
    - task: Docker@2
      inputs:
        containerRegistry: 'ACR'
        repository: 'airflow'
        command: 'buildAndPush'
        Dockerfile: '**/Dockerfile'
        tags: '$(Build.BuildId)'

- stage: Deploy
  dependsOn: Build
  jobs:
  - job: Deploy
    steps:
    - task: HelmInstaller@1
      inputs:
        helmVersionToInstall: '3.12.0'

    # Assumes the agent already has cluster credentials; acrLoginServer is a
    # hypothetical pipeline variable holding the registry behind 'ACR'
    - script: |
        helm repo add apache-airflow https://airflow.apache.org
        helm upgrade --install my-airflow apache-airflow/airflow \
          --namespace airflow \
          --set images.airflow.repository=$(acrLoginServer)/airflow \
          --set images.airflow.tag=$(Build.BuildId) \
          --wait
      displayName: 'Deploy to Kubernetes'
"""

Scenario 2: Meta-Style Automated Pipeline

# meta_automated.py
"""
Meta-style automated pipeline:
- Monorepo structure
- Automated testing
- Continuous deployment
"""

# Monorepo structure
MONOREPO_STRUCTURE = """
airflow-pipelines/
├── dags/
│   ├── team_a/
│   │   ├── dag1.py
│   │   └── dag2.py
│   └── team_b/
│       ├── dag3.py
│       └── dag4.py
├── tests/
│   ├── unit/
│   └── integration/
├── scripts/
├── docker/
└── .github/
"""

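# Automated deployment
# Each step mirrors a step of the CI workflows above. IMAGE and the Helm
# release/namespace names are the same placeholders; the apache-airflow Helm
# repo is assumed to be added already
# (helm repo add apache-airflow https://airflow.apache.org).
import subprocess
import sys

IMAGE = "registry.example.com/my-airflow"


def run_step(name: str, cmd: list) -> None:
    """Run one pipeline step; raise so later steps never run after a failure."""
    print(f"==> {name}: {' '.join(cmd)}")
    if subprocess.run(cmd).returncode != 0:
        raise RuntimeError(f"{name} failed")


def helm_deploy(namespace: str, tag: str) -> None:
    """Roll the Airflow release in `namespace` to the image built for `tag`."""
    run_step(f"Deploy to {namespace}", [
        "helm", "upgrade", "--install", "my-airflow", "apache-airflow/airflow",
        "--namespace", namespace,
        "--set", f"images.airflow.repository={IMAGE}",
        "--set", f"images.airflow.tag={tag}",
        "--wait", "--timeout", "300s",
    ])


def auto_deploy(tag: str) -> None:
    """Automated deployment pipeline: each gate must pass before the next."""
    # 1-2. Validate DAGs and run unit tests
    run_step("Validate DAGs", [sys.executable, "scripts/validate_dags.py", "dags/"])
    run_step("Unit tests", [sys.executable, "-m", "pytest", "tests/unit/"])

    # 3. Build and push the image
    run_step("Build image", ["docker", "build", "-t", f"{IMAGE}:{tag}", "."])
    run_step("Push image", ["docker", "push", f"{IMAGE}:{tag}"])

    # 4-5. Deploy to staging, then run integration tests against it
    helm_deploy("airflow-staging", tag)
    run_step("Integration tests",
             [sys.executable, "scripts/integration_tests.py", "--env", "staging"])

    # 6. Deploy to production
    helm_deploy("airflow-production", tag)
    print("Deployment successful!")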
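With one folder per team, CI does not always need to test everything. A sketch under stated assumptions (the `dags/<team>/` layout shown above, and the rule that anything else under `dags/`, `tests/`, `scripts/`, `docker/`, or `.github/` is shared): map changed paths to the teams whose tests should run, and fall back to a full run when shared code changes. `plan_test_run` is a hypothetical helper, not part of any tool.

```python
from pathlib import PurePosixPath
from typing import Iterable, Set, Tuple

SHARED_ROOTS = {"dags", "tests", "scripts", "docker", ".github"}


def plan_test_run(changed_paths: Iterable[str]) -> Tuple[bool, Set[str]]:
    """Return (run_everything, teams_to_test) for a list of changed files."""
    teams: Set[str] = set()
    run_everything = False
    for path in changed_paths:
        parts = PurePosixPath(path).parts
        if len(parts) >= 3 and parts[0] == "dags":
            teams.add(parts[1])      # dags/<team>/... belongs to one team
        elif parts and parts[0] in SHARED_ROOTS:
            run_everything = True    # shared code can affect every team
        # Anything else (e.g. README.md) does not trigger tests.
    return run_everything, teams
```

In CI the input could come from `git diff --name-only origin/main...HEAD`.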



Best Practices

# best_practices.py
"""
CI/CD Best Practices:

1. Version Control:
   - Use Git for all DAGs
   - Implement branching strategy
   - Code review process

2. Testing:
   - Unit tests for tasks
   - Integration tests for pipelines
   - DAG validation

3. Deployment:
   - Automated deployments
   - Blue-green or canary strategy
   - Rollback procedures

4. Monitoring:
   - Track deployment status
   - Monitor DAG health
   - Alert on failures

5. Documentation:
   - Deployment procedures
   - Rollback procedures
   - Troubleshooting guides
"""

ℹ️ Microsoft Interview Tip

If your interviewer works on the Microsoft stack, framing the design around Azure DevOps multi-stage pipelines (as in Scenario 1) is a natural fit. Whatever the tooling, emphasize automated testing, safe deployment strategies, and monitoring, and be ready to explain how you would roll back a bad release and respond to an incident.


Summary

CI/CD is critical for reliable Airflow deployments. Key takeaways:

  1. Git for version control
  2. Automated testing for quality
  3. Safe deployment strategies for reliability
  4. Monitoring for observability
  5. Documentation for maintainability

For Microsoft and Meta interviews, focus on:

  • CI/CD pipeline design
  • Testing strategies
  • Deployment safety
  • Monitoring and alerting
  • Incident response

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.
