🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Pipeline Orchestration Patterns: DAGs, Error Handling & Idempotency

Data Engineering · Pipeline Design · ⭐ Premium

Advertisement

Pipeline Orchestration Patterns: DAGs, Error Handling & Idempotency

Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Lyft, DoorDash

1. DAG Design Patterns

Fan-Out / Fan-In

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def fan_out_fan_in_dag():
    """Build a daily DAG that handles each region in parallel, then merges.

    One ``process_<region>`` task is created per region; each calls
    ``process_region`` with that region passed as the ``region`` keyword
    argument. Every region task is set upstream of a single
    ``merge_results`` task that calls ``merge_regions``.

    Returns:
        The constructed ``DAG`` object.
    """
    region_names = ['us-east', 'us-west', 'eu-west', 'ap-south']

    dag = DAG(
        'fan_out_fan_in',
        start_date=datetime(2024, 1, 1),
        schedule_interval='@daily',
    )

    # Operators created inside the ``with`` block are attached to ``dag``.
    with dag:
        # Fan-out: one independent task per region.
        region_tasks = [
            PythonOperator(
                task_id=f'process_{name}',
                python_callable=process_region,
                op_kwargs={'region': name},
            )
            for name in region_names
        ]

        # Fan-in: a single task downstream of all region tasks.
        merge_task = PythonOperator(
            task_id='merge_results',
            python_callable=merge_regions,
        )

        region_tasks >> merge_task

    return dag

def process_region(region: str):
    """Handle the data that belongs to one region.

    Placeholder implementation: it does no work and returns ``None``.

    Args:
        region: Name of the region to process.
    """
    # Each region is meant to be handled independently of the others.
    return None

def merge_regions():
    """Combine the per-region results into one output.

    Placeholder implementation: it does no work and returns ``None``.
    """
    # Intended to run only once every upstream region task has finished.
    return None

Conditional Branching

def conditional_dag():
    """Build a DAG that branches on the outcome of a data-quality check.

    Task flow: ``check_quality`` -> ``branch_on_quality`` -> either
    ``process_data`` or ``quarantine_data``.

    Returns:
        The constructed ``DAG`` object, as ``fan_out_fan_in_dag`` does.
    """
    # Imported locally: the file-level imports only bring in PythonOperator.
    from airflow.operators.python import BranchPythonOperator

    # NOTE(review): the ``...`` stands in for the remaining DAG arguments
    # (start date, schedule, ...) — fill these in before real use.
    with DAG('conditional_pipeline', ...) as dag:

        quality_check = PythonOperator(
            task_id='check_quality',
            python_callable=run_quality_checks,
        )

        def branch_func(**context):
            """Return the task_id of the branch that should run next."""
            quality_result = context['ti'].xcom_pull(task_ids='check_quality')
            # The pulled value is missing or malformed when the upstream
            # task pushed nothing usable. Treat that as a failed check so
            # unverified data is quarantined instead of crashing the branch
            # task or being processed.
            try:
                passed = bool(quality_result['passed'])
            except (TypeError, KeyError):
                passed = False
            return 'process_data' if passed else 'quarantine_data'

        branch = BranchPythonOperator(
            task_id='branch_on_quality',
            python_callable=branch_func,
        )

        process = PythonOperator(
            task_id='process_data',
            python_callable=process_data,
        )

        quarantine = PythonOperator(
            task_id='quarantine_data',
            python_callable=quarantine_data,
        )

        quality_check >> branch >> [process, quarantine]

    return dag

2. Idempotency Patterns

Idempotent Writes

class IdempotentWriter:
    """Writer whose operations are meant to be safe to retry."""

    def __init__(self, target_path: str):
        # Destination location; staging data is written beneath it.
        self.target_path = target_path

    def write_with_dedup(self, df, partition_key: str):
        """Deduplicate ``df``, stage it, then hand off to ``_atomic_replace``.

        Args:
            df: DataFrame-like object exposing ``dropDuplicates`` and
                ``write`` (presumably a Spark DataFrame — confirm).
            partition_key: Column on which duplicates are dropped.
        """
        # Drop duplicate rows for the given key before anything is written.
        unique_rows = df.dropDuplicates([partition_key])

        # Stage under a fresh, randomly named directory below the target.
        staging_path = "{}/_temp/{}".format(self.target_path, uuid.uuid4())
        delta_writer = unique_rows.write.format("delta").mode("overwrite")
        delta_writer.save(staging_path)

        # Promote the staged data (atomic rename or Delta MERGE).
        self._atomic_replace(staging_path, self.target_path)

    def _atomic_replace(self, temp_path: str, target_path: str):
        """Replace ``target_path`` with the data staged at ``temp_path``.

        Currently a placeholder: it performs no work and returns ``None``.
        """
        # For Delta Lake: use MERGE or INSERT OVERWRITE.
        return None

Idempotent Backfills

