Batch Pipeline Architecture: ADLS → ADF → Synapse


End-to-end batch data pipeline from raw ingestion through transformation to analytics-ready data

Batch Pipeline Architecture

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────┐
│                    END-TO-END BATCH PIPELINE ARCHITECTURE            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  INGESTION             STAGING            TRANSFORMATION            │
│  ┌──────────┐        ┌──────────┐       ┌──────────────┐          │
│  │ Source   │───────>│ Raw Zone │──────>│ ADF Copy     │          │
│  │ Systems  │        │ (ADLS)   │       │ Activity     │          │
│  │          │        │          │       │              │          │
│  │ • SQL DB │        │ CSV/JSON │       │ Parquet      │          │
│  │ • REST   │        │ Raw      │       │ Conversion   │          │
│  │ • Files  │        │ Format   │       │              │          │
│  └──────────┘        └──────────┘       └──────┬───────┘          │
│                                                  │                   │
│                                                  ▼                   │
│                                         ┌──────────────┐            │
│                                         │ Databricks   │            │
│                                         │ Transform    │            │
│                                         │              │            │
│                                         │ • Validate   │            │
│                                         │ • Clean      │            │
│                                         │ • Enrich     │            │
│                                         └──────┬───────┘            │
│                                                  │                   │
│  LOADING               SERVING              MONITORING              │
│                                                  │                   │
│                                         ┌────────▼────────┐        │
│                                         │ ADF Lookup      │        │
│                                         │ Activity        │        │
│                                         └────────┬────────┘        │
│                                                  │                   │
│  ┌──────────┐        ┌──────────┐       ┌───────▼───────┐         │
│  │ Synapse  │<───────│ Curated  │<──────│ ADF Copy      │         │
│  │ SQL Pool │        │ Zone     │       │ to Synapse    │         │
│  │          │        │ (ADLS)   │       │               │         │
│  │ Fact/Dim │        │ Delta    │       │ Staging       │         │
│  │ Tables   │        │ Lake     │       │ Tables        │         │
│  └────┬─────┘        └──────────┘       └───────────────┘         │
│       │                                                             │
│       ▼                                                             │
│  ┌──────────┐        ┌──────────┐       ┌──────────────┐          │
│  │ Power BI │───────>│ Reports  │       │ Azure Monitor│          │
│  │ Dataset  │        │          │       │ + Alerts     │          │
│  └──────────┘        └──────────┘       └──────────────┘          │
└─────────────────────────────────────────────────────────────────────┘

ADF Pipeline Implementation

{
  "name": "pl_batch_ingestion_pipeline",
  "properties": {
    "activities": [
      {
        "name": "CopyFromSourceToRaw",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "SELECT * FROM Sales WHERE ModifiedDate >= '@{pipeline().parameters.lastLoadDate}'",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings"
            }
          }
        },
        "inputs": [
          { "referenceName": "ds_sql_source", "type": "DatasetReference" }
        ],
        "outputs": [
          { "referenceName": "ds_raw_sales", "type": "DatasetReference" }
        ]
      },
      {
        "name": "TransformWithDatabricks",
        "type": "DatabricksNotebook",
        "typeProperties": {
          "notebookPath": "/Repos/etl/sales_transformation",
          "baseParameters": {
            "raw_path": "@pipeline().parameters.rawPath",
            "curated_path": "@pipeline().parameters.curatedPath"
          }
        },
        "dependsOn": [
          {
            "activity": "CopyFromSourceToRaw",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      },
      {
        "name": "LoadToSynapse",
        "type": "SqlServerStoredProcedure",
        "typeProperties": {
          "storedProcedureName": "sp_load_fact_sales",
          "storedProcedureParameters": {
            "date": {
              "value": "@pipeline().parameters.processDate",
              "type": "String"
            }
          }
        },
        "dependsOn": [
          {
            "activity": "TransformWithDatabricks",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      }
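    ],
    "parameters": {
      "processDate": {
        "type": "String"
      },
      "lastLoadDate": {
        "type": "String",
        "defaultValue": "2024-01-01"
      },
      "rawPath": {
        "type": "String"
      },
      "curatedPath": {
        "type": "String"
      }
    }
  }
}

Note: ADF treats a parameter's default value as a literal string and does not evaluate expressions in it, so processDate has no default here. Supply it from the trigger or caller, for example as @formatDateTime(utcNow(), 'yyyy-MM-dd').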
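
The pipeline parameters can be supplied when a run is started from code. The following is a minimal sketch, assuming the `azure-identity` and `azure-mgmt-datafactory` packages and reusing the resource group and factory names from the monitoring example as placeholders. The helper names and the `rawPath`/`curatedPath` values are hypothetical and only illustrate the shape of the call.

```python
from datetime import date, datetime, timezone
from typing import Optional


def build_run_parameters(last_load_date: str, process_date: Optional[date] = None) -> dict:
    """Build parameter values for pl_batch_ingestion_pipeline."""
    if process_date is None:
        # Default to today's date in UTC, formatted as yyyy-MM-dd
        process_date = datetime.now(timezone.utc).date()
    return {
        "processDate": process_date.strftime("%Y-%m-%d"),
        "lastLoadDate": last_load_date,
        "rawPath": "raw/sales",          # hypothetical path
        "curatedPath": "curated/sales",  # hypothetical path
    }


def trigger_run(subscription_id: str, parameters: dict) -> str:
    """Start a pipeline run and return its run ID (needs Azure access)."""
    # Imported lazily so build_run_parameters works without the Azure SDK installed
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.datafactory import DataFactoryManagementClient

    client = DataFactoryManagementClient(DefaultAzureCredential(), subscription_id)
    response = client.pipelines.create_run(
        resource_group_name="rg-dataengineering-prod",  # placeholder
        factory_name="adf-prod",                         # placeholder
        pipeline_name="pl_batch_ingestion_pipeline",
        parameters=parameters,
    )
    return response.run_id


params = build_run_parameters("2024-01-01", date(2024, 1, 15))
print(params)
```

Calling `trigger_run(subscription_id, params)` would then queue the run; the returned run ID can be passed to the monitoring calls.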

Monitoring Dashboard

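# ADF monitoring with the Python SDK (azure-identity, azure-mgmt-datafactory)
import os
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import RunFilterParameters

# Assumption: the subscription ID is provided through an environment variable
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]

credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, subscription_id)

# RunFilterParameters requires both bounds of the time window
run_filter = RunFilterParameters(
    last_updated_after=datetime(2024, 1, 1, tzinfo=timezone.utc),
    last_updated_before=datetime(2024, 1, 31, 23, 59, 59, tzinfo=timezone.utc),
)

# Get pipeline runs
pipeline_runs = adf_client.pipeline_runs.query_by_factory(
    resource_group_name="rg-dataengineering-prod",
    factory_name="adf-prod",
    filter_parameters=run_filter,
)

for run in pipeline_runs.value:
    print(f"Run ID: {run.run_id}")
    print(f"Pipeline: {run.pipeline_name}")
    print(f"Status: {run.status}")
    print(f"Run Start: {run.run_start}")
    print(f"Run End: {run.run_end}")

# Get activity runs for a pipeline run
activity_runs = adf_client.activity_runs.query_by_pipeline_run(
    resource_group_name="rg-dataengineering-prod",
    factory_name="adf-prod",
    run_id="pipeline-run-id",  # replace with a run_id from the loop above
    filter_parameters=run_filter,
)

for activity in activity_runs.value:
    print(f"Activity: {activity.activity_name} ({activity.status})")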
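
Printing every run is a starting point; a dashboard or alert usually needs an aggregate view. The sketch below is one possible way to count runs by status and pick out failures. It assumes run objects that expose `status`, `run_id`, `pipeline_name`, and `message` attributes as the SDK's pipeline run model does; the `summarize_runs` helper and the sample data are hypothetical stand-ins so the example runs without Azure access.

```python
from collections import Counter
from types import SimpleNamespace


def summarize_runs(runs):
    """Return (status counts, list of failed runs) for an iterable of run objects."""
    runs = list(runs)
    status_counts = Counter(run.status for run in runs)
    failed = [run for run in runs if run.status == "Failed"]
    return status_counts, failed


# Stand-in objects using the same attribute names as the SDK's run objects
sample_runs = [
    SimpleNamespace(run_id="a1", pipeline_name="pl_batch_ingestion_pipeline",
                    status="Succeeded", message=""),
    SimpleNamespace(run_id="b2", pipeline_name="pl_batch_ingestion_pipeline",
                    status="Failed", message="Copy activity timed out"),
    SimpleNamespace(run_id="c3", pipeline_name="pl_batch_ingestion_pipeline",
                    status="Succeeded", message=""),
]

counts, failed = summarize_runs(sample_runs)
print(dict(counts))
for run in failed:
    print(f"FAILED {run.pipeline_name} ({run.run_id}): {run.message}")
```

With a live client, `summarize_runs(pipeline_runs.value)` would produce the same summary for real runs.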

Pro Tip: Implement a watermark pattern for incremental loading. Store the last successful load timestamp in a control table and use it as a parameter in subsequent runs.
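
The watermark pattern can be sketched as follows. This is a minimal illustration, not a production implementation: an in-memory SQLite database stands in for the real control database, and the `etl_watermark` table, the `sales` sample rows, and the `load_sales_increment` function are all hypothetical names chosen for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the real control database
conn.executescript("""
    CREATE TABLE etl_watermark (table_name TEXT PRIMARY KEY, last_load TEXT NOT NULL);
    CREATE TABLE sales (id INTEGER PRIMARY KEY, amount REAL, modified_date TEXT);
    INSERT INTO etl_watermark VALUES ('sales', '2024-01-01T00:00:00');
    INSERT INTO sales VALUES
        (1, 10.0, '2023-12-31T08:00:00'),
        (2, 20.0, '2024-01-02T09:30:00'),
        (3, 30.0, '2024-01-03T11:15:00');
""")


def load_sales_increment(conn):
    """Return sales rows changed since the stored watermark, then advance it."""
    (watermark,) = conn.execute(
        "SELECT last_load FROM etl_watermark WHERE table_name = 'sales'"
    ).fetchone()
    # ISO 8601 timestamps in one format compare correctly as strings
    rows = conn.execute(
        "SELECT id, amount, modified_date FROM sales "
        "WHERE modified_date > ? ORDER BY modified_date",
        (watermark,),
    ).fetchall()
    if rows:
        # Advance the watermark only after the batch was read successfully
        with conn:
            conn.execute(
                "UPDATE etl_watermark SET last_load = ? WHERE table_name = 'sales'",
                (rows[-1][2],),
            )
    return rows


first = load_sales_increment(conn)   # picks up rows 2 and 3
second = load_sales_increment(conn)  # nothing new since the watermark moved
print(len(first), len(second))       # prints: 2 0
```

In the ADF pipeline, the stored watermark would take the place of the `lastLoadDate` parameter, typically read with a Lookup activity and written back once the load succeeds.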

Interview Questions

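Q1: Explain the difference between full load and incremental load in batch pipelines. A: A full load replaces the entire target dataset on every run, which is simple to implement but expensive for large tables. An incremental load processes only the rows that changed since the last run, which is more efficient but requires change tracking, typically a watermark on a timestamp column or change data capture (CDC).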

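Q2: How do you optimize ADF Copy Activity performance? A: 1) Increase parallelism with the parallelCopies setting and enough Data Integration Units (DIUs), 2) Use source partitioning so large tables are read in parallel, 3) Write columnar formats such as Parquet, 4) Avoid many small files; roughly 256 MB to 1 GB per file is a common target, 5) Enable staged copy when loading into Synapse or copying across slow or restricted networks.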
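
As an illustration of where these settings live, the sketch below adds performance options to a Copy activity's `typeProperties`. The property names (`parallelCopies`, `dataIntegrationUnits`, `enableStaging`, `stagingSettings`, `partitionOption`) are Copy activity settings; the numeric values, the `ls_staging_blob` linked service name, and the `with_performance_settings` helper are assumptions made for the example and should be tuned per workload.

```python
import copy
import json


def with_performance_settings(type_properties, parallel_copies, diu,
                              staging_linked_service=None):
    """Return a copy of Copy activity typeProperties with tuning options added."""
    tuned = copy.deepcopy(type_properties)
    tuned["parallelCopies"] = parallel_copies
    tuned["dataIntegrationUnits"] = diu
    if staging_linked_service:
        # Staged copy routes data through interim storage before the sink
        tuned["enableStaging"] = True
        tuned["stagingSettings"] = {
            "linkedServiceName": {
                "referenceName": staging_linked_service,
                "type": "LinkedServiceReference",
            },
            "path": "staging",  # hypothetical container/folder
        }
    return tuned


base = {
    # Read the table's physical partitions in parallel
    "source": {"type": "AzureSqlSource",
               "partitionOption": "PhysicalPartitionsOfTable"},
    "sink": {"type": "ParquetSink"},
}

tuned = with_performance_settings(base, parallel_copies=8, diu=16,
                                  staging_linked_service="ls_staging_blob")
print(json.dumps(tuned, indent=2))
```

The helper copies the input instead of mutating it, so the same base definition can be reused with different settings per environment.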

Q3: What are the best practices for ADF pipeline parameterization? A: 1) Parameterize dates, paths, and environment-specific configuration, 2) Store connection strings and secrets in Azure Key Vault, 3) Parameterize linked services and datasets so one definition serves many sources, 4) Build values with dynamic content expressions, 5) Promote between environments with ARM templates and per-environment parameter files.
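
To make the Key Vault point concrete, the sketch below builds an Azure SQL Database linked service definition whose password is a Key Vault secret reference rather than an inline value. The `AzureKeyVaultSecret` reference structure follows the ADF linked service format; the server, database, user, linked service, and secret names are hypothetical, as is the `sql_linked_service` helper.

```python
import json


def sql_linked_service(name, server, database, user, key_vault_ls, secret_name):
    """Build a linked service definition that reads its password from Key Vault."""
    return {
        "name": name,
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                # No password in the connection string itself
                "connectionString": (
                    f"Server=tcp:{server}.database.windows.net,1433;"
                    f"Database={database};User ID={user};"
                ),
                "password": {
                    "type": "AzureKeyVaultSecret",
                    "store": {
                        "referenceName": key_vault_ls,
                        "type": "LinkedServiceReference",
                    },
                    "secretName": secret_name,
                },
            },
        },
    }


# Same definition per environment; only the names differ (all hypothetical)
prod = sql_linked_service("ls_azure_sql_sales", "sql-sales-prod", "sales",
                          "etl_user", "ls_key_vault", "sql-sales-password")
print(json.dumps(prod, indent=2))
```

Because the definition only names the secret, rotating the password in Key Vault does not require redeploying the pipeline.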