Collabnix Team

The Collabnix Team is a diverse collective of Docker, Kubernetes, and IoT experts united by a passion for cloud-native technologies. With backgrounds spanning DevOps, platform engineering, cloud architecture, and container orchestration, our contributors bring decades of combined experience from a wide range of industries and technical domains.

NVIDIA Jetson AGX Thor: Top 5 Use Cases with Tutorials


The NVIDIA Jetson Thor represents a paradigm shift in edge computing for physical AI. Powered by a Blackwell-architecture GPU with 2,560 CUDA cores and paired with 128GB of LPDDR5X memory, Thor delivers an astounding 2,070 FP4 TFLOPS of AI compute: 7.5x the AI performance and 3.5x the energy efficiency of the Jetson AGX Orin.
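As a quick sanity check on that headline number, the 7.5x figure follows from dividing Thor's FP4 rating by Jetson AGX Orin's commonly quoted 275 sparse INT8 TOPS (an assumed baseline; note the two figures use different precisions, so this is vendor math rather than an apples-to-apples benchmark):

```python
# Rough check of the quoted speedup, using Orin's widely published
# 275 TOPS (sparse INT8) figure as the comparison point.
thor_tflops = 2070   # Jetson Thor, FP4
orin_tops = 275      # Jetson AGX Orin, sparse INT8 (assumed baseline)

speedup = thor_tflops / orin_tops
print(f"{speedup:.1f}x")  # ~7.5x, matching NVIDIA's claim
```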

Key Specifications

| Feature | Jetson Thor T5000 |
| --- | --- |
| GPU | Blackwell architecture |
| AI Performance | 2,070 FP4 TFLOPS |
| Memory | 128 GB LPDDR5X |
| CPU | 14-core Arm Neoverse V3AE |
| Power | Configurable 40W-130W |
| Video Encode | 6x 4Kp60 (H.265/H.264) |
| Video Decode | 4x 8Kp30 or 10x 4Kp60 |
| Cameras | Up to 20 cameras |
| Networking | 4x 25GbE |
| Price | Developer Kit: $3,499 |

Let’s look at the top 5 use cases where you can leverage NVIDIA Jetson AGX Thor:

Use Case 1: Humanoid Robot Control with Isaac GR00T N1.6

Why Thor for Humanoid Robotics?

Humanoid robots require real-time multi-modal perception, reasoning, and control—exactly what Thor was designed for. Companies like Boston Dynamics, Figure AI, and Agility Robotics are already using Thor for next-generation humanoids.

Architecture Overview

┌─────────────────────────────────────────┐
│   Vision-Language-Action (VLA) Model   │
│         Isaac GR00T N1.6                │
├─────────────────────────────────────────┤
│  Multi-Sensor Input Processing          │
│  • Stereo Cameras (Perception)          │
│  • LiDAR (Spatial Mapping)              │
│  • IMU (Balance & Orientation)          │
│  • Force/Torque Sensors                 │
├─────────────────────────────────────────┤
│    Jetson Thor Hardware                 │
│  • Blackwell GPU (2070 TFLOPS)          │
│  • 14-core CPU                          │
│  • PVA (Programmable Vision Accel)      │
│  • 128GB Memory for Large Models        │
└─────────────────────────────────────────┘

Step-by-Step Setup Tutorial

1. Prerequisites

# Hardware Requirements
- NVIDIA Jetson AGX Thor Developer Kit
- microSD card (64GB+) or NVMe SSD (500GB+ recommended)
- Power supply (130W)
- Internet connection

# Software Requirements
- JetPack 7.0 or later
- CUDA 13.0
- Docker with NVIDIA Container Runtime

2. Install JetPack and Flash Thor

# Download JetPack 7.0 from NVIDIA Developer Portal
# Flash using SDK Manager or Jetson USB method

# Verify installation
cat /etc/nv_tegra_release
# Expected output: R38 (release), REVISION: 2.1

# Check CUDA version
nvcc --version
# Should show CUDA 13.0

3. Setup Isaac ROS Environment

# Create workspace
mkdir -p /mnt/nova_ssd/workspaces/isaac_ros-dev/src
export ISAAC_ROS_WS="/mnt/nova_ssd/workspaces/isaac_ros-dev/"

# Add to bashrc for persistence
echo 'export ISAAC_ROS_WS="${ISAAC_ROS_WS:-/mnt/nova_ssd/workspaces/isaac_ros-dev/}"' >> ~/.bashrc
source ~/.bashrc

# Configure Isaac ROS Apt Repository
sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

# Install Isaac ROS CLI
sudo apt-get install -y isaac-ros-cli

4. Install NVIDIA Container Toolkit

# Install prerequisites
sudo apt-get update
sudo apt-get install -y ca-certificates curl gnupg

# Add NVIDIA Container Toolkit keyring
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | \
  sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg --yes

# Add repository
echo "deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://nvidia.github.io/libnvidia-container/stable/deb/$(dpkg --print-architecture) /" | \
  sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list

# Install toolkit
sudo apt-get update
sudo apt-get install -y nvidia-container-toolkit

# Configure Docker daemon
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

# Test Docker GPU access
docker run --rm --runtime=nvidia --gpus all nvidia/cuda:12.4.0-base-ubuntu22.04 nvidia-smi

5. Clone Isaac GR00T Repository

cd $ISAAC_ROS_WS/src
git clone https://github.com/NVIDIA/Isaac-GR00T.git
cd Isaac-GR00T

# Install dependencies
pip install uv
uv pip install -e .

6. Download Pre-trained GR00T Model

# Download GR00T N1.6-3B base model (for GR1 humanoid platform)
huggingface-cli download nvidia/GR00T-N1.6-3B --local-dir ./models/gr00t-n1.6-3b

# Or for larger 7B model (requires more VRAM)
huggingface-cli download nvidia/GR00T-N1.6-7B --local-dir ./models/gr00t-n1.6-7b
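For planning disk and memory budgets, the raw checkpoint sizes are easy to estimate from the parameter counts in the model names (a rough sketch assuming dense FP16 weights at 2 bytes per parameter, excluding tokenizer and optimizer files):

```python
def checkpoint_gb(params_billion: float, bytes_per_param: float = 2.0) -> float:
    """Approximate on-disk/in-memory size of a dense FP16 checkpoint."""
    return params_billion * 1e9 * bytes_per_param / 1e9

print(checkpoint_gb(3))   # 3B at FP16 -> ~6.0 GB
print(checkpoint_gb(7))   # 7B at FP16 -> ~14.0 GB
```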

7. Run GR00T Policy Server

# gr00t/eval/run_gr00t_server.py
import torch
from gr00t.policy import Gr00tPolicy
from flask import Flask, request, jsonify

app = Flask(__name__)

# Load GR00T model
device = "cuda" if torch.cuda.is_available() else "cpu"
policy = Gr00tPolicy.from_pretrained(
    "nvidia/GR00T-N1.6-3B",
    device=device,
    dtype=torch.float16  # Use FP16 for efficiency
)

@app.route('/predict', methods=['POST'])
def predict_action():
    """
    Input: {
        'images': [...],  # RGB images from cameras
        'proprio': [...], # Joint positions, velocities
        'instruction': "pick up the red cube"
    }
    Output: {
        'actions': [...]  # Joint commands for next 16 timesteps
    }
    """
    data = request.json
    
    # Prepare inputs
    images = torch.tensor(data['images']).to(device)
    proprio = torch.tensor(data['proprio']).to(device)
    instruction = data['instruction']
    
    # Run inference
    with torch.no_grad():
        actions = policy.predict(
            images=images,
            proprio=proprio,
            instruction=instruction,
            chunk_size=16  # Predict 16 timesteps ahead
        )
    
    return jsonify({'actions': actions.cpu().tolist()})

if __name__ == '__main__':
    print(f"GR00T Policy Server running on {device}")
    print(f"Model: GR00T N1.6-3B")
    app.run(host='0.0.0.0', port=5000)

8. Run the Policy Server

# Start the server
cd $ISAAC_ROS_WS/src/Isaac-GR00T
uv run python gr00t/eval/run_gr00t_server.py --embodiment-tag GR1 --model-path nvidia/GR00T-N1.6-3B

# Expected Output:
# Loading checkpoint from nvidia/GR00T-N1.6-3B
# Model loaded on cuda:0
# GR00T Policy Server running on cuda
# * Running on http://0.0.0.0:5000

9. Client Code to Control Robot

# robot_control_client.py
import time

import requests
import numpy as np
import cv2

class HumanoidController:
    def __init__(self, server_url="http://localhost:5000"):
        self.server_url = server_url
        
    def get_action(self, camera_images, joint_states, instruction):
        """
        Send sensor data to GR00T and receive actions
        """
        # Prepare payload
        payload = {
            'images': [img.tolist() for img in camera_images],
            'proprio': joint_states.tolist(),
            'instruction': instruction
        }
        
        # Call policy server
        response = requests.post(
            f"{self.server_url}/predict",
            json=payload
        )
        
        actions = np.array(response.json()['actions'])
        return actions
    
    def control_loop(self, robot_interface):
        """Main control loop"""
        instruction = "walk forward and pick up the box"
        
        while True:
            # Get sensor readings
            images = robot_interface.get_camera_images()
            joint_states = robot_interface.get_joint_states()
            
            # Get actions from GR00T
            actions = self.get_action(images, joint_states, instruction)
            
            # Execute first action (model predictive control)
            robot_interface.execute_action(actions[0])
            
            # Wait for next control cycle (typically 30-50 Hz)
            time.sleep(0.02)

# Usage
controller = HumanoidController()
# controller.control_loop(robot_interface)

### **Real-World Applications**

- **Manufacturing**: Assembly line tasks, quality inspection, material handling
- **Healthcare**: Patient assistance, hospital logistics, rehabilitation support
- **Warehousing**: Order fulfillment, inventory management (Amazon, Figure AI)
- **Construction**: Site inspection, tool handling, safety monitoring

### **Performance Benchmarks**

- **Inference Latency**: ~15-30ms per control cycle (3B model)
- **Throughput**: 30-50 Hz control frequency
- **Multi-sensor Processing**: 4+ cameras @ 30 FPS simultaneously
- **Power**: 80-100W typical during operation
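These numbers work together because GR00T predicts action chunks: at a 50 Hz control rate each cycle has a 20 ms budget, but a single inference call covers 16 future timesteps, so even a 30 ms worst-case inference fits comfortably inside the 320 ms horizon it produces. A back-of-envelope check (figures taken from the benchmarks above):

```python
control_hz = 50
chunk_size = 16            # timesteps predicted per inference call
inference_ms = 30          # worst-case latency from the benchmarks above

cycle_ms = 1000 / control_hz          # 20 ms per control tick
horizon_ms = chunk_size * cycle_ms    # 320 ms of actions per call

# Inference must finish well before the current chunk is exhausted
assert inference_ms < horizon_ms
print(f"cycle={cycle_ms:.0f}ms, chunk horizon={horizon_ms:.0f}ms")
```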

---

## **Use Case 2: Running Large Language Models (70B-120B) at the Edge**

### **Why Thor for Edge LLMs?**

Thor's **128GB of unified memory** enables running models that would require multiple GPUs on other platforms. With NVFP4 quantization, even 120B parameter models can run at impressive speeds.

### **Architecture**
┌──────────────────────────────────────┐
│     Application Layer                │
│  • Chat Interface / API Server       │
│  • Robot Command Interpreter         │
├──────────────────────────────────────┤
│  LLM Inference Engine                │
│  • vLLM / SGLang / TensorRT-LLM      │
│  • KV Cache Management               │
├──────────────────────────────────────┤
│  Model Serving                       │
│  • Quantized Model (FP4/FP8/INT4)    │
│  • Llama 3.2 70B / GPT-OSS 120B      │
├──────────────────────────────────────┤
│  Thor Hardware                       │
│  • 128GB Unified Memory              │
│  • Blackwell Tensor Cores (FP4)      │
└──────────────────────────────────────┘

Tutorial: Deploy Llama 3.2 Vision 70B on Thor

1. Install vLLM

# Install vLLM with CUDA support
pip3 install --break-system-packages vllm

# Verify installation
python3 -c "import vllm; print(vllm.__version__)"

2. Download Llama 3.2 Vision Model

# Install Hugging Face CLI
pip3 install --break-system-packages huggingface-hub

# Login to Hugging Face (requires Meta license approval)
huggingface-cli login

# Download model (will take some time - ~140GB)
huggingface-cli download meta-llama/Llama-3.2-Vision-70B \
  --local-dir /mnt/nova_ssd/models/llama-3.2-vision-70b

3. Quantize Model to FP4 (Optional but Recommended)

# quantize_model.py
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

model_path = "/mnt/nova_ssd/models/llama-3.2-vision-70b"
output_path = "/mnt/nova_ssd/models/llama-3.2-vision-70b-fp4"

# Load model
print("Loading model...")
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.float16,
    device_map="auto"
)

# Quantize to FP4 using NVIDIA's NVFP4 format
# (module and function names below are illustrative; the exact quantization
# API depends on your TensorRT-LLM / Model Optimizer release)
print("Quantizing to FP4...")
from nv_tensorrt_llm.quantization import quantize_model

quantized_model = quantize_model(
    model,
    quant_config="nvfp4",
    calibration_dataset="c4"  # Use C4 for calibration
)

# Save quantized model
quantized_model.save_pretrained(output_path)
print(f"Quantized model saved to {output_path}")

4. Create vLLM Server

# llm_server.py
from vllm import LLM, SamplingParams
from flask import Flask, request, jsonify
import torch

app = Flask(__name__)

# Initialize vLLM with optimizations for Thor
print("Initializing vLLM...")
llm = LLM(
    model="/mnt/nova_ssd/models/llama-3.2-vision-70b-fp4",
    tensor_parallel_size=1,  # Single GPU on Thor
    gpu_memory_utilization=0.90,  # Use 90% of 128GB
    max_model_len=8192,  # Context length
    quantization="nvfp4",  # Use NVIDIA FP4 format
    enforce_eager=False,  # Use CUDA graphs for speed
    dtype="float16"
)

sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.95,
    max_tokens=512
)

@app.route('/generate', methods=['POST'])
def generate():
    data = request.json
    prompt = data.get('prompt', '')
    
    # Generate response
    outputs = llm.generate([prompt], sampling_params)
    response = outputs[0].outputs[0].text
    
    return jsonify({
        'response': response,
        'tokens_generated': len(outputs[0].outputs[0].token_ids),
        'gpu_memory_used': torch.cuda.memory_allocated() / 1e9  # GB
    })

@app.route('/health', methods=['GET'])
def health():
    return jsonify({
        'status': 'healthy',
        'model': 'Llama-3.2-Vision-70B-FP4',
        'device': 'Jetson Thor T5000',
        'memory_available': torch.cuda.get_device_properties(0).total_memory / 1e9
    })

if __name__ == '__main__':
    print("LLM Server ready!")
    app.run(host='0.0.0.0', port=8000, threaded=True)

5. Run the Server

# Start server
python3 llm_server.py

# Expected output:
# Initializing vLLM...
# INFO: Loading model weights...
# INFO: Model loaded successfully
# INFO: Using 115.2 GB / 128 GB memory
# LLM Server ready!
# * Running on http://0.0.0.0:8000

6. Test the API

# Test generation
curl -X POST http://localhost:8000/generate \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "Explain quantum entanglement in simple terms:"
  }'

# Check health
curl http://localhost:8000/health

7. Robot Control Integration

# robot_llm_controller.py
import requests

class RobotLLMController:
    def __init__(self, llm_url="http://localhost:8000"):
        self.llm_url = llm_url
        
    def interpret_command(self, human_instruction):
        """Convert natural language to robot commands"""
        prompt = f"""
Convert this human instruction into robot commands:
Instruction: "{human_instruction}"

Output JSON format:
{{
  "action": "move" | "grasp" | "place" | "wait",
  "target": "object_name",
  "parameters": {{...}}
}}

JSON:"""
        
        response = requests.post(
            f"{self.llm_url}/generate",
            json={'prompt': prompt}
        )
        
        return response.json()['response']
    
    def explain_failure(self, error_data):
        """Explain robot failure to human"""
        prompt = f"""
The robot encountered an error:
Error: {error_data['error_type']}
Details: {error_data['details']}

Explain this to a human operator in simple terms and suggest solutions:"""
        
        response = requests.post(
            f"{self.llm_url}/generate",
            json={'prompt': prompt}
        )
        
        return response.json()['response']

# Usage
controller = RobotLLMController()
command = controller.interpret_command("Pick up the red box and put it on the shelf")
print(command)
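`interpret_command` returns the model's raw completion, which often wraps the JSON in extra prose. A small stdlib-only helper (a sketch; it ignores braces inside JSON strings, which is fine for the simple schema above) to pull out and parse the first JSON object:

```python
import json

def extract_json(text: str) -> dict:
    """Parse the first balanced {...} object found in an LLM completion."""
    start = text.find('{')
    if start == -1:
        raise ValueError("no JSON object in response")
    depth = 0
    for i, ch in enumerate(text[start:], start):
        if ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                return json.loads(text[start:i + 1])
    raise ValueError("unbalanced JSON object in response")

reply = 'Sure! Here is the command:\n{"action": "grasp", "target": "red_box"}'
print(extract_json(reply))  # {'action': 'grasp', 'target': 'red_box'}
```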

### **Performance Benchmarks**

- **70B Model**: ~25-40 tokens/second with FP4 quantization
- **120B Model**: ~15-20 tokens/second with FP4 quantization
- **Memory Usage**: 70B model uses ~85GB, 120B uses ~115GB
- **Latency**: First token in ~200-500ms
- **Power**: 100-120W during inference

### **Applications**

- **Robotics**: Natural language control, task planning, error recovery
- **Edge AI**: Private chatbots for hospitals, factories, retail
- **Autonomous Vehicles**: Real-time decision making without cloud connectivity
- **Industrial**: Maintenance documentation, technical support

---

## **Use Case 3: Multi-Camera Real-Time Video Analytics with Vision Programming Interface**

### **Why Thor for Multi-Camera Systems?**

Thor can handle **up to 20 cameras simultaneously** with:
- Dual NVENC/NVDEC engines
- Programmable Vision Accelerator (PVA) for stereo processing
- Camera Offload Engine
- Holoscan Sensor Bridge for Camera over Ethernet

### **Architecture**
┌────────────────────────────────────────────────┐
│           Application Layer                    │
│  • Object Detection Dashboard                  │
│  • Alert System • Video Archive                │
├────────────────────────────────────────────────┤
│     AI Inference Layer (on GPU)                │
│  • YOLOv8 Object Detection                     │
│  • Person Re-ID • Activity Recognition         │
├────────────────────────────────────────────────┤
│   Video Processing Pipeline (Multi-Accel)      │
│  • NVDEC: Hardware decode                      │
│  • VIC: Scaling, color conversion              │
│  • PVA: Stereo, optical flow                   │
│  • NVENC: Hardware encode                      │
├────────────────────────────────────────────────┤
│        Camera Inputs (20 streams)              │
│  4K@30fps x 20 cameras (via Holoscan)         │
└────────────────────────────────────────────────┘

Tutorial: Deploy 20-Camera Surveillance System

1. Setup GStreamer Pipeline

# Install GStreamer plugins
sudo apt-get install -y \
    gstreamer1.0-tools \
    gstreamer1.0-plugins-good \
    gstreamer1.0-plugins-bad \
    gstreamer1.0-plugins-ugly \
    gstreamer1.0-libav

# Verify NVDEC/NVENC availability
gst-inspect-1.0 nvv4l2decoder
gst-inspect-1.0 nvv4l2h264enc

2. Multi-Camera Capture Script

# multicam_capture.py
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np
import cv2
import threading

Gst.init(None)

class MultiCameraSystem:
    def __init__(self, num_cameras=20):
        self.num_cameras = num_cameras
        self.pipelines = []
        self.frames = [None] * num_cameras
        self.locks = [threading.Lock() for _ in range(num_cameras)]
        
    def create_pipeline(self, camera_id, rtsp_url):
        """Create GStreamer pipeline with hardware acceleration"""
        pipeline_str = f"""
            rtspsrc location={rtsp_url} latency=0 ! 
            rtph264depay ! 
            h264parse ! 
            nvv4l2decoder ! 
            nvvidconv ! 
            video/x-raw, format=BGRx ! 
            appsink name=sink_{camera_id} max-buffers=1 drop=true
        """
        
        pipeline = Gst.parse_launch(pipeline_str)
        appsink = pipeline.get_by_name(f"sink_{camera_id}")
        
        # Set callback for frames
        appsink.set_property('emit-signals', True)
        appsink.connect('new-sample', self.on_new_frame, camera_id)
        
        return pipeline
    
    def on_new_frame(self, appsink, camera_id):
        """Callback when new frame arrives"""
        sample = appsink.emit('pull-sample')
        if sample:
            buffer = sample.get_buffer()
            caps = sample.get_caps()
            
            # Get frame dimensions
            structure = caps.get_structure(0)
            width = structure.get_value('width')
            height = structure.get_value('height')
            
            # Convert to numpy array
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                frame = np.ndarray(
                    shape=(height, width, 4),
                    dtype=np.uint8,
                    buffer=map_info.data
                )
                
                # Store frame (thread-safe); copy so the data remains valid
                # after buffer.unmap() releases the mapped memory
                with self.locks[camera_id]:
                    self.frames[camera_id] = frame[:, :, :3].copy()  # Drop alpha
                
                buffer.unmap(map_info)
        
        return Gst.FlowReturn.OK
    
    def start_all_cameras(self, rtsp_urls):
        """Start all camera pipelines"""
        for i, url in enumerate(rtsp_urls):
            pipeline = self.create_pipeline(i, url)
            self.pipelines.append(pipeline)
            pipeline.set_state(Gst.State.PLAYING)
            print(f"Camera {i} started")
    
    def get_frame(self, camera_id):
        """Get latest frame from camera"""
        with self.locks[camera_id]:
            return self.frames[camera_id].copy() if self.frames[camera_id] is not None else None
    
    def stop_all(self):
        """Stop all pipelines"""
        for pipeline in self.pipelines:
            pipeline.set_state(Gst.State.NULL)

# Usage
if __name__ == "__main__":
    # RTSP camera URLs
    camera_urls = [
        f"rtsp://admin:password@192.168.1.{100+i}:554/stream1"
        for i in range(20)
    ]
    
    system = MultiCameraSystem(num_cameras=20)
    system.start_all_cameras(camera_urls)
    
    # Keep running
    try:
        GLib.MainLoop().run()
    except KeyboardInterrupt:
        system.stop_all()

3. Real-Time Object Detection on All Streams

# multi_stream_detection.py
import torch
from ultralytics import YOLO
import cv2
import time
from multicam_capture import MultiCameraSystem
import threading
from collections import deque

class MultiStreamDetector:
    def __init__(self, model_path="yolov8x.pt"):
        # Load YOLOv8 model
        self.model = YOLO(model_path)
        self.model.to('cuda')
        
        # FPS tracking
        self.fps_queues = [deque(maxlen=30) for _ in range(20)]
        
    def detect_batch(self, frames):
        """Run detection on batch of frames"""
        # Filter out None frames
        valid_frames = [(i, f) for i, f in enumerate(frames) if f is not None]
        if not valid_frames:
            return {}
        
        indices, frame_list = zip(*valid_frames)
        
        # Run batch inference (much faster than individual)
        results = self.model(frame_list, stream=False, verbose=False)
        
        # Map results back to camera IDs
        detections = {}
        for idx, result in zip(indices, results):
            detections[idx] = {
                'boxes': result.boxes.xyxy.cpu().numpy(),
                'scores': result.boxes.conf.cpu().numpy(),
                'classes': result.boxes.cls.cpu().numpy()
            }
        
        return detections
    
    def process_stream(self, camera_system, output_queue):
        """Main processing loop"""
        batch_size = 4  # Process 4 cameras at a time for optimal GPU usage
        
        while True:
            start_time = time.time()
            
            # Gather frames from all cameras
            all_frames = []
            for i in range(camera_system.num_cameras):
                frame = camera_system.get_frame(i)
                all_frames.append(frame)
            
            # Process in batches
            for batch_start in range(0, len(all_frames), batch_size):
                batch_end = min(batch_start + batch_size, len(all_frames))
                batch_frames = all_frames[batch_start:batch_end]
                
                # Run detection
                detections = self.detect_batch(batch_frames)
                
                # Put results in queue
                for cam_id, det in detections.items():
                    output_queue.put((cam_id, det))
            
            # Calculate FPS
            elapsed = time.time() - start_time
            fps = 1.0 / elapsed if elapsed > 0 else 0
            
            print(f"Processing FPS: {fps:.1f} | "
                  f"Latency: {elapsed*1000:.1f}ms | "
                  f"Streams: {camera_system.num_cameras}")
            
            time.sleep(0.001)  # Small sleep to prevent CPU spinning

# Usage
if __name__ == "__main__":
    import queue
    
    # Setup camera system
    camera_urls = [f"rtsp://camera{i}.local/stream" for i in range(20)]
    camera_system = MultiCameraSystem(num_cameras=20)
    camera_system.start_all_cameras(camera_urls)
    
    # Setup detector
    detector = MultiStreamDetector(model_path="yolov8x.pt")
    output_queue = queue.Queue(maxsize=100)
    
    # Start detection thread
    detection_thread = threading.Thread(
        target=detector.process_stream,
        args=(camera_system, output_queue),
        daemon=True
    )
    detection_thread.start()
    
    # Process results
    while True:
        try:
            cam_id, detections = output_queue.get(timeout=1)
            print(f"Camera {cam_id}: {len(detections['boxes'])} objects detected")
        except queue.Empty:
            continue

4. Using Vision Programming Interface (VPI) for Stereo Vision

# stereo_vpi.py
import vpi
import numpy as np
import cv2

class StereoVisionProcessor:
    def __init__(self, max_disparity=128):
        self.max_disparity = max_disparity
        
        # Create VPI context for PVA (Programmable Vision Accelerator)
        self.backend = vpi.Backend.PVA
        
        # Create stereo disparity estimator
        # (class and argument names here are indicative; verify against the
        # Python API of your installed VPI release)
        self.stereo = vpi.StereoDisparityEstimator(
            output_format=vpi.Format.U16,
            backend=self.backend,
            max_disparity=max_disparity,
            window_size=5,
            confidence_threshold=16
        )
        
    def process_stereo_pair(self, left_img, right_img):
        """
        Process stereo images using hardware acceleration
        Returns: disparity map
        """
        # Convert to VPI images
        with vpi.Backend.CUDA:
            left_vpi = vpi.asimage(left_img, vpi.Format.U8)
            right_vpi = vpi.asimage(right_img, vpi.Format.U8)
        
        # Run stereo matching on PVA
        with self.backend:
            disparity = self.stereo(left_vpi, right_vpi)
            confidence = self.stereo.confidence
        
        # Convert back to numpy
        disparity_np = disparity.cpu()
        confidence_np = confidence.cpu()
        
        return disparity_np, confidence_np
    
    def depth_from_disparity(self, disparity, baseline_m, focal_px):
        """
        Convert disparity to depth
        depth (meters) = (baseline * focal_length) / disparity
        """
        # Avoid division by zero
        disparity_safe = np.where(disparity > 0, disparity, 1)
        depth = (baseline_m * focal_px) / disparity_safe
        depth = np.where(disparity > 0, depth, 0)
        
        return depth

# Usage
if __name__ == "__main__":
    processor = StereoVisionProcessor(max_disparity=128)
    
    # Load stereo images
    left = cv2.imread('left.png', cv2.IMREAD_GRAYSCALE)
    right = cv2.imread('right.png', cv2.IMREAD_GRAYSCALE)
    
    # Process
    disparity, confidence = processor.process_stereo_pair(left, right)
    
    # Convert to depth
    depth = processor.depth_from_disparity(
        disparity,
        baseline_m=0.12,  # 12cm baseline
        focal_px=700      # focal length in pixels
    )
    
    print(f"Disparity shape: {disparity.shape}")
    print("Stereo matching offloaded to the PVA hardware accelerator")
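The depth conversion above is worth checking with concrete numbers: with the 12 cm baseline and 700 px focal length used in the example, a 35 px disparity corresponds to 2.4 m, and small disparities map to far (and increasingly noisy) depths:

```python
def depth_m(disparity_px: float, baseline_m: float = 0.12,
            focal_px: float = 700) -> float:
    """depth = (baseline * focal) / disparity, as in depth_from_disparity()."""
    return baseline_m * focal_px / disparity_px

print(depth_m(35))   # 2.4 m
print(depth_m(7))    # 12.0 m -- a 1 px error here shifts depth by meters
```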

Performance Benchmarks

  • 20 cameras @ 1080p30: Achievable with YOLOv8m
  • Stereo Processing: 8 concurrent streams @ 960×600 @ 30 FPS
  • Detection Latency: <50ms per frame (batched inference)
  • Total System Throughput: ~600 FPS across all streams
  • Power: 110-120W under full load
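The aggregate throughput figure follows directly from the stream count, and it also shows why batched inference is essential: the per-frame GPU budget is far below the quoted single-frame detection latency, so frames must be processed in batches to keep up.

```python
cameras, fps = 20, 30
total_fps = cameras * fps        # 600 FPS aggregate across all streams
budget_ms = 1000 / total_fps     # ~1.67 ms of GPU time per frame on average

print(total_fps, f"{budget_ms:.2f} ms/frame")
```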

Applications

  • Smart Cities: Traffic monitoring, parking management
  • Retail: Customer analytics, theft prevention
  • Industrial: Quality control, safety monitoring
  • Airports: Security, crowd management

Use Case 4: Autonomous Mobile Robot Navigation with Isaac ROS

Why Thor for AMR Navigation?

Isaac ROS provides GPU-accelerated perception and navigation packages optimized for Thor’s hardware accelerators.

Tutorial: Deploy SLAM and Navigation

1. Install Isaac ROS Packages

# Add Isaac ROS Apt repository (already done in Use Case 1)

# Install core packages
sudo apt-get install -y \
    ros-jazzy-isaac-ros-visual-slam \
    ros-jazzy-isaac-ros-nvblox \
    ros-jazzy-isaac-ros-apriltag \
    ros-jazzy-isaac-ros-depth-segmentation

# Install Navigation2
sudo apt-get install -y ros-jazzy-navigation2 ros-jazzy-nav2-bringup

2. cuVSLAM Configuration

# cuVSLAM.yaml
/**:
  ros__parameters:
    # Visual SLAM parameters
    enable_imu_fusion: true
    enable_slam_visualization: true
    enable_localization_n_mapping: true
    
    # Camera parameters
    camera_optical_frames:
      - camera_left_optical_frame
      - camera_right_optical_frame
    
    # IMU parameters
    imu_frame: imu_link
    
    # Performance tuning for Thor
    num_threads: 8  # Use multiple CPU cores
    gpu_device_id: 0
    
    # Map parameters
    enable_observations_view: true
    enable_landmarks_view: true
    path_max_size: 1024

3. Launch cuVSLAM

# cuVSLAM_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
    return LaunchDescription([
        # cuVSLAM node
        Node(
            package='isaac_ros_visual_slam',
            executable='isaac_ros_visual_slam',
            name='visual_slam',
            parameters=[{
                'enable_slam_visualization': True,
                'enable_landmarks_view': True,
                'enable_observations_view': True,
                'num_cameras': 2,
                'min_num_images': 2,
                'enable_imu_fusion': True,
                'rectified_images': True,
            }],
            remappings=[
                ('stereo_camera/left/image', '/camera/left/image_rect'),
                ('stereo_camera/right/image', '/camera/right/image_rect'),
                ('stereo_camera/left/camera_info', '/camera/left/camera_info'),
                ('stereo_camera/right/camera_info', '/camera/right/camera_info'),
                ('visual_slam/imu', '/imu/data'),
            ]
        ),
        
        # RViz for visualization
        Node(
            package='rviz2',
            executable='rviz2',
            name='rviz2',
            arguments=['-d', '/path/to/slam.rviz']
        ),
    ])

4. Run Navigation Stack

# Launch cuVSLAM
ros2 launch isaac_ros_visual_slam cuVSLAM_launch.py

# Launch Navigation2
ros2 launch nav2_bringup navigation_launch.py \
    use_sim_time:=false \
    params_file:=/path/to/nav2_params.yaml

# Send navigation goal
ros2 topic pub --once /goal_pose geometry_msgs/msg/PoseStamped "
{
  header: {frame_id: 'map'},
  pose: {
    position: {x: 5.0, y: 3.0, z: 0.0},
    orientation: {x: 0.0, y: 0.0, z: 0.0, w: 1.0}
  }
}"

5. Python Navigation Client

# amr_navigation_client.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import Odometry, Path
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2

class AMRNavigationClient(Node):
    def __init__(self):
        super().__init__('amr_navigation_client')
        
        # Publishers
        self.goal_pub = self.create_publisher(
            PoseStamped, 
            '/goal_pose', 
            10
        )
        
        # Subscribers
        self.odom_sub = self.create_subscription(
            Odometry,
            '/visual_slam/tracking/odometry',
            self.odom_callback,
            10
        )
        
        self.path_sub = self.create_subscription(
            Path,
            '/plan',
            self.path_callback,
            10
        )
        
        self.current_pose = None
        self.cv_bridge = CvBridge()
        
    def odom_callback(self, msg):
        """Update current robot position"""
        self.current_pose = msg.pose.pose
        
    def path_callback(self, msg):
        """Visualize planned path"""
        self.get_logger().info(
            f'Received path with {len(msg.poses)} waypoints'
        )
        
    def send_goal(self, x, y, theta=0.0):
        """Send navigation goal"""
        goal = PoseStamped()
        goal.header.frame_id = 'map'
        goal.header.stamp = self.get_clock().now().to_msg()
        
        goal.pose.position.x = x
        goal.pose.position.y = y
        goal.pose.position.z = 0.0
        
        # Convert theta to quaternion
        import math
        goal.pose.orientation.z = math.sin(theta / 2.0)
        goal.pose.orientation.w = math.cos(theta / 2.0)
        
        self.goal_pub.publish(goal)
        self.get_logger().info(f'Sent goal: ({x}, {y}, {theta})')
        
    def navigate_waypoints(self, waypoints):
        """Navigate through list of waypoints"""
        for i, (x, y, theta) in enumerate(waypoints):
            self.get_logger().info(f'Waypoint {i+1}/{len(waypoints)}')
            self.send_goal(x, y, theta)
            
            # Wait for robot to reach goal (simplified)
            import time
            time.sleep(10)  # In practice, check distance to goal

def main():
    rclpy.init()
    
    client = AMRNavigationClient()
    
    # Define warehouse waypoints
    waypoints = [
        (5.0, 0.0, 0.0),      # Move forward
        (5.0, 5.0, 1.57),     # Turn left
        (0.0, 5.0, 3.14),     # Move back
        (0.0, 0.0, 0.0),      # Return to start
    ]
    
    # Execute mission
    client.navigate_waypoints(waypoints)
    
    rclpy.shutdown()

if __name__ == '__main__':
    main()
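The `time.sleep(10)` in `navigate_waypoints` is only a placeholder; a more robust loop would poll the odometry callback and advance to the next waypoint once the robot is close to the current goal. A minimal, ROS-free sketch of that distance check (the `goal_reached` helper and its 0.25 m tolerance are illustrative, not part of the Nav2 API):

```python
# goal_check.py - hypothetical replacement for the fixed sleep above
import math

def goal_reached(current_xy, goal_xy, tolerance=0.25):
    """Return True once the robot is within `tolerance` meters of the goal."""
    dx = goal_xy[0] - current_xy[0]
    dy = goal_xy[1] - current_xy[1]
    return math.hypot(dx, dy) <= tolerance

# 0.1 m from the goal counts as reached with a 0.25 m tolerance
print(goal_reached((4.9, 0.0), (5.0, 0.0)))   # True
print(goal_reached((2.0, 0.0), (5.0, 0.0)))   # False
```

In the node itself, you would call this inside a loop that spins `rclpy` (so `odom_callback` keeps updating `self.current_pose`) and break out when it returns True.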

Performance

  • cuVSLAM: 30 FPS stereo processing with <30ms latency
  • Mapping Rate: 1000+ landmarks/sec
  • Navigation Update: 20 Hz path planning
  • Power: 70-90W during navigation

Use Case 5: Video Search and Summarization with Multi-Modal AI

Why Thor for VSS?

Thor’s Multi-Instance GPU (MIG) technology allows running multiple AI models concurrently for efficient video analysis.

Tutorial: Build Video Search System

1. Install Dependencies

# Install video processing tools
# Note: prebuilt faiss-gpu wheels may not be available for aarch64;
# if the install fails, fall back to faiss-cpu or build FAISS from source
pip3 install --break-system-packages \
    opencv-python \
    moviepy \
    faiss-gpu \
    sentence-transformers

# Install VLM dependencies
pip3 install --break-system-packages \
    transformers \
    torch \
    pillow

2. Video Indexing System


# video_indexer.py
import torch
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import cv2
import numpy as np
import faiss
import pickle
from pathlib import Path

class VideoIndexer:
    def __init__(self, model_name="openai/clip-vit-large-patch14"):
        self.device = "cuda"
        
        # Load CLIP model for vision-language understanding
        print("Loading CLIP model...")
        self.model = CLIPModel.from_pretrained(model_name).to(self.device)
        self.processor = CLIPProcessor.from_pretrained(model_name)
        
        # FAISS index for fast similarity search
        self.dimension = 768  # CLIP embedding dimension
        self.index = faiss.IndexFlatIP(self.dimension)  # Inner product (cosine similarity)
        
        # Metadata storage
        self.frame_metadata = []
        
    def extract_frames(self, video_path, fps=1):
        """Extract frames from video at specified FPS"""
        cap = cv2.VideoCapture(video_path)
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Guard against a zero interval when video_fps < fps
        frame_interval = max(1, int(video_fps / fps))
        
        frames = []
        frame_count = 0
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
                
            if frame_count % frame_interval == 0:
                # Convert BGR to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append((frame_count / video_fps, frame_rgb))
            
            frame_count += 1
        
        cap.release()
        return frames
    
    def encode_frames(self, frames):
        """Encode frames to CLIP embeddings"""
        embeddings = []
        
        batch_size = 32
        for i in range(0, len(frames), batch_size):
            batch_frames = [f[1] for f in frames[i:i+batch_size]]
            
            # Process images
            images = [Image.fromarray(f) for f in batch_frames]
            inputs = self.processor(images=images, return_tensors="pt", padding=True)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            
            # Get embeddings
            with torch.no_grad():
                image_features = self.model.get_image_features(**inputs)
                image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
            
            embeddings.append(image_features.cpu().numpy())
        
        return np.vstack(embeddings)
    
    def index_video(self, video_path):
        """Index entire video"""
        print(f"Processing {video_path}...")
        
        # Extract frames
        frames = self.extract_frames(video_path, fps=1)
        print(f"Extracted {len(frames)} frames")
        
        # Encode frames
        embeddings = self.encode_frames(frames)
        
        # Add to FAISS index
        self.index.add(embeddings.astype('float32'))
        
        # Store metadata
        for timestamp, frame in frames:
            self.frame_metadata.append({
                'video': str(video_path),
                'timestamp': timestamp,
                'frame_id': len(self.frame_metadata)
            })
        
        print(f"Indexed {len(frames)} frames from {video_path}")
    
    def search(self, text_query, k=10):
        """Search videos using text query"""
        # Encode text query
        inputs = self.processor(text=[text_query], return_tensors="pt", padding=True)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        
        with torch.no_grad():
            text_features = self.model.get_text_features(**inputs)
            text_features = text_features / text_features.norm(p=2, dim=-1, keepdim=True)
        
        query_embedding = text_features.cpu().numpy().astype('float32')
        
        # Search FAISS index
        scores, indices = self.index.search(query_embedding, k)
        
        # Get results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.frame_metadata):
                result = self.frame_metadata[idx].copy()
                result['score'] = float(score)
                results.append(result)
        
        return results
    
    def save_index(self, path):
        """Save index and metadata"""
        faiss.write_index(self.index, f"{path}.index")
        with open(f"{path}.metadata", 'wb') as f:
            pickle.dump(self.frame_metadata, f)
    
    def load_index(self, path):
        """Load index and metadata"""
        self.index = faiss.read_index(f"{path}.index")
        with open(f"{path}.metadata", 'rb') as f:
            self.frame_metadata = pickle.load(f)

# Usage
if __name__ == "__main__":
    indexer = VideoIndexer()
    
    # Index videos
    video_dir = Path("/mnt/nova_ssd/videos/")
    for video_file in video_dir.glob("*.mp4"):
        indexer.index_video(str(video_file))
    
    # Save index
    indexer.save_index("/mnt/nova_ssd/video_index")
    
    # Search
    results = indexer.search("person walking with a dog", k=5)
    for r in results:
        print(f"Video: {r['video']} | Time: {r['timestamp']:.1f}s | Score: {r['score']:.3f}")
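The indexer stores L2-normalized embeddings in an `IndexFlatIP`, so the inner product it computes is exactly cosine similarity. A NumPy-only sketch of why that holds (the vectors here are made up for illustration):

```python
import numpy as np

# Two toy "embeddings": a stored frame vector and a text query vector
a = np.array([3.0, 4.0])
q = np.array([1.0, 0.0])

# L2-normalize, as encode_frames() and search() do before touching FAISS
a_n = a / np.linalg.norm(a)
q_n = q / np.linalg.norm(q)

# Inner product of normalized vectors == cosine similarity of the originals
ip = float(np.dot(a_n, q_n))
cos = float(np.dot(a, q) / (np.linalg.norm(a) * np.linalg.norm(q)))
print(round(ip, 6), round(cos, 6))  # 0.6 0.6
```

This is why normalization must happen before `index.add()` as well as before `index.search()`; skip either one and the scores stop being cosine similarities.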

3. Video Summarization

# video_summarizer.py
import torch
import numpy as np
from transformers import AutoModelForCausalLM, AutoProcessor
import cv2
from PIL import Image

class VideoSummarizer:
    def __init__(self, model_name="microsoft/Phi-3.5-vision-instruct"):
        self.device = "cuda"
        
        # Load vision-language model
        print("Loading VLM for summarization...")
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            trust_remote_code=True
        ).to(self.device)
        
        # Phi-3.5-vision needs its AutoProcessor (not a plain tokenizer)
        # so that image placeholders can be interleaved with the text
        self.processor = AutoProcessor.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        
    def extract_key_frames(self, video_path, num_frames=10):
        """Extract representative frames"""
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Sample frames evenly across the video
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
        
        frames = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if ret:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(frame_rgb))
        
        cap.release()
        return frames
    
    def summarize_video(self, video_path):
        """Generate text summary of video"""
        # Extract key frames
        frames = self.extract_key_frames(video_path, num_frames=8)
        
        # Build a chat prompt with one placeholder per frame so the
        # model actually sees the images
        placeholders = "".join(
            f"<|image_{i}|>\n" for i in range(1, len(frames) + 1)
        )
        prompt = placeholders + """Analyze these frames from a video and provide a concise summary including:
1. Main activities or events
2. People or objects present
3. Setting/location
4. Notable actions or changes

Summary:"""
        
        messages = [{"role": "user", "content": prompt}]
        chat_prompt = self.processor.tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        
        # Encode frames + prompt together
        inputs = self.processor(
            chat_prompt, frames, return_tensors="pt"
        ).to(self.device)
        
        # Generate summary
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.7,
                do_sample=True
            )
        
        # Decode only the newly generated tokens
        summary = self.processor.decode(
            outputs[0][inputs["input_ids"].shape[1]:],
            skip_special_tokens=True
        )
        
        return summary.strip()

# Usage
if __name__ == "__main__":
    summarizer = VideoSummarizer()
    
    video_path = "/mnt/nova_ssd/videos/security_cam_001.mp4"
    summary = summarizer.summarize_video(video_path)
    
    print("Video Summary:")
    print(summary)
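The even frame sampling used by `extract_key_frames` can be checked in isolation: for a 100-frame clip and 5 key frames, `np.linspace` with integer dtype picks indices spread across the whole video, endpoints included.

```python
import numpy as np

total_frames = 100
num_frames = 5

# Same sampling as extract_key_frames(): evenly spaced, first and last frame included
frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
print(frame_indices.tolist())  # [0, 24, 49, 74, 99]
```

Uniform sampling is a deliberately simple strategy; for long or repetitive footage (e.g. security camera feeds) you may get better summaries by sampling around scene changes instead.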

Performance

  • Indexing: ~1 FPS per video stream
  • Search Latency: <100ms for 10k+ frames
  • Summarization: ~5-10 seconds per video
  • Concurrent Streams: 4+ using MIG

Conclusion

NVIDIA Jetson Thor represents a quantum leap in edge AI capabilities. With 2,070 FP4 TFLOPS, 128GB of memory, and the Blackwell architecture, Thor enables applications that were previously impossible at the edge:

  1. Humanoid Robotics: Real-time control with Isaac GR00T
  2. Edge LLMs: Running 70B-120B models locally
  3. Multi-Camera Analytics: 20+ simultaneous streams
  4. Autonomous Navigation: GPU-accelerated SLAM
  5. Video Intelligence: Content search and summarization

Thor is not just faster—it unlocks entirely new categories of edge AI applications, from warehouse humanoids to private AI assistants to intelligent surveillance systems.

Getting Started Resources

Have Queries? Join https://launchpass.com/collabnix
