Collabnix Team The Collabnix Team is a diverse collective of Docker, Kubernetes, and IoT experts united by a passion for cloud-native technologies. With backgrounds spanning across DevOps, platform engineering, cloud architecture, and container orchestration, our contributors bring together decades of combined experience from various industries and technical domains.

Kubernetes Operators for ML: Complete CRD Implementation Guide


Machine Learning workloads present unique challenges in Kubernetes environments—from managing training jobs and model versioning to orchestrating complex pipelines. Kubernetes Operators, powered by Custom Resource Definitions (CRDs), provide the automation framework needed to manage ML workflows at scale. This comprehensive guide explores how to build, deploy, and manage Kubernetes Operators specifically designed for ML operations.

Understanding Kubernetes Operators and CRDs for ML Workloads

A Kubernetes Operator is an application-specific controller that extends the Kubernetes API to create, configure, and manage complex stateful applications. For ML workloads, Operators automate tasks like distributed training, hyperparameter tuning, model serving, and resource optimization.

Custom Resource Definitions (CRDs) are the foundation of Operators, allowing you to define custom resources that represent ML-specific concepts like training jobs, inference servers, or data pipelines. When combined with custom controllers, CRDs enable declarative management of your entire ML infrastructure.
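At its core, every controller runs the same reconciliation loop: observe the current state, diff it against the declared desired state, and act to close the gap. A minimal sketch in plain Python (the `reconcile` function and dict-based state are illustrative, not a real Kubernetes API):

```python
def reconcile(desired, observed):
    """Diff desired vs. observed resources and return corrective actions.

    Both arguments map resource names to their specs; the result is a list of
    (verb, name, spec) tuples that a controller would then execute against the API.
    """
    actions = []
    for name, spec in desired.items():
        if name not in observed:
            actions.append(("create", name, spec))
        elif observed[name] != spec:
            actions.append(("update", name, spec))
    for name in observed:
        if name not in desired:
            actions.append(("delete", name, None))
    return actions

# One pass of the loop: a training job is declared but does not exist yet,
# so the controller must create it.
actions = reconcile({"job-a": {"replicas": 4}}, {})
```

Real controllers add watches, requeues, and error handling on top, but the observe-diff-act shape stays the same.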

Why ML Workloads Need Custom Operators

  • Resource Management: ML training requires dynamic GPU allocation and scheduling
  • Distributed Training: Coordinate multiple worker nodes for frameworks like TensorFlow, PyTorch, or MXNet
  • Lifecycle Management: Handle checkpointing, recovery, and model versioning automatically
  • Cost Optimization: Implement spot instance handling and auto-scaling policies
  • Pipeline Orchestration: Manage dependencies between data preprocessing, training, and deployment stages

Creating Your First ML Training CRD

Let’s build a Custom Resource Definition for a PyTorch distributed training job. This CRD will define the structure and validation rules for our ML training resources.

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: mltrainingjobs.ml.collabnix.io
spec:
  group: ml.collabnix.io
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                framework:
                  type: string
                  enum: ["pytorch", "tensorflow", "mxnet"]
                image:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 100
                resources:
                  type: object
                  properties:
                    gpu:
                      type: integer
                    memory:
                      type: string
                    cpu:
                      type: string
                hyperparameters:
                  type: object
                  additionalProperties:
                    type: string
                checkpointPath:
                  type: string
              required:
                - framework
                - image
                - replicas
            status:
              type: object
              properties:
                phase:
                  type: string
                startTime:
                  type: string
                completionTime:
                  type: string
                workerStatuses:
                  type: array
                  items:
                    type: object
  scope: Namespaced
  names:
    plural: mltrainingjobs
    singular: mltrainingjob
    kind: MLTrainingJob
    shortNames:
      - mltj

Apply this CRD to your cluster:

kubectl apply -f mltrainingjob-crd.yaml
kubectl get crds | grep mltrainingjobs
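Once the CRD is registered, the API server enforces the openAPIV3Schema on every submitted resource: an unknown framework, a missing image, or replicas outside [1, 100] is rejected at admission time. A client-side mirror of those rules (a hedged sketch; `validate_spec` is illustrative and useful mainly in unit tests or a custom admission webhook):

```python
# Validation rules mirrored from the CRD's openAPIV3Schema above.
VALID_FRAMEWORKS = {"pytorch", "tensorflow", "mxnet"}

def validate_spec(spec):
    """Return a list of validation errors for an MLTrainingJob spec (empty if valid)."""
    errors = []
    if spec.get("framework") not in VALID_FRAMEWORKS:
        errors.append("framework must be one of: pytorch, tensorflow, mxnet")
    if not spec.get("image"):
        errors.append("image is required")
    replicas = spec.get("replicas")
    if not isinstance(replicas, int) or not 1 <= replicas <= 100:
        errors.append("replicas must be an integer between 1 and 100")
    return errors
```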

Implementing the Operator Controller Logic

With the CRD defined, we need a controller that watches for MLTrainingJob resources and reconciles the desired state. The Operator SDK scaffolds Go-based projects and is shown below for completeness; for the controller logic itself we'll use Kopf, a Python framework for building operators.

Setting Up the Operator SDK

# Install Operator SDK
curl -LO https://github.com/operator-framework/operator-sdk/releases/latest/download/operator-sdk_linux_amd64
chmod +x operator-sdk_linux_amd64
sudo mv operator-sdk_linux_amd64 /usr/local/bin/operator-sdk

# Initialize the operator project
operator-sdk init --domain=collabnix.io --repo=github.com/collabnix/ml-operator
operator-sdk create api --group=ml --version=v1alpha1 --kind=MLTrainingJob --resource --controller

Controller Implementation

Here’s a simplified Python controller using the Kopf framework, which provides a more Pythonic approach to building operators:

import kopf
import kubernetes
from kubernetes import client, config
import logging

# Load in-cluster config when running as a pod; fall back to kubeconfig for local development
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()

v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()

@kopf.on.create('ml.collabnix.io', 'v1alpha1', 'mltrainingjobs')
def create_training_job(spec, name, namespace, **kwargs):
    logging.info(f"Creating ML training job: {name}")
    
    framework = spec.get('framework')
    replicas = spec.get('replicas', 1)
    image = spec.get('image')
    gpu_count = spec.get('resources', {}).get('gpu', 0)
    
    # Create master pod (pass replicas through so WORLD_SIZE can be set)
    master_pod = create_master_pod(
        name=f"{name}-master",
        namespace=namespace,
        image=image,
        gpu_count=gpu_count,
        replicas=replicas,
        hyperparameters=spec.get('hyperparameters', {})
    )
    
    v1.create_namespaced_pod(namespace=namespace, body=master_pod)
    
    # Create worker pods for distributed training
    if replicas > 1:
        for i in range(replicas - 1):
            worker_pod = create_worker_pod(
                name=f"{name}-worker-{i}",
                namespace=namespace,
                image=image,
                gpu_count=gpu_count,
                master_address=f"{name}-master"
            )
            v1.create_namespaced_pod(namespace=namespace, body=worker_pod)
    
    # Create service for master
    service = create_master_service(name=name, namespace=namespace)
    v1.create_namespaced_service(namespace=namespace, body=service)
    
    return {'message': f'Training job {name} created successfully'}

def create_master_pod(name, namespace, image, gpu_count, replicas, hyperparameters):
    container = client.V1Container(
        name="training",
        image=image,
        env=[
            client.V1EnvVar(name="ROLE", value="master"),
            client.V1EnvVar(name="WORLD_SIZE", value=str(replicas))
        ] + [client.V1EnvVar(name=k, value=v) for k, v in hyperparameters.items()],
        resources=client.V1ResourceRequirements(
            limits={
                "nvidia.com/gpu": str(gpu_count)
            } if gpu_count > 0 else {}
        )
    )
    
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name, labels={"role": "master", "job": name}),
        spec=client.V1PodSpec(containers=[container], restart_policy="Never")
    )
    
    return pod

def create_worker_pod(name, namespace, image, gpu_count, master_address):
    container = client.V1Container(
        name="training",
        image=image,
        env=[
            client.V1EnvVar(name="ROLE", value="worker"),
            client.V1EnvVar(name="MASTER_ADDR", value=master_address),
            client.V1EnvVar(name="MASTER_PORT", value="29500")
        ],
        resources=client.V1ResourceRequirements(
            limits={"nvidia.com/gpu": str(gpu_count)} if gpu_count > 0 else {}
        )
    )
    
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name, labels={"role": "worker", "job": name}),
        spec=client.V1PodSpec(containers=[container], restart_policy="Never")
    )
    
    return pod

def create_master_service(name, namespace):
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=f"{name}-master"),
        spec=client.V1ServiceSpec(
            selector={"role": "master", "job": name},
            ports=[client.V1ServicePort(port=29500, target_port=29500)],
            cluster_ip="None"
        )
    )
    return service

@kopf.on.delete('ml.collabnix.io', 'v1alpha1', 'mltrainingjobs')
def delete_training_job(spec, name, namespace, **kwargs):
    logging.info(f"Deleting ML training job: {name}")
    
    # Cleanup pods and services
    v1.delete_collection_namespaced_pod(
        namespace=namespace,
        label_selector=f"job={name}"
    )
    
    try:
        v1.delete_namespaced_service(
            name=f"{name}-master",
            namespace=namespace
        )
    except kubernetes.client.exceptions.ApiException:
        pass
    
    return {'message': f'Training job {name} deleted'}

Deploying the ML Operator

Package your operator as a container and deploy it to your Kubernetes cluster:

FROM python:3.9-slim

WORKDIR /app

RUN pip install kopf kubernetes

COPY controller.py /app/

CMD ["kopf", "run", "/app/controller.py", "--verbose"]

Build and push the operator image:

docker build -t your-registry/ml-operator:v1.0 .
docker push your-registry/ml-operator:v1.0

Create the operator deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-operator
  namespace: ml-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: ml-operator
  template:
    metadata:
      labels:
        app: ml-operator
    spec:
      serviceAccountName: ml-operator
      containers:
      - name: operator
        image: your-registry/ml-operator:v1.0
        resources:
          limits:
            cpu: 500m
            memory: 512Mi
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: ml-operator
  namespace: ml-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: ml-operator-role
rules:
- apiGroups: ["ml.collabnix.io"]
  resources: ["mltrainingjobs", "mltrainingjobs/status"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["pods", "services"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["events"]
  verbs: ["create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: ml-operator-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: ml-operator-role
subjects:
- kind: ServiceAccount
  name: ml-operator
  namespace: ml-system
Create the namespace and apply the manifests:

kubectl create namespace ml-system
kubectl apply -f operator-deployment.yaml

Creating ML Training Jobs with Your Operator

Now you can create ML training jobs declaratively using your custom resource:

apiVersion: ml.collabnix.io/v1alpha1
kind: MLTrainingJob
metadata:
  name: pytorch-distributed-training
  namespace: ml-workloads
spec:
  framework: pytorch
  image: pytorch/pytorch:2.0.0-cuda11.8-cudnn8-runtime
  replicas: 4
  resources:
    gpu: 1
    memory: "16Gi"
    cpu: "4"
  hyperparameters:
    BATCH_SIZE: "32"
    LEARNING_RATE: "0.001"
    EPOCHS: "100"
  checkpointPath: "s3://ml-checkpoints/pytorch-training/"
Apply the resource and inspect it:

kubectl apply -f training-job.yaml
kubectl get mltrainingjobs -n ml-workloads
kubectl describe mltrainingjob pytorch-distributed-training -n ml-workloads

Monitoring and Observability

Implement status updates in your controller to track training progress:

@kopf.on.field('ml.collabnix.io', 'v1alpha1', 'mltrainingjobs', field='status.phase')
def monitor_training_status(old, new, name, namespace, **kwargs):
    if new == 'Running':
        logging.info(f"Training job {name} is now running")
    elif new == 'Completed':
        logging.info(f"Training job {name} completed successfully")
    elif new == 'Failed':
        logging.error(f"Training job {name} failed")
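The `status.phase` values this handler reacts to must be written by the controller itself, since the API server never populates custom status fields. A small pure helper (illustrative, not part of Kopf) could derive the job-level phase from its pods' phases before the controller patches the resource status:

```python
def aggregate_phase(pod_phases):
    """Collapse individual Kubernetes pod phases into one job-level phase.

    Any failed pod fails the job; the job completes only when every pod
    has succeeded; otherwise it is running or still pending.
    """
    if any(p == "Failed" for p in pod_phases):
        return "Failed"
    if pod_phases and all(p == "Succeeded" for p in pod_phases):
        return "Completed"
    if any(p == "Running" for p in pod_phases):
        return "Running"
    return "Pending"
```

A Kopf timer handler could list the job's pods by the `job=<name>` label, call this helper, and set `patch.status['phase']` with the result.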

Check the status of your training jobs:

# Watch training job status
kubectl get mltrainingjobs -w

# Get detailed status
kubectl get mltrainingjob pytorch-distributed-training -o jsonpath='{.status.phase}'

# View logs from master pod
kubectl logs -l role=master,job=pytorch-distributed-training -f

Best Practices for ML Operators

Resource Management

  • Implement resource quotas: Prevent training jobs from consuming all cluster resources
  • Use pod priorities: Ensure critical inference workloads aren’t starved by training jobs
  • Enable GPU time-slicing: Maximize GPU utilization for smaller models
  • Implement auto-scaling: Scale worker nodes based on pending training jobs
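A quota check in the controller can keep a burst of submissions from oversubscribing GPUs. A minimal sketch (greedy, first-come-first-served; `admit_jobs` and its tuple format are illustrative, not a Kubernetes API):

```python
def admit_jobs(pending, quota_gpus):
    """Greedily admit pending training jobs under a namespace-wide GPU quota.

    pending: list of (job_name, gpus_per_replica, replicas) tuples.
    Returns the names of jobs that fit; the rest stay queued.
    """
    admitted, used = [], 0
    for name, gpus_per_replica, replicas in pending:
        needed = gpus_per_replica * replicas
        if used + needed <= quota_gpus:
            admitted.append(name)
            used += needed
    return admitted
```

A production operator would instead read a ResourceQuota object and respect pod priorities, but the accounting is the same.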

Fault Tolerance

  • Checkpoint regularly: Save model state to persistent storage every N iterations
  • Implement retry logic: Automatically restart failed training jobs with exponential backoff
  • Handle spot instance interruptions: Gracefully save state before node termination
  • Use PodDisruptionBudgets: Ensure minimum replicas remain available during updates
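The retry-with-exponential-backoff pattern can be sketched as a small delay calculator the controller consults before recreating a failed job (names and defaults here are illustrative):

```python
import random

def backoff_delay(attempt, base=10.0, cap=600.0, jitter=0.1, rng=random):
    """Seconds to wait before retry number `attempt` (0-based).

    Doubles the base delay per attempt, caps it, and adds +/-10% jitter so
    many failed jobs don't all restart at the same instant.
    """
    delay = min(cap, base * (2 ** attempt))
    return delay * (1 + rng.uniform(-jitter, jitter))
```

The cap matters: without it, a job that fails overnight would compute a delay of hours, and the jitter prevents thundering-herd restarts after a node outage.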

Security Considerations

  • Use NetworkPolicies: Restrict communication between training pods
  • Implement RBAC: Limit who can create and manage training jobs
  • Scan images: Ensure training images are free from vulnerabilities
  • Use secrets management: Store cloud credentials and API keys securely
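For the last point, the controller should inject credentials via Secret references rather than literal env values, so keys never appear in pod specs or logs. In dict (manifest) form, an env entry sourced from a Secret looks like this (the secret and key names are illustrative):

```python
def secret_env(var_name, secret_name, key):
    """Build an env var entry that pulls its value from a Kubernetes Secret."""
    return {
        "name": var_name,
        "valueFrom": {"secretKeyRef": {"name": secret_name, "key": key}},
    }

# e.g. S3 credentials for the checkpointPath, assuming a Secret named
# "s3-credentials" exists in the job's namespace:
env = [secret_env("AWS_SECRET_ACCESS_KEY", "s3-credentials", "secretKey")]
```

The equivalent in the typed client is `client.V1EnvVar` with a `value_from` of `V1EnvVarSource(secret_key_ref=...)`.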

Troubleshooting Common Issues

Operator Not Reconciling Resources

# Check operator logs
kubectl logs -n ml-system deployment/ml-operator

# Verify CRD is registered
kubectl get crd mltrainingjobs.ml.collabnix.io

# Check RBAC permissions
kubectl auth can-i create pods --as=system:serviceaccount:ml-system:ml-operator

Training Jobs Stuck in Pending

# Check pod events
kubectl describe pod -l job=pytorch-distributed-training

# Verify GPU resources
kubectl describe nodes | grep -A 5 "Allocated resources"

# Check resource quotas
kubectl get resourcequota -n ml-workloads

Worker Pods Cannot Connect to Master

# Verify service exists
kubectl get svc -l job=pytorch-distributed-training

# Test DNS resolution from worker pod
kubectl exec -it pytorch-distributed-training-worker-0 -- nslookup pytorch-distributed-training-master

# Check network policies
kubectl get networkpolicies -n ml-workloads

Advanced Features: Auto-Tuning and Optimization

Extend your operator with intelligent features like hyperparameter optimization. Note that the autoTuning fields below must also be added to the CRD's openAPIV3Schema, or the API server will prune them on admission:

apiVersion: ml.collabnix.io/v1alpha1
kind: MLTrainingJob
metadata:
  name: autotuned-training
spec:
  framework: pytorch
  image: pytorch/pytorch:2.0.0-cuda11.8-cudnn8-runtime
  replicas: 4
  autoTuning:
    enabled: true
    algorithm: "bayesian"
    maxTrials: 20
    objective: "val_accuracy"
    parameters:
      learningRate:
        type: float
        min: 0.0001
        max: 0.1
      batchSize:
        type: int
        values: [16, 32, 64, 128]
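The controller behind such a resource would turn the declared search space into concrete trials, each launched as its own training job. A simplified random-search sketch over the space above (a real Bayesian optimizer would pick points adaptively; `sample_trial` is illustrative):

```python
import random

def sample_trial(space, rng):
    """Draw one hyperparameter configuration from a declared search space."""
    trial = {}
    for name, param in space.items():
        if param["type"] == "float":
            trial[name] = rng.uniform(param["min"], param["max"])
        elif param["type"] == "int":
            trial[name] = rng.choice(param["values"])
    return trial

# Search space mirroring the autoTuning parameters in the manifest above.
space = {
    "learningRate": {"type": "float", "min": 0.0001, "max": 0.1},
    "batchSize": {"type": "int", "values": [16, 32, 64, 128]},
}
rng = random.Random(42)  # seeded for reproducible trial generation
trials = [sample_trial(space, rng) for _ in range(20)]  # maxTrials: 20
```

Each trial dict would become the `hyperparameters` block of a child MLTrainingJob, with the `objective` metric reported back through the status subresource.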

Conclusion

Kubernetes Operators and Custom Resource Definitions provide a powerful framework for managing ML workloads at scale. By implementing custom controllers, you can automate complex ML operations, enforce best practices, and provide a seamless experience for data scientists and ML engineers.

The operator pattern enables you to codify operational knowledge, handle distributed training complexities, and integrate with existing Kubernetes tooling. As your ML infrastructure grows, operators become essential for maintaining consistency, reliability, and efficiency across your ML platform.

Start with simple use cases like job scheduling, then gradually add features like auto-scaling, hyperparameter tuning, and model serving integration. The investment in building robust ML operators pays dividends in operational efficiency and developer productivity.

Have Queries? Join https://launchpass.com/collabnix
