MLOps Implementation Guide

Ready to Use

Quick Reference
  • What: Practices for deploying and maintaining ML systems reliably
  • Target: Level 2 maturity minimum (CI/CD for ML)
  • Components: Data pipeline, model registry, serving, monitoring
  • Key benefit: Reproducibility, auditability, and compliance

Purpose

This guide sets out practical steps for implementing Machine Learning Operations (MLOps) in government AI projects, ensuring ML systems are reliable, maintainable, and auditable.


What is MLOps?

MLOps is a set of practices that combines machine learning, DevOps, and data engineering to deploy and maintain ML systems reliably and efficiently.

Why MLOps Matters for Government

| Challenge | MLOps Solution |
| --- | --- |
| Reproducibility | Version control for code, data, and models |
| Auditability | Complete lineage and audit trails |
| Reliability | Automated testing and monitoring |
| Compliance | Documented processes and controls |
| Maintainability | Standardized pipelines and practices |

MLOps Maturity Model

Level 0: Manual

  • Manual model training
  • Manual deployment
  • No version control for models
  • No monitoring

Level 1: ML Pipeline Automation

  • Automated training pipeline
  • Manual deployment
  • Model versioning
  • Basic monitoring

Level 2: CI/CD for ML

  • Automated training
  • Automated testing
  • Automated deployment
  • Comprehensive monitoring

Level 3: Full Automation

  • Automated retraining
  • Automated model validation
  • Continuous deployment
  • Proactive monitoring and alerting

Recommendation: Government projects should target Level 2 minimum.


Core MLOps Components

flowchart TB
    subgraph Pipeline["<strong>MLOps Architecture</strong>"]
        direction TB
        subgraph Main["Main Pipeline"]
            direction LR
            D[<strong>DATA</strong><br/>Pipeline] --> M[<strong>MODEL</strong><br/>Training] --> DEP[<strong>DEPLOY</strong><br/>Serve] --> MON[<strong>MONITOR</strong><br/>Alert]
        end

        subgraph Support["Supporting Infrastructure"]
            direction LR
            FS[Feature Store] ~~~ MR[Model Registry] ~~~ MS[Model Serving] ~~~ FL[Feedback Loop]
        end

        D --> FS
        M --> MR
        DEP --> MS
        MON --> FL
    end

    subgraph VC["<strong>Version Control</strong>"]
        direction LR
        Code ~~~ Data ~~~ Models ~~~ Config
    end

    subgraph CICD["<strong>CI/CD Pipeline</strong>"]
        direction LR
        Test --> Validate --> Deploy --> Rollback
    end

    Pipeline --> VC
    Pipeline --> CICD

    style D fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style M fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    style DEP fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style MON fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px

Data Pipeline

Data Versioning

Why version data?

  • Reproducibility
  • Debugging
  • Compliance/audit
  • Rollback capability

Approaches:

| Approach | Tools | Best For |
| --- | --- | --- |
| Git-based | DVC, Git LFS | Small-medium datasets |
| Delta Lake | Delta Lake, Iceberg | Large datasets |
| Snapshot | Database snapshots | Structured data |

Example: DVC Setup

# Initialize DVC
dvc init

# Track data file
dvc add data/training_data.csv

# Configure remote storage and push
dvc remote add -d storage s3://bucket/dvc-store
dvc push

# Version with git
git add data/training_data.csv.dvc .gitignore
git commit -m "Add training data v1.0"

Feature Store

Benefits:

  • Feature reuse across models
  • Consistent feature computation
  • Point-in-time correctness
  • Reduced training-serving skew

Components:

flowchart LR
    subgraph FS["<strong>Feature Store</strong>"]
        direction TB
        FR["<strong>Feature Registry</strong><br/>Feature definitions"]
        OFF["<strong>Offline Store</strong><br/>Historical features"]
        ON["<strong>Online Store</strong><br/>Low-latency serving"]
        FP["<strong>Feature Pipelines</strong><br/>Automated computation"]
    end

    style FR fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style OFF fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    style ON fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style FP fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px

Example: Feature Definition

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32

# Entity the features are keyed on
customer = Entity(name="customer_id", join_keys=["customer_id"])

# Define feature source
customer_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp",
)

# Define feature view (Feast >= 0.20 API; older releases use
# Feature/features/batch_source in place of Field/schema/source)
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    schema=[
        Field(name="total_transactions", dtype=Float32),
        Field(name="avg_transaction_value", dtype=Float32),
        Field(name="days_since_last_transaction", dtype=Int32),
    ],
    online=True,
    source=customer_source,
)


Model Training Pipeline

Pipeline Structure

# training_pipeline.yaml
name: model_training_pipeline
steps:
  - name: data_validation
    command: python scripts/validate_data.py
    inputs:
      - data/training_data.csv
    outputs:
      - reports/data_quality.json

  - name: feature_engineering
    command: python scripts/engineer_features.py
    inputs:
      - data/training_data.csv
    outputs:
      - data/features.parquet

  - name: train_model
    command: python scripts/train.py
    inputs:
      - data/features.parquet
      - config/model_config.yaml
    outputs:
      - models/model.pkl
      - reports/training_metrics.json

  - name: evaluate_model
    command: python scripts/evaluate.py
    inputs:
      - models/model.pkl
      - data/test_features.parquet
    outputs:
      - reports/evaluation_metrics.json

  - name: validate_fairness
    command: python scripts/fairness_check.py
    inputs:
      - models/model.pkl
      - data/test_features.parquet
    outputs:
      - reports/fairness_metrics.json
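
In practice an orchestrator such as Airflow, Kubeflow Pipelines, or DVC would execute these steps. As a minimal illustration only, a sequential runner for the format above could look like the following sketch (the file name and helper function are assumptions, not part of any specific tool's API):

# run_pipeline.py (illustrative only)
import subprocess
import yaml

def run_pipeline(pipeline_path: str) -> None:
    """Execute pipeline steps in order, failing fast on the first error."""
    with open(pipeline_path) as f:
        pipeline = yaml.safe_load(f)

    for step in pipeline["steps"]:
        print(f"Running step: {step['name']}")
        # check=True aborts the pipeline if the step exits non-zero
        subprocess.run(step["command"].split(), check=True)

if __name__ == "__main__":
    run_pipeline("training_pipeline.yaml")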

Model Training Script Template

# train.py
import mlflow
import yaml
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd
import joblib

def train_model(config_path: str, data_path: str, output_path: str):
    """Train model with full logging."""

    # Load config
    with open(config_path) as f:
        config = yaml.safe_load(f)

    # Start MLflow run
    with mlflow.start_run():
        # Log config
        mlflow.log_params(config['model_params'])

        # Load and prepare data
        df = pd.read_parquet(data_path)
        X = df.drop('target', axis=1)
        y = df['target']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Train model
        model = GradientBoostingClassifier(**config['model_params'])
        model.fit(X_train, y_train)

        # Evaluate
        train_score = model.score(X_train, y_train)
        test_score = model.score(X_test, y_test)

        # Log metrics
        mlflow.log_metric("train_accuracy", train_score)
        mlflow.log_metric("test_accuracy", test_score)

        # Save model
        joblib.dump(model, output_path)
        mlflow.sklearn.log_model(model, "model")

        # Log model metadata
        mlflow.log_artifact(config_path)

        print(f"Model trained. Train: {train_score:.4f}, Test: {test_score:.4f}")

if __name__ == "__main__":
    train_model(
        config_path="config/model_config.yaml",
        data_path="data/features.parquet",
        output_path="models/model.pkl"
    )

Model Registry

Model Versioning

# Model registration with MLflow
import mlflow

# Register model (run_id comes from the MLflow training run)
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(model_uri, "my_model")

# Transition to staging
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="my_model",
    version=model_version.version,
    stage="Staging"
)

# After validation, promote to production
client.transition_model_version_stage(
    name="my_model",
    version=model_version.version,
    stage="Production"
)

Model Lifecycle

flowchart LR
    subgraph LC["<strong>Model Lifecycle</strong>"]
        direction LR
        DEV["<strong>Development</strong><br/>Training<br/>Experiment"] --> STG["<strong>Staging</strong><br/>Validation<br/>Testing"] --> PROD["<strong>Production</strong><br/>Serving<br/>Monitoring"] --> ARCH["<strong>Archived</strong><br/>Historical<br/>Reference"]
    end

    style DEV fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style STG fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    style PROD fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    style ARCH fill:#eceff1,stroke:#607d8b,stroke-width:2px
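
Once a version reaches the Production stage, serving code can load it by stage rather than by a hard-coded artifact path, so deployments always pick up the currently promoted model. A minimal sketch using MLflow's models URI scheme (model name follows the registration example above; batch_df is a placeholder for a DataFrame with the expected feature columns):

# Load the current Production version from the registry
import mlflow.pyfunc

model = mlflow.pyfunc.load_model("models:/my_model/Production")
predictions = model.predict(batch_df)  # batch_df: placeholder features DataFrame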

Model Metadata

Every registered model should include:

model_metadata:
  name: customer_churn_predictor
  version: 2.3.1
  created_date: 2024-01-15
  created_by: data_science_team
  description: Predicts customer churn probability

  training:
    data_version: v1.2
    data_date_range: 2022-01-01 to 2023-12-31
    training_samples: 150000
    features_count: 47
    algorithm: XGBoost
    hyperparameters:
      n_estimators: 500
      max_depth: 6
      learning_rate: 0.1

  performance:
    auc_roc: 0.82
    precision: 0.75
    recall: 0.71
    f1_score: 0.73

  fairness:
    demographic_parity: 0.92
    equal_opportunity: 0.89

  approved_by: ethics_committee
  approval_date: 2024-01-20
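
This metadata can also be attached to the registered model version so it travels with the model in the registry. A brief sketch using MLflow's client API (the tag keys, metadata file path, and version number are assumptions):

# Attach metadata to a registered model version
import yaml
import mlflow

client = mlflow.tracking.MlflowClient()

with open("model_metadata.yaml") as f:
    metadata = yaml.safe_load(f)["model_metadata"]

# Version "1" stands in for the version returned by mlflow.register_model
client.update_model_version(
    name=metadata["name"],
    version="1",
    description=metadata["description"],
)
client.set_model_version_tag(metadata["name"], "1", "approved_by", metadata["approved_by"])
client.set_model_version_tag(metadata["name"], "1", "data_version", metadata["training"]["data_version"])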

Testing Strategy

Test Types

| Test Type | Purpose | When |
| --- | --- | --- |
| Unit Tests | Test individual functions | Every commit |
| Integration Tests | Test pipeline components | Every commit |
| Data Tests | Validate data quality | Every run |
| Model Tests | Validate model performance | Before deployment |
| Fairness Tests | Check for bias | Before deployment |
| Load Tests | Verify serving capacity | Before deployment |

Test Implementation

# tests/test_model.py
import pytest
import numpy as np
from model import load_model, predict
# Assumed project helpers used by the accuracy and fairness tests below
from evaluation import load_test_data, calculate_demographic_parity

class TestModel:

    @pytest.fixture
    def model(self):
        return load_model("models/model.pkl")

    @pytest.fixture
    def test_data(self):
        return np.random.rand(100, 47)  # 100 samples, 47 features

    def test_model_loads(self, model):
        """Model should load without error."""
        assert model is not None

    def test_prediction_shape(self, model, test_data):
        """Predictions should have correct shape."""
        predictions = predict(model, test_data)
        assert predictions.shape == (100,)

    def test_prediction_range(self, model, test_data):
        """Predictions should be valid probabilities."""
        predictions = predict(model, test_data)
        assert all(0 <= p <= 1 for p in predictions)

    def test_model_accuracy(self, model):
        """Model should meet minimum accuracy threshold."""
        # Load test data with known labels
        X_test, y_test = load_test_data()
        accuracy = model.score(X_test, y_test)
        assert accuracy >= 0.80, f"Accuracy {accuracy} below threshold"

    def test_fairness_metric(self, model):
        """Model should meet fairness threshold."""
        parity = calculate_demographic_parity(model)
        assert parity >= 0.80, f"Demographic parity {parity} below threshold"

Data Validation

# data_validation.py
import pandas as pd

class DataValidationError(Exception):
    """Raised when training data fails quality checks."""

def validate_training_data(df: pd.DataFrame) -> bool:
    """Validate training data quality with simple pandas checks.

    For richer validation (expectation suites, profiling, data docs),
    use a dedicated library such as Great Expectations.
    """

    checks = {
        # Completeness checks
        "customer_id should not have nulls": df["customer_id"].notnull().all(),
        # Range checks
        "age should be between 0 and 120": df["age"].between(0, 120).all(),
        # Distribution checks
        "target should have >5% positive class": df["target"].mean() > 0.05,
        # Uniqueness checks
        "customer_id should be unique": df["customer_id"].is_unique,
    }

    # Report results
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        raise DataValidationError(f"Data validation failed: {failed}")

    return True

Deployment

Deployment Patterns

| Pattern | Description | Use Case |
| --- | --- | --- |
| Batch | Periodic predictions on data | Non-real-time needs |
| Real-time API | REST/gRPC endpoint | Interactive systems |
| Streaming | Process events as they arrive | Real-time pipelines |
| Embedded | Model in application | Edge/offline |
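
For the real-time API pattern, a thin web service wraps the model artifact. An illustrative sketch using FastAPI (one common choice; the endpoint path and payload shape are assumptions, not a prescribed interface):

# serve.py (illustrative only)
import joblib
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
model = joblib.load("models/model.pkl")  # model trained by train.py above

class PredictionRequest(BaseModel):
    features: dict  # feature name -> value

@app.post("/predict")
def predict(request: PredictionRequest):
    # Build a single-row frame matching the training feature columns
    X = pd.DataFrame([request.features])
    probability = float(model.predict_proba(X)[0, 1])
    return {"probability": probability}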

Blue-Green Deployment

flowchart TB
    LB["<strong>Load Balancer</strong>"] --> BLUE
    LB --> GREEN

    subgraph BLUE["<strong>BLUE</strong> (Current)"]
        B1[Model v2.2]
        B2[100% traffic]
    end

    subgraph GREEN["<strong>GREEN</strong> (New)"]
        G1[Model v2.3]
        G2[0% traffic]
    end

    GREEN -.->|After validation| SWITCH[Shift traffic to GREEN]
    BLUE -.->|After validation| ROLLBACK[BLUE becomes rollback target]

    style LB fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    style BLUE fill:#bbdefb,stroke:#1976d2,stroke-width:2px
    style GREEN fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
    style SWITCH fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    style ROLLBACK fill:#eceff1,stroke:#607d8b,stroke-width:2px
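
In a containerised deployment, the blue-green switch is often just a change to which pool of pods receives live traffic. An illustrative Kubernetes sketch (resource names and labels are assumptions), where flipping the slot label cuts traffic over once the green deployment has been validated:

# service.yaml (illustrative only)
apiVersion: v1
kind: Service
metadata:
  name: model-serving
spec:
  selector:
    app: model-serving
    slot: blue    # change to "green" to shift traffic after validation
  ports:
    - port: 80
      targetPort: 8080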

Canary Deployment

# canary_deployment.py

class CanaryDeployer:
    """Gradual traffic shifting for model deployment."""

    def __init__(self, model_name: str):
        self.model_name = model_name
        self.canary_percentage = 0

    def start_canary(self, new_version: str, initial_percent: int = 5):
        """Start canary with small traffic percentage."""
        self.new_version = new_version
        self.canary_percentage = initial_percent
        self._update_routing()
        print(f"Canary started: {initial_percent}% to {new_version}")

    def increase_traffic(self, new_percent: int):
        """Increase canary traffic if metrics are good."""
        if self._check_metrics():
            self.canary_percentage = new_percent
            self._update_routing()
            print(f"Canary traffic increased to {new_percent}%")
        else:
            self.rollback()

    def promote(self):
        """Promote canary to full production."""
        if self._check_metrics():
            self.canary_percentage = 100
            self._update_routing()
            print(f"Canary promoted: {self.new_version} now at 100%")
        else:
            self.rollback()

    def rollback(self):
        """Roll back the canary deployment."""
        self.canary_percentage = 0
        self._update_routing()
        print("Canary rolled back")

    def _update_routing(self):
        """Apply the current traffic split to the serving infrastructure.

        Placeholder: integrate with your load balancer, service mesh,
        or model-serving platform here.
        """
        print(f"Routing updated: {self.canary_percentage}% to canary")

    def _check_metrics(self) -> bool:
        """Check if canary metrics are acceptable."""
        # Compare canary vs production metrics; get_canary_metrics and
        # get_production_metrics are placeholders for your monitoring backend
        canary_metrics = get_canary_metrics(self.new_version)
        prod_metrics = get_production_metrics()

        # Check for degradation in tail latency or error rate
        if canary_metrics['latency_p99'] > prod_metrics['latency_p99'] * 1.2:
            return False
        if canary_metrics['error_rate'] > prod_metrics['error_rate'] * 1.5:
            return False
        return True

Monitoring

Key Metrics

# monitoring.py
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
PREDICTION_COUNT = Counter(
    'ml_predictions_total',
    'Total predictions made',
    ['model_name', 'model_version', 'outcome']
)

PREDICTION_LATENCY = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency',
    ['model_name'],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

# Model quality metrics
MODEL_ACCURACY = Gauge(
    'ml_model_accuracy',
    'Current model accuracy',
    ['model_name', 'model_version']
)

FEATURE_DRIFT = Gauge(
    'ml_feature_drift',
    'Feature drift score',
    ['model_name', 'feature_name']
)

PREDICTION_DRIFT = Gauge(
    'ml_prediction_drift',
    'Prediction distribution drift',
    ['model_name']
)
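
These metrics need to be recorded in the serving path and exposed for Prometheus to scrape. A brief usage sketch (the port, label values, and wrapper function are assumptions):

# In the serving process (illustrative only)
from prometheus_client import start_http_server

start_http_server(8000)  # exposes /metrics for Prometheus to scrape

def predict_with_metrics(model, features):
    """Record latency and a count for every prediction served."""
    with PREDICTION_LATENCY.labels(model_name="customer_churn_predictor").time():
        prediction = model.predict(features)
    PREDICTION_COUNT.labels(
        model_name="customer_churn_predictor",
        model_version="2.3.1",
        outcome="success",
    ).inc()
    return prediction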

Alerting Rules

# alerting_rules.yaml
groups:
  - name: ml_model_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(ml_prediction_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High model error rate"
          description: "Model {{ $labels.model_name }} error rate > 5%"

      - alert: HighLatency
        expr: histogram_quantile(0.99, rate(ml_prediction_latency_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency"

      - alert: AccuracyDegradation
        expr: ml_model_accuracy < 0.75
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Model accuracy below threshold"

      - alert: DataDrift
        expr: ml_feature_drift > 0.3
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Significant data drift detected"

CI/CD Pipeline

Pipeline Configuration

# .github/workflows/ml_pipeline.yaml
name: ML Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run unit tests
        run: pytest tests/unit/

      - name: Run integration tests
        run: pytest tests/integration/

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Train model
        run: python scripts/train.py

      - name: Evaluate model
        run: python scripts/evaluate.py

      - name: Validate fairness
        run: python scripts/fairness_check.py

      - name: Upload model artifact
        uses: actions/upload-artifact@v3
        with:
          name: model
          path: models/

  deploy:
    needs: train
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Download model artifact
        uses: actions/download-artifact@v3
        with:
          name: model

      - name: Deploy to staging
        run: ./scripts/deploy_staging.sh

      - name: Run smoke tests
        run: ./scripts/smoke_test.sh

      - name: Promote to production
        if: success()
        run: ./scripts/promote_production.sh

Checklist: MLOps Implementation

Foundation

  • Version control for code (Git)
  • Version control for data (DVC or similar)
  • Experiment tracking (MLflow)
  • Model registry

Automation

  • Automated training pipeline
  • Automated testing
  • Automated deployment
  • Automated monitoring

Quality

  • Data validation
  • Model validation
  • Fairness testing
  • Performance testing

Operations

  • Monitoring dashboard
  • Alerting configured
  • Incident response procedure
  • Rollback capability

Governance

  • Audit logging
  • Model documentation
  • Approval workflow
  • Access controls

Resources

Tools

  • MLflow: Experiment tracking, model registry
  • DVC: Data version control
  • Great Expectations: Data validation
  • Evidently: ML monitoring
  • Feast: Feature store

Further Reading

  • "Introducing MLOps" by Treveil et al.
  • "Machine Learning Design Patterns" by Lakshmanan et al.
  • Google MLOps Whitepaper