MLOps Implementation Guide¶
Ready to Use
Quick Reference
- What: Practices for deploying and maintaining ML systems reliably
- Target: Level 2 maturity minimum (CI/CD for ML)
- Components: Data pipeline, model registry, serving, monitoring
- Key benefits: Reproducibility, auditability, and compliance
Purpose¶
This guide sets out practical steps for implementing Machine Learning Operations (MLOps) in government AI projects, so that ML systems are reliable, maintainable, and auditable.
What is MLOps?¶
MLOps is a set of practices that combines machine learning, DevOps, and data engineering to deploy and maintain ML systems reliably and efficiently.
Why MLOps Matters for Government¶
| Challenge | MLOps Solution |
|---|---|
| Reproducibility | Version control for code, data, and models |
| Auditability | Complete lineage and audit trails |
| Reliability | Automated testing and monitoring |
| Compliance | Documented processes and controls |
| Maintainability | Standardized pipelines and practices |
MLOps Maturity Model¶
Level 0: Manual¶
- Manual model training
- Manual deployment
- No version control for models
- No monitoring
Level 1: ML Pipeline Automation¶
- Automated training pipeline
- Manual deployment
- Model versioning
- Basic monitoring
Level 2: CI/CD for ML¶
- Automated training
- Automated testing
- Automated deployment
- Comprehensive monitoring
Level 3: Full Automation¶
- Automated retraining
- Automated model validation
- Continuous deployment
- Proactive monitoring and alerting
Recommendation: Government projects should target Level 2 minimum.
Core MLOps Components¶
flowchart TB
subgraph Pipeline["<strong>MLOps Architecture</strong>"]
direction TB
subgraph Main["Main Pipeline"]
direction LR
D[<strong>DATA</strong><br/>Pipeline] --> M[<strong>MODEL</strong><br/>Training] --> DEP[<strong>DEPLOY</strong><br/>Serve] --> MON[<strong>MONITOR</strong><br/>Alert]
end
subgraph Support["Supporting Infrastructure"]
direction LR
FS[Feature Store] ~~~ MR[Model Registry] ~~~ MS[Model Serving] ~~~ FL[Feedback Loop]
end
D --> FS
M --> MR
DEP --> MS
MON --> FL
end
subgraph VC["<strong>Version Control</strong>"]
direction LR
Code ~~~ Data ~~~ Models ~~~ Config
end
subgraph CICD["<strong>CI/CD Pipeline</strong>"]
direction LR
Test --> Validate --> Deploy --> Rollback
end
Pipeline --> VC
Pipeline --> CICD
style D fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style M fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style DEP fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style MON fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
Data Pipeline¶
Data Versioning¶
Why version data?
- Reproducibility
- Debugging
- Compliance/audit
- Rollback capability
Approaches:
| Approach | Tools | Best For |
|---|---|---|
| Git-based | DVC, Git LFS | Small-medium datasets |
| Delta Lake | Delta Lake, Iceberg | Large datasets |
| Snapshot | Database snapshots | Structured data |
Example: DVC Setup
# Initialize DVC
dvc init
# Track data file
dvc add data/training_data.csv
# Configure remote storage
dvc remote add -d storage s3://bucket/dvc-store
# Push data to the remote
dvc push
# Version with git
git add data/training_data.csv.dvc .gitignore
git commit -m "Add training data v1.0"
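Once a data version is committed, it can be read back programmatically for reproducible training runs. A minimal sketch using dvc.api (the repository URL and tag below are illustrative):
import pandas as pd
import dvc.api
# Open the exact dataset revision referenced by a git tag or commit
with dvc.api.open(
    "data/training_data.csv",
    repo="https://example.org/agency/ml-project.git",  # illustrative repo URL
    rev="v1.0",                                         # illustrative tag
) as f:
    df = pd.read_csv(f)
print(df.shape)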
Feature Store¶
Benefits:
- Feature reuse across models
- Consistent feature computation
- Point-in-time correctness
- Reduced training-serving skew
Components:
flowchart LR
subgraph FS["<strong>Feature Store</strong>"]
direction TB
FR["<strong>Feature Registry</strong><br/>Feature definitions"]
OFF["<strong>Offline Store</strong><br/>Historical features"]
ON["<strong>Online Store</strong><br/>Low-latency serving"]
FP["<strong>Feature Pipelines</strong><br/>Automated computation"]
end
style FR fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style OFF fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style ON fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style FP fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
Example: Feature Definition
# Feast feature definitions (current Field/schema API; older releases
# used Feature/batch_source instead)
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32
# Define feature source
customer_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp",
)
# Define the entity the features are keyed on
customer = Entity(name="customer_id", join_keys=["customer_id"])
# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    schema=[
        Field(name="total_transactions", dtype=Float32),
        Field(name="avg_transaction_value", dtype=Float32),
        Field(name="days_since_last_transaction", dtype=Int32),
    ],
    online=True,
    source=customer_source,
)
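At serving time the same definitions are used to read features from the online store, which is what keeps training and serving consistent. A minimal retrieval sketch (the repository path and entity value are illustrative):
from feast import FeatureStore
# Point at the feature repository (path is illustrative)
store = FeatureStore(repo_path=".")
online_features = store.get_online_features(
    features=[
        "customer_features:total_transactions",
        "customer_features:avg_transaction_value",
        "customer_features:days_since_last_transaction",
    ],
    entity_rows=[{"customer_id": 1001}],
).to_dict()
print(online_features)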
Model Training Pipeline¶
Pipeline Structure¶
# training_pipeline.yaml
name: model_training_pipeline
steps:
- name: data_validation
command: python scripts/validate_data.py
inputs:
- data/training_data.csv
outputs:
- reports/data_quality.json
- name: feature_engineering
command: python scripts/engineer_features.py
inputs:
- data/training_data.csv
outputs:
- data/features.parquet
- name: train_model
command: python scripts/train.py
inputs:
- data/features.parquet
- config/model_config.yaml
outputs:
- models/model.pkl
- reports/training_metrics.json
- name: evaluate_model
command: python scripts/evaluate.py
inputs:
- models/model.pkl
- data/test_features.parquet
outputs:
- reports/evaluation_metrics.json
- name: validate_fairness
command: python scripts/fairness_check.py
inputs:
- models/model.pkl
- data/test_features.parquet
outputs:
- reports/fairness_metrics.json
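The pipeline definition above is deliberately tool-agnostic; orchestrators such as Airflow or Kubeflow Pipelines can run the same steps. As a minimal sketch, a sequential runner for this YAML might look like:
# run_pipeline.py -- minimal sequential runner for training_pipeline.yaml (sketch)
import subprocess
from pathlib import Path
import yaml
def run_pipeline(pipeline_path: str = "training_pipeline.yaml") -> None:
    with open(pipeline_path) as f:
        pipeline = yaml.safe_load(f)
    for step in pipeline["steps"]:
        print(f"Running step: {step['name']}")
        # Fail fast if the step's command exits non-zero
        subprocess.run(step["command"].split(), check=True)
        # Confirm the declared outputs were actually produced
        for output in step.get("outputs", []):
            if not Path(output).exists():
                raise FileNotFoundError(f"{step['name']} did not produce {output}")
if __name__ == "__main__":
    run_pipeline()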
Model Training Script Template¶
# train.py
import mlflow
import yaml
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd
import joblib
def train_model(config_path: str, data_path: str, output_path: str):
"""Train model with full logging."""
# Load config
with open(config_path) as f:
config = yaml.safe_load(f)
# Start MLflow run
with mlflow.start_run():
# Log config
mlflow.log_params(config['model_params'])
# Load and prepare data
df = pd.read_parquet(data_path)
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Train model
model = GradientBoostingClassifier(**config['model_params'])
model.fit(X_train, y_train)
# Evaluate
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
# Log metrics
mlflow.log_metric("train_accuracy", train_score)
mlflow.log_metric("test_accuracy", test_score)
# Save model
joblib.dump(model, output_path)
mlflow.sklearn.log_model(model, "model")
# Log model metadata
mlflow.log_artifact(config_path)
print(f"Model trained. Train: {train_score:.4f}, Test: {test_score:.4f}")
if __name__ == "__main__":
train_model(
config_path="config/model_config.yaml",
data_path="data/features.parquet",
output_path="models/model.pkl"
)
Model Registry¶
Model Versioning¶
# Model registration with MLflow
import mlflow
# Register the model from a completed training run
# (run_id comes from the MLflow run created in train.py)
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(model_uri, "my_model")
# Transition to staging
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="my_model",
version=model_version.version,
stage="Staging"
)
# After validation, promote to production
client.transition_model_version_stage(
name="my_model",
version=model_version.version,
stage="Production"
)
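Downstream jobs and services should resolve the model by registry stage rather than by file path. For example, to load whatever version currently holds the Production stage and score a prepared feature frame:
import mlflow.pyfunc
import pandas as pd
# Load whatever version is currently in the "Production" stage
model = mlflow.pyfunc.load_model("models:/my_model/Production")
# Score a prepared feature frame (path reused from the training pipeline)
features = pd.read_parquet("data/features.parquet").drop("target", axis=1)
predictions = model.predict(features)
print(predictions[:5])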
Model Lifecycle¶
flowchart LR
subgraph LC["<strong>Model Lifecycle</strong>"]
direction LR
DEV["<strong>Development</strong><br/>Training<br/>Experiment"] --> STG["<strong>Staging</strong><br/>Validation<br/>Testing"] --> PROD["<strong>Production</strong><br/>Serving<br/>Monitoring"] --> ARCH["<strong>Archived</strong><br/>Historical<br/>Reference"]
end
style DEV fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style STG fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style PROD fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style ARCH fill:#eceff1,stroke:#607d8b,stroke-width:2px
Model Metadata¶
Every registered model should include:
model_metadata:
name: customer_churn_predictor
version: 2.3.1
created_date: 2024-01-15
created_by: data_science_team
description: Predicts customer churn probability
training:
data_version: v1.2
data_date_range: 2022-01-01 to 2023-12-31
training_samples: 150000
features_count: 47
algorithm: XGBoost
hyperparameters:
n_estimators: 500
max_depth: 6
learning_rate: 0.1
performance:
auc_roc: 0.82
precision: 0.75
recall: 0.71
f1_score: 0.73
fairness:
demographic_parity: 0.92
equal_opportunity: 0.89
approved_by: ethics_committee
approval_date: 2024-01-20
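A lightweight check in the registration step can enforce that this metadata is complete before a model is promoted. A minimal sketch, assuming the block above is stored as model_metadata.yaml:
# check_metadata.py -- reject registration if required metadata is missing (sketch)
import yaml
REQUIRED_FIELDS = ["name", "version", "created_by", "description",
                   "training", "performance", "fairness"]
def check_metadata(path: str = "model_metadata.yaml") -> None:
    with open(path) as f:
        metadata = yaml.safe_load(f)["model_metadata"]
    missing = [field for field in REQUIRED_FIELDS if field not in metadata]
    if missing:
        raise ValueError(f"Model metadata missing required fields: {missing}")
if __name__ == "__main__":
    check_metadata()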
Testing Strategy¶
Test Types¶
| Test Type | Purpose | When |
|---|---|---|
| Unit Tests | Test individual functions | Every commit |
| Integration Tests | Test pipeline components | Every commit |
| Data Tests | Validate data quality | Every run |
| Model Tests | Validate model performance | Before deployment |
| Fairness Tests | Check for bias | Before deployment |
| Load Tests | Verify serving capacity | Before deployment |
Test Implementation¶
# tests/test_model.py
import pytest
import numpy as np
# load_test_data and calculate_demographic_parity are project-specific
# helpers (an illustrative parity sketch follows the tests below)
from model import (
    load_model,
    predict,
    load_test_data,
    calculate_demographic_parity,
)
class TestModel:
@pytest.fixture
def model(self):
return load_model("models/model.pkl")
@pytest.fixture
def test_data(self):
return np.random.rand(100, 47) # 100 samples, 47 features
def test_model_loads(self, model):
"""Model should load without error."""
assert model is not None
def test_prediction_shape(self, model, test_data):
"""Predictions should have correct shape."""
predictions = predict(model, test_data)
assert predictions.shape == (100,)
def test_prediction_range(self, model, test_data):
"""Predictions should be valid probabilities."""
predictions = predict(model, test_data)
assert all(0 <= p <= 1 for p in predictions)
def test_model_accuracy(self, model):
"""Model should meet minimum accuracy threshold."""
# Load test data with known labels
X_test, y_test = load_test_data()
accuracy = model.score(X_test, y_test)
assert accuracy >= 0.80, f"Accuracy {accuracy} below threshold"
def test_fairness_metric(self, model):
"""Model should meet fairness threshold."""
disparity = calculate_demographic_parity(model)
assert disparity >= 0.80, f"Fairness disparity {disparity} below threshold"
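The fairness helper used above is project-specific. One possible implementation of a demographic parity ratio, assuming the held-out parquet carries a protected-attribute column named group that is not a model feature:
# Illustrative demographic parity ratio (project-specific in practice)
import pandas as pd
def calculate_demographic_parity(model, group_col: str = "group") -> float:
    """Ratio of positive-prediction rates between the least- and most-favoured
    groups: 1.0 means parity, and >= 0.8 is a common acceptance threshold."""
    df = pd.read_parquet("data/test_features.parquet")
    X = df.drop(columns=["target", group_col])
    predictions = model.predict(X)
    rates = (
        pd.DataFrame({"group": df[group_col], "prediction": predictions})
        .groupby("group")["prediction"]
        .mean()
    )
    return float(rates.min() / rates.max())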
Data Validation¶
# data_validation.py
# Hand-rolled checks shown for clarity; Great Expectations (listed under
# Resources) provides declarative equivalents such as
# expect_column_values_to_not_be_null and expect_column_values_to_be_between.
import pandas as pd
class DataValidationError(Exception):
    """Raised when training data fails validation."""
def validate_training_data(df: pd.DataFrame) -> bool:
    """Validate training data quality."""
    failures = []
    # Completeness checks
    if df['customer_id'].isnull().any():
        failures.append("customer_id should not have nulls")
    # Range checks
    if not df['age'].between(0, 120).all():
        failures.append("age should be between 0 and 120")
    # Distribution checks
    if df['target'].mean() <= 0.05:
        failures.append("target should have >5% positive class")
    # Uniqueness checks
    if not df['customer_id'].is_unique:
        failures.append("customer_id should be unique")
    # Report results
    if failures:
        raise DataValidationError(f"Data validation failed: {failures}")
    return True
Deployment¶
Deployment Patterns¶
| Pattern | Description | Use Case |
|---|---|---|
| Batch | Periodic predictions on data | Non-real-time needs |
| Real-time API | REST/gRPC endpoint | Interactive systems |
| Streaming | Process events as they arrive | Real-time pipelines |
| Embedded | Model in application | Edge/offline |
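For the real-time pattern, the model is typically wrapped in a thin HTTP service. A minimal sketch (FastAPI is an assumption here, not mandated by this guide; the request schema is illustrative):
# serve.py -- minimal real-time scoring API sketch
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
app = FastAPI()
model = joblib.load("models/model.pkl")  # path from the training pipeline
class PredictionRequest(BaseModel):
    features: list[float]  # flat feature vector; schema is illustrative
@app.post("/predict")
def predict(request: PredictionRequest) -> dict:
    probability = model.predict_proba([request.features])[0][1]
    return {"churn_probability": float(probability)}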
Blue-Green Deployment¶
flowchart TB
LB["<strong>Load Balancer</strong>"] --> BLUE
LB --> GREEN
subgraph BLUE["<strong>BLUE</strong> (Current)"]
B1[Model v2.2]
B2[100% traffic]
end
subgraph GREEN["<strong>GREEN</strong> (New)"]
G1[Model v2.3]
G2[0% traffic]
end
GREEN -.->|After validation| SWITCH[Shift traffic to GREEN]
BLUE -.->|After switch| ROLLBACK[BLUE becomes rollback target]
style LB fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style BLUE fill:#bbdefb,stroke:#1976d2,stroke-width:2px
style GREEN fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style SWITCH fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style ROLLBACK fill:#eceff1,stroke:#607d8b,stroke-width:2px
Canary Deployment¶
# canary_deployment.py
class CanaryDeployer:
"""Gradual traffic shifting for model deployment."""
def __init__(self, model_name: str):
self.model_name = model_name
self.canary_percentage = 0
def start_canary(self, new_version: str, initial_percent: int = 5):
"""Start canary with small traffic percentage."""
self.new_version = new_version
self.canary_percentage = initial_percent
self._update_routing()
print(f"Canary started: {initial_percent}% to {new_version}")
def increase_traffic(self, new_percent: int):
"""Increase canary traffic if metrics are good."""
if self._check_metrics():
self.canary_percentage = new_percent
self._update_routing()
print(f"Canary traffic increased to {new_percent}%")
else:
self.rollback()
def promote(self):
"""Promote canary to full production."""
if self._check_metrics():
self.canary_percentage = 100
self._update_routing()
print(f"Canary promoted: {self.new_version} now at 100%")
else:
self.rollback()
def rollback(self):
"""Rollback canary deployment."""
self.canary_percentage = 0
self._update_routing()
print(f"Canary rolled back")
    def _check_metrics(self) -> bool:
        """Check if canary metrics are acceptable."""
        # get_canary_metrics / get_production_metrics are project-specific
        # helpers that query the monitoring system (e.g. Prometheus)
        canary_metrics = get_canary_metrics(self.new_version)
        prod_metrics = get_production_metrics()
        # Check for degradation
        if canary_metrics['latency_p99'] > prod_metrics['latency_p99'] * 1.2:
            return False
        if canary_metrics['error_rate'] > prod_metrics['error_rate'] * 1.5:
            return False
        return True
    def _update_routing(self):
        """Apply self.canary_percentage to the serving layer (load balancer,
        service mesh, or API gateway) -- deployment-specific."""
        print(f"Routing {self.canary_percentage}% of traffic to {self.model_name} canary")
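A typical rollout then widens the canary in small steps, with metrics checked (and automatic rollback) between each increase. Illustrative call sequence, assuming the metric helpers are wired to your monitoring backend:
deployer = CanaryDeployer("churn_model")
deployer.start_canary("v2.3", initial_percent=5)
# Widen the canary only while metrics stay healthy
for percent in (10, 25, 50):
    deployer.increase_traffic(percent)
deployer.promote()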
Monitoring¶
Key Metrics¶
# monitoring.py
from prometheus_client import Counter, Histogram, Gauge
# Request metrics
PREDICTION_COUNT = Counter(
'ml_predictions_total',
'Total predictions made',
['model_name', 'model_version', 'outcome']
)
PREDICTION_LATENCY = Histogram(
'ml_prediction_latency_seconds',
'Prediction latency',
['model_name'],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
# Model quality metrics
MODEL_ACCURACY = Gauge(
'ml_model_accuracy',
'Current model accuracy',
['model_name', 'model_version']
)
FEATURE_DRIFT = Gauge(
'ml_feature_drift',
'Feature drift score',
['model_name', 'feature_name']
)
PREDICTION_DRIFT = Gauge(
'ml_prediction_drift',
'Prediction distribution drift',
['model_name']
)
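The drift gauges need a job that compares live feature distributions against the training baseline. A minimal sketch using a two-sample Kolmogorov-Smirnov statistic (tools such as Evidently provide richer, ready-made reports; the recent-features path is illustrative):
# drift_job.py -- periodic drift scoring that feeds the gauges above (sketch)
import pandas as pd
from scipy.stats import ks_2samp
from monitoring import FEATURE_DRIFT
def update_feature_drift(model_name: str,
                         baseline: pd.DataFrame,
                         recent: pd.DataFrame) -> None:
    """Set a per-feature drift score (KS statistic: 0 = identical distributions)."""
    for feature in baseline.columns:
        result = ks_2samp(baseline[feature], recent[feature])
        FEATURE_DRIFT.labels(model_name=model_name, feature_name=feature).set(result.statistic)
if __name__ == "__main__":
    baseline = pd.read_parquet("data/features.parquet").drop(columns=["target"])
    recent = pd.read_parquet("data/recent_inference_features.parquet")  # illustrative path
    update_feature_drift("churn_model", baseline, recent)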
Alerting Rules¶
# alerting_rules.yaml
groups:
- name: ml_model_alerts
rules:
- alert: HighErrorRate
        expr: sum by (model_name) (rate(ml_predictions_total{outcome="error"}[5m])) / sum by (model_name) (rate(ml_predictions_total[5m])) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High model error rate"
description: "Model {{ $labels.model_name }} error rate > 5%"
- alert: HighLatency
        expr: histogram_quantile(0.99, sum by (le, model_name) (rate(ml_prediction_latency_seconds_bucket[5m]))) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "High prediction latency"
- alert: AccuracyDegradation
expr: ml_model_accuracy < 0.75
for: 1h
labels:
severity: warning
annotations:
summary: "Model accuracy below threshold"
- alert: DataDrift
expr: ml_feature_drift > 0.3
for: 1h
labels:
severity: warning
annotations:
summary: "Significant data drift detected"
CI/CD Pipeline¶
Pipeline Configuration¶
# .github/workflows/ml_pipeline.yaml
name: ML Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run unit tests
run: pytest tests/unit/
- name: Run integration tests
run: pytest tests/integration/
train:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Train model
run: python scripts/train.py
- name: Evaluate model
run: python scripts/evaluate.py
- name: Validate fairness
run: python scripts/fairness_check.py
- name: Upload model artifact
uses: actions/upload-artifact@v3
with:
name: model
path: models/
deploy:
needs: train
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Download model artifact
uses: actions/download-artifact@v3
with:
name: model
- name: Deploy to staging
run: ./scripts/deploy_staging.sh
- name: Run smoke tests
run: ./scripts/smoke_test.sh
- name: Promote to production
if: success()
run: ./scripts/promote_production.sh
Checklist: MLOps Implementation¶
Foundation¶
- Version control for code (Git)
- Version control for data (DVC or similar)
- Experiment tracking (MLflow)
- Model registry
Automation¶
- Automated training pipeline
- Automated testing
- Automated deployment
- Automated monitoring
Quality¶
- Data validation
- Model validation
- Fairness testing
- Performance testing
Operations¶
- Monitoring dashboard
- Alerting configured
- Incident response procedure
- Rollback capability
Governance¶
- Audit logging
- Model documentation
- Approval workflow
- Access controls
Resources¶
Tools¶
- MLflow: Experiment tracking, model registry
- DVC: Data version control
- Great Expectations: Data validation
- Evidently: ML monitoring
- Feast: Feature store
Further Reading¶
- "Introducing MLOps" by Treveil et al.
- "Machine Learning Design Patterns" by Lakshmanan et al.
- Google MLOps Whitepaper