Data Lineage Tracking: OpenLineage, Apache Atlas

Difficulty: Senior/Staff Level | Companies: LinkedIn, Airbnb, Netflix, Lyft, Stripe, Databricks

This question tests your understanding of data lineage systems, metadata management, and how to implement end-to-end visibility in complex data pipelines.

1. Data Lineage Fundamentals

What is Data Lineage?

Data lineage tracks the origin, movement, and transformation of data:

$\text{Lineage} = \{(\text{source}, \text{transform}, \text{destination})\}$

Lineage Dimensions

$\text{Full Lineage} = \text{Horizontal} \times \text{Vertical} \times \text{Temporal}$

  • Horizontal: Data flow across systems
  • Vertical: Column-level transformations within tables
  • Temporal: Version history of schema and data changes

Lineage Graph Model

$G = (V, E)$

Where:

  • $V$ = {datasets, jobs, dashboards, ML models}
  • $E$ = {data_flow, dependency, transformation}

# Lineage graph representation
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

@dataclass
class LineageEdge:
    source: str
    target: str
    transform_type: str  # 'copy', 'aggregate', 'filter', 'join'
    transform_sql: str
    column_mapping: Dict[str, str]

@dataclass
class LineageGraph:
    nodes: Dict[str, dict]
    edges: List[LineageEdge]
    
    def add_edge(self, edge: LineageEdge):
        self.edges.append(edge)
    
    def get_upstream(self, dataset: str, depth: Optional[int] = None) -> Set[str]:
        """Get all upstream dependencies, optionally limited to `depth` hops"""
        visited = set()
        queue = deque([(dataset, 0)])
        
        while queue:
            current, d = queue.popleft()
            if current in visited:
                continue
            visited.add(current)
            # Record the node, but stop expanding once the hop limit is reached
            if depth is not None and d >= depth:
                continue
            for edge in self.edges:
                if edge.target == current and edge.source not in visited:
                    queue.append((edge.source, d + 1))
        
        visited.discard(dataset)  # a dataset is not its own dependency
        return visited
    
    def get_downstream(self, dataset: str) -> Set[str]:
        """Get all downstream dependents"""
        visited = set()
        queue = deque([dataset])
        
        while queue:
            current = queue.popleft()
            if current in visited:
                continue
            visited.add(current)
            for edge in self.edges:
                if edge.source == current and edge.target not in visited:
                    queue.append(edge.target)
        
        visited.discard(dataset)  # a dataset is not its own dependent
        return visited
    
    def impact_analysis(self, dataset: str) -> Dict[str, List[str]]:
        """Analyze impact of changes to a dataset"""
        downstream = self.get_downstream(dataset)
        
        impacts = {
            'tables': [],
            'dashboards': [],
            'ml_models': [],
            'jobs': []
        }
        
        for node_id in downstream:
            node = self.nodes.get(node_id, {})
            node_type = node.get('type', '')
            if node_type == 'table':
                impacts['tables'].append(node_id)
            elif node_type == 'dashboard':
                impacts['dashboards'].append(node_id)
            elif node_type == 'ml_model':
                impacts['ml_models'].append(node_id)
            elif node_type == 'job':
                impacts['jobs'].append(node_id)
        
        return impacts
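
The traversals above scan the full edge list at every step, which costs roughly O(V·E) on a large graph. As a minimal sketch, assuming the graph fits in memory, edges can be indexed by endpoint so each walk touches an edge at most once. `IndexedLineage` and the dataset names are hypothetical and not part of any library.

```python
from collections import defaultdict, deque
from typing import Dict, Optional, Set


class IndexedLineage:
    """Lineage graph with adjacency indexes (hypothetical helper for illustration)."""

    def __init__(self) -> None:
        self.parents: Dict[str, Set[str]] = defaultdict(set)   # target -> sources
        self.children: Dict[str, Set[str]] = defaultdict(set)  # source -> targets

    def add_edge(self, source: str, target: str) -> None:
        self.parents[target].add(source)
        self.children[source].add(target)

    def _walk(self, index: Dict[str, Set[str]], start: str,
              depth: Optional[int]) -> Set[str]:
        """Breadth-first walk over one index, at most `depth` hops from `start`."""
        seen: Set[str] = set()
        queue = deque([(start, 0)])
        while queue:
            node, hops = queue.popleft()
            if depth is not None and hops >= depth:
                continue  # hop limit reached; do not expand this node
            for neighbour in index.get(node, ()):
                if neighbour not in seen and neighbour != start:
                    seen.add(neighbour)
                    queue.append((neighbour, hops + 1))
        return seen

    def upstream(self, dataset: str, depth: Optional[int] = None) -> Set[str]:
        return self._walk(self.parents, dataset, depth)

    def downstream(self, dataset: str, depth: Optional[int] = None) -> Set[str]:
        return self._walk(self.children, dataset, depth)


graph = IndexedLineage()
graph.add_edge("raw.sales", "staging.stg_sales")
graph.add_edge("staging.stg_sales", "marts.daily_sales")
graph.add_edge("marts.daily_sales", "dashboard.revenue")

print(sorted(graph.upstream("marts.daily_sales")))
print(sorted(graph.downstream("raw.sales", depth=1)))
```

The trade-off is memory: two dictionaries of sets instead of one flat list. At interview scale that is usually the right call; at platform scale the same idea maps to an indexed edge table or a graph database.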

2. OpenLineage Framework

Architecture

Architecture Diagram
OpenLineage Architecture
├── Producers (Spark, Airflow, dbt, Flink)
│   └── Emit RunEvents, JobEvents, DatasetEvents
├── API (Marquez)
│   ├── Job Registry
│   ├── Dataset Registry
│   └── Run Registry
├── Backends
│   ├── PostgreSQL
│   ├── MySQL
│   └── BigQuery
└── Consumers
    ├── Lineage UI
    ├── Impact Analysis Service
    └── Data Catalog Integration

Event Model

{
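  "eventType": "COMPLETE",
  "eventTime": "2024-01-15T10:30:00Z",
  "producer": "https://example.com/etl/daily-aggregation",
  "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
  "run": {
    "runId": "3f5e83fa-3480-44ff-99c5-ff943904e5e8",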
    "facets": {
      "spark_version": {
        "version": "3.4.1"
      },
      "job_parameters": {
        "parameters": {
          "input_date": "2024-01-15"
        }
      }
    }
  },
  "job": {
    "namespace": "production",
    "name": "etl_daily_aggregation",
    "facets": {
      "sql": {
        "query": "SELECT date, SUM(amount) FROM sales GROUP BY date"
      }
    }
  },
  "inputs": [
    {
      "namespace": "s3://data-lake",
      "name": "sales_raw",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "date"},
            {"name": "amount", "type": "decimal"}
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "s3://data-lake",
      "name": "sales_daily",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "date"},
            {"name": "daily_total", "type": "decimal"}
          ]
        },
        "columnLineage": {
          "fields": {
            "date": {
              "inputFields": [
                {
                  "namespace": "s3://data-lake",
                  "name": "sales_raw",
                  "field": "date"
                }
              ]
            },
            "daily_total": {
              "inputFields": [
                {
                  "namespace": "s3://data-lake",
                  "name": "sales_raw",
                  "field": "amount",
                  "transformations": [
                    {
                      "type": "DIRECT",
                      "subtype": "AGGREGATION",
                      "description": "SUM(amount)"
                    }
                  ]
                }
              ]
            }
          }
        }
      }
    }
  ]
}
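
To make the event model concrete, here is a minimal sketch that assembles a RunEvent-shaped dictionary with only the standard library and checks the top-level keys the OpenLineage schema marks as required. `build_run_event` and `missing_keys` are hypothetical helpers, the producer URI is a placeholder, and the spec version in `schemaURL` is an assumption that should match whatever your client targets.

```python
import json
import uuid
from datetime import datetime, timezone

# Top-level keys the OpenLineage RunEvent schema marks as required.
REQUIRED_KEYS = ("eventTime", "producer", "schemaURL", "run", "job")


def build_run_event(event_type: str, namespace: str, job_name: str,
                    inputs: list, outputs: list) -> dict:
    """Assemble a RunEvent-shaped dict; the producer URI is a placeholder."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": "https://example.com/pipelines/etl",  # hypothetical URI
        # Spec version in this URL is an assumption; use the one your client targets
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "run": {"runId": str(uuid.uuid4())},  # the spec expects a UUID here
        "job": {"namespace": namespace, "name": job_name},
        "inputs": inputs,
        "outputs": outputs,
    }


def missing_keys(event: dict) -> list:
    """Return required top-level keys that are absent from the event."""
    return [key for key in REQUIRED_KEYS if key not in event]


event = build_run_event(
    "COMPLETE", "production", "etl_daily_aggregation",
    inputs=[{"namespace": "s3://data-lake", "name": "sales_raw"}],
    outputs=[{"namespace": "s3://data-lake", "name": "sales_daily"}],
)
print(json.dumps(event, indent=2))
print("missing:", missing_keys(event))
```

In practice a START event and a COMPLETE (or FAIL) event share the same `runId`, which is how a backend stitches them into one run.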

Spark OpenLineage Integration

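from pyspark.sql import SparkSession

# Configure OpenLineage with Spark.
# The integration is a JVM listener, so there is no Python class to import; the
# openlineage-spark JAR must be on the classpath (e.g. via spark.jars.packages).
# Key names below follow the 1.x transport-style settings. Older releases used
# different keys, so check the version you deploy.
spark = SparkSession.builder \
    .appName("LineageExample") \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url", "http://marquez:8080") \
    .config("spark.openlineage.transport.auth.type", "api_key") \
    .config("spark.openlineage.transport.auth.apiKey", "your-api-key") \
    .config("spark.openlineage.namespace", "production") \
    .config("spark.openlineage.parentRunId", "parent-run-uuid") \
    .getOrCreate()

# Supported read/write actions now emit lineage events automatically
df = spark.read.parquet("s3://data-lake/raw/sales")
df_filtered = df.filter(df.amount > 100)
df_filtered.write.parquet("s3://data-lake/processed/high_value_sales")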

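# Manual lineage emission (advanced)
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Job, Run, Dataset

# Pass the base URL; the client appends the api/v1/lineage endpoint itself
client = OpenLineageClient(url="http://marquez:8080")

run = Run(runId=str(uuid4()))  # runId must be a UUID
job = Job(namespace="production", name="custom_etl_job")

dataset = Dataset(
    namespace="s3://data-lake",
    name="processed/sales"
)

event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=run,
    job=job,
    producer="https://example.com/custom_etl_job",  # placeholder URI identifying the emitter
    outputs=[dataset]
)

client.emit(event)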

3. Apache Atlas

Atlas Metadata Model

{
  "typeName": "DataSet",
  "typeCategory": "entity",
  "attributeDefinitions": [
    {
      "name": "name",
      "typeName": "string",
      "isUnique": true,
      "isIndexable": true
    },
    {
      "name": "description",
      "typeName": "string"
    },
    {
      "name": "owner",
      "typeName": "string"
    }
  ],
  "relationshipAttributeDefinitions": [
    {
      "name": "inputToProcesses",
      "typeName": "array",
      "cardinality": "LIST",
      "elementType": {
        "typeName": "Process"
      }
    },
    {
      "name": "outputFromProcesses",
      "typeName": "array",
      "cardinality": "LIST",
      "elementType": {
        "typeName": "Process"
      }
    }
  ]
}

Atlas API Integration

import requests
import json
from typing import Dict, List, Optional

class AtlasClient:
    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url.rstrip('/')
        self.auth = (username, password)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
    
    def create_entity(self, entity_type: str, attributes: dict) -> dict:
        """Create a new entity in Atlas and return the mutation response"""
        # Atlas expects the entity wrapped in an "entity" envelope
        payload = {
            'entity': {
                'typeName': entity_type,
                'attributes': attributes
            }
        }
        
        response = requests.post(
            f'{self.base_url}/api/atlas/v2/entity',
            auth=self.auth,
            headers=self.headers,
            json=payload
        )
        response.raise_for_status()
        return response.json()
    
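    def get_lineage(self, guid: str, direction: str = 'BOTH', depth: int = 3) -> dict:
        """Get lineage for an entity"""
        response = requests.get(
            f'{self.base_url}/api/atlas/v2/lineage/{guid}',
            auth=self.auth,
            headers=self.headers,
            params={
                'direction': direction,
                'depth': depth
            }
        )
        response.raise_for_status()
        return response.json()
    
    def get_entity(self, guid: str) -> dict:
        """Fetch a single entity by GUID"""
        response = requests.get(
            f'{self.base_url}/api/atlas/v2/entity/guid/{guid}',
            auth=self.auth,
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()['entity']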
    
    def add_classification(self, entity_guid: str, classification: dict) -> None:
        """Add classification to an entity"""
        response = requests.post(
            f'{self.base_url}/api/atlas/v2/entity/guid/{entity_guid}/classifications',
            auth=self.auth,
            headers=self.headers,
            json=[classification]  # the endpoint takes a list of classifications
        )
        response.raise_for_status()  # success returns no response body
    
    def search_entities(self, query: str, entity_type: Optional[str] = None) -> list:
        """Search for entities"""
        params = {'query': query}
        if entity_type:
            params['typeName'] = entity_type
        
        response = requests.get(
            f'{self.base_url}/api/atlas/v2/search/basic',
            auth=self.auth,
            headers=self.headers,
            params=params
        )
        response.raise_for_status()
        # 'entities' is omitted when nothing matches
        return response.json().get('entities', [])

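# Usage
atlas = AtlasClient('http://atlas:21000', 'admin', 'admin')

# Create table entity. Attributes are simplified for illustration: Atlas needs a
# unique qualifiedName, and a real hive_table references separate db and column entities.
result = atlas.create_entity('hive_table', {
    'name': 'sales_daily',
    'qualifiedName': 'analytics.sales_daily@primary',
    'dbName': 'analytics',
    'owner': 'data-team',
    'columns': [
        {'name': 'date', 'type': 'date'},
        {'name': 'daily_total', 'type': 'decimal'}
    ]
})
# The mutation response maps the temporary id to the assigned GUID
table_guid = next(iter(result['guidAssignments'].values()))

# Get lineage
lineage = atlas.get_lineage(table_guid, direction='INPUT', depth=5)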
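
The lineage call returns a graph, not a list, so it needs a small amount of post-processing before it is useful. The sketch below assumes the response carries `baseEntityGuid`, a `guidEntityMap` of entity headers, and a `relations` list of `fromEntityId`/`toEntityId` pairs. The sample payload is hand-written for illustration (GUIDs and names are made up) and `upstream_names` is a hypothetical helper.

```python
from typing import Dict, List

# Hand-written sample shaped like an Atlas lineage response; GUIDs and names are made up.
sample_lineage = {
    "baseEntityGuid": "guid-sales-daily",
    "guidEntityMap": {
        "guid-sales-raw": {"typeName": "hive_table", "attributes": {"name": "sales_raw"}},
        "guid-etl": {"typeName": "hive_process", "attributes": {"name": "etl_daily_aggregation"}},
        "guid-sales-daily": {"typeName": "hive_table", "attributes": {"name": "sales_daily"}},
    },
    "relations": [
        {"fromEntityId": "guid-sales-raw", "toEntityId": "guid-etl"},
        {"fromEntityId": "guid-etl", "toEntityId": "guid-sales-daily"},
    ],
}


def upstream_names(lineage: dict) -> List[str]:
    """Names of all entities that feed the base entity, nearest first."""
    # Index relations by their target so each hop is a dictionary lookup
    parents: Dict[str, List[str]] = {}
    for rel in lineage.get("relations", []):
        parents.setdefault(rel["toEntityId"], []).append(rel["fromEntityId"])

    base = lineage["baseEntityGuid"]
    ordered: List[str] = []
    seen = {base}
    frontier = [base]
    while frontier:
        next_frontier = []
        for guid in frontier:
            for parent in parents.get(guid, []):
                if parent not in seen:
                    seen.add(parent)
                    ordered.append(parent)
                    next_frontier.append(parent)
        frontier = next_frontier

    # Fall back to the GUID when an entity header has no name attribute
    entities = lineage.get("guidEntityMap", {})
    return [entities.get(g, {}).get("attributes", {}).get("name", g) for g in ordered]


print(upstream_names(sample_lineage))
```

Note that Atlas lineage alternates between dataset and process entities, so a table's nearest upstream neighbour is the process that wrote it, not another table.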

4. Column-Level Lineage

SQL Parsing for Column Lineage

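import json

import sqlparse
from sqlparse.sql import Function, Identifier, IdentifierList
from sqlparse.tokens import DML, Keyword

class ColumnLineageExtractor:
    """Naive extractor: handles only the top-level SELECT list of one statement."""
    
    def __init__(self):
        self.lineage = {}
    
    def extract(self, sql: str) -> dict:
        """Extract column-level lineage from SQL"""
        parsed = sqlparse.parse(sql)[0]
        
        # Walk only the tokens between SELECT and FROM
        select_found = False
        for token in parsed.tokens:
            if token.ttype is DML and token.value.upper() == 'SELECT':
                select_found = True
                continue
            if not select_found:
                continue
            if token.ttype is Keyword and token.value.upper() == 'FROM':
                break  # end of the SELECT list; skip GROUP BY / ORDER BY lists
            if isinstance(token, IdentifierList):
                for identifier in token.get_identifiers():
                    self._process_identifier(identifier)
            elif isinstance(token, (Identifier, Function)):
                self._process_identifier(token)  # single-column SELECT
        
        return self.lineage
    
    def _process_identifier(self, identifier):
        """Process a single item from the SELECT list"""
        # Unwrap "expr AS alias" to find a function call, if any
        expr = identifier
        if isinstance(identifier, Identifier) and isinstance(identifier.tokens[0], Function):
            expr = identifier.tokens[0]
        
        if isinstance(expr, Function):
            # Function call: record its name and the first column argument
            target = identifier.get_alias() or str(expr)
            sources = [p.get_real_name() for p in expr.get_parameters()
                       if isinstance(p, Identifier)]
            self.lineage[target] = {
                'source_table': None,
                'source_column': sources[0] if sources else None,  # None for COUNT(*)
                'transform': expr.get_real_name().upper()
            }
        elif isinstance(identifier, Identifier):
            # Simple column reference, optionally qualified as table.column
            name = identifier.get_real_name()
            self.lineage[identifier.get_alias() or name] = {
                'source_table': identifier.get_parent_name(),
                'source_column': name,
                'transform': None
            }

# Example usage
sql = """
INSERT INTO analytics.sales_daily
SELECT 
    sale_date AS date,
    SUM(amount) AS daily_total,
    COUNT(*) AS transaction_count,
    AVG(amount) AS avg_order_value
FROM raw.sales
WHERE sale_date >= '2024-01-01'
GROUP BY sale_date
"""

extractor = ColumnLineageExtractor()
lineage = extractor.extract(sql)
print(json.dumps(lineage, indent=2))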

Column Lineage Storage Model

from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime

@dataclass
class ColumnLineage:
    target_dataset: str
    target_column: str
    source_dataset: str
    source_column: str
    transformation: Optional[str]
    job_name: str
    timestamp: datetime
    
    def to_dict(self):
        return {
            'target': f"{self.target_dataset}.{self.target_column}",
            'source': f"{self.source_dataset}.{self.source_column}",
            'transform': self.transformation,
            'job': self.job_name,
            'timestamp': self.timestamp.isoformat()
        }

class ColumnLineageStore:
    def __init__(self):
        self.lineages: List[ColumnLineage] = []
    
    def add_lineage(self, lineage: ColumnLineage):
        self.lineages.append(lineage)
    
    def get_column_lineage(self, dataset: str, column: str) -> List[ColumnLineage]:
        """Get all lineage for a specific column"""
        return [
            l for l in self.lineages
            if l.target_dataset == dataset and l.target_column == column
        ]
    
    def get_impact(self, dataset: str, column: str) -> Dict[str, List[str]]:
        """Get direct (one-hop) upstream sources and downstream targets of a column"""
        upstream = set()
        downstream = set()
        
        for l in self.lineages:
            if l.source_dataset == dataset and l.source_column == column:
                downstream.add(f"{l.target_dataset}.{l.target_column}")
            elif l.target_dataset == dataset and l.target_column == column:
                upstream.add(f"{l.source_dataset}.{l.source_column}")
        
        return {
            'upstream': list(upstream),
            'downstream': list(downstream)
        }
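
A single hop is rarely enough for change management, because a renamed source column can break a report several transformations away. As a minimal sketch, the same edge data can be walked transitively. Here edges are plain `(source, target)` pairs of `(dataset, column)` tuples; `transitive_impact` and the sample edges are hypothetical.

```python
from collections import defaultdict, deque
from typing import Dict, List, Set, Tuple

Column = Tuple[str, str]  # (dataset, column)


def transitive_impact(edges: List[Tuple[Column, Column]], start: Column) -> Set[Column]:
    """All columns reachable downstream of `start` via (source, target) edges."""
    # Index edges by source so each hop is a dictionary lookup
    targets: Dict[Column, Set[Column]] = defaultdict(set)
    for source, target in edges:
        targets[source].add(target)

    impacted: Set[Column] = set()
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for nxt in targets.get(current, ()):
            if nxt not in impacted and nxt != start:
                impacted.add(nxt)
                queue.append(nxt)
    return impacted


# Hypothetical column-level edges
edges = [
    (("raw.sales", "amount"), ("staging.stg_sales", "amount")),
    (("staging.stg_sales", "amount"), ("marts.daily_sales", "daily_total")),
    (("marts.daily_sales", "daily_total"), ("reports.revenue", "total")),
    (("raw.sales", "sale_date"), ("staging.stg_sales", "sale_date")),
]

for dataset, column in sorted(transitive_impact(edges, ("raw.sales", "amount"))):
    print(f"{dataset}.{column}")
```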

5. Lineage in Modern Data Stacks

dbt Lineage

# dbt automatically captures lineage
# In your dbt project

# models/staging/stg_sales.sql
"""
SELECT
    id,
    sale_date,
    amount,
    product_id
FROM {{ source('raw', 'sales') }}
WHERE is_valid = TRUE
"""

# models/marts/daily_sales.sql
"""
SELECT
    sale_date,
    SUM(amount) AS daily_total,
    COUNT(*) AS transaction_count
FROM {{ ref('stg_sales') }}
GROUP BY sale_date
"""

# dbt provides built-in lineage
# Run: dbt docs generate && dbt docs serve
# Open the served docs site and click the lineage graph icon
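
The same lineage is available programmatically: dbt writes a `manifest.json` artifact into `target/`, and its `parent_map` section lists each node's direct parents. The sketch below walks a hand-written excerpt of that structure. The project name `shop` and the helper `ancestors` are hypothetical; with a real project you would load the file with `json.load` instead.

```python
from typing import Dict, List, Set

# Hand-written excerpt shaped like the parent_map section of target/manifest.json;
# the project name "shop" is hypothetical.
manifest = {
    "parent_map": {
        "source.shop.raw.sales": [],
        "model.shop.stg_sales": ["source.shop.raw.sales"],
        "model.shop.daily_sales": ["model.shop.stg_sales"],
    }
}


def ancestors(parent_map: Dict[str, List[str]], node_id: str) -> Set[str]:
    """Every node that `node_id` depends on, directly or transitively."""
    found: Set[str] = set()
    stack = list(parent_map.get(node_id, []))
    while stack:
        parent = stack.pop()
        if parent not in found:
            found.add(parent)
            stack.extend(parent_map.get(parent, []))
    return found


print(sorted(ancestors(manifest["parent_map"], "model.shop.daily_sales")))
```

The manifest also carries a `child_map`, so the same function answers downstream questions when given that mapping instead.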

Spark + OpenLineage Airflow DAG

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
}

# Shared Spark settings. The listener must be registered or no lineage is emitted.
# Key names follow the OpenLineage 1.x transport-style configuration; verify them
# against the integration version you deploy.
OPENLINEAGE_CONF = {
    'spark.extraListeners': 'io.openlineage.spark.agent.OpenLineageSparkListener',
    'spark.openlineage.transport.type': 'http',
    'spark.openlineage.transport.url': 'http://marquez:8080',
    'spark.openlineage.namespace': 'production',
}

with DAG(
    'etl_pipeline_with_lineage',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    tags=['lineage', 'production']
) as dag:
    
    extract = SparkSubmitOperator(
        task_id='extract_raw_data',
        application='s3://apps/extract.py',
        conf={
            **OPENLINEAGE_CONF,
            # Overrides the Spark app name that is used as the job name in events
            'spark.openlineage.appName': 'etl_pipeline.extract'
        }
    )
    
    transform = SparkSubmitOperator(
        task_id='transform_data',
        application='s3://apps/transform.py',
        conf=OPENLINEAGE_CONF
    )
    
    load = SparkSubmitOperator(
        task_id='load_to_warehouse',
        application='s3://apps/load.py',
        conf=OPENLINEAGE_CONF
    )
    
    extract >> transform >> load

6. Impact Analysis & Change Management

Schema Change Detection

class SchemaChangeDetector:
    def __init__(self, atlas_client):
        self.atlas = atlas_client
    
    def detect_changes(self, dataset_guid: str, new_schema: dict) -> list:
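        """Detect schema changes and their impact"""
        # Assumes the client exposes get_entity(guid) and that the returned
        # entity carries its columns inline as {'name': ..., 'type': ...} dicts
        current = self.atlas.get_entity(dataset_guid)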
        changes = []
        
        current_columns = {c['name']: c for c in current['attributes']['columns']}
        new_columns = {c['name']: c for c in new_schema['columns']}
        
        # Detect added columns
        added = set(new_columns.keys()) - set(current_columns.keys())
        for col in added:
            changes.append({
                'type': 'COLUMN_ADDED',
                'column': col,
                'impact': 'LOW',
                'action': 'Safe to add'
            })
        
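        # Detect removed columns
        removed = set(current_columns.keys()) - set(new_columns.keys())
        consumers = []
        if removed:
            # Fetch downstream lineage once; Atlas keys entities by GUID under 'guidEntityMap'
            downstream = self.atlas.get_lineage(dataset_guid, direction='OUTPUT')
            consumers = [
                entity for guid, entity in downstream.get('guidEntityMap', {}).items()
                if guid != dataset_guid
            ]
        for col in removed:
            # 'input_columns' is an assumed custom attribute listing the columns a consumer reads
            impacted = [
                e for e in consumers
                if col in e.get('attributes', {}).get('input_columns', [])
            ]
            
            changes.append({
                'type': 'COLUMN_REMOVED',
                'column': col,
                'impact': 'HIGH' if impacted else 'MEDIUM',
                'action': f'Impacts {len(impacted)} downstream consumers'
            })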
        
        # Detect type changes
        for col in set(current_columns.keys()) & set(new_columns.keys()):
            old_type = current_columns[col]['type']
            new_type = new_columns[col]['type']
            if old_type != new_type:
                changes.append({
                    'type': 'TYPE_CHANGED',
                    'column': col,
                    'from': old_type,
                    'to': new_type,
                    'impact': 'HIGH',
                    'action': 'May break downstream queries'
                })
        
        return changes

Compliance Lineage Queries

-- GDPR: Find all PII data lineage
WITH pii_lineage AS (
    SELECT 
        t.table_name,
        c.column_name,
        c.classification,
        l.source_table,
        l.source_column
    FROM metadata.tables t
    JOIN metadata.columns c ON t.table_id = c.table_id
    LEFT JOIN metadata.lineage l ON t.table_id = l.target_table_id
    WHERE c.classification IN ('PII', 'SENSITIVE', 'PHI')
)
SELECT 
    table_name,
    column_name,
    classification,
    STRING_AGG(DISTINCT source_table, ', ') AS upstream_sources
FROM pii_lineage
GROUP BY table_name, column_name, classification;

-- Track data retention compliance
SELECT 
    dataset_name,
    retention_policy,
    last_access_date,
    DATEDIFF('day', last_access_date, CURRENT_DATE()) AS days_since_access,
    CASE 
        -- Check the larger threshold first; otherwise the > 365 branch shadows it
        WHEN DATEDIFF('day', last_access_date, CURRENT_DATE()) > 730 THEN 'DELETE'
        WHEN DATEDIFF('day', last_access_date, CURRENT_DATE()) > 365 THEN 'ARCHIVE'
        ELSE 'KEEP'
    END AS recommended_action
FROM metadata.datasets
WHERE retention_policy IS NOT NULL;
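
The retention rule in the query reduces to two day thresholds, 365 and 730. A minimal sketch of the same rule in Python, where `recommended_action` is a hypothetical helper: the larger threshold has to be tested first, because any value above 730 is also above 365.

```python
from datetime import date
from typing import Optional


def recommended_action(last_access: date, today: Optional[date] = None) -> str:
    """Map days since last access to KEEP, ARCHIVE, or DELETE."""
    days = ((today or date.today()) - last_access).days
    if days > 730:   # test the larger threshold first so it is reachable
        return "DELETE"
    if days > 365:
        return "ARCHIVE"
    return "KEEP"


reference = date(2024, 6, 1)
for accessed in (date(2024, 5, 1), date(2023, 1, 1), date(2021, 1, 1)):
    print(accessed.isoformat(), recommended_action(accessed, today=reference))
```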

7. Best Practices

ℹ️ Incremental Lineage Collection: Don't recompute lineage for entire pipelines. Use event-driven collection (OpenLineage) to capture changes incrementally.

⚠️ Lineage Accuracy: Manual lineage annotations are error-prone. Prefer automated extraction from SQL parsers and orchestration tools.

💡 Column-Level Granularity: Table-level lineage is insufficient for impact analysis. Invest in column-level lineage for proper change management.

ℹ️ Lineage for Debugging: When a pipeline fails, use lineage to identify all affected downstream datasets and stakeholders.
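
Incremental collection can be sketched as merging each incoming run event into an existing edge set and returning only what changed. This assumes events shaped like the OpenLineage example earlier (an `eventType`, a `job`, and `inputs`/`outputs` lists) and treats the job as a node between its inputs and outputs. `apply_event` and `dataset_id` are hypothetical helpers, and joining namespace and name with a slash is a convention chosen for this sketch.

```python
from typing import Set, Tuple

Edge = Tuple[str, str]


def dataset_id(dataset: dict) -> str:
    """Join namespace and name into one node id (a convention chosen for this sketch)."""
    return f"{dataset['namespace']}/{dataset['name']}"


def apply_event(edges: Set[Edge], event: dict) -> Set[Edge]:
    """Merge one run event into the edge set and return only the new edges."""
    if event.get("eventType") != "COMPLETE":
        return set()  # only completed runs are treated as having moved data
    job = f"{event['job']['namespace']}/{event['job']['name']}"
    candidate: Set[Edge] = set()
    for ds in event.get("inputs", []):
        candidate.add((dataset_id(ds), job))
    for ds in event.get("outputs", []):
        candidate.add((job, dataset_id(ds)))
    new_edges = candidate - edges
    edges |= new_edges  # update the caller's set in place
    return new_edges


edges: Set[Edge] = set()
event = {
    "eventType": "COMPLETE",
    "job": {"namespace": "production", "name": "etl_daily_aggregation"},
    "inputs": [{"namespace": "s3://data-lake", "name": "sales_raw"}],
    "outputs": [{"namespace": "s3://data-lake", "name": "sales_daily"}],
}

print(len(apply_event(edges, event)))  # first run adds both edges
print(len(apply_event(edges, event)))  # a repeat run adds nothing
```

Because repeat runs add nothing, the daily cost of lineage collection tracks how often pipelines change shape, not how often they run.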

Follow-up Questions

  1. How would you implement real-time lineage tracking for streaming pipelines?
  2. Explain the trade-offs between pull-based (polling) and push-based (event) lineage collection.
  3. How do you handle lineage for federated queries across multiple data sources?
  4. Design a lineage-based access control system for sensitive data.
  5. How would you implement lineage for machine learning feature pipelines?
  6. Explain how lineage helps with data quality root cause analysis.
  7. How do you handle lineage when using dynamic SQL or code-generated queries?
  8. Design a lineage system that can handle petabyte-scale data platforms.
