# TaskFlow API and Decorators
## Architecture Diagram
## Formal Definitions
**Definition: TaskFlow API**
The TaskFlow API is a DAG authoring style in Airflow that uses Python decorators (`@task`, `@dag`) to reduce boilerplate code. A decorated function's return value is pushed to XCom automatically, and passing one task's output to another task as an argument both pulls that value at run time and declares the dependency between the two tasks.
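To make the push/pull behaviour concrete, here is a toy model in plain Python. It is not Airflow's implementation: the names `toy_task`, `Ref`, and `XCOM_STORE` are hypothetical, and an in-memory dict stands in for XCom. The decorator stores each return value under the key `return_value` and resolves any `Ref` argument by reading the upstream value.

```python
from dataclasses import dataclass
from functools import wraps

# In-memory stand-in for the XCom table: {(task_id, key): value}
XCOM_STORE = {}


@dataclass(frozen=True)
class Ref:
    """Hypothetical placeholder meaning 'the return value of task_id'."""
    task_id: str


def toy_task(func):
    """Toy decorator: pull Ref arguments from the store, push the return value."""
    @wraps(func)
    def wrapper(*args):
        # "Pull": replace each Ref with the upstream task's stored return value
        resolved = [
            XCOM_STORE[(a.task_id, "return_value")] if isinstance(a, Ref) else a
            for a in args
        ]
        result = func(*resolved)
        # "Push": store the return value under the conventional key
        XCOM_STORE[(func.__name__, "return_value")] = result
        return Ref(func.__name__)
    return wrapper


@toy_task
def extract_data():
    return [{"id": 1, "name": "Alice"}]


@toy_task
def transform_data(raw_data):
    return [{**r, "name": r["name"].upper()} for r in raw_data]


raw = extract_data()               # returns a Ref, not the data itself
transformed = transform_data(raw)  # the Ref is resolved before the call
final_value = XCOM_STORE[(transformed.task_id, "return_value")]
```

Unlike this toy, Airflow does not run the function when it is called inside a DAG definition; the call only records the dependency, and the function body runs later on a worker.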
**Definition: Dynamic Task Mapping**
Dynamic Task Mapping creates task instances at runtime based on data. Given a function $f_{\text{map}}: D \to T$, where $D$ is the input data and $T$ is the set of possible tasks, the `.expand()` method expands a single task definition into parallel task instances, one per element of the input, once the upstream data is available.
**Definition: XCom Auto-Serialization**
XCom Auto-Serialization is the mechanism by which TaskFlow serializes return values to XCom and deserializes them when they are passed as function arguments. The serialization follows $b = \text{serialize}(v)$, where $v$ is the return value and $b$ is the serialized bytes stored in the metadata database.
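A minimal sketch of that round trip, assuming a plain JSON serializer. The helpers `serialize` and `deserialize` are illustrative names, not Airflow functions, and Airflow's own serializer may treat some types differently depending on the version. The sketch shows two practical consequences: plain JSON has no tuple type, and it is the serialized size that counts against backend limits.

```python
import json


def serialize(value) -> bytes:
    """Illustrative stand-in for XCom serialization: value -> bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(blob: bytes):
    """Illustrative stand-in for XCom deserialization: bytes -> value."""
    return json.loads(blob.decode("utf-8"))


original = {"ids": (1, 2, 3), "name": "Alice"}
blob = serialize(original)
restored = deserialize(blob)

# With plain JSON the tuple comes back as a list
print(restored)
# The stored size is the length of the serialized bytes
print(len(blob), "bytes")
```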
## Detailed Explanation
### Basic TaskFlow Usage
The TaskFlow API replaces explicit operator instantiation with Python decorators.
**Key Benefits:**
| Feature | Description |
|---|---|
| `@task` decorator | Turns a Python function into an Airflow task |
| `@dag` decorator | Defines a DAG from a function definition |
| Auto XCom | Return values are pushed to XCom automatically |
| Argument injection | Passing an upstream task's output as an argument pulls its XCom value |
| Dynamic Mapping | `.expand()` creates parallel task instances at runtime |
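```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(schedule_interval=timedelta(hours=1), start_date=datetime(2024, 1, 1), catchup=False)
def taskflow_example():
    @task
    def extract_data():
        return [{"id": 1, "name": "Alice"}]  # Auto-pushed to XCom

    @task
    def transform_data(raw_data: list) -> list:  # Auto-pulled from XCom
        # Upper-case the name field of each record
        return [{**r, "name": r["name"].upper()} for r in raw_data]

    raw = extract_data()
    transformed = transform_data(raw)  # Dependency auto-inferred


taskflow_example()  # Instantiate the DAG so Airflow registers it
```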
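### TaskFlow with Multiple Returns
```python
from airflow.decorators import task

@task(multiple_outputs=True)
def extract_metrics() -> dict:
    records = [{"id": 1}, {"id": 2}]  # Placeholder data
    metadata = {"total_records": len(records)}
    return {"records": records, "metadata": metadata}  # Each dict key pushed as a separate XCom key

@task
def process_records(records: list, metadata: dict):  # Pull by key
    print(f"Processing {metadata['total_records']} records")

# Inside a DAG: index the output by key
# metrics = extract_metrics(); process_records(metrics["records"], metrics["metadata"])
```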
### Dynamic Task Mapping
```python
from airflow.decorators import task

@task
def get_regions() -> list:
    return ["us-east-1", "us-west-2", "eu-west-1"]

@task
def process_region(region: str) -> dict:
    return {"region": region, "status": "success"}

@task
def aggregate_results(region_results: list) -> dict:
    # Receives the collected results of every mapped instance
    return {"total_regions": len(region_results)}

# Usage (inside a @dag-decorated function or a `with DAG(...)` block)
regions = get_regions()
results = process_region.expand(region=regions)  # One mapped instance per region
aggregate_results(results)
```
**Use Case:** Process a variable number of items (regions, files, etc.) in parallel.
<MathKeyFormula
title="Dynamic Task Count"
tex={`N_{\\text{tasks}} = |f_{\\text{map}}(D)|`}
variables={[
{ symbol: "N_{\\text{tasks}}", description: "Number of mapped task instances created" },
{ symbol: "f_{\\text{map}}", description: "Mapping function applied to input data" },
{ symbol: "D", description: "Input dataset (list, dict, or XCom)" }
]}
/>
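As a worked example of the count, here is a small plain-Python sketch (no Airflow required). It assumes that mapping over one list yields one task instance per element, and that mapping over several arguments at once yields one instance per combination of their elements. The function name `mapped_task_count` and the `table` argument are hypothetical.

```python
from itertools import product
from math import prod


def mapped_task_count(**mapped_args) -> int:
    """Number of task instances when mapping over the given lists."""
    return prod(len(values) for values in mapped_args.values())


regions = ["us-east-1", "us-west-2", "eu-west-1"]
tables = ["orders", "users"]

n_single = mapped_task_count(region=regions)               # one list
n_cross = mapped_task_count(region=regions, table=tables)  # two lists

# Enumerate the argument combinations the mapped instances would receive
combos = list(product(regions, tables))
```

The count grows multiplicatively with each additional mapped argument, so it is worth estimating before mapping over several large inputs.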
<MathFormula
title="TaskFlow XCom Overhead"
tex={`T_{\\text{xcom}} = \\sum_{i=1}^{n} (S_{\\text{push},i} + S_{\\text{pull},i}) \\cdot L_{\\text{latency}}`}
variables={[
{ symbol: "n", description: "Number of tasks with XCom operations" },
{ symbol: "S_{\\text{push},i}", description: "Serialized size of the value pushed by task i" },
{ symbol: "S_{\\text{pull},i}", description: "Serialized size of the values pulled by task i" },
{ symbol: "L_{\\text{latency}}", description: "XCom backend latency per unit of data transferred" }
]}
/>
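To see how the terms of the formula above combine, the sketch below evaluates it for made-up inputs. The sizes and the latency factor are purely illustrative, not measurements, and the helper name `xcom_cost` is hypothetical.

```python
def xcom_cost(sizes, latency):
    """Evaluate sum((S_push_i + S_pull_i) * L_latency) over all tasks.

    sizes: list of (push_size, pull_size) pairs, one pair per task.
    latency: backend latency factor applied to every transfer.
    """
    return sum((push + pull) * latency for push, pull in sizes)


# Hypothetical inputs: three tasks, sizes in KB, latency factor of 0.5
sizes_kb = [(10, 0), (4, 10), (1, 4)]
total = xcom_cost(sizes_kb, latency=0.5)
print(total)
```

Because every task contributes both what it pushes and what it pulls, a value consumed by several downstream tasks is counted once per consumer.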
<MathNote type="info">
TaskFlow automatically pushes return values to XCom with key `return_value`. To push multiple values under separate keys, declare the task with `@task(multiple_outputs=True)` and return a dict; each top-level key is then stored as its own XCom entry. A tuple or list return is stored as a single value under `return_value`.
</MathNote>
<MathNote type="tip">
For large data (>48KB), pass a small reference (such as a file path or object-store URI) through XCom instead of the payload itself, or configure a custom XCom backend that stores values in S3, GCS, or another external store. The default database backend has performance limitations for large payloads.
</MathNote>
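One common way to keep large payloads out of XCom is to write the data elsewhere and return only a small reference to it. The sketch below uses a local temporary file as a stand-in for S3 or GCS. The function names and file layout are assumptions for illustration, and in a real deployment the location must be reachable from every worker.

```python
import json
import os
import tempfile


def write_payload(records) -> str:
    """Persist a large payload and return a small reference (the file path)."""
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as handle:
        json.dump(records, handle)
    return path  # this short string is what would travel through XCom


def read_payload(path: str):
    """Load the payload back from its reference."""
    with open(path) as handle:
        return json.load(handle)


records = [{"id": i, "value": i * 1.5} for i in range(10_000)]
ref = write_payload(records)
payload_size = os.path.getsize(ref)  # size of the data kept out of XCom
restored = read_payload(ref)
os.remove(ref)  # clean up the temporary file
```

In a TaskFlow DAG, the producing task would return `ref` and the consuming task would accept it as a parameter, so only the short path string is serialized.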
## Key Concepts Table
| Feature | Traditional Operators | TaskFlow API |
|---------|----------------------|--------------|
| **Task Definition** | `PythonOperator(python_callable=func)` | `@task def func():` |
| **XCom Push** | `ti.xcom_push(key, value)` | `return value` (automatic) |
| **XCom Pull** | `ti.xcom_pull(task_ids, key)` | Function parameter injection |
| **Multiple Returns** | Multiple push calls | Return a dict with `multiple_outputs=True` |
| **DAG Definition** | `with DAG(...):` block | `@dag` decorator |
| **Dynamic Mapping** | `Operator.partial(...).expand(...)` | `.expand()` on `@task` functions |
| **Code Boilerplate** | High | Low |
| **Type Hints** | Optional | Encouraged for clarity |
## Code Examples
### Advanced TaskFlow Patterns
```python
import json
import random
from datetime import datetime, timedelta
from typing import Dict, List

from airflow.decorators import dag, task


@dag(
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["taskflow", "advanced"],
    doc_md="""
## Advanced TaskFlow Patterns
Demonstrates TaskFlow API with:
- Multiple return values
- Dynamic task mapping
- Error handling
""",
)
def advanced_taskflow_dag():
    @task(retries=3, retry_delay=timedelta(minutes=1))
    def extract_from_source(source: str) -> List[Dict]:
        """Extract data with retry logic."""
        # Simulate an intermittent connection failure so the retries are exercised
        if random.random() < 0.1:
            raise ConnectionError(f"Failed to connect to {source}")
        return [
            {"id": i, "source": source, "value": random.randint(1, 100)}
            for i in range(10)
        ]

    @task
    def validate_data(records: List[Dict]) -> tuple:
        """Validate and separate valid/invalid records."""
        valid = [r for r in records if 0 <= r["value"] <= 100]
        invalid = [r for r in records if r["value"] < 0 or r["value"] > 100]
        return valid, invalid  # Stored as one XCom value; unpacked downstream

    @task
    def transform_records(validated: tuple) -> List[Dict]:
        """Transform the valid records of one source (one mapped instance per source)."""
        valid, _invalid = validated
        return [
            {**record, "value_normalized": record["value"] / 100.0, "transformed": True}
            for record in valid
        ]

    @task
    def load_batch(records: List[Dict], destination: str) -> int:
        """Load a batch of records."""
        print(f"Loading {len(records)} records to {destination}")
        return len(records)

    @task
    def generate_report(loaded_counts: list, validated: list) -> str:
        """Generate summary report."""
        total_loaded = sum(loaded_counts)
        total_invalid = sum(len(invalid) for _valid, invalid in validated)
        total = total_loaded + total_invalid
        report = {
            "total_loaded": total_loaded,
            "total_invalid": total_invalid,
            # Guard against division by zero when nothing was processed
            "success_rate": total_loaded / total * 100 if total else 0.0,
        }
        return json.dumps(report, indent=2)

    # Define sources
    sources = ["postgres", "mysql", "mongodb"]
    # Extract from multiple sources (one mapped task instance per source)
    raw_data = extract_from_source.expand(source=sources)
    # Validate each source's data
    validated = validate_data.expand(records=raw_data)
    # Transform the valid records of each source
    transformed = transform_records.expand(validated=validated)
    # Load each transformed batch; `destination` is the same for every instance
    loaded_counts = load_batch.partial(destination="data_warehouse").expand(records=transformed)
    # Generate report from the collected results of all mapped instances
    generate_report(loaded_counts, validated)


advanced_taskflow_dag()
```
### TaskFlow with XCom Backend Configuration
```python
import json
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.xcom import BaseXCom


class CustomXComBackend(BaseXCom):
    """Custom XCom backend for large payloads."""

    @staticmethod
    def serialize_value(value, **kwargs):
        """Serialize value for storage.

        **kwargs absorbs the extra keyword arguments (such as key or task_id)
        that some Airflow versions pass to this method.
        """
        if isinstance(value, (dict, list)):
            return json.dumps(value).encode("utf-8")
        return str(value).encode("utf-8")

    @staticmethod
    def deserialize_value(result):
        """Deserialize value from storage; `result` is the stored XCom row."""
        raw = result.value
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return raw


# The backend is selected globally, not per task: set `xcom_backend` in the
# [core] section of airflow.cfg (or the AIRFLOW__CORE__XCOM_BACKEND environment
# variable) to the import path of the class, e.g. path.to.CustomXComBackend.


@dag(
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["taskflow", "xcom-backend"],
    render_template_as_native_obj=True,
)
def xcom_backend_dag():
    @task(pool="data_processing")  # Assumes a pool named "data_processing" exists
    def process_large_dataset(records: list) -> dict:
        """Process dataset; the result is stored through the configured XCom backend."""
        import pandas as pd

        df = pd.DataFrame(records)
        result = {
            "mean": float(df["value"].mean()),
            "std": float(df["value"].std()),
            "count": int(len(df)),
            "percentiles": {
                "25": float(df["value"].quantile(0.25)),
                "50": float(df["value"].quantile(0.50)),
                "75": float(df["value"].quantile(0.75)),
            },
        }
        return result

    @task
    def summarize(stats: dict) -> str:
        """Create summary string."""
        return f"Processed {stats['count']} records, mean={stats['mean']:.2f}"

    sample_data = [{"value": i * 1.5} for i in range(100)]
    stats = process_large_dataset(sample_data)
    summarize(stats)


xcom_backend_dag()
```
### Task Mapping with Cross-Task Dependencies
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["taskflow", "taskmap", "cross-dependencies"],
)
def cross_task_dependency_dag():
    @task
    def get_config() -> dict:
        """Return configuration for downstream tasks."""
        return {
            "environments": ["dev", "staging", "prod"],
            "parallel_workers": 4,
            "timeout_seconds": 300,
        }

    @task
    def list_environments(config: dict) -> list:
        """Expose the environment list as its own task output so it can be mapped over."""
        return config["environments"]

    @task
    def setup_environment(env: str, config: dict) -> dict:
        """Setup each environment using config."""
        return {
            "environment": env,
            "status": "ready",
            "workers": config["parallel_workers"],
        }

    @task
    def run_tests(env_setup: dict) -> dict:
        """Run tests in each environment."""
        return {
            "environment": env_setup["environment"],
            "tests_passed": 42,
            "tests_failed": 0,
        }

    @task
    def cleanup_environments(results: list) -> str:
        """Cleanup all environments after tests."""
        environments = [r["environment"] for r in results]
        return f"Cleaned up: {', '.join(environments)}"

    # Get configuration
    config = get_config()
    # Setup environments dynamically: `env` varies per mapped instance,
    # while `config` is passed unchanged to all of them via partial()
    env_setups = setup_environment.partial(config=config).expand(
        env=list_environments(config)
    )
    # Run tests in each environment (one mapped instance per setup result)
    test_results = run_tests.expand(env_setup=env_setups)
    # Cleanup
    cleanup_environments(test_results)


cross_task_dependency_dag()
```
## Performance Metrics
### TaskFlow vs Traditional Operators
| Metric | Traditional | TaskFlow | Improvement |
|---|---|---|---|
| Lines of Code | ~50 per task | ~15 per task | 70% reduction |
| XCom Operations | Explicit calls | Automatic | Simplified |
| Error Handling | `retries` set on each operator | `retries` set in `@task(...)` | Same mechanism, less code |
| Dynamic Mapping | `Operator.partial(...).expand(...)` | `.expand()` on the decorated function | Less verbose |
| Type Safety | Optional | Encouraged | Better |
| Testability | Moderate | High | Improved |
### XCom Performance
| Backend | Max Payload | Approx. Latency | Use Case |
|---|---|---|---|
| Database (default) | 48KB recommended | ~5ms | Small metadata |
| S3 | 5TB per object (5GB per single PUT) | ~50ms | Large datasets |
| GCS | 5TB | ~100ms | Cloud-native |
| Redis | 512MB | ~1ms | High-throughput |
| Custom | Configurable | Varies | Specialized needs |
**Key Takeaways:**
- TaskFlow API reduces boilerplate by ~70% compared to traditional operators
- Return values are automatically pushed to XCom; parameters auto-pull from upstream XCom
- Dynamic task mapping with `.expand()` creates parallel tasks at runtime
- Use custom XCom backends for payloads > 48KB
- `@task(retries=N)` adds retry logic directly in the decorator
- Dict returns with `multiple_outputs=True` store each key as a separate XCom value, accessed by key
## See Also
- XCom Communications: XCom backends, patterns, and limitations
- DAG Design Patterns: DAG composition and design patterns
- Operators and Hooks: Traditional operator-based approaches
- Branching Logic: Conditional task execution