Azure Functions: Triggers, Durable Functions & Flex Consumption
Event-driven serverless computing for data engineering pipelines and automation
Azure Functions Architecture
┌────────────────────────────────────────────────────────────────────┐
│                    AZURE FUNCTIONS ARCHITECTURE                    │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│  TRIGGERS               BINDINGS               OUTPUTS             │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐  │
│  │ Blob Trigger   │────>│ Input Bind     │────>│ Blob Output    │  │
│  │ Queue Trigger  │     │ HTTP Bind      │     │ Queue Output   │  │
│  │ Event Hub      │     │ Cosmos Bind    │     │ Event Hub Out  │  │
│  │ Timer Trigger  │     │ SQL Bind       │     │ HTTP Output    │  │
│  │ HTTP Trigger   │     │ Table Bind     │     │ SignalR Out    │  │
│  │ Service Bus    │     │                │     │                │  │
│  └────────────────┘     └────────────────┘     └────────────────┘  │
│                                                                    │
│  HOST PLANS                                                        │
│  ┌────────────────┐     ┌────────────────┐     ┌────────────────┐  │
│  │ Consumption    │     │ Premium        │     │ Flex           │  │
│  │ (Y1)           │     │ (EP1-EP3)      │     │ Consumption    │  │
│  │                │     │                │     │                │  │
│  │ Scale: 0-200   │     │ Scale: 1-100   │     │ Scale: 0-1000  │  │
│  │ Timeout: 5m    │     │ Timeout: 30m   │     │ Timeout: 30m   │  │
│  │ Cold start     │     │ Pre-warmed     │     │ Quick scale    │  │
│  │ Pay per exec   │     │ Always ready   │     │ Pay per exec   │  │
│  └────────────────┘     └────────────────┘     └────────────────┘  │
│  Timeouts shown are defaults; Consumption can be raised to 10m.    │
│                                                                    │
│  DURABLE FUNCTIONS                                                 │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    Orchestrator Function                     │  │
│  │       ┌──────────┐      ┌──────────┐      ┌──────────┐       │  │
│  │       │ Activity │      │ Activity │      │ Activity │       │  │
│  │       │ Function │      │ Function │      │ Function │       │  │
│  │       └──────────┘      └──────────┘      └──────────┘       │  │
│  │                                                              │  │
│  │ Patterns: Function Chaining, Fan-out/Fan-in,                 │  │
│  │           Async HTTP, Monitor, Human Interaction             │  │
│  └──────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
Durable Functions for Data Pipelines
# durable_functions_orchestrator.py
import azure.functions as func
import azure.durable_functions as df
import requests

# Python v2 programming model: one DFApp registers the orchestrator and its activities
app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)

@app.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
    # Orchestrators must be deterministic, so take the time from the context
    run_date = context.current_utc_datetime.date().isoformat()

    # Step 1: Extract data from source
    raw_data = yield context.call_activity(
        'extract_data',
        {'source': 'sales_api', 'date': run_date}
    )
    # Step 2: Validate data quality
    validation_result = yield context.call_activity(
        'validate_data',
        {'data': raw_data, 'rules': 'sales_validation_rules'}
    )
    if not validation_result['is_valid']:
        # Step 2a: Handle validation failure
        yield context.call_activity(
            'send_alert',
            {'message': f"Data validation failed: {validation_result['errors']}"}
        )
        return {'status': 'failed', 'errors': validation_result['errors']}
    # Step 3: Transform data (fan-out over chunks, fan-in with task_all)
    transformed_chunks = yield context.task_all([
        context.call_activity(
            'transform_chunk',
            {'chunk': chunk, 'processed_date': run_date}
        )
        for chunk in raw_data['chunks']
    ])
    # Step 4: Load to data lake
    load_result = yield context.call_activity(
        'load_to_lake',
        {'data': transformed_chunks, 'zone': 'curated'}
    )
    # Step 5: Notify downstream consumers
    yield context.call_activity(
        'notify_consumers',
        {'dataset': 'sales', 'partition': load_result['partition']}
    )
    return {'status': 'completed', 'records_processed': load_result['count']}

# Activity functions (send_alert, load_to_lake, and notify_consumers are
# assumed to be defined elsewhere in the app)
@app.activity_trigger(input_name="payload")
def extract_data(payload):
    response = requests.get(
        f"https://api.example.com/{payload['source']}",
        timeout=30
    )
    response.raise_for_status()
    records = response.json()  # assumed to be a JSON array of records
    chunk_size = 10000
    # Split here so the orchestrator can fan out over raw_data['chunks']
    chunks = [
        records[i:i + chunk_size]
        for i in range(0, len(records), chunk_size)
    ]
    return {'records': records, 'chunks': chunks, 'chunk_size': chunk_size}

@app.activity_trigger(input_name="payload")
def validate_data(payload):
    errors = []
    for record in payload['data']['records']:
        # Missing, zero, or negative amounts are treated as invalid
        if not record.get('amount') or record['amount'] < 0:
            errors.append(f"Invalid amount for record {record['id']}")
    return {'is_valid': len(errors) == 0, 'errors': errors}

@app.activity_trigger(input_name="payload")
def transform_chunk(payload):
    # The processing date is passed in; activities have no orchestration context
    return [
        {
            'id': record['id'],
            'amount_usd': record['amount'] * record['exchange_rate'],
            'processed_date': payload['processed_date']
        }
        for record in payload['chunk']
    ]
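Because an orchestrator is an ordinary Python generator, its control flow can be exercised locally without the Durable Functions runtime. The sketch below is a toy driver for that purpose. `FakeContext`, `run_orchestrator`, `sample_orchestrator`, and the activity lambdas are hypothetical names introduced here, not part of `azure-durable-functions`, and the driver runs activities synchronously with no replay, checkpointing, or parallelism.

```python
from typing import Any, Callable, Dict, List


class FakeContext:
    """Hypothetical stand-in for DurableOrchestrationContext (local tests only)."""

    def __init__(self, activities: Dict[str, Callable[[Any], Any]]):
        self._activities = activities

    def call_activity(self, name: str, payload: Any) -> Callable[[], Any]:
        # Return a deferred call; the driver decides when to run it.
        return lambda: self._activities[name](payload)

    def task_all(self, tasks: List[Callable[[], Any]]) -> Callable[[], List[Any]]:
        # Fan-in: one deferred call that resolves every task, in order.
        return lambda: [task() for task in tasks]


def run_orchestrator(orchestrator, context):
    """Drive the generator: run each yielded task and send its result back in."""
    gen = orchestrator(context)
    try:
        task = next(gen)
        while True:
            task = gen.send(task())
    except StopIteration as stop:
        return stop.value  # the orchestrator's return value


def sample_orchestrator(context):
    # Same shape as the pipeline above: extract, fan out, fan in, aggregate.
    raw = yield context.call_activity('extract', {'n': 5})
    doubled = yield context.task_all(
        [context.call_activity('double', x) for x in raw]
    )
    total = yield context.call_activity('sum', doubled)
    return {'status': 'completed', 'total': total}


activities = {
    'extract': lambda p: list(range(p['n'])),
    'double': lambda x: x * 2,
    'sum': lambda xs: sum(xs),
}
result = run_orchestrator(sample_orchestrator, FakeContext(activities))
```

A driver like this only checks branching and data hand-offs between steps; retries, timers, and replay behavior still need the real runtime.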
Blob Trigger for Data Ingestion
# blob_trigger_ingestion.py
import azure.functions as func
import json
import logging
from azure.storage.filedatalake import DataLakeServiceClient
from azure.identity import DefaultAzureCredential

app = func.FunctionApp()

@app.blob_trigger(
    arg_name="myblob",
    path="raw/{folder}/{name}",
    connection="AzureWebJobsStorage"
)
def blob_trigger_ingestion(myblob: func.InputStream):
    logging.info(f"Processing blob: {myblob.name}, Size: {myblob.length}")
    # Parse the blob path: myblob.name looks like "raw/<folder>/<name>"
    path_parts = myblob.name.split('/')
    date_partition = path_parts[1]  # the {folder} segment, e.g., "2024-01-15"
    # Read blob content
    data = json.loads(myblob.read().decode('utf-8'))
    # Process and write to curated zone
    credential = DefaultAzureCredential()
    datalake = DataLakeServiceClient(
        account_url="https://stdatalake001.dfs.core.windows.net",
        credential=credential
    )
    # Write processed data
    file_client = datalake.get_file_client(
        "curated",
        f"sales/{date_partition}/processed.json"
    )
    processed_data = process_records(data)
    file_client.upload_data(
        json.dumps(processed_data),
        overwrite=True
    )
    logging.info(f"Processed {len(processed_data)} records")

def process_records(data):
    # Keep only records with a positive amount and normalize their fields
    return [
        {
            'id': r['id'],
            'amount': float(r['amount']),
            'category': r.get('category', 'unknown'),
            'processed': True
        }
        for r in data['records']
        if r.get('amount') and float(r['amount']) > 0
    ]
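The path handling in the blob trigger is easier to test when it lives in a pure function. The following is a minimal sketch under the assumption that blob names follow a `container/partition.../file` layout. `BlobPath`, `parse_blob_path`, and `curated_path` are hypothetical helpers, not part of the Azure SDK.

```python
from typing import NamedTuple


class BlobPath(NamedTuple):
    container: str
    partition: str
    filename: str


def parse_blob_path(blob_name: str) -> BlobPath:
    """Split 'raw/2024-01-15/sales.json' into container, partition, and file."""
    parts = [p for p in blob_name.split('/') if p]
    if len(parts) < 3:
        raise ValueError(f"Expected container/partition/file, got: {blob_name!r}")
    # Everything between the container and the file name is the partition,
    # so nested layouts such as raw/2024/01/15/sales.json also work.
    return BlobPath(parts[0], '/'.join(parts[1:-1]), parts[-1])


def curated_path(blob_name: str, dataset: str = 'sales') -> str:
    """Build the curated-zone target path for a raw blob."""
    parsed = parse_blob_path(blob_name)
    return f"{dataset}/{parsed.partition}/processed.json"


flat = parse_blob_path("raw/2024-01-15/sales.json")
nested_target = curated_path("raw/2024/01/15/sales.json")
```

Rejecting short paths up front avoids an `IndexError` deep inside the function when a blob lands directly in the container root.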
Pro Tip: Consider the Flex Consumption plan for data engineering Functions. It scales out quickly from zero (up to 1,000 instances), optional always-ready instances reduce cold starts, and on-demand instances are billed only for execution time.
Flex Consumption Plan Pricing
┌────────────────────────────────────────────────────────────────┐
│                    FLEX CONSUMPTION PRICING                    │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  EXECUTION COST (illustrative, varies by region):              │
│  • $0.000016 per GB-s (memory × execution time)                │
│  • First 100,000 GB-s/month free                               │
│                                                                │
│  NETWORKING COST:                                              │
│  • $0.000005 per invocation for VNet connectivity              │
│  • Free for non-VNet workloads                                 │
│                                                                │
│  EXAMPLE SCENARIO:                                             │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │ Scenario: Process 1M blobs/day                           │  │
│  │                                                          │  │
│  │ Average execution: 500ms, 256MB memory                   │  │
│  │ Daily executions: 1,000,000                              │  │
│  │ Daily GB-s: 1,000,000 × 0.5s × 0.25GB = 125,000 GB-s     │  │
│  │ Monthly GB-s: 125,000 × 30 = 3,750,000 GB-s              │  │
│  │                                                          │  │
│  │ Cost: 3,750,000 × $0.000016 = $60.00/month               │  │
│  │ (Minus 100,000 GB-s free = $58.40/month)                 │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                │
│  COMPARISON:                                                   │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │ Consumption (Y1):   ~$60/month                           │  │
│  │ Flex Consumption:   ~$58/month                           │  │
│  │ Premium (EP1):      ~$160/month (always ready)           │  │
│  │ Dedicated (B1):     ~$54/month (always running)          │  │
│  └──────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────┘
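The arithmetic in the example scenario can be reproduced with a few lines of Python. This is a sketch that reuses the rate and free grant quoted in the box above as plain inputs; they are the document's figures, not live prices. Per-execution and networking charges are left out, and `monthly_gb_seconds` and `monthly_execution_cost` are hypothetical helper names.

```python
def monthly_gb_seconds(executions_per_day: int, avg_duration_s: float,
                       memory_gb: float, days: int = 30) -> float:
    """GB-seconds consumed per month: executions x duration x memory x days."""
    return executions_per_day * avg_duration_s * memory_gb * days


def monthly_execution_cost(gb_seconds: float, rate_per_gb_s: float,
                           free_gb_s: float = 0.0) -> float:
    """Cost after subtracting the free grant, rounded to cents."""
    billable = max(gb_seconds - free_gb_s, 0.0)
    return round(billable * rate_per_gb_s, 2)


# Scenario from the box: 1M executions/day, 500 ms each, 256 MB (0.25 GB)
usage = monthly_gb_seconds(1_000_000, 0.5, 0.25)
gross = monthly_execution_cost(usage, 0.000016)
net = monthly_execution_cost(usage, 0.000016, free_gb_s=100_000)
```

With these inputs `usage` is 3,750,000 GB-s, `gross` is 60.0, and `net` is 58.4, matching the $60.00 and $58.40 figures in the scenario. Swapping in the current regional rate and free grant is a one-argument change.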
Timer Trigger for Scheduled ETL
# timer_trigger_scheduled.py
import azure.functions as func
import logging
import datetime
import time
from azure.identity import DefaultAzureCredential
from azure.synapse.artifacts import ArtifactsClient

app = func.FunctionApp()

@app.timer_trigger(
    arg_name="myTimer",
    schedule="0 0 2 * * *",  # NCRONTAB (six fields): daily at 2 AM, UTC by default
    run_on_startup=False,
    use_monitor=False
)
def scheduled_etl(myTimer: func.TimerRequest):
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    if myTimer.past_due:
        logging.info("The timer is past due!")
    logging.info("Starting daily ETL pipeline at %s", now_utc.isoformat())
    # Trigger the Synapse pipeline
    credential = DefaultAzureCredential()
    artifacts_client = ArtifactsClient(
        credential=credential,
        endpoint="https://syn-workspace.dev.azuresynapse.net"
    )
    # Start pipeline run
    pipeline_run = artifacts_client.pipeline.create_pipeline_run(
        pipeline_name="daily_sales_etl",
        parameters={
            "date": now_utc.date().isoformat(),
            "source": "sales_api",
            "target": "curated/sales"
        }
    )
    logging.info(f"Pipeline run started: {pipeline_run.run_id}")
    # Poll for completion, but stop before the function's own timeout is hit
    # (simplified - in production, use a webhook or a Durable Functions monitor)
    deadline = time.monotonic() + 240  # stay under the 5-minute Consumption default
    while True:
        run_status = artifacts_client.pipeline_run.get_pipeline_run(
            pipeline_run.run_id
        )
        if run_status.status in ('Succeeded', 'Failed', 'Cancelled'):
            break
        if time.monotonic() >= deadline:
            logging.warning(
                f"Pipeline still {run_status.status}; stopped polling before timeout"
            )
            return
        time.sleep(30)
    logging.info(f"Pipeline completed with status: {run_status.status}")
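Polling inside a function is only safe if it is bounded, because the host stops the invocation once the plan's timeout is reached. One way to sketch a bounded poll is shown below. `poll_until_terminal` is a hypothetical helper, the status strings mirror the ones used in the timer example, and the `sleep` and `clock` parameters exist only so the loop can be tested without waiting.

```python
import time
from typing import Callable, Sequence


def poll_until_terminal(
    get_status: Callable[[], str],
    terminal_states: Sequence[str] = ('Succeeded', 'Failed', 'Cancelled'),
    timeout_s: float = 240.0,
    interval_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_status until it returns a terminal state or the deadline passes."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        # Give up if another full interval would overshoot the deadline.
        if clock() + interval_s > deadline:
            raise TimeoutError(f"Still {status!r} after {timeout_s}s")
        sleep(interval_s)


# Simulated run: a fake clock advances only when the fake sleep is called.
fake_now = [0.0]
statuses = iter(['Queued', 'InProgress', 'Succeeded'])
final_status = poll_until_terminal(
    lambda: next(statuses),
    sleep=lambda s: fake_now.__setitem__(0, fake_now[0] + s),
    clock=lambda: fake_now[0],
)
```

In the timer function, `get_status` would wrap the `get_pipeline_run` call and return its `status` attribute. Raising `TimeoutError` leaves the decision about alerting or retrying to the caller.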
HTTP Trigger for Data API
# http_trigger_api.py
import azure.functions as func
import json
import logging
from azure.identity import DefaultAzureCredential
from azure.cosmos import CosmosClient

app = func.FunctionApp()

@app.route(route="data/{dataset}", auth_level=func.AuthLevel.FUNCTION)
def data_api(req: func.HttpRequest) -> func.HttpResponse:
    # {dataset} is a route parameter; the dates arrive in the query string
    dataset = req.route_params.get('dataset')
    start_date = req.params.get('start_date')
    end_date = req.params.get('end_date')
    if not dataset:
        return func.HttpResponse(
            "Please provide a dataset name",
            status_code=400
        )
    if not start_date or not end_date:
        return func.HttpResponse(
            "Please provide start_date and end_date",
            status_code=400
        )
    # Query Cosmos DB
    credential = DefaultAzureCredential()
    client = CosmosClient(
        "https://cosmos-prod.documents.azure.com:443/",
        credential
    )
    database = client.get_database_client("analytics")
    container = database.get_container_client(dataset)
    # Parameterized query: never interpolate request values into the SQL text
    query = """
        SELECT * FROM c
        WHERE c.date >= @start_date
        AND c.date <= @end_date
    """
    items = list(container.query_items(
        query=query,
        parameters=[
            {"name": "@start_date", "value": start_date},
            {"name": "@end_date", "value": end_date}
        ],
        enable_cross_partition_query=True
    ))
    return func.HttpResponse(
        json.dumps({"records": items, "count": len(items)}),
        status_code=200,
        mimetype="application/json"
    )
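The date filters in the HTTP example arrive as raw query-string values, so they are worth validating before they reach the database. The sketch below assumes the dates are ISO `YYYY-MM-DD` strings and returns the query text together with the `parameters` list in the name/value shape that Cosmos DB parameterized queries use. `build_date_range_query` is a hypothetical helper name.

```python
from datetime import date
from typing import Dict, List, Tuple


def build_date_range_query(start_date: str,
                           end_date: str) -> Tuple[str, List[Dict[str, str]]]:
    """Validate two ISO dates and return (query, parameters)."""
    # fromisoformat raises ValueError for anything that is not a valid date,
    # which also rejects strings carrying injected SQL fragments.
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if start > end:
        raise ValueError("start_date must not be after end_date")
    query = (
        "SELECT * FROM c "
        "WHERE c.date >= @start_date AND c.date <= @end_date"
    )
    parameters = [
        {"name": "@start_date", "value": start.isoformat()},
        {"name": "@end_date", "value": end.isoformat()},
    ]
    return query, parameters


query, parameters = build_date_range_query("2024-01-01", "2024-01-31")
```

In the function, a `ValueError` from this helper would map naturally to a 400 response, and the returned pair can be passed to `query_items` as `query=` and `parameters=`.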
Interview Questions
Q1: When would you use Azure Functions vs Azure Data Factory for data engineering?
A: Use Azure Functions for event-driven, lightweight transformations (blob arrival, API calls). Use ADF for complex orchestration with multiple activities, built-in monitoring, and scheduling. Functions suit near-real-time work; ADF suits batch ETL.
Q2: Explain the Durable Functions fan-out/fan-in pattern for parallel data processing.
A: Fan-out distributes work across multiple activity functions in parallel (e.g., process chunks of data). Fan-in waits for all activities to complete using task_all(), then aggregates results. This pattern maximizes throughput for large datasets.
Q3: What are the cost implications of using Flex Consumption vs Consumption plan?
A: Both plans bill per execution, so costs are broadly similar for the same workload. Flex Consumption scales out faster and can reduce cold starts with optional always-ready instances, which makes it a good fit for data engineering workloads that need consistent performance without paying for a Premium plan's always-on instances.
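The fan-out/fan-in idea from Q2 can be illustrated locally with a thread pool. This is an analogy, not Durable Functions code: `ThreadPoolExecutor.map` plays the role of scheduling activities and `task_all()`, and `fan_out_fan_in` is a hypothetical helper name. The record fields mirror the transform step used earlier.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def transform_chunk(chunk: List[Dict]) -> List[Dict]:
    """Convert each record's amount to USD (the per-chunk 'activity')."""
    return [
        {'id': r['id'], 'amount_usd': r['amount'] * r['exchange_rate']}
        for r in chunk
    ]


def fan_out_fan_in(chunks: List[List[Dict]], max_workers: int = 4) -> List[Dict]:
    # Fan-out: submit one task per chunk; map() preserves input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(transform_chunk, chunks))
    # Fan-in: all tasks have finished here, so flatten into one result list.
    return [row for chunk in results for row in chunk]


chunks = [
    [{'id': 1, 'amount': 10.0, 'exchange_rate': 1.0}],
    [{'id': 2, 'amount': 5.0, 'exchange_rate': 2.0}],
]
rows = fan_out_fan_in(chunks)
```

The key difference in Durable Functions is that each chunk runs as a separately scheduled activity, possibly on a different instance, and the orchestrator's progress is checkpointed between the fan-out and the fan-in.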