Google Cloud Dataflow for Batch and Stream Processing
Apache Beam Programming Model
Apache Beam provides a unified programming model for batch and stream processing. Dataflow is Google's managed service for running Beam pipelines.
Core Concepts
- PCollection: Immutable, distributed dataset
- PTransform: Operation that processes PCollections
- Pipeline: Series of PTransforms applied to PCollections
- Runner: Execution engine (Dataflow, DirectRunner, SparkRunner)
Pipeline Structure
Basic Pipeline Examples
Batch Processing Pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def parse_csv(line):
    """Parse one CSV record into a dictionary.

    The record is decoded with the standard ``csv`` module instead of a bare
    ``str.split(',')`` so that quoted fields containing commas (for example
    ``"Smith, John"``) stay in a single column and lose their surrounding
    quotes.

    Args:
        line: A single CSV record with at least four columns, in the order
            id, name, amount, timestamp.

    Returns:
        A dict with keys ``id``, ``name``, ``amount`` (converted to float)
        and ``timestamp``.

    Raises:
        IndexError: If the record has fewer than four columns.
        ValueError: If the amount column cannot be converted to float.
    """
    # Local import keeps the function self-contained.
    import csv

    # csv.reader works on an iterable of lines; wrap the single record.
    fields = next(csv.reader([line]), [])
    return {
        'id': fields[0],
        'name': fields[1],
        'amount': float(fields[2]),
        'timestamp': fields[3],
    }
def format_for_bigquery(element):
    """Build the BigQuery row for a parsed record.

    Copies ``id``, ``name`` and ``amount`` unchanged and stores the record's
    ``timestamp`` value under the ``processed_at`` key. Any other keys on
    ``element`` are not carried over.
    """
    row = {column: element[column] for column in ('id', 'name', 'amount')}
    row['processed_at'] = element['timestamp']
    return row
def run_batch_pipeline():
    """Run the batch job: CSV files in Cloud Storage to a BigQuery table.

    Reads every file matching ``gs://my-bucket/input/*.csv``, parses each
    line with ``parse_csv``, keeps records whose amount is positive, reshapes
    them with ``format_for_bigquery`` and appends them to
    ``my-project:analytics.processed_data``. The write uses ``CREATE_NEVER``,
    so this job does not create the destination table.
    """
    pipeline_args = [
        '--project', 'my-project',
        '--region', 'us-central1',
        '--runner', 'DataflowRunner',
        '--temp_location', 'gs://my-bucket/temp',
        '--staging_location', 'gs://my-bucket/staging',
        '--num_workers', '4',
        '--max_num_workers', '8',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED'
    ]
    options = PipelineOptions(pipeline_args)

    # The context manager runs the pipeline when the block exits.
    with beam.Pipeline(options=options) as pipeline:
        lines = pipeline | 'Read CSV' >> beam.io.ReadFromText(
            'gs://my-bucket/input/*.csv'
        )
        records = lines | 'Parse CSV' >> beam.Map(parse_csv)
        positive = records | 'Filter Valid' >> beam.Filter(
            lambda record: record['amount'] > 0
        )
        rows = positive | 'Format for BQ' >> beam.Map(format_for_bigquery)
        _ = rows | 'Write to BQ' >> beam.io.WriteToBigQuery(
            'my-project:analytics.processed_data',
            schema='id:STRING,name:STRING,amount:FLOAT64,processed_at:TIMESTAMP',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
        )


if __name__ == '__main__':
    run_batch_pipeline()
Stream Processing Pipeline
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, Sessions
from apache_beam.transforms.trigger import (
AfterWatermark, AfterProcessingTime, AccumulationMode
)
from apache_beam.options.pipeline_options import StandardOptions
def parse_pubsub_message(message):
    """Parse a Pub/Sub message into a flat event dictionary.

    Accepts either the raw payload (``bytes`` or ``str``) or an object that
    exposes the payload as ``.data``. ``beam.io.ReadFromPubSub`` emits raw
    bytes unless it is created with ``with_attributes=True``, so both shapes
    have to be handled.

    Args:
        message: JSON payload, or a message object with a ``data`` attribute
            holding the JSON payload.

    Returns:
        A dict with ``user_id``, ``action``, ``amount`` (0 when the payload
        has no ``amount`` field) and ``timestamp``.

    Raises:
        KeyError: If ``user_id``, ``action`` or ``timestamp`` is missing.
        ValueError: If the payload is not valid JSON or not valid UTF-8.
    """
    import json

    # Plain bytes/str have no ``data`` attribute, so they fall through as-is.
    payload = getattr(message, 'data', message)
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode('utf-8')
    data = json.loads(payload)
    return {
        'user_id': data['user_id'],
        'action': data['action'],
        'amount': data.get('amount', 0),
        'timestamp': data['timestamp'],
    }
