Prefect: Modern Workflow Orchestration

Prefect: Python-Native Workflow Orchestration

Prefect is a modern, Python-native workflow orchestration framework designed for data engineers who want to build, run, and monitor pipelines using pure Python.

Why Prefect Over Airflow?


Key Differences:

  • No DAG boilerplate: @flow and @task decorators instead of DAG objects and operators
  • Dynamic workflows: easily create tasks at runtime based on data
  • Local execution: test pipelines locally without an Airflow cluster
  • Native Python: no Jinja templates or Airflow-specific APIs
  • Modern UI: real-time pipeline monitoring with rich visualizations
  • Hybrid execution: orchestration in cloud, execution on your infrastructure

Prefect vs Airflow Selection Guide

| Scenario | Recommended | Rationale |
|---|---|---|
| Python-heavy team | Prefect | Native Python, minimal boilerplate |
| Complex scheduling | Airflow | Mature scheduling with catchup/backfill |
| Dynamic workflows | Prefect | Native .map() and task generation |
| Large enterprise | Airflow | 100+ operators, established ecosystem |
| Rapid prototyping | Prefect | Local execution, fast iteration |
| Compliance requirements | Airflow | Audit logs, access controls |
| Cloud-native | Prefect Cloud | Managed orchestration |
| Self-hosted control | Airflow | Full infrastructure control |

Prefect Architecture

[Figure: Prefect orchestration architecture. Code (Python): @flow decorators, task definitions. Prefect Cloud: orchestration engine, state management, scheduling and events. Workers: execute flow runs, work pools (K8s, etc.), auto-scaling. Infrastructure: Docker / K8s, Cloud Run, ECS / Lambda. Flow execution: Trigger, Schedule, Worker, Execute, Complete.]

A flow is the highest-level unit of work in Prefect. A flow is a Python function decorated with @flow that orchestrates one or more tasks. Flows handle: (1) state management (running, completed, failed), (2) retry logic (automatic retries on failure), (3) caching (skip re-execution of completed tasks), (4) concurrency limits (control parallel execution), (5) scheduling (cron, interval, RRule), (6) observability (metrics, logs, traces). Flows can be nested: a flow that is called from inside another flow runs as a subflow.

A task is a discrete unit of work within a flow. A task is a Python function decorated with @task. Tasks handle: (1) Caching: skip execution if inputs haven't changed, (2) Retries: automatic retries with configurable delay, (3) Concurrency: limit concurrent task executions, (4) State: track execution state and results, (5) Artifacts: store and display results in the UI. Tasks are the building blocks of flows.

A task runner determines how tasks are executed within a flow. Prefect 2.x provides three runners: (1) SequentialTaskRunner: tasks execute one after another, (2) ConcurrentTaskRunner: the default in Prefect 2.x; tasks run concurrently, with synchronous tasks submitted to a thread pool and async tasks run on the event loop, (3) DaskTaskRunner: tasks execute on a Dask cluster for distributed computing (installed separately through the prefect-dask package). In Prefect 3.x the thread-based runner is named ThreadPoolTaskRunner. The choice depends on workload characteristics: I/O-bound tasks benefit from concurrency; CPU-bound tasks benefit from Dask parallelism.
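The difference between sequential and concurrent execution for I/O-bound work can be illustrated without Prefect. The sketch below uses only the standard library; `fake_io_call` is a hypothetical stand-in for a task that waits on a network or database call, and the thread pool only approximates what a concurrent task runner does.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_io_call(item: int) -> int:
    """Hypothetical I/O-bound task: waits briefly, then returns a value."""
    time.sleep(0.05)  # simulates waiting on a network or database call
    return item * 2


def run_sequential(items: List[int]) -> List[int]:
    """Run the calls one after another."""
    return [fake_io_call(i) for i in items]


def run_concurrent(items: List[int], max_workers: int = 8) -> List[int]:
    """Run the calls in a thread pool; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fake_io_call, items))


items = list(range(8))

start = time.perf_counter()
sequential_results = run_sequential(items)
sequential_seconds = time.perf_counter() - start

start = time.perf_counter()
concurrent_results = run_concurrent(items)
concurrent_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

Because the threads spend most of their time waiting, the concurrent version should finish in roughly the time of one call rather than eight. CPU-bound work would not see the same gain from threads, which is the case a distributed runner such as Dask is meant for.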

Caching in Prefect allows task results to be reused across flow runs without re-execution. A cached task skips execution if: (1) the cache key matches (based on function name, inputs, or custom key), (2) the cache hasn't expired (based on TTL). Caching is essential for expensive computations (ML training, large joins) and for idempotent re-runs.
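The two conditions above (matching key, unexpired TTL) can be sketched in plain Python. This is an illustration of the idea, not Prefect's implementation: `make_cache_key`, `run_cached`, and the in-memory `_cache` dictionary are hypothetical names, and the clock is injectable so expiry can be shown without waiting.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Tuple

# key -> (expiry timestamp, cached result); in-memory only, for illustration
_cache: Dict[str, Tuple[float, Any]] = {}


def make_cache_key(fn_name: str, kwargs: Dict[str, Any]) -> str:
    """Derive a deterministic key from the function name and its inputs."""
    payload = json.dumps({"fn": fn_name, "inputs": kwargs}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(fn: Callable[..., Any], ttl_seconds: float,
               now: Callable[[], float] = time.time, **kwargs: Any) -> Tuple[Any, bool]:
    """Return (result, was_cache_hit); re-run fn only on a miss or after expiry."""
    key = make_cache_key(fn.__name__, kwargs)
    entry = _cache.get(key)
    if entry is not None and now() < entry[0]:
        return entry[1], True
    result = fn(**kwargs)
    _cache[key] = (now() + ttl_seconds, result)
    return result, False


calls = []


def expensive_join(day: str) -> str:
    """Hypothetical expensive task; records each real execution."""
    calls.append(day)
    return f"joined-{day}"


clock = [0.0]  # fake clock so the TTL can be advanced manually
first = run_cached(expensive_join, 3600, now=lambda: clock[0], day="2024-01-15")
second = run_cached(expensive_join, 3600, now=lambda: clock[0], day="2024-01-15")
clock[0] = 3601.0  # move past the one-hour TTL
third = run_cached(expensive_join, 3600, now=lambda: clock[0], day="2024-01-15")
```

Here the second call is served from the cache, while the third call executes again because the entry has expired.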

Caching Effectiveness

Caching effectiveness = (Skipped_executions / Total_executions) * 100%. For a flow with 10 tasks, where 3 are expensive and cached: If the flow runs 100 times/day and the cache hit rate is 80%: Saved_executions = 3 * 100 * 0.8 = 240 task executions/day. Cost savings = Saved_executions * cost_per_execution.
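The arithmetic above can be checked with a few lines of Python. The function names are illustrative, and `cost_per_execution` is a made-up figure used only to show the last step of the calculation.

```python
def saved_executions(cached_tasks: int, runs_per_day: int, hit_rate: float) -> float:
    """Task executions skipped per day thanks to cache hits."""
    return cached_tasks * runs_per_day * hit_rate


def caching_effectiveness(skipped: float, total: float) -> float:
    """Skipped executions as a percentage of all task executions."""
    return skipped / total * 100


tasks_per_flow = 10
cached_tasks = 3
runs_per_day = 100
hit_rate = 0.8
cost_per_execution = 0.05  # hypothetical cost in dollars, not from the text

saved = saved_executions(cached_tasks, runs_per_day, hit_rate)
total = tasks_per_flow * runs_per_day
effectiveness = caching_effectiveness(saved, total)
daily_savings = saved * cost_per_execution

print(f"saved: {saved:.0f}/day, effectiveness: {effectiveness:.0f}%, "
      f"savings: ${daily_savings:.2f}/day")
```

With the numbers from the example, 240 of the 1000 daily task executions are skipped, so the flow-wide effectiveness is 24% even though the hit rate on the cached tasks themselves is 80%.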

Concurrency Limit Impact

For a flow with N independent tasks and concurrency limit C: Parallelism = min(N, C) and Flow_throughput = min(N, C) * task_throughput. For N >= 10, C = 10, and task_throughput = 100 records/second: Flow_throughput = 10 * 100 = 1000 records/second. Without a concurrency limit: Flow_throughput = N * 100 records/second, which can overwhelm downstream resources.
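A small standard-library sketch can make the effect of a limit concrete. It uses a semaphore to cap how many hypothetical tasks run at once and records the peak; `run_with_limit` and `flow_throughput` are illustrative helpers, not Prefect functions.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def flow_throughput(n_tasks: int, limit: int, task_throughput: float) -> float:
    """Throughput for independent tasks: min(N, C) * per-task throughput."""
    return min(n_tasks, limit) * task_throughput


def run_with_limit(n_tasks: int, limit: int) -> Tuple[List[int], int]:
    """Run n_tasks dummy tasks with at most `limit` active; return results and peak."""
    semaphore = threading.Semaphore(limit)
    lock = threading.Lock()
    active = 0
    peak = 0

    def task(i: int) -> int:
        nonlocal active, peak
        with semaphore:  # blocks while `limit` tasks are already running
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # simulated work
            with lock:
                active -= 1
        return i

    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        results = list(pool.map(task, range(n_tasks)))
    return results, peak


results, peak = run_with_limit(n_tasks=20, limit=5)
print(f"peak concurrency: {peak}")
print(f"throughput: {flow_throughput(20, 10, 100):.0f} records/second")
```

The observed peak never exceeds the limit, which is the property that protects a database or API from being flooded when the number of tasks grows with the data.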

A flow run with the same parameters produces the same result when its tasks are idempotent. Prefect supplies retries and caching, but idempotency is a property of the task code rather than something the orchestrator can enforce. It holds when: (1) tasks use deterministic caching keys, (2) tasks perform idempotent operations (UPSERT, MERGE), (3) no external state changes between runs. Formally: for flow F with parameters P and runs R1 and R2, Result(F, P, R1) = Result(F, P, R2) if all tasks are idempotent.
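The role of UPSERT in condition (2) can be demonstrated with an in-memory SQLite database. This is a minimal sketch, assuming a SQLite build with UPSERT support (3.24 or later); the `orders_fact` table and the sample rows are made up for illustration.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[int, float, str]


def load_orders(conn: sqlite3.Connection, rows: List[Row]) -> None:
    """Idempotent load: insert new orders, update existing ones by order_id."""
    conn.executemany(
        """
        INSERT INTO orders_fact (order_id, amount, status)
        VALUES (?, ?, ?)
        ON CONFLICT(order_id) DO UPDATE SET
            amount = excluded.amount,
            status = excluded.status
        """,
        rows,
    )
    conn.commit()


def snapshot(conn: sqlite3.Connection) -> List[Row]:
    """Return the table contents in a stable order."""
    return conn.execute(
        "SELECT order_id, amount, status FROM orders_fact ORDER BY order_id"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders_fact (order_id INTEGER PRIMARY KEY, amount REAL, status TEXT)"
)

rows = [(1, 10.0, "paid"), (2, 25.5, "shipped")]

load_orders(conn, rows)      # run R1
after_first = snapshot(conn)

load_orders(conn, rows)      # run R2 with the same parameters
after_second = snapshot(conn)

print(after_first == after_second)
```

A plain INSERT would fail or duplicate rows on the second run; the upsert leaves the table in the same state no matter how many times the load is repeated.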

Dynamic task generation allows flows to create tasks at runtime based on data. The number of tasks is determined by the input data, not the code structure. Formally: For input data D with |D| elements, a flow using .map() creates |D| task instances. This enables data-parallel execution patterns without pre-defining task counts.

Key Concepts

| Concept | Description | API |
|---|---|---|
| Flow | Top-level orchestration unit | @flow(name="my-flow") |
| Task | Discrete unit of work | @task(name="my-task") |
| Task Runner | Execution strategy for tasks | SequentialTaskRunner, ConcurrentTaskRunner, DaskTaskRunner |
| Cache | Reuse task results across runs | @task(cache_key_fn=..., cache_expiration=...) |
| Retry | Automatic retry on task failure | @task(retries=3, retry_delay_seconds=10) |
| Concurrency Limit | Control parallel task execution | Tag-based: @task(tags=["db"]) plus prefect concurrency-limit create db 10 |
| State | Execution state of flow/task | Running, Completed, Failed, Crashed |
| Parameter | Input value for flow runs | @flow with def my_flow(param: int) |
| Artifact | Stored result displayed in UI | prefect.artifacts.create_markdown_artifact(...) |
| Deployment | Scheduled or triggered flow run | Deployment.build_from_flow(...) (Prefect 2.x) |
| Work Pool | Pool of workers for execution | prefect work-pool create my-pool |
| Flow Run | Single execution of a flow | Triggered by schedule, API, or UI |
| Subflow | Flow called by another flow | @flow called within a @flow |
| Task Map | Dynamic task generation from data | task.map(data) |
| Result | Stored task/flow output | persist_result=True, result_storage=... |
| Notification | Alert on flow/task state changes | Notification blocks |
| Block | Reusable infrastructure config | Secret, GcsBucket, SqlAlchemyConnector |
| Flow Vis | Visual representation in UI | Real-time flow run visualization |

  1. Define tasks: Decompose the pipeline into discrete tasks. Each task should have a single responsibility (extract, transform, validate, load).
  2. Add error handling: Configure retries with exponential backoff. Retry transient errors (timeouts, dropped connections) and let permanent errors (bad input, schema mismatches) fail immediately.
  3. Implement caching: Cache expensive tasks (ML training, large joins) with deterministic cache keys based on inputs.
  4. Set concurrency limits: Limit concurrent task executions to prevent resource exhaustion. Use different limits for I/O-bound and CPU-bound tasks.
  5. Configure scheduling: Set up schedules using cron expressions, intervals, or RRules, and set the schedule's timezone explicitly.
  6. Add monitoring: Use the Prefect UI for real-time monitoring. Set up notifications for flow failures.
  7. Deploy to production: Use Prefect Cloud or a self-hosted server. Configure work pools and workers for execution.
  8. Test locally: Run flows locally by calling the flow function directly before deploying. Use pytest for unit testing tasks.
  9. Implement dynamic workflows: Use .map() for data-parallel execution. Use conditional logic for branching.
  10. Document and iterate: Add docstrings, type hints, and comments. Monitor performance and optimize bottlenecks.
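Step 2 in the list above (exponential backoff, with retries reserved for transient errors) can be sketched in plain Python. This is an illustration of the pattern rather than Prefect's retry machinery; `backoff_delays`, `call_with_retries`, and `flaky_extract` are hypothetical names, and the `sleep` argument is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base_seconds: float, factor: float = 2.0) -> List[float]:
    """Delays that grow geometrically: base, base*factor, base*factor^2, ..."""
    return [base_seconds * factor ** attempt for attempt in range(retries)]


def call_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    base_seconds: float = 1.0,
    transient: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry fn on transient errors only; any other exception propagates at once."""
    delays = backoff_delays(retries, base_seconds)
    for attempt in range(retries + 1):
        try:
            return fn()
        except transient:
            if attempt == retries:
                raise  # retries exhausted
            sleep(delays[attempt])
    raise RuntimeError("unreachable")


attempts = {"count": 0}


def flaky_extract() -> str:
    """Hypothetical extract step that fails twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary outage")
    return "ok"


slept: List[float] = []  # record delays instead of actually sleeping
result = call_with_retries(flaky_extract, retries=3, base_seconds=1.0, sleep=slept.append)
print(result, slept)
```

Treating only connection and timeout errors as transient is an assumption of this sketch; in a real pipeline the set of retryable exceptions depends on the database driver or HTTP client in use.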

Production Code

Complete ETL Pipeline with Prefect

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.deployments import Deployment  # Prefect 2.x deployment API
from datetime import timedelta, datetime
from typing import Dict, Optional
import pandas as pd
import logging

# Module-level logger. Inside a run, get_run_logger() can be used instead so
# that records are attached to the flow or task run in the Prefect UI.
logger = logging.getLogger(__name__)


# ------------------------------------------------------
# TASKS
# ------------------------------------------------------
@task(
    name="extract-orders",                    # Task name for UI display
    retries=3,                                # Retry up to 3 times on failure
    retry_delay_seconds=10,                   # Wait 10 seconds between retries
    cache_key_fn=task_input_hash,             # Cache based on function inputs
    cache_expiration=timedelta(hours=1),      # Cache expires after 1 hour
    log_prints=True,                          # Capture print() statements in logs
)
def extract_orders(execution_date: str) -> pd.DataFrame:
    """
    Extract orders from source database for the given date.
    
    Parameters:
        execution_date (str): Date string in 'YYYY-MM-DD' format
        
    Returns:
        pd.DataFrame: Order records for the specified date
        
    Caching behavior:
        If same execution_date is passed within 1 hour, cached result is returned
        without hitting the database.
    """
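    from sqlalchemy import create_engine, text

    # Placeholder connection string; load real credentials from a secret store
    engine = create_engine("postgresql://user:pass@host/db")
    # Bind the date as a parameter instead of interpolating it into the SQL
    # string, which would be open to SQL injection
    query = text("""
        SELECT order_id, customer_id, amount, status, created_at
        FROM orders
        WHERE DATE(created_at) = :execution_date
    """)
    df = pd.read_sql(query, engine, params={"execution_date": execution_date})
    logger.info(f"Extracted {len(df)} orders for {execution_date}")
    return df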


@task(
    name="validate-raw-data",
    retries=2,
    retry_delay_seconds=5,
)
def validate_raw_data(df: pd.DataFrame) -> pd.DataFrame:
    """Validate raw data quality before transformation."""
    initial_count = len(df)

    # Remove nulls in critical columns
    df = df.dropna(subset=["order_id", "customer_id", "amount"])

    # Remove negative amounts
    df = df[df["amount"] > 0]

    # Remove duplicates
    df = df.drop_duplicates(subset=["order_id"])

    validation_stats = {
        "initial_count": initial_count,
        "final_count": len(df),
        "removed_count": initial_count - len(df),
        "removal_rate": (initial_count - len(df)) / initial_count if initial_count > 0 else 0,
    }

    if validation_stats["removal_rate"] > 0.1:
        raise ValueError(
            f"Data quality issue: {validation_stats['removal_rate']:.1%} "
            f"of records removed during validation"
        )

    logger.info(f"Validation passed: {validation_stats}")
    return df


@task(
    name="transform-orders",
    retries=2,
    retry_delay_seconds=30,
)
def transform_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Transform orders with enrichment and aggregation."""
    # Enrich with customer data
    import sqlalchemy
    engine = sqlalchemy.create_engine("postgresql://user:pass@host/db")
    customer_df = pd.read_sql("SELECT customer_id, tier, segment FROM customers", engine)

    enriched = df.merge(customer_df, on="customer_id", how="left")
    enriched["tier"] = enriched["tier"].fillna("unknown")

    # Calculate derived fields
    enriched["order_date"] = pd.to_datetime(enriched["created_at"]).dt.date
    enriched["amount_bucket"] = pd.cut(
        enriched["amount"],
        bins=[0, 10, 100, 1000, float("inf")],
        labels=["micro", "small", "medium", "large"],
    )

    logger.info(f"Transformed {len(enriched)} orders")
    return enriched


@task(
    name="load-to-warehouse",
    retries=3,                    # More retries for load phase (critical)
    retry_delay_seconds=60,       # Longer delay for database recovery
)
def load_to_warehouse(df: pd.DataFrame, target_table: str) -> Dict:
    """
    Load transformed data to the warehouse using idempotent upsert.
    
    Parameters:
        df (pd.DataFrame): Transformed data to load
        target_table (str): Target table name in the warehouse
        
    Returns:
        Dict: Load statistics with record count and target table
        
    Idempotency:
        Uses ON CONFLICT DO UPDATE (UPSERT) to ensure repeated runs
        produce the same result without duplicates.
    """
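    from sqlalchemy import create_engine, text

    # Placeholder connection string; load real credentials from a secret store
    engine = create_engine("postgresql://user:pass@host/db")

    # Stage the batch in a temporary table for the upsert
    temp_table = f"{target_table}_temp"
    df.to_sql(temp_table, engine, if_exists="replace", index=False)

    # Upsert (PostgreSQL: INSERT ... ON CONFLICT). Table names cannot be bound
    # as parameters, so target_table must come from trusted code, not user input.
    # SELECT * assumes the staging columns match the target table's column order.
    merge_query = f"""
        INSERT INTO {target_table}
        SELECT * FROM {temp_table}
        ON CONFLICT (order_id)
        DO UPDATE SET
            amount = EXCLUDED.amount,
            status = EXCLUDED.status,
            tier = EXCLUDED.tier,
            segment = EXCLUDED.segment
    """
    # engine.begin() opens a transaction and commits it on success
    # (Engine.execute() was removed in SQLAlchemy 2.0)
    with engine.begin() as conn:
        conn.execute(text(merge_query))
        conn.execute(text(f"DROP TABLE {temp_table}"))

    result = {"loaded_count": len(df), "target_table": target_table}
    logger.info(f"Loaded {len(df)} records to {target_table}")
    return result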


@task(name="send-notification")
def send_notification(message: str, severity: str = "info"):
    """Send notification about pipeline completion."""
    logger.info(f"Notification [{severity}]: {message}")
    # In production, integrate with Slack, PagerDuty, etc.


# ------------------------------------------------------
# FLOW
# ------------------------------------------------------
@flow(
    name="daily-order-etl",
    description="Daily order extraction, transformation, and warehouse load",
    log_prints=True,
    retry_delay_seconds=300,
    retries=1,
)
def daily_order_etl(execution_date: Optional[str] = None):
    """Main ETL flow orchestrating extraction, transformation, and loading."""
    if execution_date is None:
        execution_date = datetime.now().strftime("%Y-%m-%d")

    logger.info(f"Starting daily order ETL for {execution_date}")

    # Step 1: Extract
    raw_df = extract_orders(execution_date)

    # Step 2: Validate
    validated_df = validate_raw_data(raw_df)

    # Step 3: Transform
    transformed_df = transform_orders(validated_df)

    # Step 4: Load
    load_result = load_to_warehouse(transformed_df, "orders_fact")

    # Step 5: Notify
    send_notification(
        message=f"ETL completed for {execution_date}: {load_result['loaded_count']} records loaded",
        severity="info",
    )

    return load_result


# ------------------------------------------------------
# DEPLOYMENT
# ------------------------------------------------------
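if __name__ == "__main__":
    # Run locally for testing
    result = daily_order_etl(execution_date="2024-01-15")
    print(result)

    # Register a scheduled deployment (Prefect 2.x API; assumes the work pool
    # already exists on the server or in Prefect Cloud)
    from prefect.server.schemas.schedules import CronSchedule

    deployment = Deployment.build_from_flow(
        flow=daily_order_etl,
        name="daily-schedule",
        schedule=CronSchedule(cron="0 6 * * *", timezone="UTC"),
        work_pool_name="default-agent-pool",
        # No execution_date parameter is passed: Prefect does not render
        # Airflow-style templates such as "{{ data_interval_end }}", so the
        # flow falls back to the current date when execution_date is None.
    )
    deployment.apply()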

Dynamic Workflow with Task Mapping

from prefect import flow, task, get_run_logger
from typing import List
import pandas as pd


@task(name="discover-tables")
def discover_tables(connection_string: str) -> List[str]:
    """Dynamically discover tables to process."""
    from sqlalchemy import create_engine, text

    engine = create_engine(connection_string)
    with engine.connect() as conn:
        # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
        result = conn.execute(text(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = 'public' AND table_name LIKE 'events_%'"
        ))
        tables = [row[0] for row in result]
    logger = get_run_logger()
    logger.info(f"Discovered {len(tables)} tables: {tables}")
    return tables


@task(name="extract-table", retries=2, retry_delay_seconds=10)
def extract_table(connection_string: str, table_name: str) -> str:
    """Extract a single table to a parquet file."""
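    import os
    import sqlalchemy

    engine = sqlalchemy.create_engine(connection_string)
    # table_name comes from information_schema (see discover_tables), not from
    # user input; identifiers cannot be passed as bound parameters
    df = pd.read_sql(f"SELECT * FROM {table_name}", engine)
    output_path = f"/tmp/raw/{table_name}.parquet"
    # Create the output directory on first use so to_parquet does not fail
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_parquet(output_path, index=False)
    logger = get_run_logger()
    logger.info(f"Extracted {len(df)} rows from {table_name}")
    return output_path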


@task(name="validate-table")
def validate_table(table_name: str, file_path: str) -> dict:
    """Validate extracted data quality."""
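    df = pd.read_parquet(file_path)
    row_count = len(df)

    # Guard against empty tables: both rates are undefined with zero rows
    # (NaN and division by zero), so they are reported as 0.0 here
    null_rate = float(df.isnull().mean().mean()) if row_count > 0 else 0.0
    duplicate_rate = 1 - (len(df.drop_duplicates()) / row_count) if row_count > 0 else 0.0

    result = {
        "table": table_name,
        "row_count": row_count,
        "null_rate": float(null_rate),
        "duplicate_rate": float(duplicate_rate),
        # bool() converts NumPy booleans into plain Python booleans
        "is_valid": bool(null_rate < 0.1 and duplicate_rate < 0.05),
    }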

    logger = get_run_logger()
    if not result["is_valid"]:
        logger.warning(f"Validation FAILED for {table_name}: {result}")
    else:
        logger.info(f"Validation passed for {table_name}")

    return result


@task(name="aggregate-results")
def aggregate_results(validations: List[dict]) -> dict:
    """Aggregate validation results across all tables."""
    total = len(validations)
    valid = sum(1 for v in validations if v["is_valid"])
    return {
        "total_tables": total,
        "valid_tables": valid,
        "validity_rate": valid / total if total > 0 else 0,
        "details": validations,
    }


@flow(name="dynamic-table-pipeline")
def dynamic_table_pipeline(connection_string: str):
    """Dynamic pipeline that processes discovered tables in parallel."""
    # Discover tables dynamically
    tables = discover_tables(connection_string)

    # Map extraction across tables (parallel execution)
    raw_files = extract_table.map(
        connection_string=connection_string,
        table_name=tables,
    )

    # Map validation across tables
    validations = validate_table.map(
        table_name=tables,
        file_path=raw_files,
    )

    # Aggregate results
    summary = aggregate_results(validations)

    logger = get_run_logger()
    logger.info(f"Pipeline summary: {summary}")

    if summary["validity_rate"] < 0.9:
        raise ValueError(
            f"Data quality below threshold: {summary['validity_rate']:.1%} "
            f"of tables passed validation"
        )

    return summary


# Run the dynamic pipeline
if __name__ == "__main__":
    result = dynamic_table_pipeline("postgresql://user:pass@host/db")
    print(result)

Prefect vs Airflow Decision Matrix: Choose Prefect when: (1) your team is Python-focused and wants minimal boilerplate, (2) you need dynamic task generation at runtime, (3) you prefer local execution for testing, (4) you want a modern UI with real-time monitoring. Choose Airflow when: (1) you need a mature ecosystem with 100+ operators, (2) your team already has Airflow expertise, (3) you need complex scheduling with catchup and backfill.

Caching Strategy: Cache on function inputs by default: in Prefect 2.x with cache_key_fn=task_input_hash, in Prefect 3.x with the INPUTS cache policy from prefect.cache_policies. Use custom cache keys for domain-specific caching (e.g., cache by data version, not just inputs). Set cache_expiration to prevent stale data from being served. For ML pipelines, cache model training tasks by dataset hash to avoid re-training when data hasn't changed.
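Caching a training task by dataset hash, as suggested above, comes down to building the key from the data's content and leaving out inputs that do not affect the result. The sketch below shows one way to do that with the standard library; `dataset_hash`, `training_cache_key`, and the `run_label` argument are hypothetical, and a real pipeline would likely hash files or a data version identifier instead of in-memory rows.

```python
import hashlib
from typing import Iterable, Tuple

Row = Tuple[int, float]


def dataset_hash(rows: Iterable[Row]) -> str:
    """Content hash of the dataset; sorting makes it independent of row order."""
    digest = hashlib.sha256()
    for row in sorted(rows):
        digest.update(repr(row).encode("utf-8"))
    return digest.hexdigest()


def training_cache_key(model_name: str, rows: Iterable[Row], run_label: str = "") -> str:
    """Key on model name and data content; run_label is deliberately ignored."""
    return f"{model_name}:{dataset_hash(rows)}"


data_v1 = [(1, 0.5), (2, 1.5), (3, 2.5)]
data_v1_shuffled = [(3, 2.5), (1, 0.5), (2, 1.5)]
data_v2 = [(1, 0.5), (2, 1.5), (3, 9.9)]  # one value changed

key_a = training_cache_key("churn-model", data_v1, run_label="monday")
key_b = training_cache_key("churn-model", data_v1_shuffled, run_label="tuesday")
key_c = training_cache_key("churn-model", data_v2, run_label="tuesday")

print(key_a == key_b)  # same data, different label and order
print(key_a == key_c)  # data changed
```

The first comparison is a cache hit because only the label and row order differ, while the second is a miss because the data itself changed, which is the behavior wanted for expensive training steps.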

  • Prefect uses Python decorators (@flow, @task) instead of DAG definitions, reducing boilerplate.
  • Tasks support retries, caching, concurrency limits, and state management out of the box.
  • Task runners determine execution strategy: sequential, concurrent (threads for synchronous tasks, the event loop for async tasks), or Dask (distributed).
  • Caching skips task execution when inputs haven't changed, saving compute for expensive operations.
  • Dynamic task generation (.map()) creates tasks at runtime based on data, enabling data-parallel patterns.
  • Prefect handles scheduling, retries, caching, and observability with minimal configuration.
  • Deploy to Prefect Cloud or self-hosted server with work pools and workers.
  • Test locally by calling the flow function directly before deploying to production.

Best Practices

  1. Decompose into small tasks: each task should have a single responsibility. This enables granular caching, retries, and concurrency control.
  2. Configure retries with backoff: use retries=3 with a list of increasing delays such as retry_delay_seconds=[10, 30, 90] for transient errors. Increase the delays for rate-limited APIs.
  3. Cache expensive tasks: use cache_key_fn=task_input_hash for deterministic caching. Set cache_expiration to prevent stale results.
  4. Set concurrency limits: limit concurrent task executions to prevent resource exhaustion. Use different limits for I/O-bound and CPU-bound tasks.
  5. Use .map() for parallelism: map tasks across data partitions for data-parallel execution without pre-defining task counts.
  6. Test locally before deploying: run flows locally by calling the flow function directly. Use pytest for unit testing individual tasks.
  7. Add logging and artifacts: use get_run_logger() for structured logging. Create artifacts for important results.
  8. Configure notifications: set up alerts for flow failures in the Prefect UI. Integrate with Slack/PagerDuty for critical failures.
  9. Use subflows for modularity: break complex flows into subflows for readability and reusability.
  10. Monitor cache hit rates: track how often tasks are cached vs executed. Optimize cache keys for maximum hit rates.

Prefect vs Airflow Comparison

| Feature | Prefect | Apache Airflow |
|---|---|---|
| DAG Definition | Python decorators | DAG objects + operators |
| Task Definition | @task decorator | Operator classes |
| Dynamic Workflows | Native .map() | Dynamic DAG generation |
| Local Testing | Yes (native) | Limited (requires CLI) |
| UI | Modern, real-time | Mature, feature-rich |
| Ecosystem | Growing | 100+ providers |
| Scheduling | Cloud or self-hosted | Built-in scheduler |
| Caching | Native, flexible | Limited (XCom-based) |
| Concurrency | Native limits | Pool-based |
| Best For | Python-native teams | Enterprise, complex scheduling |

Task Runner Selection

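| Task Type | SequentialTaskRunner | ConcurrentTaskRunner | DaskTaskRunner |
|---|---|---|---|
| I/O-Bound | Slow | Fast | Fast |
| CPU-Bound | Slow | Slow | Fast |
| Mixed | Slow | Medium | Fast |
| Memory-Intensive | Safe | Safe | Risk of OOM |
| Dependencies | N/A | asyncio | Dask cluster |
| Overhead | None | Low | High |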
