
Dynamic DAG Generation in Airflow

[Figure: Architecture diagram. Configuration sources (YAML config with pipeline definitions, database metadata store, external API with dynamic config) feed a Python DAG factory, which produces multiple generated DAG objects. Generation flow: scan -> parse -> factory runs -> loop config -> create DAGs. Registration: globals()[dag_id] = dag_object. Use the DAG factory pattern to avoid repetitive DAG definitions.]

Formal Definitions

Definition: Dynamic DAG Generation

Dynamic DAG generation is the process of creating DAG objects at parse time using a factory function that reads external configuration. Given a configuration set $C = \{c_1, c_2, \ldots, c_n\}$, the generator produces $n$ DAGs: $D_i = \text{generate}(c_i)$ for each $c_i \in C$.

Definition: DAG Factory Function

A DAG factory function $f: C \rightarrow \text{DAG}$ takes a configuration object and returns a fully formed DAG instance. The function encapsulates task creation, dependency wiring, and scheduling logic, all parameterized by the configuration.

Definition: Idempotent Generation

Idempotent generation means that running the factory repeatedly with the same configuration produces equivalent DAGs. Formally, $\forall c \in C$, any two evaluations of the factory agree: $f(c) \equiv f(c)$, where $\equiv$ means the same task structure, dependencies, and parameters. This is critical for scheduler stability, because the scheduler re-runs the factory on every parse.
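
One way to check idempotence without a running scheduler is to reduce each generated DAG to a structural fingerprint and compare two runs of the factory. The sketch below is only an illustration and does not use Airflow: `PipelineSpec`, `build_spec`, and `fingerprint` are hypothetical stand-ins for a DAG object, the factory $f$, and the equivalence check.

```python
import hashlib
import json
from dataclasses import asdict, dataclass, field


@dataclass
class PipelineSpec:
    """Hypothetical stand-in for a generated DAG: id, schedule, tasks, edges."""
    dag_id: str
    schedule: str
    tasks: list = field(default_factory=list)
    edges: list = field(default_factory=list)


def build_spec(config: dict) -> PipelineSpec:
    """Hypothetical factory f(c): depends only on the config, never on the clock."""
    tasks = ["extract", "transform", "load"]
    edges = [[upstream, downstream] for upstream, downstream in zip(tasks, tasks[1:])]
    return PipelineSpec(
        dag_id=f"etl_{config['name']}",
        schedule=config.get("schedule", "@daily"),
        tasks=tasks,
        edges=edges,
    )


def fingerprint(spec: PipelineSpec) -> str:
    """Hash a canonical JSON form so equivalent specs get equal fingerprints."""
    canonical = json.dumps(asdict(spec), sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


config = {"name": "orders_daily", "schedule": "0 2 * * *"}
first = fingerprint(build_spec(config))
second = fingerprint(build_spec(config))
print(first == second)  # True
```

If the factory embedded something like the current timestamp in the spec, the two fingerprints would differ, which is the kind of drift this check is meant to catch.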

Detailed Explanation

Why Dynamic DAGs?

Static DAG definitions work well for small deployments. As the number of pipelines grows (hundreds or thousands), manual DAG definitions become unsustainable. Dynamic generation lets you define pipeline logic once and instantiate it across multiple datasets, teams, or environments from configuration.

Key Insight: Dynamic DAGs reduce code duplication and make it easier to maintain consistent pipeline patterns across your organization.

Configuration Sources

Source | Pros | Cons | Best For
YAML Files | Version-controlled, readable | File size limits | Small to medium scale
Database | Dynamic updates, central | Parse-time DB queries | Large scale, multi-team
Python Loops | Simple, no external deps | Code changes needed | Fixed set of pipelines
External API | Real-time config | Network dependency | Highly dynamic environments

DAG Generation Lifecycle

  1. Scheduler scans the dags/ folder for Python files
  2. Python file is parsed: all top-level code executes
  3. Factory function runs: reads configuration
  4. Config items are looped: creates DAG objects
  5. DAGs are registered: added to globals() for scheduler discovery
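
Steps 3 to 5 can be sketched without Airflow by treating the module namespace as a plain dictionary. `FakeDag`, `make_dag`, and `register_all` are hypothetical names used only for illustration; in a real DAG file the namespace would be `globals()` and the objects would be `airflow.DAG` instances.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDag:
    """Hypothetical placeholder for an airflow.DAG instance."""
    dag_id: str
    schedule: str


def make_dag(config: dict) -> FakeDag:
    """Factory step: turn one config item into one DAG-like object."""
    return FakeDag(dag_id=f"etl_{config['name']}", schedule=config["schedule"])


def register_all(configs: list, namespace: dict) -> list:
    """Loop over config items and bind each object under its dag_id."""
    registered = []
    for config in configs:
        dag = make_dag(config)
        if dag.dag_id in namespace:
            # Two configs mapping to one dag_id would silently overwrite each other.
            raise ValueError(f"Duplicate dag_id: {dag.dag_id}")
        namespace[dag.dag_id] = dag
        registered.append(dag.dag_id)
    return registered


module_namespace = {}  # a real DAG file would pass globals() here
configs = [
    {"name": "orders_daily", "schedule": "0 2 * * *"},
    {"name": "customers_weekly", "schedule": "0 6 * * 1"},
]
print(register_all(configs, module_namespace))
# ['etl_orders_daily', 'etl_customers_weekly']
```

The duplicate check is a design choice of this sketch rather than something the lifecycle requires; it turns a naming collision into a visible parse error.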

Pattern 1: YAML-Driven Generation

# dags/pipeline_factory.py
import yaml
from pathlib import Path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def load_pipeline_config(config_path: str = '/opt/airflow/configs/pipelines.yaml') -> list:
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)


def create_etl_task(task_id: str, sql: str, conn_id: str):
    """Factory for creating ETL tasks."""
    return PostgresOperator(
        task_id=task_id,
        postgres_conn_id=conn_id,
        sql=sql,
    )


def create_quality_task(task_id: str, table: str, rules: dict, conn_id: str):
    """Factory for creating quality check tasks."""
    def check_quality(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id=conn_id)

        for rule_name, rule_sql in rules.items():
            result = hook.get_first(rule_sql)
            # Each rule counts violating rows, so any non-zero count is a failure.
            if result and result[0] > 0:
                raise ValueError(f'Quality rule failed: {rule_name}')
        return True

    return PythonOperator(
        task_id=task_id,
        python_callable=check_quality,
    )


def generate_dags():
    """Main DAG generator function."""
    configs = load_pipeline_config()

    for config in configs:
        dag_id = f'etl_{config["name"]}'
        schedule = config.get('schedule', '@daily')

        with DAG(
            dag_id=dag_id,
            default_args={
                'owner': config.get('owner', 'data-team'),
                'retries': config.get('retries', 2),
                'retry_delay': timedelta(minutes=5),
            },
            start_date=datetime.fromisoformat(config['start_date']),
            schedule_interval=schedule,
            catchup=config.get('catchup', False),
            tags=config.get('tags', []),
            max_active_runs=config.get('max_active_runs', 1),
        ) as dag:

            extract = create_etl_task(
                task_id='extract',
                sql=config['extract_sql'],
                conn_id=config['source_conn'],
            )

            transform = create_etl_task(
                task_id='transform',
                sql=config['transform_sql'],
                conn_id=config['target_conn'],
            )

            load = create_etl_task(
                task_id='load',
                sql=config['load_sql'],
                conn_id=config['target_conn'],
            )

            quality = create_quality_task(
                task_id='quality_check',
                table=config['target_table'],
                rules=config.get('quality_rules', {}),
                conn_id=config['target_conn'],
            )

            extract >> transform >> load >> quality

        globals()[dag_id] = dag


generate_dags()

Pipeline Configuration YAML

# /opt/airflow/configs/pipelines.yaml
- name: orders_daily
  owner: orders-team
  start_date: "2024-01-01"
  schedule: "0 2 * * *"
  source_conn: source_postgres
  target_conn: warehouse_postgres
  target_table: fct_orders
  extract_sql: |
    SELECT * FROM raw_orders
    WHERE date = '{{ ds }}'
  transform_sql: |
    INSERT INTO stg_orders
    SELECT order_id, customer_id, amount * 1.1 AS adjusted_amount, date
    FROM raw_orders WHERE date = '{{ ds }}'
  load_sql: |
    INSERT INTO fct_orders
    SELECT * FROM stg_orders WHERE date = '{{ ds }}'
  quality_rules:
    not_null: "SELECT COUNT(*) FROM fct_orders WHERE order_id IS NULL"
    positive_amount: "SELECT COUNT(*) FROM fct_orders WHERE adjusted_amount <= 0"
  tags: ["orders", "daily"]

- name: customers_weekly
  owner: customer-team
  start_date: "2024-01-01"
  schedule: "0 6 * * 1"
  source_conn: source_mysql
  target_conn: warehouse_postgres
  target_table: dim_customers
  extract_sql: "SELECT * FROM customers WHERE updated_at >= '{{ ds }}'"
  transform_sql: "INSERT INTO dim_customers SELECT * FROM stg_customers"
  load_sql: "SELECT 1"
  quality_rules: {}
  tags: ["customers", "weekly"]

Pattern 2: Database-Driven Generation

# dags/db_driven_generator.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow import settings
from airflow.models import Variable


def fetch_pipeline_configs() -> list:
    """Fetch pipeline configurations from database."""
    from sqlalchemy import text

    session = settings.Session()
    try:
        result = session.execute(text("""
            SELECT pipeline_id, name, owner, schedule,
                   source_conn, target_conn, query, config_json
            FROM pipeline_configurations
            WHERE is_active = true
        """))

        # Materialize the rows into plain dicts before the session is closed.
        return [
            {
                'pipeline_id': row[0],
                'name': row[1],
                'owner': row[2],
                'schedule': row[3],
                'source_conn': row[4],
                'target_conn': row[5],
                'query': row[6],
                'config': row[7],
            }
            for row in result
        ]
    finally:
        # Release the connection on every parse, even if the query fails.
        session.close()


def generate_dags():
    configs = fetch_pipeline_configs()

    for config in configs:
        dag_id = f'pipeline_{config["pipeline_id"]}'

        with DAG(
            dag_id=dag_id,
            default_args={'owner': config['owner']},
            start_date=datetime(2024, 1, 1),
            schedule_interval=config['schedule'],
            catchup=False,
        ) as dag:

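            # cfg must come before **context (parameters after **kwargs are a
            # SyntaxError); the default binds this loop iteration's config.
            def run_pipeline(cfg=config, **context):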
                from airflow.providers.postgres.hooks.postgres import PostgresHook
                hook = PostgresHook(postgres_conn_id=cfg['source_conn'])
                records = hook.get_records(cfg['query'])
                context['ti'].xcom_push(key='record_count', value=len(records))

            process = PythonOperator(
                task_id='process',
                python_callable=run_pipeline,
            )

        globals()[dag_id] = dag


generate_dags()

Pattern 3: Loop-Based Generation

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator


regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
tables = ['orders', 'customers', 'products']
schedule = '0 3 * * *'

for region in regions:
    for table in tables:
        dag_id = f'etl_{region.replace("-", "_")}_{table}'

        with DAG(
            dag_id=dag_id,
            start_date=datetime(2024, 1, 1),
            schedule_interval=schedule,
            catchup=False,
            tags=[region, table],
        ) as dag:

            start = EmptyOperator(task_id='start')

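            # Defaults bind the current region/table at definition time; they
            # must precede **context, which has to be the last parameter.
            def extract(r=region, t=table, **context):
                print(f'Extracting {t} from {r}')

            def transform(r=region, t=table, **context):
                print(f'Transforming {t} for {r}')

            def load(r=region, t=table, **context):
                print(f'Loading {t} to {r}')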

            ext = PythonOperator(task_id='extract', python_callable=extract)
            xform = PythonOperator(task_id='transform', python_callable=transform)
            ld = PythonOperator(task_id='load', python_callable=load)

            start >> ext >> xform >> ld

        globals()[dag_id] = dag
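
Because the loop derives every dag_id from a string template, it can help to build and check the full list of IDs before creating any DAG. The following sketch flattens the nested loops with `itertools.product` and verifies the IDs are unique and well formed. The `VALID_ID` pattern is an assumption made for this sketch (letters, digits, underscores, dashes, dots), and `build_dag_ids` is a hypothetical helper, not part of Airflow.

```python
import re
from itertools import product

REGIONS = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]
TABLES = ["orders", "customers", "products"]

# Assumed ID rule for this sketch: letters, digits, underscores, dashes, dots.
VALID_ID = re.compile(r"^[A-Za-z0-9_.-]+$")


def build_dag_ids(regions: list, tables: list) -> list:
    """Return one dag_id per (region, table) pair, failing fast on bad IDs."""
    ids = []
    for region, table in product(regions, tables):
        dag_id = f"etl_{region.replace('-', '_')}_{table}"
        if not VALID_ID.match(dag_id):
            raise ValueError(f"Invalid dag_id: {dag_id!r}")
        ids.append(dag_id)
    if len(set(ids)) != len(ids):
        raise ValueError("Generated dag_ids are not unique")
    return ids


dag_ids = build_dag_ids(REGIONS, TABLES)
print(len(dag_ids))  # 12
print(dag_ids[0])    # etl_us_east_1_orders
```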

Key Concepts Table

Pattern | Config Source | Coupling | Scalability | Complexity
YAML-driven | File on disk | Low | Medium (file size) | Low
Database-driven | SQL database | Low | High | Medium
Loop-based | Python literals | High | Low (code change needed) | Low
API-driven | External service | Low | High | High
Template-based | Jinja templates | Medium | Medium | Medium

Performance Metrics

Metric | Static DAGs | Dynamic DAGs | Consideration
Parse time | O(1) per file | O(n) per config item | Optimize config reads
Scheduler memory | Fixed per DAG | Proportional to DAG count | Monitor at >1000 DAGs
DAG file count | 1 file = 1 DAG | 1 file = N DAGs | Fewer files, more objects
Reconfiguration | Code change + deploy | Config change + parse | Faster iteration

Best Practices

Configuration Management

  1. Idempotent generation: Always produce equivalent DAGs from the same config. Avoid time-dependent values in DAG definitions.
  2. Cache config reads: When reading from databases or APIs, cache results during parse to avoid repeated calls.
  3. Validate configs: Add schema validation for YAML/database configs before DAG generation.
  4. Version configs: Track configuration changes alongside code changes for reproducibility.
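
A validation step along the lines of item 3 could look like the sketch below. It checks the keys that the YAML-driven factory reads with `config[...]`, plus a few value constraints. `REQUIRED_KEYS` and `validate_config` are hypothetical names, and the rules shown are assumptions chosen for illustration rather than a complete schema.

```python
from datetime import datetime

# Keys the YAML-driven factory above accesses without a default.
REQUIRED_KEYS = (
    "name", "start_date", "source_conn", "target_conn",
    "target_table", "extract_sql", "transform_sql", "load_sql",
)


def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    for key in REQUIRED_KEYS:
        if not config.get(key):
            errors.append(f"missing required key: {key}")

    start = config.get("start_date")
    if start:
        try:
            datetime.fromisoformat(str(start))
        except ValueError:
            errors.append(f"start_date is not ISO format: {start!r}")

    retries = config.get("retries", 2)
    if not isinstance(retries, int) or retries < 0:
        errors.append("retries must be a non-negative integer")

    if not isinstance(config.get("quality_rules", {}), dict):
        errors.append("quality_rules must be a mapping of rule name to SQL")
    return errors


good = {
    "name": "orders_daily", "start_date": "2024-01-01",
    "source_conn": "source_postgres", "target_conn": "warehouse_postgres",
    "target_table": "fct_orders", "extract_sql": "SELECT 1",
    "transform_sql": "SELECT 1", "load_sql": "SELECT 1",
}
bad = {"name": "broken", "start_date": "01/02/2024", "retries": -1}

print(validate_config(good))  # []
for problem in validate_config(bad):
    print(problem)
```

Returning a list of problems instead of raising on the first one lets the factory skip or log a single bad entry without hiding the rest of the pipelines.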

Performance Optimization

  1. Set max_active_runs: Prevent resource exhaustion when many generated DAGs trigger simultaneously.
  2. Use globals() registration: Generated DAGs must be added to globals() for the scheduler to discover them.
  3. Monitor parse time: Dynamic generation increases parse overhead. Keep config reads fast and cacheable.
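
For a rough local measurement of how generation time grows with the number of config items, the factory call can be wrapped in a timer. This is a minimal sketch using only the standard library; `timed_call` and `fake_generate` are hypothetical, and `fake_generate` stands in for a real factory so the example runs without Airflow.

```python
import time


def timed_call(func, *args, **kwargs):
    """Run func once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def fake_generate(n: int) -> dict:
    """Hypothetical stand-in for a factory that builds n DAG-like entries."""
    return {
        f"etl_pipeline_{i}": {"tasks": ["extract", "transform", "load"]}
        for i in range(n)
    }


for count in (10, 100, 1000):
    dags, elapsed = timed_call(fake_generate, count)
    print(f"{count} DAGs generated in {elapsed * 1000:.3f} ms")
```

The absolute numbers depend on the machine; the useful signal is how the time scales as the config grows.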

Operational Guidelines

  1. Tag generated DAGs: Include source config identifiers in tags for monitoring and filtering.

Common Mistakes

Mistake | Impact | Solution
Non-idempotent generation | DAG changes on every parse | Use deterministic config values
Uncached DB queries | Slow parse times | Cache in Variables or files
Missing globals() registration | DAGs not discovered | Always use globals()[dag_id] = dag
No config validation | Parse errors at runtime | Add schema validation early

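The file that generates the DAGs is re-parsed at most once every min_file_process_interval seconds, and every generated DAG is rebuilt each time. Ensure your generation function is fast and idempotent. For database-driven configs, consider caching the configuration in a local file to avoid parse-time DB queries. A Variable can also hold the cached config, but reading a Variable at parse time is itself a metadata-database lookup.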
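
A file-based cache with a time-to-live is one way to apply this advice. In the sketch below, `load_cached_config` is a hypothetical helper and `fake_fetch` stands in for the real database query; the 300-second default is an arbitrary assumption.

```python
import json
import tempfile
import time
from pathlib import Path


def load_cached_config(cache_path: Path, fetch, max_age_seconds: float = 300.0) -> list:
    """Return cached configs when fresh; otherwise call fetch() and rewrite the cache."""
    if cache_path.exists():
        age = time.time() - cache_path.stat().st_mtime
        if age < max_age_seconds:
            return json.loads(cache_path.read_text(encoding="utf-8"))

    configs = fetch()
    # Write to a temp file, then rename, so readers never see a partial file.
    tmp_path = cache_path.with_suffix(".tmp")
    tmp_path.write_text(json.dumps(configs), encoding="utf-8")
    tmp_path.replace(cache_path)
    return configs


calls = []


def fake_fetch() -> list:
    """Hypothetical stand-in for the parse-time database query."""
    calls.append(1)
    return [{"pipeline_id": 1, "name": "orders"}]


with tempfile.TemporaryDirectory() as tmp_dir:
    cache_file = Path(tmp_dir) / "pipelines.json"
    first = load_cached_config(cache_file, fake_fetch)
    second = load_cached_config(cache_file, fake_fetch)

print(len(calls))       # 1
print(first == second)  # True
```

The second call is served from the file, so the expensive fetch runs once per TTL window instead of once per parse.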

Dynamic DAGs generated from a single Python file share that file's parse time. If you generate 100 DAGs from one file, all 100 are parsed together. This avoids the per-file overhead of 100 separate files, but it also means a slow or failing factory delays or breaks all 100 DAGs at once.

Key Takeaways:

  • Dynamic DAG generation produces multiple DAGs from a single factory function at parse time
  • Configuration sources: YAML files, databases, APIs, or Python loops
  • Always use globals()[dag_id] = dag to register generated DAGs
  • Ensure idempotent generation: the same config must produce equivalent DAGs
  • Monitor parse time as generated DAG count grows
  • Generated DAGs are re-parsed periodically; keep generation functions fast and deterministic
