
Testing DAGs and Operators in Apache Airflow

🟒 Free Lesson


Testing DAGs and Operators

Testing pyramid (figure): unit tests (80%) cover task logic, XCom, and branching; integration tests (15%) cover hooks, connections, and external systems; E2E tests (5%) cover full pipeline execution, the scheduler, and the executor.

Formal Definitions

Definition: DAG Validation

DAG validation is the process of verifying that a DAG file can be imported and parsed without errors, and that the resulting DAG object meets structural requirements (acyclicity, valid dependencies, correct task types). Formally, if $V_{\text{import}}$, $V_{\text{structure}}$, and $V_{\text{semantic}}$ are the sets of DAGs that pass each kind of check, the valid DAGs are those that pass all three:

$$V_{\text{dag}} = V_{\text{import}} \cap V_{\text{structure}} \cap V_{\text{semantic}}$$
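Read as sets, the formula says a DAG counts as valid only if it lands in all three pass sets. The toy sketch below assumes each check has already produced the set of DAG ids it accepts; the DAG ids and the reasons they fail are made up for illustration.

```python
def valid_dags(passed_import, passed_structure, passed_semantic):
    """Return the DAG ids that pass every check: V_import & V_structure & V_semantic."""
    return set(passed_import) & set(passed_structure) & set(passed_semantic)


# Hypothetical results of three independent checks.
passed_import = {"etl_daily", "reporting", "cleanup"}
passed_structure = {"etl_daily", "reporting"}   # "cleanup" has a cycle
passed_semantic = {"etl_daily", "cleanup"}      # "reporting" lacks a start_date

print(sorted(valid_dags(passed_import, passed_structure, passed_semantic)))  # ['etl_daily']
```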

Definition: Unit Test

A unit test verifies a single component (function, operator, hook) in isolation, with its dependencies mocked. The test boundary $B_{\text{unit}}$ isolates the component from external systems (databases, APIs, networks).
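A minimal sketch of that boundary, assuming the task logic receives its hook as an argument. `count_active_users` is a hypothetical task function, and `get_records` only mirrors the name of Airflow's `DbApiHook.get_records`; nothing here imports Airflow, so the `MagicMock` is the entire outside world.

```python
from unittest.mock import MagicMock


def count_active_users(hook):
    """Hypothetical task logic; `hook` is anything with a get_records(sql) method."""
    rows = hook.get_records("SELECT id FROM users WHERE active")
    return len(rows)


def test_count_active_users_is_isolated():
    # The MagicMock stands in for a database hook, so no database is touched.
    fake_hook = MagicMock()
    fake_hook.get_records.return_value = [(1,), (2,), (3,)]

    assert count_active_users(fake_hook) == 3
    fake_hook.get_records.assert_called_once_with("SELECT id FROM users WHERE active")
```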

Definition: Integration Test

An integration test verifies interactions between multiple components, often including real external systems. Integration tests validate the set of interacting component pairs

$$I = \{(c_1, c_2) : c_1, c_2 \in C,\ (c_1, c_2) \in E_{\text{interaction}}\}$$

where $C$ is the set of components and $E_{\text{interaction}}$ is the set of interaction edges.
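For a self-contained illustration, an in-memory SQLite database can play the external system. In the sketch below (all component names are hypothetical) the components are $C = \{\text{load}, \text{db}, \text{extract}\}$ and the interactions under test are (load, db) and (db, extract); a real suite would target the production database engine instead.

```python
import sqlite3


def load_rows(conn, rows):
    """Hypothetical load component: write (id, name) rows into the users table."""
    conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)", rows)
    conn.commit()
    return len(rows)


def extract_names(conn):
    """Hypothetical extract component: read names back in id order."""
    return [name for (name,) in conn.execute("SELECT name FROM users ORDER BY id")]


def test_load_then_extract_roundtrip():
    # A real (in-memory) database sits between the two components, so the
    # test exercises both the (load, db) and (db, extract) interactions.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    try:
        assert load_rows(conn, [(2, "bob"), (1, "alice")]) == 2
        assert extract_names(conn) == ["alice", "bob"]
    finally:
        conn.close()
```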

Detailed Explanation

DAG Validation Tests

Validate DAG structure and imports before deployment.


Key Validations:

| Test | Description |
|---|---|
| Import Success | All DAGs load without import errors |
| DAG Count | Verify expected number of DAGs |
| Required Attributes | Check dag_id, default_args, start_date |
| Acyclicity | No circular dependencies |
| Operator Types | Use only supported operators |

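```python
import networkx as nx
import pytest
from airflow.models import DagBag


class TestDAGValidation:
    @pytest.fixture(autouse=True)
    def setup_dagbag(self):
        self.dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

    def test_dags_import_successfully(self):
        assert len(self.dagbag.import_errors) == 0, \
            f"Import errors: {self.dagbag.import_errors}"

    def test_dag_count(self):
        # An empty bag usually means dag_folder points at the wrong path,
        # which would make every other test here pass vacuously.
        assert len(self.dagbag.dags) > 0, "No DAGs were loaded"

    def test_dag_has_no_circular_dependencies(self):
        for dag_id, dag in self.dagbag.dags.items():
            G = nx.DiGraph()
            for task in dag.tasks:
                G.add_node(task.task_id)
                for upstream in task.upstream_list:
                    G.add_edge(upstream.task_id, task.task_id)
            assert nx.is_directed_acyclic_graph(G), f"DAG {dag_id} has a cycle"
```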
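If you would rather not add `networkx` to the test environment, the same acyclicity check fits in a few lines of standard-library Python using Kahn's topological-sort algorithm. The input format (a dict of task id to upstream task ids) is an assumption of this sketch. Note also that in Airflow 2.x, `DagBag` already runs a cycle check when it loads each DAG and records failures in `import_errors`, so an explicit test like this is a second line of defense.

```python
from collections import deque


def is_acyclic(upstream):
    """Kahn's algorithm. `upstream` maps task_id -> iterable of upstream task_ids."""
    tasks = set(upstream)
    for parents in upstream.values():
        tasks.update(parents)

    downstream = {t: [] for t in tasks}
    in_degree = {t: 0 for t in tasks}
    for task, parents in upstream.items():
        for parent in parents:
            downstream[parent].append(task)
            in_degree[task] += 1

    # Repeatedly remove tasks with no remaining upstream edges.
    ready = deque(t for t in tasks if in_degree[t] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for child in downstream[node]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    # Tasks on a cycle never reach in-degree 0, so they are never visited.
    return visited == len(tasks)
```

To feed it from a parsed DAG, build the mapping with `{t.task_id: [u.task_id for u in t.upstream_list] for t in dag.tasks}`.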

Unit Testing Tasks

Test task functions in isolation with mocked dependencies.


Testing Strategy:

| Test Type | Description | Mock |
|---|---|---|
| Success Path | Normal operation | None |
| Empty Input | Edge case handling | Data source |
| Connection Error | External system failure | Hook/Connection |
| Invalid Data | Data validation | Data source |

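```python
from unittest.mock import MagicMock, patch

import pytest
from dags.my_dag import extract_data


class TestExtractTask:
    def test_extract_data_success(self):
        # The task receives the Airflow context as **kwargs, so use a real dict
        mock_context = {'ti': MagicMock(), 'ds': '2024-01-01'}
        result = extract_data(**mock_context)
        assert result is not None

    def test_extract_data_connection_error(self):
        mock_context = {'ti': MagicMock(), 'ds': '2024-01-01'}
        with patch('dags.my_dag.get_source_data', side_effect=ConnectionError):
            with pytest.raises(ConnectionError):
                extract_data(**mock_context)
```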
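The four rows of the strategy table map onto four small tests. This sketch swaps patching for dependency injection: the hypothetical `extract_scores` receives its data source as an argument, so each case just passes a different fake. `expect_raises` is a small helper standing in for `pytest.raises` so the example runs with the standard library alone.

```python
from unittest.mock import MagicMock


def extract_scores(fetch_rows):
    """Hypothetical extract step; `fetch_rows` is the injected data source."""
    rows = fetch_rows()
    for row in rows:
        if not isinstance(row.get("score"), (int, float)):
            raise ValueError(f"invalid score in row {row!r}")
    return rows


def expect_raises(exc_type, func, *args):
    """Minimal stand-in for pytest.raises so the sketch needs only the stdlib."""
    try:
        func(*args)
    except exc_type:
        return
    raise AssertionError(f"{exc_type.__name__} was not raised")


def test_success_path():
    # No mock needed: a plain function returns valid rows
    assert extract_scores(lambda: [{"score": 90}]) == [{"score": 90}]


def test_empty_input():
    source = MagicMock(return_value=[])
    assert extract_scores(source) == []
    source.assert_called_once_with()


def test_connection_error():
    source = MagicMock(side_effect=ConnectionError("source unreachable"))
    expect_raises(ConnectionError, extract_scores, source)


def test_invalid_data():
    source = MagicMock(return_value=[{"score": "n/a"}])
    expect_raises(ValueError, extract_scores, source)
```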

Mocking External Services

Isolate tests from external dependencies.


Common Mocks:

| Service | Mock Target |
|---|---|
| Database | PostgresHook, SQLAlchemy |
| S3/GCS | S3Hook, GCSHook |
| APIs | HttpHook, requests |
| Airflow metadata | Variable, Connection |

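```python
from unittest.mock import MagicMock, patch


@patch('dags.my_dag.S3Hook')
def test_s3_upload(mock_s3_hook):
    from dags.my_dag import upload_to_s3

    mock_hook = MagicMock()
    mock_s3_hook.return_value = mock_hook
    # S3Hook's upload method is load_file (upload_file belongs to boto3's client)
    mock_hook.load_file.return_value = True

    mock_context = {'ti': MagicMock()}
    result = upload_to_s3(**mock_context)
    assert result is True
    mock_hook.load_file.assert_called_once()
```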
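The mock target matters because `unittest.mock.patch` replaces a name in one namespace only. A DAG file that does `from airflow.providers.amazon.aws.hooks.s3 import S3Hook` holds its own reference to `S3Hook`, so the patch has to target `dags.my_dag.S3Hook` rather than the provider module. The standard-library sketch below reproduces the effect with a throwaway module; `fake_dag` is hypothetical and is built with `exec` only to keep the demo in one file.

```python
import sys
import types
from unittest.mock import patch

# Hypothetical stand-in for a DAG module. The `from ... import` binds `exists`
# into the module's own namespace, as a DAG file's hook import would.
fake_dag = types.ModuleType("fake_dag")
exec(
    "from os.path import exists\n"
    "def file_ready(path):\n"
    "    return exists(path)\n",
    fake_dag.__dict__,
)
sys.modules["fake_dag"] = fake_dag

MISSING = "/definitely/not/a/real/path"

# Patching the defining module leaves fake_dag's own reference untouched.
with patch("os.path.exists", return_value=True):
    patched_at_source = fake_dag.file_ready(MISSING)

# Patching the name where it is looked up takes effect.
with patch("fake_dag.exists", return_value=True):
    patched_where_used = fake_dag.file_ready(MISSING)

print(patched_at_source, patched_where_used)  # False True
```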

```python
from unittest.mock import MagicMock, patch

import pytest


def test_transform_data():
    """Test data transformation logic."""
    from dags.my_dag import transform_data

    input_data = [
        {"name": "alice", "score": 85},
        {"name": "bob", "score": 92},
    ]

    result = transform_data(input_data)

    assert len(result) == 2
    assert result[0]["name"] == "ALICE"  # Uppercase
    assert result[0]["grade"] == "B"  # Score-based grade


def test_transform_empty_data():
    """Test transformation with empty input."""
    from dags.my_dag import transform_data

    result = transform_data([])
    assert result == []


def test_transform_invalid_data():
    """Test that transformation rejects invalid data with ValueError."""
    from dags.my_dag import transform_data

    input_data = [{"name": None, "score": "invalid"}]

    with pytest.raises(ValueError):
        transform_data(input_data)


class TestLoadTask:
    """Unit tests for load task function."""

    @patch('dags.my_dag.get_db_connection')
    def test_load_success(self, mock_db):
        """Test successful data loading."""
        from dags.my_dag import load_data

        mock_conn = MagicMock()
        mock_db.return_value = mock_conn

        data = [{"id": 1, "name": "test"}]
        result = load_data(data)

        assert result['loaded'] == 1
        mock_conn.execute.assert_called_once()

    @patch('dags.my_dag.get_db_connection')
    def test_load_partial_failure(self, mock_db):
        """Test loading with partial failures."""
        from dags.my_dag import load_data

        mock_conn = MagicMock()
        mock_conn.execute.side_effect = [
            None,                        # First insert succeeds
            Exception("Duplicate key"),  # Second fails
            None,                        # Third succeeds
        ]
        mock_db.return_value = mock_conn

        data = [{"id": 1}, {"id": 2}, {"id": 3}]
        result = load_data(data)

        assert result['loaded'] == 2
        assert result['failed'] == 1
```


### Mocking External Systems

```python
# tests/test_with_mocks.py
from unittest.mock import MagicMock, patch

class TestWithMockedExternalSystems:
    """Test tasks with mocked external dependencies."""
    
    @patch('dags.my_dag.S3Hook')  # patch where my_dag looks the name up, not the provider module
    def test_s3_upload(self, MockS3Hook):
        """Test S3 upload with mocked hook."""
        from dags.my_dag import upload_to_s3
        
        mock_hook = MagicMock()
        MockS3Hook.return_value = mock_hook
        mock_hook.load_file.return_value = True
        
        result = upload_to_s3(
            local_path='/tmp/file.csv',
            bucket='my-bucket',
            key='data/file.csv',
        )
        
        assert result is True
        mock_hook.load_file.assert_called_once_with(
            filename='/tmp/file.csv',
            key='data/file.csv',
            bucket_name='my-bucket',
            replace=True,
        )
    
    @patch('dags.my_dag.PostgresHook')  # same rule: patch the name inside dags.my_dag
    def test_database_operations(self, MockPostgresHook):
        """Test database operations with mocked hook."""
        from dags.my_dag import process_database
        
        mock_hook = MagicMock()
        MockPostgresHook.return_value = mock_hook
        mock_hook.get_pandas_df.return_value = MagicMock()
        
        result = process_database(query="SELECT * FROM users")
        
        assert result is not None
        mock_hook.get_pandas_df.assert_called_once()
    
    @patch('requests.get')
    def test_api_call(self, mock_get):
        """Test API call with mocked requests."""
        from dags.my_dag import fetch_from_api
        
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"data": [1, 2, 3]}
        mock_get.return_value = mock_response
        
        result = fetch_from_api(url="http://api.example.com/data")
        
        assert result == {"data": [1, 2, 3]}
        mock_get.assert_called_once()
```

Test Coverage Metric

$$C = \frac{|T_{\text{covered}}|}{|T_{\text{total}}|} \times 100\%$$

Here,

  • $C$ = code coverage percentage
  • $T_{\text{covered}}$ = set of tested code paths (so $|T_{\text{covered}}|$ is their count)
  • $T_{\text{total}}$ = set of all code paths
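A worked example of the formula, treating $T_{\text{covered}}$ and $T_{\text{total}}$ as sets of code-path ids (the ids below are hypothetical). In practice, tools such as coverage.py, which `pytest-cov` wraps, report line and branch coverage, which approximates the path count used here.

```python
def coverage_percent(covered, total):
    """C = |T_covered| / |T_total| * 100, with T_* given as sets of code-path ids."""
    if not total:
        raise ValueError("total must be non-empty")
    if not covered <= total:
        raise ValueError("covered paths must be a subset of total paths")
    # Multiply first so integer arithmetic keeps the result exact where possible.
    return 100 * len(covered) / len(total)


# Hypothetical branch ids for a transform function with five paths.
total = {"empty", "valid", "bad_name", "bad_score", "grade_f"}
covered = {"empty", "valid", "bad_name", "bad_score"}
print(coverage_percent(covered, total))  # 80.0
```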

Test Reliability Index

$$R = 1 - \frac{N_{\text{flaky}}}{N_{\text{total}}}$$

Here,

  • $R$ = reliability index (0 to 1)
  • $N_{\text{flaky}}$ = number of flaky tests
  • $N_{\text{total}}$ = total number of tests
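One way to compute $R$, assuming a test counts as flaky when repeated runs against the same commit produce both passes and failures. A consistently failing test is broken rather than flaky, so it does not lower $R$. The test names and outcomes are made up.

```python
def reliability_index(runs):
    """R = 1 - N_flaky / N_total.

    `runs` maps test name -> list of outcomes (True = pass) from repeated runs
    of the same commit. A test counts as flaky if it both passed and failed.
    """
    if not runs:
        raise ValueError("no tests recorded")
    n_flaky = sum(1 for outcomes in runs.values() if len(set(outcomes)) > 1)
    return 1 - n_flaky / len(runs)


runs = {
    "test_extract": [True, True, True],
    "test_transform": [True, False, True],  # flaky
    "test_load": [False, False, False],     # consistently failing, not flaky
    "test_dag_import": [True, True, True],
}
print(reliability_index(runs))  # 0.75
```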

Use DagBag to parse DAGs in tests without starting any Airflow services. This keeps structural tests fast: they need no scheduler, and typically no metadata database as long as the DAG files do not read Variables or Connections at import time.

Mock external systems (databases, APIs, cloud services) in unit tests. Use integration tests with real services for critical paths. Aim for 80% unit test coverage.

Key Concepts Table

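| Test Type | Scope | Speed | Dependencies | Share / Coverage |
|---|---|---|---|---|
| Unit Test | Single function | Fast (~ms) | Mocked | ~80% of tests |
| Integration Test | Multiple components | Medium (~s) | Real services | ~15% of tests |
| E2E Test | Full pipeline | Slow (~min) | Full stack | ~5% of tests |
| DAG Validation | Structure | Fast (~ms) | None | All DAGs |
| Performance Test | Execution time | Slow (~min) | Load-test environment | Critical paths |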

Code Examples

Comprehensive Test Suite

# tests/test_comprehensive.py
from datetime import timedelta
from unittest.mock import MagicMock, patch

import pytest
from airflow.models import DagBag

class TestDAGComprehensive:
    """Comprehensive DAG testing suite."""
    
    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
    
    def test_all_dags_have_tags(self, dagbag):
        """All DAGs should have tags for organization."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tags) > 0, f"DAG {dag_id} has no tags"
    
    def test_all_dags_have_description(self, dagbag):
        """All DAGs should have descriptions."""
        for dag_id, dag in dagbag.dags.items():
            assert dag.description is not None, f"DAG {dag_id} has no description"
    
    def test_dag_default_args(self, dagbag):
        """Verify default_args configuration."""
        for dag_id, dag in dagbag.dags.items():
            if dag.default_args:
                # Check retries are configured
                assert 'retries' in dag.default_args, \
                    f"DAG {dag_id} has no retries configured"
                
                # Check retry delay
                assert 'retry_delay' in dag.default_args, \
                    f"DAG {dag_id} has no retry_delay configured"
    
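    def test_dag_schedule_not_too_frequent(self, dagbag):
        """Flag DAGs scheduled more often than every 5 minutes (threshold is a team choice)."""
        for dag_id, dag in dagbag.dags.items():
            schedule = dag.schedule_interval
            if isinstance(schedule, timedelta):
                assert schedule >= timedelta(minutes=5), \
                    f"DAG {dag_id} runs every {schedule}"
            elif isinstance(schedule, str) and not schedule.startswith('@'):
                # A bare '*' in the cron minute field means "every minute"
                assert schedule.split()[0] != '*', \
                    f"DAG {dag_id} runs every minute ({schedule})"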

class TestTaskDependencies:
    """Test task dependency configurations."""
    
    @pytest.fixture
    def dagbag(self):
        return DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
    
    def test_no_isolated_tasks(self, dagbag):
        """All tasks should have upstream or downstream dependencies."""
        from airflow.operators.empty import EmptyOperator
        from airflow.operators.python import BranchPythonOperator

        for dag_id, dag in dagbag.dags.items():
            if len(dag.tasks) < 2:
                continue  # a single-task DAG has nothing to connect to
            for task in dag.tasks:
                # EmptyOperator and BranchPythonOperator tasks may stand alone
                if not isinstance(task, (EmptyOperator, BranchPythonOperator)):
                    assert task.upstream_list or task.downstream_list, \
                        f"Task {task.task_id} in DAG {dag_id} is isolated"
    
    def test_task_count_reasonable(self, dagbag):
        """DAGs should not have excessive task counts."""
        for dag_id, dag in dagbag.dags.items():
            assert len(dag.tasks) <= 100, \
                f"DAG {dag_id} has {len(dag.tasks)} tasks (max 100)"

class TestTaskLogic:
    """Test task business logic with mocks."""
    
    def test_etl_extract(self):
        """Test ETL extract function."""
        from dags.etl_dag import extract
        
        # Task callables receive the context as keyword arguments, so use a
        # real dict: `**MagicMock()` unpacks to nothing.
        mock_context = {'ti': MagicMock(), 'params': {'source': 'test'}}
        
        with patch('dags.etl_dag.get_source_connection') as mock_conn:
            mock_conn.return_value.query.return_value = [
                {'id': 1, 'value': 'test'}
            ]
            
            result = extract(**mock_context)
            
            assert result['record_count'] == 1
            assert result['status'] == 'success'
    
    def test_etl_transform(self):
        """Test ETL transform function."""
        from dags.etl_dag import transform
        
        input_data = [
            {'id': 1, 'name': 'test', 'value': 100},
            {'id': 2, 'name': 'test2', 'value': 200},
        ]
        
        result = transform(input_data)
        
        assert len(result) == 2
        assert all('transformed' in r for r in result)
    
    def test_etl_load(self):
        """Test ETL load function."""
        from dags.etl_dag import load
        
        data = [{'id': 1, 'transformed': True}]
        
        with patch('dags.etl_dag.get_db_connection') as mock_db:
            mock_db.return_value.execute.return_value = 1
            
            result = load(data)
            
            assert result['loaded'] == 1

class TestPerformance:
    """Test DAG execution performance."""
    
    def test_dag_parsing_time(self):
        """DAGs should parse quickly."""
        import time

        start = time.perf_counter()
        DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
        parse_time = time.perf_counter() - start

        assert parse_time < 30, f"DAG parsing took {parse_time:.2f}s (max 30s)"

    def test_task_execution_timeouts(self):
        """Configured execution_timeout values should not exceed one hour."""
        dagbag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)

        for dag_id, dag in dagbag.dags.items():
            for task in dag.tasks:
                # execution_timeout is None when no limit is set
                if task.execution_timeout:
                    assert task.execution_timeout.total_seconds() <= 3600, \
                        f"Task {task.task_id} in DAG {dag_id} has a timeout over 1h"

CI/CD Test Configuration

# .github/workflows/test.yml
name: Airflow Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install flake8 black isort mypy
      - name: Lint with flake8
        run: flake8 dags/ tests/
      - name: Check formatting with black
        run: black --check dags/ tests/
      - name: Sort imports
        run: isort --check-only dags/ tests/

  test:
    runs-on: ubuntu-latest
    needs: lint
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        ports:
          - 5432:5432
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
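      - name: Install Airflow
        run: |
          pip install "apache-airflow==2.8.0" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
          pip install pytest pytest-cov networkx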
      - name: Run DAG validation
        run: pytest tests/test_dag_validation.py -v
      - name: Run unit tests
        run: pytest tests/test_tasks.py -v --cov=dags --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  integration:
    runs-on: ubuntu-latest
    needs: test
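    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install test dependencies
        # Add Airflow and providers here if tests/integration imports them
        run: pip install pytest
      - name: Start test services
        run: docker compose -f docker-compose-test.yml up -d
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Stop test services
        if: always()
        run: docker compose -f docker-compose-test.yml down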

Performance Metrics

Test Suite Performance

| Metric | Target | Warning | Critical |
|---|---|---|---|
| Unit Test Duration | < 5 min | 5-10 min | > 10 min |
| Integration Test Duration | < 15 min | 15-30 min | > 30 min |
| Total CI Duration | < 30 min | 30-60 min | > 60 min |
| Test Coverage | > 80% | 70-80% | < 70% |
| Flaky Test Rate | < 1% | 1-5% | > 5% |

Test Distribution

| Test Category | Count | Duration | Priority |
|---|---|---|---|
| DAG Validation | 10-20 | 10 s | P0 |
| Unit Tests | 50-100 | 2-5 min | P0 |
| Integration Tests | 10-20 | 10-15 min | P1 |
| E2E Tests | 5-10 | 20-30 min | P2 |

Key Takeaways:

  • Use DagBag for fast DAG validation without running Airflow services
  • Mock external systems (databases, APIs) in unit tests
  • Keep the test mix close to the pyramid: roughly 80% unit, 15% integration, and 5% E2E tests
  • Validate DAG structure: acyclicity, required attributes, valid operators
  • Test task business logic independently of Airflow execution
  • Use CI/CD pipelines for automated testing on every commit
