Interview Question
Interview Context
Company: Amazon / Apple
Role: Senior Data Engineer / Software Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "Explain XCom in Airflow. What are its limitations? When would you use XCom vs alternative patterns for passing data between tasks? How do you handle large payloads?"
Detailed Theory
XCom Fundamentals
# xcom_fundamentals.py
"""
XCom (Cross-Communication) in Airflow:

Purpose:
- Pass small amounts of data between tasks
- Store task metadata
- Enable task communication

Limitations:
- Keep values small: about 48KB is the commonly cited guideline, while the
  hard ceiling depends on the metadata database column type
- Stored in the metadata DB
- Not suitable for large data
- JSON-serializable values only by default (unless pickling is enabled)

Storage:
- Database table: xcom
- Key-value pairs per task instance
- Accessible via task_id and key
"""
1. Basic XCom Operations
# xcom_basics.py
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from datetime import datetime
from typing import Dict, Any, List

@dag(
    dag_id='xcom_basics',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def xcom_basics():
    @task
    def push_data() -> Dict[str, Any]:
        """Push data to XCom"""
        data = {
            'key1': 'value1',
            'key2': [1, 2, 3],
            'key3': {'nested': 'value'}
        }
        # Method 2: Explicit push under a custom key (must happen before return)
        ti = get_current_context()['ti']
        ti.xcom_push(key='explicit_key', value=data)
        # Method 1: Return value (automatic push under 'return_value')
        return data

    @task
    def pull_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Pull data from XCom (automatic via the function argument)"""
        print(f"Received: {data}")
        return {'processed': True}

    @task
    def explicit_pull() -> Dict[str, Any]:
        """Explicit pull using context"""
        # get_current_context() returns the running task's context dict
        context = get_current_context()
        ti = context['ti']
        # Pull the return value of a specific task
        data = ti.xcom_pull(
            task_ids='push_data',
            key='return_value'
        )
        # Pull a specific key (returns None if nothing was pushed under it)
        specific_data = ti.xcom_pull(
            task_ids='push_data',
            key='explicit_key'
        )
        return {'data': data, 'specific': specific_data}

    @task
    def pull_multiple() -> List[Dict]:
        """Pull from multiple tasks"""
        context = get_current_context()
        ti = context['ti']
        # Passing a list of task IDs returns a list of values
        results = ti.xcom_pull(
            task_ids=['push_data', 'pull_data'],
            key='return_value'
        )
        return results

    # Dependencies
    pushed = push_data()
    pulled = pull_data(pushed)
    explicit = explicit_pull()
    multiple = pull_multiple()
    pushed >> pulled >> explicit >> multiple

xcom_basics()
XCom Key
The default key is 'return_value'. When you return a value from a task function, Airflow stores it automatically under this key.
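To make the key behaviour concrete, here is a toy in-memory model of push and pull. It is only an illustration: ToyXComStore is a hypothetical class, not part of Airflow, and it ignores dag_id and run_id, which the real xcom table also uses to scope rows.

```python
from typing import Any, Dict, Optional, Tuple

# Local constant mirroring the default key described above.
XCOM_RETURN_KEY = "return_value"


class ToyXComStore:
    """In-memory stand-in for the xcom table; illustration only."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = XCOM_RETURN_KEY) -> None:
        # One value per (task_id, key); a later push overwrites the earlier one.
        self._rows[(task_id, key)] = value

    def pull(self, task_id: str, key: str = XCOM_RETURN_KEY) -> Optional[Any]:
        # Missing entries resolve to None rather than raising.
        return self._rows.get((task_id, key))


store = ToyXComStore()
store.push("push_data", {"key1": "value1"})             # like returning a value
store.push("push_data", [1, 2, 3], key="explicit_key")  # like an explicit push

print(store.pull("push_data"))
print(store.pull("push_data", key="explicit_key"))
print(store.pull("push_data", key="never_pushed"))
```

The last call shows why pulling a key that no task pushed tends to surface as a silent None downstream rather than as an error.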
2. XCom Serialization
# xcom_serialization.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Any
import json

# Custom XCom serialization
class CustomXComSerializer:
    """Custom serializer for XCom values"""

    @staticmethod
    def serialize(value: Any) -> str:
        """Serialize value for XCom storage"""
        # Always JSON-encode so that deserialize() round-trips every type,
        # including plain strings that happen to look like JSON
        return json.dumps(value)

    @staticmethod
    def deserialize(value: str) -> Any:
        """Deserialize XCom value"""
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            # Fall back to the raw string for values not written by serialize()
            return value

# Configure the XCom backend in airflow.cfg
XCOM_CONFIG = """
[core]
# Default backend (database)
xcom_backend = airflow.models.xcom.BaseXCom
# Or use a custom backend
# xcom_backend = my_module.CustomXComBackend
# Note: airflow.cfg has no option that caps XCom size. Treat ~48KB as a
# guideline; the metadata DB column type sets the hard ceiling.
"""

@dag(
    dag_id='xcom_serialization',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def serialization_example():
    @task
    def serialize_data() -> list:
        """Serialize complex data"""
        import pandas as pd
        # Create DataFrame
        df = pd.DataFrame({
            'col1': [1, 2, 3],
            'col2': ['a', 'b', 'c']
        })
        # Convert to a list of row dicts for XCom
        return df.to_dict(orient='records')

    @task
    def deserialize_data(data: list) -> None:
        """Deserialize data"""
        import pandas as pd
        # Convert back to DataFrame
        df = pd.DataFrame(data)
        print(df)

    serialized = serialize_data()
    deserialized = deserialize_data(serialized)

serialization_example()
3. Custom XCom Backend
# custom_xcom_backend.py
from airflow.models.xcom import BaseXCom
from typing import Any
import json

class S3XComBackend(BaseXCom):
    """
    Custom XCom backend using S3 for storage.
    Benefits:
    - Payload size is no longer bound by the metadata DB column
    - Keeps large payloads out of the metadata DB
    - Distributed storage
    """
    BUCKET = 'xcom-bucket'
    PREFIX = f's3://{BUCKET}/'

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key=None,
        task_id=None,
        dag_id=None,
        run_id=None,
        map_index=None,
    ):
        """Store the value in S3 and keep only a reference in the database"""
        import boto3
        # Keyword arguments follow the Airflow 2.3+ signature;
        # older 2.x releases pass only the value
        object_key = f'xcom/{dag_id}/{run_id}/{task_id}/{key}.json'
        # Upload to S3
        s3 = boto3.client('s3')
        s3.put_object(
            Bucket=S3XComBackend.BUCKET,
            Key=object_key,
            Body=json.dumps(value)
        )
        # Store reference in database
        return BaseXCom.serialize_value(S3XComBackend.PREFIX + object_key)

    @staticmethod
    def deserialize_value(result) -> Any:
        """Resolve the database reference and load the value from S3"""
        import boto3
        # Get reference from database
        reference = BaseXCom.deserialize_value(result)
        if not isinstance(reference, str) or not reference.startswith(S3XComBackend.PREFIX):
            # Not one of our references; return the stored value as-is
            return reference
        # Download from S3
        s3 = boto3.client('s3')
        response = s3.get_object(
            Bucket=S3XComBackend.BUCKET,
            Key=reference[len(S3XComBackend.PREFIX):]
        )
        # Deserialize
        return json.loads(response['Body'].read())

# Configure custom backend
CUSTOM_BACKEND_CONFIG = """
[core]
xcom_backend = my_module.S3XComBackend
"""
Important
Custom XCom backends subclass BaseXCom and typically override serialize_value and deserialize_value rather than set and get. serialize_value stores the payload and returns what goes into the database (here, an S3 reference); deserialize_value resolves that reference back into the value. Consider error handling and cleanup of the stored objects.
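The reference pattern behind such a backend can be tried without Airflow or AWS. The sketch below is a simplified stand-in, not Airflow's API: FAKE_BUCKET (a dict) replaces S3, and THRESHOLD_BYTES, store_value, and load_value are hypothetical names chosen for illustration. Small values stay inline; larger ones are offloaded and replaced by a reference string.

```python
import json
from typing import Any, Dict

# Hypothetical stand-ins: a dict instead of S3, and an arbitrary size cutoff.
FAKE_BUCKET: Dict[str, str] = {}
THRESHOLD_BYTES = 1024
REF_PREFIX = "s3://xcom-bucket/"


def store_value(object_key: str, value: Any) -> str:
    """Return what would be written to the metadata DB: the JSON or a reference."""
    payload = json.dumps(value)
    if len(payload.encode("utf-8")) <= THRESHOLD_BYTES:
        return payload                      # small enough to keep inline
    FAKE_BUCKET[object_key] = payload       # real code would upload to S3 here
    return json.dumps(REF_PREFIX + object_key)


def load_value(stored: str) -> Any:
    """Resolve a stored row back into the original value."""
    decoded = json.loads(stored)
    if isinstance(decoded, str) and decoded.startswith(REF_PREFIX):
        return json.loads(FAKE_BUCKET[decoded[len(REF_PREFIX):]])
    return decoded


small_row = store_value("xcom/demo/run1/extract/return_value.json", {"rows": 3})
big_row = store_value("xcom/demo/run1/load/return_value.json", list(range(5000)))
print(small_row)
print(big_row)
print(len(load_value(big_row)))
```

One caveat of this design: a small string value that itself begins with the reference prefix would be mistaken for a reference, so a production backend would want a less ambiguous marker.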
4. Alternative Patterns for Large Data
# alternative_patterns.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any

@dag(
    dag_id='alternative_patterns',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def alternative_patterns():
    @task
    def write_to_s3() -> str:
        """Write large data to S3, return path"""
        import boto3
        import json
        large_data = {'data': list(range(1000000))}
        s3 = boto3.client('s3')
        # Note: datetime.now() makes the key depend on wall-clock time
        key = f'data/{datetime.now().strftime("%Y%m%d")}/output.json'
        s3.put_object(
            Bucket='my-bucket',
            Key=key,
            Body=json.dumps(large_data)
        )
        # Return path, not data
        return f's3://my-bucket/{key}'

    @task
    def read_from_s3(s3_path: str) -> Dict[str, Any]:
        """Read data from S3 path, return only a small summary"""
        import boto3
        import json
        s3 = boto3.client('s3')
        bucket, key = s3_path.replace('s3://', '').split('/', 1)
        response = s3.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read())
        # Process the data here; returning it whole would push it back into XCom
        return {'record_count': len(data['data'])}

    @task
    def write_to_database() -> int:
        """Write large data to database, return count"""
        import psycopg2
        from psycopg2.extras import execute_values
        # Placeholder DSN; in practice read credentials from an Airflow Connection
        conn = psycopg2.connect('postgresql://user:pass@host/db')
        try:
            cursor = conn.cursor()
            # Insert large dataset in batches instead of one giant statement
            data = [(i, f'value_{i}') for i in range(1000000)]
            execute_values(
                cursor,
                "INSERT INTO large_table (id, value) VALUES %s",
                data
            )
            conn.commit()
        finally:
            conn.close()
        return len(data)

    @task
    def read_from_database(count: int) -> int:
        """Read data from database, return only the row count"""
        import psycopg2
        conn = psycopg2.connect('postgresql://user:pass@host/db')
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM large_table LIMIT %s", (count,))
            rows = cursor.fetchall()
        finally:
            conn.close()
        # Process rows here; returning them would push the full result set into XCom
        return len(rows)

    # Use Variables for configuration
    @task
    def get_config() -> dict:
        """Get configuration from Variables"""
        from airflow.models import Variable
        config = Variable.get('pipeline_config', deserialize_json=True)
        return config

    # File-based communication (only works when tasks share a filesystem)
    @task
    def write_to_file() -> str:
        """Write to temporary file, return path"""
        import tempfile
        import json
        large_data = {'data': list(range(1000000))}
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix='.json',
            delete=False
        ) as f:
            json.dump(large_data, f)
            return f.name

    s3_path = write_to_s3()
    s3_data = read_from_s3(s3_path)
    db_count = write_to_database()
    db_data = read_from_database(db_count)

alternative_patterns()
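When only a path travels through XCom, two small helpers tend to be useful: one that builds the object key from the run's logical date rather than the wall clock, so a retry overwrites the same object, and one that splits the URI back into bucket and key. This is a sketch under assumptions: build_s3_uri and split_s3_uri are hypothetical helper names, and the date string is assumed to come from Airflow's ds_nodash template value (YYYYMMDD).

```python
from typing import Tuple
from urllib.parse import urlparse


def build_s3_uri(bucket: str, prefix: str, ds_nodash: str, filename: str) -> str:
    """Build a deterministic URI from the run's logical date (YYYYMMDD)."""
    return f"s3://{bucket}/{prefix}/{ds_nodash}/{filename}"


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split an s3:// URI into (bucket, key), rejecting anything else."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


uri = build_s3_uri("my-bucket", "data", "20240101", "output.json")
print(uri)
print(split_s3_uri(uri))
```

Validating the scheme up front gives a clear error when an upstream task returns a local path or an empty string, instead of a confusing failure inside the S3 client.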
5. XCom in TaskFlow API
# xcom_taskflow.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any, List

@dag(
    dag_id='xcom_taskflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def xcom_taskflow():
    @task
    def extract() -> Dict[str, Any]:
        """Extract data - the return value is automatically pushed to XCom"""
        return {
            'records': [
                {'id': 1, 'value': 'a'},
                {'id': 2, 'value': 'b'},
            ]
        }

    @task
    def transform(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform - receives from XCom automatically"""
        transformed = [
            {'id': r['id'], 'value': r['value'].upper()}
            for r in data['records']
        ]
        return {'records': transformed}

    @task
    def load(data: Dict[str, Any]) -> bool:
        """Load - receives from XCom automatically"""
        # Load to target
        return True

    @task
    def notify(success: bool) -> None:
        """Notify - receives from XCom automatically"""
        if success:
            print("Pipeline completed successfully")

    # Automatic XCom passing
    extracted = extract()
    transformed = transform(extracted)
    loaded = load(transformed)
    notify(loaded)

xcom_taskflow()
Pro Tip
The TaskFlow API with the @task decorator handles XCom passing automatically. It is the recommended approach for most use cases because it is cleaner and less error-prone.
Real-World Scenarios
Scenario 1: Amazon's Data Pipeline
# amazon_data_pipeline.py
"""
Amazon-style data pipeline:
- Extract from multiple sources
- Pass metadata via XCom
- Handle large payloads with S3
"""
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any

@dag(
    dag_id='amazon_data_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['amazon', 'production'],
)
def amazon_pipeline():
    @task
    def extract_metadata() -> Dict[str, Any]:
        """Extract metadata (small payload)"""
        return {
            'source_count': 5,
            'extraction_time': datetime.now().isoformat(),
            'schema_version': '1.0'
        }

    @task
    def extract_data() -> str:
        """Extract large data to S3, return path"""
        import boto3
        import json
        # Large extraction: records are dicts so transform() can read id and value
        data = {
            'records': [
                {'id': i, 'value': f'value_{i}'} for i in range(1000000)
            ]
        }
        s3 = boto3.client('s3')
        key = f'raw/{datetime.now().strftime("%Y%m%d")}/data.json'
        s3.put_object(
            Bucket='data-lake',
            Key=key,
            Body=json.dumps(data)
        )
        return f's3://data-lake/{key}'

    @task
    def transform(
        metadata: Dict[str, Any],
        data_path: str
    ) -> str:
        """Transform data"""
        import boto3
        import json
        print(f"Transforming with schema version {metadata['schema_version']}")
        # Read from S3
        s3 = boto3.client('s3')
        bucket, key = data_path.replace('s3://', '').split('/', 1)
        response = s3.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read())
        # Transform
        transformed = [
            {'id': r['id'], 'value': r['value'].upper()}
            for r in data['records']
        ]
        # Write back to S3
        output_key = f'transformed/{datetime.now().strftime("%Y%m%d")}/data.json'
        s3.put_object(
            Bucket='data-lake',
            Key=output_key,
            Body=json.dumps(transformed)
        )
        return f's3://data-lake/{output_key}'

    @task
    def load(transformed_path: str) -> Dict[str, Any]:
        """Load to data warehouse"""
        # Load logic (placeholder); report a small summary, not the data
        return {'rows_loaded': 1000000}

    # Pipeline
    metadata = extract_metadata()
    data_path = extract_data()
    transformed_path = transform(metadata, data_path)
    load_result = load(transformed_path)

amazon_pipeline()
Edge Cases
Common Pitfalls
- Size Limit: Keep XCom values small. About 48KB is the commonly cited guideline, and the hard ceiling depends on the metadata database. Use alternative patterns for larger data.
- Serialization: By default, only JSON-serializable objects can be stored. Convert complex objects first.
- Task Retries: Airflow clears a task's XComs before each new attempt, so do not rely on values pushed by an earlier failed try.
- Cross-DAG: XCom is not designed for cross-DAG communication. Use ExternalTaskSensor or files.
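The "convert complex objects first" advice can be sketched as a small recursive helper. This assumes plain JSON serialization; to_json_safe is a hypothetical name, and it only covers a few common types (datetime, date, Decimal, tuples), so treat it as a starting point rather than a complete converter.

```python
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Any


def to_json_safe(value: Any) -> Any:
    """Recursively convert common non-JSON types into JSON-friendly ones."""
    if isinstance(value, dict):
        return {str(k): to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    if isinstance(value, (datetime, date)):
        return value.isoformat()       # ISO 8601 string
    if isinstance(value, Decimal):
        return str(value)              # keep precision; the consumer parses it
    return value


record = {
    "loaded_at": datetime(2024, 1, 1, 12, 0),
    "amount": Decimal("9.99"),
    "ids": (1, 2),
}
safe = to_json_safe(record)
print(json.dumps(safe))
```

Running json.dumps on the converted value inside the task, before returning it, moves serialization failures into the task's own logs where they are easier to diagnose.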
# edge_cases.py
from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id='xcom_edge_cases', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def edge_cases():
    # Size limit issue
    @task
    def large_payload():
        """BAD: Large payload will fail"""
        # return {'data': list(range(1000000))}  # Too large!
        # GOOD: Return path instead
        return {'path': 's3://bucket/data.json'}

    # Serialization issue
    @task
    def serialization_issue():
        """BAD: Non-serializable object"""
        import pandas as pd
        # return pd.DataFrame(...)  # Not serializable!
        # GOOD: Convert to dict
        df = pd.DataFrame({'col': [1, 2, 3]})
        return df.to_dict(orient='records')

    large_payload() >> serialization_issue()

edge_cases()
Best Practices
# best_practices.py
"""
XCom Best Practices:

1. Size Management:
   - Keep payloads small (< 48KB)
   - Use S3 for large data
   - Return paths, not data

2. Serialization:
   - Use JSON-serializable objects
   - Convert complex objects first
   - Test serialization

3. Key Management:
   - Use descriptive keys
   - Avoid key collisions
   - Document key usage

4. Alternative Patterns:
   - S3 for large data
   - Database for structured data
   - Files for temporary data

5. Monitoring:
   - Track XCom size
   - Monitor storage usage
   - Clean up old XCom entries
"""
Amazon Interview Tip
For pipelines at Amazon's scale, the expected pattern is XCom for metadata and small payloads, and S3 for large data. When discussing XCom, emphasize size management and alternative patterns, and mention cleaning up old XCom entries to prevent metadata database bloat.
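Size management can be made concrete with a check that runs before a task returns its value. A minimal sketch, assuming JSON serialization and treating 48KB as a self-imposed budget: XCOM_BUDGET_BYTES, xcom_payload_size, and fits_in_xcom are hypothetical names, not Airflow settings or APIs.

```python
import json
from typing import Any

# Hypothetical budget for illustration; not an Airflow configuration option.
XCOM_BUDGET_BYTES = 48 * 1024


def xcom_payload_size(value: Any) -> int:
    """Size in bytes of the value once JSON-encoded as UTF-8."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value: Any, budget: int = XCOM_BUDGET_BYTES) -> bool:
    """True when the JSON-encoded value stays within the chosen budget."""
    return xcom_payload_size(value) <= budget


small = {"rows": 120, "path": "s3://my-bucket/data/output.json"}
large = {"data": list(range(100_000))}
print(xcom_payload_size(small), fits_in_xcom(small))
print(xcom_payload_size(large), fits_in_xcom(large))
```

A task could call fits_in_xcom on its result and fall back to writing the data to S3 and returning the path when the check fails.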
Summary
XCom is essential for cross-task communication in Airflow. Key takeaways:
- XCom is for small payloads (< 48KB)
- TaskFlow API provides automatic XCom passing
- Alternative patterns (S3, DB) for large data
- Custom backends for advanced use cases
- Size management is critical
For Amazon and Apple interviews, focus on:
- Size limits and management
- Alternative patterns for large data
- Custom XCom backends
- Serialization best practices
- Performance optimization
This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.