Overview
The data engineering lifecycle is the end-to-end journey data takes from its creation in source systems to its consumption by users, applications, and models. Understanding each stage — and the tools, patterns, and best practices associated with it — is fundamental to building reliable data systems.
Definition: Data Engineering Lifecycle
The data engineering lifecycle is the complete process of managing data from its origin to its consumption. It can be formally defined as:
Lifecycle = {Generate → Ingest → Store → Process → Serve → Monitor}
Each stage has specific objectives, tools, and quality metrics that must be optimized for the overall system to function effectively.
Stage 1: Data Generation
Data is generated by various source systems across the organization. Understanding the characteristics of each source type informs how you design ingestion and storage.
Source Types
| Source Type | Examples | Characteristics | Challenges |
|---|---|---|---|
| Transactional Databases | PostgreSQL, MySQL, Oracle | Structured, ACID, high consistency | Schema changes, CDC complexity |
| APIs | Stripe, Salesforce, Twitter | Semi-structured, rate-limited | Throttling, pagination, auth |
| Log Files | Apache, Nginx, Application logs | Unstructured, high volume | Parsing, rotation, retention |
| IoT Sensors | Temperature, GPS, Accelerometer | Streaming, time-series, massive scale | Out-of-order events, gaps |
| Flat Files | CSV, Excel, JSON exports | Batch, variable schema | Encoding, format consistency |
| Streaming Events | Clickstream, Transactions | Real-time, ordered, immutable | Backpressure, exactly-once |
Data Characteristics
Data Volume Growth Formula
The growth of data volume can be modeled exponentially:
V(t) = V₀ × (1 + r)^t
Where:
- V(t): Data volume at time t
- V₀: Initial data volume
- r: Growth rate (e.g., 0.5 for 50% annual growth)
- t: Time in years
Example: If you have 1TB today with 50% annual growth:
V(3) = 1TB × (1 + 0.5)^3 = 3.375TB in 3 years
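The growth formula can be evaluated directly in code. It can also be inverted by taking logarithms, which answers "when will we hit a given size?" This is a minimal sketch. The helper names `projected_volume` and `years_to_reach` are illustrative, not from any library.

```python
import math


def projected_volume(v0: float, r: float, t: float) -> float:
    """V(t) = V0 * (1 + r)^t: volume after t years at annual growth rate r."""
    return v0 * (1 + r) ** t


def years_to_reach(v0: float, r: float, target: float) -> float:
    """Invert V(t) = V0 * (1 + r)^t to get t = ln(target / V0) / ln(1 + r)."""
    return math.log(target / v0) / math.log(1 + r)


print(projected_volume(1.0, 0.5, 3))            # 3.375 (TB), matching the example above
print(round(years_to_reach(1.0, 0.5, 10.0), 2))  # years until 1 TB grows to 10 TB
```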
Source System Monitoring
```python
# Example: Source system health monitoring
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List


@dataclass
class SourceSystemMetrics:
    """Metrics for monitoring source system health."""
    system_name: str             # Name of the source system
    last_extract_time: datetime  # When data was last extracted
    row_count_trend: List[int]   # Historical row counts
    schema_version: str          # Current schema version
    connection_status: str       # 'healthy', 'degraded', 'down'
    latency_ms: float            # Average query latency
    error_rate: float            # Fraction of failed queries (0.01 = 1%)

    def is_healthy(self) -> bool:
        """Check if source system is healthy."""
        checks = [
            self.connection_status == 'healthy',
            self.latency_ms < 1000,  # Under 1 second
            self.error_rate < 0.01,  # Less than 1% errors
            (datetime.now() - self.last_extract_time) < timedelta(hours=24),
            len(self.row_count_trend) > 0
        ]
        return all(checks)

    def get_health_report(self) -> Dict:
        """Generate detailed health report."""
        return {
            'system': self.system_name,
            'status': 'HEALTHY' if self.is_healthy() else 'UNHEALTHY',
            'last_extract': self.last_extract_time.isoformat(),
            'row_count_trend': self.row_count_trend,
            'schema_version': self.schema_version,
            'latency_ms': self.latency_ms,
            'error_rate': f"{self.error_rate * 100:.2f}%",
            'recommendations': self._get_recommendations()
        }

    def _get_recommendations(self) -> List[str]:
        """Generate recommendations based on metrics."""
        recommendations = []
        if self.latency_ms > 5000:
            recommendations.append("Consider adding indexes or optimizing queries")
        if self.error_rate > 0.05:
            recommendations.append("Investigate source system stability")
        if (datetime.now() - self.last_extract_time) > timedelta(hours=12):
            recommendations.append("Check extraction pipeline schedule")
        return recommendations
```
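`SourceSystemMetrics` stores `row_count_trend` but never analyzes it. One common use of such a history is a z-score check on the latest count, which flags a sudden drop or spike in extracted rows. This is a sketch under assumptions: the z-score method and the default threshold of 3 standard deviations are illustrative choices, and `row_count_is_anomalous` is a hypothetical helper.

```python
from statistics import mean, stdev
from typing import List


def row_count_is_anomalous(trend: List[int], z_threshold: float = 3.0) -> bool:
    """Flag the latest row count if it lies more than z_threshold standard
    deviations from the mean of the earlier counts (hypothetical helper)."""
    if len(trend) < 3:
        return False  # Too little history to judge
    history, latest = trend[:-1], trend[-1]
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return latest != mu  # Perfectly flat history: any change stands out
    return abs(latest - mu) / sigma > z_threshold


print(row_count_is_anomalous([1000, 1020, 980, 1010, 990, 200]))   # sharp drop
print(row_count_is_anomalous([1000, 1020, 980, 1010, 990, 1005]))  # normal variation
```

In practice this could feed `_get_recommendations`, for example by appending a message when `row_count_is_anomalous(self.row_count_trend)` is true.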
Stage 2: Data Ingestion
Ingestion is the process of moving data from source systems to your data infrastructure. The choice of ingestion method depends on latency requirements, data volume, and source capabilities.
Ingestion Patterns
Ingestion Pattern Selection
Choose the right pattern based on these factors:
Pattern_Score = (Latency_Weight × Latency_Score + Volume_Weight × Volume_Score + Cost_Weight × Cost_Score)
Where the three weights sum to 1.0 and each score runs from 1 (worst) to 10 (best). A high latency score means low latency, and a high cost score means low cost:
- Batch: latency score 1, volume score 10, cost score 10
- Micro-batch: latency score 7, volume score 8, cost score 7
- Streaming: latency score 10, volume score 7, cost score 4
- CDC: latency score 6, volume score 9, cost score 6
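The weighted score can be computed for every pattern and ranked. This is a minimal sketch that reuses the scores listed above. The weight values in the usage lines are illustrative assumptions, not recommendations.

```python
from typing import Dict, List, Tuple

# Scores from the list above: 1 = worst, 10 = best (a high cost score means cheap)
PATTERN_SCORES: Dict[str, Dict[str, int]] = {
    "batch":       {"latency": 1,  "volume": 10, "cost": 10},
    "micro-batch": {"latency": 7,  "volume": 8,  "cost": 7},
    "streaming":   {"latency": 10, "volume": 7,  "cost": 4},
    "cdc":         {"latency": 6,  "volume": 9,  "cost": 6},
}


def rank_patterns(weights: Dict[str, float]) -> List[Tuple[str, float]]:
    """Return (pattern, Pattern_Score) pairs, best first."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1.0")
    scored = [
        (name, sum(weights[k] * scores[k] for k in ("latency", "volume", "cost")))
        for name, scores in PATTERN_SCORES.items()
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# A latency-sensitive use case vs. a cost-sensitive one (example weights)
print(rank_patterns({"latency": 0.6, "volume": 0.2, "cost": 0.2}))
print(rank_patterns({"latency": 0.1, "volume": 0.3, "cost": 0.6}))
```

With latency weighted heavily, streaming ranks first. With cost weighted heavily, batch does.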
Batch Ingestion
Best for: Historical data, daily reports, non-time-sensitive analytics.
```python
# Example: Batch ingestion from PostgreSQL to S3 using Python
import csv
import gzip
from datetime import datetime
from io import BytesIO, StringIO
from typing import Optional

import boto3
import pandas as pd
import psycopg2


def extract_batch(
    connection_params: dict,           # Database connection parameters
    query: str,                        # SQL query to extract data
    table_name: str,                   # Name of the source table
    batch_date: Optional[str] = None,  # Lower bound on updated_at for incremental loads
    compression: str = 'gzip',         # Parquet codec ('snappy', 'gzip', ...); 'gzip' also compresses CSV/JSON
    file_format: str = 'csv'           # Output format: 'csv', 'parquet', 'json'
) -> int:
    """Extract rows from PostgreSQL and upload them to S3 as a single object."""
    # Establish database connection with connect and statement timeouts
    conn = psycopg2.connect(
        **connection_params,
        connect_timeout=30,                    # Connection timeout in seconds
        options="-c statement_timeout=300000"  # Query timeout: 5 minutes (in ms)
    )
    try:
        params = None
        if batch_date:
            # Wrap the query so the filter works even when it already has a WHERE
            # clause, and bind batch_date as a parameter instead of formatting it
            # into the SQL (assumes the query returns an updated_at column)
            query = f"SELECT * FROM ({query}) AS src WHERE updated_at >= %(batch_date)s"
            params = {'batch_date': batch_date}
        # Loads the full result into memory; for very large extracts, pass
        # chunksize= and process the returned iterator chunk by chunk instead
        df = pd.read_sql_query(query, conn, params=params)
    finally:
        conn.close()

    # Serialize to the requested format in memory
    if file_format == 'csv':
        csv_buffer = StringIO()
        df.to_csv(
            csv_buffer,
            index=False,          # Don't include row indices
            quoting=csv.QUOTE_ALL  # Quote every field
        )
        body = csv_buffer.getvalue().encode('utf-8')
        content_type = 'text/csv'
    elif file_format == 'parquet':
        parquet_buffer = BytesIO()
        df.to_parquet(
            parquet_buffer,
            engine='pyarrow',         # Parquet engine
            compression=compression,  # Column-chunk compression codec
            index=False               # Don't include row indices
        )
        body = parquet_buffer.getvalue()
        content_type = 'application/octet-stream'
    elif file_format == 'json':
        body = df.to_json(orient='records', lines=True, date_format='iso').encode('utf-8')
        content_type = 'application/x-ndjson'
    else:
        raise ValueError(f"Unsupported file_format: {file_format!r}")

    # Parquet compresses internally; text formats are gzipped as a whole object
    extension = file_format
    if file_format != 'parquet' and compression == 'gzip':
        body = gzip.compress(body)
        extension += '.gz'

    # Upload to S3 with metadata
    s3 = boto3.client('s3')
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    key = f'raw/{table_name}/{batch_date or timestamp}.{extension}'
    s3.put_object(
        Bucket='data-lake',              # S3 bucket name
        Key=key,                         # Object key
        Body=body,                       # File content (bytes)
        ContentType=content_type,        # MIME type
        ServerSideEncryption='aws:kms',  # Encrypt at rest with AWS KMS
        Metadata={                       # Custom metadata (values must be strings)
            'source_table': table_name,
            'batch_date': batch_date or timestamp,
            'row_count': str(len(df)),
            'extraction_time': datetime.now().isoformat()
        }
    )
    return len(df)


# Usage example
rows = extract_batch(
    connection_params={
        "host": "source-db.example.com",  # Database host
        "database": "production",         # Database name
        "user": "reader",                 # Read-only user
        "password": "secret",             # Load from a secrets manager in practice
        "port": 5432                      # Database port
    },
    query="SELECT * FROM orders WHERE date >= CURRENT_DATE - 1",
    table_name="orders",
    batch_date="2024-01-15",
    compression="snappy",
    file_format="parquet"
)
```
Streaming Ingestion
Best for: Real-time dashboards, fraud detection, live recommendations.
```python
# Example: Streaming ingestion with Apache Kafka (using confluent-kafka)
import json
import time
from typing import Any, Callable, Dict, Optional

from confluent_kafka import Producer


class KafkaEventProducer:
    """Kafka producer wrapper exposing the main delivery and batching settings."""

    def __init__(
        self,
        bootstrap_servers: str,            # Comma-separated broker addresses
        client_id: str,                    # Unique client identifier
        acks: str = 'all',                 # Acknowledgment level: '0', '1', 'all'
        retries: int = 3,                  # Number of send retries
        batch_size: int = 16384,           # Maximum batch size in bytes
        linger_ms: int = 10,               # Time to wait while filling a batch
        compression_type: str = 'snappy',  # 'none', 'gzip', 'snappy', 'lz4', 'zstd'
        enable_idempotence: bool = True    # Retries cannot create duplicates (requires acks='all')
    ):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'client.id': client_id,
            'acks': acks,
            'retries': retries,
            'batch.size': batch_size,
            'linger.ms': linger_ms,
            'compression.type': compression_type,
            'enable.idempotence': enable_idempotence
        })

    def produce_event(
        self,
        topic: str,                                # Kafka topic name
        key: Optional[str],                        # Message key for partitioning
        value: Dict[str, Any],                     # Message payload
        headers: Optional[Dict[str, str]] = None,  # Optional headers
        callback: Optional[Callable] = None        # Delivery callback function
    ) -> None:
        """Queue a single event for asynchronous delivery to Kafka."""
        # Serialize message
        value_bytes = json.dumps(value).encode('utf-8')
        # Prepare headers if provided
        headers_list = None
        if headers:
            headers_list = [(k, v.encode('utf-8')) for k, v in headers.items()]
        # Enqueue message (raises BufferError if the local queue is full)
        self.producer.produce(
            topic=topic,
            key=key.encode('utf-8') if key else None,
            value=value_bytes,
            headers=headers_list,
            callback=callback or self._default_callback
        )
        # Serve delivery callbacks for previously sent messages
        self.producer.poll(0)

    def flush(self, timeout: float = 30.0) -> int:
        """Wait for pending messages; returns the number still undelivered."""
        return self.producer.flush(timeout)

    def _default_callback(self, err, msg):
        """Default delivery callback."""
        if err:
            print(f"Delivery failed: {err}")
        else:
            print(f"Event delivered to {msg.topic()} [{msg.partition()}]")


# Usage example
producer = KafkaEventProducer(
    bootstrap_servers='kafka-broker-1:9092,kafka-broker-2:9092',
    client_id='data-ingestion-service',
    acks='all',
    retries=3,
    batch_size=32768,
    linger_ms=50,
    compression_type='lz4',
    enable_idempotence=True
)
event = {
    "user_id": 12345,
    "action": "purchase",
    "timestamp": time.time(),
    "amount": 99.99,
    "currency": "USD",
    "product_id": "PROD-001"
}
producer.produce_event(
    topic='user_events',
    key=str(event['user_id']),  # Partition by user_id
    value=event,
    headers={
        'source': 'web-app',
        'version': '1.0',
        'trace_id': 'abc-123-def-456'
    }
)
producer.flush()
```
Change Data Capture (CDC)
Best for: Keeping downstream systems in sync with source databases without full reloads.
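A CDC consumer's "apply" step replays each change event against the target in log order. This is a sketch under stated assumptions. The event shape (`op` codes `c`/`u`/`d`/`r` with `before` and `after` row images) is loosely modeled on Debezium's change-event envelope. An in-memory dict stands in for the target table, and `apply_change_events` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def apply_change_events(
    target: Dict[Any, Row], events: Iterable[Dict[str, Any]], key: str = "id"
) -> Dict[Any, Row]:
    """Apply create/update/delete events, in order, to a keyed target table."""
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):
            # Create, snapshot read, or update: upsert the new row image
            row = event["after"]
            target[row[key]] = row
        elif op == "d":
            # Delete: remove the row identified by the old row image
            target.pop(event["before"][key], None)
        else:
            raise ValueError(f"Unknown op: {op!r}")
    return target


events = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "new"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "new"}},
    {"op": "u", "before": {"id": 1, "status": "new"}, "after": {"id": 1, "status": "shipped"}},
    {"op": "d", "before": {"id": 2, "status": "new"}, "after": None},
]
print(apply_change_events({}, events))  # {1: {'id': 1, 'status': 'shipped'}}
```

Because creates and updates are upserts, replaying the same events after a consumer restart leaves the target unchanged. Order still matters, which is why CDC topics are usually keyed by primary key.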
CDC Latency Formula
CDC provides near-real-time synchronization with latency:
CDC_Latency = Binlog_Read_Time + Kafka_Producer_Time + Kafka_Consumer_Time + Apply_Time
Typical values:
- Binlog_Read_Time: 10-50ms (reading the change from the database log: the binlog in MySQL, the WAL in PostgreSQL)
- Kafka_Producer_Time: 5-20ms
- Kafka_Consumer_Time: 5-20ms
- Apply_Time: 10-100ms
- Total: 30-190ms (sub-second)
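The total range follows from summing the bounds of each term, and the largest term shows where to optimize first. This is a minimal sketch using the typical values above. Note that summing every maximum assumes the worst case of all terms coincides, so it is a conservative bound.

```python
from typing import Dict, Tuple

# (min_ms, max_ms) for each term of CDC_Latency, from the typical values above
CDC_TERMS_MS: Dict[str, Tuple[int, int]] = {
    "binlog_read": (10, 50),
    "kafka_producer": (5, 20),
    "kafka_consumer": (5, 20),
    "apply": (10, 100),
}


def cdc_latency_range(terms: Dict[str, Tuple[int, int]]) -> Tuple[int, int]:
    """Best case sums every minimum; worst case sums every maximum."""
    return sum(lo for lo, _ in terms.values()), sum(hi for _, hi in terms.values())


low, high = cdc_latency_range(CDC_TERMS_MS)
dominant = max(CDC_TERMS_MS, key=lambda name: CDC_TERMS_MS[name][1])
print(low, high, dominant)  # 30 190 apply
```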
Stage 3: Data Storage
Storage choices depend on data format, access patterns, cost, and query requirements.
Storage Architecture Layers
Medallion Architecture (Bronze -> Silver -> Gold)
| Layer | Purpose | Data Format | Example | SLA |
|---|---|---|---|---|
| Bronze | Raw data, as-is from source | Parquet, JSON, CSV | Raw API responses | Real-time |
| Silver | Cleaned, deduplicated, validated | Parquet, Delta Lake | Deduplicated orders | Hourly |
| Gold | Business-level aggregations | Parquet, Delta Lake | Daily sales by region | Daily |
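The three layers can be illustrated on a handful of records. This is a sketch using only the standard library and made-up sample orders. The cleaning rules (drop non-positive amounts, keep one row per `order_id`) mirror the PySpark example in Stage 4, and the helper names are illustrative.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

# Bronze: records exactly as the source sent them, duplicates and bad values included
bronze: List[Row] = [
    {"order_id": 1, "region": "EU", "amount": "120.50", "order_date": "2024-01-15"},
    {"order_id": 1, "region": "EU", "amount": "120.50", "order_date": "2024-01-15"},
    {"order_id": 2, "region": "US", "amount": "-5.00", "order_date": "2024-01-15"},
    {"order_id": 3, "region": "US", "amount": "80.00", "order_date": "2024-01-15"},
]


def to_silver(rows: List[Row]) -> List[Row]:
    """Cast amounts to Decimal, drop non-positive amounts, keep one row per order_id."""
    latest: Dict[Any, Row] = {}
    for row in rows:
        amount = Decimal(str(row["amount"]))
        if amount <= 0:
            continue
        latest[row["order_id"]] = {**row, "amount": amount}  # Later copies replace earlier ones
    return list(latest.values())


def to_gold(rows: List[Row]) -> Dict[Tuple[str, str], Decimal]:
    """Aggregate silver rows into revenue per (order_date, region)."""
    revenue: Dict[Tuple[str, str], Decimal] = defaultdict(Decimal)
    for row in rows:
        revenue[(row["order_date"], row["region"])] += row["amount"]
    return dict(revenue)


silver = to_silver(bronze)
gold = to_gold(silver)
print(len(silver))  # 2
print(gold)
```

Bronze stays untouched, so silver and gold can always be rebuilt from it if a cleaning rule changes.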
Storage Cost Calculation
Total storage cost can be calculated as:
Storage_Cost = (Data_Size × Storage_Rate × Duration) + (API_Requests × Request_Cost) + (Data_Transfer × Transfer_Rate)
Where:
- Data_Size: Volume of data stored (GB)
- Storage_Rate: Cost per GB per month (e.g., $0.023 for S3 Standard)
- Duration: Time period in months
- API_Requests: Number of read/write operations
- Request_Cost: Cost per request (e.g., S3 Standard charges $0.005 per 1,000 PUT requests, i.e. $0.000005 per request)
- Data_Transfer: Data transferred out (GB)
- Transfer_Rate: Cost per GB transferred out to the internet (e.g., $0.09/GB for the first 10TB per month)
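The formula translates directly into a function. This is a minimal sketch under these assumptions:

- 1 TB is counted as 1,024 GB.
- All requests are PUTs, priced at $0.005 per 1,000 (so the per-request rate is 0.005 / 1000).
- The rates are the S3 Standard examples quoted above.
- Free-tier allowances are ignored.

```python
def storage_cost(
    data_size_gb: float,
    storage_rate: float,     # $ per GB-month
    duration_months: float,
    api_requests: int,
    request_cost: float,     # $ per single request
    data_transfer_gb: float,
    transfer_rate: float,    # $ per GB transferred out
) -> float:
    """Storage_Cost = storage + requests + transfer, as in the formula above."""
    storage = data_size_gb * storage_rate * duration_months
    requests = api_requests * request_cost
    transfer = data_transfer_gb * transfer_rate
    return storage + requests + transfer


# 1 TB stored for one month, 1 million PUT requests, 100 GB transferred out
cost = storage_cost(
    data_size_gb=1024,
    storage_rate=0.023,
    duration_months=1,
    api_requests=1_000_000,
    request_cost=0.005 / 1000,  # $0.005 per 1,000 PUT requests
    data_transfer_gb=100,
    transfer_rate=0.09,
)
print(f"${cost:.2f}")  # $37.55
```

Storage dominates here ($23.55), but a read-heavy or egress-heavy workload can make the request or transfer terms the larger share.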
Storage Options Comparison
| Storage Type | Best For | Cost | Query Performance | Durability |
|---|---|---|---|---|
| Data Lake (S3/GCS) | Raw data, archival | Low | Requires processing | 11 9's |
| Data Warehouse (Snowflake/BigQuery) | Analytics, reporting | Medium-High | Excellent | High |
| Operational DB (PostgreSQL) | Application queries | Medium | High for OLTP | High |
| OLAP Cube (Druid, ClickHouse) | Real-time analytics | Medium | Very High | High |
| Feature Store (Feast) | ML features | Low-Medium | High for ML | Medium |
Stage 4: Data Processing
Processing transforms raw data into useful information through cleaning, transformation, and enrichment.
Processing Paradigms
| Paradigm | Description | Tools |
|---|---|---|
| Batch | Process large datasets periodically | Spark, dbt, SQL |
| Micro-batch | Process small batches every few seconds to minutes | Spark Structured Streaming |
| Stream | Process events as they arrive | Kafka Streams, Flink, Spark |
| Request-response | Process on demand via API | Lambda functions, FastAPI |
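Micro-batching sits between the batch and stream rows of the table: events are grouped and each group is processed as a unit. The sketch below shows the grouping logic only. A batch is closed when it reaches `max_size` events or when an incoming event falls `max_wait_s` seconds past the batch's first event. Two simplifying assumptions apply:

- It uses event timestamps rather than a wall-clock timer, so a batch closes only when the next event arrives or the input ends (engines such as Spark Structured Streaming trigger on a timer instead).
- `micro_batches` is a hypothetical helper.

```python
from typing import Iterable, Iterator, List, Tuple

Event = Tuple[float, str]  # (event_time_seconds, payload)


def micro_batches(
    events: Iterable[Event], max_size: int, max_wait_s: float
) -> Iterator[List[Event]]:
    """Group an ordered event stream into batches bounded by size and time span."""
    batch: List[Event] = []
    batch_start = 0.0
    for ts, payload in events:
        # Close the current batch if it is full or this event falls outside its window
        if batch and (len(batch) >= max_size or ts - batch_start >= max_wait_s):
            yield batch
            batch = []
        if not batch:
            batch_start = ts
        batch.append((ts, payload))
    if batch:
        yield batch  # Flush whatever remains at end of input


stream = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (10, "e"), (11, "f")]
for b in micro_batches(stream, max_size=3, max_wait_s=5):
    print([payload for _, payload in b])
```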
[Figure: Processing latency vs. throughput tradeoff]
Common Processing Operations
```python
# Example: Data processing operations using PySpark
# Assumes Delta Lake (delta-spark) and S3 access are configured for the session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DecimalType, IntegerType, StringType, StructField, StructType, TimestampType
)
from pyspark.sql.window import Window

spark = (
    SparkSession.builder
    .appName("DataProcessing")
    .config("spark.sql.shuffle.partitions", "200")  # Number of shuffle partitions
    .config("spark.sql.adaptive.enabled", "true")   # Adaptive query execution
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # Coalesce small partitions
    .getOrCreate()
)

# Expected bronze schema. Delta does not accept a user-specified schema at read
# time, so enforce column types by casting after the load (casting enforces
# types, not nullability).
schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("amount", DecimalType(10, 2), True),
    StructField("order_date", TimestampType(), False),
    StructField("status", StringType(), False)
])
orders = (
    spark.read.format("delta").load("s3://data-lake/bronze/orders")
    .select([F.col(f.name).cast(f.dataType).alias(f.name) for f in schema.fields])
)
customers = spark.read.format("delta").load("s3://data-lake/bronze/customers")

# 1. FILTERING — Remove unwanted records
valid_orders = orders.filter(
    (F.col("status") != "cancelled") &     # Remove cancelled orders
    (F.col("amount") > 0) &                # Remove zero/negative amounts
    (F.col("order_date") >= "2024-01-01")  # Only recent data
)

# 2. DEDUPLICATION — Remove duplicate records
# Keep the most recent row per order_id
window_spec = Window.partitionBy("order_id").orderBy(F.desc("order_date"))
deduped_orders = (
    valid_orders
    .withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)

# 3. CLEANING — Standardize data
cleaned_orders = (
    deduped_orders
    .withColumn("amount", F.col("amount").cast("decimal(10,2)"))
    .withColumn("order_date", F.to_date("order_date"))  # Timestamp -> date
    .withColumn("status", F.upper(F.col("status")))
    .withColumn("customer_id", F.col("customer_id").cast("integer"))
)

# 4. ENRICHMENT — Join with dimension tables
enriched_orders = cleaned_orders.join(
    customers.select("customer_id", "segment", "region", "customer_tier"),
    on="customer_id",
    how="left"
)

# 5. AGGREGATION — Create business metrics
daily_sales = (
    enriched_orders
    .groupBy(
        "order_date",
        "region",
        "segment",
        F.date_format("order_date", "yyyy-MM").alias("month")
    )
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.max("amount").alias("max_order_value"),
        F.percentile_approx("amount", 0.5).alias("median_order_value")
    )
)

# 6. WRITE to silver/gold layer with partitioning
# (parentheses instead of backslashes, since a comment after "\" is a syntax error)
(
    daily_sales.write.format("delta")
    .mode("overwrite")                  # Overwrite existing data
    .partitionBy("month")               # Partition by month
    .option("overwriteSchema", "true")  # Allow schema changes
    .save("s3://data-lake/gold/daily_sales")
)

# Verify write
result_count = spark.read.format("delta").load("s3://data-lake/gold/daily_sales").count()
print(f"Successfully wrote {result_count} records to gold layer")
```
Stage 5: Data Serving
Data serving is the final stage where processed data is made available to consumers — dashboards, applications, APIs, and ML models.
Serving Patterns
| Pattern | Latency | Use Case | Example |
|---|---|---|---|
| API Serving | Milliseconds | Application queries | REST API for user profiles |
| Dashboard Serving | Seconds | BI reporting | Tableau dashboard refresh |
| Batch Export | Hours | Offline analysis | CSV export for quarterly report |
| Feature Serving | Milliseconds | ML inference | Feature vector for real-time model |
| Streaming Serving | Milliseconds | Real-time views | Live dashboard updates |
The total serving latency for a request is the sum of four terms:
Total_Latency = Network_Latency + Query_Time + Serialization_Time + Client_Rendering_Time
Where:
- Network_Latency: Time for request to travel (typically 10-100ms)
- Query_Time: Database/query execution time
- Serialization_Time: Converting results to JSON/other format
- Client_Rendering_Time: Time for client to render results
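Only two of these terms, Query_Time and Serialization_Time, can be measured inside the serving process; network and rendering time must be measured at the client. This sketch times those two phases for one lookup with `time.perf_counter`. An in-memory SQLite table stands in for the serving store, and `timed_lookup` is a hypothetical helper.

```python
import json
import sqlite3
import time
from typing import Dict, Tuple


def timed_lookup(conn: sqlite3.Connection, customer_id: int) -> Tuple[str, Dict[str, float]]:
    """Run one lookup and return the JSON body plus per-phase timings in ms."""
    start = time.perf_counter()
    cursor = conn.execute(
        "SELECT customer_id, segment, region FROM customers WHERE customer_id = ?",
        (customer_id,),
    )
    columns = [d[0] for d in cursor.description]
    rows = cursor.fetchall()
    after_query = time.perf_counter()      # Query_Time ends here
    body = json.dumps([dict(zip(columns, row)) for row in rows])
    after_serialize = time.perf_counter()  # Serialization_Time ends here
    return body, {
        "query_ms": (after_query - start) * 1000,
        "serialization_ms": (after_serialize - after_query) * 1000,
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, segment TEXT, region TEXT)")
conn.execute("INSERT INTO customers VALUES (42, 'smb', 'EU')")
body, timings = timed_lookup(conn, 42)
print(body)  # [{"customer_id": 42, "segment": "smb", "region": "EU"}]
print(timings)
```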
Data Product Design
```python
# Example: Data product implementation with quality guarantees
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class DataProduct:
    """A self-contained data product with quality guarantees."""
    # Identity
    product_id: str   # Unique identifier
    name: str         # Human-readable name
    description: str  # What this product provides
    owner: str        # Team/person responsible

    # Schema
    schema: Dict[str, str]        # Column names and types
    partition_columns: List[str]  # Partitioning columns (must appear in schema)

    # Quality guarantees
    freshness_sla: str             # e.g., "daily by 06:00 UTC"
    completeness_threshold: float  # e.g., 0.99 for 99% completeness
    accuracy_checks: List[str]     # Validation rules

    # Access
    access_pattern: str  # 'api', 'query', 'export'
    rate_limit: int      # Requests per minute
    authentication: str  # 'api_key', 'oauth', 'iam'

    # Monitoring
    metrics_endpoint: str  # URL for metrics
    alert_channel: str     # Where to send alerts

    def validate_request(
        self,
        query_params: Dict[str, Any]  # Parameters from the request
    ) -> bool:
        """Validate incoming request against product constraints."""
        # Placeholder: check the rate limit, authentication, and query parameters here
        return True

    def get_data(
        self,
        filters: Dict[str, Any],              # Filter conditions
        columns: Optional[List[str]] = None,  # Columns to return (None = all)
        limit: int = 1000                     # Maximum rows
    ) -> List[Dict]:
        """Fetch data from the product."""
        # Implementation varies by access pattern
        raise NotImplementedError

    def get_metrics(self) -> Dict:
        """Get product health metrics."""
        return {
            'checked_at': datetime.now().isoformat(),  # When these metrics were computed
            'row_count': self._get_row_count(),
            'freshness_minutes': self._get_freshness(),
            'quality_score': self._calculate_quality_score()
        }

    # Storage-specific hooks used by get_metrics; implement them for your backend
    def _get_row_count(self) -> int:
        raise NotImplementedError

    def _get_freshness(self) -> float:
        raise NotImplementedError

    def _calculate_quality_score(self) -> float:
        raise NotImplementedError


# Example data product
customer_360 = DataProduct(
    product_id="dp-customer-360",
    name="Customer 360 View",
    description="Complete customer profile with all interactions",
    owner="data-engineering",
    schema={
        "customer_id": "integer",
        "name": "string",
        "email": "string",
        "total_purchases": "decimal",
        "last_activity": "timestamp",
        "segment": "string",
        "snapshot_date": "date"
    },
    partition_columns=["snapshot_date"],
    freshness_sla="daily by 06:00 UTC",
    completeness_threshold=0.99,
    accuracy_checks=[
        "customer_id is unique",
        "email format is valid",
        "total_purchases >= 0"
    ],
    access_pattern="query",
    rate_limit=100,
    authentication="api_key",
    metrics_endpoint="/api/products/customer-360/metrics",
    alert_channel="slack://data-engineering-alerts"
)
```
Stage 6: Data Monitoring
Monitoring ensures data systems remain reliable, performant, and cost-effective.
What to Monitor
| Metric Category | Metrics | Tools | Alert Threshold |
|---|---|---|---|
| Pipeline Health | Success rate, duration, SLA | Airflow, Datadog | < 95% success rate |
| Data Quality | Null rates, freshness, row counts | Great Expectations, dbt tests | > 1% nulls, > 1hr stale |
| System Performance | CPU, memory, query time | CloudWatch, Prometheus | > 80% CPU, > 5s queries |
| Cost | Storage used, compute hours, queries | Cloud billing, FinOps tools | > 20% budget overrun |
| Security | Access logs, permission changes | CloudTrail, audit logs | Any unauthorized access |
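The alert thresholds in the table can be evaluated together in one check. This is a sketch under assumptions:

- `MonitoringSnapshot` and `evaluate_alerts` are hypothetical names.
- The "> 5s queries" threshold is applied to a 95th-percentile query time.
- Null rate is taken as the worst column.
- Cost overrun is expressed as spend divided by budget.

In a real system these values would come from the tools listed in the table.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List


@dataclass
class MonitoringSnapshot:
    pipeline_success_rate: float     # Fraction of successful runs (0..1)
    max_null_rate: float             # Highest null fraction across checked columns
    staleness: timedelta             # Time since the data was last refreshed
    cpu_utilization: float           # Fraction of CPU in use (0..1)
    p95_query_seconds: float         # 95th-percentile query time (assumed statistic)
    budget_ratio: float              # Actual spend / budget (1.25 = 25% over)
    unauthorized_access_events: int  # Count from audit logs


def evaluate_alerts(s: MonitoringSnapshot) -> List[str]:
    """Return one message per table threshold that is breached."""
    alerts = []
    if s.pipeline_success_rate < 0.95:
        alerts.append(f"Pipeline health: success rate {s.pipeline_success_rate:.0%} < 95%")
    if s.max_null_rate > 0.01:
        alerts.append(f"Data quality: null rate {s.max_null_rate:.1%} > 1%")
    if s.staleness > timedelta(hours=1):
        alerts.append(f"Data quality: data is {s.staleness} old (> 1 hour)")
    if s.cpu_utilization > 0.80:
        alerts.append(f"System performance: CPU at {s.cpu_utilization:.0%} > 80%")
    if s.p95_query_seconds > 5:
        alerts.append(f"System performance: p95 query time {s.p95_query_seconds:.1f}s > 5s")
    if s.budget_ratio > 1.20:
        alerts.append(f"Cost: spend is {s.budget_ratio - 1:.0%} over budget (> 20%)")
    if s.unauthorized_access_events > 0:
        alerts.append(f"Security: {s.unauthorized_access_events} unauthorized access event(s)")
    return alerts


healthy = MonitoringSnapshot(0.99, 0.002, timedelta(minutes=20), 0.55, 1.2, 0.9, 0)
print(evaluate_alerts(healthy))  # []
```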
Monitoring Implementation
```python
# Example: Data quality monitoring with Great Expectations
# Targets the pre-1.0 fluent API (0.17/0.18); GX 1.x replaced these calls
from typing import Any, Dict, Optional

import great_expectations as gx
import pandas as pd
from great_expectations.core import ExpectationConfiguration


class DataQualityMonitor:
    """Runs one expectation suite against pandas DataFrames."""

    def __init__(
        self,
        datasource_name: str,        # Name of the data source
        data_connector_name: str,    # Name of the DataFrame asset
        expectation_suite_name: str  # Name of the expectation suite
    ):
        self.context = gx.get_context()
        self.datasource = self.context.sources.add_pandas(name=datasource_name)
        self.data_connector = self.datasource.add_dataframe_asset(
            name=data_connector_name
        )
        self.expectation_suite = self.context.add_expectation_suite(
            expectation_suite_name=expectation_suite_name
        )

    def add_expectation(
        self,
        expectation_type: str,                 # Type of expectation
        kwargs: Dict[str, Any],                # Expectation parameters
        meta: Optional[Dict[str, Any]] = None  # Optional metadata
    ) -> None:
        """Add an expectation to the suite."""
        expectation = ExpectationConfiguration(
            expectation_type=expectation_type,
            kwargs=kwargs,
            meta=meta or {}
        )
        self.expectation_suite.add_expectation(expectation)

    def validate_data(
        self,
        df: pd.DataFrame,                    # DataFrame to validate
        batch_request: Optional[Any] = None  # Optional prebuilt batch request
    ) -> Dict:
        """Validate data against expectations."""
        # Build a batch request for this DataFrame
        if batch_request is None:
            batch_request = self.data_connector.build_batch_request(dataframe=df)
        # Run validation
        validator = self.context.get_validator(
            batch_request=batch_request,
            expectation_suite=self.expectation_suite
        )
        results = validator.validate()
        # Summarize results
        return {
            'success': results.success,
            'statistics': {
                'evaluated_expectations': results.statistics['evaluated_expectations'],
                'successful_expectations': results.statistics['successful_expectations'],
                'unsuccessful_expectations': results.statistics['unsuccessful_expectations'],
                'success_percent': results.statistics['success_percent']
            },
            'results': [
                {
                    'expectation': r.expectation_config.expectation_type,
                    'success': r.success,
                    'result': r.result
                }
                for r in results.results
            ]
        }


# Usage example
monitor = DataQualityMonitor(
    datasource_name="my_datasource",
    data_connector_name="my_connector",
    expectation_suite_name="orders_quality_suite"
)

# Add expectations
monitor.add_expectation(
    expectation_type="expect_column_values_to_not_be_null",
    kwargs={"column": "order_id"},
    meta={"severity": "critical", "owner": "data-engineering"}
)
monitor.add_expectation(
    expectation_type="expect_column_values_to_be_unique",
    kwargs={"column": "order_id"},
    meta={"severity": "critical", "owner": "data-engineering"}
)
monitor.add_expectation(
    expectation_type="expect_column_values_to_be_between",
    kwargs={"column": "amount", "min_value": 0, "max_value": 1000000},
    meta={"severity": "warning", "owner": "data-engineering"}
)
monitor.add_expectation(
    expectation_type="expect_table_row_count_to_be_between",
    kwargs={"min_value": 100, "max_value": 1000000},
    meta={"severity": "warning", "owner": "data-engineering"}
)

# Validate one day of orders (example path; reading s3:// paths requires s3fs)
df = pd.read_parquet("s3://data-lake/silver/orders/2024-01-15.parquet")
results = monitor.validate_data(df)
print(f"Validation success: {results['success']}")
print(f"Statistics: {results['statistics']}")
```
End-to-End Pipeline Example
```python
# Complete lifecycle example: Daily sales pipeline (Airflow 2.4+)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
    'sla': timedelta(hours=2),
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}


def extract_orders(**context):
    """Stage 1: Extract from source database."""
    import pandas as pd
    import psycopg2

    # Connection with connect and statement timeouts
    conn = psycopg2.connect(
        "postgresql://source-db/orders",  # Example DSN; keep credentials in an Airflow connection
        connect_timeout=30,
        options="-c statement_timeout=300000"
    )
    try:
        df = pd.read_sql(
            "SELECT * FROM orders WHERE date = %s",
            conn,
            params=[context['ds']]  # The run's logical date, YYYY-MM-DD
        )
    finally:
        conn.close()
    # Write to bronze layer (writing to s3:// paths requires s3fs)
    df.to_parquet(
        f"s3://data-lake/bronze/orders/{context['ds']}.parquet",
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    return len(df)


def transform_orders(**context):
    """Stage 2: Transform and clean."""
    import pandas as pd

    # Read from bronze layer
    df = pd.read_parquet(f"s3://data-lake/bronze/orders/{context['ds']}.parquet")
    # Clean and transform
    deduped = df.drop_duplicates(subset=['order_id'])
    cleaned = deduped[deduped['amount'] > 0].copy()  # copy() avoids SettingWithCopyWarning
    cleaned['order_date'] = pd.to_datetime(cleaned['order_date'])
    # Quality metrics (plain ints, because XCom values must be JSON-serializable)
    quality_metrics = {
        'row_count': len(cleaned),
        'null_count': int(cleaned.isnull().sum().sum()),
        'duplicate_count': len(df) - len(deduped),
        'invalid_amount_count': len(deduped) - len(cleaned),
    }
    # Write to silver layer
    cleaned.to_parquet(
        f"s3://data-lake/silver/orders/{context['ds']}.parquet",
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    return quality_metrics


def load_to_warehouse(**context):
    """Stage 3: Load to data warehouse."""
    import pandas as pd
    from sqlalchemy import create_engine

    # Create engine with connection pooling. Placeholder URL: the
    # snowflake-sqlalchemy dialect expects snowflake://<user>:<password>@<account>/<db>/<schema>
    engine = create_engine(
        "snowflake://warehouse/analytics",
        pool_size=5,      # Connection pool size
        max_overflow=10,  # Max overflow connections
        pool_timeout=30   # Timeout for getting connection
    )
    df = pd.read_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    # Load with chunking for large datasets. Note: append is not idempotent;
    # rerunning a date inserts its rows again unless they are deleted first.
    df.to_sql(
        'daily_orders',
        engine,
        if_exists='append',  # Append to existing table
        index=False,         # Don't write index
        chunksize=10000,     # Write in chunks
        method='multi'       # Use multi-row inserts
    )


def monitor(**context):
    """Stage 4: Validate and monitor."""
    # Placeholder: run quality checks, e.g. with the DataQualityMonitor shown earlier
    pass


# DAG definition
with DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    schedule='@daily',  # Airflow 2.4+; older releases use schedule_interval
    catchup=False,
    max_active_runs=1,
    tags=['production', 'sales'],
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform', python_callable=transform_orders)
    load = PythonOperator(task_id='load', python_callable=load_to_warehouse)
    monitor_task = PythonOperator(task_id='monitor', python_callable=monitor)

    # Task dependencies with explicit ordering
    extract >> transform >> load >> monitor_task
```
Best Practices
Pipeline Design Principles
- Idempotency — Pipelines should produce the same result when run multiple times
- Fault Tolerance — Handle failures gracefully with retries and fallbacks
- Observability — Log everything, set up alerts for anomalies
- Documentation — Maintain clear pipeline and schema documentation
- Testing — Unit tests for transformations, integration tests for end-to-end
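Idempotency is easiest to see on a load step. The `if_exists='append'` load in the Airflow example above inserts a date's rows again on every rerun. Replacing the whole partition inside one transaction makes reruns harmless. This is a sketch: SQLite stands in for the warehouse, and `load_partition` is a hypothetical helper. Warehouses typically offer the same pattern through DELETE plus INSERT in a transaction, or through MERGE.

```python
import sqlite3
from typing import List, Tuple


def load_partition(conn: sqlite3.Connection, run_date: str, rows: List[Tuple[int, float]]) -> None:
    """Replace every row for run_date in a single transaction (delete-then-insert)."""
    with conn:  # Commits on success, rolls back if any statement fails
        conn.execute("DELETE FROM daily_orders WHERE order_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO daily_orders (order_id, amount, order_date) VALUES (?, ?, ?)",
            [(order_id, amount, run_date) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_orders (order_id INTEGER, amount REAL, order_date TEXT)")
batch = [(1, 120.5), (2, 80.0)]
load_partition(conn, "2024-01-15", batch)
load_partition(conn, "2024-01-15", batch)  # Rerun: same end state, no duplicates
print(conn.execute("SELECT COUNT(*) FROM daily_orders").fetchone()[0])  # 2
```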
Data Quality Framework
```python
# Example: Data quality framework with scoring
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class QualityScore:
    """Calculates overall data quality score."""
    completeness: float  # Fraction of non-null values (0..1)
    accuracy: float      # Fraction of valid values
    consistency: float   # Fraction of consistent values
    timeliness: float    # How fresh the data is (0..1)
    uniqueness: float    # Fraction of unique values

    def calculate_overall_score(
        self,
        weights: Optional[Dict[str, float]] = None  # Optional custom weights (should sum to 1.0)
    ) -> float:
        """Calculate weighted quality score."""
        if weights is None:
            weights = {
                'completeness': 0.25,
                'accuracy': 0.25,
                'consistency': 0.20,
                'timeliness': 0.15,
                'uniqueness': 0.15
            }
        score = (
            self.completeness * weights['completeness'] +
            self.accuracy * weights['accuracy'] +
            self.consistency * weights['consistency'] +
            self.timeliness * weights['timeliness'] +
            self.uniqueness * weights['uniqueness']
        )
        return round(score * 100, 2)  # Return as percentage

    def get_grade(self) -> str:
        """Convert score to letter grade."""
        score = self.calculate_overall_score()
        if score >= 95:
            return 'A'
        elif score >= 90:
            return 'B'
        elif score >= 80:
            return 'C'
        elif score >= 70:
            return 'D'
        else:
            return 'F'


# Usage example
quality = QualityScore(
    completeness=0.98,  # 98% non-null
    accuracy=0.95,      # 95% valid
    consistency=0.92,   # 92% consistent
    timeliness=0.99,    # 99% fresh
    uniqueness=0.99     # 99% unique
)
print(f"Quality Score: {quality.calculate_overall_score()}%")
print(f"Grade: {quality.get_grade()}")
```
Key Takeaways
- The lifecycle has 6 stages: Generate -> Ingest -> Store -> Process -> Serve -> Monitor
- Each stage has specific tools and patterns — choose based on latency, volume, and cost requirements
- The Medallion Architecture (Bronze -> Silver -> Gold) provides a clean separation of concerns
- Monitoring is not optional — without it, data issues go unnoticed until they cause business impact
- End-to-end ownership — data engineers are responsible for the entire lifecycle, not just individual stages
- Design for change — source systems, requirements, and tools evolve; build flexible pipelines
Practice Exercises
- Lifecycle mapping: For a "real-time social media analytics" use case, describe each stage of the data lifecycle with specific tools.
- Source analysis: Identify 3 data sources in your organization. For each, document: volume, velocity, format, and recommended ingestion pattern.
- Pipeline design: Design an end-to-end pipeline for "e-commerce order analytics" including Bronze, Silver, and Gold layers.
- Monitoring plan: Create a monitoring checklist for a daily ETL pipeline. Include metrics, thresholds, and alerting channels.
- Cost estimation: Estimate monthly costs for storing 1TB of data in S3 vs Snowflake vs PostgreSQL. Include storage and query costs.
See Also
- What is Data Engineering — Introduction to data engineering
- SQL Fundamentals — Essential SQL skills
- Advanced SQL — Advanced SQL techniques
- Data Formats — JSON, Parquet, Avro comparison
- Cloud Platforms Overview — AWS, GCP, and Azure comparison