What is Kubeflow?
Kubeflow is an open-source machine learning platform designed to orchestrate complex ML workflows on Kubernetes. Originally developed at Google and now a CNCF incubating project, Kubeflow simplifies deploying machine learning models at scale, making it easier to develop, train, and serve ML models in production environments.
Why Use Kubeflow?
Kubeflow addresses several key challenges in the ML lifecycle:
- Scalability: Leverage Kubernetes for distributed training and serving
- Portability: Run ML workflows across different cloud providers and on-premises
- Reproducibility: Version control your ML pipelines and experiments
- Collaboration: Enable teams to share and reuse ML components
- End-to-End Workflow: Manage the complete ML lifecycle from data preparation to model serving
Prerequisites
Before getting started with Kubeflow, ensure you have:
- Basic knowledge of Kubernetes concepts
- A Kubernetes cluster (minimum 4 CPUs, 8GB RAM, 50GB storage)
- kubectl CLI tool installed and configured
- Python 3.7 or higher
- Docker installed (for building custom containers)
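As a quick sanity check before installing, a small Python sketch (standard library only) can verify the tooling from the list above. The version threshold matches the prerequisites; this is an illustrative helper, not part of Kubeflow itself:

```python
import shutil
import sys

def check_prerequisites(min_python=(3, 7)):
    """Report whether the local tools from the prerequisites list are available."""
    results = {
        'python': sys.version_info >= min_python,
        'kubectl': shutil.which('kubectl') is not None,
        'docker': shutil.which('docker') is not None,
    }
    for tool, ok in results.items():
        print(f"{tool}: {'OK' if ok else 'MISSING'}")
    return results

if __name__ == '__main__':
    check_prerequisites()
```

If anything prints `MISSING`, install it before proceeding; the cluster-side requirements (CPU, RAM, storage) still need to be checked on the cluster itself.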
Installing Kubeflow
Option 1: Using Kubeflow on Kind (Local Development)
For local development and testing, you can use Kind (Kubernetes in Docker):
# Install Kind
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind
# Create a Kubernetes cluster
kind create cluster --name kubeflow-cluster
# Verify cluster is running
kubectl cluster-info --context kind-kubeflow-cluster
Option 2: Installing Kubeflow Using kustomize
# Clone the Kubeflow manifests repository
git clone https://github.com/kubeflow/manifests.git
cd manifests
# Install Kubeflow components using kustomize
while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done
# Wait for all pods to be ready (this may take 10-15 minutes)
kubectl wait --for=condition=Ready pods --all -n kubeflow --timeout=600s
Option 3: Using MiniKF (Fastest Method)
# Install Vagrant
# Visit https://www.vagrantup.com/downloads
# Install VirtualBox
# Visit https://www.virtualbox.org/wiki/Downloads
# Create and start MiniKF VM
vagrant init arrikto/minikf
vagrant up
# Access Kubeflow UI at http://10.10.10.10
Accessing the Kubeflow Dashboard
Once installed, access the Kubeflow Central Dashboard:
# Port forward to access the dashboard locally
kubectl port-forward -n istio-system svc/istio-ingressgateway 8080:80
# Access the dashboard at http://localhost:8080
Default credentials (if using the default installation):
- Email: user@example.com
- Password: 12341234
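If the dashboard does not load, a small stdlib probe can tell you whether anything is answering on the forwarded port at all. The URL below assumes the port-forward command above is running; this is a generic HTTP check, not a Kubeflow API:

```python
import urllib.request
import urllib.error

def dashboard_reachable(url='http://localhost:8080', timeout=3):
    """Return True if an HTTP server answers at the given URL."""
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        # The server answered, just with an error status (e.g. a login redirect)
        return True
    except (urllib.error.URLError, OSError):
        return False

if __name__ == '__main__':
    if dashboard_reachable():
        print('Dashboard is reachable')
    else:
        print('Not reachable -- is the port-forward running?')
```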
Creating Your First Kubeflow Pipeline
Step 1: Install Kubeflow Pipelines SDK
pip install kfp --upgrade
Step 2: Create a Simple Pipeline
Create a file named simple_pipeline.py:
import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset
@component(
packages_to_install=['pandas', 'scikit-learn'],
base_image='python:3.9'
)
def load_data(dataset_output: Output[Dataset]):
"""Load and prepare iris dataset"""
from sklearn.datasets import load_iris
import pandas as pd
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['target'] = iris.target
df.to_csv(dataset_output.path, index=False)
print(f"Dataset saved with {len(df)} rows")
@component(
packages_to_install=['pandas', 'scikit-learn'],
base_image='python:3.9'
)
def train_model(dataset_input: Input[Dataset]) -> float:
    """Train a simple classification model and return its accuracy"""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score

    # Load data
    df = pd.read_csv(dataset_input.path)
    X = df.drop('target', axis=1)
    y = df['target']

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate; primitive outputs are returned directly in KFP v2
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Model accuracy: {accuracy:.4f}")
    return accuracy
@dsl.pipeline(
name='Simple ML Pipeline',
description='A simple machine learning pipeline for demonstration'
)
def simple_ml_pipeline():
"""Define the pipeline workflow"""
# Step 1: Load data
load_data_task = load_data()
# Step 2: Train model using output from step 1
train_model_task = train_model(
dataset_input=load_data_task.outputs['dataset_output']
)
# Compile the pipeline
if __name__ == '__main__':
kfp.compiler.Compiler().compile(
pipeline_func=simple_ml_pipeline,
package_path='simple_pipeline.yaml'
)
print("Pipeline compiled successfully!")
Step 3: Run the Pipeline
import kfp

# Connect to Kubeflow Pipelines
client = kfp.Client(host='http://localhost:8080')

# Upload and run the compiled pipeline package
run = client.create_run_from_pipeline_package(
    'simple_pipeline.yaml',
    arguments={},
    run_name='simple-ml-run',
    experiment_name='my-first-experiment',
)
print(f"Pipeline run created: {run.run_id}")
Working with Kubeflow Notebooks
Kubeflow provides Jupyter notebooks for interactive development:
Creating a Notebook Server
apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
name: my-notebook
namespace: kubeflow-user
spec:
template:
spec:
containers:
- name: notebook
image: jupyter/tensorflow-notebook:latest
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
volumeMounts:
- name: workspace
mountPath: /home/jovyan
volumes:
- name: workspace
persistentVolumeClaim:
claimName: workspace-pvc
Apply the configuration:
kubectl apply -f notebook.yaml -n kubeflow-user
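Since kubectl also accepts JSON manifests, the same Notebook resource can be generated programmatically. A minimal sketch using only the standard library; the image, namespace, and PVC name mirror the YAML above and are otherwise arbitrary:

```python
import json

def notebook_manifest(name='my-notebook', namespace='kubeflow-user',
                      image='jupyter/tensorflow-notebook:latest',
                      pvc='workspace-pvc'):
    """Build a Notebook custom resource as a plain dict."""
    return {
        'apiVersion': 'kubeflow.org/v1',
        'kind': 'Notebook',
        'metadata': {'name': name, 'namespace': namespace},
        'spec': {'template': {'spec': {
            'containers': [{
                'name': 'notebook',
                'image': image,
                'resources': {
                    'requests': {'memory': '2Gi', 'cpu': '1'},
                    'limits': {'memory': '4Gi', 'cpu': '2'},
                },
                'volumeMounts': [{'name': 'workspace',
                                  'mountPath': '/home/jovyan'}],
            }],
            'volumes': [{'name': 'workspace',
                         'persistentVolumeClaim': {'claimName': pvc}}],
        }}},
    }

if __name__ == '__main__':
    # Apply with: kubectl apply -f notebook.json -n kubeflow-user
    with open('notebook.json', 'w') as f:
        json.dump(notebook_manifest(), f, indent=2)
```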
Building a Complete ML Workflow
Here’s a more comprehensive example that includes data preprocessing, model training, and evaluation:
import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics
# Reuse the load_data component defined in simple_pipeline.py
from simple_pipeline import load_data
@component(
packages_to_install=['pandas', 'numpy', 'scikit-learn'],
base_image='python:3.9'
)
def preprocess_data(
dataset_input: Input[Dataset],
train_output: Output[Dataset],
test_output: Output[Dataset]
):
"""Preprocess and split data"""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# Load data
df = pd.read_csv(dataset_input.path)
# Feature engineering
X = df.drop('target', axis=1)
y = df['target']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Save processed data
train_df = pd.DataFrame(X_train_scaled, columns=X.columns)
train_df['target'] = y_train.values
train_df.to_csv(train_output.path, index=False)
test_df = pd.DataFrame(X_test_scaled, columns=X.columns)
test_df['target'] = y_test.values
test_df.to_csv(test_output.path, index=False)
print(f"Training set: {len(train_df)} samples")
print(f"Test set: {len(test_df)} samples")
@component(
packages_to_install=['pandas', 'scikit-learn', 'joblib'],
base_image='python:3.9'
)
def train_and_save_model(
train_data: Input[Dataset],
model_output: Output[Model],
metrics_output: Output[Metrics]
):
"""Train model and save artifacts"""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
# Load training data
train_df = pd.read_csv(train_data.path)
X_train = train_df.drop('target', axis=1)
y_train = train_df['target']
# Train model
model = RandomForestClassifier(
n_estimators=200,
max_depth=10,
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# Save model
joblib.dump(model, model_output.path)
# Calculate training metrics
train_predictions = model.predict(X_train)
train_accuracy = accuracy_score(y_train, train_predictions)
metrics_output.log_metric('train_accuracy', train_accuracy)
print(f"Training completed with accuracy: {train_accuracy:.4f}")
@component(
packages_to_install=['pandas', 'scikit-learn', 'joblib'],
base_image='python:3.9'
)
def evaluate_model(
test_data: Input[Dataset],
model_input: Input[Model],
metrics_output: Output[Metrics]
):
"""Evaluate model on test data"""
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import joblib
# Load test data and model
test_df = pd.read_csv(test_data.path)
X_test = test_df.drop('target', axis=1)
y_test = test_df['target']
model = joblib.load(model_input.path)
# Make predictions
predictions = model.predict(X_test)
# Calculate metrics
accuracy = accuracy_score(y_test, predictions)
precision = precision_score(y_test, predictions, average='weighted')
recall = recall_score(y_test, predictions, average='weighted')
f1 = f1_score(y_test, predictions, average='weighted')
# Log metrics
metrics_output.log_metric('test_accuracy', accuracy)
metrics_output.log_metric('precision', precision)
metrics_output.log_metric('recall', recall)
metrics_output.log_metric('f1_score', f1)
# Print results
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
@dsl.pipeline(
name='Complete ML Pipeline',
description='End-to-end machine learning pipeline with preprocessing, training, and evaluation'
)
def complete_ml_pipeline():
"""Complete ML workflow pipeline"""
# Load initial data
load_task = load_data()
# Preprocess data
preprocess_task = preprocess_data(
dataset_input=load_task.outputs['dataset_output']
)
# Train model
train_task = train_and_save_model(
train_data=preprocess_task.outputs['train_output']
)
# Evaluate model
evaluate_task = evaluate_model(
test_data=preprocess_task.outputs['test_output'],
model_input=train_task.outputs['model_output']
)
if __name__ == '__main__':
kfp.compiler.Compiler().compile(
pipeline_func=complete_ml_pipeline,
package_path='complete_pipeline.yaml'
)
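The evaluation component above passes `average='weighted'` to precision, recall, and F1. To make that concrete, here's a tiny pure-Python sketch of weighted recall (no sklearn needed): each class's recall is weighted by that class's share of the true labels:

```python
from collections import Counter

def weighted_recall(y_true, y_pred):
    """Per-class recall, averaged with weights = class frequency in y_true."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for cls, count in support.items():
        # True positives for this class
        hits = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        score += (count / total) * (hits / count)
    return score

# Class 0: recall 2/2; class 1: recall 1/2 -> (2/4)*1.0 + (2/4)*0.5 = 0.75
y_true = [0, 0, 1, 1]
y_pred = [0, 0, 1, 0]
print(weighted_recall(y_true, y_pred))  # 0.75
```

The same frequency-weighting idea applies to the weighted precision and F1 scores logged by the component.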
Monitoring and Tracking Experiments
Kubeflow integrates with various tracking tools. Here’s how to track experiments:
from kfp import Client

# Initialize client
client = Client(host='http://localhost:8080')

# List all experiments
experiments = client.list_experiments()
for exp in experiments.experiments or []:
    print(f"Experiment: {exp.display_name}, ID: {exp.experiment_id}")

# Get runs for an experiment
runs = client.list_runs(experiment_id='your-experiment-id')
for run in runs.runs or []:
    print(f"Run: {run.display_name}, Status: {run.state}")

# Get details for a specific run
run_details = client.get_run(run_id='your-run-id')
print(run_details.state)
Best Practices for Kubeflow
1. Resource Management
# Specify resource requests and limits for pipeline components
resources:
requests:
memory: "2Gi"
cpu: "500m"
limits:
memory: "4Gi"
cpu: "2"
2. Use Version Control
# Store pipeline definitions in Git
git init
git add simple_pipeline.py
git commit -m "Add ML pipeline definition"
3. Implement Error Handling
@component
def robust_component():
    try:
        # Your logic here
        pass
    except Exception as e:
        print(f"Error occurred: {e}")
        raise  # re-raise so Kubeflow marks the step as failed
4. Cache Pipeline Steps
@dsl.pipeline
def cached_pipeline():
    task = train_model()
    # KFP v2 caches successful steps by default; disable per task when inputs may change
    task.set_caching_options(enable_caching=False)
Deploying Models with KServe
Kubeflow includes KServe for model serving:
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: sklearn-iris
spec:
predictor:
sklearn:
storageUri: "gs://your-bucket/model"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1"
Deploy the model:
kubectl apply -f inference-service.yaml -n kubeflow
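Once the InferenceService is ready, it exposes KServe's v1 prediction protocol: `POST .../v1/models/<name>:predict` with an `{"instances": ...}` JSON body. A stdlib sketch that builds such a request; the host below is a placeholder you would replace with your cluster's ingress address:

```python
import json
import urllib.request

def build_predict_request(host, model_name, instances):
    """Build an HTTP request following KServe's v1 prediction protocol."""
    url = f"{host}/v1/models/{model_name}:predict"
    body = json.dumps({'instances': instances}).encode('utf-8')
    return urllib.request.Request(
        url, data=body, headers={'Content-Type': 'application/json'}
    )

# One iris sample: sepal/petal measurements in the training feature order
req = build_predict_request(
    'http://sklearn-iris.kubeflow.example.com',  # placeholder host
    'sklearn-iris',
    [[5.1, 3.5, 1.4, 0.2]],
)
print(req.full_url)
# Send with urllib.request.urlopen(req) -- requires a reachable cluster
```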
Troubleshooting Common Issues
Issue 1: Pods Not Starting
# Check pod status
kubectl get pods -n kubeflow
# View pod logs
kubectl logs <pod-name> -n kubeflow
# Describe pod for events
kubectl describe pod <pod-name> -n kubeflow
Issue 2: Pipeline Execution Failures
# Check pipeline run logs
kubectl logs -n kubeflow -l pipeline/runid=<run-id>
# View Argo workflow status
kubectl get workflow -n kubeflow
Next Steps
Now that you’ve learned the basics of Kubeflow, consider exploring:
- Katib: Automated hyperparameter tuning
- KServe (formerly KFServing): Advanced model serving capabilities
- Kubeflow Pipelines SDK: Advanced pipeline features
- Multi-user isolation: Setting up namespace-based isolation
- Integration with MLflow: Experiment tracking
- Custom training operators: For distributed training
Conclusion
Kubeflow provides a powerful platform for orchestrating machine learning workflows on Kubernetes. By following this guide, you’ve learned how to install Kubeflow, create pipelines, train models, and deploy them in production. As you become more familiar with the platform, you can leverage advanced features for building sophisticated ML systems that scale.
The key to success with Kubeflow is starting simple and gradually incorporating more components as your needs grow. Happy experimenting!