Batch Pipeline Architecture: ADLS → ADF → Synapse
End-to-end batch data pipeline from raw ingestion through transformation to analytics-ready data
Batch Pipeline Architecture
┌─────────────────────────────────────────────────────────────────────┐
│               END-TO-END BATCH PIPELINE ARCHITECTURE                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  INGESTION           STAGING            TRANSFORMATION              │
│  ┌──────────┐        ┌──────────┐       ┌──────────────┐            │
│  │  Source  │───────>│ Raw Zone │──────>│   ADF Copy   │            │
│  │ Systems  │        │  (ADLS)  │       │   Activity   │            │
│  │          │        │          │       │              │            │
│  │ • SQL DB │        │ CSV/JSON │       │   Parquet    │            │
│  │ • REST   │        │   Raw    │       │  Conversion  │            │
│  │ • Files  │        │  Format  │       │              │            │
│  └──────────┘        └──────────┘       └──────┬───────┘            │
│                                                │                    │
│                                                ▼                    │
│                                         ┌──────────────┐            │
│                                         │  Databricks  │            │
│                                         │  Transform   │            │
│                                         │              │            │
│                                         │  • Validate  │            │
│                                         │  • Clean     │            │
│                                         │  • Enrich    │            │
│                                         └──────┬───────┘            │
│                                                │                    │
│  LOADING             SERVING                   │  MONITORING        │
│                                                │                    │
│                                       ┌────────▼────────┐           │
│                                       │   ADF Lookup    │           │
│                                       │    Activity     │           │
│                                       └────────┬────────┘           │
│                                                │                    │
│  ┌──────────┐        ┌──────────┐      ┌───────▼───────┐            │
│  │ Synapse  │<───────│ Curated  │<─────│   ADF Copy    │            │
│  │ SQL Pool │        │   Zone   │      │  to Synapse   │            │
│  │          │        │  (ADLS)  │      │               │            │
│  │ Fact/Dim │        │  Delta   │      │    Staging    │            │
│  │  Tables  │        │   Lake   │      │    Tables     │            │
│  └────┬─────┘        └──────────┘      └───────────────┘            │
│       │                                                             │
│       ▼                                                             │
│  ┌──────────┐        ┌──────────┐       ┌──────────────┐            │
│  │ Power BI │───────>│ Reports  │       │ Azure Monitor│            │
│  │ Dataset  │        │          │       │   + Alerts   │            │
│  └──────────┘        └──────────┘       └──────────────┘            │
└─────────────────────────────────────────────────────────────────────┘
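The Databricks box lists three steps: Validate, Clean, Enrich. As a rough, framework-free sketch of what those steps might do to a batch of sales rows (the field names `order_id`, `amount` and `region` and the `REGION_NAMES` lookup are hypothetical; a real notebook would express the same logic with PySpark DataFrames):

```python
from typing import Dict, List, Tuple

# Hypothetical reference data used by the enrich step
REGION_NAMES: Dict[str, str] = {"NA": "North America", "EU": "Europe"}


def validate(rows: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Split rows into (valid, rejected); a row needs order_id and amount."""
    valid: List[dict] = []
    rejected: List[dict] = []
    for row in rows:
        if row.get("order_id") is None or row.get("amount") is None:
            rejected.append(row)
        else:
            valid.append(row)
    return valid, rejected


def clean(rows: List[dict]) -> List[dict]:
    """Cast amounts to float, normalize region codes, drop duplicate order_ids (first wins)."""
    seen = set()
    cleaned: List[dict] = []
    for row in rows:
        if row["order_id"] in seen:
            continue
        seen.add(row["order_id"])
        cleaned.append({
            "order_id": row["order_id"],
            "amount": round(float(row["amount"]), 2),
            "region": str(row.get("region", "")).strip().upper(),
        })
    return cleaned


def enrich(rows: List[dict]) -> List[dict]:
    """Add region_name from the reference lookup, defaulting to 'Unknown'."""
    return [{**row, "region_name": REGION_NAMES.get(row["region"], "Unknown")} for row in rows]


raw = [
    {"order_id": 1, "amount": "19.99", "region": " na "},
    {"order_id": 1, "amount": "19.99", "region": "na"},   # duplicate key
    {"order_id": 2, "amount": None, "region": "EU"},      # fails validation
    {"order_id": 3, "amount": 5, "region": "apac"},       # no lookup match
]
valid, rejected = validate(raw)
curated = enrich(clean(valid))
```

Returning rejected rows separately, rather than dropping them, leaves room to write them to a quarantine location for later inspection.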
ADF Pipeline Implementation
{
  "name": "pl_batch_ingestion_pipeline",
  "properties": {
    "activities": [
      {
        "name": "CopyFromSourceToRaw",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "SELECT * FROM Sales WHERE ModifiedDate >= '@{pipeline().parameters.lastLoadDate}'",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings"
            }
          }
        },
        "inputs": [
          { "referenceName": "ds_sql_source", "type": "DatasetReference" }
        ],
        "outputs": [
          { "referenceName": "ds_raw_sales", "type": "DatasetReference" }
        ]
      },
      {
        "name": "TransformWithDatabricks",
        "type": "DatabricksNotebook",
        "linkedServiceName": {
          "referenceName": "ls_databricks",
          "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "notebookPath": "/Repos/etl/sales_transformation",
          "baseParameters": {
            "raw_path": "@pipeline().parameters.rawPath",
            "curated_path": "@pipeline().parameters.curatedPath"
          }
        },
        "dependsOn": [
          {
            "activity": "CopyFromSourceToRaw",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      },
      {
        "name": "LoadToSynapse",
        "type": "SqlServerStoredProcedure",
        "linkedServiceName": {
          "referenceName": "ls_synapse_sql_pool",
          "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "storedProcedureName": "sp_load_fact_sales",
          "storedProcedureParameters": {
            "date": {
              "value": "@if(empty(pipeline().parameters.processDate), formatDateTime(utcNow(), 'yyyy-MM-dd'), pipeline().parameters.processDate)",
              "type": "String"
            }
          }
        },
        "dependsOn": [
          {
            "activity": "TransformWithDatabricks",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      }
    ],
    "parameters": {
      "processDate": {
        "type": "String",
        "defaultValue": ""
      },
      "lastLoadDate": {
        "type": "String",
        "defaultValue": "2024-01-01"
      },
      "rawPath": {
        "type": "String"
      },
      "curatedPath": {
        "type": "String"
      }
    }
  }
}
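The three activities are chained with `dependsOn` and the `Succeeded` condition, so a failure in one activity stops everything after it. A toy Python simulation of that behaviour, not ADF's scheduler; the `run_pipeline` and `make_pipeline` helpers and the `NotRun` label are assumptions made for this sketch:

```python
from typing import Callable, Dict, List


def run_pipeline(activities: List[dict]) -> Dict[str, str]:
    """Run activities in list order (assumed already topologically sorted).

    An activity runs only if every activity it depends on finished with
    "Succeeded"; otherwise it is recorded as "NotRun" (a label chosen for
    this sketch, not an ADF status value).
    """
    status: Dict[str, str] = {}
    for act in activities:
        deps = act.get("dependsOn", [])
        if all(status.get(dep) == "Succeeded" for dep in deps):
            run: Callable[[], bool] = act["run"]
            status[act["name"]] = "Succeeded" if run() else "Failed"
        else:
            status[act["name"]] = "NotRun"
    return status


def make_pipeline(copy_ok: bool) -> List[dict]:
    # Mirrors the dependency chain of pl_batch_ingestion_pipeline
    return [
        {"name": "CopyFromSourceToRaw", "run": lambda: copy_ok},
        {"name": "TransformWithDatabricks", "dependsOn": ["CopyFromSourceToRaw"], "run": lambda: True},
        {"name": "LoadToSynapse", "dependsOn": ["TransformWithDatabricks"], "run": lambda: True},
    ]


print(run_pipeline(make_pipeline(copy_ok=True)))
print(run_pipeline(make_pipeline(copy_ok=False)))
```

With `copy_ok=False`, the copy is marked `Failed` and both downstream activities are `NotRun`, which is why a failed ingestion never loads stale data into Synapse.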
Monitoring Dashboard
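# ADF monitoring with the Python SDK (azure-identity + azure-mgmt-datafactory)
import os
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import RunFilterParameters

RESOURCE_GROUP = "rg-dataengineering-prod"
FACTORY_NAME = "adf-prod"

# Subscription ID is read from the environment instead of being hard-coded
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, subscription_id)

# Query window; RunFilterParameters requires both bounds
run_filter = RunFilterParameters(
    last_updated_after=datetime(2024, 1, 1, tzinfo=timezone.utc),
    last_updated_before=datetime(2024, 1, 31, 23, 59, 59, tzinfo=timezone.utc),
)

# Get pipeline runs (large result sets are paged via continuation_token)
pipeline_runs = adf_client.pipeline_runs.query_by_factory(
    RESOURCE_GROUP, FACTORY_NAME, run_filter
)
for run in pipeline_runs.value:
    print(f"Run ID: {run.run_id}")
    print(f"Pipeline: {run.pipeline_name}")
    print(f"Status: {run.status}")
    print(f"Run Start: {run.run_start}")
    print(f"Run End: {run.run_end}")

    # Get activity runs for this pipeline run
    activity_runs = adf_client.activity_runs.query_by_pipeline_run(
        RESOURCE_GROUP, FACTORY_NAME, run.run_id, run_filter
    )
    for activity in activity_runs.value:
        print(f"  {activity.activity_name}: {activity.status}")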
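For a dashboard, the raw run records usually get aggregated. A minimal sketch that computes per-pipeline success rate and average duration; it uses a small stand-in dataclass with the same attribute names the SDK's pipeline-run objects expose (`pipeline_name`, `status`, `run_start`, `run_end`), so it runs without an Azure connection. The `RunRecord` class and `summarize` helper are hypothetical:

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Iterable, Optional


@dataclass
class RunRecord:
    # Stand-in for the SDK's PipelineRun model (same attribute names)
    pipeline_name: str
    status: str
    run_start: datetime
    run_end: Optional[datetime]


def summarize(runs: Iterable[RunRecord]) -> Dict[str, dict]:
    """Per pipeline: run count, success rate, mean duration (s) of runs that have ended."""
    stats = defaultdict(lambda: {"total": 0, "succeeded": 0, "durations": []})
    for run in runs:
        s = stats[run.pipeline_name]
        s["total"] += 1
        if run.status == "Succeeded":
            s["succeeded"] += 1
        if run.run_end is not None:
            s["durations"].append((run.run_end - run.run_start).total_seconds())
    summary: Dict[str, dict] = {}
    for name, s in stats.items():
        durations = s["durations"]
        summary[name] = {
            "total": s["total"],
            "success_rate": s["succeeded"] / s["total"],
            "avg_duration_s": sum(durations) / len(durations) if durations else None,
        }
    return summary


t0 = datetime(2024, 1, 1, 2, 0)
runs = [
    RunRecord("pl_batch_ingestion_pipeline", "Succeeded", t0, t0 + timedelta(minutes=10)),
    RunRecord("pl_batch_ingestion_pipeline", "Failed", t0, t0 + timedelta(minutes=20)),
    RunRecord("pl_batch_ingestion_pipeline", "InProgress", t0, None),
]
print(summarize(runs))
```

In-progress runs count toward the total but not toward the average duration, since they have no `run_end` yet.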
Pro Tip: Implement a watermark pattern for incremental loading. Store the last successful load timestamp in a control table, pass it to the next run as the lastLoadDate parameter, and update it only after the load succeeds.
Interview Questions
Q1: Explain the difference between full load and incremental load in batch pipelines. A: A full load reloads the entire dataset on every run: simple, but increasingly expensive as data grows. An incremental load processes only new or changed rows: efficient, but it needs a reliable way to detect changes, such as a watermark column (typically a last-modified timestamp) or change data capture (CDC).
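The difference can be seen on a toy target table keyed by `id` (a plain dict here; `full_load` and `upsert_changes` are hypothetical helpers). A full load rebuilds the target from the complete source extract, while an incremental load applies only the changed rows as an upsert. Note that a plain upsert, unlike a full load, does not remove rows deleted at the source; capturing deletes is one reason to use CDC.

```python
from typing import Dict, List

Table = Dict[int, dict]


def full_load(source_rows: List[dict]) -> Table:
    """Rebuild the target from the complete source extract."""
    return {row["id"]: dict(row) for row in source_rows}


def upsert_changes(target: Table, changed_rows: List[dict]) -> Table:
    """Incremental load: insert new keys and overwrite changed ones; other rows are kept."""
    updated = dict(target)
    for row in changed_rows:
        updated[row["id"]] = dict(row)
    return updated


day1_source = [{"id": 1, "qty": 5}, {"id": 2, "qty": 7}]
target = full_load(day1_source)

# Day 2 at the source: id 1 deleted, id 2 changed, id 3 inserted
day2_source = [{"id": 2, "qty": 9}, {"id": 3, "qty": 1}]
day2_changes = [{"id": 2, "qty": 9}, {"id": 3, "qty": 1}]

after_full = full_load(day2_source)                          # ids 2 and 3
after_incremental = upsert_changes(target, day2_changes)     # ids 1, 2, 3 (id 1 is stale)
```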
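Q2: How do you optimize ADF Copy Activity performance? A: 1) Increase parallelism (parallel copies) and Data Integration Units (DIUs), 2) Enable source partitioning (e.g., physical partitions or dynamic range for SQL sources), 3) Use columnar file formats such as Parquet, 4) Size output files appropriately (roughly 256 MB to 1 GB), 5) Use staged copy where it helps, e.g., loading Azure Synapse through PolyBase or the COPY statement from a source those mechanisms cannot read directly.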
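Partition splitting for a SQL source means giving each parallel copy its own slice of a partition column. As a sketch of the idea behind a dynamic-range split: divide the inclusive range `[lower, upper]` of an integer column into contiguous, non-overlapping ranges, one per parallel copy, and issue one bounded query per range. The helper names and the `SalesID` column are hypothetical, and ADF's own range calculation may differ.

```python
from typing import List, Tuple


def split_range(lower: int, upper: int, partitions: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower, upper] into at most `partitions` contiguous ranges."""
    if lower > upper or partitions < 1:
        raise ValueError("need lower <= upper and partitions >= 1")
    total = upper - lower + 1
    partitions = min(partitions, total)
    size, extra = divmod(total, partitions)
    ranges: List[Tuple[int, int]] = []
    start = lower
    for i in range(partitions):
        # The first `extra` ranges take one additional value so sizes differ by at most 1
        end = start + size - 1 + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end + 1
    return ranges


def partition_queries(table: str, column: str, ranges: List[Tuple[int, int]]) -> List[str]:
    # One bounded query per parallel copy (table/column names assumed trusted)
    return [f"SELECT * FROM {table} WHERE {column} BETWEEN {lo} AND {hi}" for lo, hi in ranges]


print(split_range(1, 10, 3))  # [(1, 4), (5, 7), (8, 10)]
print(partition_queries("Sales", "SalesID", split_range(1, 10, 3)))
```

Even splits only balance the work when key values are roughly uniformly distributed; a skewed key produces uneven partitions.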
Q3: What are the best practices for ADF pipeline parameterization? A: 1) Use parameters for dates, paths, and configuration values, 2) Store connection strings and secrets in Azure Key Vault, 3) Parameterize linked services and datasets so they can be reused, 4) Use dynamic content expressions instead of hard-coded values, 5) Use ARM templates (with per-environment parameter files) to promote pipelines between environments.
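To see what string interpolation in dynamic content does, here is a toy resolver that substitutes only the `@{pipeline().parameters.<name>}` form used in the `sqlReaderQuery` of the pipeline above. It is not the ADF expression engine (no functions, no nested expressions; `resolve` is a hypothetical helper), but it shows how one pipeline definition yields a different concrete query per run:

```python
import re
from typing import Dict

# Matches @{pipeline().parameters.<name>} (string-interpolation form only)
PARAM_PATTERN = re.compile(r"@\{pipeline\(\)\.parameters\.([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve(template: str, parameters: Dict[str, str]) -> str:
    """Replace each parameter reference with its value; unknown names raise KeyError."""
    return PARAM_PATTERN.sub(lambda m: parameters[m.group(1)], template)


query = "SELECT * FROM Sales WHERE ModifiedDate >= '@{pipeline().parameters.lastLoadDate}'"
print(resolve(query, {"lastLoadDate": "2024-01-01"}))
# SELECT * FROM Sales WHERE ModifiedDate >= '2024-01-01'
```

Because values are spliced directly into SQL text, parameters used this way should come from trusted sources (such as the control table) or be validated first.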