Prefect: Python-Native Workflow Orchestration
Prefect is a modern, Python-native workflow orchestration framework designed for data engineers who want to build, run, and monitor pipelines using pure Python.
Why Prefect Over Airflow?
Key Differences:
- No DAG boilerplate: @flow and @task decorators instead of DAG objects and operators
- Dynamic workflows: easily create tasks at runtime based on data
- Local execution: test pipelines locally without an Airflow cluster
- Native Python: no Jinja templates or Airflow-specific APIs
- Modern UI: real-time pipeline monitoring with rich visualizations
- Hybrid execution: orchestration in the cloud, execution on your infrastructure
Prefect vs Airflow Selection Guide
| Scenario | Recommended | Rationale |
|---|---|---|
| Python-heavy team | Prefect | Native Python, minimal boilerplate |
| Complex scheduling | Airflow | Mature scheduling with catchup/backfill |
| Dynamic workflows | Prefect | Native .map() and task generation |
| Large enterprise | Airflow | 100+ operators, established ecosystem |
| Rapid prototyping | Prefect | Local execution, fast iteration |
| Compliance requirements | Airflow | Audit logs, access controls |
| Cloud-native | Prefect Cloud | Managed orchestration |
| Self-hosted control | Airflow | Full infrastructure control |
Prefect Architecture
A flow is the highest-level unit of work in Prefect: a Python function decorated with @flow that orchestrates one or more tasks. Flows provide: (1) state management (running, completed, failed), (2) retry logic (automatic retries on failure), (3) caching (skipping re-execution of tasks whose cached results are still valid), (4) concurrency limits (control over parallel execution), (5) scheduling (cron, interval, or RRule, configured through a deployment), and (6) observability (metrics, logs, traces). Flows can be nested: when a flow calls another flow, the called flow runs as a subflow.
A task is a discrete unit of work within a flow: a Python function decorated with @task. Tasks provide: (1) caching: skip execution if inputs haven't changed; (2) retries: automatic retries with a configurable delay; (3) concurrency: limit concurrent executions of tasks that share a tag; (4) state: track execution state and results; (5) artifacts: store and display results in the UI. Tasks are the building blocks of flows.
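The retry behaviour described above can be modelled in a few lines of plain Python. The sketch below is a toy stand-in for @task(retries=..., retry_delay_seconds=...), not Prefect's engine: run_with_retries and the returned state strings are hypothetical names chosen for illustration.

```python
import time
from typing import Any, Callable, Tuple


def run_with_retries(
    fn: Callable[[], Any], retries: int, retry_delay_seconds: float
) -> Tuple[str, Any, int]:
    """Call fn, retrying on exception; return (final_state, result_or_error, attempts).

    Hypothetical model of task retries: one initial attempt plus up to
    `retries` further attempts, sleeping a fixed delay between attempts.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return "Completed", fn(), attempts
        except Exception as exc:  # any exception counts as a failed attempt
            if attempts > retries:
                return "Failed", exc, attempts
            time.sleep(retry_delay_seconds)


# Usage: a flaky callable that fails twice with a transient error, then succeeds
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


state, value, attempts = run_with_retries(flaky, retries=3, retry_delay_seconds=0.01)
print(state, value, attempts)  # Completed ok 3
```

A permanent error (for example a data-quality ValueError) would simply exhaust all attempts, which is why retries are best reserved for transient failures.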
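A task runner determines how tasks are executed within a flow. Prefect 2.x provides three runners: (1) SequentialTaskRunner: tasks execute one after another; (2) ConcurrentTaskRunner (the default): tasks execute concurrently, with synchronous tasks running in worker threads; (3) DaskTaskRunner (from the prefect-dask package): tasks execute on a Dask cluster for distributed computing. The runner only applies to tasks submitted with .submit() or .map(); a task called directly runs immediately and blocks until it finishes. Prefect 3 replaces ConcurrentTaskRunner with ThreadPoolTaskRunner as the default. The choice depends on workload characteristics: I/O-bound tasks benefit from concurrency, while CPU-bound tasks benefit from Dask's process-based parallelism.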
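To see why I/O-bound tasks benefit from a concurrent runner, the standard-library sketch below runs the same five simulated I/O calls sequentially and on a thread pool. It is an analogy for SequentialTaskRunner versus a thread-based runner, not Prefect code; fake_io_task and its 0.1-second sleep are assumptions standing in for network or database waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def fake_io_task(i: int) -> int:
    """Simulated I/O-bound task: waits 0.1 s (like a network call), then returns."""
    time.sleep(0.1)
    return i * 2


def run_sequential(items: List[int]) -> Tuple[List[int], float]:
    start = time.perf_counter()
    results = [fake_io_task(i) for i in items]  # one after another
    return results, time.perf_counter() - start


def run_concurrent(items: List[int], max_workers: int = 5) -> Tuple[List[int], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(fake_io_task, items))  # the waits overlap across threads
    return results, time.perf_counter() - start


items = list(range(5))
seq_results, seq_time = run_sequential(items)
con_results, con_time = run_concurrent(items)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

For CPU-bound Python code the threads would contend for the GIL and the speed-up disappears, which is why a process-based backend such as Dask is the better fit there.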
Caching in Prefect allows task results to be reused across flow runs without re-execution. A cached task skips execution if: (1) the cache key matches (based on function name, inputs, or custom key), (2) the cache hasn't expired (based on TTL). Caching is essential for expensive computations (ML training, large joins) and for idempotent re-runs.
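A minimal model of input-keyed caching with expiration, in the spirit of cache_key_fn=task_input_hash combined with cache_expiration. The key derivation (SHA-256 over the function name and a JSON dump of the arguments) and the in-memory dict store are assumptions for illustration; Prefect hashes inputs differently and persists results to storage.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical in-memory cache: key -> (expires_at, value)
_cache: Dict[str, Tuple[float, Any]] = {}


def cache_key(fn: Callable, kwargs: Dict[str, Any]) -> str:
    """Deterministic key from the function name and its JSON-serializable inputs."""
    payload = json.dumps({"fn": fn.__name__, "kwargs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached_call(fn: Callable, ttl_seconds: float, **kwargs: Any) -> Tuple[Any, bool]:
    """Return (value, cache_hit); re-execute fn when the key is missing or expired."""
    key = cache_key(fn, kwargs)
    now = time.monotonic()
    entry = _cache.get(key)
    if entry is not None and entry[0] > now:
        return entry[1], True
    value = fn(**kwargs)
    _cache[key] = (now + ttl_seconds, value)
    return value, False


executions: List[str] = []


def expensive_join(execution_date: str) -> str:
    executions.append(execution_date)  # record every real execution
    return f"joined-{execution_date}"


print(cached_call(expensive_join, ttl_seconds=3600, execution_date="2024-01-15"))  # ('joined-2024-01-15', False)
print(cached_call(expensive_join, ttl_seconds=3600, execution_date="2024-01-15"))  # ('joined-2024-01-15', True)
print(cached_call(expensive_join, ttl_seconds=3600, execution_date="2024-01-16"))  # ('joined-2024-01-16', False)
```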
Caching Effectiveness
Caching effectiveness = (Skipped_executions / Total_executions) * 100%. For a flow with 10 tasks, where 3 are expensive and cached: If the flow runs 100 times/day and the cache hit rate is 80%: Saved_executions = 3 * 100 * 0.8 = 240 task executions/day. Cost savings = Saved_executions * cost_per_execution.
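The same arithmetic as two small helpers, so the numbers can be re-run for other flows; the function names are hypothetical.

```python
def saved_executions(cached_tasks: int, runs_per_day: int, hit_rate: float) -> float:
    """Task executions avoided per day = cached tasks x runs per day x cache hit rate."""
    return cached_tasks * runs_per_day * hit_rate


def caching_effectiveness(skipped: int, total: int) -> float:
    """Percentage of executions skipped thanks to cache hits (0 when nothing ran)."""
    return 100.0 * skipped / total if total else 0.0


saved = saved_executions(cached_tasks=3, runs_per_day=100, hit_rate=0.8)
print(saved)  # about 240 task executions/day
print(caching_effectiveness(skipped=80, total=100))  # 80.0
```

Multiplying saved by a per-execution cost gives the cost-savings figure from the formula above.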
Concurrency Limit Impact
For a flow with N independent tasks and concurrency limit C: Parallelism = min(N, C), and Flow_throughput = min(N, C) * task_throughput. For N >= 10, C = 10 and task_throughput = 100 records/second: Flow_throughput = 10 * 100 = 1000 records/second. Without a concurrency limit, Flow_throughput = N * 100 in theory, but running all N tasks at once can exhaust memory, database connections, or API rate limits.
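The effect of a concurrency limit can be sketched with a semaphore: at most C of the N tasks hold a slot at any moment. This is a plain-Python analogy for a tag-based task concurrency limit, not Prefect's implementation; limited_task and its 0.02-second sleep are assumptions.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_limit(n_tasks: int, limit: int) -> int:
    """Run n_tasks concurrently but let at most `limit` execute at once.

    Returns the peak number of tasks observed running simultaneously.
    """
    slots = threading.Semaphore(limit)
    lock = threading.Lock()
    running = 0
    peak = 0

    def limited_task(i: int) -> None:
        nonlocal running, peak
        with slots:  # blocks while `limit` tasks already hold a slot
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.02)  # simulated work
            with lock:
                running -= 1

    # One thread per task, so the semaphore (not the pool size) is the bottleneck
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        list(pool.map(limited_task, range(n_tasks)))
    return peak


def flow_throughput(n_tasks: int, limit: int, task_throughput: float) -> float:
    """Records/second for independent tasks: min(N, C) * per-task throughput."""
    return min(n_tasks, limit) * task_throughput


print(run_with_limit(n_tasks=30, limit=10))  # never exceeds 10
print(flow_throughput(30, 10, 100))  # 1000
```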
Prefect does not enforce reproducibility by itself: a flow run with the same parameters produces the same result only when its tasks are idempotent. This holds when: (1) tasks use deterministic cache keys, (2) tasks perform idempotent operations (UPSERT, MERGE), and (3) no external state changes between runs. Formally, for flow F with parameters P and runs R1 and R2: Result(F, P, R1) = Result(F, P, R2) if the tasks are idempotent.
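Idempotency can be checked directly: run the same load twice and compare the table contents. The sketch uses the standard-library sqlite3 module (SQLite 3.24+ supports INSERT ... ON CONFLICT DO UPDATE) as a stand-in for the warehouse; the table and rows are made up for illustration.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[int, float, str]  # (order_id, amount, status)


def load_orders(conn: sqlite3.Connection, rows: List[Row]) -> None:
    """Idempotent load: insert new orders, update existing ones in place."""
    conn.executemany(
        """
        INSERT INTO orders_fact (order_id, amount, status)
        VALUES (?, ?, ?)
        ON CONFLICT (order_id) DO UPDATE SET
            amount = excluded.amount,
            status = excluded.status
        """,
        rows,
    )
    conn.commit()


def snapshot(conn: sqlite3.Connection) -> List[Row]:
    return conn.execute(
        "SELECT order_id, amount, status FROM orders_fact ORDER BY order_id"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_fact (order_id INTEGER PRIMARY KEY, amount REAL, status TEXT)"
)
batch = [(1, 25.0, "paid"), (2, 40.0, "pending")]

load_orders(conn, batch)
first_run = snapshot(conn)
load_orders(conn, batch)  # re-run with identical input
second_run = snapshot(conn)
print(first_run == second_run)  # True: no duplicates, same values
```

A plain INSERT in place of the upsert would fail (or, without a primary key, duplicate rows) on the second run, which is exactly the non-idempotent behaviour the conditions above rule out.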
Dynamic task generation allows flows to create tasks at runtime based on data. The number of tasks is determined by the input data, not the code structure. Formally: For input data D with |D| elements, a flow using .map() creates |D| task instances. This enables data-parallel execution patterns without pre-defining task counts.
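A toy model of the fan-out that .map() performs: one call per element of the mapped iterables, zipped position by position, with non-mapped arguments broadcast to every call (the role unmapped() plays in Prefect). toy_map is hypothetical and runs the calls sequentially; Prefect instead submits each call as a separate task run to the task runner.

```python
from typing import Any, Callable, Dict, Iterable, List


def toy_map(fn: Callable[..., Any], static: Dict[str, Any], **mapped: Iterable[Any]) -> List[Any]:
    """Call fn once per position across the mapped iterables.

    `static` arguments are passed unchanged to every call; all mapped
    iterables must have the same length, so |D| inputs produce |D| calls.
    """
    columns = {name: list(values) for name, values in mapped.items()}
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError(f"mapped iterables have different lengths: {sorted(lengths)}")
    n = lengths.pop() if lengths else 0
    return [
        fn(**static, **{name: values[i] for name, values in columns.items()})
        for i in range(n)
    ]


def extract_table(connection_string: str, table_name: str) -> str:
    # Hypothetical stand-in for an extract task; returns the output path it would write
    return f"/tmp/raw/{table_name}.parquet"


tables = ["events_2024_01", "events_2024_02", "events_2024_03"]  # discovered at runtime
paths = toy_map(extract_table, static={"connection_string": "db-url"}, table_name=tables)
print(len(paths))  # 3: one call per table
```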
Key Concepts
| Concept | Description | API |
|---|---|---|
| Flow | Top-level orchestration unit | @flow(name="my-flow") |
| Task | Discrete unit of work | @task(name="my-task") |
| Task Runner | Execution strategy for tasks | SequentialTaskRunner, ConcurrentTaskRunner, DaskTaskRunner |
| Cache | Reuse task results across runs | @task(cache_key_fn=..., cache_expiration=...) |
| Retry | Automatic retry on task failure | @task(retries=3, retry_delay_seconds=10) |
| Concurrency Limit | Control parallel task execution | @task(tags=["db"]) plus prefect concurrency-limit create db 10 |
| State | Execution state of flow/task | Running, Completed, Failed, Crashed |
| Parameter | Input value for flow runs | Typed arguments of the @flow function, e.g. def my_flow(param: int) |
| Artifact | Stored result displayed in UI | prefect.artifacts.create_markdown_artifact(...) |
| Deployment | Scheduled or triggered flow run | my_flow.serve(...) or my_flow.deploy(...); Deployment.build_from_flow(...) in older 2.x releases |
| Work Pool | Pool of workers for execution | prefect work-pool create my-pool |
| Flow Run | Single execution of a flow | Triggered by schedule, API, or UI |
| Subflow | Flow called by another flow | @flow called within a @flow |
| Task Map | Dynamic task generation from data | task.map(data) |
| Result | Stored task/flow output | @task(persist_result=True, result_storage=...) |
| Notification | Alert on flow/task state changes | Notification blocks |
| Block | Reusable infrastructure config | Secret, GCSBucket, SqlAlchemyConnector |
| Flow Vis | Visual representation in UI | Real-time flow run visualization |
- Define tasks: Decompose pipeline into discrete tasks. Each task should be a single responsibility (extract, transform, validate, load).
- Add error handling: Configure retries with exponential backoff. Set retry delays based on error type (transient vs permanent).
- Implement caching: Cache expensive tasks (ML training, large joins) with deterministic cache keys based on inputs.
- Set concurrency limits: Limit concurrent task executions to prevent resource exhaustion. Use different limits for I/O-bound and CPU-bound tasks.
- Configure scheduling: Set up schedules using cron expressions, intervals, or RRules, and set the schedule's timezone.
- Add monitoring: Use Prefect UI for real-time monitoring. Set up notifications for flow failures.
- Deploy to production: Use Prefect Cloud or self-hosted server. Configure work pools and workers for execution.
- Test locally: Run flows locally by calling the flow function (e.g. my_flow()) before deploying. Use pytest for unit testing tasks.
- Implement dynamic workflows: Use .map() for data-parallel execution. Use conditional logic for branching.
- Document and iterate: Add docstrings, type hints, and comments. Monitor performance and optimize bottlenecks.
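For the "Add error handling" step, an exponential backoff schedule doubles the wait after each failed attempt. The helper below computes such a schedule with an optional cap and jitter. It mirrors the idea behind prefect.tasks.exponential_backoff but is a standard-library sketch with hypothetical names; recent Prefect versions accept a list of per-retry delays for retry_delay_seconds, so a list like this could be passed there.

```python
import random
from typing import List, Optional


def backoff_delays(
    retries: int,
    base_seconds: float,
    max_seconds: float = 600.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Delay before retry k (k = 0..retries-1) is base * 2**k, capped at max_seconds.

    jitter then adds up to `jitter` * delay of random extra wait, so that many
    failing tasks do not all retry in lockstep.
    """
    rng = rng or random.Random()
    delays = []
    for k in range(retries):
        delay = min(base_seconds * 2 ** k, max_seconds)
        delay += delay * jitter * rng.random()
        delays.append(delay)
    return delays


print(backoff_delays(retries=4, base_seconds=10))  # [10.0, 20.0, 40.0, 80.0]
```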
Production Code
Complete ETL Pipeline with Prefect
```python
from datetime import datetime, timedelta
from typing import Dict, Optional

import pandas as pd
import sqlalchemy
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

# Placeholder connection string: in production, load credentials from a
# Prefect Secret block or an environment variable instead of hard-coding them.
DB_URL = "postgresql://user:pass@host/db"

# ------------------------------------------------------
# TASKS
# ------------------------------------------------------
@task(
    name="extract-orders",                # Task name for UI display
    retries=3,                            # Retry up to 3 times on failure
    retry_delay_seconds=10,               # Wait 10 seconds between retries
    cache_key_fn=task_input_hash,         # Cache based on function inputs
    cache_expiration=timedelta(hours=1),  # Cache expires after 1 hour
    log_prints=True,                      # Capture print() statements in logs
)
def extract_orders(execution_date: str) -> pd.DataFrame:
    """
    Extract orders from the source database for the given date.

    Parameters:
        execution_date (str): Date string in 'YYYY-MM-DD' format
    Returns:
        pd.DataFrame: Order records for the specified date
    Caching behavior:
        If the same execution_date is passed within 1 hour, the cached result
        is returned without hitting the database.
    """
    logger = get_run_logger()  # run logger: messages show up in the Prefect UI
    engine = sqlalchemy.create_engine(DB_URL)
    # Bind the date as a query parameter instead of formatting it into the SQL string
    query = sqlalchemy.text(
        """
        SELECT order_id, customer_id, amount, status, created_at
        FROM orders
        WHERE DATE(created_at) = :execution_date
        """
    )
    df = pd.read_sql(query, engine, params={"execution_date": execution_date})
    logger.info(f"Extracted {len(df)} orders for {execution_date}")
    return df


@task(
    name="validate-raw-data",
    retries=2,
    retry_delay_seconds=5,
)
def validate_raw_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate raw data quality before transformation."""
    logger = get_run_logger()
    initial_count = len(df)
    # Remove nulls in critical columns
    df = df.dropna(subset=["order_id", "customer_id", "amount"])
    # Remove zero and negative amounts
    df = df[df["amount"] > 0]
    # Remove duplicates
    df = df.drop_duplicates(subset=["order_id"])
    validation_stats = {
        "initial_count": initial_count,
        "final_count": len(df),
        "removed_count": initial_count - len(df),
        "removal_rate": (initial_count - len(df)) / initial_count if initial_count > 0 else 0,
    }
    # Fail the task if more than 10% of the records were dropped
    if validation_stats["removal_rate"] > 0.1:
        raise ValueError(
            f"Data quality issue: {validation_stats['removal_rate']:.1%} "
            f"of records removed during validation"
        )
    logger.info(f"Validation passed: {validation_stats}")
    return df


@task(
    name="transform-orders",
    retries=2,
    retry_delay_seconds=30,
)
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Transform orders with enrichment and aggregation."""
    logger = get_run_logger()
    # Enrich with customer data
    engine = sqlalchemy.create_engine(DB_URL)
    customer_df = pd.read_sql("SELECT customer_id, tier, segment FROM customers", engine)
    enriched = df.merge(customer_df, on="customer_id", how="left")
    enriched["tier"] = enriched["tier"].fillna("unknown")
    # Calculate derived fields
    enriched["order_date"] = pd.to_datetime(enriched["created_at"]).dt.date
    enriched["amount_bucket"] = pd.cut(
        enriched["amount"],
        bins=[0, 10, 100, 1000, float("inf")],
        labels=["micro", "small", "medium", "large"],
    )
    logger.info(f"Transformed {len(enriched)} orders")
    return enriched


@task(
    name="load-to-warehouse",
    retries=3,               # More retries for load phase (critical)
    retry_delay_seconds=60,  # Longer delay for database recovery
)
def load_to_warehouse(df: pd.DataFrame, target_table: str) -> Dict:
    """
    Load transformed data to the warehouse using an idempotent upsert.

    Parameters:
        df (pd.DataFrame): Transformed data to load
        target_table (str): Target table name in the warehouse
    Returns:
        Dict: Load statistics with record count and target table
    Idempotency:
        Uses INSERT ... ON CONFLICT DO UPDATE (UPSERT) so that repeated runs
        produce the same result without duplicates.
    """
    logger = get_run_logger()
    engine = sqlalchemy.create_engine(DB_URL)
    # Stage the batch in a temporary table, then upsert from it
    temp_table = f"{target_table}_temp"
    df.to_sql(temp_table, engine, if_exists="replace", index=False)
    # List columns explicitly so the upsert does not depend on column order
    columns = ", ".join(df.columns)
    merge_query = f"""
        INSERT INTO {target_table} ({columns})
        SELECT {columns} FROM {temp_table}
        ON CONFLICT (order_id)
        DO UPDATE SET
            amount = EXCLUDED.amount,
            status = EXCLUDED.status,
            tier = EXCLUDED.tier,
            segment = EXCLUDED.segment
    """
    # engine.begin() commits on success and rolls back on error
    # (Engine.execute() was removed in SQLAlchemy 2.0)
    with engine.begin() as conn:
        conn.execute(sqlalchemy.text(merge_query))
        conn.execute(sqlalchemy.text(f"DROP TABLE {temp_table}"))
    result = {"loaded_count": len(df), "target_table": target_table}
    logger.info(f"Loaded {len(df)} records to {target_table}")
    return result


@task(name="send-notification")
def send_notification(message: str, severity: str = "info"):
    """Send notification about pipeline completion."""
    logger = get_run_logger()
    logger.info(f"Notification [{severity}]: {message}")
    # In production, integrate with Slack, PagerDuty, etc.


# ------------------------------------------------------
# FLOW
# ------------------------------------------------------
@flow(
    name="daily-order-etl",
    description="Daily order extraction, transformation, and warehouse load",
    log_prints=True,
    retries=1,
    retry_delay_seconds=300,
)
def daily_order_etl(execution_date: Optional[str] = None) -> Dict:
    """Main ETL flow orchestrating extraction, transformation, and loading."""
    logger = get_run_logger()
    if execution_date is None:
        execution_date = datetime.now().strftime("%Y-%m-%d")
    logger.info(f"Starting daily order ETL for {execution_date}")
    # Step 1: Extract
    raw_df = extract_orders(execution_date)
    # Step 2: Validate
    validated_df = validate_raw_data(raw_df)
    # Step 3: Transform
    transformed_df = transform_orders(validated_df)
    # Step 4: Load
    load_result = load_to_warehouse(transformed_df, "orders_fact")
    # Step 5: Notify
    send_notification(
        message=f"ETL completed for {execution_date}: {load_result['loaded_count']} records loaded",
        severity="info",
    )
    return load_result


# ------------------------------------------------------
# DEPLOYMENT
# ------------------------------------------------------
if __name__ == "__main__":
    # Run locally for testing
    result = daily_order_etl(execution_date="2024-01-15")
    print(result)

    # Register a deployment with a daily 06:00 cron schedule and execute its runs
    # in this process (flow.serve is available in recent Prefect 2.x and in 3.x;
    # it blocks). Use daily_order_etl.deploy(..., work_pool_name=...) to run on a
    # work pool instead. execution_date is left unset so each run defaults to the
    # current date: Prefect does not render Airflow templates like {{ data_interval_end }}.
    daily_order_etl.serve(name="daily-schedule", cron="0 6 * * *")
```
Dynamic Workflow with Task Mapping
```python
import os
from typing import List

import pandas as pd
import sqlalchemy
from prefect import flow, task, get_run_logger, unmapped


@task(name="discover-tables")
def discover_tables(connection_string: str) -> List[str]:
    """Dynamically discover tables to process."""
    engine = sqlalchemy.create_engine(connection_string)
    with engine.connect() as conn:
        # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
        result = conn.execute(
            sqlalchemy.text(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = 'public' AND table_name LIKE 'events_%'"
            )
        )
        tables = [row[0] for row in result]
    logger = get_run_logger()
    logger.info(f"Discovered {len(tables)} tables: {tables}")
    return tables


@task(name="extract-table", retries=2, retry_delay_seconds=10)
def extract_table(connection_string: str, table_name: str) -> str:
    """Extract a single table to a parquet file."""
    engine = sqlalchemy.create_engine(connection_string)
    # table_name comes from information_schema, not user input;
    # identifiers cannot be passed as bound parameters
    df = pd.read_sql(f"SELECT * FROM {table_name}", engine)
    os.makedirs("/tmp/raw", exist_ok=True)  # make sure the output directory exists
    output_path = f"/tmp/raw/{table_name}.parquet"
    df.to_parquet(output_path, index=False)
    logger = get_run_logger()
    logger.info(f"Extracted {len(df)} rows from {table_name}")
    return output_path


@task(name="validate-table")
def validate_table(table_name: str, file_path: str) -> dict:
    """Validate extracted data quality."""
    df = pd.read_parquet(file_path)
    # An empty table yields a NaN null rate, so it fails the is_valid check below
    null_rate = df.isnull().mean().mean()
    duplicate_rate = 1 - (len(df.drop_duplicates()) / len(df)) if len(df) else 0.0
    result = {
        "table": table_name,
        "row_count": len(df),
        "null_rate": float(null_rate),
        "duplicate_rate": float(duplicate_rate),
        "is_valid": bool(null_rate < 0.1 and duplicate_rate < 0.05),
    }
    logger = get_run_logger()
    if not result["is_valid"]:
        logger.warning(f"Validation FAILED for {table_name}: {result}")
    else:
        logger.info(f"Validation passed for {table_name}")
    return result


@task(name="aggregate-results")
def aggregate_results(validations: List[dict]) -> dict:
    """Aggregate validation results across all tables."""
    total = len(validations)
    valid = sum(1 for v in validations if v["is_valid"])
    return {
        "total_tables": total,
        "valid_tables": valid,
        "validity_rate": valid / total if total > 0 else 0,
        "details": validations,
    }


@flow(name="dynamic-table-pipeline")
def dynamic_table_pipeline(connection_string: str):
    """Dynamic pipeline that processes discovered tables in parallel."""
    # Discover tables dynamically
    tables = discover_tables(connection_string)
    # Map extraction across tables: one task run per table, run in parallel.
    # unmapped() passes the same connection string to every run.
    raw_files = extract_table.map(
        connection_string=unmapped(connection_string),
        table_name=tables,
    )
    # Map validation across tables, pairing each table with its extracted file
    validations = validate_table.map(
        table_name=tables,
        file_path=raw_files,
    )
    # Aggregate results (the futures are resolved before the task runs)
    summary = aggregate_results(validations)
    logger = get_run_logger()
    logger.info(f"Pipeline summary: {summary}")
    if summary["validity_rate"] < 0.9:
        raise ValueError(
            f"Data quality below threshold: {summary['validity_rate']:.1%} "
            f"of tables passed validation"
        )
    return summary


# Run the dynamic pipeline
if __name__ == "__main__":
    result = dynamic_table_pipeline("postgresql://user:pass@host/db")
    print(result)
```
Prefect vs Airflow Decision Matrix: Choose Prefect when: (1) your team is Python-focused and wants minimal boilerplate, (2) you need dynamic task generation at runtime, (3) you prefer local execution for testing, (4) you want a modern UI with real-time monitoring. Choose Airflow when: (1) you need a mature ecosystem with 100+ operators, (2) your team already has Airflow expertise, (3) you need complex scheduling with catchup and backfill.
Caching Strategy: Use the INPUTS cache policy (cache_policy=INPUTS in Prefect 3) or cache_key_fn=task_input_hash to cache based on function inputs. Use custom cache keys for domain-specific caching (e.g., cache by data version, not just inputs). Set cache_expiration to prevent stale data from being served. For ML pipelines, cache model training tasks by dataset hash to avoid re-training when the data hasn't changed.
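Caching by data version rather than by argument values means hashing the dataset's contents. A minimal sketch of such a key function, assuming the training data lives in a local file; dataset_cache_key, the model_version argument, and the chunk size are illustrative. In Prefect, a cache_key_fn receives the task run context and a dict of the task's parameters, so a helper like this could be called from one.

```python
import hashlib
import tempfile
from pathlib import Path


def dataset_cache_key(path: str, model_version: str, chunk_size: int = 1 << 20) -> str:
    """Key that changes only when the file's bytes or the model version change."""
    digest = hashlib.sha256()
    digest.update(model_version.encode() + b"\0")  # separator keeps fields unambiguous
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):  # stream large files
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    data = Path(tmp) / "train.csv"
    data.write_text("x,y\n1,2\n")
    k1 = dataset_cache_key(str(data), "v1")
    k2 = dataset_cache_key(str(data), "v1")  # same bytes -> same key (cache hit)
    data.write_text("x,y\n1,2\n3,4\n")
    k3 = dataset_cache_key(str(data), "v1")  # new data -> new key (re-train)

print(k1 == k2, k1 == k3)  # True False
```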
- Prefect uses Python decorators (@flow, @task) instead of DAG definitions, reducing boilerplate.
- Tasks support retries, caching, concurrency limits, and state management out of the box.
- Task runners determine execution strategy: Sequential, Concurrent (threads), or Dask (distributed).
- Caching skips task execution when inputs haven't changed, saving compute for expensive operations.
- Dynamic task generation (.map()) creates tasks at runtime based on data, enabling data-parallel patterns.
- Prefect handles scheduling, retries, caching, and observability with minimal configuration.
- Deploy to Prefect Cloud or self-hosted server with work pools and workers.
- Test locally by calling the flow function before deploying to production.
Best Practices
- Decompose into small tasks: each task should have a single responsibility. This enables granular caching, retries, and concurrency control.
- Configure retries with backoff: use retries=3 and retry_delay_seconds=30 for transient errors. Increase the delay for rate-limited APIs.
- Cache expensive tasks: use cache_key_fn=task_input_hash for deterministic caching. Set cache_expiration to prevent stale results.
- Set concurrency limits: limit concurrent task executions to prevent resource exhaustion. Use different limits for I/O-bound and CPU-bound tasks.
- Use .map() for parallelism: map tasks across data partitions for data-parallel execution without pre-defining task counts.
- Test locally before deploying: run flows locally by calling the flow function. Use pytest for unit testing individual tasks.
- Add logging and artifacts: use get_run_logger() for structured logging. Create artifacts for important results.
- Configure notifications: set up alerts for flow failures in Prefect UI. Integrate with Slack/PagerDuty for critical failures.
- Use subflows for modularity: break complex flows into subflows for readability and reusability.
- Monitor cache hit rates: track how often tasks are cached vs executed. Optimize cache keys for maximum hit rates.
Prefect vs Airflow Comparison
| Feature | Prefect | Apache Airflow |
|---|---|---|
| DAG Definition | Python decorators | DAG objects + operators |
| Task Definition | @task decorator | Operator classes |
| Dynamic Workflows | Native .map() | Dynamic DAG generation |
| Local Testing | Yes (native) | Limited (requires CLI) |
| UI | Modern, real-time | Mature, feature-rich |
| Ecosystem | Growing | 100+ providers |
| Scheduling | Scheduler service in Prefect Cloud or self-hosted server | Built-in scheduler |
| Caching | Native, flexible | Limited (XCom-based) |
| Concurrency | Native limits | Pool-based |
| Best For | Python-native teams | Enterprise, complex scheduling |
Task Runner Selection
| Task Type | SequentialTaskRunner | ConcurrentTaskRunner | DaskTaskRunner |
|---|---|---|---|
| I/O-Bound | Slow | Fast | Fast |
| CPU-Bound | Slow | Slow | Fast |
| Mixed | Slow | Medium | Fast |
| Memory-Intensive | Safe | Safe | Risk of OOM |
| Dependencies | None (built in) | None (built in) | prefect-dask package + Dask cluster |
| Overhead | None | Low | High |
See Also
- 017 - Apache Airflow: DAGs, Operators, and Scheduling - Alternative orchestration platform
- 018 - Advanced Airflow: XComs, Dynamic DAGs, and the TaskFlow API - Advanced Airflow patterns
- 028 - Error Handling, Retries, and Dead Letter Queues - Error handling in flows
- 026 - Data Pipeline Testing - Testing Prefect tasks and flows
- 016 - ETL vs ELT - Orchestrating transformation paradigms