Cloud Pub/Sub for Messaging and Event Streaming
Pub/Sub Architecture
Cloud Pub/Sub is a fully managed, real-time messaging service that allows you to send and receive messages between applications.
Core Components
Topics:
- Named channels for message distribution
- Fan-out to every attached subscription (message filters are defined per subscription)
- At-least-once delivery guarantee
- Global endpoint access
Subscriptions:
- Represent message delivery to subscribers
- Support for pull and push delivery
- Message acknowledgment mechanism
- Configurable retention periods
Messages:
- Immutable data payloads
- Custom attributes for metadata
- Message ordering keys
- Exactly-once delivery available when enabled on a (pull) subscription
Topic and Subscription Management
Creating Topics
# Create a topic
gcloud pubsub topics create my-topic
# Create a topic with labels
# (distinct name: creating a topic that already exists fails)
gcloud pubsub topics create my-labeled-topic \
  --labels=env=production,team=data-engineering
# Create a topic with message retention
gcloud pubsub topics create my-retained-topic \
  --message-retention-duration=86400s # 1 day
Creating Subscriptions
# Create a pull subscription
gcloud pubsub subscriptions create my-subscription \
  --topic=my-topic \
  --ack-deadline=60
# Create a push subscription
# The endpoint must be the full URL of the handler, including its path
# (e.g. the /pubsub/push route of the Flask handler under "Push Subscription").
gcloud pubsub subscriptions create my-push-subscription \
  --topic=my-topic \
  --push-endpoint=https://my-endpoint.example.com/pubsub/push \
  --ack-deadline=30
# Create a subscription with message retention
# (distinct name: my-subscription already exists from the first command)
gcloud pubsub subscriptions create my-retained-subscription \
  --topic=my-topic \
  --message-retention-duration=604800s # 7 days
Python Client Library
from google.cloud import pubsub_v1
import json

# Initialize publisher
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')

# Publish a message: the payload is bytes; extra keyword arguments become
# string attributes on the message.
data = json.dumps({
    'event': 'user_signup',
    'user_id': 'user_123',
    'timestamp': '2024-01-15T10:30:00Z',
})
future = publisher.publish(
    topic_path,
    data.encode('utf-8'),
    event_type='user_event',
    source='web_app',
)
# result() blocks until the publish completes and returns the message ID.
print(f'Published message ID: {future.result()}')

# Initialize subscriber
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

with subscriber:  # closes the client's channel when the block exits
    # Pull messages (synchronous pull: may return fewer than max_messages, or none)
    response = subscriber.pull(
        request={'subscription': subscription_path, 'max_messages': 10}
    )
    ack_ids = []
    for message in response.received_messages:
        print(f'Received message: {message.message.data}')
        print(f'Attributes: {message.message.attributes}')
        ack_ids.append(message.ack_id)

    # Acknowledge every processed message in a single request instead of one
    # RPC per message; skip the call entirely when nothing was pulled.
    if ack_ids:
        subscriber.acknowledge(
            request={'subscription': subscription_path, 'ack_ids': ack_ids}
        )
Pull vs Push Subscriptions
Pull Subscription
# Pull subscription example (streaming pull)
from google.cloud import pubsub_v1
import concurrent.futures

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')


def callback(message):
    """Process one message delivered by the streaming pull.

    Acks only after processing succeeds; on failure the message is nacked so
    Pub/Sub redelivers it.
    """
    print(f'Received message: {message.data}')
    print(f'Message ID: {message.message_id}')
    print(f'Attributes: {message.attributes}')
    # Process the message
    try:
        process_message(message)  # placeholder for application logic
    except Exception as e:
        print(f'Processing failed for {message.message_id}: {e}')
        message.nack()  # Negative acknowledge for retry
    else:
        message.ack()  # Acknowledge successful processing


# Subscribe to messages; subscribe() returns immediately and callbacks run
# in the background.
future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:  # closes the client when listening ends
    try:
        future.result(timeout=300)  # Listen for 5 minutes
    except (concurrent.futures.TimeoutError, KeyboardInterrupt):
        # The timeout expiring is the normal way this example stops.
        future.cancel()  # Trigger shutdown of the streaming pull
        future.result()  # Block until the shutdown is complete
print('Subscriber stopped.')
Push Subscription
# Push subscription endpoint
from flask import Flask, request
import base64
import json

app = Flask(__name__)


@app.route('/pubsub/push', methods=['POST'])
def pubsub_push():
    """Handle a Pub/Sub push request.

    The request body is a JSON envelope whose 'message' object carries a
    base64-encoded 'data' field (absent when only attributes were published)
    and an optional 'attributes' map.

    Returns:
        ('OK', 200) after processing, which acknowledges the message.
        ('Bad request', 400) for an unusable request. Any non-2xx status makes
        Pub/Sub redeliver the message, so pair this with a dead-letter policy.
    """
    envelope = request.get_json(silent=True)
    # get_json(silent=True) returns None for a missing or malformed JSON body.
    if not isinstance(envelope, dict) or 'message' not in envelope:
        return 'Bad request', 400
    message = envelope['message']

    raw_data = message.get('data')
    try:
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all ValueErrors.
        data = json.loads(base64.b64decode(raw_data).decode('utf-8')) if raw_data else None
    except ValueError:
        return 'Bad request', 400
    attributes = message.get('attributes', {})

    # Process the message
    process_message(data, attributes)
    return 'OK', 200


def process_message(data, attributes):
    """Process the received message (decoded JSON payload plus attributes)."""
    print(f'Processing: {data}')
    print(f'Attributes: {attributes}')


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
Comparison
| Feature | Pull Subscription | Push Subscription |
|---|---|---|
| Control | Subscriber controls when to fetch | Server sends messages automatically |
| Scalability | Better for high-throughput | Limited by endpoint capacity |
| Reliability | More control over retries | Depends on endpoint availability |
| Complexity | More complex to implement | Simpler implementation |
| Cost | Billed by message throughput (data volume) | Billed by message throughput (data volume) |
Message Ordering
Message ordering ensures messages with the same ordering key are delivered in the order they were published.
Enabling Message Ordering
# Publish with ordering key
from google.cloud import pubsub_v1

# The publisher must opt in to message ordering; without it, publish() rejects
# messages that carry an ordering_key.
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path('my-project', 'my-ordered-topic')

# Create subscription with ordering FIRST: a subscription only receives
# messages published after it exists.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-ordered-subscription')
# Enable message ordering
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'enable_message_ordering': True,
        'ack_deadline_seconds': 60,
    }
)

# Messages with same ordering key are delivered in order.
# If a publish for a key fails, later publishes for that key are rejected
# until publisher.resume_publish(topic_path, ordering_key) is called.
for i in range(10):
    data = f'Message {i}'
    future = publisher.publish(
        topic_path,
        data.encode('utf-8'),
        ordering_key='user_123',  # Same ordering key
        sequence_number=str(i),  # ordinary custom attribute, informational only
    )
    print(f'Published message {i}: {future.result()}')
Ordering Key Best Practices
# Good: Use meaningful ordering keys that group only messages whose relative
# order actually matters
ordering_key = f'user_{user_id}'  # Per-user ordering
ordering_key = f'session_{session_id}'  # Per-session ordering
ordering_key = f'account_{account_id}'  # Per-account ordering
# Throughput trade-off: messages that share a key are delivered strictly in
# sequence, and Pub/Sub documents a per-key publish throughput limit
# (about 1 MBps). Many distinct keys can be processed in parallel, so
# fine-grained keys scale better.
# Bad: one constant key (e.g. 'all') for every message serializes the whole
# stream behind a single hot key.
# Monitor subscription backlog (undelivered messages) over the last hour
from google.cloud import monitoring_v3
import time

client = monitoring_v3.MetricServiceClient()
project_name = "projects/my-project"

# TimeInterval fields are protobuf Timestamps: pass whole seconds, and read
# the clock once so both ends come from the same instant.
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {
        'end_time': {'seconds': now},
        'start_time': {'seconds': now - 3600},  # Last hour
    }
)
results = client.list_time_series(
    request={
        'name': project_name,
        'filter': 'metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages"',
        'interval': interval,
        'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for series in results:
    subscription_id = series.resource.labels.get('subscription_id')
    for point in series.points:
        print(f'{subscription_id}: {point.value.int64_value}')
Dead-Letter Topics
Dead-letter topics capture messages that cannot be processed after a specified number of delivery attempts.
Configuring Dead-Letter Topics
# Create dead-letter topic
gcloud pubsub topics create dead-letter-topic
# Attach a subscription to the dead-letter topic; messages forwarded to a
# topic with no subscriptions are not retained.
gcloud pubsub subscriptions create dead-letter-subscription \
  --topic=dead-letter-topic
# Create subscription with dead-letter policy
gcloud pubsub subscriptions create my-subscription \
  --topic=my-topic \
  --dead-letter-topic=dead-letter-topic \
  --max-delivery-attempts=5
# Update existing subscription with dead-letter policy (instead of creating it)
gcloud pubsub subscriptions update my-subscription \
  --dead-letter-topic=dead-letter-topic \
  --max-delivery-attempts=5
# Let the Pub/Sub service agent forward messages: it needs publisher access
# on the dead-letter topic and subscriber access on the source subscription.
# Replace PROJECT_NUMBER with your numeric project number.
PUBSUB_SA="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud pubsub topics add-iam-policy-binding dead-letter-topic \
  --member="$PUBSUB_SA" \
  --role=roles/pubsub.publisher
gcloud pubsub subscriptions add-iam-policy-binding my-subscription \
  --member="$PUBSUB_SA" \
  --role=roles/pubsub.subscriber
Python Implementation
from google.cloud import pubsub_v1
import json

# Attributes Pub/Sub adds to a message when it forwards it to a dead-letter topic.
SOURCE_DELIVERY_COUNT_ATTR = 'CloudPubSubDeadLetterSourceDeliveryCount'
SOURCE_SUBSCRIPTION_ATTR = 'CloudPubSubDeadLetterSourceSubscription'
# Alert when the original subscription tried at least this many times.
ALERT_THRESHOLD = 3


# Dead-letter subscription handler
def process_dead_letter(message):
    """Log, optionally alert on, and acknowledge a message from the dead-letter topic.

    message.delivery_attempt is only populated when *this* subscription has its
    own dead-letter policy (it is None otherwise), so the attempt count of the
    original subscription is read from the attribute added on forwarding.
    """
    attempts = int(message.attributes.get(SOURCE_DELIVERY_COUNT_ATTR, 0))
    print(f'Dead letter message: {message.data}')
    print(f'Source subscription: {message.attributes.get(SOURCE_SUBSCRIPTION_ATTR)}')
    print(f'Delivery attempt: {attempts}')
    # Log for investigation (placeholder helper)
    log_dead_letter(message)
    # Alert if needed (placeholder helper)
    if attempts >= ALERT_THRESHOLD:
        send_alert(f'Message failed {attempts} times')
    # Ack so the dead-letter subscription does not redeliver it.
    message.ack()


# Subscribe to dead-letter topic
subscriber = pubsub_v1.SubscriberClient()
dlq_subscription_path = subscriber.subscription_path(
    'my-project', 'dead-letter-subscription'
)
future = subscriber.subscribe(dlq_subscription_path, process_dead_letter)
with subscriber:
    try:
        future.result()  # subscribe() is non-blocking; keep the process listening
    except KeyboardInterrupt:
        future.cancel()
        future.result()  # wait for the shutdown to finish
Dead-Letter Policy Configuration
from google.cloud import pubsub_v1

PROJECT_ID = 'my-project'

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(PROJECT_ID, 'my-topic')
subscription_path = subscriber.subscription_path(PROJECT_ID, 'my-subscription')
# Full resource name of the dead-letter topic, built the same way as topic_path.
dead_letter_topic_path = subscriber.topic_path(PROJECT_ID, 'dead-letter-topic')

# After 5 delivery attempts a message is forwarded to the dead-letter topic.
dead_letter_policy = {
    'dead_letter_topic': dead_letter_topic_path,
    'max_delivery_attempts': 5,
}
subscription_request = {
    'name': subscription_path,
    'topic': topic_path,
    'dead_letter_policy': dead_letter_policy,
    'ack_deadline_seconds': 60,
    'message_retention_duration': {'seconds': 86400},  # 1 day
}

# Create subscription with dead-letter policy
subscription = subscriber.create_subscription(request=subscription_request)
print(f'Subscription created: {subscription.name}')
Exactly-Once Delivery
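When exactly-once delivery is enabled on a pull subscription, Pub/Sub does not redeliver a message after it has been successfully acknowledged, and it reports the outcome of each acknowledgement back to the subscriber. It does not deduplicate messages that a publisher sends more than once (for example after a retry): those get distinct message IDs and must be deduplicated by the application.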
Enabling Exactly-Once Delivery
from google.cloud import pubsub_v1

# Create the topic. Topics have no exactly-once setting; exactly-once
# delivery is enabled on the subscription below.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-exactly-once-topic')
topic = publisher.create_topic(
    request={
        'name': topic_path,
        'message_retention_duration': {'seconds': 86400},  # 1 day
    }
)

# Create subscription with exactly-once delivery (pull subscriptions only).
# Subscribers should confirm acks with message.ack_with_response().result(),
# because with exactly-once enabled an ack can be reported as failed
# (for example after the ack deadline has expired).
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    'my-project', 'my-exactly-once-subscription'
)
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'enable_exactly_once_delivery': True,
        'ack_deadline_seconds': 60,
    }
)
Message Deduplication
# Publish with an application-level deduplication (idempotency) ID
from google.cloud import pubsub_v1
import hashlib
import json

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-exactly-once-topic')


# Generate deduplication ID
def generate_dedup_id(data):
    """Return the SHA-256 hex digest of *data* (str, encoded as UTF-8, or bytes).

    The ID is deterministic: identical payloads yield identical IDs, which lets a
    consumer recognize a message the publisher sent twice (e.g. after a retry).
    Pub/Sub does not deduplicate on this value itself. It travels as an ordinary
    custom attribute, and the subscriber must check it against IDs it has
    already processed.
    """
    if isinstance(data, str):
        data = data.encode('utf-8')
    return hashlib.sha256(data).hexdigest()


# Publish message with deduplication
data = json.dumps({'order_id': 'order_123', 'amount': 99.99})
dedup_id = generate_dedup_id(data)
future = publisher.publish(
    topic_path,
    data.encode('utf-8'),
    deduplication_id=dedup_id,  # custom attribute; checked by the subscriber
    event_type='order_created',
)
future.result()  # raise here if the publish failed
print(f'Published with dedup ID: {dedup_id}')
Message Filtering
Message filtering allows subscribers to receive only messages matching specific criteria.
Filter Syntax
# Create subscription with filter
# The filter tests message attributes (attributes.<key>); messages that do not
# match are not delivered to this subscription.
# NOTE(review): Pub/Sub docs state a subscription's filter cannot be changed
# after creation — confirm before relying on updating it later.
gcloud pubsub subscriptions create filtered-subscription \
--topic=my-topic \
--filter='attributes.event_type = "purchase"'
# Complex filter
# Both attribute conditions must hold (AND).
gcloud pubsub subscriptions create complex-filtered-subscription \
--topic=my-topic \
--filter='attributes.region = "us-east1" AND attributes.event_type = "signup"'
Python Implementation
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path('my-project', 'my-topic')
subscription_path = subscriber.subscription_path(
    'my-project', 'filtered-subscription'
)

# Create subscription with filter (before publishing: a subscription only
# receives messages published after it is created)
subscription = subscriber.create_subscription(
    request={
        'name': subscription_path,
        'topic': topic_path,
        'filter': 'attributes.event_type = "purchase"',
        'ack_deadline_seconds': 60,
    }
)

# Publish messages with attributes
publisher = pubsub_v1.PublisherClient()
# This message will be delivered
delivered = publisher.publish(
    topic_path,
    b'Purchase event',
    event_type='purchase',
    region='us-east1',
)
# This message will NOT be delivered (filtered out)
filtered_out = publisher.publish(
    topic_path,
    b'Login event',
    event_type='login',
    region='us-east1',
)
# publish() is asynchronous: wait for both so a short script does not exit
# before they are sent, and so any publish error is raised here.
for future in (delivered, filtered_out):
    future.result()
Performance Optimization
Throughput Optimization
# Optimize publisher throughput
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient(
    # Batching: a batch is sent when any of these limits is reached.
    # Example values — larger batches raise throughput at the cost of latency.
    batch_settings=pubsub_v1.types.BatchSettings(
        max_messages=500,
        max_bytes=5 * 1024 * 1024,  # 5MB
        max_latency=0.05,  # seconds
    ),
    publisher_options=pubsub_v1.types.PublisherOptions(
        enable_message_ordering=False,
        # Flow control: publish() blocks while this many messages/bytes are in flight.
        flow_control=pubsub_v1.types.PublishFlowControl(
            message_limit=1000,
            byte_limit=10 * 1024 * 1024,  # 10MB
            limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
        ),
    ),
)
topic_path = publisher.topic_path('my-project', 'my-topic')


# Batch publishing
def publish_messages_batch(messages, topic_path):
    """Publish *messages* and wait until every publish has completed.

    Args:
        messages: iterable of dicts with a 'data' str and an optional
            'attributes' dict of str -> str.
        topic_path: full topic resource name.

    Returns:
        The server-assigned message IDs, in input order.

    Raises:
        The publish error of the first failed message (via future.result()).
    """
    # publish() only enqueues; the client groups messages into batches per
    # batch_settings and sends them in the background.
    futures = [
        publisher.publish(
            topic_path,
            message['data'].encode('utf-8'),
            **message.get('attributes', {}),
        )
        for message in messages
    ]
    # Wait for all messages to be published
    message_ids = []
    for future in futures:
        message_id = future.result()
        print(f'Published: {message_id}')
        message_ids.append(message_id)
    return message_ids


# The client already publishes asynchronously, so no extra thread pool is
# needed; calling directly also lets publish errors propagate to the caller.
messages = [
    {'data': 'event 1', 'attributes': {'event_type': 'click'}},
    {'data': 'event 2'},
]
publish_messages_batch(messages, topic_path)
Subscriber Optimization
# Optimize subscriber throughput
from google.cloud import pubsub_v1
import concurrent.futures

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Flow control caps how many messages/bytes the client holds outstanding
# (received but not yet acked or nacked). It is passed to subscribe(), not
# to the client constructor.
# Exactly-once delivery is a subscription setting
# ('enable_exactly_once_delivery': True in create_subscription), not a client option.
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1000,
    max_bytes=100 * 1024 * 1024,  # 100MB
)
# Streaming-pull callbacks run on this executor; max_workers bounds concurrency.
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(
    concurrent.futures.ThreadPoolExecutor(max_workers=10)
)


def callback(message):
    """Process one streamed message; ack on success, nack (redeliver) on failure."""
    try:
        process_single_message(message)  # placeholder for application logic
    except Exception as e:
        print(f'Error processing message: {e}')
        message.nack()
    else:
        message.ack()


# Process messages in parallel (e.g. a batch returned by a synchronous pull)
def process_message_batch(messages):
    """Process a batch of messages concurrently, printing (not raising) per-message errors."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_single_message, msg) for msg in messages]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f'Error processing message: {e}')


streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=flow_control,
    scheduler=scheduler,
)
with subscriber:
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
        streaming_pull_future.result()  # wait for the shutdown to finish
Monitoring and Alerting
Key Metrics
# Monitor Pub/Sub metrics
from google.cloud import monitoring_v3
import time

client = monitoring_v3.MetricServiceClient()
project_name = "projects/my-project"
# Key metrics to monitor:
# - pubsub.googleapis.com/subscription/num_undelivered_messages (backlog size)
# - pubsub.googleapis.com/subscription/oldest_unacked_message_age (backlog age, seconds)
# - pubsub.googleapis.com/subscription/expired_ack_deadlines_count
# - pubsub.googleapis.com/subscription/dead_letter_message_count
# - pubsub.googleapis.com/topic/send_request_count
# NOTE(review): verify names against the Cloud Monitoring Pub/Sub metrics list.


def get_subscription_metrics(
    subscription_name,
    hours=1,
    metric_type='pubsub.googleapis.com/subscription/num_undelivered_messages',
):
    """Print the data points of one subscription metric for a recent window.

    Args:
        subscription_name: subscription ID (short name, not the full resource path).
        hours: length of the look-back window, ending now.
        metric_type: subscription-level metric whose points are int64
            (defaults to the undelivered-message backlog).
    """
    # TimeInterval fields are protobuf Timestamps: pass whole seconds.
    end = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': end},
            'start_time': {'seconds': end - int(hours * 3600)},
        }
    )
    results = client.list_time_series(
        request={
            'name': project_name,
            # Subscription metrics belong to the pubsub_subscription resource,
            # which identifies the subscription by its subscription_id label.
            'filter': (
                f'metric.type="{metric_type}" '
                'AND resource.type="pubsub_subscription" '
                f'AND resource.labels.subscription_id="{subscription_name}"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )
    for result in results:
        print(f'Metric: {result.metric.type}')
        for point in result.points:
            print(f' Value: {point.value.int64_value}')
            print(f' Time: {point.interval.end_time}')
Alerting Policies
# Create alerting policy
from google.cloud import monitoring_v3

client = monitoring_v3.AlertPolicyServiceClient()
project_name = "projects/my-project"

alert_policy = monitoring_v3.AlertPolicy(
    display_name="Pub/Sub High Lag Alert",
    # Explicit combiner; with a single condition, OR means "fire when it fires".
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="High message lag",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                # Threshold filters must name both the metric type and the
                # monitored resource type.
                filter=(
                    'metric.type="pubsub.googleapis.com/subscription/oldest_unacked_message_age" '
                    'AND resource.type="pubsub_subscription"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=300,  # 5 minutes (metric value is in seconds)
                duration={"seconds": 300},  # must stay above threshold this long
                aggregations=[
                    monitoring_v3.Aggregation(
                        alignment_period={"seconds": 60},
                        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
                    )
                ],
            ),
        )
    ],
    # Empty: incidents open but nobody is notified. Add notification channel
    # resource names (projects/.../notificationChannels/...) here.
    notification_channels=[],
    alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
        auto_close={"seconds": 1800}  # Auto-close after 30 minutes
    ),
)

response = client.create_alert_policy(
    request={"name": project_name, "alert_policy": alert_policy}
)
print(f'Created alert policy: {response.name}')
Best Practices
- Use message ordering - When message sequence matters
- Implement dead-letter topics - Capture failed messages
- Enable exactly-once delivery - For critical transactions
- Use message filtering - Reduce unnecessary message processing
- Monitor lag metrics - Track subscription backlog
- Optimize batch sizes - Balance throughput and latency
- Implement proper error handling - Use ack/nack appropriately