class BackfillManager:
    """Run per-date backfills, using checkpoints to skip finished dates.

    The checkpoint accessors ``_get_checkpoint`` / ``_update_checkpoint``
    and the per-date worker ``_process_date`` are called here but are not
    defined in this class as shown; they must be provided (for example by
    a subclass) before the manager can run.
    """

    def __init__(self, spark, checkpoint_path: str):
        self.spark = spark
        self.checkpoint_path = checkpoint_path

    def backfill_date(self, date: str, force: bool = False):
        """Backfill a single date — safe to retry.

        Args:
            date: Date to backfill, as a string.
            force: When true, reprocess even if the checkpoint already
                says ``"completed"``.

        Raises:
            Exception: Whatever processing raised; the checkpoint is set
                to ``"failed: <error>"`` before the error is re-raised.
        """
        checkpoint = self._get_checkpoint(date)

        if checkpoint == "completed" and not force:
            print(f"Date {date} already backfilled, skipping")
            return

        try:
            # Record progress around the work so a retry can tell where
            # the previous attempt stopped.
            self._update_checkpoint(date, "in_progress")
            self._process_date(date)
            self._update_checkpoint(date, "completed")
        except Exception as e:
            self._update_checkpoint(date, f"failed: {e}")
            raise

    def backfill_range(self, start_date: str, end_date: str,
                       parallelism: int = 4, force: bool = False):
        """Backfill every date from ``start_date`` to ``end_date`` inclusive.

        Args:
            start_date: First date of the range.
            end_date: Last date of the range.
            parallelism: Maximum number of dates processed concurrently.
            force: Forwarded to ``backfill_date`` so already-completed
                dates can be re-run across a whole range.

        Raises:
            Exception: The first failure observed among the per-date
                jobs. Leaving the executor's ``with`` block waits for the
                remaining jobs before the error propagates.
        """
        # Imported locally: these names are not imported at file level.
        from concurrent.futures import ThreadPoolExecutor, as_completed

        import pandas as pd

        dates = pd.date_range(start_date, end_date)

        with ThreadPoolExecutor(max_workers=parallelism) as executor:
            futures = [
                executor.submit(self.backfill_date, str(d.date()), force)
                for d in dates
            ]

            for future in as_completed(futures):
                future.result()  # Re-raise any exception from the worker

3. Error Handling & Retry Strategies

from airflow.models import Variable
from functools import wraps
import time

class RetryStrategy:
    """Configurable retry with exponential backoff.

    Args:
        max_retries: Number of retries after the first attempt. Negative
            values are treated as zero (the function is called once).
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds for any single delay.
        backoff_factor: Multiplier applied to the delay per attempt.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, backoff_factor: float = 2.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor

    def retry(self, func):
        """Decorate ``func`` so failures are retried with backoff.

        The wrapper returns the first successful result. Once all
        attempts are used up, the exception from the final attempt
        propagates unchanged.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Clamp so the function always runs at least once; a negative
            # max_retries previously skipped the call and hit `raise None`.
            retries = max(self.max_retries, 0)

            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= retries:
                        # Out of attempts: bare raise keeps the traceback.
                        raise

                    delay = min(
                        self.base_delay * (self.backoff_factor ** attempt),
                        self.max_delay
                    )
                    print(f"Attempt {attempt + 1} failed: {e}, retrying in {delay}s")
                    time.sleep(delay)

        return wrapper

# Usage in Airflow: default arguments dictionary with retry settings
# (three retries, exponential backoff between 5 and 30 minutes) and
# callbacks for retry and failure events.
default_args = dict(
    owner='data-engineering',
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
    on_retry_callback=notify_retry,
    on_failure_callback=notify_failure,
)

Circuit Breaker Pattern