def run_streaming_pipeline():
    """Run the streaming job: Pub/Sub events to per-user totals in BigQuery.

    Steps:
      1. Read messages from ``projects/my-project/topics/user-events``.
      2. Parse each message with ``parse_pubsub_message``.
      3. Key each event as ``(user_id, amount)`` so it can be combined per key.
      4. Assign 60-second fixed windows. Panes fire early on a 10-second
         processing-time trigger and again when the watermark passes the end
         of the window. ``DISCARDING`` mode means each pane carries only the
         data that arrived since the previous pane.
      5. Sum the amounts per user.
      6. Emit one row per pane with the user, the sum and the window start.
      7. Append the rows to ``my-project:analytics.user_totals``.
    """
    options = PipelineOptions([
        '--project', 'my-project',
        '--region', 'us-central1',
        '--runner', 'DataflowRunner',
        '--temp_location', 'gs://my-bucket/temp',
        '--streaming'
    ])
    options.view_as(StandardOptions).streaming = True

    def format_result(user_total, window=beam.DoFn.WindowParam):
        """Turn a ``(user_id, total)`` pair into a BigQuery row."""
        user_id, total_amount = user_total
        return {
            'user_id': user_id,
            'total_amount': total_amount,
            # CombinePerKey output is a 2-tuple with no window in it; the
            # window comes from the DoFn parameter. RFC 3339 text is used
            # because the destination column is a TIMESTAMP.
            'window_start': window.start.to_rfc3339(),
        }

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                topic='projects/my-project/topics/user-events'
            )
            | 'Parse Messages' >> beam.Map(parse_pubsub_message)
            # CombinePerKey needs (key, value) pairs, not dicts.
            | 'Key by User' >> beam.Map(
                lambda event: (event['user_id'], event['amount'])
            )
            | 'Window into 1-minute' >> beam.WindowInto(
                FixedWindows(60),
                trigger=AfterWatermark(
                    early=AfterProcessingTime(10)
                ),
                accumulation_mode=AccumulationMode.DISCARDING
            )
            | 'Aggregate by User' >> beam.CombinePerKey(sum)
            | 'Format Results' >> beam.Map(format_result)
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:analytics.user_totals',
                schema='user_id:STRING,total_amount:FLOAT64,window_start:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )
Windowing Strategies
Windowing divides streaming data into finite groups for processing.
Fixed Windows
from apache_beam.transforms.window import FixedWindows
# Fixed 5-minute windows: every element is assigned to one non-overlapping
# 300-second window, then values are summed per key within each window.
# NOTE(review): `beam` and `pipeline` are not defined in this fragment; it is
# presumably meant to be pasted inside a `with beam.Pipeline(...)` block.
# NOTE(review): CombinePerKey operates on (key, value) pairs; the parsing and
# keying step between the read and the combine looks omitted for brevity.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Window' >> beam.WindowInto(FixedWindows(300)) # 5 minutes
    | 'Process' >> beam.CombinePerKey(sum)
)
Sliding Windows
from apache_beam.transforms.window import SlidingWindows
# Sliding windows: 10-minute window, advance every 1 minute.
# The arguments are (size, period) in seconds, so windows overlap and a single
# element is assigned to several of them.
# NOTE(review): `beam` and `pipeline` are not defined in this fragment; it is
# presumably meant to be pasted inside a `with beam.Pipeline(...)` block.
# NOTE(review): CombinePerKey operates on (key, value) pairs; the parsing and
# keying step between the read and the combine looks omitted for brevity.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Window' >> beam.WindowInto(SlidingWindows(600, 60)) # 10min window, 1min advance
    | 'Process' >> beam.CombinePerKey(sum)
)
Session Windows
from apache_beam.transforms.window import Sessions
# Session windows: gap duration of 5 minutes.
# Elements for the same key that arrive within 300 seconds of each other are
# grouped into one session; a longer gap starts a new one.
# NOTE(review): `beam` and `pipeline` are not defined in this fragment; it is
# presumably meant to be pasted inside a `with beam.Pipeline(...)` block.
# NOTE(review): sessions are per key, and CombinePerKey operates on
# (key, value) pairs; the keying step looks omitted for brevity.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Window' >> beam.WindowInto(Sessions(300)) # 5-minute gap
    | 'Process' >> beam.CombinePerKey(sum)
)
Global Windows
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterProcessingTime, Repeatedly
)
from apache_beam.transforms.window import GlobalWindows

