Building a Data Lake on GCP with GCS and BigQuery
Data Lake Architecture
A data lake is a centralized repository for storing structured, semi-structured, and unstructured data at any scale. On GCP, Cloud Storage serves as the foundation for data lakes.
Data Lake Layers
Bronze Layer (Raw):
- Original data in native format
- Immutable, append-only
- Schema-on-read
- Historical archive
Silver Layer (Cleansed):
- Validated and cleansed data
- Standardized formats
- Deduplicated
- Enriched with metadata
Gold Layer (Business):
- Business-ready datasets
- Aggregated and transformed
- Optimized for analytics
- Curated for specific use cases
Bucket Structure
Hierarchical Bucket Design
# Create the bucket that backs the data lake
gsutil mb -l us-central1 -c STANDARD gs://my-data-lake
# Create folder structure
# Cloud Storage has a flat namespace and gsutil has no `mkdir` command: a
# "folder" is only an object-name prefix, and it appears as soon as the first
# object is written beneath it. Upload data straight to the target prefix
# (the local file names below are examples):
gsutil cp sales_2024-01-15.csv gs://my-data-lake/bronze/raw/sales/2024/01/15/
gsutil cp sales_2024-01-16.csv gs://my-data-lake/bronze/raw/sales/2024/01/16/
# The remaining prefixes of the layout come into existence the same way, when
# the pipelines that own them write their first objects:
#   gs://my-data-lake/silver/cleansed/sales/
#   gs://my-data-lake/gold/curated/sales_daily_summary/
#   gs://my-data-lake/looker/reports/
Bucket Naming Conventions
Path Naming Patterns
# Path templates per layer; the {placeholders} are filled in by the caller.
path_patterns = dict(
    date_partitioned='gs://bucket/bronze/raw/{domain}/{yyyy}/{mm}/{dd}/',
    hourly_partitioned='gs://bucket/bronze/raw/{domain}/{yyyy}/{mm}/{dd}/{hh}/',
    monthly_partitioned='gs://bucket/silver/cleansed/{domain}/{yyyy}/{mm}/',
    versioned='gs://bucket/gold/curated/{domain}/v{version}/',
    archived='gs://bucket/archive/{domain}/{yyyy}/{mm}/{dd}/',
)

# Concrete paths in the example bucket that follow the templates above.
paths = dict(
    sales_data='gs://my-data-lake/bronze/raw/sales/2024/01/15/',
    user_events='gs://my-data-lake/bronze/raw/events/2024/01/15/10/',
    daily_summary='gs://my-data-lake/gold/curated/sales_daily_summary/2024/01/',
)
Lifecycle Rules
Lifecycle rules automatically transition or delete objects based on age or other criteria.
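Lifecycle rules reduce storage costs only; query costs are billed separately. Always monitor your BigQuery costs using the INFORMATION_SCHEMA views (for example, INFORMATION_SCHEMA.JOBS), and set up budget alerts at 50%, 80%, and 100% thresholds.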
Cost Optimization Rules
# Create lifecycle configuration
# Summary of the rules written below (age is in days, scoped by object-name prefix):
#   bronze/      -> NEARLINE at 30, COLDLINE at 90, ARCHIVE at 365
#   bronze/raw/  -> deleted at 730
#   silver/      -> NEARLINE at 60, COLDLINE at 180
# No rule matches gold/, so this configuration leaves gold/ objects untouched.
cat > lifecycle.json << EOF
{
"rule": [
{
"action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
"condition": {"age": 30, "matchesPrefix": ["bronze/"]}
},
{
"action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
"condition": {"age": 90, "matchesPrefix": ["bronze/"]}
},
{
"action": {"type": "SetStorageClass", "storageClass": "ARCHIVE"},
"condition": {"age": 365, "matchesPrefix": ["bronze/"]}
},
{
"action": {"type": "Delete"},
"condition": {"age": 730, "matchesPrefix": ["bronze/raw/"]}
},
{
"action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
"condition": {"age": 60, "matchesPrefix": ["silver/"]}
},
{
"action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
"condition": {"age": 180, "matchesPrefix": ["silver/"]}
}
]
}
EOF
# Apply lifecycle rules
# The configuration is set on the bucket as a whole; per-layer scoping comes
# only from the matchesPrefix conditions inside lifecycle.json.
gsutil lifecycle set lifecycle.json gs://my-data-lake
Lifecycle Rule Types
# Reusable lifecycle rule templates, keyed by a descriptive name.
def _rule(action, age):
    """Return one lifecycle rule that fires `action` once an object reaches `age`."""
    return {'action': action, 'condition': {'age': age}}


lifecycle_rules = {
    # Storage-class transitions, from warmest to coldest target class.
    'transition_to_nearline': _rule(
        {'type': 'SetStorageClass', 'storageClass': 'NEARLINE'}, 30
    ),
    'transition_to_coldline': _rule(
        {'type': 'SetStorageClass', 'storageClass': 'COLDLINE'}, 90
    ),
    'transition_to_archive': _rule(
        {'type': 'SetStorageClass', 'storageClass': 'ARCHIVE'}, 365
    ),
    # Removal rules.
    'delete_old_data': _rule({'type': 'Delete'}, 730),
    'abort_incomplete_multipart': _rule(
        {'type': 'AbortIncompleteMultipartUpload'}, 7
    ),
}
# Walk the layer prefixes and report the path each one maps to.
# This loop only prints; it does not call any Cloud Storage API.
for prefix in ('bronze/', 'silver/', 'gold/'):
    bucket_name = 'gs://my-data-lake/' + prefix
    print('Applying lifecycle to ' + bucket_name)
Object Versioning
Object versioning protects against accidental deletion and provides data recovery capabilities.
Enabling Versioning
# Enable versioning on bucket
# With versioning on, overwritten or deleted objects are kept as noncurrent versions.
gsutil versioning set on gs://my-data-lake
# Check versioning status
gsutil versioning get gs://my-data-lake
# List object versions
# -a includes noncurrent versions in the listing; -l prints the long format.
gsutil ls -la gs://my-data-lake/bronze/raw/sales/
Versioning Best Practices
# Settings that describe how versioning is configured for the lake.
versioning_config = dict(
    enabled=True,
    metadata='preserve',
    temporary_holds=True,
    object_retention=True,
)

# Lifecycle rules for a versioned bucket, one per object state.
# isLive=False targets non-current versions: delete them at age 365.
_expire_noncurrent = {
    'action': {'type': 'Delete'},
    'condition': {'age': 365, 'isLive': False},
}
# isLive=True targets current versions: delete them at age 90.
_expire_live = {
    'action': {'type': 'Delete'},
    'condition': {'age': 90, 'isLive': True},
}
versioned_lifecycle = {'rule': [_expire_noncurrent, _expire_live]}
BigQuery External Tables
External tables allow BigQuery to query data directly in Cloud Storage.
Creating External Tables
-- External table over the raw sales CSV files.
-- The first row of each file is skipped as a header, empty strings are read
-- as NULL, and values that do not fit the schema are ignored.
CREATE EXTERNAL TABLE `analytics.external_sales`
OPTIONS (
  uris = ['gs://my-data-lake/bronze/raw/sales/*.csv'],
  format = 'CSV',
  field_delimiter = ',',
  skip_leading_rows = 1,
  null_marker = '',
  ignore_unknown_values = true
);

-- External table over the raw event Parquet files.
CREATE EXTERNAL TABLE `analytics.external_events`
OPTIONS (
  uris = ['gs://my-data-lake/bronze/raw/events/*.parquet'],
  format = 'PARQUET'
);

-- External table over the raw log JSON files.
CREATE EXTERNAL TABLE `analytics.external_logs`
OPTIONS (
  uris = ['gs://my-data-lake/bronze/raw/logs/*.json'],
  format = 'JSON'
);
External Table Configuration
-- External table with partitioning
-- External tables cannot use PARTITION BY or the _PARTITIONTIME pseudo-column.
-- Their partitions come from hive-style key=value path segments, so the files
-- must be laid out as gs://my-data-lake/bronze/raw/sales/dt=2024-01-15/<file>.
CREATE EXTERNAL TABLE `analytics.external_sales_partitioned`
WITH PARTITION COLUMNS (
  dt DATE
)
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://my-data-lake/bronze/raw/sales/*'],
  -- Everything after this prefix is parsed as partition keys.
  hive_partition_uri_prefix = 'gs://my-data-lake/bronze/raw/sales'
);

-- Query external table with partition filter
-- Filtering on the partition column lets BigQuery skip files in other partitions.
SELECT *
FROM `analytics.external_sales_partitioned`
WHERE dt = DATE '2024-01-15';

-- Create table from external table
-- Materializes the positive-amount rows of the CSV external table as a native table.
CREATE TABLE `analytics.sales`
AS
SELECT * FROM `analytics.external_sales`
WHERE amount > 0;
Authorized Views
Authorized views provide controlled access to data without granting direct table access.
Creating Authorized Views
-- Create authorized view
-- BigQuery views have no `security` option, so only the description is set here.
-- A view becomes "authorized" when it is added to the source dataset's access
-- list (console: Sharing > Authorize views, or `bq update --source`); that
-- step cannot be expressed in CREATE VIEW.
CREATE VIEW `analytics.sales_summary_view`
OPTIONS (
  description = 'Sales summary view for analysts'
)
AS
SELECT
  DATE(sale_date) AS sale_date,
  region,
  product_category,
  COUNT(*) AS transaction_count,
  SUM(amount) AS total_amount
FROM `analytics.sales`
GROUP BY 1, 2, 3;

-- Grant access to the view
-- The analyst can read the view without any grant on `analytics.sales`.
GRANT `roles/bigquery.dataViewer`
ON VIEW `analytics.sales_summary_view`
TO 'user:analyst@company.com';
-- Create dataset with authorized views
-- CREATE SCHEMA has no `access` option. A new dataset already receives the
-- default projectReaders / projectWriters / projectOwners entries; any further
-- access is granted afterwards with GRANT ... ON SCHEMA or by updating the
-- dataset's access list.
CREATE SCHEMA `analytics_views`
OPTIONS (
  description = 'Dataset for authorized views'
);
View Security Patterns
# SQL templates for the three view-based access patterns.

# Row filter: expose only the rows for one region.
_ROW_LEVEL_SQL = """
CREATE VIEW `analytics.filtered_view`
AS
SELECT * FROM `analytics.sales`
WHERE region = 'US'
"""

# Column filter: list the selected columns explicitly.
_COLUMN_LEVEL_SQL = """
CREATE VIEW `analytics.safe_view`
AS
SELECT
sale_id,
sale_date,
region,
amount
FROM `analytics.sales`
-- Exclude PII columns
"""

# Aggregation: expose totals per day and region instead of raw rows.
_AGGREGATED_SQL = """
CREATE VIEW `analytics.aggregated_view`
AS
SELECT
DATE(sale_date) as sale_date,
region,
SUM(amount) as total_amount
FROM `analytics.sales`
GROUP BY 1, 2
"""

security_patterns = dict(
    row_level_security=_ROW_LEVEL_SQL,
    column_level_security=_COLUMN_LEVEL_SQL,
    aggregated_view=_AGGREGATED_SQL,
)
Data Ingestion Patterns
Batch Ingestion
# Batch ingestion with Dataflow
import apache_beam as beam
def run_batch_ingestion():
    """Run the batch pipeline that loads source CSV rows into the Bronze layer.

    Reads every ``*.csv`` object in ``gs://source-bucket``, passes each line
    through ``parse_csv``, keeps the records accepted by ``validate_record``
    and writes them as text shards under the 2024/01/15 Bronze sales prefix.
    """
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | 'Read from Source' >> beam.io.ReadFromText('gs://source-bucket/*.csv')
            | 'Parse CSV' >> beam.Map(parse_csv)
            | 'Validate Data' >> beam.Filter(validate_record)
            # NOTE(review): WriteToText stringifies each element, so if
            # parse_csv returns a dict or list, format it back into a CSV
            # line before this step.
            | 'Write to Bronze' >> beam.io.WriteToText(
                # The prefix ends in a file stem so shards are named
                # sales-SSSSS-of-NNNNN.csv instead of starting with "-".
                'gs://my-data-lake/bronze/raw/sales/2024/01/15/sales',
                # WriteToText emits newline-delimited text, not Parquet, so
                # the suffix must not claim a Parquet payload. `.csv` also
                # matches the `sales/*.csv` external table URI.
                file_name_suffix='.csv',
            )
        )
Streaming Ingestion
# Streaming ingestion with Pub/Sub and Dataflow
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
def run_streaming_ingestion():
    """Run the streaming pipeline that lands Pub/Sub sales messages in Bronze.

    Reads from the ``sales`` topic, groups messages into 60-second fixed
    windows and writes each window as newline-delimited text files under the
    Bronze sales prefix.
    """
    # Imported locally so the snippet stays self-contained.
    from apache_beam.io import fileio
    from apache_beam.options.pipeline_options import PipelineOptions

    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/sales')
            # ReadFromPubSub yields raw bytes; the text sink expects str.
            | 'Decode' >> beam.Map(lambda payload: payload.decode('utf-8'))
            | 'Window' >> beam.WindowInto(FixedWindows(60))
            # WriteToFiles writes one set of files per window and uses a text
            # sink by default. The output is plain text, so the suffix must
            # not claim Parquet.
            | 'Write to Bronze' >> fileio.WriteToFiles(
                path='gs://my-data-lake/bronze/raw/sales/',
                file_naming=fileio.default_file_naming(prefix='sales', suffix='.txt'),
            )
        )
Data Quality
Data Validation
# Data quality validation
import pandas as pd
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
def validate_data_quality(file_path):
    """Validate a Parquet file against the ``sales_data`` expectation suite.

    Checks that ``sale_id`` is non-null and unique and that ``amount`` lies
    between 0 and 1,000,000.

    Args:
        file_path: Path of the Parquet file, passed to ``pandas.read_parquet``.

    Returns:
        The ``success`` flag of the validation result.
    """
    # Same legacy Great Expectations API as the ExpectationConfiguration import.
    import great_expectations as ge

    df = pd.read_parquet(file_path)

    # Define expectations
    suite = ExpectationSuite('sales_data')
    checks = [
        ('expect_column_values_to_not_be_null', {'column': 'sale_id'}),
        ('expect_column_values_to_be_unique', {'column': 'sale_id'}),
        (
            'expect_column_values_to_be_between',
            {'column': 'amount', 'min_value': 0, 'max_value': 1000000},
        ),
    ]
    for expectation_type, kwargs in checks:
        suite.add_expectation(
            ExpectationConfiguration(expectation_type=expectation_type, kwargs=kwargs)
        )

    # A plain pandas DataFrame has no validate(); wrap it in a Great
    # Expectations dataset first.
    result = ge.from_pandas(df).validate(expectation_suite=suite)
    return result.success
Data Lineage
# Track data lineage
def track_lineage(source_path, destination_path, transformation_type):
    """Record one lineage entry for a transformation step.

    Builds a metadata dict from the arguments and hands it to
    ``store_lineage_metadata``.

    Args:
        source_path: Location the data was read from.
        destination_path: Location the data was written to.
        transformation_type: Label of the transformation that was applied.
    """
    # Imported locally because the surrounding snippet never imports datetime.
    from datetime import datetime, timezone

    lineage = {
        'source': source_path,
        'destination': destination_path,
        'transformation': transformation_type,
        # Timezone-aware UTC keeps timestamps comparable across workers.
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'pipeline_id': 'pipeline_001'
    }
    # Store lineage metadata
    store_lineage_metadata(lineage)
Monitoring and Alerting
Monitor Data Lake Metrics
# Monitor data lake metrics
from google.cloud import monitoring_v3
import time
def monitor_data_lake(project_id, bucket_name):
    """Print the size of a bucket from the last hour of Cloud Monitoring data.

    Args:
        project_id: Project that owns the bucket.
        bucket_name: Bucket name used in the metric filter.
    """
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    # TimeInterval fields are protobuf Timestamps, not floats, so the epoch
    # time is split into whole seconds and nanoseconds.
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': seconds, 'nanos': nanos},
            'start_time': {'seconds': seconds - 3600, 'nanos': nanos},
        }
    )

    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': f'resource.label.bucket_name = "{bucket_name}" AND metric.type = "storage.googleapis.com/storage/total_bytes"',
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )
    for result in results:
        # A series can come back with no points in the requested window.
        if not result.points:
            continue
        # total_bytes is a DOUBLE gauge, so int64_value would always read 0.
        size_bytes = result.points[0].value.double_value
        print(f'Bucket size: {size_bytes / 1024 / 1024 / 1024:.2f} GB')
Best Practices
- Implement proper layering - Bronze, Silver, Gold architecture
- Use lifecycle rules - Optimize storage costs automatically
- Enable versioning - Protect against data loss
- Create external tables - Enable direct querying in BigQuery
- Use authorized views - Control data access securely
- Monitor data quality - Implement validation and alerting
- Track data lineage - Maintain data provenance information