Python for Data Engineers — Essential Libraries and Patterns

Python in Data Engineering

Python is the primary programming language for data engineers. It's used for building pipelines, interacting with databases, automating tasks, and processing data at scale.

Figure: Python ecosystem map for data engineering, with Python core at the center.
  Orchestration: Airflow, Dagster, Prefect, Luigi
  Processing: Pandas, PySpark, Dask, Polars
  Database: SQLAlchemy, psycopg2, mysql
  Cloud: boto3 (AWS), google-cloud, azure
  CLI & Scheduling: Click, argparse, APScheduler
  Key libraries: pandas, SQLAlchemy, boto3, pyarrow, great-expectations, pydantic

Theory: Why Python Dominates Data Engineering

Python became the de facto language for data engineering due to several factors:

  1. Rich ecosystem — Libraries like pandas, SQLAlchemy, and boto3 cover virtually every data task.
  2. Readability — Python's syntax reduces cognitive load, enabling faster development of complex pipelines.
  3. Integration — Python interfaces seamlessly with databases, cloud APIs, message queues, and orchestration tools.
  4. Community — Massive open-source community means most problems have battle-tested solutions.
  5. Rapid prototyping — Scripts that work locally can be promoted to production with minimal changes.

Setting Up Your Environment

Figure: Virtual environment setup flow. Create venv (python -m venv) -> Activate (source venv/bin/activate) -> Install deps (pip install -r) -> Freeze deps (pip freeze) -> Deploy (Docker/Production). Best practice: always use virtual environments to isolate project dependencies.

Virtual Environments

# Create virtual environment
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (macOS/Linux)
source venv/bin/activate

# Install dependencies
pip install pandas sqlalchemy psycopg2-binary boto3 pyarrow

# Freeze dependencies
pip freeze > requirements.txt

# Install from requirements
pip install -r requirements.txt

Type Hints (Production Best Practice)

from typing import Optional, List, Dict, Tuple
from datetime import datetime

# Type hints improve code clarity and enable static analysis
def process_orders(
    orders: List[Dict],
    start_date: datetime,
    end_date: Optional[datetime] = None,
    batch_size: int = 1000
) -> Tuple[int, List[str]]:
    """Process orders within date range.
    
    Returns:
        Tuple of (processed_count, error_messages)
    """
    processed = 0
    errors = []
    
    for order in orders:
        try:
            # Process logic here
            processed += 1
        except Exception as e:
            errors.append(f"Order {order['id']}: {str(e)}")
    
    return processed, errors
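
The `List[Dict]` annotation above tells a type checker very little about what an order contains. One possible refinement, sketched here under the assumption that an order has `id` and `amount` fields (a hypothetical shape, not taken from the lesson), is a `TypedDict`, which lets a checker such as mypy flag misspelled or missing keys:

```python
from typing import List, Tuple, TypedDict


class Order(TypedDict):
    # Hypothetical record shape, assumed for illustration only
    id: int
    amount: float


def total_valid_amount(orders: List[Order]) -> Tuple[float, List[str]]:
    """Sum positive amounts and collect a message for each rejected order."""
    total = 0.0
    errors: List[str] = []
    for order in orders:
        if order["amount"] > 0:
            total += order["amount"]
        else:
            errors.append(f"Order {order['id']}: non-positive amount")
    return total, errors


total, errors = total_valid_amount([
    {"id": 1, "amount": 25.0},
    {"id": 2, "amount": -3.0},
])
print(total, errors)
```

Type hints are not enforced at run time, so the benefit only appears when a checker runs as part of development or CI.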

Pandas for Data Processing

Core Operations

import pandas as pd
import numpy as np

# Reading data
df = pd.read_csv('data.csv', dtype={'id': int, 'amount': float})
df = pd.read_parquet('data.parquet')
df = pd.read_sql("SELECT * FROM orders", connection)

# Basic operations
print(df.shape)           # (rows, columns)
print(df.dtypes)          # Column data types
print(df.head())          # First 5 rows
df.info()                 # Column info and null counts (prints directly, returns None)
print(df.describe())      # Statistical summary

# Selection
df[['col1', 'col2']]                     # Select columns
df[df['amount'] > 100]                   # Filter rows
df.loc[0:10, ['col1', 'col2']]          # Label-based selection (end label 10 is included)
df.iloc[0:10, 0:3]                       # Position-based selection

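# Missing values (these calls return new objects; assign the result to keep it)
df.isnull().sum()                         # Count nulls per column
df.dropna(subset=['critical_col'])        # Drop rows with nulls in that column
df['col'].fillna(df['col'].mean())        # Fill with mean
df['col'].ffill()                         # Forward fill (fillna(method='ffill') is deprecated since pandas 2.1)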

Data Transformation Patterns

# Column transformations
df['total'] = df['quantity'] * df['unit_price']
df['category'] = df['category'].str.lower().str.strip()
df['date'] = pd.to_datetime(df['date'])
df['year_month'] = df['date'].dt.to_period('M')

# Apply custom functions
def categorize_amount(amount):
    if amount < 10:
        return 'low'
    elif amount < 100:
        return 'medium'
    else:
        return 'high'

df['amount_category'] = df['amount'].apply(categorize_amount)

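# Vectorized alternative (usually much faster than apply)
# right=False gives bins [-inf, 10), [10, 100), [100, inf), matching categorize_amount
df['amount_category'] = pd.cut(
    df['amount'],
    bins=[float('-inf'), 10, 100, float('inf')],
    right=False,
    labels=['low', 'medium', 'high']
)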

# GroupBy operations
summary = df.groupby(['region', 'product_category']).agg(
    total_orders=('order_id', 'count'),
    total_revenue=('amount', 'sum'),
    avg_order_value=('amount', 'mean'),
    unique_customers=('customer_id', 'nunique')
).reset_index()

# Merge operations
result = pd.merge(
    orders_df, 
    customers_df, 
    on='customer_id', 
    how='left',
    validate='many_to_one'
)

# Reshape data
# Pivot: rows to columns
pivot_df = df.pivot_table(
    index='region',
    columns='quarter',
    values='revenue',
    aggfunc='sum',
    fill_value=0
)

# Melt: columns to rows
melted_df = pd.melt(
    df, 
    id_vars=['product'], 
    value_vars=['Q1', 'Q2', 'Q3', 'Q4'],
    var_name='quarter', 
    value_name='revenue'
)
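
The `validate='many_to_one'` argument in the merge above is easy to overlook, so here is a small sketch of what it guards against. The two DataFrames are made-up sample data: the customer table deliberately contains a duplicate key, which would otherwise silently multiply order rows.

```python
import pandas as pd

orders_df = pd.DataFrame({"order_id": [1, 2, 3], "customer_id": [10, 10, 20]})
# Customer 10 appears twice, which breaks the "one" side of many_to_one
customers_df = pd.DataFrame(
    {"customer_id": [10, 10, 20], "name": ["A", "A2", "B"]}
)

try:
    pd.merge(orders_df, customers_df, on="customer_id", how="left",
             validate="many_to_one")
    merge_failed = False
except pd.errors.MergeError as exc:
    merge_failed = True
    print(f"Merge rejected: {exc}")

# After de-duplicating the lookup table, the same merge keeps the row count
deduped = customers_df.drop_duplicates(subset="customer_id", keep="first")
result = pd.merge(orders_df, deduped, on="customer_id", how="left",
                  validate="many_to_one")
print(len(orders_df), len(result))
```

Which duplicate to keep is a business decision; `keep="first"` here is only a placeholder choice.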

Performance Optimization

# Use chunking for large files
def process_large_csv(filepath, chunk_size=100000):
    partials = []
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # Aggregate each chunk on its own to keep memory bounded
        partials.append(chunk.groupby('category').agg({'amount': 'sum'}))
    # A category can appear in several chunks, so combine the partial sums
    return pd.concat(partials).groupby(level=0).sum()

# Optimize dtypes to reduce memory
def optimize_dtypes(df):
    df = df.copy()  # Leave the caller's DataFrame untouched

    for col in df.select_dtypes(include=[np.integer]).columns:
        # downcast picks the smallest integer type that fits both min and max,
        # so negative values cannot overflow
        df[col] = pd.to_numeric(df[col], downcast='integer')

    for col in df.select_dtypes(include=['object']).columns:
        num_total = len(df[col])
        # Convert to category when less than 50% of values are unique
        if num_total and df[col].nunique() / num_total < 0.5:
            df[col] = df[col].astype('category')

    return df

# Parallel processing with Dask
import dask.dataframe as dd

ddf = dd.read_parquet('s3://bucket/data/*.parquet')
result = ddf.groupby('category').agg({'amount': 'sum'}).compute()
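
When aggregating chunk by chunk, the per-chunk results are only partial: the same category can show up in several chunks. The following sketch uses a tiny in-memory CSV (made-up data) and a deliberately small `chunksize` to show why a second aggregation pass over the concatenated partials is needed.

```python
import io

import pandas as pd

csv_text = "category,amount\na,1\nb,2\na,3\nb,4\na,5\n"

partials = []
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    # Each chunk yields its own partial sum per category
    partials.append(chunk.groupby("category")["amount"].sum())

combined = pd.concat(partials)              # 'a' and 'b' appear more than once
totals = combined.groupby(level=0).sum()    # second pass merges the partial sums

print(combined)
print(totals)
```

This two-pass approach works for aggregations that can be combined from partial results, such as sum, count, min, and max. A mean needs the partial sums and counts carried separately and divided at the end.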

Performance Comparison: Pandas Operations

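Operation   | Vectorized               | .apply()                                  | Loop                     | Speedup Factor
------------|--------------------------|-------------------------------------------|--------------------------|--------------------
Column math | df['a'] * df['b']        | df.apply(lambda r: r.a*r.b, axis=1)       | for r in df.itertuples() | 100x / 10x vs apply
String ops  | df['col'].str.upper()    | df.apply(lambda r: r.col.upper(), axis=1) | list comprehension       | 50x
Conditional | pd.cut() / np.where()    | .apply() with if/elif                     | for loop                 | 200x
Aggregation | groupby().agg()          | manual apply per group                    | nested loops             | 50x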

Rule of thumb: Always prefer vectorized operations over .apply() and never use Python loops for data processing at scale.
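
As a concrete instance of the "Conditional" case, the sketch below rewrites an if/elif function as a vectorized expression with `np.select` and checks that both give the same labels. The sample amounts are made up, and no timing is claimed here because speedups depend heavily on data size and dtype.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"amount": [5.0, 10.0, 99.9, 100.0, 250.0]})


def categorize_amount(amount):
    if amount < 10:
        return "low"
    elif amount < 100:
        return "medium"
    return "high"


# Row-by-row: one Python function call per value
via_apply = df["amount"].apply(categorize_amount)

# Vectorized: conditions are evaluated in order, the first match wins
via_select = pd.Series(
    np.select(
        [df["amount"] < 10, df["amount"] < 100],
        ["low", "medium"],
        default="high",
    ),
    index=df.index,
)

print(via_apply.tolist())
print(via_select.tolist())
```

To measure the difference on your own data, `timeit` on a realistically sized DataFrame is a more reliable guide than any fixed multiplier.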

SQLAlchemy for Database Operations

Connection Management

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import pandas as pd

# Create engine with connection pooling
engine = create_engine(
    "postgresql://user:password@host:5432/dbname",
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,  # Verify connections before use
    echo=False
)

# Session factory: create it once and reuse it
Session = sessionmaker(bind=engine)

# Context manager for sessions: commit on success, roll back on error
@contextmanager
def get_session():
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Read data with pandas
with engine.connect() as conn:
    df = pd.read_sql(
        text("SELECT * FROM orders WHERE date >= :start_date"),
        conn,
        params={"start_date": "2024-01-01"}
    )

# Write data to database
df.to_sql(
    'processed_orders',
    engine,
    if_exists='append',
    index=False,
    chunksize=1000,
    method='multi'
)
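
The named-parameter pattern above can be tried without a PostgreSQL server. This is a minimal sketch that swaps in an in-memory SQLite database (an assumption made purely for portability); the `orders` table and its columns are hypothetical.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for PostgreSQL here (assumption for portability)
engine = create_engine("sqlite:///:memory:")

# engine.begin() opens a transaction: commit on success, rollback on error
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, order_date TEXT, amount REAL)"
    ))
    conn.execute(
        text(
            "INSERT INTO orders (id, order_date, amount) "
            "VALUES (:id, :order_date, :amount)"
        ),
        [
            {"id": 1, "order_date": "2023-12-31", "amount": 10.0},
            {"id": 2, "order_date": "2024-01-15", "amount": 20.0},
        ],
    )

# Values are bound by the driver, never formatted into the SQL string
with engine.connect() as conn:
    rows = conn.execute(
        text("SELECT id, amount FROM orders WHERE order_date >= :start_date ORDER BY id"),
        {"start_date": "2024-01-01"},
    ).fetchall()

recent_ids = [row[0] for row in rows]
print(recent_ids)
```

Passing a list of dictionaries to `execute` runs the statement once per dictionary, which is convenient for small batches; for large loads the bulk techniques in the psycopg2 section are usually a better fit.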

ORM Pattern

from datetime import datetime, timezone
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship  # declarative_base lives in sqlalchemy.orm since 1.4

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'
    
    customer_id = Column(Integer, primary_key=True)
    first_name = Column(String(100), nullable=False)
    last_name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    country = Column(String(100))  # Filtered on in the query below
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    
    # Relationship
    orders = relationship("Order", back_populates="customer")

class Order(Base):
    __tablename__ = 'orders'
    
    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.customer_id'))
    amount = Column(Integer, nullable=False)  # Stored as cents
    status = Column(String(50), default='pending')
    
    customer = relationship("Customer", back_populates="orders")

# Query with ORM
with get_session() as session:
    # Get customer with orders
    customer = session.query(Customer).filter_by(customer_id=1).first()
    print(f"Customer: {customer.first_name} {customer.last_name}")
    print(f"Orders: {len(customer.orders)}")
    
    # Complex query
    high_value_orders = session.query(Order) \
        .join(Customer) \
        .filter(Order.amount > 10000) \
        .filter(Customer.country == 'USA') \
        .all()

psycopg2 for PostgreSQL

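import psycopg2
from contextlib import contextmanager
from psycopg2 import sql
from psycopg2.extras import execute_values

class PostgresHandler:
    def __init__(self, host, database, user, password, port=5432):
        self.conn_params = {
            'host': host,
            'database': database,
            'user': user,
            'password': password,
            'port': port
        }
    
    @contextmanager
    def get_connection(self):
        """Yield a connection; commit or roll back, and always close it.

        psycopg2's own `with conn:` block only ends the transaction and
        leaves the connection open, so the close is done explicitly here.
        """
        conn = psycopg2.connect(**self.conn_params)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
    
    def execute_query(self, query, params=None):
        """Execute a single query; return rows if there are any, else None."""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                if cur.description:  # Statement returned rows (e.g. SELECT)
                    return cur.fetchall()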
    
    def bulk_insert(self, table, columns, data, page_size=1000):
        """Efficient bulk insert using execute_values."""
        query = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
            sql.Identifier(table),
            sql.SQL(', ').join(map(sql.Identifier, columns))
        )
        
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                execute_values(cur, query, data, page_size=page_size)
                conn.commit()
    
    def copy_from_csv(self, table, filepath, delimiter=','):
        """Fast CSV import using COPY."""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                with open(filepath, 'r') as f:
                    cur.copy_expert(
                        sql.SQL("COPY {} FROM STDIN WITH CSV HEADER DELIMITER AS {}").format(
                            sql.Identifier(table),
                            sql.Literal(delimiter)
                        ),
                        f
                    )
                conn.commit()
    
    def upsert(self, table, columns, data, conflict_columns):
        """Insert or update on conflict."""
        insert_query = sql.SQL("""
            INSERT INTO {table} ({columns})
            VALUES %s
            ON CONFLICT ({conflict_cols})
            DO UPDATE SET {update_clause}
        """).format(
            table=sql.Identifier(table),
            columns=sql.SQL(', ').join(map(sql.Identifier, columns)),
            conflict_cols=sql.SQL(', ').join(map(sql.Identifier, conflict_columns)),
            update_clause=sql.SQL(', ').join(
                sql.SQL("{} = EXCLUDED.{}").format(
                    sql.Identifier(col), sql.Identifier(col)
                ) for col in columns if col not in conflict_columns
            )
        )
        
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data, page_size=1000)
                conn.commit()

# Usage
db = PostgresHandler('localhost', 'mydb', 'user', 'password')

# Bulk insert
data = [
    (1, 'John', 'Doe', 'john@example.com'),
    (2, 'Jane', 'Smith', 'jane@example.com')
]
db.bulk_insert('users', ['id', 'first_name', 'last_name', 'email'], data)

# Upsert
db.upsert(
    'users',
    ['id', 'first_name', 'last_name', 'email'],
    data,
    conflict_columns=['email']
)

SQLAlchemy vs psycopg2: When to Use Which

Criteria           | SQLAlchemy                       | psycopg2
-------------------|----------------------------------|---------------------------------
Connection pooling | Built-in (configurable)          | Manual management
ORM support        | Full ORM with relationships      | None (raw SQL only)
Bulk insert speed  | Good (with method='multi')       | Excellent (COPY, execute_values)
Learning curve     | Medium                           | Low
Use case           | Complex models, mixed read/write | High-throughput ETL loads
Parameterization   | Named (:param)                   | %s positional

boto3 for AWS Services

import boto3
import pandas as pd
from botocore.exceptions import ClientError
import json
from io import BytesIO

class S3Handler:
    def __init__(self, bucket_name):
        self.bucket = bucket_name
        self.s3 = boto3.client('s3')
    
    def upload_file(self, local_path, s3_key):
        """Upload a file to S3."""
        self.s3.upload_file(local_path, self.bucket, s3_key)
    
    def upload_dataframe(self, df, s3_key, format='parquet'):
        """Upload a pandas DataFrame to S3 as Parquet or CSV."""
        buffer = BytesIO()
        
        if format == 'parquet':
            df.to_parquet(buffer, index=False)
        elif format == 'csv':
            buffer.write(df.to_csv(index=False).encode('utf-8'))
        else:
            # Fail loudly instead of uploading an empty object
            raise ValueError(f"Unsupported format: {format!r}")
        
        self.s3.put_object(
            Bucket=self.bucket,
            Key=s3_key,
            Body=buffer.getvalue()
        )
    
    def read_parquet(self, s3_key):
        """Read a Parquet file from S3."""
        response = self.s3.get_object(Bucket=self.bucket, Key=s3_key)
        return pd.read_parquet(BytesIO(response['Body'].read()))
    
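    def list_files(self, prefix=''):
        """List all keys with the given prefix.

        list_objects_v2 returns at most 1,000 keys per call, so paginate.
        """
        paginator = self.s3.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            keys.extend(obj['Key'] for obj in page.get('Contents', []))
        return keys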
    
    def delete_file(self, s3_key):
        """Delete a file from S3."""
        self.s3.delete_object(Bucket=self.bucket, Key=s3_key)

# Usage
s3 = S3Handler('my-data-lake')

# Upload processed data
df = pd.read_csv('processed.csv')
s3.upload_dataframe(df, 'gold/processed_data/2024-01-01.parquet')

# Read from S3
df = s3.read_parquet('silver/orders/2024-01-01.parquet')
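
The keys used above (`gold/...`, `silver/...`) follow a layered layout. One way to keep such keys consistent is to build them in a single helper. The sketch below is hypothetical: the function name `build_s3_key`, the layer list, and the `year=/month=/day=` partition naming are assumptions for illustration, not part of boto3 or of the lesson's handler.

```python
from datetime import date

# Assumed layer names, following the Bronze/Silver/Gold convention
VALID_LAYERS = ("bronze", "silver", "gold")


def build_s3_key(layer: str, dataset: str, run_date: date,
                 filename: str = "data.parquet") -> str:
    """Build a date-partitioned object key such as
    silver/orders/year=2024/month=01/day=01/data.parquet."""
    if layer not in VALID_LAYERS:
        raise ValueError(f"Unknown layer {layer!r}; expected one of {VALID_LAYERS}")
    return (
        f"{layer}/{dataset}/"
        f"year={run_date:%Y}/month={run_date:%m}/day={run_date:%d}/"
        f"{filename}"
    )


key = build_s3_key("silver", "orders", date(2024, 1, 1))
print(key)
```

A key built this way can be passed straight to methods like `upload_dataframe` or used as a prefix for `list_files`.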

CLI Tools with Click

import click
import logging
import pandas as pd
import psycopg2
from datetime import datetime
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.group()
@click.option('--verbose', is_flag=True, help='Enable verbose logging')
def cli(verbose):
    """Data Engineering CLI Tool."""
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

@cli.command()
@click.option('--source', required=True, help='Source database connection string')
@click.option('--target', required=True, help='Target table name')
@click.option('--date', type=click.DateTime(), default=None, help='Processing date (defaults to today)')
@click.option('--batch-size', default=1000, type=int, help='Batch size for processing')
def extract(source, target, date, batch_size):
    """Extract data from source database."""
    # Resolve the default at run time; datetime.now() in the decorator
    # would be evaluated once, when the module is imported
    date = date or datetime.now()
    logger.info(f"Extracting data for {date.date()}")
    
    query = "SELECT * FROM orders WHERE order_date = %s"
    
    # psycopg2.connect accepts a libpq connection string or postgresql:// URI
    conn = psycopg2.connect(source)
    try:
        with conn.cursor() as cur:
            cur.execute(query, (date.date(),))
            data = cur.fetchall()
    finally:
        conn.close()
    
    logger.info(f"Extracted {len(data)} records")

@cli.command()
@click.argument('input_path')
@click.argument('output_path')
@click.option('--compression', type=click.Choice(['gzip', 'snappy', 'none']), default='snappy')
def transform(input_path, output_path, compression):
    """Transform raw data to clean format."""
    logger.info(f"Transforming {input_path} -> {output_path}")
    
    df = pd.read_parquet(input_path)
    
    # Transformations
    df = df.drop_duplicates(subset=['order_id'])
    df = df[df['amount'] > 0]
    df['processed_at'] = datetime.now()
    
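    # pandas expects None, not the string 'none', to disable compression
    codec = None if compression == 'none' else compression
    df.to_parquet(output_path, compression=codec, index=False)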
    
    logger.info(f"Transformed {len(df)} records")

@cli.command()
@click.option('--table', required=True, help='Target table name')
@click.option('--data-path', required=True, help='Path to data file')
def load(table, data_path):
    """Load data into target table."""
    logger.info(f"Loading {data_path} into {table}")
    
    df = pd.read_parquet(data_path)
    
    engine = create_engine("postgresql://user:pass@host/db")
    df.to_sql(table, engine, if_exists='append', index=False, chunksize=1000)
    
    logger.info(f"Loaded {len(df)} records")

if __name__ == '__main__':
    cli()
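
Click commands can be exercised in tests without spawning a subprocess by using `click.testing.CliRunner`. The sketch below uses a stand-in command named `describe` (hypothetical, not part of the CLI above) that only echoes its parsed arguments, which keeps the example free of database and file dependencies.

```python
import click
from click.testing import CliRunner


@click.command()
@click.option("--batch-size", default=1000, type=int, help="Batch size for processing")
@click.argument("input_path")
def describe(batch_size, input_path):
    """Hypothetical command that only echoes its parsed arguments."""
    click.echo(f"{input_path} batch_size={batch_size}")


runner = CliRunner()

# Valid invocation: the option is parsed as an int
ok = runner.invoke(describe, ["--batch-size", "50", "orders.parquet"])

# Invalid invocation: Click rejects a non-integer value before the body runs
bad = runner.invoke(describe, ["--batch-size", "fifty", "orders.parquet"])

print(ok.exit_code, ok.output.strip())
print(bad.exit_code)
```

The same runner can invoke a `click.group` by passing the subcommand name as the first argument in the list.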

Scheduling with APScheduler

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
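import logging

# Configure logging so INFO messages from the jobs are actually emitted
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# extract_data, transform_data, load_data, send_alert, sync_incremental_data,
# vacuum_analyze_tables and clean_old_files are placeholders for your own functions
scheduler = BlockingScheduler()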

@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))  # Daily at 2 AM
def daily_etl():
    """Run daily ETL pipeline."""
    logger.info(f"Starting daily ETL at {datetime.now()}")
    
    try:
        # Extract
        extract_data()
        # Transform
        transform_data()
        # Load
        load_data()
        logger.info("Daily ETL completed successfully")
    except Exception as e:
        logger.exception(f"ETL failed: {e}")  # exception() also logs the traceback
        send_alert("ETL Pipeline Failed", str(e))

@scheduler.scheduled_job(CronTrigger(hour='*/6'))  # Every 6 hours
def incremental_sync():
    """Run incremental sync."""
    logger.info("Starting incremental sync")
    sync_incremental_data()

@scheduler.scheduled_job(CronTrigger(day_of_week='sun', hour=3))  # Weekly
def weekly_maintenance():
    """Weekly maintenance tasks."""
    logger.info("Running weekly maintenance")
    vacuum_analyze_tables()
    clean_old_files()

if __name__ == '__main__':
    scheduler.start()

Best Practices for Production Python

Practice             | Why It Matters                          | Tool / Pattern
---------------------|-----------------------------------------|--------------------------------
Type hints           | Catches bugs early, enables IDE support | mypy, pyright
Structured logging   | Machine-parseable logs for alerting     | structlog, python-json-logger
Virtual environments | Prevents dependency conflicts           | venv, poetry
Parameterized SQL    | Prevents SQL injection                  | SQLAlchemy text(), psycopg2 %s
Connection pooling   | Avoids exhausting DB connections        | SQLAlchemy pool_size
Chunked I/O          | Prevents OOM on large datasets          | pd.read_csv(chunksize=...)
Retries with backoff | Handles transient network failures      | tenacity library
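
To make the "retries with backoff" row concrete, here is a standard-library sketch of the idea. It is not the tenacity API: the helper name `retry_with_backoff` and its parameters are hypothetical, and the injectable `sleep` argument exists only so the demo runs without real waiting.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            delay = base_delay * 2 ** (attempt - 1)
            # Jitter spreads out retries from many workers
            sleep(delay + random.uniform(0, delay / 2))
    raise RuntimeError("max_attempts must be at least 1")


calls = 0
delays: List[float] = []


def flaky_fetch() -> str:
    """Simulated network call that fails twice, then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = retry_with_backoff(flaky_fetch, sleep=delays.append)
print(result, calls, len(delays))
```

Only exceptions listed in `retry_on` are retried, so programming errors such as `KeyError` fail immediately instead of being masked by repeated attempts.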

Summary Takeaways

  1. Virtual environments are mandatory — always isolate project dependencies with venv or poetry.
  2. Type hints improve maintainability — use them in all production code and enforce with mypy.
  3. Pandas is essential — master DataFrame operations, groupby, merge, and prefer vectorized operations over .apply().
  4. SQLAlchemy provides connection pooling — use it for production database access and complex ORM models.
  5. psycopg2 is faster for bulk operations — use COPY and execute_values for ETL loads.
  6. boto3 is the AWS SDK — learn S3, EC2, and Lambda basics for cloud data pipelines.
  7. Click makes great CLI tools — build reusable data engineering scripts with typed options.
  8. Schedule with APScheduler or Airflow — automate pipeline execution and handle failures gracefully.

Practice Exercises

  1. Pandas challenge: Write a function that reads a 10GB CSV file in chunks, performs aggregations, and writes the result to Parquet.

  2. Database handler: Create a PostgreSQL class that supports bulk insert, upsert, and data validation.

  3. CLI tool: Build a CLI tool that accepts a database connection string and generates a data profile report (column types, nulls, distributions).

  4. S3 data lake: Create a utility class that organizes S3 files into Bronze/Silver/Gold layers with proper partitioning.

  5. End-to-end pipeline: Build a complete ETL script that extracts from a database, transforms with pandas, validates with Pydantic, and loads to S3.