# Global window with custom trigger.
# The global window never closes on an unbounded source, so results are only
# emitted when the trigger fires.
# NOTE(review): `beam` and `pipeline` are not defined in this fragment; it is
# presumably meant to be pasted inside a `with beam.Pipeline(...)` block.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Window' >> beam.WindowInto(
        GlobalWindows(),
        # AfterProcessingTime on its own fires a single time; Repeatedly
        # re-arms it so a pane is emitted roughly every minute.
        trigger=Repeatedly(AfterProcessingTime(60)),
        # Each pane contains everything seen so far, not just the new data.
        accumulation_mode=AccumulationMode.ACCUMULATING
    )
    | 'Process' >> beam.CombinePerKey(sum)
)
Triggers
Triggers determine when to emit results from windows.
Common Trigger Patterns
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AfterCount,
    AccumulationMode, Repeatedly, AfterAny
)
# Each assignment below rebinds `trigger`; the examples are alternatives, not
# steps of one configuration.

# Early results: before the watermark reaches the end of the window, fire
#   30 seconds (processing time) after the first element of each pane
# On-time: fire when the watermark passes the end of the window
# Late results: for data behind the watermark, fire 300 seconds (processing
#   time) after the first late element of each pane
# NOTE(review): how long late data is accepted at all is set separately by
# `allowed_lateness` on WindowInto, not by the late trigger.
trigger = AfterWatermark(
    early=AfterProcessingTime(30),
    late=AfterProcessingTime(300)
)
# Count-based trigger: fires once at least 1000 elements have arrived.
# NOTE(review): on its own this fires a single time; wrap it in Repeatedly
# to emit every 1000 elements.
trigger = AfterCount(1000)
# Composite trigger: fires as soon as either condition is met (count OR time)
trigger = AfterAny(AfterCount(1000), AfterProcessingTime(60))
# Repeated trigger: re-arm the inner trigger every time it fires
trigger = Repeatedly(AfterCount(100))
Accumulation Modes
from apache_beam.transforms.trigger import AccumulationMode
# The two assignments below are alternatives; the second rebinds the name.

# Discard accumulating state (default)
# Each trigger fires with only new data since last trigger
# NOTE(review): the Python SDK appears to require an explicit
# accumulation_mode whenever a non-default trigger is set; confirm before
# relying on the default.
accumulation_mode = AccumulationMode.DISCARDING
# Accumulate state
# Each trigger fires with all data since window opened
accumulation_mode = AccumulationMode.ACCUMULATING
Watermarks
Watermarks track progress through event time. They determine when windows are considered complete.
Watermark Concepts
Event Time: When the event actually occurred
Processing Time: When the event is processed
Watermark: The system's estimate of the event time up to which all input is expected to have arrived
Lateness: How late an event arrives after the watermark
Watermark Management
# Watermark tracking in pipeline.
# Each element is re-stamped with its own 'timestamp' field as event time and
# assigned to 60-second fixed windows. Panes fire early and late around the
# watermark, and data is accepted up to 300 seconds after the watermark
# passes the end of the window.
# NOTE(review): `beam`, `pipeline` and the trigger/window classes are not
# imported in this fragment.
# NOTE(review): the lambda indexes x['timestamp'], so it assumes elements are
# already parsed mappings; raw Pub/Sub payloads would need a parse step first.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Add Timestamps' >> beam.Map(
        lambda x: beam.window.TimestampedValue(x, x['timestamp'])
    )
    | 'Window' >> beam.WindowInto(
        FixedWindows(60),
        trigger=AfterWatermark(
            early=AfterProcessingTime(10),
            late=AfterProcessingTime(300)
        ),
        allowed_lateness=300, # 5 minutes
        # Each pane carries only the data that arrived since the previous pane.
        accumulation_mode=AccumulationMode.DISCARDING
    )
)
Handling Late Data
# Late data handling strategy
def handle_late_data(element):
    """Summarise one keyed record that may contain late data.

    ``element`` is a ``(user_id, data)`` pair. ``data`` must provide
    ``amounts`` (an iterable of numbers), ``is_late`` and ``window_start``.
    The result repeats the user id, the lateness flag and the window start,
    and replaces the individual amounts with their sum under ``total``.
    """
    user_id, payload = element
    total = sum(payload['amounts'])
    summary = {'user_id': user_id, 'total': total}
    summary['is_late'] = payload['is_late']
    summary['window_start'] = payload['window_start']
    return summary
