Vertex AI for ML Pipelines on GCP

Figure: Vertex AI platform architecture. Components: Data (BigQuery, GCS); Feature Store (feature engineering); Training (AutoML / custom); Registry (model versioning); Endpoint (prediction service); ML Pipelines (Kubeflow Pipelines); Model Monitoring (drift detection); Explainable AI (model interpretation); Batch Prediction (scheduled inference).

Vertex AI Architecture

Vertex AI is Google Cloud's unified AI/ML platform that provides end-to-end tools for building, deploying, and managing ML models.

Core Components

Data Layer:

  • BigQuery for structured data
  • Cloud Storage for unstructured data
  • Data labeling services
  • Data versioning and lineage

Feature Store:

  • Centralized feature repository
  • Online and offline serving
  • Feature versioning and monitoring
  • Point-in-time correctness

Training:

  • AutoML for automated model training
  • Custom training with your own code
  • Distributed training support
  • Hyperparameter tuning
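Hyperparameter tuning runs many training trials with different settings and keeps the best one. The sketch below shows the idea locally with plain random search. `toy_objective` is a hypothetical stand-in for training and evaluating a model, the ranges (learning rate 0.001 to 0.1, 2 to 5 layers) are illustrative, and this is not necessarily the search algorithm Vertex AI's tuning service uses.

```python
import math
import random


def sample_params(rng):
    """Draw one trial: log-uniform learning rate, integer layer count."""
    log_lr = rng.uniform(math.log(0.001), math.log(0.1))
    return {
        'learning_rate': math.exp(log_lr),
        'num_layers': rng.randint(2, 5),  # randint bounds are inclusive
    }


def toy_objective(params):
    """Hypothetical validation loss; a real trial would train and evaluate a model."""
    return (math.log10(params['learning_rate']) + 2) ** 2 + abs(params['num_layers'] - 3)


def random_search(objective, max_trial_count=10, seed=0):
    """Evaluate max_trial_count random trials and return the best one."""
    rng = random.Random(seed)
    trials = [sample_params(rng) for _ in range(max_trial_count)]
    scored = [(objective(p), p) for p in trials]
    best_loss, best_params = min(scored, key=lambda t: t[0])
    return best_params, best_loss, scored


best, loss, history = random_search(toy_objective)
print(best, loss)
```

Sampling the learning rate on a log scale spreads trials evenly across orders of magnitude, which is usually what you want for step sizes.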

Model Registry:

  • Model versioning and metadata
  • Model lineage tracking
  • A/B testing support
  • Model approval workflows

Prediction:

  • Online prediction endpoints
  • Batch prediction jobs
  • Auto-scaling and monitoring
  • Explainable AI

AutoML

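AutoML trains models with minimal code: you supply a dataset and a target column, and Vertex AI handles feature engineering, model selection, and hyperparameter tuning.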

Creating AutoML Models

from google.cloud import aiplatform

# Initialize Vertex AI
aiplatform.init(
    project='my-project',
    location='us-central1',
    staging_bucket='gs://my-staging-bucket'
)

# Create Tabular Dataset
dataset = aiplatform.TabularDataset.create(
    display_name='sales-prediction',
    bq_source='bq://my-project.analytics.sales_data'
)

# Train AutoML model
model = aiplatform.AutoMLTabularTrainingJob(
    display_name='sales-prediction-model',
    optimization_prediction_type='regression',
    optimization_objective='minimize-rmse',
).run(
    dataset=dataset,
    target_column='amount',
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    budget_milli_node_hours=1000,
    model_display_name='sales-prediction-v1',
    disable_early_stopping=False,
)

AutoML Configuration

# AutoML training configuration
training_config = {
    'display_name': 'sales-prediction-model',
    # Resource name (or numeric ID) of an existing TabularDataset; a display
    # name will not resolve. Placeholder value.
    'dataset_id': 'projects/my-project/locations/us-central1/datasets/1234567890',
    'target_column': 'amount',
    'training_fraction_split': 0.8,
    'validation_fraction_split': 0.1,
    'test_fraction_split': 0.1,
    'budget_milli_node_hours': 1000,  # 1,000 milli node hours = 1 node hour
    'model_display_name': 'sales-prediction-v1',
    'optimization_prediction_type': 'regression',
    'optimization_objective': 'minimize-rmse',
    'early_stopping': True,
}

# Create and run training job
job = aiplatform.AutoMLTabularTrainingJob(
    display_name=training_config['display_name'],
    optimization_prediction_type=training_config['optimization_prediction_type'],
    optimization_objective=training_config['optimization_objective'],
)

model = job.run(
    dataset=aiplatform.TabularDataset(training_config['dataset_id']),
    target_column=training_config['target_column'],
    training_fraction_split=training_config['training_fraction_split'],
    validation_fraction_split=training_config['validation_fraction_split'],
    test_fraction_split=training_config['test_fraction_split'],
    budget_milli_node_hours=training_config['budget_milli_node_hours'],
    model_display_name=training_config['model_display_name'],
    # The SDK exposes the inverse flag, so translate early_stopping here
    disable_early_stopping=not training_config['early_stopping'],
)
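Mistakes in the split fractions or the budget are cheaper to catch before a job is submitted. The check below is a hypothetical helper, not part of the Vertex AI SDK. It assumes the 1,000 to 72,000 milli-node-hour range documented for AutoML tabular training (1,000 milli node hours = 1 node hour).

```python
import math


def validate_automl_tabular_config(config):
    """Return a list of problems found in an AutoML tabular training config dict."""
    problems = []
    splits = (
        config['training_fraction_split']
        + config['validation_fraction_split']
        + config['test_fraction_split']
    )
    # The three fractions must cover the whole dataset
    if not math.isclose(splits, 1.0, abs_tol=1e-9):
        problems.append(f'split fractions sum to {splits}, expected 1.0')
    budget = config['budget_milli_node_hours']
    if not 1000 <= budget <= 72000:
        problems.append(f'budget_milli_node_hours={budget} is outside 1000-72000')
    return problems
```

Calling `validate_automl_tabular_config(training_config)` before `job.run(...)` and stopping on a non-empty list keeps obviously invalid jobs from being submitted.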

Custom Training

Custom training allows you to run your own ML code on Vertex AI.

Custom Training Job

from google.cloud import aiplatform

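# Define training script (written to train.py below)
training_script = """
import argparse
import os

import tensorflow as tf
from tensorflow import keras

def train_model(args):
    # Load data; num_epochs=1 makes the dataset finite so each
    # Keras epoch is exactly one pass over the CSV file(s)
    train_data = tf.data.experimental.make_csv_dataset(
        args.train_data,
        batch_size=args.batch_size,
        label_name='amount',
        num_epochs=1,
    )

    # make_csv_dataset yields a dict of columns; stack them into one
    # float32 matrix (assumes all non-label columns are numeric)
    train_data = train_data.map(
        lambda features, label: (
            tf.stack([tf.cast(v, tf.float32) for v in features.values()], axis=1),
            label,
        )
    )

    # Build model
    model = keras.Sequential([
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(1)
    ])

    model.compile(
        optimizer='adam',
        loss='mse',
        metrics=['mae']
    )

    # Train model (validation_split is not supported for tf.data inputs;
    # pass a separate validation_data dataset if you need validation)
    model.fit(
        train_data,
        epochs=args.epochs,
    )

    # Save model
    model.save(args.model_dir)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-data', type=str, required=True)
    # Vertex AI sets AIP_MODEL_DIR to the location it uploads the model from
    parser.add_argument('--model-dir', type=str, default=os.environ.get('AIP_MODEL_DIR'))
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--epochs', type=int, default=10)
    args = parser.parse_args()
    train_model(args)
"""

# CustomTrainingJob packages a local file, so write the script to disk
with open('train.py', 'w') as f:
    f.write(training_script)

# Create custom training job
job = aiplatform.CustomTrainingJob(
    display_name='custom-sales-model',
    script_path='train.py',
    container_uri='us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-12:latest',
    requirements=['tensorflow>=2.12'],
    model_serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest',
)

# Run training job; the script saves to AIP_MODEL_DIR, from which
# Vertex AI uploads the trained model
model = job.run(
    replica_count=1,
    machine_type='n1-standard-8',
    accelerator_type='NVIDIA_TESLA_T4',
    accelerator_count=1,
    args=['--train-data', 'gs://my-bucket/train.csv'],
)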
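Vertex AI custom training jobs tell the container where to write the model through the `AIP_MODEL_DIR` environment variable, and the SDK uploads the model from there when a serving container is configured. The helper below is a hypothetical, stdlib-only sketch of the fallback logic a training script can use: an explicit `--model-dir` wins, otherwise `AIP_MODEL_DIR`, otherwise fail loudly instead of passing `None` to `model.save`.

```python
import os


def resolve_model_dir(cli_value=None, environ=None):
    """Prefer an explicit --model-dir value, else fall back to AIP_MODEL_DIR."""
    environ = os.environ if environ is None else environ
    model_dir = cli_value or environ.get('AIP_MODEL_DIR')
    if not model_dir:
        raise ValueError('Pass --model-dir or run on Vertex AI, which sets AIP_MODEL_DIR')
    return model_dir
```

Taking `environ` as a parameter keeps the function testable without mutating the real process environment.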

Custom Container Training

# Dockerfile for custom training
FROM us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-12:latest

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY train.py .
COPY config.yaml .

ENTRYPOINT ["python", "train.py"]

# config.yaml
training:
  batch_size: 64
  epochs: 20
  learning_rate: 0.001
  validation_split: 0.2

model:
  architecture: "resnet50"
  pretrained: true
  fine_tune_layers: 10
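Baking `config.yaml` into the image fixes its values at build time. One common pattern, sketched here with a hypothetical `section.key=value` override syntax, is to pass overrides through the training job's `args` so hyperparameters can change without rebuilding the container. The `base` dict mirrors `config.yaml` after parsing (for example with PyYAML's `yaml.safe_load`, not shown).

```python
import copy


def apply_overrides(config, overrides):
    """Return a copy of config with dotted 'section.key=value' overrides applied."""
    merged = copy.deepcopy(config)
    for item in overrides:
        dotted_key, raw_value = item.split('=', 1)
        *parents, leaf = dotted_key.split('.')
        node = merged
        for part in parents:
            node = node.setdefault(part, {})
        # Coerce to the type of the existing value; bool is checked first
        # because bool is a subclass of int
        current = node.get(leaf)
        if isinstance(current, bool):
            value = raw_value.lower() in ('1', 'true', 'yes')
        elif isinstance(current, (int, float)):
            value = type(current)(raw_value)
        else:
            value = raw_value
        node[leaf] = value
    return merged


base = {
    'training': {'batch_size': 64, 'epochs': 20, 'learning_rate': 0.001, 'validation_split': 0.2},
    'model': {'architecture': 'resnet50', 'pretrained': True, 'fine_tune_layers': 10},
}
tuned = apply_overrides(base, ['training.epochs=5', 'model.pretrained=false'])
```

Working on a deep copy leaves the parsed defaults untouched, so the same base config can be reused for several runs.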

Feature Store

Feature Store provides a centralized repository for storing, serving, and managing ML features.

Creating Feature Store

from google.cloud import aiplatform

# Create Feature Store with a 3-node online serving cluster
featurestore = aiplatform.Featurestore.create(
    featurestore_id='sales_features',
    online_store_fixed_node_count=3,
)

# Create an entity type that groups the customer features
customers = featurestore.create_entity_type(
    entity_type_id='customers',
    description='Customer-level features',
)

# Create Features
customers.create_feature(
    feature_id='customer_lifetime_value',
    value_type='DOUBLE',
)

customers.create_feature(
    feature_id='purchase_frequency',
    value_type='DOUBLE',
)

# Ingest features from BigQuery; entity_id_field names the column holding
# the customer ID (assumed here to be 'customer_id')
customers.ingest_from_bq(
    feature_ids=['customer_lifetime_value', 'purchase_frequency'],
    feature_time='feature_timestamp',
    bq_source_uri='bq://my-project.analytics.customer_features',
    entity_id_field='customer_id',
)

Feature Serving

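# Online feature serving
featurestore = aiplatform.Featurestore('sales_features')
customers = featurestore.get_entity_type('customers')

# Get the latest feature values for specific entities (returns a pandas DataFrame)
feature_values = customers.read(
    entity_ids=['customer_123', 'customer_456'],
    feature_ids=['customer_lifetime_value', 'purchase_frequency'],
)

# Batch feature serving: point-in-time lookup for each (entity, timestamp)
# row of the read-instances table, exported to BigQuery. The read-instances
# table needs a 'customers' column (entity IDs) and a 'timestamp' column.
# Both table names below are placeholders.
featurestore.batch_serve_to_bq(
    bq_destination_output_uri='bq://my-project.analytics.training_features',
    serving_feature_ids={
        'customers': ['customer_lifetime_value', 'purchase_frequency'],
    },
    read_instances_uri='bq://my-project.analytics.read_instances',
)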
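Batch serving performs a point-in-time lookup: for each requested (entity, timestamp) pair it returns the latest feature value recorded at or before that timestamp, so training examples never see values from the future. The pure-Python sketch below illustrates those semantics on in-memory rows with integer timestamps; it is not how Feature Store implements the join.

```python
from bisect import bisect_right
from collections import defaultdict


def point_in_time_join(feature_rows, read_instances):
    """For each (entity_id, timestamp) request, return the latest value whose
    feature timestamp is <= the request timestamp (None if there is none)."""
    history = defaultdict(list)
    for entity_id, ts, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        history[entity_id].append((ts, value))

    results = []
    for entity_id, ts in read_instances:
        rows = history.get(entity_id, [])
        timestamps = [t for t, _ in rows]
        idx = bisect_right(timestamps, ts)  # number of rows at or before ts
        results.append(rows[idx - 1][1] if idx else None)
    return results


feature_rows = [
    ('customer_123', 1, 100.0),
    ('customer_123', 5, 250.0),
    ('customer_456', 3, 40.0),
]
read_instances = [('customer_123', 4), ('customer_123', 5), ('customer_456', 2)]
print(point_in_time_join(feature_rows, read_instances))  # [100.0, 250.0, None]
```

`customer_456` gets `None` because its only value was recorded after the requested timestamp; using it would leak future information into training data.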

ML Pipelines

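ML Pipelines orchestrate the end-to-end ML workflow. Vertex AI Pipelines runs pipelines written with the Kubeflow Pipelines (KFP) SDK and executes each step as a containerized component.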

Creating ML Pipelines

from kfp import compiler, dsl
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

@dsl.pipeline(
    name='sales-prediction-pipeline',
    description='End-to-end sales prediction pipeline',
)
def sales_prediction_pipeline(
    project_id: str,
    region: str,
    dataset_id: str,
    bq_source: str,
    target_column: str,
):
    # Create a Vertex AI tabular dataset (display name = dataset_id)
    # from the BigQuery source table
    create_dataset = TabularDatasetCreateOp(
        project=project_id,
        location=region,
        display_name=dataset_id,
        bq_source=bq_source,
    )

    # Train an AutoML regression model; AutoML searches architectures and
    # hyperparameters internally, so no search space is declared here
    training_op = AutoMLTabularTrainingJobRunOp(
        project=project_id,
        location=region,
        display_name='sales-prediction-training',
        dataset=create_dataset.outputs['dataset'],
        target_column=target_column,
        optimization_prediction_type='regression',
        optimization_objective='minimize-rmse',
        budget_milli_node_hours=1000,
    )

    # Create an endpoint to host the model
    create_endpoint = EndpointCreateOp(
        project=project_id,
        location=region,
        display_name='sales-prediction-endpoint',
    )

    # Deploy model to endpoint
    ModelDeployOp(
        model=training_op.outputs['model'],
        endpoint=create_endpoint.outputs['endpoint'],
        deployed_model_display_name='sales-prediction-deployed',
        dedicated_resources_machine_type='n1-standard-4',
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=3,
    )

# Compile pipeline
compiler.Compiler().compile(
    pipeline_func=sales_prediction_pipeline,
    package_path='sales_prediction_pipeline.json',
)
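KFP infers execution order from data dependencies: a step that consumes another step's output (for example via `.outputs['model']`) waits for it, and independent steps can run concurrently. The sketch below reproduces that scheduling idea with the standard library's `graphlib` (Python 3.9+) for a dataset, training, and deployment pipeline in which deployment also needs an endpoint. The step names are hypothetical, and this is not how Vertex AI Pipelines is implemented.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes
pipeline_steps = {
    'create_dataset': set(),
    'train_model': {'create_dataset'},
    'create_endpoint': set(),
    'deploy_model': {'train_model', 'create_endpoint'},
}

sorter = TopologicalSorter(pipeline_steps)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose inputs are all available
    waves.append(ready)
    sorter.done(*ready)

print(waves)
# [['create_dataset', 'create_endpoint'], ['train_model'], ['deploy_model']]
```

Endpoint creation lands in the first wave alongside dataset creation because nothing it needs is produced by another step.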

Running ML Pipelines

from google.cloud import aiplatform

# Initialize Vertex AI
aiplatform.init(
    project='my-project',
    location='us-central1',
    staging_bucket='gs://my-staging-bucket'
)

# Create pipeline job
job = aiplatform.PipelineJob(
    display_name='sales-prediction-run',
    template_path='sales_prediction_pipeline.json',
    pipeline_root='gs://my-pipeline-root',
    parameter_values={
        'project_id': 'my-project',
        'region': 'us-central1',
        'dataset_id': 'sales_dataset',
        'bq_source': 'bq://my-project.analytics.sales_data',
        'target_column': 'amount',
    },
)

# Run pipeline
job.submit()

# Wait for completion
job.wait()

# Check results
print(f'Pipeline state: {job.state}')
print(f'Pipeline resource name: {job.resource_name}')

Model Monitoring

Model monitoring detects data drift and model performance degradation.

Setting Up Monitoring

from google.cloud import aiplatform
from google.cloud.aiplatform import model_monitoring

# Deploy the model to an endpoint
endpoint = model.deploy(
    deployed_model_display_name='sales-prediction-deployed',
    machine_type='n1-standard-4',
    min_replica_count=1,
    max_replica_count=3,
)

# Training-serving skew: compare serving inputs with the training data
skew_config = model_monitoring.SkewDetectionConfig(
    data_source='bq://my-project.analytics.training_data',
    target_field='amount',
    skew_thresholds={'feature_1': 0.3, 'feature_2': 0.3},
)

# Prediction drift: compare recent serving inputs with earlier serving inputs
drift_config = model_monitoring.DriftDetectionConfig(
    drift_thresholds={'feature_1': 0.3, 'feature_2': 0.3},
)

# Attach a monitoring job to the endpoint
monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
    display_name='sales-prediction-monitoring',
    endpoint=endpoint,
    # Log 80% of prediction requests for analysis (illustrative rate)
    logging_sampling_strategy=model_monitoring.RandomSampleConfig(sample_rate=0.8),
    schedule_config=model_monitoring.ScheduleConfig(monitor_interval=1),  # hours
    alert_config=model_monitoring.EmailAlertConfig(user_emails=['ml-team@company.com']),
    objective_configs=model_monitoring.ObjectiveConfig(
        skew_detection_config=skew_config,
        drift_detection_config=drift_config,
    ),
)
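Skew and drift thresholds such as 0.3 are compared against a distance between feature distributions. Vertex AI documentation describes using L-infinity distance for categorical features and Jensen-Shannon divergence for numerical features; the sketch below computes both on simple histograms. The histograms, the base-2 logarithm, and the binning are assumptions here, so scores may not match the service exactly.

```python
import math


def normalize(counts):
    """Turn raw counts into probabilities."""
    total = sum(counts.values())
    return {k: v / total for k, v in counts.items()}


def l_infinity_distance(baseline_counts, serving_counts):
    """Largest absolute difference between two categorical distributions."""
    p, q = normalize(baseline_counts), normalize(serving_counts)
    keys = set(p) | set(q)
    return max(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in keys)


def jensen_shannon_divergence(baseline_counts, serving_counts):
    """JS divergence with log base 2, so the result lies in [0, 1]."""
    p, q = normalize(baseline_counts), normalize(serving_counts)
    keys = set(p) | set(q)
    m = {k: 0.5 * (p.get(k, 0.0) + q.get(k, 0.0)) for k in keys}

    def kl_to_m(a):
        # Terms with zero probability contribute nothing
        return sum(a[k] * math.log2(a[k] / m[k]) for k in keys if a.get(k, 0.0) > 0)

    return 0.5 * kl_to_m(p) + 0.5 * kl_to_m(q)


baseline = {'web': 60, 'store': 40}
serving = {'web': 20, 'store': 80}
drifted = l_infinity_distance(baseline, serving) > 0.3
```

Here the share of `web` traffic drops from 60% to 20%, so the L-infinity distance of about 0.4 exceeds the 0.3 threshold and would raise an alert.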

Monitoring Metrics

# Monitor model metrics
import time

from google.cloud import monitoring_v3

def monitor_model_performance(project_id, endpoint_id):
    """Print recent online prediction latency for a Vertex AI endpoint."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    # Query the last hour; TimeInterval expects Timestamp fields, not floats
    seconds = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': seconds},
            'start_time': {'seconds': seconds - 3600},
        }
    )

    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': (
                'metric.type = "aiplatform.googleapis.com/prediction/online/prediction_latencies" '
                f'AND resource.labels.endpoint_id = "{endpoint_id}"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )

    # Latency is a distribution metric; points[0] is the most recent point
    for result in results:
        print(f'Mean latency: {result.points[0].value.distribution_value.mean:.1f} ms')
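Latency metrics in Cloud Monitoring are typically reported as distributions, so each point carries a mean and a request count rather than a single value. Averaging per-point means directly would weight a quiet interval the same as a busy one; the hypothetical helper below combines points by weighting each mean by its count.

```python
def overall_mean_latency(points):
    """Combine (mean_ms, count) pairs from distribution points into one mean."""
    total_count = sum(count for _, count in points)
    if total_count == 0:
        return None  # no requests in the window
    return sum(mean * count for mean, count in points) / total_count


# With the Monitoring client, pairs could be built per time series as
# [(p.value.distribution_value.mean, p.value.distribution_value.count) for p in series.points]
print(overall_mean_latency([(120.0, 10), (80.0, 30)]))  # 90.0
```

A plain average of the two means would report 100 ms even though three quarters of the requests took 80 ms.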

Explainable AI

Explainable AI helps you understand model predictions by attributing each prediction to the input features that drove it.

Getting Explanations

from google.cloud import aiplatform

# Deploy the model. AutoML tabular models typically come with explanations
# configured; custom models need explanation_metadata and
# explanation_parameters supplied at upload or deploy time.
endpoint = model.deploy(
    deployed_model_display_name='sales-prediction-explainable',
    machine_type='n1-standard-4',
    min_replica_count=1,
    max_replica_count=3,
)

# Get prediction with explanation
instances = [
    {'feature_1': 1.0, 'feature_2': 2.0, 'feature_3': 3.0}
]

# explain() returns predictions together with feature attributions
# (predict() returns predictions only)
response = endpoint.explain(instances=instances)

# Access explanations
for prediction, explanation in zip(response.predictions, response.explanations):
    print(f'Prediction: {prediction}')
    for attribution in explanation.attributions:
        print(f'Attribution scores: {attribution.feature_attributions}')
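Feature attributions for AutoML tabular models are based on Shapley values, which Vertex AI approximates by sampling (Sampled Shapley). For a handful of features they can be computed exactly by enumerating every coalition, which makes the idea concrete: features outside a coalition take baseline values. The model and baseline below are hypothetical, and exact enumeration grows as 2^n, so this only suits tiny examples.

```python
from itertools import combinations
from math import factorial


def shapley_values(predict, instance, baseline):
    """Exact Shapley attributions; features outside a coalition use baseline values."""
    names = list(instance)
    n = len(names)

    def value(coalition):
        x = {k: (instance[k] if k in coalition else baseline[k]) for k in names}
        return predict(x)

    attributions = {}
    for name in names:
        others = [k for k in names if k != name]
        total = 0.0
        for size in range(n):
            # Probability weight of a coalition of this size in a random ordering
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                s = set(subset)
                total += weight * (value(s | {name}) - value(s))
        attributions[name] = total
    return attributions


def linear_model(x):
    # Hypothetical stand-in for a deployed model
    return 2.0 * x['feature_1'] + 0.5 * x['feature_2'] - 1.0 * x['feature_3']


instance = {'feature_1': 1.0, 'feature_2': 2.0, 'feature_3': 3.0}
baseline = {'feature_1': 0.0, 'feature_2': 0.0, 'feature_3': 0.0}
attr = shapley_values(linear_model, instance, baseline)
print(attr)
```

For a linear model each attribution equals the weight times (value minus baseline), and the attributions sum to the prediction minus the baseline prediction.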

Batch Prediction

Batch prediction runs inference over large datasets asynchronously, without deploying the model to an endpoint.

Running Batch Prediction

from google.cloud import aiplatform

# Create batch prediction job (create() blocks until the job finishes
# because sync=True by default)
job = aiplatform.BatchPredictionJob.create(
    job_display_name='sales-prediction-batch',
    model_name=model.resource_name,
    instances_format='bigquery',
    predictions_format='bigquery',
    bigquery_source='bq://my-project.analytics.batch_input',
    # Project or dataset; Vertex AI creates the output table inside it
    bigquery_destination_prefix='bq://my-project.analytics',
    machine_type='n1-standard-4',
    starting_replica_count=1,
    max_replica_count=5,
)

# Wait for completion (returns immediately if the job has already finished)
job.wait()

# Check results
print(f'Job state: {job.state}')
print(f'Output dataset: {job.output_info.bigquery_output_dataset}')
print(f'Output table: {job.output_info.bigquery_output_table}')
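The example above reads from and writes to BigQuery. Batch prediction can also read JSON Lines files from Cloud Storage (`instances_format='jsonl'`, the SDK default, together with `gcs_source`), where each line holds one instance. The sketch below writes and re-reads such a file locally; uploading it to Cloud Storage is not shown, and the file name and rows are hypothetical.

```python
import json
import os
import tempfile


def write_jsonl(instances, path):
    """Write one JSON object per line."""
    with open(path, 'w', encoding='utf-8') as f:
        for instance in instances:
            f.write(json.dumps(instance) + '\n')


def read_jsonl(path):
    """Read a JSON Lines file back into a list of dicts, skipping blank lines."""
    with open(path, encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]


rows = [{'feature_1': 1.0, 'feature_2': 2.0}, {'feature_1': 0.5, 'feature_2': 4.0}]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'batch_input.jsonl')
    write_jsonl(rows, path)
    round_trip = read_jsonl(path)
```

Keeping one instance per line lets the service split the input across workers without parsing the whole file first.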

Best Practices

  1. Use Feature Store - Centralize feature management
  2. Implement ML Pipelines - Automate end-to-end workflows
  3. Enable Model Monitoring - Detect drift and performance issues
  4. Use Explainable AI - Understand model predictions
  5. Optimize costs - Use appropriate machine types and auto-scaling
  6. Version models - Track model lineage and versions
  7. Implement A/B testing - Compare model performance
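For A/B testing, a Vertex AI endpoint can host several deployed models and split traffic between them by percentage (the endpoint's `traffic_split`, whose values must sum to 100). The local simulation below, with hypothetical model IDs, shows how a 90/10 split translates into request counts, which helps estimate how long a test must run before the smaller arm sees enough traffic. Vertex AI routes requests server-side; this only models the proportions.

```python
import random
from collections import Counter


def validate_traffic_split(traffic_split):
    """Traffic percentages across deployed models must sum to 100."""
    total = sum(traffic_split.values())
    if total != 100:
        raise ValueError(f'traffic split sums to {total}, expected 100')


def simulate_routing(traffic_split, n_requests, seed=0):
    """Randomly assign requests to models in proportion to their percentages."""
    validate_traffic_split(traffic_split)
    rng = random.Random(seed)
    ids = list(traffic_split)
    weights = [traffic_split[i] for i in ids]
    return Counter(rng.choices(ids, weights=weights, k=n_requests))


counts = simulate_routing({'model_a': 90, 'model_b': 10}, n_requests=10_000)
print(counts)
```

With 10,000 requests the 10% arm receives roughly 1,000, so a test that needs a few thousand requests per arm would need several times that volume.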