Azure Functions: Triggers, Durable Functions & Flex Consumption

Event-driven serverless computing for data engineering pipelines and automation

Azure Functions Architecture

Architecture Diagram
┌─────────────────────────────────────────────────────────────────────┐
│                    AZURE FUNCTIONS ARCHITECTURE                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  TRIGGERS               BINDINGS              OUTPUTS               │
│  ┌──────────────┐       ┌──────────────┐      ┌──────────────┐      │
│  │ Blob Trigger │──────>│ Input Bind   │─────>│ Blob Output  │      │
│  │ Queue Trigger│       │ HTTP Bind    │      │ Queue Output │      │
│  │ Event Hub    │       │ Cosmos Bind  │      │ Event Hub Out│      │
│  │ Timer Trigger│       │ SQL Bind     │      │ HTTP Output  │      │
│  │ HTTP Trigger │       │ Table Bind   │      │ SignalR Out  │      │
│  │ Service Bus  │       │              │      │              │      │
│  └──────────────┘       └──────────────┘      └──────────────┘      │
│                                                                     │
│  HOST PLANS                                                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │
│  │ Consumption  │  │ Premium      │  │ Flex         │               │
│  │ (Y1)         │  │ (EP1-EP3)    │  │ Consumption  │               │
│  │              │  │              │  │              │               │
│  │ Scale: 0-200 │  │ Scale: 1-100 │  │ Scale: 0-1000│               │
│  │ Timeout: 5m  │  │ Timeout: 60m │  │ Timeout: 60m │               │
│  │ Cold start   │  │ Pre-warmed   │  │ Quick scale  │               │
│  │ Pay per exec │  │ Always ready │  │ Pay per exec │               │
│  └──────────────┘  └──────────────┘  └──────────────┘               │
│                                                                     │
│  DURABLE FUNCTIONS                                                  │
│  ┌───────────────────────────────────────────────────────────┐      │
│  │  Orchestrator Function                                    │      │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐                 │      │
│  │  │ Activity │  │ Activity │  │ Activity │                 │      │
│  │  │ Function │  │ Function │  │ Function │                 │      │
│  │  └──────────┘  └──────────┘  └──────────┘                 │      │
│  │                                                           │      │
│  │  Patterns: Function Chaining, Fan-out/Fan-in,             │      │
│  │            Async HTTP, Monitor, Human Interaction         │      │
│  └───────────────────────────────────────────────────────────┘      │
└─────────────────────────────────────────────────────────────────────┘

Durable Functions for Data Pipelines

# durable_functions_orchestrator.py
import azure.functions as func
import azure.durable_functions as df
import requests

def orchestrator_function(context: df.DurableOrchestrationContext):
    # Use the replay-safe orchestration clock and pass it as a JSON-serializable string
    run_date = context.current_utc_datetime.isoformat()

    # Step 1: Extract data from source
    raw_data = yield context.call_activity(
        'extract_data',
        {'source': 'sales_api', 'date': run_date}
    )

    # Step 2: Validate data quality
    validation_result = yield context.call_activity(
        'validate_data',
        {'data': raw_data, 'rules': 'sales_validation_rules'}
    )

    if not validation_result['is_valid']:
        # Step 2a: Handle validation failure
        yield context.call_activity(
            'send_alert',
            {'message': f"Data validation failed: {validation_result['errors']}"}
        )
        return {'status': 'failed', 'errors': validation_result['errors']}

    # Step 3: Transform data (fan-out: one activity per chunk; fan-in: task_all)
    transformed_chunks = yield context.task_all([
        context.call_activity(
            'transform_chunk',
            {'chunk': chunk, 'processed_date': run_date}
        )
        for chunk in raw_data['chunks']
    ])

    # Step 4: Load to data lake
    load_result = yield context.call_activity(
        'load_to_lake',
        {'data': transformed_chunks, 'zone': 'curated'}
    )

    # Step 5: Notify downstream consumers
    yield context.call_activity(
        'notify_consumers',
        {'dataset': 'sales', 'partition': load_result['partition']}
    )

    return {'status': 'completed', 'records_processed': load_result['count']}

# Register the generator as the orchestrator entry point
main = df.Orchestrator.create(orchestrator_function)

# Activity functions (send_alert, load_to_lake and notify_consumers are not shown)
def extract_data(input_data):
    response = requests.get(
        f"https://api.example.com/{input_data['source']}",
        timeout=30
    )
    response.raise_for_status()
    records = response.json()  # assumed to be a JSON list of records
    # Split into chunks so the orchestrator can fan out over them
    chunk_size = 10000
    chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
    return {'records': records, 'chunks': chunks}

def validate_data(input_data):
    errors = []
    for record in input_data['data']['records']:
        if not record.get('amount') or record['amount'] < 0:
            errors.append(f"Invalid amount for record {record['id']}")
    return {'is_valid': len(errors) == 0, 'errors': errors}

def transform_chunk(input_data):
    # Activities have no orchestration context, so the date arrives in the payload
    transformed = []
    for record in input_data['chunk']:
        transformed.append({
            'id': record['id'],
            'amount_usd': record['amount'] * record['exchange_rate'],
            'processed_date': input_data['processed_date']
        })
    return transformed
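
Because a Durable Functions orchestrator is a Python generator, its control flow can be exercised locally without the Functions host. The sketch below is an illustration only: `FakeContext`, `run_orchestrator`, `sample_orchestrator`, and the stub activities are hypothetical test helpers, not part of the `azure-durable-functions` library. The harness runs each yielded task synchronously, whereas the real runtime schedules activities and replays the orchestrator from history.

```python
from datetime import datetime, timezone


class FakeContext:
    """Hypothetical stand-in for df.DurableOrchestrationContext (local testing only)."""

    def __init__(self, activities):
        self.activities = activities
        self.current_utc_datetime = datetime(2024, 1, 15, tzinfo=timezone.utc)

    def call_activity(self, name, payload):
        # Return a deferred call; the harness executes it once the orchestrator yields it.
        return lambda: self.activities[name](payload)

    def task_all(self, tasks):
        # Fan-in: a single deferred call that runs every task and keeps result order.
        return lambda: [task() for task in tasks]


def run_orchestrator(orchestrator, context):
    """Drive the generator: run each yielded task and send its result back in."""
    gen = orchestrator(context)
    try:
        task = next(gen)
        while True:
            task = gen.send(task())
    except StopIteration as stop:
        return stop.value


def sample_orchestrator(context):
    # Reduced version of the pipeline shape: extract -> fan-out transform -> load.
    raw = yield context.call_activity("extract_data", {"source": "sales_api"})
    doubled = yield context.task_all(
        [context.call_activity("transform_chunk", {"chunk": c}) for c in raw["chunks"]]
    )
    count = yield context.call_activity("load_to_lake", {"data": doubled})
    return {"status": "completed", "records_processed": count}


# Stub activities with assumed behaviour, for illustration only.
ACTIVITIES = {
    "extract_data": lambda p: {"chunks": [[1, 2], [3]]},
    "transform_chunk": lambda p: [x * 2 for x in p["chunk"]],
    "load_to_lake": lambda p: sum(len(chunk) for chunk in p["data"]),
}

result = run_orchestrator(sample_orchestrator, FakeContext(ACTIVITIES))
print(result)  # {'status': 'completed', 'records_processed': 3}
```

Each `yield` hands a task to whatever is driving the generator and resumes with that task's result, which is why the orchestrator body must stay deterministic and leave I/O to activities.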

Blob Trigger for Data Ingestion

# blob_trigger_ingestion.py
import azure.functions as func
import json
import logging
from azure.storage.filedatalake import DataLakeServiceClient
from azure.identity import DefaultAzureCredential

app = func.FunctionApp()

@app.blob_trigger(
    arg_name="myblob",
    path="raw/{folder}/{name}",
    connection="AzureWebJobsStorage"
)
def blob_trigger_ingestion(myblob: func.InputStream):
    logging.info(f"Processing blob: {myblob.name}, Size: {myblob.length}")
    
    # Parse the blob path: "raw/{folder}/{name}"
    path_parts = myblob.name.split('/')
    date_partition = path_parts[1]  # the {folder} segment, e.g., "2024-01-15"
    file_name = path_parts[-1]

    # Read blob content
    data = json.loads(myblob.read().decode('utf-8'))

    # Process and write to curated zone
    credential = DefaultAzureCredential()
    datalake = DataLakeServiceClient(
        account_url="https://stdatalake001.dfs.core.windows.net",
        credential=credential
    )

    # Write processed data: one output file per input blob, so blobs in the
    # same partition do not overwrite each other
    file_client = datalake.get_file_client(
        "curated",
        f"sales/{date_partition}/{file_name}"
    )
    
    processed_data = process_records(data)
    file_client.upload_data(
        json.dumps(processed_data),
        overwrite=True
    )
    
    logging.info(f"Processed {len(processed_data)} records")

def process_records(data):
    return [
        {
            'id': r['id'],
            'amount': float(r['amount']),
            'category': r.get('category', 'unknown'),
            'processed': True
        }
        for r in data['records']
        if r.get('amount') and float(r['amount']) > 0
    ]
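
The path handling in the trigger is easy to get wrong, so it can help to isolate it in a small function that is testable without the Functions host. This is a minimal sketch: `parse_blob_path` and `BlobPath` are hypothetical names, not part of the Azure SDK, and it assumes the blob name arrives as `container/folder/name`, as the code above does.

```python
from typing import NamedTuple


class BlobPath(NamedTuple):
    container: str
    folder: str
    name: str


def parse_blob_path(blob_name: str) -> BlobPath:
    """Split 'container/folder/name'; the name part may itself contain slashes."""
    parts = blob_name.split("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Unexpected blob path: {blob_name!r}")
    return BlobPath(*parts)


parsed = parse_blob_path("raw/2024-01-15/sales/part-0001.json")
print(parsed.folder, parsed.name)
```

Rejecting unexpected paths up front keeps a misplaced upload from being written to the wrong partition in the curated zone.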

ℹ️

Pro Tip: Use Flex Consumption plan for data engineering Functionsβ€”it provides quick scaling (1-30 instances) without cold start delays, and you only pay for execution time.

Flex Consumption Plan Pricing

Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐
│                    FLEX CONSUMPTION PRICING                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  EXECUTION COST:                                                │
│  • $0.000016 per GB-s (memory × execution time)                 │
│  • First 100,000 GB-s/month free                                │
│                                                                 │
│  NETWORKING COST:                                               │
│  • $0.000005 per invocation for VNet connectivity               │
│  • Free for non-VNet workloads                                  │
│                                                                 │
│  EXAMPLE SCENARIOS:                                             │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ Scenario: Process 1M blobs/day                           │   │
│  │                                                          │   │
│  │ Average execution: 500ms, 256MB memory                   │   │
│  │ Daily executions: 1,000,000                              │   │
│  │ Daily GB-s: 1,000,000 × 0.5s × 0.25GB = 125,000 GB-s     │   │
│  │ Monthly GB-s: 125,000 × 30 = 3,750,000 GB-s              │   │
│  │                                                          │   │
│  │ Cost: 3,750,000 × $0.000016 = $60.00/month               │   │
│  │ (Minus 100,000 GB-s free = $58.40/month)                 │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  COMPARISON:                                                    │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ Consumption (Y1):  ~$60/month                            │   │
│  │ Flex Consumption:  ~$58/month                            │   │
│  │ Premium (EP1):     ~$160/month (always ready)            │   │
│  │ Dedicated (B1):    ~$54/month (always running)           │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
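
The scenario arithmetic above can be reproduced with a few lines of Python. This is a rough sketch: `monthly_execution_cost` is a hypothetical helper, and its default rate and free grant are simply the figures quoted in this section, so treat them as assumptions to check against current pricing. It covers only the GB-second execution charge.

```python
def monthly_execution_cost(
    executions_per_day: int,
    avg_duration_s: float,
    memory_gb: float,
    price_per_gb_s: float = 0.000016,  # rate quoted in this section (assumption)
    free_gb_s: float = 100_000,        # monthly free grant quoted in this section
    days: int = 30,
) -> float:
    """Estimate the monthly execution charge in USD from GB-seconds consumed."""
    gb_seconds = executions_per_day * avg_duration_s * memory_gb * days
    billable = max(gb_seconds - free_gb_s, 0)
    return round(billable * price_per_gb_s, 2)


cost = monthly_execution_cost(1_000_000, 0.5, 0.25)
print(cost)  # 58.4
```

Keeping the rate and grant as parameters makes it easy to rerun the estimate for another plan or region.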

Timer Trigger for Scheduled ETL

# timer_trigger_scheduled.py
import azure.functions as func
import logging
import datetime
import json
from azure.identity import DefaultAzureCredential
from azure.synapse.artifacts import ArtifactsClient

app = func.FunctionApp()

@app.timer_trigger(
    arg_name="myTimer",
    schedule="0 0 2 * * *",  # Daily at 02:00 (UTC by default)
    run_on_startup=False,
    use_monitor=False
)
def scheduled_etl(myTimer: func.TimerRequest):
    utc_timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    
    if myTimer.past_due:
        logging.info("The timer is past due!")
    
    logging.info("Starting daily ETL pipeline at %s", utc_timestamp)
    
    # Trigger Synapse pipeline
    credential = DefaultAzureCredential()
    artifacts_client = ArtifactsClient(
        credential=credential,
        endpoint="https://syn-workspace.dev.azuresynapse.net"
    )
    
    # Start pipeline run
    pipeline_run = artifacts_client.pipeline.create_pipeline_run(
        pipeline_name="daily_sales_etl",
        parameters={
            "date": datetime.date.today().isoformat(),
            "source": "sales_api",
            "target": "curated/sales"
        }
    )
    
    logging.info(f"Pipeline run started: {pipeline_run.run_id}")
    
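    # Poll for completion, bounded so the function exits before the host timeout
    # (simplified - in production, use a webhook or a Durable Functions monitor)
    import time
    deadline = time.monotonic() + 240  # assumed 4-minute budget; tune to your plan's timeout
    while True:
        run_status = artifacts_client.pipeline_run.get_pipeline_run(
            pipeline_run.run_id
        )
        if run_status.status in ['Succeeded', 'Failed', 'Cancelled']:
            break
        if time.monotonic() >= deadline:
            logging.warning("Pipeline still %s; stopping polling", run_status.status)
            return
        time.sleep(30)

    logging.info(f"Pipeline completed with status: {run_status.status}")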
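
The `schedule` value is an NCRONTAB expression with six fields, seconds first, which differs from the five-field cron format many engineers expect. The hypothetical helper below (`split_ncrontab` is not part of any Azure library) only labels the fields so the meaning of an expression such as `0 0 2 * * *` is explicit; it does not validate ranges.

```python
FIELDS = ("second", "minute", "hour", "day", "month", "day_of_week")


def split_ncrontab(expression: str) -> dict:
    """Label the six fields of an NCRONTAB expression (no range validation)."""
    parts = expression.split()
    if len(parts) != len(FIELDS):
        raise ValueError(
            f"Expected {len(FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(FIELDS, parts))


fields = split_ncrontab("0 0 2 * * *")
print(fields)
```

Here the hour field is `2` while second and minute are `0`, so the function fires once a day at 02:00:00.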

HTTP Trigger for Data API

# http_trigger_api.py
import azure.functions as func
import json
import logging
from azure.identity import DefaultAzureCredential
from azure.cosmos import CosmosClient

app = func.FunctionApp()

@app.route(route="data/{dataset}", auth_level=func.AuthLevel.FUNCTION)
def data_api(req: func.HttpRequest) -> func.HttpResponse:
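    # {dataset} comes from the route template, so read it from route_params
    dataset = req.route_params.get('dataset')
    start_date = req.params.get('start_date')
    end_date = req.params.get('end_date')

    if not dataset:
        return func.HttpResponse(
            "Please provide a dataset name",
            status_code=400
        )

    if not start_date or not end_date:
        return func.HttpResponse(
            "Please provide start_date and end_date",
            status_code=400
        )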
    
    # Query Cosmos DB
    credential = DefaultAzureCredential()
    client = CosmosClient(
        "https://cosmos-prod.documents.azure.com:443/",
        credential
    )
    
    database = client.get_database_client("analytics")
    container = database.get_container_client(dataset)
    
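    # Parameterized query: request values are never interpolated into the query text
    query = """
        SELECT * FROM c
        WHERE c.date >= @start_date
        AND c.date <= @end_date
    """

    items = list(container.query_items(
        query=query,
        parameters=[
            {"name": "@start_date", "value": start_date},
            {"name": "@end_date", "value": end_date}
        ],
        enable_cross_partition_query=True
    ))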
    
    return func.HttpResponse(
        json.dumps({"records": items, "count": len(items)}),
        status_code=200,
        mimetype="application/json"
    )
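
Since `start_date` and `end_date` come straight from the request, it is worth validating them before they reach the database. A minimal sketch, assuming ISO `YYYY-MM-DD` dates; `build_date_range_query` is a hypothetical helper that returns the query text and parameter list as plain data.

```python
from datetime import date


def build_date_range_query(start_date: str, end_date: str) -> dict:
    """Validate ISO dates and return a parameterized query spec."""
    start = date.fromisoformat(start_date)  # raises ValueError on malformed input
    end = date.fromisoformat(end_date)
    if start > end:
        raise ValueError("start_date must not be after end_date")
    return {
        "query": "SELECT * FROM c WHERE c.date >= @start_date AND c.date <= @end_date",
        "parameters": [
            {"name": "@start_date", "value": start.isoformat()},
            {"name": "@end_date", "value": end.isoformat()},
        ],
    }


spec = build_date_range_query("2024-01-01", "2024-01-31")
print(spec["parameters"])
```

The two keys line up with the `query` and `parameters` keyword arguments of `container.query_items`, and a `ValueError` from the helper maps naturally to a 400 response.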

Interview Questions

Q1: When would you use Azure Functions vs Azure Data Factory for data engineering?
A: Use Azure Functions for event-driven, lightweight transformations (blob arrival, API calls). Use ADF for complex orchestration with multiple activities, monitoring, and scheduling. Functions suit near-real-time work; ADF suits batch ETL.

Q2: Explain the Durable Functions fan-out/fan-in pattern for parallel data processing.
A: Fan-out distributes work across multiple activity functions in parallel (e.g., processing chunks of data). Fan-in waits for all activities to complete using task_all(), then aggregates the results. This pattern maximizes throughput for large datasets.

Q3: What are the cost implications of using Flex Consumption vs Consumption plan?
A: Flex Consumption provides better performance (faster scaling, fewer cold starts) at similar cost. It suits data engineering workloads that need consistent performance without paying for permanently running Premium plan instances.
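
The fan-out/fan-in shape from Q2 can be tried locally with the standard library. This is only an analogy: threads stand in for activity functions, and `process_chunk` and `fan_out_fan_in` are hypothetical names. Durable Functions distributes the work across instances and checkpoints progress, which this sketch does not.

```python
from concurrent.futures import ThreadPoolExecutor


def process_chunk(chunk):
    """Hypothetical per-chunk work: sum the amounts in one chunk."""
    return sum(chunk)


def fan_out_fan_in(records, chunk_size):
    # Fan-out: split the records into chunks and run one task per chunk.
    chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
    with ThreadPoolExecutor(max_workers=4) as pool:
        partials = list(pool.map(process_chunk, chunks))  # results keep chunk order
    # Fan-in: aggregate the partial results once every task has finished.
    return sum(partials)


total = fan_out_fan_in(list(range(1, 101)), chunk_size=10)
print(total)  # 5050
```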