# Pipeline with late data handling.
# 60-second fixed windows with early and late firings; late data is accepted
# for 300 seconds. ACCUMULATING mode makes every pane, including late ones,
# contain all data seen so far for the window.
# NOTE(review): CombinePerKey(sum) emits (key, number) pairs, but
# handle_late_data reads data['amounts'], data['is_late'] and
# data['window_start'] from the value. The two steps look incompatible as
# written; verify the intended element shape.
# NOTE(review): `beam`, `pipeline` and the trigger/window classes are not
# imported in this fragment.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Window' >> beam.WindowInto(
        FixedWindows(60),
        trigger=AfterWatermark(
            early=AfterProcessingTime(10),
            late=AfterProcessingTime(300)
        ),
        allowed_lateness=300,
        accumulation_mode=AccumulationMode.ACCUMULATING
    )
    | 'Combine' >> beam.CombinePerKey(sum)
    | 'Handle Late' >> beam.Map(handle_late_data)
)
Autoscaling
Dataflow automatically scales the number of workers based on throughput.
Autoscaling Configuration
# Enable autoscaling.
# Starts the job with 2 workers and lets it grow to at most 10 using the
# THROUGHPUT_BASED algorithm; each worker is an n1-standard-4 with a 100 GB
# disk.
# NOTE(review): no --temp_location is set here, unlike the other option lists
# in this document; confirm whether the runner needs one.
# NOTE(review): Dataflow documents the worker disk type as a full resource
# path; confirm the short form 'pd-ssd' is accepted.
options = PipelineOptions([
    '--project', 'my-project',
    '--region', 'us-central1',
    '--runner', 'DataflowRunner',
    '--num_workers', '2',
    '--max_num_workers', '10',
    '--autoscaling_algorithm', 'THROUGHPUT_BASED',
    '--worker_machine_type', 'n1-standard-4',
    '--disk_size_gb', '100',
    '--disk_type', 'pd-ssd'
])
Autoscaling Parameters
| Parameter | Description | Default |
|---|---|---|
| num_workers | Initial number of workers | 3 |
| max_num_workers | Maximum number of workers | 1000 |
| autoscaling_algorithm | THROUGHPUT_BASED or NONE | THROUGHPUT_BASED |
| worker_machine_type | Machine type for workers | n1-standard-1 |
Monitoring Autoscaling
# Monitor autoscaling via API
from google.cloud import dataflow_v1beta3

# The Dataflow client library exposes one client per service; job metadata
# comes from the Jobs client, and calls take a request object rather than
# flattened keyword arguments.
client = dataflow_v1beta3.JobsV1Beta3Client()

# Get job status
job = client.get_job(
    request=dataflow_v1beta3.GetJobRequest(
        project_id='my-project',
        job_id='job-id',
        location='us-central1',
    )
)

# Check worker count.
# The Job resource has no worker-count field; the most recent autoscaling
# event reported through the Messages service carries the current number.
messages_client = dataflow_v1beta3.MessagesV1Beta3Client()
message_pages = messages_client.list_job_messages(
    request=dataflow_v1beta3.ListJobMessagesRequest(
        project_id='my-project',
        job_id='job-id',
        location='us-central1',
    )
).pages
autoscaling_events = [
    event for page in message_pages for event in page.autoscaling_events
]
if autoscaling_events:
    latest_event = max(autoscaling_events, key=lambda event: event.time)
    print(f"Current workers: {latest_event.current_num_workers}")
else:
    print("Current workers: no autoscaling events reported yet")
print(f"Job status: {job.current_state.name}")
State Management
State management enables maintaining information across elements in a pipeline.
Keyed State
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec,
    on_timer, RuntimeTimer, RuntimeState
)
from apache_beam.utils.timestamp import Duration, Timestamp

# State specifications
# Coders live in the `beam.coders` module, and a timer's clock is selected
# with a TimeDomain value.
BUFFER_STATE = BagStateSpec('buffer', beam.coders.VarIntCoder())
COUNT_STATE = CombiningValueStateSpec('count', beam.coders.VarIntCoder(), sum)
EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.REAL_TIME)


