
Data Governance & Catalog: Managing Data at Scale


Data Governance: Trust, discoverability, and compliance at scale

Data governance is the collection of processes, policies, and standards that ensure data is managed as a strategic asset.

Why Data Governance Matters


Problems Without Governance:

  • Duplicated datasets with conflicting definitions
  • Unknown data lineage making compliance impossible
  • Stale data eroding trust
  • Security breaches from ungoverned access

Key Insight: Data governance ensures data is trustworthy, discoverable, secure, and compliant.


Architecture Overview

[Figure: Data Governance Framework. Data Catalog (metadata store, search & discovery, lineage tracking, quality monitoring, data profiling, glossary terms); Governance (RBAC / ABAC policies, data classification, retention policies, access reviews, quality rules, audit logging); Policy Engine (naming conventions, schema standards, SLA requirements, GDPR / CCPA rules, retention enforcement, classification labels).]

The Data Governance architecture has three interconnected pillars:

Data Catalog: Central repository for the metadata store, search & discovery, lineage tracking, and quality monitoring.

Governance Components: Manage metadata (technical, business, operational), access control (RBAC, ABAC, masking), quality management (rules, alerts, scoring), and lineage tracking (column, table, and pipeline level).

Policy Engine: Enforces naming conventions, schema standards, SLA requirements, compliance (GDPR, CCPA), retention policies, and classification rules.
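
As a rough illustration of what a policy engine check might look like, the sketch below validates a table's name, classification label, and ownership. The `check_table` function, the `PolicyViolation` type, and the `stg_`/`fact_`/`dim_` prefix convention are assumptions made for this example, not part of any specific tool; the four classification labels are the ones used in this lesson's metadata model.

```python
import re
from dataclasses import dataclass
from typing import List

# Hypothetical naming rule: snake_case with a layer prefix (an assumption for this sketch)
NAME_PATTERN = re.compile(r"^(stg|fact|dim)_[a-z][a-z0-9_]*$")
ALLOWED_CLASSIFICATIONS = {"public", "internal", "confidential", "restricted"}


@dataclass
class PolicyViolation:
    table_name: str
    rule: str
    message: str


def check_table(table_name: str, classification: str, owner_team: str) -> List[PolicyViolation]:
    """Return every policy violation found for a single table."""
    violations = []
    if not NAME_PATTERN.match(table_name):
        violations.append(PolicyViolation(
            table_name, "naming_convention",
            "name must be snake_case with a stg_/fact_/dim_ prefix"))
    if classification not in ALLOWED_CLASSIFICATIONS:
        violations.append(PolicyViolation(
            table_name, "classification",
            f"unknown classification label: {classification!r}"))
    if not owner_team:
        violations.append(PolicyViolation(
            table_name, "ownership", "every table needs an owning team"))
    return violations


# A compliant table produces no violations; a non-compliant one lists each failed rule.
print(check_table("fact_orders", "internal", "data-platform"))
for violation in check_table("FactOrders", "secret", ""):
    print(f"{violation.table_name}: [{violation.rule}] {violation.message}")
```

Returning a list of violations instead of raising on the first failure lets a pipeline report all problems at once and track a violation count over time.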


Metadata Management

Metadata is data about data. It includes technical metadata (schema, types, size), operational metadata (freshness, quality, lineage), and business metadata (definitions, owners, glossary terms).

| Metadata Type | Content | Consumer | Update Frequency |
|---|---|---|---|
| Technical | Schema, types, partitions | Engineers | On schema change |
| Operational | Freshness, quality, lineage | Engineers | Real-time |
| Business | Definitions, owners, glossary | Analysts | Weekly |
| Social | Ratings, reviews, usage | Everyone | Continuous |
| Administrative | Access logs, cost, retention | Governance | Daily |

# Metadata Collection Service
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
import hashlib
import json

@dataclass
class TableMetadata:
    """Comprehensive metadata for a data asset."""
    table_id: str
    database: str
    schema_name: str
    table_name: str
    description: str = ""
    owner_team: str = ""
    owner_email: str = ""
    domain: str = ""
    classification: str = "internal"  # public, internal, confidential, restricted
    tags: List[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    row_count: int = 0
    size_bytes: int = 0
    last_updated: Optional[datetime] = None
    freshness_sla_hours: int = 24
    quality_score: float = 0.0
    lineage_upstream: List[str] = field(default_factory=list)
    lineage_downstream: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict:
        return {
            "table_id": self.table_id,
            "fqn": f"{self.database}.{self.schema_name}.{self.table_name}",
            "description": self.description,
            "owner": {"team": self.owner_team, "email": self.owner_email},
            "domain": self.domain,
            "classification": self.classification,
            "tags": self.tags,
            "statistics": {
                "row_count": self.row_count,
                "size_bytes": self.size_bytes,
                "last_updated": self.last_updated.isoformat() if self.last_updated else None
            },
            "quality": {"score": self.quality_score},
            "lineage": {
                "upstream": self.lineage_upstream,
                "downstream": self.lineage_downstream
            }
        }

    def compute_hash(self) -> str:
        """Compute hash for change detection."""
        content = json.dumps(self.to_dict(), sort_keys=True, default=str)
        return hashlib.sha256(content.encode()).hexdigest()

class MetadataCatalog:
    """Central metadata catalog for all data assets."""

    def __init__(self):
        self.assets: Dict[str, TableMetadata] = {}
        self.lineage_edges: List[Dict] = []

    def register_asset(self, metadata: TableMetadata):
        """Register a data asset in the catalog."""
        self.assets[metadata.table_id] = metadata
        self._update_lineage(metadata)

    def search(self, query: str, domain: Optional[str] = None) -> List[TableMetadata]:
        """Search assets by description, name (substring) or tag (exact match)."""
        needle = query.lower()
        results = []
        for asset in self.assets.values():
            # Skip assets outside the requested domain, if one was given
            if domain is not None and asset.domain != domain:
                continue
            if needle in asset.description.lower() or \
               needle in asset.table_name.lower() or \
               needle in [t.lower() for t in asset.tags]:
                results.append(asset)
        return results

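    def get_lineage(self, table_id: str, direction: str = "both") -> Dict:
        """Get lineage; direction is "upstream", "downstream", or "both"."""
        asset = self.assets.get(table_id)
        if not asset:
            return {"error": "Asset not found"}

        result: Dict = {
            "table_id": table_id,
            "depth": self._calculate_lineage_depth(table_id)
        }
        # Honor the requested direction instead of always returning both sides
        if direction in ("upstream", "both"):
            result["upstream"] = asset.lineage_upstream
        if direction in ("downstream", "both"):
            result["downstream"] = asset.lineage_downstream
        return result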

    def _update_lineage(self, metadata: TableMetadata):
        """Update lineage graph with new edges."""
        for upstream in metadata.lineage_upstream:
            edge = {
                "source": upstream,
                "target": metadata.table_id,
                "type": "data_flow"
            }
            # Re-registering an asset must not duplicate its edges
            if edge not in self.lineage_edges:
                self.lineage_edges.append(edge)

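    def _calculate_lineage_depth(self, table_id: str) -> int:
        """Calculate the longest upstream path length from this table."""
        # Track the deepest level at which each node has been reached. A plain
        # visited set can under-count when a node is reachable by two paths.
        best_depth = {table_id: 0}
        stack = [table_id]
        max_edges = len(self.lineage_edges)  # a simple path cannot be longer

        while stack:
            current = stack.pop()
            depth = best_depth[current] + 1
            if depth > max_edges:  # guards against cycles in the edge list
                continue
            for edge in self.lineage_edges:
                if edge["target"] == current and depth > best_depth.get(edge["source"], -1):
                    best_depth[edge["source"]] = depth
                    stack.append(edge["source"])

        return max(best_depth.values())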

# Usage
catalog = MetadataCatalog()

catalog.register_asset(TableMetadata(
    table_id="orders_001",
    database="analytics",
    schema_name="marts",
    table_name="fact_orders",
    description="Fact table containing all customer orders",
    owner_team="data-platform",
    owner_email="data-platform@company.com",
    domain="sales",
    classification="internal",
    tags=["finance", "daily", "production"],
    row_count=15000000,
    size_bytes=2_500_000_000,
    quality_score=0.998,
    lineage_upstream=["staging.stg_shopify_orders", "staging.stg_stripe_payments"],
    lineage_downstream=["mart.revenue_dashboard", "mart.customer_360"]
))

results = catalog.search("orders", domain="sales")
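
The `TableMetadata` class above stores `last_updated` and `freshness_sla_hours` but never compares them. A minimal sketch of such a check follows; the `freshness_status` function and its three status labels are assumptions for illustration, and the current time is passed in explicitly so the check is easy to test.

```python
from datetime import datetime, timedelta
from typing import Optional


def freshness_status(last_updated: Optional[datetime], sla_hours: int, now: datetime) -> str:
    """Classify an asset as 'fresh', 'stale', or 'unknown' against its freshness SLA."""
    if last_updated is None:
        return "unknown"  # load time was never recorded
    age = now - last_updated
    return "fresh" if age <= timedelta(hours=sla_hours) else "stale"


now = datetime(2024, 1, 2, 12, 0)
print(freshness_status(datetime(2024, 1, 2, 6, 0), 24, now))    # updated 6 hours ago
print(freshness_status(datetime(2023, 12, 31, 6, 0), 24, now))  # updated over 2 days ago
print(freshness_status(None, 24, now))                          # no load time recorded
```

Treating a missing timestamp as its own state, instead of as fresh or stale, keeps assets with incomplete operational metadata visible in monitoring.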

Data Lineage

[Figure: Data Lineage Flow. Sources (Shopify API orders.json, Stripe API payments.json) → staging views stg_orders and stg_payments (rename, cast) → fact_orders (incremental model, star schema fact) → consumers (Revenue Dashboard in Looker, Customer 360 ML model) → Data Catalog (impact analysis, lineage graph), with column-level tracking.]

Data lineage tracks the flow of data from its origin through transformations to its final destination. It answers: where did this data come from, what transformations were applied, and who consumes it?

Lineage Complexity

  • Tables: T
  • Edges: E (average dependencies per table = E/T)
  • Lineage Depth: D = max path length from source to sink
  • Impact Analysis Cost: O(E × D) for full downstream traversal
  • Metadata Storage: M = T × (avg_metadata_size) + E × (avg_edge_size)

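
To make the quantities above concrete, the following sketch computes T, E, E/T, D, a downstream impact set, and M for a small lineage graph. The edge list mirrors the tables named in the lineage figure, while the function names and the per-table and per-edge byte sizes (2,000 and 200) are hypothetical values chosen only for the arithmetic.

```python
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str]  # (source_table, target_table)

# Hypothetical lineage graph for illustration
EDGES: List[Edge] = [
    ("shopify_api", "stg_orders"),
    ("stripe_api", "stg_payments"),
    ("stg_orders", "fact_orders"),
    ("stg_payments", "fact_orders"),
    ("fact_orders", "revenue_dashboard"),
    ("fact_orders", "customer_360"),
]


def build_children(edges: List[Edge]) -> Dict[str, List[str]]:
    """Adjacency list mapping each table to its direct downstream tables."""
    children: Dict[str, List[str]] = {}
    for source, target in edges:
        children.setdefault(source, []).append(target)
    return children


def downstream_of(table: str, edges: List[Edge]) -> Set[str]:
    """All tables affected by a change to `table` (impact analysis)."""
    children = build_children(edges)
    seen: Set[str] = set()
    stack = [table]
    while stack:
        current = stack.pop()
        for child in children.get(current, []):
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen


def lineage_depth(edges: List[Edge]) -> int:
    """D: longest source-to-sink path, counted in edges (graph must be acyclic)."""
    children = build_children(edges)
    memo: Dict[str, int] = {}

    def longest_from(node: str) -> int:
        if node not in memo:
            memo[node] = max((1 + longest_from(c) for c in children.get(node, [])), default=0)
        return memo[node]

    tables = {name for edge in edges for name in edge}
    return max((longest_from(t) for t in tables), default=0)


def storage_estimate(tables: int, edges: int,
                     avg_metadata_size: int = 2000, avg_edge_size: int = 200) -> int:
    """M = T x avg_metadata_size + E x avg_edge_size, in bytes (sizes are assumed)."""
    return tables * avg_metadata_size + edges * avg_edge_size


T = len({name for edge in EDGES for name in edge})
E = len(EDGES)
print(f"T={T}, E={E}, E/T={E / T:.2f}, D={lineage_depth(EDGES)}")
print("Impact of stg_orders:", sorted(downstream_of("stg_orders", EDGES)))
print("M =", storage_estimate(T, E), "bytes")
```

The O(E × D) figure corresponds to rescanning the edge table once per level, as a recursive query does. An in-memory traversal with an adjacency list and a visited set, as sketched here, follows each edge at most once.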
-- Lineage query: Find all upstream sources
WITH RECURSIVE lineage_upstream AS (
    -- Base: target table
    SELECT
        target_table,
        source_table,
        1 AS depth
    FROM data_lineage
    WHERE target_table = 'mart.fact_orders'

    UNION ALL

    -- Recursive: follow upstream
    SELECT
        l.target_table,
        dl.source_table,
        l.depth + 1
    FROM lineage_upstream l
    JOIN data_lineage dl ON l.source_table = dl.target_table
    WHERE l.depth < 10  -- Prevent infinite loops
)

SELECT DISTINCT
    source_table,
    depth
FROM lineage_upstream
ORDER BY depth;

-- Lineage query: Impact analysis
WITH RECURSIVE impact AS (
    SELECT target_table, source_table, 1 AS depth
    FROM data_lineage
    WHERE source_table = 'staging.stg_orders'

    UNION ALL

    -- Advance one hop: the newly reached downstream table becomes the target
    SELECT dl.target_table, dl.source_table, i.depth + 1
    FROM impact i
    JOIN data_lineage dl ON i.target_table = dl.source_table
    WHERE i.depth < 10
)

SELECT DISTINCT target_table, depth
FROM impact
ORDER BY depth;

-- Column-level lineage
SELECT
    source_table,
    source_column,
    target_table,
    target_column,
    transformation_type
FROM column_lineage
WHERE target_table = 'mart.fact_orders'
ORDER BY target_column;

Data Catalog Tools Comparison

A data catalog is an organized inventory of data assets that provides metadata management, data discovery, data lineage, and governance capabilities. It serves as the central interface for users to find, understand, and trust data.

| Tool | Type | Key Features | Cost | Best For |
|---|---|---|---|---|
| DataHub | Open Source | Metadata, lineage, discovery | Free | Self-hosted, custom |
| Amundsen | Open Source | Search, discovery, lineage | Free | Lyft-style architecture |
| OpenMetadata | Open Source | Metadata, lineage, quality | Free | Modern, all-in-one |
| Alation | Enterprise | Collaboration, governance | Paid (enterprise) | Large enterprises |
| Collibra | Enterprise | Governance, cataloging | Paid (enterprise) | Compliance-heavy |
| AWS Glue Catalog | Managed | Schema, partitioning | Pay-per-use | AWS-native |
| Databricks Unity Catalog | Managed | Governance, lineage | Included | Databricks users |
| Atlan | Commercial | Modern UI, automation | Paid | Data teams |

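# OpenMetadata catalog integration (illustrative sketch: module paths, the
# client constructor, and the search call differ between SDK versions, so check
# the Python SDK documentation for your release before running this)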
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadata

# Connect to OpenMetadata
server_config = {
    "api_endpoint": "http://localhost:8585/api",
    "auth_provider": "no-auth"
}
metadata = OpenMetadata(server_config)

# Build a create-table request (it is only constructed here; this snippet
# does not submit it to the server)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import Column, DataType

table_request = CreateTableRequest(
    name="fact_orders",
    description="Fact table containing all customer orders",
    columns=[
        Column(name="order_key", dataType=DataType.BIGINT, description="Surrogate key"),
        Column(name="order_id", dataType=DataType.STRING, description="Natural key"),
        Column(name="customer_key", dataType=DataType.INT, description="FK to dim_customers"),
        Column(name="order_date", dataType=DataType.DATE, description="Order date"),
        Column(name="net_amount", dataType=DataType.DECIMAL, description="Net order amount"),
    ],
    databaseSchema="analytics.marts",
    tags=[{"tagFQN": "Finance"}, {"tagFQN": "Daily"}],
    owner={"type": "team", "name": "data-platform"}
)

# Search catalog
results = metadata.search("orders")
for result in results:
    print(f"{result.fully_qualified}: {result.description}")

Key Concepts Summary

| Concept | Description | Tool/Implementation | Metric |
|---|---|---|---|
| Metadata Catalog | Central repository for data metadata | DataHub, Amundsen, OpenMetadata | Catalog coverage |
| Data Lineage | Track data flow and transformations | dbt, Apache Atlas, Marquez | Lineage completeness |
| Data Quality | Automated quality monitoring | Great Expectations, Soda | Quality score |
| Access Control | RBAC/ABAC for data access | Unity Catalog, Purview | Policy compliance |
| Data Classification | Sensitivity labeling | Manual + ML-assisted | Classification coverage |
| Schema Registry | Schema versioning and evolution | Confluent, AWS Glue | Schema compliance |
| Data Catalog Search | Discoverable data assets | Custom UI + Elasticsearch | Findability score |
| Policy Engine | Automated governance enforcement | OPA, custom rules | Violation count |

Performance Metrics

| Metric | Without Governance | With Governance | Target |
|---|---|---|---|
| Dataset Discovery Time | Hours-Days | Seconds-Minutes | < 1 min |
| Data Quality Score | 60-80% | 95-99% | > 98% |
| Duplicate Datasets | 30-50% | < 5% | < 3% |
| Compliance Violations | Unknown | Tracked | 0 critical |
| Lineage Coverage | 20-40% | 90-100% | > 95% |
| Time to Trust New Data | Weeks | Hours | < 1 day |
| Security Incidents | Reactive | Proactive | 0 breaches |
| Data Literacy Score | Low | Medium-High | > 80% |

10 Best Practices

  1. Implement a data catalog from day one: cataloging retroactively is far harder
  2. Automate metadata collection: use hooks in dbt, Airflow, and ingestion tools
  3. Enforce data quality at ingestion: reject bad data before it enters the lake
  4. Track column-level lineage: understand the impact of schema changes
  5. Implement data classification: label all datasets by sensitivity level
  6. Use a policy engine for automated governance enforcement
  7. Create a business glossary: define metrics consistently across the organization
  8. Implement data SLAs: define freshness, quality, and availability requirements
  9. Monitor governance compliance: track metrics and alert on violations
  10. Make governance self-serve: provide tools and templates for domain teams
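
Practice 3 (enforcing quality at ingestion) can be sketched as a rule-based gate that separates good rows from bad ones and blocks the load when too many rows fail. The rule names, the `validate_batch` function, and the 98% pass-rate threshold are assumptions for this example; a tool such as Great Expectations or Soda would normally take this role.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
Rule = Tuple[str, Callable[[Row], bool]]

# Hypothetical rules for an orders feed
RULES: List[Rule] = [
    ("order_id_present", lambda r: bool(r.get("order_id"))),
    ("net_amount_non_negative",
     lambda r: isinstance(r.get("net_amount"), (int, float)) and r["net_amount"] >= 0),
]


def validate_batch(rows: List[Row], rules: List[Rule], min_pass_rate: float = 0.98) -> Dict[str, Any]:
    """Split rows into accepted and rejected, then decide whether the batch may load."""
    accepted: List[Row] = []
    rejected: List[Tuple[Row, List[str]]] = []
    for row in rows:
        failed = [name for name, check in rules if not check(row)]
        if failed:
            rejected.append((row, failed))  # keep the reasons for later triage
        else:
            accepted.append(row)
    pass_rate = len(accepted) / len(rows) if rows else 1.0
    return {
        "accepted": accepted,
        "rejected": rejected,
        "pass_rate": pass_rate,
        "load_allowed": pass_rate >= min_pass_rate,
    }


batch = [
    {"order_id": "A1", "net_amount": 10.0},
    {"order_id": "", "net_amount": 5.0},    # fails order_id_present
    {"order_id": "A3", "net_amount": -2},   # fails net_amount_non_negative
]
result = validate_batch(batch, RULES)
print(f"pass rate: {result['pass_rate']:.2f}, load allowed: {result['load_allowed']}")
for row, reasons in result["rejected"]:
    print("rejected:", row, "because", reasons)
```

Keeping rejected rows together with the names of the rules they failed makes it possible to route them to a quarantine table instead of silently dropping them.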

  • Metadata catalogs enable data discovery, trust, and governance at scale
  • Column-level lineage tracks data flow from source to consumer for impact analysis
  • Automated quality monitoring catches issues before they reach consumers
  • RBAC/ABAC policies enforce access controls programmatically
  • Governance must be self-serve for domain teams in decentralized architectures
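
The access-control takeaway can be illustrated with a small sketch that combines a role check (RBAC), an attribute check on the user's business domain (ABAC), and column masking. The roles, their clearance levels, the `can_read` and `mask_row` helpers, and the `***` mask value are all hypothetical; platforms such as Unity Catalog or Purview express these rules in their own policy languages.

```python
from dataclasses import dataclass
from typing import Any, Dict, Set

# Sensitivity levels, ordered from least to most restricted
LEVELS = ["public", "internal", "confidential", "restricted"]

# RBAC: hypothetical roles mapped to the highest classification they may read
ROLE_CLEARANCE = {
    "analyst": "internal",
    "data_engineer": "confidential",
    "compliance_officer": "restricted",
}


@dataclass
class User:
    name: str
    role: str
    domains: Set[str]  # ABAC attribute: business domains the user works in


def can_read(user: User, classification: str, domain: str) -> bool:
    """Allow access only if the role clearance and the domain attribute both permit it."""
    clearance = ROLE_CLEARANCE.get(user.role)
    if clearance is None or classification not in LEVELS:
        return False  # deny by default for unknown roles or labels
    level_ok = LEVELS.index(classification) <= LEVELS.index(clearance)
    domain_ok = classification == "public" or domain in user.domains
    return level_ok and domain_ok


def mask_row(row: Dict[str, Any], pii_columns: Set[str], may_see_pii: bool) -> Dict[str, Any]:
    """Return a copy of the row with PII columns masked unless the caller may see them."""
    if may_see_pii:
        return dict(row)
    return {key: ("***" if key in pii_columns else value) for key, value in row.items()}


alice = User("alice", "analyst", {"sales"})
print(can_read(alice, "internal", "sales"))      # role and domain both allow it
print(can_read(alice, "confidential", "sales"))  # role clearance is too low
print(can_read(alice, "internal", "finance"))    # user is outside the domain
print(mask_row({"order_id": "A1", "email": "a@example.com"}, {"email"}, may_see_pii=False))
```

Denying by default for unknown roles or labels means a misconfigured policy fails closed instead of exposing data.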
