Variable Management and Templates in Apache Airflow


[Figure: Variable Management Architecture. Variables can be managed from the Airflow UI (Admin > Variables), the CLI (airflow variables), the REST API (/api/v1/variables) or environment variables (AIRFLOW_VAR_*). They are stored in the Variable table of the metadata DB and read through Variable.get() (optionally backed by an in-memory cache) or through Jinja templates such as {{ var.value.my_key }} and {{ var.json.get("key") }}. Performance tip: use environment variables for fast reads and the DB for dynamic values, and avoid Variable lookups in tight loops.]
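The diagram implies a lookup order. With Airflow's defaults, Variable.get() checks a configured custom secrets backend first, then environment variables named AIRFLOW_VAR_<KEY> (key upper-cased), and only then the metadata DB. The sketch below models that chain with plain functions. METASTORE, from_env, from_metastore and resolve_variable are hypothetical stand-ins, not Airflow APIs.

```python
import os
from typing import Callable, Dict, Iterable, Optional

# Hypothetical stand-in for the Variable table in the metadata DB (key -> text)
METASTORE: Dict[str, str] = {"data_bucket": "db-bucket"}

Lookup = Callable[[str], Optional[str]]

def from_env(key: str) -> Optional[str]:
    """Environment lookup using the AIRFLOW_VAR_<KEY> convention (key upper-cased)."""
    return os.environ.get("AIRFLOW_VAR_" + key.upper())

def from_metastore(key: str) -> Optional[str]:
    """Stand-in for the metadata DB query."""
    return METASTORE.get(key)

def resolve_variable(key: str, chain: Iterable[Lookup]) -> str:
    """Return the value from the first source in the chain that has the key."""
    for lookup in chain:
        value = lookup(key)
        if value is not None:
            return value
    raise KeyError(f"Variable {key} does not exist")

# A custom secrets backend, if configured, would be placed first in this list
DEFAULT_CHAIN = [from_env, from_metastore]
```

The environment is consulted before the database. As a result, a Variable supplied as AIRFLOW_VAR_DATA_BUCKET is returned without any metadata DB query, and it takes precedence over a value stored in the DB under the same key.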

Formal Definitions

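Variable

A Variable in Airflow is a key-value pair stored in the metadata database. You can read it with Variable.get() in Python code or with {{ var.value.key }} in templates. Values are stored as text. Structured values are saved as serialized JSON and decoded on read with deserialize_json=True (or {{ var.json.key }} in templates). Formally, $V = (k, v, t)$ where $k$ is the key, $v$ is the stored value, and $t$ is how the value is interpreted (plain string or JSON).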

Jinja Template

A Jinja template is a string containing template expressions that are resolved at task execution time. Airflow uses Jinja2 templating with additional context variables (logical date, task instance, etc.). It renders only the operator fields listed in that operator's template_fields. Formally, a template $T = \{e_1, e_2, \ldots, e_n\}$ where each $e_i$ is a template expression.
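To make "resolved at task execution time" concrete, here is a deliberately tiny renderer. It is a hypothetical helper, not Jinja2 and not Airflow code. It only substitutes dotted names such as {{ ds }} or {{ params.hour }} from a context dict, with no filters, macros or control flow. The core idea is the same, though: each expression is looked up in a context built for the run.

```python
import re
from typing import Any, Dict

# Matches {{ expr }} where expr is a dotted name such as ds or params.hour
_EXPR = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")

def render(template: str, context: Dict[str, Any]) -> str:
    """Replace each {{ name }} / {{ a.b }} expression with its value from context."""
    def lookup(match: "re.Match[str]") -> str:
        value: Any = context
        for part in match.group(1).split("."):
            value = value[part]  # dict-style lookup for each dotted segment
        return str(value)

    return _EXPR.sub(lookup, template)
```

For example, render("AND hour = {{ params.hour }}", {"params": {"hour": 6}}) yields "AND hour = 6". In Airflow the context is assembled by the scheduler and worker for each task instance, which is why the same template produces different text on different runs.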

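Variable Cache

A Variable cache stores recently accessed Variables in memory to reduce database queries. Entries expire after a configurable time-to-live (TTL). Airflow does not enable such a cache by default. Since Airflow 2.7, the [secrets] use_cache option provides one, but only for DAG parsing; caching during task execution has to be implemented in user code. Effectiveness is measured by the hit ratio $R_{\text{cache}} = \frac{N_{\text{hits}}}{N_{\text{total}}}$. For frequently read Variables, a ratio above roughly 90% is a reasonable target.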

Detailed Explanation

Variable Basics

Variables provide a simple way to store configuration values across DAGs and tasks.


Operations:

| Operation | Method | Example |
|---|---|---|
| Get | Variable.get(key, default_var) | Variable.get("api_key", "default") |
| Get JSON | Variable.get(key, deserialize_json=True) | Returns the decoded JSON (e.g. a Python dict) |
| Set | Variable.set(key, value, serialize_json=False) | Variable.set("config", {"key": "val"}, serialize_json=True) |
| CLI get | airflow variables get my_variable | Command line |
| CLI set | airflow variables set my_variable "value" | Command line |
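The Get and Set rows hide two details worth seeing side by side. First, values are stored as text, so a dict saved without serialize_json=True is stored as its Python repr and cannot be decoded as JSON later. Second, Variable.get() raises KeyError for a missing key unless default_var is given. FakeVariableStore below is a hypothetical in-memory stand-in that mimics this documented behavior. It is not Airflow's implementation.

```python
import json
from typing import Any, Dict

_NO_DEFAULT = object()  # sentinel, so that None can still be passed as a real default

class FakeVariableStore:
    """Hypothetical in-memory stand-in for the Variable table (key -> text)."""

    def __init__(self) -> None:
        self._rows: Dict[str, str] = {}

    def set(self, key: str, value: Any, serialize_json: bool = False) -> None:
        # Everything is stored as text: JSON-encoded, or str(value) otherwise
        self._rows[key] = json.dumps(value) if serialize_json else str(value)

    def get(self, key: str, default_var: Any = _NO_DEFAULT,
            deserialize_json: bool = False) -> Any:
        if key not in self._rows:
            if default_var is _NO_DEFAULT:
                raise KeyError(f"Variable {key} does not exist")
            return default_var  # defaults are returned as-is, never decoded
        raw = self._rows[key]
        return json.loads(raw) if deserialize_json else raw
```

Saving the dict without serialize_json=True stores "{'key': 'val'}". The single quotes make it invalid JSON, so a later deserialize_json=True read fails. That is why the Set example uses serialize_json=True.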

Jinja Template Variables

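Airflow provides built-in template variables that can be used in any templated operator field.

Common Template Variables:

| Variable | Description |
|---|---|
| {{ ds }} | Execution (logical) date as YYYY-MM-DD |
| {{ ds_nodash }} | Execution (logical) date as YYYYMMDD |
| {{ ts }} | Execution (logical) date as an ISO 8601 timestamp, e.g. 2024-01-01T00:00:00+00:00 |
| {{ prev_execution_date }} | Execution date of the previous scheduled run (deprecated since Airflow 2.2) |
| {{ params }} | User-defined DAG/task parameters (a dict) |

Example of a templated SQL query, rendered when the task runs: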
SELECT * FROM events 
WHERE event_date = '{{ ds }}'
AND hour = {{ params.hour }}
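Airflow computes these values from the run's logical date before Jinja renders the query. The hypothetical helper below reproduces the documented formats of ds, ds_nodash and ts with the standard library, then fills the same query with str.format. The hour value 6 stands in for params.hour and is an assumption.

```python
from datetime import datetime, timezone
from typing import Dict

def date_context(logical_date: datetime) -> Dict[str, str]:
    """Format a timezone-aware date the way {{ ds }}, {{ ds_nodash }} and {{ ts }} present it."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
        "ts": logical_date.isoformat(),
    }

ctx = date_context(datetime(2024, 1, 1, tzinfo=timezone.utc))

# Same query as above, with the {{ ... }} expressions replaced by str.format fields
sql = (
    "SELECT * FROM events\n"
    "WHERE event_date = '{ds}'\n"
    "AND hour = {hour}"
).format(hour=6, **ctx)  # hour=6 is a hypothetical params.hour value
```

str.format ignores the unused ds_nodash and ts keys. For 2024-01-01 UTC, the context is ds = 2024-01-01, ds_nodash = 20240101 and ts = 2024-01-01T00:00:00+00:00.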

Variable Best Practices

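| Practice | Description |
|---|---|
| Avoid top-level calls | Don't call Variable.get() at the top level of a DAG file. It runs a database query every time the file is parsed. Read Variables inside tasks or through templates instead. |
| Use JSON | Store complex configs as a single JSON Variable |
| Secrets Backend | Use HashiCorp Vault or AWS Secrets Manager for sensitive values |
| Default Values | Always provide default_var for optional Variables; without it, a missing key raises KeyError |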
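The first row is easiest to see by counting queries. DAG files are re-parsed periodically. A top-level Variable.get() therefore runs once per parse, while a read inside the task runs once per task execution. CountingStore and the two parse_* functions are hypothetical stand-ins for the metadata DB and a DAG file, and 100 parses is an arbitrary number.

```python
from typing import Callable, Dict

class CountingStore:
    """Hypothetical metadata DB stand-in that counts lookups (one query each)."""

    def __init__(self, rows: Dict[str, str]) -> None:
        self.rows = rows
        self.queries = 0

    def get(self, key: str) -> str:
        self.queries += 1
        return self.rows[key]

def parse_top_level(store: CountingStore) -> Callable[[], str]:
    """Simulate parsing a DAG file that reads the Variable at top level."""
    bucket = store.get("data_bucket")  # runs on every parse
    return lambda: bucket              # the "task" only uses the pre-read value

def parse_deferred(store: CountingStore) -> Callable[[], str]:
    """Simulate parsing a DAG file that reads the Variable inside the task."""
    return lambda: store.get("data_bucket")  # runs only when the task executes

top_level = CountingStore({"data_bucket": "my-bucket"})
deferred = CountingStore({"data_bucket": "my-bucket"})

for _ in range(100):  # 100 parses of the DAG file
    top_task = parse_top_level(top_level)
    deferred_task = parse_deferred(deferred)

deferred_task()  # one task execution
```

After 100 parses and one task run, the top-level version has issued 100 queries and the deferred version only one. The deferred version still queries once per task run, but that is usually far less often than the file is parsed.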

Variable Templates

from airflow.decorators import task, dag
from airflow.operators.bash import BashOperator
from datetime import datetime

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['variable-templates'],
)
def variable_template_dag():

    # Templated access: bash_command is a templated field of BashOperator, so the
    # {{ var.value.* }} expressions are rendered when the task runs, not when the
    # DAG file is parsed.
    print_path = BashOperator(
        task_id="print_path_templated",
        bash_command=(
            "echo s3://{{ var.value.data_bucket }}/"
            "{{ var.value.get('data_prefix', 'raw/') }}"
        ),
    )

    @task
    def use_variable_directly():
        """Access the same Variables from Python code inside a task."""
        from airflow.models import Variable

        # Direct access, executed at task run time
        bucket = Variable.get("data_bucket")
        prefix = Variable.get("data_prefix", default_var="raw/")

        return f"s3://{bucket}/{prefix}"

    @task
    def configure_from_variables():
        """Load configuration from multiple variables."""
        from airflow.models import Variable

        # Load structured config (stored as a JSON Variable) as a Python dict
        config = Variable.get("pipeline_config", deserialize_json=True)

        # Variable values are strings, so numeric settings are cast below
        max_retries = Variable.get("max_retries", default_var="3")
        timeout = Variable.get("timeout_seconds", default_var="300")

        return {
            "bucket": config.get("bucket"),
            "region": config.get("region"),
            "max_retries": int(max_retries),
            "timeout": int(timeout),
        }

    print_path >> use_variable_directly()
    configure_from_variables()

variable_template_dag()

Variable Cache TTL

$$T_{\text{cache}} = T_{\text{access}} + \Delta T_{\text{TTL}}$$

Here,

  • $T_{\text{cache}}$ = time when the cached value expires
  • $T_{\text{access}}$ = time the value was last fetched from the database (written into the cache)
  • $\Delta T_{\text{TTL}}$ = configured TTL duration
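As a worked example of the formula, with a TTL of 300 s, a value fetched at 12:00:00 expires at 12:05:00. The hypothetical helpers below compute that expiry and the equivalent freshness check, now < T_cache (that is, now - T_access < ΔT_TTL).

```python
from datetime import datetime, timedelta

def cache_expiry(t_access: datetime, ttl: timedelta) -> datetime:
    """T_cache = T_access + TTL, where T_access is when the value was fetched."""
    return t_access + ttl

def is_fresh(now: datetime, t_access: datetime, ttl: timedelta) -> bool:
    """A cached value is served only while now < T_cache."""
    return now < cache_expiry(t_access, ttl)

fetched = datetime(2024, 1, 1, 12, 0, 0)
ttl = timedelta(seconds=300)
```

Measuring from the fetch time rather than from the latest read ensures that a frequently read value is still refreshed at least once per TTL.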

Variable Lookup Performance

$$L_{\text{var}} = \begin{cases} L_{\text{cache}} & \text{if cache hit} \\ L_{\text{db}} + L_{\text{cache\_update}} & \text{if cache miss} \end{cases}$$

Here,

  • $L_{\text{var}}$ = total variable lookup latency
  • $L_{\text{cache}}$ = cache lookup latency (~1 μs)
  • $L_{\text{db}}$ = database lookup latency (~5 ms)
  • $L_{\text{cache\_update}}$ = cache update overhead (~1 ms)

These figures are rough orders of magnitude; real values depend on the metadata database, the network and the hardware.
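Weighting the two cases by the hit ratio $R_{\text{cache}}$ defined earlier gives the expected lookup latency:

$$\mathbb{E}[L_{\text{var}}] = R_{\text{cache}} \, L_{\text{cache}} + (1 - R_{\text{cache}}) \, (L_{\text{db}} + L_{\text{cache\_update}})$$

With the rough figures above, misses dominate. At $R_{\text{cache}} = 0.9$ the expected latency is 0.9 × 0.001 ms + 0.1 × 6 ms ≈ 0.60 ms. The function below evaluates the same expression. Its default arguments simply restate those rough figures and are not measurements.

```python
def expected_lookup_latency_ms(
    hit_ratio: float,
    l_cache_ms: float = 0.001,  # ~1 microsecond
    l_db_ms: float = 5.0,       # ~5 ms
    l_update_ms: float = 1.0,   # ~1 ms
) -> float:
    """E[L_var] = R * L_cache + (1 - R) * (L_db + L_cache_update)."""
    if not 0.0 <= hit_ratio <= 1.0:
        raise ValueError("hit_ratio must be between 0 and 1")
    return hit_ratio * l_cache_ms + (1.0 - hit_ratio) * (l_db_ms + l_update_ms)
```

At a hit ratio of 0.99 the estimate drops to about 0.061 ms. This is why the hit ratio matters more than the raw speed of the cache.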

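Airflow does not cache Variables by default: each Variable.get() that reaches the metadata database issues a query. Since Airflow 2.7, setting use_cache = True in the [secrets] section enables an in-memory cache with a default TTL of 900 seconds (cache_ttl_seconds). It applies only while DAG files are parsed, not when tasks run. Inside a task, call Variable.get() once, outside any loop, and reuse the value.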

For sensitive data such as API keys, use a secrets backend (e.g. HashiCorp Vault, AWS Secrets Manager) instead of Variables. Variables stored in the metadata database are encrypted only if a Fernet key is configured.

Key Concepts Table

| Variable Type | Storage | Access Method | Use Case |
|---|---|---|---|
| String | Metadata DB | Variable.get("key") | Simple config |
| JSON | Metadata DB | Variable.get("key", deserialize_json=True) | Complex config |
| Environment | OS environment variable AIRFLOW_VAR_<KEY> | Variable.get("key") (checked before the metadata DB) | Secrets, Docker |
| Secrets Backend | External vault | Variable.get("key") with a configured backend | Production secrets |
| DAG Parameters | DAG run | {{ params.key }} | Runtime config |

Code Examples

Advanced Templating Patterns

from airflow.decorators import task, dag
from datetime import datetime
from typing import Any, Dict, Optional

@dag(
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['templating', 'advanced'],
    params={
        "environment": "production",
        "dry_run": False,
        "batch_size": 1000,
    },
)
def advanced_templating_dag():
    
    @task
    def process_with_templates(
        # Airflow injects context values into keyword arguments named after
        # context keys; defaults are required so the DAG file can be parsed.
        ds: Optional[str] = None,
        ds_nodash: Optional[str] = None,
        prev_execution_date: Optional[datetime] = None,  # deprecated since Airflow 2.2
        params: Optional[Dict[str, Any]] = None,
    ):
        """Demonstrate advanced template usage."""
        from airflow.models import Variable
        
        # Access DAG run parameters (DAG-level defaults unless overridden)
        params = params or {}
        environment = params.get("environment", "development")
        dry_run = params.get("dry_run", False)
        batch_size = params.get("batch_size", 1000)
        
        # Access the environment-specific JSON Variable, e.g. "production_config"
        config = Variable.get(f"{environment}_config", default_var={}, deserialize_json=True)
        
        # Build dynamic query (illustration only: use bind parameters rather
        # than string formatting for values that users can influence)
        query = f"""
            SELECT * FROM events 
            WHERE event_date >= '{prev_execution_date.strftime("%Y-%m-%d")}'
            AND event_date < '{ds}'
            AND environment = '{environment}'
            LIMIT {batch_size}
        """
        
        print(f"Environment: {environment}")
        print(f"Config: {config}")
        print(f"Dry run: {dry_run}")
        print(f"Query:\n{query}")
        
        return {
            "environment": environment,
            "dry_run": dry_run,
            "batch_size": batch_size,
            "execution_date": ds,  # read by generate_report below
            "query": query,
        }
    
    @task
    def generate_report(config: dict):
        """Generate report with templated content."""
        from airflow.models import Variable
        
        report_template = Variable.get("report_template")
        
        # Use Jinja2 directly for complex templating
        from jinja2 import Template
        
        template = Template(report_template)
        report = template.render(
            environment=config["environment"],
            date=config.get("execution_date"),
            batch_size=config["batch_size"],
        )
        
        return report
    
    config = process_with_templates()
    generate_report(config)

advanced_templating_dag()

Variable Caching Strategy

# variable_caching.py
import json
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import Variable

class VariableCache:
    """Custom variable caching with TTL."""
    
    def __init__(self, ttl_seconds=300):
        self.ttl = timedelta(seconds=ttl_seconds)
        self.cache = {}
        self.last_access = {}
    
    def get(self, key, default=None, deserialize_json=False):
        """Get variable with caching."""
        now = datetime.now()
        
        # Check cache
        if key in self.cache:
            if now - self.last_access[key] < self.ttl:
                value = self.cache[key]
                if deserialize_json and isinstance(value, str):
                    return json.loads(value)
                return value
        
        # Cache miss - fetch from database
        try:
            value = Variable.get(key, default_var=default)
            self.cache[key] = value
            self.last_access[key] = now
            
            if deserialize_json and isinstance(value, str):
                return json.loads(value)
            return value
        except Exception as e:
            print(f"Error fetching variable {key}: {e}")
            return default
    
    def set(self, key, value, serialize_json=False):
        """Set variable and update cache."""
        if serialize_json:
            value = json.dumps(value)
        
        Variable.set(key, value)
        self.cache[key] = value
        self.last_access[key] = datetime.now()
    
    def invalidate(self, key):
        """Invalidate cached variable."""
        self.cache.pop(key, None)
        self.last_access.pop(key, None)
    
    def clear(self):
        """Clear entire cache."""
        self.cache.clear()
        self.last_access.clear()

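# Usage in DAG
# The cache lives in the memory of one Python process. Tasks normally run in
# separate processes that re-import this file, so entries are not shared across
# tasks or runs; the cache only saves queries for repeated reads in one process.
variable_cache = VariableCache(ttl_seconds=600)

@dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False)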
def cached_variable_dag():
    
    @task
    def process_data():
        """Use cached variable access."""
        config = variable_cache.get("pipeline_config", deserialize_json=True)
        api_key = variable_cache.get("api_key")
        
        print(f"Config: {config}")
        # Log only a short prefix of the key, and only if it is set
        if api_key:
            print(f"Using API key: {api_key[:4]}...")
        
        return {"status": "success"}
    
    process_data()

cached_variable_dag()
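The class above needs a running Airflow metadata DB. To exercise the same TTL logic in isolation, the hypothetical variant below takes the fetch function and the clock as parameters and counts hits and misses, so $R_{\text{cache}}$ can be measured. TTLVariableCache is an illustration, not part of Airflow.

```python
import json
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Tuple

class TTLVariableCache:
    """Hypothetical VariableCache variant with an injected fetch function and clock."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float = 300,
        clock: Callable[[], datetime] = datetime.now,
    ) -> None:
        self.fetch = fetch  # e.g. a function that calls Variable.get in a real DAG
        self.ttl = timedelta(seconds=ttl_seconds)
        self.clock = clock
        self._entries: Dict[str, Tuple[Any, datetime]] = {}  # key -> (value, fetched_at)
        self.hits = 0
        self.misses = 0

    def get(self, key: str, deserialize_json: bool = False) -> Any:
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[1] < self.ttl:
            self.hits += 1    # fresh entry: no fetch
            value = entry[0]
        else:
            self.misses += 1  # missing or expired: fetch and re-stamp
            value = self.fetch(key)
            self._entries[key] = (value, now)
        if deserialize_json and isinstance(value, str):
            return json.loads(value)
        return value

    @property
    def hit_ratio(self) -> float:
        """R_cache = N_hits / N_total (0.0 before any lookup)."""
        total = self.hits + self.misses
        return self.hits / total if total else 0.0
```

Injecting the clock makes expiry deterministic in tests. In a DAG file, fetch could be lambda key: Variable.get(key). Like the class above, this cache only lives as long as the process that created it.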

Variable-Based DAG Configuration

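from airflow.decorators import task, dag
from datetime import datetime
from airflow.models import Variable

# Load environment-specific configuration.
# These lookups run at the top level, i.e. on every parse of this file. That is
# unavoidable here because schedule_interval and catchup are needed at parse time.
# Supplying the values as environment variables (e.g. AIRFLOW_VAR_ENVIRONMENT,
# AIRFLOW_VAR_DEVELOPMENT_CONFIG) avoids the metadata DB query, and the defaults
# keep the DAG importable if the Variables do not exist.
ENVIRONMENT = Variable.get("environment", default_var="development")
CONFIG = Variable.get(f"{ENVIRONMENT}_config", default_var={}, deserialize_json=True)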

@dag(
    schedule_interval=CONFIG.get("schedule", "@daily"),
    start_date=datetime(2024, 1, 1),
    catchup=CONFIG.get("catchup", False),
    tags=[ENVIRONMENT, 'config-driven'],
    max_active_runs=CONFIG.get("max_active_runs", 1),
)
def config_driven_dag():
    
    @task
    def extract():
        """Extract using environment config."""
        source = CONFIG.get("source")
        connection_id = CONFIG.get(f"{source}_conn_id")
        
        print(f"Extracting from {source} using {connection_id}")
        return {"source": source, "connection_id": connection_id}
    
    @task
    def transform(extract_result: dict):
        """Transform using environment config."""
        transformations = CONFIG.get("transformations", [])
        
        print(f"Applying {len(transformations)} transformations")
        for t in transformations:
            print(f"  - {t}")
        
        return extract_result
    
    @task
    def load(transform_result: dict):
        """Load using environment config."""
        destination = CONFIG.get("destination")
        table = CONFIG.get("load_table")
        
        print(f"Loading to {destination}.{table}")
        return {"status": "success", "records": 0}
    
    # Passing return values both moves the data via XCom and sets the dependencies
    load(transform(extract()))

config_driven_dag()

Performance Metrics

Variable Access Patterns

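Latencies below are rough, environment-dependent estimates.

| Access Pattern | Approx. Latency | Use Case |
|---|---|---|
| Direct Variable.get() | ~5 ms (one metadata DB query) | One-time access |
| Cached access | ~1 μs (in-process lookup) | Repeated access |
| Built-in template variable ({{ ds }}, {{ params.key }}) | Negligible (rendered from the task context) | DAG parameters, dates |
| Secrets Backend | ~50 ms (network call) | Secure values |

A {{ var.value.key }} expression is not free: it performs a Variable lookup when the template is rendered.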

Variable vs Parameter Comparison

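| Feature | Variable | DAG Parameter | Built-in Template Variable |
|---|---|---|---|
| Storage | Metadata DB | DAG run | Task context (rendered at run time) |
| Scope | Global | Per run | Per task instance |
| Mutability | Mutable | Immutable within a run | Read-only |
| Access Pattern | Variable.get() or {{ var.value.key }} | {{ params.key }} (params["key"] in Python) | {{ ds }}, {{ ts }} |
| Performance | ~5 ms per lookup | Negligible | Negligible |
| Use Case | Shared config | Runtime config | Date- or run-dependent task config |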
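DAG parameters are fixed for a given run but can differ between runs. With the default [core] dag_run_conf_overrides_params = True, keys passed in a triggered run's conf replace the DAG-level defaults. The hypothetical helper below sketches only that merge, using the params from the advanced templating example. Real Airflow additionally validates values against Param definitions.

```python
from typing import Any, Dict, Optional

def effective_params(
    dag_params: Dict[str, Any],
    run_conf: Optional[Dict[str, Any]] = None,
    conf_overrides_params: bool = True,
) -> Dict[str, Any]:
    """Sketch: DAG-level param defaults, overridden by a run's conf when enabled."""
    merged = dict(dag_params)  # copy so the DAG-level defaults are not mutated
    if conf_overrides_params and run_conf:
        merged.update(run_conf)  # keys present in the run's conf win
    return merged

DAG_PARAMS = {"environment": "production", "dry_run": False, "batch_size": 1000}
```

Triggering a run with conf {"dry_run": true} then yields dry_run = True for that run only. The DAG-level defaults stay unchanged for every other run.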

Key Takeaways:

  • Variables are global key-value pairs stored in the metadata database
  • Use Variable.get(key, deserialize_json=True) for JSON configuration
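  • Airflow does not cache Variables by default; the optional [secrets] use_cache setting (Airflow 2.7+) caches them during DAG parsing with a default TTL of 900 seconds
  • Template variables ({{ var.value.key }}) provide concise access in templated operator fields and defer the lookup to task run time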
  • For sensitive data, use Secrets Backends instead of plaintext Variables
  • DAG Parameters enable runtime configuration without modifying DAG code
