Interview Question
ℹ️ Interview Context
Company: Google / Meta
Role: Senior Data Engineer / Software Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "How do you test Airflow DAGs? Explain different testing strategies including unit testing, integration testing, and mocking. What tools and frameworks do you use?"
Detailed Theory
Testing Fundamentals
# testing_fundamentals.py
"""
Airflow Testing Strategies:

1. Unit Testing:
   - Test individual tasks
   - Mock external dependencies
   - Fast execution
   - Isolated tests

2. Integration Testing:
   - Test task interactions
   - Test with real dependencies
   - Slower execution
   - End-to-end validation

3. DAG Testing:
   - Test DAG structure
   - Test task dependencies
   - Validate configuration
   - Check for errors

4. Performance Testing:
   - Test execution time
   - Test resource usage
   - Load testing
   - Stress testing
"""
1. Unit Testing Tasks
# unit_testing.py
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime

# Test simple task
def test_simple_task():
    """Test a simple task function"""
    from my_dag import simple_task
    result = simple_task()
    assert result['status'] == 'success'
    assert 'timestamp' in result

# Test task with dependencies
def test_task_with_dependencies():
    """Test task with mocked dependencies"""
    from my_dag import process_data
    # Mock external dependency
    with patch('my_dag.get_data') as mock_get_data:
        mock_get_data.return_value = [
            {'id': 1, 'value': 'a'},
            {'id': 2, 'value': 'b'},
        ]
        result = process_data()
        assert len(result) == 2
        mock_get_data.assert_called_once()

# Test task with XCom
def test_task_with_xcom():
    """Test task that uses XCom"""
    from my_dag import transform_data
    # Create mock XCom
    mock_ti = Mock()
    mock_ti.xcom_pull.return_value = {
        'records': [{'id': 1, 'value': 'test'}]
    }
    # Mock context
    context = {
        'ti': mock_ti,
        'ds': '2024-01-01',
        'run_id': 'test_run',
    }
    result = transform_data(**context)
    assert result['processed'] == True
    mock_ti.xcom_push.assert_called()

# Test with pytest fixtures
@pytest.fixture
def sample_data():
    """Sample data for testing"""
    return {
        'records': [
            {'id': 1, 'value': 'a'},
            {'id': 2, 'value': 'b'},
        ]
    }

def test_with_fixture(sample_data):
    """Test using fixture"""
    from my_dag import process_data
    result = process_data(sample_data)
    assert len(result) == len(sample_data['records'])
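The XCom test above imports `transform_data` from `my_dag`, which is not shown. As a minimal self-contained sketch, assuming a hypothetical `transform_data` that pulls records from an upstream task with the assumed ID `extract` and pushes a row count, the same pattern could look like this:

```python
from unittest.mock import Mock


def transform_data(**context):
    """Hypothetical task callable: pulls records via XCom and pushes a row count."""
    ti = context["ti"]
    payload = ti.xcom_pull(task_ids="extract")  # 'extract' is an assumed upstream task ID
    records = [{**r, "value": r["value"].upper()} for r in payload["records"]]
    ti.xcom_push(key="row_count", value=len(records))
    return {"processed": True, "records": records}


def test_transform_data_uses_xcom():
    # Arrange: a Mock stands in for the TaskInstance, so no scheduler or database is needed
    mock_ti = Mock()
    mock_ti.xcom_pull.return_value = {"records": [{"id": 1, "value": "test"}]}

    # Act
    result = transform_data(ti=mock_ti, ds="2024-01-01", run_id="test_run")

    # Assert: check the return value and the exact XCom calls
    assert result["processed"] is True
    assert result["records"] == [{"id": 1, "value": "TEST"}]
    mock_ti.xcom_pull.assert_called_once_with(task_ids="extract")
    mock_ti.xcom_push.assert_called_once_with(key="row_count", value=1)
```

Asserting on the exact arguments with `assert_called_once_with` catches more regressions than a bare `assert_called()`, which passes for any call.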
2. Mocking Airflow Components
# mocking_airflow.py
import pytest
from unittest.mock import Mock, patch, MagicMock
from airflow.models import DagRun, TaskInstance, Connection
from datetime import datetime

# Mock Connection
def test_with_mocked_connection():
    """Test task with mocked connection"""
    from my_dag import database_task
    with patch('airflow.hooks.base.BaseHook.get_connection') as mock_conn:
        # Mock connection
        mock_connection = Mock(spec=Connection)
        mock_connection.host = 'localhost'
        mock_connection.login = 'user'
        mock_connection.password = 'password'
        mock_conn.return_value = mock_connection
        result = database_task()
        assert result['status'] == 'success'

# Mock XCom
def test_with_mocked_xcom():
    """Test task with mocked XCom"""
    from my_dag import process_task
    with patch('airflow.models.XCom.get_one') as mock_xcom:
        mock_xcom.return_value = {'data': 'test'}
        result = process_task()
        assert result['processed'] == True

# Mock external API
def test_with_mocked_api():
    """Test task with mocked API"""
    from my_dag import api_task
    with patch('requests.get') as mock_get:
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {'result': 'success'}
        mock_get.return_value = mock_response
        result = api_task()
        assert result['status'] == 'success'
        mock_get.assert_called_once()

# Mock S3 operations
def test_with_mocked_s3():
    """Test task with mocked S3"""
    from my_dag import s3_task
    with patch('boto3.client') as mock_boto3:
        mock_s3 = Mock()
        mock_boto3.return_value = mock_s3
        result = s3_task()
        assert result['uploaded'] == True
        mock_s3.put_object.assert_called_once()
ℹ️ Pro Tip
Use pytest-mock for easier mocking. It provides a mocker fixture that simplifies mock setup and teardown.
3. DAG Testing
# dag_testing.py
import pytest
from airflow.models import DagBag
from airflow.utils.state import State
import os

# Test DAG loading
def test_dag_loading():
    """Test that DAGs load without errors"""
    dag_bag = DagBag(
        dag_folder='/opt/airflow/dags',
        include_examples=False
    )
    # Check for import errors
    assert len(dag_bag.import_errors) == 0, \
        f"DAG import errors: {dag_bag.import_errors}"
    # Check that DAGs were loaded
    assert len(dag_bag.dags) > 0, "No DAGs loaded"

# Test specific DAG
def test_specific_dag():
    """Test a specific DAG"""
    dag_bag = DagBag(
        dag_folder='/opt/airflow/dags',
        include_examples=False
    )
    dag_id = 'my_dag'
    assert dag_id in dag_bag.dags, f"DAG {dag_id} not found"
    dag = dag_bag.dags[dag_id]
    # Test DAG properties
    assert dag.schedule_interval == '@daily'
    assert dag.default_args['retries'] == 3
    # Test task count
    assert len(dag.tasks) > 0

# Test task dependencies
def test_task_dependencies():
    """Test task dependencies"""
    dag_bag = DagBag(
        dag_folder='/opt/airflow/dags',
        include_examples=False
    )
    dag = dag_bag.dags['my_dag']
    # Get task dependencies
    task_dict = {task.task_id: task for task in dag.tasks}
    # Test specific dependency
    task_a = task_dict['task_a']
    task_b = task_dict['task_b']
    assert task_b in task_a.downstream_list
    assert task_a in task_b.upstream_list
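
# Test DAG structure
def test_dag_structure():
    """Test DAG structure"""
    # DAG objects have no has_cycle attribute; in Airflow 2.x, check_cycle
    # raises AirflowDagCycleException when the DAG contains a cycle
    from airflow.utils.dag_cycle_tester import check_cycle
    dag_bag = DagBag(
        dag_folder='/opt/airflow/dags',
        include_examples=False
    )
    dag = dag_bag.dags['my_dag']
    # Test no cycles (the test fails if check_cycle raises)
    check_cycle(dag)
    # Test all tasks have an ID and an operator type
    for task in dag.tasks:
        assert task.task_id is not None
        assert task.task_type is not None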

# Test DAG default args
def test_dag_default_args():
    """Test DAG default arguments"""
    dag_bag = DagBag(
        dag_folder='/opt/airflow/dags',
        include_examples=False
    )
    dag = dag_bag.dags['my_dag']
    # Test required default args
    assert 'owner' in dag.default_args
    assert 'retries' in dag.default_args
    assert 'retry_delay' in dag.default_args
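To make the "no cycles" check concrete without an Airflow installation, here is a sketch built on the standard library's `graphlib` (Python 3.9+). The task names and the `find_cycle` helper are hypothetical and only illustrate what such a check verifies; this is not how Airflow itself implements it.

```python
from graphlib import CycleError, TopologicalSorter


def find_cycle(dependencies):
    """Return the nodes of a cycle if one exists, else None.

    `dependencies` maps each task ID to the set of task IDs it depends on
    (its upstream tasks).
    """
    try:
        # prepare() raises CycleError if the graph cannot be topologically ordered
        TopologicalSorter(dependencies).prepare()
    except CycleError as exc:
        return exc.args[1]  # the second argument holds the list of nodes in the cycle
    return None


# extract -> transform -> load: a valid pipeline
valid = {"transform": {"extract"}, "load": {"transform"}}
# extract depends on load, which closes the loop
broken = {"extract": {"load"}, "transform": {"extract"}, "load": {"transform"}}

assert find_cycle(valid) is None
assert find_cycle(broken) is not None
```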
4. Integration Testing
# integration_testing.py
import pytest
from airflow.models import DagRun, TaskInstance
from airflow.utils.state import State
from airflow.utils.db import create_session
from datetime import datetime
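
# Integration test with database
@pytest.mark.integration
def test_database_integration():
    """Test database integration"""
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.types import DagRunType
    from my_dag import database_task
    # TaskInstance is constructed from an operator (plus run_id in Airflow 2.2+),
    # not from task_id/dag_id keyword arguments
    dag = DAG(dag_id='test_dag', start_date=datetime(2024, 1, 1))
    task = PythonOperator(
        task_id='database_task',
        python_callable=database_task,
        dag=dag,
    )
    with create_session() as session:
        # Create test DAG run (field names as in Airflow 2.x)
        dag_run = DagRun(
            dag_id='test_dag',
            run_id='test_run',
            run_type=DagRunType.MANUAL,
            execution_date=datetime(2024, 1, 1),
            state=State.RUNNING,
        )
        session.add(dag_run)
        # Create task instance
        ti = TaskInstance(task=task, run_id='test_run', state=State.RUNNING)
        session.add(ti)
        session.commit()
    # Execute task
    result = database_task()
    # Verify result
    assert result['status'] == 'success'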

# Integration test with external service
@pytest.mark.integration
def test_api_integration():
    """Test API integration"""
    import os
    from my_dag import api_task
    # Skip if no API access (pytest has no skipUnless; call pytest.skip conditionally)
    if not os.environ.get('TEST_API_ACCESS'):
        pytest.skip('API access not configured')
    result = api_task()
    assert result['status'] == 'success'

# Integration test with file system
@pytest.mark.integration
def test_file_system_integration():
    """Test file system integration"""
    from my_dag import file_task
    import tempfile
    import os
    # Create temp directory
    with tempfile.TemporaryDirectory() as tmpdir:
        # Set environment variable
        os.environ['DATA_DIR'] = tmpdir
        result = file_task()
        assert result['status'] == 'success'
        assert os.path.exists(os.path.join(tmpdir, 'output.csv'))
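The file system test above assigns to `os.environ` directly, so `DATA_DIR` stays set for every test that runs afterwards in the same process. One way to keep the change scoped is `unittest.mock.patch.dict`, which restores the environment when the block exits. The sketch below assumes a hypothetical `file_task` that writes `output.csv` into the directory named by `DATA_DIR`:

```python
import os
import tempfile
from unittest.mock import patch


def file_task():
    """Hypothetical task: writes output.csv into the directory named by DATA_DIR."""
    path = os.path.join(os.environ["DATA_DIR"], "output.csv")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("id,value\n1,a\n")
    return {"status": "success", "path": path}


def test_file_task_scoped_env():
    before = os.environ.get("DATA_DIR")
    with tempfile.TemporaryDirectory() as tmpdir:
        # patch.dict restores os.environ on exit, even if the test fails
        with patch.dict(os.environ, {"DATA_DIR": tmpdir}):
            result = file_task()
            assert result["status"] == "success"
            assert os.path.exists(os.path.join(tmpdir, "output.csv"))
    # The environment is back to its previous state
    assert os.environ.get("DATA_DIR") == before
```

Under pytest, the built-in `monkeypatch` and `tmp_path` fixtures give the same guarantees with less setup.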
5. Testing Frameworks
# testing_frameworks.py
"""
Testing Frameworks for Airflow:

1. pytest:
   - Most popular Python testing framework
   - Rich plugin ecosystem
   - Fixtures and parametrize

2. unittest:
   - Built-in Python testing
   - Test classes and methods
   - Mocking support (unittest.mock)

3. Airflow built-in helpers:
   - dag.test() runs a DAG in a single process (Airflow 2.5+)
   - `airflow dags test` and `airflow tasks test` CLI commands
   - DagBag for import and structure checks

4. Custom Frameworks:
   - Company-specific test utilities
   - Shared test fixtures
   - Test helpers
"""

# pytest configuration
# The outer string uses ''' so the docstrings inside it do not end it early
PYTEST_CONFIG = '''
# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
markers = [
    "integration: Integration tests",
    "unit: Unit tests",
    "slow: Slow tests",
]

# conftest.py
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    """Shared DAG bag for all tests"""
    return DagBag(
        dag_folder='/opt/airflow/dags',
        include_examples=False
    )

@pytest.fixture
def sample_data():
    """Sample data for tests"""
    return {
        'records': [
            {'id': 1, 'value': 'test'},
        ]
    }
'''

# Custom test utilities
class AirflowTestMixin:
    """Mixin for Airflow tests"""

    def assert_dag_loaded(self, dag_bag, dag_id):
        """Assert DAG is loaded"""
        assert dag_id in dag_bag.dags, f"DAG {dag_id} not found"

    def assert_task_dependencies(self, dag, task_a_id, task_b_id):
        """Assert task dependencies"""
        task_dict = {task.task_id: task for task in dag.tasks}
        assert task_b_id in [
            t.task_id for t in task_dict[task_a_id].downstream_list
        ]

    def assert_dag_properties(self, dag, **kwargs):
        """Assert DAG properties"""
        for key, value in kwargs.items():
            assert getattr(dag, key) == value
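The framework list above mentions `unittest`, but only pytest is demonstrated. As a minimal sketch of the same mocking pattern written as a `unittest.TestCase`, assume a hypothetical `api_task` that receives its HTTP client as an argument, so no network library is needed:

```python
import unittest
from unittest.mock import Mock


def api_task(client):
    """Hypothetical task: fetches a payload through an injected client."""
    response = client.get("/status")
    if response.status_code != 200:
        raise RuntimeError(f"unexpected status {response.status_code}")
    return {"status": "success", "payload": response.json()}


class TestApiTask(unittest.TestCase):
    def setUp(self):
        # A fresh mock per test keeps the tests independent of each other
        self.client = Mock()
        self.client.get.return_value.status_code = 200
        self.client.get.return_value.json.return_value = {"result": "ok"}

    def test_success(self):
        result = api_task(self.client)
        self.assertEqual(result, {"status": "success", "payload": {"result": "ok"}})
        self.client.get.assert_called_once_with("/status")

    def test_error_status_raises(self):
        self.client.get.return_value.status_code = 500
        with self.assertRaises(RuntimeError):
            api_task(self.client)


def run_suite():
    """Run the test case programmatically and report whether it passed."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestApiTask)
    return unittest.TextTestRunner(verbosity=0).run(suite).wasSuccessful()
```

pytest collects `unittest.TestCase` classes as well, so both styles can live in the same test suite.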
⚠️ Important
Always mock external dependencies in unit tests. This ensures tests are isolated and don't depend on external services.
Real-World Scenarios
Scenario 1: Google's Testing Pipeline
# google_testing_pipeline.py
"""
Google-style testing pipeline:
- Automated testing in CI/CD
- Multiple test types
- Test reporting
"""
import pytest
from airflow.models import DagBag
from datetime import datetime

class TestGooglePipeline:
    """Test suite for Google pipeline"""

    def test_dag_loading(self, dag_bag):
        """Test DAG loads correctly"""
        assert 'google_pipeline' in dag_bag.dags
        dag = dag_bag.dags['google_pipeline']
        assert len(dag.tasks) > 0

    def test_task_count(self, dag_bag):
        """Test expected task count"""
        dag = dag_bag.dags['google_pipeline']
        assert len(dag.tasks) == 5

    def test_schedule_interval(self, dag_bag):
        """Test schedule interval"""
        dag = dag_bag.dags['google_pipeline']
        assert dag.schedule_interval == '@daily'

    def test_default_args(self, dag_bag):
        """Test default arguments"""
        dag = dag_bag.dags['google_pipeline']
        assert dag.default_args['retries'] == 3
        assert dag.default_args['retry_delay'].total_seconds() == 300

    @pytest.mark.integration
    def test_extract_task(self):
        """Test extract task"""
        from google_pipeline import extract
        result = extract()
        assert result['status'] == 'extracted'

    @pytest.mark.integration
    def test_transform_task(self):
        """Test transform task"""
        from google_pipeline import transform
        data = {'records': [{'id': 1, 'value': 'test'}]}
        result = transform(data)
        assert result['status'] == 'transformed'

    @pytest.mark.slow
    def test_full_pipeline(self):
        """Test full pipeline end-to-end"""
        from google_pipeline import full_pipeline
        result = full_pipeline()
        assert result['status'] == 'complete'
Scenario 2: Meta's Testing Framework
# meta_testing_framework.py
"""
Meta-style testing framework:
- Custom test utilities
- Shared fixtures
- Test reporting
"""
import pytest
from unittest.mock import Mock, patch
from airflow.models import DagBag

# Shared fixtures
@pytest.fixture(scope="session")
def test_config():
    """Test configuration"""
    return {
        'database': {
            'host': 'localhost',
            'port': 5432,
            'database': 'test_db',
        },
        'api': {
            'base_url': 'http://localhost:8080',
            'timeout': 30,
        }
    }

@pytest.fixture
def mock_database(test_config):
    """Mock database connection"""
    with patch('psycopg2.connect') as mock_connect:
        mock_conn = Mock()
        mock_cursor = Mock()
        mock_cursor.fetchall.return_value = [
            (1, 'test'),
        ]
        mock_conn.cursor.return_value = mock_cursor
        mock_connect.return_value = mock_conn
        yield mock_conn

# Custom test class
class MetaTestBase:
    """Base class for Meta tests"""

    def setup_method(self):
        """Setup for each test method"""
        self.dag_bag = DagBag(
            dag_folder='/opt/airflow/dags',
            include_examples=False
        )

    def teardown_method(self):
        """Teardown for each test method"""
        pass

    def get_dag(self, dag_id):
        """Get DAG by ID"""
        assert dag_id in self.dag_bag.dags
        return self.dag_bag.dags[dag_id]

# Test class
class TestMetaPipeline(MetaTestBase):
    """Test Meta pipeline"""

    def test_dag_exists(self):
        """Test DAG exists"""
        dag = self.get_dag('meta_pipeline')
        assert dag is not None

    def test_task_dependencies(self):
        """Test task dependencies"""
        dag = self.get_dag('meta_pipeline')
        task_dict = {task.task_id: task for task in dag.tasks}
        # Test specific dependency
        assert 'transform' in [
            t.task_id for t in task_dict['extract'].downstream_list
        ]
Edge Cases
⚠️ Common Pitfalls
- Test Isolation: Ensure tests don't depend on each other.
- Mock Configuration: Mock all external dependencies properly.
- Test Data: Use realistic but safe test data.
- Test Environment: Use a test environment that is separate from production.
# edge_cases.py
import pytest
from unittest.mock import Mock, patch

# Isolation issue
class TestIsolation:
    """Test isolation"""

    def setup_method(self):
        """Reset state before each test"""
        # Clear any shared state
        pass

    def test_one(self):
        """First test"""
        pass

    def test_two(self):
        """Second test (should not depend on test_one)"""
        pass

# Mock configuration issue
def test_with_proper_mocking():
    """Test with proper mocking"""
    # Import the task under test (it was undefined in the original snippet)
    from my_dag import my_task
    with patch('my_dag.external_service') as mock_service:
        # Configure mock
        mock_service.return_value = Mock()
        mock_service.return_value.get_data.return_value = {
            'data': 'test'
        }
        # Test
        result = my_task()
        # Verify
        assert result['status'] == 'success'
        mock_service.return_value.get_data.assert_called_once()

# Test data issue
@pytest.fixture
def safe_test_data():
    """Safe test data"""
    return {
        'email': 'test@example.com',  # Not a real email address
        'name': 'Test User',
        'id': 'test-123',
    }
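The mock-configuration pitfall often shows up as a plain `Mock` that accepts any call, so a test keeps passing after the real interface changes. A sketch using `create_autospec`, with a hypothetical `ExternalService` class and a hypothetical `my_task` function standing in for real code:

```python
from unittest.mock import Mock, create_autospec


class ExternalService:
    """Hypothetical client for an external system."""

    def get_data(self, table, limit=100):
        raise NotImplementedError("real network call, never run in unit tests")


def my_task(service):
    """Hypothetical task that reads rows through the service."""
    rows = service.get_data("events", limit=10)
    return {"status": "success", "count": len(rows)}


def test_my_task_with_autospec():
    # instance=True makes the mock behave like an ExternalService instance
    service = create_autospec(ExternalService, instance=True)
    service.get_data.return_value = [{"id": 1}]

    assert my_task(service) == {"status": "success", "count": 1}
    service.get_data.assert_called_once_with("events", limit=10)


def test_autospec_rejects_wrong_signature():
    service = create_autospec(ExternalService, instance=True)
    try:
        service.get_data()  # missing the required 'table' argument
    except TypeError:
        pass
    else:
        raise AssertionError("autospec should reject a call that omits 'table'")
    # A plain Mock accepts the same call silently
    Mock().get_data()
```

The same protection is available in `patch` through its `autospec=True` argument.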
Best Practices
# best_practices.py
"""
Testing Best Practices:

1. Test Structure:
   - Separate unit and integration tests
   - Use meaningful test names
   - Follow AAA pattern (Arrange, Act, Assert)

2. Mocking:
   - Mock all external dependencies
   - Use appropriate mock levels
   - Verify mock calls

3. Test Data:
   - Use realistic but safe data
   - Create test fixtures
   - Clean up after tests

4. CI/CD Integration:
   - Run tests automatically
   - Generate test reports
   - Fail builds on test failures

5. Coverage:
   - Aim for high test coverage
   - Test edge cases
   - Test error scenarios
"""
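As a sketch of the AAA pattern and of testing an error scenario, assume a hypothetical `parse_record` helper that validates one input row:

```python
def parse_record(raw):
    """Hypothetical helper: validate one row and normalize its fields."""
    if "id" not in raw:
        raise ValueError("record is missing 'id'")
    return {"id": int(raw["id"]), "value": str(raw.get("value", "")).strip()}


def test_parse_record_normalizes_fields():
    # Arrange
    raw = {"id": "7", "value": "  a "}
    # Act
    record = parse_record(raw)
    # Assert
    assert record == {"id": 7, "value": "a"}


def test_parse_record_rejects_missing_id():
    # Arrange
    raw = {"value": "a"}
    # Act
    try:
        parse_record(raw)
    except ValueError as exc:
        error = exc
    else:
        error = None
    # Assert
    assert error is not None and "id" in str(error)
```

Under pytest, `pytest.raises(ValueError)` expresses the second test more compactly.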
ℹ️ Google Interview Tip
In a Google interview, stress comprehensive testing: unit tests for speed, integration tests for reliability, and proper mocking for isolation. Also explain how the tests fit into a CI/CD pipeline.
Summary
Testing is critical for reliable Airflow pipelines. Key takeaways:
- Unit tests for isolated, fast testing
- Integration tests for end-to-end validation
- DAG testing for structure and configuration
- Mocking for external dependencies
- CI/CD integration for automation
For Google and Meta interviews, focus on:
- Test strategy design
- Mocking techniques
- Test automation
- Coverage metrics
- Test maintenance
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.