Branching Logic in Apache Airflow
Architecture Diagram
Formal Definitions
DfBranch Operator
A branch operator selects one or more downstream execution paths based on runtime conditions. Given a set of candidate paths , the branch function returns a non-empty subset of paths to execute. All non-selected paths receive a skipped state.
DfMerge Point
A merge point is a task where multiple branches converge. The merge task uses a TriggerRule to determine when to execute given the states of upstream tasks. The most common rule is NONE_FAILED_MIN_ONE_SUCCESS, which fires when at least one upstream task succeeds and none have failed.
DfShort-Circuit Operator
A short-circuit operator returns a boolean value. If False, all downstream tasks are skipped until the end of the DAG or the next join point. Formally, where triggers cascade skipping.
Detailed Explanation
BranchPythonOperator
The primary branching mechanism β choose downstream tasks based on runtime conditions.
Key Behavior:
| Aspect | Description |
|---|---|
| Return Value | task_id (string) or list of task_ids |
| Branch Selection | Other branches receive skipped state |
| Merge Required | Yes β use TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS |
| Multiple Paths | Return list for parallel execution |
def decide_path(**context):
data_volume = context['ti'].xcom_pull(task_ids='get_data_volume')
if data_volume > 10000:
return 'process_large_dataset'
elif data_volume > 1000:
return ['process_medium_a', 'process_medium_b']
else:
return 'process_small_dataset'
ShortCircuitOperator
Simpler conditional logic β return True to continue, False to skip all downstream.
Use Cases:
- Data validation gates
- Feature flags
- Environment-specific logic
- Simple yes/no decisions
Difference from BranchPythonOperator: ShortCircuitOperator doesn't select specific paths β it continues or stops entirely.
Operator Comparison
| Feature | BranchPythonOperator | ShortCircuitOperator |
|---|---|---|
| Return Type | task_id(s) or list | Boolean |
| Branching Style | Select specific path | Continue/Stop |
| Merge Required | Yes | No |
| Multiple Paths | Supported | Not supported |
| Use Case | Complex decisions | Simple conditions |
Branch Patterns
| Pattern | Description |
|---|---|
| Data-Driven | Analyze input data, choose processing paths |
| Environment-Based | Different logic for dev/staging/prod |
| Time-Based | Paths based on time of day/week |
| Error-Based | Recovery paths based on error types |
Merge Strategies
| Strategy | Description |
|---|---|
| Implicit Merge | All branches converge at single task |
| Explicit Merge | Use TriggerRule for skipped branch handling |
| Conditional Merge | Custom logic checking upstream states |
Best Practice: Limit branch nesting to β€3 levels for maintainability.
Here,
- =Context containing runtime data and XCom
- =Set of candidate downstream task paths
- =Power set of paths (all possible subsets)
Expected Task Count After Branching
Here,
- =Number of possible branch outcomes
- =Probability of selecting branch i
- =Number of tasks in branch i
Branch Overhead Ratio
Here,
- =Time to evaluate branch conditions
- =Time for skip state to propagate through downstream tasks
- =Time to execute the selected path
ThBranch Completeness (Merge Invariant)
In a valid branching DAG, every branch must have a corresponding merge point such that: (1) all paths from the branch decision reach the merge, and (2) the merge task uses TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS or equivalent. Violation: If a branch lacks a merge point, downstream tasks may be indefinitely skipped due to unmet dependencies.
ThShort-Circuit Cascade
When a ShortCircuitOperator returns False, all downstream tasks are skipped, where is the transitive closure of the dependency edge set. This propagation continues until a join point with an upstream task not in .
BranchPythonOperator can return a single task ID (string) or a list of task IDs. Returning a list executes multiple branches in parallel. All returned tasks will run concurrently until they reach the merge point.
Limit branch nesting depth to for maintainability. Deeply nested branches ( levels) significantly increase complexity and make debugging difficult. Consider refactoring into separate DAGs or using ShortCircuitOperator for simple conditions.
Environment-Based Branching: Use different logic for development, staging, and production environments.
Time-Based Branching: Execute different paths based on time of day, day of week, or other temporal conditions.
Error-Based Branching: Choose recovery paths based on error types or failure conditions.
Merge Strategies
Implicit Merge: When all branches converge at a single task, Airflow automatically handles the merge. The merge task waits for all upstream tasks to complete.
Explicit Merge: Use TriggerRule to define how the merge task handles skipped branches. Common rules include NONE_FAILED_MIN_ONE_SUCCESS and NONE_FAILED.
Conditional Merge: Implement custom merge logic using Python operators. Check upstream task states and execute appropriate logic.
Error Handling: Implement proper error handling in merge points. Consider partial failures and implement retry logic.
Key Concepts Table
| Operator | Return Type | Branching Style | Merge Required | Use Case |
|---|---|---|---|---|
| BranchPythonOperator | task_id(s) | Single/Multi-path | Yes | Complex decisions |
| ShortCircuitOperator | boolean | Continue/Stop | No | Simple conditions |
| BranchTriggerOperator | TriggerEvent | Async | Yes | Event-driven |
| ExternalBranchSensor | External state | External | Yes | Cross-system |
TriggerRule Options for Merge Points
from airflow.utils.trigger_rule import TriggerRule
# Available trigger rules
trigger_rules = {
'all_success': TriggerRule.ALL_SUCCESS, # Default
'all_failed': TriggerRule.ALL_FAILED,
'all_done': TriggerRule.ALL_DONE,
'one_failed': TriggerRule.ONE_FAILED,
'one_success': TriggerRule.ONE_SUCCESS,
'none_failed': TriggerRule.NONE_FAILED,
'none_failed_min_one_success': TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
'none_skipped': TriggerRule.NONE_SKIPPED,
}
# For branching merge points
merge_task = PythonOperator(
task_id='merge_results',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, # Recommended
...
)
| TriggerRule | Description | Use Case |
|---|---|---|
ALL_SUCCESS | All upstream tasks succeeded | Default, no branching |
NONE_FAILED_MIN_ONE_SUCCESS | No failures, at least one success | Branching merge points |
NONE_FAILED | No upstream tasks failed | Optional branches |
ONE_SUCCESS | At least one upstream succeeded | Parallel with fallback |
ALL_DONE | All upstream tasks completed | Always run final task |
NONE_SKIPPED | No upstream tasks skipped | Strict validation |
Branching Patterns
Code Examples
Advanced Branching Patterns
# advanced_branching.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import (
BranchPythonOperator,
PythonOperator,
ShortCircuitOperator,
)
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
import random
def data_quality_branch(**context):
"""Branch based on data quality assessment."""
# Simulate data quality check
data_quality_score = random.uniform(0, 1)
data_volume = random.randint(100, 10000)
# Store metrics for downstream tasks
context['ti'].xcom_push(key='quality_score', value=data_quality_score)
context['ti'].xcom_push(key='data_volume', value=data_volume)
# Decision logic
if data_quality_score < 0.3:
return 'data_cleaning'
elif data_quality_score < 0.7:
return 'data_validation'
elif data_volume > 5000:
return 'large_dataset_processing'
else:
return 'standard_processing'
def time_based_branch(**context):
"""Branch based on time of day."""
from airflow.utils import timezone
current_time = timezone.utcnow()
hour = current_time.hour
if hour < 6:
return 'night_processing'
elif hour < 12:
return 'morning_processing'
elif hour < 18:
return 'afternoon_processing'
else:
return 'evening_processing'
def environment_branch(**context):
"""Branch based on environment."""
from airflow.configuration import conf
environment = conf.get('core', 'ENVIRONMENT')
if environment == 'production':
return 'production_processing'
elif environment == 'staging':
return 'staging_processing'
else:
return 'development_processing'
def error_recovery_branch(**context):
"""Branch based on error type."""
error_type = context['ti'].xcom_pull(
task_ids='error_detection',
key='error_type'
)
error_handling_paths = {
'data_corruption': 'data_recovery',
'network_error': 'retry_operation',
'resource_exhaustion': 'scale_resources',
'timeout': 'extend_timeout',
}
return error_handling_paths.get(error_type, 'default_recovery')
def data_cleaning(**context):
"""Clean corrupted data."""
print("Performing data cleaning...")
quality_score = context['ti'].xcom_pull(
task_ids='quality_check',
key='quality_score'
)
print(f"Quality score before cleaning: {quality_score}")
def data_validation(**context):
"""Validate data quality."""
print("Performing data validation...")
quality_score = context['ti'].xcom_pull(
task_ids='quality_check',
key='quality_score'
)
print(f"Quality score: {quality_score}")
def large_dataset_processing(**context):
"""Process large datasets."""
print("Processing large dataset...")
data_volume = context['ti'].xcom_pull(
task_ids='quality_check',
key='data_volume'
)
print(f"Processing {data_volume} records")
def standard_processing(**context):
"""Standard data processing."""
print("Performing standard processing...")
def aggregate_results(**context):
"""Aggregate results from all branches."""
# Get all upstream task states
upstream_tasks = ['data_cleaning', 'data_validation',
'large_dataset_processing', 'standard_processing']
results = {}
for task_id in upstream_tasks:
try:
result = context['ti'].xcom_pull(task_ids=task_id)
results[task_id] = result
except Exception:
results[task_id] = None
print(f"Aggregated results: {results}")
return results
with DAG(
'advanced_branching_dag',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
description='Advanced branching patterns',
schedule_interval=timedelta(hours=1),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['branching', 'advanced'],
) as dag:
start = EmptyOperator(task_id='start')
# Quality check branch
quality_branch = BranchPythonOperator(
task_id='quality_check',
python_callable=data_quality_branch,
)
# Branch tasks
cleaning = PythonOperator(
task_id='data_cleaning',
python_callable=data_cleaning,
)
validation = PythonOperator(
task_id='data_validation',
python_callable=data_validation,
)
large_processing = PythonOperator(
task_id='large_dataset_processing',
python_callable=large_dataset_processing,
)
standard_processing = PythonOperator(
task_id='standard_processing',
python_callable=standard_processing,
)
# Merge point
merge = PythonOperator(
task_id='aggregate_results',
python_callable=aggregate_results,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# Time-based branch
time_branch = BranchPythonOperator(
task_id='time_branch',
python_callable=time_based_branch,
)
# Time-based tasks
night_task = PythonOperator(
task_id='night_processing',
python_callable=lambda: print("Night processing"),
)
morning_task = PythonOperator(
task_id='morning_processing',
python_callable=lambda: print("Morning processing"),
)
afternoon_task = PythonOperator(
task_id='afternoon_processing',
python_callable=lambda: print("Afternoon processing"),
)
evening_task = PythonOperator(
task_id='evening_processing',
python_callable=lambda: print("Evening processing"),
)
# Final merge
final_merge = EmptyOperator(
task_id='final_merge',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# Set dependencies
start >> quality_branch
quality_branch >> [cleaning, validation, large_processing, standard_processing]
[cleaning, validation, large_processing, standard_processing] >> merge
merge >> time_branch
time_branch >> [night_task, morning_task, afternoon_task, evening_task]
[night_task, morning_task, afternoon_task, evening_task] >> final_merge
ShortCircuit Patterns
# shortcircuit_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import ShortCircuitOperator, PythonOperator
from airflow.operators.empty import EmptyOperator
def check_data_availability(**context):
"""Check if required data is available."""
import os
required_files = [
'/data/input1.csv',
'/data/input2.csv',
'/data/config.json',
]
missing_files = [f for f in required_files if not os.path.exists(f)]
if missing_files:
print(f"Missing files: {missing_files}")
return False
print("All required files are available")
return True
def check_api_health(**context):
"""Check if external API is healthy."""
import requests
try:
response = requests.get(
'http://api.example.com/health',
timeout=10
)
if response.status_code == 200:
print("API is healthy")
return True
else:
print(f"API returned status code: {response.status_code}")
return False
except Exception as e:
print(f"API health check failed: {e}")
return False
def check_database_connectivity(**context):
"""Check database connectivity."""
from airflow.providers.postgres.hooks.postgres import PostgresHook
try:
hook = PostgresHook(postgres_conn_id='postgres_default')
result = hook.get_first("SELECT 1")
if result:
print("Database connection successful")
return True
else:
print("Database connection failed")
return False
except Exception as e:
print(f"Database check failed: {e}")
return False
def check_resource_availability(**context):
"""Check if sufficient resources are available."""
import psutil
cpu_percent = psutil.cpu_percent()
memory = psutil.virtual_memory()
if cpu_percent > 90:
print(f"CPU usage too high: {cpu_percent}%")
return False
if memory.percent > 85:
print(f"Memory usage too high: {memory.percent}%")
return False
print(f"Resources available - CPU: {cpu_percent}%, Memory: {memory.percent}%")
return True
def process_data(**context):
"""Process data after all checks pass."""
print("All checks passed, processing data...")
with DAG(
'shortcircuit_patterns_dag',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
description='ShortCircuit operator patterns',
schedule_interval=timedelta(hours=1),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['shortcircuit', 'patterns'],
) as dag:
start = EmptyOperator(task_id='start')
# Sequential checks
check_files = ShortCircuitOperator(
task_id='check_files',
python_callable=check_data_availability,
)
check_api = ShortCircuitOperator(
task_id='check_api',
python_callable=check_api_health,
)
check_db = ShortCircuitOperator(
task_id='check_database',
python_callable=check_database_connectivity,
)
check_resources = ShortCircuitOperator(
task_id='check_resources',
python_callable=check_resource_availability,
)
# Processing tasks
process = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
# Cleanup
cleanup = EmptyOperator(task_id='cleanup')
# Set dependencies
start >> check_files >> check_api >> check_db >> check_resources
check_resources >> process >> cleanup
Complex Branching with External Dependencies
# complex_branching_external.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import TaskInstance
from airflow import settings
def external_dependency_branch(**context):
"""Branch based on external DAG task states."""
session = settings.Session()
# Check state of external task
external_task = session.query(TaskInstance).filter(
TaskInstance.dag_id == 'external_pipeline',
TaskInstance.task_id == 'extract',
TaskInstance.execution_date == context['execution_date'],
).first()
if external_task is None:
return 'wait_for_external'
if external_task.state == 'success':
return 'process_data'
elif external_task.state == 'failed':
return 'handle_failure'
else:
return 'wait_for_external'
def composite_branch(**context):
"""Branch based on multiple conditions."""
# Get data from previous tasks
data_quality = context['ti'].xcom_pull(
task_ids='quality_assessment',
key='quality_score'
)
data_volume = context['ti'].xcom_pull(
task_ids='volume_assessment',
key='volume'
)
system_load = context['ti'].xcom_pull(
task_ids='system_check',
key='load'
)
# Composite decision logic
if data_quality < 0.3:
return 'data_cleaning'
elif data_volume > 10000 and system_load > 0.8:
return 'queue_processing'
elif data_volume > 10000:
return 'distributed_processing'
else:
return 'standard_processing'
def parallel_branch_decision(**context):
"""Decide which parallel branches to execute."""
import random
# Simulate random decision
branches_to_execute = random.sample(
['branch_a', 'branch_b', 'branch_c', 'branch_d'],
k=random.randint(1, 4)
)
return branches_to_execute
with DAG(
'complex_branching_external_dag',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
description='Complex branching with external dependencies',
schedule_interval=timedelta(hours=1),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['branching', 'complex', 'external'],
) as dag:
start = EmptyOperator(task_id='start')
# Wait for external dependency
wait_external = ExternalTaskSensor(
task_id='wait_for_external',
external_dag_id='external_pipeline',
external_task_id='extract',
mode='reschedule',
poke_interval=300,
)
# Branch based on external state
external_branch = BranchPythonOperator(
task_id='external_dependency_branch',
python_callable=external_dependency_branch,
)
# Branch tasks
process_data = PythonOperator(
task_id='process_data',
python_callable=lambda: print("Processing data..."),
)
handle_failure = PythonOperator(
task_id='handle_failure',
python_callable=lambda: print("Handling failure..."),
)
# Composite branch
quality_assessment = PythonOperator(
task_id='quality_assessment',
python_callable=lambda: {'quality_score': 0.8},
)
volume_assessment = PythonOperator(
task_id='volume_assessment',
python_callable=lambda: {'volume': 5000},
)
system_check = PythonOperator(
task_id='system_check',
python_callable=lambda: {'load': 0.5},
)
composite_branch_op = BranchPythonOperator(
task_id='composite_branch',
python_callable=composite_branch,
)
# Processing branches
data_cleaning = PythonOperator(
task_id='data_cleaning',
python_callable=lambda: print("Cleaning data..."),
)
queue_processing = PythonOperator(
task_id='queue_processing',
python_callable=lambda: print("Queue processing..."),
)
distributed_processing = PythonOperator(
task_id='distributed_processing',
python_callable=lambda: print("Distributed processing..."),
)
standard_processing = PythonOperator(
task_id='standard_processing',
python_callable=lambda: print("Standard processing..."),
)
# Parallel branch decision
parallel_branch = BranchPythonOperator(
task_id='parallel_branch_decision',
python_callable=parallel_branch_decision,
)
# Parallel branch tasks
branch_a = PythonOperator(
task_id='branch_a',
python_callable=lambda: print("Branch A executing"),
)
branch_b = PythonOperator(
task_id='branch_b',
python_callable=lambda: print("Branch B executing"),
)
branch_c = PythonOperator(
task_id='branch_c',
python_callable=lambda: print("Branch C executing"),
)
branch_d = PythonOperator(
task_id='branch_d',
python_callable=lambda: print("Branch D executing"),
)
# Merge points
merge_point_1 = EmptyOperator(
task_id='merge_point_1',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
merge_point_2 = EmptyOperator(
task_id='merge_point_2',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
final_merge = EmptyOperator(
task_id='final_merge',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# Set dependencies
start >> wait_external >> external_branch
external_branch >> [process_data, handle_failure]
process_data >> [quality_assessment, volume_assessment, system_check]
[quality_assessment, volume_assessment, system_check] >> composite_branch_op
composite_branch_op >> [data_cleaning, queue_processing, distributed_processing, standard_processing]
[data_cleaning, queue_processing, distributed_processing, standard_processing] >> merge_point_1
merge_point_1 >> parallel_branch
parallel_branch >> [branch_a, branch_b, branch_c, branch_d]
[branch_a, branch_b, branch_c, branch_d] >> merge_point_2
[handle_failure, merge_point_2] >> final_merge
Performance Metrics
Branching Performance
| Metric | Description | Optimization Strategy | Warning Threshold |
|---|---|---|---|
| Branch Decision Time | Time to evaluate branch conditions | Optimize condition logic | > 1 second |
| Task Skip Rate | Percentage of tasks skipped | Balance branch granularity | > 80% skipped |
| Merge Complexity | Time to resolve merge dependencies | Use efficient trigger rules | > 10 seconds |
| Branch Depth | Nesting level of branches | Limit depth for readability | > 3 levels |
| Parallel Branch Count | Number of concurrent branches | Balance parallelism vs complexity | > 10 branches |
| XCom Usage | Data passed between branches | Minimize XCom data size | > 48KB |
| Error Propagation | How errors affect branching | Implement proper error handling | N/A |
| Resource Usage | Resources consumed by branching | Optimize branch logic | > 100MB |
Branch Decision Time Analysis
| Decision Type | Typical Time | Optimization |
|---|---|---|
| Simple if/else | < 100ms | None needed |
| XCom lookup | 100-500ms | Cache results |
| Database query | 500ms-2s | Add indexes |
| API call | 1-5s | Use async |
| ML inference | 5-30s | Offload to separate service |
Branching Overhead Analysis
| Pattern | Decision Time | Skip Propagation | Total Overhead |
|---|---|---|---|
| Simple branch (2 paths) | < 100ms | < 100ms | < 200ms |
| Multi-branch (4 paths) | < 200ms | < 200ms | < 400ms |
| Nested branch (2 levels) | < 300ms | < 300ms | < 600ms |
| Complex branch (8+ paths) | < 500ms | < 500ms | < 1s |
Best Practices
1. Branch Granularity
Keep branch logic simple and focused. Avoid complex nested branches that are hard to understand and maintain.
# Good: Simple, focused branch logic
def decide_processing_path(**context):
"""Simple branch with clear decision criteria."""
data_quality = context['ti'].xcom_pull(task_ids='assess_quality')
if data_quality < 0.3:
return 'clean_data'
elif data_quality < 0.7:
return 'validate_data'
else:
return 'process_data'
# Bad: Complex, hard-to-understand branch logic
def complex_branch(**context):
"""Avoid this - too many conditions, hard to debug."""
if condition_a and condition_b or condition_c:
if condition_d:
return 'path_1'
else:
if condition_e:
return 'path_2'
else:
return 'path_3'
else:
return 'path_4'
2. Merge Point Design
Always include proper merge points after branching. Use appropriate trigger rules to handle skipped branches.
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
# Always include merge point after branching
merge = EmptyOperator(
task_id='merge_point',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, # Required for branching
)
# Connect branches to merge
[branch_a, branch_b, branch_c] >> merge
3. Error Handling
Implement error handling for branch decisions. Consider what happens when branch logic fails.
def safe_branch_decision(**context):
"""Branch with error handling."""
try:
data_volume = context['ti'].xcom_pull(task_ids='get_volume')
if data_volume > 10000:
return 'process_large'
else:
return 'process_small'
except Exception as e:
# Log error and return default path
logging.error(f"Branch decision failed: {e}")
return 'process_default' # Fallback path
4. Testing
Test all branch paths thoroughly. Use Airflow's testing utilities to verify branch behavior.
# Test branching logic
def test_branch_decision():
"""Test all branch paths."""
from airflow.models import DagBag
dagbag = DagBag(dag_folder='/opt/airflow/dags')
dag = dagbag.get_dag('my_branching_dag')
# Verify branch operator exists
branch_task = dag.get_task('branch_decision')
assert branch_task is not None
# Verify all branch targets exist
possible_paths = ['path_a', 'path_b', 'path_c']
for path in possible_paths:
task = dag.get_task(path)
assert task is not None, f"Task {path} not found"
# Verify merge point exists
merge_task = dag.get_task('merge_point')
assert merge_task is not None
assert merge_task.trigger_rule == TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
5. Documentation
Document branch logic and decision criteria. Explain why certain paths are chosen under different conditions.
6. Monitoring
Track branch execution metrics. Monitor which branches are taken most frequently.
7. Performance
Optimize branch decision logic to minimize execution time. Avoid expensive operations in branch conditions.
8. Maintainability
Use descriptive task IDs for branch tasks. Keep branch logic in separate functions for clarity.
9. XCom Management
Minimize data passed through XCom in branch decisions. Use appropriate XCom backends for larger data.
10. Alternative Patterns
Consider using ShortCircuitOperator for simple conditions. Use TriggerOperator for event-driven branching.
Key Takeaways:
- Branch function selects one or more paths
- Expected task count:
- Every branch requires a merge point with appropriate TriggerRule
- Short-circuit cascade skips all downstream tasks until a join point
- Use
TriggerRule.NONE_FAILED_MIN_ONE_SUCCESSat merge points - Limit branch nesting depth to for maintainability
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)
See Also
- Airflow Architecture β Core architecture overview
- DAG Design Patterns β DAG composition patterns
- Operators and Hooks β Operator-based branching
- XCom Communications β Branching data flow patterns