The Data Engineering Lifecycle — From Source to Insight

Overview

The data engineering lifecycle is the end-to-end journey data takes from its creation in source systems to its consumption by users, applications, and models. Understanding each stage — and the tools, patterns, and best practices associated with it — is fundamental to building reliable data systems.

[Figure: Full data lifecycle pipeline with sub-steps. Generate (databases, APIs, log files, IoT sensors) → Ingest (batch ETL, streaming, CDC, micro-batch) → Store (data lakes, warehouses, operational DB, feature store) → Process (clean, transform, enrich, aggregate) → Serve (dashboards, APIs, ML models, data products) → Monitor (quality, performance, cost, security). Medallion architecture flow: Bronze (raw) → Silver (clean) → Gold (curated) → Serving layer.]

Definition: Data Engineering Lifecycle

The data engineering lifecycle is the complete process of managing data from its origin to its consumption. It can be formally defined as:

Lifecycle = {Generate → Ingest → Store → Process → Serve → Monitor}

Each stage has specific objectives, tools, and quality metrics that must be optimized for the overall system to function effectively.

Stage 1: Data Generation

Data is generated by various source systems across the organization. Understanding the characteristics of each source type informs how you design ingestion and storage.

Source Types

Source Type | Examples | Characteristics | Challenges
Transactional Databases | PostgreSQL, MySQL, Oracle | Structured, ACID, high consistency | Schema changes, CDC complexity
APIs | Stripe, Salesforce, Twitter | Semi-structured, rate-limited | Throttling, pagination, auth
Log Files | Apache, Nginx, application logs | Unstructured, high volume | Parsing, rotation, retention
IoT Sensors | Temperature, GPS, accelerometer | Streaming, time-series, massive scale | Out-of-order events, gaps
Flat Files | CSV, Excel, JSON exports | Batch, variable schema | Encoding, format consistency
Streaming Events | Clickstream, transactions | Real-time, ordered, immutable | Backpressure, exactly-once

Data Characteristics

The 5 V's of Big Data:

  • Volume: TB to PB
  • Velocity: Real-time
  • Variety: Multi-format
  • Veracity: Trustworthy
  • Value: Business impact

Data Volume Growth Formula

The growth of data volume can be modeled exponentially:

V(t) = V₀ × (1 + r)^t

Where:

  • V(t): Data volume at time t
  • V₀: Initial data volume
  • r: Growth rate (e.g., 0.5 for 50% annual growth)
  • t: Time in years

Example: If you have 1TB today with 50% annual growth: V(3) = 1TB × (1 + 0.5)^3 = 3.375TB in 3 years
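
The growth formula above can be checked with a few lines of Python. This is a minimal sketch; the function names `projected_volume` and `years_to_reach` are illustrative and not part of any library. The second function simply inverts the same formula to estimate when a capacity limit would be reached, assuming the growth rate stays constant.

```python
import math


def projected_volume(initial_tb: float, growth_rate: float, years: float) -> float:
    """Compound growth: V(t) = V0 * (1 + r) ** t."""
    if initial_tb < 0 or growth_rate <= -1:
        raise ValueError("initial_tb must be >= 0 and growth_rate must be > -1")
    return initial_tb * (1 + growth_rate) ** years


def years_to_reach(initial_tb: float, growth_rate: float, target_tb: float) -> float:
    """Invert the formula: t = ln(target / V0) / ln(1 + r)."""
    if initial_tb <= 0 or target_tb <= 0 or growth_rate <= 0:
        raise ValueError("volumes and growth_rate must be positive")
    return math.log(target_tb / initial_tb) / math.log(1 + growth_rate)


if __name__ == "__main__":
    # Reproduces the worked example: 1 TB at 50% annual growth for 3 years.
    print(projected_volume(1.0, 0.5, 3))  # 3.375
```

Real growth is rarely this smooth, so a projection like this is best treated as a planning estimate rather than a forecast.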

Source System Monitoring

# Example: Source system health monitoring
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import logging

@dataclass
class SourceSystemMetrics:
    """Metrics for monitoring source system health."""
    
    system_name: str                    # Name of the source system
    last_extract_time: datetime         # When data was last extracted
    row_count_trend: List[int]          # Historical row counts
    schema_version: str                 # Current schema version
    connection_status: str              # 'healthy', 'degraded', 'down'
    latency_ms: float                   # Average query latency
    error_rate: float                   # Fraction of failed queries (0.01 = 1%)
    
    def is_healthy(self) -> bool:
        """Check if source system is healthy."""
        checks = [
            self.connection_status == 'healthy',
            self.latency_ms < 1000,              # Under 1 second
            self.error_rate < 0.01,              # Less than 1% errors
            (datetime.now() - self.last_extract_time) < timedelta(hours=24),
            len(self.row_count_trend) > 0
        ]
        return all(checks)
    
    def get_health_report(self) -> Dict:
        """Generate detailed health report."""
        return {
            'system': self.system_name,
            'status': 'HEALTHY' if self.is_healthy() else 'UNHEALTHY',
            'last_extract': self.last_extract_time.isoformat(),
            'row_count_trend': self.row_count_trend,
            'schema_version': self.schema_version,
            'latency_ms': self.latency_ms,
            'error_rate': f"{self.error_rate * 100:.2f}%",
            'recommendations': self._get_recommendations()
        }
    
    def _get_recommendations(self) -> List[str]:
        """Generate recommendations based on metrics."""
        recommendations = []
        
        if self.latency_ms > 5000:
            recommendations.append("Consider adding indexes or optimizing queries")
        
        if self.error_rate > 0.05:
            recommendations.append("Investigate source system stability")
        
        if (datetime.now() - self.last_extract_time) > timedelta(hours=12):
            recommendations.append("Check extraction pipeline schedule")
        
        return recommendations

Stage 2: Data Ingestion

Ingestion is the process of moving data from source systems to your data infrastructure. The choice of ingestion method depends on latency requirements, data volume, and source capabilities.

Ingestion Patterns

Ingestion Pattern Selection Formula

Choose the right pattern based on these factors:

Pattern_Score = (Latency_Weight × Latency_Score + Volume_Weight × Volume_Score + Cost_Weight × Cost_Score)

The three weights sum to 1.0. Each score is on a 1-10 scale where higher is better for that factor (10 means the lowest latency, the best fit for high volume, or the lowest cost):

  • Batch: latency score 1 (slowest), volume score 10, cost score 10 (cheapest)
  • Micro-batch: latency score 7, volume score 8, cost score 7
  • Streaming: latency score 10 (fastest), volume score 7, cost score 4 (most expensive)
  • CDC: latency score 6, volume score 9, cost score 6
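
To see how the weighting changes the outcome, the scoring formula can be sketched in Python. The score table below copies the numbers from the list above; the helper names `pattern_score` and `rank_patterns` and the example weights are assumptions chosen for illustration.

```python
from typing import Dict, List, Tuple

# Scores copied from the list above (higher is better for each factor).
SCORES: Dict[str, Dict[str, float]] = {
    "batch": {"latency": 1, "volume": 10, "cost": 10},
    "micro-batch": {"latency": 7, "volume": 8, "cost": 7},
    "streaming": {"latency": 10, "volume": 7, "cost": 4},
    "cdc": {"latency": 6, "volume": 9, "cost": 6},
}


def pattern_score(scores: Dict[str, float], weights: Dict[str, float]) -> float:
    """Weighted sum of the factor scores; weights must sum to 1.0."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1.0")
    return sum(weights[factor] * scores[factor] for factor in weights)


def rank_patterns(weights: Dict[str, float]) -> List[Tuple[str, float]]:
    """Return (pattern, score) pairs sorted from best to worst."""
    ranked = [(name, round(pattern_score(s, weights), 2)) for name, s in SCORES.items()]
    return sorted(ranked, key=lambda pair: pair[1], reverse=True)


if __name__ == "__main__":
    # Hypothetical weights for a latency-sensitive use case.
    print(rank_patterns({"latency": 0.6, "volume": 0.2, "cost": 0.2}))
    # Hypothetical weights for a cost-sensitive use case.
    print(rank_patterns({"latency": 0.1, "volume": 0.3, "cost": 0.6}))
```

With the latency-heavy weights streaming ranks first, while the cost-heavy weights favor batch, which matches the intuition behind the formula.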

Batch Ingestion

Best for: Historical data, daily reports, non-time-sensitive analytics.

# Example: Batch ingestion from PostgreSQL to S3 using Python
import csv
import psycopg2
import pandas as pd
import boto3
from io import BytesIO, StringIO
from datetime import datetime
from typing import Optional

def extract_batch(
    connection_params: dict,            # Database connection parameters
    query: str,                         # SQL query to extract data
    table_name: str,                    # Name of the source table
    batch_date: Optional[str] = None,   # Batch date for incremental loads
    compression: str = 'gzip',          # Parquet compression codec (not applied to CSV)
    file_format: str = 'csv'            # Output format: 'csv' or 'parquet'
) -> int:
    """Extract data from the source, upload it to S3, and return the row count."""
    if file_format not in ('csv', 'parquet'):
        raise ValueError(f"Unsupported file_format: {file_format}")
    
    # Establish database connection with timeout
    conn = psycopg2.connect(
        **connection_params,
        connect_timeout=30,             # Connection timeout in seconds
        options="-c statement_timeout=300000"  # Query timeout: 5 minutes
    )
    
    # Optional incremental filter. The query is wrapped in a subquery so the
    # filter still works when it already has a WHERE clause, and the date is
    # bound as a parameter rather than interpolated (avoids SQL injection).
    params = None
    if batch_date:
        query = f"SELECT * FROM ({query}) AS src WHERE updated_at >= %(batch_date)s"
        params = {'batch_date': batch_date}
    
    try:
        # Passing chunksize here would return an iterator of DataFrames
        # rather than one DataFrame, so the result is read in a single call.
        df = pd.read_sql_query(query, conn, params=params)
    finally:
        conn.close()
    
    # Convert to specified format in memory
    if file_format == 'csv':
        csv_buffer = StringIO()
        df.to_csv(
            csv_buffer, 
            index=False,                    # Don't include row indices
            quoting=csv.QUOTE_NONNUMERIC    # Quote all non-numeric fields
        )
        body = csv_buffer.getvalue().encode('utf-8')
        content_type = 'text/csv'
    else:
        parquet_buffer = BytesIO()
        df.to_parquet(
            parquet_buffer,
            engine='pyarrow',          # Parquet engine
            compression=compression,   # Compression algorithm
            index=False                # Don't include row indices
        )
        body = parquet_buffer.getvalue()
        content_type = 'application/octet-stream'
    
    # Upload to S3 with metadata
    s3 = boto3.client('s3')
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    key = f'raw/{table_name}/{batch_date or timestamp}.{file_format}'
    
    s3.put_object(
        Bucket='data-lake',                           # S3 bucket name
        Key=key,                                      # Object key
        Body=body,                                    # File content
        ContentType=content_type,                     # MIME type
        ServerSideEncryption='aws:kms',               # Encryption type
        Metadata={                                    # Custom metadata
            'source_table': table_name,
            'batch_date': batch_date or timestamp,
            'row_count': str(len(df)),
            'extraction_time': datetime.now().isoformat()
        }
    )
    
    return len(df)

# Usage example
rows = extract_batch(
    connection_params={
        "host": "source-db.example.com",      # Database host
        "database": "production",              # Database name
        "user": "reader",                      # Read-only user
        "password": "secret",                  # Password
        "port": 5432                           # Database port
    },
    query="SELECT * FROM orders WHERE date >= CURRENT_DATE - 1",
    table_name="orders",
    batch_date="2024-01-15",
    compression="snappy",
    file_format="parquet"
)

Streaming Ingestion

Best for: Real-time dashboards, fraud detection, live recommendations.

# Example: Streaming ingestion with Apache Kafka (using confluent-kafka)
from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
from typing import Callable, Dict, Any

class KafkaEventProducer:
    """Production-grade Kafka producer with all parameters explained."""
    
    def __init__(
        self,
        bootstrap_servers: str,       # Kafka broker addresses
        client_id: str,               # Unique client identifier
        acks: str = 'all',           # Acknowledgment level: '0', '1', 'all'
        retries: int = 3,            # Number of retries
        batch_size: int = 16384,     # Batch size in bytes
        linger_ms: int = 10,         # Wait time to batch messages
        compression_type: str = 'snappy',  # Compression algorithm
        enable_idempotence: bool = True     # Idempotent producer: no duplicates from retries (not full exactly-once)
    ):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'client.id': client_id,
            'acks': acks,
            'retries': retries,
            'batch.size': batch_size,
            'linger.ms': linger_ms,
            'compression.type': compression_type,
            'enable.idempotence': enable_idempotence
        })
    
    def produce_event(
        self,
        topic: str,                   # Kafka topic name
        key: str,                     # Message key for partitioning
        value: Dict[str, Any],        # Message payload
        headers: Dict[str, str] = None,  # Optional headers
        callback: Callable = None     # Delivery callback function
    ) -> None:
        """Produce a single event to Kafka."""
        # Serialize message
        value_bytes = json.dumps(value).encode('utf-8')
        
        # Prepare headers if provided
        headers_list = None
        if headers:
            headers_list = [(k, v.encode('utf-8')) for k, v in headers.items()]
        
        # Produce message
        self.producer.produce(
            topic=topic,
            key=key.encode('utf-8') if key else None,
            value=value_bytes,
            headers=headers_list,
            callback=callback or self._default_callback
        )
        
        # Trigger callbacks
        self.producer.poll(0)
    
    def flush(self, timeout: float = 30.0) -> int:
        """Flush all pending messages."""
        return self.producer.flush(timeout)
    
    def _default_callback(self, err, msg):
        """Default delivery callback."""
        if err:
            print(f"Delivery failed: {err}")
        else:
            print(f"Event delivered to {msg.topic()} [{msg.partition()}]")

# Usage example
producer = KafkaEventProducer(
    bootstrap_servers='kafka-broker-1:9092,kafka-broker-2:9092',
    client_id='data-ingestion-service',
    acks='all',
    retries=3,
    batch_size=32768,
    linger_ms=50,
    compression_type='lz4',
    enable_idempotence=True
)

event = {
    "user_id": 12345,
    "action": "purchase",
    "timestamp": time.time(),
    "amount": 99.99,
    "currency": "USD",
    "product_id": "PROD-001"
}

producer.produce_event(
    topic='user_events',
    key=str(event['user_id']),           # Partition by user_id
    value=event,
    headers={
        'source': 'web-app',
        'version': '1.0',
        'trace_id': 'abc-123-def-456'
    }
)

producer.flush()

Change Data Capture (CDC)

Best for: Keeping downstream systems in sync with source databases without full reloads.

CDC Latency Formula

CDC provides near-real-time synchronization with latency:

CDC_Latency = Binlog_Read_Time + Kafka_Producer_Time + Kafka_Consumer_Time + Apply_Time

Typical values:

  • Binlog_Read_Time: 10-50ms
  • Kafka_Producer_Time: 5-20ms
  • Kafka_Consumer_Time: 5-20ms
  • Apply_Time: 10-100ms
  • Total: 30-190ms (sub-second)
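
The latency budget above is a plain sum of per-stage ranges, which the following sketch reproduces. The stage names and the `within_budget` helper are hypothetical, and the millisecond ranges are the typical values listed above rather than measurements.

```python
from typing import Dict, Tuple

# (best case, worst case) in milliseconds, taken from the typical values above.
CDC_STAGES: Dict[str, Tuple[int, int]] = {
    "binlog_read": (10, 50),
    "kafka_producer": (5, 20),
    "kafka_consumer": (5, 20),
    "apply": (10, 100),
}


def latency_bounds(stages: Dict[str, Tuple[int, int]]) -> Tuple[int, int]:
    """Sum the best-case and worst-case latency across all stages."""
    best = sum(low for low, _ in stages.values())
    worst = sum(high for _, high in stages.values())
    return best, worst


def within_budget(stages: Dict[str, Tuple[int, int]], budget_ms: int) -> bool:
    """True if even the worst-case total stays inside the latency budget."""
    return latency_bounds(stages)[1] <= budget_ms


if __name__ == "__main__":
    print(latency_bounds(CDC_STAGES))      # (30, 190)
    print(within_budget(CDC_STAGES, 500))  # True
```

Summing worst cases is deliberately conservative; in practice the stages rarely all hit their upper bound at once.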

Stage 3: Data Storage

Storage choices depend on data format, access patterns, cost, and query requirements.

Storage Architecture Layers

Medallion Architecture (Bronze -> Silver -> Gold)

Layer | Purpose | Data Format | Example | SLA
Bronze | Raw data, as-is from source | Parquet, JSON, CSV | Raw API responses | Real-time
Silver | Cleaned, deduplicated, validated | Parquet, Delta Lake | Deduplicated orders | Hourly
Gold | Business-level aggregations | Parquet, Delta Lake | Daily sales by region | Daily

Storage Cost Calculation

Total storage cost can be calculated as:

Storage_Cost = (Data_Size × Storage_Rate × Duration) + (API_Requests × Request_Cost) + (Data_Transfer × Transfer_Rate)

Where:

  • Data_Size: Volume of data stored (GB)
  • Storage_Rate: Cost per GB per month (e.g., $0.023 for S3 Standard)
  • Duration: Time period in months
  • API_Requests: Number of read/write operations
  • Request_Cost: Cost per request (e.g., roughly $0.005 per 1,000 PUT requests for S3 Standard, which is $0.000005 per request)
  • Data_Transfer: Data transferred out (GB)
  • Transfer_Rate: Cost per GB transferred (e.g., $0.09 for first 10TB)
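
As a worked example of the cost formula, the sketch below adds up the three terms. The function name `storage_cost` is illustrative, and the rates passed in the example are assumed figures for demonstration only; actual cloud pricing varies by region, tier, and date.

```python
from typing import Dict


def storage_cost(
    data_size_gb: float,      # Data_Size: volume stored (GB)
    storage_rate: float,      # Storage_Rate: cost per GB per month
    duration_months: float,   # Duration: time period in months
    api_requests: int,        # API_Requests: number of read/write operations
    request_cost: float,      # Request_Cost: cost per single request
    data_transfer_gb: float,  # Data_Transfer: data transferred out (GB)
    transfer_rate: float,     # Transfer_Rate: cost per GB transferred
) -> Dict[str, float]:
    """Break the total storage cost into its three components."""
    at_rest = data_size_gb * storage_rate * duration_months
    requests = api_requests * request_cost
    transfer = data_transfer_gb * transfer_rate
    return {
        "at_rest": round(at_rest, 2),
        "requests": round(requests, 2),
        "transfer": round(transfer, 2),
        "total": round(at_rest + requests + transfer, 2),
    }


if __name__ == "__main__":
    # Assumed inputs: 1 TB stored for a year, 1M requests, 500 GB egress.
    print(storage_cost(1000, 0.023, 12, 1_000_000, 0.000005, 500, 0.09))
```

With these assumed inputs the at-rest term dominates, which is typical for archival data; request and transfer costs tend to matter more for frequently read datasets.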

Storage Options Comparison

Storage Type | Best For | Cost | Query Performance | Durability
Data Lake (S3/GCS) | Raw data, archival | Low | Requires processing | 11 9's
Data Warehouse (Snowflake/BigQuery) | Analytics, reporting | Medium-High | Excellent | High
Operational DB (PostgreSQL) | Application queries | Medium | High for OLTP | High
OLAP Cube (Druid, ClickHouse) | Real-time analytics | Medium | Very High | High
Feature Store (Feast) | ML features | Low-Medium | High for ML | Medium

Stage 4: Data Processing

Processing transforms raw data into useful information through cleaning, transformation, and enrichment.

Processing Paradigms

+------------------+------------------------------------------+
|                    PROCESSING PARADIGMS                     |
+------------------+------------------------------------------+
| Batch            | Process large datasets periodically      |
|                  | Tools: Spark, dbt, SQL                   |
+------------------+------------------------------------------+
| Micro-batch      | Process small batches every few minutes  |
|                  | Tools: Spark Streaming, Flink (batch)    |
+------------------+------------------------------------------+
| Stream           | Process events as they arrive            |
|                  | Tools: Kafka Streams, Flink, Spark       |
+------------------+------------------------------------------+
| Request-response | Process on-demand via API                |
|                  | Tools: Lambda functions, FastAPI         |
+------------------+------------------------------------------+

[Figure: Processing latency vs. throughput tradeoff]

Common Processing Operations

# Example: Data processing operations using PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType

# Parentheses are used for the method chain because a trailing comment
# after a backslash line continuation is a syntax error in Python.
spark = (
    SparkSession.builder
    .appName("DataProcessing")
    .config("spark.sql.shuffle.partitions", "200")                    # Number of shuffle partitions
    .config("spark.sql.adaptive.enabled", "true")                     # Adaptive query execution
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # Coalesce small partitions
    .getOrCreate()
)

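# Read bronze layer data and check it against the expected schema.
# Delta tables carry their own schema and reject a user-specified schema
# at read time, so the expected schema is used for validation instead.
expected_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("amount", DecimalType(10, 2), True),
    StructField("order_date", TimestampType(), False),
    StructField("status", StringType(), False)
])

orders = spark.read.format("delta").load("s3://data-lake/bronze/orders")
customers = spark.read.format("delta").load("s3://data-lake/bronze/customers")

missing_columns = set(expected_schema.fieldNames()) - set(orders.columns)
if missing_columns:
    raise ValueError(f"Bronze orders is missing columns: {sorted(missing_columns)}")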

# 1. FILTERING — Remove unwanted records
valid_orders = orders.filter(
    (F.col("status") != "cancelled") &           # Remove cancelled orders
    (F.col("amount") > 0) &                      # Remove zero/negative amounts
    (F.col("order_date") >= "2024-01-01")        # Only recent data
)

# 2. DEDUPLICATION — Remove duplicate records
# Using window function for precise deduplication
window_spec = Window.partitionBy("order_id").orderBy(F.desc("order_date"))
deduped_orders = valid_orders \
    .withColumn("row_num", F.row_number().over(window_spec)) \
    .filter(F.col("row_num") == 1) \
    .drop("row_num")

# 3. CLEANING — Standardize data
cleaned_orders = deduped_orders \
    .withColumn("amount", F.col("amount").cast("decimal(10,2)")) \
    .withColumn("order_date", F.to_date("order_date")) \
    .withColumn("status", F.upper(F.col("status"))) \
    .withColumn("customer_id", F.col("customer_id").cast("integer"))

# 4. ENRICHMENT — Join with dimension tables
enriched_orders = cleaned_orders.join(
    customers.select(
        "customer_id", 
        "segment", 
        "region",
        "customer_tier"
    ),
    on="customer_id",
    how="left"
)

# 5. AGGREGATION — Create business metrics
daily_sales = enriched_orders \
    .groupBy(
        "order_date", 
        "region", 
        "segment",
        F.date_format("order_date", "yyyy-MM").alias("month")
    ) \
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.max("amount").alias("max_order_value"),
        F.percentile_approx("amount", 0.5).alias("median_order_value")
    )

# 6. WRITE to silver/gold layer with partitioning
# (parenthesized chain: comments after a backslash continuation are a syntax error)
(
    daily_sales.write.format("delta")
    .mode("overwrite")                        # Overwrite existing data
    .partitionBy("month")                     # Partition by month
    .option("overwriteSchema", "true")        # Allow schema changes
    .save("s3://data-lake/gold/daily_sales")
)

# Verify write
result_count = spark.read.format("delta").load("s3://data-lake/gold/daily_sales").count()
print(f"Successfully wrote {result_count} records to gold layer")

Stage 5: Data Serving

Data serving is the final stage where processed data is made available to consumers — dashboards, applications, APIs, and ML models.

Serving Patterns

Pattern | Latency | Use Case | Example
API Serving | Milliseconds | Application queries | REST API for user profiles
Dashboard Serving | Seconds | BI reporting | Tableau dashboard refresh
Batch Export | Hours | Offline analysis | CSV export for quarterly report
Feature Serving | Milliseconds | ML inference | Feature vector for real-time model
Streaming Serving | Milliseconds | Real-time views | Live dashboard updates

Serving Latency Formula

The total serving latency for a request:

Total_Latency = Network_Latency + Query_Time + Serialization_Time + Client_Rendering_Time

Where:

  • Network_Latency: Time for request to travel (typically 10-100ms)
  • Query_Time: Database/query execution time
  • Serialization_Time: Converting results to JSON/other format
  • Client_Rendering_Time: Time for client to render results
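
The latency formula lends itself to a small breakdown helper that shows which component dominates a request. This is a sketch only: the `LatencyBreakdown` class and the sample timings are hypothetical, not measurements from a real system.

```python
from dataclasses import asdict, dataclass
from typing import Tuple


@dataclass
class LatencyBreakdown:
    """Per-request latency components, all in milliseconds."""

    network_ms: float
    query_ms: float
    serialization_ms: float
    client_rendering_ms: float

    def total_ms(self) -> float:
        """Total_Latency as the sum of the four components."""
        return sum(asdict(self).values())

    def bottleneck(self) -> Tuple[str, float]:
        """Return the largest component and its share of the total."""
        total = self.total_ms()
        if total == 0:
            raise ValueError("total latency is zero")
        parts = asdict(self)
        name = max(parts, key=parts.get)
        return name, parts[name] / total

    def meets_slo(self, slo_ms: float) -> bool:
        """True if the total latency is within the service-level objective."""
        return self.total_ms() <= slo_ms


if __name__ == "__main__":
    request = LatencyBreakdown(40.0, 120.0, 15.0, 25.0)  # assumed sample timings
    print(request.total_ms())    # 200.0
    print(request.bottleneck())  # ('query_ms', 0.6)
```

Knowing the dominant component tells you where optimization pays off: here query time accounts for 60% of the total, so caching or indexing would help more than a faster serializer.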

Data Product Design

# Example: Data product implementation with quality guarantees
from dataclasses import dataclass
from typing import List, Dict, Any, Callable
from datetime import datetime
import json

@dataclass
class DataProduct:
    """A self-contained data product with quality guarantees."""
    
    # Identity
    product_id: str                    # Unique identifier
    name: str                          # Human-readable name
    description: str                   # What this product provides
    owner: str                         # Team/person responsible
    
    # Schema
    schema: Dict[str, str]             # Column names and types
    partition_columns: List[str]       # Partitioning columns
    
    # Quality guarantees
    freshness_sla: str                 # e.g., "daily by 06:00 UTC"
    completeness_threshold: float      # e.g., 0.99 for 99% completeness
    accuracy_checks: List[str]         # Validation rules
    
    # Access
    access_pattern: str                # 'api', 'query', 'export'
    rate_limit: int                    # Requests per minute
    authentication: str                # 'api_key', 'oauth', 'iam'
    
    # Monitoring
    metrics_endpoint: str              # URL for metrics
    alert_channel: str                 # Where to send alerts
    
    def validate_request(
        self,
        query_params: Dict[str, Any]   # Parameters from the request
    ) -> bool:
        """Validate incoming request against product constraints."""
        # Check rate limit
        # Check authentication
        # Validate query parameters
        return True
    
    def get_data(
        self,
        filters: Dict[str, Any],       # Filter conditions
        columns: List[str] = None,     # Columns to return
        limit: int = 1000              # Maximum rows
    ) -> List[Dict]:
        """Fetch data from the product."""
        # Implementation varies by access pattern
        pass
    
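    def get_metrics(self) -> Dict:
        """Get product health metrics."""
        return {
            'last_updated': datetime.now().isoformat(),
            'row_count': self._get_row_count(),
            'freshness_minutes': self._get_freshness(),
            'quality_score': self._calculate_quality_score()
        }
    
    # Backend-specific hooks used by get_metrics(); implement these against
    # the product's storage layer or metadata catalog.
    def _get_row_count(self) -> int:
        raise NotImplementedError("Row count depends on the storage backend")
    
    def _get_freshness(self) -> float:
        raise NotImplementedError("Freshness depends on the storage backend")
    
    def _calculate_quality_score(self) -> float:
        raise NotImplementedError("Quality scoring depends on the configured checks")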

# Example data product
customer_360 = DataProduct(
    product_id="dp-customer-360",
    name="Customer 360 View",
    description="Complete customer profile with all interactions",
    owner="data-engineering",
    schema={
        "customer_id": "integer",
        "name": "string",
        "email": "string",
        "total_purchases": "decimal",
        "last_activity": "timestamp",
        "segment": "string"
    },
    partition_columns=["date"],
    freshness_sla="daily by 06:00 UTC",
    completeness_threshold=0.99,
    accuracy_checks=[
        "customer_id is unique",
        "email format is valid",
        "total_purchases >= 0"
    ],
    access_pattern="query",
    rate_limit=100,
    authentication="api_key",
    metrics_endpoint="/api/products/customer-360/metrics",
    alert_channel="slack://data-engineering-alerts"
)

Stage 6: Data Monitoring

Monitoring ensures data systems remain reliable, performant, and cost-effective.

What to Monitor

Metric Category | Metrics | Tools | Alert Threshold
Pipeline Health | Success rate, duration, SLA | Airflow, Datadog | < 95% success rate
Data Quality | Null rates, freshness, row counts | Great Expectations, dbt tests | > 1% nulls, > 1hr stale
System Performance | CPU, memory, query time | CloudWatch, Prometheus | > 80% CPU, > 5s queries
Cost | Storage used, compute hours, queries | Cloud billing, FinOps tools | > 20% budget overrun
Security | Access logs, permission changes | CloudTrail, audit logs | Any unauthorized access
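
The numeric thresholds in the table above can be expressed as a small rule list and evaluated against collected metrics. This is a minimal sketch: the metric names and the `evaluate` function are hypothetical, and a real setup would normally configure these rules in the monitoring tool itself.

```python
import operator
from typing import Callable, Dict, List, Tuple

# (metric name, breach test, threshold, alert message)
Rule = Tuple[str, Callable[[float, float], bool], float, str]

RULES: List[Rule] = [
    ("pipeline_success_rate", operator.lt, 0.95, "Pipeline success rate below 95%"),
    ("null_rate", operator.gt, 0.01, "Null rate above 1%"),
    ("staleness_hours", operator.gt, 1.0, "Data more than 1 hour stale"),
    ("cpu_utilization", operator.gt, 0.80, "CPU above 80%"),
    ("query_seconds", operator.gt, 5.0, "Query slower than 5 seconds"),
    ("budget_overrun", operator.gt, 0.20, "Budget overrun above 20%"),
]


def evaluate(metrics: Dict[str, float]) -> List[str]:
    """Return one alert message per breached threshold; skip missing metrics."""
    alerts = []
    for name, breached, threshold, message in RULES:
        value = metrics.get(name)
        if value is not None and breached(value, threshold):
            alerts.append(f"{message} (observed {value})")
    return alerts


if __name__ == "__main__":
    print(evaluate({"pipeline_success_rate": 0.99, "null_rate": 0.03}))
```

Keeping the rules as data rather than as a chain of if statements makes it easy to add a metric or tune a threshold without touching the evaluation logic.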

Monitoring Implementation

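# Example: Comprehensive data quality monitoring with Great Expectations
# Note: the calls below follow the fluent datasource API of the 0.16-0.18
# releases (an assumption based on the methods used); later major versions
# changed these calls.
import great_expectations as gx
from great_expectations.core import ExpectationConfiguration
from typing import Any, Dict, List
import logging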

class DataQualityMonitor:
    """Production-grade data quality monitoring."""
    
    def __init__(
        self,
        datasource_name: str,         # Name of the data source
        data_connector_name: str,     # Name of the data connector
        expectation_suite_name: str   # Name of the expectation suite
    ):
        self.context = gx.get_context()
        self.datasource = self.context.sources.add_pandas(datasource_name)
        self.data_connector = self.datasource.add_dataframe_asset(
            name=data_connector_name
        )
        self.expectation_suite = self.context.add_expectation_suite(
            expectation_suite_name
        )
    
    def add_expectation(
        self,
        expectation_type: str,        # Type of expectation
        kwargs: Dict[str, Any],       # Expectation parameters
        meta: Dict[str, Any] = None   # Optional metadata
    ) -> None:
        """Add an expectation to the suite."""
        expectation = ExpectationConfiguration(
            expectation_type=expectation_type,
            kwargs=kwargs,
            meta=meta or {}
        )
        self.expectation_suite.add_expectation(expectation)
    
    def validate_data(
        self,
        df,                           # DataFrame to validate
        batch_request: Dict = None    # Optional batch request
    ) -> Dict:
        """Validate data against expectations."""
        # Create batch request from the in-memory DataFrame
        if batch_request is None:
            batch_request = self.data_connector.build_batch_request(
                dataframe=df
            )
        
        # Run validation
        validator = self.context.get_validator(
            batch_request=batch_request,
            expectation_suite=self.expectation_suite
        )
        
        results = validator.validate()
        
        # Process results
        return {
            'success': results.success,
            'statistics': {
                'evaluated_expectations': results.statistics['evaluated_expectations'],
                'successful_expectations': results.statistics['successful_expectations'],
                'unsuccessful_expectations': results.statistics['unsuccessful_expectations'],
                'success_percent': results.statistics['success_percent']
            },
            'results': [
                {
                    'expectation': r.expectation_config.expectation_type,
                    'success': r.success,
                    'result': r.result
                }
                for r in results.results
            ]
        }

# Usage example
monitor = DataQualityMonitor(
    datasource_name="my_datasource",
    data_connector_name="my_connector",
    expectation_suite_name="orders_quality_suite"
)

# Add expectations
monitor.add_expectation(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "order_id"},
    meta={"severity": "critical", "owner": "data-engineering"}
)

monitor.add_expectation(
    expectation_type="expect_column_values_to_be_unique",
    kwargs={"column": "order_id"},
    meta={"severity": "critical", "owner": "data-engineering"}
)

monitor.add_expectation(
    expectation_type="expect_column_values_to_be_between",
    kwargs={
        "column": "amount",
        "min_value": 0,
        "max_value": 1000000
    },
    meta={"severity": "warning", "owner": "data-engineering"}
)

monitor.add_expectation(
    expectation_type="expect_table_row_count_to_be_between",
    kwargs={
        "min_value": 100,
        "max_value": 1000000
    },
    meta={"severity": "warning", "owner": "data-engineering"}
)

# Validate data (df is assumed to be a pandas DataFrame of orders loaded earlier)
results = monitor.validate_data(df)
print(f"Validation success: {results['success']}")
print(f"Statistics: {results['statistics']}")

End-to-End Pipeline Example

# Complete lifecycle example: Daily sales pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
    'sla': timedelta(hours=2),
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

def extract_orders(**context):
    """Stage 1: Extract from source database."""
    import psycopg2
    import pandas as pd
    
    # Connection with timeout and statement timeout
    conn = psycopg2.connect(
        "postgresql://source-db/orders",
        connect_timeout=30,
        options="-c statement_timeout=300000"
    )
    
    df = pd.read_sql(
        "SELECT * FROM orders WHERE date = %s", 
        conn, 
        params=[context['ds']]
    )
    
    # Write to bronze layer
    df.to_parquet(
        f"s3://data-lake/bronze/orders/{context['ds']}.parquet",
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    
    conn.close()
    return len(df)

def transform_orders(**context):
    """Stage 2: Transform and clean."""
    import pandas as pd
    from pyarrow import parquet
    
    # Read from bronze layer
    df = pd.read_parquet(f"s3://data-lake/bronze/orders/{context['ds']}.parquet")
    
    # Clean and transform
    deduped = df.drop_duplicates(subset=['order_id'])
    cleaned = deduped[deduped['amount'] > 0].copy()   # copy() avoids SettingWithCopyWarning
    cleaned['order_date'] = pd.to_datetime(cleaned['order_date'])
    
    # Add quality metrics (cast to plain int so the returned dict serializes cleanly)
    quality_metrics = {
        'row_count': len(cleaned),
        'null_count': int(cleaned.isnull().sum().sum()),
        'duplicate_count': len(df) - len(deduped),    # Duplicates only, not filtered rows
    }
    
    # Write to silver layer
    cleaned.to_parquet(
        f"s3://data-lake/silver/orders/{context['ds']}.parquet",
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    
    return quality_metrics

def load_to_warehouse(**context):
    """Stage 3: Load to data warehouse."""
    from sqlalchemy import create_engine
    import pandas as pd
    
    # Create engine with connection pooling
    engine = create_engine(
        "snowflake://warehouse/analytics",
        pool_size=5,                    # Connection pool size
        max_overflow=10,                # Max overflow connections
        pool_timeout=30                 # Timeout for getting connection
    )
    
    df = pd.read_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    
    # Load with chunking for large datasets
    df.to_sql(
        'daily_orders', 
        engine, 
        if_exists='append',             # Append to existing table
        index=False,                    # Don't write index
        chunksize=10000,                # Write in chunks
        method='multi'                  # Use multi-row inserts
    )

def monitor(**context):
    """Stage 4: Validate and monitor."""
    # Run quality checks
    pass

# DAG definition
with DAG('daily_sales_pipeline', default_args=default_args,
         schedule_interval='@daily', catchup=False,
         max_active_runs=1, tags=['production', 'sales']) as dag:

    extract = PythonOperator(task_id='extract', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform', python_callable=transform_orders)
    load = PythonOperator(task_id='load', python_callable=load_to_warehouse)
    monitor_task = PythonOperator(task_id='monitor', python_callable=monitor)
    
    # Task dependencies with explicit ordering
    extract >> transform >> load >> monitor_task

Best Practices

Pipeline Design Principles

  1. Idempotency: Rerunning a pipeline for the same input and logical date should produce the same result, with no duplicated or missing rows
  2. Fault Tolerance: Handle failures gracefully with retries, timeouts, and fallbacks
  3. Observability: Log row counts, durations, and errors at each stage, and set up alerts for anomalies
  4. Documentation: Maintain clear pipeline and schema documentation
  5. Testing: Write unit tests for transformations and integration tests for the end-to-end pipeline
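
Idempotency is the easiest of these principles to break with an append-only load, because rerunning a day appends the same rows again. A minimal sketch of one common remedy, delete-then-insert for the run's logical date inside a single transaction, is shown below. It uses an in-memory SQLite database so that it runs anywhere; the `load_partition` helper and the `order_date` column are assumptions for illustration, and a warehouse would typically offer its own `MERGE` or partition-overwrite mechanism instead.

```python
import sqlite3
from typing import List, Tuple


def load_partition(conn: sqlite3.Connection, ds: str,
                   rows: List[Tuple[int, float]]) -> None:
    """Replace all rows for logical date `ds` so reruns do not duplicate data."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_orders WHERE order_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_orders (order_date, order_id, amount) VALUES (?, ?, ?)",
            [(ds, order_id, amount) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_orders (order_date TEXT, order_id INTEGER, amount REAL)"
)

rows = [(1, 19.99), (2, 5.00)]
load_partition(conn, "2024-01-01", rows)
load_partition(conn, "2024-01-01", rows)  # simulated rerun of the same day

row_count = conn.execute("SELECT COUNT(*) FROM daily_orders").fetchone()[0]
print(row_count)  # 2, not 4: the rerun replaced the partition
```

Because the delete and the insert share one transaction, a failure partway through leaves the previous version of the partition in place rather than an empty or half-loaded one.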

Data Quality Framework

# Example: Data quality framework with scoring
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class QualityScore:
    """Holds five quality dimensions, each a fraction from 0 to 1, and combines them into one score."""
    
    completeness: float     # Fraction of non-null values
    accuracy: float         # Fraction of valid values
    consistency: float      # Fraction of consistent values
    timeliness: float       # Freshness as a fraction (1.0 = fully fresh)
    uniqueness: float       # Fraction of unique values
    
    def calculate_overall_score(
        self,
        weights: Optional[Dict[str, float]] = None  # Optional custom weights; should sum to 1
    ) -> float:
        """Calculate weighted quality score."""
        if weights is None:
            weights = {
                'completeness': 0.25,
                'accuracy': 0.25,
                'consistency': 0.20,
                'timeliness': 0.15,
                'uniqueness': 0.15
            }
        
        score = (
            self.completeness * weights['completeness'] +
            self.accuracy * weights['accuracy'] +
            self.consistency * weights['consistency'] +
            self.timeliness * weights['timeliness'] +
            self.uniqueness * weights['uniqueness']
        )
        
        return round(score * 100, 2)  # Return as percentage
    
    def get_grade(self) -> str:
        """Convert score to letter grade."""
        score = self.calculate_overall_score()
        
        if score >= 95:
            return 'A'
        elif score >= 90:
            return 'B'
        elif score >= 80:
            return 'C'
        elif score >= 70:
            return 'D'
        else:
            return 'F'

# Usage example
quality = QualityScore(
    completeness=0.98,      # 98% non-null
    accuracy=0.95,          # 95% valid
    consistency=0.92,       # 92% consistent
    timeliness=0.99,        # 99% fresh
    uniqueness=0.99         # 99% unique
)

print(f"Quality Score: {quality.calculate_overall_score()}%")
print(f"Grade: {quality.get_grade()}")
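
The usage example passes the five fractions in as literals. As a rough sketch of where two of them might come from, the snippet below computes completeness and uniqueness over a small list of records using only the standard library. The helper names, the sample records, and the choice of `order_id` as the key are assumptions for illustration; accuracy, consistency, and timeliness depend on business rules and are not covered here.

```python
from typing import Any, Dict, List


def completeness(records: List[Dict[str, Any]], fields: List[str]) -> float:
    """Fraction of (record, field) cells that are present and not None."""
    total = len(records) * len(fields)
    if total == 0:
        return 0.0
    filled = sum(
        1 for record in records for field in fields
        if record.get(field) is not None
    )
    return filled / total


def uniqueness(records: List[Dict[str, Any]], key: str) -> float:
    """Ratio of distinct key values to the total number of records."""
    if not records:
        return 0.0
    distinct = {record.get(key) for record in records}
    return len(distinct) / len(records)


# Hypothetical sample: one missing amount and one duplicated order_id
orders = [
    {"order_id": 1, "amount": 10.0},
    {"order_id": 2, "amount": None},
    {"order_id": 2, "amount": 7.5},
    {"order_id": 3, "amount": 3.0},
]

completeness_score = completeness(orders, ["order_id", "amount"])
uniqueness_score = uniqueness(orders, "order_id")
print(completeness_score, uniqueness_score)
```

The two results could then be passed as the `completeness` and `uniqueness` arguments of `QualityScore`.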

Key Takeaways

  1. The lifecycle has six stages: Generate → Ingest → Store → Process → Serve → Monitor
  2. Each stage has specific tools and patterns — choose based on latency, volume, and cost requirements
  3. The Medallion Architecture (Bronze → Silver → Gold) provides a clean separation of concerns
  4. Monitoring is not optional — without it, data issues go unnoticed until they cause business impact
  5. End-to-end ownership — data engineers are responsible for the entire lifecycle, not just individual stages
  6. Design for change — source systems, requirements, and tools evolve; build flexible pipelines

Practice Exercises

  1. Lifecycle mapping: For a "real-time social media analytics" use case, describe each stage of the data lifecycle with specific tools.

  2. Source analysis: Identify 3 data sources in your organization. For each, document: volume, velocity, format, and recommended ingestion pattern.

  3. Pipeline design: Design an end-to-end pipeline for "e-commerce order analytics" including Bronze, Silver, and Gold layers.

  4. Monitoring plan: Create a monitoring checklist for a daily ETL pipeline. Include metrics, thresholds, and alerting channels.

  5. Cost estimation: Estimate monthly costs for storing 1TB of data in S3 vs Snowflake vs PostgreSQL. Include storage and query costs.