class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is open."""


class CircuitBreaker:
    """Prevent cascading failures in dependent services.

    The breaker counts consecutive failures. Once ``failure_threshold``
    is reached it opens and rejects calls until ``recovery_timeout``
    seconds have passed since the last failure. It then lets one trial
    call through (half-open): success closes the circuit, failure
    re-opens it.
    """

    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing if service recovered

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = self.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: The circuit is open and the recovery
                timeout has not elapsed; ``func`` is not invoked.
            Exception: Any error raised by ``func``, re-raised after the
                failure has been recorded.
        """
        if self.state == self.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = self.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()

            # A failed trial call re-opens immediately; otherwise open
            # once the consecutive-failure threshold is reached.
            if (self.state == self.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = self.OPEN

            raise

        # Any success ends the failure streak. Without this reset,
        # isolated failures spread over time would eventually trip the
        # breaker even though the service is healthy.
        self.state = self.CLOSED
        self.failure_count = 0
        return result

4. Data Quality Gates

class QualityGate:
    """Gate that blocks downstream if quality fails.

    Args:
        checks: Objects exposing a ``name`` attribute and a ``run(df)``
            method whose result has a ``passed`` attribute.
    """

    def __init__(self, checks: list):
        self.checks = checks

    def validate(self, df) -> dict:
        """Run every check against ``df`` and summarise the outcome.

        Checks that share a name no longer overwrite one another: the
        second and later occurrences are stored as ``"<name>#2"``,
        ``"<name>#3"`` and so on, so ``passed`` and ``failed_checks``
        always agree.

        Returns:
            A dict with ``"passed"`` (true only when no check failed),
            ``"results"`` (result per check key, in run order) and
            ``"failed_checks"`` (keys of the failing checks, in run order).
        """
        results = {}
        failed_checks = []

        for check in self.checks:
            outcome = check.run(df)
            key = self._unique_key(check.name, results)
            results[key] = outcome
            if not outcome.passed:
                failed_checks.append(key)

        return {
            "passed": not failed_checks,
            "results": results,
            "failed_checks": failed_checks,
        }

    @staticmethod
    def _unique_key(name, taken):
        """Return ``name``, suffixed with ``#n`` if already in ``taken``."""
        if name not in taken:
            return name
        suffix = 2
        while f"{name}#{suffix}" in taken:
            suffix += 1
        return f"{name}#{suffix}"

class NotNullCheck:
    """Check that fails on the first listed column containing a null.

    ``F`` and ``CheckResult`` are referenced but not defined in the
    visible code (``F`` is presumably ``pyspark.sql.functions`` — confirm).
    """

    def __init__(self, columns: list):
        self.columns = columns
        self.name = "not_null_" + ",".join(columns)

    def run(self, df):
        """Count nulls column by column and stop at the first offender.

        Returns:
            A failing ``CheckResult`` naming the first column that has
            nulls, otherwise a passing ``CheckResult``.
        """
        for column_name in self.columns:
            is_missing = F.col(column_name).isNull()
            missing_rows = df.filter(is_missing).count()
            if missing_rows > 0:
                return CheckResult(
                    passed=False,
                    message=f"{column_name} has {missing_rows} nulls",
                )
        return CheckResult(passed=True)

class VolumeCheck:
    """Check that the row count lies within ``[min_rows, max_rows]``."""

    def __init__(self, min_rows: int, max_rows: int):
        self.name = "volume_check"
        self.min_rows = min_rows
        self.max_rows = max_rows

    def run(self, df):
        """Compare ``df.count()`` against the configured bounds.

        Returns:
            A failing ``CheckResult`` when the count is below ``min_rows``
            or above ``max_rows``, otherwise a passing ``CheckResult``.
        """
        row_total = df.count()

        if row_total < self.min_rows:
            problem = f"Too few rows: {row_total}"
        elif row_total > self.max_rows:
            problem = f"Too many rows: {row_total}"
        else:
            return CheckResult(passed=True)

        return CheckResult(passed=False, message=problem)

# Usage: a gate requiring non-null order and user ids and a row count
# between one thousand and ten million.
quality_gate = QualityGate(
    checks=[
        NotNullCheck(columns=["order_id", "user_id"]),
        VolumeCheck(min_rows=1000, max_rows=10000000),
    ],
)

5. Backfill & Recovery Patterns

class BackfillStrategy:
    """Handle historical data backfills for one DAG over a date range."""

    def __init__(self, dag_id: str, start_date: str, end_date: str):
        self.dag_id = dag_id
        self.start_date = start_date
        self.end_date = end_date

    def generate_backfill_config(self) -> dict:
        """Generate Airflow backfill configuration.

        Returns:
            A dict holding the DAG id, the date range, a ``conf`` payload
            flagging the run as a backfill, and the ``reset_dag_run`` /
            ``donot_pickle`` switches.
        """
        return {
            "dag_id": self.dag_id,
            "start_date": self.start_date,
            "end_date": self.end_date,
            "conf": {"is_backfill": True},
            "reset_dag_run": True,
            "donot_pickle": False,
        }

    def backfill_with_dependencies(self, spark):
        """Backfill each date in the range whose upstream data is ready.

        Dates whose upstream is not ready are skipped with a printed
        message; they are not retried here.

        Args:
            spark: Session object passed through to ``_backfill_date``.
        """
        # Imported locally: pandas is not imported at file level.
        import pandas as pd

        dates = pd.date_range(self.start_date, self.end_date)

        for date in dates:
            # Only process dates whose upstream dependencies are ready.
            if self._upstream_ready(date):
                self._backfill_date(spark, date)
            else:
                print(f"Skipping {date} — upstream not ready")

    def _upstream_ready(self, date) -> bool:
        """Check if upstream data exists for this date.

        Placeholder: always returns ``True`` without querying anything.
        """
        # Query upstream tables
        return True

    def _backfill_date(self, spark, date):
        """Process one date; must be overridden with the real work.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError(
            "BackfillStrategy._backfill_date must be implemented"
        )

ℹ️

Best Practice: Every pipeline operation should be idempotent. If you can't guarantee idempotency, add a unique job ID and deduplicate on read.

Follow-Up Questions

  1. Design a DAG for a multi-region ETL pipeline with cross-region dependencies.
  2. How do you handle backfills that need to update downstream aggregates?
  3. Design a self-healing pipeline that automatically recovers from failures.
  4. How would you implement a blue-green deployment for a data pipeline?
  5. Design a pipeline that can handle out-of-order data delivery.

Advertisement