Distributed training has become essential for training large-scale machine learning models efficiently. Kubernetes provides an ideal platform for orchestrating distributed training workloads, offering scalability, resource management, and fault tolerance. This comprehensive guide explores best practices for implementing distributed training on Kubernetes, with practical examples and production-ready configurations.
Understanding Distributed Training Architecture on Kubernetes
Distributed training splits the computational workload across multiple nodes, dramatically reducing training time for large models. Kubernetes orchestrates these distributed workloads through specialized operators and custom resources that manage the lifecycle of training jobs.
The most common distributed training patterns include:
- Data Parallelism: Each worker trains on different data subsets with model replicas
- Model Parallelism: Different model portions are distributed across workers
- Hybrid Parallelism: Combines both approaches for extremely large models
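The data-parallel case rests on a simple identity: a full-batch gradient is a mean over examples, so averaging equal-sized per-worker shard gradients (which is what an all-reduce does) reproduces the single-worker gradient exactly. A framework-free sketch for a one-parameter least-squares model (the toy data and function names are illustrative):

```python
# Toy one-parameter least-squares model; the data and function names are
# illustrative, not part of any framework API.

def grad_mse(w, shard):
    """Mean gradient of (w*x - y)^2 with respect to w over one data shard."""
    return sum(2 * x * (w * x - y) for x, y in shard) / len(shard)

def data_parallel_grad(w, shards):
    """Each 'worker' computes its shard gradient; averaging the results is
    exactly what the all-reduce step does in data-parallel training."""
    return sum(grad_mse(w, s) for s in shards) / len(shards)

data = [(1.0, 2.0), (2.0, 3.0), (3.0, 5.0), (4.0, 9.0)]
shards = [data[:2], data[2:]]  # two equal-sized "workers"
```

With equal shard sizes the averaged result matches the full-batch gradient to floating-point precision, which is why DDP converges like single-GPU training on the combined batch.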
Setting Up Kubeflow Training Operator
The Kubeflow Training Operator is the de facto standard for running distributed training jobs on Kubernetes. It supports PyTorch, TensorFlow, MXNet, XGBoost, and other popular frameworks.
Installing the Training Operator
```shell
# Install the standalone Training Operator (v1.7.0)
kubectl apply -k "github.com/kubeflow/training-operator/manifests/overlays/standalone?ref=v1.7.0"

# Verify installation
kubectl get pods -n kubeflow
kubectl get crd | grep kubeflow.org
```
This installation creates Custom Resource Definitions (CRDs) for PyTorchJob, TFJob, MXJob, and other training frameworks.
Implementing PyTorch Distributed Training
PyTorch’s DistributedDataParallel (DDP) is the standard approach for data-parallel training. Here’s a representative training script you can adapt for production.
PyTorch Training Script
```python
import os

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler


def setup_distributed():
    """Initialize the distributed training environment."""
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
    )
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))


def cleanup():
    dist.destroy_process_group()


def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = nn.functional.cross_entropy(output, target)
        loss.backward()
        optimizer.step()
        # Log from rank 0 only to avoid duplicate output
        if batch_idx % 100 == 0 and dist.get_rank() == 0:
            print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')


def main():
    setup_distributed()

    # Create the model, move it to this rank's GPU, and wrap it in DDP
    model = YourModel().cuda()  # replace YourModel with your architecture
    model = DDP(model, device_ids=[int(os.environ['LOCAL_RANK'])])

    # Shard the dataset across workers with a distributed sampler
    train_dataset = YourDataset()  # replace YourDataset with your dataset
    train_sampler = DistributedSampler(train_dataset)
    train_loader = DataLoader(
        train_dataset,
        batch_size=32,
        sampler=train_sampler,
        num_workers=4,
        pin_memory=True,
    )

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(10):
        # Reshuffle shard assignments each epoch
        train_sampler.set_epoch(epoch)
        train(model, train_loader, optimizer, epoch)

    cleanup()


if __name__ == '__main__':
    main()
```
PyTorchJob YAML Configuration
```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-distributed-training
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: pytorch
              image: your-registry/pytorch-training:latest
              imagePullPolicy: Always
              command:
                - python
                - /app/train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
                  memory: 16Gi
                  cpu: 4
                requests:
                  nvidia.com/gpu: 1
                  memory: 16Gi
                  cpu: 4
              env:
                - name: NCCL_DEBUG
                  value: "INFO"
                - name: NCCL_SOCKET_IFNAME
                  value: "eth0"
              volumeMounts:
                - name: training-data
                  mountPath: /data
                - name: model-output
                  mountPath: /output
          volumes:
            - name: training-data
              persistentVolumeClaim:
                claimName: training-data-pvc
            - name: model-output
              persistentVolumeClaim:
                claimName: model-output-pvc
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: pytorch
              image: your-registry/pytorch-training:latest
              imagePullPolicy: Always
              command:
                - python
                - /app/train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
                  memory: 16Gi
                  cpu: 4
                requests:
                  nvidia.com/gpu: 1
                  memory: 16Gi
                  cpu: 4
              env:
                - name: NCCL_DEBUG
                  value: "INFO"
                - name: NCCL_SOCKET_IFNAME
                  value: "eth0"
              volumeMounts:
                - name: training-data
                  mountPath: /data
                - name: model-output
                  mountPath: /output
          volumes:
            - name: training-data
              persistentVolumeClaim:
                claimName: training-data-pvc
            - name: model-output
              persistentVolumeClaim:
                claimName: model-output-pvc
```
Deploying the Training Job
```shell
# Apply the PyTorchJob
kubectl apply -f pytorch-job.yaml

# Monitor job status
kubectl get pytorchjob -n kubeflow
kubectl describe pytorchjob pytorch-distributed-training -n kubeflow

# Check pod logs
kubectl logs -n kubeflow pytorch-distributed-training-master-0
kubectl logs -n kubeflow pytorch-distributed-training-worker-0
```
TensorFlow Distributed Training with TFJob
For TensorFlow users, the TFJob custom resource provides native support for TensorFlow’s distribution strategies.
```yaml
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: tensorflow-distributed-training
  namespace: kubeflow
spec:
  tfReplicaSpecs:
    Chief:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: your-registry/tensorflow-training:latest
              command:
                - python
                - /app/train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
                  memory: 16Gi
                requests:
                  nvidia.com/gpu: 1
                  memory: 16Gi
              # Note: TF_CONFIG is injected automatically by the Training
              # Operator; do not set it manually or the injected value will
              # be overridden.
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: your-registry/tensorflow-training:latest
              command:
                - python
                - /app/train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
                  memory: 16Gi
                requests:
                  nvidia.com/gpu: 1
                  memory: 16Gi
    PS:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: tensorflow
              image: your-registry/tensorflow-training:latest
              command:
                - python
                - /app/train.py
              resources:
                limits:
                  memory: 8Gi
                  cpu: 4
                requests:
                  memory: 8Gi
                  cpu: 4
```
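For TFJob replicas, the Training Operator injects a `TF_CONFIG` environment variable describing the cluster, which TensorFlow's distribution strategies read to discover peers and determine each process's role. A minimal sketch of inspecting it (the sample value below only illustrates the shape; the hostnames are made up):

```python
import json
import os

def parse_tf_config(raw=None):
    """Return (task_type, task_index, cluster) from a TF_CONFIG JSON string."""
    raw = raw if raw is not None else os.environ.get("TF_CONFIG", "{}")
    cfg = json.loads(raw)
    task = cfg.get("task", {})
    return task.get("type"), task.get("index"), cfg.get("cluster", {})

# Illustrative example of the shape the operator injects (hostnames made up):
sample = json.dumps({
    "cluster": {
        "chief": ["job-chief-0:2222"],
        "worker": ["job-worker-0:2222", "job-worker-1:2222"],
    },
    "task": {"type": "worker", "index": 1},
})
```

This kind of check is useful for role-specific logic, e.g. only the chief writing final artifacts.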
Best Practices for Production Deployments
1. Resource Management and GPU Scheduling
Proper resource allocation is critical for distributed training efficiency. Use node affinity and taints/tolerations to ensure GPU pods are scheduled correctly.
```yaml
spec:
  template:
    spec:
      nodeSelector:
        accelerator: nvidia-tesla-v100
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: training.kubeflow.org/job-name
                      operator: In
                      values:
                        - pytorch-distributed-training
                topologyKey: kubernetes.io/hostname
```
2. Network Optimization for NCCL
NVIDIA Collective Communications Library (NCCL) performance is crucial for multi-GPU training. Configure network settings appropriately:
```yaml
env:
  - name: NCCL_DEBUG
    value: "INFO"
  - name: NCCL_SOCKET_IFNAME      # network interface NCCL should use
    value: "eth0"
  - name: NCCL_IB_DISABLE         # 0 = keep InfiniBand enabled if present
    value: "0"
  - name: NCCL_NET_GDR_LEVEL      # allow GPUDirect RDMA between GPU and NIC
    value: "5"
  - name: NCCL_P2P_DISABLE        # 0 = keep peer-to-peer GPU transfers enabled
    value: "0"
```
3. Persistent Storage Configuration
Use high-performance storage solutions for training data and model checkpoints:
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: training-data-pvc
  namespace: kubeflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: fast-ssd
  resources:
    requests:
      storage: 500Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: model-output-pvc
  namespace: kubeflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: fast-ssd
  resources:
    requests:
      storage: 100Gi
```
4. Monitoring and Observability
Implement comprehensive monitoring for distributed training jobs:
```shell
# Install Prometheus and Grafana
kubectl create namespace monitoring
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack -n monitoring

# Create a ServiceMonitor for training jobs
kubectl apply -f - <<EOF
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: training-jobs-monitor
  namespace: kubeflow
spec:
  selector:
    matchLabels:
      training.kubeflow.org/job-role: master
  endpoints:
    - port: metrics
      interval: 30s
EOF
```
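For the ServiceMonitor above to scrape anything, the training pods must expose a `metrics` port serving Prometheus text format. In practice you would typically use the `prometheus_client` library; a dependency-free sketch of the idea, with hypothetical `training_loss` and `training_epoch` metrics:

```python
# Dependency-free sketch of a Prometheus-style /metrics endpoint.
# `training_loss` and `training_epoch` are hypothetical metric names;
# real jobs would usually use the prometheus_client library instead.
import http.server
import threading

METRICS = {"training_loss": 0.0, "training_epoch": 0}

class MetricsHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/metrics":
            self.send_error(404)
            return
        # Prometheus text exposition format: one "name value" line per metric
        body = "".join(f"{k} {v}\n" for k, v in METRICS.items()).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep scrape requests out of the training logs

def start_metrics_server(port=8080):
    """Serve /metrics in a background thread; returns the server object."""
    server = http.server.HTTPServer(("", port), MetricsHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The training loop updates `METRICS` as it runs, and the container exposes the chosen port under the name `metrics` so the ServiceMonitor's endpoint selector finds it.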
5. Fault Tolerance and Checkpointing
Implement robust checkpointing to handle node failures:
```python
import os

import torch
import torch.distributed as dist


def save_checkpoint(model, optimizer, epoch, loss, checkpoint_dir):
    # Only rank 0 writes checkpoints to avoid concurrent writes
    if dist.get_rank() == 0:
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.module.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
        }
        checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch}.pt')
        torch.save(checkpoint, checkpoint_path)
        print(f'Checkpoint saved: {checkpoint_path}')


def load_checkpoint(model, optimizer, checkpoint_path):
    # map_location avoids loading every tensor onto a single GPU
    checkpoint = torch.load(checkpoint_path, map_location='cpu')
    model.module.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return checkpoint['epoch'], checkpoint['loss']
```
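On restart, the entry point can scan the shared output volume for the newest checkpoint and resume from it. A small helper, assuming the `checkpoint_epoch_N.pt` naming used above:

```python
import os
import re

def find_latest_checkpoint(checkpoint_dir):
    """Return (path, epoch) of the highest-epoch checkpoint, or (None, -1)."""
    pattern = re.compile(r"checkpoint_epoch_(\d+)\.pt$")
    best_path, best_epoch = None, -1
    for name in os.listdir(checkpoint_dir):
        m = pattern.match(name)
        if m and int(m.group(1)) > best_epoch:
            best_epoch = int(m.group(1))
            best_path = os.path.join(checkpoint_dir, name)
    return best_path, best_epoch
```

If a checkpoint is found, every rank loads it before training resumes, so all model replicas start from the same state.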
Troubleshooting Common Issues
NCCL Timeout Errors
If you encounter NCCL timeout errors, raise the collective timeout and verify network connectivity. The timeout itself is set in the training script via the `timeout` argument of `dist.init_process_group` (e.g. `timeout=datetime.timedelta(hours=2)`); PyTorch's `NCCL_BLOCKING_WAIT` environment variable makes stuck collectives fail with an error instead of hanging silently:

```shell
# Surface timeouts as errors instead of silent hangs (PyTorch env var)
NCCL_BLOCKING_WAIT=1

# Test network connectivity between pods
kubectl exec -it pytorch-distributed-training-worker-0 -n kubeflow -- \
  ping pytorch-distributed-training-master-0
```
Out of Memory (OOM) Errors
Optimize memory usage with gradient accumulation and mixed precision training:
```python
from torch.cuda.amp import GradScaler, autocast

scaler = GradScaler()
accumulation_steps = 4

for batch_idx, (data, target) in enumerate(train_loader):
    with autocast():
        output = model(data)
        # Scale the loss so accumulated gradients match a full-size batch
        loss = criterion(output, target) / accumulation_steps
    scaler.scale(loss).backward()
    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
```
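Dividing each micro-batch loss by `accumulation_steps` is what makes accumulation equivalent to one large batch: the summed gradients equal the full-batch mean gradient, and the effective batch size becomes `batch_size × accumulation_steps × world_size`. A framework-free check of that equivalence (toy data and a one-parameter model, for illustration only):

```python
# Toy one-parameter least-squares model; names and data are illustrative.

def full_batch_grad(w, batch):
    """Mean gradient of (w*x - y)^2 over a batch."""
    return sum(2 * x * (w * x - y) for x, y in batch) / len(batch)

def accumulated_grad(w, micro_batches):
    """Sum of per-micro-batch gradients, each pre-scaled by 1/num_steps,
    mirroring the `loss / accumulation_steps` scaling in the training loop."""
    steps = len(micro_batches)
    return sum(full_batch_grad(w, mb) / steps for mb in micro_batches)

data = [(1.0, 1.0), (2.0, 0.0), (3.0, 4.0), (4.0, 2.0)]
micro_batches = [data[:2], data[2:]]  # two equal-sized micro-batches
```

With equal-sized micro-batches the two gradients agree to floating-point precision, so accumulation trades memory for extra forward/backward passes without changing the optimization trajectory.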
Pod Scheduling Issues
```shell
# Check GPU availability
kubectl describe nodes | grep -A 5 "Allocated resources"

# Verify the GPU device plugin is running
kubectl get daemonset -n kube-system | grep nvidia

# Check node labels
kubectl get nodes --show-labels | grep gpu
```
Performance Optimization Strategies
Data Loading Optimization
Efficient data loading is crucial for maximizing GPU utilization:
```python
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    sampler=train_sampler,
    num_workers=8,            # increase for better I/O throughput
    pin_memory=True,          # faster host-to-GPU transfer
    persistent_workers=True,  # reuse workers across epochs
    prefetch_factor=2,        # batches prefetched per worker
)
```
Gradient Compression
For bandwidth-limited environments, gradient compression reduces the volume of all-reduce traffic. With Horovod, compression is enabled in the training script by passing `hvd.Compression.fp16` to `hvd.DistributedOptimizer`; the job is then launched normally:

```shell
# Install Horovod with PyTorch support
pip install horovod[pytorch]

# Launch four training processes (compression is configured inside train.py)
horovodrun -np 4 python train.py
```
Conclusion
Distributed training on Kubernetes offers powerful scalability for machine learning workloads. By following these best practices—proper resource allocation, network optimization, robust checkpointing, and comprehensive monitoring—you can build production-ready distributed training pipelines that efficiently utilize your cluster resources.
Key takeaways include using the Kubeflow Training Operator for standardized job management, implementing proper fault-tolerance mechanisms, optimizing network communication with NCCL, and maintaining observability through monitoring. As your models and datasets grow, these practices will help your training infrastructure scale while maintaining reliability and performance.
Start small with a single training job, validate your configuration, and gradually scale to multi-node distributed training as you refine your setup. The investment in proper infrastructure pays dividends in reduced training time and improved model iteration velocity.