# Stateful DoFn
class StatefulCombineFn(beam.DoFn):
    """Buffer values per key and flush them after five idle minutes.

    Input elements are ``(key, value)`` pairs. Every value is appended to a
    bag and counted, and a processing-time timer is (re)set to five minutes
    from now. When the timer fires, the DoFn emits
    ``(buffered_values, count)`` and clears both states.

    NOTE(review): the buffer is declared with ``VarIntCoder``, so values are
    assumed to be integers; confirm against the upstream step.
    """

    def process(self, element, buffer=beam.DoFn.StateParam(BUFFER_STATE),
                count=beam.DoFn.StateParam(COUNT_STATE),
                timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
        """Store the value and push the expiry five minutes into the future."""
        _, value = element
        buffer.add(value)
        count.add(1)
        # Timers take an absolute time, so the expiry is expressed as
        # "now + 5 minutes". Setting it again replaces the previous deadline.
        timer.set(Timestamp.now() + Duration(seconds=300))

    @on_timer(EXPIRY_TIMER)
    def on_expiry(self, buffer=beam.DoFn.StateParam(BUFFER_STATE),
                  count=beam.DoFn.StateParam(COUNT_STATE)):
        """Emit the buffered values with their count, then reset the state."""
        yield (list(buffer.read()), count.read())
        buffer.clear()
        count.clear()
Stateful Processing Example
# Session windowing with state.
# Events are keyed by user, grouped into sessions with a 300-second gap, run
# through StatefulCombineFn and then formatted.
# NOTE(review): `format_session_output` is not defined anywhere in view.
# NOTE(review): the keying lambda indexes x['user_id'], so it assumes parsed
# mappings; raw Pub/Sub payloads would need a parse step first.
# NOTE(review): confirm the target runner supports stateful DoFns inside
# merging (session) windows.
(
    pipeline
    | 'Read' >> beam.io.ReadFromPubSub(topic='topic')
    | 'Key by User' >> beam.Map(lambda x: (x['user_id'], x))
    | 'Window' >> beam.WindowInto(Sessions(300))
    | 'Stateful Combine' >> beam.ParDo(StatefulCombineFn())
    | 'Format Output' >> beam.Map(format_session_output)
)
Monitoring and Debugging
Cloud Monitoring (formerly Stackdriver) Metrics
# Custom metrics in pipeline
import time

from apache_beam.metrics import Metrics


class ProcessFn(beam.DoFn):
    """Apply ``transform`` to each element and record custom metrics.

    Three metrics are registered under this class's namespace:
    ``elements_processed`` (successful elements), ``errors`` (elements whose
    transform raised) and ``processing_latency_ms`` (time spent in the
    transform, in milliseconds).

    NOTE(review): ``transform`` is not defined in this snippet; it is
    presumably a user-supplied function.
    """

    def __init__(self):
        super().__init__()
        self.processed_counter = Metrics.counter(self.__class__, 'elements_processed')
        self.error_counter = Metrics.counter(self.__class__, 'errors')
        self.latency_histogram = Metrics.distribution(self.__class__, 'processing_latency_ms')

    def process(self, element):
        """Yield ``transform(element)``; count failures and re-raise them."""
        start_time = time.monotonic()
        try:
            # Process element
            result = transform(element)
        except Exception:
            self.error_counter.inc()
            raise
        finally:
            # Record latency before yielding: a generator's `finally` would
            # otherwise run only after the consumer is done with the value,
            # folding downstream time into this measurement.
            latency = (time.monotonic() - start_time) * 1000
            self.latency_histogram.update(latency)
        self.processed_counter.inc()
        yield result
Monitoring Dashboard
# Create monitoring dashboard.
# Builds a dashboard definition with one 6x4 mosaic tile: an XY chart of the
# Dataflow element-count metric, aligned as a rate over 60-second periods.
# NOTE(review): `Dashboard` appears to be published in the separate
# `google.cloud.monitoring_dashboard_v1` package (alongside
# `DashboardsServiceClient.create_dashboard`), not in `monitoring_v3`; verify
# before running.
# NOTE(review): nothing in this snippet sends the dashboard to the API;
# `client` and `project_name` are assigned but never used.
from google.cloud import monitoring_v3
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/my-project"
# Create dashboard
dashboard = monitoring_v3.Dashboard({
    "display_name": "Dataflow Pipeline Dashboard",
    "mosaic_layout": {
        "tiles": [
            {
                "width": 6,
                "height": 4,
                "widget": {
                    "title": "Elements Processed",
                    "xy_chart": {
                        "data_sets": [{
                            "time_series_query": {
                                "time_series_filter": {
                                    "filter": 'metric.type="dataflow.googleapis.com/job/element_count"',
                                    "aggregation": {
                                        "alignment_period": {"seconds": 60},
                                        "per_series_aligner": "ALIGN_RATE"
                                    }
                                }
                            }
                        }]
                    }
                }
            }
        ]
    }
})
Best Practices
- Use windowing appropriately - Match window size to business requirements
- Implement triggers - Handle early, on-time, and late data
- Monitor watermarks - Track watermark lag and late data
- Enable autoscaling - Let Dataflow manage worker count
- Use stateful processing - Maintain state across elements when needed
- Implement error handling - Use dead-letter patterns for failed elements
- Monitor costs - Track worker vCPU, memory, and disk usage, and right-size machine types and worker limits